前言
RxJava2.0從入門到放棄(一)中簡單介紹了我對RxJava的理解以及RxJava最基本的一個(gè)寫法惜姐。這一部分繼續(xù)講講RxJava最重要的一個(gè)環(huán)節(jié)關(guān)于線程得調(diào)度。(本文案例用kotlin來做案例椿息。)
至于為什么說是最重要的载弄?RxJava在github是這么定義自己的
RxJava is a Java VM implementation of [Reactive Extensions](http://reactivex.io/): a library for composing asynchronous and event-based programs by using observable sequences.
也就是說RxJava是一個(gè)專注于用來解決異步以及事件驅(qū)動(dòng)的庫。
講解之前我們先拋出一個(gè)問題吧:
先從IO線程獨(dú)讀一個(gè)文件夾撵颊,
再把文件夾里面的png圖片篩選出來宇攻,
然后在主線程中把這些圖片加載在UI上。
面對這樣一個(gè)需求該怎么處理倡勇。用Thread應(yīng)該差不多這樣實(shí)現(xiàn)
final File[] files = file.listFiles();
new Thread(new Runnable() {
@Override
public void run() {
for(File f:files) {
if(f.getName().endsWith(".png")){
final Bitmap bitmap = transFileToBitmap(f);
runOnUiThread(new Runnable() {
@Override
public void run() {
updateUI(bitmap);
}
});
}
}
}
}).start();
后面再來用RxJava來實(shí)現(xiàn)一下這個(gè)需求逞刷。
正文
RxJava整體的線程調(diào)度涉及到三個(gè)關(guān)鍵點(diǎn)分別是subscribeOn
observeOn
Scheduler
。
RxJava在不指定線程的情況下妻熊,RxJava保持者線程不變的原則夸浅。也就是說『上游』在哪個(gè)線程上創(chuàng)建事件,『下游』就是在哪個(gè)線程上處理事件扔役,『上游』和『下游』線程保持一致帆喇。
用代碼來驗(yàn)證下:
Observable.create<Int> { e ->
for (i in 0..5) {
Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
Log.e(TAG, "observable $i")
e.onNext(i)
}
}
.subscribe { int ->
Log.e(TAG, "onNext $int")
Log.e(TAG, "subscribe thread ${Thread.currentThread().name}")
}
輸出結(jié)果是這樣:
08-23 17:55:33.635 19473 19473 E RxTag : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag : observable 0
08-23 17:55:33.635 19473 19473 E RxTag : onNext 0
08-23 17:55:33.635 19473 19473 E RxTag : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag : observable 1
08-23 17:55:33.635 19473 19473 E RxTag : onNext 1
08-23 17:55:33.635 19473 19473 E RxTag : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag : observable 2
08-23 17:55:33.635 19473 19473 E RxTag : onNext 2
08-23 17:55:33.635 19473 19473 E RxTag : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag : observable 3
08-23 17:55:33.635 19473 19473 E RxTag : onNext 3
08-23 17:55:33.635 19473 19473 E RxTag : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag : observable 4
08-23 17:55:33.635 19473 19473 E RxTag : onNext 4
08-23 17:55:33.635 19473 19473 E RxTag : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag : observable 5
08-23 17:55:33.635 19473 19473 E RxTag : onNext 5
08-23 17:55:33.635 19473 19473 E RxTag : subscribe thread main
可以看到所有的運(yùn)行都是在main線程運(yùn)行的,可以驗(yàn)證:
RxJava在不指定線程的情況下,『上游』和『下游』線程保持一致亿胸。
如果指定線程的話該怎么做坯钦?
在RxJava中可以分別通過 subscribeOn()
和observerOn()
這兩個(gè)方法來指定『上游』事件產(chǎn)生的線程以及『下游』事件響應(yīng)的線程。
具體怎么做我們來看代碼:
Observable.create<Int> { e ->
for (i in 0..2) {
Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
Log.e(TAG, "observable $i")
e.onNext(i)
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { int ->
Log.e(TAG, "onNext $int")
Log.e(TAG, "subscribe thread ${Thread.currentThread().name}")
}
輸出結(jié)果是:
08-24 11:02:28.614 19473 21708 E RxTag : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.614 19473 21708 E RxTag : observable 0
08-24 11:02:28.614 19473 21708 E RxTag : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.615 19473 21708 E RxTag : observable 1
08-24 11:02:28.615 19473 21708 E RxTag : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.615 19473 21708 E RxTag : observable 2
08-24 11:02:28.982 19473 19473 E RxTag : onNext 0
08-24 11:02:28.982 19473 19473 E RxTag : subscribe thread main
08-24 11:02:28.982 19473 19473 E RxTag : onNext 1
08-24 11:02:28.983 19473 19473 E RxTag : subscribe thread main
08-24 11:02:28.983 19473 19473 E RxTag : onNext 2
08-24 11:02:28.983 19473 19473 E RxTag : subscribe thread main
『上游』事件產(chǎn)生在
RxCachedThreadScheduler-1
這個(gè)線程侈玄,『下游』事件響應(yīng)的onNext()在main
線程婉刀。
那么我們關(guān)注下Scheduler的幾個(gè)線程名稱:
- Schedulers.trampoline() : 相當(dāng)于不指定線程。直接在之前的線程運(yùn)行序仙,依賴于調(diào)用操作的線程突颊。
- Schedulers.io():(讀寫文件、讀寫數(shù)據(jù)庫潘悼、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler律秃。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池治唤,可以重用空閑的線程棒动,因此多數(shù)情況下 io() 比 newThread() 更有效率;
- Schedulers.newThread(): 總是啟用新線程,并在新線程中執(zhí)行操作肝劲;
- Schedulers.single(): 啟用一個(gè)線程池大小為1的線程池迁客,相當(dāng)于(newScheduledThreadPool(1))郭宝,重復(fù)利用這個(gè)線程;
- Schedulers.computation(): 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算掷漱,即不會(huì)被 I/O 等操作限制性能的操作粘室,例如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池卜范,大小為 CPU 核數(shù)衔统。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU海雪。
在RxAndroid中就會(huì)有這么一個(gè)線程:
- AndroidSchedulers.mainThread():運(yùn)行在Android主線程中锦爵。main UI線程。
那么在Android中最簡單的異步或者請求網(wǎng)絡(luò)就這么寫了:
Observable.create<String> { e ->
//請求網(wǎng)絡(luò)奥裸,返回一個(gè)String
val str:String = api.getString()
e.onNext(str)
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
str ->
//獲得String险掀,更新到UI
updateUI(str)
},{
error->
//連接錯(cuò)誤,提示錯(cuò)誤信息
netWorkError(error.message)
})
回顧文章最開始提出的問題我們就可以這么的用 RxJava實(shí)現(xiàn)出來:
Observable.fromArray(f.listFiles())
.filter(new Predicate<File>() {
@Override
public boolean test(File file) throws Exception {
return file.getName().endsWith(".png");
}
})
.map(new Function<File, Bitmap>() {
@Override
public Bitmap apply(File file) throws Exception {
return getBitMapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
updateUI(bitmap);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
showError(throwable.getMessage());
}
});
}
但是有兩點(diǎn)需要注意:
- subscribeOn 講『上游』事件的發(fā)射切換到 Scheduler 所定義的線程湾宙, 如果多次調(diào)用 subscribeOn(),那么只有第一個(gè) subscribeOn 操作有效 樟氢;
- observeOn 指定 observeOn 后續(xù)操作所在線程。也就是說 可以多次調(diào)用observeOn 可以多次切換接下來『下游』事件處理的線程 侠鳄;
舉個(gè)栗子吧:
Observable.create<Int> { emitter ->
for (i in 0..2) {
Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
Log.e(TAG, "observable $i")
emitter.onNext(i)
}
}
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(consumer())
.observeOn(Schedulers.io())
.doOnNext(consumer())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer())
}
private fun consumer(): Consumer<Int> {
return Consumer { i ->
Log.e(TAG, "onNext thread ${Thread.currentThread().name}")
Log.e(TAG, "onNext $i")
}
}
看一看輸出結(jié)果就證明之前的注意點(diǎn):
08-24 14:54:41.670 26674 27052 E RxTag : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag : observable 0
08-24 14:54:41.670 26674 27052 E RxTag : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag : observable 1
08-24 14:54:41.670 26674 27052 E RxTag : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag : observable 2
08-24 14:54:41.672 26674 26880 E RxTag : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag : onNext 0
08-24 14:54:41.688 26674 26880 E RxTag : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag : onNext 1
08-24 14:54:41.688 26674 26880 E RxTag : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag : onNext 2
08-24 14:54:41.691 26674 27054 E RxTag : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.691 26674 27054 E RxTag : onNext 0
08-24 14:54:41.697 26674 27054 E RxTag : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.697 26674 27054 E RxTag : onNext 1
08-24 14:54:41.698 26674 27054 E RxTag : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.698 26674 27054 E RxTag : onNext 2
08-24 14:54:42.058 26674 26674 E RxTag : onNext thread main
08-24 14:54:42.058 26674 26674 E RxTag : onNext 0
08-24 14:54:42.058 26674 26674 E RxTag : onNext thread main
08-24 14:54:42.059 26674 26674 E RxTag : onNext 1
08-24 14:54:42.059 26674 26674 E RxTag : onNext thread main
08-24 14:54:42.059 26674 26674 E RxTag : onNext 2
我們調(diào)用了兩次 subscribeOn()分別是 io()和newThread(),但是輸出結(jié)果就只有在RxCachedThreadScheduler -6
線程中埠啃。但是每次調(diào)用doOnNext()
都切換了一個(gè)線程,也就是說可以隨時(shí)隨地切換事件的處理線程伟恶。
總結(jié)
線程的調(diào)度就到這結(jié)束了碴开,把握好subscribeOn
observableOn
以及scheduler
的切換,就能隨心所欲的進(jìn)行切換線程切換啦博秫。