RocketMQ 服务发现
RocketMQ 启动流程中的服务注册
RocketMQ 的消息队列集群结构主要包含 NameServer、Broker(Master/Slave)、Producer、Consumer 4 个部分,基本通信流程如下:
- Broker 启动后需要完成一次将自己注册至 NameServer 的操作;随后每隔 30s 时间定时向 NameServer 上报 Topic 路由信息。
Broker 启动入口:org.apache.rocketmq.broker.BrokerController#start
注册到每个 NameServer:org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
- 消息生产者 Producer 作为客户端发送消息时候,需要根据消息的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息。如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息。
- 消息生产者 Producer 根据
2
中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker 作为消息的接收者接收消息并落盘存储。 - 消息消费者 Consumer 根据
2
中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
服务发现
Broker
Broker端启动时会向NameServer注册,并开启一个定时任务,用于每隔十秒向所有NameServer发送心跳请求,将Topic信息注册到NameServer。
NameServer
除了Broker可以向NameServer注册服务信息,NameServer也会启动一个定时任务来定时清理不活动的Broker,默认情况下是清除两分钟没有向NameServer发送心跳更新的Broker。
NameServer的结构比较简单,主要类只有6个:
NamesrvStartup
:程序入口。NamesrvController
:NameServer 的总控制器,负责所有服务的生命周期管理。RouteInfoManager
:NameServer 最核心的实现类,负责保存和管理集群路由信息。topicQueueTable
保存的是主题和队列信息,其中每个队列信息对应的类 QueueData 中,还保存了 brokerName。需要注意的是,这个 brokerName 并不真正是某个 Broker 的物理地址,它对应的一组 Broker 节点,包括一个主节点和若干个从节点。brokerAddrTable
中保存了集群中每个 brokerName 对应 Broker 信息,每个 Broker 信息用一个 BrokerData 对象表示。brokerLiveTable
中保存了集群中所有活跃的Broker。
定时每10秒一次扫描并清除不活跃的Broker,代码见:RouteInfoManager#scanNotActiveBroker
。BrokerHousekeepingService
:监控 Broker 连接状态的代理类。DefaultRequestProcessor
:负责处理客户端和 Broker 发送过来的 RPC 请求的处理器。
先用读写锁保证并发安全,然后比较所有路由信息Map并更新。ClusterTestRequestProcessor
:用于测试的请求处理器。
Producer
Producer是只能发给Broker集群里的Master的,如果Master挂掉,那么Producer也不能继续发消息了,只能等集群重新选举出一个新的Master,虽然可用性会降低,但是也给顺序消息的实现提供了方便。
Consumer
定时从NameServer拉取topicRouteTable
更新本地的brokerAddrTable
,也就是说,只要NameServer会把宕掉的Broker清掉,那么Consumer最终(注意并不能实时)也可以取到拿到一份活跃的Broker列表。MQClientInstance#updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)
Consumer端是定时地上Broker拉取消息。org.apache.rocketmq.common.ServiceThread#start
如果某次消费出错了,就会触发Fallback方案,改为稍后再重试。org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync