- 1.OkHttp源碼解析(一):OKHttp初階
- 2 OkHttp源碼解析(二):OkHttp連接的"前戲"——HTTP的那些事
- 3 OkHttp源碼解析(三):OKHttp中階之線程池和消息隊(duì)列
- 4 OkHttp源碼解析(四):OKHttp中階之?dāng)r截器及調(diào)用鏈
- 5 OkHttp源碼解析(五):OKHttp中階之OKio簡(jiǎn)介
- 6 OkHttp源碼解析(六):OKHttp中階之緩存基礎(chǔ)
- 7 OkHttp源碼解析(七):OKHttp中階之緩存機(jī)制
- 8 OkHttp源碼解析(八):OKHttp中階之連接與請(qǐng)求值前奏
- 9 OkHttp源碼解析(九):OKHTTP連接中三個(gè)"核心"RealConnection涤垫、ConnectionPool、StreamAllocation
- 10 OkHttp源碼解析(十) OKHTTP中連接與請(qǐng)求
- 11 OkHttp的感謝
本來(lái)我打算OKHttp源碼解析(四) 是寫(xiě)OKHTTP的緩存,最后再單獨(dú)寫(xiě)OKIO的例嘱,但是發(fā)現(xiàn)里面運(yùn)用到了OKIO洪唐,而且后面講連接的時(shí)候也要涉及到OKIO,所以我就把OKIO拿到前面來(lái),這樣大家在讀緩存源碼和連接的時(shí)候更清楚鞋喇。
本篇文章的大綱如下:
- 1.什么是OKIO
- 2.如何使用OKIO
- 3.Sink和Source及其實(shí)現(xiàn)
- 4.Segment和SegmentPool解析
- 5.不可變的ByteString
- 6.最核心的Buffer解析
- 7.okio中的超時(shí)機(jī)制
- 8.okio的優(yōu)雅之處
- 9.FileSystem
一肌割、okio
說(shuō)道okio就不能不提JDK里面io卧蜓,那么咱們先簡(jiǎn)單說(shuō)下JDK里面的io。
(一)把敞、javaNIO和阻塞I/O:
- 1弥奸、阻塞I/O通信模式:調(diào)用InputStream.read()方法時(shí)是阻塞的,它會(huì)一直等到數(shù)據(jù)到來(lái)時(shí)才返回
- 2奋早、NIO通信模式:在JDK1.4開(kāi)始盛霎,是一種非阻塞I/O,在Java NIO的服務(wù)端由一個(gè)專門(mén)的線程來(lái)處理所有I/O事件耽装,并負(fù)責(zé)分發(fā)愤炸;線程之間通訊通過(guò)wait和notify等方式。
(二)掉奄、okio
okio是由square公司開(kāi)發(fā)的规个,它補(bǔ)充了java.io和java.nio的不足,以便能夠更加方便挥萌,快速的訪問(wèn)绰姻、存儲(chǔ)和處理你的數(shù)據(jù)。OKHttp底層也是用該庫(kù)作為支持引瀑。而且okio使用起來(lái)很簡(jiǎn)單狂芋,減少了很多io操作的基本代碼,并且對(duì)內(nèi)存和CPU使用做了優(yōu)化憨栽,他的主要功能封裝在ByteString和Buffer這兩個(gè)類中帜矾。
okio的作者認(rèn)為,javad的JDK對(duì)字節(jié)流和字符流進(jìn)行分開(kāi)定義這一世情屑柔,并不是那么優(yōu)雅屡萤,所以在okio并不進(jìn)行這樣劃分。具體的做法就是把比特?cái)?shù)據(jù)都交給Buffer管理掸宛,然后Buff實(shí)現(xiàn)BufferedSource和BufferedSink這兩個(gè)接口死陆,最后通過(guò)調(diào)用buffer相應(yīng)的方法對(duì)數(shù)據(jù)進(jìn)行編碼。
二、okio的使用
假設(shè)我有一個(gè)hello.txt文件措译,內(nèi)容是hello jianshu别凤,現(xiàn)在我用okio把它讀出來(lái)。
那我們先理一下思路:讀取文件的步驟是首先要拿到一個(gè)輸入流领虹,okio封裝了許多輸入流规哪,統(tǒng)一使用的方法重載source方法轉(zhuǎn)換成一個(gè)source,然后使用buffer方法包裝成BufferedSource塌衰,這個(gè)里面提供了流的各種操作诉稍,讀String,讀byte,short等最疆,甚至是16進(jìn)制數(shù)杯巨。這里直接讀出文件的內(nèi)容,十分簡(jiǎn)單肚菠,代碼如下:
public static void main(String[] args) {
File file = new File("hello.txt");
try {
readString(new FileInputStream(file));
} catch (IOException e) {
e.printStackTrace();
}
}
public static void readString(InputStream in) throws IOException {
BufferedSource source = Okio.buffer(Okio.source(in)); //創(chuàng)建BufferedSource
String s = source.readUtf8(); //以UTF-8讀
System.out.println(s); //打印
source.close();
}
--------------------------------------輸出-----------------------------------------
hello jianshu
Okio是對(duì)Java底層io的封裝舔箭,所以底層io能做的Okio都能做。
上面的大體流程如下:
第一步蚊逢,首先是調(diào)用okio的source(InputStream in)方法獲取Source對(duì)象
第二步层扶,調(diào)用okio的buffer(Source source)方法獲取BufferedSource對(duì)象
第三步,調(diào)用BufferedSource的readUtf8()方法讀取String對(duì)象
第四步烙荷,關(guān)閉BufferedSource
總結(jié)下大體流程如下:
- 1.構(gòu)建緩沖池镜会,緩沖源對(duì)象
- 2.讀寫(xiě)操作
- 3.關(guān)閉緩沖池
三、Sink和Source及其實(shí)現(xiàn)
(一)终抽、Sink和Source
在JDK里面有InputStream和OutputStream兩個(gè)接口戳表,由于okio作者認(rèn)為Inputsream和OutputStream中的available()和讀寫(xiě)單字節(jié)的方法純屬雞肋,所以懟出Source和Sink接口昼伴。Source和Sink類似于InputStream和OutputStream匾旭,是io操作的頂級(jí)接口類,這兩個(gè)接口均實(shí)現(xiàn)了Closeable接口。所以可以把Source簡(jiǎn)單的看成InputStream圃郊,Sink簡(jiǎn)單看成OutputStream价涝。結(jié)構(gòu)圖如下圖:
其中Sink代表的是輸出流,Source代表的是輸入流持舆,這兩個(gè)基本上都是對(duì)稱的色瘩。那我們先來(lái)分析下Sink,代碼如下:
public interface Sink extends Closeable, Flushable {
/** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
// 定義基礎(chǔ)的write操作,該方法將字節(jié)寫(xiě)入Buffer
void write(Buffer source, long byteCount) throws IOException;
/** Pushes all buffered bytes to their final destination. */
@Override void flush() throws IOException;
/** Returns the timeout for this sink. */
Timeout timeout();
/**
* Pushes all buffered bytes to their final destination and releases the
* resources held by this sink. It is an error to write a closed sink. It is
* safe to close a sink more than once.
*/
@Override void close() throws IOException;
}
public interface Source extends Closeable {
/**
* Removes at least 1, and up to {@code byteCount} bytes from this and appends
* them to {@code sink}. Returns the number of bytes read, or -1 if this
* source is exhausted.
*/
// 定義基礎(chǔ)的read操作,該方法將字節(jié)寫(xiě)入Buffer
long read(Buffer sink, long byteCount) throws IOException;
/** Returns the timeout for this source. */
Timeout timeout();
/**
* Closes this source and releases the resources held by this source. It is an
* error to read a closed source. It is safe to close a source more than once.
*/
@Override void close() throws IOException;
}
雖然Sink和Source只定義了很少的方法逸寓,這也是為何說(shuō)它容易實(shí)現(xiàn)的原因居兆,但是我們一般在使用過(guò)程中,不直接拿它使用竹伸,而是使用BufferedSink和BufferedSouce對(duì)接口的封裝泥栖,因?yàn)樵贐ufferedSinke和BufferedSource接口定義了一系列好用的方法。
(二)、 BufferedSinke和 BufferedSource
看源碼可知BufferedSink和BufferedSource定義了很多方便的方法如下圖:
但是發(fā)現(xiàn)BufferedSink和BufferedSource兩個(gè)都是接口 聊倔,那么他的具體具體實(shí)現(xiàn)類是什么那晦毙?
(三)生巡、 RealBufferedSink和 RealBufferedSource
因?yàn)镽ealBufferedSink和RealBufferedSource是一一對(duì)應(yīng)的耙蔑,我就講解RealBufferedSInk了,RealBufferedSource這里就不仔細(xì)講解了孤荣。
final class RealBufferedSink implements BufferedSink {
public final Buffer buffer = new Buffer();
public final Sink sink;
public RealBufferedSink(Sink sink){
if (sink == null)
throw new NullPointerException("sink == null");
this.sink = sink;
}
}
大家看源代碼可知RealBufferedSink類中有兩個(gè)主要參數(shù)甸陌,一個(gè)是新建的Buffer對(duì)象,一個(gè)是Sink對(duì)象盐股。雖然這個(gè)類叫做RealBufferedSinke钱豁,但是實(shí)際上這個(gè)只是一個(gè)保存Buffer對(duì)象的代理而已,真生的實(shí)現(xiàn)都是在Buffer中實(shí)現(xiàn)的疯汁。
@Override public BufferedSink write(byte[] source) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source);
return emitCompleteSegments();
}
@Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source, offset, byteCount);
return emitCompleteSegments();
}
看上面代碼可知牲尺,RealBufferedSink實(shí)現(xiàn)BufferedSink的方法實(shí)際上都是調(diào)用buffer對(duì)應(yīng)的方法,對(duì)應(yīng)的RealBufferedSource也是這樣調(diào)用的buffer的read方法幌蚊。
可以看到這個(gè)實(shí)現(xiàn)了BufferedSink接口的兩個(gè)方法實(shí)際上都是調(diào)用了buffer的對(duì)應(yīng)方法谤碳,對(duì)應(yīng)的RealBufferedSource也是同樣的調(diào)用buffer中的read方法,關(guān)于Buffer這個(gè)類會(huì)在下面詳述溢豆,剛才我們看到Sink接口中有一個(gè)Timeout的類蜒简,這個(gè)就是Okio所實(shí)現(xiàn)的超時(shí)機(jī)制,保證了IO操作的穩(wěn)定性漩仙。
所以關(guān)于讀寫(xiě)操作這塊的類圖如下:
關(guān)于Buffer這個(gè)類搓茬,我們后面再說(shuō)
(四)其它實(shí)現(xiàn)類
1、Sink和Source它們還有各自的支持gzip壓縮的實(shí)現(xiàn)類GzipSink和GzipSource
2队他、具有委托功能的抽象類ForwardingSink和ForwardingSource卷仑,它們的具體實(shí)現(xiàn)類是InflaterSource和DeflaterSink,這兩個(gè)類主要用于壓縮麸折,為GzipSink和GzipSource服務(wù)锡凝。
綜上所述關(guān)于類的結(jié)構(gòu)圖如下:
剛剛我們看到在Sink接口中有一個(gè)Time類,這個(gè)就是okio中實(shí)現(xiàn)超時(shí)機(jī)制的接口磕谅,用于保證IO操作的穩(wěn)定性私爷。
四、Segment和SegmentPool解析
(一)膊夹、Segment
1衬浑、Segment字面的意思就是片段,okio將數(shù)據(jù)也就是Buffer分割成一塊塊的片段放刨,內(nèi)部維護(hù)者固定長(zhǎng)度的byte[]數(shù)組工秩,同時(shí)segment擁有前面節(jié)點(diǎn)和后面節(jié)點(diǎn),構(gòu)成一個(gè)雙向循環(huán)鏈表,就像下圖:
這樣采取分片使用鏈表連接助币,片中使用數(shù)組存儲(chǔ)浪听,兼具讀的連續(xù)性,以及寫(xiě)的可插入性眉菱,對(duì)比單一使用鏈表或者數(shù)組迹栓,是一種折中的方案,讀寫(xiě)更快俭缓,而且有個(gè)好處根據(jù)需求改動(dòng)分片的大小來(lái)權(quán)衡讀寫(xiě)的業(yè)務(wù)操作克伊,另外,segment也有一些內(nèi)置的優(yōu)化操作华坦,綜合這些okio才能大放異彩愿吹,后面解析Buffer時(shí)會(huì)講解什么時(shí)候形成雙向循環(huán)鏈表。
// 每一個(gè)segment所含數(shù)據(jù)的大小惜姐,固定的
static final int SIZE = 8192;
// Segments 用分享的方式避免復(fù)制數(shù)組
static final int SHARE_MINIMUM = 1024;
final byte[] data;
// data[]中第一個(gè)可讀的位置
int pos;
// data[]中第一個(gè)可寫(xiě)的位
//所以一個(gè)Segment的可讀數(shù)據(jù)數(shù)量為pos~limit-1=limit-pos犁跪;limit和pos的有效值為0~SIZE-1
int limit;
//當(dāng)前存儲(chǔ)的data數(shù)據(jù)是其它對(duì)象共享的則為真
boolean shared;
//是否是自己是操作者
boolean owner;
///前一個(gè)Segment
Segment pre;
///下一個(gè)Segment
Segment next;
總結(jié)一下就是:
SIZE就是一個(gè)segment的最大字節(jié)數(shù),其中還有一個(gè)SHARE_MINIMUM歹袁,這個(gè)涉及到segment優(yōu)化的另一個(gè)技巧坷衍,共享內(nèi)存,然后data就是保存的字節(jié)數(shù)組宇攻,pos惫叛,limit就是開(kāi)始和結(jié)束點(diǎn)的index,shared和owner用來(lái)設(shè)置狀態(tài)判斷是否可讀寫(xiě),一個(gè)有共享內(nèi)存的sement是不能寫(xiě)入的逞刷,pre嘉涌,next就是前置后置節(jié)點(diǎn)。
2夸浅、Segment的構(gòu)造方法
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
Segment(Segment shareFrom) {
this(shareFrom.data, shareFrom.pos, shareFrom.limit);
shareFrom.shared = true;
}
Segment(byte[] data, int pos, int limit) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.owner = false;
this.shared = true;
}
- 1仑最、無(wú)參的構(gòu)造函數(shù),表明采用該構(gòu)造器表明該數(shù)據(jù)data的所有者就是該Segment自己帆喇,所以owner為真警医,shared為假
- 2、一個(gè)參數(shù)的構(gòu)造函數(shù)坯钦,看代碼我們知道這個(gè)數(shù)據(jù)來(lái)自其他Segment预皇,所以表明這個(gè)Segment是被別人共享了,所以shared為真婉刀,owner為假
- 3吟温、三個(gè)參數(shù)構(gòu)造函數(shù)表明數(shù)據(jù)直接使用外面的,所以share為真突颊,owner為假
由上面代碼可知鲁豪,owner和shared這兩個(gè)狀態(tài)是互斥的潘悼,且賦值都是同步賦值的。
3爬橡、Segment的幾個(gè)方法分析
既然是雙向循環(huán)鏈表治唤,其中也會(huì)有一些操作的方法:
(1)pop()方法:
將當(dāng)前Segment從Segment鏈中移除去將,返回下一個(gè)Segment代碼如下
public Segment pop(){
Segment result = next != this ? next : null;
pre.next = next;
next.pre = pre;
next = null;
pre = null;
return result;
}
pop方法移除自己糙申,首先將自己的前后兩個(gè)節(jié)點(diǎn)連接起來(lái)宾添,然后將自己的前后置空,這昂就脫離了整個(gè)雙向鏈表郭宝,然后返回next辞槐。說(shuō)道pop就一定會(huì)聯(lián)想到push。
(2)push方法:
Segment中的push表示將一個(gè)Segment壓入該Segment節(jié)點(diǎn)的后面粘室,返回剛剛壓入的Segment
public Segment push(Segment segment){
segment.pre = this;
segment.next = next;
next.pre = segment;
next = segment;
return segment;
}
push方法就是在當(dāng)前和next引用中間插入一個(gè)segment進(jìn)來(lái),并且返回插入的segment卜范,這兩個(gè)都是尋常的雙向鏈表的操作衔统,我們?cè)賮?lái)看看如何寫(xiě)入數(shù)據(jù)。
(3)writeTo方法
/** Moves {@code byteCount} bytes from this segment to {@code sink}. */
public void writeTo(Segment sink, int byteCount) {
//只能對(duì)自己操作
if (!sink.owner) throw new IllegalArgumentException();
//寫(xiě)的起始位置加上需要寫(xiě)的byteCount大于SIZE
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
//如果是共享內(nèi)存海雪,不能寫(xiě)锦爵,則拋異常
if (sink.shared) throw new IllegalArgumentException();
//如果不是共享內(nèi)存,且寫(xiě)的起始位置加上byteCount減去頭還大于SIZE則拋異常
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
//上述條件都不滿足奥裸,我們需要先觸動(dòng)要寫(xiě)文件的地址险掀,把sink.data從sink.pos這個(gè)位置移動(dòng)到 0這個(gè)位置,移動(dòng)的長(zhǎng)度是limit - sink.pos湾宙,這樣的好處就是sink初始位置是0
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
//開(kāi)始尾部寫(xiě)入 寫(xiě)完置limit地址
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;//索引后移
}
PS:這里我們來(lái)復(fù)習(xí)一下arraycopy方法:
public static native void arraycopy(Object src, int srcPos, Object dest, int destPos, int length);
src:源數(shù)組樟氢; srcPos:源數(shù)組要復(fù)制的起始位置;dest:目的數(shù)組侠鳄; destPos:目的數(shù)組放置的起始位置埠啃;length:復(fù)制的長(zhǎng)度。
實(shí)現(xiàn)過(guò)程是這樣的伟恶,先生成一個(gè)長(zhǎng)度為length的臨時(shí)數(shù)組,將圓數(shù)組中srcPos 到scrPos+lengh-1之間的數(shù)據(jù)拷貝到臨時(shí)數(shù)組中碴开。然后將這個(gè)臨時(shí)數(shù)組復(fù)制到dest中。
讀到上面的代碼博秫,我們發(fā)現(xiàn)owner和shared這兩個(gè)狀態(tài)是互斥的潦牛,賦值都是同步賦值的,這里我們有點(diǎn)明白這兩個(gè)參數(shù)的意義了挡育,如果是共享的就是無(wú)法寫(xiě)巴碗,以免污染數(shù)據(jù),會(huì)拋出異常静盅。
如果要寫(xiě)的字節(jié)加上原有的字節(jié)大于單個(gè)segment的最大值也會(huì)拋出異常良价。
還有一種情況就是寝殴,由于前面的pos索引可能因?yàn)閞ead方法取出數(shù)據(jù),pos所以后移這樣導(dǎo)致可以容納數(shù)據(jù)明垢,這樣可以先執(zhí)行移動(dòng)操作蚣常,使用JDK提供的System.arraycopy方法來(lái)移動(dòng)到pos為0的狀態(tài),更改pos和limit索引后再在尾部寫(xiě)入byteCount數(shù)的數(shù)據(jù)痊银,寫(xiě)完之后實(shí)際上原segment讀了byteCount的數(shù)據(jù)抵蚊,所以pos需要后移這么多。
(4) compact()方法(壓縮機(jī)制)
除了寫(xiě)入數(shù)據(jù)之外溯革,segment還有一個(gè)優(yōu)化的技巧贞绳,因?yàn)槊總€(gè)segment的片段size是固定的,為了防止經(jīng)過(guò)長(zhǎng)時(shí)間的使用后致稀,每個(gè)segment中的數(shù)據(jù)被分割的十分嚴(yán)重冈闭,可能一個(gè)很小的數(shù)據(jù)卻占據(jù)了整個(gè)segment,所以有了一個(gè)壓縮機(jī)制抖单。
public void compact() {
//上一個(gè)節(jié)點(diǎn)就是自己萎攒,意味著就一個(gè)節(jié)點(diǎn)兢榨,無(wú)法壓縮心傀,拋異常
if (prev == this) throw new IllegalStateException();
//如果上一個(gè)節(jié)點(diǎn)不是自己的娇跟,所以你是沒(méi)有權(quán)利壓縮的
if (!prev.owner) return; // Cannot compact: prev isn't writable.
//能進(jìn)來(lái)說(shuō)明夹厌,存在上一個(gè)節(jié)點(diǎn)励稳,且上一個(gè)節(jié)點(diǎn)是自己的含友,可以壓縮
//記錄當(dāng)前Segment具有的數(shù)據(jù)改执,數(shù)據(jù)大小為limit-pos
int byteCount = limit - pos;
統(tǒng)計(jì)前結(jié)點(diǎn)是否被共享絮短,如果共享則只記錄Size-limit大小囚玫,如果沒(méi)有被共享喧锦,則加上pre.pos之前的空位置;
//本行代碼主要是獲取前一個(gè)segment的可用空間劫灶。先判斷prev是否是共享的裸违,如果是共享的,則只記錄SIZE-limit,如果沒(méi)有共享則記錄SIZE-limit加上prev.pos之前的空位置
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
//判斷prev的空余空間是否能夠容納Segment的全部數(shù)據(jù)本昏,不能容納則返回
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
//能容納則將自己的這個(gè)部分?jǐn)?shù)據(jù)寫(xiě)入上一個(gè)Segment
writeTo(prev, byteCount);
//講當(dāng)前Segment從Segment鏈表中移除
pop();
//回收該Segment
SegmentPool.recycle(this);
}
總結(jié)下上述代碼:如果前面的Segment是共享的供汛,那么不可寫(xiě),也不能壓縮涌穆,然后判斷前一個(gè)的剩余大小是否比當(dāng)前打怔昨,如果有足夠的空間來(lái)容納數(shù)據(jù),調(diào)用前面的writeTo方法寫(xiě)入數(shù)據(jù)宿稀,寫(xiě)完以后趁舀,移除當(dāng)前segment,然后回收segment祝沸。
(5)split()方法(共享機(jī)制)
還有一種機(jī)制是共享機(jī)制矮烹,為了減少數(shù)據(jù)復(fù)制帶來(lái)的性能開(kāi)銷越庇。
本方法用于將Segment一分為二,將pos+1pos+btyeCount-1的內(nèi)容給新的Segment,將pos+byteCountlimit-1的內(nèi)容留給自己奉狈,然后將前面的新Segment插入到自己的前面卤唉。這里需要注意的是,雖然這里變成了兩個(gè)Segment仁期,但是實(shí)際上byte[]數(shù)據(jù)并沒(méi)有被拷貝桑驱,兩個(gè)Segment都引用了改Segment。
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
// We have two competing performance goals:
// - Avoid copying data. We accomplish this by sharing segments.
// - Avoid short shared segments. These are bad for performance because they are readonly and
// may lead to long chains of short segments.
// To balance these goals we only share segments when the copy will be large.
if (byteCount >= SHARE_MINIMUM) {
prefix = new Segment(this);
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
這個(gè)方法實(shí)際上經(jīng)過(guò)很多次的改變跛蛋,在回顧okio1.6時(shí)熬的,發(fā)現(xiàn)有一個(gè)重要的差異就是多了一個(gè)SHARE_MININUM參數(shù),同時(shí)也多了一個(gè)注釋赊级,為了防止一個(gè)很小的片段就進(jìn)行共享押框,我們知道共享之后為了防止數(shù)據(jù)污染就無(wú)法寫(xiě)了,如果存在大量的共享片段此衅,實(shí)際上是浪費(fèi)資源的强戴,所以通過(guò)這個(gè)對(duì)比可以看出這個(gè)最小數(shù)的的意義,而且這個(gè)方法在1.6的版本中檢索實(shí)際上只有一個(gè)地方使用了這個(gè)方法挡鞍,就是Buffer中的write方法,為了效率在移動(dòng)大數(shù)據(jù)的時(shí)候直接移動(dòng)了segment而不是data预烙,這樣在寫(xiě)數(shù)據(jù)上能達(dá)到很高的效率墨微,具體的write細(xì)節(jié)會(huì)在后面講解。
在上面的方法中出現(xiàn)的 SegmentPool 扁掸,這是什么東西翘县,那我們下面就來(lái)講解一下
(二) SegmentPool
SegmentPool是一個(gè)Segment池,由一個(gè)單項(xiàng)鏈表構(gòu)成谴分。該池負(fù)責(zé)Segment的回收和閑置Segment管理锈麸,也就是說(shuō)Buffer使用的Segment是從Segment單項(xiàng)鏈表中取出的,這樣有效的避免了GC頻率
SegmentPool這個(gè)類比較簡(jiǎn)單牺蹄,先來(lái)看下這個(gè)類的屬性
先看下
//一個(gè)Segment記錄的最大長(zhǎng)度是8192忘伞,因此SegmentPool只能存儲(chǔ)8個(gè)Segment
static final long MAX_SIZE = 64 * 1024;
//該SegmentPool存儲(chǔ)了一個(gè)回收Segment的鏈表
static Segment next;
//該值記錄了當(dāng)前所有Segment的總大小,最大值是為MAX_SIZE
static long byteCount;
上面代碼表明了一個(gè)池子的上線也就是64K沙兰,相當(dāng)于8個(gè)Segment氓奈,next這個(gè)屬性表明這個(gè)SegmentPool是按照單鏈表的方式進(jìn)行存儲(chǔ)的,byteCount這個(gè)字段表明已經(jīng)使用的大小鼎天。
SegmentPool的方法十分簡(jiǎn)單舀奶,就兩個(gè)一個(gè)是"取",一個(gè)是"收"
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
為了防止多線程同時(shí)操作出現(xiàn)問(wèn)題斋射,這里加了鎖育勺。注意:這里的next表示整個(gè)對(duì)象池的頭但荤,而不是"下一個(gè)"。如果next為null涧至,則這個(gè)腹躁,池子里面沒(méi)有Segment。否則就從里面拿出一個(gè)next出來(lái)化借,并將next的下一個(gè)節(jié)點(diǎn)賦值為next(即為頭)潜慎,然后設(shè)置下一下byteCount。如果鏈表為空蓖康,則創(chuàng)建一個(gè)Segment對(duì)象返回铐炫。
下面咱們來(lái)看下回收的方法
static void recycle(Segment segment) {
//如果這個(gè)要回收的Segment被前后引用,則無(wú)法回收
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
//如果這個(gè)要回收的Segment的數(shù)據(jù)是分享的蒜焊,則無(wú)法回收
if (segment.shared) return; // This segment cannot be recycled.
//加鎖
synchronized (SegmentPool.class) {
//如果 這個(gè)空間已經(jīng)不足以再放入一個(gè)空的Segment倒信,則不回收
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
//設(shè)置SegmentPool的池大小
byteCount += Segment.SIZE;
//segment的下一個(gè)指向頭
segment.next = next;
//設(shè)置segment的可讀寫(xiě)位置為0
segment.pos = segment.limit = 0;
//設(shè)置當(dāng)前segment為頭
next = segment;
}
}
上面代碼很簡(jiǎn)單,就是嘗試將參數(shù)Segment對(duì)象加入到自身的Segment鏈表中泳梆。其中做了一些條件判斷鳖悠,具體看注釋。
SegmentPool總結(jié):
1.大家注意到?jīng)]有SegmentPool的作用就是管理多余的Segment优妙,不直接丟棄廢棄的Segment,等客戶需要Segment的時(shí)候直接從池中獲取一個(gè)對(duì)象乘综,避免了重復(fù)創(chuàng)建新兌現(xiàn),提高資源利用率套硼。
2.大家仔細(xì)讀取源碼會(huì)發(fā)現(xiàn)SegmentPool的所有操作都是基于對(duì)表頭的操作卡辰。
五、ByteString
ByteString存儲(chǔ)的是不可變比特序列邪意,可能你不太理解這句話九妈,如果給出final btye[] data 你是不是就秒懂了。官方文檔說(shuō)可以把它看成string的遠(yuǎn)方親戚雾鬼,且這個(gè)親戚符合人工工學(xué)設(shè)計(jì)萌朱,逼格是不是很高。不過(guò)簡(jiǎn)單的講策菜,他就是一個(gè)byte序列(數(shù)組)晶疼,以制定的編碼格式進(jìn)行解碼。目前支持的解碼規(guī)則有hex做入,base64和UTF-8等冒晰,機(jī)智如你可能會(huì)說(shuō)String也是如此。是的竟块,你說(shuō)的沒(méi)錯(cuò)壶运,ByteString 只是把這些方法進(jìn)行了封裝。免去了我們直接輸入類似的"utf-8"這樣的錯(cuò)誤浪秘,直接通過(guò)調(diào)用utf-8格式進(jìn)行解碼蒋情,還做了優(yōu)化埠况,在第一次調(diào)用uft8()方法的時(shí)候得到了一個(gè)該解碼的String,同時(shí)在ByteString內(nèi)部還保留了這個(gè)引用棵癣,當(dāng)再次調(diào)用utf-8()的時(shí)候辕翰,則直接返回這個(gè)引用。
剛剛說(shuō)了不可變對(duì)象狈谊。Effective Java 一書(shū)中有一條給了不可變對(duì)象的需要遵循的的幾條原則:
- 1.不要提供任何會(huì)修改對(duì)象狀態(tài)的方法
- 2.保證類不會(huì)被擴(kuò)展
- 3.使所有的域都是final的
- 4.使所有的域都是private的
- 5.確保對(duì)于任何可變組件的互斥訪問(wèn)
不可變對(duì)象有許多好處喜命,首先本質(zhì)是線程安全,不要求同步處理河劝,也就是沒(méi)有鎖之類的性能問(wèn)題壁榕,而且可以被自由的共享內(nèi)部信息,當(dāng)然壞處就是要?jiǎng)?chuàng)建大量類的對(duì)象,咱們看看ByteString的屬性
1赎瞎、ByteString屬性
//明顯給16進(jìn)制數(shù)準(zhǔn)備的
static final char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
//一個(gè)空的對(duì)象
/** A singleton empty {@code ByteString}. */
public static final ByteString EMPTY = ByteString.of();
final byte[] data;
//hashcode的值
transient int hashCode; // Lazily computed; 0 if unknown.
transient String utf8; // Lazily computed.
這里要重點(diǎn)說(shuō)明下 byte[] data和transient String utf8牌里,ByteString不僅是不可變的,同時(shí)在內(nèi)部有兩個(gè)filed务甥,分別是byte數(shù)據(jù)牡辽,以及String數(shù)據(jù),這樣能夠讓這個(gè)類Byte和String轉(zhuǎn)換上基本沒(méi)有開(kāi)銷敞临。同樣需要保存這兩份引用态辛,這明顯的空間換時(shí)間的方式,為了性能okio做了很多事情挺尿。這是這個(gè)String前面有transient關(guān)鍵字標(biāo)記因妙,也就是說(shuō)不會(huì)進(jìn)入序列化和反序列化,所以readObject和writeObject()中沒(méi)有utf8屬性
2票髓、ByteString構(gòu)造函數(shù)
ByteString(byte[] data) {
this.data = data; // Trusted internal constructor doesn't clone data.
}
構(gòu)造函數(shù)的參數(shù)為一個(gè)byte[]類型,不過(guò)構(gòu)造函數(shù)只能被同包的類使用铣耘,因此我們創(chuàng)建ByteString對(duì)象并不是通過(guò)該方法洽沟。我們是如何構(gòu)造一個(gè)ByteString對(duì)象?其實(shí)是通過(guò) of()方法構(gòu)造對(duì)象的
3蜗细、of()方法創(chuàng)建ByteString對(duì)象
/**
* Returns a new byte string containing a clone of the bytes of {@code data}.
*/
public static ByteString of(byte... data) {
if (data == null) throw new IllegalArgumentException("data == null");
return new ByteString(data.clone());
}
/**
* Returns a new byte string containing a copy of {@code byteCount} bytes of {@code data} starting
* at {@code offset}.
*/
public static ByteString of(byte[] data, int offset, int byteCount) {
if (data == null) throw new IllegalArgumentException("data == null");
checkOffsetAndCount(data.length, offset, byteCount);
byte[] copy = new byte[byteCount];
System.arraycopy(data, offset, copy, 0, byteCount);
return new ByteString(copy);
}
public static ByteString of(ByteBuffer data) {
if (data == null) throw new IllegalArgumentException("data == null");
byte[] copy = new byte[data.remaining()];
data.get(copy);
return new ByteString(copy);
ByteString 有三個(gè)of方法裆操,都是return一個(gè)ByteString,咱們就一個(gè)一個(gè)的分析
- 1.第一個(gè)of方法炉媒,就是調(diào)用clone方法踪区,重新創(chuàng)建一個(gè)byte數(shù)組。clone一個(gè)數(shù)組的原因很簡(jiǎn)單吊骤,我們確保ByteString的data指向byte[]沒(méi)有被其他對(duì)象所引用缎岗,否則就容易破壞ByteString中存儲(chǔ)的是一個(gè)不可變化的的比特流數(shù)據(jù)這一約束。
- 2.第二個(gè)of方法白粉,首先進(jìn)行checkOffsetAndCount()方法進(jìn)行邊界檢查传泊,然后鼠渺,將data中指定的數(shù)據(jù)拷貝到數(shù)組中去。
- 3.第三個(gè)of方法眷细,同理拦盹,和第二個(gè)of方法差不多,只不過(guò)一個(gè)是byte[]溪椎,一個(gè)是ByteBuffer普舆。
4、toString()
/**
* Returns a human-readable string that describes the contents of this byte string. Typically this
* is a string like {@code [text=Hello]} or {@code [hex=0000ffff]}.
*/
@Override public String toString() {
if (data.length == 0) {
return "[size=0]";
}
String text = utf8();
int i = codePointIndexToCharIndex(text, 64);
if (i == -1) {
return data.length <= 64
? "[hex=" + hex() + "]"
: "[size=" + data.length + " hex=" + substring(0, 64).hex() + "…]";
}
String safeText = text.substring(0, i)
.replace("\\", "\\\\")
.replace("\n", "\\n")
.replace("\r", "\\r");
return i < text.length()
? "[size=" + data.length + " text=" + safeText + "…]"
: "[text=" + safeText + "]";
}
5校读、utf8()
utf8格式的String
/** Constructs a new {@code String} by decoding the bytes as {@code UTF-8}. */
public String utf8() {
String result = utf8;
// We don't care if we double-allocate in racy code.
return result != null ? result : (utf8 = new String(data, Util.UTF_8));
}
這里面的一個(gè)判斷語(yǔ)句沼侣,實(shí)現(xiàn)ByteString性能優(yōu)化,看來(lái)優(yōu)化這個(gè)東西還是很容易實(shí)現(xiàn)的地熄。第一次創(chuàng)建UTF-8對(duì)象的方法是調(diào)用new String(data,Util.UTF_8),后面就不再調(diào)用該方法而是直接返回result华临;發(fā)現(xiàn)uft8是對(duì)String 的方法進(jìn)一步封裝。
下面是ByteString的方法
六端考、Buffer
1雅潭、Buffer簡(jiǎn)介
- 1、Buffer存儲(chǔ)的是可變比特序列却特,需要注意的是Buffer內(nèi)部對(duì)比特?cái)?shù)據(jù)的存儲(chǔ)不是直接使用一個(gè)byte數(shù)組那么簡(jiǎn)單扶供,它使用了一種新的數(shù)據(jù)類型Segment進(jìn)行存儲(chǔ)。不過(guò)我們先不去管Segment是什么東西裂明,可以先直接將Buffer想象成一個(gè)LinkedList集合就知道了椿浓,之所以做這樣,因?yàn)锽uffer的容量可以動(dòng)態(tài)擴(kuò)展闽晦,從序列的尾部存入數(shù)據(jù)扳碍,從序列的頭部讀取數(shù)據(jù)。其實(shí)Buffer的底層實(shí)現(xiàn)了遠(yuǎn)比LinkedList復(fù)雜的多仙蛉,他使用雙向鏈表的形式存儲(chǔ)數(shù)組笋敞,鏈表節(jié)點(diǎn)的數(shù)據(jù)類型就是前面說(shuō)的Segment,Segment存儲(chǔ)有一個(gè)不可變比特序列荠瘪,即final byte[] data夯巷。使用的Buffer的好處在于從一個(gè)Buffer移動(dòng)到另一個(gè)Buffer的時(shí)候,實(shí)際上并沒(méi)有進(jìn)行拷貝哀墓,只是改變了它對(duì)應(yīng)的Segment的所有者趁餐,其實(shí)這也采用鏈表存儲(chǔ)數(shù)據(jù)的好處,這樣的特點(diǎn)在多線程網(wǎng)絡(luò)通信中帶來(lái)很大的好處篮绰。最后使用Buffer還有另一個(gè)好處那就是它實(shí)現(xiàn)了BufferSource和BufferSink接口后雷。
- 2、前面講到Buffer這個(gè)類實(shí)際上就是整個(gè)讀寫(xiě)的核心,包括RealBufferSource和RealBufferedSink實(shí)際上都只是一個(gè)代理喷面,里面操作全部都是通過(guò)Buffer來(lái)完成星瘾。
2、Buffer的屬性及實(shí)現(xiàn)的接口
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
//明顯是給16進(jìn)制準(zhǔn)備的
private static final byte[] DIGITS =
{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
static final int REPLACEMENT_CHARACTER = '\ufffd';
//Buffer存儲(chǔ)了一個(gè)這樣的head節(jié)點(diǎn)惧辈,這就是Buffer對(duì)數(shù)據(jù)的存儲(chǔ)結(jié)構(gòu)琳状。字節(jié)數(shù)組都是交給Segment進(jìn)行管理
Segment head;
//當(dāng)前存儲(chǔ)的數(shù)據(jù)大小
long size;
因?yàn)锽uffer持有一個(gè)Segment的引用,所以通過(guò)這個(gè)引用能拿到整個(gè)鏈表中的所有數(shù)據(jù)盒齿。
同時(shí)Buffer實(shí)現(xiàn)了三個(gè)接口念逞,讀,寫(xiě)以及clone边翁。
(1)clone接口
那咱們先說(shuō)clone吧翎承,這個(gè)比較簡(jiǎn)答,大家都能看懂符匾,Java拷貝叨咖,主要分為深拷貝還是淺拷貝。Buffer這里采用的是淺拷貝啊胶。
//看到注釋的第一句話甸各,我就知道了是深拷貝,哈哈焰坪!
/** Returns a deep copy of this buffer. */
@Override public Buffer clone() {
//先new了一個(gè)Buffer對(duì)象
Buffer result = new Buffer();
//如果size==0趣倾,說(shuō)明這個(gè)Buffer是空的,所以直接返回即可
if (size == 0) return result;
//如果size!=0某饰,說(shuō)明這個(gè)Buffer是有數(shù)據(jù)的儒恋,然后吧head指向這個(gè)copy的head,PS大家回想下Segment的這個(gè)構(gòu)造函數(shù),里面是怎么操作的黔漂?
result.head = new Segment(head);
//然后設(shè)置copy的head的next和prev的值
result.head.next = result.head.prev = result.head;
//開(kāi)始遍歷這個(gè)Buffer持有的Segment鏈了
for (Segment s = head.next; s != head; s = s.next) {
result.head.prev.push(new Segment(s));
}
result.size = size;
return result;
}
- 1對(duì)應(yīng)了實(shí)現(xiàn)的clone方法诫尽,如果整個(gè)Buffer的size為0,也就是沒(méi)有數(shù)據(jù)炬守,那么返回一個(gè)新建的Buffer對(duì)象箱锐,如果不為空就是遍歷所有segment并且都創(chuàng)建一個(gè)對(duì)應(yīng)的segment,這樣clone出來(lái)的對(duì)象就是一個(gè)全新的毫無(wú)關(guān)系的對(duì)象劳较。
- 2前面分析segment的時(shí)候有講到是一個(gè)雙向鏈表,但是segment自身構(gòu)造的時(shí)候卻沒(méi)有形成閉環(huán)浩聋,其實(shí)就是在Buffer中產(chǎn)生的观蜗。
result.head.next = result.head.prev = result.head;- 3關(guān)于for循環(huán)有同學(xué)表示看不懂,我這里就詳細(xì)解釋下:
先取出head的next對(duì)象的引用衣洁,然后壓入該節(jié)點(diǎn)墓捻,比如 目前順序是 A B C,A是head,A的pre是C砖第,所以在C節(jié)點(diǎn)的后面加入新的segmentD撤卢,壓后的順序就是 A B C D,然后執(zhí)行s.next,如果s應(yīng)是最后了梧兼,則跳出循環(huán)
2放吩、BufferedSource, BufferedSink接口
Buffer實(shí)現(xiàn)了這兩個(gè)接口的所有方法,既有讀羽杰,也有寫(xiě)渡紫,由于篇幅的限制,我就不全部講解考赛,只挑幾個(gè)講解惕澎。其他的都大同小異
@Override public int readInt() {
//第一步
if (size < 4) throw new IllegalStateException("size < 4: " + size);
//第二步
Segment segment = head;
int pos = segment.pos;
int limit = segment.limit;
// If the int is split across multiple segments, delegate to readByte().
if (limit - pos < 4) {
return (readByte() & 0xff) << 24
| (readByte() & 0xff) << 16
| (readByte() & 0xff) << 8
| (readByte() & 0xff);
}
//第三步
byte[] data = segment.data;
int i = (data[pos++] & 0xff) << 24
| (data[pos++] & 0xff) << 16
| (data[pos++] & 0xff) << 8
| (data[pos++] & 0xff);
size -= 4;
//第四步
if (pos == limit) {
head = segment.pop();
SegmentPool.recycle(segment);
} else {
segment.pos = pos;
}
//第五步
return i;
}
第一步,做了一次驗(yàn)證颜骤,因?yàn)橐粋€(gè)int數(shù)據(jù)的字節(jié)數(shù)是4唧喉,所以必須保證當(dāng)前Buffer的size大于4。
第二步忍抽,如果當(dāng)前的Segement所包含的字節(jié)數(shù)小于4,因此還需要去一個(gè)Segment中獲取一分部數(shù)據(jù)八孝,因此通過(guò)調(diào)用readByte()方法一個(gè)字節(jié)一個(gè)字節(jié)的讀取,該方法我們后介紹
第三步梯找,如果當(dāng)前Segment的數(shù)據(jù)夠用唆阿,因此直接從pos位置開(kāi)始讀取4個(gè)字節(jié)的數(shù)據(jù),然后將其轉(zhuǎn)換為int數(shù)據(jù)锈锤,轉(zhuǎn)換方法很簡(jiǎn)單就是位和或運(yùn)算驯鳖。
第四步,如果pos==limit證明當(dāng)前head對(duì)應(yīng)的Segment沒(méi)有可讀數(shù)據(jù)久免,因此將該Segment從雙向鏈表移除浅辙,并回收該Segment。如果還有數(shù)據(jù)則設(shè)置新的pos值阎姥。
第五步记舆,返回解析得到int值
這里面提到了readByte()那么咱們就來(lái)研究下這個(gè)readByte()方法
@Override public byte readByte() {
//第一步
if (size == 0) throw new IllegalStateException("size == 0");
//第二步
Segment segment = head;
int pos = segment.pos;
int limit = segment.limit;
//第三步
byte[] data = segment.data;
//第四步
byte b = data[pos++];
size -= 1;
//第五步
if (pos == limit) {
head = segment.pop();
SegmentPool.recycle(segment);
} else {
segment.pos = pos;
}
//第六步
return b;
}
第一步,做size判斷呼巴,如果size==0表示沒(méi)有什么東西好讀取的泽腮。
第二步,獲取Segment衣赶,并取得這個(gè)segment的pos位置和limit的位置
第三步诊赊,獲取byte[] data
第四步,獲取一個(gè)字節(jié)的byte
第五步府瞄,判斷這個(gè)segment有沒(méi)有可讀數(shù)據(jù)了碧磅,如果沒(méi)有可讀數(shù)據(jù)則回收Segment。如果有數(shù)據(jù)則更新pos的值
第六步,返回b
那咱們?cè)賮?lái)看看寫(xiě)的方法鲸郊,那換一個(gè)就不寫(xiě)int丰榴,選寫(xiě)short吧
@Override public Buffer writeShort(int s) {
//第一步
Segment tail = writableSegment(2);
//第二步
byte[] data = tail.data;
int limit = tail.limit;
data[limit++] = (byte) ((s >>> 8) & 0xff);
data[limit++] = (byte) (s & 0xff);
tail.limit = limit;
//第三步
size += 2;
//第四步
return this;
}
/**
* Returns a tail segment that we can write at least {@code minimumCapacity}
* bytes to, creating it if necessary.
*/
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
//如果hea=null表明Buffer里面一個(gè)Segment都沒(méi)有
if (head == null) {
//從SegmentPool取出一個(gè)Segment
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
//如果head不為null.
Segment tail = head.prev;
//如果已經(jīng)寫(xiě)入的數(shù)據(jù)+最小可以寫(xiě)入的空間超過(guò)限制,則在SegmentPool里面取一個(gè)
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
writeShort用來(lái)給Buffer寫(xiě)入一個(gè)short數(shù)據(jù)
第一步,通過(guò)writeableSegment拿到一個(gè)能有2個(gè)字節(jié)的空間的segment秆撮,writeableSegment方法我里面寫(xiě)了注釋四濒,大家自己去看,我這里就不詳細(xì)說(shuō)明了像吻。
第二步峻黍,tail中的data就是字節(jié)數(shù)組,limit則是數(shù)據(jù)的尾部索引拨匆,寫(xiě)數(shù)據(jù)就是在尾部繼續(xù)寫(xiě)姆涩,直接設(shè)置在data通過(guò)limit自增后的index,然后重置尾部索引.
第三步,size+2
第四步惭每,返回tail
七骨饿、Okio中的超時(shí)機(jī)制
1、TimeOut
okio的超時(shí)機(jī)制讓I/O操作不會(huì)因?yàn)楫惓W枞谀硞€(gè)未知的錯(cuò)誤上台腥,okio的基礎(chǔ)超時(shí)機(jī)制是采取同步超時(shí)宏赘。
1以輸出流Sink為例子,當(dāng)我們用下面的方法包裝流時(shí):
//Okio.java
//實(shí)際上調(diào)用的兩個(gè)參數(shù)的sink方法黎侈,第二個(gè)參數(shù)是new的TimeOut對(duì)象察署,即同步超時(shí)
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override
public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
可以看到write方法里中實(shí)際上有一個(gè)while循環(huán),在每個(gè)開(kāi)始寫(xiě)的時(shí)候就調(diào)用了timeout.throwIfReached()方法峻汉,我們推斷這個(gè)方法里面做了時(shí)間是否超時(shí)的判斷贴汪,如果超時(shí)了,應(yīng)該throw一個(gè)exception出來(lái)休吠。這很明顯是一個(gè)同步超時(shí)機(jī)制扳埂,按序執(zhí)行。同理Source也是一樣瘤礁,那么咱們看下他里面到底是怎么執(zhí)行的
/**
* Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
* thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
* asynchronously abort an in-progress operation.
*/
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
大家先看下上面的注釋阳懂,我英語(yǔ)不是很好,大家如果有更好的翻譯柜思,請(qǐng)?zhí)崾疚已业鳎曳g如下:
如果到達(dá)了最后的時(shí)間,會(huì)拋出InterruptedIOException赡盘,或者線程被interrupted了也會(huì)拋出InterruptedIOException誊辉。該方法是不會(huì)檢查超時(shí)的,應(yīng)該是是一個(gè)異步進(jìn)度操作單元來(lái)實(shí)現(xiàn)這個(gè)類亡脑,進(jìn)行檢查超時(shí)。
但是當(dāng)我們看okio對(duì)于socket的封裝時(shí)
/**
* Returns a sink that writes to {@code socket}. Prefer this over {@link
* #sink(OutputStream)} because this method honors timeouts. When the socket
* write times out, the socket is asynchronously closed by a watchdog thread.
*/
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
2、AsyncTimeout
這里出現(xiàn)一個(gè)AsyncTimeout類霉咨,這個(gè)實(shí)際上繼承于TimeOut所實(shí)現(xiàn)的一個(gè)異步超時(shí)類蛙紫,這個(gè)異步類比同步要復(fù)雜的多,它使用了人一個(gè)WatchDog線程在后臺(tái)進(jìn)行監(jiān)聽(tīng)超時(shí)途戒。
這里面用到了Watchdog,Watchdog是AsyncTimeout的靜態(tài)內(nèi)部類坑傅,那么我們來(lái)下看Watchdog是個(gè)什么東西
private static final class Watchdog extends Thread {
public Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
這里的WatchDog并不是Linux中的那個(gè),只是一個(gè)繼承于Thread的一類喷斋,里面的run方法執(zhí)行的就是超時(shí)的判斷唁毒,之所以在socket寫(xiě)時(shí)采取異步超時(shí),這完全是由socket自身的性質(zhì)決定的星爪,socket經(jīng)常會(huì)阻塞自己浆西,導(dǎo)致下面的事情執(zhí)行不了。AsyncTimeout繼承Timeout類顽腾,可以復(fù)寫(xiě)里面的timeout方法近零,這個(gè)方法會(huì)在watchdao的線程中調(diào)用,所以不能執(zhí)行長(zhǎng)時(shí)間的操作抄肖,否則就會(huì)引起其他的超時(shí)久信。
下面詳細(xì)分析AsyncTimeout這個(gè)類
//不要一次寫(xiě)超過(guò)64k的數(shù)據(jù)否則可能會(huì)在慢連接中導(dǎo)致超時(shí)
private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
private static AsyncTimeout head;
private boolean inQueue;
private AsyncTimeout next;
private long timeoutAt;
首先就是一個(gè)最大的寫(xiě)值,定義為64K漓摩,剛好和一個(gè)Buffer大小一樣裙士。官方的解釋是如果連續(xù)讀寫(xiě)超過(guò)這個(gè)數(shù)字的字節(jié),那么及其容易導(dǎo)致超時(shí)管毙,所以為了限制這個(gè)操作腿椎,直接給出了一個(gè)能寫(xiě)的最大數(shù)。
下面兩個(gè)參數(shù)head和next锅风,很明顯表明這是一個(gè)單鏈表酥诽,timeoutAt則是超時(shí)時(shí)間。使用者在操作之前首先要調(diào)用enter()方法皱埠,這樣相當(dāng)于注冊(cè)了這個(gè)超時(shí)監(jiān)聽(tīng)肮帐,然后配對(duì)的實(shí)現(xiàn)exit()方法。這樣exit()有一個(gè)返回值會(huì)表明超時(shí)是否出發(fā)边器,注意:這個(gè)timeout是異步的训枢,可能會(huì)在exit()后才調(diào)用
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
這里只是判斷了inQueue的狀態(tài),然后設(shè)置inQueue的狀態(tài)忘巧,真正的調(diào)用是在scheduleTimeout()里面恒界,那我們進(jìn)去看下
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
//head==null,表明之前沒(méi)有砚嘴,本次是第一次操作十酣,開(kāi)啟Watchdog守護(hù)線程
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
long now = System.nanoTime();
//如果有最長(zhǎng)限制(hasDeadline我翻譯為最長(zhǎng)限制)涩拙,并且超時(shí)時(shí)長(zhǎng)不為0
if (timeoutNanos != 0 && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
// Math.min() is undefined for absolute values, but meaningful for relative ones.
//對(duì)比最長(zhǎng)限制和超時(shí)時(shí)長(zhǎng),選擇最小的那個(gè)值
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
//如果沒(méi)有最長(zhǎng)限制耸采,但是超時(shí)時(shí)長(zhǎng)不為0兴泥,則使用超時(shí)時(shí)長(zhǎng)
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
//如果有最長(zhǎng)限制,但是超時(shí)時(shí)長(zhǎng)為0虾宇,則使用最長(zhǎng)限制
node.timeoutAt = node.deadlineNanoTime();
} else {
//如果既沒(méi)有最長(zhǎng)限制搓彻,和超時(shí)時(shí)長(zhǎng),則拋異常
throw new AssertionError();
}
// Insert the node in sorted order.
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
//如果下一個(gè)為null或者剩余時(shí)間比下一個(gè)短 就插入node
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
上面可以看出這個(gè)鏈表實(shí)際上是按照剩余的超時(shí)時(shí)間來(lái)進(jìn)行排序的嘱朽,快到超時(shí)的節(jié)點(diǎn)排在表頭旭贬,一次往后遞增。我們以一個(gè)read的代碼來(lái)砍整個(gè)超時(shí)的綁定過(guò)程搪泳。
@Override
public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
long result = source.read(sink,byteCount);
throwOnTimeout = true;
return result;
}catch (IOException e){
throw exit(e);
}finally {
exit(throwOnTimeout);
}
}
首先調(diào)用enterf方法稀轨,然后去做讀的操作,這里可以砍到不僅在catch上而且是在finally中也做了操作森书,這樣一場(chǎng)和正常的情況都考慮到了靶端,在exit中調(diào)用了真正的exit方法,exit中會(huì)判斷這個(gè)異步超時(shí)對(duì)象是否在鏈表中
final void exit(boolean throwOnTimeout) throws IOException {
boolean timeOut = exit();
if (timeOut && throwOnTimeout)
throw newTimeoutException(null);
}
public final boolean exit(){
if (!inQueue)
return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
回到前面說(shuō)的的WatchDog凛膏,內(nèi)部的run方法是一個(gè)while(true)的一個(gè)死循環(huán)杨名,由于在while(true)里面鎖住了內(nèi)部的awaitTimeout的操作,這個(gè)await正是判斷是否超時(shí)的真正地方猖毫。
static AsyncTimeout awaitTimeout() throws InterruptedException {
//拿到下一個(gè)節(jié)點(diǎn)
AsyncTimeout node = head.next;
//如果queue為空台谍,等待直到有node進(jìn)隊(duì),或者觸發(fā)IDLE_TIMEOUT_MILLS
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head : null;
}
long waitNanos = node.remainingNanos(System.nanoTime());
//這個(gè)head依然還沒(méi)有超時(shí)吁断,繼續(xù)等待
if (waitNanos > 0) {
long waitMills = waitNanos / 1000000L;
waitNanos -= (waitMills * 1000000L);
AsyncTimeout.class.wait(waitMills, (int) waitNanos);
return null;
}
head.next = node.next;
node.next = null;
return node;
}
這里就比較清晰了趁蕊,主要就是通過(guò)這個(gè)remainNanos來(lái)判斷預(yù)定的超時(shí)時(shí)間減去當(dāng)前時(shí)間是否大于0,如果比0大就說(shuō)明還沒(méi)超時(shí)仔役,于是wait剩余的時(shí)間掷伙,然后表示沒(méi)有超時(shí),如果小于0又兵,就會(huì)把這個(gè)從鏈表中移除任柜,根據(jù)前面的exit方法中的判斷就能觸發(fā)整個(gè)超時(shí)的方法。
所以Buffer的寫(xiě)操作沛厨,實(shí)際上就是不斷增加Segment的一個(gè)過(guò)程宙地,讀操作,就是不斷消耗Segment中的數(shù)據(jù)逆皮,如果數(shù)據(jù)讀取完宅粥,則使用SegmentPool進(jìn)行回收。Buffer更多的邏輯主要是跨Segment讀取數(shù)據(jù)电谣,需要把前一個(gè)Segment的前端拼接在一起秽梅,因此看起來(lái)代碼相對(duì)很多抹蚀,但是其實(shí)開(kāi)銷非常低
八、okio的優(yōu)雅之處
(一)企垦、okio類
在說(shuō)okio的設(shè)計(jì)模式之前况鸣,先說(shuō)下okio這這個(gè)類,該類是一個(gè)大工廠竹观,為我們創(chuàng)建出各種各樣的Sink、Source 對(duì)象潜索,提供了三種數(shù)據(jù)源InputStream/OutputStream臭增、Socket、File竹习,我們可以把本該對(duì)這個(gè)三類數(shù)據(jù)源的IO操作通過(guò)okio庫(kù)來(lái)實(shí)現(xiàn)誊抛,更方便,更高效整陌。
(二) 圖演示
1拗窃、okio的類圖
2、okio讀寫(xiě)流程圖
(三) okio高效方便之處
- 1泌辫、它對(duì)數(shù)據(jù)進(jìn)行了分塊處理(Segment)随夸,這樣在大數(shù)據(jù)IO的時(shí)候可以以塊為單位進(jìn)行IO,這可以提高IO的吞吐率
- 2震放、它對(duì)這些數(shù)據(jù)塊使用鏈表來(lái)進(jìn)行管理宾毒,這可以僅通過(guò)移動(dòng)指針就進(jìn)行數(shù)據(jù)的管理,而不用真正的處理數(shù)據(jù)殿遂,而且對(duì)擴(kuò)容來(lái)說(shuō)十分方便.
- 3诈铛、閑置的塊進(jìn)行管理,通過(guò)一個(gè)塊池(SegmentPool)的管理墨礁,避免系統(tǒng)GC和申請(qǐng)byte時(shí)的zero-fill幢竹。其他的還有一些小細(xì)節(jié)上的優(yōu)化,比如如果你把一個(gè)UTF-8的String轉(zhuǎn)化為ByteString恩静,ByteString會(huì)保留一份對(duì)原來(lái)String的引用焕毫,這樣當(dāng)你下次需要decode這個(gè)String時(shí),程序通過(guò)保留的引用直接返回對(duì)應(yīng)的String蜕企,從而避免了轉(zhuǎn)碼過(guò)程咬荷。
- 4、他為所有的Source轻掩、Sink提供了超時(shí)操作幸乒,這是在Java原生IO操作是沒(méi)有的。
- 5唇牧、okio它對(duì)數(shù)據(jù)的讀寫(xiě)都進(jìn)行了封裝罕扎,調(diào)用者可以十分方便的進(jìn)行各種值(Stringg,short,int,hex,utf-8,base64等)的轉(zhuǎn)化聚唐。
九、FileSystem
public interface FileSystem {
/** The host machine's local file system. */
FileSystem SYSTEM = new FileSystem() {
@Override public Source source(File file) throws FileNotFoundException {
return Okio.source(file);
}
@Override public Sink sink(File file) throws FileNotFoundException {
try {
return Okio.sink(file);
} catch (FileNotFoundException e) {
// Maybe the parent directory doesn't exist? Try creating it first.
file.getParentFile().mkdirs();
return Okio.sink(file);
}
}
@Override public Sink appendingSink(File file) throws FileNotFoundException {
try {
return Okio.appendingSink(file);
} catch (FileNotFoundException e) {
// Maybe the parent directory doesn't exist? Try creating it first.
file.getParentFile().mkdirs();
return Okio.appendingSink(file);
}
}
@Override public void delete(File file) throws IOException {
// If delete() fails, make sure it's because the file didn't exist!
if (!file.delete() && file.exists()) {
throw new IOException("failed to delete " + file);
}
}
@Override public boolean exists(File file) {
return file.exists();
}
@Override public long size(File file) {
return file.length();
}
@Override public void rename(File from, File to) throws IOException {
delete(to);
if (!from.renameTo(to)) {
throw new IOException("failed to rename " + from + " to " + to);
}
}
@Override public void deleteContents(File directory) throws IOException {
File[] files = directory.listFiles();
if (files == null) {
throw new IOException("not a readable directory: " + directory);
}
for (File file : files) {
if (file.isDirectory()) {
deleteContents(file);
}
if (!file.delete()) {
throw new IOException("failed to delete " + file);
}
}
}
};
/** Reads from {@code file}. */
Source source(File file) throws FileNotFoundException;
/**
* Writes to {@code file}, discarding any data already present. Creates parent directories if
* necessary.
*/
Sink sink(File file) throws FileNotFoundException;
/**
* Writes to {@code file}, appending if data is already present. Creates parent directories if
* necessary.
*/
Sink appendingSink(File file) throws FileNotFoundException;
/** Deletes {@code file} if it exists. Throws if the file exists and cannot be deleted. */
void delete(File file) throws IOException;
/** Returns true if {@code file} exists on the file system. */
boolean exists(File file);
/** Returns the number of bytes stored in {@code file}, or 0 if it does not exist. */
long size(File file);
/** Renames {@code from} to {@code to}. Throws if the file cannot be renamed. */
void rename(File from, File to) throws IOException;
/**
* Recursively delete the contents of {@code directory}. Throws an IOException if any file could
* not be deleted, or if {@code dir} is not a readable directory.
*/
void deleteContents(File directory) throws IOException;
}
看完這段代碼大家就會(huì)知道腔召,F(xiàn)ileSystem是一個(gè)接口杆查,里面有一個(gè)他的實(shí)現(xiàn)類SYSTEM.所以可以FileSystem看成okhttp中文件系統(tǒng)對(duì)okio的橋接管理類。
看下他的所有方法:
- 1、Source source(File): 獲取File的source (用于讀)
- 2袋狞、Sinke sink(File):獲取File的Sink (用于寫(xiě))
- 3炮车、Sink appending(File): 獲取File的Sink,拼接用的(用于寫(xiě))
- 4、delete(File): void 刪除文件
- 5客峭、exists(File):文件是否存在
- 6、size(Flie):獲取文件的大小
- 7抡柿、rename(File舔琅,F(xiàn)ile): 文件改名
- 8、deleteContents(File):刪除文件夾