Tallate

该吃吃该喝喝 啥事别往心里搁

通过操作 Thread、Object 的 API 和内置锁我们可以解决大部分的线程协调问题,但这绝不是最优雅的方式,接下来我们就来见识一下 JUC 中的各种高效线程同步工具。

阅读全文 »

RPC

客户端 OR 服务端 异步

客户端异步比较常见,因为任何网络调用基本都可以方便地包装成异步调用,朴实无华且枯燥,下载这段代码是一个简单的 demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ClientAsync {

private static class ClientAsyncExecutor {

private ExecutorService threadPool;
private Supplier<String> dataSupplier;
private Consumer<String> callback;

public ClientAsyncExecutor(ExecutorService threadPool,
Supplier<String> dataSupplier,
Consumer<String> callback) {
this.threadPool = threadPool;
this.dataSupplier = dataSupplier;
this.callback = callback;
}

public void call() {
threadPool.submit(() -> {
String data = dataSupplier.get();
callback.accept(data);
});
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newCachedThreadPool();
ClientAsyncExecutor executor = new ClientAsyncExecutor(
threadPool,
() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "done";
},
res -> System.out.println("结果:" + res));
executor.call();
System.out.println("call returned");
threadPool.shutdown();
threadPool.awaitTermination(10000, TimeUnit.SECONDS);
}
}

客户端异步是一种伪异步,本质上仍是同步调用,只不过等待是放到另外一个线程中去做的,如果这样等待的线程比较多,对客户端的线程池容易造成压力,。
服务端的异步实现起来就比较费劲了,因为客户端需要额外提供一个入口来接收服务端执行完毕的结果:
服务端异步

良好习惯

不论是桌面应用还是 Web 应用,多线程代码都是比较难玩得转的,玩不明白的结果就是一大堆令人毛骨悚然且难以捉摸、难以调试的问题——实际上,一旦你意识到正在处理一个并发问题,你可能就不得不完全放弃调试了,并转而手动检查代码。
鉴于此,我们当然是希望尽量避免并发问题的,理想情况下希望完全避免多线程错误,同样,不存在那种一刀切的方法,但这有一些调试和防止多线程错误的实际考虑因素:

  1. 避免全局状态
    首先,牢记 “全局状态” 问题。如果你正创建一个多线程应用,那么应该密切关注任何可能全局修改的内容,如果可能的话,将他们全部删掉,如果部分全局变量确实有理由保留,那么应该仔细保证其并发安全,并对程序性能进行跟踪,以确定不会因为引入新的等待时间而导致系统性能降低(并发修改时需要同步多个线程)。
  2. 避免可变性
    这点直接来自于 函数式编程,并且适用于 OOP,声明应该避免类和对象状态的改变。简而言之,这意味着放弃 setter 方法,并在需要避免可变性的类或字段上拥有私有的 final 字段,它们的值唯一发生变化的时间是在构造期间。这样,你可以确定不会出现争用问题,且访问对象属性将始终提供正确的值。
  3. 日志及报警
    评估你的程序可能会在何处发生异常,并预先记录所有关键数据。如果发生错误,你将很高兴可以得到信息说明收到了哪些请求,并可更好地了解你的应用程序为什么会出现错误。需要再次注意的是,日志记录引入了额外的文件 I/O,可能会严重影响应用的性能,因此请不要滥用日志。
    在记日志的基础上,有必要根据 SLA 记录一些指标的报警阈值,比如订单中心下单失败,考虑可能是网络出现抖动引起超时(如果用到三方服务这个问题会更明显),因此报警阈值可以稍微调高一些,比如 1 分钟 3 次失败就打电话报警。
  4. 复用现存实现
    每当你需要创建自己的线程时(例如:向不同的服务发出异步请求),复用现有的安全实现来代替创建自己的解决方案。这在很大程度上意味着要使用 ExecutorService 和 Java 8 简洁的函数式 CompletableFuture 来创建线程。Spring 还允许通过 DeferredResult 类来进行异步请求处理。

反模式-异步狂热

上边我们已经讨论过异步存在的优势和劣势,实际上在我读过的项目代码中,确实存在不少那种不异步不开心的“炫技代码”,给维护带来很大困难。
下面是一个非常直观的方法:

1
2
3
public String punch(People t) {
return "Oh, No";
}

经过一个莫名其妙的异步包装,原来的可扩展性、性能均没有提升,甚至性能成功降低了:

1
2
3
4
5
6
7
8
9
10
public Function doPunch(People t) {
return t -> {
return "Oh, No";
}
}

public String punch(People t) {
Function f = doPunch(t);
return f.apply(t);
}

实际情况可能会复杂得多,这样的代码不够简单直观、容易暗藏 Bug、不符合 KISS 原则,但是即便如此,还是有很多人会觉得特别绕的代码能体现一个人的水平、对代码的驾驭能力、能灵活运用设计模式的能力,我觉得大部分情况下事实并非如此。

Spring - DeferredResult

TODO

Hystrix - Command

Hystrix 的 Command 框架在 CompletableFuture 的基础上提供了合并请求的特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class BatchGetDataCommand extends HystrixCommand<List<Double>> {

private Collection<CollapsedRequest<Double, Long>> requests;

public BatchGetDataCommand(Collection<CollapsedRequest<Double, Long>> requests) {
super(Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("batchGetData")));
this.requests = requests;
}

@Override
protected List<Double> run() throws Exception {
// TODO: 做些批量查询操作,这里作为示范直接返回
return requests.stream()
.map(CollapsedRequest::getArgument)
.map(arg -> (double) arg)
.collect(Collectors.toList());
}
}

public class SimpleGetDataCommand extends HystrixCollapser<List<Double>, Double, Long> {

private Long id;

public SimpleGetDataCommand(Long id) {
super(HystrixCollapser.Setter
.withCollapserKey(HystrixCollapserKey.Factory.asKey("getData"))
.andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
.withMaxRequestsInBatch(2)
.withTimerDelayInMilliseconds(5)
// 允许缓存request的结果
.withRequestCacheEnabled(true))
.andScope(Scope.REQUEST));
this.id = id;
}

@Override
public Long getRequestArgument() {
return id;
}

@Override
protected HystrixCommand<List<Double>> createCommand(Collection<CollapsedRequest<Double, Long>> collapsedRequests) {
return new BatchGetDataCommand(collapsedRequests);
}

@Override
protected void mapResponseToRequests(final List<Double> batchResponse, Collection<CollapsedRequest<Double, Long>> collapsedRequests) {
final AtomicInteger count = new AtomicInteger();
collapsedRequests.forEach(request -> {
request.setResponse(
batchResponse.get(count.getAndIncrement()));
});
}

public static void main(String[] args) {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
// Hystrix内部将多个查询合并成一个
SimpleGetDataCommand command1 = new SimpleGetDataCommand(1L);
SimpleGetDataCommand command2 = new SimpleGetDataCommand(2L);
// 这里需要先使用queue而不是execute
Future<Double> f1 = command1.queue();
Future<Double> f2 = command2.queue();
System.out.println(f1.get());
System.out.println(f2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
context.shutdown();
}
}
}

任务调度

补偿执行

异常订单表,定时任务

定时任务的实现

无分布式的,HashTimeWheelTimer
分布式情况下,可以先用 Redis,当复杂时再用 RabbitMQ 等消息队列中间件。

消息 OR 定时任务

很多异步实现的功能既可以通过消息实现又可以通过定时任务来实现,我经历过很多需要抉择的情况,甚至也碰到过对此都不大清楚的架构师,我认为对此没有唯一的答案,只能根据具体业务场景来分析,一些要点可供参考。

  • 分布式

  • 并发安全

  • 运维便利性
    消息本质上是把任务暂存在队列服务里,统计某段时间内发生了什么、会发生什么就比较困难了,因为往往消息队列都会有自定义的数据格式,判断一条消息是否被消费往往得通过日志来判断。
    定时任务一般都会有执行记录,什么时间会执行任务也可以直接用 cron 表达式计算出来,缺点是使用定时任务意味着需要自己维护很多东西,比如在数据库里维护一个队列保存消息,保存有创建时间、重试次数、重试时间等,失败 N 次后还需要存一份失败记录。

以一个“通知拉取”的场景举例,A 系统需要从 B 系统拉取数据,但并不是定时地直接调 B 的接口,而是先由 B 通知 A 哪些数据发生了变更,然后由 A 去拉取这些数据的变更部分:

  • 如果使用消息队列来实现:
  • 如果使用定时任务实现:

使用 Thread 实现任务调度

实现任务调度最简单的方式可以直接利用 Thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
Thread thread = new Thread(() -> {
while (true) {
String task = "";
try {
task = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(task);
}
});
thread.start();
queue.put("hello");

Timer

Timer 内部通过一个 TimerThread 来循环执行提交给 Timer 的任务,因此任务是串行的,前一个任务的延迟会影响后续的所有任务。
Timer 可以看做一种简化版的 ScheduledExecutor,下面是一种 Demo 实现:

1
2
3
4
5
6
7
8
9
10
11
12
public class DemoTimer {
private static ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);

public static void setTimeout(final Runnable function, long time,
TimeUnit timeUnit, final Executor executor) {
Preconditions.checkArgument(time >= 0);
Preconditions.checkNotNull(function);
Preconditions.checkNotNull(timeUnit);
Preconditions.checkNotNull(executor);
timer.schedule(() -> executor.execute(function), time, timeUnit);
}
}

ScheduledExecutor

ScheduledExecutor 能提供一种更灵活的周期性任务处理功能,

Quartz

Spring Task

Quartz 集群版

tbschedule

xxl-job

Elastic-Job

Elastic-Job架构
Elastic-Job 分为 Elastic-Job-Lite 和 Elastic-Job-Console 两个模块。Elastic-Job-Lite 实现了分布式任务调度、动态扩容缩容、任务分片、失效转移等功能。
如上图所示,Elastic-Job-Lite 采用去中心化的调度方式,由 Elastic-Job-Lite 的客户端定时自动触发任务调度,通过任务分片的概念实现服务器负载的动态扩容和缩容,并且使用 ZooKeeper 作为分布式任务调度的注册中心;当某任务实例崩溃后,自动失效转移,实现高可用。

消息队列(MQ)原理

MQ 有很多优势,当我们选择 MQ 时,主要是为了:

  • 解耦……
    比如 A 系统要将用户提交的数据推送到 B、C 两个系统的时候,最初的想法很有可能是直接用 http 或 rpc 调用实现。像这样的下游系统后来又会多出 D、E、F…,对 A 系统的压力就会越来越大,更复杂的场景中,数据通过接口传给其他系统有时候还要考虑重试、超时等一些异常情况。
    这时,对 A 来说更好的方案是将消息发送给 mq,不管有哪个下游系统需要这个数据都可以直接订阅这个 subject。
  • 异步
    当请求比较复杂,而其中有部分数据没必要实时更新时,可以用 mq 实现异步化。比如取消订单后需要做订单状态的更新、对账、退款等操作,而其中只有状态的变更是有必要实时反馈给用户的,那么后续的所有操作就完全可以做成异步的。
  • 削峰填谷
    消息队列作为缓冲队列应对突发流量时,并不能使处理速度变快,而是使处理速度变平滑,从而不会因瞬时压力过大而压垮应用。
    举个例子,比如我们的订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒 1000 左右的并发写入,并发量再高就容易宕机。
    低峰期的时候并发也就 100 多个,但是在高峰期时候,并发量会突然激增到 5000 以上,这个时候数据库肯定死了。
    但是使用了 MQ 之后,情况就变了,消息被 MQ 保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒 1000 个数据,这样慢慢写入数据库,这样就不会打死数据库了。
    如果没有用 MQ 的情况下,并发量高峰期的时候是有一个“顶峰”的,然后高峰期过后又是一个低并发的“谷”。
    但是使用了 MQ 之后,限制消费消息的速度为 1000,但是这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了。
    但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在 1000QPS,直到消费完积压的消息,这就叫做“填谷”。

高可用

使用了 MQ 之后,我们肯定是希望 MQ 有高可用特性,因为不可能接受机器宕机了,就无法收发消息的情况。
这一块我们也是基于 RabbitMQ 这种经典的 MQ 来说明一下:
RabbitMQ 是比较有代表性的,因为是基于主从做高可用性的,我们就以他为例子讲解第一种 MQ 的高可用性怎么实现。
rabbitmq 有三种模式:单机模式,普通集群模式,镜像集群模式
单机模式
单机模式就是 demo 级别的,就是说只有一台机器部署了一个 RabbitMQ 程序。
这个会存在单点问题,宕机就玩完了,没什么高可用性可言。一般就是你本地启动了玩玩儿的,没人生产用单机模式。
普通集群模式
这个模式的意思就是在多台机器上启动多个 rabbitmq 实例。类似的 master-slave 模式一样。
但是创建的 queue,只会放在一个 master rabbtimq 实例上,其他实例都同步那个接收消息的 RabbitMQ 元数据。
在消费消息的时候,如果你连接到的 RabbitMQ 实例不是存放 Queue 数据的实例,这个时候 RabbitMQ 就会从存放 Queue 数据的实例上拉去数据,然后返回给客户端。
总的来说,这种方式有点麻烦,没有做到真正的分布式,每次消费者连接一个实例后拉取数据,如果连接到不是存放 queue 数据的实例,这个时候会造成额外的性能开销。如果从放 Queue 的实例拉取,会导致单实例性能瓶颈。
如果放 queue 的实例宕机了,会导致其他实例无法拉取数据,这个集群都无法消费消息了,没有做到真正的高可用。
所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性可言了,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式
镜像集群模式才是真正的 rabbitmq 的高可用模式,跟普通集群模式不一样的是:创建的 queue 无论元数据还是 queue 里的消息都会存在于多个实例上,
每次写消息到 queue 的时候,都会自动把消息到多个实例的 queue 里进行消息同步。
这样的话任何一个机器宕机了别的实例都可以用提供服务,这样就做到了真正的高可用了。
但是也存在着不好之处:

  • 性能开销过高,消息需要同步所有机器,会导致网络带宽压力和消耗很重
  • 扩展性低:无法解决某个 queue 数据量特别大的情况,导致 queue 无法线性拓展。
就算加了机器,那个机器也会包含 queue 的所有数据,queue 的数据没有做到分布式存储。
    对于 RabbitMQ 的高可用一般的做法都是开启镜像集群模式,这样起码来说做到了高可用,一个节点宕机了,其他节点可以继续提供服务。

高性能

  • 对于内存操作的线程分离,大部分中间件做法是将数据文件缓存与内存中,通过异步线程 flush 至硬盘
  • 合理的存储引擎对应不同的服务场景 B+树,hash,LSM
  • 对于消息队列,选取顺序读写磁盘的方式,可以高效的提升磁盘 IO 速度
  • 顺序写磁盘可以带来足够的写入速度,其读取方式为二分查找
  • 对于 LSM 存储引擎,同样采用顺序写磁盘方式,牺牲一部分读性能从而获得更优越的写性能

消息队列如何选型

异步处理

延时消费

应用隔离(系统解耦)

比如有两个主题的消息,其中 A 主题的消息特别多,别的消息就会来不及处理。
这种情况有点类似于服务治理中的隔离策略:一个服务出错不能影响别的服务不可用。一般会采用线程池、信号量来实现。
MQ 消息中的主题隔离

数据同步

Canel 订阅数据库 binlog 可以实现数据库数据变更捕获,然后业务端订阅 Canel 进行业务处理,这种方式可以保证一致性,且不会有乱序问题。

数据异构

反模式-为了撇清关系所以使用 MQ 消息

反模式-为了解耦过度使用 MQ 消息

反模式-利用数据差异化来触发事件

公司里有几位老员工基于 MQ 监听器组件开发了一套 MQ 客户端,这套 MQ 客户端的核心就是能通过修改数据来触发监听对应字段修改事件的监听器,这样可以避免定义一大堆主题,看起来似乎变简单了对吗?但是经过一段时间的维护发现情况并非如此,大量监听器不再根据主题来相互关联,而是数据中的一大堆字段,最开始的一批开发爽了,因为需要定义的主题少了,少了很多手动发消息的代码,但是后续维护的人就糟了,试想,每次希望修改某个字段的时候都需要把监听该字段修改事件的监听器都找一遍。
两条业务同时修改

MQ 存在的缺陷

上边已经说过了优点,那么 mq 又有哪些缺点呢?

  • 系统可用性降低
    上面的说解耦的场景,本来 A 系统的哥们要把系统关键数据发送给 B、C 系统的,现在突然加入一个 MQ,现在 BC 系统接收数据要通过 MQ 来接收。
    万一 MQ 挂了怎么办?这就引出一个问题,加入了 MQ 之后,系统的可用性是不是就降低了?
    因为多了一个风险因素:MQ 可能会挂掉。只要 MQ 挂了,数据没了,系统运行就不对了。
  • 系统复杂度提高
    本来我的系统通过接口调用一下就能完事的,但是加入一个 MQ 之后,需要考虑消息重复消费、消息丢失、甚至消息顺序性的问题
    为了解决这些问题,又需要引入很多复杂的机制,这样一来是不是系统的复杂度提高了。
  • 数据一致性问题
    本来好好的,A 系统调用 BC 系统接口,如果 BC 系统出错了,会抛出异常,返回给 A 系统让 A 系统知道,这样的话就可以做回滚操作了
    但是使用了 MQ 之后,A 系统发送完消息就完事了,认为成功了。而刚好 C 系统写数据库的时候失败了,但是 A 认为 C 已经成功了?这样一来数据就不一致了。

并发修改

消息从发出到被消费会有一小段时间,这一段时间内数据可能会经过其他线程的多次修改,所以在消息消费方的编程中尤其需要注意并发修改的问题。
如果是同步操作——比如用户购买商品扣款的场景——需要在比较高并发的情况下才会出现并发问题,但是如果功能是基于消息实现的,由于消息消费具有不确定性,这种风险会被放大。
并发扣款

  • 这些并发查询是在不同的站点实例 / 服务实例上完成的,进程内互斥锁无法解决问题。
  • 不确定性来自于很多方面,比如同一主题的消息可能被多个业务线的触发、被重试、被手动重发。

解决这种不一致问题的解决办法一般是加锁,因为异步处理并没有直接被用户感知,因此对效率并没有特别高的要求,悲观锁或乐观锁都是可行的。

悲观锁会牺牲一定的吞吐量,乐观锁实现起来比较有技巧性、且可能会和业务数据耦合。

以乐观锁为例:

1
2
3
4
5
6
7
8
9
// 查询订单,Order中包含了版本信息
Order order = queryOrder();
// 这里使用状态机校验订单状态
if (isStatusInvalid(order)) {
记一下日志
return ;
}
// 扭转状态的同时也做了版本的校验,相当于一个原子操作,如果校验失败则抛出异常、交给外层MQ组件重试
changeOrderStatus(order, targetStatus);

At-Least-Once(消息丢失)

有很多情况可能发生 MQ 消息的丢失:

  • 生产者向 MQ 发送消息时,网络传输出现问题;
  • 消息在 MQ 中存储时,发生磁盘故障等不可控问题;
  • 消费者从 MQ 接收消息时,网络传输出现问题;

一般 MQ 中间件都会保证At-Least-Once的消费,就需要避免消息丢失的情况,有两种方式可以解决这种情况:

事务方式:
在生产者发送消息之前,通过channel.txSelect开启一个事务,接着发送消息
如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚channel.txRollback然后重新发送。假如 RabbitMQ 收到了这个消息,就可以提交事务channel.txCommit
但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。

另外一种方式就是通过 confirm 机制:
这个 confirm 模式是在生产者哪里设置的,就是每次写消息的时候会分配一个唯一的 id,然后 RabbitMQ 收到之后会回传一个 ack,告诉生产者这个消息 ok 了。
如果 rabbitmq 没有处理到这个消息,那么就回调一个 nack 的接口,这个时候生产者就可以重发。
事务机制和 cnofirm 机制最大的不同在于事务机制是同步的,提交一个事务之后会阻塞在那儿
但是 confirm 机制是异步的,发送一个消息之后就可以发送下一个消息,然后那个消息 rabbitmq 接收了之后会异步回调你一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

Rabbitmq 弄丢了数据
RabbitMQ 集群也会弄丢消息,这个问题在官方文档的教程中也提到过,就是说在消息发送到 RabbitMQ 之后,默认是没有落地磁盘的,万一 RabbitMQ 宕机了,这个时候消息就丢失了。
所以为了解决这个问题,RabbitMQ 提供了一个持久化的机制,消息写入之后会持久化到磁盘
这样哪怕是宕机了,恢复之后也会自动恢复之前存储的数据,这样的机制可以确保消息不会丢失。
设置持久化有两个步骤:

  • 第一个是创建 queue 的时候将其设置为持久化的,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化 queue 里的数据
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 rabbitmq 就会将消息持久化到磁盘上去。
    但是这样一来可能会有人说:万一消息发送到 RabbitMQ 之后,还没来得及持久化到磁盘就挂掉了,数据也丢失了,怎么办?
    对于这个问题,其实是配合上面的 confirm 机制一起来保证的,就是在消息持久化到磁盘之后才会给生产者发送 ack 消息。
    万一真的遇到了那种极端的情况,生产者是可以感知到的,此时生产者可以通过重试发送消息给别的 RabbitMQ 节点
    消费端弄丢了数据
    RabbitMQ 消费端弄丢了数据的情况是这样的:在消费消息的时候,刚拿到消息,结果进程挂了,这个时候 RabbitMQ 就会认为你已经消费成功了,这条数据就丢了。
    对于这个问题,要先说明一下 RabbitMQ 消费消息的机制:在消费者收到消息的时候,会发送一个 ack 给 RabbitMQ,告诉 RabbitMQ 这条消息被消费到了,这样 RabbitMQ 就会把消息删除。
    但是默认情况下这个发送 ack 的操作是自动提交的,也就是说消费者一收到这个消息就会自动返回 ack 给 RabbitMQ,所以会出现丢消息的问题。
    所以针对这个问题的解决方案就是:关闭 RabbitMQ 消费者的自动提交 ack,在消费者处理完这条消息之后再手动提交 ack。
    这样即使遇到了上面的情况,RabbitMQ 也不会把这条消息删除,会在你程序重启之后,重新下发这条消息过来。

消息重复

一般情况下 MQ 除了不保证消息的有序性外、还不保证消息不重复。
因为在「网络不可达」的情况下,MQ 不能确认消息接收方收到了消息必然会重试。重试除了本文讲的幂等处理外,还可以采用每个消息有唯一的 ID+去重表实现。

消息的有序性

因为 MQ 消息在服务器上是分区存储的,每个分区自己是有序的。分区被接收端消费的时候。一般也是多个接收端一起消费。中间的每个环节都是只能保证局部有序。如果想全局有序。就需要分区只有一个,并且接收端服务器是单点,而且一次只处理一个请求。
TODO: TCP 是怎么做的。

消息积压

参考

任务调度

  1. cron
    crontab 定时任务
  2. 中心化-去中心化调度设计
    xxl-job 和 elastic-job 在设计上的本质区别是中心化还是去中心化。

Disruptor 相对于传统方式(普通)的优点

  1. 无锁
    相对 Lock 来说效率更高(线程不需要挂起,只涉及到一次内存交换速度快),但是同时会带来 ABA 问题,且多线程下竞争容易产生空转。
  2. 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构。
  3. 在每个对象中都能跟踪序列号(ring buffer,claim Strategy,生产者和消费者),加上神奇的 cache line padding,就意味着没有为伪共享和非预期的竞争。

如何使用 Disruptor

