Sentinel熔断源码解读
(1) 什么是熔断
在生活中,我们经常看到家里的闸里会有保险丝,保险丝会在电流过大时熔断。
(2) 为什么需要熔断
像保险丝在电流过大时熔断保护电路一样
计算机软件系统里的熔断是为了保护系统不被过大的资源消耗拖垮
(3) 熔断的原理
断路器
(3.1) 状态机
熔断状态机
(4) Sentinel熔断配置
(4.1) 系统里使用熔断
//
(4.2) 熔断配置
交易系统每天都有大量的熔断报警,但实际熔断时的服务状况并不符合我们预期的熔断触发场景,与我们在预案平台配置的熔断条件有出入,因此做了一次排查。
(4.2.1) sentinel 1.7版本
熔断策略 | 实际触发熔断规则 |
---|---|
RT | 1s内所有成功的请求平均RT连续5次大于配置的阈值时触发熔断 |
错误数 | 1min内所有异常的请求次数大于配置的阈值时触发熔断 |
错误率 | 1s内请求数大于配置的最小请求数,并且异常数/成功数大于配置的阈值 |
(4.2.2) sentinel 1.8版本
熔断策略 | 实际触发熔断规则 |
---|---|
RT | 在一个统计时长内总请求数大于等于最小请求数,并且慢调用(RT大于阈值)次数/总调用次数大于比例阈值时触发熔断 |
错误数 | 在一个统计时长(statIntervalMs)内总请求数大于等于最小请求数,并且异常数大于配置的阈值时触发熔断 |
错误率 | 在一个统计时长(statIntervalMs)内总请求数大于等于最小请求数,并且异常数/总请求数大于配置的比例阈值时触发熔断 |
(5) 源码解读
// todo
// 流程
// 根据流程重新梳理 表达
熔断降级插槽-DegradeSlot
package com.alibaba.csp.sentinel.slots.block.degrade;
/**
 * Processor slot dedicated to circuit breaking (degradation).
 *
 * <p>Before handing an entry to the next slot, it asks every circuit breaker
 * registered for the resource whether the call may proceed.
 */
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * Runs the circuit-breaker check, then forwards the entry down the slot chain.
     * If {@code performChecking} throws, the next slot is never invoked.
     */
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
                      int count, boolean prioritized, Object... args) throws Throwable {
        // Gate first: any refusing breaker aborts the call here.
        performChecking(context, resourceWrapper);
        // Reached only when every breaker allowed the call.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}
/**
 * Asks each circuit breaker bound to the resource whether the current call may pass.
 *
 * @param context current invocation context, handed to each breaker's {@code tryPass}
 * @param r       resource being entered; its name selects the breakers
 * @throws BlockException a {@code DegradeException} built from the rule of the first breaker
 *                        that refuses the call
 */
void performChecking(Context context, ResourceWrapper r) throws BlockException {
    List<CircuitBreaker> breakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    // No breakers registered for this resource: nothing to check.
    if (breakers == null) {
        return;
    }
    // An empty list falls through the loop without any check.
    for (CircuitBreaker breaker : breakers) {
        if (breaker.tryPass(context)) {
            continue;
        }
        throw new DegradeException(breaker.getRule().getLimitApp(), breaker.getRule());
    }
}
熔断规则的规则管理器-DegradeRuleManager
DegradeRuleManager
是观察者模式里的观察者(具体由内部类 RulePropertyListener 注册到配置上),被观察者是DynamicSentinelProperty
,观察到配置变更以后,会更新配置。
package com.alibaba.csp.sentinel.slots.block.degrade;
/**
 * Manager for degrade (circuit-breaking) rules.
 *
 * <p>Keeps two per-resource lookup maps: the circuit breakers built from the rules and the rules
 * themselves. Both fields are {@code volatile}, and on reload they are replaced with freshly built
 * maps (see {@code RulePropertyListener#reloadFrom}) rather than mutated in place.
 */
public final class DegradeRuleManager {
// Resource name -> circuit breakers built for that resource.
private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();
// Resource name -> degrade rules configured for that resource.
private static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();
// Observer that reloads both maps when the rule property is loaded or updated.
private static final RulePropertyListener LISTENER = new RulePropertyListener();
// The observed property that currently supplies the degrade rules.
private static SentinelProperty<List<DegradeRule>> currentProperty
= new DynamicSentinelProperty<>();
// Subscribe the listener to the property during class initialization.
static {
currentProperty.addListener(LISTENER);
}
}
查询熔断规则
/**
 * Looks up the circuit breakers registered for a resource.
 *
 * @param resourceName resource name used as the lookup key
 * @return the breakers for that resource, or {@code null} when the map has no entry for it
 */
static List<CircuitBreaker> getCircuitBreakers(String resourceName) {
    // Single read of the volatile map reference, then a plain lookup.
    Map<String, List<CircuitBreaker>> snapshot = circuitBreakers;
    return snapshot.get(resourceName);
}
更新熔断规则
观察者实现类(RulePropertyListener)
/**
 * Observer registered on the degrade-rule property. On every load or update it rebuilds the
 * resource -> circuit breakers map and the derived resource -> rules map, then publishes both.
 */
private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {

    /**
     * Rebuilds both lookup maps from {@code list} and publishes them.
     * Synchronized on this listener so that two reloads cannot interleave.
     *
     * @param list new rules; {@code null} or empty publishes empty maps
     */
    private synchronized void reloadFrom(List<DegradeRule> list) {
        Map<String, List<CircuitBreaker>> breakersByResource = buildCircuitBreakers(list);
        Map<String, Set<DegradeRule>> rulesByResource = collectRules(breakersByResource);
        // Swap in the new maps: breakers first, then rules.
        DegradeRuleManager.circuitBreakers = breakersByResource;
        DegradeRuleManager.ruleMap = rulesByResource;
    }

    /** Derives resource -> rule set from resource -> breakers. */
    private Map<String, Set<DegradeRule>> collectRules(
            Map<String, List<CircuitBreaker>> breakersByResource) {
        Map<String, Set<DegradeRule>> rulesByResource = new HashMap<>(breakersByResource.size());
        for (Map.Entry<String, List<CircuitBreaker>> entry : breakersByResource.entrySet()) {
            // buildCircuitBreakers only creates a list when it is about to add a breaker to it.
            assert entry.getValue() != null && !entry.getValue().isEmpty();
            List<CircuitBreaker> breakers = entry.getValue();
            Set<DegradeRule> rules = new HashSet<>(breakers.size());
            for (CircuitBreaker breaker : breakers) {
                rules.add(breaker.getRule());
            }
            rulesByResource.put(entry.getKey(), rules);
        }
        return rulesByResource;
    }

    /** Handles an update notification: reload, then log the new rule map. */
    @Override
    public void configUpdate(List<DegradeRule> conf) {
        reloadFrom(conf);
        RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: {}", ruleMap);
    }

    /** Handles a load notification: reload, then log the loaded rule map. */
    @Override
    public void configLoad(List<DegradeRule> conf) {
        reloadFrom(conf);
        RecordLog.info("[DegradeRuleManager] Degrade rules loaded: {}", ruleMap);
    }

    /**
     * Groups the breakers for all usable rules by resource name.
     *
     * @param list rules to convert; may be {@code null}
     * @return a new map (empty when {@code list} is null or empty)
     */
    private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
        Map<String, List<CircuitBreaker>> breakersByResource = new HashMap<>(8);
        if (list == null || list.isEmpty()) {
            return breakersByResource;
        }
        for (DegradeRule rule : list) {
            CircuitBreaker breaker = breakerForRule(rule);
            if (breaker == null) {
                continue;
            }
            String resource = rule.getResource();
            List<CircuitBreaker> breakers = breakersByResource.get(resource);
            if (breakers == null) {
                breakers = new ArrayList<>();
                breakersByResource.put(resource, breakers);
            }
            breakers.add(breaker);
        }
        return breakersByResource;
    }

    /**
     * Validates and normalizes one rule and returns its breaker. Returns {@code null}, after
     * logging a warning, when the rule is invalid or no breaker exists for its strategy.
     */
    private CircuitBreaker breakerForRule(DegradeRule rule) {
        if (!isValidRule(rule)) {
            RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
            return null;
        }
        // A blank limitApp is replaced in place with the default value.
        if (StringUtil.isBlank(rule.getLimitApp())) {
            rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
        }
        CircuitBreaker breaker = getExistingSameCbOrNew(rule);
        if (breaker == null) {
            RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: {}", rule);
        }
        return breaker;
    }
}
DegradeRuleManager::getExistingSameCbOrNew
/**
 * Returns the currently registered breaker whose rule equals {@code rule}, if any. Reusing the
 * same instance keeps its state across reloads when a rule is unchanged. Otherwise a fresh
 * breaker is built.
 *
 * @param rule a rule that the caller has already validated
 * @return the reused or newly created breaker, or {@code null} when no breaker type matches the
 *         rule's grade
 */
