onStage填写每个汽车的评分,通过rating(manufacturerId)
返回一个CompletionStage, 它会异步地获取汽车的评分(可能又是一个REST API调用)
当所有的汽车填好评分后,我们结束这个列表,所以我们调用allOf
得到最终的阶段, 它在前面阶段所有阶段完成后才完成。
在最终的阶段调用whenComplete()
,我们打印出每个汽车和它的评分。
cars().thenCompose(cars -> {
List<CompletionStage> updatedCars = cars.stream()
.map(car -> rating(car.manufacturerId).thenApply(r -> {
car.setRating(r);
return car;
})).collect(Collectors.toList());
CompletableFuture done = CompletableFuture
.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
if (th == null) {
cars.forEach(System.out::println);
} else {
throw new RuntimeException(th);
}
}).toCompletableFuture().join();
因为每个汽车的实例都是独立的,得到每个汽车的评分都可以异步地执行,这会提高系统的性能(延迟),而且,等待所有的汽车评分被处理使用的是allOf
方法,而不是手工的线程等待(Thread#join() 或 a CountDownLatch)。
这些例子可以帮助你更好的理解相关的API,你可以在github上得到所有的例子的代码。
其它参考文档