Reactive Streams:一種支持背壓的異步數(shù)據(jù)流處理標(biāo)準(zhǔn),主流實(shí)現(xiàn)有RxJava和Reactor颈墅,Spring WebFlux默認(rèn)集成的是Reactor蜡镶。
Reactive Streams主要解決背壓(back-pressure)問(wèn)題。當(dāng)傳入的任務(wù)速率大于系統(tǒng)處理能力時(shí)恤筛,數(shù)據(jù)處理將會(huì)對(duì)未處理數(shù)據(jù)產(chǎn)生一個(gè)緩沖區(qū)官还。
背壓依我的理解來(lái)說(shuō),是指訂閱者能和發(fā)布者交互(通過(guò)代碼里面的調(diào)用request和cancel方法交互)毒坛,可以調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率望伦,解決把訂閱者壓垮的問(wèn)題林说。關(guān)鍵在于上面例子里面的訂閱關(guān)系Subscription這個(gè)接口,他有request和cancel 2個(gè)方法屯伞,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)腿箩。
我們重點(diǎn)理解背壓在jdk9里面是如何實(shí)現(xiàn)的。關(guān)鍵在于發(fā)布者Publisher的實(shí)現(xiàn)類(lèi)SubmissionPublisher的submit方法是block方法劣摇。訂閱者會(huì)有一個(gè)緩沖池珠移,默認(rèn)為Flow.defaultBufferSize() = 256。當(dāng)訂閱者的緩沖池滿了之后末融,發(fā)布者調(diào)用submit方法發(fā)布數(shù)據(jù)就會(huì)被阻塞钧惧,發(fā)布者就會(huì)停(慢)下來(lái);訂閱者消費(fèi)了數(shù)據(jù)之后(調(diào)用Subscription.request方法)勾习,緩沖池有位置了浓瞪,submit方法就會(huì)繼續(xù)執(zhí)行下去,就是通過(guò)這樣的機(jī)制语卤,實(shí)現(xiàn)了調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率追逮,消費(fèi)得快,生成就快粹舵,消費(fèi)得慢钮孵,發(fā)布者就會(huì)被阻塞,當(dāng)然就會(huì)慢下來(lái)了眼滤。
Reactive Streams由4個(gè)Java接口構(gòu)成:
- 處理器(Processor)
- 發(fā)布商(Publisher)
- 訂閱用戶(Subscriber)
- 訂閱(Subscription)
Flow類(lèi)允許相互關(guān)聯(lián)的接口和靜態(tài)方法來(lái)建立流控制組件巴席,其中發(fā)布者產(chǎn)生由一個(gè)或多個(gè)訂閱者消費(fèi)的項(xiàng)目,每個(gè)訂閱者由訂閱管理诅需。
Reactive Streams構(gòu)建在java.util.concurrent.Flow容器對(duì)象下漾唉,開(kāi)發(fā)者可以在這里找到Flow.Publisher,一個(gè)用作lambda表達(dá)式或方法引用的賦值目標(biāo)功能接口堰塌。該接口可以讓開(kāi)發(fā)者更容易生成Flow.Subscription元素赵刑,并且將它們鏈接在一起琐旁。
另一個(gè)元素Flow.Subscriber缭保,是異步工作機(jī)制喻犁,由請(qǐng)求觸發(fā)钞啸。它可以從Flow.Subscription請(qǐng)求多個(gè)元素,開(kāi)發(fā)者還可以根據(jù)需要自定義緩沖區(qū)大小双饥。
背壓示例代碼1
/**
* reactive stream
* 背壓
*/
@Slf4j
public class ReactiveExample1 {
public static void main(String[] args) throws InterruptedException {
//1.發(fā)布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//2. 訂閱者
Flow.Subscriber subscriber = new Flow.Subscriber() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("onSubscribe");
//請(qǐng)求數(shù)據(jù)
subscription.request(1);
this.subscription = subscription;
}
/**
* 處理數(shù)據(jù)
* @param item
*/
@Override
public void onNext(Object item) {
log.info("item:{}", item);
log.info("onNext");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.subscription.request(1);
}
/**
* 數(shù)據(jù)處理異常
* @param throwable
*/
@Override
public void onError(Throwable throwable) {
log.info("onError");
}
/**
* 數(shù)據(jù)完成
*/
@Override
public void onComplete() {
log.info("onComplete");
}
};
// 3. 建立關(guān)系
publisher.subscribe(subscriber);
// 4. 生產(chǎn)數(shù)據(jù)
for (int i = 0; i < 500; i++) {
publisher.submit("test" + i);
log.info("submit:{}","test" + i);
}
// 5 .結(jié)束關(guān)閉
publisher.close();
TimeUnit.SECONDS.sleep(10);
}
}
11:28:53.864 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onSubscribe
11:28:53.878 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test0
11:28:53.879 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test0
11:28:53.881 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:28:53.881 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test1
11:28:53.881 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test2
.
.
.
.
.
.
11:28:53.896 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test252
11:28:53.896 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test253
11:28:53.896 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test254
11:28:53.897 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test255
11:28:53.897 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test256
11:28:55.882 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test257
11:28:55.882 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test1
11:28:55.883 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:28:57.884 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test2
11:28:57.884 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test258
11:28:57.884 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:28:59.885 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test3
11:28:59.885 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test259
11:28:59.885 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:29:01.886 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test4
11:29:01.886 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:29:01.886 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test260
11:29:03.886 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test5
11:29:03.886 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test261
11:29:03.886 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:29:05.887 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test6
11:29:05.887 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test262
11:29:05.887 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
11:29:07.888 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - item:test7
11:29:07.888 [main] INFO cn.wyj.learn.reactive.ReactiveExample1 - submit:test263
11:29:07.888 [ForkJoinPool.commonPool-worker-3] INFO cn.wyj.learn.reactive.ReactiveExample1 - onNext
根據(jù)運(yùn)行結(jié)果我們可以看到任務(wù)提交256 后就不能直接提交了, 消費(fèi)者消費(fèi)完一條消息后又可以提交一條數(shù)據(jù),這些就起到了流控的作用
傳統(tǒng)的發(fā)布訂閱模式, 生產(chǎn)者并不能夠根據(jù)消費(fèi)者調(diào)節(jié)生成速率
@Slf4j
public class NormalPublisherSubscriber {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingDeque();
ExecutorService executorService = Executors.newCachedThreadPool();
//訂閱者,消費(fèi)者
executorService.submit(() -> {
try {
while (true) {
String take = queue.take();
log.info("Received :{}", take);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//發(fā)布者,生產(chǎn)者
executorService.submit(() -> {
try {
queue.put("test1");
queue.put("test2");
queue.put("test3");
queue.put("test4");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdown();
}
}
參考1 https://www.imooc.com/article/27181
參考2 https://blog.csdn.net/houzhizhen/article/details/78195210