Java - ThreadPoolExecutor源码分析
1. 为什么要自定义线程池
首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。
但是如果直接采用JDK提供的方式去构建,可见设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己创建自定义线程池。
自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。
ThreadPoolExecutor 七大核心参数:
public ThreadPoolExecutor(int corePoolSize, // 核心工作线程(当前任务执行结束后,不会销毁) int maximumPoolSize, // 最大工作线程(代表当前线程池中一共可以有多少工作线程) long keepAliveTime, // 非核心工作线程在阻塞队列位置等待时间 TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位 BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先到阻塞队列中 ThreadFactory threadFactory, // 构建线程的线程工厂,可以自定义thread信息 RejectedExecutionHandler handler) // 当线程池无法处理处理任务时,执行拒绝策略
2.ThreadPoolExecutor应用
JDK提供的几种拒绝策略:
- AbortPolicy: 当前拒绝策略会在无法执行任务时,直接抛出一个异常
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
- CallerRunsPolicy: 当前拒绝策略会在无法执行任务时,将任务交给调用者处理
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
- DiscardPolicy:当前拒绝策略会在无法执行任务时,直接将任务丢弃
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
- DiscardOldestPolicy: 当前拒绝策略会在无法执行任务时,将阻塞队列中最早的任务丢弃,将当前任务再次交接线程池处理
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
- 当然也可以自定义拒绝策略,根据自己业务修改实现逻辑, 只需实现 RejectedExecutionHandler 类中的 rejectedExecution 方法。
3. ThreadPoolExecutor的核心属性
线程池的核心属性就是ctl,它会基于ctl拿到线程池的状态以及工作线程个数。
// 当前线程的核心属性 // 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子操作 // ctl表示线程池的两个