一、核心要义
1. 对比一个简单的多线程程序和对应的asyncio版,说明多线程和异步任务之间的关系
2. 网络下载的异步版
3. 在异步编程中,与回调相比,协程显著提升性能的方式
二、代码示例
1、相关知识点
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 10:14
# @Author : Maple
# @File : 00-相关知识点.py
# @Software: PyCharm
import sys
import collections
if __name__ == '__main__':
    # Demo of the backspace control character (\x08): print a greeting, then
    # emit a number of backspaces so the cursor moves back over its tail.
    stream = sys.stdout
    greeting = 'Hello world'

    # Back up by 1 and then by 2 characters, ending each round with a newline.
    # (The original author notes the terminal shows "Hello worl" / "Hello wo".)
    for steps_back in (1, 2):
        stream.write(greeting)
        stream.flush()
        stream.write('\x08' * steps_back)
        stream.write('\n')

    # Backing up by the full length of the text returns the cursor to the
    # start of the line, which the spinner examples use to wipe their status.
    stream.write(greeting)
    stream.flush()
    stream.write('\x08' * len(greeting))
2、指针旋转(多线程示例)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/8 22:05
# @Author : Maple
# @File : 01-指针旋转(多线程示例).py
# @Software: PyCharm
import itertools
import sys
import threading
import time
class Signal:
    """Mutable flag shared between the supervisor and the spinner thread."""

    # The spinner keeps running while this is true; supervisor() sets it to
    # False on the instance to ask the spinner to stop.
    go = True
def spin(msg, signal):
    """Animate a text spinner in front of *msg* until ``signal.go`` is false.

    :param msg: text displayed after the spinner character
    :param signal: object whose ``go`` attribute is polled after every frame
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    # Every frame is one spinner char + one space + msg, so the width is fixed.
    width = len(msg) + 2
    rewind = '\x08' * width
    while True:
        out.write(next(frames) + ' ' + msg)
        out.flush()
        # Move the cursor back to the start so the next frame overwrites this one.
        out.write(rewind)
        time.sleep(.1)
        if not signal.go:
            break
    # Blank out the status line and return the cursor to where it started.
    out.write(' ' * width + rewind)
def slow_function():
    """Pretend to wait on blocking I/O for a while, then return the answer."""
    pretend_io_seconds = 3
    time.sleep(pretend_io_seconds)
    return 42
def supervisor():
    """Run ``slow_function`` while a spinner animates in a secondary thread.

    :return: whatever ``slow_function`` returns
    :raises: any exception raised by ``slow_function``; the spinner thread is
        stopped and joined before the exception propagates
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking', signal))
    # Shows the secondary thread object, e.g. <Thread(Thread-1, initial)>
    print('spinner object', spinner)
    spinner.start()
    try:
        result = slow_function()
    finally:
        # Always release the spinner: it is a non-daemon thread, so leaving
        # signal.go set after an error would keep the process alive forever.
        signal.go = False
        # Wait for the spinner thread to finish.
        spinner.join()
    return result
def main():
    """Run the supervisor and print the answer it produces."""
    answer = supervisor()
    print('Answer:', answer)


if __name__ == '__main__':
    main()
