python watchdog检测到文件产生,动态创建进程,不指定进程数去处理,处理过程中需要写文件,同时产生多个文件的话会同时创建多个进程去处理,怎么保存写文件不冲突

如果希望在检测到文件时动态创建进程而不预先指定进程数,并确保写文件时不发生冲突,可以使用队列和锁的机制。以下是一个改进的方案:

python

from multiprocessing import Process, Queue, Lock
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
import sys

class FileProcessor(Process):
    def __init__(self, queue, lock):
        super().__init__()
        self.queue = queue
        self.lock = lock

    def run(self):
        while True:
            path = self.queue.get()
            if path is None:  # 接收到结束信号
                self.queue.task_done()
                break
            
            # 假设的文件处理逻辑
            result = f"Processed content of {path}"
            
            with self.lock:
                with open(f"{path}.processed", "w") as f:
                    f.write(result)
            self.queue.task_done()

class MyHandler(FileSystemEventHandler):
    def __init__(self, queue):
        self.queue = queue

    def on_created(self, event):
        if event.is_directory:
            return
        self.queue.put(event.src_path)

def main():
    manager = Manager()
    queue = manager.Queue()
    lock = manager.Lock()
    
    observer = Observer()
    observer.schedule(MyHandler(queue), path='./watchdir', recursive=False)
    observer.start()
    
    try:
        while True:
            # 动态创建进程,但需考虑上限,这里简单示例,实际情况可能需要更复杂的管理逻辑
            if queue.qsize() > 0 and len(active_processes) < MAX_PROCESSES:
                p = FileProcessor(queue, lock)
                p.start()
                active_processes.append(p)
            time.sleep(1)  # 防止过于频繁的检查,可根据需要调整
    except KeyboardInterrupt:
        observer.stop()
        print("Stopping...")
    
    # 清理和结束进程
    for _ in active_processes:
        queue.put(None)  # 发送结束信号
    observer.join()
    for p in active_processes:
        p.join()

if __name__ == "__main__":
    MAX_PROCESSES = 5  # 最大同时运行的进程数,根据实际情况调整
    active_processes = []  # 用于追踪活动的进程
    main()

我们定义了一个FileProcessor类继承自Process,这样每个处理任务都在一个独立的进程中运行。在MyHandler检测到文件创建时,将文件路径加入队列。主程序会检查队列大小以及当前活跃进程的数量,如果队列中有待处理的任务且活跃进程数未达到预设的最大值(例如MAX_PROCESSES),则创建新的FileProcessor进程。每个FileProcessor在完成任务或接收到结束信号后会自动结束,同时使用锁来保护写文件操作,防止冲突。
请注意,虽然这个方案允许动态创建进程,但实际应用中应当考虑设置合理的最大进程数限制,以避免因资源耗尽导致的系统不稳定。此外,还需要考虑如何优雅地管理和结束这些动态创建的进程,尤其是在程序退出时。

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-16 19:18:01       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-16 19:18:01       106 阅读
  3. 在Django里面运行非项目文件

    2024-06-16 19:18:01       87 阅读
  4. Python语言-面向对象

    2024-06-16 19:18:01       96 阅读

热门阅读

  1. 洛谷 AT_arc168_a [ARC168A] <Inversion> 题解

    2024-06-16 19:18:01       35 阅读
  2. 用户组的概念(linux篇)

    2024-06-16 19:18:01       27 阅读
  3. CentOS下 conda环境设置

    2024-06-16 19:18:01       28 阅读
  4. HTTP!!!

    HTTP!!!

    2024-06-16 19:18:01      34 阅读
  5. Android基础-ANR详解

    2024-06-16 19:18:01       32 阅读
  6. oracle的xmlagg的用法

    2024-06-16 19:18:01       30 阅读
  7. 异常处理与IO

    2024-06-16 19:18:01       26 阅读
  8. C语言:进程

    2024-06-16 19:18:01       23 阅读
  9. 数据库 | 数据库设计的步骤

    2024-06-16 19:18:01       31 阅读
  10. 创建你的第一个Windows程序

    2024-06-16 19:18:01       24 阅读