RxJava3源碼分析

ReactiveXReactive Extensions的縮寫少孝,一般簡寫為Rx,最初是LINQ的一個(gè)擴(kuò)展,由微軟的架構(gòu)師Erik Meijer領(lǐng)導(dǎo)的團(tuán)隊(duì)開發(fā)卵皂,在2012年11月開源悼粮。Rx現(xiàn)在已經(jīng)支持幾乎全部的流行編程語言癌椿,RxJavaReactiveXJava語言的支持庫舔清。簡單來說,RxJava是使用觀察者模式的基于事件流處理的響應(yīng)式編程庫(如果還不是特別了解觀察者設(shè)計(jì)模式,可看看我的這篇文章觀察者設(shè)計(jì)模式-RecyclerView中的觀察者)蛔垢,下面是官方的RxJava原理圖

RxJava原理圖

本篇將從工作線程下載網(wǎng)絡(luò)圖片轉(zhuǎn)成Bitmap,切換到主線程ImageView.setImageBitmap()將圖片顯示出來的過程分析RxJava源碼,主要涉及以下部分

  • RxJava的基本使用
  • 創(chuàng)建操作符Just
  • 變換操作符Map
  • 輔助操作符SubscribeOn
  • 輔助操作符ObserveOn

基本使用

導(dǎo)入依賴

implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
Observable.just("https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1587923166363&di=050c57334c9bf2e16e19389161657cb3&imgtype=0&src=http%3A%2F%2Fp2.so.qhimgs1.com%2Ft01dfcbc38578dac4c2.jpg")
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(String urlStr) throws Throwable {
                        URL url = new URL(urlStr);
                        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                        connection.connect();
                        InputStream inputStream = connection.getInputStream();
                        Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                        return bitmap;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Throwable {
                        iv.setImageBitmap(bitmap);
                    }
                });

Observable.just()創(chuàng)建網(wǎng)絡(luò)圖片鏈接的被觀察者拦惋,map轉(zhuǎn)換操作符中將String轉(zhuǎn)換成Bitmap寻仗,subscribeOnobserveOn 將下載和轉(zhuǎn)換Bitmap放在工作線程,并將下面的流程切換到主線程昧穿,subscribe將bitmap發(fā)送給觀察者Consumer

創(chuàng)建操作符Just

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(@NonNull T item) {//參數(shù)item在這里是網(wǎng)絡(luò)圖片url
        Objects.requireNonNull(item, "item is null");//判空拋異常勺远,不是重點(diǎn)
        return RxJavaPlugins.onAssembly(new ObservableJust<>(item));
    }

RxJavaPlugins.onAssembly(new ObservableJust<>(item)),將網(wǎng)絡(luò)圖片Url作為構(gòu)造參數(shù)創(chuàng)建ObservableJust對象时鸵,傳入RxJavaPlugins靜態(tài)方法onAssembly()

    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

onObservableAssembly是一個(gè)鉤子參數(shù)胶逢,默認(rèn)為空,不需要關(guān)心饰潜,即onAssembly()方法就是將傳入的對象source直接返回了,這里的source就是剛才創(chuàng)建的ObservableJust,ObservableJustObservable的子類初坠。ObservableJust構(gòu)造方法中將傳入?yún)?shù)賦值給了成員變量value

public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T get() {
        return value;
    }
}

為了方便更好的理解彭雾,我將just()操作符的代碼簡化一下

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(@NonNull T item) {
        return new ObservableJust<>(item);
    }

小結(jié)一下
創(chuàng)建操作符Just,實(shí)例化了ObservableJust對象碟刺,并給成員變量value賦值,并返回ObservableJust對象薯酝,給后面的流程鏈?zhǔn)秸{(diào)用半沽。

變換操作符Map

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    }

map操作符代碼結(jié)構(gòu)和just操作符一樣爽柒。為了方便更好的理解,我將map()操作符的代碼簡化一下

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        return new ObservableMap<>(this, mapper);
    }

