【源码解析】Apache RocketMQ发送消息源码

send message源码解析

引入

send message方法作为我们经常使用的方法,平时我们很难去关注他底层到底做了什么。大部分人只知道通过send message方法可以将消息发送到broker,然后供消费者进行消费。其实不然,消息从客户端发送到broker,需要中间需要经过很多步骤,比如:首先客户端需要向nameserver拿路由,拿到路由后才能将消息发送到对应的broker。消息到了broker,需要先进行校验,校验无误后,再写到commitLog,写完commitLog后,再根据具体的策略判断是否需要同步到slave节点,同步完slave节点完后才response给客户端。

源码阅读入口

// 客户端入口
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

// NameServer入口
org.apache.rocketmq.namesrv.processor.ClientRequestProcessor#getRouteInfoByTopic

// Broker端入口
org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest

源码解析

mid

  • RocketMQ中的客户端send方法提供了单条发送发送也批量发送的API,不管是单条发送还是批量发送本质都是一样的,批量发送会把消息集合包装一下了,具体可以看batch里面的实现,将消息集合封装了MessageBatch对象,当然MessageBatch继承Message。然后再尝试去topicPublishInfoTable中拿路由,如果没有就请求NameServer(忽略经过Proxy层),需要注意的是,请求NameServer获取路由的这个过程是同步的,同一时间只有一个线程可以请求NameServer,需要等到NameServer返回之后才会执行后续的操作。拿到路由后再根据轮询策略选中其中一个broker进行发送。这就是发送消息客户端大致的逻辑,总体来说是还是比较简单的。
  • CONSUMER_SEND_MSG_BACK是消费者发过来的RETRY消息,本次重点不在这里,后续单独讲下这里。当消息到达Broker’端,先根据请求头构建出一个MappingContext对象,再把request对象封装成sendMessageContext;执行注册到sendMessageProcessor里面的钩子方法sendMessageBefore;之后根据是否是batch消息,如果是batch消息,执行sendBatchMessage,不是执行sendMessage方法,其实本质上还是一样的,只是sendBatchMessage中间构建的是messageExtBatch对象,而sendMessage构建的是messageExtBrokerInner对象。MessageExtBatch是MessageExtBrokerInner的子类,所以两者后续还是共用一套逻辑;然后根据是否开启异步写入执行asyncPutMessage或者putMessage,同步的putMessage实际上还是调用的asynPutMessage,只是要等到asyncPutMessage有返回值之后才执行后续的逻辑。我们这里以asyncPutMessage为主,还是先执行注册到SendMessageProcessor里面的钩子方法SendMessageAfter,然后再先判断时候是否是HA(高可用,高可用是需要等到消息写入slave节点成功之后才说明消息发送成功,一般使用在一些金融场景,对消息可靠性要求较高。),然后再然后分配offset(这个offset是由consumeQueue分配的),分配完offset之后,分配完了之后,再将消息体append到commitLog的分配的buf中,返回的状态码PUT_OK执行handleDiskFlush方法,如果是配置的是同步刷盘就等到刷盘成功后返回,如果是异步刷盘,wakeup对应的FlushManager就算写入完成。
  • 上述执行成功后,执行handleHA方法,如果是不是HA模式执行response PUT_OK,否则,构建一个GroupCommitRequest对象put到haService里面,对应slave节点写完最终才算发送成功。

参考:
· https://rocketmq.apache.org/
· 基于Apache Rocket 5.1.0
· https://github.com/apache/rocketmq

相关推荐

  1. ffmplay 解读

    2024-01-07 08:34:03       6 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-07 08:34:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-07 08:34:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-07 08:34:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-07 08:34:03       20 阅读

热门阅读

  1. 2024.1.5 Hadoop各组件工作原理,面试题

    2024-01-07 08:34:03       27 阅读
  2. c# 学习笔记 - LINQ

    2024-01-07 08:34:03       34 阅读
  3. ElasticSearch删除索引的命令

    2024-01-07 08:34:03       40 阅读
  4. 2024年学习计划

    2024-01-07 08:34:03       50 阅读
  5. 牛客网编程题——“求IBSN码”

    2024-01-07 08:34:03       37 阅读
  6. Mybatis缓存相关面试题有多卷

    2024-01-07 08:34:03       31 阅读
  7. Android NumberPicker使用

    2024-01-07 08:34:03       43 阅读
  8. SQL SELECT 语句

    2024-01-07 08:34:03       38 阅读
  9. 大模型查询工具助手之股票免费查询接口

    2024-01-07 08:34:03       36 阅读
  10. 数据结构 —— 手写排序算法

    2024-01-07 08:34:03       47 阅读
  11. centoss7安装mysql详细教程

    2024-01-07 08:34:03       44 阅读
  12. Linux | 20 个常用的 Linux 基本指令

    2024-01-07 08:34:03       28 阅读