Tallate

该吃吃该喝喝 啥事别往心里搁

Redis 为了达到最快的读写速度将数据都读到内存中,并通过异步的方式将数据写入磁盘。所以 redis 具有快速和数据持久化的特征。如果不将数据放在内存中,磁盘 I/O 速度为严重影响 redis 的性能。在内存越来越便宜的今天,redis 将会越来越受欢迎。
如果设置了最大使用的内存,则数据已有记录数达到内存限值后不能继续插入新值。
不过 Redis 也提供了持久化的选项。

阅读全文 »

基本数据结构

String

特点

  1. 最大能存储 512MB == 536870912 B(byte) ;
  2. 二进制安全,在传输数据的时候,能保证二进制数据的信息安全,也就是不会被篡改、破译;如果被攻击,能够及时检测出来
  3. 能存储各种类型的数据,字符串、数字,以至对象(通过json序列化)、位图等。

基本使用

容量:512M

1
2
set aa 'str'
get aa

操作总结

1
2
3
4
5
6
set/get/del/append/strlen key
incr/decr/incrby/decrby key
getrange key start end/setrange key offset value(从offset处开始读取/覆盖)
setex key seconds value(set with expire插入key的同时设置过期时间)/setnx key value(set if not exists如果已存在则直接返回0)
mset/mget/msetnx key value {key value}(设置/读取多个key的值,msetnx比较特殊,要么都成功,要么一个都不执行,可以用来设置一个对象的多个不同字段)
getset key(设置并返回key对应的旧值,可以用于计数器的重置)

常见应用

字符串、jpg图片、序列化对象、一些复杂的计数功能的缓存

Hash

存储 String 类型键值对的映射表、对象

基本使用方法

容量:每个 Hash 可存 2^32 - 1(约 40 亿)个键值对

1
2
3
hmset user username 'name' password '123456' # 定义一个有两个元素的Hash表
hgetall user # 获取user中所有key和value
hget user username # 获取user中key为username的value

常见应用

单点登录(存<CookieId, 用户信息>,设置 30 分钟为缓存过期时间,能很好地模拟出类似 Session 的效果)。

List

String列表

基本使用方法

容量:每个 List 可存 2^32 - 1 个元素

1
2
3
lpush ball basketball soccer # 按顺序从左侧压入,如果ball列表不存在则创建
rpush ball volleyball
lrange 0 1 # 获取索引从0到1的值

操作总结

1
2
3
4
5
6
7
8
9
lpush/rpush/lrange
lpop/rpop
lindex
llen
lrem key
ltrim key
rpoplpush
lset key index value
linsert key before/after val1 val2

常见应用

简单的消息队列
基于 Redis 的分页功能(利用 lrang 命令,性能极佳,用户体验好)

Set

字符串的无序集合,使用 Hash 实现(key 和 value 相同的 Hash)

基本使用方法

容量:2^32 - 1 个成员

1
2
sadd myset li1 # 向集合myset中添加一个元素li1,若不存在则创建
smembers myset

常见应用

全局去重(为什么不用 JDK 自带的 Set 去重?因为我们的系统一般都是集群部署)
计算共同喜好、全部喜好、自己独有的喜好等功能(交集、并集、差集)

ZSet

字符串的有序集合,每个元素都关联一个 double 类型的权重参数 score,集合中的元素能够按 score 进行排列。

基本使用方法

容量:2^32 - 1 个成员

1
2
zadd myzset 0 abc
zrangebyscore myzset 0 10

常见应用

排行榜
取 Top N 操作
延时任务(https://www.cnblogs.com/rjzheng/p/8972725.html)
范围查找

原理

实现上类似于 Java 的 SortedSet 和 HashMap 的结合体,value 唯一(Set 结构的特点),每个 value 一个 score 代表该 value 的排序权重。
zset 内部是使用一种叫做跳跃列表的结构实现的。

Redis 数据结构的实现

数据结构的声明和实现

Redis数据结构及其实现
Redis 中的 set、zset 等结构在 Redis 中并不是由一个单独的数据结构实现的,而是会根据情况有所变化。

set

set和Java中的HashSet有点像,它本身是HashMap的封装,key是集合中的对象,而value直接用NULL代替。
但是注意一些特殊情况:

  • 创建集合对象时,如果发现集合内的元素可以使用整数(longlong)编码,则创建一个intset而不是dict;
  • 之前是intset编码的情况下,插入的新元素如果是非整数的,那么集合会被重新转换成dict编码的;或者插入的元素数量达到了阈值(512),也会自动转换成dict。
    创建代码见:t_set.c/setTypeCreate()

zset

zset同样有两种形态:ziplist编码和skiplist编码。

  • 按ziplist编码的情况下:
    zset本身就是个ziplist对象。
  • 按skiplist编码的情况下:
    zset的集合功能是通过dict实现的,这部分和set并无区别;
    zset的有序性是通过skiplist实现的,skiplist按分值排序成员,支持平均复杂度为O(logN)的按分值定位成员的操作。

执行zadd命令代码:t_zset.c/zaddGenericCommand()
创建zset对象:object.c/createZsetZiplistObject()object.c/createZsetObject()

SDS(Simple Dynamic String)

Redis 中的动态数组有以下特点:

  • 可动态扩展内存。sds 表示的字符串其内容可以修改,也可以追加。在很多语言中字符串会分为 mutable 和 immutable 两种,显然 sds 属于 mutable 类型的。
  • 减少修改字符串的内存重新分配次数
    C语言由于不记录字符串的长度,所以如果要修改字符串,必须要重新分配内存(先释放再申请),因为如果没有重新分配,字符串长度增大时会造成内存缓冲区溢出,字符串长度减小时会造成内存泄露。
    而对于SDS,由于len属性和free属性的存在,对于修改字符串SDS实现了空间预分配和惰性空间释放两种策略:
    1、空间预分配:对字符串进行空间扩展的时候,扩展的内存比实际需要的多,这样可以减少连续执行字符串增长操作所需的内存重分配次数。
    2、惰性空间释放:对字符串进行缩短操作时,程序不立即使用内存重新分配来回收缩短后多余的字节,而是使用 free 属性将这些字节的数量记录下来,等待后续使用。(当然SDS也提供了相应的API,当我们有需要时,也可以手动释放这些未使用的空间。)
  • 二进制安全(Binary Safe)。sds 能存储任意二进制数据,而不仅仅是可打印字符。
  • 与传统的 C 语言字符串类型兼容。
1
2
3
4
5
6
struct SDS<T> {
T capacity; // 数组容量
T len; // 数组当前长度
byte flags; // 特殊标识位,不理睬它
byte[] content; // 数组内容
}

下面的函数将 t 数组拷贝到 s 中,如果长度不够则需要进行扩容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Append the specified binary-safe string pointed by 't' of 'len' bytes to the
* end of the specified sds string 's'.
*
* After the call, the passed sds string is no longer valid and all the
* references must be substituted with the new pointer returned by the call. */
sds sdscatlen(sds s, const void *t, size_t len) {
size_t curlen = sdslen(s); // 原字符串长度

// 按需调整空间,如果 capacity 不够容纳追加的内容,就会重新分配字节数组并复制原字符串的内容到新数组中
s = sdsMakeRoomFor(s,len);
if (s == NULL) return NULL; // 内存不足
memcpy(s+curlen, t, len); // 追加目标字符串的内容到字节数组中
sdssetlen(s, curlen+len); // 设置追加后的长度值
s[curlen+len] = '\0'; // 让字符串以\0 结尾,便于调试打印,还可以直接使用 glibc 的字符串函数进行操作
return s;
}

SDS 有 embstr 和 raw 两种存储结构,它们的区别是:

  1. 内存分配上:
    embstr 调用 1 次 malloc, 因此 redisObject 和 SDS 内存是连续分配的;
    raw 需要调用 2 次 malloc, 因此 redisObject 和 SDS 内存不连续分配
  2. 使用上:
    embstr 整体 64 byte, 正好和cpu cache line 64byte 一样, 可以更好的使用缓存, 效率更高

quicklist

Redis 早期版本存储 list 数据结构采用(元素少时 ziplist、多时 linkedlist )的方案,但是:

  1. 链表的附加空间太高,prev 和 next 指针就要占去 16 个字节(64 位系统);
  2. 链表每个节点都是单独分配,会加剧内存的碎片化。

因此在之后的版本中转换为了 quicklist 存储。
quicklist 是 ziplist 和 linkedlist 的混合体,它将 linkedlist 按段切分,每一段使用 ziplist 来紧凑存储,多个 ziplist 之间使用双向指针串接起来。
Redis-quicklist结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct ziplist {
...
}
struct ziplist_compressed {
int32 size;
byte[] compressed_data;
}
struct quicklistNode {
quicklistNode* prev;
quicklistNode* next;
ziplist* zl; // 指向压缩列表
int32 size; // ziplist 的字节总数
int16 count; // ziplist 中的元素数量
int2 encoding; // 存储形式 2bit,原生字节数组还是 LZF 压缩存储
...
}
struct quicklist {
quicklistNode* head;
quicklistNode* tail;
long count; // 元素总数
int nodes; // ziplist 节点的个数
int compressDepth; // LZF 算法压缩深度
...
}

ziplist

ziplist 是一种压缩存储的数组结构,当 Redis 中的集合数据结构很小,则它会使用这种紧凑的存储形式存储,元素之间紧挨着存储,查找就是对数组进行遍历找到目标对象。

  • zset 和 hash 容器在元素个数较少时会采用 ziplist 存储。当存储的对象数量小于 512 且所有 entry 的 value 值长度小于 64,采用 ziplist 存储,否则转为采用 hashtable 存储。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    import redis
    client = redis.StrictRedis()
    client.delete("hello")
    for i in range(512):
    client.hset("hello", str(i), str(i))
    print client.object("encoding", "hello")
    client.hset("hello", "512", "512")
    # 或者插入一个长度为65的值也能起到转化的作用
    print client.object("encoding", "hello")

可以上服务器上使用debug object命令验证数据结构的类型:

1
2
3
4
> zadd hgc_test 1.0 go 2.0 python 2.0 java
...
> debug object hgc_test
Value at:0x7f73c6d673a0 refcount:1 encoding:ziplist serializedlength:36 lru:1381596 lru_seconds_idle:77

结构

Redis-ziplist结构

1
2
3
4
5
6
7
8
9
10
11
12
struct ziplist<T> {
int32 zlbytes; // 整个压缩列表占用字节数
int32 zltail_offset; // 最后一个元素距离压缩列表起始位置的偏移量,用于快速定位到最后一个节点
int16 zllength; // 元素个数
T[] entries; // 元素内容列表,挨个挨个紧凑存储
int8 zlend; // 标志压缩列表的结束,值恒为 0xFF
}
struct entry {
int<var> prevlen; // 前一个 entry 的字节长度
int<var> encoding; // 元素类型编码
optional byte[] content; // 元素内容
}
  • zltail_offset 字段可以快速定位到 ziplist 中的最后一个节点,可以用于倒序遍历,entry 中的 prevlen 表示前一个 entry 的字节长度,可以用于倒序遍历时找到下一个元素的位置;
  • encoding 记录编码类型,ziplist 利用该字段决定后面的 content 内容的形式,比如用00xxxxxx表示最大长度为 63 的短字符串,01xxxxxx xxxxxxxx表示中等长度的字符串;

插入

ziplist 是紧凑存储的,没有冗余空间,因此插入一个新元素就需要调用 realloc 重新分配内存空间,并将之前的内容一次性拷贝到新的内存空间中。
重新分配空间是比较耗时的,因此 ziplist 不适合存储大量数据。

更新/删除

修改或删除一个元素后其后一个位置的元素中的 prevlen 也需要级联更新,prevlen 字段又是变长的,所以可能会导致连锁反应。

ziplist vs dict

为什么 hash 结构中会采用 ziplist 而不是 dict,主要原因如下:

  1. 数据量小时,ziplist 的速度也很快;
  2. 数据量大时,ziplist 在每次插入或修改时引发的 realloc 操作会有更大的概率造成内存拷贝,从而降低性能,而且数据项过多的时候,在 ziplist 上查找指定数据项的性能会变得很低,因为在 ziplist 上的查找需要进行遍历。

dict(字典)

dict 是 Redis 中使用最广泛的数据结构:

  1. hash 结构的数据会用到字典;
  2. 整个 Redis 数据库的所有 key 和 value 也组成了一个全局字典;
  3. 带过期时间的 key 集合也是一个字典;
  4. set 结构的底层实现也是字典,只是所有 value 都是 NULL;
  5. zset 集合中存储 value 和 score 值的映射关系也是通过 dict 结构实现的。
1
2
3
4
5
6
7
8
9
10
struct RedisDb {
dict* dict; // all keys key=>value
dict* expires; // all expired keys key=>long(timestamp)
...
}

struct dict {
...
dictht ht[2];
}
1
2
3
4
struct zset {
dict *dict; // all values value=>score
zskiplist *zsl;
}

hashtable

dict 中的 hashtable 结构和 Java 中的 HashMap 类似,使用一个数组来保存所有的哈希桶,通过siphash函数来将 key 散列到数组中的某个桶上,每个哈希桶都是一个链表,也就是说如果发生哈希冲突,则将新元素直接插入到桶的头部。

1
2
3
4
5
6
7
8
9
10
11
struct dictEntry {
void* key;
void* val;
dictEntry* next; // 链接下一个 entry
}
struct dictht {
dictEntry** table; // 二维
long size; // 第一维数组的长度
long used; // hash 表中的元素个数
...
}

扩容:渐进式 rehash

正常情况下,当 hashtable 中元素的个数等于第一维数组的长度时、来了一个新增/修改/删除操作,就会触发扩容,扩容的新数组是原数组大小的 2 倍。

存在一个特殊情况:如果 Redis 正在做 bgsave,为了减少内存页的过多分离 (Copy On Write),Redis 尽量不去扩容 (dict_can_resize),但是如果 hash 表已经非常满了,元素的个数已经达到了第一维数组长度的 5 倍 (dict_force_resize_ratio),说明 hash 表已经过于拥挤了,这个时候就会强制扩容。

Redis-dict扩容rehash
一般情况下 dict 中只有一个 hashtable 有值,但是在扩容时会分配另一个新的 hashtable,然后执行渐进式的数据迁移,避免一次性对所有 key 执行 rehash,而是将 rehash 操作分散到了对 dict 的各个增删改查操作中去了。

  1. 在扩容过程中,如果有新增元素,则该元素会被同时添加到新 hashtable 中;
  2. 查询、删除、修改操作中,会先查询旧 hashtable,若存在则迁移这个 key 所在的桶并返回元素,若不存在则到新 hashtable 中查找元素。
  3. 有一个异步线程执行定时任务对字典主动迁移。

dict 之所以这样设计,是为了避免 rehash 期间单个请求的响应时间剧烈增加。
当旧 hashtable 中无元素时,即代表迁移完毕,这时会将新旧 hashtable 的指针交换,旧的会被删除,而新的则取而代之。

缩容

当 hash 表因为元素的逐渐删除变得越来越稀疏时,Redis 会对 hash 表进行缩容来减少 hash 表的第一维数组空间占用。缩容的条件是元素个数低于数组长度的 10%。
缩容不会考虑 Redis 是否正在做 bgsave,因为 COW 的特性是当内存页上的数据被修改时会复制一页做修改,如果删除操作并不会触发删除内存页的数据,操作系统回收内存机制导致的。

全局哈希表

get allen b中的a和b是不同数据结构的对象,他们统统被存储在一个叫全局哈希表的地方。
哈希表中的每个哈希桶存储的不是值本身,而是指向它们的指针。
Redis中的全局哈希表
代码定义在:redis.h/redisDb

缺点:

  1. 过多的哈希冲突容易产生过长的哈希桶(哈希冲突链)。
    为了减少这个问题产生的影响,需要对哈希表进行rehash操作,这个rehash操作和dict数据结构的rehash原理是一样的。

    全局哈希表实际上就是dict,可以看源码中的定义。

优点:

  1. 合适的散列函数和扩容机制可以保证O(1)的操作复杂度。

skiplist

zset 中除了 dict(字典)外,还会用一个 skiplist 来提供按score排序的要求,以实现指定 score 的范围来获取 value 列表的功能。

Redis-skiplist结构

1
2
3
4
5
6
7
8
9
10
11
12
struct zslnode {
string value;
double score;
zslnode*[] forwards; // 多层连接指针
zslnode* backward; // 回溯指针
}

struct zsl {
zslnode* header; // 跳跃列表头指针
int maxLevel; // 跳跃列表当前的最高层
map<string, zslnode*> ht; // hash 结构的所有键值对
}
  • 各层均为一个有序的链表结构;
  • 层数越大,节点越少;
  • 有一个 header 节点作为哨兵,value=null,score=Double.MIN_VALUE。

插入

插入时会先自顶向下找到新节点在跳表中底层的插入位置,插入每一层时都有概率晋升到更高一层,在 Redis 中是 25%。

删除

删除每一层上的对应节点。

更新

如果不影响排序则直接更新,否则会先删除再重新插入。

布隆过滤器

HyperLogLog

布隆过滤器用于实现contains的需求,而 HyperLogLog 主要用于实现count
同样是一个特别大的位数组,HyperLogLog 将位数组拆分为桶,每个桶是连续的 6 个位,计数时并非单独对某个桶计数,而是:

  • set 操作:计算 key 的散列值,为一个 64 位的数字,前 14 位是桶的位置,桶记录后 50 位中第一个 1 的位置 count,并且count = max(count, oldCount),即每次记录最大的计数。
  • count 操作:因为是概率算法,每个桶的计数值并不精确,但是所有桶的调和均值非常接近真实的计数值。

pubsub

用于实现轻量级的发布订阅功能。

geohash

QA

使用string还是hash?

当数据量少时,使用hash明显更加节省内存,因为数据少时hash会转成ziplist的结构,而string每个kv都需要一大堆的额外空间存储元数据。

如何使用Redis的数据结构实现统计?

  1. 需要支持集合运算(差集、交集、并集)的场合
    使用set、zset,数据量少时会转成ziplist节省内存。
  2. 需要进行二值统计的场合
    使用bitmap
  3. 需要大规模统计,且不要求精确统计的场合
    使用HyperLogLog

采用渐进式hash时,如果实例暂时没有接收到新请求,是不是就不会做rehash了?

不会,还有一个定时任务每隔100ms执行rehash,而且每次执行时长不会超过1ms,以免影响其他任务。

参考

  1. redis源码解析

操作

  1. Master
    TODO

  2. Slave

  3. Sentinel

  4. 获取集群信息

    1
    redis-cli -p 26379 info Sentinel
  5. 获取 master 节点地址

    1
    redis-cli -p 26379 SENTINEL get-master-addr-by-name mymaster

客户端如何连接Sentinel集群

Sentinel
在 Sentinel 模式下,客户端不是直接连接服务器的,而是先访问 Sentinel 拿到集群信息再尝试连接 Master。当 Master 发生故障时,客户端会重新向 Sentinel 要地址,并自动完成节点切换。

  • Master 和 Slave 的配置和之前并无区别;
  • Sentinel 相当于对 Master 的代理,Sentinel 可以通过发布订阅功能获取到 Slave 和其他 Sentinel 的信息。

    其实 Sentinel 的内核与其他形式的 Redis 服务器基本一致,只是支持的命令不同、负责的任务也不同。

同理,客户端也可以通过pubsub功能来订阅集群中的其他信息,关键事件如下:
RedisSentinel事件

Sentinel 执行原理

Sentinel的主要任务
在Sentinel的主事件循环中可以看到它每100毫秒执行的定时任务:

1
2
3
4
5
6
7
8
9
10
11
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...

/* Run the Sentinel timer if we are in sentinel mode. */
// 如果服务器运行在 sentinel 模式下,那么执行 SENTINEL 的主函数
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}

...
}

实例状态探测

  • 每个 Sentinel 以每秒钟一次的频率向它所知的主服务器、从服务器以及其他 Sentinel 实例发送一个 PING 命令
    如果一个实例(instance)距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值, 那么这个实例会被 Sentinel 标记为主观下线。 一个有效回复可以是: +PONG 、 -LOADING 或者 -MASTERDOWN。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    void sentinelHandleDictOfRedisInstances(dict *instances) {
    ...

    // 遍历多个实例,这些实例可以是多个主服务器、多个从服务器或者多个 sentinel
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {

    // 取出实例对应的实例结构
    sentinelRedisInstance *ri = dictGetVal(de);

    // 执行调度操作
    sentinelHandleRedisInstance(ri);

    // 如果被遍历的是主服务器,那么递归地遍历该主服务器的所有从服务器
    // 以及所有 sentinel
    if (ri->flags & SRI_MASTER) {

    // 所有从服务器
    sentinelHandleDictOfRedisInstances(ri->slaves);

    // 所有 sentinel
    sentinelHandleDictOfRedisInstances(ri->sentinels);

    // 对已下线主服务器(ri)的故障迁移已经完成
    // ri 的所有从服务器都已经同步到新主服务器
    if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
    // 已选出新的主服务器
    switch_to_promoted = ri;
    }
    }
    }

    ...
    }

    /* Perform scheduled operations for the specified Redis instance. */
    // 对给定的实例执行定期操作
    void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {

    /* ========== MONITORING HALF ============ */
    /* ========== 监控操作 =========*/

    /* Every kind of instance */
    /* 对所有类型实例进行处理 */

    // 如果有需要的话,创建连向实例的网络连接
    sentinelReconnectInstance(ri);

    // 根据情况,向实例发送 PING、 INFO 或者 PUBLISH 命令
    sentinelSendPeriodicCommands(ri);

    ...
    }

    // 根据时间和实例类型等情况,向实例发送命令,比如 INFO 、PING 和 PUBLISH
    // 虽然函数的名字包含 Ping ,但命令并不只发送 PING 命令
    /* Send periodic PING, INFO, and PUBLISH to the Hello channel to
    * the specified master or slave instance. */
    void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
    * in the case the instance is not properly connected. */
    // 函数不能在网络连接未创建时执行
    if (ri->flags & SRI_DISCONNECTED) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
    * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
    * want to use a lot of memory just because a link is not working
    * properly (note that anyway there is a redundant protection about this,
    * that is, the link will be disconnected and reconnected if a long
    * timeout condition is detected. */
    // 为了避免 sentinel 在实例处于不正常状态时,发送过多命令
    // sentinel 只在待发送命令的数量未超过 SENTINEL_MAX_PENDING_COMMANDS 常量时
    // 才进行命令发送
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
    * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
    * period. In this state we want to closely monitor slaves in case they
    * are turned into masters by another Sentinel, or by the sysadmin. */
    // 对于从服务器来说, sentinel 默认每 SENTINEL_INFO_PERIOD 秒向它发送一次 INFO 命令
    // 但是,当从服务器的主服务器处于 SDOWN 状态,或者正在执行故障转移时
    // 为了更快速地捕捉从服务器的变动, sentinel 会将发送 INFO 命令的频率该为每秒一次
    if ((ri->flags & SRI_SLAVE) &&
    (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
    info_period = 1000;
    } else {
    info_period = SENTINEL_INFO_PERIOD;
    }

    /* We ping instances every time the last received pong is older than
    * the configured 'down-after-milliseconds' time, but every second
    * anyway if 'down-after-milliseconds' is greater than 1 second. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    // 实例不是 Sentinel (主服务器或者从服务器)
    // 并且以下条件的其中一个成立:
    // 1)SENTINEL 未收到过这个服务器的 INFO 命令回复
    // 2)距离上一次该实例回复 INFO 命令已经超过 info_period 间隔
    // 那么向实例发送 INFO 命令
    if ((ri->flags & SRI_SENTINEL) == 0 &&
    (ri->info_refresh == 0 ||
    (now - ri->info_refresh) > info_period))
    {
    /* Send INFO to masters and slaves, not sentinels. */
    retval = redisAsyncCommand(ri->cc,
    sentinelInfoReplyCallback, NULL, "INFO");
    if (retval == REDIS_OK) ri->pending_commands++;
    } else if ((now - ri->last_pong_time) > ping_period) {
    /* Send PING to all the three kinds of instances. */
    sentinelSendPing(ri);
    } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
    /* PUBLISH hello messages to all the three kinds of instances. */
    sentinelSendHello(ri);
    }
    }