map()實(shí)例化了ObservableMap對象抄囚,ObservableMap傳入了兩個(gè)參數(shù)thismapper

  • this(類型為ObservableSource)
    Observable的對象霉赡,在這里的this,就是在just()操作符創(chuàng)建的ObservableJust
  • mapper(類型為Function<? super T, ? extends U>)
    Function第一個(gè)泛型參數(shù)規(guī)定了下限幔托,必須為T或者T的父類穴亏,在這里就是網(wǎng)絡(luò)圖片的url,String類型重挑。第一個(gè)參數(shù)規(guī)定了上限嗓化,是下游方法希望拿到的參數(shù)類型,在這里就是Bitmap類型
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

  //-------------------------------------下方代碼暫時(shí)不用關(guān)心--------------------------------------------//

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
      
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Throwable {
            T t = qd.poll();
            return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

ObservableMap構(gòu)造方法中谬哀,將function賦值給了ObservableMap的成員變量function刺覆。
source賦值給了父類AbstractObservableWithUpstream的成員變量source,類型為ObservableSource史煎。在這里的source是剛才創(chuàng)建的ObservableJust對象

輔助操作符SubscribeOn

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

按照套路繼續(xù)簡化代碼

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        return new ObservableSubscribeOn<>(this, scheduler);
    }

這里的this是上游的Observable谦屑,在這里是ObservableMap對象

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    
    //下方代碼暫時(shí)不用關(guān)心,訂閱之后再回來看
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    //此處省略大量代碼
}

SubscribeOn重點(diǎn)Schedulers.io()

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
static {
        //此處省略無關(guān)代碼

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        //此處省略無關(guān)代碼
    }
    static final class IOTask implements Supplier<Scheduler> {
        @Override
        public Scheduler get() {
            return IoHolder.DEFAULT;
        }
    }
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {

    private static final long serialVersionUID = -7789753024099756196L;

    final String prefix;

    final int priority;

    final boolean nonBlocking;

    public RxThreadFactory(String prefix) {
        this(prefix, Thread.NORM_PRIORITY, false);
    }

    public RxThreadFactory(String prefix, int priority) {
        this(prefix, priority, false);
    }

    public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
        this.prefix = prefix;
        this.priority = priority;
        this.nonBlocking = nonBlocking;
    }

    @Override
    public Thread newThread(@NonNull Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
}
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<>(NONE);
        start();
    }
    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

小結(jié)一下
subscribeOn(Schedulers.io())就是將事件流作為runable,傳入到newScheduledThreadPool線程池中篇梭,達(dá)到線程切換到工作線程的目的氢橙。

輔助操作符ObserveOn

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

老套路 簡化代碼

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return new ObservableObserveOn<>(this, scheduler, false, 128);
    }
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

 //-------------------------------------下方代碼暫時(shí)不用關(guān)心--------------------------------------------//

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }
  //此處省略大量代碼
}

ObserveOn重點(diǎn)AndroidSchedulers.mainThread()

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    private static final Scheduler MAIN_THREAD =
        RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }

依然簡化代碼

    public static Scheduler mainThread() {
        return new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }
final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    //省略下方大量代碼
}

再次小結(jié)一下
從上方的源碼分析可以看到,這些操作符的代碼套路都是一樣的恬偷,ObserveOn在這里傳入了HandlerScheduler,HandlerScheduler構(gòu)造方法中傳入了主線程的Handler悍手,實(shí)現(xiàn)主線程切換。在后面的訂閱方法中袍患,我們需要重點(diǎn)關(guān)注實(shí)例化的Observable子類的subscribeActual()方法

subscribe訂閱

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
    }
@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
            @NonNull Action onComplete) {
        Objects.requireNonNull(onNext, "onNext is null");
        Objects.requireNonNull(onError, "onError is null");
        Objects.requireNonNull(onComplete, "onComplete is null");

        LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());

        subscribe(ls);

        return ls;
    }

老套路坦康,簡化代碼

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {

    LambdaObserver<T> ls = new LambdaObserver<>(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());

    subscribe(ls);

    return ls;
    }
public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable, LambdaConsumerIntrospection {

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }
    //此處省略大量代碼
}

subscribe(ls);

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
        //此處只列出重點(diǎn)方法
        subscribeActual(observer);
    }

subscribeActual(observer)是一個(gè)抽象方法,所以從后往回看上游Observable子類的實(shí)現(xiàn)方法诡延,也就是前面注釋暫時(shí)不關(guān)心的方法滞欠。

輔助操作符ObserveOn

