线程池ThreadPoolExecutor
ThreadPoolExecutor 继承结构
继承结构如图所示:ThreadPoolExecutor <- AbstractExecutorService <- ExecutorService <- Executor
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
}
/**
* 实现了部分 ExecutorService 方法
* 1. submit 方法
* 2. invokeAny 方法
* 3. invokeAll 方法
*/
public abstract class AbstractExecutorService implements ExecutorService {
/**
* Callable -> FutureTask
* FutureTask<V> implements RunnableFuture<V>
* RunnableFuture<V> extends Future<V>, Runnable
*
* FutureTask Status:
* NEW(0): 初始状态, 任务刚被创建或者正在计算中
* COMPLETING(1): 中间状态, 任务计算完成正在对结果进行赋值,或者正在处理异常
* NORMAL(2): 终止状态, 任务计算完成, 结果已经完成赋值
* EXCEPTIONAL(3): 终止状态, 任务计算过程发生异常无法处理,线程中断
* CANCELLED(4): 终止状态, 任务计算过程被取消
* INTERRUPTING(5): 中间状态, 任务计算过程已开始并被中断,正在修改状态
* INTERRUPTED(6): 终止状态,任务计算过程已开始并被中断,且已经完全停止
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
// 提交 callable 任务
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 提交 runnable 任务,返回 null
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 提交 runnable 任务,返回 result
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// invokeAll
// 为每一个任务创建对应的FutureTask, 并调用 execute 方法执行
// execute() 方法在 ThreadPoolExecutor 被实现
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
// 如何任务此时还未执行完成,则阻塞获取对应的值
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
// 执行过程抛出无法处理的异常
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
// 取消任务的执行,如果任务已经执行完成,则不受影响
futures.get(i).cancel(true);
}
}
// InvokeAny 方法逻辑待后续更新
}
/**
* 在 Executor 的基础上定义了一系列任务执行和线程池管理方法
*
* 1. submit: 提供方法执行带有返回值的任务
* 2. invokeAll: 提供方法执行指定的任务集合中的所有任务, 返回 List<Future<T>>
* 3. invokeAny: 提供方法执行指定的任务集合中的所有任务, 将第一个执行完成的任务的结果作为返回值, 并终止其他线程的执行
* 4. isShutDown/isTerminated: 判断线程池状态方法
* 5. shutdown: 不再接受新的任务, 待所有任务执行完毕后关闭线程池
* 6. shutdownNow: 不再接受新的任务,直接关闭线程池
*/
public interface ExecutorService extends Executor {
// ...
}
/**
* 只定义了一个 execute 方法, 执行 Runnable 任务
*/
public interface Executor {
void execute(Runnable command);
}