RocketMQ 消息发送流程
先介绍RocketMQ有哪些服务,然后介绍消息如何从Producer发送到Consumer。
一个消息的发送轨迹
Producer 端发送
Producer端的发送分为同步、异步、单向三种,下面以同步发送为例,异步发送只是把发送任务放到了一个线程池中执行,单向和同步的区别只是单向发出去后不会再管发没发成功()。
- Producer 端调发送接口
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message)
发送前需要设置 NameServer 的地址信息,这样发消息前才能根据 NameServer 中的服务注册表信息来进行消息的路由。 - 找 Topic 的路由信息
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo()
Client 需要根据 Topic 来决定应该将消息发往哪台 Broker,这部分路由信息是保存在 NameServer 上的。org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(topic)
第一次获取某个 Topic 的路由信息、或路由信息发生变化时,需要刷新一次本地路由表信息,并更新发布和订阅信息。org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateNameserverChannel
随机选一个 NameServer 获取 Topic 路由信息。 - 选一个 ConsumerQueue 发送
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
一个 Topic 包含多个 ConsumerQueue,客户端的负载均衡机制会从中选择一个发送。 - 发送
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
获取 Broker 的地址,调 Netty 接口发送。
Server 端存储
- 启动Server
org.apache.rocketmq.broker.BrokerStartup#main
- 注册Processor
org.apache.rocketmq.broker.BrokerController#registerProcessor
- 启动 Netty 监听器
org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
Netty 中的线程模型总而言之:- 一个线程负责监听 TCP 请求(
eventLoopGroupBoss
),建立好连接后丢给Reactor 线程池(eventLoopGroupSelector
); - Reactor 负责将 socket 注册到 selector,当监听到网络数据后,再丢给Worker 线程池(
defaultEventExecutorGroup
); - Worker 线程在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,然后根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(
sendMessageExecutor
)。org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0
之后,利用 Netty 的 Handler 链处理请求,实际处理命令的 Handler 为NettyServerHandler
。
- 一个线程负责监听 TCP 请求(
- 处理请求
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
每个请求对应一个NettyRequestProcessor
和一个线程池。 - 开始实际发送逻辑
org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncSendMessage
获取MessageStore,MessageStore负责消息的存储。 - 处理存储请求
org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
存储消息是最常见的请求,当然RocketMQ还支持很多其他命令。org.apache.rocketmq.store.CommitLog#asyncPutMessage
RocketMQ并不会立刻将消息写入到磁盘(CommitLog文件),而是使用MappedFile
先写入PageCache,然后异步同步。org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
->org.apache.rocketmq.store.DefaultMessageStore#doDispatch
存储的消息也并不会立刻被写入到索引文件(IndexFile)和队列文件(ConsumeQueue),而是由一个线程异步同步。
Consumer 端拉取
Consumer端的消费线程模型如下图所示:
- 启动客户端
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
可以看到,启动了很多实例,重点需要关注的是最后启动的MQClientInstance
实例。 - 客户端实例的初始化
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
使用定时任务执行从 NameServer 拉取路由信息(包括 Topic 下的 Queue 和 Consumer 列表)、向 Broker 发送心跳等任务。org.apache.rocketmq.client.impl.consumer.PullMessageService#start
启动一个线程轮询拉消息。org.apache.rocketmq.client.impl.consumer.RebalanceService#start
重新负载均衡。 - 接收消息
RebalanceService#start
中启动了一个拉取线程
可见Consumer端Pull消息是单线程的 - 提交处理
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
虽然Pull消息是单线程的,但是消费消息是多线程的,且多个group共享同一个线程池。线程池定义:corePoolSize=20, maxPoolSize=64,等待队列是LinkedBlockingQueue
- 处理消费结果
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
可见有CONSUME_SUCCESS
和RECONSUME_LATER
这两种消费结果(报错也被当作RECONSUME_LATER
处理),这里最核心逻辑就是更新offset确定消费进度。
QA、消费者阻塞
如上所示,客户端通过线程池来处理消息,那么当线程被阻塞后还会继续消费消息吗?
不会,如果处理中的消息量达到了阈值,客户端会阻塞拉取线程:
1 | long cachedMessageCount = processQueue.getMsgCount().get(); |
QA2、如果客户端group消费线程池是共用的,那么一个group消费慢会影响其他group的消费吗?
是有可能的,需要分2阶段来分析
- 拉消息阶段,每个group的拉消息请求数都是公平的
见代码RebalanceImpl.updateProcessQueueTableInRebalance
- 消费消息阶段,除非某个group的消息消费速度过慢了,把所有的线程都阻塞了,才会影响其他group消息的消费
参考:https://blog.csdn.net/b1303110335/article/details/118755341
负载均衡
Producer 端负载均衡
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
Producer 端发送消息前会先从 NameServer 拿到 Topic 的路由信息,即 Topic 有几个 Queue、及这些 Queue 位于哪个 Broker。之后,客户端会根据负载均衡和容错策略从中选择一个发送。
Consumer 端负载均衡
RocketMQ 中 Consumer 端有两种消费模式(Push/Pull),它们本质上都是基于拉模式来获取消息的,Push 模式只是对 Pull 模式的一种封装:
- Pull:向服务器直连发请求拉取消息。
- Push:每次拉取后立刻向服务器再次尝试拉取消息,如果没拉取到则延迟一段时间再继续拉取。
Consumer 端的负载均衡体现在 RocketMQ 对 Queue 的分配:
- Consumer 端心跳
Consumer 启动后会通过定时任务不断向集群中所有 Broker 实例发送心跳包,Broker 会将 Consumer 注册到缓存中;
另还有个定时任务从 NameServer 拉取 Topic 下的 Queue 和 Consumer 列表,便于之后的负载均衡。 - Consumer 端负载均衡
RebalanceImpl#rebalanceByTopic
获取该 Topic 下的消息队列和消费者集合,按消息队列分配策略算法(AllocateMessageQueueStrategy
)将每个消息队列分配给消费者,默认为平均分配。
这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端 Consumer 排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range,最后遍历整个 range 而计算出当前 Consumer 端应该分配到的记录(这里即为:MessageQueue)。
其中的源码如下所示:举个例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public static List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 当前Consumer在第几页
int index = cidAll.indexOf(currentCID);
// 如果是最后一页,取余数
int mod = mqAll.size() % cidAll.size();
// 每页数量
int averageSize = mqAll.size() <= cidAll.size() ?
1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1
: mqAll.size() / cidAll.size());
// 当前Consumer负责的第一个Queue所处的下标
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 当前Consumer需要负责几个Queue
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
有32个消息队列(ConsumerQueue),7个Consumer,我们求出的值分别是:
- mod=4
- 前4台Consumer求出averageSize=5,后3台Consumer求出averageSize=4
- 这7台Consumer求出的startIndex分别为:0 5 10 15 20 24 28
- range分别为:5 5 5 5 4 4 4
参考
- 从年末生产故障解锁RocketMQ集群部署的最佳实践
硬件影响RocketMQ可用性,背景是NameServer所在宿主机假死,TCP却未断开,客户端一直使用该NameServer的服务一直超时,因此在宕机期间一直使用的是之前缓存的服务列表。
NameServer与Broker部署在一起,宿主机宕机后两个服务都不能用了,但是客户端没有更新注册表,因此一直将请求发给宕掉的Broker。
解决办法是把NameServer和Broker分开来部署。