Flink --- 第三期

Apache Flink 零基礎(chǔ)入門(mén)(三):開(kāi)發(fā)環(huán)境搭建和應(yīng)用的配置官辽、部署及運(yùn)行

[天池大數(shù)據(jù)科研平臺(tái)](javascript:void(0);) 9月2日

文章轉(zhuǎn)載自公眾號(hào)

Ververica 泥张, 作者 沙晟陽(yáng)

本文是根據(jù) Apache Flink 系列直播課程整理而成劫笙,由阿里巴巴高級(jí)開(kāi)發(fā)工程師沙晟陽(yáng)分享,主要面向于初次接觸 Flink涕俗、或者對(duì) Flink 有了解但是沒(méi)有實(shí)際操作過(guò)的同學(xué)贡定。希望幫助大家更順利地上手使用 Flink闸衫,并著手相關(guān)開(kāi)發(fā)調(diào)試工作献丑。

主要內(nèi)容:

  • Flink 開(kāi)發(fā)環(huán)境的部署和配置

  • 運(yùn)行 Flink 應(yīng)用

  • 單機(jī) Standalone 模式

  • 多機(jī) Standalone 模式

  • Yarn 集群模式

一. Flink 開(kāi)發(fā)環(huán)境部署和配置

Flink 是一個(gè)以 Java 及 Scala 作為開(kāi)發(fā)語(yǔ)言的開(kāi)源大數(shù)據(jù)項(xiàng)目末捣,代碼開(kāi)源在 GitHub 上侠姑,并使用 Maven 來(lái)編譯和構(gòu)建項(xiàng)目创橄。對(duì)于大部分使用 Flink 的同學(xué)來(lái)說(shuō),Java莽红、Maven 和 Git 這三個(gè)工具是必不可少的妥畏,另外一個(gè)強(qiáng)大的 IDE 有助于我們更快的閱讀代碼、開(kāi)發(fā)新功能以及修復(fù) Bug安吁。因?yàn)槠拮硪希覀儾粫?huì)詳述每個(gè)工具的安裝細(xì)節(jié),但會(huì)給出必要的安裝建議鬼店。

關(guān)于開(kāi)發(fā)測(cè)試環(huán)境网棍,Mac OS、Linux 系統(tǒng)或者 Windows 都可以妇智。如果使用的是 Windows 10 系統(tǒng)滥玷,建議使用 Windows 10 系統(tǒng)的 Linux 子系統(tǒng)來(lái)編譯和運(yùn)行。

工具 注釋
Java Java 版本至少是 Java 8巍棱,且最好選用 Java 8u51 及以上版本
Maven 必須使用 Maven 3惑畴,建議使用 Maven 3.2.5。Maven 3.3.x 能夠編譯成功航徙,但是在 Shade 一些 Dependencies 的過(guò)程中有些問(wèn)題
Git Flink 的代碼倉(cāng)庫(kù)是: https://github.com/apache/flink

建議選用社區(qū)已發(fā)布的穩(wěn)定分支如贷,比如 Release-1.6 或者 Release-1.7。

1. 編譯 Flink 代碼

在我們配置好之前的幾個(gè)工具后到踏,編譯 Flink 就非常簡(jiǎn)單了杠袱,執(zhí)行如下命令即可:

mvn clean install -DskipTests

常用編譯參數(shù):

Dfast  主要是忽略QA plugins和JavaDocs的編譯

當(dāng)成功編譯完成后,能在當(dāng)前 Flink 代碼目錄下的 flink-dist/target/子目錄 中看到如下文件(不同的 Flink 代碼分支編譯出的版本號(hào)不同窝稿,這里的版本號(hào)是 Flink 1.5.1):

image

其中有三個(gè)文件可以留意一下:

版本 注釋
flink-1.5.1.tar.gz Binary 的壓縮包
flink-1.5.1-bin/flink-1.5.1 解壓后的 Flink binary 目錄
flink-dist_2.11-1.5.1.jar 包含 Flink 核心功能的 jar 包

注意: 國(guó)內(nèi)用戶(hù)在編譯時(shí)可能遇到編譯失敗“Build Failure”(且有 MapR 相關(guān)報(bào)錯(cuò))楣富,一般都和 MapR 相關(guān)依賴(lài)的下載失敗有關(guān),即使使用了推薦的 settings.xml 配置(其中 Aliyun Maven 源專(zhuān)門(mén)為 MapR 相關(guān)依賴(lài)做了代理)讹躯,還是可能出現(xiàn)下載失敗的情況菩彬。問(wèn)題主要和 MapR 的 Jar 包比較大有關(guān)。遇到這些問(wèn)題時(shí)潮梯,重試即可骗灶。在重試之前,要先根據(jù)失敗信息刪除 Maven local repository 中對(duì)應(yīng)的目錄秉馏,否則需要等待 Maven 下載的超時(shí)時(shí)間才能再次出發(fā)下載依賴(lài)到本地耙旦。

2. 開(kāi)發(fā)環(huán)境準(zhǔn)備

推薦使用 IntelliJ IDEA IDE 作為 Flink 的 IDE 工具。官方不建議使用 Eclipse IDE萝究,主要原因是 Eclipse 的 Scala IDE 和 Flink 用 Scala 的不兼容免都。

如果你需要做一些 Flink 代碼的開(kāi)發(fā)工作锉罐,則需要根據(jù) Flink 代碼的 tools/maven/目錄 下的配置文件來(lái)配置 Checkstyle ,因?yàn)?Flink 在編譯時(shí)會(huì)強(qiáng)制代碼風(fēng)格的檢查绕娘,如果代碼風(fēng)格不符合規(guī)范脓规,可能會(huì)直接編譯失敗。

二险领、運(yùn)行 Flink 應(yīng)用

1. 基本概念

運(yùn)行 Flink 應(yīng)用其實(shí)非常簡(jiǎn)單侨舆,但是在運(yùn)行 Flink 應(yīng)用之前,還是有必要了解 Flink 運(yùn)行時(shí)的各個(gè)組件绢陌,因?yàn)檫@涉及到 Flink 應(yīng)用的配置問(wèn)題挨下。圖 1 所示,這是用戶(hù)用 DataStream API 寫(xiě)的一個(gè)數(shù)據(jù)處理程序脐湾〕舭剩可以看到,在一個(gè) DAG 圖中不能被 Chain 在一起的 Operator 會(huì)被分隔到不同的 Task 中秤掌,也就是說(shuō) Task 是 Flink 中資源調(diào)度的最小單位愁铺。

image.png

圖 1 Parallel Dataflows

圖 2 所示,F(xiàn)link 實(shí)際運(yùn)行時(shí)包括兩類(lèi)進(jìn)程:

  • JobManager(又稱(chēng)為 JobMaster):協(xié)調(diào) Task 的分布式執(zhí)行机杜,包括調(diào)度 Task帜讲、協(xié)調(diào)創(chuàng) Checkpoint 以及當(dāng) Job failover 時(shí)協(xié)調(diào)各個(gè) Task 從 Checkpoint 恢復(fù)等。

  • TaskManager(又稱(chēng)為 Worker):執(zhí)行 Dataflow 中的 Tasks椒拗,包括內(nèi)存 Buffer 的分配似将、Data Stream 的傳遞等。


    Flink Runtime 架構(gòu)圖.png

圖 2 Flink Runtime 架構(gòu)圖

圖 3 所示蚀苛,Task Slot 是一個(gè) TaskManager 中的最小資源分配單位在验,一個(gè) TaskManager 中有多少個(gè) Task Slot 就意味著能支持多少并發(fā)的 Task 處理。需要注意的是堵未,一個(gè) Task Slot 中可以執(zhí)行多個(gè) Operator腋舌,一般這些 Operator 是能被 Chain 在一起處理的。

Process.png

2. 運(yùn)行環(huán)境準(zhǔn)備

  • 準(zhǔn)備 Flink binary

  • 直接從 Flink 官網(wǎng)上下載 Flink binary 的壓縮包

  • 或者從 Flink 源碼編譯而來(lái)

  • 安裝 Java渗蟹,并配置 JAVA_HOME 環(huán)境變量

3. 單機(jī) Standalone 的方式運(yùn)行 Flink

(1)基本的啟動(dòng)流程

最簡(jiǎn)單的運(yùn)行 Flink 應(yīng)用的方法就是以單機(jī) Standalone 的方式運(yùn)行块饺。

啟動(dòng)集群:

./bin/start-cluster.sh

打開(kāi) http://127.0.0.1:8081/ 就能看到 Flink 的 Web 界面。嘗試提交 Word Count 任務(wù):

