場景
假設我們需要上傳一組動態(tài)增加的數據, 輸入端可以看作inputSteam, 輸入端是outputSteam, 但是輸入和輸出端不能直接對接, 那么我們要怎樣實現(xiàn)呢?
我希望的解決方案時, 輸入和輸出通過一個"數據池"間接連接, 輸入端把數據寫到數據池中, 輸出端從數據池中讀數據, 這里要求數據池有"阻塞"功能, 即數據池滿了阻塞輸入端, 數據池空了, 阻塞輸出端.
以上效果可以使用PipedInputStream
和PipedOutputStream
實現(xiàn).
前言
- 這兩個類需要配套使用, 可以實現(xiàn)管道(pipe)傳輸數據
- 默認的使用方式是, 通過管道連接線程A和B, 在A線程使用
PipedOutputStream
寫數據, 數據緩存到"管道"后, B線程使用PipedInputStream
讀取數據, 以此完成數據傳輸, 如果在同一個線程使用這兩個類, 可能導致死鎖
PipedOutputStream
PipedOutputStream
是管道的發(fā)送端. 寫線程通過它來往"管道"填充數據.
我們先看看它有哪幾個方法, 從命名和注釋基本就能知道每個方法的作用
// 關聯(lián)PipedInputStream
public void connect(PipedInputStream snk)
// 寫一個數據
public void write(int b)
// 寫一段數據
public void write(byte b[], int off, int len)
// 通知讀線程, 管道中有數據等待讀取
public void flush()
// 關閉發(fā)送端, 不再發(fā)送數據
public void close()
以上注釋已經大致說明了這個類的功能了, 接著我們逐個方法分析
connect
public synchronized void connect(PipedInputStream snk) throws IOException {
// 先確保
// 1. 連接對象(輸入的snk)不能為空
// 2. 不能重復連接
sink = snk;
snk.in = -1;
snk.out = 0;
snk.connected = true;
}
從上可以看出, connect
方法就是修改連接的PipedInputStream
的成員變量, 使其處于已連接狀態(tài).
write
public void write(int b) throws IOException {
// 確保sink不為空, 即確保已經連接
sink.receive(b);
}
public void write(byte b[], int off, int len) throws IOException {
// 先確保
// 1. 已經連接
// 2. 輸出數組b不為空
// 3. off和len不會導致數組越界
if (sink == null) {
// ...
} else if (len == 0) {
// 如果len == 0, 表示不讀取數據, 所以可以直接返回
return;
}
sink.receive(b, off, len);
}
從上可以看出, 兩個write
方法, 最后都調用了響應的PipedInputStream#receive
方法, 這表明
數據存儲的地方和寫數據的具體邏輯都在
PipedInputStream
中
后面我們再詳細分析.
flush
public synchronized void flush() throws IOException {
if (sink != null) {
synchronized (sink) {
sink.notifyAll();
}
}
}
這個方法先嘗試獲取sink的鎖, 然后通過notifyAll()
來調度線程, 在這里, 具體就是使讀線程開始讀取數據, 這里涉及讀寫線程間的溝通調度問題, 在了解完PipedInputStream
之后我們再重新看這個問題.
close
public void close() throws IOException {
if (sink != null) {
sink.receivedLast();
}
}
這個方法就是簡單的調用了PipedInputStream#receivedLast()
方法, 從方法名可以判斷出, 這個方法就是通知PipedInputStream
, 數據已經填充完畢.
總結
從上面的分析可以看出, PipedOutputStream
基本就是一個"接口"類, 不會對數據進行實際的操作, 也不承擔具體的職責, 只負責把數據交給PipedInputStream
處理.
下面我們接著分析最關鍵的PipedInputStream
的源碼
PipedInputStream
成員變量
我們先看下關鍵的幾個變量
// 緩存數組, "管道"數據的存儲的地方
protected byte buffer[];
// 寫下一個數據時, 保存到緩存數組的位置
// 小于0表示無可讀數據, 緩存數組為空
// in == out時表示緩存數組已滿
protected int in = -1;
// 下一個被讀數據在緩存數組的位置
protected int out = 0;
看上面3個成員變量我們基本可以知道
"管道"內部使用了數組來緩存寫入的數據, 等待讀取. 通過
in
和out
兩個值來記錄數組的寫位置和讀位置
其余變量都是一些狀態(tài)標識
// 寫數據端(輸入端)是否已經關閉
boolean closedByWriter = false;
// 讀數據端(輸出端)是否已經關閉
volatile boolean closedByReader = false;
// 是否處于已連接狀態(tài)
boolean connected = false;
// 記錄讀線程
Thread readSide;
// 記錄寫線程
Thread writeSide;
這些變量都是用于判斷當前"管道"的狀態(tài)
其中readSide
和writeSide
是一種簡單的標記讀寫線程的方式, 源碼注釋中也有說明這種方式并不可靠, 這種方式針對的應該是兩條線程的情況, 所以我們使用的時候應該盡量按照設計意圖來使用
在兩條線程中建立"管道"傳遞數據, 寫線程寫數據, 讀線程讀數據.
構造函數
它包含了好幾個構造函數, 我們只看參數最多的那個
public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
initPipe(pipeSize);
connect(src);
}
最終都會要求我們調用上面的兩個方法, 都比較簡單就不貼代碼了
-
initPipe()
里面對byte數組buffer
變量進行賦值, 也就是初始化緩沖區(qū)域 -
connect()
方法直接調用了PipedOutputStream#connect
, 上面已經分析過了, 最終效果就是指明PipedOutputStream
的連接對象, 改變connected
變量的值, 使得PipedInputStream
處于連接狀態(tài).
receive
通過上面PipedOutputStream
的分析可以知道, 寫數據的方法會調用PipedInputStream
的reveive
方法, 所以我們首先分析這個方法, 了解寫數據的邏輯. 注意閱讀注釋!
// 寫單個數據
protected synchronized void receive(int b) throws IOException {
// 檢查當前"管道"狀態(tài), 確保能夠讀寫數據
checkStateForReceive();
// 本方法由PipedOutputStream調用, 所以線程是寫線程, 記錄該線程
writeSide = Thread.currentThread();
if (in == out)
// in == out表示緩存數組已經滿了, 阻塞線程等待
// 這里確保了未讀的緩存數據不會丟失
awaitSpace();
// 當檢測到緩存數組有空間, 等待結束后, 會繼續(xù)執(zhí)行以下代碼
if (in < 0) {
// in小于0表示當前無數據, 設置讀, 寫位置都是0
in = 0;
out = 0;
}
// 寫操作
// 1. 把數據寫到目標位置(in)
// 2. 后移in, 指明下一個寫數據的位置
buffer[in++] = (byte)(b & 0xFF);
// 如果in超出緩存長度, 回到0, 循環(huán)利用緩存數組
if (in >= buffer.length) {
in = 0;
}
}
注意該方法帶有synchronized
關鍵字, 表明在該方法內, 會持有對象鎖, 我們留到最后再分析各個環(huán)節(jié)中, 對象鎖的歸屬問題.
在寫數據前會先通過checkStateForReceive
檢查"管道"狀態(tài), 確保
- 當前處于連接狀態(tài)
- 管道讀寫兩端都沒有被關閉
- 讀線程狀態(tài)正常
接著用writeSide
記錄當前線程為寫線程, 用來后續(xù)判斷線程狀態(tài);
然后判斷目標位置(in
), 如果in == out
表明當前緩存數組已經滿了, 不能再寫數據了, 所以會通過awaitSpace()
方法阻塞寫線程;
// 此時寫線程持有鎖
private void awaitSpace() throws IOException {
// 只有緩存數組已滿才需要等待
while (in == out) {
// 檢查管道狀態(tài), 防止在等待的過程中狀態(tài)發(fā)生變化
checkStateForReceive();
// 標準用法中僅涉及兩條線程, 所以這里可以認為是通知讀線程讀數據
notifyAll();
try {
// 釋放對象鎖, 等待讀線程讀數據, 調用后就會阻塞寫線程
// 1s后取消等待是為了再次檢查管道狀態(tài)
// 注意等待結束后, 鎖仍然在寫線程
wait(1000);
} catch (InterruptedException ex) {
// 直接拋出異常
IoUtils.throwInterruptedIoException();
}
}
}
以上基本可以概括為
緩存數組有空間時直接寫數據, 無空間時阻塞寫線程, 直至有空間可以寫數據
接著分析寫一段數據的receive(byte[], int, int)
方法, 注意閱讀注釋!
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive();
writeSide = Thread.currentThread();
// len是需要寫進緩存數據的總長度
// bytesToTransfer用來記錄剩余個數
int bytesToTransfer = len;
// 循環(huán)寫數據過程, 直至需要寫的數據全部處理完畢
while (bytesToTransfer > 0) {
if (in == out)
// in == out表示緩存區(qū)域已經滿了, 阻塞線程等待
awaitSpace();
// nextTransferAmount用來記錄本次過程寫進緩存的個數
int nextTransferAmount = 0;
if (out < in) {
// 因為out必然大于等于0, 所以這里 0 <= out < int
// out < in 表示[in, buffer.length)和[0, out)兩個區(qū)間可以寫數據
// 先寫數據進[in, buffer.length)區(qū)間, 避免處理頭尾連接的邏輯, 如果還有數據剩余, 留到下一個循環(huán)處理
nextTransferAmount = buffer.length - in;
} else if (in < out) {
// 注意in有可能為-1, 所以特殊判斷下
if (in == -1) {
// in == -1表示緩存數組為空, 整個數組都可以寫數據
// 從這里可知, 單次寫數據最大長度就是緩存數組的長度
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
// in < out 表示[in, out)區(qū)間可以寫數據
nextTransferAmount = out - in;
}
}
// 到這里nextTransferAmount表示本次過程**最多**可以寫的數據
if (nextTransferAmount > bytesToTransfer)
// 位置比需要的多, 所以修改nextTransferAmount
nextTransferAmount = bytesToTransfer;
// 經過上面的判斷, nextTransferAmount表示本次過程可以寫進緩存的個數
assert(nextTransferAmount > 0);
// 把數據寫進緩存
System.arraycopy(b, off, buffer, in, nextTransferAmount);
// 計算剩余個數
bytesToTransfer -= nextTransferAmount;
// 移動數據起點
off += nextTransferAmount;
// 后移in
in += nextTransferAmount;
// 如果in超出緩存長度, 回到0
if (in >= buffer.length) {
in = 0;
}
}
}
代碼邏輯注釋已經說明得很清楚了, 當你需要處理頭尾相連的數組時, 可以學習上面循環(huán)處理數據的方法, 邏輯清晰, 不需要太多的邊界判斷.
receiveLast
當輸入端關閉時(調用PipedOutputStream#close()
), 會調用receivedLast()
synchronized void receivedLast() {
// 標記輸入端關閉
closedByWriter = true;
// 通知讀線程讀數據
notifyAll();
}
該方法使用變量標記輸入端已經關閉, 表示不會有新數據寫入了.
read
分析完寫數據, 接下來該分析讀數據了.
public synchronized int read() throws IOException {
// synchronized關鍵字, 讀線程需要持有鎖才能讀數據
// 先檢查管道狀態(tài)
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
// 只要in >= 0, 表示還有數據沒有讀, 所以不拋出異常
// 這個判斷表明了, 即使輸入端已經調用了close, 也能繼續(xù)讀已經寫入的數據
throw new IOException("Write end dead");
}
// 記錄讀線程
readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {
// in<0表示緩存區(qū)域為空, 只要輸入端沒有被關閉, 阻塞線程等待數據寫入, 即等待in >= 0
if (closedByWriter) {
// 輸入端關閉了, 同時in < 0, 表示數據傳輸完畢了, 返回-1
return -1;
}
// 檢查寫線程的狀態(tài), 線程狀態(tài)異常則認為管道異常, 檢查2次
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
// 這里可以認為是通知寫線程寫數據
notifyAll();
try {
// 阻塞線程, 等待1s, 這里會釋放鎖, 給機會寫線程獲取鎖, 寫數據
wait(1000);
} catch (InterruptedException ex) {
IoUtils.throwInterruptedIoException();
}
}
// 執(zhí)行到這里證明in >= 0, 即緩存數組中有數據
// 關鍵的讀操作
// 1. 讀取out指向的byte數據
// 2. 后移out
// 3. 把byte轉成int, 高位補0
int ret = buffer[out++] & 0xFF;
// out超出長度則回到位置0
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
// 讀取的數據追上了輸入的數據, 則當前緩存區(qū)域為空, 所以設置in = -1
in = -1;
}
return ret;
}
從上面的注釋分析可以知道
- 即使調用了
PipedOutputStream#close()
, 只要管道中還有數據, 仍可以讀數據, 所以實際使用時, 輸入端輸入完畢后可以直接close
輸入端. - 當管道中沒有數據時, 會阻塞讀線程, 直至管道被關閉, 線程異常或者數據被寫入到管道中.
接著看看讀取一段數據的方法
public synchronized int read(byte b[], int off, int len) throws IOException {
// 參數byte[](下面稱輸出數組)是數據讀取后存放的地方, 所以要先檢查該數組
if (b == null) {
// 確保輸出數組不為null, 否則讀出的數據不能寫入
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
// 確保下標不會越界
throw new IndexOutOfBoundsException();
} else if (len == 0) {
// len參數表示需要讀取的長度, 等于0時相當于不讀數據, 所以直接返回
return 0;
}
// 先單獨讀一個數據是為了確保已經有數據寫入, 因為如果當前無數據, 則會阻塞當前的讀線程
int c = read();
// 返回值小于0(實際上只能是-1), 表示管道已經沒有數據了, 所以這里也直接返回-1
if (c < 0) {
return -1;
}
// 把讀取到的第一個數據放到輸出數組, 看后面的代碼時緊記這里已經讀了1個數據
b[off] = (byte) c;
// 記錄讀取到的數據長度
int rlen = 1;
// 循環(huán)條件:
// in >= 0確保還有數據可以讀
// len > 1確保只讀取外部請求的數據長度, 因為上面已經讀了1個數據, 所以是大于1, 而不是大于0
while ((in >= 0) && (len > 1)) {
// available用來記錄當前可以讀取的數據
int available;
if (in > out) {
// in > out表示[out, in)區(qū)間數據可讀
// in的值正常情況下是不會大于buffer.length的, 因為當 in == buffer.length時, in就會賦值0
// 這里的Math.min顯得有點多余, 可能是為了以防萬一吧
available = Math.min((buffer.length - out), (in - out));
} else {
// 首先in是不會等于out的, 因為如果相等, 在上面讀第一個數據的時候就會把in賦值-1, 也就不會進入這個循環(huán)
// 當in < out表示[out, buffer.length)和[0, in)兩個區(qū)間的數據可讀
// 和receive方法類似, 為了不處理跨邊界的情況, 先讀[out, buffer.length)區(qū)間數據
available = buffer.length - out;
}
// 外部已經讀了一個數據, 所以只需要讀(len - 1)個數據了
if (available > (len - 1)) {
available = len - 1;
}
// 經過上面的判斷, available表示本次需要讀的數據長度
// 復制數據到輸出數組
System.arraycopy(buffer, out, b, off + rlen, available);
// 后移out變量
out += available;
// 記錄已經讀到的數據量
rlen += available;
// 計算剩余需要讀的數據
len -= available;
// 如果已經讀到緩存數組的尾部, 回到開頭
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
// in == out表示已經沒有數據可以讀了, 所以in賦值-1
in = -1;
}
}
return rlen;
}
上面的方法我們需要注意:
while
方法體內是不會阻塞讀線程的!while
方法體內是不會阻塞讀線程的!while
方法體內是不會阻塞讀線程的! 重要的事情說3遍~ 所以如果管道內只有1個數據, 那么讀取到輸出數組的就只有這1個數據,read
方法返回值會是1, 在讀取數據后處理輸出數組時需要特別注意這點.
available
我們在讀數據前可以利用available()
先看看管道中的數據個數.
public synchronized int available() throws IOException {
if(in < 0)
// 管道中無數據
return 0;
else if(in == out)
// 緩存數組已滿
return buffer.length;
else if (in > out)
// [out, in)區(qū)間內為有效數據
return in - out;
else
// in < out
// [in, out)區(qū)間為無效數據, 其余為有效數據, 所以長度為 buffer.length - (out - in)
return in + buffer.length - out;
}
到這里我們已經把所有PipedOutputStream
和PipedInputStream
的所有方法分析完畢了~ 接著我們再分析下讀寫過程中對象鎖的歸屬問題.
鎖
分析這部分我們先要了解下wait
和notifyAll
的作用, 可以參考知乎上這個回答java中的notify和notifyAll有什么區(qū)別阎肝? - 文龍的回答 - 知乎, 本文不再說明了, 重點理解鎖池和等待池概念
鎖池:假設線程A已經擁有了某個對象(注意:不是類)的鎖竞漾,而其它的線程想要調用這個對象的某個synchronized方法(或者synchronized塊)矗晃,由于這些線程在進入對象的synchronized方法之前必須先獲得該對象的鎖的擁有權外构,但是該對象的鎖目前正被線程A擁有祭芦,所以這些線程就進入了該對象的鎖池中孔厉。
等待池:假設一個線程A調用了某個對象的wait()方法墓律,線程A就會釋放該對象的鎖后蒂窒,進入到了該對象的等待池中
首先, 需要注意, PipedOutputStream
中, 兩個write
方法都沒有synchronized
關鍵字, 所以我們不需要關心PipedOutputStream
的對象鎖.
我們重點分析PipedInputStream
里面, read
和receive
方法.
假設我們先調用receive
寫數據, 后調用read
讀數據
當我們寫數據時, 進入了receive
方法, 因為synchronized
關鍵字, 此時寫線程會獲取到了對象鎖, 然后寫數據到管道中, 注意, 在這個過程中, 讀線程是不能通過read
方法讀取數據的, 因為讀線程獲取不了對象鎖, 如果這次寫操作中, 管道中的緩存數組滿了, 此時寫線程會進入awaitSpace()
方法, 在該方法內, 寫線程先調用了notifyAll
方法, 使讀線程進入鎖池準備競爭對象鎖, 然后調用wait(1000)
方法, 在這1s內, 寫線程釋放了對象鎖, 然后進入等待池.
寫線程釋放對象鎖后, 讀線程就能夠獲取對象鎖, 進入read
方法內了, 然后讀數據, 只要管道中存在至少一個數據, 就不會阻塞線程, 讀取數據后直接退出方法, 釋放對象鎖, 如果這次讀操作中, 管道中的緩存數組沒有任何數據, 此時讀線程就會調用notifyAll
方法, 使寫線程從等待池移到鎖池, 準備競爭對象鎖, 然后再調用wait(1000)
方法, 在這1s內, 讀線程釋放對象鎖, 自己進入等待池.
以上就是一次讀寫中, 對象鎖的轉移過程, 但是在實際過程中, 我們都是兩個線程在各自的循環(huán)體內一直讀數據和一直寫數據的, 所以每一次循環(huán)的時候都會競爭鎖, 可能先讀后寫, 或者先寫后讀.
總結
分析這兩個類的源碼我們應該可以學習到
-
InputSteam
和OutputSteam
的接口含義 - 使用數組緩存數據的方法, 使用
while
循環(huán)避免處理邊界問題 -
wait
和notifyAll
協(xié)調讀寫線程的邏輯 - 使用這兩個類實現(xiàn)傳輸數據流