1
2
3
4
5
6
7
8
9
10
11
12
public class LongEvent {

private long value;

public void set(long value) {
this.value = value;
}

public long getValue() {
return this.value;
}
}
1
2
3
4
5
6
7
public class LongEventFactory implements EventFactory<LongEvent> {

@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
1
2
3
4
5
6
7
public class LongEventHandler implements EventHandler<LongEvent> {

@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Event: " + event.getValue() + " sequence:+" + sequence + " endOfBatch:" + endOfBatch);
}
}
1
2
3
4
5
6
7
public class LongEventHandler implements EventHandler<LongEvent> {

@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Event: " + event.getValue() + " sequence:+" + sequence + " endOfBatch:" + endOfBatch);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class LongEventProducer {

private final RingBuffer<LongEvent> ringBuffer;

public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

public void onData(ByteBuffer bb) {
// Grab the next sequence
long sequence = ringBuffer.next();
try {
// Get the entry in the Disruptor
LongEvent event = ringBuffer.get(sequence);
// for the sequence Fill with data
event.set(bb.getLong(0));
} finally {
ringBuffer.publish(sequence);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class LongEventMain {

public static void main(String[] args) {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();

// The factory for the event
LongEventFactory factory = new LongEventFactory();

// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;

// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());

// Start the Disruptor, starts all threads running
disruptor.start();

// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

LongEventProducer producer = new LongEventProducer(ringBuffer);

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
producer.onData(bb);
}
}
}

QA

  1. 并发框架 Disruptor 译文

使用主从复制

  1. 运行 Master
    调整 Master 内存中保存的缓冲积压部分(replication backlog),以便执行部分重同步。
    1
    2
    3
    # 缓冲区越大,可断开连接再重连执行部分重同步的时间越长,缓冲区会在每次连接时分配。
    repl-backlog-size 1mb
    repl-backlog-ttl 3600
  2. 运行 Slave
    先在配置文件中设置 Master 和 logfile 路径再运行
    1
    2
    slaveof 172.16.205.141 6379
    logfile "/usr/redis/log/slave.log"
  3. 级联复制(从从复制)
    之前是所有 Slave 连到一个 Master 上,这是一种中心化的办法,对 Master 的负担较大,事实上我们完全可以不全部连到 Master 上,而是 Master->Slave1->Slave2 这样传递。
    实现级联复制也较简单,只用修改 Slave2 配置文件的slaveof属性即可。
  4. Master write,Slave read
    通过程序(客户端)实现数据的读写分离,即在程序中判断请求是读是写,让 Master 负责处理写请求,Slave 负责处理读请求;通过扩展 Slave 处理更多的并发请求,减轻 Master 端的负载。

只读 Slave

Redis2.6 之后,Redis 支持只读模式,可以使用slave-read-only配置来控制这个行为。
只读模式下的 slave 将会拒绝所有写入命令,因此实践中不可能由于某种出错而将数据写入 slave 。但这并不意味着该特性旨在将一个 slave 实例暴露到 Internet ,或者更广泛地说,将之暴露在存在不可信客户端的网络,因为像 DEBUG 或者 CONFIG 这样的管理员命令仍在启用。但是,在 redis.conf 文件中使用 rename-command 指令可以禁用上述管理员命令以提高只读实例的安全性。

同步复制和异步复制

Redis 使用默认的异步复制,其特点是低延迟和高性能,不会影响 Redis 主线程的响应效率。

  • Redis 复制在 master 侧是非阻塞的。这意味着 master 在一个或多个 slave 进行初次同步或者是部分重同步时,可以继续处理查询请求。
  • 复制在 slave 侧大部分也是非阻塞的。当 slave 进行初次同步时,它可以使用旧数据集处理查询请求,假设你在 redis.conf 中配置了让 Redis 这样做的话。否则,你可以配置如果复制流断开, Redis slave 会返回一个 error 给客户端。但是,在初次同步之后,旧数据集必须被删除,同时加载新的数据集。 slave 在这个短暂的时间窗口内(如果数据集很大,会持续较长时间),会阻塞到来的连接请求。自 Redis 4.0 开始,可以配置 Redis 使删除旧数据集的操作在另一个不同的线程中进行,但是,加载新数据集的操作依然需要在主线程中进行并且会阻塞 slave 。

Redis 虽然声称是单线程模型,但是很多功能仍然是采用多线程实现的。

什么时候触发复制

  • 当一个 Master 和一个 Slave 实例连接正常时,Master 通过向 Slave 发送命令流来增量同步自身数据集的改变情况,包括客户端的写入、key 的过期等;
  • Master 与 Slave 之间因为网络问题或宕机,之后 Slave 重新连上 Master 时会尝试进行部分重同步,即只获取在断开连接期间内丢失的命令流;
    为此,slave 会记住旧 master 的旧 replication ID复制偏移量,因此即使询问旧的 replication ID,其也可以将部分复制缓冲提供给连接的 slave 。
  • 当无法进行部分重同步时,Slave 会请求进行全量重同步。Master 需要创建所有数据的快照,将之发送给 Slave,之后在数据集发生更改时持续发送命令流到 Slave。

主从复制原理

当用户往 Master 端写入数据时,通过Redis Sync机制将数据文件发送至 Slave,Slave 也会执行相同的操作确保数据一致。

  1. 同一个 Master 可以拥有多个 Slaves。Master 下的 Slave 还可以接受同一架构中其它 Slave 的链接与同步请求,实现数据的级联复制,即 Master->Slave->Slave 模式;
    repl-diskless-sync-delay参数可以延迟启动数据传输,目的可以在第一个 slave 就绪后,等待更多的 slave 就绪。
    主从复制最好配置成级联复制,因为这样更容易解决单点问题,避免Master承受过大的复制压力
  2. Master 以非阻塞的方式同步数据至 slave,这将意味着 Master 会继续处理一个或多个 slave 的读写请求;
  3. Slave 端同步数据也可以修改为非阻塞的方式,当 slave 在执行新的同步时,它仍可以用旧的数据信息来提供查询;否则,当 slave 与 master 失去联系时,slave 会返回一个错误给客户端;
  4. 主从复制可以做到读写分离,保证了可扩展性,即多个 slave 专门提供只读查询与数据的冗余,Master 端专门提供写操作;
  5. 通过配置禁用 Master 数据持久化机制,将其数据持久化操作交给 Slaves 完成,避免在 Master 中要有独立的进程来完成此操作。
  6. Redis 主从复制的性能问题,为了主从复制的速度和连接的稳定性,Slave 和 Master 最好在同一个局域网内。

标识同步进程:

  1. 每个 Master 都有一个Replication ID:这是一个较大的伪随机字符串,标记了一个给定的数据集。
  2. 每个 Master 持有一个偏移量offset,Master 将自己产生的复制流发送给 slave 时,发送多少个字节的数据,自身的偏移量就会增加多少,目的是当有新的操作修改自己的数据集时,它可以以此更新 Slave 的状态。即使没有 Slave 连接到 Master,offset 也会自增,所以基本上每一对 <Replication ID, offset> 都会标识一个 Master 数据集的确切版本。
  3. Slave 也维护了一个复制偏移量offset,代表从库同步的字节数,从库每收到主节点传来的 N 个字节数据时,从库的 offset 增加 N。
    Master 和 Slave 的offset总是不断增大,这也是判断主从数据是否同步的标志,若主从的 offset 相同则表示数据同步量,不通则表示数据不同步。

复制积压缓冲区
主节点(master)响应写命令时,不但会把命名发送给从节点,还会写入复制积压缓冲区,用于复制命令丢失的数据补救。
Slave 连接中断时主节点仍然可以响应命令,但因复制连接中断命令无法发送给 Slave。之后,当 Slave 重启并触发部分复制时,Master 可以将复制积压缓冲区的内容同步给 Slave,从而提高复制效率;

部分重同步过程:

  1. 当 Slave 连接到 Master,发送一个PSYNC命令表明自己记录的旧的 Master Replication ID和它们至今为止处理的偏移量offset
  2. Master 仅发送 Slave 所需的增量部分的命令流,即上次同步偏移量offset之后执行的写命令;
  3. 但是如果 master 的缓冲区中没有足够的命令积压缓冲记录,或者如果 slave 引用了不再知道的历史记录(replication ID),则会转而进行一个全量重同步:在这种情况下, slave 会得到一个完整的数据集副本,从头开始。

全量同步(完整重同步):

  1. Slave 向 Master 发送PSYNC命令;
  2. Master 执行BGSAVE命令,开启一个后台进程用于生成一个 RDB 文件;
  3. 同时它开始缓冲所有从客户端接收到的新的写入命令;
  4. 当后台保存完成时, master 将数据集文件传输给 slave, slave 将之保存在磁盘上,然后加载文件到内存;
  5. 再然后 master 会将所有缓冲的写命令发给 slave,这个过程以指令流的形式完成并且和 Redis 协议本身的格式相同。

    可以通过telnet连接到 Redis 服务器上然后发送SYNC命令来模拟这个过程,但是因为SYNC功能有限(比如不支持部分重同步),现在的版本用PSYNC作为代替。
    正常情况下,全量同步会先在磁盘上创建一个 RDB 文件,传输时将其加载进内存,然后 Slave 对此进行数据的同步,如果磁盘性能很低,这个过程压力会比较大,Redis 2.8.18之后支持直接传输 RDB 文件,可以使用repl-diskless-sync配置参数配置。

全量同步完成以后,在此后的时间里主从维护着心跳检查来确认对方是否在线,每隔一段时间(默认 10 秒,通过repl-ping-slave-period参数指定)主节点向从节点发送 PING 命令判断从节点是否在线,而从节点每秒 1 次向主节点发送 REPLCONF ACK 命令,命令格式为:REPLCONF ACK {offset},其中 offset 指的是从节点保存的复制偏移量,作用是:

  1. 向主节点报告自己复制进度,主节点会对比复制偏移量向从节点发送未同步的命令;
  2. 判断主节点是否在线。

主从复制执行过程 - Slave怎么与Master建立连接

主从复制
1、Slave Redis实例上配置slaveof xxx,表示将成为另一台Redis实例的从服务器,启动 Slave时,需要设置当前节点的Master信息,并开始主从同步过程;
代码位置:replication.c/slaveofCommand()

1
2
3
4
// 进入连接状态(重点)
server.repl_state = REDIS_REPL_CONNECT;
server.master_repl_offset = 0;
server.repl_down_since = 0;

2、上边设置复制信息成功后,Redis服务器会有一个cron任务(serverCron)定时判断需要进行同步操作,向Master建立连接,也就是一个握手的过程;
代码位置:replication.c/replicationCron()

1
2
3
4
5
if (server.repl_state == REPL_STATE_CONNECT) {
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
}
}

serverCron是Redis的主事件循环,负责超多的任务,包括过期key处理、rehash、备份RDB文件、AOF重写等等。

3、确定连接后,接下来,cron任务里还有比较关键的一项是确定复制方案,
会先向 Master 发送一个 PSYNC Command,Master会返回复制方案,也就是下面的全量、增量及不支持这3种情况:
代码位置:replication.c/syncWithMaster()
replication.c/slaveTryPartialResynchronization()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 向主服务器发送 PSYNC 命令
reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

// 全量复制
if (!strncmp(reply,"+FULLRESYNC",11)) {
...
}

// 增量复制
if (!strncmp(reply,"+CONTINUE",9)) {
...
}

// 错误,目前master不支持PSYNC
if (strncmp(reply,"-ERR",4)) {
...
}

注意PSYNC命令的两个参数:

  • 主库的runID:每个Redis实例启动时都会自动生成的一个随机ID,用来唯一标识这个实例。
    当从库和主库第一次复制时,因为不知道主库的runID,因此会将runID设为”?”。
  • 复制进度offset:设为-1表示第一次复制。

4、Master接收到命令后需要判断需要全量同步还是部分同步
这部分代码在replication.c/syncCommand()中,接下来我们再讨论主节点如何判断同步方式及同步的流程。

主从复制执行过程 - Master如何处理PSYNC命令

1、无论是第一次连接还是重新连接,Master 都会启动一个后台进程(fork),将数据快照保存到数据文件中,同时 Master 会记录所有修改数据的命令并缓存在数据文件中(持久化),Master会将文件内容加载到内存中,等之后回传给Slave(复制);
2、Master端与Slave端完成握手后,需要判断是需要进行全量还是增量复制(也就是上面的返回+FULLRESYNC还是+CONTINUE
处理Slave的PSYNC命令的代码位置:replication.c/syncCommand()
判断是否需要执行全量复制的代码位置:replication.c/masterTryPartialResynchronization()
判断执行全量复制的条件如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 检查 master id 是否和 runid 一致,只有一致的情况下才考虑执行psync
if (strcasecmp(master_runid, server.runid)) {
/* Run id "?" is used by slaves that want to force a full resync. */
// 从服务器提供的 run id 和服务器的 run id 不一致
if (master_runid[0] != '?') {
redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
// 从服务器提供的 run id 为 '?' ,表示强制 FULL RESYNC
} else {
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
}
// 需要 full resync
goto need_full_resync;
}

// 判断当前Slave带来的offset在Master的backlog中是否还能找到,找不到则执行全量复制
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
REDIS_OK) goto need_full_resync;

// 如果没有backlog
if (!server.repl_backlog ||
// 或者 psync_offset 小于 server.repl_backlog_off
// (想要恢复的那部分数据已经被覆盖)
psync_offset < server.repl_backlog_off ||
// psync offset 大于 backlog 所保存的数据的偏移量
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
// 执行 FULL RESYNC
redisLog(REDIS_NOTICE,
"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
if (psync_offset > server.master_repl_offset) {
redisLog(REDIS_WARNING,
"Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
}
goto need_full_resync;
}

3、如果是部分复制
Master会向Slave发送 backlog 中从 offset 到 backlog 尾部之间的数据
代码:replication.c/addReplyReplicationBacklog()
部分复制在3.0版本和之后的版本中的实现有比较大的差异。
在3.0时,部分复制发生在Slave向Master发送PSYNC命令时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void syncCommand(redisClient *c) {
...

if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 尝试进行 PSYNC
if (masterTryPartialResynchronization(c) == REDIS_OK) {
// 可执行 PSYNC
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
// 不可执行 PSYNC
char *master_runid = c->argv[1]->ptr;

/* Increment stats for failed PSYNCs, but only if the
* runid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
}

...
}

int masterTryPartialResynchronization(redisClient *c) {
...

/* If we reached this point, we are able to perform a partial resync:
* 程序运行到这里,说明可以执行 partial resync
*
* 1) Set client state to make it a slave.
* 将客户端状态设为 salve
*
* 2) Inform the client we can continue with +CONTINUE
* 向 slave 发送 +CONTINUE ,表示 partial resync 的请求被接受
*
* 3) Send the backlog data (from the offset to the end) to the slave.
* 发送 backlog 中,客户端所需要的数据
*/
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
c->repl_ack_time = server.unixtime;
listAddNodeTail(server.slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* emtpy so this write will never fail actually. */
// 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
// 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器
psync_len = addReplyReplicationBacklog(c,psync_offset);
redisLog(REDIS_NOTICE,
"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);

...
}

3.0后,在每次命令执行完之后,还会触发命令传播:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void processInputBufferAndReplicate(client *c) {
// 处理命令然后广播命令
// if this is a slave, we just process the commands
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
/* If the client is a master we need to compute the difference
* between the applied offset before and after processing the buffer,
* to understand how much of the replication stream was actually
* applied to the master state: this quantity, and its corresponding
* part of the replication stream, will be propagated to the
* sub-replicas and to the replication backlog. */
size_t prev_offset = c->reploff;
processInputBuffer(c);
// applied is how much of the replication stream was actually applied to the master state
size_t applied = c->reploff - prev_offset;
if (applied) {

replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}

所谓命令传播,就是当Master节点每处理完一个命令都会把命令广播给所有的子节点,而每个子节点接收到Master的广播过来的命令后,会在处理完之后继续广播给自己的子节点。
命令传播也是异步的操作,即Master节点处理完客户端的命令之后会立马向客户端返回结果,而不会一直等待所有的子节点都确认完成操作后再返回以保证Redis高效的性能。
4、什么时候会改为采用全量复制
上面的增量复制中,我们看到Redis实际上是将repl_backlog中的内容复制给了Slave,backlog是一块内存缓冲区(默认大小为1M),每次处理完命令之后,先写入缓冲区repl_backlog, 然后再发送给Slave。
如果一个Slave断连了一段时间,重启后Master可以将这块缓冲区内的内容复制给Slave,但是如果断连的时间比较长,也有可能会触发全量复制,因为缓冲区能保存的命令有限,只能至多保存的命令长度为repl_backlog_length,如果某个子节点落后当前最新命令的长度大于了repl_backlog_length,那么就会触发全量复制。
5、如果是全量复制
这种情况下,Master并不会直接将RDB文件传给Slave,而是先发给Slave+FULLRESYNC,;
代码:replication.c/masterTryPartialResynchronization()的末尾
什么时候Master会将RDB文件传给Slave呢?如果当前已经有可用的RDB文件,则直接将RDB文件传输给Slave;如果当前RDB正在备份过程中,Master会在每次RDB文件备份完毕后执行一次传输任务。
replication.c/syncCommand()末尾Master判断RDB当前的备份状态,设置标识表示当前RDB文件是否可用于复制,如果可以复制则会在之后的主事件循环中触发文件的发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
void syncCommand(redisClient *c) {
...

/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
// 检查是否有 BGSAVE 在执行
if (server.rdb_child_pid != -1) {
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save */
redisClient *slave;
listNode *ln;
listIter li;

// 如果有至少一个 slave 在等待这个 BGSAVE 完成
// 那么说明正在进行的 BGSAVE 所产生的 RDB 也可以为其他 slave 所用
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
}

if (ln) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
// 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB
copyClientOutputBuffer(c,slave);
// 设置复制状态
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences */
// 不好运的情况,必须等待下个 BGSAVE
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
} else {
/* Ok we don't have a BGSAVE in progress, let's start one */
// 没有 BGSAVE 在进行,开始一个新的 BGSAVE
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
}
// 设置状态
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
/* Flush the script cache for the new slave. */
// 因为新 slave 进入,刷新复制脚本缓存
replicationScriptCacheFlush();
}

...

}

主事件循环中发RDB文件的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

...

/* Check if a background saving or AOF rewrite in progress terminated. */
// 检查 BGSAVE 或者 BGREWRITEAOF 是否已经执行完毕
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;

// 接收子进程发来的信号,非阻塞
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;

if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

// BGSAVE 执行完毕
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);

// BGREWRITEAOF 执行完毕
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);

} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
}

...
}

接下来的调用包括:
replication.c/backgroundSaveDoneHandler()
replication.c/updateSlavesWaitingBgsave()
replication.c/sendBulkToSlave()

全量同步的大致流程如此,主要分为以下几步:

  1. Master节点开启子进程进行RDB文件生成
  2. Master节点将RDB文件发送给Slave节点
  3. Slave节点清空内存中的所有数据并删除之前的RDB文件
  4. Slave节点使用从Master接收的RDB文件恢复数据到内存中

需要注意的是,这个过程中的每一步都是耗时的IO操作,所以大部分时候Redis都是尽可能采用增量复制,而不是全量复制。
下面再来讨论Master如何发送及Slave如何接收这份数据。

主从复制执行过程 - Master如何发送及Slave如何接收复制数据

