RxJava2 源碼淺析
ReactiveX
歷史:
ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx棘劣,最初是LINQ的一個擴展昆庇,由微軟的架構師Erik Meijer領導的團隊開發(fā)扎阶,在2012年11月開源,Rx是一個編程模型蜓洪,目標是提供一致的編程接口,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流坯苹,Rx庫支持.NET隆檀、JavaScript和C++,Rx近幾年越來越流行了,現(xiàn)在已經(jīng)支持幾乎全部的流行編程語言了恐仑,Rx的大部分語言庫由ReactiveX這個組織負責維護泉坐,比較流行的有RxJava/RxJS/Rx.NET,社區(qū)網(wǎng)站是 reactivex.io
定義:
ReactiveX.io給的定義是裳仆,Rx是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口坚冀,ReactiveX結合了觀察者模式、迭代器模式和函數(shù)式編程的精華鉴逞。
ReactiveX不僅僅是一個編程接口记某,它是一種編程思想的突破,它影響了許多其它的程序庫和框架以及編程語言构捡。
這就是數(shù)據(jù)流液南?
RxJava2定義
a library for composing asynchronous and event-based programs by using observable sequences.(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)
大致流程
上代碼:
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("test First");
Log.e("TAG", "subScribe test First");
emitter.onNext("test Second");
Log.e("TAG", "subScribe test Second");
emitter.onComplete();
Log.e("TAG", "subScribe onComplete");
}
}).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe");
mDisposable = d;
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext");
Log.e("TAG", s);
if (s.equals("test First")) {
mDisposable.dispose();
}
}
@Override
public void onError(Throwable e) {
Log.e("TAG", "onError");
}
@Override
public void onComplete() {
Log.e("TAG", "onComplete");
}
});
剛開始學習RxJava 時勾徽,這段代碼給我最直觀的感受就是滑凉,這不就是自己調(diào)用自己嗎。ObservableEmitter<String> emitter 這個就是下面的subscribe(new Observer<String>())喘帚。對吧畅姊,我覺得大家應該都是這樣的感受吧...
追蹤一下源碼:點擊create()方法進去看一下:
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
看了代碼onObservableAssembly為null,所以create方法之后這個對象被包裝成new ObservableCreate<T>(source),source是外面?zhèn)鬟M來的吹由。
關鍵字:io.reactivex.internal.operators.observable.ObservableCreate
繼續(xù)看下一個操作符:subscribe()若未,點進去看一下
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);//observer原樣返回,沒改動
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);//關鍵點
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
// RxJavaPlugins.onSubscribe
public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
BiFunction<Observable, Observer, Observer> f = onObservableSubscribe; //f 為null
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
上面說過了經(jīng)過create()方法或這個對象已經(jīng)是ObservableCreate了倾鲫,那么最終會調(diào)用的就是subscribeActual(observer) 看一下ObservableCreate這個類的代碼:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
}
看一下粗合,這里會把observer包裝成一個CreateEmitter對象,然后source是Observable.create(new ObservableOnSubscribe<String>())傳進來的ObservableOnSubscribe對象乌昔。然后會調(diào)用observer.onSubscribe(parent);source.subscribe(parent);終于清晰了...可以回答上面的問題了隙疚,其實Observer和ObservableEmitter可以看成是一個對象,只是對observer做了個包裝...
Scheduler 線程變換(subscribeOn 和 observeOn)
說到線程變換即線程間通信磕道,因為我是學Android供屉,所以第一印象就是Handler,然后就是Future溺蕉×尕ぃ看了源碼后發(fā)現(xiàn)RxJava用的是Future,ScheduledExecutorService焙贷,Runnable撵割,二AndroidScheduler就是用Handler的,因為需要切換到Android中的UI線程辙芍。
subscribeOn(Schedulers.newThread())
點進去看一下:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
}
傳進來的是Schedulers.newThread()啡彬,點擊Schedulers.newThread() 點進去
發(fā)現(xiàn)最終返回的是NewThreadScheduler
關鍵字:io.reactivex.internal.schedulers.NewThreadScheduler
io.reactivex.internal.schedulers.NewThreadWorker(真正做線程調(diào)度的類)
發(fā)現(xiàn)有scheduler.scheduleDirect(new Runnable())點擊進去羹与,最終調(diào)用
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
最終還是w.schedule(new Runnable()),w就是NewThreadWorker,找到這個類看一下schedule方法,最終會調(diào)用:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
parent.remove(sr);
RxJavaPlugins.onError(ex);
}
return sr;
}
看到future和executor了庶灿,這里就是線程切換
observeOn(AndroidSchedulers.mainThread())
ObservableObserveOn 最終調(diào)用的是HandlerScheduler和HandlerWorker
HandlerWorker:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (delay < 0) throw new IllegalArgumentException("delay < 0: " + delay);
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
發(fā)現(xiàn)是用Handler來做線程切換纵搁,Handler管理的Looper是Looper.getMainLooper(),所以把消息發(fā)送到了主線程往踢。