本文為《Flink大數(shù)據(jù)項(xiàng)目實(shí)戰(zhàn)》學(xué)習(xí)筆記,想通過視頻系統(tǒng)學(xué)習(xí)Flink這個最火爆的大數(shù)據(jù)計算框架的同學(xué),推薦學(xué)習(xí)課程: ??
?Flink大數(shù)據(jù)項(xiàng)目實(shí)戰(zhàn):http://t.cn/EJtKhaz
1. Flink運(yùn)行時架構(gòu)
1.1Flink架構(gòu)
Flink 運(yùn)行時架構(gòu)主要包含幾個部分:Client罢绽、JobManager(master節(jié)點(diǎn))和TaskManger(slave節(jié)點(diǎn))寺晌。
Client:Flink 作業(yè)在哪臺機(jī)器上面提交沼侣,那么當(dāng)前機(jī)器稱之為Client疆导。用戶開發(fā)的Program 代碼,它會構(gòu)建出DataFlow graph澳迫,然后通過Client提交給JobManager局齿。
JobManager:是主(master)節(jié)點(diǎn),相當(dāng)于YARN里面的REsourceManager橄登,生成環(huán)境中一般可以做HA高可用抓歼。JobManager會將任務(wù)進(jìn)行拆分,調(diào)度到TaskManager上面執(zhí)行拢锹。
TaskManager:是從節(jié)點(diǎn)(slave)谣妻,TaskManager才是真正實(shí)現(xiàn)task的部分。
Client提交作業(yè)到JobManager卒稳,就需要跟JobManager進(jìn)行通信蹋半,它使用Akka框架或者庫進(jìn)行通信,另外Client與JobManager進(jìn)行數(shù)據(jù)交互充坑,使用的是Netty框架减江。Akka通信基于Actor
System,Client可以向JobManager發(fā)送指令捻爷,比如Submit job或者Cancel /update job辈灼。JobManager也可以反饋信息給Client,比如status updates也榄,Statistics和results巡莹。
Client提交給JobManager的是一個Job,然后JobManager將Job拆分成task甜紫,提交給TaskManager(worker)降宅。JobManager與TaskManager也是基于Akka進(jìn)行通信,JobManager發(fā)送指令囚霸,比如Deploy/Stop/Cancel
Tasks或者觸發(fā)Checkpoint钉鸯,反過來TaskManager也會跟JobManager通信返回Task Status,Heartbeat(心跳)邮辽,Statistics等唠雕。另外TaskManager之間的數(shù)據(jù)通過網(wǎng)絡(luò)進(jìn)行傳輸,比如Data Stream做一些算子的操作吨述,數(shù)據(jù)往往需要在TaskManager之間做數(shù)據(jù)傳輸岩睁。
1.2. TaskManger Slot
TaskManager是進(jìn)程,他下面運(yùn)行的task(整個Flink應(yīng)用是Job揣云,Job可以拆分成很多個task)是線程捕儒,每個task/subtask(線程)下可運(yùn)行一個或者多個operator,即OperatorChain邓夕。Task是class刘莹,抽象的,subtask是Object(類比學(xué)習(xí))焚刚,具體的点弯。
一個TaskManager通過Slot(任務(wù)槽)來控制它上面可以接受多少個task,比如一個TaskManager劃分了3個Task Slot(僅限內(nèi)存托管矿咕,目前CPU未做隔離)抢肛,它只能接受3個task。Slot均分TaskManager所托管的內(nèi)存碳柱,比如一個TaskManager有6G內(nèi)存捡絮,那么每個Slot分配2G。
同一個TaskManager中的task共享TCP連接(通過多路復(fù)用)和心跳消息莲镣。它們還可以共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu)福稳,從而減少每個任務(wù)的開銷。一個TaskManager有N個槽位只能接受N個Task嗎瑞侮?不是的圆,后面會講共享槽位。
1.3. OperatorChain && Task
為了更高效地分布式執(zhí)行区岗,F(xiàn)link會盡可能地將operator的subtask鏈接(chain)在一起形成task略板。以wordcount為例,解析不同視圖下的數(shù)據(jù)流慈缔,如下圖所示叮称。
數(shù)據(jù)流(邏輯視圖)
創(chuàng)建Source(并行度設(shè)置為1)讀取數(shù)據(jù)源,數(shù)據(jù)經(jīng)過FlatMap(并行度設(shè)置為2)做轉(zhuǎn)換操作藐鹤,然后數(shù)據(jù)經(jīng)過Key Agg(并行度設(shè)置為2)做聚合操作瓤檐,最后數(shù)據(jù)經(jīng)過Sink(并行度設(shè)置為2)將數(shù)據(jù)輸出。
數(shù)據(jù)流(并行化視圖)
并行度為1的Source讀取數(shù)據(jù)源娱节,然后FlatMap并行度為2讀取數(shù)據(jù)源進(jìn)行轉(zhuǎn)化操作挠蛉,然后數(shù)據(jù)經(jīng)過Shuffle交給并行度為2的Key Agg進(jìn)行聚合操作,然后并行度為2的Sink將數(shù)據(jù)輸出肄满,未優(yōu)化前的task總和為7谴古。
數(shù)據(jù)流(優(yōu)化后視圖)
并行度為1的Source讀取數(shù)據(jù)源质涛,然后FlatMap并行度為2讀取數(shù)據(jù)源進(jìn)行轉(zhuǎn)化操作,然后數(shù)據(jù)經(jīng)過Shuffle交給Key Agg進(jìn)行聚合操作掰担,此時Key Agg和Sink操作合并為一個task(注意:將KeyAgg和Sink兩個operator進(jìn)行了合并汇陆,因?yàn)檫@兩個合并后并不會改變整體的拓?fù)浣Y(jié)構(gòu)),它們一起的并行度為2带饱,數(shù)據(jù)經(jīng)過Key Agg和Sink之后將數(shù)據(jù)輸出毡代,優(yōu)化后的task總和為5.
1.4. OperatorChain的優(yōu)點(diǎn)和組成條件
OperatorChain的優(yōu)點(diǎn)
1.減少線程切換
2.減少序列化與反序列化
3.減少數(shù)據(jù)在緩沖區(qū)的交換
4.減少延遲并且提高吞吐能力
OperatorChain 組成條件
1.沒有禁用Chain
2.上下游算子并行度一致。
3.下游算子的入度為1(也就是說下游節(jié)點(diǎn)沒有來自其他節(jié)點(diǎn)的輸入)勺疼。
4.上下游算子在同一個slot group(后面緊跟著就會講如何通過slot
group先分配到同一個solt教寂,然后才能chain) 。
5.下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接执庐,map酪耕、flatmap、filter等默認(rèn)是ALWAYS)耕肩。
6.上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接因妇,不能與上游鏈接,Source默認(rèn)是HEAD)猿诸。
7.上下游算子之間沒有數(shù)據(jù)shuffle (數(shù)據(jù)分區(qū)方式是forward)婚被。
1.5. 編程改變OperatorChain行為
Operator chain的行為可以通過編程API中進(jìn)行指定,可以通過在DataStream的operator后面(如someStream.map(..))調(diào)用startNewChain()來指示從該operator開始一個新的chain(與前面截斷梳虽,不會被chain到前面)址芯。可以調(diào)用disableChaining()來指示該operator不參與chaining(不會與前后的operator
chain一起)窜觉」日ǎ可以通過調(diào)用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining≠鞔欤可以設(shè)置Slot
group偿荷,例如someStream.filter(...).slotSharingGroup(“name”)趣钱〗韫颍可以通過調(diào)整并行度溶握,來調(diào)整Operator chain。
2. Slot分配與共享
2.1共享Slot
默認(rèn)情況下砰左,F(xiàn)link 允許subtasks共享slot匿醒,條件是它們都來自同一個Job的不同task的subtask。結(jié)果可能一個slot持有該job的整個pipeline缠导。
允許slot共享有以下兩點(diǎn)好處:
1.Flink集群需要的任務(wù)槽與作業(yè)中使用的最高并行度正好相同(前提廉羔,保持默認(rèn)SlotSharingGroup)。也就是說我們不需要再去計算一個程序總共會起多少個task了僻造。
2.更容易獲得更充分的資源利用憋他。如果沒有slot共享孩饼,那么非密集型操作source/flatmap就會占用同密集型操作keyAggregation/sink 一樣多的資源。如果有slot共享竹挡,將task的2個并行度增加到6個捣辆,能充分利用slot資源,同時保證每個TaskManager能平均分配到重的subtasks此迅。
2.2共享Slot實(shí)例
將 WordCount 的并行度從之前的2個增加到6個(Source并行度仍為1),并開啟slot共享(所有operator都在default共享組)旧巾,將得到如上圖所示的slot分布圖耸序。
首先,我們不用去計算這個job會其多少個task鲁猩,總之該任務(wù)最終會占用6個slots(最高并行度為6)坎怪。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager廓握。
2.3 SlotSharingGroup(soft)
SlotSharingGroup是Flink中用來實(shí)現(xiàn)slot共享的類搅窿,它盡可能地讓subtasks共享一個slot。
保證同一個group的并行度相同的sub-tasks 共享同一個slots隙券。算子的默認(rèn)group為default(即默認(rèn)一個job下的subtask都可以共享一個slot)
為了防止不合理的共享男应,用戶也能通過API來強(qiáng)制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強(qiáng)制指定了filter的slot共享組為group1娱仔。怎么確定一個未做SlotSharingGroup設(shè)置算子的SlotSharingGroup什么呢(根據(jù)上游算子的group 和自身是否設(shè)置group共同確定)沐飘。適當(dāng)設(shè)置可以減少每個slot運(yùn)行的線程數(shù),從而整體上減少機(jī)器的負(fù)載牲迫。
2.4 CoLocationGroup(強(qiáng)制)
CoLocationGroup可以保證所有的并行度相同的sub-tasks運(yùn)行在同一個slot耐朴,主要用于迭代流(訓(xùn)練機(jī)器學(xué)習(xí)模型)。
3. Slot & parallelism的關(guān)系
3.1 Slots && parallelism
如上圖所示盹憎,有兩個TaskManager筛峭,每個TaskManager有3個槽位。假設(shè)source操作并行度為3陪每,map操作的并行度為4影晓,sink的并行度為4,所需的task slots數(shù)與job中task的最高并行度一致奶稠,最高并行度為4俯艰,那么使用的Slot也為4。
3.2如何計算Slot
如何計算一個應(yīng)用需要多少slot锌订?
如果不設(shè)置SlotSharingGroup竹握,那么需要的Slot數(shù)為應(yīng)用的最大并行度數(shù)。如果設(shè)置了SlotSharingGroup辆飘,那么需要的Slot數(shù)為所有SlotSharingGroup中的最大并行度之和啦辐。比如已經(jīng)強(qiáng)制指定了map的slot共享組為test谓传,那么map和map下游的組為test,map的上游source的組為默認(rèn)的default芹关,此時default組中最大并行度為10续挟,test組中最大并行度為20,那么需要的Slot=10+20=30侥衬。
4.Flink部署模式
4.1 Local 本地部署
Flink 可以運(yùn)行在 Linux诗祸、Mac OS X 和 Windows 上。本地模式的安裝唯一需要的只是 Java 1.7.x或更高版本轴总,本地運(yùn)行會啟動Single JVM直颅,主要用于測試調(diào)試代碼。
4.2 Standalone Cluster集群部署
軟件需求
1.安裝Java1.8或者更高版本
2.集群各個節(jié)點(diǎn)需要ssh免密登錄
Flink Standalone 運(yùn)行流程前面已經(jīng)講過怀樟,這里就不在贅敘功偿。
4.3Flink ON?
Flink ON YARN工作流程如下所示:
首先提交job給YARN,就需要有一個Flink YARN Client往堡。
第一步:Client將Flink 應(yīng)用jar包和配置文件上傳到HDFS械荷。
第二步:Client向REsourceManager注冊resources和請求APPMaster? Container
第三步:REsourceManager就會給某一個Worker節(jié)點(diǎn)分配一個Container來啟動APPMaster,JobManager會在APPMaster中啟動虑灰。
第四步:APPMaster為Flink的TaskManagers分配容器并啟動TaskManager吨瞎,TaskManager內(nèi)部會劃分很多個Slot,它會自動從HDFS下載jar文件和修改后的配置瘩缆,然后運(yùn)行相應(yīng)的task关拒。TaskManager也會與APPMaster中的JobManager進(jìn)行交互,維持心跳等庸娱。
5.Flink Standalone集群部署
安裝Flink之前需要提前安裝好JDK着绊,這里我們安裝的是JDK1.8版本。
5.1下載
可以到官網(wǎng):https://archive.apache.org/dist/flink/將Flink1.6.2版本下載到本地熟尉。
5.2解壓
將下載的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上傳至主節(jié)點(diǎn)
使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解壓flink安裝包
方便后期flink多版本的使用归露,可以創(chuàng)建flink軟連接
ln -s flink-1.6.2 flink
5.3配置環(huán)境變量
vi ~/.bashrc
export FLINK_HOME=/home/hadoop/app/flink
export PATH=$FLINK_HOME/bin:$PATH
使配置文件生效
source ~/.bashrc
查看flink版本
flink -v
5.4修改配置文件
1.修改flink-conf.yaml配置文件
vi flink-conf.yaml
#JobManager地址
jobmanager.rpc.address: cdh01
#槽位配置為3
taskmanager.numberOfTaskSlots: 3
#設(shè)置并行度為3
parallelism.default: 3
2.修改masters配置
vi masters
cdh01:8081
3.修改slaves配置
vi slaves
cdh01
cdh02
cdh03
5.5主節(jié)點(diǎn)安裝目錄同步到從節(jié)點(diǎn)
通過deploy.sh腳本將flink安裝目錄同步到其他節(jié)點(diǎn)。
deploy.sh flink-1.6.2 /home/hadoop/app/slave
在從節(jié)點(diǎn)分別創(chuàng)建flink軟連接
ln -s flink-1.6.2 flink
5.6啟動服務(wù)
進(jìn)入flink bin目錄執(zhí)行啟動集群腳本start-cluster.sh
bin/start-cluster.sh
通過web查看flink集群斤儿,查看相關(guān)集群信息剧包。
5.7測試運(yùn)行
查看官網(wǎng)案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/
1.啟動nc服務(wù)
nc -l 9000
2.提交flink作業(yè)
bin/flink runexamples/streaming/SocketWindowWordCount.jar --port 9000
3.輸入測試數(shù)據(jù)
4.查看運(yùn)行結(jié)果
在TaskManager界面查看Flink運(yùn)行結(jié)果