struct SDS<T> {
    T capacity;     // allocated capacity of the buffer
    T len;          // current length of the string
    byte flags;     // special flag bits; they can be ignored here
    byte[] content; // the actual bytes of the string
}
The following function appends the buffer t to the sds string s, growing the allocation first if the remaining capacity is insufficient:
/* Append the specified binary-safe string pointed by 't' of 'len' bytes to the
 * end of the specified sds string 's'.
 *
 * After the call, the passed sds string is no longer valid and all the
 * references must be substituted with the new pointer returned by the call. */
sds sdscatlen(sds s, const void *t, size_t len) {
    size_t curlen = sdslen(s); // current length of the original string

    s = sdsMakeRoomFor(s,len); // grow the allocation if needed
    if (s == NULL) return NULL;
    memcpy(s+curlen, t, len);  // copy 't' after the existing content
    sdssetlen(s, curlen+len);  // update the stored length
    s[curlen+len] = '\0';      // keep the buffer NULL-terminated
    return s;
}
    // create the epoll instance
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
// 1. Check whether any time events have reached their scheduled time.
// 2. If so, fire those events.
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * If flags has AE_ALL_EVENTS set, all kinds of events are processed.
 * If flags has AE_FILE_EVENTS set, file events are processed.
 * If flags has AE_TIME_EVENTS set, time events are processed.
 * If flags has AE_DONT_WAIT set, the function returns as soon as all
 * the events that can be processed without waiting have been processed.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // look up the nearest pending time event
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // A time event exists: the gap between its scheduled time and
            // now bounds how long we may block waiting for file events.
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            // Compute how long until the nearest time event is due and
            // store that interval in the tv structure.
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // do not block on file events at all
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // block until a file event arrives
                tvp = NULL; /* wait forever */
            }
        }
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // readable event
            if (fe->mask & mask & AE_READABLE) {
                // rfired prevents the same handler from firing twice when
                // the read and write callbacks are the same function
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // writable event
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }

    /* Check time events */
    // fire the time events that are due
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
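To show how this loop is typically driven, here is a minimal usage sketch. It assumes the ae.h API from the same source tree (aeCreateEventLoop, aeCreateFileEvent, aeMain); echoHandler and watchFd are made up for illustration:

#include <unistd.h>
#include "ae.h"

/* Hypothetical handler: echo whatever arrives back to the client. */
static void echoHandler(aeEventLoop *el, int fd, void *clientData, int mask) {
    char buf[1024];
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n > 0) write(fd, buf, (size_t)n);
}

/* 'fd' is assumed to be an already connected non-blocking socket. */
void watchFd(aeEventLoop *el, int fd) {
    /* Ask the loop to invoke echoHandler whenever fd becomes readable;
     * aeProcessEvents() dispatches it from its fired-events loop. */
    aeCreateFileEvent(el, fd, AE_READABLE, echoHandler, NULL);
}

int main(void) {
    aeEventLoop *el = aeCreateEventLoop(1024); /* setsize: max tracked fds */
    /* ... accept connections and call watchFd(el, fd) for each ... */
    aeMain(el); /* repeatedly calls aeProcessEvents(el, AE_ALL_EVENTS) */
    return 0;
}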
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...

    /* Run the Sentinel timer if we are in sentinel mode. */
    // When the server is running in sentinel mode, run the Sentinel
    // main function.
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    ...

    // Depending on timing and the instance type, send PING, INFO or
    // PUBLISH commands to the instance.
    sentinelSendPeriodicCommands(ri);
... }
// Depending on timing and the instance type, send INFO, PING or PUBLISH
// commands to the instance.
/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
 * the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO pending, or
     * in the case the instance is not properly connected. */
    // do nothing while the link to the instance is not established
    if (ri->flags & SRI_DISCONNECTED) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected). */
    // To avoid flooding an instance that is in a bad state, Sentinel only
    // sends commands while the number of pending commands stays below
    // SENTINEL_MAX_PENDING_COMMANDS.
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
     * are turned into masters by another Sentinel, or by the sysadmin. */
    // By default Sentinel sends INFO to a slave every SENTINEL_INFO_PERIOD
    // milliseconds, but while the slave's master is objectively down or a
    // failover is in progress, the INFO period drops to one second so that
    // changes on the slave are noticed more quickly.
    if ((ri->flags & SRI_SLAVE) &&
        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    /* We ping instances every time the last received pong is older than
     * the configured 'down-after-milliseconds' time, but every second
     * anyway if 'down-after-milliseconds' is greater than 1 second. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    // If the instance is not a Sentinel (i.e. it is a master or a slave),
    // and either we have never received an INFO reply from it, or more
    // than info_period has passed since its last INFO reply, send it an
    // INFO command.
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        /* Send INFO to masters and slaves, not sentinels. */
        retval = redisAsyncCommand(ri->cc,
            sentinelInfoReplyCallback, NULL, "INFO");
        if (retval == REDIS_OK) ri->pending_commands++;
    } else if ((now - ri->last_pong_time) > ping_period) {
        /* Send PING to all the three kinds of instances. */
        sentinelSendPing(ri);
    } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        /* PUBLISH hello messages to all the three kinds of instances. */
        sentinelSendHello(ri);
    }
}
    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. */
    // While Sentinel is in TILT mode, failure detection is not performed.
    if (sentinel.tilt) {

        // if the TILT period has not expired yet, take no action
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        ...
    }

    /* Every kind of instance */
    // check whether the instance has entered the SDOWN state
    sentinelCheckSubjectivelyDown(ri);
... }
/* Is this instance down from our point of view? */
// Check whether the instance is down, from this Sentinel's point of view.
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
...
    /* Update the SDOWN flag. We believe the instance is SDOWN if:
     *
     * 1) It is not replying.
     * 2) We believe it is a master, it reports to be a slave for enough time
     *    to meet the down_after_period, plus enough time to get two times
     *    INFO report from the instance. */
    // Sentinel considers the instance subjectively down if either it has
    // stopped replying, or Sentinel believes it is a master but it has been
    // reporting itself as a slave for longer than down_after_period plus
    // two INFO periods.
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            // emit the +sdown event
            sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
            // record when the SDOWN state started
            ri->s_down_since_time = mstime();
            // raise the SDOWN flag
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        // clear the SDOWN state, if any
        if (ri->flags & SRI_S_DOWN) {
            // emit the -sdown event
            sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
            // clear the related flags
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}
    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
     * are turned into masters by another Sentinel, or by the sysadmin. */
    // By default Sentinel sends INFO to a slave every SENTINEL_INFO_PERIOD
    // milliseconds, but while the slave's master is objectively down or a
    // failover is in progress, the INFO period drops to one second so that
    // changes on the slave are noticed more quickly.
    if ((ri->flags & SRI_SLAVE) &&
        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    ...

    // If the instance is not a Sentinel (i.e. it is a master or a slave),
    // and either we have never received an INFO reply from it, or more
    // than info_period has passed since its last INFO reply, send it an
    // INFO command.
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        /* Send INFO to masters and slaves, not sentinels. */
        retval = redisAsyncCommand(ri->cc,
            sentinelInfoReplyCallback, NULL, "INFO");
        if (retval == REDIS_OK) ri->pending_commands++;
    } else if ...
}
/* The following fields are used to take the slave state on elections. */
// These fields are used during failover elections.

// Time of the previous election, or of the next scheduled one.
mstime_t failover_auth_time; /* Time of previous or next election. */

// Number of votes this node has received so far.
int failover_auth_count; /* Number of votes received so far. */

// Non-zero if this node has already asked the other nodes for votes.
int failover_auth_sent; /* True if we already asked for votes. */

int failover_auth_rank; /* This slave rank for current auth request. */

uint64_t failover_auth_epoch; /* Epoch of the current election. */

/* Manual failover state in common. */
// Manual failover state shared by masters and slaves.

// Time limit for the manual failover to complete.
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
                    It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF
                               or zero if still not received. */
// Non-zero when the manual failover may start requesting the masters' votes.
int mf_can_start; /* If non-zero signal that the manual failover
                     can start requesting masters vote. */

/* The following fields are used by masters to take state on elections. */

// Epoch of the last vote this node granted.
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */

// Work to do before the next event loop iteration, recorded as flags.
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */

// Number of messages sent via the cluster bus.
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */

// Number of messages received via the cluster bus.
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
    /* If cluster is enabled perform the cluster redirection here.
     *
     * However we don't perform the redirection if:
     *
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    // When cluster mode is enabled, perform the redirection here, unless
    // the sender of the command is this node's master, or the command has
    // no key arguments.
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        // the cluster is down
        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use "
                                 "CLUSTER INFO for more information\r\n"));
            return REDIS_OK;

        // the cluster is up and running
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                            &hashslot,&error_code);
            // the multi-key command cannot be served
            if (n == NULL) {
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't "
                                         "hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spans multiple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request "
                                         "during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;
The granularity of partitioning is the key: a dataset that lives under one very large key, such as a huge sorted set, cannot be split across shards.
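This key-level granularity is also what decides which node serves a key. A sketch following the logic of keyHashSlot() in cluster.c (crc16() is the helper the real code uses): only the part inside a non-empty {...} "hash tag" is hashed, so related keys can be forced into the same slot.

#include <stdint.h>

uint16_t crc16(const char *buf, int len); /* provided by crc16.c in Redis */

/* Map a key to one of the 16384 hash slots. If the key contains a
 * non-empty {...} section, only that hash tag is hashed, so that e.g.
 * {user:1000}.following and {user:1000}.followers share a slot. */
unsigned int keyHashSlot(const char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{'? Hash the whole key. */
    if (s == keylen) return crc16(key,keylen) & 16383;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {}? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 16383;

    /* Hash only what is between { and }. */
    return crc16(key+s+1,e-s-1) & 16383;
}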
/* This is executed 10 times every second */
// Cluster housekeeping function: by default it runs 10 times per second,
// i.e. once every 100 milliseconds.
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    // iteration counter (a static variable)
    static unsigned long long iteration = 0;
    mstime_t handshake_timeout;

    // count this call
    iteration++; /* Number of times this function was called so far. */

    /* The handshake timeout is the time after which a handshake node that was
     * not turned into a normal node is removed from the nodes. Usually it is
     * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
     * the value of 1 second. */
    // A node that is still in handshake state after the handshake timeout is
    // removed from the nodes table. The timeout normally equals NODE_TIMEOUT,
    // but never less than 1 second.
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;
    /* Check if we have disconnected nodes and re-establish the connection. */
    // Re-establish connections to disconnected or not-yet-connected nodes.
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        // skip this node itself and nodes without a known address
        if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

        /* A Node in HANDSHAKE state has a limited lifespan equal to the
         * configured node timeout. */
        // free the node if its handshake has timed out
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            freeClusterNode(node);
            continue;
        }

        // create a connection for nodes that do not have one yet
        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR,
                server.bindaddr_count ? server.bindaddr[0] : NULL);
            if (fd == -1) {
                redisLog(REDIS_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->port+REDIS_CLUSTER_PORT_INCR,
                    server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            // Send a PING over the new link right away so the node is not
            // wrongly considered failing; send MEET instead of PING if the
            // node carries the MEET flag.
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

            // This is not the first PING sent to this node, so restore the
            // previous send time that the clusterSendPing() call above
            // overwrote.
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }

            /* We can clear the flag after the first packet is sent.
             *
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            // Clear the MEET flag once the MEET message has been sent. If no
            // PONG ever comes back, no further packets are sent to the node;
            // once a PONG is received and the handshake is over, normal PING
            // packets are sent.
            node->flags &= ~REDIS_NODE_MEET;

            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }
    }
    dictReleaseIterator(di);
    /* Ping some random node 1 time every 10 iterations, so that we usually ping
     * one random node every second. */
    // Every 10 calls to clusterCron() (roughly once per second), send a
    // gossip-carrying PING to one random node.
    if (!(iteration % 10)) {
        int j;

        /* Check a few random nodes and ping the one with the oldest
         * pong_received time. */
        // sample 5 random nodes and pick one of them
        for (j = 0; j < 5; j++) {

            // pick a random node from the cluster
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            // Skip disconnected nodes and nodes we have recently pinged.
            if (this->link == NULL || this->ping_sent != 0) continue;

            if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
                continue;
            ...
        }
    }
    /* Iterate nodes to check if we need to flag something as failing.
     * This loop is also responsible to:
     * 1) Check if there are orphaned masters (masters without non failing
     *    slaves).
     * 2) Count the max number of non failing slaves for a single master.
     * 3) Count the number of slaves for our master, if we are a slave. */
    // Walk all nodes and check whether any of them must be flagged as
    // failing.
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        // skip this node itself, nodes without an address, and nodes that
        // are still in handshake state
        if (node->flags &
            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            if (okslaves == 0 && node->numslots > 0) orphaned_masters++;
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }

        /* If we are waiting for the PONG more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        // If we have been waiting for a PONG for more than half the node
        // timeout, drop and recreate the connection: the node may still be
        // alive, but the link itself may have gone bad.
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            node->pong_received < node->ping_sent && /* still waiting pong */
            /* and we are waiting for the pong more than timeout/2 */
            now - node->ping_sent > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            // free the link; the next clusterCron() will reconnect it
            freeClusterLink(node->link);
        }

        /* If we have currently no active ping in this instance, and the
         * received PONG is older than half the cluster timeout, send
         * a new ping now, to ensure all the nodes are pinged without
         * a too big delay. */
        // If no PING is currently pending for this node, and we have not
        // received a PONG from it for more than half the node timeout, send
        // a PING now so its information does not grow stale (some nodes may
        // simply never be picked by the random selection above).
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* If we are a master and one of the slaves requested a manual
         * failover, ping it continuously. */
        // If this node is a master and one of its slaves has requested a
        // manual failover, keep pinging that slave.
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* Check only if we have an active ping for this instance. */
        // the code below only runs when a PING is pending for this node
        if (node->ping_sent == 0) continue;

        /* Compute the delay of the PONG. Note that if we already received
         * the PONG, then node->ping_sent is zero, so can't reach this
         * code at all. */
        // compute how long we have been waiting for the PONG
        delay = now - node->ping_sent;

        // If the PONG has been pending longer than the limit, flag the node
        // as PFAIL (possibly failing).
        if (delay > server.cluster_node_timeout) {
            /* Timeout reached. Set the node as possibly failing if it is
             * not already in this state. */
            if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
                redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                // raise the possibly-failing flag
                node->flags |= REDIS_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);
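    /* To make the timing above concrete: with the default
     * cluster-node-timeout of 15000 ms, a stale link is dropped and
     * recreated once a PONG has been pending for more than 7.5 seconds,
     * an idle node is re-pinged after 7.5 seconds without a PONG, and an
     * unanswered PING marks the peer as PFAIL after 15 seconds. */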
    /* If we are a slave node but the replication is still turned off,
     * enable it if we know the address of our master and it appears to
     * be up. */
    // If this node is a slave but replication is not configured yet, point
    // replication at its master.
    if (nodeIsSlave(myself) &&
        server.masterhost == NULL &&
        myself->slaveof &&
        nodeHasAddr(myself->slaveof))
    {
        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
    }

    /* Abort a manual failover if the timeout is reached. */
    manualFailoverCheckTimeout();

    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        clusterHandleSlaveFailover();
        /* If there are orphaned slaves, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }

    // update the cluster state
    if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
        clusterUpdateState();
}
2. State information is not synchronized in full. Instead, each node randomly selects M nodes (M << N) and gossips their state to the other nodes. M is at least 3 and at most N - 2; in typical deployments M = N / 10.
/* Send a PING or PONG packet to the specified node, making sure to add enough
 * gossip information. */
// Send a MEET, PING or PONG message to the specified node.
void clusterSendPing(clusterLink *link, int type) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;
    int gossipcount = 0, totlen;
    /* freshnodes is the number of nodes we can still use to populate the
     * gossip section of the ping packet. Basically we start with the nodes
     * we have in memory minus two (ourself and the node we are sending the
     * message to). Every time we add a node we decrement the counter, so when
     * it will drop to <= zero we know there is no more gossip info we can
     * send. */
    // freshnodes counts how many nodes are still available for the gossip
    // section: the size of the nodes table minus two (this node itself and
    // the receiver of the message). The counter is decremented each time a
    // gossip entry is added; once it drops to <= 0 there is nothing left
    // to gossip about.
    int freshnodes = dictSize(server.cluster->nodes)-2;

    // If this is a PING, record the time at which it was sent.
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
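The gossip-section fill that follows in the function (elided above) looks roughly like this in the 3.0-era source, randomly sampling nodes and skipping entries that would carry no useful information to the receiver:

    while(freshnodes > 0 && gossipcount < 3) {
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        /* Don't gossip about ourselves, nodes in handshake state, or nodes
         * without an address: their entry would be useless. */
        if (this == myself ||
            this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR))
        {
            freshnodes--; /* otherwise we may loop forever */
            continue;
        }

        /* ... skip nodes already added, then copy name, ip, port, flags and
         * ping/pong timestamps into hdr->data.ping.gossip[gossipcount] ... */
        freshnodes--;
        gossipcount++;
    }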
    /* Create RESTORE payload and generate the protocol to call the command. */
    for (j = 0; j < num_keys; j++) {
        long long ttl = 0;
        long long expireat = getExpire(c->db,kv[j]);

        // check whether the key has already expired
        if (expireat != -1) {
            ttl = expireat-mstime();
            if (ttl < 0) {
                continue;
            }
            if (ttl < 1) ttl = 1;
        }
        kv[non_expired++] = kv[j];
    /* If cluster is enabled perform the cluster redirection here.
     *
     * However we don't perform the redirection if:
     *
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    // When cluster mode is enabled, perform the redirection here, unless
    // the sender of the command is this node's master, or the command has
    // no key arguments.
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        // the cluster is down
        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use "
                                 "CLUSTER INFO for more information\r\n"));
            return REDIS_OK;

        // the cluster is up and running
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                            &hashslot,&error_code);
            // the multi-key command cannot be served
            if (n == NULL) {
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't "
                                         "hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spans multiple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request "
                                         "during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd,
                            robj **argv, int argc,
                            int *hashslot, int *error_code)
{
...
    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }
    ...
}
/* This is executed 10 times every second */
// Cluster housekeeping function: by default it runs 10 times per second,
// i.e. once every 100 milliseconds.
void clusterCron(void) {
    ...

    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;
...
        /* If we are waiting for the PONG more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        // check whether something is wrong with this node's link
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            // ping_sent records when the PING was sent
            node->ping_sent && /* we already sent a ping */
            node->pong_received < node->ping_sent && /* still waiting pong */
            /* and we are waiting for the pong more than timeout/2 */
            // the PONG has been pending for more than half of node_timeout
            now - node->ping_sent > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            // Free the link (node->link becomes NULL); the next
            // clusterCron() will reconnect automatically.
            freeClusterLink(node->link);
        }
        ...
    }
    ...
}
/* This is executed 10 times every second */
// Cluster housekeeping function: by default it runs 10 times per second,
// i.e. once every 100 milliseconds.
void clusterCron(void) {
...
    /* Check if we have disconnected nodes and re-establish the connection. */
    // Re-establish connections to disconnected or not-yet-connected nodes.
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        // skip this node itself and nodes without a known address
        if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
...
        // create a connection for nodes that do not have one yet
        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR,
                server.bindaddr_count ? server.bindaddr[0] : NULL);
            if (fd == -1) {
                redisLog(REDIS_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->port+REDIS_CLUSTER_PORT_INCR,
                    server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            // Send a PING over the new link right away so the node is not
            // wrongly considered failing; send MEET instead of PING if the
            // node carries the MEET flag.
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

            // This is not the first PING sent to this node, so restore the
            // previous send time that the clusterSendPing() call above
            // overwrote.
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }

            /* We can clear the flag after the first packet is sent.
             *
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            // Clear the MEET flag once the MEET message has been sent. If no
            // PONG ever comes back, no further packets are sent to the node;
            // once a PONG is received and the handshake is over, normal PING
            // packets are sent.
            node->flags &= ~REDIS_NODE_MEET;

            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }
    }
    ...
}
    /* Iterate nodes to check if we need to flag something as failing.
     * This loop is also responsible to:
     * 1) Check if there are orphaned masters (masters without non failing
     *    slaves).
     * 2) Count the max number of non failing slaves for a single master.
     * 3) Count the number of slaves for our master, if we are a slave. */
    // Walk all nodes and check whether any of them must be flagged as
    // failing.
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        ...

        /* Check only if we have an active ping for this instance. */
        // the code below only runs when a PING is pending for this node
        if (node->ping_sent == 0) continue;

        /* Compute the delay of the PONG. Note that if we already received
         * the PONG, then node->ping_sent is zero, so can't reach this
         * code at all. */
        // compute how long we have been waiting for the PONG
        delay = now - node->ping_sent;

        // If the PONG has been pending longer than the limit, flag the node
        // as PFAIL (possibly failing).
        if (delay > server.cluster_node_timeout) {
            /* Timeout reached. Set the node as possibly failing if it is
             * not already in this state. */
            if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
                redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                // raise the possibly-failing flag
                node->flags |= REDIS_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    ...
}
    // this is a PING or MEET message
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);

        /* Add this node if it is new for us and the msg type is MEET.
         *
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as this details will be
         * resolved when we'll receive PONGs from the node. */
        // If this is the first time we see this node and the message is a
        // MEET, add the node to the cluster's node table. Its flags,
        // slaveof pointer and other details are left unset for now; they
        // will be filled in from the PONGs the node sends back later.
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;
    // iterate over the information of every gossiped node
    while(count--) {
        sds ci = sdsempty();

        // extract the gossiped node's flags
        uint16_t flags = ntohs(g->flags);

        // the node described by this gossip entry
        clusterNode *node;
...
        /* Update our state accordingly to the gossip sections */
        // update our view of the node according to the gossip entry
        node = clusterLookupNode(g->nodename);

        // the node is already known to us
        if (node) {
            /* We already know this node. Handle failure reports, only when
             * the sender is a master. */
            // if the sender is a master, handle its failure report
            if (sender && nodeIsMaster(sender) && node != myself) {
                // the gossiped node is flagged as FAIL or PFAIL
                if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {

                    // record the sender's failure report about this node
                    if (clusterNodeAddFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                } else {
                    // If the sender had previously reported this node as
                    // failing, clear that report.
                    if (clusterNodeDelFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section, start an
             * handshake with the (possibly) new address: this will result
             * into a node address update if the handshake will be
             * successful. */
            // If the node was in PFAIL or FAIL state and the IP or port in
            // the gossip entry differs from what we have on record, the node
            // may have moved to a new address: try to handshake with it.
            if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
                (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }

        // this node is unknown to us
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * start a handshake process against this IP/PORT pairs.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            // If the gossiped node is not in NOADDR state and we do not know
            // it yet, start a handshake with its IP/port. Note that the
            // sender must already be a trusted node of our cluster, or we
            // would risk joining a different cluster.
            if (sender &&
                !(flags & REDIS_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }
        }
/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected we also inform the whole cluster about this
 * event trying to force every other node to set the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the majority
 * of masters state during some time, and even if we force agreement by
 * propagating the FAIL message, because of partitions we may not reach every
 * node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will propagate
 *    to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and the
 *    FAIL flag will be cleared after some time. */
// A node is flagged FAIL only when a majority of masters have recently
// reported it as PFAIL *and* this node also sees it as PFAIL. The agreement
// is weak: the reports are collected over a window of time, and due to
// network partitions the FAIL message may not reach every node. Either the
// majority is reached and the FAIL state eventually propagates to the whole
// cluster, or there is no majority, no slave promotion is authorized, and
// the FAIL flag is cleared after a while.
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;

    ...

    /* Also count myself as a voter if I'm a master. */
    // if this node is a master, count its own PFAIL report too
    if (nodeIsMaster(myself)) failures++;

    // Fewer than a majority of the masters have reported the node as down:
    // it cannot be flagged FAIL yet.
    if (failures < needed_quorum) return; /* No weak agreement from masters. */
    redisLog(REDIS_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    // switch the node's flag from PFAIL to FAIL
    node->flags &= ~REDIS_NODE_PFAIL;
    node->flags |= REDIS_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. */
    // If this node is a master, broadcast the FAIL message so that the
    // other nodes flag it as FAIL as well.
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
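As a concrete example of the quorum arithmetic (needed_quorum is computed as (server.cluster->size / 2) + 1, as shown in clusterHandleSlaveFailover() below): in a cluster of 5 masters, needed_quorum is 5/2 + 1 = 3, so besides this node's own PFAIL flag at least two other masters must have recently reported the node as unreachable before it is promoted to FAIL.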
/* Send a FAIL message to all the nodes we are able to contact. */
// Broadcast a FAIL message for the given node to every node we know.
void clusterSendFail(char *nodename) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg *) buf;
/* Send a message to all the nodes that are part of the cluster having
 * a connected link. */
// Send a message to every other cluster node that has a connected link.
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;

    // iterate over all known nodes
    di = dictGetSafeIterator(server.cluster->nodes);
    while ((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
    /* Abort a manual failover if the timeout is reached. */
    manualFailoverCheckTimeout();

    // if this node is a slave
    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        // handle a pending slave failover, if any
        clusterHandleSlaveFailover();
        /* If there are orphaned slaves, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }

    // update the cluster state
    if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
        clusterUpdateState();
}
/* This function is called if we are a slave node and our master serving
 * a non-zero amount of hash slots is in FAIL state.
 *
 * The goal of this function is:
 * 1) To check if we are able to perform a failover, is our data updated?
 * 2) Try to get elected by masters.
 * 3) Perform the failover informing all the other nodes. */
// Called when this node is a slave and the master it replicates, which
// serves at least one hash slot, is in FAIL state. It checks whether this
// slave's data is fresh enough to attempt a failover, tries to win the
// election among the masters, and then performs the failover while
// informing all the other nodes.
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    int j;
    mstime_t auth_timeout, auth_retry_time;

    /* Compute the failover timeout (the max time we have to send votes
     * and wait for replies), and the failover retry time (the time to wait
     * before trying again after a failed attempt).
     *
     * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
     * Retry is two times the Timeout. */
    auth_timeout = server.cluster_node_timeout * 2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout * 2;
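    /* With the default cluster-node-timeout of 15000 ms this works out to
     * auth_timeout = 30000 ms and auth_retry_time = 60000 ms; the 2000 ms
     * floor only matters when the node timeout is configured below one
     * second. */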
    /* Pre conditions to run the function, that must be met both in case
     * of an automatic or manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) It is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        myself->slaveof->numslots == 0) return;
    ...

    /* Set data_age to the number of seconds we are disconnected from
     * the master. */
    // set data_age to how long this slave has been disconnected from its
    // master, in milliseconds
    if (server.repl_state == REDIS_REPL_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age as it is fine that we are
     * disconnected from our master at least for the time it was down to be
     * flagged as FAIL, that's the baseline. */
    // Subtract the node timeout from data_age: being disconnected for at
    // least the time it took to flag the master as FAIL is expected.
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough. For now we just use a fixed
     * constant of ten times the node timeout since the cluster should
     * react much faster to a master down.
     *
     * Check bypassed for manual failovers. */
    // Check whether this slave's data is fresh enough to fail over: the
    // disconnection time must not exceed the replication ping period plus
    // ten times the node timeout (REDIS_CLUSTER_SLAVE_VALIDITY_MULT).
    if (data_age >
        ((mstime_t)server.repl_ping_slave_period * 1000) +
        (server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT))
    {
        if (!manual_failover) return;
    }
    /* If the previous failover attempt timedout and the retry time has
     * elapsed, we can setup a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 500 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that have a probably
         * less updated replication offset, are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
        }
        redisLog(REDIS_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll updated their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }
    ...
    /* Return ASAP if we can't still start the election. */
    // if it is not yet time to start the election, return
    if (mstime() < server.cluster->failover_auth_time) return;
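    /* The scheduled election start therefore works out to
     *     now + 500 ms + random(0..500) ms + rank * 1000 ms.
     * For example, the slave with the second-freshest replication offset
     * (rank 1) waits between 1.5 and 2 seconds, giving better-synced
     * slaves a head start in collecting votes. */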
    /* Check if we reached the quorum. */
    // If this node has collected enough votes, take over the failed master.
    if (server.cluster->failover_auth_count >= needed_quorum) {
        // the old master
        clusterNode *oldmaster = myself->slaveof;

        redisLog(REDIS_WARNING,
            "Failover election won: I'm the new master.");

        /* We have the quorum, perform all the steps to correctly promote
         * this slave to a master.
         *
         * 1) Turn this node into a master. */
        // switch this node's role from slave to master
        clusterSetNodeAsMaster(myself);
        // stop replicating: this node is now a master in its own right
        replicationUnsetMaster();

        /* 2) Claim all the slots assigned to our master. */
        // take over every slot the old master was serving
        for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
            if (clusterNodeGetSlotBit(oldmaster, j)) {
                // mark the slot as unassigned
                clusterDelSlot(j);
                // then assign it to this node
                clusterAddSlot(myself, j);
            }
        }

        /* 3) Update my configEpoch to the epoch of the election. */
        // update this node's config epoch to the epoch of the won election
        myself->configEpoch = server.cluster->failover_auth_epoch;

        /* 4) Update state and save config. */
        // update the node state
        clusterUpdateState();
        // and persist the cluster configuration file
        clusterSaveConfigOrDie(1);

        /* 5) Pong all the other nodes so that they can update the state
         *    accordingly and detect that we switched to master role. */
        // Broadcast a PONG to all nodes so they learn that this node has
        // been promoted to master.
        clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

        /* 6) If there was a manual failover in progress, clear the state. */
        // if a manual failover was in progress, clear its state
        resetManualFailover();
    }
}
public class ReferenceCountingGC {

    public Object instance = null;

    private static final int _1MB = 1024 * 1024;

    /**
     * The only purpose of this field is to occupy some memory, so that we
     * can tell from the GC log whether the object has been collected.
     */
    private byte[] bigSize = new byte[2 * _1MB];

    public static void testGC() {
        // create two objects
        ReferenceCountingGC objA = new ReferenceCountingGC();
        ReferenceCountingGC objB = new ReferenceCountingGC();

        // Give them circular references: a pure reference-counting
        // collector could never reclaim them.
        objA.instance = objB;
        objB.instance = objA;

        objA = null;
        objB = null;

        // If the two objects are collected here, HotSpot is clearly not
        // using reference counting.
        System.gc();
    }
}
Premature promotion

The following symptoms suggest that objects are being promoted to the old generation too early:

1) The allocation rate is close to the promotion rate, and objects are promoted at a young age.
2) The GC log contains messages such as "Desired survivor size 107347968 bytes, new threshold 1 (max 6)", which means objects are moved to the old generation after surviving a single GC.
3) Full GC is frequent, and the occupancy of the old generation changes dramatically across a single GC.

The root causes of premature promotion are usually that the Young/Eden generation is too small, or that the allocation rate is too high.
The promotion age is governed by the MaxTenuringThreshold parameter. If it is set too high, objects that ought to be promoted linger in the young generation, and every Young GC has to copy a large number of objects, defeating the purpose of the old generation. If it is set too low, large numbers of objects are promoted prematurely, defeating the purpose of the young generation. Because the heap composition and the object lifetime distribution differ between workloads, the effective tenuring threshold is adjusted dynamically; see /src/hotspot/share/gc/shared/ageTable.cpp#compute_tenuring_threshold.

There, HotSpot accumulates the space occupied by objects starting from the youngest age upward. As soon as adding the objects of some age n pushes the total above the Survivor target (survivor capacity * TargetSurvivorRatio / 100, with TargetSurvivorRatio defaulting to 50), the loop stops and n is compared with MaxTenuringThreshold: the effective threshold is the smaller of the two. When this dynamic adjustment triggers, more objects enter the old generation, wasting old-generation space. A condensed sketch of that loop follows.

If the cause is an undersized Young/Eden, adjust the generation ratio: with total heap size unchanged, enlarge the Young generation. As a rule of thumb the old generation should be about 2-3 times the size of the live object set (closer to 3x to allow for floating garbage), and the rest can go to the Young generation. If the cause is an excessive allocation rate, check whether some code path is loading classes too fast, or simply enlarge the metaspace to accommodate the rate.
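A condensed sketch of the dynamic-threshold computation (simplified from the logic in ageTable.cpp; sizes[], table_size and desired_survivor_size stand in for the fields the real code uses):

#include <stddef.h>

/* sizes[age] = total bytes occupied by surviving objects of that age;
 * desired_survivor_size = survivor_capacity * TargetSurvivorRatio / 100. */
unsigned int compute_tenuring_threshold(const size_t sizes[], int table_size,
                                        size_t desired_survivor_size,
                                        unsigned int max_tenuring_threshold) {
    size_t total = 0;
    unsigned int age = 1;

    while ((int)age < table_size) {
        total += sizes[age];                      /* accumulate youngest first */
        if (total > desired_survivor_size) break; /* survivor target exceeded */
        age++;
    }
    /* The effective threshold is the smaller of the computed age and the
     * configured MaxTenuringThreshold. */
    return age < max_tenuring_threshold ? age : max_tenuring_threshold;
}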
Old-generation exhaustion triggering a Full GC is more involved; it can happen in several ways:
3.1) Regular promotion moves objects into the old generation (governed by -XX:MaxTenuringThreshold), but the old generation has no room for them.
3.2) Large objects are allocated directly in the old generation (governed by -XX:PretenureSizeThreshold), but the old generation has no room.
3.3) Dynamic age determination promotes objects early: accumulating object sizes from the youngest age upward, once the total for all ages up to some age exceeds survivor capacity * -XX:TargetSurvivorRatio, every object of that age and above is moved into the old generation.
3.4) While copying from Eden and From Space into To Space, an object larger than the remaining To Space is moved directly into the old generation, and the old generation's free memory is smaller than the object.
-Xms20M -Xmx20M -Xmn10M -XX:SurvivorRatio=8 -XX:+PrintGCDetails -XX:+UseSerialGC

public class AllocEdenTest {

    private static final int _1MB = 1024 * 1024;

    public static void testAllocation() {
        byte[] alloc1, alloc2, alloc3, alloc4;
        alloc1 = new byte[2 * _1MB];
        alloc2 = new byte[2 * _1MB];
        alloc3 = new byte[2 * _1MB];
        alloc4 = new byte[4 * _1MB];
    }
    public static void main(String[] args) {
        testAllocation();
    }
}

GC log:

[GC (Allocation Failure) [DefNew: 7223K->685K(9216K), 0.0125141 secs] 7223K->4781K(19456K), 0.0125503 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]
Heap
 def new generation   total 9216K, used 7071K [0x00000007bec00000, 0x00000007bf600000, 0x00000007bf600000)
  eden space 8192K,  77% used [0x00000007bec00000, 0x00000007bf23c948, 0x00000007bf400000)
  from space 1024K,  66% used [0x00000007bf500000, 0x00000007bf5ab658, 0x00000007bf600000)
  to   space 1024K,   0% used [0x00000007bf400000, 0x00000007bf400000, 0x00000007bf500000)
 tenured generation   total 10240K, used 4096K [0x00000007bf600000, 0x00000007c0000000, 0x00000007c0000000)
   the space 10240K,  40% used [0x00000007bf600000, 0x00000007bfa00020, 0x00000007bfa00200, 0x00000007c0000000)
 Metaspace       used 2989K, capacity 4568K, committed 4864K, reserved 1056768K
  class space    used 318K, capacity 392K, committed 512K, reserved 1048576K
Young generation usable space: 9 MB = 8 MB (Eden) + 1 MB (one Survivor space).
Old generation usable space: 10 MB.
After alloc1, alloc2 and alloc3 have been allocated, there is no room left for alloc4; the allocation fails and a Minor GC runs. The Survivor "to" space is only 1 MB and cannot hold the 6 MB occupied by the three live objects, so the allocation guarantee moves alloc1 and alloc2 into the old generation (hence "tenured generation ... 40% used" in the log), after which alloc4 is allocated in Eden.
Large objects go directly into the tenured generation
Large objects require a large block of contiguous memory, and their appearance tends to trigger GC early in order to free up contiguous space for them. The -XX:PretenureSizeThreshold option controls how large an object must be to be allocated directly in the tenured generation. It defaults to 0, meaning every object, no matter how large, is first allocated in Eden. (This option is honored by the Serial and ParNew collectors, which is why the example below runs with -XX:+UseSerialGC.)
/**
 * -Xms20M -Xmx20M -Xmn10M
 * -XX:SurvivorRatio=8
 * -XX:+PrintGCDetails
 * -XX:+UseSerialGC
 * -XX:PretenureSizeThreshold=3145728
 */
public class AllocBigObjectTest {

    private static final int _1MB = 1024 * 1024;

    public static void main(String[] args) {
        byte[] alloc = new byte[5 * _1MB];
    }
}
Heap
 def new generation   total 9216K, used 1180K [0x00000007bec00000, 0x00000007bf600000, 0x00000007bf600000)
  eden space 8192K,  14% used [0x00000007bec00000, 0x00000007bed27010, 0x00000007bf400000)
  from space 1024K,   0% used [0x00000007bf400000, 0x00000007bf400000, 0x00000007bf500000)
  to   space 1024K,   0% used [0x00000007bf500000, 0x00000007bf500000, 0x00000007bf600000)
 tenured generation   total 10240K, used 5120K [0x00000007bf600000, 0x00000007c0000000, 0x00000007c0000000)
   the space 10240K,  50% used [0x00000007bf600000, 0x00000007bfb00010, 0x00000007bfb00200, 0x00000007c0000000)
 Metaspace       used 2662K, capacity 4486K, committed 4864K, reserved 1056768K
  class space    used 287K, capacity 386K, committed 512K, reserved 1048576K
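As the dump shows, the 5 MB array (larger than the 3145728-byte threshold) was allocated straight into the tenured generation, accounting for its 5120K / 50% usage, while Eden holds only the small objects created during startup and no GC was triggered.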
// If incremental collection failed, we just want to expand
// to the limit.
if (incremental_collection_failed()) {
  clear_incremental_collection_failed();
  grow_to_reserved();
  return;
}
// The heap has been compacted but not reset yet. // Any metric such as free() or used() will be incorrect.
CardGeneration::compute_new_size();
// Reset again after a possible resizing
if (did_compact()) {
  cmsSpace()->reset_after_compaction();
}
}
Improvements to the garbage collection algorithms
The copying algorithm
There are two regions, A and B: objects are initially allocated in A, and the objects that survive are moved to B. In practice the two regions are not split 1:1; instead memory is divided into one larger Eden space and two smaller Survivor spaces, with a default Eden-to-Survivor ratio of 8:1 in HotSpot (8:1:1 overall). A young-generation collection copies the live objects in Eden into the free Survivor space; the next GC copies the live objects of Eden and that Survivor into the other Survivor, so one Survivor space is always empty. When a Survivor space cannot hold all the survivors, the collection falls back on the old generation's allocation guarantee. A minimal sketch of the copying step appears below.
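A minimal, self-contained sketch of semispace copying under a made-up object model (Obj, the roots array and the semispaces are all hypothetical; a real collector also handles alignment, interior pointers and much more):

#include <stddef.h>
#include <string.h>

typedef struct Obj {
    size_t size;          /* total size of the object in bytes */
    size_t nslots;        /* number of reference fields */
    struct Obj *forward;  /* forwarding pointer once copied */
    struct Obj *slots[];  /* the reference fields themselves */
} Obj;

static char *to_space;   /* start of the destination semispace */
static char *alloc_ptr;  /* bump pointer inside to_space */

/* Copy one object into to-space, or return its copy if already moved. */
static Obj *copy_obj(Obj *o) {
    if (o == NULL) return NULL;
    if (o->forward) return o->forward;  /* already copied: reuse the copy */
    Obj *dst = (Obj *)alloc_ptr;        /* bump-pointer allocation */
    memcpy(dst, o, o->size);
    alloc_ptr += o->size;
    o->forward = dst;                   /* leave a forwarding pointer behind */
    return dst;
}

/* Cheney-style collection: copy the roots, then scan to-space, copying
 * every object reachable from what was already copied. */
void collect(Obj **roots, size_t nroots) {
    char *scan = alloc_ptr = to_space;
    for (size_t i = 0; i < nroots; i++)
        roots[i] = copy_obj(roots[i]);
    while (scan < alloc_ptr) {
        Obj *o = (Obj *)scan;
        for (size_t i = 0; i < o->nslots; i++)
            o->slots[i] = copy_obj(o->slots[i]);
        scan += o->size;
    }
}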
CMS is very sensitive to CPU resources. In fact, any concurrency-oriented program is sensitive to CPU resources: during the concurrent phases CMS does not pause user threads, but it does occupy some threads (that is, CPU capacity) and therefore slows the application down, lowering total throughput. CMS starts (number of CPUs + 3) / 4 collector threads by default, so with 4 or more CPUs the concurrent collection consumes at least 25% of CPU capacity, a share that shrinks as the CPU count grows. With fewer than 4 CPUs, however (say 2), the impact on the application can be severe: if the machine is already CPU-bound, handing half of the compute capacity to collector threads can suddenly cut the application's execution speed in half, which is hard to accept.
-XX:G1HeapRegionSize sets the size of a Region, in the range 1 MB to 32 MB; when it is not set explicitly, the size is derived from the minimum Java heap size so that the heap is divided into roughly 2048 Regions.
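For example, under the default setting a 4 GB heap works out to 4096 MB / 2048 = 2 MB per Region.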
Implementation principles - memory layout and GC algorithms
In G1 the heap is organized in a completely different way: it is divided into multiple equal-sized memory blocks called Regions, each of which is a logically contiguous stretch of memory:

[Figure: the heap divided into equal-sized Regions]

As the figure shows:
The young generation and the old generation are no longer contiguous areas of memory; each is simply a set of Regions.
To avoid scanning the whole heap, references coming from other Regions are recorded in a Remembered Set, and every Region has its own Remembered Set. When the VM detects a write to a Reference-typed field, it interposes a Write Barrier that briefly interrupts the store and checks whether the referenced object lives in a different Region; if so, the reference information is recorded in the Remembered Set of the Region containing the referenced object. During collection, adding the Remembered Sets to the GC-root enumeration guarantees that no reference is missed even without a full heap scan. A conceptual sketch of this barrier appears below.
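A conceptual sketch of the write barrier just described (Region, Obj and remembered_set_add are hypothetical; real G1 uses card tables and buffered update queues rather than this direct call):

#include <stdio.h>

typedef struct Region { int id; } Region;
typedef struct Obj { Region *region; } Obj;

/* Stub: a real remembered set is a per-Region data structure tracking
 * which outside locations point into the Region; here we just log. */
static void remembered_set_add(Region *target, Obj **field) {
    printf("RSet[region %d] += slot %p\n", target->id, (void *)field);
}

/* Post-write barrier: after storing a reference, check whether it crosses
 * a Region boundary, and if so record it in the Remembered Set of the
 * Region that contains the *referenced* object. */
static void write_barrier_store(Obj *holder, Obj **field, Obj *value) {
    *field = value;                      /* the actual reference store */
    if (value != NULL && value->region != holder->region)
        remembered_set_add(value->region, field);
}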
The size of a Region can be specified with the -XX:G1HeapRegionSize parameter; the only valid values are 1 MB, 2 MB, 4 MB, 8 MB, 16 MB and 32 MB, i.e. powers of two. If G1HeapRegionSize is left at its default, the actual Region size is computed when the heap is initialized. G1 manages the whole heap on its own, yet it can treat freshly created objects differently from objects that have survived for a while through multiple GCs, achieving better collection results. G1 provides three garbage collection modes, triggered under different conditions: Young GC, Mixed GC and Full GC.
Young GC
This is the young-generation collection. Ordinary objects (everything except humongous objects) are allocated in Eden Regions; when all Eden Regions are exhausted and an allocation request cannot be satisfied, a Young GC is triggered, much like the Young GC trigger in the earlier collectors. After a Young GC completes, live objects have been copied into Survivor Regions or promoted into Old Regions, and the freed Regions are returned to the free list for later reuse.
Mixed GC
As more and more objects are promoted into Old Regions, the VM eventually triggers a mixed collection, the Mixed GC, to keep heap memory from being exhausted. This is not an old-generation-only GC: besides all the Young Regions, it also collects a portion of the Old Regions. Note that it is a portion of the old generation, not all of it; G1 can choose which Old Regions to collect, and can thereby control how long the collection takes. The Mixed GC cycle resembles CMS and consists of the following main phases:
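In outline, following the standard description of G1's collection cycle (the phase list itself is an assumption here, since it does not survive in this copy):

1) Initial mark: marks objects directly reachable from GC roots; a short pause that piggybacks on a Young GC.
2) Concurrent mark: traverses the heap concurrently with the application to find live objects.
3) Final mark (remark): a short pause to process the marking records left over from the concurrent phase.
4) Cleanup / copying: sorts Regions by collection benefit, then evacuates live objects out of the chosen Young and Old Regions.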