3、指针旋转(asyncio示例)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 11:20
# @Author : Maple
# @File : 02-指针旋转(asyncio示例).py
# @Software: PyCharm
import asyncio
import itertools
import sys
# Coroutines meant to be handled by asyncio should use this decorator. It is
# not mandatory, but recommended. It has been obsolete since Python 3.8, where
# `async def` is recommended instead (see the "asyncio example 2" script).
# NOTE(review): per the asyncio docs the decorator was removed entirely in
# Python 3.11, so this legacy example only runs on older interpreters.
@asyncio.coroutine
def spin(msg):
    """Animate a text spinner in front of *msg* until the task is cancelled.

    :param msg: text displayed after the spinner character
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    # Every frame is one spinner char + one space + msg, so the width is fixed.
    width = len(msg) + 2
    rewind = '\x08' * width
    while True:
        out.write(next(frames) + ' ' + msg)
        out.flush()
        out.write(rewind)
        try:
            # Used instead of time.sleep: this sleep does not block the event loop.
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Cancellation is the request to stop spinning.
            break
    # Blank out the status line and return the cursor to where it started.
    out.write(' ' * width + rewind)
@asyncio.coroutine
def slow_function():
    """Pretend to wait on I/O for a while without blocking the event loop."""
    pretend_io_seconds = 3
    yield from asyncio.sleep(pretend_io_seconds)
    return 42
@asyncio.coroutine
def supervisor():
    """Run ``slow_function`` while the spinner task animates, then cancel it.

    :return: the value produced by ``slow_function``
    """
    # Schedule spin() to run: asyncio.create_task wraps the coroutine in a
    # Task object and returns immediately. A Task is also a Future, because
    # Task is a subclass of Future.
    spinner_task = asyncio.create_task(spin('thinking'))
    print('spinner object', spinner_task)
    # Drive slow_function and collect its return value when it finishes (see
    # the "delegating generator" section of chapter 16, Coroutines):
    #   first-level delegating generator:  supervisor
    #   second-level delegating generator: slow_function
    # Meanwhile the event loop keeps running, because slow_function ends up
    # yielding control back through `yield from asyncio.sleep(3)`.
    answer = yield from slow_function()
    spinner_task.cancel()
    return answer
def main():
    """Drive ``supervisor`` to completion on a dedicated event loop and print the result."""
    # Create the event loop explicitly: calling asyncio.get_event_loop() with
    # no running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        # Drive the supervisor coroutine until it finishes; its return value is
        # the return value of this call. The event loop plays the role of the
        # outer caller (see the "delegating generator" section of chapter 16).
        result = loop.run_until_complete(supervisor())
    finally:
        # Close the loop even when supervisor() raises, so it is never leaked.
        loop.close()
    print('Answer', result)


if __name__ == '__main__':
    main()
4、指针旋转(asyncio示例2)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 11:20
# @Author : Maple
# @File : 03-指针旋转(asyncio示例2).py
# @Software: PyCharm
import asyncio
import itertools
import sys
# The @asyncio.coroutine decorator is obsolete in Python 3.8 and later;
# the official recommendation is to use `async def` instead.
async def spin(msg):
    """Animate a text spinner in front of *msg* until the task is cancelled.

    :param msg: text displayed after the spinner character
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    while True:
        frame = next(frames) + ' ' + msg
        rewind = '\x08' * len(frame)
        out.write(frame)
        out.flush()
        out.write(rewind)
        try:
            # Used instead of time.sleep: this sleep does not block the event loop.
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Cancellation is the request to stop spinning.
            break
    # Blank out the status line and return the cursor to where it started.
    out.write(' ' * len(frame) + rewind)
async def slow_function():
    """Pretend to wait on I/O for a while without blocking the event loop."""
    pretend_io_seconds = 3
    await asyncio.sleep(pretend_io_seconds)
    return 42
async def supervisor():
    """Run ``slow_function`` while the spinner task animates, then cancel it.

    :return: the value produced by ``slow_function``
    """
    # Schedule spin() to run: asyncio.create_task wraps the coroutine in a
    # Task object and returns immediately.
    spinner_task = asyncio.create_task(spin('thinking'))
    # Sample output recorded by the author:
    #   spinner object: <Task pending name='Task-2' coro=<spin() running at ...>>
    print('spinner object:', spinner_task)
    # Sample output recorded by the author: <class '_asyncio.Task'>
    print('spinner object type:', type(spinner_task))
    # Drive slow_function and collect its return value when it finishes.
    # Meanwhile the event loop keeps running, because slow_function hands
    # control back while it awaits asyncio.sleep(3).
    answer = await slow_function()
    spinner_task.cancel()
    return answer
