Redis高可用方案Cluster

Redis Cluster介绍

Cluster 优势

  1. 线性的可扩展性:扩容即迁移槽,已有很多迁移案例;
    如果要保存更多的数据,可以直接增加Master来支持,比如每台Master存32G,要完美存下1T数据的话,可以设置32台的Master,当然实际情况下这样非常浪费,一般会少设置一些,只用几台Master来存储最热的数据。
  2. 没有合并操作:因为 Redis 中的 List 和 Set 中保存的 Value 通常是比较大的,可能会达数以百万计的元素,而它们可能被存储到了不同的 Redis 实例上,传输和合并这样的值将很容易称为一个主要的性能瓶颈;
  3. 写入安全(Write Safety):只有在非常少见的 Master 宕机的情况下,写入才会失败,并且这个失败的时间窗口不大(由一个 Slave 顶替上来);
  4. 可用性(Availability):就算有部分 Master 不可用了,它们的 Slave 仍然可以通过选举提升为 Master。

Cluster 缺点

  1. Redis 集群并不支持处理多个 keys 的命令,因为这需要在不同的节点间移动数据,从而达不到像 Redis 那样的性能,在高负载的情况下可能会导致不可预料的错误。
  2. Redis 集群不像单机版本的 Redis 那样支持多个数据库,集群只有数据库 0,而且也不支持 SELECT 命令。

Cluster的去中心化架构

Cluster
redis cluster在设计的时候,就考虑到了去中心化,去中间件,也就是说,集群中的每个节点都是平等的关系,都是对等的,每个节点都保存各自的数据和整个集群的状态。所有的 redis 节点彼此互联(PING-PONG 机制),内部使用二进制协议优化传输速度和带宽,而且这些连接保持活跃,这样就保证了我们只需要连接集群中的任意一个节点,就可以获取到其他节点的数据。客户端与 redis 节点直连,不需要中间 proxy 层,客户端不需要连接集群所有节点,连接集群中任何一个可用节点即可。

一致性

Redis 不能保证强一致性,因为:

  1. 异步复制:写操作会被异步复制到 slave 节点,但可能由于出现网络分区、脑裂而导致数据丢失。
    网络分区
    如上图所示,客户端 Z1 向 Master-B 写入数据后,集群出现了网络分区,且分区持续的时间足够长导致此时 B1 被选举为新的 Master,则在此期间 Z1 向 B 写入的数据就都丢失了。

    网络分区出现期间,客户端 Z1 可以向主节点 B 发送写命令的最大时间是有限制的, 这一时间限制称为节点超时时间(node timeout), 是 Redis 集群的一个重要的配置选项。

Cluster VS Codis

Codis 集群中包含了 4 类关键组件。

  • codis server:这是进行了二次开发的 Redis 实例,其中增加了额外的数据结构,支持数据迁移操作,主要负责处理具体的数据读写请求。
  • codis proxy:接收客户端请求,并把请求转发给 codis server。
  • Zookeeper 集群:保存集群元数据,例如数据位置信息和 codis proxy 信息。
  • codis dashboard 和 codis fe:共同组成了集群管理工具。其中,codis dashboard 负责执行集群管理工作,包括增删 codis server、codis proxy 和进行数据迁移。而 codis fe 负责提供 dashboard 的 Web 操作界面,便于我们直接在 Web 界面上进行集群管理。

Codis如何处理一次请求:

  1. 客户端连接Codis Proxy,将请求发给Proxy;
  2. codis proxy 接收到请求,就会查询请求数据和 codis server 的映射关系,并把请求转发给相应的 codis server 进行处理
  3. 当 codis server 处理完请求后,会把结果返回给 codis proxy,proxy 再把数据返回给客户端。

以4个方面来讨论

  • 数据分布
    和Cluster类似,Codis将数据保存到slot,集群一共有1024个slot,需要手动分配给Codis Server,或者由dashboard自动分配。
    当客户端要读写数据时,会使用CRC32算法计算key的哈希值,并对1024取模,就可以知道对应的是哪个slot了。
    这个路由规则需要先配置到dashboard,dashboard会把路由表发送给codis proxy,并同时保存到ZooKeeper。
    而Cluster的数据路由表是由每个实例管理的,如果发生变化,这些实例会通过Gossip协议来互相传播,如果实例比较多,就会占用比较多的网络资源。
  • 集群扩容和数据迁移
    如果要增加Codis Server来负载slot,需要配置要迁移的slot,Codis Server会将该slot中的数据一个一个地发送给目标Server。
    增加Codis Proxy也是类似的流程。
  • 客户端兼容性
    Codis客户端直接和Codis Proxy连接,codis proxy 是和单实例客户端兼容的,而集群相关的管理工作又都是由Codis Proxy和Codis dashboard这些组件来完成的,不需要客户端参与。
  • 可靠性保证
    Codis Server保证可靠性:Codis Server本身是Redis实例,只是增加了集群相关的操作命令,可靠性是可以通过主从机制+哨兵来实现的。
    Codis Proxy的可靠性:Proxy上的信息都来自ZooKeeper,例如路由表,只要ZooKeeper集群中实例半数以上可以正常工作,那么ZooKeeper集群就是正常的。

比较:
RedisCluster比对Codis

  • 从稳定性和成熟度来看,Codis 应用得比较早,在业界已经有了成熟的生产部署。虽然 Codis 引入了 proxy 和 Zookeeper,增加了集群复杂度,但是,proxy 的无状态设计和 Zookeeper 自身的稳定性,也给 Codis 的稳定使用提供了保证。而 Redis Cluster 的推出时间晚于 Codis,相对来说,成熟度要弱于 Codis,如果你想选择一个成熟稳定的方案,Codis 更加合适些。
  • 从业务应用客户端兼容性来看,连接单实例的客户端可以直接连接 codis proxy,而原本连接单实例的客户端要想连接 Redis Cluster 的话,就需要开发新功能。所以,如果你的业务应用中大量使用了单实例的客户端,而现在想应用切片集群的话,建议你选择 Codis,这样可以避免修改业务应用中的客户端。
  • 从使用 Redis 新命令和新特性来看,Codis server 是基于开源的 Redis 3.2.8 开发的,所以,Codis 并不支持 Redis 后续的开源版本中的新增命令和数据类型。另外,Codis 并没有实现开源 Redis 版本的所有命令,比如 BITOP、BLPOP、BRPOP,以及和与事务相关的 MUTLI、EXEC 等命令。Codis 官网上列出了不被支持的命令列表,你在使用时记得去核查一下。所以,如果你想使用开源 Redis 版本的新特性,Redis Cluster 是一个合适的选择。
  • 从数据迁移性能维度来看,Codis 能支持异步迁移,异步迁移对集群处理正常请求的性能影响要比使用同步迁移的小。所以,如果你在应用集群时,数据迁移比较频繁的话,Codis 是个更合适的选择。

数据分布(分区)和查询路由

分区策略

分区将原来比较大的数据集分离存储到多个存储媒介上,分区后Redis可以管理更大的内存空间和计算能力,但同时多主机又会面临很多分布式集群的可用性、一致性等问题。
分区策略:

  1. 范围分区
    将不同范围的对象映射到不同的Redis实例,比如用户ID为0到10000的存储到R0,10001到20000的存储到R1,以此类推。
    缺点是需要建立一张映射表,谨小甚微地维护ID和Redis实例之间的映射关系,而且由于需要维护表,导致效率不如其他方案。
  2. 散列分区
    使用散列函数(如CRC32)将key转换为一个数字,取模得到一个0到3的数字(假设Redis服务器有4台),这个数字即对应服务器的序号。
  3. 一致性哈希
    一致性哈希的一种示例实现可以参考Dubbo中的实现:com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
    关键代码如下:
    ConsistentHashLoadBalance
  4. 哈希槽
    Redis Cluster采用的是哈希槽的方式。
    Redis 集群没有并使用传统的一致性哈希来分配数据,而是采用另外一种叫做哈希槽 (hash slot)的方式来分配的。redis cluster 默认分配了 16384 个 slot,当我们 set 一个 key 时,会用CRC16算法来取模得到所属的 slot,然后将这个 key 分到哈希槽区间的节点上,具体算法就是:CRC16(key) % 16384。所以我们在测试的时候看到 set 和 get 的时候,直接跳转到了 7000 端口的节点。
    客户端在接收到重定向错误(redirections errors) -MOVED 和 -ASK 的时候, 将命令重定向到其他节点。客户端不需要存储集群信息(槽所在位置),但是如何客户端可以缓存键值和节点之间的映射关系,就可以明显提高命令执行的效率了(Redisson 中就是这么做的)。
    在 Cluster 架构中,slave 节点不分配槽,只拥有读权限,但是在代码中 cluster 执行读写操作的都是 master 节点,并不是读就是从节点、写就是主节点。

源码中,Redis采用一个大小固定为CLUSTER_SLOTS的clusterNode数组slots来保存每个桶的负责节点,这是个字节数组,每个位表示当前节点是否负责这个槽:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// 节点状态
struct clusterNode {

// 创建节点的时间
mstime_t ctime; /* Node object creation time. */

// 节点的名字,由 40 个十六进制字符组成
// 例如 68eef66df23420a5862208ef5b1a7005b806f2ff
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */

// 节点标识
// 使用各种不同的标识值记录节点的角色(比如主节点或者从节点),
// 以及节点目前所处的状态(比如在线或者下线)。
int flags; /* REDIS_NODE_... */

// 节点当前的配置纪元,用于实现故障转移
uint64_t configEpoch; /* Last configEpoch observed for this node */

// 由这个节点负责处理的槽
// 一共有 REDIS_CLUSTER_SLOTS / 8 个字节长
// 每个字节的每个位记录了一个槽的保存状态
// 位的值为 1 表示槽正由本节点处理,值为 0 则表示槽并非本节点处理
// 比如 slots[0] 的第一个位保存了槽 0 的保存情况
// slots[0] 的第二个位保存了槽 1 的保存情况,以此类推
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */

// 该节点负责处理的槽数量
int numslots; /* Number of slots handled by this node */

// 如果本节点是主节点,那么用这个属性记录从节点的数量
int numslaves; /* Number of slave nodes, if this is a master */

// 指针数组,指向各个从节点
struct clusterNode **slaves; /* pointers to slave nodes */

// 如果这是一个从节点,那么指向主节点
struct clusterNode *slaveof; /* pointer to the master node */

// 最后一次发送 PING 命令的时间
mstime_t ping_sent; /* Unix time we sent latest ping */

// 最后一次接收 PONG 回复的时间戳
mstime_t pong_received; /* Unix time we received the pong */

// 最后一次被设置为 FAIL 状态的时间
mstime_t fail_time; /* Unix time when FAIL flag was set */

// 最后一次给某个从节点投票的时间
mstime_t voted_time; /* Last time we voted for a slave of this master */

// 最后一次从这个节点接收到复制偏移量的时间
mstime_t repl_offset_time; /* Unix time we received offset for this node */

// 这个节点的复制偏移量
long long repl_offset; /* Last known repl offset for this node. */

// 节点的 IP 地址
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */

// 节点的端口号
int port; /* Latest known port of this node */

// 保存连接节点所需的有关信息
clusterLink *link; /* TCP/IP link with this node */

// 一个链表,记录了所有其他节点对该节点的下线报告
list *fail_reports; /* List of nodes signaling this as failing */

};
typedef struct clusterNode clusterNode;

// 集群状态,每个节点都保存着一个这样的状态,记录了它们眼中的集群的样子。
// 另外,虽然这个结构主要用于记录集群的属性,但是为了节约资源,
// 有些与节点有关的属性,比如 slots_to_keys 、 failover_auth_count
// 也被放到了这个结构里面。
typedef struct clusterState {

// 指向当前节点的指针
clusterNode *myself; /* This node */

// 集群当前的配置纪元,用于实现故障转移
uint64_t currentEpoch;

// 集群当前的状态:是在线还是下线
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */

// 集群中至少处理着一个槽的节点的数量。
int size; /* Num of master nodes with at least one slot */

// 集群节点名单(包括 myself 节点)
// 字典的键为节点的名字,字典的值为 clusterNode 结构
dict *nodes; /* Hash table of name -> clusterNode structures */

// 节点黑名单,用于 CLUSTER FORGET 命令
// 防止被 FORGET 的命令重新被添加到集群里面
// (不过现在似乎没有在使用的样子,已废弃?还是尚未实现?)
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */

// 记录要从当前节点迁移到目标节点的槽,以及迁移的目标节点
// migrating_slots_to[i] = NULL 表示槽 i 未被迁移
// migrating_slots_to[i] = clusterNode_A 表示槽 i 要从本节点迁移至节点 A
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];

// 记录要从源节点迁移到本节点的槽,以及进行迁移的源节点
// importing_slots_from[i] = NULL 表示槽 i 未进行导入
// importing_slots_from[i] = clusterNode_A 表示正从节点 A 中导入槽 i
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];

// 负责处理各个槽的节点
// 例如 slots[i] = clusterNode_A 表示槽 i 由节点 A 处理
clusterNode *slots[REDIS_CLUSTER_SLOTS];

// 跳跃表,表中以槽作为分值,键作为成员,对槽进行有序排序
// 当需要对某些槽进行区间(range)操作时,这个跳跃表可以提供方便
// 具体操作定义在 db.c 里面
zskiplist *slots_to_keys;

/* The following fields are used to take the slave state on elections. */
// 以下这些域被用于进行故障转移选举

// 上次执行选举或者下次执行选举的时间
mstime_t failover_auth_time; /* Time of previous or next election. */

// 节点获得的投票数量
int failover_auth_count; /* Number of votes received so far. */

// 如果值为 1 ,表示本节点已经向其他节点发送了投票请求
int failover_auth_sent; /* True if we already asked for votes. */

int failover_auth_rank; /* This slave rank for current auth request. */

uint64_t failover_auth_epoch; /* Epoch of the current election. */

/* Manual failover state in common. */
/* 共用的手动故障转移状态 */

// 手动故障转移执行的时间限制
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
/* 主服务器的手动故障转移状态 */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
/* 从服务器的手动故障转移状态 */
long long mf_master_offset; /* Master offset the slave needs to start MF
or zero if stil not received. */
// 指示手动故障转移是否可以开始的标志值
// 值为非 0 时表示各个主服务器可以开始投票
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */

/* The followign fields are uesd by masters to take state on elections. */
/* 以下这些域由主服务器使用,用于记录选举时的状态 */

// 集群最后一次进行投票的纪元
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */

// 在进入下个事件循环之前要做的事情,以各个 flag 来记录
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */

// 通过 cluster 连接发送的消息数量
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */

// 通过 cluster 接收到的消息数量
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/

} clusterState;

分区的实现层次

分区可以在程序的不同层次实现。

  • 客户端分区
    就是在客户端就已经决定数据会被存储到哪个redis节点或者从哪个redis节点读取。大多数客户端已经实现了客户端分区。
  • 代理分区
    意味着客户端将请求发送给代理,然后代理决定去哪个节点写数据或者读数据。代理根据分区规则决定请求哪些Redis实例,然后根据Redis的响应结果返回给客户端。redis和memcached的一种代理实现就是Twemproxy
  • 查询路由(Query routing)
    意思是客户端随机地请求任意一个redis实例,然后由Redis将请求转发给正确的Redis节点。Redis Cluster实现了一种混合形式的查询路由,但并不是直接将请求从一个redis节点转发到另一个redis节点,而是在客户端的帮助下直接redirected到正确的redis节点。

Redis Cluster采用的是查询路由的方式。
在Cluster模式下,Redis接收任何命令都会首先计算键对应的桶编号,再根据桶找出所对应的节点,如果节点是自身,则处理键命令;否则回复MOVED重定向错误,通知客户端请求正确的节点,这个过程称为MOVED重定向。
在客户端初次连接Redis集群时,如果客户端是Smart Client,它会获取集群的节点信息及slot的分布信息,并在本地缓存一份 hash slot 与node关系的路由表,这样不必每次访问服务器时都因为重定向而经过多次网络调用。
redis-cli不是smart client,它没有缓存路由表的功能;Java客户端Redisson是smart client,它在初始化时会调用redis实例的CLUSTER NODES命令来获取集群中每个Master负责的slot范围,并启动一个定时任务来每秒刷新本地缓存的集群状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
* 启动时查询集群状态
*/
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config);

this.config = create(cfg);
initTimer(this.config);

Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();

// 发送cluster nodes命令

clusterNodesCommand = RedisCommands.CLUSTER_NODES;
if ("rediss".equals(addr.getScheme())) {
clusterNodesCommand = RedisCommands.CLUSTER_NODES_SSL;
}

List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);

StringBuilder nodesValue = new StringBuilder();
for (ClusterNodeInfo clusterNodeInfo : nodes) {
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
log.info("Redis cluster nodes configuration got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);

lastClusterNode = addr;

// 读取每个节点的分区配置

Collection<ClusterPartition> partitions = parsePartitions(nodes);

...

} catch (Exception e) {
lastException = e;
log.warn(e.getMessage());
}
}

...

// 每秒定时刷新本地缓存的cluster状态,包括每个Master节点负责的slot范围
scheduleClusterChangeCheck(cfg, null);
}

/**
* 获取key所处的节点
*/
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
MasterSlaveEntry entry = connectionManager.getEntry(slot);
return new NodeSource(entry);
}

/**
* 获取key所处的节点
*/
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
// 建立连接、发送命令
final RFuture<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, command);
}

...

connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (connFuture.isCancelled()) {
return;
}
// 如果执行不成功,则设置异常信息
if (!connFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
details.setException(convertException(connectionFuture));
return;
}
// 如果执行OK,释放连接
if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {
releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
return;
}

final RedisConnection connection = connFuture.getNow();
// 响应ASK的情况
if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = connectionManager.newPromise();
ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future);
}
// 响应MOVED的情况
else {
if (log.isDebugEnabled()) {
log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",
details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
}
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}

details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(details, connection);
}
});

releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
}
});
}

下面是手动调用cluster nodes可以得到的响应,从中可以看到每个master所负责的slot范围:

1
2
3
4
5
6
7
8
9
10
hgc@hgc-X555LD:~$ redis-cli -h 10.32.64.12 -p 16371 -c
10.32.64.12:16371> auth 123456
OK
10.32.64.12:16371> cluster nodes
d0af93527054ae3713c6ae82f4f58e016c4968d7 10.32.64.161:16371@26371 slave dbf60db7dc4c2d8ea944481d162bf6be7ef48f5a 0 1604569999114 28 connected
20def2dff31ba78c04f72431a51054def2120638 10.32.64.161:16372@26372 master - 0 1604569998112 31 connected 0-5460
b0c86436cd4cf6d2240faf01b45735616b82cae8 10.32.64.12:16371@26371 myself,slave 20def2dff31ba78c04f72431a51054def2120638 0 1604569995000 30 connected
dbf60db7dc4c2d8ea944481d162bf6be7ef48f5a 10.32.64.162:16372@26372 master - 0 1604569996000 28 connected 5461-10922
005c670071b5dab4ef085613f0ca6666fc5bcbce 10.32.64.12:16373@26373 slave df7f690feee2ad536f2573b55190e0f8d576779e 0 1604569998000 25 connected
df7f690feee2ad536f2573b55190e0f8d576779e 10.32.64.162:16373@26373 master - 0 1604569997000 25 connected 10923-16383

如果Cluster发生了扩容缩容或failover导致客户端缓存的信息过期,客户端只需要MOVED时重新更新本地缓存即可。
但是这里有一个问题,如果扩容缩容时正在发生槽迁移,这时正在迁移中的槽在哪个节点是不确定的,可能会导致客户端本地缓存的频繁更新。因此,Redis迁移过程中,会对正在迁移的槽打标记(server.cluster->migrating_slots_to),如果客户端访问的key命中了正在迁移中的槽,则服务器会返回ASK而不是MOVED,客户端接收到ASK后不会重新更新本地的槽缓存。
代码:redis.c/processCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/* If cluster is enabled perform the cluster redirection here.
*
* 如果开启了集群模式,那么在这里进行转向操作。
*
* However we don't perform the redirection if:
*
* 不过,如果有以下情况出现,那么节点不进行转向:
*
* 1) The sender of this command is our master.
* 命令的发送者是本节点的主节点
*
* 2) The command has no key arguments.
* 命令没有 key 参数
*/
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;

// 集群已下线
if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTransaction(c);
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
return REDIS_OK;

// 集群运作正常
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
// 不能执行多键处理命令
if (n == NULL) {
flagTransaction(c);
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spawns mutliple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
return REDIS_OK;

// 命令针对的槽和键不是本节点处理的,进行转向
} else if (n != server.cluster->myself) {
flagTransaction(c);
// -<ASK or MOVED> <slot> <ip>:<port>
// 例如 -ASK 10086 127.0.0.1:12345
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));

return REDIS_OK;
}

// 如果执行到这里,说明键 key 所在的槽由本节点处理
// 或者客户端执行的是无参数命令
}
}

分区的缺点

有些特性在分区的情况下会受到限制:

  • 涉及多个key的操作通常不会被支持。例如你不能对两个集合求交集,因为他们可能被存储到不同的Redis实例(实际上这种情况也有办法,但是不能直接使用交集指令)。
    同时操作多个key,则不能使用Redis事务.
  • 分区使用的粒度是key,不能使用一个非常长的排序key存储一个数据集(The partitioning granularity is the key, so it is not possible to shard a dataset with a single huge key like a very big sorted set).
  • 当使用分区的时候,数据处理会非常复杂,例如为了备份你必须从不同的Redis实例和主机同时收集RDB / AOF文件。
  • 分区时动态扩容或缩容可能非常复杂。Redis集群在运行时增加或者删除Redis节点,能做到最大程度对用户透明地数据再平衡,但其他一些客户端分区或者代理分区方法则不支持这种特性。然而,有一种预分片的技术也可以较好的解决这个问题。

当要把Redis当作持久化存储时,需要注意分区的性质

  • 如果Redis被当做缓存使用,使用一致性哈希实现动态扩容缩容。
  • 如果Redis被当做一个持久化存储使用,必须使用固定的keys-to-nodes映射关系,节点的数量一旦确定不能变化。否则的话(即Redis节点需要动态变化的情况),必须使用可以在运行时进行数据再平衡的一套系统,现在Redis Cluster已经支持这种再平衡。

节点通信

Redis Cluster采用Gossip协议完成集群状态数据及路由数据等元数据的管理。
一种简单的集群内状态同步思路是:每次节点都将自己本地的集群状态数据广播到集群内所有N个节点,其他节点判断接收到的数据比本地的新则更新本地数据。但是这种方式的缺点是通信量剧增,网络带宽变得紧张。
因此Redis采用Gossip协议来进行集群内元数据的同步,而且:
1、每次只随机选择K(K << N)个其他节点来同步状态;
集群内每个节点维护定时任务默认每秒执行10次,每秒会随机选取5个节点找出最久没有通信的节点发送ping消息,用于保证Gossip信息交换的随机性。每100毫秒都会扫描本地节点列表,如果发现节点最近一次接受pong消息的时间大于cluster_node_timeout/2,则立刻发送ping消息,防止该节点信息太长时间未更新。根据以上规则得出每个节点每秒需要发送ping消息的数量=1+10*num(node.pong_received>cluster_node_timeout/2),因此cluster_node_timeout参数对消息发送的节点数量影响非常大。当我们的带宽资源紧张时,可以适当调大这个参数,如从默认15秒改为30秒来降低带宽占用率。过度调大cluster_node_timeout会影响消息交换的频率从而影响故障转移、槽信息更新、新节点发现的速度。因此需要根据业务容忍度和资源消耗进行平衡。同时整个集群消息总交换量也跟节点数成正比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
// 迭代计数器,一个静态变量
static unsigned long long iteration = 0;
mstime_t handshake_timeout;

// 记录一次迭代
iteration++; /* Number of times this function was called so far. */

/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
// 如果一个 handshake 节点没有在 handshake timeout 内
// 转换成普通节点(normal node),
// 那么节点会从 nodes 表中移除这个 handshake 节点
// 一般来说 handshake timeout 的值总是等于 NODE_TIMEOUT
// 不过如果 NODE_TIMEOUT 太少的话,程序会将值设为 1 秒钟
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

/* Check if we have disconnected nodes and re-establish the connection. */
// 向集群中的所有断线或者未连接节点发送消息
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 跳过当前节点以及没有地址的节点
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
// 如果 handshake 节点已超时,释放它
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
freeClusterNode(node);
continue;
}

// 为未创建连接的节点创建连接
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;

fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr_count ? server.bindaddr[0] : NULL);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
*
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
// 向新连接的节点发送 PING 命令,防止节点被识进入下线
// 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

// 这不是第一次发送 PING 信息,所以可以还原这个时间
// 等 clusterSendPing() 函数来更新它
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}

/* We can clear the flag after the first packet is sent.
*
* 在发送 MEET 信息之后,清除节点的 MEET 标识。
*
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets.
*
* 如果当前节点(发送者)没能收到 MEET 信息的回复,
* 那么它将不再向目标节点发送命令。
*
* 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态,
* 并继续向目标节点发送普通 PING 命令。
*/
node->flags &= ~REDIS_NODE_MEET;

redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);

/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
// clusterCron() 每执行 10 次(至少间隔一秒钟),就向一个随机节点发送 gossip 信息
if (!(iteration % 10)) {
int j;

/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
// 随机 5 个节点,选出其中一个
for (j = 0; j < 5; j++) {

// 随机在集群中挑选节点
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);

/* Don't ping nodes disconnected or with a ping currently active. */
// 不要 PING 连接断开的节点,也不要 PING 最近已经 PING 过的节点
if (this->link == NULL || this->ping_sent != 0) continue;

if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
continue;

// 选出 5 个随机节点中最近一次接收 PONG 回复距离现在最旧的节点
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}

// 向最久没有收到 PONG 回复的节点发送 PING 命令
if (min_pong_node) {
redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}

// 遍历所有节点,检查是否需要将某个节点标记为下线
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;

// 跳过节点本身、无地址节点、HANDSHAKE 状态的节点
if (node->flags &
(REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
continue;

/* Orphaned master check, useful only if the current instance
* is a slave that may migrate to another master. */
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);

if (okslaves == 0 && node->numslots > 0) orphaned_masters++;
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}

/* If we are waiting for the PONG more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
// 如果等到 PONG 到达的时间超过了 node timeout 一半的连接
// 因为尽管节点依然正常,但连接可能已经出问题了
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
// 释放连接,下次 clusterCron() 会自动重连
freeClusterLink(node->link);
}

/* If we have currently no active ping in this instance, and the
* received PONG is older than half the cluster timeout, send
* a new ping now, to ensure all the nodes are pinged without
* a too big delay. */
// 如果目前没有在 PING 节点
// 并且已经有 node timeout 一半的时间没有从节点那里收到 PONG 回复
// 那么向节点发送一个 PING ,确保节点的信息不会太旧
// (因为一部分节点可能一直没有被随机中)
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}

/* If we are a master and one of the slaves requested a manual
* failover, ping it continuously. */
// 如果这是一个主节点,并且有一个从服务器请求进行手动故障转移
// 那么向从服务器发送 PING 。
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}

/* Check only if we have an active ping for this instance. */
// 以下代码只在节点发送了 PING 命令的情况下执行
if (node->ping_sent == 0) continue;

/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
// 计算等待 PONG 回复的时长
delay = now - node->ping_sent;

// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线)
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// 打开疑似下线标记
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);

/* If we are a slave node but the replication is still turned off,
* enable it if we know the address of our master and it appears to
* be up. */
// 如果从节点没有在复制主节点,那么对从节点进行设置
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}

/* Abourt a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();

if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
clusterHandleSlaveFailover();
/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}

// 更新集群状态
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
clusterUpdateState();
}

2、状态信息并不是全量同步,而是随机选M(M << N)个节点的状态同步到其他节点。
M值最小为3,最大为N - 2,一般情况下M = N / 10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */
// 向指定节点发送一条 MEET 、 PING 或者 PONG 消息
void clusterSendPing(clusterLink *link, int type) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
int gossipcount = 0, totlen;
/* freshnodes is the number of nodes we can still use to populate the
* gossip section of the ping packet. Basically we start with the nodes
* we have in memory minus two (ourself and the node we are sending the
* message to). Every time we add a node we decrement the counter, so when
* it will drop to <= zero we know there is no more gossip info we can
* send. */
// freshnodes 是用于发送 gossip 信息的计数器
// 每次发送一条信息时,程序将 freshnodes 的值减一
// 当 freshnodes 的数值小于等于 0 时,程序停止发送 gossip 信息
// freshnodes 的数量是节点目前的 nodes 表中的节点数量减去 2
// 这里的 2 指两个节点,一个是 myself 节点(也即是发送信息的这个节点)
// 另一个是接受 gossip 信息的节点
int freshnodes = dictSize(server.cluster->nodes)-2;

// 如果发送的信息是 PING ,那么更新最后一次发送 PING 命令的时间戳
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();

// 将当前节点的信息(比如名字、地址、端口号、负责处理的槽)记录到消息里面
clusterBuildMessageHdr(hdr,type);

/* Populate the gossip fields */
// 从当前节点已知的节点中随机选出两个节点
// 并通过这条消息捎带给目标节点,从而实现 gossip 协议

// 每个节点有 freshnodes 次发送 gossip 信息的机会
// 每次向目标节点发送 2 个被选中节点的 gossip 信息(gossipcount 计数)
while(freshnodes > 0 && gossipcount < 3) {
// 从 nodes 字典中随机选出一个节点(被选中节点)
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);

clusterMsgDataGossip *gossip;
int j;

/* In the gossip section don't include:
* 以下节点不能作为被选中节点:
* 1) Myself.
* 节点本身。
* 2) Nodes in HANDSHAKE state.
* 处于 HANDSHAKE 状态的节点。
* 3) Nodes with the NOADDR flag set.
* 带有 NOADDR 标识的节点
* 4) Disconnected nodes if they don't have configured slots.
* 因为不处理任何槽而被断开连接的节点
*/
if (this == myself ||
this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* otherwise we may loop forever. */
continue;
}

/* Check if we already added this node */
// 检查被选中节点是否已经在 hdr->data.ping.gossip 数组里面
// 如果是的话说明这个节点之前已经被选中了
// 不要再选中它(否则就会出现重复)
for (j = 0; j < gossipcount; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
REDIS_CLUSTER_NAMELEN) == 0) break;
}
if (j != gossipcount) continue;

/* Add it */

// 这个被选中节点有效,计数器减一
freshnodes--;

// 指向 gossip 信息结构
gossip = &(hdr->data.ping.gossip[gossipcount]);

// 将被选中节点的名字记录到 gossip 信息
memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
// 将被选中节点的 PING 命令发送时间戳记录到 gossip 信息
gossip->ping_sent = htonl(this->ping_sent);
// 将被选中节点的 PING 命令回复的时间戳记录到 gossip 信息
gossip->pong_received = htonl(this->pong_received);
// 将被选中节点的 IP 记录到 gossip 信息
memcpy(gossip->ip,this->ip,sizeof(this->ip));
// 将被选中节点的端口号记录到 gossip 信息
gossip->port = htons(this->port);
// 将被选中节点的标识值记录到 gossip 信息
gossip->flags = htons(this->flags);

// 这个被选中节点有效,计数器增一
gossipcount++;
}

// 计算信息长度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
// 将被选中节点的数量(gossip 信息中包含了多少个节点的信息)
// 记录在 count 属性里面
hdr->count = htons(gossipcount);
// 将信息的长度记录到信息里面
hdr->totlen = htonl(totlen);

// 发送信息
clusterSendMessage(link,buf,totlen);
}

扩容 / 缩容

当新的节点加入时,我们该如何重新分配数据,让新的节点也对外提供服务。当有节点退出时,我们该如何把存在该节点上的数据分配到其他机器上,让其他机器来提供这部分数据的服务。即集群的扩缩容问题。

新节点加入流程

新节点加入时,需要把一部分数据迁移到新节点来达到集群的负载均衡。
在Redis集群中,数据的存储是以slot为单位的,因此:

  1. 集群的伸缩本质上就是slot在不同机器节点之间的迁移;
  2. 迁移过程中,有的slot在老节点上,有的slot在新节点上,这时,客户端请求应该被重定向到正确的节点上。
    比如slot1从A迁移到B上时,请求A或B会怎么样?请求别的节点又会怎么样?

节点的迁移过程主要分为3个步骤:

  1. 准备新节点
  2. 加入集群
  3. 迁移slot到新节点

以下迁移过程的伪代码来自:Redis集群详解(中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def move_slot(source,target,slot):
# 目标节点准备导入槽
target.cluster("setslot",slot,"importing",source.nodeId);
# 目标节点准备全出槽
source.cluster("setslot",slot,"migrating",target.nodeId);
while true :
# 批量从源节点获取键
keys = source.cluster("getkeysinslot",slot,pipeline_size);
if keys.length == 0:
# 键列表为空时,退出循环
break;
# 批量迁移键到目标节点
source.call("migrate",target.host,target.port,"",0,timeout,"keys",keys);
# 向集群所有主节点通知槽被分配给目标节点
for node in nodes:
if node.flag == "slave":
continue;
node.cluster("setslot",slot,"node",target.nodeId);

节点迁移过程

以下命令告知目标节点准备导入slot:

1
cluster setslot <slot> IMPORTING <nodeId>

以下命令告知目标节点准备导出slot:

1
cluster setslot <slot> MIGRATING <nodeId>

每个节点保存的集群状态中记录了迁移中的slot,其中,迁出的slot放到migrating_slots_to中,迁入的slot放到importing_slots_from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typedef struct clusterState {
clusterNode *myself; /* This node */
// 当前纪元
uint64_t currentEpoch;
// 集群的状态
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
// 集群中至少负责一个槽的主节点个数
int size; /* Num of master nodes with at least one slot */
// 保存集群节点的字典,键是节点名字,值是clusterNode结构的指针
dict *nodes; /* Hash table of name -> clusterNode structures */
// 防止重复添加节点的黑名单
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
// 导入槽数据到目标节点,该数组记录这些节点
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
// 导出槽数据到目标节点,该数组记录这些节点
clusterNode *importing_slots_from[CLUSTER_SLOTS];
// 槽和负责槽节点的映射
clusterNode *slots[CLUSTER_SLOTS];
// 槽映射到键的有序集合
zskiplist *slots_to_keys;

} clusterState;

接下来,将待迁移slot中的key批量转移到目标节点:

1
2
3
4
# 返回count个slot中的键
cluster getkeysinslot <slot> <count>
# 需要对上面命令返回的每个键都发送以下命令,该命令会将所指定的键原子地从源节点移动到目标节点
migrate <host> <port> key destination-db timeout

migrate命令就是向节点发送了N个RESTORE-ASKING命令,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);
//检查键是不是已经过期
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}
kv[non_expired++] = kv[j];

// 集群模式下写入RESTORE-ASKING命令,普通模式下写入RESTORE命令
if (server.cluster_enabled)
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
// 写入KEY,写入TTL
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
// 写入VALUE以及Redis版本校验码等信息
createDumpPayload(&payload,ov[j],kv[j]);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));

}

迁入节点接收到restore-asking命令后,执行节点的恢复操作,即获取key,解析出value,然后写入数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* RESTORE key ttl serialized-value [REPLACE] */
// 根据给定的 DUMP 数据,还原出一个键值对数据,并将它保存到数据库里面
void restoreCommand(redisClient *c) {
long long ttl;
rio payload;
int j, type, replace = 0;
robj *obj;

...

// 读取 DUMP 数据,并反序列化出键值对的类型和值
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}

/* Remove the old key if needed. */
// 如果给定了 REPLACE 选项,那么先删除数据库中已存在的同名键
if (replace) dbDelete(c->db,c->argv[1]);

/* Create the key and set the TTL if any */
// 将键值对添加到数据库
dbAdd(c->db,c->argv[1],obj);

// 如果键带有 TTL 的话,设置键的 TTL
if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);

signalModifiedKey(c->db,c->argv[1]);

addReply(c,shared.ok);
server.dirty++;
}

迁移过程中,在外部客户端的视角看来,在任意时间点上,key只会存在于某个节点上,而不会同时存在于两个节点上。

现在,待迁移槽中的key都已经被迁移了,但是对其他节点来说,该slot仍是由迁出节点负责的,它们接收到相关请求后仍然会路由到迁出节点,所以迁移的最后一步需要向集群中的所有主节点通知槽已经被分配给目标节点。

1
cluster setslot <slot> node <nodeId>

迁移过程中对新请求的响应

迁移过程中:

  • 如果迁出节点接收请求,迁出节点判断slot或key是否已迁出,若是则ASK重定向到迁入节点上,否则迁出节点自己负责处理请求;
  • 如果迁入节点接收请求,会把请求重定向到迁出节点上,除非请求中包含ASKING命令;
  • 其他节点接收到的相关请求会被重定向到迁出节点上;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    int processCommand(redisClient *c) {

    ...

    /* If cluster is enabled perform the cluster redirection here.
    *
    * 如果开启了集群模式,那么在这里进行转向操作。
    *
    * However we don't perform the redirection if:
    *
    * 不过,如果有以下情况出现,那么节点不进行转向:
    *
    * 1) The sender of this command is our master.
    * 命令的发送者是本节点的主节点
    *
    * 2) The command has no key arguments.
    * 命令没有 key 参数
    */
    if (server.cluster_enabled &&
    !(c->flags & REDIS_MASTER) &&
    !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
    int hashslot;

    // 集群已下线
    if (server.cluster->state != REDIS_CLUSTER_OK) {
    flagTransaction(c);
    addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
    return REDIS_OK;

    // 集群运作正常
    } else {
    int error_code;
    clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
    // 不能执行多键处理命令
    if (n == NULL) {
    flagTransaction(c);
    if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
    } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
    /* The request spawns mutliple keys in the same slot,
    * but the slot is not "stable" currently as there is
    * a migration or import in progress. */
    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
    } else {
    redisPanic("getNodeByQuery() unknown error.");
    }
    return REDIS_OK;

    // 命令针对的槽和键不是本节点处理的,进行转向
    } else if (n != server.cluster->myself) {
    flagTransaction(c);
    // -<ASK or MOVED> <slot> <ip>:<port>
    // 例如 -ASK 10086 127.0.0.1:12345
    addReplySds(c,sdscatprintf(sdsempty(),
    "-%s %d %s:%d\r\n",
    (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
    hashslot,n->ip,n->port));

    return REDIS_OK;
    }

    // 如果执行到这里,说明键 key 所在的槽由本节点处理
    // 或者客户端执行的是无参数命令
    }
    }

    ...
    }
    注意上边的getNodeByQuery根据key的散列结果查询命令应该被打到的节点,可以看到这个函数里有对ASKING标识的特殊处理:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {

    ...

    /* If we are receiving the slot, and the client correctly flagged the
    * request as "ASKING", we can serve the request. However if the request
    * involves multiple keys and we don't have them all, the only option is
    * to send a TRYAGAIN error. */
    if (importing_slot &&
    (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
    if (multiple_keys && missing_keys) {
    if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
    return NULL;
    } else {
    return myself;
    }
    }

    ...
    }

旧节点退出流程

与新节点的加入相反的是,旧节点退出时需要把其上的数据迁移到其他节点上,确保该节点上的数据能够被正常访问。
槽的迁移过程和上边扩容中描述的没有区别,主要区别是在迁移完毕后需要轮询每个节点发送cluster forget命令,让它们能忘记下线的节点。
节点在接收cluster forget命令后,会将目标节点的状态从自己保存的集群状态中移除,并将其加入黑名单中60s,这期间其他节点不会再去更新自己维护的该节点的信息,也就是说这60秒内该节点无法重新加入集群内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def delnode_cluster_cmd(downNode):
# 下线节点不允许包含slots
if downNode.slots.length != 0
exit 1
end
# 向集群内节点发送cluster forget
for n in nodes:
if n.id == downNode.id:
# 不能对自己做forget操作
continue;
# 如果下线节点有从节点则把从节点指向其他主节点
if n.replicate && n.replicate.nodeId == downNode.id :
# 指向拥有最少从节点的主节点
master = get_master_with_least_replicas();
n.cluster("replicate",master.nodeId);
#发送忘记节点命令
n.cluster('forget',downNode.id)
# 节点关闭
downNode.shutdown();

集群规模估算

集群规模并不是没有限制的,理论上每个节点一个slot集群可以扩容到16384个节点,但是Redis官方给出的规模上限是一个集群1000个节点,因为实例间的通信开销会随着实例规模增加而增大
下面来讨论下集群内部有哪些交互,并分析它们会对性能有什么样的影响。

实例间数据的同步

集群每个节点都会记录slot和实例间的映射关系,用于请求的重定向。
每个实例都需要通过Gossip协议将数据同步到其他节点,大致流程为:

  1. 每个实例之间会按照一定的频率,从集群中随机挑选一些实例,把 PING 消息发送给挑选出来的实例,用来检测这些实例是否在线,并交换彼此的状态信息。PING 消息中封装了发送消息的实例自身的状态信息、部分其它实例的状态信息,以及 Slot 映射表。
    发送的节点状态信息在源码中由clusterMsgDataGossip这个结构来表示,大小为104字节。每个实例在发送Gossip消息时,除了传递自身的状态信息,默认还会传递集群十分之一实例的状态信息,比如,对于一个包含了 1000 个实例的集群来说,每个实例发送一个 PING 消息时,会包含 100 个实例的状态信息,总的数据量是 10400 字节,再加上发送实例自身的信息,一个 Gossip 消息大约是 10KB。
    另外,Slot映射表是一个16384位的bitmap,算上上面的10KB就是12KB的内容。
  2. 一个实例在接收到 PING 消息后,会给发送 PING 消息的实例,发送一个 PONG 消息。PONG 消息包含的内容和 PING 消息一样,也是12KB。
  3. 另外,上面是随机选节点发PING请求的,如果部分节点一直没有被选到,就会导致这些节点和其他节点不同步。
    为了避免这种情况,Redis Cluster 的实例会按照每 100ms 一次的频率,扫描本地的实例列表,如果发现有实例最近一次接收 PONG 消息的时间,已经大于配置项 cluster-node-timeout 的一半了(cluster-node-timeout/2),就会立刻给该实例发送 PING 消息,更新这个实例上的集群状态信息。
    当集群规模扩大之后,因为网络拥塞或是不同服务器间的流量竞争,会导致实例间的网络通信延迟增加。如果有部分实例无法收到其它实例发送的 PONG 消息,就会引起实例之间频繁地发送 PING 消息,这又会对集群网络通信带来额外的开销了。

从上可知,实例间的数据同步受到通信消息大小通信频率这两方面的影响。
当集群规模扩大后,PING/PONG会占用大量的集群内网络带宽,降低集群服务正常请求的吞吐量。
单实例每秒会发送的PING消息数量大致可以算出是(注意cron是100ms执行一次):

1
PING 消息发送数量 = 1 + 10 * 实例数(最近一次接收 PONG 消息的时间超出 cluster-node-timeout/2)

其中,1 是指单实例常规按照每 1 秒发送一个 PING 消息,10 是指每 1 秒内实例会执行 10 次检查,每次检查后会给 PONG 消息超时的实例发送消息。
假设单个实例检测发现,每 100 毫秒有 10 个实例的 PONG 消息接收超时,那么,这个实例每秒就会发送 101 个 PING 消息,约占 1.2MB/s 带宽。如果集群中有 30 个实例按照这种频率发送消息,就会占用 36MB/s 带宽,这就会挤占集群中用于服务正常请求的带宽。

因此实例间的通信开销优化主要是:

  1. 减少实例传输的消息大小(PING/PONG 消息、Slot 分配信息)
    但是,因为集群实例依赖 PING、PONG 消息和 Slot 分配信息,来维持集群状态的统一,一旦减小了传递的消息大小,就会导致实例间的通信信息减少,不利于集群维护,所以,我们不能采用这种方式。
  2. 降低实例间发送消息的频率
    从上面PING消息发送数量公式可以看出,每秒发送一条PING消息的频率不算高,如果要降低可能导致集群内数据同步延迟;每100ms做一次检测并给延迟超过cluster-node-timeout/2的节点发送PING消息,这个配置是可以适当调大的。
    • 如果配置得比较小,则在大规模集群中会频繁出现PONG超时的情况;
    • 如果配置得过大,则如果真得发生了故障,我们反而需要等比较长的时间才能检测出来。
      可以在调整前后使用tcpdump抓取实例发送心跳网络包的情况。
      tcpdump host 192.168.10.3 port 16379 -i 网卡名 -w /tmp/r1.cap

故障恢复(容错)

Redis故障恢复主要分为以下3个步骤:

  1. 故障发现
    采用多数派协议完成故障检测判断(即至少有半数以上节点认为某主节点故障后才真正判断节点故障)。
  2. 子节点选举
    Redis Cluster中每个Master都会有1至多个Slave,通过复制实现高可用(故障转移),当Master有多个Slave,会采用Raft实现选举出一个主节点以实现故障恢复。
  3. 配置更新
    故障转移后,那么之前的Master和其他Slave怎么处理?Redis会将这些节点成为新Master节点的子节点。

故障发现

一些 CP 特性且中心化的集群来说,当出现节点宕机时经常需要选举新的 Leader 节点,但是 Redis-Cluster 是去中心化的,某个 Master 的宕机并不会影响其他节点的工作。但是,当节点失联时,需要考虑网络的抖动情况,毕竟不能因为某几个请求意外超时就推断集群失败了,部分节点判断一个节点失联只会标记这个节点状态为PFAIL(主观下线),之后如果多数节点投票通过才会真正标记这个节点FAIL(下线)
投票过程是集群中所有 master 参与的,每个节点都存有整个集群所有主节点及从节点的信息,它们之间通过互相 ping-pong 来判断节点是否可以连上,如果半数以上 master 节点与当前 master 节点通信超时(cluster-node-timeout),则认为当前 master 节点挂掉,标记这个节点状态为FAIL

当 master 挂掉时,并不意味着集群已无法再提供服务了,集群要进入fail(不可用)状态需要满足以下条件之一:

  1. 集群的任意 master 挂掉,且该 master 没有 slave 或 slave 全挂掉了,则集群进入 fail 状态。
    这是因为,Cluster中所有slot是平均分配到每个Master的,如果有一个Master的slot不能用了、而且这个Master还没有Slave,那么集群就不能提供服务了,如果Master还有Slave,Slave可以代替Master继续向外提供服务,这个步骤称为slave promotion
    单独的一对Master-Slave挂掉,Redis还提供一个叫 Replica Migration 的解决方案:当集群中的某个Master节点没有Slave节点时(称之为 Orphaned Master),其他有富余Slave节点的主节点会向该节点迁移一个Slave节点以防该节点下线之后没有子节点来替换从而导致整个集群下线。
  2. 集群超过半数以上 master 挂掉,无论有无 slave 都进入 fail 状态。

当集群不可用时,任何操作都将返回((error) CLUSTERDOWN The cluster is down)错误。需要注意的是,必须要 3 个或以上的主节点,否则在创建集群时会失败。

PFAIL

1、Redis每个节点会不断向其他节点发送PING消息来检测其他节点是否可达,如果超时会先断开连接:
代码:cluster.c/clusterCron
RedisCluster故障发现1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {
...

while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;

...

/* If we are waiting for the PONG more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
// 判断连接的节点是否出事
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
// ping_sent记录发送命令的时间
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
// PONG 到达的时间超过了 node_timeout 的一半
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
// 释放连接,此时node->link=NULL,下次 clusterCron() 会自动重连
freeClusterLink(node->link);
}

...
}

...
}

2、此时节点A PING目标节点B失败,A会尝试重连,并将重连时间记录到ping_sent变量中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {

...

/* Check if we have disconnected nodes and re-establish the connection. */
// 向集群中的所有断线或者未连接节点发送消息
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 跳过当前节点以及没有地址的节点
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

...

// 为未创建连接的节点创建连接
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;

fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr_count ? server.bindaddr[0] : NULL);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
*
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
// 向新连接的节点发送 PING 命令,防止节点被识进入下线
// 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

// 这不是第一次发送 PING 信息,所以可以还原这个时间
// 等 clusterSendPing() 函数来更新它
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}

