sentinel的几个实体分析

  张一帆   2020年03月01日


sentinel中有几个比较重要的实体需要我们了解。

  • resource

  resource是sentinel中最重要的一个概念。sentinel通过resource来保护具体的业务代码和其他后方服务。sentinel把复杂的逻辑给屏蔽掉了,用户只需要为受保护的代码或服务定义一个resource,然后定义rule就可以了,剩下的就通通交给sentinel来处理。并且resource和rule是解耦的,rule设置可以在runtime动态修改,定义完resource后,就可以通过在程序中埋点来保护自己的服务了,埋点的方式有两种:

  • try-catch 方式(通过 SphU.entry(…)),当 catch 到BlockException时执行异常处理(或fallback)
  • if-else 方式(通过 SphO.entry(…)),当返回 false 时执行异常处理(或fallback)

  以上这两种方式都是通过硬编码的形式定义resource然后进行resource埋点的,对业务代码的侵入太大,从0.1.1版本开始, Sentinel 加入了注解的支持,可以通过注解来定义资源,具体的注解为:SentinelResource 。通过注解除了可以定义资源外,还可以指定 blockHandler 和 fallback 方法。

  //这个注解代表定义一个sentinel resource
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {

    //sentinel resource名称
    String value() default "";
 
    //entry类型 (输入或者输出,默认是输出)
    EntryType entryType() default EntryType.OUT;

     //资源的分类(类型)
    int resourceType() default 0;

    //限流的异常名,默认为空
    String blockHandler() default "";
     
    /*
    blockHandler默认和原方法在同一个类中。
    如果希望使用其他类的函数,则需要指定 blockHandlerClass 为对应的类的 Class 对象。必须是静态的,否则无法解析
    */
    Class<?>[] blockHandlerClass() default {};

    //熔断方法名
    String fallback() default "";

     /*
     defaultFallBack()用来做默认的降级方法
     这个方法不接受任何参数,并且返回必须的类型和原始方法适配
     */
    String defaultFallback() default "";

     /*
     fallback()默认情况下是和原方法在一起的。
     同blockHandlerClass。
     */
    Class<?>[] fallbackClass() default {};

     //用来追踪的异常列表
    Class<? extends Throwable>[] exceptionsToTrace() default {Throwable.class};
    
     /*
     指明有哪些异常可以被忽略。
     exceptionsToTrace()和exceptionsToIgnore()不能够同时存在
     exceptionsToIgnore()优先使用。
     */
    Class<? extends Throwable>[] exceptionsToIgnore() default {};
}


  其中EntryType的枚举为:

  public enum EntryType {
    IN("IN"),
    OUT("OUT");
}

  有了注解以后,接下来就是看看通过反射,获取配置了注解的方法是如何使用的了。使用了@SentinelResource就需要依赖sentinel-annotation-aspectj了。具体实现如下:

   //SentinelResource切面的实现。继承于AbstractSentinelAspectSupport
@Aspect //定义切面
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

//匹配有SentinelResource注解的方法。设置切入点
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

//以sentinelResourceAnnotationPointcut()为切入点,环绕增强。也就是说有这个注解的方法环绕增强
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        //反射,获取方法信息
        Method originMethod = resolveMethod(pjp);
        //通过方法反射出注解信息
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        //如果注解信息为null的话,就抛出异常。否则,继续
        if (annotation == null) {
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        //获取资源名
        String resourceName = getResourceName(annotation.value(), originMethod);
        //获取entry类型
        EntryType entryType = annotation.entryType();
        //获取资源类型
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            //出发entry()出发熔断降级策略,具体看我的[sentinel原理](https://www.naffan.cn/tech/2020/04/04/01.html)
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            return pjp.proceed();
        } catch (BlockException ex) {
            //如果遇到了BlockException说明限流阻塞了。具体会去调用注解中设置的blockHandler()方法
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            //反射注解中的忽略异常列表
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // 如果异常忽略列表数量不为空且抛出的异常属于忽略异常,继续抛出异常。
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            //如果抛出的异常属于需要跟踪的异常
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                //跟踪,并且出发熔断降级方法,handleFallBack()
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }

            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            //最终不要忘了entry需要exit。回收资源用的
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}


  我们看完了反射的注解了。但是其中有一些方法是继承于AbstractSentinelAspectSupport的实现方法。我们也来看看刚才用到的方法:

  
//一些给切面提供功能的普通方法
public abstract class AbstractSentinelAspectSupport {

