谈谈 Callable 任务是怎么运行的?它的执行结果又是怎么获取的?
向线程池提交Callable任务,会创建一个新线程(执行任务的线程)去执行这个Callable任务,但是通过Future#get获取任务的执行结果是在提交任务的调用者线程中,那问题一:调用者线程如何获取执行任务的线程的结果?
在JDK中,有2种类型的任务,Runnable和Callable,但是具体到线程池执行任务的java.util.concurrent.ThreadPoolExecutor#execute(Runnable)
方法,它只接收Runnable任务,那问题二:Callable任务是提交给线程池后是如何执行的呢?
Callable 任务是怎么运行的?
import java.util.concurrent.*;
public class FutureTest {
public static void main(String[] args) {
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
//sleep 是为了调试方便
TimeUnit.SECONDS.sleep(4);
return 3;
}
};
//创建一个 ThreadPoolExecutor 对象
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(callable);
try {
Integer i = future.get();
System.out.println(i);
} catch (Exception e) {
System.out.println(e);
}
}
}
Future<Integer> future = executorService.submit(callable);
//java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
//FutureTask其实是个RunnableFuture, RunnableFuture其实是个Runnable
//重点是: Runnable#run方法的执行,其实就是 FutureTask#run方法的执行!!!
RunnableFuture<T> ftask = newTaskFor(task);
//java.util.concurrent.ThreadPoolExecutor#execute
execute(ftask);
return ftask;
}
RunnableFuture<T> ftask = newTaskFor(task);
//java.util.concurrent.AbstractExecutorService#newTaskFor(java.util.concurrent.Callable<T>)
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
当submit一个Callable任务时,会生成一个RunnableFuture接口对象,默认情况下 RunnableFuture对象是一个FutureTask对象。看java.util.concurrent.AbstractExecutorService
类的源码注释:我们也可以重写 newTaskFor 方法生成我们自己的 RunnableFuture。一个具体的示例可参考ES源码org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor#newTaskFor(java.util.concurrent.Callable<T>)
,它就重写了 newTaskFor 方法,实现了执行优先级任务时,获取任务执行结果的逻辑。
the implementation of submit(Runnable) creates an associated RunnableFuture that is executed and returned. Subclasses may override the newTaskFor methods to return RunnableFuture implementations other than FutureTask
然后再来看FutureTask这个类的run()方法:java.util.concurrent.FutureTask#run
,它会触发执行我们定义的Callable#call()方法。搞清楚java.util.concurrent.FutureTask#run方法是怎么被调用的,就搞清楚了线程池执行Callable任务的原理。该方法主要是做了2件事:
- 执行Callable#call方法,即:FutureTest.java中 我们定义的处理逻辑:返回一个Integer 3
- 设置任务的执行结果:
set(result)
java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable) 中execute(ftask)
提交任务(注意:FutureTask implements Runnable)
ThreadPoolExecutor是AbstractExecutorService具体实现类,因此最终会执行到:java.util.concurrent.ThreadPoolExecutor#execute提交任务。
//java.util.concurrent.ThreadPoolExecutor#execute, 重点看addWorker()实现
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
java.util.concurrent.ThreadPoolExecutor#addWorker 有2行代码很关键:
//java.util.concurrent.ThreadPoolExecutor#addWorker
try {
w = new Worker(firstTask);//关键代码1, firstTask 本质上是 FutureTask对象
final Thread t = w.thread;
if (t != nul