服务治理——限流

限流器可以用于限制打入系统的流量或限制系统的对外请求频率,这种想法有点类似 TCP 中的拥塞控制。

初衷

限流使用场景

限流可以认为是一种降级,一般是根据后台的负载提前预估的一个阈值(也可以动态调整)。超过了这个值,就要进行一些旁路处理。根据业务形态,会有直接拒绝、延迟处理、保持等待、部分穿透、默认返回等响应方式。
限流没有具体的应用场景,可以说只要有高并发就会有限流的用武之地(类似的还有缓存、降级等):

  1. 一般 web 服务,可以直接拒绝服务。快速响应才是最重要的;
  2. 像一些秒杀、下单等,可以通过排队或者等待解决(部分的);
  3. 像消息消费等,如果没有顺序需求,我觉得,无限等待还可能是个好的方式;
  4. 对于大多数可有可无的业务结果,使用一些默认值直接返回,效果会好的多。虽然是限流,但干的是熔断的事。

还有一些对安全敏感的场景,比如:

  1. 有人想暴力碰撞网站的用户密码;
  2. 有人想攻击某个非常耗费资源的接口;
  3. 有人想从某个接口大量抓取数据。

现成方案存在的缺陷

Guava 内置了一个限流器 RateLimiter ,但是它只能提供单机环境,如果提供同一接口的服务实例变多了或者变少了,流量总量也会相应改变,这样严格意义上流量其实是没有限制住的。
或者,借助 Redis 来实现分布式系统内的限流, Redis 记录单位时间内的请求数,只有未超过阈值的情况下才允许执行,但是这种方案下,每次请求都需要额外访问一次 Redis ,非常浪费时间。
另外,其实还可以和服务发现组件进行配合实现一种分布式限流器,思路如下图:
分布式限流器示意图
服务实例数量变动的时候,均衡每个实例内的限流器流量大小。

算法

常见的经典限流算法有两种:漏桶算法和令牌桶算法,在某些情况下也可以采取更简单的滑动窗口算法。这几种算法描述的都是单位时间内允许通过的流量,并不关注调用什么时候结束,换句话说,它们并不能限制并发度,如果对并发量有限制,可以转为采取类似信号量(Semaphore)的实现方式。

在描述实际的算法之前,先提一句我对限流器的抽象,我将其看作一个资源池,限流算法就像生产者消费者算法,只不过生产者是固定的时间,比如令牌桶就是每经过一段时间就向桶中放入一个单位的资源,而我们的目的就是限制服务器接口从中获取资源来处理自己的逻辑,示意图如下:
ResourcePool示意图

LeakyBucket(漏桶算法)

为了消除突刺现象,可以采用漏桶算法实现限流。不管服务调用方多么不稳定,通过漏桶算法进行限流,每 10 毫秒处理一次请求。因为处理的速度是固定的,请求进来的速度是未知的,可能突然进来很多请求,没来得及处理的请求就先放在桶里,既然是个桶,肯定是有容量上限,如果桶满了,那么新进来的请求就丢弃。
漏桶算法同样存在弊端:无法应对短时间内的突发流量。
实现思路可以是准备一个队列,用来保存请求,另外通过一个线程池定期从队列中获取请求并执行,可以一次性获取多个并发执行。
LeakyBucket示意图

  • leakyRate 表示资源消耗速度,即单位时间可获取的资源数,或者说单位时间内可以执行的任务数;
  • capacity 表示可以预留的资源数,或者说可以排队的任务数。

acquire 时没有足量的资源,有两种策略:

  • TRAFFIC_SHAPINGsleep 至资源足够的时间点;
  • TRAFFIC_POLICING ,直接放弃该任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
* 容量阈值
*/
private final double capacity;

/**
* 漏水速率,单位为s
*/
private double leakyRate;

/**
* 超出阈值后执行策略
*/
private final OverCapPolicyEnum overCapPolicy;

/**
* 1. 无锁算法
* 2. Long 和 Double 间的转换
*
* @see [Java: is there no AtomicFloat or AtomicDouble?](https://stackoverflow.com/questions/5505460/java-is-there-no-atomicfloat-or-atomicdouble)
*/
private volatile AtomicLong freeTime = new AtomicLong(
TimeUtil.getTime());

@Override
public void acquire() throws ResAllocException {
acquire(1);
}

