理解flink的圖結(jié)構(gòu)和任務(wù)的調(diào)度與執(zhí)行

[TOC]
第一部分講到则拷,我們的主函數(shù)最后一項(xiàng)任務(wù)就是生成StreamGraph,然后生成JobGraph嘁捷,然 后以此開始調(diào)度任務(wù)運(yùn)行薪棒,所以接下來(lái)我們從這里入手手蝎,繼續(xù)探索flink。

2.1 flink的三層圖結(jié)構(gòu)

事實(shí)上俐芯,flink總共提供了三種圖的抽象棵介,我們前面已經(jīng)提到了StreamGraph和JobGraph,還 有一種是ExecutionGraph泼各,是用于調(diào)度的基本數(shù)據(jù)結(jié)構(gòu)鞍时。

image.png
image.png

上面這張圖清晰的給出了flink各個(gè)圖的工作原理和轉(zhuǎn)換過(guò)程。其中最后一個(gè)物理執(zhí)行圖并非 flink的數(shù)據(jù)結(jié)構(gòu)扣蜻,而是程序開始執(zhí)行后,各個(gè)task分布在不同的節(jié)點(diǎn)上及塘,所形成的物理上的關(guān) 系表示莽使。

  • 從JobGraph的圖里可以看到,數(shù)據(jù)從上一個(gè)operator流到下一個(gè)operator的過(guò)程中笙僚,上 游作為生產(chǎn)者提供了IntermediateDataSet芳肌,而下游作為消費(fèi)者需要JobEdge。事實(shí) 上,JobEdge是一個(gè)通信管道亿笤,連接了上游生產(chǎn)的dataset和下游的JobVertex節(jié)點(diǎn)翎迁。
  • 在JobGraph轉(zhuǎn)換到ExecutionGraph的過(guò)程中,主要發(fā)生了以下轉(zhuǎn)變:
    • 加入了并行度的概念净薛,成為真正可調(diào)度的圖結(jié)構(gòu)
    • 生成了與JobVertex對(duì)應(yīng)的ExecutionJobVertex汪榔,ExecutionVertex,與 IntermediateDataSet對(duì)應(yīng)的IntermediateResult和IntermediateResultPartition等肃拜, 并行將通過(guò)這些類實(shí)現(xiàn)
  • ExecutionGraph已經(jīng)可以用于調(diào)度任務(wù)痴腌。我們可以看到,flink根據(jù)該圖生成了一一對(duì)應(yīng)的 Task燃领,每個(gè)task對(duì)應(yīng)一個(gè)ExecutionGraph的一個(gè)Execution士聪。Task用InputGate、 InputChannel和ResultPartition對(duì)應(yīng)了上面圖中的IntermediateResult和 ExecutionEdge猛蔽。

那么剥悟,flink抽象出這三層圖結(jié)構(gòu),四層執(zhí)行邏輯的意義是什么呢? StreamGraph是對(duì)用戶邏輯的映射曼库。JobGraph在此基礎(chǔ)上進(jìn)行了一些優(yōu)化区岗,比如把一部分操 作串成chain以提高效率。ExecutionGraph是為了調(diào)度存在的凉泄,加入了并行處理的概念躏尉。而在 此基礎(chǔ)上真正執(zhí)行的是Task及其相關(guān)結(jié)構(gòu)。

2.2 StreamGraph的生成

在第一節(jié)的算子注冊(cè)部分后众,我們可以看到胀糜,flink把每一個(gè)算子transform成一個(gè)對(duì)流的轉(zhuǎn)換 (比如上文中返回的SingleOutputStreamOperator是一個(gè)DataStream的子類),并且注冊(cè) 到執(zhí)行環(huán)境中蒂誉,用于生成StreamGraph教藻。實(shí)際生成StreamGraph的入口是 StreamGraphGenerator.generate(env, transformations) 其中的transformations是一 個(gè)list,里面記錄的就是我們?cè)趖ransform方法中放進(jìn)來(lái)的算子右锨。

2.2.1 StreamTransformation類代表了流的轉(zhuǎn)換
StreamTransformation代表了從一個(gè)或多個(gè)DataStream生成新DataStream的操作括堤。順便,DataStream類在內(nèi)部組合了一個(gè)StreamTransformation類绍移,實(shí)際的轉(zhuǎn)換操作均通過(guò)該 類完成悄窃。

