借助JDK8和RxJava如何讓你的業(yè)務(wù)代碼運(yùn)行的更快

背景

微服務(wù)流行后,在我們項(xiàng)目開發(fā)過程中感耙,一個(gè)服務(wù)經(jīng)常會(huì)調(diào)用N個(gè)微服務(wù),調(diào)用每個(gè)微服務(wù)可能需要幾百毫秒持隧,試想即硼,一個(gè)復(fù)雜的業(yè)務(wù)如果要調(diào)用上百的微服務(wù),如果各個(gè)服務(wù)同步執(zhí)行屡拨,可能就需要花費(fèi)好幾秒只酥,試想:這些服務(wù)為什么不能并行運(yùn)行呢?

一個(gè)復(fù)雜的計(jì)算任務(wù)呀狼,為什么不能分解成更小的任務(wù)單位裂允,讓他們并行運(yùn)行呢?

本文通過以上兩個(gè)業(yè)務(wù)場景赠潦,比較各個(gè)實(shí)現(xiàn)方案的差異叫胖,在講解之前,我們先來了解下本文提到的RxJava她奥。

案例

從一段最簡單的服務(wù)開始:該服務(wù)需調(diào)用3個(gè)微服務(wù)瓮增,每個(gè)微服務(wù)費(fèi)時(shí)250ms,三個(gè)微服務(wù)都獲取數(shù)據(jù)后返回給前端(該微服務(wù)三個(gè)服務(wù)分別是商品詳情哩俭,商品評論和推薦商品列表)绷跑,如果按順序執(zhí)行,那么代碼是這樣的:

public static void main(String[] args) throws Exception {
    long c = System.currentTimeMillis();
    System.out.println("順序執(zhí)行:");
    System.out.println(service("商品詳情微服務(wù)")+service("商品評論微服務(wù)")+service("推薦商品微服務(wù)"));
    spendTime(c);
}
//模擬某個(gè)服務(wù)
private static String service(String srvName){
    try {
        Thread.sleep(250);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return srvName+"\r\n";
}
private static void spendTime(long preTime) {
    System.out.println("花費(fèi):" + (System.currentTimeMillis() - preTime) + " 毫秒");
}

這段代碼毫無疑問凡资,打印輸出:

花費(fèi):781 毫秒

改造一下砸捏,使用JDK8的CompletableFuture,3個(gè)微服務(wù)獨(dú)立線程運(yùn)行隙赁,都完成后通知主線程打印垦藏,代碼如下:

public static void main(String[] args) throws Exception {
        final long cc = System.currentTimeMillis(); 
    CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> service("商品詳情微服務(wù)"));
    CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> service("商品評論微服務(wù)"));
    CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> service("推薦商品微服務(wù)"));
    s1.thenCombine(s2, (i,j)->{
        return i+j;
    }).thenCombine(s3, (i,j)->{
        System.out.println("使用JDK8的并行編程:");
        System.out.println(i+j);
        spendTime(cc);
        return i+j;
    });
}

以上代碼的執(zhí)行結(jié)果取決于3個(gè)微服務(wù)中最長時(shí)間的那個(gè)服務(wù),相比原先速度有明顯提高:

花費(fèi):311 毫秒

那么以上的代碼使用RxJava怎么來寫呢伞访?我們可以flatMap將服務(wù)分拆到各自獨(dú)立線程中去執(zhí)行掂骏,代碼如下:

private static String[] ss = {"商品詳情微服務(wù)","商品評論微服務(wù)","推薦商品微服務(wù)"};
public static void main(String[] args) throws Exception {
    Observable.range(0,3)
    .flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer t) throws Exception {
            return Observable.just(t)
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer t) throws Exception {
                        return service(ss[t]);
                    }
                });
            }
        })
        .reduce((s1,s2)->s1+s2)
        .subscribe(s -> {
            System.out.println("Observable:\r\n" + s);
            spendTime(cc2);
        });
}

