sentinel中有几个比较重要的实体需要我们了解。
-
resource
resource是sentinel中最重要的一个概念。sentinel通过resource来保护具体的业务代码和其他后方服务。sentinel把复杂的逻辑给屏蔽掉了,用户只需要为受保护的代码或服务定义一个resource,然后定义rule就可以了,剩下的就通通交给sentinel来处理。并且resource和rule是解耦的,rule设置可以在runtime动态修改,定义完resource后,就可以通过在程序中埋点来保护自己的服务了,埋点的方式有两种:
- try-catch 方式(通过 SphU.entry(…)),当 catch 到BlockException时执行异常处理(或fallback)
- if-else 方式(通过 SphO.entry(…)),当返回 false 时执行异常处理(或fallback)
以上这两种方式都是通过硬编码的形式定义resource然后进行resource埋点的,对业务代码的侵入太大,从0.1.1版本开始, Sentinel 加入了注解的支持,可以通过注解来定义资源,具体的注解为:SentinelResource 。通过注解除了可以定义资源外,还可以指定 blockHandler 和 fallback 方法。
//这个注解代表定义一个sentinel resource
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {
//sentinel resource名称
String value() default "";
//entry类型 (输入或者输出,默认是输出)
EntryType entryType() default EntryType.OUT;
//资源的分类(类型)
int resourceType() default 0;
//限流的异常名,默认为空
String blockHandler() default "";
/*
blockHandler默认和原方法在同一个类中。
如果希望使用其他类的函数,则需要指定 blockHandlerClass 为对应的类的 Class 对象。必须是静态的,否则无法解析
*/
Class<?>[] blockHandlerClass() default {};
//熔断方法名
String fallback() default "";
/*
defaultFallBack()用来做默认的降级方法
这个方法不接受任何参数,并且返回必须的类型和原始方法适配
*/
String defaultFallback() default "";
/*
fallback()默认情况下是和原方法在一起的。
同blockHandlerClass。
*/
Class<?>[] fallbackClass() default {};
//用来追踪的异常列表
Class<? extends Throwable>[] exceptionsToTrace() default {Throwable.class};
/*
指明有哪些异常可以被忽略。
exceptionsToTrace()和exceptionsToIgnore()不能够同时存在
exceptionsToIgnore()优先使用。
*/
Class<? extends Throwable>[] exceptionsToIgnore() default {};
}
其中EntryType的枚举为:
public enum EntryType {
IN("IN"),
OUT("OUT");
}
有了注解以后,接下来就是看看通过反射,获取配置了注解的方法是如何使用的了。使用了@SentinelResource
就需要依赖sentinel-annotation-aspectj
了。具体实现如下:
//SentinelResource切面的实现。继承于AbstractSentinelAspectSupport
@Aspect //定义切面
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
//匹配有SentinelResource注解的方法。设置切入点
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
//以sentinelResourceAnnotationPointcut()为切入点,环绕增强。也就是说有这个注解的方法环绕增强
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
//反射,获取方法信息
Method originMethod = resolveMethod(pjp);
//通过方法反射出注解信息
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
//如果注解信息为null的话,就抛出异常。否则,继续
if (annotation == null) {
throw new IllegalStateException("Wrong state for SentinelResource annotation");
}
//获取资源名
String resourceName = getResourceName(annotation.value(), originMethod);
//获取entry类型
EntryType entryType = annotation.entryType();
//获取资源类型
int resourceType = annotation.resourceType();
Entry entry = null;
try {
//出发entry()出发熔断降级策略,具体看我的[sentinel原理](https://www.naffan.cn/tech/2020/04/04/01.html)
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
return pjp.proceed();
} catch (BlockException ex) {
//如果遇到了BlockException说明限流阻塞了。具体会去调用注解中设置的blockHandler()方法
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
//反射注解中的忽略异常列表
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
// 如果异常忽略列表数量不为空且抛出的异常属于忽略异常,继续抛出异常。
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
throw ex;
}
//如果抛出的异常属于需要跟踪的异常
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
//跟踪,并且出发熔断降级方法,handleFallBack()
traceException(ex);
return handleFallback(pjp, annotation, ex);
}
// No fallback function can handle the exception, so throw it out.
throw ex;
} finally {
//最终不要忘了entry需要exit。回收资源用的
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
}
}
我们看完了反射的注解了。但是其中有一些方法是继承于AbstractSentinelAspectSupport
的实现方法。我们也来看看刚才用到的方法:
//一些给切面提供功能的普通方法
public abstract class AbstractSentinelAspectSupport {
//追踪记录异常,这些异常不是阻塞的异常
protected void traceException(Throwable ex) {
Tracer.trace(ex);
}
//判断ex和annotation反射出来的类型。然后进行追踪
protected void traceException(Throwable ex, SentinelResource annotation) {
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
// The ignore list will be checked first.
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
return;
}
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
traceException(ex);
}
}
//判断异常是否在提供的异常列表中
protected boolean exceptionBelongsTo(Throwable ex, Class<? extends Throwable>[] exceptions) {
if (exceptions == null) {
return false;
}
for (Class<? extends Throwable> exceptionClass : exceptions) {
if (exceptionClass.isAssignableFrom(ex.getClass())) {
return true;
}
}
return false;
}
//获取资源名称
protected String getResourceName(String resourceName, /*@NonNull*/ Method method) {
// If resource name is present in annotation, use this value.
if (StringUtil.isNotBlank(resourceName)) {
return resourceName;
}
// 调用方法类中的esolveMethodName。这个方法用因为涉及线程安全问题,所以加锁来获取资源名称。
return MethodUtil.resolveMethodName(method);
}
//调用熔断方法
protected Object handleFallback(ProceedingJoinPoint pjp, SentinelResource annotation, Throwable ex)
throws Throwable {
return handleFallback(pjp, annotation.fallback(), annotation.defaultFallback(), annotation.fallbackClass(), ex);
}
//真正执行熔断的方法
protected Object handleFallback(ProceedingJoinPoint pjp, String fallback, String defaultFallback,
Class<?>[] fallbackClass, Throwable ex) throws Throwable {
//获取参数
Object[] originArgs = pjp.getArgs();
// 如果配置了就执行熔断方法,没有配置就调用一个默认的熔断方法
Method fallbackMethod = extractFallbackMethod(pjp, fallback, fallbackClass);
if (fallbackMethod != null) {
// 构造参数
int paramCount = fallbackMethod.getParameterTypes().length;
Object[] args;
if (paramCount == originArgs.length) {
args = originArgs;
} else {
args = Arrays.copyOf(originArgs, originArgs.length + 1);
args[args.length - 1] = ex;
}
try {
//熔断方法是否是静态方法
if (isStatic(fallbackMethod)) {
//调用静态的熔断方法
return fallbackMethod.invoke(null, args);
}
//不是静态方法的话,就调用熔断方法
return fallbackMethod.invoke(pjp.getTarget(), args);
} catch (InvocationTargetException e) {
// throw the actual exception
throw e.getTargetException();
}
}
// If fallback is absent, we'll try the defaultFallback if provided.
return handleDefaultFallback(pjp, defaultFallback, fallbackClass, ex);
}
//默认的熔断方法。过程同上
protected Object handleDefaultFallback(ProceedingJoinPoint pjp, String defaultFallback,
Class<?>[] fallbackClass, Throwable ex) throws Throwable {
// Execute the default fallback function if configured.
Method fallbackMethod = extractDefaultFallbackMethod(pjp, defaultFallback, fallbackClass);
if (fallbackMethod != null) {
// Construct args.
Object[] args = fallbackMethod.getParameterTypes().length == 0 ? new Object[0] : new Object[] {ex};
try {
if (isStatic(fallbackMethod)) {
return fallbackMethod.invoke(null, args);
}
return fallbackMethod.invoke(pjp.getTarget(), args);
} catch (InvocationTargetException e) {
// throw the actual exception
throw e.getTargetException();
}
}
// If no any fallback is present, then directly throw the exception.
throw ex;
}
//调用阻塞限流,同上
protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
throws Throwable {
// Execute block handler if configured.
Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
annotation.blockHandlerClass());
if (blockHandlerMethod != null) {
Object[] originArgs = pjp.getArgs();
// Construct args.
Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
args[args.length - 1] = ex;
try {
if (isStatic(blockHandlerMethod)) {
return blockHandlerMethod.invoke(null, args);
}
return blockHandlerMethod.invoke(pjp.getTarget(), args);
} catch (InvocationTargetException e) {
// throw the actual exception
throw e.getTargetException();
}
}
// If no block handler is present, then go to fallback.
return handleFallback(pjp, annotation, ex);
}
//判断方法是否是静态方法
private boolean isStatic(Method method) {
return Modifier.isStatic(method.getModifiers());
}
…………
}
至此,通过@SentinelResource使用熔断降级,阻塞限流的原理就分析完成了。那接下来让我们看看ResourceWrapper是什么。
在 Sentinel 中具体表示资源的类是:ResourceWrapper ,他是一个抽象的包装类,包装了资源的 Name 和EntryType。他有两个实现类,分别是:StringResourceWrapper 和 MethodResourceWrapper
//resource的关于name和type的封装
public abstract class ResourceWrapper {
protected final String name;
//上面已经列出了EntryType的代码了,一个是IN,一个是OUT
protected final EntryType entryType;
protected final int resourceType;
//构造犯法,先判name和entryType是否是空或者null。如果不是的话就将其赋值给类内部变量。
public ResourceWrapper(String name, EntryType entryType, int resourceType) {
AssertUtil.notEmpty(name, "resource name cannot be empty");
AssertUtil.notNull(entryType, "entryType cannot be null");
this.name = name;
this.entryType = entryType;
this.resourceType = resourceType;
}
public String getName() {
return name;
}
public EntryType getEntryType() {
return entryType;
}
public int getResourceType() {
return resourceType;
}
//获取美化后的资源名称
public abstract String getShowName();
@Override
public int hashCode() {
return getName().hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ResourceWrapper) {
ResourceWrapper rw = (ResourceWrapper)obj;
return rw.getName().equals(getName());
}
return false;
}
}
从代码中,我们可以看到其实这就是一个简单的resource封装。在实际的使用过程中,作为传递的参数。这样的好处是传递时可以记录下当时的状态信息等。接下来我们看看两种继承类
//通过方法信息来封装resource
public class MethodResourceWrapper extends ResourceWrapper {
private final transient Method method;
public MethodResourceWrapper(Method method, EntryType e) {
//调用下面的方法
this(method, e, ResourceTypeConstants.COMMON);
}
public MethodResourceWrapper(Method method, EntryType e, int resType) {
//调用父类的构造方法。第一参数是资源名,这里通过方法类解析出方法名字符串格式
super(MethodUtil.resolveMethodName(method), e, resType);
this.method = method;
}
public Method getMethod() {
return method;
}
@Override
public String getShowName() {
return name;
}
//这个其实就是之后的resouce的Key
@Override
public String toString() {
return "MethodResourceWrapper{" +
"name='" + name + '\'' +
", entryType=" + entryType +
", resourceType=" + resourceType +
'}';
}
}
另一个是
//其实和上面的方法一个意思,只是下面的实现直接可以把resource的string类型取到了
public class StringResourceWrapper extends ResourceWrapper {
public StringResourceWrapper(String name, EntryType e) {
super(name, e, ResourceTypeConstants.COMMON);
}
public StringResourceWrapper(String name, EntryType e, int resType) {
super(name, e, resType);
}
@Override
public String getShowName() {
return name;
}
//这个名称和上面的有些许的不一样
@Override
public String toString() {
return "StringResourceWrapper{" +
"name='" + name + '\'' +
", entryType=" + entryType +
", resourceType=" + resourceType +
'}';
}
}
当然,这块可以自定义的。只要按照SPI的规则来即可。
-
slot
Slot 是另一个 Sentinel 中非常重要的概念, Sentinel 的工作流程就是围绕着一个个插槽所组成的插槽链来展开的。需要注意的是每个插槽都有自己的职责,他们各司其职完好的配合,通过一定的编排顺序,来达到最终的限流降级的目的。默认的各个插槽之间的顺序是固定的,因为有的插槽需要依赖其他的插槽计算出来的结果才能进行工作。
但是这并不意味着我们只能按照框架的定义来,Sentinel 通过 SlotChainBuilder 作为 SPI 接口,使得 Slot Chain 具备了扩展的能力。我们可以通过实现 SlotsChainBuilder 接口加入自定义的 slot 并自定义编排各个 slot 之间的顺序,从而可以给 Sentinel 添加自定义的功能。
那SlotChain是在哪创建的呢?是在 CtSph.lookProcessChain() 方法中创建的,并且该方法会根据当前请求的资源先去一个静态的HashMap中获取,如果获取不到才会创建,创建后会保存到HashMap中。这就意味着,同一个资源会全局共享一个SlotChain。
相关slot的更多讲解可以看我之前写的sentinel原理解析,上面介绍的slot都在里面讲到过
-
context
context是sentinel整个系统的上下文功能。context里面记载着很多关键信息。
/*
这个类保存着整个调用过程中的元数据,这其中包括
EntranceNode - 当前调用树的跟节点
Entry - 当前调用节点
Node - 和Entry有关的统计信息
origin - 先解释为源点吧。源点对我们区分控制发起者/消费者非常用用。一般源点可以视为消费者的服务app名称或者ip
每个SphU.entry或者SphO.entry应该都在一个context中,如果我们不明确的调用ContextUtil.enter()方法,那么会使用一个默认的context。
如果我们在同一个context中多次调用SphU.entry()一个调用树会被创建起来。
在不同的context中的相同的resource会分开来计算,详情请看NodeSelectorSlot的实现
*/
public class Context {
private final String name;
//当前调用树的入口node
private DefaultNode entranceNode;
//当前所在的entry
private Entry curEntry;
//当前context的源点信息 ,一般是消费者名称或者是个ip
private String origin = "";
private final boolean async;
//新建一个同步的上下文
public static Context newAsyncContext(DefaultNode entranceNode, String name) {
return new Context(name, entranceNode, true);
}
public Context(DefaultNode entranceNode, String name) {
this(name, entranceNode, false);
}
//建立context的最终方法。就是给context对象里设置这三个类内变量
public Context(String name, DefaultNode entranceNode, boolean async) {
this.name = name;
this.entranceNode = entranceNode;
this.async = async;
}
public boolean isAsync() {
return async;
}
public String getName() {
return name;
}
//下面是一堆get和set方法
public Node getCurNode() {
return curEntry == null ? null : curEntry.getCurNode();
}
public Context setCurNode(Node node) {
this.curEntry.setCurNode(node);
return this;
}
public Entry getCurEntry() {
return curEntry;
}
public Context setCurEntry(Entry curEntry) {
this.curEntry = curEntry;
return this;
}
public String getOrigin() {
return origin;
}
public Context setOrigin(String origin) {
this.origin = origin;
return this;
}
public double getOriginTotalQps() {
return getOriginNode() == null ? 0 : getOriginNode().totalQps();
}
public double getOriginBlockQps() {
return getOriginNode() == null ? 0 : getOriginNode().blockQps();
}
public double getOriginPassReqQps() {
return getOriginNode() == null ? 0 : getOriginNode().successQps();
}
public double getOriginPassQps() {
return getOriginNode() == null ? 0 : getOriginNode().passQps();
}
public long getOriginTotalRequest() {
return getOriginNode() == null ? 0 : getOriginNode().totalRequest();
}
public long getOriginBlockRequest() {
return getOriginNode() == null ? 0 : getOriginNode().blockRequest();
}
public double getOriginAvgRt() {
return getOriginNode() == null ? 0 : getOriginNode().avgRt();
}
public int getOriginCurThreadNum() {
return getOriginNode() == null ? 0 : getOriginNode().curThreadNum();
}
public DefaultNode getEntranceNode() {
return entranceNode;
}
//获取当前节点的父节点
public Node getLastNode() {
if (curEntry != null && curEntry.getLastNode() != null) {
return curEntry.getLastNode();
} else {
return entranceNode;
}
}
//获取源点
public Node getOriginNode() {
return curEntry == null ? null : curEntry.getOriginNode();
}
@Override
public String toString() {
return "Context{" +
"name='" + name + '\'' +
", entranceNode=" + entranceNode +
", curEntry=" + curEntry +
", origin='" + origin + '\'' +
", async=" + async +
'}';
}
}
可以看到,context中存储着entry和node等信息。等一会我会详细的分析这两个类。但是,我想想先看看context的创建实现。context的实现位置在ContextUtil.java中
//工具类,用来在当前线程中获取或者创建上下文
public class ContextUtil {
//存储在threadLocal中,为了方便存取
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
//定义一个静态的且是volatile类型的hashMap来存储所有EntranceNode类,每个EntrnceNode和一个唯一的上下文名称绑定。 这个名称是Context.toString定义的
//为什么不用concurrnetHashMap呢?
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
//用一个可重入锁
private static final ReentrantLock LOCK = new ReentrantLock();
//nullContext这个也是一个类。一会看看他干了啥
private static final Context NULL_CONTEXT = new NullContext();
//静态代码块,直接运行
static {
//缓存EntranceNode
initDefaultContext();
}
private static void initDefaultContext() {
//默认的上下文名称
String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
//构造EntranceNode,采用字符串封装Resource的方式。EntryType为出口类型
EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
//将node添加在root下。
Constants.ROOT.addChild(node);
//存储在hashmap中
contextNameNodeMap.put(defaultContextName, node);
}
//测试用,非线程安全。重置hashMap用的
static void resetContextMap() {
if (contextNameNodeMap != null) {
RecordLog.warn("Context map cleared and reset to initial state");
contextNameNodeMap.clear();
initDefaultContext();
}
}
/*
进入调用上下文,它标记为调用链的入口。
context被ThreadLocal封起来了,意味着每个线程拥有着自己的context
如果当前线程没有context,就会创建一个context给这个线程。
一个context会与一个调用链中的一个代表着入口统计节点的EntranceNode进行绑定。
如果当前context没有EntranceNode,新的EntranceNode会被创建出来。
要注意的是同样的context名可能会被同样的EntraceNode全局共享。
在ClusterBuilderSlot类中会会创建源节点
不同的资源的不同源点会创建不同的Node,统计源点的总数等于resource名称量+源点数量
所以,当有很多源点时,应该仔细考虑内存占用率。
不同的context在同样的resource中会被单独计算,具体看NodeSelectorSlot
*/
public static Context enter(String name, String origin) {
//如果context name等于配置中的默认名称就抛出相应的exception
if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
throw new ContextNameDefineException(
"The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
}
//调用下面的方法
return trueEnter(name, origin);
}
//真正实现创建context的实现。
protected static Context trueEnter(String name, String origin) {
//从threadLocal中获取context
Context context = contextHolder.get();
if (context == null) {
//如果context为空,首先定义一个空的hashMap
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
//从hashMap中找到key是name的value。
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
//node是空的话,hashMap的大小大于Constants.MAX_CONTEXT_NAME_SIZE
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
//把hasehMap设置为空
setNullContext();
return NULL_CONTEXT;
} else {
//node不空,需要上锁,用的重入锁
LOCK.lock();
try {
//double get
node = contextNameNodeMap.get(name);
if (node == null) {
//如果还是空的话,hashMap的大小大于Constants.MAX_CONTEXT_NAME_SIZE,跟上面的处理一样。
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
//这块是重点。定义一个EntranceNode。
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 将Node添加到根下面,也就是构造调用树。
Constants.ROOT.addChild(node);
//再定义一个比上边hashMap空间大1的hashMap
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
//将上边的hashMap放入新的HashMap中
newMap.putAll(contextNameNodeMap);
//然后,多出来的那一个空间用来存放当前的node
newMap.put(name, node);
//原先的HashMap更新为新的HashMap
contextNameNodeMap = newMap;
}
}
} finally {
//结束,解锁
LOCK.unlock();
}
}
}
//hashMap构造完毕以后,开始构造context。并保存到当前的ThreadLocal中。
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
//返回context,这个context
return context;
}
private static boolean shouldWarn = true;
//把hashMap设置为空。
private static void setNullContext() {
contextHolder.set(NULL_CONTEXT);
// Don't need to be thread-safe.
if (shouldWarn) {
RecordLog.warn("[SentinelStatusChecker] WARN: Amount of context exceeds the threshold "
+ Constants.MAX_CONTEXT_NAME_SIZE + ". Entries in new contexts will NOT take effect!");
shouldWarn = false;
}
}
//重载Context.enter()
public static Context enter(String name) {
return enter(name, "");
}
//从当前线程中退出context,也就是从ThreadLocal中删除Context
public static void exit() {
Context context = contextHolder.get();
if (context != null && context.getCurEntry() == null) {
contextHolder.set(null);
}
}
//获取当前entrance node hashMap的大小
public static int contextSize() {
return contextNameNodeMap.size();
}
//判断提供的context是不是初始自动创建额context
public static boolean isDefaultContext(Context context) {
if (context == null) {
return false;
}
return Constants.CONTEXT_DEFAULT_NAME.equals(context.getName());
}
//从当前的线程中获取context
public static Context getContext() {
return contextHolder.get();
}
//替换context的方法。
static Context replaceContext(Context newContext) {
Context backupContext = contextHolder.get();
if (newContext == null) {
contextHolder.remove();
} else {
contextHolder.set(newContext);
}
return backupContext;
}
//执行提供的context中的代码。可以看到这个f是个runnable接口实现。也就是说f是个可以进行扩展的方法。提供开发者自定义。
public static void runOnContext(Context context, Runnable f) {
Context curContext = replaceContext(context);
try {
f.run();
} finally {
replaceContext(curContext);
}
}
}
ContextUtil看完了,再看看NullContext。
//这个类比较简单,就是调用Context中的context方法,创建一个空的context。意味着将会没有检测能够提供。
public class NullContext extends Context {
public NullContext() {
super(null, "null_context_internal");
}
}
就这样,我了解了context在sentinel中起的作用了。其实,主要就是为了将请求行程有关系的结构,也就是调用链,其结构是由Node(节点)组成的树。
-
entry
Entry 是 Sentinel 中用来表示是否通过限流的一个凭证,就像一个token一样。每次执行 SphU.entry() 或 SphO.entry() 都会返回一个 Entry 给调用者,意思就是告诉调用者,如果正确返回了 Entry 给你,那表示你可以正常访问被 Sentinel 保护的后方服务了,否则 Sentinel 会抛出一个BlockException(如果是 SphO.entry() 会返回false),这就表示调用者想要访问的服务被保护了,也就是说调用者本身被限流了。
entry中保存了本次执行 entry() 方法的一些基本信息,包括:
- createTime:当前Entry的创建时间,主要用来后期计算rt
- node:当前Entry所关联的node,该node主要是记录了当前context下该资源的统计信息
- origin:当前Entry的调用来源,通常是调用方的应用名称,在 ClusterBuilderSlot.entry() 方法中设置的
- resourceWrapper:当前Entry所关联的资源
当在一个上下文中多次调用了 SphU.entry() 方法时,就会创建一个调用树,这个树的节点之间是通过parent和child关系维持的。
需要注意的是:parent和child是在 CtSph 类的一个私有内部类 CtEntry 中定义的
,CtEntry 是 Entry 的一个子类。 由于context中总是保存着调用链树中的当前入口,所以当当前entry执行exit退出时,需要将parent设置为当前入口。
-
node
Node 中保存了资源的实时统计数据,例如:passQps,blockQps,rt等实时数据。正是有了这些统计数据后, Sentinel 才能进行限流、降级等一系列的操作。
node是一个接口,他有一个实现类:StatisticNode,但是StatisticNode本身也有两个子类,一个是DefaultNode,另一个是ClusterNode,DefaultNode又有一个子类叫EntranceNode。
其中entranceNode是每个上下文的入口,该节点是直接挂在root下的,是全局唯一的,每一个context都会对应一个entranceNode。另外defaultNode是记录当前调用的实时数据的,每个defaultNode都关联着一个资源和clusterNode,有着相同资源的defaultNode,他们关联着同一个clusterNode。
首先,我先看看Node的定义。其实想要全部看懂Node的设计,还是应该先了解下Metric度量的设计,否则可能不太明白sentinel怎么进行计算每个时间维度的量:
//掌握着资源的时实统计数据
public interface Node extends OccupySupport, DebugSupport {
//一分钟的所有来源请求(pass + block)
long totalRequest();
//一分钟pass的请求
long totalPass();
long totalSuccess();
long blockRequest();
//一分钟异常总数
long totalException();
//每秒通过的请求
double passQps();
double blockQps();
//passQps + blockQps
double totalQps();
//每秒调用了Entry.exit()的请求。调用了这方法说明整个流程走完了,没有抛出blockException也就是没有进行熔断限流
double successQps();
//目前为止预估最大的成功qps
double maxSuccessQps();
double exceptionQps();
//rt - response time。一秒平均返回耗时
double avgRt();
double minRt();
//当前激活中的线程总数
int curThreadNum();
//迁移秒钟被限制的QPS
double previousBlockQps();
double previousPassQps();
//所有资源的可用的检查节点
Map<Long, MetricNode> metrics();
/**
* Fetch all raw metric items that satisfies the time predicate.
*
* @param timePredicate time predicate
* @return raw metric items that satisfies the time predicate
* @since 1.7.0
*/
List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate);
void addPassRequest(int count);
void addRtAndSuccess(long rt, int success);
void increaseBlockQps(int count);
void increaseExceptionQps(int count);
void increaseThreadNum();
void decreaseThreadNum();
//终止内部计算器。
void reset();
}
Node接口的描述也有了。那么剩下的实现其实就是在其基础上实现不同的功能了。我不打算一一进行分析了。所以,就暂时先分析一个StaticNode吧,毕竟其他的Node也都是继承于他。
/*
统计节点有三种实时统计指标:
秒级统计
分钟级别统计
线程总数
sentinel用滑动窗口来记录和统计实时的resource数据。
ArrayMetric的LeapArray是实现滑动窗口的基础设施。
案例1:当第一个请求进来,sentinel会创建一个新的窗口,这个窗口的意思是有着能够将统计数据保存在特定时间跨度的桶。例如总响应时间,QPS,拦截请求等等。这个时间跨度是由样本数量确定的,也就是说是动态的。
* 0 100ms
* +-------+--→ 滑动窗口
* ^
* |
* 请求
Sentinel使用有效桶的静态信息来决定是否可以传递此请求。比如,如果一个规则定义是只能有100个请求能够通过,他会在有效桶中将所有qps加起来与规则定义的阈值(100)进行对比。
案例2:连续的请求。
* 0 100ms 200ms 300ms
* +-------+-------+-------+-----→ 滑动窗口
* ^
* |
* 请求
案例3:请求不间断的到来,并且前一个桶变成了无效。
* 0 100ms 200ms 800ms 900ms 1000ms 1300ms
* +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ 滑动窗口
* ^
* |
* 请求
* 滑动窗口会变成:
* <pre>
* 300ms 800ms 900ms 1000ms 1300ms
* + ...... +-------+ ...... +-------+-----→ 滑动窗口
* ^
* |
* 请求
*/
public class StatisticNode implements Node {
//保存最近N毫秒的统计信息,时间跨度N被给定的样本总数划分
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
//保存最近60秒的统计信息,windowLengthInMs被故意设置为1000毫秒,即每秒每个桶,这样我们就可以获得每秒的准确统计数据。
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
//线程总数
private LongAdder curThreadNum = new LongAdder();
//获取度量的最后一个时间戳。
private long lastFetchTime = -1;
//获取资源的所有有效度量节点。
@Override
public Map<Long, MetricNode> metrics() {
//在单线程调度程序池下,获取操作是线程安全的。
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
long newLastFetchTime = lastFetchTime;
// 迭代所有资源的度量,过滤有效的度量(非空的和最新的)。
for (MetricNode node : nodesOfEverySecond) {
if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
metrics.put(node.getTimestamp(), node);
newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
}
}
lastFetchTime = newLastFetchTime;
return metrics;
}
//生成满足时间谓词的聚合度量项。
@Override
public List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate) {
return rollingCounterInMinute.detailsOnCondition(timePredicate);
}
//度量node时间是否在上次获取时间和当前时间之内
private boolean isNodeInTime(MetricNode node, long currentTime) {
return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime;
}
//判断度量node是否有效
private boolean isValidMetricNode(MetricNode node) {
return node.getPassQps() > 0 || node.getBlockQps() > 0 || node.getSuccessQps() > 0
|| node.getExceptionQps() > 0 || node.getRt() > 0 || node.getOccupiedPassQps() > 0;
}
//重置度量数组
@Override
public void reset() {
rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
}
@Override
public long totalRequest() {
return rollingCounterInMinute.pass() + rollingCounterInMinute.block();
}
@Override
public long blockRequest() {
return rollingCounterInMinute.block();
}
@Override
public double blockQps() {
return rollingCounterInSecond.block() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public double previousBlockQps() {
return this.rollingCounterInMinute.previousWindowBlock();
}
@Override
public double previousPassQps() {
return this.rollingCounterInMinute.previousWindowPass();
}
@Override
public double totalQps() {
return passQps() + blockQps();
}
@Override
public long totalSuccess() {
return rollingCounterInMinute.success();
}
@Override
public double exceptionQps() {
return rollingCounterInSecond.exception() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public long totalException() {
return rollingCounterInMinute.exception();
}
@Override
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public long totalPass() {
return rollingCounterInMinute.pass();
}
@Override
public double successQps() {
return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public double maxSuccessQps() {
return (double) rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount()
/ rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public double occupiedPassQps() {
return rollingCounterInSecond.occupiedPass() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public double avgRt() {
long successCount = rollingCounterInSecond.success();
if (successCount == 0) {
return 0;
}
return rollingCounterInSecond.rt() * 1.0 / successCount;
}
@Override
public double minRt() {
return rollingCounterInSecond.minRt();
}
@Override
public int curThreadNum() {
return (int)curThreadNum.sum();
}
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
@Override
public void addRtAndSuccess(long rt, int successCount) {
rollingCounterInSecond.addSuccess(successCount);
rollingCounterInSecond.addRT(rt);
rollingCounterInMinute.addSuccess(successCount);
rollingCounterInMinute.addRT(rt);
}
@Override
public void increaseBlockQps(int count) {
rollingCounterInSecond.addBlock(count);
rollingCounterInMinute.addBlock(count);
}
@Override
public void increaseExceptionQps(int count) {
rollingCounterInSecond.addException(count);
rollingCounterInMinute.addException(count);
}
@Override
public void increaseThreadNum() {
curThreadNum.increment();
}
@Override
public void decreaseThreadNum() {
curThreadNum.decrement();
}
@Override
public void debug() {
rollingCounterInSecond.debug();
}
//这个是OccupySupport接口的实现。尝试占用有一段时间窗口的令牌。如果占位成功,一个小于occupyTimeout的值会与OccupyTimeoutProperty结构一同返回。具体的解释需要去OccupySupport接口中查看。
@Override
public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
long currentBorrow = rollingCounterInSecond.waiting();
if (currentBorrow >= maxCount) {
return OccupyTimeoutProperty.getOccupyTimeout();
}
int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;
int idx = 0;
/*
* Note: here {@code currentPass} may be less than it really is NOW, because time difference
* since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
* lead more tokens be borrowed.
*/
long currentPass = rollingCounterInSecond.pass();
while (earliestTime < currentTime) {
long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
break;
}
long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
return waitInMs;
}
earliestTime += windowLength;
currentPass -= windowPass;
idx++;
}
return OccupyTimeoutProperty.getOccupyTimeout();
}
@Override
public long waiting() {
return rollingCounterInSecond.waiting();
}
@Override
public void addWaitingRequest(long futureTime, int acquireCount) {
rollingCounterInSecond.addWaiting(futureTime, acquireCount);
}
@Override
public void addOccupiedPass(int acquireCount) {
rollingCounterInMinute.addOccupiedPass(acquireCount);
rollingCounterInMinute.addPass(acquireCount);
}
}
他们之间的继承关系,也侧面反映出来了,每个资源都有一个统计信息,对应存储到Node上,但是上一级的信息会是下一级信息的一个统计。
-
metirc
Metric 是 Sentinel 中用来进行实时数据统计的度量接口,node就是通过metric来进行数据统计的。而metric本身也并没有统计的能力,他也是通过Window来进行统计的。
Metric有一个实现类:ArrayMetric,在ArrayMetric中主要是通过一个叫LeapArray
如果想要了解sentinel是如何计算度量统计数据的,需要深度LeapArray这个类。里面有很多方法值得学习的。我想抽时间学习一下他采用的时间窗口算法,然后再写一篇自己的理解吧。
本文参考: