聲明:本系列只供本人自學使用宦言,勿噴扇单。
參考:https://www.cnblogs.com/skywang12345/p/io_04.html
管道流一般用于線程間的通訊。
大致流程:在線程A中向PipedOutputStream中寫入數據奠旺,這些數據會自動的發(fā)送到與PipedOutputStream對應的PipedInputStream中蜘澜,進而存儲在PipedInputStream的緩沖中施流;此時,線程B通過讀取PipedInputStream中的數據鄙信,就可以實現瞪醋,線程A和線程B的通信。
一扮碧、PipedInputStream
public class PipedInputStream extends InputStream{
// 循環(huán)緩存數組趟章,默認1024
protected byte buffer[];
// PipedOutputStream往PipedInputStream的buffer中寫入數據的下一個位置
protected int in = -1;
// PipedInputStream從buffer中讀數據的下一個位置
protected int out = 0;
}
- 構造器
// 使得 PipedOutputStream與該PipedInputStream建立連接
public PipedInputStream(PipedOutputStream src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);
}
public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
initPipe(pipeSize);
connect(src);
}
// 構造未建立連接的PipedInputStream
public PipedInputStream()
public PipedInputStream(int pipeSize)
-核心方法
// 建立連接
public void connect(PipedOutputStream src) throws IOException {
src.connect(this);
}
// 是PipedOutputStream的write(int b)方法中調用的,使得PipedInputStream能receive
protected synchronized void receive(int b) throws IOException{
1.檢查連接狀態(tài)慎王,checkStateForReceive();
2.假如寫入的所有數據已全部讀完(即in==out)蚓土,awaitSpace();
將新寫入的數據添加到buffer中
}
// 若 “寫入管道” 已將 “讀取管道” 的緩存buffer寫滿,則需要執(zhí)行awaitSpace()操作
// 喚醒“讀取管道”的線程進行讀壤涤佟(讀完即可清空buffer繼續(xù)寫入)
private void awaitSpace() throws IOException {
while (in == out) {
checkStateForReceive();
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
}
// 接收byte[]數組
// 在PipedOutputStream的write(byte b[], int off, int len)方法中調用蜀漆,使得PipedInputStream能receive
synchronized void receive(byte b[], int off, int len) throws IOException{
checkStateForReceive();
writeSide = Thread.currentThread();//獲取寫入線程
int bytesToTransfer = len;//PipedOutputStream寫入的字節(jié)總數
while (bytesToTransfer > 0) {
// 第一次執(zhí)行時,in=-1,out=0咱旱,證明buffer為空
if (in == out)
awaitSpace();
// 計算可拷貝到buffer的字節(jié)總數 nextTransferAmount
int nextTransferAmount = 0;
if (out < in) {
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -1) {
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
nextTransferAmount = out - in;
}
}
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
// 拷貝到buffer
System.arraycopy(b, off, buffer, in, nextTransferAmount);
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
// 已經寫入1024個字節(jié)后(即buffer已滿)确丢,需要將循環(huán)數組in歸0
if (in >= buffer.length) {
in = 0;
}
}
}
// 返回下一個字節(jié)
public synchronized int read() throws IOException
// 將buffer的數據讀取到 byte b[],當都讀完后吐限,清空buffer(即 int=-1,out=0)
public synchronized int read(byte b[], int off, int len) throws IOException
二鲜侥、PipedOutputStream
public class PipedOutputStream extends OutputStream{
private PipedInputStream sink;
}
- 構造器
// 與PipedInputStream建立連接
public PipedOutputStream(PipedInputStream snk) throws IOException {
connect(snk);
}
// 構造未建立連接的PipedOutputStream
public PipedOutputStream() {
}
-核心方法
// 建立連接
public synchronized void connect(PipedInputStream snk) throws IOException
public void write(int b) throws IOException{
sink.receive(b);
}
public void write(byte b[], int off, int len) throws IOException{
sink.receive(b, off, len);
}
// 清空PipedOutputStream
// 目的是讓“管道輸入流”放棄對當前資源的占有,讓其它的等待線程(等待讀取管道輸出流的線程)讀取“管道輸出流”的值诸典。
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
// 關閉PipedOutputStream并釋放資源
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
三描函、demo
public class Test3 {
public static void main(String[] args) throws IOException {
Sender sender = new Sender();
Receiver receiver = new Receiver();
receiver.getPipedInputStream().connect(sender.getPipedOutputStream());
sender.start();
receiver.start();
}
}
class Sender extends Thread {
private PipedOutputStream pipedOutputStream = new PipedOutputStream();
public PipedOutputStream getPipedOutputStream() {
return pipedOutputStream;
}
@Override
public void run() {
writeMessage();
}
private void writeMessage() {
try {
pipedOutputStream.write("哈嘍,China".getBytes());
pipedOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class Receiver extends Thread {
private PipedInputStream pipedInputStream = new PipedInputStream();
public PipedInputStream getPipedInputStream() {
return pipedInputStream;
}
@Override
public void run() {
readMessage();
}
void readMessage() {
try {
byte[] bytes = new byte[1024];
int len = 0;
while ((len = pipedInputStream.read(bytes)) != -1) {
System.out.println(new String(bytes, 0, len));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}