Thinkphp+workman+redis实现多线程异步任务处理

前言


PHP本身并不直接支持多线程编程,因为PHP的设计初衷是作为一个脚本语言,主要面向的是Web开发。不过我们可以使用一些扩展和库来实现多线程的功能,比如workerman和swoole。通过多线程异步执行任务,可以大大提高代码的执行效率。

安装workman


Thinkphp5.0使用workman创建多线程任务


  • 1.在项目根目录(注意不是pubcli目录)下创建文件server.php,文件内容如下

    <?php
    define('APP_PATH', __DIR__ . '/application/');
    define('BIND_MODULE','workman/Worker');
    // 加载框架引导文件
    require __DIR__ . '/thinkphp/start.php';
    
  • 2.在根目录创建\application\workman\controller目录,然后在该目录下新建Worker.php,文件内容如下

    <?php
    
    namespace app\workman\controller;
    
    use think\worker\Server;
    
    class Worker extends Server
    {
        //websocket服务端地址和端口
        protected $socket = 'websocket://0.0.0.0:2346';
        //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍
        protected $processes = 4;
    
        /**
         * 收到信息
         * @param $connection
         * @param $data
         */
        public function onMessage($connection, $data)
        {
    
        }
    
        /**
         * 当连接建立时触发的回调函数
         * @param $connection
         */
        public function onConnect($connection)
        {
    
        }
    
        /**
         * 当连接断开时触发的回调函数
         * @param $connection
         */
        public function onClose($connection)
        {
    
        }
    
        /**
         * 当客户端的连接上发生错误时触发
         * @param $connection
         * @param $code
         * @param $msg
         */
        public function onError($connection, $code, $msg)
        {
            echo "error $code $msg\n";
        }
    
        /**
         * 每个进程启动
         * @param $worker
         */
        public function onWorkerStart($worker)
        {
            echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;
    
            //监听redis队列
            $redis = new \Redis();
            $redis->connect('192.168.204.128', 6379);
            while (true) {
                //读取redis队列
                $data = $redis->lPop('test-queue');
                if ($data) {
                    //处理业务
                    echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;
                    //模拟耗时任务
                    sleep(5);
                    echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;
                } else {
                    echo '进程id ' . $worker->id . ' 空闲中,休息5秒'. PHP_EOL;
                    sleep(5);
                }
            }
        }
    }
    

    这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。

  • 3.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法

    <?php
    
    namespace app\index\controller;
    
    class Index
    {
        //新增队列数据
        public function addQueue()
        {
            $redis = new \Redis();
            $redis->connect('192.168.204.128', 6379);
            $redis->rPush('test-queue', '1');
            $redis->rPush('test-queue', '2');
            $redis->rPush('test-queue', '3');
            $redis->rPush('test-queue', '4');
            $redis->rPush('test-queue', '5');
            $redis->rPush('test-queue', '6');
            $redis->rPush('test-queue', '7');
            echo 'success';
        }
    }
    
  • 4.启动workman

    直接在根目录下运行第一步创建的server.php

    php server.php start
    

    可以看到下面的输出

    Workerman[server.php] start in DEBUG mode
    -------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status
    tcp     root            none            websocket://0.0.0.0:2346    4             [OK]
    ---------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    workman进程启动,进程id 2
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    

    注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止

  • 5.访问tp框架中的将任务加入redis队列接口,直接用浏览器或者命令行curl访问http://网站域名/index/index/addQueue即可,然后你就可以看到所有的redis队列将被workman线程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:

    Workerman[server.php] start in DEBUG mode
    -------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status
    tcp     root            none            websocket://0.0.0.0:2346    4             [OK]
    ---------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    workman进程启动,进程id 2
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 1 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    (这里开始将任务加入redis队列,然后workman开始消费队列)
    进程id 0 开始处理业务数据1
    进程id 1 开始处理业务数据3
    进程id 3 开始处理业务数据4
    进程id 2 开始处理业务数据2
    进程id 0 处理业务数据1 完成
    进程id 1 处理业务数据3 完成
    进程id 2 处理业务数据2 完成
    进程id 1 开始处理业务数据6
    进程id 3 处理业务数据4 完成
    进程id 0 开始处理业务数据5
    进程id 3 开始处理业务数据7
    进程id 2 空闲中,休息5秒
    进程id 1 处理业务数据6 完成
    进程id 0 处理业务数据5 完成
    进程id 3 处理业务数据7 完成
    (到这里所有的7项任务已经处理完成)
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    

