RaJava2修煉之路——易容術(shù)
在上一篇講解了RxJava的原理之后唁奢,本篇繼續(xù)深入了解一下RxJava對(duì)執(zhí)行任務(wù)的線程調(diào)度抱慌。
目的
在每個(gè)剛學(xué)Android的小白來講推沸,必須牢記的的一點(diǎn)就是子線程不能跟新UI,所有跟新UI的操作必須放在主線程(也就是UI線程)蔚携,問什么呢房维?因?yàn)闀?huì)造成界面卡頓或者ANR啊傻粘,如果你想問什么是ANR懊拷选?那你就先去查一下資料充點(diǎn)電再來看本篇文章弦悉,如果任何跟新UI的操作在子線程中程序就會(huì)奔潰窒典。下面是我寫的一個(gè)案例,利用Handler來跟新UI:
handler = new Handler(){
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
Log.d(TAG, "handleMessage: "+Thread.currentThread().getName());
if(msg.what == 0x01){
Bitmap bmp = (Bitmap) msg.obj;
loadPic(bmp);
}
}
};
/**
* 跟新UI
*
* @param bmp
*/
private void loadPic(Bitmap bmp) {
img.setImageBitmap(bmp);
}
@Override
protected void onResume() {
super.onResume();
new Thread() {//開啟圖片下載線程
@Override
public void run() {
Bitmap bmp = loadImage();
Message msg = handler.obtainMessage();
msg.obj = bmp;
msg.what = 0x01;
handler.sendMessage(msg);
}
}.start();
}
/**
* 圖片下載
*
* @return
*/
private Bitmap loadImage() {
Bitmap bmp = null;
try {
URL url = new URL(mUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout(8 * 1000);
conn.setReadTimeout(8 * 1000);
InputStream is = conn.getInputStream();
bmp = BitmapFactory.decodeStream(is);
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return bmp;
}
@Override
protected void onDestroy() {
super.onDestroy();
handler.removeCallbacksAndMessages(null);// 移除所有handler的消息回調(diào)
}
是一個(gè)很簡(jiǎn)單的網(wǎng)絡(luò)加載圖片,然后顯示在UI界面稽莉,在OnResume中開啟線程執(zhí)行加載圖片的耗時(shí)操作瀑志,圖片加載完成后將圖片填充到message中傳遞到HandlerMessage中,執(zhí)行UI跟新,Handler是Android中一塊很重要的知識(shí)點(diǎn)劈猪,不太清楚的小伙伴去查資料補(bǔ)一下吧昧甘。
實(shí)現(xiàn)
我們首先看你一下RxJava中事件發(fā)送和接受分別在那個(gè)線程:
Observable.create(new ObservableOnSubscribe<String >() {
@Override
public void subscribe(ObservableEmitter<String > e) throws Exception {
Log.d(TAG, "accept: "+Thread.currentThread().getName());
e.onNext("hello world");
e.onComplete();
}
}).subscribe(new Consumer<String >() {
@Override
public void accept(String str) throws Exception {
Log.d(TAG, "accept: "+Thread.currentThread().getName());
}
});
}
打印結(jié)果:
09-02 10:34:10.065 22364-22364/ruanrong.com.rxjava2demo D/tag: subscribe: main
09-02 10:34:10.067 22364-22364/ruanrong.com.rxjava2demo D/tag: accept: main
很明顯,發(fā)送和接受事件默認(rèn)都是在Main战得,也就是UI線程充边,那如何讓發(fā)送事件在子線程,跟新事件在UI線程呢常侦?這里引入一個(gè)新的概念浇冰,RxJava的線程調(diào)度器Scheduler,Schedulers內(nèi)部包含六中調(diào)度類型
- Schedulers.computation(?)
用于計(jì)算任務(wù),如事件循環(huán)或和回調(diào)處理聋亡,不要用于IO操作(IO操作請(qǐng)使用Schedulers.io())肘习;默認(rèn)線程數(shù)等于處理器的數(shù)量 - Schedulers.from(executor)
使用指定的Executor作為調(diào)度器 - Schedulers.immediate(?)
在當(dāng)前線程立即開始執(zhí)行任務(wù) - Schedulers.io(?)
用于IO密集型任務(wù),如異步阻塞IO操作坡倔,這個(gè)調(diào)度器的線程池會(huì)根據(jù)需要增長(zhǎng)漂佩;對(duì)于普通的計(jì)算任務(wù),請(qǐng)使用Schedulers.computation()罪塔;Schedulers.io(?)默認(rèn)是一個(gè)CachedThreadScheduler仅仆,很像一個(gè)有線程緩存的新線程調(diào)度器 - Schedulers.newThread(?)
為每個(gè)任務(wù)創(chuàng)建一個(gè)新線程 - Schedulers.trampoline(?)
當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行
另外垢袱,還有一個(gè)特殊的調(diào)度類型:
AndroidSchedulers.mainThread() 該調(diào)度類型是唯一的確定發(fā)生在UI線程的
上面就是線程調(diào)度器的六中類型,都是Schedules類的靜態(tài)常量港柜。
接下來就是變身的時(shí)候了:
Observable.create(new ObservableOnSubscribe<String >() {
@Override
public void subscribe(ObservableEmitter<String > e) throws Exception {
Log.d(TAG, "subscribe: "+Thread.currentThread().getName());
e.onNext("hello world");
e.onComplete();
}
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String >() {
@Override
public void accept(String str) throws Exception {
Log.d(TAG, "accept: "+Thread.currentThread().getName());
}
});
}
打印結(jié)果:
09-02 10:59:12.717 12326-12357/ruanrong.com.rxjava2demo D/tag: subscribe: RxNewThreadScheduler-1
09-02 10:59:12.729 12326-12326/ruanrong.com.rxjava2demo D/tag: accept: main
果然请契,完成了事件發(fā)送在子線程,跟新在主線程的操作夏醉,下面將圖片的下載更新放到RxJava中來執(zhí)行:
Observable.create(new ObservableOnSubscribe<Bitmap>() {
@Override
public void subscribe(ObservableEmitter<Bitmap> e) throws Exception {
Log.d(TAG, "accept: "+Thread.currentThread().getName());
Bitmap bmp = loadImage();
e.onNext(bmp);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
Log.d(TAG, "accept: "+Thread.currentThread().getName());
img.setImageBitmap(bitmap);
}
});
好了爽锥,圖片正常顯示,既然這樣畔柔,如果在onSubscribe和onObserver中多次指定線程呢氯夷?比如下面這樣:
Observable.create(new ObservableOnSubscribe<String >() {
@Override
public void subscribe(ObservableEmitter<String > e) throws Exception {
Log.d(TAG, "accept: "+Thread.currentThread().getName());
e.onNext("hello world");
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<String >() {
@Override
public void accept(String str) throws Exception {
Log.d(TAG, "accept: "+Thread.currentThread().getName());
}
});
}
打印結(jié)果:
09-02 11:09:51.315 21666-21686/? D/tag: subscribe: RxNewThreadScheduler-1
09-02 11:09:51.329 21666-21688/? D/tag: accept: RxNewThreadScheduler-2
可以說明設(shè)置訂閱事件選擇離它最近的線程調(diào)度方式,而觀察者選擇最后的線程調(diào)度方式靶擦。
既然完成了訂閱在子線程觀察在主線程腮考,那么對(duì)于這種在處理耗時(shí)任務(wù)過程中,Activity銷毀呢玄捕?這里就用到上一篇中的Disposiable對(duì)象了踩蔚,當(dāng)Activity銷毀時(shí)終止事件發(fā)送。
Observable.create(new ObservableOnSubscribe<Bitmap>() {
@Override
public void subscribe(ObservableEmitter<Bitmap> e) throws Exception {
Log.d(TAG, "accept: "+Thread.currentThread().getName());
Bitmap bmp = loadImage();
e.onNext(bmp);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(Bitmap value) {
img.setImageBitmap(value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
@Override
protected void onDestroy() {
super.onDestroy();
handler.removeCallbacksAndMessages(null);
if(!mDisposable.isDisposed()){
mDisposable.dispose();
}
}
那如果有多個(gè)Disposable呢枚粘?這里有一個(gè)專門裝Disposable的容器類CompositeDisposable馅闽,里面維護(hù)了一個(gè)`OpenHashSet<Disposable> resources;集合,所以只需要在訂閱時(shí)將Disposable添加到CompositeDisposable中,在Activity銷毀時(shí)將該容器Clear掉就行了福也。
好了局骤,RxJava的事件調(diào)度就是這樣來改變事件執(zhí)行的線程。