写点什么

全链路压测必备基础组件之线程上下文管理之“三剑客”

  • 2021 年 11 月 11 日
  • 本文字数:8633 字

    阅读完需:约 28 分钟

Object value = key.childValue(e.value);


Entry c = new Entry(key, value);


int h = key.threadLocalHashCode & (len - 1);


while (table[h] != null)


h = nextIndex(h, len);


table[h] = c;


size++;


}


}


}


}


上述代码就不一一分析,类似于 Map 的复制,只不过其在 Hash 冲突时,不是使用链表结构,而是直接在数组中找下一个为 null 的槽位。


温馨提示:子线程默认拷贝父线程的方式是浅拷贝,如果需要使用深拷贝,需要使用自定义 ThreadLocal,继承 InheritableThreadLocal 并重写 childValue 方法。

2.3 验证 InheritableThreadLocal 的特性

验证代码如下:


public class Service {


private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();


public static void main(String[] args) {


Integer reqId = new Integer(5);


Service a = new Service();


a.setRequestId(reqId);


}


public void setRequestId(Integer requestId) {


requestIdThreadLocal.set(requestId);


doBussiness();


}


public void doBussiness() {


System.out.println("首先打印 requestId:" + requestIdThreadLocal.get());


(new Thread(new Runnable() {


@Override


public void run() {


System.out.println("子线程启动");


System.out.println("在子线程中访问 requestId:" + requestIdThreadLocal.get());


}


})).start();


}


}


执行结果如下:



符合预期,在子线程中如愿访问到了在主线程中设置的本地环境变量。

2.4 InheritableThreadLocal 局限性

InheritableThreadLocal 支持子线程访问在父线程的核心思想是在创建线程的时候将父线程中的本地变量值复制到子线程,即复制的时机为创建子线程时。但我们提到并发、多线程就理不开线程池的使用,因为线程池能够复用线程,减少线程的频繁创建与销毁,如果使用 InheritableThreadLocal,那么线程池中的线程拷贝的数据来自于第一个提交任务的外部线程,即后面的外部线程向线程池中提交任务时,子线程访问的本地变量都来源于第一个外部线程,造成线程本地变量混乱,验证代码如下:


import java.util.concurrent.ExecutorService;


import java.util.concurrent.Executors;


public class Service {


/**


  • 模拟 tomcat 线程池


*/


private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);


/**


  • 业务线程池,默认 Contro


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


l 中异步任务执行线程池


*/


private static ExecutorService businessExecutors = Executors.newFixedThreadPool(5);


/**


  • 线程上下文环境,模拟在 Control 这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值。


*/


private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();


public static void main(String[] args) {


for(int i = 0; i < 10; i ++ ) { // 模式 10 个请求,每个请求执行 ControlThread 的逻辑,其具体实现就是,先输出父线程的名称,


// 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量


tomcatExecutors.submit(new ControlThread(i));


}


//简单粗暴的关闭线程池


try {


Thread.sleep(10000);


} catch (InterruptedException e) {


e.printStackTrace();


}


businessExecutors.shutdown();


tomcatExecutors.shutdown();


}


/**


  • 模拟 Control 任务


*/


static class ControlThread implements Runnable {


private int i;


public ControlThread(int i) {


this.i = i;


}


@Override


public void run() {


System.out.println(Thread.currentThread().getName() + ":" + i);


requestIdThreadLocal.set(i);


//使用线程池异步处理任务


businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));


}


}


/**


  • 业务任务,主要是模拟在 Control 控制层,提交任务到线程池执行


*/


static class BusinessTask implements Runnable {


private String parentThreadName;


public BusinessTask(String parentThreadName) {


this.parentThreadName = parentThreadName;


}


@Override


public void run() {


//如果与上面的能对应上来,则说明正确,否则失败


System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());


}


}


}


执行效果如下:


pool-1-thread-1:0


pool-1-thread-2:1


pool-1-thread-3:2


pool-1-thread-4:3


pool-1-thread-5:4


pool-1-thread-6:5


pool-1-thread-7:6


pool-1-thread-8:7


pool-1-thread-9:8


pool-1-thread-10:9


parentThreadName:pool-1-thread-7:6


parentThreadName:pool-1-thread-4:6


parentThreadName:pool-1-thread-3:6


parentThreadName:pool-1-thread-2:6


parentThreadName:pool-1-thread-1:6


