RocketMQ 消息重试和幂等
重试和幂等
Retry队列和offset
在RocketMQ启动时,每个group层面都会再定义一个专用的重试topic,重试消息被插入了重试topic队列。
重试队列存在的意义就是快速推进offset,重试topic的名字是%RETRY%+consumerGroup
,因此重试topic是group维度的,所以默认情况下一个group的consumer会有2个订阅topic,2个topic同时进行队列的rebalance。
offset是按照MessageQueue的维度进行维护的。
消息重试有2种反馈方式:
- 重试队列:客户端先通过Netty API发送消息到Broker,如果这时调用Netty发送异常则调用Producer发送到RetryTopic中。
- 死信队列:如果重试次数过多(默认16次)则会进入死信队列,死信队列的逻辑在Broker,Client不会将消息发送至死信队列Topic。
Producer端重试
下面的代码同步发送消息,如果5秒内没有发送成功,则重试5次
1 | DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer"); |
Producer 的 send 方法本身支持内部重试:
同步发送代码:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message)
,注意传超时参数时取的defaultMQProducer.getSendMsgTimeout()
。
异步发送:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message, SendCallback)
实际发送消息的代码位置(注意对sendResult的处理):org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
从源码中可以得到以下结论:
- 至多重试 2 次。
同步发送为 2 次,异步发送为 0 次,也就是说,异步发送是不会重试的。 - 如果发送失败,则轮转到下一个 Broker。这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。
- 如果本身向 broker 发送消息产生超时异常,就不会再重试。
除了Producer客户端的自动重试外,应用程序在接收到SendResult后也可以自己尝试去重试。
Consumer重试
消费者消费消息后需要给Broker返回消费状态,比如并发消费者MessageListenerConcurrently
会返回ConsumeConcurrentlyStatus
:
- 如果消费成功,返回
CONSUME_SUCCESS
; - 如果消费出错,返回
RECONSUME_LATER
,一段时间后重试。
状态的返回是由用户线程控制的,但还有第三种可能,就是超时了,因此Consumer端的重试包含以下两种情况:
- 异常重试:Consumer端主动返回
RECONSUME_LATER
状态,Broker会在一段时间后重试; - 超时重试:Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。
如果Consumer端因为各种类型异常导致本次消费失败(如上所述的两种情况),为防止该消息丢失而需要将其重新回发给Broker端保存,保存这种因为异常无法正常消费而回发给MQ的消息队列称之为重试队列。RocketMQ会为每个消费组都设置一个Topic名称为%RETRY%+consumerGroup
的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)。
不能保证消息消费失败加入重试队列后还能被同一消费者消费,可能会破坏消息的顺序性。
由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中,RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%+consumerGroup”的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理;
另外还有两种需要注意的情况:
- 只有消息模式为
MessageModel.CLUSTERING
集群模式时,Broker才会自动进行重试,而广播消息是不会重试的。 - 事务消息中的半事务消息通过 Broker 的回查机制重试,具体流程见下面的事务消息。
消费进度和 offset
消息消费完成后,需要将消费进度存储起来,即前面提到的offset。
consumerQueue类似一个无限长的数组,可以利用offset来直接定位。
offset的存储分为本地模式和远程模式:
- 本地模式:广播模式下,同消费组的消费者相互独立,消费进度要单独存储,对应的数据结构是
LocalFileOffsetStore
; - 远程模式:集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,对应的数据结构是
RemoteBrokerOffsetStore
,下面对offset的讨论集中于远程模式。
Consumer更新offset到Broker
- 消费消息维护offset
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
处理失败的消息会反馈给Consumer,然后发送到topic对应的RetryTopic,这样能快速令offset前进。 - 定时任务
每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer
的persistConsumerOffsetInterval
属性控制,默认为5秒。MQClientInstance#startScheduledTask
->MQClientInstance#persistAllConsumerOffset
启动一个定时任务提交offset。RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
将offset发送到Broker。
Broker端offset的存储
org.apache.rocketmq.broker.processor.ConsumerManageProcessor#updateConsumerOffset
Broker会将offset存储在内存的一个offsetTable
中,即RemoteBrokerOffsetStore
。
Consumer从Broker拉取offset
DefaultMQPushConsumerImpl#pullMessage
拉消息后触发offset的更新。RemoteBrokerOffsetStore#readOffset
将offset保存到缓存offsetTable
中。
消息幂等
RocketMQ提供At least once的消息服务质量标准,表示一条消息至少被送达一次,也就是说,不允许丢消息,但允许有少量重复消息出现。
另外两种服务质量标准是At most once和Exactly once。
比如Producer发出了10个消息,如果Consumer接收中间两条消息时出错了,返回RECONSUME_LATER
,则该两条消息会被加入到RETRY
队列中重新消费。
解决消息重复消费问题的主要方法是幂等,一个幂等操作的特点是,其任意多次执行仅会产生一次影响,因此从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。
实现幂等的方式有很多种,不过这些方案与消息队列本身已经没有多大关系了,因此这里仅仅简单描述一下这些实现方式:
- 利用数据库的唯一约束实现幂等
为一个操作设置一个唯一键,比如一个账单每个用户只允许变更一次,则可以给转账流水表中的账单ID和账户ID创建一个唯一约束。 - 加上前置条件
限制数据更新前的状态,比如只有在余额为500的时候才允许更新。
也可以单独加上一个唯一ID,每次发消息时生成一个全局唯一ID,消费时检查这个唯一ID是否有被消费过。
重试源码
1、Consumer端初始化重试队列信息
1.1、Consumer端启动后,创建重试队列的订阅group
1 | // Consumer自动创建一个group=%RETRY%+ConsumerGroup,用于后续的消费重试 |
2、Consumer端处理消费结果ConsumeMessageConcurrentlyService#processConsumeResult
2.1、设置ack
如果ConsumeRequest封装的消息全消费成功,则设置ackIndex的值为消息总条数-1,反之ackIndex-=1
1 | switch (status) { |
2.2、消费失败的消息触发重试
sendMessageBack将消费失败的msg发回broker,如果sendMessageBack也失败则保存到msgBackFailed
1 | switch (this.defaultMQPushConsumer.getMessageModel()) { |
2.3、更新offset
本地消费成功后会将消费进度同步到本地的processQueue
sendMessageBack成功的消息会从本地processQueue中移除,并更新进度,这条消息的消费会交由消费集群中的一个节点去继续消费,取决于负载均衡将此消息对应的topic对应的重试队列retryQueue分配给哪个节点。
1 | // 这里开始更新offset |
3、Broker端接收sendMessageBack消息
Broker端的处理主要是重试和延迟
3.1、设置topic
设置此条消息新的topic为%RETRY%消费组的名称,并且选择新topic的队列(默认为0,默认情况下RetryQueueNum为1)
1 | String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); |
3.2、将消息topic设置为重试topic
通过物理偏移量找到消息体
1 | MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); |
给原始消息新增属性,key为RETRY_TOPIC,value为原始消息的实际topic
和Consumer端消费消息时的resetRetryTopic(msgs)相呼应
1 | final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); |
3.3、获取延迟并判断是否进入死信队列
获取消息的延迟级别,默认此时的值为0
1 | int delayLevel = requestHeader.getDelayLevel(); |
消息每消费失败一次都会增加ReconsumeTimes的值,当这个值达到了maxReconsumeTimes(默认为16),则将此消息送入死信队列,且此死信队列不可读,也就是说这条消息在没有人工干预的情况下再也不能被消费了。
1 | if (msgExt.getReconsumeTimes() >= maxReconsumeTimes |
3.4、存储新创建的消息
这里有延迟消息的实现:如果delayLevel大于0,会将此消息的topic和queueID再进行一次转换,将此消息的newTopic、queueID存入到属性中(real_topic, real_qid),新的topic为SCHEDULE_TOPIC_XXXX,新的queue为根据delayLevel的等级去本地delayTimeLevel
找到对应的队列;后续会有ScheduleMessageService
做后续的逻辑。
1 | CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); |
4、Broker端重试ScheduleMessageService
服务是来处理延迟消息的服务组件,delayLevelTable
存储了不同的延迟级别的延迟时间,可配置。
1 | private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; |
4.1、遍历消费队列
1 | // 遍历delayLevelTable里所有级别队列 |
4.2、重置延时消息
判断时间是否达到了延迟时间,达到了再将这些消息的原始topic和原始队列取出转发存储起来,待消费者消费。
4.3、设置重试消息
重试消息会被转变2次topic和queueID,导致在ScheduleMessageService
转发存储的时候会将第一次转变的topic和queueID取出转发到topic=%RETRY%+consumerGroup
、queueId=0的消息队列。
这个消息会被consumerGroup这个消费组消费,至于哪个节点消费则由负载均衡来决定。