private static CircuitBreaker getExistingSameCbOrNew(/*@Valid*/ DegradeRule rule) {
    List<CircuitBreaker> current = getCircuitBreakers(rule.getResource());
    if (current != null) {
        for (CircuitBreaker candidate : current) {
            // Reuse the circuit breaker if the rule remains unchanged.
            if (rule.equals(candidate.getRule())) {
                return candidate;
            }
        }
    }
    return newCircuitBreakerFrom(rule);
}
DegradeRuleManager::newCircuitBreakerFrom
/**
 * Creates a circuit breaker that matches the rule's grade.
 *
 * <p>Only the grade is inspected here. Rule validity is expected to have been checked by the
 * caller.
 *
 * @param rule degrade rule
 * @return a {@code ResponseTimeCircuitBreaker} for RT rules; an {@code ExceptionCircuitBreaker}
 *         for exception-ratio and exception-count rules; {@code null} for any other grade
 */
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
    int grade = rule.getGrade();
    if (grade == RuleConstant.DEGRADE_GRADE_RT) {
        // Slow-call (response time) based breaking.
        return new ResponseTimeCircuitBreaker(rule);
    }
    if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO
            || grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
        // Both exception-based strategies share a single implementation.
        return new ExceptionCircuitBreaker(rule);
    }
    return null;
}
ResponseTimeCircuitBreaker
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
/**
 * Circuit breaker for RT-grade degrade rules (slow-call ratio strategy).
 *
 * @since 1.8.0
 */
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {

    /** Largest possible slow-request ratio: every request counted as slow. */
    private static final double SLOW_REQUEST_RATIO_MAX_VALUE = 1.0d;

    /** RT threshold: {@code rule.getCount()} rounded to the nearest integer. */
    private final long maxAllowedRt;

    /** Slow-request ratio threshold, from {@code rule.getSlowRatioThreshold()}. */
    private final double maxSlowRequestRatio;

    /** Minimum request amount, from {@code rule.getMinRequestAmount()}. */
    private final int minRequestAmount;

    /** Sliding window of slow-request counters. */
    private final LeapArray<SlowRequestCounter> slidingCounter;

    /**
     * Builds the breaker with a {@code SlowRequestLeapArray} spanning the rule's stat interval.
     * NOTE(review): the leading {@code 1} is presumably the bucket count (one bucket) - confirm.
     *
     * @param rule an RT-grade degrade rule
     */
    public ResponseTimeCircuitBreaker(DegradeRule rule) {
        this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));
    }

    /**
     * Builds the breaker with an explicitly supplied statistics window.
     *
     * @param rule degrade rule; must have grade {@code DEGRADE_GRADE_RT}
     * @param stat sliding window to count slow requests in; must not be {@code null}
     */
    ResponseTimeCircuitBreaker(DegradeRule rule, LeapArray<SlowRequestCounter> stat) {
        super(rule);
        // Reject a rule of another grade or a missing window before reading any thresholds.
        AssertUtil.isTrue(rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT,
                "rule metric type should be RT");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.maxAllowedRt = Math.round(rule.getCount());
        this.maxSlowRequestRatio = rule.getSlowRatioThreshold();
        this.minRequestAmount = rule.getMinRequestAmount();
        this.slidingCounter = stat;
    }
}
CircuitBreaker::tryPass
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
/**
 * Circuit breaker contract consulted by the degrade slot.
 *
 * <p>Pattern reference: https://martinfowler.com/bliki/CircuitBreaker.html
 */
