Tallate

该吃吃该喝喝 啥事别往心里搁

二叉树、平衡树、AVL 树和红黑树

二叉树是一类特殊的树形结构,其他类似的还有三叉树、B 树、B+树等,二叉树的特征是 1 对 2,即每个节点都有 2 个子节点(这里认为空节点也算子节点)。

这么定义主要是为了避免将二叉树的实现局限于指针,实际上我们也可以使用数组来实现二叉树,也就是二叉堆。

二叉树所有操作的时间复杂度为O(logN),但是它存在的主要问题是不稳定,比如数据是从小到大依次插入的情况下,最终结果是得到一条斜线,这时二叉树会退化为链表。
平衡树的特点是在每次写入操作后会进行一次重平衡,让树的高度保持在O(logN)
AVL 树是平衡树的一种,它严格保证树的高度为O(logN),每次都会根据高度重平衡,其缺点是过于严格,会导致旋转操作占用比较多的时间。
红黑树作为 AVL 树的一种替代,通过红黑规则控制树的旋转,能以较少次旋转作为代价得到较为平衡的树。

AVL 树严格保证树的高度在[logN, logN + 1],红黑树理论上极端情况可以出现高度达到 2*logN,但是现实中很难遇到。

红黑树的起源 - 23树

23树的分裂
23树并不是指3叉树,23树是一种平衡树,不过它维持平衡的方式并不是旋转,而是分裂,如上图所示:

  • 一个节点有至多3个元素,当达到3个元素后会发生分裂;
  • 分裂后中间元素上移,与父节点合并。

23树可以证明高度在log3(N)=(lgN)/(lg3)(如果都是3-nodes即2元素节点)到lgN(如果都是2-nodes即1元素节点),从而保证查询时顶多查lgN个节点。
23树的缺点是实现的额外开销过大,比如要变更节点类型、比较是否相等时要比较节点中的所有元素值等,有时候23树的性能甚至不如普通的BST,因此Sedgewick之后便提出了红黑树。

红黑树的定义

红黑树是从23树演化而来的,它将原来2-3树中的3-nodes表示为使用红线连接的两个节点,因为每个节点只有一条线连上它,因此为了简单起见把颜色字段保存到节点里。
23树和红黑树
红黑树的定义:

  1. Red links lean left. - 红色节点必须在左边
  2. No node has two red links connected to it. - 3个红色节点不能连一起
  3. The tree has perfect black balance: every path from the root to a null link has the same number of black links. - 红黑树是完美黑平衡的,从root沿任意路径到达叶节点的黑节点数量都是一样的。

以上3个定义其实和2-3树的定义是一一对应的:

  1. 和左红节点连一块可以看作2-3树中的一个3-node;
  2. 2-3树中一个节点中塞满3个元素后就会分裂;
  3. 把红黑树中节点和其左红子节点合并后,最终的树其实和2-3树等价。

红黑树的红黑规则

  1. 任何一个节点都有颜色,黑色或者红色;
  2. 根节点是黑色的;
  3. 空节点(有些实现中叶节点是哨兵节点nil)被认为是黑色的。
  4. 父子节点之间不能出现两个连续的红节点(如果父节点是红色,则它的两个儿子都是黑色);
  5. 任何一个节点向下遍历到其子孙的叶子节点,所经过的黑节点个数必须相等;

红黑树操作 - 查询

红黑树的查询就是普通的二叉树查询。

红黑树操作 - 旋转

左旋操作将右子节点旋转到父节点位置,并改变二者的颜色。
红黑树左旋
右旋同理:
红黑树右旋

红黑树操作 - 插入

红黑树的插入操作和 BST 差不多,只不过插入后可能会破坏上面红黑树的定义,因此需要做一些旋转和颜色修改操作来恢复。
很多地方描述红黑树的方式并不一致,这里还是以《算法》中的实现为主。
插入节点到一个3-node的3种情况

  • 可以看到上面3种情况最终都转换成了一个三角形的形状,然后进行了颜色的翻转,实际上相当于2-3树中一个3元素节点分裂成了3个。

下图是一个插入节点到红黑树中的例子,其中被红线连接的子节点是红色的节点:
红黑树节点插入轨迹

红黑树操作 - 删除

参考

  1. 《Algorithms》 - Robert_Sedgewick
    红黑树原来是从23树演化而来,之前以为是凭空编出来的。
  2. 《Algorithms》中的红黑树实现

实验内容

1-3:了解 Linux 的启动过程、用户空间是如何与内核空间进行交互的。
4-5:了解进程的执行原理、状态的转换过程,及进程之间是如何转换的。
6:通过信号量机制了解内核是如何实现同步的。
7、9:了解内核的内存和文件系统的实现原理。
8:了解内核如何实现设备管理,明白驱动是如何实现的。
10:图形界面是开课老师提到的扩展实验。

阅读全文 »

分布式一致性协议的演化

2PC

3PC

与 2PC 的区别是:

  1. 将 2PC 的 prepare 阶段拆为 CanCommit 和 PreCommit 两个阶段,其中:
    • CanCommit 阶段:参与者需要判断是否可以执行事务提交操作
      这个阶段如果出现超时、某个参与者返回 No 等情况,事务直接失败;
      在所有参与者 CanCommit 阶段都返回 Yes 响应后可以进入 PreCommit 阶段;
    • PreCommit 阶段:预提交即执行事务操作,并记录 undo 和 redo 日志,这时本地事务还未 commit
      这个阶段如果有任何一个参与者向协调者返回了 No 响应或超时协调者还未接收到参与者的响应,则协调者会开始执行事务的 abort;
      如果所有参与者都返回了 ACK 响应,则可以进入最终的 doCommit 阶段;
    • abort:上一步,如果要开始执行中断事务,协调者会向所有参与者发送 abort 请求,参与者接收
    • doCommit 阶段:

Paxos

与 2PC(及其衍生协议)区别

Paxos 协议和 2PC 协议在分布式系统中所起的作用并不相同。Paxos 协议用于保证同一个数据分片的多个副本之间的数据一致性。当这些副本分布到不同的数据中心时,这个需求尤其强烈。2PC 协议用于保证属于多个数据分片上的操作的原子性。这些数据分片可能分布在不同的服务器上,2PC 协议保证多台服务器上的操作要么全部成功,要么全部失败。
Paxos 协议有两种用法:一种用法是用它来实现全局的锁服务或者命名和配置服务,例如 Google Chubby 以及 Apache Zookeeper。另外一种用法是用它来将用户数据复制到多个数据中心,例如 Google Megastore 以及 Google Spanner。
2PC 协议最大的缺陷在于无法处理协调者宕机问题。如果协调者宕机,那么,2PC 协议中的每个参与者可能都不知道事务应该提交还是回滚,整个协议被阻塞,执行过程中申请的资源都无法释放。因此,常见的做法是将 2PC 和 Paxos 协议结合起来,通过 2PC 保证多个数据分片上的操作的原子性,通过 Paxos 协议实现同一个数据分片的多个副本之间的一致性(一般)。另外,通过 Paxos 协议解决 2PC 协议中协调者宕机问题。当 2PC 协议中的协调者出现故障时,通过 Paxos 协议选举出新的协调者继续提供服务,可以解决单点故障问题(但是你确定服务参与者足够多能够发起一次选举吗?)。

Paxos

Basic Paxos(单值Paxos)

目前在多个proposer可以同时发起提议的情况下,满足P1、P2a即能做到确定并只确定一个值。如果再加上节点宕机恢复、消息丢包的考量呢?
假设acceptor c 宕机一段时间后恢复,c 宕机期间其他acceptor已经确定了一项值为v的决议但c 因为宕机并不知晓;c 恢复后如果有proposer马上发起一项值不是v的提议,由于条件P1,c 会接受该提议,这与P2a矛盾。为了避免这样的情况出现,进一步地我们对proposer作约束:

1
P2b. 如果一项值为v的提议被确定,那么proposer后续只发起值为v的提议

满足P2b则P2a成立 (P2b => P2a => P2)。
P2b约束的是提议被确定(chosen)后proposer的行为,我们更关心提议被确定前proposer应该怎么做:

1
P2c. 对于提议(n,v),acceptor的多数派S中,如果存在acceptor最近一次(即ID值最大)接受的提议的值为v',那么要求v = v';否则v可为任意值

满足P2c则P2b成立 (P2c => P2b => P2a => P2)。

条件P2c是Basic Paxos的核心,光看P2c的描述可能会觉得一头雾水,我们通过 The Part-Time Parliament中的例子加深理解:
BasicPaxos例子
假设有A~E 5个acceptor,- 表示acceptor因宕机等原因缺席当次决议,x 表示acceptor不接受提议,o 表示接受提议;多数派acceptor接受提议后提议被确定,以上表格对应的决议过程如下:
ID为2的提议最早提出,根据P2c其提议值可为任意值,这里假设为a
acceptor A/B/C/E 在之前的决议中没有接受(accept)任何提议,因而ID为5的提议的值也可以为任意值,这里假设为b
acceptor B/D/E,其中D曾接受ID为2的提议,根据P2c,该轮ID为14的提议的值必须与ID为2的提议的值相同,为a
acceptor A/C/D,其中D曾接受ID为2的提议、C曾接受ID为5的提议,相比之下ID 5较ID 2大,根据P2c,该轮ID为27的提议的值必须与ID为5的提议的值相同,为b;该轮决议被多数派acceptor接受,因此该轮决议得以确定
acceptor B/C/D,3个acceptor之前都接受过提议,相比之下C、D曾接受的ID 27的ID号最大,该轮ID为29的提议的值必须与ID为27的提议的值相同,为b
以上提到的各项约束条件可以归纳为3点,如果proposer/acceptor满足下面3点,那么在少数节点宕机、网络分化隔离的情况下,在“确定并只确定一个值”这件事情上可以保证一致性(consistency):
B1(ß): ß中每一轮决议都有唯一的ID标识
B2(ß): 如果决议B被acceptor多数派接受,则确定决议B
B3(ß): 对于ß中的任意提议B(n,v),acceptor的多数派中如果存在acceptor最近一次(即ID值最大)接受的提议的值为v’,那么要求v = v’;否则v可为任意值
(注: 希腊字母ß表示多轮决议的集合,字母B表示一轮决议)
另外为保证P2c,我们对acceptor作两个要求:

  1. 记录曾接受的ID最大的提议,因proposer需要问询该信息以决定提议值
  2. 在回应提议ID为n的proposer自己曾接受过ID最大的提议时,acceptor同时保证(promise)不再接受ID小于n的提议
    至此,proposer/acceptor完成一轮决议可归纳为prepare和accept两个阶段。prepare阶段proposer发起提议问询提议值、acceptor回应问询并进行promise;accept阶段完成决议,图示如下:
    accept流程
    还有一个问题需要考量,假如proposer A发起ID为n的提议,在提议未完成前proposer B又发起ID为n+1的提议,在n+1提议未完成前proposer C又发起ID为n+2的提议…… 如此acceptor不能完成决议、形成活锁(livelock),虽然这不影响一致性,但我们一般不想让这样的情况发生。解决的方法是从proposer中选出一个leader,提议统一由leader发起。
    最后我们再引入一个新的角色:learner,learner依附于acceptor,用于习得已确定的决议。以上决议过程都只要求acceptor多数派参与,而我们希望尽量所有acceptor的状态一致。如果部分acceptor因宕机等原因未知晓已确定决议,宕机恢复后可经本机learner采用pull的方式从其他acceptor习得。

Multi Paxos(多值Paxos)

通过以上步骤分布式系统已经能确定一个值,“只确定一个值有什么用?这可解决不了我面临的问题。” 你心中可能有这样的疑问。
其实不断地进行“确定一个值”的过程、再为每个过程编上序号,就能得到具有全序关系(total order)的系列值,进而能应用在数据库副本存储等很多场景。我们把单次“确定一个值”的过程称为实例(instance),它由proposer/acceptor/learner组成,下图说明了A/B/C三机上的实例:
MultiPaxos
不同序号的实例之间互相不影响,A/B/C三机输入相同、过程实质等同于执行相同序列的状态机(state machine)指令 ,因而将得到一致的结果。
proposer leader在Multi Paxos中还有助于提升性能,常态下统一由leader发起提议,可节省prepare步骤(leader不用问询acceptor曾接受过的ID最大的提议、只有leader提议也不需要acceptor进行promise)直至发生leader宕机、重新选主。

总结

以上介绍了Paxos的推演过程、如何在Basic Paxos的基础上通过状态机构建Multi Paxos。
微信后台开发同学实现并开源了一套基于Paxos协议的多机状态拷贝类库PhxPaxos,PhxPaxos用于将单机服务扩展到多机,其经过线上系统验证并在一致性保证、性能等方面作了很多考量。

Paxos变种和优化

如果想把Paxos应用于工程实践,了解基本原理还不够。
有很多基于Paxos的优化,在保证一致性协议正确(safety)的前提下,减少Paxos决议通信步骤、避免单点故障、实现节点负载均衡,从而降低时延、增加吞吐量、提升可用性,下面我们就来了解这些Paxos变种。

Multi Paxos

首先我们来回顾一下Multi Paxos,Multi Paxos在Basic Paxos的基础上确定一系列值,其决议过程如下:
MultiPaxos例子
phase1a: leader提交提议给acceptor
phase1b: acceptor返回最近一次接受的提议(即曾接受的最大的提议ID和对应的value),未接受过提议则返回空
phase2a: leader收集acceptor的应答,分两种情况处理
phase2a.1: 如果应答内容都为空,则自由选择一个提议value
phase2a.2: 如果应答内容不为空,则选择应答里面ID最大的提议的value
phase2b: acceptor将决议同步给learner
Multi Paxos中leader用于避免活锁,但leader的存在会带来其他问题,一是如何选举和保持唯一leader(虽然无leader或多leader不影响一致性,但影响决议进程progress),二是充当leader的节点会承担更多压力,如何均衡节点的负载。Mencius[1]提出节点轮流担任leader,以达到均衡负载的目的;租约(lease)可以帮助实现唯一leader,但leader故障情况下可导致服务短期不可用。

Fast Paxos

在Multi Paxos中,proposer -> leader -> acceptor -> learner,从提议到完成决议共经过3次通信,能不能减少通信步骤?
对Multi Paxos phase2a,如果可以自由提议value,则可以让proposer直接发起提议、leader退出通信过程,变为proposer -> acceptor -> learner,这就是Fast Paxos[2]的由来。
FastPaxos例子
Multi Paxos里提议都由leader提出,因而不存在一次决议出现多个value,Fast Paxos里由proposer直接提议,一次决议里可能有多个proposer提议、出现多个value,即出现提议冲突(collision)。leader起到初始化决议进程(progress)和解决冲突的作用,当冲突发生时leader重新参与决议过程、回退到3次通信步骤。
Paxos自身隐含的一个特性也可以达到减少通信步骤的目标,如果acceptor上一次确定(chosen)的提议来自proposerA,则当次决议proposerA可以直接提议减少一次通信步骤。如果想实现这样的效果,需要在proposer、acceptor记录上一次决议确定(chosen)的历史,用以在提议前知道哪个proposer的提议上一次被确定、当次决议能不能节省一次通信步骤。

EPaxos

除了从减少通信步骤的角度提高Paxos决议效率外,还有其他方面可以降低Paxos决议时延,比如Generalized Paxos[3]提出不冲突的提议(例如对不同key的写请求)可以同时决议、以降低Paxos时延。
更进一步地,EPaxos[4](Egalitarian Paxos)提出一种既支持不冲突提议同时提交降低时延、还均衡各节点负载、同时将通信步骤减少到最少的Paxos优化方法。
为达到这些目标,EPaxos的实现有几个要点。一是EPaxos中没有全局的leader,而是每一次提议发起提议的proposer作为当次提议的leader(command leader);二是不相互影响(interfere)的提议可以同时提交;三是跳过prepare,直接进入accept阶段。EPaxos决议的过程如下:
EPaxos的accept过程

小结

以上介绍了几个基于Paxos的变种,Mencius中节点轮流做leader、均衡节点负载,Fast Paxos减少一次通信步骤,Generalized Paxos允许互不影响的决议同时进行,EPaxos无全局leader、各节点平等分担负载。

Raft

Paxos 偏向于理论、对如何应用到工程实践提及较少。理解的难度加上现实的骨感,在生产环境中基于 Paxos 实现一个正确的分布式系统非常难。
Raft 在 2013 年提出,提出的时间虽然不长,但已经有很多系统基于 Raft 实现。相比 Paxos,Raft 的优点就是更利于理解、更易于实行。
为达到更容易理解和实行的目的,Raft 将问题分解和具体化:Leader 统一处理变更操作请求,一致性协议的作用具化为保证节点间操作日志副本(log replication)一致,以 term 作为逻辑时钟(logical clock)保证时序,节点运行相同状态机(state machine)得到一致结果。Raft 协议具体过程如下:

Raft执行流程

  1. Client 发起请求,每一条请求包含操作指令
  2. 请求交由 Leader 处理,Leader 将操作指令(entry)追加(append)至操作日志,紧接着对 Follower 发起 AppendEntries 请求、尝试让操作日志副本在 Follower 落地
  3. 如果 Follower 多数派(quorum)同意 AppendEntries 请求,Leader 进行 commit 操作、把指令交由状态机处理
  4. 状态机处理完成后将结果返回给 Client

指令通过 log index(指令 id)和 term number 保证时序,正常情况下 Leader、Follower 状态机按相同顺序执行指令,得出相同结果、状态一致。
宕机、网络分化等情况可引起 Leader 重新选举(每次选举产生新 Leader 的同时,产生新的 term)、Leader/Follower 间状态不一致。Raft 中 Leader 为自己和所有 Follower 各维护一个 nextIndex 值,其表示 Leader 紧接下来要处理的指令 id 以及将要发给 Follower 的指令 id,LnextIndex 不等于 FnextIndex 时代表 Leader 操作日志和 Follower 操作日志存在不一致,这时将从 Follower 操作日志中最初不一致的地方开始,由 Leader 操作日志覆盖 Follower,直到 LnextIndex、FnextIndex 相等。
Paxos 中 Leader 的存在是为了提升决议效率,Leader 的有无和数目并不影响决议一致性,Raft 要求具备唯一 Leader,并把一致性问题具体化为保持日志副本的一致性,以此实现相较 Paxos 而言更容易理解、更容易实现的目标。

Zab

Zab 的全称是 Zookeeper atomic broadcast protocol,是 Zookeeper 内部用到的一致性协议。相比 Paxos,Zab 最大的特点是保证强一致性(strong consistency,或叫线性一致性 linearizable consistency)。
和 Raft 一样,Zab 要求唯一 Leader 参与决议,Zab 可以分解成 discovery、sync、broadcast 三个阶段:
Zab原理

  • discovery
    选举产生PL(prospective leader),PL 收集Follower epoch(cepoch),根据Follower的反馈 PL 产生newepoch(每次选举产生新 Leader 的同时产生新 epoch,类似 Raft 的 term)。
  • sync
    PL 补齐相比 Follower 多数派缺失的状态、之后各 Follower 再补齐相比 PL 缺失的状态,PL 和 Follower 完成状态同步后 PL 变为**正式 Leader(established leader)**。
  • broadcast
    Leader 处理 Client 的写操作,并将状态变更广播至 Follower,Follower多数派通过之后 Leader 发起将状态变更**提交(deliver/commit)**。

Leader 和 Follower 之间通过心跳判别健康状态,正常情况下 Zab 处在 broadcast 阶段,出现 Leader 宕机、网络隔离等异常情况时 Zab 重新回到 discovery 阶段。

Zookeeper介绍

Zookeeper是一个开源的分布式协调服务,其设计目标是将那些复杂的且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一些列简单的接口提供给用户使用。其是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据发布/发布、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布式锁和分布式队列等功能。其可以保证如下分布式一致性特性。

  1. 顺序一致性,从同一个客户端发起的事务请求,最终将会严格地按照其发起顺序被应用到Zookeeper中去。
  2. 原子性,所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,即整个集群要么都成功应用了某个事务,要么都没有应用。
  3. 单一视图,无论客户端连接的是哪个Zookeeper服务器,其看到的服务端数据模型都是一致的。
  4. 可靠性,一旦服务端成功地应用了一个事务,并完成对客户端的响应,那么该事务所引起的服务端状态变更将会一直被保留,除非有另一个事务对其进行了变更。
  5. 实时性,Zookeeper保证在一定的时间段内,客户端最终一定能够从服务端上读取到最新的数据状态。

设计目标

Zookeeper致力于提供一个高性能、高可用、且具有严格的顺序访问控制能力(主要是写操作的严格顺序性)的分布式协调服务,其具有如下的设计目标。

  1. 简单的数据模型,Zookeeper使得分布式程序能够通过一个共享的树形结构的名字空间来进行相互协调,即Zookeeper服务器内存中的数据模型由一系列被称为ZNode的数据节点组成,Zookeeper将全量的数据存储在内存中,以此来提高服务器吞吐、减少延迟的目的。
  2. 可构建集群,一个Zookeeper集群通常由一组机器构成,组成Zookeeper集群的而每台机器都会在内存中维护当前服务器状态,并且每台机器之间都相互通信。
  3. 顺序访问,对于来自客户端的每个更新请求,Zookeeper都会分配一个全局唯一的递增编号,这个编号反映了所有事务操作的先后顺序。
  4. 高性能,Zookeeper将全量数据存储在内存中,并直接服务于客户端的所有非事务请求,因此它尤其适用于以读操作为主的应用场景。

基本概念

  1. 集群角色,最典型的集群就是Master/Slave模式(主备模式),此情况下把所有能够处理写操作的机器称为Master机器,把所有通过异步复制方式获取最新数据,并提供读服务的机器为Slave机器。Zookeeper引入了Leader、Follower、Observer三种角色,Zookeeper集群中的所有机器通过Leader选举过程来选定一台被称为Leader的机器,Leader服务器为客户端提供写服务,Follower和Observer提供读服务,但是Observer不参与Leader选举过程,不参与写操作的过半写成功策略,Observer可以在不影响写性能的情况下提升集群的性能。
  2. 会话,指客户端会话,一个客户端连接是指客户端和服务端之间的一个TCP长连接,Zookeeper对外的服务端口默认为2181,客户端启动的时候,首先会与服务器建立一个TCP连接,从第一次连接建立开始,客户端会话的生命周期也开始了,通过这个连接,客户端能够心跳检测与服务器保持有效的会话,也能够向Zookeeper服务器发送请求并接受响应,同时还能够通过该连接接受来自服务器的Watch事件通知。
  3. 数据节点,第一类指构成集群的机器,称为机器节点,第二类是指数据模型中的数据单元,称为数据节点-Znode,Zookeeper将所有数据存储在内存中,数据模型是一棵树,由斜杠/进行分割的路径,就是一个ZNode,如/foo/path1,每个ZNode都会保存自己的数据内存,同时还会保存一些列属性信息。ZNode分为持久节点和临时节点两类,持久节点是指一旦这个ZNode被创建了,除非主动进行ZNode的移除操作,否则这个ZNode将一直保存在Zookeeper上,而临时节点的生命周期和客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除。另外,Zookeeper还允许用户为每个节点添加一个特殊的属性:SEQUENTIAL。一旦节点被标记上这个属性,那么在这个节点被创建的时候,Zookeeper会自动在其节点后面追加一个整形数字,其是由父节点维护的自增数字。
  4. 版本,对于每个ZNode,Zookeeper都会为其维护一个叫作Stat的数据结构,Stat记录了这个ZNode的三个数据版本,分别是version(当前ZNode的版本)、cversion(当前ZNode子节点的版本)、aversion(当前ZNode的ACL版本)。
  5. Watcher,Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端。
  6. ACL,Zookeeper采用ACL(Access Control Lists)策略来进行权限控制,其定义了如下五种权限:
    · CREATE:创建子节点的权限。
    · READ:获取节点数据和子节点列表的权限。
    · WRITE:更新节点数据的权限。
    · DELETE:删除子节点的权限。
    · ADMIN:设置节点ACL的权限。

ZAB协议

Zookeeper使用了Zookeeper Atomic Broadcast(ZAB,Zookeeper原子消息广播协议)的协议作为其数据一致性的核心算法。ZAB协议是为Zookeeper专门设计的一种支持崩溃恢复的原子广播协议。
Zookeeper依赖ZAB协议来实现分布式数据的一致性,基于该协议,Zookeeper实现了一种主备模式的系统架构来保持集群中各副本之间的数据的一致性,即其使用一个单一的主进程来接收并处理客户端的所有事务请求,并采用ZAB的原子广播协议,将服务器数据的状态变更以事务Proposal的形式广播到所有的副本进程中,ZAB协议的主备模型架构保证了同一时刻集群中只能够有一个主进程来广播服务器的状态变更,因此能够很好地处理客户端大量的并发请求。
ZAB协议的核心是定义了对于那些会改变Zookeeper服务器数据状态的事务请求的处理方式,即:所有事务请求必须由一个全局唯一的服务器来协调处理,这样的服务器被称为Leader服务器,余下的服务器则称为Follower服务器,Leader服务器负责将一个客户端事务请求转化成一个事务Proposal(提议),并将该Proposal分发给集群中所有的Follower服务器,之后Leader服务器需要等待所有Follower服务器的反馈,一旦超过半数的Follower服务器进行了正确的反馈后,那么Leader就会再次向所有的Follower服务器分发Commit消息,要求其将前一个Proposal进行提交。
ZAB协议包括两种基本的模式:崩溃恢复和消息广播。
当整个服务框架启动过程中或Leader服务器出现网络中断、崩溃退出与重启等异常情况时,ZAB协议就会进入恢复模式并选举产生新的Leader服务器。当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式,状态同步时指数据同步,用来保证集群在过半的机器能够和Leader服务器的数据状态保持一致。
当集群中已经有过半的Follower服务器完成了和Leader服务器的状态同步,那么整个服务框架就可以进入消息广播模式,当一台同样遵守ZAB协议的服务器启动后加入到集群中,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么加入的服务器就会自觉地进入数据恢复模式:找到Leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播流程中去。Zookeeper只允许唯一的一个Leader服务器来进行事务请求的处理,Leader服务器在接收到客户端的事务请求后,会生成对应的事务提议并发起一轮广播协议,而如果集群中的其他机器收到客户端的事务请求后,那么这些非Leader服务器会首先将这个事务请求转发给Leader服务器。
当Leader服务器出现崩溃或者机器重启、集群中已经不存在过半的服务器与Leader服务器保持正常通信时,那么在重新开始新的一轮的原子广播事务操作之前,所有进程首先会使用崩溃恢复协议来使彼此到达一致状态,于是整个ZAB流程就会从消息广播模式进入到崩溃恢复模式。一个机器要成为新的Leader,必须获得过半机器的支持,同时由于每个机器都有可能会崩溃,因此,ZAB协议运行过程中,前后会出现多个Leader,并且每台机器也有可能会多次成为Leader,进入崩溃恢复模式后,只要集群中存在过半的服务器能够彼此进行正常通信,那么就可以产生一个新的Leader并再次进入消息广播模式。如一个由三台机器组成的ZAB服务,通常由一个Leader、2个Follower服务器组成,某一个时刻,加入其中一个Follower挂了,整个ZAB集群是不会中断服务的。

  1. 消息广播,ZAB协议的消息广播过程使用原子广播协议,类似于一个二阶段提交过程,针对客户端的事务请求,Leader服务器会为其生成对应的事务Proposal,并将其发送给集群中其余所有的机器,然后再分别收集各自的选票,最后进行事务提交。

在ZAB的二阶段提交过程中,移除了中断逻辑,所有的Follower服务器要么正常反馈Leader提出的事务Proposal,要么就抛弃Leader服务器,同时,ZAB协议将二阶段提交中的中断逻辑移除意味着我们可以在过半的Follower服务器已经反馈Ack之后就开始提交事务Proposal,而不需要等待集群中所有的Follower服务器都反馈响应,但是,在这种简化的二阶段提交模型下,无法处理Leader服务器崩溃退出而带来的数据不一致问题,因此ZAB采用了崩溃恢复模式来解决此问题,另外,整个消息广播协议是基于具有FIFO特性的TCP协议来进行网络通信的,因此能够很容易保证消息广播过程中消息接受与发送的顺序性。再整个消息广播过程中,Leader服务器会为每个事务请求生成对应的Proposal来进行广播,并且在广播事务Proposal之前,Leader服务器会首先为这个事务Proposal分配一个全局单调递增的唯一ID,称之为事务ID(ZXID),由于ZAB协议需要保证每个消息严格的因果关系(顺序一致性>因果一致性),因此必须将每个事务Proposal按照其ZXID的先后顺序来进行排序和处理。

  1. 崩溃恢复,在Leader服务器出现崩溃,或者由于网络原因导致Leader服务器失去了与过半Follower的联系,那么就会进入崩溃恢复模式,在ZAB协议中,为了保证程序的正确运行,整个恢复过程结束后需要选举出一个新的Leader服务器,因此,ZAB协议需要一个高效且可靠的Leader选举算法,从而保证能够快速地选举出新的Leader,同时,Leader选举算法不仅仅需要让Leader自身知道已经被选举为Leader,同时还需要让集群中的所有其他机器也能够快速地感知到选举产生的新的Leader服务器。
  2. 基本特性,ZAB协议规定了如果一个事务Proposal在一台机器上被处理成功,那么应该在所有的机器上都被处理成功,哪怕机器出现故障崩溃。ZAB协议需要确保那些已经在Leader服务器上提交的事务最终被所有服务器都提交,假设一个事务在Leader服务器上被提交了,并且已经得到了过半Follower服务器的Ack反馈,但是在它Commit消息发送给所有Follower机器之前,Leader服务挂了。如下图所示

在集群正常运行过程中的某一个时刻,Server1是Leader服务器,其先后广播了P1、P2、C1、P3、C2(C2是Commit Of Proposal2的缩写),其中,当Leader服务器发出C2后就立即崩溃退出了,针对这种情况,ZAB协议就需要确保事务Proposal2最终能够在所有的服务器上都被提交成功,否则将出现不一致。
ZAB协议需要确保丢弃那些只在Leader服务器上被提出的事务。如果在崩溃恢复过程中出现一个需要被丢弃的提议,那么在崩溃恢复结束后需要跳过该事务Proposal,如下图所示
假设初始的Leader服务器Server1在提出一个事务Proposal3之后就崩溃退出了,从而导致集群中的其他服务器都没有收到这个事务Proposal,于是,当Server1恢复过来再次加入到集群中的时候,ZAB协议需要确保丢弃Proposal3这个事务。
在上述的崩溃恢复过程中需要处理的特殊情况,就决定了ZAB协议必须设计这样的Leader选举算法:能够确保提交已经被Leader提交的事务的Proposal,同时丢弃已经被跳过的事务Proposal。如果让Leader选举算法能够保证新选举出来的Leader服务器拥有集群中所有机器最高编号(ZXID最大)的事务Proposal,那么就可以保证这个新选举出来的Leader一定具有所有已经提交的提议,更为重要的是如果让具有最高编号事务的Proposal机器称为Leader,就可以省去Leader服务器查询Proposal的提交和丢弃工作这一步骤了。

  1. 数据同步,完成Leader选举后,在正式开始工作前,Leader服务器首先会确认日志中的所有Proposal是否都已经被集群中的过半机器提交了,即是否完成了数据同步。Leader服务器需要确所有的Follower服务器都能够接收到每一条事务Proposal,并且能够正确地将所有已经提交了的事务Proposal应用到内存数据库中。Leader服务器会为每个Follower服务器维护一个队列,并将那些没有被各Follower服务器同步的事务以Proposal消息的形式逐个发送给Follower服务器,并在每一个Proposal消息后面紧接着再发送一个Commit消息,以表示该事务已经被提交,等到Follower服务器将所有其尚未同步的事务Proposal都从Leader服务器上同步过来并成功应用到本地数据库后,Leader服务器就会将该Follower服务器加入到真正的可用Follower列表并开始之后的其他流程。
    下面分析ZAB协议如何处理需要丢弃的事务Proposal的,ZXID是一个64位的数字,其中32位可以看做是一个简单的单调递增的计数器,针对客户端的每一个事务请求,Leader服务器在产生一个新的事务Proposal时,都会对该计数器进行加1操作,而高32位则代表了Leader周期epoch的编号,每当选举产生一个新的Leader时,就会从这个Leader上取出其本地日志中最大事务Proposal的ZXID,并解析出epoch值,然后加1,之后以该编号作为新的epoch,低32位则置为0来开始生成新的ZXID,ZAB协议通过epoch号来区分Leader周期变化的策略,能够有效地避免不同的Leader服务器错误地使用不同的ZXID编号提出不一样的事务Proposal的异常情况。当一个包含了上一个Leader周期中尚未提交过的事务Proposal的服务器启动时,其肯定无法成为Leader,因为当前集群中一定包含了一个Quorum(过半)集合,该集合中的机器一定包含了更高epoch的事务的Proposal,因此这台机器的事务Proposal并非最高,也就无法成为Leader。
    2.4 ZAB协议原理
    ZAB主要包括消息广播和崩溃恢复两个过程,进一步可以分为三个阶段,分别是发现(Discovery)、同步(Synchronization)、广播(Broadcast)阶段。ZAB的每一个分布式进程会循环执行这三个阶段,称为主进程周期。
    · 发现,选举产生PL(prospective leader),PL收集Follower epoch(cepoch),根据Follower的反馈,PL产生newepoch(每次选举产生新Leader的同时产生新epoch)。
    · 同步,PL补齐相比Follower多数派缺失的状态、之后各Follower再补齐相比PL缺失的状态,PL和Follower完成状态同步后PL变为正式Leader(established leader)。
    · 广播,Leader处理客户端的写操作,并将状态变更广播至Follower,Follower多数派通过之后Leader发起将状态变更落地(deliver/commit)。
    在正常运行过程中,ZAB协议会一直运行于阶段三来反复进行消息广播流程,如果出现崩溃或其他原因导致Leader缺失,那么此时ZAB协议会再次进入发现阶段,选举新的Leader。
    2.4.1 运行分析
    每个进程都有可能处于如下三种状态之一
    · LOOKING:Leader选举阶段。
    · FOLLOWING:Follower服务器和Leader服务器保持同步状态。
    · LEADING:Leader服务器作为主进程领导状态。
    所有进程初始状态都是LOOKING状态,此时不存在Leader,此时,进程会试图选举出一个新的Leader,之后,如果进程发现已经选举出新的Leader了,那么它就会切换到FOLLOWING状态,并开始和Leader保持同步,处于FOLLOWING状态的进程称为Follower,LEADING状态的进程称为Leader,当Leader崩溃或放弃领导地位时,其余的Follower进程就会转换到LOOKING状态开始新一轮的Leader选举。
    一个Follower只能和一个Leader保持同步,Leader进程和所有与所有的Follower进程之间都通过心跳检测机制来感知彼此的情况。若Leader能够在超时时间内正常收到心跳检测,那么Follower就会一直与该Leader保持连接,而如果在指定时间内Leader无法从过半的Follower进程那里接收到心跳检测,或者TCP连接断开,那么Leader会放弃当前周期的领导,比你转换到LOOKING状态,其他的Follower也会选择放弃这个Leader,同时转换到LOOKING状态,之后会进行新一轮的Leader选举,并在选举产生新的Leader之后开始新的一轮主进程周期。
    2.5 ZAB与Paxos的联系和区别
    联系:
  2. 都存在一个类似于Leader进程的角色,由其负责协调多个Follower进程的运行。
  3. Leader进程都会等待超过半数的Follower做出正确的反馈后,才会将一个提议进行提交。
  4. 在ZAB协议中,每个Proposal中都包含了一个epoch值,用来代表当前的Leader周期,在Paxos算法中,同样存在这样的一个标识,名字为Ballot。
    区别:
    Paxos算法中,新选举产生的主进程会进行两个阶段的工作,第一阶段称为读阶段,新的主进程和其他进程通信来收集主进程提出的提议,并将它们提交。第二阶段称为写阶段,当前主进程开始提出自己的提议。
    ZAB协议在Paxos基础上添加了同步阶段,此时,新的Leader会确保存在过半的Follower已经提交了之前的Leader周期中的所有事务Proposal。
    ZAB协议主要用于构建一个高可用的分布式数据主备系统,而Paxos算法则用于构建一个分布式的一致性状态机系统。

Zab 如何保证强一致

Zab 通过约束事务先后顺序达到强一致性,先广播的事务先 commit、FIFO,Zab 称之为**primary order(以下简称 PO)**。实现 PO 的核心是zxid
Zab 中每个事务对应一个 zxid,它由两部分组成:<e, c>,e 即 Leader 选举时生成的 epoch,c 表示当次 epoch 内事务的编号、依次递增。假设有两个事务的 zxid 分别是 z、z’,当满足 z.e < z'.e 或者 z.e = z'.e && z.c < z'.c 时,定义 z 先于 z’发生(z < z')。
为实现 PO,Zab 对 Follower、Leader 有以下约束:

  1. 有事务 z 和 z’,如果 Leader 先广播 z,则 Follower 需保证先 commit z 对应的事务
  2. 有事务 z 和 z’,z 由 Leader p 广播,z’由 Leader q 广播,Leader p 先于 Leader q,则 Follower 需保证先 commit z 对应的事务
  3. 有事务 z 和 z’,z 由 Leader p 广播,z’由 Leader q 广播,Leader p 先于 Leader q,如果 Follower 已经 commit z,则 q 需保证已 commit z 才能广播 z’

第 1、2 点保证事务 FIFO,第 3 点保证 Leader 上具备所有已 commit 的事务。
相比 Paxos,Zab 约束了事务顺序、适用于有强一致性需求的场景。

Paxos、Raft、Zab 比较

除 Paxos、Raft 和 Zab 外,Viewstamped Replication(简称 VR)也是讨论比较多的一致性协议。这些协议包含很多共同的内容(Leader、quorum、state machine 等),因而我们不禁要问:Paxos、Raft、Zab 和 VR 等分布式一致性协议区别到底在哪,还是根本就是一回事?
Paxos、Raft、Zab 和 VR 都是解决一致性问题的协议,Paxos 协议原文倾向于理论,Raft、Zab、VR 倾向于实践,一致性保证程度等的不同也导致这些协议间存在差异。下图帮助我们理解这些协议的相似点和区别
Paxos、Raft、Zab比较

参考

Paxos

  1. 分布式系统理论进阶 - Paxos
  2. Paxos Made Simple
  3. The Part-Time Parliament
  4. Tencent/phxpaxos

Raft

  1. [1] Paxos made live - An engineering perspective, Tushar Chandra, Robert Griesemer and Joshua Redstone, 2007
  2. [2] In Search of an Understandable Consensus Algorithm, Diego Ongaro and John Ousterhout, 2013
  3. [3] In Search of an Understandable Consensus Algorithm (Extended Version), Diego Ongaro and John Ousterhout, 2013
  4. [4] Implementing Fault-Tolerant Services Using the State Machine, Fred B. Schneider, 1990

Zab

  1. [5] Zab:High-performance broadcast for primary-backup systems, FlavioP.Junqueira,BenjaminC.Reed,andMarcoSerafini, 2011
  2. [6] ZooKeeper’s atomic broadcast protocol: Theory and practice, Andr´e Medeiros, 2012
  3. [7] Viewstamped Replication A New Primary Copy Method to Support Highly-Available Distributed Systems, Brian M.Oki and Barbar H.Liskov, 1988
  4. [8] Viewstamped Replication Revisited, Barbara Liskov and James Cowling, Barbara Liskov and James Cowling ,2012
  5. [9] Can’t we all just agree? The morning paper, 2015
  6. [10] Vive La Difference: Paxos vs. Viewstamped Replication vs. Zab, Robbert van Renesse, Nicolas Schiper and Fred B. Schneider, 2014

Paxos 变种和优化

  1. [1] Mencius: Building Efficient Replicated State Machines for WANs, Yanhua Mao,Flavio P. Junqueira,Keith Marzullo, 2018
  2. [2] Fast Paxos, Leslie Lamport, 2005
  3. [3] Generalized Consensus and Paxos, Leslie Lamport, 2004
  4. [4] There Is More Consensus in Egalitarian Parliaments, Iulian Moraru, David G. Andersen, Michael Kaminsky, 2013

RocketMQ 启动流程中的服务注册

RocketMQ 的消息队列集群结构主要包含 NameServer、Broker(Master/Slave)、Producer、Consumer 4 个部分,基本通信流程如下:

  1. Broker 启动后需要完成一次将自己注册至 NameServer 的操作;随后每隔 30s 时间定时向 NameServer 上报 Topic 路由信息。
    Broker 启动入口:org.apache.rocketmq.broker.BrokerController#start
    注册到每个 NameServer:org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
  2. 消息生产者 Producer 作为客户端发送消息时候,需要根据消息的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息。如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息。
  3. 消息生产者 Producer 根据2中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker 作为消息的接收者接收消息并落盘存储。
  4. 消息消费者 Consumer 根据2中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

服务发现

Broker

Broker端启动时会向NameServer注册,并开启一个定时任务,用于每隔十秒向所有NameServer发送心跳请求,将Topic信息注册到NameServer。

NameServer

除了Broker可以向NameServer注册服务信息,NameServer也会启动一个定时任务来定时清理不活动的Broker,默认情况下是清除两分钟没有向NameServer发送心跳更新的Broker。
NameServer的结构比较简单,主要类只有6个:

  • NamesrvStartup:程序入口。
  • NamesrvController:NameServer 的总控制器,负责所有服务的生命周期管理。
  • RouteInfoManager:NameServer 最核心的实现类,负责保存和管理集群路由信息
    topicQueueTable 保存的是主题和队列信息,其中每个队列信息对应的类 QueueData 中,还保存了 brokerName。需要注意的是,这个 brokerName 并不真正是某个 Broker 的物理地址,它对应的一组 Broker 节点,包括一个主节点和若干个从节点。
    brokerAddrTable 中保存了集群中每个 brokerName 对应 Broker 信息,每个 Broker 信息用一个 BrokerData 对象表示。
    brokerLiveTable 中保存了集群中所有活跃的Broker。
    定时每10秒一次扫描并清除不活跃的Broker,代码见:RouteInfoManager#scanNotActiveBroker
  • BrokerHousekeepingService:监控 Broker 连接状态的代理类。
  • DefaultRequestProcessor:负责处理客户端和 Broker 发送过来的 RPC 请求的处理器。
    先用读写锁保证并发安全,然后比较所有路由信息Map并更新。
  • ClusterTestRequestProcessor:用于测试的请求处理器。

Producer

Producer是只能发给Broker集群里的Master的,如果Master挂掉,那么Producer也不能继续发消息了,只能等集群重新选举出一个新的Master,虽然可用性会降低,但是也给顺序消息的实现提供了方便。

Consumer

定时从NameServer拉取topicRouteTable更新本地的brokerAddrTable,也就是说,只要NameServer会把宕掉的Broker清掉,那么Consumer最终(注意并不能实时)也可以取到拿到一份活跃的Broker列表。
MQClientInstance#updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)
Consumer端是定时地上Broker拉取消息。
org.apache.rocketmq.common.ServiceThread#start
如果某次消费出错了,就会触发Fallback方案,改为稍后再重试。
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync

消息有序性

一些场景需要保证操作的顺序性,比如A系统要将订单同步给B系统,但是要按照订单所发生事件顺序来同步,比如后台先修改订单价格再用户支付,那么就要先发修改价格的状态再发送支付成功的状态。
RocketMQ 可以严格的保证消息有序。

消息优先级

消息优先级机制可以让消息队列中优先级较高的消息先投递,比如订单创建消息就可以优先于日常推送消息。
RocketMQ 没有直接实现消息的优先级,主要是处于性能考虑,因为 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大。但也可以用一种变通的方式实现消息的优先级,比如创建 3 个代表不同优先级的队列。

有序消息的实现方式

要保证MQ消息消费的有序性,需要保证以下3个阶段的有序性:
RocketMQ保持消息的有序性

  1. 消息被发送时保持有序;
  2. 消息被存储时保持和发送时的顺序一致;
  3. 消息被消费时保持和存储时的顺序一致。

在RocketMQ中,有两种实现方式:

  1. 全局有序
    RocketMQ-全局有序
    一个Topic内所有的消息都发送到同一个Queue。
    适用于性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景。
  2. 分区有序
    RocketMQ-分区有序
    RocketMQ根据用户自定义的Sharding Key将消息散列到不同的Queue,每个Queue内的消费是严格有序的。
    适用于性能要求较高的场景。

RocketMQ中有序消息的实现原理

发送端需要指定消息的ShardingKey:

1
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message, MessageQueueSelector, Object)

如上述消息发送API所示,第二个参数MessageQueueSelector可以用于发送消息时指定某个Queue,在API层面没有区分全局有序和分区有序,如果要实现全局有序就把所有消息都Sharding到一个Queue上即可。

消费方拉取时需要区分Pull和Push两种模式:

  1. Pull
    Pull模式就是每次从一个Queue中拉取。
  2. Push
    Push模式使用一个线程轮询Broker拉取消息,然后调用客户端提供的回调函数进行消费,在客户端中需要保证调用MessageListener时消息的顺序性。
    ConsumeMessageOrderlyService
    实现上,就是在消费前先对队列加锁,避免Consumer并发消费一个队列(一个队列只能被一个Consumer消费,因此这个锁主要是保证Consumer不会并发消费一个消息,是单机锁而不是分布式锁)。

有序性与可用性间存在的矛盾

为了保证服务的高可用性,RocketMQ支持把一个主题分布到多对主从节点上去,每对主从节点中承担主题中的一部分队列,如果某个主节点宕机了,会自动切换到其他主节点上继续发消息,这样既解决了可用性的问题,还可以通过水平扩容来提升 Topic 总体的性能。
但是严格的顺序性要求指定队列来发送消息,一个队列一定是落在一个特定的主节点上的,如果该主节点宕机了,那么顺序性也就不存在了。
在RocketMQ中引入了Dledger的复制方式,这种方式对上述问题的解决方案是:消息必须被复制到半数以上节点才能返回成功,且主节点宕机后支持通过选举来动态切换主节点,Dledger在选举时,总会把数据和主节点一致的从节点选为新的主节点,这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。

消息可靠性

只要不是存储硬件发生不可逆的损坏,RocketMQ 都可以保证消息不丢或少丢,比如 Broker 挂掉、操作系统挂掉等;
如果硬件发生不可逆的损坏,则该节点的消息就无法恢复了,需要通过异步复制来恢复。

如何检测消息丢失?

  1. 利用分布式链路追踪系统
  2. 利用消息队列的有序性来验证是否有消息丢失

RocketMQ 如何保证消息不丢?

保证消息不丢失的原理是复制,传统的复制方式有异步复制和同步双写复制两种:

  1. 异步复制
    消息先发送到主节点上,就返回”写入成功”,然后消息再异步复制到从节点上。
  2. 同步双写复制
    消息同步双写到主从节点上,主从都写成功,才返回”写入成功”。

    这两种方式本质区别是:写入多少个副本再返回,异步复制需要的副本数是1,而同步双写需要的副本数为2。

RocketMQ的复制模式是基于Deldger的新复制模式
在 RocketMQ 中,Broker 的主从关系是通过配置固定的,不支持动态切换。如果主节点宕机,生产者就不能再生产消息了,消费者可以自动切换到从节点继续进行消费。这时候,即使有一些消息没有来得及复制到从节点上,这些消息依然躺在主节点的磁盘上,除非是主节点的磁盘坏了,否则等主节点重新恢复服务的时候,这些消息依然可以继续复制到从节点上,也可以继续消费,不会丢消息,消息的顺序也是没有问题的。

Deldger的复制模式

  • RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
  • RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。
  • RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。

一主二从的三副本集群的复制过程:

  1. Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。

    至少半数这个要求有点像MySQL里的semi-sync和Redis里的复制策略,说明这种方案确实非常有效,只要是涉及集群复制的场景都可以考虑采用。

  2. 拿 3 个节点举例说明一下。
    当主节点宕机的时候,2 个从节点会通过投票选出一个新的主节点来继续提供服务,相比主从的复制模式,解决了可用性的问题。
    由于消息要至少复制到 2 个节点上才会返回写入成功,即使主节点宕机了,也至少有一个节点上的消息是和主节点一样的。
  3. Dledger 在选举时,总会把数据和主节点一样的从节点选为新的主节点,
    这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。

Dledger的不足:

  • 选举过程中不能提供服务。
    最少需要 3 个节点才能保证数据一致性,3 节点时,只能保证 1 个节点宕机时可
    用,如果 2 个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较
    低。另外,由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制
    的方式快。

RocketMQ如何保证高可用

  • 通过多主多从架构保证高可用
    RocketMQ 支持把一个主题分布到多对主从节点上去,每对主从节点中承担主题中的一部分队列,如果某个主节点宕机了,会自动切换到其他主节点上继续发消息,这样既解决了可用性的问题,还可以通过水平扩容来提升 Topic 总体的性能。
  • 高可用与严格顺序不能并存
    在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息(指分区有序或全局有序),对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。在这种复制模式下,严格顺序和高可用只能选择一个。

如果服务器宕机怎么办?

现在我们可以来回答服务器宕机应该怎么办?

Broker宕机

  1. RocketMQ并没有实现高可用性,如上所述,Master宕机会触发重新选举,因为消息需要被同步到过半Slave后才会返回,因此Master宕机后消息本身不会丢。
  2. 在RocketMQ中,Broker会定时向NameServer发送心跳,Broker如果宕机,一段时间后NameServer发现Broker上次心跳已经超过了时间阈值,则会将该Broker移除出服务注册表。

NameServer宕机

NameServer集群中的节点是没有Master、Slave之分的,其中一台挂掉并不会影响其他节点提供服务。

Consumer / Producer 宕机

Consumer或Producer宕机不会产生任何影响,随时可以扩容缩容。

主从集群的扩容缩容

在RocketMQ中消费的基本单位是队列而不是Broker,因此扩容时单纯增加一台Broker并没有什么作用,还需要给这台Broker分配Queue。
在RocketMQ中,读写队列与读写分离是完全不同的两个概念:

  • 读写分离是HA机制:将一个节点的数据同步到另一个节点,主节点可用于读写,从节点用于读。
  • 读写队列则是在做路由信息时使用。在消息发送时,使用写队列个数返回路由信息,而消息消费时按照读队列个数返回路由信息。在物理文件层面,只有写队列才会创建文件。举个例子:写队列个数是8,设置的读队列个数是4.这个时候,会创建8个文件夹,代表0 1 2 3 4 5 6 7,但在消息消费时,路由信息只返回4,在具体拉取消息时,就只会消费0 1 2 3这4个队列中的消息,4 5 6 7压根就没有消息。反过来,如果写队列个数是4,读队列个数是8,在生产消息时只会往0 1 2 3中生产消息,消费消息时则会从0 1 2 3 4 5 6 7所有的队列中消费,当然 4 5 6 7中压根就没有消息 ,假设消费group有两个消费者,事实上只有第一个消费者在真正的消费消息(0 1 2 3),第二个消费者压根就消费不到消息。
    因此只有readQueueNums>=writeQueueNums时程序才能正常进行,最好是readQueueNums>writeQueueNums。
    读写队列的分配见代码:MQClientInstance#updateTopicRouteInfoFromNameServer

rocketmq设置读写队列数的目的在于方便队列的缩容和扩容。比如缩容至1/2的话就是先把写队列缩小至1/2,等后一半队列中的消息被消费完毕,然后再将读队列缩小一半,即可达到平滑缩容的目的。

  • 将已有的Queue分配到新Broker
  • 增加更多的Queue并分配到新Broker

具体扩容的操作可参考:RocketMQ 主题扩分片后遇到的坑

事务消息

RocketMQ中的实现

  • 半事务消息:发事务消息时,发送方会先发送一条半事务消息给 Broker,此时 Broker 暂未收到 Producer 对该消息的二次确认,此时该消息被标记成暂不能投递状态,处于该种状态下的消息即半事务消息;
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 通过扫描发现某条消息长期处于半事务消息时,需要主动向 Producer 询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

RocketMQ-事务消息
事务消息的处理流程如上图所示:
图片来自事务消息

  1. Producer 发半事务消息给 Broker,Producer需要记录消息状态到一张本地消息事务表内,和业务数据在一个事务里提交,之后生产消费者均通过这张表里的状态来判断事务的状态;

    如果业务数据本身就是有状态的,其实也可以通过查询业务数据状态来判断事务状态。

  2. Broker 将半事务消息持久化后,向 Producer 返回 ACK 确认消息已发送成功,此时消息为半事务消息;
  3. Producer 端开始执行本地事务逻辑;
  4. Producer 端根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),Broker 收到 Commit 状态则将半事务消息标记为可投递,Consumer 最终将收到该消息;Broker 收到 Rollback 状态则删除半事务消息,Consumer 将不会接受该消息。

接下来我们看看各步骤如果出错会怎么样:

  1. 2消息丢失或3执行失败:发送方终止本次事务,提交 Rollback 消息给服务端;
  2. 4的 Commit 消息提交失败:见5,即服务端会回查事务状态;

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

事务消息源码

Producer发送

  1. 发送
    org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction
    先添加半消息的标识,然后和普通消息一样发送,如果发送成功则执行本地事务。
  2. 执行本地事务
    org.apache.rocketmq.client.producer.TransactionListener#executeLocalTransaction
    使用了发送前传入的回调。
    返回值表示本地事务(LocalTransactionState)的执行情况:
    • COMMIT_MESSAGE:endTransaction时发送TRANSACTION_COMMIT_TYPE类型的消息到Broker,此时事务被提交,Consumer端可以消费该条消息;
    • ROLLBACK_MESSAGE:endTransaction时通知Broker TRANSACTION_ROLLBACK_TYPE,事务被回滚,Consumer端不可消费该消息;
    • UNKNOW:endTransaction时通知Broker TRANSACTION_NOT_TYPE,本地事务未执行完毕,Broker需要稍后反查本地事务状态。
  3. 根据本地事务执行结果,开始执行提交或回滚
    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
    正如我们之前讨论的那样,这个消息发失败也没有关系,Broker之后会有反查。

Broker接收

  1. 处理发消息请求
    org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
  2. 处理事务消息
    org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage -> org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#putHalfMessage
    可以看到RocketMQ并没有将半消息保存到客户端指定的那个队列中,而是记录了原始的主题队列后,把这个半消息保存在了一个特殊的内部主题 RMQ_SYS_TRANS_HALF_TOPIC 中,使用的队列号固定为 0
    这个主题和队列对消费者是不可见的,所以里面的消息永远不会被消费。这样,就保证了在事务提交成功之前,这个半消息对消费者来说是消费不到的。

Broker反查

  1. 启动反查定时任务
    TransactionalMessageCheckService
  2. 反查
    org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#resolveHalfMsg
    定时从半消息队列中读出所有待反查的半消息,针对每个需要反查的半消息,Broker 会给对应的 Producer 发一个要求执行事务状态反查的 RPC 请求,根据 RPC 返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。
  3. 结束事务
    org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
    最后,提交或者回滚事务实现的逻辑是差不多的,首先把半消息标记为已处理,如果是提交事务,那就把半消息从半消息队列中复制到这个消息真正的主题和队列中去,如果要回滚事务,这一步什么都不需要做,最后结束这个事务。

QA

Broker的offset如何维护?本地的offset如何维护?

拉取速度是否会受到消费速度制约?如果不是,那么拉取速度过快是不是会导致重复消费甚至OOM?

并发消费offset为1-10的消息,其中8失败了,其他都是成功的,这种情况下本地的offset如何记录,broker的offset如何记录,有哪些是可能被重复消费的?

如何尽可能的避免重复消费?

应用第一次启动时offset如何定义?是否会消费历史消息,应用重启是否会丢消息?

应用扩所容是否会导致消费消息丢失和消费队列的重新排序?

重试和幂等

Retry队列和offset

在RocketMQ启动时,每个group层面都会再定义一个专用的重试topic,重试消息被插入了重试topic队列。
重试队列存在的意义就是快速推进offset,重试topic的名字是%RETRY%+consumerGroup,因此重试topic是group维度的,所以默认情况下一个group的consumer会有2个订阅topic,2个topic同时进行队列的rebalance。
offset是按照MessageQueue的维度进行维护的
消息重试有2种反馈方式:

  1. 重试队列:客户端先通过Netty API发送消息到Broker,如果这时调用Netty发送异常则调用Producer发送到RetryTopic中。
  2. 死信队列:如果重试次数过多(默认16次)则会进入死信队列,死信队列的逻辑在Broker,Client不会将消息发送至死信队列Topic。

Producer端重试

下面的代码同步发送消息,如果5秒内没有发送成功,则重试5次

1
2
3
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(5);
producer.send(msg,5000L);

Producer 的 send 方法本身支持内部重试
同步发送代码:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message),注意传超时参数时取的defaultMQProducer.getSendMsgTimeout()
异步发送:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message, SendCallback)
实际发送消息的代码位置(注意对sendResult的处理):org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
从源码中可以得到以下结论:

  • 至多重试 2 次。
    同步发送为 2 次,异步发送为 0 次,也就是说,异步发送是不会重试的。
  • 如果发送失败,则轮转到下一个 Broker。这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。
  • 如果本身向 broker 发送消息产生超时异常,就不会再重试。

除了Producer客户端的自动重试外,应用程序在接收到SendResult后也可以自己尝试去重试。

Consumer重试

消费者消费消息后需要给Broker返回消费状态,比如并发消费者MessageListenerConcurrently会返回ConsumeConcurrentlyStatus

  • 如果消费成功,返回CONSUME_SUCCESS
  • 如果消费出错,返回RECONSUME_LATER,一段时间后重试。

状态的返回是由用户线程控制的,但还有第三种可能,就是超时了,因此Consumer端的重试包含以下两种情况:

  1. 异常重试:Consumer端主动返回RECONSUME_LATER状态,Broker会在一段时间后重试;
  2. 超时重试:Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。

如果Consumer端因为各种类型异常导致本次消费失败(如上所述的两种情况),为防止该消息丢失而需要将其重新回发给Broker端保存,保存这种因为异常无法正常消费而回发给MQ的消息队列称之为重试队列。RocketMQ会为每个消费组都设置一个Topic名称为%RETRY%+consumerGroup的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)。

不能保证消息消费失败加入重试队列后还能被同一消费者消费,可能会破坏消息的顺序性。

由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中,RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%+consumerGroup”的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理

另外还有两种需要注意的情况:

  • 只有消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,而广播消息是不会重试的。
  • 事务消息中的半事务消息通过 Broker 的回查机制重试,具体流程见下面的事务消息

消费进度和 offset

offset的更新
消息消费完成后,需要将消费进度存储起来,即前面提到的offset。
consumerQueue类似一个无限长的数组,可以利用offset来直接定位。
offset的存储分为本地模式和远程模式:

  • 本地模式:广播模式下,同消费组的消费者相互独立,消费进度要单独存储,对应的数据结构是LocalFileOffsetStore
  • 远程模式:集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,对应的数据结构是RemoteBrokerOffsetStore,下面对offset的讨论集中于远程模式。

