一點(diǎn)牢騷:
前段時(shí)間捧毛,接到需求观堂,舊項(xiàng)目要增添許多功能;舊項(xiàng)目是這樣的:功能以及代碼量就非常龐大呀忧,加上各種代碼不規(guī)范师痕、可讀性很差、代碼耦合度有點(diǎn)小高而账;
聽(tīng)到這個(gè)消息真的讓我腦袋大了一圈胰坟,
如果真的要在原有架構(gòu)上做開(kāi)發(fā),肯定會(huì)導(dǎo)致小組成員開(kāi)發(fā)沖突以及眾多的冗余代碼泞辐,浪費(fèi)時(shí)間和精力在非必要的事情上笔横,之前自身也知道舊項(xiàng)目有這個(gè)問(wèn)題 但由于新項(xiàng)目開(kāi)發(fā)呀嫌棄舊項(xiàng)目一直沒(méi)有決心去改動(dòng),這下好了完全推不了 那就改架構(gòu)吧咐吼,新的模式是 組件化+Rxjava.Retrofit+MVP模式吹缔,最近一直在忙著項(xiàng)目代碼架構(gòu)調(diào)整,相對(duì)應(yīng)的代碼模板編寫(xiě)等等锯茄,雖然說(shuō)改架構(gòu)是被逼的厢塘,但改著改著還是有成長(zhǎng)以及很有成就感的一件事情; 再接再厲肌幽。
說(shuō)實(shí)話晚碾,rxjava的源碼太難了,一直沒(méi)有去時(shí)間(懶癌)去學(xué)習(xí)牍颈; 包括現(xiàn)在項(xiàng)目比較緊張迄薄,每天下班后更是不太想去學(xué)習(xí),那么現(xiàn)在我就和大家一起看一下rxjava的源碼吧;
1、正常簡(jiǎn)易流程煮岁;
2讥蔽、帶線程切換流程;
3画机、map之后冶伞;
4、一些總結(jié)
1步氏、正常簡(jiǎn)易流程
基于以下這段代碼查看源碼
Observable.just("11")
.subscribe(observer);
大家應(yīng)該都知道或者聽(tīng)過(guò)响禽,Rxjava采用的是 增強(qiáng)版的觀察者模式,在訂閱的那一瞬間開(kāi)始執(zhí)行整個(gè)流程,那么現(xiàn)在看一下訂閱方法subscribe(Observer<? super T> observer)
Observable.class
@Override
public final void subscribe(Observer<? super T> observer) {
//..
// 實(shí)際訂閱
subscribeActual(observer);
//...
}
RxJavaPlugins.class
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
static <T, U, R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
try {
return f.apply(t, u);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
看到這里實(shí)際訂閱是發(fā)生在 observable 的 subscribeActual
中 而 subscribeActual
是個(gè)抽象方法芋类; 那么我們又要去找它的實(shí)現(xiàn)隆嗅;
這邊通過(guò)Observable.just
開(kāi)始看
Observable.calss
public static <T> Observable<T> just(T item) {
//...
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
ObservableJust.class
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
//調(diào)用 observer的 onSubsribe方法
s.onSubscribe(sd);
//執(zhí)行
sd.run();
}
ScalarDisposable.calss
public void run() {
// 判斷什么的
if (get() == START && compareAndSet(START, ON_NEXT)) {
//
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
可以看到 run是直接執(zhí)行的;
整體的一個(gè)簡(jiǎn)單正常的流程就是: observable.subscribe(Observer) -> observable.subscribeActual -> Observer.onSubscribe( Disposable ) -> ScalarDisposable.run -> observer.onNext(value) -> observer.onComplete();
其中正常完整流程都會(huì)執(zhí)行標(biāo)紅部分的方法;其中其它部分先放著,只是判斷有沒(méi)有完成完成所有數(shù)據(jù)流的發(fā)射
2侯繁、線程切換流程
基于以下這段代碼查看源碼
Observable.just("11")
.subscribeOn(Schedulers.io())//指定Observable 在哪個(gè)線程上創(chuàng)建執(zhí)行操作
.observeOn(AndroidSchedulers.mainThread()) //在指定下一事件發(fā)生的線程
.subscribe(observer);
2.1胖喳、 流向 Observable.subscribe 都經(jīng)歷了什么
先看下 Observable.subscribeOn
都做了些什么
Observable.class
public final Observable<T> subscribeOn(Scheduler scheduler) {
//
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn.class 本質(zhì)上繼承 Observable
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//保存以及初始化
super(source);
this.scheduler = scheduler;
}
可以看就就是轉(zhuǎn)換變成了 ObservableSubscribeOn
再看下 Observable.observeOn(Scheduler scheduler)
做了些什么
Observable.class 這邊應(yīng)該是: ObservableSubscribeOn extends .... Observable
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
ObservableObserveOn.class
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
本質(zhì)是將Observable
轉(zhuǎn)換成ObservableObserveOn
,在這個(gè)流程中是將 ObservableSubscribeOn
轉(zhuǎn)換成ObservableObserveOn
;
我們的Observable
變換是這樣子的,ObservableJust
->ObservableSubscribeOn
->ObservableObserveOn
一層一層被包含
2.2贮竟、流向 -> Observer.onSubscribe 都經(jīng)歷了什么
那么又到了我們的 訂閱方法subscribe(Observer<? super T> observer)
了,只不過(guò)我們中間多了幾層轉(zhuǎn)換; 我們?cè)賮?lái)看一下
Observable.class
@Override
public final void subscribe(Observer<? super T> observer) {
//...
// 實(shí)際訂閱
subscribeActual(observer);//...
}
ObservableObserveOn.class
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//創(chuàng)建一個(gè) Scheduler.Worker
Scheduler.Worker w = scheduler.createWorker();
// new一個(gè)新的 ObserveOnObserver implements Observer 再次循環(huán) Observable.subscribe
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObserveOnObserver.class .... implements Observer<T>, Runnable
Observable.class
@Override
public final void subscribe(Observer<? super T> observer) {
//..
// 實(shí)際訂閱
subscribeActual(observer);
//...
}
ObservableSubscribeOn.class
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//直接執(zhí)行丽焊,what? Observer.onSubscribe 不能指定線程
// 記錄一下 Observer.onSubscribe 的入口是
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Observer
的轉(zhuǎn)變是這樣的 Observer
->ObserveOnObserver
->SubscribeOnObserver
以上面為準(zhǔn),先看下 s.onSubscribe(parent)
所經(jīng)歷的事情
ObserveOnObserver.class
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
// 看這里 actual 其實(shí)是 Observer ;
actual.onSubscribe(this);
}
}
Observer.class
onSubscribe(sd){...}
這里究竟可以看到 執(zhí)行到 最初observer
的onSubscribe
的一條完整的線路咕别;
ObserveOnObserver.subscribeActual
-> ObservableSubscribeOn.subscribeActual
-> ObserveOnObserver.onSubscribe
-> Observer.onSubscribe
;
不知道有沒(méi)有細(xì)心的同學(xué)發(fā)現(xiàn)了沒(méi)有技健,'onSubscribe'的執(zhí)行沒(méi)有SubscribeOnObserver
什么事情,雖然說(shuō)上面有一層轉(zhuǎn)換成功了SubscribeOnObserver
畫(huà)成圖應(yīng)該就是下面這樣:
我們發(fā)現(xiàn)了 從訂閱開(kāi)始一直到執(zhí)行我們的 observer.onSubscribe()
中間沒(méi)有任何切換線程的影子惰拱;
所以我們得出了一個(gè)
observer的 onSubscribe 運(yùn)行與訂閱動(dòng)作發(fā)生在同一線程雌贱,不受線程指定方法(observeOn subscribeOn)影響
2.3、流向 -> observer.next弓颈、onComplete 都經(jīng)歷了什么
ObservableSubscribeOn.class
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// new 出一個(gè) SubscribeTask
// scheduler.scheduleDirect 切換線程執(zhí)行 SubscribeTask
// SubscribeOnObserver.setDisposable方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到上面最后一段代碼做個(gè)這樣事情帽芽,一件一件去看一下:
// new 出一個(gè) SubscribeTask
// scheduler.scheduleDirect 切換線程執(zhí)行 SubscribeTask
// SubscribeOnObserver.setDisposable方法
先看一下SubscribeTask
的 run
里面是干嘛的
ObservableSubscribeOn.class
class SubscribeOnObserver
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//其中 source 是 ObservableJust
source.subscribe(parent);
}
}
由第一節(jié)的分析我們可以知道删掀,這邊最終會(huì)執(zhí)行到 SubscribeOnObserver.onNext()
-> ObserveOnObserver.onNext()
->Observer.onNext()
這邊一層一層調(diào)用出來(lái);
SubscribeTask.run
最終執(zhí)行我們的 最初observer.onNext() onComplete()
; 這邊還沒(méi)有涉及到線程切換
再看我們的 scheduler.scheduleDirect(new SubscribeTask)
我們上面用的是 Scheduler.IO 實(shí)際上是 IoScheduler;
IoScheduler extends Scheduler.class
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//指定工作線程
w.schedule(task, delay, unit);
return task;
}
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
EventLoopWorker extends Scheduler.Worker
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
那么這邊流程就比較清晰了翔冀,拿到subscribeOn 設(shè)置的Scheduler
中創(chuàng)建一個(gè)Worker
設(shè)定了一個(gè) IO 線程;
看到這里 我們就該逆向地執(zhí)行我們 Observer 真正的方法了;
執(zhí)行到 SubscribeOnObserver.onNext()
ObservableSubscribeOn : SubscribeOnObserver<T>
@Override
public void onNext(T t) {
// actual 為 ObserveOnObserver
actual.onNext(t);
}
// scheduler 這邊指定為 AndroidSchedulers.mainThread() createWorker() 這邊不深究,里面轉(zhuǎn)成了 handler
Scheduler.Worker worker = scheduler.createWorker();
ObservableObserveOn : ObserveOnObserver
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// 這個(gè)最終 執(zhí)行在handler
worker.schedule(this);
}
}
最后的流程應(yīng)該是這樣的
3披泪、map 數(shù)據(jù)操作源碼
Observable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return null;
}
}).subscribe(integer -> out("accept:" + integer));
Observable.class
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
可以看到 它是在 執(zhí)行完 function.apply在執(zhí)行 onNext();
配合上一節(jié) 纤子,流程圖就變成這樣了
4、一些總結(jié)
來(lái)個(gè)總結(jié)吧: 估計(jì)源碼看得很混亂款票。
1控硼、對(duì)Observable指定線程、數(shù)據(jù)變換等等艾少,都采用了一種代理包裝模式卡乾; 比如 ObservableJust-> ObservableSubscribeOn -> ObservableMap -> ObservableObserveOn ; 進(jìn)行了一層包裝;
2缚够、在訂閱完成的那一刻起幔妨,反向調(diào)用 subscribe():subscribeActual()方法;比如 :(ObservableObserveOn.subscribe->ObservableObserveOn.subscribeActual())
->(ObservableMap.subscribe->ObservableMap.subscribeActual())
->(ObservableSubscribeOn.subscribe->ObservableSubscribeOn.subscribeActual())
->(ObservableJust.subscribe->ObservableJust.subscribeActual())
3谍椅、Observer 误堡,同理包裝 Observer -> ObservableMap... 添加了指定 Schedulers.createWorker() ;
4、 Observer 的執(zhí)行順序是 Observer.onSubscribe() -> ObservableXX.onNext() -> ObsevableXXX.onNext() ->...-> Obsever.onNext() -> ObservableXX.OnComplete() -> ObsevableXXX.OnComplete() ->...-> Obsever.OnComplete();
5雏吭、 中間有些操作放入到了線程當(dāng)中.
其實(shí)有點(diǎn)坑的是:原本我就知道這個(gè)流程應(yīng)該是這樣的锁施,類似于事件分發(fā)機(jī)制成 U 字型的流程...... 本篇只是在 眾多代碼 中驗(yàn)證我的思路.................、