聊聊reactive streams的tranform操作

本文主要展示一下reactive streams的一些transform操作

mergeWith

    @Test
    public void testMerge(){
        Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1))
                .take(3)
                .map(e -> "[flux1]:"+e);

        Flux<String> mergeFlux = Flux.interval(Duration.ofSeconds(1))
                .delayElements(Duration.ofSeconds(1))
                .take(3)
                .map(e -> "[flux2]:"+e)
                .mergeWith(flux1);

        mergeFlux.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });

        mergeFlux.blockLast();
    }

輸出實(shí)例

21:18:07.583 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:18:08.618 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:0
21:18:09.619 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:1
21:18:09.645 [parallel-6] INFO com.example.demo.TransformTest - subscribe:[flux2]:0
21:18:10.619 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:2
21:18:10.649 [parallel-8] INFO com.example.demo.TransformTest - subscribe:[flux2]:1
21:18:11.654 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux2]:2

可以發(fā)現(xiàn)充尉,他們是交叉合并的。

concatWith

    @Test
    public void testConcat(){
        Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1))
                .take(3)
                .map(e -> "[flux1]:"+e);

        Flux<String> concatFlux = Flux.interval(Duration.ofSeconds(1))
                .delayElements(Duration.ofSeconds(1))
                .take(3)
                .map(e -> "[flux2]:"+e)
                .concatWith(flux1);
        concatFlux.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });
        concatFlux.blockLast();
    }

輸出

21:19:00.779 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:19:02.832 [parallel-4] INFO com.example.demo.TransformTest - subscribe:[flux2]:0
21:19:03.836 [parallel-6] INFO com.example.demo.TransformTest - subscribe:[flux2]:1
21:19:04.840 [parallel-8] INFO com.example.demo.TransformTest - subscribe:[flux2]:2
21:19:05.845 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:0
21:19:06.845 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:1
21:19:07.844 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:2

可以發(fā)現(xiàn)concatWith只是連接兩個(gè)flux的數(shù)據(jù)憔鬼,并不是按emit的順序交叉來

zipWith

    @Test
    public void testZip(){
        List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b");
        List<String> secondList = Lists.newArrayList("1","2","3","4","5");
        Flux<Tuple2<String,String>> zipFlux =  Flux.fromIterable(firstList)
                .zipWith(Flux.fromIterable(secondList));
        zipFlux.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });
    }

輸出如下

21:20:59.506 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:20:59.516 [main] INFO com.example.demo.TransformTest - subscribe:[a,1]
21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[b,2]
21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[c,3]
21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[d,4]
21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[e,5]

可以發(fā)現(xiàn)flux1相比flux2多余的數(shù)據(jù)沒有被zip

flatMap

    @Test
    public void testFlatMap(){
        List<String> secondList = Lists.newArrayList("1","2","3","4","5");
        Flux<String> flatMapFlux = Flux.fromIterable(secondList)
                .flatMap((str) ->{
                    return Mono.just(str).repeat(2).map(String::toUpperCase).delayElements(Duration.ofMillis(1));
                });
        flatMapFlux.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });
        flatMapFlux.blockLast();

        Flux<String> mapFlux = Flux.fromIterable(secondList)
                .repeat(2)
                .map(String::toUpperCase);
        mapFlux.subscribe(e -> {
                    LOGGER.info("map subscribe:{}",e);
                });
        mapFlux.blockLast();
    }

輸出

21:33:46.904 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:33:46.958 [parallel-1] INFO com.example.demo.TransformTest - subscribe:1
21:33:46.959 [parallel-1] INFO com.example.demo.TransformTest - subscribe:2
21:33:46.959 [parallel-1] INFO com.example.demo.TransformTest - subscribe:3
21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:4
21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:5
21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:2
21:33:46.960 [parallel-7] INFO com.example.demo.TransformTest - subscribe:3
21:33:46.960 [parallel-8] INFO com.example.demo.TransformTest - subscribe:4
21:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:5
21:33:46.961 [parallel-6] INFO com.example.demo.TransformTest - subscribe:1
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:1
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:2
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:3
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:4
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:5
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:1
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:2
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:3
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:4
21:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:5

flatMap是異步的

reduce

    @Test
    public void testReduce(){
        List<String> secondList = Lists.newArrayList("1","2","3","4","5");
        Mono<Integer> reduceMono = Flux.fromIterable(secondList)
                .flatMap(e -> Mono.just(e).map(item -> Integer.valueOf(item)))
                .reduce((total, e) -> total + e);
        reduceMono.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });
    }

輸出

21:36:29.978 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:36:30.014 [main] INFO com.example.demo.TransformTest - subscribe:15

