10章 RxJava源碼分析

本篇文章已授權(quán)微信公眾號 YYGeeker 獨(dú)家發(fā)布轉(zhuǎn)載請標(biāo)明出處

CSDN學(xué)院課程地址

10. RxJava源碼分析

RxJava源碼分析最主要的點(diǎn)在于

  • RxJava是如何從事件流的發(fā)送方發(fā)送到事件流的接收方的
  • RxJava是如何對操作符進(jìn)行封裝和操作的
  • RxJava是如何隨意切換線程的

在分析的過程中沥曹,部分源碼分析我們會(huì)通過手寫RxJava的部分代碼進(jìn)行分析份名,當(dāng)然也會(huì)結(jié)合實(shí)際RxJava的代碼進(jìn)行分析。其中妓美,手寫RxJava的原因是為了簡化源代碼僵腺,讓讀者方便閱讀到主要代碼,更快的看懂RxJava的實(shí)現(xiàn)思路部脚。在閱讀源碼之前想邦,我們需要對RxJava的大體概念進(jìn)行簡單的梳理

  • 發(fā)射器:Emitter裤纹,發(fā)射數(shù)據(jù)的對象
  • 被觀察者:Observable委刘,被觀察的對象
  • 觀察者:Observer,觀察的對象
  • 被觀察者被訂閱時(shí):ObservableOnSubscribe鹰椒,被訂閱時(shí)的回調(diào)锡移,同時(shí)創(chuàng)建出發(fā)射器
  • 釋放者:Disposable,釋放RxJava的對象

RxJava的分析三步驟

  • 創(chuàng)建:被觀察者創(chuàng)建的過程
  • 訂閱:被觀察者訂閱觀察者的過程
  • 發(fā)射:發(fā)射器發(fā)射的過程

RxJava原理圖解

  • 第一排表示各個(gè)對象的創(chuàng)建關(guān)系漆际,A->B->C->D
  • 第二排表示各個(gè)對象的訂閱關(guān)系淆珊,D->C->B->A
  • 第三排表示各個(gè)對象的發(fā)射關(guān)系,A->B->C->D
在這里插入圖片描述

10.1 RxJava的事件發(fā)射原理

知識(shí)點(diǎn):

  • 理解發(fā)射數(shù)據(jù)的過程
  • 理解接收數(shù)據(jù)的過程

以下是手寫RxJava的代碼

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(Emitter<String> emitter) {
        emitter.onNext("Hello RxJava");
        emitter.onError();
        emitter.onNext("Hello RxJava");
    }
}).subscribe(new Observabler<String>() {
    @Override
    public void onSubscribe() {
        
    }

    @Override
    public void onNext(String string) {
        System.out.println("onNext=" + string);
    }

    @Override
    public void onError() {
        System.out.println("onError");
    }

    @Override
    public void onComplete() {

    }
});

輸出結(jié)果:在輸出onError后奸汇,就不會(huì)繼續(xù)收到新的事件流施符,表示事件已經(jīng)被釋放了

onNext=Hello RxJava
onError

1、定義接口

發(fā)射器

public interface Emitter<T> {
    void onNext(T t);
    void onError();
}

觀察者

public interface Observer<T> {
    void onSubscribe();
    void onNext(T t);
    void onError();
    void onComplete();
}

被觀察者被訂閱時(shí)

public interface ObservableOnSubscribe<T> {
    void subscribe(Emitter<T> emitter);
}

2、實(shí)現(xiàn)被觀察者

被觀察者Observable負(fù)責(zé)創(chuàng)建奈应、訂閱痒留,發(fā)射由發(fā)射器負(fù)責(zé)

  • 創(chuàng)建:創(chuàng)建的過程只是將傳遞進(jìn)來的參數(shù)交給新的ObservableCreate進(jìn)行管理
  • 訂閱:訂閱的過程只是實(shí)現(xiàn)創(chuàng)建出來的ObservableCreate的subscribeActual方法
public abstract class Observable<T> {

    public static <T> ObservableCreate create(ObservableOnSubscribe<T> observableOnSubscribe) {
        return new ObservableCreate<T>(observableOnSubscribe);
    }

    public void subscribe(Observer<T> observer) {
        subscribeActual(observer);
    }

    public abstract void subscribeActual(Observer<T> observer);
}

3、ObservableCreate

ObservableCreate繼承自O(shè)bservable听哭,由于Observable.create返回當(dāng)前ObservableCreate慢洋,所以在subscribe的時(shí)候,走的是這里的subscribeActual陆盘,subscribeActual中會(huì)去創(chuàng)建發(fā)射器普筹,并給發(fā)射器傳遞進(jìn)去observer

public class ObservableCreate<T> extends Observable{

    private ObservableOnSubscribe source;

    public ObservableCreate(ObservableOnSubscribe observableOnSubscribe) {
        this.source = observableOnSubscribe;
    }

    @Override
    public void subscribeActual(Observer observer) {
        //固定的三步曲分析法(個(gè)人創(chuàng)建,基本都是這個(gè)步驟)
        
        //1隘马、創(chuàng)建發(fā)射器
        EmitterCreate<T> emitterCreate = new EmitterCreate<>(observer);
        //2太防、回調(diào)observer的onSubscribe
        observer.onSubscribe();
        //3、回調(diào)上一個(gè)的subscribe
        source.subscribe(emitterCreate);
    }
}

4酸员、EmitterCreate

傳遞進(jìn)來的observer即是我們最開始訂閱時(shí)候new出來的杏头,此時(shí)發(fā)射數(shù)據(jù),就會(huì)去調(diào)用Observer的onNext方法沸呐,這樣數(shù)據(jù)就從發(fā)射器中傳遞到觀察者中了醇王。DisposableHelper在后面會(huì)講到,這里只是用作判斷是否被釋放的一個(gè)工具類

public class EmitterCreate<T>
        extends AtomicReference<Disposable>
        implements Emitter<T>, Disposable {

    private Observer<T> observer;

    public EmitterCreate(Observer<T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError() {
        if (!isDisposed()) {
            try {
                observer.onError();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

以下是RxJava源代碼

1崭添、Observable.create

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");//判空
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));//返回自身
}

RxJavaPlugins.onAssembly只是對傳遞進(jìn)來的參數(shù)做判斷處理寓娩,最終還是返回ObservableCreate,有關(guān)RxJavaPlugins的東西最終都是返回自身呼渣,RxJavaPlugins后面分析會(huì)說到棘伴,這里只需要知道他是返回參數(shù)本身即可

2、Observable.subscribe

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");//判空
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);//返回自身

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);//回調(diào)ObservableCreate的subscribeActual
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ......
        throw npe;
    }
}

observable.subscribe和我們手寫代碼一樣屁置,最終調(diào)用的是ObservableCreate的subscribeActual方法

3焊夸、ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //1、創(chuàng)建發(fā)射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //2蓝角、回調(diào)observer的onSubscribe
        observer.onSubscribe(parent);

        try {
            //3阱穗、回調(diào)ObservableOnSubscribe的subscribe
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}

ObservableCreate和我們手寫代碼一樣,創(chuàng)建發(fā)射器使鹅,并在發(fā)射器中做發(fā)射數(shù)據(jù)等操作

小結(jié)

如圖所示

在這里插入圖片描述

10.2 RxJava的事件釋放原理

知識(shí)點(diǎn):

  • 理解釋放事件的原理

有關(guān)RxJava的釋放原理是基于Observable可以返回Disposable對象揪阶,只有調(diào)用dispose()才能釋放事件,通過上面的例子患朱,我們知道在發(fā)射器里面有isDisposed()dispose()操作鲁僚,在發(fā)射完onError事件的情況下,我們會(huì)將事件釋放,所以在finally會(huì)做釋放操作冰沙,防止后面的事件再次發(fā)射

以下是手寫RxJava的代碼

public class EmitterCreate<T>
        extends AtomicReference<Disposable>
        implements Emitter<T>, Disposable {

    private Observer<T> observer;

    public EmitterCreate(Observer<T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError() {
        if (!isDisposed()) {
            try {
                observer.onError();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

以下是RxJava源代碼

@Override
public void dispose() {
    DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
    return DisposableHelper.isDisposed(get());
}

可以發(fā)現(xiàn)事件的釋放都是通過DisposableHelper去觸發(fā)的侨艾,不管是手寫RxJava還是源代碼,釋放RxJava都是通過DisposableHelper進(jìn)行釋放拓挥,具體看DisposableHelper蒋畜。在我們的演示程序中,我們通過發(fā)射onNext->onError->onNext的過程撞叽,去挖掘事件是怎么被釋放掉的

public enum DisposableHelper implements Disposable {
    
    DISPOSED
    ;

    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();//獲取參數(shù)的Disposable對象
        Disposable d = DISPOSED;//聲明一個(gè)已經(jīng)釋放的Disposable對象
        if (current != d) {//如果當(dāng)前未被釋放
            current = field.getAndSet(d);//則將當(dāng)前的Disposable賦值成已經(jīng)釋放過的Disposable對象
            if (current != d) {//如果當(dāng)前還未被釋放
                if (current != null) {//且不為空
                    current.dispose();//則釋放當(dāng)前Disposable對象
                }
                return true;
            }
        }
        return false;
    }
}

在事件釋放的過程中姻成,EmitterCreate本身是個(gè)AtomicReference<Disposable>,代碼通過get()去獲取Disposable對象愿棋,其中代碼會(huì)通過雙層判斷去做釋放科展,防止在多線程的時(shí)候出現(xiàn)搶奪的情況

  • onNext:第一次發(fā)射數(shù)據(jù)時(shí),get()會(huì)獲取一個(gè)null對象糠雨,所以不符合d == DISPOSED
  • onEroor:這時(shí)候會(huì)調(diào)用dispose()去比較當(dāng)前和釋放過的對象才睹,如果不等于,則將當(dāng)前的對象設(shè)置為釋放過的值
  • onNext:第二次發(fā)射數(shù)據(jù)時(shí)甘邀,get()會(huì)獲取一個(gè)已經(jīng)釋放過的對象琅攘,這個(gè)時(shí)候符合d == DISPOSED

其實(shí)這里的操作如同設(shè)置一個(gè)Flag,但由于Disposable是對象的形式松邪,且需要保證原子性坞琴,AtomicReference類型是個(gè)最佳選擇,能保證對象的原子性

10.3 RxJava的背壓原理

知識(shí)點(diǎn):

  • 理解背壓實(shí)現(xiàn)的本質(zhì)
  • 理解背壓數(shù)據(jù)項(xiàng)丟棄的本質(zhì)

背壓原理有一部分和RxJava事件發(fā)射原理相似逗抑,其背壓的過程就是在不同策略的發(fā)射器去處理當(dāng)前的數(shù)據(jù)項(xiàng)而已剧辐。在分析背壓策略的時(shí)候,我們都知道背壓是需要手動(dòng)進(jìn)行請求才可以將數(shù)據(jù)發(fā)射到觀察者中邮府,所以我們會(huì)調(diào)用s.request(Long.MAX_VALUE)讓觀察者能接收到數(shù)據(jù)荧关。有些人就會(huì)有疑問,為什么有些人平時(shí)用背壓的時(shí)候褂傀,不需要去調(diào)用request()就能接收到數(shù)據(jù)忍啤,原因是有些背壓已經(jīng)在內(nèi)部默認(rèn)調(diào)用了s.request(Long.MAX_VALUE),所以這里是不用多想的仙辟,是一定要調(diào)用s.request(Long.MAX_VALUE)才能收到數(shù)據(jù)的同波。由于不同背壓的策略的原理大同小異,主要以Drop策略去分析背壓的原理

public static void drop(View view) {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}

以下是RxJava源代碼

1欺嗤、Flowable.create

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");//判空
    ObjectHelper.requireNonNull(mode, "mode is null");//判空
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));//返回自身
}

Flowable.create跟我們前面是一樣的参萄,最后還是會(huì)交給新對象FlowableCreate去處理

2、Flowable.subscribe

public final void subscribe(FlowableSubscriber<? super T> s) {
    ObjectHelper.requireNonNull(s, "s is null");
    try {
        Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

        ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");

        subscribeActual(z);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ......
        throw npe;
    }
}

Flowable.subscribe跟我們前面是一樣的煎饼,最終調(diào)用的是FlowableCreate的subscribeActual方法

3、FlowableCreate.subscribeActual

public final class FlowableCreate<T> extends Flowable<T> {

    final FlowableOnSubscribe<T> source;

    final BackpressureStrategy backpressure;

    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        //使用三步曲分析法
        
        BaseEmitter<T> emitter;

        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
}

subscribeActual會(huì)根據(jù)不同的策略生成不同的發(fā)射器校赤,具體的所有策略邏輯都在發(fā)射器中體現(xiàn)的

4吆玖、DropAsyncEmitter

static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

    private static final long serialVersionUID = 8360058422307496563L;

    DropAsyncEmitter(Subscriber<? super T> actual) {
        super(actual);
    }

    @Override
    void onOverflow() {
        // nothing to do
    }

}

