Flink學(xué)習(xí)筆記:Flink Runtime

本文為《Flink大數(shù)據(jù)項(xiàng)目實(shí)戰(zhàn)》學(xué)習(xí)筆記,想通過視頻系統(tǒng)學(xué)習(xí)Flink這個最火爆的大數(shù)據(jù)計算框架的同學(xué),推薦學(xué)習(xí)課程: ??

?Flink大數(shù)據(jù)項(xiàng)目實(shí)戰(zhàn):http://t.cn/EJtKhaz


1. Flink運(yùn)行時架構(gòu)

1.1Flink架構(gòu)

Flink 運(yùn)行時架構(gòu)主要包含幾個部分:Client罢绽、JobManager(master節(jié)點(diǎn))和TaskManger(slave節(jié)點(diǎn))寺晌。

Client:Flink 作業(yè)在哪臺機(jī)器上面提交沼侣,那么當(dāng)前機(jī)器稱之為Client疆导。用戶開發(fā)的Program 代碼,它會構(gòu)建出DataFlow graph澳迫,然后通過Client提交給JobManager局齿。

JobManager:是主(master)節(jié)點(diǎn),相當(dāng)于YARN里面的REsourceManager橄登,生成環(huán)境中一般可以做HA高可用抓歼。JobManager會將任務(wù)進(jìn)行拆分,調(diào)度到TaskManager上面執(zhí)行拢锹。

TaskManager:是從節(jié)點(diǎn)(slave)谣妻,TaskManager才是真正實(shí)現(xiàn)task的部分。

Client提交作業(yè)到JobManager卒稳,就需要跟JobManager進(jìn)行通信蹋半,它使用Akka框架或者庫進(jìn)行通信,另外Client與JobManager進(jìn)行數(shù)據(jù)交互充坑,使用的是Netty框架减江。Akka通信基于Actor

System,Client可以向JobManager發(fā)送指令捻爷,比如Submit job或者Cancel /update job辈灼。JobManager也可以反饋信息給Client,比如status updates也榄,Statistics和results巡莹。

Client提交給JobManager的是一個Job,然后JobManager將Job拆分成task甜紫,提交給TaskManager(worker)降宅。JobManager與TaskManager也是基于Akka進(jìn)行通信,JobManager發(fā)送指令囚霸,比如Deploy/Stop/Cancel

Tasks或者觸發(fā)Checkpoint钉鸯,反過來TaskManager也會跟JobManager通信返回Task Status,Heartbeat(心跳)邮辽,Statistics等唠雕。另外TaskManager之間的數(shù)據(jù)通過網(wǎng)絡(luò)進(jìn)行傳輸,比如Data Stream做一些算子的操作吨述,數(shù)據(jù)往往需要在TaskManager之間做數(shù)據(jù)傳輸岩睁。

1.2. TaskManger Slot

TaskManager是進(jìn)程,他下面運(yùn)行的task(整個Flink應(yīng)用是Job揣云,Job可以拆分成很多個task)是線程捕儒,每個task/subtask(線程)下可運(yùn)行一個或者多個operator,即OperatorChain邓夕。Task是class刘莹,抽象的,subtask是Object(類比學(xué)習(xí))焚刚,具體的点弯。

一個TaskManager通過Slot(任務(wù)槽)來控制它上面可以接受多少個task,比如一個TaskManager劃分了3個Task Slot(僅限內(nèi)存托管矿咕,目前CPU未做隔離)抢肛,它只能接受3個task。Slot均分TaskManager所托管的內(nèi)存碳柱,比如一個TaskManager有6G內(nèi)存捡絮,那么每個Slot分配2G。

同一個TaskManager中的task共享TCP連接(通過多路復(fù)用)和心跳消息莲镣。它們還可以共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu)福稳,從而減少每個任務(wù)的開銷。一個TaskManager有N個槽位只能接受N個Task嗎瑞侮?不是的圆,后面會講共享槽位。

1.3. OperatorChain && Task

為了更高效地分布式執(zhí)行区岗,F(xiàn)link會盡可能地將operator的subtask鏈接(chain)在一起形成task略板。以wordcount為例,解析不同視圖下的數(shù)據(jù)流慈缔,如下圖所示叮称。

