Sentinel核心流程源码解读

Sentinel整体架构如下
Sentinel架构
图里从下往上可以看到,核心的部分包含 规则(rules)处理插槽(slot)调用链路(invocation tree)集群节点(cluster node)滑动窗口(slading winodw) 5部分。

Sentinel代码使用 tag 1.8.6 ,对应github链接 https://github.com/alibaba/Sentinel/tree/1.8.6

(1) 核心流程源码解读

核心源码包含以下几个部分
1.规则(rules)-限流规则/熔断规则
2.构建功能插槽(solt)责任链
3.调用链路(invocation tree)
4.集群节点(cluster node)
5.滑动窗口(slading winodw)

public class SimpleDemo {

    public static void main(String[] args) {
        // 配置规则.
        initFlowRules();

        //while (true) {
            // 1.5.0 版本开始可以直接利用 try-with-resources 特性
            try (Entry entry = SphU.entry("HelloWorld")) {
                // 被保护的逻辑
                System.out.println("hello world");
            } catch (BlockException ex) {
                // 处理被流控的逻辑
                System.out.println("blocked!");
            }
        //}
    }

    private static void initFlowRules() {
        List<FlowRule> rules = new ArrayList<>();
        FlowRule rule = new FlowRule();
        rule.setResource("HelloWorld");
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        // Set limit QPS to 2.
        rule.setCount(2);
        rules.add(rule);
        // 
        FlowRuleManager.loadRules(rules);
    }

}

(2) 加载限流规则

资源对应限流规则

List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 2.
rule.setCount(2);
rules.add(rule);
// 
FlowRuleManager.loadRules(rules);

(3) 构建功能插槽(ProcessSolt)责任链

这块对应的代码是 SlotChainProvider.newSlotChain();
在构建功能插槽的时候使用责任链设计模式和使用SPI提高扩展性

构建完的责任链类似这种
功能插槽责任链

整体上的代码如下

package com.alibaba.csp.sentinel.slotchain;

/**
 * A provider for creating slot chains via resolved slot chain builder SPI.
 */
public final class SlotChainProvider {

    private static volatile SlotChainBuilder slotChainBuilder = null;

    /**
     * The load and pick process is not thread-safe, but it's okay since the method should be only invoked
     * via {@code lookProcessChain} in {@link com.alibaba.csp.sentinel.CtSph} under lock.
     *
     * @return new created slot chain
     */
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        // 使用SPI构建插槽实例 
        // 这块通过SPI读取配置文件加载类  
        // 读取的配置文件 META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
        // slotChainBuilder=com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder
        // Resolve the slot chain builder SPI.
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder == null) {
            // 不应该走到这儿  走到这儿肯定有问题
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                slotChainBuilder.getClass().getCanonicalName());
        }

        // 构建功能插槽责任链
        return slotChainBuilder.build();
    }
}

这儿比较重要的方法有两个,一个是 SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(),另一个是 slotChainBuilder.build()

(3.1) 构造默认的slotChainBuilder

package com.alibaba.csp.sentinel.spi;

public final class SpiLoader<S> {

    /**
     * Load the first-found Provider instance,if not found, return default Provider instance
     *
     * @return Provider instance
     */
    public S loadFirstInstanceOrDefault() {
        // 使用SPI读取配置文件获取全限定类目,并通过ClassLoader加载class
        load();

        for (Class<? extends S> clazz : classList) {
            if (defaultClass == null || clazz != defaultClass) {
                return createInstance(clazz);
            }
        }

        // 实例化  newInstance 
        return loadDefaultInstance();
    }

}

(3.2) 构造ProcessorSlot责任链

package com.alibaba.csp.sentinel.slots;

/**
 * 默认 ProcessorSlotChain 的构建器  
 *  
 * 
 * Builder for a default {@link ProcessorSlotChain}.
 *
 */
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        // 创建默认的 DefaultProcessorSlotChain 
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // 使用SPI读取配置,并通过ClassLoader加载class 
        // 读取的配置文件 META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
        // 这儿会读取到8个 ProcessorSlot
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
            
            // 按照字典序设置ProcessorSlot责任链
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}

META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件内容如下

# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

可以仔细看一下,文件里的内容已经按照字典序排序了

构建完成的功能插槽责任链类似下面这样:
功能插槽责任链抽象

对应的代码Debug截图:
构建完成的功能插槽责任链

