写点什么

使用 Hystrix 的插件机制,解决在使用线程隔离时,threadlocal 的传递问题

用户头像
极客good
关注
发布于: 刚刚

return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);} else {return fromConstructor;}}


上面我们说了,第一个实参,总是 null,所以,会走这里的 1 处。


com.netflix.hystrix.HystrixThreadPool.Factory#getInstance


static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {String key = threadPoolKey.name();


//1 this should find it for all but the first timeHystrixThreadPool previouslyCached = threadPools.get(key);if (previouslyCached != null) {return previouslyCached;}


//2 if we get here this is the first time so we need to initializesynchronized (HystrixThreadPool.class) {if (!threadPools.containsKey(key)) {// 3threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));}}return threadPools.get(key);}


  • 1 处,会查找缓存,就是前面说的,去 map 中,根据线程池的 key,查找对应的线程池

  • 2 处,没找到,则进行创建

  • 3 处,new HystrixThreadPoolDefault,创建线程池


我们接着看 3 处:


public HystrixThreadPo


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


olDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {// 1this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);// 2HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();// 3this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,concurrencyStrategy.getThreadPool(threadPoolKey, properties),properties);// 4this.threadPool = this.metrics.getThreadPool();...}


  • 1 处,获取线程池的默认配置,这个就和我们前面说的那个 Setter 里的类似

  • 2 处,从 HystrixPlugins.getInstance()获取一个 HystrixConcurrencyStrategy 类型的对象,保存到局部变量 concurrencyStrategy

  • 3 处,初始化 metrics,这里的第二个参数,是 concurrencyStrategy.getThreadPool 来获取的,这个操作,实际上就会去创建线程池。


com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy#getThreadPool


public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);...final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();final int maxQueueSize = threadPoolProperties.maxQueueSize().get();


...// 1return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);}}


上面的 1 处,会去创建线程池。但是,这里直接就是要了 jdk 的默认线程池类来创建,这还怎么搞?类型都定死了。没法扩展了。。。

发现 hystrix 的插件机制

但是,回过头来,又仔细看了看,这个 getThreadPool 是 HystrixConcurrencyStrategy 类的一个方法,这个方法也是个实例方法。


方法不能改,那,实例能换吗?再看看前面的代码:



ok,那接着分析:


public HystrixConcurrencyStrategy getConcurrencyStrategy() {if (concurrencyStrategy.get() == null) {//1 check for an implementation from Archaius firstObject impl = getPluginImplementation(HystrixConcurrencyStrategy.class);concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl);}return concurrencyStrategy.get();}


1 处,根据这个类,获取实现,感觉有点戏。


// 1T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);if (p != null) return p;


// 2return findService(pluginClass, classLoader);}


  • 1 处,从一个动态属性中获取,后来经查,发现是如果集成了 Netflix Archaius 就可以动态获取属性,类似于一个配置中心

  • 2 处,如果前面没找到,就是要 JDK 的 SPI 机制。


private static <T> T findService(Class<T> spi,ClassLoader classLoader) throws ServiceConfigurationError {


ServiceLoader<T> sl = ServiceLoader.load(spi,classLoader);for (T s : sl) {if (s != null)return s;}return null;}


那就好说了。SPI ,我们自定义一个实现,就可以替换掉默认的了,hystrix 做的还是不错,扩展性可以。


现在知道可以自定义 HystrixConcurrencyStrategy 了,那要怎么自定义呢?


这个类,是个抽象类,大体有如下几个方法:


getThreadPool


getBlockingQueue(int maxQueueSize)


Callable<T> wrapCallable(Callable<T> callable)


getRequestVariable(final HystrixRequestVariableLifecycle<T> rv)


说是抽象类,但其实并没有需要我们实现的方法,所有方法都有默认实现,我们只需要重写需要覆盖的方法即可。


我这里,看重了第三个方法:


/**


  • Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.

  • <p>

  • This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).

  • <p>

  • <b>Default Implementation</b>

  • <p>

  • Pass-thru that does no wrapping.

  • @param callable

  • @return {@code Callable<T>} either as a pass-thru or wrapping the one given*/public <T> Callable<T> wrapCallable(Callable<T> callable) {return callable;}


方法注释如上,我简单说下,在执行前,提供一个机会,让你去 wrap 这个 callable,即最终要丢到线程池执行的那个 callable。


我们可以 wrap 一下原有的 callable,在执行前,把当前线程的 threadlocal 变量存下来,即为 A,然后设置到 callable 里面去;在 callable 执行的时候,就可以使用我们的 A 中的 threadlocal 来替换掉 worker 线程中的。


多说无益,这里直接看代码:


// 0public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {


@Overridepublic <T> Callable<T> wrapCallable(Callable<T> callable) {/**


  • 1 获取当前线程的 threadlocalmap*/Object currentThreadlocalMap = getCurrentThreadlocalMap();


Callable<T> finalCallable = new Callable<T>() {// 2private Object callerThreadlocalMap = currentThreadlocalMap;// 3private Callable<T> targetCallable = callable;


@Overridepublic T call() throws Exception {/**


  • 4 将工作线程的原有线程变量保存起来/Object oldThreadlocalMapOfWorkThread = getCurrentThreadlocalMap();/**5 将本线程的线程变量,设置为 caller 的线程变量*/setCurrentThreadlocalMap(callerThreadlocalMap);


try {// 6return targetCallable.call();}finally {// 7setCurrentThreadlocalMap(oldThreadlocalMapOfWorkThread);log.info("restore work thread's threadlocal");}


}};


return finalCallable;}


  • 0 处,自定义了一个类,继承 HystrixConcurrencyStrategy,准备覆盖其默认的 wrap 方法

  • 1 处,获取外部线程的 threadlocal

  • 2 处,3 处,这里已经是处于匿名内部类了,定义了 2 个 field,分别存放 1 中的外部线程的 threadlocal,以及要 wrap 的 callable

  • 4 处,此时已经处于 run 方法的执行逻辑了:保存 worker 线程的自身的线程局部变量

  • 5 处,使用外部线程的 threadlocal 覆盖自身的

  • 6 处,调用真正的业务逻辑

  • 7 处,恢复为线程自身的 threadlocal


获取线程的 threadlocal 的代码:


private Object getCurrentThreadlocalMap() {Thread thread = Thread.currentThread();try {Field field = Thread.class.getDeclaredField("threadLocals");field.setAccessible(true);Object o = field.get(thread);return o;} catch (NoSuchFieldException | IllegalAccessException e) {log.error("{}",e);}return null;}


设置线程的 threadlocal 的代码:


private void setCurrentThreadlocalMap(Object newThreadLocalMap) {Thread thread = Thread.currentThread();try {Field field = Thread.class.getDeclaredField("threadLocals");field.setAccessible(true);field.set(thread,newThreadLocalMap);


} catch (NoSuchFieldException | IllegalAccessException e) {log.error("{}",e);}}

插件机制的相关资料

[https://github.com/Netflix/Hystrix/wiki/Plugins](


)

运行效果

controller 代码

@RequestMapping("/")public String hystrixOrder () {// 1SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal();// 2SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService);String res = simpleHystrixCommand.execute();return res;}


  • 1 处,设置 ThreadLocal 变量


public static UserVO getSessionVOFromRedisAndPut2ThreadLocal() {UserVO userVO = new UserVO();userVO.setUserName("test user");


RequestContextHolder.set(userVO);log.info("set thread local:{} to context",userVO);


return userVO;}


  • 2 处,new 了一个 HystrixCommand,然后 execute 执行

command 中代码

public class SimpleHystrixCommand extends HystrixCommand<String> {

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
使用Hystrix的插件机制,解决在使用线程隔离时,threadlocal的传递问题