零基础学python之高级编程(6)---Python中进程的Queue 和进程锁,以及进程池的创建 (包含详细注释代码)

Python中进程的Queue 和进程锁,以及进程池的创建


前言

大家好,上一篇文章,我们初步接触了进程的概念及其应该如何创建一个进程等等,今天我们继续来深入学习一下进程中的进程间同步 Queue进程锁lock 还有进行创建一个进程池pool 等等,我们开始今天的学习吧 !


一、进程间同步通信(Queue)

我们在使用进程的时候,有时候我们需要进行进程之间的通信,multiprocessing 模块提供了Queue类 来实现不同进程之间的通信.

Queue():括号内传的参数表示最大可接受的消息数量;没有传参或者数量为负数,则表示最大可接受的数量没有上限(直到内存的尽头)

Queue.put(item, block, timeout) : 将item消息写入队列中,block默认为True.

  • 如果block使用默认值,且没有设置timeout秒数,消息队列如果已经没有空间可以写入,将会被阻塞,直到从消息队列腾出空间为止
  • 如果block使用默认值,设置了timeout秒数,则会等待timeout秒,若还没有空间,则会抛出异常’queue.full’.

我们首先先创建一个进程对象:

在这里插入图片描述
上图就是我们queue 中的方法,接下来我们看看如何使用queue.

from  multiprocessing import Queue

q = Queue(3)  # 创建一个Queue 对象,设置最大可接受put对象为3个
q.put('消息1')
q.put('消息2')
print(q.full()) # 判断队列是否满了 这里面输出False
q.put('消息3')
print(q.full()) #判断队列是否满了 这里面输出True

print(q.get()) # 获取队列中的消息,获取后把消息从队列中删除

if not q.full(): # 判断队列是否满,如果不满就继续将消息写入队列
    q.put('消息4')

print(q.get())
print(q.get())
print(q.get())

运行结果:

在这里插入图片描述

from  multiprocessing import Queue

q = Queue(3)  # 创建一个Queue 对象,设置最大可接受put对象为3个
q.put('消息1')
q.put('消息2')
print(q.full()) # 判断队列是否满了 这里面输出False
q.put('消息3')
print(q.full()) #判断队列是否满了 这里面输出True

print(q.get())
if not q.full():
    q.put('消息4')

# print(q.get())
# print(q.get())
# print(q.get())

if not q.empty():  # 判断队列是否为空
    for i in range(q.qsize()): 
        print(q.get_nowait())

运行结果:
在这里插入图片描述

注意:

Queue.put_nowait(item)===>Queue.put(item, False)

Queue.get_nowait()===>Queue.get(False)

Queue.get(block, timeout):获取队列中的一条信息,然后将其从队列中移除,block默认为True

  • 如果block使用默认值,且没有设置timeout秒数,消息队列如果为空,将会被阻塞,直到从消息队列读到消息为止.

  • 如果block使用默认值,设置了timeout秒数,则会等待timeout秒,若还没有消息,则会抛出异常’queue.empty’.

如果block值为False,消息队列如果为空,则立即抛出异常’queue.empty

接下来我们进行一次完整的进程间的通信

import multiprocessing  #导包
import time
import random


def write(q):  # 定义一个函数
    for i in ['a', 'b', 'c']:  
        print('放入:', i)
        q.put(i) # 分别将a,b,c 消息传入队列
        time.sleep(random.random())


def read(q):
    while True:
        if not q.empty():
            print('获取:', q.get()) 获取队列中的消息
            time.sleep(random.random())
        else:
            break


if __name__ == '__main__':
    q = multiprocessing.Queue()  # 创建一个Queue对象 q
    pw = multiprocessing.Process(target=write, args=(q, ))  # 创建一个子进程 pw ,并将queue 对象传入
    pr = multiprocessing.Process(target=read, args=(q, ))   # 创建一个子进程 pr,并将queue对象传入
    pw.start() # pw 子进程开始
    pw.join()  # 等待 pw进程结束 
    pr.start() # pr 子进程开始
    pr.join() # 等待 pr进程结束
    print('结束')

运行结果:

在这里插入图片描述

二、进程锁(Lock)

Python进程锁是用来在多进程程序中对共享资源进行同步访问的一种机制。
当多个进程需要同时访问某个共享资源时,可能会出现竞争条件(race condition),导致数据不一致或错误的结果。进程锁可以用来保证在任意时刻只有一个进程能够访问共享资源,从而避免竞争条件的发生。

进程锁的主要作用是确保在某个进程正在使用共享资源时,其他进程无法访问该资源,直到当前进程释放锁为止。这样可以保证共享资源在任意时刻只会被一个进程修改,从而避免数据不一致或错误的结果。

Python中的进程锁可以通过使用multiprocessing模块中的Lock类来实现。通过调用lock.acquire()方法可以获得锁,其他进程如果尝试获得同一个锁则会被阻塞。而通过调用lock.release()方法可以释放锁,允许其他进程获得锁并访问共享资源。

