写点什么

利用 DUCC 配置平台实现一个动态化线程池

  • 2023-02-16
    北京
  • 本文字数:5558 字

    阅读完需:约 18 分钟

利用DUCC配置平台实现一个动态化线程池

作者:京东零售 张宾

1.背景

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。


本文以公司 DUCC 配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。

2.代码实现

当前项目中使用的是 Spring 框架提供的线程池类 ThreadPoolTaskExecutor,而 ThreadPoolTaskExecutor 底层又使用里了 JDK 中线程池类 ThreadPoolExecutor,线程池类 ThreadPoolExecutor 有两个成员方法 setCorePoolSize、setMaximumPoolSize 可以在运行时设置核心线程数和最大线程数。


setCorePoolSize 方法执行流程是:首先会覆盖之前构造函数设置的 corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:



setMaximumPoolSize 方法: 首先会覆盖之前构造函数设置的 maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。


Spring 框架提供的线程池类 ThreadPoolTaskExecutor,此类封装了对 ThreadPoolExecutor 有两个成员方法 setCorePoolSize、setMaximumPoolSize 的调用。



基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:


(1)定义一个动态线程池类,继承 ThreadPoolTaskExecutor,目的跟非动态配置的线程池类 ThreadPoolTaskExecutor 区分开;


(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比 ducc 配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;


(3)引入公司 ducc 配置平台相关 jar 包并创建一个动态线程池配置 key;


(4)定义和实现一个应用启动后根据动态线程池 Bean 和从 ducc 配置平台拉取配置刷新应用中的线程数配置;


接下来代码一一实现:


(1)动态线程池类


/** * 动态线程池 * */public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {}
复制代码


(2)动态线程池配置定时刷新类


@Slf4jpublic class DynamicThreadPoolRefresh implements InitializingBean {    /**     * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.     */    private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();
/** * @param threadPoolBeanName * @param threadPoolTaskExecutor */ public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) { log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor())); DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor); }
@Override public void afterPropertiesSet() throws Exception { this.refresh(); //创建定时任务线程池 ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build()); //延迟1秒执行,每个1分钟check一次 executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS); }
private void refresh() { String dynamicThreadPool = ""; try { if (DTP_REGISTRY.isEmpty()) { log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty"); return; } dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL); if (StringUtils.isBlank(dynamicThreadPool)) { log.debug("DynamicThreadPool refresh dynamicThreadPool not config"); return; } log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool); List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() { }); if (CollectionUtils.isEmpty(threadPoolPropertiesList)) { log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool); return; } for (ThreadPoolProperties properties : threadPoolPropertiesList) { doRefresh(properties); } } catch (Exception e) { log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e); } }
/** * @param properties */ private void doRefresh(ThreadPoolProperties properties) { if (StringUtils.isBlank(properties.getThreadPoolBeanName()) || properties.getCorePoolSize() < 1 || properties.getMaxPoolSize() < 1 || properties.getMaxPoolSize() < properties.getCorePoolSize()) { log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties); return; } DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName()); if (Objects.isNull(threadPoolTaskExecutor)) { log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName()); return; } ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor()); if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize()) && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) { log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName()); return; } if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) { threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize()); log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize()); } if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) { threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize()); log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize()); } ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor()); log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp); }
private class RefreshThreadPoolConfig extends TimerTask { private RefreshThreadPoolConfig() { }
@Override public void run() { DynamicThreadPoolRefresh.this.refresh(); } }
}
复制代码


线程池配置类


@Datapublic class ThreadPoolProperties {    /**     * 线程池名称     */    private String threadPoolBeanName;    /**     * 线程池核心线程数量     */    private int corePoolSize;    /**     * 线程池最大线程池数量     */    private int maxPoolSize;}
复制代码


(3)引入公司 ducc 配置平台相关 jar 包并创建一个动态线程池配置 key


ducc 配置平台使用见:https://cf.jd.com/pages/viewpage.action?pageId=403477057


动态线程池配置 key:dynamic.thread.pool


配置 value:


[  {    "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",    "corePoolSize": 32,    "maxPoolSize": 128  }]
复制代码


(4) 应用启动刷新应用本地动态线程池配置


@Slf4jpublic class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof DynamicThreadPoolTaskExecutor) { DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean); } return bean; }}
复制代码

3.动态线程池应用

动态线程池 Bean 声明


    <!-- 普通线程池 -->    <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">        <!-- 核心线程数,默认为 -->        <property name="corePoolSize" value="128"/>        <!-- 最大线程数,默认为Integer.MAX_VALUE -->        <property name="maxPoolSize" value="512"/>        <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->        <property name="queueCapacity" value="500"/>        <!-- 线程池维护线程所允许的空闲时间,默认为60s -->        <property name="keepAliveSeconds" value="60"/>        <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->        <property name="rejectedExecutionHandler">            <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>        </property>    </bean>    <!-- 动态线程池 -->    <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">        <!-- 核心线程数,默认为 -->        <property name="corePoolSize" value="32"/>        <!-- 最大线程数,默认为Integer.MAX_VALUE -->        <property name="maxPoolSize" value="128"/>        <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->        <property name="queueCapacity" value="500"/>        <!-- 线程池维护线程所允许的空闲时间,默认为60s -->        <property name="keepAliveSeconds" value="60"/>        <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->        <property name="rejectedExecutionHandler">            <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>        </property>    </bean>    <!-- 动态线程池刷新配置 -->    <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>    <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>
复制代码


业务类注入 Spring Bean 后,直接使用即可


 @Resource private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;
Runnable asyncTask = ()->{...}; CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);
复制代码

4.小结

本文从实际项目的业务痛点场景出发,并基于公司已有的 ducc 配置平台简单实现了线程池线程数量可配置。

发布于: 刚刚阅读数: 4
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
利用DUCC配置平台实现一个动态化线程池_spring_京东科技开发者_InfoQ写作社区