从零开始读RocketMq源码(三)Broker存储Message流程解析

目录

前言

准备

消息载体CommitLog

文件持久化位置 

源码解析

broker消息对象MessageExtBrokerInner

异步存储message

CommitLog的真相

创建MappedFile文件

加入异步刷盘队列

Message异步存储MappedByteBuffer

总结


前言

在面试中我们经常会听到这样的回答,生产者将message发送给broker服务,然后消费者从broker中获取消息并消费,为了保证message在broker服务中不丢失,mq会对消息数据进行持久化到磁盘中。那么message到达broker服务后是如何进行存储并持久化到磁盘中的呢?这就是本篇要学习的内容。

准备

源码地址:https://github.com/apache/rocketmq

目前最新版本为:5.2.0

那么我们在idea上切换分支为 release-5.2.0

消息载体CommitLog

该对象是broker服务接收到message后进行存储的数据对象,一般就把存储消息的文件就称为commitLog文件也就是最终存储磁盘上的数据文件。

大致的message流向如图:

根据源码可以知道,一个commitLog文件最大存储1G数据,文件写满了,则会写入下一个文件中

文件持久化位置 

commitlog文件的持久化存放的位置是通过broker.conf配置文件中storePathCommitLog配置

storePathCommitLog = /Users/leonsh/rocketmqnamesrv/store/commitlog

最后生成的文件为这样

文件命名

查看上面图片可知文件的名称是一串数字20个0组成,因为文件名称是按照偏移量offset来命名的,

因为这是第一个文件所以offset为0,补全20位,所以文件名称为20个0

,以此类推第二个文件名称则为00000000001073741824

上面说过一个commitlog文件最大存储1G,而1G=1024*1024*1024=1073741824bit,这就是第二个文件的偏移量

源码解析

前面说到Producer发送message到broker后,broker会对接收的message请求进行处理

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:87
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)

上面的方法名中顾名思义就是处理请求的,并且所在的文件命名SendMessageProcessor也说明了该类的作用。那么我们就从该方法深入源码中

看方法引用位置我们会发现许多地方调用了该方法,先抛开前面broker如何接收的,反正最后消息会到达这里,从该方法开始就是broker处理message的核心流程也是本篇学习的重点

broker消息对象MessageExtBrokerInner

MessageExtBrokerInner该对象就是用来后续对message处理的封装

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
//获取请求对象中的消息体
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
    queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
//初始化消息对象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
  • requestHeader 该对象就是在上一篇中讲到的发送message的消息请求头
  • 从请求头中获取设置的队列id,如果没有设置,则会从对应的topic中随机获取一个randomQueueId()
  • 从请求头中获取topic名称,通过名称再去获取broker中存储的topic对应的数据对象,深入源码会发现,broker中存储topic数据也是使用的map,ConcurrentMap<String, TopicConfig> topicConfigTable
  • 最后就是创建MessageExtBrokerInner对象并设值

异步存储message

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) {
    //事务消息
    asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    //普通消息
    asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}

或许大家和博主开始一样都有一个疑惑,我们生产者发送的是同步消息,为何到了broker却是异常存储呢?

1.其实生产者发送同步消息和broker异步存储都是相互独立互不干扰的,broker异步存储只是为了提高mq接收消息的写入性能吞吐量broker异步存储会将写入内存的message进行异步刷盘。

2.就算broker是异步存储但也不会立即返回结果给生产者,需要等待broker异步刷盘成功才会返回结果给生产者,通过broker提供的CompletableFuture机制实现。

什么,看完解释还是有点懵,有点抽象,我们继续向下深入源码,一步一步解开疑惑,我相信看完后面的解析同样会豁然开朗的!

CommitLog的真相

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

到这里,本文开头提到的commitLog对象终于出现了,查看该源码可知,commitlog对象中定义了一个MappedFileQueue对象这个对象又是做什么的,我们继续深入源码

//源码位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行数:942
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

深入该方法,大概意思就是从MappedFileQueue对象中的CopyOnWriteArrayList<MappedFile> mappedFiles集合中取出里面的最后一个MappedFile对象,至此赢来大结局MappedFile对象才是最终映射到磁盘文件的,而CommitLog可以理解为MappedFile对象的外层封装。但落到磁盘上的文件我们依然称为commitLog文件

扩展:

CopyOnWriteArrayList 是 Java 中的一种线程安全的 List 实现,属于 java.util.concurrent 包

