RocketMQ消息可靠性

消息有序性

一些场景需要保证操作的顺序性,比如A系统要将订单同步给B系统,但是要按照订单所发生事件顺序来同步,比如后台先修改订单价格再用户支付,那么就要先发修改价格的状态再发送支付成功的状态。
RocketMQ 可以严格的保证消息有序。

消息优先级

消息优先级机制可以让消息队列中优先级较高的消息先投递,比如订单创建消息就可以优先于日常推送消息。
RocketMQ 没有直接实现消息的优先级,主要是处于性能考虑,因为 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大。但也可以用一种变通的方式实现消息的优先级,比如创建 3 个代表不同优先级的队列。

有序消息的实现方式

要保证MQ消息消费的有序性,需要保证以下3个阶段的有序性:
RocketMQ保持消息的有序性

  1. 消息被发送时保持有序;
  2. 消息被存储时保持和发送时的顺序一致;
  3. 消息被消费时保持和存储时的顺序一致。

在RocketMQ中,有两种实现方式:

  1. 全局有序
    RocketMQ-全局有序
    一个Topic内所有的消息都发送到同一个Queue。
    适用于性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景。
  2. 分区有序
    RocketMQ-分区有序
    RocketMQ根据用户自定义的Sharding Key将消息散列到不同的Queue,每个Queue内的消费是严格有序的。
    适用于性能要求较高的场景。

RocketMQ中有序消息的实现原理

发送端需要指定消息的ShardingKey:

1
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message, MessageQueueSelector, Object)

如上述消息发送API所示,第二个参数MessageQueueSelector可以用于发送消息时指定某个Queue,在API层面没有区分全局有序和分区有序,如果要实现全局有序就把所有消息都Sharding到一个Queue上即可。

消费方拉取时需要区分Pull和Push两种模式:

  1. Pull
    Pull模式就是每次从一个Queue中拉取。
  2. Push
    Push模式使用一个线程轮询Broker拉取消息,然后调用客户端提供的回调函数进行消费,在客户端中需要保证调用MessageListener时消息的顺序性。
    ConsumeMessageOrderlyService
    实现上,就是在消费前先对队列加锁,避免Consumer并发消费一个队列(一个队列只能被一个Consumer消费,因此这个锁主要是保证Consumer不会并发消费一个消息,是单机锁而不是分布式锁)。

有序性与可用性间存在的矛盾

为了保证服务的高可用性,RocketMQ支持把一个主题分布到多对主从节点上去,每对主从节点中承担主题中的一部分队列,如果某个主节点宕机了,会自动切换到其他主节点上继续发消息,这样既解决了可用性的问题,还可以通过水平扩容来提升 Topic 总体的性能。
但是严格的顺序性要求指定队列来发送消息,一个队列一定是落在一个特定的主节点上的,如果该主节点宕机了,那么顺序性也就不存在了。
在RocketMQ中引入了Dledger的复制方式,这种方式对上述问题的解决方案是:消息必须被复制到半数以上节点才能返回成功,且主节点宕机后支持通过选举来动态切换主节点,Dledger在选举时,总会把数据和主节点一致的从节点选为新的主节点,这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。

消息可靠性

只要不是存储硬件发生不可逆的损坏,RocketMQ 都可以保证消息不丢或少丢,比如 Broker 挂掉、操作系统挂掉等;
如果硬件发生不可逆的损坏,则该节点的消息就无法恢复了,需要通过异步复制来恢复。

如何检测消息丢失?

  1. 利用分布式链路追踪系统
  2. 利用消息队列的有序性来验证是否有消息丢失

RocketMQ 如何保证消息不丢?

保证消息不丢失的原理是复制,传统的复制方式有异步复制和同步双写复制两种:

  1. 异步复制
    消息先发送到主节点上,就返回”写入成功”,然后消息再异步复制到从节点上。
  2. 同步双写复制
    消息同步双写到主从节点上,主从都写成功,才返回”写入成功”。

    这两种方式本质区别是:写入多少个副本再返回,异步复制需要的副本数是1,而同步双写需要的副本数为2。

RocketMQ的复制模式是基于Deldger的新复制模式
在 RocketMQ 中,Broker 的主从关系是通过配置固定的,不支持动态切换。如果主节点宕机,生产者就不能再生产消息了,消费者可以自动切换到从节点继续进行消费。这时候,即使有一些消息没有来得及复制到从节点上,这些消息依然躺在主节点的磁盘上,除非是主节点的磁盘坏了,否则等主节点重新恢复服务的时候,这些消息依然可以继续复制到从节点上,也可以继续消费,不会丢消息,消息的顺序也是没有问题的。

