双队列的一种实现(二)

2014-11-24 02:08:38 · 作者: · 浏览: 8
bVF1ZXVlysfSu7j2Q29uY3VycmVudExpbmtlZFF1ZXVlo6y087zSv8nS1Lr2wtSjrHRvUXVldWXKx8urttPB0KOsv8nS1Nei0uLSu8/C08O3qKGjzNix8MrHzfnA78Pm0LS1xMqxuvKjrNDo0qp3aGlsZdGtu7fW2MrU1rG1vdC0yOuzybmmoaMKPHByZSBjbGFzcz0="brush:java;">@Override public void run() { long start = System.currentTimeMillis(); log.debug(Thread.currentThread() + " Unpacker started at " + start); Random r = new Random(start); Bundle bundle = null; boolean shoudShutdown = false; try { while(!shoudShutdown) { bundle = (Bundle) fromQueue.poll(); if (bundle == null) { if (seeEOF.get()) { // 当取到空,并且其他线程已经取到EOF,那么本线程将Latch减1,并退出循环 latch.countDown(); shoudShutdown = true; } else { // 如果EOF还没被取到,本线程小睡一会后继续取 try { sleep(r.nextInt(10)); } catch (InterruptedException e) { log.error("Interrupted when taking a nap", e); } } } else if (!bundle.isEof()) { // bundle非空且非EOF,则往双队列写入一个Bundle byte[] lineBytes = BundleUtil.getDecompressedData(bundle); // 放入双队列时,若offer失败则重试 while (!toQueue.offer(new UnCompressedBundle(bundle.getId(), ByteUtil.bytes2Lines(lineBytes, lineDelim), bundle.getIndex(), bundle.getJobId()))) { log.info("Unpacker put failed, will retry"); } log.info("After enqueue, queue size is " + toQueue.size()); } else { // Unpacker获得到了EOF seeEOF.set(true); // 自己将Lacth减1,并等待其他线程退出 latch.countDown(); try { latch.await(); } catch (InterruptedException e) { log.error("Interrupted when waiting the latch "); } // 其他线程已经退出,本线程放入EOF while (!toQueue.offer(new UnCompressedBundle(-1L, new Line[0], -1L, -1L))) { log.info("Unpacker put EOF failed, will retry"); } // 关闭Queue toQueue.close(); // 退出循环 shoudShutdown = true; } } log.debug(Thread.currentThread() + " Unpacker finished in " + (System.currentTimeMillis()-start) + " ms"); } catch (Exception e) { log.error("Exception when unpacker is running ", e); // 将latch减1,表示自己异常退出,且不再工作 // latch.countDown(); log.debug(Thread.currentThread() + " Unpacker occured exception and stopped. "); } finally { } }

多对一

多个生产者的情况下,写入队列无可避免发送锁争抢,但是能保证消费者的稳定读出过程。没有什么特殊处理的地方,这里就不 嗦了。

总结分析

本文介绍了一种经典双队列的设计和实现,也给出了一些代码演示。文章末尾我会贴出整个双队列的代码实现,需要的同学也可以留言,我把.java发给你。如果使用的时候有发现问题,不吝赐教,这个双队列的实现也还不是很完美。使用的时候也存在需要注意的地方。 其实双队列的目的还是在于让写和读互相没有影响,而且更加照顾了写的速度。因为一般写的速度可能会比较快,而读的人读出之后还会做一些额外的处理,所以写的这一方借助双队列,可以持续写的过程,而且如果读的一方慢的话,可以多起几个消费者线程,就像"一对多"场景里阐述的那样来使用双队列。
下面是整个实现。各位可以仔细看看,发现问题一定记得通知我 :)
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


import lombok.ToString;
import lombok.extern.log4j.Log4j;


/**
 * Represents a region with two swap spaces, one for storing data which from
 * data source, the other one for storing data which will be transferred to data
 * destination.
 * 
* A classical DoubleCachedQueue, In beginning, space A and space B both * empty, then loading task begin to load data to space A, when A is almost * full, let the data from data source being loaded to space B, then dumping * task begin to dump data from space A to data source. When space A is empty, * switch the two spaces for load and dump task. Repeat the above operation. * */ @Log4j @ToString public class DoubleCachedQueue extends AbstractQueue implements BlockingQueue , java.io.Serializable { private static final long serialVersi