RocketMQ 消息的存储和查询原理

RocketMQ-消息存储
如上图所示,消息的存储分为如下 3 个部分:

  1. CommitLog:日志,存储消息主体;
  2. ConsumerQueue:在 CommitLog 中根据 Topic 和 Tag 检索消息是非常低效的,因此引入了 ConsumerQueue 作为消费消息的索引,它保存的其实是 CommitLog 中存储的消息的指针。
  3. IndexFile:hash 索引,提供一种通过 key 或时间区间来查询消息的方法。

消息存储

页缓存与内存映射

  1. ConsumerQueue 存储的数据少、且是顺序读取(按顺序被消费),因此在page cache机制的预读取作用下,ConsumerQueue 文件的读取性能几乎接近于读内存。
  2. RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作,其主要利用 NIO 中的 FileChannel 模型将磁盘上的物理文件直接映射到用户态的内存地址中,避免了传统 IO 方式需要将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间拷贝带来的开销。

CommitLog

CommitLog负责存储所有消息的内容,做个类比可以理解为CommitLog就是MySQL中的主键索引,ConsumerQueue就是MySQL中的辅助索引,主键索引存储的是完整数据,而辅助索引存储的是对主键索引的指针。

CommitLog写入消息的流程

org.apache.rocketmq.store.CommitLog#asyncPutMessage

  1. 构建消息体
  2. 获取可用的MappedFile文件
    刚开始会取到最后一个文件,并对该文件加锁,如果加锁后发现文件满了,那么再尝试重新创建一个新的文件
  3. 一个一个字段写入CommitLog文件
    CommitLog.DefaultAppendMessageCallback#doAppend
  4. 刷盘
    org.apache.rocketmq.store.CommitLog#submitFlushRequest
    看Broker配置的是同步还是异步刷盘,如果是异步刷盘则只是唤醒异步刷盘线程。
    FlushRealTimeService#wakeup
  5. 主从同步
    org.apache.rocketmq.store.CommitLog#submitReplicaRequest
    一般Master默认情况下都是ASYNC_MASTER,这种模式下消息都是异步同步给Slave的。

DLedgerCommitLog写入流程

DLedger模式和普通模式的区别主要在于消息的高可靠性,写入数据时:
io.openmessaging.storage.dledger.DLedgerServer#handleAppend

  1. 写入本地文件;
  2. 请求其他Slave,等待大多数(quorum)返回写入成功。

Topic 和 ConsumerQueue

在消息队列中主题和队列一般是包含的关系,刚开始消息队列只是一个单纯的队列,后来为了应对多消费者重复消费的需求,出现了发布订阅、Topic的概念,一个Topic中的消息是可以被订阅者重复消费的。
在RocketMQ中Topic和队列之间的关系如下图所示:
RocketMQ-Topic和ConsumerQueue
如上图所示,RocketMQ 消息存储中有三个容易混淆的概念:

  • Broker:消息代理服务器;
  • Topic:消息主题,一个 Topic 分片是 Topic 在一个 Broker 上的子集,是 Queue 的逻辑集合;
    RocketMQ-Queue和消费者
  • Queue:队列,负载均衡过程中资源分配的基本单元,每个 Queue 只有一个消费者,但一个消费者可以消费多个 Queue,可以避免消费过程中多线程的竞争,提高多 Consumer 的并发消费效率。

在RocketMQ中,订阅者的概念是通过消费组(Consumer Group)来体现的,多个消费组都能消费主题中的一份完整消息,不同消费组之间消费进度彼此互不影响。
但是要注意,一个消费组内可能会有多个消费者,这些消费者不能重复消费一条消息,主要一个消息被其中一个消费者消费了,那同组的其他消费者就不会再收到这条消息。
那么如何实现多个消费组能重复消费一条消息的功能呢?
在RocketMQ中,Broker会为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。

为什么RocketMQ中都需要手动创建Topic?

1
2
# 自动创建topic,默认为false
autoCreateTopicEnable=false

使用rocketmq的时候,需要去控制台预先创建topic,为什么要这么设计,因为rocketmq topic的创建机制:
rocketmq在发送消息时,会先去获取topic的路由信息,如果topic是第一次发送消息,由于nameserver没有topic的路由信息,所以会再次以“TBW102”这个默认topic获取路由信息,假设broker都开启了自动创建开关,那么此时会获取所有broker的路由信息,消息的发送会根据负载算法选择其中一台Broker发送消息,消息到达broker后,发现本地没有该topic,会创建该topic的信息再塞进本地缓存中,同时会将topic路由信息注册到nameserver中,那么这样就会造成一个后果:以后所有该topic的消息,都将发送到这台broker上,如果该topic消息量非常大,会造成某个broker上负载过大,这样消息的存储就达不到负载均衡的目的了
因此,自动创建Topic开关一般只会在线下开启,线上是关闭的。

为什么RocketMQ中最好手动创建Consumer Group?

1
2
# 自动创建group,默认为true
autoCreateSubscriptionGroup=true

在开源版本RocketMQ中,这个开关是默认打开的,因为RocketMQ面向的是普通用户和量级不大的小公司,他们不想费精力去管控这些事情。
但是如果是一些大公司内部的自研版本,会把这个开关给闭掉,因为这个开关打开会带来潜在风险、资源管理风险,比如用户没有预估好用量、自动创建大量group,把name server搞挂。

消息刷盘

RocketMQ-刷盘
刷盘有两种策略:

  1. 同步刷盘,只有在消息真正持久化到磁盘中后 Broker 才会返回给 Producer 一个 ACK 响应。
    同步刷盘可以保证消息的可靠性,但是对性能会有比较大的影响。
  2. 异步刷盘,利用 OS 的 PageCache,只要消息写入 PageCache 即可将 ACK 返回给 Producer。
    消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。

刷盘策略见配置文件broker.conf,默认为异步刷盘:

1
flushDiskType = ASYNC_FLUSH

CommitLog的刷盘

异步刷盘的实现代码见:org.apache.rocketmq.store.CommitLog.FlushRealTimeService
可以看到,默认情况下是每500ms刷新一次,刷盘时先找到要刷盘的MappedFile,刷盘利用了NIO的MappedByteBuffer
一般情况下每次刷4页,并且每10S将未同步的页面全量同步一次。

ConsumeQueue的刷盘

实现代码见FlushConsumeQueueService
每秒执行一次,一般每次刷2页,Broker将退出时将所有缓存页全量刷一次。

消息查询

按 Message ID 查询消息

MessageID 包含:

  • 消息存储的主机地址(IP 和端口号);
  • 消息存储的 Commit Log 的 offset;

按 Message ID 查询的过程是:

  1. Client 端从 MessageID 中解析出 Broker 地址和 CommitLog 的 offset 地址后封装为一个 RPC 请求,通过 Remoting 通信层发送;
  2. Broker 端走的是 QueryMessageProcessor,读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。

按 Message Key 查询消息

主要是基于 RocketMQ 的 IndexFile 索引文件来实现的,IndexFile 的结构类似 JDK 中 HashMap 的实现:
RocketMQ-IndexFile结构

按 Tag 过滤消息

一般来说,Topic 可以用于区分业务类型,Tag 可以细分比如数据的类型、状态等。
在 RocketMQ 中,Topic 对应的是一些 ConsumerQueue 文件,而 Tag 是消息的属性,用于过滤消息,RocketMQ 有别于其他 MQ 中间件,Tag 是在 Consumer 端执行过滤的,这和其存储结构有关,Tag 存储在两个地方:

  1. ConsumerQueue 存储 Tag 的 hash 值;
  2. CommitLog 存储 Tag 的原值。

过滤时:

  1. 服务端接收 Pull 消息的请求后,先根据消息 Tag hash 值去过滤 ConsumerQueue 中的数据,得到消息偏移量等信息后到 CommitLog 中查出消息数据;
  2. Consumer 端接收到消息后,还需要对消息的原始 Tag 字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

消息堆积(积压)

在RocketMQ中,每条消息会记录一个它所在的偏移量offset,我们可以比较当前消息的offset和队列的总偏移量maxOffset来确定是否发生了消息堆积。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class PileUpConsumerTest {

public static void main(String[] args) throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("PileUpTopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
long offset = msg.getQueueOffset();
String maxOffset = msg.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if(diff > 10) {
// 处理消息堆积情况
System.out.println("消息堆积 maxOffset:" + maxOffset + " currOffset:" + offset + " 消息堆积个数:" + diff);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
System.out.println("正常消费 maxOffset:" + maxOffset + " currOffset:" + offset + " 消息堆积个数:" + diff);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

或者可以通过RocketMQ的Console界面来检查堆积情况:
RocketMQ-Console查看消息堆积1
RocketMQ-Console查看消息堆积2

发生消息堆积的主要原因是消息生产速度远远大于消息消费速度:

  • 如果堆积不严重,可以放着让消费者继续慢慢消费完;
  • 如果堆积严重且消息生产速度一直处于远远高于消费速度的程度,可以加机器来提升并发消费能力,但是单纯地加机器不能改善这种堆积情况,因为每个队列实际上只能被一个Consumer消费
    可以先将堆积的消息发给另一个Topic,该Topic有更多Consumer。
    或者,可以选择丢弃不重要的消息,即仅仅记录日志,而不真正消费,从而在不影响消息完整性的前提下,达到处理消息堆积问题的目的。
    解决消息堆积问题的根本还是优化系统本身,比如是赶上大促或抢购这种场景,就很容易引起消息堆积,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

RocketMQ怎么保证高效读消息

  • 发送消息时,生产者端的消息确实是顺序写入CommitLog;
  • 订阅消息时,消费者端也是顺序读取ConsumeQueue,然而根据其中的起始物理位置偏移量offset读取消息真实内容却是随机读取CommitLog
  • RocketMQ通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型直接将磁盘上的物理文件直接映射到用户态的内存地址中,这样程序就好像可以直接从内存中完成对文件读/写操作一样。只有当缺页中断发生时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小一般需要限制在1.5~2G以下),采用Mmap的方式其读/写的效率和性能都非常高。

优点:

  1. 对ConsumeQueue的读取是顺序读取,在pageCache机制的预读取作用下,ConsumeQueue的读性能会比较高近乎内存,即使在有消息堆积情况下也不会影响性能。
  2. ConsumeQueue消息逻辑队列较为轻量级;
  3. 对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高;

缺点:

  1. 对于CommitLog来说写入消息虽然是顺序写,但是读却变成了完全的随机读;
  2. Consumer端订阅消费一条消息,需要先读ConsumeQueue,再读CommitLog,一定程度上增加了开销;

QA

我们公司有多个测试环境,消费消息时通过TAG来区分环境,结果导致有的消息消费不到,这是发生甚么事了?

这些消息有着相同的Topic,并且Consumer都有着相同的group。

  1. 根据Rebalance原理,每个Consumer都会分到固定的几个Queue,多个环境的Consumer因为Topic、group相同,它们会均分这些Queue。
  2. Producer发送消息时,由于负载均衡机制,不确定会发给哪个Queue,也就是说A环境的TagA消息可能会发给任意一个Queue,B环境的TagB消息同理。

也就是说,有可能Producer会把打上TagA的A环境消息发给任意一个Queue,这个Queue并不是由A环境的Consumer消费的,这个消息会在Consumer端被过滤掉,并且随着offset的更新而被忽略掉,相当于丢失了。
根据tag划分环境导致消息被过滤的问题