Tasks and Operator Chains
對(duì)于分布式計(jì)算祭刚,F(xiàn)link 把operator subtasks 運(yùn)行子任務(wù)串連在一起氏身,組合成Tasks。每個(gè)線程執(zhí)行一個(gè)Task丛忆。 把算子任務(wù)串連到一個(gè) Task 中運(yùn)行是一種非常有用的優(yōu)化:它減少了線程到線程間切換和緩存的開銷否纬,并且提高了整體吞吐量,減少了數(shù)據(jù)延遲怠晴。這種運(yùn)行子任務(wù)的串連操作是可以配置的:更多細(xì)節(jié)請(qǐng)查看這里遥金。
下圖中的數(shù)據(jù)流有5個(gè)運(yùn)行子任務(wù)(subtasks),因此有5個(gè)并行線程:
Job Managers, Task Managers, Clients
Flink 運(yùn)行時(shí)包含2種進(jìn)程:
Job Managers (又稱 Masters): Job Managers 負(fù)責(zé)協(xié)調(diào)分布式任務(wù)的運(yùn)行蒜田。Master調(diào)度Tasks稿械,協(xié)調(diào)Checkpoint執(zhí)行,協(xié)調(diào)故障恢復(fù)等工作冲粤。
Flink 的執(zhí)行環(huán)境中至少有一個(gè) Job Manager美莫。如果配置了 Flink 的HA (高可用),會(huì)有多個(gè) Job Manager梯捕,其中一個(gè) Job Manager 始終是 Leader厢呵,其他Job Manager 是 Standby (備用)。Task Managers (又稱 Workers): Task Managers 負(fù)責(zé)執(zhí)行數(shù)據(jù)流的tasks (更確切的說是: subtasks)傀顾,并緩存和交換數(shù)據(jù)流的數(shù)據(jù)。
Flink 的執(zhí)行環(huán)境中至少有一個(gè) TaskManager短曾。
Job Managers 和 Task Managers 有多種不同啟動(dòng)方式:直接以 standalone cluster 形式在Linux 機(jī)器中啟動(dòng)寒砖,或者在 資源管理框架 YARN 或 Mesos 的 容器中(containers) 中啟動(dòng)嫉拐。Task Manager 與Job Manager 保持連接上報(bào)自身狀態(tài),并接收Master分配的任務(wù)婉徘。
Client客戶端(Job Client)不是運(yùn)行時(shí)環(huán)境和程序執(zhí)行的一部分,但它是任務(wù)執(zhí)行的起點(diǎn)判哥。Client 負(fù)責(zé)準(zhǔn)備dataflow 任務(wù)執(zhí)行流程 并發(fā)送到 JobManager。之后塌计,Client 可以斷開連接挺身,或者保持連接用于接收J(rèn)ob執(zhí)行進(jìn)度相關(guān)的信息锌仅。
Task Slots and Resources
每個(gè) Worker(TaskManager) 都是一個(gè) JVM 進(jìn)程墙贱,每個(gè) TaskManager會(huì)在彼此隔離的線程中執(zhí)行 一個(gè)或多個(gè) subtasks 子任務(wù)。為了控制一個(gè) Worker可以執(zhí)行多少個(gè)Tasks 任務(wù)贱傀,F(xiàn)link 引入了稱為Task Slots 任務(wù)槽的概念惨撇,每個(gè)Worker至少包括一個(gè)Task Slot 任務(wù)槽。
每個(gè)task slot代表TaskManager的一些固定資源府寒。例如: 一個(gè) TaskManager 有3個(gè) Slots魁衙,那 TaskManager為每個(gè)任務(wù)槽分配他自身1/3的資源。分配任務(wù)槽意味著subtasks 子任務(wù)不會(huì)與其他Jobs 作業(yè)爭(zhēng)搶內(nèi)存株搔,而是為每個(gè) Slot 預(yù)留一定數(shù)量的內(nèi)存剖淀。注意:目前 Flink任務(wù)槽 Slots 僅隔離分配給TaskManager內(nèi)存, 不會(huì)隔離分配給 TaskManager 的 CPU纤房。
通過調(diào)整任務(wù)槽的數(shù)量纵隔,用戶可以定義subtasks 子任務(wù)的隔離程度。TaskManager 有一個(gè) Slot炮姨,表示每個(gè)Task group 任務(wù)組都在單獨(dú)的 JVM 進(jìn)程中運(yùn)行捌刮。TaskManager 有多個(gè) Slot,表示多個(gè)subtasks 子任務(wù)共享一個(gè) JVM舒岸。在同一個(gè) JVM 進(jìn)程中的 subtasks 子任務(wù) 共享 TCP 連接 (通過多路復(fù)用技術(shù)) 和 心跳消息绅作。多個(gè)Slots之間也會(huì)共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),這樣可以減少每個(gè)subtasks 子任務(wù)的開銷吁津。
默認(rèn)情況下棚蓄,F(xiàn)link 允許同一個(gè) Job的 subtasks 子任務(wù)之間共享 Slot,即使這些subtasks 子任務(wù)屬于不同的Tasks (這個(gè) Task 可以理解為 Spark 的 Stage) 碍脏,只要這些Tasks屬于同一個(gè)Flink-Job梭依,subtasks就可以共享Slot。這樣做的結(jié)果是一個(gè) Slot可能負(fù)責(zé)整個(gè)Flink-Job的作業(yè)流水線(Pipeline)典尾。Flink 允許 Slot 共享 帶來2個(gè)好處:
- Flink 集群所需要的 任務(wù)Slot 數(shù)與 Flink Job 中使用的并行度一致役拴。不需要再額外計(jì)算一個(gè)程序要包含多個(gè)Tasks。
- 更好的利用系統(tǒng)資源钾埂。沒有 Slot Sharing 任務(wù)槽共享河闰,非資源密集型的子任務(wù)source()/map() 占用的資源 將與資源密集的window() 窗口子任務(wù)占用的一樣多。在我們的例子中褥紫,通過 Slot Sharing 任務(wù)槽共享姜性,任務(wù)的并行度由2增加到6 可以充分利用 Slot 資源,同時(shí)確保重型任務(wù)能在 TaskManager 之間公平分配髓考。
Flink API 包含一種 resource group 的機(jī)制 來阻止不希望發(fā)生的 Slot Sharing部念。
根據(jù)經(jīng)驗(yàn)來看,較合理的共享槽 Slots 數(shù)量 應(yīng)該與 CPU 的核數(shù)相一致。通過hyper-threading 超線程技術(shù)儡炼,每個(gè)任務(wù)槽將運(yùn)行2個(gè)妓湘,或者多個(gè)線程。
State Backends
存儲(chǔ)key/value 索引的確切數(shù)據(jù)結(jié)構(gòu)依賴于所選擇的 state backend乌询。一種state backend在內(nèi)存中使用 Hash Map 結(jié)構(gòu)來存儲(chǔ)數(shù)據(jù)榜贴,另一種state backend使用 RocksDB 來存儲(chǔ) Key/Value。除了定義數(shù)據(jù)結(jié)構(gòu)來存儲(chǔ)狀態(tài)值之外妹田,state backend也實(shí)現(xiàn)了獲取 Key/Value 狀態(tài)的時(shí)間點(diǎn)快照唬党,并將狀態(tài)值快照做為 Checkpoint 的一部分秆麸。
Savepoints
Flink 的Data Stream API可以從savepoint中恢復(fù)異常沮趣。savepoints能夠保證在不丟失任何 狀態(tài)數(shù)據(jù) 的情況下更新 Flink程序 和 Flink集群坷随。
Savepoints 是手動(dòng)觸發(fā)的checkpoints,savepoints 會(huì)生成 程序快照 并將快照寫入 state backend 中缸匪。savepoint 依賴常規(guī)的checkpoint機(jī)制类溢。在執(zhí)行執(zhí)行過程中會(huì)定期在 worker 節(jié)點(diǎn)上生成 快照 和 檢查點(diǎn)。狀態(tài)恢復(fù)只需要最后一次完成的checkpoint砂心,當(dāng)最新的 checkpoint生成之后蛇耀,就可以安全的刪除之前完成的checkpoint。
savepoint特別像這些定期生成的 checkpoint译暂,區(qū)別就是savepoint是用戶觸發(fā)的并且當(dāng)生成新的checkpoint時(shí)撩炊,savepoint不會(huì)自動(dòng)過期〔ィ可以用command line 命令行來創(chuàng)建 savepoint,或在取消一個(gè) job 時(shí)啦撮,通過REST API來生成 Savepoint汪厨。