subscribeActual(observer)

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }
  //此處省略大量代碼
}

通過前面的分析可以知道,這里的source是上游傳入的ObservableObserveOn對象

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

    //此處省略大量代碼

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

      
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    //此處省略大量代碼
    }

在這里的worker代表的是主線程孕暇,將事件作為一個(gè)runnable

        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
final class HandlerScheduler extends Scheduler {
    //此處省略大量代碼
    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private final boolean async;

        private volatile boolean disposed;

        HandlerWorker(Handler handler, boolean async) {
            this.handler = handler;
            this.async = async;
        }

        @Override
        @SuppressLint("NewApi") 
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposable.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposable.disposed();
            }

            return scheduled;
        }
      //此處省略大量代碼
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            handler.removeCallbacks(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}

schedule()中將事件切換到主線程執(zhí)行仑撞。

總結(jié)一下
RxJava讓復(fù)雜的事件流分步驟編碼,讓代碼更具拓展性和可讀性妖滔,更重要的是給我們帶來了響應(yīng)式的編程思想隧哮,它的操作符還是比較多的,但是基本所有的操作符座舍,實(shí)現(xiàn)思路都和上面分析的這些操作符大同小異沮翔。
下面列出ReactiveX/RxJava文檔中文版上的大量的操作符(應(yīng)該大部分都不常用),有需要的同學(xué)可以去了解下

創(chuàng)建操作符

  • just(?) — 將一個(gè)或多個(gè)對象轉(zhuǎn)換成發(fā)射這個(gè)或這些對象的一個(gè)Observable
  • from(?) — 將一個(gè)Iterable, 一個(gè)Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)Observable
  • repeat(?) — 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable
  • repeatWhen(?) — 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable曲秉,它依賴于另一個(gè)Observable發(fā)射的數(shù)據(jù)
  • create(?) — 使用一個(gè)函數(shù)從頭創(chuàng)建一個(gè)Observable
  • defer(?) — 只有當(dāng)訂閱者訂閱才創(chuàng)建Observable采蚀;為每個(gè)訂閱創(chuàng)建一個(gè)新的Observable
  • range(?) — 創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的Observable
  • interval(?) — 創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射整數(shù)序列的Observable
  • timer(?) — 創(chuàng)建一個(gè)在給定的延時(shí)之后發(fā)射單個(gè)數(shù)據(jù)的Observable
  • empty(?) — 創(chuàng)建一個(gè)什么都不做直接通知完成的Observable
  • error(?) — 創(chuàng)建一個(gè)什么都不做直接通知錯(cuò)誤的Observable
  • never(?) — 創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)的Observable

變換操作符

  • map(?) — 對序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)來變換Observable發(fā)射的數(shù)據(jù)序列
  • flatMap(?), concatMap(?), and flatMapIterable(?) — 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合疲牵,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable
  • switchMap(?) — 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)
  • scan(?) — 對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)榆鼠,然后按順序依次發(fā)射每一個(gè)值
  • groupBy(?) — 將Observable分拆為Observable集合纲爸,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù)
  • buffer(?) — 它定期從Observable收集數(shù)據(jù)到一個(gè)集合妆够,然后把這些數(shù)據(jù)集合打包發(fā)射识啦,而不是一次發(fā)射一個(gè)
  • window(?) — 定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口神妹,而不是每次發(fā)射一項(xiàng)
  • cast(?) — 在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型

