RocketMQ 延时消息
消息实时性
RocketMQ 支持 pull 和 push 两种消息消费模式,但 push 是使用长轮询 Pull 的方式实现的,可保证消息非常实时,消息实时性不低于 Push。
长轮询 pull 的原理是:发起 pull 请求失败后(比如 Broker 端暂时没有可以消费的消息),先 hold 住线程并挂起该请求。
RocketMQ除了上述的准实时消息外,还支持延时消息。
延时消息
RocketMQ里延时消息功能并不能指定时间,而是只能指定延时级别:
1 | Message message = new Message("TopicTest", "TagA", ("Hello scheduled message " + i).getBytes()); |
原理
- 延时消息和普通消息一样会先被写入
commitLog
,但不会立刻写入consumerQueue
中,而是存放到SCHEDULE_TOPIC_XXX
的topic下面,并且以延时粒度作为queueId区分; - 之后Broker端会有定时任务扫描
SCHEDULE_TOPIC_XXX
下的每个Queue,到时候后写入到consumerQueue中。
源码入口是ScheduleMessageService.start
,启动时会调用:
1 | public void start() { |
DeliverDelayedMessageTimerTaskScheduleMessageService.executeOnTimeup
:
扫描延迟消息队列(SCHEDULE_TOPIC_XXX
)的消息,将该延迟消息转换为指定的topic的消息。
1、读取不同延迟级别对应的延迟消息;
2、取得对应延迟级别读取的开始位置offset;
3、将延迟消息转换为指定topic的普通消息并存放起来。
4、修改下一次读取的offset值(修改的只是缓存),并指定下一次转换延迟消息的timetask。
ScheduleMessageService.this.persist
:
将延迟队列扫描处理的进度offset持久化到delayOffset.json文件中。
RocketMQ延迟队列也有一个缺点:Java中的Timer是单线程,而延迟消息的原理是Timer,也就是说当同时发送的延迟消息过多的时候一个线程处理速度一定是有瓶颈的,因此在实际项目中使用延迟消息一定不要过多依赖,只能作为一个辅助手段。