我們可以看到,從source到各種map,union再到sink操作全部被映射成了 StreamTransformation蹂窖。

image.png

以MapFunction為例:

  • 首先轧抗,用戶代碼里定義的UDF會(huì)被當(dāng)作其基類對(duì)待,然后交給StreamMap這個(gè)operator 做進(jìn)一步包裝瞬测。事實(shí)上横媚,每一個(gè)Transformation都對(duì)應(yīng)了一個(gè)StreamOperator纠炮。
  • 由于map這個(gè)操作只接受一個(gè)輸入,所以再被進(jìn)一步包裝為OneInputTransformation灯蝴。
  • 最后恢口,將該transformation注冊(cè)到執(zhí)行環(huán)境中,當(dāng)執(zhí)行上文提到的generate方法時(shí)穷躁,生成 StreamGraph圖結(jié)構(gòu)耕肩。

另外,并不是每一個(gè) StreamTransformation 都會(huì)轉(zhuǎn)換成runtime層中的物理操作折砸。 有一些只是邏輯概念看疗,比如union、split/select睦授、partition等两芳。如下圖所示的轉(zhuǎn)換 樹,在運(yùn)行時(shí)會(huì)優(yōu)化成下方的操作圖去枷。

image.png

2.2.2 StreamGraph生成函數(shù)分析

2.2.2 StreamGraph生成函數(shù)分析

image.png

[圖片上傳失敗...(image-8e8652-1609649040159)]

因?yàn)閙ap怖辆,filter等常用操作都是OneInputStreamOperator,我們就來(lái)看看transformOneInputTransform((OneInputTransformation<?, ?>) transform) 方法。

image.png

2.2.3 WordCount函數(shù)的StreamGraph
flink提供了一個(gè)StreamGraph可視化顯示工具删顶,在這里 我們可以把我們的程序的執(zhí)行計(jì)劃打印出 來(lái)System.out.println(env.getExecutionPlan()); 復(fù)制到這個(gè)網(wǎng)站上竖螃,點(diǎn)擊生成,如圖 所示:

image.png

可以看到逗余,我們?cè)闯绦虮晦D(zhuǎn)化成了4個(gè)operator特咆。 另外,在operator之間的連線上也顯示出了flink添加的一些邏輯流程录粱。由于我設(shè)定了每個(gè)操作 符的并行度都是1腻格,所以在每個(gè)操作符之間都是直接FORWARD,不存在shuffle的過(guò)程啥繁。

2.3 JobGraph的生成
flink會(huì)根據(jù)上一步生成的StreamGraph生成JobGraph菜职,然后將JobGraph發(fā)送到server端進(jìn) 行ExecutionGraph的解析

2.3.1 JobGraph生成源碼
與StreamGraph類似,JobGraph的入口方法
是 StreamingJobGraphGenerator.createJobGraph() 旗闽。我們直接來(lái)看源碼

image.png

2.3.2 operator chain的邏輯

為了更高效地分布式執(zhí)行酬核,F(xiàn)link會(huì)盡可能地將operator的subtask鏈接(chain)在一起 形成task。每個(gè)task在一個(gè)線程中執(zhí)行适室。將operators鏈接成task是非常有效的優(yōu)化:它 能減少線程之間的切換嫡意,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換捣辆,減少 了延遲的同時(shí)提高整體的吞吐量鹅很。

image.png

上圖中將KeyAggregation和Sink兩個(gè)operator進(jìn)行了合并,因?yàn)檫@兩個(gè)合并后并不會(huì)改變整 體的拓?fù)浣Y(jié)構(gòu)罪帖。但是促煮,并不是任意兩個(gè) operator 就能 chain 一起的,其條件還是很苛刻的:

  • 上下游的并行度一致
  • 下游節(jié)點(diǎn)的入度為1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入) 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
  • 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map整袁、flatmap菠齿、filter等 默認(rèn)是ALWAYS)
  • 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈 接坐昙,Source默認(rèn)是HEAD)
  • 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
  • 用戶沒(méi)有禁用 chain

