【消息队列】MQ进阶篇之 RocketMQ 的理论

这是理论篇,实践篇参考【消息队列】MQ进阶篇之 RocketMQ 的实践

一、Apache RocketMQ 部署架构

主要分为四部分:

1、生产者 Producer

发布消息的角色。Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。

  • 消息 RocketMQ 消息构成,Message 可以设置的属性值包括:如下图所示。
    在这里插入图片描述
  • Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。Topic 和 Tag 的关系如下图所示。具体区别参考【消息队列】RocketMQ 消息的 topic 和 tag 区别以及使用方式

    在这里插入图片描述
  • 队列 为了支持高并发和水平扩展,需要对 Topic 进行分区,在 RocketMQ 中这被称为队列,一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。
    在这里插入图片描述
    一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点 MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset。队列可以提升消息发送和消费的并发度。

2、消费者 Consumer

消息消费的角色。在 Apache RocketMQ 中,消费者的概念涉及到一些重要的概念,包括消费组、消费位点、负载均衡以及消息推送和拉取。下面对这些概念进行简要介绍:

  • 支持以推(push),拉(pull)两种模式对消息进行消费。

  • 同时也支持集群方式和广播方式的消费。

  • 提供实时消息订阅机制,可以满足大多数用户的需求。

  • 消费组(Consumer Group)

    • 在消费者中消费组的有非常重要的作用,如果多个消费者设置了相同的Consumer Group,我们认为这些消费者在同一个消费组内。
    • 消费组(Consumer Group)是一组共享相同消费逻辑的消费者实例的集合。消费组的主要作用是协同处理消息,确保消息在消费者集合内得到分发,以达到负载均衡和容错的目的。
  • 消费位点(Consumer Offset)

  • 消费位点表示消费者在主题中的消费进度,即消费者已经成功消费的消息的位置。RocketMQ 使用消费位点来记录消费者的消费状态。

  • 消费者在订阅主题时,可以选择从最早的消息开始消费(EARLIEST)或从最新的消息开始消费(LATEST)。

    如上图所示,在Apache RocketMQ中每个队列都会记录自己的最小位点、最大位点。针对于消费组,还有消费位点的概念,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。一般情况下消费位点正常更新,不会出现消息重复,但如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡,重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列。为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息。但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复。

  • 负载均衡

    消费者负载均衡是指消费者组内的多个消费者实例之间如何分配和协同处理消息。RocketMQ 提供了两种负载均衡算法:平均分配和加权分配。

    • 平均分配是将消息均匀地分配给消费者组内的每个消费者实例。
    • 加权分配则根据消费者实例的权重来分配消息,以实现不同消费者实例之间的不同处理能力。
  • 消息推送和拉取:

    • 消息推送是指消息 Broker 主动推送给消费者,消费者注册监听器,当有消息到达时触发回调。
    • 消息拉取是指消费者主动从 Broker 拉取消息,可以根据需要控制拉取的频率和数量。

在 RocketMQ 中,消费者通过配置消费组消费者实例名称来加入消费组。通过设置消费者的订阅信息消费位点,可以控制消息的消费行为。负载均衡和消息推送/拉取方式的选择取决于具体的业务需求和性能要求。

总体而言,这些概念帮助构建了 RocketMQ 中的消息消费体系,使得消息能够被有效地分发和处理。

3、名字服务器 NameServer

NameServer(Name Server)是 RocketMQ 分布式架构中的一个关键组件。NameServer 的主要作用是管理整个 RocketMQ 集群的元数据信息,包括 T o p i c \color{Green}Topic Topic B r o k e r \color{Green}Broker Broker Q u e u e \color{Green}Queue Queue 等信息,最主要的是管理 Broker 和 路由信息。以下是 NameServer 的主要功能和作用:

  • Broker 注册: 当一个 Broker 启动时,它会向 NameServer 注册自己的信息,包括 Broker 的名称、地址、版本等。NameServer 将这些信息保存下来,用于后续的路由查找。Broker 是存储消息Topic的 代理服务器,是实际部署过程对应的代理服务器。
  • Topic 路由: 当生产者发送消息或消费者订阅消息时,NameServer 负责为相应的 Topic 提供路由信息。路由信息包括哪些 Broker 拥有该 Topic 的消息队列,以及这些队列的数量。每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。
  • 负载均衡: NameServer 负责监控 Broker 的状态,并提供负载均衡。它可以检测 Broker 的上线和下线,以及负载的情况,从而为生产者和消费者提供最优的路由信息。
  • 心跳检测: NameServer 通过定期的心跳检测来保持与 Broker 的连接状态。如果某个 Broker 长时间未响应,NameServer 将移除该 Broker 的注册信息,确保系统的稳定性。
  • 提供查询服务: 开发者可以通过查询 NameServer 来获取整个 RocketMQ 集群的状态信息,包括各个 Broker 的信息、Topic 路由信息等。

