Tallate

该吃吃该喝喝 啥事别往心里搁

消息有序性

一些场景需要保证操作的顺序性,比如A系统要将订单同步给B系统,但是要按照订单所发生事件顺序来同步,比如后台先修改订单价格再用户支付,那么就要先发修改价格的状态再发送支付成功的状态。
RocketMQ 可以严格的保证消息有序。

消息优先级

消息优先级机制可以让消息队列中优先级较高的消息先投递,比如订单创建消息就可以优先于日常推送消息。
RocketMQ 没有直接实现消息的优先级,主要是处于性能考虑,因为 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大。但也可以用一种变通的方式实现消息的优先级,比如创建 3 个代表不同优先级的队列。

有序消息的实现方式

要保证MQ消息消费的有序性,需要保证以下3个阶段的有序性:
RocketMQ保持消息的有序性

  1. 消息被发送时保持有序;
  2. 消息被存储时保持和发送时的顺序一致;
  3. 消息被消费时保持和存储时的顺序一致。

在RocketMQ中,有两种实现方式:

  1. 全局有序
    RocketMQ-全局有序
    一个Topic内所有的消息都发送到同一个Queue。
    适用于性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景。
  2. 分区有序
    RocketMQ-分区有序
    RocketMQ根据用户自定义的Sharding Key将消息散列到不同的Queue,每个Queue内的消费是严格有序的。
    适用于性能要求较高的场景。

RocketMQ中有序消息的实现原理

发送端需要指定消息的ShardingKey:

1
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message, MessageQueueSelector, Object)

如上述消息发送API所示,第二个参数MessageQueueSelector可以用于发送消息时指定某个Queue,在API层面没有区分全局有序和分区有序,如果要实现全局有序就把所有消息都Sharding到一个Queue上即可。

消费方拉取时需要区分Pull和Push两种模式:

  1. Pull
    Pull模式就是每次从一个Queue中拉取。
  2. Push
    Push模式使用一个线程轮询Broker拉取消息,然后调用客户端提供的回调函数进行消费,在客户端中需要保证调用MessageListener时消息的顺序性。
    ConsumeMessageOrderlyService
    实现上,就是在消费前先对队列加锁,避免Consumer并发消费一个队列(一个队列只能被一个Consumer消费,因此这个锁主要是保证Consumer不会并发消费一个消息,是单机锁而不是分布式锁)。

有序性与可用性间存在的矛盾

为了保证服务的高可用性,RocketMQ支持把一个主题分布到多对主从节点上去,每对主从节点中承担主题中的一部分队列,如果某个主节点宕机了,会自动切换到其他主节点上继续发消息,这样既解决了可用性的问题,还可以通过水平扩容来提升 Topic 总体的性能。
但是严格的顺序性要求指定队列来发送消息,一个队列一定是落在一个特定的主节点上的,如果该主节点宕机了,那么顺序性也就不存在了。
在RocketMQ中引入了Dledger的复制方式,这种方式对上述问题的解决方案是:消息必须被复制到半数以上节点才能返回成功,且主节点宕机后支持通过选举来动态切换主节点,Dledger在选举时,总会把数据和主节点一致的从节点选为新的主节点,这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。

消息可靠性

只要不是存储硬件发生不可逆的损坏,RocketMQ 都可以保证消息不丢或少丢,比如 Broker 挂掉、操作系统挂掉等;
如果硬件发生不可逆的损坏,则该节点的消息就无法恢复了,需要通过异步复制来恢复。

如何检测消息丢失?

  1. 利用分布式链路追踪系统
  2. 利用消息队列的有序性来验证是否有消息丢失

RocketMQ 如何保证消息不丢?

保证消息不丢失的原理是复制,传统的复制方式有异步复制和同步双写复制两种:

  1. 异步复制
    消息先发送到主节点上,就返回”写入成功”,然后消息再异步复制到从节点上。
  2. 同步双写复制
    消息同步双写到主从节点上,主从都写成功,才返回”写入成功”。

    这两种方式本质区别是:写入多少个副本再返回,异步复制需要的副本数是1,而同步双写需要的副本数为2。

RocketMQ的复制模式是基于Deldger的新复制模式
在 RocketMQ 中,Broker 的主从关系是通过配置固定的,不支持动态切换。如果主节点宕机,生产者就不能再生产消息了,消费者可以自动切换到从节点继续进行消费。这时候,即使有一些消息没有来得及复制到从节点上,这些消息依然躺在主节点的磁盘上,除非是主节点的磁盘坏了,否则等主节点重新恢复服务的时候,这些消息依然可以继续复制到从节点上,也可以继续消费,不会丢消息,消息的顺序也是没有问题的。

Deldger的复制模式

  • RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
  • RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。
  • RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。

一主二从的三副本集群的复制过程:

  1. Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。

    至少半数这个要求有点像MySQL里的semi-sync和Redis里的复制策略,说明这种方案确实非常有效,只要是涉及集群复制的场景都可以考虑采用。

  2. 拿 3 个节点举例说明一下。
    当主节点宕机的时候,2 个从节点会通过投票选出一个新的主节点来继续提供服务,相比主从的复制模式,解决了可用性的问题。
    由于消息要至少复制到 2 个节点上才会返回写入成功,即使主节点宕机了,也至少有一个节点上的消息是和主节点一样的。
  3. Dledger 在选举时,总会把数据和主节点一样的从节点选为新的主节点,
    这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。

Dledger的不足:

  • 选举过程中不能提供服务。
    最少需要 3 个节点才能保证数据一致性,3 节点时,只能保证 1 个节点宕机时可
    用,如果 2 个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较
    低。另外,由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制
    的方式快。

RocketMQ如何保证高可用

  • 通过多主多从架构保证高可用
    RocketMQ 支持把一个主题分布到多对主从节点上去,每对主从节点中承担主题中的一部分队列,如果某个主节点宕机了,会自动切换到其他主节点上继续发消息,这样既解决了可用性的问题,还可以通过水平扩容来提升 Topic 总体的性能。
  • 高可用与严格顺序不能并存
    在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息(指分区有序或全局有序),对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。在这种复制模式下,严格顺序和高可用只能选择一个。

如果服务器宕机怎么办?

现在我们可以来回答服务器宕机应该怎么办?

Broker宕机

  1. RocketMQ并没有实现高可用性,如上所述,Master宕机会触发重新选举,因为消息需要被同步到过半Slave后才会返回,因此Master宕机后消息本身不会丢。
  2. 在RocketMQ中,Broker会定时向NameServer发送心跳,Broker如果宕机,一段时间后NameServer发现Broker上次心跳已经超过了时间阈值,则会将该Broker移除出服务注册表。

NameServer宕机

NameServer集群中的节点是没有Master、Slave之分的,其中一台挂掉并不会影响其他节点提供服务。

Consumer / Producer 宕机

Consumer或Producer宕机不会产生任何影响,随时可以扩容缩容。

主从集群的扩容缩容

在RocketMQ中消费的基本单位是队列而不是Broker,因此扩容时单纯增加一台Broker并没有什么作用,还需要给这台Broker分配Queue。
在RocketMQ中,读写队列与读写分离是完全不同的两个概念:

  • 读写分离是HA机制:将一个节点的数据同步到另一个节点,主节点可用于读写,从节点用于读。
  • 读写队列则是在做路由信息时使用。在消息发送时,使用写队列个数返回路由信息,而消息消费时按照读队列个数返回路由信息。在物理文件层面,只有写队列才会创建文件。举个例子:写队列个数是8,设置的读队列个数是4.这个时候,会创建8个文件夹,代表0 1 2 3 4 5 6 7,但在消息消费时,路由信息只返回4,在具体拉取消息时,就只会消费0 1 2 3这4个队列中的消息,4 5 6 7压根就没有消息。反过来,如果写队列个数是4,读队列个数是8,在生产消息时只会往0 1 2 3中生产消息,消费消息时则会从0 1 2 3 4 5 6 7所有的队列中消费,当然 4 5 6 7中压根就没有消息 ,假设消费group有两个消费者,事实上只有第一个消费者在真正的消费消息(0 1 2 3),第二个消费者压根就消费不到消息。
    因此只有readQueueNums>=writeQueueNums时程序才能正常进行,最好是readQueueNums>writeQueueNums。
    读写队列的分配见代码:MQClientInstance#updateTopicRouteInfoFromNameServer

rocketmq设置读写队列数的目的在于方便队列的缩容和扩容。比如缩容至1/2的话就是先把写队列缩小至1/2,等后一半队列中的消息被消费完毕,然后再将读队列缩小一半,即可达到平滑缩容的目的。

  • 将已有的Queue分配到新Broker
  • 增加更多的Queue并分配到新Broker

具体扩容的操作可参考:RocketMQ 主题扩分片后遇到的坑

事务消息

RocketMQ中的实现

  • 半事务消息:发事务消息时,发送方会先发送一条半事务消息给 Broker,此时 Broker 暂未收到 Producer 对该消息的二次确认,此时该消息被标记成暂不能投递状态,处于该种状态下的消息即半事务消息;
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 通过扫描发现某条消息长期处于半事务消息时,需要主动向 Producer 询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

RocketMQ-事务消息
事务消息的处理流程如上图所示:
图片来自事务消息

  1. Producer 发半事务消息给 Broker,Producer需要记录消息状态到一张本地消息事务表内,和业务数据在一个事务里提交,之后生产消费者均通过这张表里的状态来判断事务的状态;

    如果业务数据本身就是有状态的,其实也可以通过查询业务数据状态来判断事务状态。

  2. Broker 将半事务消息持久化后,向 Producer 返回 ACK 确认消息已发送成功,此时消息为半事务消息;
  3. Producer 端开始执行本地事务逻辑;
  4. Producer 端根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),Broker 收到 Commit 状态则将半事务消息标记为可投递,Consumer 最终将收到该消息;Broker 收到 Rollback 状态则删除半事务消息,Consumer 将不会接受该消息。

接下来我们看看各步骤如果出错会怎么样:

  1. 2消息丢失或3执行失败:发送方终止本次事务,提交 Rollback 消息给服务端;
  2. 4的 Commit 消息提交失败:见5,即服务端会回查事务状态;

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

事务消息源码

Producer发送

  1. 发送
    org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction
    先添加半消息的标识,然后和普通消息一样发送,如果发送成功则执行本地事务。
  2. 执行本地事务
    org.apache.rocketmq.client.producer.TransactionListener#executeLocalTransaction
    使用了发送前传入的回调。
    返回值表示本地事务(LocalTransactionState)的执行情况:
    • COMMIT_MESSAGE:endTransaction时发送TRANSACTION_COMMIT_TYPE类型的消息到Broker,此时事务被提交,Consumer端可以消费该条消息;
    • ROLLBACK_MESSAGE:endTransaction时通知Broker TRANSACTION_ROLLBACK_TYPE,事务被回滚,Consumer端不可消费该消息;
    • UNKNOW:endTransaction时通知Broker TRANSACTION_NOT_TYPE,本地事务未执行完毕,Broker需要稍后反查本地事务状态。
  3. 根据本地事务执行结果,开始执行提交或回滚
    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
    正如我们之前讨论的那样,这个消息发失败也没有关系,Broker之后会有反查。

Broker接收

  1. 处理发消息请求
    org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
  2. 处理事务消息
    org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage -> org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#putHalfMessage
    可以看到RocketMQ并没有将半消息保存到客户端指定的那个队列中,而是记录了原始的主题队列后,把这个半消息保存在了一个特殊的内部主题 RMQ_SYS_TRANS_HALF_TOPIC 中,使用的队列号固定为 0
    这个主题和队列对消费者是不可见的,所以里面的消息永远不会被消费。这样,就保证了在事务提交成功之前,这个半消息对消费者来说是消费不到的。

Broker反查

  1. 启动反查定时任务
    TransactionalMessageCheckService
  2. 反查
    org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#resolveHalfMsg
    定时从半消息队列中读出所有待反查的半消息,针对每个需要反查的半消息,Broker 会给对应的 Producer 发一个要求执行事务状态反查的 RPC 请求,根据 RPC 返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。
  3. 结束事务
    org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
    最后,提交或者回滚事务实现的逻辑是差不多的,首先把半消息标记为已处理,如果是提交事务,那就把半消息从半消息队列中复制到这个消息真正的主题和队列中去,如果要回滚事务,这一步什么都不需要做,最后结束这个事务。

QA

Broker的offset如何维护?本地的offset如何维护?

拉取速度是否会受到消费速度制约?如果不是,那么拉取速度过快是不是会导致重复消费甚至OOM?

并发消费offset为1-10的消息,其中8失败了,其他都是成功的,这种情况下本地的offset如何记录,broker的offset如何记录,有哪些是可能被重复消费的?

如何尽可能的避免重复消费?

应用第一次启动时offset如何定义?是否会消费历史消息,应用重启是否会丢消息?

应用扩所容是否会导致消费消息丢失和消费队列的重新排序?

重试和幂等

Retry队列和offset

在RocketMQ启动时,每个group层面都会再定义一个专用的重试topic,重试消息被插入了重试topic队列。
重试队列存在的意义就是快速推进offset,重试topic的名字是%RETRY%+consumerGroup,因此重试topic是group维度的,所以默认情况下一个group的consumer会有2个订阅topic,2个topic同时进行队列的rebalance。
offset是按照MessageQueue的维度进行维护的
消息重试有2种反馈方式:

  1. 重试队列:客户端先通过Netty API发送消息到Broker,如果这时调用Netty发送异常则调用Producer发送到RetryTopic中。
  2. 死信队列:如果重试次数过多(默认16次)则会进入死信队列,死信队列的逻辑在Broker,Client不会将消息发送至死信队列Topic。

Producer端重试

下面的代码同步发送消息,如果5秒内没有发送成功,则重试5次

1
2
3
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(5);
producer.send(msg,5000L);

Producer 的 send 方法本身支持内部重试
同步发送代码:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message),注意传超时参数时取的defaultMQProducer.getSendMsgTimeout()
异步发送:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message, SendCallback)
实际发送消息的代码位置(注意对sendResult的处理):org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
从源码中可以得到以下结论:

  • 至多重试 2 次。
    同步发送为 2 次,异步发送为 0 次,也就是说,异步发送是不会重试的。
  • 如果发送失败,则轮转到下一个 Broker。这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。
  • 如果本身向 broker 发送消息产生超时异常,就不会再重试。

除了Producer客户端的自动重试外,应用程序在接收到SendResult后也可以自己尝试去重试。

Consumer重试

消费者消费消息后需要给Broker返回消费状态,比如并发消费者MessageListenerConcurrently会返回ConsumeConcurrentlyStatus

  • 如果消费成功,返回CONSUME_SUCCESS
  • 如果消费出错,返回RECONSUME_LATER,一段时间后重试。

状态的返回是由用户线程控制的,但还有第三种可能,就是超时了,因此Consumer端的重试包含以下两种情况:

  1. 异常重试:Consumer端主动返回RECONSUME_LATER状态,Broker会在一段时间后重试;
  2. 超时重试:Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。

如果Consumer端因为各种类型异常导致本次消费失败(如上所述的两种情况),为防止该消息丢失而需要将其重新回发给Broker端保存,保存这种因为异常无法正常消费而回发给MQ的消息队列称之为重试队列。RocketMQ会为每个消费组都设置一个Topic名称为%RETRY%+consumerGroup的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)。

不能保证消息消费失败加入重试队列后还能被同一消费者消费,可能会破坏消息的顺序性。

由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中,RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%+consumerGroup”的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理

另外还有两种需要注意的情况:

  • 只有消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,而广播消息是不会重试的。
  • 事务消息中的半事务消息通过 Broker 的回查机制重试,具体流程见下面的事务消息

消费进度和 offset

offset的更新
消息消费完成后,需要将消费进度存储起来,即前面提到的offset。
consumerQueue类似一个无限长的数组,可以利用offset来直接定位。
offset的存储分为本地模式和远程模式:

  • 本地模式:广播模式下,同消费组的消费者相互独立,消费进度要单独存储,对应的数据结构是LocalFileOffsetStore
  • 远程模式:集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,对应的数据结构是RemoteBrokerOffsetStore,下面对offset的讨论集中于远程模式。

Consumer更新offset到Broker

  1. 消费消息维护offset
    org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
    处理失败的消息会反馈给Consumer,然后发送到topic对应的RetryTopic,这样能快速令offset前进。
  2. 定时任务
    每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumerpersistConsumerOffsetInterval属性控制,默认为5秒。
    MQClientInstance#startScheduledTask -> MQClientInstance#persistAllConsumerOffset
    启动一个定时任务提交offset。
    RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
    将offset发送到Broker。

Broker端offset的存储

org.apache.rocketmq.broker.processor.ConsumerManageProcessor#updateConsumerOffset
Broker会将offset存储在内存的一个offsetTable中,即RemoteBrokerOffsetStore

Consumer从Broker拉取offset

DefaultMQPushConsumerImpl#pullMessage
拉消息后触发offset的更新。
RemoteBrokerOffsetStore#readOffset
将offset保存到缓存offsetTable中。

消息幂等

RocketMQ提供At least once的消息服务质量标准,表示一条消息至少被送达一次,也就是说,不允许丢消息,但允许有少量重复消息出现。

另外两种服务质量标准是At most onceExactly once

比如Producer发出了10个消息,如果Consumer接收中间两条消息时出错了,返回RECONSUME_LATER,则该两条消息会被加入到RETRY队列中重新消费。

解决消息重复消费问题的主要方法是幂等,一个幂等操作的特点是,其任意多次执行仅会产生一次影响,因此从对系统的影响结果来说:At least once + 幂等消费 = Exactly once
实现幂等的方式有很多种,不过这些方案与消息队列本身已经没有多大关系了,因此这里仅仅简单描述一下这些实现方式:

  1. 利用数据库的唯一约束实现幂等
    为一个操作设置一个唯一键,比如一个账单每个用户只允许变更一次,则可以给转账流水表中的账单ID和账户ID创建一个唯一约束。
  2. 加上前置条件
    限制数据更新前的状态,比如只有在余额为500的时候才允许更新。
    也可以单独加上一个唯一ID,每次发消息时生成一个全局唯一ID,消费时检查这个唯一ID是否有被消费过。

重试源码

1、Consumer端初始化重试队列信息
1.1、Consumer端启动后,创建重试队列的订阅group

1
2
3
4
5
6
7
8
9
10
11
12
13
// Consumer自动创建一个group=%RETRY%+ConsumerGroup,用于后续的消费重试
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}

2、Consumer端处理消费结果ConsumeMessageConcurrentlyService#processConsumeResult
2.1、设置ack
如果ConsumeRequest封装的消息全消费成功,则设置ackIndex的值为消息总条数-1,反之ackIndex-=1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
switch (status) {
// 消费成功
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// 消费失败、重试
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}

2.2、消费失败的消息触发重试
sendMessageBack将消费失败的msg发回broker,如果sendMessageBack也失败则保存到msgBackFailed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
switch (this.defaultMQPushConsumer.getMessageModel()) {
...
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// ackIndex+1开始的是未成功消费的
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
// 如果发送失败,则保存到msgBackFailed
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

// 将sendMessageBack失败的消息从consumeRequest移除,并包装起来5s后转发给消费线程池继续消费
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

2.3、更新offset
本地消费成功后会将消费进度同步到本地的processQueue
sendMessageBack成功的消息会从本地processQueue中移除,并更新进度,这条消息的消费会交由消费集群中的一个节点去继续消费,取决于负载均衡将此消息对应的topic对应的重试队列retryQueue分配给哪个节点。

1
2
3
4
5
6
7
// 这里开始更新offset
// 先从队列在consumer端的视图(一个treeMap)中移除
// 这里返回的offset是经过删除后最小的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

3、Broker端接收sendMessageBack消息
Broker端的处理主要是重试和延迟
3.1、设置topic
设置此条消息新的topic为%RETRY%消费组的名称,并且选择新topic的队列(默认为0,默认情况下RetryQueueNum为1)

1
2
3
4
5
6
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}

3.2、将消息topic设置为重试topic
通过物理偏移量找到消息体

1
2
3
4
5
6
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}

给原始消息新增属性,key为RETRY_TOPIC,value为原始消息的实际topic
和Consumer端消费消息时的resetRetryTopic(msgs)相呼应

1
2
3
4
5
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);

3.3、获取延迟并判断是否进入死信队列
获取消息的延迟级别,默认此时的值为0

1
2
3
4
5
6
int delayLevel = requestHeader.getDelayLevel();

int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}

消息每消费失败一次都会增加ReconsumeTimes的值,当这个值达到了maxReconsumeTimes(默认为16),则将此消息送入死信队列,且此死信队列不可读,也就是说这条消息在没有人工干预的情况下再也不能被消费了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
// 设置延迟级别为3,意味着要延迟10s再消费这条消息,消息重复消费需要借助延迟消费的功能实现
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}

3.4、存储新创建的消息
这里有延迟消息的实现:如果delayLevel大于0,会将此消息的topic和queueID再进行一次转换,将此消息的newTopic、queueID存入到属性中(real_topic, real_qid),新的topic为SCHEDULE_TOPIC_XXXX,新的queue为根据delayLevel的等级去本地delayTimeLevel找到对应的队列;后续会有ScheduleMessageService做后续的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});

4、Broker端重试
ScheduleMessageService服务是来处理延迟消息的服务组件,delayLevelTable存储了不同的延迟级别的延迟时间,可配置。

1
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

4.1、遍历消费队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 遍历delayLevelTable里所有级别队列
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
// 由一个timer来处理
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}

4.2、重置延时消息
判断时间是否达到了延迟时间,达到了再将这些消息的原始topic和原始队列取出转发存储起来,待消费者消费。
4.3、设置重试消息
重试消息会被转变2次topic和queueID,导致在ScheduleMessageService转发存储的时候会将第一次转变的topic和queueID取出转发到topic=%RETRY%+consumerGroup、queueId=0的消息队列。
这个消息会被consumerGroup这个消费组消费,至于哪个节点消费则由负载均衡来决定。

消息实时性

RocketMQ 支持 pull 和 push 两种消息消费模式,但 push 是使用长轮询 Pull 的方式实现的,可保证消息非常实时,消息实时性不低于 Push。
长轮询 pull 的原理是:发起 pull 请求失败后(比如 Broker 端暂时没有可以消费的消息),先 hold 住线程并挂起该请求。

RocketMQ除了上述的准实时消息外,还支持延时消息

延时消息

RocketMQ里延时消息功能并不能指定时间,而是只能指定延时级别:

1
2
3
4
Message message = new Message("TopicTest", "TagA", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
producer.send(message);

原理

  1. 延时消息和普通消息一样会先被写入commitLog,但不会立刻写入consumerQueue中,而是存放到SCHEDULE_TOPIC_XXX的topic下面,并且以延时粒度作为queueId区分;
  2. 之后Broker端会有定时任务扫描SCHEDULE_TOPIC_XXX下的每个Queue,到时候后写入到consumerQueue中。

源码入口是ScheduleMessageService.start,启动时会调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 1. 根据支持的各种延迟级别,添加不同延迟时间的TimeTask
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
// 每个延迟级别对应一个offset,代表一个普通消息队列文件
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 2. 添加一个10s执行一次的TimeTask
this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}

DeliverDelayedMessageTimerTask
ScheduleMessageService.executeOnTimeup
扫描延迟消息队列(SCHEDULE_TOPIC_XXX)的消息,将该延迟消息转换为指定的topic的消息。
1、读取不同延迟级别对应的延迟消息;
2、取得对应延迟级别读取的开始位置offset;
3、将延迟消息转换为指定topic的普通消息并存放起来。
4、修改下一次读取的offset值(修改的只是缓存),并指定下一次转换延迟消息的timetask。

ScheduleMessageService.this.persist
将延迟队列扫描处理的进度offset持久化到delayOffset.json文件中。

RocketMQ延迟队列也有一个缺点:Java中的Timer是单线程,而延迟消息的原理是Timer,也就是说当同时发送的延迟消息过多的时候一个线程处理速度一定是有瓶颈的,因此在实际项目中使用延迟消息一定不要过多依赖,只能作为一个辅助手段。

RocketMQ-消息存储
如上图所示,消息的存储分为如下 3 个部分:

  1. CommitLog:日志,存储消息主体;
  2. ConsumerQueue:在 CommitLog 中根据 Topic 和 Tag 检索消息是非常低效的,因此引入了 ConsumerQueue 作为消费消息的索引,它保存的其实是 CommitLog 中存储的消息的指针。
  3. IndexFile:hash 索引,提供一种通过 key 或时间区间来查询消息的方法。
阅读全文 »

架构

选型

消息队列 Kafka RocketMQ
适用场景 大量消息快速消费如流式计算 高性能、稳定、高可靠
热度 与周边生态系统的兼容性最好 有活跃中文社区
消息可靠传递
延迟 毫秒级 毫秒级
性能 每秒几十万 每秒几十万
消息丢失 参数优化配置后0丢失 参数优化配置后0丢失
消费模式 Pull Pull + Push(原理都是Pull)
可用性 非常高(分布式) 非常高(主从)
topic数量对吞吐量的影响 topic达到几十,几百个时,吞吐量会大幅度下降 topic达到几百,几千个时,吞吐量会有较小幅度的下降

缺点:

  • Kafka:同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,在它的 Broker 中,很多地方都会使用这种“先攒一波再一起处理”的设计。当业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
  • RocketMQ:没有太明显的缺点

部署结构

RocketMQ-架构图
RocketMQ-部署图

  • 启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
  • Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  • 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
  • Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
  • Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

下图来自GitHub
rocketmq_architecture

推拉模型

推模式优缺点

  • 实时性高

  • 推送速率难以适应消费速率

  • 不同消费者的消费速率很有可能不一样,Broker难以平衡每个消费者的推送速率,如果要实现自适应就会大大增加Broker自身的复杂度

因此推模式适用于消息量不大、消费能力强要求实时性高的情况下。

拉模式优缺点

  • 消费者可以根据自己能力拉取消息处理,灵活稳定

  • 可以更合适地进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者能不能一次性处理这么多消息。而拉模式可以根据消费者缓存能力决定拉取多少消息。

  • 会造成消息的延迟消费,如果长时间没有消息,消费端不断轮询拉取,会造成一定时间的忙等,如果轮询时间过长,又会导致消息的延迟加大。

QA

消息队列可以做什么?