花費(fèi):455 毫秒

RxJava模擬的針對每個(gè)數(shù)據(jù)項(xiàng)的并發(fā)操作調(diào)用時(shí)間上要比直接使用JDK8的API慢得多

第二個(gè)業(yè)務(wù)場景是將復(fù)雜的計(jì)算進(jìn)行拆分子計(jì)算任務(wù),然后將每個(gè)任務(wù)計(jì)算合并成最終計(jì)算結(jié)果厚掷,以下直接給出所有源碼弟灼,我們來看看幾種計(jì)算方式在耗時(shí)上的不同,復(fù)雜計(jì)算任務(wù)是:對1到210000000開根號求總和

package com.sumslack.rxjava;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class TestComputer {
    private static final int MAX_I = 210000000;
    
    private static void spendTime(long preTime) {
        System.out.println("花費(fèi):" + (System.currentTimeMillis() - preTime) + " 毫秒");
    }
    
    private static void spendTime(long preTime,String str) {
        System.out.println("[" + str + "] 花費(fèi):" + (System.currentTimeMillis() - preTime) + " 毫秒");
    }
    private static ExecutorService eService = Executors.newCachedThreadPool();
    public static void main(String[] args) throws Exception{
        
        int[] ss = new int[MAX_I];
        for(int i=1;i<=MAX_I;i++) {
            ss[i-1] = i;
        }
        
        
        long c = System.currentTimeMillis();
        System.out.println(xx(0,MAX_I));
        spendTime(c,"順序執(zhí)行");

        final long cc5 = System.currentTimeMillis();
        Observable.range(1, MAX_I).map(new Function<Integer, Double>() {
            @Override
            public Double apply(Integer t) throws Exception {
                return Math.sqrt(t);
            }
        }).reduce((i,j)->i+j)
        .subscribeOn(Schedulers.computation())
        .subscribe(s -> {
            spendTime(cc5,"Observable直接算");
        });
        final long cc = System.currentTimeMillis();
        CompletableFuture<Double> cf1 = CompletableFuture.supplyAsync(() -> {
            return xx(0,MAX_I/2);
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
            return xx(MAX_I/2,MAX_I);
        });
        cf1.thenCombine(cf2,  (i,j)->{
            System.out.println(""+(i+j));
            spendTime(cc,"CompletableFuture");
            return i+j;
        });
               
        //也可以用:CompletableFuture.allOf(cf1,cf2).join();
        c = System.currentTimeMillis();
        Double dd = Arrays.stream(ss).mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
        System.out.println(dd);
        spendTime(cc,"stream");
        
        c = System.currentTimeMillis();
        Double dd2 = Arrays.stream(ss).parallel().mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
        System.out.println(dd2);
        spendTime(cc,"parallel stream");
        
        final long cc2 = System.currentTimeMillis();
        Observable.fromArray(0,1,2)
        .flatMap(new io.reactivex.functions.Function<Integer,ObservableSource<Double>>(){
            @Override
            public ObservableSource<Double> apply(Integer t) throws Exception {
                if(t%3==0) {
                    return Observable.just(t)
                        .subscribeOn(Schedulers.computation())
                        .map(new Function<Integer, Double>() {
                            @Override
                            public Double apply(Integer t) throws Exception {
                                return xx(0,MAX_I/3);
                            }
                        });
                }else if(t%3==1) {
                    return Observable.just(t)
                            .subscribeOn(Schedulers.computation())
                            .map(new Function<Integer, Double>() {
                                @Override
                                public Double apply(Integer t) throws Exception {
                                    return xx(MAX_I/3,MAX_I*2/3);
                                }
                            });
                }else {
                    return Observable.just(t)
                            .subscribeOn(Schedulers.computation())
                            .map(new Function<Integer, Double>() {
                                @Override
                                public Double apply(Integer t) throws Exception {
                                    return xx(MAX_I*2/3,MAX_I);
                                }
                            });
                }
            }
        })
        .reduce(new BiFunction<Double, Double, Double>() {
            @Override
            public Double apply(Double t1, Double t2) throws Exception {
                return t1+t2;
            }
        })
        .subscribe( s->{
            System.out.println(s);
            spendTime(cc2,"Observable");
        });
        Thread.sleep(100000);
    }
    
    private static double xx(int start,int end) {
        double sum = 1;
        for(int i=start;i<end;i++) {
            sum += Math.sqrt(i+1);
        }
        return sum;
    }
}

