QMQ 总结

总体架构

QMQ-消息的发送和消费流程

Producer

入口

生产者入口:MessageProducerProvider / MessageConsumerProvider -> InitializingBean, DisposableBean
将消息生产者(MessageProducerProvider)注入到 Spring 容器。

发送同步消息

MessageProducerProvider -> RPCQueueSender

  1. 同步发送消息 ProduceMessage::send -> sendSync -> RPCQueueSender::send
  2. 路由分组发送(RPCQueueSender::process -> groupBy -> MessageSenderGroup::send)
    路由表示将相同类型的请求放到一个 Connection 中处理,每个 Connection 使用客户端类别和发送的主题进行标识clientType.getCode() + "|" + message.getSubject();
    构建Map<Connection, MessageSenderGroup>,将向同一 Server 发送的消息分到同一组、打包发送,以提高效率。
    创建 NettyConnection 的时候,需要将客户端信息发送到 MetaServer 上进行注册(NettyRouter::route -> connection::init)。
  3. 获取 broker 集群信息(NettyConnection::send -> brokerService::getClusterBySubject)
  4. 负载均衡(NettyConnection::send -> BrokerLoadBalance::loadBalance)
    从集群信息 BrokerClusterInfo 中选出下一个可用的 broker group(一个集群由一个 master 和 0 到多个 slaves 组成)。
  5. 发送消息(NettyClient::sendSync)
    同步发送其实内部调用了异步发送 sendAsync,只是在发送后直接调用 result.get()等待响应、而不是将接收响应的任务交给用户。
    封装 Netty 来发送请求。
  6. 发送失败重试(MessageSenderGroup::send -> errorHandler:error)
    注意它是怎么处理链路追踪监控的。

异步发送消息

ProduceMessage::send -> QueueSender::offer

  1. 将消息压入一个发送队列
    com.benmu.rapidq.client.producer.sender.RPCQueueSender#offer
    消息被添加到了一个BlockingQueue中,类型是LinkedBlockingQueue,默认的队列长度限制是 1000。
  2. 执行发送
    com.benmu.rapidq.common.batch.BatchExecutor#addItem(Item)
    如果消息被成功添加到发送队列,则提交一个发送消息线程任务到线程池中,线程池核心线程数为 1,最大线程数为 CPU 核心线程数+1。
  3. 实际发送过程与上面发送同步消息的流程并无不同。

Consumer

入口

消费者入口:MessageConsumerProvider::addListener

启动

  1. MessageConsumerProvider::addListener 添加监听器
  2. MessageDistributor::addListener
  3. ConsumerRegister::regist(subjectPrefix, consumerGroup, registParam)
  4. PullRegister::createAndSubmitPullEntry
    创建 PullEntry 并将其添加到线程池(Executors.newCachedThreadPool)中,轮询拉取消息。
    轮询拉取(PullEntry::run),注意 isRunning 是 AtomicBoolean 类型的,它是一个状态变量,用于控制消息循环。
  5. AbstractPullEntry::pull 执行拉取操作
  6. AckService::getAckSendInfo 生成请求参数
  7. PullService::pull
    使用 Netty 作客户端(NIO),是单例的 NettyClient。
    使用 Google Guava 包中的 Listenable Future 实现异步获取返回值(伪 AIO)。
  8. NettyClient::sendAsync

消费消息

com.benmu.rapidq.client.consumer.handler.MessageDistributor#handle

  1. 找一个 MessageHandler 消费消息

  2. 消费成功后,向 Broker 返回 ACK
    java.lang.Runnable#triggerAfterCompletion

Consumer(Subscriber)自动上下线管理

Consumer 上线注册(AckMessageProcessor::subscribe -> subscriberStatusChecker::addSubscriber,还有 PullMessageProcessor 同理)
启动服务器时开启一个线程心跳检测 Consumer 健康状况(ServerWrapper::start -> startConsumerChecker)

At Least Once Consume

QMQ 保证至少一次的消费,在失败时会重试,重试需要保证 Consumer 端的消息幂等性,所谓的幂等其实是每次接口或者业务的调用,参数一样,返回结果就必须一样。QMQ 的 producer 可能由于网络问题导致消息发送重复,虽然 QMQ 尽量保证消息的可靠性,但是在业务处理中我们应该进行判重处理,实现业务逻辑处理的幂等,QMQ 消息的特性是保证最少一次,也就是说可能出现重复,对于重复性问题要求严格的自己做业务上的判断。

Server(Broker)

入口

Bootstrap(qmq-server 包内)

