RabbitMQ学习总结

目录

一:介绍        

二:应用场景

三:工作原理

组成部分说明

消息发布接收流程

四:下载安装

五:环境搭建

创建Maven工程

生产者

消费者 

 六:工作模式

Work queues

Publish/subscribe

生产者

消费者 

Routing 

生产者

​消费者

思考

 Topics

生产者

匹配规则 

Header模式

生产者

RPC

说明

七:SpringBoot整合RabbitMQ



一:介绍        

MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发.

RabbitMQ官方地址:http://www.rabbitmq.com/

二:应用场景

1)任务异步处理

2)应用程序解耦合

三:工作原理

组成部分说明

 Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。

 Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

 Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

 Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

 Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

消息发布接收流程

 发送消息:

 1、生产者和Broker建立TCP连接。
 2、生产者和Broker建立通道。
 3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
 4、Exchange将消息转发到指定的Queue(队列)

 接收消息:

 1、消费者和Broker建立TCP连接
 2、消费者和Broker建立通道
 3、消费者监听指定的Queue(队列)
 4、当有消息到达Queue时Broker默认将消息推送给消费者。
 5、消费者接收到消息

四:下载安装

 RabbitMQ的下载地址:http://www.rabbitmq.com/download.html

 本项目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。

 下载erlang: http://erlang.org/download/otp_win64_20.3.exe

  erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中 添加%ERLANG_HOME%\bin;

安装erlang直接下一步就好

 安装RabbitMQ: Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server · GitHub

安装完开始菜单会有显示:

RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 启动

如果没有开始菜单则进入安装目录下sbin目录手动启动:

1)安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
2)安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management

3、启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672

初始账号和密码:guest/guest

注:每个虚拟机就相当于一个独立的MQ,默认虚拟机的名字为/

4、如果启动失败找到.erlang.cookie,位于C:\windows\system32\config\systemprofile下,将此处的.erlang.cookie覆盖C:\user\admin.erlang.cookie后重启RabbitMQ即可解决问题

注意事项

1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang搜索RabbitMQ、ErlSrv,将对应的项全部删除。

五:环境搭建

创建Maven工程

创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp‐client</artifactId>
<version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>

生产者

public class Producer01 {
    //队列名称
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            //创建与RabbitMQ服务的TCP连接
            connection = factory.newConnection();
            //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();

            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:durable是否持久化,如果持久化,mq重启后队列还在
             * param3:exclusive队列是否独占此连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置为true可用于临时队列的创建
             * param4:autoDelete队列不再使用时是否自动删除此队列和exclusive搭配使用
             * param5:队列参数(可以设置队列的扩展参数)
             */
            channel.queueDeclare(QUEUE, true, false, false, null);
            String message = "helloworld小明" + System.currentTimeMillis();
            /**
             * 消息发布方法
             * param1:Exchange的名称,如果没有指定,则使用Default Exchange
             * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,如果使用默认交换机,routingKey设置为队列的名称
             * param3:消息包含的属性
             * param4:消息体
             */

            /**
             * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显
             示绑定或解除绑定
             * 默认的交换机,routingKey等于队列名称
             */
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("Send Message is:'" + message + "'");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

此时显示创建了一个队列,有1条消息待消费, 消息总数为1

 

 注:队列中是可以拿到发送的消息的

 注:Purge Message可以清空该队列的消息

 注:Publish message可以在指定的队列中发消息,因此生产者发送消息时要打印发送的body,消费者端做幂等,一旦出现问题可通过控制台重新发送该消息

消费者 

注:两边都要声明队列,防止消费者启动在生产者之前报错

public class Consumer01 {
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在服务器的ip和端口
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE, true, false, false, null);
        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             * @throws IOException
            }
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body);
                System.out.println("receive message.." + msg);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE, true, consumer);
    }
}

 六:工作模式

RabbitMQ有以下几种工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC

Work queues

工作队列模式

work queues与入门程序相比:多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。

结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。

 设置idea同时启用多个客户端

Publish/subscribe

发布订阅模式

发布订阅模式
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息 

案例:
用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。

生产者

1、声明exchange_fanout_inform交换机。
2、声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
3、发送消息时不需要指定routingkey

