rxjava 如何和傳統(tǒng)回調(diào)函數(shù)結(jié)合

今天看到一個(gè) Observable.fromEmitter 的函數(shù)析苫,這里是這個(gè)函數(shù)的 javadoc

Provides an API (via a cold Observable) that bridges the reactive world with the callback-style, generally non-backpressured world.
Example:
You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The rest of its methods are thread-safe.

Observable.<Event>fromEmitter(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event e) {
emitter.onNext(e);
if (e.isLast()) {
emitter.onCompleted();
}
}

@Override
public void onFailure(Exception e) {
emitter.onError(e);
}
};

AutoCloseable c = api.someMethod(listener);

emitter.setCancellation(c::close);

}, BackpressureMode.BUFFER);

這是一個(gè)實(shí)驗(yàn)性的功能,用于和傳統(tǒng)回調(diào)模式的程序?qū)臃看摇榱死斫膺@個(gè)機(jī)制赶舆,我寫了一個(gè)完整的例子龄坪。

package org.wcy123.rxjava1;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

import org.junit.Test;

import lombok.extern.slf4j.Slf4j;
import rx.AsyncEmitter;
import rx.Observable;

@Slf4j
public class FromEmitterTest {

    @Test
    public void main1() throws Exception {
        final ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch latch = new CountDownLatch(3 * 4);
        Observable.fromEmitter(
                 emitter -> IntStream.range(0, 3).boxed().forEach(
                        threadIndex -> service.submit(
                                () -> {
                                    for (int i = 0; i < 4; ++i) {
                                        emitter.onNext("thread + " + threadIndex
                                                + " i = " + i);
                                        Utils.sleep(1000);
                                        latch.countDown();
                                    }
                                    if (threadIndex == 2) {
                                        emitter.onCompleted();
                                    }
                                })),
                AsyncEmitter.BackpressureMode.BUFFER)
                .subscribe(s -> log.info("item {}", s));
        log.info("提前打印這里, subscribe 沒有阻塞住");
        log.info("開始等待解鎖");
        latch.await();
        log.info("解鎖完畢");
    }
}

這個(gè)例子的執(zhí)行結(jié)果是

02:12:24.244 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 提前打印這里, subscribe 沒有阻塞住
02:12:24.244 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 0
02:12:24.250 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 開始等待解鎖
02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 0
02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 0
02:12:25.245 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 1
02:12:25.255 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 1
02:12:26.248 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 2
02:12:26.249 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 2
02:12:26.257 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 2
02:12:27.252 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 3
02:12:27.258 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 3
02:12:28.264 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 解鎖完畢

注意到 log.info("item") 是運(yùn)行在三個(gè)不同的線程中叙身。fromEmitter 的第一個(gè)參數(shù)是一個(gè)函數(shù)封拧,即 f志鹃, 該函數(shù)的第一個(gè)參數(shù)是 emitter ,類型是 AsyncEmitter泽西。fromEmitter 返回一個(gè) Observable , 這個(gè)Obverable 被訂閱的時(shí)候弄跌,就會(huì)運(yùn)行函數(shù) f 。f 運(yùn)行時(shí)尝苇,創(chuàng)建了 3 個(gè)線程,每個(gè)線程里面埠胖,都會(huì)調(diào)用 emitter 來發(fā)布數(shù)據(jù)糠溜,emitter.onNext(...) ,一旦調(diào)用這個(gè)函數(shù)直撤,會(huì)觸發(fā)后面所有的 Observable 定義的行為非竿,觸發(fā) s->log.info("iterm{}", s) 的執(zhí)行。

注意到 thread 0 并沒有機(jī)會(huì)打印出來最后一個(gè) i = 3谋竖, 因?yàn)?thread 2 提前調(diào)用了 emitter.onComplet()

latch.await() 等待所有線程結(jié)束红柱。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末承匣,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子锤悄,更是在濱河造成了極大的恐慌韧骗,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件零聚,死亡現(xiàn)場離奇詭異袍暴,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)隶症,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門政模,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蚂会,你說我怎么就攤上這事淋样。” “怎么了胁住?”我有些...
    開封第一講書人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵趁猴,是天一觀的道長。 經(jīng)常有香客問我措嵌,道長躲叼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任企巢,我火速辦了婚禮枫慷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘浪规。我一直安慰自己或听,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開白布笋婿。 她就那樣靜靜地躺著誉裆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪缸濒。 梳的紋絲不亂的頭發(fā)上足丢,一...
    開封第一講書人閱讀 51,754評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音庇配,去河邊找鬼斩跌。 笑死,一個(gè)胖子當(dāng)著我的面吹牛捞慌,可吹牛的內(nèi)容都是我干的耀鸦。 我是一名探鬼主播,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼啸澡,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼袖订!你這毒婦竟也來了氮帐?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤洛姑,失蹤者是張志新(化名)和其女友劉穎上沐,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吏口,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡奄容,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了产徊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片昂勒。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖舟铜,靈堂內(nèi)的尸體忽然破棺而出戈盈,到底是詐尸還是另有隱情,我是刑警寧澤谆刨,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布塘娶,位于F島的核電站,受9級(jí)特大地震影響痊夭,放射性物質(zhì)發(fā)生泄漏刁岸。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一她我、第九天 我趴在偏房一處隱蔽的房頂上張望虹曙。 院中可真熱鬧,春花似錦番舆、人聲如沸酝碳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽疏哗。三九已至,卻和暖如春禾怠,著一層夾襖步出監(jiān)牢的瞬間返奉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來泰國打工吗氏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留衡瓶,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓牲证,卻偏偏與公主長得像,于是被迫代替她去往敵國和親关面。 傳聞我的和親對(duì)象是個(gè)殘疾皇子坦袍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容