Reactor中的Thread和Scheduler

簡介

今天我們要介紹的是Reactor中的多線程模型和定時器模型,Reactor之前我們已經(jīng)介紹過了咆瘟,它實(shí)際上是觀察者模式的延伸苏研。

所以從本質(zhì)上來說截粗,Reactor是和多線程無關(guān)的袱巨。你可以把它用在多線程或者不用在多線程适篙。

今天將會給大家介紹一下如何在Reactor中使用多線程和定時器模型铁坎。

Thread多線程

先看一下之前舉的Flux的創(chuàng)建的例子:

        Flux<String> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });

        flux.subscribe(System.out::println);

可以看到蜂奸,不管是Flux generator還是subscriber,他們實(shí)際上都是運(yùn)行在同一個線程中的硬萍。

如果我們想讓subscribe發(fā)生在一個新的線程中扩所,我們需要新啟動一個線程,然后在線程內(nèi)部進(jìn)行subscribe操作朴乖。

        Mono<String> mono = Mono.just("hello ");

        Thread t = new Thread(() -> mono
                .map(msg -> msg + "thread ")
                .subscribe(v ->
                        System.out.println(v + Thread.currentThread().getName())
                )
        );
        t.start();
        t.join();

上面的例子中祖屏,Mono在主線程中創(chuàng)建,而subscribe發(fā)生在新啟動的Thread中买羞。

Schedule定時器

很多情況下袁勺,我們的publisher是需要定時去調(diào)用一些方法,來產(chǎn)生元素的畜普。Reactor提供了一個新的Schedule類來負(fù)責(zé)定時任務(wù)的生成和管理期丰。

Scheduler是一個接口:

public interface Scheduler extends Disposable 

它定義了一些定時器中必須要實(shí)現(xiàn)的方法:

比如立即執(zhí)行的:

Disposable schedule(Runnable task);

延時執(zhí)行的:

default Disposable schedule(Runnable task, long delay, TimeUnit unit)

和定期執(zhí)行的:

default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)

Schedule有一個工具類叫做Schedules,它提供了多個創(chuàng)建Scheduler的方法吃挑,它的本質(zhì)就是對ExecutorService和ScheduledExecutorService進(jìn)行封裝钝荡,將其做為Supplier來創(chuàng)建Schedule。

簡單點(diǎn)看Schedule就是對ExecutorService的封裝儒鹿。

Schedulers工具類

Schedulers工具類提供了很多個有用的工具類化撕,我們來詳細(xì)介紹一下:

Schedulers.immediate():

提交的Runnable將會立馬在當(dāng)前線程執(zhí)行。

Schedulers.single():

使用同一個線程來執(zhí)行所有的任務(wù)约炎。

Schedulers.boundedElastic():

創(chuàng)建一個可重用的線程池植阴,如果線程池中的線程在長時間內(nèi)都沒有被使用蟹瘾,那么將會被回收。boundedElastic會有一個最大的線程個數(shù)掠手,一般來說是CPU cores x 10憾朴。 如果目前沒有可用的worker線程,提交的任務(wù)將會被放入隊(duì)列等待喷鸽。

Schedulers.parallel():

創(chuàng)建固定個數(shù)的工作線程众雷,個數(shù)和CPU的核數(shù)相關(guān)。

Schedulers.fromExecutorService(ExecutorService):

從一個現(xiàn)有的線程池創(chuàng)建Scheduler做祝。

Schedulers.newXXX:

Schedulers提供了很多new開頭的方法砾省,來創(chuàng)建各種各樣的Scheduler。

我們看一個Schedulers的具體應(yīng)用混槐,我們可以指定特定的Scheduler來產(chǎn)生元素:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

publishOn 和 subscribeOn

publishOn和subscribeOn主要用來進(jìn)行切換Scheduler的執(zhí)行上下文编兄。

先講一個結(jié)論,就是在鏈?zhǔn)秸{(diào)用中声登,publishOn可以切換Scheduler狠鸳,但是subscribeOn并不會起作用。

這是因?yàn)檎嬲膒ublish-subscribe關(guān)系只有在subscriber開始subscribe的時候才建立悯嗓。

下面我們來具體看一下這兩個方法的使用情況:

publishOn

publishOn可以在鏈?zhǔn)秸{(diào)用的過程中件舵,進(jìn)行publish的切換:

    @Test
    public void usePublishOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":"+ Thread.currentThread())
                .publishOn(s)
                .map(i -> "value " + i+":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
        System.out.println(Thread.currentThread());
        Thread.sleep(5000);
    }

上面我們創(chuàng)建了一個名字為parallel-scheduler的scheduler。

然后創(chuàng)建了一個Flux脯厨,F(xiàn)lux先做了一個map操作铅祸,然后切換執(zhí)行上下文到parallel-scheduler,最后右執(zhí)行了一次map操作俄认。

最后个少,我們采用一個新的線程來進(jìn)行subscribe的輸出。

先看下輸出結(jié)果:

Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]

可以看到,主線程的名字是Thread眯杏。Subscriber線程的名字是ThreadA夜焦。

那么在publishOn之前,map使用的線程就是ThreadA岂贩。 而在publishOn之后茫经,map使用的線程就切換到了parallel-scheduler線程池。

subscribeOn

subscribeOn是用來切換Subscriber的執(zhí)行上下文萎津,不管subscribeOn出現(xiàn)在調(diào)用鏈的哪個部分卸伞,最終都會應(yīng)用到整個調(diào)用鏈上。

我們看一個例子:

    @Test
    public void useSubscribeOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":" + Thread.currentThread())
                .subscribeOn(s)
                .map(i -> "value " + i + ":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        Thread.sleep(5000);
    }

同樣的锉屈,上面的例子中荤傲,我們使用了兩個map,然后在兩個map中使用了一個subscribeOn用來切換subscribe執(zhí)行上下文颈渊。

看下輸出結(jié)果:

value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]

可以看到遂黍,不管哪個map终佛,都是用的是切換過的parallel-scheduler。

本文的例子learn-reactive

本文作者:flydean程序那些事

本文鏈接:http://www.flydean.com/reactor-thread-scheduler/

本文來源:flydean的博客

歡迎關(guān)注我的公眾號:「程序那些事」最通俗的解讀雾家,最深刻的干貨铃彰,最簡潔的教程,眾多你不知道的小技巧等你來發(fā)現(xiàn)芯咧!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末牙捉,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子敬飒,更是在濱河造成了極大的恐慌邪铲,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件无拗,死亡現(xiàn)場離奇詭異霜浴,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)蓝纲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來晌纫,“玉大人税迷,你說我怎么就攤上這事∏率” “怎么了箭养?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長哥牍。 經(jīng)常有香客問我毕泌,道長,這世上最難降的妖魔是什么嗅辣? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任撼泛,我火速辦了婚禮,結(jié)果婚禮上澡谭,老公的妹妹穿的比我還像新娘愿题。我一直安慰自己,他們只是感情好蛙奖,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布潘酗。 她就那樣靜靜地躺著,像睡著了一般雁仲。 火紅的嫁衣襯著肌膚如雪仔夺。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天攒砖,我揣著相機(jī)與錄音缸兔,去河邊找鬼日裙。 笑死,一個胖子當(dāng)著我的面吹牛灶体,可吹牛的內(nèi)容都是我干的阅签。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼蝎抽,長吁一口氣:“原來是場噩夢啊……” “哼政钟!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起樟结,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤养交,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后瓢宦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體碎连,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年驮履,在試婚紗的時候發(fā)現(xiàn)自己被綠了鱼辙。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡玫镐,死狀恐怖倒戏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情恐似,我是刑警寧澤杜跷,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站矫夷,受9級特大地震影響葛闷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜双藕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一淑趾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧忧陪,春花似錦治笨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至更卒,卻和暖如春等孵,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蹂空。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工俯萌, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留果录,地道東北人。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓咐熙,卻偏偏與公主長得像弱恒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子棋恼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容