this.workers.add(newWorker);
int workerSize = workers.size();
if (workerSize > largestPoolSize) {
// 如果当前worker个数超过了之前记录的最大存活线程数,将其更新
largestPoolSize = workerSize;
}
// 创建成功
workerAdded = true;
}
} finally {
// 无论是否发生异常,都先将主控锁解锁
mainLock.unlock();
}
if (workerAdded) {
// 加入成功,启动worker线程
myWorkerThread.start();
// 标识为worker线程启动成功,并作为返回值返回
workerStarted = true;
}
}
}finally {
if (!workerStarted) {
addWorkerFailed(newWorker);
}
}
return workerStarted;
}
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
// 当一个任务从工作队列中被成功移除,可能此时工作队列为空。尝试判断是否满足线程池中止条件
tryTerminate();
return removed;
}
如何保证中止过程中不丢失任务?
- 通过shutdown关闭线程池时,SHUTDOWN状态的线程池会等待所有剩余的任务执行完毕后再进入TIDYING状态。
- 通过shutdownNow关闭线程池时,以返回值的形式将剩余的任务吐出来还给用户
中止前已提交的任务不会丢失;而中止后线程池也不会再接收新的任务(走拒绝策略)。这两点共同保证了提交的任务不会丢失。
如何保证线程池最终关闭前,所有工作线程都已退出?
线程池在收到中止命令进入SHUTDOWN或者STOP状态时,会一直等到工作队列为空且所有工作线程都中止退出后才会推进到TIDYING阶段。
上面描述的条件是一个复合的条件,其只有在“收到停止指令(进入SHUTDOWN或者STOP状态)”、"工作队列中任务被移除或消费(工作队列为空)"或是“工作线程退出(所有工作线程都中止退出)”这三类事件发生时才有可能满足。
而判断是否满足条件并推进到TIDYING状态的关键就在tryTerminate方法中。tryTerminate顾名思义便是用于尝试终止线程池的,当上述任意事件触发时便判断是否满足终止条件,如果满足则将线程池推进到TIDYING阶段。
因此在ThreadPoolExecutor中tryTerminate一共在6个地方被调用,分别是shutdown、shutdownNow、remove、purge、addWorkerFailed和processWorkerExit方法。
- shutdown、shutdownNow方法触发收到停止指令的事件
- remove、purge方法触发工作队列中任务被移除的事件
- addWorkerFailed、processWorkerExit方法触发工作线程退出的事件
tryTerminate源码分析
/**
* 尝试判断是否满足线程池中止条件,如果满足条件,将其推进到最后的TERMINATED状态
* 注意:必须在任何可能触发线程池中止的场景下调用(例如工作线程退出,或者SHUTDOWN状态下队列工作队列为空等)
* */
final void tryTerminate() {
for (;;) {
int currentCtl = this.ctl.get();
if (isRunning(currentCtl)
|| runStateAtLeast(currentCtl, TIDYING)
|| (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
// 1 isRunning(currentCtl)为true,说明线程池还在运行中,不满足中止条件
// 2 当前线程池状态已经大于等于TIDYING了,说明之前别的线程可能已经执行过tryTerminate,且通过了这个if校验,不用重复执行了
// 3 当前线程池是SHUTDOWN状态,但工作队列中还有任务没处理完,也不满足中止条件
// 以上三个条件任意一个满足即直接提前return返回
return;
}
// 有两种场景会走到这里
// 1 执行了shutdown方法(runState状态为SHUTDOWN),且当前工作线程已经空了
// 2 执行了shutdownNow方法(runState状态为STOP)
// 这个时候需要令所有的工作线程都主动的退出来回收资源
if (workerCountOf(currentCtl) != 0) {
// 如果当前工作线程个数不为0,说明还有别的工作线程在工作中。
// 通过interruptIdleWorkers(true),打断其中的一个idle线程,尝试令其也执行runWorker中的processWorkerExit逻辑,并执行tryTerminate
// 被中断的那个工作线程也会执行同样的逻辑(getTask方法返回->processWorkerExit->tryTerminate)
// 这样可以一个接着一个的不断打断每一个工作线程,令其逐步的退出(比起一次性的通知所有的idle工作线程,这样相对平滑很多)
interruptIdleWorkers(ONLY_ONE);
return;
}
// 线程池状态runState为SHUTDOWN或者STOP,且存活的工作线程个数已经为0了
// 虽然前面的interruptIdleWorkers是一个一个中断idle线程的,但实际上有的工作线程是因为别的原因退出的(恰好workerCountOf为0了)
// 所以这里是可能存在并发的,因此通过mainLock加锁防止并发,避免重复的terminated方法调用和termination.signalAll方法调用
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// cas的设置ctl的值为TIDYING+工作线程个数0(防止与别的地方ctl并发更新)
if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
try {
// cas成功,调用terminated钩子函数
terminated();
} finally {
// 无论terminated钩子函数是否出现异常
// cas的设置ctl的值为TERMINATED最终态+工作线程个数0(防止与别的地方ctl并发更新)
ctl.set(ctlOf(TERMINATED, 0));
// 通知使用awaitTermination方法等待线程池关闭的其它线程(通过termination.await等待)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 如果上