Season_zl給初學(xué)者的RxJava2.0教程
ObservableEmitter<T> emitter
1.發(fā)射器發(fā)出onComplete()或者onError()后副编,接收器將不再接收時間茅撞。
2.游可以不發(fā)送onComplete或onError。
3.最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然用狱。
注: 關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個規(guī)則, **并不一定會導(dǎo)致程序崩潰. ** 比如發(fā)送多個onComplete是可以正常運(yùn)行的, 依然是收到第一個onComplete就不再接收了, 但若是發(fā)送多個onError, 則收到第二個onError事件會導(dǎo)致程序會崩潰.
Disposable d
當(dāng)調(diào)用它的
dispose()
方法時, 它就會將兩根管道切斷, 從而導(dǎo)致下游收不到事件嘉抓,但上游的還會繼續(xù)發(fā)送剩余事件始绍。
在Activity中將這個Disposable
保存起來, 當(dāng)Activity退出時切斷它即可访锻。多個Disposable
則使用CompositeDisposable
管理,CompositeDisposable.add()
和CompositeDisposable.clear()
總結(jié)
ObservableEmitter<T> emitter
的onComplete()
和onError()
因谎,以及Disposable d
的dispose()
都只會讓下游接收不到事件基括,但上游假如還存在事件則會繼續(xù)發(fā)送,以上的方法都可以視為階段器财岔,
subscribeOn()
和observeOn()
subscribeOn()
指定的是上游發(fā)送事件的線程,observeOn()
指定的是下游接收事件的線程.- 多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.
- 多次指定下游的線程是可以的, 也就是說每調(diào)用一次observeOn() , 下游的線程就會切換一次.
Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作
Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
Schedulers.newThread() 代表一個常規(guī)的新線程
AndroidSchedulers.mainThread() 代表Android的主線程
Map操作符
對原數(shù)據(jù)進(jìn)行變化操作(其實(shí)就是一個方法风皿,接收原數(shù)據(jù)操作然后返回結(jié)果數(shù)據(jù))
FlatMap操作符(玩的熟才用,否則容易暈)
將第一次發(fā)送的數(shù)據(jù)和flatMap發(fā)送的數(shù)據(jù)進(jìn)行組合再此發(fā)送匠璧。比如第一次發(fā)送ABC桐款,第二次發(fā)送123,那么
可能(因?yàn)椴槐WC順序)
會出現(xiàn)A1A2A3 B1B2B3 C1C2C3 夷恍。保證順序的話用concatMap
Zip操作符
對多個發(fā)送源的數(shù)據(jù)進(jìn)行合并魔眨,每個源數(shù)據(jù)的對應(yīng)角標(biāo)的元素進(jìn)行合并,以最短發(fā)送源的為準(zhǔn)酿雪,較長發(fā)送源的剩余元素被舍棄遏暴。同一線程一定有會有一個發(fā)送源先全部發(fā)送完畢。
Flowable(默認(rèn)緩存為128個事件执虹,響應(yīng)式拉韧鼗印)
背壓策略:BackpressureStrategy(水缸)
唠梨。一般的使用場景都是發(fā)送量大且異步(因?yàn)檫@兩個都可以會引起內(nèi)存溢出)
- ERROR袋励,上游積壓超過128事件則會直接報異常
- BUFFER, 無限制緩存大小,但是會存在OOM風(fēng)險
- DROP, 丟棄超過128個事件的剩余事件(默認(rèn)緩存為128当叭,你發(fā)了129茬故,那么第129不會進(jìn)入水缸)。 Drop就是直接把存不下的事件丟棄
- LATEST, Latest就是只保留最新的事件蚁鳖,當(dāng)水缸(緩存128)已經(jīng)存滿了128個事件磺芭,那么這時候還有事件進(jìn)入的話則前面的事件會被覆蓋掉。
背壓源碼解析
Flowable
// 創(chuàng)建上游的方法
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
// 檢查是否為null的工具類醉箕,不必深究
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
// RxJavaPlugins.onAssembly()钾腺。因?yàn)槭擎準(zhǔn)侥J剑苑祷乇旧砑タ悖@個方法就是一個包裹轉(zhuǎn)換的功能放棒,不必深究
// FlowableCreate,這個類才是重點(diǎn)
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
// 訂閱下游的方法
public final void subscribe(Subscriber<? super T> s) {
// 一般我都是直接new一個Subscriber己英,所以走else塊间螟。
if (s instanceof FlowableSubscriber) {
subscribe((FlowableSubscriber<? super T>)s);
} else {
ObjectHelper.requireNonNull(s, "s is null");
// 包裹一層
subscribe(new StrictSubscriber<T>(s));
}
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(z); // !!O崞啤H偕!D帷0恃妗!<忧凇仙辟!真實(shí)發(fā)起訂閱(其他代碼可不看,就看這個句)
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
StrictSubscriber 下游類
public class StrictSubscriber<T>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -4945028590049415624L;
final Subscriber<? super T> downstream;
final AtomicThrowable error;
final AtomicLong requested;
final AtomicReference<Subscription> upstream;
final AtomicBoolean once;
volatile boolean done;
public StrictSubscriber(Subscriber<? super T> downstream) {
this.downstream = downstream;
this.error = new AtomicThrowable();
this.requested = new AtomicLong();
this.upstream = new AtomicReference<Subscription>();
this.once = new AtomicBoolean();
}
@Override
public void request(long n) {
if (n <= 0) {
cancel();
onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n));
} else {
SubscriptionHelper.deferredRequest(upstream, requested, n);
}
}
@Override
public void cancel() {
if (!done) {
SubscriptionHelper.cancel(upstream);
}
}
@Override
public void onSubscribe(Subscription s) {
if (once.compareAndSet(false, true)) {
downstream.onSubscribe(this);
SubscriptionHelper.deferredSetOnce(this.upstream, requested, s);
} else {
s.cancel();
cancel();
onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
}
}
@Override
public void onNext(T t) {
HalfSerializer.onNext(downstream, t, this, error);
}
@Override
public void onError(Throwable t) {
done = true;
HalfSerializer.onError(downstream, t, this, error);
}
@Override
public void onComplete() {
done = true;
HalfSerializer.onComplete(downstream, this, error);
}
}
FlowableCreate(繼承Flowable)
// 構(gòu)造方法
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
// 持有把上游對象
this.source = source;
// 持有背壓模式對象
this.backpressure = backpressure;
}
// 實(shí)際訂閱鳄梅,F(xiàn)lowable的subscribe()內(nèi)部會調(diào)用這個方法叠国。
// 當(dāng)你使用訂閱下游的時候,會把下游對象傳到這個方法戴尸。
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
// 工廠模式粟焊,根據(jù)背壓模式實(shí)例化對應(yīng)的發(fā)射器,且會把下游對象通過發(fā)射器的構(gòu)造方法讓發(fā)射器內(nèi)部持有(所以我們在發(fā)射器才會知道下游所需的處理能力)孙蒙。
// 背壓的核心就是這些工廠類项棠,執(zhí)行的條件不同產(chǎn)生的效果就不同
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
// 調(diào)用下游的onSubscribe,并且把發(fā)射器對象傳遞過去讓下游對象持有挎峦。(雙向傳遞香追,下游和發(fā)射器互相持有對方的對象)
t.onSubscribe(emitter);
try {
// 上游持有了發(fā)射器對象
// 使用上游對象執(zhí)行該對象的subscribe,其實(shí)就是走發(fā)射事件的邏輯
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
BaseEmitter背壓發(fā)射器基類
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
private static final long serialVersionUID = 7326289992464377023L;
final Subscriber<? super T> downstream;
final SequentialDisposable serial;
BaseEmitter(Subscriber<? super T> downstream) {
// 下游對象
this.downstream = downstream;
// 切斷對象
this.serial = new SequentialDisposable();
}
@Override
public void onComplete() {
complete();
}
protected void complete() {
// 如果已經(jīng)切斷了就跳過坦胶,所以下游不會收到onComplete()事件
if (isCancelled()) {
return;
}
try {
// 回調(diào)下游的onComplete()事件
downstream.onComplete();
} finally {
// 切斷
serial.dispose();
}
}
@Override
public final void onError(Throwable e) {
if (!tryOnError(e)) {
// 已經(jīng)切斷透典,如果接著發(fā)送onError內(nèi)部會拋異常
RxJavaPlugins.onError(e);
}
}
@Override
public boolean tryOnError(Throwable e) {
return error(e);
}
protected boolean error(Throwable e) {
// 判斷開發(fā)者傳遞的異常是否為null
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (isCancelled()) {
return false;
}
try {
// 回調(diào)下游的方法
downstream.onError(e);
} finally {
// 切斷
serial.dispose();
}
return true;
}
@Override
public final void cancel() {
// 切斷
serial.dispose();
onUnsubscribed();
}
// 注銷訂閱,空實(shí)現(xiàn)
void onUnsubscribed() {
// default is no-op
}
@Override
public final boolean isCancelled() {
return serial.isDisposed();
}
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
// 將下游請求的事件數(shù)存放
BackpressureHelper.add(this, n);
onRequested();
}
}
void onRequested() {
// default is no-op
}
@Override
public final void setDisposable(Disposable d) {
serial.update(d);
}
@Override
public final void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public final long requested() {
return get();
}
@Override
public final FlowableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
ErrorAsyncEmitter背壓發(fā)射器(繼承了NoOverflowBaseAsyncEmitter)
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 338953216916120960L;
ErrorAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
// 回調(diào)下游的onError()顿苇,直接拋出異常
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) { // 下游所需事件不為0峭咒,就是下游還有處理的事件
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
// 調(diào)用子類重寫的方法
onOverflow();
}
}
// 子類重寫
abstract void onOverflow();
}
// BackpressureHelper的方法
public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
// 下游所需事件 - 1
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
// 重置所需事件數(shù)
if (requested.compareAndSet(current, update)) {
return update;
}
}
}