Flink 原理與實(shí)現(xiàn):理解 Flink 中的計(jì)算資源

本文所討論的計(jì)算資源是指用來(lái)執(zhí)行 Task 的資源,是一個(gè)邏輯概念姨谷。本文會(huì)介紹 Flink 計(jì)算資源相關(guān)的一些核心概念逗宁,如:Slot、SlotSharingGroup梦湘、CoLocationGroup瞎颗、Chain等。并會(huì)著重討論 Flink 如何對(duì)計(jì)算資源進(jìn)行管理和隔離捌议,如何將計(jì)算資源利用率最大化等等哼拔。理解 Flink 中的計(jì)算資源對(duì)于理解 Job 如何在集群中運(yùn)行的有很大的幫助,也有利于我們更透徹地理解 Flink 原理瓣颅,更快速地定位問(wèn)題管挟。

Operator Chains

為了更高效地分布式執(zhí)行,F(xiàn)link會(huì)盡可能地將operator的subtask鏈接(chain)在一起形成task弄捕。每個(gè)task在一個(gè)線程中執(zhí)行僻孝。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化守谓,減少數(shù)據(jù)在緩沖區(qū)的交換穿铆,減少了延遲的同時(shí)提高整體的吞吐量。

我們?nèi)砸越?jīng)典的 WordCount 為例(參考前文Job例子)斋荞,下面這幅圖荞雏,展示了Source并行度為1,F(xiàn)latMap平酿、KeyAggregation凤优、Sink并行度均為2,最終以5個(gè)并行的線程來(lái)執(zhí)行的優(yōu)化過(guò)程蜈彼。

上圖中將KeyAggregation和Sink兩個(gè)operator進(jìn)行了合并筑辨,因?yàn)檫@兩個(gè)合并后并不會(huì)改變整體的拓?fù)浣Y(jié)構(gòu)。但是幸逆,并不是任意兩個(gè) operator 就能 chain 一起的棍辕。其條件還是很苛刻的:

  1. 上下游的并行度一致
  2. 下游節(jié)點(diǎn)的入度為1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入)
  3. 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
  4. 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map还绘、flatmap楚昭、filter等默認(rèn)是ALWAYS)
  5. 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接拍顷,Source默認(rèn)是HEAD)
  6. 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū)
  7. 用戶沒(méi)有禁用 chain

Operator chain的行為可以通過(guò)編程API中進(jìn)行指定抚太。可以通過(guò)在DataStreamoperator后面(如someStream.map(..))調(diào)用startNewChain()來(lái)指示從該operator開(kāi)始一個(gè)新的chain(與前面截?cái)辔舭福粫?huì)被chain到前面)尿贫。或者調(diào)用disableChaining()來(lái)指示該operator不參與chaining(不會(huì)與前后的operator chain一起)爱沟。在底層帅霜,這兩個(gè)方法都是通過(guò)調(diào)整operatorchain 策略(HEAD、NEVER)來(lái)實(shí)現(xiàn)的呼伸。另外身冀,也可以通過(guò)調(diào)用StreamExecutionEnvironment.disableOperatorChaining()來(lái)全局禁用chaining。

原理與實(shí)現(xiàn)

那么 Flink 是如何將多個(gè) operators chain在一起的呢括享?chain在一起的operators是如何作為一個(gè)整體被執(zhí)行的呢搂根?它們之間的數(shù)據(jù)流又是如何避免了序列化/反序列化以及網(wǎng)絡(luò)傳輸?shù)哪兀肯聢D展示了operators chain的內(nèi)部實(shí)現(xiàn):

如上圖所示铃辖,F(xiàn)link內(nèi)部是通過(guò)OperatorChain這個(gè)類(lèi)來(lái)將多個(gè)operator鏈在一起形成一個(gè)新的operator剩愧。OperatorChain形成的框框就像一個(gè)黑盒,F(xiàn)link 無(wú)需知道黑盒中有多少個(gè)ChainOperator娇斩、數(shù)據(jù)在chain內(nèi)部是怎么流動(dòng)的仁卷,只需要將input數(shù)據(jù)交給 HeadOperator 就可以了穴翩,這就使得OperatorChain在行為上與普通的operator無(wú)差別,上面的OperaotrChain就可以看做是一個(gè)入度為1锦积,出度為2的operator芒帕。所以在實(shí)現(xiàn)中,對(duì)外可見(jiàn)的只有HeadOperator丰介,以及與外部連通的實(shí)線輸出背蟆,這些輸出對(duì)應(yīng)了JobGraph中的JobEdge,在底層通過(guò)RecordWriterOutput來(lái)實(shí)現(xiàn)哮幢。另外带膀,框中的虛線是operator chain內(nèi)部的數(shù)據(jù)流,這個(gè)流內(nèi)的數(shù)據(jù)不會(huì)經(jīng)過(guò)序列化/反序列化橙垢、網(wǎng)絡(luò)傳輸垛叨,而是直接將消息對(duì)象傳遞給下游的 ChainOperator 處理,這是性能提升的關(guān)鍵點(diǎn)钢悲,在底層是通過(guò) ChainingOutput 實(shí)現(xiàn)的点额,源碼如下方所示,

注:HeadOperatorChainOperator并不是具體的數(shù)據(jù)結(jié)構(gòu)莺琳,前者指代chain中的第一個(gè)operator还棱,后者指代chain中其余的operator,它們實(shí)際上都是StreamOperator惭等。

private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
  // 注冊(cè)的下游operator
  protected final OneInputStreamOperator<T, ?> operator;
  public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
    this.operator = operator;
  }
  @Override
  // 發(fā)送消息方法的實(shí)現(xiàn)珍手,直接將消息對(duì)象傳遞給operator處理,不經(jīng)過(guò)序列化/反序列化辞做、網(wǎng)絡(luò)傳輸
  public void collect(StreamRecord<T> record) {
    try {
      operator.setKeyContextElement1(record);
      // 下游operator直接處理消息對(duì)象
      operator.processElement(record);
    }
    catch (Exception e) {
      throw new ExceptionInChainedOperatorException(e);
    }
  }
  ...
}

Task Slot

架構(gòu)概覽中我們介紹了 TaskManager 是一個(gè) JVM 進(jìn)程琳要,并會(huì)以獨(dú)立的線程來(lái)執(zhí)行一個(gè)task或多個(gè)subtask。為了控制一個(gè) TaskManager 能接受多少個(gè) task秤茅,F(xiàn)link 提出了 Task Slot 的概念稚补。

Flink 中的計(jì)算資源通過(guò) Task Slot 來(lái)定義。每個(gè) task slot 代表了 TaskManager 的一個(gè)固定大小的資源子集框喳。例如课幕,一個(gè)擁有3個(gè)slot的 TaskManager,會(huì)將其管理的內(nèi)存平均分成三分分給各個(gè) slot五垮。將資源 slot 化意味著來(lái)自不同job的task不會(huì)為了內(nèi)存而競(jìng)爭(zhēng)乍惊,而是每個(gè)task都擁有一定數(shù)量的內(nèi)存儲(chǔ)備。需要注意的是放仗,這里不會(huì)涉及到CPU的隔離润绎,slot目前僅僅用來(lái)隔離task的內(nèi)存。

通過(guò)調(diào)整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的莉撇。每個(gè) TaskManager 有一個(gè)slot呢蛤,也就意味著每個(gè)task運(yùn)行在獨(dú)立的 JVM 中。每個(gè) TaskManager 有多個(gè)slot的話稼钩,也就是說(shuō)多個(gè)task運(yùn)行在同一個(gè)JVM中顾稀。而在同一個(gè)JVM進(jìn)程中的task,可以共享TCP連接(基于多路復(fù)用)和心跳消息坝撑,可以減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸。也能共享一些數(shù)據(jù)結(jié)構(gòu)粮揉,一定程度上減少了每個(gè)task的消耗巡李。

每一個(gè) TaskManager 會(huì)擁有一個(gè)或多個(gè)的 task slot,每個(gè) slot 都能跑由多個(gè)連續(xù) task 組成的一個(gè) pipeline扶认,比如 MapFunction 的第n個(gè)并行實(shí)例和 ReduceFunction 的第n個(gè)并行實(shí)例可以組成一個(gè) pipeline侨拦。

如上文所述的 WordCount 例子,5個(gè)Task可能會(huì)在TaskManager的slots中如下圖分布辐宾,2個(gè)TaskManager狱从,每個(gè)有3個(gè)slot:

SlotSharingGroup 與 CoLocationGroup

默認(rèn)情況下,F(xiàn)link 允許subtasks共享slot叠纹,條件是它們都來(lái)自同一個(gè)Job的不同task的subtask季研。結(jié)果可能一個(gè)slot持有該job的整個(gè)pipeline。允許slot共享有以下兩點(diǎn)好處:

  1. Flink 集群所需的task slots數(shù)與job中最高的并行度一致誉察。也就是說(shuō)我們不需要再去計(jì)算一個(gè)程序總共會(huì)起多少個(gè)task了与涡。
  2. 更容易獲得更充分的資源利用。如果沒(méi)有slot共享持偏,那么非密集型操作source/flatmap就會(huì)占用同密集型操作 keyAggregation/sink 一樣多的資源驼卖。如果有slot共享,將基線的2個(gè)并行度增加到6個(gè)鸿秆,能充分利用slot資源酌畜,同時(shí)保證每個(gè)TaskManager能平均分配到重的subtasks。

我們將 WordCount 的并行度從之前的2個(gè)增加到6個(gè)(Source并行度仍為1)卿叽,并開(kāi)啟slot共享(所有operator都在default共享組)桥胞,將得到如上圖所示的slot分布圖。首先附帽,我們不用去計(jì)算這個(gè)job會(huì)其多少個(gè)task埠戳,總之該任務(wù)最終會(huì)占用6個(gè)slots(最高并行度為6)。其次蕉扮,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個(gè) TaskManager整胃。

SlotSharingGroup是Flink中用來(lái)實(shí)現(xiàn)slot共享的類(lèi),它盡可能地讓subtasks共享一個(gè)slot喳钟。相應(yīng)的屁使,還有一個(gè) CoLocationGroup類(lèi)用來(lái)強(qiáng)制將 subtasks 放到同一個(gè) slot 中在岂。CoLocationGroup主要用于迭代流中,用來(lái)保證迭代頭與迭代尾的第i個(gè)subtask能被調(diào)度到同一個(gè)TaskManager上蛮寂。這里我們不會(huì)詳細(xì)討論CoLocationGroup的實(shí)現(xiàn)細(xì)節(jié)蔽午。

怎么判斷operator屬于哪個(gè) slot 共享組呢?默認(rèn)情況下酬蹋,所有的operator都屬于默認(rèn)的共享組default及老,也就是說(shuō)默認(rèn)情況下所有的operator都是可以共享一個(gè)slot的。而當(dāng)所有input operators具有相同的slot共享組時(shí)范抓,該operator會(huì)繼承這個(gè)共享組骄恶。最后,為了防止不合理的共享匕垫,用戶也能通過(guò)API來(lái)強(qiáng)制指定operator的共享組僧鲁,比如:someStream.filter(...).slotSharingGroup("group1");就強(qiáng)制指定了filter的slot共享組為group1

原理與實(shí)現(xiàn)

那么多個(gè)tasks(或者說(shuō)operators)是如何共享slot的呢象泵?

我們先來(lái)看一下用來(lái)定義計(jì)算資源的slot的類(lèi)圖:

抽象類(lèi)Slot定義了該槽位屬于哪個(gè)TaskManager(instance)的第幾個(gè)槽位(slotNumber)寞秃,屬于哪個(gè)Job(jobID)等信息。最簡(jiǎn)單的情況下偶惠,一個(gè)slot只持有一個(gè)task春寿,也就是SimpleSlot的實(shí)現(xiàn)。復(fù)雜點(diǎn)的情況洲鸠,一個(gè)slot能共享給多個(gè)task使用堂淡,也就是SharedSlot的實(shí)現(xiàn)。SharedSlot能包含其他的SharedSlot扒腕,也能包含SimpleSlot绢淀。所以一個(gè)SharedSlot能定義出一棵slots樹(shù)。

接下來(lái)我們來(lái)看看 Flink 為subtask分配slot的過(guò)程瘾腰。關(guān)于Flink調(diào)度皆的,有兩個(gè)非常重要的原則我們必須知道:(1)同一個(gè)operator的各個(gè)subtask是不能呆在同一個(gè)SharedSlot中的,例如FlatMap[1]FlatMap[2]是不能在同一個(gè)SharedSlot中的蹋盆。(2)Flink是按照拓?fù)漤樞驈腟ource一個(gè)個(gè)調(diào)度到Sink的费薄。例如WordCount(Source并行度為1,其他并行度為2)栖雾,那么調(diào)度的順序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]楞抡。假設(shè)現(xiàn)在有2個(gè)TaskManager,每個(gè)只有1個(gè)slot(為簡(jiǎn)化問(wèn)題)析藕,那么分配slot的過(guò)程如圖所示:

注:圖中 SharedSlot 與 SimpleSlot 后帶的括號(hào)中的數(shù)字代表槽位號(hào)(slotNumber)

  1. Source分配slot召廷。首先,我們從TaskManager1中分配出一個(gè)SharedSlot。并從SharedSlot中為Source分配出一個(gè)SimpleSlot竞慢。如上圖中的①和②先紫。
  2. FlatMap[1]分配slot。目前已經(jīng)有一個(gè)SharedSlot筹煮,則從該SharedSlot中分配出一個(gè)SimpleSlot用來(lái)部署FlatMap[1]遮精。如上圖中的③。
  3. FlatMap[2]分配slot败潦。由于TaskManager1SharedSlot中已經(jīng)有同operator的FlatMap[1]了本冲,我們只能分配到其他SharedSlot中去。從TaskManager2中分配出一個(gè)SharedSlot变屁,并從該SharedSlot中為FlatMap[2]分配出一個(gè)SimpleSlot眼俊。如上圖的④和⑤。
  4. Key->Sink[1]分配slot粟关。目前兩個(gè)SharedSlot都符合條件,從TaskManager1SharedSlot中分配出一個(gè)SimpleSlot用來(lái)部署Key->Sink[1]环戈。如上圖中的⑥闷板。
  5. Key->Sink[2]分配slot。TaskManager1SharedSlot中已經(jīng)有同operator的Key->Sink[1]了院塞,則只能選擇另一個(gè)SharedSlot中分配出一個(gè)SimpleSlot用來(lái)部署Key->Sink[2]遮晚。如上圖中的⑦。

最后Source拦止、FlatMap[1]县遣、Key->Sink[1]這些subtask都會(huì)部署到TaskManager1的唯一一個(gè)slot中,并啟動(dòng)對(duì)應(yīng)的線程汹族。FlatMap[2]萧求、Key->Sink[2]這些subtask都會(huì)被部署到TaskManager2的唯一一個(gè)slot中,并啟動(dòng)對(duì)應(yīng)的線程顶瞒。從而實(shí)現(xiàn)了slot共享夸政。

總結(jié)

本文主要介紹了Flink中計(jì)算資源的相關(guān)概念以及原理實(shí)現(xiàn)。最核心的是 Task Slot榴徐,每個(gè)slot能運(yùn)行一個(gè)或多個(gè)task守问。為了拓?fù)涓咝У剡\(yùn)行,F(xiàn)link提出了Chaining坑资,盡可能地將operators chain在一起作為一個(gè)task來(lái)處理耗帕。為了資源更充分的利用,F(xiàn)link又提出了SlotSharingGroup袱贮,盡可能地讓多個(gè)task共享一個(gè)slot仿便。

轉(zhuǎn)載:http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/

參考資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子探越,更是在濱河造成了極大的恐慌狡赐,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件钦幔,死亡現(xiàn)場(chǎng)離奇詭異枕屉,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)鲤氢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)搀擂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人卷玉,你說(shuō)我怎么就攤上這事哨颂。” “怎么了相种?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵威恼,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我寝并,道長(zhǎng)箫措,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任衬潦,我火速辦了婚禮斤蔓,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘镀岛。我一直安慰自己弦牡,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布漂羊。 她就那樣靜靜地躺著驾锰,像睡著了一般。 火紅的嫁衣襯著肌膚如雪拨与。 梳的紋絲不亂的頭發(fā)上稻据,一...
    開(kāi)封第一講書(shū)人閱讀 51,562評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音买喧,去河邊找鬼捻悯。 笑死,一個(gè)胖子當(dāng)著我的面吹牛淤毛,可吹牛的內(nèi)容都是我干的今缚。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼低淡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼姓言!你這毒婦竟也來(lái)了瞬项?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤何荚,失蹤者是張志新(化名)和其女友劉穎囱淋,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體餐塘,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡妥衣,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了戒傻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片税手。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖需纳,靈堂內(nèi)的尸體忽然破棺而出芦倒,到底是詐尸還是另有隱情,我是刑警寧澤不翩,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布兵扬,位于F島的核電站,受9級(jí)特大地震影響口蝠,放射性物質(zhì)發(fā)生泄漏周霉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一亚皂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧国瓮,春花似錦灭必、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至孵睬,卻和暖如春播歼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背掰读。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工秘狞, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蹈集。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓烁试,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親拢肆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子减响,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容