RocketMQ 消息发送流程

先介绍RocketMQ有哪些服务,然后介绍消息如何从Producer发送到Consumer。

一个消息的发送轨迹

Producer 端发送

Producer端的发送分为同步异步单向三种,下面以同步发送为例,异步发送只是把发送任务放到了一个线程池中执行,单向和同步的区别只是单向发出去后不会再管发没发成功()。

  1. Producer 端调发送接口
    org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message)
    发送前需要设置 NameServer 的地址信息,这样发消息前才能根据 NameServer 中的服务注册表信息来进行消息的路由。
  2. 找 Topic 的路由信息
    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo()
    Client 需要根据 Topic 来决定应该将消息发往哪台 Broker,这部分路由信息是保存在 NameServer 上的。
    org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(topic)
    第一次获取某个 Topic 的路由信息、或路由信息发生变化时,需要刷新一次本地路由表信息,并更新发布和订阅信息。
    org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateNameserverChannel
    随机选一个 NameServer 获取 Topic 路由信息。
  3. 选一个 ConsumerQueue 发送
    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
    一个 Topic 包含多个 ConsumerQueue,客户端的负载均衡机制会从中选择一个发送
  4. 发送
    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
    获取 Broker 的地址,调 Netty 接口发送。

Server 端存储

RocketMQ服务端代码层次结构

  1. 启动Server
    org.apache.rocketmq.broker.BrokerStartup#main
  2. 注册Processor
    org.apache.rocketmq.broker.BrokerController#registerProcessor
  3. 启动 Netty 监听器
    org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
    Netty 中的线程模型总而言之:
    • 一个线程负责监听 TCP 请求(eventLoopGroupBoss),建立好连接后丢给Reactor 线程池eventLoopGroupSelector);
    • Reactor 负责将 socket 注册到 selector,当监听到网络数据后,再丢给Worker 线程池defaultEventExecutorGroup);
    • Worker 线程在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,然后根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor)。
      org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0
      之后,利用 Netty 的 Handler 链处理请求,实际处理命令的 Handler 为NettyServerHandler
  4. 处理请求
    org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
    每个请求对应一个NettyRequestProcessor和一个线程池。
  5. 开始实际发送逻辑
    org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncSendMessage
    获取MessageStore,MessageStore负责消息的存储。
  6. 处理存储请求
    org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
    存储消息是最常见的请求,当然RocketMQ还支持很多其他命令。
    org.apache.rocketmq.store.CommitLog#asyncPutMessage
    RocketMQ并不会立刻将消息写入到磁盘(CommitLog文件),而是使用MappedFile先写入PageCache,然后异步同步。
    org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput -> org.apache.rocketmq.store.DefaultMessageStore#doDispatch
    存储的消息也并不会立刻被写入到索引文件(IndexFile)队列文件(ConsumeQueue),而是由一个线程异步同步。

Consumer 端拉取

Consumer端的消费线程模型如下图所示:
Consumer线程模型

  1. 启动客户端
    org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
    可以看到,启动了很多实例,重点需要关注的是最后启动的MQClientInstance实例。
  2. 客户端实例的初始化
    org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
    使用定时任务执行从 NameServer 拉取路由信息(包括 Topic 下的 Queue 和 Consumer 列表)、向 Broker 发送心跳等任务。
    org.apache.rocketmq.client.impl.consumer.PullMessageService#start
    启动一个线程轮询拉消息。
    org.apache.rocketmq.client.impl.consumer.RebalanceService#start
    重新负载均衡。
  3. 接收消息
    RebalanceService#start中启动了一个拉取线程
    可见Consumer端Pull消息是单线程的
  4. 提交处理
    org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
    虽然Pull消息是单线程的,但是消费消息是多线程的,且多个group共享同一个线程池

    线程池定义:corePoolSize=20, maxPoolSize=64,等待队列是LinkedBlockingQueue

  5. 处理消费结果
    org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
    可见有CONSUME_SUCCESSRECONSUME_LATER这两种消费结果(报错也被当作RECONSUME_LATER处理),这里最核心逻辑就是更新offset确定消费进度。

