這篇文章中,會展示一個Java9中FlowAPI的列子昧旨,通過Publisher和Subscriber接口來構(gòu)建響應(yīng)式程序。最后你將會理解這種全新的編程模式和她的優(yōu)缺點富拗。所有的代碼可在Github上下載臼予。
Java9 Flow API介紹
JDK9響應(yīng)式編程
Java是一個“古老”并且廣泛應(yīng)用的編程語言,但Java9中引入了一些新鮮有趣的特性啃沪。這篇文章主要介紹FlowAPI這個新特性粘拾,通過FlowAPI我們僅僅使用JDK就能夠搭建響應(yīng)式應(yīng)用程序,而不需要其他額外的類庫创千,如RxJava或Project Reactor缰雇。
盡管如此,當你看到過接口文檔后你就會明白到正如字面所說追驴,這只是一個API而已械哟。她僅僅包含了一些Interface和一個實現(xiàn)類:
- Interface
Flow.Publisher<T>
定義了生產(chǎn)數(shù)據(jù)和控制事件的方法。 - Interface
Flow.Subscriber<T>
定義了消費數(shù)據(jù)和事件的方法殿雪。 - Interface
Flow.Subscription
定義了鏈接Publisher和Subscriber的方法暇咆。 - Interface
Flow.Processor<T,R>
定義了轉(zhuǎn)換Publisher到Subscriber的方法 - 最后,class
SubmissionPublisher<T>
是Flow.Publisher<T>
的實現(xiàn)丙曙,她可以靈活的生產(chǎn)數(shù)據(jù)爸业,同時與Reactive Stream兼容。
雖然Java9中沒有很多FlowAPI的實現(xiàn)類可供我們使用亏镰,但是依靠這些接口第三方可以提供的響應(yīng)式編程得到了規(guī)范和統(tǒng)一扯旷,比如從JDBC driver到RabbitMQ的響應(yīng)式實現(xiàn)。
Pull索抓,Push钧忽,Pull-Push
我對響應(yīng)式編程的理解是毯炮, 這是一種數(shù)據(jù)消費者控制數(shù)據(jù)流的編程方式。需要指出是耸黑,當消費速度低于生產(chǎn)速度時桃煎,消費者要求生產(chǎn)者降低速度以完全消費數(shù)據(jù)(這個現(xiàn)象稱作back-pressure)。這種處理方式不是在制造混亂崎坊,你可能已經(jīng)使用過這種模式备禀,只是最近因為在主要框架和平臺上使用才變得更流行,比如Java9奈揍,Spring5。另外在分布式系統(tǒng)中處理大規(guī)模數(shù)據(jù)傳輸時也使用到了這種模式赋续。
回顧過去可以幫我們更好的理解這種模式男翰。幾年前,最常見的消費數(shù)據(jù)模式是pull-based纽乱。client端不斷輪詢服務(wù)端以獲取數(shù)據(jù)蛾绎。這種模式的優(yōu)點是當client端資源有限時可以更好的控制數(shù)據(jù)流(停止輪詢),而缺點是當服務(wù)端沒有數(shù)據(jù)時輪詢是對計算資源和網(wǎng)絡(luò)資源的浪費鸦列。
隨著時間推移租冠,處理數(shù)據(jù)的模式轉(zhuǎn)變?yōu)閜ush-based,生產(chǎn)者不關(guān)心消費者的消費能力薯嗤,直接推送數(shù)據(jù)顽爹。這種模式的缺點是當消費資源低于生產(chǎn)資源時會造成緩沖區(qū)溢出從而數(shù)據(jù)丟失,當丟失率維持在較小的數(shù)值時還可以接受骆姐,但是當這個比率變大時我們會希望生產(chǎn)者降速以避免大規(guī)模數(shù)據(jù)丟失镜粤。
響應(yīng)式編程是一種pull-push混合模式以綜合他們的優(yōu)點,這種模式下消費者負責(zé)請求數(shù)據(jù)以控制生產(chǎn)者數(shù)據(jù)流玻褪,同時當處理資源不足時也可以選擇阻斷或者丟棄數(shù)據(jù)肉渴,接下來我們會看到一個典型案例。
Flow與Stream
響應(yīng)式編程并不是為了替換傳統(tǒng)編程带射,其實兩者相互兼容而且可以互相協(xié)作完成任務(wù)同规。Java8中引入的StreamAPI通過map,reduce以及其他操作可以完美的處理數(shù)據(jù)集窟社,而FlowAPI則專注于處理數(shù)據(jù)的流通券勺,比如對數(shù)據(jù)的請求,減速桥爽,丟棄朱灿,阻塞等。同時你可以使用Streams作為數(shù)據(jù)源(publisher)钠四,當必要時阻塞丟棄其中的數(shù)據(jù)盗扒。你也可以在Subscriber中使用Streams以進行數(shù)據(jù)的歸并操作跪楞。更值得一提的時reactive streams不僅兼容傳統(tǒng)編程方式,而且還支持函數(shù)式編程以極大的提高可讀性和可維護性侣灶。
有一點可能會使我們感到困惑:如果你需要在兩個系統(tǒng)間傳輸數(shù)據(jù)甸祭,同時進行轉(zhuǎn)形操作,如何使用Flows和Streams來完成褥影?這種情況下池户,我們使用Java8的Function來做數(shù)據(jù)轉(zhuǎn)換,但是如何在Publisher和Subscriber之間使用StreamAPI呢凡怎?答案是我們可以在Publisher和Subscriber之間再加一個subscriber校焦,她可以從最初的publisher獲取數(shù)據(jù),轉(zhuǎn)換统倒,然后再作為一個新的publisher寨典,而使最初的subscriber訂閱這個新的publisher,也是Java9中的接口Flow.Processor<T,R>
房匆,我們只需要實現(xiàn)這個接口并編寫轉(zhuǎn)換數(shù)據(jù)的functions耸成。
從技術(shù)上講,我們完全可以使用Flows來替換Streams浴鸿,但任何時候都這么做就顯得過于偏激井氢。比如,我們創(chuàng)建一個Publisher來作為int數(shù)組的數(shù)據(jù)源岳链,然后在Processor中轉(zhuǎn)換Integer為String花竞,最后創(chuàng)建一個Subscriber來歸并到一個String中。這個時候就完全沒有必要使用Flows宠页,因為這不是在控制兩個模塊或兩個線程間的數(shù)據(jù)通信左胞,這個時候使用Streams更為合理。
一個雜志出版商的使用場景
本文中給出的示例代碼是以雜志出版商為模型举户。假設(shè)出版商有兩個訂閱客戶烤宙。
出版商將為每個訂閱客戶出版20本雜志。出版商知道他們的客戶有時在郵遞雜志時會不在家俭嘁,而當他們的郵箱(subscriber buffer)不巧被塞滿時郵遞員會退回或丟棄雜志躺枕,出版商不希望出現(xiàn)這種情況。
于是出版商發(fā)明了一個郵遞系統(tǒng):當客戶在家時再給出版商致電供填,出版商會立即郵遞一份雜志拐云。出版商打算在辦公室為每個客戶保留一個小號的郵箱以防當雜志出版時客戶沒有第一時間致電獲取。出版商認為為每個客戶預(yù)留一個可以容納8份雜志的郵件已經(jīng)足夠(publisher buffer)近她。
于是一名員工提出了以下不同的場景:
- 如果客戶請求雜志足夠迅速叉瘩,將不會存在郵箱容量的問題。
- 如果客戶沒有以雜志出版的速度發(fā)出請求粘捎,那么郵箱將被塞滿薇缅。這位員工提出以下幾種處理方案:
a. 增加郵箱容量危彩,為每位客戶提供可容納20份雜志的郵箱。(publisher增加buffer)
b. 直到客戶請求下一份雜志之前停止印刷泳桦,并且根據(jù)客戶請求的速度降低印刷速度以清空郵箱汤徽。
c. 新的雜志直接丟掉。
d. 一個折中的方案: 如果郵箱滿了灸撰,在下次打印之前等待一段時間谒府,如果還是沒有足夠的空間則丟棄新的雜志。
出版商無法承受花費過多的資源僅僅是因為一個速度慢的客戶浮毯,那將是巨大的浪費完疫,最終選擇了方案d,最大程度上減少客戶損失。
本文示例代碼中選用了方案d是因為如果我們使用了一個虛擬的無窮buffer,這對理解Reactive模式的中概念是不利的员帮,代碼也將變得過于簡易亥贸,無法與其他方案進行比較,接下來讓我們來看代碼吧讯嫂。
Java9 Flow 代碼示例
一個簡單的Subscriber(full-control of Flow)
從訂閱者開始蹦锋,MagazineSubscriber
實現(xiàn)了Flow.Subscriber<Integer>
,訂閱者將收到一個數(shù)字欧芽,但請假設(shè)這是一份雜志正如上面的使用場景提到的莉掂。
package com.thepracticaldeveloper;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MagazineSubscriber implements Flow.Subscriber<Integer> {
public static final String JACK = "Jack";
public static final String PETE = "Pete";
private static final Logger log = LoggerFactory.
getLogger(MagazineSubscriber.class);
private final long sleepTime;
private final String subscriberName;
private Flow.Subscription subscription;
private int nextMagazineExpected;
private int totalRead;
MagazineSubscriber(final long sleepTime, final String subscriberName) {
this.sleepTime = sleepTime;
this.subscriberName = subscriberName;
this.nextMagazineExpected = 1;
this.totalRead = 0;
}
@Override
public void onSubscribe(final Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(final Integer magazineNumber) {
if (magazineNumber != nextMagazineExpected) {
IntStream.range(nextMagazineExpected, magazineNumber).forEach(
(msgNumber) ->
log("Oh no! I missed the magazine " + msgNumber)
);
// Catch up with the number to keep tracking missing ones
nextMagazineExpected = magazineNumber;
}
log("Great! I got a new magazine: " + magazineNumber);
takeSomeRest();
nextMagazineExpected++;
totalRead++;
log("I'll get another magazine now, next one should be: " +
nextMagazineExpected);
subscription.request(1);
}
@Override
public void onError(final Throwable throwable) {
log("Oops I got an error from the Publisher: " + throwable.getMessage());
}
@Override
public void onComplete() {
log("Finally! I completed the subscription, I got in total " +
totalRead + " magazines.");
}
private void log(final String logMessage) {
log.info("<=========== [" + subscriberName + "] : " + logMessage);
}
public String getSubscriberName() {
return subscriberName;
}
private void takeSomeRest() {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
class中實現(xiàn)了必要的方法如下:
-
onSubscriber(subscription)
Publisher在被指定一個新的Subscriber時調(diào)用此方法。 一般來說你需要在subscriber內(nèi)部保存這個subscrition實例千扔,因為后面會需要通過她向publisher發(fā)送信號來完成:請求更多數(shù)據(jù)憎妙,或者取消訂閱。 一般在這里我們會直接請求第一個數(shù)據(jù)曲楚,正如代碼中所示厘唾。 -
onNext(magazineNumber)
每當新的數(shù)據(jù)產(chǎn)生,這個方法會被調(diào)用龙誊。在我們的示例中抚垃,我們用到了最經(jīng)典的使用方式:處理這個數(shù)據(jù)的同時再請求下一個數(shù)據(jù)。然而我們在這中間添加了一段可配置的sleep時間趟大,這樣我們可以嘗試訂閱者在不同場景下的表現(xiàn)鹤树。剩下的一段邏輯判斷僅僅是記錄下丟失的雜志(當publisher出現(xiàn)丟棄數(shù)據(jù)的時候)。 -
onError(throwable)
當publisher出現(xiàn)異常時會調(diào)用subscriber的這個方法逊朽。在我們的實現(xiàn)中publisher丟棄數(shù)據(jù)時會產(chǎn)生異常罕伯。 -
onComplete()
當publisher數(shù)據(jù)推送完畢時會調(diào)用此方法,于是整個訂閱過程結(jié)束叽讳。
通過Java9 SubmissionPublisher發(fā)送數(shù)據(jù)
我們將使用Java9 SubmissionPublisher
類來創(chuàng)建publisher追他。正如javadoc所述坟募, 當subscribers消費過慢,就像Reactive Streams中的Publisher一樣她會阻塞或丟棄數(shù)據(jù)湿酸。在深入理解之前讓我們先看代碼婿屹。
package com.thepracticaldeveloper;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReactiveFlowApp {
private static final int NUMBER_OF_MAGAZINES = 20;
private static final long MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE = 2;
private static final Logger log =
LoggerFactory.getLogger(ReactiveFlowApp.class);
public static void main(String[] args) throws Exception {
final ReactiveFlowApp app = new ReactiveFlowApp();
log.info("\n\n### CASE 1: Subscribers are fast, buffer size is not so " +
"important in this case.");
app.magazineDeliveryExample(100L, 100L, 8);
log.info("\n\n### CASE 2: A slow subscriber, but a good enough buffer " +
"size on the publisher's side to keep all items until they're picked up");
app.magazineDeliveryExample(1000L, 3000L, NUMBER_OF_MAGAZINES);
log.info("\n\n### CASE 3: A slow subscriber, and a very limited buffer " +
"size on the publisher's side so it's important to keep the slow " +
"subscriber under control");
app.magazineDeliveryExample(1000L, 3000L, 8);
}
void magazineDeliveryExample(final long sleepTimeJack,
final long sleepTimePete,
final int maxStorageInPO) throws Exception {
final SubmissionPublisher<Integer> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxStorageInPO);
final MagazineSubscriber jack = new MagazineSubscriber(
sleepTimeJack,
MagazineSubscriber.JACK
);
final MagazineSubscriber pete = new MagazineSubscriber(
sleepTimePete,
MagazineSubscriber.PETE
);
publisher.subscribe(jack);
publisher.subscribe(pete);
log.info("Printing 20 magazines per subscriber, with room in publisher for "
+ maxStorageInPO + ". They have " + MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE +
" seconds to consume each magazine.");
IntStream.rangeClosed(1, 20).forEach((number) -> {
log.info("Offering magazine " + number + " to consumers");
final int lag = publisher.offer(
number,
MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE,
TimeUnit.SECONDS,
(subscriber, msg) -> {
subscriber.onError(
new RuntimeException("Hey " + ((MagazineSubscriber) subscriber)
.getSubscriberName() + "! You are too slow getting magazines" +
" and we don't have more space for them! " +
"I'll drop your magazine: " + msg));
return false; // don't retry, we don't believe in second opportunities
});
if (lag < 0) {
log("Dropping " + -lag + " magazines");
} else {
log("The slowest consumer has " + lag +
" magazines in total to be picked up");
}
});
// Blocks until all subscribers are done (this part could be improved
// with latches, but this way we keep it simple)
while (publisher.estimateMaximumLag() > 0) {
Thread.sleep(500L);
}
// Closes the publisher, calling the onComplete() method on every subscriber
publisher.close();
// give some time to the slowest consumer to wake up and notice
// that it's completed
Thread.sleep(Math.max(sleepTimeJack, sleepTimePete));
}
private static void log(final String message) {
log.info("===========> " + message);
}
}
magazineDeliveryExample
中我們?yōu)閮蓚€不同的subscribers設(shè)置了兩個不同的等待時間, 并且設(shè)置了緩存容量maxStorageInPO
步驟如下:
- 創(chuàng)建
SubmissionPublisher
并設(shè)置一個標準的線程池(每個subscriber擁有一個線程) - 創(chuàng)建兩個subscribers推溃,通過傳遞變量設(shè)置不同的消費時間和不同的名字昂利,以在log中方便區(qū)別
- 用20個數(shù)字的的stream數(shù)據(jù)集作為數(shù)據(jù)源以扮演“雜志打印機”,我們調(diào)用
offer
铁坎,并傳遞以下變量:
a. 提供給subscribers的數(shù)據(jù)蜂奸。
b. 第二和第三個變量是等待subscribers獲取雜志的最大時間。
c. 控制器以處理數(shù)據(jù)丟棄的情況硬萍。這里我們拋出了一個異常扩所,返回false意味著告訴publisher不需要重試。 - 當丟棄數(shù)據(jù)發(fā)生時朴乖,
offer
方法返回一個負數(shù)祖屏,否則將返回publisher的最大容量(以供最慢的subscriber消費),同時打印這個數(shù)字买羞。
5 . 最后我們添加了一個循環(huán)等待以防止主進程過早結(jié)束袁勺。這里一個是等待publisher清空緩存數(shù)據(jù),另外等待最慢的subscriber收到onComplete
回調(diào)信號(close()
調(diào)用之后)
main()
方法中使用不同參數(shù)調(diào)用以上邏輯三次畜普,以模擬之前介紹的三種不同真是場景期丰。
- 消費者消費速度很快,publisher緩存區(qū)不會發(fā)生問題吃挑。
- 其中一個消費者速度很慢钝荡,以至緩存被填滿,然而緩存區(qū)足夠大以容納所有所有數(shù)據(jù)舶衬,不會發(fā)生丟棄埠通。
- 其中一個消費者速度很慢,同時緩存區(qū)不夠大约炎,這是控制器被出發(fā)了多次植阴,subscriber沒有收到所有數(shù)據(jù)。
你還可以嘗試其他組合圾浅,比如設(shè)置MAX_SECONDS_TO_WAIT_WHEN_NO_SPACE
為很大的數(shù)字掠手,這時offer
表象將類似于submit
,或者可以嘗試將兩個消費者速度同時降低(會出現(xiàn)大量丟棄數(shù)據(jù))狸捕。