RxJava中的線程
默認(rèn)的情況下派继,Observable 和 Observer是處在同一線程的赠潦,發(fā)送事件在哪個(gè)線程,處理事件同樣也在該線程脆侮。
在Activity的onCreate方法中運(yùn)行以下代碼:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d("RxJava", "Observable Thread: " + Thread.currentThread().getName());
e.onNext(1);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
}
});
可以得到以下結(jié)果:
D/RxJava: Observable Thread: main
D/RxJava: Observer Thread: main
RxJava中,使用Scheduler來進(jìn)行線程控制泌辫,從而實(shí)現(xiàn)了關(guān)鍵的異步操作随夸。
Scheduler
Scheduler可以稱之為線程調(diào)度器,它指定了發(fā)送和處理事件所在的線程震放。常用的API有以下幾個(gè):
- Schedulers.newThread():啟用新線程并在新線程運(yùn)行
- Schedulers.io():進(jìn)行I/O 操作所使用的Scheduler宾毒,它的內(nèi)部實(shí)現(xiàn)是一個(gè)無上限的線程池,可以重用空閑線程殿遂,比newThread更有效率诈铛,通常用于讀寫文件,數(shù)據(jù)庫墨礁,網(wǎng)絡(luò)操作等幢竹。
- Schedulers.computation():CPU密集計(jì)算所用的Scheduler,它內(nèi)部是一個(gè)線程數(shù)等于CPU核心數(shù)的線程池饵溅。
- AndroidSchedulers.mainThread(): Android中的主線程(UI線程)妨退。
介紹完了常用API之后,通過下面的例子來看一下是怎樣使用的:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d("RxJava", "Observable Thread: " + Thread.currentThread().getName());
e.onNext(1);
}
})
.subscribeOn(Schedulers.newThread())//指定observable線程
.observeOn(AndroidSchedulers.mainThread())//指定Observer線程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
}
});
還是上面的例子蜕企,加了兩行代碼咬荷,subscribeOn和observeOn。subscribeOn用來指定發(fā)送事件的線程轻掩,即事件產(chǎn)生的線程幸乒,observeOn指定接收并處理事件的線程,即事件消費(fèi)線程唇牧。運(yùn)行結(jié)果如下:
D/RxJava: Observable Thread: RxNewThreadScheduler-1
D/RxJava: Observer Thread: main
subscribeOn和observeOn都可以多次設(shè)置罕扎,但是subscribeOn只有第一次設(shè)置的值會(huì)生效,而observeOn不一樣丐重,觀察者會(huì)按照observeOn的指定順序依次切換到最后一個(gè)線程腔召。
操作符
操作符的作用是在事件發(fā)送的過程中完成一些特定的操作,比如對事件的包裝扮惦,添加額外的動(dòng)作等等臀蛛。常用操作符主要有以下幾種:
-
map();
map的作用是將observable的數(shù)據(jù)進(jìn)行加工,轉(zhuǎn)換成一個(gè)新的數(shù)據(jù)之后再進(jìn)行發(fā)送崖蜜∽瞧停看一個(gè)具體的例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is Data No. " + integer ;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String str) throws Exception {
Log.d("RxJava", "Received data: " + str);
}
});
輸出結(jié)果如下:
D/RxJava: Received data: This is Data No. 1
D/RxJava: Received data: This is Data No. 2
D/RxJava: Received data: This is Data No. 3
-
FlatMap()
FlatMap與map類似,但是功能更強(qiáng)大了豫领。map只是對Observable發(fā)送的數(shù)據(jù)進(jìn)行處理抡柿,返回的是處理后的數(shù)據(jù),而FlatMap在數(shù)據(jù)處理之后返回的是一個(gè)Observable對象等恐,所以洲劣,F(xiàn)latMap實(shí)際上是對原來的一系列事件進(jìn)行加工然后分拆备蚓,將每一個(gè)數(shù)據(jù)包含在一個(gè)新的Observable對象中發(fā)送給下游的觀察者。這樣做有什么好處囱稽? 舉一個(gè)簡單的例子星著,如果每一個(gè)事件都是耗時(shí)操作,那么采用FlatMap粗悯,將事件分發(fā)給不同的Observable虚循,然后加入Schedulers.io(),這樣效率瞬間提高了样傍。示例如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//原始事件横缔,打印線程
Log.d("RxJava", "Original Observable Thread: " + Thread.currentThread().getName());
e.onNext(10);
e.onNext(20);
e.onNext(30);
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull final Integer integer) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
//打印FlatMap轉(zhuǎn)換后,發(fā)送事件的線程
Log.d("RxJava", "Observable Thread: " + Thread.currentThread().getName());
Thread.sleep(1000);
e.onNext("This is Data No." + integer);
}
//指定flatMap轉(zhuǎn)換后發(fā)送事件所處的線程
}).subscribeOn(Schedulers.io());
}
})
//指定原始事件發(fā)送線程
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String str) throws Exception {
//打印觀察者所處的線程
Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
Log.d("RxJava", "Received data: " + str);
}
});
運(yùn)行結(jié)果如下:
Original Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-2
D/RxJava: Observable Thread: RxCachedThreadScheduler-3
Observer Thread: main
D/RxJava: Received data: This is Data No.10
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.20
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.30
從結(jié)果可以看出來衫哥,最初的Observable包含3個(gè)事件茎刚,運(yùn)行在同一個(gè)子線程中,如果是耗時(shí)操作撤逢,采用同步的方式會(huì)浪費(fèi)大量事件膛锭,經(jīng)過FlatMap轉(zhuǎn)換之后,將每個(gè)事件轉(zhuǎn)換為一個(gè)新的Observable對象蚊荣,并指定線程初狰,效率一下提高了3倍!
concatMap
這個(gè)操作符與FlatMap作用一樣互例,只是奢入,F(xiàn)latMap轉(zhuǎn)換的事件在發(fā)送時(shí)并不保證順序,而concatMap仍然會(huì)按原來的順序發(fā)送媳叨。filter()
filter用來對發(fā)送的數(shù)據(jù)進(jìn)行過濾
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return false;
}
})
返回值決定了下游觀察者是否能夠收到數(shù)據(jù)腥光,true表示能收到,false表示不能接收到糊秆。
take()
傳入一個(gè)long數(shù)值武福,表示取前多少個(gè)數(shù)據(jù)。如果傳入值大于數(shù)據(jù)量痘番,會(huì)全部發(fā)送捉片。另外,它還接收時(shí)間參數(shù)夫偶,表示在多長時(shí)間內(nèi)發(fā)送的數(shù)據(jù)會(huì)被接收界睁。-
doOnNext()
doOnNext()允許我們在每次輸出一個(gè)元素之前做一些額外的事情觉增,比如緩存兵拢,調(diào)試,等等逾礁。
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("RxJava", "onnext2 Observer Thread: " + Thread.currentThread().getName());
}
})