Deldger的复制模式

  • RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
  • RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。
  • RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。

一主二从的三副本集群的复制过程:

  1. Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。

    至少半数这个要求有点像MySQL里的semi-sync和Redis里的复制策略,说明这种方案确实非常有效,只要是涉及集群复制的场景都可以考虑采用。

  2. 拿 3 个节点举例说明一下。
    当主节点宕机的时候,2 个从节点会通过投票选出一个新的主节点来继续提供服务,相比主从的复制模式,解决了可用性的问题。
    由于消息要至少复制到 2 个节点上才会返回写入成功,即使主节点宕机了,也至少有一个节点上的消息是和主节点一样的。
  3. Dledger 在选举时,总会把数据和主节点一样的从节点选为新的主节点,
    这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。

Dledger的不足:

  • 选举过程中不能提供服务。
    最少需要 3 个节点才能保证数据一致性,3 节点时,只能保证 1 个节点宕机时可
    用,如果 2 个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较
    低。另外,由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制
    的方式快。

RocketMQ如何保证高可用

  • 通过多主多从架构保证高可用
    RocketMQ 支持把一个主题分布到多对主从节点上去,每对主从节点中承担主题中的一部分队列,如果某个主节点宕机了,会自动切换到其他主节点上继续发消息,这样既解决了可用性的问题,还可以通过水平扩容来提升 Topic 总体的性能。
  • 高可用与严格顺序不能并存
    在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息(指分区有序或全局有序),对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。在这种复制模式下,严格顺序和高可用只能选择一个。

如果服务器宕机怎么办?

现在我们可以来回答服务器宕机应该怎么办?

Broker宕机

  1. RocketMQ并没有实现高可用性,如上所述,Master宕机会触发重新选举,因为消息需要被同步到过半Slave后才会返回,因此Master宕机后消息本身不会丢。
  2. 在RocketMQ中,Broker会定时向NameServer发送心跳,Broker如果宕机,一段时间后NameServer发现Broker上次心跳已经超过了时间阈值,则会将该Broker移除出服务注册表。

NameServer宕机

NameServer集群中的节点是没有Master、Slave之分的,其中一台挂掉并不会影响其他节点提供服务。

Consumer / Producer 宕机

Consumer或Producer宕机不会产生任何影响,随时可以扩容缩容。

主从集群的扩容缩容

在RocketMQ中消费的基本单位是队列而不是Broker,因此扩容时单纯增加一台Broker并没有什么作用,还需要给这台Broker分配Queue。
在RocketMQ中,读写队列与读写分离是完全不同的两个概念:

  • 读写分离是HA机制:将一个节点的数据同步到另一个节点,主节点可用于读写,从节点用于读。
  • 读写队列则是在做路由信息时使用。在消息发送时,使用写队列个数返回路由信息,而消息消费时按照读队列个数返回路由信息。在物理文件层面,只有写队列才会创建文件。举个例子:写队列个数是8,设置的读队列个数是4.这个时候,会创建8个文件夹,代表0 1 2 3 4 5 6 7,但在消息消费时,路由信息只返回4,在具体拉取消息时,就只会消费0 1 2 3这4个队列中的消息,4 5 6 7压根就没有消息。反过来,如果写队列个数是4,读队列个数是8,在生产消息时只会往0 1 2 3中生产消息,消费消息时则会从0 1 2 3 4 5 6 7所有的队列中消费,当然 4 5 6 7中压根就没有消息 ,假设消费group有两个消费者,事实上只有第一个消费者在真正的消费消息(0 1 2 3),第二个消费者压根就消费不到消息。
    因此只有readQueueNums>=writeQueueNums时程序才能正常进行,最好是readQueueNums>writeQueueNums。
    读写队列的分配见代码:MQClientInstance#updateTopicRouteInfoFromNameServer

rocketmq设置读写队列数的目的在于方便队列的缩容和扩容。比如缩容至1/2的话就是先把写队列缩小至1/2,等后一半队列中的消息被消费完毕,然后再将读队列缩小一半,即可达到平滑缩容的目的。

  • 将已有的Queue分配到新Broker
  • 增加更多的Queue并分配到新Broker

