实践 Java 如何创建安全的线程池

用户头像
tingye
关注
发布于: 2020 年 06 月 14 日
实践Java如何创建安全的线程池

《阿里巴巴Java开发手册》关于线程池的创建,有如下规则:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

1)FixedThreadPool 和 SingleThreadPool:

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2)CachedThreadPool 和 ScheduledThreadPool:

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。



如何理解并实践这条规则,创建没有OOM风险的、安全的线程池呢?以下主要分三个部分展开:

1、测试验证使用Executors创建的线程池为什么不安全

2、实现创建安全的CachedThreadPool、FixedThreadPool 、SingleThreadPool线程池,并测试验证

3、实现创建安全的ScheduledThreadPool线程池,并测试验证

1、测试验证使用Executors创建的线程池为什么不安全

除了从源码角度理解,我们也可以通过编写测试用例,验证为什么使用 Executors 去创建线程池不安全,从而加深对规则的感性认识。这里我们构造了SleepTask任务执行类,按需sleep指定的毫秒数,模拟线程任务执行。

public class SleepTask implements Runnable {
private final long sleepMilliSeconds;
private final int taskId;
public SleepTask(long sleepMilliSeconds, int taskId) {
this.sleepMilliSeconds = sleepMilliSeconds;
this.taskId = taskId;
}
public void run() {
System.out.println("--------Start task " + taskId);
try {
Thread.sleep(sleepMilliSeconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("--------Finish task " + taskId);
}
}

通过ExecutorsTester测试类,模拟了大量线程任务进入线程池执行/排队的场景,观察CachedThreadPool、FixedThreadPool和ScheduledThreadPool触发OOM时的情况,因SingleThreadPool与FixedThreadPool实现类似,可以简单认为是固定线程数为1的特殊FixedThreadPool,故略去相关代码。

public class ExecutorsTester {
@Test
public void testNewCachedThreadPool() {
ExecutorService executorService = Executors.newCachedThreadPool();
executeSleepTaskLoop(executorService);
}
@Test
public void testNewFixedThreadPool() {
ExecutorService executorService = Executors.newFixedThreadPool(20);
executeSleepTaskLoop(executorService);
}
@Test
public void testNewScheduledThreadPool() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
try {
for(int i = 0; i < 100000; i++) {
executorService.schedule(new SleepTask(2000, i), 1, TimeUnit.SECONDS);
System.out.println(executorService.toString());
}
} catch (OutOfMemoryError e) {
System.out.println(e.toString());
System.exit(1);
}
}
private void executeSleepTaskLoop(ExecutorService executorService) {
try {
for(int i = 0; i < 100000; i++) {
executorService.submit(new SleepTask(3000, i));
System.out.println(executorService.toString());
}
} catch (OutOfMemoryError e) {
System.out.println(e.toString());
System.exit(1);
}
}
}

测试用例执行时,限制了最大堆内存(-Xmx2m),以便较快触发OOM异常。

CachedThreadPool、FixedThreadPool和ScheduledThreadPool测试用例执行结果如下:





细心的你可能会发现,跟规则描述稍有不同,ScheduledThreadPool最终OOM时,打印出来的线程数并不大,反而跟FixedThreadPool类似,有大量的排队任务。通过查看源码发现,因为ScheduledThreadPool使用了自定义的DelayedWorkQueue,虽然初始队列大小是16,但会动态扩容至最大Integer.MAXVALUE,实际上也是无界工作队列。所以虽然配置的线程数量是Integer.MAXVALUE,但其实是用不到的,OOM的直接原因是无界队列堆积大量请求,这里《阿里巴巴Java开发手册》的描述值得推敲。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
//只展示部分关键源码
private static final int INITIAL_CAPACITY = 16;
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
}

2、实现创建安全的CachedThreadPool、FixedThreadPool 、SingleThreadPool线程池,并测试验证

下面我们按规则要求尝试创建安全的线程池,其中CachedThreadPool、FixedThreadPool 、 SingleThreadPool通过ThreadPoolExecutor 的方式创建都比较容易,可通过Guava的ThreadFactoryBuilder统一给线程池中的线程命名。

public class SafeExecutors {
public static ExecutorService newCachedThreadPool(int corePoolSize, int maxPoolSize, int queueCapacity, String threadNamePrefix) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build();
BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
return new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, blockingQueue, namedThreadFactory);
}
public static ExecutorService newFixedThreadPool(int corePoolSize, int queueCapacity, String threadNamePrefix) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build();
BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
return new ThreadPoolExecutor(corePoolSize, corePoolSize, 60L, TimeUnit.SECONDS, blockingQueue, namedThreadFactory);
}
public static ExecutorService newSingledThreadPool(int queueCapacity, String threadNamePrefix) {
return newFixedThreadPool(1, queueCapacity, threadNamePrefix);
}
}

