写点什么

美团动态线程池实践思路,开源了

作者:yanhom
  • 2022 年 2 月 15 日
  • 本文字数:7365 字

    阅读完需:约 24 分钟

美团动态线程池实践思路,开源了

大家好,今天我们来聊一个比较实用的话题,动态可监控的线程池实践,全新开源项目(DynamicTp)地址在文章末尾,欢迎交流学习。



写在前面

稍微有些 Java 编程经验的小伙伴都知道,Java 的精髓在 juc 包,这是大名鼎鼎的 Doug Lea 老爷子的杰作,评价一个程序员 Java 水平怎么样,一定程度上看他对 juc 包下的一些技术掌握的怎么样,这也是面试中的基本上必问的一些技术点之一。


juc 包主要包括:


1.原子类(AtomicXXX)

2.锁类(XXXLock)

3.线程同步类(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)

4.任务执行器类(Executor 体系类,包括今天的主角 ThreadPoolExecutor)

5.并发集合类(ConcurrentXXX、CopyOnWriteXXX)相关集合类

6.阻塞队列类(BlockingQueue 继承体系类)

7.Future 相关类

8.其他一些辅助工具类


多线程编程场景下,这些类都是必备技能,会这些可以帮助我们写出高质量、高性能、少 bug 的代码,同时这些也是 Java 中比较难啃的一些技术,需要持之以恒,学以致用,在使用中感受他们带来的奥妙。


上边简单罗列了下 juc 包下功能分类,这篇文章我们主要来介绍动态可监控线程池的,所以具体内容也就不展开讲了,以后有时间单独来聊吧。看这篇文章前,希望读者最好有一定的线程池 ThreadPoolExecutor 使用经验,不然看起来会有点懵。


如果你对 ThreadPoolExecutor 不是很熟悉,推荐阅读下面两篇文章


javadoop: https://www.javadoop.com/post/java-thread-pool


美团技术博客: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html



背景

使用 ThreadPoolExecutor 过程中你是否有以下痛点呢?


1.代码中创建了一个 ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适

2.凭经验设置参数值,上线后发现需要调整,改代码重启服务,非常麻烦

3.线程池相对开发人员来说是个黑盒,运行情况不能感知到,直到出现问题


如果你有以上痛点,这篇文章要介绍的动态可监控线程池(DynamicTp)或许能帮助到你。


如果看过 ThreadPoolExecutor 的源码,大概可以知道其实它有提供一些 set 方法,可以在运行时动态去修改相应的值,这些方法有:


public void setCorePoolSize(int corePoolSize);public void setMaximumPoolSize(int maximumPoolSize);public void setKeepAliveTime(long time, TimeUnit unit);public void setThreadFactory(ThreadFactory threadFactory);public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
复制代码


现在大多数的互联网项目其实都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置,实时生效的角色。那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件的难度和工作量。


综上,我们总结出以下的背景


  • 广泛性:在 Java 开发中,想要提高系统性能,线程池已经是一个 90%以上的人都会选择使用的基础工具

  • 不确定性:项目中可能会创建很多线程池,既有 IO 密集型的,也有 CPU 密集型的,但线程池的参数并不好确定;需要有套机制在运行过程中动态去调整参数

  • 无感知性,线程池运行过程中的各项指标一般感知不到;需要有套监控报警机制在事前、事中就能让开发人员感知到线程池的运行状况,及时处理

  • 高可用性,配置变更需要及时推送到客户端;需要有高可用的配置管理推送服务,配置中心是现在大多数互联网系统都会使用的组件,与之结合可以大幅度减少开发量及接入难度



简介

我们基于配置中心对线程池 ThreadPoolExecutor 做一些扩展,实现对运行中线程池参数的动态修改,实时生效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。报警维度包括(队列容量、线程池活性、拒绝触发等);同时也会定时采集线程池指标数据供监控平台可视化使用。使我们能时刻感知到线程池的负载,根据情况及时调整,避免出现问题影响线上业务。


    |  __ \                            (_) |__   __|    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __      | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \     | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/              __/ |                              | |                |___/                               |_|         :: Dynamic Thread Pool :: 
复制代码