从主观下线到客观下线

  • 如果一个主服务器被标记为主观下线, 那么正在监视这个主服务器的所有 Sentinel 要以每秒一次的频率确认主服务器的确进入了主观下线状态。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    /* Perform scheduled operations for the specified Redis instance. */
    // 对给定的实例执行定期操作
    void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {

    ...

    /* ============== ACTING HALF ============= */
    /* ============== 故障检测 ============= */

    /* We don't proceed with the acting half if we are in TILT mode.
    * TILT happens when we find something odd with the time, like a
    * sudden change in the clock. */
    // 如果 Sentinel 处于 TILT 模式,那么不执行故障检测。
    if (sentinel.tilt) {

    // 如果 TILI 模式未解除,那么不执行动作
    if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;

    // 时间已过,退出 TILT 模式
    sentinel.tilt = 0;
    sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    // 检查给定实例是否进入 SDOWN 状态
    sentinelCheckSubjectivelyDown(ri);

    ...
    }

    /* Is this instance down from our point of view? */
    // 检查实例是否已下线(从本 Sentinel 的角度来看)
    void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {

    ...

    /* Update the SDOWN flag. We believe the instance is SDOWN if:
    *
    * 更新 SDOWN 标识。如果以下条件被满足,那么 Sentinel 认为实例已下线:
    *
    * 1) It is not replying.
    * 它没有回应命令
    * 2) We believe it is a master, it reports to be a slave for enough time
    * to meet the down_after_period, plus enough time to get two times
    * INFO report from the instance.
    * Sentinel 认为实例是主服务器,这个服务器向 Sentinel 报告它将成为从服务器,
    * 但在超过给定时限之后,服务器仍然没有完成这一角色转换。
    */
    if (elapsed > ri->down_after_period ||
    (ri->flags & SRI_MASTER &&
    ri->role_reported == SRI_SLAVE &&
    mstime() - ri->role_reported_time >
    (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
    /* Is subjectively down */
    if ((ri->flags & SRI_S_DOWN) == 0) {
    // 发送事件
    sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
    // 记录进入 SDOWN 状态的时间
    ri->s_down_since_time = mstime();
    // 打开 SDOWN 标志
    ri->flags |= SRI_S_DOWN;
    }
    } else {
    // 移除(可能有的) SDOWN 状态
    /* Is subjectively up */
    if (ri->flags & SRI_S_DOWN) {
    // 发送事件
    sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
    // 移除相关标志
    ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
    }
    }
    }
    如果有足够数量的 Sentinel (至少要达到配置文件指定的数量)在指定的时间范围内同意这一判断, 那么这个主服务器被标记为客观下线
    这个数量是可以配置的,即quorum的数量。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /* Perform scheduled operations for the specified Redis instance. */
    // 对给定的实例执行定期操作
    void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    ...

    /* ============== ACTING HALF ============= */
    /* ============== 故障检测 ============= */

    ...这里省略SDOWN检测代码

    /* Only masters */
    /* 对主服务器进行处理 */
    if (ri->flags & SRI_MASTER) {

    // 判断 master 是否进入 ODOWN 状态
    sentinelCheckObjectivelyDown(ri);

    // 如果主服务器进入了 ODOWN 状态,那么开始一次故障转移操作
    if (sentinelStartFailoverIfNeeded(ri))
    // 强制向其他 Sentinel 发送 SENTINEL is-master-down-by-addr 命令
    // 刷新其他 Sentinel 关于主服务器的状态
    sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);

    // 执行故障转移
    sentinelFailoverStateMachine(ri);

    // 如果有需要的话,向其他 Sentinel 发送 SENTINEL is-master-down-by-addr 命令
    // 刷新其他 Sentinel 关于主服务器的状态
    // 这一句是对那些没有进入 if(sentinelStartFailoverIfNeeded(ri)) { /* ... */ }
    // 语句的主服务器使用的
    sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
    }
  • 在一般情况下, 每个 Sentinel 会以每 10 秒一次的频率向它已知的所有主服务器和从服务器发送 INFO 命令。 当一个主服务器被 Sentinel 标记为客观下线时, Sentinel 向下线主服务器的所有从服务器发送 INFO 命令的频率会从 10 秒一次改为每秒一次。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {

    ...

    /* If this is a slave of a master in O_DOWN condition we start sending
    * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
    * period. In this state we want to closely monitor slaves in case they
    * are turned into masters by another Sentinel, or by the sysadmin. */
    // 对于从服务器来说, sentinel 默认每 SENTINEL_INFO_PERIOD 秒向它发送一次 INFO 命令
    // 但是,当从服务器的主服务器处于 SDOWN 状态,或者正在执行故障转移时
    // 为了更快速地捕捉从服务器的变动, sentinel 会将发送 INFO 命令的频率该为每秒一次
    if ((ri->flags & SRI_SLAVE) &&
    (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
    info_period = 1000;
    } else {
    info_period = SENTINEL_INFO_PERIOD;
    }

    ...

    // 实例不是 Sentinel (主服务器或者从服务器)
    // 并且以下条件的其中一个成立:
    // 1)SENTINEL 未收到过这个服务器的 INFO 命令回复
    // 2)距离上一次该实例回复 INFO 命令已经超过 info_period 间隔
    // 那么向实例发送 INFO 命令
    if ((ri->flags & SRI_SENTINEL) == 0 &&
    (ri->info_refresh == 0 ||
    (now - ri->info_refresh) > info_period))
    {
    /* Send INFO to masters and slaves, not sentinels. */
    retval = redisAsyncCommand(ri->cc,
    sentinelInfoReplyCallback, NULL, "INFO");
    if (retval == REDIS_OK) ri->pending_commands++;
    } else if
    ...
    }
    }
    注意上边发请求时使用的回调函数sentinelInfoReplyCallback
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 处理 INFO 命令的回复
    void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;

    if (ri) ri->pending_commands--;
    if (!reply || !ri) return;
    r = reply;

    if (r->type == REDIS_REPLY_STRING) {
    // 解析info命令的响应数据
    sentinelRefreshInstanceInfo(ri,r->str);
    }
    }
  • 当没有足够数量的 Sentinel 同意主服务器已经下线,主服务器的客观下线状态就会被移除。
    当主服务器重新向 Sentinel 的 PING 命令返回有效回复时,主服务器的主观下线状态就会被移除。

故障转移 - 选举 Sentinel Leader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {

...

/* ============== ACTING HALF ============= */
/* ============== 故障检测 ============= */

...

/* Only masters */
/* 对主服务器进行处理 */
if (ri->flags & SRI_MASTER) {

// 判断 master 是否进入 ODOWN 状态
sentinelCheckObjectivelyDown(ri);

// 如果主服务器进入了 ODOWN 状态,那么开始一次故障转移操作
if (sentinelStartFailoverIfNeeded(ri))
// 强制向其他 Sentinel 发送 SENTINEL is-master-down-by-addr 命令
// 刷新其他 Sentinel 关于主服务器的状态
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);

// 执行故障转移
sentinelFailoverStateMachine(ri);

// 如果有需要的话,向其他 Sentinel 发送 SENTINEL is-master-down-by-addr 命令
// 刷新其他 Sentinel 关于主服务器的状态
// 这一句是对那些没有进入 if(sentinelStartFailoverIfNeeded(ri)) { /* ... */ }
// 语句的主服务器使用的
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}

状态感知(info)

Sentinel服务器只需配置Master的地址,其他Slave的信息是通过定时(10秒)向Master发送info命令来获取的,info命令返回的信息中,包含了主从拓扑关系,其中包括每个slave的地址和端口号。有了这些信息后,哨兵就会记住这些节点的拓扑信息,在后续发生故障时,选择合适的slave节点进行故障恢复。
哨兵除了向master发送info之外,还会向每个master节点特殊的pubsub中发送master当前的状态信息和哨兵自身的信息,其他哨兵节点通过订阅这个pubsub,就可以拿到每个哨兵发来的信息。这么做的目的主要有2个:

  • 哨兵节点可以发现其他哨兵的加入,进而方便多个哨兵节点通信,为后续共同协商提供基础
  • 与其他哨兵节点交换master的状态信息,为后续判断master是否故障提供依据

心跳检测(ping)

故障发生时,需要立即启动故障恢复机制,那么Sentinel怎么及时地知道哪些节点发生故障了呢?这主要是通过向所有其他节点发送PING命令来实现的。
每个哨兵节点每隔1秒向master、slave、其他哨兵节点发送ping命令,如果对方能在指定时间内响应,说明节点健康存活。如果未在规定时间内(可配置)响应,那么该哨兵节点认为此节点主观下线

至于Sentinel怎么知道其他节点的地址,其实就是通过前面提到的info命令来感知的。

主观下线和客观下线

  • 主观下线(Subjectively Down, 简称 SDOWN)
    主观下线指的是单个 Sentinel 实例对服务器做出的下线判断。
    如果一个服务器没有在 master-down-after-milliseconds 选项所指定的时间内, 对向它发送 PING 命令的 Sentinel 返回一个有效回复(有效回复只有+PONG、-LOADING 错误或 -MASTERDOWN 错误), 那么 Sentinel 就会将这个服务器标记为主观下线。

    注意是在master-down-after-milliseconds时间内一直返回无效回复。

  • 客观下线(Objectively Down, 简称 ODOWN)
    客观下线指的是多个 Sentinel 实例在对同一个 Master 做出 SDOWN 判断, 并且通过 SENTINEL is-master-down-by-addr 命令互相交流之后,得出的服务器下线判断。 (一个 Sentinel 可以通过向另一个 Sentinel 发送 SENTINEL is-master-down-by-addr 命令来询问对方是否认为给定的服务器已下线。)
    从主观下线切换到客观下线并不是通过较严格的投票算法,而是采用了流言协议(gossip protocol):只要 Sentinel 在给定时间内从其他 Sentinel 接收到足够数量的 Master 下线通知,那么 Sentinel 就会执行状态的切换;如果之后其他 Sentinel 不再报告 Master 已下线,则客观下线状态就会被移除。
    只要一个 Sentinel 发现某个 Master 进入客观下线状态,之后就会进入故障迁移阶段,选举出一个 Sentinel 对失效的 Master 执行自动故障迁移操作。

    客观下线只适用于 Master,对 Slave 或 Sentinel 则不会达到客观下线状态。

故障迁移(Master 挂掉,Sentinel选举新Master)

单纯的主从架构并不能挽救 Master 挂掉的情况,因此引入了 Sentinel 集群。Sentinel 会不断地检查集群主服务器和从服务器是否运作正常,并在超过 n 个 Sentinel 同意后判断主节点失效(配置sentinel monitor mymaster 127.0.0.1 6379 2表示这个n=2),不过要注意,无论设置多少个 Sentinel 同意才能判断一个服务器失效, 一个 Sentinel 都需要获得系统中多数 Sentinel 的支持, 才能发起一次自动故障迁移。

  • 当一个主服务器不能正常工作时, Sentinel 会开始一次自动故障迁移操作,它会将失效主服务器的其中一个从服务器升级为新的主服务器,并让失效主服务器的其他从服务器改为复制新的主服务器;
  • 当客户端试图连接失效的主服务器时,集群也会向客户端返回新主服务器的地址,使得集群可以使用新主服务器代替失效服务器。

故障转移主要分为Sentinel选举和故障转移(Master替换)两个步骤,Sentinel选主流程如下:

  • Sentinel发现主服务器已经进入客观下线状态。
  • 利用Raft leader election算法选举 Sentinel 中的 Leader,对我们的当前 epoch 进行自增, 并尝试在这个epoch中当选,之后,所有 Sentinel 都以更高的 epoch 为准,并主动用更新的 epoch 代替自己的配置。
  • 如果当选失败, 那么在设定的故障迁移超时时间的两倍之后,重新尝试当选。 如果当选成功, 那么执行Slave的选主。

Slave选举

Slave选举的规则如下:

  • 在失效主服务器属下的从服务器当中, 那些被标记为主观下线、已断线、或者最后一次回复 PING 命令的时间大于五秒钟的从服务器都会被淘汰。
  • 在失效主服务器属下的从服务器当中, 那些与失效主服务器连接断开的时长超过 down-after 选项指定的时长十倍的从服务器都会被淘汰。
  • 在经历了以上两轮淘汰之后剩下来的从服务器中, 我们选出复制偏移量(replication offset)最大的那个从服务器作为新的主服务器; 如果复制偏移量不可用, 或者从服务器的复制偏移量相同, 那么带有最小运行 ID 的那个从服务器成为新的主服务器。

也就是说,多个Slave的优先级按照:slave-priority配置 > 数据完整性 > runid较小者进行选择。

之后所有Sentinel要进行投票选出一个Leader:
RedisSentinel投票

选出Leader后,Leader需要从现有的Slave中选出

故障转移

提升新的Master的流程如下:

  • 向被选中的从服务器发送 SLAVEOF NO ONE 命令,让它转变为主服务器。
  • 通过发布与订阅功能, 将更新后的配置传播给所有其他 Sentinel , 其他 Sentinel 对它们自己的配置进行更新。
  • 向已下线主服务器的从服务器发送 SLAVEOF 命令, 让它们去复制新的主服务器。
  • 当所有从服务器都已经开始复制新的主服务器时, 领头 Sentinel 终止这次故障迁移操作。

客户端感知新master流程如下:
哨兵在故障切换完成之后,会向自身节点的指定pubsub中写入一条信息,客户端可以订阅这个pubsub来感知master的变化通知。我们的客户端也可以通过在哨兵节点主动查询当前最新的master,来拿到最新的master地址。
另外,哨兵还提供了“钩子”机制,我们也可以在哨兵配置文件中配置一些脚本逻辑,在故障切换完成时,触发“钩子”逻辑,通知客户端发生了切换,让客户端重新在哨兵上获取最新的master地址。
一般来说,推荐采用第一种方式进行处理,很多客户端SDK中已经集成好了从哨兵节点获取最新master的方法,我们直接使用即可。

Sentinel 选举的安全性

配置安全性:

  • 每当一个 Redis 实例被重新配置(reconfigured) —— 无论是被设置成主服务器、从服务器、又或者被设置成其他主服务器的从服务器 —— Sentinel 都会向被重新配置的实例发送一个 CONFIG REWRITE 命令, 从而确保这些配置会持久化在硬盘里。完成重新配置之后,从服务器会去复制正确的主服务器。
  • Sentinel 的状态会被持久化到 Sentinel 配置文件里,当 Sentinel 接收到新配置或 Leader Sentinel 为 Master 创建一个新配置时,这些配置都会与epoch一起被保存到磁盘;

故障自动迁移的一致性:

  • Raft 算法保证在一个 epoch 里只有一个 Leader Sentinel 产生,减少了脑裂的风险;
  • Sentinel 集群总是以更高的 epoch 为准,因为发生网络分区(network partition)时可能会有 Sentinel 包含老的配置,而当这个 Sentinel 服务器接收到其他 Sentinel 的版本更新配置时就会进行更新。
  • 发生网络分区并且某些 Sentinel 仍在采用老的配置时,如果有客户端连接到这些 Sentinel 上,最终可能就会将请求转发到非 Master 服务器上,造成数据不一致。因此,应该使用 min-slaves-to-write 选项, 让主服务器在连接的从实例少于给定数量时停止执行写操作, 与此同时, 应该在每个运行 Redis 主服务器或从服务器的机器上运行 Redis Sentinel 进程。

Sentinel 故障迁移的实时性

故障迁移虽然能提供主从切换来保证挂掉的Master能被其他Slave顶替上来,但是这个顶替过程大概需要多长时间呢?具体又是哪些步骤会比较耗时?

  1. 判断Master下线
    Sentinel会PING Master,如果距离上次PING成功的时间超过了master-down-after-milliseconds时间,则表示主观下线了。
    将Master标记为SDOWN后,这个Sentinel会通过发事件消息来通知其他Sentinel。

    Cluster中是通过gossip协议来通知其他节点的。

  2. 重新选主

  3. Slave提升

这个实时性的讨论并不是纯粹的极客行为,因为切换要多长时间是评估我们服务可用性的重要指标,并且提供后续优化的指导方向。

TILT 模式

TILT 模式是一种特殊的保护模式,Sentinel 每隔 100ms 会向实例发一次PING命令,并将上一次 PING 成功的时间和当前时间比对,从而知道与该实例有多长时间没有进行任何成功通讯:

  • 如果两次调用时间之间的差距为负值, 或者非常大(超过 2 秒钟), 那么 Sentinel 进入 TILT 模式。
  • 如果 Sentinel 已经进入 TILT 模式, 那么 Sentinel 延迟退出 TILT 模式的时间。

    Sentinel严重依赖计算机的时间功能,一旦计算机的时间功能出现故障, 或者计算机非常忙碌, 又或者进程因为某些原因而被阻塞时, Sentinel 可能也会跟着出现故障。

进入 TILT 模式后,Sentinel 仍然会继续监视所有目标,但是:

  • 它不再执行任何操作,比如故障转移。
  • 当有实例向这个 Sentinel 发送 SENTINEL is-master-down-by-addr 命令时, Sentinel 返回负值: 因为这个 Sentinel 所进行的下线判断已经不再准确。

TILT 相当于降级,如果 Sentinel 可以在 TILT 模式下正常维持 30s,那么 Sentinel 会退出 TILT 模式。

BUSY 状态

当 Lus 脚本执行时间超过阈值,Redis 会返回BUSY错误,当出现这种情况时, Sentinel 在尝试执行故障转移操作之前, 会先向服务器发送一个 SCRIPT KILL 命令, 如果服务器正在执行的是一个只读脚本的话, 那么这个脚本就会被杀死, 服务器就会回到正常状态。

脑裂

虽然Sentinel利用Raft选举不会发生脑裂,但是在一些极端的情况下还是有可能会发生脑裂的,比如:

  1. 原Master不能提供服务了,但是它本身并没有挂掉;
  2. Sentinel发现连不上Master,于是判定客观下线,并发起主从切换;
  3. 原Master和新Master同时给Client提供服务,发生脑裂。

这种脑裂并不会影响可用性,但是却破坏了数据的一致性,甚至会导致数据丢失:在Sentinel重连上原Master后,会将其归入到新Master的Slave,这时脑裂期间的数据就会被从新Master上复制过来的数据覆盖掉了,导致数据的丢失。

脑裂的解决办法主要是以下两个配置参数:

  • min-slaves-to-write:这个配置项设置了主库能进行数据同步的最少从库数量;
  • min-slaves-max-lag:这个配置项设置了主从库间进行数据复制时,从库给主库发送 ACK 消息的最大延迟(以秒为单位)。

QA

5个哨兵的集群,quorum设置为2,在运行过程中,有3个实例都发生了故障,这时主库也发生了故障,还能正确判断主库的客观下线吗?还能执行主从的自动切换吗?

判断客户端下线是可以的,因为判断ODOWN的条件是有不少于quorum数量的Sentinel同意即可。
不可执行主从切换,因为一个哨兵要执行主从切换,得获得半数以上哨兵的投票同意,也就是3个哨兵。

哨兵实例是不是越多越好?

哨兵实例越多,误判率越低,但是判断主库下线和选举Leader时实例要拿到的赞成票也越多,主从切换花费的时间也相对会更多。
如果客户端对Redis的响应时间有要求,则很有可能会报警。

调大down-after-milliseconds对减少误判是不是有好处?

这个值的作用是:判断距离上次PING成功的时间超过了这个值,就标记实例主观下线。
调大的话Sentinel需要更长的时间才能判断集群出问题了,也即影响到Redis的可用性。

参考

  1. Sentinel

Redis Cluster介绍

Cluster 优势

  1. 线性的可扩展性:扩容即迁移槽,已有很多迁移案例;
    如果要保存更多的数据,可以直接增加Master来支持,比如每台Master存32G,要完美存下1T数据的话,可以设置32台的Master,当然实际情况下这样非常浪费,一般会少设置一些,只用几台Master来存储最热的数据。
  2. 没有合并操作:因为 Redis 中的 List 和 Set 中保存的 Value 通常是比较大的,可能会达数以百万计的元素,而它们可能被存储到了不同的 Redis 实例上,传输和合并这样的值将很容易称为一个主要的性能瓶颈;
  3. 写入安全(Write Safety):只有在非常少见的 Master 宕机的情况下,写入才会失败,并且这个失败的时间窗口不大(由一个 Slave 顶替上来);
  4. 可用性(Availability):就算有部分 Master 不可用了,它们的 Slave 仍然可以通过选举提升为 Master。

Cluster 缺点

  1. Redis 集群并不支持处理多个 keys 的命令,因为这需要在不同的节点间移动数据,从而达不到像 Redis 那样的性能,在高负载的情况下可能会导致不可预料的错误。
  2. Redis 集群不像单机版本的 Redis 那样支持多个数据库,集群只有数据库 0,而且也不支持 SELECT 命令。

Cluster的去中心化架构

Cluster
redis cluster在设计的时候,就考虑到了去中心化,去中间件,也就是说,集群中的每个节点都是平等的关系,都是对等的,每个节点都保存各自的数据和整个集群的状态。所有的 redis 节点彼此互联(PING-PONG 机制),内部使用二进制协议优化传输速度和带宽,而且这些连接保持活跃,这样就保证了我们只需要连接集群中的任意一个节点,就可以获取到其他节点的数据。客户端与 redis 节点直连,不需要中间 proxy 层,客户端不需要连接集群所有节点,连接集群中任何一个可用节点即可。

一致性

Redis 不能保证强一致性,因为:

  1. 异步复制:写操作会被异步复制到 slave 节点,但可能由于出现网络分区、脑裂而导致数据丢失。
    网络分区
    如上图所示,客户端 Z1 向 Master-B 写入数据后,集群出现了网络分区,且分区持续的时间足够长导致此时 B1 被选举为新的 Master,则在此期间 Z1 向 B 写入的数据就都丢失了。

    网络分区出现期间,客户端 Z1 可以向主节点 B 发送写命令的最大时间是有限制的, 这一时间限制称为节点超时时间(node timeout), 是 Redis 集群的一个重要的配置选项。

Cluster VS Codis

Codis 集群中包含了 4 类关键组件。

  • codis server:这是进行了二次开发的 Redis 实例,其中增加了额外的数据结构,支持数据迁移操作,主要负责处理具体的数据读写请求。
  • codis proxy:接收客户端请求,并把请求转发给 codis server。
  • Zookeeper 集群:保存集群元数据,例如数据位置信息和 codis proxy 信息。
  • codis dashboard 和 codis fe:共同组成了集群管理工具。其中,codis dashboard 负责执行集群管理工作,包括增删 codis server、codis proxy 和进行数据迁移。而 codis fe 负责提供 dashboard 的 Web 操作界面,便于我们直接在 Web 界面上进行集群管理。

Codis如何处理一次请求:

  1. 客户端连接Codis Proxy,将请求发给Proxy;
  2. codis proxy 接收到请求,就会查询请求数据和 codis server 的映射关系,并把请求转发给相应的 codis server 进行处理
  3. 当 codis server 处理完请求后,会把结果返回给 codis proxy,proxy 再把数据返回给客户端。

以4个方面来讨论

  • 数据分布
    和Cluster类似,Codis将数据保存到slot,集群一共有1024个slot,需要手动分配给Codis Server,或者由dashboard自动分配。
    当客户端要读写数据时,会使用CRC32算法计算key的哈希值,并对1024取模,就可以知道对应的是哪个slot了。
    这个路由规则需要先配置到dashboard,dashboard会把路由表发送给codis proxy,并同时保存到ZooKeeper。
    而Cluster的数据路由表是由每个实例管理的,如果发生变化,这些实例会通过Gossip协议来互相传播,如果实例比较多,就会占用比较多的网络资源。
  • 集群扩容和数据迁移
    如果要增加Codis Server来负载slot,需要配置要迁移的slot,Codis Server会将该slot中的数据一个一个地发送给目标Server。
    增加Codis Proxy也是类似的流程。
  • 客户端兼容性
    Codis客户端直接和Codis Proxy连接,codis proxy 是和单实例客户端兼容的,而集群相关的管理工作又都是由Codis Proxy和Codis dashboard这些组件来完成的,不需要客户端参与。
  • 可靠性保证
    Codis Server保证可靠性:Codis Server本身是Redis实例,只是增加了集群相关的操作命令,可靠性是可以通过主从机制+哨兵来实现的。
    Codis Proxy的可靠性:Proxy上的信息都来自ZooKeeper,例如路由表,只要ZooKeeper集群中实例半数以上可以正常工作,那么ZooKeeper集群就是正常的。

比较:
RedisCluster比对Codis

  • 从稳定性和成熟度来看,Codis 应用得比较早,在业界已经有了成熟的生产部署。虽然 Codis 引入了 proxy 和 Zookeeper,增加了集群复杂度,但是,proxy 的无状态设计和 Zookeeper 自身的稳定性,也给 Codis 的稳定使用提供了保证。而 Redis Cluster 的推出时间晚于 Codis,相对来说,成熟度要弱于 Codis,如果你想选择一个成熟稳定的方案,Codis 更加合适些。
  • 从业务应用客户端兼容性来看,连接单实例的客户端可以直接连接 codis proxy,而原本连接单实例的客户端要想连接 Redis Cluster 的话,就需要开发新功能。所以,如果你的业务应用中大量使用了单实例的客户端,而现在想应用切片集群的话,建议你选择 Codis,这样可以避免修改业务应用中的客户端。
  • 从使用 Redis 新命令和新特性来看,Codis server 是基于开源的 Redis 3.2.8 开发的,所以,Codis 并不支持 Redis 后续的开源版本中的新增命令和数据类型。另外,Codis 并没有实现开源 Redis 版本的所有命令,比如 BITOP、BLPOP、BRPOP,以及和与事务相关的 MUTLI、EXEC 等命令。Codis 官网上列出了不被支持的命令列表,你在使用时记得去核查一下。所以,如果你想使用开源 Redis 版本的新特性,Redis Cluster 是一个合适的选择。
  • 从数据迁移性能维度来看,Codis 能支持异步迁移,异步迁移对集群处理正常请求的性能影响要比使用同步迁移的小。所以,如果你在应用集群时,数据迁移比较频繁的话,Codis 是个更合适的选择。

数据分布(分区)和查询路由

分区策略

分区将原来比较大的数据集分离存储到多个存储媒介上,分区后Redis可以管理更大的内存空间和计算能力,但同时多主机又会面临很多分布式集群的可用性、一致性等问题。
分区策略:

  1. 范围分区
    将不同范围的对象映射到不同的Redis实例,比如用户ID为0到10000的存储到R0,10001到20000的存储到R1,以此类推。
    缺点是需要建立一张映射表,谨小甚微地维护ID和Redis实例之间的映射关系,而且由于需要维护表,导致效率不如其他方案。
  2. 散列分区
    使用散列函数(如CRC32)将key转换为一个数字,取模得到一个0到3的数字(假设Redis服务器有4台),这个数字即对应服务器的序号。
  3. 一致性哈希
    一致性哈希的一种示例实现可以参考Dubbo中的实现:com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
    关键代码如下:
    ConsistentHashLoadBalance
  4. 哈希槽
    Redis Cluster采用的是哈希槽的方式。
    Redis 集群没有并使用传统的一致性哈希来分配数据,而是采用另外一种叫做哈希槽 (hash slot)的方式来分配的。redis cluster 默认分配了 16384 个 slot,当我们 set 一个 key 时,会用CRC16算法来取模得到所属的 slot,然后将这个 key 分到哈希槽区间的节点上,具体算法就是:CRC16(key) % 16384。所以我们在测试的时候看到 set 和 get 的时候,直接跳转到了 7000 端口的节点。
    客户端在接收到重定向错误(redirections errors) -MOVED 和 -ASK 的时候, 将命令重定向到其他节点。客户端不需要存储集群信息(槽所在位置),但是如何客户端可以缓存键值和节点之间的映射关系,就可以明显提高命令执行的效率了(Redisson 中就是这么做的)。
    在 Cluster 架构中,slave 节点不分配槽,只拥有读权限,但是在代码中 cluster 执行读写操作的都是 master 节点,并不是读就是从节点、写就是主节点。

源码中,Redis采用一个大小固定为CLUSTER_SLOTS的clusterNode数组slots来保存每个桶的负责节点,这是个字节数组,每个位表示当前节点是否负责这个槽:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// 节点状态
struct clusterNode {

// 创建节点的时间
mstime_t ctime; /* Node object creation time. */

// 节点的名字,由 40 个十六进制字符组成
// 例如 68eef66df23420a5862208ef5b1a7005b806f2ff
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */

// 节点标识
// 使用各种不同的标识值记录节点的角色(比如主节点或者从节点),
// 以及节点目前所处的状态(比如在线或者下线)。
int flags; /* REDIS_NODE_... */

// 节点当前的配置纪元,用于实现故障转移
uint64_t configEpoch; /* Last configEpoch observed for this node */

// 由这个节点负责处理的槽
// 一共有 REDIS_CLUSTER_SLOTS / 8 个字节长
// 每个字节的每个位记录了一个槽的保存状态
// 位的值为 1 表示槽正由本节点处理,值为 0 则表示槽并非本节点处理
// 比如 slots[0] 的第一个位保存了槽 0 的保存情况
// slots[0] 的第二个位保存了槽 1 的保存情况,以此类推
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */

// 该节点负责处理的槽数量
int numslots; /* Number of slots handled by this node */

// 如果本节点是主节点,那么用这个属性记录从节点的数量
int numslaves; /* Number of slave nodes, if this is a master */

// 指针数组,指向各个从节点
struct clusterNode **slaves; /* pointers to slave nodes */

// 如果这是一个从节点,那么指向主节点
struct clusterNode *slaveof; /* pointer to the master node */

// 最后一次发送 PING 命令的时间
mstime_t ping_sent; /* Unix time we sent latest ping */

// 最后一次接收 PONG 回复的时间戳
mstime_t pong_received; /* Unix time we received the pong */

// 最后一次被设置为 FAIL 状态的时间
mstime_t fail_time; /* Unix time when FAIL flag was set */

// 最后一次给某个从节点投票的时间
mstime_t voted_time; /* Last time we voted for a slave of this master */

// 最后一次从这个节点接收到复制偏移量的时间
mstime_t repl_offset_time; /* Unix time we received offset for this node */

// 这个节点的复制偏移量
long long repl_offset; /* Last known repl offset for this node. */

// 节点的 IP 地址
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */

// 节点的端口号
int port; /* Latest known port of this node */

// 保存连接节点所需的有关信息
clusterLink *link; /* TCP/IP link with this node */

// 一个链表,记录了所有其他节点对该节点的下线报告
list *fail_reports; /* List of nodes signaling this as failing */

};
typedef struct clusterNode clusterNode;

// 集群状态,每个节点都保存着一个这样的状态,记录了它们眼中的集群的样子。
// 另外,虽然这个结构主要用于记录集群的属性,但是为了节约资源,
// 有些与节点有关的属性,比如 slots_to_keys 、 failover_auth_count
// 也被放到了这个结构里面。
typedef struct clusterState {

// 指向当前节点的指针
clusterNode *myself; /* This node */

// 集群当前的配置纪元,用于实现故障转移
uint64_t currentEpoch;

// 集群当前的状态:是在线还是下线
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */

// 集群中至少处理着一个槽的节点的数量。
int size; /* Num of master nodes with at least one slot */

// 集群节点名单(包括 myself 节点)
// 字典的键为节点的名字,字典的值为 clusterNode 结构
dict *nodes; /* Hash table of name -> clusterNode structures */

// 节点黑名单,用于 CLUSTER FORGET 命令
// 防止被 FORGET 的命令重新被添加到集群里面
// (不过现在似乎没有在使用的样子,已废弃?还是尚未实现?)
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */

// 记录要从当前节点迁移到目标节点的槽,以及迁移的目标节点
// migrating_slots_to[i] = NULL 表示槽 i 未被迁移
// migrating_slots_to[i] = clusterNode_A 表示槽 i 要从本节点迁移至节点 A
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];

// 记录要从源节点迁移到本节点的槽,以及进行迁移的源节点
// importing_slots_from[i] = NULL 表示槽 i 未进行导入
// importing_slots_from[i] = clusterNode_A 表示正从节点 A 中导入槽 i
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];

// 负责处理各个槽的节点
// 例如 slots[i] = clusterNode_A 表示槽 i 由节点 A 处理
clusterNode *slots[REDIS_CLUSTER_SLOTS];

