写点什么

一次「找回」TraceId 的问题分析与过程思考

  • 2023-04-21
    湖南
  • 本文字数:14144 字

    阅读完需:约 46 分钟

用好中间件是每一个开发人员的基本功,一个专业的开发人员,追求的不仅是中间件的日常使用,还要探究这背后的设计初衷和底层逻辑,进而保证我们的系统运行更加稳定,让开发工作更加高效。


结合这一主题,本文从一次线上告警问题出发,通过第一时间定位问题的根本原因,进而引出 Google Dapper 与 MTrace(美团内部自研)这类分布式链路追踪系统的设计思想和实现途径,再回到问题本质深入 @Async 的源码分析底层的异步逻辑和实现特点,并给出 MTrace 跨线程传递失效的原因和解决方案,最后梳理目前主流的分布式跟踪系统的现状,并结合开发人员日常使用中间件的场景提出一些思考和总结。

一、问题背景和思考

1.1 问题背景

在一次排查线上告警的过程中,突然发现一个链路信息有点不同寻常(这里仅展示测试复现的内容):

在机器中可以清楚的发现“2022-08-02 19:26:34.952 DXMsgRemoteService ”这一行日志信息并没有携带 TraceId,导致调用链路信息戛然而止,无法追踪当时的调用情况。

1.2 问题复现和思考

在处理完线上告警后,我们开始分析“丢失”的 TraceId 到底去了哪里?首先在代码中定位 TraceId 没有追踪到的部分,发现问题出现在一个 @Async 注解下的方法,删除无关的业务信息代码,并增加 MTrace 埋点方法后的复现代码如下:

@SpringBootTest@RunWith(SpringRunner.class)@EnableAsyncpublic class DemoServiceTest extends TestCase {	@Resource		private DemoService demoService;	@Test		public void testTestAsy() {		Tracer.serverRecv("test");		String mainThreadName = Thread.currentThread().getName();		long mainThreadId = Thread.currentThread().getId();		System.out.println("------We got main thread: "+ mainThreadName + " - " +  mainThreadId + "  Trace Id: " + Tracer.id() + "----------");		demoService.testAsy();	}}@Componentpublic class DemoService {	@Async		public void testAsy(){		String asyThreadName = Thread.currentThread().getName();		long asyThreadId = Thread.currentThread().getId();		System.out.println("======Async====");		System.out.println("------We got asy thread: "+ asyThreadName + " - " +  asyThreadId + "  Trace Id: " + Tracer.id() + "----------");	}}
复制代码

运行这段代码后,我们看看控制台实际的输出结果:

------We got main thread: main - 1  Trace Id: -5292097998940230785----------======Async====------We got asy thread: SimpleAsyncTaskExecutor-1 - 630  Trace Id: null----------
复制代码

至此我们可以发现 TraceId 是在 @Async 异步传递的过程中发生丢失现象,明白了造成这一现象的原因后,我们开始思考:

  • MTrace(美团内部自研的分布式链路追踪系统)这类分布式链路追踪系统是如何设计的?

  • @Async 异步方法是如何实现的?

  • InheritableThreadLocal、TransmittableThreadLocal 和 TransmissibleThreadLocal 有什么区别?

  • 为什么 MTrace 的跨线程传递方案“失效”了?

  • 如何解决 @Async 场景下“弄丢”TraceId 的问题?

  • 目前有哪些分布式链路追踪系统?它们又是如何解决跨线程传递问题的?

二、深度分析

2.1 MTrace 与 Google Dapper

MTrace 是美团参考 Google Dapper 对服务间调用链信息收集和整理的分布式链路追踪系统,目的是帮助开发人员分析系统各项性能和快速排查告警问题。


要想了解 MTrace 是如何设计分布式链路追踪系统的,首先看看 Google Dapper 是如何在大型分布式环境下实现分布式链路追踪。我们先来看看下图一个完整的分布式请求:

用户发送一个请求到前端 A,然后请求分发到两个不同的中间层服务 B 和 C,服务 B 在处理完请求后将结果返回,同时服务 C 需要继续调用后端服务 D 和 E 再将处理后的请求结果进行返回,最后由前端 A 汇总来响应用户的这次请求。


回顾这次完整的请求我们不难发现,要想直观可靠的追踪多项服务的分布式请求,我们最关注的是每组客户端和服务端之间的请求响应以及响应耗时,因此,Google Dapper 采取对每一个请求和响应设置标识符和时间戳的方式实现链路追踪,基于这一设计思想的基本追踪树模型如下图所示:

