Java_io体系之PipedWriter、PipedReader简介、走进源码及示例――14(二)

2014-11-24 08:05:03 · 作者: · 浏览: 1
pw绑定 synchronized boolean ready() 查看此流是否可读 synchronized int read() 从buf中读取一个字符、以整数形式返回 synchronized int read(char cbuf[], int off, int len) 将buf中读取一部分字符到cbuf中。 synchronized void receive(int c) pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。 synchronized void receive(char c[], int off, int len) 将c中一部分字符写入到buf中。 synchronized void receivedLast() 提醒所有等待的线程、已经接收到了最后一个字符。

3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedReader extends Reader {
    boolean closedByWriter = false;
    boolean closedByReader = false;
    boolean connected = false;

    Thread readSide;
    Thread writeSide;

   /** 
    * 用于循环存放PipedWriter写入的字符数组的默认大小
    */ 
    private static final int DEFAULT_PIPE_SIZE = 1024;

    /**
     * 用于循环存放PipedWriter写入的字符数组
     */
    char buffer[];

    /**
     * buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。
     * in为-1时、说明buf中没有可读取字符、in=out时已经存满了。
     */
    int in = -1;

    /**
     * buf中下一个被读取的字符的下标
     */
    int out = 0;

    /**
     * 使用默认的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src) throws IOException {
    	this(src, DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src, int pipeSize) throws IOException {
		initPipe(pipeSize);
		connect(src);
    }


    /**
     * 使用默认大小构造pr
     */
    public PipedReader() {
    	initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定大小构造pr
     */
    public PipedReader(int pipeSize) {
    	initPipe(pipeSize);
    }

    //初始化buf大小
    private void initPipe(int pipeSize) {
		if (pipeSize <= 0) {
		    throw new IllegalArgumentException("Pipe size <= 0");
		}
		buffer = new char[pipeSize];
    }

    /**
     * 调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
     */
    public void connect(PipedWriter src) throws IOException {
    	src.connect(this);
    }
    
    /**
     * pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
     */
    synchronized void receive(int c) throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
        	throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }

		writeSide = Thread.currentThread();
		while (in == out) {
		    if ((readSide != null) && !readSide.isAlive()) {
		    	throw new IOException("Pipe broken");
		    }
		    //buf中写入的被读取完、唤醒所有此对象监控的线程其他方法、如果一秒钟之后还是满值、则再次唤醒其他方法、直到buf中被读取。
		    notifyAll();	
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
		    	throw new java.io.InterruptedIOException();
		    }
		}
		//buf中存放第一个字符时、将字符在buf中存放位置的下标in初始化为0、读取的下标也初始化为0、准备接受写入的第一个字符。
		if (in < 0) {
		    in = 0;
		    out = 0;
		}
		buffer[in++] = (char) c;
		//如果buf中放满了、则再从头开始存放。
		if (in >= buffer.length) {
		    in = 0;
		}
    }

    /**
     * 将c中一部分字符写入到buf中。
     */
    synchronized void receive(char c[], int off, int len)  throws IOException {
		while (--len >= 0) {
		    receive(c[off++]);
		}
    }

    /**
     * 提醒所有等待的线程、已经接收到了最后一个字符、PipedWriter已关闭。用于PipedWriter的close()方法.
     */
    synchronized void receivedLast() {
		closedByWriter = true;
		notifyAll();
    }

    /**
     * 从buf中读取一个字符、以整数形式返回
     */
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
		int trials = 2;
		while (in < 0) {
		    if (closedByWriter) { 
			/* closed by writer, return EOF */
			return -1;
		    }
		    if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
			throw new IOException("Pipe broken");
		    }
	            /* might be a writer waiting */
		    notifyAll();
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
			throw new java.io.InterruptedIOException();
		    }
	 	}
		int ret = buffer[out++];
		if (out >= buffer.length) {
		    out = 0;
		}
		if (in == out) {
	            /* now empty */
		    in = -1;		
		}
		return ret;
    }

    /**
     * 将buf中读取一部分字符到cbuf中。
     */
    public synchronized int read(char cbuf[], int off, int len)  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        if ((off < 0) || (off > cbuf.length) || (len < 0) ||
            ((off + len) > cbuf.length) || ((off + len) < 0)) {
		    throw new IndexOutOfBoundsException();
		} else if (len == 0) {
		    return 0;
		}

        /* possibly wait on the first character */
		int c = read();		
		if (c < 0) {
		    return -1;
		}
		cbuf[off] =  (char)c;
		int rlen = 1;
		while ((in >= 0) && (--len > 0)) {
		    cbuf[off + rlen] = buffer[out++];
		    rlen++;
		    //如果读取的下一个字符下标大于buffer的size、则重置out、从新开始从第一个开始读取。
		    if (out >= buffer.length) {
		    	out = 0;
		    }
		    //如果下一个写入字符的下标与下一个被读取的下标相同、则清空buf
		    if (in == out) {
	                /* now empty */
		    	in = -1;	
		    }
		}
		return rlen;
    }

    /**
     * 查看此流是否可读、看各个线程是否关闭、以及buffer中是否有可供读取的字符。
     */
    public synchronized boolean ready() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
	    throw new IOException("Pipe closed");
	} else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }
        if (in < 0) {
            return false;
        } else {
            return true;
        }
    }
 
    /**
     * 清空buf中数据、关闭此流。
     */
    public void close()  throws IOException {
		in = -1;
		closedByReader = true;
    }
}


4、实例演示:


用于发送字符的线程:CharSenderThread
package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedWriter;

@SuppressWarnings(