ZooKeeper 的使用

就 ZooKeeper 来说,只明白原理是不够的,因为实际场景非常多,在不同场景下 ZooKeeper 几乎都有不同的应用模式,不过幸运的是 ZooKeeper 已经有一个比较完善的客户端 Curator,在生产环境下几乎不需要投入太多人力就可以解决大部分集群协调问题。

命令行运行 ZooKeeper

Zookeeper 有三种运行形式:集群模式、单机模式、伪集群模式(一台机器上配个集群)。以下实验都是在单机模式下进行。

下载

1
2
3
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
tar -zxvf zookeeper-3.4.13.tar.gz
cd zookeeper-3.4.13
  • zkCleanup  清理 Zookeeper 历史数据,包括食物日志文件和快照数据文件
  • zkCli     Zookeeper 的一个简易客户端
  • zkEnv    设置 Zookeeper 的环境变量
  • zkServer   Zookeeper 服务器的启动、停止、和重启脚本

创建配置文件(单机)

在 zookeeper 根目录下创建 conf/zoo.cfg(可以是任何其他文件名):

1
2
3
tickTime=2000 # 单位是毫秒,It is used to do heartbeats and the minimum session timeout will be twice the tickTime
dataDir=/var/lib/zookeeper # the location to store the in-memory database snapshots and, unless specified otherwise, the transaction log of updates to the database
clientPort=2181 # the port to listen for client connections

启动实例:

1
sudo bin/zkServer.sh start

可以使用 jps 命令查看进程。

创建配置文件(集群)

最少需要有三台服务器,且最好是奇数台服务器,如果只有两台,那么在丢失一台后剩余服务器无法组成majority quorum,两台服务器甚至比一台服务器还不稳定,因为存在两个single points of failure
在一台机器上启动多台服务器不会产生任何冗余,完全的冗余需要每个服务器运行在单独的机器上(不是一台机器上的多台虚拟机),但是 ZooKeeper 本身的配置都是差不多的。

1
2
3
4
5
6
7
8
9
10
11
tickTime=2000
dataDir=/var/lib/zookeeper/data1
clientPort=2181
initLimit=5 # 在quorum中连接上leader的超时时间,也就是5次tick
syncLimit=2 # how far out of date a server can be from a leader
# 下面配置构成集群的所有服务器,对于第一个参数server.X,当服务器启动后它会在data directory下查询以myid命名的文件,从而知道自己是集群中的哪一个服务器,That file has the contains the server number, in ASCII.
# ip表示该服务器的ip地址,如果是单机配置可以设置为localhost
# 后面有两个端口号,每个peer使用第一个端口号连接上其他peers(比如followers可以使用这个端口号连接上leader,当一个新的leader选举出来后,follower就会在这个端口打开一个TCP连接到leader)。因为默认的leader选举也要使用TCP连接,所以多出来的后一个端口就是用于选举的。
server.1=ip1:2888:3888
server.2=ip2:2888:3888
server.3=ip3:2888:3888

然后,在 dataDir 目录下创建 myid 文件,包含一个 ASCII 字符 1。
其他两个节点是类似的,需要修改 dataDir 和 clientPort 两个参数,并且在各自的 dataDir 下创建 myid 文件,包含且仅包含一个数字(1、2、3)。
如果并非单机部署,意味着会发生主机间的通信,需要开启端口:

1
2
3
4
5
6
7
8
9
10
11
12
# 对客户端开放的端口
iptables -A INPUT -p tcp --dport 2181 -j ACCEPT
iptables -A INPUT -p tcp --dport 2182 -j ACCEPT
iptables -A INPUT -p tcp --dport 2183 -j ACCEPT
# 用于peers间通信的端口
iptables -A INPUT -p tcp --dport 2888 -j ACCEPT
iptables -A INPUT -p tcp --dport 2889 -j ACCEPT
iptables -A INPUT -p tcp --dport 2890 -j ACCEPT
# 用于选举的端口
iptables -A INPUT -p tcp --dport 3888 -j ACCEPT
iptables -A INPUT -p tcp --dport 3889 -j ACCEPT
iptables -A INPUT -p tcp --dport 3890 -j ACCEPT

连接

如果只启动了一个服务器,整个集群不能对外提供服务(使用 zkCli 连接会报一大堆错),但是可以使用 telnet 来测试可用性:

1
telnet 127.0.0.1 2181

启动所有服务器后就可以通过 zkCli 连接了:

1
bin/zkCli.sh -server 127.0.0.1:2181

测试命令

  • 基本用法
    1
    2
    3
    4
    5
    6
    help
    ls /
    # 创建一个新的znode:/zk_test,并关联数据my_data
    create /zk_test my_data
    # 检查关联于该节点的数据
    get /zk_test
  • 三种节点类型的创建
    1
    2
    3
    4
    5
    6
    7
    # create [-s] [-e] path data acl
    # 创建节点,默认是持久节点
    create /perm_test perm_data
    # -s 指定创建顺序节点
    create -s /ordered_test ordered_data
    # -e 指定临时节点
    create -e /temp_test temp_data
    acl 用来进行权限控制
  • 查看节点
    临时节点会在客户端会话结束后被自动删除,下面退出后重新连接观察节点(根目录下已经不存在临时节点):
    1
    2
    quit
    ls /
    ls2 命令除了列出目录下的所有子节点外,还可以获取目录的其他属性:
    1
    ls2 /
    get 命令查看节点的具体信息
    1
    get /zk_test
  • 修改该节点的数据
    1
    2
    # set path data [version]
    set /zk_test junk
  • 删除节点
    1
    2
    3
    # delete path [version]
    delete /zk_test
    ls /
    注意若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点。

停止服务

1
bin/zkServer.sh stop

日志

启动 zookeeper 实例后会在当前目录下创建一个 zookeeper.out 文件,里面是 debug 信息。
另外还有一个 transaction log(可以在更新时降低延迟),默认放在 dataDir 目录下,也可以通过添加 dataLogDir 配置来指定。

通过 Docker 部署 standalone 模式

standalone.cfg:

1
2
3
4
5
6
7
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=127.0.0.1:2888:3888
4lw.commands.whitelist=*

standalone.sh:

1
2
3
4
5
6
#!/usr/bin/env bash
sudo cp standalone.cfg /opt/zookeeper/config/standalone.cfg
sudo mkdir -p /opt/zookeeper/data
sudo chmod 777 /opt/zookeeper/data

docker run -p 2181:2181 -v /opt/zookeeper/data:/var/lib/zookeeper/ -v /opt/zookeeper/config/standalone.cfg:/conf/zoo.cfg --name standalone-zk -d zookeeper

镜像:https://hub.docker.com/_/zookeeper
部署完后连接执行命令时可能报错:stat is not executed because it is not in the whitelist.
需要允许所有 4 字命令:https://blog.csdn.net/AroudSoft/article/details/98489889

ZooKeeper 集群

集群的部署方式

ZooKeeper 的部署方式是怎样的?集群中的机器角色都有哪些?集群最少需要几台机器?
单机、集群、伪集群。
Leader、Follower。
集群需要至少 3(2N + 1)台机器,保证奇数,主要是为了保证选举算法能得到大多数节点的确认。

崩溃恢复

集群如果有 3 台机器,挂掉一台的情况下集群还能工作吗?挂掉两台呢?
记住一个原则:过半存活即可用,只要能得到超过半数的选票就可以最终选出一个 Leader。

扩容

ZooKeeper 集群支持动态添加机器吗?这个问题讨论的是水平扩容能力,ZooKeeper 在这方面的支持不太好。如果需要添加机器,可以采取以下两种方式:

  • 全部重启:关闭所有 ZooKeeper 服务器,修改配置之后再启动,不影响之前客户端的会话。
  • 逐个重启:顾名思义。这是比较常用的方式。

ZooKeeper 原生客户端

一般不会直接使用 ZooKeeper 原生客户端,坑太多:

  1. 初始化连接的问题: 在 client 与 server 之间握手建立连接的过程中,如果握手失败,执行所有的同步方法(比如 create,getData 等)将抛出异常;
  2. 自动恢复(failover)的问题: 当 client 与一台 server 的连接丢失,并试图去连接另外一台 server 时,client 将回到初始连接模式;
  3. session 过期的问题: 在极端情况下,出现 ZooKeeper session 过期,客户端需要自己去监听该状态并重新创建 ZooKeeper 实例;
  4. 对可恢复异常的处理:当在 server 端创建一个有序 ZNode,而在将节点名返回给客户端时崩溃,此时 client 端抛出可恢复的异常,用户需要自己捕获这些异常并进行重试;
  5. 使用场景的问题:Zookeeper 提供了一些标准的使用场景支持,但是 ZooKeeper 对这些功能的使用说明文档很少,而且很容易用错. 在一些极端场景下如何处理,zk 并没有给出详细的文档说明. 比如共享锁服务,当服务器端创建临时顺序节点成功,但是在客户端接收到节点名之前挂掉了,如果不能很好的处理这种情况,将导致死锁。