./bin/flink run examples/streaming/WordCount.jar

大家可以自行探索 Web 界面中展示的信息雌芽,比如授艰,我們可以看看 TaskManager 的 stdout 日志,就可以看到 Word Count 的計(jì)算結(jié)果世落。

我們還可以嘗試通過(guò)“–input”參數(shù)指定我們自己的本地文件作為輸入淮腾,然后執(zhí)行:

./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}

停止集群:

./bin/stop-cluster.sh

(2)常用配置介紹

  • conf / slaves

conf / slaves 用于配置 TaskManager 的部署,默認(rèn)配置下只會(huì)啟動(dòng)一個(gè) TaskManager 進(jìn)程,如果想增加一個(gè) TaskManager 進(jìn)程的谷朝,只需要文件中追加一行“l(fā)ocalhost”洲押。

也可以直接通過(guò)“ ./bin/taskmanager.sh start ”這個(gè)命令來(lái)追加一個(gè)新的 TaskManager:

./bin/taskmanager.sh start|start-foreground|stop|stop-all
  • **conf/flink-conf.yaml **

用于配置 JM 和 TM 的運(yùn)行參數(shù),常用配置有:

# The heap size for the JobManager JVM

Standalone 集群?jiǎn)?dòng)后圆凰,我們可以嘗試分析一下 Flink 相關(guān)進(jìn)程的運(yùn)行情況杈帐。執(zhí)行 jps 命令,可以看到 Flink 相關(guān)的進(jìn)程主要有兩個(gè)送朱,一個(gè)是 JobManager 進(jìn)程娘荡,另一個(gè)是 TaskManager 進(jìn)程。我們可以進(jìn)一步用 ps 命令看看進(jìn)程的啟動(dòng)參數(shù)中“-Xmx”和“-Xms”的配置驶沼。然后我們可以嘗試修改 flink-conf.yaml 中若干配置,然后重啟 Standalone 集群看看發(fā)生了什么變化争群。

需要補(bǔ)充的是回怜,在 Blink 開(kāi)源分支上,TaskManager 的內(nèi)存計(jì)算上相對(duì)于現(xiàn)在的社區(qū)版本要更精細(xì)化换薄,TaskManager 進(jìn)程的堆內(nèi)存限制(-Xmx)一般的計(jì)算方法是:

TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb(默認(rèn)值為128MB)

而最新的 Flink 社區(qū)版本 Release-1.7 中 JobManager 和 TaskManager 默認(rèn)內(nèi)存配置方式為:

# The heap size for the JobManager JVM

Flink 社區(qū) Release-1.7 版本中的“taskmanager.heap.size”配置實(shí)際上指的不是 Java heap 的內(nèi)存限制玉雾,而是 TaskManager 進(jìn)程總的內(nèi)存限制。我們可以同樣用上述方法查看 Release-1.7 版本的 Flink binary 啟動(dòng)的 TaskManager 進(jìn)程的 -Xmx 配置轻要,會(huì)發(fā)現(xiàn)實(shí)際進(jìn)程上的 -Xmx 要小于配置的“taskmanager.heap.size”的值复旬,原因在于從中扣除了 Network buffer 用的內(nèi)存,因?yàn)?Network buffer 用的內(nèi)存一定是 Direct memory冲泥,所以不應(yīng)該算在堆內(nèi)存限制中驹碍。

(3)日志的查看和配置

JobManager 和 TaskManager 的啟動(dòng)日志可以在 Flink binary 目錄下的 Log 子目錄中找到。Log 目錄中以“flink-user?standalonesession?{id}-${hostname}”為前綴的文件對(duì)應(yīng)的是 JobManager 的輸出凡恍,其中有三個(gè)文件:

  • flink-user?standalonesession?{id}-${hostname}.log:代碼中的日志輸出

  • flink-user?standalonesession?{id}-${hostname}.out:進(jìn)程執(zhí)行時(shí)的 stdout 輸出

  • flink-user?standalonesession?{id}-${hostname}-gc.log:JVM 的 GC 的日志

Log 目錄中以“flink-user?taskexecutor?{id}-${hostname}”為前綴的文件對(duì)應(yīng)的是 TaskManager 的輸出志秃,也包括三個(gè)文件,和 JobManager 的輸出一致嚼酝。