读操作:不需要加锁,直接操作底层数组,底层数组在写操作时是一个副本,读操作不会影响正在进行的写操作,能够保证高效的并发读性能。

写操作:会创建底层数组的一个新的副本,对这个副本进行修改, 修改完成后,新的副本会替换原来的数组

创建MappedFile文件

//源码位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行数:1001
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    if (isCloseReadAhead()) {
        setFileReadMode(mappedFile, LibC.MADV_RANDOM);
    }
}

因为broker是启动后首次存储数据,所以上面获取出来的mappedFile一定为空则进入if代码块

因此偏移量也是初始值0

生成MappedFile文件路径名称

//源码位置
//包名:org.apache.rocketmq.store
//文件:MappedFileQueue
//行数:345
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
                                                                                    + this.mappedFileSize);
  • this.storePath:该字段就是前面在broker.conf文件中配置的文件地址
  • File.separator:分隔符
  • UtilAll.offset2FileName(createOffset):生成20位数字组成的文件名称,当前createOffset=0。

为何会生成两个地址nextFilePathnextNextFilePath呢?

因为mq在生成当前需要使用的文件时同时生成下一个使用的文件,当第一个文件存储满后,直接使用下一个文件,减小了创建文件的开销,提高mq的性能。所以会同时生成2个文件

那么问题来了,为何本文开头生成的文件怎么只有一个?

我们查看源码提交记录可知,nextNextFilePath第二个文件是2021年9月才新增的

查看rocketMq在github上各个版本的发布时间,2021年9月并没有发布新版本,但是2021年10月发布了rocketmq-all-4.9.2

那么由此可得,rocketMq同时创建2个文件从版本4.9.2开始支持,之前的版本都只会创建1个文件

因为博主的broker服务是通过docker镜像启动的,但是查看镜像版本显示的确为最新版本

其实这只是rocketMq镜像的版本,而我们看的是镜像中使用rocketMq框架版本

执行命令查看镜像的详细信息

docker inspect apacherocketmq/rocketmq:latest

由此可得博主的rocketMq版本为:4.6.0,所以只会创建一个commitLog文件

加入异步刷盘队列

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:62
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//...
//加入队列触发异步刷盘操作
boolean offerOK = this.requestQueue.offer(nextReq);
//...
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
//...
boolean offerOK = this.requestQueue.offer(nextNextReq);
  • AllocateRequest:就是message异步存储请求最后的封装
  • this.requestTable:也是一个map对象 ConcurrentMap<String, AllocateRequest> requestTable;key为文件的路径,value则为AllocateRequest
  • this.requestQueue这是一个队列PriorityBlockingQueue<AllocateRequest> requestQueue队列元素为AllocateRequest

PriorityBlockingQueue是如何做到异步刷盘的呢?

该队列就是为broker实现异步存储核心,可能大家对这个队列比较陌生

它是Java 中 java.util.concurrent 包提供的一个线程安全的优先级队列。它基于优先级堆实现,能够保证元素按照自然顺序或者指定的比较器顺序进行排序

因为它是一个队列那么我们首先就会想到生产者消费者,那么就起到了异步解耦的作用

他有两个非常重要的方法:

  • offer(): 将一个元素插入到队列中
  • take(): 从队列中获取并移除元素 由于 PriorityBlockingQueue 是一个阻塞队列,如果队列为空,take 方法会一直阻塞直到有元素可用

总结:由上面我们知道offer()一般用于生产者调用,而take()则是消费者调用,当队列为空时消费者线程会一直阻塞,只要队列中存入对象,消费者就会感知到并消费。可以理解为消费者和生产者共享PriorityBlockingQueue对象

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:99
AllocateRequest result = this.requestTable.get(nextFilePath);
//...
//阻塞等待刷盘结果
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);

上面源码的作用就是等待异步刷盘结果

  • 第一段就是取出之前存入的第一个请求对象AllocateRequest
  • 第二段则是判断异步刷盘是否完成,成功则返回,还没有处理完则一直阻塞,直到达到超时时间waitTimeOut

result.getCountDownLatch().await为何能做到阻塞等待结果呢?

进入AllocateRequest对象中可知,操作的是这个对象CountDownLatch countDownLatch = new CountDownLatch(1)

CountDownLatch或许大家不太熟悉,但ReentrantLock大家并不陌生吧,面试中经常问到,他们同属于java并发包JUC( java.util.concurrent 下的对象.

概念:它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。它是通过一个计数器实现的,该计数器初始化为一个给定的值。每当一个线程完成了它的一项操作后,这个计数器就递减。当计数器的值到达零时,等待在这个计数器上的线程将被唤醒并继续执行

总结:通过源码我们看到AllocateRequest被创建时里面属性CountDownLatch中计数器默认就是1所以需要一直等待被修改为0时才会继续执行后续逻辑,那就是等待异步刷盘完成。

Message异步存储MappedByteBuffer

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:155
AllocateRequest req = null;
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());

该源码就是对之前加入队列的AllocateRequest取出来,并执行后续的存储操作,可以说就是消费者消费的地方,我们可以结合源码上下文代码可以知道,所在的类的顶级继承类是Runnable,而上面代码所在方法就是被重写的run()方法调用,可以认为消费者是在单独的一个线程中执行的。

获取缓冲区

//源码位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行数:607
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();

被操作的对象是MappedByteBuffer

MappedByteBuffer是什么?

是 Java NIO(New Input/Output)中的一个类,它允许将文件直接映射到内存中,从而提高文件的读写效率。RocketMQ 使用 MappedByteBuffer 来管理 CommitLog 文件,以实现高效的消息存储和检索通过将文件映射到内存,RocketMQ 可以直接操作内存数据,而无需频繁的磁盘 I/O 操作。

MappedByteBuffer也是mmap的一种实现方式

什么是mmap?

mmap(内存映射文件)是一种将文件内容映射到进程的地址空间的技术。这样一来,文件内容就可以像访问内存一样被读写,从而显著提高 I/O 操作的效率。

调用mappedByteBuffer.slice()方法的作用是什么?

用于创建一个新的缓冲区,该缓冲区与原始缓冲区共享相同的底层内存,但具有独立的位置、限制和标记。这在需要操作内存映射文件的某一部分时非常有用,而不影响整个映射文件的其他部分。

MappedByteBuffer有两大特点:

  • 延迟写入:数据写入 MappedByteBuffer 时,实际上是写入了内存中的映射区域,操作系统会在合适的时候将这些数据同步到磁盘,而不是立即进行磁盘 I/O 操作。
  • 强制刷新:为了确保数据的一致性和持久性,MappedByteBuffer 提供了 force() 方法,可以将内存中的修改强制刷新到磁盘
//源码位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行数:611
byteBuffer.put((int) i, (byte) 0);
//...
mappedByteBuffer.force();

总结:那么在RocketMQ 中,MappedFile 类通过使用 MappedByteBuffer 来管理 CommitLog 文件,并且使用 slice() 方法来创建子缓冲区进行局部操作,通过延迟写入减少了频繁的磁盘 I/O 操作,定期调用 force() 方法,将内存中的数据同步到磁盘,减少数据丢失的风险。这样可以提高性能和灵活性,特别是在处理大量消息时。


内存数据的刷盘过程本篇就不在深究,只要知道是通过MappedByteBuffer对延迟写入配置相关策略,并在设定的时期将内存数据写入磁盘文件中就可以了


基于上面所有内容重新修改一版简易的流程图如下

总结

本篇涉及到的知识面比较广,在broker存储message中出现了许多我们在日常开发中并不常见但功能强大的对象,比如PriorityBlockingQueueCountDownLatchMappedByteBufferRocketMq正是合理的运用了他们,从而造就了rocketMq本身这款优秀的消息队列框架,这也是我们读源码所要学习的。下一篇我们将学习RocketMq的“大脑”NameServer!

相关推荐

  1. Dubbo解读-Consumer调用流程

    2024-07-11 03:44:01       28 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-11 03:44:01       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-11 03:44:01       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-11 03:44:01       58 阅读
  4. Python语言-面向对象

    2024-07-11 03:44:01       69 阅读

热门阅读

  1. QT入门详解含源码)

    2024-07-11 03:44:01       27 阅读
  2. 前端程序员常用快捷键

    2024-07-11 03:44:01       24 阅读
  3. React@16.x(54)Redux@4.x(3)- reducer

    2024-07-11 03:44:01       25 阅读
  4. Linux 安装 docker-compose

    2024-07-11 03:44:01       20 阅读
  5. PostgreSQL

    2024-07-11 03:44:01       20 阅读
  6. Perl词法切分器:文本解析的瑞士军刀

    2024-07-11 03:44:01       18 阅读
  7. 解决Spring Boot中的安全漏洞与防护策略

    2024-07-11 03:44:01       20 阅读