异步处理耗时任务
解耦上下游系统
削峰填谷

哪些消息队列可以做到在消息生产、消费过程中不重、不丢(Exactly once)?

Kafka、RocketMQ、RabbitMQ都没有实现这个需求,因为要实现Exactly once,除了重发外还需要做幂等,实现比较复杂,而且对性能影响比较大。

RocketMQ中的Consumer是推还是拉?

RocketMQ支持推和拉,但这两种方式实际上都是通过pull实现的,只是拉是同步的,而推是传个回调函数,当RocketMQ客户端接收到消息后再调用这个回调函数。

RocketMQ发送、存储、接收的流程?

当发现消费者不消费时,如何诊断问题?

  1. 检查连接状态,看消费者是否正常连接Broker;
  2. 看消费者是否有分配到ConsumeQueue,因为一个ConsumeQueue只能被一个消费者消费,所以消费者数量超过ConsumeQueue时,就会出现部分消费者没有ConsumeQueue可消费的情况;
  3. 生产者是否有正常消费,从控制台就可以看;
  4. 如果检查完以上步骤后仍然没有发现问题,则需要查看消费者的客户端日志再进一步分析。

怎么实现消息发送的严格顺序性?

RMQ中的分区算法指的就是把消息发到固定的某些队列上,因为同一队列只能被一个消费者消费,因此可以保证这个队列中消息的顺序性。
可选的分区算法如:

  1. 在表中存储key和分区的对应关系,通过查表确定分区号;
  2. 取模

RocketMQ能否做到单队列的并行消费?

RocketMQ 在消费的时候,为了保证消息的不丢失和严格顺序,每个队列只能串行消费(一个消费者可以消费多个队列),无法做到并发,否则会出现消费空洞的问题。那如果放宽一下限制,不要求严格顺序,能否做到单个队列的并行消费呢?

怎么实现负载均衡?

RocketMQ如何保证消息不丢(消息一定能被消费)?

  1. Producer端重试
    默认push重试3次。
  2. Broker端只有在复制半数以上副本之后才会返回发送成功。

    和MySQL里的semisync有点像。

  3. Consumer端重复消费
    DefaultMQPushConsumer默认超时重试无限次,默认异常重试16次,过期或重试不成功则进入死信队列、默认凌晨 3 点会清除死信队列,为了确保重试不会出现重复消费,业务逻辑一般都需要保证幂等(幂等key可以使用业务oid或uniqId)。
    Consumer有两种返回值,CONSUME_SUCCESS和CONSUME_LATER,后者令Broker将消息转移到另一个Retry队列中供重试使用。

RocketMQ如何实现消息去重?

RocketMQ本身没有实现消息的去重功能,因为RocketMQ是At-Least-Once的。
所以,很多时候我们需要自己通过Redis等来实现消息去重,但是要注意的是不要用错了msgId:

  • MessageExt.msgId:重试时这个msgId是会变的,因此不适合当作幂等key;
  • MessageExt.properties["PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX"]:相对上面那个msgId来说,这个UNIQ_KEY就算重试多次值还是一样的,因此更适合当作幂等key。

消费失败怎么重发的?

怎么判断消息堆积了?

刷盘的原理?

  1. CommitLog
  2. ConsumeLog

怎么实现消息复制(Broker主从之间)?

RocketMQ 如何保证消息的高可用?

  1. NameServer 集群
    NameServer集群节点没有Master、Slave之分,即使挂掉其中几台,其他的仍可提供服务。
  2. Broker多主多从
    Broker支持多主多从集群,即使其中某台Master挂掉了,其他Master照样可以提供服务,而且挂掉的Master,其从节点照样可以通过选举得到一个新的Master。

broker集群的master宕机,slave是怎么提供服务的?master是怎么切换回来的?

为什么 RocketMQ 使用 NameServer 而不是 ZooKeeper 作为服务注册表

NameServer 具有高可用性,就算其中某台挂掉,其他服务器仍然能提供服务注册和查询功能。
ZooKeeper 的设计目标是高一致性,其中某台服务器挂掉,整个 ZooKeeper 集群就无法提供服务了——直到下一个Leader被选举出来为止。

NameServer是怎么感知Broker的变化的?

RocketMQ的事务消息是否完整实现了事务的ACID特性?

为什么要有Half Message?

  1. 可以先确认Broker服务器是否正常,如果半消息都发送失败了,就说明Broker挂了。
  2. 可以通过半消息来回查事务状态,如果半消息发出后一直没有被二次确认,就会回查事务状态。

    事务回查有两种情况:1、由于网络等原因一直没有执行事务的commit和rollback;2、本地事务执行成功了,但是返回commit的时候服务挂了,Broker最终也没有收到消息,因此还是半消息状态,因此仍会进行重试。

为什么说RocketMQ只能保证最终一致性?

比如以一个转帐功能为例,A账户扣减、发MQ消息通知另一个服务增加B账户的余额,扣减和增加是在两个事务中执行的,MQ虽然能保证两个事务最终一定都能执行上,但是并不能保证中间状态不会出现,比如某个时刻A账户扣减了、但是B账户仍为原状。

RocketMQ使用某个消息序号messageID消费某个队列的消息,时间复杂度是多少?(假设消息文件commitLog数量为m,每个消息文件中消息条数是k,索引文件consumerQueue的数量是n,队列中共有j条消息)

复杂度是O(1),因为消息序号中包含了消息在commitLog中的偏移量,因此可以直接通过偏移量来拿到消息。

参考

环境

  1. rocketmq 控制台搭建(rocketmq-console)
  2. Quick Start
  3. RocketMQ 源码解析 —— 调试环境搭建
  4. RocketMQ管理命令说明

原理

  1. 特性(features)
  2. 设计(design)
  3. RocketMQ 实战(三) - 消息的有序性
  4. 事务消息
  5. RocketMQ 源码分析

为什么使用RocketMQ

使用MQ的优点

  1. 解耦
    直连的情况下,每接入一个系统就需要直接改代码调新系统接口;
    使用 RocketMQ 的情况下,新系统可以自己监听消息。
  2. 异步
    非必要的业务逻辑异步处理,加快响应速度。
  3. 削峰
    直连的情况下,所有请求会直接全部打到下游服务,引入消息队列后,消息队列可以暂存消息,让下游服务慢慢消费;

使用 MQ 的缺点

系统可用性降低:消息队列挂掉将影响系统可用性。
系统复杂性增加:加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,系统复杂性增大。

使用 RocketMQ

搭建 Name Server 和 Broker Server(单点)

看下这个
为了方便,需要设置 Broker 服务器的一些属性,在 broker.conf 中:

1
2
3
4
# 允许使用SQL语法过滤消息
enablePropertyFilter=true
# 自动创建Topic,否则使用新Topic时还需要在控制台创建
autoCreateTopicEnable=true

