并发编程长期以来一直是 Java 的阿喀琉斯之踵。尽管 ExecutorService 和 Future 为我们提供了良好的服务,但它们允许不受限制的模式,其中子任务可能比其父任务存活更久、线程可能泄漏,而取消操作则变成了一场噩梦。结构化并发通过将运行在不同线程中的相关任务组视为一个单一的工作单元,改变了这一现状,它简化了错误处理和取消操作,同时提高了可靠性和可观测性。
非结构化并发的问题
考虑一个使用 ExecutorService 的典型模式:一个线程创建执行器,另一个线程提交工作,而执行任务的线程与前两者都没有关系。在一个线程提交工作之后,一个完全不同的线程可以等待结果——任何持有 Future 引用的代码都可以连接它,甚至可以是与获取该 Future 的线程不同的线程中的代码。
这种非结构化方法带来了实际问题。当父任务未能正确关闭子任务时,就会发生线程泄漏。由于没有协调的方式来通知多个子任务,取消操作会出现延迟。并且由于任务和子任务之间的关系在运行时未被跟踪,可观测性会受到影响。
// 非结构化:关系是隐式且脆弱的ExecutorService executor = Executors.newCachedThreadPool();Future<User> userFuture = executor.submit(() -> fetchUser(id));Future<Orders> ordersFuture = executor.submit(() -> fetchOrders(id));
// 如果 fetchUser 失败会发生什么?// 谁负责关闭执行器?// 如果我们忘记清理,线程会泄漏吗?
复制代码
引入 StructuredTaskScope
结构化并发 API 的主要类是 java.util.concurrent 包中的 StructuredTaskScope,它使您能够将一个并发子任务组作为一个单元进行协调。使用 StructuredTaskScope,您可以在各自的线程中分叉每个子任务,然后将它们作为一个单元进行汇合,确保在主任务继续之前子任务完成。
该 API 遵循一个清晰的模式:
使用 try-with-resources 创建一个 StructuredTaskScope
将子任务定义为 Callable 实例
在各自的线程中分叉每个子任务
汇合以等待完成
处理子任务的结果
以下是一个获取天气数据的真实示例:
WeatherReport getWeatherReport(String location) throws ExecutionException, InterruptedException { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Supplier<Temperature> temperature = scope.fork(() -> getTemperature(location)); Supplier<Humidity> humidity = scope.fork(() -> getHumidity(location)); Supplier<WindSpeed> windSpeed = scope.fork(() -> getWindSpeed(location));
scope.join() // 汇合所有子任务 .throwIfFailed(); // 如果有任何失败,传播错误
// 全部成功,组合结果 return new WeatherReport( location, temperature.get(), humidity.get(), windSpeed.get() ); }}
复制代码
try-with-resources 代码块至关重要——它确保作用域被正确关闭,取消任何未完成的子任务并防止线程泄漏。
使用关闭策略实现短路
短路模式通过使主任务能够中断和取消那些不再需要其结果子任务,来促使子任务快速完成。两个内置策略处理了常见场景:
ShutdownOnFailure:"调用所有"模式
当您需要所有子任务都成功时,ShutdownOnFailure 会在一个任务失败后立即取消剩余的任务:
Response handleRequest(String userId) throws Exception { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Subtask<User> user = scope.fork(() -> fetchUser(userId)); Subtask<Profile> profile = scope.fork(() -> fetchProfile(userId)); Subtask<Settings> settings = scope.fork(() -> fetchSettings(userId));
scope.join().throwIfFailed();
// 如果有任何失败,我们永远不会到达这里 return new Response(user.get(), profile.get(), settings.get()); }}
复制代码
如果 fetchUser() 抛出异常,作用域会立即取消配置文件和设置的获取。没有浪费的工作,没有线程泄漏。
ShutdownOnSuccess:"调用任一"模式
有时您只需要第一个成功的结果——例如查询多个数据中心或尝试备用服务:
String fetchFromMultipleSources(String key) throws Exception { try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { scope.fork(() -> fetchFromPrimaryDB(key)); scope.fork(() -> fetchFromCache(key)); scope.fork(() -> fetchFromBackup(key));
scope.join();
// 返回第一个成功的结果 return scope.result(); }}
复制代码
任何子任务成功的瞬间,作用域就会取消其他任务。这种模式非常适合对延迟敏感的操作,即您需要竞速多个来源。
自定义关闭策略
在实践中,大多数 StructuredTaskScope 的使用不会直接使用 StructuredTaskScope 类,而是使用实现了关闭策略的两个子类之一,或者编写自定义子类来实现自定义关闭策略。
以下是一个收集所有成功结果并忽略失败的自定义策略:
class AllSuccessesScope<T> extends StructuredTaskScope<T> { private final List<T> results = Collections.synchronizedList(new ArrayList<>());
@Override protected void handleComplete(Subtask<? extends T> subtask) { if (subtask.state() == Subtask.State.SUCCESS) { results.add(subtask.get()); } }
public List<T> getResults() { return List.copyOf(results); }}
// 用法List<Data> collectAll() throws InterruptedException { try (var scope = new AllSuccessesScope<Data>()) { for (String source : dataSources) { scope.fork(() -> fetchData(source)); } scope.join(); return scope.getResults(); }}
复制代码
虚拟线程:完美搭档
虚拟线程提供了大量的线程——结构化并发可以正确且健壮地协调它们,并使可观测性工具能够按开发人员理解的方式显示线程。这种组合非常强大,因为虚拟线程使得创建数百万个线程的成本很低,而结构化并发则确保您能安全地管理它们。
// 现在启动 10,000 个并发任务是可行的try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { for (int i = 0; i < 10_000; i++) { final int taskId = i; scope.fork(() -> processTask(taskId)); } scope.join().throwIfFailed();}
复制代码
使用平台线程,这将是灾难性的。但使用虚拟线程和结构化并发,这变得简单而安全。
模块系统考量
在使用结构化并发构建模块化应用程序时,理解 Java 的模块系统变得很重要。对于模块,反射失去了其"超能力",并且受限于与编译代码完全相同的可访问性规则——它只能访问导出包中公共类的公共成员。
默认情况下,只有 module-info.java 中显式导出的包是可见的。如果您使用的是依赖反射的框架(如 Spring 或 Hibernate),您将需要额外的声明:
module com.example.app { // 用于编译时访问的常规导出 exports com.example.api;
// 为运行时反射访问开放 opens com.example.entities to org.hibernate.orm.core;
requires java.base; requires org.hibernate.orm.core;}
复制代码
在编译时,开放的包完全被封装,就像该指令不存在一样,但在运行时,包的类型可用于反射,自由地与所有类型和成员(无论公开与否)交互。
为了在所有包上获得完整的反射访问权限,您可以声明一个开放模块:
open module com.example.app { exports com.example.api; requires java.base;}
复制代码
开放模块会开放其包含的所有包,就像每个包都单独在 opens 指令中使用一样,这很方便但降低了封装性。
可观测性和调试
结构化并发显著提高了可观测性。线程转储现在显示了清晰的父子关系:
jcmd <pid> Thread.dump_to_file -format=json output.json
复制代码
JSON 输出揭示了 StructuredTaskScope 及其在数组中的分叉子任务,使得理解正在运行的内容及其原因变得容易。这与关系隐式的扁平线程转储相比,是一种变革。
当前状态与演进
结构化并发由 JEP 428 提出,并在 JDK 19 中作为孵化 API 交付,在 JDK 20 中重新孵化,通过 JEP 453 在 JDK 21 中首次预览,并在 JDK 22 和 23 中重新预览。截至 JDK 25,该 API 已经演进,使用静态工厂方法替代了公共构造函数。
要在当前 JDK 版本中使用结构化并发,需启用预览特性:
# 编译javac --release 21 --enable-preview MyApp.java
# 运行java --enable-preview MyApp
复制代码
基于真实世界的反馈,该 API 正在稳定下来。结构化并发已被证明是一种安全、富有表现力且易于理解的并发方法,Python 库率先开创了这一领域,随后是 Kotlin 等语言。
最佳实践
始终使用 Try-With-Resources:必须关闭作用域以防止线程泄漏。切勿手动管理 StructuredTaskScope 的生命周期。
选择正确的策略:当所有结果都重要时使用 ShutdownOnFailure,在竞速场景中使用 ShutdownOnSuccess,或者为特定需求实现自定义策略。
与虚拟线程结合使用:结构化并发与虚拟线程结合时效果最佳,能够通过简单的代码实现大规模并发。
避免共享可变状态:虽然结构化并发处理协调,但您仍然需要对共享数据的线程安全负责。
考虑作用域值:为了在任务层次结构中传递上下文,作用域值(JEP 481)提供了比 ThreadLocal 更好的替代方案。
真实示例:聚合用户数据
让我们构建一个从多个来源聚合数据的完整示例:
public class UserAggregator { record UserData(User user, List<Order> orders, Stats stats, Recommendations recs) {}
public UserData aggregate(String userId) throws ExecutionException, InterruptedException { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Supplier<User> user = scope.fork(() -> userService.fetch(userId)); Supplier<List<Order>> orders = scope.fork(() -> orderService.fetch(userId)); Supplier<Stats> stats = scope.fork(() -> statsService.compute(userId)); Supplier<Recommendations> recs = scope.fork(() -> mlService.recommend(userId));
scope.join().throwIfFailed();
return new UserData( user.get(), orders.get(), stats.get(), recs.get() ); } }}
复制代码
这种模式简洁、安全且高效。如果任何服务失败,所有其他服务会立即被取消。作用域确保适当的清理。并且借助虚拟线程,这可以扩展到数千个并发请求。
开发者观点
Java 架构师决定不从 fork 方法返回 Future 实例,以避免与非结构化计算混淆,并与旧的并发模型进行清晰切割。这一设计决策强调了结构化并发是一种新的范式,而不仅仅是渐进式改进。
Rock the JVM 教程指出,结构化并发最终为 Java 带来了其他 JVM 语言通过 Kotlin 协程和 Scala Cats Effects Fibers 等库所提供的功能,但拥有官方的平台支持。
展望未来
结构化并发代表了我们对并发编程思考方式的根本转变。我们不是管理单个线程和 Future,而是按层次结构组织并发工作——就像我们用方法和循环组织顺序代码一样。
好处是显而易见的:没有线程泄漏、正确的错误传播、协调的取消以及增强的可观测性。结合虚拟线程,Java 现在提供了一个既强大又易于使用的并发模型。
随着该 API 走向最终化,预计将在框架和库中得到更广泛的采用。Spring、Hibernate 及其他生态系统项目已经在考虑如何利用结构化并发来编写更清晰、更可靠的并发代码。
【注】本文译自:Structured Concurrency Patterns in Java
评论