QMQ 总结
总体架构
Producer
入口
生产者入口:MessageProducerProvider / MessageConsumerProvider -> InitializingBean, DisposableBean
将消息生产者(MessageProducerProvider)注入到 Spring 容器。
发送同步消息
MessageProducerProvider -> RPCQueueSender
- 同步发送消息 ProduceMessage::send -> sendSync -> RPCQueueSender::send
- 按路由分组发送(RPCQueueSender::process -> groupBy -> MessageSenderGroup::send)
路由表示将相同类型的请求放到一个 Connection 中处理,每个 Connection 使用客户端类别和发送的主题进行标识clientType.getCode() + "|" + message.getSubject()
;
构建Map<Connection, MessageSenderGroup>
,将向同一 Server 发送的消息分到同一组、打包发送,以提高效率。
创建 NettyConnection 的时候,需要将客户端信息发送到 MetaServer 上进行注册(NettyRouter::route -> connection::init)。 - 获取 broker 集群信息(NettyConnection::send -> brokerService::getClusterBySubject)
- 负载均衡(NettyConnection::send -> BrokerLoadBalance::loadBalance)
从集群信息 BrokerClusterInfo 中选出下一个可用的 broker group(一个集群由一个 master 和 0 到多个 slaves 组成)。 - 发送消息(NettyClient::sendSync)
同步发送其实内部调用了异步发送 sendAsync,只是在发送后直接调用 result.get()等待响应、而不是将接收响应的任务交给用户。
封装 Netty 来发送请求。 - 发送失败重试(MessageSenderGroup::send -> errorHandler:error)
注意它是怎么处理链路追踪、监控的。
异步发送消息
ProduceMessage::send -> QueueSender::offer
- 将消息压入一个发送队列
com.benmu.rapidq.client.producer.sender.RPCQueueSender#offer
消息被添加到了一个BlockingQueue
中,类型是LinkedBlockingQueue
,默认的队列长度限制是 1000。 - 执行发送
com.benmu.rapidq.common.batch.BatchExecutor#addItem(Item)
如果消息被成功添加到发送队列,则提交一个发送消息线程任务到线程池中,线程池核心线程数为 1,最大线程数为 CPU 核心线程数+1。 - 实际发送过程与上面发送同步消息的流程并无不同。
Consumer
入口
消费者入口:MessageConsumerProvider::addListener
启动
- MessageConsumerProvider::addListener 添加监听器
- MessageDistributor::addListener
- ConsumerRegister::regist(subjectPrefix, consumerGroup, registParam)
- PullRegister::createAndSubmitPullEntry
创建 PullEntry 并将其添加到线程池(Executors.newCachedThreadPool)中,轮询拉取消息。
轮询拉取(PullEntry::run),注意 isRunning 是 AtomicBoolean 类型的,它是一个状态变量,用于控制消息循环。 - AbstractPullEntry::pull 执行拉取操作
- AckService::getAckSendInfo 生成请求参数
- PullService::pull
使用 Netty 作客户端(NIO),是单例的 NettyClient。
使用 Google Guava 包中的 Listenable Future 实现异步获取返回值(伪 AIO)。 - NettyClient::sendAsync
消费消息
com.benmu.rapidq.client.consumer.handler.MessageDistributor#handle
找一个 MessageHandler 消费消息
消费成功后,向 Broker 返回 ACK
java.lang.Runnable#triggerAfterCompletion
Consumer(Subscriber)自动上下线管理
Consumer 上线注册(AckMessageProcessor::subscribe -> subscriberStatusChecker::addSubscriber,还有 PullMessageProcessor 同理)
启动服务器时开启一个线程心跳检测 Consumer 健康状况(ServerWrapper::start -> startConsumerChecker)
At Least Once Consume
QMQ 保证至少一次的消费,在失败时会重试,重试需要保证 Consumer 端的消息幂等性,所谓的幂等其实是每次接口或者业务的调用,参数一样,返回结果就必须一样。QMQ 的 producer 可能由于网络问题导致消息发送重复,虽然 QMQ 尽量保证消息的可靠性,但是在业务处理中我们应该进行判重处理,实现业务逻辑处理的幂等,QMQ 消息的特性是保证最少一次,也就是说可能出现重复,对于重复性问题要求严格的自己做业务上的判断。
Server(Broker)
入口
Bootstrap(qmq-server 包内)
服务器启动流程
- 注册配置
主要是 broker.properties 中的配置项 - 初始化日志存储
日志包括 consumerlog、messagelog、pulllog、actionlog、checkpoint - startServerHandlers(启动服务器接收请求)
启动一个 NettyServer,监听SEND_MESSAGE
、PULL_MESSAGE
、ACK_REQUEST
这几种请求,分别对应消息处理器SendMessageProcessor
、PullMessageProcessor
、AckMessageProcessor
。 - startConsumerChecker(检查订阅者)
心跳验活 Subscriber - addToResources
3 种消息处理器的执行方式
- SendMessageProcessor
接收消息,将消息存放到 store 中(SendMessageWorker) - PullMessageProcessor
根据路由规则 - AckMessageProcessor
消息路由
在 Producer 端,发送消息 MessageProducer::generateMessage
需要标识该消息的 subject(主题)
。
在 Consumer 端,发送消息 MessageConsumerProvider::addListener
有好几个参数,其中有两个和路由关系比较大的消息:
- subject(主题)
消息主题,一般与业务相关联。 - consumerGroup(消费者集群)
标识一群消费者,一个 group 可以表示订阅该消息主题的一个应用。
在 Server 端,接收到请求后会先取出 subject
和 group
这两个参数,进行路由的操作:
- 将请求分派到一个
actor
(比如 PullMessageWorker::pull -> actorSystem::dispatch) - 线程池调度
actor
执行任务 - actor 执行对应 MessageWorker 的任务(比如 PullMessageWorker::process)
所以 Producer 将某一主题消息发送到 Server 上保存,多个 Consumer 可以同时订阅同一 subject,且它们可能是来自多个 group 的。
所以, subject
和 group
其实唯一标识了一个 actor
,一个 actor
服务于一群 Consumer
和 Consumer
。
消息的推拉模型
推拉系统概述
不管是消息队列还是其他涉及到数据同步的应用,在进行数据同步的时候都有 pull 和 push 两种模式:
- pull 模式下,客户端连接上 broker 之后,主动发起方法调用获取远程的结果,说的直白一点就是一次 RPC 调用,即同步方法调用;
- push 模式下,客户端与 broker 建立长连接,当有消息进入 broker,broker 进行消息推送至所有的连接客户端,即异步方法调用;
但是真正的实现中 broker 一般是很难维护这么多长连接,因为每个长连接都需要占用一个端口号。 - 结合 pull 和 push 的一种方案:客户端连接到 broker 后,启动一个线程,这个线程会循环从 broker 中拉取消息。
因此通过比较上面两种方式,可以启发我们为什么在互联网上大吞吐量的消息队列都是采用 pull 模式,而非 broker push 模式?
- push 模式下,消费端的性能会影响整个消息队列服务器的性能。
- push 模式下,容易造成 broker 的消息积压。
常见系统中的推拉模型
从一些资料(消息队列-推/拉模式学习 & ActiveMQ 及 JMS 学习)上看到消息队列的推拉模型没有固定的实现模式(生产端基本上都是推模型,最常被讨论的是消费端如何实现):
- ActiveMQ:消费端为拉模型
- RabbitMQ:消费端为推模型
- Kafka:消费端为拉模型
消息的存储模型
QMQ 中有 ConsumerLog
、 MessageLog
、 PullLog
、 ActionLog
、Checkpoint
这 5 种日志,其中前三者与消息密切相关,运行示意图如下:
- message log
存储所有消息 - consume log
存储每个主题的消息在 message log 中的位置(偏移) - pull log
记录每个 consumer 消费了主题中的哪些消息
消息存储的操作在 MessageStoreWrapper
中实现。
连接管理
分情况看:
- 对 metaServer 的请求属于长连接(连接被缓存了可以重复使用 NettyRouter -> cached);
- Consumer 端同样会将连接换缓存起来(PullRegister::pullEntryMap),注意一个 PullEntry 并不代表一个 Subscriber,一个 Subscriber 需要使用一组 subject 和 group 来唯一标识;
- Server 端使用 Netty 管理连接,Netty 采用的是 NIO 模型,连接由 Netty 框架进行管理;
Actor
正如上边分析所述,消息路由的时候会根据主题(topic)和消费组(group)来定位一条消息,一个 topic 可能会由多个 group 共享,这时候会引起一个消费隔离的问题:原本情况下,服务器使用一个 Thread Pool 来处理所有请求,如下图所示,request queue 中白色的属于来自 groupA 的请求,紫色的属于来自 groupB 的请求,来自 groupA 的请求比来自 groupB 的请求要更多,会导致 groupA 分去更多线程池资源来处理请求、占用更多服务器资源。
参考
https://github.com/qunarcorp/qmq/blob/master/docs/cn/pattern.md