Android RxJava框架源碼解析

目錄

簡(jiǎn)單示例1:

private Disposable mDisposable;
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("test");
            }
        })
        .subscribe(new Observer<String>() { 
            @Override
            public void onSubscribe(Disposable d) {
                mDisposable = d;
            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });


    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            if (!mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }

特別注意:上面示例代碼中的mDisposable最后必須要釋放掉,不然會(huì)出現(xiàn)內(nèi)存泄漏

一前方、觀察者Observer創(chuàng)建過(guò)程

首先對(duì)觀察者Observer源碼開(kāi)始進(jìn)行簡(jiǎn)單分析下:
Observer.java

public interface Observer<T> {
    //表示一執(zhí)行subscribe訂閱就會(huì)執(zhí)行該函數(shù)狈醉,這個(gè)函數(shù)跟當(dāng)前調(diào)用.subscribe()一定執(zhí)行在主線程中
    void onSubscribe(@NonNull Disposable d);
   // 表示拿到上一個(gè)流程的數(shù)據(jù)
    void onNext(@NonNull T t);
   // 表示拿到上一個(gè)流程的錯(cuò)誤數(shù)據(jù)
    void onError(@NonNull Throwable e);
   // 表示事件流程結(jié)束
    void onComplete();
}

具體的對(duì)象創(chuàng)建是在上面示例代碼1中的new Observer<String>()操作,這個(gè)稱(chēng)這個(gè)為自定義觀察者惠险。

二苗傅、被觀察者Observable創(chuàng)建過(guò)程

分析完觀察者Observer的創(chuàng)建,現(xiàn)在來(lái)分析下被觀察者Observable的創(chuàng)建流程班巩,

Observable.create(new ObservableOnSubscribe<String>() {
     @Override
      public void subscribe(ObservableEmitter<String> e) throws Exception {
          e.onNext("test");
      }
    })

將new ObservableOnSubscribe()過(guò)程可以理解為是自定義source的過(guò)程渣慕。

new ObservableOnSubscribe<String>() {
     @Override
      public void subscribe(ObservableEmitter<String> e) throws Exception {
          e.onNext("test");
      }
    }

執(zhí)行Observable.create()代碼流程
Observable.java

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null"); //校驗(yàn)是否為null
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

其中,RxJavaPlugins.onAssembly()采用了hook技術(shù)抱慌,如果沒(méi)有重寫(xiě)RxJavaPlugins.setOnObservableAssembly()方法逊桦,這個(gè)可以不要考慮。
ObservableCreate.java

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source; // 自定義source

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

    /**
     * Serializes calls to onNext, onError and onComplete.
     *
     * @param <T> the value type
     */
    static final class SerializedEmitter<T>
    extends AtomicInteger
    implements ObservableEmitter<T> {

        private static final long serialVersionUID = 4883307006032401862L;

        final ObservableEmitter<T> emitter;

        final AtomicThrowable error;

        final SpscLinkedArrayQueue<T> queue;

        volatile boolean done;

        SerializedEmitter(ObservableEmitter<T> emitter) {
            this.emitter = emitter;
            this.error = new AtomicThrowable();
            this.queue = new SpscLinkedArrayQueue<T>(16);
        }

        @Override
        public void onNext(T t) {
            if (emitter.isDisposed() || done) {
                return;
            }
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (get() == 0 && compareAndSet(0, 1)) {
                emitter.onNext(t);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<T> q = queue;
                synchronized (q) {
                    q.offer(t);
                }
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (emitter.isDisposed() || done) {
                return false;
            }
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (error.addThrowable(t)) {
                done = true;
                drain();
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (emitter.isDisposed() || done) {
                return;
            }
            done = true;
            drain();
        }

        void drain() {
            if (getAndIncrement() == 0) {
                drainLoop();
            }
        }

        void drainLoop() {
            ObservableEmitter<T> e = emitter;
            SpscLinkedArrayQueue<T> q = queue;
            AtomicThrowable error = this.error;
            int missed = 1;
            for (;;) {

                for (;;) {
                    if (e.isDisposed()) {
                        q.clear();
                        return;
                    }

                    if (error.get() != null) {
                        q.clear();
                        e.onError(error.terminate());
                        return;
                    }

                    boolean d = done;
                    T v = q.poll();

                    boolean empty = v == null;

                    if (d && empty) {
                        e.onComplete();
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    e.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        @Override
        public void setDisposable(Disposable s) {
            emitter.setDisposable(s);
        }

        @Override
        public void setCancellable(Cancellable c) {
            emitter.setCancellable(c);
        }

        @Override
        public boolean isDisposed() {
            return emitter.isDisposed();
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return this;
        }
    }

}


這里將ObservableCreate的源碼全部放在這抑进,作為一個(gè)埋點(diǎn)

其實(shí)强经,Observable.create()方法主要功能就是創(chuàng)建了一個(gè)ObservableCreate對(duì)象,并將自定義的source傳給ObservableCreate寺渗。該方法最終返回的是ObserverableCreate對(duì)象匿情。

三兰迫、subscribe訂閱過(guò)程

分析執(zhí)行subscribe()訂閱流程,并將自定義觀察者作為參數(shù)傳入码秉。
Observable.java

@Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null"); // 功能校驗(yàn),判定observer是否為null
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer); 
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

首先會(huì)執(zhí)行一些功能校驗(yàn)鸡号,最后執(zhí)行到subscribeActual()方法中转砖。
Observable.java

 protected abstract void subscribeActual(Observer<? super T> observer);

subscribeActual()是一個(gè)抽象類(lèi),從而最終調(diào)用的是ObservableCreate的subscribeActual()方法中鲸伴。

ObservableCreate.java

@Override
    protected void subscribeActual(Observer<? super T> observer) {  // observer為自定義觀察者
        // 自定義一個(gè)CreateEmitter發(fā)射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer); 
        // 執(zhí)行該方法就會(huì)執(zhí)行自定義觀察者的onSubscribe()方法中
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribeActual()方法里面會(huì)執(zhí)行如下三個(gè)操作:
1)CreateEmitter<T> parent = new CreateEmitter<T>(observer); --> 首先會(huì)創(chuàng)建一個(gè)CreateEmitter發(fā)射器府蔗,并將自定義觀察者傳入該發(fā)射器中
2)observer.onSubscribe(parent);–> 執(zhí)行自定義觀察者的onSubscribe()方法,所以該方法也是最先執(zhí)行調(diào)用汞窗,并且一定在主線程中
3)source.subscribe(parent); -->執(zhí)行自定義source的subscribe()訂閱操作,從而跳轉(zhuǎn)到示例代碼1中ObservableOnSubscribe的subscribe()方法姓赤,并將CreateEmitter發(fā)射器作為參數(shù)傳入進(jìn)去

new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("test");
            }
        }

執(zhí)行e.onNext("test")就會(huì)跳轉(zhuǎn)到CreateEmitter發(fā)射器中的onNext()方法
ObservableCreate.java

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);  //執(zhí)行該流程,observer為自定義觀察者
            }
        }
       ...
    }

該observer為上面流程中自定義的CreateEmitter發(fā)射器CreateEmitter<T> parent = new CreateEmitter<T>(observer);傳入進(jìn)來(lái)的自定義觀察者對(duì)象仲吏,執(zhí)行observer.onNext(t)該語(yǔ)句就調(diào)到示例代碼1中的

@Override
public void onNext(String s) {

}

Observable與Observer訂閱的過(guò)程時(shí)序圖如下:
[圖片上傳失敗...(image-e9a070-1677294388046)]

在標(biāo)準(zhǔn)的觀察者設(shè)計(jì)模式中不铆,是一個(gè)“被觀察者”,多個(gè)“觀察者”裹唆,并且需要“被觀察者”發(fā)出改變通知后誓斥,所以的“觀察者”才能觀察到
??在RxJava觀察者設(shè)計(jì)模式中,是多個(gè)“被觀察者”许帐,一個(gè)“觀察者”劳坑,并且需要 起點(diǎn)(被觀察者) 和 終點(diǎn)(觀察者) 在“訂閱”一次后,才發(fā)出改變通知成畦,終點(diǎn)(觀察者)才能觀察到

圖1:RxJava簡(jiǎn)單訂閱過(guò)程:
[圖片上傳失敗...(image-f0f708-1677294388046)]

