ZooKeeper 源码分析

连接建立和会话管理

ZooKeeper的连接与会话就是客户端通过实例化ZooKeeper对象来实现客户端与服务器创建并保持TCP连接的过程。

Watcher

数据存储

Leader 选举

选举相关概念

  • 服务器 ID
    编号越大在选择算法中的权重越大。比如有三台服务器,编号分别是 1、2、3,其中 3 的那台权重最大。
  • 数据 ID
    服务器中存放的最大数据 ID。值越大说明数据越新,在选举算法中数据越新权重越大。
  • 逻辑时钟
    或者叫投票的次数,同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断。
  • 选举状态
    LOOKING,竞选状态。
    FOLLOWING,随从状态,同步 leader 状态,参与投票。
    OBSERVING,观察状态,同步 leader 状态,不参与投票。
    LEADING,领导者状态。

zk 集群选举概述

配置多个实例共同构成一个集群对外提供服务以达到水平扩展的目的,每个服务器上的数据是相同的,每一个服务器均可以对外提供读和写的服务,这点和 redis 是相同的,即对客户端来讲每个服务器都是平等的。
zk集群选举概述
zookeeper 提供了三种集群选举方式:

  • LeaderElection
  • AuthFastLeaderElection
  • FastLeaderElection

默认的算法是 FastLeaderElection,所以这里主要分析它的选举机制。

QuorumPeer

主要看这个类,只有 LOOKING 状态才会去执行选举算法。每个服务器在启动时都会选择自己做为领导,然后将投票信息发送出去,循环一直到选举出领导为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void run() {
//.......
try {
while (running) {
switch (getPeerState()) {
case LOOKING:
if (Boolean.getBoolean("readonlymode.enabled")) {
//...
try {
//投票给自己...
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
//...
} finally {
//...
}
} else {
try {
//...
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
//...
}
}
break;
case OBSERVING:
//...
break;
case FOLLOWING:
//...
break;
case LEADING:
//...
break;
}

}
} finally {
//...
}
}

FastLeaderElection

它是 zookeeper 默认提供的选举算法,核心方法如下。可以与本文上面的流程图对照。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
public Vote lookForLeader() throws InterruptedException {
//...
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//给自己投票
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//将投票信息发送给集群中的每个服务器
sendNotifications();
//循环,如果是竞选状态一直到选举出结果
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){

Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
//没有收到投票信息
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
//...
}
//收到投票信息
else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {

switch (n.state) {
case LOOKING:

// 判断投票是否过时,如果过时就清除之前已经接收到的信息
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
//更新投票信息
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//发送投票信息
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//忽略
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//更新投票信息
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判断是否投票结束
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}

if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
//忽略
break;
case FOLLOWING:
case LEADING:
//如果是同一轮投票
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判断是否投票结束
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
//记录投票已经完成
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
//忽略
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
//...
}
}

  • 判断是否已经胜出
    默认是采用投票数大于半数则胜出的逻辑。

选举消息内容

在投票完成后,需要将投票信息发送给集群中的所有服务器,它包含如下内容。

  • 服务器 ID
  • 数据 ID
  • 逻辑时钟
  • 选举状态

选举流程简述

目前有 5 台服务器,每台服务器均没有数据,它们的编号分别是 1,2,3,4,5,按编号依次启动,它们的选择举过程如下:

  • 服务器 1 启动,给自己投票,然后发投票信息,由于其它机器还没有启动所以它收不到反馈信息,服务器 1 的状态一直属于 Looking。
  • 服务器 2 启动,给自己投票,同时与之前启动的服务器 1 交换结果,由于服务器 2 的编号大所以服务器 2 胜出,但此时投票数没有大于半数,所以两个服务器的状态依然是 LOOKING。
  • 服务器 3 启动,给自己投票,同时与之前启动的服务器 1,2 交换信息,由于服务器 3 的编号最大所以服务器 3 胜出,此时投票数正好大于半数,所以服务器 3 成为领导者,服务器 1,2 成为小弟。
  • 服务器 4 启动,给自己投票,同时与之前启动的服务器 1,2,3 交换信息,尽管服务器 4 的编号大,但之前服务器 3 已经胜出,所以服务器 4 只能成为小弟。
  • 服务器 5 启动,后面的逻辑同服务器 4 成为小弟。

选举流程

描述 Leader 选择过程中的状态变化,这是假设全部实例中均没有数据,假设服务器启动顺序分别为:A,B,C。
选举流程图
选举状态图

如果规模为 5 的集群只起来其中的 3 台服务器,这时会进行选举吗

不会,ZooKeeper 更倾向于保持一致性,如果配置中的部分服务器不可用,那么整个集群都是不可用的。

参考

  1. 【分布式】Zookeeper 系统模型

ZooKeeper 服务器的启动流程

  1. 【分布式】Zookeeper 服务端启动
  2. 【分布式】Zookeeper 的服务器角色
  3. 【Zookeeper】源码分析之服务器(一)
  4. 【Zookeeper】源码分析之服务器(二)之 ZooKeeperServer
  5. 【Zookeeper】源码分析之服务器(三)之 LeaderZooKeeperServer
  6. 【Zookeeper】源码分析之服务器(四)之 FollowerZooKeeperServer
  7. 【Zookeeper】源码分析之服务器(五)之 ObserverZooKeeperServer

ZooKeeper 客户端

  1. 【分布式】Zookeeper 客户端

Watcher

  1. zookeeper 中 Watcher 通知机制的一点理解
  2. 【Zookeeper】源码分析之 Watcher 机制(一)
  3. 【Zookeeper】源码分析之 Watcher 机制(二)之 WatchManager
  4. 【Zookeeper】源码分析之 Watcher 机制(三)之 ZooKeeper

Leader 选举

  1. 【分布式】Zookeeper 的 Leader 选举
  2. 【Zookeeper】源码分析之 Leader 选举(一)
  3. 【Zookeeper】源码分析之 Leader 选举(二)之 FastLeaderElection

持久化

  1. 【分布式】Zookeeper 数据与存储
  2. 【Zookeeper】源码分析之持久化(一)之 FileTxnLog
  3. 【Zookeeper】源码分析之持久化(二)之 FileSnap
  4. 【Zookeeper】源码分析之持久化(三)之 FileTxnSnapLog

通信(通信协议、序列化、会话、请求处理)

  1. 【分布式】Zookeeper 序列化及通信协议
  2. 【Zookeeper】源码分析之序列化
  3. 【分布式】Zookeeper 会话
  4. 【分布式】Zookeeper 请求处理
  5. 【Zookeeper】源码分析之请求处理链(一)
  6. 【Zookeeper】源码分析之请求处理链(二)之 PrepRequestProcessor
  7. 【Zookeeper】源码分析之请求处理链(三)之 SyncRequestProcessor
  8. 【Zookeeper】源码分析之请求处理链(四)之 FinalRequestProcessor
  9. 【Zookeeper】源码分析之网络通信(一)
  10. 【Zookeeper】源码分析之网络通信(二)之 NIOServerCnxn
  11. 【Zookeeper】源码分析之网络通信(三)之 NettyServerCnxn