PipedInputStream类中的read()方法源代码如下:
PipedInputStream
public synchronized int read() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread(); // 获取读取线程
int trials = 2;
while (in < 0) {
if (closedByWriter) { // 如果in<0(表示管道中无数据)且closedByWriter为true(表示输入管道已经关闭)则直接返回-1
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
return ret;
}
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
/* possibly wait on the first character */
int c = read();
if (c < 0) {
return -1;
}
b[off] = (byte) c;
int rlen = 1;
while ((in >= 0) && (len > 1)) {
int available;
if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
available = buffer.length - out;
}
// A byte is read beforehand outside the loop
if (available > (len - 1)) {
available = len - 1;
}
System.arraycopy(buffer, out, b, off + rlen, available);
out += available;
rlen += available;
len -= available;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
in = -1;
}
}
return rlen;
}
3、刷新与关闭管道
来看一下管道输出流中的刷新和关闭方法,源代码如下:
PipedOutputStream
// 刷回管道输出流
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
/*
* 调用管道输入流的notifyAll(),通知管道输入流放弃对当前资源的占有,
* 让其它的等待线程(等待读取管道输出流的线程)读取管道输出流的值。
*/
sink.notifyAll();
}
}
}
// 关闭管道输出流
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();// 通知管道输入流,输出管理已经关闭
}
}
看一下receivedLast()方法,如下:
PipedInputStream
synchronized void receivedLast() {
closedByWriter = true; // 输出管道标志为true,表示关闭
notifyAll(); // 唤醒所有的等待线程
}通知所有的等待线程,最后的数据已经全部到达。
PipedInputStream
public void close() throws IOException { // 关闭管道输出流
closedByReader = true;
synchronized (this) {
in = -1; // 清空缓冲区数据
}
}
下面来具体举一个例子,如下:
public class test04 {
public static void main(String [] args) {
Sender sender = new Sender();
Receiver receiver = new Receiver();
PipedOutputStream outStream = sender.getOutStream();
PipedInputStream inStream = receiver.getInStream();
try {
//inStream.connect(outStream); // 与下一句一样
outStream.connect(inStream);
} catch (Exception e) {
e.printStackTrace();
}
sender.start();
receiver.start();
}
}
class Sender extends Thread {
private PipedOutputStream outStream = new PipedOutputStream();
public PipedOutputStream getOutStream() {
return outStream;
}
public void run() {
String info = "hello, receiver";
try {
outStream.write(info.getBytes());
outStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Receiver extends Thread {
private PipedInputStream inStream = new PipedInputStream();
public PipedInputStream getInStream() {
return inStream;
}
public void run() {
byte[] buf = new byte[1024];
try {
int len = inStream.read(buf);
System.out.println("receive message from sender : " + new String(buf, 0, len));
inStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
最后运行后输出的结果如下:receive message from sender : hello, receiver
参考文献:
1、http://www.cnblogs.com/lich/archive/2011/12/11/2283928.html
2、ht