ReactiveX
是Reactive Extensions
的縮寫少孝,一般簡寫為Rx,最初是LINQ的一個(gè)擴(kuò)展,由微軟的架構(gòu)師Erik Meijer領(lǐng)導(dǎo)的團(tuán)隊(duì)開發(fā)卵皂,在2012年11月開源悼粮。Rx現(xiàn)在已經(jīng)支持幾乎全部的流行編程語言癌椿,RxJava
是ReactiveX
對Java
語言的支持庫舔清。簡單來說,RxJava
是使用觀察者模式的基于事件流處理的響應(yīng)式編程庫(如果還不是特別了解觀察者設(shè)計(jì)模式,可看看我的這篇文章觀察者設(shè)計(jì)模式-RecyclerView中的觀察者)蛔垢,下面是官方的RxJava原理圖
本篇將從工作線程下載網(wǎng)絡(luò)圖片轉(zhuǎn)成
Bitmap
,切換到主線程ImageView.setImageBitmap()
將圖片顯示出來的過程分析RxJava
源碼,主要涉及以下部分
- RxJava的基本使用
- 創(chuàng)建操作符Just
- 變換操作符Map
- 輔助操作符SubscribeOn
- 輔助操作符ObserveOn
基本使用
導(dǎo)入依賴
implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
Observable.just("https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1587923166363&di=050c57334c9bf2e16e19389161657cb3&imgtype=0&src=http%3A%2F%2Fp2.so.qhimgs1.com%2Ft01dfcbc38578dac4c2.jpg")
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String urlStr) throws Throwable {
URL url = new URL(urlStr);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.connect();
InputStream inputStream = connection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Throwable {
iv.setImageBitmap(bitmap);
}
});
Observable.just()
創(chuàng)建網(wǎng)絡(luò)圖片鏈接的被觀察者拦惋,map
轉(zhuǎn)換操作符中將String轉(zhuǎn)換成Bitmap寻仗,subscribeOn
和observeOn
將下載和轉(zhuǎn)換Bitmap放在工作線程,并將下面的流程切換到主線程昧穿,subscribe
將bitmap發(fā)送給觀察者Consumer
創(chuàng)建操作符Just
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(@NonNull T item) {//參數(shù)item在這里是網(wǎng)絡(luò)圖片url
Objects.requireNonNull(item, "item is null");//判空拋異常勺远,不是重點(diǎn)
return RxJavaPlugins.onAssembly(new ObservableJust<>(item));
}
RxJavaPlugins.onAssembly(new ObservableJust<>(item))
,將網(wǎng)絡(luò)圖片Url作為構(gòu)造參數(shù)創(chuàng)建ObservableJust
對象时鸵,傳入RxJavaPlugins
靜態(tài)方法onAssembly()
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onObservableAssembly
是一個(gè)鉤子參數(shù)胶逢,默認(rèn)為空,不需要關(guān)心饰潜,即onAssembly()
方法就是將傳入的對象source
直接返回了,這里的source
就是剛才創(chuàng)建的ObservableJust
,ObservableJust
是Observable
的子類初坠。ObservableJust
構(gòu)造方法中將傳入?yún)?shù)賦值給了成員變量value
。
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
為了方便更好的理解彭雾,我將just()
操作符的代碼簡化一下
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(@NonNull T item) {
return new ObservableJust<>(item);
}
小結(jié)一下
創(chuàng)建操作符Just
,實(shí)例化了ObservableJust
對象碟刺,并給成員變量value賦值,并返回ObservableJust
對象薯酝,給后面的流程鏈?zhǔn)秸{(diào)用半沽。
變換操作符Map
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
map
操作符代碼結(jié)構(gòu)和just
操作符一樣爽柒。為了方便更好的理解,我將map()
操作符的代碼簡化一下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
return new ObservableMap<>(this, mapper);
}
map()
實(shí)例化了ObservableMap
對象抄囚,ObservableMap
傳入了兩個(gè)參數(shù)this
和mapper
- this(類型為ObservableSource)
Observable的對象霉赡,在這里的this
,就是在just()
操作符創(chuàng)建的ObservableJust
- mapper(類型為Function<? super T, ? extends U>)
Function
第一個(gè)泛型參數(shù)規(guī)定了下限幔托,必須為T或者T的父類穴亏,在這里就是網(wǎng)絡(luò)圖片的url,String
類型重挑。第一個(gè)參數(shù)規(guī)定了上限嗓化,是下游方法希望拿到的參數(shù)類型,在這里就是Bitmap
類型
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
//-------------------------------------下方代碼暫時(shí)不用關(guān)心--------------------------------------------//
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Throwable {
T t = qd.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
在ObservableMap
構(gòu)造方法中谬哀,將function
賦值給了ObservableMap
的成員變量function
刺覆。
source
賦值給了父類AbstractObservableWithUpstream
的成員變量source
,類型為ObservableSource
史煎。在這里的source
是剛才創(chuàng)建的ObservableJust
對象
輔助操作符SubscribeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
按照套路繼續(xù)簡化代碼
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
return new ObservableSubscribeOn<>(this, scheduler);
}
這里的this
是上游的Observable
谦屑,在這里是ObservableMap
對象
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
//下方代碼暫時(shí)不用關(guān)心,訂閱之后再回來看
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//此處省略大量代碼
}
SubscribeOn重點(diǎn)Schedulers.io()
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
//此處省略無關(guān)代碼
IO = RxJavaPlugins.initIoScheduler(new IOTask());
//此處省略無關(guān)代碼
}
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
private static final long serialVersionUID = -7789753024099756196L;
final String prefix;
final int priority;
final boolean nonBlocking;
public RxThreadFactory(String prefix) {
this(prefix, Thread.NORM_PRIORITY, false);
}
public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}
@Override
public Thread newThread(@NonNull Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
小結(jié)一下
subscribeOn(Schedulers.io())
就是將事件流作為runable,傳入到newScheduledThreadPool線程池中篇梭,達(dá)到線程切換到工作線程的目的氢橙。
輔助操作符ObserveOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
老套路 簡化代碼
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return new ObservableObserveOn<>(this, scheduler, false, 128);
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//-------------------------------------下方代碼暫時(shí)不用關(guān)心--------------------------------------------//
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
//此處省略大量代碼
}
ObserveOn重點(diǎn)AndroidSchedulers.mainThread()
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD =
RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
依然簡化代碼
public static Scheduler mainThread() {
return new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
//省略下方大量代碼
}
再次小結(jié)一下
從上方的源碼分析可以看到,這些操作符的代碼套路都是一樣的恬偷,ObserveOn
在這里傳入了HandlerScheduler
,HandlerScheduler
構(gòu)造方法中傳入了主線程的Handler悍手,實(shí)現(xiàn)主線程切換。在后面的訂閱方法中袍患,我們需要重點(diǎn)關(guān)注實(shí)例化的Observable
子類的subscribeActual()
方法
subscribe訂閱
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete) {
Objects.requireNonNull(onNext, "onNext is null");
Objects.requireNonNull(onError, "onError is null");
Objects.requireNonNull(onComplete, "onComplete is null");
LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
subscribe(ls);
return ls;
}
老套路坦康,簡化代碼
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
LambdaObserver<T> ls = new LambdaObserver<>(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
subscribe(ls);
return ls;
}
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
//此處省略大量代碼
}
subscribe(ls);
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
//此處只列出重點(diǎn)方法
subscribeActual(observer);
}
subscribeActual(observer)是一個(gè)抽象方法,所以從后往回看上游Observable
子類的實(shí)現(xiàn)方法诡延,也就是前面注釋暫時(shí)不關(guān)心的方法滞欠。
輔助操作符ObserveOn
subscribeActual(observer)
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
//此處省略大量代碼
}
通過前面的分析可以知道,這里的source是上游傳入的ObservableObserveOn
對象
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//此處省略大量代碼
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//此處省略大量代碼
}
在這里的worker
代表的是主線程孕暇,將事件作為一個(gè)runnable
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
final class HandlerScheduler extends Scheduler {
//此處省略大量代碼
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
@SuppressLint("NewApi")
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposable.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposable.disposed();
}
return scheduled;
}
//此處省略大量代碼
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed; // Tracked solely for isDisposed().
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
handler.removeCallbacks(this);
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
schedule()
中將事件切換到主線程執(zhí)行仑撞。
總結(jié)一下
RxJava讓復(fù)雜的事件流分步驟編碼,讓代碼更具拓展性和可讀性妖滔,更重要的是給我們帶來了響應(yīng)式的編程思想隧哮,它的操作符還是比較多的,但是基本所有的操作符座舍,實(shí)現(xiàn)思路都和上面分析的這些操作符大同小異沮翔。
下面列出ReactiveX/RxJava文檔中文版上的大量的操作符(應(yīng)該大部分都不常用),有需要的同學(xué)可以去了解下
創(chuàng)建操作符
-
just(?)
— 將一個(gè)或多個(gè)對象轉(zhuǎn)換成發(fā)射這個(gè)或這些對象的一個(gè)Observable -
from(?)
— 將一個(gè)Iterable, 一個(gè)Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)Observable -
repeat(?)
— 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable -
repeatWhen(?)
— 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable曲秉,它依賴于另一個(gè)Observable發(fā)射的數(shù)據(jù) -
create(?)
— 使用一個(gè)函數(shù)從頭創(chuàng)建一個(gè)Observable -
defer(?)
— 只有當(dāng)訂閱者訂閱才創(chuàng)建Observable采蚀;為每個(gè)訂閱創(chuàng)建一個(gè)新的Observable -
range(?)
— 創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的Observable -
interval(?)
— 創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射整數(shù)序列的Observable -
timer(?)
— 創(chuàng)建一個(gè)在給定的延時(shí)之后發(fā)射單個(gè)數(shù)據(jù)的Observable -
empty(?)
— 創(chuàng)建一個(gè)什么都不做直接通知完成的Observable -
error(?)
— 創(chuàng)建一個(gè)什么都不做直接通知錯(cuò)誤的Observable -
never(?)
— 創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)的Observable
變換操作符
-
map(?)
— 對序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)來變換Observable發(fā)射的數(shù)據(jù)序列 -
flatMap(?)
,concatMap(?)
, andflatMapIterable(?)
— 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合疲牵,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable -
switchMap(?)
— 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù) -
scan(?)
— 對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)榆鼠,然后按順序依次發(fā)射每一個(gè)值 -
groupBy(?)
— 將Observable分拆為Observable集合纲爸,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù) -
buffer(?)
— 它定期從Observable收集數(shù)據(jù)到一個(gè)集合妆够,然后把這些數(shù)據(jù)集合打包發(fā)射识啦,而不是一次發(fā)射一個(gè) -
window(?)
— 定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口神妹,而不是每次發(fā)射一項(xiàng) -
cast(?)
— 在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型
過濾操作符
-
filter(?)
— 過濾數(shù)據(jù) -
takeLast(?)
— 只發(fā)射最后的N項(xiàng)數(shù)據(jù) -
last(?)
— 只發(fā)射最后的一項(xiàng)數(shù)據(jù) -
lastOrDefault(?)
— 只發(fā)射最后的一項(xiàng)數(shù)據(jù)颓哮,如果Observable為空就發(fā)射默認(rèn)值 -
takeLastBuffer(?)
— 將最后的N項(xiàng)數(shù)據(jù)當(dāng)做單個(gè)數(shù)據(jù)發(fā)射 -
skip(?)
— 跳過開始的N項(xiàng)數(shù)據(jù) -
skipLast(?)
— 跳過最后的N項(xiàng)數(shù)據(jù) -
take(?)
— 只發(fā)射開始的N項(xiàng)數(shù)據(jù) -
first(?)
andtakeFirst(?)
— 只發(fā)射第一項(xiàng)數(shù)據(jù),或者滿足某種條件的第一項(xiàng)數(shù)據(jù) -
firstOrDefault(?)
— 只發(fā)射第一項(xiàng)數(shù)據(jù)鸵荠,如果Observable為空就發(fā)射默認(rèn)值 -
elementAt(?)
— 發(fā)射第N項(xiàng)數(shù)據(jù) -
elementAtOrDefault(?)
— 發(fā)射第N項(xiàng)數(shù)據(jù)冕茅,如果Observable數(shù)據(jù)少于N項(xiàng)就發(fā)射默認(rèn)值 -
sample(?)
orthrottleLast(?)
— 定期發(fā)射Observable最近的數(shù)據(jù) -
throttleFirst(?)
— 定期發(fā)射Observable發(fā)射的第一項(xiàng)數(shù)據(jù) -
throttleWithTimeout(?)
ordebounce(?)
— 只有當(dāng)Observable在指定的時(shí)間后還沒有發(fā)射數(shù)據(jù)時(shí),才發(fā)射一個(gè)數(shù)據(jù) -
timeout(?)
— 如果在一個(gè)指定的時(shí)間段后還沒發(fā)射數(shù)據(jù)蛹找,就發(fā)射一個(gè)異常 -
distinct(?)
— 過濾掉重復(fù)數(shù)據(jù) -
distinctUntilChanged(?)
— 過濾掉連續(xù)重復(fù)的數(shù)據(jù) -
ofType(?)
— 只發(fā)射指定類型的數(shù)據(jù) -
ignoreElements(?)
— 丟棄所有的正常數(shù)據(jù)姨伤,只發(fā)射錯(cuò)誤或完成通知
結(jié)合操作符
-
startWith(?)
— 在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù) -
merge(?)
— 將多個(gè)Observable合并為一個(gè) -
mergeDelayError(?)
— 合并多個(gè)Observables,讓沒有錯(cuò)誤的Observable都完成后再發(fā)射錯(cuò)誤通知 -
zip(?)
— 使用一個(gè)函數(shù)組合多個(gè)Observable發(fā)射的數(shù)據(jù)集合庸疾,然后再發(fā)射這個(gè)結(jié)果 -
and(?)
,then(?)
, andwhen(?)
— (rxjava-joins
) 通過模式和計(jì)劃組合多個(gè)Observables發(fā)射的數(shù)據(jù)集合 -
combineLatest(?)
— 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí)姜挺,通過一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果 -
join(?)
andgroupJoin(?)
— 無論何時(shí)彼硫,如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi)凌箕,就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射 -
switchOnNext(?)
— 將一個(gè)發(fā)射Observables的Observable轉(zhuǎn)換成另一個(gè)Observable拧篮,后者發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)
錯(cuò)誤處理
-
onErrorResumeNext(?)
— 指示Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)數(shù)據(jù)序列 -
onErrorReturn(?)
— 指示Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特定的數(shù)據(jù) -
onExceptionResumeNext(?)
— instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)指示Observable遇到錯(cuò)誤時(shí)繼續(xù)發(fā)射數(shù)據(jù) -
retry(?)
— 指示Observable遇到錯(cuò)誤時(shí)重試 -
retryWhen(?)
— 指示Observable遇到錯(cuò)誤時(shí),將錯(cuò)誤傳遞給另一個(gè)Observable來決定是否要重新給訂閱這個(gè)Observable
輔助操作符
-
materialize(?)
— 將Observable轉(zhuǎn)換成一個(gè)通知列表convert an Observable into a list of Notifications -
dematerialize(?)
— 將上面的結(jié)果逆轉(zhuǎn)回一個(gè)Observable -
timestamp(?)
— 給Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳 -
serialize(?)
— 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的 -
cache(?)
— 記住Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者 -
observeOn(?)
— 指定觀察者觀察Observable的調(diào)度器 -
subscribeOn(?)
— 指定Observable執(zhí)行任務(wù)的調(diào)度器 -
doOnEach(?)
— 注冊一個(gè)動(dòng)作牵舱,對Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用 -
doOnCompleted(?)
— 注冊一個(gè)動(dòng)作串绩,對正常完成的Observable使用 -
doOnError(?)
— 注冊一個(gè)動(dòng)作,對發(fā)生錯(cuò)誤的Observable使用 -
doOnTerminate(?)
— 注冊一個(gè)動(dòng)作芜壁,對完成的Observable使用礁凡,無論是否發(fā)生錯(cuò)誤 -
doOnSubscribe(?)
— 注冊一個(gè)動(dòng)作,在觀察者訂閱時(shí)使用 -
doOnUnsubscribe(?)
— 注冊一個(gè)動(dòng)作慧妄,在觀察者取消訂閱時(shí)使用 -
finallyDo(?)
— 注冊一個(gè)動(dòng)作顷牌,在Observable完成時(shí)使用 -
delay(?)
— 延時(shí)發(fā)射Observable的結(jié)果 -
delaySubscription(?)
— 延時(shí)處理訂閱請求 -
timeInterval(?)
— 定期發(fā)射數(shù)據(jù) -
using(?)
— 創(chuàng)建一個(gè)只在Observable生命周期存在的資源 -
single(?)
— 強(qiáng)制返回單個(gè)數(shù)據(jù),否則拋出異常 -
singleOrDefault(?)
— 如果Observable完成時(shí)返回了單個(gè)數(shù)據(jù)塞淹,就返回它窟蓝,否則返回默認(rèn)數(shù)據(jù) -
toFuture(?)
,toIterable(?)
,toList(?)
— 將Observable轉(zhuǎn)換為其它對象或數(shù)據(jù)結(jié)構(gòu)
條件操作符
-
amb(?)
— 給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù) -
defaultIfEmpty(?)
— 發(fā)射來自原始Observable的數(shù)據(jù)饱普,如果原始Observable沒有發(fā)射數(shù)據(jù)运挫,就發(fā)射一個(gè)默認(rèn)數(shù)據(jù) - (
rxjava-computation-expressions
)doWhile(?)
— 發(fā)射原始Observable的數(shù)據(jù)序列状共,然后重復(fù)發(fā)射這個(gè)序列直到不滿足這個(gè)條件為止 - (
rxjava-computation-expressions
)ifThen(?)
— 只有當(dāng)某個(gè)條件為真時(shí)才發(fā)射原始Observable的數(shù)據(jù)序列,否則發(fā)射一個(gè)空的或默認(rèn)的序列 -
skipUntil(?)
— 丟棄原始Observable發(fā)射的數(shù)據(jù)谁帕,直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)峡继,然后發(fā)射原始Observable的剩余數(shù)據(jù) -
skipWhile(?)
— 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個(gè)特定的條件為假匈挖,然后發(fā)射原始Observable剩余的數(shù)據(jù) - (
rxjava-computation-expressions
)switchCase(?)
— 基于一個(gè)計(jì)算結(jié)果碾牌,發(fā)射一個(gè)指定Observable的數(shù)據(jù)序列 -
takeUntil(?)
— 發(fā)射來自原始Observable的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)或一個(gè)通知 -
takeWhile(?)
andtakeWhileWithIndex(?)
— 發(fā)射原始Observable的數(shù)據(jù)关划,直到一個(gè)特定的條件為真小染,然后跳過剩余的數(shù)據(jù) - (
rxjava-computation-expressions
)whileDo(?)
— 如果條件為true
,則發(fā)射源Observable數(shù)據(jù)序列贮折,并且只要條件保持為true
就重復(fù)發(fā)射此數(shù)據(jù)序列
布爾操作符
-
all(?)
— 判斷是否所有的數(shù)據(jù)項(xiàng)都滿足某個(gè)條件 -
contains(?)
— 判斷Observable是否會發(fā)射一個(gè)指定的值 -
exists(?)
andisEmpty(?)
— 判斷Observable是否發(fā)射了一個(gè)值 -
sequenceEqual(?)
— 判斷兩個(gè)Observables發(fā)射的序列是否相等
算數(shù)模塊的操作符
-
averageInteger(?)
— 求序列平均數(shù)并發(fā)射 -
averageLong(?)
— 求序列平均數(shù)并發(fā)射 -
averageFloat(?)
— 求序列平均數(shù)并發(fā)射 -
averageDouble(?)
— 求序列平均數(shù)并發(fā)射 -
max(?)
— 求序列最大值并發(fā)射 -
maxBy(?)
— 求最大key對應(yīng)的值并發(fā)射 -
min(?)
— 求最小值并發(fā)射 -
minBy(?)
— 求最小Key對應(yīng)的值并發(fā)射 -
sumInteger(?)
— 求和并發(fā)射 -
sumLong(?)
— 求和并發(fā)射 -
sumFloat(?)
— 求和并發(fā)射 -
sumDouble(?)
— 求和并發(fā)射
其它聚合操作符
-
concat(?)
— 順序連接多個(gè)Observables -
count(?)
andcountLong(?)
— 計(jì)算數(shù)據(jù)項(xiàng)的個(gè)數(shù)并發(fā)射結(jié)果 -
reduce(?)
— 對序列使用reduce()函數(shù)并發(fā)射最終的結(jié)果 -
collect(?)
— 將原始Observable發(fā)射的數(shù)據(jù)放到一個(gè)單一的可變的數(shù)據(jù)結(jié)構(gòu)中裤翩,然后返回一個(gè)發(fā)射這個(gè)數(shù)據(jù)結(jié)構(gòu)的Observable -
toList(?)
— 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表,然后返回這個(gè)列表 -
toSortedList(?)
— 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表调榄,然后返回這個(gè)列表 -
toMap(?)
— 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map踊赠,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的 -
toMultiMap(?)
— 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表,同時(shí)也是一個(gè)Map每庆,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的
異步操作符
-
start(?)
— 創(chuàng)建一個(gè)Observable筐带,它發(fā)射一個(gè)函數(shù)的返回值 -
toAsync(?)
orasyncAction(?)
orasyncFunc(?)
— 將一個(gè)函數(shù)或者Action轉(zhuǎn)換為已Observable,它執(zhí)行這個(gè)函數(shù)并發(fā)射函數(shù)的返回值 -
startFuture(?)
— 將一個(gè)返回Future的函數(shù)轉(zhuǎn)換為一個(gè)Observable缤灵,它發(fā)射Future的返回值 -
deferFuture(?)
— 將一個(gè)返回Observable的Future轉(zhuǎn)換為一個(gè)Observable伦籍,但是并不嘗試獲取這個(gè)Future返回的Observable,直到有訂閱者訂閱它 -
forEachFuture(?)
— 傳遞Subscriber方法給一個(gè)Subscriber腮出,但是同時(shí)表現(xiàn)得像一個(gè)Future一樣阻塞直到它完成 -
fromAction(?)
— 將一個(gè)Action轉(zhuǎn)換為Observable帖鸦,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)action并發(fā)射它的返回值 -
fromCallable(?)
— 將一個(gè)Callable轉(zhuǎn)換為Observable胚嘲,當(dāng)一個(gè)訂閱者訂閱時(shí)作儿,它執(zhí)行這個(gè)Callable并發(fā)射Callable的返回值,或者發(fā)射異常 -
fromRunnable(?)
— convert a Runnable into an Observable that invokes the runable and emits its result when a Subscriber subscribes將一個(gè)Runnable轉(zhuǎn)換為Observable馋劈,當(dāng)一個(gè)訂閱者訂閱時(shí)攻锰,它執(zhí)行這個(gè)Runnable并發(fā)射Runnable的返回值 -
runAsync(?)
— 返回一個(gè)StoppableObservable,它發(fā)射某個(gè)Scheduler上指定的Action生成的多個(gè)actions
連接操作符
-
ConnectableObservable.connect(?)
— 指示一個(gè)可連接的Observable開始發(fā)射數(shù)據(jù) -
Observable.publish(?)
— 將一個(gè)Observable轉(zhuǎn)換為一個(gè)可連接的Observable -
Observable.replay(?)
— 確保所有的訂閱者看到相同的數(shù)據(jù)序列妓雾,即使它們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱 -
ConnectableObservable.refCount(?)
— 讓一個(gè)可連接的Observable表現(xiàn)得像一個(gè)普通的Observable