RabbitMQ(二)

简单模式(HellWorld)

RabbitMQ依赖

<dependency> 
	<groupId>com.rabbitmq</groupId> 
	<artifactId>amqp-client</artifactId> 
	<version>5.8.0</version> 
</dependency> 

消息生产者

public class Producer { 
    private final static String QUEUE_NAME = "hello"; 
    public static void main(String[] args) throws Exception { 
        //创建一个连接工厂 
        ConnectionFactory factory = new ConnectionFactory(); 
        factory.setHost("182.92.234.71"); 
        factory.setUsername("admin"); 
        factory.setPassword("123"); 
        //channel实现了自动close接口 自动关闭 不需要显示关闭 
        try(Connection connection = factory.newConnection(); //创建链接
        Channel channel = connection.createChannel() //获取信道) { 
            /** 
             queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        	参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列,true可以多个消费者消费 
                * 当Connection关闭时,是否删除队列
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:其他参数。
            */ 
            channel.queueDeclare(QUEUE_NAME,false,false,false,null); 
            String message="hello world"; 
            /**
            basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        	参数:
            7. exchange:交换机名称。简单模式下交换机会使用默认的 ""
            8. routingKey:路由名称
            9. props:配置信息
            10. body:发送消息数据
         	*/ 
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); 
            System.out.println("消息发送完毕"); 
        } 
    } 
} 

消息消费者

public class Consumer { 
    private final static String QUEUE_NAME = "hello"; 
    public static void main(String[] args) throws Exception { 
    	//创建一个连接工厂 
        ConnectionFactory factory = new ConnectionFactory(); 
        factory.setHost("182.92.234.71"); 
        factory.setUsername("admin"); 
        factory.setPassword("123"); 
        Connection connection = factory.newConnection(); 
        Channel channel = connection.createChannel(); 
        System.out.println("等待接收消息...."); 
        
        //推送的消息如何进行消费的接口回调 
        DeliverCallback deliverCallback=(consumerTag,delivery)->{ 
            String message= new String(delivery.getBody()); 
            System.out.println(message); 
        }; 
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了 
        CancelCallback cancelCallback=(consumerTag)->{ 
            System.out.println("消息消费被中断"); 
        }; 
        /** 
         * 消费者消费消息 
         * 1.消费哪个队列 
         * 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答 
         * 3.消费者未成功消费的回调 
         */ 
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); 
    } 
} 

工作队列模式(Work queues)

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

在这里插入图片描述

实现工作队列

抽取连接工厂的工具类

从简单模式(HelloWorld)中不难发现,生产者与消费者的创建连接工厂是一样的,那么我们就可以将其抽取出来封装成一个工具类,减少重复代买的书写

/*
* 此类为连接工厂创建信道的工具类
* */
public class RabbitMqUtils {
     // 得到一个连接的channel
    public static Channel getChannel() throws IOException, TimeoutException {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.163.128");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

消费者(工作线程)代码
public class Worker01 {
    // 队列名称
    public static final String QUEUE_NAME = "hello";

    // 接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //使用工具类创建连接工厂并获取信道
        Channel channel = RabbitMqUtils.getChannel();
        // 接受消息参数
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println("接受到的消息:"+message.getBody());
        };
        // 取消消费参数
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag+"消费者取消消费借口回调逻辑");
        };
        // 消息的接受
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

生产者代码
public class Task01 {
    // 队列名称
    public static final String QUEUE_NAME = "hello";

    // 发送大量消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        // 队列的声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // 从控制台中输入消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息完成:"+ message);

        }
    }
}

主题模式(Topic)

Topic模式与Direct模式相比,他们都可以根据Routing key把消息路由到对应的队列上,但是Topic模式相较于Direct来说,它可以基于多个标准进行路由。也就是在队列绑定Routing key的时候使用通配符。这让Topic模式相较于Direct模式灵活性更大。

Topic的要求

发送到类型是topic交换机的消息的routing_key不能随意写,必须是由"."进行分隔的单词列表,最大限制为255字节

