RocketMQ源码 Consumer 消费者源码架构分析

前言

消息消费者 MQConsumer 即消息的消费方,主要负责消息消息生产者 MQ Producer 投递的消息。它的源码架构如下图,以常用的消费者实现类 DefaultMQPushConsumer 视角分析消费者的源码架构,介绍消费者核心数据结构。

DefaultMQPushConsumer 内部依赖了 DefaultMQPushConsumerImpl 核心实现类,DefaultMQPushConsumerImpl 组件串联了消费者其他组件,如:RebalanceImpl 重平衡组件、OffsetStore偏移量存储组件、MessageListener消息监听组件、ConsumeMessageService 消息消费组件、PullAPIWrapper 消息拉取API组件,并实现了消费者常用功能:发送心跳、拉取消息、消费消息、提交偏移量、与远程broker和NameSrv网络通信。

这里只记录了消费者核心数据结构,细节功能实现,比如:发送心跳、注册消费者实例、长轮询拉取消息、并发消费消息、顺序消费消息,在后续其他文章中详细介绍。

源码版本:4.9.3

源码架构图

 核心数据结构

DefaultMQPushConsumerImpl 默认消费者实现组件

// 默认消费者实现组件
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    // 重要:重平衡组件
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    // 网络通信客户端组件,重要
    private MQClientInstance mQClientFactory;
    // 拉取消息API组件,重要
    private PullAPIWrapper pullAPIWrapper;
    // 消息监听组件
    private MessageListener messageListenerInner;
    // 偏移量存储组件
    private OffsetStore offsetStore;
    // 消息消费服务组件
    private ConsumeMessageService consumeMessageService;
}

PullAPIWrapper 拉取消息API组件

// 拉取消息API组件
public class PullAPIWrapper {
    // 拉取消息broker节点映射表,key为MessageQueue,value为brokerId
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    // 网络通信客户端组件
    private final MQClientInstance mQClientFactory;
}

RebalanceImpl 重平衡组件

// 重平衡组件实现
public abstract class RebalanceImpl {
    protected static final InternalLogger log = ClientLogger.getLog();
    // 远程消息队列和处理队列映射表,key:MessageQueue,value:ProcessQueue
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    // topic 和 消息队列映射表,key:topic,value:MessageQueue集合
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
        new ConcurrentHashMap<String, Set<MessageQueue>>();
    // 内部订阅关系映射表,key:topic,value:SubscriptionData
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
        new ConcurrentHashMap<String, SubscriptionData>();
    // 负载均衡策略, 临近机房、同机房、轮询、一致性hash
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
}

RemoteBrokerOffsetStore 偏移量存储组件

public class RemoteBrokerOffsetStore implements OffsetStore {
    // 消费偏移量映射表,key为MessageQueue,value为offset
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();
}

ConsumeMessageService 消息消费服务接口

public interface ConsumeMessageService {
    // 启动
    void start();
    // 关闭
    void shutdown(long awaitTerminateMillis);
    // 更新核心线程池大小
    void updateCorePoolSize(int corePoolSize);
    // 增加核心线程池大小
    void incCorePoolSize();
    // 减少核心线程池大小
    void decCorePoolSize();
    // 获取核心线程池大小
    int getCorePoolSize();
    // 消费消息
    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);

    void submitConsumeRequest(
            // 拉取到的消息
        final List<MessageExt> msgs,
        // 处理队列
        final ProcessQueue processQueue,
        // 拉取请求中的队列
        final MessageQueue messageQueue,
        // 是否需要派发到消费者
        final boolean dispathToConsume);
}

相关推荐

  1. Dubbo解读-Consumer消费端服务列表刷新

    2024-01-11 21:06:03       29 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-11 21:06:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-11 21:06:03       101 阅读
  3. 在Django里面运行非项目文件

    2024-01-11 21:06:03       82 阅读
  4. Python语言-面向对象

    2024-01-11 21:06:03       91 阅读

热门阅读

  1. 【无标题】

    2024-01-11 21:06:03       42 阅读
  2. 我国实施个人信息出境认证的要点

    2024-01-11 21:06:03       54 阅读
  3. SpringMVC-03

    2024-01-11 21:06:03       55 阅读
  4. Vue父子组件值的传递【极简版】

    2024-01-11 21:06:03       57 阅读
  5. CMake编译选项CMAKE_CXX_FLAGS详解

    2024-01-11 21:06:03       48 阅读