數(shù)據(jù)流(邏輯視圖)

創(chuàng)建Source(并行度設(shè)置為1)讀取數(shù)據(jù)源,數(shù)據(jù)經(jīng)過FlatMap(并行度設(shè)置為2)做轉(zhuǎn)換操作藐鹤,然后數(shù)據(jù)經(jīng)過Key Agg(并行度設(shè)置為2)做聚合操作瓤檐,最后數(shù)據(jù)經(jīng)過Sink(并行度設(shè)置為2)將數(shù)據(jù)輸出。

數(shù)據(jù)流(并行化視圖)

并行度為1的Source讀取數(shù)據(jù)源娱节,然后FlatMap并行度為2讀取數(shù)據(jù)源進(jìn)行轉(zhuǎn)化操作挠蛉,然后數(shù)據(jù)經(jīng)過Shuffle交給并行度為2的Key Agg進(jìn)行聚合操作,然后并行度為2的Sink將數(shù)據(jù)輸出肄满,未優(yōu)化前的task總和為7谴古。

數(shù)據(jù)流(優(yōu)化后視圖)

并行度為1的Source讀取數(shù)據(jù)源质涛,然后FlatMap并行度為2讀取數(shù)據(jù)源進(jìn)行轉(zhuǎn)化操作,然后數(shù)據(jù)經(jīng)過Shuffle交給Key Agg進(jìn)行聚合操作掰担,此時Key Agg和Sink操作合并為一個task(注意:將KeyAgg和Sink兩個operator進(jìn)行了合并汇陆,因?yàn)檫@兩個合并后并不會改變整體的拓?fù)浣Y(jié)構(gòu)),它們一起的并行度為2带饱,數(shù)據(jù)經(jīng)過Key Agg和Sink之后將數(shù)據(jù)輸出毡代,優(yōu)化后的task總和為5.

1.4. OperatorChain的優(yōu)點(diǎn)和組成條件

OperatorChain的優(yōu)點(diǎn)

1.減少線程切換

2.減少序列化與反序列化

3.減少數(shù)據(jù)在緩沖區(qū)的交換

4.減少延遲并且提高吞吐能力

OperatorChain 組成條件

1.沒有禁用Chain

2.上下游算子并行度一致。

3.下游算子的入度為1(也就是說下游節(jié)點(diǎn)沒有來自其他節(jié)點(diǎn)的輸入)勺疼。

4.上下游算子在同一個slot group(后面緊跟著就會講如何通過slot

group先分配到同一個solt教寂,然后才能chain) 。

5.下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接执庐,map酪耕、flatmap、filter等默認(rèn)是ALWAYS)耕肩。

6.上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接因妇,不能與上游鏈接,Source默認(rèn)是HEAD)猿诸。

7.上下游算子之間沒有數(shù)據(jù)shuffle (數(shù)據(jù)分區(qū)方式是forward)婚被。

1.5. 編程改變OperatorChain行為

Operator chain的行為可以通過編程API中進(jìn)行指定,可以通過在DataStream的operator后面(如someStream.map(..))調(diào)用startNewChain()來指示從該operator開始一個新的chain(與前面截斷梳虽,不會被chain到前面)址芯。可以調(diào)用disableChaining()來指示該operator不參與chaining(不會與前后的operator

chain一起)窜觉」日ǎ可以通過調(diào)用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining≠鞔欤可以設(shè)置Slot

group偿荷,例如someStream.filter(...).slotSharingGroup(“name”)趣钱〗韫颍可以通過調(diào)整并行度溶握,來調(diào)整Operator chain。

2. Slot分配與共享

2.1共享Slot

默認(rèn)情況下砰左,F(xiàn)link 允許subtasks共享slot匿醒,條件是它們都來自同一個Job的不同task的subtask。結(jié)果可能一個slot持有該job的整個pipeline缠导。

允許slot共享有以下兩點(diǎn)好處:

1.Flink集群需要的任務(wù)槽與作業(yè)中使用的最高并行度正好相同(前提廉羔,保持默認(rèn)SlotSharingGroup)。也就是說我們不需要再去計算一個程序總共會起多少個task了僻造。

2.更容易獲得更充分的資源利用憋他。如果沒有slot共享孩饼,那么非密集型操作source/flatmap就會占用同密集型操作keyAggregation/sink 一樣多的資源。如果有slot共享竹挡,將task的2個并行度增加到6個捣辆,能充分利用slot資源,同時保證每個TaskManager能平均分配到重的subtasks此迅。

2.2共享Slot實(shí)例

將 WordCount 的并行度從之前的2個增加到6個(Source并行度仍為1),并開啟slot共享(所有operator都在default共享組)旧巾,將得到如上圖所示的slot分布圖耸序。

首先,我們不用去計算這個job會其多少個task鲁猩,總之該任務(wù)最終會占用6個slots(最高并行度為6)坎怪。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager廓握。

2.3 SlotSharingGroup(soft)

SlotSharingGroup是Flink中用來實(shí)現(xiàn)slot共享的類搅窿,它盡可能地讓subtasks共享一個slot。

保證同一個group的并行度相同的sub-tasks 共享同一個slots隙券。算子的默認(rèn)group為default(即默認(rèn)一個job下的subtask都可以共享一個slot)

為了防止不合理的共享男应,用戶也能通過API來強(qiáng)制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強(qiáng)制指定了filter的slot共享組為group1娱仔。怎么確定一個未做SlotSharingGroup設(shè)置算子的SlotSharingGroup什么呢(根據(jù)上游算子的group 和自身是否設(shè)置group共同確定)沐飘。適當(dāng)設(shè)置可以減少每個slot運(yùn)行的線程數(shù),從而整體上減少機(jī)器的負(fù)載牲迫。

2.4 CoLocationGroup(強(qiáng)制)

CoLocationGroup可以保證所有的并行度相同的sub-tasks運(yùn)行在同一個slot耐朴,主要用于迭代流(訓(xùn)練機(jī)器學(xué)習(xí)模型)。

3. Slot & parallelism的關(guān)系

3.1 Slots && parallelism

如上圖所示盹憎,有兩個TaskManager筛峭,每個TaskManager有3個槽位。假設(shè)source操作并行度為3陪每,map操作的并行度為4影晓,sink的并行度為4,所需的task slots數(shù)與job中task的最高并行度一致奶稠,最高并行度為4俯艰,那么使用的Slot也為4。

3.2如何計算Slot

如何計算一個應(yīng)用需要多少slot锌订?

如果不設(shè)置SlotSharingGroup竹握,那么需要的Slot數(shù)為應(yīng)用的最大并行度數(shù)。如果設(shè)置了SlotSharingGroup辆飘,那么需要的Slot數(shù)為所有SlotSharingGroup中的最大并行度之和啦辐。比如已經(jīng)強(qiáng)制指定了map的slot共享組為test谓传,那么map和map下游的組為test,map的上游source的組為默認(rèn)的default芹关,此時default組中最大并行度為10续挟,test組中最大并行度為20,那么需要的Slot=10+20=30侥衬。

4.Flink部署模式

4.1 Local 本地部署

Flink 可以運(yùn)行在 Linux诗祸、Mac OS X 和 Windows 上。本地模式的安裝唯一需要的只是 Java 1.7.x或更高版本轴总,本地運(yùn)行會啟動Single JVM直颅,主要用于測試調(diào)試代碼。

4.2 Standalone Cluster集群部署

軟件需求

1.安裝Java1.8或者更高版本

2.集群各個節(jié)點(diǎn)需要ssh免密登錄

Flink Standalone 運(yùn)行流程前面已經(jīng)講過怀樟,這里就不在贅敘功偿。

4.3Flink ON?

Flink ON YARN工作流程如下所示:

首先提交job給YARN,就需要有一個Flink YARN Client往堡。

第一步:Client將Flink 應(yīng)用jar包和配置文件上傳到HDFS械荷。

第二步:Client向REsourceManager注冊resources和請求APPMaster? Container

第三步:REsourceManager就會給某一個Worker節(jié)點(diǎn)分配一個Container來啟動APPMaster,JobManager會在APPMaster中啟動虑灰。