(3.2.1) 默认功能插槽责任链-DefaultProcessorSlotChain

package com.alibaba.csp.sentinel.slotchain;

public class DefaultProcessorSlotChain extends ProcessorSlotChain {

    // 头结点 first
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        // 省略部分代码
    };

    // 尾结点 end
    AbstractLinkedProcessorSlot<?> end = first;

    /** 添加头节点 */
    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }

    /** 添加尾结点 */
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }


    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        first.exit(context, resourceWrapper, count, args);
    }

}

(3.3) 功能插槽责任链-ProcessorSlot

执行各个功能插槽的入口是chain.entry()
这里的chain是DefaultProcessorSlotChainchain.entry()会不断调用对应的entry()方法

先来了解一下 有哪些功能插槽 及 主要作用

(3.3.1) Sentinel里的功能插槽

处理插槽 作用 备注 全限定类名
NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来 用于根据调用路径来限流降级 com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
ClusterBuilderSlot 负责维护资源运行统计信息(响应时间、qps、线程数、异常),以及调用者列表 这些信息将用作为多维度限流,降级的依据 com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
LogSlot 记录(BlockException)异常日志 (限流、熔断) com.alibaba.csp.sentinel.slots.logger.LogSlot
StatisticSlot 用于记录、统计不同纬度的 runtime 指标监控信息; com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
AuthoritySlot 根据配置的黑白名单和调用来源信息,来做黑白名单控制; com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
SystemSlot 通过系统的状态,例如 load1 等,来控制总的入口流量; com.alibaba.csp.sentinel.slots.system.SystemSlot
FlowSlot 用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制; com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
DegradeSlot 通过统计信息以及预设的规则,来做熔断降级; com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

(3.3.2) 功能插槽-ProcessorSlot

源码 https://github.com/alibaba/Sentinel/blob/1.8.6/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slotchain/ProcessorSlot.java

package com.alibaba.csp.sentinel.slotchain;

/**
 * 一些处理的容器 和 处理完成时的通知方式。 
 */
public interface ProcessorSlot<T> {

    /**
     * 插槽入口
     *
     * @param context         当前上下文
     * @param resourceWrapper 当前资源
     * @param param           泛型参数 {@link com.alibaba.csp.sentinel.node.Node}
     * @param count           需要的令牌个数
     * @param prioritized     entry优先级
     * @param args            原始调用的参数
     * @throws Throwable blocked exception or unexpected error
     */
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;

    /**
     * 表示entry()方法结束
     *
     * @param context         当前上下文
     * @param resourceWrapper 当前资源
     * @param obj             相关对象 (e.g. Node)
     * @param count           需要的令牌个数
     * @param prioritized     entry优先级
     * @param args            原始调用的参数
     * @throws Throwable blocked exception or unexpected error
     */
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                   Object... args) throws Throwable;

    /**
     * 退出插槽
     *
     * @param context         当前上下文
     * @param resourceWrapper 当前资源
     * @param count           需要的令牌个数
     * @param args            原始调用的参数
     */
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

    /**
     * 表示exit结束
     *
     * @param context         当前上下文
     * @param resourceWrapper 当前资源
     * @param count           需要的令牌个数
     * @param args            原始调用的参数
     */
    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

调用功能插槽时都是从entry方法进入

(3.4) 默认的8个功能插槽

(3.4.1) 调用链路(资源路径)插槽-NodeSelectorSlot

NodeSelectorSlot主要作用是负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;

源码:https://github.com/alibaba/Sentinel/blob/1.8.6/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/nodeselector/NodeSelectorSlot.java#L136

package com.alibaba.csp.sentinel.slots.nodeselector;

