前言
Okio是一款輕量級IO框架先誉,由安卓大區(qū)最強(qiáng)王者Square公司打造,是著名網(wǎng)絡(luò)框架OkHttp的基石的烁。Okio結(jié)合了java.io和java.nio褐耳,提供阻塞IO和非阻塞IO的功能,同時(shí)也對緩存等底層結(jié)構(gòu)做了優(yōu)化渴庆,能讓你更輕快的獲得铃芦、存儲和處理數(shù)據(jù)雅镊。
這篇文章主要是對Okio框架的核心部分做詳盡的解析。由于Okio的代碼量不大且比較精巧刃滓,核心的代碼大約5000行仁烹,本文將采用自底向上的分析方法。先談下Java IO的缺點(diǎn)咧虎,并對Okio的整體框架做個(gè)介紹卓缰,再依次詳細(xì)分析Okio的各個(gè)模塊的實(shí)現(xiàn),包括緩存模塊砰诵、定時(shí)模塊等征唬,之后對阻塞IO和非阻塞IO的執(zhí)行過程,通過閱讀源碼茁彭,進(jìn)行流程分析总寒,最后做個(gè)總結(jié),總結(jié)Okio的優(yōu)化思想和設(shè)計(jì)精髓理肺。
借著這篇文章的機(jī)會摄闸,向大家介紹這款優(yōu)雅的IO框架,也想和大家探討設(shè)計(jì)的相關(guān)問題妹萨。希望通過這篇文章年枕,能讓大家對Okio有個(gè)了解,甚至樂于放棄JAVA原生的IO體系眠副,轉(zhuǎn)而使用這款I(lǐng)O框架來作為自己日常開發(fā)的工具画切。
如果你對一些基礎(chǔ)的IO模型(阻塞IO竣稽、非阻塞IO囱怕、同步IO、異步IO毫别、多路復(fù)用娃弓、BIO、NIO岛宦、AIO)不清楚的話台丛,下面是一些不錯(cuò)的補(bǔ)課資料。
Linux IO模式及 select砾肺、poll挽霉、epoll詳解
Java NIO Tutorial
Java NIO - Ron Hitchens
源碼下載地址
https://github.com/square/okio
文中部分圖片可能看不清楚,可以點(diǎn)一下看原圖变汪。
全文較長侠坎,這里先放出整體的一個(gè)目錄圖
- 前言
- 從Java IO說起
- Okio框架結(jié)構(gòu)
- 緩存結(jié)構(gòu)
- 定時(shí)機(jī)制
- 自定義字符串ByteString
- 流程分析
- 總結(jié)
從Java IO說起
大量獨(dú)立拓展的裝飾者導(dǎo)致類爆炸
用過Java IO的同學(xué)都應(yīng)該有體會,Java的流用起來很麻煩和笨重裙盾。這主要是因?yàn)镴ava IO體系采用裝飾者模式構(gòu)建和擴(kuò)展实胸,整個(gè)體系十分復(fù)雜龐大他嫡,基礎(chǔ)接口就有4個(gè)(InputStream, OutputStream, Reader, Writer),為了支持每一種組合而產(chǎn)生大量獨(dú)立拓展的子類庐完,使得子類的數(shù)目呈爆炸性增長钢属,每個(gè)類對應(yīng)一種IO需求。
下面是一段Java IO調(diào)用代碼门躯。僅僅是一個(gè)簡單需求就要寫這么一大堆代碼淆党。相信大家早已對此心懷不滿。
// Java IO
public static void writeTest(File file) {
try {
FileOutputStream fos = new FileOutputStream(file);
OutputStream os = new BufferedOutputStream(fos);
DataOutputStream dos = new DataOutputStream(os);
dos.writeUTF("write string by utf-8.\n");
dos.writeInt(1234);
dos.flush();
fos.close();
} catch (Exception e) {
e.printStackTrace();
}
}
使用Okio實(shí)現(xiàn)同樣的功能讶凉,明顯輕松得多宁否。而且Okio中的類被特意地設(shè)計(jì)為支持鏈?zhǔn)秸{(diào)用。正確的使用鏈?zhǔn)秸{(diào)用缀遍,就能產(chǎn)生簡潔慕匠、優(yōu)美、易讀的代碼。現(xiàn)在很多框架都是這樣設(shè)計(jì)猾瘸,是個(gè)流行趨勢粗仓。
// Okio
public static void writeTest(File file) {
try {
Okio.buffer(Okio.sink(file))
.writeUtf8("write string by utf-8.\n")
.writeInt(1234).close();
} catch (Exception e) {
e.printStackTrace();
}
}
阻塞IO的瓶頸
傳統(tǒng)Java socket的阻塞性質(zhì)曾經(jīng)是Java程序可伸縮性的最重要制約之一。維持一個(gè)socket連接必須單獨(dú)創(chuàng)建一個(gè)線程來管理锅铅,由此產(chǎn)生大量的線程切換,導(dǎo)致程序性能急劇降低减宣。有了非阻塞IO盐须,進(jìn)程僅需一個(gè)線程就能管理所有的連接,非阻IO是許多復(fù)雜的漆腌、高性能的程序構(gòu)建的基礎(chǔ)贼邓。
服務(wù)器端經(jīng)常會考慮到非阻塞socket通道,因?yàn)樗鼈兪雇瑫r(shí)管理很多socket 通道變得更容易闷尿。但是塑径,在客戶端使用一個(gè)或幾個(gè)非阻塞模式的socket 通道也是有益處的,例如填具,借助非阻塞的socket 通道统舀,GUI 程序可以專注于用戶請求并且同時(shí)維護(hù)與一個(gè)或多個(gè)服務(wù)器的會話。在很多程序上劳景,非阻塞模式都是有用的誉简。
為了解決這個(gè)問題Java的1.4版本加入了nio庫,引入了Buffer盟广,Channel闷串,Selector等概念,實(shí)現(xiàn)了非阻塞IO多路復(fù)用模型衡蚂。
而Okio另辟蹊徑窿克,對的Java原生流做了一個(gè)分裝骏庸,自己設(shè)計(jì)了一套非阻塞調(diào)用的機(jī)制(看門狗)。至于為什么底層采用的是原生流而不是Channel年叮,我只能對大佬的思想做一個(gè)猜測具被。因?yàn)镺kio被設(shè)計(jì)出來主要是為了做網(wǎng)絡(luò)通信,而TCP/IP本身就是流式協(xié)議只损,所以底層采用的還是Java的原生流一姿。使用看門狗而不是Selector,是為了更輕量的IO操作跃惫,更適合移動端叮叹。
Okio框架結(jié)構(gòu)
廢話不多說,先直接上類圖爆存。下圖畫出了Okio中的一些核心類(部分裝飾者類和工具類沒有畫出來)蛉顽。圖片看出清楚可以點(diǎn)一下放大。
可以看出Okio的類圖是非常簡單的先较,這也是Okio之所以輕量的原因携冤。
最基本的接口只有兩個(gè):Sink、Source闲勺,大概相當(dāng)于OutputStream和InputStream在原生接口中的地位曾棕。這兩個(gè)接口中只定義了一些最基礎(chǔ)的IO操作方法。
BufferedSink和BufferedSource接口分別繼承自Sink和Source菜循,擴(kuò)展了讀寫功能翘地,定義了各式各樣的讀和寫。
public interface BufferedSink extends Sink {
Buffer buffer();
BufferedSink write(ByteString byteString) throws IOException;
BufferedSink write(byte[] source) throws IOException;
BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
long writeAll(Source source) throws IOException;
BufferedSink write(Source source, long byteCount) throws IOException;
BufferedSink writeUtf8(String string) throws IOException;
BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException;
BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
throws IOException;
BufferedSink writeByte(int b) throws IOException;
BufferedSink writeShort(int s) throws IOException;
BufferedSink writeShortLe(int s) throws IOException;
BufferedSink writeInt(int i) throws IOException;
BufferedSink writeIntLe(int i) throws IOException;
BufferedSink writeLong(long v) throws IOException;
BufferedSink writeLongLe(long v) throws IOException;
BufferedSink writeDecimalLong(long v) throws IOException;
BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
@Override void flush() throws IOException;
BufferedSink emit() throws IOException;
BufferedSink emitCompleteSegments() throws IOException;
OutputStream outputStream();
}
public interface BufferedSource extends Source {
Buffer buffer();
boolean exhausted() throws IOException;
void require(long byteCount) throws IOException;
boolean request(long byteCount) throws IOException;
byte readByte() throws IOException;
short readShort() throws IOException;
short readShortLe() throws IOException;
int readInt() throws IOException;
int readIntLe() throws IOException;
long readLong() throws IOException;
long readLongLe() throws IOException;
long readDecimalLong() throws IOException;
long readHexadecimalUnsignedLong() throws IOException;
void skip(long byteCount) throws IOException;
ByteString readByteString() throws IOException;
ByteString readByteString(long byteCount) throws IOException;
int select(Options options) throws IOException;
byte[] readByteArray() throws IOException;
byte[] readByteArray(long byteCount) throws IOException;
int read(byte[] sink) throws IOException;
void readFully(byte[] sink) throws IOException;
int read(byte[] sink, int offset, int byteCount) throws IOException;
void readFully(Buffer sink, long byteCount) throws IOException;
long readAll(Sink sink) throws IOException;
String readUtf8() throws IOException;
String readUtf8(long byteCount) throws IOException;
@Nullable String readUtf8Line() throws IOException;
String readUtf8LineStrict() throws IOException;
String readUtf8LineStrict(long limit) throws IOException;
int readUtf8CodePoint() throws IOException;
String readString(Charset charset) throws IOException;
String readString(long byteCount, Charset charset) throws IOException;
long indexOf(byte b) throws IOException;
long indexOf(byte b, long fromIndex) throws IOException;
long indexOf(byte b, long fromIndex, long toIndex) throws IOException;
long indexOf(ByteString bytes) throws IOException;
long indexOf(ByteString bytes, long fromIndex) throws IOException;
long indexOfElement(ByteString targetBytes) throws IOException;
long indexOfElement(ByteString targetBytes, long fromIndex) throws IOException;
boolean rangeEquals(long offset, ByteString bytes) throws IOException;
boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)
throws IOException;
InputStream inputStream();
}
Buffer實(shí)現(xiàn)了BufferedSink和BufferedSource癌幕,是個(gè)集大成者衙耕,同時(shí)還增加了一些處理數(shù)據(jù)的操作,是一個(gè)可讀序芦、可寫臭杰、可處理數(shù)據(jù)的緩存類粤咪。Buffer的數(shù)據(jù)操作依賴ByteString類谚中,這個(gè)類配合著Buffer進(jìn)行數(shù)據(jù)處理。由于篇幅限制寥枝,下面僅貼出Buffer中一些新增方法的聲明宪塔,具體實(shí)現(xiàn)大家可自行查看源碼。
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
@Nullable Segment head;
long size;
public long size();
public Buffer copyTo(OutputStream out) throws IOException;
public Buffer copyTo(OutputStream out, long offset, long byteCount) throws IOException;
public Buffer copyTo(Buffer out, long offset, long byteCount);
public Buffer writeTo(OutputStream out) throws IOException;
public Buffer writeTo(OutputStream out, long byteCount) throws IOException;
public Buffer readFrom(InputStream in) throws IOException;
public Buffer readFrom(InputStream in, long byteCount) throws IOException;
private void readFrom(InputStream in, long byteCount, boolean forever) throws IOException;
public byte getByte(long pos);
int selectPrefix(Options options);
public void clear();
Segment writableSegment(int minimumCapacity);
List<Integer> segmentSizes();
public ByteString md5();
public ByteString sha1();
public ByteString sha256();
public ByteString sha512() ;
private ByteString digest(String algorithm);
public ByteString hmacSha1(ByteString key);
public ByteString hmacSha256(ByteString key);
public ByteString hmacSha512(ByteString key);
private ByteString hmac(String algorithm, ByteString key);
public ByteString snapshot();
public ByteString snapshot(int byteCount);
}
RealBufferedSink和RealBufferedSource是BufferedSink和BufferedSource的實(shí)現(xiàn)類囊拜,實(shí)現(xiàn)了接口的所有方法某筐,同時(shí)內(nèi)部擁有一個(gè)Buffer對象,是真正進(jìn)行的緩沖讀寫的角色冠跷。
Okio類相當(dāng)于一個(gè)簡單工廠南誊,對外暴露接口身诺,可以產(chǎn)生各式各樣的Sink和Source。
Buffer的存儲容器用的不是數(shù)組抄囚,而是Segment類對象構(gòu)成的循環(huán)鏈表霉赡,Segment用了享元模式,有SegmentPool對Segment進(jìn)行管理幔托。
定時(shí)模塊主要由Timeout和其子類AnsycTimeout類組成穴亏。
緩存結(jié)構(gòu)
緩存是Okio中最重要的部分,很多優(yōu)化思想都體現(xiàn)在這里重挑,非常值得學(xué)習(xí)嗓化。Okio的緩存設(shè)計(jì)在cpu利用率和內(nèi)存利用率之間做了權(quán)衡,即時(shí)間與空間的權(quán)衡谬哀,精巧而高效刺覆。
緩存模塊主要由Buffer,Segment史煎,SegmentPool這三個(gè)類構(gòu)成隅津,三者之間的關(guān)系如下圖所示。Buffer內(nèi)實(shí)際存儲數(shù)據(jù)的容器是一條由Segment構(gòu)成的的循環(huán)鏈表劲室。暫時(shí)不用的Segment由SegmentPool通過單鏈表保存伦仍,防止頻繁GC,避免內(nèi)存抖動很洋,增加資源的重復(fù)利用充蓝,提高效率。
Segment是存儲數(shù)據(jù)的基本單元喉磁,也是鏈表結(jié)構(gòu)中的一個(gè)節(jié)點(diǎn)谓苟,其源碼如下。
final class Segment {
static final int SIZE = 8192;
static final int SHARE_MINIMUM = 1024;
final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment next;
Segment prev;
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
Segment(Segment shareFrom) {
this(shareFrom.data, shareFrom.pos, shareFrom.limit);
shareFrom.shared = true;
}
Segment(byte[] data, int pos, int limit) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.owner = false;
this.shared = true;
}
public @Nullable Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
public Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
if (byteCount >= SHARE_MINIMUM) {
prefix = new Segment(this);
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
public void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn't writable.
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}
public void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}
一個(gè)Segment可以分為三個(gè)部分协怒,用pos和limit區(qū)分涝焙,如下圖所示。紅色部分的數(shù)據(jù)已經(jīng)被讀過了孕暇,為失效數(shù)據(jù)仑撞;綠色部分是剛寫入的數(shù)據(jù),還沒有被讀過妖滔;黃色部分還沒有被使用隧哮,可以寫入新數(shù)據(jù)。這個(gè)設(shè)計(jì)模仿了java.nio中的緩存設(shè)計(jì)座舍,但卻更加巧妙沮翔。java.nio中緩存讀寫操作需要調(diào)用很多額外的操作方法,如從寫切換到讀需要調(diào)用flip曲秉,客戶需要對緩存的結(jié)構(gòu)非常熟悉才能使用采蚀。而Okio的這種設(shè)計(jì)對用戶是透明的疲牵,用戶不需要清楚底層結(jié)構(gòu)也能使用。
Segment提供的一些操作:
public Segment push(Segment segment)
節(jié)點(diǎn)插入榆鼠。在調(diào)用該方法的節(jié)點(diǎn)后插入segment節(jié)點(diǎn)瑰步,并返回新插入的節(jié)點(diǎn)。public @Nullable Segment pop()
節(jié)點(diǎn)刪除璧眠。在雙向鏈表中刪除調(diào)用該方法的節(jié)點(diǎn)缩焦,并返回后繼節(jié)點(diǎn)。若該節(jié)點(diǎn)為頭節(jié)點(diǎn)(此時(shí)只剩頭節(jié)點(diǎn)责静,鏈表為空)袁滥,則返回null。public Segment split(int byteCount)
節(jié)點(diǎn)分裂灾螃。將一個(gè)節(jié)點(diǎn)分裂成兩個(gè)题翻,第一個(gè)節(jié)點(diǎn)獲得原節(jié)點(diǎn)[pos, pos+byteCount)區(qū)間的數(shù)據(jù),第二個(gè)節(jié)點(diǎn)獲得[pos+byteCount, limit)的數(shù)據(jù)腰鬼,返回第一個(gè)節(jié)點(diǎn)嵌赠。如下圖所示
注意,這里有技巧熄赡。由于第一個(gè)節(jié)點(diǎn)是新產(chǎn)生的姜挺,如果第一個(gè)節(jié)點(diǎn)數(shù)據(jù)長度大于SHARE_MINIMUM(1024),那么就調(diào)用拷貝構(gòu)造函數(shù)創(chuàng)造新節(jié)點(diǎn)彼硫,拷貝構(gòu)造函數(shù)做的是淺拷貝炊豪,即兩個(gè)節(jié)點(diǎn)都持有同一個(gè)data數(shù)組的引用,這樣就省去了開辟內(nèi)存及復(fù)制內(nèi)存的開銷拧篮。若小于词渤,則從SegmentPool中取出一個(gè)節(jié)點(diǎn),并做真實(shí)的數(shù)據(jù)拷貝串绩。Avoid short shared segments. These are bad for performance because they are readonly and may lead to long chains of short segments.(這句話是大佬的原文缺虐,怕翻譯的不好沒有翻譯) 可以看出,這是一個(gè)權(quán)衡性的設(shè)計(jì)礁凡。
public void compact()
節(jié)點(diǎn)合并高氮。當(dāng)前驅(qū)節(jié)點(diǎn)沒有被共享時(shí),若兩個(gè)節(jié)點(diǎn)可以合并(兩個(gè)節(jié)點(diǎn)的數(shù)據(jù)長度小于SIZE(8192))把篓,則將該節(jié)點(diǎn)的數(shù)據(jù)寫入前驅(qū)節(jié)點(diǎn)纫溃,并回收該節(jié)點(diǎn)。public void writeTo(Segment sink, int byteCount)
將sink節(jié)點(diǎn)的前byteCount個(gè)字節(jié)寫入到調(diào)用該方法的節(jié)點(diǎn)韧掩,當(dāng)該節(jié)點(diǎn)的尾部長度不足byteCount時(shí),會將該節(jié)點(diǎn)的數(shù)據(jù)字段前移pos位窖铡,與首部對齊疗锐。
SegmentPool非常簡單坊谁,其內(nèi)部維持一條單鏈表保存暫時(shí)不用的Segment,緩存池的大小限制為64KB滑臊,所以最多能保存8個(gè)Segment口芍。SegmentPool提供兩個(gè)同步方法,分別用來存取Segment雇卷。
final class SegmentPool {
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
static @Nullable Segment next;
static long byteCount;
private SegmentPool() {
}
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
}
真正做Segment分裂鬓椭、合并的地方是Buffer類中的write(Buffer source, long byteCount)方法,該方法把傳入的source Buffer的前byteCount個(gè)字節(jié)寫到調(diào)用該方法的Buffer中去关划。由于兩個(gè)Buffer里的數(shù)據(jù)結(jié)構(gòu)都是循環(huán)鏈表小染,所以寫入過程是將source鏈表的節(jié)點(diǎn)按從頭到尾的順序一個(gè)個(gè)取下來,然后插入到被寫入到鏈表贮折,并看看新插入的節(jié)點(diǎn)能否和前一個(gè)節(jié)點(diǎn)合并裤翩。如果要寫的只是一個(gè)Segment的部分?jǐn)?shù)據(jù),那么這個(gè)Segment進(jìn)行分裂调榄,把要寫的數(shù)據(jù)分裂出來踊赠。
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
// ...
@Override
public void write(Buffer source, long byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
if (byteCount < (source.head.limit - source.head.pos)) {
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
// Our existing segments are sufficient. Move bytes from source's head to our tail.
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
source.head = source.head.split((int) byteCount);
}
}
// Remove the source's head segment and append it to our tail.
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
source.head = segmentToMove.pop();
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {
Segment tail = head.prev;
tail = tail.push(segmentToMove);
tail.compact();
}
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
}
好了,到這Okio的緩存結(jié)構(gòu)已經(jīng)看得很清楚了每庆。
定時(shí)機(jī)制
基類Timeout
Okio中使用Timeout類來控制I/O的定時(shí)操作筐带。該定時(shí)機(jī)制使用了時(shí)間段和絕對時(shí)間點(diǎn)兩種計(jì)算定時(shí)的方式,可以選擇使用其中一種缤灵。下面我們看其源碼
public class Timeout {
private boolean hasDeadline;
private long deadlineNanoTime;
private long timeoutNanos;
// ...
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
public final void waitUntilNotified(Object monitor) throws InterruptedIOException {
try {
boolean hasDeadline = hasDeadline();
long timeoutNanos = timeoutNanos();
if (!hasDeadline && timeoutNanos == 0L) {
monitor.wait(); // There is no timeout: wait forever.
return;
}
// Compute how long we'll wait.
long waitNanos;
long start = System.nanoTime();
if (hasDeadline && timeoutNanos != 0) {
long deadlineNanos = deadlineNanoTime() - start;
waitNanos = Math.min(timeoutNanos, deadlineNanos);
} else if (hasDeadline) {
waitNanos = deadlineNanoTime() - start;
} else {
waitNanos = timeoutNanos;
}
// Attempt to wait that long. This will break out early if the monitor is notified.
long elapsedNanos = 0L;
if (waitNanos > 0L) {
long waitMillis = waitNanos / 1000000L;
monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
elapsedNanos = System.nanoTime() - start;
}
// Throw if the timeout elapsed before the monitor was notified.
if (elapsedNanos >= waitNanos) {
throw new InterruptedIOException("timeout");
}
} catch (InterruptedException e) {
throw new InterruptedIOException("interrupted");
}
}
}
可以看出Timeout類處理超時(shí)的機(jī)制比較簡單烫堤,首先是有3個(gè)實(shí)例變量:
private boolean hasDeadline; // 是否設(shè)置了超時(shí)的時(shí)間點(diǎn)
private long deadlineNanoTime; // 超時(shí)時(shí)間點(diǎn)
private long timeoutNanos; // 超時(shí)時(shí)間段
然后有一堆getter和setter方法,沒有什么好說的凤价,代碼中為了簡潔也沒有列出來鸽斟。而針對定時(shí)處理的方法有兩個(gè):
public void throwIfReached() throws IOException
如果當(dāng)前線程被中斷了或者定時(shí)時(shí)間點(diǎn)到了,拋出中斷異常利诺。public final void waitUntilNotified(Object monitor) throws InterruptedIOException
首先是處理沒有等待時(shí)長的特殊情況富蓄,即無限期等待,直到有人喚醒慢逾。如果設(shè)置了等待時(shí)長立倍,則計(jì)算時(shí)長以后進(jìn)入等待狀態(tài),并等待一定時(shí)間侣滩。定時(shí)時(shí)間到了之后拋出中斷異常口注。
異步事件定時(shí)類AsyncTimeout
真正實(shí)現(xiàn)異步事件定時(shí)的類是AsyncTimeout類,該類繼承自TimeOut類君珠,主要的邏輯如下圖所示寝志。類中維護(hù)著一條由AsyncTimeout對象構(gòu)成的異步事件最小剩余時(shí)間優(yōu)先隊(duì)列(由單列表實(shí)現(xiàn)),即最先超時(shí)的節(jié)點(diǎn)在隊(duì)首。類中定義了一個(gè)內(nèi)部類WatchDog(看門狗)材部,看門狗將作為守護(hù)線程在后臺運(yùn)行毫缆,不斷取出隊(duì)首元素并判斷是否到達(dá)定時(shí)時(shí)間,若到達(dá)定時(shí)時(shí)間則執(zhí)行該AsyncTimeout節(jié)點(diǎn)對象的timedOut方法乐导。timedOut方法為空方法苦丁,需要在繼承的子類中重寫。
AsyncTimeout類有兩個(gè)方法用于包裝輸入和輸出物臂,source和sink旺拉,這兩個(gè)方法都返回代理對象。通過源碼可以看出source和sink方法都會先調(diào)用enter方法將異步事件放入隊(duì)列棵磷,再執(zhí)行真實(shí)對象的輸入蛾狗、輸出方法,當(dāng)然若出現(xiàn)異吃蟊荆或者在超時(shí)之前讀寫完成將調(diào)用exit函數(shù)進(jìn)入異常處理淘太。
public class AsyncTimeout extends Timeout {
// ...
static @Nullable AsyncTimeout head;
private boolean inQueue;
private @Nullable AsyncTimeout next;
private long timeoutAt;
protected void timedOut() {
}
public final Source source(final Source source) {
return new Source() {
@Override
public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
long result = source.read(sink, byteCount);
throwOnTimeout = true;
return result;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public void close() throws IOException {
boolean throwOnTimeout = false;
try {
source.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public Timeout timeout() {
return AsyncTimeout.this;
}
// ...
};
}
public final Sink sink(final Sink sink) {
return new Sink() {
@Override
public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0L) {
// Count how many bytes to write. This loop guarantees we split on a segment boundary.
long toWrite = 0L;
for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
int segmentSize = s.limit - s.pos;
toWrite += segmentSize;
if (toWrite >= byteCount) {
toWrite = byteCount;
break;
}
}
// Emit one write. Only this section is subject to the timeout.
boolean throwOnTimeout = false;
enter();
try {
sink.write(source, toWrite);
byteCount -= toWrite;
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
}
@Override
public void flush() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.flush();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public void close() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public Timeout timeout() {
return AsyncTimeout.this;
}
// ...
};
}
}
enter方法將節(jié)點(diǎn)放入異步事件隊(duì)列,而真正執(zhí)行放入隊(duì)列的操作的是scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline)方法规丽。該方法為同步方法蒲牧,若隊(duì)列為空就創(chuàng)建隊(duì)列,并創(chuàng)建守護(hù)線程看門狗赌莺,之后計(jì)算事件被觸發(fā)的剩余時(shí)間冰抢,并將事件放入隊(duì)列,如果新放入隊(duì)列的元素是在隊(duì)首艘狭,就喚醒看門狗挎扰,檢查該事件是否超時(shí)。
public class AsyncTimeout extends Timeout {
// ...
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
long now = System.nanoTime();
if (timeoutNanos != 0 && hasDeadline) {
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}
// Insert the node in sorted order.
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
private long remainingNanos(long now) {
return timeoutAt - now;
}
}
異常處理涉及以下幾個(gè)方法巢音,具體就是將事件從隊(duì)列中移除并拋出合適的異常遵倦。
public class AsyncTimeout extends Timeout {
// ...
final void exit(boolean throwOnTimeout) throws IOException {
boolean timedOut = exit();
if (timedOut && throwOnTimeout) throw newTimeoutException(null);
}
final IOException exit(IOException cause) throws IOException {
if (!exit()) return cause;
return newTimeoutException(cause);
}
public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
// Returns true if the timeout occurred.
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// Remove the node from the linked list.
for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
if (prev.next == node) {
prev.next = node.next;
node.next = null;
return false;
}
}
// The node wasn't found in the linked list: it must have timed out!
return true;
}
protected IOException newTimeoutException(@Nullable IOException cause) {
InterruptedIOException e = new InterruptedIOException("timeout");
if (cause != null) {
e.initCause(cause);
}
return e;
}
}
看門狗調(diào)用同步方法每次從隊(duì)列中取出隊(duì)首元素,若發(fā)現(xiàn)隊(duì)列為空就休眠IDLE_TIMEOUT_MILLIS(1分鐘)官撼,休眠完成后梧躺,若還是為空則線程退出。取出后檢查隊(duì)首元素的定時(shí)時(shí)間傲绣,發(fā)現(xiàn)還沒到掠哥,則休眠剩余時(shí)間;發(fā)現(xiàn)已超時(shí)秃诵,則回掉隊(duì)首元素的timedOut()方法续搀,并將該元素彈出隊(duì)列〔ぞ唬看門狗設(shè)計(jì)的非常高效禁舷,沒有任務(wù)的時(shí)候處于休眠或退出狀態(tài)彪杉。
public class AsyncTimeout extends Timeout {
private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);
private static final class Watchdog extends Thread {
Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next;
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
long waitNanos = node.remainingNanos(System.nanoTime());
// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
// The head of the queue has timed out. Remove it.
head.next = node.next;
node.next = null;
return node;
}
}
自定義字符串ByteString
ByteString是自定義的字節(jié)字符串類,此類被設(shè)計(jì)為不可變的(創(chuàng)建后之后不能修改其數(shù)據(jù))榛了,和String類似在讶。當(dāng)然煞抬,Java語言可沒有不可變標(biāo)記關(guān)鍵字霜大,如果想要實(shí)現(xiàn)一個(gè)不可變的對象,還需要一些操作革答。
- 不要提供任何會修改對象狀態(tài)的方法
- 保證類不會被擴(kuò)展
- 使所有的域都是final的
- 使所有的域都是private的
- 確保對于任何可變組件的互斥訪問
不可變的對象有許多的好處战坤,首先本質(zhì)是線程安全的,不要求同步處理残拐,也就是沒有鎖之類的性能問題途茫,而且可以被自由的共享內(nèi)部信息,當(dāng)然壞處就是需要?jiǎng)?chuàng)建大量的類的對象溪食。
ByteString不僅是不可變的囊卜,同時(shí)在內(nèi)部有兩個(gè)filed,分別是byte[]數(shù)據(jù)错沃,以及String的數(shù)據(jù)栅组,這樣能夠讓這個(gè)類在Byte和String轉(zhuǎn)換上基本沒有開銷,同樣的也需要保存兩份引用枢析,這是明顯的空間換時(shí)間的方式玉掸,為了性能Okio做了很多的事情。但是這個(gè)String前面有 transient 關(guān)鍵字標(biāo)記醒叁,也就是說不會進(jìn)入序列化和反序列化司浪,反序列化的過程會進(jìn)行懶加載,節(jié)省開銷把沼。
ByteString提供了哪些功能啊易,我們看一下方法就一目了然。
public class ByteString implements Serializable, Comparable<ByteString> {
final byte[] data;
transient int hashCode; // Lazily computed; 0 if unknown.
transient String utf8; // Lazily computed.
ByteString(byte[] data);
public static ByteString of(byte... data);
public static ByteString of(byte[] data, int offset, int byteCount);
public static ByteString of(ByteBuffer data);
public static ByteString encodeUtf8(String s);
public static ByteString encodeString(String s, Charset charset);
public String utf8();
public String string(Charset charset);
public String base64();
public ByteString md5();
public ByteString sha1();
public ByteString sha256();
public ByteString sha512();
private ByteString digest(String algorithm);
public ByteString hmacSha1(ByteString key);
public ByteString hmacSha256(ByteString key);
public ByteString hmacSha512(ByteString key);
private ByteString hmac(String algorithm, ByteString key);
public String base64Url();
public static @Nullable ByteString decodeBase64(String base64);
public String hex();
public static ByteString decodeHex(String hex);
private static int decodeHexDigit(char c);
public static ByteString read(InputStream in, int byteCount) throws IOException;
public ByteString toAsciiLowercase();
public ByteString toAsciiUppercase();
public ByteString substring(int beginIndex);
public ByteString substring(int beginIndex, int endIndex);
public int size();
public byte[] toByteArray();
byte[] internalArray();
public ByteBuffer asByteBuffer();
public void write(OutputStream out) throws IOException;
void write(Buffer buffer);
public boolean rangeEquals(int offset, ByteString other, int otherOffset, int byteCount);
public boolean rangeEquals(int offset, byte[] other, int otherOffset, int byteCount);
public final boolean startsWith(ByteString prefix);
public final boolean startsWith(byte[] prefix);
public final boolean endsWith(ByteString suffix);
public final boolean endsWith(byte[] suffix);
public final int indexOf(ByteString other);
public final int indexOf(ByteString other, int fromIndex);
public final int indexOf(byte[] other);
public int indexOf(byte[] other, int fromIndex);
public final int lastIndexOf(ByteString other);
public final int lastIndexOf(ByteString other, int fromIndex);
public final int lastIndexOf(byte[] other);
public int lastIndexOf(byte[] other, int fromIndex);
@Override public boolean equals(Object o);
@Override public int hashCode();
@Override public int compareTo(ByteString byteString);
@Override public String toString();
static int codePointIndexToCharIndex(String s, int codePointCount);
private void readObject(ObjectInputStream in) throws IOException;
private void writeObject(ObjectOutputStream out) throws IOException;
}
流程分析
阻塞調(diào)用
讓我們再回過頭來看看文章開始的那個(gè)同步調(diào)用是個(gè)怎樣的流程饮睬,代碼如下租谈。
Okio.buffer(Okio.sink(file))
.writeUtf8("write string by utf-8.\n")
.writeInt(1234).close();
先看看Okio.sink(file)。
// Okio.java
public static Sink sink(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return sink(new FileOutputStream(file));
}
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
從源碼可以看出Okio.sink(file)最終會調(diào)用Okio.sink(final OutputStream in, final Timeout timeout)方法续捂。傳入的OutputStream對象是new出來的FileOutputStream對象垦垂,到這里我們可以看出,Sink只是包裹了Java原生流牙瓢,可以看成原生流的代理劫拗,包裝了寫操作增加了一些處理,最終底層的寫操作將由FileOutputStream完成矾克。傳入的Timeout對象是通過默認(rèn)構(gòu)造函數(shù)new出來的Timeout對象页慷,沒有設(shè)置延時(shí)。
調(diào)用最終返回一個(gè)Sink對象,這個(gè)對象重寫了write(Buffer source, long byteCount)方法酒繁,是為了RealBufferSink作準(zhǔn)備滓彰,該方法將Buffer里的byteCount個(gè)字節(jié)寫入到Java原生流中,寫操作會改變Buffer的size以及涉及到的Segment的狀態(tài)州袒。需要注意的是揭绑,若timeout設(shè)置了定時(shí),則將延遲設(shè)置的時(shí)間郎哭,直到超時(shí)后才寫數(shù)據(jù)他匪,這是一個(gè)阻塞I/O。返回的Sink對象也重寫了close()夸研,flush()等方法邦蜜,實(shí)際上都是對Java原生流的操作。
得到Sink對象后將進(jìn)入Okio.buffer(Sink sink)方法亥至。
// Okio.java
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
這個(gè)方法非常簡單悼沈,僅僅是new了一個(gè)RealBufferedSink對象就返回了。構(gòu)造把Sink對象傳進(jìn)去了姐扮,RealBufferedSink內(nèi)部持有傳入的Sink絮供,也可以看成是Sink的代理,各種操作都是在Sink上操作溶握。RealBufferedSink內(nèi)部也持有一個(gè)Buffer對象杯缺,作為緩存數(shù)據(jù)的容器。
之后調(diào)用就到了RealBufferedSink.writeUtf8(String string)方法睡榆。
// RealBufferedSink.java
@Override public BufferedSink writeUtf8(String string) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeUtf8(string);
return emitCompleteSegments();
}
// Buffer.java
@Override public Buffer writeUtf8(String string) {
return writeUtf8(string, 0, string.length());
}
@Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
if (string == null) throw new IllegalArgumentException("string == null");
if (beginIndex < 0) throw new IllegalArgumentException("beginIndex < 0: " + beginIndex);
if (endIndex < beginIndex) {
throw new IllegalArgumentException("endIndex < beginIndex: " + endIndex + " < " + beginIndex);
}
if (endIndex > string.length()) {
throw new IllegalArgumentException(
"endIndex > string.length: " + endIndex + " > " + string.length());
}
// Transcode a UTF-16 Java String to UTF-8 bytes.
for (int i = beginIndex; i < endIndex;) {
int c = string.charAt(i);
if (c < 0x80) {
Segment tail = writableSegment(1);
byte[] data = tail.data;
int segmentOffset = tail.limit - i;
int runLimit = Math.min(endIndex, Segment.SIZE - segmentOffset);
// Emit a 7-bit character with 1 byte.
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
// Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance
// improvement over independent calls to writeByte().
while (i < runLimit) {
c = string.charAt(i);
if (c >= 0x80) break;
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
}
int runSize = i + segmentOffset - tail.limit; // Equivalent to i - (previous i).
tail.limit += runSize;
size += runSize;
} else if (c < 0x800) {
// Emit a 11-bit character with 2 bytes.
writeByte(c >> 6 | 0xc0); // 110xxxxx
writeByte(c & 0x3f | 0x80); // 10xxxxxx
i++;
} else if (c < 0xd800 || c > 0xdfff) {
// Emit a 16-bit character with 3 bytes.
writeByte(c >> 12 | 0xe0); // 1110xxxx
writeByte(c >> 6 & 0x3f | 0x80); // 10xxxxxx
writeByte(c & 0x3f | 0x80); // 10xxxxxx
i++;
} else {
// c is a surrogate. Make sure it is a high surrogate & that its successor is a low
// surrogate. If not, the UTF-16 is invalid, in which case we emit a replacement character.
int low = i + 1 < endIndex ? string.charAt(i + 1) : 0;
if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {
writeByte('?');
i++;
continue;
}
// UTF-16 high surrogate: 110110xxxxxxxxxx (10 bits)
// UTF-16 low surrogate: 110111yyyyyyyyyy (10 bits)
// Unicode code point: 00010000000000000000 + xxxxxxxxxxyyyyyyyyyy (21 bits)
int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00);
// Emit a 21-bit character with 4 bytes.
writeByte(codePoint >> 18 | 0xf0); // 11110xxx
writeByte(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx
writeByte(codePoint >> 6 & 0x3f | 0x80); // 10xxyyyy
writeByte(codePoint & 0x3f | 0x80); // 10yyyyyy
i += 2;
}
}
return this;
}
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
RealBufferedSink的writeUtf8方法調(diào)用其內(nèi)部Buffer的writeUtf8方法萍肆,最終String以“utf-8”編碼寫入了Buffer里。"utf-8"是一種變長前綴碼胀屿,相當(dāng)于在Unicode的基礎(chǔ)上做了個(gè)信源壓縮塘揣。
注意,在每次真實(shí)的寫之前會調(diào)用writableSegment(int minimumCapacity)方法宿崭,以獲得足夠?qū)懭氪笮〉娜萜鳌?/p>
寫操作完成后將調(diào)用emitCompleteSegments()方法亲铡,我們繼續(xù)跟進(jìn)去看一看。
// RealBufferedSink.java
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
// Buffer.java
public long completeSegmentByteCount() {
long result = size;
if (result == 0) return 0;
// Omit the tail if it's still writable.
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {
result -= tail.limit - tail.pos;
}
return result;
}
這段代碼的邏輯就是寫操作完成后計(jì)算Buffer中可寫的數(shù)據(jù)量葡兑,由于最后一個(gè)Segment有可能不滿奖蔓,所以要特殊處理下。然后根據(jù)計(jì)算出的字節(jié)數(shù)執(zhí)行Sink的寫操作讹堤,將數(shù)據(jù)寫入FileOutputStream中吆鹤。
RealBufferSink確實(shí)比Sink多了緩存的作用,先將數(shù)據(jù)寫到Buffer里洲守,寫操作完成后再把Buffer中緩存的數(shù)據(jù)一把寫到流中疑务。
至此將String寫入流中已經(jīng)完畢了沾凄。寫入Int的過程非常類似沒有太多好說的。
// RealBufferedSink.java
@Override public BufferedSink writeInt(int i) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeInt(i);
return emitCompleteSegments();
}
// Buffer.java
@Override public Buffer writeInt(int i) {
Segment tail = writableSegment(4);
byte[] data = tail.data;
int limit = tail.limit;
data[limit++] = (byte) ((i >>> 24) & 0xff);
data[limit++] = (byte) ((i >>> 16) & 0xff);
data[limit++] = (byte) ((i >>> 8) & 0xff);
data[limit++] = (byte) (i & 0xff);
tail.limit = limit;
size += 4;
return this;
}
最后是調(diào)用RealBufferedSink.close方法關(guān)閉流知允。
// RealBufferedSink.java
@Override public void close() throws IOException {
if (closed) return;
Throwable thrown = null;
try {
if (buffer.size > 0) {
sink.write(buffer, buffer.size);
}
} catch (Throwable e) {
thrown = e;
}
try {
sink.close();
} catch (Throwable e) {
if (thrown == null) thrown = e;
}
closed = true;
if (thrown != null) Util.sneakyRethrow(thrown);
}
close方法首先會檢查Buffer中是否還有未寫入的數(shù)據(jù)撒蟀,若有則一把寫入到流里,不這樣的話就內(nèi)存泄漏了温鸽,Buffer中的數(shù)據(jù)永遠(yuǎn)得不到處理保屯,沒用的Segment也不會回收。最后將執(zhí)行Sink的關(guān)閉操作嗤朴,其實(shí)就是關(guān)閉掉FileOutputStream流配椭。
至此整個(gè)阻塞調(diào)用的流程已經(jīng)分析完了虫溜,可以看出Okio的阻塞IO與Java的阻塞IO是非常相似的雹姊,主要是在緩存上做了優(yōu)化。
之所以叫阻塞IO衡楞,是指IO調(diào)用會使線程阻塞吱雏,直到IO完成時(shí)線程才繼續(xù)執(zhí)行。
非阻塞調(diào)用
我們將上例中的file換成socket就變成了一個(gè)非阻塞的調(diào)用瘾境。
Okio.buffer(Okio.sink(socket))
.writeUtf8("write string by utf-8.\n")
.writeInt(1234).close();
依然從Okio.sink(socket)開始看歧杏。
// Okio.java
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
private static AsyncTimeout timeout(final Socket socket) {
return new AsyncTimeout() {
@Override protected IOException newTimeoutException(@Nullable IOException cause) {
InterruptedIOException ioe = new SocketTimeoutException("timeout");
if (cause != null) {
ioe.initCause(cause);
}
return ioe;
}
@Override protected void timedOut() {
try {
socket.close();
} catch (Exception e) {
logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) {
logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
} else {
throw e;
}
}
}
};
}
可以看出sink方法首先調(diào)用timeout方法產(chǎn)生一個(gè)AsyncTimeout對象,該對象重寫了timedOut方法迷守,超時(shí)則將socket關(guān)閉犬绒。之后將調(diào)用sink(final OutputStream out, final Timeout timeout)創(chuàng)建原生流的代理對象,這與之前的邏輯一樣兑凿。最后調(diào)用timeout.sink(sink)凯力,把異步事件放入定時(shí)隊(duì)列,并返回經(jīng)過AsyncTimeout包裝的sink對象礼华。之后的邏輯和之前一摸一樣咐鹤,也沒有什么好分析的了。
這個(gè)IO是非阻塞的圣絮,線程不會因?yàn)榈却W(wǎng)絡(luò)數(shù)據(jù)而一致阻塞祈惶,超時(shí)的IO操作會被看門狗移出隊(duì)列,并回調(diào)timedOut方法扮匠,具體就是把socket關(guān)閉捧请。
總結(jié)
到這里整個(gè)Okio框架的解析就結(jié)束。由于篇幅和時(shí)間的限制很多功能和模塊沒有寫出來棒搜,如Pipe疹蛉,以及一些實(shí)現(xiàn)壓縮、轉(zhuǎn)碼的類帮非,不過著無傷大雅氧吐,我們已經(jīng)能看清楚Okio的核心部分讹蘑,并體會到其優(yōu)化思想,總結(jié)如下:
- 使用方便筑舅。對比Java IO和Okio我們可以看出OKio使用更方便座慰,支持鏈?zhǔn)秸{(diào)用,代碼簡潔翠拣、優(yōu)美版仔。緩存等功能對用戶都是透明的,不需要了解底層結(jié)構(gòu)也嫩方便實(shí)用误墓。
- 功能整合蛮粮。Java IO進(jìn)行不同的讀寫功能需要包裹各種裝飾類,而Okio把各種讀寫操作都整合了起來谜慌,不需要串上一堆裝飾類然想。
- cpu和內(nèi)存的優(yōu)化。數(shù)據(jù)容器采用循環(huán)鏈表實(shí)現(xiàn)欣范,Segment通過分裂变泄、合并、共享等操作避免了拷貝操作恼琼。SegmentPool會對暫時(shí)不用的Segment回收保存妨蛹,避免頻繁GC∏缇海看門狗在沒任務(wù)的時(shí)候都處于休眠狀態(tài)蛙卤,不占用cpu。ByteString通過空間換時(shí)間噩死,同時(shí)懶加載實(shí)現(xiàn)了cpu優(yōu)化颤难。
- 功能強(qiáng)大。支持阻塞IO和非阻塞IO甜滨,提供了一系列的方便工具乐严,如GZip的透明處理,對數(shù)據(jù)計(jì)算md5衣摩、sha1等都提供了支持昂验,對數(shù)據(jù)校驗(yàn)非常方便。
最后貼出一些其他分析Okio寫得不錯(cuò)的文章艾扮,本文在一定程度上參考了它們
OKio - 重新定義“短小精悍”
大概是最完全的Okio源碼解析文章
深入理解okio的優(yōu)化思想