Sentinel熔断源码解读

(1) 什么是熔断

在生活中,我们经常看到家里的电闸里装有保险丝,保险丝会在电流过大时熔断,从而切断电路。

(2) 为什么需要熔断

像保险丝在电流过大时熔断以保护电路一样,
计算机软件系统里的熔断是为了保护系统不被过大的资源消耗拖垮。

(3) 熔断的原理

断路器

(3.1) 状态机

熔断状态机

(4) Sentinel熔断配置

(4.1) 系统里使用熔断

// 

(4.2) 熔断配置

交易每天都有大量的熔断报警,但是实际熔断时的服务状况并不符合我们预期的熔断触发场景,与我们在预案平台配置的熔断条件有出入,因此排查了一波。

(4.2.1) sentinel 1.7版本

熔断策略 实际触发熔断规则
RT 1s内所有成功的请求平均RT连续5次大于配置的阈值时触发熔断
错误数 1min内所有异常的请求次数大于配置的阈值时触发熔断
错误率 1s内请求数大于配置的最小请求数,并且异常数/成功数大于配置的阈值时触发熔断

(4.2.2) sentinel 1.8版本

熔断策略 实际触发熔断规则
RT 在一个统计时长内总请求数大于等于最小请求数,并且慢调用(RT大于阈值)次数/总调用次数大于比例阈值时触发熔断
错误数 1min内所有异常的请求次数大于配置的阈值时触发熔断
错误率 1s内请求数大于配置的最小请求数,并且异常数/成功数大于配置的阈值时触发熔断

(5) 源码解读

// todo
// 流程
// 根据流程重新梳理 表达

熔断降级插槽-DegradeSlot

package com.alibaba.csp.sentinel.slots.block.degrade;

/**
 * Processor slot dedicated to circuit breaking (degrade) checks.
 */
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * Runs the circuit-breaker check for the resource and, if no exception is
     * thrown, calls {@code fireEntry} with the same arguments.
     *
     * @throws Throwable anything thrown by {@code performChecking} or {@code fireEntry}
     */
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // Perform the circuit-breaker check first.
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

}
/**
 * Checks every circuit breaker registered for the resource.
 *
 * @param context context of the current invocation
 * @param r       the resource being entered
 * @throws BlockException a {@code DegradeException} as soon as one breaker refuses the call
 */
void performChecking(Context context, ResourceWrapper r) throws BlockException {
    // Look up the circuit breakers bound to this resource name.
    List<CircuitBreaker> breakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    if (breakers != null && !breakers.isEmpty()) {
        for (CircuitBreaker breaker : breakers) {
            if (breaker.tryPass(context)) {
                // This breaker lets the call through; check the next one.
                continue;
            }
            // Rejected: surface the rule that caused the rejection.
            throw new DegradeException(breaker.getRule().getLimitApp(), breaker.getRule());
        }
    }
}

熔断规则的规则管理器-DegradeRuleManager

DegradeRuleManager是观察者模式里的观察者,被观察者是DynamicSentinelProperty;观察到配置变更以后,会更新熔断规则。

package com.alibaba.csp.sentinel.slots.block.degrade;

/**
 * Rule manager for circuit-breaking (degrade) rules.
 */
public final class DegradeRuleManager {
    // Circuit breakers keyed by resource name. The whole map is replaced by
    // RulePropertyListener.reloadFrom when rules are (re)loaded.
    private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();

    // Degrade rules keyed by resource name; replaced together with circuitBreakers.
    private static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();

    // Listener (observer) that reacts to rule configuration load/update events.
    private static final RulePropertyListener LISTENER = new RulePropertyListener();

    // The property currently holding the rule list; LISTENER is registered on it below.
    private static SentinelProperty<List<DegradeRule>> currentProperty
        = new DynamicSentinelProperty<>();

    static {
        // Subscribe the listener so configuration changes trigger a reload.
        currentProperty.addListener(LISTENER);
    }

}

查询熔断规则

/**
 * Looks up the circuit breakers registered for a resource.
 *
 * @param resourceName name of the resource
 * @return the list stored for that name, or {@code null} if the map has no entry for it
 */
static List<CircuitBreaker> getCircuitBreakers(String resourceName) {
    final List<CircuitBreaker> registered = circuitBreakers.get(resourceName);
    return registered;
}

更新熔断规则

观察者实现类