public interface CircuitBreaker {

    /**
     * Returns the degrade rule this breaker is associated with.
     *
     * @return the associated degrade rule
     */
    DegradeRule getRule();

    /**
     * Returns the breaker's current state.
     *
     * @return current circuit breaker state
     */
    State currentState();

    /**
     * Acquires permission of an invocation only if it is available at the time of invoking.
     *
     * @param context context of current invocation
     * @return {@code true} if permission was acquired and {@code false} otherwise
     */
    boolean tryPass(Context context);

    /**
     * <p>Record a completed request with the context and handle state transformation of the circuit breaker.</p>
     * <p>Called when a <strong>passed</strong> invocation finished.</p>
     *
     * @param context context of the current invocation
     */
    void onRequestComplete(Context context);

    /**
     * Circuit breaker state machine.
     */
    enum State {
        /**
         * All requests are rejected until the next recovery time point.
         */
        OPEN,
        /**
         * The breaker lets a "probe" invocation through. If the probe is judged abnormal by the
         * strategy (e.g. a slow call), the breaker returns to OPEN and waits for the next recovery
         * time point. Otherwise the resource is considered recovered, and the breaker stops cutting
         * requests and moves to CLOSED.
         */
        HALF_OPEN,
        /**
         * All requests are allowed. When the metric exceeds the threshold, the breaker moves to OPEN.
         */
        CLOSED
    }
}
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
/**
 * Base class for circuit breakers. Holds the rule, the state machine (initially CLOSED) and the
 * timestamp after which an OPEN breaker may let a probe request through.
 *
 * NOTE(review): excerpt - the two-argument constructor that assigns the final fields is not shown.
 *
 * @since 1.8.0
 */
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
// The degrade rule this breaker was created from.
protected final DegradeRule rule;
// Recovery timeout. NOTE(review): presumably how long the breaker stays OPEN before a probe - confirm.
protected final int recoveryTimeoutMs;
// Registry of state-change observers. NOTE(review): presumably used by notifyObservers - confirm.
private final EventObserverRegistry observerRegistry;
// Current state, initially CLOSED. An AtomicReference so transitions can use compareAndSet.
protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
// Earliest time (compared with TimeUtil.currentTimeMillis()) at which an OPEN breaker may probe.
protected volatile long nextRetryTimestamp;
// Delegates to the two-argument constructor using the shared EventObserverRegistry instance.
public AbstractCircuitBreaker(DegradeRule rule) {
this(rule, EventObserverRegistry.getInstance());
}
}
/**
 * Template implementation of {@link CircuitBreaker#tryPass}:
 * <ul>
 *   <li>CLOSED: always pass.</li>
 *   <li>OPEN: pass only if the retry timeout has arrived and this caller wins the
 *       OPEN -> HALF_OPEN transition. That caller acts as the probe.</li>
 *   <li>HALF_OPEN (or anything else): reject.</li>
 * </ul>
 */