Curator

  1. 封装 ZooKeeper client 与 ZooKeeper server 之间的连接处理,提供连接状态监控、zk 客户端实例管理等特性;
  2. 提供了一套 Fluent 风格的操作 API;
  3. 提供 ZooKeeper 各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装,提供对各种使用场景的支持(甚至包括 zk 自身不支持的场景), 这些实现都遵循了 zk 的最佳实践, 并考虑了各种极端情况。

下面按 Curator 的组成进行分析。

Client

Client 是 ZooKeeper 客户端的一个替代品,提供了一些底层处理和相关的工具方法。

连接管理

Curator 对 zk 客户端到 server 集群连接进行管理. 并在需要的情况, 重建 zk 实例, 保证与 zk 集群的可靠连接。
Curator 初始化之后会一直的对 zk 连接进行监听, 一旦发现连接状态发生变化, 将作出相应的处理

连接重试

1
2
3
4
5
6
7
8
9
10
11
12
13
RetryLoop retryLoop = client.newRetryLoop();  
while (retryLoop.shouldContinue()) {
try {
// perform your work
...
// it's important to re-get the ZK instance as there may have been an error and the instance was re-created
ZooKeeper zk = client.getZookeeper();

retryLoop.markComplete();
} catch (Exception e) {
retryLoop.takeException(e);
}
}
1
2
3
4
5
6
7
8
RetryLoop.callWithRetry(client, new Callable() {  
@Override
public Void call() throws Exception
{
// do your work here - it will get retried if needed
return null;
}
});

Connection Guarantees

Curator 是通过重连机制保证连接的可靠的:

  • Every Curator operation properly waits until the ZooKeeper connection is established
  • Every Curator Framework operation (create, getData, etc.) is guaranteed to manage connection loss and/or session expiration per the currently set retry policy
  • If the connection is temporarily lost, Curator will attempt to retry the operation until it succeeds per the currently set retry policy
  • All Curator recipes attempt to deal with connection issues in an appropriate way

Curator 提供可插拔的重试机制,它将给捕获所有可恢复的异常配置一个重试策略, 并且内部也提供了几种标准的重试策略(比如指数补偿)
RetryPolicy 接口只有一个方法(以前版本有两个方法):
public boolean allowRetry(int retryCount, long elapsedTimeMs);
在开始重试之前, allowRetry 方法被调用, 其参数将指定当前重试次数, 和操作已消耗时间. 如果允许, 将继续重试, 否则抛出异常.
有四种内置的重试策略:
ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
RetryNTimes:指定最大重试次数的重试策略
RetryOneTime:仅重试一次
RetryUntilElapsed:一直重试直到达到规定的时间

Framework

用来简化 ZooKeeper 高级功能的使用,并增加了一些新的功能, 比如管理到 ZooKeeper 集群的连接、重试处理

CuratorFrameworkFactory 类

提供了两个方法, 一个工厂方法 newClient, 一个构建方法 build. 使用工厂方法 newClient 可以创建一个默认的实例, 而 build 构建方法可以对实例进行定制. 当 CuratorFramework 实例构建完成, 紧接着调用 start()方法, 在应用结束的时候, 需要调用 close()方法. CuratorFramework 是线程安全的. 在一个应用中可以共享同一个 zk 集群的 CuratorFramework.

1
2
3
4
client.create().forPath("/head", new byte[0]);  
client.delete().inBackground().forPath("/head");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
client.getData().watched().inBackground().forPath("/test");

方法说明:
create(): 发起一个 create 操作. 可以组合其他方法 (比如 mode 或 background) 最后以 forPath()方法结尾
delete(): 发起一个删除操作. 可以组合其他方法(version 或 background) 最后以 forPath()方法结尾
checkExists(): 发起一个检查 ZNode 是否存在的操作. 可以组合其他方法(watch 或 background) 最后以 forPath()方法结尾
getData(): 发起一个获取 ZNode 数据的操作. 可以组合其他方法(watch, background 或 get stat) 最后以 forPath()方法结尾
setData(): 发起一个设置 ZNode 数据的操作. 可以组合其他方法(version 或 background) 最后以 forPath()方法结尾
getChildren(): 发起一个获取 ZNode 子节点的操作. 可以组合其他方法(watch, background 或 get stat) 最后以 forPath()方法结尾
inTransaction(): 发起一个 ZooKeeper 事务. 可以组合 create, setData, check, 和/或 delete 为一个操作, 然后 commit() 提交

通知(Notification)