1、如果是全量复制
Slave和Master刚开始握手完毕后,会注册一个readSyncBulkPayload处理器,用于读取从Master发送过来的RDB文件。
2、Slave 将数据文件保存到磁盘上,然后再加载到内存中;
从库接收到RDB文件后,会先清空当前数据库,然后加载RDB文件,这是因为从库在开始和主库同步前可能保存了其他数据,为了避免之前数据的影响,从库需要先把当前数据库清空。
3、同步过程中主库产生的新数据也要同步给从库
主库同步数据给从库的过程中,主库不会被阻塞,仍然可以正常接收请求(否则Redis服务不就中断了?),但是这些请求中的写操作并没有记录到刚刚生成的RDB文件中,为了保证主从库的数据一致性,主库会在内存中用专门的replication buffer(代码中对应repl_backlog_buffer记录RDB文件生成后收到的所有写操作。
repl_backlog_buffer是一个环形缓冲区,主库会记录自己写到的位置,而从库则会记录自己已经读到的位置,可以使用repl_backlog_size来配置这个缓冲区的大小,如果配得过小,可能会导致增量复制阶段从库复制进度赶不上主库,进而导致从库重新进行全量复制。
在Master端定义的offset是master_repl_offset,在Slave端定义的offset是slave_repl_offset,正常情况下这两个偏移量是基本相等的。
增量同步期间,从库在发送psync的同时,会把自己当前的slave_repl_offset发给主库,主库判断自己的master_repl_offset和slave_repl_offset之间的差距,如果断连了,master_repl_offset可能会超过slave_repl_offset,那么将这超过的部分发给slave就可以恢复同步了。

主从复制存在的问题

主从库间网络断了怎么办?

Redis2.8之前,如果主从同步过程中出现了网络闪断,那么主从是会重新进行一次全量复制的,开销非常大。
Redis2.8之后,网络闪断后,主从会采取增量复制,将闪断期间的命令发给从库。

宕机恢复

因为 slave 顶多只负责处理读请求,slave 挂掉不会造成数据丢失的问题。
slave 宕机的情况下,应该要求客户端具有一定的熔断恢复能力,并且能在重启后快速恢复:

  1. 恢复正常后重新连接;
  2. Master 收到 Slave 的连接后,第一次同步时,主节点做一次 bgsave,并同时将后续修改操作记录到内存 buffer;
  3. Master 将其完整的 rdb 数据文件全量发送给 Slave;
  4. Slave 接收完成后将 rdb 镜像文件加载到内存,加载完成后,再通知 Master 将期间修改的操作记录同步到 Slave 节点进行重放就完成了同步过程;
  5. 如果 Master 同时收到多个 Slave 发来的同步请求,Master 只会在后台启动一个进程保存数据文件,然后将其发送给所有的 Slave,确保 Slave 正常。

主从复制无法应对 Master 挂掉的情况,实际上这种方案只能尽量保证数据不会丢失,不能保证服务的高可用性,为此,需要引入 Redis 的 Sentinel 机制。

客户端可以使用 WAIT 命令来请求同步复制某些特定的数据。但是,WAIT 命令只能确保在其他 Redis 实例中有指定数量的已确认的副本:在故障转移期间,由于不同原因的故障转移或是由于 Redis 持久性的实际配置,故障转移期间确认的写入操作可能仍然会丢失。

是否可以关闭持久化

作为复制方案中的一环,可以考虑关闭 Master 或 Slave 的持久化功能,但是并不建议关掉它们,因为:

  • 如果关闭 Master 的持久化:重启(重启功能可以由一些只能运维工具来保证,比如 K8S)的 Master 将从一个空数据集开始,如果一个 Slave 试图与它同步,那么这个 Slave 也会被清空。
  • 如果关闭 Slave 的持久化:重启的 Slave 需要从 Master 全量同步数据。

正如前所述,关闭了持久化并配置了自动重启的 Master 是危险的——会导致整个集群的数据全部被清空。
如果 Sentinel 集群用于需要高可用的场景、且 Master 被关闭掉了持久化功能,也是非常危险的:

  • 如果重启比较慢,Sentinel 的故障迁移机制重新选主,一个 Slave 会上升为 Master;
  • 如果重启得足够快,Sentinel 没有探测到故障,此时 Master 数据被清空了,而 Slave 仍从 Master 同步数据,这将引起上边提到的故障模式——数据将丢失。

因此,如果考虑磁盘性能过慢会导致延迟、关掉了持久化,那么自动重启进程这项应该被禁用。

如何保证主从数据的一致性 - 数据丢失窗口的存在

由于 Redis 使用异步复制,无法保证Slave和Master的实时一致性,因此总会有一个数据丢失窗口
那在什么情况下,从库会滞后执行同步命令呢?

  1. 一方面,主从库间的网络可能会有传输延迟,所以从库不能及时地收到主库发送的命令,从库上执行同步命令的时间就会被延后。
  2. 另一方面,即使从库及时收到了主库的命令,但是,也可能会因为正在处理其它复杂度高的命令(例如集合操作命令)而阻塞。此时,从库需要处理完当前的命令,才能执行主库发送的命令操作,这就会造成主从数据不一致。而在主库命令被滞后处理的这段时间内,主库本身可能又执行了新的写操作。这样一来,主从库间的数据不一致程度就会进一步加剧。

因为异步复制的本质,Redis主从复制无法完全避免数据的丢失,除了尽量保证网络连接状况良好外,还可以写一些监控程序来监控主从库间的复制进度,原理是实时给Redis实例发info replication命令得到master_repl_offsetslave_repl_offset这两个进度信息,计算这二者的差值即可得到主从复制进度的实时程度,如果某个从库进度差值大于我们预设的一个阈值,我们可以让客户端不再和这个从库连接进行数据读取,从而减少读到不一致数据的情况。

这个阈值当然不能设置得过低,否则可能导致所有从库都连不上了。

既然无法避免,那么只能退一步、控制影响范围了,Redis 可以保证:

  1. Redis slave 每秒钟都会 ping master,确认已处理的复制流的数量。
  2. Redis master 会记得上一次从每个 slave 都收到 ping 的时间。
  3. 用户可以配置一个最小的 slave 数量,使得它滞后 <= 最大秒数。
  4. 如果至少有 N 个 slave ,并且滞后小于 M 秒,则写入将被接受。如果条件不满足,master 将会回复一个 error 并且写入将不被接受。

这些条件是通过min-slaves-to-writemin-slaves-max-lag这两个配置来实现的:

  • min-slaves-to-write:最少有n个slave的连接还是健康的情况下才能提供服务,至于怎么判断连接是否健康,需要看下面一个配置;
  • min-slaves-max-lag:判断连接健康的最大延迟时间,slave每次PING Master时Master都会记录该Slave 最后一次PING的时间,如果最后一次PING成功的时间距今比较长了,就说明该Slave的连接状态很有可能已经出问题了。

对于给定的写入来说,虽然不能保证绝对实时的一致性,但至少数据丢失的时间窗限制在给定的秒数内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# It is possible for a master to stop accepting writes if there are less than
# N slaves connected, having a lag less or equal than M seconds.
#
# The N slaves need to be in "online" state.
#
# The lag in seconds, that must be <= the specified value, is calculated from
# the last ping received from the slave, that is usually sent every second.
#
# This option does not GUARANTEES that N replicas will accept the write, but
# will limit the window of exposure for lost writes in case not enough slaves
# are available, to the specified number of seconds.
#
# For example to require at least 3 slaves with a lag <= 10 seconds use:
#
# min-slaves-to-write 3
# min-slaves-max-lag 10
#
# Setting one or the other to 0 disables the feature.
#
# By default min-slaves-to-write is set to 0 (feature disabled) and
# min-slaves-max-lag is set to 10.

min-slaves-to-write <slave 数量>
min-slaves-max-lag <秒数>

过期的 key 问题

由于复制的异步特性,对 key 设置过期时间和写入操作很容易导致 race condition 及导致数据集不一致,比如:

1
2
3
(1) sadd x 1
(2) expire x 100
(3) sadd x 2

在 Master 上,命令(3)是在过期前执行的,而 Slave 上可能因为延后导致命令(3)执行前 x 就已经过期了,此时 x 是没有过期时间的(ttl x 得到-1 表示不过期),这就导致了数据的不一致。

set 命令不会出现这个问题,因为 set 会将过期时间给覆盖成-1。当然情况比较复杂,也有可能是我没有想到。

为了保证针对过期的 key 的复制能够正确工作,Redis 提供如下保证:

  1. slave 不会让 key 过期,而是等待 master 让 key 过期。当一个 master 让一个 key 到期(或由于 LRU 算法将之驱逐)时,它会合成一个 DEL 命令并传输到所有的 slave。一旦一个 slave 被提升为一个 master ,它将开始独立地过期 key,而不需要任何旧 master 的帮助。
  2. 但是,由于这是 master 驱动的 key 过期行为,master 无法及时提供 DEL 命令,所以有时候 slave 的内存中仍然可能存在在逻辑上已经过期的 key 。为了处理这个问题,slave 使用它的逻辑时钟以报告只有在不违反数据集的一致性的读取操作(从主机的新命令到达)中才存在 key。用这种方法,slave 避免报告逻辑过期的 key 仍然存在。在实际应用中,使用 slave 程序进行缩放的 HTML 碎片缓存,将避免返回已经比期望的时间更早的数据项。
  3. 在 Lua 脚本执行期间,不执行任何 key 过期操作。当一个 Lua 脚本运行时,从概念上讲,master 中的时间是被冻结的,这样脚本运行的时候,一个给定的键要么存在要么不存在。这可以防止 key 在脚本中间过期,保证将相同的脚本发送到 slave ,从而在二者的数据集中产生相同的效果。

QA

AOF日志更全,为什么主从同步不使用AOF而是RDB呢?

网络传输效率:RDB直接存储数据,而不是命令,数据量更小,传输更快。
恢复效率:因为使用AOF恢复数据库的话是需要将AOF中记录的命令再执行一次的,这个效率远不如直接将RDB中的数据直接加载到内存里。

主从切换过程中,客户端能正常进行请求吗?

主库故障后从库仍能正常接收读请求,但主库挂掉了所以无法处理写请求。

如果实现应用程序不感知服务器的中断?

  1. 客户端可以缓存写请求,因为使用Redis的场景同步写请求比较少,且一般都不会在应用程序的关键路径上,所以在不能立刻执行写请求的情况下,客户端完全可以先把请求缓存起来,给应用程序返回一个确认即可。
  2. 另外,主从切换后,客户端要能及时和新主库重新建立连接。

主从数据发生不一致怎么办?

参考

  1. Redis复制实现原理
  2. Redis集群——主从复制数据同步

Redis 为了达到最快的读写速度将数据都读到内存中,并通过异步的方式将数据写入磁盘。所以 redis 具有快速和数据持久化的特征。如果不将数据放在内存中,磁盘 I/O 速度为严重影响 redis 的性能。在内存越来越便宜的今天,redis 将会越来越受欢迎。
如果设置了最大使用的内存,则数据已有记录数达到内存限值后不能继续插入新值。
不过 Redis 也提供了持久化的选项。

阅读全文 »

基本数据结构

String

特点

  1. 最大能存储 512MB == 536870912 B(byte) ;
  2. 二进制安全,在传输数据的时候,能保证二进制数据的信息安全,也就是不会被篡改、破译;如果被攻击,能够及时检测出来
  3. 能存储各种类型的数据,字符串、数字,以至对象(通过json序列化)、位图等。

基本使用

容量:512M

1
2
set aa 'str'
get aa

操作总结

1
2
3
4
5
6
set/get/del/append/strlen key
incr/decr/incrby/decrby key
getrange key start end/setrange key offset value(从offset处开始读取/覆盖)
setex key seconds value(set with expire插入key的同时设置过期时间)/setnx key value(set if not exists如果已存在则直接返回0)
mset/mget/msetnx key value {key value}(设置/读取多个key的值,msetnx比较特殊,要么都成功,要么一个都不执行,可以用来设置一个对象的多个不同字段)
getset key(设置并返回key对应的旧值,可以用于计数器的重置)

常见应用

字符串、jpg图片、序列化对象、一些复杂的计数功能的缓存

Hash

存储 String 类型键值对的映射表、对象

基本使用方法

容量:每个 Hash 可存 2^32 - 1(约 40 亿)个键值对

1
2
3
hmset user username 'name' password '123456' # 定义一个有两个元素的Hash表
hgetall user # 获取user中所有key和value
hget user username # 获取user中key为username的value

常见应用

单点登录(存<CookieId, 用户信息>,设置 30 分钟为缓存过期时间,能很好地模拟出类似 Session 的效果)。

List

String列表

基本使用方法

容量:每个 List 可存 2^32 - 1 个元素

1
2
3
lpush ball basketball soccer # 按顺序从左侧压入,如果ball列表不存在则创建
rpush ball volleyball
lrange 0 1 # 获取索引从0到1的值

操作总结

1
2
3
4
5
6
7
8
9
lpush/rpush/lrange
lpop/rpop
lindex
llen
lrem key
ltrim key
rpoplpush
lset key index value
linsert key before/after val1 val2

常见应用

简单的消息队列
基于 Redis 的分页功能(利用 lrang 命令,性能极佳,用户体验好)

Set

字符串的无序集合,使用 Hash 实现(key 和 value 相同的 Hash)

基本使用方法

容量:2^32 - 1 个成员

1
2
sadd myset li1 # 向集合myset中添加一个元素li1,若不存在则创建
smembers myset

常见应用

全局去重(为什么不用 JDK 自带的 Set 去重?因为我们的系统一般都是集群部署)
计算共同喜好、全部喜好、自己独有的喜好等功能(交集、并集、差集)

ZSet

字符串的有序集合,每个元素都关联一个 double 类型的权重参数 score,集合中的元素能够按 score 进行排列。

基本使用方法

容量:2^32 - 1 个成员

1
2
zadd myzset 0 abc
zrangebyscore myzset 0 10

常见应用

排行榜
取 Top N 操作
延时任务(https://www.cnblogs.com/rjzheng/p/8972725.html)
范围查找

原理

实现上类似于 Java 的 SortedSet 和 HashMap 的结合体,value 唯一(Set 结构的特点),每个 value 一个 score 代表该 value 的排序权重。
zset 内部是使用一种叫做跳跃列表的结构实现的。

Redis 数据结构的实现

数据结构的声明和实现

Redis数据结构及其实现
Redis 中的 set、zset 等结构在 Redis 中并不是由一个单独的数据结构实现的,而是会根据情况有所变化。

set

set和Java中的HashSet有点像,它本身是HashMap的封装,key是集合中的对象,而value直接用NULL代替。
但是注意一些特殊情况:

  • 创建集合对象时,如果发现集合内的元素可以使用整数(longlong)编码,则创建一个intset而不是dict;
  • 之前是intset编码的情况下,插入的新元素如果是非整数的,那么集合会被重新转换成dict编码的;或者插入的元素数量达到了阈值(512),也会自动转换成dict。
    创建代码见:t_set.c/setTypeCreate()

zset

zset同样有两种形态:ziplist编码和skiplist编码。

  • 按ziplist编码的情况下:
    zset本身就是个ziplist对象。
  • 按skiplist编码的情况下:
    zset的集合功能是通过dict实现的,这部分和set并无区别;
    zset的有序性是通过skiplist实现的,skiplist按分值排序成员,支持平均复杂度为O(logN)的按分值定位成员的操作。

执行zadd命令代码:t_zset.c/zaddGenericCommand()
创建zset对象:object.c/createZsetZiplistObject()object.c/createZsetObject()

SDS(Simple Dynamic String)

Redis 中的动态数组有以下特点:

  • 可动态扩展内存。sds 表示的字符串其内容可以修改,也可以追加。在很多语言中字符串会分为 mutable 和 immutable 两种,显然 sds 属于 mutable 类型的。
  • 减少修改字符串的内存重新分配次数
    C语言由于不记录字符串的长度,所以如果要修改字符串,必须要重新分配内存(先释放再申请),因为如果没有重新分配,字符串长度增大时会造成内存缓冲区溢出,字符串长度减小时会造成内存泄露。
    而对于SDS,由于len属性和free属性的存在,对于修改字符串SDS实现了空间预分配和惰性空间释放两种策略:
    1、空间预分配:对字符串进行空间扩展的时候,扩展的内存比实际需要的多,这样可以减少连续执行字符串增长操作所需的内存重分配次数。
    2、惰性空间释放:对字符串进行缩短操作时,程序不立即使用内存重新分配来回收缩短后多余的字节,而是使用 free 属性将这些字节的数量记录下来,等待后续使用。(当然SDS也提供了相应的API,当我们有需要时,也可以手动释放这些未使用的空间。)
  • 二进制安全(Binary Safe)。sds 能存储任意二进制数据,而不仅仅是可打印字符。
  • 与传统的 C 语言字符串类型兼容。
1
2
3
4
5
6
struct SDS<T> {
T capacity; // 数组容量
T len; // 数组当前长度
byte flags; // 特殊标识位,不理睬它
byte[] content; // 数组内容
}

下面的函数将 t 数组拷贝到 s 中,如果长度不够则需要进行扩容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Append the specified binary-safe string pointed by 't' of 'len' bytes to the
* end of the specified sds string 's'.
*
* After the call, the passed sds string is no longer valid and all the
* references must be substituted with the new pointer returned by the call. */
sds sdscatlen(sds s, const void *t, size_t len) {
size_t curlen = sdslen(s); // 原字符串长度

// 按需调整空间,如果 capacity 不够容纳追加的内容,就会重新分配字节数组并复制原字符串的内容到新数组中
s = sdsMakeRoomFor(s,len);
if (s == NULL) return NULL; // 内存不足
memcpy(s+curlen, t, len); // 追加目标字符串的内容到字节数组中
sdssetlen(s, curlen+len); // 设置追加后的长度值
s[curlen+len] = '\0'; // 让字符串以\0 结尾,便于调试打印,还可以直接使用 glibc 的字符串函数进行操作
return s;
}

SDS 有 embstr 和 raw 两种存储结构,它们的区别是:

  1. 内存分配上:
    embstr 调用 1 次 malloc, 因此 redisObject 和 SDS 内存是连续分配的;
    raw 需要调用 2 次 malloc, 因此 redisObject 和 SDS 内存不连续分配
  2. 使用上:
    embstr 整体 64 byte, 正好和cpu cache line 64byte 一样, 可以更好的使用缓存, 效率更高

quicklist

Redis 早期版本存储 list 数据结构采用(元素少时 ziplist、多时 linkedlist )的方案,但是:

  1. 链表的附加空间太高,prev 和 next 指针就要占去 16 个字节(64 位系统);
  2. 链表每个节点都是单独分配,会加剧内存的碎片化。

因此在之后的版本中转换为了 quicklist 存储。
quicklist 是 ziplist 和 linkedlist 的混合体,它将 linkedlist 按段切分,每一段使用 ziplist 来紧凑存储,多个 ziplist 之间使用双向指针串接起来。
Redis-quicklist结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct ziplist {
...
}
struct ziplist_compressed {
int32 size;
byte[] compressed_data;
}
struct quicklistNode {
quicklistNode* prev;
quicklistNode* next;
ziplist* zl; // 指向压缩列表
int32 size; // ziplist 的字节总数
int16 count; // ziplist 中的元素数量
int2 encoding; // 存储形式 2bit,原生字节数组还是 LZF 压缩存储
...
}
struct quicklist {
quicklistNode* head;
quicklistNode* tail;
long count; // 元素总数
int nodes; // ziplist 节点的个数
int compressDepth; // LZF 算法压缩深度
...
}

ziplist

ziplist 是一种压缩存储的数组结构,当 Redis 中的集合数据结构很小,则它会使用这种紧凑的存储形式存储,元素之间紧挨着存储,查找就是对数组进行遍历找到目标对象。

  • zset 和 hash 容器在元素个数较少时会采用 ziplist 存储。当存储的对象数量小于 512 且所有 entry 的 value 值长度小于 64,采用 ziplist 存储,否则转为采用 hashtable 存储。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    import redis
    client = redis.StrictRedis()
    client.delete("hello")
    for i in range(512):
    client.hset("hello", str(i), str(i))
    print client.object("encoding", "hello")
    client.hset("hello", "512", "512")
    # 或者插入一个长度为65的值也能起到转化的作用
    print client.object("encoding", "hello")

可以上服务器上使用debug object命令验证数据结构的类型:

1
2
3
4
> zadd hgc_test 1.0 go 2.0 python 2.0 java
...
> debug object hgc_test
Value at:0x7f73c6d673a0 refcount:1 encoding:ziplist serializedlength:36 lru:1381596 lru_seconds_idle:77

结构

Redis-ziplist结构

1
2
3
4
5
6
7
8
9
10
11
12
struct ziplist<T> {
int32 zlbytes; // 整个压缩列表占用字节数
int32 zltail_offset; // 最后一个元素距离压缩列表起始位置的偏移量,用于快速定位到最后一个节点
int16 zllength; // 元素个数
T[] entries; // 元素内容列表,挨个挨个紧凑存储
int8 zlend; // 标志压缩列表的结束,值恒为 0xFF
}
struct entry {
int<var> prevlen; // 前一个 entry 的字节长度
int<var> encoding; // 元素类型编码
optional byte[] content; // 元素内容
}
  • zltail_offset 字段可以快速定位到 ziplist 中的最后一个节点,可以用于倒序遍历,entry 中的 prevlen 表示前一个 entry 的字节长度,可以用于倒序遍历时找到下一个元素的位置;
  • encoding 记录编码类型,ziplist 利用该字段决定后面的 content 内容的形式,比如用00xxxxxx表示最大长度为 63 的短字符串,01xxxxxx xxxxxxxx表示中等长度的字符串;

插入

ziplist 是紧凑存储的,没有冗余空间,因此插入一个新元素就需要调用 realloc 重新分配内存空间,并将之前的内容一次性拷贝到新的内存空间中。
重新分配空间是比较耗时的,因此 ziplist 不适合存储大量数据。

更新/删除

修改或删除一个元素后其后一个位置的元素中的 prevlen 也需要级联更新,prevlen 字段又是变长的,所以可能会导致连锁反应。

ziplist vs dict

为什么 hash 结构中会采用 ziplist 而不是 dict,主要原因如下:

  1. 数据量小时,ziplist 的速度也很快;
  2. 数据量大时,ziplist 在每次插入或修改时引发的 realloc 操作会有更大的概率造成内存拷贝,从而降低性能,而且数据项过多的时候,在 ziplist 上查找指定数据项的性能会变得很低,因为在 ziplist 上的查找需要进行遍历。

dict(字典)

dict 是 Redis 中使用最广泛的数据结构:

  1. hash 结构的数据会用到字典;
  2. 整个 Redis 数据库的所有 key 和 value 也组成了一个全局字典;
  3. 带过期时间的 key 集合也是一个字典;
  4. set 结构的底层实现也是字典,只是所有 value 都是 NULL;
  5. zset 集合中存储 value 和 score 值的映射关系也是通过 dict 结构实现的。
1
2
3
4
5
6
7
8
9
10
struct RedisDb {
dict* dict; // all keys key=>value
dict* expires; // all expired keys key=>long(timestamp)
...
}

struct dict {
...
dictht ht[2];
}
1
2
3
4
struct zset {
dict *dict; // all values value=>score
zskiplist *zsl;
}

hashtable

dict 中的 hashtable 结构和 Java 中的 HashMap 类似,使用一个数组来保存所有的哈希桶,通过siphash函数来将 key 散列到数组中的某个桶上,每个哈希桶都是一个链表,也就是说如果发生哈希冲突,则将新元素直接插入到桶的头部。

1
2
3
4
5
6
7
8
9
10
11
struct dictEntry {
void* key;
void* val;
dictEntry* next; // 链接下一个 entry
}
struct dictht {
dictEntry** table; // 二维
long size; // 第一维数组的长度
long used; // hash 表中的元素个数
...
}

扩容:渐进式 rehash

正常情况下,当 hashtable 中元素的个数等于第一维数组的长度时、来了一个新增/修改/删除操作,就会触发扩容,扩容的新数组是原数组大小的 2 倍。

存在一个特殊情况:如果 Redis 正在做 bgsave,为了减少内存页的过多分离 (Copy On Write),Redis 尽量不去扩容 (dict_can_resize),但是如果 hash 表已经非常满了,元素的个数已经达到了第一维数组长度的 5 倍 (dict_force_resize_ratio),说明 hash 表已经过于拥挤了,这个时候就会强制扩容。

Redis-dict扩容rehash
一般情况下 dict 中只有一个 hashtable 有值,但是在扩容时会分配另一个新的 hashtable,然后执行渐进式的数据迁移,避免一次性对所有 key 执行 rehash,而是将 rehash 操作分散到了对 dict 的各个增删改查操作中去了。

  1. 在扩容过程中,如果有新增元素,则该元素会被同时添加到新 hashtable 中;
  2. 查询、删除、修改操作中,会先查询旧 hashtable,若存在则迁移这个 key 所在的桶并返回元素,若不存在则到新 hashtable 中查找元素。
  3. 有一个异步线程执行定时任务对字典主动迁移。

dict 之所以这样设计,是为了避免 rehash 期间单个请求的响应时间剧烈增加。
当旧 hashtable 中无元素时,即代表迁移完毕,这时会将新旧 hashtable 的指针交换,旧的会被删除,而新的则取而代之。

缩容

当 hash 表因为元素的逐渐删除变得越来越稀疏时,Redis 会对 hash 表进行缩容来减少 hash 表的第一维数组空间占用。缩容的条件是元素个数低于数组长度的 10%。
缩容不会考虑 Redis 是否正在做 bgsave,因为 COW 的特性是当内存页上的数据被修改时会复制一页做修改,如果删除操作并不会触发删除内存页的数据,操作系统回收内存机制导致的。

全局哈希表

get allen b中的a和b是不同数据结构的对象,他们统统被存储在一个叫全局哈希表的地方。
哈希表中的每个哈希桶存储的不是值本身,而是指向它们的指针。
Redis中的全局哈希表
代码定义在:redis.h/redisDb

缺点:

  1. 过多的哈希冲突容易产生过长的哈希桶(哈希冲突链)。
    为了减少这个问题产生的影响,需要对哈希表进行rehash操作,这个rehash操作和dict数据结构的rehash原理是一样的。

    全局哈希表实际上就是dict,可以看源码中的定义。

优点:

  1. 合适的散列函数和扩容机制可以保证O(1)的操作复杂度。

skiplist

zset 中除了 dict(字典)外,还会用一个 skiplist 来提供按score排序的要求,以实现指定 score 的范围来获取 value 列表的功能。

Redis-skiplist结构

1
2
3
4
5
6
7
8
9
10
11
12
struct zslnode {
string value;
double score;
zslnode*[] forwards; // 多层连接指针
zslnode* backward; // 回溯指针
}

struct zsl {
zslnode* header; // 跳跃列表头指针
int maxLevel; // 跳跃列表当前的最高层
map<string, zslnode*> ht; // hash 结构的所有键值对
}
  • 各层均为一个有序的链表结构;
  • 层数越大,节点越少;
  • 有一个 header 节点作为哨兵,value=null,score=Double.MIN_VALUE。

插入

插入时会先自顶向下找到新节点在跳表中底层的插入位置,插入每一层时都有概率晋升到更高一层,在 Redis 中是 25%。

删除

删除每一层上的对应节点。

更新

如果不影响排序则直接更新,否则会先删除再重新插入。

布隆过滤器

HyperLogLog

布隆过滤器用于实现contains的需求,而 HyperLogLog 主要用于实现count
同样是一个特别大的位数组,HyperLogLog 将位数组拆分为桶,每个桶是连续的 6 个位,计数时并非单独对某个桶计数,而是:

  • set 操作:计算 key 的散列值,为一个 64 位的数字,前 14 位是桶的位置,桶记录后 50 位中第一个 1 的位置 count,并且count = max(count, oldCount),即每次记录最大的计数。
  • count 操作:因为是概率算法,每个桶的计数值并不精确,但是所有桶的调和均值非常接近真实的计数值。

pubsub

用于实现轻量级的发布订阅功能。

geohash

QA

使用string还是hash?

当数据量少时,使用hash明显更加节省内存,因为数据少时hash会转成ziplist的结构,而string每个kv都需要一大堆的额外空间存储元数据。

如何使用Redis的数据结构实现统计?

  1. 需要支持集合运算(差集、交集、并集)的场合
    使用set、zset,数据量少时会转成ziplist节省内存。
  2. 需要进行二值统计的场合
    使用bitmap
  3. 需要大规模统计,且不要求精确统计的场合
    使用HyperLogLog

采用渐进式hash时,如果实例暂时没有接收到新请求,是不是就不会做rehash了?

不会,还有一个定时任务每隔100ms执行rehash,而且每次执行时长不会超过1ms,以免影响其他任务。

参考

  1. redis源码解析

Redis架构图
Redis知识框架
Redis问题画像

以上图片来自极客时间的《Redis核心技术与实战》。
对原理的分析都是基于Redis3.0版本的代码,现在最新的版本应该是6.0,很多功能都是后面的版本引入的,因此这篇里不会描述多线程这些功能。

为什么使用 Redis

Redis 的缺点 & 优点

特性及优势:

  1. 内存数据库,吞吐率不受磁盘影响;
  2. 每秒可以处理超过 10 万次读写操作;
  3. 多数据结构支持,包括 string、hash、list、set、zset、Bitmaps、Hyperloglog、Geo、Pub/Sub、Redis Module、BloomFilter、RedisSearch、Redis-ML 等,支持绝大多数应用场景;
  4. Replication(复制);
  5. lua(支持服务端执行复杂的业务逻辑);
  6. LRU eviction(缓存淘汰);
  7. Transactions;
  8. Persistence(持久化),包括 rdb(快照)和 AOF 两种;
  9. Sentinel(哨兵);
  10. Cluster(分区)。

但是也不能忽略 Redis 本身的一些缺点:

  1. 数据库容量受到物理内存的限制,不能用作海量数据的高性能读写,因此 Redis 适合的场景主要局限在较小数据量的高性能操作和运算上;
  2. 缓存和数据库双写一致性问题;
  3. 缓存雪崩问题;
  4. 缓存击穿问题;
  5. 缓存的并发竞争问题。

Redis & Memcached

Redis 相对 Memcached 来说有以下优点:

  1. memcached 所有的值均是简单的字符串,redis 作为其替代者,支持更为丰富的数据类型
  2. redis 的速度比 memcached 快很多
  3. redis 可以持久化其数据

Redis 和 Memcached 之间存在以下区别:

  1. 存储方式 Memecache 把数据全部存在内存之中,断电后会挂掉,数据不能超过内存大小。 Redis 有部份存在硬盘上,这样能保证数据的持久性。
  2. 数据支持类型 Memcache 对数据类型支持相对简单。 Redis 有复杂的数据类型。
  3. 使用底层模型不同 它们之间底层实现方式 以及与客户端之间通信的应用协议不一样。 Redis 直接自己构建了 VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求。

应用场景

  1. 会话缓存(Session Cache)
    最常用的一种使用 Redis 的情景是会话缓存(session cache)。用 Redis 缓存会话比其他存储(如 Memcached)的优势在于:Redis 提供持久化。当维护一个不是严格要求一致性的缓存时,如果用户的购物车信息全部丢失,大部分人都会不高兴的,现在,他们还会这样吗?
    幸运的是,随着 Redis 这些年的改进,很容易找到怎么恰当的使用 Redis 来缓存会话的文档。甚至广为人知的商业平台 Magento 也提供 Redis 的插件。
  2. 全页缓存(FPC)
    除基本的会话 token 之外,Redis 还提供很简便的 FPC 平台。回到一致性问题,即使重启了 Redis 实例,因为有磁盘的持久化,用户也不会看到页面加载速度的下降,这是一个极大改进,类似 PHP 本地 FPC。
    再次以 Magento 为例,Magento 提供一个插件来使用 Redis 作为全页缓存后端。
    此外,对 WordPress 的用户来说,Pantheon 有一个非常好的插件 wp-redis,这个插件能帮助你以最快速度加载你曾浏览过的页面。
  3. 队列
    Reids 在内存存储引擎领域的一大优点是提供 list 和 set 操作,这使得 Redis 能作为一个很好的消息队列平台来使用。Redis 作为队列使用的操作,就类似于本地程序语言(如 Python)对 list 的 push/pop 操作。
    当然要将 Redis 作为消息队列投入生产环境还有很多设计要点,比如采用 sleep 一段时间重试还是 blpop 阻塞、主题订阅、如何应对消费者下线导致的消息丢失问题(如何保证消息一定能被消费)等。
    Redis 作为消息队列坑比较多,如果希望少点麻烦且对服务质量有一定要求,最好还是采用 RocketMQ 这些比较成熟的方案。
  4. 延时队列
    使用 zset,时间戳作为 score,消费者用 zrangebyscore 指令获取 N 秒之前的数据轮询进行处理,这种思路和 JDK 中的 ScheduledThreadPoolExecutor 有点像。
  5. 排行榜/计数器
    Redis 在内存中对数字进行递增或递减的操作实现的非常好。集合(Set)和有序集合(Sorted Set)也使得我们在执行这些操作的时候变的非常简单,Redis 只是正好提供了这两种数据结构。所以,我们要从排序集合中获取到排名最靠前的 10 个用户–我们
    称之为“user_scores”,我们只需要像下面一样执行即可:
    当然,这是假定你是根据你用户的分数做递增的排序。如果你想返回用户及用户的分数,你需要这样执行:
    ZRANGE user_scores 0 10 WITHSCORES
    Agora Games 就是一个很好的例子,用 Ruby 实现的,它的排行榜就是使用 Redis 来存储数据的,你可以在这里看到。
  6. 发布/订阅
    最后(但肯定不是最不重要的)是 Redis 的发布/订阅功能。发布/订阅的使用场景确实非常多。我已看见人们在社交网络连接中使用,还可作为基于发布/订阅的脚本触发器,甚至用 Redis 的发布/订阅功能来建立聊天系统!(不,这是真的,你可以去核实)。
    Redis 提供的所有特性中,我感觉这个是喜欢的人最少的一个,虽然它为用户提供如果此多功能。
  7. 分布式锁
    不要用 setnx+expire,因为如果进程 crash 或重启这个锁就直接失效了。实际上 set 命令本身就包含超时和 cas 的设置。
  8. 扫描
    如果 Redis 中有 1 亿多个 key,其中有 10W+个 key 有固定的前缀(这种场景非常常见),如何将它们全部找出来?
    由于 Redis 的单线程特性,使用 keys 可能会阻塞 Redis 进程,最好换成 scan 命令,不过可能提取出的部分 key 是重复的,需要客户端做去重。

QA

Redis与其他KV存储有何不同?

  • 更多复杂的数据结构,支持更多特殊的场景;
  • Redis是内存数据库,但是支持持久化到磁盘。

有人说 Redis 只适合用来做缓存,当数据库来用并不合适,为什么?

Redis 的事务并不严格:
* A(原子性):Redis 支持事务,所有命令会被保存到一个事务队列中,服务器接收到 exec 时才会被真正执行,注意如果中间出错,事务不会回滚,后面的指令还会继续执行;而且如果涉及到复杂的逻辑判断,则只能通过lua 脚本实现“伪原子性”,说它是“伪原子性”是因为虽然脚本可以一次性执行多条命令,如果中间某个命令出错还是会无法保证“要么全部执行,要么都不执行”的要求。
* I(隔离性):Redis 是单线程模型,因此可以保证隔离性。
* D(持久性):Redis 是内存数据库,服务器意外崩溃会导致内存中的数据丢失,除非开启 AOF,并且配置成每次写入都记日志,但是这样又会极大地影响效率,所以一般会配置成混合模式的持久化。

Redis会占用多少内存空间?

  • An empty instance uses ~ 3MB of memory.
  • 1 Million small Keys -> String Value pairs use ~ 85MB of memory.
    那么10亿个kv,大概就会占用85G的内存了。
  • 1 Million Keys -> Hash value, representing an object with 5 fields, use ~ 160 MB of memory.

64位机器会占用比32位机器更多的内存,因为指针在64位机器上占用更多空间,但同时64位机器也可以有更大的内存空间。

Redis 的底层数据结构有哪些

sds:string 使用,变长字符串,不够的情况下重新分配空间并将老字符串数据拷贝过去;
dict:字典应用很多,包括 Redis 数据库中保存所有 key-value、hash、set、zset。dict 类似 Java 中的 HashMap,将 key 散列到哈希桶数组中,每个哈希桶都是一个链表,插入就是插入到链表头部,当元素超过了容量的一半后会启动渐进式 rehash 进行扩容。
ziplist:相当于一个数组,查询时需要遍历一次,每次插入都需要 realloc 重新分配一个新的更大的数组,然后把老数组内容拷贝过去。
quicklist:由于 linkedlist 附加空间成本高且容易产生碎片,因此 Redis 里的 quicklist 设计成了 linkedlist 和 ziplist 的结合,它将 linkedlist 按段切分,每一段使用 ziplist 存储;
skiplist:skiplist 用于实现 zset 中按 score 排序的要求,插入时先自顶向下查位置,然后按概率计算该节点应该分配到几层。

存储数据选择 string 还是 hash?

从业务层面来看,如果要存好多字段的对象、而且这个对象的每个字段都会单独拿出来用,则可以考虑使用 hash,否则没有太多限制条件。
从性能角度来看,如果存的字段少,hash 会使用 ziplist 结构存储,性能多少受点影响,而且还要考虑转换结构和渐进式扩容对性能的损耗。
从节约空间的角度来看,string 的 key 一般都会加个前缀,一般会比 hash 占用更多的空间,不过差距不大。

设计 redis 排序,数据结构是金额+花钱的时间,金额越大,时间越早越靠前

用 zset 存,score 是金额拼上时间,金额放高位,MAX_INT 和时间作差放低位,查询时使用ZREVRANGE命令查询。

hash 中哈希冲突怎么解决的

分两种情况:hash 在数据量小时结构是 ziplist,这时插入不会做冲突检测,插入到目标位置后就向后统一移动数据,给新插入的数据项流出空间;在数据量大时结构是 dict,这种结构和 Java 中的 HashMap 类似,使用链表来处理冲突。

  1. 说说 Redis 为什么那么快。
    单线程模型->减少了线程间上下文切换的开销。
    多路复用的 IO 模型->单线程监控多个连接。
  2. 为什么 Redis 记录 AOF 日志是先执行指令然后再记录 AOF 日志?而不是像其他存储引擎一样反过来?
    主要是因为 Redis 本身是缓存而不是 db,侧重点不同,db 先写日志是为了失败回滚,而 Redis 持久化是一个附加功能,只能保证数据不会完全丢失。

Redis 淘汰时,如果读取,会不会数据不完整

redis 的淘汰分两种:

  • 一种是过期,这种不会导致这种问题,因为查询时会判断下过期时间,过期了就不返回;
  • 另一种是超过内存容量淘汰,比如 LRU,这种也不会导致这种问题,因为执行每个命令时都会检查下缓存是否超出了阈值,可见代码server.c/processCommand
    Redis-执行命令前检查缓存是否溢出

Redis 的持久化原理是什么

Redis 有两种持久化方式:RDB 和 AOF
RDB 是快照,AOF 记录了写操作,效率起见,一般 RDB 作为 checkpoint,checkpoint 后的数据通过 AOF 恢复。

RDB 和 AOF 之间的区别

RDB 二进制文件可以直接加载到内存速度较快;AOF 要重放命令,所以速度比较慢。
RDB 需要全量备份,AOF 可以增量备份,二者的应用场景不同。

Redis的复制原理是什么?

master 会启动一个后台进程进行持久化(RDB or AOF),第一次连接时会将 RDB 文件发给 slave,slave 先保存到磁盘,之后加载到内存中;如果不是第一次连接,slave 连接 master 后通过 PSYNC 命令告知自己同步的起始位置,master 将增量部分 AOF 文件发送给 slave。

Redis 持久化期间,主进程还能对外提供服务吗?为什么

能。
因为 Redis 的复制是通过 fork 子进程实现的,父进程仍然可以接收请求。

持久化期间,Redis如何处理新写入的数据呢,这个数据也会直接进行持久化吗?

不会。
因为 Redis 复制是通过 fork 子进程实现的,由于 COW 机制,子进程只能看到老数据。

主从复制为什么会发生延迟?怎么解决

延迟无法避免,比如主从之间的网络抖动、slave 发生阻塞(如 IO)等情况。
解决办法有两种:

  • min-slave-to-write Nmin-slave-max-lag M,控制 Master,只有在至少有 N 个 slave 正在工作,并且滞后时间均小于 M 秒的情况下,Master 将不接受写入请求;
  • slave-serve-stale-data,控制从库对主库失去响应或复制进行过程中从库的表现,为 yes 则从库会继续响应客户端的请求,为 no 则除去 INFO 和 SLAVOF 命令之外的任何请求都会返回一个错误SYNC with master in progress
  • 编写外部监控程序,如果某个 slave 延迟较大,则通知 client 不要读这个 slave。

Redis 怎么实现高可用

从复制、Sentinel 到 Cluster

sentinel 中,使用客户端是怎么连接服务器的?(Redisson 配置)

见《Redis 客户端》。

哈希槽原理?和一致性哈希的区别?怎么落点

redis cluster 默认分配了 16384 个 slot,当我们 set 一个 key 时,会用CRC16算法来取模得到所属的 slot,然后将这个 key 分到哈希槽区间的节点上,具体算法就是:CRC16(key) % 16384。所以我们在测试的时候看到 set 和 get 的时候,直接跳转到了 7000 端口的节点。
哈希槽与一致性哈希的区别:哈希槽由客户端来重定向到目标 slot 所在节点,一致性哈希需要由服务器端重定向到目标节点,而且需要按顺时针方向一个一个节点递归地找。

Redis雪崩、击穿、穿透等现象是怎么出现的?怎么解决

  1. 缓存穿透
    缓存穿透指查询一个不存在的数据,出于容错考虑这个查询会穿透到 DB 层,如果这种带穿透的查询特别多可能会把 DB 打挂掉。
    解决办法:使用布隆过滤器,保存所有可能存在的数据到一个足够大的 bitmap 中,由于布隆过滤器的特性,一定不存在的数据在 bitmap 中一定找不到,从而可以很大程度上避免对底层存储系统的查询压力;还有一种更简单的方法,就是在查询返回结果为空时也把这个空结果缓存起来,但是它的过期时间会短一些,最长时间不超过 5 分钟。
  2. 缓存雪崩
    缓存雪崩指的是设置缓存时采用了相同的过期时间,导致缓存在同一时间同时失效,请求全部打到 DB,DB 瞬时压力过大导致雪崩。
    解决办法:缓存失效时间随机化,在原有失效时间基础上加上一个随机值,可以使得过期时间的重复率降低;加锁并令请求排队,使得请求串行化,避免所有请求都查询数据库,不过这样会导致性能的降低。
  3. 缓存击穿
    缓存击穿指的是某个 key 在过期时正好有大量请求访问该 key,这些请求会同时回表,可能会瞬间将后端 DB 打挂。
    解决办法:使用互斥锁,缓存失效时先加锁,避免并发回表;一些长时间不变的数据完全可以不设置过期时间,或者过期时间特别长。

主从复制的流程?传的是文件吗?

流程见《主从同步》。
如果是全量同步,同步时会先同步 RDB 文件,再同步增量写命令;
如果是部分重同步,则只同步增量写命令。

中间传输失败怎么办?中间传输不一致怎么办

如果上次传输中断,则下次同步时从中断位置开始执行部分重同步。

参考

  1. FAQ
  2. 使用vscode(gdb)调试redis

应用

  1. Redis strings vs Redis hashes to represent JSON: efficiency?

数据结构

  1. Redis 源码涉及 C 语言
  2. Redis 内部数据结构详解(1)——dict
  3. Redis 内部数据结构详解(2)——sds
  4. Redis 内部数据结构详解(3)——robj
  5. Redis 内部数据结构详解(4)——ziplist
  6. Redis 内部数据结构详解(5)——quicklist
  7. Redis 为什么用跳表而不用平衡树?
  8. Redis 中的集合类型是怎么实现的?

Persistence

  1. 剖析 Redis RDB 文件
  2. Redis 源码分析–RDB 实现源码阅读
  3. Redis 源码分析–AOF 文件全量重写源码阅读
  4. Redis 源码分析–AOF 文件增量追写源码阅读

客户端

  1. Redis 如何处理客户端连接
  2. 剖析 Redis 协议
  3. 剖析 Redis 协议(续)

主从复制

  1. 复制
  2. redis 系列–主从复制以及 redis 复制演进

Sentinel & Cluster

  1. 分区:怎样将数据分布到多个 redis 实例
  2. Redis 的 Sentinel 文档
  3. Redis 集群教程
  4. Redis 集群规范
  5. 一致性哈希和哈希槽对比
  6. Redis 集群扩容和缩容

架构迁移

  1. Redis 集群迁移案例
  2. redis-migrate-tool
  3. redis-port
  4. redis-migration
    redis-migration
    redis-migration:独创的 redis 在线数据迁移工具

Twemproxy

  1. Twemproxy

Codis

  1. Codis

Redisson

  1. Redisson

系统优化

  1. Redis 响应延迟问题排查

为什么 Redis 这么快

Redis 采用的是一种单线程工作模型,它能这么快主要归功于下面几个策略:

  1. 完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。数据存在内存中,类似于 HashMap,HashMap 的优势就是查找和操作的时间复杂度都是 O(1);
  2. 数据结构简单,对数据操作也简单,Redis 中的数据结构是专门进行设计的;
  3. 采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;
  4. 使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis 直接自己构建了 VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求;
  5. 使用多路 I/O 复用模型,非阻塞 IO;
    多路 I/O 复用模型是利用 select、poll、epoll 可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll 是只轮询那些真正发出了事件的流),并且只依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。
    Redis-Client 在操作的时候,会产生具有不同事件类型的 socket,在服务端,有一段 I/O 多路复用程序,将其置入队列之中,然后,文件事件分派器依次去队列中取,转发到不同的事件处理器中(对这个 I/O 多路复用机制,Redis 还提供了 select、epoll、evport、kqueue 等多路复用函数库)。
    这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络 IO 的时间消耗),且 Redis 在内存中操作数据的速度非常快,也就是说内存内的操作不会成为影响 Redis 性能的瓶颈,主要由以上几点造就了 Redis 具有很高的吞吐量。
    多路IO复用模型

一些常见的进程模型

  1. 单进程多线程模型:MySQL、Memcached、Oracle(Windows 版本);
  2. 多进程模型:Oracle(Linux 版本);
  3. Nginx 有两类进程,一类称为 Master 进程(相当于管理进程),另一类称为 Worker 进程(实际工作进程)。启动方式有两种:
    1. 单进程启动:此时系统中仅有一个进程,该进程既充当 Master 进程的角色,也充当 Worker 进程的角色。
    2. 多进程启动:此时系统有且仅有一个 Master 进程,至少有一个 Worker 进程工作。
    3. Master 进程主要进行一些全局性的初始化工作和管理 Worker 的工作;事件处理是在 Worker 中进行的。

为什么是 NIO

对于优化单个 server 节点的网络层,多使用 NIO 方式,server 端与 client 端在多次通讯的情况下使用 TCP 长连接维持会话,比如 Redis epoll 模型,RocketMq 的 netty 模型
对于高性能 Server 节点,在处理好网络请求同时,还要保证 server 端逻辑可以快速执行完成,这就涉及到合理的数据结构与线程模型。
在 Redis 中,采用的是 Reactor 模式实现文件事件处理器:
Redis-事件处理模型

  1. IO 多路复用
    根据平台不同选择不同的 IO 复用模型,比如 Linux 就是选择 epoll,select 是备选方案,不过正常情况下根本不会采用,因为 select 效率低,且有文件描述符监听上限。
  2. 封装不同 IO 模型,为事件处理器提供统一接口

Redis单线程多路复用IO模型实现 - 事件注册

Redis服务器的初始化过程中包括了对事件处理器的初始化。
1、服务器启动期间初始化事件处理器
服务器初始化代码:redis.c/initServer
初始化事件处理器代码:ae.c/aeCreateEventLoop
2、根据系统的不同,选择不同的底层IO事件处理实现
比如linux的话,会选择epoll作为实现:epoll.c/aeApiCreate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
* 创建一个新的 epoll 实例,并将它赋值给 eventLoop
*/
static int aeApiCreate(aeEventLoop *eventLoop) {

aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;

// 初始化事件槽空间
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}

// 创建 epoll 实例
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}

// 赋值给 eventLoop
eventLoop->apidata = state;
return 0;
}

当服务器需要监听某些事件时,会注册对这些事件的监听器,下面是注册监听器的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/*
* 根据 mask 参数的值,监听 fd 文件的状态,
* 当 fd 可用时,执行 proc 函数
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}

if (fd >= eventLoop->setsize) return AE_ERR;

// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];

// 监听指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;

// 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;

// 私有数据
fe->clientData = clientData;

// 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;

return AE_OK;
}

比如Redis服务器要接受客户端的请求,就要注册一个监听连接事件,回调函数中会为客户端连接创建一个socket,并注册可读文件事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void initServer() {

...

/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
// 为 TCP 连接关联连接应答(accept)处理器
// 用于接受并应答客户端的 connect() 调用
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}

...
}

Redis单线程多路复用IO模型实现 - 事件循环(处理)

Redis中定义了两种事件:时间事件TimeEvents、文件事件FileEvents:

  • TimeEvents:一般都是一些定时任务,实际上现在时间事件只应用于服务器启动时注册的serverCron定时任务的执行;
  • FileEvents:socket文件的IO事件,比如上面的监听连接的事件就是文件事件。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    /*
    * 事件处理器的主循环
    */
    void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

    // 如果有需要在事件处理前执行的函数,那么运行它
    if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);

    // 开始处理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
    }

    // 1、看是否有事件到达了执行时间
    // 2、如果有,则执行这些事件
    /* Process every pending time event, then every pending file event
    * (that may be registered by time event callbacks just processed).
    *
    * 处理所有已到达的时间事件,以及所有已就绪的文件事件。
    *
    * Without special flags the function sleeps until some file event
    * fires, or when the next time event occurs (if any).
    *
    * 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪,
    * 或者下个时间事件到达(如果有的话)。
    *
    * If flags is 0, the function does nothing and returns.
    * 如果 flags 为 0 ,那么函数不作动作,直接返回。
    *
    * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
    * 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。
    *
    * if flags has AE_FILE_EVENTS set, file events are processed.
    * 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。
    *
    * if flags has AE_TIME_EVENTS set, time events are processed.
    * 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。
    *
    * if flags has AE_DONT_WAIT set the function returns ASAP until all
    * the events that's possible to process without to wait are processed.
    * 如果 flags 包含 AE_DONT_WAIT ,
    * 那么函数在处理完所有不许阻塞的事件之后,即刻返回。
    *
    * The function returns the number of events processed.
    * 函数的返回值为已处理事件的数量
    */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
    * file events to process as long as we want to process time
    * events, in order to sleep until the next time event is ready
    * to fire.
    */
    if (eventLoop->maxfd != -1 ||
    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
    int j;
    aeTimeEvent *shortest = NULL;
    struct timeval tv, *tvp;

    // 获取最近的时间事件
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
    shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
    // 如果时间事件存在的话
    // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
    long now_sec, now_ms;

    /* Calculate the time missing for the nearest
    * timer to fire. */
    // 计算距今最近的时间事件还要多久才能达到
    // 并将该时间距保存在 tv 结构中
    aeGetTime(&now_sec, &now_ms);
    tvp = &tv;
    tvp->tv_sec = shortest->when_sec - now_sec;
    if (shortest->when_ms < now_ms) {
    tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
    tvp->tv_sec --;
    } else {
    tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
    }

    // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
    if (tvp->tv_sec < 0) tvp->tv_sec = 0;
    if (tvp->tv_usec < 0) tvp->tv_usec = 0;
    } else {

    // 执行到这一步,说明没有时间事件
    // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

    /* If we have to check for events but need to return
    * ASAP because of AE_DONT_WAIT we need to set the timeout
    * to zero */
    if (flags & AE_DONT_WAIT) {
    // 设置文件事件不阻塞
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
    } else {
    /* Otherwise we can block */
    // 文件事件可以阻塞直到有事件到达为止
    tvp = NULL; /* wait forever */
    }
    }

    // 处理文件事件,阻塞时间由 tvp 决定
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
    // 从已就绪数组中获取事件
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;

    /* note the fe->mask & mask & ... code: maybe an already processed
    * event removed an element that fired and we still didn't
    * processed, so we check if the event is still valid. */
    // 读事件
    if (fe->mask & mask & AE_READABLE) {
    // rfired 确保读/写事件只能执行其中一个
    rfired = 1;
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    // 写事件
    if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }

    processed++;
    }
    }

    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
    }

操作

  1. Master
    TODO

  2. Slave

  3. Sentinel

  4. 获取集群信息

    1
    redis-cli -p 26379 info Sentinel
  5. 获取 master 节点地址

    1
    redis-cli -p 26379 SENTINEL get-master-addr-by-name mymaster

客户端如何连接Sentinel集群

Sentinel
在 Sentinel 模式下,客户端不是直接连接服务器的,而是先访问 Sentinel 拿到集群信息再尝试连接 Master。当 Master 发生故障时,客户端会重新向 Sentinel 要地址,并自动完成节点切换。

  • Master 和 Slave 的配置和之前并无区别;
  • Sentinel 相当于对 Master 的代理,Sentinel 可以通过发布订阅功能获取到 Slave 和其他 Sentinel 的信息。

    其实 Sentinel 的内核与其他形式的 Redis 服务器基本一致,只是支持的命令不同、负责的任务也不同。

同理,客户端也可以通过pubsub功能来订阅集群中的其他信息,关键事件如下:
RedisSentinel事件

Sentinel 执行原理

Sentinel的主要任务
在Sentinel的主事件循环中可以看到它每100毫秒执行的定时任务:

1
2
3
4
5
6
7
8
9
10
11
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...

/* Run the Sentinel timer if we are in sentinel mode. */
// 如果服务器运行在 sentinel 模式下,那么执行 SENTINEL 的主函数
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}

...
}

实例状态探测

  • 每个 Sentinel 以每秒钟一次的频率向它所知的主服务器、从服务器以及其他 Sentinel 实例发送一个 PING 命令
    如果一个实例(instance)距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值, 那么这个实例会被 Sentinel 标记为主观下线。 一个有效回复可以是: +PONG 、 -LOADING 或者 -MASTERDOWN。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    void sentinelHandleDictOfRedisInstances(dict *instances) {
    ...

    // 遍历多个实例,这些实例可以是多个主服务器、多个从服务器或者多个 sentinel
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {

    // 取出实例对应的实例结构
    sentinelRedisInstance *ri = dictGetVal(de);

    // 执行调度操作
    sentinelHandleRedisInstance(ri);

    // 如果被遍历的是主服务器,那么递归地遍历该主服务器的所有从服务器
    // 以及所有 sentinel
    if (ri->flags & SRI_MASTER) {

    // 所有从服务器
    sentinelHandleDictOfRedisInstances(ri->slaves);

    // 所有 sentinel
    sentinelHandleDictOfRedisInstances(ri->sentinels);

    // 对已下线主服务器(ri)的故障迁移已经完成
    // ri 的所有从服务器都已经同步到新主服务器
    if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
    // 已选出新的主服务器
    switch_to_promoted = ri;
    }
    }
    }

    ...
    }

    /* Perform scheduled operations for the specified Redis instance. */
    // 对给定的实例执行定期操作
    void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {

    /* ========== MONITORING HALF ============ */
    /* ========== 监控操作 =========*/

    /* Every kind of instance */
    /* 对所有类型实例进行处理 */

    // 如果有需要的话,创建连向实例的网络连接
    sentinelReconnectInstance(ri);

    // 根据情况,向实例发送 PING、 INFO 或者 PUBLISH 命令
    sentinelSendPeriodicCommands(ri);

    ...
    }

    // 根据时间和实例类型等情况,向实例发送命令,比如 INFO 、PING 和 PUBLISH
    // 虽然函数的名字包含 Ping ,但命令并不只发送 PING 命令
    /* Send periodic PING, INFO, and PUBLISH to the Hello channel to
    * the specified master or slave instance. */
    void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
    * in the case the instance is not properly connected. */
    // 函数不能在网络连接未创建时执行
    if (ri->flags & SRI_DISCONNECTED) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
    * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
    * want to use a lot of memory just because a link is not working
    * properly (note that anyway there is a redundant protection about this,
    * that is, the link will be disconnected and reconnected if a long
    * timeout condition is detected. */
    // 为了避免 sentinel 在实例处于不正常状态时,发送过多命令
    // sentinel 只在待发送命令的数量未超过 SENTINEL_MAX_PENDING_COMMANDS 常量时
    // 才进行命令发送
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
    * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
    * period. In this state we want to closely monitor slaves in case they
    * are turned into masters by another Sentinel, or by the sysadmin. */
    // 对于从服务器来说, sentinel 默认每 SENTINEL_INFO_PERIOD 秒向它发送一次 INFO 命令
    // 但是,当从服务器的主服务器处于 SDOWN 状态,或者正在执行故障转移时
    // 为了更快速地捕捉从服务器的变动, sentinel 会将发送 INFO 命令的频率该为每秒一次
    if ((ri->flags & SRI_SLAVE) &&
    (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
    info_period = 1000;
    } else {
    info_period = SENTINEL_INFO_PERIOD;
    }

    /* We ping instances every time the last received pong is older than
    * the configured 'down-after-milliseconds' time, but every second
    * anyway if 'down-after-milliseconds' is greater than 1 second. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    // 实例不是 Sentinel (主服务器或者从服务器)
    // 并且以下条件的其中一个成立:
    // 1)SENTINEL 未收到过这个服务器的 INFO 命令回复
    // 2)距离上一次该实例回复 INFO 命令已经超过 info_period 间隔
    // 那么向实例发送 INFO 命令
    if ((ri->flags & SRI_SENTINEL) == 0 &&
    (ri->info_refresh == 0 ||
    (now - ri->info_refresh) > info_period))
    {
    /* Send INFO to masters and slaves, not sentinels. */
    retval = redisAsyncCommand(ri->cc,
    sentinelInfoReplyCallback, NULL, "INFO");
    if (retval == REDIS_OK) ri->pending_commands++;
    } else if ((now - ri->last_pong_time) > ping_period) {
    /* Send PING to all the three kinds of instances. */
    sentinelSendPing(ri);
    } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
    /* PUBLISH hello messages to all the three kinds of instances. */
    sentinelSendHello(ri);
    }
    }

从主观下线到客观下线

  • 如果一个主服务器被标记为主观下线, 那么正在监视这个主服务器的所有 Sentinel 要以每秒一次的频率确认主服务器的确进入了主观下线状态。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    /* Perform scheduled operations for the specified Redis instance. */
    // 对给定的实例执行定期操作
    void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {

    ...

    /* ============== ACTING HALF ============= */
    /* ============== 故障检测 ============= */

    /* We don't proceed with the acting half if we are in TILT mode.
    * TILT happens when we find something odd with the time, like a
    * sudden change in the clock. */
    // 如果 Sentinel 处于 TILT 模式,那么不执行故障检测。
    if (sentinel.tilt) {

    // 如果 TILI 模式未解除,那么不执行动作
    if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;

    // 时间已过,退出 TILT 模式
    sentinel.tilt = 0;
    sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    // 检查给定实例是否进入 SDOWN 状态
    sentinelCheckSubjectivelyDown(ri);

    ...
    }

    /* Is this instance down from our point of view? */
    // 检查实例是否已下线(从本 Sentinel 的角度来看)
    void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {

    ...

    /* Update the SDOWN flag. We believe the instance is SDOWN if:
    *
    * 更新 SDOWN 标识。如果以下条件被满足,那么 Sentinel 认为实例已下线:
    *
    * 1) It is not replying.
    * 它没有回应命令
    * 2) We believe it is a master, it reports to be a slave for enough time
    * to meet the down_after_period, plus enough time to get two times
    * INFO report from the instance.
    * Sentinel 认为实例是主服务器,这个服务器向 Sentinel 报告它将成为从服务器,
    * 但在超过给定时限之后,服务器仍然没有完成这一角色转换。
    */
    if (elapsed > ri->down_after_period ||
    (ri->flags & SRI_MASTER &&
    ri->role_reported == SRI_SLAVE &&
    mstime() - ri->role_reported_time >
    (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
    /* Is subjectively down */
    if ((ri->flags & SRI_S_DOWN) == 0) {
    // 发送事件
    sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
    // 记录进入 SDOWN 状态的时间
    ri->s_down_since_time = mstime();
    // 打开 SDOWN 标志
    ri->flags |= SRI_S_DOWN;
    }
    } else {
    // 移除(可能有的) SDOWN 状态
    /* Is subjectively up */
    if (ri->flags & SRI_S_DOWN) {
    // 发送事件
    sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
    // 移除相关标志
    ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
    }
    }
    }
    如果有足够数量的 Sentinel (至少要达到配置文件指定的数量)在指定的时间范围内同意这一判断, 那么这个主服务器被标记为客观下线
    这个数量是可以配置的,即quorum的数量。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /* Perform scheduled operations for the specified Redis instance. */
    // 对给定的实例执行定期操作
    void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    ...

    /* ============== ACTING HALF ============= */
    /* ============== 故障检测 ============= */

    ...这里省略SDOWN检测代码

    /* Only masters */
    /* 对主服务器进行处理 */
    if (ri->flags & SRI_MASTER) {

    // 判断 master 是否进入 ODOWN 状态
    sentinelCheckObjectivelyDown(ri);

    // 如果主服务器进入了 ODOWN 状态,那么开始一次故障转移操作
    if (sentinelStartFailoverIfNeeded(ri))
    // 强制向其他 Sentinel 发送 SENTINEL is-master-down-by-addr 命令
    // 刷新其他 Sentinel 关于主服务器的状态
    sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);

    // 执行故障转移
    sentinelFailoverStateMachine(ri);

    // 如果有需要的话,向其他 Sentinel 发送 SENTINEL is-master-down-by-addr 命令
    // 刷新其他 Sentinel 关于主服务器的状态
    // 这一句是对那些没有进入 if(sentinelStartFailoverIfNeeded(ri)) { /* ... */ }
    // 语句的主服务器使用的
    sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
    }
  • 在一般情况下, 每个 Sentinel 会以每 10 秒一次的频率向它已知的所有主服务器和从服务器发送 INFO 命令。 当一个主服务器被 Sentinel 标记为客观下线时, Sentinel 向下线主服务器的所有从服务器发送 INFO 命令的频率会从 10 秒一次改为每秒一次。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {

    ...

    /* If this is a slave of a master in O_DOWN condition we start sending
    * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
    * period. In this state we want to closely monitor slaves in case they
    * are turned into masters by another Sentinel, or by the sysadmin. */
    // 对于从服务器来说, sentinel 默认每 SENTINEL_INFO_PERIOD 秒向它发送一次 INFO 命令
    // 但是,当从服务器的主服务器处于 SDOWN 状态,或者正在执行故障转移时
    // 为了更快速地捕捉从服务器的变动, sentinel 会将发送 INFO 命令的频率该为每秒一次
    if ((ri->flags & SRI_SLAVE) &&
    (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
    info_period = 1000;
    } else {
    info_period = SENTINEL_INFO_PERIOD;
    }

    ...

    // 实例不是 Sentinel (主服务器或者从服务器)
    // 并且以下条件的其中一个成立:
    // 1)SENTINEL 未收到过这个服务器的 INFO 命令回复
    // 2)距离上一次该实例回复 INFO 命令已经超过 info_period 间隔
    // 那么向实例发送 INFO 命令
    if ((ri->flags & SRI_SENTINEL) == 0 &&
    (ri->info_refresh == 0 ||
    (now - ri->info_refresh) > info_period))
    {
    /* Send INFO to masters and slaves, not sentinels. */
    retval = redisAsyncCommand(ri->cc,
    sentinelInfoReplyCallback, NULL, "INFO");
    if (retval == REDIS_OK) ri->pending_commands++;
    } else if
    ...
    }
    }
    注意上边发请求时使用的回调函数sentinelInfoReplyCallback
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 处理 INFO 命令的回复
    void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;

    if (ri) ri->pending_commands--;
    if (!reply || !ri) return;
    r = reply;

    if (r->type == REDIS_REPLY_STRING) {
    // 解析info命令的响应数据
    sentinelRefreshInstanceInfo(ri,r->str);
    }
    }
  • 当没有足够数量的 Sentinel 同意主服务器已经下线,主服务器的客观下线状态就会被移除。
    当主服务器重新向 Sentinel 的 PING 命令返回有效回复时,主服务器的主观下线状态就会被移除。

故障转移 - 选举 Sentinel Leader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {

...

/* ============== ACTING HALF ============= */
/* ============== 故障检测 ============= */

...

/* Only masters */
/* 对主服务器进行处理 */
if (ri->flags & SRI_MASTER) {

// 判断 master 是否进入 ODOWN 状态
sentinelCheckObjectivelyDown(ri);

// 如果主服务器进入了 ODOWN 状态,那么开始一次故障转移操作
if (sentinelStartFailoverIfNeeded(ri))
// 强制向其他 Sentinel 发送 SENTINEL is-master-down-by-addr 命令
// 刷新其他 Sentinel 关于主服务器的状态
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);

// 执行故障转移
sentinelFailoverStateMachine(ri);

// 如果有需要的话,向其他 Sentinel 发送 SENTINEL is-master-down-by-addr 命令
// 刷新其他 Sentinel 关于主服务器的状态
// 这一句是对那些没有进入 if(sentinelStartFailoverIfNeeded(ri)) { /* ... */ }
// 语句的主服务器使用的
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}

状态感知(info)

Sentinel服务器只需配置Master的地址,其他Slave的信息是通过定时(10秒)向Master发送info命令来获取的,info命令返回的信息中,包含了主从拓扑关系,其中包括每个slave的地址和端口号。有了这些信息后,哨兵就会记住这些节点的拓扑信息,在后续发生故障时,选择合适的slave节点进行故障恢复。
哨兵除了向master发送info之外,还会向每个master节点特殊的pubsub中发送master当前的状态信息和哨兵自身的信息,其他哨兵节点通过订阅这个pubsub,就可以拿到每个哨兵发来的信息。这么做的目的主要有2个:

  • 哨兵节点可以发现其他哨兵的加入,进而方便多个哨兵节点通信,为后续共同协商提供基础
  • 与其他哨兵节点交换master的状态信息,为后续判断master是否故障提供依据

心跳检测(ping)

故障发生时,需要立即启动故障恢复机制,那么Sentinel怎么及时地知道哪些节点发生故障了呢?这主要是通过向所有其他节点发送PING命令来实现的。
每个哨兵节点每隔1秒向master、slave、其他哨兵节点发送ping命令,如果对方能在指定时间内响应,说明节点健康存活。如果未在规定时间内(可配置)响应,那么该哨兵节点认为此节点主观下线

至于Sentinel怎么知道其他节点的地址,其实就是通过前面提到的info命令来感知的。

主观下线和客观下线

  • 主观下线(Subjectively Down, 简称 SDOWN)
    主观下线指的是单个 Sentinel 实例对服务器做出的下线判断。
    如果一个服务器没有在 master-down-after-milliseconds 选项所指定的时间内, 对向它发送 PING 命令的 Sentinel 返回一个有效回复(有效回复只有+PONG、-LOADING 错误或 -MASTERDOWN 错误), 那么 Sentinel 就会将这个服务器标记为主观下线。

    注意是在master-down-after-milliseconds时间内一直返回无效回复。

  • 客观下线(Objectively Down, 简称 ODOWN)
    客观下线指的是多个 Sentinel 实例在对同一个 Master 做出 SDOWN 判断, 并且通过 SENTINEL is-master-down-by-addr 命令互相交流之后,得出的服务器下线判断。 (一个 Sentinel 可以通过向另一个 Sentinel 发送 SENTINEL is-master-down-by-addr 命令来询问对方是否认为给定的服务器已下线。)
    从主观下线切换到客观下线并不是通过较严格的投票算法,而是采用了流言协议(gossip protocol):只要 Sentinel 在给定时间内从其他 Sentinel 接收到足够数量的 Master 下线通知,那么 Sentinel 就会执行状态的切换;如果之后其他 Sentinel 不再报告 Master 已下线,则客观下线状态就会被移除。
    只要一个 Sentinel 发现某个 Master 进入客观下线状态,之后就会进入故障迁移阶段,选举出一个 Sentinel 对失效的 Master 执行自动故障迁移操作。

    客观下线只适用于 Master,对 Slave 或 Sentinel 则不会达到客观下线状态。

故障迁移(Master 挂掉,Sentinel选举新Master)

单纯的主从架构并不能挽救 Master 挂掉的情况,因此引入了 Sentinel 集群。Sentinel 会不断地检查集群主服务器和从服务器是否运作正常,并在超过 n 个 Sentinel 同意后判断主节点失效(配置sentinel monitor mymaster 127.0.0.1 6379 2表示这个n=2),不过要注意,无论设置多少个 Sentinel 同意才能判断一个服务器失效, 一个 Sentinel 都需要获得系统中多数 Sentinel 的支持, 才能发起一次自动故障迁移。

  • 当一个主服务器不能正常工作时, Sentinel 会开始一次自动故障迁移操作,它会将失效主服务器的其中一个从服务器升级为新的主服务器,并让失效主服务器的其他从服务器改为复制新的主服务器;
  • 当客户端试图连接失效的主服务器时,集群也会向客户端返回新主服务器的地址,使得集群可以使用新主服务器代替失效服务器。

故障转移主要分为Sentinel选举和故障转移(Master替换)两个步骤,Sentinel选主流程如下:

  • Sentinel发现主服务器已经进入客观下线状态。
  • 利用Raft leader election算法选举 Sentinel 中的 Leader,对我们的当前 epoch 进行自增, 并尝试在这个epoch中当选,之后,所有 Sentinel 都以更高的 epoch 为准,并主动用更新的 epoch 代替自己的配置。
  • 如果当选失败, 那么在设定的故障迁移超时时间的两倍之后,重新尝试当选。 如果当选成功, 那么执行Slave的选主。

Slave选举

Slave选举的规则如下:

  • 在失效主服务器属下的从服务器当中, 那些被标记为主观下线、已断线、或者最后一次回复 PING 命令的时间大于五秒钟的从服务器都会被淘汰。
  • 在失效主服务器属下的从服务器当中, 那些与失效主服务器连接断开的时长超过 down-after 选项指定的时长十倍的从服务器都会被淘汰。
  • 在经历了以上两轮淘汰之后剩下来的从服务器中, 我们选出复制偏移量(replication offset)最大的那个从服务器作为新的主服务器; 如果复制偏移量不可用, 或者从服务器的复制偏移量相同, 那么带有最小运行 ID 的那个从服务器成为新的主服务器。

也就是说,多个Slave的优先级按照:slave-priority配置 > 数据完整性 > runid较小者进行选择。

之后所有Sentinel要进行投票选出一个Leader:
RedisSentinel投票

选出Leader后,Leader需要从现有的Slave中选出

故障转移

提升新的Master的流程如下:

  • 向被选中的从服务器发送 SLAVEOF NO ONE 命令,让它转变为主服务器。
  • 通过发布与订阅功能, 将更新后的配置传播给所有其他 Sentinel , 其他 Sentinel 对它们自己的配置进行更新。
  • 向已下线主服务器的从服务器发送 SLAVEOF 命令, 让它们去复制新的主服务器。
  • 当所有从服务器都已经开始复制新的主服务器时, 领头 Sentinel 终止这次故障迁移操作。

客户端感知新master流程如下:
哨兵在故障切换完成之后,会向自身节点的指定pubsub中写入一条信息,客户端可以订阅这个pubsub来感知master的变化通知。我们的客户端也可以通过在哨兵节点主动查询当前最新的master,来拿到最新的master地址。
另外,哨兵还提供了“钩子”机制,我们也可以在哨兵配置文件中配置一些脚本逻辑,在故障切换完成时,触发“钩子”逻辑,通知客户端发生了切换,让客户端重新在哨兵上获取最新的master地址。
一般来说,推荐采用第一种方式进行处理,很多客户端SDK中已经集成好了从哨兵节点获取最新master的方法,我们直接使用即可。

Sentinel 选举的安全性

配置安全性:

  • 每当一个 Redis 实例被重新配置(reconfigured) —— 无论是被设置成主服务器、从服务器、又或者被设置成其他主服务器的从服务器 —— Sentinel 都会向被重新配置的实例发送一个 CONFIG REWRITE 命令, 从而确保这些配置会持久化在硬盘里。完成重新配置之后,从服务器会去复制正确的主服务器。
  • Sentinel 的状态会被持久化到 Sentinel 配置文件里,当 Sentinel 接收到新配置或 Leader Sentinel 为 Master 创建一个新配置时,这些配置都会与epoch一起被保存到磁盘;

故障自动迁移的一致性:

  • Raft 算法保证在一个 epoch 里只有一个 Leader Sentinel 产生,减少了脑裂的风险;
  • Sentinel 集群总是以更高的 epoch 为准,因为发生网络分区(network partition)时可能会有 Sentinel 包含老的配置,而当这个 Sentinel 服务器接收到其他 Sentinel 的版本更新配置时就会进行更新。
  • 发生网络分区并且某些 Sentinel 仍在采用老的配置时,如果有客户端连接到这些 Sentinel 上,最终可能就会将请求转发到非 Master 服务器上,造成数据不一致。因此,应该使用 min-slaves-to-write 选项, 让主服务器在连接的从实例少于给定数量时停止执行写操作, 与此同时, 应该在每个运行 Redis 主服务器或从服务器的机器上运行 Redis Sentinel 进程。

Sentinel 故障迁移的实时性

故障迁移虽然能提供主从切换来保证挂掉的Master能被其他Slave顶替上来,但是这个顶替过程大概需要多长时间呢?具体又是哪些步骤会比较耗时?

  1. 判断Master下线
    Sentinel会PING Master,如果距离上次PING成功的时间超过了master-down-after-milliseconds时间,则表示主观下线了。
    将Master标记为SDOWN后,这个Sentinel会通过发事件消息来通知其他Sentinel。

    Cluster中是通过gossip协议来通知其他节点的。

  2. 重新选主

  3. Slave提升

这个实时性的讨论并不是纯粹的极客行为,因为切换要多长时间是评估我们服务可用性的重要指标,并且提供后续优化的指导方向。

TILT 模式

TILT 模式是一种特殊的保护模式,Sentinel 每隔 100ms 会向实例发一次PING命令,并将上一次 PING 成功的时间和当前时间比对,从而知道与该实例有多长时间没有进行任何成功通讯:

  • 如果两次调用时间之间的差距为负值, 或者非常大(超过 2 秒钟), 那么 Sentinel 进入 TILT 模式。
  • 如果 Sentinel 已经进入 TILT 模式, 那么 Sentinel 延迟退出 TILT 模式的时间。

    Sentinel严重依赖计算机的时间功能,一旦计算机的时间功能出现故障, 或者计算机非常忙碌, 又或者进程因为某些原因而被阻塞时, Sentinel 可能也会跟着出现故障。

进入 TILT 模式后,Sentinel 仍然会继续监视所有目标,但是:

  • 它不再执行任何操作,比如故障转移。
  • 当有实例向这个 Sentinel 发送 SENTINEL is-master-down-by-addr 命令时, Sentinel 返回负值: 因为这个 Sentinel 所进行的下线判断已经不再准确。

TILT 相当于降级,如果 Sentinel 可以在 TILT 模式下正常维持 30s,那么 Sentinel 会退出 TILT 模式。

BUSY 状态

当 Lus 脚本执行时间超过阈值,Redis 会返回BUSY错误,当出现这种情况时, Sentinel 在尝试执行故障转移操作之前, 会先向服务器发送一个 SCRIPT KILL 命令, 如果服务器正在执行的是一个只读脚本的话, 那么这个脚本就会被杀死, 服务器就会回到正常状态。

脑裂

虽然Sentinel利用Raft选举不会发生脑裂,但是在一些极端的情况下还是有可能会发生脑裂的,比如:

  1. 原Master不能提供服务了,但是它本身并没有挂掉;
  2. Sentinel发现连不上Master,于是判定客观下线,并发起主从切换;
  3. 原Master和新Master同时给Client提供服务,发生脑裂。

这种脑裂并不会影响可用性,但是却破坏了数据的一致性,甚至会导致数据丢失:在Sentinel重连上原Master后,会将其归入到新Master的Slave,这时脑裂期间的数据就会被从新Master上复制过来的数据覆盖掉了,导致数据的丢失。

脑裂的解决办法主要是以下两个配置参数:

  • min-slaves-to-write:这个配置项设置了主库能进行数据同步的最少从库数量;
  • min-slaves-max-lag:这个配置项设置了主从库间进行数据复制时,从库给主库发送 ACK 消息的最大延迟(以秒为单位)。

QA

5个哨兵的集群,quorum设置为2,在运行过程中,有3个实例都发生了故障,这时主库也发生了故障,还能正确判断主库的客观下线吗?还能执行主从的自动切换吗?

判断客户端下线是可以的,因为判断ODOWN的条件是有不少于quorum数量的Sentinel同意即可。
不可执行主从切换,因为一个哨兵要执行主从切换,得获得半数以上哨兵的投票同意,也就是3个哨兵。

哨兵实例是不是越多越好?

哨兵实例越多,误判率越低,但是判断主库下线和选举Leader时实例要拿到的赞成票也越多,主从切换花费的时间也相对会更多。
如果客户端对Redis的响应时间有要求,则很有可能会报警。

调大down-after-milliseconds对减少误判是不是有好处?

这个值的作用是:判断距离上次PING成功的时间超过了这个值,就标记实例主观下线。
调大的话Sentinel需要更长的时间才能判断集群出问题了,也即影响到Redis的可用性。

参考

  1. Sentinel

Redis Cluster介绍

Cluster 优势

  1. 线性的可扩展性:扩容即迁移槽,已有很多迁移案例;
    如果要保存更多的数据,可以直接增加Master来支持,比如每台Master存32G,要完美存下1T数据的话,可以设置32台的Master,当然实际情况下这样非常浪费,一般会少设置一些,只用几台Master来存储最热的数据。
  2. 没有合并操作:因为 Redis 中的 List 和 Set 中保存的 Value 通常是比较大的,可能会达数以百万计的元素,而它们可能被存储到了不同的 Redis 实例上,传输和合并这样的值将很容易称为一个主要的性能瓶颈;
  3. 写入安全(Write Safety):只有在非常少见的 Master 宕机的情况下,写入才会失败,并且这个失败的时间窗口不大(由一个 Slave 顶替上来);
  4. 可用性(Availability):就算有部分 Master 不可用了,它们的 Slave 仍然可以通过选举提升为 Master。

Cluster 缺点

  1. Redis 集群并不支持处理多个 keys 的命令,因为这需要在不同的节点间移动数据,从而达不到像 Redis 那样的性能,在高负载的情况下可能会导致不可预料的错误。
  2. Redis 集群不像单机版本的 Redis 那样支持多个数据库,集群只有数据库 0,而且也不支持 SELECT 命令。

Cluster的去中心化架构

Cluster
redis cluster在设计的时候,就考虑到了去中心化,去中间件,也就是说,集群中的每个节点都是平等的关系,都是对等的,每个节点都保存各自的数据和整个集群的状态。所有的 redis 节点彼此互联(PING-PONG 机制),内部使用二进制协议优化传输速度和带宽,而且这些连接保持活跃,这样就保证了我们只需要连接集群中的任意一个节点,就可以获取到其他节点的数据。客户端与 redis 节点直连,不需要中间 proxy 层,客户端不需要连接集群所有节点,连接集群中任何一个可用节点即可。

一致性

Redis 不能保证强一致性,因为:

  1. 异步复制:写操作会被异步复制到 slave 节点,但可能由于出现网络分区、脑裂而导致数据丢失。
    网络分区
    如上图所示,客户端 Z1 向 Master-B 写入数据后,集群出现了网络分区,且分区持续的时间足够长导致此时 B1 被选举为新的 Master,则在此期间 Z1 向 B 写入的数据就都丢失了。

    网络分区出现期间,客户端 Z1 可以向主节点 B 发送写命令的最大时间是有限制的, 这一时间限制称为节点超时时间(node timeout), 是 Redis 集群的一个重要的配置选项。

Cluster VS Codis

Codis 集群中包含了 4 类关键组件。

  • codis server:这是进行了二次开发的 Redis 实例,其中增加了额外的数据结构,支持数据迁移操作,主要负责处理具体的数据读写请求。
  • codis proxy:接收客户端请求,并把请求转发给 codis server。
  • Zookeeper 集群:保存集群元数据,例如数据位置信息和 codis proxy 信息。
  • codis dashboard 和 codis fe:共同组成了集群管理工具。其中,codis dashboard 负责执行集群管理工作,包括增删 codis server、codis proxy 和进行数据迁移。而 codis fe 负责提供 dashboard 的 Web 操作界面,便于我们直接在 Web 界面上进行集群管理。

Codis如何处理一次请求:

  1. 客户端连接Codis Proxy,将请求发给Proxy;
  2. codis proxy 接收到请求,就会查询请求数据和 codis server 的映射关系,并把请求转发给相应的 codis server 进行处理
  3. 当 codis server 处理完请求后,会把结果返回给 codis proxy,proxy 再把数据返回给客户端。

以4个方面来讨论

  • 数据分布
    和Cluster类似,Codis将数据保存到slot,集群一共有1024个slot,需要手动分配给Codis Server,或者由dashboard自动分配。
    当客户端要读写数据时,会使用CRC32算法计算key的哈希值,并对1024取模,就可以知道对应的是哪个slot了。
    这个路由规则需要先配置到dashboard,dashboard会把路由表发送给codis proxy,并同时保存到ZooKeeper。
    而Cluster的数据路由表是由每个实例管理的,如果发生变化,这些实例会通过Gossip协议来互相传播,如果实例比较多,就会占用比较多的网络资源。
  • 集群扩容和数据迁移
    如果要增加Codis Server来负载slot,需要配置要迁移的slot,Codis Server会将该slot中的数据一个一个地发送给目标Server。
    增加Codis Proxy也是类似的流程。
  • 客户端兼容性
    Codis客户端直接和Codis Proxy连接,codis proxy 是和单实例客户端兼容的,而集群相关的管理工作又都是由Codis Proxy和Codis dashboard这些组件来完成的,不需要客户端参与。
  • 可靠性保证
    Codis Server保证可靠性:Codis Server本身是Redis实例,只是增加了集群相关的操作命令,可靠性是可以通过主从机制+哨兵来实现的。
    Codis Proxy的可靠性:Proxy上的信息都来自ZooKeeper,例如路由表,只要ZooKeeper集群中实例半数以上可以正常工作,那么ZooKeeper集群就是正常的。

比较:
RedisCluster比对Codis

  • 从稳定性和成熟度来看,Codis 应用得比较早,在业界已经有了成熟的生产部署。虽然 Codis 引入了 proxy 和 Zookeeper,增加了集群复杂度,但是,proxy 的无状态设计和 Zookeeper 自身的稳定性,也给 Codis 的稳定使用提供了保证。而 Redis Cluster 的推出时间晚于 Codis,相对来说,成熟度要弱于 Codis,如果你想选择一个成熟稳定的方案,Codis 更加合适些。
  • 从业务应用客户端兼容性来看,连接单实例的客户端可以直接连接 codis proxy,而原本连接单实例的客户端要想连接 Redis Cluster 的话,就需要开发新功能。所以,如果你的业务应用中大量使用了单实例的客户端,而现在想应用切片集群的话,建议你选择 Codis,这样可以避免修改业务应用中的客户端。
  • 从使用 Redis 新命令和新特性来看,Codis server 是基于开源的 Redis 3.2.8 开发的,所以,Codis 并不支持 Redis 后续的开源版本中的新增命令和数据类型。另外,Codis 并没有实现开源 Redis 版本的所有命令,比如 BITOP、BLPOP、BRPOP,以及和与事务相关的 MUTLI、EXEC 等命令。Codis 官网上列出了不被支持的命令列表,你在使用时记得去核查一下。所以,如果你想使用开源 Redis 版本的新特性,Redis Cluster 是一个合适的选择。
  • 从数据迁移性能维度来看,Codis 能支持异步迁移,异步迁移对集群处理正常请求的性能影响要比使用同步迁移的小。所以,如果你在应用集群时,数据迁移比较频繁的话,Codis 是个更合适的选择。

数据分布(分区)和查询路由

分区策略

分区将原来比较大的数据集分离存储到多个存储媒介上,分区后Redis可以管理更大的内存空间和计算能力,但同时多主机又会面临很多分布式集群的可用性、一致性等问题。
分区策略:

  1. 范围分区
    将不同范围的对象映射到不同的Redis实例,比如用户ID为0到10000的存储到R0,10001到20000的存储到R1,以此类推。
    缺点是需要建立一张映射表,谨小甚微地维护ID和Redis实例之间的映射关系,而且由于需要维护表,导致效率不如其他方案。
  2. 散列分区
    使用散列函数(如CRC32)将key转换为一个数字,取模得到一个0到3的数字(假设Redis服务器有4台),这个数字即对应服务器的序号。
  3. 一致性哈希
    一致性哈希的一种示例实现可以参考Dubbo中的实现:com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
    关键代码如下:
    ConsistentHashLoadBalance
  4. 哈希槽
    Redis Cluster采用的是哈希槽的方式。
    Redis 集群没有并使用传统的一致性哈希来分配数据,而是采用另外一种叫做哈希槽 (hash slot)的方式来分配的。redis cluster 默认分配了 16384 个 slot,当我们 set 一个 key 时,会用CRC16算法来取模得到所属的 slot,然后将这个 key 分到哈希槽区间的节点上,具体算法就是:CRC16(key) % 16384。所以我们在测试的时候看到 set 和 get 的时候,直接跳转到了 7000 端口的节点。
    客户端在接收到重定向错误(redirections errors) -MOVED 和 -ASK 的时候, 将命令重定向到其他节点。客户端不需要存储集群信息(槽所在位置),但是如何客户端可以缓存键值和节点之间的映射关系,就可以明显提高命令执行的效率了(Redisson 中就是这么做的)。
    在 Cluster 架构中,slave 节点不分配槽,只拥有读权限,但是在代码中 cluster 执行读写操作的都是 master 节点,并不是读就是从节点、写就是主节点。

源码中,Redis采用一个大小固定为CLUSTER_SLOTS的clusterNode数组slots来保存每个桶的负责节点,这是个字节数组,每个位表示当前节点是否负责这个槽:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// 节点状态
struct clusterNode {

// 创建节点的时间
mstime_t ctime; /* Node object creation time. */

// 节点的名字,由 40 个十六进制字符组成
// 例如 68eef66df23420a5862208ef5b1a7005b806f2ff
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */

// 节点标识
// 使用各种不同的标识值记录节点的角色(比如主节点或者从节点),
// 以及节点目前所处的状态(比如在线或者下线)。
int flags; /* REDIS_NODE_... */

// 节点当前的配置纪元,用于实现故障转移
uint64_t configEpoch; /* Last configEpoch observed for this node */

// 由这个节点负责处理的槽
// 一共有 REDIS_CLUSTER_SLOTS / 8 个字节长
// 每个字节的每个位记录了一个槽的保存状态
// 位的值为 1 表示槽正由本节点处理,值为 0 则表示槽并非本节点处理
// 比如 slots[0] 的第一个位保存了槽 0 的保存情况
// slots[0] 的第二个位保存了槽 1 的保存情况,以此类推
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */

// 该节点负责处理的槽数量
int numslots; /* Number of slots handled by this node */

// 如果本节点是主节点,那么用这个属性记录从节点的数量
int numslaves; /* Number of slave nodes, if this is a master */

// 指针数组,指向各个从节点
struct clusterNode **slaves; /* pointers to slave nodes */

// 如果这是一个从节点,那么指向主节点
struct clusterNode *slaveof; /* pointer to the master node */

// 最后一次发送 PING 命令的时间
mstime_t ping_sent; /* Unix time we sent latest ping */

// 最后一次接收 PONG 回复的时间戳
mstime_t pong_received; /* Unix time we received the pong */

// 最后一次被设置为 FAIL 状态的时间
mstime_t fail_time; /* Unix time when FAIL flag was set */

// 最后一次给某个从节点投票的时间
mstime_t voted_time; /* Last time we voted for a slave of this master */

// 最后一次从这个节点接收到复制偏移量的时间
mstime_t repl_offset_time; /* Unix time we received offset for this node */

// 这个节点的复制偏移量
long long repl_offset; /* Last known repl offset for this node. */

// 节点的 IP 地址
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */

// 节点的端口号
int port; /* Latest known port of this node */

// 保存连接节点所需的有关信息
clusterLink *link; /* TCP/IP link with this node */

// 一个链表,记录了所有其他节点对该节点的下线报告
list *fail_reports; /* List of nodes signaling this as failing */

};
typedef struct clusterNode clusterNode;

// 集群状态,每个节点都保存着一个这样的状态,记录了它们眼中的集群的样子。
// 另外,虽然这个结构主要用于记录集群的属性,但是为了节约资源,
// 有些与节点有关的属性,比如 slots_to_keys 、 failover_auth_count
// 也被放到了这个结构里面。
typedef struct clusterState {

// 指向当前节点的指针
clusterNode *myself; /* This node */

// 集群当前的配置纪元,用于实现故障转移
uint64_t currentEpoch;

// 集群当前的状态:是在线还是下线
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */

// 集群中至少处理着一个槽的节点的数量。
int size; /* Num of master nodes with at least one slot */

// 集群节点名单(包括 myself 节点)
// 字典的键为节点的名字,字典的值为 clusterNode 结构
dict *nodes; /* Hash table of name -> clusterNode structures */

// 节点黑名单,用于 CLUSTER FORGET 命令
// 防止被 FORGET 的命令重新被添加到集群里面
// (不过现在似乎没有在使用的样子,已废弃?还是尚未实现?)
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */

// 记录要从当前节点迁移到目标节点的槽,以及迁移的目标节点
// migrating_slots_to[i] = NULL 表示槽 i 未被迁移
// migrating_slots_to[i] = clusterNode_A 表示槽 i 要从本节点迁移至节点 A
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];

// 记录要从源节点迁移到本节点的槽,以及进行迁移的源节点
// importing_slots_from[i] = NULL 表示槽 i 未进行导入
// importing_slots_from[i] = clusterNode_A 表示正从节点 A 中导入槽 i
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];

// 负责处理各个槽的节点
// 例如 slots[i] = clusterNode_A 表示槽 i 由节点 A 处理
clusterNode *slots[REDIS_CLUSTER_SLOTS];

// 跳跃表,表中以槽作为分值,键作为成员,对槽进行有序排序
// 当需要对某些槽进行区间(range)操作时,这个跳跃表可以提供方便
// 具体操作定义在 db.c 里面
zskiplist *slots_to_keys;

/* The following fields are used to take the slave state on elections. */
// 以下这些域被用于进行故障转移选举

// 上次执行选举或者下次执行选举的时间
mstime_t failover_auth_time; /* Time of previous or next election. */

// 节点获得的投票数量
int failover_auth_count; /* Number of votes received so far. */

// 如果值为 1 ,表示本节点已经向其他节点发送了投票请求
int failover_auth_sent; /* True if we already asked for votes. */

int failover_auth_rank; /* This slave rank for current auth request. */

uint64_t failover_auth_epoch; /* Epoch of the current election. */

/* Manual failover state in common. */
/* 共用的手动故障转移状态 */

// 手动故障转移执行的时间限制
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
/* 主服务器的手动故障转移状态 */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
/* 从服务器的手动故障转移状态 */
long long mf_master_offset; /* Master offset the slave needs to start MF
or zero if stil not received. */
// 指示手动故障转移是否可以开始的标志值
// 值为非 0 时表示各个主服务器可以开始投票
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */

/* The followign fields are uesd by masters to take state on elections. */
/* 以下这些域由主服务器使用,用于记录选举时的状态 */

// 集群最后一次进行投票的纪元
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */

// 在进入下个事件循环之前要做的事情,以各个 flag 来记录
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */

// 通过 cluster 连接发送的消息数量
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */

// 通过 cluster 接收到的消息数量
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/

} clusterState;

分区的实现层次

分区可以在程序的不同层次实现。

  • 客户端分区
    就是在客户端就已经决定数据会被存储到哪个redis节点或者从哪个redis节点读取。大多数客户端已经实现了客户端分区。
  • 代理分区
    意味着客户端将请求发送给代理,然后代理决定去哪个节点写数据或者读数据。代理根据分区规则决定请求哪些Redis实例,然后根据Redis的响应结果返回给客户端。redis和memcached的一种代理实现就是Twemproxy
  • 查询路由(Query routing)
    意思是客户端随机地请求任意一个redis实例,然后由Redis将请求转发给正确的Redis节点。Redis Cluster实现了一种混合形式的查询路由,但并不是直接将请求从一个redis节点转发到另一个redis节点,而是在客户端的帮助下直接redirected到正确的redis节点。

Redis Cluster采用的是查询路由的方式。
在Cluster模式下,Redis接收任何命令都会首先计算键对应的桶编号,再根据桶找出所对应的节点,如果节点是自身,则处理键命令;否则回复MOVED重定向错误,通知客户端请求正确的节点,这个过程称为MOVED重定向。
在客户端初次连接Redis集群时,如果客户端是Smart Client,它会获取集群的节点信息及slot的分布信息,并在本地缓存一份 hash slot 与node关系的路由表,这样不必每次访问服务器时都因为重定向而经过多次网络调用。
redis-cli不是smart client,它没有缓存路由表的功能;Java客户端Redisson是smart client,它在初始化时会调用redis实例的CLUSTER NODES命令来获取集群中每个Master负责的slot范围,并启动一个定时任务来每秒刷新本地缓存的集群状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
* 启动时查询集群状态
*/
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config);

this.config = create(cfg);
initTimer(this.config);

Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();

// 发送cluster nodes命令

clusterNodesCommand = RedisCommands.CLUSTER_NODES;
if ("rediss".equals(addr.getScheme())) {
clusterNodesCommand = RedisCommands.CLUSTER_NODES_SSL;
}

List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);

StringBuilder nodesValue = new StringBuilder();
for (ClusterNodeInfo clusterNodeInfo : nodes) {
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
log.info("Redis cluster nodes configuration got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);

lastClusterNode = addr;

// 读取每个节点的分区配置

Collection<ClusterPartition> partitions = parsePartitions(nodes);

...

} catch (Exception e) {
lastException = e;
log.warn(e.getMessage());
}
}

...

// 每秒定时刷新本地缓存的cluster状态,包括每个Master节点负责的slot范围
scheduleClusterChangeCheck(cfg, null);
}

/**
* 获取key所处的节点
*/
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
MasterSlaveEntry entry = connectionManager.getEntry(slot);
return new NodeSource(entry);
}

/**
* 获取key所处的节点
*/
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
// 建立连接、发送命令
final RFuture<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, command);
}

...

connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (connFuture.isCancelled()) {
return;
}
// 如果执行不成功,则设置异常信息
if (!connFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
details.setException(convertException(connectionFuture));
return;
}
// 如果执行OK,释放连接
if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {
releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
return;
}

final RedisConnection connection = connFuture.getNow();
// 响应ASK的情况
if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = connectionManager.newPromise();
ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future);
}
// 响应MOVED的情况
else {
if (log.isDebugEnabled()) {
log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",
details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
}
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}

details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(details, connection);
}
});

releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
}
});
}

下面是手动调用cluster nodes可以得到的响应,从中可以看到每个master所负责的slot范围:

1
2
3
4
5
6
7
8
9
10
hgc@hgc-X555LD:~$ redis-cli -h 10.32.64.12 -p 16371 -c
10.32.64.12:16371> auth 123456
OK
10.32.64.12:16371> cluster nodes
d0af93527054ae3713c6ae82f4f58e016c4968d7 10.32.64.161:16371@26371 slave dbf60db7dc4c2d8ea944481d162bf6be7ef48f5a 0 1604569999114 28 connected
20def2dff31ba78c04f72431a51054def2120638 10.32.64.161:16372@26372 master - 0 1604569998112 31 connected 0-5460
b0c86436cd4cf6d2240faf01b45735616b82cae8 10.32.64.12:16371@26371 myself,slave 20def2dff31ba78c04f72431a51054def2120638 0 1604569995000 30 connected
dbf60db7dc4c2d8ea944481d162bf6be7ef48f5a 10.32.64.162:16372@26372 master - 0 1604569996000 28 connected 5461-10922
005c670071b5dab4ef085613f0ca6666fc5bcbce 10.32.64.12:16373@26373 slave df7f690feee2ad536f2573b55190e0f8d576779e 0 1604569998000 25 connected
df7f690feee2ad536f2573b55190e0f8d576779e 10.32.64.162:16373@26373 master - 0 1604569997000 25 connected 10923-16383

如果Cluster发生了扩容缩容或failover导致客户端缓存的信息过期,客户端只需要MOVED时重新更新本地缓存即可。
但是这里有一个问题,如果扩容缩容时正在发生槽迁移,这时正在迁移中的槽在哪个节点是不确定的,可能会导致客户端本地缓存的频繁更新。因此,Redis迁移过程中,会对正在迁移的槽打标记(server.cluster->migrating_slots_to),如果客户端访问的key命中了正在迁移中的槽,则服务器会返回ASK而不是MOVED,客户端接收到ASK后不会重新更新本地的槽缓存。
代码:redis.c/processCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/* If cluster is enabled perform the cluster redirection here.
*
* 如果开启了集群模式,那么在这里进行转向操作。
*
* However we don't perform the redirection if:
*
* 不过,如果有以下情况出现,那么节点不进行转向:
*
* 1) The sender of this command is our master.
* 命令的发送者是本节点的主节点
*
* 2) The command has no key arguments.
* 命令没有 key 参数
*/
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;

// 集群已下线
if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTransaction(c);
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
return REDIS_OK;

// 集群运作正常
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
// 不能执行多键处理命令
if (n == NULL) {
flagTransaction(c);
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spawns mutliple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
return REDIS_OK;

// 命令针对的槽和键不是本节点处理的,进行转向
} else if (n != server.cluster->myself) {
flagTransaction(c);
// -<ASK or MOVED> <slot> <ip>:<port>
// 例如 -ASK 10086 127.0.0.1:12345
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));

return REDIS_OK;
}

// 如果执行到这里,说明键 key 所在的槽由本节点处理
// 或者客户端执行的是无参数命令
}
}

分区的缺点

有些特性在分区的情况下会受到限制:

  • 涉及多个key的操作通常不会被支持。例如你不能对两个集合求交集,因为他们可能被存储到不同的Redis实例(实际上这种情况也有办法,但是不能直接使用交集指令)。
    同时操作多个key,则不能使用Redis事务.
  • 分区使用的粒度是key,不能使用一个非常长的排序key存储一个数据集(The partitioning granularity is the key, so it is not possible to shard a dataset with a single huge key like a very big sorted set).
  • 当使用分区的时候,数据处理会非常复杂,例如为了备份你必须从不同的Redis实例和主机同时收集RDB / AOF文件。
  • 分区时动态扩容或缩容可能非常复杂。Redis集群在运行时增加或者删除Redis节点,能做到最大程度对用户透明地数据再平衡,但其他一些客户端分区或者代理分区方法则不支持这种特性。然而,有一种预分片的技术也可以较好的解决这个问题。

当要把Redis当作持久化存储时,需要注意分区的性质

  • 如果Redis被当做缓存使用,使用一致性哈希实现动态扩容缩容。
  • 如果Redis被当做一个持久化存储使用,必须使用固定的keys-to-nodes映射关系,节点的数量一旦确定不能变化。否则的话(即Redis节点需要动态变化的情况),必须使用可以在运行时进行数据再平衡的一套系统,现在Redis Cluster已经支持这种再平衡。

节点通信

Redis Cluster采用Gossip协议完成集群状态数据及路由数据等元数据的管理。
一种简单的集群内状态同步思路是:每次节点都将自己本地的集群状态数据广播到集群内所有N个节点,其他节点判断接收到的数据比本地的新则更新本地数据。但是这种方式的缺点是通信量剧增,网络带宽变得紧张。
因此Redis采用Gossip协议来进行集群内元数据的同步,而且:
1、每次只随机选择K(K << N)个其他节点来同步状态;
集群内每个节点维护定时任务默认每秒执行10次,每秒会随机选取5个节点找出最久没有通信的节点发送ping消息,用于保证Gossip信息交换的随机性。每100毫秒都会扫描本地节点列表,如果发现节点最近一次接受pong消息的时间大于cluster_node_timeout/2,则立刻发送ping消息,防止该节点信息太长时间未更新。根据以上规则得出每个节点每秒需要发送ping消息的数量=1+10*num(node.pong_received>cluster_node_timeout/2),因此cluster_node_timeout参数对消息发送的节点数量影响非常大。当我们的带宽资源紧张时,可以适当调大这个参数,如从默认15秒改为30秒来降低带宽占用率。过度调大cluster_node_timeout会影响消息交换的频率从而影响故障转移、槽信息更新、新节点发现的速度。因此需要根据业务容忍度和资源消耗进行平衡。同时整个集群消息总交换量也跟节点数成正比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
// 迭代计数器,一个静态变量
static unsigned long long iteration = 0;
mstime_t handshake_timeout;

// 记录一次迭代
iteration++; /* Number of times this function was called so far. */

/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
// 如果一个 handshake 节点没有在 handshake timeout 内
// 转换成普通节点(normal node),
// 那么节点会从 nodes 表中移除这个 handshake 节点
// 一般来说 handshake timeout 的值总是等于 NODE_TIMEOUT
// 不过如果 NODE_TIMEOUT 太少的话,程序会将值设为 1 秒钟
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

/* Check if we have disconnected nodes and re-establish the connection. */
// 向集群中的所有断线或者未连接节点发送消息
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 跳过当前节点以及没有地址的节点
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
// 如果 handshake 节点已超时,释放它
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
freeClusterNode(node);
continue;
}

// 为未创建连接的节点创建连接
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;

fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr_count ? server.bindaddr[0] : NULL);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
*
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
// 向新连接的节点发送 PING 命令,防止节点被识进入下线
// 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

// 这不是第一次发送 PING 信息,所以可以还原这个时间
// 等 clusterSendPing() 函数来更新它
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}

/* We can clear the flag after the first packet is sent.
*
* 在发送 MEET 信息之后,清除节点的 MEET 标识。
*
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets.
*
* 如果当前节点(发送者)没能收到 MEET 信息的回复,
* 那么它将不再向目标节点发送命令。
*
* 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态,
* 并继续向目标节点发送普通 PING 命令。
*/
node->flags &= ~REDIS_NODE_MEET;

redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);

/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
// clusterCron() 每执行 10 次(至少间隔一秒钟),就向一个随机节点发送 gossip 信息
if (!(iteration % 10)) {
int j;

/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
// 随机 5 个节点,选出其中一个
for (j = 0; j < 5; j++) {

// 随机在集群中挑选节点
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);

/* Don't ping nodes disconnected or with a ping currently active. */
// 不要 PING 连接断开的节点,也不要 PING 最近已经 PING 过的节点
if (this->link == NULL || this->ping_sent != 0) continue;

if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
continue;

// 选出 5 个随机节点中最近一次接收 PONG 回复距离现在最旧的节点
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}

// 向最久没有收到 PONG 回复的节点发送 PING 命令
if (min_pong_node) {
redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}