需要注意的是,进程锁只能在同一个计算机的多个进程之间起作用,无法在不同计算机之间的进程之间起作用。如果需要进行跨网络的进程同步,可以考虑使用分布式锁的机制。

Python中可以使用多种方式实现进程锁,以下是常用的两种:

使用multiprocessing.Lock():multiprocessing.Lock()是Python标准库中的一个进程锁实现。可以通过acquire()方法获得锁,并在任务完成后使用release()方法释放锁。
例如:

from multiprocessing import Lock, Process

def func(lock, num):
    lock.acquire()
    try:
        # 临界区代码
        print(f"进程{num}获得了锁")
    finally:
        lock.release()
        print(f"进程{num}释放了锁")

if __name__ == "__main__":
    lock = Lock()
    processes = []
    for i in range(5):
        p = Process(target=func, args=(lock, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

也可使用multiprocessing.Manager().Lock():multiprocessing.Manager()是一个多进程共享数据的管理器。可以通过Manager().Lock()方法创建一个进程锁。具体使用方式与multiprocessing.Lock()相同,只是创建锁的方式有所不同。
例如:

from multiprocessing import Process, Manager

def func(lock, num):
    lock.acquire()
    try:
        # 临界区代码
        print(f"进程{num}获得了锁")
    finally:
        lock.release()
        print(f"进程{num}释放了锁")

if __name__ == "__main__":
    manager = Manager()
    lock = manager.Lock()
    processes = []
    for i in range(5):
        p = Process(target=func, args=(lock, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()



三、创建进程池Poor

Python进程池:是一种用于管理和复用多个进程的工具。进程池可以提高程序的并发处理能力,通过复用已经创建的进程,避免频繁创建和销毁进程的开销。

pool 类方法:

Pool()括号内表示处理的进程数量

apply_async(func, args, kwargs):使用非阻塞方式调用func(并行执行,阻塞就是必须等待上一个进程退出才能执行下一个进程)

close():关闭进程池,不在接收新的进程

terminate():不管任务是否完成,立即终止

join():阻塞,等待所有子进程结束,必须在close之后使用.

import multiprocessing
import time

def run(f):
    time.sleep(1)
    return f * f


if __name__ == '__main__':
    test = [1, 2, 3, 4, 5, 6]
        # print('顺序:')
        # s = time.time()  # 计算当前时间
        # for i in test:
        #     print(run(i))
    e = time.time()
        # print('顺序执行时间:', int(e-s))

        # map(函数名, 可迭代对象):循环将可迭代对象传递给函数执行
    print('并发:')
    pool = multiprocessing.Pool(5)  # 创建能够有5条进程的进程池
    r1 = pool.map(run, test)
    pool.close()
    pool.join()
    e2 = time.time()
    print('并发时间:', int(e2 - e))
    print(r1)

运行结果:
在这里插入图片描述

在进程池里进行进程间的通信
例如:

import multiprocessing, os, time, random


def write(q):
    print('write(%s)启动' % os.getpid())
    for i in ['a', 'b', 'c', 'd', 'e']:
        q.put(i)


def read(q):
    print('read(%s)启动' % os.getpid())
    for i in range(q.qsize()):
        print('获取:', q.get())


if __name__ == '__main__':
    print('主进程(%s)开始' % os.getpid())
    q = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool()
    pool.apply_async(write, (q, ))
    pool.apply_async(read, (q, ))
    pool.close()
    pool.join()
    print('主进程结束')

运行结果:

在这里插入图片描述


End!

讲的不好,多多见谅,我们下次再见!

更多优质文章点这里

最近更新

  1. TCP协议是安全的吗?

    2024-03-28 00:38:03       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-28 00:38:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-28 00:38:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-28 00:38:03       18 阅读

热门阅读

  1. AI大模型学习:开启智能时代的新篇章

    2024-03-28 00:38:03       17 阅读
  2. spring-boot解析spring.factories文件

    2024-03-28 00:38:03       19 阅读
  3. 蓝桥杯day15刷题日记

    2024-03-28 00:38:03       16 阅读
  4. vue中数字转汉字,带小数转化

    2024-03-28 00:38:03       17 阅读
  5. 利用Cas中service重定向钓鱼网站问题

    2024-03-28 00:38:03       22 阅读
  6. 【机器学习】如何计算解释模型的SHAP值

    2024-03-28 00:38:03       18 阅读
  7. 华为机试真题练习汇总(101~110)

    2024-03-28 00:38:03       16 阅读
  8. 新建uni-modules插件

    2024-03-28 00:38:03       18 阅读
  9. 前端理论总结(js)——闭包和内存泄漏

    2024-03-28 00:38:03       19 阅读
  10. 关于远程调试应用中的网页鸿蒙

    2024-03-28 00:38:03       18 阅读