在 RocketMQ 集群中,通常有多个 NameServer 以提供高可用性。生产者和消费者通过连接任意一个可用的 NameServer 来获取集群的路由信息。这种设计保证了系统的稳定性和可扩展性。

总体而言,NameServer 在 RocketMQ 中充当了集群元数据管理和路由分发的重要角色,确保消息在集群中的顺利传递和负载均衡。

4、代理服务器 Broker

在 Apache RocketMQ 中,代理服务器通常被称为 Broker。RocketMQ Broker 是整个消息队列系统中的核心组件,负责接收、存储和传递消息。扮演了消息的中转站和存储库的重要角色。以下是 RocketMQ Broker 的主要功能和特点:

  • 消息存储: Broker 负责将生产者发送的消息存储在持久化存储(如磁盘)中,以确保消息在需要时能够被消费者获取。
  • 消息传递: Broker 负责将存储在持久化存储中的消息传递给相应的消费者。消息的传递可以是点对点传递,也可以是发布/订阅模式下的广播传递。
  • 消息路由: Broker 管理主题(Topic)和队列(Queue)之间的消息路由,确保消息能够被正确地传递到目标队列。
  • 负载均衡: RocketMQ 支持多个 Broker 组成的集群,Broker 之间可以进行负载均衡,以提高系统的吞吐量和可用性。
  • 可靠性和持久性: Broker 提供机制来确保消息的可靠性传递,并可以配置消息的持久性,以防止数据丢失。
  • 集群管理: 在 RocketMQ 中,Broker 通过 Namesrv(Name
    Server)注册自己的信息,实现集群的管理。Namesrv 负责为生产者和消费者提供路由信息。
  • 高可用性: RocketMQ 支持主从复制,即每个主题的队列都可以有多个副本,确保在某个 Broker 宕机时消息仍然可用。

RocketMQ 中的 Broker 有两个主要角色:Master(主节点)Slave(从节点)。Master 负责处理写入消息的操作,而 Slave 负责从 Master 复制消息,以保持数据一致性。当 Master 节点发生故障时,系统可以自动或手动切换到 Slave 节点,提高系统的可用性。

在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
·
Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
·
Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息。

二、消息发送分类

1、普通消息发送

  • 特点: 普通消息是最常见的消息类型,不要求严格的顺序,也没有延迟要求。

  • 使用场景: 适用于不要求消息按照特定顺序处理,也无需延迟的业务场景。

    import org.apache.rocketmq.client.producer.DefaultMQProducer; 
    import org.apache.rocketmq.common.message.Message;
      
      DefaultMQProducer producer = new
      DefaultMQProducer("YourProducerGroup");
      producer.setNamesrvAddr("localhost:9876"); producer.start();
      
      // 普通消息发送 Message message = new Message("YourTopic", "YourTag",
      "Hello RocketMQ".getBytes()); producer.send(message);
      
      producer.shutdown(); 
    

2、顺序消息发送

  • 特点: 顺序消息要求消息的发送和消费按照一定的顺序进行,同一个消息队列上的消息按照发送和消费的顺序处理。
  • 逻辑: 生产者通过设置业务关键值,选择消息队列,确保相关的消息有序发送和处理。
  • 应用场景: 适用于业务需要强制保证消息的顺序性的场景,如订单处理、流程审批等。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

DefaultMQProducer producer = new DefaultMQProducer("YourProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 顺序消息发送
for (int i = 0; i < 10; i++) {
   
    Message message = new Message("YourTopic", "YourTag", ("OrderMessage " + i).getBytes());
    producer.send(message, new MessageQueueSelector() {
   
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
   
            int orderId = (Integer) arg;
            int index = orderId % mqs.size();
            return mqs.get(index);
        }
    }, i); // 传入业务关键值,用于选择消息队列
}

producer.shutdown();

3、延迟消息发送

  • 特点: 延迟消息允许设置消息在发送后延迟一定时间才能被消费。
  • 逻辑: 生产者设置消息的延迟级别,Broker 在指定时间后将消息解锁供消费。
  • 应用场景: 适用于需要在未来的特定时间点触发消息处理的场景,如定时任务、提醒等。
    在这里插入图片描述
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