DropAsyncEmitter其實(shí)沒做什么事情筒溃,主要都在其父類中實(shí)現(xiàn)了,onOverflow的回調(diào)表示事件流溢出的時(shí)候的處理沾乘,很明顯Drop策略就把溢出的數(shù)據(jù)項(xiàng)直接不做處理怜奖,意思就是拋棄掉這個(gè)數(shù)據(jù)項(xiàng)了

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

    private static final long serialVersionUID = 338953216916120960L;

    ErrorAsyncEmitter(Subscriber<? super T> actual) {
        super(actual);
    }

    @Override
    void onOverflow() {
        onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
    }

}

再看看Error策略,溢出之后就會(huì)拋出溢出的異常翅阵,其他策略也類似分析歪玲,具體父類是如何處理溢出函數(shù)的呢

5、NoOverflowBaseAsyncEmitter

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

    private static final long serialVersionUID = 4127754106204442833L;

    NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
        super(actual);
    }

    @Override
    public final void onNext(T t) {
        if (isCancelled()) {
            return;
        }

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        
        //這里暫時(shí)將get()函數(shù)當(dāng)作是類似于List這種的容器掷匠,存儲(chǔ)的是當(dāng)前需要處理的數(shù)據(jù)項(xiàng)
        if (get() != 0) { //從數(shù)據(jù)項(xiàng)容器中取值滥崩,如果當(dāng)前有數(shù)據(jù)項(xiàng)需要處理
            actual.onNext(t); //發(fā)射數(shù)據(jù)項(xiàng)
            BackpressureHelper.produced(this, 1); //對當(dāng)前存在需要處理的數(shù)據(jù)項(xiàng)進(jìn)行-1操作
        } else {
            onOverflow(); //從數(shù)據(jù)項(xiàng)容器中取值,如果當(dāng)前沒有數(shù)據(jù)項(xiàng)需要處理讹语,則回調(diào)溢出函數(shù)
        }
    }

    abstract void onOverflow();
}

NoOverflowBaseAsyncEmitter在發(fā)射數(shù)據(jù)項(xiàng)的時(shí)候钙皮,會(huì)去BaseEmitter中的數(shù)據(jù)項(xiàng)容器去取出數(shù)據(jù)項(xiàng),如果存在則處理顽决,不存在則表示溢出短条,回調(diào)溢出函數(shù),那么具體的數(shù)據(jù)項(xiàng)容器時(shí)候怎么存儲(chǔ)需要處理的數(shù)據(jù)項(xiàng)的呢

6才菠、BaseEmitter

abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
    
    private static final long serialVersionUID = 7326289992464377023L;

    final Subscriber<? super T> actual;

    final SequentialDisposable serial;

    BaseEmitter(Subscriber<? super T> actual) {
        this.actual = actual;
        this.serial = new SequentialDisposable();
    }

    @Override
    public void onComplete() {
        complete();
    }

    protected void complete() {
        if (isCancelled()) {
            return;
        }
        try {
            actual.onComplete();
        } finally {
            serial.dispose();
        }
    }

    @Override
    public final void onError(Throwable e) {
        if (!tryOnError(e)) {
            RxJavaPlugins.onError(e);
        }
    }

    @Override
    public final void request(long n) {
        if (SubscriptionHelper.validate(n)) {
            BackpressureHelper.add(this, n);
        }
    }
}

BaseEmitter就是一個(gè)AtomicLong茸时,如果沒學(xué)過AtomicLong的話,可以簡單理解為一個(gè)計(jì)數(shù)器赋访,get()就是獲取當(dāng)前的Long值屹蚊,只要不等于0就表示有值。主要還是在request()进每,request()表示此時(shí)需要處理的數(shù)據(jù)項(xiàng)汹粤。結(jié)合上面NoOverflowBaseAsyncEmitter的中的BackpressureHelper.produced(this, 1)和當(dāng)前BaseEmitter中的BackpressureHelper.add(this, n),可得數(shù)據(jù)項(xiàng)的容器完全都是由BackpressureHelper去控制田晚,我們只需要對BackpressureHelper的存儲(chǔ)和獲取做分析嘱兼,就可以知道當(dāng)前是否有數(shù)據(jù)項(xiàng)需要處理

7、BackpressureHelper

public final class BackpressureHelper {

