IntermediateDataset
IntermediateDataset
是在 JobGraph
中對(duì)中間結(jié)果的抽象。我們知道侧啼,JobGraph
是對(duì) StreamGraph
進(jìn)一步進(jìn)行優(yōu)化后得到的邏輯圖趟据,它盡量把可以 chain 到一起 operator 合并為一個(gè) JobVertex
厕氨,而 IntermediateDataset
就表示一個(gè) JobVertex
的輸出結(jié)果盅视。JobVertex
的輸入是 JobEdge
弱恒,而 JobEdge
可以看作是 IntermediateDataset
的消費(fèi)者杀赢。一個(gè) JobVertex
也可能產(chǎn)生多個(gè) IntermediateDataset
烘跺。需要說明的一點(diǎn)是,目前一個(gè) IntermediateDataset
實(shí)際上只會(huì)有一個(gè) JobEdge
作為消費(fèi)者脂崔,也就是說滤淳,一個(gè) JobVertex
的下游有多少 JobVertex
需要依賴當(dāng)前節(jié)點(diǎn)的數(shù)據(jù),那么當(dāng)前節(jié)點(diǎn)就有對(duì)應(yīng)數(shù)量的 IntermediateDataset
砌左。
這時(shí)候還沒有向集群去提交任務(wù)脖咐,在 Client 端會(huì)將 StreamGraph 生成 JobGraph伤柄,JobGraph 就是做為向集群提交的最基本的單元。在生成 JobGrap 的時(shí)候會(huì)做一些優(yōu)化文搂,將一些沒有 Shuffle 機(jī)制的節(jié)點(diǎn)進(jìn)行合并适刀。有了 JobGraph 后就會(huì)向集群進(jìn)行提交,進(jìn)入運(yùn)行階段煤蹭。
Jobvertex->Intermedia Dateset->JobEage
然后我們概念化這樣一張物理執(zhí)行圖笔喉,可以看到每個(gè) Task 在接收數(shù)據(jù)時(shí)都會(huì)通過這樣一個(gè)InputGate 可以認(rèn)為是負(fù)責(zé)接收數(shù)據(jù)的,再往前有這樣一個(gè) ResultPartition 負(fù)責(zé)發(fā)送數(shù)據(jù)硝皂,在 ResultPartition 又會(huì)去做分區(qū)跟下游的 Task 保持一致常挚,就形成了 ResultSubPartition 和 InputChannel 的對(duì)應(yīng)關(guān)系。這就是從邏輯層上來看的網(wǎng)絡(luò)傳輸?shù)耐ǖ阑铮谶@么一個(gè)概念我們可以將反壓的問題進(jìn)行拆解奄毡。
ExecutionGraph -> Intermedia ATA
task ->ResultPartion->InputGate
ResultSubParition-->InputChannel 一一對(duì)應(yīng)
問題拆解:反壓傳播兩個(gè)階段
假設(shè)最下游的 Task (Sink)出現(xiàn)了問題,處理速度降了下來這時(shí)候是如何將這個(gè)壓力反向傳播回去呢贝或?這時(shí)候就分為兩種情況:
跨 TaskManager 吼过,反壓如何從 InputGate 傳播到 ResultPartition
TaskManager 內(nèi),反壓如何從 ResultPartition 傳播到 InputGate
TaskManager 只有一個(gè) Network BufferPool 被所有的 Task 共享,初始化時(shí)會(huì)從 Off-heap Memory 中申請(qǐng)內(nèi)存咪奖,申請(qǐng)后內(nèi)存管理通過 Network BufferPool 來進(jìn)行盗忱,不需要依賴 JVM GC 的機(jī)制去釋放。有了 Network BufferPool 之后可以為每一個(gè) ResultSubPartition 創(chuàng)建 Local BufferPool
反壓過程:
- 首先InputChannel 的 Buffer 被用盡于
- 會(huì)向 Local BufferPool 申請(qǐng)新的 Buffer
- 如果 Local BufferPool沒有可用的,或是 Local BufferPool 的最大可用 Buffer 到了上限 無法向 Network BufferPool 申請(qǐng)羊赵,沒有辦法去讀取新的數(shù)據(jù)
- 這時(shí) Netty AutoRead 就會(huì)被禁掉趟佃,Netty 就不會(huì)從 Socket 的 Buffer 中讀取數(shù)據(jù)了。
- 導(dǎo)致消費(fèi)端的Socket的buffer也被用盡,這時(shí)就會(huì)將 Window = 0 發(fā)送給發(fā)送端(前文提到的 TCP 滑動(dòng)窗口的機(jī)制)昧捷。
- 這時(shí)發(fā)送端的 Socket 就會(huì)停止發(fā)送闲昭。導(dǎo)致 Socket 的 Buffer 也被用盡,發(fā)送端的Netty 檢測(cè)到 Socket 無法寫了之后就會(huì)停止向 Socket 寫數(shù)據(jù)靡挥。
- Netty 停止寫了之后序矩,所有的數(shù)據(jù)就會(huì)阻塞在 Netty 的 Buffer 當(dāng)中了,但是 Netty 的 Buffer 是無界的芹血,可以通過 Netty 的水位機(jī)制中的 high watermark 控制他的上界贮泞。當(dāng)超過了 high watermark,Netty 就會(huì)將其 channel 置為不可寫幔烛,ResultSubPartition 在寫之前都會(huì)檢測(cè) Netty 是否可寫啃擦,發(fā)現(xiàn)不可寫就會(huì)停止向 Netty 寫數(shù)據(jù)。
8.這時(shí)候所有的壓力都來到了 ResultSubPartition饿悬,和接收端一樣他會(huì)不斷的向 Local BufferPool 和 Network BufferPool 申請(qǐng)內(nèi)存直至用盡buffer
9.record write寫阻塞,Operator 就會(huì)停止寫數(shù)據(jù),達(dá)到跨 TaskManager 的反壓令蛉。
-
跨 TaskManager 數(shù)據(jù)傳輸
-
TaskManager 內(nèi)反壓過程
下游的 TaskManager 反壓導(dǎo)致本 TaskManager 的 ResultSubPartition 無法繼續(xù)寫入數(shù)據(jù),于是 Record Writer 的寫也被阻塞住了,因?yàn)?Operator 需要有輸入才能有計(jì)算后的輸出珠叔,輸入跟輸出都是在同一線程執(zhí)行蝎宇, Record Writer 阻塞了,Record Reader 也停止從 InputChannel 讀數(shù)據(jù)祷安,這時(shí)上游的 TaskManager 還在不斷地發(fā)送數(shù)據(jù)姥芥,最終將這個(gè) TaskManager 的 Buffer 耗盡。具體流程可以參考下圖汇鞭,這就是 TaskManager 內(nèi)的反壓過程凉唐。
TCP-based 反壓的弊端
在一個(gè) TaskManager 中可能要執(zhí)行多個(gè) Task,如果多個(gè) Task 的數(shù)據(jù)最終都要傳輸?shù)?strong>下游的同一個(gè) TaskManager 就會(huì)復(fù)用同一個(gè) Socket 進(jìn)行傳輸霍骄,這個(gè)時(shí)候如果單個(gè) Task 產(chǎn)生反壓台囱,就會(huì)導(dǎo)致復(fù)用的 Socket 阻塞,其余的 Task 也無法使用傳輸读整,checkpoint barrier 也無法發(fā)出導(dǎo)致下游執(zhí)行 checkpoint 的延遲增大簿训。
依賴最底層的 TCP 去做流控,會(huì)導(dǎo)致反壓傳播路徑太長米间,導(dǎo)致生效的延遲比較大强品。
Credit-based 反壓(SINCE FLINK1.5)
這個(gè)機(jī)制簡單的理解起來就是在 Flink 層面實(shí)現(xiàn)類似 TCP 流控的反壓機(jī)制來解決上述的弊端,Credit 可以類比為 TCP 的 Window 機(jī)制车伞。
反壓過程
Flink 層面實(shí)現(xiàn)反壓機(jī)制择懂,就是每一次 ResultSubPartition 向 InputChannel 發(fā)送消息的時(shí)候都會(huì)發(fā)送一個(gè) backlog size 告訴下游準(zhǔn)備發(fā)送多少消息,下游就會(huì)去計(jì)算有多少的 Buffer 去接收消息另玖,算完之后如果有充足的 Buffer 就會(huì)返還給上游一個(gè) Credit 告知他可以發(fā)送消息(圖上兩個(gè) ResultSubPartition 和 InputChannel 之間是虛線是因?yàn)樽罱K還是要通過 Netty 和 Socket 去通信)
過了一段時(shí)間后,由于上游的發(fā)送速率要大于下游的接受速率表伦,下游的 TaskManager 的 Buffer 已經(jīng)到達(dá)了申請(qǐng)上限谦去,這時(shí)候下游就會(huì)向上游返回 Credit = 0,ResultSubPartition 接收到之后就不會(huì)向 Netty 去傳輸數(shù)據(jù)蹦哼,上游 TaskManager 的 Buffer 也很快耗盡鳄哭,達(dá)到反壓的效果,這樣在 ResultSubPartition 層就能感知到反壓纲熏,不用通過 Socket 和 Netty 一層層地向上反饋妆丘,降低了反壓生效的延遲。同時(shí)也不會(huì)將 Socket 去阻塞局劲,解決了由于一個(gè) Task 反壓導(dǎo)致 TaskManager 和 TaskManager 之間的 Socket 阻塞的問題
總結(jié)
- 網(wǎng)絡(luò)流控是為了在上下游速度不匹配的情況下勺拣,防止下游出現(xiàn)過載
- 網(wǎng)絡(luò)流控有靜態(tài)限速和動(dòng)態(tài)反壓兩種手段
- Flink 1.5 之前是基于 TCP 流控 + bounded buffer 實(shí)現(xiàn)反壓
- Flink 1.5 之后實(shí)現(xiàn)了自己托管的 credit – based 流控機(jī)制,在應(yīng)用層模擬 TCP 的流控機(jī)制
有了動(dòng)態(tài)反壓鱼填,靜態(tài)限速是不是完全沒有作用了药有?
我們流計(jì)算的結(jié)果最終是要輸出到一個(gè)外部的存儲(chǔ)(Storage),外部數(shù)據(jù)存儲(chǔ)到 Sink 端的反壓是不一定會(huì)觸發(fā)的,這要取決于外部存儲(chǔ)的實(shí)現(xiàn)愤惰,像 Kafka 這樣是實(shí)現(xiàn)了限流限速的消息中間件可以通過協(xié)議將反壓反饋給 Sink 端苇经,但是像 ES 無法將反壓進(jìn)行傳播反饋給 Sink 端*(極端情況導(dǎo)致job failover,es端socket發(fā)生 time out),這種情況下為了防止外部存儲(chǔ)在大的數(shù)據(jù)量下被打爆宦言,我們就可以通過靜態(tài)限速的方式在 Source 端去做限流扇单。
網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)會(huì)寫到 Task 的 InputGate(IG) 中,經(jīng)過 Task 的處理后奠旺,再由 Task 寫到 ResultPartition(RS) 中令花。每個(gè) Task 都包括了輸入和輸入,輸入和輸出的數(shù)據(jù)存在 Buffer
中(都是字節(jié)數(shù)據(jù))凉倚。Buffer
是 MemorySegment 的包裝
類兼都。
根據(jù)配置,F(xiàn)link 會(huì)在 NetworkBufferPool
中生成一定數(shù)量(默認(rèn)2048)的內(nèi)存塊 MemorySegment
稽寒,內(nèi)存塊的總數(shù)量就代表了網(wǎng)絡(luò)傳輸中所有可用的內(nèi)存扮碧。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共享的,每個(gè) TM 只會(huì)實(shí)例化一個(gè)
Task 線程啟動(dòng)時(shí)杏糙,會(huì)為 Task 的 InputGate(IG)和 ResultPartition(RP) 分別創(chuàng)建一個(gè) LocalBufferPool(緩沖池)并設(shè)置可申請(qǐng)的 MemorySegment(內(nèi)存塊)數(shù)量慎王。IG 對(duì)應(yīng)的緩沖池初始的內(nèi)存塊數(shù)量與 IG 中 InputChannel 數(shù)量一致,RP 對(duì)應(yīng)的緩沖池初始的內(nèi)存塊數(shù)量與 RP 中的 ResultSubpartition 數(shù)量一致宏侍。不過赖淤,每當(dāng)創(chuàng)建或銷毀緩沖池時(shí),NetworkBufferPool 會(huì)計(jì)算剩余空閑的內(nèi)存塊數(shù)量谅河,并平均分配給已創(chuàng)建的緩沖池咱旱。
在 Task 線程執(zhí)行過程中,當(dāng) Netty 接收端收到數(shù)據(jù)時(shí)绷耍,為了將 Netty 中的數(shù)據(jù)拷貝到 Task 中吐限,InputChannel(實(shí)際是 RemoteInputChannel)會(huì)向其對(duì)應(yīng)的緩沖池申請(qǐng)內(nèi)存塊(上圖中的①)。如果緩沖池中也沒有可用的內(nèi)存塊且已申請(qǐng)的數(shù)量還沒到池子上限褂始,則會(huì)向 NetworkBufferPool 申請(qǐng)內(nèi)存塊(上圖中的②)并交給 InputChannel 填上數(shù)據(jù)(上圖中的③和④)诸典。
- 當(dāng)一個(gè)內(nèi)存塊被消費(fèi)完成之后(在輸入端是指內(nèi)存塊中的字節(jié)被反序列化成對(duì)象了,在輸出端是指內(nèi)存塊中的字節(jié)寫入到 Netty Channel 了)崎苗,會(huì)調(diào)用
Buffer.recycle()
方法狐粱,會(huì)將內(nèi)存塊還給 LocalBufferPool (上圖中的⑤)。如果LocalBufferPool中當(dāng)前申請(qǐng)的數(shù)量超過了池子容量(由于上文提到的動(dòng)態(tài)容量胆数,由于新注冊(cè)的 Task 導(dǎo)致該池子容量變屑◎摺),則LocalBufferPool會(huì)將該內(nèi)存塊回收給 NetworkBufferPool(上圖中的⑥)幅慌。如果沒超過池子容量宋欺,則會(huì)繼續(xù)留在池子中,減少反復(fù)申請(qǐng)的開銷
反壓的過程
這里我們需要注意兩個(gè)場(chǎng)景:
- 本地傳輸:如果 Task 1 和 Task 2 運(yùn)行在同一個(gè) worker 節(jié)點(diǎn)(TaskManager),該 buffer 可以直接交給下一個(gè) Task齿诞。一旦 Task 2 消費(fèi)了該 buffer酸休,則該 buffer 會(huì)被緩沖池1回收。如果 Task 2 的速度比 1 慢祷杈,那么 buffer 回收的速度就會(huì)趕不上 Task 1 取 buffer 的速度斑司,導(dǎo)致緩沖池1無可用的 buffer,Task 1 等待在可用的 buffer 上但汞。最終形成 Task 1 的降速宿刮。
- 遠(yuǎn)程傳輸:在輸出端,通過 Netty 的水位值機(jī)制來保證不往網(wǎng)絡(luò)中寫入太多數(shù)據(jù)私蕾。如果網(wǎng)絡(luò)中的數(shù)據(jù)(Netty輸出緩沖中的字節(jié)數(shù))超過了高水位值僵缺,我們會(huì)等到其降到低水位值以下才繼續(xù)寫入數(shù)據(jù)。
Netty 水位值機(jī)制
初始化 NettyServer 時(shí)配置的水位值參數(shù)踩叭。
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
當(dāng)輸出緩沖中的字節(jié)數(shù)超過了高水位值, 則 Channel.isWritable() 會(huì)返回false磕潮。當(dāng)輸出緩存中的字節(jié)數(shù)又掉到了低水位值以下, 則 Channel.isWritable() 會(huì)重新返回true。Flink 中發(fā)送數(shù)據(jù)的核心代碼在 PartitionRequestQueue 中
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
if (fatalError) {
return;
}
Buffer buffer = null;
try {
// channel.isWritable() 配合 WRITE_BUFFER_LOW_WATER_MARK
// 和 WRITE_BUFFER_HIGH_WATER_MARK 實(shí)現(xiàn)發(fā)送端的流量控制
if (channel.isWritable()) {
// 注意: 一個(gè)while循環(huán)也就最多只發(fā)送一個(gè)BufferResponse, 連續(xù)發(fā)送BufferResponse是通過writeListener回調(diào)實(shí)現(xiàn)的
while (true) {
if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
return;
}
buffer = currentPartitionQueue.getNextBuffer();
if (buffer == null) {
// 跳過這部分代碼
...
}
else {
// 構(gòu)造一個(gè)response返回給客戶端
BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());
if (!buffer.isBuffer() &&
EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
// 跳過這部分代碼容贝。batch 模式中 subpartition 的數(shù)據(jù)準(zhǔn)備就緒自脯,通知下游消費(fèi)者。
...
}
// 將該response發(fā)到netty channel, 當(dāng)寫成功后,
// 通過注冊(cè)的writeListener又會(huì)回調(diào)進(jìn)來, 從而不斷地消費(fèi) queue 中的請(qǐng)求
channel.writeAndFlush(resp).addListener(writeListener);
return;
}
}
}
}
catch (Throwable t) {
if (buffer != null) {
buffer.recycle();
}
throw new IOException(t.getMessage(), t);
}
}
// 當(dāng)水位值降下來后(channel 再次可寫)斤富,會(huì)重新觸發(fā)發(fā)送函數(shù)
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
反壓監(jiān)控
Flink 在使用了一個(gè) trick 來實(shí)現(xiàn)對(duì)反壓的監(jiān)控膏潮。如果一個(gè) Task 因?yàn)榉磯憾邓倭耍敲此鼤?huì)卡在向 LocalBufferPool 申請(qǐng)內(nèi)存塊上满力。那么這時(shí)候焕参,該 Task 的 stack trace 就會(huì)長下面這樣:
java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request
[...]
那么事情就簡單了。通過不斷地采樣每個(gè) task 的 stack trace(線程堆棧) 就可以實(shí)現(xiàn)反壓監(jiān)控脚囊。
Flink 的實(shí)現(xiàn)中龟糕,只有當(dāng) Web 頁面切換到某個(gè) Job 的 Backpressure 頁面,才會(huì)對(duì)這個(gè) Job 觸發(fā)反壓檢測(cè)悔耘,因?yàn)榉磯簷z測(cè)還是挺昂貴的。JobManager的StackTraceSampleCoordinator
( A coordinator for triggering and collecting stack traces of running tasks)
會(huì)通過 Akka 給每個(gè) TaskManager 發(fā)送TriggerStackTraceSample消息我擂。默認(rèn)情況下衬以,TaskManager 會(huì)觸發(fā)100次 stack trace 采樣,每次間隔 50ms(也就是說一次反壓檢測(cè)至少要等待5秒鐘)校摩。并將這 100 次采樣的結(jié)果返回給 JobManager看峻,由 JobManager 來計(jì)算反壓比率
(反壓出現(xiàn)的次數(shù)/采樣的次數(shù)),最終展現(xiàn)在 UI 上衙吩。UI 刷新的默認(rèn)周期是一分鐘互妓,目的是不對(duì) TaskManager 造成太大的負(fù)擔(dān)。
Flink 如何在吞吐量和延遲之間做權(quán)衡?
我們分析了上述的網(wǎng)絡(luò)傳輸后冯勉,知道每個(gè) SubTask 輸出的數(shù)據(jù)并不是直接輸出到下游澈蚌,而是在 ResultSubPartition 中有一個(gè) Buffer 用來緩存一批數(shù)據(jù)后,再 Flush 到 Netty 發(fā)送到下游 SubTask灼狰。那到底哪些情況會(huì)觸發(fā) Buffer Flush 到 Netty 呢宛瞄?
Buffer 變滿時(shí)
Buffer timeout 時(shí)
特殊事件來臨時(shí),例如:
CheckPoint 的 barrier
來臨時(shí)
Flink 在數(shù)據(jù)傳輸時(shí)交胚,會(huì)把數(shù)據(jù)序列化成二進(jìn)制然后寫到 Buffer 中份汗,當(dāng) Buffer 滿了,需要 Flush(默認(rèn)為32KiB
蝴簇,通過taskmanager.memory.segment-size設(shè)置)杯活。但是當(dāng)流量低峰或者測(cè)試環(huán)節(jié),可能1分鐘都沒有 32 KB
的數(shù)據(jù)熬词,就會(huì)導(dǎo)致1分鐘內(nèi)的數(shù)據(jù)都積攢在 Buffer 中不會(huì)發(fā)送到下游 Task 去處理旁钧,從而導(dǎo)致數(shù)據(jù)出現(xiàn)延遲,這并不是我們想看到的荡澎。所以 Flink 有一個(gè) Buffer timeout
的策略均践,意思是當(dāng)數(shù)據(jù)量比較少,Buffer 一直沒有變滿時(shí)摩幔,后臺(tái)的 Output flusher
線程會(huì)強(qiáng)制地將 Buffer 中的數(shù)據(jù) Flush 到下游彤委。Flink 中默認(rèn) timeout 時(shí)間是 100ms
,即:Buffer 中的數(shù)據(jù)要么變滿時(shí) Flush或衡,要么最多等 100ms 也會(huì) Flush 來保證數(shù)據(jù)不會(huì)出現(xiàn)很大的延遲焦影。當(dāng)然這個(gè)可以通過 env.setBufferTimeout
(timeoutMillis) 來控制超時(shí)時(shí)間。
- timeoutMillis > 0 表示最長等待 timeoutMillis 時(shí)間封断,就會(huì)flush
- timeoutMillis = 0 表示每條數(shù)據(jù)都會(huì)觸發(fā) flush斯辰,直接將數(shù)據(jù)發(fā)送到下游,相當(dāng)于沒有Buffer了(避免設(shè)置為0坡疼,可能導(dǎo)致性能下降)
- timeoutMillis = -1 表示只有等到 buffer滿了或 CheckPoint的時(shí)候彬呻,才會(huì)flush。相當(dāng)于取消了 timeout 策略
嚴(yán)格來講柄瑰,Output flusher 不提供任何保證——它只向 Netty 發(fā)送通知闸氮,而 Netty 線程會(huì)按照能力與意愿進(jìn)行處理。這也意味著如果存在反壓教沾,則 Output flusher 是無效的蒲跨。言外之意,如果反壓很嚴(yán)重授翻,下游 Buffer 都滿了或悲,當(dāng)然不能強(qiáng)制一直往下游發(fā)數(shù)據(jù)孙咪。
一些特殊的消息如果通過 RecordWriter 發(fā)送,也會(huì)觸發(fā)立即 Flush 緩存的數(shù)據(jù)巡语。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件翎蹈,這些事件應(yīng)該盡快被發(fā)送,而不應(yīng)該等待 Buffer 被填滿或者 Output flusher 的下一次 Flush捌臊。當(dāng)然如果出現(xiàn)反壓杨蛋,CheckPoint barrier 也會(huì)等待,不能發(fā)送到下游理澎。
引入 Network buffers
以獲得更高的資源利用率
和更高的吞吐量
逞力,代價(jià)是讓一些記錄在 Buffer 中等待一段時(shí)間。雖然可以通過緩沖區(qū)超時(shí)給出此等待時(shí)間的上限糠爬,但你可能知道有關(guān)這兩個(gè)維度(延遲和吞吐量)之間權(quán)衡的更多信息:顯然寇荧,無法同時(shí)獲得這兩者
參考:
https://ververica.cn/developers/advanced-tutorial-2-analysis-of-network-flow-control-and-back-pressure/
http://wuchong.me/blog/2016/04/26/flink-internals-how-to-handle-backpressure/
[https://www.cnblogs.com/pengblog2020/p/12161716.html]