/**
* 比如 capacity = 3 , leakyRate = 0.(3) ,则每秒能攫取 0.(3) 单位的资源,且最多有 3 单位的资源待取。
* 此时若 freeTime = now + 2 秒,调用 acquire(1) 表示获取一个单位资源,则共被占用 1.(6) 单位资源, freeTime 更新为 5 秒
*
* @param count 获取资源数量
*/
@Override
public void acquire(double count) throws ResAllocException {
checkArg(count > 0, "攫取资源数必须大于0");
while (true) {
long curTime = TimeUtil.getTime();
long curFreeTime = freeTime.get();
long nextFreeTime =
max(TimeUtil.getTime(), curFreeTime)
+ (long) (count / leakyRate * TIME_BASE);
// 检查流量是否超出“桶”的容量
if (isOverCap(curTime, curFreeTime, count)) {
// 本次获取资源将导致"溢出",根据配置执行对应策略
applyPolicy(curTime, curFreeTime, count);
continue;
}
// 更新时间
if (freeTime.compareAndSet(curFreeTime, nextFreeTime)) {
// 设置成功,休眠到下一个空闲时间
UninterruptiblesUtil.sleepUninterruptibly(max(curFreeTime - curTime, 0));
break;
}
}
}

private void applyPolicy(long curTime, long curFreeTime, double count) throws ResAllocException {
if (OverCapPolicyEnum.TRAFFIC_SHAPING == overCapPolicy) {
// 等到桶中留出足够的空间
UninterruptiblesUtil.sleepUninterruptibly(
(long) (count / leakyRate * TIME_BASE) - (curFreeTime - curTime));
} else if (OverCapPolicyEnum.TRAFFIC_POLICING == overCapPolicy) {
throw new ResAllocException("[流量超出阈值]抛弃");
} else {
// won't happen...
throw new ResAllocException("[流量超出阈值]未知策略");
}
}

private boolean isOverCap(long curTime, long curFreeTime, double count) {
// 已有部分资源被占用,且占用的部分超过阈值
long occupiedTime = curFreeTime > curTime ? curFreeTime - curTime : 0;
double occupiedRes =
leakyRate * occupiedTime / TIME_BASE;
return occupiedRes + count > capacity;
}

TokenBucket(令牌桶算法)

从某种意义上讲,令牌桶算法是对漏桶算法的一种改进,桶算法能够限制请求调用的速率,而令牌桶算法能够在限制调用的平均速率的同时还允许一定程度的突发调用。
在令牌桶算法中,存在一个桶,用来存放固定数量的令牌。算法中存在一种机制,以一定的速率往桶中放令牌。每次请求调用需要先获取令牌,只有拿到令牌,才有机会继续执行,否则选择选择等待可用的令牌、或者直接拒绝。
实现令牌桶时可以准备一个队列,用来保存令牌,另外通过一个线程池定期生成令牌放到队列中,每来一个请求,就从队列中获取一个令牌,并继续执行。
Guava 中的RateLimiter就是令牌桶的一种实现。
TokenBucket示意图

  • addingRate 令牌添加速率。

acquire 时如果有足够的令牌则可以继续执行,否则同样有两种策略:

  • TRAFFIC_SHAPINGsleep 至令牌足够的时间点;
  • TRAFFIC_POLICING ,直接放弃该任务。

令牌桶与漏桶最大的区别是令牌桶能缓存令牌,以应对网络中的突发流量,可以看作网络中的缓存。并且这个令牌的积累过程是随着时间慢慢增加的,可以看作一种预热的过程——服务器在刚启动时总是会因为处理一些初始化任务而非常繁忙。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 令牌添加速率
*/
private double addingRate;

/**
* 超出阈值后执行策略
*/
private final OverCapPolicyEnum overCapPolicy;

private volatile AtomicLong freeTime = new AtomicLong(TimeUtil.getTime());

private static CircularSequence<Long> circularSequence =
new CircularSequence<>(SequenceUtil.genContinuousLongSeq(1L, 100L, 10L));

@Override
public void acquire() throws ResAllocException {
acquire(1);
}

/**
* @param count 获取令牌数量
*/
@Override
public void acquire(double count) throws ResAllocException {
checkArg(count > 0, "获取令牌数必须大于0");
while (true) {
long curTime = TimeUtil.getTime();
long curFreeTime = this.freeTime.get();
long nextFreeTime = curFreeTime + (long) (count / addingRate * TIME_BASE);
// 检查“桶”中令牌是否满足当前请求
if (isOverCap(curTime, curFreeTime, count)) {
applyPolicy(curTime, curFreeTime, count);
continue;
}
// 更新时间
if (freeTime.compareAndSet(curFreeTime, nextFreeTime)) {
break;
}
}
}

