響應(yīng)式編程總覽

引子:被譽(yù)為“中國(guó)大數(shù)據(jù)第一人”的涂子沛先生在其成名作《數(shù)據(jù)之巔》里提到鹅颊,摩爾定律、社交媒體漠烧、數(shù)據(jù)挖掘是大數(shù)據(jù)的三大成因。IBM的研究稱靡砌,整個(gè)人類文明所獲得的全部數(shù)據(jù)中已脓,有90%是過去兩年內(nèi)產(chǎn)生的。在此背景下通殃,包括NoSQL度液,Hadoop, Spark, Storm, Kylin在內(nèi)的大批新技術(shù)應(yīng)運(yùn)而生。其中以RxJavaReactor為代表的響應(yīng)式(Reactive)編程技術(shù)針對(duì)的就是經(jīng)典的大數(shù)據(jù)4V定義(Volume画舌,Variety堕担,Velocity,Value)中的Velocity曲聂,即高并發(fā)問題霹购,而在即將發(fā)布的Spring 5中,也引入了響應(yīng)式編程的支持朋腋。在接下來的幾周齐疙,我會(huì)圍繞響應(yīng)式編程分三期與你分享我的一些學(xué)習(xí)心得。本篇是第二篇乍丈,以Reactor框架為例介紹響應(yīng)式編程的幾個(gè)關(guān)鍵特性剂碴。

前情概要:

1 響應(yīng)式編程總覽

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipedia

在上述響應(yīng)式編程(后面簡(jiǎn)稱RP)的定義中,除了異步編程轻专,還包含兩個(gè)重要的關(guān)鍵詞:

  • Data streams: 即數(shù)據(jù)流,分為靜態(tài)數(shù)據(jù)流(比如數(shù)組察蹲,文件)和動(dòng)態(tài)數(shù)據(jù)流(比如事件流请垛,日志流)兩種∏⒁椋基于數(shù)據(jù)流模型宗收,RP得以提供一套統(tǒng)一的Stream風(fēng)格的數(shù)據(jù)處理接口。和Java 8中的Stream API相比亚兄,RP API除了支持靜態(tài)數(shù)據(jù)流混稽,還支持動(dòng)態(tài)數(shù)據(jù)流,并且允許復(fù)用和同時(shí)接入多個(gè)訂閱者。
  • The propagation of change: 變化傳播匈勋,簡(jiǎn)單來說就是以一個(gè)數(shù)據(jù)流為輸入礼旅,經(jīng)過一連串操作轉(zhuǎn)化為另一個(gè)數(shù)據(jù)流,然后分發(fā)給各個(gè)訂閱者的過程洽洁。這就有點(diǎn)像函數(shù)式編程中的組合函數(shù)痘系,將多個(gè)函數(shù)串聯(lián)起來,把一組輸入數(shù)據(jù)轉(zhuǎn)化為格式迥異的輸出數(shù)據(jù)饿自。

一個(gè)容易混淆的概念是響應(yīng)式設(shè)計(jì)汰翠,雖然它的名字中也包含了“響應(yīng)式”三個(gè)字,但其實(shí)和RP完全是兩碼事昭雌。響應(yīng)式設(shè)計(jì)是指網(wǎng)頁能夠自動(dòng)調(diào)整布局和樣式以適配不同尺寸的屏幕复唤,屬于網(wǎng)站設(shè)計(jì)的范疇,而RP是一種關(guān)注系統(tǒng)可響應(yīng)性烛卧,面向數(shù)據(jù)流的編程思想或者說編程框架佛纫。

特性

從本質(zhì)上說,RP是一種異步編程框架唱星,和其他框架相比雳旅,RP至少包含了以下三個(gè)特性:

  • 描述而非執(zhí)行:在你最終調(diào)用subscribe()方法之前,從發(fā)布端到訂閱端间聊,沒有任何事會(huì)發(fā)生攒盈。就好比無論多長(zhǎng)的水管,只要水龍頭不打開哎榴,水管里的水就不會(huì)流動(dòng)型豁。為了提高描述能力,RP提供了比Stream豐富的多的多的API尚蝌,比如buffer(), merge(), onErrorMap()等迎变。
  • 提高吞吐量: 類似于HTTP/2中的連接復(fù)用,RP通過線程復(fù)用來提高吞吐量飘言。在傳統(tǒng)的Servlet容器中衣形,每來一個(gè)請(qǐng)求就會(huì)發(fā)起一個(gè)線程進(jìn)行處理。受限于機(jī)器硬件資源姿鸿,單臺(tái)服務(wù)器所能支撐的線程數(shù)是存在一個(gè)上限的谆吴,假設(shè)為T,那么應(yīng)用同時(shí)能處理的請(qǐng)求數(shù)(吞吐量)必然也不會(huì)超過T苛预。但對(duì)于一個(gè)使用Spring 5開發(fā)的RP應(yīng)用句狼,如果運(yùn)行在像Netty這樣的異步容器中,無論有多少個(gè)請(qǐng)求热某,用于處理請(qǐng)求的線程數(shù)是相對(duì)固定的腻菇,因此最大吞吐量就有可能超過T胳螟。
  • 背壓(Backpressure)支持:簡(jiǎn)單來說,背壓就是一種反饋機(jī)制筹吐。在一般的Push模型中糖耸,發(fā)布者既不知道也不關(guān)心訂閱者的處理速度,當(dāng)數(shù)據(jù)的發(fā)布速度超過處理速度時(shí)骏令,需要訂閱者自己決定是緩存還是丟棄蔬捷。如果使用RP,決定權(quán)就交回給發(fā)布者榔袋,訂閱者只需要根據(jù)自己的處理能力問發(fā)布者請(qǐng)求相應(yīng)數(shù)量的數(shù)據(jù)周拐。你可能會(huì)問這不就是Pull模型嗎?其實(shí)是不同的凰兑。在Pull模型中妥粟,訂閱者每次處理完數(shù)據(jù),都要重新發(fā)起一次請(qǐng)求拉取新的數(shù)據(jù)吏够,而使用背壓勾给,訂閱者只需要發(fā)起一次請(qǐng)求,就能連續(xù)不斷的重復(fù)請(qǐng)求數(shù)據(jù)锅知。

