本文基于Okio 2.4.3源碼分析
Okio - 官方地址
Okio - GitHub代碼地址
Okio 介紹
Okio是什么
Okio來源於Square公司,它是對java.io和java.nio的進一步封裝實現,使得處理、訪問、緩存數據更加容易。它最初是作為OkHttp網絡庫的組件而開發的。
Okio 特點
Buffer and ByteString
目標:更好的CPU和Memory綜合表現
- Buffer:通過由Segment組成的雙向鏈表來緩存數據;當從一個Buffer轉移數據到另一個Buffer時,可以直接重新分配Segment的擁有權而無需拷貝,相比一次深拷貝,效率大大提高。
- ByteString:將String以UTF-8編碼為ByteString時會緩存原String,解碼時則可以直接使用該緩存。
Source and Sink
- 支持超時機制
- 非常輕便,便於實現、使用、測試
Okio 圖文概括
源碼分析
測試代碼示例
/** Round-trip test: writes a UTF-8 string to a temporary file with Okio, then reads it back. */
public final class OkioTest {
    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Writes a short ASCII string through a buffered file sink, checks the resulting file size, and
     * reads the string back through a buffered file source.
     *
     * <p>The sink and source are opened with try-with-resources so that a failing assertion or an
     * exception thrown mid-test does not leak the underlying file handle.
     */
    @Test public void readWriteFile() throws Exception {
        String expected = "Hello, java.io file!";
        File file = temporaryFolder.newFile();

        // Write. The sink is closed at the end of this block, before the size checks below,
        // which rely on the buffered bytes having reached the file by then.
        try (BufferedSink sink = Okio.buffer(Okio.sink(file))) {
            sink.writeUtf8(expected);
        }
        assertTrue(file.exists());
        // The string is pure ASCII, so its UTF-8 encoding is exactly 20 bytes.
        assertEquals(20L, file.length());

        // Read the whole file back as UTF-8.
        try (BufferedSource source = Okio.buffer(Okio.source(file))) {
            assertEquals(expected, source.readUtf8());
        }
    }
}
數據寫入 Sink
- 實現類:OutputStreamSink
- 實現原理:依舊是借助OutputStream進行寫入操作
- 寫入流程
- 超時判斷
- 根據入參所需寫入的數據大小,循環寫入數據
- 取Buffer中第一個Segment,計算其中可讀取的數據,寫入到目標輸出流中
- 每次循環寫入的數據大小 = min(剩餘要寫入的數據大小, 此次循環取到的Segment中的可讀取數據大小),直至完成目標數據大小的寫入
- 每次寫入完畢,若該Segment中的數據已全部寫出,則將其從鏈表中移除並回收
/**
 * Opens this file for writing and adapts the resulting [FileOutputStream] to a [Sink].
 *
 * @param append when `true`, bytes are appended to the existing file content; the flag is passed
 *   straight through to the [FileOutputStream] constructor.
 */
fun File.sink(append: Boolean = false): Sink {
    val stream = FileOutputStream(this, append)
    return stream.sink()
}

/** Adapts this [OutputStream] to a [Sink] backed by [OutputStreamSink] with a new [Timeout] instance. */
fun OutputStream.sink(): Sink {
    return OutputStreamSink(out = this, timeout = Timeout())
}
/**
 * [Sink] implementation that drains [Buffer] segments into a java.io [OutputStream].
 *
 * @property out the stream that receives the bytes.
 * @property timeout checked before each segment chunk is written.
 */