// 遍历所有节点,检查是否需要将某个节点标记为下线
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;

// 跳过节点本身、无地址节点、HANDSHAKE 状态的节点
if (node->flags &
(REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
continue;

/* Orphaned master check, useful only if the current instance
* is a slave that may migrate to another master. */
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);

if (okslaves == 0 && node->numslots > 0) orphaned_masters++;
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}

/* If we are waiting for the PONG more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
// 如果等到 PONG 到达的时间超过了 node timeout 一半的连接
// 因为尽管节点依然正常,但连接可能已经出问题了
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
// 释放连接,下次 clusterCron() 会自动重连
freeClusterLink(node->link);
}

/* If we have currently no active ping in this instance, and the
* received PONG is older than half the cluster timeout, send
* a new ping now, to ensure all the nodes are pinged without
* a too big delay. */
// 如果目前没有在 PING 节点
// 并且已经有 node timeout 一半的时间没有从节点那里收到 PONG 回复
// 那么向节点发送一个 PING ,确保节点的信息不会太旧
// (因为一部分节点可能一直没有被随机中)
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}

/* If we are a master and one of the slaves requested a manual
* failover, ping it continuously. */
// 如果这是一个主节点,并且有一个从服务器请求进行手动故障转移
// 那么向从服务器发送 PING 。
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}

/* Check only if we have an active ping for this instance. */
// 以下代码只在节点发送了 PING 命令的情况下执行
if (node->ping_sent == 0) continue;

/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
// 计算等待 PONG 回复的时长
delay = now - node->ping_sent;

// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线)
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// 打开疑似下线标记
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);

/* If we are a slave node but the replication is still turned off,
* enable it if we know the address of our master and it appears to
* be up. */
// 如果从节点没有在复制主节点,那么对从节点进行设置
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}

/* Abourt a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();

if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
clusterHandleSlaveFailover();
/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}

// 更新集群状态
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
clusterUpdateState();
}

2、状态信息并不是全量同步,而是随机选M(M << N)个节点的状态同步到其他节点。
M值最小为3,最大为N - 2,一般情况下M = N / 10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */
// 向指定节点发送一条 MEET 、 PING 或者 PONG 消息
void clusterSendPing(clusterLink *link, int type) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
int gossipcount = 0, totlen;
/* freshnodes is the number of nodes we can still use to populate the
* gossip section of the ping packet. Basically we start with the nodes
* we have in memory minus two (ourself and the node we are sending the
* message to). Every time we add a node we decrement the counter, so when
* it will drop to <= zero we know there is no more gossip info we can
* send. */
// freshnodes 是用于发送 gossip 信息的计数器
// 每次发送一条信息时,程序将 freshnodes 的值减一
// 当 freshnodes 的数值小于等于 0 时,程序停止发送 gossip 信息
// freshnodes 的数量是节点目前的 nodes 表中的节点数量减去 2
// 这里的 2 指两个节点,一个是 myself 节点(也即是发送信息的这个节点)
// 另一个是接受 gossip 信息的节点
int freshnodes = dictSize(server.cluster->nodes)-2;

// 如果发送的信息是 PING ,那么更新最后一次发送 PING 命令的时间戳
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();

// 将当前节点的信息(比如名字、地址、端口号、负责处理的槽)记录到消息里面
clusterBuildMessageHdr(hdr,type);

/* Populate the gossip fields */
// 从当前节点已知的节点中随机选出两个节点
// 并通过这条消息捎带给目标节点,从而实现 gossip 协议

// 每个节点有 freshnodes 次发送 gossip 信息的机会
// 每次向目标节点发送 2 个被选中节点的 gossip 信息(gossipcount 计数)
while(freshnodes > 0 && gossipcount < 3) {
// 从 nodes 字典中随机选出一个节点(被选中节点)
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);

clusterMsgDataGossip *gossip;
int j;

/* In the gossip section don't include:
* 以下节点不能作为被选中节点:
* 1) Myself.
* 节点本身。
* 2) Nodes in HANDSHAKE state.
* 处于 HANDSHAKE 状态的节点。
* 3) Nodes with the NOADDR flag set.
* 带有 NOADDR 标识的节点
* 4) Disconnected nodes if they don't have configured slots.
* 因为不处理任何槽而被断开连接的节点
*/
if (this == myself ||
this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* otherwise we may loop forever. */
continue;
}

/* Check if we already added this node */
// 检查被选中节点是否已经在 hdr->data.ping.gossip 数组里面
// 如果是的话说明这个节点之前已经被选中了
// 不要再选中它(否则就会出现重复)
for (j = 0; j < gossipcount; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
REDIS_CLUSTER_NAMELEN) == 0) break;
}
if (j != gossipcount) continue;

/* Add it */

// 这个被选中节点有效,计数器减一
freshnodes--;

// 指向 gossip 信息结构
gossip = &(hdr->data.ping.gossip[gossipcount]);

// 将被选中节点的名字记录到 gossip 信息
memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
// 将被选中节点的 PING 命令发送时间戳记录到 gossip 信息
gossip->ping_sent = htonl(this->ping_sent);
// 将被选中节点的 PING 命令回复的时间戳记录到 gossip 信息
gossip->pong_received = htonl(this->pong_received);
// 将被选中节点的 IP 记录到 gossip 信息
memcpy(gossip->ip,this->ip,sizeof(this->ip));
// 将被选中节点的端口号记录到 gossip 信息
gossip->port = htons(this->port);
// 将被选中节点的标识值记录到 gossip 信息
gossip->flags = htons(this->flags);

// 这个被选中节点有效,计数器增一
gossipcount++;
}

// 计算信息长度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
// 将被选中节点的数量(gossip 信息中包含了多少个节点的信息)
// 记录在 count 属性里面
hdr->count = htons(gossipcount);
// 将信息的长度记录到信息里面
hdr->totlen = htonl(totlen);

// 发送信息
clusterSendMessage(link,buf,totlen);
}

扩容 / 缩容

当新的节点加入时,我们该如何重新分配数据,让新的节点也对外提供服务。当有节点退出时,我们该如何把存在该节点上的数据分配到其他机器上,让其他机器来提供这部分数据的服务。即集群的扩缩容问题。

新节点加入流程

新节点加入时,需要把一部分数据迁移到新节点来达到集群的负载均衡。
在Redis集群中,数据的存储是以slot为单位的,因此:

  1. 集群的伸缩本质上就是slot在不同机器节点之间的迁移;
  2. 迁移过程中,有的slot在老节点上,有的slot在新节点上,这时,客户端请求应该被重定向到正确的节点上。
    比如slot1从A迁移到B上时,请求A或B会怎么样?请求别的节点又会怎么样?

节点的迁移过程主要分为3个步骤:

  1. 准备新节点
  2. 加入集群
  3. 迁移slot到新节点

以下迁移过程的伪代码来自:Redis集群详解(中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def move_slot(source,target,slot):
# 目标节点准备导入槽
target.cluster("setslot",slot,"importing",source.nodeId);
# 目标节点准备全出槽
source.cluster("setslot",slot,"migrating",target.nodeId);
while true :
# 批量从源节点获取键
keys = source.cluster("getkeysinslot",slot,pipeline_size);
if keys.length == 0:
# 键列表为空时,退出循环
break;
# 批量迁移键到目标节点
source.call("migrate",target.host,target.port,"",0,timeout,"keys",keys);
# 向集群所有主节点通知槽被分配给目标节点
for node in nodes:
if node.flag == "slave":
continue;
node.cluster("setslot",slot,"node",target.nodeId);

节点迁移过程

以下命令告知目标节点准备导入slot:

1
cluster setslot <slot> IMPORTING <nodeId>

以下命令告知目标节点准备导出slot:

1
cluster setslot <slot> MIGRATING <nodeId>

每个节点保存的集群状态中记录了迁移中的slot,其中,迁出的slot放到migrating_slots_to中,迁入的slot放到importing_slots_from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typedef struct clusterState {
clusterNode *myself; /* This node */
// 当前纪元
uint64_t currentEpoch;
// 集群的状态
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
// 集群中至少负责一个槽的主节点个数
int size; /* Num of master nodes with at least one slot */
// 保存集群节点的字典,键是节点名字,值是clusterNode结构的指针
dict *nodes; /* Hash table of name -> clusterNode structures */
// 防止重复添加节点的黑名单
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
// 导入槽数据到目标节点,该数组记录这些节点
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
// 导出槽数据到目标节点,该数组记录这些节点
clusterNode *importing_slots_from[CLUSTER_SLOTS];
// 槽和负责槽节点的映射
clusterNode *slots[CLUSTER_SLOTS];
// 槽映射到键的有序集合
zskiplist *slots_to_keys;

} clusterState;

接下来,将待迁移slot中的key批量转移到目标节点:

1
2
3
4
# 返回count个slot中的键
cluster getkeysinslot <slot> <count>
# 需要对上面命令返回的每个键都发送以下命令,该命令会将所指定的键原子地从源节点移动到目标节点
migrate <host> <port> key destination-db timeout

migrate命令就是向节点发送了N个RESTORE-ASKING命令,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);
//检查键是不是已经过期
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}
kv[non_expired++] = kv[j];

// 集群模式下写入RESTORE-ASKING命令,普通模式下写入RESTORE命令
if (server.cluster_enabled)
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
// 写入KEY,写入TTL
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
// 写入VALUE以及Redis版本校验码等信息
createDumpPayload(&payload,ov[j],kv[j]);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));

}

迁入节点接收到restore-asking命令后,执行节点的恢复操作,即获取key,解析出value,然后写入数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* RESTORE key ttl serialized-value [REPLACE] */
// 根据给定的 DUMP 数据,还原出一个键值对数据,并将它保存到数据库里面
void restoreCommand(redisClient *c) {
long long ttl;
rio payload;
int j, type, replace = 0;
robj *obj;

...

// 读取 DUMP 数据,并反序列化出键值对的类型和值
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}

/* Remove the old key if needed. */
// 如果给定了 REPLACE 选项,那么先删除数据库中已存在的同名键
if (replace) dbDelete(c->db,c->argv[1]);

/* Create the key and set the TTL if any */
// 将键值对添加到数据库
dbAdd(c->db,c->argv[1],obj);

// 如果键带有 TTL 的话,设置键的 TTL
if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);

signalModifiedKey(c->db,c->argv[1]);

addReply(c,shared.ok);
server.dirty++;
}

迁移过程中,在外部客户端的视角看来,在任意时间点上,key只会存在于某个节点上,而不会同时存在于两个节点上。

现在,待迁移槽中的key都已经被迁移了,但是对其他节点来说,该slot仍是由迁出节点负责的,它们接收到相关请求后仍然会路由到迁出节点,所以迁移的最后一步需要向集群中的所有主节点通知槽已经被分配给目标节点。

1
cluster setslot <slot> node <nodeId>

迁移过程中对新请求的响应

迁移过程中:

  • 如果迁出节点接收请求,迁出节点判断slot或key是否已迁出,若是则ASK重定向到迁入节点上,否则迁出节点自己负责处理请求;
  • 如果迁入节点接收请求,会把请求重定向到迁出节点上,除非请求中包含ASKING命令;
  • 其他节点接收到的相关请求会被重定向到迁出节点上;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    int processCommand(redisClient *c) {

    ...

    /* If cluster is enabled perform the cluster redirection here.
    *
    * 如果开启了集群模式,那么在这里进行转向操作。
    *
    * However we don't perform the redirection if:
    *
    * 不过,如果有以下情况出现,那么节点不进行转向:
    *
    * 1) The sender of this command is our master.
    * 命令的发送者是本节点的主节点
    *
    * 2) The command has no key arguments.
    * 命令没有 key 参数
    */
    if (server.cluster_enabled &&
    !(c->flags & REDIS_MASTER) &&
    !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
    int hashslot;

    // 集群已下线
    if (server.cluster->state != REDIS_CLUSTER_OK) {
    flagTransaction(c);
    addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
    return REDIS_OK;

    // 集群运作正常
    } else {
    int error_code;
    clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
    // 不能执行多键处理命令
    if (n == NULL) {
    flagTransaction(c);
    if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
    } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
    /* The request spawns mutliple keys in the same slot,
    * but the slot is not "stable" currently as there is
    * a migration or import in progress. */
    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
    } else {
    redisPanic("getNodeByQuery() unknown error.");
    }
    return REDIS_OK;

    // 命令针对的槽和键不是本节点处理的,进行转向
    } else if (n != server.cluster->myself) {
    flagTransaction(c);
    // -<ASK or MOVED> <slot> <ip>:<port>
    // 例如 -ASK 10086 127.0.0.1:12345
    addReplySds(c,sdscatprintf(sdsempty(),
    "-%s %d %s:%d\r\n",
    (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
    hashslot,n->ip,n->port));

    return REDIS_OK;
    }

    // 如果执行到这里,说明键 key 所在的槽由本节点处理
    // 或者客户端执行的是无参数命令
    }
    }

    ...
    }
    注意上边的getNodeByQuery根据key的散列结果查询命令应该被打到的节点,可以看到这个函数里有对ASKING标识的特殊处理:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {

    ...

    /* If we are receiving the slot, and the client correctly flagged the
    * request as "ASKING", we can serve the request. However if the request
    * involves multiple keys and we don't have them all, the only option is
    * to send a TRYAGAIN error. */
    if (importing_slot &&
    (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
    if (multiple_keys && missing_keys) {
    if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
    return NULL;
    } else {
    return myself;
    }
    }

    ...
    }

旧节点退出流程

与新节点的加入相反的是,旧节点退出时需要把其上的数据迁移到其他节点上,确保该节点上的数据能够被正常访问。
槽的迁移过程和上边扩容中描述的没有区别,主要区别是在迁移完毕后需要轮询每个节点发送cluster forget命令,让它们能忘记下线的节点。
节点在接收cluster forget命令后,会将目标节点的状态从自己保存的集群状态中移除,并将其加入黑名单中60s,这期间其他节点不会再去更新自己维护的该节点的信息,也就是说这60秒内该节点无法重新加入集群内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def delnode_cluster_cmd(downNode):
# 下线节点不允许包含slots
if downNode.slots.length != 0
exit 1
end
# 向集群内节点发送cluster forget
for n in nodes:
if n.id == downNode.id:
# 不能对自己做forget操作
continue;
# 如果下线节点有从节点则把从节点指向其他主节点
if n.replicate && n.replicate.nodeId == downNode.id :
# 指向拥有最少从节点的主节点
master = get_master_with_least_replicas();
n.cluster("replicate",master.nodeId);
#发送忘记节点命令
n.cluster('forget',downNode.id)
# 节点关闭
downNode.shutdown();

集群规模估算

集群规模并不是没有限制的,理论上每个节点一个slot集群可以扩容到16384个节点,但是Redis官方给出的规模上限是一个集群1000个节点,因为实例间的通信开销会随着实例规模增加而增大
下面来讨论下集群内部有哪些交互,并分析它们会对性能有什么样的影响。

实例间数据的同步

集群每个节点都会记录slot和实例间的映射关系,用于请求的重定向。
每个实例都需要通过Gossip协议将数据同步到其他节点,大致流程为:

  1. 每个实例之间会按照一定的频率,从集群中随机挑选一些实例,把 PING 消息发送给挑选出来的实例,用来检测这些实例是否在线,并交换彼此的状态信息。PING 消息中封装了发送消息的实例自身的状态信息、部分其它实例的状态信息,以及 Slot 映射表。
    发送的节点状态信息在源码中由clusterMsgDataGossip这个结构来表示,大小为104字节。每个实例在发送Gossip消息时,除了传递自身的状态信息,默认还会传递集群十分之一实例的状态信息,比如,对于一个包含了 1000 个实例的集群来说,每个实例发送一个 PING 消息时,会包含 100 个实例的状态信息,总的数据量是 10400 字节,再加上发送实例自身的信息,一个 Gossip 消息大约是 10KB。
    另外,Slot映射表是一个16384位的bitmap,算上上面的10KB就是12KB的内容。
  2. 一个实例在接收到 PING 消息后,会给发送 PING 消息的实例,发送一个 PONG 消息。PONG 消息包含的内容和 PING 消息一样,也是12KB。
  3. 另外,上面是随机选节点发PING请求的,如果部分节点一直没有被选到,就会导致这些节点和其他节点不同步。
    为了避免这种情况,Redis Cluster 的实例会按照每 100ms 一次的频率,扫描本地的实例列表,如果发现有实例最近一次接收 PONG 消息的时间,已经大于配置项 cluster-node-timeout 的一半了(cluster-node-timeout/2),就会立刻给该实例发送 PING 消息,更新这个实例上的集群状态信息。
    当集群规模扩大之后,因为网络拥塞或是不同服务器间的流量竞争,会导致实例间的网络通信延迟增加。如果有部分实例无法收到其它实例发送的 PONG 消息,就会引起实例之间频繁地发送 PING 消息,这又会对集群网络通信带来额外的开销了。

从上可知,实例间的数据同步受到通信消息大小通信频率这两方面的影响。
当集群规模扩大后,PING/PONG会占用大量的集群内网络带宽,降低集群服务正常请求的吞吐量。
单实例每秒会发送的PING消息数量大致可以算出是(注意cron是100ms执行一次):

1
PING 消息发送数量 = 1 + 10 * 实例数(最近一次接收 PONG 消息的时间超出 cluster-node-timeout/2)

其中,1 是指单实例常规按照每 1 秒发送一个 PING 消息,10 是指每 1 秒内实例会执行 10 次检查,每次检查后会给 PONG 消息超时的实例发送消息。
假设单个实例检测发现,每 100 毫秒有 10 个实例的 PONG 消息接收超时,那么,这个实例每秒就会发送 101 个 PING 消息,约占 1.2MB/s 带宽。如果集群中有 30 个实例按照这种频率发送消息,就会占用 36MB/s 带宽,这就会挤占集群中用于服务正常请求的带宽。

因此实例间的通信开销优化主要是:

  1. 减少实例传输的消息大小(PING/PONG 消息、Slot 分配信息)
    但是,因为集群实例依赖 PING、PONG 消息和 Slot 分配信息,来维持集群状态的统一,一旦减小了传递的消息大小,就会导致实例间的通信信息减少,不利于集群维护,所以,我们不能采用这种方式。
  2. 降低实例间发送消息的频率
    从上面PING消息发送数量公式可以看出,每秒发送一条PING消息的频率不算高,如果要降低可能导致集群内数据同步延迟;每100ms做一次检测并给延迟超过cluster-node-timeout/2的节点发送PING消息,这个配置是可以适当调大的。
    • 如果配置得比较小,则在大规模集群中会频繁出现PONG超时的情况;
    • 如果配置得过大,则如果真得发生了故障,我们反而需要等比较长的时间才能检测出来。
      可以在调整前后使用tcpdump抓取实例发送心跳网络包的情况。
      tcpdump host 192.168.10.3 port 16379 -i 网卡名 -w /tmp/r1.cap

故障恢复(容错)

Redis故障恢复主要分为以下3个步骤:

  1. 故障发现
    采用多数派协议完成故障检测判断(即至少有半数以上节点认为某主节点故障后才真正判断节点故障)。
  2. 子节点选举
    Redis Cluster中每个Master都会有1至多个Slave,通过复制实现高可用(故障转移),当Master有多个Slave,会采用Raft实现选举出一个主节点以实现故障恢复。
  3. 配置更新
    故障转移后,那么之前的Master和其他Slave怎么处理?Redis会将这些节点成为新Master节点的子节点。

故障发现

一些 CP 特性且中心化的集群来说,当出现节点宕机时经常需要选举新的 Leader 节点,但是 Redis-Cluster 是去中心化的,某个 Master 的宕机并不会影响其他节点的工作。但是,当节点失联时,需要考虑网络的抖动情况,毕竟不能因为某几个请求意外超时就推断集群失败了,部分节点判断一个节点失联只会标记这个节点状态为PFAIL(主观下线),之后如果多数节点投票通过才会真正标记这个节点FAIL(下线)
投票过程是集群中所有 master 参与的,每个节点都存有整个集群所有主节点及从节点的信息,它们之间通过互相 ping-pong 来判断节点是否可以连上,如果半数以上 master 节点与当前 master 节点通信超时(cluster-node-timeout),则认为当前 master 节点挂掉,标记这个节点状态为FAIL

当 master 挂掉时,并不意味着集群已无法再提供服务了,集群要进入fail(不可用)状态需要满足以下条件之一:

  1. 集群的任意 master 挂掉,且该 master 没有 slave 或 slave 全挂掉了,则集群进入 fail 状态。
    这是因为,Cluster中所有slot是平均分配到每个Master的,如果有一个Master的slot不能用了、而且这个Master还没有Slave,那么集群就不能提供服务了,如果Master还有Slave,Slave可以代替Master继续向外提供服务,这个步骤称为slave promotion
    单独的一对Master-Slave挂掉,Redis还提供一个叫 Replica Migration 的解决方案:当集群中的某个Master节点没有Slave节点时(称之为 Orphaned Master),其他有富余Slave节点的主节点会向该节点迁移一个Slave节点以防该节点下线之后没有子节点来替换从而导致整个集群下线。
  2. 集群超过半数以上 master 挂掉,无论有无 slave 都进入 fail 状态。

当集群不可用时,任何操作都将返回((error) CLUSTERDOWN The cluster is down)错误。需要注意的是,必须要 3 个或以上的主节点,否则在创建集群时会失败。

PFAIL

1、Redis每个节点会不断向其他节点发送PING消息来检测其他节点是否可达,如果超时会先断开连接:
代码:cluster.c/clusterCron
RedisCluster故障发现1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {
...

while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;

...

/* If we are waiting for the PONG more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
// 判断连接的节点是否出事
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
// ping_sent记录发送命令的时间
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
// PONG 到达的时间超过了 node_timeout 的一半
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
// 释放连接,此时node->link=NULL,下次 clusterCron() 会自动重连
freeClusterLink(node->link);
}

...
}

...
}

2、此时节点A PING目标节点B失败,A会尝试重连,并将重连时间记录到ping_sent变量中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/* This is executed 10 times every second */
// 集群常规操作函数,默认每秒执行 10 次(每间隔 100 毫秒执行一次)
void clusterCron(void) {

...

/* Check if we have disconnected nodes and re-establish the connection. */
// 向集群中的所有断线或者未连接节点发送消息
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 跳过当前节点以及没有地址的节点
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

...

// 为未创建连接的节点创建连接
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;

fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr_count ? server.bindaddr[0] : NULL);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* Queue a PING in the new connection ASAP: this is crucial
* to avoid false positives in failure detection.
*
* If the node is flagged as MEET, we send a MEET message instead
* of a PING one, to force the receiver to add us in its node
* table. */
// 向新连接的节点发送 PING 命令,防止节点被识进入下线
// 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

// 这不是第一次发送 PING 信息,所以可以还原这个时间
// 等 clusterSendPing() 函数来更新它
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}

/* We can clear the flag after the first packet is sent.
*
* 在发送 MEET 信息之后,清除节点的 MEET 标识。
*
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets.
*
* 如果当前节点(发送者)没能收到 MEET 信息的回复,
* 那么它将不再向目标节点发送命令。
*
* 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态,
* 并继续向目标节点发送普通 PING 命令。
*/
node->flags &= ~REDIS_NODE_MEET;

redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}

...

}

3、节点A发现PING B的延时时间超过了node_timeout之后,就会标记该节点为PFAIL(Possible FAILure),即主观下线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void clusterCron(void) {

...

// 遍历所有节点,检查是否需要将某个节点标记为下线
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {

...

/* Check only if we have an active ping for this instance. */
// 以下代码只在节点发送了 PING 命令的情况下执行
if (node->ping_sent == 0) continue;

/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
// 计算等待 PONG 回复的时长
delay = now - node->ping_sent;

// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线)
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// 打开疑似下线标记
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
}

...
}

FAIL

RedisCluster故障发现2
1、A将B标记为PFAIL后,A会通过Gossip通知到其他节点。

2、所有节点会维护一个下线报告列表(Fail Report),主要维护一个节点被哪些节点报告处于下线状态,此时,C会记录“B被A报告下线了”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
int clusterProcessPacket(clusterLink *link) {

...

/* Process packets by type. */
// 根据消息的类型,处理节点

// 这是一条 PING 消息或者 MEET 消息
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);

/* Add this node if it is new for us and the msg type is MEET.
*
* 如果当前节点是第一次遇见这个节点,并且对方发来的是 MEET 信息,
* 那么将这个节点添加到集群的节点列表里面。
*
* In this stage we don't try to add the node with the right
* flags, slaveof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node.
*
* 节点目前的 flag 、 slaveof 等属性的值都是未设置的,
* 等当前节点向对方发送 PING 命令之后,
* 这些信息可以从对方回复的 PONG 信息中取得。
*/
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