日志的配置文件在 Flink binary 目錄的 conf 子目錄下浮还,其中:

  • log4j-cli.properties:用 Flink 命令行時(shí)用的 log 配置,比如執(zhí)行“ flink run”命令

  • log4j-yarn-session.properties:用 yarn-session.sh 啟動(dòng)時(shí)命令行執(zhí)行時(shí)用的 log 配置

  • log4j.properties:無(wú)論是 Standalone 還是 Yarn 模式闽巩,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties

這三個(gè)“l(fā)og4j.properties”文件分別有三個(gè)“l(fā)ogback.xml”文件與之對(duì)應(yīng)钧舌,如果想使用 Logback 的同學(xué),之需要把與之對(duì)應(yīng)的“l(fā)og4j.*properties”文件刪掉即可涎跨,對(duì)應(yīng)關(guān)系如下:

  • log4j-cli.properties -> logback-console.xml

  • log4j-yarn-session.properties -> logback-yarn.xml

  • log4j.properties -> logback.xml

需要注意的是洼冻,“flink-user?standalonesession?{id}-hostname”和“flink?{user}-taskexecutor-id?{hostname}”都帶有“id”,“{id}”表示本進(jìn)程在本機(jī)上該角色(JobManager 或 TaskManager)的所有進(jìn)程中的啟動(dòng)順序六敬,默認(rèn)從 0 開(kāi)始碘赖。

(4)進(jìn)一步探索

嘗試重復(fù)執(zhí)行“./bin/start-cluster.sh”命令,然后看看 Web 頁(yè)面(或者執(zhí)行 jps 命令),看看會(huì)發(fā)生什么普泡?可以嘗試看看啟動(dòng)腳本播掷,分析一下原因。接著可以重復(fù)執(zhí)行“./bin/stop-cluster.sh”撼班,每次執(zhí)行完后歧匈,看看會(huì)發(fā)生什么。

4. 多機(jī)部署 Flink Standalone 集群

部署前要注意的要點(diǎn):

  • 每臺(tái)機(jī)器上配置好 Java 以及 JAVA_HOME 環(huán)境變量

  • 每臺(tái)機(jī)器上部署的 Flink binary 的目錄要保證是同一個(gè)目錄

  • 如果需要用 HDFS砰嘁,需要配置 HADOOP_CONF_DIR 環(huán)境變量配置

根據(jù)你的集群信息修改 conf/masters 和 conf/slaves 配置件炉。

修改 conf/flink-conf.yaml 配置,注意要確保和 Masters 文件中的地址一致:

jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net

確保所有機(jī)器的 Flink binary 目錄中 conf 中的配置文件相同矮湘,特別是以下三個(gè):

conf/masters

然后啟動(dòng) Flink 集群:

./bin/start-cluster.sh

提交 WordCount 作業(yè):

./bin/flink run examples/streaming/WordCount.jar

上傳 WordCount 的 Input 文件:

hdfs dfs -copyFromLocal story /test_dir/input_dir/story

提交讀寫(xiě) HDFS 的 WordCount 作業(yè):

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

增加 WordCount 作業(yè)的并發(fā)度(注意輸出文件重名會(huì)提交失斦迕帷):

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20

5. Standalone 模式的 HighAvailability(HA)部署和配置

通過(guò)圖 2 Flink Runtime 架構(gòu)圖,我們可以看到 JobManager 是整個(gè)系統(tǒng)中最可能導(dǎo)致系統(tǒng)不可用的角色缅阳。如果一個(gè) TaskManager 掛了磕蛇,在資源足夠的情況下,只需要把相關(guān) Task 調(diào)度到其他空閑 TaskSlot 上十办,然后 Job 從 Checkpoint 中恢復(fù)即可秀撇。而如果當(dāng)前集群中只配置了一個(gè) JobManager,則一旦 JobManager 掛了向族,就必須等待這個(gè) JobManager 重新恢復(fù)呵燕,如果恢復(fù)時(shí)間過(guò)長(zhǎng),就可能導(dǎo)致整個(gè) Job 失敗件相。

因此如果在生產(chǎn)業(yè)務(wù)使用 Standalone 模式再扭,則需要部署配置 HighAvailability,這樣同時(shí)可以有多個(gè) JobManager 待命适肠,從而使得 JobManager 能夠持續(xù)服務(wù)霍衫。

Flink JobManager HA 示意圖.png