def main():
    """Run ``supervisor`` to completion and print the value it returns."""
    # asyncio.run creates a fresh event loop, drives the coroutine until it
    # finishes and always closes the loop afterwards, replacing the deprecated
    # get_event_loop() / run_until_complete() / close() sequence.
    result = asyncio.run(supervisor())
    print('Answer', result)


if __name__ == '__main__':
    main()
5、网络下载asyncio版本(1)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 12:29
# @Author : Maple
# @File : 04-网络下载asyncio版本(1).py
# @Software: PyCharm
import asyncio
import time
from random import randint
"""
我们编写的协程链条:
1. 通过把最外层的委派生成器传给asyncio包API中的某个函数(如loop.run_until_complete)来驱动
即不通过调用next()或send()[对比参考16章-协程中的`委派生成器`一节]驱动协程
2. 我们编写的协程链条最终通过yield from把职责委托给asyncio包的某个协程函数或协程方法(比如本例中的asyncio.sleep(download_time))
也就是说,最内层的`子生成器`是库中真正执行的I/O操作的函数,而不是我们自己编写的函数
3. 通常来说,我们编写的只是`委派生成器`
"""
# Ids of the images to "download"; main() passes this list to download_many_img.
CC_LIST = [1, 2, 3, 4, 5, 6]
def get_randint():
    """Return a random integer between 1 and 10, both ends included."""
    lowest, highest = 1, 10
    return randint(lowest, highest)
async def get_img(id):
    """Simulate downloading image *id* by sleeping for a random duration.

    :param id: identifier of the image, used only in the progress message
    :return: the simulated download time in seconds
    """
    print('***', id, '号图片开始下载***')
    seconds = get_randint()
    # Non-blocking sleep: other downloads proceed while this one "waits".
    await asyncio.sleep(seconds)
    return seconds
async def download_one_img(id):
    """Download image *id*, report how long it took and return its content.

    :param id: identifier of the image
    :return: a string standing in for the image content
    """
    elapsed = await get_img(id)
    report = '***{}号图片下载完成,花费时长{}s***'.format(id, elapsed)
    print(report)
    return str(id) + '号图片内容'
def download_many_img(cc_list):
    """Download every image in *cc_list* concurrently.

    :param cc_list: iterable of image ids; they are processed in sorted order
    :return: the number of downloads that finished (0 for an empty input)
    """

    async def _download_all():
        # asyncio.wait no longer accepts bare coroutine objects on recent
        # Python versions, so wrap each one in a Task explicitly. A Task is
        # also a Future, because Task is a subclass of Future.
        tasks = [asyncio.create_task(download_one_img(c)) for c in sorted(cc_list)]
        if not tasks:
            # asyncio.wait raises ValueError when given an empty collection.
            return set()
        # wait returns (done, pending). pending is always empty here because
        # neither timeout nor return_when is set, so it is discarded.
        done, _ = await asyncio.wait(tasks)
        return done

    # asyncio.run blocks until everything has finished and always closes the
    # event loop it created, even when a download raises.
    return len(asyncio.run(_download_all()))
def main(download_many_img):
    """Time a full download run and print a one-line summary.

    :param download_many_img: callable taking the id list and returning the
        number of completed downloads
    """
    started = time.time()
    count = download_many_img(CC_LIST)
    elapsed = time.time() - started
    print('\n{} flags downloaded in {:.2f}s'.format(count, elapsed))


if __name__ == '__main__':
    main(download_many_img)