第四步:APPMaster為Flink的TaskManagers分配容器并啟動TaskManager吨瞎,TaskManager內(nèi)部會劃分很多個Slot,它會自動從HDFS下載jar文件和修改后的配置瘩缆,然后運(yùn)行相應(yīng)的task关拒。TaskManager也會與APPMaster中的JobManager進(jìn)行交互,維持心跳等庸娱。

5.Flink Standalone集群部署

安裝Flink之前需要提前安裝好JDK着绊,這里我們安裝的是JDK1.8版本。

5.1下載

可以到官網(wǎng):https://archive.apache.org/dist/flink/將Flink1.6.2版本下載到本地熟尉。

5.2解壓

將下載的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上傳至主節(jié)點(diǎn)

使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解壓flink安裝包

方便后期flink多版本的使用归露,可以創(chuàng)建flink軟連接

ln -s flink-1.6.2 flink


5.3配置環(huán)境變量

vi ~/.bashrc

export FLINK_HOME=/home/hadoop/app/flink

export PATH=$FLINK_HOME/bin:$PATH

使配置文件生效

source ~/.bashrc

查看flink版本

flink -v

5.4修改配置文件

1.修改flink-conf.yaml配置文件

vi flink-conf.yaml

#JobManager地址

jobmanager.rpc.address: cdh01

#槽位配置為3

taskmanager.numberOfTaskSlots: 3

#設(shè)置并行度為3

parallelism.default: 3

2.修改masters配置

vi masters

cdh01:8081

3.修改slaves配置

vi slaves

cdh01

cdh02

cdh03

5.5主節(jié)點(diǎn)安裝目錄同步到從節(jié)點(diǎn)

通過deploy.sh腳本將flink安裝目錄同步到其他節(jié)點(diǎn)。

deploy.sh flink-1.6.2 /home/hadoop/app/slave

在從節(jié)點(diǎn)分別創(chuàng)建flink軟連接

ln -s flink-1.6.2 flink

5.6啟動服務(wù)

進(jìn)入flink bin目錄執(zhí)行啟動集群腳本start-cluster.sh

bin/start-cluster.sh

通過web查看flink集群斤儿,查看相關(guān)集群信息剧包。

http://cdh01:8081

5.7測試運(yùn)行

查看官網(wǎng)案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

1.啟動nc服務(wù)

nc -l 9000

2.提交flink作業(yè)

bin/flink runexamples/streaming/SocketWindowWordCount.jar --port 9000

3.輸入測試數(shù)據(jù)


4.查看運(yùn)行結(jié)果

在TaskManager界面查看Flink運(yùn)行結(jié)果


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市往果,隨后出現(xiàn)的幾起案子疆液,更是在濱河造成了極大的恐慌,老刑警劉巖陕贮,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件堕油,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)掉缺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進(jìn)店門卜录,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人眶明,你說我怎么就攤上這事艰毒。” “怎么了搜囱?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵丑瞧,是天一觀的道長。 經(jīng)常有香客問我蜀肘,道長嗦篱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任幌缝,我火速辦了婚禮,結(jié)果婚禮上诫欠,老公的妹妹穿的比我還像新娘涵卵。我一直安慰自己,他們只是感情好荒叼,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布轿偎。 她就那樣靜靜地躺著,像睡著了一般被廓。 火紅的嫁衣襯著肌膚如雪坏晦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天嫁乘,我揣著相機(jī)與錄音昆婿,去河邊找鬼。 笑死蜓斧,一個胖子當(dāng)著我的面吹牛仓蛆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播挎春,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼看疙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了直奋?” 一聲冷哼從身側(cè)響起能庆,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎脚线,沒想到半個月后搁胆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年丰涉,在試婚紗的時候發(fā)現(xiàn)自己被綠了拓巧。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡一死,死狀恐怖肛度,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情投慈,我是刑警寧澤承耿,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站伪煤,受9級特大地震影響加袋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜抱既,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一职烧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧防泵,春花似錦蚀之、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至锁右,卻和暖如春失受,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背咏瑟。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工拂到, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人码泞。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓谆焊,卻偏偏與公主長得像,于是被迫代替她去往敵國和親浦夷。 傳聞我的和親對象是個殘疾皇子辖试,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容