RxJava 并行操作

上一篇文章RxJava 線程模型分析詳細(xì)介紹了RxJava的線程模型讹蘑,被觀察者(Observable巫橄、Flowable...)發(fā)射的數(shù)據(jù)流可以經(jīng)歷各種線程切換,但是數(shù)據(jù)流的各個(gè)元素之間不會(huì)產(chǎn)生并行執(zhí)行的效果纺棺。我們知道并行并不是并發(fā)叨恨,不是同步,更不是異步搭盾。

Java 8新增了并行流來(lái)實(shí)現(xiàn)并行的效果咳秉,只需要在集合上調(diào)用parallelStream()即可。

        List<Integer> result = new ArrayList();
        for(Integer i=1;i<=100;i++) {

            result.add(i);
        }

        result.parallelStream()
                .map(new java.util.function.Function<Integer, String>() {


            @Override
            public String apply(Integer integer) {
                return integer.toString();
            }
        }).forEach(new java.util.function.Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        });

如果要達(dá)到類似于 Java8 的 parallel 執(zhí)行效果鸯隅,可以借助 flatMap 操作符來(lái)實(shí)現(xiàn)并行的效果澜建。

        Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(Schedulers.computation())
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {

                        System.out.println(str);
                    }
                });

flatMap操作符的原理是將這個(gè)Observable轉(zhuǎn)化為多個(gè)以原Observable發(fā)射的數(shù)據(jù)作為源數(shù)據(jù)的Observable,然后再將這多個(gè)Observable發(fā)射的數(shù)據(jù)整合發(fā)射出來(lái)蝌以,需要注意的是最后的順序可能會(huì)交錯(cuò)地發(fā)射出來(lái)炕舵。

flatMap.png

flatMap會(huì)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作。在這里跟畅,生成的每個(gè)Observable可以使用線程池(指定了computation作為Scheduler)并發(fā)的執(zhí)行咽筋。

當(dāng)然我們還可以使用ExecutorService來(lái)創(chuàng)建一個(gè)Scheduler。

        int threadNum = Runtime.getRuntime().availableProcessors()+1;

        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);
        Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(scheduler)
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {

                        System.out.println(str);
                    }
                });

需要補(bǔ)充的是: 當(dāng)完成所有的操作之后徊件,ExecutorService需要執(zhí)行shutdown()來(lái)關(guān)閉 ExecutorService奸攻。在這里,可以使用doFinally操作符來(lái)執(zhí)行shutdown()虱痕。

doFinally操作符可以在onError或者onComplete之后調(diào)用指定的操作睹耐,或由下游處理。

增加了doFinally操作符之后部翘,代碼是這樣的硝训。

        int threadNum = Runtime.getRuntime().availableProcessors()+1;

        final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);
        Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(scheduler)
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        executor.shutdown();
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {

                        System.out.println(str);
                    }
                });

Round-Robin 算法實(shí)現(xiàn)并行

Round-Robin算法是最簡(jiǎn)單的一種負(fù)載均衡算法。它的原理是把來(lái)自用戶的請(qǐng)求輪流分配給內(nèi)部的服務(wù)器:從服務(wù)器1開始略就,直到服務(wù)器N捎迫,然后重新開始循環(huán)。也被稱為哈希取模法表牢,在實(shí)際中是非常常用的數(shù)據(jù)分片方法窄绒。Round-Robin算法的優(yōu)點(diǎn)是其簡(jiǎn)潔性,它無(wú)需記錄當(dāng)前所有連接的狀態(tài)崔兴,所以它是一種無(wú)狀態(tài)調(diào)度彰导。

通過(guò) Round-Robin 算法把數(shù)據(jù)分組, 按線程數(shù)分組蛔翅,分成5組每組個(gè)數(shù)相同,一起發(fā)送處理位谋。這樣做的目的可以減少Observable的創(chuàng)建節(jié)省系統(tǒng)資源山析,但是會(huì)增加處理時(shí)間,Round-Robin 算法可以看成是對(duì)時(shí)間和空間的綜合考慮掏父。

        final AtomicInteger batch = new AtomicInteger(0);

        Observable.range(1,100)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return batch.getAndIncrement() % 5;
                    }
                })
                .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                        return integerIntegerGroupedObservable.observeOn(Schedulers.io())
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(@NonNull Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        System.out.println(o);
                    }
                });

在這里笋轨,也可以使用ExecutorService創(chuàng)建Scheduler,來(lái)替代Schedulers.io()

        final AtomicInteger batch = new AtomicInteger(0);

        int threadNum = 5;

        final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);

        Observable.range(1,100)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return batch.getAndIncrement() % threadNum;
                    }
                })
                .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                        return integerIntegerGroupedObservable.observeOn(scheduler)
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(@NonNull Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        System.out.println(o);
                    }
                });
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末赊淑,一起剝皮案震驚了整個(gè)濱河市爵政,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌陶缺,老刑警劉巖钾挟,帶你破解...
    沈念sama閱讀 217,084評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異饱岸,居然都是意外死亡掺出,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門苫费,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)汤锨,“玉大人,你說(shuō)我怎么就攤上這事黍衙∧喑” “怎么了?”我有些...
    開封第一講書人閱讀 163,450評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵琅翻,是天一觀的道長(zhǎng)位仁。 經(jīng)常有香客問(wèn)我,道長(zhǎng)方椎,這世上最難降的妖魔是什么聂抢? 我笑而不...
    開封第一講書人閱讀 58,322評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮棠众,結(jié)果婚禮上琳疏,老公的妹妹穿的比我還像新娘。我一直安慰自己闸拿,他們只是感情好空盼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,370評(píng)論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著新荤,像睡著了一般揽趾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上苛骨,一...
    開封第一講書人閱讀 51,274評(píng)論 1 300
  • 那天篱瞎,我揣著相機(jī)與錄音苟呐,去河邊找鬼。 笑死俐筋,一個(gè)胖子當(dāng)著我的面吹牛牵素,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播澄者,決...
    沈念sama閱讀 40,126評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼笆呆,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了闷哆?” 一聲冷哼從身側(cè)響起腰奋,我...
    開封第一講書人閱讀 38,980評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤单起,失蹤者是張志新(化名)和其女友劉穎抱怔,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嘀倒,經(jīng)...
    沈念sama閱讀 45,414評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡屈留,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,599評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了测蘑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灌危。...
    茶點(diǎn)故事閱讀 39,773評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖碳胳,靈堂內(nèi)的尸體忽然破棺而出勇蝙,到底是詐尸還是另有隱情,我是刑警寧澤挨约,帶...
    沈念sama閱讀 35,470評(píng)論 5 344
  • 正文 年R本政府宣布味混,位于F島的核電站,受9級(jí)特大地震影響诫惭,放射性物質(zhì)發(fā)生泄漏翁锡。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,080評(píng)論 3 327
  • 文/蒙蒙 一夕土、第九天 我趴在偏房一處隱蔽的房頂上張望馆衔。 院中可真熱鬧,春花似錦怨绣、人聲如沸角溃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)减细。三九已至,卻和暖如春咽扇,著一層夾襖步出監(jiān)牢的瞬間邪财,已是汗流浹背陕壹。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留树埠,地道東北人糠馆。 一個(gè)月前我還...
    沈念sama閱讀 47,865評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像怎憋,于是被迫代替她去往敵國(guó)和親又碌。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,689評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容