public class Producer02_publish {
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //创建一个与MQ的连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            //创建一个连接
            connection = factory.newConnection();
            //创建与交换机的通道,每个通道代表一个会话
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             * 参数明细
             * 1、交换机名称
             * 2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, "fanout");
            //声明队列
            // (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
            /**
             * 参数明细:
             * 1、队列名称
             * 2、是否持久化
             * 3、是否独占此队列
             * 4、队列不用是否自动删除
             * 5、参数
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             * 参数明细
             * 1、队列名称
             * 2、交换机名称
             * 3、路由key
             */
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");
            //发送消息
            for (int i = 0; i < 10; i++) {
                String message = "inform to user" + i;
                //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细
                 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange
                 * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列
                 * 3、消息属性
                 * 4、消息内容
                 */
                channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 可以看到新声明的交换机exchange_fanout_inform

 新生成两个队列,每个队列接收到10条消息

消费者 

邮件发送消费者

public class Consumer02_subscribe_email {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_FANOUT_INFORM = "inform_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个与MQ的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建与交换机的通道,每个通道代表一个会话
        Channel channel = connection.createChannel();
        //声明交换机 String exchange, BuiltinExchangeType type
        /**
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, "fanout");
        //声明队列
        // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 参数明细:
         * 1、队列名称
         * 2、是否持久化
         * 3、是否独占此队列
         * 4、队列不用是否自动删除
         * 5、参数
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        //交换机和队列绑定String queue, String exchange, String routingKey
        /**
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
        //定义消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                String exchange = envelope.getExchange();
                //消息内容
                String message = new String(body);
                System.out.println(message);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
    }

}

可以看到邮件的信息被消费了,短信的还在

可以看到该交换机绑定了两个队列 

Routing 

路由模式
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
 

生产者

1、声明exchange_routing_inform交换机。
2、声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
3、发送消息时需要指定routingkey

public class Producer03_routing {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //创建一个与MQ的连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            //创建一个连接
            connection = factory.newConnection();
            //创建与交换机的通道,每个通道代表一个会话
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             * 参数明细
             * 1、交换机名称
             * 2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, "direct");
            //声明队列
            // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            /**
             * 参数明细:
             * 1、队列名称
             * 2、是否持久化
             * 3、是否独占此队列
             * 4、队列不用是否自动删除
             * 5、参数
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             * 参数明细
             * 1、队列名称
             * 2、交换机名称
             * 3、路由key
             */
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);
            //发送邮件消息
            for (int i = 0; i < 10; i++) {
                String message = "email inform to user" + i;
                //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细
                 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange
                 * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消
                 息将发到此队列
                 * 3、消息属性
                 * 4、消息内容
                 */
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
            //发送短信消息
            for (int i = 0; i < 10; i++) {
                String message = "sms inform to user" + i;
                //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

可以看到创建了exchange_routing_inform交换机,并绑定了两个队列

可以看到对列绑定了Routing_key

消费者

邮件发送消费者

public class Consumer03_routing_email {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个与MQ的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建与交换机的通道,每个通道代表一个会话
        Channel channel = connection.createChannel();
        //声明交换机 String exchange, BuiltinExchangeType type
        /**
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, "direct");
        //声明队列
        // channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 参数明细:
         * 1、队列名称
         * 2、是否持久化
         * 3、是否独占此队列
         * 4、队列不用是否自动删除
         * 5、参数
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        //交换机和队列绑定String queue, String exchange, String routingKey
        /**
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
        //定义消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                String exchange = envelope.getExchange();
                //消息内容
                String message = new String(body);
                System.out.println(message);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
    }

}

 思考

1、Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。

 Topics

路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。 

案例:

根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

生产者

public class Producer04_topics {
    //队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email_topics";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms_topics";
    private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //创建一个与MQ的连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            //创建一个连接
            connection = factory.newConnection();
            //创建与交换机的通道,每个通道代表一个会话
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             * 参数明细
             * 1、交换机名称
             * 2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, "topic");
            //声明队列
            /**
             * 参数明细:
             * 1、队列名称
             * 2、是否持久化
             * 3、是否独占此队列
             * 4、队列不用是否自动删除
             * 5、参数
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

            //绑定email通知队列
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#");
            //绑定sms通知队列
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");

            //发送邮件消息
            for (int i = 0; i < 10; i++) {
                String message = "email inform to user" + i;
                //向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细
                 * 1、交换机名称,不指令使用默认交换机名称 Default Exchange
                 * 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消
                 息将发到此队列
                 * 3、消息属性
                 * 4、消息内容
                 */
                channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
            //发送短信消息
            for (int i = 0; i < 10; i++) {
                String message = "sms inform to user" + i;
                channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
            //发送短信和邮件消息
            for (int i = 0; i < 10; i++) {
                String message = "sms and email inform to user" + i;
                channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());
                System.out.println("Send Message is:'" + message + "'");
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

可以看到exchange_topics_inform绑定了两个Routing_key,进行通配符匹配

匹配规则 

统配符规则:
中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。

对列绑定的Routing_key为:

inform.#.email.#

inform.#.sms.#

发送消息指定的Routing_key为:

inform.email 匹配 inform.#.email.#

inform.sms 匹配 inform.#.sms.#

inform.sms.email 匹配 inform.#.email.# 和 inform.#.sms.# 两个队列都能发送

(即使什么都没有,#也可以匹配个空)

Header模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效

生产者

Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);

通知

String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消费者绑定的header
//headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

发送邮件消费者 

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交换机和队列绑定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消费队列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

RPC

说明

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

七:SpringBoot整合RabbitMQ

待更新...

相关推荐

  1. RabbitMq总结

    2024-06-17 02:54:04       28 阅读
  2. RabbitMQ相关总结

    2024-06-17 02:54:04       40 阅读
  3. RabbitMQ学习

    2024-06-17 02:54:04       47 阅读
  4. RabbitMq学习

    2024-06-17 02:54:04       45 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-17 02:54:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-17 02:54:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-06-17 02:54:04       82 阅读
  4. Python语言-面向对象

    2024-06-17 02:54:04       91 阅读

热门阅读

  1. 无法获取Response Header内容的问题

    2024-06-17 02:54:04       25 阅读
  2. python单元测试

    2024-06-17 02:54:04       20 阅读
  3. exit和_exit函数和atexit函数

    2024-06-17 02:54:04       28 阅读
  4. GitHub项目里的api

    2024-06-17 02:54:04       25 阅读
  5. 算法训练营day60

    2024-06-17 02:54:04       27 阅读
  6. 15. 三数之和

    2024-06-17 02:54:04       30 阅读
  7. 【C++】类相关知识

    2024-06-17 02:54:04       26 阅读
  8. ubuntu commands

    2024-06-17 02:54:04       27 阅读
  9. 【Linux】如何查看网卡驱动

    2024-06-17 02:54:04       30 阅读
  10. MySql 数据库、表常用语句

    2024-06-17 02:54:04       27 阅读
  11. Linux 常用命令

    2024-06-17 02:54:04       24 阅读