序
本文主要研究下reactor異步線程的變量傳遞
threadlocal的問題
在傳統(tǒng)的請求/應(yīng)答同步模式中,使用threadlocal來傳遞上下文變量是非常方便的我碟,可以省得在每個方法參數(shù)添加公用的變量放案,比如當(dāng)前登錄用戶。但是業(yè)務(wù)方法可能使用了async或者在其他線程池中異步執(zhí)行怎囚,這個時候threadlocal的作用就失效了卿叽。
這個時候的解決辦法就是采取propagation模式桥胞,即在同步線程與異步線程銜接處傳播這個變量恳守。
TaskDecorator
比如spring就提供了TaskDecorator,通過實(shí)現(xiàn)這個接口贩虾,可以自己控制傳播那些變量催烘。例如:
class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// Right now: Web thread context !
// (Grab the current thread MDC data)
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
try {
// Right now: @Async thread context !
// (Restore the Web thread context's MDC data)
MDC.setContextMap(contextMap);
runnable.run();
} finally {
MDC.clear();
}
};
}
}
這里注意在finally里頭clear
配置這個taskDecorator
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();
return executor;
}
}
完整實(shí)例詳見Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
Reactor Context
spring5引入webflux,其底層是基于reactor缎罢,那么reactor如何進(jìn)行上下文變量的傳播呢伊群?官方提供了Context對象來替代threadlocal。
其特性如下:
- 類似map的kv操作策精,比如put(Object key, Object value),putAll(Context), hasKey(Object key)
- immutable舰始,即同一個key,后面put不會覆蓋
- 提供getOrDefault咽袜,getOrEmpty方法
- Context與作用鏈上的每個Subscriber綁定
- 通過subscriberContext(Context)來訪問
- Context的作用是自底向上
實(shí)例
設(shè)置及讀取
@Test
public void testSubscriberContext(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World")
.verifyComplete();
}
這里從最底部的subscriberContext設(shè)置message值為World丸卷,然后flatMap里頭通過subscriberContext來訪問。
自底向上
@Test
public void testContextSequence(){
String key = "message";
Mono<String> r = Mono.just("Hello")
//NOTE 這個subscriberContext設(shè)置的太高了
.subscriberContext(ctx -> ctx.put(key, "World"))
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));
StepVerifier.create(r)
.expectNext("Hello Stranger")
.verifyComplete();
}
由于這個例子的subscriberContext設(shè)置的太高了询刹,不能作用在flatMap里頭的Mono.subscriberContext()
不可變
@Test
public void testContextImmutable(){
String key = "message";
Mono<String> r = Mono.subscriberContext()
.map( ctx -> ctx.put(key, "Hello"))
//這里返回了一個新的谜嫉,因此上面的設(shè)置失效了
.flatMap( ctx -> Mono.subscriberContext())
.map( ctx -> ctx.getOrDefault(key,"Default"));
StepVerifier.create(r)
.expectNext("Default")
.verifyComplete();
}
subscriberContext永遠(yuǎn)返回一個新的
多個連續(xù)的subscriberContext
@Test
public void testReadOrder(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello Reactor")
.verifyComplete();
}
operator只會讀取離它最近的一個context
flatMap間的subscriberContext
@Test
public void testContextBetweenFlatMap(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello Reactor World")
.verifyComplete();
}
flatMap讀取離它最近的context
flatMap中的subscriberContext
@Test
public void testContextInFlatMap(){
String key = "message";
Mono<String> r =
Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key))
)
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
)
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World Reactor")
.verifyComplete();
}
這里第一個flatMap無法讀取第二個flatMap內(nèi)部的context
小結(jié)
reactor通過提供Context來實(shí)現(xiàn)了類似同步線程threadlocal的功能,非常強(qiáng)大凹联,值得好好琢磨沐兰。