消息队列-RockMQ-定时延时发送消息

定时延时发送消息

任务需要延迟一段时间再进行处理。
生产者

public class Producer {
   
    public static void main(String[] args) throws Exception {
   
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        List<Order> F = OrderBuilder.build(1, "A", "B", "C");
        List<Order> S = OrderBuilder.build(2, "D", "Q");
        List<Order> T = OrderBuilder.build(3, "N", "Q", "R");
        ArrayList<Order> orders = new ArrayList<Order>() {
   {
   
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        for (Order order : orders) {
   
            Message msg = new Message("test-topic", "test-topic_str", order.toString().getBytes());
            msg.setKeys("test-topic_trace");
            // 官网提供了这些延迟级别 分别对应 0 1 2
            // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            // 重要的逻辑在这里设置队列延迟等级
            msg.setDelayTimeLevel(3);
            producer.send(msg);
        }
        System.out.println("finish");
        // 这里发送了两个Tag 的消息
        // 下面这个消息没有设置延迟时间
        for (Order order : orders) {
   
            Message msg = new Message("test-topic", "test-topic_str_other", ("other" + order.toString()).getBytes());
            msg.setKeys("test-topic_trace_other");
            
            producer.send(msg);
        }
        System.out.println("finish");
    }
}

消费者1订阅tag 为*的消息

public class Consumer {
   
    public static void main(String[] args) throws Exception {
   
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
        consumer.setNamesrvAddr("ip:9876");
        // *表示订阅所有的消息
        consumer.subscribe("test-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
                for (MessageExt msg : msgs) {
   
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

可以看到我们没有设置延迟发送和延迟发送的sentTime和recvTime是很有区别的:
在这里插入图片描述

我们看到test-group里面的消息总共有16条
在这里插入图片描述
消费者订阅tag为test-topic_str_other的消息

consumer.subscribe("test-topic", "test-topic_str_other");

在这里插入图片描述
消费者订阅tag为test-topic_str的消息

consumer.subscribe("test-topic", "test-topic_str");

在这里插入图片描述
通过上面的案例验证了:

// 后面这个值是根据消息的tag进行正则匹配的
consumer.subscribe("test-topic", "*");

源码的注解也有说明:
在这里插入图片描述

相关推荐

  1. 消息队列-RockMQ-重试参数设置

    2024-01-10 09:08:01       40 阅读
  2. SpringAMPQ(高级消息队列协议)消息发送与接收

    2024-01-10 09:08:01       17 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-10 09:08:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-10 09:08:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-10 09:08:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-10 09:08:01       20 阅读

热门阅读

  1. Unity两组Toggle一一对应关系

    2024-01-10 09:08:01       37 阅读
  2. Agisoft Metashape 地面点分类参数设置

    2024-01-10 09:08:01       43 阅读
  3. 如何使用 Golang 比较版本号大小?

    2024-01-10 09:08:01       33 阅读
  4. vim文本编辑器,常用命令

    2024-01-10 09:08:01       39 阅读
  5. Swagger2以及Spring Boot整合Swagger2教程

    2024-01-10 09:08:01       42 阅读