聊聊reactor異步線程的變量傳遞

本文主要研究下reactor異步線程的變量傳遞

threadlocal的問題

在傳統(tǒng)的請求/應(yīng)答同步模式中,使用threadlocal來傳遞上下文變量是非常方便的我碟,可以省得在每個方法參數(shù)添加公用的變量放案,比如當(dāng)前登錄用戶。但是業(yè)務(wù)方法可能使用了async或者在其他線程池中異步執(zhí)行怎囚,這個時候threadlocal的作用就失效了卿叽。

這個時候的解決辦法就是采取propagation模式桥胞,即在同步線程與異步線程銜接處傳播這個變量恳守。

TaskDecorator

比如spring就提供了TaskDecorator,通過實(shí)現(xiàn)這個接口贩虾,可以自己控制傳播那些變量催烘。例如:

class MdcTaskDecorator implements TaskDecorator {
 
  @Override
  public Runnable decorate(Runnable runnable) {
    // Right now: Web thread context !
    // (Grab the current thread MDC data)
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
      try {
        // Right now: @Async thread context !
        // (Restore the Web thread context's MDC data)
        MDC.setContextMap(contextMap);
        runnable.run();
      } finally {
        MDC.clear();
      }
    };
  }
}

這里注意在finally里頭clear

配置這個taskDecorator

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
 
  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.initialize();
    return executor;
  }

}

完整實(shí)例詳見Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads

Reactor Context

spring5引入webflux,其底層是基于reactor缎罢,那么reactor如何進(jìn)行上下文變量的傳播呢伊群?官方提供了Context對象來替代threadlocal。

其特性如下:

  • 類似map的kv操作策精,比如put(Object key, Object value),putAll(Context), hasKey(Object key)
  • immutable舰始,即同一個key,后面put不會覆蓋
  • 提供getOrDefault咽袜,getOrEmpty方法
  • Context與作用鏈上的每個Subscriber綁定
  • 通過subscriberContext(Context)來訪問
  • Context的作用是自底向上

實(shí)例

設(shè)置及讀取

    @Test
    public void testSubscriberContext(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World")
                .verifyComplete();
    }

這里從最底部的subscriberContext設(shè)置message值為World丸卷,然后flatMap里頭通過subscriberContext來訪問。

自底向上

    @Test
    public void testContextSequence(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                //NOTE 這個subscriberContext設(shè)置的太高了
                .subscriberContext(ctx -> ctx.put(key, "World"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));

        StepVerifier.create(r)
                .expectNext("Hello Stranger")
                .verifyComplete();
    }

由于這個例子的subscriberContext設(shè)置的太高了询刹,不能作用在flatMap里頭的Mono.subscriberContext()

不可變

    @Test
    public void testContextImmutable(){
        String key = "message";

        Mono<String> r = Mono.subscriberContext()
                .map( ctx -> ctx.put(key, "Hello"))
                //這里返回了一個新的谜嫉,因此上面的設(shè)置失效了
                .flatMap( ctx -> Mono.subscriberContext())
                .map( ctx -> ctx.getOrDefault(key,"Default"));

        StepVerifier.create(r)
                .expectNext("Default")
                .verifyComplete();
    }

subscriberContext永遠(yuǎn)返回一個新的

多個連續(xù)的subscriberContext

    @Test
    public void testReadOrder(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor")
                .verifyComplete();
    }

operator只會讀取離它最近的一個context

flatMap間的subscriberContext

    @Test
    public void testContextBetweenFlatMap(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor World")
                .verifyComplete();
    }

flatMap讀取離它最近的context

flatMap中的subscriberContext

    @Test
    public void testContextInFlatMap(){
        String key = "message";
        Mono<String> r =
                Mono.just("Hello")
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                        )
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                        )
                        .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World Reactor")
                .verifyComplete();
    }

這里第一個flatMap無法讀取第二個flatMap內(nèi)部的context

小結(jié)

reactor通過提供Context來實(shí)現(xiàn)了類似同步線程threadlocal的功能,非常強(qiáng)大凹联,值得好好琢磨沐兰。

doc

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蔽挠,隨后出現(xiàn)的幾起案子住闯,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件寞秃,死亡現(xiàn)場離奇詭異斟叼,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)春寿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進(jìn)店門朗涩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人绑改,你說我怎么就攤上這事谢床。” “怎么了厘线?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵识腿,是天一觀的道長。 經(jīng)常有香客問我造壮,道長渡讼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任耳璧,我火速辦了婚禮成箫,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘旨枯。我一直安慰自己蹬昌,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布攀隔。 她就那樣靜靜地躺著皂贩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪昆汹。 梳的紋絲不亂的頭發(fā)上明刷,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天,我揣著相機(jī)與錄音满粗,去河邊找鬼辈末。 笑死,一個胖子當(dāng)著我的面吹牛败潦,可吹牛的內(nèi)容都是我干的本冲。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼劫扒,長吁一口氣:“原來是場噩夢啊……” “哼檬洞!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起沟饥,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤添怔,失蹤者是張志新(化名)和其女友劉穎湾戳,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體广料,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡砾脑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了艾杏。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片韧衣。...
    茶點(diǎn)故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖购桑,靈堂內(nèi)的尸體忽然破棺而出畅铭,到底是詐尸還是另有隱情,我是刑警寧澤勃蜘,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布硕噩,位于F島的核電站,受9級特大地震影響缭贡,放射性物質(zhì)發(fā)生泄漏炉擅。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一阳惹、第九天 我趴在偏房一處隱蔽的房頂上張望谍失。 院中可真熱鬧,春花似錦穆端、人聲如沸袱贮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至嗽仪,卻和暖如春荒勇,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背闻坚。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工沽翔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人窿凤。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓仅偎,卻偏偏與公主長得像,于是被迫代替她去往敵國和親雳殊。 傳聞我的和親對象是個殘疾皇子橘沥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)夯秃,斷路器座咆,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • 從三月份找實(shí)習(xí)到現(xiàn)在痢艺,面了一些公司,掛了不少介陶,但最終還是拿到小米堤舒、百度、阿里哺呜、京東舌缤、新浪、CVTE某残、樂視家的研發(fā)崗...
    時芥藍(lán)閱讀 42,248評論 11 349
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,815評論 6 342
  • 尖銳的利齒閃爍著 冰冷的寒光友驮。 血色的眼眸按捺不住 內(nèi)心嗜血的瘋狂。 冰冷的寒月懸于天空之上驾锰, 飛舞的雪凝固了戰(zhàn)斗...
    無元再見閱讀 253評論 0 4
  • 1. 凌晨卸留,你關(guān)了所有的燈,鎖門的時候不自覺回頭看了看空無一人的辦公室椭豫,即使是在繁榮的北京金融街耻瑟,外面也已經(jīng)是沉寂...
    螢染閱讀 419評論 1 1