private class OutputStreamSink(
    private val out: OutputStream,
    private val timeout: Timeout
) : Sink {

    /**
     * Writes exactly [byteCount] bytes from the front of [source] to [out], consuming them from
     * [source]. Segments that become empty are unlinked from [source] and handed to [SegmentPool].
     */
    override fun write(source: Buffer, byteCount: Long) {
        checkOffsetAndCount(source.size, 0, byteCount)
        var bytesLeft = byteCount
        while (bytesLeft > 0) {
            timeout.throwIfReached()
            val segment = source.head!!
            // Never write past the readable region of this segment, nor past what was requested.
            val chunk = minOf(bytesLeft, segment.limit - segment.pos).toInt()
            out.write(segment.data, segment.pos, chunk)
            // Bookkeeping is only advanced after the stream accepted the bytes.
            segment.pos += chunk
            source.size -= chunk
            bytesLeft -= chunk
            val drained = segment.pos == segment.limit
            if (drained) {
                source.head = segment.pop()
                SegmentPool.recycle(segment)
            }
        }
    }

    /** Delegates to [OutputStream.flush]. */
    override fun flush() {
        out.flush()
    }

    /** Delegates to [OutputStream.close]. */
    override fun close() {
        out.close()
    }

    override fun timeout(): Timeout = timeout

    override fun toString(): String = "sink($out)"
}
數據讀取 Source
- 實現類:InputStreamSource
- 實現原理:依舊是借助InputStream進行讀取操作
- 讀取流程
- 目標讀取數據大小合法判斷,超時判斷
- 獲取Buffer中可寫入的尾部Segment,計算該Segment可寫入數據的最大值,與目標讀取數據大小進行比較,取其中較小的一個
- 從InputStream讀取數據到Buffer中;若已讀到流末尾(返回-1)且尾部Segment為空,則將其移除並回收
- 返回讀取數據大小
/** Opens this file for reading and adapts the resulting stream to a [Source]. */
fun File.source(): Source {
    val stream = this.inputStream()
    return stream.source()
}

/** Adapts this [InputStream] to a [Source] backed by [InputStreamSource] with a new [Timeout] instance. */
fun InputStream.source(): Source {
    return InputStreamSource(input = this, timeout = Timeout())
}
/**
 * [Source] implementation that fills [Buffer] segments from a java.io [InputStream].
 *
 * @property input the stream bytes are read from.
 * @property timeout checked before every read.
 */
private class InputStreamSource(
    private val input: InputStream,
    private val timeout: Timeout
) : Source {

    /**
     * Reads at most [byteCount] bytes from [input] into [sink]. It never reads more than fits in
     * the sink's writable tail segment.
     *
     * @return the number of bytes read, `0` when [byteCount] is `0`, or `-1` at end of stream.
     * @throws IllegalArgumentException if [byteCount] is negative.
     * @throws IOException wrapping an [AssertionError] for which `isAndroidGetsocknameError` is true.
     */
    override fun read(sink: Buffer, byteCount: Long): Long {
        require(byteCount >= 0) { "byteCount < 0: $byteCount" }
        if (byteCount == 0L) return 0L
        return try {
            readIntoTail(sink, byteCount)
        } catch (e: AssertionError) {
            if (e.isAndroidGetsocknameError) throw IOException(e)
            throw e
        }
    }

    /** Performs one stream read into the writable tail segment of [sink]; returns the count or -1. */
    private fun readIntoTail(sink: Buffer, byteCount: Long): Long {
        timeout.throwIfReached()
        val segment = sink.writableSegment(1)
        // Free bytes after the segment's limit, capped by the caller's request.
        val room = Segment.SIZE - segment.limit
        val request = minOf(byteCount, room).toInt()
        val count = input.read(segment.data, segment.limit, request)
        if (count == -1) {
            // End of stream: a tail segment that holds no data is unlinked and recycled.
            if (segment.pos == segment.limit) {
                sink.head = segment.pop()
                SegmentPool.recycle(segment)
            }
            return -1
        }
        segment.limit += count
        sink.size += count
        return count.toLong()
    }

    /** Delegates to [InputStream.close]. */
    override fun close() {
        input.close()
    }

    override fun timeout(): Timeout = timeout

    override fun toString(): String = "source($input)"
}
數據緩存 Buffer
寫入緩存
BufferedSink 接口
/**
 * A [Sink] that exposes an internal [buffer].
 * Abridged listing: `writeXXX(...)` stands for the family of write methods and is not compilable as written.
 */
actual interface BufferedSink : Sink, WritableByteChannel {
// The sink's internal buffer.
actual val buffer: Buffer
// Family of write methods (writeUtf8, writeInt, ...); each returns a BufferedSink.
actual fun writeXXX(...): BufferedSink
// Per the accompanying text: writes buffered data out to the underlying Sink
// (presumably only complete segments, as the name suggests — confirm against Okio docs).
actual fun emitCompleteSegments(): BufferedSink
}
RealBufferedSink 實現類
- BufferedSink的實現類,含有兩個重要成員:Sink和Buffer
- 接口方法基本上都經過 internal/RealBufferedSink 一層封裝,方法實現皆借助Buffer進行寫入操作
/**
 * [BufferedSink] implementation that wraps a downstream [sink] and stores written bytes in [bufferField].
 * Each write method delegates to a `common*` helper defined outside this excerpt.
 * Abridged listing: `...` marks omitted members.
 */
