ueue.remove(r)) {
taskList.add(r);
}
}
}
return taskList;
}
- shutdownNow方法在入口处使用mainLock加锁后,与shutdown方法一样也通过checkShutdownAccess检查当前是否有权限访问工作线程(前提是设置了SecurityManager),如果无权限则会抛出SecurityException异常。
- 通过advanceRunState方法将线程池状态推进到STOP。
- 通过interruptWorkers使用中断指令(Thread.interrupt)唤醒所有工作线程(区别于shutdown中的interruptIdleWorkers)。区别在于除了idle的工作线程,所有正在执行任务的工作线程也会收到中断通知,期望其能尽快退出任务的执行。
- 通过drainQueue方法将当前工作线程中剩余的所有任务以List的形式统一返回给调用者。
- 通过调用tryTerminate方法尝试终止线程池。
如何保证线程池在中止后不能再受理新的任务?
在execute方法作为入口,提交任务的逻辑中,v2版本相比v1版本新增了一些基于线程池状态的校验(和jdk的实现保持一致了)。
execute方法中的校验
- 首先在execute方法中,向工作队列加入新任务前(workQueue.offer)对当前线程池的状态做了一个校验(isRunning(currentCtl))。希望非RUNNING状态的线程池不向工作队列中添加新任务
但在做该检查时可能与shutdown/shutdownNow内推进线程池状态的逻辑并发执行,所以在工作队列成功加入任务后还需要再检查一次线程池状态,如果此时已经不是RUNNING状态则需要通过remove方法将刚入队的任务从队列中移除,并调用reject方法(拒绝策略)
addWorker方法中的校验
- 在addWorker方法的入口处(retry:第一层循环通过(runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())))逻辑,
保证了不是RUNNING状态的线程池(runState >= SHUTDOWN),无法创建新的工作线程(addWorker返回false)。
但有一种特殊情况:即SHUTDOWN状态下(runState == SHUTDOWN),工作队列不为空(!workQueue.isEmpty()),且不是第一次提交任务时创建新工作线程(firstTask == null),
依然允许创建新的工作线程,因为即使在SHUTDOWN状态下,某一存活的工作线程发生中断异常时,会调用processWorkerExit方法,在销毁原有工作线程后依然需要调用addWorker重新创建一个新的(firstTask == null)
execute与shutdown/shutdownNow并发时的处理
execute提交任务时addWorker方法和shutdown/shutdownNow方法是可能并发执行的,但addWorker中有多处地方都对线程池的状态进行了检查,尽最大的可能避免线程池停止时继续创建新的工作线程。
- retry循环中,compareAndIncrementWorkerCount方法会cas的更新状态(此前获取到的ctl状态必然是RUNNING,否则走不到这里),cas成功则会跳出retry:循环( break retry;)。
而cas失败可能有两种情况:
如果是workerCount发生了并发的变化,则在内层的for (;;)循环中进行重试即可
如果线程池由于收到终止指令而推进了状态,则随后的if (runStateOf(currentCtl) != runState)将会为true,跳出到外层的循环重试(continue retry)
- 在new Worker(firstTask)后,使用mainLock获取锁后再一次检查线程池状态(if (runState < SHUTDOWN ||(runState == SHUTDOWN && firstTask == null)))。
由于shutdown、shutdownNow也是通过mainLock加锁后才推进的线程池状态,因此这里获取到的状态是准确的。
如果校验失败(if结果为false),则workers中不会加入新创建的工作线程,临时变量workerAdded=false,则工作线程不会启动(t.start())。临时变量workerStarted也为false,最后会调用addWorkerFailed将新创建的工作线程回收掉(回滚)
基于execute方法和addWorker方法中关于各项关于线程池停止状态校验,最大程度的避免了线程池在停止过程中新任务的提交和可能的新工作线程的创建。使得execute方法在线程池接收到停止指令后(>=SHUTDOWN),最终都会去执行reject拒绝策略逻辑。
/**
* 提交任务,并执行
* */
@Override
public void execute(Runnable command) {
if (command == null){
throw new NullPointerException("command参数不能为空");
}
int currentCtl = this.ctl.get();
if (workerCountOf(currentCtl) < this.corePoolSize) {
// 如果当前存在的worker线程数量低于指定的核心线程数量,则创建新的核心线程
boolean addCoreWorkerSuccess = addWorker(command,true);
if(addCoreWorkerSuccess){
// addWorker添加成功,直接返回即可
return;
}
// addWorker失败了
// 失败的原因主要有以下几个:
// 1 线程池的状态出现了变化,比如调用了shutdown/shutdownNow方法,不再是RUNNING状态,停止接受新的任务
// 2 多个线程并发的execute提交任务,导致cas失败,重试后发现当前线程的个数已经超过了限制
// 3 小概率是ThreadFactory线程工厂没有正确的返回一个Thread
// 获取最新的ctl状态
currentCtl = this.ctl.get();
}
// 走到这里有两种情况
// 1 因为核心线程超过限制(workerCountOf(currentCtl) < corePoolSize == false),需要尝试尝试将任务放入阻塞队列
// 2 addWorker返回false,创建核心工作线程失败
// 判断当前线程池状态是否为running
// 如果是running状态,则进一步执行任务入队操作
if(isRunning(currentCtl) && this.workQueue.offer(command)){
// 线程池是running状态,且w