/* We can clear the flag after the first packet is sent.
*
* 在发送 MEET 信息之后,清除节点的 MEET 标识。
*
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets.
*
* 如果当前节点(发送者)没能收到 MEET 信息的回复,
* 那么它将不再向目标节点发送命令。
*
* 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态,
* 并继续向目标节点发送普通 PING 命令。
*/
node->flags &= ~REDIS_NODE_MEET;

redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}

...

}

3、节点A发现PING B的延时时间超过了node_timeout之后,就会标记该节点为PFAIL(Possible FAILure),即主观下线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void clusterCron(void) {

...

// 遍历所有节点,检查是否需要将某个节点标记为下线
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {

...

/* Check only if we have an active ping for this instance. */
// 以下代码只在节点发送了 PING 命令的情况下执行
if (node->ping_sent == 0) continue;

/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
// 计算等待 PONG 回复的时长
delay = now - node->ping_sent;

// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线)
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// 打开疑似下线标记
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
}

...
}

FAIL

RedisCluster故障发现2
1、A将B标记为PFAIL后,A会通过Gossip通知到其他节点。

2、所有节点会维护一个下线报告列表(Fail Report),主要维护一个节点被哪些节点报告处于下线状态,此时,C会记录“B被A报告下线了”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
int clusterProcessPacket(clusterLink *link) {

...

/* Process packets by type. */
// 根据消息的类型,处理节点

// 这是一条 PING 消息或者 MEET 消息
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);

/* Add this node if it is new for us and the msg type is MEET.
*
* 如果当前节点是第一次遇见这个节点,并且对方发来的是 MEET 信息,
* 那么将这个节点添加到集群的节点列表里面。
*
* In this stage we don't try to add the node with the right
* flags, slaveof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node.
*
* 节点目前的 flag 、 slaveof 等属性的值都是未设置的,
* 等当前节点向对方发送 PING 命令之后,
* 这些信息可以从对方回复的 PONG 信息中取得。
*/
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

// 创建 HANDSHAKE 状态的新节点
node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);

// 设置 IP 和端口
nodeIp2String(node->ip,link);
node->port = ntohs(hdr->port);

// 将新节点添加到集群
clusterAddNode(node);

clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

/* Get info from the gossip section */
// 分析并取出消息中的 gossip 节点信息
clusterProcessGossipSection(hdr,link);

/* Anyway reply with a PONG */
// 向目标节点返回一个 PONG
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}

...
}

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {

// 记录这条消息中包含了多少个节点的信息
uint16_t count = ntohs(hdr->count);

// 指向第一个节点的信息
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;

// 取出发送者
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

// 遍历所有节点的信息
while(count--) {
sds ci = sdsempty();

// 分析节点的 flag
uint16_t flags = ntohs(g->flags);

// 信息节点
clusterNode *node;

...

/* Update our state accordingly to the gossip sections */
// 使用消息中的信息对节点进行更新
node = clusterLookupNode(g->nodename);
// 节点已经存在于当前节点
if (node) {
/* We already know this node.
Handle failure reports, only when the sender is a master. */
// 如果 sender 是一个主节点,那么我们需要处理下线报告
if (sender && nodeIsMaster(sender) && node != myself) {
// 节点处于 FAIL 或者 PFAIL 状态
if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {

// 添加 sender 对 node 的下线报告
if (clusterNodeAddFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}

// 尝试将 node 标记为 FAIL
markNodeAsFailingIfNeeded(node);

// 节点处于正常状态
} else {

// 如果 sender 曾经发送过对 node 的下线报告
// 那么清除该报告
if (clusterNodeDelFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}

/* If we already know this node, but it is not reachable, and
* we see a different address in the gossip section, start an
* handshake with the (possibly) new address: this will result
* into a node address update if the handshake will be
* successful. */
// 如果节点之前处于 PFAIL 或者 FAIL 状态
// 并且该节点的 IP 或者端口号已经发生变化
// 那么可能是节点换了新地址,尝试对它进行握手
if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
(strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}

// 当前节点不认识 node
} else {
/* If it's not in NOADDR state and we don't have it, we
* start a handshake process against this IP/PORT pairs.
*
* 如果 node 不在 NOADDR 状态,并且当前节点不认识 node
* 那么向 node 发送 HANDSHAKE 消息。
*
* Note that we require that the sender of this gossip message
* is a well known node in our cluster, otherwise we risk
* joining another cluster.
*
* 注意,当前节点必须保证 sender 是本集群的节点,
* 否则我们将有加入了另一个集群的风险。
*/
if (sender &&
!(flags & REDIS_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}
}

/* Next node */
// 处理下个节点的信息
g++;
}
}

3、C添加下线报告之后,会进行B节点的客观下线状态(FAIL)判定。
当集群中有超过半数的节点都认为节点B处于PFAIL后才会判断B为FAIL,且需要注意的是,A将PFAIL通知给C后,C自己本身也得认为B处于PFAIL状态才会开始客观下线判定。
当C认为B正式FAIL后,它就会立刻向集群所有节点广播这个消息。
RedisCluster故障发现3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/* This function checks if a given node should be marked as FAIL.
* It happens if the following conditions are met:
*
* 此函数用于判断是否需要将 node 标记为 FAIL 。
*
* 将 node 标记为 FAIL 需要满足以下两个条件:
*
* 1) We received enough failure reports from other master nodes via gossip.
* Enough means that the majority of the masters signaled the node is
* down recently.
* 有半数以上的主节点将 node 标记为 PFAIL 状态。
* 2) We believe this node is in PFAIL state.
* 当前节点也将 node 标记为 PFAIL 状态。
*
* If a failure is detected we also inform the whole cluster about this
* event trying to force every other node to set the FAIL flag for the node.
*
* 如果确认 node 已经进入了 FAIL 状态,
* 那么节点还会向其他节点发送 FAIL 消息,让其他节点也将 node 标记为 FAIL 。
*
* Note that the form of agreement used here is weak, as we collect the majority
* of masters state during some time, and even if we force agreement by
* propagating the FAIL message, because of partitions we may not reach every
* node. However:
*
* 注意,集群判断一个 node 进入 FAIL 所需的条件是弱(weak)的,
* 因为节点们对 node 的状态报告并不是实时的,而是有一段时间间隔
* (这段时间内 node 的状态可能已经发生了改变),
* 并且尽管当前节点会向其他节点发送 FAIL 消息,
* 但因为网络分裂(network partition)的问题,
* 有一部分节点可能还是会不知道将 node 标记为 FAIL 。
*
* 不过:
*
* 1) Either we reach the majority and eventually the FAIL state will propagate
* to all the cluster.
* 只要我们成功将 node 标记为 FAIL ,
* 那么这个 FAIL 状态最终(eventually)总会传播至整个集群的所有节点。
* 2) Or there is no majority so no slave promotion will be authorized and the
* FAIL flag will be cleared after some time.
* 又或者,因为没有半数的节点支持,当前节点不能将 node 标记为 FAIL ,
* 所以对 FAIL 节点的故障转移将无法进行, FAIL 标识可能会在之后被移除。
*
*/
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;

// 标记为 FAIL 所需的节点数量,需要超过集群节点数量的一半
int needed_quorum = (server.cluster->size / 2) + 1;

if (!nodeTimedOut(node)) return; /* We can reach it. */
if (nodeFailed(node)) return; /* Already FAILing. */

// 统计将 node 标记为 PFAIL 或者 FAIL 的节点数量(不包括当前节点)
failures = clusterNodeFailureReportsCount(node);

/* Also count myself as a voter if I'm a master. */
// 如果当前节点是主节点,那么将当前节点也算在 failures 之内
if (nodeIsMaster(myself)) failures++;
// 报告下线节点的数量不足节点总数的一半,不能将节点判断为 FAIL ,返回
if (failures < needed_quorum) return; /* No weak agreement from masters. */

redisLog(REDIS_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);

/* Mark the node as failing. */
// 将 node 标记为 FAIL
node->flags &= ~REDIS_NODE_PFAIL;
node->flags |= REDIS_NODE_FAIL;
node->fail_time = mstime();

/* Broadcast the failing node name to everybody, forcing all the other
* reachable nodes to flag the node as FAIL. */
// 如果当前节点是主节点的话,那么向其他节点发送报告 node 的 FAIL 信息
// 让其他节点也将 node 标记为 FAIL
if (nodeIsMaster(myself)) clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

4、当C标记了B为FAIL状态,则它会广播到整个集群中的所有节点(包括子节点),其他节点都会更新自己维护的节点B的状态信息为FAIL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/* Send a FAIL message to all the nodes we are able to contact.
*
* 向当前节点已知的所有节点发送 FAIL 信息。
*/
void clusterSendFail(char *nodename) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg *) buf;

// 创建下线消息
clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_FAIL);

// 记录命令
memcpy(hdr->data.fail.about.nodename, nodename, REDIS_CLUSTER_NAMELEN);

// 广播消息
clusterBroadcastMessage(buf, ntohl(hdr->totlen));
}

