Apache Flink 零基礎(chǔ)入門(mén)(三):開(kāi)發(fā)環(huán)境搭建和應(yīng)用的配置官辽、部署及運(yùn)行
[天池大數(shù)據(jù)科研平臺(tái)](javascript:void(0);) 9月2日
Ververica 泥张, 作者 沙晟陽(yáng)本文是根據(jù) Apache Flink 系列直播課程整理而成劫笙,由阿里巴巴高級(jí)開(kāi)發(fā)工程師沙晟陽(yáng)分享,主要面向于初次接觸 Flink涕俗、或者對(duì) Flink 有了解但是沒(méi)有實(shí)際操作過(guò)的同學(xué)贡定。希望幫助大家更順利地上手使用 Flink闸衫,并著手相關(guān)開(kāi)發(fā)調(diào)試工作献丑。
主要內(nèi)容:
Flink 開(kāi)發(fā)環(huán)境的部署和配置
運(yùn)行 Flink 應(yīng)用
單機(jī) Standalone 模式
多機(jī) Standalone 模式
Yarn 集群模式
一. Flink 開(kāi)發(fā)環(huán)境部署和配置
Flink 是一個(gè)以 Java 及 Scala 作為開(kāi)發(fā)語(yǔ)言的開(kāi)源大數(shù)據(jù)項(xiàng)目末捣,代碼開(kāi)源在 GitHub 上侠姑,并使用 Maven 來(lái)編譯和構(gòu)建項(xiàng)目创橄。對(duì)于大部分使用 Flink 的同學(xué)來(lái)說(shuō),Java莽红、Maven 和 Git 這三個(gè)工具是必不可少的妥畏,另外一個(gè)強(qiáng)大的 IDE 有助于我們更快的閱讀代碼、開(kāi)發(fā)新功能以及修復(fù) Bug安吁。因?yàn)槠拮硪希覀儾粫?huì)詳述每個(gè)工具的安裝細(xì)節(jié),但會(huì)給出必要的安裝建議鬼店。
關(guān)于開(kāi)發(fā)測(cè)試環(huán)境网棍,Mac OS、Linux 系統(tǒng)或者 Windows 都可以妇智。如果使用的是 Windows 10 系統(tǒng)滥玷,建議使用 Windows 10 系統(tǒng)的 Linux 子系統(tǒng)來(lái)編譯和運(yùn)行。
工具 | 注釋 |
---|---|
Java | Java 版本至少是 Java 8巍棱,且最好選用 Java 8u51 及以上版本 |
Maven | 必須使用 Maven 3惑畴,建議使用 Maven 3.2.5。Maven 3.3.x 能夠編譯成功航徙,但是在 Shade 一些 Dependencies 的過(guò)程中有些問(wèn)題 |
Git | Flink 的代碼倉(cāng)庫(kù)是: https://github.com/apache/flink |
建議選用社區(qū)已發(fā)布的穩(wěn)定分支如贷,比如 Release-1.6 或者 Release-1.7。
1. 編譯 Flink 代碼
在我們配置好之前的幾個(gè)工具后到踏,編譯 Flink 就非常簡(jiǎn)單了杠袱,執(zhí)行如下命令即可:
mvn clean install -DskipTests
常用編譯參數(shù):
Dfast 主要是忽略QA plugins和JavaDocs的編譯
當(dāng)成功編譯完成后,能在當(dāng)前 Flink 代碼目錄下的 flink-dist/target/子目錄 中看到如下文件(不同的 Flink 代碼分支編譯出的版本號(hào)不同窝稿,這里的版本號(hào)是 Flink 1.5.1):
其中有三個(gè)文件可以留意一下:
版本 | 注釋 |
---|---|
flink-1.5.1.tar.gz | Binary 的壓縮包 |
flink-1.5.1-bin/flink-1.5.1 | 解壓后的 Flink binary 目錄 |
flink-dist_2.11-1.5.1.jar | 包含 Flink 核心功能的 jar 包 |
注意: 國(guó)內(nèi)用戶(hù)在編譯時(shí)可能遇到編譯失敗“Build Failure”(且有 MapR 相關(guān)報(bào)錯(cuò))楣富,一般都和 MapR 相關(guān)依賴(lài)的下載失敗有關(guān),即使使用了推薦的 settings.xml 配置(其中 Aliyun Maven 源專(zhuān)門(mén)為 MapR 相關(guān)依賴(lài)做了代理)讹躯,還是可能出現(xiàn)下載失敗的情況菩彬。問(wèn)題主要和 MapR 的 Jar 包比較大有關(guān)。遇到這些問(wèn)題時(shí)潮梯,重試即可骗灶。在重試之前,要先根據(jù)失敗信息刪除 Maven local repository 中對(duì)應(yīng)的目錄秉馏,否則需要等待 Maven 下載的超時(shí)時(shí)間才能再次出發(fā)下載依賴(lài)到本地耙旦。
2. 開(kāi)發(fā)環(huán)境準(zhǔn)備
推薦使用 IntelliJ IDEA IDE 作為 Flink 的 IDE 工具。官方不建議使用 Eclipse IDE萝究,主要原因是 Eclipse 的 Scala IDE 和 Flink 用 Scala 的不兼容免都。
如果你需要做一些 Flink 代碼的開(kāi)發(fā)工作锉罐,則需要根據(jù) Flink 代碼的 tools/maven/目錄 下的配置文件來(lái)配置 Checkstyle ,因?yàn)?Flink 在編譯時(shí)會(huì)強(qiáng)制代碼風(fēng)格的檢查绕娘,如果代碼風(fēng)格不符合規(guī)范脓规,可能會(huì)直接編譯失敗。
二险领、運(yùn)行 Flink 應(yīng)用
1. 基本概念
運(yùn)行 Flink 應(yīng)用其實(shí)非常簡(jiǎn)單侨舆,但是在運(yùn)行 Flink 應(yīng)用之前,還是有必要了解 Flink 運(yùn)行時(shí)的各個(gè)組件绢陌,因?yàn)檫@涉及到 Flink 應(yīng)用的配置問(wèn)題挨下。圖 1 所示,這是用戶(hù)用 DataStream API 寫(xiě)的一個(gè)數(shù)據(jù)處理程序脐湾〕舭剩可以看到,在一個(gè) DAG 圖中不能被 Chain 在一起的 Operator 會(huì)被分隔到不同的 Task 中秤掌,也就是說(shuō) Task 是 Flink 中資源調(diào)度的最小單位愁铺。
圖 1 Parallel Dataflows
圖 2 所示,F(xiàn)link 實(shí)際運(yùn)行時(shí)包括兩類(lèi)進(jìn)程:
JobManager(又稱(chēng)為 JobMaster):協(xié)調(diào) Task 的分布式執(zhí)行机杜,包括調(diào)度 Task帜讲、協(xié)調(diào)創(chuàng) Checkpoint 以及當(dāng) Job failover 時(shí)協(xié)調(diào)各個(gè) Task 從 Checkpoint 恢復(fù)等。
-
TaskManager(又稱(chēng)為 Worker):執(zhí)行 Dataflow 中的 Tasks椒拗,包括內(nèi)存 Buffer 的分配似将、Data Stream 的傳遞等。
圖 2 Flink Runtime 架構(gòu)圖
圖 3 所示蚀苛,Task Slot 是一個(gè) TaskManager 中的最小資源分配單位在验,一個(gè) TaskManager 中有多少個(gè) Task Slot 就意味著能支持多少并發(fā)的 Task 處理。需要注意的是堵未,一個(gè) Task Slot 中可以執(zhí)行多個(gè) Operator腋舌,一般這些 Operator 是能被 Chain 在一起處理的。
2. 運(yùn)行環(huán)境準(zhǔn)備
準(zhǔn)備 Flink binary
直接從 Flink 官網(wǎng)上下載 Flink binary 的壓縮包
或者從 Flink 源碼編譯而來(lái)
安裝 Java渗蟹,并配置 JAVA_HOME 環(huán)境變量
3. 單機(jī) Standalone 的方式運(yùn)行 Flink
(1)基本的啟動(dòng)流程
最簡(jiǎn)單的運(yùn)行 Flink 應(yīng)用的方法就是以單機(jī) Standalone 的方式運(yùn)行块饺。
啟動(dòng)集群:
./bin/start-cluster.sh
打開(kāi) http://127.0.0.1:8081/ 就能看到 Flink 的 Web 界面。嘗試提交 Word Count 任務(wù):
./bin/flink run examples/streaming/WordCount.jar
大家可以自行探索 Web 界面中展示的信息雌芽,比如授艰,我們可以看看 TaskManager 的 stdout 日志,就可以看到 Word Count 的計(jì)算結(jié)果世落。
我們還可以嘗試通過(guò)“–input”參數(shù)指定我們自己的本地文件作為輸入淮腾,然后執(zhí)行:
./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}
停止集群:
./bin/stop-cluster.sh
(2)常用配置介紹
- conf / slaves
conf / slaves 用于配置 TaskManager 的部署,默認(rèn)配置下只會(huì)啟動(dòng)一個(gè) TaskManager 進(jìn)程,如果想增加一個(gè) TaskManager 進(jìn)程的谷朝,只需要文件中追加一行“l(fā)ocalhost”洲押。
也可以直接通過(guò)“ ./bin/taskmanager.sh start ”這個(gè)命令來(lái)追加一個(gè)新的 TaskManager:
./bin/taskmanager.sh start|start-foreground|stop|stop-all
- **conf/flink-conf.yaml **
用于配置 JM 和 TM 的運(yùn)行參數(shù),常用配置有:
# The heap size for the JobManager JVM
Standalone 集群?jiǎn)?dòng)后圆凰,我們可以嘗試分析一下 Flink 相關(guān)進(jìn)程的運(yùn)行情況杈帐。執(zhí)行 jps 命令,可以看到 Flink 相關(guān)的進(jìn)程主要有兩個(gè)送朱,一個(gè)是 JobManager 進(jìn)程娘荡,另一個(gè)是 TaskManager 進(jìn)程。我們可以進(jìn)一步用 ps 命令看看進(jìn)程的啟動(dòng)參數(shù)中“-Xmx”和“-Xms”的配置驶沼。然后我們可以嘗試修改 flink-conf.yaml 中若干配置,然后重啟 Standalone 集群看看發(fā)生了什么變化争群。
需要補(bǔ)充的是回怜,在 Blink 開(kāi)源分支上,TaskManager 的內(nèi)存計(jì)算上相對(duì)于現(xiàn)在的社區(qū)版本要更精細(xì)化换薄,TaskManager 進(jìn)程的堆內(nèi)存限制(-Xmx)一般的計(jì)算方法是:
TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb(默認(rèn)值為128MB)
而最新的 Flink 社區(qū)版本 Release-1.7 中 JobManager 和 TaskManager 默認(rèn)內(nèi)存配置方式為:
# The heap size for the JobManager JVM
Flink 社區(qū) Release-1.7 版本中的“taskmanager.heap.size”配置實(shí)際上指的不是 Java heap 的內(nèi)存限制玉雾,而是 TaskManager 進(jìn)程總的內(nèi)存限制。我們可以同樣用上述方法查看 Release-1.7 版本的 Flink binary 啟動(dòng)的 TaskManager 進(jìn)程的 -Xmx 配置轻要,會(huì)發(fā)現(xiàn)實(shí)際進(jìn)程上的 -Xmx 要小于配置的“taskmanager.heap.size”的值复旬,原因在于從中扣除了 Network buffer 用的內(nèi)存,因?yàn)?Network buffer 用的內(nèi)存一定是 Direct memory冲泥,所以不應(yīng)該算在堆內(nèi)存限制中驹碍。
(3)日志的查看和配置
JobManager 和 TaskManager 的啟動(dòng)日志可以在 Flink binary 目錄下的 Log 子目錄中找到。Log 目錄中以“flink-user?standalonesession?{id}-${hostname}”為前綴的文件對(duì)應(yīng)的是 JobManager 的輸出凡恍,其中有三個(gè)文件:
flink-user?standalonesession?{id}-${hostname}.log:代碼中的日志輸出
flink-user?standalonesession?{id}-${hostname}.out:進(jìn)程執(zhí)行時(shí)的 stdout 輸出
flink-user?standalonesession?{id}-${hostname}-gc.log:JVM 的 GC 的日志
Log 目錄中以“flink-user?taskexecutor?{id}-${hostname}”為前綴的文件對(duì)應(yīng)的是 TaskManager 的輸出志秃,也包括三個(gè)文件,和 JobManager 的輸出一致嚼酝。
日志的配置文件在 Flink binary 目錄的 conf 子目錄下浮还,其中:
log4j-cli.properties:用 Flink 命令行時(shí)用的 log 配置,比如執(zhí)行“ flink run”命令
log4j-yarn-session.properties:用 yarn-session.sh 啟動(dòng)時(shí)命令行執(zhí)行時(shí)用的 log 配置
log4j.properties:無(wú)論是 Standalone 還是 Yarn 模式闽巩,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties
這三個(gè)“l(fā)og4j.properties”文件分別有三個(gè)“l(fā)ogback.xml”文件與之對(duì)應(yīng)钧舌,如果想使用 Logback 的同學(xué),之需要把與之對(duì)應(yīng)的“l(fā)og4j.*properties”文件刪掉即可涎跨,對(duì)應(yīng)關(guān)系如下:
log4j-cli.properties -> logback-console.xml
log4j-yarn-session.properties -> logback-yarn.xml
log4j.properties -> logback.xml
需要注意的是洼冻,“flink-user?standalonesession?{id}-hostname”和“flink?{user}-taskexecutor-id?{hostname}”都帶有“id”,“{id}”表示本進(jìn)程在本機(jī)上該角色(JobManager 或 TaskManager)的所有進(jìn)程中的啟動(dòng)順序六敬,默認(rèn)從 0 開(kāi)始碘赖。
(4)進(jìn)一步探索
嘗試重復(fù)執(zhí)行“./bin/start-cluster.sh”命令,然后看看 Web 頁(yè)面(或者執(zhí)行 jps 命令),看看會(huì)發(fā)生什么普泡?可以嘗試看看啟動(dòng)腳本播掷,分析一下原因。接著可以重復(fù)執(zhí)行“./bin/stop-cluster.sh”撼班,每次執(zhí)行完后歧匈,看看會(huì)發(fā)生什么。
4. 多機(jī)部署 Flink Standalone 集群
部署前要注意的要點(diǎn):
每臺(tái)機(jī)器上配置好 Java 以及 JAVA_HOME 環(huán)境變量
每臺(tái)機(jī)器上部署的 Flink binary 的目錄要保證是同一個(gè)目錄
如果需要用 HDFS砰嘁,需要配置 HADOOP_CONF_DIR 環(huán)境變量配置
根據(jù)你的集群信息修改 conf/masters 和 conf/slaves 配置件炉。
修改 conf/flink-conf.yaml 配置,注意要確保和 Masters 文件中的地址一致:
jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net
確保所有機(jī)器的 Flink binary 目錄中 conf 中的配置文件相同矮湘,特別是以下三個(gè):
conf/masters
然后啟動(dòng) Flink 集群:
./bin/start-cluster.sh
提交 WordCount 作業(yè):
./bin/flink run examples/streaming/WordCount.jar
上傳 WordCount 的 Input 文件:
hdfs dfs -copyFromLocal story /test_dir/input_dir/story
提交讀寫(xiě) HDFS 的 WordCount 作業(yè):
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
增加 WordCount 作業(yè)的并發(fā)度(注意輸出文件重名會(huì)提交失斦迕帷):
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20
5. Standalone 模式的 HighAvailability(HA)部署和配置
通過(guò)圖 2 Flink Runtime 架構(gòu)圖,我們可以看到 JobManager 是整個(gè)系統(tǒng)中最可能導(dǎo)致系統(tǒng)不可用的角色缅阳。如果一個(gè) TaskManager 掛了磕蛇,在資源足夠的情況下,只需要把相關(guān) Task 調(diào)度到其他空閑 TaskSlot 上十办,然后 Job 從 Checkpoint 中恢復(fù)即可秀撇。而如果當(dāng)前集群中只配置了一個(gè) JobManager,則一旦 JobManager 掛了向族,就必須等待這個(gè) JobManager 重新恢復(fù)呵燕,如果恢復(fù)時(shí)間過(guò)長(zhǎng),就可能導(dǎo)致整個(gè) Job 失敗件相。
因此如果在生產(chǎn)業(yè)務(wù)使用 Standalone 模式再扭,則需要部署配置 HighAvailability,這樣同時(shí)可以有多個(gè) JobManager 待命适肠,從而使得 JobManager 能夠持續(xù)服務(wù)霍衫。
注意:
- 如果想使用 Flink standalone HA 模式,需要確焙钛基于 Flink Release-1.6.1 及以上版本敦跌,因?yàn)檫@里社區(qū)有個(gè) bug 會(huì)導(dǎo)致這個(gè)模式下主 JobManager 不能正常工作。
- 接下來(lái)的實(shí)驗(yàn)中需要用到 HDFS逛揩,所以需要下載帶有 Hadoop 支持的 Flink Binary 包柠傍。
(1)(可選)使用 Flink 自帶的腳本部署 Zookeeper
Flink 目前支持基于 Zookeeper 的 HA。如果你的集群中沒(méi)有部署 ZK辩稽,F(xiàn)link 提供了啟動(dòng) Zookeeper 集群的腳本惧笛。首先修改配置文件“conf/zoo.cfg”,根據(jù)你要部署的 Zookeeper Server 的機(jī)器數(shù)來(lái)配置“server.X=addressX:peerPort:leaderPort”逞泄,其中“X”是一個(gè) Zookeeper Server 的唯一 ID患整,且必須是數(shù)字拜效。
# The port at which the clients will connect
然后啟動(dòng) Zookeeper:
./bin/start-zookeeper-quorum.sh
jps 命令看到 Zookeeper 進(jìn)程已經(jīng)啟動(dòng):
停掉 Zookeeper 集群的命令:
./bin/stop-zookeeper-quorum.sh
(2)修改 Flink Standalone 集群的配置
修改 conf/masters 文件,增加一個(gè) JobManager:
$cat conf/masters
之前修改過(guò)的 conf/slaves 文件保持不變:
$cat conf/slaves
修改 conf/flink-conf.yaml 文件:
# 配置 high-availability mode
需要注意的是各谚,在 HA 模式下 conf/flink-conf.yaml 中的這兩個(gè)配置都失效了(想想為什么)紧憾。
jobmanager.rpc.address
修改完成后,確保配置同步到其他機(jī)器昌渤。
啟動(dòng) Zookeeper 集群:
./bin/start-zookeeper-quorum.sh
再啟動(dòng) Standalone 集群(要確保之前的 Standalone 集群已經(jīng)停掉):
./bin/start-cluster.sh
分別打開(kāi)兩個(gè) Master 節(jié)點(diǎn)上的 JobManager Web 頁(yè)面:
http://z05f06378.sqa.zth.tbsite.net:8081
http://z05c19426.sqa.zth.tbsite.net:8081
可以看到兩個(gè)頁(yè)面最后都轉(zhuǎn)到了同一個(gè)地址上赴穗,這個(gè)地址就是當(dāng)前主 JobManager 所在機(jī)器,另一個(gè)就是 Standby JobManager膀息。以上我們就完成了 Standalone 模式下 HA 的配置般眉。
接下來(lái)我們可以測(cè)試驗(yàn)證 HA 的有效性。當(dāng)我們知道主 JobManager 的機(jī)器后潜支,我們可以把主 JobManager 進(jìn)程 Kill 掉甸赃,比如當(dāng)前主 JobManager 在 z05c19426.sqa.zth.tbsite.net 這個(gè)機(jī)器上,就把這個(gè)進(jìn)程殺掉毁腿。
接著辑奈,再打開(kāi)這兩個(gè)鏈接:
http://z05f06378.sqa.zth.tbsite.net:8081
http://z05c19426.sqa.zth.tbsite.net:8081
可以發(fā)現(xiàn)后一個(gè)鏈接已經(jīng)不能展示了,而前一個(gè)鏈接可以展示已烤,說(shuō)明發(fā)生主備切換。
然后我們?cè)僦貑⑶耙淮蔚闹?JobManager:
./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081
再打開(kāi) (http://z05c19426.sqa.zth.tbsite.net:8081) 這個(gè)鏈接妓羊,會(huì)發(fā)現(xiàn)現(xiàn)在這個(gè)鏈接可以轉(zhuǎn)到 (http://z05f06378.sqa.zth.tbsite.net:8081) 這個(gè)頁(yè)面上了胯究。說(shuō)明這個(gè) JobManager 完成了一個(gè) Failover Recovery。
6. 使用 Yarn 模式跑 Flink job
圖 5
相對(duì)于 Standalone 模式躁绸,Yarn 模式允許 Flink job 的好處有:
資源按需使用裕循,提高集群的資源利用率
任務(wù)有優(yōu)先級(jí),根據(jù)優(yōu)先級(jí)運(yùn)行作業(yè)
基于 Yarn 調(diào)度系統(tǒng)净刮,能夠自動(dòng)化地處理各個(gè)角色的 Failover
JobManager 進(jìn)程和 TaskManager 進(jìn)程都由 Yarn NodeManager 監(jiān)控
如果 JobManager 進(jìn)程異常退出剥哑,則 Yarn ResourceManager 會(huì)重新調(diào)度 JobManager 到其他機(jī)器
如果 TaskManager 進(jìn)程異常退出,JobManager 會(huì)收到消息并重新向 Yarn ResourceManager 申請(qǐng)資源淹父,重新啟動(dòng) TaskManager
(1)在 Yarn 上啟動(dòng) Long Running 的 Flink 集群(Session Cluster 模式)
查看命令參數(shù):
./bin/yarn-session.sh -h
創(chuàng)建一個(gè) Yarn 模式的 Flink 集群:
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
其中用到的參數(shù)是:
-n,–container Number of TaskManagers
-jm,–jobManagerMemory Memory for JobManager Container with optional unit (default: MB)
-tm,–taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
-qu,–queue Specify YARN queue.
-s,–slots Number of slots per TaskManager
-t,–ship Ship files in the specified directory (t for transfer)
提交一個(gè) Flink job 到 Flink 集群:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
這次提交 Flink job,雖然沒(méi)有指定對(duì)應(yīng) Yarn application 的信息,卻可以提交到對(duì)應(yīng)的 Flink 集群茴丰,原因在于“/tmp/.yarn-properties-${user}”文件中保存了上一次創(chuàng)建 Yarn session 的集群信息召衔。所以如果同一用戶(hù)在同一機(jī)器上再次創(chuàng)建一個(gè) Yarn session,則這個(gè)文件會(huì)被覆蓋掉蘸际。
-
如果刪掉“/tmp/.yarn-properties-${user}”或者在另一個(gè)機(jī)器上提交作業(yè)能否提交到預(yù)期到 yarn session 中呢座哩?
可以配置了“high-availability.cluster-id”參數(shù),據(jù)此從 Zookeeper 上獲取到 JobManager 的地址和端口粮彤,從而提交作業(yè)根穷。
如果 Yarn session 沒(méi)有配置 HA姜骡,又該如何提交呢?
這個(gè)時(shí)候就必須要在提交 Flink job 的命令中指明 Yarn 上的 Application ID屿良,通過(guò)“-yid”參數(shù)傳入:
/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
我們可以發(fā)現(xiàn)圈澈,每次跑完任務(wù)不久,TaskManager 就被釋放了管引,下次在提交任務(wù)的時(shí)候士败,TaskManager 又會(huì)重新拉起來(lái)。如果希望延長(zhǎng)空閑 TaskManager 的超時(shí)時(shí)間褥伴,可以在 conf/flink-conf.yaml 文件中配置下面這個(gè)參數(shù)谅将,單位是 milliseconds:
slotmanager.taskmanager-timeout: 30000L # deprecated, used in release-1.5
(2)在 Yarn 上運(yùn)行單個(gè) Flink job(Job Cluster 模式)
如果你只想運(yùn)行單個(gè) Flink Job 后就退出,那么可以用下面這個(gè)命令:
./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
常用的配置有:
-yn,–yarncontainer Number of Task Managers
-yqu,–yarnqueue Specify YARN queue.
-ys,–yarnslots Number of slots per TaskManager
-yqu,–yarnqueue Specify YARN queue.
可以通過(guò) Help 命令查看 Run 的可用參數(shù):
./bin/flink run -h
我們可以看到重慢,“./bin/flink run -h”看到的“Options for yarn-cluster mode”中的“-y”和“–yarn”為前綴的參數(shù)其實(shí)和“./bin/yarn-session.sh -h”命令是一一對(duì)應(yīng)的饥臂,語(yǔ)義上也基本一致。
關(guān)于“-n”(在 yarn session 模式下)似踱、“-yn”在(yarn single job 模式下)與“-p”參數(shù)的關(guān)系:
“-n”和“-yn”在社區(qū)版本中(Release-1.5 ~ Release-1.7)中沒(méi)有實(shí)際的控制作用隅熙,實(shí)際的資源是根據(jù)“-p”參數(shù)來(lái)申請(qǐng)的,并且 TM 使用完后就會(huì)歸還
在 Blink 的開(kāi)源版本中核芽,“-n”(在 Yarn Session 模式下)的作用就是一開(kāi)始啟動(dòng)指定數(shù)量的 TaskManager囚戚,之后即使 Job 需要更多的 Slot,也不會(huì)申請(qǐng)新的 TaskManager
在 Blink 的開(kāi)源版本中轧简,Yarn single job 模式“-yn”表示的是初始 TaskManager 的數(shù)量驰坊,不設(shè)置 TaskManager 的上限。(需要特別注意的是哮独,只有加上“-yd”參數(shù)才能用 Single job 模式(例如:命令“./bin/flink run -yd -m yarn-cluster xxx”)
7. Yarn 模式下的 HighAvailability 配置
首先要確保啟動(dòng) Yarn 集群用的“yarn-site.xml”文件中的這個(gè)配置拳芙,這個(gè)是 Yarn 集群級(jí)別 AM 重啟的上限。
<property>
然后在 conf/flink-conf.yaml 文件中配置這個(gè) Flink job 的 JobManager 能夠重啟的次數(shù)皮璧。
yarn.application-attempts: 10 # 1+ 9 retries
最后再在 conf/flink-conf.yaml 文件中配置上 ZK 相關(guān)配置舟扎,這幾個(gè)配置的配置方法和 Standalone 的 HA 配置方法基本一致,如下所示悴务。
# 配置 high-availability mode
需要特別注意的是:“high-availability.cluster-id”這個(gè)配置最好去掉睹限,因?yàn)樵?Yarn(以及 Mesos)模式下,cluster-id 如果不配置的話(huà)惨寿,會(huì)配置成 Yarn 上的 Application ID 邦泄,從而可以保證唯一性。