過濾操作符

  • filter(?) — 過濾數(shù)據(jù)
  • takeLast(?) — 只發(fā)射最后的N項(xiàng)數(shù)據(jù)
  • last(?) — 只發(fā)射最后的一項(xiàng)數(shù)據(jù)
  • lastOrDefault(?) — 只發(fā)射最后的一項(xiàng)數(shù)據(jù)颓哮,如果Observable為空就發(fā)射默認(rèn)值
  • takeLastBuffer(?) — 將最后的N項(xiàng)數(shù)據(jù)當(dāng)做單個(gè)數(shù)據(jù)發(fā)射
  • skip(?) — 跳過開始的N項(xiàng)數(shù)據(jù)
  • skipLast(?) — 跳過最后的N項(xiàng)數(shù)據(jù)
  • take(?) — 只發(fā)射開始的N項(xiàng)數(shù)據(jù)
  • first(?) and takeFirst(?) — 只發(fā)射第一項(xiàng)數(shù)據(jù),或者滿足某種條件的第一項(xiàng)數(shù)據(jù)
  • firstOrDefault(?) — 只發(fā)射第一項(xiàng)數(shù)據(jù)鸵荠,如果Observable為空就發(fā)射默認(rèn)值
  • elementAt(?) — 發(fā)射第N項(xiàng)數(shù)據(jù)
  • elementAtOrDefault(?) — 發(fā)射第N項(xiàng)數(shù)據(jù)冕茅,如果Observable數(shù)據(jù)少于N項(xiàng)就發(fā)射默認(rèn)值
  • sample(?) or throttleLast(?) — 定期發(fā)射Observable最近的數(shù)據(jù)
  • throttleFirst(?) — 定期發(fā)射Observable發(fā)射的第一項(xiàng)數(shù)據(jù)
  • throttleWithTimeout(?) or debounce(?) — 只有當(dāng)Observable在指定的時(shí)間后還沒有發(fā)射數(shù)據(jù)時(shí),才發(fā)射一個(gè)數(shù)據(jù)
  • timeout(?) — 如果在一個(gè)指定的時(shí)間段后還沒發(fā)射數(shù)據(jù)蛹找,就發(fā)射一個(gè)異常
  • distinct(?) — 過濾掉重復(fù)數(shù)據(jù)
  • distinctUntilChanged(?) — 過濾掉連續(xù)重復(fù)的數(shù)據(jù)
  • ofType(?) — 只發(fā)射指定類型的數(shù)據(jù)
  • ignoreElements(?) — 丟棄所有的正常數(shù)據(jù)姨伤,只發(fā)射錯(cuò)誤或完成通知

結(jié)合操作符

  • startWith(?) — 在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù)
  • merge(?) — 將多個(gè)Observable合并為一個(gè)
  • mergeDelayError(?) — 合并多個(gè)Observables,讓沒有錯(cuò)誤的Observable都完成后再發(fā)射錯(cuò)誤通知
  • zip(?) — 使用一個(gè)函數(shù)組合多個(gè)Observable發(fā)射的數(shù)據(jù)集合庸疾,然后再發(fā)射這個(gè)結(jié)果
  • and(?), then(?), and when(?) — (rxjava-joins) 通過模式和計(jì)劃組合多個(gè)Observables發(fā)射的數(shù)據(jù)集合
  • combineLatest(?) — 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí)姜挺,通過一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果
  • join(?) and groupJoin(?) — 無論何時(shí)彼硫,如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi)凌箕,就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射
  • switchOnNext(?) — 將一個(gè)發(fā)射Observables的Observable轉(zhuǎn)換成另一個(gè)Observable拧篮,后者發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)

錯(cuò)誤處理

  • onErrorResumeNext(?) — 指示Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)數(shù)據(jù)序列
  • onErrorReturn(?) — 指示Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特定的數(shù)據(jù)
  • onExceptionResumeNext(?) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)指示Observable遇到錯(cuò)誤時(shí)繼續(xù)發(fā)射數(shù)據(jù)
  • retry(?) — 指示Observable遇到錯(cuò)誤時(shí)重試
  • retryWhen(?) — 指示Observable遇到錯(cuò)誤時(shí),將錯(cuò)誤傳遞給另一個(gè)Observable來決定是否要重新給訂閱這個(gè)Observable

