流畅的Python(十八)-使用asyncio包处理并发

一、核心要义

1. 对比一个简单的多线程程序和对应的asyncio版,说明多线程和异步任务之间的关系

2. 网络下载的异步版

3. 在异步编程中,与回调相比,协程显著提升性能的方式

二、代码示例

1、相关知识点

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/10 10:14
# @Author  : Maple
# @File    : 00-相关知识点.py
# @Software: PyCharm
import sys

import collections

if __name__ == '__main__':
    # Demo of the backspace control character (\x08) written to stdout.
    # NOTE(review): the original author's notes say the first two lines show
    # as 'Hello worl' and 'Hello wo' and that the third is fully cleared;
    # how backspaces are rendered depends on the console -- confirm.
    out = sys.stdout
    greeting = 'Hello world'
    backspace = '\x08'

    # First back up by one column, then by two, ending each line with '\n'.
    for steps in (1, 2):
        out.write(greeting)
        out.flush()
        out.write(backspace * steps)
        out.write('\n')

    # Finally back up over the whole greeting (no trailing newline).
    out.write(greeting)
    out.flush()
    out.write(backspace * len(greeting))

2、指针旋转(多线程示例)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/8 22:05
# @Author  : Maple
# @File    : 01-指针旋转(多线程示例).py
# @Software: PyCharm
import itertools
import sys
import threading
import time


class Signal:
    """Flag holder shared between ``supervisor`` and the ``spin`` thread."""
    # ``spin`` keeps looping while this is truthy; ``supervisor`` sets it to
    # False on its instance once ``slow_function`` has returned.
    go = True

def spin(msg,signal):
    """Animate a text spinner followed by *msg* until ``signal.go`` is falsy.

    Each frame is one of ``| / - \\`` plus a space and *msg*, written to
    ``sys.stdout``. The cursor is moved back with backspaces (``\\x08``) so
    the next frame overwrites the previous one.

    :param msg: text shown after the spinner character.
    :param signal: object whose ``go`` attribute is polled about every 0.1 s.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    status = ''
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        # Return the cursor to the start of the frame.
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    # Erase the last frame after the loop. Doing this inside the loop (after
    # the ``break``) left the final frame on screen when the spinner stopped.
    write(' ' * len(status) + '\x08' * len(status))
    flush()

def slow_function(delay=3):
    """Pretend to wait on I/O by sleeping, then return 42.

    :param delay: seconds to block in ``time.sleep``; defaults to 3, the
        value that was previously hard-coded.
    :return: the constant 42.
    """
    time.sleep(delay)
    return 42

def supervisor():
    """Show a spinner in a secondary thread while ``slow_function`` runs.

    :return: whatever ``slow_function`` returns.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking', signal))
    # Prints the Thread object; the original note shows output like
    # <Thread(Thread-1, initial)>.
    print('spinner object',spinner)
    spinner.start()
    try:
        result = slow_function()
    finally:
        # Always stop and join the spinner, even if slow_function raised;
        # otherwise the non-daemon thread would keep spinning forever.
        signal.go = False
        spinner.join()
    return result

def main():
    """Run ``supervisor`` and print the value it produced."""
    answer = supervisor()
    print('Answer:', answer)


if __name__ == '__main__':
    # Script entry point.
    main()

3、指针旋转(asyncio示例)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/10 11:20
# @Author  : Maple
# @File    : 02-指针旋转(asyncio示例).py
# @Software: PyCharm
import asyncio
import itertools
import sys

# Native coroutine (``async def``). The generator-based ``@asyncio.coroutine``
# decorator used by older examples was deprecated in Python 3.8 and removed
# in 3.11, where referencing it raises AttributeError.
async def spin(msg):
    """Animate a text spinner followed by *msg* until the task is cancelled.

    Each frame is one of ``| / - \\`` plus a space and *msg*, written to
    ``sys.stdout``; backspaces (``\\x08``) move the cursor back so the next
    frame overwrites the previous one.

    :param msg: text shown after the spinner character.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    status = ''
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            # Used instead of time.sleep so the event loop is not blocked.
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Cancellation is the signal to stop spinning.
            break
    # Erase the last frame after the loop; doing it inside the loop (after
    # the ``break``) left the final frame on screen.
    write(' ' * len(status) + '\x08' * len(status))
    flush()

async def slow_function(delay=3):
    """Pretend to wait on I/O without blocking the event loop, then return 42.

    Written as a native coroutine because ``@asyncio.coroutine`` was removed
    in Python 3.11.

    :param delay: seconds passed to ``asyncio.sleep``; defaults to 3, the
        value that was previously hard-coded.
    :return: the constant 42.
    """
    await asyncio.sleep(delay)
    return 42

async def supervisor():
    """Run ``slow_function`` while a ``spin`` task animates, then stop the spinner.

    Written as a native coroutine because ``@asyncio.coroutine`` was removed
    in Python 3.11.

    :return: the value produced by ``slow_function``.
    """
    # create_task wraps the spin coroutine in a Task, schedules it and
    # returns immediately.
    spinner = asyncio.create_task(spin('thinking'))
    print('spinner object',spinner)

    try:
        # While this coroutine is suspended here, control is with the event
        # loop (not another thread), so the spinner task keeps running.
        result = await slow_function()
    finally:
        spinner.cancel()
        # Wait for the spinner to handle the cancellation. With
        # return_exceptions=True a CancelledError from the spinner is
        # collected as a result instead of being raised here.
        await asyncio.gather(spinner, return_exceptions=True)
    return result

def main():
    """Drive ``supervisor`` to completion on a fresh event loop and print its result."""
    # asyncio.run creates the loop, runs the coroutine and closes the loop.
    # It replaces get_event_loop()/run_until_complete()/close(); calling
    # get_event_loop() with no running loop is deprecated.
    result = asyncio.run(supervisor())
    print('Answer',result)


if __name__ == '__main__':
    main()

4、指针旋转(asyncio示例2)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/10 11:20
# @Author  : Maple
# @File    : 02-指针旋转(asyncio示例).py
# @Software: PyCharm
import asyncio
import itertools
import sys

# 由于@asyncio.coroutine装饰器在Python3.8及以后的版本中已经过时
# Python官方建议使用async代替该装饰器

async def spin(msg):
    """Animate a text spinner followed by *msg* until the task is cancelled.

    Each frame is one of ``| / - \\`` plus a space and *msg*, written to
    ``sys.stdout``; backspaces (``\\x08``) move the cursor back so the next
    frame overwrites the previous one.

    :param msg: text shown after the spinner character.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    status = ''
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            # Used instead of time.sleep so the event loop is not blocked.
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Cancellation is the signal to stop spinning.
            break
    # Erase the last frame after the loop; doing it inside the loop (after
    # the ``break``) left the final frame on screen, e.g. "Answer 42g".
    write(' ' * len(status) + '\x08' * len(status))
    flush()


async def slow_function(delay=3):
    """Pretend to wait on I/O without blocking the event loop, then return 42.

    :param delay: seconds passed to ``asyncio.sleep``; defaults to 3, the
        value that was previously hard-coded.
    :return: the constant 42.
    """
    await asyncio.sleep(delay)
    return 42


async def supervisor():
    """Run ``slow_function`` while a ``spin`` task animates, then stop the spinner.

    :return: the value produced by ``slow_function``.
    """
    # create_task wraps the spin coroutine in a Task, schedules it and
    # returns immediately.
    spinner = asyncio.create_task(spin('thinking'))
    # Author's sample output: <Task pending name='Task-2' coro=<spin() running at ...>>
    print('spinner object:',spinner)
    # Author's sample output: <class '_asyncio.Task'>
    print('spinner object type:',type(spinner))

    try:
        # While this coroutine is suspended here, control is with the event
        # loop (not another thread), so the spinner task keeps running.
        result = await slow_function()
    finally:
        spinner.cancel()
        # Wait for the spinner to handle the cancellation. With
        # return_exceptions=True a CancelledError from the spinner is
        # collected as a result instead of being raised here.
        await asyncio.gather(spinner, return_exceptions=True)
    return result

