RxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
RxAndroid 是在 RxJava 上的擴展,用法和 RxJava 一樣脱柱。結(jié)構(gòu)比較簡單,主要用在 Android 上的線程切換拉馋。
Observable.just(1,2)
.subscribeOn(Schedulers.io())
// 切換到 Android 的主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observerInt)
看源碼
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
通過 Looper.getMainLooper() 構(gòu)建的 Handler 來切換榨为。
看 HandlerScheduler 內(nèi)部類 HandlerWorker 的 schedule 方法:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
內(nèi)部原理就是 Android 的 Handler 這一套機制。
除了切換到主線程還可以根據(jù)某個線程上的 Looper 切換到指定線程上煌茴。
Looper backgroundLooper = // ...
Observable.just("one", "two", "three", "four", "five")
.observeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe(/* an Observer */)
就是根據(jù)傳入的 Looper 構(gòu)建一個 Handler随闺。
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
RxKotlin
implementation 'io.reactivex.rxjava2:rxkotlin:2.2.0'
用 Kotlin 語言寫 RxJava 的功能就行,但 Kotlin 本身有些語言特性(比如擴展)蔓腐,使用 RxKotlin 后語法可以更簡潔矩乐。類似 Google 的 ktx 框架,沒什么新功能合住,只是對一些方法的封裝使得使用起來更方便绰精。
RxJava 是調(diào)用 Observable 的方法撒璧,比如:
Observable.fromArray("a","b","c").subscribe(observerStr)
而使用 RxKotlin 就是這樣
arrayOf("a", "b", "c").toObservable().subscribe(observerStr)
源碼
fun <T : Any> Array<T>.toObservable(): Observable<T> = Observable.fromArray(*this)
在數(shù)組上定義的擴展方法 toObservable()
內(nèi)部其實就是調(diào)用的 fromArray 方法透葛。
其實擴展方法更符合人的思維,不像原來那種卿樱,乍看之下是別扭的僚害。
全部的擴展方法見 https://github.com/ReactiveX/RxKotlin。