服务器启动流程

  1. 注册配置
    主要是 broker.properties 中的配置项
  2. 初始化日志存储
    日志包括 consumerlog、messagelog、pulllog、actionlog、checkpoint
  3. startServerHandlers(启动服务器接收请求)
    启动一个 NettyServer,监听 SEND_MESSAGEPULL_MESSAGEACK_REQUEST 这几种请求,分别对应消息处理器 SendMessageProcessorPullMessageProcessorAckMessageProcessor
  4. startConsumerChecker(检查订阅者)
    心跳验活 Subscriber
  5. addToResources

3 种消息处理器的执行方式

  1. SendMessageProcessor
    接收消息,将消息存放到 store 中(SendMessageWorker)
  2. PullMessageProcessor
    根据路由规则
  3. AckMessageProcessor

消息路由

在 Producer 端,发送消息 MessageProducer::generateMessage 需要标识该消息的 subject(主题)

在 Consumer 端,发送消息 MessageConsumerProvider::addListener 有好几个参数,其中有两个和路由关系比较大的消息:

  • subject(主题)
    消息主题,一般与业务相关联。
  • consumerGroup(消费者集群)
    标识一群消费者,一个 group 可以表示订阅该消息主题的一个应用。

在 Server 端,接收到请求后会先取出 subjectgroup 这两个参数,进行路由的操作:

  1. 将请求分派到一个 actor (比如 PullMessageWorker::pull -> actorSystem::dispatch)
  2. 线程池调度 actor 执行任务
  3. actor 执行对应 MessageWorker 的任务(比如 PullMessageWorker::process)

所以 Producer 将某一主题消息发送到 Server 上保存,多个 Consumer 可以同时订阅同一 subject,且它们可能是来自多个 group 的。
所以, subjectgroup 其实唯一标识了一个 actor ,一个 actor 服务于一群 ConsumerConsumer

消息的推拉模型

推拉系统概述

不管是消息队列还是其他涉及到数据同步的应用,在进行数据同步的时候都有 pull 和 push 两种模式:

  • pull 模式下,客户端连接上 broker 之后,主动发起方法调用获取远程的结果,说的直白一点就是一次 RPC 调用,即同步方法调用;
  • push 模式下,客户端与 broker 建立长连接,当有消息进入 broker,broker 进行消息推送至所有的连接客户端,即异步方法调用;
    但是真正的实现中 broker 一般是很难维护这么多长连接,因为每个长连接都需要占用一个端口号。
  • 结合 pull 和 push 的一种方案:客户端连接到 broker 后,启动一个线程,这个线程会循环从 broker 中拉取消息。

因此通过比较上面两种方式,可以启发我们为什么在互联网上大吞吐量的消息队列都是采用 pull 模式,而非 broker push 模式?

  1. push 模式下,消费端的性能会影响整个消息队列服务器的性能。
  2. push 模式下,容易造成 broker 的消息积压。

常见系统中的推拉模型

从一些资料(消息队列-推/拉模式学习 & ActiveMQ 及 JMS 学习)上看到消息队列的推拉模型没有固定的实现模式(生产端基本上都是推模型,最常被讨论的是消费端如何实现):

  • ActiveMQ:消费端为拉模型
  • RabbitMQ:消费端为推模型
  • Kafka:消费端为拉模型

消息的存储模型

QMQ 中有 ConsumerLogMessageLogPullLogActionLogCheckpoint 这 5 种日志,其中前三者与消息密切相关,运行示意图如下:
QMQ-消息的存储模型

  1. message log
    存储所有消息
  2. consume log
    存储每个主题的消息在 message log 中的位置(偏移)
  3. pull log
    记录每个 consumer 消费了主题中的哪些消息

消息存储的操作在 MessageStoreWrapper 中实现。

连接管理

分情况看:

  1. 对 metaServer 的请求属于长连接(连接被缓存了可以重复使用 NettyRouter -> cached);
  2. Consumer 端同样会将连接换缓存起来(PullRegister::pullEntryMap),注意一个 PullEntry 并不代表一个 Subscriber,一个 Subscriber 需要使用一组 subject 和 group 来唯一标识;
  3. Server 端使用 Netty 管理连接,Netty 采用的是 NIO 模型,连接由 Netty 框架进行管理;

Actor

正如上边分析所述,消息路由的时候会根据主题(topic)和消费组(group)来定位一条消息,一个 topic 可能会由多个 group 共享,这时候会引起一个消费隔离的问题:原本情况下,服务器使用一个 Thread Pool 来处理所有请求,如下图所示,request queue 中白色的属于来自 groupA 的请求,紫色的属于来自 groupB 的请求,来自 groupA 的请求比来自 groupB 的请求要更多,会导致 groupA 分去更多线程池资源来处理请求、占用更多服务器资源。
QMQ-消费隔离问题

参考

https://github.com/qunarcorp/qmq/blob/master/docs/cn/pattern.md

  1. 消息消费隔离
  2. The actor model in 10 minutes
  3. actor-engine
  4. 消息队列技术点梳理(思维导图版)