簡介
今天我們要介紹的是Reactor中的多線程模型和定時器模型,Reactor之前我們已經(jīng)介紹過了咆瘟,它實(shí)際上是觀察者模式的延伸苏研。
所以從本質(zhì)上來說截粗,Reactor是和多線程無關(guān)的袱巨。你可以把它用在多線程或者不用在多線程适篙。
今天將會給大家介紹一下如何在Reactor中使用多線程和定時器模型铁坎。
Thread多線程
先看一下之前舉的Flux的創(chuàng)建的例子:
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state);
if (state == 10) sink.complete();
return state + 1;
});
flux.subscribe(System.out::println);
可以看到蜂奸,不管是Flux generator還是subscriber,他們實(shí)際上都是運(yùn)行在同一個線程中的硬萍。
如果我們想讓subscribe發(fā)生在一個新的線程中扩所,我們需要新啟動一個線程,然后在線程內(nèi)部進(jìn)行subscribe操作朴乖。
Mono<String> mono = Mono.just("hello ");
Thread t = new Thread(() -> mono
.map(msg -> msg + "thread ")
.subscribe(v ->
System.out.println(v + Thread.currentThread().getName())
)
);
t.start();
t.join();
上面的例子中祖屏,Mono在主線程中創(chuàng)建,而subscribe發(fā)生在新啟動的Thread中买羞。
Schedule定時器
很多情況下袁勺,我們的publisher是需要定時去調(diào)用一些方法,來產(chǎn)生元素的畜普。Reactor提供了一個新的Schedule類來負(fù)責(zé)定時任務(wù)的生成和管理期丰。
Scheduler是一個接口:
public interface Scheduler extends Disposable
它定義了一些定時器中必須要實(shí)現(xiàn)的方法:
比如立即執(zhí)行的:
Disposable schedule(Runnable task);
延時執(zhí)行的:
default Disposable schedule(Runnable task, long delay, TimeUnit unit)
和定期執(zhí)行的:
default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)
Schedule有一個工具類叫做Schedules,它提供了多個創(chuàng)建Scheduler的方法吃挑,它的本質(zhì)就是對ExecutorService和ScheduledExecutorService進(jìn)行封裝钝荡,將其做為Supplier來創(chuàng)建Schedule。
簡單點(diǎn)看Schedule就是對ExecutorService的封裝儒鹿。
Schedulers工具類
Schedulers工具類提供了很多個有用的工具類化撕,我們來詳細(xì)介紹一下:
Schedulers.immediate():
提交的Runnable將會立馬在當(dāng)前線程執(zhí)行。
Schedulers.single():
使用同一個線程來執(zhí)行所有的任務(wù)约炎。
Schedulers.boundedElastic():
創(chuàng)建一個可重用的線程池植阴,如果線程池中的線程在長時間內(nèi)都沒有被使用蟹瘾,那么將會被回收。boundedElastic會有一個最大的線程個數(shù)掠手,一般來說是CPU cores x 10憾朴。 如果目前沒有可用的worker線程,提交的任務(wù)將會被放入隊(duì)列等待喷鸽。
Schedulers.parallel():
創(chuàng)建固定個數(shù)的工作線程众雷,個數(shù)和CPU的核數(shù)相關(guān)。
Schedulers.fromExecutorService(ExecutorService):
從一個現(xiàn)有的線程池創(chuàng)建Scheduler做祝。
Schedulers.newXXX:
Schedulers提供了很多new開頭的方法砾省,來創(chuàng)建各種各樣的Scheduler。
我們看一個Schedulers的具體應(yīng)用混槐,我們可以指定特定的Scheduler來產(chǎn)生元素:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
publishOn 和 subscribeOn
publishOn和subscribeOn主要用來進(jìn)行切換Scheduler的執(zhí)行上下文编兄。
先講一個結(jié)論,就是在鏈?zhǔn)秸{(diào)用中声登,publishOn可以切換Scheduler狠鸳,但是subscribeOn并不會起作用。
這是因?yàn)檎嬲膒ublish-subscribe關(guān)系只有在subscriber開始subscribe的時候才建立悯嗓。
下面我們來具體看一下這兩個方法的使用情況:
publishOn
publishOn可以在鏈?zhǔn)秸{(diào)用的過程中件舵,進(jìn)行publish的切換:
@Test
public void usePublishOn() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i + ":"+ Thread.currentThread())
.publishOn(s)
.map(i -> "value " + i+":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
System.out.println(Thread.currentThread());
Thread.sleep(5000);
}
上面我們創(chuàng)建了一個名字為parallel-scheduler的scheduler。
然后創(chuàng)建了一個Flux脯厨,F(xiàn)lux先做了一個map操作铅祸,然后切換執(zhí)行上下文到parallel-scheduler,最后右執(zhí)行了一次map操作俄认。
最后个少,我們采用一個新的線程來進(jìn)行subscribe的輸出。
先看下輸出結(jié)果:
Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
可以看到,主線程的名字是Thread眯杏。Subscriber線程的名字是ThreadA夜焦。
那么在publishOn之前,map使用的線程就是ThreadA岂贩。 而在publishOn之后茫经,map使用的線程就切換到了parallel-scheduler線程池。
subscribeOn
subscribeOn是用來切換Subscriber的執(zhí)行上下文萎津,不管subscribeOn出現(xiàn)在調(diào)用鏈的哪個部分卸伞,最終都會應(yīng)用到整個調(diào)用鏈上。
我們看一個例子:
@Test
public void useSubscribeOn() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i + ":" + Thread.currentThread())
.subscribeOn(s)
.map(i -> "value " + i + ":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
Thread.sleep(5000);
}
同樣的锉屈,上面的例子中荤傲,我們使用了兩個map,然后在兩個map中使用了一個subscribeOn用來切換subscribe執(zhí)行上下文颈渊。
看下輸出結(jié)果:
value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
可以看到遂黍,不管哪個map终佛,都是用的是切換過的parallel-scheduler。
本文的例子learn-reactive
本文作者:flydean程序那些事
本文鏈接:http://www.flydean.com/reactor-thread-scheduler/
本文來源:flydean的博客
歡迎關(guān)注我的公眾號:「程序那些事」最通俗的解讀雾家,最深刻的干貨铃彰,最簡潔的教程,眾多你不知道的小技巧等你來發(fā)現(xiàn)芯咧!