使用 Hystrix 的插件机制,解决在使用线程隔离时,threadlocal 的传递问题
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);} else {return fromConstructor;}}
上面我们说了,第一个实参,总是 null,所以,会走这里的 1 处。
com.netflix.hystrix.HystrixThreadPool.Factory#getInstance
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {String key = threadPoolKey.name();
//1 this should find it for all but the first timeHystrixThreadPool previouslyCached = threadPools.get(key);if (previouslyCached != null) {return previouslyCached;}
//2 if we get here this is the first time so we need to initializesynchronized (HystrixThreadPool.class) {if (!threadPools.containsKey(key)) {// 3threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));}}return threadPools.get(key);}
1 处,会查找缓存,就是前面说的,去 map 中,根据线程池的 key,查找对应的线程池
2 处,没找到,则进行创建
3 处,new HystrixThreadPoolDefault,创建线程池
我们接着看 3 处:
public HystrixThreadPo
olDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {// 1this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);// 2HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();// 3this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,concurrencyStrategy.getThreadPool(threadPoolKey, properties),properties);// 4this.threadPool = this.metrics.getThreadPool();...}
1 处,获取线程池的默认配置,这个就和我们前面说的那个 Setter 里的类似
2 处,从 HystrixPlugins.getInstance()获取一个 HystrixConcurrencyStrategy 类型的对象,保存到局部变量 concurrencyStrategy
3 处,初始化 metrics,这里的第二个参数,是 concurrencyStrategy.getThreadPool 来获取的,这个操作,实际上就会去创建线程池。
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy#getThreadPool
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);...final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
...// 1return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);}}
上面的 1 处,会去创建线程池。但是,这里直接就是要了 jdk 的默认线程池类来创建,这还怎么搞?类型都定死了。没法扩展了。。。
发现 hystrix 的插件机制
但是,回过头来,又仔细看了看,这个 getThreadPool 是 HystrixConcurrencyStrategy 类的一个方法,这个方法也是个实例方法。
方法不能改,那,实例能换吗?再看看前面的代码:
ok,那接着分析:
public HystrixConcurrencyStrategy getConcurrencyStrategy() {if (concurrencyStrategy.get() == null) {//1 check for an implementation from Archaius firstObject impl = getPluginImplementation(HystrixConcurrencyStrategy.class);concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl);}return concurrencyStrategy.get();}
1 处,根据这个类,获取实现,感觉有点戏。
// 1T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);if (p != null) return p;
// 2return findService(pluginClass, classLoader);}
1 处,从一个动态属性中获取,后来经查,发现是如果集成了 Netflix Archaius 就可以动态获取属性,类似于一个配置中心
2 处,如果前面没找到,就是要 JDK 的 SPI 机制。
private static <T> T findService(Class<T> spi,ClassLoader classLoader) throws ServiceConfigurationError {
ServiceLoader<T> sl = ServiceLoader.load(spi,classLoader);for (T s : sl) {if (s != null)return s;}return null;}
那就好说了。SPI ,我们自定义一个实现,就可以替换掉默认的了,hystrix 做的还是不错,扩展性可以。
现在知道可以自定义 HystrixConcurrencyStrategy 了,那要怎么自定义呢?
这个类,是个抽象类,大体有如下几个方法:
getThreadPool
getBlockingQueue(int maxQueueSize)
Callable<T> wrapCallable(Callable<T> callable)
getRequestVariable(final HystrixRequestVariableLifecycle<T> rv)
说是抽象类,但其实并没有需要我们实现的方法,所有方法都有默认实现,我们只需要重写需要覆盖的方法即可。
我这里,看重了第三个方法:
/**
Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
<p>
This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
<p>
<b>Default Implementation</b>
<p>
Pass-thru that does no wrapping.
@param callable
@return {@code Callable<T>} either as a pass-thru or wrapping the one given*/public <T> Callable<T> wrapCallable(Callable<T> callable) {return callable;}
方法注释如上,我简单说下,在执行前,提供一个机会,让你去 wrap 这个 callable,即最终要丢到线程池执行的那个 callable。
我们可以 wrap 一下原有的 callable,在执行前,把当前线程的 threadlocal 变量存下来,即为 A,然后设置到 callable 里面去;在 callable 执行的时候,就可以使用我们的 A 中的 threadlocal 来替换掉 worker 线程中的。
多说无益,这里直接看代码:
// 0public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
@Overridepublic <T> Callable<T> wrapCallable(Callable<T> callable) {/**
1 获取当前线程的 threadlocalmap*/Object currentThreadlocalMap = getCurrentThreadlocalMap();
Callable<T> finalCallable = new Callable<T>() {// 2private Object callerThreadlocalMap = currentThreadlocalMap;// 3private Callable<T> targetCallable = callable;
@Overridepublic T call() throws Exception {/**
4 将工作线程的原有线程变量保存起来/Object oldThreadlocalMapOfWorkThread = getCurrentThreadlocalMap();/**5 将本线程的线程变量,设置为 caller 的线程变量*/setCurrentThreadlocalMap(callerThreadlocalMap);
try {// 6return targetCallable.call();}finally {// 7setCurrentThreadlocalMap(oldThreadlocalMapOfWorkThread);log.info("restore work thread's threadlocal");}
}};
return finalCallable;}
0 处,自定义了一个类,继承 HystrixConcurrencyStrategy,准备覆盖其默认的 wrap 方法
1 处,获取外部线程的 threadlocal
2 处,3 处,这里已经是处于匿名内部类了,定义了 2 个 field,分别存放 1 中的外部线程的 threadlocal,以及要 wrap 的 callable
4 处,此时已经处于 run 方法的执行逻辑了:保存 worker 线程的自身的线程局部变量
5 处,使用外部线程的 threadlocal 覆盖自身的
6 处,调用真正的业务逻辑
7 处,恢复为线程自身的 threadlocal
获取线程的 threadlocal 的代码:
private Object getCurrentThreadlocalMap() {Thread thread = Thread.currentThread();try {Field field = Thread.class.getDeclaredField("threadLocals");field.setAccessible(true);Object o = field.get(thread);return o;} catch (NoSuchFieldException | IllegalAccessException e) {log.error("{}",e);}return null;}
设置线程的 threadlocal 的代码:
private void setCurrentThreadlocalMap(Object newThreadLocalMap) {Thread thread = Thread.currentThread();try {Field field = Thread.class.getDeclaredField("threadLocals");field.setAccessible(true);field.set(thread,newThreadLocalMap);
} catch (NoSuchFieldException | IllegalAccessException e) {log.error("{}",e);}}
插件机制的相关资料
[https://github.com/Netflix/Hystrix/wiki/Plugins](
)
运行效果
controller 代码
@RequestMapping("/")public String hystrixOrder () {// 1SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal();// 2SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService);String res = simpleHystrixCommand.execute();return res;}
1 处,设置 ThreadLocal 变量
public static UserVO getSessionVOFromRedisAndPut2ThreadLocal() {UserVO userVO = new UserVO();userVO.setUserName("test user");
RequestContextHolder.set(userVO);log.info("set thread local:{} to context",userVO);
return userVO;}
2 处,new 了一个 HystrixCommand,然后 execute 执行
command 中代码
public class SimpleHystrixCommand extends HystrixCommand<String> {
评论