Sentinel 源码—ProcessorSlot 的执行过程
- 2025-04-17 福建
本文字数:25064 字
阅读完需:约 82 分钟
1.NodeSelectorSlot 构建资源调用树
(1)Entry 的处理链的执行入口
每当一个线程处理包含某些资源的接口请求时,会调用 SphU 的 entry()方法去创建并管控该接口中涉及的 Entry 资源访问对象。
在创建 Entry 资源访问对象的期间,会创建一个 ResourceWrapper 对象、一个 Context 对象、以及根据 ResourceWrapper 对象创建或获取一个 ProcessorSlotChain 对象,也就是把 ProcessorSlotChain 对象、Context 对象与 ResourceWrapper 对象绑定到 Entry 对象中。
public class SphU {
private static final Object[] OBJECTS0 = new Object[0];
...
public static Entry entry(String name) throws BlockException {
//调用CtSph.entry()方法创建一个Entry资源访问对象,默认的请求类型为OUT
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
}
public class Env {
//创建一个CtSph对象
public static final Sph sph = new CtSph();
static {
InitExecutor.doInit();
}
}
public class CtSph implements Sph {
//Same resource will share the same ProcessorSlotChain}, no matter in which Context.
//Same resource is that ResourceWrapper#equals(Object).
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();
...
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
//StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMON
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
//Do all {@link Rule}s checking about the resource.
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
//调用CtSph.entryWithPriority()方法,执行如下处理:
//初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中
return entryWithPriority(resourceWrapper, count, false, args);
}
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
//从当前线程中获取Context
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
//如果没获取到Context
if (context == null) {
//Using default context.
//创建一个名为sentinel_default_context的Context,并且与当前线程绑定
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
//Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条)
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
//创建出一个Entry对象,将处理链(处理器插槽链条)、Context与Entry绑定
//其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//处理链(处理器插槽链条)入口,负责采集数据,规则验证
//调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证)
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
//规则验证失败,比如:被流控、被熔断降级、触发黑白名单等
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
...
private final static class InternalContextUtil extends ContextUtil {
static Context internalEnter(String name) {
//调用ContextUtil.trueEnter()方法创建一个Context对象
return trueEnter(name, "");
}
static Context internalEnter(String name, String origin) {
return trueEnter(name, origin);
}
}
//Get ProcessorSlotChain of the resource.
//new ProcessorSlotChain will be created if the resource doesn't relate one.
//Same resource will share the same ProcessorSlotChain globally, no matter in which Context.
//Same resource is that ResourceWrapper#equals(Object).
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
//Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//调用SlotChainProvider.newSlotChain()方法初始化处理链(处理器插槽链条)
chain = SlotChainProvider.newSlotChain();
//写时复制
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
}
public class StringResourceWrapper extends ResourceWrapper {
public StringResourceWrapper(String name, EntryType e) {
//调用父类构造方法,且默认资源类型为COMMON
super(name, e, ResourceTypeConstants.COMMON);
}
...
}
//Utility class to get or create Context in current thread.
//Each SphU.entry() should be in a Context.
//If we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used.
public class ContextUtil {
//Store the context in ThreadLocal for easy access.
//存放线程与Context的绑定关系
//每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Context
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
//Holds all EntranceNode. Each EntranceNode is associated with a distinct context name.
//以Context的name作为key,EntranceNode作为value缓存到HashMap中
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Context NULL_CONTEXT = new NullContext();
...
//ContextUtil.trueEnter()方法会尝试从ThreadLocal获取一个Context对象
//如果获取不到,再创建一个Context对象然后放入ThreadLocal中
//入参name其实一般就是默认的Constants.CONTEXT_DEFAULT_NAME=sentinel_default_context
//由于当前线程可能会涉及创建多个Entry资源访问对象,所以trueEnter()方法需要注意并发问题
protected static Context trueEnter(String name, String origin) {
//从ThreadLocal中获取当前线程绑定的Context对象
Context context = contextHolder.get();
//如果当前线程还没绑定Context对象,则初始化Context对象并且与当前线程进行绑定
if (context == null) {
//首先要获取或创建Context对象所需要的EntranceNode对象,EntranceNode会负责统计名字相同的Context下的指标数据
//将全局缓存contextNameNodeMap赋值给一个临时变量localCacheNameMap
//因为后续会对contextNameNodeMap的内容进行修改,所以这里需要将原来的contextNameNodeMap复制一份出来
//从而避免后续对contextNameNodeMap的内容进行修改时,可能造成对接下来读取contextNameNodeMap内容的影响
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
//从缓存副本localCacheNameMap中获取EntranceNode
//这个name其实一般就是默认的sentinel_default_context
DefaultNode node = localCacheNameMap.get(name);
//如果获取的EntranceNode为空
if (node == null) {
//为了防止缓存无限制地增长,导致内存占用过高,需要设置一个上限,只要超过上限,就直接返回NULL_CONTEXT
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
//如果Context还没创建,缓存里也没有当前Context名称对应的EntranceNode,并且缓存数量尚未达到2000
//那么就创建一个EntranceNode,创建EntranceNode时需要加锁,否则会有线程不安全问题
//毕竟需要修改HashMap类型的contextNameNodeMap
//通过加锁 + 缓存 + 写时复制更新缓存,避免并发情况下创建出多个EntranceNode对象
//一个线程对应一个Context对象,多个线程对应多个Context对象
//这些Context对象会使用ThreadLocal进行隔离,但它们的name默认都是sentinel_default_context
//根据下面的代码逻辑:
//多个线程(对应多个Context的name默认都是sentinel_default_context)会共用同一个EntranceNode
//于是可知,多个Context对象会共用一个EntranceNode对象
LOCK.lock();
try {
//从缓存中获取EntranceNode
node = contextNameNodeMap.get(name);
//对node进行Double Check
//如果没获取到EntranceNode
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
//创建EntranceNode,缓存到contextNameNodeMap当中
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
//Add entrance node.
//将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点
Constants.ROOT.addChild(node);
//写时复制,将新创建的EntranceNode添加到缓存中
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
//解锁
LOCK.unlock();
}
}
}
//此处可能会有多个线程同时执行到此处,并发创建多个Context对象
//但这是允许的,因为一个请求对应一个Context,一个请求对应一个线程,所以一个线程本来就需要创建一个Context对象
//初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中
context = new Context(node, name);
context.setOrigin(origin);
//将创建出来的Context对象放入ThreadLocal变量contextHolder中,实现Context对象与当前线程的绑定
contextHolder.set(context);
}
return context;
}
...
}
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
//The load and pick process is not thread-safe,
//but it's okay since the method should be only invoked via CtSph.lookProcessChain() under lock.
public static ProcessorSlotChain newSlotChain() {
//如果存在,则直接返回
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
//Resolve the slot chain builder SPI.
//通过SPI机制初始化SlotChainBuilder
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
//Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
private SlotChainProvider() {
}
}
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
//创建一个DefaultProcessorSlotChain对象实例
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
//通过SPI机制加载责任链的节点ProcessorSlot实现类
//然后按照@Spi注解的order属性进行排序并进行实例化
//最后将ProcessorSlot实例放到sortedSlotList中
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
//遍历已排好序的ProcessorSlot集合
for (ProcessorSlot slot : sortedSlotList) {
//安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
//调用DefaultProcessorSlotChain.addLast()方法构建单向链表
//将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
//返回单向链表
return chain;
}
}
在 DefaultSlotChainBuilder 的 build()方法中,从其初始化 ProcessorSlotChain 的逻辑可知,Entry 的处理链的执行入口就是 DefaultProcessorSlotChain 的 entry()方法。
当一个线程调用 SphU 的 entry()方法创建完与接口相关的 Entry 对象后,就会调用 DefaultProcessorSlotChain 的 entry()方法执行处理链节点的逻辑。因为 NodeSelectorSlot 是 Entry 的处理链 ProcessorSlotChain 的第一个节点,所以接着会调用 NodeSelectorSlot 的 entry()方法。由于处理链中紧接着 NodeSelectorSlot 的下一个节点是 ClusterBuilderSlot,所以执行完 NodeSelectorSlot 的 entry()方法后,会接着执行 ClusterBuilderSlot 的 entry()方法。
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
...
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
//默认情况下会调用处理链的第一个节点NodeSelectorSlot的transformEntry()方法
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
...
}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
...
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
...
}
(2)NodeSelectorSlot 的源码
NodeSelectorSlot 和 ClusterBuilderSlot 会一起构建 Context 的资源调用树,资源调用树的作用其实就是用来统计资源的调用数据。
在一个 Context 对象实例的资源调用树上主要会有如下三类节点:DefaultNode、EntranceNode、ClusterNode,分别对应于:单机里的资源维度、接口维度、集群中的资源维度。
其中 DefaultNode 会统计名字相同的 Context 下的某个资源的调用数据,EntranceNode 会统计名字相同的 Context 下的全部资源的调用数据,ClusterNode 会统计某个资源在全部 Context 下的调用数据。
在执行 NodeSelectorSlot 的 entry()方法时,首先会从缓存(NodeSelectorSlot.map 属性)中获取一个 DefaultNode 对象。如果获取不到,再通过 DCL 机制创建一个 DefaultNode 对象并更新缓存。其中缓存的 key 是 Context 的 name,value 是 DefaultNode 对象。由于默认情况下多个线程对应的 Context 的 name 都相同,所以多个线程访问同一资源时使用的 DefaultNode 对象也一样。
在执行 ClusterBuilderSlot 的 entry()方法时,首先会判断缓存是否为 null,若是则创建一个 ClusterNode 对象,然后再将 ClusterNode 对象设置到 DefaultNode 对象的 clusterNode 属性中。
由 DefaultNode、EntranceNode、ClusterNode 构成的资源调用树:因为 DefaultNode 是和资源 ResourceWrapper 以及 Context 挂钩的,所以 DefaultNode 应该添加到 EntranceNode 中。因为 ClusterNode 和资源挂钩,而不和 Context 挂钩,所以 ClusterNode 应该添加到 DefaultNode 中。
具体的资源调用树构建源码如下:
//This class will try to build the calling traces via:
//adding a new DefaultNode if needed as the last child in the context.
//the context's last node is the current node or the parent node of the context.
//setting itself to the context current node.
//It works as follow:
// ContextUtil.enter("entrance1", "appA");
// Entry nodeA = SphU.entry("nodeA");
// if (nodeA != null) {
// nodeA.exit();
// }
// ContextUtil.exit();
//Above code will generate the following invocation structure in memory:
// machine-root
// /
// /
// EntranceNode1
// /
// /
// DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
//Here the EntranceNode represents "entrance1" given by ContextUtil.enter("entrance1", "appA").
//Both DefaultNode(nodeA) and ClusterNode(nodeA) holds statistics of "nodeA", which is given by SphU.entry("nodeA").
//The ClusterNode is uniquely identified by the ResourceId;
//The DefaultNode is identified by both the resource id and {@link Context}.
//In other words, one resource id will generate multiple DefaultNode for each distinct context,
//but only one ClusterNode.
//the following code shows one resource id in two different context:
// ContextUtil.enter("entrance1", "appA");
// Entry nodeA = SphU.entry("nodeA");
// if (nodeA != null) {
// nodeA.exit();
// }
// ContextUtil.exit();
// ContextUtil.enter("entrance2", "appA");
// nodeA = SphU.entry("nodeA");
// if (nodeA != null) {
// nodeA.exit();
// }
// ContextUtil.exit();
//Above code will generate the following invocation structure in memory:
// machine-root
// / \
// / \
// EntranceNode1 EntranceNode2
// / \
// / \
// DefaultNode(nodeA) DefaultNode(nodeA)
// | |
// +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
//As we can see, two DefaultNode are created for "nodeA" in two context,
//but only one ClusterNode is created.
//We can also check this structure by calling: http://localhost:8719/tree?type=root
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
//DefaultNodes of the same resource in different context.
//缓存map以Context的name为key,DefaultNode为value
//由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
//It's interesting that we use context name rather resource name as the map key.
//Remember that same resource will share the same ProcessorSlotChain globally, no matter in which context.
//Same resource is that ResourceWrapper#equals(Object).
//So if code goes into entry(Context, ResourceWrapper, DefaultNode, int, Object...),
//the resource name must be same but context name may not.
//If we use SphU.entry(String resource)} to enter same resource in different context,
//using context name as map key can distinguish the same resource.
//In this case, multiple DefaultNodes will be created of the same resource name,
//for every distinct context (different context name) each.
//Consider another question. One resource may have multiple DefaultNode,
//so what is the fastest way to get total statistics of the same resource?
//The answer is all DefaultNodes with same resource name share one ClusterNode.
//See ClusterBuilderSlot for detail.
//先从缓存中获取
DefaultNode node = map.get(context.getName());
if (node == null) {
//使用DCL机制,即Double Check + Lock机制
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
//每个线程访问Entry时,都会调用CtSph.entry()方法创建一个ResourceWrapper对象
//下面根据ResourceWrapper创建一个DefaultNode对象
node = new DefaultNode(resourceWrapper, null);
//写时复制更新缓存map
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
//Build invocation tree
//首先会调用context.getLastNode()方法,获取到的是Context.entranceNode属性即一个EntranceNode对象
//EntranceNode对象是在执行ContextUtil.trueEnter()方法创建Context对象实例时添加到Context对象中的
//然后会将刚创建的DefaultNode对象添加到EntranceNode对象的childList列表中
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
//设置Context的curNode属性为当前获取到或新创建的DefaultNode对象
context.setCurNode(node);
//触发执行下一个ProcessorSlot,即ClusterBuilderSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
//This slot maintains resource running statistics (response time, qps, thread count, exception),
//and a list of callers as well which is marked by ContextUtil.enter(String origin).
//One resource has only one cluster node, while one resource can have multiple default nodes.
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
//Remember that same resource will share the same ProcessorSlotChain globally, no matter in which context.
//Same resource is that ResourceWrapper#equals(Object).
//So if code goes into entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...),
//the resource name must be same but context name may not.
//To get total statistics of the same resource in different context,
//same resource shares the same ClusterNode} globally.
//All ClusterNodes are cached in this map.
//The longer the application runs, the more stable this mapping will become.
//so we don't concurrent map but a lock.
//as this lock only happens at the very beginning while concurrent map will hold the lock all the time.
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
if (clusterNode == null) {
//使用DCL机制,即Double Check + Lock机制
synchronized (lock) {
if (clusterNode == null) {
//Create the cluster node.
//创建ClusterNode对象
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
//设置DefaultNode的clusterNode属性为获取到的ClusterNode对象
node.setClusterNode(clusterNode);
//if context origin is set, we should get or create a new {@link Node} of the specific origin.
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
//执行下一个ProcessorSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
...
}
(3)Context 对象中存储的资源调用树总结
其实 Context 对象的属性 entranceNode 就代表了一棵资源调用树。
首先,在调用 ContextUtil 的 trueEnter()方法创建 Context 对象实例时,便会创建一个 EntranceNode 对象并赋值给 Context 的 entranceNode 属性,以及调用 Constants.ROOT 的 addChild()方法,将这个 EntranceNode 对象放入 Constants.ROOT 的 childList 列表中。
然后,执行 NodeSelectorSlot 的 entry()方法时,便会创建一个 DefaultNode 对象。该 DefaultNode 对象会被添加到 Context.entranceNode 的 childList 列表中,也就是前面创建的 EntranceNode 对象的 childList 列表中。
接着,执行 ClusterBuilderSlot 的 entry()方法时,便会创建一个 ClusterNode 对象,该 ClusterNode 对象会赋值给前面 DefaultNode 对象中的 clusterNode 属性。
至此,便构建完 Context 下的资源调用树了。Constants.ROOT 的 childList 里会存放多个 EntranceNode 对象,每个 EntranceNode 对象的 childList 里会存放多个 DefaultNode 对象,而每个 DefaultNode 对象会指向一个 ClusterNode 对象。
//This class holds metadata of current invocation:
//the EntranceNode: the root of the current invocation tree.
//the current Entry: the current invocation point.
//the current Node: the statistics related to the Entry.
//the origin: The origin is useful when we want to control different invoker/consumer separately.
//Usually the origin could be the Service Consumer's app name or origin IP.
//Each SphU.entry() or SphO.entry() should be in a Context,
//if we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used.
//A invocation tree will be created if we invoke SphU.entry() multi times in the same context.
//Same resource in different context will count separately, see NodeSelectorSlot.
public class Context {
//Context name.
private final String name;
//The entrance node of current invocation tree.
private DefaultNode entranceNode;
//Current processing entry.
private Entry curEntry;
//The origin of this context (usually indicate different invokers, e.g. service consumer name or origin IP).
private String origin = "";
...
public Context(DefaultNode entranceNode, String name) {
this(name, entranceNode, false);
}
public Context(String name, DefaultNode entranceNode, boolean async) {
this.name = name;
this.entranceNode = entranceNode;
this.async = async;
}
//Get the parent Node of the current.
public Node getLastNode() {
if (curEntry != null && curEntry.getLastNode() != null) {
return curEntry.getLastNode();
} else {
return entranceNode;
}
}
...
}
public class ContextUtil {
//以Context的name作为key,EntranceNode作为value缓存所有的EntranceNode到HashMap中
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
...
protected static Context trueEnter(String name, String origin) {
...
//从缓存中获取EntranceNode
DefaultNode node = contextNameNodeMap.get(name);
...
//创建EntranceNode,缓存到contextNameNodeMap当中
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
//将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点
Constants.ROOT.addChild(node);
...
//初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中
context = new Context(node, name);
...
}
...
}
public final class Constants {
...
//Global ROOT statistic node that represents the universal parent node.
public final static DefaultNode ROOT = new EntranceNode(
new StringResourceWrapper(ROOT_ID, EntryType.IN),
new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON)
);
...
}
//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {
//The resource associated with the node.
private ResourceWrapper id;
//The list of all child nodes.
private volatile Set<Node> childList = new HashSet<>();
//Associated cluster node.
private ClusterNode clusterNode;
...
//Add child node to current node.
public void addChild(Node node) {
if (node == null) {
RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName());
return;
}
if (!childList.contains(node)) {
synchronized (this) {
if (!childList.contains(node)) {
Set<Node> newSet = new HashSet<>(childList.size() + 1);
newSet.addAll(childList);
newSet.add(node);
childList = newSet;
}
}
RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName());
}
}
//Reset the child node list.
public void removeChildList() {
this.childList = new HashSet<>();
}
...
}
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
//DefaultNodes of the same resource in different context.
//缓存map以Context的name为key,DefaultNode为value
//由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
...
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
...
//先从缓存中获取
DefaultNode node = map.get(context.getName());
...
//下面根据ResourceWrapper创建一个DefaultNode对象
node = new DefaultNode(resourceWrapper, null);
...
//Build invocation tree
//首先会调用context.getLastNode()方法,获取到的是Context.entranceNode属性即一个EntranceNode对象
//EntranceNode对象是在执行ContextUtil.trueEnter()方法创建Context对象实例时添加到Context对象中的
//然后会将刚创建的DefaultNode对象添加到EntranceNode对象的childList列表中
((DefaultNode) context.getLastNode()).addChild(node);
...
//执行下一个ProcessorSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
...
}
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
...
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
...
//创建ClusterNode对象
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
...
//设置DefaultNode的clusterNode属性为获取到的ClusterNode对象
node.setClusterNode(clusterNode);
...
//执行下一个ProcessorSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
...
}
//资源调用树的示例如下所示:
// machine-root
// / \
// / \
// EntranceNode1 EntranceNode2
// / \
// / \
// DefaultNode(nodeA) DefaultNode(nodeA)
// | |
// +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
//其中,machine-root中的childList里会有很多个EntranceNode对象
//EntranceNode对象中的childList里又会有很多个DefaultNode对象
//每个DefaultNode对象下都会指向一个ClusterNode对象
一些对应关系的梳理总结:
一个线程对应一个 ResourceWrapper 对象实例,一个线程对应一个 Context 对象实例。如果 ResourceWrapper 对象相同,则会共用一个 ProcessorSlotChain 实例。如果 ResourceWrapper 对象相同,则也会共用一个 ClusterNode 实例。如果 Context 对象的名字相同,则会共用一个 EntranceNode 对象实例。如果 Context 对象的名字相同,则也会共用一个 DefaultNode 对象实例。
//每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Context
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
//以Context的name作为key,EntranceNode作为value缓存所有的EntranceNode到HashMap中
private static volatile Map<String, EntranceNode> contextNameNodeMap = new HashMap<>();
//Same resource will share the same ProcessorSlotChain}, no matter in which Context.
//Same resource is that ResourceWrapper#equals(Object).
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();
//DefaultNodes of the same resource in different context.
//以Context的name作为key,DefaultNode作为value
//由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
//To get total statistics of the same resource in different context,
//same resource shares the same ClusterNode globally.
//All ClusterNodes are cached in this map.
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
2.LogSlot 和 StatisticSlot 采集资源的数据
(1)LogSlot 的源码
LogSlot 用于记录异常请求日志,以便于故障排查。也就是当出现 BlockException 异常时,调用 EagleEyeLogUtil 的 log()方法将日志写到 sentinel-block.log 文件中。
//A ProcessorSlot that is response for logging block exceptions to provide concrete logs for troubleshooting.
@Spi(order = Constants.ORDER_LOG_SLOT)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args) throws Throwable {
try {
//调用下一个ProcessorSlot
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
//被流控或者熔断降级后打印log日志
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(), context.getOrigin(), e.getRule().getId(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
try {
//调用下一个ProcessorSlot
fireExit(context, resourceWrapper, count, args);
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exit exception", e);
}
}
}
public class EagleEyeLogUtil {
public static final String FILE_NAME = "sentinel-block.log";
private static StatLogger statLogger;
static {
String path = LogBase.getLogBaseDir() + FILE_NAME;
statLogger = EagleEye.statLoggerBuilder("sentinel-block-log")
.intervalSeconds(1)
.entryDelimiter('|')
.keyDelimiter(',')
.valueDelimiter(',')
.maxEntryCount(6000)
.configLogFilePath(path)
.maxFileSizeMB(300)
.maxBackupIndex(3)
.buildSingleton();
}
public static void log(String resource, String exceptionName, String ruleLimitApp, String origin, Long ruleId, int count) {
String ruleIdString = StringUtil.EMPTY;
if (ruleId != null) {
ruleIdString = String.valueOf(ruleId);
}
statLogger.stat(resource, exceptionName, ruleLimitApp, origin, ruleIdString).count(count);
}
}
(2)StatisticSlot 的源码
StatisticSlot 用于统计资源的调用数据,如请求成功数、请求失败数、响应时间等。
注意:开始对请求进行规则验证时,需要调用 SphU 的 entry()方法。完成对请求的规则验证后,也需要调用 Entry 的 exit()方法。
//A processor slot that dedicates to real time statistics.
//When entering this slot, we need to separately count the following information:
//ClusterNode: total statistics of a cluster node of the resource ID.
//Origin node: statistics of a cluster node from different callers/origins.
//DefaultNode: statistics for specific resource name in the specific context.
//Finally, the sum statistics of all entrances.
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
try {
//Do some checking.
//执行下一个ProcessorSlot,先进行规则验证等
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//Request passed, add thread count and pass count.
//如果通过了后面ProcessorSlot的验证
//则将处理当前资源resourceWrapper的线程数 + 1 以及 将对当前资源resourceWrapper的成功请求数 + 1
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
//Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
//Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
//Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
//Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
//Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
//Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {//捕获BlockException
//Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
//Add block count.
//如果规则验证失败,则将BlockQps+1
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
//Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
//Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
//Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
//开始对请求进行规则验证时,需要调用SphU.entry()方法
//完成对请求的规则验证后,也需要调用Entry.exit()方法
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
Node node = context.getCurNode();
if (context.getCurEntry().getBlockError() == null) {
//Calculate response time (use completeStatTime as the time of completion).
//获取系统当前时间
long completeStatTime = TimeUtil.currentTimeMillis();
context.getCurEntry().setCompleteTimestamp(completeStatTime);
//计算响应时间 = 系统当前事件 - 根据资源resourceWrapper创建Entry资源访问对象时的时间
long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();
Throwable error = context.getCurEntry().getError();
//Record response time and success count.
//记录响应时间等信息
recordCompleteFor(node, count, rt, error);
recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
if (resourceWrapper.getEntryType() == EntryType.IN) {
recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
}
}
//Handle exit event with registered exit callback handlers.
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
fireExit(context, resourceWrapper, count, args);
}
private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
if (node == null) {
return;
}
node.addRtAndSuccess(rt, batchCount);
node.decreaseThreadNum();
if (error != null && !(error instanceof BlockException)) {
node.increaseExceptionQps(batchCount);
}
}
}
(3)记录资源在不同维度下的调用数据
一.如何统计单机里某个资源的调用数据
由于 DefaultNode 会统计名字相同的 Context 下的某个资源的调用数据,它是按照单机里的资源维度进行调用数据统计的,所以在 StatisticSlot 的 entry()方法中,会调用 DefaultNode 的方法来进行统计。
//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {
//The resource associated with the node.
private ResourceWrapper id;
//Associated cluster node.
private ClusterNode clusterNode;
...
@Override
public void increaseThreadNum() {
super.increaseThreadNum();
this.clusterNode.increaseThreadNum();
}
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
@Override
public void increaseBlockQps(int count) {
super.increaseBlockQps(count);
this.clusterNode.increaseBlockQps(count);
}
@Override
public void addRtAndSuccess(long rt, int successCount) {
super.addRtAndSuccess(rt, successCount);
this.clusterNode.addRtAndSuccess(rt, successCount);
}
@Override
public void decreaseThreadNum() {
super.decreaseThreadNum();
this.clusterNode.decreaseThreadNum();
}
...
}
public class StatisticNode implements Node {
//The counter for thread count.
private LongAdder curThreadNum = new LongAdder();
//Holds statistics of the recent INTERVAL milliseconds.
//The INTERVAL is divided into time spans by given sampleCount.
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
//Holds statistics of the recent 60 seconds.
//The windowLengthInMs is deliberately set to 1000 milliseconds,
//meaning each bucket per second, in this way we can get accurate statistics of each second.
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
...
@Override
public void increaseThreadNum() {
curThreadNum.increment();
}
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
@Override
public void increaseBlockQps(int count) {
rollingCounterInSecond.addBlock(count);
rollingCounterInMinute.addBlock(count);
}
@Override
public void addRtAndSuccess(long rt, int successCount) {
rollingCounterInSecond.addSuccess(successCount);
rollingCounterInSecond.addRT(rt);
rollingCounterInMinute.addSuccess(successCount);
rollingCounterInMinute.addRT(rt);
}
@Override
public void decreaseThreadNum() {
curThreadNum.decrement();
}
...
}
二.如何统计所有资源的调用数据即接口调用数据
由于 EntranceNode 会统计名字相同的 Context 下的全部资源的调用数据,它是按接口维度来统计调用数据的,即统计接口下所有资源的调用情况,所以可以通过遍历 EntranceNode 的 childList 来统计接口的调用数据。
//A Node represents the entrance of the invocation tree.
//One Context will related to a EntranceNode,
//which represents the entrance of the invocation tree.
//New EntranceNode will be created if current context does't have one.
//Note that same context name will share same EntranceNode globally.
public class EntranceNode extends DefaultNode {
public EntranceNode(ResourceWrapper id, ClusterNode clusterNode) {
super(id, clusterNode);
}
@Override
public double avgRt() {
double total = 0;
double totalQps = 0;
for (Node node : getChildList()) {
total += node.avgRt() * node.passQps();
totalQps += node.passQps();
}
return total / (totalQps == 0 ? 1 : totalQps);
}
@Override
public double blockQps() {
double blockQps = 0;
for (Node node : getChildList()) {
blockQps += node.blockQps();
}
return blockQps;
}
@Override
public long blockRequest() {
long r = 0;
for (Node node : getChildList()) {
r += node.blockRequest();
}
return r;
}
@Override
public int curThreadNum() {
int r = 0;
for (Node node : getChildList()) {
r += node.curThreadNum();
}
return r;
}
@Override
public double totalQps() {
double r = 0;
for (Node node : getChildList()) {
r += node.totalQps();
}
return r;
}
@Override
public double successQps() {
double r = 0;
for (Node node : getChildList()) {
r += node.successQps();
}
return r;
}
@Override
public double passQps() {
double r = 0;
for (Node node : getChildList()) {
r += node.passQps();
}
return r;
}
@Override
public long totalRequest() {
long r = 0;
for (Node node : getChildList()) {
r += node.totalRequest();
}
return r;
}
@Override
public long totalPass() {
long r = 0;
for (Node node : getChildList()) {
r += node.totalPass();
}
return r;
}
}
三.如何统计集群中某个资源的调用数据
由于 ClusterNode 会统计某个资源在全部 Context 下的调用数据,它是按照集群中的资源维度进行调用数据统计的,而 StatisticSlot 的 entry()调用 DefaultNode 的方法统计单机下的资源时,会顺便调用 ClusterNode 的方法来统计集群下的资源调用,所以通过 ClusterNode 就可以获取集群中某个资源的调用数据。
//A Node used to hold statistics for specific resource name in the specific context.
//Each distinct resource in each distinct Context will corresponding to a DefaultNode.
//This class may have a list of sub DefaultNodes.
//Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context.
public class DefaultNode extends StatisticNode {
//The resource associated with the node.
private ResourceWrapper id;
//Associated cluster node.
private ClusterNode clusterNode;
...
@Override
public void increaseThreadNum() {
super.increaseThreadNum();
this.clusterNode.increaseThreadNum();
}
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
@Override
public void increaseBlockQps(int count) {
super.increaseBlockQps(count);
this.clusterNode.increaseBlockQps(count);
}
@Override
public void addRtAndSuccess(long rt, int successCount) {
super.addRtAndSuccess(rt, successCount);
this.clusterNode.addRtAndSuccess(rt, successCount);
}
@Override
public void decreaseThreadNum() {
super.decreaseThreadNum();
this.clusterNode.decreaseThreadNum();
}
...
}
文章转载自:东阳马生架构

电子尖叫食人鱼
还未添加个人签名 2025-04-01 加入
还未添加个人简介
评论