以下是費(fèi)時(shí)結(jié)果:

[順序執(zhí)行] 花費(fèi):1086 毫秒
[CompletableFuture] 花費(fèi):537 毫秒
[stream] 花費(fèi):1028 毫秒
[parallel stream] 花費(fèi):1305 毫秒
[Observable] 花費(fèi):461 毫秒
[Observable直接算] 花費(fèi):4265 毫秒

這里使用 RxJava 進(jìn)行計(jì)算任務(wù)分解求和是最快的冒黑,因?yàn)镴DK8并發(fā)編程我們分解的是兩個(gè)計(jì)算任務(wù)田绑,而RxJava分解成3個(gè)所致!

關(guān)于RxJava

RxJavaReactive ExtensionsJava實(shí)現(xiàn)抡爹,通過使用Obserable/Flowable序列來構(gòu)建異步和基于事件的程序的庫掩驱,RxJava實(shí)現(xiàn)和擴(kuò)展了觀察者模式。

RxJava基于響應(yīng)式編程冬竟,是一種面向數(shù)據(jù)流和變化傳播的編程范式昙篙。傳統(tǒng)編程方式代碼都是順序執(zhí)行的,而響應(yīng)式編程是基于異步編程的诱咏,借助于CPU多核能力苔可,提高運(yùn)行效率,降低延遲和阻塞袋狞,基于數(shù)據(jù)流模型焚辅,如一個(gè)函數(shù)可作用與數(shù)據(jù)流中的每項(xiàng),可變化傳播苟鸯。在響應(yīng)式編程中同蜻,函數(shù)成為其第一等公民,同原型類型一樣早处,函數(shù)可作用與參數(shù)湾蔓,也可作為返回值。

RxJava基于函數(shù)式編程砌梆,傳統(tǒng)面向?qū)ο笫峭ㄟ^抽象出對象關(guān)系來解決問題默责,函數(shù)式編程是通過函數(shù)的組合來解決問題贬循。

概念

  • Observable:被訂閱者,比如在安卓開發(fā)中桃序,可能是某個(gè)數(shù)據(jù)源杖虾,數(shù)據(jù)源的變化要通知到UI,那么UI就是Observer媒熊,被訂閱者有冷熱之分奇适,熱Observable無論有沒有訂閱者訂閱,事件流始終發(fā)送芦鳍,而冷Observable則只有訂閱者訂閱事件流才開始發(fā)送數(shù)據(jù)嚷往,它們之間是可以通過API相互轉(zhuǎn)化的,比如使用publish可以冷->熱柠衅,RefCount可以熱->冷皮仁;
  • Observer:訂閱者;

