說明:本文為《Flink大數(shù)據(jù)項(xiàng)目實(shí)戰(zhàn)》學(xué)習(xí)筆記,想通過視頻系統(tǒng)學(xué)習(xí)Flink這個最火爆的大數(shù)據(jù)計(jì)算框架的同學(xué)佣蓉,推薦學(xué)習(xí)CSDN官網(wǎng)課程:
Flink大數(shù)據(jù)項(xiàng)目實(shí)戰(zhàn):http://t.cn/ExrHPl9?
(2)Flink初探
1.快速生成Flink項(xiàng)目
1.推薦開發(fā)工具
idea+maven+git
2.推薦開發(fā)語言
Java或者Scala
https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html
3.Flink項(xiàng)目構(gòu)建步驟
1)通過maven構(gòu)建Flink項(xiàng)目
這里我們選擇構(gòu)建1.6.2版本的Flink項(xiàng)目疟游,打開終端輸入如下命令:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java ???-DarchetypeVersion=1.6.2
項(xiàng)目構(gòu)建過程中需要輸入groupId责嚷,artifactId,version和package
然后輸入y確認(rèn)
然后顯示Maven項(xiàng)目構(gòu)建成功
2)打開IDEA導(dǎo)入Flink 構(gòu)建的maven項(xiàng)目
打開IDEA開發(fā)工具嘿期,點(diǎn)擊open選項(xiàng)
選擇剛剛創(chuàng)建的Flink項(xiàng)目
IDEA打開Flink項(xiàng)目
2. Flink Batch版WordCount
新建一個batch package
打開github Flink源碼品擎,將批處理WordCount代碼copy到batch包下。
?
?右鍵選擇run备徐,運(yùn)行Flink批處理WordCount萄传,運(yùn)行結(jié)果如下所示:
3. Flink Stream版WordCount
同樣,流處理我們也單獨(dú)創(chuàng)建一個包stream
打開github Flink源碼坦喘,將流處理WordCount代碼copy到stream包下盲再。
打開流處理WordCount代碼:
右鍵選擇run,運(yùn)行Flink流處理WordCount瓣铣,運(yùn)行結(jié)果如下所示:
(3)Flink核心概念與編程模型
1. Flink分層架構(gòu)
1.1 Flink生態(tài)之核心組件棧
大家回顧一下Flink生態(tài)圈中的核心組件棧即可答朋,前面已經(jīng)詳細(xì)講過,這里就不再贅敘棠笑。
1.2 Flink分層架構(gòu)
Flink一共分為四個層級梦碗,具體如下圖所示:
Flink最下面的一層API為Stateful Stream Processing,它是Flink最底層的API,控制更靈活但一般很少使用洪规。然后上面一層就是Flink Core(核心)API印屁,它包含DataStream和DataSet API,應(yīng)用層的用戶經(jīng)常使用 Core API斩例。然后再上面一層就是 Table API雄人,它相當(dāng)于在Core API中可以定義數(shù)據(jù)的Table結(jié)構(gòu),可以做table操作念赶。最上面一層就是SQL 操作础钠,用戶可以直接使用SQL語句對數(shù)據(jù)處理,更簡單更方便叉谜。
注意:越底層的API越靈活旗吁,但越復(fù)雜。越上層的API越輕便停局,但靈活性差很钓。
[if !supportLists]1.?[endif]Stateful Stream Processing
a)它位于最底層,是Core API 的底層實(shí)現(xiàn)董栽。
b)它是嵌入到Stream流里面的處理函數(shù)(processFunction)码倦。
c)當(dāng)Core API滿足不了用戶需求,可以利用低階API構(gòu)建一些新的組件或者算子锭碳。
d)它雖然靈活性高叹洲,但開發(fā)比較復(fù)雜,需要具備一定的編碼能力工禾。
[if !supportLists]2.?[endif]Core API
[if !supportLists]a)?[endif]DataSet API是批處理API,處理有限的數(shù)據(jù)集蝗柔。
[if !supportLists]b)?[endif]DataStream API是流處理API闻葵,處理無限的數(shù)據(jù)集。
[if !supportLists]3.?[endif]Table API & SQL
a)SQL 構(gòu)建在Table 之上癣丧,都需要構(gòu)建Table 環(huán)境槽畔。
b)不同的類型的Table 構(gòu)建不同的Table 環(huán)境中。
c)Table 可以與DataStream或者DataSet進(jìn)行相互轉(zhuǎn)換胁编。
d)Streaming SQL不同于存儲的SQL厢钧,最終會轉(zhuǎn)化為流式執(zhí)行計(jì)劃。
1.3Flink DataFlow
Flink DataFlow基本套路:先創(chuàng)建Data Source讀取數(shù)據(jù)嬉橙,然后對數(shù)據(jù)進(jìn)行轉(zhuǎn)化操作早直,然后創(chuàng)建DataSink對數(shù)據(jù)輸出。
結(jié)合代碼和示意圖理解DataFlow
Flink DataFlow基本套路如下所示:
步驟1:構(gòu)建計(jì)算環(huán)境(決定采用哪種計(jì)算執(zhí)行方式)
步驟2:創(chuàng)建Source(可以多個數(shù)據(jù)源)
步驟3:對數(shù)據(jù)進(jìn)行不同方式的轉(zhuǎn)換(提供了豐富的算子)
步驟4:對結(jié)果的數(shù)據(jù)進(jìn)行Sink(可以輸出到多個地方)
并行化DataFlow
從上圖可以看出Source的并行度為2市框,它們可以并行運(yùn)行在不同的節(jié)點(diǎn)上霞扬。Map的并行度也為2,source讀取數(shù)據(jù)后做Stream Partition操作,source1將數(shù)據(jù)交給map1喻圃,source2將數(shù)據(jù)交給map2萤彩。keyBy(或者window等)的并行度為2,map處理后的數(shù)據(jù)需要經(jīng)過shuffle操作斧拍,然后交給keyBy進(jìn)行分組統(tǒng)計(jì)雀扶。Sink的并行度為1,keyBy最后分組統(tǒng)計(jì)后的數(shù)據(jù)交給sink肆汹,將數(shù)據(jù)進(jìn)行輸出操作愚墓。
算子間數(shù)據(jù)傳遞模式
從上圖可以看出,F(xiàn)link算子間的數(shù)據(jù)傳遞模式大概分為兩種:
1.One-to-one?streams:保持元素的分區(qū)和順序县踢,比如數(shù)據(jù)做map操作转绷。
2.Redistributing?streams: 它會改變流的分區(qū),重新分區(qū)策略取決于使用的算子
keyBy()?(re-partitions by hashing the key):根據(jù)hash key對數(shù)據(jù)重新分區(qū)硼啤。
broadcast():即為廣播操作议经,比如map1有100條數(shù)據(jù),發(fā)送給keyBy1是100條數(shù)據(jù)谴返,發(fā)給keyBy2也是100條數(shù)據(jù)煞肾。
rebalance()?(which re-partitions randomly):即隨機(jī)打散,數(shù)據(jù)隨機(jī)分區(qū)發(fā)送給下游操作嗓袱。
2.?Windows
前面我們已經(jīng)了解了Flink的Stream流處理和Batch批處理籍救,那么我們這里講的Windows操作是對一段數(shù)據(jù)進(jìn)行操作,它可以按照固定數(shù)據(jù)量進(jìn)行Windows操作渠抹,也可以按照固定時間進(jìn)行windows操作蝙昙,它是Stream 流處理所特有的窗口操作。
Flink Windows操作的類型大概分為以下幾類:
[if !supportLists]1.?[endif]Count Windows
顧名思義梧却,是按照Events的數(shù)量進(jìn)行操作奇颠,比如每3個Event做一次windows操作。
[if !supportLists]2.?[endif]Time Windows
基于時間長度進(jìn)行Windows操作
[if !supportLists]a)?[endif]Tumbling Windows:即翻滾窗口放航,不會重疊烈拒,比如每隔3s操作一次。
[if !supportLists]b)?[endif]Sliding Windows:即滑動窗口广鳍,有重疊荆几,比如窗口大小為3s,每次向前滑動1s赊时。
[if !supportLists]c)?[endif]Session Windows:類似于Web編程里的Session吨铸,以不活動間隙作為窗口進(jìn)行操作,比如每10s內(nèi)沒有活動祖秒,就會做一次Windows操作焊傅。
[if !supportLists]3.?[endif]自定義Windows
當(dāng)Flink內(nèi)置的windows不能滿足用戶的需求剂陡,我們可以自定義Windows操作。
3.各種Time
從上圖可以看出Flink中的Time大致分為以下三類:
1.Event Time:Event 真正產(chǎn)生的時間狐胎,我們稱之為Event Time鸭栖。
2.Ingestion Time:Event 事件被Source拿到,進(jìn)入Flink處理引擎的時間握巢,我們稱之為Ingestion Time晕鹊。
3.Window Processing Time:Event事件被Flink 處理(比如做windows操作)時的時間,我們稱之為Window Processing Time暴浦。
4.Stateful Operations
什么是狀態(tài)溅话?
state一般指一個具體的task/operator的狀態(tài),比如當(dāng)前處理那些數(shù)據(jù)歌焦,數(shù)據(jù)處理的進(jìn)度等等飞几。
Flink state操作狀態(tài)分為兩類:
1.Operator State
Operator State跟一個特定operator的一個并發(fā)實(shí)例綁定,整個operator只對應(yīng)一個state独撇。
2.Keyed State
基于KeyedStream上的狀態(tài)屑墨。這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每一個key纷铣,可能都對應(yīng)一個state卵史。
Flink每個操作狀態(tài)又分為兩類:
Keyed State和Operator State可以以兩種形式存在:原始狀態(tài)和托管狀態(tài)( Flink框架管理的狀態(tài))。
1.原始狀態(tài):比如一個字符串或者數(shù)組搜立,它需要序列化以躯,保存到內(nèi)存或磁盤,或者外部存儲中啄踊,這就是它的原始狀態(tài)忧设。
2.托管狀態(tài):比如數(shù)據(jù)放在Hash表中,或者放在HDFS中颠通,或者放在rocksdb中见转,這種就是托管狀態(tài)。當(dāng)需要處理數(shù)據(jù)的時候蒜哀,從托管狀態(tài)中讀取出來,還原成原始狀態(tài)吏砂,甚至變量和集合撵儿,然后再進(jìn)行處理。
5.Checkpoints(備份)
什么是checkpoint狐血?
所謂checkpoint淀歇,就是在某一時刻,將所有task的狀態(tài)做一個快照(snapshot)匈织,然后存儲到State Backend(比如hdfs)浪默。checkpoint擁有輕量級容錯機(jī)制牡直,可以保證exactly-once 語義,用于內(nèi)部失敗的恢復(fù)(比如當(dāng)應(yīng)用掛了纳决,它可以自動恢復(fù)從上次的進(jìn)度接著執(zhí)行)碰逸。
checkpoint基本原理:通過往source 注入barrier(可以理解為特殊的Event),barrier作為checkpoint的標(biāo)志阔加,它會自動做checkpoint無需人工干預(yù)饵史。
6.Savepoint
savepoint是流處理過程中的狀態(tài)歷史版本,它具有可以replay的功能胜榔。用于外部恢復(fù)胳喷,當(dāng)Flink應(yīng)用重啟和升級,它會做一個先做一個savepoint夭织,下次應(yīng)用啟動可以接著上次進(jìn)度執(zhí)行吭露。
savepoint兩種觸發(fā)方式:
1.Cancel with savepoint
2.手動主動觸發(fā)
savepoint可以理解為是一種特殊的checkpoint,savepoint就是指向checkpoint的一個指針,需要手動觸發(fā)驻谆,而且不會過期歇拆,不會被覆蓋,除非手動刪除戴卜。正常情況下的線上環(huán)境是不需要設(shè)置savepoint的。除非對job或集群做出重大改動的時候琢岩,需要進(jìn)行測試運(yùn)行投剥。
(4)Flink Runtime
1. Flink運(yùn)行時架構(gòu)
1.1Flink架構(gòu)
Flink運(yùn)行時架構(gòu)主要包含幾個部分:Client、JobManager(master節(jié)點(diǎn))和TaskManger(slave節(jié)點(diǎn))担孔。
Client:Flink 作業(yè)在哪臺機(jī)器上面提交江锨,那么當(dāng)前機(jī)器稱之為Client。用戶開發(fā)的Program 代碼糕篇,它會構(gòu)建出DataFlow graph啄育,然后通過Client提交給JobManager。
JobManager:是主(master)節(jié)點(diǎn)拌消,相當(dāng)于YARN里面的REsourceManager挑豌,生成環(huán)境中一般可以做HA 高可用。JobManager會將任務(wù)進(jìn)行拆分墩崩,調(diào)度到TaskManager上面執(zhí)行氓英。
TaskManager:是從節(jié)點(diǎn)(slave),TaskManager才是真正實(shí)現(xiàn)task的部分鹦筹。
Client提交作業(yè)到JobManager铝阐,就需要跟JobManager進(jìn)行通信,它使用Akka框架或者庫進(jìn)行通信铐拐,另外Client與JobManager進(jìn)行數(shù)據(jù)交互徘键,使用的是Netty框架练对。Akka通信基于Actor System,Client可以向JobManager發(fā)送指令吹害,比如Submit job或者Cancel /update job螟凭。JobManager也可以反饋信息給Client,比如status updates赠制,Statistics和results赂摆。
Client提交給JobManager的是一個Job,然后JobManager將Job拆分成task钟些,提交給TaskManager(worker)烟号。JobManager與TaskManager也是基于Akka進(jìn)行通信,JobManager發(fā)送指令政恍,比如Deploy/Stop/Cancel Tasks或者觸發(fā)Checkpoint汪拥,反過來TaskManager也會跟JobManager通信返回Task Status,Heartbeat(心跳)篙耗,Statistics等迫筑。另外TaskManager之間的數(shù)據(jù)通過網(wǎng)絡(luò)進(jìn)行傳輸,比如Data Stream做一些算子的操作宗弯,數(shù)據(jù)往往需要在TaskManager之間做數(shù)據(jù)傳輸脯燃。
1.2.?TaskManger Slot
TaskManager是進(jìn)程,他下面運(yùn)行的task(整個Flink應(yīng)用是Job蒙保,Job可以拆分成很多個task)是線程辕棚,每個task/subtask(線程)下可運(yùn)行一個或者多個operator,即OperatorChain邓厕。Task是class逝嚎,抽象的,subtask是Object(類比學(xué)習(xí))详恼,具體的补君。
一個TaskManager通過Slot(任務(wù)槽)來控制它上面可以接受多少個task,比如一個TaskManager劃分了3個Task Slot(僅限內(nèi)存托管昧互,目前CPU未做隔離)挽铁,它只能接受3個task。Slot均分TaskManager所托管的內(nèi)存敞掘,比如一個TaskManager有6G內(nèi)存叽掘,那么每個Slot分配2G。
同一個TaskManager中的task共享TCP連接(通過多路復(fù)用)和心跳消息渐逃。它們還可以共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),從而減少每個任務(wù)的開銷民褂。一個TaskManager有N個槽位只能接受N個Task嗎茄菊?不是疯潭,后面會講共享槽位。
1.3.?OperatorChain && Task
為了更高效地分布式執(zhí)行面殖,F(xiàn)link會盡可能地將operator的subtask鏈接(chain)在一起形成task竖哩。以wordcount為例,解析不同視圖下的數(shù)據(jù)流脊僚,如下圖所示相叁。
數(shù)據(jù)流(邏輯視圖)
創(chuàng)建Source(并行度設(shè)置為1)讀取數(shù)據(jù)源,數(shù)據(jù)經(jīng)過FlatMap(并行度設(shè)置為2)做轉(zhuǎn)換操作辽幌,然后數(shù)據(jù)經(jīng)過Key Agg(并行度設(shè)置為2)做聚合操作增淹,最后數(shù)據(jù)經(jīng)過Sink(并行度設(shè)置為2)將數(shù)據(jù)輸出。
數(shù)據(jù)流(并行化視圖)
并行度為1的Source讀取數(shù)據(jù)源乌企,然后FlatMap并行度為2讀取數(shù)據(jù)源進(jìn)行轉(zhuǎn)化操作虑润,然后數(shù)據(jù)經(jīng)過Shuffle交給并行度為2的Key Agg進(jìn)行聚合操作,然后并行度為2的Sink將數(shù)據(jù)輸出加酵,未優(yōu)化前的task總和為7拳喻。
數(shù)據(jù)流(優(yōu)化后視圖)
并行度為1的Source讀取數(shù)據(jù)源,然后FlatMap并行度為2讀取數(shù)據(jù)源進(jìn)行轉(zhuǎn)化操作猪腕,然后數(shù)據(jù)經(jīng)過Shuffle交給Key Agg進(jìn)行聚合操作冗澈,此時Key Agg和Sink操作合并為一個task(注意:將KeyAgg和Sink兩個operator進(jìn)行了合并,因?yàn)檫@兩個合并后并不會改變整體的拓?fù)浣Y(jié)構(gòu))陋葡,它們一起的并行度為2亚亲,數(shù)據(jù)經(jīng)過Key Agg和Sink之后將數(shù)據(jù)輸出,優(yōu)化后的task總和為5.
1.4. OperatorChain的優(yōu)點(diǎn)和組成條件
OperatorChain的優(yōu)點(diǎn)
1.減少線程切換
2.減少序列化與反序列化
3.減少數(shù)據(jù)在緩沖區(qū)的交換
4.減少延遲并且提高吞吐能力
OperatorChain組成條件
1.沒有禁用Chain
2.上下游算子并行度一致 脖岛。
3.下游算子的入度為1(也就是說下游節(jié)點(diǎn)沒有來自其他節(jié)點(diǎn)的輸入)朵栖。
4.上下游算子在同一個slot group(后面緊跟著就會講如何通過slot group先分配到同一個solt,然后才能chain) 柴梆。
5.下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接陨溅,map、flatmap绍在、filter等默認(rèn)是ALWAYS)门扇。
6.上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接偿渡,Source默認(rèn)是HEAD)臼寄。
7.上下游算子之間沒有數(shù)據(jù)shuffle (數(shù)據(jù)分區(qū)方式是 forward)。
1.5.編程改變OperatorChain行為
Operator chain的行為可以通過編程API中進(jìn)行指定溜宽,可以通過在DataStream的operator后面(如someStream.map(..))調(diào)用startNewChain()來指示從該operator開始一個新的chain(與前面截?cái)嗉粫籧hain到前面)∈嗜啵可以調(diào)用disableChaining()來指示該operator不參與chaining(不會與前后的operator chain一起)留攒∶撼停可以通過調(diào)用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining×堆可以設(shè)置Slot group魄揉,例如someStream.filter(...).slotSharingGroup(“name”)∈媚可以通過調(diào)整并行度洛退,來調(diào)整Operator chain。
2. Slot分配與共享
2.1共享Slot
默認(rèn)情況下杰标,F(xiàn)link允許subtasks共享slot兵怯,條件是它們都來自同一個Job的不同task的subtask。結(jié)果可能一個slot持有該job的整個pipeline在旱。
允許slot共享有以下兩點(diǎn)好處:
1.Flink集群需要的任務(wù)槽與作業(yè)中使用的最高并行度正好相同(前提摇零,保持默認(rèn)SlotSharingGroup)。也就是說我們不需要再去計(jì)算一個程序總共會起多少個task了桶蝎。
2.更容易獲得更充分的資源利用驻仅。如果沒有slot共享,那么非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源登渣。如果有slot共享噪服,將task的2個并行度增加到6個,能充分利用slot資源胜茧,同時保證每個TaskManager能平均分配到重的subtasks粘优。
2.2共享Slot實(shí)例
將WordCount的并行度從之前的2個增加到6個(Source并行度仍為1),并開啟slot共享(所有operator都在default共享組)呻顽,將得到如上圖所示的slot分布圖雹顺。
首先,我們不用去計(jì)算這個job會其多少個task廊遍,總之該任務(wù)最終會占用6個slots(最高并行度為6)嬉愧。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager喉前。
2.3?SlotSharingGroup(soft)
SlotSharingGroup是Flink中用來實(shí)現(xiàn)slot共享的類没酣,它盡可能地讓subtasks共享一個slot。
保證同一個group的并行度相同的sub-tasks 共享同一個slots卵迂。算子的默認(rèn)group為default(即默認(rèn)一個job下的subtask都可以共享一個slot)
為了防止不合理的共享裕便,用戶也能通過API來強(qiáng)制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強(qiáng)制指定了filter的slot共享組為group1见咒。怎么確定一個未做SlotSharingGroup設(shè)置算子的SlotSharingGroup什么呢(根據(jù)上游算子的group 和自身是否設(shè)置group共同確定)偿衰。適當(dāng)設(shè)置可以減少每個slot運(yùn)行的線程數(shù),從而整體上減少機(jī)器的負(fù)載。
2.4 CoLocationGroup(強(qiáng)制)
CoLocationGroup可以保證所有的并行度相同的sub-tasks運(yùn)行在同一個slot下翎,主要用于迭代流(訓(xùn)練機(jī)器學(xué)習(xí)模型)囱嫩。
3. Slot & parallelism的關(guān)系
3.1?Slots && parallelism
3.2如何計(jì)算Slot
如何計(jì)算一個應(yīng)用需要多少slot?
如果不設(shè)置SlotSharingGroup漏设,那么需要的Slot數(shù)為應(yīng)用的最大并行度數(shù)。如果設(shè)置了SlotSharingGroup今妄,那么需要的Slot數(shù)為所有SlotSharingGroup中的最大并行度之和郑口。比如已經(jīng)強(qiáng)制指定了map的slot共享組為test,那么map和map下游的組為test盾鳞,map的上游source的組為默認(rèn)的default犬性,此時default組中最大并行度為10,test組中最大并行度為20腾仅,那么需要的Slot=10+20=30乒裆。