Sentinel限流源码解读
(1) 原理
流量控制(flow control),其原理是监控应用流量的 QPS 或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。
假如让自己实现一个限流算法,怎么实现?
(1.1) 计数器限流算法
//
(2) 限流配置
sentinel
限流类型(2种)
线程数限流
QPS数限流
限流策略/流控模式(3种)
直接限流、关联限流、链路限流
流控效果(3种)
直接拒绝、Ware Up、排队等待
(3) 限流源码
源码流程图 todo
时序图 todo
限流有比较重要的几个点
1.限流配置 限流配置管理
2.限流算法 限流实现
3.
(3.1) 限流配置管理-FlowRuleManager
FlowRuleManager对象主要职责是管理资源的限流配置
资源对应的流控配置保存在 Map<String, List<FlowRule>> flowRules
package com.alibaba.csp.sentinel.slots.block.flow;
/**
* 一个资源可以有多个规则/配置。这些规则按以下顺序生效:
* 来自指定呼叫方的请求
* 没有指定的调用者
*/
public class FlowRuleManager {
/** 资源和对应限流配置集合 重要 */
private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
/** */
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
/**
* 定时任务线程池
* SCHEDULER 的 corePool size 必须设置为 1,这样两个任务startMetricTimerListener()才能由SCHEDULER有序运行
*/
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-metrics-record-task", true));
static {
currentProperty.addListener(LISTENER);
startMetricTimerListener();
}
}
这里用到了观察者模式,DynamicSentinelProperty
是被观察者,FlowRuleManager
是观察者,FlowRuleManager观察到配置变更后会通过configUpdate
方法更新配置信息。
(3.1.1) 限流规则对象-FlowRule
FlowRule对象的主要用来存配置信息
package com.alibaba.csp.sentinel.slots.block.flow;
public class FlowRule extends AbstractRule {
public FlowRule() {
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
/**
* 流控类型 0:线程数限流 1:QPS限流
* 默认使用 QPS限流
*/
private int grade = RuleConstant.FLOW_GRADE_QPS;
/**
* 流量控制阈值。
*/
private double count;
/**
* 限流策略
* 基于调用链的流量控制策略。
*
* {@link RuleConstant#STRATEGY_DIRECT} 直接限流 (by origin);
* {@link RuleConstant#STRATEGY_RELATE} 关联限流 (with relevant resource);
* {@link RuleConstant#STRATEGY_CHAIN} 链路限流 (by entrance resource).
*/
private int strategy = RuleConstant.STRATEGY_DIRECT;
/**
* 具有相关资源或上下文的流量控制中的参考资源。
*/
private String refResource;
/**
* 流控效果
*
* 0.直接拒绝(reject directly)
* 1.热启动(warm up)
* 2. 匀速排队(rate limiter)
* 3. 热启动 + 匀速排队 warm up + rate limiter
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
/**
* 流控效果为预热启动时的预热时长
* 默认10s
*/
private int warmUpPeriodSec = 10;
/**
* 流控效果为排队等待时的等待时长
* 默认500
*/
private int maxQueueingTimeMs = 500;
/**
* 集群模式
*/
private boolean clusterMode;
/**
* 集群模式的流规则配置。
*/
private ClusterFlowConfig clusterConfig;
/**
* 流量整形(节流)控制器。
*/
private TrafficShapingController controller;
}
public abstract class AbstractRule implements Rule {
/**
* 规则id
*/
private Long id;
/**
* 规则名称
*/
private String resource;
/**
* 将受来源限制的应用程序名称。
* 默认的limitApp是"default",即允许所有源应用。
*
* 对于权限规则,多个源名称可以用逗号(',')分隔。
*/
private String limitApp;
}
(3.2) 插槽-FlowSlot
package com.alibaba.csp.sentinel.slots.block.flow;
/**
* 结合从前面的插槽(NodeSelectorSlot、ClusterNodeBuilderSlot 和 StatisticSlot)收集的
* 运行时统计信息,FlowSlot 将使用预设规则来决定是否应阻止传入请求。
*
* 如果触发任何规则,SphU.entry(resourceName) 将抛出 FlowException。
* 用户可以通过捕获 FlowException 来自定义自己的逻辑。
*
* 一个资源可以有多个流规则。 FlowSlot 遍历这些规则,直到其中一条被触发或者所有规则都被遍历。
*
* 每个FlowRule主要由这些因素组成:等级、策略、路径。 我们可以结合这些因素来达到不同的效果。
*
* 等级由 FlowRule 中的 grade 字段定义。
* 此处,0 用于线程隔离,1 用于请求计数整形 (QPS)。
* 线程计数和请求计数都是在真实运行时收集的,
* 我们可以通过以下命令查看这些统计信息:
* <pre>
* curl http://localhost:8719/tree
*
* idx id thread pass blocked success total aRt 1m-pass 1m-block 1m-all exception
* 2 abc647 0 460 46 46 1 27 630 276 897 0
* </pre>
*
* thread 当前正在处理资源的线程数
* pass 一秒内传入请求数
* blocked 一秒内被阻塞的请求数
* success 一秒内被Sentinel成功处理的请求数
* RT 请求在一秒内的平均响应时间
* total 一秒内传入请求和阻塞请求的总和
* 1m-pass 为一分钟内传入请求数
* 1m-block 为一分钟内被阻塞的请求数
* 1m-all 是一分钟内传入和阻止的请求总数
* exception 为一秒内业务(自定义)异常的计数
*
* 这个阶段通常用于保护资源不被占用。 (达到限流阈值,服务快撑不住了)
* 如果资源需要很长时间才能完成,线程将开始占用。 响应时间越长,占用的线程越多。
*
* 除了计数器之外,线程池或信号量也可以用来实现这一点。
* - 线程池:分配一个线程池来处理这些资源。 当池中不再有空闲线程时,拒绝请求而不影响其他资源。
* - 信号量:使用信号量来控制该资源中线程的并发数。
*
* 使用线程池的好处是,超时可以优雅的走开。 但它也给我们带来了上下文切换和额外线程的成本。
* 如果传入请求已经在单独的线程中提供服务,例如 Servlet HTTP 请求,那么如果使用线程池,线程数几乎会翻倍。
*
*
* 流量整形
* 当QPS超过阈值时,Sentinel会采取动作控制传入的请求,由流规则中的 controlBehavior 字段配置。
* 1.立即拒绝(Immediately reject):默认行为。 超出的请求立即被拒绝 并抛出 FlowException
* 2.热启动(Warmup) : 如果一段时间以来系统的负载很低,大量的请求来了,系统可能无法一次处理所有这些请求。
* 然而,如果我们稳定地增加传入请求,系统可以预热并最终能够处理所有请求。
* 可以通过在流规则中设置字段 warmUpPeriodSec 来配置此预热期。
* 3.统一速率限制 此策略严格控制请求之间的间隔。换句话说,它允许请求以稳定、统一的速率传递。
* https://raw.githubusercontent.com/wiki/alibaba/Sentinel/image/uniform-speed-queue.png
* 该策略是漏桶算法的实现 https://en.wikipedia.org/wiki/Leaky_bucket
* 它用于以稳定的速率处理请求,通常用于突发业务(例如消息处理)。
* 当大量超出系统容量的请求同时到达时,使用此策略的系统将处理请求及其固定速率,直到所有请求都已处理或超时。
*/
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
public FlowSlot() {
this(new FlowRuleChecker());
}
/**
* Package-private for test.
*
* @param checker flow rule checker
* @since 1.6.1
*/
FlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 校验
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// 限流规则校验
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
//
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
}
(3.2.1) 流量控制规则的规则检查-FlowRuleChecker
FlowRuleChecker
的主要作用是 流量控制规则的规则检查
package com.alibaba.csp.sentinel.slots.block.flow;
/**
* 流量控制规则的规则检查器。
*/
public class FlowRuleChecker {
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// 获取资源对应的规则集合
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 是否可以通过流量控制规则检查
if (!canPassCheck(rule, context, node, count, prioritized)) {
// 被限流,抛异常FlowException
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
}
/** */
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
// 是否是集群节点
if (rule.isClusterMode()) {
// 集群检查
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// 本地检查
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
/** */
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
//
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
/** */
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// 限流服务 默认为default
String limitApp = rule.getLimitApp();
// 流控策略
int strategy = rule.getStrategy();
//
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
// 流控策略 = 直接拒绝
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
//
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { // limitApp = "default"
// 流控策略 = 直接拒绝
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 返回集群节点
return node.getClusterNode();
}
//
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // limitApp = "other" &&
// 流控策略 = 直接拒绝
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
//
return selectReferenceNode(rule, context, node);
}
return null;
}
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
// 资源名
String refResource = rule.getRefResource();
// 流控策略
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
// 流控策略-关联限流
if (strategy == RuleConstant.STRATEGY_RELATE) {
//
return ClusterBuilderSlot.getClusterNode(refResource);
}
// 流控策略-链路限流
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
// No node.
return null;
}
(3.3) 流控效果-TrafficShapingController
TrafficShapingController 实现类有3类共4个
DefaultController 快速失败流控效果实现 默认节流控制器(立即拒绝策略)。
RateLimiterController 排队等待流控效果实现
WarmUpController 冷启动流控效果实现
WarmUpRateLimiterController 冷启动排队等待流控效果实现
(3.3.1) 默认快速失败限流-DefaultController
快速失败限流器
如果
package com.alibaba.csp.sentinel.slots.block.flow.controller;
/**
* 默认节流控制器(立即拒绝策略)。
*/
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade) {
this.count = count;
this.grade = grade;
}
/**
* 1.获取已经使用的令牌 (qps数或线程数)
* 2.如果 已使用数+申请数 <= 阈值,通过
* 3.如果 已使用数+申请数 > 阈值
* 判断是否允许 透支(使用下个窗口令牌)
*/
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 当前令牌数 (qps数或线程数)
int curCount = avgUsedTokens(node);
// 判断已使用令牌数和申请的令牌数 是否大于 配置的令牌数
if (curCount + acquireCount > count) {
// 如果阈值类型是QPS 并且 优先获取时,允许使用下个窗口令牌/透支
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
// 当前时间戳
currentTime = TimeUtil.currentTimeMillis();
// 预占用下个时间段的令牌
// 计算下一个时间窗口需要等待的时间
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
// 如果等待时间 < 最大等待超时时间
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
// 不通过 开始限流
return false;
}
return true;
}
/** 获取令牌个数 */
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
// 线程个数 或 通过QPS数
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
StatisticNode::tryOccupyNext
/** */
@Override
public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
//
double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
// 当前时间窗口内已经预占的令牌数
long currentBorrow = rollingCounterInSecond.waiting();
if (currentBorrow >= maxCount) {
return OccupyTimeoutProperty.getOccupyTimeout();
}
int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
// 桶/时间窗口开始时间
long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;
int idx = 0;
/*
* Note: here {@code currentPass} may be less than it really is NOW, because time difference
* since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
* lead more tokens be borrowed.
*/
long currentPass = rollingCounterInSecond.pass();
// 计算经过多少时间后可以申请到令牌
while (earliestTime < currentTime) {
long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
break;
}
long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
return waitInMs;
}
earliestTime += windowLength;
currentPass -= windowPass;
idx++;
}
return OccupyTimeoutProperty.getOccupyTimeout();
}
StatisticNode
@Override
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
(4) sentinel限流规则初始化
- FlowRule对象用来存配置信息
- FlowRuleManager用来加载配置信息
- 后台定时任务每秒获取一次最新配置 通过 MetricTimerListener 实现
SentinelConfig初始化配置及获取最新配置
MetricTimerListener 定时的数据采集,然后写到log文件里去
StatisticNode 使用滑动窗口实时统计数据遍历集群节点 汇总统计的数据
流控策略
LimitApp的作用域只在配置的流控策略为RuleConstant.STRATEGY_DIRECT(直接关联)时起作用。
其有三种配置,分别为default,origin_name,other
default 如果配置为default,表示统计不区分来源,当前资源的任何来源流量都会被统计(其实就是选择 Node 为 clusterNode 维度)
origin_name 如果配置为指定名称的 origin_name,则只会对当前配置的来源流量做统计
other 如果配置为other 则会对其他全部来源生效但不包括第二条配置的来源
当策略配置为 RuleConstant.STRATEGY_RELATE 或 RuleConstant.STRATEGY_CHAIN 时
STRATEGY_RELATE 关联其他的指定资源,如资源A想以资源B的流量状况来决定是否需要限流,这时资源A规则配置可以使用 STRATEGY_RELATE 策略
STRATEGY_CHAIN 对指定入口的流量限流,因为流量可以有多个不同的入口(EntranceNode)
流控策略校验
同一个资源名可以配置多条规则,规则的生效顺序为:{some_origin_name} > other > default
校验 FlowRuleChecker#selectNodeByRequesterAndStrategy
String limitApp = rule.getLimitApp(); // 不设置默认为default,设置了就是自己指定的
// 调用方限流-直接限流-针对特定的调用者 origin变量 不为`default` 或 `other`,并且配置的limitApp和origin变量 相等
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
// 调用方限流-直接限流-默认
if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
}
// 调用方限流-直接限流-其它
if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
//
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
if (strategy == RuleConstant.STRATEGY_RELATE) {
return ClusterBuilderSlot.getClusterNode(refResource);
}
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
// No node.
return null;
}
流控效果 流量控制实现
// 加载流控规则
FlowRuleManager.loadRules(rules);
com.alibaba.csp.sentinel.slots.block.flowFlowRuleUtil::buildFlowRuleMap
/**
* Build the flow rule map from raw list of flow rules, grouping by provided group function.
*
* @param list raw list of flow rules
* @param groupFunction grouping function of the map (by key)
* @param filter rule filter
* @param shouldSort whether the rules should be sorted
* @param <K> type of key
* @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules
*/
public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction,
Predicate<FlowRule> filter, boolean shouldSort) {
// 存储所有的流控规则
Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();
// 循环加载流控规则
for (FlowRule rule : list) {
if (!isValidRule(rule)) {
RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
continue;
}
if (filter != null && !filter.test(rule)) {
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
// 获取流控规则的阈值
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
K key = groupFunction.apply(rule);
if (key == null) {
continue;
}
Set<FlowRule> flowRules = tmpMap.get(key);
if (flowRules == null) {
// Use hash set here to remove duplicate rules.
flowRules = new HashSet<>();
tmpMap.put(key, flowRules);
}
flowRules.add(rule);
}
Comparator<FlowRule> comparator = new FlowRuleComparator();
for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
List<FlowRule> rules = new ArrayList<>(entries.getValue());
if (shouldSort) {
// Sort the rules.
Collections.sort(rules, comparator);
}
newRuleMap.put(entries.getKey(), rules);
}
return newRuleMap;
}
// 0. 直接拒绝(reject directly),
// 1. 热启动(warm up),
// 2. 匀速排队 rate limiter,
// 3. 热启动 + 匀速排队 warm up + rate limiter
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
// 是否是QPS限流
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
// 1 热启动 warm up 缓慢放量
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
// 3 热启动 + 匀速排队 warm up + rate limiter
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
// 2 匀速排队 rate limiter
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
// 直接拒绝
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
// 直接拒绝
return new DefaultController(rule.getCount(), rule.getGrade());
}
参考资料
[1] Sentinel源码分析—FlowRuleManager加载规则做了什么?
[2] Sentinel源码分析—Sentinel是如何进行流量统计的?
[3] Sentinel源码分析— QPS流量控制是如何实现的?
[4] Sentinel源码解析四(流控策略和流控效果)
[5] sentinel 滑动窗口 限流