// 跳跃表,表中以槽作为分值,键作为成员,对槽进行有序排序
// 当需要对某些槽进行区间(range)操作时,这个跳跃表可以提供方便
// 具体操作定义在 db.c 里面
zskiplist *slots_to_keys;

/* The following fields are used to take the slave state on elections. */
// 以下这些域被用于进行故障转移选举

// 上次执行选举或者下次执行选举的时间
mstime_t failover_auth_time; /* Time of previous or next election. */

// 节点获得的投票数量
int failover_auth_count; /* Number of votes received so far. */

// 如果值为 1 ,表示本节点已经向其他节点发送了投票请求
int failover_auth_sent; /* True if we already asked for votes. */

int failover_auth_rank; /* This slave rank for current auth request. */

uint64_t failover_auth_epoch; /* Epoch of the current election. */

/* Manual failover state in common. */
/* 共用的手动故障转移状态 */

// 手动故障转移执行的时间限制
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
/* 主服务器的手动故障转移状态 */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
/* 从服务器的手动故障转移状态 */
long long mf_master_offset; /* Master offset the slave needs to start MF
or zero if stil not received. */
// 指示手动故障转移是否可以开始的标志值
// 值为非 0 时表示各个主服务器可以开始投票
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */

/* The followign fields are uesd by masters to take state on elections. */
/* 以下这些域由主服务器使用,用于记录选举时的状态 */

// 集群最后一次进行投票的纪元
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */

// 在进入下个事件循环之前要做的事情,以各个 flag 来记录
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */

// 通过 cluster 连接发送的消息数量
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */

// 通过 cluster 接收到的消息数量
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/

} clusterState;

分区的实现层次

分区可以在程序的不同层次实现。

  • 客户端分区
    就是在客户端就已经决定数据会被存储到哪个redis节点或者从哪个redis节点读取。大多数客户端已经实现了客户端分区。
  • 代理分区
    意味着客户端将请求发送给代理,然后代理决定去哪个节点写数据或者读数据。代理根据分区规则决定请求哪些Redis实例,然后根据Redis的响应结果返回给客户端。redis和memcached的一种代理实现就是Twemproxy
  • 查询路由(Query routing)
    意思是客户端随机地请求任意一个redis实例,然后由Redis将请求转发给正确的Redis节点。Redis Cluster实现了一种混合形式的查询路由,但并不是直接将请求从一个redis节点转发到另一个redis节点,而是在客户端的帮助下直接redirected到正确的redis节点。

Redis Cluster采用的是查询路由的方式。
在Cluster模式下,Redis接收任何命令都会首先计算键对应的桶编号,再根据桶找出所对应的节点,如果节点是自身,则处理键命令;否则回复MOVED重定向错误,通知客户端请求正确的节点,这个过程称为MOVED重定向。
在客户端初次连接Redis集群时,如果客户端是Smart Client,它会获取集群的节点信息及slot的分布信息,并在本地缓存一份 hash slot 与node关系的路由表,这样不必每次访问服务器时都因为重定向而经过多次网络调用。
redis-cli不是smart client,它没有缓存路由表的功能;Java客户端Redisson是smart client,它在初始化时会调用redis实例的CLUSTER NODES命令来获取集群中每个Master负责的slot范围,并启动一个定时任务来每秒刷新本地缓存的集群状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
* 启动时查询集群状态
*/
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config);

this.config = create(cfg);
initTimer(this.config);

Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();

// 发送cluster nodes命令

clusterNodesCommand = RedisCommands.CLUSTER_NODES;
if ("rediss".equals(addr.getScheme())) {
clusterNodesCommand = RedisCommands.CLUSTER_NODES_SSL;
}

List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);

StringBuilder nodesValue = new StringBuilder();
for (ClusterNodeInfo clusterNodeInfo : nodes) {
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
log.info("Redis cluster nodes configuration got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);

lastClusterNode = addr;

// 读取每个节点的分区配置

Collection<ClusterPartition> partitions = parsePartitions(nodes);

...

} catch (Exception e) {
lastException = e;
log.warn(e.getMessage());
}
}

...

// 每秒定时刷新本地缓存的cluster状态,包括每个Master节点负责的slot范围
scheduleClusterChangeCheck(cfg, null);
}

/**
* 获取key所处的节点
*/
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
MasterSlaveEntry entry = connectionManager.getEntry(slot);
return new NodeSource(entry);
}

/**
* 获取key所处的节点
*/
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
// 建立连接、发送命令
final RFuture<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, command);
}

...

connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (connFuture.isCancelled()) {
return;
}
// 如果执行不成功,则设置异常信息
if (!connFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
details.setException(convertException(connectionFuture));
return;
}
// 如果执行OK,释放连接
if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {
releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
return;
}

final RedisConnection connection = connFuture.getNow();
// 响应ASK的情况
if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = connectionManager.newPromise();
ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future);
}
// 响应MOVED的情况
else {
if (log.isDebugEnabled()) {
log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",
details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
}
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}

details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(details, connection);
}
});

releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
}
});
}

下面是手动调用cluster nodes可以得到的响应,从中可以看到每个master所负责的slot范围:

1
2
3
4
5
6
7
8
9
10
hgc@hgc-X555LD:~$ redis-cli -h 10.32.64.12 -p 16371 -c
10.32.64.12:16371> auth 123456
OK
10.32.64.12:16371> cluster nodes
d0af93527054ae3713c6ae82f4f58e016c4968d7 10.32.64.161:16371@26371 slave dbf60db7dc4c2d8ea944481d162bf6be7ef48f5a 0 1604569999114 28 connected
20def2dff31ba78c04f72431a51054def2120638 10.32.64.161:16372@26372 master - 0 1604569998112 31 connected 0-5460
b0c86436cd4cf6d2240faf01b45735616b82cae8 10.32.64.12:16371@26371 myself,slave 20def2dff31ba78c04f72431a51054def2120638 0 1604569995000 30 connected
dbf60db7dc4c2d8ea944481d162bf6be7ef48f5a 10.32.64.162:16372@26372 master - 0 1604569996000 28 connected 5461-10922
005c670071b5dab4ef085613f0ca6666fc5bcbce 10.32.64.12:16373@26373 slave df7f690feee2ad536f2573b55190e0f8d576779e 0 1604569998000 25 connected
df7f690feee2ad536f2573b55190e0f8d576779e 10.32.64.162:16373@26373 master - 0 1604569997000 25 connected 10923-16383

如果Cluster发生了扩容缩容或failover导致客户端缓存的信息过期,客户端只需要MOVED时重新更新本地缓存即可。
但是这里有一个问题,如果扩容缩容时正在发生槽迁移,这时正在迁移中的槽在哪个节点是不确定的,可能会导致客户端本地缓存的频繁更新。因此,Redis迁移过程中,会对正在迁移的槽打标记(server.cluster->migrating_slots_to),如果客户端访问的key命中了正在迁移中的槽,则服务器会返回ASK而不是MOVED,客户端接收到ASK后不会重新更新本地的槽缓存。
代码:redis.c/processCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/* If cluster is enabled perform the cluster redirection here.
*
* 如果开启了集群模式,那么在这里进行转向操作。
*
* However we don't perform the redirection if:
*
* 不过,如果有以下情况出现,那么节点不进行转向:
*
* 1) The sender of this command is our master.
* 命令的发送者是本节点的主节点
*
* 2) The command has no key arguments.
* 命令没有 key 参数
*/
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;

// 集群已下线
if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTransaction(c);
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
return REDIS_OK;

// 集群运作正常
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
// 不能执行多键处理命令
if (n == NULL) {
flagTransaction(c);
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spawns mutliple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
return REDIS_OK;

// 命令针对的槽和键不是本节点处理的,进行转向
} else if (n != server.cluster->myself) {
flagTransaction(c);
// -<ASK or MOVED> <slot> <ip>:<port>
// 例如 -ASK 10086 127.0.0.1:12345
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));

return REDIS_OK;
}

// 如果执行到这里,说明键 key 所在的槽由本节点处理
// 或者客户端执行的是无参数命令
}
}

分区的缺点

有些特性在分区的情况下会受到限制:

  • 涉及多个key的操作通常不会被支持。例如你不能对两个集合求交集,因为他们可能被存储到不同的Redis实例(实际上这种情况也有办法,但是不能直接使用交集指令)。
    同时操作多个key,则不能使用Redis事务.
  • 分区使用的粒度是key,不能使用一个非常长的排序key存储一个数据集(The partitioning granularity is the key, so it is not possible to shard a dataset with a single huge key like a very big sorted set).
  • 当使用分区的时候,数据处理会非常复杂,例如为了备份你必须从不同的Redis实例和主机同时收集RDB / AOF文件。
  • 分区时动态扩容或缩容可能非常复杂。Redis集群在运行时增加或者删除Redis节点,能做到最大程度对用户透明地数据再平衡,但其他一些客户端分区或者代理分区方法则不支持这种特性。然而,有一种预分片的技术也可以较好的解决这个问题。

当要把Redis当作持久化存储时,需要注意分区的性质

  • 如果Redis被当做缓存使用,使用一致性哈希实现动态扩容缩容。
  • 如果Redis被当做一个持久化存储使用,必须使用固定的keys-to-nodes映射关系,节点的数量一旦确定不能变化。否则的话(即Redis节点需要动态变化的情况),必须使用可以在运行时进行数据再平衡的一套系统,现在Redis Cluster已经支持这种再平衡。

节点通信

Redis Cluster采用Gossip协议完成集群状态数据及路由数据等元数据的管理。
一种简单的集群内状态同步思路是:每次节点都将自己本地的集群状态数据广播到集群内所有N个节点,其他节点判断接收到的数据比本地的新则更新本地数据。但是这种方式的缺点是通信量剧增,网络带宽变得紧张。
因此Redis采用Gossip协议来进行集群内元数据的同步,而且:
1、每次只随机选择K(K << N)个其他节点来同步状态;
集群内每个节点维护定时任务默认每秒执行10次,每秒会随机选取5个节点找出最久没有通信的节点发送ping消息,用于保证Gossip信息交换的随机性。每100毫秒都会扫描本地节点列表,如果发现节点最近一次接受pong消息的时间大于cluster_node_timeout/2,则立刻发送ping消息,防止该节点信息太长时间未更新。根据以上规则得出每个节点每秒需要发送ping消息的数量=1+10*num(node.pong_received>cluster_node_timeout/2),因此cluster_node_timeout参数对消息发送的节点数量影响非常大。当我们的带宽资源紧张时,可以适当调大这个参数,如从默认15秒改为30秒来降低带宽占用率。过度调大cluster_node_timeout会影响消息交换的频率从而影响故障转移、槽信息更新、新节点发现的速度。因此需要根据业务容忍度和资源消耗进行平衡。同时整个集群消息总交换量也跟节点数成正比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
// 迭代计数器,一个静态变量
static unsigned long long iteration = 0;
mstime_t handshake_timeout;

// 记录一次迭代
iteration++; /* Number of times this function was called so far. */

/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
// 如果一个 handshake 节点没有在 handshake timeout 内
// 转换成普通节点(normal node),
// 那么节点会从 nodes 表中移除这个 handshake 节点
// 一般来说 handshake timeout 的值总是等于 NODE_TIMEOUT
// 不过如果 NODE_TIMEOUT 太少的话,程序会将值设为 1 秒钟
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

/* Check if we have disconnected nodes and re-establish the connection. */
// 向集群中的所有断线或者未连接节点发送消息
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 跳过当前节点以及没有地址的节点
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
// 如果 handshake 节点已超时,释放它
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
freeClusterNode(node);
continue;
}

// 为未创建连接的节点创建连接
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;

fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr_count ? server.bindaddr[0] : NULL);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
*
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
// 向新连接的节点发送 PING 命令,防止节点被识进入下线
// 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

// 这不是第一次发送 PING 信息,所以可以还原这个时间
// 等 clusterSendPing() 函数来更新它
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}

/* We can clear the flag after the first packet is sent.
*
* 在发送 MEET 信息之后,清除节点的 MEET 标识。
*
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets.
*
* 如果当前节点(发送者)没能收到 MEET 信息的回复,
* 那么它将不再向目标节点发送命令。
*
* 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态,
* 并继续向目标节点发送普通 PING 命令。
*/
node->flags &= ~REDIS_NODE_MEET;

redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);

/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
// clusterCron() 每执行 10 次(至少间隔一秒钟),就向一个随机节点发送 gossip 信息
if (!(iteration % 10)) {
int j;

/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
// 随机 5 个节点,选出其中一个
for (j = 0; j < 5; j++) {

// 随机在集群中挑选节点
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);

/* Don't ping nodes disconnected or with a ping currently active. */
// 不要 PING 连接断开的节点,也不要 PING 最近已经 PING 过的节点
if (this->link == NULL || this->ping_sent != 0) continue;

if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
continue;

// 选出 5 个随机节点中最近一次接收 PONG 回复距离现在最旧的节点
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}

// 向最久没有收到 PONG 回复的节点发送 PING 命令
if (min_pong_node) {
redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}

// 遍历所有节点,检查是否需要将某个节点标记为下线
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;

// 跳过节点本身、无地址节点、HANDSHAKE 状态的节点
if (node->flags &
(REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
continue;

/* Orphaned master check, useful only if the current instance
* is a slave that may migrate to another master. */
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);

if (okslaves == 0 && node->numslots > 0) orphaned_masters++;
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}

/* If we are waiting for the PONG more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
// 如果等到 PONG 到达的时间超过了 node timeout 一半的连接
// 因为尽管节点依然正常,但连接可能已经出问题了
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
// 释放连接,下次 clusterCron() 会自动重连
freeClusterLink(node->link);
}

/* If we have currently no active ping in this instance, and the
* received PONG is older than half the cluster timeout, send
* a new ping now, to ensure all the nodes are pinged without
* a too big delay. */
// 如果目前没有在 PING 节点
// 并且已经有 node timeout 一半的时间没有从节点那里收到 PONG 回复
// 那么向节点发送一个 PING ,确保节点的信息不会太旧
// (因为一部分节点可能一直没有被随机中)
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}

/* If we are a master and one of the slaves requested a manual
* failover, ping it continuously. */
// 如果这是一个主节点,并且有一个从服务器请求进行手动故障转移
// 那么向从服务器发送 PING 。
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}

/* Check only if we have an active ping for this instance. */
// 以下代码只在节点发送了 PING 命令的情况下执行
if (node->ping_sent == 0) continue;

/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
// 计算等待 PONG 回复的时长
delay = now - node->ping_sent;

// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线)
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// 打开疑似下线标记
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);

/* If we are a slave node but the replication is still turned off,
* enable it if we know the address of our master and it appears to
* be up. */
// 如果从节点没有在复制主节点,那么对从节点进行设置
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}

/* Abourt a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();

if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
clusterHandleSlaveFailover();
/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}

// 更新集群状态
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
clusterUpdateState();
}

2、状态信息并不是全量同步,而是随机选M(M << N)个节点的状态同步到其他节点。
M值最小为3,最大为N - 2,一般情况下M = N / 10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */
// 向指定节点发送一条 MEET 、 PING 或者 PONG 消息
void clusterSendPing(clusterLink *link, int type) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
int gossipcount = 0, totlen;
/* freshnodes is the number of nodes we can still use to populate the
* gossip section of the ping packet. Basically we start with the nodes
* we have in memory minus two (ourself and the node we are sending the
* message to). Every time we add a node we decrement the counter, so when
* it will drop to <= zero we know there is no more gossip info we can
* send. */
// freshnodes 是用于发送 gossip 信息的计数器
// 每次发送一条信息时,程序将 freshnodes 的值减一
// 当 freshnodes 的数值小于等于 0 时,程序停止发送 gossip 信息
// freshnodes 的数量是节点目前的 nodes 表中的节点数量减去 2
// 这里的 2 指两个节点,一个是 myself 节点(也即是发送信息的这个节点)
// 另一个是接受 gossip 信息的节点
int freshnodes = dictSize(server.cluster->nodes)-2;

// 如果发送的信息是 PING ,那么更新最后一次发送 PING 命令的时间戳
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();

// 将当前节点的信息(比如名字、地址、端口号、负责处理的槽)记录到消息里面
clusterBuildMessageHdr(hdr,type);

/* Populate the gossip fields */
// 从当前节点已知的节点中随机选出两个节点
// 并通过这条消息捎带给目标节点,从而实现 gossip 协议

// 每个节点有 freshnodes 次发送 gossip 信息的机会
// 每次向目标节点发送 2 个被选中节点的 gossip 信息(gossipcount 计数)
while(freshnodes > 0 && gossipcount < 3) {
// 从 nodes 字典中随机选出一个节点(被选中节点)
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);

clusterMsgDataGossip *gossip;
int j;

/* In the gossip section don't include:
* 以下节点不能作为被选中节点:
* 1) Myself.
* 节点本身。
* 2) Nodes in HANDSHAKE state.
* 处于 HANDSHAKE 状态的节点。
* 3) Nodes with the NOADDR flag set.
* 带有 NOADDR 标识的节点
* 4) Disconnected nodes if they don't have configured slots.
* 因为不处理任何槽而被断开连接的节点
*/
if (this == myself ||
this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* otherwise we may loop forever. */
continue;
}

/* Check if we already added this node */
// 检查被选中节点是否已经在 hdr->data.ping.gossip 数组里面
// 如果是的话说明这个节点之前已经被选中了
// 不要再选中它(否则就会出现重复)
for (j = 0; j < gossipcount; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
REDIS_CLUSTER_NAMELEN) == 0) break;
}
if (j != gossipcount) continue;

/* Add it */

// 这个被选中节点有效,计数器减一
freshnodes--;

// 指向 gossip 信息结构
gossip = &(hdr->data.ping.gossip[gossipcount]);

// 将被选中节点的名字记录到 gossip 信息
memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
// 将被选中节点的 PING 命令发送时间戳记录到 gossip 信息
gossip->ping_sent = htonl(this->ping_sent);
// 将被选中节点的 PING 命令回复的时间戳记录到 gossip 信息
gossip->pong_received = htonl(this->pong_received);
// 将被选中节点的 IP 记录到 gossip 信息
memcpy(gossip->ip,this->ip,sizeof(this->ip));
// 将被选中节点的端口号记录到 gossip 信息
gossip->port = htons(this->port);
// 将被选中节点的标识值记录到 gossip 信息
gossip->flags = htons(this->flags);

// 这个被选中节点有效,计数器增一
gossipcount++;
}

// 计算信息长度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
// 将被选中节点的数量(gossip 信息中包含了多少个节点的信息)
// 记录在 count 属性里面
hdr->count = htons(gossipcount);
// 将信息的长度记录到信息里面
hdr->totlen = htonl(totlen);

// 发送信息
clusterSendMessage(link,buf,totlen);
}

扩容 / 缩容

当新的节点加入时,我们该如何重新分配数据,让新的节点也对外提供服务。当有节点退出时,我们该如何把存在该节点上的数据分配到其他机器上,让其他机器来提供这部分数据的服务。即集群的扩缩容问题。

新节点加入流程

新节点加入时,需要把一部分数据迁移到新节点来达到集群的负载均衡。
在Redis集群中,数据的存储是以slot为单位的,因此:

  1. 集群的伸缩本质上就是slot在不同机器节点之间的迁移;
  2. 迁移过程中,有的slot在老节点上,有的slot在新节点上,这时,客户端请求应该被重定向到正确的节点上。
    比如slot1从A迁移到B上时,请求A或B会怎么样?请求别的节点又会怎么样?

节点的迁移过程主要分为3个步骤:

  1. 准备新节点
  2. 加入集群
  3. 迁移slot到新节点

以下迁移过程的伪代码来自:Redis集群详解(中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def move_slot(source,target,slot):
# 目标节点准备导入槽
target.cluster("setslot",slot,"importing",source.nodeId);
# 目标节点准备全出槽
source.cluster("setslot",slot,"migrating",target.nodeId);
while true :
# 批量从源节点获取键
keys = source.cluster("getkeysinslot",slot,pipeline_size);
if keys.length == 0:
# 键列表为空时,退出循环
break;
# 批量迁移键到目标节点
source.call("migrate",target.host,target.port,"",0,timeout,"keys",keys);
# 向集群所有主节点通知槽被分配给目标节点
for node in nodes:
if node.flag == "slave":
continue;
node.cluster("setslot",slot,"node",target.nodeId);

节点迁移过程

以下命令告知目标节点准备导入slot:

1
cluster setslot <slot> IMPORTING <nodeId>

以下命令告知目标节点准备导出slot:

1
cluster setslot <slot> MIGRATING <nodeId>

每个节点保存的集群状态中记录了迁移中的slot,其中,迁出的slot放到migrating_slots_to中,迁入的slot放到importing_slots_from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typedef struct clusterState {
clusterNode *myself; /* This node */
// 当前纪元
uint64_t currentEpoch;
// 集群的状态
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
// 集群中至少负责一个槽的主节点个数
int size; /* Num of master nodes with at least one slot */
// 保存集群节点的字典,键是节点名字,值是clusterNode结构的指针
dict *nodes; /* Hash table of name -> clusterNode structures */
// 防止重复添加节点的黑名单
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
// 导入槽数据到目标节点,该数组记录这些节点
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
// 导出槽数据到目标节点,该数组记录这些节点
clusterNode *importing_slots_from[CLUSTER_SLOTS];
// 槽和负责槽节点的映射
clusterNode *slots[CLUSTER_SLOTS];
// 槽映射到键的有序集合
zskiplist *slots_to_keys;

} clusterState;

接下来,将待迁移slot中的key批量转移到目标节点:

1
2
3
4
# 返回count个slot中的键
cluster getkeysinslot <slot> <count>
# 需要对上面命令返回的每个键都发送以下命令,该命令会将所指定的键原子地从源节点移动到目标节点
migrate <host> <port> key destination-db timeout

migrate命令就是向节点发送了N个RESTORE-ASKING命令,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);
//检查键是不是已经过期
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}
kv[non_expired++] = kv[j];

// 集群模式下写入RESTORE-ASKING命令,普通模式下写入RESTORE命令
if (server.cluster_enabled)
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
// 写入KEY,写入TTL
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
// 写入VALUE以及Redis版本校验码等信息
createDumpPayload(&payload,ov[j],kv[j]);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));

}

迁入节点接收到restore-asking命令后,执行节点的恢复操作,即获取key,解析出value,然后写入数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* RESTORE key ttl serialized-value [REPLACE] */
// 根据给定的 DUMP 数据,还原出一个键值对数据,并将它保存到数据库里面
void restoreCommand(redisClient *c) {
long long ttl;
rio payload;
int j, type, replace = 0;
robj *obj;

...

// 读取 DUMP 数据,并反序列化出键值对的类型和值
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}

/* Remove the old key if needed. */
// 如果给定了 REPLACE 选项,那么先删除数据库中已存在的同名键
if (replace) dbDelete(c->db,c->argv[1]);

/* Create the key and set the TTL if any */
// 将键值对添加到数据库
dbAdd(c->db,c->argv[1],obj);

// 如果键带有 TTL 的话,设置键的 TTL
if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);

signalModifiedKey(c->db,c->argv[1]);

addReply(c,shared.ok);
server.dirty++;
}

迁移过程中,在外部客户端的视角看来,在任意时间点上,key只会存在于某个节点上,而不会同时存在于两个节点上。

现在,待迁移槽中的key都已经被迁移了,但是对其他节点来说,该slot仍是由迁出节点负责的,它们接收到相关请求后仍然会路由到迁出节点,所以迁移的最后一步需要向集群中的所有主节点通知槽已经被分配给目标节点。

1
cluster setslot <slot> node <nodeId>

迁移过程中对新请求的响应

