Java 9?Reactive Streams允許我們實(shí)現(xiàn)非阻塞異步流處理路幸。這是將響應(yīng)式編程模型應(yīng)用于核心java編程的重要一步俭驮。
如果您對(duì)響應(yīng)式編程不熟悉,請(qǐng)閱讀Reactive Manifesto并閱讀Reactive Streams的簡短說明赦拘。RxJava和Akka Streams一直是十分優(yōu)秀的響應(yīng)流實(shí)現(xiàn)庫∷喟荩現(xiàn)在java 9已經(jīng)通過java.util.concurrent.Flow?API 引入了響應(yīng)流支持。
Reactive Streams是關(guān)于流的異步處理真朗,因此應(yīng)該有一個(gè)發(fā)布者(Publisher)和一個(gè)訂閱者(Subscriber)此疹。發(fā)布者發(fā)布數(shù)據(jù)流,訂閱者使用數(shù)據(jù)遮婶。
有時(shí)我們必須在Publisher和Subscriber之間轉(zhuǎn)換數(shù)據(jù)蝗碎。處理器(Processor)是位于最終發(fā)布者和訂閱者之間的實(shí)體,用于轉(zhuǎn)換從發(fā)布者接收的數(shù)據(jù)旗扑,以便訂閱者能理解它蹦骑。我們可以擁有一系列(chain?)處理器。
從上面的圖中可以清楚地看出臀防,Processor既可以作為訂閱者也可以作為發(fā)布者眠菇。
Java 9 Flow API實(shí)現(xiàn)了Reactive Streams規(guī)范。Flow API是Iterator和Observer模式的組合袱衷。Iterator在pull模型上工作琼锋,用于應(yīng)用程序從源中拉取項(xiàng)目;而Observer在push模型上工作祟昭,并在item從源推送到應(yīng)用程序時(shí)作出反應(yīng)缕坎。
Java 9 Flow API訂閱者可以在訂閱發(fā)布者時(shí)請(qǐng)求N個(gè)項(xiàng)目。然后將項(xiàng)目從發(fā)布者推送到訂閱者篡悟,直到推送玩所有項(xiàng)目或遇到某些錯(cuò)誤谜叹。?
讓我們快速瀏覽一下Flow API類和接口匾寝。
java.util.concurrent.Flow:這是Flow API的主要類。該類封裝了Flow API的所有重要接口荷腊。這是一個(gè)final類艳悔,我們不能擴(kuò)展它。
java.util.concurrent.Flow.Publisher:這是一個(gè)功能接口女仰,每個(gè)發(fā)布者都必須實(shí)現(xiàn)它的subscribe方法猜年,并添加相關(guān)的訂閱者以接收消息。
java.util.concurrent.Flow.Subscriber:每個(gè)訂閱者都必須實(shí)現(xiàn)此接口疾忍。訂閱者中的方法以嚴(yán)格的順序進(jìn)行調(diào)用乔外。此接口有四種方法:?
onSubscribe:這是訂閱者訂閱了發(fā)布者后接收消息時(shí)調(diào)用的第一個(gè)方法。通常我們調(diào)用subscription.request開始從處理器(Processor)接收項(xiàng)目一罩。
onNext:當(dāng)從發(fā)布者收到項(xiàng)目時(shí)調(diào)用此方法杨幼,這是我們實(shí)現(xiàn)業(yè)務(wù)邏輯以處理流,然后從發(fā)布者請(qǐng)求更多數(shù)據(jù)的方法聂渊。
onError:當(dāng)發(fā)生不可恢復(fù)的錯(cuò)誤時(shí)調(diào)用此方法差购,我們可以在此方法中執(zhí)行清理操作,例如關(guān)閉數(shù)據(jù)庫連接汉嗽。
onComplete:這就像finally方法欲逃,并且在發(fā)布者沒有發(fā)布其他項(xiàng)目或發(fā)布者關(guān)閉時(shí)調(diào)用。我們可以用它來發(fā)送流成功處理的通知饼暑。
java.util.concurrent.Flow.Subscription:這用于在發(fā)布者和訂閱者之間創(chuàng)建異步非阻塞鏈接稳析。訂閱者調(diào)用其request方法來向發(fā)布者請(qǐng)求項(xiàng)目。它還有cancel取消訂閱的方法撵孤,即關(guān)閉發(fā)布者和訂閱者之間的鏈接。
java.util.concurrent.Flow.Processor:此接口同時(shí)擴(kuò)展了Publisher和Subscriber接口竭望,用于在發(fā)布者和訂閱者之間轉(zhuǎn)換消息邪码。
java.util.concurrent.SubmissionPublisher:一個(gè)Publisher實(shí)現(xiàn),它將提交的項(xiàng)目異步發(fā)送給當(dāng)前訂閱者咬清,直到它關(guān)閉為止闭专。它使用Executor框架,我們將在響應(yīng)流示例中使用該類來添加訂閱者旧烧,然后向其提交項(xiàng)目影钉。
讓我們從一個(gè)簡單的例子開始,我們將實(shí)現(xiàn)Flow API Subscriber接口并使用SubmissionPublisher來創(chuàng)建發(fā)布者和發(fā)送消息掘剪。
假設(shè)我們有一個(gè)Employee類平委,用于創(chuàng)建從發(fā)布者發(fā)送到訂閱者的流消息。
packagecom.journaldev.reactive.beans;publicclassEmployee{privateintid;privateString name;publicintgetId() {returnid;? ? }publicvoidsetId(intid) {this.id = id;? ? }publicStringgetName() {returnname;? ? }publicvoidsetName(String name) {this.name = name;? ? }publicEmployee(inti, String s) {this.id = i;this.name = s;? ? }publicEmployee() {? ? }@OverridepublicStringtoString() {return"[id="+id+",name="+name+"]";? ? }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
我們還有一個(gè)實(shí)用程序類來為我們的示例創(chuàng)建一個(gè)員工列表夺谁。
packagecom.journaldev.reactive_streams;importjava.util.ArrayList;importjava.util.List;importcom.journaldev.reactive.beans.Employee;publicclassEmpHelper{publicstaticListgetEmps() {? ? ? ? Employee e1 =newEmployee(1,"Pankaj");? ? ? ? Employee e2 =newEmployee(2,"David");? ? ? ? Employee e3 =newEmployee(3,"Lisa");? ? ? ? Employee e4 =newEmployee(4,"Ram");? ? ? ? Employee e5 =newEmployee(5,"Anupam");? ? ? ? List emps =newArrayList<>();? ? ? ? emps.add(e1);? ? ? ? emps.add(e2);? ? ? ? emps.add(e3);? ? ? ? emps.add(e4);? ? ? ? emps.add(e5);returnemps;? ? }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Subscriber;importjava.util.concurrent.Flow.Subscription;importcom.journaldev.reactive.beans.Employee;publicclassMySubscriberimplementsSubscriber {privateSubscription subscription;privateintcounter =0;@OverridepublicvoidonSubscribe(Subscription subscription) {? ? ? ? System.out.println("Subscribed");this.subscription = subscription;this.subscription.request(1);//requesting data from publisherSystem.out.println("onSubscribe requested 1 item");? ? }@OverridepublicvoidonNext(Employee item) {? ? ? ? System.out.println("Processing Employee "+item);? ? ? ? counter++;this.subscription.request(1);? ? }@OverridepublicvoidonError(Throwable e) {? ? ? ? System.out.println("Some error happened");? ? ? ? e.printStackTrace();? ? }@OverridepublicvoidonComplete() {? ? ? ? System.out.println("All Processing Done");? ? }publicintgetCounter() {returncounter;? ? }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Subscription變量以保持引用廉赔,以便可以在onNext方法中進(jìn)行請(qǐng)求肉微。
counter變量以記錄處理的項(xiàng)目數(shù),請(qǐng)注意它的值在onNext方法中增加蜡塌。這將在我們的main方法中用于在結(jié)束主線程之前等待執(zhí)行完成碉纳。
在onSubscribe方法中調(diào)用訂閱請(qǐng)求以開始處理。另請(qǐng)注意馏艾,onNext在處理項(xiàng)目后再次調(diào)用方法劳曹,要求對(duì)下一個(gè)從發(fā)布者發(fā)布的項(xiàng)目進(jìn)行處理。
onError和onComplete在例子中沒有太多邏輯琅摩,但在實(shí)際情況中铁孵,它們應(yīng)該用于在發(fā)生錯(cuò)誤時(shí)執(zhí)行糾正措施或在處理成功完成時(shí)清理資源。
我們將使用SubmissionPublisherPublisher作為示例迫吐,讓我們看一下響應(yīng)流實(shí)現(xiàn)的測試程序库菲。
packagecom.journaldev.reactive_streams;importjava.util.List;importjava.util.concurrent.SubmissionPublisher;importcom.journaldev.reactive.beans.Employee;publicclassMyReactiveApp{publicstaticvoidmain(String args[])throwsInterruptedException {// Create PublisherSubmissionPublisher publisher =newSubmissionPublisher<>();// Register SubscriberMySubscriber subs =newMySubscriber();? ? ? ? publisher.subscribe(subs);? ? ? ? List emps = EmpHelper.getEmps();// Publish itemsSystem.out.println("Publishing Items to Subscriber");? ? ? ? emps.stream().forEach(i -> publisher.submit(i));// logic to wait till processing of all messages are overwhile(emps.size() != subs.getCounter()) {? ? ? ? ? ? Thread.sleep(10);? ? ? ? }// close the Publisherpublisher.close();? ? ? ? System.out.println("Exiting the app");? ? }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
在上述代碼中,最重要的部分是發(fā)布者subscribe和submit方法的調(diào)用志膀。我們應(yīng)該始終關(guān)閉發(fā)布者以避免任何內(nèi)存泄漏熙宇。
執(zhí)行上述程序時(shí),我們將得到以下輸出溉浙。
SubscribedPublishing ItemstoSubscriberonSubscribe requested1itemProcessing Employee [id=1,name=Pankaj]Processing Employee [id=2,name=David]Processing Employee [id=3,name=Lisa]Processing Employee [id=4,name=Ram]Processing Employee [id=5,name=Anupam]ExitingtheappAll Processing Done
1
2
3
4
5
6
7
8
9
10
處理器用于在發(fā)布者和訂閱者之間轉(zhuǎn)換消息烫止。假設(shè)我們有另一個(gè)用戶希望處理不同類型的消息。假設(shè)這個(gè)新的消息類型是Freelancer戳稽。
packagecom.journaldev.reactive.beans;publicclassFreelancerextendsEmployee{privateintfid;publicintgetFid() {returnfid;? ? }publicvoidsetFid(intfid) {this.fid = fid;? ? }publicFreelancer(intid,intfid, String name) {super(id, name);this.fid = fid;? ? }@OverridepublicStringtoString() {return"[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]";? ? }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
我們有一個(gè)新訂閱者使用Freelancer流數(shù)據(jù)馆蠕。
packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Subscriber;importjava.util.concurrent.Flow.Subscription;importcom.journaldev.reactive.beans.Freelancer;publicclassMyFreelancerSubscriberimplementsSubscriber {privateSubscription subscription;privateintcounter =0;@OverridepublicvoidonSubscribe(Subscription subscription) {? ? ? ? System.out.println("Subscribed for Freelancer");this.subscription = subscription;this.subscription.request(1);//requesting data from publisherSystem.out.println("onSubscribe requested 1 item for Freelancer");? ? }@OverridepublicvoidonNext(Freelancer item) {? ? ? ? System.out.println("Processing Freelancer "+item);? ? ? ? counter++;this.subscription.request(1);? ? }@OverridepublicvoidonError(Throwable e) {? ? ? ? System.out.println("Some error happened in MyFreelancerSubscriber");? ? ? ? e.printStackTrace();? ? }@OverridepublicvoidonComplete() {? ? ? ? System.out.println("All Processing Done for MyFreelancerSubscriber");? ? }publicintgetCounter() {returncounter;? ? }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
代碼重要的部分是實(shí)現(xiàn)Processor接口。由于我們想要使用它SubmissionPublisher惊奇,我們會(huì)擴(kuò)展它并在適合的地方使用它互躬。
packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Processor;importjava.util.concurrent.Flow.Subscription;importjava.util.concurrent.SubmissionPublisher;importjava.util.function.Function;importcom.journaldev.reactive.beans.Employee;importcom.journaldev.reactive.beans.Freelancer;publicclassMyProcessorextendsSubmissionPublisherimplementsProcessor {privateSubscription subscription;privateFunction function;publicMyProcessor(Function function) {super();this.function = function;? ? ? ? }@OverridepublicvoidonSubscribe(Subscription subscription) {this.subscription = subscription;? ? ? ? subscription.request(1);? ? }@OverridepublicvoidonNext(Employee emp) {? ? ? ? submit((Freelancer) function.apply(emp));? ? ? ? ? subscription.request(1);? ? ? }@OverridepublicvoidonError(Throwable e) {? ? ? ? e.printStackTrace();? ? }@OverridepublicvoidonComplete() {? ? ? ? System.out.println("Done");? ? }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Function?將用于將Employee對(duì)象轉(zhuǎn)換為Freelancer對(duì)象。
我們將傳入的Employee消息轉(zhuǎn)換為onNext方法中的Freelancer消息颂郎,然后使用SubmissionPublisher?submit方法將其發(fā)送給訂閱者吼渡。
由于Processor既是訂閱者又是發(fā)布者,我們可以在終端發(fā)布者和訂閱者之間創(chuàng)建一系列處理器乓序。
packagecom.journaldev.reactive_streams;importjava.util.List;importjava.util.concurrent.SubmissionPublisher;importcom.journaldev.reactive.beans.Employee;importcom.journaldev.reactive.beans.Freelancer;publicclassMyReactiveAppWithProcessor{publicstaticvoidmain(String[] args)throwsInterruptedException {// Create End PublisherSubmissionPublisher publisher =newSubmissionPublisher<>();// Create ProcessorMyProcessor transformProcessor =newMyProcessor(s -> {returnnewFreelancer(s.getId(), s.getId() +100, s.getName());? ? ? ? });//Create End SubscriberMyFreelancerSubscriber subs =newMyFreelancerSubscriber();//Create chain of publisher, processor and subscriberpublisher.subscribe(transformProcessor);// publisher to processortransformProcessor.subscribe(subs);// processor to subscriberList emps = EmpHelper.getEmps();// Publish itemsSystem.out.println("Publishing Items to Subscriber");? ? ? ? emps.stream().forEach(i -> publisher.submit(i));// Logic to wait for messages processing to finishwhile(emps.size() != subs.getCounter()) {? ? ? ? ? ? Thread.sleep(10);? ? ? ? }// Closing publisherspublisher.close();? ? ? ? transformProcessor.close();? ? ? ? System.out.println("Exiting the app");? ? }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
閱讀程序中的注釋以正確理解它寺酪,最重要的變化是發(fā)布者 - 處理器 - 訂閱者鏈的創(chuàng)建。執(zhí)行上述程序時(shí)替劈,我們將得到以下輸出寄雀。
SubscribedforFreelancerPublishing ItemstoSubscriberonSubscribe requested1itemforFreelancerProcessing Freelancer [id=1,name=Pankaj,fid=101]Processing Freelancer [id=2,name=David,fid=102]Processing Freelancer [id=3,name=Lisa,fid=103]Processing Freelancer [id=4,name=Ram,fid=104]Processing Freelancer [id=5,name=Anupam,fid=105]ExitingtheappAll Processing DoneforMyFreelancerSubscriberDone
1
2
3
4
5
6
7
8
9
10
11
我們可以使用Subscription?cancel方法停止在訂閱者中接收消息。
以下是一個(gè)示例代碼陨献,其中訂閱者只消費(fèi)3條消息盒犹,然后取消訂閱。
@OverridepublicvoidonNext(Employee item) {? ? System.out.println("Processing Employee "+item);? ? counter++;if(counter==3) {this.subscription.cancel();return;? ? }this.subscription.request(1);}
1
2
3
4
5
6
7
8
9
10
請(qǐng)注意,在這種情況下阿趁,我們?cè)谔幚硭邢⒅巴V怪骶€程的邏輯將進(jìn)入無限循環(huán)膜蛔。我們可以為此場景添加一些額外的邏輯,如果訂閱者已停止處理或取消訂閱脖阵,就使用一些全局變量來標(biāo)志該狀態(tài)皂股。
當(dāng)發(fā)布者以比訂閱者消費(fèi)更快的速度生成消息時(shí),會(huì)產(chǎn)生背壓命黔。Flow API不提供任何關(guān)于背壓或處理它的信號(hào)的機(jī)制呜呐。但我們可以設(shè)計(jì)自己的策略來處理它,例如微調(diào)用戶或降低信息產(chǎn)生率悍募。您可以閱讀RxJava deals with Back Pressure蘑辑。
Java 9 Flow API是響應(yīng)式編程和創(chuàng)建異步非阻塞應(yīng)用程序的良好舉措。但是坠宴,只有在所有系統(tǒng)API都支持它時(shí)洋魂,才能創(chuàng)建真正的響應(yīng)式應(yīng)用程序。
原文地址:Java 9 Reactive Streams?written by Pankaj?
完整代碼:Github