    //追踪记录异常,这些异常不是阻塞的异常
    protected void traceException(Throwable ex) {
        Tracer.trace(ex);
    }

    //判断ex和annotation反射出来的类型。然后进行追踪
    protected void traceException(Throwable ex, SentinelResource annotation) {
        Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
        // The ignore list will be checked first.
        if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
            return;
        }
        if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
            traceException(ex);
        }
    }
    
    //判断异常是否在提供的异常列表中
    protected boolean exceptionBelongsTo(Throwable ex, Class<? extends Throwable>[] exceptions) {
        if (exceptions == null) {
            return false;
        }
        for (Class<? extends Throwable> exceptionClass : exceptions) {
            if (exceptionClass.isAssignableFrom(ex.getClass())) {
                return true;
            }
        }
        return false;
    }
    
    //获取资源名称
    protected String getResourceName(String resourceName, /*@NonNull*/ Method method) {
        // If resource name is present in annotation, use this value.
        if (StringUtil.isNotBlank(resourceName)) {
            return resourceName;
        }
        // 调用方法类中的esolveMethodName。这个方法用因为涉及线程安全问题,所以加锁来获取资源名称。
        return MethodUtil.resolveMethodName(method);
    }

    //调用熔断方法
    protected Object handleFallback(ProceedingJoinPoint pjp, SentinelResource annotation, Throwable ex)
        throws Throwable {
        return handleFallback(pjp, annotation.fallback(), annotation.defaultFallback(), annotation.fallbackClass(), ex);
    }

    //真正执行熔断的方法
    protected Object handleFallback(ProceedingJoinPoint pjp, String fallback, String defaultFallback,
                                    Class<?>[] fallbackClass, Throwable ex) throws Throwable {
        //获取参数
        Object[] originArgs = pjp.getArgs();

        // 如果配置了就执行熔断方法,没有配置就调用一个默认的熔断方法
        Method fallbackMethod = extractFallbackMethod(pjp, fallback, fallbackClass);
        if (fallbackMethod != null) {
            // 构造参数
            int paramCount = fallbackMethod.getParameterTypes().length;
            Object[] args;
            if (paramCount == originArgs.length) {
                args = originArgs;
            } else {
                args = Arrays.copyOf(originArgs, originArgs.length + 1);
                args[args.length - 1] = ex;
            }
            
            try {
                //熔断方法是否是静态方法
                if (isStatic(fallbackMethod)) {
                    //调用静态的熔断方法
                    return fallbackMethod.invoke(null, args);
                }
                 //不是静态方法的话,就调用熔断方法
                return fallbackMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // throw the actual exception
                throw e.getTargetException();
            }
        }
        // If fallback is absent, we'll try the defaultFallback if provided.
        return handleDefaultFallback(pjp, defaultFallback, fallbackClass, ex);
    }

    //默认的熔断方法。过程同上
    protected Object handleDefaultFallback(ProceedingJoinPoint pjp, String defaultFallback,
                                           Class<?>[] fallbackClass, Throwable ex) throws Throwable {
        // Execute the default fallback function if configured.
        Method fallbackMethod = extractDefaultFallbackMethod(pjp, defaultFallback, fallbackClass);
        if (fallbackMethod != null) {
            // Construct args.
            Object[] args = fallbackMethod.getParameterTypes().length == 0 ? new Object[0] : new Object[] {ex};
            try {
                if (isStatic(fallbackMethod)) {
                    return fallbackMethod.invoke(null, args);
                }
                return fallbackMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // throw the actual exception
                throw e.getTargetException();
            }
        }

        // If no any fallback is present, then directly throw the exception.
        throw ex;
    }

    //调用阻塞限流,同上
    protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
        throws Throwable {

        // Execute block handler if configured.
        Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
            annotation.blockHandlerClass());
        if (blockHandlerMethod != null) {
            Object[] originArgs = pjp.getArgs();
            // Construct args.
            Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
            args[args.length - 1] = ex;
            try {
                if (isStatic(blockHandlerMethod)) {
                    return blockHandlerMethod.invoke(null, args);
                }
                return blockHandlerMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // throw the actual exception
                throw e.getTargetException();
            }
        }

        // If no block handler is present, then go to fallback.
        return handleFallback(pjp, annotation, ex);
    }
    
    //判断方法是否是静态方法
    private boolean isStatic(Method method) {
        return Modifier.isStatic(method.getModifiers());
    }
    …………   
}

  至此,通过@SentinelResource使用熔断降级,阻塞限流的原理就分析完成了。那接下来让我们看看ResourceWrapper是什么。