DefaultMQProducer producer = new DefaultMQProducer("YourProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 延迟消息发送,设置延迟级别(1-18)
Message message = new Message("YourTopic", "YourTag", "Delayed Message".getBytes());
message.setDelayTimeLevel(3); // 设置延迟级别,对应延迟时间

producer.send(message);

producer.shutdown();

4、批量消息发送

  • 特点: 批量消息允许将多个消息一次性发送到 Broker,减少网络开销,提高发送效率。
  • 逻辑: 生产者将多个消息打包成批量消息,一并发送到 Broker。
  • 应用场景: 适用于需要发送大量消息的场景,如日志收集、数据同步等。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

DefaultMQProducer producer = new DefaultMQProducer("YourProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 批量消息发送
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
   
    Message message = new Message("YourTopic", "YourTag", ("BatchMessage " + i).getBytes());
    messages.add(message);
}
producer.send(messages);

producer.shutdown();

5、事务消息发送

  • 特点: 事务消息允许在消息发送和确认之间执行本地事务,以确保消息的最终一致性。
  • 逻辑: 生产者发送半消息,执行本地事务逻辑,根据事务结果提交或回滚消息。
  • 应用场景: 适用于需要保障消息发送和本地事务执行的一致性的场景,如订单处理、支付等。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQProducer producer = new DefaultMQProducer("YourProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 事务消息发送
TransactionListener transactionListener = new YourTransactionListenerImpl();
producer.setTransactionListener(transactionListener);

Message message = new Message("YourTopic", "YourTag", "Transactional Message".getBytes());
producer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
   
    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
   
        // 执行本地事务逻辑,返回事务状态
        // ...
        return LocalTransactionState.COMMIT_MESSAGE; // 或者 ROLLBACK_MESSAGE、UNKNOW
    }
}, null);

producer.shutdown();

三、消息获取分类

在 Apache RocketMQ 中,消费者的消息消费方式主要有 Push 模式和 Pull 模式。这两种模式有不同的特点和适用场景:

1、Push 消费:

  • 特点: 在 Push 消费模式下,消息 Broker 主动推送消息给消费者。消费者需要注册消息监听器,一旦有新消息到达,Broker 将主动将消息推送给消费者。
  • 逻辑: 消费者注册消息监听器,当有新消息到达时,消息 Broker 主动推送消息给监听器,消费者通过监听器处理消息。
  • 应用场景: 适用于实时性要求较高的场景,消息到达后立即进行处理,例如实时监控、通知推送等。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");

// 设置消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
   
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
        // 处理消息的逻辑...
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.subscribe("YourTopic", "YourTag");
consumer.start();

2、Pull 消费:

  • 特点: 在 Pull 消费模式下,消费者主动从 Broker 拉取消息。消费者通过定时拉取的方式获取消息,控制拉取的速度和频率。
  • 逻辑: 消费者定时调用拉取消息的方法,获取消息并进行处理。
  • 应用场景: 适用于对实时性要求不高,可以按照一定频率批量处理消息的场景,例如离线数据处理、大数据批处理等。
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("YourConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");

// 定义拉取消息的任务
PullTaskCallback callback = new PullTaskCallback() {
   
    @Override
    public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
   
        // 拉取消息的逻辑...
    }
};

// 注册拉取消息的任务
consumer.registerPullTaskCallback("YourTopic", callback);

// 启动消费者
consumer.start();

总体而言,Push 模式适用于实时性要求高的场景,消息到达时立即处理;而 Pull 模式适用于对实时性要求较低,可以按照一定频率批量处理消息的场景。选择合适的消费模式取决于业务需求和对消息处理时效性的要求。

详情实践参考【消息队列】MQ进阶篇之 RocketMQ 的实践

持续更新中。。。

相关推荐

  1. SQL理论(一):数据库调优

    2024-01-19 19:06:01       41 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-19 19:06:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-19 19:06:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-19 19:06:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-19 19:06:01       20 阅读

热门阅读

  1. 《设计模式的艺术》笔记 - 组合模式

    2024-01-19 19:06:01       25 阅读
  2. 开发安全之:SQL Injection

    2024-01-19 19:06:01       25 阅读
  3. RockerMQ发送消息流程

    2024-01-19 19:06:01       36 阅读
  4. Qt 遍历多个按钮单击信号带参数,绑定到一个槽

    2024-01-19 19:06:01       32 阅读
  5. 对比分析ChatGPT 和文心一言。

    2024-01-19 19:06:01       31 阅读
  6. Vue 做文件的上传和下载

    2024-01-19 19:06:01       34 阅读
  7. 公关部门的OKR

    2024-01-19 19:06:01       35 阅读
  8. Python os模块

    2024-01-19 19:06:01       33 阅读
  9. Docker修改默认根目录(含镜像位置)

    2024-01-19 19:06:01       33 阅读
  10. CLion指定远程编译目录

    2024-01-19 19:06:01       31 阅读