前言
與很多Android小伙伴一樣嘁傀,接觸到Okio也是在接觸Okhttp之后返弹。在Okhttp中智润,每個請求通過攔截鏈處理悦冀,而Okio則在CallServerInterceptor中安吁,對建立起連接的請求進(jìn)行讀寫醉蚁。
剛好自己對Java原生IO也不熟,就兩個一起學(xué)了鬼店。本篇文章分為三個部分网棍,第一部分介紹IO,第二部分簡要介紹Java中的IO妇智,第三部分介紹Okio滥玷。熟悉的部分自行跳過。
什么是IO
程序與運(yùn)行時數(shù)據(jù)在內(nèi)存中駐留巍棱,由CPU負(fù)責(zé)執(zhí)行惑畴,涉及到數(shù)據(jù)交換的地方,如磁盤航徙、網(wǎng)絡(luò)等如贷,就需要IO接口。IO中涉及到輸入流 Input Stream 與輸出流 Output Stream的概念到踏,用來表達(dá)數(shù)據(jù)從一端杠袱,到達(dá)另一端的過程。
Input Stream 與 Output Stream 可以以內(nèi)存作為參照標(biāo)準(zhǔn)窝稿,加載到內(nèi)存的楣富,是輸入流,而從內(nèi)存中輸出到別的地方讹躯,如磁盤菩彬、網(wǎng)絡(luò)的缠劝,則稱為輸出流。比如File存于磁盤中骗灶,程序獲取File數(shù)據(jù)用來進(jìn)行其它操作惨恭,這是將數(shù)據(jù)讀入內(nèi)存中的過程,所以為輸入流耙旦,反之脱羡,程序?qū)⒏鞣N信息保存入File中,是將數(shù)據(jù)讀出內(nèi)存的過程免都,所以為輸出流锉罐;再比如,網(wǎng)絡(luò)操作绕娘,請求從客戶端來到服務(wù)端脓规,也就是數(shù)據(jù)從客戶端到達(dá)了服務(wù)端,那么對于客戶端险领,是輸出流侨舆,對服務(wù)端,是輸入流绢陌,響應(yīng)則相反挨下。如圖:
IO原理
- 用戶態(tài):對于操作系統(tǒng)而言,JVM只是一個用戶進(jìn)程(應(yīng)用程序)脐湾,處于用戶態(tài)空間中臭笆,處于用戶態(tài)空間的進(jìn)程是不能只能操作底層的硬件(磁盤/網(wǎng)卡)
- 系統(tǒng)調(diào)用:區(qū)別于用戶進(jìn)程調(diào)用,系統(tǒng)調(diào)用時操作系統(tǒng)級別的api秤掌,比如java IO的讀取數(shù)據(jù)過程(使用緩沖區(qū))愁铺,用戶程序發(fā)起讀操作,導(dǎo)致“syscall read”系統(tǒng)調(diào)用闻鉴,就會把數(shù)據(jù)搬入到一個buffer中帜讲;用戶發(fā)起寫操作,導(dǎo)致“syscal write”系統(tǒng)調(diào)用椒拗,將會把一個buffer中的數(shù)據(jù)搬出去(發(fā)送到網(wǎng)絡(luò)中 or 寫入到磁盤文件)
- 內(nèi)核態(tài):用戶態(tài)的進(jìn)程要訪問磁盤/網(wǎng)卡(也就是操作IO)似将,必須通過系統(tǒng)調(diào)用,從用戶態(tài)切換到內(nèi)核態(tài)(中斷蚀苛,trap)在验,才能完成
- 局部性原理:操作系統(tǒng)在訪問磁盤時,由于局部性原理堵未,操作系統(tǒng)不會每次只讀取一個字節(jié)(代價太大)腋舌,而是借助硬件直接存儲器存取(DMA)一次性讀取一片(一個或者若干個磁盤塊)數(shù)據(jù)渗蟹。因此块饺,就需要有一個“中間緩沖區(qū)”——即內(nèi)核緩沖區(qū)赞辩。先把數(shù)據(jù)從磁盤讀到內(nèi)核緩沖區(qū)中,然后再把數(shù)據(jù)從內(nèi)核緩沖區(qū)搬到用戶緩沖區(qū)授艰。
用戶態(tài)于內(nèi)核態(tài)的轉(zhuǎn)化時耗時操作辨嗽,甚至可能比所要執(zhí)行的函數(shù)執(zhí)行時間還長,應(yīng)用程序進(jìn)行IO操作時淮腾,應(yīng)盡量減少轉(zhuǎn)換操作糟需。并且由于局部性原理,操作系統(tǒng)度讀取數(shù)據(jù)是整片讀取的谷朝,假設(shè)一片的數(shù)據(jù)為4096字節(jié)洲押,那么0~4096字節(jié)范圍內(nèi)的數(shù)據(jù),對于操作系統(tǒng)來說圆凰,讀取時間差異是可以忽略不計的杈帐。因此,緩沖區(qū)的是為了解決速度不匹配問題专钉。
Java原生IO
Java程序自然要遵守并利用IO的特點(diǎn)娘荡。在Java里,輸入流為InputStream的子類驶沼,輸出流為OutputStream的子類,并且具體的讀操作read()與寫操作write()争群,均有具體場景下的具體子類來實現(xiàn)回怜。而涉及到IO操作,就拋不開BufferedInputStream和BufferedOutputStream换薄,前者對應(yīng)輸入流玉雾,后者對應(yīng)輸出流,這兩個類是流上緩沖區(qū)的實現(xiàn)轻要。
假設(shè)要將一些自定義的數(shù)據(jù)寫入文件中复旬,那構(gòu)建出的輸出流可能如下:
new DataOutputStream(new BufferedOutputStream(new FileOutputStream("filePath")));
其中DataOutputStream功能為轉(zhuǎn)譯,將數(shù)據(jù)轉(zhuǎn)換成對應(yīng)字節(jié)冲泥,BufferedOutputStream為緩沖驹碍,F(xiàn)ileOutputStream則為具體輸出實現(xiàn),也就是調(diào)用下層API的上層觸發(fā)點(diǎn)凡恍。實際上志秃,輸入流與輸出流類似,流的構(gòu)造涉及裝飾模式嚼酝,這樣可以把想要的功能拼裝起來浮还。
IO操作涉及到的類有很多,不一一介紹闽巩,主要看BufferedInputStream與BufferedOutputStream如何實現(xiàn)緩沖功能钧舌。
其它IO類可以參考
輸入流緩沖 BufferedInputStream
BufferedInputStream的讀取操作有:
- read():讀取下一個字節(jié)
- read(byte b[], int off, int len): 讀取一段數(shù)據(jù)到b[]中
看讀取一段數(shù)據(jù)担汤,讀取下一字節(jié)的API自然能理解:
// 默認(rèn)的緩沖區(qū)存儲數(shù)據(jù)大小
private static int DEFAULT_BUFFER_SIZE = 8192;
// 存儲緩沖區(qū)數(shù)據(jù)
protected volatile byte buf[];
// 當(dāng)前緩沖區(qū)的有效數(shù)據(jù) = count - pos
protected int count;
// 當(dāng)前緩沖區(qū)讀的索引位置,在pos前的數(shù)據(jù)是無效數(shù)據(jù)
protected int pos;
// 當(dāng)前緩沖區(qū)的標(biāo)記位置洼冻,需要配合 mark() 和 reset()使用
// mark()將pos位置索引到到markpos
// reset() 將pos值重置為markpos崭歧,當(dāng)再次read()數(shù)據(jù)時,會從mark()標(biāo)記的位置開始讀取數(shù)據(jù)
protected int markpos = -1;
// 緩沖區(qū)可標(biāo)記位置的最大值
protected int marklimit;
public synchronized int read(byte b[], int off, int len)
throws IOException
{
// 獲取buf碘赖,在流關(guān)閉情況下buf被釋放
getBufIfOpen();
// 檢查要獲取的數(shù)據(jù)(假設(shè)有)驾荣,b[]是否內(nèi)存得下
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int n = 0;
for (;;) {
// 讀取數(shù)據(jù)到b[], nread為已讀取的數(shù)量
int nread = read1(b, off + n, len - n);
// 已按照需求,將要求的數(shù)據(jù)讀取到b[]
if (nread <= 0)
return (n == 0) ? nread : n;
// 記錄已讀取到的數(shù)據(jù)數(shù)
n += nread;
if (n >= len)
return n;
// 是BufferedInputStream裝飾的輸入流普泡,BufferedInputStream只負(fù)責(zé)緩沖
InputStream input = in;
// 如果輸入流已關(guān)閉或者再沒有可讀數(shù)據(jù)播掷,則返回
if (input != null && input.available() <= 0)
return n;
}
}
讀取操作需要通過私有函數(shù)read1()進(jìn)行讀取,每次讀取完后撼班,read1()都會返回int表示讀取到的字節(jié)數(shù)歧匈,-1則表示沒有讀取或讀取不到數(shù)據(jù),這種情況直接向上返回砰嘁。接著件炉,用n記錄每次讀取到的數(shù)據(jù),因為將數(shù)據(jù)讀取到 b[] 很可能一次讀取不滿矮湘。當(dāng)n滿足讀取需求或是再無可讀取數(shù)據(jù)時斟冕,向上返回。
private int read1(byte[] b, int off, int len) throws IOException {
// 緩沖區(qū)有效數(shù)據(jù)數(shù)量
int avail = count - pos;
if (avail <= 0) {
// 進(jìn)到這里說明緩沖區(qū)沒有可讀取的數(shù)據(jù)
// 需要讀取的數(shù)據(jù)量大于緩沖區(qū)能讀取的大小缅阳,使用緩沖區(qū)無意義
// 直接交給in去讀取
if (len >= getBufIfOpen().length && markpos < 0) {
return getInIfOpen().read(b, off, len);
}
// 緩沖區(qū)已沒有可讀取的數(shù)據(jù)磕蛇,對緩沖區(qū)填充
fill();
// 記錄緩沖區(qū)有效數(shù)據(jù)
avail = count - pos;
// 這里說明已經(jīng)讀不到有效數(shù)據(jù)
if (avail <= 0) return -1;
}
// 將要讀入 b[] 的數(shù)據(jù)量
int cnt = (avail < len) ? avail : len;
// 將緩沖區(qū)的數(shù)據(jù)讀入 b[]
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
// 更新緩沖區(qū)索引位置,在fill()會被重重
pos += cnt;
return cnt;
}
read1()將數(shù)據(jù)讀入 b[]十办,需要是從緩沖區(qū)讀取還是直接從in讀取需要看具體情況秀撇。需要注意,如果要讀取的數(shù)據(jù)量len大于緩沖區(qū)存儲的數(shù)據(jù)量向族,就直接從in讀取呵燕,因在在這種情況下使用緩沖策略不能帶來優(yōu)化。緩沖的作用件相,是用來模仿CPU讀取數(shù)據(jù)整塊讀取的習(xí)慣再扭,在塊的數(shù)據(jù)范圍內(nèi),速度差異是可以不計的夜矗,因此緩沖可以拿到整塊的數(shù)據(jù)霍衫,在從緩沖區(qū)中讀取不超過塊范圍的數(shù)據(jù),是不用經(jīng)過系統(tǒng)調(diào)用的侯养。而在len大于緩沖區(qū)內(nèi)存儲的數(shù)據(jù)量情況下敦跌,如果使用緩沖策略,不僅用不到緩沖區(qū)的優(yōu)勢,反而增加了系統(tǒng)調(diào)度次數(shù)柠傍。
read1() 讀取數(shù)據(jù)可用下圖表示:
剩下填充緩沖區(qū)操作fill()
private void fill() throws IOException {
// 獲取緩沖區(qū)
byte[] buffer = getBufIfOpen();
if (markpos < 0)
// 沒有使用mark功能麸俘,重置pos
pos = 0;
else if (pos >= buffer.length)
if (markpos > 0) {
int sz = pos - markpos;
// 將 pos ~ markpos 之間的數(shù)據(jù)向左移動
// 移動完后數(shù)據(jù)位于 0 ~ sz,移動完后目前sz之后的數(shù)據(jù)無效
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
// 緩沖區(qū)容量大于可標(biāo)記限制惧笛,所有數(shù)據(jù)都不要了
// mark標(biāo)記也不要了
markpos = -1;
pos = 0;
} else if (buffer.length >= MAX_BUFFER_SIZE) {
// 緩沖區(qū)容量過大从媚,拋出異常
throw new OutOfMemoryError("Required array size too large");
} else {
// 可標(biāo)記區(qū)域大于緩沖區(qū)容量,對緩沖區(qū)進(jìn)行擴(kuò)容
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
throw new IOException("Stream closed");
}
buffer = nbuf;
}
// 記錄count位置
count = pos;
// 向in讀取數(shù)據(jù)患整,數(shù)量為緩沖區(qū)容量 - 當(dāng)前緩沖區(qū)索引
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
// 讀到有效數(shù)據(jù)拜效,更新count
if (n > 0)
count = n + pos;
}
主要理解,從in讀取緩沖數(shù)據(jù)各谚,并更新pos和count紧憾,因為count - pos 得到緩沖區(qū)有效數(shù)據(jù),pos則是有效數(shù)據(jù)的起點(diǎn)昌渤。
輸出流緩沖 BufferedOutputStream
與輸入流BufferedInputStream相似赴穗,輸出流BufferedOutputStream同樣是為了減少系統(tǒng)調(diào)度,只不過二者的數(shù)據(jù)走向方向相反膀息。BufferedOutputStream接收數(shù)據(jù)并存入緩沖區(qū)般眉,在緩沖池滿或者主動調(diào)用flush()之后,觸發(fā)系統(tǒng)調(diào)度潜支,將緩沖池數(shù)據(jù)寫出甸赃。平時可能少見觸發(fā)flush()操作,在關(guān)閉輸出流接口操作close()時冗酿,也會線觸發(fā)flush()操作埠对。
與分析BufferedInputStream時類似,輸出流BufferedOutputStream直接看write()
public synchronized void write(byte b[], int off, int len) throws IOException {
if (len >= buf.length) {
// 要寫出的數(shù)據(jù)大于緩沖區(qū)的容量已烤,也不用緩沖區(qū)策略
// 先將緩沖區(qū)數(shù)據(jù)寫出
flushBuffer();
// 再直接通過輸出流out直接將數(shù)據(jù)寫出
out.write(b, off, len);
return;
}
if (len > buf.length - count) {
// 要寫出的數(shù)據(jù)大于緩沖區(qū)還可寫入的容量,將緩沖區(qū)數(shù)據(jù)寫出
flushBuffer();
}
// 將要寫出的數(shù)據(jù)寫入到緩沖區(qū)
System.arraycopy(b, off, buf, count, len);
// 更新緩沖區(qū)已添加的數(shù)據(jù)容量
count += len;
}
當(dāng)數(shù)據(jù)大于緩沖區(qū)容量時妓羊,不使用緩沖策略的原因和與分析寫入流類似胯究,都是盡可能少的進(jìn)行系統(tǒng)調(diào)度,輸出流緩沖寫出過程可用下圖表示
flushBuffer()就比較簡單了躁绸,觸發(fā)out輸出流寫出數(shù)據(jù)
private void flushBuffer() throws IOException {
if (count > 0) {
out.write(buf, 0, count);
count = 0;
}
}
IO緩沖小結(jié)
IO緩沖區(qū)的存在裕循,減少了系統(tǒng)調(diào)用。也就是說净刮,如果緩沖區(qū)能滿足讀入/寫出需求剥哑,則不需要進(jìn)行系統(tǒng)調(diào)用,維護(hù)系統(tǒng)讀寫數(shù)據(jù)的習(xí)慣淹父。
從上面學(xué)習(xí)的內(nèi)容來看株婴,不管是讀入還是寫出,緩沖區(qū)的存在必然涉及copy的過程,而如果涉及雙流操作困介,比如從一個輸入流讀入大审,再寫入到一個輸出流,那么這種情況下座哩,在緩沖存在的情況下徒扶,數(shù)據(jù)走向是:
-> 從輸入流讀出到緩沖區(qū)
-> 從輸入流緩沖區(qū)copy到 b[]
-> 將 b[] copy 到輸出流緩沖區(qū)
-> 輸出流緩沖區(qū)讀出數(shù)據(jù)到輸出流
上面情況存在冗余copy操作,Okio應(yīng)運(yùn)而生根穷。
Okio實現(xiàn)
在Okio里姜骡,解決了雙流操作時,中間數(shù)據(jù) b[] 存在冗余拷貝的問題屿良。雖然這不能概括Okio的優(yōu)點(diǎn)圈澈,但卻是足夠亮眼以及核心的優(yōu)點(diǎn)。
Okio可以通過
implementation("com.squareup.okio:okio:2.4.0")
引入管引,如果已引入Okttp3或者Retrofit士败,則無需再引入。
Okio使用Segment來作為數(shù)據(jù)存儲手段褥伴。Segment 實際上也是對 byte[] 進(jìn)行封裝谅将,再通過各種屬性來記錄各種狀態(tài)。在交換時重慢,如果可以饥臂,將Segment整體作為數(shù)據(jù)傳授媒介,這樣就沒有具體數(shù)據(jù)的copy過程似踱,而是交換了對應(yīng)的Segment引用隅熙。
Segment的數(shù)據(jù)結(jié)構(gòu)如下:
final class Segment {
// 默認(rèn)容量
static final int SIZE = 8192;
// 最小分享數(shù)據(jù)量
static final int SHARE_MINIMUM = 1024;
// 存儲具體數(shù)據(jù)的數(shù)組
final byte[] data;
// 有效數(shù)據(jù)索引起始位置
int pos;
// 有效數(shù)據(jù)索引結(jié)束位置
int limit;
// 指示Segment是否為共享狀態(tài)
boolean shared;
// 指示當(dāng)前Segment是否為數(shù)據(jù)擁有者,與shared互斥
// 默認(rèn)構(gòu)造函數(shù)的Segment owner為true核芽,當(dāng)把數(shù)據(jù)分享
// 出去時囚戚,被分享的Segment的owner標(biāo)記為false
boolean owner;
// 指向下一個Segment
Segment next;
// 指向前一個Segment
Segment prev;
}
除了用來存儲具體數(shù)據(jù)的byte[]數(shù)據(jù)外,以 pos ~ limit 來標(biāo)記有效的數(shù)據(jù)范圍轧简。Segment被涉及成可以被分割驰坊,在將Segment分割成兩個Segment時,就會進(jìn)行數(shù)據(jù)分享哮独,即使用相同的byte[] 數(shù)組拳芙,只不過 pos ~ limit 標(biāo)記不同罷了。在分享否,就需要區(qū)分個Segment是owner,哪個Segment是shared伍玖,這樣,就需要對應(yīng)的標(biāo)志進(jìn)行標(biāo)記睹限。也不難看出,Segment可以采用雙向鏈表結(jié)構(gòu)進(jìn)行連接。這里不妨先看看Segment的分割函數(shù)split()邦泄。
Segment分割
public Segment split(int byteCount) {
// byteCount表示要分割出去的數(shù)據(jù)大小
// 如果byteCount大于Segment擁有的有效數(shù)據(jù)大小删窒,拋出異常
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
if (byteCount >= SHARE_MINIMUM) {
// 大于分割閾值 1024,進(jìn)行數(shù)據(jù)共享
// 這個情況 prefix.pos = this.pos
prefix = new Segment(this);
} else {
// 小于分割閾值 顺囊,從緩存池里換取Segment肌索,將所需數(shù)據(jù)copy到
// 新的Segment中,這里就沒有使用到共享
// 這個情況 prefix.pos = 0;
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
// 更新當(dāng)前Segment.pos與新的Segment.limit
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
// 將Segment加入到當(dāng)前Segment節(jié)點(diǎn)的后面
prev.push(prefix);
return prefix;
}
上面的代碼描述情況可以用下圖表示
分割操作視byteCount大小特碳,有不同選擇诚亚。byteCount大于閾值 時,新建Segment午乓,并與當(dāng)前的Segment共享byte[]數(shù)據(jù)站宗,其中,當(dāng)前Segment的的索引范圍為[pos + byteCount] ~ [limit]益愈,新的Segmetn索引范圍為[pos] ~ [pos + byteCount] ; byteCount小于閾值 時梢灭,則通過copy操作,將所需數(shù)據(jù)搬運(yùn)到新的Segment蒸其。
Segment緩存池
slipt()操作中可以看到緩存池SegmentPool的身影敏释。與大多數(shù)緩存池一樣,SegmentPool避免的內(nèi)存的重新分配摸袁。SegmentPool存儲的大小為 64 * 1024钥顽, Semgent數(shù)據(jù)存儲大小為 8192,因此最多存下8個Segment靠汁。
SegmentPool復(fù)用IO操作中分配到的內(nèi)存蜂大,也是得益于Segment的設(shè)計,當(dāng)涉及到多流操作時蝶怔,效果明顯奶浦。
取操作為 take(),回收操作為 recycle() 踢星,存儲方式為單向鏈表澳叉,這里不多說。
Okio中的角色
說了那么多斩狱,在看看看Okio中涉及的到角色
Source和Sink
對應(yīng)IO中的輸入流和輸出流耳高,Source的實現(xiàn)類實現(xiàn)需read(Buffer sink, long byteCount) throws IOException; Sink的實現(xiàn)類實現(xiàn)write(Buffer source, long byteCount)扎瓶。不難猜測所踊,在Okio中以Buffer作為操作媒介,可以發(fā)揮它的最大優(yōu)勢概荷。
BufferedSource 和 BufferedSink
對應(yīng)IO中輸入流緩沖和輸出流緩沖秕岛,提供對外的API進(jìn)行讀寫操作。
Okio
入口類,工廠類继薛,提供source方法可以得到一個Source輸入流修壕,提供sink方法得到一個Sink輸出流。兩種方法可接受的入?yún)⒍伎蔀?File遏考、Socket慈鸠、InputStream / OutputStream。對每個對應(yīng)的方法進(jìn)行查看灌具,Okio并沒有改變各種Java 輸入輸出流的對應(yīng)裝飾對象的構(gòu)造青团,在構(gòu)造上,對于涉及到的上面說到的入?yún)⒖ч梗瑯?gòu)造起來比較方便督笆。也能看出,Okio并沒有打算改變底層的IO方式诱贿,旨在彌補(bǔ)原聲IO框架上的不足娃肿。
Segment
這一部分開篇已現(xiàn)對Segment進(jìn)行了介紹。除了介紹都的內(nèi)容外珠十,Segment可以以單鏈料扰、雙鏈的方式存儲。提供了pop()將自己從鏈中刪除宵睦,返回下一節(jié)點(diǎn)记罚;push()將一個Segment加在自己后面,這兩個對于鏈表的操作不做深入壳嚎。既然提供了split()方法進(jìn)行分割桐智,自然也提供了compact()方法Segment進(jìn)行合并,前提是用來做合并的Segment的剩余容量裝得下烟馅,也不做深入说庭。
SegmentPoll
復(fù)用Segment,前面說過郑趁,不贅述刊驴。
RealBufferedSource,RealBufferdSink
為BufferedSource 和 BufferedSink的實現(xiàn)類
Buffer
Okio使用了Segment作為數(shù)據(jù)存儲的方式寡润,自然要提供對應(yīng)的緩沖方式來操作Segment捆憎,Segment在Buffer中以雙向鏈表形式存在。Buffer則負(fù)責(zé)此項事務(wù)梭纹。Buffer也實現(xiàn)了BufferedSource和BufferedSink躲惰,這是因在使用Okio提供的輸入/輸出緩沖時,都需要進(jìn)行緩沖處理变抽,均由Buffer來處理础拨,這樣使API對應(yīng)氮块。
TimeOut
提供超時功能,希望IO能在一定時間內(nèi)進(jìn)行完畢诡宗,否則視為異常滔蝉。分兩種情況,同步超時和異步超時塔沃。
- 同步超時:在每次讀寫中判斷超時條件蝠引,因為處于同步方法,因此當(dāng)IO發(fā)生阻塞時蛀柴,不能及時響應(yīng)立肘。
- 異步超時:用單獨(dú)的線程監(jiān)控超時條件,如果IO發(fā)生阻塞名扛,并且檢測到超時谅年,拋出IO異常,阻塞終止肮韧。
這部分也不做深入融蹂。
緩沖實現(xiàn)
假設(shè)使用Okio復(fù)制一個文件,那么實例代碼可能是這樣的
/**
* 構(gòu)造帶緩沖的輸入流
*/
Source source = null;
BufferedSource bufferedSource = null;
source = Okio.source(new File("yourFilePath"));
bufferedSource = Okio.buffer(source);
/**
* 構(gòu)造帶緩沖的輸出流
*/
Sink sink = null;
BufferedSink bufferedSink = null;
sink = Okio.sink(new File("yourSaveFilePath"));
sink = Okio.buffer(sink);
int bufferSize = 8 * 1024; // 8kb
// 復(fù)制文件
while (!bufferedSource.exhausted()){
// 從輸入流讀取數(shù)據(jù)到輸出流緩沖
bufferedSource.read(
bufferedSink.buffer(),
bufferSize
);
// 輸出流緩沖寫出
bufferedSink.emit();
}
source.close();
sink.close();
上面代碼中弄企,Okio.source() 和 Okio.sink() , Source 接收的輸入流為 FileInputStream超燃, Sink接收輸出流為FileOutputStream。Okio.buffer() 和 Okio.sink()分別返回 RealBufferedSource, 和 RealBufferedSink拘领,Buffer作為這兩個類的成員變量存在意乓,在實例化時初始化,這部分代碼不貼出约素。主要看 RealBufferedSource.read()
@Override public long read(Buffer sink, long byteCount) throws IOException {
// 用來接收數(shù)據(jù)的Buffer 不能為空
if (sink == null) throw new IllegalArgumentException("sink == null");
// 讀取數(shù)據(jù)不能為負(fù)數(shù)
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
// 緩沖區(qū)沒有數(shù)據(jù)了
if (buffer.size == 0) {
// 從輸入流中讀取數(shù)據(jù)
long read = source.read(buffer, Segment.SIZE);
if (read == -1) return -1;
}
// 比較 byteCount 與 緩沖中的數(shù)據(jù)容量届良,得到到實際要讀取的數(shù)據(jù)量
long toRead = Math.min(byteCount, buffer.size);
// 從Buffer 中讀取數(shù)據(jù)
return buffer.read(sink, toRead);
}
與Java原生的緩沖方式類似,都先考慮緩沖區(qū)中的數(shù)據(jù)情況圣猎,如果緩沖區(qū)中沒有數(shù)據(jù)士葫,則先向流讀取數(shù)據(jù)填充緩沖區(qū),再根據(jù)所需讀取容量與實際緩沖區(qū)中存有的數(shù)據(jù)容量進(jìn)行讀取送悔。這里有一點(diǎn)和Java原生的不同慢显,如果byteCount的數(shù)據(jù)超出Segment的容量的話,不會直接向流讀取欠啤〖栽澹可以看出Okio非常希望以Segment為單位來對流數(shù)據(jù)進(jìn)行操作,看接收byte[]為參數(shù)的read()的重載方法也最受這個規(guī)則洁段。
先看source.read(buffer, Segment.SIZE)应狱。 source通過之前Okio.source()得來,見Okio.source()
private static Source source(final InputStream in, final Timeout timeout) {
// 輸入流不能為空
if (in == null) throw new IllegalArgumentException("in == null");
// 超時條件不能為空
if (timeout == null) throw new IllegalArgumentException("timeout == null");
// 匿名內(nèi)部類Source
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
// 獲取的byteCount不能為負(fù)數(shù)
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
try {
// 檢查是否超時
timeout.throwIfReached();
// 獲取尾節(jié)點(diǎn)的Segment眉撵,尾節(jié)點(diǎn)不滿足填充新數(shù)據(jù)條件則拿到新的Segment
// 也是位于尾節(jié)點(diǎn)
Segment tail = sink.writableSegment(1);
// 實際從in讀取的數(shù)據(jù)侦香,能看出最大不能超過 Segment.SIZE
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
// 從in中讀取數(shù)據(jù)到Segment.data
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
// 這里說明沒有讀到有效數(shù)據(jù)
if (bytesRead == -1) return -1;
// 更新索引位置
tail.limit += bytesRead;
// 更新buffer容量
sink.size += bytesRead;
return bytesRead;
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
......
};
}
從輸入流中讀取數(shù)據(jù),數(shù)據(jù)存于Buffer中纽疟,位于尾節(jié)點(diǎn)的Segment罐韩,與前面說的一樣,單次向流讀出操作污朽,大小不能超過Segment.SIZE散吵。
回到read(),在確認(rèn)緩沖區(qū)有數(shù)據(jù)之后蟆肆,從緩沖區(qū)中讀取數(shù)據(jù)到sink矾睦,即從Buffer中讀取數(shù)據(jù)到另一Buffer.
@Override public long read(Buffer sink, long byteCount) {
// 用來接收數(shù)據(jù)的sink不能為空
if (sink == null) throw new IllegalArgumentException("sink == null");
// 接收的數(shù)據(jù)大小不能為負(fù)數(shù)
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (size == 0) return -1L;
// 讀取的數(shù)據(jù)不超過當(dāng)前緩沖區(qū)的容量
if (byteCount > size) byteCount = size;
// 從當(dāng)前緩沖區(qū),將數(shù)據(jù)寫入到另一緩沖區(qū)炎功,即從 this枚冗,寫到sink
sink.write(this, byteCount);
return byteCount;
}
@Override public void write(Buffer source, long byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
checkOffsetAndCount(source.size, 0, byteCount);
/**
緩沖區(qū)數(shù)據(jù)從 緩沖區(qū)source 移動到 緩沖區(qū)this,
在當(dāng)前的案例中蛇损,緩沖區(qū)source代表輸入流緩沖數(shù)據(jù)赁温,緩沖區(qū)this代表輸出流緩沖數(shù)據(jù)
此函數(shù)源碼內(nèi)部有一大段注釋,可以細(xì)細(xì)品味淤齐,我就不貼了
*/
while (byteCount > 0) {
if (byteCount < (source.head.limit - source.head.pos)) {
// 進(jìn)到這里說明股囊,說明source的頭節(jié)點(diǎn)有足夠的數(shù)據(jù)
// 獲取當(dāng)前緩沖區(qū)尾節(jié)點(diǎn)
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
// 尾節(jié)點(diǎn)不為空,并且尾可解是owner狀態(tài)
// 并且尾節(jié)點(diǎn)能夠裝下byteCount數(shù)量的數(shù)據(jù)
// 將數(shù)據(jù)從source的頭節(jié)點(diǎn) copy 到 當(dāng)前緩沖區(qū)的尾節(jié)點(diǎn)
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
// 說明數(shù)據(jù)當(dāng)前緩沖區(qū)尾節(jié)點(diǎn)不能存下byteCount大小的數(shù)據(jù)
// 將source頭節(jié)點(diǎn)的Segment分割更啄,byteCount過閾值則共享稚疹,否則拷貝
// 共享過程則不用copy
source.head = source.head.split((int) byteCount);
}
}
/**
將source緩沖區(qū)的頭節(jié)點(diǎn)pop,加入到當(dāng)前緩沖區(qū)
*/
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
source.head = segmentToMove.pop();
if (head == null) {
// 進(jìn)到這里說明當(dāng)前緩沖區(qū)沒有數(shù)據(jù)祭务,將segmentToMove作為頭節(jié)點(diǎn)
head = segmentToMove;
head.next = head.prev = head;
} else {
// 進(jìn)到這里則是將segmentToMove放到鏈表尾部
Segment tail = head.prev;
tail = tail.push(segmentToMove);
tail.compact();
}
/**
更新緩沖區(qū)大小内狗,已經(jīng)還需的byteCount數(shù)量
*/
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
Buffer.write()方法,將數(shù)據(jù)從一個緩沖區(qū)移動到另一個緩沖區(qū)义锥,根據(jù)不同情況來決定是進(jìn)行copy或者是引用的引動其屏,沿用上面代碼的source表示為來源緩沖區(qū),this表示為當(dāng)前緩沖區(qū)缨该,則source到this的過程為:
- 假設(shè)source頭節(jié)點(diǎn)有足夠的數(shù)據(jù)偎行,當(dāng)this的尾節(jié)點(diǎn)能裝得下是,將數(shù)據(jù)copy入this的尾節(jié)點(diǎn)贰拿;當(dāng)this的尾節(jié)點(diǎn)裝不下時蛤袒,將source頭節(jié)點(diǎn)數(shù)據(jù)進(jìn)行slipt()操作,this需要的數(shù)據(jù)會被分割在source的Segment鏈表里并成為新的頭節(jié)點(diǎn)膨更,再將source的頭節(jié)點(diǎn)pop出妙真,push到this的尾節(jié)點(diǎn)
- 假設(shè)source頭節(jié)點(diǎn)沒有足夠的數(shù)據(jù),說明整個頭節(jié)點(diǎn)都要移動出去荚守。當(dāng)this內(nèi)無數(shù)據(jù)時珍德,將source頭節(jié)點(diǎn)pop练般,成為this的頭節(jié)點(diǎn);當(dāng)this內(nèi)有數(shù)據(jù)時锈候,將source頭節(jié)點(diǎn)pop薄料,push到this的尾節(jié)點(diǎn)
上述出現(xiàn)的情況可以用下圖表達(dá)
經(jīng)過Buffer.write()操作,也就把數(shù)據(jù)從一個緩沖區(qū)泵琳,移動到了另一個緩沖區(qū)摄职。對應(yīng)當(dāng)前的案例,則是從文件輸入流緩沖區(qū)拿數(shù)據(jù)获列,讀出到文件輸出緩沖區(qū)谷市。
其它
Okio最亮眼的操作,就是設(shè)計出了Segment存儲數(shù)據(jù)击孩,通過Buffer進(jìn)行緩沖管理迫悠,并在Buffer.write()則里,通過移動引用而不是真實數(shù)據(jù)巩梢,是減少數(shù)據(jù)copy進(jìn)而交換數(shù)據(jù)的關(guān)鍵及皂。
上面分析了RealBufferedSource,而RealBufferedSink也是同樣的道理且改,只是方向相反验烧,緩沖數(shù)據(jù)存儲依然離不開Segment和Buffer,RealBufferedSink拿到數(shù)據(jù)后又跛,再通過emit()將數(shù)據(jù)寫出到輸出流碍拆,RealBufferedSink拿到的sink的操作,可以通過Okio.sink()拿到的匿名內(nèi)部類Sink()查看慨蓝,分析方法類似感混。
想較于Java原生IO的緩沖方案,雙流操中礼烈,或者說以Buffer來代替 寫入/寫出 的 byte[]弧满,減少了copy的過程,通過Segment的移動達(dá)到目的此熬。
此外庭呜,Okio的寫入/寫出操作,也可以像原生那樣犀忱,接受byte[]參數(shù)募谎,或者直接獲取下一個數(shù)據(jù),這種情況時阴汇,則于原生相似数冬,需要時一樣依賴copy,不再有減少copy的優(yōu)勢搀庶。并且拐纱,Okio接口也更友好铜异,如之前說原生實現(xiàn)向文件寫入自定義數(shù)據(jù)時,需要Data的流類型進(jìn)行轉(zhuǎn)譯秸架,自身就封裝了這樣的操作揍庄。
TimeOut 方案就不深入了,篇幅過長咕宿,自行查閱,優(yōu)點(diǎn)但非必要核心點(diǎn)蜡秽。
總結(jié)
Okio核心競爭力為府阀,增強(qiáng)了流于流之間的互動,使得當(dāng)數(shù)據(jù)從一個緩沖區(qū)移動到另一個緩沖區(qū)時芽突,可以不經(jīng)過copy能達(dá)到:
- 以Segment作為存儲結(jié)構(gòu)试浙,真實數(shù)據(jù)以類型為byte[]的成員變量data存在,并用其它變量標(biāo)記數(shù)據(jù)狀態(tài)寞蚌,在需要時田巴,如果可以,移動Segment引用挟秤,而非copy data數(shù)據(jù)
- Segment在Segment線程池中以單鏈表存在以便復(fù)用壹哺,在Buffer中以雙向鏈表存在存儲數(shù)據(jù),head指向頭部艘刚,是最老的數(shù)據(jù)
- Segment能通過slipt()進(jìn)行分割管宵,可實現(xiàn)數(shù)據(jù)共享,能通過compact()進(jìn)行合并攀甚。由Buffer來進(jìn)行數(shù)據(jù)調(diào)度箩朴,基本遵守 “大塊數(shù)據(jù)移動引用,小塊數(shù)據(jù)進(jìn)行copy” 的思想
- Source 對應(yīng)輸入流秋度,Sink 對應(yīng)輸出流
- TimeOut 以達(dá)到在期望時間內(nèi)完成IO操作的目的炸庞,同步超時在每次IO操作中檢查耗時,異步超時開啟另一線程間隔時間檢查耗時
Okio并沒有打算優(yōu)化底層IO方式以及替代原生IO方式荚斯,Okio優(yōu)化了緩沖策略以減輕內(nèi)存壓力和性能消耗埠居,并且對于部分IO場景,提供了更友好的API事期,而更多的IO場景拐格,該記的還得記。
參考
Okio精簡高效的IO庫
okio:定義簡短高效
字符流與字節(jié)流區(qū)別
看完這個刑赶,Java IO從此不在難
Java IO捏浊、NIO原理