resource的种类

  在 Sentinel 中具体表示资源的类是:ResourceWrapper ,他是一个抽象的包装类,包装了资源的 Name 和EntryType。他有两个实现类,分别是:StringResourceWrapper 和 MethodResourceWrapper

   //resource的关于name和type的封装
public abstract class ResourceWrapper {

    protected final String name;

    //上面已经列出了EntryType的代码了,一个是IN,一个是OUT
    protected final EntryType entryType;
    protected final int resourceType;

    //构造犯法,先判name和entryType是否是空或者null。如果不是的话就将其赋值给类内部变量。
    public ResourceWrapper(String name, EntryType entryType, int resourceType) {
        AssertUtil.notEmpty(name, "resource name cannot be empty");
        AssertUtil.notNull(entryType, "entryType cannot be null");
        this.name = name;
        this.entryType = entryType;
        this.resourceType = resourceType;
    }

    public String getName() {
        return name;
    }

    public EntryType getEntryType() {
        return entryType;
    }

    public int getResourceType() {
        return resourceType;
    }

     //获取美化后的资源名称
    public abstract String getShowName();

    @Override
    public int hashCode() {
        return getName().hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof ResourceWrapper) {
            ResourceWrapper rw = (ResourceWrapper)obj;
            return rw.getName().equals(getName());
        }
        return false;
    }
}


  从代码中,我们可以看到其实这就是一个简单的resource封装。在实际的使用过程中,作为传递的参数。这样的好处是传递时可以记录下当时的状态信息等。接下来我们看看两种继承类

  //通过方法信息来封装resource
public class MethodResourceWrapper extends ResourceWrapper {

    private final transient Method method;

    public MethodResourceWrapper(Method method, EntryType e) {
        //调用下面的方法
        this(method, e, ResourceTypeConstants.COMMON);
    }

    public MethodResourceWrapper(Method method, EntryType e, int resType) {
        //调用父类的构造方法。第一参数是资源名,这里通过方法类解析出方法名字符串格式
        super(MethodUtil.resolveMethodName(method), e, resType);
        this.method = method;
    }

    public Method getMethod() {
        return method;
    }

    @Override
    public String getShowName() {
        return name;
    }

    //这个其实就是之后的resouce的Key
    @Override
    public String toString() {
        return "MethodResourceWrapper{" +
            "name='" + name + '\'' +
            ", entryType=" + entryType +
            ", resourceType=" + resourceType +
            '}';
    }
}   


  另一个是

  //其实和上面的方法一个意思,只是下面的实现直接可以把resource的string类型取到了
public class StringResourceWrapper extends ResourceWrapper {

    public StringResourceWrapper(String name, EntryType e) {
        super(name, e, ResourceTypeConstants.COMMON);
    }

    public StringResourceWrapper(String name, EntryType e, int resType) {
        super(name, e, resType);
    }

    @Override
    public String getShowName() {
        return name;
    }

    //这个名称和上面的有些许的不一样
    @Override
    public String toString() {
        return "StringResourceWrapper{" +
            "name='" + name + '\'' +
            ", entryType=" + entryType +
            ", resourceType=" + resourceType +
            '}';
    }
}


  当然,这块可以自定义的。只要按照SPI的规则来即可。


  • slot

  Slot 是另一个 Sentinel 中非常重要的概念, Sentinel 的工作流程就是围绕着一个个插槽所组成的插槽链来展开的。需要注意的是每个插槽都有自己的职责,他们各司其职完好的配合,通过一定的编排顺序,来达到最终的限流降级的目的。默认的各个插槽之间的顺序是固定的,因为有的插槽需要依赖其他的插槽计算出来的结果才能进行工作。

  但是这并不意味着我们只能按照框架的定义来,Sentinel 通过 SlotChainBuilder 作为 SPI 接口,使得 Slot Chain 具备了扩展的能力。我们可以通过实现 SlotsChainBuilder 接口加入自定义的 slot 并自定义编排各个 slot 之间的顺序,从而可以给 Sentinel 添加自定义的功能。

  那SlotChain是在哪创建的呢?是在 CtSph.lookProcessChain() 方法中创建的,并且该方法会根据当前请求的资源先去一个静态的HashMap中获取,如果获取不到才会创建,创建后会保存到HashMap中。这就意味着,同一个资源会全局共享一个SlotChain。

  相关slot的更多讲解可以看我之前写的sentinel原理解析,上面介绍的slot都在里面讲到过

