(手机横屏看源码更方便)
问题
(1)自己动手写的线程池如何支持带返回值的任务呢?
(2)如果任务执行的过程中抛出异常了该怎么处理呢?
简介
上一章我们自己动手写了一个线程池,但是它是不支持带返回值的任务的,那么,我们自己能否实现呢?必须可以,今天我们就一起来实现带返回值任务的线程池。
前情回顾
首先,让我们先回顾一下上一章写的线程池:
(1)它包含四个要素:核心线程数、最大线程数、任务队列、拒绝策略;
(2)它具有执行无返回值任务的能力;
(3)它无法处理有返回值的任务;
(4)它无法处理任务执行的异常(线程中的异常不会抛出到线程外);
那么,我们能不能在现有的基础上实现其下面两项能力呢?让我们一起来试一试吧!
有返回值和无返回值的任务到底有何不同?
答案很明显,就是一个有返回值,一个无返回值,用伪代码来表示就是下面这样:
// 无返回值
threadPool.execute(()->{
System.out.println(1);
});
// 有返回值,分两步走
// 1. 提交任务到线程池中
SomeClass result = threadPool.execute(()->{
System.out.println(1);
return 1;
});
// 2. 等待任务的结果返回
Object value = result.get();
无返回值的任务提交了就完事,主线程并不Care它到底有没有执行完,并不关心它是不是抛出异常,主线程Just提交线程到线程池中,其余什么都不管。
有返回值的任务就不一样了,主线程首先要提交任务到线程池中,它需要使用到任务执行的结果,所以它必须等待任务执行完毕才能拿到任务执行的结果。
那么,为什么不直接在execute的时候就等待任务执行完毕呢?这样的话那不就跟串行没啥区别了,还不如直接在主线程执行任务呢,还少了线程切换的资源消耗。
之所以要分成两步,是因为主线程并不一定需要立即获取返回值,在需要用到返回值的时候才去get,这样就可以在提交任务和获取返回值之间干些其它的事情,提高效率。
所以,提交任务的时候不需要阻塞,get返回值的时候才可能需要阻塞,如果get的时候任务已经执行完毕了,这时候也不需要阻塞,如果get的时候任务还未执行完毕,那就要阻塞等待任务执行完毕才能获取到返回值。
实现分析
首先,无返回值的任务我们直接使用的Runnable函数式接口,有返回值的任务有没有现成的接口呢?还真有,那就是Callable接口,它有个返回值。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
其次,提交任务的时候需要有个返回值,它是在将来用来获取任务执行结果的,实际上它也是新任务的一种能力,可以使用它对任务进行包装,使其具有返回值的能力。
public interface Future<T> {
T get();
}
再次,我们需要给现有的线程池增加一种新的能力,根据单一职责原则,我们定义一个新的接口来承载这种能力。
public interface FutureExecutor extends Executor {
<T> Future<T> submit(Callable<T> command);
}
然后,我们需要一种新的任务,它既具有旧任务的执行能力(run()方法),又具有新任务的返回值能力(get()方法),所以我们造一个“将来的任务”对提交的任务进行包装,使其具有返回值的能力。
public class FutureTask<T> implements Runnable, Future<T> {
/**
* 真正的任务
*/
private Callable<T> task;
public FutureTask(Callable<T> task) {
this.task = task;
}
@Override
public void run() {
// 具体实现...
}
@Override
public T get() {
// 具体实现...
}
}
最后,我们只要对原有的线程池进行扩展,将提交的任务包装成“将来获取返回值的任务”,还是使用原来的方法去执行,然后返回这个将来的任务即可。
根据开闭原则,【本篇文章由公众号“彤哥读源码”原创】原来的代码我们不做任何修改,扩展新的子类来实现新的能力。
public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {
public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
super(name, coreSize, maxSize, taskQueue, rejectPolicy);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
// 包装成将来获取返回值的任务
FutureTask<T> futureTask = new FutureTask<>(task);
// 还是使用原来的执行能力
execute(futureTask);
// 返回将来的任务,只需要返回其get返回值的能力即可
// 所以这里返回的是Future而不是FutureTask类型
return futureTask;
}
}
好了,到这里整体的逻辑我们就已经比较清晰地实现完了,还剩下最关键的部分,这个“将来的任务”的两个能力要如何实现。
将来的任务
将来的任务,具有两个能力:一是执行真正任务的能力,二是将来获取返回值的能力。
public class FutureTask<T> implements Runnable, Future<T> {
@Override
public void run() {
// 具体实现...
}
@Override
public T get() {
// 具体实现...
}
}
首先,我们要明确一件事,任务的执行是线程池中,获取返回值是在主线程中,它们是在两个线程中执行的,而且谁先谁后我们无法确定。
其次,如果run()在get()之前执行,我们需要告诉get()任务已经执行完毕了,所以需要一个状态来通知这个事,还需要一个变量来承载任务执行的返回值。
/**
* 任务执行的状态,0未开始,1正常完成,2异常完成
* 也可以使用volatile+Unsafe实现CAS操作
*/
private AtomicInteger state = new AtomicInteger(NEW);
private static final int NEW = 0;
private static final int FINISHED = 1;
private stati