);
}
System.out.println("end");
设置超时时间,然后会将错误信息打印出来。
工厂方法supplyAsync创建CompletableFuture
使用工厂方法可以一句话来创建getPriceAsync方法
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> getPrice(product));
}
supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后悔读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,也可以调用supplyAsync方法的重载版本,传入第二个参数指定不同的线程执行生产者方法。 工厂方法返回的CompletableFuture对象也提供了同样的错误处理机制。
阻塞优化
例如现在有一个商品列表,然后输出一个字符串 商品名,价格 。
List<Shop> shops = Arrays.asList(
new Shop("one"),
new Shop("two"),
new Shop("three"),
new Shop("four"));
long start = System.nanoTime();
List<String> str = shops.stream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());
System.out.print(str);
long end = System.nanoTime();
System.out.print((end - start) / 1000000);
[one price: 161.83, two price: 126.04, three price: 153.20, four price: 166.06]
4110
每次调用getPrice方法都会阻塞1秒钟,对付这种我们可以使用并行流来进行优化:
List<String> str = shops.parallelStream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());
1137
明显速度提升了,现在对四个商品查询 实现了并行,所以只耗时1秒多点,下面我们尝试CompletableFuture:
List<CompletableFuture<String>> str2 = shops.stream().map(shop->
CompletableFuture.supplyAsync(
()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())))).collect(toList());
我们使用工厂方法supplyAsync创建CompletableFuture对象,使用这种方式我们会得到一个List<CompletableFuture<String>>,列表中的每一个ComplatableFuture对象在计算完成后都会包含商品的名称。但是我们要求返回的是List<String>,所以需要等待所有的future执行完毕,再将里面的值提取出来,填充到列表中才能返回。
List<String> str3 =str2.stream().map(CompletableFuture::join).collect(toList());
为了返回List<String> 需要对str2添加第二个map操作,对List中的所有future对象执行join操作,一个接一个的等待他们的运行结束。CompletableFuture类中的join和Future接口中的get方法有相同的含义,并且声明在Future接口中,唯一的不同是join不会抛出任何检测到的异常。
1149
现在使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个的防治两个map操作。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此每个创建CompletableFuture对象只能在前一个操作结束之后,再join返回计算结果。
更好的解决方式
并行流的版本工作的非常好,那是因为他可以并行处理8个任务,获取操作系统线程数量:
System.out.print(Runtime.getRuntime().availableProcessors());
但是如果列表是9个呢?那么执行结果就会2秒。因为他最多只能让8个线程处于繁忙状态。 但是使用CompletableFuture允许你对执行器Executor进行配置,尤其是线程池的大小,这是并行流API无法实现的。
定制执行器
//创建一个线程池,线程池的数目为100何商店数目二者中较小的一个值
&n