JAVA背壓

Reactive Streams:一種支持背壓的異步數(shù)據(jù)流處理標(biāo)準(zhǔn),主流實(shí)現(xiàn)有RxJava和Reactor颈墅,Spring WebFlux默認(rèn)集成的是Reactor蜡镶。

Reactive Streams主要解決背壓(back-pressure)問(wèn)題。當(dāng)傳入的任務(wù)速率大于系統(tǒng)處理能力時(shí)恤筛,數(shù)據(jù)處理將會(huì)對(duì)未處理數(shù)據(jù)產(chǎn)生一個(gè)緩沖區(qū)官还。

背壓依我的理解來(lái)說(shuō),是指訂閱者能和發(fā)布者交互(通過(guò)代碼里面的調(diào)用request和cancel方法交互)毒坛,可以調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率望伦,解決把訂閱者壓垮的問(wèn)題林说。關(guān)鍵在于上面例子里面的訂閱關(guān)系Subscription這個(gè)接口,他有request和cancel 2個(gè)方法屯伞,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)腿箩。

我們重點(diǎn)理解背壓在jdk9里面是如何實(shí)現(xiàn)的。關(guān)鍵在于發(fā)布者Publisher的實(shí)現(xiàn)類(lèi)SubmissionPublisher的submit方法是block方法劣摇。訂閱者會(huì)有一個(gè)緩沖池珠移,默認(rèn)為Flow.defaultBufferSize() = 256。當(dāng)訂閱者的緩沖池滿了之后末融,發(fā)布者調(diào)用submit方法發(fā)布數(shù)據(jù)就會(huì)被阻塞钧惧,發(fā)布者就會(huì)停(慢)下來(lái);訂閱者消費(fèi)了數(shù)據(jù)之后(調(diào)用Subscription.request方法)勾习,緩沖池有位置了浓瞪,submit方法就會(huì)繼續(xù)執(zhí)行下去,就是通過(guò)這樣的機(jī)制语卤,實(shí)現(xiàn)了調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率追逮,消費(fèi)得快,生成就快粹舵,消費(fèi)得慢钮孵,發(fā)布者就會(huì)被阻塞,當(dāng)然就會(huì)慢下來(lái)了眼滤。

Reactive Streams由4個(gè)Java接口構(gòu)成:

  • 處理器(Processor)
  • 發(fā)布商(Publisher)
  • 訂閱用戶(Subscriber)
  • 訂閱(Subscription)

Flow類(lèi)允許相互關(guān)聯(lián)的接口和靜態(tài)方法來(lái)建立流控制組件巴席,其中發(fā)布者產(chǎn)生由一個(gè)或多個(gè)訂閱者消費(fèi)的項(xiàng)目,每個(gè)訂閱者由訂閱管理诅需。

Reactive Streams構(gòu)建在java.util.concurrent.Flow容器對(duì)象下漾唉,開(kāi)發(fā)者可以在這里找到Flow.Publisher,一個(gè)用作lambda表達(dá)式或方法引用的賦值目標(biāo)功能接口堰塌。該接口可以讓開(kāi)發(fā)者更容易生成Flow.Subscription元素赵刑,并且將它們鏈接在一起琐旁。

另一個(gè)元素Flow.Subscriber缭保,是異步工作機(jī)制喻犁,由請(qǐng)求觸發(fā)钞啸。它可以從Flow.Subscription請(qǐng)求多個(gè)元素,開(kāi)發(fā)者還可以根據(jù)需要自定義緩沖區(qū)大小双饥。

背壓示例代碼1

/**
 * reactive stream
 * 背壓
 */
@Slf4j
public class ReactiveExample1 {

    public static void main(String[] args) throws InterruptedException {
        //1.發(fā)布者
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        //2. 訂閱者
        Flow.Subscriber subscriber = new Flow.Subscriber() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                log.info("onSubscribe");
                //請(qǐng)求數(shù)據(jù)
                subscription.request(1);
                this.subscription = subscription;
            }

            /**
             * 處理數(shù)據(jù)
             * @param item
             */
            @Override
            public void onNext(Object item) {
                log.info("item:{}", item);
                log.info("onNext");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.subscription.request(1);
            }

            /**
             * 數(shù)據(jù)處理異常
             * @param throwable
             */
            @Override
            public void onError(Throwable throwable) {
                log.info("onError");
            }

            /**
             * 數(shù)據(jù)完成
             */