    public static long add(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get(); //獲取當(dāng)前數(shù)據(jù)項(xiàng)
            if (r == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long u = addCap(r, n);//當(dāng)前數(shù)據(jù)項(xiàng) + 新增的數(shù)據(jù)項(xiàng)
            if (requested.compareAndSet(r, u)) { //設(shè)置最新的數(shù)據(jù)項(xiàng)
                return r;
            }
        }
    }
    
    public static long addCap(long a, long b) {
        long u = a + b;
        if (u < 0L) {
            return Long.MAX_VALUE;
        }
        return u;
    }
    
    public static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get(); //獲取當(dāng)前數(shù)據(jù)項(xiàng)
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long update = current - n; //當(dāng)前數(shù)據(jù)項(xiàng) - 需要發(fā)射的數(shù)據(jù)項(xiàng)(從源碼上贤徒,n為1)
            if (update < 0L) { //不能為負(fù)數(shù)
                RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                update = 0L;
            }
            if (requested.compareAndSet(current, update)) { //設(shè)置最新的數(shù)據(jù)項(xiàng)
                return update;
            }
        }
    }
}

BackpressureHelper就是利用AtomicLong的原子性就行簡單的計(jì)數(shù)器操作而已芹壕,并沒有什么復(fù)雜的操作。至此接奈,我們就知道背壓的原理原來就是利用AtomicLong計(jì)數(shù)器和生產(chǎn)消費(fèi)的模式去決定是否發(fā)射當(dāng)前的數(shù)據(jù)項(xiàng)而已

10.4 RxJava的常用操作符原理

知識(shí)點(diǎn):

  • 理解map操作符的原理

RxJava常用操作符的代表就是map踢涌,分析map源碼后,其他的操作符的思想是一樣的序宦,只不過是實(shí)現(xiàn)邏輯不一致而已睁壁。下面我們通過分析map的主要流程去分析map是如何轉(zhuǎn)換字符串的,從上面我們知道Observable的創(chuàng)建、訂閱潘明、發(fā)射的過程行剂,這次對于重復(fù)的內(nèi)容就不再繼續(xù)分析,主要是分析中間map是如何回調(diào)apply()去將數(shù)據(jù)項(xiàng)轉(zhuǎn)換成字符串的

public void map() {
    //創(chuàng)建被觀察者
    Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                //默認(rèn)在主線程里執(zhí)行該方法
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("俊俊俊很帥");
                    e.onNext("你值得擁有");
                    e.onNext("取消關(guān)注");
                    e.onNext("但還是要保持微笑");
                    e.onComplete();
                }
            })
            .map(new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    return "Hello";
                }
            })
            //創(chuàng)建觀察者并訂閱
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    if (!d.isDisposed()) {
                        d.dispose();
                    }
                }

                @Override
                public void onNext(String s) {
                    System.out.println("onNext=" + s);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onNext=" + e.getMessage());
                }

                @Override
                public void onComplete() {

                }
            });
}

以下是RxJava源代碼

1钳降、Observable.map

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

從create到map的過程中厚宰,create的時(shí)候,當(dāng)前的Observable已經(jīng)被轉(zhuǎn)換成ObservableCreate遂填,再次map的時(shí)候铲觉,
當(dāng)前的Observable已經(jīng)被轉(zhuǎn)換成ObservableMap,而且在ObservableMap中傳遞的參數(shù)包含this吓坚,所以當(dāng)前ObservableMap中是嵌套著ObservableCreate

2撵幽、Observable.subscribe

由于當(dāng)前的Observable是ObservableMap,所以O(shè)bservable.subscribe會(huì)回調(diào)ObservableMap中的subscribeActual

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));//source是傳遞進(jìn)來的ObservableCreate
    }
}

ObservableMap中的subscribeActual凌唬,會(huì)去調(diào)用ObservableCreatesubscribe方法并齐,最后還是會(huì)去回調(diào)
ObservableCreatesubscribeActual,不過這里在回調(diào)的過程中增加了一個(gè)參數(shù)MapObserver客税,這個(gè)參數(shù)只有在ObservableCreate發(fā)射器發(fā)射的時(shí)候才會(huì)被調(diào)用

3况褪、ObservableCreate.subscribeActual

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

ObservableCreate.subscribeActual中,會(huì)接收一個(gè)Observer的參數(shù)更耻,這個(gè)時(shí)候的Observer的參數(shù)是從ObservableMap中傳遞過來的MapObserver测垛,當(dāng)CreateEmitter發(fā)射onNext的時(shí)候,就會(huì)在當(dāng)前的MapObserver對象onNext進(jìn)行處理

4秧均、MapObserver.onNext

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
    final Function<? super T, ? extends U> mapper;

    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != NONE) {
            actual.onNext(null);
            return;
        }

        U v;

        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        actual.onNext(v);
    }
    
    ......
}

onNext主要做了兩個(gè)事情食侮,一個(gè)是mapper.apply(t),這個(gè)就是map操作符所實(shí)現(xiàn)的方法目胡,這里就將原來的值轉(zhuǎn)換成新的值锯七,一個(gè)是actual.onNext(v),將轉(zhuǎn)換出來的新值v繼續(xù)onNext出去誉己,這里的actual就是在構(gòu)造函數(shù)中傳遞進(jìn)來的ObservableCreate眉尸,這里就已經(jīng)將數(shù)據(jù)項(xiàng)經(jīng)過map的操作符后繼續(xù)執(zhí)行后面正常的發(fā)射流程

小結(jié)

如圖所示

在這里插入圖片描述

10.5 RxJava的線程切換原理

知識(shí)點(diǎn):

  • 理解在工作線程上為什么能執(zhí)行耗時(shí)操作
  • 理解在UI線程為什么能執(zhí)行更新UI的操作

沿用上面的例子,在線程切換的過程中巨双,無非就是相當(dāng)于不同的操作符繼續(xù)操作數(shù)據(jù)項(xiàng)而已噪猾,根本的實(shí)現(xiàn)思路和map等操作符是一樣的,也是通過嵌套Observable的過程來執(zhí)行的筑累,只不過是線程切換的操作符內(nèi)部實(shí)現(xiàn)的邏輯有區(qū)別而已袱蜡。通過我們以往的思路去想,這兩個(gè)知識(shí)點(diǎn)無非就是啟動(dòng)線程池去執(zhí)行耗時(shí)任務(wù)慢宗,而UI線程則是交給Handler去處理坪蚁,RxJava線程切換的原理就是這樣的

Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            //默認(rèn)在主線程里執(zhí)行該方法
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("俊俊俊很帥");
                e.onNext("你值得擁有");
                e.onNext("取消關(guān)注");
                e.onNext("但還是要保持微笑");
                e.onComplete();
            }
        })
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return "Hello";
            }
        })
        //將被觀察者切換到子線程
        .subscribeOn(Schedulers.io())
        //將觀察者切換到主線程  需要在Android環(huán)境下運(yùn)行
        .observeOn(AndroidSchedulers.mainThread())
        //創(chuàng)建觀察者并訂閱
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                if (!d.isDisposed()) {
                    d.dispose();
                }
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext=" + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onNext=" + e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });

基礎(chǔ)概念:

  • Schedulers:調(diào)度器的管理者奔穿。管理著多種不同種類的Scheduler
  • Scheduler:調(diào)度器。負(fù)責(zé)線程Worker的創(chuàng)建createWorker()迅细,調(diào)度Worker的執(zhí)行schedule()
  • Worker:抽象的工作線程巫橄。被線程調(diào)度器管理淘邻,負(fù)責(zé)線程的創(chuàng)建和執(zhí)行

在源碼中茵典,我們需要先熟悉這三者之間的關(guān)系到底是如何運(yùn)作的

以下是RxJava源代碼

1、observeOn()

@CheckReturnValue
@SchedulerSupport("custom")
public final Observable<T> observeOn(Scheduler scheduler) {
    return this.observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport("custom")
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}

當(dāng)前的Observable已經(jīng)被轉(zhuǎn)換成ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    protected void subscribeActual(Observer<? super T> observer) {
        if (this.scheduler instanceof TrampolineScheduler) {
            this.source.subscribe(observer);
        } else {
            //1宾舅、創(chuàng)建工作線程
            Worker w = this.scheduler.createWorker();
            //2统阿、訂閱之后,在發(fā)射的過程中
            this.source.subscribe(new ObservableObserveOn.ObserveOnObserver(observer, w, this.delayError, this.bufferSize));
        }

    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Worker worker;
        final boolean delayError;
        final int bufferSize;
        SimpleQueue<T> queue;
        
        public void onNext(T t) {
            if (!this.done) {
                if (this.sourceMode != 2) {
                    this.queue.offer(t);
                }
                //3筹我、在OnNext中執(zhí)行
                this.schedule();
            }
        }

        void schedule() {
            if (this.getAndIncrement() == 0) {
                //4扶平、執(zhí)行工作線程
                this.worker.schedule(this);
            }
        }
    }
}

其三者的關(guān)系簡單的說就是在每次訂閱的時(shí)候,都會(huì)去創(chuàng)建出對應(yīng)的工作線程蔬蕊,這個(gè)工作線程取決于你傳遞的參數(shù)是哪個(gè)Worker结澄,在發(fā)射器發(fā)射的過程中,這個(gè)工作線程總會(huì)去執(zhí)行它的回調(diào)schedule岸夯,其實(shí)大部分的操作就是在schedule里面執(zhí)行線程麻献。搞懂了三者的關(guān)系之后,分析線程切換就簡單多了猜扮,就相當(dāng)于工廠一樣勉吻,給個(gè)具體的任務(wù)給到具體的工人去執(zhí)行,很像工廠的流水線旅赢,我們已經(jīng)確定下來了流水線的流程了齿桃,這個(gè)時(shí)候我們就需要去關(guān)心參數(shù)具體是什么東西了。在閱讀subscribeOn煮盼、observeOn前短纵,我們先看看這兩個(gè)方法中的參數(shù)都是什么

1、Schedulers.io()

public final class Schedulers {

    @NonNull
    static final Scheduler IO;
    
    static {
        
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
        
        //1僵控、在初始化的時(shí)候就構(gòu)建出了IOTask香到,initIoScheduler會(huì)去執(zhí)行IOTask的call方法
        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            //2、IOTask的call方法會(huì)去獲取IoHolder的值
            return IoHolder.DEFAULT;
        }
    }
    
    static final class IoHolder {
        //3喉祭、創(chuàng)建IoScheduler
        static final Scheduler DEFAULT = new IoScheduler();
    }
    
    public static Scheduler io() {
        //Schedulers.io():它會(huì)去獲取前面3步創(chuàng)建出來的IoScheduler對象
        return RxJavaPlugins.onIoScheduler(IO); //返回IO自身
    }
}

其正在實(shí)現(xiàn)在IoScheduler养渴,其表示管理Io線程的管理者

public final class IoScheduler extends Scheduler {
    
    final AtomicReference<CachedWorkerPool> pool;
    
    static final CachedWorkerPool NONE;
    static {
        ......
        //1、創(chuàng)建CachedWorkerPool
        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    }
    
    static final class CachedWorkerPool implements Runnable {
        
        ......
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        
        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            ......
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }
    }
    
    @NonNull
    @Override
    public Worker createWorker() {
        //2泛烙、創(chuàng)建具體的線程
        return new EventLoopWorker(pool.get());
    }
    
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            //3理卑、最終會(huì)去調(diào)用ThreadWorker的scheduleActual
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    
    //4、由于ThreadWorker沒有scheduleActual蔽氨,在父類中找NewThreadWorker
    static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
}

NewThreadWorker藐唠,最終還是調(diào)用executor.submit()executor.schedule()

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

2帆疟、AndroidSchedulers.mainThread()

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
}

返回一個(gè)HandlerScheduler,創(chuàng)建單例模式的主線程Handler

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) {
            throw new NullPointerException("run == null");
        } else if (unit == null) {
            throw new NullPointerException("unit == null");
        } else {
            run = RxJavaPlugins.onSchedule(run);
            HandlerScheduler.ScheduledRunnable scheduled = new HandlerScheduler.ScheduledRunnable(this.handler, run);
            this.handler.postDelayed(scheduled, unit.toMillis(delay));
            return scheduled;
        }
    }

    public Worker createWorker() {
        //創(chuàng)建具體工作線程
        return new HandlerScheduler.HandlerWorker(this.handler);
    }

    ......
}

就好像我們上面分析的三者關(guān)系一樣宇立,Schedule最終還是會(huì)管理著具體的工作線程

private static final class HandlerWorker extends Worker {
    private final Handler handler;

    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);
        //包裝新的Runnable交給Handler
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        handler.sendMessageDelayed(message, unit.toMillis(delay));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }

    @Override
    public void dispose() {
        disposed = true;
        handler.removeCallbacksAndMessages(this /* token */);
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}

3踪宠、subscribeOn()

理解完參數(shù)后,回到我們的分析重點(diǎn)

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

subscribeOn就如同普通操作符一樣妈嘹,包裝一層ObservableSubscribeOn柳琢,在subscribe的時(shí)候真正走的還是subscribeActual

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //使用三步曲分析法
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        s.onSubscribe(parent);
        //3、將第三步的內(nèi)容放到線程中去執(zhí)行
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            //3润脸、回調(diào)ObservableOnSubscribe的subscribe
            source.subscribe(parent);
        }
    }
}

scheduler.scheduleDirect中會(huì)去執(zhí)行Scheduler里的方法柬脸,這里的scheduler就是IoScheduler

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

回調(diào)IoScheduler的createWorker()并執(zhí)行w.schedule()

小結(jié)

如圖所示

在這里插入圖片描述

10.6 RxJava的自定義Operator原理

知識(shí)點(diǎn):

  • 自定義Operator是如何實(shí)現(xiàn)的

在講解之前,讓我們先回味下自定義Operator

public class CustomOperator implements ObservableOperator<String, List<String>> {

    @Override
    public Observer<? super List<String>> apply(final Observer<? super String> observer) throws Exception {
        return new Observer<List<String>>() {
            @Override
            public void onSubscribe(Disposable d) {
                observer.onSubscribe(d);
            }

            @Override
            public void onNext(List<String> strings) {
                observer.onNext(strings.toString());
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onComplete() {
                observer.onComplete();
            }
        };
    }
}
Observable.create(new ObservableOnSubscribe<List<String>>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<List<String>> e) throws Exception {
      
    }
}).lift(new CustomOperator())

自定義Operator如同普通的操作符原理差不多毙驯,用的是lift的操作符倒堕,只不過在lift里面將邏輯的執(zhí)行回調(diào)到自定義的Operator的apply()

以下是RxJava源代碼

1、Observable.lift

public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
    ObjectHelper.requireNonNull(lifter, "onLift is null");
    return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}

2爆价、ObservableLift.subscribeActual

public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
    /** The actual operator. */
    final ObservableOperator<? extends R, ? super T> operator;

    public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    public void subscribeActual(Observer<? super R> s) {
        Observer<? super T> observer;
        try {
            observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Disposable already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }

        source.subscribe(observer);
    }
}

可以看到代碼非晨寻停快的就將傳遞進(jìn)來的參數(shù)operator執(zhí)行apply()

10.7 RxJava的自定義Transformer原理

知識(shí)點(diǎn):

  • 自定義Transformer是如何實(shí)現(xiàn)的

在講解之前,讓我們先回味下自定義Transformer

public class NetWorkTransformer implements ObservableTransformer {

    @Override
    public ObservableSource apply(Observable upstream) {
        return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }
}
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        
    }
}).compose(new CustomTransformer())

自定義Transformer如同普通的操作符原理差不多铭段,用的是compose的操作符骤宣,只不過在compose里面將邏輯的執(zhí)行回調(diào)到自定義的Transformer的apply()

以下是RxJava源代碼

1、Observable.compose

public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
    return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}

可以看到代碼非吵硐睿快的就將傳遞進(jìn)來的參數(shù)composer執(zhí)行apply()涯雅,這里的wrap()只是將代碼裹了一層,如果你想簡單的理解的話展运,可以理解為作者的強(qiáng)迫癥犯了活逆,只是為了讓所有代碼看起來都比較規(guī)范,不然這里實(shí)在和其他操作符的實(shí)現(xiàn)不一樣拗胜,我們可以追進(jìn)去wrap()

public static <T> Observable<T> wrap(ObservableSource<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    if (source instanceof Observable) {
        return RxJavaPlugins.onAssembly((Observable<T>)source);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}

wrap()其實(shí)是對composer操作符做了Hook蔗候,因?yàn)樗胁僮鞣紩?huì)被RxJava去Hook住,這里會(huì)在下面講到自定義Plugin原理的時(shí)候就明白了

10.8 RxJava的自定義Plugin原理

知識(shí)點(diǎn):

  • 自定義Plugin是如何實(shí)現(xiàn)AOP的

在講解之前埂软,讓我們先回味下自定義Plugin

public class CustomObservableAssembly implements Function<Observable, Observable> {
    @Override
    public Observable apply(Observable observable) throws Exception {
        System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
        return observable;
    }
}
RxJavaPlugins.setOnObservableAssembly(new CustomObservableAssembly());