def main():
    """Drive ``supervisor`` to completion on a fresh event loop and print its result."""
    # asyncio.run creates the loop, runs the coroutine and closes the loop.
    # It replaces get_event_loop()/run_until_complete()/close(); calling
    # get_event_loop() with no running loop is deprecated.
    result = asyncio.run(supervisor())
    print('Answer',result)


if __name__ == '__main__':
    main()

5、网络下载asyncio版本(1)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/10 12:29
# @Author  : Maple
# @File    : 04-网络下载asyncio版本(1).py
# @Software: PyCharm
import asyncio
import time
from random import randint

"""
我们编写的协程链条:
1. 通过把最外层的委派生成器传给asyncio包API中的某个函数(如loop.run_until_complete驱动)
   即不通过调用next()或send()[对比参考16章-协程中的`委派生成器`一节]驱动协程
2. 我们编写的协程链条最终通过yield from把职责委托给asyncio包的某个协程函数或协程方法(比如本例中的asyncio.sleep(download_time))
   也就是说,最内层的`子生成器`是库中真正执行的I/O操作的函数,而不是我们自己编写的函数
3. 通常来说,我们编写的只是`委派生成器` 
"""


# Ids of the images to "download"; main() passes this list to download_many_img.
CC_LIST = list(range(1, 7))

def get_randint():
    """Return a random integer between 1 and 10, both ends included."""
    lowest, highest = 1, 10
    return randint(lowest, highest)

async def get_img(id):
    """Simulate fetching image *id* by sleeping for a random number of seconds.

    :param id: image identifier, used only in the start message.
    :return: the number of seconds slept (taken from ``get_randint``).
    """
    print('***', id, '号图片开始下载***')
    seconds = get_randint()
    # Non-blocking wait: other downloads proceed while this one sleeps.
    await asyncio.sleep(seconds)
    return seconds


async def download_one_img(id):
    """Download one image, report how long it took and return its content.

    :param id: image identifier.
    :return: placeholder content string built from *id*.
    """
    elapsed = await get_img(id)
    report = '***{}号图片下载完成,花费时长{}s***'.format(id, elapsed)
    print(report)
    return str(id) + '号图片内容'


def download_many_img(cc_list):
    """Download every image in *cc_list* concurrently and count the finished ones.

    :param cc_list: iterable of image ids; they are processed in sorted order.
    :return: number of downloads that finished (0 for an empty *cc_list*).
    """
    async def _download_all():
        # asyncio.wait needs Task/Future objects: passing bare coroutines was
        # deprecated in Python 3.8 and is rejected since 3.11.
        tasks = [asyncio.ensure_future(download_one_img(c)) for c in sorted(cc_list)]
        if not tasks:
            # asyncio.wait raises ValueError on an empty collection.
            return set()
        # With the default timeout/return_when, the second element (the
        # still-pending futures) is always empty.
        done, _ = await asyncio.wait(tasks)
        return done

    # asyncio.run creates the event loop, blocks until the coroutine is
    # finished and closes the loop afterwards.
    return len(asyncio.run(_download_all()))


def main(download_many_img):
    """Time one call of *download_many_img* on ``CC_LIST`` and print a summary.

    :param download_many_img: callable that takes the id list and returns a count.
    """
    started = time.time()
    downloaded = download_many_img(CC_LIST)
    duration = time.time() - started
    print('\n{} flags downloaded in {:.2f}s'.format(downloaded, duration))


if __name__ == '__main__':
    main(download_many_img)

6、网络下载asyncio版本(2)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/10 17:50
# @Author  : Maple
# @File    : 05-网路下载asyncio版本(2).py
# @Software: PyCharm


"""
使用asyncio.as_completed函数,由于
1. 该函数必须在协程中使用
2. download_many_img 不能是协程,因为main函数中需要调用download_many_img,而该参数是普通函数

因此,需要新增一个 downloader_coro 协程函数,专门用于多线程(协程)下载,而 download_many_img仅用于设置事件循环
"""
import asyncio
import time
from random import randint

# Ids of the images to "download"; main() passes this list to download_many_img.
CC_LIST = list(range(1, 7))

def get_randint():
    """Return a random integer between 1 and 10, both ends included."""
    lowest, highest = 1, 10
    return randint(lowest, highest)

