前言
好久沒有更新,最近在閱讀flutter相關(guān)源碼。之后會(huì)整理一下既们,把自己的學(xué)習(xí)源碼思考寫出來号杏。最近看到了flutter的http請(qǐng)求盾致,dio相關(guān)的源碼穗酥,不由的想到在Android開發(fā)中常用網(wǎng)絡(luò)請(qǐng)求抽高,OKHttp是怎么工作的。想起這一塊沒有做總結(jié)瞭亮,也就來寫寫OkHttp的源碼原理總結(jié)。
要弄懂OkHttp娶桦,我們需要大致理解OkHttp的框架脈絡(luò)知牌。為什么OkHttp的命名要冠以O(shè)k的前綴角寸?究其根源忿墅,是因?yàn)镺kHttp的所有io操作都建立在Okio之上,因此研究Okio是必要的。
我大致上把OkHttp劃分為如下幾個(gè)模塊來分別講解:
- 1.Okio源碼解析,關(guān)于OkHttp是如何提高IO的執(zhí)行性能
- 2.OKHttp把整個(gè)網(wǎng)絡(luò)請(qǐng)求邏輯拆成7個(gè)攔截器,設(shè)計(jì)成責(zé)任鏈模式的處理。
- 3.retryAndFollowUpInterceptor 重試攔截器
- 4.BridgeInterceptor 建立網(wǎng)絡(luò)橋梁的攔截器察绷,主要是為了給網(wǎng)絡(luò)請(qǐng)求時(shí)候蚜印,添加各種各種必要參數(shù)较木。如Cookie萎馅,Content-type
- 5.CacheInterceptor 緩存攔截器晃虫,主要是為了在網(wǎng)絡(luò)請(qǐng)求時(shí)候扛吞,根據(jù)返回碼處理緩存。
- 6.ConnectInterceptor 鏈接攔截器荆责,主要是為了從鏈接池子中查找可以復(fù)用的socket鏈接喻粹。
- 7.CallServerInterceptor 真正執(zhí)行網(wǎng)絡(luò)請(qǐng)求的邏輯。
- 8.Interceptor 用戶定義的攔截器草巡,在重試攔截器之前執(zhí)行
- 9.networkInterceptors 用戶定義的網(wǎng)絡(luò)攔截器守呜,在CallServerInterceptor(執(zhí)行網(wǎng)絡(luò)請(qǐng)求攔截器)之前運(yùn)行。
本文將和大家講述Okio的設(shè)計(jì)原理山憨,以及從源碼的角度看看Okio為何如此設(shè)計(jì)查乒。
當(dāng)然這部分代碼應(yīng)該很多人熟悉,如果熟悉這些的人來說郁竟,本文是在浪費(fèi)你的時(shí)間玛迄。
正文
NIO的原理時(shí)序圖
OKio本質(zhì)上是對(duì)Java的NIO的一次擴(kuò)展,并且做了緩存的優(yōu)化棚亩,為了徹底明白OKio為何如此設(shè)計(jì)蓖议,我們先來看看一個(gè)Java中如何使用簡單的NIO虏杰。
NIO有三個(gè)基本角色:
- 1.Channel 通道: 數(shù)據(jù)的源頭和重點(diǎn)
- 2.Buffer 緩沖區(qū): 數(shù)據(jù)的緩沖區(qū)
- 3.Selector 選擇器:實(shí)現(xiàn)異步,非阻塞IO
借用網(wǎng)上一副總結(jié)比較好的圖:
channel和buffer之間的關(guān)系如圖:
而selector會(huì)作為非阻塞IO勒虾,對(duì)多個(gè)Channnel進(jìn)行管理纺阔,關(guān)系如圖:
那么NIO和IO有什么區(qū)別呢?
Java NIO和IO之間第一個(gè)最大的區(qū)別是修然,IO是面向流的笛钝,NIO是面向緩沖區(qū)的。NIO可以是非阻塞式的IO操作愕宋,IO則是面向流的阻塞式IO玻靡。
閑話不多少來看看NIO中Buffer和Channel的簡單例子:
public void testNIO(){
try {
File file = new File("./test.txt");
if(!file.exists()){
file.createNewFile();
}
//聲明一個(gè)輸出流
FileOutputStream fout = new FileOutputStream(file);
//獲得輸出流的通道
FileChannel channel = fout.getChannel();
String sendString="hello";
//聲明一個(gè)Byte緩沖區(qū)
ByteBuffer sendBuff = ByteBuffer.wrap(sendString.getBytes());
//寫入通道
channel.write(sendBuff);
sendBuff.clear();
channel.close();
fout.close();
//聲明一個(gè)流
FileInputStream fin = new FileInputStream(file);
//獲得輸入流的通道
FileChannel inchannel = fin.getChannel();
//聲明一個(gè)固定大小的Byte緩沖區(qū)
ByteBuffer readBuff = ByteBuffer.allocate(256);
//讀取第一段數(shù)據(jù)到緩沖區(qū),獲得結(jié)果中贝,-1時(shí)候結(jié)束
int bytesRead = inchannel.read(readBuff);
while (bytesRead != -1){
//寫模式變成讀模式緩存
readBuff.flip();
while (readBuff.hasRemaining()){
System.out.println((char)readBuff.get());
}
//清空已讀的區(qū)域
readBuff.compact();
//繼續(xù)讀取
bytesRead = inchannel.read(readBuff);
}
readBuff.clear();
inchannel.close();
fin.close();
}catch (Exception e){
e.printStackTrace();
}
}
能看到NIO的所有的操作都要經(jīng)過Buffer和Channel進(jìn)行操作囤捻。
我們稍微來看看NIO中FileOutputStream的源碼時(shí)序圖:
能根據(jù)上面的時(shí)序圖,可以簡單的看到實(shí)際上JDK首先簡單的封裝了一層Java API在頂層邻寿,接著會(huì)層層解封進(jìn)入到native層最蕾,最后通過FileChannel調(diào)用到系統(tǒng)調(diào)用。
注意上述流程圖老厌,并沒有涉及到Selector.至于Selector的核心原理本質(zhì)上是對(duì)系統(tǒng)調(diào)用poll()進(jìn)行一次封裝瘟则,不是本文重點(diǎn),而且FileChannel因?yàn)椴荒茉O(shè)置為非阻塞模式枝秤,在這里就不討論醋拧。
為了真正明白其原理,就以普通IO和NIO的write為例子看看淀弹,Java是怎么優(yōu)化整個(gè)讀寫思路丹壕。
NIO和IO的設(shè)計(jì)比較
我們直接看看,假如使用FileOutStream的核心邏輯如下:
public void write(byte b[], int off, int len) throws IOException {
// Android-added: close() check before I/O.
if (closed && len > 0) {
throw new IOException("Stream Closed");
}
// Android-added: Tracking of unbuffered I/O.
tracker.trackIo(len);
// Android-changed: Use IoBridge instead of calling native method.
IoBridge.write(fd, b, off, len);
}
能看到如果使用FileOutStream直接寫入一個(gè)字節(jié)數(shù)組薇溃,就會(huì)直接調(diào)用IoBridgede.write方法菌赖,而這個(gè)方法會(huì)教過Libcore的Linux調(diào)用writeBytes的jni方法,最后會(huì)跑到動(dòng)態(tài)注冊(cè)好的方法:
文件:/libcore/luni/src/main/native/libcore_io_Linux.cpp
static jint Linux_pwriteBytes(JNIEnv* env, jobject, jobject javaFd, jbyteArray javaBytes, jint byteOffset, jint byteCount, jlong offset) {
ScopedBytesRO bytes(env, javaBytes);
if (bytes.get() == NULL) {
return -1;
}
return IO_FAILURE_RETRY(env, ssize_t, pwrite64, javaFd, bytes.get() + byteOffset, byteCount, offset);
}
能看到我們調(diào)用流的時(shí)候沐序,本質(zhì)上是直接調(diào)用系統(tǒng)調(diào)用pwrite(隨機(jī)寫)琉用。
如果研究過Linux編程的哥們必定會(huì)清楚這么做有一個(gè)十分大的缺陷,十分致命策幼。
Linux優(yōu)化write的方案
在Linux編程中邑时,肯定有人會(huì)比較過同樣是文件寫操作的fwrite和系統(tǒng)調(diào)用write。
試著思考一下特姐,假如調(diào)用10000次fwrite和10000次write誰的耗時(shí)會(huì)更加少晶丘?
我第一次接觸的時(shí)候,想當(dāng)然的以為當(dāng)然是write啊,write是系統(tǒng)調(diào)用浅浮,更加接近內(nèi)核的核心api沫浆。但是事實(shí)恰恰是相反。fwrite的速度比write快的多滚秩。
為什么會(huì)是這樣的結(jié)果呢专执?實(shí)際上恰恰是因?yàn)樘l繁的調(diào)用調(diào)用系統(tǒng)調(diào)用,每一次調(diào)用系統(tǒng)調(diào)用進(jìn)入內(nèi)核態(tài)都必須存儲(chǔ)當(dāng)前寄存器中所有的狀態(tài)叔遂,當(dāng)恢復(fù)會(huì)到用戶態(tài)的時(shí)候他炊,又要還原回去争剿,一來二去反而開銷更大已艰。
那么fwrite的實(shí)現(xiàn),很容易猜想到本質(zhì)上也是對(duì)系統(tǒng)調(diào)用write上進(jìn)行了一次封裝蚕苇。
其核心思路如下圖:
通過一個(gè)緩沖區(qū)哩掺,等到緩沖區(qū)填滿之后,在調(diào)用系統(tǒng)調(diào)用write寫入磁盤中涩笤。通過這種方式調(diào)用嚼吞,減少系統(tǒng)調(diào)用的次數(shù),從而增加io讀寫的效率蹬碧。
FileChannel的優(yōu)化
那么Linux是如此優(yōu)化舱禽,那么Java又是如何優(yōu)化的?本質(zhì)上和Linux優(yōu)化十分相似恩沽。
我們看看其核心代碼:
文件:/libcore/ojluni/src/main/java/sun/nio/ch/IOUtil.java
static int write(FileDescriptor fd, ByteBuffer src, long position,
NativeDispatcher nd)
throws IOException
{
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd);
// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
// Do not update src until we see how many bytes were written
src.position(pos);
int n = writeFromNativeBuffer(fd, bb, position, nd);
if (n > 0) {
// now update src
src.position(pos + n);
}
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
long position, NativeDispatcher nd)
throws IOException
{
int pos = bb.position();
int lim = bb.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
int written = 0;
if (rem == 0)
return 0;
if (position != -1) {
written = nd.pwrite(fd,
((DirectBuffer)bb).address() + pos,
rem, position);
} else {
written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
}
if (written > 0)
bb.position(pos + written);
return written;
}
首先解釋一下DirectBuffer誊稚,它在native下面申請(qǐng)一段空間,這一段空間會(huì)隨著DirectBuffer對(duì)象存在而存在罗心,最后會(huì)通過Cleaner的方式調(diào)用native的方法釋放native下面空間里伯,詳細(xì)的可以看我寫的Binder的死亡代理一文中,有詳細(xì)的描述這種技術(shù)渤闷。
因此我們拿到對(duì)象的地址疾瓮,就能根據(jù)寫入的類型進(jìn)行對(duì)這段地址的隨機(jī)讀寫。這就是DirectBuffer的本質(zhì)飒箭。
當(dāng)然Java不會(huì)隨意的開辟新的DirectBuffer狼电,而是通過享元設(shè)計(jì),減少DirectBuffer開辟弦蹂,把不需要的對(duì)象暫時(shí)存放到cache中漫萄,核心如下:
public static ByteBuffer getTemporaryDirectBuffer(int size) {
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
// No suitable buffer in the cache so we need to allocate a new
// one. To avoid the cache growing then we remove the first
// buffer from the cache and free it.
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size);
}
}
總結(jié):Java對(duì)文件讀寫有著一樣的理解。Channel+Buffer的讀寫方式本質(zhì)上是對(duì)緩存區(qū)進(jìn)行讀寫操作盈匾。當(dāng)我們把緩沖區(qū)中的寫滿時(shí)候腾务,再進(jìn)行一次write的寫入,就能避免頻繁的調(diào)用系統(tǒng)調(diào)用削饵。
這也是Android性能優(yōu)化中IO優(yōu)化的核心思想之一岩瘦,為了避免過多頻繁的調(diào)用讀寫操作未巫,我們必須適當(dāng)?shù)脑O(shè)置讀寫大小,避免過度調(diào)用系統(tǒng)調(diào)用启昧,或者一口氣寫入過多的內(nèi)容導(dǎo)致一口氣申請(qǐng)過多的pageCache叙凡,導(dǎo)致內(nèi)存驟降,可能會(huì)觸發(fā)臟數(shù)據(jù)的寫到磁盤中密末,導(dǎo)致系統(tǒng)cpu過于繁忙握爷。
關(guān)于Android更多的優(yōu)化之后會(huì)開一個(gè)專欄來聊聊。
Okio的概述
當(dāng)我們得知Linux严里,Java api中是如何優(yōu)化io的.那么Okio又是如何優(yōu)化的呢新啼?本質(zhì)上還是無法脫離這個(gè)思路,讓我們一探究竟吧刹碾。在使用之前燥撞,按照慣例,看看Okio是如何使用的迷帜?
@Test
public void testOkio(){
try {
File file = new File("./test.txt");
if(!file.exists()){
file.createNewFile();
}
Sink sink = Okio.sink(file);
BufferedSink bufferedSink = Okio.buffer(sink);
bufferedSink.writeUtf8("hello world\n");
bufferedSink.flush();
bufferedSink.close();
Source source = Okio.source(file);
BufferedSource bufferedSource= Okio.buffer(source);
while(true){
String line = bufferedSource.readUtf8Line();
if(line == null){
break;
}
System.out.println(line);
}
}catch (Exception e){
e.printStackTrace();
}
}
這段事例代碼包含了Okio是如何讀寫的物舒。我們能夠看到在Okio中,存在著三個(gè)核心對(duì)象:
- Source 數(shù)據(jù)讀取對(duì)象
- Sink 數(shù)據(jù)寫入對(duì)象
- Buffer Okio的讀寫緩沖對(duì)象
通過上面兩個(gè)例子戏锹,雖然沒有看到Buffer的存在冠胯,是因?yàn)镺kio在操作的過程中隱藏了這個(gè)對(duì)象的操作。
為了更好的理解這幾個(gè)對(duì)象之間的關(guān)系锦针,我畫了一副UML圖:
能看到整個(gè)Okio繼承和實(shí)現(xiàn)的關(guān)系比較復(fù)雜荠察。但是面向我們的api一般是Buffer,以及封裝好的Source,Sink伞插。RealBufferedSource和RealBufferedSink往往承載著核心的讀寫操作割粮。Buffer則作為Okio的緩沖區(qū)。
當(dāng)然還有其他的媚污,如GzipSource舀瓢,GzipSink,HashingSource耗美,HashingSin讀寫操作對(duì)象京髓。不過這一期我們把重點(diǎn)放在RealBuffered系列上。
從類的關(guān)系圖上就能看到商架,我為什么說Okio是對(duì)nio的一次擴(kuò)展堰怨。因?yàn)镺kio的讀寫操作對(duì)象Source和Sink,繼承的是Channel對(duì)象蛇摸。本質(zhì)上是一種讀寫流的通道备图。因此可以聯(lián)合Selector進(jìn)行nio讀寫操作。
總結(jié)一下,Okio的讀寫操作一般是按照如下順序進(jìn)行讀寫:
- 1.Okio生成一個(gè)Sink或者Source對(duì)象
- 2.Okio通過調(diào)用buffer對(duì)象揽涮,把生成的Sink或者Source對(duì)象包裹起來抠藕,變成可以操作Buffer的讀寫操作對(duì)象
- 3.調(diào)用Sink或者Source的讀寫操作
如果熟悉這個(gè)操作,就不難理解上面設(shè)計(jì)蒋困。因?yàn)槲覀冃枰b飾設(shè)計(jì)模式盾似,層層包裹。那么前提就是需要對(duì)外暴露一致的接口雪标,因此我們能夠看到整個(gè)UML的類關(guān)系圖中零院,面向真正的讀寫操作繼承的核心操作幾乎都是一致的。
既然如此村刨,接下來的源碼分析就按照這個(gè)調(diào)用流程走告抄,就是最清晰的思路。
Okio的讀寫優(yōu)化
接下來烹困,我們就以寫操作為例子玄妈,看看OKio是如何優(yōu)化讀寫的乾吻。這里我選擇是2.4kotlin的版本進(jìn)行分析髓梅,順道記錄一下一些kotlin有趣的特性。
Okio閱讀準(zhǔn)備
首先我們已經(jīng)沒有辦法看到以前版本的Okio.java的類绎签,取而代之的是Okio.kt文件枯饿。
這個(gè)文件定義了若干個(gè)擴(kuò)展方法。如果是在文件中直接聲明方法诡必,那么調(diào)用方式如OkioKt.sink().因此這里是使用了一個(gè)注解在package上奢方,重新定義調(diào)用對(duì)象:
@file:JvmName("Okio")
package okio
了解了這個(gè),我們來看看下面的所有常用的靜態(tài)方法:
actual fun Source.buffer(): BufferedSource = RealBufferedSource(this)
actual fun Sink.buffer(): BufferedSink = RealBufferedSink(this)
/** Returns a sink that writes to `out`. */
fun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout())
/** Returns a source that reads from `in`. */
fun InputStream.source(): Source = InputStreamSource(this, Timeout())
/** Returns a sink that writes to `file`. */
@JvmOverloads
@Throws(FileNotFoundException::class)
fun File.sink(append: Boolean = false): Sink = FileOutputStream(this, append).sink()
/** Returns a source that reads from `file`. */
@Throws(FileNotFoundException::class)
fun File.source(): Source = inputStream().source()
能看到sink()和source()有幾種參數(shù)如File爸舒,InputStream蟋字,OutputStream等。
這里解釋一下扭勉,前面的File.xxx 的前綴File指的是什么類的擴(kuò)展方法鹊奖,同時(shí)需要外部作為參數(shù)傳遞進(jìn)來。
Okio生成一個(gè)Sink或者Source對(duì)象
以write操作為例子:
fun File.sink(append: Boolean = false): Sink = FileOutputStream(this, append).sink()
這里以File作為參數(shù)輸入涂炎,此時(shí)該方法中的this就是指傳遞進(jìn)來的參數(shù)忠聚。接著繼續(xù).sink()。意思是繼續(xù)以FileOutputStream作為參數(shù)唱捣,調(diào)用OutputStream擴(kuò)展方法sink两蟀。
fun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout())
只有new一個(gè)OutputStreamSink對(duì)象,這個(gè)對(duì)象本質(zhì)上就是一個(gè)Sink對(duì)象,寫數(shù)據(jù)對(duì)象:
private class OutputStreamSink(
private val out: OutputStream,
private val timeout: Timeout
) : Sink {
override fun write(source: Buffer, byteCount: Long) {
checkOffsetAndCount(source.size, 0, byteCount)
var remaining = byteCount
while (remaining > 0) {
timeout.throwIfReached()
val head = source.head!!
val toCopy = minOf(remaining, head.limit - head.pos).toInt()
out.write(head.data, head.pos, toCopy)
head.pos += toCopy
remaining -= toCopy
source.size -= toCopy
if (head.pos == head.limit) {
source.head = head.pop()
SegmentPool.recycle(head)
}
}
}
能看到當(dāng)我們使用了Okio.Sink方法之后震缭,將會(huì)生成一個(gè)OutputStreamSink包裹著OutputStream讓我們操作赂毯,當(dāng)我們調(diào)用寫的時(shí)候,本質(zhì)上就是調(diào)用這個(gè)write中復(fù)寫的對(duì)象。
這是最內(nèi)的部分党涕,但是還不具有優(yōu)化活烙。我們能看到,本質(zhì)上還是在調(diào)用Java的OutputStream進(jìn)行write的操作遣鼓。關(guān)于更多的啸盏,我稍后解釋,不過在這里記住一個(gè)重要的對(duì)象SegmentPool骑祟。
類似的回懦,Okio可以通過source方法生成InputStreamSource對(duì)象。
Okio生成操作Buffer的讀寫操作對(duì)象
接下來我們將會(huì)調(diào)用buffer的方法次企,生成對(duì)應(yīng)的緩沖區(qū)操作對(duì)象
actual fun Source.buffer(): BufferedSource = RealBufferedSource(this)
actual fun Sink.buffer(): BufferedSink = RealBufferedSink(this)
我們一樣以寫操作RealBufferedSink為例子看看源碼怯晕。
internal class RealBufferedSink(
@JvmField val sink: Sink
) : BufferedSink {
@JvmField val bufferField = Buffer()
@JvmField var closed: Boolean = false
@Suppress("OVERRIDE_BY_INLINE") // Prevent internal code from calling the getter.
override val buffer: Buffer
inline get() = bufferField
override fun buffer() = bufferField
我們能看到在類初始化中,就存在了一個(gè)bufferField的Buffer對(duì)象缸棵,一切在RealBufferedSink和RealBufferedSource中的操作對(duì)視對(duì)應(yīng)使用bufferField這個(gè)對(duì)象進(jìn)行讀寫舟茶。
稍微學(xué)習(xí)一個(gè)這里面的內(nèi)聯(lián)屬性
override val buffer: Buffer
inline get() = bufferField
意思是buffer的get方法實(shí)際上是bufferField。當(dāng)我們使用buffer這個(gè)對(duì)象的時(shí)候堵第,會(huì)默認(rèn)使用bufferField贿堰。
調(diào)用RealBufferedSource或者RealBufferedSink的讀寫操作
接下來,會(huì)使用讀寫操作颊亮,把數(shù)據(jù)寫入或者讀進(jìn)緩沖區(qū)宿饱。就以writeUtf8方法為例子看看里面做了什么事情。
override fun writeUtf8(string: String): BufferedSink {
check(!closed) { "closed" }
buffer.writeUtf8(string)
return emitCompleteSegments()
}
我們依次看看這兩個(gè)方法做了什么针余。
Buffer.writeUtf8
actual override fun writeUtf8(string: String): Buffer = writeUtf8(string, 0, string.length)
actual override fun writeUtf8(string: String, beginIndex: Int, endIndex: Int): Buffer =
commonWriteUtf8(string, beginIndex, endIndex)
writeUtf8會(huì)拿到String的Index饲鄙,確定讀寫范圍之后,調(diào)用commonWriteUtf8圆雁。
字符串寫入核心commonWriteUtf8
internal inline fun Buffer.commonWriteUtf8(string: String, beginIndex: Int, endIndex: Int): Buffer {
require(beginIndex >= 0) { "beginIndex < 0: $beginIndex" }
require(endIndex >= beginIndex) { "endIndex < beginIndex: $endIndex < $beginIndex" }
require(endIndex <= string.length) { "endIndex > string.length: $endIndex > ${string.length}" }
// Transcode a UTF-16 Java String to UTF-8 bytes.
var i = beginIndex
while (i < endIndex) {
var c = string[i].toInt()
when {
c < 0x80 -> {
val tail = writableSegment(1)
val data = tail.data
val segmentOffset = tail.limit - i
val runLimit = minOf(endIndex, Segment.SIZE - segmentOffset)
// Emit a 7-bit character with 1 byte.
data[segmentOffset + i++] = c.toByte() // 0xxxxxxx
// Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance
// improvement over independent calls to writeByte().
while (i < runLimit) {
c = string[i].toInt()
if (c >= 0x80) break
data[segmentOffset + i++] = c.toByte() // 0xxxxxxx
}
val runSize = i + segmentOffset - tail.limit // Equivalent to i - (previous i).
tail.limit += runSize
size += runSize.toLong()
}
c < 0x800 -> {
// Emit a 11-bit character with 2 bytes.
val tail = writableSegment(2)
/* ktlint-disable no-multi-spaces */
tail.data[tail.limit ] = (c shr 6 or 0xc0).toByte() // 110xxxxx
tail.data[tail.limit + 1] = (c and 0x3f or 0x80).toByte() // 10xxxxxx
/* ktlint-enable no-multi-spaces */
tail.limit += 2
size += 2L
i++
}
c < 0xd800 || c > 0xdfff -> {
// Emit a 16-bit character with 3 bytes.
val tail = writableSegment(3)
/* ktlint-disable no-multi-spaces */
tail.data[tail.limit ] = (c shr 12 or 0xe0).toByte() // 1110xxxx
tail.data[tail.limit + 1] = (c shr 6 and 0x3f or 0x80).toByte() // 10xxxxxx
tail.data[tail.limit + 2] = (c and 0x3f or 0x80).toByte() // 10xxxxxx
/* ktlint-enable no-multi-spaces */
tail.limit += 3
size += 3L
i++
}
else -> {
// c is a surrogate. Make sure it is a high surrogate & that its successor is a low
// surrogate. If not, the UTF-16 is invalid, in which case we emit a replacement
// character.
val low = (if (i + 1 < endIndex) string[i + 1].toInt() else 0)
if (c > 0xdbff || low !in 0xdc00..0xdfff) {
writeByte('?'.toInt())
i++
} else {
// UTF-16 high surrogate: 110110xxxxxxxxxx (10 bits)
// UTF-16 low surrogate: 110111yyyyyyyyyy (10 bits)
// Unicode code point: 00010000000000000000 + xxxxxxxxxxyyyyyyyyyy (21 bits)
val codePoint = 0x010000 + (c and 0x03ff shl 10 or (low and 0x03ff))
// Emit a 21-bit character with 4 bytes.
val tail = writableSegment(4)
/* ktlint-disable no-multi-spaces */
tail.data[tail.limit ] = (codePoint shr 18 or 0xf0).toByte() // 11110xxx
tail.data[tail.limit + 1] = (codePoint shr 12 and 0x3f or 0x80).toByte() // 10xxxxxx
tail.data[tail.limit + 2] = (codePoint shr 6 and 0x3f or 0x80).toByte() // 10xxyyyy
tail.data[tail.limit + 3] = (codePoint and 0x3f or 0x80).toByte() // 10yyyyyy
/* ktlint-enable no-multi-spaces */
tail.limit += 4
size += 4L
i += 2
}
}
}
}
return this
}
能看到忍级,在這個(gè)寫入字符串核心方法中分為幾種情況:
- 1.當(dāng)字符小于0x80
- 2.當(dāng)字符小于0x800
- 3.當(dāng)字符小于0xd800大于0xdfff
- 4.其他情況
為什么分為這幾種情況呢?在16進(jìn)制中0x80用二進(jìn)制表示:1000 0000.還記得一字節(jié)就是8位嗎伪朽。此時(shí)代表的是一個(gè)字節(jié)最大位數(shù)轴咱,也就是一個(gè)Byte。
同理第二個(gè)情況是指2個(gè)字節(jié)的情況驱负,第三個(gè)是指3字節(jié)的情況嗦玖。最后一種是3自己以上的情況。為什么要怎么處理呢跃脊?
就以一個(gè)字節(jié)的情況為例子看看Okio究竟做了什么:
val tail = writableSegment(1)
val data = tail.data
val segmentOffset = tail.limit - i
val runLimit = minOf(endIndex, Segment.SIZE - segmentOffset)
// Emit a 7-bit character with 1 byte.
data[segmentOffset + i++] = c.toByte() // 0xxxxxxx
// Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance
// improvement over independent calls to writeByte().
while (i < runLimit) {
c = string[i].toInt()
if (c >= 0x80) break
data[segmentOffset + i++] = c.toByte() // 0xxxxxxx
}
val runSize = i + segmentOffset - tail.limit // Equivalent to i - (previous i).
tail.limit += runSize
size += runSize.toLong()
我們能夠看到宇挫,在buffer寫入數(shù)據(jù)之前都會(huì)調(diào)用writableSegment方法申請(qǐng)一個(gè)對(duì)象出來。
關(guān)于Segment
這個(gè)對(duì)象是一個(gè)Segment:
companion object {
/** The size of all segments in bytes. */
const val SIZE = 8192
/** Segments will be shared when doing so avoids `arraycopy()` of this many bytes. */
const val SHARE_MINIMUM = 1024
}
internal class Segment {
@JvmField val data: ByteArray
/** The next byte of application data byte to read in this segment. */
@JvmField var pos: Int = 0
/** The first byte of available data ready to be written to. */
@JvmField var limit: Int = 0
/** True if other segments or byte strings use the same byte array. */
@JvmField var shared: Boolean = false
/** True if this segment owns the byte array and can append to it, extending `limit`. */
@JvmField var owner: Boolean = false
/** Next segment in a linked or circularly-linked list. */
@JvmField var next: Segment? = null
/** Previous segment in a circularly-linked list. */
@JvmField var prev: Segment? = null
constructor() {
this.data = ByteArray(SIZE)
this.owner = true
this.shared = false
}
這個(gè)對(duì)象內(nèi)部包含這數(shù)組酪术,我們會(huì)把所有的需要寫入的數(shù)據(jù)都轉(zhuǎn)化位字節(jié)器瘪,并且寫入到data數(shù)組中翠储。同時(shí)包含next,pre這個(gè)Segment對(duì)象橡疼,還有一個(gè)limit限制大小大小援所。
看到這個(gè)對(duì)象,就能立即反應(yīng)過來這是一個(gè)雙向鏈表中某一項(xiàng)欣除。
寫入原理
在寫入數(shù)據(jù)的過程中住拭,我們能夠看到有幾個(gè)關(guān)鍵的屬性segmentOffset以及runLimit。
runLimit是通過Segment.SIZE - segmentOffset計(jì)算得出历帚。
segmentOffset是通過tail.limit - index滔岳。雖然limit初始化為0,但是在第一次寫入數(shù)組的時(shí)候挽牢,segmentOffset = segmentOffset +i+1.因此不用擔(dān)心數(shù)組越界
每一次循環(huán)的時(shí)候都以runLimit重點(diǎn)或者遇到了大于一字節(jié)的字符串終止谱煤。每次寫入一個(gè)字符串segmentOffset都會(huì)自增。
所以我們可以得出如下結(jié)論:
- Okio中以buffer作為流的操作對(duì)象禽拔,而每一次操作本質(zhì)上都會(huì)由更加細(xì)粒的segment控制
- limit是一個(gè)segment剩余可以寫入大小極限
- 每一次寫入都需要按照當(dāng)前條件的刘离,如在一字節(jié)中情況只允許寫入一字節(jié),當(dāng)寫入達(dá)到了segment的上限就不允許寫入睹栖。
同理硫惕,整個(gè)情況放到2,3磨淌,4字節(jié)也可以通用疲憋。只是每一次計(jì)算剩余空間的增加計(jì)數(shù)不同罷了凿渊。
Segment的管理
那么Segment是如何管理的呢梁只?其實(shí)上面就通過Segment的數(shù)據(jù)結(jié)構(gòu)就猜測是應(yīng)該是雙向鏈表。
我們直接看看埃脏,核心writableSegment方法:
internal inline fun Buffer.commonWritableSegment(minimumCapacity: Int): Segment {
require(minimumCapacity >= 1 && minimumCapacity <= Segment.SIZE) { "unexpected capacity" }
if (head == null) {
val result = SegmentPool.take() // Acquire a first segment.
head = result
result.prev = result
result.next = result
return result
}
var tail = head!!.prev
if (tail!!.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()) // Append a new empty segment to fill up.
}
return tail
}
每一個(gè)Buffer都會(huì)持有一個(gè)名為head的Segment對(duì)象搪锣。當(dāng)head為空,說明Buffer是新創(chuàng)建出來彩掐,則從SegmentPool中獲取一個(gè)Segment是指到頭部构舟,頭尾相互指引。這是很經(jīng)典的鏈表環(huán)設(shè)計(jì)堵幽。
當(dāng)head不為空的時(shí)候狗超,則獲取head的前一個(gè)Segment對(duì)象tail,如果tail的剩余空間不能存放朴下,則需要一個(gè)新的Segment努咐,從SegmentPool中獲取一個(gè)新的。最后通過push方法殴胧,鏈接到鏈表中渗稍。
fun push(segment: Segment): Segment {
segment.prev = this
segment.next = next
next!!.prev = segment
next = segment
return segment
}
新建的segment的prev為tail佩迟,新建的segment的next為tail的next,tail的next的prev為新建的segment竿屹,tail的next為segment报强。
換句話說,就是每一個(gè)新的segment都會(huì)添加到鏈表里面拱燃,最后把整個(gè)環(huán)鏈接起來秉溉。
大致上整個(gè)鏈表結(jié)構(gòu)如下圖:
SegmentPool管理Segment對(duì)象
而在這個(gè)過程中,你能發(fā)現(xiàn)所有的Segment都被SegmentPool管理碗誉。這本質(zhì)上就是一個(gè)享元設(shè)計(jì)模式坚嗜。
在這里面包含如下幾個(gè)基礎(chǔ)方法:
@ThreadLocal
internal object SegmentPool {
/** The maximum number of bytes to pool. */
// TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
const val MAX_SIZE = 64 * 1024L // 64 KiB.
/** Singly-linked list of segments. */
var next: Segment? = null
/** Total bytes in this pool. */
var byteCount = 0L
fun take(): Segment {
synchronized(this) {
next?.let { result ->
next = result.next
result.next = null
byteCount -= Segment.SIZE
return result
}
}
return Segment() // Pool is empty. Don't zero-fill while holding a lock.
}
fun recycle(segment: Segment) {
require(segment.next == null && segment.prev == null)
if (segment.shared) return // This segment cannot be recycled.
synchronized(this) {
if (byteCount + Segment.SIZE > MAX_SIZE) return // Pool is full.
byteCount += Segment.SIZE
segment.next = next
segment.limit = 0
segment.pos = segment.limit
next = segment
}
}
}
SegmentPool會(huì)緩存固定大小的Segment進(jìn)來,每一次通過take從中獲取一個(gè)Segment出去诗充,就會(huì)減少內(nèi)部的緩存大小苍蔬。通過release則會(huì)增加內(nèi)部緩存大小,等待Okio的使用蝴蜓。
這樣就能極大的減少很多Segment對(duì)象生成碟绑。實(shí)際上這種思路到處都是。甚至連Activity啟動(dòng)中都能看到茎匠。
Okio讀寫結(jié)束的收尾工作
最后writeUtf8調(diào)用如下方法格仲,結(jié)束整個(gè)調(diào)用:
override fun emitCompleteSegments(): BufferedSink {
check(!closed) { "closed" }
val byteCount = buffer.completeSegmentByteCount()
if (byteCount > 0L) sink.write(buffer, byteCount)
return this
}
還記得,OutStreamSink最后傳遞進(jìn)來诵冒,讓RealBufferedSink調(diào)用寫入凯肋,最后寫入的就是在Okio.kt文件復(fù)寫write方法。
override fun write(source: Buffer, byteCount: Long) {
checkOffsetAndCount(source.size, 0, byteCount)
var remaining = byteCount
while (remaining > 0) {
timeout.throwIfReached()
val head = source.head!!
val toCopy = minOf(remaining, head.limit - head.pos).toInt()
out.write(head.data, head.pos, toCopy)
head.pos += toCopy
remaining -= toCopy
source.size -= toCopy
if (head.pos == head.limit) {
source.head = head.pop()
SegmentPool.recycle(head)
}
}
}
能看到其中汽馋,還是調(diào)用OutputSream的寫入方法侮东,不過這一次寫入的是保存在緩存中的數(shù)組.當(dāng)buffer每寫入一部分就把Segment中的pos進(jìn)行變化。記錄已經(jīng)寫入了多少了豹芯。每一次執(zhí)行寫入結(jié)束后悄雅,當(dāng)發(fā)現(xiàn)Segment的pos剛好達(dá)到限制的大小,說明Segement內(nèi)部已經(jīng)滿了铁蹈,就清空內(nèi)部緩存加入到SegmentPool等待新的使用者調(diào)用宽闲。
總結(jié)
經(jīng)過上面幾個(gè)源碼片段的閱讀,我大致上能夠整理出整個(gè)設(shè)計(jì)核心握牧,如下:
從圖上可以對(duì)比出結(jié)論容诬,Okio和Linux的fwrite,Java的Channel讀寫思路一致沿腰。都是通過做緩存來減少系統(tǒng)調(diào)用的次數(shù)览徒。而Okio做的更加的完善,內(nèi)部所有的操作都要經(jīng)過buffer緩沖區(qū)處理矫俺,而緩沖區(qū)內(nèi)部管理細(xì)粒度更加細(xì)小的Segment吱殉,是通過一個(gè)鏈表環(huán)加上一個(gè)緩沖池來管理掸冤,這樣就能更大限度的使用內(nèi)存,同時(shí)避免了過多的緩存對(duì)象生成友雳。
在互聯(lián)網(wǎng)時(shí)代稿湿,網(wǎng)絡(luò)請(qǐng)求數(shù)目日益增加。為了擁有更好的IO性能押赊,更加細(xì)两忍伲化管理內(nèi)存,找出合適的讀寫緩沖塊大小流礁,是一個(gè)很好的思路涕俗。
后話
為什么突發(fā)奇想要寫Okio呢?
因?yàn)樽罱疽鉬lutter神帅,因此我研究flutter源碼再姑,看到了dart中的異步機(jī)制以及Isolate源碼。發(fā)現(xiàn)現(xiàn)在流行的網(wǎng)絡(luò)請(qǐng)求框架dio也好還是原生的httpclient也好找御,都是在主線程中編寫網(wǎng)絡(luò)請(qǐng)求元镀,這樣就極大的浪費(fèi)我們的自己線程。自己也嘗試著寫了一個(gè)基于Isolate的網(wǎng)絡(luò)請(qǐng)求框架霎桅,也就回顧了一下Okio栖疑,Okhttp的源碼。
比較了一下滔驶,發(fā)現(xiàn)整個(gè)flutter的社區(qū)還是很稚嫩遇革,很多優(yōu)化點(diǎn)也沒有考慮進(jìn)去。之后有機(jī)會(huì)揭糕,會(huì)整理一下萝快,試著寫寫flutter相關(guān)的專題。實(shí)際上在閱讀flutter的底層原理插佛,發(fā)現(xiàn)還是和Android有很多地方設(shè)計(jì)思路互通的杠巡,這也印證了那句話,學(xué)習(xí)東西要學(xué)本質(zhì)」涂埽現(xiàn)在新技術(shù)層出不窮,不要被“亂花漸欲迷人”蚌铜,今天出一個(gè)新技術(shù)就去追捧锨侯,不如靜下心去看看Android的底層思想,去多思考其中設(shè)計(jì)的優(yōu)缺點(diǎn)冬殃。