RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

发送同步消息

同步消息

生产者发送消息,mq进行确认,然后返回给生产者状态。这就是同步消息。

前文demo程序就是发送的同步消息。

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。

代码如下:

/**
     * 异步消息测试
     */
@Test
public void simpleAsyncProducer() throws Exception {
   
    //创建一个生产者,并指定一个组名
    DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //启动
    producer.start();


    //指定topic,创建一个消息
    Message message = new Message("asyncTopic1", "这是一条异步消息".getBytes());

    //发送异步消息,并设置回调内容
    producer.send(message, new SendCallback() {
   
        @Override
        public void onSuccess(SendResult sendResult) {
   
            log.info("回调内容,发送成功");
        }

        @Override
        public void onException(Throwable throwable) {
   
            log.info("回调内容,发送失败");
        }
    });


    log.info("主线程执行中=========");

    System.in.read();
}

运行结果

从运行结果可以看到是不同的线程输出的内容。

发送单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。

代码如下:

@Test
public void oneWayMessageTest() throws Exception {
   
    //创建一个生产者,并指定一个组名
    DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //启动
    producer.start();


    //指定topic,创建一个消息
    Message message = new Message("onewayTopic1", "这是一条单向消息".getBytes());

    //发送单向消息
    producer.sendOneway(message);

    producer.shutdown();
}

发送延迟消息

消息放入mq后,过一段时间,才会被监听到,然后消费.

比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

代码如下

@Test
public void msMessageTest() throws Exception{
   
    //创建一个生产者,并指定一个组名
    DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //启动
    producer.start();

    //指定topic,创建一个消息
    Message message = new Message("msTopic1", "这是一条单向消息".getBytes());
    //给消息设置一个延迟时间
    message.setDelayTimeLevel(3);

    //发送延时消息
    producer.sendOneway(message);

    producer.shutdown();
}

延时等级如下:

消息延时等级

发送批量消息

代码如下:

@Test
public void testBatchProducer() throws Exception {
   
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-batch-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    // 启动实例
    producer.start();
    List<Message> msgs = Arrays.asList(
        new Message("batchTopicTest", "我是一组消息的A消息".getBytes()),
        new Message("batchTopicTest", "我是一组消息的B消息".getBytes()),
        new Message("batchTopicTest", "我是一组消息的C消息".getBytes())

    );
    SendResult send = producer.send(msgs);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

这些消息会被放到同一个队列中。

发送顺序消息

可以想象一个场景,我们在网上购物时,需要先完成下订单操作,然后再去发短信,再进行发货,需要保证顺序的。

前文我们讲的都是并发消息,这种消息并不能完成上述的场景逻辑。比如一个topic里有10个消息,分别在4个队列中;

  • 如果消费者,同时有20个线程在消费,可能A线程拿到消息1了,B线程拿到消息2了,但是B线程可能完成的比A线程早,这就没办法上述场景的顺序了。
  • 如果消费者只有一个线程,轮询消费四个队列中的消息时,也不能保证是网购场景中的顺序的。

这就要引出顺序消息:把消费者变为单线程,把下订单消息、发短信消息、发货消息放到同一个队列就可以了。

代码

消息封装成实体类如下:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageModel {
   
    //订单id
    private String orderId;
    //用户id
    private String userId;
    //消息描述
    private String description;
}

发送顺序消息的生产者代码如下:

/**
     * 顺序消息
     */

@Test
public void testOrderlyProducer() throws Exception {
   

    List<MessageModel> messageModelList = Arrays.asList(
        //用户1的订单
        new MessageModel("order-111","user-1","下单"),
        new MessageModel("order-111","user-1","发短信"),
        new MessageModel("order-111","user-1","发货"),

        //用户2的订单
        new MessageModel("order-222","user-2","下单"),
        new MessageModel("order-222","user-2","发短信"),
        new MessageModel("order-222","user-2","发货")
    );

    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-orderly-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    // 启动实例
    producer.start();

    //发送顺序消息时 发送时相同用户的消息要保证有序,并且发到同一个队列里
    messageModelList.forEach(
        messageModel->{
   
            Message message = new Message("orderlyTopic", messageModel.toString().getBytes());
            try {
   
                //发送消息,相同订单号去相同队列
                producer.send(message, new MessageQueueSelector() {
   
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
   
                        //producer.send(message,selector,arg),第三个参数订单号会传给selector要实现的方法的arg
                        //在这里选择队列
                        int hashCode = Math.abs(arg.toString().hashCode());
                        int index = hashCode % mqs.size();
                        return mqs.get(index);
                    }
                }, messageModel.getOrderId());
            } catch (Exception e) {
   
                log.error("有错误发生",e);
            }
        }
    );
    // 关闭实例
    producer.shutdown();
    log.info("发送完成");
}