具体扩容的操作可参考:RocketMQ 主题扩分片后遇到的坑

事务消息

RocketMQ中的实现

  • 半事务消息:发事务消息时,发送方会先发送一条半事务消息给 Broker,此时 Broker 暂未收到 Producer 对该消息的二次确认,此时该消息被标记成暂不能投递状态,处于该种状态下的消息即半事务消息;
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 通过扫描发现某条消息长期处于半事务消息时,需要主动向 Producer 询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

RocketMQ-事务消息
事务消息的处理流程如上图所示:
图片来自事务消息

  1. Producer 发半事务消息给 Broker,Producer需要记录消息状态到一张本地消息事务表内,和业务数据在一个事务里提交,之后生产消费者均通过这张表里的状态来判断事务的状态;

    如果业务数据本身就是有状态的,其实也可以通过查询业务数据状态来判断事务状态。

  2. Broker 将半事务消息持久化后,向 Producer 返回 ACK 确认消息已发送成功,此时消息为半事务消息;
  3. Producer 端开始执行本地事务逻辑;
  4. Producer 端根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),Broker 收到 Commit 状态则将半事务消息标记为可投递,Consumer 最终将收到该消息;Broker 收到 Rollback 状态则删除半事务消息,Consumer 将不会接受该消息。

接下来我们看看各步骤如果出错会怎么样:

  1. 2消息丢失或3执行失败:发送方终止本次事务,提交 Rollback 消息给服务端;
  2. 4的 Commit 消息提交失败:见5,即服务端会回查事务状态;

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

事务消息源码

Producer发送

  1. 发送
    org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction
    先添加半消息的标识,然后和普通消息一样发送,如果发送成功则执行本地事务。
  2. 执行本地事务
    org.apache.rocketmq.client.producer.TransactionListener#executeLocalTransaction
    使用了发送前传入的回调。
    返回值表示本地事务(LocalTransactionState)的执行情况:
    • COMMIT_MESSAGE:endTransaction时发送TRANSACTION_COMMIT_TYPE类型的消息到Broker,此时事务被提交,Consumer端可以消费该条消息;
    • ROLLBACK_MESSAGE:endTransaction时通知Broker TRANSACTION_ROLLBACK_TYPE,事务被回滚,Consumer端不可消费该消息;
    • UNKNOW:endTransaction时通知Broker TRANSACTION_NOT_TYPE,本地事务未执行完毕,Broker需要稍后反查本地事务状态。
  3. 根据本地事务执行结果,开始执行提交或回滚
    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
    正如我们之前讨论的那样,这个消息发失败也没有关系,Broker之后会有反查。

Broker接收

  1. 处理发消息请求
    org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
  2. 处理事务消息
    org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage -> org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#putHalfMessage
    可以看到RocketMQ并没有将半消息保存到客户端指定的那个队列中,而是记录了原始的主题队列后,把这个半消息保存在了一个特殊的内部主题 RMQ_SYS_TRANS_HALF_TOPIC 中,使用的队列号固定为 0
    这个主题和队列对消费者是不可见的,所以里面的消息永远不会被消费。这样,就保证了在事务提交成功之前,这个半消息对消费者来说是消费不到的。

Broker反查

  1. 启动反查定时任务
    TransactionalMessageCheckService
  2. 反查
    org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#resolveHalfMsg
    定时从半消息队列中读出所有待反查的半消息,针对每个需要反查的半消息,Broker 会给对应的 Producer 发一个要求执行事务状态反查的 RPC 请求,根据 RPC 返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。
  3. 结束事务
    org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
    最后,提交或者回滚事务实现的逻辑是差不多的,首先把半消息标记为已处理,如果是提交事务,那就把半消息从半消息队列中复制到这个消息真正的主题和队列中去,如果要回滚事务,这一步什么都不需要做,最后结束这个事务。

QA

Broker的offset如何维护?本地的offset如何维护?

拉取速度是否会受到消费速度制约?如果不是,那么拉取速度过快是不是会导致重复消费甚至OOM?

并发消费offset为1-10的消息,其中8失败了,其他都是成功的,这种情况下本地的offset如何记录,broker的offset如何记录,有哪些是可能被重复消费的?

如何尽可能的避免重复消费?

应用第一次启动时offset如何定义?是否会消费历史消息,应用重启是否会丢消息?

应用扩所容是否会导致消费消息丢失和消费队列的重新排序?