电商-京东 黄金流程 PC
对于传统的电商,为了保障大促时系统稳定性,保证用户可以查看购买下单,有一个黄金流程,一般的黄金流程都包括4大领域
导购(首页、分类、商详、搜索)、营销(促销、权益、券)、交易(购物车、提单、下单、支付)、履约(接单、生产、配送)
对于传统的电商,为了保障大促时系统稳定性,保证用户可以查看购买下单,有一个黄金流程,一般的黄金流程都包括4大领域
导购(首页、分类、商详、搜索)、营销(促销、权益、券)、交易(购物车、提单、下单、支付)、履约(接单、生产、配送)
java -Dserver.port=8080 \
-Dcsp.sentinel.dashboard.server=localhost:8080 \
-Dproject.name=sentinel-dashboard \
-jar target/sentinel-dashboard.jar
[weikeqin@bogon Downloads]$ java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.6.jar
INFO: Sentinel log output type is: file
INFO: Sentinel log charset is: utf-8
INFO: Sentinel log base directory is: /Users/weikeqin/logs/csp/
INFO: Sentinel log name use pid is: false
INFO: Sentinel log level is: INFO
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.5.12)
2023-01-14 15:33:46.974 INFO 53124 --- [ main] c.a.c.s.dashboard.DashboardApplication : Starting DashboardApplication using Java 19.0.1 on bogon with PID 53124 (/Users/weikeqin/Downloads/sentinel-dashboard-1.8.6.jar started by weikeqin in /Users/weikeqin/Downloads)
2023-01-14 15:33:46.977 INFO 53124 --- [ main] c.a.c.s.dashboard.DashboardApplication : No active profile set, falling back to 1 default profile: "default"
2023-01-14 15:33:48.040 INFO 53124 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2023-01-14 15:33:48.051 INFO 53124 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2023-01-14 15:33:48.052 INFO 53124 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.60]
2023-01-14 15:33:48.152 INFO 53124 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2023-01-14 15:33:48.153 INFO 53124 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1112 ms
2023-01-14 15:33:48.229 INFO 53124 --- [ main] c.a.c.s.dashboard.config.WebConfig : Sentinel servlet CommonFilter registered
2023-01-14 15:33:49.185 INFO 53124 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2023-01-14 15:33:49.196 INFO 53124 --- [ main] c.a.c.s.dashboard.DashboardApplication : Started DashboardApplication in 2.755 seconds (JVM running for 3.529)
2023-01-14 15:33:51.455 INFO 53124 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-01-14 15:33:51.455 INFO 53124 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2023-01-14 15:33:51.458 INFO 53124 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 2 ms
2023-01-14 15:34:13.892 INFO 53124 --- [nio-8080-exec-5] o.apache.tomcat.util.http.parser.Cookie : A cookie header was received [Hm_lvt_d1ad0ae2a9976c44d556abc07cda1365=1656754551,1658499321] that contained an invalid cookie. That cookie will be ignored.
Note: further occurrences of this error will be logged at DEBUG level.
[1] Sentinel控制台
[2] Sentinel 控制台
Sentinel与dubbo结合的基本思路是sentinel利用了Dubbo Filter和spi机制进行拓展,这样做的好处可以做到无需改动业务代码就能支持限流、熔断等功能。
SPI对应的配置文件 /resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter
sentinel.dubbo.provider.filter=com.alibaba.csp.sentinel.adapter.dubbo.SentinelDubboProviderFilter
sentinel.dubbo.consumer.filter=com.alibaba.csp.sentinel.adapter.dubbo.SentinelDubboConsumerFilter
dubbo.application.context.name.filter=com.alibaba.csp.sentinel.adapter.dubbo.DubboAppContextFilter
package com.alibaba.csp.sentinel.adapter.dubbo;
/**
* 支持与Sentinel集成的Apache Dubbo服务提供商筛选器。默认情况下自动激活。
* 注意:这只适用于Apache Dubbo 2.7.x或更高版本。
*
* 如果要禁用提供程序筛选器,可以配置: dubbo:provider filter="-sentinel.dubbo.provider.filter"
*/
@Activate(group = PROVIDER)
public class SentinelDubboProviderFilter extends BaseSentinelDubboFilter {
public SentinelDubboProviderFilter() {
RecordLog.info("Sentinel Apache Dubbo provider filter initialized");
}
@Override
String getMethodName(Invoker invoker, Invocation invocation, String prefix) {
return DubboUtils.getMethodResourceName(invoker, invocation, prefix);
}
@Override
String getInterfaceName(Invoker invoker, String prefix) {
return DubboUtils.getInterfaceName(invoker, prefix);
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// Get origin caller.
String origin = DubboAdapterGlobalConfig.getOriginParser().parse(invoker, invocation);
if (null == origin) {
origin = "";
}
Entry interfaceEntry = null;
Entry methodEntry = null;
String prefix = DubboAdapterGlobalConfig.getDubboProviderResNamePrefixKey();
// 接口名
String interfaceResourceName = getInterfaceName(invoker, prefix);
// 方法名
String methodResourceName = getMethodName(invoker, invocation, prefix);
try {
// Only need to create entrance context at provider side, as context will take effect
// at entrance of invocation chain only (for inbound traffic).
// 使用方法名作为上下文名称
ContextUtil.enter(methodResourceName, origin);
// 使用接口名作为interfaceEntry资源名
interfaceEntry = SphU.entry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.IN);
// 使用方法名作为methodEntry资源名
methodEntry = SphU.entry(methodResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.IN,
invocation.getArguments());
// 调用方法
Result result = invoker.invoke(invocation);
if (result.hasException()) {
Tracer.traceEntry(result.getException(), interfaceEntry);
Tracer.traceEntry(result.getException(), methodEntry);
}
return result;
} catch (BlockException e) {
// 触发限流或熔断
return DubboAdapterGlobalConfig.getProviderFallback().handle(invoker, invocation, e);
} catch (RpcException e) {
Tracer.traceEntry(e, interfaceEntry);
Tracer.traceEntry(e, methodEntry);
throw e;
} finally {
if (methodEntry != null) {
// 退出
methodEntry.exit(1, invocation.getArguments());
}
if (interfaceEntry != null) {
// 退出
interfaceEntry.exit();
}
ContextUtil.exit();
}
}
}
限流或熔断后调用的方法 DefaultDubboFallback::handle()
package com.alibaba.csp.sentinel.adapter.dubbo.fallback;
/**
*
*/
public class DefaultDubboFallback implements DubboFallback {
@Override
public Result handle(Invoker<?> invoker, Invocation invocation, BlockException ex) {
// Just wrap the exception.
return AsyncRpcResult.newDefaultAsyncResult(ex.toRuntimeException(), invocation);
}
}
我们经常使用@SentinelResource
来标记一个方法,可以将这个被@SentinelResource
标记的方法看成是一个Sentinel资源
。
因此,我们以@ SentinelResource
为入口,找到其切面,看看切面拦截后所做的工作,来明确Sentinel的工作原理。直接看注解@SentinelResource的切面代码(SentinelResourceAspect)。
<!-- sentinel核心模块-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.6</version>
</dependency>
<!-- sentinel注解 spring里会用到 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.8.6</version>
</dependency>
配置对应Bean
@Configuration
public class SentinelAspectConfiguration {
/**
*
* @return
*/
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
}
/**
* 下单接口
*/
@PostMapping(value = "/order/create", consumes = MediaType.APPLICATION_JSON_VALUE)
@SentinelResource(value = "orderCreate", blockHandler = "blockHandlerForOrderCreate")
public OrderResponse<OrderCreateVo> orderCreate(@RequestBody OrderCreateRequest req) {
log.info("下单请求开始||req={}", req);
OrderResponse<OrderCreateVo> res = orderCreateService.createOrder(req);
log.info("下单请求结束||res={}", res);
return res;
}
/**
* 限流
*
* @param req OrderCreateRequest
* @param e BlockException
* @return
*/
public OrderResponse<OrderCreateVo> blockHandlerForOrderCreate(@RequestBody OrderCreateRequest req, BlockException e) {
log.info("下单达到阈值被限流 || ");
return OrderResponse.fail(BizErrorCodeEnum.SYSTEM_BUSY.getCode(), BizErrorCodeEnum.SYSTEM_BUSY.getDescription());
}
/**
* @param goodsBizReq
* @return
*/
@Override
@SentinelResource(value = "order_getGoodsInfo", fallback = "fallbackForGetGoodsInfo")
public List<GoodsBizResEntity> getGoodsInfo(GoodsRequestBizEntity goodsBizReq) {
// 省略部分代码
return goodsBizList;
}
/**
* 熔断
*
* @param goodsBizReq
* @param ex
* @return
*/
public List<GoodsBizResEntity> fallbackForGetGoodsInfo(GoodsRequestBizEntity goodsBizReq, BlockException ex) {
log.warn("查询商品触发熔断||");
// 走熔断逻辑 调getGoodsInfoV2
return getGoodsInfoV2(goodsBizReq);
}
package com.alibaba.csp.sentinel.annotation.aspectj;
/**
* SentinelResource注解切面
*/
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
/** 指定切入点为@SentinelResource注解 */
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
/** 指定为环绕通知 */
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
Method originMethod = resolveMethod(pjp);
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
if (annotation == null) {
// Should not go through here.
throw new IllegalStateException("Wrong state for SentinelResource annotation");
}
// 资源名 使用注解里配置的资源名或者方法名
String resourceName = getResourceName(annotation.value(), originMethod);
// 注解里没配置 默认是OUT
EntryType entryType = annotation.entryType();
// 默认为0
int resourceType = annotation.resourceType();
Entry entry = null;
try {
// 进行流控
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 通过流控检查后,才会调用目标方法
return pjp.proceed();
} catch (BlockException ex) {
// 对BlockException进行回调
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
// The ignore list will be checked first.
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
throw ex;
}
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
traceException(ex);
// 异常回调
return handleFallback(pjp, annotation, ex);
}
// No fallback function can handle the exception, so throw it out.
throw ex;
} finally {
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
}
}
进入SentinelResource切面后,会执行SphU.entry()
方法,在这个方法中会对被拦截方法做限流和熔断的逻辑处理。
如果触发限流或熔断,会抛出BlockException
,我们可以指定blockHandler方法来处理BlockException。而对于业务上的异常,我们也可以配置fallback方法来处理被拦截方法调用产生的异常。
public abstract class AbstractSentinelAspectSupport {
/** 获取资源名 */
protected String getResourceName(String resourceName, /*@NonNull*/ Method method) {
// 注解里配置了资源名,使用此值
if (StringUtil.isNotBlank(resourceName)) {
return resourceName;
}
// 解析方法名 使用方法名作为资源名
return MethodUtil.resolveMethodName(method);
}
}
package com.alibaba.csp.sentinel.annotation;
/**
* 注释表示Sentinel资源的定义。
*
* @since 0.1.1
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {
/**
* 返回Sentinel资源的名称
*/
String value() default "";
/**
* @return the entry type (inbound or outbound), outbound by default
*/
EntryType entryType() default EntryType.OUT;
/**
* @return the classification (type) of the resource
* @since 1.7.0
*/
int resourceType() default 0;
/**
* @return name of the block exception function, empty by default
*/
String blockHandler() default "";
/**
* The {@code blockHandler} is located in the same class with the original method by default.
* However, if some methods share the same signature and intend to set the same block handler,
* then users can set the class where the block handler exists. Note that the block handler method
* must be static.
*
* @return the class where the block handler exists, should not provide more than one classes
*/
Class<?>[] blockHandlerClass() default {};
/**
* @return name of the fallback function, empty by default
*/
String fallback() default "";
/**
* The {@code defaultFallback} is used as the default universal fallback method.
* It should not accept any parameters, and the return type should be compatible
* with the original method.
*
* @return name of the default fallback method, empty by default
* @since 1.6.0
*/
String defaultFallback() default "";
/**
* The {@code fallback} is located in the same class with the original method by default.
* However, if some methods share the same signature and intend to set the same fallback,
* then users can set the class where the fallback function exists. Note that the shared fallback method
* must be static.
*
* @return the class where the fallback method is located (only single class)
* @since 1.6.0
*/
Class<?>[] fallbackClass() default {};
/**
* @return the list of exception classes to trace, {@link Throwable} by default
* @since 1.5.1
*/
Class<? extends Throwable>[] exceptionsToTrace() default {Throwable.class};
/**
* Indicates the exceptions to be ignored. Note that {@code exceptionsToTrace} should
* not appear with {@code exceptionsToIgnore} at the same time, or {@code exceptionsToIgnore}
* will be of higher precedence.
*
* @return the list of exception classes to ignore, empty by default
* @since 1.6.0
*/
Class<? extends Throwable>[] exceptionsToIgnore() default {};
}
[1] 主流框架的适配
Sentinel整体架构如下
图里从下往上可以看到,核心的部分包含 规则(rules)
、 处理插槽(slot)
、 调用链路(invocation tree)
、 集群节点(cluster node)
、 滑动窗口(slading winodw)
5部分。
// todo
ClusterBuilderSlot
负责维护资源运行统计信息(响应时间、qps、线程数、异常),以及调用者列表
package com.alibaba.csp.sentinel.slots.clusterbuilder;
/**
* 该槽维护资源运行统计信息(响应时间、qps、线程数、异常),以及调用者列表,由 ContextUtil#enter(String origin)标记
* 一个资源只有一个集群节点,而一个资源可以有多个默认节点。
*/
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
/**
* 相同的资源 ResourceWrapper#equals(Object) 将在全局范围内共享相同的ProcessorSlotChain,无论在哪个上下文中。
* 因此,如果代码进入 entry(),资源名称必须相同,但上下文名称可能不同。
*
* 为了获取同一资源在不同上下文中的总统计信息,同一资源在全局共享相同的ClusterNode。所有ClusterNode都缓存在此映射中。
*
* 应用程序运行的时间越长,这个映射就会变得越稳定。所以我们不是并发映射而是锁。因为这个锁只发生在最开始,而并发映射将一直持有锁。
*
*/
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
// 单例-DCL
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// 创建集群节点。
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
// 添加到缓存里
newMap.put(node.getId(), clusterNode);
// 更新缓存
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
/*
* 如果设置了原始上下文,我们应该获取 或 创建原始直接的Node。
*/
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
Sentinel整体架构如下
图里从下往上可以看到,核心的部分包含 规则(rules)
、 处理插槽(slot)
、 调用链路(invocation tree)
、 集群节点(cluster node)
、 滑动窗口(slading winodw)
5部分。
Sentinel里rules包含 流控规则(Flow Rule)、熔断规则( ) 等
//
//
FlowRuleManager对象主要职责是管理资源的限流配置
资源对应的流控配置保存在 Map<String, List<FlowRule>> flowRules
package com.alibaba.csp.sentinel.slots.block.flow;
/**
* 一个资源可以有多个规则/配置。这些规则按以下顺序生效:
* 来自指定呼叫方的请求
* 没有指定的调用者
*/
public class FlowRuleManager {
/** 资源和对应限流配置集合 重要 */
private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
/** */
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
/**
* 定时任务线程池
* SCHEDULER 的 corePool size 必须设置为 1,这样两个任务startMetricTimerListener()才能由SCHEDULER有序运行
*/
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-metrics-record-task", true));
static {
currentProperty.addListener(LISTENER);
startMetricTimerListener();
}
}
这里用到了观察者模式,DynamicSentinelProperty
是被观察者,FlowRuleManager
是观察者,FlowRuleManager观察到配置变更后会通过configUpdate
方法更新配置信息。
FlowRule对象的主要用来存配置信息
package com.alibaba.csp.sentinel.slots.block.flow;
public class FlowRule extends AbstractRule {
public FlowRule() {
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
/**
* 流控类型 0:线程数限流 1:QPS限流
* 默认使用 QPS限流
*/
private int grade = RuleConstant.FLOW_GRADE_QPS;
/**
* 流量控制阈值。
*/
private double count;
/**
* 限流策略
* 基于调用链的流量控制策略。
*
* {@link RuleConstant#STRATEGY_DIRECT} 直接限流 (by origin);
* {@link RuleConstant#STRATEGY_RELATE} 关联限流 (with relevant resource);
* {@link RuleConstant#STRATEGY_CHAIN} 链路限流 (by entrance resource).
*/
private int strategy = RuleConstant.STRATEGY_DIRECT;
/**
* 具有相关资源或上下文的流量控制中的参考资源。
*/
private String refResource;
/**
* 流控效果
*
* 0.直接拒绝(reject directly)
* 1.热启动(warm up)
* 2. 匀速排队(rate limiter)
* 3. 热启动 + 匀速排队 warm up + rate limiter
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
/**
* 流控效果为预热启动时的预热时长
* 默认10s
*/
private int warmUpPeriodSec = 10;
/**
* 流控效果为排队等待时的等待时长
* 默认500
*/
private int maxQueueingTimeMs = 500;
/**
* 集群模式
*/
private boolean clusterMode;
/**
* 集群模式的流规则配置。
*/
private ClusterFlowConfig clusterConfig;
/**
* 流量整形(节流)控制器。
*/
private TrafficShapingController controller;
}
public abstract class AbstractRule implements Rule {
/**
* 规则id
*/
private Long id;
/**
* 规则名称
*/
private String resource;
/**
* 将受来源限制的应用程序名称。
* 默认的limitApp是"default",即允许所有源应用。
*
* 对于权限规则,多个源名称可以用逗号(',')分隔。
*/
private String limitApp;
}
synchronized是Java语言里互斥锁的一种实现。解决了原子性、可见性的问题。
synchronized 是基于底层操作系统的 Mutex Lock 实现的,每次获取和释放锁操作都会带来用户态和内核态的切换,从而增加系统性能开销。因此,在锁竞争激烈的情况下,synchronized 同步锁在性能上就表现得非常糟糕,它也常被大家称为重量级锁。
synchronized 实现同步锁的方式有两种,一种是修饰方法,一种是修饰方法块。以下就是通过 Synchronized 实现的两种同步方法加锁的方式:
public class SynchronizedTest {
public static void main(String[] args){
}
/**
* 同步实例方法,锁实例对象
*/
public synchronized void test() {
}
/**
* 同步类方法,锁类对象
*/
public synchronized static void test1() {
}
/**
* 同步代码块
*/
public void test2() {
// 锁类对象
synchronized (SynchronizedTest.class) {
// 锁实例对象
synchronized (this) {
}
}
}
}
javac SynchronizedTest.java //先运行编译class文件命令
javap -v SynchronizedTest.class //再通过javap打印出字节文件
在复杂的分布式系统调用链路里,想对部分服务做限流,首先要知道整体的调用链路才能去定制化限流。
比如在电商系统里,首页、商详、购物车、下单 等都会调用商品服务,假设商品服务达到服务QPS极限了,这个时候要限流,但是要保证下单调商品不限流,其他的按照优先级限流,要怎么做?
在复杂业务场景,需要对调用系统按照优先级去处理
假设要自己实现,可能通过加一个系统标识、系统优先级去区分。
//
1.N叉树初始化
2.N叉树新增子节点
3.N叉树查询
DefaultNode
表示 DefaultNodeConstants
类里,public final static DefaultNode ROOT
NodeSelectorSlot
的 Map<String, DefaultNode> map
里,注意 map 是 private volatile
修饰的,也就是只有NodeSelectorSlot
类可以访问到 N叉树的根节点在Constants
类里,是一个公共静态变量,可以随时访问
package com.alibaba.csp.sentinel;
/**
* Sentinel 的通用常量。
*/
public final class Constants {
public final static String ROOT_ID = "machine-root";
/**
* 全局统计根节点,代表通用父节点。
*/
public final static DefaultNode ROOT = new EntranceNode(new StringResourceWrapper(ROOT_ID, EntryType.IN),
new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON));
}
在调用CtSph::entryWithPriority
方法时,会执行 Context context = ContextUtil.getContext();
这行代码
在调用ContextUtil时,会触发 static{}
代码块初始化,这块代码对应的就是N叉树初始化的代码。
public class ContextUtil {
/**
* 把上下文存储到线程本地变量方便获取
*/
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
/**
* 持有所有 EntranceNode 。 每个 EntranceNode 都与一个不同的上下文名称相关联。
*/
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Context NULL_CONTEXT = new NullContext();
// ContextUtil被调用时自动执行static里的方法
static {
// 缓存默认上下文的入口节点。
initDefaultContext();
}
private static void initDefaultContext() {
// sentinel_default_context
String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
// N叉树根节点添加子节点
Constants.ROOT.addChild(node);
contextNameNodeMap.put(defaultContextName, node);
}
}
注意,static代码块里的方法只会执行一次
NodeSelectorSlot主要作用是负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
package com.alibaba.csp.sentinel.slots.nodeselector;
/**
* 这个类将尝试通过构建调用跟踪
*
* 如果需要,添加一个新的DefaultNode作为上下文中的最后一个子节点。
* 上下文的最后一个节点是上下文的当前节点或父节点。
* 将自身设置为上下文当前节点。
*
* 它的工作原理如下
* <pre>
* ContextUtil.enter("entrance1", "appA");
* Entry nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
* </pre>
*
* 上面的代码会在内存中生成如下调用结构:
*
* <pre>
*
* machine-root
* /
* /
* EntranceNode1 (entrance1,appA)
* /
* /
* DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
* </pre>
*
* 这里的EntranceNode表示由ContextUtil.enter("entrance1", "appA") 给出的“entrance1”。
*
*
* DefaultNode(nodeA) 和 ClusterNode(nodeA) 都保存了“nodeA”的统计信息,它由 SphU.entry("nodeA")给出
*
* ClusterNode 由 ResourceId 唯一标识;
* DefaultNode 由 资源ID 和 Context 标识。
* 换句话说,一个资源 ID 将为每个不同的上下文生成多个 DefaultNode,但只会生成一个 ClusterNode。
*
*
* 以下代码显示了两个不同上下文中的一个资源 ID
* <pre>
* ContextUtil.enter("entrance1", "appA");
* Entry nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
*
* ContextUtil.enter("entrance2", "appA");
* nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
* </pre>
*
* 上面的代码会在内存中生成如下调用结构:
*
* <pre>
*
* machine-root
* / \
* / \
* EntranceNode1 EntranceNode2
* / \
* / \
* DefaultNode(nodeA) DefaultNode(nodeA)
* | |
* +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
* </pre>
*
* 正如我们所见,在两个上下文中为“nodeA”创建了两个DefaultNode,但只创建了一个ClusterNode。
*
* 我们还可以通过调用来检查这个结构: curl http://localhost:8719/tree?type=root}
*
* @see EntranceNode
* @see ContextUtil
*/
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
/**
* DefaultNode s 相同资源在不同上下文中。
* {@link DefaultNode}s of the same resource in different context.
*
* map的key是上下文名称,对应 context.getName()
*/
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
/**
*/
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
/*
* 有趣的是,我们使用上下文名称而不是资源名称作为映射键。
*
* 请记住,相同的资源 ResourceWrapper#equals() 将在全局范围内共享相同的ProcessorSlotChain,无论在哪个上下文中。
* 因此,如果代码进入#entry(Context, ResourceWrapper, DefaultNode, int, Object...),资源名称必须相同,但上下文名称可能不同。
*
* 如果我们使用 SphU#entry(String resource) 在不同的上下文中输入同一个资源,
* 使用上下文名称作为映射键可以区分同一个资源。
* 在这种情况下,将为每个不同的上下文(不同的上下文名称)创建具有相同资源名称的多个 DefaultNode 。
*
* 考虑另一个问题。一个资源可能有多个DefaultNode,那么获取同一个资源的总统计数据最快的方法是什么?
* 答案是所有具有相同资源名称的DefaultNode 共享一个 ClusterNode。
* 有关详细信息,请参阅 ClusterBuilderSlot 。
*
*/
// 获取当前上下文的DefaultNode
DefaultNode node = map.get(context.getName());
// 单例模式-DCL
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 新建节点
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
// 把当前上下文节点放入缓存中
cacheMap.put(context.getName(), node);
map = cacheMap;
// 构建调用树 构建N叉树
// 在当前上下文中添加子节点
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
// 更新上下文里的当前处理节点
context.setCurNode(node);
// 调用责任链的下一个功能插槽
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
package com.alibaba.csp.sentinel.node;
public class DefaultNode extends StatisticNode {
/**
* 孩子节点集合
*/
private volatile Set<Node> childList = new HashSet<>();
/**
* 添加孩子节点
*
* @param node valid child node
*/
public void addChild(Node node) {
if (node == null) {
RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName());
return;
}
// 单例-DCL
if (!childList.contains(node)) {
synchronized (this) {
if (!childList.contains(node)) {
Set<Node> newSet = new HashSet<>(childList.size() + 1);
newSet.addAll(childList);
// 添加孩子节点
newSet.add(node);
// 更新孩子节点
childList = newSet;
}
}
RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName());
}
}
}
/**
* <pre>
*
* machine-root
* / \
* / \
* EntranceNode1 EntranceNode2
* / \
* / \
* DefaultNode(nodeA) DefaultNode(nodeA)
* | |
* +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
* </pre>
*/