/* Send a message to all the nodes that are part of the cluster having
* a connected link.
*
* 向节点连接的所有其他节点发送信息。
*/
void clusterBroadcastMessage(void *buf, size_t len) {
dictIterator *di;
dictEntry *de;

// 遍历所有已知节点
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 不向未连接节点发送信息
if (!node->link) continue;

// 不向节点自身或者 HANDSHAKE 状态的节点发送信息
if (node->flags & (REDIS_NODE_MYSELF | REDIS_NODE_HANDSHAKE))
continue;

// 发送信息
clusterSendMessage(node->link, buf, len);
}
dictReleaseIterator(di);
}

子节点选举(故障迁移)

1、当B的两个子节点接收到B的FAIL状态消息时,它们会更新自己本地内存中的集群状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int clusterProcessPacket(clusterLink *link) {

...

// 这是一条 FAIL 消息: sender 告知当前节点,某个节点已经进入 FAIL 状态。
else if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;

if (sender) {

// 获取下线节点的消息
failing = clusterLookupNode(hdr->data.fail.about.nodename);
// 下线的节点既不是当前节点,也没有处于 FAIL 状态
if (failing &&
!(failing->flags & (REDIS_NODE_FAIL | REDIS_NODE_MYSELF))) {
redisLog(REDIS_NOTICE,
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);

// 打开 FAIL 状态
failing->flags |= REDIS_NODE_FAIL;
failing->fail_time = mstime();
// 关闭 PFAIL 状态
failing->flags &= ~REDIS_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
CLUSTER_TODO_UPDATE_STATE);
}
} else {
redisLog(REDIS_NOTICE,
"Ignoring FAIL message from unknonw node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}

}

...

}

2、随后,在clusterCron定时任务中就会开始发起故障迁移,竞选成为新的Master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
void clusterCron(void) {

...

/* Abourt a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();

// 如果当前节点是子节点
if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
// 处理集群子节点的故障迁移
clusterHandleSlaveFailover();

/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}

// 更新集群状态
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
clusterUpdateState();
}

/* This function is called if we are a slave node and our master serving
* a non-zero amount of hash slots is in FAIL state.
*
* 如果当前节点是一个从节点,并且它正在复制的一个负责非零个槽的主节点处于 FAIL 状态,
* 那么执行这个函数。
*
* The gaol of this function is:
*
* 这个函数有三个目标:
*
* 1) To check if we are able to perform a failover, is our data updated?
* 检查是否可以对主节点执行一次故障转移,节点的关于主节点的信息是否准确和最新(updated)?
* 2) Try to get elected by masters.
* 选举一个新的主节点
* 3) Perform the failover informing all the other nodes.
* 执行故障转移,并通知其他节点
*/
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
int needed_quorum = (server.cluster->size / 2) + 1;
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
int j;
mstime_t auth_timeout, auth_retry_time;

server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before waiting again.
*
* Timeout is MIN(NODE_TIMEOUT*2,2000) milliseconds.
* Retry is two times the Timeout.
*/
auth_timeout = server.cluster_node_timeout * 2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout * 2;

/* Pre conditions to run the function, that must be met both in case
* of an automatic or manual failover:
* 1) We are a slave.
* 2) Our master is flagged as FAIL, or this is a manual failover.
* 3) It is serving slots. */
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
myself->slaveof->numslots == 0)
return;

...

}

3、资格检查
Slave节点会不停的与Master节点通信来复制Master节点的数据,如果一个Slave节点长时间不与Master节点通信,那么很可能意味着该Slave节点上的数据已经落后Master节点过多(因为Master节点再不停的更新数据但是Slave节点并没有随之更新)。Redis认为,当一个Slave节点过长时间不与Master节点通信,那么该节点就不具备参与竞选的资格。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void clusterHandleSlaveFailover(void) {

...

/* Set data_age to the number of seconds we are disconnected from
* the master. */
// 将 data_age 设置为从节点与主节点的断开秒数
if (server.repl_state == REDIS_REPL_CONNECTED) {
data_age = (mstime_t) (server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t) (server.unixtime - server.repl_down_since) * 1000;
}

/* Remove the node timeout from the data age as it is fine that we are
* disconnected from our master at least for the time it was down to be
* flagged as FAIL, that's the baseline. */
// node timeout 的时间不计入断线时间之内
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;

/* Check if our data is recent enough. For now we just use a fixed
* constant of ten times the node timeout since the cluster should
* react much faster to a master down.
*
* Check bypassed for manual failovers. */
// 检查这个从节点的数据是否较新:
// 目前的检测办法是断线时间不能超过 node timeout 的十倍
if (data_age >
((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT))
{
if (!manual_failover) return;
}

...

}

4、休眠时间计算
B的所有子节点(B1、B2)在判断自己具备选举资格时,就开始执行竞选,竞选协议是Raft,选举过程中,所有参与选举的节点首先随机休眠一段时间。
整个休眠时间由两个部分组成:

1
DELAY = 500 milliseconds + random delay between 0 and 500 milliseconds + SLAVE_RANK * 1000 milliseconds.
  • 一部分为固定的500ms时间,这500ms主要是为了等待集群状态同步。上面提到节点C会向集群所有节点广播消息,那么这500ms就是等待确保集群的所有节点都收到了消息并更新了状态。
  • 另一部分主要是一个随机的时间加上由该Slave节点的排名决定的附加时间。每个slave都会记录自己从主节点同步数据的复制偏移量。复制偏移量越大,说明该节点与主节点数据保持的越一致。那么显然我们选举的时候肯定是想选状态更新最近的子节点,所以我们按照更新状态的排序来确定休眠时间的附加部分。状态更新最近的节点SLAVE_RANK排名为1,那么其休眠的时间相应的也最短,也就意味着该节点最有可能获得大部分选票。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    void clusterHandleSlaveFailover(void) {

    ...

    /* If the previous failover attempt timedout and the retry time has
    * elapsed, we can setup a new one. */
    if (auth_age > auth_retry_time) {
    server.cluster->failover_auth_time = mstime() +
    500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
    random() % 500; /* Random delay between 0 and 500 milliseconds. */
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_sent = 0;
    server.cluster->failover_auth_rank = clusterGetSlaveRank();
    /* We add another delay that is proportional to the slave rank.
    * Specifically 1 second * rank. This way slaves that have a probably
    * less updated replication offset, are penalized. */
    server.cluster->failover_auth_time +=
    server.cluster->failover_auth_rank * 1000;
    /* However if this is a manual failover, no delay is needed. */
    if (server.cluster->mf_end) {
    server.cluster->failover_auth_time = mstime();
    server.cluster->failover_auth_rank = 0;
    }
    redisLog(REDIS_WARNING,
    "Start of election delayed for %lld milliseconds "
    "(rank #%d, offset %lld).",
    server.cluster->failover_auth_time - mstime(),
    server.cluster->failover_auth_rank,
    replicationGetSlaveOffset());
    /* Now that we have a scheduled election, broadcast our offset
    * to all the other slaves so that they'll updated their offsets
    * if our offset is better. */
    clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
    return;
    }

    ...

    /* Return ASAP if we can't still start the election. */
    // 如果执行故障转移的时间未到,先返回
    if (mstime() < server.cluster->failover_auth_time) return;

    ...

    }
    5、发起拉票 & 选举投票
    B1唤醒后,会向其他所有节点发送拉票请求,即CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST类型的消息。
    其他主节点接收到拉票请求,且此时它还没有投出自己的票,则会将自己票投给发请求的B1,即回复FAILOVER_AUTH_ACK消息。
    其他子节点没有投票的资格,因此即使接收到CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST类型消息也会直接忽略。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    void clusterHandleSlaveFailover(void) {

    ...

    /* Ask for votes if needed. */
    // 向其他节点发送故障转移请求
    if (server.cluster->failover_auth_sent == 0) {

    // 增加配置纪元
    server.cluster->currentEpoch++;

    // 记录发起故障转移的配置纪元
    server.cluster->failover_auth_epoch = server.cluster->currentEpoch;

    redisLog(REDIS_WARNING, "Starting a failover election for epoch %llu.",
    (unsigned long long) server.cluster->currentEpoch);

    // 向其他所有节点发送信息,看它们是否支持由本节点来对下线主节点进行故障转移
    clusterRequestFailoverAuth();

    // 打开标识,表示已发送信息
    server.cluster->failover_auth_sent = 1;

    // TODO:
    // 在进入下个事件循环之前,执行:
    // 1)保存配置文件
    // 2)更新节点状态
    // 3)同步配置
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
    CLUSTER_TODO_UPDATE_STATE |
    CLUSTER_TODO_FSYNC_CONFIG);
    return; /* Wait for replies. */
    }

    ...

    }
    6、替换节点(failover)
    当子节点接收到来自其他节点的ACK消息时会统计自己获得的票数,当达到集群Master总数的一半以上时,就会开始执行failover,即替换自己的主节点。
    首先标记自己为主节点,然后将原来由节点B负责的slots标记为由自己负责,最后向整个集群广播现在自己是Master同时负责旧Master所有slots的信息。其他节点接收到该信息后会更新自己维护的B1的状态并标记B1为主节点,将节点B负责的slots的负责节点设置为B1节点。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    void clusterHandleSlaveFailover(void) {

    ...

    /* Check if we reached the quorum. */
    // 如果当前节点获得了足够多的投票,那么对下线主节点进行故障转移
    if (server.cluster->failover_auth_count >= needed_quorum) {
    // 旧主节点
    clusterNode *oldmaster = myself->slaveof;

    redisLog(REDIS_WARNING,
    "Failover election won: I'm the new master.");

    /* We have the quorum, perform all the steps to correctly promote
    * this slave to a master.
    *
    * 1) Turn this node into a master.
    * 将当前节点的身份由从节点改为主节点
    */
    clusterSetNodeAsMaster(myself);
    // 让从节点取消复制,成为新的主节点
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    // 接收所有主节点负责处理的槽
    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
    if (clusterNodeGetSlotBit(oldmaster, j)) {
    // 将槽设置为未分配的
    clusterDelSlot(j);
    // 将槽的负责人设置为当前节点
    clusterAddSlot(myself, j);
    }
    }

    /* 3) Update my configEpoch to the epoch of the election. */
    // 更新集群配置纪元
    myself->configEpoch = server.cluster->failover_auth_epoch;

    /* 4) Update state and save config. */
    // 更新节点状态
    clusterUpdateState();
    // 并保存配置文件
    clusterSaveConfigOrDie(1);

    /* 5) Pong all the other nodes so that they can update the state
    * accordingly and detect that we switched to master role. */
    // 向所有节点发送 PONG 信息
    // 让它们可以知道当前节点已经升级为主节点了
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 6) If there was a manual failover in progress, clear the state. */
    // 如果有手动故障转移正在执行,那么清理和它有关的状态
    resetManualFailover();
    }
    }

