sentinel中的滑动窗口算法实现

  张一帆   2020年04月25日


  在上篇《sentinel的几个实体分析》的Node和Metric中,我没有花太长的时间研究。因为sentinel是如何进行qps等指标的统计的,跟StaticNode和Metric有很大的关系。所以,我想单拿出来写一篇。首先,确定一点就是sentinel是基于滑动窗口算法来实现的。

  在上一篇《sentinel降级背后的原理解析》已经分析了sentinel是如何驱动的。在里面提到了有很多的slot按照顺序依次排列并运行的。所以,我们先来看一看这些数据是按照什么路径获取到的。那么,首先我们知道StatisticSlot是进行统计的。所以我们就要看看他的entry方法

  • StatisticSlot

  //A processor slot that dedicates to real time statistics.
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
     @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 触发下一个slot的entry()
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            //能往下执行就说明没有被限流或者降级
            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            …………
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
           …………            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            …………
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }
    
    …………
}

  从上面代码,我们可知

  1. 通过node中的当前的实时统计指标信息进行规则校验
  2. 如果通过了校验,则重新更新node中的实时指标数据
  3. 如果被block了或者出现异常了,则更新node中block的指标或异常指标

  从上面的代码中,可以清晰的看到所有的实时指标都是在node中进行的。我们以qps的指标进行分析,看sentinel是怎么统计出qps的。


  • 几种Node

  
//DefaultNode继承了StatisticNode实现了Node
public class DefaultNode extends StatisticNode {

    /**
     * Associated cluster node.
     */
    private ClusterNode clusterNode;
    …………
    
    //下面两句其实执行的方法都是一个。但是一个属于DefaultNode,一个属于ClusterNode。DefaultNode指向了ClusterNode
    @Override
    public void addPassRequest(int count) {
        //调用StatisticNode的
        super.addPassRequest(count);
        //ClusterNode没有定义这个方法,向父类找,在StaticsticNode中找到了
        this.clusterNode.addPassRequest(count);
    }
    
    …………
}

  DefaultNode和ClusterNode有一些区别。

  DefaultNode是保存着某个resource的某个context的实时指标。每个DefaultNode都指向一个ClusterNode,

  ClusterNode是保存着某个resource在所有的context中实时指标的总和。同样的resource会共享同一个ClusterNode,不管它在那个context中。

  上面说了,最终都是执行的StatisticNode对象的addPassRequest方法:

  public class StatisticNode implements Node {
    
    //虽然用的是ArrayMetric。但是真正构造的却是OccupiableBucketLeapArray
     private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

    ……
    
    //都是Metric的类型。所以我们应该去Metric看看
    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count); //下面几个列子改造的数据结构,再返回这里向下看。
        //应该去看OccupiableBucketLeapArray.addPass()。但是OccupiableBucketLeapArray没有这个方法,所以就向上看他的父亲的addPass()
        rollingCounterInMinute.addPass(count);
    }

    ……
}


  • ArrayMetric

  /*
采用BucketLeapArray结构的基础指标结构
*/
public class ArrayMetric implements Metric {
    
    //这块要求了MetricBucekt。MetricBucket是记录着一段时间内的所有指标(pass,block等等)
    private final LeapArray<MetricBucket> data;
    
    //构造函数,我们要看看OccupiableBucketLeapArray这个结构
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            //还要看看这个结构,有什么区别
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

}

    //data是LeapArray
    @Override
    public void addPass(int count) {
        //调用OccupiableBucketLeapArray的currentWindow()。但是OccupiableBucketLeapArray没有,他父亲LeapArray有。
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        //通过窗口策略,会把通过的窗口内的counters中的PASS值增加count次
        wrap.value().addPass(count);
    }
    
    ………………

  public class MetricBucket {

    //所有信息存在这里的。LongAdder也是原子的
    private final LongAdder[] counters;

    private volatile long minRt;

    public MetricBucket() {
        //在构造的时候,就按照这个枚举来定义counters不同数组了。(PASS,BLOCK,EXCEPTION,SUCCESS,RT,OCCUPIED_PASS)。之后计算QPS也是通过这个类的方法得到的
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    …………
}


  • BucketLeapArray

  
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    private final FutureBucketLeapArray borrowArray;
    
      //这个FutureBucketLeapArray又是个啥?
      public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        // This class is the original "CombinedBucketArray".
        super(sampleCount, intervalInMs);
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }
    
    ……
}


  接着往下

  