/**
 * </p>
 * This class will try to build the calling traces via
 * <ol>
 * <li>adding a new {@link DefaultNode} if needed as the last child in the context.
 * The context's last node is the current node or the parent node of the context. </li>
 * <li>setting itself to the context current node.</li>
 * </ol>
 * </p>
 *
 * <p>It works as follow:</p>
 * <pre>
 * ContextUtil.enter("entrance1", "appA");
 * Entry nodeA = SphU.entry("nodeA");
 * if (nodeA != null) {
 *     nodeA.exit();
 * }
 * ContextUtil.exit();
 * </pre>
 *
 * Above code will generate the following invocation structure in memory:
 *
 * <pre>
 *
 *              machine-root
 *                  /
 *                 /
 *           EntranceNode1
 *               /
 *              /
 *        DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
 * </pre>
 *
 * <p>
 * Here the {@link EntranceNode} represents "entrance1" given by
 * {@code ContextUtil.enter("entrance1", "appA")}.
 * </p>
 * <p>
 * Both DefaultNode(nodeA) and ClusterNode(nodeA) holds statistics of "nodeA", which is given
 * by {@code SphU.entry("nodeA")}
 * </p>
 * <p>
 * The {@link ClusterNode} is uniquely identified by the ResourceId; the {@link DefaultNode}
 * is identified by both the resource id and {@link Context}. In other words, one resource
 * id will generate multiple {@link DefaultNode} for each distinct context, but only one
 * {@link ClusterNode}.
 * </p>
 * <p>
 * the following code shows one resource id in two different context:
 * </p>
 *
 * <pre>
 *    ContextUtil.enter("entrance1", "appA");
 *    Entry nodeA = SphU.entry("nodeA");
 *    if (nodeA != null) {
 *        nodeA.exit();
 *    }
 *    ContextUtil.exit();
 *
 *    ContextUtil.enter("entrance2", "appA");
 *    nodeA = SphU.entry("nodeA");
 *    if (nodeA != null) {
 *        nodeA.exit();
 *    }
 *    ContextUtil.exit();
 * </pre>
 *
 * Above code will generate the following invocation structure in memory:
 *
 * <pre>
 *
 *                  machine-root
 *                  /         \
 *                 /           \
 *         EntranceNode1   EntranceNode2
 *               /               \
 *              /                 \
 *      DefaultNode(nodeA)   DefaultNode(nodeA)
 *             |                    |
 *             +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
 * </pre>
 *
 * <p>
 * As we can see, two {@link DefaultNode} are created for "nodeA" in two context, but only one
 * {@link ClusterNode} is created.
 * </p>
 *
 * <p>
 * We can also check this structure by calling: <br/>
 * {@code curl http://localhost:8719/tree?type=root}
 * </p>
 *
 * @author jialiang.linjl
 * @see EntranceNode
 * @see ContextUtil
 */
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * {@link DefaultNode}s of the same resource in different context.
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        /*
         * It's interesting that we use context name rather resource name as the map key.
         *
         * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
         * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
         * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
         * the resource name must be same but context name may not.
         *
         * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
         * enter same resource in different context, using context name as map key can
         * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
         * of the same resource name, for every distinct context (different context name) each.
         *
         * Consider another question. One resource may have multiple {@link DefaultNode},
         * so what is the fastest way to get total statistics of the same resource?
         * The answer is all {@link DefaultNode}s with same resource name share one
         * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
         */
        // 根据上下文名称获取节点 
        DefaultNode node = map.get(context.getName());
        // 单例模式-DCL
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // 构建调用树
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

}    
package com.alibaba.csp.sentinel.node;


public class DefaultNode extends StatisticNode {

    /**
     * 孩子节点集合
     */
    private volatile Set<Node> childList = new HashSet<>();

    /**
     * 添加孩子节点
     *
     * @param node valid child node
     */
    public void addChild(Node node) {
        if (node == null) {
            RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName());
            return;
        }
        // DCL
        if (!childList.contains(node)) {
            synchronized (this) {
                if (!childList.contains(node)) {
                    // 添加孩子节点
                    Set<Node> newSet = new HashSet<>(childList.size() + 1);
                    newSet.addAll(childList);
                    newSet.add(node);
                    childList = newSet;
                }
            }
            RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName());
        }
    }

}
/**
 * <pre>
 *
 *                  machine-root
 *                  /         \
 *                 /           \
 *         EntranceNode1   EntranceNode2
 *               /               \
 *              /                 \
 *      DefaultNode(nodeA)   DefaultNode(nodeA)
 *             |                    |
 *             +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
 * </pre>
 */

(3.4.2) 节点统计信息插槽-ClusterBuilderSlot

ClusterBuilderSlot负责维护资源运行统计信息(响应时间、qps、线程数、异常),以及调用者列表

https://github.com/alibaba/Sentinel/blob/1.8.6/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/clusterbuilder/ClusterBuilderSlot.java#L77

package com.alibaba.csp.sentinel.slots.clusterbuilder;
 
