響應(yīng)式編程
說道rxjava,就要提到響應(yīng)式編程
響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。
數(shù)據(jù)流就像一條河:它可以被觀測课舍,被過濾霹俺,被操作,或者為新的消費者與另外一條流合并為一條新的流掰邢。
簡介
RxJava可以濃縮為異步兩個字牺陶,其核心的東西不外乎兩個伟阔, Observable(被觀察者) 和 Observer(觀察者)。Observable可以發(fā)出一系列的 事件(例如網(wǎng)絡(luò)請求掰伸、復(fù)雜計算皱炉、數(shù)據(jù)庫操作、文件讀取等)碱工,事件執(zhí)行結(jié)束后交給Observer的回調(diào)處理娃承。
使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("5555555");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String o) {
Log.e("onNext",o);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
看源碼
先看被觀察者Observable的create方法,Observable是一個抽象類
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//null判斷
ObjectHelper.requireNonNull(source, "source is null");
//追蹤該代碼
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
//將Observable返回給我們
return source;
}
//ObservableCreate是Observable(被觀察者)的子類
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
從上述代碼可以看出,最終將Observable返回給了我們,那我們看看ObservableOnSubscribe是什么
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
接下我們看Observer,這幾個方法很熟悉
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
然后我們看Observable的subscribe方法
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//該方法是Observable的抽象方法,是Observable的子類實現(xiàn)的
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
//我們知道ObservableCreate是Observable的子類,所以我們看ObservableCreate中實現(xiàn)的subscribeActual方法
@Override
protected void subscribeActual(Observer observer) {
//是ObservableCreate的靜態(tài)內(nèi)部類
CreateEmitter<T> parent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try {
//source就是我們在Observable.create方法中傳入的ObservableOnSubscribe
source.subscribe(parent);
} catch (Exception e) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//被觀察者和觀察者發(fā)生了關(guān)系....
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
線程調(diào)度
subscribeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//這里代碼和 Observable.create方法類似,直接看ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//Observable實現(xiàn)了ObservableSource這個接口的,所以這里的source是指我們剛才創(chuàng)建的那個Observable
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//Observer的onSubscribe方法,因為此時的Observable的subscribe
//方法發(fā)生在當(dāng)前線程怕篷,所以O(shè)bserver的onSubscribe方法的執(zhí)行
//線程和當(dāng)前調(diào)用Observable的subscribe方法的線程一致
s.onSubscribe(parent);
//SubscribeTask是一個Runnable,scheduler是線程調(diào)度對象,我們跟蹤scheduleDirect方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
//將新的 Disposable 設(shè)置給 parent 历筝,方便取消訂閱關(guān)系,
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//執(zhí)行了source(ObservableSource)的subscribe方法
//即上游Observable的subscribe方法
source.subscribe(parent);
}
}
}
subscribeOn方法返回了一個新的Observable廊谓,而這個新的Observable里面持有一個上一層Observable的引用梳猪。那個引用就是source。調(diào)用subscribeOn方法后蒸痹,在他之前的和在他之后的代碼執(zhí)行的線程都是subscribeOn指定的線程,onSubscribe方法除外,因為Observable的subscribe方法發(fā)生在當(dāng)前線程春弥,所以O(shè)bserver的onSubscribe方法的執(zhí)行線程和當(dāng)前調(diào)用Observable的subscribe方法的線程一致!
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//包裝了一下Runnable,DisposeTask還是一個Runnable
DisposeTask task = new DisposeTask(decoratedRun, w);
//schedule是Worker的抽象方法,在這我們追蹤下IoScheduler的schedule方法
w.schedule(task, delay, unit);
return task;
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
//通過線程池執(zhí)行Runnable
f = executor.submit((Callable<Object>)sr);
} else {
//通過線程池執(zhí)行Runnable
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
我們看到最終調(diào)用了NewThreadWorker的中的線程池去執(zhí)行我們傳入的Runnable的run方法,即訂閱方法中的線程始終由上游observable.subscribeOn方法來決定
subscribeOn第一次有效原理
由于訂閱是由下而上的,所以最上面的subscribeOn方法是最后執(zhí)行了,而subscribeOn方法返回的一個新的Observable里持有一個上一層Observable的引用,所以這里的線程是由上游observable.subscribeOn方法中傳入的線程來決定,個人理解就像遞歸調(diào)用一樣,直至調(diào)用到最上層的第一個subscribeOn方法.
舉個栗子
public class Rx {
String name;
Rx rx;
public Rx(Rx rx,String name) {
this.name = name;
this.rx = rx;
}
public void subscribe(){
System.out.println(name+"subscribe線程:"+Thread.currentThread().getName());
if (rx == null){
System.out.println("最終執(zhí)行"+name+"線程:"+Thread.currentThread().getName());
return;
}
new Thread(new Runnable() {
@Override
public void run() {
//調(diào)用上游rx的subscribe方法
rx.subscribe();
}
}).start();
}
}
public static void main(String[] args) {
Rx rx1 = new Rx(null, "rx1");
Rx rx2 = new Rx(rx1, "rx2");
Rx rx3 = new Rx(rx2, "rx3");
Rx rx4 = new Rx(rx3, "rx4");
rx4.subscribe();
}
結(jié)果如圖
observeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//不啰嗦,直接看ObservableObserveOn的subscribeActual方法(真正的訂閱方法)
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//TrampolineScheduler 表示當(dāng)前線程
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//重點是ObserveOnObserver觀察者
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
重點看ObserveOnObserver的onNext,onError,onComplete等方法
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//切換了線程
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
//切換了線程
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
//切換了線程
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//我們已IO線程為例,追蹤該方法
worker.schedule(this);
}
}
在Ioscheduler中找到實現(xiàn)的schedule方法
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//該方法即為我們追蹤的subscribeOn中的相同的方法
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
看到這里,我們應(yīng)該了解了在線程調(diào)度過程中,subscribeOn和observeOn方法的區(qū)別,observeOn只要調(diào)用,就會切換下游事件的線程,而subscribeOn在他之前的和在他之后的代碼執(zhí)行的線程都是subscribeOn指定的線程,onSubscribe方法除外.