消息队列-RockMQ-批量收发实践

批量收发实战

发送消息是需要网络连接的如果我们单条发送吞吐量可能没有批量发送好。剖来那个发送可以减少网络IO开销,但是也不能一批次发送太多的数据,需要根据每条消息的大小和网络带宽来确定量的数目。
比如网络带宽为可以支持一次性发送8M的数据包,如果数据包确定不会超过8M,那么我们可以除以每条消息的大小(粗略估算),然后会得到一个数值,这个数值再取70%-80%留一定的缓冲空间。
如果我们一次性发送的数据超过了8M,就需要对这些消息进行分组发送,保证每一组的数据大小不超过8M,每一组发送的数量逻辑也是按照前面这样来计算。
在这里插入图片描述
生产者

public class Producer {
   
    public static void main(String[] args) throws Exception {
   
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        List<Order> F = OrderBuilder.build(1, "A", "B", "C");
        List<Order> S = OrderBuilder.build(2, "D", "Q");
        List<Order> T = OrderBuilder.build(3, "N", "Q", "R");
        ArrayList<Order> orders = new ArrayList<Order>() {
   {
   
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        List<Message> msgs = new ArrayList<>();
        for (Order order : orders) {
   
            Message msg = new Message("test-topic", "test-topic_str", order.toString().getBytes());
            msg.setKeys("test-topic_trace");
            msgs.add(msg);
        }
        producer.send(msgs);
    }
}

消费者1

public class Consumer {
   
    public static void main(String[] args) throws Exception{
   
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
   
                for (MessageExt msg : msgs) {
   
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

可以看到批量消费的时候没有保证顺序:
在这里插入图片描述

消费者2

public class Consumer2 {
   
    public static void main(String[] args) throws Exception{
   
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-topic", "*");
        // 使用顺序的方式来消费MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
   
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
   for (MessageExt msg : msgs) {
   
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

消费的时候没有产生顺序问题,完全是按照批量发送的顺序:
在这里插入图片描述

相关推荐

  1. 消息队列-RockMQ-重试参数设置

    2024-01-09 07:26:02       40 阅读
  2. Unix消息队列实例

    2024-01-09 07:26:02       16 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-09 07:26:02       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-09 07:26:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-09 07:26:02       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-09 07:26:02       20 阅读

热门阅读

  1. 传统图像处理学习笔记更新中

    2024-01-09 07:26:02       33 阅读
  2. 正则表达式

    2024-01-09 07:26:02       32 阅读
  3. css——box-shadow阴影效果

    2024-01-09 07:26:02       37 阅读
  4. 开源软件运维安全防护的六个手段

    2024-01-09 07:26:02       39 阅读
  5. redis原子命令和 lua 脚本解决并发问题

    2024-01-09 07:26:02       38 阅读
  6. qt day1

    qt day1

    2024-01-09 07:26:02      37 阅读
  7. 游戏辅助从0到1-C++调用游戏Lua脚本实现辅助

    2024-01-09 07:26:02       53 阅读
  8. spark读sqlserver出现的异常

    2024-01-09 07:26:02       41 阅读
  9. MySql02:增删改查

    2024-01-09 07:26:02       36 阅读
  10. 前端基础面试题

    2024-01-09 07:26:02       24 阅读
  11. pytorch 分布式 Node/Worker/Rank等基础概念

    2024-01-09 07:26:02       42 阅读
  12. nginx.conf 文件配置

    2024-01-09 07:26:02       31 阅读