適用場(chǎng)景

了解了RP的這些特性播急,你可能已經(jīng)猜想到RP有哪些適用場(chǎng)景了。一般來說售睹,RP適用于高并發(fā)桩警、帶延遲操作的場(chǎng)景,比如以下這些情況(的組合):

  • 一次請(qǐng)求涉及多次外部服務(wù)調(diào)用
  • 非可靠的網(wǎng)絡(luò)傳輸
  • 高并發(fā)下的消息處理
  • 彈性計(jì)算網(wǎng)絡(luò)

代價(jià)

Every coin has two sides.

和任何框架一樣昌妹,有優(yōu)勢(shì)必然就有劣勢(shì)捶枢。RP的兩個(gè)比較大的問題是:

  • 雖然復(fù)用線程有助于提高吞吐量,但一旦在某個(gè)回調(diào)函數(shù)中線程被卡住飞崖,那么這個(gè)線程上所有的請(qǐng)求都會(huì)被阻塞烂叔,最嚴(yán)重的情況,整個(gè)應(yīng)用會(huì)被拖垮固歪。
  • 難以調(diào)試蒜鸡。由于RP強(qiáng)大的描述能力,在一個(gè)典型的RP應(yīng)用中牢裳,大部分代碼都是以鏈?zhǔn)奖磉_(dá)式的形式出現(xiàn)术瓮,比如flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(),一旦出錯(cuò)贰健,你將很難定位到具體是哪個(gè)環(huán)節(jié)出了問題。所幸的是恬汁,RP框架一般都會(huì)提供一些工具方法來輔助進(jìn)行調(diào)試伶椿。

2 Reactor實(shí)戰(zhàn)

為了幫助你理解上面說的一些概念辜伟,下面我就通過幾個(gè)測(cè)試用例,演示RP的兩個(gè)關(guān)鍵特性:提高吞吐量和背壓脊另。完整的代碼可參見我GitHub上的示例工程导狡。

提高吞吐量

    @Test
    public void testImperative() throws InterruptedException {
        _runInParallel(CONCURRENT_SIZE, () -> {
            ImperativeRestaurantRepository.INSTANCE.insert(load);
        });
    }

    private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            executorService.submit(task);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }

    @Test
    public void testReactive() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
        for (int i = 0; i < CONCURRENT_SIZE; i++) {
            ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {
            }, e -> latch.countDown(), latch::countDown);
        }
        latch.await();
    }

用例解讀:

  • 第一個(gè)測(cè)試用例使用的是多線程+MongoDB Driver,同時(shí)起100個(gè)線程偎痛,每個(gè)線程往MongoDB中插入10000條數(shù)據(jù)旱捧,總共100萬條數(shù)據(jù),平均用時(shí)15秒左右踩麦。
  • 第二個(gè)測(cè)試用例使用的是Reactor+MongoDB Reactive Streams Driver枚赡,同樣是插入100萬條數(shù)據(jù),平均用時(shí)不到10秒谓谦,吞吐量提高了50%贫橙!

背壓

在演示測(cè)試用例之前,先看兩張圖反粥,幫助你更形象的理解什么是背壓卢肃。

圖片出處:Dataflow and simplified reactive programming