            @Override
            public void onComplete() {
                log.info("onComplete");
            }
        };

        // 3. 建立關(guān)系
        publisher.subscribe(subscriber);
        // 4. 生產(chǎn)數(shù)據(jù)
        for (int i = 0; i < 500; i++) {
            publisher.submit("test" + i);
            log.info("submit:{}","test" + i);
        }

        // 5 .結(jié)束關(guān)閉
        publisher.close();
        TimeUnit.SECONDS.sleep(10);

    }
}

11:28:53.864 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onSubscribe
11:28:53.878 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test0
11:28:53.879 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test0
11:28:53.881 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:28:53.881 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test1
11:28:53.881 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test2
.
.
.
.
.
.
11:28:53.896 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test252
11:28:53.896 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test253
11:28:53.896 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test254
11:28:53.897 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test255
11:28:53.897 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test256
11:28:55.882 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test257
11:28:55.882 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test1
11:28:55.883 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:28:57.884 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test2
11:28:57.884 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test258
11:28:57.884 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:28:59.885 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test3
11:28:59.885 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test259
11:28:59.885 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:29:01.886 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test4
11:29:01.886 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:29:01.886 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test260
11:29:03.886 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test5
11:29:03.886 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test261
11:29:03.886 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:29:05.887 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test6
11:29:05.887 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test262
11:29:05.887 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:29:07.888 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test7
11:29:07.888 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test263
11:29:07.888 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext

根據(jù)運(yùn)行結(jié)果我們可以看到任務(wù)提交256 后就不能直接提交了, 消費(fèi)者消費(fèi)完一條消息后又可以提交一條數(shù)據(jù),這些就起到了流控的作用

傳統(tǒng)的發(fā)布訂閱模式, 生產(chǎn)者并不能夠根據(jù)消費(fèi)者調(diào)節(jié)生成速率


@Slf4j
public class NormalPublisherSubscriber {

    public static void main(String[] args) {

        BlockingQueue<String> queue = new LinkedBlockingDeque();

        ExecutorService executorService = Executors.newCachedThreadPool();
        //訂閱者,消費(fèi)者
        executorService.submit(() -> {
            try {
                while (true) {
                    String take = queue.take();
                    log.info("Received :{}", take);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        //發(fā)布者,生產(chǎn)者
        executorService.submit(() -> {
            try {
                queue.put("test1");
                queue.put("test2");
                queue.put("test3");
                queue.put("test4");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });
        executorService.shutdown();

    }
}

參考1 https://www.imooc.com/article/27181

參考2 https://blog.csdn.net/houzhizhen/article/details/78195210

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末朽缴,一起剝皮案震驚了整個(gè)濱河市祟蚀,隨后出現(xiàn)的幾起案子瞎疼,更是在濱河造成了極大的恐慌科乎,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贼急,死亡現(xiàn)場(chǎng)離奇詭異茅茂,居然都是意外死亡捏萍,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)玉吁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)照弥,“玉大人腻异,你說(shuō)我怎么就攤上這事进副。” “怎么了悔常?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵影斑,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我机打,道長(zhǎng)矫户,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任残邀,我火速辦了婚禮皆辽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘芥挣。我一直安慰自己驱闷,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布空免。 她就那樣靜靜地躺著空另,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蹋砚。 梳的紋絲不亂的頭發(fā)上扼菠,一...
    開(kāi)封第一講書(shū)人閱讀 49,816評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音坝咐,去河邊找鬼循榆。 笑死,一個(gè)胖子當(dāng)著我的面吹牛墨坚,可吹牛的內(nèi)容都是我干的秧饮。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼框杜,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼浦楣!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起咪辱,我...
    開(kāi)封第一講書(shū)人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤振劳,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后油狂,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體历恐,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡寸癌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了弱贼。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蒸苇。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖吮旅,靈堂內(nèi)的尸體忽然破棺而出溪烤,到底是詐尸還是另有隱情,我是刑警寧澤庇勃,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布檬嘀,位于F島的核電站,受9級(jí)特大地震影響责嚷,放射性物質(zhì)發(fā)生泄漏鸳兽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一罕拂、第九天 我趴在偏房一處隱蔽的房頂上張望揍异。 院中可真熱鬧,春花似錦爆班、人聲如沸衷掷。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)棍鳖。三九已至,卻和暖如春碗旅,著一層夾襖步出監(jiān)牢的瞬間渡处,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工祟辟, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留医瘫,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓旧困,卻偏偏與公主長(zhǎng)得像醇份,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子吼具,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容