消费顺序消息的消费者代码如下:

//消费者
@Test
public void orderlyConsumer() throws Exception {
   
    //创建一个消费者,并指定一个组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-orderly-consumer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //订阅一个主题 *号表示订阅这个主题中所有的消息
    consumer.subscribe("orderlyTopic","*");
    //设置一个监听器(一直监听,异步回调方式)
    consumer.registerMessageListener(new MessageListenerOrderly() {
   
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
   
            log.info("线程id"+Thread.currentThread().getId());
            log.info("消息内容:"+new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });

    //启动消费者
    consumer.start();

    //挂起当前jvm,防止主线程结束,让监听器一直监听
    System.in.read();

}

运行结果如下:

运行结果

可以看到同一个订单是顺序消费的。

其他问题

如果我们的消息消费失败了怎么办?

如果是并发模式,消费失败会进行重试,重试16次后还会没消费成功,会被放到死信队列里。

如果是顺序模式,如果重试失败,会无限重试,是int的最大值。

发送带标签的消息,消息过滤

如果我们有衣服订单的消息、手机订单的消息,如果我们只使用topic进行区分,就要使用两个topic;但是它们都是订单,所以在同一个topic中会好一些,Rocketmq就提供了消息过滤功能,通过tag或者key进行区分。

生产者代码如下:

@Test
public void testTagProducer() throws Exception {
   
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-tag-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    // 启动实例
    producer.start();
    Message messageTopic1 = new Message("tagTopic", "tag1", "这是topic1的消息".getBytes());
    Message messageTopic2 = new Message("tagTopic", "tag2", "这是topic2的消息".getBytes());
    producer.send(messageTopic1);
    producer.send(messageTopic2);
    // 关闭实例
    producer.shutdown();
}

消费tag1的消费者

//消费tag1的消费者
@Test
public void tagConsumer1() throws Exception {
   
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    consumer.subscribe("tagTopic", "tag1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
   
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
            System.out.println("我是tag1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

消费tag1和tag2的消费者

//消费tag1和tag2的消费者
@Test
public void tagConsumer2() throws Exception {
   
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    consumer.subscribe("tagTopic", "tag1 || tag2");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
   
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
            System.out.println("我是tag1和tag2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

带key的消息

消息都会有自己的MessageId的,如下图:

messageid

那我们能否指定id呢?

在发送消息时可以指定key:

@Test
public void testKeyProducer() throws Exception {
   
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-key-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);

    String key = UUID.randomUUID().toString();

    // 启动实例
    producer.start();
    Message messageTopic1 = new Message("keyTopic", "tag1",key, "这是topic1的消息".getBytes());
    producer.send(messageTopic1);
    // 关闭实例
    producer.shutdown();
}

消费者获取key:

@Test
public void testKeyConsumer() throws Exception {
   
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    consumer.subscribe("keyTopic","*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
   
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
            System.out.println("我们设置的key:" + msgs.get(0).getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

输出如下:

输出

最近更新

  1. TCP协议是安全的吗?

    2024-02-04 14:20:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-04 14:20:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-04 14:20:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-04 14:20:03       20 阅读

热门阅读

  1. 全志H713 Android 11 :给AOSP源码,新增一个Product

    2024-02-04 14:20:03       30 阅读
  2. 【SpringBoot1】Spring Boot是如何推断你的工程类型的

    2024-02-04 14:20:03       38 阅读
  3. redis的数据淘汰测略

    2024-02-04 14:20:03       33 阅读
  4. C++的十宗罪:如何避免常见的错误和陷阱

    2024-02-04 14:20:03       33 阅读
  5. 调试OpenHarmony应用/服务

    2024-02-04 14:20:03       36 阅读
  6. 讲解机器学习中的 K-均值聚类算法及其优缺点

    2024-02-04 14:20:03       33 阅读
  7. SparingBoot高级-数据交换格式-请求响应

    2024-02-04 14:20:03       26 阅读
  8. apt轻松管理软件包的利器

    2024-02-04 14:20:03       33 阅读
  9. SparkException: A master URL必须在配置中设置

    2024-02-04 14:20:03       33 阅读
  10. 数据结构:用顺序表和单链表实现通讯录(上)

    2024-02-04 14:20:03       27 阅读
  11. Django商城项目实战

    2024-02-04 14:20:03       38 阅读