设为首页 加入收藏

TOP

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)(五)
2023-07-25 21:32:57 】 浏览:62
Tags:时队列 史上手 时间轮 TimingWheel
rvalTime % 30)))}; } else { return getWheel(intervalTime,baseInterval,baseInterval, 1); } } private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) { long nextInterval = baseInterval * interval; if (intervalTime < nextInterval) { return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))}; } else { return getWheel(intervalTime,baseInterval,nextInterval, (p+1)); } } static class wheelThread extends Thread { DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>(); public DelayQueue<TimewheelTask> getQueue() { return queue; } public void add(List<TimewheelTask> tasks) { if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> add(task)); } } public void add(TimewheelTask task) { task.calDelay(); queue.add(task); } @Override public void run() { while (true) { try { TimewheelTask task = queue.take(); int p = task.getLevel(); long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p)))); TimewheelBucket timewheelBucket = cache.get(p); synchronized (timewheelBucket) { timewheelBucket.indexAdd(); task.run(); task.clear(); } task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit()); task.calDelay(); queue.add(task); } catch (InterruptedException e) { } } } } }
TimerWheel的模型定义
private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();

一个轮表示30秒的整体跨度。

private static int interval = 30;

创建整体驱动的执行线程

private static wheelThread wheelThread;

 static{
        interval = 30;
        wheelThread = new wheelThread();
        wheelThread.setDaemon(false);
        wheelThread.start();
}

    static class wheelThread extends Thread {
        DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();
        public DelayQueue<TimewheelTask> getQueue() {
            return queue;
        }
        public void add(List<TimewheelTask> tasks) {
            if (CollectionUtils.isNotEmpty(tasks)) {
                tasks.forEach(task -> add(task));
            }
        }
        public void add(TimewheelTask task) {
            task.calDelay();
            queue.add(task);
        }
        @Override
        public void run() {
            while (true) {
                try {
                    TimewheelTask task = queue.take();
                    int p = task.getLevel();
                    long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));
                    TimewheelBucket timewheelBucket = cache.get(p);
                    synchronized (timewheelBucket) {
                        timewheelBucket.indexAdd();
                        task.run();
                        task.clear();
                    }
                    task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());
                    task.calDelay();
                    queue.add(task);
                } catch (InterruptedException e) {

                }
            }
   }

获取对应的时间轮轮盘模型体系
    private static Integer[] getWheel(long intervalTime,long baseInterval) {
        //转换后的延时时间
        if (intervalTime < baseInterval) {
            return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};
        } else {
            return getWheel(intervalTime,baseInterval,baseInterval, 1);
        }
    }

    private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {
         long nextInterval = baseInterval * interval;
        if (intervalTime < nextInterval) {
            return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};
        } else {
            return getWheel(intervalTime,baseInterval,nextInterval, (p+1));
        }
    }

到这里相信大家,基本上应该是了解了如何去实现对应的时间轮盘的技术实现过程,有兴趣希望整个完整源

首页 上一页 2 3 4 5 下一页 尾页 5/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Flink 生成ParquetFile 下一篇Java反应式编程(3)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目