设为首页 加入收藏

TOP

死磕 java线程系列之自己动手写一个线程池(续)(二)
2019-10-11 11:19:55 】 浏览:56
Tags:死磕 java 线程 系列 自己 手写 一个
c final int EXCEPTION = 2; /** * 任务执行的结果【本篇文章由公众号“彤哥读源码”原创】 * 如果执行正常,返回结果为T * 如果执行异常,返回结果为Exception */ private Object result;

再次,如果get()在run()之前执行,那就需要阻塞等待run()执行完毕才能拿到返回值,所以需要保存调用者(主线程),get()的时候park阻塞住,run()完成了unpark唤醒它来拿返回值。

    /**
     * 调用者线程
     * 也可以使用volatile+Unsafe实现CAS操作
     */
    private AtomicReference<Thread> caller = new AtomicReference<>();

然后,我们先来看看run()方法的逻辑,它其实就是先执行真正的任务,然后修改状态为完成,并保存任务的返回值,如果保存了主线程,还要唤醒它。

    @Override
    public void run() {
        // 如果状态不是NEW,说明执行过了,直接返回
        if (state.get() != NEW) {
            return;
        }
        try {
            // 执行任务【本篇文章由公众号“彤哥读源码”原创】
            T r = task.call();
            // CAS更新state的值为FINISHED
            // 如果更新成功,就把r赋值给result
            // 如果更新失败,说明state的值不为NEW了,也就是任务已经执行过了
            if (state.compareAndSet(NEW, FINISHED)) {
                this.result = r;
                // finish()必须放在修改state里面,见下面的分析
                finish();
            }
        } catch (Exception e) {
            // 如果CAS更新state的值为EXCEPTION成功,就把e赋值给result
            // 如果CAS更新失败,说明state的值不为NEW了,也就是任务已经执行过了
            if (state.compareAndSet(NEW, EXCEPTION)) {
                this.result = e;
                // finish()必须放在修改state里面,见下面的分析
                finish();
            }
        }
    }

    private void finish() {
        // 检查调用者是否为空,如果不为空,唤醒它
        // 调用者在调用get()方法的进入阻塞状态
        for (Thread c; (c = caller.get()) != null;) {
            if (caller.compareAndSet(c, null)) {
                LockSupport.unpark(c);
            }
        }
    }

最后,我们再看看get()方法,如果任务还未执行,就阻塞等待任务的执行;如果任务已经执行完毕了,直接拿返回值即可;但是,还有一种情况,get()方法执行的过程中run()方法也在执行,所以get()方法中的每一步都要检查状态的值有没有变化。

@Override
    public T get() {
        int s = state.get();
        // 如果任务还未执行完成,判断当前线程是否要进入阻塞状态
        if (s == NEW) {
            // 标识调用者线程是否被标记过
            boolean marked = false;
            for (;;) {
                // 重新获取state的值
                s = state.get();
                // 如果state大于NEW说明完成了,跳出循环
                if (s > NEW) {
                    break;
                    // 此处必须把caller的CAS更新和park()方法分成两步处理,不能把park()放在CAS里面
                } else if (!marked) {
                    // 尝试更新调用者线程
                    // 试想断点停在此处【本篇文章由公众号“彤哥读源码”原创】
                    // 此时state为NEW,让run()方法执行到底,它不会执行finish()中的unpark()方法
                    // 这时打开断点,这里会更新caller成功,但是循环从头再执行一遍发现state已经变了,
                    // 直接在上面的if(s>NEW)处跳出循环了,因为finish()在修改state内部
                    marked = caller.compareAndSet(null, Thread.currentThread());
                } else {
                    // 调用者线程更新之后park当前线程
                    // 试想断点停在此处
                    // 此时state为NEW,让run()方法执行到底,因为上面的caller已经设置值了,
                    // 所以会执行finish()方法中的unpark()方法,
                    // 这时再打开断点,这里不会park信
                    // 见unpark()方法的注释,上面写得很清楚:
                    // 如果线程执行了park()方法,那么执行unpark()方法会唤醒那个线程
                    // 如果先执行了unpark()方法,那么线程下一次执行park()方法将不会阻塞
                    LockSupport.park();
                }
            }
        }

        if (s == FINISHED) {
            return (T) result;
        }
        throw new RuntimeException((Throwable) result);
    }

在我们的实现中,如果任务执行的过程抛出异常了,也是通过result返回给主线程,这样主线程就拿到了这个异常,它就可以做相应的处理了。

好了,完整的实现到此结束,不知道你领悟了没有。

测试用例

最后奉上测试代码:

public class MyThreadPoolFutureExecutorTest {
    public static void main(String[] args) {
        FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
        List<Future<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int num = i;
            Future<Integer> future = threadPool.submit(() -> {
                Thread.sleep(1000);
                System.out.println("running: " + num);
                return num;
            });
            list.add(future);
        }

        for (Future<Integer> future : list) {
            System.out.println("runned: " + future.get());
        }
    }
}

运行结果:

thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒绝的任务
【本篇文章由公众号“彤哥读源码”原创】
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
runni
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇【mybatis】1mybatis下载、配置与.. 下一篇SpringMVC_HandlerMethodArgument..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目