1 概要
通过引入结构化并发编程的API,简化并发编程。结构化并发将在不同线程中运行的相关任务组视为单个工作单元,从而简化错误处理和取消操作,提高可靠性,并增强可观察性。这是一个预览版的API。
2 历史
结构化并发是由JEP 428提出的,并在JDK 19中作为孵化API发布。它在JDK 20中被JEP 437重新孵化,通过对作用域值(JEP 429)进行轻微更新。
我们在这里提议将结构化并发作为JUC包中的预览API。唯一重要变化是StructuredTaskScope::fork(...)方法返回一个[子任务],而不是一个Future,如下面所讨论的。
3 目标
推广一种并发编程风格,可以消除由于取消和关闭而产生的常见风险,如线程泄漏和取消延迟。
提高并发代码的可观察性。
4 非目标
不替换JUC包中的任何并发构造,如ExecutorService和Future。
不定义Java平台的最终结构化并发API。其他结构化并发构造可以由第三方库定义,或在未来的JDK版本中定义。
不定义在线程之间共享数据流的方法(即通道)。会在未来提出这样做。
不用新的线程取消机制替换现有的线程中断机制。会在未来提出这样做。
5 动机
开发人员通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务按顺序执行。然而,如果子任务彼此足够独立,并且存在足够的硬件资源,那么通过在不同线程中并发执行子任务,可以使整个任务运行得更快(即具有较低的延迟)。例如,将多个I/O操作的结果组合成一个任务,如果每个I/O操作都在自己的线程中并发执行,那么任务将运行得更快。虚拟线程(JEP 444)使得为每个此类I/O操作分配一个线程成为一种具有成本效益的方法,但是管理可能会产生大量线程仍然是一个挑战。
6 ExecutorService 非结构化并发
java.util.concurrent.ExecutorService
API 是在 Java 5 中引入的,它帮助开发人员以并发方式执行子任务。
如下 handle()
的方法,它表示服务器应用程序中的一个任务。它通过将两个子任务提交给 ExecutorService
来处理传入的请求。
ExecutorService
立即返回每个子任务的 Future
,并根据 Executor 的调度策略同时执行这些子任务。handle()
方法通过阻塞调用它们的 Future
的 get()
方法来等待子任务的结果,因此该任务被称为加入了其子任务。
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser = user.get(); // 加入 findUser
int theOrder = order.get(); // 加入 fetchOrder
return new Response(theUser, theOrder);
}
由于子任务并发执行,每个子任务都可独立地成功或失败。在这个上下文中,"失败" 意味着抛出异常。通常,像 handle()
这样的任务应该在任何一个子任务失败时失败。当出现失败时,理解线程的生命周期会变得非常复杂:
-
如
findUser()
抛异常,那么调用user.get()
时handle()
也会抛出异常,但是fetchOrder()
会继续在自己的线程中运行。这是线程泄漏,最好情况下浪费资源,最坏情况下fetchOrder()
的线程可能会干扰其他任务。 -
如执行
handle()
的线程被中断,这个中断不会传播到子任务。findUser()
和fetchOrder()
的线程都会泄漏,即使在handle()
失败后仍然继续运行。 -
如果
findUser()
执行时间很长,但是在此期间fetchOrder()
失败,那么handle()
将不必要地等待findUser()
,因为它会在user.get()
上阻塞,而不是取消它。只有在findUser()
完成并且user.get()
返回后,order.get()
才会抛出异常,导致handle()
失败。
每种case下,问题在于我们的程序在逻辑上被结构化为任务-子任务关系,但这些关系只存在于开发人员的头脑中。这不仅增加错误可能性,还会使诊断和排除此类错误变得更加困难。例如,线程转储等可观察性工具会在不相关的线程调用栈中显示 handle()
、findUser()
和 fetchOrder()
,而没有任务-子任务关系的提示。
可尝试在错误发生时显式取消其他子任务,例如通过在失败的任务的 catch 块中使用 try-finally 包装任务,并调用其他任务的 Future
的 cancel(boolean)
方法。我们还需要在 try-with-resources
语句中使用 ExecutorService
,就像
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10_000).forEach(i -> {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return i;
});
});
} // executor.close() is called implicitly, and waits
因为 Future
没有提供等待被取消的任务的方法。但所有这些都很难做到,并且往往会使代码的逻辑意图变得更加难以理解。跟踪任务之间的关系,并手动添加所需的任务间取消边缘,是对开发人员的一种很大要求。
无限制的并发模式
这种需要手动协调生命周期的需求是因为 ExecutorService
和 Future
允许无限制的并发模式。在涉及的所有线程中,没有限制或顺序:
- 一个线程可以创建一个
ExecutorService
- 另一个线程可以向其提交工作
- 执行工作的线程与第一个或第二个线程没有任何关系
线程提交工作之后,一个完全不同的线程可以等待执行的结果。具有对 Future
的引用的任何代码都可以加入它(即通过调用 get()
等待其结果),甚至可以在与获取 Future
的线程不同的线程中执行代码。实际上,由一个任务启动的子任务不必返回到提交它的任务。它可以返回给许多任务中的任何一个,甚至可能是没有返回给任何任务。
因为 ExecutorService
和 Future
允许这种无结构的使用,它们既不强制执行也不跟踪任务和子任务之间的关系,尽管这些关系是常见且有用的。因此,即使子任务在同一个任务中被提交和加入,一个子任务的失败也不能自动导致另一个子任务的取消。在上述的 handle()
方法中,fetchOrder()
的失败不能自动导致 findUser()
的取消。fetchOrder()
的 Future
与 findUser()
的 Future
没有关系,也与最终通过其 get()
方法加入它的线程无关。与其要求开发人员手动管理这种取消,我们希望能够可靠地自动化这一过程。