QA、消费者阻塞

如上所示,客户端通过线程池来处理消息,那么当线程被阻塞后还会继续消费消息吗?
不会,如果处理中的消息量达到了阈值,客户端会阻塞拉取线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

QA2、如果客户端group消费线程池是共用的,那么一个group消费慢会影响其他group的消费吗?

是有可能的,需要分2阶段来分析

  1. 拉消息阶段,每个group的拉消息请求数都是公平的
    见代码 RebalanceImpl.updateProcessQueueTableInRebalance
  2. 消费消息阶段,除非某个group的消息消费速度过慢了,把所有的线程都阻塞了,才会影响其他group消息的消费

参考:https://blog.csdn.net/b1303110335/article/details/118755341

负载均衡

Producer 端负载均衡

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
Producer 端发送消息前会先从 NameServer 拿到 Topic 的路由信息,即 Topic 有几个 Queue、及这些 Queue 位于哪个 Broker。之后,客户端会根据负载均衡和容错策略从中选择一个发送。

Consumer 端负载均衡

RocketMQ 中 Consumer 端有两种消费模式(Push/Pull),它们本质上都是基于拉模式来获取消息的,Push 模式只是对 Pull 模式的一种封装:

  1. Pull:向服务器直连发请求拉取消息。
  2. Push:每次拉取后立刻向服务器再次尝试拉取消息,如果没拉取到则延迟一段时间再继续拉取。

RocketMQ-负载均衡
Consumer 端的负载均衡体现在 RocketMQ 对 Queue 的分配:

  1. Consumer 端心跳
    Consumer 启动后会通过定时任务不断向集群中所有 Broker 实例发送心跳包,Broker 会将 Consumer 注册到缓存中;
    另还有个定时任务从 NameServer 拉取 Topic 下的 Queue 和 Consumer 列表,便于之后的负载均衡。
  2. Consumer 端负载均衡
    RebalanceImpl#rebalanceByTopic
    获取该 Topic 下的消息队列和消费者集合,按消息队列分配策略算法(AllocateMessageQueueStrategy)将每个消息队列分配给消费者,默认为平均分配。
    这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端 Consumer 排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range,最后遍历整个 range 而计算出当前 Consumer 端应该分配到的记录(这里即为:MessageQueue)。
    其中的源码如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public static List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    if (currentCID == null || currentCID.length() < 1) {
    throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
    throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
    throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
    log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
    consumerGroup,
    currentCID,
    cidAll);
    return result;
    }

    // 当前Consumer在第几页
    int index = cidAll.indexOf(currentCID);
    // 如果是最后一页,取余数
    int mod = mqAll.size() % cidAll.size();
    // 每页数量
    int averageSize = mqAll.size() <= cidAll.size() ?
    1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1
    : mqAll.size() / cidAll.size());
    // 当前Consumer负责的第一个Queue所处的下标
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    // 当前Consumer需要负责几个Queue
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
    }
    举个例子:
    有32个消息队列(ConsumerQueue),7个Consumer,我们求出的值分别是:
  • mod=4
  • 前4台Consumer求出averageSize=5,后3台Consumer求出averageSize=4
  • 这7台Consumer求出的startIndex分别为:0 5 10 15 20 24 28
  • range分别为:5 5 5 5 4 4 4

参考

  1. 从年末生产故障解锁RocketMQ集群部署的最佳实践
    硬件影响RocketMQ可用性,背景是NameServer所在宿主机假死,TCP却未断开,客户端一直使用该NameServer的服务一直超时,因此在宕机期间一直使用的是之前缓存的服务列表。
    NameServer与Broker部署在一起,宿主机宕机后两个服务都不能用了,但是客户端没有更新注册表,因此一直将请求发给宕掉的Broker。
    解决办法是把NameServer和Broker分开来部署。