php使用rabbitmq

这里使用的是 php-amqplib

 composer require php-amqplib/php-amqplib

生产端send.php

<?php

require_once "./vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//交换机名称
$exc_name = 'exchange.canal';
//路由键
$routing_key = 'canal-routing-key';
//队列名称
$queue_key = 'canal_queue';

//生产者
//Connection: publisher/consumer和broker之间的TCP连接
//Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。
//Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

//1.建立connction
/**
 * 主要参数说明:
 * $host:  RabbitMQ服务器主机IP地址
 * $port:  RabbitMQ服务器端口
 * $user:  连接RabbitMQ服务器的用户名
 * $password:  连接RabbitMQ服务器的用户密码
 * $vhost:   连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost)
 */
$connection = new AMQPStreamConnection('192.168.0.105', 5672, 'admin', 'admin', "/");
//2.建立通道Channel
$channel = $connection->channel();

//3.声明交换器
/**
 * exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。
 * 试探性申请一个交换器,若该交换器不存在,则创建;若存在,则跳过。
 * 主要参数说明:
 * $exchange:队列名称
 * $type 交换器类型,常见类型如:fanout,dircet,topic,hraders四种
 * $passive 只判断不创建,一般是判断该交换机是否存在
 * $durable:是否开启持久化 设置true表示持久化,反之非持久化。持久化可以将交换器存盘,在服务器重启的适合不会丢失相关数据。
 * $auto_delete:是否自动删除
 * $internal:设置是否内置,设置true表示内置交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这个方式
 * $nowait:如果为true表示不等待服务器回执信息,函数将返回NULL,可提高访问速度。
 */
$channel->exchange_declare($exc_name, 'direct', false, true, false);

//4.声明队列
/**
 * queue_declare($queue, $passive = false, $durable = false, $exclusive = false, $auto_delete = true, $nowait = false, $arguments = null, $ticket = null)
 * 试探性申请一个队列,若该队列不存在,则创建;若存在,则跳过。
 * 主要参数说明:
 * $queue:队列名称
 * $passive 只判断不创建,一般是判断该队列是否存在
 * $durable:是否开启持久化 设置true表示持久化,反之非持久化。持久化可以将队列存盘,在服务器重启的适合不会
 * $exclusive:是否独占,设置true表示独占,只能有一个消费者监听该队列,false表示共享,可以有多个消费者监听该队列。
 */
$channel->queue_declare($queue_key, false, true, false, false);

//5.绑定队列到交换器
/**
 * queue_bind($queue, $exchange, $routing_key = '', $nowait = false, $arguments = null, $ticket = null)
 * 绑定队列到交换器
 * 主要参数说明:
 * $queue:队列名称
 * $exchange:交换器名称
 * $routing_key:路由键
 */
$channel->queue_bind($queue_key, $exc_name, $routing_key);

//4.创建消息
$content = 'this is ' . $routing_key . ' message';
/**
 * 主要参数说明:
 *  data:消息
 *  properties Array 设置的属性,比如设置该消息是否持久化
 * DELIVERY_MODE_NON_PERSISTENT 不持久化
 * DELIVERY_MODE_PERSISTENT 持久化
 */
$msg = new AMQPMessage($content, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

//5.发布消息
/**
 * basic_publish主要参数说明:
 * $msg 消息内容
 * $exchange 交换器
 * $routing_key routing_key或者队列名称
 * $mandatory 匹配不到队列时,是否立即丢弃消息
 * $immediate 队列无消费者时,是否立即丢弃消息
 * $ticket 不清楚是什么作用
 */
$channel->basic_publish($msg, $exc_name, $routing_key);
//7.通道关闭
$channel->close();
//8.连接关闭
$connection->close();

消费端receive.php

<?php
require_once "./vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//交换机名称
$exc_name  = 'exchange.canal';
//路由键
$routing_key = 'canal-routing-key';

//1.建立connction
$connection = new AMQPStreamConnection('192.168.0.105', 5672, 'admin', 'admin', "/");
//2.建立通道Channel
$channel = $connection->channel();

//3.声明交换器
$channel->exchange_declare($exc_name, 'direct', false, true, false);

//4.获取系统生成的消息队列名称,这里也可以指定一个队列名称
list($queue_name, ,) = $channel->queue_declare('', false, true, true, false);

//5.将队列名与交换器名进行绑定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);

//6.设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);

//7.定义收到消息回调函数
$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //确认消息已被消费,从生产队列中移除
    $msg->ack();
};

//8.开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//9.不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

参考文章

  • //https://blog.csdn.net/qq_28979487/article/details/136325509

相关推荐

  1. php使用rabbitmq

    2024-05-04 07:38:03       38 阅读
  2. PHP使用 enqueue/amqp-lib拓展实现rabbitmq任务处理

    2024-05-04 07:38:03       47 阅读
  3. RabbitMQ php amqp

    2024-05-04 07:38:03       37 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-05-04 07:38:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-05-04 07:38:03       101 阅读
  3. 在Django里面运行非项目文件

    2024-05-04 07:38:03       82 阅读
  4. Python语言-面向对象

    2024-05-04 07:38:03       91 阅读

热门阅读

  1. ChatGPT等AI大模型输出格式错误优化分享

    2024-05-04 07:38:03       36 阅读
  2. 应用回归分析,R语言,多元回归模型总结(上)

    2024-05-04 07:38:03       32 阅读
  3. R语言计算特定列的和(自备)

    2024-05-04 07:38:03       39 阅读
  4. pytorch笔记:ReplicationPad1d

    2024-05-04 07:38:03       34 阅读
  5. json转excel

    2024-05-04 07:38:03       34 阅读
  6. DRF路由组件分析

    2024-05-04 07:38:03       30 阅读
  7. docker搭建redis集群三主三从

    2024-05-04 07:38:03       31 阅读
  8. 【QT】QStackedWidget一利器,常在项目中的使用

    2024-05-04 07:38:03       33 阅读
  9. AWTK 和 QT 资源占用不完全对比

    2024-05-04 07:38:03       34 阅读
  10. 设计模式-02 设计模式-工厂模式factory

    2024-05-04 07:38:03       29 阅读