设为首页 加入收藏

TOP

扩展 ThreadPoolExecutor 的一种办法(二)
2017-12-18 12:36:55 】 浏览:414
Tags:扩展 ThreadPoolExecutor 办法
eue) { final TaskQueue queue = (TaskQueue)taskQueue; if (!queue.forceTaskIntoQueue(command)) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("队列已满"); } } else { submittedTaskCount.decrementAndGet(); throw rx; } } } }

TaskQueue

package executer;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private EnhancedThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EnhancedThreadPoolExecutor exec) {
        executor = exec;
    }

    public boolean forceTaskIntoQueue(Runnable o) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor已经关闭了,不能将task添加到队列里面");
        }
        return super.offer(o);
    }

    @Override
    public  boolean offer(Runnable o) {
        int currentPoolThreadSize = executor.getPoolSize();
        //如果线程池里的线程数量已经到达最大,将任务添加到队列中
        if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
            return super.offer(o);
        }
        //说明有空闲的线程,这个时候无需创建core线程之外的线程,而是把任务直接丢到队列里即可
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(o);
        }

        //如果线程池里的线程数量还没有到达最大,直接创建线程,而不是把任务丢到队列里面
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        return super.offer(o);
    }
}

TestExecuter

package executer;

import java.util.concurrent.TimeUnit;

public class TestExecuter {
    private static final int CORE_SIZE = 5;

    private static final int MAX_SIZE = 10;

    private static final long KEEP_ALIVE_TIME = 30;

    private static final int QUEUE_SIZE = 5;

    static EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(CORE_SIZE,MAX_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS , new TaskQueue(QUEUE_SIZE));

    public static void main(String[] args){
        for (int i = 0; i < 15; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.currentThread().sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

            System.out.println("线程池中现在的线程数目是:"+executor.getPoolSize()+",  队列中正在等待执行的任务数量为:"+ executor.getQueue().size());
        }
    }
}

先运行一下代码,看看是否如何预期。直接执行TestExecuter类中的main方法,运行结果如下:

线程池中现在的线程数目是:1,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:2,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:3,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:4,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:6,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:7,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:8,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:9,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:1
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:2
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:3
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:4
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:5

可以看到当线程数增加到core数量的时候,队列中是没有任务的。一直到线程数量增加到MAX数量,也即是10的时候,队列中才开始有任务。符合我们的预期。

如果我们注释掉TaskQueue类中的offer方法,也即是不覆盖队列的offer方法,那么运行结果如下:

线程池中现在的线程数目是:1,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:2,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:3,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:4,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:1
线程池中现
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java 线程池框架核心代码分析 下一篇如何实现一个 Web Server

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目