private void applyPolicy(long curTime, long freeTime, double count) throws ResAllocException {
if (OverCapPolicyEnum.TRAFFIC_SHAPING == overCapPolicy) {
// 加个抖动,减少竞争(可能会发生某个线程饥饿的情况,但是一般不会那么敏感)
long shake = circularSequence.fetch();
// 等到桶中产生足够的令牌
UninterruptiblesUtil.sleepUninterruptibly(
(long) (count / addingRate * TIME_BASE) - (curTime - freeTime) + shake);
} else if (OverCapPolicyEnum.TRAFFIC_POLICING == overCapPolicy) {
throw new ResAllocException("[流量超出阈值]抛弃");
} else {
// won't happen...
throw new ResAllocException("[流量超出阈值]未知策略");
}
}

private boolean isOverCap(long curTime, long freeTime, double count) {
// 计算“桶”中当前存在的令牌,必须满足 freeTime < curTime ,因为桶中资源不能“透支”
long remainingTime = freeTime < curTime ? curTime - freeTime : 0;
double remainingTokens = addingRate * remainingTime / TIME_BASE;
return count > remainingTokens;
}

限流原理

限流维度

这属于应用的范畴,应该由限流器的使用者来关注,即限流可以在不同维度执行,比如:

  1. 对请求的 URL 进行限流;
  2. 对客户端的访问 IP 进行限流;
  3. 对某些特定用户或者用户组进行限流;
  4. 多维度混合的限流。

负载变化

  1. 压力普通,正常服务,耗时正常 。
  2. 压力上升,服务开始出现大面积超时,由于使用不公平锁竞争,偶尔会有正常耗时的需求。
  3. 压力继续增大,服务器开始进入假死状态,几乎不能再接受新的请求。

并发度评估

限流器的参数设置很多时候没有一个固定的套路,但是回到开头的讨论:

…限流器的作用是限制吞吐率…

那么怎么才能正确评估系统的吞吐率呢?(注意平均处理时间是吞吐率的倒数)

  1. 评估总访问量
    问运营和产品上线后的总访问量,一般用 PV 等单位估算。
  2. 评估平均访问量 QPS
    总量除以时间即可,一天 86400 秒,一般认为请求发生在白天,即 4w 秒。
  3. 评估高峰 QPS
    系统容量规划时,不能只考虑平均 QPS,而是要抗住高峰的 QPS,如何知道高峰 QPS 呢?根据业务特性,通过业务访问曲线评估。
    有一些业务例如“秒杀业务”比较难画出业务访问趋势图,这类业务的容量评估不在此列。
  4. 评估系统、单机极限 QPS
    压力测试
  5. 提出方案
    比较上述计算出的所需服务器数量和当前线上已有服务器数量的差值,判断需要加几台机器。

但是这种评估没有权衡其他接口的流量,因为并不是只有一个接口在接收请求,只能作为一个调参的参考。

Web 服务器不会因为来了多少个请求就建立多少个连接,一个用户可能会发出多个请求,内核会使用同一个文件描述符来处理。

Little’s Law

Little’s Law 是排队论中的一个定律:一个系统中顾客的长期平均数量 L 等于顾客的长期平均到达速率λ乘以顾客在系统中平均花费的时间 W。用公式表示为:
L = λ W
虽然这个定律看起来很简单,但是这是一个非常有名的结果,因为这种关系“不受到达过程的分布,服务分布,服务顺序,或其他任何因素的影响”。这个结果适用于任何系统,特别是适用于系统内的系统。唯一的要求是系统必须是稳定非抢占式的。
因为这个公式是在普遍意义上成立的,因此也可以在 Web 服务中应用,用于定性描述平均响应时间 W、平均请求数 L、平均吞吐率λ之间的关系。
但是要注意的是稳定的定义,根据 Wiki 上的定义,如果到达速度超过离开速度就代表是一个不稳定的系统,这会造成等待的顾客数量逐渐增加到无穷大。对这种现象的定性描述需要深入研究排队论。

使用 原子变量 而不是 锁

多个线程同时去获取一个限流器的资源的时候,会产生竞态条件,这在漏桶和令牌桶算法的实现中采取的是基于原子操作工具来解决的,下面这段代码是 Java 中原子变量的使用示例:

1
2
3
4
5
6
7
8
AtomicLong atomicLong = new AtomicLong(0);
public void test {
long v, newV;
do {
v = atomicLong.get();
newV = 2 * v;
} while (atomicLong.compareAndSet(v, newV));
}

AtomicXxx 的原理是 CAS ,它是一种无锁算法,相对 Lock 来说一般会更加高效,但是要注意存在多个线程同时去竞争同一原子变量的情况,可能会导致某个线程长时间的在“空转”(有点像进程调度中的“饥饿”现象),但是在限流器场景内发生的概率比较小,因为一个线程拿到资源后会去执行自己的逻辑或者睡眠一段时间,这段时间内它是不会再加入到抢资源的队伍里的。
大并发下 AtomicXxx 会有明显的空转现象,最好使用LongAdder替代,LongAdder 将对一个 AtomicLong 的请求分摊到多个 AtomicLong,其原理类似负载均衡。

最大流量限制

如果需要单机承载上万 / s 的流量,过大的流量会导致响应时间变长,甚至冲垮进程,这时限流的意义就不大了。
缓解单机压力的最常用方法是均摊,在 DNS、反向代理层都可以做流量的分摊,具体如何实现这里就不展开了。

动态限流(自适应)

动态化是一个比较好的想法,可以在调整机器负载后自动反馈到限流器,但是现实环境比较复杂:

  • 需要根据业务需求进行设计,比如一个接口是热点,需要进行限流以提高整体的吞吐率,但是该接口有流量高峰期,可以考虑在这段时间内临时提高限流器的阈值,避免这些请求直接被丢弃了;
  • 需要考虑即时系统负载(CPU、内存、网络连接数、数据库连接数)参数来考虑。
  • 如果不能直接获取机器配置,可以从系统吞吐率、积压的请求数、响应时间、可用率等来间接推导出系统负载。

WarmUp(预热)

为什么要预热

WarmUp,即冷启动/预热的方式。当系统长期处于低水位的情况下,流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过”冷启动”,让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,可以避免冷系统被压垮。
如果直接跳过预热,系统很容易出现各种问题,比如:

  1. DB 重启后,瞬间死亡
    一个高并发环境下的 DB,进程死亡后进行重启。由于业务处在高峰期间,上游的负载均衡策略发生了重分配。刚刚启动的 DB 瞬间接受了 1/3 的流量,然后 load 疯狂飙升,直至再无响应。
    原因就是:新启动的 DB,各种 Cache 并没有准备完毕,系统状态与正常运行时截然不同。可能平常 1/10 的量,就能够把它带入死亡。
  2. 服务重启后,访问异常
    另外一个常见的问题是:我的一台服务器发生了问题,由于负载均衡的作用,剩下的机器立马承载了这些请求,运行的很好。当服务重新加入集群时,却发生了大量高耗时的请求,在请求量高的情况下,甚至大批大批的失败。
    引起的原因大概可以归结于:
    • 服务启动后,jvm 并未完全准备完毕,JIT 未编译等。
    • 应用程序使用的各种资源未准备就绪。
    • 负载均衡发生了 rebalance。

添加预热的位置

warmup 最合适的切入层面就是网关。集成在网关中的负载均衡组件,将能够识别出这台刚加入的实例,然后逐步放量到这台机器,直到它能够真正承受高速流量。
但现实情况往往不能满足条件。比如:

  1. 你的应用直接获取了注册中心的信息,然后在客户端组件中进行了流量分配。
  2. 你的应用通过了一些复杂的中间件和路由规则,最终定位到某一台 DB 上。
  3. 你的终端,可能通过了 MQTT 协议,直接连上了 MQTT 服务端。

预热方案 - 接口放量

我们进行一下抽象,可以看到:所有这些流量分配逻辑,包括网关,都可以叫做客户端。即所有的 warmup 逻辑都是放在客户端的,它们都与负载均衡紧密耦合在一起。

  1. 我要能拿到所有要调用资源的集合,以及启动时间,冷启动的配置等。
  2. 给这些资源分配一些权重,比如最大权重为 100,配置 100 秒之后冷启动成功。假如现在是第 15 秒,则总权重就是 100*(n-1)+15。
  3. 根据算好的权重,进行分配,流量会根据时间流逝逐步增加,直到与其他节点等同。
  4. 一个极端情况,我的后端只有 1 个实例,根本就启动不起来。

预热方案 - 走马观花

顾名思义,意思就是把所有的接口都提前访问一遍,让系统对资源进行提前准备。
比如,遍历所有的 http 连接,然后发送请求。
这种方法是部分有效的,一些懒加载的资源会在这个阶段陆续加载进来,但不是全部。
JIT 等一些增强功能,可能使得预热过程变得非常的长,走马观花的方式,只能在一定程度上有作用。
再比如某些 DB,在启动之后,会执行一些非常有特点的 sql,使得 PageCache 里加载到最需要的热数据。

预热方案 - 状态保留

系统在死亡时做一个快照,然后在启动时,原封不动的还原回来。
这个过程就比较魔幻了,因为一般的非正常关闭,系统根本没有机会发表遗言,所以只能定时的,在运行中的系统中做快照。
节点在启动时,再将快照加载到内存中。这在一些内存型的组件中应用广泛。

实现

