/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request
                           (clusterHandleSlaveFailover() delays the
                           election by rank * 1000 ms). */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
/* Manual failover state in common. */
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
                    It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF
                               or zero if still not received. */
int mf_can_start; /* If non-zero, signal that the manual failover can
                     start requesting masters' votes. */
/* The following fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(),
                          presumably a bitmask of CLUSTER_TODO_* flags. */
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
/* If cluster is enabled perform the cluster redirection here. * * 如果开启了集群模式,那么在这里进行转向操作。 * * However we don't perform the redirection if: * * 不过,如果有以下情况出现,那么节点不进行转向: * * 1) The sender of this command is our master. * 命令的发送者是本节点的主节点 * * 2) The command has no key arguments. * 命令没有 key 参数 */ if (server.cluster_enabled && !(c->flags & REDIS_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { int hashslot;
// 集群已下线 if (server.cluster->state != REDIS_CLUSTER_OK) { flagTransaction(c); addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n")); return REDIS_OK;
// 集群运作正常 } else { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); // 不能执行多键处理命令 if (n == NULL) { flagTransaction(c); if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { /* The request spawns mutliple keys in the same slot, * but the slot is not "stable" currently as there is * a migration or import in progress. */ addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); } else { redisPanic("getNodeByQuery() unknown error."); } return REDIS_OK;
分区的粒度是 key,因此无法对只由单个巨大 key(例如一个非常大的有序集合)构成的数据集进行分片(The partitioning granularity is the key, so it is not possible to shard a dataset with a single huge key like a very big sorted set).
/* This is executed 10 times every second */ // 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次) void clusterCron(void) { dictIterator *di; dictEntry *de; int update_state = 0; int orphaned_masters; /* How many masters there are without ok slaves. */ int max_slaves; /* Max number of ok slaves for a single master. */ int this_slaves; /* Number of ok slaves for our master (if we are slave). */ mstime_t min_pong = 0, now = mstime(); clusterNode *min_pong_node = NULL; // 迭代计数器,一个静态变量 static unsigned long long iteration = 0; mstime_t handshake_timeout;
// 记录一次迭代 iteration++; /* Number of times this function was called so far. */
/* The handshake timeout is the time after which a handshake node that was * not turned into a normal node is removed from the nodes. Usually it is * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use * the value of 1 second. */ // 如果一个 handshake 节点没有在 handshake timeout 内 // 转换成普通节点(normal node), // 那么节点会从 nodes 表中移除这个 handshake 节点 // 一般来说 handshake timeout 的值总是等于 NODE_TIMEOUT // 不过如果 NODE_TIMEOUT 太少的话,程序会将值设为 1 秒钟 handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000;
/* Check if we have disconnected nodes and re-establish the connection. */ // 向集群中的所有断线或者未连接节点发送消息 di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de);
// 跳过当前节点以及没有地址的节点 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
/* A Node in HANDSHAKE state has a limited lifespan equal to the * configured node timeout. */ // 如果 handshake 节点已超时,释放它 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { freeClusterNode(node); continue; }
// 为未创建连接的节点创建连接 if (node->link == NULL) { int fd; mstime_t old_ping_sent; clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+REDIS_CLUSTER_PORT_INCR, server.bindaddr_count ? server.bindaddr[0] : NULL); if (fd == -1) { redisLog(REDIS_DEBUG, "Unable to connect to " "Cluster Node [%s]:%d -> %s", node->ip, node->port+REDIS_CLUSTER_PORT_INCR, server.neterr); continue; } link = createClusterLink(node); link->fd = fd; node->link = link; aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link); /* Queue a PING in the new connection ASAP: this is crucial * to avoid false positives in failure detection. * * If the node is flagged as MEET, we send a MEET message instead * of a PING one, to force the receiver to add us in its node * table. */ // 向新连接的节点发送 PING 命令,防止节点被识进入下线 // 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令 old_ping_sent = node->ping_sent; clusterSendPing(link, node->flags & REDIS_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
// 这不是第一次发送 PING 信息,所以可以还原这个时间 // 等 clusterSendPing() 函数来更新它 if (old_ping_sent) { /* If there was an active ping before the link was * disconnected, we want to restore the ping time, otherwise * replaced by the clusterSendPing() call. */ node->ping_sent = old_ping_sent; }
/* We can clear the flag after the first packet is sent. * * 在发送 MEET 信息之后,清除节点的 MEET 标识。 * * If we'll never receive a PONG, we'll never send new packets * to this node. Instead after the PONG is received and we * are no longer in meet/handshake status, we want to send * normal PING packets. * * 如果当前节点(发送者)没能收到 MEET 信息的回复, * 那么它将不再向目标节点发送命令。 * * 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态, * 并继续向目标节点发送普通 PING 命令。 */ node->flags &= ~REDIS_NODE_MEET;
redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); } } dictReleaseIterator(di);
/* Ping some random node 1 time every 10 iterations, so that we usually ping * one random node every second. */ // clusterCron() 每执行 10 次(至少间隔一秒钟),就向一个随机节点发送 gossip 信息 if (!(iteration % 10)) { int j;
/* Check a few random nodes and ping the one with the oldest * pong_received time. */ // 随机 5 个节点,选出其中一个 for (j = 0; j < 5; j++) {
// 随机在集群中挑选节点 de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */ // 不要 PING 连接断开的节点,也不要 PING 最近已经 PING 过的节点 if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
// 遍历所有节点,检查是否需要将某个节点标记为下线 /* Iterate nodes to check if we need to flag something as failing. * This loop is also responsible to: * 1) Check if there are orphaned masters (masters without non failing * slaves). * 2) Count the max number of non failing slaves for a single master. * 3) Count the number of slaves for our master, if we are a slave. */ orphaned_masters = 0; max_slaves = 0; this_slaves = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ mstime_t delay;
// 跳过节点本身、无地址节点、HANDSHAKE 状态的节点 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE)) continue;
/* Orphaned master check, useful only if the current instance * is a slave that may migrate to another master. */ if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) { int okslaves = clusterCountNonFailingSlaves(node);
if (okslaves == 0 && node->numslots > 0) orphaned_masters++; if (okslaves > max_slaves) max_slaves = okslaves; if (nodeIsSlave(myself) && myself->slaveof == node) this_slaves = okslaves; }
/* If we are waiting for the PONG more than half the cluster * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. */ // 如果等到 PONG 到达的时间超过了 node timeout 一半的连接 // 因为尽管节点依然正常,但连接可能已经出问题了 if (node->link && /* is connected */ now - node->link->ctime > server.cluster_node_timeout && /* was not already reconnected */ node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ /* and we are waiting for the pong more than timeout/2 */ now - node->ping_sent > server.cluster_node_timeout/2) { /* Disconnect the link, it will be reconnected automatically. */ // 释放连接,下次 clusterCron() 会自动重连 freeClusterLink(node->link); }
/* If we have currently no active ping in this instance, and the * received PONG is older than half the cluster timeout, send * a new ping now, to ensure all the nodes are pinged without * a too big delay. */ // 如果目前没有在 PING 节点 // 并且已经有 node timeout 一半的时间没有从节点那里收到 PONG 回复 // 那么向节点发送一个 PING ,确保节点的信息不会太旧 // (因为一部分节点可能一直没有被随机中) if (node->link && node->ping_sent == 0 && (now - node->pong_received) > server.cluster_node_timeout/2) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; }
/* If we are a master and one of the slaves requested a manual * failover, ping it continuously. */ // 如果这是一个主节点,并且有一个从服务器请求进行手动故障转移 // 那么向从服务器发送 PING 。 if (server.cluster->mf_end && nodeIsMaster(myself) && server.cluster->mf_slave == node && node->link) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; }
/* Check only if we have an active ping for this instance. */ // 以下代码只在节点发送了 PING 命令的情况下执行 if (node->ping_sent == 0) continue;
/* Compute the delay of the PONG. Note that if we already received * the PONG, then node->ping_sent is zero, so can't reach this * code at all. */ // 计算等待 PONG 回复的时长 delay = now - node->ping_sent;
// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线) if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing", node->name); // 打开疑似下线标记 node->flags |= REDIS_NODE_PFAIL; update_state = 1; } } } dictReleaseIterator(di);
/* If we are a slave node but the replication is still turned off, * enable it if we know the address of our master and it appears to * be up. */ // 如果从节点没有在复制主节点,那么对从节点进行设置 if (nodeIsSlave(myself) && server.masterhost == NULL && myself->slaveof && nodeHasAddr(myself->slaveof)) { replicationSetMaster(myself->slaveof->ip, myself->slaveof->port); }
/* Abourt a manual failover if the timeout is reached. */ manualFailoverCheckTimeout();
if (nodeIsSlave(myself)) { clusterHandleManualFailover(); clusterHandleSlaveFailover(); /* If there are orphaned slaves, and we are a slave among the masters * with the max number of non-failing slaves, consider migrating to * the orphaned masters. Note that it does not make sense to try * a migration if there is no master with at least *two* working * slaves. */ if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) clusterHandleSlaveMigration(max_slaves); }
// 更新集群状态 if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL) clusterUpdateState(); }
2、状态信息并不是全量同步,而是每次随机选取 M 个节点(M ≪ N)的状态,附带在消息中同步给其他节点。一般情况下 M = N / 10;若小于 3 则取 3,但 M 最终不超过 N - 2(N 个节点中要扣除发送者自身和消息接收方)。
/* Send a PING or PONG packet to the specified node, making sure to add enough * gossip informations. */ // 向指定节点发送一条 MEET 、 PING 或者 PONG 消息 void clusterSendPing(clusterLink *link, int type) { unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg*) buf; int gossipcount = 0, totlen; /* freshnodes is the number of nodes we can still use to populate the * gossip section of the ping packet. Basically we start with the nodes * we have in memory minus two (ourself and the node we are sending the * message to). Every time we add a node we decrement the counter, so when * it will drop to <= zero we know there is no more gossip info we can * send. */ // freshnodes 是用于发送 gossip 信息的计数器 // 每次发送一条信息时,程序将 freshnodes 的值减一 // 当 freshnodes 的数值小于等于 0 时,程序停止发送 gossip 信息 // freshnodes 的数量是节点目前的 nodes 表中的节点数量减去 2 // 这里的 2 指两个节点,一个是 myself 节点(也即是发送信息的这个节点) // 另一个是接受 gossip 信息的节点 int freshnodes = dictSize(server.cluster->nodes)-2;
// 如果发送的信息是 PING ,那么更新最后一次发送 PING 命令的时间戳 if (link->node && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime();
/* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; long long expireat = getExpire(c->db,kv[j]); //检查键是不是已经过期 if (expireat != -1) { ttl = expireat-mstime(); if (ttl < 0) { continue; } if (ttl < 1) ttl = 1; } kv[non_expired++] = kv[j];
/* If cluster is enabled perform the cluster redirection here. * * 如果开启了集群模式,那么在这里进行转向操作。 * * However we don't perform the redirection if: * * 不过,如果有以下情况出现,那么节点不进行转向: * * 1) The sender of this command is our master. * 命令的发送者是本节点的主节点 * * 2) The command has no key arguments. * 命令没有 key 参数 */ if (server.cluster_enabled && !(c->flags & REDIS_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { int hashslot;
// 集群已下线 if (server.cluster->state != REDIS_CLUSTER_OK) { flagTransaction(c); addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n")); return REDIS_OK;
// 集群运作正常 } else { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); // 不能执行多键处理命令 if (n == NULL) { flagTransaction(c); if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { /* The request spawns mutliple keys in the same slot, * but the slot is not "stable" currently as there is * a migration or import in progress. */ addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); } else { redisPanic("getNodeByQuery() unknown error."); } return REDIS_OK;
/* Look up which node should serve the command described by cmd/argv/argc.
 *
 * NOTE(review): most of the body is elided ("...") in this excerpt; only the
 * importing-slot / ASKING branch is shown.
 *
 * On a NULL return, *error_code (if error_code is non-NULL) is set so the
 * caller can choose the error reply; REDIS_CLUSTER_REDIR_UNSTABLE is mapped
 * to a -TRYAGAIN reply by the cluster redirection code in processCommand. */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    ...
    /* If we are receiving (importing) the slot, and the client correctly
     * flagged the request as "ASKING" (or the command itself carries the
     * REDIS_CMD_ASKING flag), we can serve the request here.
     * However if the request involves multiple keys and we don't have them
     * all yet, the only option is to send a TRYAGAIN error. */
    if (importing_slot && (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING)) {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }
    ...
}
/* This is executed 10 times every second */ // 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次) void clusterCron(void) { ...
while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ mstime_t delay;
...
/* If we are waiting for the PONG more than half the cluster * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. */ // 判断连接的节点是否出事 if (node->link && /* is connected */ now - node->link->ctime > server.cluster_node_timeout && /* was not already reconnected */ // ping_sent记录发送命令的时间 node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ /* and we are waiting for the pong more than timeout/2 */ // PONG 到达的时间超过了 node_timeout 的一半 now - node->ping_sent > server.cluster_node_timeout/2) { /* Disconnect the link, it will be reconnected automatically. */ // 释放连接,此时node->link=NULL,下次 clusterCron() 会自动重连 freeClusterLink(node->link); } ... } ... }
/* This is executed 10 times every second */ // 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次) void clusterCron(void) {
...
/* Check if we have disconnected nodes and re-establish the connection. */ // 向集群中的所有断线或者未连接节点发送消息 di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de);
// 跳过当前节点以及没有地址的节点 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
...
// 为未创建连接的节点创建连接 if (node->link == NULL) { int fd; mstime_t old_ping_sent; clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+REDIS_CLUSTER_PORT_INCR, server.bindaddr_count ? server.bindaddr[0] : NULL); if (fd == -1) { redisLog(REDIS_DEBUG, "Unable to connect to " "Cluster Node [%s]:%d -> %s", node->ip, node->port+REDIS_CLUSTER_PORT_INCR, server.neterr); continue; } link = createClusterLink(node); link->fd = fd; node->link = link; aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link); /* Queue a PING in the new connection ASAP: this is crucial * to avoid false positives in failure detection. * * If the node is flagged as MEET, we send a MEET message instead * of a PING one, to force the receiver to add us in its node * table. */ // 向新连接的节点发送 PING 命令,防止节点被识进入下线 // 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令 old_ping_sent = node->ping_sent; clusterSendPing(link, node->flags & REDIS_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
// 这不是第一次发送 PING 信息,所以可以还原这个时间 // 等 clusterSendPing() 函数来更新它 if (old_ping_sent) { /* If there was an active ping before the link was * disconnected, we want to restore the ping time, otherwise * replaced by the clusterSendPing() call. */ node->ping_sent = old_ping_sent; }
/* We can clear the flag after the first packet is sent. * * 在发送 MEET 信息之后,清除节点的 MEET 标识。 * * If we'll never receive a PONG, we'll never send new packets * to this node. Instead after the PONG is received and we * are no longer in meet/handshake status, we want to send * normal PING packets. * * 如果当前节点(发送者)没能收到 MEET 信息的回复, * 那么它将不再向目标节点发送命令。 * * 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态, * 并继续向目标节点发送普通 PING 命令。 */ node->flags &= ~REDIS_NODE_MEET;
redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); } } ... }
// 遍历所有节点,检查是否需要将某个节点标记为下线 /* Iterate nodes to check if we need to flag something as failing. * This loop is also responsible to: * 1) Check if there are orphaned masters (masters without non failing * slaves). * 2) Count the max number of non failing slaves for a single master. * 3) Count the number of slaves for our master, if we are a slave. */ orphaned_masters = 0; max_slaves = 0; this_slaves = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { ...
/* Check only if we have an active ping for this instance. */ // 以下代码只在节点发送了 PING 命令的情况下执行 if (node->ping_sent == 0) continue;
/* Compute the delay of the PONG. Note that if we already received * the PONG, then node->ping_sent is zero, so can't reach this * code at all. */ // 计算等待 PONG 回复的时长 delay = now - node->ping_sent;
// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线) if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing", node->name); // 打开疑似下线标记 node->flags |= REDIS_NODE_PFAIL; update_state = 1; } } } ... }
// 这是一条 PING 消息或者 MEET 消息 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
/* Add this node if it is new for us and the msg type is MEET. * * 如果当前节点是第一次遇见这个节点,并且对方发来的是 MEET 信息, * 那么将这个节点添加到集群的节点列表里面。 * * In this stage we don't try to add the node with the right * flags, slaveof pointer, and so forth, as this details will be * resolved when we'll receive PONGs from the node. * * 节点目前的 flag 、 slaveof 等属性的值都是未设置的, * 等当前节点向对方发送 PING 命令之后, * 这些信息可以从对方回复的 PONG 信息中取得。 */ if (!sender && type == CLUSTERMSG_TYPE_MEET) { clusterNode *node;
// 遍历所有节点的信息 while(count--) { sds ci = sdsempty();
// 分析节点的 flag uint16_t flags = ntohs(g->flags);
// 信息节点 clusterNode *node;
...
/* Update our state accordingly to the gossip sections */ // 使用消息中的信息对节点进行更新 node = clusterLookupNode(g->nodename); // 节点已经存在于当前节点 if (node) { /* We already know this node. Handle failure reports, only when the sender is a master. */ // 如果 sender 是一个主节点,那么我们需要处理下线报告 if (sender && nodeIsMaster(sender) && node != myself) { // 节点处于 FAIL 或者 PFAIL 状态 if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {
// 添加 sender 对 node 的下线报告 if (clusterNodeAddFailureReport(node,sender)) { redisLog(REDIS_VERBOSE, "Node %.40s reported node %.40s as not reachable.", sender->name, node->name); }
// 如果 sender 曾经发送过对 node 的下线报告 // 那么清除该报告 if (clusterNodeDelFailureReport(node,sender)) { redisLog(REDIS_VERBOSE, "Node %.40s reported node %.40s is back online.", sender->name, node->name); } } }
/* If we already know this node, but it is not reachable, and * we see a different address in the gossip section, start an * handshake with the (possibly) new address: this will result * into a node address update if the handshake will be * successful. */ // 如果节点之前处于 PFAIL 或者 FAIL 状态 // 并且该节点的 IP 或者端口号已经发生变化 // 那么可能是节点换了新地址,尝试对它进行握手 if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) && (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port))) { clusterStartHandshake(g->ip,ntohs(g->port)); }
// 当前节点不认识 node } else { /* If it's not in NOADDR state and we don't have it, we * start a handshake process against this IP/PORT pairs. * * 如果 node 不在 NOADDR 状态,并且当前节点不认识 node * 那么向 node 发送 HANDSHAKE 消息。 * * Note that we require that the sender of this gossip message * is a well known node in our cluster, otherwise we risk * joining another cluster. * * 注意,当前节点必须保证 sender 是本集群的节点, * 否则我们将有加入了另一个集群的风险。 */ if (sender && !(flags & REDIS_NODE_NOADDR) && !clusterBlacklistExists(g->nodename)) { clusterStartHandshake(g->ip,ntohs(g->port)); } }
/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected we also inform the whole cluster about this
 * event trying to force every other node to set the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the
 * majority of masters state during some time (their view of the node may
 * have changed in the meantime), and even if we force agreement by
 * propagating the FAIL message, because of partitions we may not reach
 * every node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will
 *    propagate to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and
 *    the FAIL flag will be cleared after some time.
 */
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    /* A majority of server.cluster->size is required; this is the same
     * quorum formula used by clusterHandleSlaveFailover(). */
    int needed_quorum = (server.cluster->size / 2) + 1;

    /* Condition 2: we must see the node as PFAIL ourselves. */
    if (!(node->flags & REDIS_NODE_PFAIL)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    /* Condition 1: count the (non-expired) failure reports collected via
     * gossip from other masters. Must be initialized before use: the
     * increment below would otherwise read an indeterminate value. */
    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    redisLog(REDIS_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing: PFAIL is replaced by FAIL. */
    node->flags &= ~REDIS_NODE_PFAIL;
    node->flags |= REDIS_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. Only masters broadcast. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
/* Send a FAIL message to all the nodes we are able to contact. * * 向当前节点已知的所有节点发送 FAIL 信息。 */ void clusterSendFail(char *nodename) { unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg *) buf;
/* Send a message to all the nodes that are part of the cluster having * a connected link. * * 向节点连接的所有其他节点发送信息。 */ void clusterBroadcastMessage(void *buf, size_t len) { dictIterator *di; dictEntry *de;
// 遍历所有已知节点 di = dictGetSafeIterator(server.cluster->nodes); while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de);
/* Abourt a manual failover if the timeout is reached. */ manualFailoverCheckTimeout();
// 如果当前节点是子节点 if (nodeIsSlave(myself)) { clusterHandleManualFailover(); // 处理集群子节点的故障迁移 clusterHandleSlaveFailover(); /* If there are orphaned slaves, and we are a slave among the masters * with the max number of non-failing slaves, consider migrating to * the orphaned masters. Note that it does not make sense to try * a migration if there is no master with at least *two* working * slaves. */ if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) clusterHandleSlaveMigration(max_slaves); }
// 更新集群状态 if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL) clusterUpdateState(); }
/* This function is called if we are a slave node and our master serving
 * a non-zero amount of hash slots is in FAIL state.
 *
 * The goal of this function is:
 *
 * 1) To check if we are able to perform a failover: is our data updated?
 * 2) Try to get elected by masters.
 * 3) Perform the failover informing all the other nodes.
 *
 * NOTE(review): parts of the body are elided ("...") in this excerpt.
 */
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    /* Time elapsed since the previous (or until the next) scheduled election. */
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    /* Votes needed to win: a majority of server.cluster->size. */
    int needed_quorum = (server.cluster->size / 2) + 1;
    /* True when a manual failover is in progress and may request votes. */
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    int j;
    mstime_t auth_timeout, auth_retry_time;

    /* Compute the failover timeout (the max time we have to send votes
     * and wait for replies), and the failover retry time (the time to wait
     * before trying again).
     *
     * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
     * Retry is two times the Timeout. */
    auth_timeout = server.cluster_node_timeout * 2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout * 2;

    /* Pre conditions to run the function, that must be met both in case
     * of an automatic or manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) It is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        myself->slaveof->numslots == 0) return;
    ...

    /* Set data_age to how long, in milliseconds, we have been out of touch
     * with the master: since the last interaction if the replication link
     * is connected, otherwise since the link went down. */
    if (server.repl_state == REDIS_REPL_CONNECTED) {
        data_age = (mstime_t)
            (server.unixtime - server.master->lastinteraction) * 1000;
    } else {
        data_age = (mstime_t)
            (server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age as it is fine that we are
     * disconnected from our master at least for the time it was down to be
     * flagged as FAIL, that's the baseline. */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough: the disconnection time must not
     * exceed the replication ping period (repl_ping_slave_period, scaled by
     * 1000 so presumably configured in seconds) plus the node timeout times
     * REDIS_CLUSTER_SLAVE_VALIDITY_MULT (defined elsewhere; the upstream
     * comment describes it as ten).
     *
     * Check bypassed for manual failovers. */
    if (data_age >
        ((mstime_t)server.repl_ping_slave_period * 1000) +
        (server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT))
    {
        if (!manual_failover) return;
    }

    /* If the previous failover attempt timed out and the retry time has
     * elapsed, we can set up a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 500 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that have a probably
         * less updated replication offset, are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
        }
        redisLog(REDIS_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll update their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }
    ...

    /* Return ASAP if we can't still start the election: the scheduled
     * election time has not been reached yet. */
    if (mstime() < server.cluster->failover_auth_time) return;

    /* Check if we reached the quorum; if so, fail over the old master. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* Our old master, whose slots we are about to take over. */
        clusterNode *oldmaster = myself->slaveof;

        redisLog(REDIS_WARNING, "Failover election won: I'm the new master.");

        /* We have the quorum, perform all the steps to correctly promote
         * this slave to a master.
         *
         * 1) Turn this node into a master, and stop replicating from the
         *    old master. */
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();

        /* 2) Claim all the slots assigned to our master. */
        for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
            if (clusterNodeGetSlotBit(oldmaster, j)) {
                /* Mark the slot as unassigned, then assign it to us. */
                clusterDelSlot(j);
                clusterAddSlot(myself, j);
            }
        }

        /* 3) Update my configEpoch to the epoch of the election. */
        myself->configEpoch = server.cluster->failover_auth_epoch;

        /* 4) Update state and save config. */
        clusterUpdateState();
        clusterSaveConfigOrDie(1);

        /* 5) Pong all the other nodes so that they can update the state
         *    accordingly and detect that we switched to master role. */
        clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

        /* 6) If there was a manual failover in progress, clear the state. */
        resetManualFailover();
    }
}
/* 5) Pong all the other nodes so that they can update the state * accordingly and detect that we switched to master role. */ // 向所有节点发送 PONG 信息 // 让它们可以知道当前节点已经升级为主节点了 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);