Java
Java IO

Java IO 19 - PipedReader和PipedWriter详解

简介:流是一组有顺序的,有起点和终点的字节集合。是对设备文件间数据传输的总称和抽象。

1. PipedReader和PipedWriter简介

PipedReader和PipedWriter和PipedInputStream和PipedOutputStream一样,都可以用于管道通信。PipedWriter是字符管道输出流,它继承于Writer。PipedReader是字符管道输入流,它继承于Writer。PipedWriter和PipedReader的作用是可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedWriter和PipedReader配套使用。

2. PipedWriter源码分析

下面是PipedWriter的源码,基于JDK 1.7.0_07:

  • package java.io;
  • public class PipedWriter extends Writer {
  • // 与PipedWriter通信的PipedReader对象
  • private PipedReader sink;
  • // PipedWriter的关闭标记
  • private boolean closed = false;
  • // 构造函数,指定配对的PipedReader
  • public PipedWriter(PipedReader snk) throws IOException {
  • connect(snk);
  • }
  • // 构造函数
  • public PipedWriter() {
  • }
  • // 将PipedWriter和PipedReader连接
  • public synchronized void connect(PipedReader snk) throws IOException {
  • if (snk == null) {
  • throw new NullPointerException();
  • } else if (sink != null || snk.connected) {
  • throw new IOException("Already connected");
  • } else if (snk.closedByReader || closed) {
  • throw new IOException("Pipe closed");
  • }
  • sink = snk;
  • snk.in = -1;
  • snk.out = 0;
  • // 设置PipedReader和PipedWriter为已连接状态
  • // connected是PipedReader中定义的,用于表示PipedReader和PipedWriter是否已经连接
  • snk.connected = true;
  • }
  • // 将一个字符c写入PipedWriter中
  • // 将c写入PipedWriter之后,它会将c传输给PipedReader
  • public void write(int c) throws IOException {
  • if (sink == null) {
  • throw new IOException("Pipe not connected");
  • }
  • sink.receive(c);
  • }
  • // 将字符数组b写入PipedWriter中
  • // 将数组b写入PipedWriter之后,它会将其传输给PipedReader
  • public void write(char cbuf[], int off, int len) throws IOException {
  • if (sink == null) {
  • throw new IOException("Pipe not connected");
  • } else if ((off | len | (off + len) | (cbuf.length - (off + len))) < 0) {
  • throw new IndexOutOfBoundsException();
  • }
  • sink.receive(cbuf, off, len);
  • }
  • // 清空PipedWriter
  • // 这里会调用PipedReader的notifyAll();
  • // 目的是让PipedReader放弃对当前资源的占有,让其它的等待线程(等待读取PipedWriter的线程)读取PipedWriter的值
  • public synchronized void flush() throws IOException {
  • if (sink != null) {
  • if (sink.closedByReader || closed) {
  • throw new IOException("Pipe closed");
  • }
  • synchronized (sink) {
  • sink.notifyAll();
  • }
  • }
  • }
  • // 关闭PipedWriter
  • // 关闭之后,会调用receivedLast()通知PipedReader它已经关闭
  • public void close() throws IOException {
  • closed = true;
  • if (sink != null) {
  • sink.receivedLast();
  • }
  • }
  • }

3. PipedReader源码分析