Guava - RateLimiter 中的 SmoothWarmingUp 提供了预热的功能,预热的功能主要是为了应对突发的流量:在没有预热的情况下,如果突发的大量流量直接打到后台服务器,如果后台服务的缓存陈旧、DB 或其他 IO 操作耗时,就有可能拖垮后台服务。预热属性可以通过设置 warmupPeriod 参数来实现。
另外, RateLimiter 还允许选择 SmoothBursty 或 SmoothWarmingUp 策略,举个例子:
比如当前桶中有 5 个令牌,此时来了一个新请求需要消耗 9 个令牌,那么它需要等待生成 4 个令牌的时间(SmoothBursty),或等待 9 个令牌的时间(SmoothWarmingUp)。
为了简单起见,我在代码里采用的是较简单的实现,令牌的添加速度总是均匀的,没有预热的过程,且没有阈值的限制,当桶中令牌数不足以承载请求时会等待直到足够的令牌被添加进来(或者直接抛出异常)。

请求丢失策略

限流器——特别是基于令牌桶的限流器,当流量超过阈值,一般会提供两种对溢出流量的处理策略,分别是 Traffic ShapingTraffic Policing ,简单来说,前者会暂时拦截住上方水的向下流动、等待容量空闲后再放行,而后者碰到溢出的流量会直接抛弃,这些流量可以通过日志追溯,但是已经没有实际的作用。

分布式限流

正如开头所说,现存的(至少是网上可以随便找到的)一些限流方案不能有效、高效地实现分布式系统的限流,因此我建议使用服务注册中心来注册所有服务,然后动态更新每个业务服务实例内的限流属性。为此,我们先分析一下一般基于 Redis 的限流器实现模式,然后讨论一下服务注册中心的相关问题,最后给出这个限流器的一个大致运作流程。

  • 服务注册中心可以使用 ZooKeeper 、 Eureka 、 Etcd 等技术来实现。
  • 我们需要的是 <ServiceId, ServiceProp> 这样的结构,姑且称为服务注册项,其中,服务 ID 唯一标识一个服务,服务属性包括版本号、实例数量等,不需要记录每个服务的地址等信息。

基于 Redis 的限流器

简而言之就是为每个请求生成一个 key(可以是 IP+接口 URL,也可以直接根据接口 URL 生成),value 变量为计数值,设置过期时间。
需要注意 Redis 的过期策略是混合的:

  • 被动删除:当读 / 写一个已经过期的 key 时,会触发惰性删除策略,直接删除掉这个过期 key;
  • 主动删除:Redis 会定期(默认似乎是 100ms)主动淘汰一批已过期的 key,当已用内存超过限定时,也会触发主动清理策略。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
-- KEYS[1] map key
-- ARGV[1] current time
-- ARGV[2] duration
-- ARGV[3] limitation
-- ARGV[4] precision
-- ARGV[5] permits

local function clear(il, i2, key, count_key, dele)
local sum = 0
for id = il, i2 do
local bkey = count_key .. ":" .. id;
local bcount = redis.call('HGET', key, bkey)
if bcount then
sum = sum + tonumber(bcount)
table.insert(dele, bkey)
end
end
return sum
end

local count_key = "cnt"
local ts_key = "ts"

// 限流记录的 key, 此处的 key 由外部传入, 一般根据我们需要限流的维度来生成. 例如如果是按 ip 对某个 url 做访问限流限制, 则 key 可能是 url:/test:ip:192.168.1.1
local key = KEYS[1]
// 当前时间, 使用服务端 redis 时间, 为了保证分布式情况下时间的一致性, 这里的使用通过 redis.time 获取并传入 lua 脚本
local now = tonumber(ARGV[1])
// 限流的总时长, 例如 1 分钟则是 60 * 1000 ms
local duration = tonumber(ARGV[2])
// 最高流量限制, 例如每分钟 10 次, 则为 10
local limit = tonumber(ARGV[3])
// 限流精度, 例如精度是 1s, 则为 1000 ms, 限流精度也是保证能实现上图红框内限流的关键, 精度越小, 限流越精确, block 数也越多, 占用的内存也越大. 实际上上图的简单限流即是 duration = precision 的一种特殊情况
local precision = tonumber(ARGV[4])
// 本次需要增加多少流量, 对于频率来说一般是 1, 而对于流量来说则是数据流量的字节数
local permits = tonumber(ARGV[5])

// 限流总时长按精度拆分成blocks块
local blocks = math.ceil(duration / precision)
// 当前时间在第几块
local block_id = math.floor(now / precision) % blocks
local last_ts = redis.call('HGET', key, ts_key)
last_ts = last_ts and tonumber(last_ts) or 0
if last_ts ~= 0 then
local decr = 0;
local dele = {}
local last_id = math.floor(last_ts / precision) % blocks
local elapsed = now - last_ts;

