不论是桌面应用还是 Web 应用,多线程代码都很难驾驭,处理不好就会带来一大堆令人毛骨悚然、难以捉摸且难以调试的问题——实际上,一旦你意识到自己正在处理的是并发问题,可能就不得不完全放弃调试,转而手动检查代码。鉴于此,我们当然希望尽量避免并发问题,理想情况下完全避免多线程错误。当然,不存在一刀切的方法,但下面有一些调试和防止多线程错误的实际考虑因素:
数据一致性问题:本来好好的,A 系统调用 B、C 系统的接口,如果 B、C 系统出错了,会抛出异常并返回给 A 系统,让 A 系统知道,这样就可以做回滚操作了。但是使用了 MQ 之后,A 系统发送完消息就完事了,认为已经成功。而如果刚好 C 系统写数据库时失败了,A 却仍然认为 C 已经成功,这样一来数据就不一致了。
/**
 * Creates a producer bound to the given ring buffer.
 *
 * @param ringBuffer the ring buffer stored in this producer's {@code ringBuffer} field
 */
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
    this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb) { // Grab the next sequence long sequence = ringBuffer.next(); try { // Get the entry in the Disruptor LongEvent event = ringBuffer.get(sequence); // for the sequence Fill with data event.set(bb.getLong(0)); } finally { ringBuffer.publish(sequence); } } }
/**
 * Builds a Disruptor with a single {@code LongEventHandler}, starts it, and then
 * publishes an ever-increasing counter through a {@code LongEventProducer}.
 *
 * <p>The publishing loop has no exit condition, so this method does not return normally.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Executor that will be used to construct new threads for consumers
    Executor executor = Executors.newCachedThreadPool();

    // The factory for the event
    LongEventFactory factory = new LongEventFactory();

    // Specify the size of the ring buffer, must be power of 2.
    int bufferSize = 1024;

    // Construct the Disruptor
    Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

    // Connect the handler
    disruptor.handleEventsWith(new LongEventHandler());

    // Start the Disruptor, starts all threads running
    disruptor.start();

    // Get the ring buffer from the Disruptor to be used for publishing.
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

    LongEventProducer producer = new LongEventProducer(ringBuffer);

    // A single 8-byte buffer is reused: each iteration overwrites the long at
    // index 0 with the current counter value and hands the buffer to the producer.
    ByteBuffer bb = ByteBuffer.allocate(8);
    for (long l = 0; true; l++) {
        bb.putLong(0, l);
        producer.onData(bb);
    }
}
} // closes the enclosing class, whose header lies outside this excerpt
// 检查 master id 是否和 runid 一致,只有一致的情况下才考虑执行psync if (strcasecmp(master_runid, server.runid)) { /* Run id "?" is used by slaves that want to force a full resync. */ // 从服务器提供的 run id 和服务器的 run id 不一致 if (master_runid[0] != '?') { redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " "Runid mismatch (Client asked for runid '%s', my runid is '%s')", master_runid, server.runid); // 从服务器提供的 run id 为 '?' ,表示强制 FULL RESYNC } else { redisLog(REDIS_NOTICE,"Full resync requested by slave."); } // 需要 full resync goto need_full_resync; }
// 判断当前Slave带来的offset在Master的backlog中是否还能找到,找不到则执行全量复制 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != REDIS_OK) goto need_full_resync;
// 如果没有backlog if (!server.repl_backlog || // 或者 psync_offset 小于 server.repl_backlog_off // (想要恢复的那部分数据已经被覆盖) psync_offset < server.repl_backlog_off || // psync offset 大于 backlog 所保存的数据的偏移量 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { // 执行 FULL RESYNC redisLog(REDIS_NOTICE, "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset); if (psync_offset > server.master_repl_offset) { redisLog(REDIS_WARNING, "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset."); } goto need_full_resync; }
void syncCommand(redisClient *c) { ... if (!strcasecmp(c->argv[0]->ptr,"psync")) { // 尝试进行 PSYNC if (masterTryPartialResynchronization(c) == REDIS_OK) { // 可执行 PSYNC server.stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { // 不可执行 PSYNC char *master_runid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the * runid is not "?", as this is used by slaves to force a full * resync on purpose when they are not albe to partially * resync. */ if (master_runid[0] != '?') server.stat_sync_partial_err++; } } ... }
int masterTryPartialResynchronization(redisClient *c) { ...
/* If we reached this point, we are able to perform a partial resync: * 程序运行到这里,说明可以执行 partial resync * * 1) Set client state to make it a slave. * 将客户端状态设为 salve * * 2) Inform the client we can continue with +CONTINUE * 向 slave 发送 +CONTINUE ,表示 partial resync 的请求被接受 * * 3) Send the backlog data (from the offset to the end) to the slave. * 发送 backlog 中,客户端所需要的数据 */ c->flags |= REDIS_SLAVE; c->replstate = REDIS_REPL_ONLINE; c->repl_ack_time = server.unixtime; listAddNodeTail(server.slaves,c); /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * emtpy so this write will never fail actually. */ // 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); if (write(c->fd,buf,buflen) != buflen) { freeClientAsync(c); return REDIS_OK; } // 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器 psync_len = addReplyReplicationBacklog(c,psync_offset); redisLog(REDIS_NOTICE, "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset); ... }
void processInputBufferAndReplicate(client *c) { // 处理命令然后广播命令 // if this is a slave, we just process the commands if (!(c->flags & CLIENT_MASTER)) { processInputBuffer(c); } else { /* If the client is a master we need to compute the difference * between the applied offset before and after processing the buffer, * to understand how much of the replication stream was actually * applied to the master state: this quantity, and its corresponding * part of the replication stream, will be propagated to the * sub-replicas and to the replication backlog. */ size_t prev_offset = c->reploff; processInputBuffer(c); // applied is how much of the replication stream was actually applied to the master state size_t applied = c->reploff - prev_offset; if (applied) {
/* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ // 检查是否有 BGSAVE 在执行 if (server.rdb_child_pid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save */ redisClient *slave; listNode *ln; listIter li;
if (ln) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ // 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB copyClientOutputBuffer(c,slave); // 设置复制状态 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences */ // 不好运的情况,必须等待下个 BGSAVE c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } } else { /* Ok we don't have a BGSAVE in progress, let's start one */ // 没有 BGSAVE 在进行,开始一个新的 BGSAVE redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); return; } // 设置状态 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; /* Flush the script cache for the new slave. */ // 因为新 slave 进入,刷新复制脚本缓存 replicationScriptCacheFlush(); } ...
# It is possible for a master to stop accepting writes if there are fewer than
# N slaves connected, having a lag less than or equal to M seconds.
#
# The N slaves need to be in "online" state.
#
# The lag in seconds, that must be <= the specified value, is calculated from
# the last ping received from the slave, that is usually sent every second.
#
# This option does not GUARANTEE that N replicas will accept the write, but
# will limit the window of exposure for lost writes in case not enough slaves
# are available, to the specified number of seconds.
#
# For example to require at least 3 slaves with a lag <= 10 seconds use:
#
# min-slaves-to-write 3
# min-slaves-max-lag 10
#
# Setting one or the other to 0 disables the feature.
#
# By default min-slaves-to-write is set to 0 (feature disabled) and
# min-slaves-max-lag is set to 10.
// Illustrative (pseudo-code) layout of an SDS string header.
struct SDS<T> {
    T capacity;     // capacity of the array
    T len;          // current length of the array
    byte flags;     // special flag bits; can be ignored for this discussion
    byte[] content; // array contents
}
下面的函数将 t 数组追加到 s 的末尾,如果 s 的剩余空间不够则需要先进行扩容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Append the specified binary-safe string pointed by 't' of 'len' bytes to the * end of the specified sds string 's'. * * After the call, the passed sds string is no longer valid and all the * references must be substituted with the new pointer returned by the call. */ sds sdscatlen(sds s, const void *t, size_t len) { size_t curlen = sdslen(s); // 原字符串长度
// 创建 epoll 实例 state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; }
// 1、看是否有事件到达了执行时间 // 2、如果有,则执行这些事件 /* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * * 处理所有已到达的时间事件,以及所有已就绪的文件事件。 * * Without special flags the function sleeps until some file event * fires, or when the next time event occurs (if any). * * 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪, * 或者下个时间事件到达(如果有的话)。 * * If flags is 0, the function does nothing and returns. * 如果 flags 为 0 ,那么函数不作动作,直接返回。 * * if flags has AE_ALL_EVENTS set, all the kind of events are processed. * 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。 * * if flags has AE_FILE_EVENTS set, file events are processed. * 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。 * * if flags has AE_TIME_EVENTS set, time events are processed. * 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。 * * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * 如果 flags 包含 AE_DONT_WAIT , * 那么函数在处理完所有不许阻塞的事件之后,即刻返回。 * * The function returns the number of events processed. * 函数的返回值为已处理事件的数量 */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents;
/* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp;
// 获取最近的时间事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 如果时间事件存在的话 // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间 long now_sec, now_ms;
/* Calculate the time missing for the nearest * timer to fire. */ // 计算距今最近的时间事件还要多久才能达到 // 并将该时间距保存在 tv 结构中 aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; }
/* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { // 设置文件事件不阻塞 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ // 文件事件可以阻塞直到有事件到达为止 tvp = NULL; /* wait forever */ } }
int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // 读事件 if (fe->mask & mask & AE_READABLE) { // rfired 确保读/写事件只能执行其中一个 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 写事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); }
processed++; } }
/* Check time events */ // 执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */ }
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ...
/* Run the Sentinel timer if we are in sentinel mode. */ // 如果服务器运行在 sentinel 模式下,那么执行 SENTINEL 的主函数 run_with_period(100) { if (server.sentinel_mode) sentinelTimer(); }
// 根据情况,向实例发送 PING、 INFO 或者 PUBLISH 命令 sentinelSendPeriodicCommands(ri);
... }
// 根据时间和实例类型等情况,向实例发送命令,比如 INFO 、PING 和 PUBLISH // 虽然函数的名字包含 Ping ,但命令并不只发送 PING 命令 /* Send periodic PING, INFO, and PUBLISH to the Hello channel to * the specified master or slave instance. */ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { mstime_t now = mstime(); mstime_t info_period, ping_period; int retval;
/* Return ASAP if we have already a PING or INFO already pending, or * in the case the instance is not properly connected. */ // 函数不能在网络连接未创建时执行 if (ri->flags & SRI_DISCONNECTED) return;
/* For INFO, PING, PUBLISH that are not critical commands to send we * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't * want to use a lot of memory just because a link is not working * properly (note that anyway there is a redundant protection about this, * that is, the link will be disconnected and reconnected if a long * timeout condition is detected. */ // 为了避免 sentinel 在实例处于不正常状态时,发送过多命令 // sentinel 只在待发送命令的数量未超过 SENTINEL_MAX_PENDING_COMMANDS 常量时 // 才进行命令发送 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
/* If this is a slave of a master in O_DOWN condition we start sending * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD * period. In this state we want to closely monitor slaves in case they * are turned into masters by another Sentinel, or by the sysadmin. */ // 对于从服务器来说, sentinel 默认每 SENTINEL_INFO_PERIOD 秒向它发送一次 INFO 命令 // 但是,当从服务器的主服务器处于 SDOWN 状态,或者正在执行故障转移时 // 为了更快速地捕捉从服务器的变动, sentinel 会将发送 INFO 命令的频率该为每秒一次 if ((ri->flags & SRI_SLAVE) && (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) { info_period = 1000; } else { info_period = SENTINEL_INFO_PERIOD; }
/* We ping instances every time the last received pong is older than * the configured 'down-after-milliseconds' time, but every second * anyway if 'down-after-milliseconds' is greater than 1 second. */ ping_period = ri->down_after_period; if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
// 实例不是 Sentinel (主服务器或者从服务器) // 并且以下条件的其中一个成立: // 1)SENTINEL 未收到过这个服务器的 INFO 命令回复 // 2)距离上一次该实例回复 INFO 命令已经超过 info_period 间隔 // 那么向实例发送 INFO 命令 if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) { /* Send INFO to masters and slaves, not sentinels. */ retval = redisAsyncCommand(ri->cc, sentinelInfoReplyCallback, NULL, "INFO"); if (retval == REDIS_OK) ri->pending_commands++; } else if ((now - ri->last_pong_time) > ping_period) { /* Send PING to all the three kinds of instances. */ sentinelSendPing(ri); } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) { /* PUBLISH hello messages to all the three kinds of instances. */ sentinelSendHello(ri); } }
/* We don't proceed with the acting half if we are in TILT mode. * TILT happens when we find something odd with the time, like a * sudden change in the clock. */ // 如果 Sentinel 处于 TILT 模式,那么不执行故障检测。 if (sentinel.tilt) {
// 如果 TILI 模式未解除,那么不执行动作 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
/* Every kind of instance */ // 检查给定实例是否进入 SDOWN 状态 sentinelCheckSubjectivelyDown(ri);
... }
/* Is this instance down from our point of view? */ // 检查实例是否已下线(从本 Sentinel 的角度来看) void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
...
/* Update the SDOWN flag. We believe the instance is SDOWN if: * * 更新 SDOWN 标识。如果以下条件被满足,那么 Sentinel 认为实例已下线: * * 1) It is not replying. * 它没有回应命令 * 2) We believe it is a master, it reports to be a slave for enough time * to meet the down_after_period, plus enough time to get two times * INFO report from the instance. * Sentinel 认为实例是主服务器,这个服务器向 Sentinel 报告它将成为从服务器, * 但在超过给定时限之后,服务器仍然没有完成这一角色转换。 */ if (elapsed > ri->down_after_period || (ri->flags & SRI_MASTER && ri->role_reported == SRI_SLAVE && mstime() - ri->role_reported_time > (ri->down_after_period+SENTINEL_INFO_PERIOD*2))) { /* Is subjectively down */ if ((ri->flags & SRI_S_DOWN) == 0) { // 发送事件 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@"); // 记录进入 SDOWN 状态的时间 ri->s_down_since_time = mstime(); // 打开 SDOWN 标志 ri->flags |= SRI_S_DOWN; } } else { // 移除(可能有的) SDOWN 状态 /* Is subjectively up */ if (ri->flags & SRI_S_DOWN) { // 发送事件 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@"); // 移除相关标志 ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT); } } }
/* If this is a slave of a master in O_DOWN condition we start sending * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD * period. In this state we want to closely monitor slaves in case they * are turned into masters by another Sentinel, or by the sysadmin. */ // 对于从服务器来说, sentinel 默认每 SENTINEL_INFO_PERIOD 秒向它发送一次 INFO 命令 // 但是,当从服务器的主服务器处于 SDOWN 状态,或者正在执行故障转移时 // 为了更快速地捕捉从服务器的变动, sentinel 会将发送 INFO 命令的频率该为每秒一次 if ((ri->flags & SRI_SLAVE) && (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) { info_period = 1000; } else { info_period = SENTINEL_INFO_PERIOD; }
...
// 实例不是 Sentinel (主服务器或者从服务器) // 并且以下条件的其中一个成立: // 1)SENTINEL 未收到过这个服务器的 INFO 命令回复 // 2)距离上一次该实例回复 INFO 命令已经超过 info_period 间隔 // 那么向实例发送 INFO 命令 if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) { /* Send INFO to masters and slaves, not sentinels. */ retval = redisAsyncCommand(ri->cc, sentinelInfoReplyCallback, NULL, "INFO"); if (retval == REDIS_OK) ri->pending_commands++; } else if ... } }
/* The following fields are used to take the slave state on elections. */ // 以下这些域被用于进行故障转移选举
// 上次执行选举或者下次执行选举的时间 mstime_t failover_auth_time; /* Time of previous or next election. */
// 节点获得的投票数量 int failover_auth_count; /* Number of votes received so far. */
// 如果值为 1 ,表示本节点已经向其他节点发送了投票请求 int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
/* Manual failover state in common. */ /* 共用的手动故障转移状态 */
// 手动故障转移执行的时间限制 mstime_t mf_end; /* Manual failover time limit (ms unixtime). It is zero if there is no MF in progress. */ /* Manual failover state of master. */ /* 主服务器的手动故障转移状态 */ clusterNode *mf_slave; /* Slave performing the manual failover. */ /* Manual failover state of slave. */ /* 从服务器的手动故障转移状态 */ long long mf_master_offset; /* Master offset the slave needs to start MF or zero if stil not received. */ // 指示手动故障转移是否可以开始的标志值 // 值为非 0 时表示各个主服务器可以开始投票 int mf_can_start; /* If non-zero signal that the manual failover can start requesting masters vote. */
/* The followign fields are uesd by masters to take state on elections. */ /* 以下这些域由主服务器使用,用于记录选举时的状态 */
// 集群最后一次进行投票的纪元 uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
// 在进入下个事件循环之前要做的事情,以各个 flag 来记录 int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
// 通过 cluster 连接发送的消息数量 long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */
// 通过 cluster 接收到的消息数量 long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
/* If cluster is enabled perform the cluster redirection here. * * 如果开启了集群模式,那么在这里进行转向操作。 * * However we don't perform the redirection if: * * 不过,如果有以下情况出现,那么节点不进行转向: * * 1) The sender of this command is our master. * 命令的发送者是本节点的主节点 * * 2) The command has no key arguments. * 命令没有 key 参数 */ if (server.cluster_enabled && !(c->flags & REDIS_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { int hashslot;
// 集群已下线 if (server.cluster->state != REDIS_CLUSTER_OK) { flagTransaction(c); addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n")); return REDIS_OK;
// 集群运作正常 } else { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); // 不能执行多键处理命令 if (n == NULL) { flagTransaction(c); if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { /* The request spawns mutliple keys in the same slot, * but the slot is not "stable" currently as there is * a migration or import in progress. */ addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); } else { redisPanic("getNodeByQuery() unknown error."); } return REDIS_OK;
分区的粒度是 key,因此无法对仅由单个巨大 key(例如一个非常大的有序集合)构成的数据集进行分片(The partitioning granularity is the key, so it is not possible to shard a dataset with a single huge key like a very big sorted set)。
/* This is executed 10 times every second */ // 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次) void clusterCron(void) { dictIterator *di; dictEntry *de; int update_state = 0; int orphaned_masters; /* How many masters there are without ok slaves. */ int max_slaves; /* Max number of ok slaves for a single master. */ int this_slaves; /* Number of ok slaves for our master (if we are slave). */ mstime_t min_pong = 0, now = mstime(); clusterNode *min_pong_node = NULL; // 迭代计数器,一个静态变量 static unsigned long long iteration = 0; mstime_t handshake_timeout;
// 记录一次迭代 iteration++; /* Number of times this function was called so far. */
/* The handshake timeout is the time after which a handshake node that was * not turned into a normal node is removed from the nodes. Usually it is * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use * the value of 1 second. */ // 如果一个 handshake 节点没有在 handshake timeout 内 // 转换成普通节点(normal node), // 那么节点会从 nodes 表中移除这个 handshake 节点 // 一般来说 handshake timeout 的值总是等于 NODE_TIMEOUT // 不过如果 NODE_TIMEOUT 太少的话,程序会将值设为 1 秒钟 handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000;
/* Check if we have disconnected nodes and re-establish the connection. */ // 向集群中的所有断线或者未连接节点发送消息 di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de);
// 跳过当前节点以及没有地址的节点 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
/* A Node in HANDSHAKE state has a limited lifespan equal to the * configured node timeout. */ // 如果 handshake 节点已超时,释放它 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { freeClusterNode(node); continue; }
// 为未创建连接的节点创建连接 if (node->link == NULL) { int fd; mstime_t old_ping_sent; clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+REDIS_CLUSTER_PORT_INCR, server.bindaddr_count ? server.bindaddr[0] : NULL); if (fd == -1) { redisLog(REDIS_DEBUG, "Unable to connect to " "Cluster Node [%s]:%d -> %s", node->ip, node->port+REDIS_CLUSTER_PORT_INCR, server.neterr); continue; } link = createClusterLink(node); link->fd = fd; node->link = link; aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link); /* Queue a PING in the new connection ASAP: this is crucial * to avoid false positives in failure detection. * * If the node is flagged as MEET, we send a MEET message instead * of a PING one, to force the receiver to add us in its node * table. */ // 向新连接的节点发送 PING 命令,防止节点被识进入下线 // 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令 old_ping_sent = node->ping_sent; clusterSendPing(link, node->flags & REDIS_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
// 这不是第一次发送 PING 信息,所以可以还原这个时间 // 等 clusterSendPing() 函数来更新它 if (old_ping_sent) { /* If there was an active ping before the link was * disconnected, we want to restore the ping time, otherwise * replaced by the clusterSendPing() call. */ node->ping_sent = old_ping_sent; }
/* We can clear the flag after the first packet is sent. * * 在发送 MEET 信息之后,清除节点的 MEET 标识。 * * If we'll never receive a PONG, we'll never send new packets * to this node. Instead after the PONG is received and we * are no longer in meet/handshake status, we want to send * normal PING packets. * * 如果当前节点(发送者)没能收到 MEET 信息的回复, * 那么它将不再向目标节点发送命令。 * * 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态, * 并继续向目标节点发送普通 PING 命令。 */ node->flags &= ~REDIS_NODE_MEET;
redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); } } dictReleaseIterator(di);
/* Ping some random node 1 time every 10 iterations, so that we usually ping * one random node every second. */ // clusterCron() 每执行 10 次(至少间隔一秒钟),就向一个随机节点发送 gossip 信息 if (!(iteration % 10)) { int j;
/* Check a few random nodes and ping the one with the oldest * pong_received time. */ // 随机 5 个节点,选出其中一个 for (j = 0; j < 5; j++) {
// 随机在集群中挑选节点 de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */ // 不要 PING 连接断开的节点,也不要 PING 最近已经 PING 过的节点 if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
// 遍历所有节点,检查是否需要将某个节点标记为下线 /* Iterate nodes to check if we need to flag something as failing. * This loop is also responsible to: * 1) Check if there are orphaned masters (masters without non failing * slaves). * 2) Count the max number of non failing slaves for a single master. * 3) Count the number of slaves for our master, if we are a slave. */ orphaned_masters = 0; max_slaves = 0; this_slaves = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ mstime_t delay;
// 跳过节点本身、无地址节点、HANDSHAKE 状态的节点 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE)) continue;
/* Orphaned master check, useful only if the current instance * is a slave that may migrate to another master. */ if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) { int okslaves = clusterCountNonFailingSlaves(node);
if (okslaves == 0 && node->numslots > 0) orphaned_masters++; if (okslaves > max_slaves) max_slaves = okslaves; if (nodeIsSlave(myself) && myself->slaveof == node) this_slaves = okslaves; }
/* If we are waiting for the PONG more than half the cluster * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. */ // 如果等到 PONG 到达的时间超过了 node timeout 一半的连接 // 因为尽管节点依然正常,但连接可能已经出问题了 if (node->link && /* is connected */ now - node->link->ctime > server.cluster_node_timeout && /* was not already reconnected */ node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ /* and we are waiting for the pong more than timeout/2 */ now - node->ping_sent > server.cluster_node_timeout/2) { /* Disconnect the link, it will be reconnected automatically. */ // 释放连接,下次 clusterCron() 会自动重连 freeClusterLink(node->link); }
/* If we have currently no active ping in this instance, and the * received PONG is older than half the cluster timeout, send * a new ping now, to ensure all the nodes are pinged without * a too big delay. */ // 如果目前没有在 PING 节点 // 并且已经有 node timeout 一半的时间没有从节点那里收到 PONG 回复 // 那么向节点发送一个 PING ,确保节点的信息不会太旧 // (因为一部分节点可能一直没有被随机中) if (node->link && node->ping_sent == 0 && (now - node->pong_received) > server.cluster_node_timeout/2) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; }
/* If we are a master and one of the slaves requested a manual * failover, ping it continuously. */ // 如果这是一个主节点,并且有一个从服务器请求进行手动故障转移 // 那么向从服务器发送 PING 。 if (server.cluster->mf_end && nodeIsMaster(myself) && server.cluster->mf_slave == node && node->link) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; }
/* Check only if we have an active ping for this instance. */ // 以下代码只在节点发送了 PING 命令的情况下执行 if (node->ping_sent == 0) continue;
/* Compute the delay of the PONG. Note that if we already received * the PONG, then node->ping_sent is zero, so can't reach this * code at all. */ // 计算等待 PONG 回复的时长 delay = now - node->ping_sent;
// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线) if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing", node->name); // 打开疑似下线标记 node->flags |= REDIS_NODE_PFAIL; update_state = 1; } } } dictReleaseIterator(di);
/* If we are a slave node but the replication is still turned off, * enable it if we know the address of our master and it appears to * be up. */ // 如果从节点没有在复制主节点,那么对从节点进行设置 if (nodeIsSlave(myself) && server.masterhost == NULL && myself->slaveof && nodeHasAddr(myself->slaveof)) { replicationSetMaster(myself->slaveof->ip, myself->slaveof->port); }
/* Abourt a manual failover if the timeout is reached. */ manualFailoverCheckTimeout();
if (nodeIsSlave(myself)) { clusterHandleManualFailover(); clusterHandleSlaveFailover(); /* If there are orphaned slaves, and we are a slave among the masters * with the max number of non-failing slaves, consider migrating to * the orphaned masters. Note that it does not make sense to try * a migration if there is no master with at least *two* working * slaves. */ if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) clusterHandleSlaveMigration(max_slaves); }
// 更新集群状态 if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL) clusterUpdateState(); }
2、状态信息并不是全量同步,而是随机选M(M << N)个节点的状态同步到其他节点。 M值最小为3,最大为N - 2,一般情况下M = N / 10。
/* Send a PING or PONG packet to the specified node, making sure to add enough * gossip informations. */ // 向指定节点发送一条 MEET 、 PING 或者 PONG 消息 void clusterSendPing(clusterLink *link, int type) { unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg*) buf; int gossipcount = 0, totlen; /* freshnodes is the number of nodes we can still use to populate the * gossip section of the ping packet. Basically we start with the nodes * we have in memory minus two (ourself and the node we are sending the * message to). Every time we add a node we decrement the counter, so when * it will drop to <= zero we know there is no more gossip info we can * send. */ // freshnodes 是用于发送 gossip 信息的计数器 // 每次发送一条信息时,程序将 freshnodes 的值减一 // 当 freshnodes 的数值小于等于 0 时,程序停止发送 gossip 信息 // freshnodes 的数量是节点目前的 nodes 表中的节点数量减去 2 // 这里的 2 指两个节点,一个是 myself 节点(也即是发送信息的这个节点) // 另一个是接受 gossip 信息的节点 int freshnodes = dictSize(server.cluster->nodes)-2;
// 如果发送的信息是 PING ,那么更新最后一次发送 PING 命令的时间戳 if (link->node && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime();
/* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; long long expireat = getExpire(c->db,kv[j]); //检查键是不是已经过期 if (expireat != -1) { ttl = expireat-mstime(); if (ttl < 0) { continue; } if (ttl < 1) ttl = 1; } kv[non_expired++] = kv[j];
/* If cluster is enabled perform the cluster redirection here. * * 如果开启了集群模式,那么在这里进行转向操作。 * * However we don't perform the redirection if: * * 不过,如果有以下情况出现,那么节点不进行转向: * * 1) The sender of this command is our master. * 命令的发送者是本节点的主节点 * * 2) The command has no key arguments. * 命令没有 key 参数 */ if (server.cluster_enabled && !(c->flags & REDIS_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { int hashslot;
// 集群已下线 if (server.cluster->state != REDIS_CLUSTER_OK) { flagTransaction(c); addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n")); return REDIS_OK;
// 集群运作正常 } else { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); // 不能执行多键处理命令 if (n == NULL) { flagTransaction(c); if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { /* The request spawns mutliple keys in the same slot, * but the slot is not "stable" currently as there is * a migration or import in progress. */ addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); } else { redisPanic("getNodeByQuery() unknown error."); } return REDIS_OK;
/* Excerpt of getNodeByQuery(): only the branch dealing with a slot that this
 * node is importing is shown; the "..." lines stand for code the article
 * elided.
 *
 * In the branch shown, the function returns 'myself' when the request can be
 * served locally, or NULL after storing REDIS_CLUSTER_REDIR_UNSTABLE in
 * *error_code (only when error_code is not NULL). */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {

    ...

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            /* error_code is optional: only written when the caller
             * supplied a location for it. */
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    ...
}
/* This is executed 10 times every second (by default once every 100
 * milliseconds).
 *
 * Excerpt: only the check that drops a link stuck waiting for a PONG is
 * shown; the "..." lines stand for code the article elided. */
void clusterCron(void) {
    ...

    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        ...

        /* If we are waiting for the PONG more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            /* ping_sent holds the time at which the pending PING was sent. */
            node->ping_sent && /* we already sent a ping */
            node->pong_received < node->ping_sent && /* still waiting pong */
            /* and we are waiting for the pong more than timeout/2 */
            now - node->ping_sent > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically.
             * NOTE(review): presumably freeClusterLink() leaves node->link
             * NULL so that a later clusterCron() run re-creates the link --
             * confirm against freeClusterLink(). */
            freeClusterLink(node->link);
        }

        ...
    }

    ...
}
/* This is executed 10 times every second (by default once every 100
 * milliseconds).
 *
 * Excerpt: only the loop that (re)creates links towards nodes that have no
 * link is shown; the "..." lines stand for code the article elided. */
void clusterCron(void) {

    ...

    /* Check if we have disconnected nodes and re-establish the connection. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Skip ourselves and nodes without a known address. */
        if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

        ...

        /* Create a link for every node that does not have one. */
        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR,
                server.bindaddr_count ? server.bindaddr[0] : NULL);
            if (fd == -1) {
                /* Could not connect: log it and move on to the next node. */
                redisLog(REDIS_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->port+REDIS_CLUSTER_PORT_INCR,
                    server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }

            /* We can clear the flag after the first packet is sent.
             *
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            node->flags &= ~REDIS_NODE_MEET;

            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }
    }

    ...
}
// 遍历所有节点,检查是否需要将某个节点标记为下线 /* Iterate nodes to check if we need to flag something as failing. * This loop is also responsible to: * 1) Check if there are orphaned masters (masters without non failing * slaves). * 2) Count the max number of non failing slaves for a single master. * 3) Count the number of slaves for our master, if we are a slave. */ orphaned_masters = 0; max_slaves = 0; this_slaves = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { ...
/* Check only if we have an active ping for this instance. */ // 以下代码只在节点发送了 PING 命令的情况下执行 if (node->ping_sent == 0) continue;
/* Compute the delay of the PONG. Note that if we already received * the PONG, then node->ping_sent is zero, so can't reach this * code at all. */ // 计算等待 PONG 回复的时长 delay = now - node->ping_sent;
// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线) if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing", node->name); // 打开疑似下线标记 node->flags |= REDIS_NODE_PFAIL; update_state = 1; } } } ... }
// 这是一条 PING 消息或者 MEET 消息 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
/* Add this node if it is new for us and the msg type is MEET. * * 如果当前节点是第一次遇见这个节点,并且对方发来的是 MEET 信息, * 那么将这个节点添加到集群的节点列表里面。 * * In this stage we don't try to add the node with the right * flags, slaveof pointer, and so forth, as this details will be * resolved when we'll receive PONGs from the node. * * 节点目前的 flag 、 slaveof 等属性的值都是未设置的, * 等当前节点向对方发送 PING 命令之后, * 这些信息可以从对方回复的 PONG 信息中取得。 */ if (!sender && type == CLUSTERMSG_TYPE_MEET) { clusterNode *node;
// 遍历所有节点的信息 while(count--) { sds ci = sdsempty();
// 分析节点的 flag uint16_t flags = ntohs(g->flags);
// 信息节点 clusterNode *node;
...
/* Update our state accordingly to the gossip sections */ // 使用消息中的信息对节点进行更新 node = clusterLookupNode(g->nodename); // 节点已经存在于当前节点 if (node) { /* We already know this node. Handle failure reports, only when the sender is a master. */ // 如果 sender 是一个主节点,那么我们需要处理下线报告 if (sender && nodeIsMaster(sender) && node != myself) { // 节点处于 FAIL 或者 PFAIL 状态 if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {
// 添加 sender 对 node 的下线报告 if (clusterNodeAddFailureReport(node,sender)) { redisLog(REDIS_VERBOSE, "Node %.40s reported node %.40s as not reachable.", sender->name, node->name); }
// 如果 sender 曾经发送过对 node 的下线报告 // 那么清除该报告 if (clusterNodeDelFailureReport(node,sender)) { redisLog(REDIS_VERBOSE, "Node %.40s reported node %.40s is back online.", sender->name, node->name); } } }
/* If we already know this node, but it is not reachable, and * we see a different address in the gossip section, start an * handshake with the (possibly) new address: this will result * into a node address update if the handshake will be * successful. */ // 如果节点之前处于 PFAIL 或者 FAIL 状态 // 并且该节点的 IP 或者端口号已经发生变化 // 那么可能是节点换了新地址,尝试对它进行握手 if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) && (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port))) { clusterStartHandshake(g->ip,ntohs(g->port)); }
// 当前节点不认识 node } else { /* If it's not in NOADDR state and we don't have it, we * start a handshake process against this IP/PORT pairs. * * 如果 node 不在 NOADDR 状态,并且当前节点不认识 node * 那么向 node 发送 HANDSHAKE 消息。 * * Note that we require that the sender of this gossip message * is a well known node in our cluster, otherwise we risk * joining another cluster. * * 注意,当前节点必须保证 sender 是本集群的节点, * 否则我们将有加入了另一个集群的风险。 */ if (sender && !(flags & REDIS_NODE_NOADDR) && !clusterBlacklistExists(g->nodename)) { clusterStartHandshake(g->ip,ntohs(g->port)); } }
/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected we also inform the whole cluster about this
 * event trying to force every other node to set the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the majority
 * of masters state during some time, and even if we force agreement by
 * propagating the FAIL message, because of partitions we may not reach every
 * node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will propagate
 *    to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and the
 *    FAIL flag will be cleared after some time.
 */
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    /* Strict majority of the cluster size, the same formula used by
     * clusterHandleSlaveFailover(). */
    int needed_quorum = (server.cluster->size / 2) + 1;

    /* Condition 2 above: nothing to do unless we flagged the node as PFAIL
     * ourselves, or if it is already FAIL. */
    if (!(node->flags & REDIS_NODE_PFAIL)) return; /* We can reach it. */
    if (node->flags & REDIS_NODE_FAIL) return; /* Already FAILing. */

    /* Start from the failure reports collected from the other masters;
     * without this initialization 'failures' would be read uninitialized. */
    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    redisLog(REDIS_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    node->flags &= ~REDIS_NODE_PFAIL;
    node->flags |= REDIS_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. Only done when we are a
     * master. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
/* Send a FAIL message to all the nodes we are able to contact. * * 向当前节点已知的所有节点发送 FAIL 信息。 */ void clusterSendFail(char *nodename) { unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg *) buf;
/* Send a message to all the nodes that are part of the cluster having * a connected link. * * 向节点连接的所有其他节点发送信息。 */ void clusterBroadcastMessage(void *buf, size_t len) { dictIterator *di; dictEntry *de;
// 遍历所有已知节点 di = dictGetSafeIterator(server.cluster->nodes); while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de);
/* Abourt a manual failover if the timeout is reached. */ manualFailoverCheckTimeout();
// 如果当前节点是子节点 if (nodeIsSlave(myself)) { clusterHandleManualFailover(); // 处理集群子节点的故障迁移 clusterHandleSlaveFailover(); /* If there are orphaned slaves, and we are a slave among the masters * with the max number of non-failing slaves, consider migrating to * the orphaned masters. Note that it does not make sense to try * a migration if there is no master with at least *two* working * slaves. */ if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) clusterHandleSlaveMigration(max_slaves); }
// 更新集群状态 if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL) clusterUpdateState(); }
/* This function is called if we are a slave node and our master serving
 * a non-zero amount of hash slots is in FAIL state.
 *
 * The goal of this function is:
 * 1) To check if we are able to perform a failover, is our data updated?
 * 2) Try to get elected by masters.
 * 3) Perform the failover informing all the other nodes.
 *
 * Excerpt: the "..." lines stand for code the article elided. */
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    int j;
    mstime_t auth_timeout, auth_retry_time;

    /* Compute the failover timeout (the max time we have to send votes
     * and wait for replies), and the failover retry time (the time to wait
     * before trying again).
     *
     * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
     * Retry is two times the Timeout. */
    auth_timeout = server.cluster_node_timeout * 2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout * 2;

    /* Pre conditions to run the function, that must be met both in case
     * of an automatic or manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) It is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        myself->slaveof->numslots == 0) return;

    ...

    /* Set data_age to the number of milliseconds we are disconnected from
     * the master (the inputs have second resolution, hence the * 1000). */
    if (server.repl_state == REDIS_REPL_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age as it is fine that we are
     * disconnected from our master at least for the time it was down to be
     * flagged as FAIL, that's the baseline. */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough. The limit is the slave ping
     * period plus the node timeout multiplied by
     * REDIS_CLUSTER_SLAVE_VALIDITY_MULT, since the cluster should
     * react much faster to a master down.
     *
     * Check bypassed for manual failovers. */
    if (data_age >
        ((mstime_t)server.repl_ping_slave_period * 1000) +
        (server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT))
    {
        if (!manual_failover) return;
    }

    /* If the previous failover attempt timed out and the retry time has
     * elapsed, we can setup a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 500 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that have a probably
         * less updated replication offset, are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
        }
        redisLog(REDIS_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll update their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }

    ...

    /* Return ASAP if we can't still start the election: the scheduled
     * election time has not been reached yet. */
    if (mstime() < server.cluster->failover_auth_time) return;

    /* Check if we reached the quorum: with enough votes we fail over the
     * failing master. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* The master we are replacing. */
        clusterNode *oldmaster = myself->slaveof;

        redisLog(REDIS_WARNING,
            "Failover election won: I'm the new master.");

        /* We have the quorum, perform all the steps to correctly promote
         * this slave to a master.
         *
         * 1) Turn this node into a master. */
        clusterSetNodeAsMaster(myself);
        /* Stop replicating from the old master. */
        replicationUnsetMaster();

        /* 2) Claim all the slots assigned to our master. */
        for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
            if (clusterNodeGetSlotBit(oldmaster, j)) {
                /* Release the slot, then assign it to ourselves. */
                clusterDelSlot(j);
                clusterAddSlot(myself, j);
            }
        }

        /* 3) Update my configEpoch to the epoch of the election. */
        myself->configEpoch = server.cluster->failover_auth_epoch;

        /* 4) Update state and save config. */
        clusterUpdateState();
        clusterSaveConfigOrDie(1);

        /* 5) Pong all the other nodes so that they can update the state
         *    accordingly and detect that we switched to master role. */
        clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

        /* 6) If there was a manual failover in progress, clear the state. */
        resetManualFailover();
    }
}
/* 5) Pong all the other nodes so that they can update the state * accordingly and detect that we switched to master role. */ // 向所有节点发送 PONG 信息 // 让它们可以知道当前节点已经升级为主节点了 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);