private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {

    /** Invoked when the rule configuration is first loaded. */
    @Override
    public void configLoad(List<DegradeRule> conf) {
        reloadFrom(conf);
        RecordLog.info("[DegradeRuleManager] Degrade rules loaded: {}", ruleMap);
    }

    /** Invoked when the rule configuration changes. */
    @Override
    public void configUpdate(List<DegradeRule> conf) {
        reloadFrom(conf);
        RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: {}", ruleMap);
    }

    /**
     * Rebuilds the resource-to-breakers and resource-to-rules maps from the
     * given rule list and publishes both to {@code DegradeRuleManager}.
     *
     * @param list the new rule list; {@code null} or empty produces empty maps
     */
    private synchronized void reloadFrom(List<DegradeRule> list) {
        Map<String, List<CircuitBreaker>> breakersByResource = buildCircuitBreakers(list);
        Map<String, Set<DegradeRule>> rulesByResource = new HashMap<>(breakersByResource.size());

        for (Map.Entry<String, List<CircuitBreaker>> entry : breakersByResource.entrySet()) {
            List<CircuitBreaker> breakers = entry.getValue();
            // buildCircuitBreakers only creates a list when it is about to add to it.
            assert breakers != null && !breakers.isEmpty();

            // Collect the rule behind every breaker of this resource.
            Set<DegradeRule> resourceRules = new HashSet<>(breakers.size());
            for (CircuitBreaker breaker : breakers) {
                resourceRules.add(breaker.getRule());
            }
            rulesByResource.put(entry.getKey(), resourceRules);
        }

        // Publish the freshly built maps.
        DegradeRuleManager.circuitBreakers = breakersByResource;
        DegradeRuleManager.ruleMap = rulesByResource;
    }

    /**
     * Groups circuit breakers by resource name, skipping rules that are invalid
     * or whose strategy yields no breaker.
     *
     * @param list rules to convert; may be {@code null} or empty
     * @return a new map from resource name to its circuit breakers (never {@code null})
     */
    private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
        Map<String, List<CircuitBreaker>> grouped = new HashMap<>(8);
        if (list != null && !list.isEmpty()) {
            for (DegradeRule rule : list) {
                // Drop rules that fail validation.
                if (!isValidRule(rule)) {
                    RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
                    continue;
                }

                // A blank limitApp falls back to the default value.
                if (StringUtil.isBlank(rule.getLimitApp())) {
                    rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
                }

                // Reuse an existing breaker for an unchanged rule, otherwise create one.
                CircuitBreaker breaker = getExistingSameCbOrNew(rule);
                if (breaker == null) {
                    RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: {}", rule);
                    continue;
                }

                String resource = rule.getResource();
                List<CircuitBreaker> bucket = grouped.get(resource);
                if (bucket == null) {
                    bucket = new ArrayList<>();
                    grouped.put(resource, bucket);
                }
                bucket.add(breaker);
            }
        }
        return grouped;
    }
}

DegradeRuleManager::getExistingSameCbOrNew

/**
 * Returns the already-registered circuit breaker whose rule equals the given
 * rule, or a newly created breaker when none matches.
 *
 * @param rule the (already validated) degrade rule
 * @return the reused or new breaker; {@code null} if a new breaker cannot be created for the rule
 */
private static CircuitBreaker getExistingSameCbOrNew(/*@Valid*/ DegradeRule rule) {
    List<CircuitBreaker> registered = getCircuitBreakers(rule.getResource());
    if (registered != null && !registered.isEmpty()) {
        for (CircuitBreaker candidate : registered) {
            // An unchanged rule keeps its breaker instead of getting a new one.
            if (rule.equals(candidate.getRule())) {
                return candidate;
            }
        }
    }
    return newCircuitBreakerFrom(rule);
}

DegradeRuleManager::newCircuitBreakerFrom

/**
 * Creates a circuit breaker instance matching the grade of the given rule.
 *
 * @param rule the degrade rule
 * @return a new circuit breaker for the rule, or {@code null} when the grade is
 *         not one of the supported strategies
 */
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
    final int grade = rule.getGrade();

    // Response-time based circuit breaking.
    if (grade == RuleConstant.DEGRADE_GRADE_RT) {
        return new ResponseTimeCircuitBreaker(rule);
    }

    // Exception-ratio and exception-count strategies share one implementation.
    if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO
        || grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
        return new ExceptionCircuitBreaker(rule);
    }

    return null;
}

ResponseTimeCircuitBreaker

package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
 
/**
 * Circuit breaker for rules whose grade is {@code RuleConstant.DEGRADE_GRADE_RT}
 * (response time).
 *
 * @since 1.8.0
 */
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {

    // Upper bound of the slow-request ratio.
    private static final double SLOW_REQUEST_RATIO_MAX_VALUE = 1.0d;

    // Maximum allowed response time, taken from the rule's count (rounded).
    private final long maxAllowedRt;

    // Slow-request ratio threshold, taken from the rule's slowRatioThreshold.
    private final double maxSlowRequestRatio;

    // Minimum request amount, taken from the rule's minRequestAmount.
    private final int minRequestAmount;

    // Counter structure holding slow-request statistics.
    private final LeapArray<SlowRequestCounter> slidingCounter;

    /**
     * Creates a breaker backed by a {@code SlowRequestLeapArray} constructed with
     * the arguments {@code (1, rule.getStatIntervalMs())}.
     *
     * @param rule the degrade rule; its grade must be RT
     */
    public ResponseTimeCircuitBreaker(DegradeRule rule) {
        this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));
    }

    /**
     * Creates a breaker using the supplied statistics structure.
     *
     * @param rule the degrade rule; its grade must be RT
     * @param stat the slow-request counter array; must not be {@code null}
     */
    ResponseTimeCircuitBreaker(DegradeRule rule, LeapArray<SlowRequestCounter> stat) {
        super(rule);
        // Validate the inputs before copying the thresholds out of the rule.
        AssertUtil.isTrue(rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT, "rule metric type should be RT");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.maxAllowedRt = Math.round(rule.getCount());
        this.maxSlowRequestRatio = rule.getSlowRatioThreshold();
        this.minRequestAmount = rule.getMinRequestAmount();
        this.slidingCounter = stat;
    }

}

CircuitBreaker::tryPass

package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;

/**
 * Circuit breaker interface.
 *
 * https://martinfowler.com/bliki/CircuitBreaker.html
 *
 */
public interface CircuitBreaker {

    /**
     * Get the associated circuit breaking rule.
     *
     * @return the associated circuit breaking rule
     */
    DegradeRule getRule();

    /**
     * Acquires permission of an invocation only if it is available at the time of invoking.
     *
     * @param context context of current invocation
     * @return {@code true} if permission was acquired and {@code false} otherwise
     */
    boolean tryPass(Context context);

    /**
     * Get the current state of the circuit breaker.
     *
     * @return current state of the circuit breaker
     */
    State currentState();

    /**
     * <p>Record a completed request with the context and handle state transformation of the circuit breaker.</p>
     * <p>Called when a <strong>passed</strong> invocation finished.</p>
     *
     * @param context context of current invocation
     */
    void onRequestComplete(Context context);

    /**
     * Circuit breaker state machine.
     */
    enum State {
        /**
         * In {@code OPEN} state, all requests will be rejected until the next recovery time point.
         */
        OPEN,
        /**
         * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
         * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
         * will transform back to the {@code OPEN} state and wait for the next recovery time point;
         * otherwise the resource will be regarded as "recovered" and the circuit breaker
         * will stop cutting off requests and transform to the {@code CLOSED} state.
         */
        HALF_OPEN,
        /**
         * In {@code CLOSED} state, all requests are permitted. When the metric value exceeds the threshold,
         * the circuit breaker will transform to the {@code OPEN} state.
         */
        CLOSED
    }
}
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

/**
 * Base class for circuit breakers: holds the rule and the current state.
 *
 * @since 1.8.0
 */
public abstract class AbstractCircuitBreaker implements CircuitBreaker {

    // The degrade rule this breaker was created from.
    protected final DegradeRule rule;

    // Recovery timeout in milliseconds.
    // NOTE(review): presumably derived from the rule's time window — the assigning constructor is not shown here; confirm.
    protected final int recoveryTimeoutMs;

    // Registry of observers notified on state changes.
    private final EventObserverRegistry observerRegistry;

    // Current state of the breaker; starts as CLOSED and is changed via compare-and-set.
    protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);

    // Timestamp (ms) that the current time must reach before a retry is allowed
    // (compared against TimeUtil.currentTimeMillis() in retryTimeoutArrived).
    protected volatile long nextRetryTimestamp;

    /**
     * Creates a breaker for the rule using the shared {@code EventObserverRegistry} instance.
     *
     * @param rule the degrade rule
     */
    public AbstractCircuitBreaker(DegradeRule rule) {
        this(rule, EventObserverRegistry.getInstance());
    }

}
/**
 * Template implementation of the pass check.
 * <ul>
 *   <li>CLOSED: always passes.</li>
 *   <li>OPEN: passes only when the retry time has arrived and this caller wins
 *       the transition to HALF_OPEN (i.e. it becomes the probe request).</li>
 *   <li>Otherwise (HALF_OPEN): rejected.</li>
 * </ul>
 */