輔助操作符

  • materialize(?) — 將Observable轉(zhuǎn)換成一個(gè)通知列表convert an Observable into a list of Notifications
  • dematerialize(?) — 將上面的結(jié)果逆轉(zhuǎn)回一個(gè)Observable
  • timestamp(?) — 給Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳
  • serialize(?) — 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的
  • cache(?) — 記住Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者
  • observeOn(?) — 指定觀察者觀察Observable的調(diào)度器
  • subscribeOn(?) — 指定Observable執(zhí)行任務(wù)的調(diào)度器
  • doOnEach(?) — 注冊一個(gè)動(dòng)作牵舱,對Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用
  • doOnCompleted(?) — 注冊一個(gè)動(dòng)作串绩,對正常完成的Observable使用
  • doOnError(?) — 注冊一個(gè)動(dòng)作,對發(fā)生錯(cuò)誤的Observable使用
  • doOnTerminate(?) — 注冊一個(gè)動(dòng)作芜壁,對完成的Observable使用礁凡,無論是否發(fā)生錯(cuò)誤
  • doOnSubscribe(?) — 注冊一個(gè)動(dòng)作,在觀察者訂閱時(shí)使用
  • doOnUnsubscribe(?) — 注冊一個(gè)動(dòng)作慧妄,在觀察者取消訂閱時(shí)使用
  • finallyDo(?) — 注冊一個(gè)動(dòng)作顷牌,在Observable完成時(shí)使用
  • delay(?) — 延時(shí)發(fā)射Observable的結(jié)果
  • delaySubscription(?) — 延時(shí)處理訂閱請求
  • timeInterval(?) — 定期發(fā)射數(shù)據(jù)
  • using(?) — 創(chuàng)建一個(gè)只在Observable生命周期存在的資源
  • single(?) — 強(qiáng)制返回單個(gè)數(shù)據(jù),否則拋出異常
  • singleOrDefault(?) — 如果Observable完成時(shí)返回了單個(gè)數(shù)據(jù)塞淹,就返回它窟蓝,否則返回默認(rèn)數(shù)據(jù)
  • toFuture(?), toIterable(?), toList(?) — 將Observable轉(zhuǎn)換為其它對象或數(shù)據(jù)結(jié)構(gòu)

條件操作符

  • amb(?) — 給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)
  • defaultIfEmpty(?) — 發(fā)射來自原始Observable的數(shù)據(jù)饱普,如果原始Observable沒有發(fā)射數(shù)據(jù)运挫,就發(fā)射一個(gè)默認(rèn)數(shù)據(jù)
  • (rxjava-computation-expressions) doWhile(?) — 發(fā)射原始Observable的數(shù)據(jù)序列状共,然后重復(fù)發(fā)射這個(gè)序列直到不滿足這個(gè)條件為止
  • (rxjava-computation-expressions) ifThen(?) — 只有當(dāng)某個(gè)條件為真時(shí)才發(fā)射原始Observable的數(shù)據(jù)序列,否則發(fā)射一個(gè)空的或默認(rèn)的序列
  • skipUntil(?) — 丟棄原始Observable發(fā)射的數(shù)據(jù)谁帕,直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)峡继,然后發(fā)射原始Observable的剩余數(shù)據(jù)
  • skipWhile(?) — 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個(gè)特定的條件為假匈挖,然后發(fā)射原始Observable剩余的數(shù)據(jù)
  • (rxjava-computation-expressions) switchCase(?) — 基于一個(gè)計(jì)算結(jié)果碾牌,發(fā)射一個(gè)指定Observable的數(shù)據(jù)序列
  • takeUntil(?) — 發(fā)射來自原始Observable的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)或一個(gè)通知
  • takeWhile(?) and takeWhileWithIndex(?) — 發(fā)射原始Observable的數(shù)據(jù)关划,直到一個(gè)特定的條件為真小染,然后跳過剩余的數(shù)據(jù)
  • (rxjava-computation-expressions) whileDo(?) — 如果條件為true,則發(fā)射源Observable數(shù)據(jù)序列贮折,并且只要條件保持為true就重復(fù)發(fā)射此數(shù)據(jù)序列

布爾操作符

  • all(?) — 判斷是否所有的數(shù)據(jù)項(xiàng)都滿足某個(gè)條件
  • contains(?) — 判斷Observable是否會發(fā)射一個(gè)指定的值
  • exists(?) and isEmpty(?) — 判斷Observable是否發(fā)射了一個(gè)值
  • sequenceEqual(?) — 判斷兩個(gè)Observables發(fā)射的序列是否相等

算數(shù)模塊的操作符

其它聚合操作符

  • concat(?) — 順序連接多個(gè)Observables
  • count(?) and countLong(?) — 計(jì)算數(shù)據(jù)項(xiàng)的個(gè)數(shù)并發(fā)射結(jié)果
  • reduce(?) — 對序列使用reduce()函數(shù)并發(fā)射最終的結(jié)果
  • collect(?) — 將原始Observable發(fā)射的數(shù)據(jù)放到一個(gè)單一的可變的數(shù)據(jù)結(jié)構(gòu)中裤翩,然后返回一個(gè)發(fā)射這個(gè)數(shù)據(jù)結(jié)構(gòu)的Observable
  • toList(?) — 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表,然后返回這個(gè)列表
  • toSortedList(?) — 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表调榄,然后返回這個(gè)列表
  • toMap(?) — 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map踊赠,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的
  • toMultiMap(?) — 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表,同時(shí)也是一個(gè)Map每庆,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的

異步操作符

  • start(?) — 創(chuàng)建一個(gè)Observable筐带,它發(fā)射一個(gè)函數(shù)的返回值
  • toAsync(?) or asyncAction(?) or asyncFunc(?) — 將一個(gè)函數(shù)或者Action轉(zhuǎn)換為已Observable,它執(zhí)行這個(gè)函數(shù)并發(fā)射函數(shù)的返回值
  • startFuture(?) — 將一個(gè)返回Future的函數(shù)轉(zhuǎn)換為一個(gè)Observable缤灵,它發(fā)射Future的返回值
  • deferFuture(?) — 將一個(gè)返回Observable的Future轉(zhuǎn)換為一個(gè)Observable伦籍,但是并不嘗試獲取這個(gè)Future返回的Observable,直到有訂閱者訂閱它
  • forEachFuture(?) — 傳遞Subscriber方法給一個(gè)Subscriber腮出,但是同時(shí)表現(xiàn)得像一個(gè)Future一樣阻塞直到它完成
  • fromAction(?) — 將一個(gè)Action轉(zhuǎn)換為Observable帖鸦,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)action并發(fā)射它的返回值
  • fromCallable(?) — 將一個(gè)Callable轉(zhuǎn)換為Observable胚嘲,當(dāng)一個(gè)訂閱者訂閱時(shí)作儿,它執(zhí)行這個(gè)Callable并發(fā)射Callable的返回值,或者發(fā)射異常
  • fromRunnable(?) — convert a Runnable into an Observable that invokes the runable and emits its result when a Subscriber subscribes將一個(gè)Runnable轉(zhuǎn)換為Observable馋劈,當(dāng)一個(gè)訂閱者訂閱時(shí)攻锰,它執(zhí)行這個(gè)Runnable并發(fā)射Runnable的返回值
  • runAsync(?) — 返回一個(gè)StoppableObservable,它發(fā)射某個(gè)Scheduler上指定的Action生成的多個(gè)actions

連接操作符

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末娶吞,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子械姻,更是在濱河造成了極大的恐慌寝志,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異材部,居然都是意外死亡毫缆,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門乐导,熙熙樓的掌柜王于貴愁眉苦臉地迎上來苦丁,“玉大人,你說我怎么就攤上這事物臂⊥” “怎么了?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵棵磷,是天一觀的道長蛾狗。 經(jīng)常有香客問我,道長仪媒,這世上最難降的妖魔是什么沉桌? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮算吩,結(jié)果婚禮上留凭,老公的妹妹穿的比我還像新娘。我一直安慰自己偎巢,他們只是感情好蔼夜,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著压昼,像睡著了一般求冷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上窍霞,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天遵倦,我揣著相機(jī)與錄音,去河邊找鬼官撼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛似谁,可吹牛的內(nèi)容都是我干的傲绣。 我是一名探鬼主播,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼巩踏,長吁一口氣:“原來是場噩夢啊……” “哼秃诵!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起塞琼,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤菠净,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體毅往,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡牵咙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了攀唯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片洁桌。...
    茶點(diǎn)故事閱讀 38,724評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖侯嘀,靈堂內(nèi)的尸體忽然破棺而出另凌,到底是詐尸還是另有隱情,我是刑警寧澤戒幔,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布吠谢,位于F島的核電站,受9級特大地震影響诗茎,放射性物質(zhì)發(fā)生泄漏工坊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一错沃、第九天 我趴在偏房一處隱蔽的房頂上張望栅组。 院中可真熱鬧,春花似錦枢析、人聲如沸玉掸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽司浪。三九已至,卻和暖如春把沼,著一層夾襖步出監(jiān)牢的瞬間啊易,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工饮睬, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留租谈,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓捆愁,卻偏偏與公主長得像割去,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子昼丑,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評論 2 350