对 Broker 服务器的启动可以使用下面的命令:

1
start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf &

如果后续生产消息时报错:maybe your broker machine memory too small
出现这个问题的主要原因是磁盘空间不够了,而我的电脑磁盘一直都是稀缺资源,所以就出问题了。
可以修改 runbroker.sh,在里面增加一句话即可: JAVA_OPT=”${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98” 我这里把磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息。

搭建RocketMQ Console

控制台,方便创建Topic、统计消息、查看集群状态等。
看下这个

多主多从(2主多从)

同上下载源码并编译完成后,第一步是创建配置文件,我计划将所有主从放到同一台服务器上,因此在本地创建所有配置文件和数据存储目录:
1、创建数据存储目录

1
2
mkdir -p /home/hgc/Documents/project/open/rocketmq/store1/{rootdir-a-m,commitlog-a-m,rootdir-a-s,commitlog-a-s}
mkdir -p /home/hgc/Documents/project/open/rocketmq/store2/{rootdir-b-m,commitlog-b-m,rootdir-b-s,commitlog-b-s}

2、配置环境变量
/etc/profile文件中加入:

1
2
3
# RocketMQ
export ROCKETMQ_HOME=/home/hgc/Documents/project/open/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin

3、创建配置文件
/home/hgc/Documents/project/open/rocketmq/conf.1.m/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
brokerClusterName = DefaultCluster
brokerName = rocketmq-nameserver-1
#brokerId 0 表示 Master,>0 表示 Slave
brokerId=0
# Broker 对外服务的监听端口
listenPort=10911
#nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=72
#Broker role有3种:SYNC_MASTER、ASYNC_MASTER、SLAVE。关键词SYNC和ASYNC表示Master和Slave之间同步消息的机制,SYNC即同步更新,指当Slave和Master消息同步完成后,再返回发送成功的状态。ASYNC即异步更新,master与slave有短暂消息延迟,毫> 秒级。本文在此使用了异步复制集群模式,线上环境推荐使用同步双写模式,即SYNC_MASTER
brokerRole=SYNC_MASTER
# 刷盘方式 ASYNC_FLUSH 异步刷盘
flushDiskType=ASYNC_FLUSH
#存储路径
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store1/rootdir-a-m
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store1/commitlog-a-m
# 是否允许 Broker 自动创建Topic
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组
autoCreateSubscriptionGroup=true

/home/hgc/Documents/project/open/rocketmq/conf.2.m/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
brokerName=rocketmq-nameserver-2
listenPort=10921
namesrvAddr=rocketmq-nameserver-1:9876
brokerId=0
deleteWhen=04
fileReservedTime=72
brokerRole=SYNC_MASTER
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store2/rootdir-b-m
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store2/commitlog-b-m
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

/home/hgc/Documents/project/open/rocketmq/conf.1.s/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
brokerClusterName = DefaultCluster
brokerName = rocketmq-nameserver-1
brokerId=1
listenPort=10917
namesrvAddr=127.0.0.1:9876
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store1/rootdir-a-s
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store1/commitlog-a-s
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true

/home/hgc/Documents/project/open/rocketmq/conf.2.s/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
brokerName=rocketmq-nameserver-2
listenPort=10927
namesrvAddr=127.0.0.1:9876
brokerId=1
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/home/hgc/Documents/project/open/rocketmq/store2/rootdir-b-s
storePathCommitLog=/home/hgc/Documents/project/open/rocketmq/store2/commitlog-b-s
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH

4、启动NameServer
启动前先修改启动脚本,因为是单机使用,把堆大小稍微改小点:

1
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
1
nohup sh bin/mqnamesrv &

5、启动Broker
同理改下启动脚本:

1
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g"

按master - slave的顺序启动每个Broker

1
2
3
4
nohup sh bin/mqbroker -c conf.1.m/broker.conf > conf.1.m/logs/broker.1.m.log &
nohup sh bin/mqbroker -c conf.2.m/broker.conf > conf.2.m/logs/broker.2.m.log &
nohup sh bin/mqbroker -c conf.1.s/broker.conf > conf.1.s/logs/broker.1.s.log &
nohup sh bin/mqbroker -c conf.2.s/broker.conf > conf.2.s/logs/broker.2.s.log &

6、查看注册情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 看看集群注册情况
./mqadmin clusterList -n 127.0.0.1:9876
# 创建一个Topic
./mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t TopicTest
# 看看Topic列表
./mqadmin topicList -n 127.0.0.1:9876
# 看看Topic路由信息
./mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest
# 查看Topic统计信息
mqadmin topicStatus -n 127.0.0.1:9876 -t TopicTest
# 根据消息key查询消息
./mqadmin queryMsgByKey -n "127.0.0.1:9876" -t TopicTest -k messageKey
# 根据消息ID查询消息
./mqadmin queryMsgById -n "127.0.0.1:9876" -t TopicTest -i 240882208B134140BD0FB45F5DBD2075000018B4AAC2130BE69B0001
# 查询Producer的网络连接
./mqadmin producerConnection -n 127.0.0.1:9876 -g ExampleProducerGroup -t TopicTest
# 查询Consumer的网络连接
./mqadmin consumerConnection -n 127.0.0.1:9876 -g ExampleConsumerGroup -t TopicTest
# 查看订阅组的消费状态
consumerProgress
# 添加KV配置信息
updateKvConfig
# 删除KV配置信息
deleteKvConfig
# 添加project group配置信息
updateProjectGroup
# 删除project group配置信息
deleteProjectGroup
# 获取product group配置信息
getProjectGroup
# 设置消费进度
resetOffsetByTime
# 清除特定Broker权限
wipeWritePerm
# 获取Consumer消费进度
getConsumerStatus

7、使用客户端测试

8、关闭服务器

1
2
sh mqshutdown namesrv
sh mqshutdown broker

主从自动切换(DLedger)

  • enableDLegerCommitLog
    是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 true 。
  • dLegerGroup
    节点所属的 raft 组,建议与 brokerName 保持一致,例如 broker-a。
  • dLegerPeers
    集群节点信息,示例配置如下:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913,多个节点用英文冒号隔开,单个条目遵循 legerSlefId-ip:端口,这里的端口用作 dledger 内部通信。
  • dLegerSelfId
    当前节点id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字。
  • storePathRootDir
    DLedger 日志文件的存储根目录,为了能够支持平滑升级,该值与 storePathCommitLog 设置为不同的目录。

第一个节点的broker.conf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store1/
# 与dledger相关的属性
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n0

第二个节点的broker.conf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10921
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store2/
# 与dledger 相关的配置属性
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n1

第三个节点的broker.conf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName = DefaultCluster
brokerName = broker1
brokerId = 0
listenPort=10931
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
storePathRootDir=/home/hgc/Documents/project/open/rocketmq-dleger/store3/
# 与dledger 相关的配置属性
enableDLegerCommitLog=true
dLegerGroup=broker1
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n2

对于DLedger集群来说,一方面要保证消息仍能正常收发,另一方面还要保证Master挂掉后集群会选举出一个新的Master,可以将Master kill掉来模拟这种情况,在rocketmq-console中查看节点角色的变化情况。

客户端配置

消费模式

有以下3种:

  1. CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息
  2. CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
  3. CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

其他概念

msgId:根据 queueNum 取模放到 broker 队列
instanceName:每个集群不一致,同一集群 groupName 不能重,两个 topic 的 groupName 不能一样
两种模式本质都是拉:push 是构建长连接,一次拉完,一个一个给 broker 回执 ack,多个 Consumer 平分(必须是 4 的倍数,不够再给余数,比如 9 个消息三个 Consumer 分别拉到 4、4、1),poll 模式批量,按设定拉取

消息的生成和消费

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>

消息生产:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class ProducerTest {

/**
* 同步消息
*/
@Test
public void sendSyncMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

/**
* 异步消息
*/
@Test
public void sendAsyncMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}

@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

/**
* 单向消息
*/
@Test
public void testSendOnewayMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);

}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