Curator 的相关代码已经更新了, 里面的接口已经由 ClientListener 改成 CuratorListener 了, 而且接口中去掉了 clientCloseDueToError 方法. 只有一个方法:
eventReceived() 当一个后台操作完成或者指定的 watch 被触发时该方法被调用
UnhandledErrorListener 接口用来对异常进行处理.
CuratorEvent(在以前版本为 ClientEvent)是对各种操作触发相关事件对象(POJO)的一个完整封装, 而事件对象的内容跟事件类型相关, 下面是对应关系:
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GET_DATA getResultCode(), getPath(), getStat() and getData()
SET_DATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
WATCHED getWatchedEvent()

名称空间(Namespace)

因为一个 zk 集群会被多个应用共享, 为了避免各个应用的 zk patch 冲突, Curator Framework 内部会给每一个 Curator Framework 实例分配一个 namespace(可选). 这样你在 create ZNode 的时候都会自动加上这个 namespace 作为这个 node path 的 root. 使用代码如下:

1
2
3
4
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();  

client.create().forPath("/test", data);
// node was actually written to: "/MyApp/test"

Recipes

实现了通用 ZooKeeper 的 recipe, 该组件建立在 Framework 的基础之上,Curator 实现了 ZooKeeper 的所有 recipe(除了两段提交)

选举

集群领导选举(leader election)

锁服务

共享锁: 全局同步分布式锁, 同一时间两台机器只有一台能获得同一把锁.
共享读写锁: 用于分布式的读写互斥处理, 同时生成两个锁:一个读锁, 一个写锁, 读锁能被多个应用持有, 而写锁只能一个独占, 当写锁未被持有时, 多个读锁持有者可以同时进行读操作
共享信号量: 在分布式系统中的各个 JVM 使用同一个 zk lock path, 该 path 将跟一个给定数量的租约(lease)相关联, 然后各个应用根据请求顺序获得对应的 lease, 相对来说, 这是最公平的锁服务使用方式.
多共享锁:内部构件多个共享锁(会跟一个 znode path 关联), 在 acquire()过程中, 执行所有共享锁的 acquire()方法, 如果中间出现一个失败, 则将释放所有已 require 的共享锁; 执行 release()方法时, 则执行内部多个共享锁的 release 方法(如果出现失败将忽略)

队列(Queue)

分布式队列:采用持久顺序 zk node 来实现 FIFO 队列, 如果有多个消费者, 可以使用 LeaderSelector 来保证队列的消费者顺序
分布式优先队列: 优先队列的分布式版本
BlockingQueueConsumer: JDK 阻塞队列的分布式版本

关卡(Barrier)

分布式关卡:一堆客户端去处理一堆任务, 只有所有的客户端都执行完, 所有客户端才能继续往下处理
双分布式关卡:同时开始, 同时结束

计数器(Counter)

共享计数器:所有客户端监听同一个 znode path, 并共享一个最新的 integer 计数值
分布式 AtomicLong(AtomicInteger): AtomicXxx 的分布式版本, 先采用乐观锁更新, 若失败再采用互斥锁更新, 可以配置重试策略来处理重试

Utilities:各种 ZooKeeper 的工具类

Path Cache

Path Cache 用于监听 ZNode 的子节点的变化, 当 add, update, remove 子节点时将改变 Path Cache state, 同时返回所有子节点的 data 和 state.
Curator 中采用了 PathChildrenCache 类来处理 Path Cache, 状态的变化则采用 PathChildrenCacheListener 来监听.
相关用法参见 TestPathChildrenCache 测试类
注意: 当 zk server 的数据发生变化, zk client 会出现不一致, 这个需要通过版本号来识别这种状态的变化

Test Server

用来在测试中模拟一个本地进程内 ZooKeeper Server.

Test Cluster

用来在测试中模拟一个 ZooKeeper Server 集群

ZKPaths 工具类

提供了和 ZNode 相关的 path 处理工具方法:
getNodeFromPath: 根据给定 path 获取 node name. i.e. “/one/two/three” -> “three”
mkdirs: 根据给定路径递归创建所有 node
getSortedChildren: 根据给定路径, 返回一个按序列号排序的子节点列表
makePath: 根据给定的 path 和子节点名, 创建一个完整 path

EnsurePath 工具类

直接看例子, 具体的说就是调用多次, 只会执行一次创建节点操作.

1
2
3
4
5
6
7
8
EnsurePath ensurePath = new EnsurePath(aFullPathToEnsure);  
...
String nodePath = aFullPathToEnsure + "/foo";
ensurePath.ensure(zk); // first time syncs and creates if needed
zk.create(nodePath, ...);
...
ensurePath.ensure(zk); // subsequent times are NOPs
zk.create(nodePath, ...);