groupBy

    @Test
    public void testGroup(){
        List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b");
        Flux<GroupedFlux<String, String>> groupFlux = Flux.fromIterable(firstList)
                .map(String::toUpperCase)
                .groupBy(key -> key);
        groupFlux.subscribe(e -> {
            LOGGER.info("subscribe:{}",e.collectList().subscribe(item -> {
                LOGGER.info("item:{}",item);
            }));
        });
    }

輸出

21:37:00.912 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:37:00.949 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@5faeada1
21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@1563da5
21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@2bbf4b8b
21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@30a3107a
21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@33c7e1bb
21:37:00.951 [main] INFO com.example.demo.TransformTest - item:[A, A]
21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[B, B]
21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[C]
21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[D]
21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[E]

first

    @Test
    public void testFirst(){
        List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b");
        List<String> secondList = Lists.newArrayList("1","2","3","4","5");
        Flux<String> firstFlux = Flux.fromIterable(firstList)
                .delayElements(Duration.ofMillis(200));
        Flux<String> secondFlux = Flux.fromIterable(secondList)
                .take(2);

        Flux<String> result = Flux.first(firstFlux, secondFlux);
        result.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
        });
    }

toIterable

    @Test
    public void testToIterable(){
        List<String> firstList = Lists.newArrayList("a","b","c","d","e","a","b");
        Iterable<String> itr = Flux.fromIterable(firstList)
                .map(String::toUpperCase)
                .toIterable();
        itr.forEach(e -> LOGGER.info(e));
    }

輸出

21:39:35.031 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:39:35.045 [main] INFO com.example.demo.TransformTest - A
21:39:35.045 [main] INFO com.example.demo.TransformTest - B
21:39:35.045 [main] INFO com.example.demo.TransformTest - C
21:39:35.045 [main] INFO com.example.demo.TransformTest - D
21:39:35.045 [main] INFO com.example.demo.TransformTest - E
21:39:35.045 [main] INFO com.example.demo.TransformTest - A
21:39:35.045 [main] INFO com.example.demo.TransformTest - B

小結(jié)

reactive streams的操作相當(dāng)于在jdk的streams的基礎(chǔ)上實(shí)現(xiàn)了reactive化,可以參照著了解。

doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末胯舷,一起剝皮案震驚了整個(gè)濱河市完慧,隨后出現(xiàn)的幾起案子谋旦,更是在濱河造成了極大的恐慌,老刑警劉巖屈尼,帶你破解...
    沈念sama閱讀 211,561評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件册着,死亡現(xiàn)場離奇詭異,居然都是意外死亡脾歧,警方通過查閱死者的電腦和手機(jī)甲捏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鞭执,“玉大人司顿,你說我怎么就攤上這事〔隙” “怎么了免猾?”我有些...
    開封第一講書人閱讀 157,162評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長囤热。 經(jīng)常有香客問我猎提,道長,這世上最難降的妖魔是什么旁蔼? 我笑而不...
    開封第一講書人閱讀 56,470評(píng)論 1 283
  • 正文 為了忘掉前任锨苏,我火速辦了婚禮,結(jié)果婚禮上棺聊,老公的妹妹穿的比我還像新娘伞租。我一直安慰自己,他們只是感情好限佩,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,550評(píng)論 6 385
  • 文/花漫 我一把揭開白布葵诈。 她就那樣靜靜地躺著,像睡著了一般祟同。 火紅的嫁衣襯著肌膚如雪作喘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,806評(píng)論 1 290
  • 那天晕城,我揣著相機(jī)與錄音泞坦,去河邊找鬼。 笑死砖顷,一個(gè)胖子當(dāng)著我的面吹牛贰锁,可吹牛的內(nèi)容都是我干的赃梧。 我是一名探鬼主播,決...
    沈念sama閱讀 38,951評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼豌熄,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼授嘀!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起房轿,我...
    開封第一講書人閱讀 37,712評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤粤攒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后囱持,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體夯接,經(jīng)...
    沈念sama閱讀 44,166評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,510評(píng)論 2 327
  • 正文 我和宋清朗相戀三年纷妆,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了盔几。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,643評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡掩幢,死狀恐怖逊拍,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情际邻,我是刑警寧澤芯丧,帶...
    沈念sama閱讀 34,306評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站世曾,受9級(jí)特大地震影響缨恒,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜轮听,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,930評(píng)論 3 313
  • 文/蒙蒙 一骗露、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧血巍,春花似錦萧锉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,745評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至鲫凶,卻和暖如春优俘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背掀序。 一陣腳步聲響...
    開封第一講書人閱讀 31,983評(píng)論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留惭婿,地道東北人不恭。 一個(gè)月前我還...
    沈念sama閱讀 46,351評(píng)論 2 360
  • 正文 我出身青樓叶雹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親换吧。 傳聞我的和親對(duì)象是個(gè)殘疾皇子折晦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,509評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容