internal actual class RealBufferedSink actual constructor(
@JvmField actual val sink: Sink // downstream Sink that receives the buffered bytes
) : BufferedSink {
@JvmField val bufferField = Buffer() // the Buffer holding written bytes; backs [buffer]
@JvmField actual var closed: Boolean = false // closed flag (presumably set by close(), not shown)
@Suppress("OVERRIDE_BY_INLINE") // silences the compiler diagnostic for an override with an inline getter
override val buffer: Buffer
inline get() = bufferField
// Write methods: each delegates to the matching `common*` helper (per the accompanying text,
// in internal/RealBufferedSink), which performs the write through the Buffer.
override fun writeAll(source: Source) = commonWriteAll(source)
override fun write(source: Source, byteCount: Long): BufferedSink = commonWrite(source, byteCount)
override fun writeByte(b: Int) = commonWriteByte(b)
override fun writeShort(s: Int) = commonWriteShort(s)
override fun writeShortLe(s: Int) = commonWriteShortLe(s)
override fun writeInt(i: Int) = commonWriteInt(i)
override fun writeIntLe(i: Int) = commonWriteIntLe(i)
override fun writeLong(v: Long) = commonWriteLong(v)
override fun writeLongLe(v: Long) = commonWriteLongLe(v)
override fun writeDecimalLong(v: Long) = commonWriteDecimalLong(v)
override fun writeHexadecimalUnsignedLong(v: Long) = commonWriteHexadecimalUnsignedLong(v)
override fun emitCompleteSegments() = commonEmitCompleteSegments()
override fun emit() = commonEmit()
...
}
讀取緩存
BufferedSource 接口
/**
 * A [Source] that exposes an internal [buffer].
 * Abridged listing: `readXXX(...): XXX` stands for the family of read methods and is not compilable as written.
 */
actual interface BufferedSource : Source, ReadableByteChannel {
// The source's internal buffer.
actual val buffer: Buffer
// Family of read methods (readByte, readUtf8, ...).
actual fun readXXX(...): XXX
}
RealBufferedSource 實現類
- BufferedSource的實現類,含有兩個重要成員:Source和Buffer
- 接口方法基本上都經過 internal/RealBufferedSource 一層封裝,方法實現皆借助Buffer進行讀取操作
/**
 * [BufferedSource] implementation that wraps an upstream [source] and serves reads from [bufferField].
 * Each read method delegates to a `common*` helper defined outside this excerpt.
 * Abridged listing: `...` marks omitted members.
 */
internal actual class RealBufferedSource actual constructor(
@JvmField actual val source: Source
) : BufferedSource {
@JvmField val bufferField = Buffer() // the Buffer holding bytes pulled from [source]; backs [buffer]
@JvmField actual var closed: Boolean = false
@Suppress("OVERRIDE_BY_INLINE") // silences the compiler diagnostic for an override with an inline getter
override val buffer: Buffer
inline get() = bufferField
// Read methods: each delegates to the matching `common*` helper (per the accompanying text,
// in internal/RealBufferedSource), which performs the read through the Buffer.
override fun readByte(): Byte = commonReadByte()
override fun readByteString(): ByteString = commonReadByteString()
override fun readByteString(byteCount: Long): ByteString = commonReadByteString(byteCount)
override fun readFully(sink: Buffer, byteCount: Long): Unit = commonReadFully(sink, byteCount)
override fun readAll(sink: Sink): Long = commonReadAll(sink)
override fun readUtf8(): String = commonReadUtf8()
override fun readUtf8(byteCount: Long): String = commonReadUtf8(byteCount)
...
}
緩存實現類 Buffer
緩存邏輯的核心實現類,實現了RealBufferedSource和RealBufferedSink中暴露的API。重點看下Buffer的讀寫方法,其核心設計原則就是兼顧CPU(時間)和Memory(空間):
- 每一個Buffer中都包含一個Segment雙向鏈表,用於存儲緩存數據;複用固定大小的Segment有助於減少內存的申請和回收,減少內存碎片
- Buffer既充當了緩存寫入的角色,又充當了緩存讀取的角色,便於在Buffer之間平滑地轉移數據,最多只需一次拷貝
- 讀Buffer的數據轉移到寫Buffer時:若要轉移的是完整的Segment,則直接把Segment從讀Buffer摘下並接到寫Buffer的尾部,無需拷貝;若只轉移Segment中的一部分數據,1. 如果寫Buffer的尾部Segment可以容納,則直接拷貝存儲;2. 如果容納不下,則通過split方法再拆分出一個Segment,並將其整體移動到寫Buffer中存儲
Buffer、Segment、SegmentPool巧妙的設計,兼顧了CPU和Memory的平衡
/**
 * Moves up to [byteCount] bytes from the front of this buffer into [sink].
 *
 * @return the number of bytes moved (at most [size]), or `-1` if this buffer is empty.
 * @throws IllegalArgumentException if [byteCount] is negative.
 */
