[TOC]
第一部分講到则拷,我們的主函數(shù)最后一項(xiàng)任務(wù)就是生成StreamGraph,然后生成JobGraph嘁捷,然 后以此開始調(diào)度任務(wù)運(yùn)行薪棒,所以接下來(lái)我們從這里入手手蝎,繼續(xù)探索flink。
2.1 flink的三層圖結(jié)構(gòu)
事實(shí)上俐芯,flink總共提供了三種圖的抽象棵介,我們前面已經(jīng)提到了StreamGraph和JobGraph,還 有一種是ExecutionGraph泼各,是用于調(diào)度的基本數(shù)據(jù)結(jié)構(gòu)鞍时。
上面這張圖清晰的給出了flink各個(gè)圖的工作原理和轉(zhuǎn)換過(guò)程。其中最后一個(gè)物理執(zhí)行圖并非 flink的數(shù)據(jù)結(jié)構(gòu)扣蜻,而是程序開始執(zhí)行后,各個(gè)task分布在不同的節(jié)點(diǎn)上及塘,所形成的物理上的關(guān) 系表示莽使。
- 從JobGraph的圖里可以看到,數(shù)據(jù)從上一個(gè)operator流到下一個(gè)operator的過(guò)程中笙僚,上 游作為生產(chǎn)者提供了IntermediateDataSet芳肌,而下游作為消費(fèi)者需要JobEdge。事實(shí) 上,JobEdge是一個(gè)通信管道亿笤,連接了上游生產(chǎn)的dataset和下游的JobVertex節(jié)點(diǎn)翎迁。
- 在JobGraph轉(zhuǎn)換到ExecutionGraph的過(guò)程中,主要發(fā)生了以下轉(zhuǎn)變:
- 加入了并行度的概念净薛,成為真正可調(diào)度的圖結(jié)構(gòu)
- 生成了與JobVertex對(duì)應(yīng)的ExecutionJobVertex汪榔,ExecutionVertex,與 IntermediateDataSet對(duì)應(yīng)的IntermediateResult和IntermediateResultPartition等肃拜, 并行將通過(guò)這些類實(shí)現(xiàn)
- ExecutionGraph已經(jīng)可以用于調(diào)度任務(wù)痴腌。我們可以看到,flink根據(jù)該圖生成了一一對(duì)應(yīng)的 Task燃领,每個(gè)task對(duì)應(yīng)一個(gè)ExecutionGraph的一個(gè)Execution士聪。Task用InputGate、 InputChannel和ResultPartition對(duì)應(yīng)了上面圖中的IntermediateResult和 ExecutionEdge猛蔽。
那么剥悟,flink抽象出這三層圖結(jié)構(gòu),四層執(zhí)行邏輯的意義是什么呢? StreamGraph是對(duì)用戶邏輯的映射曼库。JobGraph在此基礎(chǔ)上進(jìn)行了一些優(yōu)化区岗,比如把一部分操 作串成chain以提高效率。ExecutionGraph是為了調(diào)度存在的凉泄,加入了并行處理的概念躏尉。而在 此基礎(chǔ)上真正執(zhí)行的是Task及其相關(guān)結(jié)構(gòu)。
2.2 StreamGraph的生成
在第一節(jié)的算子注冊(cè)部分后众,我們可以看到胀糜,flink把每一個(gè)算子transform成一個(gè)對(duì)流的轉(zhuǎn)換 (比如上文中返回的SingleOutputStreamOperator是一個(gè)DataStream的子類),并且注冊(cè) 到執(zhí)行環(huán)境中蒂誉,用于生成StreamGraph教藻。實(shí)際生成StreamGraph的入口是 StreamGraphGenerator.generate(env, transformations) 其中的transformations是一 個(gè)list,里面記錄的就是我們?cè)趖ransform方法中放進(jìn)來(lái)的算子右锨。
2.2.1 StreamTransformation類代表了流的轉(zhuǎn)換
StreamTransformation代表了從一個(gè)或多個(gè)DataStream生成新DataStream的操作括堤。順便,DataStream類在內(nèi)部組合了一個(gè)StreamTransformation類绍移,實(shí)際的轉(zhuǎn)換操作均通過(guò)該 類完成悄窃。
我們可以看到,從source到各種map,union再到sink操作全部被映射成了 StreamTransformation蹂窖。
以MapFunction為例:
- 首先轧抗,用戶代碼里定義的UDF會(huì)被當(dāng)作其基類對(duì)待,然后交給StreamMap這個(gè)operator 做進(jìn)一步包裝瞬测。事實(shí)上横媚,每一個(gè)Transformation都對(duì)應(yīng)了一個(gè)StreamOperator纠炮。
- 由于map這個(gè)操作只接受一個(gè)輸入,所以再被進(jìn)一步包裝為OneInputTransformation灯蝴。
- 最后恢口,將該transformation注冊(cè)到執(zhí)行環(huán)境中,當(dāng)執(zhí)行上文提到的generate方法時(shí)穷躁,生成 StreamGraph圖結(jié)構(gòu)耕肩。
另外,并不是每一個(gè) StreamTransformation 都會(huì)轉(zhuǎn)換成runtime層中的物理操作折砸。 有一些只是邏輯概念看疗,比如union、split/select睦授、partition等两芳。如下圖所示的轉(zhuǎn)換 樹,在運(yùn)行時(shí)會(huì)優(yōu)化成下方的操作圖去枷。
2.2.2 StreamGraph生成函數(shù)分析
2.2.2 StreamGraph生成函數(shù)分析
[圖片上傳失敗...(image-8e8652-1609649040159)]
因?yàn)閙ap怖辆,filter等常用操作都是OneInputStreamOperator,我們就來(lái)看看transformOneInputTransform((OneInputTransformation<?, ?>) transform) 方法。
2.2.3 WordCount函數(shù)的StreamGraph
flink提供了一個(gè)StreamGraph可視化顯示工具删顶,在這里 我們可以把我們的程序的執(zhí)行計(jì)劃打印出 來(lái)System.out.println(env.getExecutionPlan()); 復(fù)制到這個(gè)網(wǎng)站上竖螃,點(diǎn)擊生成,如圖 所示:
可以看到逗余,我們?cè)闯绦虮晦D(zhuǎn)化成了4個(gè)operator特咆。 另外,在operator之間的連線上也顯示出了flink添加的一些邏輯流程录粱。由于我設(shè)定了每個(gè)操作 符的并行度都是1腻格,所以在每個(gè)操作符之間都是直接FORWARD,不存在shuffle的過(guò)程啥繁。
2.3 JobGraph的生成
flink會(huì)根據(jù)上一步生成的StreamGraph生成JobGraph菜职,然后將JobGraph發(fā)送到server端進(jìn) 行ExecutionGraph的解析
2.3.1 JobGraph生成源碼
與StreamGraph類似,JobGraph的入口方法
是 StreamingJobGraphGenerator.createJobGraph() 旗闽。我們直接來(lái)看源碼
2.3.2 operator chain的邏輯
為了更高效地分布式執(zhí)行酬核,F(xiàn)link會(huì)盡可能地將operator的subtask鏈接(chain)在一起 形成task。每個(gè)task在一個(gè)線程中執(zhí)行适室。將operators鏈接成task是非常有效的優(yōu)化:它 能減少線程之間的切換嫡意,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換捣辆,減少 了延遲的同時(shí)提高整體的吞吐量鹅很。
上圖中將KeyAggregation和Sink兩個(gè)operator進(jìn)行了合并,因?yàn)檫@兩個(gè)合并后并不會(huì)改變整 體的拓?fù)浣Y(jié)構(gòu)罪帖。但是促煮,并不是任意兩個(gè) operator 就能 chain 一起的,其條件還是很苛刻的:
- 上下游的并行度一致
- 下游節(jié)點(diǎn)的入度為1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入) 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
- 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map整袁、flatmap菠齿、filter等 默認(rèn)是ALWAYS)
- 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈 接坐昙,Source默認(rèn)是HEAD)
- 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
- 用戶沒(méi)有禁用 chain
flink的chain邏輯是一種很常見的設(shè)計(jì)绳匀,比如spring的interceptor也是類似的實(shí)現(xiàn)方式。通過(guò) 把操作符串成一個(gè)大操作符炸客,flink避免了把數(shù)據(jù)序列化后通過(guò)網(wǎng)絡(luò)發(fā)送給其他節(jié)點(diǎn)的開銷疾棵,能 夠大大增強(qiáng)效率。
2.3.3 JobGraph的提交
前面已經(jīng)提到痹仙,JobGraph的提交依賴于JobClient和JobManager之間的異步通信是尔,如圖所 示:
在submitJobAndWait方法中,其首先會(huì)創(chuàng)建一個(gè)JobClientActor的ActorRef,然后向其發(fā)起 一個(gè)SubmitJobAndWait消息开仰,該消息將JobGraph的實(shí)例提交給JobClientActor拟枚。發(fā)起模式是ask,它表示需要一個(gè)應(yīng)答消息众弓。
Future<Object> future = Patterns.ask(jobClientActor, new JobClientMessa ges.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
該SubmitJobAndWait消息被JobClientActor接收后恩溅,最終通過(guò)調(diào)用tryToSubmitJob方法觸 發(fā)真正的提交動(dòng)作。當(dāng)JobManager的actor接收到來(lái)自client端的請(qǐng)求后谓娃,會(huì)執(zhí)行一個(gè) submitJob方法脚乡,主要做以下事情:
向BlobLibraryCacheManager注冊(cè)該Job;
構(gòu)建ExecutionGraph對(duì)象;
對(duì)JobGraph中的每個(gè)頂點(diǎn)進(jìn)行初始化; 將DAG拓?fù)渲袕膕ource開始排序,排序后的頂點(diǎn)集合附加到Exec> - utionGraph對(duì)象;
獲取檢查點(diǎn)相關(guān)的配置滨达,并將其設(shè)置到ExecutionGraph對(duì)象; 向ExecutionGraph注冊(cè)相關(guān)的listener; 執(zhí)行恢復(fù)操作或者將JobGraph信息寫入SubmittedJobGraphStore以在后續(xù)用于恢 復(fù)目的;
響應(yīng)給客戶端JobSubmitSuccess消息;
對(duì)ExecutionGraph對(duì)象進(jìn)行調(diào)度執(zhí)行;
最后奶稠,JobManger會(huì)返回消息給JobClient,通知該任務(wù)是否提交成功弦悉。
2.4 ExecutionGraph的生成
與StreamGraph和JobGraph不同窒典,ExecutionGraph并不是在我們的客戶端程序生成,而是 在服務(wù)端(JobManager處)生成的稽莉,順便flink只維護(hù)一個(gè)JobManager瀑志。其入口代碼
是 ExecutionGraphBuilder.buildGraph(...)
該方法長(zhǎng)200多行,其中一大半是checkpoiont的相關(guān)邏輯污秆,我們暫且略過(guò)劈猪,直接看核心方
法 executionGraph.attachJobGraph(sortedTopology)
因?yàn)镋xecutionGraph事實(shí)上只是改動(dòng)了JobGraph的每個(gè)節(jié)點(diǎn),而沒(méi)有對(duì)整個(gè)拓?fù)浣Y(jié)構(gòu)進(jìn)行變 動(dòng)良拼,所以代碼里只是挨個(gè)遍歷jobVertex并進(jìn)行處理:
至此战得,ExecutorGraph就創(chuàng)建完成了。
3. 任務(wù)的調(diào)度與執(zhí)行
關(guān)于flink的任務(wù)執(zhí)行架構(gòu)庸推,官網(wǎng)的這兩張圖就是最好的說(shuō)明:
Flink 集群?jiǎn)?dòng)后常侦,首先會(huì)啟動(dòng)一個(gè) JobManger 和多個(gè)的 TaskManager浇冰。用戶的代碼會(huì)由 JobClient 提交給 JobManager,JobManager 再把來(lái)自不同用戶的任務(wù)發(fā)給 不同的 TaskManager 去執(zhí)行聋亡,每個(gè)TaskManager管理著多個(gè)task肘习,task是執(zhí)行計(jì)算的最小結(jié)構(gòu), TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager坡倔。TaskManager 之間以流的形式進(jìn)行 數(shù)據(jù)的傳輸漂佩。上述除了task外的三者均為獨(dú)立的 JVM 進(jìn)程。 要注意的是罪塔,TaskManager和job并非一一對(duì)應(yīng)的關(guān)系投蝉。flink調(diào)度的最小單元是task而非 TaskManager,也就是說(shuō)征堪,來(lái)自不同job的不同task可能運(yùn)行于同一個(gè)TaskManager的不同 線程上瘩缆。
一個(gè)flink任務(wù)所有可能的狀態(tài)如上圖所示。圖上畫的很明白请契,就不再贅述了咳榜。
3.1 計(jì)算資源的調(diào)度
Task slot是一個(gè)TaskManager內(nèi)資源分配的最小載體,代表了一個(gè)固定大小的資源子集爽锥,每 個(gè)TaskManager會(huì)將其所占有的資源平分給它的slot涌韩。
通過(guò)調(diào)整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的氯夷。每個(gè) TaskManager 有一個(gè)slot臣樱,也就意味著每個(gè)task運(yùn)行在獨(dú)立的 JVM 中。每個(gè) TaskManager 有多個(gè)slot的 話腮考,也就是說(shuō)多個(gè)task運(yùn)行在同一個(gè)JVM中雇毫。
而在同一個(gè)JVM進(jìn)程中的task,可以共享TCP連接(基于多路復(fù)用)和心跳消息踩蔚,可以減少數(shù) 據(jù)的網(wǎng)絡(luò)傳輸棚放,也能共享一些數(shù)據(jù)結(jié)構(gòu),一定程度上減少了每個(gè)task的消耗馅闽。
每個(gè)slot可以接受單個(gè)task飘蚯,也可以接受多個(gè)連續(xù)task組成的pipeline,如下圖所示福也,F(xiàn)latMap函數(shù)占用一個(gè)taskslot局骤,而key Agg函數(shù)和sink函數(shù)共用一個(gè)taskslot:
為了達(dá)到共用slot的目的,除了可以以chain的方式pipeline算子暴凑,我們還可以允許 SlotSharingGroup峦甩,如下圖所示:
我們可以把不能被chain成一條的兩個(gè)操作如flatmap和key&sink放在一個(gè)TaskSlot里執(zhí)行, 這樣做可以獲得以下好處:
- 共用slot使得我們不再需要計(jì)算每個(gè)任務(wù)需要的總task數(shù)目现喳,直接取最高算子的并行度即可
- 對(duì)計(jì)算資源的利用率更高凯傲。例如犬辰,通常的輕量級(jí)操作map和重量級(jí)操作Aggregate不再分 別需要一個(gè)線程,而是可以在同一個(gè)線程內(nèi)執(zhí)行泣洞,而且對(duì)于slot有限的場(chǎng)景忧风,我們可以增 大每個(gè)task的并行度了。
接下來(lái)我們還是用官網(wǎng)的圖來(lái)說(shuō)明flink是如何重用slot的:
- TaskManager1分配一個(gè)SharedSlot0
- 把source task放入一個(gè)SimpleSlot0球凰,再把該slot放入SharedSlot0
- 把flatmap task放入一個(gè)SimpleSlot1,再把該slot放入SharedSlot0
- 因?yàn)槲覀兊膄latmap task并行度是2腿宰,因此不能再放入SharedSlot0呕诉,所以向
TaskMange21申請(qǐng)了一個(gè)新的SharedSlot0 - 把第二個(gè)flatmap task放進(jìn)一個(gè)新的SimpleSlot,并放進(jìn)TaskManager2的
SharedSlot0 - 開始處理key&sink task吃度,因?yàn)槠洳⑿卸纫彩?甩挫,所以先把第一個(gè)task放進(jìn)
TaskManager1的SharedSlot - 把第二個(gè)key&sink放進(jìn)TaskManager2的SharedSlot
3.2 JobManager執(zhí)行job
JobManager負(fù)責(zé)接收 flink 的作業(yè),調(diào)度 task椿每,收集 job 的狀態(tài)伊者、管理 TaskManagers。被 實(shí)現(xiàn)為一個(gè) akka actor间护。
3.2.1 JobManager的組件
- BlobServer 是一個(gè)用來(lái)管理二進(jìn)制大文件的服務(wù)亦渗,比如保存用戶上傳的jar文件,該服務(wù)會(huì) 將其寫到磁盤上汁尺。還有一些相關(guān)的類法精,如BlobCache,用于TaskManager向JobManager 下載用戶的jar文件
- InstanceManager 用來(lái)管理當(dāng)前存活的TaskManager的組件痴突,記錄了TaskManager的心 跳信息等
- CompletedCheckpointStore 用于保存已完成的checkpoint相關(guān)信息搂蜓,持久化到內(nèi)存中 或者zookeeper上
- MemoryArchivist 保存了已經(jīng)提交到flink的作業(yè)的相關(guān)信息,如JobGraph等
3.2.2 JobManager的啟動(dòng)過(guò)程
- 配置Akka并生成ActorSystem辽装,啟動(dòng)JobManager
- 啟動(dòng)HA和metric相關(guān)服務(wù)
- 在 startJobManagerActors() 方法中啟動(dòng)JobManagerActors帮碰,以及 webserver,TaskManagerActor拾积,ResourceManager等等
- 阻塞等待終止
- 集群通過(guò)LeaderService等選出JobManager的leader
3.2.3 JobManager啟動(dòng)Task
JobManager 是一個(gè)Actor殉挽,通過(guò)各種消息來(lái)完成核心邏輯:
override def handleMessage: Receive = {
case GrantLeadership(newLeaderSessionID) =>
log.info(s"JobManager $getAddress was granted leadership with leader session ID " +
s"$newLeaderSessionID.") leaderSessionID = newLeaderSessionID
.......
有幾個(gè)比較重要的消息:
- GrantLeadership 獲得leader授權(quán),將自身被分發(fā)到的 session id 寫到 zookeeper殷勘,并 恢復(fù)所有的 jobs
- RevokeLeadership 剝奪leader授權(quán)此再,打斷清空所有的 job 信息,但是保留作業(yè)緩存玲销,注 銷所有的 TaskManagers
- RegisterTaskManagers 注冊(cè) TaskManager输拇,如果之前已經(jīng)注冊(cè)過(guò),則只給對(duì)應(yīng)的 Instance 發(fā)送消息贤斜,否則啟動(dòng)注冊(cè)邏輯:在 InstanceManager 中注冊(cè)該 Instance 的信 息策吠,并停止 Instance BlobLibraryCacheManager 的端口【供下載 lib 包用】逛裤,同時(shí)使用 watch 監(jiān)聽 task manager 的存活
- SubmitJob 提交 jobGraph 最后一項(xiàng)SubmintJob就是我們要關(guān)注的,從客戶端收到JobGraph猴抹,轉(zhuǎn)換為 ExecutionGraph并執(zhí)行的過(guò)程带族。
首先做一些準(zhǔn)備工作,然后獲取一個(gè)ExecutionGraph蟀给,判斷是否是恢復(fù)的job蝙砌,然后將job保 存下來(lái),并且通知客戶端本地已經(jīng)提交成功了跋理,最后如果確認(rèn)本JobManager是leader择克,則執(zhí) 行 executionGraph.scheduleForExecution() 方法,這個(gè)方法經(jīng)過(guò)一系列調(diào)用前普,把每個(gè) ExecutionVertex傳遞給了Excution類的deploy方法
我們首先生成了一個(gè)TaskDeploymentDescriptor肚邢,然后交給
了 taskManagerGateway.submitTask() 方法執(zhí)行。接下來(lái)的部分拭卿,就屬于TaskManager的范 疇了骡湖。
3.3 TaskManager執(zhí)行task
3.3.1 TaskManager的基本組件
TaskManager是flink中資源管理的基本組件,是所有執(zhí)行任務(wù)的基本容器峻厚,提供了內(nèi)存管 理响蕴、IO管理、通信管理等一系列功能目木,本節(jié)對(duì)各個(gè)模塊進(jìn)行簡(jiǎn)要介紹换途。
MemoryManager flink并沒(méi)有把所有內(nèi)存的管理都委托給JVM,因?yàn)镴VM普遍存在著存儲(chǔ) 對(duì)象密度低刽射、大內(nèi)存時(shí)GC對(duì)系統(tǒng)影響大等問(wèn)題军拟。所以flink自己抽象了一套內(nèi)存管理機(jī)制,將 所有對(duì)象序列化后放在自己的MemorySegment上進(jìn)行管理誓禁。MemoryManger涉及內(nèi)容較 多懈息,將在后續(xù)章節(jié)進(jìn)行繼續(xù)剖析。
IOManager flink通過(guò)IOManager管理磁盤IO的過(guò)程摹恰,提供了同步和異步兩種寫模式辫继,又 進(jìn)一步區(qū)分了block、buffer和bulk三種讀寫方式俗慈。 IOManager提供了兩種方式枚舉磁盤文件姑宽,一種是直接遍歷文件夾下所有文件,另一種是計(jì) 數(shù)器方式闺阱,對(duì)每個(gè)文件名以遞增順序訪問(wèn)炮车。 在底層,flink將文件IO抽象為FileIOChannle,封裝了底層實(shí)現(xiàn)瘦穆。
NetworkEnvironment 是TaskManager的網(wǎng)絡(luò) IO 組件纪隙,包含了追蹤中間結(jié)果和數(shù)據(jù)交換 的數(shù)據(jù)結(jié)構(gòu)。它的構(gòu)造器會(huì)統(tǒng)一將配置的內(nèi)存先分配出來(lái)扛或,抽象成 NetworkBufferPool 統(tǒng)一 管理內(nèi)存的申請(qǐng)和釋放绵咱。意思是說(shuō),在輸入和輸出數(shù)據(jù)時(shí)熙兔,不管是保留在本地內(nèi)存悲伶,等待 chain在一起的下個(gè)操作符進(jìn)行處理,還是通過(guò)網(wǎng)絡(luò)把本操作符的計(jì)算結(jié)果發(fā)送出去黔姜,都被抽 象成了NetworkBufferPool拢切。后續(xù)我們還將對(duì)這個(gè)組件進(jìn)行詳細(xì)分析。
3.3.2 TaskManager執(zhí)行Task
對(duì)于TM來(lái)說(shuō)秆吵,執(zhí)行task就是把收到的 TaskDeploymentDescriptor 對(duì)象轉(zhuǎn)換成一個(gè)task并執(zhí)
行的過(guò)程。TaskDeploymentDescriptor這個(gè)類保存了task執(zhí)行所必須的所有內(nèi)容五慈,例如序列 化的算子纳寂,輸入的InputGate和輸出的ResultPartition的定義,該task要作為幾個(gè)subtask執(zhí) 行等等泻拦。
按照正常邏輯思維毙芜,很容易想到TM的submitTask方法的行為:首先是確認(rèn)資源,如尋找 JobManager和Blob争拐,而后建立連接腋粥,解序列化算子,收集task相關(guān)信息架曹,接下來(lái)就是創(chuàng)建一個(gè)新的 Task 對(duì)象隘冲,這個(gè)task對(duì)象就是真正執(zhí)行任務(wù)的關(guān)鍵所在。
如果讀者是從頭開始看這篇blog绑雄,里面有很多對(duì)象應(yīng)該已經(jīng)比較明確其作用了(除了那個(gè) brVarManager展辞,這個(gè)是管理廣播變量的,廣播變量是一類會(huì)被分發(fā)到每個(gè)任務(wù)中的共享變 量)万牺。接下來(lái)的主要任務(wù)罗珍,就是把這個(gè)task啟動(dòng)起來(lái),然后報(bào)告說(shuō)已經(jīng)啟動(dòng)task了:
3.3.2.1 生成Task對(duì)象
在執(zhí)行new Task()方法時(shí),第一步是把構(gòu)造函數(shù)里的這些變量賦值給當(dāng)前task的fields脚粟。
接下來(lái)是初始化ResultPartition和InputGate覆旱。這兩個(gè)類描述了task的輸出數(shù)據(jù)和輸入數(shù)據(jù)。
最后核无,創(chuàng)建一個(gè)Thread對(duì)象扣唱,并把自己放進(jìn)該對(duì)象,這樣在執(zhí)行時(shí),自己就有了自身的線程 的引用画舌。
3.3.2.2 運(yùn)行Task對(duì)象
Task對(duì)象本身就是一個(gè)Runable堕担,因此在其run方法里定義了運(yùn)行邏輯。 第一步是切換Task的狀態(tài):
其實(shí)這里有個(gè)值得關(guān)注的點(diǎn)曲聂,就是flink里大量使用了這種while(true)的寫法來(lái)修改和檢測(cè)狀 態(tài)霹购,emmm...
接下來(lái),就是導(dǎo)入用戶類加載器并加載用戶代碼朋腋。 然后齐疙,是向網(wǎng)絡(luò)管理器注冊(cè)當(dāng)前任務(wù)(flink的各個(gè)算子在運(yùn)行時(shí)進(jìn)行數(shù)據(jù)交換需要依賴網(wǎng)絡(luò)管 理器),分配一些緩存以保存數(shù)據(jù)
然后旭咽,讀入指定的緩存文件贞奋。 然后,再把task創(chuàng)建時(shí)傳入的那一大堆變量用于創(chuàng)建一個(gè)執(zhí)行環(huán)境Envrionment穷绵。 再然后轿塔,對(duì)于那些并不是第一次執(zhí)行的task(比如失敗后重啟的)要恢復(fù)其狀態(tài)。 接下來(lái)最重要的是
1. invokable.invoke();
方法仲墨。為什么這么說(shuō)呢勾缭,因?yàn)檫@個(gè)方法就是用戶代碼所真正被執(zhí)行的入口。比如我們寫的什么 new MapFunction()的邏輯目养,最終就是在這里被執(zhí)行的俩由。這里說(shuō)一下這個(gè)invokable,這是一 個(gè)抽象類癌蚁,提供了可以被TaskManager執(zhí)行的對(duì)象的基本抽象幻梯。 這個(gè)invokable是在解析JobGraph的時(shí)候生成相關(guān)信息的,并在此處形成真正可執(zhí)行的對(duì)象
// now load the task's invokable code
2. //通過(guò)反射生成對(duì)象
3. invokable = loadAndInstantiateInvokable(userCodeClassLoader,
nameOfInvokableClass);
上圖顯示了flink提供的可被執(zhí)行的Task類型努释。從名字上就可以看出各個(gè)task的作用碘梢,在此不再 贅述。
接下來(lái)就是invoke方法了洽洁,因?yàn)槲覀兊膚ordcount例子用了流式api痘系,在此我們以StreamTask 的invoke方法為例進(jìn)行說(shuō)明。
3.3.2.3 StreamTask的執(zhí)行邏輯
StreamTask.invoke()方法里饿自,第一個(gè)值得一說(shuō)的是 TimerService 汰翠。Flink在2015年決定向 StreamTask類加入timer service的時(shí)候解釋到:
第二個(gè)要注意的是chain操作。前面提到了昭雌,flink會(huì)出于優(yōu)化的角度复唤,把一些算子chain成一個(gè) 整體的算子作為一個(gè)task來(lái)執(zhí)行。比如wordcount例子中烛卧,Source和FlatMap算子就被chain 在了一起佛纫。在進(jìn)行chain操作的時(shí)候妓局,會(huì)設(shè)定頭節(jié)點(diǎn),并且指定輸出的RecordWriter呈宇。
接下來(lái)不出所料仍然是初始化好爬,只不過(guò)初始化的對(duì)象變成了各個(gè)operator。如果是有 checkpoint的甥啄,那就從state信息里恢復(fù)存炮,不然就作為全新的算子處理。從源碼中可以看 到蜈漓,flink針對(duì)keyed算子和普通算子做了不同的處理穆桂。keyed算子在初始化時(shí)需要計(jì)算出一個(gè) group區(qū)間,這個(gè)區(qū)間的值在整個(gè)生命周期里都不會(huì)再變化融虽,后面key就會(huì)根據(jù)hash的不同結(jié) 果享完,分配到特定的group中去計(jì)算。順便提一句有额,flink的keyed算子保存的是對(duì)每個(gè)數(shù)據(jù)的 key的計(jì)算方法般又,而非真實(shí)的key,用戶需要自己保證對(duì)每一行數(shù)據(jù)提供的keySelector的冪等 性巍佑。至于為什么要用KeyGroup的設(shè)計(jì)倒源,這就牽扯到擴(kuò)容的范疇了,將在后面的章節(jié)進(jìn)行講 述句狼。
對(duì)于 openAllOperators() 方法,就是對(duì)各種RichOperator執(zhí)行其open方法热某,通衬骞剑可用于在 執(zhí)行計(jì)算之前加載資源。 最后昔馋,run方法千呼萬(wàn)喚始出來(lái)筹吐,該方法經(jīng)過(guò)一系列跳轉(zhuǎn),最終調(diào)用chain上的第一個(gè)算子的 run方法秘遏。在wordcount的例子中丘薛,它最終調(diào)用了SocketTextStreamFunction的run,建立 socket連接并讀入文本邦危。
3.4 StreamTask與StreamOperator
前面提到洋侨,Task對(duì)象在執(zhí)行過(guò)程中,把執(zhí)行的任務(wù)交給了StreamTask這個(gè)類去執(zhí)行倦蚪。在我們 的wordcount例子中希坚,實(shí)際初始化的是OneInputStreamTask的對(duì)象(參考上面的類圖)。那 么這個(gè)對(duì)象是如何執(zhí)行用戶的代碼的呢?
它做的陵且,就是把任務(wù)直接交給了InputProcessor去執(zhí)行processInput方法裁僧。這是一個(gè) StreamInputProcessor 的實(shí)例,該processor的任務(wù)就是處理輸入的數(shù)據(jù),包括用戶數(shù) 據(jù)聊疲、watermark和checkpoint數(shù)據(jù)等茬底。我們先來(lái)看看這個(gè)processor是如何產(chǎn)生的:
這是OneInputStreamTask的init方法,從configs里面獲取StreamOperator信息获洲,生成自己 的inputProcessor阱表。那么inputProcessor是如何處理數(shù)據(jù)的呢?我們接著跟進(jìn)源碼:
到此為止,以上部分就是一個(gè)flink程序啟動(dòng)后昌妹,到執(zhí)行用戶代碼之前捶枢,flink框架所做的準(zhǔn)備工 作》裳拢回顧一下:
- 啟動(dòng)一個(gè)環(huán)境
- 生成StreamGraph
- 注冊(cè)和選舉JobManager
- 在各節(jié)點(diǎn)生成TaskManager烂叔,并根據(jù)JobGraph生成對(duì)應(yīng)的Task
- 啟動(dòng)各個(gè)task,準(zhǔn)備執(zhí)行代碼
接下來(lái)固歪,我們挑幾個(gè)Operator看看flink是如何抽象這些算子的蒜鸡。