配置更新

上边我们已经通过故障发现和子节点选举机制用B1这个子节点替换掉了它的Master节点B,那么留下来的节点B和B2应该怎么处理呢?实际上Redis会让它们变成B1的Slave节点。
1、对B2来说,B1升级成Master后会给B2发送消息,让它知道自己已经升级成Master了。

1
2
3
4
5
6
7
8
9
10
11
12
void clusterHandleSlaveFailover(void) {

...

/* 5) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to master role. */
// 向所有节点发送 PONG 信息
// 让它们可以知道当前节点已经升级为主节点了
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

...
}

2、对B来说,B1已经成为了Master,B从故障恢复后再次加入集群时,会成为B1的Slave。

Cluster数据丢失隐患及处理方案

一种数据丢失的场景是主从复制时Master挂掉了,这点我在《Redis 复制》讨论过。
另一种数据丢失的场景存在于Cluster集群中,并且并不是特别容易出现,也就是Cluster发生了脑裂,分区恢复时脑裂期间的数据被覆盖:

  1. 主节点挂掉了,从节点选举出了一个新的主节点,但是此时客户端还在与老主节点通信,将数据写入到老的主节点上;

    这种情况是可能发生的,因为客户端会记忆槽所在的节点,而不是每次请求都通过重定向定位到槽实际所在的节点上。

  2. 之后主从切换成功后,老的主节点会成功新主节点的Slave,并从新的主节点上获取数据,这时该节点上的数据会被清空,从而导致数据丢失。

为什么会发生脑裂?

在《Redis核心技术与实战》中提到了一种会导致脑裂的情况:

  1. Slave会定时地PING Master,发生的错误达到一定时间会被标记为主观下线,当标记主观下线的次数达到规定数量后,标记为客观下线;
  2. 但是Master实际上是“假故障”,即虽然响应Slave的心跳失败了,但是客户端还是可以和Master正常通信的。
    比如宿主机上有一些其他进程将CPU打满了,在打满期间,Slave就会有可能将Master判断为下线,开始选举及主从切换。

为什么脑裂会导致数据丢失?

主从切换后,从库会升级为新主库,这时如果老主库重新上线了,会成为新主库的Slave,执行全量同步,而全量同步执行的最后阶段,需要清空本地的数据,加载新主库发送过来的RDB文件,这期间写入的数据就会丢失了。

如何解决这种脑裂问题?

可以通过两个配置来解决这个脑裂问题:

  • min-slaves-to-write
    这个配置项设置了主库能进行数据同步的最少从库数量
  • min-slaves-to-write
    min-slaves-max-lag:这个配置项设置了主从库间进行数据复制时,从库给主库发送 ACK 消息的最大延迟(以秒为单位)

我们可以把 min-slaves-to-write 和 min-slaves-max-lag 这两个配置项搭配起来使用,分别给它们设置一定的阈值,假设为 N 和 T。这两个配置项组合后的要求是,主库连接的从库中至少有 N 个从库,和主库进行数据复制时的 ACK 消息延迟不能超过 T 秒,否则,主库就不会再接收客户端的请求了。即使原主库是假故障,它在假故障期间也无法响应哨兵心跳,也不能和从库进行同步,自然也就无法和从库进行 ACK 确认了。这样一来,min-slaves-to-write 和 min-slaves-max-lag 的组合要求就无法得到满足,原主库就会被限制接收客户端请求,客户端也就不能在原主库中写入新数据了。等到新主库上线时,就只有新主库能接收和处理客户端请求,此时,新写的数据会被直接写到新主库中。而原主库会被哨兵降为从库,即使它的数据被清空了,也不会有新数据丢失。
举个例子:假设我们将min-slaves-to-write 设置为 1,把 min-slaves-max-lag 设置为 12s,把哨兵的 down-after-milliseconds 设置为 10s,主库因为某些原因卡住了 15s,导致哨兵判断主库客观下线,开始进行主从切换。同时,因为原主库卡住了 15s,没有一个从库能和原主库在 12s 内进行数据复制,原主库也无法接收客户端请求了。这样一来,主从切换完成后,也只有新主库能接收请求,不会发生脑裂,也就不会发生数据丢失的问题了。

数据倾斜

Cluster集群通过CRC16算法将key hash到节点槽上,这个过程还是存在很多不确定性,可能很多数据会被hash到固定的某几个槽上,造成数据分布的不均匀,或者某些key是热点数据,被访问得尤其频繁。

数据倾斜的危害

数据倾斜的危害主要是保存热点数据的节点处理压力会增大,速度变慢,甚至内存资源耗尽而崩溃。

数据倾斜的成因

数据倾斜的成因主要有3个:

  1. bigkey
    bigkey一般是value值很大的string或保存了大量对象的集合类型。
    bigkey可能会造成实例IO线程阻塞,影响其他请求的执行效率。
    为了处理bigkey,设计的时候最好避免把过多的数据保存在同一个键值对中,如果是集合类型,还可以把bigkey拆分成多个小的集合类型数据,分散保存在不同的实例上。
  2. slot分配不均衡
    如果没有均衡地分配slot,就会有大量的数据被分配到同一个slot中,而同一个slot只会在一个实例上分布,并导致大量数据被集中到同一个实例上。
  3. Hash Tag
    hash tag指针对key的某个部分进行hash,比如user:123,可以加上hash tag后变成user:{123},只针对123进行hash。
    hash tag的意义主要在于可以将同类的数据hash到同一个槽上,便于范围查询。
    hash tag的缺点也在于分布到同一槽内后,对该槽所在节点的压力会变大。

数据倾斜的解决办法

数据倾斜可以通过重分配slot来解决。
但是热点数据往往是少部分数据被频繁访问,这种情况下重分配slot是无法解决的,为此可以通过热点数据多副本的方法来解决,比如同一key添加一个前缀然后hash到其他slot上。
但是多副本只能用于只读热点key,对于有读有写的热点数据,就只能给实例本身增加资源了,比如改成配置更高的机器。

QA

  1. Redis Cluster哈希槽通过CRC16算法将key哈希到实例上的槽,这样做有什么好处?为什么不直接用一张表来存储key和哈希槽之间的对应关系?
    如果用一张关系表来做映射,问题太多了,比如:key太多了怎么存关系?集群扩容、缩容、故障转移时怎么修改key和实例间的对应关系?
    而引入哈希槽,实际上是将数据和节点解耦,客户端只需关注key被hash到哪个哈希槽,就算打到错误的节点上,也可以通过

参考

  1. Redis 集群规范
  2. redis源码解析
  3. Redis集群详解(上)
  4. Redis集群详解(中)
  5. Redis集群(终篇)
  6. Redis在线数据迁移工具redis-migrate-tool详解,轻松实现redis集群之间的数据同步
  7. CRDT——解决最终一致问题的利器