特性


  • 参考美团线程池实践 ,对线程池参数动态化管理,增加监控、报警功能

  • 基于 Spring 框架,现只支持 SpringBoot 项目使用,轻量级,引入 starter 即可食用

  • 基于配置中心实现线程池参数动态调整,实时生效;集成主流配置中心,默认支持 Nacos、Apollo,同时也提供 SPI 接口可自定义扩展实现

  • 内置通知报警功能,提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝策略触发报警),默认支持企业微信、钉钉报警,同时提供 SPI 接口可自定义扩展实现

  • 内置线程池指标采集功能,支持通过 MicroMeter、JsonLog 日志输出、Endpoint 三种方式,可通过 SPI 接口自定义扩展实现

  • 集成管理常用第三方组件的线程池,已集成 SpringBoot 内置 WebServer(Tomcat、Undertow、Jetty)的线程池管理



架构设计

主要分四大模块


  • 配置变更监听模块:

  • 1.监听特定配置中心的指定配置文件(默认实现 Nacos、Apollo),可通过内部提供的 SPI 接口扩展其他实现

  • 2.解析配置文件内容,内置实现 yml、properties 配置文件的解析,可通过内部提供的 SPI 接口扩展其他实现

  • 3.通知线程池管理模块实现刷新

  • 线程池管理模块:

  • 1.服务启动时从配置中心拉取配置信息,生成线程池实例注册到内部线程池注册中心中

  • 2.监听模块监听到配置变更时,将变更信息传递给管理模块,实现线程池参数的刷新

  • 3.代码中通过 getExecutor()方法根据线程池名称来获取线程池对象实例

  • 监控模块:

  • 实现监控指标采集以及输出,默认提供以下三种方式,也可通过内部提供的 SPI 接口扩展其他实现

  • 1.默认实现 Json log 输出到磁盘

  • 2.MicroMeter 采集,引入 MicroMeter 相关依赖

  • 3.暴雷 Endpoint 端点,可通过 http 方式访问

  • 通知告警模块:

  • 对接办公平台,实现通告告警功能,默认实现钉钉、企微,可通过内部提供的 SPI 接口扩展其他实现,通知告警类型如下

  • 1.线程池参数变更通知

  • 2.阻塞队列容量达到设置阈值告警

  • 3.线程池活性达到设置阈值告警

  • 4.触发拒绝策略告警




使用

  • maven 依赖


  1. apollo 应用用接入用此依赖


        <dependency>            <groupId>io.github.lyh200</groupId>            <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>            <version>1.0.1</version>        </dependency>
复制代码


  1. spring-cloud 场景下的 nacos 应用接入用此依赖


        <dependency>            <groupId>io.github.lyh200</groupId>            <artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>            <version>1.0.1</version>        </dependency>
复制代码


  1. 非 spring-cloud 场景下的 nacos 应用接入用此依赖


        <dependency>            <groupId>io.github.lyh200</groupId>            <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>            <version>1.0.1</version>        </dependency>
复制代码


  • 线程池配置


    spring:      dynamic:        tp:          enabled: true          enabledBanner: true        # 是否开启banner打印,默认true          enabledCollect: false      # 是否开启监控指标采集,默认false          collectorType: logging     # 监控数据采集器类型(JsonLog | MicroMeter),默认logging          logPath: /home/logs        # 监控日志数据路径,默认${user.home}/logs          monitorInterval: 5         # 监控时间间隔(报警判断、指标采集),默认5s          nacos:                     # nacos配置,不配置有默认值(规则name-dev.yml这样)            dataId: dynamic-tp-demo-dev.yml            group: DEFAULT_GROUP          apollo:                    # apollo配置,不配置默认拿apollo配置第一个namespace            namespace: dynamic-tp-demo-dev.yml          configType: yml            # 配置文件类型          platforms:                 # 通知报警平台配置            - platform: wechat              urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # 替换              receivers: test1,test2                   # 接受人企微名称            - platform: ding              urlKey: f80dad441fcd655438f4a08dcd6a     # 替换              secret: SECb5441fa6f375d5b9d21           # 替换,非sign模式可以没有此值              receivers: 15810119805                   # 钉钉账号手机号              tomcatTp:                                    # tomcat web server线程池配置              minSpare: 100              max: 400                jettyTp:                                     # jetty web server线程池配置              min: 100              max: 400               undertowTp:                                  # undertow web server线程池配置              ioThreads: 100              workerThreads: 400                executors:                                   # 动态线程池配置            - threadPoolName: dynamic-tp-test-1              corePoolSize: 6              maximumPoolSize: 8              queueCapacity: 200              queueType: VariableLinkedBlockingQueue   # 任务队列,查看源码QueueTypeEnum枚举类              rejectedHandlerType: CallerRunsPolicy    # 拒绝策略,查看RejectedTypeEnum枚举类              keepAliveTime: 50              allowCoreThreadTimeOut: false              threadNamePrefix: test           # 线程名前缀              notifyItems:                     # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警)                - type: capacity               # 报警项类型,查看源码 NotifyTypeEnum枚举类                  enabled: true                  threshold: 80                # 报警阈值                  platforms: [ding,wechat]     # 可选配置,不配置默认拿上层platforms配置的所以平台                  interval: 120                # 报警间隔(单位:s)                - type: change                  enabled: true                - type: liveness                  enabled: true                  threshold: 80                - type: reject                  enabled: true                  threshold: 1
复制代码


  • 代码方式生成,服务启动会自动注册


    @Configuration    public class DtpConfig {
@Bean public DtpExecutor demo1Executor() { return DtpCreator.createDynamicFast("demo1-executor"); }
@Bean public ThreadPoolExecutor demo2Executor() { return ThreadPoolBuilder.newBuilder() .threadPoolName("demo2-executor") .corePoolSize(8) .maximumPoolSize(16) .keepAliveTime(50) .allowCoreThreadTimeOut(true) .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false) .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName()) .buildDynamic(); } }
复制代码


  • 代码调用,根据线程池名称获取


    public static void main(String[] args) {           DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");           dtpExecutor.execute(() -> System.out.println("test"));    }
复制代码



注意事项

  1. 配置文件配置的参数会覆盖通过代码生成方式配置的参数

  2. 阻塞队列只有 VariableLinkedBlockingQueue 类型可以修改 capacity,该类型功能和 LinkedBlockingQueue 相似,只是 capacity 不是 final 类型,可以修改,VariableLinkedBlockingQueue 参考 RabbitMq 的实现

  3. 启动看到如下日志输出证明接入成功



| __ \ (_) |__ __| | | | |_ _ _ __ __ _ _ __ ___ _ ___| |_ __ | | | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ | |__| | |_| | | | | (_| | | | | | | | (__| | |_) | |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ __/ | | | |___/ |_| :: Dynamic Thread Pool ::
DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
复制代码


  1. 配置变更会推送通知消息,且会高亮变更的字段



DynamicTp [dynamic-tp-test-2] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]
复制代码



通知报警

触发报警阈值会推送相应报警消息,且会高亮显示相关字段,活性告警、容量告警、拒绝告警



配置变更会推送通知消息,且会高亮变更的字段




监控日志

通过主配置文件 collectType 属性配置指标采集类型,默认值:logging


  • micrometer 方式:通过引入 micrometer 相关依赖采集到相应的平台(如 Prometheus,InfluxDb...)

  • logging:指标数据以 json 格式输出日志到磁盘,地址{appName}.monitor.log


    2022-01-16 15:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":100,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":10,"taskCount":120,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1078,"dtpName":"remoting-call","maximumPoolSize":8}    2022-01-16 15:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":120,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":20,"taskCount":140,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1459,"dtpName":"remoting-call","maximumPoolSize":8}    2022-01-16 15:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":140,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":89,"taskCount":180,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1890,"dtpName":"remoting-call","maximumPoolSize":8}    2022-01-16 15:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":160,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":99,"taskCount":230,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":2780,"dtpName":"remoting-call","maximumPoolSize":8}    2022-01-16 15:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":230,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":300,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":4030,"dtpName":"remoting-call","maximumPoolSize":8}
复制代码


  • 暴露 EndPoint 端点(dynamic-tp),可以通过 http 方式请求


    [        {            "dtp_name": "remoting-call",            "core_pool_size": 8,            "maximum_pool_size": 16,            "queue_type": "SynchronousQueue",            "queue_capacity": 0,            "queue_size": 0,            "fair": false,            "queue_remaining_capacity": 0,            "active_count": 2,            "task_count": 2760,            "completed_task_count": 2760,            "largest_pool_size": 16,            "pool_size": 8,            "wait_task_count": 0,            "reject_count": 12462,            "reject_handler_name": "CallerRunsPolicy"        },        {            "max_memory": "220 MB",            "total_memory": "140 MB",            "free_memory": "44 MB",            "usable_memory": "125 MB"        }    ]
复制代码



项目地址

gitee 地址https://gitee.com/yanhom/dynamic-tp


github 地址https://github.com/lyh200/dynamic-tp



联系我

对项目有什么想法或者建议,可以加我 vx:yanhom1314 交流,或者创建 issues,一起完善项目


发布于: 2022 年 02 月 15 日阅读数: 4
用户头像

yanhom

关注

还未添加个人签名 2017.11.29 加入

还未添加个人简介

评论

发布
暂无评论
美团动态线程池实践思路,开源了