RxJava 2.0(二)線程調(diào)度Scheduler和操作符

RxJava中的線程

默認(rèn)的情況下派继,Observable 和 Observer是處在同一線程的赠潦,發(fā)送事件在哪個(gè)線程,處理事件同樣也在該線程脆侮。
在Activity的onCreate方法中運(yùn)行以下代碼:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(1);
            }
        })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                    }
                });

可以得到以下結(jié)果:

D/RxJava: Observable Thread: main
D/RxJava: Observer Thread: main

RxJava中,使用Scheduler來進(jìn)行線程控制泌辫,從而實(shí)現(xiàn)了關(guān)鍵的異步操作随夸。

Scheduler

Scheduler可以稱之為線程調(diào)度器,它指定了發(fā)送和處理事件所在的線程震放。常用的API有以下幾個(gè):

  • Schedulers.newThread():啟用新線程并在新線程運(yùn)行
  • Schedulers.io():進(jìn)行I/O 操作所使用的Scheduler宾毒,它的內(nèi)部實(shí)現(xiàn)是一個(gè)無上限的線程池,可以重用空閑線程殿遂,比newThread更有效率诈铛,通常用于讀寫文件,數(shù)據(jù)庫墨礁,網(wǎng)絡(luò)操作等幢竹。
  • Schedulers.computation():CPU密集計(jì)算所用的Scheduler,它內(nèi)部是一個(gè)線程數(shù)等于CPU核心數(shù)的線程池饵溅。
  • AndroidSchedulers.mainThread(): Android中的主線程(UI線程)妨退。

介紹完了常用API之后,通過下面的例子來看一下是怎樣使用的:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(1);
            }
        })
                .subscribeOn(Schedulers.newThread())//指定observable線程
                .observeOn(AndroidSchedulers.mainThread())//指定Observer線程
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                    }
                });

還是上面的例子蜕企,加了兩行代碼咬荷,subscribeOn和observeOn。subscribeOn用來指定發(fā)送事件的線程轻掩,即事件產(chǎn)生的線程幸乒,observeOn指定接收并處理事件的線程,即事件消費(fèi)線程唇牧。運(yùn)行結(jié)果如下:

D/RxJava: Observable Thread: RxNewThreadScheduler-1
D/RxJava: Observer Thread: main

subscribeOn和observeOn都可以多次設(shè)置罕扎,但是subscribeOn只有第一次設(shè)置的值會(huì)生效,而observeOn不一樣丐重,觀察者會(huì)按照observeOn的指定順序依次切換到最后一個(gè)線程腔召。

操作符

操作符的作用是在事件發(fā)送的過程中完成一些特定的操作,比如對事件的包裝扮惦,添加額外的動(dòng)作等等臀蛛。常用操作符主要有以下幾種:

  • map();
    map的作用是將observable的數(shù)據(jù)進(jìn)行加工,轉(zhuǎn)換成一個(gè)新的數(shù)據(jù)之后再進(jìn)行發(fā)送崖蜜∽瞧停看一個(gè)具體的例子:
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "This is Data No. " + integer ;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String str) throws Exception {
                        Log.d("RxJava", "Received data: " + str);
                    }
                });

輸出結(jié)果如下:

D/RxJava: Received data: This is Data No. 1
D/RxJava: Received data: This is Data No. 2
D/RxJava: Received data: This is Data No. 3

  • FlatMap()

    FlatMap與map類似,但是功能更強(qiáng)大了豫领。map只是對Observable發(fā)送的數(shù)據(jù)進(jìn)行處理抡柿,返回的是處理后的數(shù)據(jù),而FlatMap在數(shù)據(jù)處理之后返回的是一個(gè)Observable對象等恐,所以洲劣,F(xiàn)latMap實(shí)際上是對原來的一系列事件進(jìn)行加工然后分拆备蚓,將每一個(gè)數(shù)據(jù)包含在一個(gè)新的Observable對象中發(fā)送給下游的觀察者。這樣做有什么好處囱稽? 舉一個(gè)簡單的例子星著,如果每一個(gè)事件都是耗時(shí)操作,那么采用FlatMap粗悯,將事件分發(fā)給不同的Observable虚循,然后加入Schedulers.io(),這樣效率瞬間提高了样傍。示例如下:

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//原始事件横缔,打印線程
                Log.d("RxJava", "Original Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(10);
                e.onNext(20);
                e.onNext(30);
            }
        })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull final Integer integer) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<String>(){

                            @Override
                            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
//打印FlatMap轉(zhuǎn)換后,發(fā)送事件的線程
                                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                                Thread.sleep(1000);
                                e.onNext("This is Data No." + integer);
                            }
//指定flatMap轉(zhuǎn)換后發(fā)送事件所處的線程
                        }).subscribeOn(Schedulers.io());
                    }
                })
//指定原始事件發(fā)送線程
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String str) throws Exception {
//打印觀察者所處的線程
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                        Log.d("RxJava", "Received data: " + str);
                    }
                });

運(yùn)行結(jié)果如下:

Original Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-2
D/RxJava: Observable Thread: RxCachedThreadScheduler-3
Observer Thread: main
D/RxJava: Received data: This is Data No.10
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.20
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.30

從結(jié)果可以看出來衫哥,最初的Observable包含3個(gè)事件茎刚,運(yùn)行在同一個(gè)子線程中,如果是耗時(shí)操作撤逢,采用同步的方式會(huì)浪費(fèi)大量事件膛锭,經(jīng)過FlatMap轉(zhuǎn)換之后,將每個(gè)事件轉(zhuǎn)換為一個(gè)新的Observable對象蚊荣,并指定線程初狰,效率一下提高了3倍!

  • concatMap
    這個(gè)操作符與FlatMap作用一樣互例,只是奢入,F(xiàn)latMap轉(zhuǎn)換的事件在發(fā)送時(shí)并不保證順序,而concatMap仍然會(huì)按原來的順序發(fā)送媳叨。

  • filter()
    filter用來對發(fā)送的數(shù)據(jù)進(jìn)行過濾

.filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        return false;
                    }
                })

返回值決定了下游觀察者是否能夠收到數(shù)據(jù)腥光,true表示能收到,false表示不能接收到糊秆。

  • take()
    傳入一個(gè)long數(shù)值武福,表示取前多少個(gè)數(shù)據(jù)。如果傳入值大于數(shù)據(jù)量痘番,會(huì)全部發(fā)送捉片。另外,它還接收時(shí)間參數(shù)夫偶,表示在多長時(shí)間內(nèi)發(fā)送的數(shù)據(jù)會(huì)被接收界睁。

  • doOnNext()

    doOnNext()允許我們在每次輸出一個(gè)元素之前做一些額外的事情觉增,比如緩存兵拢,調(diào)試,等等逾礁。

.observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "onnext2 Observer Thread: " + Thread.currentThread().getName());
                    }
                })
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末说铃,一起剝皮案震驚了整個(gè)濱河市访惜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌腻扇,老刑警劉巖债热,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異幼苛,居然都是意外死亡窒篱,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門舶沿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來墙杯,“玉大人,你說我怎么就攤上這事括荡「吒洌” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵畸冲,是天一觀的道長嫉髓。 經(jīng)常有香客問我,道長邑闲,這世上最難降的妖魔是什么算行? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮苫耸,結(jié)果婚禮上纱意,老公的妹妹穿的比我還像新娘。我一直安慰自己鲸阔,他們只是感情好偷霉,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著褐筛,像睡著了一般类少。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上渔扎,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天硫狞,我揣著相機(jī)與錄音,去河邊找鬼晃痴。 笑死残吩,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的倘核。 我是一名探鬼主播泣侮,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼紧唱!你這毒婦竟也來了活尊?” 一聲冷哼從身側(cè)響起隶校,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蛹锰,沒想到半個(gè)月后深胳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡铜犬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年舞终,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片癣猾。...
    茶點(diǎn)故事閱讀 40,040評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡权埠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出煎谍,到底是詐尸還是另有隱情攘蔽,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布呐粘,位于F島的核電站满俗,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏作岖。R本人自食惡果不足惜唆垃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望痘儡。 院中可真熱鬧辕万,春花似錦、人聲如沸沉删。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽矾瑰。三九已至砖茸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間殴穴,已是汗流浹背凉夯。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留采幌,地道東北人劲够。 一個(gè)月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像休傍,于是被迫代替她去往敵國和親征绎。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong閱讀 915評論 0 2
  • 一尊残、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性炒瘸,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    BrotherChen閱讀 1,620評論 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性寝衫,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    無求_95dd閱讀 3,088評論 0 21
  • 在正文開始之前的最后顷扩,放上GitHub鏈接和引入依賴的gradle代碼: Github: https://gith...
    蘇蘇說zz閱讀 678評論 0 2
  • 姓名:徐偉 常州新日催化劑有限公司 組別:第455期樂觀二組紀(jì)律委員 【日精進(jìn)打卡第196天】 【知~學(xué)習(xí)】 書名...
    奔波兒灞_87f6閱讀 125評論 0 0