flink的chain邏輯是一種很常見的設(shè)計(jì)绳匀,比如spring的interceptor也是類似的實(shí)現(xiàn)方式。通過(guò) 把操作符串成一個(gè)大操作符炸客,flink避免了把數(shù)據(jù)序列化后通過(guò)網(wǎng)絡(luò)發(fā)送給其他節(jié)點(diǎn)的開銷疾棵,能 夠大大增強(qiáng)效率。

2.3.3 JobGraph的提交
前面已經(jīng)提到痹仙,JobGraph的提交依賴于JobClient和JobManager之間的異步通信是尔,如圖所 示:

image.png

在submitJobAndWait方法中,其首先會(huì)創(chuàng)建一個(gè)JobClientActor的ActorRef,然后向其發(fā)起 一個(gè)SubmitJobAndWait消息开仰,該消息將JobGraph的實(shí)例提交給JobClientActor拟枚。發(fā)起模式是ask,它表示需要一個(gè)應(yīng)答消息众弓。

Future<Object> future = Patterns.ask(jobClientActor, new JobClientMessa ges.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

該SubmitJobAndWait消息被JobClientActor接收后恩溅,最終通過(guò)調(diào)用tryToSubmitJob方法觸 發(fā)真正的提交動(dòng)作。當(dāng)JobManager的actor接收到來(lái)自client端的請(qǐng)求后谓娃,會(huì)執(zhí)行一個(gè) submitJob方法脚乡,主要做以下事情:

向BlobLibraryCacheManager注冊(cè)該Job;
構(gòu)建ExecutionGraph對(duì)象;
對(duì)JobGraph中的每個(gè)頂點(diǎn)進(jìn)行初始化; 將DAG拓?fù)渲袕膕ource開始排序,排序后的頂點(diǎn)集合附加到Exec> - utionGraph對(duì)象;
獲取檢查點(diǎn)相關(guān)的配置滨达,并將其設(shè)置到ExecutionGraph對(duì)象; 向ExecutionGraph注冊(cè)相關(guān)的listener; 執(zhí)行恢復(fù)操作或者將JobGraph信息寫入SubmittedJobGraphStore以在后續(xù)用于恢 復(fù)目的;
響應(yīng)給客戶端JobSubmitSuccess消息;
對(duì)ExecutionGraph對(duì)象進(jìn)行調(diào)度執(zhí)行;

最后奶稠,JobManger會(huì)返回消息給JobClient,通知該任務(wù)是否提交成功弦悉。

2.4 ExecutionGraph的生成
與StreamGraph和JobGraph不同窒典,ExecutionGraph并不是在我們的客戶端程序生成,而是 在服務(wù)端(JobManager處)生成的稽莉,順便flink只維護(hù)一個(gè)JobManager瀑志。其入口代碼
是 ExecutionGraphBuilder.buildGraph(...)

該方法長(zhǎng)200多行,其中一大半是checkpoiont的相關(guān)邏輯污秆,我們暫且略過(guò)劈猪,直接看核心方
法 executionGraph.attachJobGraph(sortedTopology)

因?yàn)镋xecutionGraph事實(shí)上只是改動(dòng)了JobGraph的每個(gè)節(jié)點(diǎn),而沒(méi)有對(duì)整個(gè)拓?fù)浣Y(jié)構(gòu)進(jìn)行變 動(dòng)良拼,所以代碼里只是挨個(gè)遍歷jobVertex并進(jìn)行處理:

image.png

至此战得,ExecutorGraph就創(chuàng)建完成了。

3. 任務(wù)的調(diào)度與執(zhí)行
關(guān)于flink的任務(wù)執(zhí)行架構(gòu)庸推,官網(wǎng)的這兩張圖就是最好的說(shuō)明:

image.png

Flink 集群?jiǎn)?dòng)后常侦,首先會(huì)啟動(dòng)一個(gè) JobManger 和多個(gè)的 TaskManager浇冰。用戶的代碼會(huì)由 JobClient 提交給 JobManager,JobManager 再把來(lái)自不同用戶的任務(wù)發(fā)給 不同的 TaskManager 去執(zhí)行聋亡,每個(gè)TaskManager管理著多個(gè)task肘习,task是執(zhí)行計(jì)算的最小結(jié)構(gòu), TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager坡倔。TaskManager 之間以流的形式進(jìn)行 數(shù)據(jù)的傳輸漂佩。上述除了task外的三者均為獨(dú)立的 JVM 進(jìn)程。 要注意的是罪塔,TaskManager和job并非一一對(duì)應(yīng)的關(guān)系投蝉。flink調(diào)度的最小單元是task而非 TaskManager,也就是說(shuō)征堪,來(lái)自不同job的不同task可能運(yùn)行于同一個(gè)TaskManager的不同 線程上瘩缆。

image.png

一個(gè)flink任務(wù)所有可能的狀態(tài)如上圖所示。圖上畫的很明白请契,就不再贅述了咳榜。

3.1 計(jì)算資源的調(diào)度
Task slot是一個(gè)TaskManager內(nèi)資源分配的最小載體,代表了一個(gè)固定大小的資源子集爽锥,每 個(gè)TaskManager會(huì)將其所占有的資源平分給它的slot涌韩。

通過(guò)調(diào)整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的氯夷。每個(gè) TaskManager 有一個(gè)slot臣樱,也就意味著每個(gè)task運(yùn)行在獨(dú)立的 JVM 中。每個(gè) TaskManager 有多個(gè)slot的 話腮考,也就是說(shuō)多個(gè)task運(yùn)行在同一個(gè)JVM中雇毫。

而在同一個(gè)JVM進(jìn)程中的task,可以共享TCP連接(基于多路復(fù)用)和心跳消息踩蔚,可以減少數(shù) 據(jù)的網(wǎng)絡(luò)傳輸棚放,也能共享一些數(shù)據(jù)結(jié)構(gòu),一定程度上減少了每個(gè)task的消耗馅闽。

每個(gè)slot可以接受單個(gè)task飘蚯,也可以接受多個(gè)連續(xù)task組成的pipeline,如下圖所示福也,F(xiàn)latMap函數(shù)占用一個(gè)taskslot局骤,而key Agg函數(shù)和sink函數(shù)共用一個(gè)taskslot:

image.png

為了達(dá)到共用slot的目的,除了可以以chain的方式pipeline算子暴凑,我們還可以允許 SlotSharingGroup峦甩,如下圖所示:

image.png

我們可以把不能被chain成一條的兩個(gè)操作如flatmap和key&sink放在一個(gè)TaskSlot里執(zhí)行, 這樣做可以獲得以下好處:

  • 共用slot使得我們不再需要計(jì)算每個(gè)任務(wù)需要的總task數(shù)目现喳,直接取最高算子的并行度即可
  • 對(duì)計(jì)算資源的利用率更高凯傲。例如犬辰,通常的輕量級(jí)操作map和重量級(jí)操作Aggregate不再分 別需要一個(gè)線程,而是可以在同一個(gè)線程內(nèi)執(zhí)行泣洞,而且對(duì)于slot有限的場(chǎng)景忧风,我們可以增 大每個(gè)task的并行度了。

接下來(lái)我們還是用官網(wǎng)的圖來(lái)說(shuō)明flink是如何重用slot的:

image.png
  1. TaskManager1分配一個(gè)SharedSlot0
  2. 把source task放入一個(gè)SimpleSlot0球凰,再把該slot放入SharedSlot0
  3. 把flatmap task放入一個(gè)SimpleSlot1,再把該slot放入SharedSlot0
  4. 因?yàn)槲覀兊膄latmap task并行度是2腿宰,因此不能再放入SharedSlot0呕诉,所以向
    TaskMange21申請(qǐng)了一個(gè)新的SharedSlot0
  5. 把第二個(gè)flatmap task放進(jìn)一個(gè)新的SimpleSlot,并放進(jìn)TaskManager2的
    SharedSlot0
  6. 開始處理key&sink task吃度,因?yàn)槠洳⑿卸纫彩?甩挫,所以先把第一個(gè)task放進(jìn)
    TaskManager1的SharedSlot
  7. 把第二個(gè)key&sink放進(jìn)TaskManager2的SharedSlot

3.2 JobManager執(zhí)行job

JobManager負(fù)責(zé)接收 flink 的作業(yè),調(diào)度 task椿每,收集 job 的狀態(tài)伊者、管理 TaskManagers。被 實(shí)現(xiàn)為一個(gè) akka actor间护。