注意:

  • 如果想使用 Flink standalone HA 模式,需要確焙钛基于 Flink Release-1.6.1 及以上版本敦跌,因?yàn)檫@里社區(qū)有個(gè) bug 會(huì)導(dǎo)致這個(gè)模式下主 JobManager 不能正常工作。
  • 接下來(lái)的實(shí)驗(yàn)中需要用到 HDFS逛揩,所以需要下載帶有 Hadoop 支持的 Flink Binary 包柠傍。

(1)(可選)使用 Flink 自帶的腳本部署 Zookeeper

Flink 目前支持基于 Zookeeper 的 HA。如果你的集群中沒(méi)有部署 ZK辩稽,F(xiàn)link 提供了啟動(dòng) Zookeeper 集群的腳本惧笛。首先修改配置文件“conf/zoo.cfg”,根據(jù)你要部署的 Zookeeper Server 的機(jī)器數(shù)來(lái)配置“server.X=addressX:peerPort:leaderPort”逞泄,其中“X”是一個(gè) Zookeeper Server 的唯一 ID患整,且必須是數(shù)字拜效。

# The port at which the clients will connect

然后啟動(dòng) Zookeeper:

./bin/start-zookeeper-quorum.sh

jps 命令看到 Zookeeper 進(jìn)程已經(jīng)啟動(dòng):

image.png

停掉 Zookeeper 集群的命令:

./bin/stop-zookeeper-quorum.sh

(2)修改 Flink Standalone 集群的配置

修改 conf/masters 文件,增加一個(gè) JobManager:

$cat conf/masters

之前修改過(guò)的 conf/slaves 文件保持不變:

$cat conf/slaves

修改 conf/flink-conf.yaml 文件:

# 配置 high-availability mode

需要注意的是各谚,在 HA 模式下 conf/flink-conf.yaml 中的這兩個(gè)配置都失效了(想想為什么)紧憾。

jobmanager.rpc.address

修改完成后,確保配置同步到其他機(jī)器昌渤。

啟動(dòng) Zookeeper 集群:

./bin/start-zookeeper-quorum.sh

再啟動(dòng) Standalone 集群(要確保之前的 Standalone 集群已經(jīng)停掉):

./bin/start-cluster.sh

分別打開(kāi)兩個(gè) Master 節(jié)點(diǎn)上的 JobManager Web 頁(yè)面:

http://z05f06378.sqa.zth.tbsite.net:8081

http://z05c19426.sqa.zth.tbsite.net:8081

可以看到兩個(gè)頁(yè)面最后都轉(zhuǎn)到了同一個(gè)地址上赴穗,這個(gè)地址就是當(dāng)前主 JobManager 所在機(jī)器,另一個(gè)就是 Standby JobManager膀息。以上我們就完成了 Standalone 模式下 HA 的配置般眉。

接下來(lái)我們可以測(cè)試驗(yàn)證 HA 的有效性。當(dāng)我們知道主 JobManager 的機(jī)器后潜支,我們可以把主 JobManager 進(jìn)程 Kill 掉甸赃,比如當(dāng)前主 JobManager 在 z05c19426.sqa.zth.tbsite.net 這個(gè)機(jī)器上,就把這個(gè)進(jìn)程殺掉毁腿。

接著辑奈,再打開(kāi)這兩個(gè)鏈接:

http://z05f06378.sqa.zth.tbsite.net:8081

http://z05c19426.sqa.zth.tbsite.net:8081

可以發(fā)現(xiàn)后一個(gè)鏈接已經(jīng)不能展示了,而前一個(gè)鏈接可以展示已烤,說(shuō)明發(fā)生主備切換。

然后我們?cè)僦貑⑶耙淮蔚闹?JobManager:

./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081

