一遇革、概述
在 Spring Reactor 項目中,有兩個出鏡較少的方法:publishOn
和 subscribeOn
锻霎。這兩個方法的作用是指定執(zhí)行 Reactive Streaming 的 Scheduler(可理解為線程池)杠巡。
為何需要指定執(zhí)行 Scheduler 呢氢拥?一個顯而易見的原因是:組成一個反應式流的代碼有快有慢,例如 NIO嫩海、BIO叁怪。如果將這些功能都放在一個線程里執(zhí)行奕谭,快的就會被慢的影響血柳,所以需要相互隔離难捌。這是這兩個方法應用的最典型的場景膝宁。
二鸦难、Scheduler
在介紹 publishOn
和 subscribeOn
方法之前,需要先介紹 Scheduler
這個概念员淫。在 Reactor 中合蔽,Scheduler
用來定義執(zhí)行調度任務的抽象〗榉担可以簡單理解為線程池拴事,但其實際作用要更多。先簡單介紹 Scheduler
的實現(xiàn):
-
Schedulers.elastic()
: 調度器會動態(tài)創(chuàng)建工作線程圣蝎,線程數無上界挤聘,類似于Execturos.newCachedThreadPool()
-
Schedulers.parallel()
: 創(chuàng)建固定線程數的調度器,默認線程數等于 CPU 核心數捅彻。
關于 Scheduler
的更多作用留在以后介紹组去。
三从隆、publishOn 與 subscribeOn
接下來進入正題。先看兩個例子(來自 https://github.com/reactor/lite-rx-api-hands-on)
publishOn
的例子
Mono<Void> fluxToBlockingRepository(Flux<User> flux,
BlockingRepository<User> repository) {
return flux
.publishOn(Schedulers.elastic())
.doOnNext(repository::save)
.then();
}
subscribeOn
的例子
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
.subscribeOn(Schedulers.elastic());
}
這里的 repository
的類型是 BlockingRepository
缝其,指的是會導致線程阻塞的數據庫操作的集合榴都,例如 JPA和屎、MyBatis 等基于 JDBC 技術實現(xiàn)的 DAO套啤。
在第一個例子中,在執(zhí)行了 publishOn(Schedulers.elastic())
之后止潮,repository::save
就會被 Schedulers.elastic()
定義的線程池所執(zhí)行询件。
而在第二個例子中刻蟹,subscribeOn(Schedulers.elastic())
的作用類似。它使得 repository.findAll()
(也包括 Flux.fromIterable
)的執(zhí)行發(fā)生在 Schedulers.elastic()
所定義的線程池中。
從上面的描述看,publishOn
和 subscribeOn
的作用類似,那兩者的區(qū)別又是什么沮稚?
兩者的區(qū)別
簡單說,兩者的區(qū)別在于影響范圍剩拢。publishOn
影響在其之后的 operator 執(zhí)行的線程池,而 subscribeOn
則會從源頭影響整個執(zhí)行過程办素。所以勺三,publishOn
的影響范圍和它的位置有關商源,而 subscribeOn
的影響范圍則和位置無關。
看個 publishOn
和 subscribeOn
同時使用的例子
Flux.just("tom")
.map(s -> {
System.out.println("[map] Thread name: " + Thread.currentThread().getName());
return s.concat("@mail.com");
})
.publishOn(Schedulers.newElastic("thread-publishOn"))
.filter(s -> {
System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
return s.startsWith("t");
})
.subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
.subscribe(s -> {
System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
System.out.println(s);
});
輸出結果如下:
[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com
從上面的例子可以看出以政,subscribeOn
定義在 publishOn
之后,但是卻從源頭開始生效殊轴。而在 publishOn
執(zhí)行之后孽文,線程池變更為 publishOn
所定義的。
實際用途
這里介紹 publishOn
和 subscribeOn
的一種實際用途,那就是反應式編程和傳統(tǒng)的肥隆,會導致線程阻塞的編程技術混用的場景。其實開頭兩個例子已經解釋了這個場景憨琳。
在第一個 publishOn
的例子中篙螟,repository::save
會導致線程阻塞,為了避免造成對其它反應式操作的影響,便使用 publishOn
改變其執(zhí)行線程。
在第二個 subscribeOn
的例子中履因,repository.findAll()
會導致線程阻塞。但是其是源頭的 publisher,因此不能使用 publishOn
改變其 執(zhí)行線程。這時就需要使用 subscribeOn
,在源頭上修改其執(zhí)行線程盐欺。
這樣,通過 publishOn
和 subscribeOn
就在反應式編程中實現(xiàn)了線程池隔離的目的粉洼,一定程度上避免了會導致線程阻塞的程序執(zhí)行影響到反應式編程的程序執(zhí)行效率宵喂。
局限性
使用 publishOn
和 subscribeOn
只能在一定程度上避免反應式編程代碼執(zhí)行的效率被影響锅棕。因為用來隔離的線程池資源終歸是有限的顾瞻,比如當出現(xiàn)數據庫資源不足、慢查詢等問題時几蜻,對應的線程池資源如果被耗盡梭稚,還是會使整個反應式編程的執(zhí)行效率受到影響颖低。
目前裹匙,Redis、Mongo伸辟、Couchbase 等非關系型數據庫均有相應的反應式編程的解決方案碎乃,但是關系型數據庫卻沒有理想的方案。一個重要原因是 JDBC 本身就是一個阻塞式的 API苛白,根本不可能讓其適應反應式編程。因此需要一個新的方案焚虱。目前 Oracle 正在推動 ADBA (Asynchronous Database Access API)购裙,使得關系型數據庫可以滿足異步編程的需要。但是鹃栽,因為是 Oracle 主導躏率,大家都懂的,所以目前前景還不是很明朗。另外一個技術方案是 Spring 推動的 R2DBC禾锤,從名字上來看就很像是 JDBC 在反應式編程領域的對應的解決方案私股。目前可以支持 PostgreSQL,支持 MySQL 目前還尚需時日恩掷。
后續(xù)
接下來關于 Project Reactor 的文章我打算向大家介紹一下 Hot 和 Cold Publisher 的概念以及 Project Reactor 的源碼實現(xiàn)倡鲸。