python多线程与多进程开发实践及填坑记(3)

1. 前言

1.1. 概述

基于Flask、Pika、Multiprocessing、Thread搭建一个架构,完成多线程、多进程工作。具体需求如下:

  1. 并行计算任务:使用multiprocessing模块实现并行计算任务,提高计算效率、计算能力。
  2. 消息侦听任务:使用threading模块完成RabbitMQ消息队列的侦听任务,将接收到的数据放入multiprocessing.Queue中,以便并行计算任务处理。
  3. Web服务:使用Flask框架实现Web API服务,提供启停消息侦听任务、启停并行计算任务以及动态调整参数的功能。
  4. 任务交互:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
  5. 非阻塞运行:使用threading模块非阻塞地运行Flask Web服务。

1.2. 续上一篇《python多线程与多进程开发实践及填坑记(1)》

python多线程与多进程开发实践及填坑记(1)

1.3. 重新启动工作进程报错

AttributeError: Can’t get attribute ‘worker’ on <module ‘main’ (built-in)>

程序没有报错,但是,没有启动侦听服务线程。

@app.route('/startworking', methods=['GET'])
def start_worker():
    if len(processes) == 0: 
        for _ in range(3):  
            p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params))  
            p.start()  
            processes.append(p)   
            print(f'process id = {p.pid}')
        return jsonify({'status': 'started working'}), 200
    else:
        return jsonify({'status': 'already working'}), 202

2. 原理解析

这里遇到的问题是由 Python 的 multiprocessing 模块在 Windows 上的行为引起的。具体来说,multiprocessing 模块在 Windows 上使用“spawn”启动方法,而不是“fork”,这意味着每个子进程需要能够导入主模块中的所有内容。如果某些对象不能被 pickle 化(例如局部函数),将会导致你看到的 AttributeError 错误。

为了解决这个问题,我们需要确保 worker 函数在主模块的全局命名空间中可用,将 worker 函数和其他依赖函数放到一个单独的模块中,然后在主模块中导入它们。

3. 拆分出代码

import pika
import json
from loguru import logger

# 假设这是你的计算函数  
def compute_result(data, pso_params):  
    return {"result": data}

# 发送结果到RabbitMQ的函数  
def send_result_to_rabbitmq(channel, exchange_name, queue_name, routing_key, result):  
    # 省略

# 工作进程函数  
def worker(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params): 
    logger.info('Worker started') 
    # 省略
    logger.info('Worker finished')

关键改动:

  • 将 worker 函数和相关依赖函数移到 workers.py 模块中。
  • 在主模块中导入 worker 函数。
  • 保持 data_queue 的初始化和使用不变。
  • 这样可以确保在多进程环境中,每个子进程都可以正确地导入 worker 函数及其依赖函数。

4. python多线程与多进程及RabbitMQ程序架构

4.1. 模块划分

  1. 主模块 (main.py): 启动Flask服务,管理消息侦听任务和并行计算任务。
  2. Worker模块 (workers.py): 定义并行计算任务及其辅助函数。
  3. Utils模块 (utils.py): 定义RabbitMQ相关的辅助函数。

4.2. 代码实现

  1. 主模块

文件名称:main.py

from flask import Flask, jsonify
from threading import Thread, Event
import multiprocessing
import pika
import logging
from workers import worker
from utils import consume_from_rabbitmq_and_enqueue

# 配置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 创建一个事件来控制侦听
stop_event = Event()

# 定义web服务
app = Flask(__name__)

@app.route('/startlistening', methods=['GET'])
def start_listening():
    if stop_event.is_set():
        stop_event.clear()
    if not hasattr(app, 'pika_thread') or not app.pika_thread.is_alive():
        app.pika_thread = Thread(target=consume_from_rabbitmq_and_enqueue, args=(rabbitmq_params, rabbitmq_queue, data_queue, stop_event))
        app.pika_thread.start()
        return jsonify({'status': 'listening'}), 200
    else:
        return jsonify({'status': 'already listening'}), 200

@app.route('/stoplistening', methods=['GET'])
def stop_listening():
    if hasattr(app, 'pika_thread') and app.pika_thread.is_alive():
        stop_event.set()
        app.pika_thread.join()
        del app.pika_thread
        return jsonify({'status': 'stopped'}), 200  
    else:
        return jsonify({'status': 'not running'}), 400

