RocketMQ 消息重试和幂等

重试和幂等

Retry队列和offset

在RocketMQ启动时,每个group层面都会再定义一个专用的重试topic,重试消息被插入了重试topic队列。
重试队列存在的意义就是快速推进offset,重试topic的名字是%RETRY%+consumerGroup,因此重试topic是group维度的,所以默认情况下一个group的consumer会有2个订阅topic,2个topic同时进行队列的rebalance。
offset是按照MessageQueue的维度进行维护的
消息重试有2种反馈方式:

  1. 重试队列:客户端先通过Netty API发送消息到Broker,如果这时调用Netty发送异常则调用Producer发送到RetryTopic中。
  2. 死信队列:如果重试次数过多(默认16次)则会进入死信队列,死信队列的逻辑在Broker,Client不会将消息发送至死信队列Topic。

Producer端重试

下面的代码同步发送消息,如果5秒内没有发送成功,则重试5次

1
2
3
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(5);
producer.send(msg,5000L);

Producer 的 send 方法本身支持内部重试
同步发送代码:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message),注意传超时参数时取的defaultMQProducer.getSendMsgTimeout()
异步发送:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message, SendCallback)
实际发送消息的代码位置(注意对sendResult的处理):org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
从源码中可以得到以下结论:

  • 至多重试 2 次。
    同步发送为 2 次,异步发送为 0 次,也就是说,异步发送是不会重试的。
  • 如果发送失败,则轮转到下一个 Broker。这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。
  • 如果本身向 broker 发送消息产生超时异常,就不会再重试。

除了Producer客户端的自动重试外,应用程序在接收到SendResult后也可以自己尝试去重试。

Consumer重试

消费者消费消息后需要给Broker返回消费状态,比如并发消费者MessageListenerConcurrently会返回ConsumeConcurrentlyStatus

  • 如果消费成功,返回CONSUME_SUCCESS
  • 如果消费出错,返回RECONSUME_LATER,一段时间后重试。

状态的返回是由用户线程控制的,但还有第三种可能,就是超时了,因此Consumer端的重试包含以下两种情况:

  1. 异常重试:Consumer端主动返回RECONSUME_LATER状态,Broker会在一段时间后重试;
  2. 超时重试:Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。

如果Consumer端因为各种类型异常导致本次消费失败(如上所述的两种情况),为防止该消息丢失而需要将其重新回发给Broker端保存,保存这种因为异常无法正常消费而回发给MQ的消息队列称之为重试队列。RocketMQ会为每个消费组都设置一个Topic名称为%RETRY%+consumerGroup的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)。

不能保证消息消费失败加入重试队列后还能被同一消费者消费,可能会破坏消息的顺序性。

由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中,RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%+consumerGroup”的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理

另外还有两种需要注意的情况:

  • 只有消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,而广播消息是不会重试的。
  • 事务消息中的半事务消息通过 Broker 的回查机制重试,具体流程见下面的事务消息

消费进度和 offset

offset的更新
消息消费完成后,需要将消费进度存储起来,即前面提到的offset。
consumerQueue类似一个无限长的数组,可以利用offset来直接定位。
offset的存储分为本地模式和远程模式:

  • 本地模式:广播模式下,同消费组的消费者相互独立,消费进度要单独存储,对应的数据结构是LocalFileOffsetStore
  • 远程模式:集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,对应的数据结构是RemoteBrokerOffsetStore,下面对offset的讨论集中于远程模式。

Consumer更新offset到Broker

  1. 消费消息维护offset
    org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
    处理失败的消息会反馈给Consumer,然后发送到topic对应的RetryTopic,这样能快速令offset前进。
  2. 定时任务
    每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumerpersistConsumerOffsetInterval属性控制,默认为5秒。
    MQClientInstance#startScheduledTask -> MQClientInstance#persistAllConsumerOffset
    启动一个定时任务提交offset。
    RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
    将offset发送到Broker。

Broker端offset的存储

org.apache.rocketmq.broker.processor.ConsumerManageProcessor#updateConsumerOffset
Broker会将offset存储在内存的一个offsetTable中,即RemoteBrokerOffsetStore

Consumer从Broker拉取offset

DefaultMQPushConsumerImpl#pullMessage
拉消息后触发offset的更新。
RemoteBrokerOffsetStore#readOffset
将offset保存到缓存offsetTable中。

消息幂等

RocketMQ提供At least once的消息服务质量标准,表示一条消息至少被送达一次,也就是说,不允许丢消息,但允许有少量重复消息出现。

另外两种服务质量标准是At most onceExactly once

比如Producer发出了10个消息,如果Consumer接收中间两条消息时出错了,返回RECONSUME_LATER,则该两条消息会被加入到RETRY队列中重新消费。

解决消息重复消费问题的主要方法是幂等,一个幂等操作的特点是,其任意多次执行仅会产生一次影响,因此从对系统的影响结果来说:At least once + 幂等消费 = Exactly once
实现幂等的方式有很多种,不过这些方案与消息队列本身已经没有多大关系了,因此这里仅仅简单描述一下这些实现方式:

  1. 利用数据库的唯一约束实现幂等
    为一个操作设置一个唯一键,比如一个账单每个用户只允许变更一次,则可以给转账流水表中的账单ID和账户ID创建一个唯一约束。
  2. 加上前置条件
    限制数据更新前的状态,比如只有在余额为500的时候才允许更新。
    也可以单独加上一个唯一ID,每次发消息时生成一个全局唯一ID,消费时检查这个唯一ID是否有被消费过。

重试源码

1、Consumer端初始化重试队列信息
1.1、Consumer端启动后,创建重试队列的订阅group

1
2
3
4
5
6
7
8
9
10
11
12
13
// Consumer自动创建一个group=%RETRY%+ConsumerGroup,用于后续的消费重试
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}

2、Consumer端处理消费结果ConsumeMessageConcurrentlyService#processConsumeResult
2.1、设置ack
如果ConsumeRequest封装的消息全消费成功,则设置ackIndex的值为消息总条数-1,反之ackIndex-=1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
switch (status) {
// 消费成功
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// 消费失败、重试
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}

2.2、消费失败的消息触发重试
sendMessageBack将消费失败的msg发回broker,如果sendMessageBack也失败则保存到msgBackFailed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
switch (this.defaultMQPushConsumer.getMessageModel()) {
...
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// ackIndex+1开始的是未成功消费的
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
// 如果发送失败,则保存到msgBackFailed
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

// 将sendMessageBack失败的消息从consumeRequest移除,并包装起来5s后转发给消费线程池继续消费
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

2.3、更新offset
本地消费成功后会将消费进度同步到本地的processQueue
sendMessageBack成功的消息会从本地processQueue中移除,并更新进度,这条消息的消费会交由消费集群中的一个节点去继续消费,取决于负载均衡将此消息对应的topic对应的重试队列retryQueue分配给哪个节点。

1
2
3
4
5
6
7
// 这里开始更新offset
// 先从队列在consumer端的视图(一个treeMap)中移除
// 这里返回的offset是经过删除后最小的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

3、Broker端接收sendMessageBack消息
Broker端的处理主要是重试和延迟
3.1、设置topic
设置此条消息新的topic为%RETRY%消费组的名称,并且选择新topic的队列(默认为0,默认情况下RetryQueueNum为1)

1
2
3
4
5
6
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}

3.2、将消息topic设置为重试topic
通过物理偏移量找到消息体

1
2
3
4
5
6
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}

给原始消息新增属性,key为RETRY_TOPIC,value为原始消息的实际topic
和Consumer端消费消息时的resetRetryTopic(msgs)相呼应

1
2
3
4
5
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);

3.3、获取延迟并判断是否进入死信队列
获取消息的延迟级别,默认此时的值为0

1
2
3
4
5
6
int delayLevel = requestHeader.getDelayLevel();

int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}

消息每消费失败一次都会增加ReconsumeTimes的值,当这个值达到了maxReconsumeTimes(默认为16),则将此消息送入死信队列,且此死信队列不可读,也就是说这条消息在没有人工干预的情况下再也不能被消费了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
// 设置延迟级别为3,意味着要延迟10s再消费这条消息,消息重复消费需要借助延迟消费的功能实现
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}

3.4、存储新创建的消息
这里有延迟消息的实现:如果delayLevel大于0,会将此消息的topic和queueID再进行一次转换,将此消息的newTopic、queueID存入到属性中(real_topic, real_qid),新的topic为SCHEDULE_TOPIC_XXXX,新的queue为根据delayLevel的等级去本地delayTimeLevel找到对应的队列;后续会有ScheduleMessageService做后续的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});

4、Broker端重试
ScheduleMessageService服务是来处理延迟消息的服务组件,delayLevelTable存储了不同的延迟级别的延迟时间,可配置。

1
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

4.1、遍历消费队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 遍历delayLevelTable里所有级别队列
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
// 由一个timer来处理
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}

4.2、重置延时消息
判断时间是否达到了延迟时间,达到了再将这些消息的原始topic和原始队列取出转发存储起来,待消费者消费。
4.3、设置重试消息
重试消息会被转变2次topic和queueID,导致在ScheduleMessageService转发存储的时候会将第一次转变的topic和queueID取出转发到topic=%RETRY%+consumerGroup、queueId=0的消息队列。
这个消息会被consumerGroup这个消费组消费,至于哪个节点消费则由负载均衡来决定。