@Override
public boolean tryPass(Context context) {
    if (currentState.get() != State.CLOSED) {
        if (currentState.get() != State.OPEN) {
            // Neither CLOSED nor OPEN: reject.
            return false;
        }
        // Breaker is open: allow a probe once the retry time is reached.
        return retryTimeoutArrived() && fromOpenToHalfOpen(context);
    }
    return true;
}
/**
 * Tells whether the retry time has been reached.
 *
 * @return {@code true} when the current timestamp is at or past {@code nextRetryTimestamp}
 */
protected boolean retryTimeoutArrived() {
    final long now = TimeUtil.currentTimeMillis();
    return now >= nextRetryTimestamp;
}
/**
 * Attempts the OPEN -> HALF_OPEN transition on behalf of a probe request.
 *
 * @param context context of the current invocation
 * @return {@code true} if this caller performed the transition; {@code false}
 *         if the state was not OPEN (another thread won, or the breaker is CLOSED/HALF_OPEN)
 */
protected boolean fromOpenToHalfOpen(Context context) {
    // The transition is only legal from OPEN; under contention exactly one CAS succeeds.
    if (!currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
        return false;
    }

    // Tell the observers about the state change.
    notifyObservers(State.OPEN, State.HALF_OPEN, null);

    // Register a termination hook on the entry of the current invocation.
    Entry probeEntry = context.getCurEntry();
    probeEntry.whenTerminate(new BiConsumer<Context, Entry>() {
        @Override
        public void accept(Context ctx, Entry terminated) {
            // Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
            // Without the hook, the circuit breaker won't recover from half-open state in some circumstances
            // when the request is actually blocked by upcoming rules (not only degrade rules).
            if (terminated.getBlockError() == null) {
                return;
            }
            // Fallback to OPEN due to detecting request is blocked
            currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
            notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
        }
    });
    return true;
}

熔断只是对时间和状态进行判断,那么时间和状态来自哪里?
它们是在请求执行完毕后的 exit 流程里记录的,
可以看 com.alibaba.csp.sentinel.slots.statistic.StatisticSlot::exit 方法。

package com.alibaba.csp.sentinel.slots.statistic;


/**
 * 专用于实时统计的处理器插槽。
 */
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * Called when an invocation exits: records completion statistics for
     * non-blocked entries, runs the registered exit callbacks, then calls
     * {@code fireExit}.
     */
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        // Node of the current context.
        Node node = context.getCurNode();

        // Only record completion statistics when the current entry was not blocked.
        if (context.getCurEntry().getBlockError() == null) {
            // Compute the response time (completeStatTime is used as the completion time).
            long completeStatTime = TimeUtil.currentTimeMillis();
            context.getCurEntry().setCompleteTimestamp(completeStatTime);
            // Completion timestamp minus the entry's creation timestamp.
            long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

            Throwable error = context.getCurEntry().getError();

            // Record response time and success count on the current node and the origin node.
            recordCompleteFor(node, count, rt, error);
            recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
            // Entries of type IN are additionally recorded on Constants.ENTRY_NODE.
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
            }
        }

        // Handle the exit event with the registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
            handler.onExit(context, resourceWrapper, count, args);
        }

        // fix bug https://github.com/alibaba/Sentinel/issues/2374
        fireExit(context, resourceWrapper, count, args);
    }
/**
 * Records a completed invocation on the given node: response time and success
 * count, one fewer active thread, and the exception count for non-block errors.
 *
 * @param node       node to update; ignored when {@code null}
 * @param batchCount number of requests represented by this invocation
 * @param rt         response time of the invocation
 * @param error      error attached to the entry, or {@code null}
 */
private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
    if (node != null) {
        node.addRtAndSuccess(rt, batchCount);
        node.decreaseThreadNum();

        // BlockException is a rejection, not a business error, so it is not counted here.
        boolean countableError = error != null && !(error instanceof BlockException);
        if (countableError) {
            node.increaseExceptionQps(batchCount);
        }
    }
}

StatisticSlot

package com.alibaba.csp.sentinel.slots.statistic;


/**
 * Processor slot dedicated to real-time statistics.
 *
 * NOTE(review): the members shown below (rolling counters, thread counter,
 * {@code addPassRequest}) look like they belong to Sentinel's
 * {@code StatisticNode} rather than to a processor slot — confirm the class
 * header of this excerpt against the upstream source.
 */
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
     * by given {@code sampleCount}.
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    /**
     * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
     * meaning each bucket per second, in this way we can get accurate statistics of each second.
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

    /**
     * The counter for thread count.
     */
    private LongAdder curThreadNum = new LongAdder();

    /**
     * The last timestamp when metrics were fetched.
     */
    private long lastFetchTime = -1;

    /**
     * Adds the given pass count to both the second-level and the minute-level
     * rolling counters.
     *
     * @param count number of passed requests to add
     */
    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

}
package com.alibaba.csp.sentinel.slots.statistic.metric;
 
/**
 * The basic metric class in Sentinel, backed by a leap array of {@link MetricBucket}s.
 */
public class ArrayMetric implements Metric {

    /** Sliding-window storage for the metric buckets. */
    private final LeapArray<MetricBucket> data;

    /**
     * Creates a metric backed by an {@code OccupiableBucketLeapArray}.
     *
     * @param sampleCount  number of buckets
     * @param intervalInMs total time span covered, in milliseconds
     */
    public ArrayMetric(int sampleCount, int intervalInMs) {
        data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    /**
     * Adds the pass count to the bucket of the current time window.
     *
     * @param count number of passed requests to add
     */
    @Override
    public void addPass(int count) {
        MetricBucket currentBucket = data.currentWindow().value();
        currentBucket.addPass(count);
    }

}

LeapArray

package com.alibaba.csp.sentinel.slots.statistic.base;

/**
 * <p>
 * Basic data structure for statistic metrics in Sentinel.
 * </p>
 * <p>
 * Leap array use sliding window algorithm to count data. Each bucket cover {@code windowLengthInMs} time span,
 * and the total time span is {@link #intervalInMs}, so the total bucket amount is:
 * {@code sampleCount = intervalInMs / windowLengthInMs}.
 * </p>
 *
 * @param <T> type of statistic data
 * @author jialiang.linjl
 * @author Eric Zhao
 * @author Carpenter Lee
 */
public abstract class LeapArray<T> {

    // Time span covered by a single bucket, in milliseconds.
    protected int windowLengthInMs;
    // Number of buckets.
    protected int sampleCount;
    // Total time span covered by all buckets, in milliseconds.
    protected int intervalInMs;
    // NOTE(review): presumably intervalInMs expressed in seconds — the assignment is not shown in this excerpt.
    private double intervalInSecond;

    // Circular array of bucket wrappers; slots are updated with compare-and-set.
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();

    /**
     * Get the bucket at current timestamp.
     *
     * @return the bucket at current timestamp
     */
    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }



}
/**
 * Get bucket item at provided timestamp.
 *
 * <p>The lookup loops until it can return a bucket: an empty slot is filled via
 * compare-and-set, an up-to-date bucket is returned directly, and an outdated
 * bucket is reset while holding {@code updateLock}. Threads that lose the CAS or
 * fail to get the lock yield and retry.</p>
 *
 * @param timeMillis a valid timestamp in milliseconds
 * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    // Index of the slot in the circular array for this timestamp.
    int idx = calculateTimeIdx(timeMillis);
    // Start time of the bucket that contains this timestamp.
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            // The bucket returned here is a fresh one that is not stored in the array.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
/**
 * Maps a timestamp to its slot index in the circular bucket array.
 *
 * @param timeMillis timestamp in milliseconds
 * @return index into {@code array} for the bucket covering that timestamp
 */
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    // Ordinal of the window containing the timestamp, wrapped onto the array length.
    final long windowOrdinal = timeMillis / windowLengthInMs;
    final long slot = windowOrdinal % array.length();
    return (int) slot;
}
package com.alibaba.csp.sentinel.slots.statistic.metric.occupy;
 
/**
 * Leap array of {@link MetricBucket}s whose reset step carries over the pass
 * count found in {@code borrowArray} for the same window.
 *
 * @since 1.5.0
 */
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    /**
     * Resets the given window to a new start time. When {@code borrowArray} holds
     * a bucket for that time, its pass count is added to the freshly reset bucket.
     *
     * @param w    the window wrapper to reset
     * @param time the new window start time
     * @return the same wrapper, after reset
     */
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // Move the window to its new start time.
        w.resetTo(time);
        MetricBucket borrowed = borrowArray.getWindowValue(time);

        // The bucket is cleared in every case; borrowed passes are added afterwards.
        w.value().reset();
        if (borrowed != null) {
            w.value().addPass((int)borrowed.pass());
        }

        return w;
    }

}