1. 前言
1.1. 概述
基于Flask、Pika、Multiprocessing、Thread搭建一个架构,完成多线程、多进程工作。具体需求如下:
- 并行计算任务:使用
multiprocessing
模块实现并行计算任务,提高计算效率、计算能力。 - 消息侦听任务:使用
threading
模块完成RabbitMQ消息队列的侦听任务,将接收到的数据放入multiprocessing.Queue
中,以便并行计算任务处理。 - Web服务:使用
Flask
框架实现Web API服务,提供启停消息侦听任务、启停并行计算任务以及动态调整参数的功能。 - 任务交互:通过
multiprocessing.Queue
实现消息侦听任务与并行计算任务之间的资源交互。 - 非阻塞运行:使用
threading
模块非阻塞地运行Flask Web服务。
1.2. 续上一篇《python多线程与多进程开发实践及填坑记(1)》
1.3. 重新启动工作进程报错
AttributeError: Can’t get attribute ‘worker’ on <module ‘main’ (built-in)>
程序没有报错,但是,没有启动侦听服务线程。
@app.route('/startworking', methods=['GET'])
def start_worker():
if len(processes) == 0:
for _ in range(3):
p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params))
p.start()
processes.append(p)
print(f'process id = {p.pid}')
return jsonify({'status': 'started working'}), 200
else:
return jsonify({'status': 'already working'}), 202
2. 原理解析
这里遇到的问题是由 Python 的 multiprocessing 模块在 Windows 上的行为引起的。具体来说,multiprocessing 模块在 Windows 上使用“spawn”启动方法,而不是“fork”,这意味着每个子进程需要能够导入主模块中的所有内容。如果某些对象不能被 pickle 化(例如局部函数),将会导致你看到的 AttributeError 错误。
为了解决这个问题,我们需要确保 worker 函数在主模块的全局命名空间中可用,将 worker 函数和其他依赖函数放到一个单独的模块中,然后在主模块中导入它们。
3. 拆分出代码
import pika
import json
from loguru import logger
# 假设这是你的计算函数
def compute_result(data, pso_params):
return {"result": data}
# 发送结果到RabbitMQ的函数
def send_result_to_rabbitmq(channel, exchange_name, queue_name, routing_key, result):
# 省略
# 工作进程函数
def worker(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params):
logger.info('Worker started')
# 省略
logger.info('Worker finished')
关键改动:
- 将 worker 函数和相关依赖函数移到 workers.py 模块中。
- 在主模块中导入 worker 函数。
- 保持 data_queue 的初始化和使用不变。
- 这样可以确保在多进程环境中,每个子进程都可以正确地导入 worker 函数及其依赖函数。
4. python多线程与多进程及RabbitMQ程序架构
4.1. 模块划分
- 主模块 (
main.py
): 启动Flask服务,管理消息侦听任务和并行计算任务。 - Worker模块 (
workers.py
): 定义并行计算任务及其辅助函数。 - Utils模块 (
utils.py
): 定义RabbitMQ相关的辅助函数。
4.2. 代码实现
- 主模块
文件名称:main.py
from flask import Flask, jsonify
from threading import Thread, Event
import multiprocessing
import pika
import logging
from workers import worker
from utils import consume_from_rabbitmq_and_enqueue
# 配置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 创建一个事件来控制侦听
stop_event = Event()
# 定义web服务
app = Flask(__name__)
@app.route('/startlistening', methods=['GET'])
def start_listening():
if stop_event.is_set():
stop_event.clear()
if not hasattr(app, 'pika_thread') or not app.pika_thread.is_alive():
app.pika_thread = Thread(target=consume_from_rabbitmq_and_enqueue, args=(rabbitmq_params, rabbitmq_queue, data_queue, stop_event))
app.pika_thread.start()
return jsonify({'status': 'listening'}), 200
else:
return jsonify({'status': 'already listening'}), 200
@app.route('/stoplistening', methods=['GET'])
def stop_listening():
if hasattr(app, 'pika_thread') and app.pika_thread.is_alive():
stop_event.set()
app.pika_thread.join()
del app.pika_thread
return jsonify({'status': 'stopped'}), 200
else:
return jsonify({'status': 'not running'}), 400
@app.route('/startworking', methods=['GET'])
def start_worker():
if len(processes) == 0:
for _ in range(3):
p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params))
p.start()
processes.append(p)
print(f'process id = {p.pid}')
return jsonify({'status': 'started working'}), 200
else:
return jsonify({'status': 'already working'}), 202
@app.route('/stopworking', methods=['GET'])
def stop_worker():
for p in processes:
data_queue.put(None)
for p in processes:
p.join()
processes.clear()
return jsonify({'status': 'stopped working'}), 200
if __name__ == "__main__":
rabbitmq_queue = 'energyStorageStrategy.queue'
target_queue = 'energyStorageStrategy.queue.typc-fpd-tysh'
target_exchange = 'energyStorageStrategy.direct'
routing_key = 'typc-fpd-tysh'
pso_params = {} # 假设你的PSO参数
credentials = pika.PlainCredentials('rabbit', 'rabbit****') # mq用户名和密码
rabbitmq_params = pika.ConnectionParameters('192.168.*.*', port=5671, virtual_host='/typc-fpd-dev', credentials=credentials)
# 创建一个multiprocessing.Queue用于进程间通信
data_queue = multiprocessing.Queue()
# 创建工作进程列表
processes = []
print(' [*] Waiting for messages. To exit press CTRL+C')
flask_thread = Thread(target=lambda: app.run(host='0.0.0.0', port=5002, debug=True))
flask_thread.start()
with app.app_context():
start_worker()
start_listening()
- Worker模块
文件名称:workers.py
import pika
import json
from loguru import logger
# 假设这是你的计算函数
def compute_result(data, pso_params):
return {"result": data}
# 发送结果到RabbitMQ的函数
def send_result_to_rabbitmq(channel, exchange_name, queue_name, routing_key, result):
# 省略
# 工作进程函数
def worker(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params):
logger.info('Worker started')
# 省略
logger.info('Worker finished')
- Utils模块
文件名称:utils.py
import pika
def consume_from_rabbitmq_and_enqueue(rabbitmq_params, rabbitmq_queue, data_queue, stop_event):
# 略
5. 总结
- 模块化:将不同的功能模块化,便于维护和扩展。
- 多进程与多线程结合:使用multiprocessing实现并行计算任务,使用threading实现RabbitMQ消息侦听和Flask Web服务的非阻塞运行。
- 进程间通信:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
- 事件控制:通过threading.Event控制消息侦听任务的启停。
这种架构设计能够满足需求,并且具有较好的扩展性和可维护性。如果有更多具体的需求或优化,可以在此基础上进一步完善。