迁移过程中:

  • 如果迁出节点接收请求,迁出节点判断slot或key是否已迁出,若是则ASK重定向到迁入节点上,否则迁出节点自己负责处理请求;
  • 如果迁入节点接收请求,会把请求重定向到迁出节点上,除非请求中包含ASKING命令;
  • 其他节点接收到的相关请求会被重定向到迁出节点上;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    int processCommand(redisClient *c) {

    ...

    /* If cluster is enabled perform the cluster redirection here.
    *
    * 如果开启了集群模式,那么在这里进行转向操作。
    *
    * However we don't perform the redirection if:
    *
    * 不过,如果有以下情况出现,那么节点不进行转向:
    *
    * 1) The sender of this command is our master.
    * 命令的发送者是本节点的主节点
    *
    * 2) The command has no key arguments.
    * 命令没有 key 参数
    */
    if (server.cluster_enabled &&
    !(c->flags & REDIS_MASTER) &&
    !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
    int hashslot;

    // 集群已下线
    if (server.cluster->state != REDIS_CLUSTER_OK) {
    flagTransaction(c);
    addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
    return REDIS_OK;

    // 集群运作正常
    } else {
    int error_code;
    clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
    // 不能执行多键处理命令
    if (n == NULL) {
    flagTransaction(c);
    if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
    } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
    /* The request spawns mutliple keys in the same slot,
    * but the slot is not "stable" currently as there is
    * a migration or import in progress. */
    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
    } else {
    redisPanic("getNodeByQuery() unknown error.");
    }
    return REDIS_OK;

    // 命令针对的槽和键不是本节点处理的,进行转向
    } else if (n != server.cluster->myself) {
    flagTransaction(c);
    // -<ASK or MOVED> <slot> <ip>:<port>
    // 例如 -ASK 10086 127.0.0.1:12345
    addReplySds(c,sdscatprintf(sdsempty(),
    "-%s %d %s:%d\r\n",
    (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
    hashslot,n->ip,n->port));

    return REDIS_OK;
    }

    // 如果执行到这里,说明键 key 所在的槽由本节点处理
    // 或者客户端执行的是无参数命令
    }
    }

    ...
    }
    注意上边的getNodeByQuery根据key的散列结果查询命令应该被打到的节点,可以看到这个函数里有对ASKING标识的特殊处理:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {

    ...

    /* If we are receiving the slot, and the client correctly flagged the
    * request as "ASKING", we can serve the request. However if the request
    * involves multiple keys and we don't have them all, the only option is
    * to send a TRYAGAIN error. */
    if (importing_slot &&
    (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
    if (multiple_keys && missing_keys) {
    if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
    return NULL;
    } else {
    return myself;
    }
    }

    ...
    }

旧节点退出流程

与新节点的加入相反的是,旧节点退出时需要把其上的数据迁移到其他节点上,确保该节点上的数据能够被正常访问。
槽的迁移过程和上边扩容中描述的没有区别,主要区别是在迁移完毕后需要轮询每个节点发送cluster forget命令,让它们能忘记下线的节点。
节点在接收cluster forget命令后,会将目标节点的状态从自己保存的集群状态中移除,并将其加入黑名单中60s,这期间其他节点不会再去更新自己维护的该节点的信息,也就是说这60秒内该节点无法重新加入集群内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def delnode_cluster_cmd(downNode):
# 下线节点不允许包含slots
if downNode.slots.length != 0
exit 1
end
# 向集群内节点发送cluster forget
for n in nodes:
if n.id == downNode.id:
# 不能对自己做forget操作
continue;
# 如果下线节点有从节点则把从节点指向其他主节点
if n.replicate && n.replicate.nodeId == downNode.id :
# 指向拥有最少从节点的主节点
master = get_master_with_least_replicas();
n.cluster("replicate",master.nodeId);
#发送忘记节点命令
n.cluster('forget',downNode.id)
# 节点关闭
downNode.shutdown();

集群规模估算

集群规模并不是没有限制的,理论上每个节点一个slot集群可以扩容到16384个节点,但是Redis官方给出的规模上限是一个集群1000个节点,因为实例间的通信开销会随着实例规模增加而增大
下面来讨论下集群内部有哪些交互,并分析它们会对性能有什么样的影响。

实例间数据的同步

集群每个节点都会记录slot和实例间的映射关系,用于请求的重定向。
每个实例都需要通过Gossip协议将数据同步到其他节点,大致流程为:

  1. 每个实例之间会按照一定的频率,从集群中随机挑选一些实例,把 PING 消息发送给挑选出来的实例,用来检测这些实例是否在线,并交换彼此的状态信息。PING 消息中封装了发送消息的实例自身的状态信息、部分其它实例的状态信息,以及 Slot 映射表。
    发送的节点状态信息在源码中由clusterMsgDataGossip这个结构来表示,大小为104字节。每个实例在发送Gossip消息时,除了传递自身的状态信息,默认还会传递集群十分之一实例的状态信息,比如,对于一个包含了 1000 个实例的集群来说,每个实例发送一个 PING 消息时,会包含 100 个实例的状态信息,总的数据量是 10400 字节,再加上发送实例自身的信息,一个 Gossip 消息大约是 10KB。
    另外,Slot映射表是一个16384位的bitmap,算上上面的10KB就是12KB的内容。
  2. 一个实例在接收到 PING 消息后,会给发送 PING 消息的实例,发送一个 PONG 消息。PONG 消息包含的内容和 PING 消息一样,也是12KB。
  3. 另外,上面是随机选节点发PING请求的,如果部分节点一直没有被选到,就会导致这些节点和其他节点不同步。
    为了避免这种情况,Redis Cluster 的实例会按照每 100ms 一次的频率,扫描本地的实例列表,如果发现有实例最近一次接收 PONG 消息的时间,已经大于配置项 cluster-node-timeout 的一半了(cluster-node-timeout/2),就会立刻给该实例发送 PING 消息,更新这个实例上的集群状态信息。
    当集群规模扩大之后,因为网络拥塞或是不同服务器间的流量竞争,会导致实例间的网络通信延迟增加。如果有部分实例无法收到其它实例发送的 PONG 消息,就会引起实例之间频繁地发送 PING 消息,这又会对集群网络通信带来额外的开销了。

从上可知,实例间的数据同步受到通信消息大小通信频率这两方面的影响。
当集群规模扩大后,PING/PONG会占用大量的集群内网络带宽,降低集群服务正常请求的吞吐量。
单实例每秒会发送的PING消息数量大致可以算出是(注意cron是100ms执行一次):

1
PING 消息发送数量 = 1 + 10 * 实例数(最近一次接收 PONG 消息的时间超出 cluster-node-timeout/2)

其中,1 是指单实例常规按照每 1 秒发送一个 PING 消息,10 是指每 1 秒内实例会执行 10 次检查,每次检查后会给 PONG 消息超时的实例发送消息。
假设单个实例检测发现,每 100 毫秒有 10 个实例的 PONG 消息接收超时,那么,这个实例每秒就会发送 101 个 PING 消息,约占 1.2MB/s 带宽。如果集群中有 30 个实例按照这种频率发送消息,就会占用 36MB/s 带宽,这就会挤占集群中用于服务正常请求的带宽。

因此实例间的通信开销优化主要是:

  1. 减少实例传输的消息大小(PING/PONG 消息、Slot 分配信息)
    但是,因为集群实例依赖 PING、PONG 消息和 Slot 分配信息,来维持集群状态的统一,一旦减小了传递的消息大小,就会导致实例间的通信信息减少,不利于集群维护,所以,我们不能采用这种方式。
  2. 降低实例间发送消息的频率
    从上面PING消息发送数量公式可以看出,每秒发送一条PING消息的频率不算高,如果要降低可能导致集群内数据同步延迟;每100ms做一次检测并给延迟超过cluster-node-timeout/2的节点发送PING消息,这个配置是可以适当调大的。
    • 如果配置得比较小,则在大规模集群中会频繁出现PONG超时的情况;
    • 如果配置得过大,则如果真得发生了故障,我们反而需要等比较长的时间才能检测出来。
      可以在调整前后使用tcpdump抓取实例发送心跳网络包的情况。
      tcpdump host 192.168.10.3 port 16379 -i 网卡名 -w /tmp/r1.cap

故障恢复(容错)

Redis故障恢复主要分为以下3个步骤:

  1. 故障发现
    采用多数派协议完成故障检测判断(即至少有半数以上节点认为某主节点故障后才真正判断节点故障)。
  2. 子节点选举
    Redis Cluster中每个Master都会有1至多个Slave,通过复制实现高可用(故障转移),当Master有多个Slave,会采用Raft实现选举出一个主节点以实现故障恢复。
  3. 配置更新
    故障转移后,那么之前的Master和其他Slave怎么处理?Redis会将这些节点成为新Master节点的子节点。

故障发现

一些 CP 特性且中心化的集群来说,当出现节点宕机时经常需要选举新的 Leader 节点,但是 Redis-Cluster 是去中心化的,某个 Master 的宕机并不会影响其他节点的工作。但是,当节点失联时,需要考虑网络的抖动情况,毕竟不能因为某几个请求意外超时就推断集群失败了,部分节点判断一个节点失联只会标记这个节点状态为PFAIL(主观下线),之后如果多数节点投票通过才会真正标记这个节点FAIL(下线)
投票过程是集群中所有 master 参与的,每个节点都存有整个集群所有主节点及从节点的信息,它们之间通过互相 ping-pong 来判断节点是否可以连上,如果半数以上 master 节点与当前 master 节点通信超时(cluster-node-timeout),则认为当前 master 节点挂掉,标记这个节点状态为FAIL

当 master 挂掉时,并不意味着集群已无法再提供服务了,集群要进入fail(不可用)状态需要满足以下条件之一:

  1. 集群的任意 master 挂掉,且该 master 没有 slave 或 slave 全挂掉了,则集群进入 fail 状态。
    这是因为,Cluster中所有slot是平均分配到每个Master的,如果有一个Master的slot不能用了、而且这个Master还没有Slave,那么集群就不能提供服务了,如果Master还有Slave,Slave可以代替Master继续向外提供服务,这个步骤称为slave promotion
    单独的一对Master-Slave挂掉,Redis还提供一个叫 Replica Migration 的解决方案:当集群中的某个Master节点没有Slave节点时(称之为 Orphaned Master),其他有富余Slave节点的主节点会向该节点迁移一个Slave节点以防该节点下线之后没有子节点来替换从而导致整个集群下线。
  2. 集群超过半数以上 master 挂掉,无论有无 slave 都进入 fail 状态。

当集群不可用时,任何操作都将返回((error) CLUSTERDOWN The cluster is down)错误。需要注意的是,必须要 3 个或以上的主节点,否则在创建集群时会失败。

PFAIL

1、Redis每个节点会不断向其他节点发送PING消息来检测其他节点是否可达,如果超时会先断开连接:
代码:cluster.c/clusterCron
RedisCluster故障发现1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {
...

while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;

...

/* If we are waiting for the PONG more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
// 判断连接的节点是否出事
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
// ping_sent记录发送命令的时间
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
// PONG 到达的时间超过了 node_timeout 的一半
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
// 释放连接,此时node->link=NULL,下次 clusterCron() 会自动重连
freeClusterLink(node->link);
}

...
}

...
}

2、此时节点A PING目标节点B失败,A会尝试重连,并将重连时间记录到ping_sent变量中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {

...

/* Check if we have disconnected nodes and re-establish the connection. */
// 向集群中的所有断线或者未连接节点发送消息
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 跳过当前节点以及没有地址的节点
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

...

// 为未创建连接的节点创建连接
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;

fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr_count ? server.bindaddr[0] : NULL);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
*
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
// 向新连接的节点发送 PING 命令,防止节点被识进入下线
// 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

// 这不是第一次发送 PING 信息,所以可以还原这个时间
// 等 clusterSendPing() 函数来更新它
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}

/* We can clear the flag after the first packet is sent.
*
* 在发送 MEET 信息之后,清除节点的 MEET 标识。
*
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets.
*
* 如果当前节点(发送者)没能收到 MEET 信息的回复,
* 那么它将不再向目标节点发送命令。
*
* 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态,
* 并继续向目标节点发送普通 PING 命令。
*/
node->flags &= ~REDIS_NODE_MEET;

redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}

...

}

3、节点A发现PING B的延时时间超过了node_timeout之后,就会标记该节点为PFAIL(Possible FAILure),即主观下线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void clusterCron(void) {

...

// 遍历所有节点,检查是否需要将某个节点标记为下线
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {

...

/* Check only if we have an active ping for this instance. */
// 以下代码只在节点发送了 PING 命令的情况下执行
if (node->ping_sent == 0) continue;

/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
// 计算等待 PONG 回复的时长
delay = now - node->ping_sent;

// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线)
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// 打开疑似下线标记
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
}

...
}

FAIL

RedisCluster故障发现2
1、A将B标记为PFAIL后,A会通过Gossip通知到其他节点。

2、所有节点会维护一个下线报告列表(Fail Report),主要维护一个节点被哪些节点报告处于下线状态,此时,C会记录“B被A报告下线了”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
int clusterProcessPacket(clusterLink *link) {

...

/* Process packets by type. */
// 根据消息的类型,处理节点

// 这是一条 PING 消息或者 MEET 消息
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);

/* Add this node if it is new for us and the msg type is MEET.
*
* 如果当前节点是第一次遇见这个节点,并且对方发来的是 MEET 信息,
* 那么将这个节点添加到集群的节点列表里面。
*
* In this stage we don't try to add the node with the right
* flags, slaveof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node.
*
* 节点目前的 flag 、 slaveof 等属性的值都是未设置的,
* 等当前节点向对方发送 PING 命令之后,
* 这些信息可以从对方回复的 PONG 信息中取得。
*/
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

// 创建 HANDSHAKE 状态的新节点
node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);

// 设置 IP 和端口
nodeIp2String(node->ip,link);
node->port = ntohs(hdr->port);

// 将新节点添加到集群
clusterAddNode(node);

clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

/* Get info from the gossip section */
// 分析并取出消息中的 gossip 节点信息
clusterProcessGossipSection(hdr,link);

/* Anyway reply with a PONG */
// 向目标节点返回一个 PONG
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}

...
}

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {

// 记录这条消息中包含了多少个节点的信息
uint16_t count = ntohs(hdr->count);

// 指向第一个节点的信息
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;

// 取出发送者
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

// 遍历所有节点的信息
while(count--) {
sds ci = sdsempty();

// 分析节点的 flag
uint16_t flags = ntohs(g->flags);

// 信息节点
clusterNode *node;

...

/* Update our state accordingly to the gossip sections */
// 使用消息中的信息对节点进行更新
node = clusterLookupNode(g->nodename);
// 节点已经存在于当前节点
if (node) {
/* We already know this node.
Handle failure reports, only when the sender is a master. */
// 如果 sender 是一个主节点,那么我们需要处理下线报告
if (sender && nodeIsMaster(sender) && node != myself) {
// 节点处于 FAIL 或者 PFAIL 状态
if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {

// 添加 sender 对 node 的下线报告
if (clusterNodeAddFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}

// 尝试将 node 标记为 FAIL
markNodeAsFailingIfNeeded(node);

// 节点处于正常状态
} else {

// 如果 sender 曾经发送过对 node 的下线报告
// 那么清除该报告
if (clusterNodeDelFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}

/* If we already know this node, but it is not reachable, and
* we see a different address in the gossip section, start an
* handshake with the (possibly) new address: this will result
* into a node address update if the handshake will be
* successful. */
// 如果节点之前处于 PFAIL 或者 FAIL 状态
// 并且该节点的 IP 或者端口号已经发生变化
// 那么可能是节点换了新地址,尝试对它进行握手
if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
(strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}

// 当前节点不认识 node
} else {
/* If it's not in NOADDR state and we don't have it, we
* start a handshake process against this IP/PORT pairs.
*
* 如果 node 不在 NOADDR 状态,并且当前节点不认识 node
* 那么向 node 发送 HANDSHAKE 消息。
*
* Note that we require that the sender of this gossip message
* is a well known node in our cluster, otherwise we risk
* joining another cluster.
*
* 注意,当前节点必须保证 sender 是本集群的节点,
* 否则我们将有加入了另一个集群的风险。
*/
if (sender &&
!(flags & REDIS_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}
}

/* Next node */
// 处理下个节点的信息
g++;
}
}

3、C添加下线报告之后,会进行B节点的客观下线状态(FAIL)判定。
当集群中有超过半数的节点都认为节点B处于PFAIL后才会判断B为FAIL,且需要注意的是,A将PFAIL通知给C后,C自己本身也得认为B处于PFAIL状态才会开始客观下线判定。
当C认为B正式FAIL后,它就会立刻向集群所有节点广播这个消息。
RedisCluster故障发现3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/* This function checks if a given node should be marked as FAIL.
* It happens if the following conditions are met:
*
* 此函数用于判断是否需要将 node 标记为 FAIL 。
*
* 将 node 标记为 FAIL 需要满足以下两个条件:
*
* 1) We received enough failure reports from other master nodes via gossip.
* Enough means that the majority of the masters signaled the node is
* down recently.
* 有半数以上的主节点将 node 标记为 PFAIL 状态。
* 2) We believe this node is in PFAIL state.
* 当前节点也将 node 标记为 PFAIL 状态。
*
* If a failure is detected we also inform the whole cluster about this
* event trying to force every other node to set the FAIL flag for the node.
*
* 如果确认 node 已经进入了 FAIL 状态,
* 那么节点还会向其他节点发送 FAIL 消息,让其他节点也将 node 标记为 FAIL 。
*
* Note that the form of agreement used here is weak, as we collect the majority
* of masters state during some time, and even if we force agreement by
* propagating the FAIL message, because of partitions we may not reach every
* node. However:
*
* 注意,集群判断一个 node 进入 FAIL 所需的条件是弱(weak)的,
* 因为节点们对 node 的状态报告并不是实时的,而是有一段时间间隔
* (这段时间内 node 的状态可能已经发生了改变),
* 并且尽管当前节点会向其他节点发送 FAIL 消息,
* 但因为网络分裂(network partition)的问题,
* 有一部分节点可能还是会不知道将 node 标记为 FAIL 。
*
* 不过:
*
* 1) Either we reach the majority and eventually the FAIL state will propagate
* to all the cluster.
* 只要我们成功将 node 标记为 FAIL ,
* 那么这个 FAIL 状态最终(eventually)总会传播至整个集群的所有节点。
* 2) Or there is no majority so no slave promotion will be authorized and the
* FAIL flag will be cleared after some time.
* 又或者,因为没有半数的节点支持,当前节点不能将 node 标记为 FAIL ,
* 所以对 FAIL 节点的故障转移将无法进行, FAIL 标识可能会在之后被移除。
*
*/
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;

// 标记为 FAIL 所需的节点数量,需要超过集群节点数量的一半
int needed_quorum = (server.cluster->size / 2) + 1;

if (!nodeTimedOut(node)) return; /* We can reach it. */
if (nodeFailed(node)) return; /* Already FAILing. */

// 统计将 node 标记为 PFAIL 或者 FAIL 的节点数量(不包括当前节点)
failures = clusterNodeFailureReportsCount(node);

/* Also count myself as a voter if I'm a master. */
// 如果当前节点是主节点,那么将当前节点也算在 failures 之内
if (nodeIsMaster(myself)) failures++;
// 报告下线节点的数量不足节点总数的一半,不能将节点判断为 FAIL ,返回
if (failures < needed_quorum) return; /* No weak agreement from masters. */

redisLog(REDIS_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);

/* Mark the node as failing. */
// 将 node 标记为 FAIL
node->flags &= ~REDIS_NODE_PFAIL;
node->flags |= REDIS_NODE_FAIL;
node->fail_time = mstime();

/* Broadcast the failing node name to everybody, forcing all the other
* reachable nodes to flag the node as FAIL. */
// 如果当前节点是主节点的话,那么向其他节点发送报告 node 的 FAIL 信息
// 让其他节点也将 node 标记为 FAIL
if (nodeIsMaster(myself)) clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

4、当C标记了B为FAIL状态,则它会广播到整个集群中的所有节点(包括子节点),其他节点都会更新自己维护的节点B的状态信息为FAIL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/* Send a FAIL message to all the nodes we are able to contact.
*
* 向当前节点已知的所有节点发送 FAIL 信息。
*/
void clusterSendFail(char *nodename) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg *) buf;

// 创建下线消息
clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_FAIL);

// 记录命令
memcpy(hdr->data.fail.about.nodename, nodename, REDIS_CLUSTER_NAMELEN);

// 广播消息
clusterBroadcastMessage(buf, ntohl(hdr->totlen));
}

/* Send a message to all the nodes that are part of the cluster having
* a connected link.
*
* 向节点连接的所有其他节点发送信息。
*/
void clusterBroadcastMessage(void *buf, size_t len) {
dictIterator *di;
dictEntry *de;

// 遍历所有已知节点
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 不向未连接节点发送信息
if (!node->link) continue;

// 不向节点自身或者 HANDSHAKE 状态的节点发送信息
if (node->flags & (REDIS_NODE_MYSELF | REDIS_NODE_HANDSHAKE))
continue;

// 发送信息
clusterSendMessage(node->link, buf, len);
}
dictReleaseIterator(di);
}

子节点选举(故障迁移)

1、当B的两个子节点接收到B的FAIL状态消息时,它们会更新自己本地内存中的集群状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int clusterProcessPacket(clusterLink *link) {

...

// 这是一条 FAIL 消息: sender 告知当前节点,某个节点已经进入 FAIL 状态。
else if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;

if (sender) {

// 获取下线节点的消息
failing = clusterLookupNode(hdr->data.fail.about.nodename);
// 下线的节点既不是当前节点,也没有处于 FAIL 状态
if (failing &&
!(failing->flags & (REDIS_NODE_FAIL | REDIS_NODE_MYSELF))) {
redisLog(REDIS_NOTICE,
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);

// 打开 FAIL 状态
failing->flags |= REDIS_NODE_FAIL;
failing->fail_time = mstime();
// 关闭 PFAIL 状态
failing->flags &= ~REDIS_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
CLUSTER_TODO_UPDATE_STATE);
}
} else {
redisLog(REDIS_NOTICE,
"Ignoring FAIL message from unknonw node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}

}

...

}

2、随后,在clusterCron定时任务中就会开始发起故障迁移,竞选成为新的Master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
void clusterCron(void) {

...

/* Abourt a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();

// 如果当前节点是子节点
if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
// 处理集群子节点的故障迁移
clusterHandleSlaveFailover();

/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}

// 更新集群状态
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
clusterUpdateState();
}

/* This function is called if we are a slave node and our master serving
* a non-zero amount of hash slots is in FAIL state.
*
* 如果当前节点是一个从节点,并且它正在复制的一个负责非零个槽的主节点处于 FAIL 状态,
* 那么执行这个函数。
*
* The gaol of this function is:
*
* 这个函数有三个目标:
*
* 1) To check if we are able to perform a failover, is our data updated?
* 检查是否可以对主节点执行一次故障转移,节点的关于主节点的信息是否准确和最新(updated)?
* 2) Try to get elected by masters.
* 选举一个新的主节点
* 3) Perform the failover informing all the other nodes.
* 执行故障转移,并通知其他节点
*/
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
int needed_quorum = (server.cluster->size / 2) + 1;
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
int j;
mstime_t auth_timeout, auth_retry_time;

server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before waiting again.
*
* Timeout is MIN(NODE_TIMEOUT*2,2000) milliseconds.
* Retry is two times the Timeout.
*/
auth_timeout = server.cluster_node_timeout * 2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout * 2;

/* Pre conditions to run the function, that must be met both in case
* of an automatic or manual failover:
* 1) We are a slave.
* 2) Our master is flagged as FAIL, or this is a manual failover.
* 3) It is serving slots. */
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
myself->slaveof->numslots == 0)
return;

...

}

3、资格检查
Slave节点会不停的与Master节点通信来复制Master节点的数据,如果一个Slave节点长时间不与Master节点通信,那么很可能意味着该Slave节点上的数据已经落后Master节点过多(因为Master节点再不停的更新数据但是Slave节点并没有随之更新)。Redis认为,当一个Slave节点过长时间不与Master节点通信,那么该节点就不具备参与竞选的资格。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void clusterHandleSlaveFailover(void) {

...

/* Set data_age to the number of seconds we are disconnected from
* the master. */
// 将 data_age 设置为从节点与主节点的断开秒数
if (server.repl_state == REDIS_REPL_CONNECTED) {
data_age = (mstime_t) (server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t) (server.unixtime - server.repl_down_since) * 1000;
}

/* Remove the node timeout from the data age as it is fine that we are
* disconnected from our master at least for the time it was down to be
* flagged as FAIL, that's the baseline. */
// node timeout 的时间不计入断线时间之内
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;

/* Check if our data is recent enough. For now we just use a fixed
* constant of ten times the node timeout since the cluster should
* react much faster to a master down.
*
* Check bypassed for manual failovers. */
// 检查这个从节点的数据是否较新:
// 目前的检测办法是断线时间不能超过 node timeout 的十倍
if (data_age >
((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT))
{
if (!manual_failover) return;
}

...

}

4、休眠时间计算
B的所有子节点(B1、B2)在判断自己具备选举资格时,就开始执行竞选,竞选协议是Raft,选举过程中,所有参与选举的节点首先随机休眠一段时间。
整个休眠时间由两个部分组成:

1
DELAY = 500 milliseconds + random delay between 0 and 500 milliseconds + SLAVE_RANK * 1000 milliseconds.
  • 一部分为固定的500ms时间,这500ms主要是为了等待集群状态同步。上面提到节点C会向集群所有节点广播消息,那么这500ms就是等待确保集群的所有节点都收到了消息并更新了状态。
  • 另一部分主要是一个随机的时间加上由该Slave节点的排名决定的附加时间。每个slave都会记录自己从主节点同步数据的复制偏移量。复制偏移量越大,说明该节点与主节点数据保持的越一致。那么显然我们选举的时候肯定是想选状态更新最近的子节点,所以我们按照更新状态的排序来确定休眠时间的附加部分。状态更新最近的节点SLAVE_RANK排名为1,那么其休眠的时间相应的也最短,也就意味着该节点最有可能获得大部分选票。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    void clusterHandleSlaveFailover(void) {

    ...

    /* If the previous failover attempt timedout and the retry time has
    * elapsed, we can setup a new one. */
    if (auth_age > auth_retry_time) {
    server.cluster->failover_auth_time = mstime() +
    500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
    random() % 500; /* Random delay between 0 and 500 milliseconds. */
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_sent = 0;
    server.cluster->failover_auth_rank = clusterGetSlaveRank();
    /* We add another delay that is proportional to the slave rank.
    * Specifically 1 second * rank. This way slaves that have a probably
    * less updated replication offset, are penalized. */
    server.cluster->failover_auth_time +=
    server.cluster->failover_auth_rank * 1000;
    /* However if this is a manual failover, no delay is needed. */
    if (server.cluster->mf_end) {
    server.cluster->failover_auth_time = mstime();
    server.cluster->failover_auth_rank = 0;
    }
    redisLog(REDIS_WARNING,
    "Start of election delayed for %lld milliseconds "
    "(rank #%d, offset %lld).",
    server.cluster->failover_auth_time - mstime(),
    server.cluster->failover_auth_rank,
    replicationGetSlaveOffset());
    /* Now that we have a scheduled election, broadcast our offset
    * to all the other slaves so that they'll updated their offsets
    * if our offset is better. */
    clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
    return;
    }

    ...

    /* Return ASAP if we can't still start the election. */
    // 如果执行故障转移的时间未到,先返回
    if (mstime() < server.cluster->failover_auth_time) return;

    ...

    }
    5、发起拉票 & 选举投票
    B1唤醒后,会向其他所有节点发送拉票请求,即CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST类型的消息。
    其他主节点接收到拉票请求,且此时它还没有投出自己的票,则会将自己票投给发请求的B1,即回复FAILOVER_AUTH_ACK消息。
    其他子节点没有投票的资格,因此即使接收到CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST类型消息也会直接忽略。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    void clusterHandleSlaveFailover(void) {

    ...

    /* Ask for votes if needed. */
    // 向其他节点发送故障转移请求
    if (server.cluster->failover_auth_sent == 0) {

    // 增加配置纪元
    server.cluster->currentEpoch++;

    // 记录发起故障转移的配置纪元
    server.cluster->failover_auth_epoch = server.cluster->currentEpoch;

    redisLog(REDIS_WARNING, "Starting a failover election for epoch %llu.",
    (unsigned long long) server.cluster->currentEpoch);

    // 向其他所有节点发送信息,看它们是否支持由本节点来对下线主节点进行故障转移
    clusterRequestFailoverAuth();

    // 打开标识,表示已发送信息
    server.cluster->failover_auth_sent = 1;

    // TODO:
    // 在进入下个事件循环之前,执行:
    // 1)保存配置文件
    // 2)更新节点状态
    // 3)同步配置
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
    CLUSTER_TODO_UPDATE_STATE |
    CLUSTER_TODO_FSYNC_CONFIG);
    return; /* Wait for replies. */
    }

    ...

    }
    6、替换节点(failover)
    当子节点接收到来自其他节点的ACK消息时会统计自己获得的票数,当达到集群Master总数的一半以上时,就会开始执行failover,即替换自己的主节点。
    首先标记自己为主节点,然后将原来由节点B负责的slots标记为由自己负责,最后向整个集群广播现在自己是Master同时负责旧Master所有slots的信息。其他节点接收到该信息后会更新自己维护的B1的状态并标记B1为主节点,将节点B负责的slots的负责节点设置为B1节点。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    void clusterHandleSlaveFailover(void) {

    ...

    /* Check if we reached the quorum. */
    // 如果当前节点获得了足够多的投票,那么对下线主节点进行故障转移
    if (server.cluster->failover_auth_count >= needed_quorum) {
    // 旧主节点
    clusterNode *oldmaster = myself->slaveof;

    redisLog(REDIS_WARNING,
    "Failover election won: I'm the new master.");

    /* We have the quorum, perform all the steps to correctly promote
    * this slave to a master.
    *
    * 1) Turn this node into a master.
    * 将当前节点的身份由从节点改为主节点
    */
    clusterSetNodeAsMaster(myself);
    // 让从节点取消复制,成为新的主节点
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    // 接收所有主节点负责处理的槽
    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
    if (clusterNodeGetSlotBit(oldmaster, j)) {
    // 将槽设置为未分配的
    clusterDelSlot(j);
    // 将槽的负责人设置为当前节点
    clusterAddSlot(myself, j);
    }
    }

    /* 3) Update my configEpoch to the epoch of the election. */
    // 更新集群配置纪元
    myself->configEpoch = server.cluster->failover_auth_epoch;

    /* 4) Update state and save config. */
    // 更新节点状态
    clusterUpdateState();
    // 并保存配置文件
    clusterSaveConfigOrDie(1);

    /* 5) Pong all the other nodes so that they can update the state
    * accordingly and detect that we switched to master role. */
    // 向所有节点发送 PONG 信息
    // 让它们可以知道当前节点已经升级为主节点了
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 6) If there was a manual failover in progress, clear the state. */
    // 如果有手动故障转移正在执行,那么清理和它有关的状态
    resetManualFailover();
    }
    }

配置更新

上边我们已经通过故障发现和子节点选举机制用B1这个子节点替换掉了它的Master节点B,那么留下来的节点B和B2应该怎么处理呢?实际上Redis会让它们变成B1的Slave节点。
1、对B2来说,B1升级成Master后会给B2发送消息,让它知道自己已经升级成Master了。

1
2
3
4
5
6
7
8
9
10
11
12
void clusterHandleSlaveFailover(void) {

...

/* 5) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to master role. */
// 向所有节点发送 PONG 信息
// 让它们可以知道当前节点已经升级为主节点了
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

...
}

2、对B来说,B1已经成为了Master,B从故障恢复后再次加入集群时,会成为B1的Slave。

Cluster数据丢失隐患及处理方案

一种数据丢失的场景是主从复制时Master挂掉了,这点我在《Redis 复制》讨论过。
另一种数据丢失的场景存在于Cluster集群中,并且并不是特别容易出现,也就是Cluster发生了脑裂,分区恢复时脑裂期间的数据被覆盖:

  1. 主节点挂掉了,从节点选举出了一个新的主节点,但是此时客户端还在与老主节点通信,将数据写入到老的主节点上;

    这种情况是可能发生的,因为客户端会记忆槽所在的节点,而不是每次请求都通过重定向定位到槽实际所在的节点上。

  2. 之后主从切换成功后,老的主节点会成功新主节点的Slave,并从新的主节点上获取数据,这时该节点上的数据会被清空,从而导致数据丢失。

为什么会发生脑裂?