async def get_img(id):
    """Simulate fetching image *id* by sleeping for a random number of seconds.

    :param id: image identifier, used only in the start message.
    :return: the number of seconds slept (taken from ``get_randint``).
    """
    print('***', id, '号图片开始下载***')
    wait_seconds = get_randint()
    # Non-blocking wait: other downloads proceed while this one sleeps.
    await asyncio.sleep(wait_seconds)
    return wait_seconds


async def download_one_img(id):
    """Download one image, report how long it took and return its content.

    :param id: image identifier.
    :return: placeholder content string built from *id*.
    """
    took = await get_img(id)
    summary = '***{}号图片下载完成,花费时长{}s***'.format(id, took)
    print(summary)
    return str(id) + '号图片内容'

async def downloader_coro(cc_list):
    """Download all images concurrently, printing each result as it completes.

    :param cc_list: iterable of image ids; coroutines are created in sorted order.
    :return: number of completed downloads.
    """
    # One download coroutine per id.
    pending = [download_one_img(c) for c in sorted(cc_list)]

    finished = 0
    # as_completed hands back awaitables in completion order.
    for next_done in asyncio.as_completed(pending):
        # Awaiting yields the return value of download_one_img.
        content = await next_done
        finished += 1
        print(content)

    return finished



def download_many_img(cc_list):
    """Synchronous entry point: run ``downloader_coro`` on a fresh event loop.

    :param cc_list: iterable of image ids forwarded to ``downloader_coro``.
    :return: the count returned by ``downloader_coro``.
    """
    # asyncio.run creates the loop, runs the coroutine to completion and
    # closes the loop. It replaces get_event_loop()/run_until_complete()/
    # close(); calling get_event_loop() with no running loop is deprecated.
    return asyncio.run(downloader_coro(cc_list))


def main(download_many_img):
    """Time one call of *download_many_img* on ``CC_LIST`` and print a summary.

    :param download_many_img: callable that takes the id list and returns a count.
    """
    begin = time.time()
    total = download_many_img(CC_LIST)
    seconds = time.time() - begin
    print('\n{} flags downloaded in {:.2f}s'.format(total, seconds))


if __name__ == '__main__':
    main(download_many_img)

7、一次性多次请求

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/10 20:14
# @Author  : Maple
# @File    : 06-每次多次请求.py
# @Software: PyCharm
import asyncio
import json
import re
from asgiref.sync import sync_to_async


def regex_data(data):
    """Extract the ``result=...`` parts from the text form of finished Tasks.

    :param data: any object; ``str(data)`` is searched. Sample input:
        ({<Task finished name='Task-2' coro=<get_info() done, defined at x.py:47> result='China'>}, set())
    :return: list with the repr text of each result (``"'China'"`` for the
        sample above); empty when nothing matches.

    Scraping a repr is fragile: a result whose own repr contains ``>``
    followed by ``,`` or ``}`` is cut short. Prefer ``task.result()`` when
    the Task objects are available.
    """
    # The old pattern 'result=(.*?)' had nothing after the lazy group, so it
    # always captured an empty string. Anchor the capture on the '>' that
    # closes the Task repr, i.e. one followed by ',', '}' or end of text.
    pattern = re.compile(r'result=(.*?)>(?=[,}]|$)')
    return pattern.findall(str(data))

