本文所討論的計(jì)算資源是指用來(lái)執(zhí)行 Task 的資源,是一個(gè)邏輯概念姨谷。本文會(huì)介紹 Flink 計(jì)算資源相關(guān)的一些核心概念逗宁,如:Slot、SlotSharingGroup梦湘、CoLocationGroup瞎颗、Chain等。并會(huì)著重討論 Flink 如何對(duì)計(jì)算資源進(jìn)行管理和隔離捌议,如何將計(jì)算資源利用率最大化等等哼拔。理解 Flink 中的計(jì)算資源對(duì)于理解 Job 如何在集群中運(yùn)行的有很大的幫助,也有利于我們更透徹地理解 Flink 原理瓣颅,更快速地定位問(wèn)題管挟。
Operator Chains
為了更高效地分布式執(zhí)行,F(xiàn)link會(huì)盡可能地將operator的subtask鏈接(chain)在一起形成task弄捕。每個(gè)task在一個(gè)線程中執(zhí)行僻孝。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化守谓,減少數(shù)據(jù)在緩沖區(qū)的交換穿铆,減少了延遲的同時(shí)提高整體的吞吐量。
我們?nèi)砸越?jīng)典的 WordCount 為例(參考前文Job例子)斋荞,下面這幅圖荞雏,展示了Source并行度為1,F(xiàn)latMap平酿、KeyAggregation凤优、Sink并行度均為2,最終以5個(gè)并行的線程來(lái)執(zhí)行的優(yōu)化過(guò)程蜈彼。
上圖中將KeyAggregation和Sink兩個(gè)operator進(jìn)行了合并筑辨,因?yàn)檫@兩個(gè)合并后并不會(huì)改變整體的拓?fù)浣Y(jié)構(gòu)。但是幸逆,并不是任意兩個(gè) operator 就能 chain 一起的棍辕。其條件還是很苛刻的:
- 上下游的并行度一致
- 下游節(jié)點(diǎn)的入度為1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入)
- 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
- 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map还绘、flatmap楚昭、filter等默認(rèn)是ALWAYS)
- 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接拍顷,Source默認(rèn)是HEAD)
- 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
- 用戶沒(méi)有禁用 chain
Operator chain
的行為可以通過(guò)編程API中進(jìn)行指定抚太。可以通過(guò)在DataStream
的operator
后面(如someStream.map(..)
)調(diào)用startNewChain()
來(lái)指示從該operator
開(kāi)始一個(gè)新的chain
(與前面截?cái)辔舭福粫?huì)被chain到前面)尿贫。或者調(diào)用disableChaining()
來(lái)指示該operator
不參與chaining
(不會(huì)與前后的operator chain一起)爱沟。在底層帅霜,這兩個(gè)方法都是通過(guò)調(diào)整operator
的 chain
策略(HEAD、NEVER)來(lái)實(shí)現(xiàn)的呼伸。另外身冀,也可以通過(guò)調(diào)用StreamExecutionEnvironment.disableOperatorChaining()
來(lái)全局禁用chaining。
原理與實(shí)現(xiàn)
那么 Flink 是如何將多個(gè) operators chain在一起的呢括享?chain在一起的operators是如何作為一個(gè)整體被執(zhí)行的呢搂根?它們之間的數(shù)據(jù)流又是如何避免了序列化/反序列化以及網(wǎng)絡(luò)傳輸?shù)哪兀肯聢D展示了operators chain的內(nèi)部實(shí)現(xiàn):
如上圖所示铃辖,F(xiàn)link內(nèi)部是通過(guò)OperatorChain
這個(gè)類(lèi)來(lái)將多個(gè)operator鏈在一起形成一個(gè)新的operator剩愧。OperatorChai
n形成的框框就像一個(gè)黑盒,F(xiàn)link 無(wú)需知道黑盒中有多少個(gè)ChainOperator
娇斩、數(shù)據(jù)在chain內(nèi)部是怎么流動(dòng)的仁卷,只需要將input數(shù)據(jù)交給 HeadOperator
就可以了穴翩,這就使得OperatorChain
在行為上與普通的operator無(wú)差別,上面的OperaotrChain
就可以看做是一個(gè)入度為1锦积,出度為2的operator芒帕。所以在實(shí)現(xiàn)中,對(duì)外可見(jiàn)的只有HeadOperator
丰介,以及與外部連通的實(shí)線輸出背蟆,這些輸出對(duì)應(yīng)了JobGraph
中的JobEdge
,在底層通過(guò)RecordWriterOutput
來(lái)實(shí)現(xiàn)哮幢。另外带膀,框中的虛線是operator chain
內(nèi)部的數(shù)據(jù)流,這個(gè)流內(nèi)的數(shù)據(jù)不會(huì)經(jīng)過(guò)序列化/反序列化橙垢、網(wǎng)絡(luò)傳輸垛叨,而是直接將消息對(duì)象傳遞給下游的 ChainOperator
處理,這是性能提升的關(guān)鍵點(diǎn)钢悲,在底層是通過(guò) ChainingOutput
實(shí)現(xiàn)的点额,源碼如下方所示,
注:HeadOperator
和ChainOperator
并不是具體的數(shù)據(jù)結(jié)構(gòu)莺琳,前者指代chain中的第一個(gè)operator还棱,后者指代chain中其余的operator,它們實(shí)際上都是StreamOperator
惭等。
private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
// 注冊(cè)的下游operator
protected final OneInputStreamOperator<T, ?> operator;
public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
this.operator = operator;
}
@Override
// 發(fā)送消息方法的實(shí)現(xiàn)珍手,直接將消息對(duì)象傳遞給operator處理,不經(jīng)過(guò)序列化/反序列化辞做、網(wǎng)絡(luò)傳輸
public void collect(StreamRecord<T> record) {
try {
operator.setKeyContextElement1(record);
// 下游operator直接處理消息對(duì)象
operator.processElement(record);
}
catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
...
}
Task Slot
在架構(gòu)概覽中我們介紹了 TaskManager
是一個(gè) JVM 進(jìn)程琳要,并會(huì)以獨(dú)立的線程來(lái)執(zhí)行一個(gè)task或多個(gè)subtask。為了控制一個(gè) TaskManager
能接受多少個(gè) task秤茅,F(xiàn)link 提出了 Task Slot 的概念稚补。
Flink 中的計(jì)算資源通過(guò) Task Slot
來(lái)定義。每個(gè) task slot
代表了 TaskManager
的一個(gè)固定大小的資源子集框喳。例如课幕,一個(gè)擁有3個(gè)slot的 TaskManager
,會(huì)將其管理的內(nèi)存平均分成三分分給各個(gè) slot五垮。將資源 slot 化意味著來(lái)自不同job的task不會(huì)為了內(nèi)存而競(jìng)爭(zhēng)乍惊,而是每個(gè)task都擁有一定數(shù)量的內(nèi)存儲(chǔ)備。需要注意的是放仗,這里不會(huì)涉及到CPU的隔離润绎,slot目前僅僅用來(lái)隔離task的內(nèi)存。
通過(guò)調(diào)整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的莉撇。每個(gè) TaskManager
有一個(gè)slot呢蛤,也就意味著每個(gè)task運(yùn)行在獨(dú)立的 JVM 中。每個(gè) TaskManager
有多個(gè)slot的話稼钩,也就是說(shuō)多個(gè)task運(yùn)行在同一個(gè)JVM中顾稀。而在同一個(gè)JVM進(jìn)程中的task,可以共享TCP連接(基于多路復(fù)用)和心跳消息坝撑,可以減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸。也能共享一些數(shù)據(jù)結(jié)構(gòu)粮揉,一定程度上減少了每個(gè)task的消耗巡李。
每一個(gè) TaskManager 會(huì)擁有一個(gè)或多個(gè)的 task slot,每個(gè) slot 都能跑由多個(gè)連續(xù) task 組成的一個(gè) pipeline扶认,比如 MapFunction 的第n個(gè)并行實(shí)例和 ReduceFunction 的第n個(gè)并行實(shí)例可以組成一個(gè) pipeline侨拦。
如上文所述的 WordCount 例子,5個(gè)Task可能會(huì)在TaskManager的slots中如下圖分布辐宾,2個(gè)TaskManager狱从,每個(gè)有3個(gè)slot:
SlotSharingGroup 與 CoLocationGroup
默認(rèn)情況下,F(xiàn)link 允許subtasks共享slot叠纹,條件是它們都來(lái)自同一個(gè)Job的不同task的subtask季研。結(jié)果可能一個(gè)slot持有該job的整個(gè)pipeline。允許slot共享有以下兩點(diǎn)好處:
- Flink 集群所需的task slots數(shù)與job中最高的并行度一致誉察。也就是說(shuō)我們不需要再去計(jì)算一個(gè)程序總共會(huì)起多少個(gè)task了与涡。
- 更容易獲得更充分的資源利用。如果沒(méi)有slot共享持偏,那么非密集型操作source/flatmap就會(huì)占用同密集型操作 keyAggregation/sink 一樣多的資源驼卖。如果有slot共享,將基線的2個(gè)并行度增加到6個(gè)鸿秆,能充分利用slot資源酌畜,同時(shí)保證每個(gè)TaskManager能平均分配到重的subtasks。
我們將 WordCount 的并行度從之前的2個(gè)增加到6個(gè)(Source并行度仍為1)卿叽,并開(kāi)啟slot共享(所有operator都在default共享組)桥胞,將得到如上圖所示的slot分布圖。首先附帽,我們不用去計(jì)算這個(gè)job會(huì)其多少個(gè)task埠戳,總之該任務(wù)最終會(huì)占用6個(gè)slots(最高并行度為6)。其次蕉扮,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個(gè) TaskManager整胃。
SlotSharingGroup
是Flink中用來(lái)實(shí)現(xiàn)slot共享的類(lèi),它盡可能地讓subtasks共享一個(gè)slot喳钟。相應(yīng)的屁使,還有一個(gè) CoLocationGroup
類(lèi)用來(lái)強(qiáng)制將 subtasks 放到同一個(gè) slot 中在岂。CoLocationGroup
主要用于迭代流中,用來(lái)保證迭代頭與迭代尾的第i個(gè)subtask能被調(diào)度到同一個(gè)TaskManager上蛮寂。這里我們不會(huì)詳細(xì)討論CoLocationGroup
的實(shí)現(xiàn)細(xì)節(jié)蔽午。
怎么判斷operator屬于哪個(gè) slot 共享組呢?默認(rèn)情況下酬蹋,所有的operator都屬于默認(rèn)的共享組default
及老,也就是說(shuō)默認(rèn)情況下所有的operator都是可以共享一個(gè)slot的。而當(dāng)所有input operators具有相同的slot共享組時(shí)范抓,該operator會(huì)繼承這個(gè)共享組骄恶。最后,為了防止不合理的共享匕垫,用戶也能通過(guò)API來(lái)強(qiáng)制指定operator的共享組僧鲁,比如:someStream.filter(...).slotSharingGroup("group1");
就強(qiáng)制指定了filter的slot共享組為group1
。
原理與實(shí)現(xiàn)
那么多個(gè)tasks(或者說(shuō)operators)是如何共享slot的呢象泵?
我們先來(lái)看一下用來(lái)定義計(jì)算資源的slot的類(lèi)圖:
抽象類(lèi)Slot
定義了該槽位屬于哪個(gè)TaskManager(instance
)的第幾個(gè)槽位(slotNumber
)寞秃,屬于哪個(gè)Job(jobID
)等信息。最簡(jiǎn)單的情況下偶惠,一個(gè)slot只持有一個(gè)task春寿,也就是SimpleSlot
的實(shí)現(xiàn)。復(fù)雜點(diǎn)的情況洲鸠,一個(gè)slot能共享給多個(gè)task使用堂淡,也就是SharedSlot
的實(shí)現(xiàn)。SharedSlot能包含其他的SharedSlot扒腕,也能包含SimpleSlot绢淀。所以一個(gè)SharedSlot能定義出一棵slots樹(shù)。
接下來(lái)我們來(lái)看看 Flink 為subtask分配slot的過(guò)程瘾腰。關(guān)于Flink調(diào)度皆的,有兩個(gè)非常重要的原則我們必須知道:(1)同一個(gè)operator的各個(gè)subtask是不能呆在同一個(gè)SharedSlot中的,例如FlatMap[1]
和FlatMap[2]
是不能在同一個(gè)SharedSlot中的蹋盆。(2)Flink是按照拓?fù)漤樞驈腟ource一個(gè)個(gè)調(diào)度到Sink的费薄。例如WordCount(Source并行度為1,其他并行度為2)栖雾,那么調(diào)度的順序依次是:Source
-> FlatMap[1]
-> FlatMap[2]
-> KeyAgg->Sink[1]
-> KeyAgg->Sink[2]
楞抡。假設(shè)現(xiàn)在有2個(gè)TaskManager,每個(gè)只有1個(gè)slot(為簡(jiǎn)化問(wèn)題)析藕,那么分配slot的過(guò)程如圖所示:
注:圖中 SharedSlot 與 SimpleSlot 后帶的括號(hào)中的數(shù)字代表槽位號(hào)(slotNumber)
- 為
Source
分配slot召廷。首先,我們從TaskManager1
中分配出一個(gè)SharedSlot
。并從SharedSlot
中為Source
分配出一個(gè)SimpleSlot
竞慢。如上圖中的①和②先紫。 - 為
FlatMap[1]
分配slot。目前已經(jīng)有一個(gè)SharedSlot
筹煮,則從該SharedSlot
中分配出一個(gè)SimpleSlot
用來(lái)部署FlatMap[1]
遮精。如上圖中的③。 - 為
FlatMap[2]
分配slot败潦。由于TaskManager1
的SharedSlot
中已經(jīng)有同operator的FlatMap[1]
了本冲,我們只能分配到其他SharedSlot
中去。從TaskManager2
中分配出一個(gè)SharedSlot
变屁,并從該SharedSlot
中為FlatMap[2]
分配出一個(gè)SimpleSlot
眼俊。如上圖的④和⑤。 - 為
Key->Sink[1]
分配slot粟关。目前兩個(gè)SharedSlot
都符合條件,從TaskManager1
的SharedSlot
中分配出一個(gè)SimpleSlot
用來(lái)部署Key->Sink[1
]环戈。如上圖中的⑥闷板。 - 為
Key->Sink[2]
分配slot。TaskManager1
的SharedSlot
中已經(jīng)有同operator的Key->Sink[1]
了院塞,則只能選擇另一個(gè)SharedSlot
中分配出一個(gè)SimpleSlot
用來(lái)部署Key->Sink[2]
遮晚。如上圖中的⑦。
最后Source拦止、FlatMap[1]县遣、Key->Sink[1]這些subtask都會(huì)部署到TaskManager1的唯一一個(gè)slot中,并啟動(dòng)對(duì)應(yīng)的線程汹族。FlatMap[2]萧求、Key->Sink[2]這些subtask都會(huì)被部署到TaskManager2的唯一一個(gè)slot中,并啟動(dòng)對(duì)應(yīng)的線程顶瞒。從而實(shí)現(xiàn)了slot共享夸政。
總結(jié)
本文主要介紹了Flink中計(jì)算資源的相關(guān)概念以及原理實(shí)現(xiàn)。最核心的是 Task Slot榴徐,每個(gè)slot能運(yùn)行一個(gè)或多個(gè)task守问。為了拓?fù)涓咝У剡\(yùn)行,F(xiàn)link提出了Chaining坑资,盡可能地將operators chain在一起作為一個(gè)task來(lái)處理耗帕。為了資源更充分的利用,F(xiàn)link又提出了SlotSharingGroup袱贮,盡可能地讓多個(gè)task共享一個(gè)slot仿便。
轉(zhuǎn)載:http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/