6、网络下载asyncio版本(2)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 17:50
# @Author : Maple
# @File : 05-网路下载asyncio版本(2).py
# @Software: PyCharm
"""
使用asyncio.as_completed函数,由于
1. 该函数必须在协程中使用
2. download_many_img 不能是协程,因为main函数中需要调用download_many_img,而该参数是普通函数
因此,需要新增一个 downloader_coro 协程函数,专门用于并发(协程)下载,而 download_many_img仅用于设置事件循环
"""
import asyncio
import time
from random import randint
# Ids of the images to "download"; main() passes this list to download_many_img.
CC_LIST = [1, 2, 3, 4, 5, 6]
def get_randint():
    """Return a random integer between 1 and 10, both ends included."""
    bounds = (1, 10)
    return randint(*bounds)
async def get_img(id):
    """Simulate downloading image *id* by sleeping for a random duration.

    :param id: identifier of the image, used only in the progress message
    :return: the simulated download time in seconds
    """
    print('***', id, '号图片开始下载***')
    wait_seconds = get_randint()
    # Non-blocking sleep: other downloads proceed while this one "waits".
    await asyncio.sleep(wait_seconds)
    return wait_seconds
async def download_one_img(id):
    """Download image *id*, report how long it took and return its content.

    :param id: identifier of the image
    :return: a string standing in for the image content
    """
    took = await get_img(id)
    summary = '***{}号图片下载完成,花费时长{}s***'.format(id, took)
    print(summary)
    return str(id) + '号图片内容'
async def downloader_coro(cc_list):
    """Download all images concurrently, printing each result as it completes.

    :param cc_list: iterable of image ids; coroutines are created in sorted order
    :return: the number of downloads that completed
    """
    # One coroutine object per image.
    pending = [download_one_img(c) for c in sorted(cc_list)]
    finished = 0
    # as_completed yields awaitables in the order the downloads finish.
    for next_done in asyncio.as_completed(pending):
        # Awaiting it produces the return value of download_one_img.
        content = await next_done
        finished += 1
        print(content)
    return finished
def download_many_img(cc_list):
    """Synchronous entry point: run ``downloader_coro`` on an event loop.

    :param cc_list: iterable of image ids to download
    :return: the number of completed downloads, as reported by ``downloader_coro``
    """
    # asyncio.run creates the event loop, drives the coroutine to completion
    # and always closes the loop, replacing the deprecated
    # get_event_loop() / run_until_complete() / close() sequence, which also
    # leaked the loop when the coroutine raised.
    return asyncio.run(downloader_coro(cc_list))
def main(download_many_img):
    """Time a full download run and print a one-line summary.

    :param download_many_img: callable taking the id list and returning the
        number of completed downloads
    """
    t_start = time.time()
    total = download_many_img(CC_LIST)
    duration = time.time() - t_start
    template = '\n{} flags downloaded in {:.2f}s'
    print(template.format(total, duration))


if __name__ == '__main__':
    main(download_many_img)
7、一次性多次请求
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/10 20:14
# @Author : Maple
# @File : 06-每次多次请求.py
# @Software: PyCharm
import asyncio
import json
import re
from asgiref.sync import sync_to_async
def regex_data(data):
    """Extract the ``result=...`` part of every finished Task in *data*.

    :param data: raw value returned by the asynchronous read; it is converted
        with ``str()``. Sample:
        ({<Task finished name='Task-2' coro=<get_info() done, defined at ...py:47> result='China'>}, set())
    :return: list with the repr text of each task result (``["'China'"]`` for
        the sample above); empty when no ``result=`` is present
    """
    # The previous pattern 'result=(.*?)' ended in a lazy group with nothing
    # after it, so it always captured the empty string. Anchor the capture on
    # the '>' that closes the Task repr: that '>' is followed by the next
    # task (", <"), by the end of the set ("}") or by the end of the text.
    # The lookahead keeps a '>' inside the result itself from ending the match.
    regex = re.compile(r'result=(.*?)>(?=, <|\}|$)')
    return regex.findall(str(data))
def read_json(country):
    """Look up *country* in ``data/name.json``.

    :param country: key to look up in the JSON object
    :return: the value stored under that key
    :raises KeyError: if the key is not present in the file
    """
    with open('data/name.json', 'r') as handle:
        # json.load returns the JSON object as a dictionary.
        names_by_code = json.load(handle)
    return names_by_code[country]
