设为首页 加入收藏

TOP

jdk线程池ThreadPoolExecutor优雅停止原理解析(自己动手实现线程池)(二)(五)
2023-07-25 21:33:13 】 浏览:160
Tags:jdk 程池 ThreadPoolExecutor 解析
orkQueue.offer入队成功 int recheck = this.ctl.get(); // 重新检查状态,避免在上面入队的过程中线程池并发的关闭了 // 如果是isRunning=false,则进一步需要通过remove操作将刚才入队的任务删除,进行回滚 if (!isRunning(recheck) && remove(command)) { // 线程池关闭了,执行reject操作 reject(command); } else if(workerCountOf(currentCtl) == 0){ // 在corePoolSize为0的情况下,当前不存在存活的核心线程 // 一个任务在入队之后,如果当前线程池中一个线程都没有,则需要兜底的创建一个非核心线程来处理入队的任务 // 因此firstTask为null,目的是先让任务先入队后创建线程去拉取任务并执行 addWorker(null,false); }else{ // 加入队列成功,且当前存在worker线程,成功返回 return; } }else{ // 阻塞队列已满,尝试创建一个新的非核心线程处理 boolean addNonCoreWorkerSuccess = addWorker(command,false); if(!addNonCoreWorkerSuccess){ // 创建非核心线程失败,执行拒绝策略(失败的原因和前面创建核心线程addWorker的原因类似) reject(command); }else{ // 创建非核心线程成功,成功返回 return; } } }
/**
     * 向线程池中加入worker
     * */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // retry标识外层循环
        retry:
        for (;;) {
            int currentCtl = ctl.get();
            int runState = runStateOf(currentCtl);

            // Check if queue empty only if necessary.
            // 线程池终止时需要返回false,避免新的worker被创建
            // 1 先判断runState >= SHUTDOWN
            // 2 runState >= SHUTDOWN时,意味着不再允许创建新的工作线程,但有一种情况例外
            // 即SHUTDOWN状态下(runState == SHUTDOWN),工作队列不为空(!workQueue.isEmpty()),还需要继续执行
            // 比如在当前存活的线程发生中断异常时,会调用processWorkerExit方法,在销毁原有工作线程后调用addWorker重新创建一个新的(firstTask == null)
            if (runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
                // 线程池已经是关闭状态了,不再允许创建新的工作线程,返回false
                return false;
            }

            // 用于cas更新workerCount的内层循环(注意这里面与jdk的写法不同,改写成了逻辑一致但更可读的形式)
            for (;;) {
                // 判断当前worker数量是否超过了限制
                int workerCount = workerCountOf(currentCtl);
                if (workerCount >= CAPACITY) {
                    // 当前worker数量超过了设计上允许的最大限制
                    return false;
                }
                if (core) {
                    // 创建的是核心线程,判断当前线程数是否已经超过了指定的核心线程数
                    if (workerCount >= this.corePoolSize) {
                        // 超过了核心线程数,创建核心worker线程失败
                        return false;
                    }
                } else {
                    // 创建的是非核心线程,判断当前线程数是否已经超过了指定的最大线程数
                    if (workerCount >= this.maximumPoolSize) {
                        // 超过了最大线程数,创建非核心worker线程失败
                        return false;
                    }
                }

                // cas更新workerCount的值
                boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
                if (casSuccess) {
                    // cas成功,跳出外层循环
                    break retry;
                }

                // 重新检查一下当前线程池的状态与之前是否一致
                currentCtl = ctl.get();  // Re-read ctl
                if (runStateOf(currentCtl) != runState) {
                    // 从外层循环开始continue(因为说明在这期间 线程池的工作状态出现了变化,需要重新判断)
                    continue retry;
                }

                // compareAndIncrementWorkerCount方法cas争抢失败,重新执行内层循环
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;

        MyWorker newWorker = null;
        try {
            // 创建一个新的worker
            newWorker = new MyWorker(firstTask);
            final Thread myWorkerThread = newWorker.thread;
            if (myWorkerThread != null) {
                // MyWorker初始化时内部线程创建成功

                // 加锁,防止并发更新
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();

                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int runState = runStateOf(ctl.get());

                    // 重新检查线程池运行状态,满足以下两个条件的任意一个才创建新Worker
                    // 1 runState < SHUTDOWN
                    // 说明线程池处于RUNNING状态正常运行,可以创建新的工作线程
                    // 2 runState == SHUTDOWN && firstTask == null
                    // 说明线程池调用了shutdown,但工作队列不为空,依然需要新的Worker。
                    // firstTask == null标识着其不是因为外部提交新任务而创建新Worker,而是在消费SHUTDOWN前已提交的任务
                    if (runState < SHUTDOWN ||
                            (runState == SHUTDOWN && firstTask == null)) {
                        if (myWorkerThread.isAlive()) {
                            // 预检查线程的状态,刚初始化的worker线程必须是未唤醒的状态
                            throw new IllegalThreadStateException();
                        }

                        // 加入worker集合
首页 上一页 2 3 4 5 6 7 下一页 尾页 5/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇数据库可视化工具分享 (DBeaver) 下一篇Spring AOP中增强Advice的执行顺序

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目