消息消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ConsumerTest {

public static void main(String[] args) throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");

// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

顺序消息

发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
public class OrderedProducer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

String[] tags = new String[]{"TagA", "TagC", "TagD"};

// 订单列表
List<OrderStep> orderList = new OrderedProducer().buildOrders();

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}, orderList.get(i).getOrderId());//订单id

System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}

/**
* 订单的步骤
*/
private static class OrderStep {

private long orderId;
private String desc;

public long getOrderId() {
return orderId;
}

public void setOrderId(long orderId) {
this.orderId = orderId;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}

/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();

OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

return orderList;
}
}

接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class OrderedConsumer {

public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Random random = new Random();

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + " queueId=" + msg.getQueueId() + ", topic:" + msg.getTopic() + " content:" + new String(msg.getBody()));
}

try {
//模拟业务逻辑处理中...
// TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}

延时消息

延时消息可以用于一些需要延迟处理业务的场景,比如下单后超过一定时间没支付就自动取消。
发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ScheduledProducer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TopicTest", "TagA", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}
}

接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ScheduledConsumer {

public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息,此外,这一批消息的总大小不应超过 4MB。
发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class BatchProducer {

public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String topic = "TopicTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}

接收逻辑并无不同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class BatchConsumer {

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

注意我们上边提到的 4MB 的大小限制,如果消息列表中的消息变多了,很有可能会超过这个大小,这时最好对消息列表进行分割再发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class BatchProducer {

public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String topic = "TopicTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
splitSend(producer, messages);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}

static void splitSend(MQProducer producer, List<Message> messages) {
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ListSplitter implements Iterator<List<Message>> {

private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;

public ListSplitter(List<Message> messages) {
this.messages = messages;
}

@Override
public boolean hasNext() {
return currIndex < messages.size();
}

@Override
public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}

private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while (tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}

private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
return tmpSize;
}
}

消息订阅

RocketMQ 支持用 SQL 语法的语句来“模糊”订阅消息,只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,接口如下:

1
public void subscribe(finalString topic, final MessageSelector messageSelector)
  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:’abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class TagProducer {

public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", "2");
SendResult sendResult = producer.send(msg);
producer.shutdown();
}
}

消息消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TagConsumer {

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("localhost:9876");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println(new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

事务消息

事务消息有3种状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

发送事务消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class TransactionProducer {

public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setExecutorService(executorService);
// 事务监听
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}

实现事务监听接口,在发送半消息成功后会触发executeLocalTransaction方法来执行本地事务,checkLocalTransaction用于检查本地事务状态,并回应消息队列的检查请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

事务消息使用上的限制:

  • 事务消息不支持延时消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
  • 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  • 事务性消息可能不止一次被检查或消费。
  • 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者 ID 查询到消费者。

RocketMQ的运维

海量数据处理方法

当数据量特别大时,首先排除直接加载到内存中的算法,最核心的思路往往是分治,比如排序时先将大文件分割成多个足以加载到内存中的小文件,然后利用内存排序算法分别排序得到有序的一些小文件,最后通过归并排序得到最终的结果。

阅读全文 »

MyBatis 整体结构

MyBatis结构

配置文件

配置类提供的功能几乎贯穿了整个处理过程:

  1. 解析 Xml 文件
  2. 创建 SQL 处理器 Executor
  3. 对语句进行缓存 MappedStatement

怎么定位路径

  • getResourceAsStream

怎么解析文件

xml 文件的解析方式有两种,一种 DOM 是直接读入整个 xml 文件,根据标签的嵌套关系构建一棵文档树;另一种方式叫 SAX(Simple API for XML),是一种事件驱动的文档解析方式,什么是事件驱动呢?比如说 SAX 驱动扫描到了起始标签,就代表发生了一个事件,它会转而调用某个由用户定义的函数(startElement)执行逻辑。
有一种设计原则叫好莱坞法则(Hollywood),形象地说就是“你不要 call 我,需要你时我会 call 你”,一个例子是异步调用,这是一种通信机制,客户端在发出请求后不必等待服务端处理完毕就可以返回处理自己的逻辑,等到服务端处理完毕后再将结果传回,这种方式一定程度上可以解决客户端长期阻塞的问题、改善用户体验,回调函数也是一个例子。
据网上的说法,DOM 需要一次构建整棵 DOM 树,所以比较占内存,不适合大的 xml 文档解析,但是由于 DOM 树上可以任意遍历,所以自由度很高,相对来说,SAX 是读到什么就调用什么回调函数,所以内存占用小,但是编程多少会复杂一些。

构建数据库连接

数据库连接

SqlSessionFactoryBuilder

应用了建造者模式,根据配置文件来创建 SqlSessionFactory,创建后其任务就结束了,生命周期在一个方法内。

SqlSessionFactory

创建和数据库连接的工具,在整个应用运行期间应该作为一个单例存在,或者使用依赖注入管理其生命周期。

SqlSession

代表和数据库的一次连接,在 MyBatis 中其实现是线程不安全的,生命周期最好控制在一次请求之间。

数据源

  • DBCP
  • C3P0
  • Druid
  • MyBatis 内置数据源(UNPOOLED、POOLED、JNDI)
  • 自定义数据源

映射器

  • mapper 文件
  • 注解

SQL 执行

SqlSession 本身是可以直接执行 sql 语句的,它的所有 update、query 等方法都是对语句进行了包装(MappedStatement),然后再调用 Executor 的相应方法,Executor 是执行器,是 MyBatis 的核心。
SQL 的执行是由 Executor 负责的,Executor 对象是和 SqlSession 同时创建的,SqlSessionFactory 会为 Executor 创建事务,事务类默认为 ManagedTransactionFactory,Executor 需要从事务对象获取数据库连接(包装上一层事务后扩展性更好),事务会从环境对象中获取 DataSource 对象,然后委托 DataSource 创建连接,并且可以根据事务等级来为连接设置事务。说白了,把 Config 对象传给新建的 Transaction,由 Transaction 创建连接。
Executor 并不是直接执行 SQL 语句,SQL 语句由 MappedStatement 包装,再交给 StatementHandler 执行

Executor

MyBatis 提供 4 种 Executor,他们都继承于 BaseExecutor
BaseExecutor 是一个抽象类,实现了延迟加载、一级缓存(PerpetualCache)等功能
SimpleExecutor 语句使用 PreparedStatement 保存,使用 StatementHandler 处理
ReuseExecutor 与 SimpleExecutor 的区别是它使用一个 Map<String, Statement>来缓存 SQL 语句对应的 Statement,如果某些 Sql 复杂且使用频繁的话可以使用这个执行器,因为这个 Map 不是静态的,并且 MyBatis 实际上会为每个新建的 SqlSession 创建一个 Executor,所以这个缓存只在同一个 Session 内有效

1
2
3
4
5
6
7
8
9
10
private final Map<String, Statement> statementMap = new HashMap<String, Statement>();
if (hasStatementFor(sql)) {
//如果缓存中已经有了,直接得到Statement
stmt = getStatement(sql);
} else {
//如果缓存没有找到,则和SimpleExecutor处理完全一样,然后加入缓存
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection);
putStatement(sql, stmt);
}

BatchExecutor(批量执行器) 将一些 SQL 语句放在一个 List 中,最后 doFlushStatements 一块执行,并且如果两个相邻的 SQL 语句是相同的,还会复用前一个 Statement 对象。
CachingExecutor(二级缓存执行器) 为什么说是二级缓存?一级缓存由 BaseExecutor 中的 PerpetualCache 实现,CachingExecutor 会先在二级缓存中查找,如果找不到再委托给 delegate 执行,delegate 是 BaseExecutor 的子类,当然有一级缓存的功能。

参数类型和返回值

我们很多时候会指定 parameterType 和 resultType 为复杂类型,怎么将这些类型和数据库表结构进行映射正是 orm 框架的任务之一。
parameterType 表示传入参数类型,在 sql 语句中可以使用#{参数名}来调用,比如

1
sqlSession.selectOne("com.tallate.UserMapper.selectUser", 1);

传入了一个 Integer 类型的参数 1,那么 PreparedStatementHandler 在准备语句时,应该对这个参数的类型进行判断,这个是由 ParameterHandler 负责的。
resultType 表示返回值类型,PreparedStatementHandler 在获得 ResultSet 后应该将查询到的表记录转换为 Java 对象,这个是由 ResultSetHandler 负责的,它最终会调用对应类的构造函数将查询出的结果传入。

动态代理

我们平常使用 MyBatis 时都会定义一个 XXMapper 接口,对应 mapper.xml 中的一个 namespace,而且我们也不必显示写出其实现类,调用过程都是由动态代理实现的。
一般来说,代理类和被代理类应该实现相同的接口,但是现在我们的被代理类是一个 xxmapper.xml 文件,所以问题现在变成了:怎么将 xxmapper.xml 文件转换成被代理类。
查看源码中的 MapperProxyFactory 和 MapperProxy 可以知道,MyBatis 实现 Mapper 接口其实是调用了 SqlSession 中的方法(select、selectOne 等,已经实现了),但是它们的方法名并不相同,比如 selectUser 怎么和 selectOne 关联上呢?
MapperProxy 的 invoke 方法并不是直接调用被代理对象的方法,而是使用 MapperMethod 来表示映射的方法,通过 MapperMethod 可以判断接口方法的返回值、方法名等来确定应该调用 SqlSession 的哪个方法

  1. 启动时 XMLConfigBuilder 会为 config.xml 中所有 mapper 节点扫描包下所有映射器
  2. 创建对应的映射 interface -> MapperProxyFactory
  3. 添加动态代理对象到 MapperRegistry 中(我为了方便,直接加到 Config 中了,其实是刚开始对 MapperRegistry 的功能理解错了…)
  4. 之后每次 getMapper,都可以根据接口名来找到对应的动态代理对象,调用方法时实际上是调用了相应的 MapperMethod

并发

有哪些资源是中心化的?如果是,会不会被多线程同时访问?在 web 环境中,假设每个用户代表一个线程,当他们同时访问服务器就会出现并发问题。

  • 线程池(数据源)
    如果线程池是使用链表(LinkedList)实现的,可以使用 Collections.synchronizedList 进行包装,或者直接使用 Vector
  • Map<String, MappedStatement> MappedStatements
    使用 Map 容器储存 MappedStatement,MappedStatement 表示调用语句到 sql 语句的映射,比如”namespace.selectUser”到 mapper.xml 中对应的 sql 语句(使用 SqlSource 包装)。
  • List environments
    表示 config.xml 中注册的所有环境对象列表
  • List mappers
    表示 config.xml 中注册的所有 mapper 的列表
  • Map<Method, MapperMethod> methodCache
    MapperProxyFactory 中的映射器方法缓冲是使用 ConcurrentHashMap 实现的

QA

  1. 一级缓存不够吗?为什么要有二级缓存?
    一级缓存是会话级缓存,在 BaseExecutor 中,是成员变量,生命周期在一个 SqlSession 内,连接断开就没了;
    二级缓存是语句级缓存,在 MappedStatement 中,可以跨多个 SqlSession,当一些数据不常发生变化或者允许偶尔的并发的时候,二级缓存可能更有效率。
  2. 为什么不推荐使用 MyBatis 中的缓存?
    一级缓存会产生脏数据。因为作用范围是会话,如果有俩会话,会话 1 加载数据到缓存,会话 2 修改该条数据,之后会话 1 读到的是缓存里的老数据。
    二级缓存同样会产生脏数据。二级缓存作用范围是语句,需要手动刷新或在 xml 中配置需要刷新,一般在写入操作和事务提交后都需要刷新一下。但是如果表 A 的 Amapper.xml 中关联了表 B,即使表 B 的数据有变更,我们在 Amapper.xml 中执行查询语句仍然会读到缓存中的脏数据。
  3. MyBatis 与 JDBC 对象之前的关联?
    ParameterStatement - ParameterStatementHandler
    SimpleStatement - SimpleStatementHandler
    ResultSet - ResultSetHandler
0%