四距芬、map操作符

加入map操作符之后的簡(jiǎn)單示例代碼2:

private Disposable mDisposable;

// 創(chuàng)建ObserverCreate
Observable.create(new ObservableOnSubscribe<String>() { //自定義source
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("test");
            }
        })
        // ObservableCreate.map
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s;
            }
        })
        // ObservableMap.subscribe
        .subscribe(new Observer<String>() { //自定義觀察者
            @Override
            public void onSubscribe(Disposable d) {
                mDisposable = d;
            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            if (!mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }

這個(gè)示例代碼2寫(xiě)法采用裝飾模型

圖2:加入map操作符之后的流程:
[圖片上傳失敗...(image-40fc9-1677294388046)]

從①~⑥流程簡(jiǎn)稱(chēng)為封包裹,⑦ ~⑨流程簡(jiǎn)稱(chēng)為拆包裹

其實(shí)圖1與圖2的區(qū)別不大循帐,主要就是多了一個(gè)ObservableMap封包裹的流程框仔,其他流程都類(lèi)似。針對(duì)這個(gè)區(qū)別進(jìn)行代碼流程闡述下:
??從示例代碼2中執(zhí)行map()操作進(jìn)行分析:
Observable.java

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

進(jìn)行創(chuàng)建ObservableMap對(duì)象

ObservableMap.java

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source); //source指ObservableCreate
        this.function = function; // 自定義的Function方法
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function)); //這里面的t為下一層包裹即圖2中的自定義觀察者,source指上一層ObservableCreate
    }
...
}

這里需要注意拄养,在ObservableMap()構(gòu)造函數(shù)中存和,參數(shù)source指從上一層傳過(guò)來(lái)的ObservableCreate對(duì)象,參數(shù)function指示例代碼2中的new Function()方法衷旅。

 .map(new Function<String, String>() 

執(zhí)行示例代碼2中的.subscribe()其實(shí)就是執(zhí)行到了ObservableMap類(lèi)的subscribeActual()方法捐腿,在這個(gè)方法中會(huì)對(duì)MapObserver進(jìn)行封裝一層包裹,并將下一層的包裹即自定義觀察者也就是參數(shù)t傳入柿顶。

MapObserver為ObservableMap的內(nèi)部類(lèi)茄袖。

ObservableMap.java

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual); // actual為自定義觀察者
            this.mapper = mapper;
        }
        ...
}

在執(zhí)行圖2的第⑧步流程時(shí),就會(huì)調(diào)用執(zhí)行包裹1的onNext()方法嘁锯,即MapObserver類(lèi)的onNext();
ObservableMap.java

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != NONE) {
        actual.onNext(null);
        return;
    }

    U v;

    try {
       // 代碼1
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    // 代碼2
    actual.onNext(v);
}


1:代碼1
??執(zhí)行mapper.apply(t)流程的時(shí)候宪祥,其實(shí)就是調(diào)用了示例代碼2中的apply()方法聂薪。
Function.java

public interface Function<T, R> {
    R apply(@NonNull T t) throws Exception;
}

@Override
public String apply(String s) throws Exception {
    return s;
}

2:代碼2
?? actual.onNext(v);中的actual是在ObservableMap構(gòu)造函數(shù)傳過(guò)來(lái)的,actual對(duì)應(yīng)圖2中的自定義觀察者對(duì)象蝗羊,也就是對(duì)應(yīng)圖2中的第9步流程藏澳。

