背景
對(duì)于生產(chǎn)者和消費(fèi)者模型肛走,存在一個(gè)問(wèn)題就是當(dāng)生產(chǎn)者生產(chǎn)的速度大于消費(fèi)者消費(fèi)速度焚志,并且生產(chǎn)過(guò)程不會(huì)停止映凳,生產(chǎn)者和消費(fèi)者位于不同的線(xiàn)程中胆筒,這是要如何對(duì)待多余出來(lái)的生產(chǎn)內(nèi)容?是丟掉诈豌,是緩沖仆救?
在強(qiáng)大的異步處理框架中,RxJava又是怎么處理的呢矫渔?如果在工作中萬(wàn)一發(fā)生丟包事件怎么辦彤蔽?
使用環(huán)境與本文目的
RxJava版本:2.1.0
默認(rèn)條件:觀(guān)察者和被觀(guān)察者位于main線(xiàn)程中,且使用了默認(rèn)的事件發(fā)射器庙洼。
目的:通過(guò)Flowable顿痪,探究RxJava的流控機(jī)制。
Flowable創(chuàng)建過(guò)程
Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int i =0 ; i<10;i++){
e.onNext(i);
}
}
}
, BackpressureStrategy.BUFFER);
在create方法中油够,完成了對(duì)Flowable的構(gòu)建過(guò)程:
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
//在工廠(chǎng)中構(gòu)建出一個(gè)Flowable對(duì)象蚁袭。需要傳入對(duì)向FlowableCreate
//如果要構(gòu)建Observable,則傳入的是ObservableDefer
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
FlowableCreate實(shí)際上是Flowable子類(lèi)石咬。當(dāng)調(diào)用Flowable的subscribe方法時(shí)揩悄,實(shí)際上將執(zhí)行FlowableCreate中的subscribeActual(該方法在Flowable是一個(gè)抽象方法,在FlowableCreate中實(shí)現(xiàn))方法:
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");
subscribeActual(z);
}
......
subscribe過(guò)程分析
實(shí)際執(zhí)行的是subscribeActual鬼悠,這個(gè)方法非常重要虏束,該方法的實(shí)現(xiàn)為:
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
我們可以看到:
- 它首先會(huì)根據(jù)我們選擇的背壓模式,設(shè)置不同的emitter厦章;如果沒(méi)有設(shè)置镇匀,默認(rèn)將開(kāi)啟帶有緩存的emitter;
- Subscriber中的onSubscribe在事件沒(méi)有發(fā)射前就執(zhí)行了袜啃;
- 事件的發(fā)射汗侵,是通過(guò)source.subscribe(emitter)實(shí)現(xiàn)的,而這個(gè)source,實(shí)際上就是我們?cè)跇?gòu)建Flowable時(shí)創(chuàng)建的FlowableOnSubscribe晰韵。
現(xiàn)在回過(guò)來(lái)我們看看在構(gòu)建時(shí)发乔,F(xiàn)lowableOnSubscribe的內(nèi)容,通常我們會(huì)這么寫(xiě):
new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int i =0 ; i<10;i++){
e.onNext(i);
}
}
}
轉(zhuǎn)了一圈,又回到了這里雪猪。FlowableEmitter來(lái)發(fā)射事件栏尚。默認(rèn)的,將使用BufferAsyncEmitter只恨,這是一個(gè)支持背壓處理的Emitter译仗。
該Emitter中,onNext方法是這樣的:
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.offer(t); //生產(chǎn)
drain(); //實(shí)際消費(fèi)過(guò)程會(huì)執(zhí)行queue.poll
}
我們看到官觅,queue就是它維護(hù)的一個(gè)SpscLinkedArrayQueue隊(duì)列(其中使用的大量的原子類(lèi)型處理多線(xiàn)程訪(fǎng)問(wèn)問(wèn)題)纵菌,隊(duì)列容量會(huì)根據(jù)生產(chǎn)消費(fèi)情況自動(dòng)擴(kuò)容。
生產(chǎn)過(guò)程休涤,或者說(shuō)事件發(fā)射過(guò)程咱圆,直接調(diào)用了隊(duì)列的offer方法,進(jìn)行入隊(duì)操作功氨;
消費(fèi)過(guò)程序苏,或者說(shuō)消費(fèi)事件,則是先使用了drain方法捷凄,該方法的本質(zhì)忱详,是執(zhí)行隊(duì)列的poll方法取出事件,然后在onNext()中消費(fèi)纵势。
在 offer 中主要完成生產(chǎn):
//producerLookAhead相當(dāng)于一個(gè)生產(chǎn)者的斥候踱阿,主要用于檢測(cè)邊界
//這里將檢測(cè)管钳,要插入的位置钦铁,是否已經(jīng)越界了
if (index < producerLookAhead) {
return writeToQueue(buffer, e, index, offset);
}
//else這種情況,主要時(shí)考慮到循環(huán)隊(duì)列
else {
//producerLookAheadStep實(shí)際上是一個(gè)定值才漆,表示固定步長(zhǎng)
final int lookAheadStep = producerLookAheadStep;
// go around the buffer or resize if full (unless we hit max capacity)
//首先檢查前進(jìn)了固定步長(zhǎng)之后牛曹,是否還有位置用來(lái)插入,注意醇滥,使用calcWrappedOffset方法黎比,
//包括很多其他用到mask的地方,實(shí)際上是將數(shù)組作為一個(gè)循環(huán)隊(duì)列使用鸳玩。
//如果前進(jìn)固定步長(zhǎng)之后阅虫,還可以插入,那么不跟,說(shuō)明生產(chǎn)者可用空間還有很多
int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad
producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
return writeToQueue(buffer, e, index, offset);
}
//檢查下一插入位是否為空颓帝,如果不為空,則使用 ;
//反之购城,插入位已經(jīng)滿(mǎn)了吕座,需要?jiǎng)?chuàng)建一個(gè)新的數(shù)組以完成生產(chǎn)者的工作
else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
return writeToQueue(buffer, e, index, offset);
} else {
//現(xiàn)有數(shù)組容量已經(jīng)滿(mǎn)了,消費(fèi)者速度無(wú)法跟上生產(chǎn)者速度瘪板,
//需要開(kāi)辟一塊新的空間用于生產(chǎn)吴趴。空間大小和現(xiàn)有數(shù)組大小一致侮攀。
//這里將完成對(duì)已經(jīng)生產(chǎn)并且尚未消費(fèi)的數(shù)組進(jìn)行保存的工作锣枝;
//同時(shí),開(kāi)辟一個(gè)新的數(shù)組魏身,用于生產(chǎn)
resize(buffer, index, offset, e, mask); // add a buffer and link old to new
return true;
}
}
在poll中主要完成事件取出惊橱,以在onNext中消費(fèi):
public T poll() {
// local load of field to avoid repeated loads after volatile reads
final AtomicReferenceArray<Object> buffer = consumerBuffer;
final long index = lpConsumerIndex();
final int mask = consumerMask;
final int offset = calcWrappedOffset(index, mask);
final Object e = lvElement(buffer, offset);// LoadLoad
boolean isNextBuffer = e == HAS_NEXT;
if (null != e && !isNextBuffer) {
//取出發(fā)射的事件,進(jìn)行消費(fèi)
soElement(buffer, offset, null);// StoreStore
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
return (T) e;
} else if (isNextBuffer) {
//如果這個(gè)數(shù)組中所有元素已經(jīng)消費(fèi)完箭昵,同時(shí)生產(chǎn)者已經(jīng)不再這個(gè)數(shù)組中進(jìn)行生產(chǎn)工作税朴;
//跳轉(zhuǎn)到新的數(shù)組中,完成消費(fèi)工作家制,同時(shí)正林,移除當(dāng)前數(shù)組,即放棄這塊空間颤殴,不再使用
return newBufferPoll(lvNext(buffer), index, mask);
}
return null;
}
本質(zhì)上將觅廓,生產(chǎn)和消費(fèi)就是在操作這樣一個(gè)隊(duì)列。
現(xiàn)在涵但,可以回過(guò)頭來(lái)杈绸,重新看一看上面的 drain() 方法,看看它具體的消費(fèi)過(guò)程矮瘟,這個(gè)方法很有意思瞳脓。
drain:
void drain() {
//保證同時(shí)只能有一個(gè)線(xiàn)程操作進(jìn)行下面的循環(huán)
//注意在該方法的末尾,對(duì)wip進(jìn)行了重置為-1澈侠,打開(kāi)進(jìn)行循環(huán)的權(quán)限
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
final Subscriber<? super T> a = actual;
//無(wú)限隊(duì)列劫侧,本質(zhì)上有很多個(gè)固定長(zhǎng)度的數(shù)組自動(dòng)擴(kuò)展構(gòu)成
final SpscLinkedArrayQueue<T> q = queue;
//死循環(huán)
for (;;) {
//得到請(qǐng)求的數(shù)量,該值是通過(guò)Subscription.request()設(shè)置的哨啃,
//而這個(gè)方法烧栋,Subscription參數(shù),實(shí)際上在Subscriber的onSubscribe傳遞進(jìn)去拳球。
//所以审姓,這也就是為什么,要在subscriber中request(num), num為多少祝峻,就消費(fèi)多少魔吐。
//這個(gè)請(qǐng)求對(duì)于外界來(lái)說(shuō)次坡,只能通過(guò)subscription設(shè)置
long r = get();
long e = 0L;
//如果請(qǐng)求量為0,不進(jìn)入循環(huán)進(jìn)行消費(fèi)
while (e != r) {
if (isCancelled()) {
q.clear();
return;
}
//用來(lái)判斷是否執(zhí)行了onComplete或者onError
boolean d = done;
T o = q.poll();
boolean empty = o == null;
//如果事件全部消費(fèi)完画畅,之后執(zhí)行了onCopmlete或者onError
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.onError(ex);
} else {
super.onComplete();
}
return;
}
//如果事件全部消費(fèi)完砸琅,跳出本次循環(huán)
//注意,此時(shí)空轉(zhuǎn)了轴踱。如果消費(fèi)者速度大于生產(chǎn)者速度症脂,會(huì)發(fā)生這次空轉(zhuǎn),同時(shí)繼續(xù)循環(huán)過(guò)程
if (empty) {
break;
}
//消費(fèi)事件
a.onNext(o);
//處理完一件事情淫僻,計(jì)數(shù)器加一
e++;
}
if (e == r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.onError(ex);
} else {
super.onComplete();
}
return;
}
}
//上一次request的量已經(jīng)全部完成诱篷,此時(shí)重置請(qǐng)求量
if (e != 0) {
BackpressureHelper.produced(this, e);
}
//開(kāi)鎖
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
總結(jié)
上述的分析過(guò)程,實(shí)際上并沒(méi)有設(shè)置觀(guān)察者雳灵、 被觀(guān)察者于不同的線(xiàn)程棕所,且使用默認(rèn)的事件發(fā)射器。緩沖隊(duì)列的空間是無(wú)限大的(一旦當(dāng)前緩沖被使用完悯辙,則開(kāi)辟新的緩沖空間琳省,直到這個(gè)空間的容量達(dá)到了 long 類(lèi)型的最大值,或者內(nèi)存溢出)躲撰。
這種背壓方式针贬,需要觀(guān)察者或者消費(fèi)者主動(dòng)請(qǐng)求要處理的事件的數(shù)量,已達(dá)到流速控制拢蛋。