3、源码分析
package com.chy.io.original.code;
import java.io.IOException;
public class PipedReader extends Reader {
boolean closedByWriter = false;
boolean closedByReader = false;
boolean connected = false;
Thread readSide;
Thread writeSide;
/**
* 用于循环存放PipedWriter写入的字符数组的默认大小
*/
private static final int DEFAULT_PIPE_SIZE = 1024;
/**
* 用于循环存放PipedWriter写入的字符数组
*/
char buffer[];
/**
* buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。
* in为-1时、说明buf中没有可读取字符、in=out时已经存满了。
*/
int in = -1;
/**
* buf中下一个被读取的字符的下标
*/
int out = 0;
/**
* 使用默认的buf的大小和传入的pw构造pr
*/
public PipedReader(PipedWriter src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);
}
/**
* 使用指定的buf的大小和传入的pw构造pr
*/
public PipedReader(PipedWriter src, int pipeSize) throws IOException {
initPipe(pipeSize);
connect(src);
}
/**
* 使用默认大小构造pr
*/
public PipedReader() {
initPipe(DEFAULT_PIPE_SIZE);
}
/**
* 使用指定大小构造pr
*/
public PipedReader(int pipeSize) {
initPipe(pipeSize);
}
//初始化buf大小
private void initPipe(int pipeSize) {
if (pipeSize <= 0) {
throw new IllegalArgumentException("Pipe size <= 0");
}
buffer = new char[pipeSize];
}
/**
* 调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
*/
public void connect(PipedWriter src) throws IOException {
src.connect(this);
}
/**
* pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
*/
synchronized void receive(int c) throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
throw new IOException("Read end dead");
}
writeSide = Thread.currentThread();
while (in == out) {
if ((readSide != null) && !readSide.isAlive()) {
throw new IOException("Pipe broken");
}
//buf中写入的被读取完、唤醒所有此对象监控的线程其他方法、如果一秒钟之后还是满值、则再次唤醒其他方法、直到buf中被读取。
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
//buf中存放第一个字符时、将字符在buf中存放位置的下标in初始化为0、读取的下标也初始化为0、准备接受写入的第一个字符。
if (in < 0) {
in = 0;
out = 0;
}
buffer[in++] = (char) c;
//如果buf中放满了、则再从头开始存放。
if (in >= buffer.length) {
in = 0;
}
}
/**
* 将c中一部分字符写入到buf中。
*/
synchronized void receive(char c[], int off, int len) throws IOException {
while (--len >= 0) {
receive(c[off++]);
}
}
/**
* 提醒所有等待的线程、已经接收到了最后一个字符、PipedWriter已关闭。用于PipedWriter的close()方法.
*/
synchronized void receivedLast() {
closedByWriter = true;
notifyAll();
}
/**
* 从buf中读取一个字符、以整数形式返回
*/
public synchronized int read() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++];
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
return ret;
}
/**
* 将buf中读取一部分字符到cbuf中。
*/
public synchronized int read(char cbuf[], int off, int len) throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
if ((off < 0) || (off > cbuf.length) || (len < 0) ||
((off + len) > cbuf.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
/* possibly wait on the first character */
int c = read();
if (c < 0) {
return -1;
}
cbuf[off] = (char)c;
int rlen = 1;
while ((in >= 0) && (--len > 0)) {
cbuf[off + rlen] = buffer[out++];
rlen++;
//如果读取的下一个字符下标大于buffer的size、则重置out、从新开始从第一个开始读取。
if (out >= buffer.length) {
out = 0;
}
//如果下一个写入字符的下标与下一个被读取的下标相同、则清空buf
if (in == out) {
/* now empty */
in = -1;
}
}
return rlen;
}
/**
* 查看此流是否可读、看各个线程是否关闭、以及buffer中是否有可供读取的字符。
*/
public synchronized boolean ready() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
if (in < 0) {
return false;
} else {
return true;
}
}
/**
* 清空buf中数据、关闭此流。
*/
public void close() throws IOException {
in = -1;
closedByReader = true;
}
}
4、实例演示:
用于发送字符的线程:CharSenderThread
package com.chy.io.original.thread; import java.io.IOException; import java.io.PipedWriter; @SuppressWarnings(