写点什么

Sentinel 源码—ProcessorSlot 的执行过程

  • 2025-04-17
    福建
  • 本文字数:25064 字

    阅读完需:约 82 分钟

1.NodeSelectorSlot 构建资源调用树

 

(1)Entry 的处理链的执行入口


每当一个线程处理包含某些资源的接口请求时,会调用 SphU 的 entry()方法去创建并管控该接口中涉及的 Entry 资源访问对象。

 

在创建 Entry 资源访问对象的期间,会创建一个 ResourceWrapper 对象、一个 Context 对象、以及根据 ResourceWrapper 对象创建或获取一个 ProcessorSlotChain 对象,也就是把 ProcessorSlotChain 对象、Context 对象与 ResourceWrapper 对象绑定到 Entry 对象中。


public class SphU {    private static final Object[] OBJECTS0 = new Object[0];    ...    public static Entry entry(String name) throws BlockException {        //调用CtSph.entry()方法创建一个Entry资源访问对象,默认的请求类型为OUT        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);    }}
public class Env { //创建一个CtSph对象 public static final Sph sph = new CtSph(); static { InitExecutor.doInit(); }}
public class CtSph implements Sph { //Same resource will share the same ProcessorSlotChain}, no matter in which Context. //Same resource is that ResourceWrapper#equals(Object). private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(); ... @Override public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { //StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMON StringResourceWrapper resource = new StringResourceWrapper(name, type); return entry(resource, count, args); } //Do all {@link Rule}s checking about the resource. public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { //调用CtSph.entryWithPriority()方法,执行如下处理: //初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中 return entryWithPriority(resourceWrapper, count, false, args); } private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { //从当前线程中获取Context Context context = ContextUtil.getContext(); if (context instanceof NullContext) { return new CtEntry(resourceWrapper, null, context); } //如果没获取到Context if (context == null) { //Using default context. //创建一个名为sentinel_default_context的Context,并且与当前线程绑定 context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } //Global switch is close, no rule checking will do. if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } //调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条) ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); if (chain == null) { return new CtEntry(resourceWrapper, null, context); } //创建出一个Entry对象,将处理链(处理器插槽链条)、Context与Entry绑定 //其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法 Entry e = new CtEntry(resourceWrapper, chain, context); try { //处理链(处理器插槽链条)入口,负责采集数据,规则验证 //调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证) chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { //规则验证失败,比如:被流控、被熔断降级、触发黑白名单等 e.exit(count, args); throw e1; } catch (Throwable e1) { RecordLog.info("Sentinel unexpected exception", e1); } return e; } ... private final static class InternalContextUtil extends ContextUtil { static Context internalEnter(String name) { //调用ContextUtil.trueEnter()方法创建一个Context对象 return trueEnter(name, ""); } static Context internalEnter(String name, String origin) { return trueEnter(name, origin); } } //Get ProcessorSlotChain of the resource. //new ProcessorSlotChain will be created if the resource doesn't relate one. //Same resource will share the same ProcessorSlotChain globally, no matter in which Context. //Same resource is that ResourceWrapper#equals(Object). ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { //Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } //调用SlotChainProvider.newSlotChain()方法初始化处理链(处理器插槽链条) chain = SlotChainProvider.newSlotChain(); //写时复制 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }}
public class StringResourceWrapper extends ResourceWrapper { public StringResourceWrapper(String name, EntryType e) { //调用父类构造方法,且默认资源类型为COMMON super(name, e, ResourceTypeConstants.COMMON); } ...}
//Utility class to get or create Context in current thread.//Each SphU.entry() should be in a Context.//If we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used.public class ContextUtil { //Store the context in ThreadLocal for easy access. //存放线程与Context的绑定关系 //每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Context private static ThreadLocal<Context> contextHolder = new ThreadLocal<>(); //Holds all EntranceNode. Each EntranceNode is associated with a distinct context name. //以Context的name作为key,EntranceNode作为value缓存到HashMap中 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>(); private static final ReentrantLock LOCK = new ReentrantLock(); private static final Context NULL_CONTEXT = new NullContext(); ... //ContextUtil.trueEnter()方法会尝试从ThreadLocal获取一个Context对象 //如果获取不到,再创建一个Context对象然后放入ThreadLocal中 //入参name其实一般就是默认的Constants.CONTEXT_DEFAULT_NAME=sentinel_default_context //由于当前线程可能会涉及创建多个Entry资源访问对象,所以trueEnter()方法需要注意并发问题 protected static Context trueEnter(String name, String origin) { //从ThreadLocal中获取当前线程绑定的Context对象 Context context = contextHolder.get(); //如果当前线程还没绑定Context对象,则初始化Context对象并且与当前线程进行绑定 if (context == null) { //首先要获取或创建Context对象所需要的EntranceNode对象,EntranceNode会负责统计名字相同的Context下的指标数据 //将全局缓存contextNameNodeMap赋值给一个临时变量localCacheNameMap //因为后续会对contextNameNodeMap的内容进行修改,所以这里需要将原来的contextNameNodeMap复制一份出来 //从而避免后续对contextNameNodeMap的内容进行修改时,可能造成对接下来读取contextNameNodeMap内容的影响 Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
//从缓存副本localCacheNameMap中获取EntranceNode //这个name其实一般就是默认的sentinel_default_context DefaultNode node = localCacheNameMap.get(name); //如果获取的EntranceNode为空 if (node == null) { //为了防止缓存无限制地增长,导致内存占用过高,需要设置一个上限,只要超过上限,就直接返回NULL_CONTEXT if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { //如果Context还没创建,缓存里也没有当前Context名称对应的EntranceNode,并且缓存数量尚未达到2000 //那么就创建一个EntranceNode,创建EntranceNode时需要加锁,否则会有线程不安全问题 //毕竟需要修改HashMap类型的contextNameNodeMap
//通过加锁 + 缓存 + 写时复制更新缓存,避免并发情况下创建出多个EntranceNode对象 //一个线程对应一个Context对象,多个线程对应多个Context对象 //这些Context对象会使用ThreadLocal进行隔离,但它们的name默认都是sentinel_default_context //根据下面的代码逻辑: //多个线程(对应多个Context的name默认都是sentinel_default_context)会共用同一个EntranceNode //于是可知,多个Context对象会共用一个EntranceNode对象 LOCK.lock(); try { //从缓存中获取EntranceNode node = contextNameNodeMap.get(name); //对node进行Double Check //如果没获取到EntranceNode if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { //创建EntranceNode,缓存到contextNameNodeMap当中 node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); //Add entrance node. //将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点 Constants.ROOT.addChild(node);
//写时复制,将新创建的EntranceNode添加到缓存中 Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { //解锁 LOCK.unlock(); } } } //此处可能会有多个线程同时执行到此处,并发创建多个Context对象 //但这是允许的,因为一个请求对应一个Context,一个请求对应一个线程,所以一个线程本来就需要创建一个Context对象
//初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中 context = new Context(node, name); context.setOrigin(origin); //将创建出来的Context对象放入ThreadLocal变量contextHolder中,实现Context对象与当前线程的绑定 contextHolder.set(context); } return context; } ...}
public final class SlotChainProvider { private static volatile SlotChainBuilder slotChainBuilder = null;
//The load and pick process is not thread-safe, //but it's okay since the method should be only invoked via CtSph.lookProcessChain() under lock. public static ProcessorSlotChain newSlotChain() { //如果存在,则直接返回 if (slotChainBuilder != null) { return slotChainBuilder.build(); } //Resolve the slot chain builder SPI. //通过SPI机制初始化SlotChainBuilder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); if (slotChainBuilder == null) { //Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } return slotChainBuilder.build(); } private SlotChainProvider() { }}
@Spi(isDefault = true)public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { //创建一个DefaultProcessorSlotChain对象实例 ProcessorSlotChain chain = new DefaultProcessorSlotChain(); //通过SPI机制加载责任链的节点ProcessorSlot实现类 //然后按照@Spi注解的order属性进行排序并进行实例化 //最后将ProcessorSlot实例放到sortedSlotList中 List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); //遍历已排好序的ProcessorSlot集合 for (ProcessorSlot slot : sortedSlotList) { //安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } //调用DefaultProcessorSlotChain.addLast()方法构建单向链表 //将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中 chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } //返回单向链表 return chain; }}
复制代码


在 DefaultSlotChainBuilder 的 build()方法中,从其初始化 ProcessorSlotChain 的逻辑可知,Entry 的处理链的执行入口就是 DefaultProcessorSlotChain 的 entry()方法。

 

当一个线程调用 SphU 的 entry()方法创建完与接口相关的 Entry 对象后,就会调用 DefaultProcessorSlotChain 的 entry()方法执行处理链节点的逻辑。因为 NodeSelectorSlot 是 Entry 的处理链 ProcessorSlotChain 的第一个节点,所以接着会调用 NodeSelectorSlot 的 entry()方法。由于处理链中紧接着 NodeSelectorSlot 的下一个节点是 ClusterBuilderSlot,所以执行完 NodeSelectorSlot 的 entry()方法后,会接着执行 ClusterBuilderSlot 的 entry()方法。


public class DefaultProcessorSlotChain extends ProcessorSlotChain {    ...    @Override    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {        //默认情况下会调用处理链的第一个节点NodeSelectorSlot的transformEntry()方法        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);    }    ...}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { ... void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } ...}
复制代码


(2)NodeSelectorSlot 的源码


NodeSelectorSlot 和 ClusterBuilderSlot 会一起构建 Context 的资源调用树,资源调用树的作用其实就是用来统计资源的调用数据。

 

在一个 Context 对象实例的资源调用树上主要会有如下三类节点:DefaultNode、EntranceNode、ClusterNode,分别对应于:单机里的资源维度、接口维度、集群中的资源维度。

 

其中 DefaultNode 会统计名字相同的 Context 下的某个资源的调用数据,EntranceNode 会统计名字相同的 Context 下的全部资源的调用数据,ClusterNode 会统计某个资源在全部 Context 下的调用数据。

 

在执行 NodeSelectorSlot 的 entry()方法时,首先会从缓存(NodeSelectorSlot.map 属性)中获取一个 DefaultNode 对象。如果获取不到,再通过 DCL 机制创建一个 DefaultNode 对象并更新缓存。其中缓存的 key 是 Context 的 name,value 是 DefaultNode 对象。由于默认情况下多个线程对应的 Context 的 name 都相同,所以多个线程访问同一资源时使用的 DefaultNode 对象也一样。

 

在执行 ClusterBuilderSlot 的 entry()方法时,首先会判断缓存是否为 null,若是则创建一个 ClusterNode 对象,然后再将 ClusterNode 对象设置到 DefaultNode 对象的 clusterNode 属性中。

 

由 DefaultNode、EntranceNode、ClusterNode 构成的资源调用树:因为 DefaultNode 是和资源 ResourceWrapper 以及 Context 挂钩的,所以 DefaultNode 应该添加到 EntranceNode 中。因为 ClusterNode 和资源挂钩,而不和 Context 挂钩,所以 ClusterNode 应该添加到 DefaultNode 中。

 

具体的资源调用树构建源码如下:


//This class will try to build the calling traces via://adding a new DefaultNode if needed as the last child in the context.//the context's last node is the current node or the parent node of the context.//setting itself to the context current node.
//It works as follow:// ContextUtil.enter("entrance1", "appA");// Entry nodeA = SphU.entry("nodeA");// if (nodeA != null) {// nodeA.exit();// }// ContextUtil.exit();
//Above code will generate the following invocation structure in memory:// machine-root// /// /// EntranceNode1// /// /// DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);//Here the EntranceNode represents "entrance1" given by ContextUtil.enter("entrance1", "appA").//Both DefaultNode(nodeA) and ClusterNode(nodeA) holds statistics of "nodeA", which is given by SphU.entry("nodeA").//The ClusterNode is uniquely identified by the ResourceId; //The DefaultNode is identified by both the resource id and {@link Context}. //In other words, one resource id will generate multiple DefaultNode for each distinct context, //but only one ClusterNode.
//the following code shows one resource id in two different context:// ContextUtil.enter("entrance1", "appA");// Entry nodeA = SphU.entry("nodeA");// if (nodeA != null) {// nodeA.exit();// }// ContextUtil.exit();// ContextUtil.enter("entrance2", "appA");// nodeA = SphU.entry("nodeA");// if (nodeA != null) {// nodeA.exit();// }// ContextUtil.exit();
//Above code will generate the following invocation structure in memory:// machine-root// / \// / \// EntranceNode1 EntranceNode2// / \// / \// DefaultNode(nodeA) DefaultNode(nodeA)// | |// +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);//As we can see, two DefaultNode are created for "nodeA" in two context, //but only one ClusterNode is created.//We can also check this structure by calling: http://localhost:8719/tree?type=root@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { //DefaultNodes of the same resource in different context. //缓存map以Context的name为key,DefaultNode为value //由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样 private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { //It's interesting that we use context name rather resource name as the map key. //Remember that same resource will share the same ProcessorSlotChain globally, no matter in which context. //Same resource is that ResourceWrapper#equals(Object). //So if code goes into entry(Context, ResourceWrapper, DefaultNode, int, Object...), //the resource name must be same but context name may not. //If we use SphU.entry(String resource)} to enter same resource in different context, //using context name as map key can distinguish the same resource. //In this case, multiple DefaultNodes will be created of the same resource name, //for every distinct context (different context name) each. //Consider another question. One resource may have multiple DefaultNode, //so what is the fastest way to get total statistics of the same resource? //The answer is all DefaultNodes with same resource name share one ClusterNode. //See ClusterBuilderSlot for detail. //先从缓存中获取 DefaultNode node = map.get(context.getName()); if (node == null) { //使用DCL机制,即Double Check + Lock机制 synchronized (this) { node = map.get(context.getName()); if (node == null) { //每个线程访问Entry时,都会调用CtSph.entry()方法创建一个ResourceWrapper对象 //下面根据ResourceWrapper创建一个DefaultNode对象 node = new DefaultNode(resourceWrapper, null); //写时复制更新缓存map HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; //Build invocation tree //首先会调用context.getLastNode()方法,获取到的是Context.entranceNode属性即一个EntranceNode对象 //EntranceNode对象是在执行ContextUtil.trueEnter()方法创建Context对象实例时添加到Context对象中的 //然后会将刚创建的DefaultNode对象添加到EntranceNode对象的childList列表中 ((DefaultNode) context.getLastNode()).addChild(node); } } } //设置Context的curNode属性为当前获取到或新创建的DefaultNode对象 context.setCurNode(node); //触发执行下一个ProcessorSlot,即ClusterBuilderSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); }}
//This slot maintains resource running statistics (response time, qps, thread count, exception), //and a list of callers as well which is marked by ContextUtil.enter(String origin).//One resource has only one cluster node, while one resource can have multiple default nodes.@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> { //Remember that same resource will share the same ProcessorSlotChain globally, no matter in which context. //Same resource is that ResourceWrapper#equals(Object). //So if code goes into entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...), //the resource name must be same but context name may not. //To get total statistics of the same resource in different context, //same resource shares the same ClusterNode} globally. //All ClusterNodes are cached in this map. //The longer the application runs, the more stable this mapping will become. //so we don't concurrent map but a lock. //as this lock only happens at the very beginning while concurrent map will hold the lock all the time. private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object(); private volatile ClusterNode clusterNode = null;
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { if (clusterNode == null) { //使用DCL机制,即Double Check + Lock机制 synchronized (lock) { if (clusterNode == null) { //Create the cluster node. //创建ClusterNode对象 clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType()); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16)); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } //设置DefaultNode的clusterNode属性为获取到的ClusterNode对象 node.setClusterNode(clusterNode);
//if context origin is set, we should get or create a new {@link Node} of the specific origin. if (!"".equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } //执行下一个ProcessorSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); }
@Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } ...}
复制代码


(3)Context 对象中存储的资源调用树总结


其实 Context 对象的属性 entranceNode 就代表了一棵资源调用树。

 

首先,在调用 ContextUtil 的 trueEnter()方法创建 Context 对象实例时,便会创建一个 EntranceNode 对象并赋值给 Context 的 entranceNode 属性,以及调用 Constants.ROOT 的 addChild()方法,将这个 EntranceNode 对象放入 Constants.ROOT 的 childList 列表中。

 

然后,执行 NodeSelectorSlot 的 entry()方法时,便会创建一个 DefaultNode 对象。该 DefaultNode 对象会被添加到 Context.entranceNode 的 childList 列表中,也就是前面创建的 EntranceNode 对象的 childList 列表中。

 

接着,执行 ClusterBuilderSlot 的 entry()方法时,便会创建一个 ClusterNode 对象,该 ClusterNode 对象会赋值给前面 DefaultNode 对象中的 clusterNode 属性。

 

至此,便构建完 Context 下的资源调用树了。Constants.ROOT 的 childList 里会存放多个 EntranceNode 对象,每个 EntranceNode 对象的 childList 里会存放多个 DefaultNode 对象,而每个 DefaultNode 对象会指向一个 ClusterNode 对象。


//This class holds metadata of current invocation://the EntranceNode: the root of the current invocation tree.//the current Entry: the current invocation point.//the current Node: the statistics related to the Entry.//the origin: The origin is useful when we want to control different invoker/consumer separately.//Usually the origin could be the Service Consumer's app name or origin IP.
//Each SphU.entry() or SphO.entry() should be in a Context,//if we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used.//A invocation tree will be created if we invoke SphU.entry() multi times in the same context.//Same resource in different context will count separately, see NodeSelectorSlot.public class Context { //Context name. private final String name; //The entrance node of current invocation tree. private DefaultNode entranceNode; //Current processing entry. private Entry curEntry; //The origin of this context (usually indicate different invokers, e.g. service consumer name or origin IP). private String origin = ""; ... public Context(DefaultNode entranceNode, String name) { this(name, entranceNode, false); } public Context(String name, DefaultNode entranceNode, boolean async) { this.name = name; this.entranceNode = entranceNode; this.async = async; } //Get the parent Node of the current. public Node getLastNode() { if (curEntry != null && curEntry.getLastNode() != null) { return curEntry.getLastNode(); } else { return entranceNode; } } ...}
public class ContextUtil { //以Context的name作为key,EntranceNode作为value缓存所有的EntranceNode到HashMap中 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>(); ... protected static Context trueEnter(String name, String origin) { ... //从缓存中获取EntranceNode DefaultNode node = contextNameNodeMap.get(name); ... //创建EntranceNode,缓存到contextNameNodeMap当中 node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); //将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点 Constants.ROOT.addChild(node); ... //初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中 context = new Context(node, name); ... } ...}
public final class Constants { ... //Global ROOT statistic node that represents the universal parent node. public final static DefaultNode ROOT = new EntranceNode( new StringResourceWrapper(ROOT_ID, EntryType.IN), new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON) ); ...}
//A Node used to hold statistics for specific resource name in the specific context.//Each distinct resource in each distinct Context will corresponding to a DefaultNode.//This class may have a list of sub DefaultNodes.//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.public class DefaultNode extends StatisticNode { //The resource associated with the node. private ResourceWrapper id; //The list of all child nodes. private volatile Set<Node> childList = new HashSet<>(); //Associated cluster node. private ClusterNode clusterNode; ... //Add child node to current node. public void addChild(Node node) { if (node == null) { RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName()); return; } if (!childList.contains(node)) { synchronized (this) { if (!childList.contains(node)) { Set<Node> newSet = new HashSet<>(childList.size() + 1); newSet.addAll(childList); newSet.add(node); childList = newSet; } } RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName()); } } //Reset the child node list. public void removeChildList() { this.childList = new HashSet<>(); } ...}
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { //DefaultNodes of the same resource in different context. //缓存map以Context的name为key,DefaultNode为value //由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样 private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); ... @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { ... //先从缓存中获取 DefaultNode node = map.get(context.getName()); ... //下面根据ResourceWrapper创建一个DefaultNode对象 node = new DefaultNode(resourceWrapper, null); ... //Build invocation tree //首先会调用context.getLastNode()方法,获取到的是Context.entranceNode属性即一个EntranceNode对象 //EntranceNode对象是在执行ContextUtil.trueEnter()方法创建Context对象实例时添加到Context对象中的 //然后会将刚创建的DefaultNode对象添加到EntranceNode对象的childList列表中 ((DefaultNode) context.getLastNode()).addChild(node); ... //执行下一个ProcessorSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); } ...}
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> { ... private volatile ClusterNode clusterNode = null; @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { ... //创建ClusterNode对象 clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType()); ... //设置DefaultNode的clusterNode属性为获取到的ClusterNode对象 node.setClusterNode(clusterNode); ... //执行下一个ProcessorSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); } ...}
//资源调用树的示例如下所示:// machine-root// / \// / \// EntranceNode1 EntranceNode2// / \// / \// DefaultNode(nodeA) DefaultNode(nodeA)// | |// +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);//其中,machine-root中的childList里会有很多个EntranceNode对象//EntranceNode对象中的childList里又会有很多个DefaultNode对象//每个DefaultNode对象下都会指向一个ClusterNode对象
复制代码


一些对应关系的梳理总结:


一个线程对应一个 ResourceWrapper 对象实例,一个线程对应一个 Context 对象实例。如果 ResourceWrapper 对象相同,则会共用一个 ProcessorSlotChain 实例。如果 ResourceWrapper 对象相同,则也会共用一个 ClusterNode 实例。如果 Context 对象的名字相同,则会共用一个 EntranceNode 对象实例。如果 Context 对象的名字相同,则也会共用一个 DefaultNode 对象实例。


//每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Contextprivate static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
//以Context的name作为key,EntranceNode作为value缓存所有的EntranceNode到HashMap中private static volatile Map<String, EntranceNode> contextNameNodeMap = new HashMap<>();
//Same resource will share the same ProcessorSlotChain}, no matter in which Context.//Same resource is that ResourceWrapper#equals(Object).private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();
//DefaultNodes of the same resource in different context.//以Context的name作为key,DefaultNode作为value//由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); //To get total statistics of the same resource in different context, //same resource shares the same ClusterNode globally.//All ClusterNodes are cached in this map.private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
复制代码


2.LogSlot 和 StatisticSlot 采集资源的数据


(1)LogSlot 的源码


LogSlot 用于记录异常请求日志,以便于故障排查。也就是当出现 BlockException 异常时,调用 EagleEyeLogUtil 的 log()方法将日志写到 sentinel-block.log 文件中。


//A ProcessorSlot that is response for logging block exceptions to provide concrete logs for troubleshooting.@Spi(order = Constants.ORDER_LOG_SLOT)public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {    @Override    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args) throws Throwable {        try {            //调用下一个ProcessorSlot            fireEntry(context, resourceWrapper, obj, count, prioritized, args);        } catch (BlockException e) {            //被流控或者熔断降级后打印log日志            EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(), context.getOrigin(), e.getRule().getId(), count);            throw e;        } catch (Throwable e) {            RecordLog.warn("Unexpected entry exception", e);        }    }
@Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { try { //调用下一个ProcessorSlot fireExit(context, resourceWrapper, count, args); } catch (Throwable e) { RecordLog.warn("Unexpected entry exit exception", e); } }}
public class EagleEyeLogUtil { public static final String FILE_NAME = "sentinel-block.log"; private static StatLogger statLogger; static { String path = LogBase.getLogBaseDir() + FILE_NAME; statLogger = EagleEye.statLoggerBuilder("sentinel-block-log") .intervalSeconds(1) .entryDelimiter('|') .keyDelimiter(',') .valueDelimiter(',') .maxEntryCount(6000) .configLogFilePath(path) .maxFileSizeMB(300) .maxBackupIndex(3) .buildSingleton(); } public static void log(String resource, String exceptionName, String ruleLimitApp, String origin, Long ruleId, int count) { String ruleIdString = StringUtil.EMPTY; if (ruleId != null) { ruleIdString = String.valueOf(ruleId); } statLogger.stat(resource, exceptionName, ruleLimitApp, origin, ruleIdString).count(count); }}
复制代码


(2)StatisticSlot 的源码


StatisticSlot 用于统计资源的调用数据,如请求成功数、请求失败数、响应时间等。

 

注意:开始对请求进行规则验证时,需要调用 SphU 的 entry()方法。完成对请求的规则验证后,也需要调用 Entry 的 exit()方法。


//A processor slot that dedicates to real time statistics.//When entering this slot, we need to separately count the following information://ClusterNode: total statistics of a cluster node of the resource ID.//Origin node: statistics of a cluster node from different callers/origins.//DefaultNode: statistics for specific resource name in the specific context.//Finally, the sum statistics of all entrances.@Spi(order = Constants.ORDER_STATISTIC_SLOT)public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {    @Override    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {        try {            //Do some checking.            //执行下一个ProcessorSlot,先进行规则验证等            fireEntry(context, resourceWrapper, node, count, prioritized, args);
//Request passed, add thread count and pass count. //如果通过了后面ProcessorSlot的验证 //则将处理当前资源resourceWrapper的线程数 + 1 以及 将对当前资源resourceWrapper的成功请求数 + 1 node.increaseThreadNum(); node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) { //Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); }
if (resourceWrapper.getEntryType() == EntryType.IN) { //Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); }
//Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { //Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); }
if (resourceWrapper.getEntryType() == EntryType.IN) { //Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } //Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) {//捕获BlockException //Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e);
//Add block count. //如果规则验证失败,则将BlockQps+1 node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); }
if (resourceWrapper.getEntryType() == EntryType.IN) { //Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); }
//Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { //Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } } //开始对请求进行规则验证时,需要调用SphU.entry()方法 //完成对请求的规则验证后,也需要调用Entry.exit()方法 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { Node node = context.getCurNode(); if (context.getCurEntry().getBlockError() == null) { //Calculate response time (use completeStatTime as the time of completion). //获取系统当前时间 long completeStatTime = TimeUtil.currentTimeMillis(); context.getCurEntry().setCompleteTimestamp(completeStatTime); //计算响应时间 = 系统当前事件 - 根据资源resourceWrapper创建Entry资源访问对象时的时间 long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();
Throwable error = context.getCurEntry().getError();
//Record response time and success count. //记录响应时间等信息 recordCompleteFor(node, count, rt, error); recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error); if (resourceWrapper.getEntryType() == EntryType.IN) { recordCompleteFor(Constants.ENTRY_NODE, count, rt, error); } }
//Handle exit event with registered exit callback handlers. Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks(); for (ProcessorSlotExitCallback handler : exitCallbacks) { handler.onExit(context, resourceWrapper, count, args); } fireExit(context, resourceWrapper, count, args); }
private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) { if (node == null) { return; } node.addRtAndSuccess(rt, batchCount); node.decreaseThreadNum(); if (error != null && !(error instanceof BlockException)) { node.increaseExceptionQps(batchCount); } }}
复制代码


(3)记录资源在不同维度下的调用数据


一.如何统计单机里某个资源的调用数据


由于 DefaultNode 会统计名字相同的 Context 下的某个资源的调用数据,它是按照单机里的资源维度进行调用数据统计的,所以在 StatisticSlot 的 entry()方法中,会调用 DefaultNode 的方法来进行统计。


//A Node used to hold statistics for specific resource name in the specific context.//Each distinct resource in each distinct Context will corresponding to a DefaultNode.//This class may have a list of sub DefaultNodes.//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.public class DefaultNode extends StatisticNode {    //The resource associated with the node.    private ResourceWrapper id;    //Associated cluster node.    private ClusterNode clusterNode;    ...        @Override    public void increaseThreadNum() {        super.increaseThreadNum();        this.clusterNode.increaseThreadNum();    }        @Override    public void addPassRequest(int count) {        super.addPassRequest(count);        this.clusterNode.addPassRequest(count);    }        @Override    public void increaseBlockQps(int count) {        super.increaseBlockQps(count);        this.clusterNode.increaseBlockQps(count);    }        @Override    public void addRtAndSuccess(long rt, int successCount) {        super.addRtAndSuccess(rt, successCount);        this.clusterNode.addRtAndSuccess(rt, successCount);    }        @Override    public void decreaseThreadNum() {        super.decreaseThreadNum();        this.clusterNode.decreaseThreadNum();    }    ...}
public class StatisticNode implements Node { //The counter for thread count. private LongAdder curThreadNum = new LongAdder(); //Holds statistics of the recent INTERVAL milliseconds. //The INTERVAL is divided into time spans by given sampleCount. private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); //Holds statistics of the recent 60 seconds. //The windowLengthInMs is deliberately set to 1000 milliseconds, //meaning each bucket per second, in this way we can get accurate statistics of each second. private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); ... @Override public void increaseThreadNum() { curThreadNum.increment(); } @Override public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); } @Override public void increaseBlockQps(int count) { rollingCounterInSecond.addBlock(count); rollingCounterInMinute.addBlock(count); } @Override public void addRtAndSuccess(long rt, int successCount) { rollingCounterInSecond.addSuccess(successCount); rollingCounterInSecond.addRT(rt);
rollingCounterInMinute.addSuccess(successCount); rollingCounterInMinute.addRT(rt); } @Override public void decreaseThreadNum() { curThreadNum.decrement(); } ...}
复制代码


二.如何统计所有资源的调用数据即接口调用数据


由于 EntranceNode 会统计名字相同的 Context 下的全部资源的调用数据,它是按接口维度来统计调用数据的,即统计接口下所有资源的调用情况,所以可以通过遍历 EntranceNode 的 childList 来统计接口的调用数据。


//A Node represents the entrance of the invocation tree.//One Context will related to a EntranceNode,//which represents the entrance of the invocation tree. //New EntranceNode will be created if current context does't have one. //Note that same context name will share same EntranceNode globally.public class EntranceNode extends DefaultNode {    public EntranceNode(ResourceWrapper id, ClusterNode clusterNode) {        super(id, clusterNode);    }        @Override    public double avgRt() {        double total = 0;        double totalQps = 0;        for (Node node : getChildList()) {            total += node.avgRt() * node.passQps();            totalQps += node.passQps();        }        return total / (totalQps == 0 ? 1 : totalQps);    }        @Override    public double blockQps() {        double blockQps = 0;        for (Node node : getChildList()) {            blockQps += node.blockQps();        }        return blockQps;    }        @Override    public long blockRequest() {        long r = 0;        for (Node node : getChildList()) {            r += node.blockRequest();        }        return r;    }        @Override    public int curThreadNum() {        int r = 0;        for (Node node : getChildList()) {            r += node.curThreadNum();        }        return r;    }        @Override    public double totalQps() {        double r = 0;        for (Node node : getChildList()) {            r += node.totalQps();        }        return r;    }        @Override    public double successQps() {        double r = 0;        for (Node node : getChildList()) {            r += node.successQps();        }        return r;    }        @Override    public double passQps() {        double r = 0;        for (Node node : getChildList()) {            r += node.passQps();        }        return r;    }        @Override    public long totalRequest() {        long r = 0;        for (Node node : getChildList()) {            r += node.totalRequest();        }        return r;    }        @Override    public long totalPass() {        long r = 0;        for (Node node : getChildList()) {            r += node.totalPass();        }        return r;    }}
复制代码


三.如何统计集群中某个资源的调用数据


由于 ClusterNode 会统计某个资源在全部 Context 下的调用数据,它是按照集群中的资源维度进行调用数据统计的,而 StatisticSlot 的 entry()调用 DefaultNode 的方法统计单机下的资源时,会顺便调用 ClusterNode 的方法来统计集群下的资源调用,所以通过 ClusterNode 就可以获取集群中某个资源的调用数据。


//A Node used to hold statistics for specific resource name in the specific context.//Each distinct resource in each distinct Context will corresponding to a DefaultNode.//This class may have a list of sub DefaultNodes.//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.public class DefaultNode extends StatisticNode {    //The resource associated with the node.    private ResourceWrapper id;    //Associated cluster node.    private ClusterNode clusterNode;    ...        @Override    public void increaseThreadNum() {        super.increaseThreadNum();        this.clusterNode.increaseThreadNum();    }        @Override    public void addPassRequest(int count) {        super.addPassRequest(count);        this.clusterNode.addPassRequest(count);    }        @Override    public void increaseBlockQps(int count) {        super.increaseBlockQps(count);        this.clusterNode.increaseBlockQps(count);    }        @Override    public void addRtAndSuccess(long rt, int successCount) {        super.addRtAndSuccess(rt, successCount);        this.clusterNode.addRtAndSuccess(rt, successCount);    }        @Override    public void decreaseThreadNum() {        super.decreaseThreadNum();        this.clusterNode.decreaseThreadNum();    }    ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18829907

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
Sentinel源码—ProcessorSlot的执行过程_电子尖叫食人鱼_InfoQ写作社区