slot图


  • context

  context是sentinel整个系统的上下文功能。context里面记载着很多关键信息。

   /*
 这个类保存着整个调用过程中的元数据,这其中包括
 EntranceNode - 当前调用树的跟节点
 Entry - 当前调用节点
 Node - 和Entry有关的统计信息
 origin - 先解释为源点吧。源点对我们区分控制发起者/消费者非常用用。一般源点可以视为消费者的服务app名称或者ip
 每个SphU.entry或者SphO.entry应该都在一个context中,如果我们不明确的调用ContextUtil.enter()方法,那么会使用一个默认的context。
 如果我们在同一个context中多次调用SphU.entry()一个调用树会被创建起来。
 在不同的context中的相同的resource会分开来计算,详情请看NodeSelectorSlot的实现
 */
public class Context {

    private final String name;

    //当前调用树的入口node
    private DefaultNode entranceNode;

    //当前所在的entry
    private Entry curEntry;

    //当前context的源点信息 ,一般是消费者名称或者是个ip
    private String origin = "";

    private final boolean async;

     //新建一个同步的上下文
    public static Context newAsyncContext(DefaultNode entranceNode, String name) {
        return new Context(name, entranceNode, true);
    }

    public Context(DefaultNode entranceNode, String name) {
        this(name, entranceNode, false);
    }
    
    //建立context的最终方法。就是给context对象里设置这三个类内变量
    public Context(String name, DefaultNode entranceNode, boolean async) {
        this.name = name;
        this.entranceNode = entranceNode;
        this.async = async;
    }

    public boolean isAsync() {
        return async;
    }

    public String getName() {
        return name;
    }
    
    //下面是一堆get和set方法
    public Node getCurNode() {
        return curEntry == null ? null : curEntry.getCurNode();
    }

    public Context setCurNode(Node node) {
        this.curEntry.setCurNode(node);
        return this;
    }

    public Entry getCurEntry() {
        return curEntry;
    }

    public Context setCurEntry(Entry curEntry) {
        this.curEntry = curEntry;
        return this;
    }

    public String getOrigin() {
        return origin;
    }

    public Context setOrigin(String origin) {
        this.origin = origin;
        return this;
    }

    public double getOriginTotalQps() {
        return getOriginNode() == null ? 0 : getOriginNode().totalQps();
    }

    public double getOriginBlockQps() {
        return getOriginNode() == null ? 0 : getOriginNode().blockQps();
    }

    public double getOriginPassReqQps() {
        return getOriginNode() == null ? 0 : getOriginNode().successQps();
    }

    public double getOriginPassQps() {
        return getOriginNode() == null ? 0 : getOriginNode().passQps();
    }

    public long getOriginTotalRequest() {
        return getOriginNode() == null ? 0 : getOriginNode().totalRequest();
    }

    public long getOriginBlockRequest() {
        return getOriginNode() == null ? 0 : getOriginNode().blockRequest();
    }

    public double getOriginAvgRt() {
        return getOriginNode() == null ? 0 : getOriginNode().avgRt();
    }

    public int getOriginCurThreadNum() {
        return getOriginNode() == null ? 0 : getOriginNode().curThreadNum();
    }

    public DefaultNode getEntranceNode() {
        return entranceNode;
    }

     //获取当前节点的父节点
    public Node getLastNode() {
        if (curEntry != null && curEntry.getLastNode() != null) {
            return curEntry.getLastNode();
        } else {
            return entranceNode;
        }
    }

     //获取源点
    public Node getOriginNode() {
        return curEntry == null ? null : curEntry.getOriginNode();
    }

    @Override
    public String toString() {
        return "Context{" +
            "name='" + name + '\'' +
            ", entranceNode=" + entranceNode +
            ", curEntry=" + curEntry +
            ", origin='" + origin + '\'' +
            ", async=" + async +
            '}';
    }
}


  可以看到,context中存储着entry和node等信息。等一会我会详细的分析这两个类。但是,我想想先看看context的创建实现。context的实现位置在ContextUtil.java中

   //工具类,用来在当前线程中获取或者创建上下文
public class ContextUtil {

    //存储在threadLocal中,为了方便存取
    private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();

     //定义一个静态的且是volatile类型的hashMap来存储所有EntranceNode类,每个EntrnceNode和一个唯一的上下文名称绑定。 这个名称是Context.toString定义的
     //为什么不用concurrnetHashMap呢?
    private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();

    //用一个可重入锁
    private static final ReentrantLock LOCK = new ReentrantLock();
    //nullContext这个也是一个类。一会看看他干了啥
    private static final Context NULL_CONTEXT = new NullContext();

    //静态代码块,直接运行
    static {
        //缓存EntranceNode
        initDefaultContext();
    }

    private static void initDefaultContext() {
        //默认的上下文名称
        String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
        //构造EntranceNode,采用字符串封装Resource的方式。EntryType为出口类型
        EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
        //将node添加在root下。
        Constants.ROOT.addChild(node);
        //存储在hashmap中
        contextNameNodeMap.put(defaultContextName, node);
    }

    //测试用,非线程安全。重置hashMap用的
    static void resetContextMap() {
        if (contextNameNodeMap != null) {
            RecordLog.warn("Context map cleared and reset to initial state");
            contextNameNodeMap.clear();
            initDefaultContext();
        }
    }

    /*
    进入调用上下文,它标记为调用链的入口。
    context被ThreadLocal封起来了,意味着每个线程拥有着自己的context
    如果当前线程没有context,就会创建一个context给这个线程。
    一个context会与一个调用链中的一个代表着入口统计节点的EntranceNode进行绑定。
    如果当前context没有EntranceNode,新的EntranceNode会被创建出来。
    要注意的是同样的context名可能会被同样的EntraceNode全局共享。
    
    在ClusterBuilderSlot类中会会创建源节点
    不同的资源的不同源点会创建不同的Node,统计源点的总数等于resource名称量+源点数量
    所以,当有很多源点时,应该仔细考虑内存占用率。
        
    不同的context在同样的resource中会被单独计算,具体看NodeSelectorSlot
    */

       public static Context enter(String name, String origin) {
       //如果context name等于配置中的默认名称就抛出相应的exception
        if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
            throw new ContextNameDefineException(
                "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
        }
        //调用下面的方法
        return trueEnter(name, origin);
    }

    //真正实现创建context的实现。
    protected static Context trueEnter(String name, String origin) {
        //从threadLocal中获取context
        Context context = contextHolder.get();
        if (context == null) {
            //如果context为空,首先定义一个空的hashMap
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            //从hashMap中找到key是name的value。
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                //node是空的话,hashMap的大小大于Constants.MAX_CONTEXT_NAME_SIZE
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    //把hasehMap设置为空
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    //node不空,需要上锁,用的重入锁
                    LOCK.lock();
                    try {
                        //double get
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            //如果还是空的话,hashMap的大小大于Constants.MAX_CONTEXT_NAME_SIZE,跟上面的处理一样。
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                //这块是重点。定义一个EntranceNode。
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // 将Node添加到根下面,也就是构造调用树。
                                Constants.ROOT.addChild(node);
                                //再定义一个比上边hashMap空间大1的hashMap
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                //将上边的hashMap放入新的HashMap中
                                newMap.putAll(contextNameNodeMap);
                                //然后,多出来的那一个空间用来存放当前的node
                                newMap.put(name, node);
                                //原先的HashMap更新为新的HashMap
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        //结束,解锁
                        LOCK.unlock();
                    }
                }
            }
            //hashMap构造完毕以后,开始构造context。并保存到当前的ThreadLocal中。
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }
        //返回context,这个context
        return context;
    }

    private static boolean shouldWarn = true;

    //把hashMap设置为空。
    private static void setNullContext() {
        contextHolder.set(NULL_CONTEXT);
        // Don't need to be thread-safe.
        if (shouldWarn) {
            RecordLog.warn("[SentinelStatusChecker] WARN: Amount of context exceeds the threshold "
                + Constants.MAX_CONTEXT_NAME_SIZE + ". Entries in new contexts will NOT take effect!");
            shouldWarn = false;
        }
    }
    
    //重载Context.enter()
    public static Context enter(String name) {
        return enter(name, "");
    }

     //从当前线程中退出context,也就是从ThreadLocal中删除Context
    public static void exit() {
        Context context = contextHolder.get();
        if (context != null && context.getCurEntry() == null) {
            contextHolder.set(null);
        }
    }

     //获取当前entrance node hashMap的大小
    public static int contextSize() {
        return contextNameNodeMap.size();
    }

     //判断提供的context是不是初始自动创建额context
    public static boolean isDefaultContext(Context context) {
        if (context == null) {
            return false;
        }
        return Constants.CONTEXT_DEFAULT_NAME.equals(context.getName());
    }

     //从当前的线程中获取context
    public static Context getContext() {
        return contextHolder.get();
    }

     //替换context的方法。
    static Context replaceContext(Context newContext) {
        Context backupContext = contextHolder.get();
        if (newContext == null) {
            contextHolder.remove();
        } else {
            contextHolder.set(newContext);
        }
        return backupContext;
    }

    //执行提供的context中的代码。可以看到这个f是个runnable接口实现。也就是说f是个可以进行扩展的方法。提供开发者自定义。
    public static void runOnContext(Context context, Runnable f) {
        Context curContext = replaceContext(context);
        try {
            f.run();
        } finally {
            replaceContext(curContext);
        }
    }
}


  ContextUtil看完了,再看看NullContext。

   
 //这个类比较简单,就是调用Context中的context方法,创建一个空的context。意味着将会没有检测能够提供。
public class NullContext extends Context {

    public NullContext() {
        super(null, "null_context_internal");
    }
}

  就这样,我了解了context在sentinel中起的作用了。其实,主要就是为了将请求行程有关系的结构,也就是调用链,其结构是由Node(节点)组成的树。


  • entry

  Entry 是 Sentinel 中用来表示是否通过限流的一个凭证,就像一个token一样。每次执行 SphU.entry() 或 SphO.entry() 都会返回一个 Entry 给调用者,意思就是告诉调用者,如果正确返回了 Entry 给你,那表示你可以正常访问被 Sentinel 保护的后方服务了,否则 Sentinel 会抛出一个BlockException(如果是 SphO.entry() 会返回false),这就表示调用者想要访问的服务被保护了,也就是说调用者本身被限流了。

  entry中保存了本次执行 entry() 方法的一些基本信息,包括:

  • createTime:当前Entry的创建时间,主要用来后期计算rt
  • node:当前Entry所关联的node,该node主要是记录了当前context下该资源的统计信息
  • origin:当前Entry的调用来源,通常是调用方的应用名称,在 ClusterBuilderSlot.entry() 方法中设置的
  • resourceWrapper:当前Entry所关联的资源

  当在一个上下文中多次调用了 SphU.entry() 方法时,就会创建一个调用树,这个树的节点之间是通过parent和child关系维持的。

  需要注意的是:parent和child是在 CtSph 类的一个私有内部类 CtEntry 中定义的,CtEntry 是 Entry 的一个子类。 由于context中总是保存着调用链树中的当前入口,所以当当前entry执行exit退出时,需要将parent设置为当前入口。

sentinal结构图


  • node

  Node 中保存了资源的实时统计数据,例如:passQps,blockQps,rt等实时数据。正是有了这些统计数据后, Sentinel 才能进行限流、降级等一系列的操作。

  node是一个接口,他有一个实现类:StatisticNode,但是StatisticNode本身也有两个子类,一个是DefaultNode,另一个是ClusterNode,DefaultNode又有一个子类叫EntranceNode。

  其中entranceNode是每个上下文的入口,该节点是直接挂在root下的,是全局唯一的,每一个context都会对应一个entranceNode。另外defaultNode是记录当前调用的实时数据的,每个defaultNode都关联着一个资源和clusterNode,有着相同资源的defaultNode,他们关联着同一个clusterNode。

  首先,我先看看Node的定义。其实想要全部看懂Node的设计,还是应该先了解下Metric度量的设计,否则可能不太明白sentinel怎么进行计算每个时间维度的量:

   //掌握着资源的时实统计数据
public interface Node extends OccupySupport, DebugSupport {

     //一分钟的所有来源请求(pass + block)
    long totalRequest();

     //一分钟pass的请求
    long totalPass();

    long totalSuccess();

    long blockRequest();

     //一分钟异常总数
    long totalException();

     //每秒通过的请求
    double passQps();

    double blockQps();

    //passQps + blockQps
    double totalQps();

     //每秒调用了Entry.exit()的请求。调用了这方法说明整个流程走完了,没有抛出blockException也就是没有进行熔断限流
    double successQps();

     //目前为止预估最大的成功qps
    double maxSuccessQps();

    double exceptionQps();

     //rt - response time。一秒平均返回耗时
    double avgRt();

    double minRt();

     //当前激活中的线程总数
    int curThreadNum();

     //迁移秒钟被限制的QPS
    double previousBlockQps();

    double previousPassQps();

     //所有资源的可用的检查节点
    Map<Long, MetricNode> metrics();

