写点什么

【高并发】从源码角度分析创建线程池究竟有哪些方式

作者:冰河
  • 2021 年 11 月 14 日
  • 本文字数:4923 字

    阅读完需:约 16 分钟

【高并发】从源码角度分析创建线程池究竟有哪些方式

大家好,我是冰河~~


在 Java 的高并发领域,线程池一直是一个绕不开的话题。有些童鞋一直在使用线程池,但是,对于如何创建线程池仅仅停留在使用 Executors 工具类的方式,那么,创建线程池究竟存在哪几种方式呢?就让我们一起从创建线程池的源码来深入分析究竟有哪些方式可以创建线程池。

使用 Executors 工具类创建线程池

在创建线程池时,初学者用的最多的就是 Executors 这个工具类,而使用这个工具类创建线程池时非常简单的,不需要关注太多的线程池细节,只需要传入必要的参数即可。Executors 工具类提供了几种创建线程池的方法,如下所示。


  • Executors.newCachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程

  • Executors.newFixedThreadPool:创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待

  • Executors.newScheduledThreadPool:创建一个定长的线程池,支持定时、周期性的任务执行

  • Executors.newSingleThreadExecutor: 创建一个单线程化的线程池,使用一个唯一的工作线程执行任务,保证所有任务按照指定顺序(先入先出或者优先级)执行

  • Executors.newSingleThreadScheduledExecutor:创建一个单线程化的线程池,支持定时、周期性的任务执行

  • Executors.newWorkStealingPool:创建一个具有并行级别的 work-stealing 线程池


其中,Executors.newWorkStealingPool 方法是 Java 8 中新增的创建线程池的方法,它能够为线程池设置并行级别,具有更高的并发度和性能。除了此方法外,其他创建线程池的方法本质上调用的是 ThreadPoolExecutor 类的构造方法。


例如,我们可以使用如下代码创建线程池。


Executors.newWorkStealingPool();Executors.newCachedThreadPool();Executors.newScheduledThreadPool(3);
复制代码

使用 ThreadPoolExecutor 类创建线程池

从代码结构上看 ThreadPoolExecutor 类继承自 AbstractExecutorService,也就是说,ThreadPoolExecutor 类具有 AbstractExecutorService 类的全部功能。


既然 Executors 工具类中创建线程池大部分调用的都是 ThreadPoolExecutor 类的构造方法,所以,我们也可以直接调用 ThreadPoolExecutor 类的构造方法来创建线程池,而不再使用 Executors 工具类。接下来,我们一起看下 ThreadPoolExecutor 类的构造方法。


ThreadPoolExecutor 类中的所有构造方法如下所示。


public ThreadPoolExecutor(int corePoolSize,            int maximumPoolSize,            long keepAliveTime,            TimeUnit unit,           BlockingQueue<Runnable> workQueue) {  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,     Executors.defaultThreadFactory(), defaultHandler);}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
复制代码


由 ThreadPoolExecutor 类的构造方法的源代码可知,创建线程池最终调用的构造方法如下。


public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,        long keepAliveTime, TimeUnit unit,        BlockingQueue<Runnable> workQueue,        ThreadFactory threadFactory,              RejectedExecutionHandler handler) {  if (corePoolSize < 0 ||    maximumPoolSize <= 0 ||    maximumPoolSize < corePoolSize ||    keepAliveTime < 0)    throw new IllegalArgumentException();  if (workQueue == null || threadFactory == null || handler == null)    throw new NullPointerException();  this.acc = System.getSecurityManager() == null ?      null :      AccessController.getContext();  this.corePoolSize = corePoolSize;  this.maximumPoolSize = maximumPoolSize;  this.workQueue = workQueue;  this.keepAliveTime = unit.toNanos(keepAliveTime);  this.threadFactory = threadFactory;  this.handler = handler;}
复制代码


关于此构造方法中各参数的含义和作用,各位可以移步《高并发之——不得不说的线程池与ThreadPoolExecutor类浅析》进行查阅。


大家可以自行调用 ThreadPoolExecutor 类的构造方法来创建线程池。例如,我们可以使用如下形式创建线程池。


new ThreadPoolExecutor(0, Integer.MAX_VALUE,                       60L, TimeUnit.SECONDS,                       new SynchronousQueue<Runnable>());
复制代码

使用 ForkJoinPool 类创建线程池

在 Java8 的 Executors 工具类中,新增了如下创建线程池的方式。


public static ExecutorService newWorkStealingPool(int parallelism) {  return new ForkJoinPool    (parallelism,     ForkJoinPool.defaultForkJoinWorkerThreadFactory,     null, true);}
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);}
复制代码


从源代码可以可以,本质上调用的是 ForkJoinPool 类的构造方法类创建线程池,而从代码结构上来看 ForkJoinPool 类继承自 AbstractExecutorService 抽象类。接下来,我们看下 ForkJoinPool 类的构造方法。


public ForkJoinPool() {  this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),     defaultForkJoinWorkerThreadFactory, null, false);} public ForkJoinPool(int parallelism) {  this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);}
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission();}
private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);}
复制代码


通过查看源代码得知,ForkJoinPool 的构造方法,最终调用的是如下私有构造方法。


private ForkJoinPool(int parallelism,         ForkJoinWorkerThreadFactory factory,         UncaughtExceptionHandler handler,         int mode,         String workerNamePrefix) {  this.workerNamePrefix = workerNamePrefix;  this.factory = factory;  this.ueh = handler;  this.config = (parallelism & SMASK) | mode;  long np = (long)(-parallelism); // offset ctl counts  this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);}
复制代码


其中,各参数的含义如下所示。


  • parallelism:并发级别。

  • factory:创建线程的工厂类对象。

  • handler:当线程池中的线程抛出未捕获的异常时,统一使用 UncaughtExceptionHandler 对象处理。

  • mode:取值为 FIFO_QUEUE 或者 LIFO_QUEUE。

  • workerNamePrefix:执行任务的线程名称的前缀。


当然,私有构造方法虽然是参数最多的一个方法,但是其不会直接对外方法,我们可以使用如下方式创建线程池。


new ForkJoinPool();new ForkJoinPool(Runtime.getRuntime().availableProcessors());new ForkJoinPool(Runtime.getRuntime().availableProcessors(),             ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);
复制代码

使用 ScheduledThreadPoolExecutor 类创建线程池

在 Executors 工具类中存在如下方法类创建线程池。


public static ScheduledExecutorService newSingleThreadScheduledExecutor() {  return new DelegatedScheduledExecutorService    (new ScheduledThreadPoolExecutor(1));}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory));}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}
复制代码


从源码来看,这几个方法本质上调用的都是 ScheduledThreadPoolExecutor 类的构造方法,ScheduledThreadPoolExecutor 中存在的构造方法如下所示。


public ScheduledThreadPoolExecutor(int corePoolSize) {  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,      new DelayedWorkQueue());}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);}
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);}
复制代码


而从代码结构上看,ScheduledThreadPoolExecutor 类继承自 ThreadPoolExecutor 类,本质上还是调用 ThreadPoolExecutor 类的构造方法,只不过此时传递的队列为 DelayedWorkQueue。我们可以直接调用 ScheduledThreadPoolExecutor 类的构造方法来创建线程池,例如以如下形式创建线程池。


new ScheduledThreadPoolExecutor(3)
复制代码


好了,今天就到这儿吧,我是冰河,我们下期见~~

发布于: 2021 年 11 月 14 日阅读数: 32
用户头像

冰河

关注

公众号:冰河技术 2020.05.29 加入

互联网高级技术专家,《深入理解分布式事务:原理与实战》,《海量数据处理与大数据技术实战》和《MySQL技术大全:开发、优化与运维实战》作者,mykit-data与mykit-transaction-message框架作者。【冰河技术】作者。

评论 (1 条评论)

发布
用户头像
原创不易,冰河在线求三连,不过分吧?
2021 年 11 月 14 日 21:08
回复
没有更多了
【高并发】从源码角度分析创建线程池究竟有哪些方式