在自定義Plugin中锈遥,類似于Android的術(shù)語Hook,但在這里并不是真正的Hook勘畔,而是作者在寫RxJava的時(shí)候去限定一套規(guī)范所灸,讓后面的所有操作符或其他操作等,都可以實(shí)現(xiàn)Hook的原理

以下是RxJava源代碼

1炫七、RxJavaPlugins.setOnObservableAssembly

public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}

RxJavaPlugins.setOnObservableAssembly只是對成員變量設(shè)置了自定義的值爬立,這個(gè)時(shí)候onObservableAssembly就有了值,默認(rèn)是為null的万哪。設(shè)置完值就表示已經(jīng)Hook成功了侠驯,當(dāng)操作符執(zhí)行的時(shí)候抡秆,是如何回調(diào)我們Hook的函數(shù)的

2、Observable.create

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

create相當(dāng)于一個(gè)操作符吟策,在每個(gè)操作符的里面都會(huì)去執(zhí)行一段RxJavaPlugins.onAssembly儒士,這里就是RxJava規(guī)定的規(guī)范,一開始我們只是說返回自身檩坚,但是有了Hook之后着撩,就會(huì)回調(diào)Hook函數(shù),返回已經(jīng)經(jīng)過二次加工的自身

3效床、RxJavaPlugins.onAssembly

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

由于我們已經(jīng)設(shè)置了新值睹酌,這里的onObservableAssembly就不為null权谁,不為null則執(zhí)行apply()剩檀,apply()就是我們Hook傳進(jìn)去參數(shù)的回調(diào)方法

10.9 美團(tuán)WhiteBoard

美團(tuán)的WhiteBoard其實(shí)是取自美團(tuán)的開源框架Shield——開源的移動(dòng)端頁面模塊化開發(fā)框架中的代碼,其主要作用是應(yīng)用RxJava的Subject搭起組件間通訊的橋梁旺芽。實(shí)質(zhì)上在WhiteBoard中沪猴,是將所有的組件的數(shù)據(jù)和Subject通訊的橋梁保存起來,通過key作為組件的唯一標(biāo)志采章。不過比較可惜的是WhiteBoard使用的是RxJava1运嗜,不過關(guān)系不大,只要讀懂里面的源碼即可

1悯舟、WhiteBoard的初始化

初始化放在Activity/Fragment界面中担租,相當(dāng)于通訊的橋梁,每個(gè)界面中僅有一個(gè)WhiteBoard的實(shí)例抵怎,并由所有組件共用

public abstract class ShieldFragment extends Fragment implements AgentCellBridgeInterface, DriverInterface {
    
    static final String TAG = ShieldFragment.class.getSimpleName();
    ......
    protected WhiteBoard whiteBoard;

    public ShieldFragment() {
        this.whiteBoard = new WhiteBoard();//初始化
    }

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        ......
        whiteBoard.onCreate(savedInstanceState);//對應(yīng)生命周期
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        ......
        whiteBoard.onDestory();//對應(yīng)生命周期
    }

    @Override
    public void onSaveInstanceState(Bundle outState) {
        super.onSaveInstanceState(outState);
        ......
        whiteBoard.onSaveInstanceState(outState);//對應(yīng)生命周期
    }

    @Override
    public WhiteBoard getWhiteBoard() {
        return whiteBoard;//獲取實(shí)例
    }
}

2奋救、WhiteBoard監(jiān)聽通知

組件只監(jiān)聽某個(gè)key的事件,有通知的時(shí)候就能收到

public class MixCellAgent extends LightAgent {
    private MixCell mixCell;
    private Subscription loadingSubscription;
    private Subscription emptySubscription;

    public MixCellAgent(Fragment fragment, DriverInterface bridge, PageContainerInterface pageContainer) {
        super(fragment, bridge, pageContainer);
    }

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        mixCell = new MixCell(getContext(), this);
        loadingSubscription = getWhiteBoard().getObservable(MixLoadingAgent.KEY_LOADING).filter(new Func1() {
            @Override
            public Object call(Object o) {
                return o instanceof Boolean && ((Boolean) o);
            }
        }).subscribe(new Action1() {
            @Override
            public void call(Object o) {
                loading();
            }
        });

        emptySubscription = getWhiteBoard().getObservable(MixLoadingAgent.KEY_EMPTY).filter(new Func1<Object, Boolean>() {
            @Override
            public Boolean call(Object o) {
                return o instanceof Boolean && ((Boolean) o);
            }
        }).subscribe(new Action1() {
            @Override
            public void call(Object o) {
                mixCell.onEmpty();
            }
        });
        
        ......
    }

    @Override
    public void onDestroy() {
        if (loadingSubscription != null) {
            loadingSubscription.unsubscribe();
            loadingSubscription = null;
        }

        if (emptySubscription != null) {
            emptySubscription.unsubscribe();
        }
        
        ......
    }
}

3反惕、WhiteBoard的發(fā)送通知

WhiteBoard發(fā)送通知就是調(diào)用WhiteBoard提供的所有put方法尝艘,具體是如何收到消息的,還需要通過WhiteBoard的源碼看下

public class MixLoadingAgent extends LightAgent implements MixLoadingCell.MixLoadingListener {
    public static final String KEY_LOADING = "loading";
    public static final String KEY_EMPTY = "empty";
    public static final String KEY_FAILED = "failed";
    public static final String KEY_MORE = "more";
    public static final String KEY_DONE = "done";

    private MixLoadingCell mixLoadingCell;

    public MixLoadingAgent(Fragment fragment, DriverInterface bridge, PageContainerInterface pageContainer) {
        super(fragment, bridge, pageContainer);
        mixLoadingCell = new MixLoadingCell(getContext());
        mixLoadingCell.setOnMixLoadingListener(this);
    }

    @Override
    public SectionCellInterface getSectionCellInterface() {
        return mixLoadingCell;
    }