@app.route('/startworking', methods=['GET'])
def start_worker():
    if len(processes) == 0: 
        for _ in range(3):  
            p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params))  
            p.start()  
            processes.append(p)   
            print(f'process id = {p.pid}')
        return jsonify({'status': 'started working'}), 200
    else:
        return jsonify({'status': 'already working'}), 202

@app.route('/stopworking', methods=['GET'])
def stop_worker():
    for p in processes:
        data_queue.put(None)
    for p in processes:
        p.join()
    processes.clear() 
    return jsonify({'status': 'stopped working'}), 200

if __name__ == "__main__":
    rabbitmq_queue = 'energyStorageStrategy.queue'
    target_queue = 'energyStorageStrategy.queue.typc-fpd-tysh'
    target_exchange = 'energyStorageStrategy.direct'
    routing_key = 'typc-fpd-tysh'
    pso_params = {}  # 假设你的PSO参数

    credentials = pika.PlainCredentials('rabbit', 'rabbit****')  # mq用户名和密码
    rabbitmq_params = pika.ConnectionParameters('192.168.*.*', port=5671, virtual_host='/typc-fpd-dev', credentials=credentials)

    # 创建一个multiprocessing.Queue用于进程间通信  
    data_queue = multiprocessing.Queue()  

    # 创建工作进程列表
    processes = [] 

    print(' [*] Waiting for messages. To exit press CTRL+C')

    flask_thread = Thread(target=lambda: app.run(host='0.0.0.0', port=5002, debug=True))
    flask_thread.start()
    
    with app.app_context():
        start_worker()
        start_listening()
  1. Worker模块

文件名称:workers.py

import pika
import json
from loguru import logger

# 假设这是你的计算函数  
def compute_result(data, pso_params):  
    return {"result": data}

# 发送结果到RabbitMQ的函数  
def send_result_to_rabbitmq(channel, exchange_name, queue_name, routing_key, result):  
    # 省略

# 工作进程函数  
def worker(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params): 
    logger.info('Worker started') 
    # 省略
    logger.info('Worker finished')
  1. Utils模块

文件名称:utils.py

import pika

def consume_from_rabbitmq_and_enqueue(rabbitmq_params, rabbitmq_queue, data_queue, stop_event):
	# 略

5. 总结

  • 模块化:将不同的功能模块化,便于维护和扩展。
  • 多进程与多线程结合:使用multiprocessing实现并行计算任务,使用threading实现RabbitMQ消息侦听和Flask Web服务的非阻塞运行。
  • 进程间通信:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
  • 事件控制:通过threading.Event控制消息侦听任务的启停。

这种架构设计能够满足需求,并且具有较好的扩展性和可维护性。如果有更多具体的需求或优化,可以在此基础上进一步完善。

相关推荐

  1. python线进程开发实践3

    2024-07-13 03:16:02       27 阅读
  2. Python---进程---线

    2024-07-13 03:16:02       59 阅读
  3. Python中的线进程编程:深入解析应用

    2024-07-13 03:16:02       33 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-13 03:16:02       70 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-13 03:16:02       74 阅读
  3. 在Django里面运行非项目文件

    2024-07-13 03:16:02       62 阅读
  4. Python语言-面向对象

    2024-07-13 03:16:02       72 阅读

热门阅读

  1. MySQL-锁

    2024-07-13 03:16:02       15 阅读
  2. 我的PHP8编译日志

    2024-07-13 03:16:02       20 阅读
  3. error: #29: expected an expression

    2024-07-13 03:16:02       20 阅读
  4. MySQL版本升级

    2024-07-13 03:16:02       19 阅读
  5. 数据建设实践之大数据平台(四)安装mysql

    2024-07-13 03:16:02       22 阅读
  6. Python-数据爬取(爬虫)

    2024-07-13 03:16:02       21 阅读
  7. 关于QT实现绘图库的技术栈考虑

    2024-07-13 03:16:02       21 阅读
  8. 使用Python绘制百分比堆积条形图

    2024-07-13 03:16:02       23 阅读
  9. How to Use shred to Erase a Drive or File in Fedora

    2024-07-13 03:16:02       25 阅读