l) {
//...省略非关键代码
if (workerAdded) {
t.start();//关键代码 2
workerStarted = true;
}
}
}
w = new Worker(firstTask)
创建一个新线程!把Worker作为this对象传进去,因为Worker implements Runnable,并且实现了java.lang.Runnable#run方法。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;//
this.thread = getThreadFactory().newThread(this);
}
这意味着啥?执行java.lang.Runnable#run 就会去真正地执行 java.util.concurrent.ThreadPoolExecutor.Worker#run,那么java.lang.Runnable#run是被谁调用的呢?
聪明的你一定知道了,new Thread(Runnable).start()
执行时,会由jvm去自动调用java.lang.Runnable#run
所以,上面java.util.concurrent.ThreadPoolExecutor#addWorker 中的关键代码2 t.start();
,触发了java.util.concurrent.ThreadPoolExecutor.Worker#run的调用。
java.util.concurrent.ThreadPoolExecutor.Worker#run
里面只是调用了runWoker(this)
而已。
//java.util.concurrent.ThreadPoolExecutor.Worker#run
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
重点来了!再跟进去看看runWoker是何方神圣:
//java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//task 实际上是FutureTask类型的对象
w.firstTask = null;
try {
while (task != null || (task = getTask()) != null) {
//省略一些 非关键代码....
try {
beforeExecute(wt, task);//
try {
//重点代码!触发 java.util.concurrent.FutureTask#run 执行
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
//去看看afterExecute方法注释,无论线程执行过程中是否抛异常,afterExecute()都会 执行,看了源码,明白为什么是这样了,因为catch异常处理里面会执行afterExecute
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
看懂了java.util.concurrent.ThreadPoolExecutor#runWorker
几乎就明白线程池执行任务时的beforeExecute、afterExecute方法的所起的作用了(比如经常在afterExecute方法里面做一些线程池任务运行时间的统计工作)。
总结以下点:
Callable任务被submit时,会生成一个FutureTask对象,封装Callable,在FutureTask的run方法里面执行Callable#call方法,并且调用java.util.concurrent.FutureTask#set
设置Callable任务的执行结果(结果保存在一个FutureTask的Object类型的实例变量里面:private Object outcome;
)。
Future<Integer> future = executorService.submit(callable);
返回一个Future,它实际上是一个FutureTask对象,通过java.util.concurrent.FutureTask#get()
获取Callable任务的执行结果。
java.util.concurrent.FutureTask#run
方法是由java.util.concurrent.ThreadPoolExecutor#runWorker
触发调用的;而java.util.concurrent.ThreadPoolExecutor#runWorker
又是由java.util.concurrent.ThreadPoolExecutor.Worker#run
触发调用的;而java.util.concurrent.ThreadPoolExecutor.Worker#run
又是由java.util.concurrent.ThreadPoolExecutor#addWorker
里面的t.start();
这条语句触发调用的;而t.start();
会触发Runnable#run
方法的执行。这就是前面提到的这个原理:new Thread(Runnable).start()
会由jvm来调用Runnable#run。具体可参考:
用一个词表示就是多态。用一张图表示就是:
?
继承 ThreadPoolExecutor 实现自定义的线程池时,可重写 afterExecute()方法做一些异常处理逻辑的实现,不管任务正常执行完成、还是抛出异常,都会调用afterExecute(),具体可看JDK源码关于ThreadPoolExecutor#runWorker方法的注释。有兴趣可研究下ES SEARCH线程池源码就使用afterExecute来统计提交给线程池的每个任务的等待时间、执行时间,从而根据Little's law 自动调整线程池任务队列的长度:org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor#afterExecute
最后,想说的是:Callable任务,到ThreadPoolExecutor线程池执行 层面,它实际上是一个Runnable任务在执行。因为,ExecutorService submit Callable时,其实是将Callable封装到FutureTask/RunnableFuture中,而RunnableFuture implements Runnable,因此可以提交给线程池的java.util.concurrent.ThreadPoolExecutor#execute(Runnable comman