    @Override
    public void onLoading() {
        getWhiteBoard().putBoolean(KEY_LOADING, true);
    }

    @Override
    public void onEmpty() {
        getWhiteBoard().putBoolean(KEY_EMPTY, true);
    }

    @Override
    public void onFailed() {
        getWhiteBoard().putBoolean(KEY_FAILED, true);
    }

    @Override
    public void onMore() {
        getWhiteBoard().putBoolean(KEY_MORE, true);
    }

    @Override
    public void onDone() {
        getWhiteBoard().putBoolean(KEY_DONE, true);
    }
}

4姿染、WhiteBoard的原理

最后只需要獲取實(shí)例后發(fā)送通知即可背亥,getWhiteBoard().putBoolean(key)。WhiteBoard原理是只要還是Subject的橋梁的作用

public class WhiteBoard {

    public static final String WHITE_BOARD_DATA_KEY = "White_Board_Data";
    protected Bundle mData;//保存所有組件的數(shù)據(jù)
    protected HashMap<String, Subject> subjectMap;//保存所有組件的通訊橋梁

    public WhiteBoard() {
        this(null);
    }

    public WhiteBoard(Bundle data) {
        mData = data;
        if (mData == null) {
            mData = new Bundle();//初始化
        }

        subjectMap = new HashMap<>();//初始化
    }

    public void onCreate(Bundle savedInstanceState) {
        if (savedInstanceState != null) {
            mData = savedInstanceState.getBundle(WHITE_BOARD_DATA_KEY);
        }

        if (mData == null) {
            mData = new Bundle();
        }
    }

    public void onSaveInstanceState(Bundle outState) {
        if (outState != null) {

            // here we must save a new copy of the mData into the outState
            outState.putBundle(WHITE_BOARD_DATA_KEY, new Bundle(mData));
        }
    }

    public void onDestory() {
        subjectMap.clear();
        mData.clear();
    }

    //通過key獲取某組件的橋梁
    public Observable getObservable(final String key) {

        Subject res = null;
        if (subjectMap.containsKey(key)) {
            res = subjectMap.get(key);
        } else {
            res = PublishSubject.create();
            subjectMap.put(key, res);
        }
        if (getData(key) != null) {
            return res.startWith(getData(key));//帶上已經(jīng)存儲(chǔ)過的數(shù)據(jù)
        } else {
            return res;
        }
    }

    //通過key通知某組件
    protected void notifyDataChanged(String key) {
        if (subjectMap.containsKey(key)) {
            subjectMap.get(key).onNext(mData.get(key));
        }
    }

    //移除組件中的數(shù)據(jù)
    public void removeData(String key) {
        mData.remove(key);
        notifyDataChanged(key);
    }
    
    //每次put值的時(shí)候悬赏,就會(huì)去通知對應(yīng)的組件
    public void putBoolean(@Nullable String key, boolean value) {
        mData.putBoolean(key, value);
        notifyDataChanged(key);
    }
    public void putInt(@Nullable String key, int value) {
        mData.putInt(key, value);
        notifyDataChanged(key);
    }
    public void putString(@Nullable String key, @Nullable String value) {
        mData.putString(key, value);
        notifyDataChanged(key);
    }
    ......

    public double getDouble(String key) {
        return mData.getDouble(key);
    }
    public String getString(String key, String defaultValue) {
        return mData.getString(key, defaultValue);
    }
    ......
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末狡汉,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子闽颇,更是在濱河造成了極大的恐慌盾戴,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件进萄,死亡現(xiàn)場離奇詭異捻脖,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)可婶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事。” “怎么了峦椰?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵龄寞,是天一觀的道長。 經(jīng)常有香客問我汤功,道長物邑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任滔金,我火速辦了婚禮色解,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鹦蠕。我一直安慰自己冒签,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布钟病。 她就那樣靜靜地躺著萧恕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪肠阱。 梳的紋絲不亂的頭發(fā)上票唆,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天,我揣著相機(jī)與錄音屹徘,去河邊找鬼走趋。 笑死,一個(gè)胖子當(dāng)著我的面吹牛噪伊,可吹牛的內(nèi)容都是我干的簿煌。 我是一名探鬼主播氮唯,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼姨伟!你這毒婦竟也來了惩琉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤夺荒,失蹤者是張志新(化名)和其女友劉穎瞒渠,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體技扼,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡伍玖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了剿吻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片窍箍。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖和橙,靈堂內(nèi)的尸體忽然破棺而出仔燕,到底是詐尸還是另有隱情,我是刑警寧澤魔招,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站五辽,受9級特大地震影響办斑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜杆逗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一乡翅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧罪郊,春花似錦蠕蚜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至癣疟,卻和暖如春挣柬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背睛挚。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工邪蛔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人扎狱。 一個(gè)月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓侧到,卻偏偏與公主長得像勃教,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子匠抗,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 01 前幾天戈咳,隔壁店子里的夫妻倆在爭吵心软,一句比一句兇,男人有要?jiǎng)邮值内厔葜埽悄佑悬c(diǎn)嚇人删铃,我連忙跑過去勸勸,拉開踏堡。...
    蕓羲閱讀 2,994評論 10 32
  • 利用統(tǒng)計(jì)學(xué)原理猎唁,來研究學(xué)習(xí)漱挚。 由于每天花最多時(shí)間的事情就是睡覺监氢,所以從明天起,每天讀有關(guān)睡覺的paper半小時(shí)咧最。加油帐偎!
    愛跑步的coder閱讀 156評論 0 0
  • 這幾天負(fù)面情緒很多逐纬,頂著很大的壓力在這里工作。 第一削樊,這是一間什么都不完善的工廠豁生,這里的員工素質(zhì)很低。很隨意漫贞。夫妻...
    瑞秋bb閱讀 340評論 0 0