// 创建 HANDSHAKE 状态的新节点
node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);

// 设置 IP 和端口
nodeIp2String(node->ip,link);
node->port = ntohs(hdr->port);

// 将新节点添加到集群
clusterAddNode(node);

clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

/* Get info from the gossip section */
// 分析并取出消息中的 gossip 节点信息
clusterProcessGossipSection(hdr,link);

/* Anyway reply with a PONG */
// 向目标节点返回一个 PONG
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}

...
}

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {

// 记录这条消息中包含了多少个节点的信息
uint16_t count = ntohs(hdr->count);

// 指向第一个节点的信息
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;

// 取出发送者
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

// 遍历所有节点的信息
while(count--) {
sds ci = sdsempty();

// 分析节点的 flag
uint16_t flags = ntohs(g->flags);

// 信息节点
clusterNode *node;

...

/* Update our state accordingly to the gossip sections */
// 使用消息中的信息对节点进行更新
node = clusterLookupNode(g->nodename);
// 节点已经存在于当前节点
if (node) {
/* We already know this node.
Handle failure reports, only when the sender is a master. */
// 如果 sender 是一个主节点,那么我们需要处理下线报告
if (sender && nodeIsMaster(sender) && node != myself) {
// 节点处于 FAIL 或者 PFAIL 状态
if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {

// 添加 sender 对 node 的下线报告
if (clusterNodeAddFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}

// 尝试将 node 标记为 FAIL
markNodeAsFailingIfNeeded(node);

// 节点处于正常状态
} else {

// 如果 sender 曾经发送过对 node 的下线报告
// 那么清除该报告
if (clusterNodeDelFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}

/* If we already know this node, but it is not reachable, and
* we see a different address in the gossip section, start an
* handshake with the (possibly) new address: this will result
* into a node address update if the handshake will be
* successful. */
// 如果节点之前处于 PFAIL 或者 FAIL 状态
// 并且该节点的 IP 或者端口号已经发生变化
// 那么可能是节点换了新地址,尝试对它进行握手
if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
(strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}

// 当前节点不认识 node
} else {
/* If it's not in NOADDR state and we don't have it, we
* start a handshake process against this IP/PORT pairs.
*
* 如果 node 不在 NOADDR 状态,并且当前节点不认识 node
* 那么向 node 发送 HANDSHAKE 消息。
*
* Note that we require that the sender of this gossip message
* is a well known node in our cluster, otherwise we risk
* joining another cluster.
*
* 注意,当前节点必须保证 sender 是本集群的节点,
* 否则我们将有加入了另一个集群的风险。
*/
if (sender &&
!(flags & REDIS_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}
}

/* Next node */
// 处理下个节点的信息
g++;
}
}

3、C添加下线报告之后,会进行B节点的客观下线状态(FAIL)判定。
当集群中有超过半数的节点都认为节点B处于PFAIL后才会判断B为FAIL,且需要注意的是,A将PFAIL通知给C后,C自己本身也得认为B处于PFAIL状态才会开始客观下线判定。
当C认为B正式FAIL后,它就会立刻向集群所有节点广播这个消息。
RedisCluster故障发现3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/* This function checks if a given node should be marked as FAIL.
* It happens if the following conditions are met:
*
* 此函数用于判断是否需要将 node 标记为 FAIL 。
*
* 将 node 标记为 FAIL 需要满足以下两个条件:
*
* 1) We received enough failure reports from other master nodes via gossip.
* Enough means that the majority of the masters signaled the node is
* down recently.
* 有半数以上的主节点将 node 标记为 PFAIL 状态。
* 2) We believe this node is in PFAIL state.
* 当前节点也将 node 标记为 PFAIL 状态。
*
* If a failure is detected we also inform the whole cluster about this
* event trying to force every other node to set the FAIL flag for the node.
*
* 如果确认 node 已经进入了 FAIL 状态,
* 那么节点还会向其他节点发送 FAIL 消息,让其他节点也将 node 标记为 FAIL 。
*
* Note that the form of agreement used here is weak, as we collect the majority
* of masters state during some time, and even if we force agreement by
* propagating the FAIL message, because of partitions we may not reach every
* node. However:
*
* 注意,集群判断一个 node 进入 FAIL 所需的条件是弱(weak)的,
* 因为节点们对 node 的状态报告并不是实时的,而是有一段时间间隔
* (这段时间内 node 的状态可能已经发生了改变),
* 并且尽管当前节点会向其他节点发送 FAIL 消息,
* 但因为网络分裂(network partition)的问题,
* 有一部分节点可能还是会不知道将 node 标记为 FAIL 。
*
* 不过:
*
* 1) Either we reach the majority and eventually the FAIL state will propagate
* to all the cluster.
* 只要我们成功将 node 标记为 FAIL ,
* 那么这个 FAIL 状态最终(eventually)总会传播至整个集群的所有节点。
* 2) Or there is no majority so no slave promotion will be authorized and the
* FAIL flag will be cleared after some time.
* 又或者,因为没有半数的节点支持,当前节点不能将 node 标记为 FAIL ,
* 所以对 FAIL 节点的故障转移将无法进行, FAIL 标识可能会在之后被移除。
*
*/
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;

// 标记为 FAIL 所需的节点数量,需要超过集群节点数量的一半
int needed_quorum = (server.cluster->size / 2) + 1;

if (!nodeTimedOut(node)) return; /* We can reach it. */
if (nodeFailed(node)) return; /* Already FAILing. */

// 统计将 node 标记为 PFAIL 或者 FAIL 的节点数量(不包括当前节点)
failures = clusterNodeFailureReportsCount(node);

/* Also count myself as a voter if I'm a master. */
// 如果当前节点是主节点,那么将当前节点也算在 failures 之内
if (nodeIsMaster(myself)) failures++;
// 报告下线节点的数量不足节点总数的一半,不能将节点判断为 FAIL ,返回
if (failures < needed_quorum) return; /* No weak agreement from masters. */

redisLog(REDIS_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);

/* Mark the node as failing. */
// 将 node 标记为 FAIL
node->flags &= ~REDIS_NODE_PFAIL;
node->flags |= REDIS_NODE_FAIL;
node->fail_time = mstime();

/* Broadcast the failing node name to everybody, forcing all the other
* reachable nodes to flag the node as FAIL. */
// 如果当前节点是主节点的话,那么向其他节点发送报告 node 的 FAIL 信息
// 让其他节点也将 node 标记为 FAIL
if (nodeIsMaster(myself)) clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

4、当C标记了B为FAIL状态,则它会广播到整个集群中的所有节点(包括子节点),其他节点都会更新自己维护的节点B的状态信息为FAIL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/* Send a FAIL message to all the nodes we are able to contact.
*
* 向当前节点已知的所有节点发送 FAIL 信息。
*/
void clusterSendFail(char *nodename) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg *) buf;

// 创建下线消息
clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_FAIL);

// 记录命令
memcpy(hdr->data.fail.about.nodename, nodename, REDIS_CLUSTER_NAMELEN);

// 广播消息
clusterBroadcastMessage(buf, ntohl(hdr->totlen));
}

/* Send a message to all the nodes that are part of the cluster having
* a connected link.
*
* 向节点连接的所有其他节点发送信息。
*/
void clusterBroadcastMessage(void *buf, size_t len) {
dictIterator *di;
dictEntry *de;

// 遍历所有已知节点
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

// 不向未连接节点发送信息
if (!node->link) continue;

// 不向节点自身或者 HANDSHAKE 状态的节点发送信息
if (node->flags & (REDIS_NODE_MYSELF | REDIS_NODE_HANDSHAKE))
continue;

// 发送信息
clusterSendMessage(node->link, buf, len);
}
dictReleaseIterator(di);
}

子节点选举(故障迁移)

1、当B的两个子节点接收到B的FAIL状态消息时,它们会更新自己本地内存中的集群状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int clusterProcessPacket(clusterLink *link) {

...

// 这是一条 FAIL 消息: sender 告知当前节点,某个节点已经进入 FAIL 状态。
else if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;

if (sender) {

// 获取下线节点的消息
failing = clusterLookupNode(hdr->data.fail.about.nodename);
// 下线的节点既不是当前节点,也没有处于 FAIL 状态
if (failing &&
!(failing->flags & (REDIS_NODE_FAIL | REDIS_NODE_MYSELF))) {
redisLog(REDIS_NOTICE,
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);

// 打开 FAIL 状态
failing->flags |= REDIS_NODE_FAIL;
failing->fail_time = mstime();
// 关闭 PFAIL 状态
failing->flags &= ~REDIS_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
CLUSTER_TODO_UPDATE_STATE);
}
} else {
redisLog(REDIS_NOTICE,
"Ignoring FAIL message from unknonw node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}

}

...

}

2、随后,在clusterCron定时任务中就会开始发起故障迁移,竞选成为新的Master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
void clusterCron(void) {

...

/* Abourt a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();

// 如果当前节点是子节点
if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
// 处理集群子节点的故障迁移
clusterHandleSlaveFailover();

/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}

// 更新集群状态
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
clusterUpdateState();
}

/* This function is called if we are a slave node and our master serving
* a non-zero amount of hash slots is in FAIL state.
*
* 如果当前节点是一个从节点,并且它正在复制的一个负责非零个槽的主节点处于 FAIL 状态,
* 那么执行这个函数。
*
* The gaol of this function is:
*
* 这个函数有三个目标:
*
* 1) To check if we are able to perform a failover, is our data updated?
* 检查是否可以对主节点执行一次故障转移,节点的关于主节点的信息是否准确和最新(updated)?
* 2) Try to get elected by masters.
* 选举一个新的主节点
* 3) Perform the failover informing all the other nodes.
* 执行故障转移,并通知其他节点
*/
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
int needed_quorum = (server.cluster->size / 2) + 1;
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
int j;
mstime_t auth_timeout, auth_retry_time;

server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before waiting again.
*
* Timeout is MIN(NODE_TIMEOUT*2,2000) milliseconds.
* Retry is two times the Timeout.
*/
auth_timeout = server.cluster_node_timeout * 2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout * 2;

/* Pre conditions to run the function, that must be met both in case
* of an automatic or manual failover:
* 1) We are a slave.
* 2) Our master is flagged as FAIL, or this is a manual failover.
* 3) It is serving slots. */
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
myself->slaveof->numslots == 0)
return;

...

}

3、资格检查
Slave节点会不停的与Master节点通信来复制Master节点的数据,如果一个Slave节点长时间不与Master节点通信,那么很可能意味着该Slave节点上的数据已经落后Master节点过多(因为Master节点再不停的更新数据但是Slave节点并没有随之更新)。Redis认为,当一个Slave节点过长时间不与Master节点通信,那么该节点就不具备参与竞选的资格。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void clusterHandleSlaveFailover(void) {

...

/* Set data_age to the number of seconds we are disconnected from
* the master. */
// 将 data_age 设置为从节点与主节点的断开秒数
if (server.repl_state == REDIS_REPL_CONNECTED) {
data_age = (mstime_t) (server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t) (server.unixtime - server.repl_down_since) * 1000;
}

/* Remove the node timeout from the data age as it is fine that we are
* disconnected from our master at least for the time it was down to be
* flagged as FAIL, that's the baseline. */
// node timeout 的时间不计入断线时间之内
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;

/* Check if our data is recent enough. For now we just use a fixed
* constant of ten times the node timeout since the cluster should
* react much faster to a master down.
*
* Check bypassed for manual failovers. */
// 检查这个从节点的数据是否较新:
// 目前的检测办法是断线时间不能超过 node timeout 的十倍
if (data_age >
((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT))
{
if (!manual_failover) return;
}

...

}

4、休眠时间计算
B的所有子节点(B1、B2)在判断自己具备选举资格时,就开始执行竞选,竞选协议是Raft,选举过程中,所有参与选举的节点首先随机休眠一段时间。
整个休眠时间由两个部分组成:

1
DELAY = 500 milliseconds + random delay between 0 and 500 milliseconds + SLAVE_RANK * 1000 milliseconds.
  • 一部分为固定的500ms时间,这500ms主要是为了等待集群状态同步。上面提到节点C会向集群所有节点广播消息,那么这500ms就是等待确保集群的所有节点都收到了消息并更新了状态。
  • 另一部分主要是一个随机的时间加上由该Slave节点的排名决定的附加时间。每个slave都会记录自己从主节点同步数据的复制偏移量。复制偏移量越大,说明该节点与主节点数据保持的越一致。那么显然我们选举的时候肯定是想选状态更新最近的子节点,所以我们按照更新状态的排序来确定休眠时间的附加部分。状态更新最近的节点SLAVE_RANK排名为1,那么其休眠的时间相应的也最短,也就意味着该节点最有可能获得大部分选票。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    void clusterHandleSlaveFailover(void) {

    ...

    /* If the previous failover attempt timedout and the retry time has
    * elapsed, we can setup a new one. */
    if (auth_age > auth_retry_time) {
    server.cluster->failover_auth_time = mstime() +
    500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
    random() % 500; /* Random delay between 0 and 500 milliseconds. */
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_sent = 0;
    server.cluster->failover_auth_rank = clusterGetSlaveRank();
    /* We add another delay that is proportional to the slave rank.
    * Specifically 1 second * rank. This way slaves that have a probably
    * less updated replication offset, are penalized. */
    server.cluster->failover_auth_time +=
    server.cluster->failover_auth_rank * 1000;
    /* However if this is a manual failover, no delay is needed. */
    if (server.cluster->mf_end) {
    server.cluster->failover_auth_time = mstime();
    server.cluster->failover_auth_rank = 0;
    }
    redisLog(REDIS_WARNING,
    "Start of election delayed for %lld milliseconds "
    "(rank #%d, offset %lld).",
    server.cluster->failover_auth_time - mstime(),
    server.cluster->failover_auth_rank,
    replicationGetSlaveOffset());
    /* Now that we have a scheduled election, broadcast our offset
    * to all the other slaves so that they'll updated their offsets
    * if our offset is better. */
    clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
    return;
    }

    ...

    /* Return ASAP if we can't still start the election. */
    // 如果执行故障转移的时间未到,先返回
    if (mstime() < server.cluster->failover_auth_time) return;

    ...

    }
    5、发起拉票 & 选举投票
    B1唤醒后,会向其他所有节点发送拉票请求,即CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST类型的消息。
    其他主节点接收到拉票请求,且此时它还没有投出自己的票,则会将自己票投给发请求的B1,即回复FAILOVER_AUTH_ACK消息。
    其他子节点没有投票的资格,因此即使接收到CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST类型消息也会直接忽略。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    void clusterHandleSlaveFailover(void) {

    ...

    /* Ask for votes if needed. */
    // 向其他节点发送故障转移请求
    if (server.cluster->failover_auth_sent == 0) {

    // 增加配置纪元
    server.cluster->currentEpoch++;

    // 记录发起故障转移的配置纪元
    server.cluster->failover_auth_epoch = server.cluster->currentEpoch;

    redisLog(REDIS_WARNING, "Starting a failover election for epoch %llu.",
    (unsigned long long) server.cluster->currentEpoch);

    // 向其他所有节点发送信息,看它们是否支持由本节点来对下线主节点进行故障转移
    clusterRequestFailoverAuth();

    // 打开标识,表示已发送信息
    server.cluster->failover_auth_sent = 1;

    // TODO:
    // 在进入下个事件循环之前,执行:
    // 1)保存配置文件
    // 2)更新节点状态
    // 3)同步配置
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
    CLUSTER_TODO_UPDATE_STATE |
    CLUSTER_TODO_FSYNC_CONFIG);
    return; /* Wait for replies. */
    }

    ...

    }
    6、替换节点(failover)
    当子节点接收到来自其他节点的ACK消息时会统计自己获得的票数,当达到集群Master总数的一半以上时,就会开始执行failover,即替换自己的主节点。
    首先标记自己为主节点,然后将原来由节点B负责的slots标记为由自己负责,最后向整个集群广播现在自己是Master同时负责旧Master所有slots的信息。其他节点接收到该信息后会更新自己维护的B1的状态并标记B1为主节点,将节点B负责的slots的负责节点设置为B1节点。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    void clusterHandleSlaveFailover(void) {

    ...

    /* Check if we reached the quorum. */
    // 如果当前节点获得了足够多的投票,那么对下线主节点进行故障转移
    if (server.cluster->failover_auth_count >= needed_quorum) {
    // 旧主节点
    clusterNode *oldmaster = myself->slaveof;

    redisLog(REDIS_WARNING,
    "Failover election won: I'm the new master.");

    /* We have the quorum, perform all the steps to correctly promote
    * this slave to a master.
    *
    * 1) Turn this node into a master.
    * 将当前节点的身份由从节点改为主节点
    */
    clusterSetNodeAsMaster(myself);
    // 让从节点取消复制,成为新的主节点
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    // 接收所有主节点负责处理的槽
    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
    if (clusterNodeGetSlotBit(oldmaster, j)) {
    // 将槽设置为未分配的
    clusterDelSlot(j);
    // 将槽的负责人设置为当前节点
    clusterAddSlot(myself, j);
    }
    }

    /* 3) Update my configEpoch to the epoch of the election. */
    // 更新集群配置纪元
    myself->configEpoch = server.cluster->failover_auth_epoch;

    /* 4) Update state and save config. */
    // 更新节点状态
    clusterUpdateState();
    // 并保存配置文件
    clusterSaveConfigOrDie(1);

    /* 5) Pong all the other nodes so that they can update the state
    * accordingly and detect that we switched to master role. */
    // 向所有节点发送 PONG 信息
    // 让它们可以知道当前节点已经升级为主节点了
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 6) If there was a manual failover in progress, clear the state. */
    // 如果有手动故障转移正在执行,那么清理和它有关的状态
    resetManualFailover();
    }
    }

配置更新

上边我们已经通过故障发现和子节点选举机制用B1这个子节点替换掉了它的Master节点B,那么留下来的节点B和B2应该怎么处理呢?实际上Redis会让它们变成B1的Slave节点。
1、对B2来说,B1升级成Master后会给B2发送消息,让它知道自己已经升级成Master了。

1
2
3
4
5
6
7
8
9
10
11
12
void clusterHandleSlaveFailover(void) {

...

/* 5) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to master role. */
// 向所有节点发送 PONG 信息
// 让它们可以知道当前节点已经升级为主节点了
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

...
}

2、对B来说,B1已经成为了Master,B从故障恢复后再次加入集群时,会成为B1的Slave。

Cluster数据丢失隐患及处理方案

一种数据丢失的场景是主从复制时Master挂掉了,这点我在《Redis 复制》讨论过。
另一种数据丢失的场景存在于Cluster集群中,并且并不是特别容易出现,也就是Cluster发生了脑裂,分区恢复时脑裂期间的数据被覆盖:

  1. 主节点挂掉了,从节点选举出了一个新的主节点,但是此时客户端还在与老主节点通信,将数据写入到老的主节点上;

    这种情况是可能发生的,因为客户端会记忆槽所在的节点,而不是每次请求都通过重定向定位到槽实际所在的节点上。

  2. 之后主从切换成功后,老的主节点会成功新主节点的Slave,并从新的主节点上获取数据,这时该节点上的数据会被清空,从而导致数据丢失。

为什么会发生脑裂?

在《Redis核心技术与实战》中提到了一种会导致脑裂的情况:

  1. Slave会定时地PING Master,发生的错误达到一定时间会被标记为主观下线,当标记主观下线的次数达到规定数量后,标记为客观下线;
  2. 但是Master实际上是“假故障”,即虽然响应Slave的心跳失败了,但是客户端还是可以和Master正常通信的。
    比如宿主机上有一些其他进程将CPU打满了,在打满期间,Slave就会有可能将Master判断为下线,开始选举及主从切换。

为什么脑裂会导致数据丢失?

主从切换后,从库会升级为新主库,这时如果老主库重新上线了,会成为新主库的Slave,执行全量同步,而全量同步执行的最后阶段,需要清空本地的数据,加载新主库发送过来的RDB文件,这期间写入的数据就会丢失了。

如何解决这种脑裂问题?

可以通过两个配置来解决这个脑裂问题:

  • min-slaves-to-write
    这个配置项设置了主库能进行数据同步的最少从库数量
  • min-slaves-to-write
    min-slaves-max-lag:这个配置项设置了主从库间进行数据复制时,从库给主库发送 ACK 消息的最大延迟(以秒为单位)

我们可以把 min-slaves-to-write 和 min-slaves-max-lag 这两个配置项搭配起来使用,分别给它们设置一定的阈值,假设为 N 和 T。这两个配置项组合后的要求是,主库连接的从库中至少有 N 个从库,和主库进行数据复制时的 ACK 消息延迟不能超过 T 秒,否则,主库就不会再接收客户端的请求了。即使原主库是假故障,它在假故障期间也无法响应哨兵心跳,也不能和从库进行同步,自然也就无法和从库进行 ACK 确认了。这样一来,min-slaves-to-write 和 min-slaves-max-lag 的组合要求就无法得到满足,原主库就会被限制接收客户端请求,客户端也就不能在原主库中写入新数据了。等到新主库上线时,就只有新主库能接收和处理客户端请求,此时,新写的数据会被直接写到新主库中。而原主库会被哨兵降为从库,即使它的数据被清空了,也不会有新数据丢失。
举个例子:假设我们将min-slaves-to-write 设置为 1,把 min-slaves-max-lag 设置为 12s,把哨兵的 down-after-milliseconds 设置为 10s,主库因为某些原因卡住了 15s,导致哨兵判断主库客观下线,开始进行主从切换。同时,因为原主库卡住了 15s,没有一个从库能和原主库在 12s 内进行数据复制,原主库也无法接收客户端请求了。这样一来,主从切换完成后,也只有新主库能接收请求,不会发生脑裂,也就不会发生数据丢失的问题了。

数据倾斜

Cluster集群通过CRC16算法将key hash到节点槽上,这个过程还是存在很多不确定性,可能很多数据会被hash到固定的某几个槽上,造成数据分布的不均匀,或者某些key是热点数据,被访问得尤其频繁。

数据倾斜的危害

数据倾斜的危害主要是保存热点数据的节点处理压力会增大,速度变慢,甚至内存资源耗尽而崩溃。

数据倾斜的成因

数据倾斜的成因主要有3个:

  1. bigkey
    bigkey一般是value值很大的string或保存了大量对象的集合类型。
    bigkey可能会造成实例IO线程阻塞,影响其他请求的执行效率。
    为了处理bigkey,设计的时候最好避免把过多的数据保存在同一个键值对中,如果是集合类型,还可以把bigkey拆分成多个小的集合类型数据,分散保存在不同的实例上。
  2. slot分配不均衡
    如果没有均衡地分配slot,就会有大量的数据被分配到同一个slot中,而同一个slot只会在一个实例上分布,并导致大量数据被集中到同一个实例上。
  3. Hash Tag
    hash tag指针对key的某个部分进行hash,比如user:123,可以加上hash tag后变成user:{123},只针对123进行hash。
    hash tag的意义主要在于可以将同类的数据hash到同一个槽上,便于范围查询。
    hash tag的缺点也在于分布到同一槽内后,对该槽所在节点的压力会变大。

数据倾斜的解决办法

数据倾斜可以通过重分配slot来解决。
但是热点数据往往是少部分数据被频繁访问,这种情况下重分配slot是无法解决的,为此可以通过热点数据多副本的方法来解决,比如同一key添加一个前缀然后hash到其他slot上。
但是多副本只能用于只读热点key,对于有读有写的热点数据,就只能给实例本身增加资源了,比如改成配置更高的机器。

QA

  1. Redis Cluster哈希槽通过CRC16算法将key哈希到实例上的槽,这样做有什么好处?为什么不直接用一张表来存储key和哈希槽之间的对应关系?
    如果用一张关系表来做映射,问题太多了,比如:key太多了怎么存关系?集群扩容、缩容、故障转移时怎么修改key和实例间的对应关系?
    而引入哈希槽,实际上是将数据和节点解耦,客户端只需关注key被hash到哪个哈希槽,就算打到错误的节点上,也可以通过

参考

  1. Redis 集群规范
  2. redis源码解析
  3. Redis集群详解(上)
  4. Redis集群详解(中)
  5. Redis集群(终篇)
  6. Redis在线数据迁移工具redis-migrate-tool详解,轻松实现redis集群之间的数据同步
  7. CRDT——解决最终一致问题的利器
0%