设为首页 加入收藏

TOP

JUC学习-线程池部分(一)
2023-07-25 21:42:50 】 浏览:62
Tags:JUC 学习 程池部

自定义线程池

package com.appletree24;  
  
import java.util.ArrayDeque;  
import java.util.Deque;  
import java.util.HashSet;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;  
  
class Main {  
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 5, (queue, task) -> {  
            //带超时等待  
//            queue.offer(task,500,TimeUnit.MILLISECONDS);  
        });  
        for (int i = 0; i < 10; i++) {  
            int j = i;  
            threadPool.execute(() -> {  
                try {  
                    Thread.sleep(1000L);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                System.out.println(j);  
            });  
        }  
    }  
}  
  
//策略模式接口 此处使用策略模式是因为在实现拒绝策略时,有许多种拒绝的方式,这些方式如果不使用恰当的模式,就需要大量的if..else来编写  
//且方式数量大于4个,会造成类膨胀的问题,推荐使用混合模式  
//https://www.runoob.com/design-pattern/strategy-pattern.html  
@FunctionalInterface  
interface RejectPolicy<T> {  
    void reject(BlockingQueue<T> queue, T task);  
}  
  
class ThreadPool {  
    //任务队列  
    private BlockingQueue<Runnable> taskQueue;  
    //线程集合  
    private HashSet<Worker> workers = new HashSet<>();  
  
    //线程数  
    private int coreSize;  
  
    //超时时间  
    private long timeout;  
  
    private TimeUnit timeUnit;  
    private RejectPolicy<Runnable> rejectPolicy;  
  
    //执行任务  
    public void execute(Runnable task) {  
        //当任务数未超过核心线程数时,直接交给Worker对象执行  
        //如果超过,则加入阻塞任务队列,暂存起来  
        synchronized (workers) {  
            if (workers.size() < coreSize) {  
                Worker worker = new Worker(task);  
                workers.add(worker);  
                worker.start();  
            } else {  
                //第一种选择死等  
//                taskQueue.put(task);  
                //第二种为超时等待  
                //第三种为消费者放弃任务执行  
                //第四种为主线程抛出异常  
                //第五种让调用者自行执行任务  
                taskQueue.tryPut(rejectPolicy, task);  
            }  
        }  
    }  
  
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {  
        this.coreSize = coreSize;  
        this.timeout = timeout;  
        this.timeUnit = timeUnit;  
        this.taskQueue = new BlockingQueue<>(queueCapcity);  
        this.rejectPolicy = rejectPolicy;  
    }  
  
    class Worker extends Thread {  
        private Runnable task;  
  
        public Worker(Runnable task) {  
            this.task = task;  
        }  
  
        @Override  
        public void run() {  
            //执行任务  
            //1.当传递过来的task不为空,执行任务  
            //2.当task执行完毕,再接着取下一个任务并执行  
            while (task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {  
                try {  
                    task.run();  
                } catch (Exception e) {  
                    e.printStackTrace();  
                } finally {  
                    task = null;  
                }  
            }  
            synchronized (workers) {  
                workers.remove(this);  
            }  
        }  
    }  
}  
  
class BlockingQueue<T> {  
    //1. 任务队列  
    private final Deque<T> queue = new ArrayDeque<>();  
  
    //2. 锁  
    private final ReentrantLock lock = new ReentrantLock();  
  
    //3. 生产者条件变量  
    private final Condition fullWaitSet = lock.newCondition();  
  
    //4. 消费者条件变量  
    private final Condition emptyWaitSet = lock.newCondition();  
  
    //5. 容量上限  
    private int capcity;  
  
    public BlockingQueue(int capcity) {  
        this.capcity = capcity;  
    }  
  
    //带超时的等待获取  
    public T poll(long timeout, TimeUnit unit) {  
        lock.lock();  
        long nanos = unit.toNanos(timeout);  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    if (nanos <= 0) {  
                        return null;  
                    }  
                    nanos = emptyWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //消费者拿取任务的方法  
    public T take() {  
        l
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇学习笔记——@RequestMapping注解.. 下一篇Java的Lambda表达式总结-JDK1.8

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目