在上篇《sentinel的几个实体分析》的Node和Metric中,我没有花太长的时间研究。因为sentinel是如何进行qps等指标的统计的,跟StaticNode和Metric有很大的关系。所以,我想单拿出来写一篇。首先,确定一点就是sentinel是基于滑动窗口算法来实现的。
在上一篇《sentinel降级背后的原理解析》已经分析了sentinel是如何驱动的。在里面提到了有很多的slot按照顺序依次排列并运行的。所以,我们先来看一看这些数据是按照什么路径获取到的。那么,首先我们知道StatisticSlot是进行统计的。所以我们就要看看他的entry方法
-
StatisticSlot
//A processor slot that dedicates to real time statistics.
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 触发下一个slot的entry()
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//能往下执行就说明没有被限流或者降级
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
…………
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
………… }
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
…………
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
…………
}
从上面代码,我们可知
- 通过node中的当前的实时统计指标信息进行规则校验
- 如果通过了校验,则重新更新node中的实时指标数据
- 如果被block了或者出现异常了,则更新node中block的指标或异常指标
从上面的代码中,可以清晰的看到所有的实时指标都是在node中进行的。我们以qps的指标进行分析,看sentinel是怎么统计出qps的。
-
几种Node
//DefaultNode继承了StatisticNode实现了Node
public class DefaultNode extends StatisticNode {
/**
* Associated cluster node.
*/
private ClusterNode clusterNode;
…………
//下面两句其实执行的方法都是一个。但是一个属于DefaultNode,一个属于ClusterNode。DefaultNode指向了ClusterNode
@Override
public void addPassRequest(int count) {
//调用StatisticNode的
super.addPassRequest(count);
//ClusterNode没有定义这个方法,向父类找,在StaticsticNode中找到了
this.clusterNode.addPassRequest(count);
}
…………
}
DefaultNode和ClusterNode有一些区别。
DefaultNode是保存着某个resource的某个context的实时指标。每个DefaultNode都指向一个ClusterNode,
ClusterNode是保存着某个resource在所有的context中实时指标的总和。同样的resource会共享同一个ClusterNode,不管它在那个context中。
上面说了,最终都是执行的StatisticNode对象的addPassRequest方法:
public class StatisticNode implements Node {
//虽然用的是ArrayMetric。但是真正构造的却是OccupiableBucketLeapArray
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
……
//都是Metric的类型。所以我们应该去Metric看看
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count); //下面几个列子改造的数据结构,再返回这里向下看。
//应该去看OccupiableBucketLeapArray.addPass()。但是OccupiableBucketLeapArray没有这个方法,所以就向上看他的父亲的addPass()
rollingCounterInMinute.addPass(count);
}
……
}
-
ArrayMetric
/*
采用BucketLeapArray结构的基础指标结构
*/
public class ArrayMetric implements Metric {
//这块要求了MetricBucekt。MetricBucket是记录着一段时间内的所有指标(pass,block等等)
private final LeapArray<MetricBucket> data;
//构造函数,我们要看看OccupiableBucketLeapArray这个结构
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
//还要看看这个结构,有什么区别
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
}
//data是LeapArray
@Override
public void addPass(int count) {
//调用OccupiableBucketLeapArray的currentWindow()。但是OccupiableBucketLeapArray没有,他父亲LeapArray有。
WindowWrap<MetricBucket> wrap = data.currentWindow();
//通过窗口策略,会把通过的窗口内的counters中的PASS值增加count次
wrap.value().addPass(count);
}
………………
public class MetricBucket {
//所有信息存在这里的。LongAdder也是原子的
private final LongAdder[] counters;
private volatile long minRt;
public MetricBucket() {
//在构造的时候,就按照这个枚举来定义counters不同数组了。(PASS,BLOCK,EXCEPTION,SUCCESS,RT,OCCUPIED_PASS)。之后计算QPS也是通过这个类的方法得到的
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
…………
}
-
BucketLeapArray
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {
private final FutureBucketLeapArray borrowArray;
//这个FutureBucketLeapArray又是个啥?
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
……
}
接着往下
//一种只服务于future buckets的BucketLeapArray
public class FutureBucketLeapArray extends LeapArray<MetricBucket> {
public FutureBucketLeapArray(int sampleCount, int intervalInMs) {
// 调用的这里。
super(sampleCount, intervalInMs);
}
@Override
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}
@Override
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
// Tricky: will only calculate for future.
return time >= windowWrap.windowStart();
}
}
调用了父构造方法。
//基本的统计数据结构
public abstract class LeapArray<T> {
//以毫秒为单位的时间窗口
protected int windowLengthInMs;
// 采样窗口的个数
protected int sampleCount;
// 以毫秒为单位的时间间隔
protected int intervalInMs;
// 以秒为单位的时间间隔
private double intervalInSecond;
//定义一个原子数组, 采样的时间窗口数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/*
sampleCount = intervalInMs / windowLengthInMs 例如 1000/500 = 2。
通过这个代码,我们了解到时间窗口是2个。
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount; //2
this.intervalInMs = intervalInMs; //1000
this.intervalInSecond = intervalInMs / 1000.0; // 1
this.sampleCount = sampleCount; //2
this.array = new AtomicReferenceArray<>(sampleCount);
}
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
…………
}
这样ArrayMetric中就构造出了一个包含类内变量data 为LeapArray
然后,程序会一路进行下去,直到要进行实时的数据统计调用了,这里就是窗口算法的体现了。
/*
这个方法极其重要。窗口算法的体现
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//计算出时间的哈希值。因为array数组的长度是2出值只可能是 0 和 1.作为时间窗口数组array中的索引。
int idx = calculateTimeIdx(timeMillis);
//计算当前的毫秒其实值。 例如当前是12:10.303,出值为12:00.000。当然是按照毫秒计算的
long windowStart = calculateWindowStart(timeMillis);
/*
一个循环。通过时间窗口的值。
一共有三种情况。
1.bucket是空,创建一个bucket并且cas更新循环的数组
2.bucket更新了,就把更新的返回
3.bucket失效了,重置当前的bucket并且清空所有的时效bucket
这个while的动机要理解一下,其实就是为了在第3步更新完后再去调第2步用的。没其他的什么意义。你会发现1,2最终都是return,也就是直接从循环中结束外面的方法了。
*/
while (true) {
//第一次肯定是null
WindowWrap<T> old = array.get(idx);
if (old == null) {
//先创建一个新的
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// 给原子数组idx位置设置为新的window,直接返回
return window;
} else {
// 如果当前数组idx位不空,就不更新。让当前线程让出时间片,进入waitting()状态
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
//新的时间和已存在的时间窗口的时间一致,就直接返回已存在
return old;
} else if (windowStart > old.windowStart()) {
/*
如果当前时间大于已存在窗口的时间,那就意味着当前的已经失效。我们必须重置当前的时间。
因为重置和清除操作需要保证原子性,所以我们用了更新锁来保证bucket更新的正确性。
更新锁是有条件的(小范围)并且只有在bucket已经失效的时候才能发挥作用,所以大多场景下这不会导致性能的丢失
*/
if (updateLock.tryLock()) {
try {
//更新为新的时间
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
//如果没有获得锁那么就等待。
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 如果当前时间小于已存在的时间,一般不可能走到这里。如果走就返回一个新的。
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
至此,就算出了QPS通过的,阻塞的,总数等指标了。上面的代码调用顺序不是线性的,简单的线性阅读肯定不能够掌握这里面的算法。就在我写这篇分析的时候,分析代码的时候都是跳来跳去,很容易把自己绕晕。所以,还是建议跟随者代码debug时,参照上面所说的进行阅读。这样效果要比较好。
上面的发生时机,我来分析下。因为slot的排列顺序是这样的:
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
然后,根据调用栈来看。如图:
可以看到,滑动窗口发生的时间是在统计阶段。还没有到限流或者熔断的阶段。