@Override
public boolean tryPass(Context context) {
    if (currentState.get() == State.CLOSED) {
        return true;
    }
    if (currentState.get() != State.OPEN) {
        // HALF_OPEN: a probe has already been let through.
        return false;
    }
    // OPEN: at most one caller wins the CAS inside fromOpenToHalfOpen and becomes the probe.
    return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
/**
 * Tells whether the waiting period of an OPEN breaker is over.
 *
 * @return {@code true} once the current time has reached {@code nextRetryTimestamp}
 */
protected boolean retryTimeoutArrived() {
    long now = TimeUtil.currentTimeMillis();
    return now >= nextRetryTimestamp;
}
/**
 * Attempts the OPEN -> HALF_OPEN transition so that the current call can act as a probe.
 *
 * <p>Only one concurrent caller can win the compare-and-set. The winner registers a termination
 * hook on its entry. If that entry ends up blocked by any later rule, the hook moves the breaker
 * back to OPEN; without it the breaker could stay in HALF_OPEN.
 *
 * @param context context of the current invocation; its current entry receives the hook
 * @return {@code true} if this caller performed the transition and may proceed as the probe,
 *         {@code false} if the breaker was not OPEN at that moment
 */
protected boolean fromOpenToHalfOpen(Context context) {
    if (!currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
        // Not OPEN (already CLOSED/HALF_OPEN, or another thread won the CAS).
        return false;
    }
    notifyObservers(State.OPEN, State.HALF_OPEN, null);
    Entry entry = context.getCurEntry();
    entry.whenTerminate(new BiConsumer<Context, Entry>() {
        @Override
        public void accept(Context terminatedContext, Entry terminatedEntry) {
            // Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
            // Without the hook, the circuit breaker won't recover from half-open state in some circumstances
            // when the request is actually blocked by upcoming rules (not only degrade rules).
            if (terminatedEntry.getBlockError() != null) {
                // Fallback to OPEN due to detecting request is blocked.
                // Notify only when this hook actually performed the transition; otherwise observers
                // would receive a HALF_OPEN -> OPEN event that never happened.
                if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
                    notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                }
            }
        }
    });
    return true;
}
熔断器只是根据时间和状态进行判断,那么这些时间和状态所依赖的统计数据从哪里来?
统计数据(响应时间、成功数、异常数)在请求执行完毕后的 exit 阶段记录,
见 com.alibaba.csp.sentinel.slots.statistic.StatisticSlot::exit 方法
package com.alibaba.csp.sentinel.slots.statistic;
/**
 * Processor slot dedicated to real-time statistics.
 *
 * NOTE(review): excerpt - the class body continues with recordCompleteFor below, and its closing
 * brace is not shown in these notes.
 */
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
/**
 * Called when an entry exits. For entries that were not blocked, records response time, success
 * and (business) exception counts. Then runs the registered exit callbacks and passes the exit
 * to the next slot.
 */
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
// Node of the current context.
Node node = context.getCurNode();
// Only record completion statistics when the current entry was not blocked by a rule.
if (context.getCurEntry().getBlockError() == null) {
// Calculate response time (use completeStatTime as the time of completion).
long completeStatTime = TimeUtil.currentTimeMillis();
context.getCurEntry().setCompleteTimestamp(completeStatTime);
// Response time = completion timestamp - entry creation timestamp.
long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();
Throwable error = context.getCurEntry().getError();
// Record response time and success count on the current node and the origin node.
recordCompleteFor(node, count, rt, error);
recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
// Inbound (EntryType.IN) calls are also recorded on Constants.ENTRY_NODE.
if (resourceWrapper.getEntryType() == EntryType.IN) {
recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
}
}
// Handle the exit event with the registered exit callback handlers (runs for blocked entries too).
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
// fix bug https://github.com/alibaba/Sentinel/issues/2374
fireExit(context, resourceWrapper, count, args);
}
/**
 * Records one finished call on a statistics node.
 *
 * @param node       node to update; silently skipped when {@code null}
 * @param batchCount number of requests the call represents
 * @param rt         response time computed by the caller
 * @param error      error raised by the call, or {@code null}; a {@code BlockException} is not
 *                   counted as an exception
 */