兩張圖乍一看沒啥區(qū)別,但其實(shí)是完全兩種不同的背壓策略才顿。第一張圖莫湘,發(fā)布速度(100/s)遠(yuǎn)大于訂閱速度(1/s),但由于背壓的關(guān)系郑气,發(fā)布者嚴(yán)格按照訂閱者的請(qǐng)求數(shù)量發(fā)送數(shù)據(jù)幅垮。第二張圖,發(fā)布速度(1/s)小于訂閱速度(100/s)竣贪,當(dāng)訂閱者請(qǐng)求100個(gè)數(shù)據(jù)時(shí)军洼,發(fā)布者會(huì)積滿所需個(gè)數(shù)的數(shù)據(jù)再開始發(fā)送⊙菰酰可以看到匕争,通過背壓機(jī)制,發(fā)布者可以根據(jù)各個(gè)訂閱者的能力動(dòng)態(tài)調(diào)整發(fā)布速度爷耀。

    @BeforeEach
    public void beforeEach() {
        // initialize publisher
        AtomicInteger count = new AtomicInteger();
        timerPublisher = Flux.create(s ->
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        s.next(count.getAndIncrement());
                        if (count.get() == 10) {
                            s.complete();
                        }
                    }
                }, 100, 100)
        );
    }

    @Test
    public void testNormal() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        timerPublisher
                .subscribe(r -> System.out.println("Continuous consuming " + r),
                        e -> latch.countDown(),
                        latch::countDown);
        latch.await();
    }

    @Test
    public void testBackpressure() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
        Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                timerSubscription.set(subscription);
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("consuming " + value);
            }

            @Override
            protected void hookOnComplete() {
                latch.countDown();
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                latch.countDown();
            }
        };
        timerPublisher.onBackpressureDrop().subscribe(subscriber);
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                timerSubscription.get().request(1);
            }
        }, 100, 200);
        latch.await();
    }

用例解讀:

  • 第一個(gè)測(cè)試用例演示了在理想情況下甘桑,即訂閱者的處理速度能夠跟上發(fā)布者的發(fā)布速度(以100ms為間隔產(chǎn)生10個(gè)數(shù)字),控制臺(tái)從0打印到9歹叮,一共10個(gè)數(shù)字跑杭,和發(fā)布端一致。
  • 第二個(gè)測(cè)試用例故意調(diào)慢了訂閱者的處理速度(每200ms處理一個(gè)數(shù)字)咆耿,同時(shí)發(fā)布者采用了Drop的背壓策略德谅,結(jié)果控制臺(tái)只打印了一半的數(shù)字(0,2萨螺,4窄做,6愧驱,8),另外一半的數(shù)字由于背壓的原因被發(fā)布者Drop掉了椭盏,并沒有發(fā)給訂閱者组砚。

3 小結(jié)

通過上面的介紹,不難看出RP實(shí)際上是一種內(nèi)置了發(fā)布者訂閱者模型的異步編程框架掏颊,包含了線程復(fù)用糟红,背壓等高級(jí)特性,特別適用于高并發(fā)乌叶、有延遲的場(chǎng)景盆偿。

以上就是我對(duì)響應(yīng)式編程的一些簡(jiǎn)單介紹,歡迎你到我的留言板分享枉昏,和大家一起過過招陈肛。下一篇我將綜合前兩篇的內(nèi)容,詳解一個(gè)完整的Spring 5示例應(yīng)用兄裂,敬請(qǐng)期待句旱。

4 參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市晰奖,隨后出現(xiàn)的幾起案子谈撒,更是在濱河造成了極大的恐慌,老刑警劉巖匾南,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件啃匿,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蛆楞,警方通過查閱死者的電腦和手機(jī)溯乒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來豹爹,“玉大人裆悄,你說我怎么就攤上這事”哿” “怎么了光稼?”我有些...
    開封第一講書人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)孩等。 經(jīng)常有香客問我艾君,道長(zhǎng),這世上最難降的妖魔是什么肄方? 我笑而不...
    開封第一講書人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任冰垄,我火速辦了婚禮,結(jié)果婚禮上权她,老公的妹妹穿的比我還像新娘播演。我一直安慰自己冀瓦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開白布写烤。 她就那樣靜靜地躺著,像睡著了一般拾徙。 火紅的嫁衣襯著肌膚如雪洲炊。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,036評(píng)論 1 285
  • 那天尼啡,我揣著相機(jī)與錄音暂衡,去河邊找鬼。 笑死崖瞭,一個(gè)胖子當(dāng)著我的面吹牛狂巢,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播书聚,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼唧领,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了雌续?” 一聲冷哼從身側(cè)響起斩个,我...
    開封第一講書人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎驯杜,沒想到半個(gè)月后受啥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鸽心,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年滚局,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片顽频。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡藤肢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出冲九,到底是詐尸還是另有隱情谤草,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布莺奸,位于F島的核電站丑孩,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏灭贷。R本人自食惡果不足惜温学,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望甚疟。 院中可真熱鬧仗岖,春花似錦逃延、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至檩电,卻和暖如春拄丰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背俐末。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工料按, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人卓箫。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓载矿,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親烹卒。 傳聞我的和親對(duì)象是個(gè)殘疾皇子闷盔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)甫题,斷路器馁筐,智...
    卡卡羅2017閱讀 134,599評(píng)論 18 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 171,506評(píng)論 25 707
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的坠非,對(duì)什么是操作敏沉、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,837評(píng)論 0 10
  • 順德沒救了,徹底的炎码。 對(duì)吃已經(jīng)執(zhí)著到恨不得把自己吃了盟迟,不說如何吃人,就說魚吧潦闲,魚作為順德食材的靈魂王者攒菠,順德人可以...
    深見江閱讀 297評(píng)論 0 1
  • Amy花魅千夜閱讀 69評(píng)論 0 13