parentThreadName:pool-1-thread-9:6


parentThreadName:pool-1-thread-10:6


parentThreadName:pool-1-thread-8:7


parentThreadName:pool-1-thread-6:5


parentThreadName:pool-1-thread-5:4


从这里可以出 thread-7、thread-4、thread-3、thread-2、thread-1、thread-9、thread-10 获取的都是 6,在子线程中出现出现了线程本地变量混乱的现象,在全链路跟踪与压测出现这种情况是致命的。


问题:大家通过上面的学习,应该能解释这个现象?此处可以稍微停下来思考一番。


怎么解决这个问题呢?


TransmittableThreadLocal ”闪亮登场“。


3、TransmittableThreadLocal



3.1 TransmittableThreadLocal“何许人也”

TransmittableThreadLocal 何许人也,它可是阿里巴巴开源的专门解决 InheritableThreadLocal 的局限性,实现线程本地变量在线程池的执行过程中,能正常的访问父线程设置的线程变量。实践是检验整理的唯一标准,我们还是以上面的示例来进行验证,看看 TransmittableThreadLocal 是否支持上述场景:


首先需要在 pom.xml 文件中引入如下 maven 依赖:


<dependency>


<groupId>com.alibaba</groupId>


<artifactId>transmittable-thread-local</artifactId>


<version>2.10.2</version>


</dependency>


示例代码如下:


public class Service {


/**


  • 模拟 tomcat 线程池


*/


private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);


/**


  • 业务线程池,默认 Control 中异步任务执行线程池


*/


private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(4)); // 使用 ttl 线程池,该框架的使用,请查阅官方文档。


/**


  • 线程上下文环境,模拟在 Control 这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值。


*/


private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();


// private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();


public static void main(String[] args) {


for(int i = 0; i < 10; i ++ ) {


tomcatExecutors.submit(new ControlThread(i));


}


//简单粗暴的关闭线程池


try {


Thread.sleep(10000);


} catch (InterruptedException e) {


e.printStackTrace();


}


businessExecutors.shutdown();


tomcatExecutors.shutdown();


}


/**


  • 模拟 Control 任务


*/


static class ControlThread implements Runnable {


private int i;


public ControlThread(int i) {


this.i = i;


}


@Override


public void run() {


System.out.println(Thread.currentThread().getName() + ":" + i);


requestIdThreadLocal.set(i);


//使用线程池异步处理任务


businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));


}


}


/**


  • 业务任务,主要是模拟在 Control 控制层,提交任务到线程池执行


*/


static class BusinessTask implements Runnable {


private String parentThreadName;


public BusinessTask(String parentThreadName) {


this.parentThreadName = parentThreadName;


}


@Override


public void run() {


//如果与上面的能对应上来,则说明正确,否则失败


System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());


}


}


}


其运行结果如下:


pool-1-thread-10:9


pool-1-thread-8:7


pool-1-thread-7:6


pool-1-thread-9:8


pool-1-thread-6:5


pool-1-thread-5:4


pool-1-thread-4:3


pool-1-thread-3:2


pool-1-thread-2:1


pool-1-thread-1:0


parentThreadName:pool-1-thread-5:4


parentThreadName:pool-1-thread-9:4


parentThreadName:pool-1-thread-3:4


parentThreadName:pool-1-thread-2:4


parentThreadName:pool-1-thread-7:4


parentThreadName:pool-1-thread-8:4


parentThreadName:pool-1-thread-1:4


parentThreadName:pool-1-thread-6:5


parentThreadName:pool-1-thread-10:9


parentThreadName:pool-1-thread-4:3


执行结果符合预期。那 TransmittableThreadLocal 是如何实现的呢?

3.2 TransmittableThreadLocal 实现原理

从 InheritableThreadLocal 不支持线程池的根本原因是 InheritableThreadLocal 是在父线程创建子线程时复制的,由于线程池的复用机制,“子线程”只会复制一次。要支持线程池中能访问提交任务线程的本地变量,其实只需要在父线程在向线程池提交任务时复制父线程的上下环境,那在子线程中就能够如愿访问到父线程中的本地遍历,实现本地环境变量在线程调用之中的透传,实现链路跟踪,这也就是 TransmittableThreadLocal 最本质的实现原理。

3.2.1 TransmittableThreadLocal 类图


