【Java 多线程 3】线程池 2,linux 内核编程进阶篇 pdf
线程池添加任务过程:
我们首先从线程池的创建说起,Executors.newFixedThreadPool(2)表示创建一个具有两个线程的线程池,源代码如下:
public class Executors {
//生成一个最大为 nThreads 的线程池执行器
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
这里使用了 LinkedBlockingQueue 作为队列任务管理器,所有等待处理的任务都会放在该对列中,需要注意的是,此队列是一个阻塞式的单端队列。线程池建立好了,那就需要线程在其中运行了,线程池中的线程是在 submit 第一次提交任务时建立的,代码如下:
public Future<?> submit(Runnable task) {
//检查任务是否为 null
if (task == null) throw new NullPointerException();
//把 Runnable 任务包装成具有返回值的任务对象,不过此时并没有执行,只是包装
RunnableFuture<Object> ftask = newTaskFor(task, null);
//执行此任务
execute(ftask);
//返回任务预期执行结果
return ftask;
}
此处的代码关键是 execute 方法,它实现了三个职责。
1、创建足够多的工作线程数,不大于最大线程数量,并保持线程处于运行或等待状态。
2、把等待处理的任务放到任务队列中
3、从任务队列中取出来执行
其中此处的关键是工作线程的创建,它也是通过 new Thread 方式创建的一个线程,只是它创建的并不是我们的任务线程(虽然我们的任务实现了 Runnable 接口,但它只是起了一个标志性的作用),而是经过包装的 Worker 线程,代码如下:
private final class Worker implements Runnable {
// 运行一次任务
private void runTask(Runnable task) {
/* 这里的 task 才是我们自定义实现 Runnable 接口的任务 */
task.run();
/* 该方法其它代码略 */
}
// 工作线程也是线程,必须实现 run 方法
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
// 任务队列中获得任务
Runnable getTask() {
/* 其它代码略 */
for (;;) {
return r = workQueue.take();
}
}
}
此处为示意代码,删除了大量的判断条件和锁资源。execute 方法是通过 Worker 类启动的一个工作线程,执行的是我们的第一个任务,然后改线程通过 getTask 方法从任务队列中获取任务,之后再继续执行,但问题是任务队列是一个 BlockingQuene,是阻塞式的,也就是说如果该队列的元素为 0,则保持等待状态,直到有任务进入为止,我们来看 LinkedBlockingQuene 的 take 方法,代码如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
try {
// 如果队列中的元素为 0,则等待
while (count.get() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
// 等待状态结束,弹出头元素
x = extract();
c = count.getAndDecrement();
// 如果队列数量还多于一个,唤醒其它线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
// 返回头元素
return x;
}
分析到这里,我们就明白了线程池的创建过程:创建一个阻塞队列以容纳任务,在第一次执行任务时创建做够多的线程(不超过许可线程数),并处理任务,之后每个工作线程自行从任务对列中获得任务,直到任务队列中的任务数量为 0 为止,此时,线程将处于等待状态,一旦有任务再加入到队列中,即召唤醒工作线程进行处理,实现线程的可复用性。
使用线程池减少的是线程的创建和销毁时间,因为创建一个对象要获取内存资源,虚拟机跟踪每个对象,以便可以在对象销毁后进行垃圾回收,所以提高服务程序效率的一个重要手段就是尽可能的降低创建和销毁对象的次数,特别是一些非常耗资源的对象创建和销毁。
这对于多线程应用来说非常的有帮助,比如我们常用的 servlet 容器,每次请求处理的都是一个线程,如果不采用线程池技术,每次请求都会重新创建一个新的线程,这会导致系统的性能负荷加大,影响效率下降,主要是很 low,弱爆了,这代码谁写的,给我出来,哈哈。
四、适时选择不同的线程池来实现
Java 的线程池包括 ThreadPoolExecutor 类和 ScheduledThreadPoolExecutor 类。
为了简化,还提供了 Exceutors 的静态类,它可以直接生成不同的线程池执行器,比如单线程执行器、带缓冲功能的执行器等。
为了理解这些执行器,我们首先来看看 ThreadPoolExecutor 类,其中它复杂的构造函数可以很好的理解线程池的作用,代码如下:
// Java 线程池的完整构造函数
public ThreadPoolExecutor(
int corePoolSize, // 线程池长期维持的最小线程数,即使线程处于 Idle 状态,也不会回收。
int maximumPoolSize, // 线程数的上限
long keepAliveTime, // 线程最大生命周期。
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue, //任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。
ThreadFactory threadFactory, // 线程工厂。定义如何启动一个线程,可以设置线程名称,并且可以确认是否是后台线程等。
RejectedExecutionHandler handler) // 拒绝任务处理器。由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。
这是 ThreadPoolExecutor 最完整的构造函数,其他的构造函数都是引用该构造函数实现的。
线程池的管理是这样一个过程:首先创建线程池,然后根据任务的数量逐步将线程增大到 corePoolSize 数量,如果此时仍有任务增加,则放置到 workQuene 中,直到 workQuene 爆满为止,然后继续增加池中的数量(增强处理能力),最终达到 maximumPoolSize,那如果此时还有任务增加进来呢?这就需要 handler 处理了,或者丢弃任务,或者拒绝新任务,或者挤占已有任务等。
在任务队列和线程池都饱和的情况下,一但有线程处于等待(任务处理完毕,没有新任务增加)状态的时间超过 keepAliveTime,则该线程终止,也就说池中的线程数量会逐渐降低,直至为 corePoolSize 数量为止。
向线程池提交一个任务后,它的主要处理流程如下图所示:
![5fc9cf0a48a4ad2da37b26856660a7b5205.jpg](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9vc2NpbWcub3NjaGluYS5uZXQvb3NjbmV0LzVmYzljZjBhNDhhNGFkMmRhMzdiMjY4NTY
2NjBhN2I1MjA1LmpwZw?x-oss-process=image/format,png)
我们可以把线程池想象为这样一个场景:在一个生产线上,车间规定是可以有 corePoolSize 数量的工人,但是生产线刚建立时,工作不多,不需要那么多的人。随着工作数量的增加,工人数量也逐渐增加,直至增加到 corePoolSize 数量为止。此时还有任务增加怎么办呢?
好办,任务排队,corePoolSize 数量的工人不停歇的处理任务,新增加的任务按照一定的规则存放在仓库中(也就是我们的 workQuene 中),一旦任务增加的速度超过了工人处理的能力,也就是说仓库爆满时,车间就会继续招聘工人(也就是扩大线程数),直至工人数量到达 maximumPoolSize 为止,那如果所有的 maximumPoolSize 工人都在处理任务时,而且仓库也是饱和状态,新增任务该怎么处理呢?这就会扔一个叫 handler 的专门机构去处理了,它要么丢弃这些新增的任务,要么无视,要么替换掉别的任务。
过了一段时间后,任务的数量逐渐减少,导致一部分工人处于待工状态,为了减少开支(Java 是为了减少系统的资源消耗),于是开始辞退工人,直至保持 corePoolSize 数量的工人为止,此时即使没有工作,也不再辞退工人(池中的线程数量不再减少),这也是保证以后再有任务时能够快速的处理。
明白了线程池的概念,我们再来看看 Executors 提供的几个线程创建线程池的便捷方法:
① newSingleThreadExecutor:单线程池。
顾名思义就是一个池中只有一个线程在运行,该线程永不超时,而且由于是一个线程,当有多个任务需要处理时,会将它们放置到一个无界阻塞队列中逐个处理,它的实现代码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable()));
}
它的使用方法也很简单,下面是简单的示例:
public static void main(String[] args) throws ExecutionException,InterruptedException {
// 创建单线程执行器
ExecutorService es = Executors.newSingleThreadExecutor();
// 执行一个任务
Future<String> future = es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "";
}
评论