/**
 * 该槽维护资源运行统计信息(响应时间、qps、线程数、异常),以及调用者列表,由 ContextUtil#enter(String origin)标记 
 * 一个资源只有一个集群节点,而一个资源可以有多个默认节点。
 */
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * 相同的资源 ResourceWrapper#equals(Object) 将在全局范围内共享相同的ProcessorSlotChain,无论在哪个上下文中。
     * 因此,如果代码进入 entry(),资源名称必须相同,但上下文名称可能不同。
     * 
     * 为了获取同一资源在不同上下文中的总统计信息,同一资源在全局共享相同的ClusterNode。所有ClusterNode都缓存在此映射中。
     * 
     * 应用程序运行的时间越长,这个映射就会变得越稳定。所以我们不是并发映射而是锁。因为这个锁只发生在最开始,而并发映射将一直持有锁。
     * 
     */
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        // 单例-DCL
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // 创建集群节点​​。
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    // 添加到缓存里
                    newMap.put(node.getId(), clusterNode);
                    // 更新缓存
                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);

        /*
         * 如果设置了原始上下文,我们应该获取 或 创建原始直接的Node。
         */
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

}

(3.4.3) 日志插槽-LogSlot

LogSlot主要负责记录限流异常的响应,以提供用于故障排除的具体日志。
记录的日志存储在 sentinel-block.log 文件里

源码: https://github.com/alibaba/Sentinel/blob/1.8.6/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/logger/LogSlot.java#L35

package com.alibaba.csp.sentinel.slots.logger;

 
/**
 * 一个处理插槽,它是对记录限流异常的响应,以提供用于故障排除的具体日志。
 */
@Spi(order = Constants.ORDER_LOG_SLOT)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        try {
            fireEntry(context, resourceWrapper, obj, count, prioritized, args);
        } catch (BlockException e) {
            // 记录异常日志(包括限流和降级) 
            // 日志会记录在 sentinel-block.log 文件里
            EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
                context.getOrigin(), e.getRule().getId(), count);
            throw e;
        } catch (Throwable e) {
            RecordLog.warn("Unexpected entry exception", e);
        }

    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        try {
            fireExit(context, resourceWrapper, count, args);
        } catch (Throwable e) {
            RecordLog.warn("Unexpected entry exit exception", e);
        }
    }
}

sentinel-block.log 文件内容如下:

2020-10-30 14:39:29|1|HelloWorld,FlowException,default,|46944,0
2020-10-30 14:39:30|1|HelloWorld,FlowException,default,|138183,0
2020-10-30 14:39:31|1|HelloWorld,FlowException,default,|188067,0
2020-11-25 20:12:24|1|sentinelTest,FlowException,default,|400,0
2020-11-25 20:12:25|1|sentinelTest,FlowException,default,|540,0
2020-11-25 21:00:57|1|sentinelTest,FlowException,default,|9,0
2020-11-25 21:00:58|1|sentinelTest,FlowException,default,|1,0
2020-11-25 21:00:59|1|sentinelTest,FlowException,default,|62,0
2020-11-25 21:01:00|1|sentinelTest,FlowException,default,|838,0
2020-12-10 10:39:24|1|SentinelResourceMethod1,FlowException,app1,app1|1,0
2020-12-10 10:39:25|1|SentinelResourceMethod1,FlowException,app1,app1|1,0

(3.4.4) 实时统计插槽-StatisticSlot

源码 https://github.com/alibaba/Sentinel/blob/1.8.6/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java#L55

package com.alibaba.csp.sentinel.slots.statistic;

/**
 * 专用于实时统计的处理器插槽。
 * 在进入这个槽的时候,我们需要单独统计以下信息:
 *   ClusterNode:资源ID的集群节点的总统计。
 *   源节点:来自不同调用者/源的集群节点的统计信息。 
 *   DefaultNode:特定上下文中特定资源名称的统计信息。
 *   最后是所有入口的总和统计。
 */
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }

}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    Node node = context.getCurNode();

    if (context.getCurEntry().getBlockError() == null) {
        // Calculate response time (use completeStatTime as the time of completion).
        long completeStatTime = TimeUtil.currentTimeMillis();
        context.getCurEntry().setCompleteTimestamp(completeStatTime);
        long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

        Throwable error = context.getCurEntry().getError();

        // Record response time and success count.
        recordCompleteFor(node, count, rt, error);
        recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
        }
    }

    // Handle exit event with registered exit callback handlers.
    Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
    for (ProcessorSlotExitCallback handler : exitCallbacks) {
        handler.onExit(context, resourceWrapper, count, args);
    }

    // fix bug https://github.com/alibaba/Sentinel/issues/2374
    fireExit(context, resourceWrapper, count, args);
}

(3.4.5) 权限规则检查插槽-AuthoritySlot

AuthoritySlot 的主要作用是 权限规则检查。

源码 https://github.com/alibaba/Sentinel/blob/1.8.6/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/authority/AuthoritySlot.java

package com.alibaba.csp.sentinel.slots.block.authority;

/**
 * 致力于权限规则检查的一个处理插槽。
 */
@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        // 权限校验  
        checkBlackWhiteAuthority(resourceWrapper, context);
        // 
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }


    /**  */
    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        // 获取所有资源的权限规则
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

        if (authorityRules == null) {
            return;
        }

        // 获取当前资源的权限规则  
        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
            return;
        }

        for (AuthorityRule rule : rules) {
            // 检查权限
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                // 权限校验不通过抛AuthorityException异常
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }

}    

(3.4.6) 系统规则检查插槽-SystemSlot

SystemSlot主要作用是系统规则检查。

源码: https://github.com/alibaba/Sentinel/blob/1.8.6/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/system/SystemSlot.java#L36

package com.alibaba.csp.sentinel.slots.system;

/**
 * 致力于系统规则检查的一个处理插槽。
 */
@Spi(order = Constants.ORDER_SYSTEM_SLOT)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 系统规则检查                
        SystemRuleManager.checkSystem(resourceWrapper, count);
        // 
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

}
package com.alibaba.csp.sentinel.slots.system;

/**
 * Sentinel System Rule 使入站流量和容量满足。 它考虑了传入请求的平均 rt、qps、线程数。 
 * 它还提供了系统负载的度量,但仅在 Linux 上可用。
 *
 * rt、qps、线程数很好理解。 如果传入请求的 rt、qps、线程数超过其阈值,请求将被拒绝。
 * 但是,我们使用不同的方法来计算负载。
 *
 * 将系统视为管道,约束之间的转换导致三个不同区域(交通限制、容量限制和危险区域)具有不同性质的行为。 
 * 当运行中没有足够的请求来填充管道时,RTprop 会确定行为; 否则,系统容量占主导地位。 
 * 约束线在飞行中相交 = Capacity × RTprop。 
 * 由于管道已满,超过这一点,机载容量过剩产生了一个队列,导致RTT对机载流量的线性依赖和系统负载的增加。
 * 在危险区域,系统将停止响应。 
 * 
 * 参考 BBR 算法了解更多。
 * 
 * 注意,SystemRule仅对入站请求有效,出站流量不受SystemRule限制 
 */
public final class SystemRuleManager {

    /**
     * 将系统规则应用于资源。 只会检查入站流量。
     *
     * @param 资源
     * @throws BlockException 当超过任何系统规则的阈值时。
     */
    public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
        if (resourceWrapper == null) {
            return;
        }
        // 确保检查开关打开。
        if (!checkSystemStatus.get()) {
            return;
        }

        // 仅用于入站流量 
        if (resourceWrapper.getEntryType() != EntryType.IN) {
            return;
        }

        // Constants.ENTRY_NODE 是 入站流量的全局统计节点。

        // total qps
        double currentQps = Constants.ENTRY_NODE.passQps();
        if (currentQps + count > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // total thread
        int currentThread = Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

        // 延时
        double rt = Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // 负载  BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }

        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }

}

(3.4.7) 插槽-FlowSlot

package com.alibaba.csp.sentinel.slots.block.flow;

/**
 * 结合从前面的插槽(NodeSelectorSlot、ClusterNodeBuilderSlot 和 StatisticSlot)收集的
 * 运行时统计信息,FlowSlot 将使用预设规则来决定是否应阻止传入请求。
 *
 * 如果触发任何规则,SphU.entry(resourceName) 将抛出 FlowException。 
 * 用户可以通过捕获 FlowException 来自定义自己的逻辑。
 *
 * 一个资源可以有多个流规则。 FlowSlot 遍历这些规则,直到其中一条被触发或者所有规则都被遍历。
 *
 * 每个FlowRule主要由这些因素组成:等级、策略、路径。 我们可以结合这些因素来达到不同的效果。
 * 
 * 等级由 FlowRule 中的 grade 字段定义。 
 * 此处,0 用于线程隔离,1 用于请求计数整形 (QPS)。 
 * 线程计数和请求计数都是在真实运行时收集的,
 * 我们可以通过以下命令查看这些统计信息:
 * <pre>
 * curl http://localhost:8719/tree
 *
 * idx id    thread pass  blocked   success total aRt   1m-pass   1m-block   1m-all   exception
 * 2   abc647 0      460    46          46   1    27      630       276        897      0
 * </pre>
 *
 * thread 当前正在处理资源的线程数 
 * pass 一秒内传入请求数 
 * blocked 一秒内被阻塞的请求数 
 * success 一秒内被Sentinel成功处理的请求数 
 * RT 请求在一秒内的平均响应时间 
 * total 一秒内传入请求和阻塞请求的总和 
 * 1m-pass 为一分钟内传入请求数 
 * 1m-block 为一分钟内被阻塞的请求数 
 * 1m-all 是一分钟内传入和阻止的请求总数 
 * exception 为一秒内业务(自定义)异常的计数 
 * 
 * 这个阶段通常用于保护资源不被占用。 (达到限流阈值,服务快撑不住了)
 * 如果资源需要很长时间才能完成,线程将开始占用。 响应时间越长,占用的线程越多。
 * 
 * 除了计数器之外,线程池或信号量也可以用来实现这一点。
 * - 线程池:分配一个线程池来处理这些资源。 当池中不再有空闲线程时,拒绝请求而不影响其他资源。
 * - 信号量:使用信号量来控制该资源中线程的并发数。
 * 
 * 使用线程池的好处是,超时可以优雅的走开。 但它也给我们带来了上下文切换和额外线程的成本。 
 * 如果传入请求已经在单独的线程中提供服务,例如 Servlet HTTP 请求,那么如果使用线程池,线程数几乎会翻倍。
 * 
 * 
 * 流量整形 
 * 当QPS超过阈值时,Sentinel会采取动作控制传入的请求,由流规则中的 controlBehavior 字段配置。
 *   1.立即拒绝(Immediately reject):默认行为。 超出的请求立即被拒绝 并抛出 FlowException
 *   2.热启动(Warmup) : 如果一段时间以来系统的负载很低,大量的请求来了,系统可能无法一次处理所有这些请求。 
 *        然而,如果我们稳定地增加传入请求,系统可以预热并最终能够处理所有请求。 
 *        可以通过在流规则中设置字段 warmUpPeriodSec 来配置此预热期。
 *   3.统一速率限制  此策略严格控制请求之间的间隔。换句话说,它允许请求以稳定、统一的速率传递。
 *        https://raw.githubusercontent.com/wiki/alibaba/Sentinel/image/uniform-speed-queue.png 
 *        该策略是漏桶算法的实现  https://en.wikipedia.org/wiki/Leaky_bucket  
 *        它用于以稳定的速率处理请求,通常用于突发业务(例如消息处理)。
 *        当大量超出系统容量的请求同时到达时,使用此策略的系统将处理请求及其固定速率,直到所有请求都已处理或超时。
 */
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    public FlowSlot() {
        this(new FlowRuleChecker());
    }

    /**
     * Package-private for test.
     *
     * @param checker flow rule checker
     * @since 1.6.1
     */
    FlowSlot(FlowRuleChecker checker) {
        AssertUtil.notNull(checker, "flow checker should not be null");
        this.checker = checker;
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 校验
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        //   
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
        @Override
        public Collection<FlowRule> apply(String resource) {
            // Flow rule map should not be null.
            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }
    };
}

(3.4.8) 熔断降级插槽-DegradeSlot

package com.alibaba.csp.sentinel.slots.block.degrade;

/**
 * 专门用于熔断/电路断路 的处理插槽
 */
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        //                 
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

}    
/**  */
void performChecking(Context context, ResourceWrapper r) throws BlockException {
    // 获取资源对应的熔断器/断路器
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        return;
    }
    for (CircuitBreaker cb : circuitBreakers) {
        // 判断是否可以通过
        if (!cb.tryPass(context)) {
            // 抛出熔断异常 DegradeException 
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}

3.5 调用堆栈

调用8个功能插槽后的代码堆栈


参考资料

[1] Sentinel介绍
[2] Sentinel工作主流程