TransmittableThreadLocal 继承自 InheritableThreadLocal,接下来将从 set 方法为入口,开始探究 TransmittableThreadLocal 实现原理。

3.2.2 set 方法详解

public final void set(T value) {


super.set(value); // @1


// may set null to remove value


if (null == value) // @2


removeValue();


else


addValue();


}


代码 @1:首先调用父类的 set 方法,将 value 存入线程本地遍历,即 Thread 对象的 inheritableThreadLocals 中。


代码 @2:如果 value 为空,则调用 removeValue()否则调用 addValue。


那接下来重点看看这两个方法有什么名堂:


private void addValue() {


if (!holder.get().containsKey(this)) { // @1


holder.get().put(this, null); // WeakHashMap supports null value.


}


}


private void removeValue() {


holder.get().remove(this);


}


代码 @1:当前线程在调用 threadLocal 方法的 set 方法(即向线程本地遍历存储数据时),如果需要设置的值不为 null,则调用 addValue 方法,将当前 ThreadLocal 存储到 TransmittableThreadLocal 的全局静态变量 holder。holder 的定义如下:


private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =


new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {


@Override


protected Map<TransmittableThreadLocal<?>, ?> initialValue() {


return new WeakHashMap<TransmittableThreadLocal<?>, Object>();


}


@Override


protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {


return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);


}


};


从中可以看出,使用了线程本地变量,内部存放的结构为 Map<TransmittableThreadLocal<?>, ?>,即该对象缓存了线程执行过程中所有的 TransmittableThreadLocal 对象,并且其关联的值不为空。但这样做有什么用呢?


为了解开这个难题,可能需要大家对 ttl 这个框架的使用有一定的理解,本文由于篇幅的原因,将不会详细介绍,如有大家有兴趣,可以查阅其官网了解其使用:https://github.com/alibaba/transmittable-thread-local


ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(4));


TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<String>();


parent.set("value-set-in-parent");


Runnable task = new Task("1");


Callable call = new Call("2");


executorService.submit(task);


executorService.submit(call);


我们从 submit 为突破口,来尝试解开 holder 属性用途。


class ExecutorTtlWrapper implements Executor, TtlEnhanced {


private final Executor executor;


ExecutorTtlWrapper(@Nonnull Executor executor) {


this.executor = executor;


}


@Override


public void execute(@Nonnull Runnable command) {


executor.execute(TtlRunnable.get(command)); // @1


}


@Nonnull


public Executor unwrap() {


return executor;


}


}


在向线程池提交任务时,会使用 TtlRunnable 对提交任务进行包装。接下来将重点探讨 TtlRunnable。

3.2.2 TtlRunnable 详解

3.2.2.1 类图


下面一一来介绍其核心属性:


  • AtomicReference< Object> capturedRef


“捕获”的引用,根据下文的解读,该引用指向的数据结构包含了父线程在执行过程中,通过使用 TransmittableThreadLocal 存储的本地线程变量,但这里的触发时机是向线程池提交任务时捕获。


  • Runnable runnable


提交到线程池中待执行的业务逻辑。


  • boolean releaseTtlValueReferenceAfterRun


默认为 false。


接下来重点看一下其构造方法


private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {


this.capturedRef = new AtomicReference<Object>(capture()); // @1


this.runnable = runnable;


this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;


}


构造方法没什么特别,重点看一下子线程是如何“捕获”父线程中已设置的本地线程变量。


TransmittableThreadLocal$Transmitter#capture


public static Object capture() {


Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>(); // @1


for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { // @2


captured.put(threadLocal, threadLocal.copyValue()); // @3


}


return captured;


}


代码 @1:先创建 Map 容器,用来存储父线程的本地线程变量,键为在父线程执行过程中使用到的 TransmittableThreadLocal 线程。


代码 @2:holder.get(),获取父线程中使用中的 ThreadLocal,因为我们从 3.2.2 节中发现,在当前线程在调用 TransmittableThreadLocal 的 set 方法,并且其值不为空的时候,会将 TransmittableThreadLocal 对象存储存储在当前线程的本地变量中。故这里使用 holder.get()方法能获取父线程中已使用的 ThreadLocal,并其值不为 null。


代码 @3:遍历父线程已使用的线程本地,将其值存入到 captured 中,注意默认是浅拷贝,如果向实现深度拷贝,请重写 TransmittableThreadLocal 的 copyValue 方法。


温馨提示:从这里看出 TransmittableThreadLocal 的静态属性 holder 的用处吧,请重点理解 holder 的属性类型为:InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>。


在向线程池提交任务时,就会先捕获父线程(提交任务到线程池的线程)中的本地环境变量,接下来重点来看一下其 run 方法。

3.2.2.2 run 方法

public void run() {


Object captured = capturedRef.get();


if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {


throw new IllegalStateException("TTL value reference is released after run!");


}


Object backup = replay(captured); // @1


try {


runnable.run(); // @2


} finally {


restore(backup); // @3


}


}


代码 @1:"重放"父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在与子线程中的上下文环境变量。


代码 @2:执行业务逻辑。


代码 @3:恢复线程池中当前执行任务的线程的上下文环境,即代码 @1,会直接继承父线程中的上下文环境,但会将原先存在该线程的线程上下文环境进行备份,在任务执行完后通过执行 restore 方法进行恢复。


不得不佩服这里设计的巧妙。笔者有理由相信能看到这里的诸位读者一定是有实力有求知的欲望的人,那我们在来看一下 replay、restore 方法的实现。

3.2.2.3 replay

public static Object replay(@Nonnull Object captured) {


@SuppressWarnings("unchecked")


Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; // @1


Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();


for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); // @2


iterator.hasNext(); ) {


Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();


TransmittableThreadLocal<?> threadLocal = next.getKey();


backup.put(threadLocal, threadLocal.get()); // @3


// clear the TTL values that is not in captured


// avoid the extra TTL values after replay when run task


if (!capturedMap.containsKey(threadLocal)) { // @4


iterator.remove();


threadLocal.superRemove();


}


// set values to captured TTL


setTtlValuesTo(capturedMap); // @5


// call beforeExecute callback


doExecuteCallback(true); // @6


return backup; // @7


}


代码 @1:首先解释一下两个局部变量的含义:


  • capturedMap


子线程从父线程捕获的线程本地遍历。


  • backup


线程池中处理本次任务的线程中原先存在的本地线程变量。


代码 @2:holder.get(),这里是子线程中原先存在的本地线程变量(即线程池中分配来执行本次任务的线程),然后遍历它,将其存储在 backUp(@3)。


代码 @4:从这里开始,开始将根据父线程的本地变量来重放当前线程,如果父线程中不包含的 threadlocal 对象,将从本地线程变量中移除。


代码 @5:遍历父线程中的本地线程变量,在子线程中重新执行一次 threadlocal.set 方法。


代码 @6:执行 beforeExecute()狗子函数。


代码 @7:返回线程池原线程的本地线程变量,供本次调用后恢复上下文环境。

3.2.2.4 restore

恢复线程中子线程原先的本地线程变量,即恢复线程,本次执行并不会污染线程池中线程原先的上下文环境,精妙。我们来看看其代码实现:


public static void restore(@Nonnull Object backup) {


@SuppressWarnings("unchecked")


Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; // @1


// call afterExecute callback


doExecuteCallback(false); // @2


for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); // @3


iterator.hasNext(); ) {


Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();


TransmittableThreadLocal<?> threadLocal = next.getKey();


// clear the TTL values that is not in bac1kup


// avoid the extra TTL values after restore


if (!backupMap.containsKey(threadLocal)) { // @4


iterator.remove();


threadLocal.superRemove();


}


}


// restore TTL values


setTtlValuesTo(backupMap); // @5


}


代码 @1:获取备份好的线程本地上下文。


代码 @2:执行 afterExecute()钩子函数。


代码 @3:遍历本地线程变量,将不属于 backUpMap 中存在的线程本地上下文移除(@4)。


代码 @5:遍历备份的本地线程本地,在本地线程中重新执行 threadlocal#set 方法,实现线程本地变量的还原。


本文介绍到这里了,详细介绍了 ThreadLocal、InheritableThreadLocal、TransmittableThreadLocal 的实现原理,并从 ThreadLocal、InheritableThreadLocal 的局限性,最终引出 TransmittableThreadLocal,为全链路压测中压测标记的透传打下坚实的基础。


各位可爱的读者,您能看到这里,我相信应该收获满满,有劳帮忙点个赞,谢谢您的鼓励。




打造完备分布式架构体系





评论

发布
暂无评论
全链路压测必备基础组件之线程上下文管理之“三剑客”