//一种只服务于future buckets的BucketLeapArray
public class FutureBucketLeapArray extends LeapArray<MetricBucket> {

    public FutureBucketLeapArray(int sampleCount, int intervalInMs) {
        // 调用的这里。
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }

    @Override
    public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
        // Tricky: will only calculate for future.
        return time >= windowWrap.windowStart();
    }
}


  调用了父构造方法。

  
//基本的统计数据结构
public abstract class LeapArray<T> {
    //以毫秒为单位的时间窗口
    protected int windowLengthInMs;
    // 采样窗口的个数
    protected int sampleCount;
    // 以毫秒为单位的时间间隔
    protected int intervalInMs;
    // 以秒为单位的时间间隔
    private double intervalInSecond;

    //定义一个原子数组, 采样的时间窗口数组
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();
     
     /*
     sampleCount = intervalInMs / windowLengthInMs 例如 1000/500 = 2。
     通过这个代码,我们了解到时间窗口是2个。
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

        this.windowLengthInMs = intervalInMs / sampleCount; //2
        this.intervalInMs = intervalInMs; //1000
        this.intervalInSecond = intervalInMs / 1000.0; // 1
        this.sampleCount = sampleCount; //2

        this.array = new AtomicReferenceArray<>(sampleCount);
    }


    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }
    
   …………

}

  这样ArrayMetric中就构造出了一个包含类内变量data 为LeapArray的数组来了,最终以ArrayMetric返回。

  然后,程序会一路进行下去,直到要进行实时的数据统计调用了,这里就是窗口算法的体现了。

      /*
    这个方法极其重要。窗口算法的体现
    */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        
        //计算出时间的哈希值。因为array数组的长度是2出值只可能是 0 和 1.作为时间窗口数组array中的索引。
        int idx = calculateTimeIdx(timeMillis);
        //计算当前的毫秒其实值。 例如当前是12:10.303,出值为12:00.000。当然是按照毫秒计算的
        long windowStart = calculateWindowStart(timeMillis);

         /*
         一个循环。通过时间窗口的值。
         一共有三种情况。
         1.bucket是空,创建一个bucket并且cas更新循环的数组
         2.bucket更新了,就把更新的返回
         3.bucket失效了,重置当前的bucket并且清空所有的时效bucket
         这个while的动机要理解一下,其实就是为了在第3步更新完后再去调第2步用的。没其他的什么意义。你会发现1,2最终都是return,也就是直接从循环中结束外面的方法了。
         */
        while (true) {
            //第一次肯定是null
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                //先创建一个新的
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // 给原子数组idx位置设置为新的window,直接返回
                    return window;
                } else {
                    // 如果当前数组idx位不空,就不更新。让当前线程让出时间片,进入waitting()状态
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                 //新的时间和已存在的时间窗口的时间一致,就直接返回已存在
                return old;
            } else if (windowStart > old.windowStart()) {
                 /*
                 如果当前时间大于已存在窗口的时间,那就意味着当前的已经失效。我们必须重置当前的时间。
                 因为重置和清除操作需要保证原子性,所以我们用了更新锁来保证bucket更新的正确性。
                 更新锁是有条件的(小范围)并且只有在bucket已经失效的时候才能发挥作用,所以大多场景下这不会导致性能的丢失
                 */
                if (updateLock.tryLock()) {
                    try {
                        //更新为新的时间
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    //如果没有获得锁那么就等待。
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // 如果当前时间小于已存在的时间,一般不可能走到这里。如果走就返回一个新的。
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }


  至此,就算出了QPS通过的,阻塞的,总数等指标了。上面的代码调用顺序不是线性的,简单的线性阅读肯定不能够掌握这里面的算法。就在我写这篇分析的时候,分析代码的时候都是跳来跳去,很容易把自己绕晕。所以,还是建议跟随者代码debug时,参照上面所说的进行阅读。这样效果要比较好。

  上面的发生时机,我来分析下。因为slot的排列顺序是这样的:

  # Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

  然后,根据调用栈来看。如图:

调用栈

  可以看到,滑动窗口发生的时间是在统计阶段。还没有到限流或者熔断的阶段。