c final int EXCEPTION = 2;
/**
* 任务执行的结果【本篇文章由公众号“彤哥读源码”原创】
* 如果执行正常,返回结果为T
* 如果执行异常,返回结果为Exception
*/
private Object result;
再次,如果get()在run()之前执行,那就需要阻塞等待run()执行完毕才能拿到返回值,所以需要保存调用者(主线程),get()的时候park阻塞住,run()完成了unpark唤醒它来拿返回值。
/**
* 调用者线程
* 也可以使用volatile+Unsafe实现CAS操作
*/
private AtomicReference<Thread> caller = new AtomicReference<>();
然后,我们先来看看run()方法的逻辑,它其实就是先执行真正的任务,然后修改状态为完成,并保存任务的返回值,如果保存了主线程,还要唤醒它。
@Override
public void run() {
// 如果状态不是NEW,说明执行过了,直接返回
if (state.get() != NEW) {
return;
}
try {
// 执行任务【本篇文章由公众号“彤哥读源码”原创】
T r = task.call();
// CAS更新state的值为FINISHED
// 如果更新成功,就把r赋值给result
// 如果更新失败,说明state的值不为NEW了,也就是任务已经执行过了
if (state.compareAndSet(NEW, FINISHED)) {
this.result = r;
// finish()必须放在修改state里面,见下面的分析
finish();
}
} catch (Exception e) {
// 如果CAS更新state的值为EXCEPTION成功,就把e赋值给result
// 如果CAS更新失败,说明state的值不为NEW了,也就是任务已经执行过了
if (state.compareAndSet(NEW, EXCEPTION)) {
this.result = e;
// finish()必须放在修改state里面,见下面的分析
finish();
}
}
}
private void finish() {
// 检查调用者是否为空,如果不为空,唤醒它
// 调用者在调用get()方法的进入阻塞状态
for (Thread c; (c = caller.get()) != null;) {
if (caller.compareAndSet(c, null)) {
LockSupport.unpark(c);
}
}
}
最后,我们再看看get()方法,如果任务还未执行,就阻塞等待任务的执行;如果任务已经执行完毕了,直接拿返回值即可;但是,还有一种情况,get()方法执行的过程中run()方法也在执行,所以get()方法中的每一步都要检查状态的值有没有变化。
@Override
public T get() {
int s = state.get();
// 如果任务还未执行完成,判断当前线程是否要进入阻塞状态
if (s == NEW) {
// 标识调用者线程是否被标记过
boolean marked = false;
for (;;) {
// 重新获取state的值
s = state.get();
// 如果state大于NEW说明完成了,跳出循环
if (s > NEW) {
break;
// 此处必须把caller的CAS更新和park()方法分成两步处理,不能把park()放在CAS里面
} else if (!marked) {
// 尝试更新调用者线程
// 试想断点停在此处【本篇文章由公众号“彤哥读源码”原创】
// 此时state为NEW,让run()方法执行到底,它不会执行finish()中的unpark()方法
// 这时打开断点,这里会更新caller成功,但是循环从头再执行一遍发现state已经变了,
// 直接在上面的if(s>NEW)处跳出循环了,因为finish()在修改state内部
marked = caller.compareAndSet(null, Thread.currentThread());
} else {
// 调用者线程更新之后park当前线程
// 试想断点停在此处
// 此时state为NEW,让run()方法执行到底,因为上面的caller已经设置值了,
// 所以会执行finish()方法中的unpark()方法,
// 这时再打开断点,这里不会park信
// 见unpark()方法的注释,上面写得很清楚:
// 如果线程执行了park()方法,那么执行unpark()方法会唤醒那个线程
// 如果先执行了unpark()方法,那么线程下一次执行park()方法将不会阻塞
LockSupport.park();
}
}
}
if (s == FINISHED) {
return (T) result;
}
throw new RuntimeException((Throwable) result);
}
在我们的实现中,如果任务执行的过程抛出异常了,也是通过result返回给主线程,这样主线程就拿到了这个异常,它就可以做相应的处理了。
好了,完整的实现到此结束,不知道你领悟了没有。
测试用例
最后奉上测试代码:
public class MyThreadPoolFutureExecutorTest {
public static void main(String[] args) {
FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int num = i;
Future<Integer> future = threadPool.submit(() -> {
Thread.sleep(1000);
System.out.println("running: " + num);
return num;
});
list.add(future);
}
for (Future<Integer> future : list) {
System.out.println("runned: " + future.get());
}
}
}
运行结果:
thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒绝的任务
【本篇文章由公众号“彤哥读源码”原创】
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
runni