3.2.1 JobManager的組件

  • BlobServer 是一個(gè)用來(lái)管理二進(jìn)制大文件的服務(wù)亦渗,比如保存用戶上傳的jar文件,該服務(wù)會(huì) 將其寫到磁盤上汁尺。還有一些相關(guān)的類法精,如BlobCache,用于TaskManager向JobManager 下載用戶的jar文件
  • InstanceManager 用來(lái)管理當(dāng)前存活的TaskManager的組件痴突,記錄了TaskManager的心 跳信息等
  • CompletedCheckpointStore 用于保存已完成的checkpoint相關(guān)信息搂蜓,持久化到內(nèi)存中 或者zookeeper上
  • MemoryArchivist 保存了已經(jīng)提交到flink的作業(yè)的相關(guān)信息,如JobGraph等

3.2.2 JobManager的啟動(dòng)過(guò)程

image.png

  • 配置Akka并生成ActorSystem辽装,啟動(dòng)JobManager
  • 啟動(dòng)HA和metric相關(guān)服務(wù)
  • 在 startJobManagerActors() 方法中啟動(dòng)JobManagerActors帮碰,以及 webserver,TaskManagerActor拾积,ResourceManager等等
  • 阻塞等待終止
  • 集群通過(guò)LeaderService等選出JobManager的leader

3.2.3 JobManager啟動(dòng)Task
JobManager 是一個(gè)Actor殉挽,通過(guò)各種消息來(lái)完成核心邏輯:

override def handleMessage: Receive = {
case GrantLeadership(newLeaderSessionID) =>
log.info(s"JobManager $getAddress was granted leadership with leader session ID " +
s"$newLeaderSessionID.") leaderSessionID = newLeaderSessionID
.......

有幾個(gè)比較重要的消息:

  • GrantLeadership 獲得leader授權(quán),將自身被分發(fā)到的 session id 寫到 zookeeper殷勘,并 恢復(fù)所有的 jobs
  • RevokeLeadership 剝奪leader授權(quán)此再,打斷清空所有的 job 信息,但是保留作業(yè)緩存玲销,注 銷所有的 TaskManagers
  • RegisterTaskManagers 注冊(cè) TaskManager输拇,如果之前已經(jīng)注冊(cè)過(guò),則只給對(duì)應(yīng)的 Instance 發(fā)送消息贤斜,否則啟動(dòng)注冊(cè)邏輯:在 InstanceManager 中注冊(cè)該 Instance 的信 息策吠,并停止 Instance BlobLibraryCacheManager 的端口【供下載 lib 包用】逛裤,同時(shí)使用 watch 監(jiān)聽 task manager 的存活
  • SubmitJob 提交 jobGraph 最后一項(xiàng)SubmintJob就是我們要關(guān)注的,從客戶端收到JobGraph猴抹,轉(zhuǎn)換為 ExecutionGraph并執(zhí)行的過(guò)程带族。

首先做一些準(zhǔn)備工作,然后獲取一個(gè)ExecutionGraph蟀给,判斷是否是恢復(fù)的job蝙砌,然后將job保 存下來(lái),并且通知客戶端本地已經(jīng)提交成功了跋理,最后如果確認(rèn)本JobManager是leader择克,則執(zhí) 行 executionGraph.scheduleForExecution() 方法,這個(gè)方法經(jīng)過(guò)一系列調(diào)用前普,把每個(gè) ExecutionVertex傳遞給了Excution類的deploy方法

我們首先生成了一個(gè)TaskDeploymentDescriptor肚邢,然后交給
了 taskManagerGateway.submitTask() 方法執(zhí)行。接下來(lái)的部分拭卿,就屬于TaskManager的范 疇了骡湖。

3.3 TaskManager執(zhí)行task

3.3.1 TaskManager的基本組件
TaskManager是flink中資源管理的基本組件,是所有執(zhí)行任務(wù)的基本容器峻厚,提供了內(nèi)存管 理响蕴、IO管理、通信管理等一系列功能目木,本節(jié)對(duì)各個(gè)模塊進(jìn)行簡(jiǎn)要介紹换途。

  1. MemoryManager flink并沒(méi)有把所有內(nèi)存的管理都委托給JVM,因?yàn)镴VM普遍存在著存儲(chǔ) 對(duì)象密度低刽射、大內(nèi)存時(shí)GC對(duì)系統(tǒng)影響大等問(wèn)題军拟。所以flink自己抽象了一套內(nèi)存管理機(jī)制,將 所有對(duì)象序列化后放在自己的MemorySegment上進(jìn)行管理誓禁。MemoryManger涉及內(nèi)容較 多懈息,將在后續(xù)章節(jié)進(jìn)行繼續(xù)剖析。

  2. IOManager flink通過(guò)IOManager管理磁盤IO的過(guò)程摹恰,提供了同步和異步兩種寫模式辫继,又 進(jìn)一步區(qū)分了block、buffer和bulk三種讀寫方式俗慈。 IOManager提供了兩種方式枚舉磁盤文件姑宽,一種是直接遍歷文件夾下所有文件,另一種是計(jì) 數(shù)器方式闺阱,對(duì)每個(gè)文件名以遞增順序訪問(wèn)炮车。 在底層,flink將文件IO抽象為FileIOChannle,封裝了底層實(shí)現(xiàn)瘦穆。

  3. NetworkEnvironment 是TaskManager的網(wǎng)絡(luò) IO 組件纪隙,包含了追蹤中間結(jié)果和數(shù)據(jù)交換 的數(shù)據(jù)結(jié)構(gòu)。它的構(gòu)造器會(huì)統(tǒng)一將配置的內(nèi)存先分配出來(lái)扛或,抽象成 NetworkBufferPool 統(tǒng)一 管理內(nèi)存的申請(qǐng)和釋放绵咱。意思是說(shuō),在輸入和輸出數(shù)據(jù)時(shí)熙兔,不管是保留在本地內(nèi)存悲伶,等待 chain在一起的下個(gè)操作符進(jìn)行處理,還是通過(guò)網(wǎng)絡(luò)把本操作符的計(jì)算結(jié)果發(fā)送出去黔姜,都被抽 象成了NetworkBufferPool拢切。后續(xù)我們還將對(duì)這個(gè)組件進(jìn)行詳細(xì)分析。