RxJava編程

  • 被訂閱者:用的做多的是Observable茄茁,如果要支持背壓則使用Flowable魂贬,還可以使用Single(只要OnSuccess和onError,沒有onComplete)裙顽,Completable(創(chuàng)建后不發(fā)射任何數(shù)據(jù)付燥,只有onComplete和onError)和Maybe(只發(fā)送0或1個(gè)數(shù)據(jù));
  • 生命周期監(jiān)聽:Observable創(chuàng)建后可使用doXXX監(jiān)聽你說需要的生命周期回調(diào)愈犹;
  • 流的創(chuàng)建:create(使用一個(gè)函數(shù)從頭創(chuàng)建)键科,just(指定值創(chuàng)建,最多10個(gè))漩怎,fromXXX(基于X類創(chuàng)建)勋颖,repeat(特定數(shù)據(jù)重復(fù)N次創(chuàng)建),defer(直到有訂閱者訂閱時(shí)才創(chuàng)建)勋锤,interval(每隔一段時(shí)間創(chuàng)建一個(gè)數(shù)據(jù)發(fā)送)饭玲,timer(延遲一段時(shí)間后發(fā)送數(shù)據(jù));
  • RxJava線程模型: 內(nèi)置多個(gè)線程控制器叁执,包括single(定長為1的線程池)茄厘,newThread(啟動(dòng)新線程執(zhí)行),computation(大小為CPU核數(shù)線程池谈宛,一般用于密集型計(jì)算)次哈,io(適用IO操作),trampoline(直接在當(dāng)前線程運(yùn)行)和Schedulers.from(自定義);
  • 變化操作符:map(數(shù)據(jù)轉(zhuǎn)型)吆录,flatMap(數(shù)據(jù)轉(zhuǎn)某個(gè)Observable后合并發(fā)送)窑滞,scan(每個(gè)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序發(fā)送),groupBy(按Key分組拆分成多個(gè)Observable)哀卫,buffer(打包發(fā)送)巨坊,window,cast(強(qiáng)制轉(zhuǎn)換類型)聊训;
  • 過濾操作:filter(按條件過濾)抱究,takeLast(只發(fā)送最后N個(gè)數(shù)據(jù))恢氯,last(只發(fā)送最后一個(gè)數(shù)據(jù))带斑,lastOrDefault(只發(fā)送最后一個(gè)數(shù)據(jù),為Null發(fā)送默認(rèn)值)勋拟,takeLastBuffer(將最后N個(gè)數(shù)據(jù)當(dāng)做單個(gè)數(shù)據(jù)發(fā)送)勋磕,skip(跳過N個(gè)發(fā)送),skipLast(跳過最后N個(gè))敢靡,take(只發(fā)送開始的N個(gè)數(shù)據(jù))挂滓,first,takeFirst(只發(fā)送滿足條件的第一個(gè)數(shù)據(jù))啸胧,elementAt(只發(fā)送第N個(gè)數(shù)據(jù))赶站,timeout(指定事件內(nèi)沒發(fā)送數(shù)據(jù),就發(fā)送異常)纺念,distinct(去重)贝椿,ofType(只發(fā)送特定類型的數(shù)據(jù)),ignoreElements(丟失所有正常數(shù)據(jù)陷谱,只發(fā)送錯(cuò)誤或完成通知)烙博,sample(一段時(shí)間內(nèi),只處理最后一個(gè)數(shù)據(jù))烟逊,throttleFirst(一段時(shí)間內(nèi)渣窜,只處理第一個(gè)數(shù)據(jù)),debounce(發(fā)送一個(gè)數(shù)據(jù)宪躯,開始計(jì)時(shí)乔宿,到了規(guī)定時(shí)間沒有再發(fā)送數(shù)據(jù),則開始處理數(shù)據(jù))访雪;
  • 條件操作和布爾操作符:all(發(fā)送的數(shù)據(jù)是否都滿足條件)详瑞,contains(發(fā)送的數(shù)據(jù)是否包含某數(shù)據(jù)),amb(多個(gè)被訂閱者數(shù)據(jù)發(fā)送只發(fā)送首次被訂閱的那個(gè)數(shù)據(jù)流)冬阳,defaultIfEmpty(如果原始被訂閱者沒有值蛤虐,則發(fā)送一個(gè)默認(rèn)值),sequenceEquals(判定兩個(gè)數(shù)據(jù)流是否一樣肝陪,返回true或false)驳庭,skipUtil(直到符合條件才發(fā)送),skipWhile(直到條件不符合才開始發(fā)送),takeUntil(滿足條件后不發(fā)送)和takeWhile(條件滿足的一直發(fā)送)饲常;
  • 合并和連接操作符:merge(將多個(gè)被訂閱數(shù)據(jù)流合并)蹲堂,zip(將多個(gè)數(shù)據(jù)流結(jié)合發(fā)送,返回?cái)?shù)據(jù)流的數(shù)據(jù)個(gè)數(shù)是最少的那個(gè))贝淤,combineLastest(類似zip柒竞,任意被訂閱者開始發(fā)送數(shù)據(jù)時(shí)即發(fā)送,而zip要每個(gè)被訂閱者開始發(fā)送數(shù)據(jù)才發(fā)送)播聪,join(兩個(gè)被訂閱者結(jié)合合并朽基,總數(shù)據(jù)項(xiàng)是M*N項(xiàng)),startWith(在數(shù)據(jù)序列開頭插入指定項(xiàng))离陶,connect稼虎,靈活控制發(fā)送數(shù)據(jù)規(guī)則可使用push,refCount招刨,replay(保證所有訂閱者收到相同數(shù)據(jù))霎俩;
  • 背壓:被訂閱者發(fā)送數(shù)據(jù)過快以至于訂閱者來不及處理的情況;

