開頭
之前我們分析過subscribeOn
這個函數,
現在我們來看下subscribeOn
和observeOn
這兩個函數到底有什么異同纺非。
用過rxjava
的旁友都知道派草,subscribeOn
和observeOn
都是用來切換線程用的须蜗,可是我什么時候用subscribeOn
硅确,什么時候用observeOn
呢,我們很少知道這兩個區(qū)別是啥唠粥。
友情提示疏魏,如果不想看分析過程的,可以直接跳到下面的總結部分晤愧。
subscribeOn
先看下OperatorSubscribeOn
的核心代碼:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
....
};
source.unsafeSubscribe(s);
}
});
}
}
這里注意兩點:
- 因為
OperatorSubscribeOn
是個OnSubscribe
對象大莫,所以在call
參數中傳入的subscriber
就是我們在外面使用Observable.subscribe(a)
傳入的對象a
。
- 這里
source
對象指向的是調用subscribeOn
之前的那個Observable
序列官份。
明確了這兩點只厘,我們就很好的知道了subscribeOn
是如何工作,產生神奇的效果了舅巷。
其實最最主要的就是一行函數
source.unsafeSubscribe(s);
并且要注意它所在的位置羔味,是在worker的call
里面,說白了钠右,就是把source.subscribe
這一行調用放在指定的線程里赋元,那么總結起來的結論就是:
subscribeOn
的調用,改變了調用前序列所運行的線程飒房。
observeOn
同樣看下OperatorObserveOn
這個類的主要代碼:
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this.scheduler = scheduler;
this.delayError = delayError;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
....
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
parent.init();
return parent;
}
/** Observe through individual queue per observer. */
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final NotificationLite<T> on;
final boolean delayError;
final Queue<Object> queue;
// the status of the current stream
volatile boolean finished;
final AtomicLong requested = new AtomicLong();
final AtomicLong counter = new AtomicLong();
/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
this.on = NotificationLite.instance();
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
} else {
queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
}
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(RxRingBuffer.SIZE);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
return;
}
error = e;
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
@Override
public void call() {
long emitted = 0L;
long missed = 1L;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each RxRingBuffer.SIZE elements)
for (;;) {
long requestAmount = requested.get();
long currentEmission = 0L;
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
currentEmission++;
emitted++;
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
if (currentEmission != 0L) {
BackpressureUtils.produced(requested, currentEmission);
}
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
if (emitted != 0L) {
request(emitted);
}
}
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}
if (done) {
if (delayError) {
if (isEmpty) {
Throwable e = error;
try {
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
}
} else {
Throwable e = error;
if (e != null) {
q.clear();
try {
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
} else
if (isEmpty) {
try {
a.onCompleted();
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
}
}
return false;
}
}
}
這里的代碼有點長搁凸,我們先注意到它是一個Operator
,它沒有對上層Observable
做任何的控制或者包裝狠毯。
既然是Operator
护糖,那么它的職責就是把一個Subscriber
轉換成另外一個Subscriber
, 我們來關注下轉換后的Subscriber
對轉換前的Subscriber
做了些什么事嚼松。
首先它是一個ObserveOnSubscriber
類嫡良, 既然是Subscriber
那么肯定有onNext
, onComplete
和onError
看最主要的onNext
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
好了锰扶,這里做了兩件事,首先把結果緩存到一個隊列里寝受,然后調用schedule
啟動傳入的worker
我們這里需要注意下:
在調用
observeOn
前的序列坷牛,把結果傳入到onNext
就是它的工作,它并不關心后續(xù)的流程很澄,所以工作就到這里就結束了漓帅,剩下的交給ObserveOnSubscriber
繼續(xù)。
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
recursiveScheduler
就是之前我們傳入的Scheduler痴怨,我們一般會在observeOn
傳入AndroidScheluders.mainThread()
對吧、
接下去器予,我們看下在scheduler
中調用的call
方法浪藻,這里只列出主要帶代碼
@Override
public void call() {
...
final Subscriber<? super T> localChild = this.child;
for (;;) {
...
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
...
}
if (emitted != 0L) {
request(emitted);
}
}
OK,在Scheduler
啟動后乾翔, 我們在Observable.subscribe(a)
傳入的a
就是這里的child
爱葵, 我們看到,在call
中終于調用了它的onNext
方法反浓,把真正的結果傳了出去萌丈,但是在這里,我們是工作在observeOn
的線程上的雷则。
那么總結起來的結論就是:
observeOn
對調用之前的序列默不關心辆雾,也不會要求之前的序列運行在指定的線程上observeOn
對之前的序列產生的結果先緩存起來,然后再在指定的線程上月劈,推送給最終的subscriber
復雜情況
我們經常多次使用subscribeOn
切換線程度迂,那么以后是否可以組合observeOn
和subscribeOn
達到自由切換的目的呢?
組合是可以的猜揪,但是他們的執(zhí)行順序是有條件的惭墓,如果仔細分析的話,可以知道observeOn
調用之后而姐,再調用subscribeOn
是無效的腊凶,原因是什么?
因為subscribeOn
改變的是subscribe
這句調用所在的線程拴念,大多數情況钧萍,產生內容和消費內容是在同一線程的,所以改變了產生內容所在的線程丈莺,就改變了消費內容所在的線程划煮。
經過上面的闡述,我們知道缔俄,observeOn
的工作原理是把消費結果先緩存弛秋,再切換到新線程上讓原始消費者消費器躏,它和生產者是沒有一點關系的,就算subscribeOn
調用了蟹略,也只是改變observeOn
這個消費者所在的線程登失,和OperatorObserveOn
中存儲的原始消費者一點關系都沒有,它還是由observeOn
控制挖炬。
總結
如果我們有一段這樣的序列
Observable
.map // 操作1
.flatMap // 操作2
.subscribeOn(io)
.map //操作3
.flatMap //操作4
.observeOn(main)
.map //操作5
.flatMap //操作6
.subscribeOn(io) //!!特別注意
.subscribe(handleData)
假設這里我們是在主線程上調用這段代碼揽浙,
那么
操作1
,操作2
是在io線程上意敛,因為之后subscribeOn
切換了線程操作3
媒吗,操作4
也是在io線程上,因為在subscribeOn
切換了線程之后舍哄,并沒有發(fā)生改變氛改。操作5
,操作6
是在main線程上撩独,因為在他們之前的observeOn
切換了線程敞曹。- 特別注意那一段,對于
操作5
和操作6
是無效的
再簡單點總結就是
-
subscribeOn
的調用切換之前的線程综膀。 -
observeOn
的調用切換之后的線程澳迫。 -
observeOn
之后,不可再調用subscribeOn
切換線程