    /**
     * Fetch all raw metric items that satisfies the time predicate.
     *
     * @param timePredicate time predicate
     * @return raw metric items that satisfies the time predicate
     * @since 1.7.0
     */
    List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate);

    void addPassRequest(int count);

    void addRtAndSuccess(long rt, int success);

    void increaseBlockQps(int count);

    void increaseExceptionQps(int count);

    void increaseThreadNum();

    void decreaseThreadNum();

    //终止内部计算器。
    void reset();
}


  Node接口的描述也有了。那么剩下的实现其实就是在其基础上实现不同的功能了。我不打算一一进行分析了。所以,就暂时先分析一个StaticNode吧,毕竟其他的Node也都是继承于他。

  /*
统计节点有三种实时统计指标:
秒级统计
分钟级别统计
线程总数

sentinel用滑动窗口来记录和统计实时的resource数据。
ArrayMetric的LeapArray是实现滑动窗口的基础设施。

案例1:当第一个请求进来,sentinel会创建一个新的窗口,这个窗口的意思是有着能够将统计数据保存在特定时间跨度的桶。例如总响应时间,QPS,拦截请求等等。这个时间跨度是由样本数量确定的,也就是说是动态的。

 * 	0      100ms
 *  +-------+--→ 滑动窗口
 * 	    ^
 * 	    |
 * 	   请求

Sentinel使用有效桶的静态信息来决定是否可以传递此请求。比如,如果一个规则定义是只能有100个请求能够通过,他会在有效桶中将所有qps加起来与规则定义的阈值(100)进行对比。

案例2:连续的请求。
 *  0    100ms    200ms    300ms
 *  +-------+-------+-------+-----→ 滑动窗口
 *                      ^
 *                      |
 *                     请求

案例3:请求不间断的到来,并且前一个桶变成了无效。
 *  0    100ms    200ms	  800ms	   900ms  1000ms    1300ms
 *  +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ 滑动窗口
 *                                                      ^
 *                                                      |
 *                                                     请求

 * 滑动窗口会变成:
 * <pre>
 * 300ms     800ms  900ms  1000ms  1300ms
 *  + ...... +-------+ ...... +-------+-----→        滑动窗口
 *                                                      ^
 *                                                      |
 *                                                     请求
*/
public class StatisticNode implements Node {

     //保存最近N毫秒的统计信息,时间跨度N被给定的样本总数划分
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

     //保存最近60秒的统计信息,windowLengthInMs被故意设置为1000毫秒,即每秒每个桶,这样我们就可以获得每秒的准确统计数据。
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

     //线程总数
    private LongAdder curThreadNum = new LongAdder();

    //获取度量的最后一个时间戳。
    private long lastFetchTime = -1;

    //获取资源的所有有效度量节点。
    @Override
    public Map<Long, MetricNode> metrics() {
        //在单线程调度程序池下,获取操作是线程安全的。
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
        List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
        long newLastFetchTime = lastFetchTime;
        // 迭代所有资源的度量,过滤有效的度量(非空的和最新的)。
        for (MetricNode node : nodesOfEverySecond) {
            if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
                metrics.put(node.getTimestamp(), node);
                newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
            }
        }
        lastFetchTime = newLastFetchTime;

