Rocketmq学习4——Broker消息持久化原理源码浅析
- 作者: 五速梦信息网
- 时间: 2026年04月04日 13:29
一丶前言
- 本地缓存+rpc 请求namesever + 定时刷新,topic路由信息
- 负载均衡的选择一个Broker进行发送,还支持【故障转移(即支持规避短时间内发送失败的broker)】
- 基于netty实现的rpc进行消息发送
这一篇我们将学习,消息是如何持久化在broker上的
二丶概述
消息存储的流程如下:
- 发送消息: 生产者(Producer)发送消息到 Broker。
- 消息存储:Broker 接收到消息后,将消息存储在消息存储文件中,通常是 CommitLog 文件。 RocketMQ 使用了内存映射文件(MappedByteBuffer)来提高文件的读写速度,它可以将文件直接映射到虚拟内存,减少了文件 I/O 操作。
- 写入磁盘:RocketMQ 使用了顺序写的方式将消息写入到 CommitLog,这是因为顺序写磁盘的速度远快于随机写。
- 索引文件更新:为了提高查询效率,消息会被索引,索引信息存储在 ConsumerQueue 和 IndexFile 中。ConsumerQueue 存储了消息在 CommitLog 中的偏移量,而 IndexFile 存储了关键字到消息偏移量的映射。
- 数据刷盘:RocketMQ 提供两种消息刷盘方式:
- 同步刷盘和异步刷盘。同步刷盘会在消息确实写入磁盘后再向生产者确认消息发送成功,
- 异步刷盘则在写入操作系统 PageCache 后就确认,依靠操作系统异步将数据刷写到磁盘。
- HA 机制:为了保证数据的高可用性,RocketMQ 还提供了主从同步机制,从服务器可以从主服务器上复制数据,确保在主服务器宕机时,从服务器可以接管消息服务。
三丶broker是接收消息发送请求
BrokerControllerremotingServer

SEND_MESSAGE
NettyServerHandlerByteToMessageDecoder(rocketmq根据自己的协议实现了解码器——NettyDecoder)

其中会根据请求类型,获取到对应的Processor,消息发送一般最后由SendMessageProcessor处理
四丶rocketmq基于netty实现的远程服务处理请求的流程

SendMessageProcessor接收到请求的时候,不是立马在当前线程进行处理,而是将封装成一个任务,提交到业务线程池。
在提交之前,还是会进行当前broker是否关闭中,是否拒绝请求的判断。
如下是处理请求的大致流程

可看到绿色部分才是真正处理请求的部分,处理后将响应写到netty的channel中,实习响应!
五丶SendMessageProcessor 处理请求大致流程

rocketmq留了一堆扩展的钩子,最终在sendMessage方法中进行一系列的校验,包装消息为MessageExtBrokerInner,然后进行消息存储流程,源码如下

asyncPutMessage
看到这里你可能会疑问,那么同步消息发送者岂不是收不到响应,同步消息消费者还会block住么?
还是会的,因为

只有在MessageStore异步存储完消息后,才会回调doResponse写回响应!
这样做的目的在于将业务处理Executor,和消息存储Executor进行解耦
六丶消息持久化

可看到最终使用CommitLog进行消息存储
1.消息持久化前置流程

如上主要是进行一些校验,其中有两层锁
topicQueueKey
2.消息持久化

2.1 MappedFile文件创建

这里会构建出两个文件路径,这意味着会一次性创建两个文件,可看到文件名称是偏移量的大小——比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推
下面我们看看文件创建的源码:

requestTable
然后再背后存在一个线程,不断从队列中拿任务进行处理

可以看到MappedFile支持SPI机制,但是这里的代码让人作呕
new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool())
new DefaultMappedFile(req.getFilePath(), req.getFileSize())
不适应堆外内存缓冲

使用fileChannel.map创建mappedByteBuffer
使用堆外写缓冲

会从TransientStorePool中获取一个ByteBuffer

这里堆外缓冲是TransientStorePool初始化时申请的

2.2 文件预热
至此完成了文件的创建,rocktmq还会进行文件的预热:

预热的过程其实就是每隔4K写入0值,这样做的好处是:
提高文件的访问效率,尤其是在使用内存映射(Memory Mapped File,MMF)技术时。内存映射文件技术能将文件直接映射到操作系统的虚拟内存中,进而可以像访问内存一样访问这些文件,这样可以显著提高文件I/O的效率。
预热(mappedFile)的过程,主要是提前将文件内容加载到物理内存中,确保在实际使用这些文件时,能够避免或减少磁盘I/O带来的延迟。因为当进程首次访问内存映射文件中的某个部分时,如果这部分数据还没有加载到物理内存中,操作系统需要从磁盘中读取数据到物理内存,这个过程称为缺页中断(page fault)。缺页中断会导致一定的延迟。
进行预热主要是通过以下几种方式:
mlock
2.3 消息写入

写入的时候会获取ByteBuffer,如下:如果具备写堆外内存缓冲,那么使用堆外内存,反之使用mmap生成的byteBuffer

最终就是将消息按照消息格式put到ByteBuffer中
2.4 消息刷盘
当消息写入到ByteBuffer后,会进行持久化 和高可用同步副本

这里我们看下刷盘的源码

可以看到根据是否由堆外写缓冲和刷盘方式,会使用不同的service进行wakeup实现刷盘:
MappedFileTransientStorePoolByteBufferMappedFileMappedByteBufferMappedByteBuffer
RocketMQ通过这种方式实现了一种内存双写的机制:先写入堆外内存,然后再提交到内存映射文件中。这样做可以利用堆外内存池做一层缓冲,提高写入效率,同时减少JVM垃圾回收的压力。
MappedByteBufferMappedByteBuffer.force()MappedByteBufferFlushRealTimeServiceMappedByteBuffer.force()
在RocketMQ中,刷盘策略可以根据数据的重要性和对性能的要求来选择。如果数据安 全性要求极高,可以选择同步刷盘;如果追求高吞吐量,可以选择异步刷盘。
2.4.1 同步刷盘
同步刷盘,rocketmq的消息可以设置是否等待消息存储完成,如下

- 如果设置了等待刷盘成功,那么会向GroupCommitService中提交刷盘请求,然后返回对应future
- 如果没有,那么唤醒刷盘线程,然后返回
GroupCommitService
GroupCommitRequestGroupCommitService
GroupCommitServiceputRequestCountDownLatch.await()
GroupCommitServiceCommitLogGroupCommitRequestwakeupCustomerCountDownLatch
如下是GroupCommitService处理刷盘,和异步刷盘的不同在于其会设置刷盘future状态,从而让等待刷盘的线程被唤醒

2.4.3 异步刷盘

异步刷盘针对是否开启了堆外写缓冲会调用不同的Service
GroupCommitServiceCommitRealTimeServiceCommitLog
七丶高可用
让我们回到CommitLog#asyncPutMessage方法,可以看到下面有一个高可用的处理(needHandleHA)

那什么是否需要刷新到其他副本昵?

Message必须setWaitStoreMsgOK(true),且消息存储表明需要副本,并且角色是SYNC_MASTER
那么高可用如何实现的?

下面是RocketMq高可用机制:
RocketMQ 通过其 HAService(高可用性服务)实现了主从同步复制,确保了消息的高可用性。它的工作原理是在主Broker(Master)上的CommitLog更新之后,这些更新会被复制到一个或多个从Broker(Slave)上。这样,即使主Broker发生故障,从Broker也可以接管工作,保证消息服务的可用性。
HAServiceHAConnection
1.Master 端
HAService
HAServiceHAServiceHAConnectionHAServiceHAConnectionHAServiceHAConnection
2. Slave 端
HAConnection
HAConnection
通过这种方式,RocketMQ的HAService确保了消息数据在Master和Slave之间实时同步,即使在Master出现故障的情况下,也能保证服务的高可用性。
需要注意的是,这种主从同步机制虽然提供了高可用性,但它可能会对消息的发送性能产生一定影响,因为Master需要在将消息存储到本地CommitLog并且同步到从Broker之后才能向生产者发送确认响应。此外,如果从Broker落后于Master太多,也有可能影响整体的同步效率。
brokerRoleSYNC_MASTER
八丶总结
感觉rocketmq代码写的很垃圾,但是功能还是实现了的。其落盘+副本同步,再很多其他中间件中也是适用的
1.顺序写
CommitLog 的写入被视为磁盘的顺序写,主要是因为 RocketMQ 采用了顺序向 CommitLog 文件追加消息的方式进行数据记录。消息生产者产生的消息按照接受的顺序依次追加到 CommitLog 文件的尾部,而不是随机分散地写到文件的不同位置。
顺序写为什么比随机写快:
1.机械硬盘(HDD)的顺序写性能通常远高于随机写,因为顺序写不需要硬盘头移动去查找不同的写入位置,而是连续在同一轨道上写入数据,大大减少了寻址时间。即使在固态硬盘(SSD)中,顺序写也有一些优势,因为 SSD 的写入操作涉及擦除以前的数据块然后再写入新数据。顺序写可以减少数据移动和合并操作,提高了 SSD 的写入效率。
2.文件系统一般也对顺序写进行了优化,能够更好地利用缓存和预取策略,提高写入效率。
3.顺序写有助于减少 I/O 操作的开销,增加了操作的预测性,使操作系统和硬件能够对写入进行优化。
2.mmap内存映射文件
内存映射文件(Memory-Mapped File,简称 mmap)
- 内存映射:mmap 通过将磁盘上的文件映射到虚拟内存的方式,使得应用程序可以像访问内存一样直接读写文件区域,避免了传统的文件I/O操作(read/write)中用户空间和内核空间之间上下文切换的开销。
- 操作系统缓存:mmap 的数据可以被操作系统自动地缓存,提高了数据访问速度。操作系统会负责将修改过的内存数据同步回文件,减少了显式的读写操作
3.文件预热
预热(mappedFile)的过程,主要是提前将文件内容加载到物理内存中,确保在实际使用这些文件时,能够避免或减少磁盘I/O带来的延迟。因为当进程首次访问内存映射文件中的某个部分时,如果这部分数据还没有加载到物理内存中,操作系统需要从磁盘中读取数据到物理内存,这个过程称为缺页中断(page fault)。缺页中断会导致一定的延迟。
4.堆外内存写缓冲
rocketmq支持开启堆外写缓冲,优先写到DirectByteBuffer中,然后使用FileChannel#write刷新到pageCache,这样做好处是可以将多个消息先聚合到堆外byteBuffer然后一次性写入到page'Cache,减少系统调用。
5.HA机制
同步到副本,避免master宕机时消息丢失。
弊端是如果写master成功,同步副本失败,消息生产者maybe重试,导致消息重复,以及同步副本带来的延迟降低了系统的吞吐量。
相关文章
-
RocketMQ中PullConsumer的消息拉取源码分析
RocketMQ中PullConsumer的消息拉取源码分析
- 互联网
- 2026年04月04日
-
rollup 本地启动
rollup 本地启动
- 互联网
- 2026年04月04日
-
ROP寻找gadget
ROP寻找gadget
- 互联网
- 2026年04月04日
-
robotframework日期 不留时间
robotframework日期 不留时间
- 互联网
- 2026年04月04日
-
roadhog版本的影响
roadhog版本的影响
- 互联网
- 2026年04月04日
-
rn info.plist 文件
rn info.plist 文件
- 互联网
- 2026年04月04日