async def read_json_async(country_code):
    """Asynchronous wrapper around ``read_json``.

    :param country_code: key to look up in the JSON file
    :return: the stored value, or the fallback text when the lookup fails
        for any reason
    """
    try:
        # sync_to_async turns the blocking function into an awaitable call.
        return await sync_to_async(read_json)(country_code)
    except Exception:
        # Best effort: every failure is reported with the same fallback text.
        return 'code不存在'
def read_img(path):
    """Return the complete binary content of the file at *path*.

    :param path: location of the file to read
    :return: the file content as bytes
    """
    with open(path, 'rb') as handle:
        return handle.read()
# f.read() on a binary file cannot be awaited directly, hence this wrapper.
async def read_img_async(path):
    """Asynchronous wrapper around ``read_img``.

    :param path: location of the image file
    :return: the file content as bytes, or the fallback text when reading
        fails for any reason
    """
    try:
        # Convert the synchronous function into an awaitable one. (Author's
        # note: the wrapped function must take a parameter, otherwise an
        # error is raised; the cause has not been investigated.)
        return await sync_to_async(read_img)(path)
    except Exception:
        # Best effort: every failure is reported with the same fallback text.
        return 'code不存在'
async def get_info(type, code=None, path=None):
    """Fetch one piece of information, selected by *type*.

    :param type: ``'json'`` to look up a name by *code*, ``'img'`` to read
        the image stored at *path*
    :param code: key passed to ``read_json_async`` (used when type is 'json')
    :param path: file path passed to ``read_img_async`` (used when type is 'img')
    :return: the value produced by the selected reader
    :raises ValueError: if *type* is neither ``'json'`` nor ``'img'``
    """
    if type == 'json':
        # Name information.
        return await read_json_async(code)
    if type == 'img':
        # Raw image content.
        return await read_img_async(path)
    # Previously an unknown type fell through to `return data` with `data`
    # unbound, failing with an opaque UnboundLocalError.
    raise ValueError("unsupported type {!r}; expected 'json' or 'img'".format(type))
async def get_names(code, path):
    """Fetch the name for *code* and then the image at *path*.

    :param code: key for the JSON name lookup
    :param path: location of the image file
    :return: tuple ``(name, image_data)``
    """
    # The two requests are awaited one after the other, name first.
    name = await get_info('json', code=code)
    image = await get_info('img', path=path)
    return name, image
def download_many(code=None, path=None):
    """Issue several ``get_names`` requests concurrently and print the results.

    :param code: accepted for interface compatibility; currently unused, the
        requested codes are fixed below
    :param path: accepted for interface compatibility; currently unused
    :return: None
    """
    requests = [
        ('0001', 'data/profile_photo.jpg'),
        ('0002', 'data/profile_photo.jpg'),
    ]

    async def _fetch_all():
        # asyncio.wait no longer accepts bare coroutine objects on recent
        # Python versions, so wrap each request in a Task first.
        tasks = [
            asyncio.create_task(get_names(code=req_code, path=req_path))
            for req_code, req_path in requests
        ]
        # Returns a (done, pending) pair of task sets.
        return await asyncio.wait(tasks)

    # asyncio.run replaces the deprecated get_event_loop() pattern and always
    # closes the loop, even when a request raises.
    datas = asyncio.run(_fetch_all())
    print(datas)
    for task_set in datas:
        # Skip the empty "pending" set.
        if not task_set:
            continue
        # Print every parsed result, not only the first one, and tolerate a
        # set without matches instead of failing with IndexError.
        for data in regex_data(task_set):
            print(data)
def main(download_many):
    """Invoke the given download function with its default arguments.

    :param download_many: zero-argument callable that performs the requests
    """
    # The callable's return value is not used.
    download_many()


if __name__ == '__main__':
    main(download_many)