写点什么

Java 异步判断线程池所有任务是否执行完成的方法

  • 2024-07-12
    福建
  • 本文字数:4929 字

    阅读完需:约 16 分钟

1.使用ExecutorServiceCountDownLatch的方法示例


在 Java 中,当我们使用线程池(如ExecutorService)来执行异步任务时,常常需要知道所有任务是否都已经完成。ExecutorService接口提供了几种方式来处理这种情况,但最常用的是shutdown()awaitTermination()方法的组合,或者使用FutureCompletionService。这里我将提供一个使用ExecutorServiceCountDownLatch的示例,因为CountDownLatch提供了一种直观的方式来等待一组线程完成。


首先,我们定义几个任务,然后使用ExecutorService来异步执行它们,并使用CountDownLatch来等待所有任务完成。

import java.util.concurrent.*;    public class ThreadPoolExample {        public static void main(String[] args) throws InterruptedException {          // 创建一个包含固定数量线程的线程池          ExecutorService executorService = Executors.newFixedThreadPool(4);            // 定义任务数量          int taskCount = 10;            // 使用CountDownLatch来等待所有任务完成          final CountDownLatch latch = new CountDownLatch(taskCount);            // 提交任务到线程池          for (int i = 0; i < taskCount; i++) {              int taskId = i;              executorService.submit(() -> {                  // 模拟任务执行                  try {                      Thread.sleep(1000); // 假设每个任务需要1秒                  } catch (InterruptedException e) {                      Thread.currentThread().interrupt();                  }                  System.out.println("任务 " + taskId + " 完成");                  // 每完成一个任务,计数减一                  latch.countDown();              });          }            // 等待所有任务完成          System.out.println("等待所有任务完成...");          latch.await(); // 阻塞当前线程,直到latch的计数达到零          System.out.println("所有任务完成!");            // 关闭线程池          executorService.shutdown();            // 可选:等待线程池中的线程都执行完毕          try {              if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {                  // 线程池没有在规定时间内关闭,则强制关闭                  executorService.shutdownNow();              }          } catch (InterruptedException e) {              // 当前线程在等待过程中被中断              executorService.shutdownNow();              Thread.currentThread().interrupt();          }      }  }
复制代码


在这个例子中,我们首先创建了一个固定大小的线程池(这里使用 4 个线程)。然后,我们定义了一个CountDownLatch,其计数被初始化为任务的数量(这里为 10)。对于每个任务,我们都向线程池提交了一个Runnable,其中包含了任务的执行逻辑和latch.countDown()调用,以确保每次任务完成时都会减少CountDownLatch的计数。


主线程通过调用latch.await()来等待,直到所有任务都调用了countDown()(即计数达到零),然后才能继续执行。这确保了主线程会等待所有任务完成后再继续。


最后,我们关闭了线程池,并通过调用awaitTermination()来可选地等待线程池中的所有线程都执行完毕。如果线程池没有在指定时间内关闭,则调用shutdownNow()来尝试立即停止所有正在执行的任务。

这个示例提供了处理异步任务并等待它们完成的一种有效方式,适用于需要等待所有任务完成再继续的场景。


2.使用ExecutorServiceinvokeAll方法和Future列表的方法示例


除了使用CountDownLatch之外,还有其他方法可以判断线程池中的所有任务是否执行完成。以下是一个使用ExecutorServiceinvokeAll方法和Future列表的示例,这种方法适用于我们有一组已知的任务(Callable)需要并行执行,并且我们需要等待所有任务完成并获取它们的结果。

import java.util.ArrayList;  import java.util.List;  import java.util.concurrent.*;    public class ThreadPoolFutureExample {        public static void main(String[] args) throws InterruptedException, ExecutionException {          // 创建一个包含固定数量线程的线程池          ExecutorService executorService = Executors.newFixedThreadPool(4);            // 创建一个Callable任务列表          List<Callable<String>> tasks = new ArrayList<>();          for (int i = 0; i < 10; i++) {              final int taskId = i;              tasks.add(() -> {                  // 模拟任务执行                  Thread.sleep(1000); // 假设每个任务需要1秒                  return "任务 " + taskId + " 完成";              });          }            // 使用invokeAll提交所有任务,这将返回一个Future列表          List<Future<String>> futures = executorService.invokeAll(tasks);            // 遍历Future列表,获取每个任务的结果          for (Future<String> future : futures) {              // get()会阻塞,直到对应的任务完成              System.out.println(future.get());          }            // 关闭线程池          executorService.shutdown();            // 可选:等待线程池中的线程都执行完毕          try {              if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {                  // 线程池没有在规定时间内关闭,则强制关闭                  executorService.shutdownNow();              }          } catch (InterruptedException e) {              // 当前线程在等待过程中被中断              executorService.shutdownNow();              Thread.currentThread().interrupt();          }      }  }    // 注意:这里使用了Lambda表达式和方法引用来简化Callable的创建  // 实际使用中,你可能需要实现Callable接口或使用匿名内部类
复制代码


在这个例子中,我们创建了一个ExecutorService和一个Callable任务列表。每个Callable任务都会返回一个字符串,表示任务完成的信息。我们使用invokeAll方法提交了所有任务,并立即获得了一个Future列表,每个Future都代表了一个任务的执行结果。


然后,我们遍历这个Future列表,并对每个Future调用get()方法。get()方法会阻塞当前线程,直到对应的任务完成并返回结果。这样,我们就能确保在继续执行之前,所有任务都已经完成。


最后,我们关闭了线程池,并等待所有线程都执行完毕(或超时后强制关闭)。


请注意,虽然这个示例使用了CallableFuture,但它并没有直接提供一个“是否所有任务都已完成”的布尔值。然而,通过遍历Future列表并调用get(),我们实际上已经达到了等待所有任务完成的效果。如果我们只需要知道是否所有任务都已开始执行(而不是等待它们完成),那么我们可能需要采用不同的策略,比如使用execute方法结合其他同步机制(如CountDownLatch)。


3.使用ExecutorService来异步执行多个Callable任务方法示例


以下是一个详细完整的代码示例,该示例使用了ExecutorService来异步执行多个Callable任务,并通过遍历Future列表来等待所有任务完成并获取它们的结果。

import java.util.ArrayList;  import java.util.List;  import java.util.concurrent.*;    public class ThreadPoolFutureCompleteExample {        public static void main(String[] args) {          // 创建一个包含固定数量线程的线程池          ExecutorService executorService = Executors.newFixedThreadPool(4);            // 创建一个Callable任务列表          List<Callable<String>> tasks = new ArrayList<>();          for (int i = 0; i < 10; i++) {              final int taskId = i;              tasks.add(new Callable<String>() {                  @Override                  public String call() throws Exception {                      // 模拟任务执行                      TimeUnit.SECONDS.sleep(1); // 假设每个任务需要1秒                      return "任务 " + taskId + " 完成";                  }              });                // 或者使用Lambda表达式(如果你使用的是Java 8或更高版本)              // tasks.add(() -> {              //     TimeUnit.SECONDS.sleep(1);              //     return "任务 " + taskId + " 完成";              // });          }            try {              // 使用invokeAll提交所有任务,这将返回一个Future列表              List<Future<String>> futures = executorService.invokeAll(tasks);                // 遍历Future列表,获取每个任务的结果              for (Future<String> future : futures) {                  // get()会阻塞,直到对应的任务完成                  System.out.println(future.get());              }                // 关闭线程池              executorService.shutdown();                // 等待线程池中的所有线程都执行完毕(可选)              // 注意:由于我们已经调用了invokeAll并等待了所有Future的完成,这一步通常是多余的              // 但为了完整性,我还是展示了如何等待线程池关闭              boolean terminated = executorService.awaitTermination(60, TimeUnit.SECONDS);              if (!terminated) {                  // 如果线程池没有在规定时间内关闭,则强制关闭                  System.err.println("线程池没有在规定时间内关闭,尝试强制关闭...");                  executorService.shutdownNow();                  // 注意:shutdownNow()不保证已经提交的任务会被取消                  // 它会尝试停止正在执行的任务,但已经开始执行的任务可能无法被中断              }            } catch (InterruptedException | ExecutionException e) {              // 处理异常              e.printStackTrace();                // 如果当前线程在等待过程中被中断,尝试关闭线程池              if (!executorService.isShutdown()) {                  executorService.shutdownNow();              }                // 根据需要,可能还需要重新设置中断状态              Thread.currentThread().interrupt();          }      }  }
复制代码


在这个示例中,我使用了传统的匿名内部类来创建Callable任务(同时也提供了 Lambda 表达式的注释),以便与各种 Java 版本兼容。然而,如果我们正在使用 Java 8 或更高版本,我强烈推荐我们使用 Lambda 表达式来简化代码。


请注意,invokeAll方法会阻塞调用它的线程,直到所有任务都完成,或者直到等待超时(如果我们提供了超时时间)。但是,在这个示例中,我们没有为invokeAll提供超时时间,因此它会一直等待,直到所有任务都完成。


另外,请注意,在catch块中,如果捕获到InterruptedException,我们检查了线程池是否已经被关闭(使用isShutdown方法)。如果没有,我们调用shutdownNow方法来尝试关闭线程池并停止正在执行的任务。然而,需要注意的是,shutdownNow方法并不保证能够停止所有已经开始执行的任务,因为某些任务可能无法被中断。


最后,如果在捕获到InterruptedException后,我们确定当前线程需要被重新中断(比如,我们在一个循环中等待某个条件,而中断是用来退出循环的),那么我们应该调用Thread.currentThread().interrupt()来重新设置中断状态。在这个示例中,我们没有这样做,因为main方法不需要重新中断。但是,在更复杂的场景中,这可能是必要的。


文章转载自:TechSynapse

原文链接:https://www.cnblogs.com/TS86/p/18296282

体验地址:http://www.jnpfsoft.com/?from=infoq

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Java异步判断线程池所有任务是否执行完成的方法_Java_快乐非自愿限量之名_InfoQ写作社区