3.3.2 TaskManager執(zhí)行Task
對(duì)于TM來(lái)說(shuō)秆吵,執(zhí)行task就是把收到的 TaskDeploymentDescriptor 對(duì)象轉(zhuǎn)換成一個(gè)task并執(zhí)
行的過(guò)程。TaskDeploymentDescriptor這個(gè)類保存了task執(zhí)行所必須的所有內(nèi)容五慈,例如序列 化的算子纳寂,輸入的InputGate和輸出的ResultPartition的定義,該task要作為幾個(gè)subtask執(zhí) 行等等泻拦。

按照正常邏輯思維毙芜,很容易想到TM的submitTask方法的行為:首先是確認(rèn)資源,如尋找 JobManager和Blob争拐,而后建立連接腋粥,解序列化算子,收集task相關(guān)信息架曹,接下來(lái)就是創(chuàng)建一個(gè)新的 Task 對(duì)象隘冲,這個(gè)task對(duì)象就是真正執(zhí)行任務(wù)的關(guān)鍵所在。


image.png

如果讀者是從頭開始看這篇blog绑雄,里面有很多對(duì)象應(yīng)該已經(jīng)比較明確其作用了(除了那個(gè) brVarManager展辞,這個(gè)是管理廣播變量的,廣播變量是一類會(huì)被分發(fā)到每個(gè)任務(wù)中的共享變 量)万牺。接下來(lái)的主要任務(wù)罗珍,就是把這個(gè)task啟動(dòng)起來(lái),然后報(bào)告說(shuō)已經(jīng)啟動(dòng)task了:

image.png

3.3.2.1 生成Task對(duì)象
在執(zhí)行new Task()方法時(shí),第一步是把構(gòu)造函數(shù)里的這些變量賦值給當(dāng)前task的fields脚粟。

接下來(lái)是初始化ResultPartition和InputGate覆旱。這兩個(gè)類描述了task的輸出數(shù)據(jù)和輸入數(shù)據(jù)。

image.png

最后核无,創(chuàng)建一個(gè)Thread對(duì)象扣唱,并把自己放進(jìn)該對(duì)象,這樣在執(zhí)行時(shí),自己就有了自身的線程 的引用画舌。

3.3.2.2 運(yùn)行Task對(duì)象
Task對(duì)象本身就是一個(gè)Runable堕担,因此在其run方法里定義了運(yùn)行邏輯。 第一步是切換Task的狀態(tài):

image.png

其實(shí)這里有個(gè)值得關(guān)注的點(diǎn)曲聂,就是flink里大量使用了這種while(true)的寫法來(lái)修改和檢測(cè)狀 態(tài)霹购,emmm...
接下來(lái),就是導(dǎo)入用戶類加載器并加載用戶代碼朋腋。 然后齐疙,是向網(wǎng)絡(luò)管理器注冊(cè)當(dāng)前任務(wù)(flink的各個(gè)算子在運(yùn)行時(shí)進(jìn)行數(shù)據(jù)交換需要依賴網(wǎng)絡(luò)管 理器),分配一些緩存以保存數(shù)據(jù)
然后旭咽,讀入指定的緩存文件贞奋。 然后,再把task創(chuàng)建時(shí)傳入的那一大堆變量用于創(chuàng)建一個(gè)執(zhí)行環(huán)境Envrionment穷绵。 再然后轿塔,對(duì)于那些并不是第一次執(zhí)行的task(比如失敗后重啟的)要恢復(fù)其狀態(tài)。 接下來(lái)最重要的是

1. invokable.invoke();

方法仲墨。為什么這么說(shuō)呢勾缭,因?yàn)檫@個(gè)方法就是用戶代碼所真正被執(zhí)行的入口。比如我們寫的什么 new MapFunction()的邏輯目养,最終就是在這里被執(zhí)行的俩由。這里說(shuō)一下這個(gè)invokable,這是一 個(gè)抽象類癌蚁,提供了可以被TaskManager執(zhí)行的對(duì)象的基本抽象幻梯。 這個(gè)invokable是在解析JobGraph的時(shí)候生成相關(guān)信息的,并在此處形成真正可執(zhí)行的對(duì)象

// now load the task's invokable code
2. //通過(guò)反射生成對(duì)象
3. invokable = loadAndInstantiateInvokable(userCodeClassLoader,
nameOfInvokableClass);

上圖顯示了flink提供的可被執(zhí)行的Task類型努释。從名字上就可以看出各個(gè)task的作用碘梢,在此不再 贅述。

接下來(lái)就是invoke方法了洽洁,因?yàn)槲覀兊膚ordcount例子用了流式api痘系,在此我們以StreamTask 的invoke方法為例進(jìn)行說(shuō)明。

3.3.2.3 StreamTask的執(zhí)行邏輯

image.png

image.png

StreamTask.invoke()方法里饿自,第一個(gè)值得一說(shuō)的是 TimerService 汰翠。Flink在2015年決定向 StreamTask類加入timer service的時(shí)候解釋到:

第二個(gè)要注意的是chain操作。前面提到了昭雌,flink會(huì)出于優(yōu)化的角度复唤,把一些算子chain成一個(gè) 整體的算子作為一個(gè)task來(lái)執(zhí)行。比如wordcount例子中烛卧,Source和FlatMap算子就被chain 在了一起佛纫。在進(jìn)行chain操作的時(shí)候妓局,會(huì)設(shè)定頭節(jié)點(diǎn),并且指定輸出的RecordWriter呈宇。

接下來(lái)不出所料仍然是初始化好爬,只不過(guò)初始化的對(duì)象變成了各個(gè)operator。如果是有 checkpoint的甥啄,那就從state信息里恢復(fù)存炮,不然就作為全新的算子處理。從源碼中可以看 到蜈漓,flink針對(duì)keyed算子和普通算子做了不同的處理穆桂。keyed算子在初始化時(shí)需要計(jì)算出一個(gè) group區(qū)間,這個(gè)區(qū)間的值在整個(gè)生命周期里都不會(huì)再變化融虽,后面key就會(huì)根據(jù)hash的不同結(jié) 果享完,分配到特定的group中去計(jì)算。順便提一句有额,flink的keyed算子保存的是對(duì)每個(gè)數(shù)據(jù)的 key的計(jì)算方法般又,而非真實(shí)的key,用戶需要自己保證對(duì)每一行數(shù)據(jù)提供的keySelector的冪等 性巍佑。至于為什么要用KeyGroup的設(shè)計(jì)倒源,這就牽扯到擴(kuò)容的范疇了,將在后面的章節(jié)進(jìn)行講 述句狼。

對(duì)于 openAllOperators() 方法,就是對(duì)各種RichOperator執(zhí)行其open方法热某,通衬骞剑可用于在 執(zhí)行計(jì)算之前加載資源。 最后昔馋,run方法千呼萬(wàn)喚始出來(lái)筹吐,該方法經(jīng)過(guò)一系列跳轉(zhuǎn),最終調(diào)用chain上的第一個(gè)算子的 run方法秘遏。在wordcount的例子中丘薛,它最終調(diào)用了SocketTextStreamFunction的run,建立 socket連接并讀入文本邦危。

3.4 StreamTask與StreamOperator

前面提到洋侨,Task對(duì)象在執(zhí)行過(guò)程中,把執(zhí)行的任務(wù)交給了StreamTask這個(gè)類去執(zhí)行倦蚪。在我們 的wordcount例子中希坚,實(shí)際初始化的是OneInputStreamTask的對(duì)象(參考上面的類圖)。那 么這個(gè)對(duì)象是如何執(zhí)行用戶的代碼的呢?

image.png

它做的陵且,就是把任務(wù)直接交給了InputProcessor去執(zhí)行processInput方法裁僧。這是一個(gè) StreamInputProcessor 的實(shí)例,該processor的任務(wù)就是處理輸入的數(shù)據(jù),包括用戶數(shù) 據(jù)聊疲、watermark和checkpoint數(shù)據(jù)等茬底。我們先來(lái)看看這個(gè)processor是如何產(chǎn)生的:

image.png

這是OneInputStreamTask的init方法,從configs里面獲取StreamOperator信息获洲,生成自己 的inputProcessor阱表。那么inputProcessor是如何處理數(shù)據(jù)的呢?我們接著跟進(jìn)源碼:

image.png

image.png
image.png

到此為止,以上部分就是一個(gè)flink程序啟動(dòng)后昌妹,到執(zhí)行用戶代碼之前捶枢,flink框架所做的準(zhǔn)備工 作》裳拢回顧一下:

  • 啟動(dòng)一個(gè)環(huán)境
  • 生成StreamGraph
  • 注冊(cè)和選舉JobManager
  • 在各節(jié)點(diǎn)生成TaskManager烂叔,并根據(jù)JobGraph生成對(duì)應(yīng)的Task
  • 啟動(dòng)各個(gè)task,準(zhǔn)備執(zhí)行代碼

接下來(lái)固歪,我們挑幾個(gè)Operator看看flink是如何抽象這些算子的蒜鸡。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市牢裳,隨后出現(xiàn)的幾起案子逢防,更是在濱河造成了極大的恐慌,老刑警劉巖蒲讯,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件忘朝,死亡現(xiàn)場(chǎng)離奇詭異吭产,居然都是意外死亡集索,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門腕让,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)晦墙,“玉大人悦昵,你說(shuō)我怎么就攤上這事∩纬” “怎么了但指?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)抗楔。 經(jīng)常有香客問(wèn)我棋凳,道長(zhǎng),這世上最難降的妖魔是什么谓谦? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任贫橙,我火速辦了婚禮,結(jié)果婚禮上反粥,老公的妹妹穿的比我還像新娘卢肃。我一直安慰自己疲迂,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布莫湘。 她就那樣靜靜地躺著尤蒿,像睡著了一般。 火紅的嫁衣襯著肌膚如雪幅垮。 梳的紋絲不亂的頭發(fā)上腰池,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音忙芒,去河邊找鬼示弓。 笑死,一個(gè)胖子當(dāng)著我的面吹牛呵萨,可吹牛的內(nèi)容都是我干的奏属。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼潮峦,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼囱皿!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起忱嘹,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤嘱腥,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后拘悦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體齿兔,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年础米,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了愧驱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡椭盏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出吻商,到底是詐尸還是另有隱情掏颊,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布艾帐,位于F島的核電站乌叶,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏柒爸。R本人自食惡果不足惜准浴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望捎稚。 院中可真熱鬧乐横,春花似錦求橄、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至催什,卻和暖如春涵亏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蒲凶。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工气筋, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人旋圆。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓宠默,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親臂聋。 傳聞我的和親對(duì)象是個(gè)殘疾皇子光稼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容