设为首页 加入收藏

TOP

从中间件团队窃取了这个组件,见识到了编码能力的天花板!!(一)
2023-07-25 21:29:35 】 浏览:82
Tags:从中间 能力的 花板

大家好,我是陶朱公Boy,又跟大家见面了。

前言

今天跟大家分享一款基于“生产者消费者模式”下实现的组件。

该组件是作者偶然在翻阅公司一中间件源码的时候碰到的,觉得设计的非常精美、巧妙,花了点时间整理成文分享给大家。

生产者和消费者彼此之间不进行通信,中间通过一个容器(如阻塞队列)来解决强解耦问题。
阻塞队列起到了一定的数据缓冲作用,平衡了生产者和消费者对数据的处理能力。by—《Java并发编程的艺术》

组件介绍

该组件基于生产者消费者模式来编码实现,是一款本地化解决流量削峰、解耦、异步的利器。

此组件由以下知识点构成:线程池、阻塞队列、LockSupport、Executor框架、final、volatile。此外你还能接触到hash取模算法、接口回调等机制。

组件本身代码量并不大,但知识点比较密集,所以希望大家能花一点时间认真看完。我将从适用场景、架构设计、源码解析这三个角度给大家讲介绍这款组件。

适用场景

 

☆场景一:报表下载

现在很多后台下载功能,普适的做法是先筛选转换数据,然后对接云存储平台进行保存,最后生成一个可访问的文件地址,整个过程非常耗时。

其实完全可以生产者发送一个下载请求就结束响应,服务端异步的去消费这个任务请求,处理完生成地址后,再进行通知(比如更新对应数据库文件字段)这是一种异步体现,也解耦了生产者与消费者原来的同步交互方式,整体效率会更高。

☆场景二:日志埋点

有些应用它的QPS非常高,产生的数据本身并不是特别重要比如埋点的日志,如果实时调用埋点平台可能给平台侧造成非常大的访问压力。所以这个时候中间的阻塞队列就起到了一定的缓冲作用,等一段时间或队列数据量达到一定量(参赛可动态配置)再一次性拿出来转换后,最后批量传递出去。

☆场景三:Yana(阿里内部一款基于邮件分享技术文章的工具)

《Java并发编程的艺术》作者方腾飞有分享过他们基于生产者消费者模式实现的一个案例。

他们团队早期有一个习惯,大家如果在平时工作当中遇到比较好的文章,会通过邮件转发到专属邮箱进行内部分享,这样其他成员就能看到这篇文章,甚至大家会在底部评论、回复、交流。

但期间遇到一个问题:一旦时间一长,以前的文章很难被检阅。邮件列表的可视化太差,也不能进行归类,有些新入职员工也看不到以往其他成员分享过的文章。

基于这些问题,有几个小伙伴自发的趁业余时间开发了一个简易工具--yana。该工具功能就是:生产者线程会先往邮箱里将所有分享的邮件下载下来(包括附件、图片、邮件回复等内容),下载完成后,通过confluence的Web Service接口,把文章保存到confluence中去。这样不仅好维护,而且留存问题也得到了解决。

不过随着这款工具在其他部门的推广,发现系统响应时间越来越长。只要单位时间内积累邮件一多,一次处理完可能就要花费几分钟。

于是他们升级了方案,把架构演进到了V2.0版本。整体思路是使用了生产者消费者模式来处理。

思路如下:生产者线程去邮件系统下载完邮件后,不会立即调用confluence的web service接口,而是选择把下载的内容放入阻塞队列后立即返回。而消费者启动CPU*2个线程数来并行处理队列中的邮件,从之前的单线程演变成了多线程处理,生产者和消费者实现了异步、解耦。经过观察,比起V1.0同步处理,速度比之前要快好了几倍。 

...

架构设计

☆对象图

该组件支持“多生产者多消费者”场景,在多核时代充分利用CPU多核机制,消费者多线程并行处理阻塞队列中的数据,加快任务处理速度。

☆逻辑架构图

该组件内部持有一个工作线程对象数组,当生产者提交数据的时候,会先经过一个route组件(采用hash取模算法),动态路由到其中一个线程对象内的阻塞队列中存储起来。等到满足一定条件,工作线程就会将自身线程对象内阻塞队列中的数据转换成指定容量的List对象(BlockQueue的drainto方法有支持),然后调用已经注册的回调函数把数据传递出去。

☆流程图

我们一起来看下这张工作线程内部运行流程图:

首选我们说此组件对象持有一个工作线程对象数组,每个工作线程对象内部持有一个有界阻塞队列实例对象(ArrayBlockingQueue),主要有run(),add(),timeout()方法。

1)add方法:

生产者调用组件的add方法后,add方法内部通过hash取模算法动态路由到某个工作线程对象,继而调用对象内部的add方法,将对象添加到指定阻塞队列中去。

生产者线程调用组件的add方法后,add方法内部通过hash取模算法,会动态路由到指定工作线程对象,继而调用其内部的add方法。当工作者线程内部的add被调用后,方法底部都会检查当前队列的实际大小是否超过指定阈值(可配置),如果超过,就会触发LockSupport.unpark方法,唤醒被阻塞的工作线程。

2)timeout方法

ScheduledThreadPoolExecutor在组件初始化新建工作线程的时候,为每一个工作线程对象开启一个定时器,按固定时间间隔周期(可配置),检查工作线程距离上一次任务处理完的时间差是否超过指定阈值(可配置),如果超过就会触发LockSupport.unpark方法,唤醒被阻塞的工作线程。

很明显上述timeout机制,是为容错考虑。大家想过没有,通过add方法将元素添加到阻塞队列中去,假如某一个时间窗口,队列中的元素个数达不到指定阈值,那就尴尬了,工作线程永远无法将队列中的元素消费掉,这个时候从高可用角度就需要双通道保障机制了,而timeout就是为了应对这种场景而设计的容错方案。

源码赏析

public class ProducerAndConsumerComponet {
    private final static Logger log = LoggerFactory.getLogger(ProducerAndConsumerComponet.class);

    //组件持有一个工作线程对象数组
    private final WorkThread<T>[] workThreads;
    private AtomicInteger index;
    private static final Random r = new Random();
    //任务定时器
    private static ScheduledExecutorService scheduleThreadPool = new ScheduledThreadPoolExecutor(1);
    //组件初始化完成工作线程的新建
 
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 1/8/8
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇学习笔记——Http协议 下一篇学习笔记——ServletConfig,Servl..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目