五、線程切換原理

subscribeOn:給上面代碼分配線程
observeOn:給下面代碼分配線程

Scheduler分類(lèi):

調(diào)度器類(lèi)型 效果
Schedulers.computation() 用于計(jì)算任務(wù)耀找,如事件循環(huán)或回調(diào)處理翔悠,不要用于IO操作(IO操作使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量
Schedulers.from(executor) 使用指定的Executor作為調(diào)度器
Schedulers.immediate() 在當(dāng)前線程立即開(kāi)始執(zhí)行任務(wù)
Schedulers.io() 用于IO密集型任務(wù)
Schedulers.newThread() 為每個(gè)任務(wù)創(chuàng)建一個(gè)新任務(wù)
Schedulers.trampoline() 當(dāng)其他排隊(duì)的任務(wù)完成后野芒,在當(dāng)前線程排隊(duì)開(kāi)始執(zhí)行
AndroidSchedulers.mainThread() 用于Android的UI更新操作

1. 異步線程流程

示例代碼3:

private Disposable mDisposable;

//創(chuàng)建ObserverableCreate對(duì)象
Observable.create(
            // 自定義source
            new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("test");
                    }
                }
        )
        // TODO 第二步
        //ObservableCreate.subscribeOn()
        .subscribeOn(
            // TODO 第一步
            Schedulers.io()  // 給上面的代碼分配異步線程
            ) 
        // TODO 第三步
        // ObservableSubscribeOn.subscribe()
        .subscribe(
            // 自定義觀察者
            new Observer<String>() { 
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable = d;
                }
    
                @Override
                public void onNext(String s) {
                    Log.d("abc", "onNext:" + Thread.currentThread().getName());
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
        });


    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            if (!mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }

示例代碼3只是簡(jiǎn)單的在示例代碼1上添加了一行異步線程的操作 .subscribeOn(Schedulers.io())蓄愁,從第一步該語(yǔ)句進(jìn)行分析:
Schedules.java

static final Scheduler IO;
...
static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    
 ...
 @NonNull
 public static Scheduler io() {
     return RxJavaPlugins.onIoScheduler(IO);
 }

RxJavaPlugins.initIoScheduler(...);這條語(yǔ)句也采用了hook機(jī)制,繼續(xù)分析new IOTask()流程
Schedules.java

static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

DEFAULT賦值如下:
Schedules.java

static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

IoScheduler.java

public IoScheduler() {
   this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
   this.threadFactory = threadFactory;
   this.pool = new AtomicReference<CachedWorkerPool>(NONE);
   start();
}

@Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

IoScheduler.java

 private final ScheduledExecutorService evictorService;  //線程池
 
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); // 創(chuàng)建線程池
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

第一步總結(jié)執(zhí)行Schedulers.io() 最終返回的是Scheduler狞悲,也就是IOScheduler對(duì)象撮抓。通過(guò)new IOScheduler 創(chuàng)建了一個(gè)線程池,然后通過(guò)subscribeOn()來(lái)觸發(fā)摇锋。
Observable.java

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

此時(shí)new 了一個(gè)ObservableSubscribeOn對(duì)象丹拯,并將IoScheduler對(duì)象傳進(jìn)去
ObservableSubscribeOn.java

final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
     super(source);
     this.scheduler = scheduler;
}

第二步總結(jié):就是創(chuàng)建了一個(gè)ObservableSubscribeOn對(duì)象,并將IoScheduler傳入到該類(lèi)中荸恕。
??執(zhí)行第三步的.subscribe()訂閱流程也就執(zhí)行到了ObservableSubscribeOn.subscribeActual()這個(gè)方法中咽笼。
ObservableSubscribeOn.java

// s為自定義觀察者
public void subscribeActual(final Observer<? super T> s) {
        // 代碼1
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //定義一個(gè)包裹SubscribeOnObserver
        // 代碼2
        s.onSubscribe(parent);
        // 代碼3
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

1:代碼1
??final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);定義一個(gè)包裹SubscribeOnObserver,并將自定義觀察者s作為參數(shù)傳入

2:代碼2
??執(zhí)行s.onSubscribe(parent);對(duì)應(yīng)的執(zhí)行到了示例代碼3中的語(yǔ)句塊

@Override
public void onSubscribe(Disposable d) {
    mDisposable = d;
}

3:代碼3
??首先先分析parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));語(yǔ)句中的new SubscribeTask(parent)代碼流程
ObservableSubscribeOn.java

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

     SubscribeTask(SubscribeOnObserver<T> parent) {
         this.parent = parent;
     }

     @Override
     public void run() {
         source.subscribe(parent);
     }
 }

SubscribeTask 就是一個(gè)線程任務(wù)

source.subscribe(parent);這個(gè)語(yǔ)句塊中的source就是指上一層的對(duì)象戚炫,在示例代碼3中指ObservableCreate剑刑,parent指包裹SubscribeOnObserver。

之后繼續(xù)分析parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));語(yǔ)句中的scheduler.scheduleDirect(...)代碼流程

Scheduler.java

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

Scheduler.java

// run是指SubscribeTask任務(wù)
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
   // 代碼1
    final Worker w = createWorker();

    // 代碼2 
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); // hook機(jī)制
    
    // 代碼3
    DisposeTask task = new DisposeTask(decoratedRun, w);
    
    // 代碼4
    w.schedule(task, delay, unit);

    return task;
}

1:代碼1
??執(zhí)行final Worker w = createWorker();双肤,createWorker()是一個(gè)抽象方法施掏,其實(shí)調(diào)用到了IoScheduler類(lèi)的createWorker()
IoScheduler.java

@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

IoScheduler.java

static final class EventLoopWorker extends Scheduler.Worker {
     private final CompositeDisposable tasks;
      private final CachedWorkerPool pool;
      private final ThreadWorker threadWorker;

      final AtomicBoolean once = new AtomicBoolean();

      EventLoopWorker(CachedWorkerPool pool) {
          this.pool = pool;
          this.tasks = new CompositeDisposable();
          this.threadWorker = pool.get();
      }
      ...
}

該語(yǔ)句塊最后返回的是EventLoopWorker對(duì)象。

2:代碼2
??其實(shí)代碼2語(yǔ)句就是將Runable進(jìn)行封裝了下茅糜,最后還是Runnable

3:代碼3
??將Runnable又包裝了一層 為DisposeTask
Scheduler.java

static final class DisposeTask implements Runnable, Disposable {
        final Runnable decoratedRun;
        final Worker w;
    
    DisposeTask(Runnable decoratedRun, Worker w) {
       this.decoratedRun = decoratedRun;
       this.w = w;
    }
    ...
}

4:代碼4
??執(zhí)行 w.schedule(task, delay, unit);就會(huì)執(zhí)行到EventLoopWorker類(lèi)的schedule()方法
IoScheduler.java

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
   if (tasks.isDisposed()) {
         // don't schedule, we are unsubscribed
         return EmptyDisposable.INSTANCE;
     }

     return threadWorker.scheduleActual(action, delayTime, unit, tasks);
 }

NewThreadWorker.java

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
   Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

   ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

   if (parent != null) {
       if (!parent.add(sr)) {
           return sr;
       }
   }

   Future<?> f;
   try {
       if (delayTime <= 0) {
           f = executor.submit((Callable<Object>)sr); //線程池的執(zhí)行七芭,executor是ScheduleExecutorService線程池
       } else {
           f = executor.schedule((Callable<Object>)sr, delayTime, unit);
       }
       sr.setFuture(f);
   } catch (RejectedExecutionException ex) {
       if (parent != null) {
           parent.remove(sr);
       }
       RxJavaPlugins.onError(ex);
   }

   return sr;
}

scheduler.scheduleDirect(new SubscribeTask(parent)) 這句代碼的最終目的就是將 SubscribeTask 任務(wù) 交給線程池去執(zhí)行。

執(zhí)行executor.submit()該語(yǔ)句就會(huì)觸發(fā)SubscribeTask任務(wù)的Run()方法執(zhí)行蔑赘,該SubscribeTask任務(wù)就處于異步線程中狸驳。

ObservableSubscribeOn.java

public void run() {
    source.subscribe(parent); //處于異步線程中執(zhí)行
}

source.subscribe(parent);這個(gè)語(yǔ)句塊中的source就是指上一層的對(duì)象,在示例代碼3中指ObservableCreate缩赛,parent指包裹SubscribeOnObserver耙箍。

圖3:subscribeOn異步流程:
[圖片上傳失敗...(image-584a9b-1677294388046)]

說(shuō)明:步驟④是將SubscribeTask任務(wù)加入到線程池中執(zhí)行,則后續(xù)步驟⑤~⑩都是在異步線程中執(zhí)行

subscribeOn()切換線程時(shí)序圖:
[圖片上傳失敗...(image-32486d-1677294388046)]

2. 主線程流程

示例代碼4:

private Disposable mDisposable;

//創(chuàng)建ObserverableCreate對(duì)象
Observable.create(
            // 自定義source
            new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("test");
                    }
                }
        )
        // TODO 第二步
        //ObservableCreate.observeOn()
        .observeOn(
            // TODO 第一步
            AndroidSchedulers.mainThread()  // 給上面的代碼分配主線程
            ) 
        // TODO 第三步
        // ObservableObserveOn.subscribe()
        .subscribe(
            // 自定義觀察者
            new Observer<String>() { 
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable = d;
                }
    
                @Override
                public void onNext(String s) {
                    Log.d("abc", "onNext:" + Thread.currentThread().getName());
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
        });


    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            if (!mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }

先來(lái)分析下第一步AndroidSchedulers.mainThread()流程:
AndroidSchedulers.java

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}


private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
      new Callable<Scheduler>() {
           @Override public Scheduler call() throws Exception {
               return MainHolder.DEFAULT;
           }
       });


private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

HandlerScheduler.java

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }
    ... 
}

AndroidSchedulers.mainThread()流程就是創(chuàng)建了一個(gè)HandlerScheduler對(duì)象酥馍。

執(zhí)行第二步.observeOn(...)
Observable.java

public final Observable<T> observeOn(Scheduler scheduler) {
   return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

創(chuàng)建一個(gè)ObservableObserveOn對(duì)象辩昆,其中scheduler就是HandlerScheduler對(duì)象。
ObservableObserveOn.java

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

.observeOn(...)流程就是創(chuàng)建一個(gè)ObservableObserveOn對(duì)象旨袒,并將HandlerScheduler對(duì)象傳入汁针。

之后執(zhí)行第三步.subscribe()术辐,對(duì)應(yīng)執(zhí)行的是ObservableObserveOn.subscribeActual()
ObservableObserveOn.java

@Override
public void subscribeActual(final Observer<? super T> s) {
    // 代碼1
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
   // 代碼2
    s.onSubscribe(parent);
    // 代碼3
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

其實(shí),參數(shù)s為自定義的觀察者施无,這個(gè)地方跟異步線程的流程是一樣的辉词,
代碼1:封裝了一層SubscribeOnObserver包裹
代碼2:執(zhí)行自定義觀察者中的onSubscribe()方法流程
代碼3:scheduler.scheduleDirect(new SubscribeTask(parent)) 這行代碼的功能就是將SubscribeTask任務(wù)交給主線程執(zhí)行。

HandlerScheduler.java

 @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); //將run運(yùn)行在主線程中

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }
        ...
}

圖4:observeOn主線程流程:
[圖片上傳失敗...(image-5ac86f-1677294388046)]

observeOn()時(shí)序圖:
[圖片上傳失敗...(image-e9a7a-1677294388046)]