Consumer更新offset到Broker

  1. 消费消息维护offset
    org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
    处理失败的消息会反馈给Consumer,然后发送到topic对应的RetryTopic,这样能快速令offset前进。
  2. 定时任务
    每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumerpersistConsumerOffsetInterval属性控制,默认为5秒。
    MQClientInstance#startScheduledTask -> MQClientInstance#persistAllConsumerOffset
    启动一个定时任务提交offset。
    RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
    将offset发送到Broker。

Broker端offset的存储

org.apache.rocketmq.broker.processor.ConsumerManageProcessor#updateConsumerOffset
Broker会将offset存储在内存的一个offsetTable中,即RemoteBrokerOffsetStore

Consumer从Broker拉取offset

DefaultMQPushConsumerImpl#pullMessage
拉消息后触发offset的更新。
RemoteBrokerOffsetStore#readOffset
将offset保存到缓存offsetTable中。

消息幂等

RocketMQ提供At least once的消息服务质量标准,表示一条消息至少被送达一次,也就是说,不允许丢消息,但允许有少量重复消息出现。

另外两种服务质量标准是At most onceExactly once

比如Producer发出了10个消息,如果Consumer接收中间两条消息时出错了,返回RECONSUME_LATER,则该两条消息会被加入到RETRY队列中重新消费。

解决消息重复消费问题的主要方法是幂等,一个幂等操作的特点是,其任意多次执行仅会产生一次影响,因此从对系统的影响结果来说:At least once + 幂等消费 = Exactly once
实现幂等的方式有很多种,不过这些方案与消息队列本身已经没有多大关系了,因此这里仅仅简单描述一下这些实现方式:

  1. 利用数据库的唯一约束实现幂等
    为一个操作设置一个唯一键,比如一个账单每个用户只允许变更一次,则可以给转账流水表中的账单ID和账户ID创建一个唯一约束。
  2. 加上前置条件
    限制数据更新前的状态,比如只有在余额为500的时候才允许更新。
    也可以单独加上一个唯一ID,每次发消息时生成一个全局唯一ID,消费时检查这个唯一ID是否有被消费过。

重试源码

1、Consumer端初始化重试队列信息
1.1、Consumer端启动后,创建重试队列的订阅group

1
2
3
4
5
6
7
8
9
10
11
12
13
// Consumer自动创建一个group=%RETRY%+ConsumerGroup,用于后续的消费重试
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}

2、Consumer端处理消费结果ConsumeMessageConcurrentlyService#processConsumeResult
2.1、设置ack
如果ConsumeRequest封装的消息全消费成功,则设置ackIndex的值为消息总条数-1,反之ackIndex-=1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
switch (status) {
// 消费成功
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// 消费失败、重试
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}

2.2、消费失败的消息触发重试
sendMessageBack将消费失败的msg发回broker,如果sendMessageBack也失败则保存到msgBackFailed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
switch (this.defaultMQPushConsumer.getMessageModel()) {
...
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// ackIndex+1开始的是未成功消费的
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
// 如果发送失败,则保存到msgBackFailed
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

// 将sendMessageBack失败的消息从consumeRequest移除,并包装起来5s后转发给消费线程池继续消费
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

2.3、更新offset
本地消费成功后会将消费进度同步到本地的processQueue
sendMessageBack成功的消息会从本地processQueue中移除,并更新进度,这条消息的消费会交由消费集群中的一个节点去继续消费,取决于负载均衡将此消息对应的topic对应的重试队列retryQueue分配给哪个节点。

1
2
3
4
5
6
7
// 这里开始更新offset
// 先从队列在consumer端的视图(一个treeMap)中移除
// 这里返回的offset是经过删除后最小的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

3、Broker端接收sendMessageBack消息
Broker端的处理主要是重试和延迟
3.1、设置topic
设置此条消息新的topic为%RETRY%消费组的名称,并且选择新topic的队列(默认为0,默认情况下RetryQueueNum为1)

1
2
3
4
5
6
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}

3.2、将消息topic设置为重试topic
通过物理偏移量找到消息体

1
2
3
4
5
6
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}

给原始消息新增属性,key为RETRY_TOPIC,value为原始消息的实际topic
和Consumer端消费消息时的resetRetryTopic(msgs)相呼应

1
2
3
4
5
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);

3.3、获取延迟并判断是否进入死信队列
获取消息的延迟级别,默认此时的值为0

1
2
3
4
5
6
int delayLevel = requestHeader.getDelayLevel();

int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}

消息每消费失败一次都会增加ReconsumeTimes的值,当这个值达到了maxReconsumeTimes(默认为16),则将此消息送入死信队列,且此死信队列不可读,也就是说这条消息在没有人工干预的情况下再也不能被消费了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
// 设置延迟级别为3,意味着要延迟10s再消费这条消息,消息重复消费需要借助延迟消费的功能实现
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}

3.4、存储新创建的消息
这里有延迟消息的实现:如果delayLevel大于0,会将此消息的topic和queueID再进行一次转换,将此消息的newTopic、queueID存入到属性中(real_topic, real_qid),新的topic为SCHEDULE_TOPIC_XXXX,新的queue为根据delayLevel的等级去本地delayTimeLevel找到对应的队列;后续会有ScheduleMessageService做后续的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});

4、Broker端重试
ScheduleMessageService服务是来处理延迟消息的服务组件,delayLevelTable存储了不同的延迟级别的延迟时间,可配置。

1
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

4.1、遍历消费队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 遍历delayLevelTable里所有级别队列
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
// 由一个timer来处理
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}

4.2、重置延时消息
判断时间是否达到了延迟时间,达到了再将这些消息的原始topic和原始队列取出转发存储起来,待消费者消费。
4.3、设置重试消息
重试消息会被转变2次topic和queueID,导致在ScheduleMessageService转发存储的时候会将第一次转变的topic和queueID取出转发到topic=%RETRY%+consumerGroup、queueId=0的消息队列。
这个消息会被consumerGroup这个消费组消费,至于哪个节点消费则由负载均衡来决定。

消息实时性

RocketMQ 支持 pull 和 push 两种消息消费模式,但 push 是使用长轮询 Pull 的方式实现的,可保证消息非常实时,消息实时性不低于 Push。
长轮询 pull 的原理是:发起 pull 请求失败后(比如 Broker 端暂时没有可以消费的消息),先 hold 住线程并挂起该请求。

RocketMQ除了上述的准实时消息外,还支持延时消息

延时消息

RocketMQ里延时消息功能并不能指定时间,而是只能指定延时级别:

1
2
3
4
Message message = new Message("TopicTest", "TagA", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
producer.send(message);

原理

  1. 延时消息和普通消息一样会先被写入commitLog,但不会立刻写入consumerQueue中,而是存放到SCHEDULE_TOPIC_XXX的topic下面,并且以延时粒度作为queueId区分;
  2. 之后Broker端会有定时任务扫描SCHEDULE_TOPIC_XXX下的每个Queue,到时候后写入到consumerQueue中。

源码入口是ScheduleMessageService.start,启动时会调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 1. 根据支持的各种延迟级别,添加不同延迟时间的TimeTask
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
// 每个延迟级别对应一个offset,代表一个普通消息队列文件
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 2. 添加一个10s执行一次的TimeTask
this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}

DeliverDelayedMessageTimerTask
ScheduleMessageService.executeOnTimeup
扫描延迟消息队列(SCHEDULE_TOPIC_XXX)的消息,将该延迟消息转换为指定的topic的消息。
1、读取不同延迟级别对应的延迟消息;
2、取得对应延迟级别读取的开始位置offset;
3、将延迟消息转换为指定topic的普通消息并存放起来。
4、修改下一次读取的offset值(修改的只是缓存),并指定下一次转换延迟消息的timetask。

ScheduleMessageService.this.persist
将延迟队列扫描处理的进度offset持久化到delayOffset.json文件中。

RocketMQ延迟队列也有一个缺点:Java中的Timer是单线程,而延迟消息的原理是Timer,也就是说当同时发送的延迟消息过多的时候一个线程处理速度一定是有瓶颈的,因此在实际项目中使用延迟消息一定不要过多依赖,只能作为一个辅助手段。

RocketMQ-消息存储
如上图所示,消息的存储分为如下 3 个部分:

  1. CommitLog:日志,存储消息主体;
  2. ConsumerQueue:在 CommitLog 中根据 Topic 和 Tag 检索消息是非常低效的,因此引入了 ConsumerQueue 作为消费消息的索引,它保存的其实是 CommitLog 中存储的消息的指针。
  3. IndexFile:hash 索引,提供一种通过 key 或时间区间来查询消息的方法。
阅读全文 »
0%