再打開(kāi) (http://z05c19426.sqa.zth.tbsite.net:8081) 這個(gè)鏈接妓羊,會(huì)發(fā)現(xiàn)現(xiàn)在這個(gè)鏈接可以轉(zhuǎn)到 (http://z05f06378.sqa.zth.tbsite.net:8081) 這個(gè)頁(yè)面上了胯究。說(shuō)明這個(gè) JobManager 完成了一個(gè) Failover Recovery。

6. 使用 Yarn 模式跑 Flink job

Flink Yarn 部署流程圖.png

圖 5

相對(duì)于 Standalone 模式躁绸,Yarn 模式允許 Flink job 的好處有:

  • 資源按需使用裕循,提高集群的資源利用率

  • 任務(wù)有優(yōu)先級(jí),根據(jù)優(yōu)先級(jí)運(yùn)行作業(yè)

  • 基于 Yarn 調(diào)度系統(tǒng)净刮,能夠自動(dòng)化地處理各個(gè)角色的 Failover

  • JobManager 進(jìn)程和 TaskManager 進(jìn)程都由 Yarn NodeManager 監(jiān)控

  • 如果 JobManager 進(jìn)程異常退出剥哑,則 Yarn ResourceManager 會(huì)重新調(diào)度 JobManager 到其他機(jī)器

  • 如果 TaskManager 進(jìn)程異常退出,JobManager 會(huì)收到消息并重新向 Yarn ResourceManager 申請(qǐng)資源淹父,重新啟動(dòng) TaskManager

(1)在 Yarn 上啟動(dòng) Long Running 的 Flink 集群(Session Cluster 模式)

查看命令參數(shù):

./bin/yarn-session.sh -h

創(chuàng)建一個(gè) Yarn 模式的 Flink 集群:

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

其中用到的參數(shù)是:

  • -n,–container Number of TaskManagers

  • -jm,–jobManagerMemory Memory for JobManager Container with optional unit (default: MB)

  • -tm,–taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)

  • -qu,–queue Specify YARN queue.

  • -s,–slots Number of slots per TaskManager

  • -t,–ship Ship files in the specified directory (t for transfer)

提交一個(gè) Flink job 到 Flink 集群:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

這次提交 Flink job,雖然沒(méi)有指定對(duì)應(yīng) Yarn application 的信息,卻可以提交到對(duì)應(yīng)的 Flink 集群茴丰,原因在于“/tmp/.yarn-properties-${user}”文件中保存了上一次創(chuàng)建 Yarn session 的集群信息召衔。所以如果同一用戶(hù)在同一機(jī)器上再次創(chuàng)建一個(gè) Yarn session,則這個(gè)文件會(huì)被覆蓋掉蘸际。

  • 如果刪掉“/tmp/.yarn-properties-${user}”或者在另一個(gè)機(jī)器上提交作業(yè)能否提交到預(yù)期到 yarn session 中呢座哩?

    可以配置了“high-availability.cluster-id”參數(shù),據(jù)此從 Zookeeper 上獲取到 JobManager 的地址和端口粮彤,從而提交作業(yè)根穷。

  • 如果 Yarn session 沒(méi)有配置 HA姜骡,又該如何提交呢?

這個(gè)時(shí)候就必須要在提交 Flink job 的命令中指明 Yarn 上的 Application ID屿良,通過(guò)“-yid”參數(shù)傳入:

/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

我們可以發(fā)現(xiàn)圈澈,每次跑完任務(wù)不久,TaskManager 就被釋放了管引,下次在提交任務(wù)的時(shí)候士败,TaskManager 又會(huì)重新拉起來(lái)。如果希望延長(zhǎng)空閑 TaskManager 的超時(shí)時(shí)間褥伴,可以在 conf/flink-conf.yaml 文件中配置下面這個(gè)參數(shù)谅将,單位是 milliseconds:

slotmanager.taskmanager-timeout: 30000L         # deprecated, used in release-1.5

(2)在 Yarn 上運(yùn)行單個(gè) Flink job(Job Cluster 模式)

如果你只想運(yùn)行單個(gè) Flink Job 后就退出,那么可以用下面這個(gè)命令:

./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

常用的配置有:

  • -yn,–yarncontainer Number of Task Managers

  • -yqu,–yarnqueue Specify YARN queue.

  • -ys,–yarnslots Number of slots per TaskManager

  • -yqu,–yarnqueue Specify YARN queue.

可以通過(guò) Help 命令查看 Run 的可用參數(shù):

./bin/flink run -h

我們可以看到重慢,“./bin/flink run -h”看到的“Options for yarn-cluster mode”中的“-y”和“–yarn”為前綴的參數(shù)其實(shí)和“./bin/yarn-session.sh -h”命令是一一對(duì)應(yīng)的饥臂,語(yǔ)義上也基本一致。

