@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
/**
* Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
*/
private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();
/**
* @param threadPoolBeanName
* @param threadPoolTaskExecutor
*/
public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
}
@Override
public void afterPropertiesSet() throws Exception {
this.refresh();
//创建定时任务线程池
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
//延迟1秒执行,每个1分钟check一次
executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
}
private void refresh() {
String dynamicThreadPool = "";
try {
if (DTP_REGISTRY.isEmpty()) {
log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
return;
}
dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
if (StringUtils.isBlank(dynamicThreadPool)) {
log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
return;
}
log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
});
if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
return;
}
for (ThreadPoolProperties properties : threadPoolPropertiesList) {
doRefresh(properties);
}
} catch (Exception e) {
log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
}
}
/**
* @param properties
*/
private void doRefresh(ThreadPoolProperties properties) {
if (StringUtils.isBlank(properties.getThreadPoolBeanName())
|| properties.getCorePoolSize() < 1
|| properties.getMaxPoolSize() < 1
|| properties.getMaxPoolSize() < properties.getCorePoolSize()) {
log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
return;
}
DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
if (Objects.isNull(threadPoolTaskExecutor)) {
log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
return;
}
ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
&& Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
return;
}
if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
}
if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
}
ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
}
private class RefreshThreadPoolConfig extends TimerTask {
private RefreshThreadPoolConfig() {
}
@Override
public void run() {
DynamicThreadPoolRefresh.this.refresh();
}
}
}
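// "Comments" - stray trailing text, apparently residue from the page this file was copied from rather than code; kept as a comment so the file stays syntactically valid.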