Sentinel滑动窗口原理及源码解析
Sentinel从架构上主要包含5部分:规则(rules)、 处理插槽(slot)、 调用链路(invocation tree)、 集群节点(cluster node)、 滑动窗口(slading winodw)
下面我们来看滑动窗口在Sentinel里是怎么实现的?
(1) 滑动窗口是什么
滑动窗口(Sliding window)是一种流量控制技术。
(2) 为什么要用滑动窗口
滑动窗口是计数器算法的优化版,解决了计数器限流算法的临界值问题。
(3) 滑动窗口的原理
滑动窗口
是 固定时间窗口计数器算法
的改进版,把一个时间间隔分为多个更小的时间间隔,使得流量更为平滑,限流更为准确。
(3.1) 工程优化
如果在程序里直接使用数组,随着时间的推移,每次滑动一个窗口需要在尾部增加一个元素并且需要在头部删除一个元素,所以为了节省删除元素对整个数字带来的移动开销,使用环形数组来避免数组频繁增加移动的问题。
在实现时可以通过移动指针来达到环形数组的情况,比如ArrayBlockingQueue
就用的这个办法。
Sentinel也使用了类似的思想,只不过指针需要计算一下。
可以看到,使用环形数组后数组里的元素个数是固定的,不会随时间变化而变化,唯一变化的是时间。
(3.2) 环形数组的问题
但是使用了环形数组后,带来2个问题:
1.怎么根据时间定位到对应的下标?
2.数组里的数据是新的还是旧的?
(3.2.1) 根据时间定位到桶(时间窗口)的下标
对 时间跨度(1000ms) 取余即可得到对应的时间,再除以时间窗口长度(200ms)得到的余数就是下标。 下标 = (当前时间 % 时间跨度) / 一个时间窗口的时间
比如 时间跨度是1000ms,时间窗口是200ms,桶有5个
那么t1=2100,对应的下标就是 (2100%1000) / 200 = 0
(3.2.2) 数据是新的还是旧的
在一个桶/时间窗口内保存开始时间,在使用时对比 开始时间是否一致,如果一致则是新的,否则就是旧的。公式: 当前时间跨度开始时间 = 当前时间 - (当前时间 % 时间跨度)
2100 - (2100 % 1000) = 2000,可以获取到当前时间对应的时间跨度开始时间是 2000
(4) 源码
Sentinel整体统计计数流程如下
类 | 作用 |
---|---|
StatisticSlot | 实时统计处理插槽。 |
DefaultNode | N叉树的节点,节点里保存了孩子节点、统计指标。 |
StatisticNode | 存储实时统计指标;包含秒级指标、分钟级指标、线程数。 |
ArrayMetric | 滑动窗口的包装类,对外提供计数功能类,底层通过持有LeapArray的对象实现计数。 |
LeapArray | 滑动窗口真正的数据结构,环形数组、滑动窗口的实现。 |
WindowWrap | 时间窗口/桶包装类,通过桶的开始时间戳 解决环形数组定位下标和判断新老数据问题。 |
MetricBucket | 计数桶/时间窗口,存储各种事件计数,比如请求通过总数、限流总数、异常总数等。 |
和滑动窗口相关的数据结构如下
可以看到滑动窗口是基于Node的,这个Node是在ClusterBuilderSlot
获取的。
滑动窗口里比较重要的是LeapArray
Sentinel实现的滑动窗口如下
滑动窗口的调用入口从StatisticSlot
类 entry()
方法 node.increaseThreadNum();
和 node.addPassRequest(count);
2行代码
注意滑动窗口都是 基于当前节点 当前时间窗口的。
(4.1) 实时统计插槽-StatisticSlot
package com.alibaba.csp.sentinel.slots.statistic;
/**
* 专用于实时统计的处理器插槽。
* 在进入这个槽的时候,我们需要单独统计以下信息:
* ClusterNode:资源ID的集群节点的总统计。
* 源节点:来自不同调用者/源的集群节点的统计信息。
* DefaultNode:特定上下文中特定资源名称的统计信息。
* 最后是所有入口的总和统计。
*/
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 资源线程计数
node.increaseThreadNum();
// 资源请求量计数 计数的部分用到了滑动窗口
node.addPassRequest(count);
// 来源计数
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
// 入口流量计数
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// 使用已注册的条目回调处理程序处理传递事件。
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// origin node 添加计数
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
}
(4.2) N叉树节点-DefaultNode
DefaultNode可以看做N叉树节点。继承了StatisticNode
节点。
package com.alibaba.csp.sentinel.node;
/**
* 用于保存特定上下文中特定资源名称的统计信息的Node。
* 不同上下文中的不同资源将对应不同的DefaultNode。
*
* 此类可能有子节点列表。在同一上下文中多次调用 SphU#entry() 或 SphO#entry() 时,将创建子节点。
*
* @see NodeSelectorSlot
*/
public class DefaultNode extends StatisticNode {
/**
* 与当前节点关联的资源。
*/
private ResourceWrapper id;
/**
* 当前节点的孩子节点
*/
private volatile Set<Node> childList = new HashSet<>();
/**
* 关联的群集节点
*/
private ClusterNode clusterNode;
public DefaultNode(ResourceWrapper id, ClusterNode clusterNode) {
this.id = id;
this.clusterNode = clusterNode;
}
}
/** 添加请求次数 */
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
(4.3) 统计节点-StatisticNode
package com.alibaba.csp.sentinel.node;
/**
* 统计节点保留三种实时统计指标:
* 秒级指标 rollingCounterInSecond
* 分钟级别指标 rollingCounterInMinute
* 线程数
*
* Sentinel 使用滑动窗口实时记录和统计资源统计数据。
* ArrayMetric 背后的滑动窗口基础架构是 LeapArray
*
*
* case 1:当第一个请求进来时,Sentinel会新建一个指定时间跨度的window bucket来存储运行统计信息,
* 比如总响应时间(rt)、传入请求(QPS)、阻塞请求(bq)等. 时间跨度由样本计数定义。
*
* 0 100ms
* +-------+--→ 滑动窗口
* ^
* |
* request
*
* Sentinel使用有效桶的静态信息来决定是否可以通过该请求。
* 例如,如果一条规则定义了只能通过100个请求,它会将有效桶中的所有qps相加,并将其与规则中定义的阈值进行比较。
*
*
* case 2: 连续请求
*
* 0 100ms 200ms 300ms
* +-------+-------+-------+-----→ 滑动窗口
* ^
* |
* request
*
*
* case 3: 请求不断到来,之前的桶变得无效
*
* 0 100ms 200ms 800ms 900ms 1000ms 1300ms
* +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ 滑动窗口
* ^
* |
* request
*
*
* 滑动窗口变为:
*
* 300ms 800ms 900ms 1000ms 1300ms
* + ...... +-------+ ...... +-------+-----→ Sliding Windows
* ^
* |
* request
*
*
*/
public class StatisticNode implements Node {
/**
* 保存最近毫秒间隔的统计信息。
* 毫秒间隔 按给定的 桶 划分为时间跨度
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* 保存最近60秒的统计信息。
* windowLengthInMs 特意设置为1000毫秒,即每桶每秒,这样我们就可以准确统计每一秒。
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
/**
* 线程数计数器
*/
private LongAdder curThreadNum = new LongAdder();
/**
* 获取指标时的上一个时间戳。
*/
private long lastFetchTime = -1;
}
/** 添加请求次数 */
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
(4.4) 度量数组包装类-ArrayMetric
ArrayMetric
是Metric
的一个实现类
在思考一个问题,会有LinkedMetric
吗?
package com.alibaba.csp.sentinel.slots.statistic.metric;
/**
* Sentinel中使用BucketLeapArray内部的基本度量类。
*/
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
}
/** 添加请求数 */
@Override
public void addPass(int count) {
// 获取当前的桶/时间窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 添加调用次数
wrap.value().addPass(count);
}
(4.5) 滑动窗口-LeapArray
规定 窗口长度、窗口个数、
package com.alibaba.csp.sentinel.slots.statistic.base;
/**
* Sentinel 中统计指标的基本数据结构。
*
* Leap数组使用滑动窗口算法来统计数据。
* 每个桶/窗口覆盖windowLengthInMs时间跨度,总时间跨度为intervalInMs,
* 因此总桶数为:sampleCount = intervalInMs / windowLengthInMs
* 桶/窗口个数 = 毫秒单位间隔(默认1000) / 毫秒级窗口长度(默认500)
*
* @param <T> 统计类型数据
*/
public abstract class LeapArray<T> {
// 毫秒级别的窗口长度 默认为500
protected int windowLengthInMs;
// 桶/时间窗口个数 默认为2
protected int sampleCount;
// (毫秒级)时间跨度 默认是1000
protected int intervalInMs;
// 以分钟为单位的时间跨度
private double intervalInSecond;
//
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* 条件(谓词)更新锁仅在当前存储桶被弃用时使用。
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
*
* @param sampleCount 滑动窗口个数
* @param intervalInMs 总的时间间隔(单位毫秒)
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
(4.5.1) currentWindow()
/**
* 获取当前时间戳对应的桶/时间窗口
*
* @return
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
/**
* 根据时间戳获取当前的桶/时间窗口
*
* @param timeMillis 毫秒时间戳
* @return 当前时间戳对应的桶
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算索引下标
int idx = calculateTimeIdx(timeMillis);
// 计算 桶/时间窗口 的开始时间戳
long windowStart = calculateWindowStart(timeMillis);
/*
* 从数组获取指定时间的桶/时间窗口
*
* (1) Bucket不存在,只需创建一个新的Bucket并将CAS更新为循环数组。
* (2) Bucket存在,并且是最新的,然后只需返回Bucket。
* (3) Bucket存在,但是是旧的数据,也就是已弃用,重置当前Bucket。
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) { // 当前桶/时间窗口是空的
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 桶是空的,所以创建一个新的并更新
*
* 如果旧的bucket不存在,那么我们将在windowStart创建一个新的bucket,然后尝试通过CAS操作更新循环数组。
* 只有一个线程可以成功更新,而其他线程会让出时间片。
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// 成功更新,返回创建的桶
return window;
} else {
// 争用失败,线程将放弃其时间片以等待桶可用。
Thread.yield();
}
} else if (windowStart == old.windowStart()) { // 桶存在,并且是最新的
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* 如果当前 windowStart 等于旧bucket的起始时间戳,说明时间在bucket之内,直接返回bucket。
*/
return old;
} else if (windowStart > old.windowStart()) { // 桶存在,但是是旧的数据
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* 如果旧桶的开始时间戳晚于提供的时间,则表示该桶数据是旧的,已弃用。
* 我们必须将存储桶重置为当前 windowStart 。
* 请注意,重置和清理操作很难是原子的,因此我们需要一个更新锁来保证存储桶更新的正确性。
*
* 更新锁是有条件的(作用域很小),只有在桶被弃用时才会生效,所以在大多数情况下不会导致性能损失。
*/
// 尝试获取锁
if (updateLock.tryLock()) {
try {
// 成功拿到更新锁,现在我们重置桶。
return resetWindowTo(old, windowStart);
} finally {
// 释放锁
updateLock.unlock();
}
} else {
// 争用失败,线程将放弃其时间片以等待桶可用。
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 不应该经过这里,因为规定的时间已经晚了。
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
(4.5.2) 根据时间定位到对应的下标-calculateTimeIdx()
下标 = (当前时间 % 时间跨度) / 一个时间窗口的时间
Sentinel里采用的公式 下标 = (当前时间 / 一个时间窗口的时间) % 一个时间跨度里的时间窗口个数
把一个窗口当做一步,这个是先算出 走了多少步,然后按照已经走的步数 对 一圈里走多少步,即对应下标
比如 时间跨度(intervalInMs)是1000ms,时间窗口(windowLengthInMs)是200ms,桶/时间窗口个数(sampleCount)有5个
那么t1=2100,对应的下标就是 (2100 / 200) % 5 = 0
/**
* 公式 下标 = (当前时间 / 一个时间窗口的时间) % 一个时间跨度里的时间窗口个数
*/
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
// 走的步数
long timeId = timeMillis / windowLengthInMs;
// 走的步数 % 走一圈需要多少步
return (int)(timeId % array.length());
}
(4.5.3) 时间窗口的开始时间戳
/** 时间窗口的开始时间戳 */
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
2100 - (2100 % 1000) = 2000,可以获取到当前时间对应的时间跨度开始时间是2000
(4.5.4) TimeUtil.currentTimeMillis()
解决System.currentTimeMillis()
CPU占用率高的问题
com.alibaba.csp.sentinel.util.TimeUtil high CPU usage
package com.alibaba.csp.sentinel.util;
/**
* 提供毫秒级的OS时间。
*
* 在这里我们应该看到,并非所有时间 TimeUtil 都应该保持每秒循环 1_000 次(由于一些损失,实际上大约是 800/s)。
*
* 在空闲条件下,它只是充当 System.currentTimeMillis();
* 在繁忙的情况下(明显超过 1_000/s),它会保持循环以降低成本。
*
* 有关详细设计和建议,请转到 https://github.com/alibaba/Sentinel/issues/1702#issuecomment-692151160
*
*/
public final class TimeUtil implements Runnable {
/** 单例模式 */
private static TimeUtil INSTANCE;
/** 当前时间戳 后台线程更新时间戳 */
private volatile long currentTimeMillis;
private volatile STATE state = STATE.IDLE;
private LeapArray<Statistic> statistics;
/**
* thread private variables
*/
private long lastCheck = 0;
//
static {
INSTANCE = new TimeUtil();
}
public static TimeUtil instance() {
return INSTANCE;
}
public static long currentTimeMillis() {
return INSTANCE.getTime();
}
/**
* 当前时间戳(毫秒)
*
* @return
*/
public long getTime() {
return this.currentTime(false);
}
private long currentTime(boolean innerCall) {
long now = this.currentTimeMillis;
Statistic val = this.statistics.currentWindow(now).value();
if (!innerCall) {
val.getReads().increment();
}
if (this.state == STATE.IDLE || this.state == STATE.PREPARE) {
now = System.currentTimeMillis();
this.currentTimeMillis = now;
if (!innerCall) {
val.getWrites().increment();
}
}
return now;
}
/** */
@Override
public void run() {
while (true) {
// Mechanism optimized since 1.8.2
this.check();
if (this.state == STATE.RUNNING) {
this.currentTimeMillis = System.currentTimeMillis();
this.statistics.currentWindow(this.currentTimeMillis).value().getWrites().increment();
try {
// 休眠1ms
TimeUnit.MILLISECONDS.sleep(1);
} catch (Throwable e) {
}
continue;
}
if (this.state == STATE.IDLE) {
try {
// 休眠300ms
TimeUnit.MILLISECONDS.sleep(300);
} catch (Throwable e) {
}
continue;
}
if (this.state == STATE.PREPARE) {
RecordLog.debug("TimeUtil switches to RUNNING");
this.currentTimeMillis = System.currentTimeMillis();
this.state = STATE.RUNNING;
continue;
}
}
}
}
(4.6) 时间窗口包装类-WindowWrap
记录每个桶/时间窗口的 时间长度、开始时间戳、对应的桶/时间窗口的数据
package com.alibaba.csp.sentinel.slots.statistic.base;
/**
* 一段时间窗口的包装器实体类。
*
* @param <T> 数据类型
*/
public class WindowWrap<T> {
/**
* 单个窗口桶的时间长度(毫秒)。
*/
private final long windowLengthInMs;
/**
* 窗口开始的毫秒时间戳。
*/
private long windowStart;
/**
* 统计数据
*/
private T value;
/**
* @param windowLengthInMs 单个窗口同的(毫秒)时间长度
* @param windowStart 窗口开始时间戳
* @param value 统计数据
*/
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
}
(4.7) 度量桶-MetricBucket
Sentinel 使用 Bucket 统计一个窗口时间内的各项指标数据,这些指标数据包括请求总数、成功总数、异常总数、总耗时、最小耗时、最大耗时等
一个桶就是一个时间窗口
package com.alibaba.csp.sentinel.slots.statistic.data;
/**
* 表示一段时间内的度量数据。
*/
public class MetricBucket {
// 度量数据
// 存储各事件的计数,比如异常总数、请求总数等
// counters[0] 存储请求通过总数, 对应 MetricEvent::PASS
// counters[1] 存储限流总数, 对应 MetricEvent::BLOCK
// counters[2] 存储异常总数, 对应 MetricEvent::EXCEPTION
// counters[3] 存储成功总数, 对应 MetricEvent::SUCCESS
// counters[4] 存储响应时间之和, 对应 MetricEvent::RT
// counters[5] 存储已通过未来配额, 对应 MetricEvent::OCCUPIED_PASS
private final LongAdder[] counters;
// 最小响应时间
private volatile long minRt;
public MetricBucket() {
// 枚举常量 长度是6
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
// 根据下标初始化
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
private void initMinRt() {
// 最小rt 用的是环境变量 对应数值是5000
this.minRt = SentinelConfig.statisticMaxRt();
}
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
/** */
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public void addOccupiedPass(int n) {
add(MetricEvent.OCCUPIED_PASS, n);
}
public void addException(int n) {
add(MetricEvent.EXCEPTION, n);
}
public void addBlock(int n) {
add(MetricEvent.BLOCK, n);
}
public void addSuccess(int n) {
add(MetricEvent.SUCCESS, n);
}
/** 添加响应时间 这里保存的是这个桶/窗口内所有响应时间之和 */
public void addRT(long rt) {
add(MetricEvent.RT, rt);
// Not thread-safe, but it's okay.
if (rt < minRt) {
minRt = rt;
}
}
/** */
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
度量事件类型-MetricEvent
package com.alibaba.csp.sentinel.slots.statistic;
/**
*
*/
public enum MetricEvent {
/**
* 正常通过数
*/
PASS,
/**
* 正常限流数
*/
BLOCK,
EXCEPTION,
SUCCESS,
RT,
/**
* 已通过未来配额(自1.5.0版本开始预占用)。
*/
OCCUPIED_PASS
}
LongAdder
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
/**
* Creates a new adder with initial sum of zero.
*/
public LongAdder() {
}
}
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
/**
* Equivalent to {@code add(1)}.
*/
public void increment() {
add(1L);
}
/**
* Equivalent to {@code add(-1)}.
*/
public void decrement() {
add(-1L);
}
/**
* Returns the current sum. The returned value is <em>NOT</em> an
* atomic snapshot; invocation in the absence of concurrent
* updates returns an accurate result, but concurrent updates that
* occur while the sum is being calculated might not be
* incorporated.
*
* @return the sum
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
/**
* Resets variables maintaining the sum to zero. This method may
* be a useful alternative to creating a new adder, but is only
* effective if there are no concurrent updates. Because this
* method is intrinsically racy, it should only be used when it is
* known that no threads are concurrently updating.
*/
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}