在《Redis核心技术与实战》中提到了一种会导致脑裂的情况:

  1. Slave会定时地PING Master,发生的错误达到一定时间会被标记为主观下线,当标记主观下线的次数达到规定数量后,标记为客观下线;
  2. 但是Master实际上是“假故障”,即虽然响应Slave的心跳失败了,但是客户端还是可以和Master正常通信的。
    比如宿主机上有一些其他进程将CPU打满了,在打满期间,Slave就会有可能将Master判断为下线,开始选举及主从切换。

为什么脑裂会导致数据丢失?

主从切换后,从库会升级为新主库,这时如果老主库重新上线了,会成为新主库的Slave,执行全量同步,而全量同步执行的最后阶段,需要清空本地的数据,加载新主库发送过来的RDB文件,这期间写入的数据就会丢失了。

如何解决这种脑裂问题?

可以通过两个配置来解决这个脑裂问题:

  • min-slaves-to-write
    这个配置项设置了主库能进行数据同步的最少从库数量
  • min-slaves-to-write
    min-slaves-max-lag:这个配置项设置了主从库间进行数据复制时,从库给主库发送 ACK 消息的最大延迟(以秒为单位)

我们可以把 min-slaves-to-write 和 min-slaves-max-lag 这两个配置项搭配起来使用,分别给它们设置一定的阈值,假设为 N 和 T。这两个配置项组合后的要求是,主库连接的从库中至少有 N 个从库,和主库进行数据复制时的 ACK 消息延迟不能超过 T 秒,否则,主库就不会再接收客户端的请求了。即使原主库是假故障,它在假故障期间也无法响应哨兵心跳,也不能和从库进行同步,自然也就无法和从库进行 ACK 确认了。这样一来,min-slaves-to-write 和 min-slaves-max-lag 的组合要求就无法得到满足,原主库就会被限制接收客户端请求,客户端也就不能在原主库中写入新数据了。等到新主库上线时,就只有新主库能接收和处理客户端请求,此时,新写的数据会被直接写到新主库中。而原主库会被哨兵降为从库,即使它的数据被清空了,也不会有新数据丢失。
举个例子:假设我们将min-slaves-to-write 设置为 1,把 min-slaves-max-lag 设置为 12s,把哨兵的 down-after-milliseconds 设置为 10s,主库因为某些原因卡住了 15s,导致哨兵判断主库客观下线,开始进行主从切换。同时,因为原主库卡住了 15s,没有一个从库能和原主库在 12s 内进行数据复制,原主库也无法接收客户端请求了。这样一来,主从切换完成后,也只有新主库能接收请求,不会发生脑裂,也就不会发生数据丢失的问题了。

数据倾斜

Cluster集群通过CRC16算法将key hash到节点槽上,这个过程还是存在很多不确定性,可能很多数据会被hash到固定的某几个槽上,造成数据分布的不均匀,或者某些key是热点数据,被访问得尤其频繁。

数据倾斜的危害

数据倾斜的危害主要是保存热点数据的节点处理压力会增大,速度变慢,甚至内存资源耗尽而崩溃。

数据倾斜的成因

数据倾斜的成因主要有3个:

  1. bigkey
    bigkey一般是value值很大的string或保存了大量对象的集合类型。
    bigkey可能会造成实例IO线程阻塞,影响其他请求的执行效率。
    为了处理bigkey,设计的时候最好避免把过多的数据保存在同一个键值对中,如果是集合类型,还可以把bigkey拆分成多个小的集合类型数据,分散保存在不同的实例上。
  2. slot分配不均衡
    如果没有均衡地分配slot,就会有大量的数据被分配到同一个slot中,而同一个slot只会在一个实例上分布,并导致大量数据被集中到同一个实例上。
  3. Hash Tag
    hash tag指针对key的某个部分进行hash,比如user:123,可以加上hash tag后变成user:{123},只针对123进行hash。
    hash tag的意义主要在于可以将同类的数据hash到同一个槽上,便于范围查询。
    hash tag的缺点也在于分布到同一槽内后,对该槽所在节点的压力会变大。

数据倾斜的解决办法

数据倾斜可以通过重分配slot来解决。
但是热点数据往往是少部分数据被频繁访问,这种情况下重分配slot是无法解决的,为此可以通过热点数据多副本的方法来解决,比如同一key添加一个前缀然后hash到其他slot上。
但是多副本只能用于只读热点key,对于有读有写的热点数据,就只能给实例本身增加资源了,比如改成配置更高的机器。

QA

  1. Redis Cluster哈希槽通过CRC16算法将key哈希到实例上的槽,这样做有什么好处?为什么不直接用一张表来存储key和哈希槽之间的对应关系?
    如果用一张关系表来做映射,问题太多了,比如:key太多了怎么存关系?集群扩容、缩容、故障转移时怎么修改key和实例间的对应关系?
    而引入哈希槽,实际上是将数据和节点解耦,客户端只需关注key被hash到哪个哈希槽,就算打到错误的节点上,也可以通过

参考

  1. Redis 集群规范
  2. redis源码解析
  3. Redis集群详解(上)
  4. Redis集群详解(中)
  5. Redis集群(终篇)
  6. Redis在线数据迁移工具redis-migrate-tool详解,轻松实现redis集群之间的数据同步
  7. CRDT——解决最终一致问题的利器

Redis架构图
Redis知识框架
Redis问题画像

以上图片来自极客时间的《Redis核心技术与实战》。
对原理的分析都是基于Redis3.0版本的代码,现在最新的版本应该是6.0,很多功能都是后面的版本引入的,因此这篇里不会描述多线程这些功能。

为什么使用 Redis

Redis 的缺点 & 优点

特性及优势:

  1. 内存数据库,吞吐率不受磁盘影响;
  2. 每秒可以处理超过 10 万次读写操作;
  3. 多数据结构支持,包括 string、hash、list、set、zset、Bitmaps、Hyperloglog、Geo、Pub/Sub、Redis Module、BloomFilter、RedisSearch、Redis-ML 等,支持绝大多数应用场景;
  4. Replication(复制);
  5. lua(支持服务端执行复杂的业务逻辑);
  6. LRU eviction(缓存淘汰);
  7. Transactions;
  8. Persistence(持久化),包括 rdb(快照)和 AOF 两种;
  9. Sentinel(哨兵);
  10. Cluster(分区)。

但是也不能忽略 Redis 本身的一些缺点:

  1. 数据库容量受到物理内存的限制,不能用作海量数据的高性能读写,因此 Redis 适合的场景主要局限在较小数据量的高性能操作和运算上;
  2. 缓存和数据库双写一致性问题;
  3. 缓存雪崩问题;
  4. 缓存击穿问题;
  5. 缓存的并发竞争问题。

Redis & Memcached

Redis 相对 Memcached 来说有以下优点:

  1. memcached 所有的值均是简单的字符串,redis 作为其替代者,支持更为丰富的数据类型
  2. redis 的速度比 memcached 快很多
  3. redis 可以持久化其数据

Redis 和 Memcached 之间存在以下区别:

  1. 存储方式 Memecache 把数据全部存在内存之中,断电后会挂掉,数据不能超过内存大小。 Redis 有部份存在硬盘上,这样能保证数据的持久性。
  2. 数据支持类型 Memcache 对数据类型支持相对简单。 Redis 有复杂的数据类型。
  3. 使用底层模型不同 它们之间底层实现方式 以及与客户端之间通信的应用协议不一样。 Redis 直接自己构建了 VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求。

应用场景

  1. 会话缓存(Session Cache)
    最常用的一种使用 Redis 的情景是会话缓存(session cache)。用 Redis 缓存会话比其他存储(如 Memcached)的优势在于:Redis 提供持久化。当维护一个不是严格要求一致性的缓存时,如果用户的购物车信息全部丢失,大部分人都会不高兴的,现在,他们还会这样吗?
    幸运的是,随着 Redis 这些年的改进,很容易找到怎么恰当的使用 Redis 来缓存会话的文档。甚至广为人知的商业平台 Magento 也提供 Redis 的插件。
  2. 全页缓存(FPC)
    除基本的会话 token 之外,Redis 还提供很简便的 FPC 平台。回到一致性问题,即使重启了 Redis 实例,因为有磁盘的持久化,用户也不会看到页面加载速度的下降,这是一个极大改进,类似 PHP 本地 FPC。
    再次以 Magento 为例,Magento 提供一个插件来使用 Redis 作为全页缓存后端。
    此外,对 WordPress 的用户来说,Pantheon 有一个非常好的插件 wp-redis,这个插件能帮助你以最快速度加载你曾浏览过的页面。
  3. 队列
    Reids 在内存存储引擎领域的一大优点是提供 list 和 set 操作,这使得 Redis 能作为一个很好的消息队列平台来使用。Redis 作为队列使用的操作,就类似于本地程序语言(如 Python)对 list 的 push/pop 操作。
    当然要将 Redis 作为消息队列投入生产环境还有很多设计要点,比如采用 sleep 一段时间重试还是 blpop 阻塞、主题订阅、如何应对消费者下线导致的消息丢失问题(如何保证消息一定能被消费)等。
    Redis 作为消息队列坑比较多,如果希望少点麻烦且对服务质量有一定要求,最好还是采用 RocketMQ 这些比较成熟的方案。
  4. 延时队列
    使用 zset,时间戳作为 score,消费者用 zrangebyscore 指令获取 N 秒之前的数据轮询进行处理,这种思路和 JDK 中的 ScheduledThreadPoolExecutor 有点像。
  5. 排行榜/计数器
    Redis 在内存中对数字进行递增或递减的操作实现的非常好。集合(Set)和有序集合(Sorted Set)也使得我们在执行这些操作的时候变的非常简单,Redis 只是正好提供了这两种数据结构。所以,我们要从排序集合中获取到排名最靠前的 10 个用户–我们
    称之为“user_scores”,我们只需要像下面一样执行即可:
    当然,这是假定你是根据你用户的分数做递增的排序。如果你想返回用户及用户的分数,你需要这样执行:
    ZRANGE user_scores 0 10 WITHSCORES
    Agora Games 就是一个很好的例子,用 Ruby 实现的,它的排行榜就是使用 Redis 来存储数据的,你可以在这里看到。
  6. 发布/订阅
    最后(但肯定不是最不重要的)是 Redis 的发布/订阅功能。发布/订阅的使用场景确实非常多。我已看见人们在社交网络连接中使用,还可作为基于发布/订阅的脚本触发器,甚至用 Redis 的发布/订阅功能来建立聊天系统!(不,这是真的,你可以去核实)。
    Redis 提供的所有特性中,我感觉这个是喜欢的人最少的一个,虽然它为用户提供如果此多功能。
  7. 分布式锁
    不要用 setnx+expire,因为如果进程 crash 或重启这个锁就直接失效了。实际上 set 命令本身就包含超时和 cas 的设置。
  8. 扫描
    如果 Redis 中有 1 亿多个 key,其中有 10W+个 key 有固定的前缀(这种场景非常常见),如何将它们全部找出来?
    由于 Redis 的单线程特性,使用 keys 可能会阻塞 Redis 进程,最好换成 scan 命令,不过可能提取出的部分 key 是重复的,需要客户端做去重。

QA

Redis与其他KV存储有何不同?

  • 更多复杂的数据结构,支持更多特殊的场景;
  • Redis是内存数据库,但是支持持久化到磁盘。

有人说 Redis 只适合用来做缓存,当数据库来用并不合适,为什么?

Redis 的事务并不严格:
* A(原子性):Redis 支持事务,所有命令会被保存到一个事务队列中,服务器接收到 exec 时才会被真正执行,注意如果中间出错,事务不会回滚,后面的指令还会继续执行;而且如果涉及到复杂的逻辑判断,则只能通过lua 脚本实现“伪原子性”,说它是“伪原子性”是因为虽然脚本可以一次性执行多条命令,如果中间某个命令出错还是会无法保证“要么全部执行,要么都不执行”的要求。
* I(隔离性):Redis 是单线程模型,因此可以保证隔离性。
* D(持久性):Redis 是内存数据库,服务器意外崩溃会导致内存中的数据丢失,除非开启 AOF,并且配置成每次写入都记日志,但是这样又会极大地影响效率,所以一般会配置成混合模式的持久化。

Redis会占用多少内存空间?

  • An empty instance uses ~ 3MB of memory.
  • 1 Million small Keys -> String Value pairs use ~ 85MB of memory.
    那么10亿个kv,大概就会占用85G的内存了。
  • 1 Million Keys -> Hash value, representing an object with 5 fields, use ~ 160 MB of memory.

64位机器会占用比32位机器更多的内存,因为指针在64位机器上占用更多空间,但同时64位机器也可以有更大的内存空间。

Redis 的底层数据结构有哪些

sds:string 使用,变长字符串,不够的情况下重新分配空间并将老字符串数据拷贝过去;
dict:字典应用很多,包括 Redis 数据库中保存所有 key-value、hash、set、zset。dict 类似 Java 中的 HashMap,将 key 散列到哈希桶数组中,每个哈希桶都是一个链表,插入就是插入到链表头部,当元素超过了容量的一半后会启动渐进式 rehash 进行扩容。
ziplist:相当于一个数组,查询时需要遍历一次,每次插入都需要 realloc 重新分配一个新的更大的数组,然后把老数组内容拷贝过去。
quicklist:由于 linkedlist 附加空间成本高且容易产生碎片,因此 Redis 里的 quicklist 设计成了 linkedlist 和 ziplist 的结合,它将 linkedlist 按段切分,每一段使用 ziplist 存储;
skiplist:skiplist 用于实现 zset 中按 score 排序的要求,插入时先自顶向下查位置,然后按概率计算该节点应该分配到几层。

存储数据选择 string 还是 hash?

从业务层面来看,如果要存好多字段的对象、而且这个对象的每个字段都会单独拿出来用,则可以考虑使用 hash,否则没有太多限制条件。
从性能角度来看,如果存的字段少,hash 会使用 ziplist 结构存储,性能多少受点影响,而且还要考虑转换结构和渐进式扩容对性能的损耗。
从节约空间的角度来看,string 的 key 一般都会加个前缀,一般会比 hash 占用更多的空间,不过差距不大。

设计 redis 排序,数据结构是金额+花钱的时间,金额越大,时间越早越靠前

用 zset 存,score 是金额拼上时间,金额放高位,MAX_INT 和时间作差放低位,查询时使用ZREVRANGE命令查询。

hash 中哈希冲突怎么解决的

分两种情况:hash 在数据量小时结构是 ziplist,这时插入不会做冲突检测,插入到目标位置后就向后统一移动数据,给新插入的数据项流出空间;在数据量大时结构是 dict,这种结构和 Java 中的 HashMap 类似,使用链表来处理冲突。

  1. 说说 Redis 为什么那么快。
    单线程模型->减少了线程间上下文切换的开销。
    多路复用的 IO 模型->单线程监控多个连接。
  2. 为什么 Redis 记录 AOF 日志是先执行指令然后再记录 AOF 日志?而不是像其他存储引擎一样反过来?
    主要是因为 Redis 本身是缓存而不是 db,侧重点不同,db 先写日志是为了失败回滚,而 Redis 持久化是一个附加功能,只能保证数据不会完全丢失。

Redis 淘汰时,如果读取,会不会数据不完整

redis 的淘汰分两种:

  • 一种是过期,这种不会导致这种问题,因为查询时会判断下过期时间,过期了就不返回;
  • 另一种是超过内存容量淘汰,比如 LRU,这种也不会导致这种问题,因为执行每个命令时都会检查下缓存是否超出了阈值,可见代码server.c/processCommand
    Redis-执行命令前检查缓存是否溢出

Redis 的持久化原理是什么

Redis 有两种持久化方式:RDB 和 AOF
RDB 是快照,AOF 记录了写操作,效率起见,一般 RDB 作为 checkpoint,checkpoint 后的数据通过 AOF 恢复。

RDB 和 AOF 之间的区别

RDB 二进制文件可以直接加载到内存速度较快;AOF 要重放命令,所以速度比较慢。
RDB 需要全量备份,AOF 可以增量备份,二者的应用场景不同。

Redis的复制原理是什么?

master 会启动一个后台进程进行持久化(RDB or AOF),第一次连接时会将 RDB 文件发给 slave,slave 先保存到磁盘,之后加载到内存中;如果不是第一次连接,slave 连接 master 后通过 PSYNC 命令告知自己同步的起始位置,master 将增量部分 AOF 文件发送给 slave。

Redis 持久化期间,主进程还能对外提供服务吗?为什么

能。
因为 Redis 的复制是通过 fork 子进程实现的,父进程仍然可以接收请求。

持久化期间,Redis如何处理新写入的数据呢,这个数据也会直接进行持久化吗?

不会。
因为 Redis 复制是通过 fork 子进程实现的,由于 COW 机制,子进程只能看到老数据。

主从复制为什么会发生延迟?怎么解决

延迟无法避免,比如主从之间的网络抖动、slave 发生阻塞(如 IO)等情况。
解决办法有两种:

  • min-slave-to-write Nmin-slave-max-lag M,控制 Master,只有在至少有 N 个 slave 正在工作,并且滞后时间均小于 M 秒的情况下,Master 将不接受写入请求;
  • slave-serve-stale-data,控制从库对主库失去响应或复制进行过程中从库的表现,为 yes 则从库会继续响应客户端的请求,为 no 则除去 INFO 和 SLAVOF 命令之外的任何请求都会返回一个错误SYNC with master in progress
  • 编写外部监控程序,如果某个 slave 延迟较大,则通知 client 不要读这个 slave。

Redis 怎么实现高可用

从复制、Sentinel 到 Cluster

sentinel 中,使用客户端是怎么连接服务器的?(Redisson 配置)

见《Redis 客户端》。

哈希槽原理?和一致性哈希的区别?怎么落点

redis cluster 默认分配了 16384 个 slot,当我们 set 一个 key 时,会用CRC16算法来取模得到所属的 slot,然后将这个 key 分到哈希槽区间的节点上,具体算法就是:CRC16(key) % 16384。所以我们在测试的时候看到 set 和 get 的时候,直接跳转到了 7000 端口的节点。
哈希槽与一致性哈希的区别:哈希槽由客户端来重定向到目标 slot 所在节点,一致性哈希需要由服务器端重定向到目标节点,而且需要按顺时针方向一个一个节点递归地找。

Redis雪崩、击穿、穿透等现象是怎么出现的?怎么解决

  1. 缓存穿透
    缓存穿透指查询一个不存在的数据,出于容错考虑这个查询会穿透到 DB 层,如果这种带穿透的查询特别多可能会把 DB 打挂掉。
    解决办法:使用布隆过滤器,保存所有可能存在的数据到一个足够大的 bitmap 中,由于布隆过滤器的特性,一定不存在的数据在 bitmap 中一定找不到,从而可以很大程度上避免对底层存储系统的查询压力;还有一种更简单的方法,就是在查询返回结果为空时也把这个空结果缓存起来,但是它的过期时间会短一些,最长时间不超过 5 分钟。
  2. 缓存雪崩
    缓存雪崩指的是设置缓存时采用了相同的过期时间,导致缓存在同一时间同时失效,请求全部打到 DB,DB 瞬时压力过大导致雪崩。
    解决办法:缓存失效时间随机化,在原有失效时间基础上加上一个随机值,可以使得过期时间的重复率降低;加锁并令请求排队,使得请求串行化,避免所有请求都查询数据库,不过这样会导致性能的降低。
  3. 缓存击穿
    缓存击穿指的是某个 key 在过期时正好有大量请求访问该 key,这些请求会同时回表,可能会瞬间将后端 DB 打挂。
    解决办法:使用互斥锁,缓存失效时先加锁,避免并发回表;一些长时间不变的数据完全可以不设置过期时间,或者过期时间特别长。

主从复制的流程?传的是文件吗?

流程见《主从同步》。
如果是全量同步,同步时会先同步 RDB 文件,再同步增量写命令;
如果是部分重同步,则只同步增量写命令。

中间传输失败怎么办?中间传输不一致怎么办

如果上次传输中断,则下次同步时从中断位置开始执行部分重同步。

参考

  1. FAQ
  2. 使用vscode(gdb)调试redis

应用

  1. Redis strings vs Redis hashes to represent JSON: efficiency?

数据结构

  1. Redis 源码涉及 C 语言
  2. Redis 内部数据结构详解(1)——dict
  3. Redis 内部数据结构详解(2)——sds
  4. Redis 内部数据结构详解(3)——robj
  5. Redis 内部数据结构详解(4)——ziplist
  6. Redis 内部数据结构详解(5)——quicklist
  7. Redis 为什么用跳表而不用平衡树?
  8. Redis 中的集合类型是怎么实现的?

Persistence

  1. 剖析 Redis RDB 文件
  2. Redis 源码分析–RDB 实现源码阅读
  3. Redis 源码分析–AOF 文件全量重写源码阅读
  4. Redis 源码分析–AOF 文件增量追写源码阅读

客户端

  1. Redis 如何处理客户端连接
  2. 剖析 Redis 协议
  3. 剖析 Redis 协议(续)

主从复制

  1. 复制
  2. redis 系列–主从复制以及 redis 复制演进

Sentinel & Cluster

  1. 分区:怎样将数据分布到多个 redis 实例
  2. Redis 的 Sentinel 文档
  3. Redis 集群教程
  4. Redis 集群规范
  5. 一致性哈希和哈希槽对比
  6. Redis 集群扩容和缩容

架构迁移

  1. Redis 集群迁移案例
  2. redis-migrate-tool
  3. redis-port
  4. redis-migration
    redis-migration
    redis-migration:独创的 redis 在线数据迁移工具

Twemproxy

  1. Twemproxy

Codis

  1. Codis

Redisson

  1. Redisson

系统优化

  1. Redis 响应延迟问题排查

发号器几乎是最简单的一个中间件了,它旨在生成一个全局唯一ID,用于在业务领域内标识一个对象。

阅读全文 »

概述

序列化(Serialization):事件A必须在事件B之前发生。
互斥(Mutual exclusion):事件A和B不能同时发生。

同步方式

使用消息同步

线程A:

1
2
do sth...
call B

线程B:

1
2
wait for A
do sth...

B会等待A发来消息后再执行后续的指令。

共享变量

1、并发写
线程A

1
2
x = 5
print x

线程B

1
x = 7

这两个线程并发执行,最后打印出来的结果不确定是5还是7。
2、并发更新
线程A:

1
count = count + 1

线程B:

1
count = count + 1

两个线程的操作都是读后写,可能就会发生同时读出旧值然后都+1,最终结果并没有+2的情况。
3、通过发消息互斥执行
通过发消息保证共享变量的安全更新。

信号量

支持PV操作,P原子地减少信号量值,当值为0时阻塞,V原子地增加信号量值。

信号量的优点:

  1. 信号量的约定使得代码更不容易出错;
  2. 信号量在很多系统都有实现,使用信号量是可移植的。

基础同步模式

发信号(Signaling)

一个线程发消息给另一个线程告知某件事情的发生。
线程A:

1
2
statement a1
sem.signal()

线程B:

1
2
sem.wait()
statement b1

只有A signal发出消息后,B才能从wait离开继续执行。

在Java中,发信号的功能可以通过Object的wait/notify、Lock的Condition、LockSupport、Semaphore实现。

Rendezvous

不知道怎么翻译,叫做汇聚?作者给出的是类似下面这样的例子:
线程A:

1
2
3
4
statement a1
aArrived.signal()
bArrived.wait()
statement a2

线程B:

1
2
3
4
statement b1
bArrived.signal()
aArrived.wait()
statement b2

注意signal和wait不要写反了,写反了会死锁。
在Java中,可以通过CyclicBarrier实现。

互斥量(Mutex)

使用信号量可以实现互斥量,实际上互斥量可以看作Semaphore(1),使用以下代码就可以实现两个线程的互斥执行:
线程A:

1
2
3
mutex.wait()
count = count + 1
mutex.signal()

线程B:

1
2
3
mutex.wait()
count = count + 1
mutex.signal()

上边mutex包围的代码就称为临界区代码(critical section)。

多路复用(Multiplex)

将上边的互斥量泛化,我们让多个线程可以同时执行一块临界区代码。
其实就是用Semaphore(n)就可以实现n个线程同时执行了。

栅栏(Barrier)

只有所有线程都到达某个位置才能一块继续执行下去,栅栏可以通过以下代码实现(有bug,会出现死锁):

1
2
3
count = 0
mutex = Semaphore(1)
barrier = Semaphore(0)
1
2
3
4
5
6
7
8
9
10
11
rendezvous // 汇聚

mutex.wait()
count = count + 1
mutex.signal()

if count == n
barrier.signal()

barrier.wait()
其他代码

如果直接拿来执行,容易发现只有1个线程能执行下去,因为:

  1. 假设有n=5,即5个线程并发执行;
  2. 前4个线程到了barrier.wait后barrier的值变为-4;
  3. 第5个线程barrier.signal释放了1个,barrier的值变为-3,此时只有一个线程被放过去了,另外还有3个线程仍阻塞,且第5个线程随后也会进入阻塞状态。

修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
rendezvous // 汇聚

mutex.wait()
count = count + 1
mutex.signal()

if count == n
barrier.signal()

barrier.wait()
barrier.sign
其他代码

并发问题解题模型

分析问题时:

  1. 寻找角色,每个角色对应一个独立线程;
  2. 寻找共享资源,每个共享资源对应一个信号量(或其他并发控制类);
  3. 按场景描述进行模拟;

交错打印

两个线程交替打印数组的功能,虽然比较简单,但是面试时问的还蛮多的,如果用Semaphore实现会比较简单,用Java的wait/notify或Condition实现则会稍微麻烦一点。

