orkQueue.offer入队成功
int recheck = this.ctl.get();
// 重新检查状态,避免在上面入队的过程中线程池并发的关闭了
// 如果是isRunning=false,则进一步需要通过remove操作将刚才入队的任务删除,进行回滚
if (!isRunning(recheck) && remove(command)) {
// 线程池关闭了,执行reject操作
reject(command);
} else if(workerCountOf(currentCtl) == 0){
// 在corePoolSize为0的情况下,当前不存在存活的核心线程
// 一个任务在入队之后,如果当前线程池中一个线程都没有,则需要兜底的创建一个非核心线程来处理入队的任务
// 因此firstTask为null,目的是先让任务先入队后创建线程去拉取任务并执行
addWorker(null,false);
}else{
// 加入队列成功,且当前存在worker线程,成功返回
return;
}
}else{
// 阻塞队列已满,尝试创建一个新的非核心线程处理
boolean addNonCoreWorkerSuccess = addWorker(command,false);
if(!addNonCoreWorkerSuccess){
// 创建非核心线程失败,执行拒绝策略(失败的原因和前面创建核心线程addWorker的原因类似)
reject(command);
}else{
// 创建非核心线程成功,成功返回
return;
}
}
}
/**
* 向线程池中加入worker
* */
private boolean addWorker(Runnable firstTask, boolean core) {
// retry标识外层循环
retry:
for (;;) {
int currentCtl = ctl.get();
int runState = runStateOf(currentCtl);
// Check if queue empty only if necessary.
// 线程池终止时需要返回false,避免新的worker被创建
// 1 先判断runState >= SHUTDOWN
// 2 runState >= SHUTDOWN时,意味着不再允许创建新的工作线程,但有一种情况例外
// 即SHUTDOWN状态下(runState == SHUTDOWN),工作队列不为空(!workQueue.isEmpty()),还需要继续执行
// 比如在当前存活的线程发生中断异常时,会调用processWorkerExit方法,在销毁原有工作线程后调用addWorker重新创建一个新的(firstTask == null)
if (runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
// 线程池已经是关闭状态了,不再允许创建新的工作线程,返回false
return false;
}
// 用于cas更新workerCount的内层循环(注意这里面与jdk的写法不同,改写成了逻辑一致但更可读的形式)
for (;;) {
// 判断当前worker数量是否超过了限制
int workerCount = workerCountOf(currentCtl);
if (workerCount >= CAPACITY) {
// 当前worker数量超过了设计上允许的最大限制
return false;
}
if (core) {
// 创建的是核心线程,判断当前线程数是否已经超过了指定的核心线程数
if (workerCount >= this.corePoolSize) {
// 超过了核心线程数,创建核心worker线程失败
return false;
}
} else {
// 创建的是非核心线程,判断当前线程数是否已经超过了指定的最大线程数
if (workerCount >= this.maximumPoolSize) {
// 超过了最大线程数,创建非核心worker线程失败
return false;
}
}
// cas更新workerCount的值
boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
if (casSuccess) {
// cas成功,跳出外层循环
break retry;
}
// 重新检查一下当前线程池的状态与之前是否一致
currentCtl = ctl.get(); // Re-read ctl
if (runStateOf(currentCtl) != runState) {
// 从外层循环开始continue(因为说明在这期间 线程池的工作状态出现了变化,需要重新判断)
continue retry;
}
// compareAndIncrementWorkerCount方法cas争抢失败,重新执行内层循环
}
}
boolean workerStarted = false;
boolean workerAdded = false;
MyWorker newWorker = null;
try {
// 创建一个新的worker
newWorker = new MyWorker(firstTask);
final Thread myWorkerThread = newWorker.thread;
if (myWorkerThread != null) {
// MyWorker初始化时内部线程创建成功
// 加锁,防止并发更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int runState = runStateOf(ctl.get());
// 重新检查线程池运行状态,满足以下两个条件的任意一个才创建新Worker
// 1 runState < SHUTDOWN
// 说明线程池处于RUNNING状态正常运行,可以创建新的工作线程
// 2 runState == SHUTDOWN && firstTask == null
// 说明线程池调用了shutdown,但工作队列不为空,依然需要新的Worker。
// firstTask == null标识着其不是因为外部提交新任务而创建新Worker,而是在消费SHUTDOWN前已提交的任务
if (runState < SHUTDOWN ||
(runState == SHUTDOWN && firstTask == null)) {
if (myWorkerThread.isAlive()) {
// 预检查线程的状态,刚初始化的worker线程必须是未唤醒的状态
throw new IllegalThreadStateException();
}
// 加入worker集合