引子:被譽(yù)為“中國(guó)大數(shù)據(jù)第一人”的涂子沛先生在其成名作《數(shù)據(jù)之巔》里提到鹅颊,摩爾定律、社交媒體漠烧、數(shù)據(jù)挖掘是大數(shù)據(jù)的三大成因。IBM的研究稱靡砌,整個(gè)人類文明所獲得的全部數(shù)據(jù)中已脓,有90%是過去兩年內(nèi)產(chǎn)生的。在此背景下通殃,包括NoSQL度液,Hadoop, Spark, Storm, Kylin在內(nèi)的大批新技術(shù)應(yīng)運(yùn)而生。其中以RxJava和Reactor為代表的響應(yīng)式(Reactive)編程技術(shù)針對(duì)的就是經(jīng)典的大數(shù)據(jù)4V定義(Volume画舌,Variety堕担,Velocity,Value)中的Velocity曲聂,即高并發(fā)問題霹购,而在即將發(fā)布的Spring 5中,也引入了響應(yīng)式編程的支持朋腋。在接下來的幾周齐疙,我會(huì)圍繞響應(yīng)式編程分三期與你分享我的一些學(xué)習(xí)心得。本篇是第二篇乍丈,以Reactor框架為例介紹響應(yīng)式編程的幾個(gè)關(guān)鍵特性剂碴。
前情概要:
1 響應(yīng)式編程總覽
In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipedia
在上述響應(yīng)式編程(后面簡(jiǎn)稱RP)的定義中,除了異步編程轻专,還包含兩個(gè)重要的關(guān)鍵詞:
- Data streams: 即數(shù)據(jù)流,分為靜態(tài)數(shù)據(jù)流(比如數(shù)組察蹲,文件)和動(dòng)態(tài)數(shù)據(jù)流(比如事件流请垛,日志流)兩種∏⒁椋基于數(shù)據(jù)流模型宗收,RP得以提供一套統(tǒng)一的Stream風(fēng)格的數(shù)據(jù)處理接口。和Java 8中的Stream API相比亚兄,RP API除了支持靜態(tài)數(shù)據(jù)流混稽,還支持動(dòng)態(tài)數(shù)據(jù)流,并且允許復(fù)用和同時(shí)接入多個(gè)訂閱者。
- The propagation of change: 變化傳播匈勋,簡(jiǎn)單來說就是以一個(gè)數(shù)據(jù)流為輸入礼旅,經(jīng)過一連串操作轉(zhuǎn)化為另一個(gè)數(shù)據(jù)流,然后分發(fā)給各個(gè)訂閱者的過程洽洁。這就有點(diǎn)像函數(shù)式編程中的組合函數(shù)痘系,將多個(gè)函數(shù)串聯(lián)起來,把一組輸入數(shù)據(jù)轉(zhuǎn)化為格式迥異的輸出數(shù)據(jù)饿自。
一個(gè)容易混淆的概念是響應(yīng)式設(shè)計(jì)汰翠,雖然它的名字中也包含了“響應(yīng)式”三個(gè)字,但其實(shí)和RP完全是兩碼事昭雌。響應(yīng)式設(shè)計(jì)是指網(wǎng)頁能夠自動(dòng)調(diào)整布局和樣式以適配不同尺寸的屏幕复唤,屬于網(wǎng)站設(shè)計(jì)的范疇,而RP是一種關(guān)注系統(tǒng)可響應(yīng)性烛卧,面向數(shù)據(jù)流的編程思想或者說編程框架佛纫。
特性
從本質(zhì)上說,RP是一種異步編程框架唱星,和其他框架相比雳旅,RP至少包含了以下三個(gè)特性:
- 描述而非執(zhí)行:在你最終調(diào)用
subscribe()
方法之前,從發(fā)布端到訂閱端间聊,沒有任何事會(huì)發(fā)生攒盈。就好比無論多長(zhǎng)的水管,只要水龍頭不打開哎榴,水管里的水就不會(huì)流動(dòng)型豁。為了提高描述能力,RP提供了比Stream豐富的多的多的API尚蝌,比如buffer()
,merge()
,onErrorMap()
等迎变。 - 提高吞吐量: 類似于HTTP/2中的連接復(fù)用,RP通過線程復(fù)用來提高吞吐量飘言。在傳統(tǒng)的Servlet容器中衣形,每來一個(gè)請(qǐng)求就會(huì)發(fā)起一個(gè)線程進(jìn)行處理。受限于機(jī)器硬件資源姿鸿,單臺(tái)服務(wù)器所能支撐的線程數(shù)是存在一個(gè)上限的谆吴,假設(shè)為T,那么應(yīng)用同時(shí)能處理的請(qǐng)求數(shù)(吞吐量)必然也不會(huì)超過T苛预。但對(duì)于一個(gè)使用Spring 5開發(fā)的RP應(yīng)用句狼,如果運(yùn)行在像Netty這樣的異步容器中,無論有多少個(gè)請(qǐng)求热某,用于處理請(qǐng)求的線程數(shù)是相對(duì)固定的腻菇,因此最大吞吐量就有可能超過T胳螟。
- 背壓(Backpressure)支持:簡(jiǎn)單來說,背壓就是一種反饋機(jī)制筹吐。在一般的Push模型中糖耸,發(fā)布者既不知道也不關(guān)心訂閱者的處理速度,當(dāng)數(shù)據(jù)的發(fā)布速度超過處理速度時(shí)骏令,需要訂閱者自己決定是緩存還是丟棄蔬捷。如果使用RP,決定權(quán)就交回給發(fā)布者榔袋,訂閱者只需要根據(jù)自己的處理能力問發(fā)布者請(qǐng)求相應(yīng)數(shù)量的數(shù)據(jù)周拐。你可能會(huì)問這不就是Pull模型嗎?其實(shí)是不同的凰兑。在Pull模型中妥粟,訂閱者每次處理完數(shù)據(jù),都要重新發(fā)起一次請(qǐng)求拉取新的數(shù)據(jù)吏够,而使用背壓勾给,訂閱者只需要發(fā)起一次請(qǐng)求,就能連續(xù)不斷的重復(fù)請(qǐng)求數(shù)據(jù)锅知。
適用場(chǎng)景
了解了RP的這些特性播急,你可能已經(jīng)猜想到RP有哪些適用場(chǎng)景了。一般來說售睹,RP適用于高并發(fā)桩警、帶延遲操作的場(chǎng)景,比如以下這些情況(的組合):
- 一次請(qǐng)求涉及多次外部服務(wù)調(diào)用
- 非可靠的網(wǎng)絡(luò)傳輸
- 高并發(fā)下的消息處理
- 彈性計(jì)算網(wǎng)絡(luò)
代價(jià)
Every coin has two sides.
和任何框架一樣昌妹,有優(yōu)勢(shì)必然就有劣勢(shì)捶枢。RP的兩個(gè)比較大的問題是:
- 雖然復(fù)用線程有助于提高吞吐量,但一旦在某個(gè)回調(diào)函數(shù)中線程被卡住飞崖,那么這個(gè)線程上所有的請(qǐng)求都會(huì)被阻塞烂叔,最嚴(yán)重的情況,整個(gè)應(yīng)用會(huì)被拖垮固歪。
- 難以調(diào)試蒜鸡。由于RP強(qiáng)大的描述能力,在一個(gè)典型的RP應(yīng)用中牢裳,大部分代碼都是以鏈?zhǔn)奖磉_(dá)式的形式出現(xiàn)术瓮,比如
flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe()
,一旦出錯(cuò)贰健,你將很難定位到具體是哪個(gè)環(huán)節(jié)出了問題。所幸的是恬汁,RP框架一般都會(huì)提供一些工具方法來輔助進(jìn)行調(diào)試伶椿。
2 Reactor實(shí)戰(zhàn)
為了幫助你理解上面說的一些概念辜伟,下面我就通過幾個(gè)測(cè)試用例,演示RP的兩個(gè)關(guān)鍵特性:提高吞吐量和背壓脊另。完整的代碼可參見我GitHub上的示例工程导狡。
提高吞吐量
@Test
public void testImperative() throws InterruptedException {
_runInParallel(CONCURRENT_SIZE, () -> {
ImperativeRestaurantRepository.INSTANCE.insert(load);
});
}
private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
for (int i = 0; i < nThreads; i++) {
executorService.submit(task);
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
@Test
public void testReactive() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
for (int i = 0; i < CONCURRENT_SIZE; i++) {
ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {
}, e -> latch.countDown(), latch::countDown);
}
latch.await();
}
用例解讀:
- 第一個(gè)測(cè)試用例使用的是多線程+MongoDB Driver,同時(shí)起100個(gè)線程偎痛,每個(gè)線程往MongoDB中插入10000條數(shù)據(jù)旱捧,總共100萬條數(shù)據(jù),平均用時(shí)15秒左右踩麦。
- 第二個(gè)測(cè)試用例使用的是Reactor+MongoDB Reactive Streams Driver枚赡,同樣是插入100萬條數(shù)據(jù),平均用時(shí)不到10秒谓谦,吞吐量提高了50%贫橙!
背壓
在演示測(cè)試用例之前,先看兩張圖反粥,幫助你更形象的理解什么是背壓卢肃。
圖片出處:Dataflow and simplified reactive programming
兩張圖乍一看沒啥區(qū)別,但其實(shí)是完全兩種不同的背壓策略才顿。第一張圖莫湘,發(fā)布速度(100/s)遠(yuǎn)大于訂閱速度(1/s),但由于背壓的關(guān)系郑气,發(fā)布者嚴(yán)格按照訂閱者的請(qǐng)求數(shù)量發(fā)送數(shù)據(jù)幅垮。第二張圖,發(fā)布速度(1/s)小于訂閱速度(100/s)竣贪,當(dāng)訂閱者請(qǐng)求100個(gè)數(shù)據(jù)時(shí)军洼,發(fā)布者會(huì)積滿所需個(gè)數(shù)的數(shù)據(jù)再開始發(fā)送⊙菰酰可以看到匕争,通過背壓機(jī)制,發(fā)布者可以根據(jù)各個(gè)訂閱者的能力動(dòng)態(tài)調(diào)整發(fā)布速度爷耀。
@BeforeEach
public void beforeEach() {
// initialize publisher
AtomicInteger count = new AtomicInteger();
timerPublisher = Flux.create(s ->
new Timer().schedule(new TimerTask() {
@Override
public void run() {
s.next(count.getAndIncrement());
if (count.get() == 10) {
s.complete();
}
}
}, 100, 100)
);
}
@Test
public void testNormal() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
timerPublisher
.subscribe(r -> System.out.println("Continuous consuming " + r),
e -> latch.countDown(),
latch::countDown);
latch.await();
}
@Test
public void testBackpressure() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
timerSubscription.set(subscription);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("consuming " + value);
}
@Override
protected void hookOnComplete() {
latch.countDown();
}
@Override
protected void hookOnError(Throwable throwable) {
latch.countDown();
}
};
timerPublisher.onBackpressureDrop().subscribe(subscriber);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
timerSubscription.get().request(1);
}
}, 100, 200);
latch.await();
}
用例解讀:
- 第一個(gè)測(cè)試用例演示了在理想情況下甘桑,即訂閱者的處理速度能夠跟上發(fā)布者的發(fā)布速度(以100ms為間隔產(chǎn)生10個(gè)數(shù)字),控制臺(tái)從0打印到9歹叮,一共10個(gè)數(shù)字跑杭,和發(fā)布端一致。
- 第二個(gè)測(cè)試用例故意調(diào)慢了訂閱者的處理速度(每200ms處理一個(gè)數(shù)字)咆耿,同時(shí)發(fā)布者采用了Drop的背壓策略德谅,結(jié)果控制臺(tái)只打印了一半的數(shù)字(0,2萨螺,4窄做,6愧驱,8),另外一半的數(shù)字由于背壓的原因被發(fā)布者Drop掉了椭盏,并沒有發(fā)給訂閱者组砚。
3 小結(jié)
通過上面的介紹,不難看出RP實(shí)際上是一種內(nèi)置了發(fā)布者訂閱者模型的異步編程框架掏颊,包含了線程復(fù)用糟红,背壓等高級(jí)特性,特別適用于高并發(fā)乌叶、有延遲的場(chǎng)景盆偿。
以上就是我對(duì)響應(yīng)式編程的一些簡(jiǎn)單介紹,歡迎你到我的留言板分享枉昏,和大家一起過過招陈肛。下一篇我將綜合前兩篇的內(nèi)容,詳解一個(gè)完整的Spring 5示例應(yīng)用兄裂,敬請(qǐng)期待句旱。