追踪树模型由 span 组成,其中每个 span 包含 span name、span id、parent id 和 trace id,进一步分析跟踪树模型中各个 span 之间的调用关系可以发现,其中没有 parent id 且 span id 为 1 代表根服务调用,span id 越小代表服务在调用链的过程中离根服务就越近,将模型中各个相对独立的 span 联系在一起就构成了一次完整的链路调用记录,我们再继续深入看看 span 内部的细节信息:

除了最基本的 span name、span id 和 parent id 之外,Annotations 扮演着重要的角色,Annotations 包括<Strat>、Client Send、Server Recv、Server Send、Client Recv 和<End>这些注解,记录了 RPC 请求中 Client 发送请求到 Server 的处理响应时间戳信息,其中 foo 注解代表可以自定义的业务数据,这些也会一并记录到 span 中,提供给开发人员记录业务信息;在这当中有 64 位整数构成的 trace id 作为全局的唯一标识存储在 span 中。


至此我们已经了解到,Google Dapper 主要是在每个请求中配置 span 信息来实现对分布式系统的追踪,那么又是用什么方式在分布式请求中植入这些追踪信息呢?


为满足低损耗、应用透明和大范围部署的设计目标,Google Dapper 支持应用开发者依赖于少量通用组件库,实现几乎零投入的成本对分布式链路进行追踪,当一个服务线程在链路中调用其他服务之前,会在 ThreadLocal 中保存本次跟踪的上下文信息,主要包括一些轻量级且易复制的信息(类似 spand id 和 trace id),当服务线程收到响应之后,应用开发者可以通过回调函数进行服务信息日志打印。


MTrace 是美团参考 Google Dapper 的设计思路并结合自身业务进行了改进和完善后的自研产品,具体的实现流程这里就不再赘述了,我们重点看看 MTrace 做了哪些改进:

  • 在美团的各个中间件中埋点,来采集发生调用的调用时长和调用结果等信息,埋点的上下文主要包括传递信息、调用信息、机器相关信息和自定义信息,各个调用链路之间有一个全局且唯一的变量 TraceId 来记录一次完整的调用情况和追踪数据。

  • 在网络间的数据传递中,MTrace 主要传递使用 UUID 异或生成的 TraceId 和表示层级和前后关系的 SpanId,支持批量压缩上报、TraceId 做聚合和 SpanId 构建形态。

  • 目前,产品已经覆盖到 RPC 服务、HTTP 服务、MySQL、Cache 缓存和 MQ,基本实现了全覆盖。

  • MTrace 支持跨线程传递和代理来优化埋点方式,减轻开发人员的使用成本。

2.2 @Async 的异步过程追溯

从 Spring3 开始提供了 @Async 注解,该注解的使用需要注意以下几点:

  1. 需要在配置类上增加 @EnableAsync 注解;

  2. @Async 注解可以标记一个异步执行的方法,也可以用来标记一个类表明该类的所有方法都是异步执行;

  3. 可以在 @Async 中自定义执行器。


我们以 @EnableAsync 为入口开始分析异步过程,除了基本的配置方法外,我们重点关注下配置类 AsyncConfigurationSelector 的内部逻辑,由于默认条件下我们使用 JDK 接口代理,这里重点看看 ProxyAsyncConfiguration 类的代码逻辑:

@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)		@Role(BeanDefinition.ROLE_INFRASTRUCTURE)		public AsyncAnnotationBeanPostProcessor asyncAdvisor() {		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");		//新建一个异步注解bean后置处理器		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();		//如果@EnableAsync注解中有自定义annotation配置则进行设置		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {			bpp.setAsyncAnnotationType(customAsyncAnnotation);		}		if (this.executor != null) {			//设置线程处理器			bpp.setExecutor(this.executor);		}		if (this.exceptionHandler != null) {			//设置异常处理器			bpp.setExceptionHandler(this.exceptionHandler);		}		//设置是否需要创建CGLIB子类代理,默认为false		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));		//设置异步注解bean处理器应该遵循的执行顺序,默认最低的优先级		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));		return bpp;	}}
复制代码

ProxyAsyncConfiguration 继承了父类 AbstractAsyncConfiguration 的方法,重点定义了一个 AsyncAnnotationBeanPostProcessor 的异步注解 bean 后置处理器。看到这里我们可以知道,@Async 主要是通过后置处理器生成一个代理对象来实现异步的执行逻辑,接下来我们重点关注 AsyncAnnotationBeanPostProcessor 是如何实现异步的:

从类图中我们可以直观地看到 AsyncAnnotationBeanPostProcessor 同时实现了 BeanFactoryAware 的接口,因此我们进入 setBeanFactory()方法,可以看到对 AsyncAnnotationAdvisor 异步注解切面进行了构造,再接着进入 AsyncAnnotationAdvisor 的 buildAdvice()方法中可以看 AsyncExecutionInterceptor 类,再看类图发现 AsyncExecutionInterceptor 实现了 MethodInterceptor 接口,而 MethodInterceptor 是 AOP 中切入点的处理器,对于 interceptor 类型的对象,处理器中最终被调用的是 invoke 方法,所以我们重点看看 invoke 的代码逻辑:

public Object invoke(final MethodInvocation invocation) throws Throwable {	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);	Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);	final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);  //首先获取到一个线程池	AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);	if (executor == null) {		throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");	}  //封装Callable对象到线程池执行	Callable<Object> task = () -> {		try {			Object result = invocation.proceed();			if (result instanceof Future) {				return ((Future<?>) result).get();			}		}		catch (ExecutionException ex) {			handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());		}		catch (Throwable ex) {			handleError(ex, userDeclaredMethod, invocation.getArguments());		}		return null;	};  //任务提交到线程池	return doSubmit(task, executor, invocation.getMethod().getReturnType());}
复制代码

我们再接着看看 @Async 用了什么线程池,重点关注 determineAsyncExecutor 方法中 getExecutorQualifier 指定获取的默认线程池是哪一个:

@Override@Nullableprotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {	Executor defaultExecutor = super.getDefaultExecutor(beanFactory);   	return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); //其中默认线程池是SimpleAsyncTaskExecutor}
复制代码

至此,我们了解到在未指定线程池的情况下调用被标记为 @Async 的方法时,Spring 会自动创建 SimpleAsyncTaskExecutor 线程池来执行该方法,从而完成异步执行过程。

2.3. “丢失”TraceId 的原因

回顾我们之前对 MTrace 的学习和了解,TraceId 等信息是在 ThreadLocal 中进行传递和保存,那么当异步方法切换线程的时候,就会出现下图中上下文信息传递丢失的问题:

下面我们探究一下 ThreadLocal 有哪些跨线程传递方案?MTrace 又提供哪些跨线程传递方案?SimpleAsyncTaskExecutor 又有什么不一样?逐步找到“丢失”TraceId 的原因。

2.3.1 InheritableThreadLocal、TransmittableThreadLocal 和 TransmissibleThreadLocal

在前面的分析中,我们发现跨线程场景下上下文信息是保存在 ThreadLocal 中发生丢失,那么我们接下来看看 ThreadLocal 的特点及其延伸出来的类,是否可以解决这一问题:

  • ThreadLocal 主要是为每个 ThreadLocal 对象创建一个 ThreadLocalMap 来保存对象和线程中的值的映射关系。当创建一个 ThreadLocal 对象时会调用 get()或 set()方法,在当前线程的中查找这个 ThreadLocal 对象对应的 Entry 对象,如果存在,就获取或设置 Entry 中的值;否则,在 ThreadLocalMap 中创建一个新的 Entry 对象。ThreadLocal 类的实例被多个线程共享,每个线程都拥有自己的 ThreadLocalMap 对象,存储着自己线程中的所有 ThreadLocal 对象的键值对。ThreadLocal 的实现比较简单,但需要注意的是,如果使用不当,可能会出现内存泄漏问题,因为 ThreadLocalMap 中的 Entry 对象并不会自动删除。

  • InheritableThreadLocal 的实现方式和 ThreadLocal 类似,但不同之处在于,当一个线程创建子线程时会调用 init()方法:

private void init(ThreadGroup g, Runnable target, String name,long stackSize, AccessControlContext acc,Boolean inheritThreadLocals) {	if (inheritThreadLocals && parent.inheritableThreadLocals != null)  //拷贝父线程的变量	this.inheritableThreadLocals =ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);		this.stackSize = stackSize;	tid = nextThreadID();}
复制代码

这意味着子线程可以访问父线程中的 InheritableThreadLocal 实例,而且在子线程中调用 set()方法时,会在子线程自己的 inheritableThreadLocals 字段中创建一个新的 Entry 对象,而不会影响父线程中的 Entry 对象。同时,根据源码我们也可以看到 Thread 的 init()方法是在线程构造方法中拷贝的,在线程复用的线程池中是没有办法使用的。


  • TransmittableThreadLocal 是阿里巴巴提供的解决跨线程传递上下文的 InheritableThreadLocal 子类,引入了 holder 来保存需要在线程间进行传递的变量,大致流程我们可以参考下面给出的时序图分析:

步骤可以总结为:

  1. 装饰 Runnable,将主线程的 TTL 传入到 TtlRunnable 的构造方法中;

  2. 将子线程的 TTL 的值进行备份,将主线程的 TTL 设置到子线程中(value 是对象引用,可能存在线程安全问题);

  3. 执行子线程逻辑;

  4. 删除子线程新增的 TTL,将备份还原重新设置到子线程的 TTL 中,从而保证了 ThreadLocal 的值在多线程环境下的传递性。


TransmittableThreadLocal 虽然解决了 InheritableThreadLocal 的继承问题,但是由于需要在序列化和反序列化时对 ThreadLocalMap 进行处理,会增加对象创建和序列化的成本,并且需要支持的序列化框架较少,不够灵活。

  • TransmissibleThreadLocal 是继承了 InheritableThreadLocal 类并重写了 get()、set()和 remove()方法,TransmissibleThreadLocal 的实现方式和 TransmittableThreadLocal 类似,主要的执行逻辑在 Transmitter 的 capture()方法复制 holder 中的变量,replay()方法过滤非父线程的 holder 的变量,restore()来恢复经过 replay()过滤后 holder 的变量:

public class TransmissibleThreadLocal<T> extends InheritableThreadLocal<T> {	public static class Transmitter {		public static Object capture() {			Map<TransmissibleThreadLocal<?>, Object> captured = new HashMap<TransmissibleThreadLocal<?>, Object>();      //获取所有存储在holder中的变量			for (TransmissibleThreadLocal<?> threadLocal : holder.get().keySet()) { 				captured.put(threadLocal, threadLocal.copyValue());			}			return captured;		}		public static Object replay(Object captured) {			@SuppressWarnings("unchecked")			Map<TransmissibleThreadLocal<?>, Object> capturedMap = (Map<TransmissibleThreadLocal<?>, Object>) captured;			Map<TransmissibleThreadLocal<?>, Object> backup = new HashMap<TransmissibleThreadLocal<?>, Object>();			for (Iterator<? extends Map.Entry<TransmissibleThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();iterator.hasNext(); ) {				Map.Entry<TransmissibleThreadLocal<?>, ?> next = iterator.next();				TransmissibleThreadLocal<?> threadLocal = next.getKey();				// backup				backup.put(threadLocal, threadLocal.get());				// clear the TTL value only in captured				// avoid extra TTL value in captured, when run task.        //过滤非传递的变量				if (!capturedMap.containsKey(threadLocal)) { 					iterator.remove();					threadLocal.superRemove();				}			}			// set value to captured TTL			for (Map.Entry<TransmissibleThreadLocal<?>, Object> entry : capturedMap.entrySet()) {				@SuppressWarnings("unchecked")				TransmissibleThreadLocal<Object> threadLocal = (TransmissibleThreadLocal<Object>) entry.getKey();				threadLocal.set(entry.getValue());			}			// call beforeExecute callback			doExecuteCallback(true);			return backup;		}		public static void restore(Object backup) {			@SuppressWarnings("unchecked")			Map<TransmissibleThreadLocal<?>, Object> backupMap = (Map<TransmissibleThreadLocal<?>, Object>) backup;			// call afterExecute callback			doExecuteCallback(false);			for (Iterator<? extends Map.Entry<TransmissibleThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();						                 iterator.hasNext(); ) {				Map.Entry<TransmissibleThreadLocal<?>, ?> next = iterator.next();				TransmissibleThreadLocal<?> threadLocal = next.getKey();				// clear the TTL value only in backup				// avoid the extra value of backup after restore				if (!backupMap.containsKey(threadLocal)) { 					iterator.remove();					threadLocal.superRemove();				}			}			// restore TTL value			for (Map.Entry<TransmissibleThreadLocal<?>, Object> entry : backupMap.entrySet()) {				@SuppressWarnings("unchecked")				TransmissibleThreadLocal<Object> threadLocal = (TransmissibleThreadLocal<Object>) entry.getKey();				threadLocal.set(entry.getValue());			}		}	}}
复制代码

TransmissibleThreadLocal 不但可以解决跨线程的传递问题,还能保证子线程和主线程之间的隔离,但是目前跨线程拷贝 span 数据时,采用浅拷贝有丢失数据的风险。最后,我们可以根据下表综合对比:

考虑到 TransmittableThreadLocal 并非标准的 Java API,而是第三方库提供的,存在与其它库的兼容性问题,无形中增加了代码的复杂性和使用难度。因此,MTrace 选择自定义实现的 TransmissibleThreadLocal 类可以方便地在跨线程和跨服务的情况下传递追踪信息,透明自动完成所有异步执行上下文的可定制、规范化的捕捉传递,使得整个跟踪信息更加完整和准确。

2.3.2 Mtrace 的跨线程传递方案

这一问题 MTrace 其实已经提供解决方案,主要的设计思路是在子线程初始化 Runnable 对象的时候首先会去父线程的 ThreadLocal 中拿到保存的 trace 信息,然后作为参数传递给子线程,子线程在初始化的时候设置 trace 信息来避免丢失。下面我们看看具体实现。


父线程新建任务时捕捉所有 TransmissibleThreadLocal 中的变量信息,如下图所示:

子线程执行任务时复制父线程捕捉的 TransmissibleThreadLocal 变量信息,并返回备份的 TransmissibleThreadLocal 变量信息,如下图所示:

在子线程执行完业务流程后会恢复之前备份的 TransmissibleThreadLocal 变量信息,如下图所示:

这种方案可以解决跨线程传递上下文丢失的问题,但是需要代码层面的开发会增加开发人员的工作量,对于一个分布式追踪系统而言并不是最优解:

TraceRunnable command = new TraceRunnable(runnable);newThread(command).start();executorService.execute(command);
复制代码

因此,MTrace 同时提供无侵入方式的 javaagent&instrument 技术,可以简单理解成一个类加载时的 AOP 功能,只要在 JVM 参数添加 javaagent 的配置,不需要修饰 Runnable 或是线程池的代码,就可以在启动时增强完成跨线程传递问题。


回归到本次的问题中来,目前使用的 MDP 本身就已经集成了 MTrace-agent 的模式,但是为什么还是会“弄丢”TraceId 呢?查看 MTrace 的 ThreadPoolTransformer 类和 ForkJoinPoolTransformer 类我们可以知道,MTrace 修改了 ThreadPoolExecutor 类、ScheduledThreadPoolExecutor 类和 ForkJoinTask 类的字节码,顺着这个思路我们再看看 @Async 用到的 SimpleAsyncTaskExecutor 线程池是怎么一回事。

2.3.3 SimpleAsyncTaskExecutor 是怎么一回事

我们先深入 SimpleAsyncTaskExecutor 的代码中,看看执行逻辑:

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable {	private ThreadFactory threadFactory;	public void execute(Runnable task, long startTimeout) {		Assert.notNull(task, "Runnable must not be null");    //isThrottleActive是否开启限流(默认concurrencyLimit=-1,不开启限流)		if(this.isThrottleActive() && startTimeout > 0L) {					this.concurrencyThrottle.beforeAccess();			this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task));			this.concurrencyThrottle.beforeAccess();			this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task));			this.concurrencyThrottle.beforeAccess();			this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task));		} else {			this.doExecute(task);		}	}	protected void doExecute(Runnable task) {    //没有线程工厂的话默认创建线程		Thread thread = this.threadFactory != null?this.threadFactory.newThread(task):this.createThread(task);				thread.start();	}	public Thread createThread(Runnable runnable) {    //和线程池不同,每次都是创建新的线程		Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());		thread.setPriority(getThreadPriority());		thread.setDaemon(isDaemon());		return thread;	}}
复制代码

看到这里我们可以得出以下几个特性:

  • SimpleAsyncTaskExecutor 每次执行提交给它的任务时,会启动新的线程,并不是严格意义上的线程池,达不到线程复用的功能。

  • 允许开发者控制并发线程的上限(concurrencyLimit)起到一定的资源节流作用,但默认 concurrencyLimit 取值为-1,即不启用资源节流,有引发内存泄漏的风险。

  • 阿里技术编码规约要求用 ThreadPoolExecutor 的方式来创建线程池,规避资源耗尽的风险。


结合之前说过的 MTrace 线程池代理模型,我们继续再来看看 SimpleAsyncTaskExecutor 的类图:

可以发现,其继承了 spring 的 TaskExecutor 接口,其实质是 java.util.concurrent.Executor,结合我们这次“丢失”的 TraceId 问题来看,我们已经找到了 Mtrace 的跨线程传递方案“失效”的原因:虽然 MTrace 已经通过 javaagent&instrument 技术可以完成 Trace 信息跨线程传递,但是目前只覆盖到 ThreadPoolExecutor 类、ScheduledThreadPoolExecutor 类和 ForkJoinTask 类的字节码,而 @Async 在未指定线程池的情况下默认会启用 SimpleAsyncTaskExecutor,其本质是 java.util.concurrent.Executor 没有被覆盖到,就会造成 ThreadLocal 中的 get 方法获取信息为空,导致最终 TraceId 传递丢失。

三、解决方案

实际上 @Async 支持我们使用自定义的线程池,可以手动自定义 Configuration 来配置 ThreadPoolExecutor 线程池,然后在注解里面指定 bean 的名称,就可以切换到对应的线程池去,可以看看下面的代码:

@Configurationpublic class ThreadPoolConfig {	@Bean("taskExecutor")	    public Executor taskExecutor() {		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();		//设置线程池参数信息		taskExecutor.setCorePoolSize(10);		taskExecutor.setMaxPoolSize(50);		taskExecutor.setQueueCapacity(200);		taskExecutor.setKeepAliveSeconds(60);		taskExecutor.setThreadNamePrefix("myExecutor--");		taskExecutor.setWaitForTasksToCompleteOnShutdown(true);		taskExecutor.setAwaitTerminationSeconds(60);		taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());		taskExecutor.initialize();		return taskExecutor;	}}
复制代码

然后在注解中标注这个线程池:

@SpringBootTest@RunWith(SpringRunner.class)@EnableAsyncpublic class DemoServiceTest extends TestCase {	@Resource	  private DemoService demoService;	@Test	  public void testTestAsy() {		Tracer.serverRecv("test");		String mainThreadName = Thread.currentThread().getName();		long mainThreadId = Thread.currentThread().getId();		System.out.println("------We got main thread: "+ mainThreadName + " - " +  mainThreadId + "  Trace Id: " + Tracer.id() + "----------");		demoService.testAsy();	}}@Componentpublic class DemoService {	@Async("taskExecutor")	  public void testAsy(){		String asyThreadName = Thread.currentThread().getName();		long asyThreadId = Thread.currentThread().getId();		System.out.println("======Async====");		System.out.println("------We got asy thread: "+ asyThreadName + " - " +  asyThreadId + "  Trace Id: " + Tracer.id() + "----------");	}}
复制代码

看看输出台的打印:

------We got main thread: main - 1  Trace Id: -3495543588231940494----------======Async====------We got asy thread: SimpleAsyncTaskExecutor-1 - 658  Trace Id: 3495543588231940494----------
复制代码

最终,我们可以通过这一方式“找回”在 @Async 注解下跨线程传递而“丢失”的 TraceId。

四、其他方案对比

分布式追踪系统从诞生之际到有实质性的突破,很大程度受到 Google Dapper 的影响,目前常见的分布式追踪系统有 Twitter 的 Zipkin、SkyWalking、阿里的 EagleEye、PinPoint 和美团的 MTrace 等,这些大多都是基于 Google Dapper 的设计思想,考虑到设计思路和架构特点,我们重点介绍 Zipkin、SkyWalking 和 EagleEye 的基本框架和跨线程解决方案(以下内容主要来源官网及作者总结,仅供参考,不构成技术建议)。

4.1 Zipkin

Zipkin 是由 Twitter 公司贡献开发的一款开源的分布式追踪系统,官方提供有基于 Finagle 框架(Scala 语言)的接口,而其他框架的接口由社区贡献,目前可以支持 Java、Python、Ruby 和 C#等主流开发语言和框架,其主要功能是聚集来自各个异构系统的实时监控数据。主要由 4 个核心组件构成,如下图所示:

  • Collector:收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为 Zipkin 内部处理的 Span 格式,以支持后续的存储、分析、展示等功能。

  • Storage:存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储起来,同时支持修改存储策略。

  • API:API 组件,它主要用来提供外部访问接口,比如给客户端展示跟踪信息,或是外接系统访问以实现监控等。

  • UI:UI 组件,基于 API 组件实现的上层应用,通过 UI 组件用户可以方便而有直观地查询和分析跟踪信息。


当用户发起一次调用的时候,Zipkin 的客户端会在入口处先记录这次请求相关的 trace 信息,然后在调用链路上传递 trace 信息并执行实际的业务流程,为防止追踪系统发送延迟与发送失败导致用户系统的延迟与中断,采用异步的方式发送 trace 信息给 Zipkin Collector,Zipkin Server 在收到 trace 信息后,将其存储起来。随后 Zipkin 的 Web UI 会通过 API 访问的方式从存储中将 trace 信息提取出来分析并展示。

最后,我们看看 Zipkin 的跨线程传递方案的优缺点:在单个线程的调用中 Zipkin 通过定义一个 ThreadLocal<TraceContext> local 来完成在整个线程执行过程中获取相同的 Trace 值,但是当新起一个线程的时候 ThreadLocal 就会失效,对于这种场景,Zipkin 对于不提交线程池的场景提供 InheritableThreadLocal<TraceContext>来解决父子线程 trace 信息传递丢失的问题。


而对于 @Async 的使用场景,Zipkin 提供 CurrentTraceContext 类首先获取父线程的 trace 信息,然后将 trace 信息复制到子线程来,其基本思路和上文 MTrace 的一致,但是需要代码开发,具有较强的侵入性。

4.2 SkyWalking

SkyWalking 是 Apache 基金会下面的一个开源的应用程序性能监控系统,提供了一种简便的方式来清晰地观测云原生和基于容器的分布式系统。具有支持多种语言探针;微内核+插件的架构;存储、集群管理和使用插件集合都可以自由选择;支持告警;优秀的可视化效果的特点。其主要由 4 个核心组件构成,如下图所示:

  • 探针:基于不同的来源可能是不一样的,但作用都是收集数据,将数据格式化为 SkyWalking 适用的格式。

  • 平台后端:支持数据聚合,数据分析以及驱动数据流从探针到用户界面的流程。分析包括 Skywalking 原生追踪和性能指标以及第三方来源,包括 Istio、Envoy telemetry、Zipkin 追踪格式化等。

  • 存储:通过开放的插件化的接口存放 SkyWalking 数据。用户可以选择一个既有的存储系统,如 ElasticSearch、H2 或 MySQL 集群(Sharding-Sphere 管理),也可以指定选择实现一个存储系统。

  • UI :一个基于接口高度定制化的 Web 系统,用户可以可视化查看和管理 SkyWalking 数据。


SkyWalking 的工作原理和 Zipkin 类似,但是相比较于 Zipkin 接入系统的方式,SkyWalking 使用了插件化+javaagent 的形式来实现:通过虚拟机提供的用于修改代码的接口来动态加入打点的代码,如通过 javaagent premain 来修改 Java 类,在系统运行时操作代码,让用户可以在不需要修改代码的情况下进行链路追踪,对业务的代码无侵入性,同时使用字节码操作技术(Byte-Buddy)和 AOP 概念来实现拦截追踪上下文的 trace 信息,这样一来每个用户只需要根据自己的需用定义拦截点,就可以实现对一些模块实施分布式追踪。

最后,我们总结一下 SkyWalking 的跨线程传递方案的优缺点:和主流的分布式追踪系统类似,SkyWalking 也是借助 ThreadLocal 来存储上下文信息,当遇到跨线程传输时也面临传递丢失的场景,针对这一问题 SkyWalking 会在父线程调用 ContextManager.capture()将 trace 信息保存到一个 ContextSnapshot 的实例中并返回,ContextSnapshott 则被附加到任务对象的特定属性中,那么当子线程处理任务对象的时会先取出 ContextSnapshott 对象,将其作为入参调用 ContextManager.continued(contextSnapshot)来保存到子线程中。


整体思路其实和主流的分布式追踪系统的相似,SkyWalking 目前只针对带有 @TraceCrossThread 注解的 Callable、Runnable 和 Supplier 这三种接口的实现类进行增强拦截,通过使用 xxxWrapper.of 的包装方式,避免开发者需要大的代码改动。

4.3 EagleEye

EagleEye 阿里巴巴开源的应用性能监控工具,提供了多维度、实时、自动化的应用性能监控和分析能力。它可以帮助开发人员实时监控应用程序的性能指标、日志、异常信息等,并提供相应的性能分析和报告,帮助开发人员快速定位和解决问题。主要由以下 5 部分组成:

  • 代理:代理是鹰眼的数据采集组件,通过代理可以采集应用程序的性能指标、日志、异常信息等数据,并将其传输到鹰眼的存储和分析组件中。代理支持多种协议,如 HTTP、Dubbo、RocketMQ、Kafka 等,能够满足不同场景下的数据采集需求。

  • 存储:存储是鹰眼的数据存储组件,负责存储代理采集的数据,并提供高可用、高性能、高可靠的数据存储服务。存储支持多种存储引擎,如 HBase、Elasticsearch、TiDB 等,可以根据实际情况进行选择和配置。

  • 分析:分析是鹰眼的数据分析组件,负责对代理采集的数据进行实时分析和处理,并生成相应的监控指标和性能报告。分析支持多种分析引擎,如 Apache Flink、Apache Spark 等,可以根据实际情况进行选择和配置。

  • 可视化:可视化是鹰眼的数据展示组件,负责将分析产生的监控指标和性能报告以图形化的方式展示出来,以便用户能够直观地了解系统的运行状态和性能指标。

  • 告警:告警是鹰眼的告警组件,负责根据用户的配置进行异常检测和告警,及时发现和处理系统的异常情况,防止系统出现故障。


不同于 SkyWalking 的开源社区,EagleEye 重点面向阿里内部环境开发,针对海量实时监控的痛点,对底层的流计算、多维时序指标与交互体系等进行了大量优化,同时引入了时序检测、根因分析、业务链路特征等技术,将问题发现与定位由被动转为主动。


EagleEye 采用了 StreamLib 实时流式处理技术提升流计算性能,对采集的数据进行实时分析和处理,当监控一个电商网站时,可以实时地分析用户访问的日志数据,并根据分析结果来优化网站的性能和用户体验;参考 Apache Flink 的 Snapshot 优化齐全度算法来保证监控系统确定性;为了满足不同的个性化需求,把一些可复用的逻辑变成了“积木块”,让用户按照自己的需求,拼装流计算的 pipeline。

最后总结一下 EagleEye 的跨线程传递方案优缺点:EagleEye 的解决思路和大多数分布式追踪系统一致,都是通过 javaagent 的方式修改线程池的实现,进而子线程可以获取到父线程到 trace 信息,不同于 SkyWalking 这种开源系统采用的字节码增强,EagleEye 大多数场景是内部使用,所以采用直接编码的方式,维护和性能消耗方面也是非常有优势的,但扩展性和开放性并不是非常友好。

五、总结

本文意在从日常工作中一个很细微的问题出发,探究分析背后的设计思想和底层原因,主要涉及以下方面:

  • 抓住问题本质:在业务系统报警中抓住问题的核心代码并尝试再次复现问题,找到真正出问题的模块。

  • 深入理解设计思想:在查阅公司中间件的产品文档的基础上再继续追根溯源,学习业内领先者最开始的分布式链路追踪系统的设计思想和实现途径。

  • 结合实际问题提出疑问:结合了解到的分布式链路追踪系统的实现流程和设计思想,回归到一开始我们要解决的 TraceId 丢失情况分析是在什么环节出现问题。

  • 阅读源码找到底层逻辑:从 @Async 注解、SimpleAsyncTaskExecutor 和 ThreadLocal 类源码进行层层追踪,分析底层真正的实现逻辑和特点。

  • 对比分析找到解决方案:分析为什么 Mtrace 的跨线程传递方案“失效”了,找到原因提供解决方案并总结其他分布式追踪系统。


从本文可以看出,中间件的出现不仅为我们维护系统的稳定提供有力的支持,还已经为使用中可能发生的问题提供了更高效的解决方案,作为开发人员在享受这一极大便利的同时,还是要沉下心来认真思考其中的实现逻辑和使用场景,如果只是一味的低头使用不求甚解,那么在一些特定问题上往往会显得十分被动,无法发挥中间件真正的价值,甚至在没有中间件支撑时无法高效的解决问题。


作者:美团技术团队

链接:https://juejin.cn/post/7224306040601837624

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
一次「找回」TraceId的问题分析与过程思考_Java_做梦都在改BUG_InfoQ写作社区