目錄
- [前言]
- [一棺克、接口]
- [1、Publisher]
- [2咒吐、Subscriber]
- [3野建、Subscription]
- [4、Processor]
- [二恬叹、使用示例]
- [三候生、總結(jié)]
前言
jdk 9 中新增了 Flow 類,是Reactive Stream ([響應(yīng)式]流/反應(yīng)流) 的實(shí)現(xiàn)妄呕,Reactive Stream是一套基于發(fā)布/訂閱模式的數(shù)據(jù)處理規(guī)范陶舞,能夠以非阻塞背壓方式實(shí)現(xiàn)數(shù)據(jù)的異步流。
一绪励、接口
1肿孵、Publisher
發(fā)布者,進(jìn)行數(shù)據(jù)發(fā)布
@FunctionalInterface
public static interface Publisher<T> {
//發(fā)布者與訂閱者建立訂閱關(guān)系
public void subscribe(Subscriber<? super T> subscriber);
}
2疏魏、Subscriber
訂閱者停做,訂閱數(shù)據(jù)
public static interface Subscriber<T> {
//發(fā)布者通知訂閱者開始發(fā)送數(shù)據(jù)
public void onSubscribe(Subscription subscription);
//訂閱者接收數(shù)據(jù)
public void onNext(T item);
//出現(xiàn)異常
public void onError(Throwable throwable);
//數(shù)據(jù)處理完成
public void onComplete();
}
3、Subscription
數(shù)據(jù)訂閱大莫,請求數(shù)據(jù)蛉腌、取消訂閱
public static interface Subscription {
public void request(long n);
public void cancel();
}
4、Processor
數(shù)據(jù)的中間操作只厘,既是Subscriber也是Publisher
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
二烙丛、使用示例
1、數(shù)據(jù)訂閱者
public class FlowSubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscriber 建立訂閱關(guān)系");
//發(fā)布訂閱關(guān)系
this.subscription = subscription;
//請求一個數(shù)據(jù)
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("訂閱者接收消息: " + item);
//接收數(shù)據(jù)后 再請求一個數(shù)據(jù)
this.subscription.request(1);
//不再接收數(shù)據(jù)羔味,調(diào)用cancel
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
System.out.println("訂閱者數(shù)據(jù)接收出現(xiàn)異常河咽,error :" + throwable.getMessage());
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("訂閱者數(shù)據(jù)處理完成");
}
}
2、數(shù)據(jù)中間處理
public class FlowProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Processor 建立訂閱關(guān)系");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Processor 接收數(shù)據(jù): " + item);
item += " Processor 處理后的消息";
this.submit(item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Processor 數(shù)據(jù)接收出現(xiàn)異常赋元,error :" + throwable.getMessage());
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("Processor 數(shù)據(jù)處理完成");
}
3忘蟹、測試
public class FlowTest {
public static void main(String[] args) {
//數(shù)據(jù)發(fā)布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//中間處理者
FlowProcessor processor = new FlowProcessor();
//數(shù)據(jù)訂閱者
Subscriber<String> subscriber = new FlowSubscriber();
//發(fā)布者與中間處理者建立關(guān)系
publisher.subscribe(processor);
//中間處理者與訂閱者建立關(guān)系
processor.subscribe(subscriber);
//發(fā)布者開始發(fā)布數(shù)據(jù)
for (int i = 0; i < 10; i++) {
String msg = "hello flow: " + i;
System.out.println("發(fā)布者發(fā)送數(shù)據(jù)" + i);
publisher.submit(msg);
}
//關(guān)閉發(fā)布者
publisher.close();
//休眠等待,防止主線程退出
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
結(jié)果:
Subscriber 建立訂閱關(guān)系
Processor 建立訂閱關(guān)系
發(fā)布者發(fā)送數(shù)據(jù)0
發(fā)布者發(fā)送數(shù)據(jù)1
發(fā)布者發(fā)送數(shù)據(jù)2
發(fā)布者發(fā)送數(shù)據(jù)3
發(fā)布者發(fā)送數(shù)據(jù)4
發(fā)布者發(fā)送數(shù)據(jù)5
發(fā)布者發(fā)送數(shù)據(jù)6
發(fā)布者發(fā)送數(shù)據(jù)7
發(fā)布者發(fā)送數(shù)據(jù)8
發(fā)布者發(fā)送數(shù)據(jù)9
Processor 接收數(shù)據(jù): hello flow: 0
Processor 接收數(shù)據(jù): hello flow: 1
Processor 接收數(shù)據(jù): hello flow: 2
Processor 接收數(shù)據(jù): hello flow: 3
Processor 接收數(shù)據(jù): hello flow: 4
Processor 接收數(shù)據(jù): hello flow: 5
Processor 接收數(shù)據(jù): hello flow: 6
訂閱者接收消息: hello flow: 0 Processor 處理后的消息
Processor 接收數(shù)據(jù): hello flow: 7
訂閱者接收消息: hello flow: 1 Processor 處理后的消息
訂閱者接收消息: hello flow: 2 Processor 處理后的消息
Processor 接收數(shù)據(jù): hello flow: 8
訂閱者接收消息: hello flow: 3 Processor 處理后的消息
Processor 接收數(shù)據(jù): hello flow: 9
訂閱者接收消息: hello flow: 4 Processor 處理后的消息
Processor 數(shù)據(jù)處理完成
訂閱者接收消息: hello flow: 5 Processor 處理后的消息
訂閱者接收消息: hello flow: 6 Processor 處理后的消息
訂閱者接收消息: hello flow: 7 Processor 處理后的消息
訂閱者接收消息: hello flow: 8 Processor 處理后的消息
訂閱者接收消息: hello flow: 9 Processor 處理后的消息
三搁凸、總結(jié)
1媚值、發(fā)布者 Publisher 將消息發(fā)給訂閱關(guān)系 Subscription;
2护糖、訂閱關(guān)系 Subscription 將消息發(fā)給訂閱者 Subscriber褥芒;
3、中間操作 Processor 作為 發(fā)布者 Publisher 的 訂閱者嫡良,又作為訂閱者 Subscriber 的發(fā)布者