写点什么

SpringBoot 异步线程,父子线程数据传递的 5 种姿势

作者:Java你猿哥
  • 2023-06-03
    湖南
  • 本文字数:7074 字

    阅读完需:约 23 分钟

姿势 1:ThreadLocal+TaskDecorator

用户工具类

/** *使用ThreadLocal存储共享的数据变量,如登录的用户信息 */public class UserUtils {    private static  final  ThreadLocal<String> userLocal=new ThreadLocal<>();
public static String getUserId(){ return userLocal.get(); } public static void setUserId(String userId){ userLocal.set(userId); }
public static void clear(){ userLocal.remove(); }
}
复制代码

自定义 CustomTaskDecorator

/** * 线程池修饰类 */public class CustomTaskDecorator implements TaskDecorator {    @Override    public Runnable decorate(Runnable runnable) {        // 获取主线程中的请求信息(我们的用户信息也放在里面)        String robotId = UserUtils.getUserId();        System.out.println(robotId);        return () -> {            try {                // 将主线程的请求信息,设置到子线程中                UserUtils.setUserId(robotId);                // 执行子线程,这一步不要忘了                runnable.run();            } finally {                // 线程结束,清空这些信息,否则可能造成内存泄漏                UserUtils.clear();            }        };    }}
复制代码


ExecutorConfig
复制代码

在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());

@Bean(name = "asyncServiceExecutor")    public Executor asyncServiceExecutor() {        log.info("start asyncServiceExecutor----------------");        //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        //使用可视化运行状态的线程池        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();        //配置核心线程数        executor.setCorePoolSize(corePoolSize);        //配置最大线程数        executor.setMaxPoolSize(maxPoolSize);        //配置队列大小        executor.setQueueCapacity(queueCapacity);        //配置线程池中的线程的名称前缀        executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加线程池修饰类 executor.setTaskDecorator(new CustomTaskDecorator()); //增加MDC的线程池修饰类 //executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
复制代码


AsyncServiceImpl
复制代码


/**     * 使用ThreadLocal方式传递     * 带有返回值     * @throws InterruptedException     */    @Async("asyncServiceExecutor")    public CompletableFuture<String> executeValueAsync2() throws InterruptedException {        log.info("start executeValueAsync");        System.out.println("异步线程执行返回结果......+");        log.info("end executeValueAsync");        return CompletableFuture.completedFuture(UserUtils.getUserId());    }
复制代码


Test2Controller
复制代码


/**     * 使用ThreadLocal+TaskDecorator的方式     * @return     * @throws InterruptedException     * @throws ExecutionException     */    @GetMapping("/test2")    public String test2() throws InterruptedException, ExecutionException {        UserUtils.setUserId("123456");        CompletableFuture<String> completableFuture = asyncService.executeValueAsync2();        String s = completableFuture.get();        return s;    }
复制代码

姿势 2:RequestContextHolder+TaskDecorator

自定义 CustomTaskDecorator

/** * 线程池修饰类 */public class CustomTaskDecorator implements TaskDecorator {    @Override    public Runnable decorate(Runnable runnable) {        // 获取主线程中的请求信息(我们的用户信息也放在里面)        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();        return () -> {            try {                // 将主线程的请求信息,设置到子线程中                RequestContextHolder.setRequestAttributes(attributes);                // 执行子线程,这一步不要忘了                runnable.run();            } finally {                // 线程结束,清空这些信息,否则可能造成内存泄漏                RequestContextHolder.resetRequestAttributes();            }        };    }}
复制代码


ExecutorConfig
复制代码

在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());

@Bean(name = "asyncServiceExecutor")    public Executor asyncServiceExecutor() {        log.info("start asyncServiceExecutor----------------");        //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        //使用可视化运行状态的线程池        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();        //配置核心线程数        executor.setCorePoolSize(corePoolSize);        //配置最大线程数        executor.setMaxPoolSize(maxPoolSize);        //配置队列大小        executor.setQueueCapacity(queueCapacity);        //配置线程池中的线程的名称前缀        executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加线程池修饰类 executor.setTaskDecorator(new CustomTaskDecorator()); //增加MDC的线程池修饰类 //executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
复制代码


AsyncServiceImpl
复制代码


/**     * 使用RequestAttributes获取主线程传递的数据     * @return     * @throws InterruptedException     */    @Async("asyncServiceExecutor")    public CompletableFuture<String> executeValueAsync3() throws InterruptedException {        log.info("start executeValueAsync");        System.out.println("异步线程执行返回结果......+");        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();        Object userId = attributes.getAttribute("userId", 0);        log.info("end executeValueAsync");        return CompletableFuture.completedFuture(userId.toString());    }
复制代码


Test2Controller
复制代码


/**     * RequestContextHolder+TaskDecorator的方式     * @return     * @throws InterruptedException     * @throws ExecutionException     */    @GetMapping("/test3")    public String test3() throws InterruptedException, ExecutionException {        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();        attributes.setAttribute("userId","123456",0);        CompletableFuture<String> completableFuture = asyncService.executeValueAsync3();        String s = completableFuture.get();        return s;    }
复制代码

姿势 3:MDC+TaskDecorator

自定义 MDCTaskDecorator

/** * 线程池修饰类 */public class MDCTaskDecorator implements TaskDecorator {    @Override    public Runnable decorate(Runnable runnable) {        // 获取主线程中的请求信息(我们的用户信息也放在里面)        String userId = MDC.get("userId");        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();        System.out.println(copyOfContextMap);        return () -> {            try {                // 将主线程的请求信息,设置到子线程中                MDC.put("userId",userId);                // 执行子线程,这一步不要忘了                runnable.run();            } finally {                // 线程结束,清空这些信息,否则可能造成内存泄漏                MDC.clear();            }        };    }}
复制代码


ExecutorConfig
复制代码

在原来的基础上增加 executor.setTaskDecorator(new MDCTaskDecorator());

@Bean(name = "asyncServiceExecutor")    public Executor asyncServiceExecutor() {        log.info("start asyncServiceExecutor----------------");        //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        //使用可视化运行状态的线程池        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();        //配置核心线程数        executor.setCorePoolSize(corePoolSize);        //配置最大线程数        executor.setMaxPoolSize(maxPoolSize);        //配置队列大小        executor.setQueueCapacity(queueCapacity);        //配置线程池中的线程的名称前缀        executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加MDC的线程池修饰类 executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
复制代码


AsyncServiceImpl
复制代码


/**     * 使用MDC获取主线程传递的数据     * @return     * @throws InterruptedException     */    @Async("asyncServiceExecutor")    public CompletableFuture<String> executeValueAsync5() throws InterruptedException {        log.info("start executeValueAsync");        System.out.println("异步线程执行返回结果......+");        log.info("end executeValueAsync");        return CompletableFuture.completedFuture(MDC.get("userId"));    }
复制代码


Test2Controller
复制代码


/**     * 使用MDC+TaskDecorator方式     * 本质也是ThreadLocal+TaskDecorator方式     * @return     * @throws InterruptedException     * @throws ExecutionException     */    @GetMapping("/test5")    public String test5() throws InterruptedException, ExecutionException {        MDC.put("userId","123456");        CompletableFuture<String> completableFuture = asyncService.executeValueAsync5();        String s = completableFuture.get();        return s;    }
复制代码

姿势 4:InheritableThreadLocal

用户工具类 UserInheritableUtils

//** *使用InheritableThreadLocal存储线程之间共享的数据变量,如登录的用户信息 */public class UserInheritableUtils {    private static  final  InheritableThreadLocal<String> userLocal=new InheritableThreadLocal<>();
public static String getUserId(){ return userLocal.get(); } public static void setUserId(String userId){ userLocal.set(userId); }
public static void clear(){ userLocal.remove(); }
}
复制代码


AsyncServiceImpl
复制代码


/**     * 使用InheritableThreadLocal获取主线程传递的数据     * @return     * @throws InterruptedException     */    @Async("asyncServiceExecutor")    public CompletableFuture<String> executeValueAsync4() throws InterruptedException {        log.info("start executeValueAsync");        System.out.println("异步线程执行返回结果......+");        log.info("end executeValueAsync");        return CompletableFuture.completedFuture(UserInheritableUtils.getUserId());    }
复制代码


Test2Controller
复制代码


/**     * 使用InheritableThreadLocal方式     * @return     * @throws InterruptedException     * @throws ExecutionException     */    @GetMapping("/test4")    public String test4(@RequestParam("userId") String userId) throws InterruptedException, ExecutionException {        UserInheritableUtils.setUserId(userId);        CompletableFuture<String> completableFuture = asyncService.executeValueAsync4();        String s = completableFuture.get();        return s;    }
复制代码

姿势 5:TransmittableThreadLocal

用户工具类 UserTransmittableUtils

/** *使用TransmittableThreadLocal存储线程之间共享的数据变量,如登录的用户信息 */public class UserTransmittableUtils {    private static  final TransmittableThreadLocal<String> userLocal=new TransmittableThreadLocal<>();
public static String getUserId(){ return userLocal.get(); } public static void setUserId(String userId){ userLocal.set(userId); }
public static void clear(){ userLocal.remove(); }
}
}
复制代码


AsyncServiceImpl
复制代码


/**     * 使用TransmittableThreadLocal获取主线程传递的数据     * @return     * @throws InterruptedException     */    @Async("asyncServiceExecutor")    public CompletableFuture<String> executeValueAsync6() throws InterruptedException {        log.info("start executeValueAsync");        System.out.println("异步线程执行返回结果......+");        log.info("end executeValueAsync");        return CompletableFuture.completedFuture(UserTransmittableUtils.getUserId());    }
复制代码


Test2Controller
复制代码


/**     * 使用TransmittableThreadLocal方式     * @return     * @throws InterruptedException     * @throws ExecutionException     */    @GetMapping("/test6")    public String test6() throws InterruptedException, ExecutionException {        UserTransmittableUtils.setUserId("123456");        CompletableFuture<String> completableFuture = asyncService.executeValueAsync6();        String s = completableFuture.get();        return s;    }
复制代码

maven 依赖

<dependency>            <groupId>com.alibaba</groupId>            <artifactId>transmittable-thread-local</artifactId>            <version>2.12.1</version>        </dependency>
复制代码

方案对比

方案 1,方案 2,方案 3 主要是借助 TaskDecorator 进行父子线程之间传递数据。其中 MDC 方案主要借鉴于 MDC 的日志跟踪的思想来实现,关于 MDC 相关的日志跟踪后续会学习分享

方案 4 和方案 5 使用 InheritableThreadLocal 和 TransmittableThreadLocal 来实现,其中 TransmittableThreadLocal 是阿里 InheritableThreadLocal 进行优化封装。为什么要封装,有兴趣的可以去学习《 加强版 ThreadLocal 之阿里开源 TransmittableThreadLocal 学习 》

本人推荐使用方案 5,哈哈。

简答说一下 InheritableThreadLocal

public static void main(String[] args) {        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,1,1,                TimeUnit.MINUTES,new ArrayBlockingQueue<>(1));
ThreadLocal local = new InheritableThreadLocal(); local.set(1);
executor.execute(()->{ System.out.println("打印1:"+local.get()); }); local.set(2);
System.out.println("打印2:"+local.get());
executor.execute(()->{ System.out.println("打印3:"+local.get()); }); new Thread(new Runnable() { @Override public void run() { System.out.println("打印4:"+local.get()); } }).start(); }
复制代码

运行结果如下

打印2:2打印1:1打印3:1打印4:2
复制代码

分析: 分析打印 3 为什么是 1,InheritableThreadLocal 的继承性是在 new Thread 创建子线程时候在构造函数内把父线程内线程变量拷贝到子线程内部的。为了不在创建新线程耗费资源,我们一般会用线程池,线程池的线程会复用,那么线程中的 ThreadLocal 便不对了,可能是旧的,因为线程是旧的。

用户头像

Java你猿哥

关注

一只在编程路上渐行渐远的程序猿 2023-03-09 加入

关注我,了解更多Java、架构、Spring等知识

评论

发布
暂无评论
SpringBoot异步线程,父子线程数据传递的5种姿势_Java_Java你猿哥_InfoQ写作社区