internal inline fun Buffer.commonRead(sink: Buffer, byteCount: Long): Long {
    require(byteCount >= 0) { "byteCount < 0: $byteCount" }
    if (size == 0L) return -1L
    // Clamp the request to what is actually buffered.
    val toTransfer = if (byteCount > size) size else byteCount
    sink.write(this, toTransfer)
    return toTransfer
}
/**
 * Moves [byteCount] bytes from the front of [source] to the end of this buffer.
 *
 * Whole segments are relinked from [source] into this buffer without copying bytes. When only a
 * prefix of source's head segment is needed, that prefix is copied into this buffer's tail if the
 * tail has room. Otherwise the head is split first and the resulting prefix segment is relinked.
 *
 * Throws IllegalArgumentException if [source] is this buffer. [checkOffsetAndCount] presumably
 * rejects a [byteCount] outside `[0, source.size]` (defined outside this excerpt).
 */
internal inline fun Buffer.commonWrite(source: Buffer, byteCount: Long) {
var byteCount = byteCount // bytes still to move
// Argument validation.
require(source !== this) { "source == this" }
checkOffsetAndCount(source.size, 0, byteCount)
while (byteCount > 0L) {
// Is only a prefix of source's head segment needed?
if (byteCount < source.head!!.limit - source.head!!.pos) {
// This buffer's tail segment (head.prev in the circular list), or null if this buffer is empty.
val tail = if (head != null) head!!.prev else null
// The tail can take the bytes if it owns its array and has room: the space after its limit,
// plus its consumed prefix when the array is not shared (writeTo can shift bytes down).
if (tail != null && tail.owner &&
byteCount + tail.limit - (if (tail.shared) 0 else tail.pos) <= Segment.SIZE) {
// Copy the prefix from source's head into our tail (Segment.writeTo always copies bytes).
source.head!!.writeTo(tail, byteCount.toInt())
// Update both sizes; the whole request is satisfied.
source.size -= byteCount
size += byteCount
return
} else {
// The tail cannot take it: split source's head so its first byteCount bytes become their
// own segment (shared or copied, see Segment.split). That segment is moved below.
source.head = source.head!!.split(byteCount.toInt())
}
}
// Remove the source's head segment and append it to our tail.
// The segment is moved by relinking; no bytes are copied here.
val segmentToMove = source.head
val movedByteCount = (segmentToMove!!.limit - segmentToMove.pos).toLong()
source.head = segmentToMove.pop()
if (head == null) { // This buffer is empty: the moved segment becomes a one-element circular list.
head = segmentToMove
segmentToMove.prev = segmentToMove
segmentToMove.next = segmentToMove.prev
} else {
// Append after our current tail.
var tail = head!!.prev
tail = tail!!.push(segmentToMove)
// The new tail may fit into its predecessor; compact merges and recycles it if so.
tail.compact()
}
// Update both sizes and the remaining count.
source.size -= movedByteCount
size += movedByteCount
byteCount -= movedByteCount
}
}
緩存數據片段 Segment
Segment是Buffer數據存儲的片段,目標數據都存儲在Segment的data字段中。Segment組成雙向鏈表結構,提供push、pop進行Segment的增加和移除;split和compact對Segment進行拆分和合併;sharedCopy為共享數據提供免拷貝的便利等。
共享機制
通過兩個字段(shared、owner)和兩個方法(sharedCopy、unsharedCopy)來實現
淺拷貝的應用可以省去一次內存拷貝,大大提高I/O效率
- shared:代表該Segment的數據是否共享
- owner:代表該Segment是否擁有對數據的寫入權
- sharedCopy:data淺拷貝,與被複製的Segment共享data數據,其中新Segment的owner為false(即拷貝出的Segment只能讀不能寫)
- unsharedCopy:data深拷貝,完全複製一個Segment
拆分與合并
為了Segment的回收掏秩,以及更加合理化存儲(chǔ)數(shù)據(jù)或舞,提供兩個(gè)方法
- compact:合并,如果本Segment與前一個(gè)Segment兩者數(shù)據(jù)可以合為一個(gè)蒙幻,則可以通過compact方法合并為一個(gè)Segment映凳,然后回收本Segment
- split:分割,當(dāng)存儲(chǔ)的數(shù)據(jù)大于當(dāng)前Segment的容量時(shí)邮破,則需要一個(gè)新的Segment诈豌,此時(shí)會(huì)有兩種情況,1.如果數(shù)據(jù)量大于1024則淺拷貝一個(gè)Segment來裝數(shù)據(jù)抒和,如果不是矫渔,則直接深拷貝數(shù)據(jù)創(chuàng)建一個(gè)新的Segment
/**
 * A slice of a [Buffer]: a byte array [data] plus a read offset [pos] and a write offset [limit].
 * Bytes in `[pos, limit)` are readable. Segments are linked through [next]/[prev] into a circular
 * doubly linked list, in which a lone segment points at itself.
 */