本文轉(zhuǎn)自 https://blog.csdn.net/xuyin1204/article/details/129147940?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167729434516800188551819%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=167729434516800188551819&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2blogfirst_rank_ecpm_v1~rank_v31_ecpm-1-129147940-null-null.blog_rank_default&utm_term=Android%20RxJava%E6%A1%86%E6%9E%B6%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90&spm=1018.2226.3001.4450猾骡,如有侵權(quán)瑞躺,請(qǐng)聯(lián)系刪除。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末卓练,一起剝皮案震驚了整個(gè)濱河市隘蝎,隨后出現(xiàn)的幾起案子购啄,更是在濱河造成了極大的恐慌襟企,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件狮含,死亡現(xiàn)場(chǎng)離奇詭異顽悼,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)几迄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)蔚龙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人映胁,你說(shuō)我怎么就攤上這事木羹。” “怎么了解孙?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵坑填,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我弛姜,道長(zhǎng)脐瑰,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任廷臼,我火速辦了婚禮苍在,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘荠商。我一直安慰自己寂恬,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布莱没。 她就那樣靜靜地躺著掠剑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪郊愧。 梳的紋絲不亂的頭發(fā)上朴译,一...
    開(kāi)封第一講書(shū)人閱讀 49,837評(píng)論 1 290
  • 那天井佑,我揣著相機(jī)與錄音,去河邊找鬼眠寿。 笑死躬翁,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的盯拱。 我是一名探鬼主播盒发,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼狡逢!你這毒婦竟也來(lái)了宁舰?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤奢浑,失蹤者是張志新(化名)和其女友劉穎蛮艰,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體雀彼,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡壤蚜,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了徊哑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袜刷。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖莺丑,靈堂內(nèi)的尸體忽然破棺而出著蟹,到底是詐尸還是另有隱情,我是刑警寧澤梢莽,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布萧豆,位于F島的核電站,受9級(jí)特大地震影響蟹漓,放射性物質(zhì)發(fā)生泄漏炕横。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一葡粒、第九天 我趴在偏房一處隱蔽的房頂上張望份殿。 院中可真熱鬧,春花似錦嗽交、人聲如沸卿嘲。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)拾枣。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間梅肤,已是汗流浹背司蔬。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留姨蝴,地道東北人俊啼。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像左医,于是被迫代替她去往敵國(guó)和親授帕。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容

  • 前言 上一篇文章談了單機(jī)的定時(shí)任務(wù)解決方案浮梢,只能在單個(gè)JVM進(jìn)程中使用跛十;而我們的現(xiàn)在基本上是分布式場(chǎng)景,需要一套在...
    逸飛追夢(mèng)人閱讀 7,647評(píng)論 1 2
  • 描述清點(diǎn)擊 Android Studio 的 build 按鈕后發(fā)生了什么 build[https://jueji...
    CHSmile閱讀 557評(píng)論 0 1
  • 前言 在前一篇文章中介紹了如何構(gòu)建源碼閱讀環(huán)境秕硝,既然構(gòu)建好了源碼環(huán)境芥映,本地也可以正常運(yùn)行,那就開(kāi)始閱讀源碼吧缝裤! 在...
    Java李太白閱讀 175評(píng)論 0 1
  • 前言 以前有學(xué)過(guò)屏轰,感覺(jué)理解不了颊郎,然后又用不到憋飞,就不了了之了 ,現(xiàn)在因?yàn)閷?shí)習(xí)公司的項(xiàng)目有用到姆吭,如果不學(xué)的話感覺(jué)根本看...
    道別1999閱讀 406評(píng)論 0 2
  • 一榛做、發(fā)展歷史 20世紀(jì)90年代,硬件領(lǐng)域出現(xiàn)了單片式計(jì)算機(jī)系統(tǒng)内狸,這種價(jià)格低廉的系統(tǒng)一出現(xiàn)就立即引起了自動(dòng)控制領(lǐng)域人...
    橙子v閱讀 397評(píng)論 0 0