if elapsed > duration then
-- clear all
clear(O, blocks - 1, key, count_key, dele)
if permits > 0 then
redis.call('HSET', key, ts_key, now)
redis.call('HINCRBY', key, count_key, permits)
redis.call('HINCRBY', key, count_key .. ":" .. block_id, permits)
redis.call('PEXPIRE', key, duration)
end
return false
elseif block_id > last_id then
decr = decr + clear(last_id + 1, block_id, key, count_key, dele)
elseif block_id < last_id then
decr = decr + clear(O, block_id, key, count_key, dele)
decr = decr + clear(last_id + 1, blocks - 1, key, count_key, dele)
end

local cur
if #dele > 0 then
redis.call('HDEL', key, unpack(dele))
cur = redis.call(eHINCRBY, key, count_key, -decr)
else
cur = redis.call('HGET', key, count_key)
end

if tonumber(cur or '0') + permits > limit then
return true
end
end
if permits > 0 then
redis.call('HSET', key, ts_key, now)
redis.call('HINCRBY', key, count_key, permits)
redis.call('HINCRBY', key, count_key .. ":" .. block_id, permits)
redis.call('PEXPIRE', key, duration)
end
return false
  • redis 集群问题
    由于 redis 是集群环境, 集群环境下实际上直接执行 lua 脚本是有问题的. 试想 lua 脚本内可能涉及到多个 key 的操作, 而 redis 实际执行节点的选择也是通过 key 来选择的. 在多 key 情况下可能会造成 lua 脚本内 key 的执行混乱, 所以我们需要先手动选择好 redis 节点.
    此处我们可以先用限流的 key 将 redis 选择出来, 再将 lua 脚本传到某个 redis 节点执行. 也就是我们必须要可以通过限流 key 唯一确定一个 redis 节点, 例如 url:/test:ip:192.168.1.1 是可以确定使用某个 redis 节点的.
  • 分布式时间问题
    分布式系统需要考虑多客户端时间不一致问题, 此处使用 redis 时间解决.
  • 客户端性能问题
    由于这是一个公用的限流服务, 也就是所有接入该服务的应用的每次请求都会调用该服务, 再加上所有接入服务的应用共用一个 redis, 显然如果客户端使用同步等待限流服务的返回结果并不太合适, 会影响客户端的服务调用性能.
    所以我们可以使用一种折中策略, 即将限流结果保存到本地, 每次请求直接检查本地限流结果是否被限流, 同时使用异步的方式调用限流服务, 并在异步回调中更新限流结果. 这种做法会让限流数据略有延迟, 但是影响不大.
  • 限流服务本身的负载
    作为限流服务, 一个主要的作用是限制恶意流量对正常业务造成冲击, 但如果所有流量都需要经过限流服务, 当流量激增的时候, 谁来保证限流服务自己不被压垮?其实可以添加一个兜底方案:设定一个阈值, 当流量超过某个阈值(一般来讲, 这个阈值可以设置为 机器数 * 限流阈值)时, 直接退化为本地限流.

Push Or Pull

当服务上下线时,首先由上线的服务实例发起请求更新服务注册中心的服务注册项,然后由服务注册中心通知其他相关服务实例,其实这是一种 发布 / 订阅模式

  • 更新服务注册项
    推的模式,由客户端主动发起请求,更新服务注册项。
  • 更新本地限流器
    如果是推的模式——即由服务端主动通知相关的所有服务实例——需要服务端维持和客户端之间的长连接,一个实例是 Zk 中的 Watcher 机制。
    如果是拉的模式——即由客户端定期从服务端拉取服务注册信息——则可能会面临对实时性和性能之间的权衡,如果频率过高可能会影响服务的整体吞吐量,频率过低又会在一定程度上影响实时性。

强一致性 Or 弱一致性

像 ZooKeeper 这样的具有强一致性的 KV 数据库在实践中常被用于实现分布式锁等功能,但是用于服务发现并不合适,因为每当有 Zk 实例上下线时都需要重新选举,此时 Zk 集群将不提供服务(意味着不管访问哪一个实例都一样),提高一致性的代价是牺牲了可用性
如果采用 Zk 作为我们的服务发现提供器,可能会导致服务接口可用性与 Zk 集群健康程度关联紧密,当 Zk 集群重新选举时,集群不能很好地提供服务。
与此相对的,像 Eureka 是专门为微服务架构提供的一种服务发现组件,它采用心跳机制来同步多个副本,没有 Zk 中“至少一半存活才能提供服务”之类的限制,还有一些像“出现大量服务上下线时不修改服务注册表”之类的保底机制,可以有效提高服务的可用性。
总而言之,弱一致性就足够了。

如何实现流量的分摊

每个服务实例在使用限流器时需要指定服务 ID 和流量,这些流量会添加到服务注册项内,然后每个实例在接到更新通知后除以实例数量即可分到平均流量

时间同步

分布式系统中的一个常见问题是多个服务器上的时间不一致,但是在我们的实现中不会有影响,因为就算两台机器的时间差了 1 个小时,它们时间的流速总该是一样的吧?
时间的同步需求在定时任务中涉及到的比较多。

如何实现服务上线

当有两个服务实例同时上线时,它们会竞争同一个服务注册项,在服务集群启动时这个冲突是很频繁的,出于实现上的方便,我更倾向于借助乐观锁来实现同步(如果要采用分布式锁,可能还要起一个锁服务器)。
正如前面所说,服务注册项在更新后需要通知到所有相关的服务实例,经过观察比较,常见的可以用于实现服务发现功能的 KV 存储中间件都提供了 Watcher 机制,即在监听的数据发生变更后注册中心能够主动通知客户端,如果不存在这样的机制,则只能退而求其次选择客户端轮询了。

如何实现服务下线

服务下线可能是非常突然的,比如断电、人为重启等,服务注册中心需要通过某种机制感知到这种情况。
一种常见的实现方式是由服务端维持和客户端之间的长连接,通过心跳检测来检查服务的健康状况。

替代方案及延伸思考

上边的实现考虑的比较多,实际上开发时出于稳妥和降低工作量起见,会采取其他更方便的方案。

Counter(计数器)

单位时间内只允许一定数量的流量通过,类似于令牌桶的简化版。
计数算法不能防止两段相邻时间区间边界处的突发流量:[----o][o----]

SlidingWindow(滑动窗口算法)

和 CounterResPool 思路差不多,但是滑动窗口可以避免两个相邻时间区间边界处的突发流量。
SlidingWindow示意图

信号量

信号量不同于计数器,在结束请求后需要把计数器“加回来”。限流器限制的是吞吐率(单位时间内请求数或者流量),而 Semaphore 限制的则是并发度,因为同时正在执行的请求数是由信号量的初始信号数决定的,我们可以利用这一点来实现并发数的精确控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static Semaphore semaphore = new Semaphore(100);

public void acquire(Consumer<T> consumer, T arg,
Consumer<T> onAcquireFailed, Consumer<Exception> onException) {
if(!semaphore.tryAcquire()) {
onAcquireFailed.accept(arg);
return ;
}
try {
consumer.accept(arg);
} catch(Exception e) {
onException.accept(e);
} finally {
semaphore.release();
}
}

线程池

1
2
3
4
5
6
7
8
9
10
private final static ExecutorService pool = new ThreadPoolExecutor(100, 100, 1, TimeUnit.MINUTES, new SynchronousQueue<>());

public static <T> T execute(Supplier<T> supplier) {
try {
Future<T> future = pool.submit(supplier::get);
return future.get();
} catch (Exception e) {
return null;
}
}
  • 这种方案无法完美地做到“如果超过 100,超出的请求就直接返回 null 或抛出异常”,哪怕是使用 SynchronousQueue。

计数器

用计数器会有多种方案,而且每一种都有竞态条件,在高并发场景下都会有问题,方案一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private AtomicInteger counter = new AtomicInteger(0);

public <T> void execute(Consumer<T> consumer, T arg, Consumer<T> onAcquireFailed, Consumer<Exception> onException) {
int v = counter.incrementAndGet();
if(v > 100) {
onAcquireFailed(arg);
return ;
}
try {
supplier.accept(arg);
} catch(Exception e) {
onException(e);
} finally {
counter.decrementAndGet();
}
}

方案二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private AtomicInteger counter = new AtomicInteger(0);

public <T> void execute(Consumer<T> consumer, T arg, Consumer<T> onAcquireFailed, Consumer<Exception> onException) {
int v = counter.get();
if(v > 100) {
onAcquireFailed(arg);
return ;
}
counter.incrementAndGet();
try {
supplier.accept(arg);
} catch(Exception e) {
onException(e);
} finally {
counter.decrementAndGet();
}
}
  • 方案一在计数器已经达到 100 的情况下如果再进来请求,会有只加不减的情况;
  • 方案二是在先检查计数器后再执行 incrementAndGet,在计数器已经达到 100 的情况下如果并发调用该方法会导致计数器超出 100。

阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private BlockingQueue<Integer> reqQueue = new ArrayBlockingQueue<>();

public void execute(Consumer<T> consumer, T arg, Consumer<T> onAcquireFailed, Consumer<Exception> onException) {
if(!reqQueue.offer()) {
onAcquireFailed(arg);
return ;
}
try {
consumer.accept(arg);
} catch(Exception e) {
onException(e);
} finally {
reqQueue.poll();
}
}

可配置

上面我们给出的并发限制都是写死的,但是如果上线后发现不合适怎么办?比如容量评估失败,实际上的并发数要求更高、机器效率过剩了,会导致大量请求阻塞,特别是在和三方系统对接的时候。
解决办法是让并发数可配置化,以信号量为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static Semaphore semaphore;

@ConfigParam("permits") // 1
public void onUpdate(int permits) {
if(permits <= 0) {
throw new IllegalArgumentException("并发数配置必须大于0");
}
semaphore = new Semaphore(permits);
}

public <T> void acquire(Consumer<T> consumer, T arg,
Consumer<T> onAcquireFailed, Consumer<Exception> onException) {
Semaphore semaphore = semaphore; // 2
if (!semaphore.tryAcquire()) { // 3
onAcquireFailed.accept(arg);
return ;
}
try {
consumer.accept(arg);
} catch(Exception e) {
onException.accept(e);
} finally {
semaphore.release(); // 4
}
}
  • 1 处的 @ConfigParam 是一个配置注解,表示从配置中心读取一个属性,它需要能动态感知配置的变化;
  • 2 处需要取到 semaphore 的一个引用,不能直接使用成员变量的引用,因为配置修改后会把对象成员变量的 semaphore 给覆盖掉,这样 3 和 4 处获取和释放的可能不是同一个。

用户访问频率限制

用户体验是一个容易被忽略的设计要点,按照上边的算法的设计思路,虽然ReteLmiter.create(0.1666)这么声明确实可以限制用户 1 分钟内只能访问 10 次,但是因为是均匀的限流、用户在每次点击后都需要再等 6 秒才能进行下一步操作,这给用户的体验当然是非常差的,更好的方案是先给用户点击 10 次的机会,然后再限制他在接下来的时间内无法继续点击。
这个方案不能通过一个本地计数器来实现,因为无法确认用户点击时请求被负载均衡到哪台服务器上,对用户来说,就是有时候点击会提示“您操作过于频繁,请稍后再试”、有时候不会,这同样会降低用户体验。
一般来说这种限流器是通过 Redis 来实现的:

1
2
3
4
5
6
7
String key = buildKey(userId);
String count = redisClient.incr(key);
if(count > countThreshold) {
throw new BizException("您操作过于频繁,请稍后再试");
} else {
redisClient.expire(key, expireTime);
}

当然在判断之前设置也存在问题,这样只要用户点击就会重新刷新过期时间,比如过期时间是 1 分钟,用户在 55 秒的时候又点击了一次,结果又要再等 1 分钟才能继续操作。
解决方案一般是引入 Lua 脚本、将这段逻辑包装成一个脚本批量执行,减少网络出错导致失败的概率,提供一种近似的原子操作,当然随意引入 Redis-Lua 是非常危险的,我暂时没有想到更好的解决方案,如果你有想法,欢迎联系我讨论。

限流器和队列

  • 非公平调度
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    private Semaphore semaphore = new Semaphore(10);

    private LongAdder counter = new LongAdder();

    private <T> void doAcquire(long expire, Supplier<T> onSucceed, Supplier<T> onFailed) {
    // 判断超时、队列是否已满
    if (System.currentTimeMillis() > expire) {
    onFailed.get();
    return;
    } else if (counter.longValue() > 100) {
    onFailed.get();
    return;
    }
    if (semaphore.tryAcquire()) {
    onSucceed.get();
    return ;
    }
    counter.increment();
    // 进入队列等待下一次调度
    executorService.schedule(() -> {
    counter.decrement();
    doAcquire(expire, onSucceed, onFailed);
    }, 100, TimeUnit.MILLISECONDS);
    }

    public void test() {
    doAcquire(100 + System.currentTimeMillis(),
    () -> "SUCCESS",
    () -> "FAILED");
    }
  • 公平调度
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    void syncDoAcquire(long expire, Supplier onSucceed, Supplier onFailed) {
    try {
    if (semaphore.tryAcquire(expire, TimeUnit.MILLISECONDS)) {
    onSucceed.get();
    return;
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    onFailed.get();
    }
    }

参考

  1. 谈谈高并发系统的限流
  2. 限流算法之漏桶算法、令牌桶算法
  3. Traffic shaping
  4. Traffic policing
  5. Bandwidth management
  6. Little’s law
  7. 排队论在架构的应用:对服务延迟、稳定性的影响

Guava

  1. Guava RateLimiter 源码解析
  2. RateLimiter 解析(一) ——设计哲学与快速使用

Nginx

  1. Nginx Module ngx_http_limit_req_module

HAProxy

  1. Better Rate Limiting For All with HAProxy

Radis

  1. Redis INCR rate limiter
  2. Redis 命令参考 - INCR key
  3. Introduction to rate limiting with Redis [Part 1]
  4. Introduction to rate limiting with Redis [Part 2]