作者:京东科技 张天赐
前言
JDK 8 是一次重大的版本升级,新增了非常多的特性,其中之一便是 CompletableFuture
。自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了 Future
的缺陷。
在我们的日常优化中,最常用手段便是多线程并行执行。这时候就会涉及到 CompletableFuture
的使用。
常见使用方式
下面举例一个常见场景。
假如我们有两个 RPC 远程调用服务,我们需要获取两个 RPC 的结果后,再进行后续逻辑处理。
public static void main(String[] args) {
// 任务 A,耗时 2 秒
int resultA = compute(1);
// 任务 B,耗时 2 秒
int resultB = compute(2);
// 后续业务逻辑处理
System.out.println(resultA + resultB);
}
可以预估到,串行执行最少耗时 4 秒,并且 B 任务并不依赖 A 任务结果。
对于这种场景,我们通常会选择并行的方式优化,Demo 代码如下:
public static void main(String[] args) {
// 仅简单举例,在生产代码中可别这么写!
// 统计耗时的函数
time(() -> {
CompletableFuture<Integer> result = Stream.of(1, 2)
// 创建异步任务
.map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
// 聚合
.reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));
// 等待结果
try {
System.out.println("结果:" + result.get());
} catch (ExecutionException | InterruptedException e) {
System.err.println("任务执行异常");
}
});
}
输出:
[async-1]: 任务执行开始:1
[async-2]: 任务执行开始:2
[async-1]: 任务执行完成:1
[async-2]: 任务执行完成:2
结果:3
耗时:2 秒
可以看到耗时变成了 2 秒。
存在的问题
分析
看上去 CompletableFuture
现有功能可以满足我们诉求。但当我们引入一些现实常见情况时,一些潜在的不足便暴露出来了。
compute(x)
如果是一个根据入参查询用户某类型优惠券列表的任务,我们需要查询两种优惠券并组合在一起返回给上游。假如上游要求我们 2 秒内处理完毕并返回结果,但 compute(x)
耗时却在 0.5 秒 ~ 无穷大波动。这时候我们就需要把耗时过长的 compute(x)
任务结果放弃,仅处理在指定时间内完成的任务,尽可能保证服务可用。
那么以上代码的耗时由耗时最长的服务决定,无法满足现有诉求。通常我们会使用 get(long timeout, TimeUnit unit)
来指定获取结果的超时时间,并且我们会给 compute(x)
设置一个超时时间,达到后自动抛异常来中断任务。
public static void main(String[] args) {
// 仅简单举例,在生产代码中可别这么写!
// 统计耗时的函数
time(() -> {
List<CompletableFuture<Integer>> result = Stream.of(1, 2)
// 创建异步任务,compute(x) 超时抛出异常
.map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
.toList();
// 等待结果
int res = 0;
for (CompletableFuture<Integer> future : result) {
try {
res += future.get(2, SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
System.err.println("任务执行异常或超时");
}
}
System.out.println("结果:" + res);
});
}
输出:
[async-2]: 任务执行开始:2
[async-1]: 任务执行开始:1
[async-1]: 任务执行完成:1
任务执行异常或超时
结果:1
耗时:2 秒
可以看到,只要我们能够给 compute(x)
设置一个超时时间将任务中断,结合 get
、getNow
等获取结果的方式,就可以很好地管理整体耗时。
那么问题也就转变成了,如何给任务设置异步超时时间呢?
现有做法
当异步任务是一个 RPC 请求时,我们可以设置一个 JSF 超时,以达到异步超时效果。
当请求是一个 R2M 请求时,我们也可以控制 R2M 连接的最大超时时间来达到效果。
这么看好像我们都是在依赖三方中间件的能力来管理任务超时时间?那么就存在一个问题,中间件超时控制能力有限,如果异步任务是中间件 IO 操作 + 本地计算操作怎么办?
用 JSF 超时举一个具体的例子,反编译 JSF 的获取结果代码如下:
public V get(long timeout, TimeUnit unit) throws InterruptedException {
// 配置的超时时间
timeout = unit.toMillis(timeout);
// 剩余等待时间
long remaintime = timeout - (this.sentTime - this.genTime);
if (remaintime <= 0L) {
if (this.isDone()) {
// 反序列化获取结果
return this.getNow();
}
} else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
// 等待时间内任务完成,反序列化获取结果
return this.getNow();
}
this.setDoneTime();
// 超时抛出异常
throw this.clientTimeoutException(false);
}
当这个任务刚好卡在超时边缘完成时,这个任务的耗时时间就变成了超时时间 + 获取结果时间。而获取结果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。
某些 CPU 使用率高的情况下,就会出现异步任务没能触发抛出异常中断,导致我们无法准确控制超时时间。对上游来说,本次请求全部失败。
解决方式
JDK 9
这类问题非常常见,如大促场景,服务器 CPU 瞬间升高就会出现