|
一.java中阻塞队列如何设计
关于java中的阻塞队列 队列非常适合于生产者/消费者这种业务场景,像rabbitMq,activeMq都是对queue的一个封装的中间件。
java中提供了阻塞队列的实现。
Queue是接口,定义了存取这些基本接口:
public interface Queue
extends Collection
{
//放入到队列,如果放不进则抛错,成功返回true
boolean add(E e);
//放入queue,返回成功或失败
boolean offer(E e);
//取并移除队首元素,没有则抛异常
E remove();
//取并移除队首元素,没有则返回null
E poll();
//取队首元素,但不移除队首元素,没有则抛异常
E element();
//取队首元素,但不移除队首元素,没有则返回null
E peek();
}
考虑设计一个阻塞队列,需要处理哪些情况?
阻塞队列可以是有界或者无界,一般来讲无界队列比较危险,极端情况会使整台服务器JVM由于内存不断增长从而OutOfMemory。
所以下面只考虑如何设计有界阻塞队列。
主要关注两个点:
1.队列满了之后如何处理?
2.如何提高生产者将消息放入队列,消费者从队列取消息的效率?
对于队列满了的情况,有以下解决方案:
第一种处理方案是阻塞生产者,然后让消费者消费消息。
但是这样子,会使对外服务的线程完全阻塞(假设消息生产者是对外提供用户服务的线程),然后阻塞的用户线程越多,最后服务器也不能再创建线程,然后所有后面用户的请求会阻塞在tcp连接队列中,当tcp连接队列满了后,最终造成整台机子对外停止响应。
这种方案可以优化的地方是当队列满了后,由放入消息到队列的生产者线程执行这个任务。这样可以缓解一定的消费压力。
第二种方案是将后面放不进去的消息序列化到磁盘,当然这种方案会牺牲性能。
第三种,就是抛异常。
第二种就是返回不成功。
java线程池实现ThreadPoolExecutor,已经定义了几种常用的队列满后放不进消息的异常情况。
如AbortPolicy,DiscardPolicy,DiscardOldestPolicy,CallerRunsPolicy。
其中AbortPolicy如果队列满了放不进去消息,就直接拒绝并抛异常给生产者线程。
DiscardPolicy则默默的丢弃新提交的消息。而DiscardOldestPolicy则将队列中最旧的消息从队列中取走。
CallerRunsPolicy则是上述提到的第一种方案的优化版,即将不能放入满队列的消息任务处理交由生产者call线程自己去执行。
设置java线程池对于队列满放不进消息的处理策略代码如下:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue
(1));
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
基本是这几种方案,实际中最好是提高消费者取消息,处理消息能力,尽量避免队列满了的情况。
第二个问题,事实是就是提高生产者,消息者的存取消费。提高的途径就是减少竞争。
当然java提供了SyncronziedQueue,它应用场景适合 一个生产者多个消费者的情况。
SyncronziedQueue中生产者每向queue放入一条消息必须要有消费者从queue中将消息取出,生产者才能够继续放另一条消息。
相当于生产者直接将消息交给消费者,内部没有queue实体队列。
Executors的newCachedThreadPool用的就是SyncronziedQueue,当生产者放入消息的速度大于消费者处理消息速度时,如果此时线程数小于等于线程池设置的最大线程数,线程池尝试创建新的消费线程来处理消息。
测试代码如下:
public class SynchronousQueueTest {
public static void main(String[] args) throws Exception {
SynchronousQueue
|