Project Reactor 之 publishOn 與 subscribeOn

一遇革、概述

在 Spring Reactor 項目中,有兩個出鏡較少的方法:publishOnsubscribeOn锻霎。這兩個方法的作用是指定執(zhí)行 Reactive Streaming 的 Scheduler(可理解為線程池)杠巡。

為何需要指定執(zhí)行 Scheduler 呢氢拥?一個顯而易見的原因是:組成一個反應式流的代碼有快有慢,例如 NIO嫩海、BIO叁怪。如果將這些功能都放在一個線程里執(zhí)行奕谭,快的就會被慢的影響血柳,所以需要相互隔離难捌。這是這兩個方法應用的最典型的場景膝宁。

二鸦难、Scheduler

在介紹 publishOnsubscribeOn 方法之前,需要先介紹 Scheduler 這個概念员淫。在 Reactor 中合蔽,Scheduler 用來定義執(zhí)行調度任務的抽象〗榉担可以簡單理解為線程池拴事,但其實際作用要更多。先簡單介紹 Scheduler 的實現(xiàn):

  • Schedulers.elastic(): 調度器會動態(tài)創(chuàng)建工作線程圣蝎,線程數無上界挤聘,類似于 Execturos.newCachedThreadPool()
  • Schedulers.parallel(): 創(chuàng)建固定線程數的調度器,默認線程數等于 CPU 核心數捅彻。

關于 Scheduler 的更多作用留在以后介紹组去。

三从隆、publishOn 與 subscribeOn

接下來進入正題。先看兩個例子(來自 https://github.com/reactor/lite-rx-api-hands-on

publishOn 的例子

Mono<Void> fluxToBlockingRepository(Flux<User> flux, 
                                    BlockingRepository<User> repository) {
    return flux
            .publishOn(Schedulers.elastic())
            .doOnNext(repository::save)
            .then();
}

subscribeOn 的例子

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository)  {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
               .subscribeOn(Schedulers.elastic());
}

這里的 repository 的類型是 BlockingRepository缝其,指的是會導致線程阻塞的數據庫操作的集合榴都,例如 JPA和屎、MyBatis 等基于 JDBC 技術實現(xiàn)的 DAO套啤。

在第一個例子中,在執(zhí)行了 publishOn(Schedulers.elastic()) 之后止潮,repository::save 就會被 Schedulers.elastic() 定義的線程池所執(zhí)行询件。

而在第二個例子中刻蟹,subscribeOn(Schedulers.elastic()) 的作用類似。它使得 repository.findAll()(也包括 Flux.fromIterable)的執(zhí)行發(fā)生在 Schedulers.elastic() 所定義的線程池中。

從上面的描述看,publishOnsubscribeOn 的作用類似,那兩者的區(qū)別又是什么沮稚?

兩者的區(qū)別

簡單說,兩者的區(qū)別在于影響范圍剩拢。publishOn 影響在其之后的 operator 執(zhí)行的線程池,而 subscribeOn 則會從源頭影響整個執(zhí)行過程办素。所以勺三,publishOn 的影響范圍和它的位置有關商源,而 subscribeOn 的影響范圍則和位置無關。

看個 publishOnsubscribeOn 同時使用的例子

Flux.just("tom")
        .map(s -> {
            System.out.println("[map] Thread name: " + Thread.currentThread().getName());
            return s.concat("@mail.com");
        })
        .publishOn(Schedulers.newElastic("thread-publishOn"))
        .filter(s -> {
            System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
            return s.startsWith("t");
        })
        .subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
        .subscribe(s -> {
            System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
            System.out.println(s);
        });

輸出結果如下:

[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com

從上面的例子可以看出以政,subscribeOn 定義在 publishOn 之后,但是卻從源頭開始生效殊轴。而在 publishOn 執(zhí)行之后孽文,線程池變更為 publishOn 所定義的。

實際用途

這里介紹 publishOnsubscribeOn 的一種實際用途,那就是反應式編程和傳統(tǒng)的肥隆,會導致線程阻塞的編程技術混用的場景。其實開頭兩個例子已經解釋了這個場景憨琳。

在第一個 publishOn 的例子中篙螟,repository::save 會導致線程阻塞,為了避免造成對其它反應式操作的影響,便使用 publishOn 改變其執(zhí)行線程。

在第二個 subscribeOn 的例子中履因,repository.findAll() 會導致線程阻塞。但是其是源頭的 publisher,因此不能使用 publishOn 改變其 執(zhí)行線程。這時就需要使用 subscribeOn,在源頭上修改其執(zhí)行線程盐欺。

這樣,通過 publishOnsubscribeOn 就在反應式編程中實現(xiàn)了線程池隔離的目的粉洼,一定程度上避免了會導致線程阻塞的程序執(zhí)行影響到反應式編程的程序執(zhí)行效率宵喂。

局限性

使用 publishOnsubscribeOn 只能在一定程度上避免反應式編程代碼執(zhí)行的效率被影響锅棕。因為用來隔離的線程池資源終歸是有限的顾瞻,比如當出現(xiàn)數據庫資源不足、慢查詢等問題時几蜻,對應的線程池資源如果被耗盡梭稚,還是會使整個反應式編程的執(zhí)行效率受到影響颖低。

目前裹匙,Redis、Mongo伸辟、Couchbase 等非關系型數據庫均有相應的反應式編程的解決方案碎乃,但是關系型數據庫卻沒有理想的方案。一個重要原因是 JDBC 本身就是一個阻塞式的 API苛白,根本不可能讓其適應反應式編程。因此需要一個新的方案焚虱。目前 Oracle 正在推動 ADBA (Asynchronous Database Access API)购裙,使得關系型數據庫可以滿足異步編程的需要。但是鹃栽,因為是 Oracle 主導躏率,大家都懂的,所以目前前景還不是很明朗。另外一個技術方案是 Spring 推動的 R2DBC禾锤,從名字上來看就很像是 JDBC 在反應式編程領域的對應的解決方案私股。目前可以支持 PostgreSQL,支持 MySQL 目前還尚需時日恩掷。

后續(xù)

接下來關于 Project Reactor 的文章我打算向大家介紹一下 Hot 和 Cold Publisher 的概念以及 Project Reactor 的源碼實現(xiàn)倡鲸。

我的技術公眾號“編走編想”
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市黄娘,隨后出現(xiàn)的幾起案子峭状,更是在濱河造成了極大的恐慌,老刑警劉巖逼争,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件优床,死亡現(xiàn)場離奇詭異,居然都是意外死亡誓焦,警方通過查閱死者的電腦和手機胆敞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來杂伟,“玉大人移层,你說我怎么就攤上這事『罩啵” “怎么了观话?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長越平。 經常有香客問我频蛔,道長,這世上最難降的妖魔是什么秦叛? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任晦溪,我火速辦了婚禮,結果婚禮上挣跋,老公的妹妹穿的比我還像新娘三圆。我一直安慰自己,他們只是感情好浆劲,可當我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布嫌术。 她就那樣靜靜地躺著,像睡著了一般牌借。 火紅的嫁衣襯著肌膚如雪度气。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天膨报,我揣著相機與錄音磷籍,去河邊找鬼适荣。 笑死,一個胖子當著我的面吹牛院领,可吹牛的內容都是我干的弛矛。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼比然,長吁一口氣:“原來是場噩夢啊……” “哼丈氓!你這毒婦竟也來了?” 一聲冷哼從身側響起强法,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤万俗,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后饮怯,有當地人在樹林里發(fā)現(xiàn)了一具尸體闰歪,經...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年蓖墅,在試婚紗的時候發(fā)現(xiàn)自己被綠了库倘。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡论矾,死狀恐怖教翩,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情拇囊,我是刑警寧澤迂曲,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布靶橱,位于F島的核電站寥袭,受9級特大地震影響,放射性物質發(fā)生泄漏关霸。R本人自食惡果不足惜传黄,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望队寇。 院中可真熱鬧膘掰,春花似錦、人聲如沸佳遣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽零渐。三九已至窒舟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間诵盼,已是汗流浹背惠豺。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工银还, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人洁墙。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓蛹疯,卻偏偏與公主長得像,于是被迫代替她去往敵國和親热监。 傳聞我的和親對象是個殘疾皇子捺弦,可洞房花燭夜當晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內容