RocketMQ 延时消息

消息实时性

RocketMQ 支持 pull 和 push 两种消息消费模式,但 push 是使用长轮询 Pull 的方式实现的,可保证消息非常实时,消息实时性不低于 Push。
长轮询 pull 的原理是:发起 pull 请求失败后(比如 Broker 端暂时没有可以消费的消息),先 hold 住线程并挂起该请求。

RocketMQ除了上述的准实时消息外,还支持延时消息

延时消息

RocketMQ里延时消息功能并不能指定时间,而是只能指定延时级别:

1
2
3
4
Message message = new Message("TopicTest", "TagA", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
producer.send(message);

原理

  1. 延时消息和普通消息一样会先被写入commitLog,但不会立刻写入consumerQueue中,而是存放到SCHEDULE_TOPIC_XXX的topic下面,并且以延时粒度作为queueId区分;
  2. 之后Broker端会有定时任务扫描SCHEDULE_TOPIC_XXX下的每个Queue,到时候后写入到consumerQueue中。

源码入口是ScheduleMessageService.start,启动时会调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 1. 根据支持的各种延迟级别,添加不同延迟时间的TimeTask
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
// 每个延迟级别对应一个offset,代表一个普通消息队列文件
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 2. 添加一个10s执行一次的TimeTask
this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}

DeliverDelayedMessageTimerTask
ScheduleMessageService.executeOnTimeup
扫描延迟消息队列(SCHEDULE_TOPIC_XXX)的消息,将该延迟消息转换为指定的topic的消息。
1、读取不同延迟级别对应的延迟消息;
2、取得对应延迟级别读取的开始位置offset;
3、将延迟消息转换为指定topic的普通消息并存放起来。
4、修改下一次读取的offset值(修改的只是缓存),并指定下一次转换延迟消息的timetask。

ScheduleMessageService.this.persist
将延迟队列扫描处理的进度offset持久化到delayOffset.json文件中。

RocketMQ延迟队列也有一个缺点:Java中的Timer是单线程,而延迟消息的原理是Timer,也就是说当同时发送的延迟消息过多的时候一个线程处理速度一定是有瓶颈的,因此在实际项目中使用延迟消息一定不要过多依赖,只能作为一个辅助手段。