1、简单介绍
我们在并发编程中,目前大部分做法都是将任务添加到线程池中,并拿到 Future 对象,将其添加到集合中,等所有任务都添加到线程池后,在通过遍历 Future 集合,调用 future.get()来获取每个任务的结果,这样可以使得先添加到线程池的任务先等待其完成,但是并不能保证第一个添加到线程池的任务就是第一个执行完成的,所以会出现这种情况,后面添加到线程池的任务已经完成了,但是还必须要等待第一个任务执行完成并处理结果后才能处理接下来的任务。
如果想要不管添加到线程池的任务的顺序,先完成的任务先进行处理,那么就需要用到 ExecutorCompletionService 这个工具了。
2、源码解析
ExecutorCompletionService 实现了 CompletionService 接口。CompletionService 接种有有以下方法。
public interface CompletionService<V> {
// 提交任务
Future<V> submit(Callable<V> task);
// 提交任务
Future<V> submit(Runnable task, V result);
// 获取任务结果,带抛出异常
Future<V> take() throws InterruptedException;
// 获取任务结果
Future<V> poll();
// 获取任务结果,带超时
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
复制代码
可以看到接口中的方法非常简单,只有提交任务以及获取任务结果两类方法。
我们再看下实现类 ExecutorCompletionService 中的代码。
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
/**
* FutureTask的子类,重写FutureTask完成后的done方法
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// task任务执行完成后将任务放到队列中
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
/**
* 构造方法,传入一个线程池,创建一个队列
*/
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
/**
* 构造方法,传入线程池和队列
*/
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
// 提交一个task任务,最终将任务封装成QueueingFuture并由指定的线程池执行
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
// 提交一个task任务,最终将任务封装成QueueingFuture并由指定的线程池执行
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
// 从队列中获取执行完成的RunnableFuture对象,take方法会阻塞直到有数据
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
// 从队列中获取执行完成的RunnableFuture对象
public Future<V> poll() {
return completionQueue.poll();
}
// 从队列中获取执行完成的RunnableFuture对象
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}
复制代码
通过观察实现类中的代码,我们可以发现这个方法非常简单,其原理分为以下几步:
1、在构造 ExecutorCompletionService 对象时,需要传入给定的线程池或者阻塞队列。
2、当我们提交任务到 ExecutorCompletionService 时,会将提交的任务包装成 QueueingFuture 对象,然后交由我们指定的线程池来执行。
3、当任务执行完成后,QueueingFuture 对象会执行最终的 done 方法(QueueingFuture 对象重新的方法),将 RunnableFuture 对象添加到指定的阻塞队列中。
4、我们可以通过 poll 或者 take 方法来获取队列中的 RunnableFuture 对象,以便获取执行结果。
由此可以发现我们获取到的任务执行结果,与提交到线程池的任务顺序是无关的,哪个任务先完成,就会被添加到队列中,我们就可以先获取执行结果。
3、使用场景
1、当我们不关注提交到线程池任务顺序以及任务执行完成获取结果的顺序时,我们就可以使用 ExecutorCompletionService 这个来执行任务。以下是示例代码。
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers) {
ecs.submit(s);
}
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null) {
use(r);
}
}
}
复制代码
2、当多个任务同时执行,我们只需要获取第一个任务的执行结果,其余结果不需要关心时,也可以通过 ExecutorCompletionService 来执行任务。以下是示例代码。
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers) {
futures.add(ecs.submit(s));
}
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {
}
}
} finally {
for (Future<Result> f : futures) {
f.cancel(true);
}
}
if (result != null) {
use(result);
}
}
复制代码
4、代码实践
在业务上我们有这种场景,我们有一批订单进行批量更新,每处理完一单,我们都需要维护一下处理进度,保证订单处理进度实时更新成最新的进度数据,我们此时用到的就是 ExecutorCompletionService。
protected void parallelBatchUpdateWaybill(Map<String, LwbMain> lwbMainMap, Map<String, UpdateWaybillTaskDetail> taskDetailMap) {
long start = System.currentTimeMillis();
log.info("{} 并行批量更新订单开始:{}", traceId, taskNo);
int total = lwbMainMap.size();
BlockingQueue<Future<String>> blockingQueue = new LinkedBlockingQueue<>(total + 2);
ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(parallelUpdateWaybillExecutorService, blockingQueue);
for (Map.Entry<String, UpdateWaybillTaskDetail> entry : taskDetailMap.entrySet()) {
String lwbNo = entry.getKey();
LwbMain lwbMain = lwbMainMap.get(lwbNo);
UpdateWaybillTaskDetail taskDetail = entry.getValue();
executorCompletionService.submit(() -> this.updateSingleWaybill(lwbMain, taskDetail), "done");
}
for (int current = 0; current < taskDetailMap.size(); current++) {
try {
executorCompletionService.take().get();
} catch (Exception e) {
log.error("{} 获取并行批量更新订单结果异常:{}", traceId, e.getMessage(), e);
} finally {
jimClient.incr(importTaskNo);
}
}
long end = System.currentTimeMillis();
log.info("{} 并行批量更新订单结束:{},耗时:{}", traceId, taskNo, (end - start));
}
复制代码
评论