internal class Segment {
// Backing byte array. It may be referenced by other segments as well; see [shared].
@JvmField val data: ByteArray
/** Offset of the next byte to read from [data]. */
@JvmField var pos: Int = 0
/** Offset at which the next byte will be written into [data]. */
@JvmField var limit: Int = 0
/** True once [data] is referenced by more than one segment (set by [sharedCopy]). */
@JvmField var shared: Boolean = false
/** True if this segment owns [data] and may append to it; [writeTo] requires this of its sink. */
@JvmField var owner: Boolean = false
/** Next segment in the linked list. */
@JvmField var next: Segment? = null
/** Previous segment in the linked list. */
@JvmField var prev: Segment? = null
// Allocates a fresh SIZE-byte array that this segment owns and does not share.
constructor() {
this.data = ByteArray(SIZE)
this.owner = true
this.shared = false
}
// Wraps an existing array with the given offsets and flags; no bytes are copied.
constructor(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean) {
this.data = data
this.pos = pos
this.limit = limit
this.shared = shared
this.owner = owner
}
// Shallow copy: returns a new segment over the same [data] array, covering [pos, limit).
// Both segments end up marked shared. The copy is not an owner, so it cannot be written to.
fun sharedCopy(): Segment {
shared = true
return Segment(data, pos, limit, true, false)
}
// Deep copy: returns a new segment with its own copy of [data]; it is an owner and not shared.
fun unsharedCopy() = Segment(data.copyOf(), pos, limit, false, true)
// Unlinks this segment from its list. Returns the successor, or null if this was the only segment.
fun pop(): Segment? {
val result = if (next !== this) next else null
prev!!.next = next
next!!.prev = prev
next = null
prev = null
return result
}
// Inserts [segment] immediately after this one in the list and returns the inserted segment.
fun push(segment: Segment): Segment {
segment.prev = this
segment.next = next
next!!.prev = segment
next = segment
return segment
}
// Splits this segment in two. A new prefix segment holding `[pos..pos+byteCount)` is linked in
// before this one, and this segment keeps `[pos+byteCount..limit)`. Returns the prefix.
// Throws IllegalArgumentException unless 0 < byteCount <= limit - pos.
fun split(byteCount: Int): Segment {
require(byteCount > 0 && byteCount <= limit - pos) { "byteCount out of range" }
val prefix: Segment
// Large prefixes (>= SHARE_MINIMUM) share the array to avoid copying bytes. Small prefixes are
// copied into a pooled segment so the list does not fill up with many tiny shared segments.
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy() // shares [data]; no bytes copied
} else {
prefix = SegmentPool.take() // pooled or newly allocated segment; the bytes are copied below
data.copyInto(prefix.data, startIndex = pos, endIndex = pos + byteCount)
}
// Trim the prefix to exactly byteCount bytes and advance this segment past them.
prefix.limit = prefix.pos + byteCount
pos += byteCount
// Inserting after prev places the prefix directly before this segment.
prev!!.push(prefix)
return prefix
}
// Merges this segment into its predecessor when the predecessor is an owner and has room; on
// success this segment is unlinked and recycled, otherwise the list is left unchanged.
// Throws IllegalStateException if this segment is alone in its list.
fun compact() {
check(prev !== this) { "cannot compact" }
if (!prev!!.owner) return
// Readable bytes in this segment.
val byteCount = limit - pos
// Space available in prev: the room after its limit, plus its consumed prefix [0, pos)
// when its array is not shared (writeTo may shift prev's bytes down to reclaim it).
val availableByteCount = SIZE - prev!!.limit + if (prev!!.shared) 0 else prev!!.pos
if (byteCount > availableByteCount) return
// Enough room: copy this segment's bytes into prev.
writeTo(prev!!, byteCount)
// This segment is now empty: unlink it and return it to the pool.
pop()
SegmentPool.recycle(this)
}
/**
 * Copies [byteCount] bytes from this segment into [sink] at its [limit], advancing this
 * segment's [pos] and sink's [limit].
 *
 * Throws IllegalStateException if sink is not an owner. Throws IllegalArgumentException if the
 * bytes do not fit, or would only fit by shifting the bytes of a shared sink.
 */
fun writeTo(sink: Segment, byteCount: Int) {
// Only an owner segment may be written to.
check(sink.owner) { "only owner can write" }
// Not enough room after sink.limit: shift sink's readable bytes down to offset 0 first.
if (sink.limit + byteCount > SIZE) {
// Shifting is refused for a shared array (other segments may read the same bytes).
if (sink.shared) throw IllegalArgumentException()
// Even with the consumed prefix reclaimed, there is not enough space.
if (sink.limit + byteCount - sink.pos > SIZE) throw IllegalArgumentException()
// Move sink's readable bytes [pos, limit) to [0, limit - pos).
sink.data.copyInto(sink.data, startIndex = sink.pos, endIndex = sink.limit)
// Update the offsets to match the shifted bytes.
sink.limit -= sink.pos
sink.pos = 0
}
// Append this segment's bytes [pos, pos + byteCount) to sink, starting at its limit.
data.copyInto(sink.data, destinationOffset = sink.limit, startIndex = pos,
endIndex = pos + byteCount)
sink.limit += byteCount
pos += byteCount
}
companion object {
/** Size in bytes of the array allocated by the no-arg constructor; writeTo never writes past this offset. */
const val SIZE = 8192
/** Minimum byte count for which [split] shares the array instead of copying bytes. */
const val SHARE_MINIMUM = 1024
}
}
緩存片段池 SegmentPool
- 大小:64 KiB(64 * 1024 字節),一個Segment大小為8192字節,相當於最多緩存8個Segment
- 結構:單鏈表結構,提供take(取)和recycle(存)Segment的方法,且線程安全
- 作用:防止已申請的資源被直接丟棄回收,增加資源的重複利用,提高效率,減少GC,避免內存抖動
/**
 * Free list of unused [Segment]s, capped at [MAX_SIZE] bytes, so that segment arrays can be reused
 * instead of reallocated. [take] and [recycle] mutate the pool only inside `synchronized(this)`.
 */
internal object SegmentPool {
    /** Upper bound on the bytes held by pooled segments: 64 KiB, i.e. 8 segments of 8192 bytes. */
    const val MAX_SIZE = 64 * 1024L // 64 KiB.

    /** Head of the singly linked list of pooled segments, chained through [Segment.next]. */
    var next: Segment? = null

    /** Total bytes currently held by pooled segments. */
    var byteCount = 0L

    /**
     * Removes and returns a pooled segment. If the pool is empty, a new [Segment] is allocated
     * after the lock has been released.
     */
    fun take(): Segment {
        val pooled: Segment? = synchronized(this) {
            val first = next
            if (first != null) {
                next = first.next
                first.next = null
                byteCount -= Segment.SIZE
            }
            first
        }
        return pooled ?: Segment()
    }

    /**
     * Returns [segment] to the pool after resetting its offsets. The segment must already be
     * unlinked (`next` and `prev` both null). Shared segments are ignored, and so is any segment
     * offered while the pool is full.
     */
    fun recycle(segment: Segment) {
        require(segment.next == null && segment.prev == null)
        if (segment.shared) return // Shared segments cannot be recycled.
        synchronized(this) {
            val hasRoom = byteCount + Segment.SIZE <= MAX_SIZE
            if (hasRoom) {
                byteCount += Segment.SIZE
                segment.pos = 0
                segment.limit = 0
                segment.next = next
                next = segment
            }
        }
    }
}