Notification 事件处理

Curator 对 ZooKeeper 的事件 Watcher 进行了封装处理, 然后实现了一套监听机制. 提供了几个监听接口用来处理 ZooKeeper 连接状态的变化
当连接出现异常, 将通过 ConnectionStateListener 接口进行监听, 并进行相应的处理, 这些状态变化包括:
暂停(SUSPENDED): 当连接丢失, 将暂停所有操作, 直到连接重新建立, 如果在规定时间内无法建立连接, 将触发 LOST 通知
重连(RECONNECTED): 连接丢失, 执行重连时, 将触发该通知
丢失(LOST): 连接超时时, 将触发该通知
从 com.netflix.curator.framework.imps.CuratorFrameworkImpl.validateConnection(CuratorEvent)方法中我们可以知道, Curator 分别将 ZooKeeper 的 Disconnected, Expired, SyncConnected 三种状态转换成上面三种状态.

日志工具

Curator 内部采用 SLF4J 来输出日志
采用驱动器(driver)机制, 允许扩展和定制日志和跟踪处理
提供了一个 TracerDriver 接口, 通过实现 addTrace()和 addCount()接口来集成用户自己的跟踪框架

ZkClient

相对 Curator 的不足

  • 文档几乎没有
  • 异常处理弱爆了(简单的抛出 RuntimeException)
  • 重试处理太难用了
  • 没有提供各种使用场景的实现

特性

  1. 提供了 Zk 断链重连的特性::这个特性似乎每个开发者都会设计,而且代码风格几乎”如出一辙”。在大部分 Zk 使用场景中,我们都要求它能够在断链的时候,重新建立连接,无论 session 失效与否。
  2. Event 监听器机制:向 ZNode 节点注册 watch,每个开发者都使用过,尽管 watch 机制并不能确保数据变更的实时性。Watch-Event 属于”即发即失”,因为我们需要得到 Event 时候,再去注册一遍,这也是一个非常繁琐的事情,I0Itec-ZkClient 提供了 Event-Listener 的小技巧,可以帮助我们”解脱”。
  3. Zk 异常处理:Zkr 中繁多的 Exception,以及每个 Exception 所需要关注的事情各有不同,你应该记得那一堆 try-catch 给你带来的烦恼。
  4. Data 序列化:简单的 data 序列化(Serialzer/Deserialzer)。

ZkClient API

  1. ZkConnection 类:对 Zk API 的简单分装,提供了链接 Zk Server 和数据 CRUD 的操作;此类实现了 IZkConnection 接口,通常情况下,如果 I0Itec-Zkclient 不能满足需要的时候,我们可以重写 ZkConnection 即可。
  2. ZkClient 类:核心类,也是开发者需要直接使用的类,它内部维护了 Zk 的链接管理和 Event 处理逻辑等,同时也暴露了 Zookeeper Znode 的 CRUD 方法列表。
  3. IZkChildListener 接口:ZNode 子节点事件侦听器,当 ZkClient 接收到某个 path 节点变更或者子节点变更事件时,会触发 Listener。
  4. IZkDataListener 接口
  5. IZkStateListener 接口:当 Zk 客户端状态变更时,触发。

实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private ZkClient zkClient;

private String path;
private final String LOCK;

public boolean lock() throws Exception {
if (zkClient.exists(path))
return true;
else
return zkClient.create(path, LOCK.getBytes(), CreateMode.EPHEMERAL) == null ? true : false;
}

public boolean unlock() throws Exception {
return zkClient.delete(path);
}

public boolean islock() throws Exception {
return zkClient.exists(path);
}

参考

环境搭建

  1. ZooKeeper Getting Started Guide
  2. ZooKeeper 单机模式和集群模式的环境搭建

客户端

  1. 原生客户端
    【分布式】Zookeeper 使用–Java API
  2. 开源客户端
    ZooKeeper Programmer’s Guide
    【分布式】Zookeeper 使用–开源客户端
  3. Curator
    Apache Curator
    Zookeeper 客户端 Curator 使用详解
    Zookeeper 开源客户端框架 Curator 简介
    Recipes
  4. ZkClient
    zkclient

应用

  1. Zookeeper-Zookeeper 可以干什么
  2. 【分布式】Zookeeper 在大型分布式系统中的应用
  3. 基于 Zk 实现分布式锁
  4. 【分布式】Zookeeper 应用场景