前言
對于一般的需求場景珊蟀,需要在子線程中實現(xiàn)耗時的操作菊值;然后回到主線程實現(xiàn) UI操作應(yīng)用到 RxJava模型中外驱,可理解為:
被觀察者 (Observable) 在 子線程 中生產(chǎn)事件(如實現(xiàn)耗時操作等等)
觀察者(Observer)在 主線程 接收 & 響應(yīng)事件(即實現(xiàn)UI操作
實現(xiàn)方式
采用 RxJava內(nèi)置的線程調(diào)度器( Scheduler )育灸,即通過 功能性操作符subscribeOn() & observeOn()實現(xiàn)
subscribeOn
使用該方法可以指定被觀察者執(zhí)行方法位于的線程。
注意:該方法調(diào)用只能生效一次昵宇,即第一次調(diào)用后磅崭,再調(diào)用
subscribeOn
無法改變其執(zhí)行線程的位置。
observeOn
使用該方法指定觀察者事件響應(yīng)位于的線程瓦哎。
注意:該方法可調(diào)用多次砸喻,每一次調(diào)用
observeOn
,后續(xù)操作線程就會切換一次,這里的后續(xù)操作指的是調(diào)用observeOn
后蒋譬,在下一個observeOn
前指定的事件監(jiān)聽操作
代碼實現(xiàn)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.v("rxJava","Observable thread :" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()) // 指定被觀察者執(zhí)行線程
.observeOn(Schedulers.newThread()) // 切換到新線程
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.v("rxJava","do on next thread :" + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.newThread()) // 切換到另一個新線程
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
Log.v("rxJava","filter thread:"+ Thread.currentThread().getName());
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.v("rxJava","Observer onSubscribe thread :" + Thread.currentThread().getName());
}
@Override
public void onNext(Integer value) {
Log.v("rxJava","Observer onNext thread :" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.v("rxJava","Observer onError thread :" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.v("rxJava","Observer onComplete thread :" + Thread.currentThread().getName());
}
});
運行結(jié)果
V/rxJava: Observer onSubscribe thread :main
V/rxJava: Observable thread :RxCachedThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: do on next thread :RxNewThreadScheduler-1
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: filter thread:RxNewThreadScheduler-2
V/rxJava: Observer onNext thread :RxNewThreadScheduler-2
V/rxJava: Observer onComplete thread :RxNewThreadScheduler-2
注意:onSubscribe方法總是執(zhí)行在調(diào)用subscribe方法的線程
線程可選參數(shù)
schedulers.io()
這個調(diào)度器時用于I/O操作割岛。它基于根據(jù)需要,增長或縮減來自適應(yīng)的線程池犯助。我們將使用它來修復(fù)StrictMode檢測到的違規(guī)做法癣漆。由于它專用于I/O操作,所以并不是RxJava的默認(rèn)方法剂买;正確的使用它是由開發(fā)者決定的惠爽。 重點需要注意的是線程池是無限制的,大量的I/O調(diào)度操作將創(chuàng)建許多個線程并占用內(nèi)存瞬哼。一如既往的是婚肆,我們需要在性能和簡捷兩者之間找到一個有效的平衡點。
Schedulers.computation()
這個是計算工作默認(rèn)的調(diào)度器坐慰,它與I/O操作無關(guān)较性。它也是許多RxJava方法的默認(rèn)調(diào)度器:buffer(),debounce(),delay(),interval(),sample(),skip()。
Schedulers.immediate()
這個調(diào)度器允許你立即在當(dāng)前線程執(zhí)行你指定的工作结胀。它是timeout(),timeInterval(),以及timestamp()方法默認(rèn)的調(diào)度器赞咙。
Schedulers.newThread()
指定一個新線程來執(zhí)行任務(wù),如果有多個步驟且每個步驟都使用這個方法調(diào)度把跨,則每個步驟都是在一個新的線程中人弓,而不是同一個線程。
Schedulers.trampoline()
當(dāng)我們想在當(dāng)前線程執(zhí)行一個任務(wù)時着逐,并不是立即崔赌,我們可以用.trampoline()將它入隊意蛀。這個調(diào)度器將會處理它的隊列并且按序運行隊列中每一個任務(wù)。它是repeat()和retry()方法默認(rèn)的調(diào)度器健芭。