private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
    if (node != null) {
        node.addRtAndSuccess(rt, batchCount);
        node.decreaseThreadNum();
        // A BlockException means a rule rejected the call, not that business logic failed.
        boolean businessError = error != null && !(error instanceof BlockException);
        if (businessError) {
            node.increaseExceptionQps(batchCount);
        }
    }
}
StatisticSlot
package com.alibaba.csp.sentinel.slots.statistic;
/**
 * Real-time statistics holder (excerpt).
 *
 * NOTE(review): this excerpt is labelled StatisticSlot, but its members (sliding-window metrics,
 * thread counter, addPassRequest) are node-level statistics. In upstream Sentinel they appear to
 * belong to StatisticNode - confirm the class name used in these notes.
 */
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
/**
* Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
* by given {@code sampleCount}.
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
/**
* The counter for thread count.
*/
private LongAdder curThreadNum = new LongAdder();
/**
* The last timestamp when metrics were fetched.
*/
private long lastFetchTime = -1;
/**
 * Adds {@code count} passed requests to both the short (INTERVAL) window and the 60-second window.
 */
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
}
package com.alibaba.csp.sentinel.slots.statistic.metric;
/**
 * The basic metric class in Sentinel using a {@link BucketLeapArray} internal.
 * (Excerpt: only the constructor and {@code addPass} are shown.)
 */
public class ArrayMetric implements Metric {

    /** Sliding-window buckets backing this metric. */
    private final LeapArray<MetricBucket> data;

    /**
     * Creates the metric over an occupiable bucket leap array.
     *
     * @param sampleCount  number of buckets in the window
     * @param intervalInMs total time span of the window in milliseconds
     */
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    /** Adds {@code count} passed requests to the bucket that covers the current time. */
    @Override
    public void addPass(int count) {
        data.currentWindow().value().addPass(count);
    }
}
LeapArray
package com.alibaba.csp.sentinel.slots.statistic.base;
/**
 * <p>
 * Basic data structure for statistic metrics in Sentinel.
 * </p>
 * <p>
 * A leap array uses a sliding window algorithm to count data. Each bucket covers a {@code windowLengthInMs}
 * time span, and the total time span is {@link #intervalInMs}, so the total bucket amount is:
 * {@code sampleCount = intervalInMs / windowLengthInMs}.
 * </p>
 * <p>
 * NOTE(review): excerpt - the constructor and the {@code currentWindow(long)} overload are shown
 * separately in these notes.
 * </p>
 *
 * @param <T> type of statistic data
 * @author jialiang.linjl
 * @author Eric Zhao
 * @author Carpenter Lee
 */
public abstract class LeapArray<T> {
// Time span covered by a single bucket, in milliseconds.
protected int windowLengthInMs;
// Number of buckets (intervalInMs / windowLengthInMs).
protected int sampleCount;
// Total time span covered by the whole array, in milliseconds.
protected int intervalInMs;
// Total time span in seconds. NOTE(review): presumably intervalInMs / 1000.0 - confirm in the constructor.
private double intervalInSecond;
// Circular bucket array; empty slots are filled with compareAndSet in currentWindow(long).
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* Get the bucket at current timestamp.
*
* @return the bucket at current timestamp
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
}
/**
 * Get bucket item at provided timestamp.
 *
 * <p>Concurrency: a missing bucket is installed with a CAS on {@code array}, and an up-to-date
 * bucket is returned directly. Only resetting a deprecated bucket takes {@code updateLock}, via
 * {@code tryLock}. Threads that lose a CAS or the lock yield and retry the loop instead of blocking.
 *
 * @param timeMillis a valid timestamp in milliseconds
 * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
 *         (negative)
 */
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// Slot in the circular array that timeMillis maps to.
int idx = calculateTimeIdx(timeMillis);
// Start timestamp of the bucket (window) that contains timeMillis.
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
// Returns a detached bucket that is not stored in the array.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
/**
 * Maps a timestamp to its slot in the circular bucket array.
 *
 * @param timeMillis a non-negative timestamp in milliseconds
 * @return the array index for the window containing {@code timeMillis}
 */
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    // Count of whole windows up to timeMillis, wrapped around the array length.
    return (int) ((timeMillis / windowLengthInMs) % array.length());
}
package com.alibaba.csp.sentinel.slots.statistic.metric.occupy;
/**
 * Bucket leap array used by {@code ArrayMetric}. When a deprecated bucket is recycled, it is
 * seeded with the pass count that {@code borrowArray} holds for the same start time.
 * NOTE(review): borrowArray presumably holds passes occupied in advance for upcoming windows - confirm.
 *
 * @since 1.5.0
 */
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    /**
     * Re-targets bucket {@code w} at the window starting at {@code time} and clears its counters.
     *
     * @param w    the deprecated bucket to reuse
     * @param time start timestamp of the new window
     * @return {@code w} itself
     */
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // Move the window start first.
        w.resetTo(time);
        MetricBucket borrowed = borrowArray.getWindowValue(time);
        // Counters are cleared in every case.
        w.value().reset();
        // Carry over passes already recorded for this window, if any.
        if (borrowed != null) {
            w.value().addPass((int) borrowed.pass());
        }
        return w;
    }
}