關(guān)于“-n”(在 yarn session 模式下)似踱、“-yn”在(yarn single job 模式下)與“-p”參數(shù)的關(guān)系:

  • “-n”和“-yn”在社區(qū)版本中(Release-1.5 ~ Release-1.7)中沒(méi)有實(shí)際的控制作用隅熙,實(shí)際的資源是根據(jù)“-p”參數(shù)來(lái)申請(qǐng)的,并且 TM 使用完后就會(huì)歸還

  • 在 Blink 的開(kāi)源版本中核芽,“-n”(在 Yarn Session 模式下)的作用就是一開(kāi)始啟動(dòng)指定數(shù)量的 TaskManager囚戚,之后即使 Job 需要更多的 Slot,也不會(huì)申請(qǐng)新的 TaskManager

  • 在 Blink 的開(kāi)源版本中轧简,Yarn single job 模式“-yn”表示的是初始 TaskManager 的數(shù)量驰坊,不設(shè)置 TaskManager 的上限。(需要特別注意的是哮独,只有加上“-yd”參數(shù)才能用 Single job 模式(例如:命令“./bin/flink run -yd -m yarn-cluster xxx”)

7. Yarn 模式下的 HighAvailability 配置

首先要確保啟動(dòng) Yarn 集群用的“yarn-site.xml”文件中的這個(gè)配置拳芙,這個(gè)是 Yarn 集群級(jí)別 AM 重啟的上限。

     <property>

然后在 conf/flink-conf.yaml 文件中配置這個(gè) Flink job 的 JobManager 能夠重啟的次數(shù)皮璧。

yarn.application-attempts: 10     # 1+ 9 retries

最后再在 conf/flink-conf.yaml 文件中配置上 ZK 相關(guān)配置舟扎,這幾個(gè)配置的配置方法和 Standalone 的 HA 配置方法基本一致,如下所示悴务。

# 配置 high-availability mode

需要特別注意的是:“high-availability.cluster-id”這個(gè)配置最好去掉睹限,因?yàn)樵?Yarn(以及 Mesos)模式下,cluster-id 如果不配置的話(huà)惨寿,會(huì)配置成 Yarn 上的 Application ID 邦泄,從而可以保證唯一性。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末裂垦,一起剝皮案震驚了整個(gè)濱河市顺囊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蕉拢,老刑警劉巖特碳,帶你破解...
    沈念sama閱讀 211,123評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诚亚,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡午乓,警方通過(guò)查閱死者的電腦和手機(jī)站宗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)益愈,“玉大人梢灭,你說(shuō)我怎么就攤上這事≌羝洌” “怎么了敏释?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,723評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)摸袁。 經(jīng)常有香客問(wèn)我钥顽,道長(zhǎng),這世上最難降的妖魔是什么靠汁? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,357評(píng)論 1 283
  • 正文 為了忘掉前任蜂大,我火速辦了婚禮,結(jié)果婚禮上蝶怔,老公的妹妹穿的比我還像新娘奶浦。我一直安慰自己,他們只是感情好踢星,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,412評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布财喳。 她就那樣靜靜地躺著,像睡著了一般斩狱。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上扎瓶,一...
    開(kāi)封第一講書(shū)人閱讀 49,760評(píng)論 1 289
  • 那天所踊,我揣著相機(jī)與錄音,去河邊找鬼概荷。 笑死秕岛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的误证。 我是一名探鬼主播继薛,決...
    沈念sama閱讀 38,904評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼愈捅!你這毒婦竟也來(lái)了遏考?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,672評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蓝谨,失蹤者是張志新(化名)和其女友劉穎灌具,沒(méi)想到半個(gè)月后青团,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡咖楣,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,456評(píng)論 2 325
  • 正文 我和宋清朗相戀三年督笆,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诱贿。...
    茶點(diǎn)故事閱讀 38,599評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡娃肿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出珠十,到底是詐尸還是另有隱情料扰,我是刑警寧澤,帶...
    沈念sama閱讀 34,264評(píng)論 4 328
  • 正文 年R本政府宣布宵睦,位于F島的核電站记罚,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏壳嚎。R本人自食惡果不足惜桐智,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,857評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望烟馅。 院中可真熱鬧说庭,春花似錦、人聲如沸郑趁。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,731評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)寡润。三九已至捆憎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間梭纹,已是汗流浹背躲惰。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,956評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留变抽,地道東北人础拨。 一個(gè)月前我還...
    沈念sama閱讀 46,286評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像绍载,于是被迫代替她去往敵國(guó)和親诡宗。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,465評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容