總結(jié)

對于復(fù)雜計(jì)算沉眶,你可以將計(jì)算任務(wù)分解成N個(gè)子計(jì)算任務(wù)打却,交給多個(gè)線程處理并將結(jié)果合并后取得最終結(jié)果,對于服務(wù)業(yè)務(wù)的調(diào)用谎倔,你應(yīng)該清楚柳击,哪些子任務(wù)可以并行運(yùn)行,哪些需要順序執(zhí)行传藏,使用RxJava在代碼上可能更加直觀腻暮,也可以使用JDK8的CompletableFuture,其實(shí)JDK8的很多API參考了RxJava的實(shí)現(xiàn)毯侦,兩者在寫法上非常的類似哭靖,響應(yīng)式編程相比傳統(tǒng)代碼的順序執(zhí)行在思路上有很大的不同,理解上也有一定的難度侈离,希望通過本文讓您全面了解函數(shù)式編程的實(shí)現(xiàn)思路试幽。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市卦碾,隨后出現(xiàn)的幾起案子铺坞,更是在濱河造成了極大的恐慌,老刑警劉巖洲胖,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件济榨,死亡現(xiàn)場離奇詭異,居然都是意外死亡绿映,警方通過查閱死者的電腦和手機(jī)擒滑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進(jìn)店門腐晾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人丐一,你說我怎么就攤上這事藻糖。” “怎么了库车?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵巨柒,是天一觀的道長。 經(jīng)常有香客問我柠衍,道長洋满,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任拧略,我火速辦了婚禮芦岂,結(jié)果婚禮上瘪弓,老公的妹妹穿的比我還像新娘垫蛆。我一直安慰自己,他們只是感情好腺怯,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布袱饭。 她就那樣靜靜地躺著,像睡著了一般呛占。 火紅的嫁衣襯著肌膚如雪虑乖。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天晾虑,我揣著相機(jī)與錄音疹味,去河邊找鬼。 笑死帜篇,一個(gè)胖子當(dāng)著我的面吹牛糙捺,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播笙隙,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼洪灯,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了竟痰?” 一聲冷哼從身側(cè)響起签钩,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎坏快,沒想到半個(gè)月后铅檩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡莽鸿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年昧旨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,722評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡臼予,死狀恐怖鸣戴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情粘拾,我是刑警寧澤窄锅,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站缰雇,受9級特大地震影響入偷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜械哟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一疏之、第九天 我趴在偏房一處隱蔽的房頂上張望脑豹。 院中可真熱鬧研叫,春花似錦脱羡、人聲如沸处铛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽啃奴。三九已至腊满,卻和暖如春扯旷,著一層夾襖步出監(jiān)牢的瞬間拯爽,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工钧忽, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留毯炮,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓耸黑,卻偏偏與公主長得像桃煎,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子崎坊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容