通配符规则

  • *(星号)可以代替一个单词
  • #(井号)可以替代零个或多个单词

Topic匹配案例

在这里插入图片描述

从图中看下表示例结论

routingKey 结果
quick.orange.rabbit 被队列Q1Q2接收到
lazy.orange.elephant 被队列Q1Q2接收到
quick.orange.fox 被队列Q1接收到
lazy.brown.fox 被队列Q2接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列Q2接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配Q2

注意:

当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout了

如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了

生产者代码

public class EmitLogTopic { 
    private static final String EXCHANGE_NAME = "topic_logs"; 
    public static void main(String[] argv) throws Exception { 
        try (Channel channel = RabbitUtils.getChannel()) { 
            channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
            /** 
             * Q1-->绑定的是 
             *      中间带orange带3个单词的字符串(*.orange.*) 
             * Q2-->绑定的是 
             *      最后一个单词是rabbit的3个单词(*.*.rabbit) 
             *      第一个单词是lazy的多个单词(lazy.#) 
             * 
             */ 
            Map<String, String> bindingKeyMap = new HashMap<>(); 
            bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到"); 
            bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到"); 
            bindingKeyMap.put("quick.orange.fox","被队列Q1接收到"); 
            bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到"); 

            bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次"); 
            bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); 
            bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); 
            bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2"); 
 			
            // 从map中获取发送消息的routingKey和消息本体
            for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ 
                String bindingKey = bindingKeyEntry.getKey(); 
                String message = bindingKeyEntry.getValue(); 
                channel.basicPublish(EXCHANGE_NAME,bindingKey, null, 
message.getBytes("UTF-8")); 
                System.out.println("生产者发出消息" + message); 
            } 
        } 
    } 
}

消费者代码

public class ReceiveLogsTopic01 { 
    private static final String EXCHANGE_NAME = "topic_logs"; 
    public static void main(String[] argv) throws Exception { 
        Channel channel = RabbitUtils.getChannel(); 
        channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
        //声明Q1队列与绑定关系 
        // String queueName="Q1"; 可以改为以下方式灵活获取队列名称
        String queueName = channel.queueDeclare().getQueue();
        channel.queueDeclare(queueName, false, false, false, null); 
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); 
 
        System.out.println("等待接收消息....."); 
        DeliverCallback deliverCallback = (consumerTag, delivery) -> { 
            String message = new String(delivery.getBody(), "UTF-8"); 
            System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message); 
        }; 
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { 
        }); 
    } 
}
public class ReceiveLogsTopic02 { 
    private static final String EXCHANGE_NAME = "topic_logs"; 
    public static void main(String[] argv) throws Exception { 
        Channel channel = RabbitUtils.getChannel(); 
        channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
        //声明Q2队列与绑定关系 
        String queueName="Q2"; 
        channel.queueDeclare(queueName, false, false, false, null); 
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); 
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); 
        System.out.println("等待接收消息....."); 
        DeliverCallback deliverCallback = (consumerTag, delivery) -> { 
            String message = new String(delivery.getBody(), "UTF-8"); 
            System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message); 
        }; 
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { 
        }); 
    } 
} 

相关推荐

  1. RabbitMQ项目实战(

    2024-04-02 01:08:03       14 阅读
  2. RabbitMQ常用命令(

    2024-04-02 01:08:03       32 阅读
  3. Spring Boot 集成 RabbitMQ()

    2024-04-02 01:08:03       13 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-02 01:08:03       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-02 01:08:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-02 01:08:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-02 01:08:03       18 阅读

热门阅读

  1. 汇编——SSE对齐(一. 未对齐情况)

    2024-04-02 01:08:03       13 阅读
  2. 【qt】打开图像、保存图像

    2024-04-02 01:08:03       12 阅读
  3. table Diffusion 的Web 用户界面简介

    2024-04-02 01:08:03       14 阅读
  4. 详解SPWM与SVPWM的原理、算法以及两者的区别

    2024-04-02 01:08:03       14 阅读
  5. 服了,一线城市的后端都卷成这样了吗!?

    2024-04-02 01:08:03       14 阅读