Rxjava

RxJava 簡(jiǎn)介

RxJava 是 ReactiveX 在 Java 上的開源的實(shí)現(xiàn)埋哟。Observable(被觀察者) 和 Subscriber(訂閱者)是兩個(gè)主要的類。在 RxJava 上嗜暴,一個(gè) Observable 是一個(gè)發(fā)出數(shù)據(jù)流或者事件的類械姻,Subscriber 是一個(gè)對(duì)這些發(fā)出的 items (數(shù)據(jù)流或者事件)進(jìn)行處理(采取行動(dòng))的類斑响。一個(gè) Observable 的標(biāo)準(zhǔn)流發(fā)出一個(gè)或多個(gè) item枉侧,然后成功完成或者出錯(cuò)。一個(gè) Observable 可以有多個(gè) Subscribers肛宋,并且通過 Observable 發(fā)出的每一個(gè) item州藕,該 item 將會(huì)被發(fā)送到 Subscriber.onNext() 方法來進(jìn)行處理。一旦 Observable 不再發(fā)出 items酝陈,它將會(huì)調(diào)用 Subscriber.onCompleted() 方法床玻,或如果有一個(gè)出錯(cuò)的話 Observable 會(huì)調(diào)用 Subscriber.onError() 方法。

對(duì)于任何 Observable 你可以定義在兩個(gè)不同的線程后添,Observable 會(huì)操作在它上面笨枯。使用 Observable.observeOn() 定義"觀察者執(zhí)行觀察的"線程,用來監(jiān)聽和檢查從 Observable 最新發(fā)出的 items (Subscriber 的 onNext遇西,onCompleted 和 onError 方法會(huì)執(zhí)行在 observeOn 所指定的線程上)馅精,并使用 Observable.subscribeOn() 來定義"訂閱的線程",將其運(yùn)行我們 Observable 的代碼(長(zhǎng)時(shí)間運(yùn)行的操作)粱檀。

observeOn 與作用域

observeOn是對(duì)下游生效的洲敢,一個(gè)簡(jiǎn)單的例子:

Flowable.just(1).observeOn(Schedulers.io())
        .subscribe(i -> {
            System.out.println(Thread.currentThread().getName());
        });

輸出:
RxCachedThreadScheduler-1

但是當(dāng)有多個(gè)操作符,且存在多次observeOn時(shí)茄蚯,每個(gè)方法都是執(zhí)行在什么線程呢压彭?

Flowable.just(1).observeOn(Schedulers.io())
        .map(i -> {
            System.out.println(Thread.currentThread().getName());
            return i;
        })
        .observeOn(Schedulers.computation())
        .subscribe(i -> {
            System.out.println(Thread.currentThread().getName());
        });

輸出:
RxCachedThreadScheduler-1
RxComputationThreadPool-1

這里就涉及到一些 RxJava 實(shí)現(xiàn)的細(xì)節(jié),多數(shù)操作符是基于上游調(diào)用onNext / onComplete / onError 的進(jìn)一步封裝渗常,在不涉及包含Scheduler的操作符的情況下壮不,在上游調(diào)用了observeOn后,后續(xù)操作符的方法都是執(zhí)行在上游observeOn所調(diào)度的線程皱碘。因此每個(gè)操作符所執(zhí)行的線程都是由上游最近的一個(gè)observeOn的Scheduler決定询一。

因此筆者稱之為最近生效原則,但是請(qǐng)注意癌椿,observeOn是影響下游的健蕊,因此操作符所執(zhí)行的線程受的是最近上游的observeOn影響。

示例

因此在實(shí)際使用中靈活的使用observeOn踢俄,使得代碼的效率最大化缩功。這里筆者再舉個(gè)例子:

Flowable.just(new File("input.txt"))
        .map(f -> new BufferedReader(new InputStreamReader(new FileInputStream(f))))
        .observeOn(Schedulers.io())
        .flatMap(r -> Flowable.<String, BufferedReader>generate(() -> r, (br, e) -> {
            String s = br.readLine();
            if (s != null) {
                e.onNext(s);
            } else {
                System.out.println(Thread.currentThread().getName());
                e.onComplete();
            }
        }, BufferedReader::close))
        .observeOn(Schedulers.computation())
        .map(Integer::parseInt)
        .reduce(0, (total, item) -> {
            System.out.println(item);
            return total + item;
        })
        .subscribe(s -> {
            System.out.println("total: " + s);
            System.out.println(Thread.currentThread().getName());
        });

輸出:
RxCachedThreadScheduler-1
1
2
3
4
5
total: 15
RxComputationThreadPool-1

如上代碼所示,我們從 input.txt 讀出每行的字符串都办,然后轉(zhuǎn)成一個(gè) int, 最后求和嫡锌。這里我們靈活地使用了兩次observeOn虑稼,在讀文件時(shí),調(diào)度至IoScheduler势木,隨后做計(jì)算工作時(shí)調(diào)度至ComputationScheduler动雹,從控制臺(tái)的輸出可以見線程的的確確是我們所期望的。當(dāng)然這里求和只是一個(gè)示例跟压,讀者們可以舉一反三。

事實(shí)上上面的代碼還不是最優(yōu)的:

Flowable.just(new File("input.txt"))
        .map(f -> new BufferedReader(new InputStreamReader(new FileInputStream(f))))
        .observeOn(Schedulers.io())
        .flatMap(r -> Flowable.<String, BufferedReader>generate(() -> r, (br, e) -> {
            String s = br.readLine();
            if (s != null) {
                e.onNext(s);
            } else {
                System.out.println(Thread.currentThread().getName());
                e.onComplete();
            }
        }, BufferedReader::close))
        .parallel()
        .runOn(Schedulers.computation())
        .map(Integer::parseInt)
        .reduce((i, j) -> {
            System.out.println(Thread.currentThread().getName());
            return i + j;
        })
        .subscribe(s -> {
            System.out.println("total: " + s);
            System.out.println(Thread.currentThread().getName());
        });
輸出:
RxCachedThreadScheduler-1
RxComputationThreadPool-1
RxComputationThreadPool-2
RxComputationThreadPool-4
RxComputationThreadPool-4
total: 15
RxComputationThreadPool-4

如上代碼所示我們可以充分利用多核的性能歼培,通過parallel來并行運(yùn)算震蒋,當(dāng)然這里用在求和就有點(diǎn)殺雞用牛刀的意思了,這里只是一個(gè)舉例躲庄。更多 parallel 相關(guān)的內(nèi)容查剖,留待后續(xù)分享。

subscribeOn與作用域

事實(shí)上subscribeOn同樣遵循最近生效原則噪窘,但是與observeOn恰恰相反笋庄。操作符會(huì)被最近的下游的subscribeOn調(diào)度,因?yàn)閟ubscribeOn影響的是上游倔监。

但是和observeOn又有一些微妙的差別在于直砂,我們通常調(diào)用subscribeOn更加關(guān)注最上游的數(shù)據(jù)源的線程。因此通常不會(huì)在中間過程中調(diào)用多次浩习,任意的調(diào)用一次subscribeOn均會(huì)影響上游所有操作符的subscribe所在的線程静暂,且不受observeOn的影響。這是由于這兩者機(jī)制的不同谱秽,subscribeOn是將整個(gè)上游的subscribe方法都調(diào)度到目標(biāo)線程了洽蛀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市疟赊,隨后出現(xiàn)的幾起案子郊供,更是在濱河造成了極大的恐慌,老刑警劉巖近哟,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件驮审,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡椅挣,警方通過查閱死者的電腦和手機(jī)头岔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鼠证,“玉大人峡竣,你說我怎么就攤上這事×烤牛” “怎么了适掰?”我有些...
    開封第一講書人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵颂碧,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我类浪,道長(zhǎng)载城,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任费就,我火速辦了婚禮诉瓦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘力细。我一直安慰自己睬澡,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開白布眠蚂。 她就那樣靜靜地躺著煞聪,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逝慧。 梳的紋絲不亂的頭發(fā)上昔脯,一...
    開封第一講書人閱讀 49,144評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音笛臣,去河邊找鬼云稚。 笑死,一個(gè)胖子當(dāng)著我的面吹牛捐祠,可吹牛的內(nèi)容都是我干的碱鳞。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼踱蛀,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼窿给!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起率拒,我...
    開封第一講書人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤崩泡,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后猬膨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體角撞,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年勃痴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了谒所。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡沛申,死狀恐怖劣领,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情铁材,我是刑警寧澤尖淘,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布奕锌,位于F島的核電站,受9級(jí)特大地震影響村生,放射性物質(zhì)發(fā)生泄漏惊暴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一趁桃、第九天 我趴在偏房一處隱蔽的房頂上張望辽话。 院中可真熱鬧,春花似錦卫病、人聲如沸屡穗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至烂斋,卻和暖如春屹逛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背汛骂。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工罕模, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人帘瞭。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓淑掌,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親蝶念。 傳聞我的和親對(duì)象是個(gè)殘疾皇子抛腕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 什么是 ReactiveX? ???ReactiveX 是一個(gè)專注于異步編程與控制可觀察數(shù)據(jù)(或者事件)流的API...
    huangandroid閱讀 557評(píng)論 0 2
  • 原文地址:RxJava 入門 翻譯原文:Getting Started with RxJava and Andro...
    MrFu閱讀 11,742評(píng)論 6 48
  • 編譯地址:Getting Started with RxJava and Android譯者:MrFu原文地址:R...
    IT程序獅閱讀 483評(píng)論 0 6
  • 注:此文轉(zhuǎn)載自:作者:JYcoder的文章:http://www.reibang.com/u/2ebe42698...
    飛魚先生閱讀 481評(píng)論 0 1
  • 1.簡(jiǎn)化輸出Hello world 上篇中雖然打印出了Hello world媒殉,但代碼有點(diǎn)多担敌,就簡(jiǎn)化一下形式。修改r...
    英勇青銅5閱讀 1,204評(píng)論 11 8