TP5.1及TP6使用workman创建多线程任务


  • 1.在根目录创建\application\workman\controller目录,然后在该目录下新建Worker.php,文件内容如下

    <?php
    
    namespace app\workman;
    
    use think\worker\Server;
    
    class Worker extends Server
    {
        protected $host = '127.0.0.1';
        protected $port = 2346;
        protected $protocol = 'websocket';
        protected $option = [
            'count'		=> 4, //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍
            'name'		=> 'think'
        ];
    
    
        /**
         * 收到信息
         * @param $connection
         * @param $data
         */
        public function onMessage($connection, $data)
        {
    
        }
    
        /**
         * 当连接建立时触发的回调函数
         * @param $connection
         */
        public function onConnect($connection)
        {
    
        }
    
        /**
         * 当连接断开时触发的回调函数
         * @param $connection
         */
        public function onClose($connection)
        {
    
        }
    
        /**
         * 当客户端的连接上发生错误时触发
         * @param $connection
         * @param $code
         * @param $msg
         */
        public function onError($connection, $code, $msg)
        {
            echo "error $code $msg\n";
        }
    
        /**
         * 每个进程启动
         * @param $worker
         */
        public function onWorkerStart($worker)
        {
            echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;
    
            //监听redis队列
            $redis = new \Redis();
            $redis->connect('127.0.0.1', 6379);
            while (true) {
                //读取redis队列
                $data = $redis->lPop('test-queue');
                if ($data) {
                    //处理业务
                    echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;
                    //模拟耗时任务
                    sleep(5);
                    echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;
                } else {
                    echo '进程id ' . $worker->id . ' 空闲中,休息5秒' . PHP_EOL;
                    sleep(5);
                }
            }
        }
    }
    

    这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。

  • 2.指定workman服务类名

    修改config/worker_server.php,将worker_class的值改为app\workman\Worker

    'worker_class'   => 'app\workman\Worker', // 自定义Workerman服务类名 支持数组定义多个服务
    
  • 3.启动workman

    直接在根目录下运行命令php think worker:server或者php think worker:server -d即可启动,如果要调整workman参数,修改config/worker_server.php中的选项即可

    php think worker:server
    

    可以看到下面的输出

    Starting Workerman server...
    ----------------------- WORKERMAN -----------------------------
    Workerman version:3.5.35          PHP version:7.3.4
    ------------------------ WORKERS -------------------------------
    worker               listen                              processes status
    none                 websocket://127.0.0.1:2346          1         [ok]
    workman进程启动,进程id 0
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    

    注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止

  • 4.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法

    <?php
    namespace app\controller;
    
    use app\BaseController;
    
    class Index extends BaseController
    {
        //新增队列数据
        public function addQueue()
        {
            $redis = new \Redis();
            $redis->connect('127.0.0.1', 6379);
            $redis->rPush('test-queue', '1');
            $redis->rPush('test-queue', '2');
            $redis->rPush('test-queue', '3');
            $redis->rPush('test-queue', '4');
            $redis->rPush('test-queue', '5');
            $redis->rPush('test-queue', '6');
            $redis->rPush('test-queue', '7');
            echo 'success';
        }
    }
    
  • 5.访问上面的将任务加入redis队列接口,直接用浏览器或者命令行curl访问http://网站域名/index/addQueue即可,然后你就可以看到所有的redis队列将被workman线程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:

    Starting Workerman server...
    Workerman[think] start in DEBUG mode
    -------------------------------------------- WORKERMAN ---------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ----------------------------------------------
    proto   user            worker          listen                        processes    status
    tcp     root            think           websocket://127.0.0.1:2346    4             [OK]
    ----------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    进程id 0 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    workman进程启动,进程id 2
    进程id 2 空闲中,休息5秒
    进程id 0 开始处理业务数据2
    进程id 3 开始处理业务数据1
    进程id 1 开始处理业务数据3
    进程id 2 开始处理业务数据4
    进程id 0 处理业务数据2 完成
    进程id 0 开始处理业务数据5
    进程id 3 处理业务数据1 完成
    进程id 1 处理业务数据3 完成
    进程id 3 开始处理业务数据6
    进程id 1 开始处理业务数据7
    进程id 2 处理业务数据4 完成
    进程id 2 空闲中,休息5秒
    进程id 0 处理业务数据5 完成
    进程id 1 处理业务数据7 完成
    进程id 3 处理业务数据6 完成
    进程id 0 空闲中,休息5秒
    进程id 1 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-16 06:16:06       91 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-16 06:16:06       97 阅读
  3. 在Django里面运行非项目文件

    2024-03-16 06:16:06       78 阅读
  4. Python语言-面向对象

    2024-03-16 06:16:06       88 阅读

热门阅读

  1. 单例模式详解

    2024-03-16 06:16:06       42 阅读
  2. springboot的自动配置原理

    2024-03-16 06:16:06       50 阅读
  3. apache和tomcat的优化总结概述

    2024-03-16 06:16:06       38 阅读
  4. 如何判断两个表的结构相同?desc t1 和desc t2

    2024-03-16 06:16:06       37 阅读
  5. SpringBoot3下Kafka分组均衡消费实现

    2024-03-16 06:16:06       42 阅读
  6. mysql 查询字段大于某长度的SQL

    2024-03-16 06:16:06       35 阅读
  7. Lua 学习

    2024-03-16 06:16:06       42 阅读