Semaphore实现交错打印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class OneByOneTest {

static int count = 0;
static Semaphore first = new Semaphore(1);
static Semaphore second = new Semaphore(0);

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while(true) {
try {
first.acquire();
Thread.sleep(10);
System.out.println("线程1: " + count++);
second.release();
}catch (Exception e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(() -> {
while(true) {
try {
second.acquire();
Thread.sleep(10);
System.out.println("线程2: " + count++);
first.release();
}catch (Exception e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}

Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class OneByOneConditionTest {

private static int counter = 0;
private static Lock lock = new ReentrantLock();
private static Condition first = lock.newCondition();
private static Condition second = lock.newCondition();
private static int currentPrinter = 0;

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while(true) {
try {
lock.lock();
while(currentPrinter == 1) {
first.await();
}
Thread.sleep(100);
System.out.println("线程1: " + counter++);
second.signal();
currentPrinter = 1;
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(() -> {
while(true) {
try {
lock.lock();
while(currentPrinter == 0) {
second.await();
}
Thread.sleep(100);
System.out.println("线程2: " + counter++);
first.signal();
currentPrinter = 0;
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}

wait/notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class WaitNotiftTest {

private static int count = 1;
private static Object lock = new Object();
private static int cur = 1;

static class A implements Runnable {

@Override
public void run() {
while (true) {
try {
synchronized (lock) {
if (count > 100) {
return;
}
while (cur != 1) {
lock.wait();
}
System.out.println("第一个线程输出: " + count++);
Thread.sleep(100);
cur = 2;
lock.notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class B implements Runnable {

@Override
public void run() {
while (true) {
try {
synchronized (lock) {
if (count > 100) {
return;
}
while (cur != 2) {
lock.wait();
}
System.out.println("第二个线程输出: " + count++);
Thread.sleep(100);
cur = 1;
lock.notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
Thread a = new Thread(new A());
Thread b = new Thread(new B());
a.start();
b.start();
}
}

Producer-Consumer

解决生产者/消费者问题需要维护一个队列,生产者向队列添加,消费者从队列获取,同步问题出现在队列为空或满的情况,因此我们需要对队列进行同步化。
为了简化问题,可以使用 juc 引入的 BlockingQueue(阻塞队列),这种数据结构能在下面两种情况下阻塞当前线程

  • 当队列为空时,调用 take 或 poll
  • 当队列满时,调用 put 或 offer

使用 Semaphore 实现Producer/Consumer代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ProducerConsumerTest {

static Semaphore mutex = new Semaphore(1);
static Semaphore items = new Semaphore(0);
static Semaphore spaces = new Semaphore(10);

static int resource = 0;

static class Producer implements Runnable {

@Override
public void run() {
int count = 5;
while (count-- > 0) {
try {
spaces.acquire();
mutex.acquire();
resource++;
System.out.println("Producer添加一个,现在resource=" + resource);
mutex.release();
items.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class Consumer implements Runnable {

@Override
public void run() {
int count = 5;
while (count-- > 0) {
try {
items.acquire();
mutex.acquire();
resource--;
System.out.println("Consumer消费一个,现在resource=" + resource);
mutex.release();
spaces.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
Thread a = new Thread(new Producer());
Thread b = new Thread(new Consumer());
a.start();
b.start();
}
}

使用 BlockingQueue 实现生产者/消费者代码

下面是使用 BlockingQueue 实现的生产者/消费者代码
注意要使用put/take这组方法,而不是offer/poll,因为后者会在满/空时直接返回(而非阻塞等待)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class Producer implements Runnable {
private final BlockingQueue<String> queue;

public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
for(int i = 0; i < 10; i++) {
try {
queue.put("Course" + (i + 1));
System.out.println("Complete production:Course" + (i + 1));
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public class Consumer implements Runnable {
private final BlockingQueue<String> queue;

public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
for(int i = 0; i < 10; i++) {
try {
String course = queue.take();
System.out.println("Complete consumption:" + course);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public class Main {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
Thread t1 = new Thread(new Producer(queue));
Thread t2 = new Thread(new Consumer(queue));
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Readers-writers

读写问题中有两类线程:

  • 读线程:多个读线程可以同时在临界区;
  • 写线程:多个写线程之间互斥,与读线程也互斥。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class ReaderWriterTest {

// 当前有多少读线程正在读
static int readers = 0;
// 保护readers计数器
static Semaphore mutex = new Semaphore(1);
// 0表示有线程正在临界区,1表示没有
static Semaphore rootEmpty = new Semaphore(1);

static Runnable reader = () -> {
try {
mutex.acquire();
readers++;
// 第一个来的需要等锁释放,其他reader会在mutex那里等着
if(readers == 1) {
rootEmpty.acquire();
}
mutex.release();
System.out.println("读取开始, readers:" + readers);
Thread.sleep(1000);
System.out.println("读取结束, readers:" + readers);
mutex.acquire();
readers--;
if(readers == 0) {
rootEmpty.release();
}
mutex.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};

static Runnable writer = () -> {
try {
rootEmpty.acquire();
System.out.println("写入开始");
Thread.sleep(3000);
System.out.println("写入结束");
rootEmpty.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};

public static void main(String[] args) {
Thread reader1 = new Thread(reader);
Thread reader2 = new Thread(reader);
Thread reader3 = new Thread(reader);
Thread reader4 = new Thread(reader);
Thread reader5 = new Thread(reader);
Thread reader6 = new Thread(reader);
Thread writer1 = new Thread(writer);
reader1.start();
reader2.start();
reader3.start();
reader4.start();
reader5.start();
reader6.start();
writer1.start();
}
}

上面的代码有个问题,就是写线程可能被饿死,因为第一个读线程通过rootEmpty.acquire进来后,后续的读线程都不必再等待,可以直接进入临界区,而同时执行的写线程就永远都等在rootEmpty.acquire上了。
改成如下的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class ReaderWriterTest {

// 当前有多少读线程正在读
static int readers = 0;
// 保护readers计数器
static Semaphore mutex = new Semaphore(1);
// 0表示有线程正在临界区,1表示没有
static Semaphore rootEmpty = new Semaphore(1);
// 控制reader和writer获取锁
static Semaphore turnstile = new Semaphore(1);

static Runnable reader = () -> {
try {
turnstile.acquire();
turnstile.release();
mutex.acquire();
readers++;
// 第一个来的需要等锁释放,其他reader会在mutex那里等着
if(readers == 1) {
rootEmpty.acquire();
}
mutex.release();
System.out.println("读取开始, readers:" + readers);
Thread.sleep(1000);
System.out.println("读取结束, readers:" + readers);
mutex.acquire();
readers--;
if(readers == 0) {
rootEmpty.release();
}
mutex.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};

static Runnable writer = () -> {
try {
turnstile.acquire();
rootEmpty.acquire();
System.out.println("写入开始");
Thread.sleep(3000);
System.out.println("写入结束");
turnstile.release();
rootEmpty.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};

public static void main(String[] args) {
Thread reader1 = new Thread(reader);
Thread reader2 = new Thread(reader);
Thread reader3 = new Thread(reader);
Thread reader4 = new Thread(reader);
Thread reader5 = new Thread(reader);
Thread reader6 = new Thread(reader);
Thread writer1 = new Thread(writer);
reader1.start();
reader2.start();
reader3.start();
reader4.start();
reader5.start();
reader6.start();
writer1.start();
}
}

Dining philosophers(哲学家就餐)

我们先来看下最开始最直观的一种错误解法,这种解法会导致死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class DiningPhilosophersTest {

private static final int count = 2;

private Semaphore[] forks = new Semaphore[count];

{
for (int i = 0; i < forks.length; i++) {
forks[i] = new Semaphore(1);
}
}

private int left(int i) {
return i;
}

private int right(int i) {
return (i + 1) % forks.length;
}

private void getForks(int i) throws InterruptedException {
forks[left(i)].acquire();
forks[right(i)].acquire();
}

private void putForks(int i) {
forks[left(i)].release();
forks[right(i)].release();
}

public void doDining() {
Thread[] threads = new Thread[count];
for (int i = 0; i < threads.length; i++) {
int finalI = i;
threads[i] = new Thread(() -> {
while (true) {
try {
getForks(finalI);
System.out.println(finalI + " 开始就餐");
Thread.sleep(10);
System.out.println(finalI + " 结束就餐");
putForks(finalI);
} catch (Exception e) {
e.printStackTrace();
}
}
});
threads[i].start();
}
}

public static void main(String[] args) {
DiningPhilosophersTest test = new DiningPhilosophersTest();
test.doDining();
}
}

显然,n位哲学家刚开始都没有处于就餐状态,如果他们同时拿起左边的叉子,然后尝试取右边的叉子,就会直接导致死锁。

减少同时获取叉子的哲学家数量

注意,上面发生死锁的必要条件是“n位哲学家同时就餐”,如果n位无法同时就餐,那这个问题也就迎刃而解了,所以我们额外引入一个footman信号量,它的数量控制在n - 1

1
2
3
4
5
6
7
8
9
10
11
12
13
private Semaphore footman = new Semaphore(count - 1);

private void getForks(int i) throws InterruptedException {
footman.acquire();
forks[left(i)].acquire();
forks[right(i)].acquire();
}

private void putForks(int i) {
forks[left(i)].release();
forks[right(i)].release();
footman.release();
}

同时存在左撇子(先拿左手边叉子的)和右撇子(先拿右手边叉子的)

另外一种解决办法是让一个哲学家先获取右边的叉子再获取左边的叉子,这样其实解除了环路等待条件:假设有5个哲学家,其中4个哲学家拿到左手的叉子后,第五个哲学家会尝试取第一个叉子,也就是第一个哲学家左手的叉子,他们两个不满足死锁的条件。
这种思路的代码比较简单,就先忽略了。

Tanenbaum的解

这是一种相对比较极端的解,每个哲学家都需要等两边的人不在就餐的情况下才能就餐,否则他什么都不做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private static final int count = 2;

private Semaphore mutex = new Semaphore(1);

/**
* state为0表示正在思考(thinking),1表示准备就餐(hungry),2表示正在就餐(eating)
*/
private int[] states = new int[count];

private Semaphore[] forks = new Semaphore[count];

{
for (int i = 0; i < forks.length; i++) {
forks[i] = new Semaphore(0);
states[i] = 0;
}
}

private int left(int i) {
return i;
}

private int right(int i) {
return (i + 1) % forks.length;
}

private void test(int i) {
// 如果自己准备就餐且两边的人都不在就餐,则自己可以就餐
if(states[i] == 1
&& states[left(i)] != 2
&& states[right(i)] != 2) {
states[i] = 2;
forks[i].release();
}
}

private void getForks(int i) throws InterruptedException {
mutex.acquire();
states[i] = 1;
test(i);
mutex.release();
forks[i].acquire();
}

private void putForks(int i) throws InterruptedException {
mutex.acquire();
states[i] = 0;
test(right(i));
test(left(i));
mutex.release();
}

这种解存在的主要问题是会发生饥饿,比如2个哲学家的情况下,可能1号会一直处于就餐状态,2号一直处于循环检测的状态,于是就发生了饥饿。

Cigarette smokers

The dining savages(野人就餐)

The barbershop(理发师问题)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#define N 10 //最多10个顾客
typedef struct queue{
int nums[N];
int front;
int rear;
}queue;
int isFull(queue *q){
return q->front + 1 == q->rear;
}
int isEmpty(queue *q){
return q->front == q->rear;
}
//返回顾客标志
int dequeue(queue *q){
if(isEmpty(q)){
puts("error: empty!");
return 0;
}
else{
int num = q->nums[q->rear];
q->rear = (q->rear + 1) % N;

return num;
}
}
void enqueue(queue *q, int num){
if(isFull(q)){
puts("error: full queue!");
return ;
}
else{
q->nums[q->front] = num;
q->front = (q->front + 1) % N;
q->front++;
}
}

//顾客每次等待,若队列已满将会被忽略
void customerWait(semaphore *barber, int num){
if(barber->busy == no){
barber->busy = yes;
enqueue(&barber->customers, num);
printf("这个顾客开始接受服务");
}
else{
//将当前顾客加入等待队列,似乎不是原子操作?
enqueue(&barber->customers, num);
printf("加入顾客%d", num);
}
}
//理发师每次等待新用户,
void barberSignal(semaphore *barber){
if(isEmpty(&barber->customers)){
barber->busy = no;
}
else{
int num = dequeue(&barber->customers);
printf("顾客%d开始交易", num);
sleep(3000);//每个顾客睡三秒
printf("结束交易");
}
}

/*
用一个主函数开启一个理发师进程,理发师进程等待新顾客,对每一个顾客sleep(3000)作为服务时间,然后signal,
主函数等待用户输入用户id,对每一个用户id开启一个进程,
*/

semaphore barber;
void simulate(){
barber.busy = no;
barber.customers.front = barber.customers.rear = 0;

int pid = fork();
if(pid == 0){
//说明是理发师
while(1){
barberSignal(&barber);
}
}
else{
int num;
while(1){
scanf("%d", &num);
printf("%d", num);
//int pid1 = fork();
//if(pid1 != 0){
//顾客进程
customerWait(&barber, num);
// printf("哈哈哈");
// return ;
//}
//父进程继续运行
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

#define N 10 //最多10个顾客
/***********************信号量**************************/
sem_t barbers;
sem_t customers;
sem_t mutex;
int customerCount = 0;

void haircut(){
printf("理发师剪头中\n");
sleep(3);//服务时间
}
void get_haircut(){
sleep(3);
}
void *barber(void *arg){
while(1){
if(customerCount == 0){
printf("理发师打瞌睡\n");
sem_wait(&customers);//等顾客
printf("理发师被叫醒了\n");
}
else{
sem_wait(&customers);
}

sem_post(&barbers);//唤醒理发师
haircut();//开始服务
}
}
void *customer(void *arg){
while(1){
if(customerCount > 0){
printf("顾客等理发师\n");
sem_wait(&barbers);//等理发师

sem_wait(&mutex);
customerCount--;
sem_post(&mutex);

get_haircut();//接受服务
printf("理完头这个顾客离开了\n");
}
}
}
void *customer_arrive(void *arg){
int num;
while(1){
scanf("%d", &num);
printf("来了一个顾客\n");
sem_wait(&mutex);
if(customerCount < N){//如果还有空位
sem_post(&mutex);
sem_post(&customers);//添加一个顾客资源
customerCount++;//顾客增加
}
else{
sem_post(&mutex);
printf("没椅子了,顾客离开了\n");
}
}
}

void simulate(){
pthread_t barber_t, customer_t, customer_arrive_t;


if(sem_init(&barbers, 0, 1) != 0){
printf("sem init failed");
}
if(sem_init(&customers, 0, 0) != 0){
printf("sem init failed");
}
if(sem_init(&mutex, 0, 1) != 0){
printf("sem init failed");
}

printf("begin barber_t\n");
pthread_create(&barber_t, NULL, barber, NULL);
printf("begin customer_t\n");
pthread_create(&customer_t, NULL, customer, NULL);
printf("begin customer_arrive_t\n");
pthread_create(&customer_arrive_t, NULL, customer_arrive, NULL);

while(1){}
}


int main(void ){
simulate();

return 0;
}

卖票问题

一个火车站有多个窗口,它们同时卖票,而票数使用一个 ticket 变量进行计算,对票数有查询和修改两个操作,这两个操作不能同时进行,并且写操作可能不是原子的,两个写操作也不能同时进行

  • 使用 Atom 类型来保存票数,这样写之间就不需要进行同步了

参考

  1. The Little Book of Semaphores

在 Java 体系中,提到并发就不得不提到 JMM,因为所有并发安全都是围绕内存来展开的,可以说不懂内存结构就不懂并发。

阅读全文 »

垃圾收集(GC)

垃圾检测

在实际回收垃圾对象前,我们必须标识出哪些对象该被回收,即垃圾检测。

对象引用类型

  1. 强引用(StrongReference)
    Object obj = new Object()的 obj 就是一个强引用。
    当内存不足,JVM 宁愿抛出 OutOfMemoryError 错误,使程序异常终止,也不会回收强引用对象来释放内存,除非已经没有引用关联这些对象了。
    除了强引用之外,其他三种引用都在java.lang.ref包中。
  2. 软引用(SoftReference)
    GC 发现了只具有软引用的对象并不会立即进行回收,而是让它活的尽可能久一些,在内存不足前再进行回收。
    在使用缓存的场景的时候会经常采用此种引用方式,来增加系统可用性的弹性空间。Spring 和 cache 里面大量采用了此种引用方式。
  3. 弱引用(WeakReference)
    GC 一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。如果有场景,发现创建完对象很少可能会用到,就采用这种方式,不过实际工作确实很少见到有人用到3,4两个引用。
  4. 虚引用(PhantomReference)
    “虚引用”顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期;如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收。当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在回收对象的内存之前,把这个虚引用加入到与之关联的引用队列中。
    虚引用主要用来跟踪对象被 GC 回收的活动,虚引用必须和引用队列(ReferenceQueue)配合使用。

Reference

Reference 抽象类是除强引用外的所有引用类型的父类,有以下几种子类

  1. SoftReference 类:软引用
    1
    2
    MyObject obj = new MyObject();  
    SoftReference<MyObject> ref = new SoftReference<MyObject>(obj);
  2. WeakReference 类:弱引用
  3. PhantomReference 类:虚引用
  4. ReferenceQueue 类:引用队列

垃圾检测算法 - 引用计数

堆中的每一个对象的对象域包含一个引用计数器。该计数器的维护规则如下:

  • 当一个对象被创建,并把指向该对象的引用赋值给一个变量时,引用计数置为1
  • 当再把这个引用赋值给其他变量时,引用计数加1
  • 当一个对象的引用超过了生命周期或者被设置为新值时,对象的引用计数减 1,任何引用计数为 0 的对象都可以被当成垃圾回收。
  • 当一个对象被回收时,它所引用的任何对象计数减1,这样,可能会导致其他对象也被当垃圾回收。

但是一般垃圾回收器并不会采用这种算法,主要是因为引用计数算法存在循环引用的问题(注意不是栈帧里的引用,而是堆中实例的互相引用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ReferenceCountingGC {
public Object instance = null;
private static final int _1MB = 1024 * 1024;
/**
* 这个成员属性的唯一意义就是占点内存,以便能在GC日志中看清楚是否被回收过
*/
private byte[] bigSize = new byte[2 * _1MB];

public static void testGC() {
// 定义两个对象
ReferenceCountingGC objA = new ReferenceCountingGC();
ReferenceCountingGC objB = new ReferenceCountingGC();

// 给对象的成员赋值,即存在相互引用情况
objA.instance = objB;
objB.instance = objA;

// 将引用设为空,即没有到堆对象的引用了
objA = null;
objB = null;

// 进行垃圾回收
System.gc();
}

public static void main(String[] args) {
testGC();
}
}

如上边代码所示,执行objA = nullobjB = null后,它们二者的 instance 域仍然互相是对方的引用。

垃圾检测算法 - 可达性分析

若一个对象没有引用链与任一个 GC Roots 相连时,此对象可回收
包括虚拟机栈中引用的对象、方法区中类的静态成员变量引用的对象、方法区中的常量引用的对象、本地方法栈中 Native 方法引用的对象
根部(Roots):表示引用链的头部
引用链(Reference Chain):多个引用形成的一条链
引用:是 reference 类型的对象,其中存储的数据代表的是另外一块内存的起始位置,有强引用(Strong)、软引用(Soft)、弱引用(Weak)、虚引用(Phantom)四种。

此算法的基本思想就是选取一系列 GC Roots 对象作为起点,开始向下遍历搜索其他相关的对象,搜索所走过的路径成为引用链,遍历完成后,如果一个对象到 GCRoots 对象没有任何引用链,则证明此对象是不可用的,可以被当做垃圾进行回收。
那么问题又来了,如何选取 GCRoots 对象呢?在 Java 语言中,可以作为 GCRoots 的对象包括下面几种:

  1. 虚拟机栈(栈帧中的局部变量区,也叫做局部变量表)中引用的对象。
  2. 方法区中的类静态属性引用的对象。
  3. 方法区中常量引用的对象。
  4. 本地方法栈中 JNI(Native 方法)引用的对象。

可达性分析算法
如上图所示,Obj8、Obj9、Obj10 都没有到 GC Root 的引用链,因此它们会被标记为垃圾,即便 Obj9 和 Obj10 之间有引用关系。

引用与垃圾检测算法

对于可达性分析算法而言,未到达的对象并非是“非死不可”的,若要宣判一个对象死亡,至少需要经历两次标记阶段。

  1. 如果对象在进行可达性分析后发现没有与 GCRoots 相连的引用链,则该对象被第一次标记并进行一次筛选,筛选条件为是否有必要执行该对象的finalize方法,若对象没有覆盖 finalize 方法或者该 finalize 方法是否已经被虚拟机执行过了,则均视作不必要执行该对象的 finalize 方法,即该对象将会被回收。反之,若对象覆盖了 finalize 方法并且该 finalize 方法并没有被执行过,那么,这个对象会被放置在一个叫F-Queue的队列中,之后会由虚拟机自动建立的、优先级低的Finalizer线程去执行,而虚拟机不必要等待该线程执行结束,即虚拟机只负责建立线程,其他的事情交给此线程去处理。
  2. 对 F-Queue 中对象进行第二次标记,如果对象在 finalize 方法中拯救了自己,即关联上了 GCRoot 引用链,如把 this 关键字赋值给其他变量,那么在第二次标记的时候该对象将从“即将回收”的集合中移除,如果对象还是没有拯救自己,那就会被回收。如下代码演示了一个对象如何在 finalize 方法中拯救了自己,然而,它只能拯救自己一次,第二次就被回收了。具体代码如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    /*
    * 此代码演示了两点:
    * 1.对象可以再被GC时自我拯救
    * 2.这种自救的机会只有一次,因为一个对象的finalize()方法最多只会被系统自动调用一次
    * */
    public class FinalizeEscapeGC {
    public static FinalizeEscapeGC SAVE_HOOK = null;
    @Override
    protected void finalize() throws Throwable {
    super.finalize();
    System.out.println(this + ": finalize method executed!");
    FinalizeEscapeGC.SAVE_HOOK = this;
    }
    public static void main(String[] args) throws InterruptedException {
    SAVE_HOOK = new FinalizeEscapeGC();
    System.out.println(SAVE_HOOK);
    // 对象第一次拯救自己
    SAVE_HOOK = null;
    System.out.println(SAVE_HOOK);
    System.gc();
    // 因为finalize方法优先级很低,所以暂停0.5秒以等待它
    Thread.sleep(500);
    System.out.println(SAVE_HOOK);
    // 下面这段代码与上面的完全相同,但是这一次自救却失败了
    // 一个对象的finalize方法只会被调用一次
    SAVE_HOOK = null;
    System.gc();
    // 因为finalize方法优先级很低,所以暂停0.5秒以等待它
    Thread.sleep(500);
    System.out.println(SAVE_HOOK);
    }
    }

垃圾收集算法

标记清除(Mark-Sweep)

先标记所有需要清除的对象,再统一回收。是最基础的垃圾回收算法,后续的收集算法都是基于这种思路并对其缺点进行改进而得到的。
问题

  • 效率低,标记和清除都需要一次线性扫描;
  • 产生大量内存碎片,当程序在以后的运行过程中需要分配较大对象时无法找到足够的连续内存而不得不提前触发另一次垃圾收集动作。

首先标记出所有需要回收的对象,使用可达性分析算法判断一个对象是否为可回收,在标记完成后统一回收所有被标记的对象。下图是算法具体的一次执行过程后的结果对比。
标记清除算法

复制算法(Copying)

将可用内存划分为大小相等的两半,对每一块使用指针碰撞(从已分配内存向空闲内存空间移动对象大小的空间)的方法为对象分配空间,如果这一块内存用完,就将还存活的对象复制到另一半块上,将原来的这一半一次清理掉。
HotSpot 中使用的是 Eden-Survivor 方法,大体上每次使用一个 Eden 和一个 Survivor 来分配对象空间,当回收时,将这两块中还存活的对象一次性复制到另一块 Survivor 中,Eden 和 Survivor 的比例为8:1。如果 Survivor 的空间不够了,就会使用老年代进行分配担保(Handle Promotion)

  • 将现有的内存空间分为两快,每次只使用其中一块;
  • 当其中一块时候完的时候,就将还存活的对象复制到另外一块上去;
  • 再把已使用过的内存空间一次清理掉。

优点

  • 由于是每次都对整个半区进行内存回收,内存分配时不必考虑内存碎片问题;
  • 只要移动堆顶指针,按顺序分配内存即可,可以利用Bump-the-pointer(指针碰撞)实现,实现简单,运行高效;

    像标记-清除算法清理后的内存空间并不规整,可能会有很多碎片,因此只能使用空闲列表(Free List)的方式分配内存。

缺点

  • 内存减少为原来的一半,太浪费了(用空间换时间);
  • 对象存活率较高的时候就要执行较多的复制操作,效率变低;
  • 如果不使用50%的对分策略,老年代需要考虑空间担保策略,复杂度变高。

复制算法
将内存分为两等块,每次使用其中一块。当这一块内存用完后,就将还存活的对象复制到另外一个块上面,然后再把已经使用过的内存空间一次清理掉。图是算法具体的一次执行过程后的结果对比。

标记-整理算法(Mark-Compact)

标记过程和Mark-Sweep一样,但是不直接清除,而是让存活的对象向前移,再清理端边界外的内存。
标记过程还是和标记-清除算法一样,之后让所有存活的对象都向一端移动,然后直接清理掉边界以外的内存,标记 - 整理算法示意图如下
标记整理算法

标记-整理算法往往与标记-清除同时使用,优先执行标记-清除,当内存空间碎片过多时,才运行标记-整理压缩内存空间。

分代收集算法(Generational Collection)

将 Java 堆分为新生代和老生代,根据各个年代的特点采取最适当的收集算法。在新生代中死得快,就选用复制算法(要复制的少),老生代中对象存活率高,就使用标记整理或标记清除算法。

Java垃圾回收的基本概念

GC文章有些常用的概念:

  • Mutator:生产垃圾的对象;
  • TLAB(Thread Local Allocation Buffer):线程可以优先将对象分配在Eden区的一块线程独享内存,因为是线程独享的,没有锁竞争,所以分配速度更快。
  • Card Table:Java中的垃圾收集器以内存页作为分配单位,使用Card Table标记被写入过的卡页为dirty,dirty页面中的对象可达性可能发生变化,因此在像CMS这样的垃圾回收器的重标记阶段会被重新扫描一次。
  • 分代回收
    JVM中采用的分代回收算法将堆内存划分为年轻代、老年代、元空间和常量池(字符串、常量),以及栈空间、堆外内存。
    垃圾回收主要处理的是年轻代和老年代的对象。
  • 对象分配
    JVM通过Unsafe调用C的allocate和free方法分配、释放对象,分配方法有空闲链表(free list)和碰撞指针(bump pointer)两种。
  • GC
    垃圾收集需要先识别垃圾,然后再使用垃圾回收算法回收空间。
    垃圾识别算法主要有引用计数、可达性分析;
    GC算法常见的主要是Mark-Sweep、Mark-Compact、Copying。
  • 垃圾收集器
    不同的收集器会有不同的内存负责范围,不同的算法,比如CMS采用标记清除算法清理老年代空间,使用CMS时需要和ParNew搭配回收年轻代。

HotSpot GC 触发时机

GC 目标内存区域

对于虚拟机中线程私有的区域,如程序计数器虚拟机栈本地方法栈都不需要进行垃圾回收,因为它们是自动进行的,随着线程的消亡而消亡,不需要我们去回收,比如栈的栈帧结构,当进入一个方法时,就会产生一个栈帧,栈帧大小也可以借助类信息确定,然后栈帧入栈,执行方法体,退出方法时,栈帧出栈,于是其所占据的内存空间也就被自动回收了。
而对于虚拟机中线程共享的区域,则需要进行垃圾回收,如方法区,线程都会在这两个区域产生自身的数据,占据一定的内存大小,并且这些数据又可能会存在相互关联的关系,所以,这部分的区域不像线程私有的区域那样可以简单自动的进行垃圾回收,此部分区域的垃圾回收非常复杂,而垃圾回收也主要是针对这部分区域。

可达性分析

对于可达性分析而言,我们知道,首先需要选取 GCRoots 结点,而 GCRoots 结点主要在全局性的引用(如常量或类静态属性)与执行上下文(如栈帧中的局部变量表)中。方法区可以很大,这对于寻找 GCRoots 结点来说会非常耗时。当选取了 GCRoots 结点之后,进行可达性分析时必须要保证一致性,即在进行分析的过程中整个执行系统看起来就好像被冻结在某个时间点上,不可以在分析的时候,对象的关系还在动态变化,这样的话分析的准确性就得不到保证,所以可达性分析是时间非常敏感的。
为了保证分析结果的准确性,就会导致GC 进行时必须停顿所有 Java 执行线程(Stop the world),为了尽可能的减少 Stop the world 的时间,Java 虚拟机使用了一组称为OopMap的数据结构,该数据结构用于存放对象引用的地址,这样,进行可达性分析的时候就可以直接访问 OopMap 就可以获得对象的引用,从而加快分析过程,减少 Stop the world 时间
OopMap 数据结构有利于进行 GC,是不是虚拟机无论何时想要进行 GC 都可以进行 GC,即无论虚拟机在执行什么指令都可以进行 GC?答案是否定的,因为要想让虚拟机无论在执行什么指令的时候都可以进行 GC 的话,需要为每条指令都生成 OopMap,显然,这样太浪费空间了。为了节约宝贵的空间,虚拟机只在”特定的位置“存放了 OopMap 数据结构,这个特定的位置我们称之为安全点程序执行时并非在所有地方都能够停顿下来开始 GC(可达性分析),只有到达安全点的时候才能暂停安全点可以由方法调用、循环跳转、异常跳转等指令产生,因为这些指令会让程序长时间执行
现在我们已经知道了安全点的概念,即进行 GC 必须要到达安全点,那么在发生 GC 时如何让所有线程到达安全点再暂停呢?有两种方法:

  1. 抢先式中断,在发生 GC 时,首先把所有线程全部中断,如果发现线程中断的地方不在安全点上,就恢复线程,让它跑到安全点上。
  2. 主动式中断,在发生 GC 时,不中断线程,而是设置一个标志,所有线程执行时主动轮询这个标志,发生标志位真就自己中断挂起,轮询标志的地方和安全点是重合的,也有可能是创建对象需要分配内存的地方。

现在问题又来了,当程序不执行的时候,如何让所有线程达到安全点呢?典型的就是线程处于 Sleep 状态或者 Blocked 状态,这时候线程是无法跑到安全点再中断自己的,虚拟机也肯定不可能等待该线程被唤醒并重新分配 CPU 时间后,跑到安全点再暂停。为了解决这个问题,引入安全区域的概念。安全区域是对安全点的扩展,可以看成由很多安全点组成,安全区域是指一段代码片段之中,引用关系不会发生变化。在这个区域的任何地方开始 GC 都是安全的。当线程执行到安全区域的代码时,首先标示自己已经进入了安全区域,那么,在这段时间里 JVM 发起 GC 时,就不用管标示自己为安全区域状态的线程了。在线程要离开安全区域时,它要检查系统是否已经完成了根节点枚举(或者整个 GC 过程),若完成,线程继续执行;否则,它必须等待直到收到可以安全离开安全区域的信号。

分代回收 GC 类型及对象晋升(新生代 -> 老年代)

根据作用区域的不同,GC 主要分为 3 种:

  • Minor GC:对象通常在新生代的 Eden 区进行分配,当 Eden 区没有足够空间进行分配时,虚拟机将发起一次 Minor GC,非常频繁,速度较快;
  • Major GC:指发生在老年代的 GC,出现 Major GC,经常会伴随一次 Minor GC,同时 Minor GC 也会引起 Major GC,一般在 GC 日志中统称为 GC,不频繁。
  • Full GC:指发生在老年代和新生代的GC,速度很慢,需要Stop The World。可以用System.gc() 强制执行 Full GC,但这在生产环境中是需要被禁止的。

对象的晋升机制:

  1. 对象优先在新生代区中分配,若没有足够空间,则触发 Minor GC,经过 Minor GC 仍存活的对象年龄 +1,若年龄超过一定限制(默认为 15),则被晋升到老年态;
  2. 大对象(需要大量连续内存空间)直接进入老年态;
  3. 长期存活的对象进入老年态。

GC Cause

定义GC Cause的代码位置:src/share/vm/gc/shared/gcCause.hppsrc/share/vm/gc/shared/gcCause.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
const char* GCCause::to_string(GCCause::Cause cause) {
switch (cause) {
// 手动触发
case _java_lang_system_gc:
return "System.gc()";

case _full_gc_alot:
return "FullGCAlot";

case _scavenge_alot:
return "ScavengeAlot";

case _allocation_profiler:
return "Allocation Profiler";

case _jvmti_force_gc:
return "JvmtiEnv ForceGarbageCollection";

//
case _gc_locker:
return "GCLocker Initiated GC";

case _heap_inspection:
return "Heap Inspection Initiated GC";

case _heap_dump:
return "Heap Dump Initiated GC";

case _wb_young_gc:
return "WhiteBox Initiated Young GC";

case _update_allocation_context_stats_inc:
case _update_allocation_context_stats_full:
return "Update Allocation Context Stats";

case _no_gc:
return "No GC";

// 分配对象失败,触发Young GC
case _allocation_failure:
return "Allocation Failure";

// 老年代满了
case _tenured_generation_full:
return "Tenured Generation Full";

case _metadata_GC_threshold:
return "Metadata GC Threshold";

// CMS
case _cms_generation_full:
return "CMS Generation Full";

case _cms_initial_mark:
return "CMS Initial Mark";

case _cms_final_remark:
return "CMS Final Remark";

case _cms_concurrent_mark:
return "CMS Concurrent Mark";

case _old_generation_expanded_on_last_scavenge:
return "Old Generation Expanded On Last Scavenge";

case _old_generation_too_full_to_scavenge:
return "Old Generation Too Full To Scavenge";

case _adaptive_size_policy:
return "Ergonomics";

case _g1_inc_collection_pause:
return "G1 Evacuation Pause";

case _g1_humongous_allocation:
return "G1 Humongous Allocation";

case _last_ditch_collection:
return "Last ditch collection";

case _last_gc_cause:
return "ILLEGAL VALUE - last gc cause - ILLEGAL VALUE";

default:
return "unknown GCCause";
}
ShouldNotReachHere();
}

JVM会因这些Cause触发回收:
/src/hotspot/share/gc/cms/concurrentMarkSweepGeneration.cpp

列举一些经典的GC Cause及参考的解决方案:

  1. 扩容时发生的GC
    如果-Xms-Xmx的值设置得不一样,刚开始只会分配-Xms大小的堆空间,每次不够时再向操作系统申请,这时必须进行一次GC。
    因此,需要尽量将-Xms-Xmx-XX:-MaxNewSize-XX:NewSize-XX:MetaSpaceSize-XX:MaxMetaSpaceSize这样的值设置成一样的。
  2. System.gc()
    如果扩容缩容、Old区达到回收阈值、Metaspace空间不足、Young区晋升失败、大对象担保失败等几种情况都没有发生,却触发了GC,那有可能是因为代码中显式调用了System.gc()
    System.gc()一般用于清理DiectBuffer对象,因为DirectBuffer会申请堆外空间。
    因此System.gc()的去留需要根据即使情况来判断。
  3. Metaspace OOM
    1.8之后,Java将类、字符串常量等数据保存到了元空间,而元空间又位于堆中,因此GC时会将元空间的数据也一并回收掉。
    但是元空间大小会受-XX:MaxMetaSpaceSize这个属性限制,如果空间不够且无法继续扩容,则将触发OOM。
    一般Metaspace OOM是由动态加载类数据造成的,可以dump内存快照观察类数据的Histogram(直方图),或者直接通过命令定位,jcmd打几次Histogram的图,看一下具体是哪个包下的Class增加较多即可定位。
  4. 过早晋升
    如果发生了以下情况,可能是发生了过早晋升:
    分配速率接近于晋升速率,对象晋升年龄较小。
    GC 日 志 中 出 现“Desired survivor size 107347968 bytes, new threshold 1(max 6)”等信息,说明此时经历过一次 GC 就会放到 Old 区。
    Full GC 比较频繁,且经历过一次 GC 之后 Old 区的变化比例非常大。

发生过早晋升的根本原因可能是:Young/Eden区过小分配速率过大

晋升年龄受一个阈值MaxTenuringThreshold控制,如果设置得过大,会导致该晋升的对象一直停留在年轻代,每次YoungGC都需要复制大量对象,失去了老年代的作用;如果设置得过小,大量对象被晋升到Old区,失去了年轻代的作用。不同情况下JVM内存成分不同,对象的生命周期分布也不同,因此晋升年龄是动态调整的。
/src/hotspot/share/gc/shared/ageTable.cpp#compute_tenuring_threshold
可以看到 Hotspot 遍历所有对象时,从所有年龄为 0 的对象占用的空间开始累加,如果加上年龄等于 n 的所有对象的空间之后,使用 Survivor 区的条件值(Target-SurvivorRatio / 100,TargetSurvivorRatio 默认值为 50)进行判断,若大于这个值则结束循环,将 n 和 MaxTenuringThreshold 比较,若 n 小,则阈值为 n,若 n 大,则只能去设置最大阈值为 MaxTenuringThreshold。动态年龄触发后导致更多的对象进入了 Old 区,造成资源浪费。
如果是Young/Eden过小,可以调整比例,一般可以在Heap 内存不变的情况下适当增大 Young 区,一般情况下 Old 的大小应当为活跃对象的 2~3 倍左右,考虑到浮动垃圾问题最好在 3 倍左右,剩下的都可以分给 Young 区。
如果是分配速率过大,可以分析一下代码是不是哪些地方动态加载类过快了;或者直接扩大元空间,适应这种速度。

  1. CMS FullGC频繁
    CMS的原理是一次Young GC后,负责处理CMS的一个后台线程concurrentMarkSweep会不断地轮询,使用shouldConcurrentCollect()检测是否达到回收条件。如果达到条件则调用collect_in_background()启动一次Background模式GC。
    判断是否进行回收的代码:/src/hotspot/share/gc/cms/concurrentMarkSweepGeneration.cpp
    比较常见的有:-XX:+UseCMSInitiatingOccupancyFraction触发、上次Young GC失败触发。

  2. 单次CMS GC(老年代GC)耗时过长
    CMS回收主要耗时阶段是Init Mark和Final Remark,因为这两个阶段都需要STW,
    见Old区垃圾回收细节:CMSCollector::collect_in_backgroundCMSCollector::collect

不同算法触发的时机

  1. Minor GC(年轻代 GC)
    触发时机:在 Enden 满了之后将被触发
    GC 在优先级最低的线程中运行,一般在应用程序空闲即没有应用线程在运行时被调用。
    当发生 Minor GC 后空间仍不够,触发 Major GC
  2. Full GC / Major GC(老年代GC)
    触发时机:
    1. 调用 System.gc 时,系统建议执行 Full GC,但是不必然执行。(可通过通过-XX:+ DisableExplicitGC来禁止 RMI 调用 System.gc。)
    2. 方法区空间不足,如果没有动态加载,一般是发生在启动的时候的,但是JDK1.8之后元空间替换了方法区,因此不会有这种情况了。
    3. 老年代空间不足,引起FullGC,这种情况比较复杂,有以下几种情况:
      3.1、通过对象的正常晋升机制触发对象向老年代移动时,老年代空间不足,由-XX:MaxTenureThreshold参数定义;
      3.2、大对象直接进入老年代,此时老年代空间不足,由-XX:PretenureSizeThreshold参数定义;
      3.3、动态年龄判定机制会将对象提前转移至老年代。年龄从小到大累加,当加入某个年龄段后,这个年龄对象占用空间大小总和超过survivor区域 * -XX:TargetSurvivorRatio的时候,从这个年龄段往上年龄的对象进入老年代
      3.4、由 Eden 区、From Space 区向 To Space 区复制时,对象大小大于 To Space 可用内存,则把该对象转存到老年代,且老年代的可用内存小于该对象大小。

在进行MinorGC之前,JVM的空间分配担保机制可能会触发3.2、3.3、3.4的发生,也就是触发一次FullGC。
所谓的空间分配担保机制,就是在MinorGC之前,虚拟机会检查老年代最大可用连续内存空间是否大于新生代所有对象的总空间。

  • 如果大于,则此次Minor是安全的;
  • 如果小于,则虚拟机会查看HandlePromotionFailure设置值是否允许担保失败。如果HandlePromotionFailure=true,那么会继续检查老年代最大可用连续空间是否大于历次晋升到老年代的对象的平均大小,如果大于,则尝试进行一次MinorGC,但这次MinorGC依然是有风险的,失败后会重新发起一次FullGC,如果小于或者HandlePromotionFailure=false,则改为直接进行一次FullGC。

最后,当发生 FullGC 之后空间还是不够,将抛出 OutOfMemoryError。

对象分配和回收策略

对象的内存分配,绝大部分都是在堆上分配,少数经过JIT编译后被拆散为标量类型并间接在栈上分配。
在堆上的分配又可以有如下分配,主要在新生代的 Eden 区分配,如果启动了本地线程分配缓冲,将按照线程优先在TLAB上分配,少数直接在 Tenured 区分配,虚拟机也提供了一些参数供我们来控制对象内存空间的分配。
总而言之,对象分配具有以下几种策略:

对象优先在 Eden 区分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
-Xms20M -Xmx20M -Xmn10M
-XX:SurvivorRatio=8
-XX:+PrintGCDetails
-XX:+UseSerialGC
public class AllocEdenTest {
private static final int _1MB = 1024 * 1024;

public static void testAllocation() {
byte[] alloc1, alloc2, alloc3, alloc4;
alloc1 = new byte[2 * _1MB];
alloc2 = new byte[2 * _1MB];
alloc3 = new byte[2 * _1MB];
alloc4 = new byte[4 * _1MB];
}

public static void main(String[] args) {
testAllocation();
}
}
GC日志:
[GC (Allocation Failure) [DefNew: 7223K->685K(9216K), 0.0125141 secs] 7223K->4781K(19456K), 0.0125503 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]
Heap
def new generation total 9216K, used 7071K [0x00000007bec00000, 0x00000007bf600000, 0x00000007bf600000)
eden space 8192K, 77% used [0x00000007bec00000, 0x00000007bf23c948, 0x00000007bf400000)
from space 1024K, 66% used [0x00000007bf500000, 0x00000007bf5ab658, 0x00000007bf600000)
Disconnected from the target VM, address: '127.0.0.1:58261', transport: 'socket'
to space 1024K, 0% used [0x00000007bf400000, 0x00000007bf400000, 0x00000007bf500000)
tenured generation total 10240K, used 4096K [0x00000007bf600000, 0x00000007c0000000, 0x00000007c0000000)
the space 10240K, 40% used [0x00000007bf600000, 0x00000007bfa00020, 0x00000007bfa00200, 0x00000007c0000000)
Metaspace used 2989K, capacity 4568K, committed 4864K, reserved 1056768K
class space used 318K, capacity 392K, committed 512K, reserved 1048576K

新生代可用的空间:9M = 8M(Eden 空间容量) + 1M(一个 Survivor 空间的容量)
老年代可用的空间:10M
分配完 alloc1、alloc2、alloc3 之后,无法再分配 alloc4,会发生分配失败,则需要进行一次 Minor GC,survivor to 区域的容量为 1M,无法容纳总量为 6M 的三个对象,则会通过担保机制将 alloc1、allo2 转移到老年代,然后再将 alloc4 分配在 Eden 区。

大对象直接进入 Tenured 区

大对象需要大块连续内存空间,大对象的出现容易提前触发 GC 以获取更大的连续空间来供分配大对象,可以设置-XX:PretenureSizeThreshold的值来控制多大的对象直接分配到 Tenured 区,默认是 0,即所有对象不管多大都先在 Eden 区中分配空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* -Xms20M -Xmx20M -Xmn10M
* -XX:SurvivorRatio=8
* -XX:+PrintGCDetails
* -XX:+UseSerialGC
* -XX:PretenureSizeThreshold=3145728
*/
public class AllocBigObjectTest {
private static final int _1MB = 1024 * 1024;

public static void main(String[] args) {
byte[] alloc = new byte[5 * _1MB];
}
}

Heap
def new generation total 9216K, used 1180K [0x00000007bec00000, 0x00000007bf600000, 0x00000007bf600000)
eden space 8192K, 14% used [0x00000007bec00000, 0x00000007bed27010, 0x00000007bf400000)
from space 1024K, 0% used [0x00000007bf400000, 0x00000007bf400000, 0x00000007bf500000)
to space 1024K, 0% used [0x00000007bf500000, 0x00000007bf500000, 0x00000007bf600000)
tenured generation total 10240K, used 5120K [0x00000007bf600000, 0x00000007c0000000, 0x00000007c0000000)
the space 10240K, 50% used [0x00000007bf600000, 0x00000007bfb00010, 0x00000007bfb00200, 0x00000007c0000000)
Metaspace used 2662K, capacity 4486K, committed 4864K, reserved 1056768K
class space used 287K, capacity 386K, committed 512K, reserved 1048576K

因为设置了-XX:PretenureSizeThreshold=3145728控制大小超过 3M 的对象直接进入 Tenured 区,可以看到 5M 的对象直接被分配到了 Tenured 区。

长期存活的对象进入 Tenured 区

每个对象有一个对象年龄计数器,与前面的对象的存储布局中的 GC 分代年龄对应。对象出生在 Eden 区、经过一次 Minor GC 后仍然存活,并能够被 Survivor 容纳,则设置年龄为 1,对象在 Survivor 区每次经过一次 Minor GC,年龄就加 1,当年龄达到阈值(默认 15),就晋升到老年代,虚拟机提供了-XX:MaxTenuringThreshold来进行设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* -Xms20M -Xmx20M -Xmn10M
* -XX:SurvivorRatio=8
* -XX:+PrintGCDetails
* -XX:+UseSerialGC
* -XX:MaxTenuringThreshold=1
* -XX:+PrintTenuringDistribution
*/
public class AllocLongTimeTest {
private static final int _1MB = 1024 * 1024;

public static void main(String[] args) {
byte[] alloc1, alloc2, alloc3;
alloc1 = new byte[_1MB / 4];
alloc2 = new byte[4 * _1MB];
alloc3 = new byte[4 * _1MB];
alloc3 = null;
alloc3 = new byte[4 * _1MB];
}
}

[GC (Allocation Failure) [DefNew
Desired survivor size 524288 bytes, new threshold 1 (max 1)
- age 1: 964208 bytes, 964208 total
: 7479K->941K(9216K), 0.0063212 secs] 7479K->5037K(19456K), 0.0063540 secs] [Times: user=0.00 sys=0.01, real=0.00 secs]
[GC (Allocation Failure) [DefNew
Desired survivor size 524288 bytes, new threshold 1 (max 1)
: 5037K->0K(9216K), 0.0014434 secs] 9133K->4814K(19456K), 0.0014629 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]
Heap
def new generation total 9216K, used 4178K [0x00000007bec00000, 0x00000007bf600000, 0x00000007bf600000)
eden space 8192K, 51% used [0x00000007bec00000, 0x00000007bf014930, 0x00000007bf400000)
from space 1024K, 0% used [0x00000007bf400000, 0x00000007bf400000, 0x00000007bf500000)
to space 1024K, 0% used [0x00000007bf500000, 0x00000007bf500000, 0x00000007bf600000)
tenured generation total 10240K, used 4814K [0x00000007bf600000, 0x00000007c0000000, 0x00000007c0000000)
the space 10240K, 47% used [0x00000007bf600000, 0x00000007bfab3b38, 0x00000007bfab3c00, 0x00000007c0000000)
Metaspace used 2988K, capacity 4568K, committed 4864K, reserved 1056768K
class space used 318K, capacity 392K, committed 512K, reserved 1048576K

如 GC 日志中所示,总共发生了两次 Minor GC:

  1. 第一次是在给 alloc3 分配的时候,此时 Survivor 区不能容纳 alloc2,但是可以容纳 alloc1,所以 alloc1 进入了 Survivor 区并且年龄变成 1、达到了阈值,将在下一次 GC 时晋升到老年代,而 alloc2 则通过担保机制进入了老年代;
  2. 第二次 GC 是在第二次给 alloc3 分配空间时,这时 alloc1 年龄+1,晋升到老年代,此时 GC 也可以清理出原来 alloc3 占据的 4MB 空间,将 alloc3 分配在 Eden 区。

因此,最后的结果是 alloc1、alloc2 在老年代,alloc3 在 Eden 区。

动态对象年龄判断

除了对象年龄自然达到-XX:MaxTenuringThreshold而被转移到 Tenured 区外,如果在 Survivor 区中相同年龄所有对象大小的总和大于 Survivor 区的一半,则年龄大于等于该年龄的对象也可以直接转移到 Tenured 区、而无需等年龄达到-XX:MaxTenuringThreshold

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* -Xms20M -Xmx20M -Xmn10M
* -XX:SurvivorRatio=8
* -XX:+PrintGCDetails
* -XX:+UseSerialGC
* -XX:MaxTenuringThreshold=15
* -XX:+PrintTenuringDistribution
*/
public class AllocDynamicAgeTest {
private static final int _1MB = 1024 * 1024;

public static void main(String[] args) {
byte[] alloc1, alloc2, alloc3, alloc4;
alloc1 = new byte[_1MB / 4];
alloc2 = new byte[_1MB / 4];
alloc3 = new byte[4 * _1MB];
alloc4 = new byte[4 * _1MB];
alloc4 = null;
alloc4 = new byte[4 * _1MB];
}
}

[GC (Allocation Failure) [DefNew
Desired survivor size 524288 bytes, new threshold 1 (max 15)
- age 1: 1048568 bytes, 1048568 total
: 7735K->1023K(9216K), 0.0066947 secs] 7735K->5293K(19456K), 0.0067283 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]
[GC (Allocation Failure) [DefNew
Desired survivor size 524288 bytes, new threshold 15 (max 15)
: 5120K->0K(9216K), 0.0015566 secs] 9389K->5244K(19456K), 0.0015767 secs] [Times: user=0.00 sys=0.00, real=0.00 secs]
Heap
def new generation total 9216K, used 4178K [0x00000007bec00000, 0x00000007bf600000, 0x00000007bf600000)
eden space 8192K, 51% used [0x00000007bec00000, 0x00000007bf014930, 0x00000007bf400000)
from space 1024K, 0% used [0x00000007bf400000, 0x00000007bf400000, 0x00000007bf500000)
to space 1024K, 0% used [0x00000007bf500000, 0x00000007bf500000, 0x00000007bf600000)
tenured generation total 10240K, used 5244K [0x00000007bf600000, 0x00000007c0000000, 0x00000007c0000000)
the space 10240K, 51% used [0x00000007bf600000, 0x00000007bfb1f248, 0x00000007bfb1f400, 0x00000007c0000000)
Metaspace used 2986K, capacity 4568K, committed 4864K, reserved 1056768K
class space used 318K, capacity 392K, committed 512K, reserved 1048576K

发生了两次 Minor GC:

  1. 第一次发生在给 alloc4 分配内存时,此时 alloc1、alloc2 将会进入 Survivor 区,而 alloc3 通过担保机制将会进入老年代;
  2. 第二次发生在给 alloc4 分配内存时,此时,Survivor 区的 alloc1、alloc2 达到了 Survivor 区容量的一半,将会进入老年代,此时 GC 可以清理出 alloc4 原来的 4MB 空间,并将 alloc4 分配在 Eden 区。

最终,alloc1、alloc2、alloc3 在老年代,alloc4 在 Eden 区。

空间分配担保

老年代连续空间大于新生代对象总大小、或者历次晋升的平均大小,就会执行 Minor GC,否则将进行 Full GC。GC 期间,如果 Survivor 区空闲空间小于存活对象,则需要老年代进行分配担保,把 Survivor 区无法容纳的对象直接转移到老年代。
例子在上一节中已经给出,这里不再赘述。

HotSpot GC 实现方式

计算所需空间大小

ConcurrentMarkSweepGeneration::compute_new_size()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void ConcurrentMarkSweepGeneration::compute_new_size() {
assert_locked_or_safepoint(Heap_lock);

// If incremental collection failed, we just want to expand
// to the limit.
if (incremental_collection_failed()) {
clear_incremental_collection_failed();
grow_to_reserved();
return;
}

// The heap has been compacted but not reset yet.
// Any metric such as free() or used() will be incorrect.

CardGeneration::compute_new_size();

// Reset again after a possible resizing
if (did_compact()) {
cmsSpace()->reset_after_compaction();
}
}

对垃圾回收算法的改进

复制算法

两个区域 A 和 B,初始对象在 A,继续存活的对象被转移到 B。
这两个区域并不需要根据 1:1 划分内存空间,而是将内存划分为一块较大的 Eden Space 和两块较小的 Survivor Space,在 HotSpot 中默认大小比例为 8:1。
当执行年轻代回收时会将 Eden 区存活的对象复制到一个空闲的 Survivor,下一次 GC 时将 Eden 区和这个 Survivor 区存活的对象复制到另一个 Survivor 区,因此总是会有一块 Survivor 区是空闲的。
当 Survivor 空间不够用的时候,需要依赖于老年代的空间担保。

标记-清除算法

一块区域,标记可达对象(可达性分析),然后回收不可达对象,这会引入碎片,因此在空间碎片过多导致无法继续分配时往往会执行一次整理来压缩空间。

标记-整理算法

相对标记清理算法来说多了碎片整理的过程,可以整理出更大的内存放更大的对象。
复制收集算法在对象存活率较高时就要执行较多的复制操作,效率将会变低。更关键的是,如果不想浪费 50%的空间,就需要有额外的空间进行分配担保,以应对被使用的内存中所有对象都100%存活的极端情况,所以在老年代一般不能直接选用这种算法。
根据老年代的特点,有人提出了另外一种“标记-整理”(Mark-Compact)算法,标记过程仍然与“标记-清除”算法一样,但后续步骤不是直接对可回收对象进行清理,而是让所有存活的对象都向一端移动,然后直接清理掉端边界以外的内存(有点 copy 的意思,但是比 copy 省空间。比清理好的一点是没有碎片)。

分代回收

新生代:初始对象,生命周期短的
永久代:长时间存在的对象
整个 java 的垃圾回收是新生代和年老代的协作,这种叫做分代回收。

在大的分代回收的思想下面,不同的代区可以选择不同的收集器,而不同的收集器在不同的代区又会用到不同的算法。

方法区回收策略

方法区与 Java 堆一样,是各个线程共享的内存区域,它用于存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。
Java 虚拟机规范对方法区的限制非常宽松,除了和 Java 堆一样不需要连续的内存和可以选择固定大小或者可扩展外,还可以选择不实现垃圾收集。
方法区的垃圾回收主要回收两部分内容:

  1. 从常量池回收废弃常量。
    如何判断废弃常量呢?以字面量回收为例,如果一个字符串“abc”已经进入常量池,但是当前系统没有任何一个 String 对象引用了叫做“abc”的字面量,那么,如果发生垃圾回收并且有必要时,“abc”就会被系统移出常量池。常量池中的其他类(接口)、方法、字段的符号引用也与此类似。
  2. 卸载无用的类。既然进行垃圾回收,就需要判断哪些是废弃常量,哪些是无用的类。
    如何判断无用的类呢?需要满足以下三个条件
    • 该类的所有实例都已经被回收,即 Java 堆中不存在该类的任何实例。
    • 加载该类的 ClassLoader 已经被回收。
    • 该类对应的 java.lang.Class 对象没有在任何地方被引用,无法在任何地方通过反射访问该类的方法。
      满足以上三个条件的类可以进行垃圾回收,但是并不是无用就被回收,虚拟机额外提供了一些参数供我们配置。

直接内存(堆外内存)

直接内存并不是虚拟机运行时数据区的一部分,也不是 Java 虚拟机规范中定义的内存区域。但是这部分内存也被频繁地使用,而且也可能导致 OutOfMemoryError 异常出现。
NIO 类可以直接通过 Native 函数分配堆外内存,然后通过一个存储在 Java 堆中的 DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在 Java 堆和 Native 堆中来回复制数据。
使用堆外内存时需要注意:

  • 由于垃圾收集器不涉及堆外内存,因此堆外内存何时分配何时回收都需要用户自己来定义;
  • 直接内存的分配不会受到 Java 堆大小的限制,但是,既然是内存,肯定还是会受到本机总内存(包括 RAM 以及 SWAP 区或者分页文件)大小以及处理器寻址空间的限制。

由 DirectMemory 导致的内存溢出,一个明显的特征是在 Heap Dump 文件中不会看见明显的异常,如果我们发现 OOM 之后 Dump 文件很小,而程序中有直接或间接使用了 NIO ,那就可以考虑检查一下是不是这方面的原因。

JVM 垃圾收集器的演进

垃圾收集器是内存回收算法的具体实现,随着 JDK 的升级我们已经有很多种垃圾收集器可供选择:

  • JDK1.4 && JDK1.5 很少用了,基本上是 Serial(Serial Old)。
  • JDK1.6 是ParNew或者Parallel(Parallel Old)。
  • JDK1.7 Parallel、Parallel Old。
  • JDK1.8 Parallel Scavenge(新生代)、Parallel Old(老年代) 配合 CMS。
  • JDK1.9+ G1出现,且为默认收集器

在Java中如何配置垃圾收集器

如何知道 JVM 进程当前使用的是哪种垃圾收集器?

  1. java -XX:+PrintCommandLineFlags
    打印启动时参数,根据启动时参数可以推断 JVM 进程使用的是什么垃圾收集器,但是这并不准确。
  2. jmap
    1
    jmap -heap <PID>

垃圾统计配置

  • -XX:+PrintGC
  • -XX:+PrintGCDetails
  • -XX:+PrintGCTimeStamps:可与上面参数一起使用
  • -XX:+PrintGCApplicationConcurrentTime:打印每次垃圾回收前,程序未中断的执行时间,可与上面参数一起使用
  • -XX:+PrintGCApplicationStoppedTime:打印垃圾回收期间程序暂停的时间,可与上面参数一起使用
  • -XX:PrintHeapAtGC:打印 GC 前后的详细堆栈信息
  • -Xloggc:filename:与上面几个配合使用,把日志信息记录到文件来分析

使用什么垃圾回收器

  • -XX:+UseG1GC 在整个 Java 堆使用 G1 进行垃圾回收
  • -XX:+UseConcMarkSweepGC 设定新生代使用 ParNew(并发复制)收集器,老年代使用 CMS Concurrent Mark-Sweep(并发标记清除)收集器执行内存回收
  • -XX:+UseParallelOldGC 手动指定新生代使用 Parallel Scavenge(并行复制)收集器,老年代使用 Parallel Old(并行标记-压缩)收集器执行内存回收
  • -XX:+UseSerialGC 手动指定新生代使用 Serial Coping(串行复制)收集器,老年代使用 Serial Old (串行标记-清理-压缩)收集器执行内存回收
  • -XX:+UseParNewGC 手动指定新生代使用 ParNew(并发复制)收集器,老年代使用 Serial Old (串行标记-清理-压缩)收集器执行内存回收
  • -XX:+UseParallelGC 手动指定新生代使用 Parallel Scavenge(并行复制)收集器,老年代使用 Serial Old (串行标记-清理-压缩)收集器执行内存回收

Serial / Serial Old 收集器

Serial(串行)收集器是最基本、发展历史最悠久的串行收集器,JDK 1.5 之前默认都是此收集器,因为那时候 CPU 都是单核的。

使用

  • -XX:+UseSerialGC
    这个配置指定年轻代为 Serial,同时会指定老年代采用 Serial Old。

实现原理

Serial(SerialOld)收集器工作过程

  • 单线程阻塞队列。
  • 年轻代采用复制算法,老年代采用标记整理算法,作用于老年代时称作 Serial Old 收集器。

优点

简单而高效(与其他收集器的单线程相比),对于限定单个 CPU 的环境来说,Serial 收集器由于没有线程交互的开销,专心做垃圾收集自然可以获得更高的单线程收集效率。

缺点

  • 它是一个单线程收集器,只会使用一个 CPU 或一条收集线程去完成垃圾收集工作,无法有效利用多核 CPU;
  • 它在进行垃圾收集时,必须暂停其他所有的工作线程,直至 Serial 收集器收集结束为止(Stop The World)。

应用场景

  • HotSpot 虚拟机运行在 Client 模式下的默认的新生代收集器。
  • 单 CPU 虚拟机里面。
  • JDK 1.3.1 之前,是虚拟机新生代收集的唯一选择。JDK 1.5.0 之前老年代的唯一选择。
  • 内存比较小的情况下,效率还是很高的。

ParNew 收集器

使用

  • -XX:+UseParNewGC
    如果使用此配置默认年轻代,老年代采用 Serial Old。
  • -XX:ParallerGCThreads=3
    ParNew 默认开启的收集线程数与 CPU 的数量相同,在 CPU 非常多的情况下可使用 -XX:ParallerGCThreads 参数设置。

实现原理

ParNew收集器工作过程
ParNew 收集器就是 Serial 收集器的多线程版本(即并发模式),除了使用多线程进行垃圾收集外,其余行为包括 Serial 收集器可用的所有控制参数、收集算法(复制算法)、Stop The World、对象分配规则、回收策略等与 Serial 收集器完全相同,两者共用了相当多的代码。

优点

  • 多 CPU 环境下 GC 时更有效利用系统资源,是 Server 模式下虚拟机的首选新生收集器。
  • 可以与 CMS 搭配使用。

缺点

  • 只能用于新生代。
  • ParNew 收集器在单 CPU 的环境中绝对不会有比 Serial 收集器有更好的效果,甚至由于存在线程交互的开销,该收集器在通过超线程技术实现的两个 CPU 的环境中都不能百分之百地保证可以超越。

Parallel Scavenge 并行收集器

使用

  • -XX:+UseParallelGC
  • -XX:+UseParallelOldGC
  • -XX:+UseAdaptiveSizePolicy
    这是一个动态调整各个代区的内存大小的开关参数,打开参数后,就不需要手工指定新生代的大小(-Xmn)、Eden 和 Survivor 区的比例(-XX:SurvivorRatio)、晋升老年代对象年龄(-XX:PretenureSizeThreshold)等细节参数了,虚拟机会根据当前系统的运行情况收集性能监控信息,动态调整这些参数以提供最合适的停顿时间或者最大的吞吐量,这种方式称为 GC 自适应调节策略(GC Ergonomics)
  • -XX:ParallelGCThreads=n
    并行 GC 线程数。
  • -XX:MaxGCpauseMillis=5
    默认 GC 最大停留时间。
  • -xx:GCTimeRatio
    GC 占用总时间的最大比率。

实现原理

ParallelScavenge(ParallelOld)收集器工作过程

  • 并行
  • 可控的吞吐量

    吞吐量(Throughput),即 CPU 用于运行用户代码的时间与 CPU 总消耗时间的比值,即“吞吐量 = 运行用户代码时间 /(运行用户代码时间 + 垃圾收集时间)”。
    假设虚拟机总共运行了 100 分钟,其中垃圾收集花掉 1 分钟,那吞吐量就是 99%。

  • 自适应调节策略

优点

  • 可以调整吞吐量,减少停顿时间,从而提升用户体验
    停顿时间越短就越适合需要与用户交互的程序,良好的响应速度能提升用户体验。而高吞吐量则可以高效率地利用 CPU 时间,尽快完成程序的运算任务,主要适合在后台运算而不需要太多交互的任务。

缺点

Parallel Scavenge 收集器无法与 CMS 收集器配合使用。

并发标记清理(Concurrent Mark-Sweep,CMS)收集器

使用

  • -XX:+UseConcMarkSweepGC,使用 CMS 收集器;
  • -XX:CMSInitiatingOccupancyFraction=80
    当老年代的使用率达到80%时,就会触发一次 CMS GC
  • -XX:+UseCMSCompactAtFullCollection
    Full GC 后,进行一次碎片整理,整理过程是独占的,会引起停顿时间变长。
  • -XX:+CMSFullGCsBeforeCompaction
    设置进行几次 Full GC 后,进行一次碎片整理。
  • -XX:ParallelCMSThreads,设定 CMS 的线程数量(一般情况约等于可用 CPU 数量)。

实现原理

CMS 收集器运行过程中各步骤所涉及的并发和所需的停顿时间如下图所示:
CMS收集器工作过程
CMS(Concurrent Mark Sweep)收集器是一种以获取最短回收停顿时间为目标的收集器。
顾名思义,CMS 采用标记清除算法,它的工作流程分为以下 6 个步骤:

  1. 初始标记(CMS initial mark):仅仅只是标记一下 GC Roots 能直接关联到的对象,速度很快,需要Stop The World(stw)
    CMS-InitialMark
  2. 并发标记(CMS concurrent mark):进行 GC Roots Tracing 的过程,在整个过程中耗时最长。
    CMS-ConcurrentMark
    根据上个阶段找到的 GC Roots 遍历查找,并不是上一阶段存活的对象都会被标记,因为在标记期间用户的程序可能会改变一些引用,如上图所示。
  3. 并发预清理(CMS Concurrent Preclean):并发过程,标记并发执行过程中的脏区域(Card)。
    CMS-ConcurrentPreclean
    如上图所示,在并发运行过程中(包括上一阶段),一些对象的引用可能会发生变化,预清理过程将包含这个对象的区域(Card)标记为 Dirty,这也就是Card Marking
    然后,由这些脏可达的对象也会被重新标记:
    CMS-ConcurrentPreclean-Mark
  4. 可中断预清理(CMS Concurrent Abortable Preclean):这也是一个并发阶段,这个阶段的主要目的是尽量承担最终标记阶段的工作。
    因为重新标记阶段阶段需要全堆扫描,此时如果先进行了MinorGC则可以大大较少需要扫描的对象数量,因此Abortable Preclean阶段的目的就是等一段时间,看看能不能在重新标记前执行一次MinorGC。
    为什么重新标记阶段需要做全堆扫描?因为判断对象是否可达需要使用根搜索算法,而只有MinorGC时才会使用根搜索算法,否则CMS也不知道之前的并发阶段是否产生了新的不可达对象。
  5. 重新标记(CMS remark):为了修正并发标记期间因用户程序继续运作而导致标记产生变动的那一部分对象的标记记录,这个阶段的停顿时间一般会比初始标记阶段稍长一些,但远比并发标记的时间短。此阶段也需要Stop The World
    CMS-Remark
    通常 Remark 阶段会在年轻代尽可能干净的时候运行,目的是为了减少连续 STW 发生的可能性。
  6. 并发清除(CMS concurrent sweep):清除不再使用的对象。
    CMS-ConcurrentSweep

下面以一个真实环境中的FullGC日志为例:

1
2
3
4
5
6
7
8
9
10
11
12
2020-08-20T04:37:36.159+0800: 638682.623: [GC (CMS Initial Mark) [1 CMS-initial-mark: 1930043K(2097152K)] 2000027K(4793536K), 0.2664430 secs] [Times: user=0.11 sys=0.02, real=0.26 secs]
2020-08-20T04:37:36.426+0800: 638682.890: [CMS-concurrent-mark-start]
2020-08-20T04:37:42.956+0800: 638689.420: [CMS-concurrent-mark: 6.513/6.529 secs] [Times: user=2.11 sys=0.40, real=6.53 secs]
2020-08-20T04:37:42.956+0800: 638689.420: [CMS-concurrent-preclean-start]
2020-08-20T04:37:42.982+0800: 638689.445: [CMS-concurrent-preclean: 0.024/0.026 secs] [Times: user=0.03 sys=0.01, real=0.03 secs]
2020-08-20T04:37:42.982+0800: 638689.446: [CMS-concurrent-abortable-preclean-start]
CMS: abort preclean due to time 2020-08-20T04:37:48.340+0800: 638694.804: [CMS-concurrent-abortable-preclean: 5.356/5.358 secs] [Times: user=6.26 sys=0.24, real=5.36 secs]
2020-08-20T04:37:48.344+0800: 638694.807: [GC (CMS Final Remark) [YG occupancy: 571811 K (2696384 K)]2020-08-20T04:37:48.344+0800: 638694.808: [Rescan (parallel) , 0.0743374 secs]2020-08-20T04:37:48.418+0800: 638694.882: [weak refs processing, 0.0004330 secs]2020-08-20T04:37:48.419+0800: 638694.882: [class unloading, 3.9423498 secs]2020-08-20T04:37:52.361+0800: 638698.825: [scrub symbol table, 0.5589452 secs]2020-08-20T04:37:52.920+0800: 638699.384: [scrub string table, 0.0015701 secs][1 CMS-remark: 1930043K(2097152K)] 2501855K(4793536K), 4.5824373 secs] [Times: user=0.47 sys=0.04, real=4.58 secs]
2020-08-20T04:37:52.927+0800: 638699.391: [CMS-concurrent-sweep-start]
2020-08-20T04:37:56.807+0800: 638703.271: [CMS-concurrent-sweep: 3.877/3.880 secs] [Times: user=2.69 sys=0.11, real=3.88 secs]
2020-08-20T04:37:56.808+0800: 638703.271: [CMS-concurrent-reset-start]
2020-08-20T04:37:56.815+0800: 638703.279: [CMS-concurrent-reset: 0.007/0.007 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]

上面的GC日志中:

  • 第 1 行、初始标记阶段,会发生STW,标记GC Root直接引用的对象,GC Root直接引用的对象不多,因此很快。
    1930043K:当前老年代使用的容量;
    2097152K:老年代可用的最大容量;
    2000027K:整个堆目前使用的容量;
    4793536K:整个堆的可用容量;
    0.2664430 secs:这个阶段的持续时间;
    [Times: user=0.11 sys=0.02, real=0.26 secs]:对应 user、system 和 real 的时间统计。
  • 第 2~3 行、并发标记阶段,由第一阶段标记过的对象出发所有可达的对象都在本阶段标记。
    6.513/6.529 secs:这个阶段的持续时间与时钟时间;
    [Times: user=2.11 sys=0.40, real=6.53 secs]:时间统计,但是因为是并发执行的,并不仅仅包含 GC 线程的工作。
  • 第 4~5 行、并发预清理阶段,查找前一阶段执行过程中,从新生代晋升或新分配或被更新的对象,通过并发地重新扫描这些对象,可以减少下一个 STW 重新标记阶段的工作量。
    0.024/0.026 secs:持续时间与时钟时间;
    Times: user=0.03 sys=0.01, real=0.03 secs:时间统计。
  • 第 6~7 行、并发可终止的预清理阶段,这个阶段其实跟上一个阶段做的东西一样,也是为了减少下一个 STW 重新标记阶段的工作量。增加这一阶段是为了让我们可以控制这个阶段的结束时机,比如扫描多长时间(默认 5 秒)或者 Eden 区使用占比达到期望比例(默认 50%)就结束本阶段。
  • 第 8 行、Final Remark 重新标记阶段,会发生STW,暂停所有用户线程,从 GC Root 开始重新扫描整个堆,标记存活的对象。这一阶段是为了修正并发标记期间因用户线程继续运行而导致标记产生变动的那一部分对象的标记记录。这一阶段停顿时间一般比初始标记阶段稍长,但远比并发标记时间短。需要注意的是,虽然 CMS 只回收老年代的垃圾对象,但是这个阶段依然需要扫描新生代,因为很多 GC Root 都在新生代,而这些 GC Root 指向的对象又在老年代,这称为跨代引用
    YG occupancy: 571811 K (2696384 K):年轻代当前占用量及容量;
    Rescan (parallel) , 0.0743374 secs:Rescan 是当应用暂停的情况下完成对所有存活对象的标记,这个阶段是并行处理的;
    weak refs processing, 0.0004330 secs:第 1 个子阶段,处理弱引用;
    class unloading, 3.9423498 secs:第 2 个子阶段,卸载不再使用的 class;
    scrub symbol table, 0.5589452 secs ... scrub string table, 0.0015701 secs:最后一个子阶段,清理符号表和字符表。
    1 CMS-remark: 1930043K(2097152K):这一阶段之后老年代的使用量与总量;
    2501855K(4793536K):这一阶段后堆的使用量与总量(包括年轻代);
    4.5824373 secs:这一阶段的持续时间,也就是 STW 的时间。
    [Times: user=0.47 sys=0.04, real=4.58 secs]:这一阶段统计的持续时间。
    经过这5个阶段之后,老年代所有存活的对象就都被标记过了,之后可以通过清除算法去清理老年代不再使用的对象。
  • 第 9~10 行、并发清除;
  • 第 11~12 行、重置,重新初始化 CMS 内部数据结构,以备下一轮 GC 使用。

普通串行标记清除算法与并行标记清除算法(CMS)的比较如下图所示:
串行标记清除算法与并行标记清除算法之间的比较
如上图可知,并发标记清除算法与串行标记清除算法之间的区别主要在于,前者将标记过程分成了 3 个部分,其中占用时间最长的Concurrent Mark不需要stw。 
由于整个过程中耗时最长的并发标记和并发清除过程收集器线程都可以与用户线程一起工作,所以从总体上来说,CMS 收集器的内存回收过程是与用户线程一起并发执行的。

优点

并发收集、低停顿,因此 CMS 收集器也被称为并发低停顿收集器(Concurrent Low Pause Collector)。

缺点

  • 对 CPU 资源非常敏感。其实,面向并发设计的程序都对 CPU 资源比较敏感。在并发阶段,它虽然不会导致用户线程停顿,但会因为占用了一部分线程(或者说 CPU 资源)而导致应用程序变慢,总吞吐量会降低。CMS 默认启动的回收线程数是(CPU 数量+3)/4,也就是当 CPU 在 4 个以上时,并发回收时垃圾收集线程不少于 25%的 CPU 资源,并且随着 CPU 数量的增加而下降。但是当 CPU 不足 4 个时(比如 2 个),CMS 对用户程序的影响就可能变得很大,如果本来 CPU 负载就比较大,还要分出一半的运算能力去执行收集器线程,就可能导致用户程序的执行速度忽然降低了 50%,其实也让人无法接受。
  • 标记-清除算法导致的内存碎片
    CMS 是一款基于“标记-清除”算法实现的收集器,这意味着收集结束时会有大量空间碎片产生,可能会提前触发一次 FullGC。空间碎片过多时,将会给大对象分配带来很大麻烦,往往出现老年代空间剩余,但无法找到足够大连续空间来分配当前对象。
    可能会引起Promotion Failed(空间分配担保失败),即进行Minor GC时,发现Survivor Space放不下,对象只能放到老年代,而老年代也放不下。
  • 无法处理浮动垃圾(Floating Garbage),可能出现Concurrent Mode Failure失败而导致另一次 Full GC 的产生。
    由于 CMS 并发清理阶段用户线程还在运行着,伴随程序运行自然就还会有新的垃圾不断产生。这一部分垃圾出现在标记过程之后,CMS 无法在当次收集中处理掉它们,只好留待下一次 GC 时再清理掉。这一部分垃圾就被称为“浮动垃圾”。也是由于在垃圾收集阶段用户线程还需要运行,那也就还需要预留有足够的内存空间给用户线程使用,因此 CMS 收集器不能像其他收集器那样等到老年代几乎完全被填满了再进行收集,需要预留一部分空间提供并发收集时的程序运作使用。

应用场景

  • CMS 以最短回收停顿时间为目标,非常符合那些集中在互联网站或者 B/S 系统的服务端上的 Java 应用,这些应用都非常重视服务的响应速度,不能有明显的暂停时间。
  • 当你的应用程序需要有较短的应用程序暂停,而可以接受垃圾收集器与应用程序共享应用程序时,则可以选择 CMS 垃圾收集器。
  • 典型情况下,有很多长时间保持 live 状态的数据对象(一个较大的老年代)的应用程序,和运行在多处理上的应用程序,更适合使用 CMS 垃圾收集器。例如 Web 服务器。

G1 收集器

G1(Garbage-First)收集器是当今收集器技术发展最前沿的成果之一。它是一款面向服务端应用的垃圾收集器。

使用

G1 可以用于年轻代和老年代,且算法分 3 个步骤,所以配置种类比较多。
只作用于年轻代的配置:

  • -XX:G1NewSizePercent
    年轻代最小值,默认值 5%。
  • -XX:G1MaxNewSizePercent
    年轻代最大值,默认值 60%。

作用于老年代的配置:

  • -XX:InitiatingHeapOccupancyPercent
    当老年代大小占整个堆大小百分比达到该阈值时,会触发一次 Mixed GC
  • -XX:+UseCMSInitiatingOccupancyOnly

其他配置:

  • -XX:MaxGCPauseMillis
    设置 G1 收集过程目标时间,默认值 200ms。
  • -XX:G1ReservePercent
    默认值 10%,预留的空闲空间的百分比
  • -XX:G1HeapRegionSize
    配置 Region 块的大小,范围 1MB 到 32MB,设置后会根据最小堆 Java 堆内存划分出 2048 个 Region 块

实现原理 - 内存结构与GC算法

在 G1 算法中,采用了另外一种完全不同的方式组织堆内存,堆内存被划分为多个大小相等的内存块,称为Region,每个 Region 是逻辑连续的一段内存,结构如下:
G1收集器内存结构
由上图可见:

  • 新生代与老年代并不是连续的,而是一些 Region 的集合;
  • 为了避免全堆扫描,对其他 Region 对象的引用会被记录到一个Remembered Set中,每个 Region 都对应一个 Remembered Set,虚拟机发现程序在对 Reference 类型的数据进行写操作时,会插入一个 Write Barrier 暂时中断写操作,检查 Reference 引用的对象是否位于其他 Region 中,如果是则将其引用信息记录到该 Region 对应的 Remembered Set 中,当进行内存回收时,在 GC 根节点的枚举范围中加入 Remembered Set 即可保证即使不对全堆扫描也不会产生遗漏。
  • 一些Regine被标明了H,代表Humongous,这表示这些Region存储的是巨大对象(Humongous object,H-obj),即大小大于等于Region一半的对象,对这些大对象有一些特殊的规则。

堆内存中一个 Region 的大小可以通过 -XX:G1HeapRegionSize 参数指定,大小区间只能是 1M、2M、4M、8M、16M 和 32M,总之是 2 的幂次方,如果 G1HeapRegionSize 为默认值,则在堆初始化时计算 Region 的实践大小。
G1 可以独立管理整个堆空间,但是能够采用不同方式来处理新创建对象和已经存活了一段时间、经历过多次 GC 的老对象,以获取更好的收集效果。G1 中提供了三种模式垃圾回收模式:Young GCMixed GCFull GC,在不同的条件下被触发。

Young GC

发生在年轻代的 GC 算法,一般对象(除了巨型对象)都是在 Eden Region 中分配内存,当所有 Eden Region 被耗尽无法申请内存时,就会触发一次 Young GC,这种触发机制和之前的 Young GC 差不多,执行完一次 Young GC,活跃对象会被拷贝到 Survivor Region 或者晋升到 Old Region 中,空闲的 Region 会被放入空闲列表中,等待下次被使用。

Mixed GC

当越来越多的对象晋升到老年代 Old Region 时,为了避免堆内存被耗尽,虚拟机会触发一个混合的垃圾收集器,即 Mixed GC,该算法并不是一个 old gc,除了回收整个 Young Region,还会回收一部分的 Old Region,这里需要注意:是一部分老年代,而不是全部老年代,可以选择哪些 Old Region 进行收集,从而可以对垃圾回收的耗时时间进行控制。
G1收集器工作过程
Mixed GC 的执行过程有点类似 CMS,主要分为以下几个步骤:

  • initial mark: 初始标记过程,整个过程需要 STW,但耗时比较短,标记了从 GC Root 可达的对象,它们能被 GC Root 直接关联到;
  • concurrent marking: 并发标记过程,整个过程 gc collector 线程与应用线程可以并行执行,标记出 GC Root 可达对象衍生出去的存活对象,并收集各个 Region 的存活对象信息;
  • remark: 最终标记过程,整个过程需要 STW,GC 线程与用户线程并行执行,耗时较短,标记出那些在并发标记过程中遗漏的、或者由于用户线程继续运行导致的标记变动,变动记录将被记录在 Remembered Set Logs 中,此阶段会把其整合到 Remembered Set 中;
  • clean up: 垃圾清除过程,与用户线程并发执行,时间用户可控,对各个 Region 的回收价值和成本进行排序,根据用户期望的 GC 时间进行回收,如果发现一个 Region 中没有存活对象,则把该 Region 加入到空闲列表中。

Full GC

如果对象内存分配速度过快,Mixed GC 来不及回收,导致老年代被填满,就会触发一次 Full GC,G1 的 Full GC 算法就是单线程执行的 Serial Old GC,使用标记-整理算法,会导致异常长时间的暂停时间,需要进行不断的调优,尽可能的避免 Full GC。

实现原理 - 并行和并发

G1 使用多个 CPU 来缩短 Stop The World 停顿时间,与用户线程并发执行。

实现原理 - 可预测的停顿

G1 建立了可预测的停顿时间模型,能让使用者明确指定在一个长度为 M 毫秒的时间片段内,消耗在垃圾收集上的时间不得超过 N 毫秒。

优点

缺点

应用场景

各垃圾收集器之间的比较

各垃圾收集器之间的关系

  1. CMS 与 Serial Old 是可以相互配合的
  2. G1 既可以用于年轻代又可以用于老年代
收集器 串行、并行or并发 新生代/老年代 算法 目标 适用场景
Serial 串行 新生代 复制算法 响应速度优先 单 CPU 环境下的 Client 模式
Serial Old 串行 老年代 标记-整理 响应速度优先 单 CPU 环境下的 Client 模式、CMS 的后备预案
ParNew 并行 新生代 复制算法 响应速度优先 多 CPU 环境时在 Server 模式下与 CMS 配合
Parallel Scavenge 并行 新生代 复制算法 吞吐量优先 在后台运算而不需要太多交互的任务
Parallel Old 并行 老年代 标记-整理 吞吐量优先 在后台运算而不需要太多交互的任务
CMS 并发 老年代 标记-清除 响应速度优先 集中在互联网站或 B/S 系统服务端上的 Java 应用
G1 并发 both 标记-整理+复制算法 响应速度优先 面向服务端应用,将来替换 CMS

如何排查GC问题

GC问题可能会有很多表象,比如:GC耗时增大、线程Block增多、慢查询增多、CPU负载高等。
为了排查根因,有几种比较有效的判断方法:

  1. 先发生的事件是根因的概率更大,监控各个指标发生异常的时间点,比如如果先观察到CPU负载高,那么整个问题的影响链就有可能是:CPU负载高->慢查询增多->GC耗时增大->线程Block增多->RT上涨。
  2. 结合历史情况,比如之前慢查问题比较多,那么问题影响链就可能是:慢查询增多->GC耗时增大->CPU负载高->线程Block增多->RT上涨。
  3. 实验,比如只触发线程Block就会发生问题,那么问题很有可能就是线程Block引起的。
  4. 反证,比如发现其他节点CPU和慢查都正常,但是还是出现了问题,那么问题很有可能和CPU和慢查无关。

QA

哪些对象的引用会被当作 GC Root 呢

  • 虚拟机栈(栈帧中的本地变量表)中引用的对象
    下面的变量a即为一个GC Root。
    1
    2
    3
    int main() {
    int a = 1;
    }
  • 方法区中类静态属性(类变量)引用的对象
    下面的b即一个GC Root。
    1
    2
    3
    class A {
    int b = 1;
    }
  • 方法区中常量引用的对象
    下面的字符串”123”会被加载到方法区中的字符串常量表,也是一个GC Root。
    1
    2
    3
    class A {
    static final String c = "123";
    }
  • 本地方法栈中 JNI(native 方法)引用的对象
    实现JNI方法时,在方法体内创建的局部变量。

弱引用和软引用有什么区别?

强引用比较简单,虚引用很少见,容易混淆的是弱引用和软引用:

  1. 弱引用
    只要垃圾回收时弱引用对象没有任何其他强引用,则对象会被回收。
  2. 软引用
    在系统将要发生溢出异常之前,将会把这些对象列进回收范围进行第二次回收,如果这次回收没有足够内存,才会抛出内存溢出异常。

    JVM 在分配空间时,若果 Heap 空间不足,就会进行相应的 GC,但是这次 GC 并不会收集软引用关联的对象,但是在 JVM 发现就算进行了一次回收后还是不足(Allocation Failure),JVM 会尝试第二次 GC,回收软引用关联的对象。

为什么新生代采取复制算法而老年代采取标记-整理算法

这个问题等价于为什么在不同的代中使用不同的垃圾收集器。
主要原因来自新生代和老年代的区别,新生代新陈代谢快,采用复制算法,Survivor 区可以相对较小,不会有太大的空间浪费,并且保证了较高的效率;老年代反之。

为什么不用标记清除算法

效率低,标记和清除都需要一次线性扫描,相当于比别的算法慢一倍,而且产生大量内存碎片,内存碎片的问题也出现在 C 语言的 malloc/free 中。

垃圾收集器中的并发和并行分别代表什么?

并行指各垃圾收集器线程可以同时运行,此时用户线程仍然处于等待状态。
并发指用户线程可以和垃圾收集器同时(可能是交替)运行,它们不在同一个CPU上执行。

为什么 CMS 要 3 次标记

  • 第 1 次标记(Initial Mark):标记 GCRoot 可直达的对象,耗时短。
  • 第 2 次标记(Concurrent Mark):从上一部分标记对象出发标记引用链。
    为什么这个阶段可以并发标记?如果新创建了一个 GC Root 引用的对象或者引用链变更了怎么办?实际上这个步骤已经能将绝大多数需要标记的对象标记上了,如果有遗漏都是在下一阶段弥补的。
  • 第 3 次标记(Remark):重新标记阶段将上一阶段执行过程中用户线程新创建的对象和引用链中新引用的对象都标记上,这个过程相对较短,因此 STW 也可以接受。

从 3 次标记过程的特征可以看出,CMS 将耗时长的部分并行化了,从而保证整个 gc 过程的高性能。

参考

  1. Minor GC、Major GC 和 Full GC 之间的区别
  2. JAVA GARBAGE COLLECTION HANDBOOK
    图解GC流程
  3. Java Platform, Standard Edition HotSpot Virtual Machine Garbage Collection Tuning Guide
    oracle官网对1.8垃圾回收改进的描述。
  4. 从实际案例聊聊Java应用的GC优化
  5. Java Hotspot G1 GC的一些关键技术
  6. Java中9种常见的CMS GC问题分析与解决

Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的开发便利性简化了分布式系统的开发,比如服务发现、服务网关、服务路由、链路追踪等。Spring Cloud 并不重复造轮子,而是将市面上开发得比较好的模块集成进去,进行封装,从而减少了各模块的开发成本。换句话说:Spring Cloud 提供了构建分布式系统所需的“全家桶”。
Spring Cloud 常常被拿来和 Dubbo 比较,实际上 Dubbo 只实现了服务治理,接入 Dubbo 的服务能够实现自动上下线、能通过 Dubbo 协议(其实 Dubbo 还支持其他很多协议)互联,但是 Dubbo 并不提供网关、配置中心、链路追踪等一系列微服务架构常用的技术,需要单独引入。

阅读全文 »
0%