使用
首先從代碼層面來(lái)分析RxJava的每一步到底干了什么。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("A");
}
}).map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Throwable {
return null;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull Bitmap bitmap) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
源碼分析
new ObservableOnSubscribe
在這里ObservableOnSubscribe就是我們的被觀察者
public interface ObservableOnSubscribe<@NonNull T> {
/**
* Called for each {@link Observer} that subscribes.
* @param emitter the safe emitter instance, never {@code null}
* @throws Throwable on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}
Observable.create
調(diào)用Observable.create的時(shí)候?qū)⒈挥^察者傳了進(jìn)來(lái)并且創(chuàng)建了對(duì)象
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
Hook
hook的含義就是在函數(shù)執(zhí)行的過(guò)程中,我們有一個(gè)鉤子函數(shù)狈邑,可以優(yōu)先執(zhí)行我們的代碼驾胆,然后再接著執(zhí)行牲证。
上面的方法中調(diào)用了RxJavaPlugins.onAssembly函數(shù)
RxJavaPlugins.onAssembly(new ObservableCreate<>(source))
/*在這里什么都沒(méi)有做纠脾,直接將source返回了亿傅,所以我們?cè)谶@里可以給
onObservableAssembly進(jìn)行賦值老玛,通過(guò)setOnObservableAssembly方法淤年。
*/
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
//如果onObservableAssembly有值的話會(huì)執(zhí)行apply方法
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
try {
//這里會(huì)執(zhí)行我們的hook方法,將被觀察者傳遞進(jìn)來(lái)蜡豹,在我們實(shí)現(xiàn)的hook方法中返回觀察者
return f.apply(t);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
//自定義hook方法實(shí)現(xiàn)
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Throwable {
//為了不影響代碼流程麸粮,我們需要將observable返回
return observable;
}
});
map(new Function<String, Bitmap>()
上面我們獲取到了對(duì)象,再調(diào)用map肯定是調(diào)用了
對(duì)象的map方法,由于他繼承自O(shè)bservable镜廉,所以調(diào)用的還是Observable的map方法弄诲。
這里又生成了對(duì)象
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
subscribeOn
對(duì)象. subscribeOn
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
這里傳入了一個(gè)Scheduler對(duì)象Schedulers.io(),并且創(chuàng)建了ObservableSubscribeOn對(duì)象
最終拿到了一個(gè)線程池娇唯。
observeOn
AndroidSchedulers.mainThread()
observeOn創(chuàng)建了ObservableObserveOn對(duì)象
new Observer<Bitmap>()
創(chuàng)建了一個(gè)觀察者
public interface Observer<@NonNull T> {
/**
* Provides the {@link Observer} with the means of cancelling (disposing) the
* connection (channel) with the {@link Observable} in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the {@link Disposable} instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the {@link Observer} with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the {@link Observer} that the {@link Observable} has experienced an error condition.
* <p>
* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the {@link Observer} that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@code Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
層次圖
分析到這里就有了代碼嵌套層次了
subscribe
subscribe方法內(nèi)部調(diào)用了subscribeActual(observer);那我們可以看到調(diào)用的就是ObservableObserveOn的subscribeActual方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
//主線程的HandlerScheduler明顯不是TrampolineScheduler的子類
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
//調(diào)用了createWorker
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
/*source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
這里的source就是$\color{#FF0000}{ObservableSubscribeOn}$對(duì)象齐遵,因?yàn)檎{(diào)用observeOn方法的是ObservableSubscribeOn,并且他將自己作為source傳了進(jìn)去
*/
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
new ObserveOnObserver
在上面的subscribeActual方法我們可以看到他創(chuàng)建了一個(gè)新的ObserveOnObserver對(duì)象
//這里接收了一個(gè)observer對(duì)象塔插,而這個(gè)observer正是我們的觀察者梗摇,也就是在這里對(duì)觀察者又進(jìn)行了一次封裝
new ObserveOnObserver<>(observer, w, delayError, bufferSize)
source.subscribe,source就是ObservableSubscribeOn
//這里SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);又對(duì)observer封裝了一層想许,并且調(diào)用了onSubscribe伶授,最終會(huì)調(diào)到我們自己的觀察者的onSubscribe方法
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
//將觀察者放入線程池中執(zhí)行,下面的SubscribeTask可以看到在子線程里面執(zhí)行了source.subscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//執(zhí)行了上一步的subscribe方法
source.subscribe(parent);
}
}
對(duì)觀察者進(jìn)行封裝圖
一層一層向上調(diào)用subscribe(@NonNull Observer<? super T> observer)伸刃,--->subscribeActual(observer);
當(dāng)調(diào)用到了最上層的時(shí)候source就是我們的被觀察者了
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
//注意他這里傳的可不是this
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
在我們自定義的被觀察者里面的subscribe調(diào)用了emitter.onNext("A");
當(dāng)執(zhí)行到以后一層也就是我們自己調(diào)用subscribe方法的時(shí)候
他會(huì)執(zhí)行ObserveOnObserver的onNext
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//這里的work就是我們創(chuàng)建出來(lái)的HandlerWork
worker.schedule(this);
}
}
//HandlerWork類的schedule方法
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposable.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
//使用了主線程的Handler發(fā)送消息
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposable.disposed();
}
return scheduled;
}
當(dāng)執(zhí)行上面的run方法的時(shí)候
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//*************************************************
//在這里最終調(diào)用了onNext的方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
總結(jié)
1.執(zhí)行過(guò)程
- 從上到下對(duì)被觀察者進(jìn)行封裝
- 從下往上對(duì)觀察者進(jìn)行封裝
- 然后再?gòu)纳贤聢?zhí)行onNext
思考一個(gè)問(wèn)題谎砾?對(duì)觀察者和被觀察者的封裝層數(shù)是一樣的,那么是不是可以看成是1個(gè)被觀察者對(duì)應(yīng)一個(gè)觀察者
對(duì)應(yīng)關(guān)系.png
2.觀察者的onSubscribe是在ObservableSubscribeOn的subscribeActual方法中執(zhí)行的捧颅,此時(shí)還沒(méi)有進(jìn)行線程的切換景图,那么就是說(shuō)在那個(gè)線程使用的RXJava那么觀察者的onSubscribe方法就執(zhí)行在哪個(gè)線程
- subscribeOn只會(huì)負(fù)責(zé)上層的線程調(diào)度,observeOn只有在執(zhí)行onNext的時(shí)候才起作用碉哑,也就是下層的線程調(diào)度
4.使用handler(getMainLooper)來(lái)保證主線程操作
自上而下(左邊的流程)->自下而上(右邊的流程+subscribe的調(diào)用)->自上而下(onNext的調(diào)用)