        return metrics;
    }

    //生成满足时间谓词的聚合度量项。
    @Override
    public List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate) {
        return rollingCounterInMinute.detailsOnCondition(timePredicate);
    }

    //度量node时间是否在上次获取时间和当前时间之内
    private boolean isNodeInTime(MetricNode node, long currentTime) {
        return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime;
    }

    //判断度量node是否有效
    private boolean isValidMetricNode(MetricNode node) {
        return node.getPassQps() > 0 || node.getBlockQps() > 0 || node.getSuccessQps() > 0
            || node.getExceptionQps() > 0 || node.getRt() > 0 || node.getOccupiedPassQps() > 0;
    }

    //重置度量数组
    @Override
    public void reset() {
        rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
    }

    @Override
    public long totalRequest() {
        return rollingCounterInMinute.pass() + rollingCounterInMinute.block();
    }

    @Override
    public long blockRequest() {
        return rollingCounterInMinute.block();
    }

    @Override
    public double blockQps() {
        return rollingCounterInSecond.block() / rollingCounterInSecond.getWindowIntervalInSec();
    }

    @Override
    public double previousBlockQps() {
        return this.rollingCounterInMinute.previousWindowBlock();
    }

    @Override
    public double previousPassQps() {
        return this.rollingCounterInMinute.previousWindowPass();
    }

    @Override
    public double totalQps() {
        return passQps() + blockQps();
    }

    @Override
    public long totalSuccess() {
        return rollingCounterInMinute.success();
    }

    @Override
    public double exceptionQps() {
        return rollingCounterInSecond.exception() / rollingCounterInSecond.getWindowIntervalInSec();
    }

    @Override
    public long totalException() {
        return rollingCounterInMinute.exception();
    }

    @Override
    public double passQps() {
        return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
    }

    @Override
    public long totalPass() {
        return rollingCounterInMinute.pass();
    }

    @Override
    public double successQps() {
        return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();
    }

    @Override
    public double maxSuccessQps() {
        return (double) rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount()
                / rollingCounterInSecond.getWindowIntervalInSec();
    }

    @Override
    public double occupiedPassQps() {
        return rollingCounterInSecond.occupiedPass() / rollingCounterInSecond.getWindowIntervalInSec();
    }

    @Override
    public double avgRt() {
        long successCount = rollingCounterInSecond.success();
        if (successCount == 0) {
            return 0;
        }

        return rollingCounterInSecond.rt() * 1.0 / successCount;
    }

    @Override
    public double minRt() {
        return rollingCounterInSecond.minRt();
    }

    @Override
    public int curThreadNum() {
        return (int)curThreadNum.sum();
    }

    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

    @Override
    public void addRtAndSuccess(long rt, int successCount) {
        rollingCounterInSecond.addSuccess(successCount);
        rollingCounterInSecond.addRT(rt);

        rollingCounterInMinute.addSuccess(successCount);
        rollingCounterInMinute.addRT(rt);
    }

    @Override
    public void increaseBlockQps(int count) {
        rollingCounterInSecond.addBlock(count);
        rollingCounterInMinute.addBlock(count);
    }

    @Override
    public void increaseExceptionQps(int count) {
        rollingCounterInSecond.addException(count);
        rollingCounterInMinute.addException(count);
    }

    @Override
    public void increaseThreadNum() {
        curThreadNum.increment();
    }

    @Override
    public void decreaseThreadNum() {
        curThreadNum.decrement();
    }

    @Override
    public void debug() {
        rollingCounterInSecond.debug();
    }
    //这个是OccupySupport接口的实现。尝试占用有一段时间窗口的令牌。如果占位成功,一个小于occupyTimeout的值会与OccupyTimeoutProperty结构一同返回。具体的解释需要去OccupySupport接口中查看。
    @Override
    public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
        double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
        long currentBorrow = rollingCounterInSecond.waiting();
        if (currentBorrow >= maxCount) {
            return OccupyTimeoutProperty.getOccupyTimeout();
        }

        int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
        long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;

        int idx = 0;
        /*
         * Note: here {@code currentPass} may be less than it really is NOW, because time difference
         * since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
         * lead more tokens be borrowed.
         */
        long currentPass = rollingCounterInSecond.pass();
        while (earliestTime < currentTime) {
            long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
            if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
                break;
            }
            long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
            if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
                return waitInMs;
            }
            earliestTime += windowLength;
            currentPass -= windowPass;
            idx++;
        }

        return OccupyTimeoutProperty.getOccupyTimeout();
    }

    @Override
    public long waiting() {
        return rollingCounterInSecond.waiting();
    }

    @Override
    public void addWaitingRequest(long futureTime, int acquireCount) {
        rollingCounterInSecond.addWaiting(futureTime, acquireCount);
    }

    @Override
    public void addOccupiedPass(int acquireCount) {
        rollingCounterInMinute.addOccupiedPass(acquireCount);
        rollingCounterInMinute.addPass(acquireCount);
    }
}

  他们之间的继承关系,也侧面反映出来了,每个资源都有一个统计信息,对应存储到Node上,但是上一级的信息会是下一级信息的一个统计。

node结构图


  • metirc

  Metric 是 Sentinel 中用来进行实时数据统计的度量接口,node就是通过metric来进行数据统计的。而metric本身也并没有统计的能力,他也是通过Window来进行统计的。

  Metric有一个实现类:ArrayMetric,在ArrayMetric中主要是通过一个叫LeapArray的对象进行窗口统计的。

metrics

  如果想要了解sentinel是如何计算度量统计数据的,需要深度LeapArray这个类。里面有很多方法值得学习的。我想抽时间学习一下他采用的时间窗口算法,然后再写一篇自己的理解吧。

本文参考: