Java 线程池相关知识点总结,mybatis 基础面试题
workQueue:表示存放任务的 BlockingQueue<Runnable 队列。
BlockingQueue:阻塞队列(BlockingQueue)是 java.util.concurrent 下的主要用来控制线程同步的工具。如果 BlockQueue 是空的,从 BlockingQueue 取东西的操作将会被阻断进入等待状态,直到 BlockingQueue 进了东西才会被唤醒。同样,如果 BlockingQueue 是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到 BlockingQueue 里有空间才会被唤醒继续操作。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。具体的实现类有 LinkedBlockingQueue,ArrayBlockingQueued 等。一般其内部的都是通过 Lock 和 Condition(显示锁(Lock)及 Condition 的学习与使用)来实现阻塞和唤醒。
3.线程池的工作过程
线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
当调用 execute() 方法添加一个任务时,线程池会做如下判断:
如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectExecutionException。
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
4.线程池的创建和使用
生成线程池采用了工具类 Executors 的静态方法,以下是几种常见的线程池。
1)SingleThreadExecutor:单个后台线程 (其缓冲队列是无界的)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建一个单线程的线程池。这个线程池只有一个核心线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
2)FixedThreadPool:只有核心线程的线程池,大小固定 (其缓冲队列是无界的) 。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
3)CachedThreadPool:无界线程池,可以进行自动线程回收。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。SynchronousQueue 是一个是缓冲区为 1 的阻塞队列。
4)ScheduledThreadPool:核心线程池固定,大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
public static ExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPool(corePoolSize,
Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue
());
}
5.线程池实现的原理
如果只讲线程池的使用,那这篇博客没有什么大的价值,充其量也就是熟悉 Executor 相关 API 的过程。线程池的实现过程没有用到 Synchronized 关键字,用的都是 volatile,Lock 和同步(阻塞)队列,Atomic 相关类,FutureTask 等等,因为后者的性能更优。理解的过程可以很好的学习源码中并发控制的思想。
在 ThreadPoolExecutor 主要 Worker 类来控制线程的复用。看下 Worker 类简化后的代码,这样方便理解:
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}
Worker 是一个 Runnable,同时拥有一个 thread,这个 thread 就是要开启的线程,在新建 Worker 对象时同时新建一个 Thread 对象,同时将 Worker 自己作为参数传入 TThread,这样当 Thread 的 start()方法调用时,运行的实际上是 Worker 的 run()方法,接着到 runWorker()中,有个 while 循环,一直从 getTask()里得到 Runnable 对象,顺序执行。getTask()又是怎么得到 Runnable 对象的呢?
private Runnable getTask() {
if(一些特殊情况) {
return null;
}
Runnable r = workQueue.take();
return r;
}
这个 workQueue 就是初始化 ThreadPoolExecutor 时存放任务的 BlockingQueue 队列,这个队列里的存放的都是将要执行的 Runnable 任务。因为 BlockingQueue 是个阻塞队列,BlockingQueue.take()得到如果是空,则进入等待状态直到 BlockingQueue 有新的对象被加入时唤醒阻塞的线程。所以一般情况 Thread 的 run()方法就不会结束,而是不断执行从 workQueue 里的 Runnable 任务,这就达到了线程复用的原理了。
我是面试课的 DocMike 老师,曾在阿里、百度、美团,最近和北大同学搞了一个
职场类公众号: 健男说说 ,会有热门互联网职场咨询和经验,可以关注下,
也可以加我私人微信 570089514,注明慕课网就可以
以后有相关面试、内推、简历、职场的问题都可以通过微信和公众号给我反馈
希望自己这么多年走过的弯路 能够给大家一些帮助
控制最大并发数:
那 Runnable 是什么时候放入 workQueue?Worker 又是什么时候创建,Worker 里的 Thread 的又是什么时候调用 start()开启新线程来执行 Worker 的 run()方法的呢?有上面的分析看出 Worker 里的 runWorker()执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?
很容易想到是在 execute(Runnable runnable)时会做上面的一些任务。看下 execute 里是怎么做的。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当前线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接启动新的线程。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 活动线程数 >= corePoolSize
// runState 为 RUNNING && 队列未满
// workQueue.offer(command)表示添加到队列,如果添加成功返回 true,否则返回 false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检验是否为 RUNNING 状态
评论