测试验证也很简单,用例执行结果符合预期。

public class SafeExcutorsTester {
@Test(expected = RejectedExecutionException.class)
public void testSafeNewCachedThreadPool() {
ExecutorService executorService = SafeExecutors.newCachedThreadPool(10, 100, 50, "test");
executeSafeSleepTaskLoop(executorService);
}
@Test(expected = RejectedExecutionException.class)
public void testSafeNewFixedThreadPool() {
ExecutorService executorService = SafeExecutors.newFixedThreadPool(20, 50, "test");
executeSafeSleepTaskLoop(executorService);
}
@Test(expected = RejectedExecutionException.class)
public void testSafeNewSingledThreadPool() {
ExecutorService executorService = SafeExecutors.newSingledThreadPool(50, "test");
executeSafeSleepTaskLoop(executorService);
}
private void executeSafeSleepTaskLoop(ExecutorService executorService) {
try {
for(int i = 0; i < 100000; i++) {
executorService.submit(new SleepTask(3000, i));
System.out.println(executorService.toString());
}
} catch (RejectedExecutionException e) {
System.out.println(e.toString());
throw e;
}
}
}



3、实现创建安全的ScheduledThreadPool线程池,并测试验证

实现创建安全的ScheduledThreadPool就有点麻烦了,因为ScheduledThreadPoolExecutor

虽然继承了ThreadPoolExecutor,但提供的构造方法没办法定制最大线程数和工作队列。

public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
//只展示部分关键源码
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
}

虽然可以通过super.setMaximumPoolSize方法重置最大线程数,但工作队列仍然不好处理,能不能也替代为有界的LinkedBlockingQueue呢?通过反射机制技术上是能做到的,但是细想意义不大,因为ScheduledThreadPoolExecutor定时任务执行的功能主要就是通过DelayedWorkQueue完成的,牺牲了定时功能ScheduledThreadPool就没有使用的意义了。

既然不能替换DelayedWorkQueue,那怎么解决无界队列可能引发的OOM呢?首先想到的就是定制DelayedWorkQueue为有界队列,但DelayedWorkQueue不是public的,也没有提供相关定制接口,难以实现。

还是从源码里找答案,发现ScheduledThreadPoolExecutor提供的关于DelayedWorkQueue队列操作的唯一public方法getQueue,是直接使用的父类的getQueue方法返回BlockingQueue,我们能不能间接的,在每次添加任务到队列前,增加检查队列长度的操作,人工控制工作队列的最大长度呢?

public BlockingQueue<Runnable> getQueue() {
return super.getQueue();
}



因此尝试如下解决方案,自定义SafeScheduledThreadPool继承ScheduledThreadPool,覆写需要用到的定时方法,在调用父类的方法前,增加判断队列长度是否达到了自定义的上限,如果是就抛出RejectedExecutionException异常,避免队列无限增长。

public class SafeScheduledThreadPool extends ScheduledThreadPoolExecutor {
protected final int queueCapacity;
public SafeScheduledThreadPool(int corePoolSize, int maxPoolSize, int queueCapacity, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
super.setMaximumPoolSize(maxPoolSize);
this.queueCapacity = queueCapacity;
}
@Override
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
checkQueueSize();
return super.schedule(command, delay, unit);
}
//省略覆写ScheduledThreadPoolExecutor中其他定时任务方法,实现同上
private void checkQueueSize() {
if (getQueue().size() >= queueCapacity) {
throw new RejectedExecutionException("DelayedWorkQueue size exceed capacity: " + queueCapacity);
}
}
}



经过测试完美达到预期效果。

public class SafeExcutorsTester {
@Test(expected = RejectedExecutionException.class)
public void testSafeNewScheduledThreadPool() {
ScheduledExecutorService scheduledExecutorService = SafeExecutors.newScheduledThreadPool(10, 100, 50, "test");
try {
for(int i = 0; i < 100000; i++) {
scheduledExecutorService.schedule(new SleepTask(2000, i), 1, TimeUnit.SECONDS);
System.out.println(scheduledExecutorService.toString());
}
} catch (RejectedExecutionException e) {
System.out.println(e.toString());
throw e;
}
}
}





总结

通过动手测试更容易理解线程池的运行机制,以及编码规范的初衷,另外编码规范可能很精炼,自己实践时遇到问题还是要从源码里找答案,多尝试多验证,不浅尝辄止最终定有收货。

发布于: 2020 年 06 月 14 日 阅读数: 155
用户头像

tingye

关注

念天地之悠悠,独怆然而涕下 2019.01.24 加入

还未添加个人简介

评论

发布
暂无评论
实践Java如何创建安全的线程池