def read_json(country):
    """Look up *country* in ``data/name.json``.

    :param country: key to look up in the top-level JSON object.
    :return: the value stored under that key.
    :raises KeyError: if the key is not present.
    :raises OSError: if the file cannot be opened.
    """
    # Read as UTF-8 explicitly: the platform default encoding (for example a
    # legacy code page on Windows) can mis-decode non-ASCII JSON text.
    with open('data/name.json', 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data[country]


async def read_json_async(country_code):
    """Await ``read_json`` through ``sync_to_async``; fall back to a message on failure.

    :param country_code: key forwarded to ``read_json``.
    :return: the looked-up value, or the string 'code不存在' if any
        ``Exception`` is raised along the way.
    """
    try:
        return await sync_to_async(read_json)(country_code)
    except Exception:
        # Best effort: every failure is reported with the same placeholder.
        return 'code不存在'


def read_img(path):
    """Return the full binary content of the file at *path*.

    :param path: location of the file to read.
    :return: the file content as ``bytes``.
    """
    with open(path, 'rb') as image_file:
        return image_file.read()


# A plain binary f.read() cannot be awaited directly, so the synchronous
# reader is wrapped with sync_to_async.
async def read_img_async(path):
    """Await ``read_img`` through ``sync_to_async``; fall back to a message on failure.

    :param path: file path forwarded to ``read_img``.
    :return: the file bytes, or the string 'code不存在' if any ``Exception``
        is raised along the way.
    """
    # Original author's note (unverified): the wrapped sync function had to
    # take a parameter, otherwise an error was raised.
    try:
        return await sync_to_async(read_img)(path)
    except Exception:
        # Best effort: every failure is reported with the same placeholder.
        return 'code不存在'


async def get_info(type,code=None,path=None):
    """Fetch either a name (from JSON) or image bytes, depending on *type*.

    :param type: 'json' to look up *code* via ``read_json_async``, or 'img'
        to read *path* via ``read_img_async``.
    :param code: key used when *type* is 'json'.
    :param path: file path used when *type* is 'img'.
    :return: whatever the selected reader returns.
    :raises ValueError: if *type* is neither 'json' nor 'img'. Previously
        this fell through to an UnboundLocalError on ``data``.
    """
    if type == 'json':
        return await read_json_async(code)
    if type == 'img':
        return await read_img_async(path)
    raise ValueError("unsupported type: {!r} (expected 'json' or 'img')".format(type))

async def get_names(code,path):
    """Fetch the name for *code* and then the image at *path*, one after the other.

    :param code: key passed to ``get_info`` for the 'json' lookup.
    :param path: file path passed to ``get_info`` for the 'img' lookup.
    :return: tuple ``(name, image_data)``.
    """
    name = await get_info('json', code=code)
    photo = await get_info('img', path=path)
    return name, photo


def download_many(code=None,path=None):
    """Run two ``get_names`` requests concurrently and print each result.

    :param code: accepted for interface compatibility; currently unused (the
        requests below are hard-coded, as before).
    :param path: accepted for interface compatibility; currently unused.
    :return: list of ``(name, image_data)`` tuples in request order.
        Previously nothing was returned.
    """
    async def _fetch_all():
        # gather accepts coroutines and returns results in argument order.
        # The previous asyncio.wait(coroutines) call is rejected since
        # Python 3.11, and its results were scraped from str() of the
        # (done, pending) tuple instead of being read from the tasks.
        return await asyncio.gather(
            get_names(code='0001', path='data/profile_photo.jpg'),
            get_names(code='0002', path='data/profile_photo.jpg'),
        )

    # asyncio.run creates the event loop, runs the coroutine and closes the loop.
    results = asyncio.run(_fetch_all())
    for result in results:
        print(result)
    return results




def main(download_many):
    """Call *download_many* with no arguments; its return value is discarded.

    :param download_many: zero-argument callable to run.
    """
    download_many()


if __name__ == '__main__':
    main(download_many)

最近更新

  1. TCP协议是安全的吗?

    2024-03-12 04:26:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-12 04:26:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-12 04:26:02       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-12 04:26:02       20 阅读

热门阅读

  1. ARM TrustZone技术介绍

    2024-03-12 04:26:02       18 阅读
  2. linux新一代的RPM软件包管理器dnf

    2024-03-12 04:26:02       28 阅读
  3. Linux中basename作用

    2024-03-12 04:26:02       24 阅读
  4. Dutree:Linux 文件系统磁盘使用追踪工具

    2024-03-12 04:26:02       20 阅读
  5. 权限管理系统-0.3.0

    2024-03-12 04:26:02       20 阅读
  6. 【Flink SQL】Flink SQL 基础概念:数据类型

    2024-03-12 04:26:02       21 阅读
  7. Vuex getters源码分析

    2024-03-12 04:26:02       19 阅读
  8. 嵌出式008

    2024-03-12 04:26:02       17 阅读