1、flask的异步并发
问题
- flask在开发环境下是单线程的,如果某个请求长时间无响应(阻塞),会导致其他请求也无法响应。
- flask原生并
不支持协程异步
并发。也就是说即使使用了协程异步编程,但如果使用flask开发的app接口,外部并发
请求时,flask
还是会把这些"已经异步编程的接口"
当成同步接口
执行。
解决办法
- 1、使用多线程或者多进程的服务器,例如 Gunicorn 或者 uWSGI。这些服务器可以同时处理多个请求。
- 2、使用异步非阻塞的服务器,例如 Tornado 或者 Twisted。这些服务器可以在处理一个请求的时候,如果遇到 IO 阻塞,就先去处理其他的请求,等 IO 完成后再回来继续处理这个请求。
- 3、使用异步编程,例如 asyncio 或者 gevent。这些库可以让你的代码在遇到 IO 阻塞的时候,自动切换到其他的任务,从而提高整体的并发性能。
但flask并不支持异步编程,推荐支持异步编程的 Web 框架,如 aiohttp 或 fastapi。
- 4、对于超时的请求,可以考虑设置一些超时机制,例如使用 future 或者 promise,如果一个请求超时,就直接返回错误,不再等待它完成。这样可以避免一个请求阻塞住整个服务器。
实现方案
(1)falsk + 异步视图装饰器
本质: 多线程并发。虽然宏观上看好像是协程的并发(单线程的并发)。
虽然asynic await关键字旨在实现同一线程里的并发,但flask并不原生支持这一特性(“不支持”不是说“不能调用”)。
所以,同一时间、同一线程、多个请求并发请求时,不管有无asynic await关键字修饰请求接口,flask都是一个一个顺序的、同步处理他们,而不是并发处理。
这里的装饰器实现的功能:
在 Making Flask async and Quart sync, Quart 的作者 PG Jones 给出了一个 Flask 异步化的代码,route 方法可加上 async 关键字
和 @run_async 装饰
当并发请求时,给
每个请求
开创一个线程
单独的处理,这样并发的请求就被放到了多线程里,从宏观上看就是并发了,这样也不会因为某个接口请求阻塞而导致其他接口也无法响应的问题。
但这种异步装饰器
,只是宏观上的“像”协程并发(协程:单线程下的并发),其实是多线程并发
。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project :hippo-ai-py
@Author :cf
@Date :2024/4/2
@Desc : api公用信息
'''
import asyncio
from concurrent.futures import Future, ThreadPoolExecutor
from functools import wraps
from flask import Flask, has_request_context, copy_current_request_context
def run_async(func):
'''
flask异步视图装饰器。
Args:
func: 调用的方法
Returns:
'''
@wraps(func)
def _wrapper(*args, **kwargs):
call_result = Future()
def _run():
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(func(*args, **kwargs))
except Exception as error:
call_result.set_exception(error)
else:
call_result.set_result(result)
finally:
loop.close()
loop_executor = ThreadPoolExecutor(max_workers=1)
if has_request_context():
_run = copy_current_request_context(_run)
loop_future = loop_executor.submit(_run)
loop_future.result()
return call_result.result()
return _wrapper
app = Flask(__name__)
async def fetch(url):
print(f"{threading.current_thread().name}:{url}")
return requests.get(url).text
async def main(t):
await asyncio.sleep(t)
tasks = [fetch(url) for url in ["https://baidu.com", "https://bing.com", "https://yanbin.blog"]]
return await asyncio.gather(*tasks)
@app.route("/")
@run_async
async def index():
time1 = time.time()
responses = await main(3)
time2 = time.time()
print(f'{threading.current_thread().name}--3:response sizes: {[len(res) for res in responses]},耗时{time2 - time1}s\n')
return responses
if __name__ == "__main__":
# app.run(debug=False, use_reloader=False,threaded=False)
app.run(debug=False, use_reloader=False)
(2)WSGI启动服务
本质: 多线程、多进程的并发。
2、fastapi异步编程
本质: 协程(单线程)并发。
待续。。。。。。。。。