ount(c)) //CAS增加线程数
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
?
//上面的流程走完,就可以真实开始创建线程了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //这里创建了线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
?
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); //这里将线程加入到线程池中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //添加成功,启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); //添加线程失败操作
}
return workerStarted;
}
小结:addWorker()方法主要功能;
-
增加线程数;
-
创建线程Worker实例加入线程池;
-
加入完成开启线程;
-
启动失败则回滚增加流程;
2.1.3.3、工作线程的实现
private final class Worker //Worker类是ThreadPoolExecutor的内部类
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread; //持有实际线程
Runnable firstTask; //worker所对应的第一个任务,可能为空
volatile long completedTasks; //记录执行任务数
?
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); //当前线程调用ThreadPoolExecutor中的runWorker方法,在这里实现的线程复用
}
?
...继承AQS,实现了不可重入锁...
}
小结:工作线程Worker类主要功能;
-
此类持有一个工作线程,不断处理拿到的新任务,持有的线程即为可复用的线程;
-
此类可看作一个适配类,在run()方法中真实调用runWorker()方法不断获取新任务,完成线程复用;
2.1.3.4、线程的复用
final void runWorker(Worker w) { //ThreadPoolExecutor中的runWorker方法,在这里实现的线程复用
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true; //标识线程是否异常终止
try {
while (task != null || (task = getTask()) != null) { //这里会不断从任务队列获取任务并执行
w.lock();
//线程是否需要中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //执行任务前的Hook方法,可自定义
Throwable thrown = null;
try {
task.run(); //执行实际的任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //执行任务后的Hook方法,可自定义
}
} finally {
task = null; //执行完成后,将当前线程中的任务制空,准备执行下一个任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); //线程执行完成后的清理工作
}
}
小结:runWorker()方法主要功能;
-
循环从缓存队列中获取新的任务,直到没有任务为止;
-
使用worker持有的线程真实执行任务;
-
任务都执行完成后的清理工作;
2.1.3.5、队列中获取待执行任务
private Runnable getTask() {
boolean timedOut = false; //标识当前线程是否超时未能获取到task对象
?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
?
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
?
int wc = workerCountOf(c);
?
// Are workers subject to culling?
boolean timed = al