RocketMQ 服务发现

RocketMQ 启动流程中的服务注册

RocketMQ 的消息队列集群结构主要包含 NameServer、Broker(Master/Slave)、Producer、Consumer 4 个部分,基本通信流程如下:

  1. Broker 启动后需要完成一次将自己注册至 NameServer 的操作;随后每隔 30s 时间定时向 NameServer 上报 Topic 路由信息。
    Broker 启动入口:org.apache.rocketmq.broker.BrokerController#start
    注册到每个 NameServer:org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
  2. 消息生产者 Producer 作为客户端发送消息时候,需要根据消息的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息。如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息。
  3. 消息生产者 Producer 根据2中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker 作为消息的接收者接收消息并落盘存储。
  4. 消息消费者 Consumer 根据2中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

服务发现

Broker

Broker端启动时会向NameServer注册,并开启一个定时任务,用于每隔十秒向所有NameServer发送心跳请求,将Topic信息注册到NameServer。

NameServer

除了Broker可以向NameServer注册服务信息,NameServer也会启动一个定时任务来定时清理不活动的Broker,默认情况下是清除两分钟没有向NameServer发送心跳更新的Broker。
NameServer的结构比较简单,主要类只有6个:

  • NamesrvStartup:程序入口。
  • NamesrvController:NameServer 的总控制器,负责所有服务的生命周期管理。
  • RouteInfoManager:NameServer 最核心的实现类,负责保存和管理集群路由信息
    topicQueueTable 保存的是主题和队列信息,其中每个队列信息对应的类 QueueData 中,还保存了 brokerName。需要注意的是,这个 brokerName 并不真正是某个 Broker 的物理地址,它对应的一组 Broker 节点,包括一个主节点和若干个从节点。
    brokerAddrTable 中保存了集群中每个 brokerName 对应 Broker 信息,每个 Broker 信息用一个 BrokerData 对象表示。
    brokerLiveTable 中保存了集群中所有活跃的Broker。
    定时每10秒一次扫描并清除不活跃的Broker,代码见:RouteInfoManager#scanNotActiveBroker
  • BrokerHousekeepingService:监控 Broker 连接状态的代理类。
  • DefaultRequestProcessor:负责处理客户端和 Broker 发送过来的 RPC 请求的处理器。
    先用读写锁保证并发安全,然后比较所有路由信息Map并更新。
  • ClusterTestRequestProcessor:用于测试的请求处理器。

Producer

Producer是只能发给Broker集群里的Master的,如果Master挂掉,那么Producer也不能继续发消息了,只能等集群重新选举出一个新的Master,虽然可用性会降低,但是也给顺序消息的实现提供了方便。

Consumer

定时从NameServer拉取topicRouteTable更新本地的brokerAddrTable,也就是说,只要NameServer会把宕掉的Broker清掉,那么Consumer最终(注意并不能实时)也可以取到拿到一份活跃的Broker列表。
MQClientInstance#updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)
Consumer端是定时地上Broker拉取消息。
org.apache.rocketmq.common.ServiceThread#start
如果某次消费出错了,就会触发Fallback方案,改为稍后再重试。
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync