Sentinel原理总结
Sentinel vs Hystrix
Sentinel概述
- 资源(Resource)
用户需要将业务抽象为资源,用于限流规则的配置。
Sentinel中需要在代码中埋点,表示这些地方需要通过某个规则访问上面定义的业务资源:try-catch
方式(通过SphU.entry(...)
),用户在 catch 块中执行异常处理 / fallbackif-else
方式(通过SphO.entry(...)
),当返回 false 时执行异常处理 / fallback
源码:ResourceWrapper
- 插槽(Slot)
多个插槽会组成一个“责任链”,表示对不同资源的不同处理逻辑。
源码:自定义SlotSlotChainBuilder
,创建位置CtSph#lookProcessChain()
- 上下文(Context)
维护当前调用链的元数据,每次调用SphU.entry()
或SphO.entry()
都需要在一个context中执行,包括:
entranceNode:当前调用链的入口节点
curEntry:当前调用链的当前entry
node:与当前entry所对应的curNode
origin:当前调用链的调用源 - Entry
entry是sentinel中用来表示是否通过限流的一个凭证,由SphU.entry()
或SphO.entry()
返回,如果不通过则会直接抛出一个BlockException
异常。 - Node
Node是一个接口,提供应用服务的实时统计数据。 - Metric
metric是sentinel中用来进行实时数据统计的度量接口。
开始使用Sentinel
Sentinel主要分为客户端和控制台两个部分。
直接使用
接入SpringBoot
搭建控制台
1 | # 下载 |
访问UI界面:http://127.0.0.1:8080/
使用默认的账号「sentinel / sentinel」进行登录。
在控制台可以配置流量控制、熔断降级等规则。
流量控制
引入依赖:
1 | <!-- 实现对 SpringMVC 的自动化配置 --> |
添加拦截器:
1 | @Configuration |
当超过流量阈值时,请求会抛出异常,我们可以定义一个拦截器来统一一下异常信息:
1 | @ControllerAdvice(basePackages = "com.tallate.sentinel.test.controller") |
添加测试接口:
1 | @RestController |
增加配置文件resources/sentinel.properties:
1 | csp.sentinel.dashboard.server=127.0.0.1:7070 |
Application:
1 | @SpringBootApplication |
热点参数限流
针对接口中个别参数配置限流规则。
1 | @GetMapping("/productInfo") |
Feature & 实现原理
每次资源调用都会产生一个Entry对象,Entry对象会创建一系列功能插槽,Sentinel的所有功能都是围绕这些插槽来展开的,下面就按顺序先来介绍一下这些Slot:
- NodeSelectorSlot
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级; - ClusterBuilderSlot
则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据; - StatisticSlot
则用于记录、统计不同纬度的 runtime 指标监控信息; - FlowSlot
则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制; - AuthoritySlot
则根据配置的黑白名单和调用来源信息,来做黑白名单控制; - DegradeSlot
则通过统计信息以及预设的规则,来做熔断降级; - SystemSlot
则通过系统的状态,例如 load1 等,来控制总的入口流量;
对于我们的项目来说,第一步还得先拦截请求,之后才是处理上面的ProcessorSlot链。
客户端拦截请求
如果是SpringMVC,一般来说是将Sentinel提供的拦截器注册到SpringMVC容器的拦截器链表里:
1 | @Configuration |
当请求进来时,会先被上面定义的拦截器拦截,最终还是调用了SphU.entry
:
1 | public abstract class AbstractSentinelInterceptor implements HandlerInterceptor { |
客户端感知最新配置
当在dashboard增加修改规则后,dashboard会主动将规则推送给对应的客户端。
- 客户端会通过心跳将本实例的地址(IP、端口号)上报到dashboard;
客户端启动时,启动一个定时任务发送心跳:com.alibaba.csp.sentinel.Env
。
调用dashboard的/registry/machine
接口上报实例信息,保存到内存中,类似服务发现。 - 在dashboard更新规则后,dashboard将规则推送给客户端;
更新规则接口:/v1/flow/rule
。
调用客户端暴露的设置规则接口:com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
。 - 更新客户端的规则
使用监视器模式监听规则的变更情况,当规则发生变更,轮询本地的监视器:com.alibaba.csp.sentinel.property.DynamicSentinelProperty#updateValue
。
比如流控的话就会调用:com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.FlowPropertyListener#configUpdate
。
NodeSelectorSlot
构建资源树,比如调用某个接口,就会创建一个代表这个接口的Node
放到Context中,如果一条链路中存在多个资源,那么这些资源会按调用的顺序组织成一棵树。
ClusterBuilderSlot
初始化了clusterNode信息,clusterNode主要用于维护资源的运行期间统计信息(响应时间、QPS、block 数目、线程数、异常数等)。
StatisticSlot
统计实时调用情况,比如调用成功多少次、失败多少次等,这些信息被记录到Node
中。
ParamFlowSlot(热点参数限流)
热点统计难题及解决
怎么统计热点?
用来统计的数据结构必须能够仅保存最常被访问的一定数量商品ID,并且可以控制结构的大小、统计量。
guava中的ConcurrentLinkedHashMap
1、支持并发
2、支持维护热点:使用额外的双链表维护entry,当一个entry被访问时移动到表头;
3、支持保留k大:当map容量超过预设,则移除链表尾的entry
4、性能比ConcurrentHashMap差,但是在可忍受范围内。统计热点的单位时间维度怎么定?
单位时间如果太粗,必然会带来毛刺,导致系统性能不平滑;如果太细,会给性能带来挑战。
Sentinel通过滑动窗口算法来统计qps,实现上是采用了抽样的技巧:它将采样窗口均分为一些格子,比如采样窗口为2s,均分为20个格子,那么统计前20个格子qps再取均值即可得到平均qps。
1、采用平均滑动窗口累计平均值可以削平毛刺qps
移动平均(Moving Average)
2、采用滑动窗口和ConcurrentLinkedHashMap可以实现Sentinel的核心逻辑在分布式系统中如何统计?
如何快速地在集群中统计,并且让限流规则在单机上生效。
簇点数据是通过http接口获取的,多个线程并发获取,将结果放到一个queue中,另外还有一个线程从queue中获取节点数据,merge结果。簇点是业务中需要统计的数据。
代码
1 | void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) { |
FlowSlot
1 | public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, |
注意这里的限流规则分为单机版本的和集群版本的:
- 单机限流支持QPS和线程数两种限流方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
如果是QPS,则基于滑动窗口,判断之前QPS+1是否超过了配置的阈值。
如果是线程数,则计算执行中的线程数是否到达阈值。
QPS和线程数都是在StatisticSlot
中统计的。
Sentinel中的滑动窗口总体思路是将1秒拆分成sampleCount
个区间,用一个环形数组array
来代表这些区间的计数值。
获取当前窗口:com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow(long)
。
统计1秒内所有区间的计数:com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#pass
。
下面是测试用的代码:1
2
3
4
5
6
7
8
9
10@Test
public void test() throws InterruptedException {
StatisticNode statisticNode = new StatisticNode();
statisticNode.addPassRequest(1);
Thread.sleep(500);
statisticNode.addPassRequest(1);
Thread.sleep(500);
double qps = statisticNode.passQps();
System.out.println(qps);
} - 集群限流
集群限流还会调一次服务端的DefaultTokenService#requestToken:服务端DefaultTokenService#requestToken:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
try {
TokenService clusterService = pickClusterService();
if (clusterService == null) {
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
long flowId = rule.getClusterConfig().getFlowId();
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
// If client is absent, then fallback to local mode.
} catch (Throwable ex) {
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
}
// Fallback to local flow control when token client or server for this rule is not available.
// If fallback is not enabled, then directly pass.
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68static boolean allowProceed(long flowId) {
String namespace = ClusterFlowRuleManager.getNamespace(flowId);
return GlobalRequestLimiter.tryPass(namespace);
}
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
Long id = rule.getClusterConfig().getFlowId();
// 全局流控,滑动窗口
if (!allowProceed(id)) {
return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
}
ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
if (metric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}
double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
// 根据阈值模式(全局 OR 单机均摊)计算阈值
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
// 计算获取token后剩余数
double nextRemaining = globalThreshold - latestQps - acquireCount;
if (nextRemaining >= 0) {
// TODO: checking logic and metric operation should be separated.
// 更新统计信息
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
if (prioritized) {
// Add prioritized pass.
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
}
// Remaining count is cut down to a smaller integer.
return new TokenResult(TokenResultStatus.OK)
.setRemaining((int) nextRemaining)
.setWaitInMs(0);
} else {
// 如果支持优先,则尝试从下一个格子借用token
if (prioritized) {
// Try to occupy incoming buckets.
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
// waitInMs > 0 indicates pre-occupy incoming buckets successfully.
if (waitInMs > 0) {
ClusterServerStatLogUtil.log("flow|waiting|" + id);
return new TokenResult(TokenResultStatus.SHOULD_WAIT)
.setRemaining(0)
.setWaitInMs(waitInMs);
}
// Or else occupy failed, should be blocked.
}
}
// Blocked.
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
if (prioritized) {
// Add prioritized block.
metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}
return blockedResult();
}
}
熔断降级 - CircuitBreaker、DegradeRule
https://github.com/alibaba/Sentinel/wiki/%E7%86%94%E6%96%AD%E9%99%8D%E7%BA%A7
- 用于对不稳定的弱依赖服务调用进行熔断降级,避免局部不稳定因素导致整体的雪崩。
- 适用于1.8.0以上版本
3种熔断策略
- 慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
- 异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是
[0.0, 1.0]
,代表 0% - 100%。
异常降级仅对业务异常生效,对Sentinel本身的异常BlockException
不生效,为了统计异常比例或异常数,需要通过 Tracer.trace(ex) 记录业务异常。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15Entry entry = null;
try {
entry = SphU.entry(key, EntryType.IN, key);
// Write your biz code here.
// <<BIZ CODE>>
} catch (Throwable t) {
if (!BlockException.isBlockException(t)) {
Tracer.trace(t);
}
} finally {
if (entry != null) {
entry.exit();
}
} - 异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。
熔断规则 - DegradeRule
Field | 说明 | 默认值 |
---|---|---|
resource | 资源名,即规则的作用对象 | |
grade | 熔断策略,支持慢调用比例/异常比例/异常数策略 | 慢调用比例 |
count | 慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值 | |
timeWindow | 熔断时长,单位为 | s |
minRequestAmount | 熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入) | 5 |
statIntervalMs | 统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入) | 1000 ms |
slowRatioThreshold | 慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入) |
DegradeSlot 和 CircuitBreaker
DegradeSlot
降级相关的统计Slot
- 执行请求前检查是否熔断 DegradeSlot#performChecking
根据熔断状态决定是否通过,没有通过则抛出DegradeException异常
如果是CLOSED表示关闭熔断,此时所有请求都正常被执行;如果是OPEN则表示打开熔断,此时所有请求被拦截,仅会每隔一小段时间放一个请求作为。 - 执行请求后统计
ResponseTimeCircuitBreaker#onRequestComplete 会统计慢请求,然后根据慢请求统计信息改变熔断状态
ExceptionCircuitBreaker#onRequestComplete 会统计异常请求,然后根据统计信息改变熔断状态
超时熔断 - ResponseTimeCircuitBreaker
状态变更 ResponseTimeCircuitBreaker#handleStateChangeWhenThresholdExceeded
异常熔断 - ExceptionCircuitBreaker
状态变更 ExceptionCircuitBreaker#handleStateChangeWhenThresholdExceeded