RabbitMQ交换机(2)-Direct

1.Direct

直连(路由)交换机,生产者将消息发送到交换机,并指定消息的Routing Key(路由键)。交换机会将Routing Key与队列绑定进行匹配,如果匹配成功,则将该消息路由到对应的队列中。如果没有匹配成功,该消息将被丢弃或返回给生产者。在Direct模式中,每个消息只能被一个消费者接收。
通过使用Exchange和Routing Key来进行消息传输,Direct模式实现了消息的有选择性地路由,提高了消息传输的效率,减少了系统负载.
在这里插入图片描述
在这里插入图片描述
如上图中的routingKey为error绑定队列disk,routingKey为info或warning绑定队列console。

2.生产者

package com.hong.rabbitmq7;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

/**
 * @Description: 直连模式消息发送者
 * @Author: hong
 * @Date: 2024-01-15 22:24
 * @Version: 1.0
 **/
public class DirectSend {
   
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
   
        Channel channel = RabbitMQUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        Map<String,String> map = new HashMap<>();
        map.put("info","我是info信息");
        map.put("debug","我是debug信息");
        map.put("warning","我是warning信息");
        map.put("error","我是error信息");

        for(Map.Entry<String,String> bindingKeys : map.entrySet()){
   
            String bindingKey = bindingKeys.getKey();
            String message = bindingKeys.getValue();
            channel.basicPublish(EXCHANGE_NAME,bindingKey,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送完成------" + message);
        }
    }
}

在这里插入图片描述

3.消费者1-disk只接受error消息

package com.hong.rabbitmq7;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

/**
 * @Description: 直连模式消息接收者1-disk接收error消息
 * @Author: hong
 * @Date: 2024-01-15 20:22
 * @Version: 1.0
 **/
public class Receiver1 {
   
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
   
        Channel channel = RabbitMQUtil.getChannel();
        /*
         *声明交换机
         *第1个参数:交换机名称
         *第2个参数:交换机类型
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明队列
        String queueName = "disk";
        channel.queueDeclare(queueName,false,false,false,null);

        /*
         * 绑定队列与交换机
         * 第1个参数:队列名称
         * 第2个参数:交换机名称
         * 第3个参数:routingKey
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"error");

        DeliverCallback deliverCallback = (comsumerTag, message) -> {
   
            System.out.println("disk中的:"+  new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback cancelCallback = var -> {
   
        };

        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
    }
}

在这里插入图片描述

4.消费2-console接收info和warning消息

package com.hong.rabbitmq7;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

/**
 * @Description: 直连模式消息接收者2-console接收info和warning消息
 * @Author: hong
 * @Date: 2024-01-15 20:22
 * @Version: 1.0
 **/
public class Receiver2 {
   
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
   
        Channel channel = RabbitMQUtil.getChannel();
        /*
         *声明交换机
         *第1个参数:交换机名称
         *第2个参数:交换机类型
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明队列
        String queueName = "console";
        channel.queueDeclare(queueName,false,false,false,null);

        /*
         * 绑定队列与交换机
         * 第1个参数:队列名称
         * 第2个参数:交换机名称
         * 第3个参数:routingKey
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"info");
        channel.queueBind(queueName,EXCHANGE_NAME,"warning");

        DeliverCallback deliverCallback = (comsumerTag, message) -> {
   
            System.out.println("console中的:"+  new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback cancelCallback = var -> {
   
        };

        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
    }
}

在这里插入图片描述
在这里插入图片描述

相关推荐

  1. RabbitMq——direct交换器和fanout交换器 扇形交换器

    2024-01-16 23:14:03       41 阅读
  2. RabbitMQ交换机

    2024-01-16 23:14:03       51 阅读
  3. RabbitMQ交换机

    2024-01-16 23:14:03       30 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-16 23:14:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-16 23:14:03       100 阅读
  3. 在Django里面运行非项目文件

    2024-01-16 23:14:03       82 阅读
  4. Python语言-面向对象

    2024-01-16 23:14:03       91 阅读

热门阅读

  1. 【Kubernetes】K8s 查看 Pod 的状态

    2024-01-16 23:14:03       53 阅读
  2. CSS笔记

    CSS笔记

    2024-01-16 23:14:03      49 阅读
  3. 常见的排序算法

    2024-01-16 23:14:03       59 阅读
  4. go快速生成二维码

    2024-01-16 23:14:03       56 阅读
  5. Qt获取当前系统网络接口信息

    2024-01-16 23:14:03       52 阅读
  6. Kafka

    2024-01-16 23:14:03       55 阅读
  7. 大模型学习第五课

    2024-01-16 23:14:03       58 阅读
  8. Vue生成图片并下载

    2024-01-16 23:14:03       54 阅读
  9. 2、合并两张图像

    2024-01-16 23:14:03       53 阅读
  10. vuex是什么?怎么使用?哪种功能场景使用它?

    2024-01-16 23:14:03       56 阅读