下面是PipedReader的源码,基于JDK 1.7.0_07:

  • package java.io;
  • public class PipedReader extends Reader {
  • // PipedWriter是否关闭的标记
  • boolean closedByWriter = false;
  • // PipedReader是否关闭的标记
  • boolean closedByReader = false;
  • // PipedReader与PipedWriter是否连接的标记
  • // 它在PipedWriter的connect()连接函数中被设置为true
  • boolean connected = false;
  • Thread readSide; // 读取管道数据的线程
  • Thread writeSide; // 向管道写入数据的线程
  • // 管道的默认大小
  • private static final int DEFAULT_PIPE_SIZE = 1024;
  • // 缓冲区
  • char buffer[];
  • // 下一个写入字符的位置。in == out代表满,说明写入的数据全部被读取了
  • int in = -1;
  • // 下一个读取字符的位置。in == out代表满,说明写入的数据全部被读取了
  • int out = 0;
  • // 构造函数:指定与PipedReader关联的PipedWriter
  • public PipedReader(PipedWriter src) throws IOException {
  • this(src, DEFAULT_PIPE_SIZE);
  • }
  • // 构造函数:指定与PipedReader关联的PipedWriter,以及缓冲区大小
  • public PipedReader(PipedWriter src, int pipeSize) throws IOException {
  • initPipe(pipeSize);
  • connect(src);
  • }
  • // 构造函数:默认缓冲区大小是1024字符
  • public PipedReader() {
  • initPipe(DEFAULT_PIPE_SIZE);
  • }
  • // 构造函数:指定缓冲区大小是pipeSize
  • public PipedReader(int pipeSize) {
  • initPipe(pipeSize);
  • }
  • // 初始化管道:新建缓冲区大小
  • private void initPipe(int pipeSize) {
  • if (pipeSize <= 0) {
  • throw new IllegalArgumentException("Pipe size <= 0");
  • }
  • buffer = new char[pipeSize];
  • }
  • // 将PipedReader和PipedWriter绑定
  • // 实际上,这里调用的是PipedWriter的connect()函数
  • public void connect(PipedWriter src) throws IOException {
  • src.connect(this);
  • }
  • // 接收int类型的数据b
  • // 它只会在PipedWriter的write(int b)中会被调用
  • synchronized void receive(int c) throws IOException {
  • // 检查管道状态
  • if (!connected) {
  • throw new IOException("Pipe not connected");
  • } else if (closedByWriter || closedByReader) {
  • throw new IOException("Pipe closed");
  • } else if (readSide != null && !readSide.isAlive()) {
  • throw new IOException("Read end dead");
  • }
  • // 获取写入管道的线程
  • writeSide = Thread.currentThread();
  • // 如果管道中被读取的数据,等于写入管道的数据时,
  • // 则每隔1000ms检查管道状态,并唤醒管道操作:若有读取管道数据线程被阻塞,则唤醒该线程
  • while (in == out) {
  • if ((readSide != null) && !readSide.isAlive()) {
  • throw new IOException("Pipe broken");
  • }
  • /* full: kick any waiting readers */
  • notifyAll();
  • try {
  • wait(1000);
  • } catch (InterruptedException ex) {
  • throw new java.io.InterruptedIOException();
  • }
  • }
  • if (in < 0) {
  • in = 0;
  • out = 0;
  • }
  • buffer[in++] = (char) c;
  • if (in >= buffer.length) {
  • in = 0;
  • }
  • }
  • // 接收字符数组b
  • synchronized void receive(char c[], int off, int len) throws IOException {
  • while (--len >= 0) {
  • receive(c[off++]);
  • }
  • }
  • // 当PipedWriter被关闭时,被调用
  • synchronized void receivedLast() {
  • closedByWriter = true;
  • notifyAll();
  • }
  • // 从管道(的缓冲)中读取一个字符,并将其转换成int类型
  • public synchronized int read() throws IOException {
  • if (!connected) {
  • throw new IOException("Pipe not connected");
  • } else if (closedByReader) {
  • throw new IOException("Pipe closed");
  • } else if (writeSide != null && !writeSide.isAlive()
  • && !closedByWriter && (in < 0)) {
  • throw new IOException("Write end dead");
  • }
  • readSide = Thread.currentThread();
  • int trials = 2;
  • while (in < 0) {
  • if (closedByWriter) {
  • /* closed by writer, return EOF */
  • return -1;
  • }
  • if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  • throw new IOException("Pipe broken");
  • }
  • /* might be a writer waiting */
  • notifyAll();
  • try {
  • wait(1000);
  • } catch (InterruptedException ex) {
  • throw new java.io.InterruptedIOException();
  • }
  • }
  • int ret = buffer[out++];
  • if (out >= buffer.length) {
  • out = 0;
  • }
  • if (in == out) {
  • /* now empty */
  • in = -1;
  • }
  • return ret;
  • }
  • // 从管道(的缓冲)中读取数据,并将其存入到数组b中
  • public synchronized int read(char cbuf[], int off, int len) throws IOException {
  • if (!connected) {
  • throw new IOException("Pipe not connected");
  • } else if (closedByReader) {
  • throw new IOException("Pipe closed");
  • } else if (writeSide != null && !writeSide.isAlive()
  • && !closedByWriter && (in < 0)) {
  • throw new IOException("Write end dead");
  • }
  • if ((off < 0) || (off > cbuf.length) || (len < 0) ||
  • ((off + len) > cbuf.length) || ((off + len) < 0)) {
  • throw new IndexOutOfBoundsException();
  • } else if (len == 0) {
  • return 0;
  • }
  • /* possibly wait on the first character */
  • int c = read();
  • if (c < 0) {
  • return -1;
  • }
  • cbuf[off] = (char)c;
  • int rlen = 1;
  • while ((in >= 0) && (--len > 0)) {
  • cbuf[off + rlen] = buffer[out++];
  • rlen++;
  • if (out >= buffer.length) {
  • out = 0;
  • }
  • if (in == out) {
  • /* now empty */
  • in = -1;
  • }
  • }
  • return rlen;
  • }
  • // 是否能从管道中读取下一个数据
  • public synchronized boolean ready() throws IOException {
  • if (!connected) {
  • throw new IOException("Pipe not connected");
  • } else if (closedByReader) {
  • throw new IOException("Pipe closed");
  • } else if (writeSide != null && !writeSide.isAlive()
  • && !closedByWriter && (in < 0)) {
  • throw new IOException("Write end dead");
  • }
  • if (in < 0) {
  • return false;
  • } else {
  • return true;
  • }
  • }
  • // 关闭PipedReader
  • public void close() throws IOException {
  • in = -1;
  • closedByReader = true;
  • }
  • }

4. 管道通信示例

PipedWriter和PipedReader与之前介绍过的PipedOutputStream和PipedInputStream非常相似,这里将沿用之前的例子,做一定的修改,来满足使用PipedWriter和PipedReader通信测试示例:

  • package com.coderap;
  • import java.io.*;
  • class Sender implements Runnable {
  • // 管道流
  • private PipedWriter pipedWriter;
  • public Sender(PipedWriter pipedWriter) {
  • this.pipedWriter = pipedWriter;
  • }
  • @Override
  • public void run() {
  • // 1536 = 1024 + 512
  • char[] chars = new char[1536];
  • for (int j = 0; j < chars.length; j++) {
  • if (j == 1023) {
  • // 第1024个字节写入*
  • chars[j] = '*';
  • } else if (j == 1024) {
  • // 第1025个字节写入#
  • chars[j] = '#';
  • } else {
  • // 其他字节循环写入a-z
  • char c = (char) ('a' + j % 26);
  • chars[j] = c;
  • }
  • }
  • try {
  • // 向管道输出流中写数据
  • System.out.println("\n" + "Time: " + System.currentTimeMillis() + ", Sender sent message length: " + chars.length + ", Message body: " + new String(chars));
  • pipedWriter.write(chars);
  • } catch (Exception e) {
  • e.printStackTrace();
  • } finally {
  • try {
  • pipedWriter.close();
  • } catch (IOException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • class Receiver implements Runnable {
  • // 管道流
  • private PipedReader pipedReader;
  • public Receiver(PipedReader pipedReader) {
  • this.pipedReader = pipedReader;
  • }
  • @Override
  • public void run() {
  • try {
  • // 从管道输入流中读数据
  • char[] chars = new char[2048];
  • int read;
  • while ((read = pipedReader.read(chars)) != -1) {
  • System.out.println("\n" + "Time: " + System.currentTimeMillis() + ", Receiver received message length: " + read + ", Message body: " + new String(chars,0, read));
  • }
  • } catch (Exception e) {
  • e.printStackTrace();
  • } finally {
  • try {
  • pipedReader.close();
  • } catch (IOException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • public class PipedTest {
  • public static void main(String[] args) {
  • // 创建并连接两个管道流
  • PipedReader pipedReader = new PipedReader();
  • PipedWriter pipedWriter = new PipedWriter();
  • try {
  • pipedReader.connect(pipedWriter);
  • } catch (IOException e) {
  • e.printStackTrace();
  • }
  • // 启动两个线程
  • new Thread(new Receiver(pipedReader)).start();
  • new Thread(new Sender(pipedWriter)).start();
  • }
  • }

根据源码,有以下需要注意的点:

  1. in.connect(out)可以将管道输入流和管道输出流关联起来。从PipedWriter和PipedReader中connect()的源码可以知道out.connect(in)等价于in.connect(out)
  2. 实际上PipedWriter的write(int c)write(char cbuf[], int off, int len)方法会调用PipedReader的receive(int c)receive(char c[], int off, int len),这两个方法的作用就是将管道输出流中的数据保存到管道输入流的缓冲区中,而管道输入流的缓冲区buffer的默认大小是1024个字节。
  3. 调用PipedReader的read()read(char cbuf[], int off, int len)可以从管道输入流PipedReader中读取数据,或保存到cbuf中,由于管道输入流的缓冲区cbuf的默认大小是1024个字节,因此默认情况下一次最多只能读取1024个字符。在每次调用read()方法前,都会判断缓存区是否有数据(依据in变量判断),如果没有的话就先让出当前的锁(即让输出管道的写的线程先运行)。PipedWriter要写入(调用PipedReader的receive方法)1536个字节的数据时,缓存区大小不够存放,因此当一次性往cbuf中写入1024个字节后,会先调用notifyAll(),再调用wait(1000),目的就是把刚才写入的内容被读出,然后再把剩下的512个字符再覆盖写入cbuf,因此上面的Receiver在读取数据时分了两次进行读取。