前言
自從去年8月底《淺談RxJava與2.0的新特性》宙刘,已經(jīng)過(guò)去快一年。筆者也沒(méi)想到此文竟有讀者等筆者填坑快一年了稚叹,不禁汗顏。所以筆者打算寫(xiě)關(guān)于一個(gè) RxJava2 的系列文章,既為填坑叠洗,也為回報(bào)讀者對(duì)我的支持。本文為第一篇旅东。
讀本系列文章灭抑,你可能有如下收獲:
- 了解其設(shè)計(jì)原理,代碼實(shí)現(xiàn)
- 掌握操作符的正確使用姿勢(shì)抵代,避免采坑
- 強(qiáng)化 Rx 編程思想腾节,寫(xiě)出更 Rx 的代碼
- 跟讀精彩的源碼,強(qiáng)化編程功底
廢話不多說(shuō)荤牍,進(jìn)入正題案腺。
Reactive Streams
之前在《淺談RxJava與2.0的新特性》我們提到過(guò), RxJava2 遵循 Reactive Streams 的編程規(guī)范康吵, 或者更精確的說(shuō)劈榨,是 RxJava2 中的 Flowable 相關(guān)的類。因此我們只分析 Flowable 相關(guān)的實(shí)現(xiàn)與使用晦嵌,剩下的 Observable同辣、 Completable、Single耍铜、 Maybe
這些不會(huì)再提及邑闺,相信讀者朋友們可以舉一反三。
Reactive Streams 中明確規(guī)范了如下4點(diǎn):
- process a potentially unbounded number of elements
- in sequence,
- asynchronously passing elements between components,
- with mandatory non-blocking backpressure.
后面筆者會(huì)用 RS 代替全稱棕兼。
請(qǐng)跟隨本系列文章慢慢看 Flowable 是出色的完成上述的要求陡舅。
閱讀源碼的正確姿勢(shì)
Rx2 在源碼中加入了一些注解,這些注解對(duì)運(yùn)行沒(méi)有任何實(shí)際作用伴挚,僅僅是用作標(biāo)識(shí)備注靶衍,有助于開(kāi)發(fā)者了解某個(gè)操作符的正確使用姿勢(shì)灾炭,同時(shí)也有利于閱讀源碼時(shí)整理思路。這些注解位于io.reactivex.annotations
包名下颅眶,這里著重介紹一個(gè)蜈出。
BackpressureSupport
BackpressureSupport 是用作標(biāo)識(shí)這個(gè)操作符對(duì)背壓的支持類型,有以下幾種:
- PASS_THROUGH:表示這個(gè)操作符僅僅傳遞背壓類型涛酗,不做任何改變铡原。例如
defer
操作符,這個(gè)操作符支持的背壓類型取決于Callable
產(chǎn)生的Publisher
商叹。 - FULL:表示這個(gè)操作符支持完全的背壓燕刻,協(xié)調(diào)上下游關(guān)系
- SPECIAL:表示這個(gè)操作符支持的背壓類型由方法上的文檔說(shuō)明
- UNBOUNDED_IN:表示這個(gè)操作符會(huì)向上游請(qǐng)求 Long.MAX_VALUE,并協(xié)調(diào)下游
- ERROR:表示如果下游沒(méi)有足夠數(shù)量的request剖笙,上游發(fā)射了超額的數(shù)據(jù)卵洗,這個(gè)操作符會(huì)拋出一個(gè)
MissingBackpressureException
- NONE:表示不處理背壓
上面這些字面解釋看起來(lái)還是很繞的,尤其是對(duì)于沒(méi)有閱讀過(guò)相關(guān)源碼的讀者弥咪。我們也不必一次性全部弄明白过蹂,后續(xù)會(huì)慢慢講清楚所有。
走進(jìn)create源碼
前文中有提到過(guò)聚至,Rx2 收回了create
方法的權(quán)限酷勺,使開(kāi)發(fā)者自定義的create
也能夠正確的支持背壓。而實(shí)現(xiàn)的方式就是通過(guò)額外提供一個(gè)BackpressureStrategy
參數(shù)晚岭。也因此鸥印,create
的方法注解中 BackpressureSupport
是 SPECIAL
。
FlowableCreate
拋開(kāi)Rx2提供的 plugin 不談坦报,本質(zhì)上就是用 create 傳進(jìn)來(lái)的2個(gè)參數(shù)創(chuàng)建了FlowableCreate
這個(gè)類库说。
根據(jù)傳入的BackpressureStrategy
生成不同的Emitter
對(duì)象。并遵循一致的編程約定片择,先調(diào)用 onSubscribe潜的,隨后將Emitter
傳遞給FlowableOnSubscribe
用來(lái)發(fā)射數(shù)據(jù)。
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
BackpressureStrategy
Rx2 大量的類通過(guò)繼承AtomicLong
來(lái)表示計(jì)算個(gè)數(shù)與發(fā)射個(gè)數(shù)字管,請(qǐng)求一個(gè)+1啰挪,發(fā)射一個(gè)-1。并且在 Rx2 中 Long.MAX_VALUE 有特殊含義嘲叔,表示無(wú)限的數(shù)據(jù)亡呵。即,如果 request(Long.MAX_VALUE)
硫戈,即使發(fā)射了數(shù)據(jù)也不會(huì)減少自身的數(shù)值锰什。
這里所有的 Emitter 都繼承了基類BaseEmitter
,并提供一些公共方法如setDisposable/setCancellable/requested/serialize
等。然后根據(jù)各自的背壓策略汁胆,實(shí)現(xiàn)相應(yīng)的邏輯梭姓,下面分別介紹。
MISSING
MISSING即沒(méi)有背壓嫩码,我們看 onNext 函數(shù)會(huì)發(fā)現(xiàn)誉尖,每調(diào)用一次就會(huì)傳遞給下游的 subscriber.onNext
,空指針則onError
铸题。
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
if (t != null) {
actual.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
上面的代碼是2.1.2版本的源碼铡恕,筆者認(rèn)為這里有一處 BUG 。即在自減的時(shí)候回挽,沒(méi)有檢查 Long.MAX_VALUE 的情況没咙,導(dǎo)致在request(Long.MAX_VALUE)
后,發(fā)射數(shù)據(jù)時(shí)依然會(huì)不斷自減千劈,這是與一致的設(shè)計(jì)思路相悖的。反觀下面 DROP 與 BUFFER 相關(guān)的 Emitter 處理時(shí)牌捷,則直接調(diào)用了BackpressureHelper.produced(this, 1)
墙牌,在里面會(huì)有 Long.MAX_VALUE 的判斷。
雖然有點(diǎn)小 BUG暗甥,但是實(shí)際中除了在requested()
函數(shù)中會(huì)出錯(cuò)外喜滨,不會(huì)影響正常的執(zhí)行流。且一般開(kāi)發(fā)者也不會(huì)使用Emitter.requested()
函數(shù)撤防。
雖然 MISSING 不支持背壓虽风,但是沒(méi)關(guān)系,我們可以通過(guò)操作符來(lái)彌補(bǔ)寄月。
這些操作符結(jié)合使用 MISSING 的 create 方法辜膝,使得原本不支持背壓的 Flowable 支持背壓。
當(dāng)然我們也大可不必這樣麻煩漾肮,既然要使用 buffer厂抖、drop 或者 latest,使用下面的策略即可克懊。除非我們需要那些操作符提供的額外功能忱辅。
ERROR
ERROR則和最開(kāi)始的BackpressureSupport.ERROR
表現(xiàn)一致。
下面這塊代碼是NoOverflowBaseAsyncEmitter
谭溉,會(huì)被 ERROR 和 DROP 對(duì)應(yīng)的Emitter
繼承墙懂。邏輯也很簡(jiǎn)單,即請(qǐng)求數(shù)如果還大于0扮念,則向下發(fā)射并將請(qǐng)求數(shù)減1损搬,否則走onOverflow()
方法。
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
而 ERROR 對(duì)應(yīng)的實(shí)現(xiàn)則很簡(jiǎn)單了,不在贅述场躯。
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
DROP
DROP即直接丟棄超額的數(shù)據(jù)谈为,體現(xiàn)在代碼中就非常簡(jiǎn)單。
@Override
void onOverflow() {
// nothing to do
}
BUFFER 與 LATEST
這倆之所以放到了一起踢关,是因?yàn)?BUFFER 與 LATEST 本質(zhì)上都是緩存了數(shù)據(jù)伞鲫,細(xì)節(jié)上的區(qū)別就是,BUFFER 是緩存了所有數(shù)據(jù)签舞,而 LATEST 只保留了最近的一個(gè) onNext 數(shù)據(jù)秕脓。
體現(xiàn)在代碼中這兩者最主要的區(qū)別就是一個(gè)用了隊(duì)列來(lái)緩存,一個(gè)用了AtomicReference 來(lái)維持最后一個(gè)未被消費(fèi)的數(shù)據(jù)儒搭。
就挑 BUFFER 來(lái)說(shuō)吠架, onNext 就是將數(shù)據(jù)扔進(jìn)隊(duì)列,而后嘗試消費(fèi)數(shù)據(jù)即調(diào)用drain()
搂鲫。onError 與 onComplete 則是將 結(jié)束標(biāo)識(shí)置為 true 傍药,并保留異常,然后依然也是在drain()
中消費(fèi)該消息魂仍。
在/onNext/onError/onComplete/onRequested時(shí)拐辽,都會(huì)調(diào)用drain()
來(lái)消費(fèi)隊(duì)列中的數(shù)據(jù)。
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.offer(t);
drain();
}
drain
中做的事就比較復(fù)雜了擦酌,為了保證線程安全俱诸,首先通過(guò)一個(gè)AtomicInteger
來(lái)確保只有一個(gè)線程可以進(jìn)入for(;;)
循環(huán)。
在 for 循環(huán)中赊舶,不斷的消費(fèi)隊(duì)列中的數(shù)據(jù)睁搭,如果隊(duì)列為空則檢查結(jié)束標(biāo)識(shí)是否為 true,是的話則發(fā)射 onComplete 或者 onError 笼平。
void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
final Subscriber<? super T> a = actual;
final SpscLinkedArrayQueue<T> q = queue;
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
T o = q.poll();
boolean empty = o == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
if (empty) {
break;
}
a.onNext(o);
e++;
}
if (e == r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}
if (e != 0) {
BackpressureHelper.produced(this, e);
}
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
這里請(qǐng)大家留意一個(gè)編程的套路园骆,在絕大多數(shù)隊(duì)列消費(fèi)的場(chǎng)景里, Rx2 中都是使用了下面的方式出吹。這也是我們可以積累使用的遇伞。通過(guò)這種方式可以保證for循環(huán)里的代碼是單線程執(zhí)行的,且如果執(zhí)行期間有一次或多次新的調(diào)用drain()
捶牢,會(huì)導(dǎo)致重新走一遍包含注釋處的代碼鸠珠,確保數(shù)據(jù)可以正確的消費(fèi)發(fā)射。
void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
for (;;) {
// 消費(fèi)隊(duì)列秋麸, 發(fā)射數(shù)據(jù)
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
小結(jié)
筆者在介紹過(guò)程中已經(jīng)省略了很多細(xì)枝末節(jié)渐排,不免顯得知識(shí)有些分散,結(jié)合源碼閱讀效果更佳灸蟆。沒(méi)想到一個(gè)小小的 create 也包含這么多的玄機(jī)驯耻。
我相信通過(guò)閱讀這篇文章,讀者們寫(xiě) create 的時(shí)候應(yīng)該可以做到結(jié)合實(shí)際場(chǎng)景選擇正確的BackpressureStrategy
。
有了 create 便從此開(kāi)啟 Rx2 萬(wàn)里長(zhǎng)征第一步可缚。下一篇霎迫,我們將會(huì)介紹 Rx2 的線程調(diào)度相關(guān)的操作符及其實(shí)現(xiàn),敬請(qǐng)期待帘靡。