Flink on YARN Deployment Mode

Background

Flink is a high-performance, high-throughput, low-latency stream processing framework. It is more than a streaming framework: it also unifies batch processing (in Flink, a batch job is simply a special case of a streaming job). This architecture avoids the cumbersome component stacking of traditional big data platforms and lets the same code run as either a batch or a streaming job without modification. Flink supports several deployment modes: local, standalone, YARN, and Kubernetes. Because most enterprises already use YARN as the resource manager of their big data platform, many choose Flink on YARN for ease of management. With the rise of container clouds, some enterprises instead use Kubernetes as the overall resource manager and deploy Flink on K8s. This article focuses on how Flink on YARN is used in enterprises today.

Overview of the Flink on YARN Interaction Process

The overall interaction of Flink on YARN is shown in the diagram below:

For Flink to run on YARN, it must be able to find the Hadoop configuration, because it needs to connect to YARN's ResourceManager and to HDFS. The Hadoop configuration is located with the following strategy:

  • 1. Flink checks whether YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_CONF_PATH is set, in that order. If one of them is set, the configuration is read from that directory.

  • 2. If none of these environment variables is set, the HADOOP_HOME environment variable is used. For Hadoop 2 the configuration path is $HADOOP_HOME/etc/hadoop; for Hadoop 1 it is $HADOOP_HOME/conf.
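
For example, you can point the session client at the Hadoop configuration explicitly before starting it. This is only a minimal sketch; /etc/hadoop/conf is an assumed path, so replace it with your cluster's actual configuration directory:

export HADOOP_CONF_DIR=/etc/hadoop/conf    # assumed location of core-site.xml / yarn-site.xml
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024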

Whenever a new Flink YARN session is created, the client first checks whether the requested resources (containers and memory) are available. It then uploads the Flink jars and configuration to HDFS.

Next, the client asks the ResourceManager for a YARN container in which to start the ApplicationMaster. Because the client has already registered the configuration and jar files as resources of that container, the NodeManager prepares the container directly (for example, by downloading the files). Once this is done, the AM is started.

The JobManager and the AM run in the same container. As soon as it has been created, the AM knows the JobManager's address. It generates a new Flink configuration file for the TaskManagers that are about to be started, and this file is also uploaded to HDFS. The AM's container additionally serves Flink's web interface. The ports requested by the YARN code are ephemeral, so that users can start multiple Flink YARN sessions in parallel.

Finally, the AM requests containers in which to start the Flink TaskManagers; these containers download the jar files and the modified configuration from HDFS. Once these steps are complete, Flink is ready to accept jobs.
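
If you want to check what YARN has available yourself before starting a session, the session CLI offers a query option (it also appears in the help output further below); a minimal sketch:

# Display available YARN resources (memory, cores) without starting a session
./bin/yarn-session.sh -q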

Submitting Flink Jobs

Thanks to Flink's flexibility and out-of-the-box design, there are two ways to submit a job:

  • yarn session

  • Flink run

The two approaches differ greatly in how they use the resources of an existing big data platform:

  • 1. The first, yarn session (Start a long-running Flink cluster on YARN), starts a cluster first and then submits jobs to it. It requests a block of space from YARN, and those resources then stay fixed. If the resources are fully used, the next job cannot be submitted; it has to wait until one of the jobs on YARN finishes and releases its resources.

  • 2. The second, Flink run, submits a Flink job directly on YARN (Run a Flink job on YARN). The advantage is that each submission corresponds to one job: every job requests resources from YARN according to its own needs and does not affect the next job, unless YARN has no resources left at all.

The following diagram summarizes these two approaches:

Note: for everyday local testing and development the first approach works well; for production the second approach is recommended. Example commands for both styles are sketched right below.
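
For quick reference, a minimal sketch of each submission style (both are demonstrated with full logs later in this article):

# yarn session: start a long-running cluster first, then submit jobs to it
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
./bin/flink run ./examples/batch/WordCount.jar

# flink run: request a dedicated per-job cluster directly on YARN
./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar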

When deploying Flink on YARN, no changes to the Flink configuration are required; simply extract the distribution and copy it to the relevant nodes. However, if you want a highly available setup, you do need to adjust the corresponding parameters in FLINK_HOME/conf/flink-conf.yaml.
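
As a minimal sketch, the ZooKeeper-based HA entries in flink-conf.yaml can look like the following; the ZooKeeper quorum and HDFS path here are assumptions and must be adapted to your environment:

# ZooKeeper-based high availability (illustrative values)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
# allow the YARN ApplicationMaster to be restarted on failure
yarn.application-attempts: 10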

For Flink on YARN we do not need to configure masters and slaves under conf, because the number of TaskManagers is specified with the -n parameter. After Flink on YARN has started, in detached mode you will see only a single YarnSessionClusterEntrypoint process across the nodes; in client mode there are two processes, a YarnSessionClusterEntrypoint and a FlinkYarnSessionCli.

Deploying a Flink YARN Session

When starting a cluster with yarn session, there are two ways to launch it:

  • Client mode

  • Detached mode

Client Mode

By default you can simply run bin/yarn-session.sh; the default startup configuration is:

{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}

If you need to customize the configuration, you can view the available parameters with:

bin/yarn-session.sh --help
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
                                     as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
Description of the yarn-session parameters:
  -n:  number of TaskManagers to allocate;
  -d:  run in detached mode;
  -id: attach to a running YARN session by application ID;
  -j:  path to the Flink jar file;
  -jm: memory for the JobManager container (default unit: MB);
  -nl: YARN node label for the YARN application;
  -nm: custom name for the application on YARN;
  -q:  display available YARN resources (memory, cores);
  -qu: YARN queue to submit to;
  -s:  number of slots per TaskManager;
  -st: start Flink in streaming mode;
  -tm: memory per TaskManager container (default unit: MB);
  -z:  namespace for the ZooKeeper sub-paths used in high-availability mode;

To start a yarn-session with two TaskManagers, 1 GB of memory for the JobManager, and 1 GB of memory per TaskManager, the command looks like this:

./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024

Processes after startup:

[iknow@data-hadoop-50-63 ~]$ jps
186144 SecondaryNameNode
301825 ResourceManager
185845 DataNode
76966 Worker
457498 Jps
302171 NodeManager
457097 FlinkYarnSessionCli
185597 NameNode
[iknow@data-hadoop-50-64 ~]$ jps
269396 Jps
248059 NodeManager
39624 Worker
246509 DataNode
[iknow@data-hadoop-50-64 ~]$ jps
269697 Jps
248059 NodeManager
39624 Worker
269576 YarnSessionClusterEntrypoint
246509 DataNode

By default the system uses the configuration in conf/flink-conf.yaml. Flink on YARN overrides several of its parameters: jobmanager.rpc.address, because where the JobManager runs in the cluster is not known in advance (as mentioned above, it is the AM's address); taskmanager.tmp.dirs, which uses the temporary directories provided by YARN; and parallelism.default, which is overridden if the number of slots is specified on the command line.

If you want conf/flink-conf.yaml to remain only the global default configuration and give each yarn-session.sh you launch its own settings, consider using the -D option.
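
For example, a session-specific override might look like this; a minimal sketch where the chosen property values are only illustrative, using the dynamic-property form of the -D option shown in the help output above:

# keep flink-conf.yaml as the global default and override two properties for this session only
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2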

The log output is as follows:

[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
2019-02-22 11:26:54,048 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, 192.168.50.63
2019-02-22 11:26:54,049 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-02-22 11:26:54,050 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-02-22 11:26:54,050 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-02-22 11:26:54,050 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-02-22 11:26:54,050 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-02-22 11:26:54,051 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 18081
2019-02-22 11:26:54,392 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-02-22 11:26:54,450 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to iknow (auth:SIMPLE)
2019-02-22 11:26:54,506 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2019-02-22 11:26:54,606 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument n is deprecated in will be ignored.
2019-02-22 11:26:54,713 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=1}
2019-02-22 11:26:55,023 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/xxx/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-02-22 11:26:56,027 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1550804667415_0004
2019-02-22 11:26:56,058 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1550804667415_0004
2019-02-22 11:26:56,058 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-02-22 11:26:56,060 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-02-22 11:26:59,340 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-02-22 11:26:59,652 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on data-hadoop-50-63.bjrs.xxx.com:37730 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://data-hadoop-50-63.bjrs.xxx.com:37730

The JobManager Web Interface (http://data-hadoop-50-63.bjrs.xxx.com:37730) is the JobManager UI. Checking the applications on YARN shows that application 0004 stays in the RUNNING state.

Clicking ApplicationMaster, you can see that one job has completed.

That job was submitted with the command below. Before running it, upload the LICENSE file from the Flink directory to HDFS:

./bin/flink run ./examples/batch/WordCount.jar -input hdfs://192.168.50.63:9000/LICENSE -output hdfs://192.168.50.63:9000/wordcount-result_1.txt

The run log is as follows:

[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://192.168.50.63:9000/LICENSE -output hdfs://192.168.50.63:9000/wordcount-result_1.txt
2019-02-22 11:36:58,256 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-02-22 11:36:58,256 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-02-22 11:36:58,504 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 2
2019-02-22 11:36:58,504 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 2
YARN properties set default parallelism to 2
2019-02-22 11:36:58,543 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2019-02-22 11:36:58,633 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-02-22 11:36:58,633 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-02-22 11:36:58,699 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'data-hadoop-50-63.bjrs.xxx.com' and port '37730' from supplied application id 'application_1550804667415_0004'
Starting execution of program
Program execution finished
Job with JobID 6d1a05cfd324111904bb1749c50ef5d6 has finished.
Job Runtime: 9851 ms

Click into this job in the Flink UI:

In client mode you can start multiple yarn sessions; each yarn session corresponds to one JobManager, and jobs are submitted to it as needed. Multiple Flink jobs can be submitted to the same session. To stop a Flink YARN application, use the yarn application -kill command:

[iknow@data-hadoop-50-63 ~]$ yarn application -kill application_1550836652097_0002

Check the processes again after killing the application:

[iknow@data-hadoop-50-63 ~]$ jps
1809 Jps
186144 SecondaryNameNode
301825 ResourceManager
185845 DataNode
76966 Worker
302171 NodeManager
185597 NameNode
[iknow@data-hadoop-50-64 ~]$ jps
269842 Jps
248059 NodeManager
39624 Worker
246509 DataNode

Detached Mode

Unlike client mode, detached mode does not allow multiple yarn sessions; if you start more than one, the later sessions stay in a waiting state. There can be only one JobManager, and multiple Flink jobs can be submitted to the same session. To stop the Flink YARN application you need the yarn application -kill command. The -d flag selects detached mode, meaning the client is no longer part of the YARN cluster once the Flink YARN session has been started.

When starting yarn-session you can pass the -nm parameter to give the yarn-session a name, for example:

bin/yarn-session.sh -nm yarn-session_test

To stop this yarn-session you then have to use the YARN command yarn application -kill <appid>:

[iknow@data-hadoop-50-63 ~]$ yarn application -kill application_1550836652097_0007
19/02/24 09:13:11 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
Killing application application_1550836652097_0006
19/02/24 09:13:12 INFO impl.YarnClientImpl: Killed application application_1550836652097_0006

Start a detached session with the custom name test3:

./bin/yarn-session.sh -nm test3 -d

The log output is as follows:
[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/yarn-session.sh -nm test3 -d
2019-02-24 17:31:40,471 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, 192.168.50.63
2019-02-24 17:31:40,473 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-02-24 17:31:40,473 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-02-24 17:31:40,473 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-02-24 17:31:40,473 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-02-24 17:31:40,473 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-02-24 17:31:40,474 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 18081
2019-02-24 17:31:40,482 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-02-24 17:31:40,843 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-02-24 17:31:40,901 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to iknow (auth:SIMPLE)
2019-02-24 17:31:40,954 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2019-02-24 17:31:41,136 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-02-24 17:31:41,439 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/xxx/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-02-24 17:31:42,007 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1550836652097_0014
2019-02-24 17:31:42,038 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1550836652097_0014
2019-02-24 17:31:42,038 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-02-24 17:31:42,039 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-02-24 17:31:45,560 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-02-24 17:31:45,560 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1550836652097_0014
Please also note that the temporary files of the YARN session in the home directory will not be removed.
2019-02-24 17:31:45,864 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on data-hadoop-50-63.bj.xxxcom:42513 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://data-hadoop-50-63.bj.xxx.com:42513
2019-02-24 17:31:45,879 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1550836652097_0014

Submitting with Flink run

The yarn session approach described above requires starting a cluster first and then submitting jobs to it. With Flink run, submission is simpler: there is no need to start a cluster beforehand; you submit the job directly and it runs to completion on its own cluster.

Description of the flink run parameters:

  -c: the class with the program entry point; required only if it is not specified in the jar's manifest;
  -m: address of the JobManager (master) to connect to; use this flag to connect to a JobManager other than the one in the configuration, or pass yarn-cluster to start a per-job cluster on YARN;
  -p: parallelism of the program; overrides the default value in the configuration;
  -n: allow skipping savepoint state that cannot be restored; needed if an operator that was part of the program when the savepoint was triggered has since been removed;
  -q: if present, suppress logging output to standard out;
  -s: path to a savepoint from which to restore the job (for example hdfs:///flink/savepoint-1537);
Any remaining resource parameters that were not set on a yarn-session can be passed by prefixing the corresponding yarn-session parameter with "y" (for example -yn, -yjm, -ytm); they are not repeated here. For example:
./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

The log output is as follows:

[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
2019-02-21 19:54:06,718 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2019-02-21 19:54:06,815 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-02-21 19:54:06,815 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-02-21 19:54:06,822 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-02-21 19:54:06,822 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-02-21 19:54:06,931 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-02-21 19:54:07,231 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/xxx/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-02-21 19:54:10,039 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1550749966977_0002
2019-02-21 19:54:10,400 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1550749966977_0002
2019-02-21 19:54:10,400 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-02-21 19:54:10,403 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-02-21 19:54:14,451 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
(bear,3)
(bodkin,1)
(bourn,1)
(but,1)
(by,2)
(calamity,1)
(cast,1)
(coil,1)
(come,1)
(conscience,1)
(consummation,1)
(contumely,1)
(country,1)
(cowards,1)
(currents,1)
(d,4)
(death,2)
(delay,1)
(despis,1)
(devoutly,1)
(die,2)
(does,1)
(dread,1)
(dream,1)
(dreams,1)
(end,2)
(enterprises,1)
(er,1)
(fair,1)
(fardels,1)
(flesh,1)
(fly,1)
(for,2)
(fortune,1)
(from,1)
(give,1)
(great,1)
(grunt,1)
(have,2)
(he,1)
(heartache,1)
(heir,1)
(himself,1)
(his,1)
(hue,1)
(ills,1)
(in,3)
(insolence,1)
(is,3)
(know,1)
(law,1)
(life,2)
(long,1)
(lose,1)
(love,1)
(make,2)
(makes,2)
(man,1)
(may,1)
(merit,1)
(might,1)
(mind,1)
(moment,1)
(more,1)
(mortal,1)
(must,1)
(my,1)
(name,1)
(native,1)
(natural,1)
(no,2)
(nobler,1)
(not,2)
(now,1)
(nymph,1)
(o,1)
(of,15)
(off,1)
(office,1)
(ophelia,1)
(opposing,1)
(oppressor,1)
(or,2)
(orisons,1)
(others,1)
(outrageous,1)
(pale,1)
(pangs,1)
(patient,1)
(pause,1)
(perchance,1)
(pith,1)
(proud,1)
(puzzles,1)
(question,1)
(quietus,1)
(rather,1)
(regard,1)
(remember,1)
(resolution,1)
(respect,1)
(returns,1)
(rub,1)
(s,5)
(say,1)
(scorns,1)
(sea,1)
(shocks,1)
(shuffled,1)
(sicklied,1)
(sins,1)
(sleep,5)
(slings,1)
(so,1)
(soft,1)
(something,1)
(spurns,1)
(suffer,1)
(sweat,1)
(take,1)
(takes,1)
(than,1)
(that,7)
(the,22)
(their,1)
(them,1)
(there,2)
(these,1)
(this,2)
(those,1)
(thought,1)
(thousand,1)
(thus,2)
(thy,1)
(time,1)
(tis,2)
(to,15)
(traveller,1)
(troubles,1)
(turn,1)
(under,1)
(undiscover,1)
(unworthy,1)
(us,3)
(we,4)
(weary,1)
(what,1)
(when,2)
(whether,1)
(whips,1)
(who,2)
(whose,1)
(will,1)
(wish,1)
(with,3)
(would,2)
(wrong,1)
(you,1)
Program execution finished
Job with JobID 8fb799387976bc6426b3ebdcf1e80dfd has finished.
Job Runtime: 8027 ms
Accumulator Results:
- 464ea1424ef5011784a0d0cbc837baba (java.util.ArrayList) [170 elements]

The Flink job also shows up on YARN:


The application is running; checking again after a while, the final status is SUCCEEDED.
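
If you prefer the command line to the web UI, the same information can be queried with YARN's own CLI; a minimal sketch, using the application ID that YARN assigned to this run:

# Query the YARN report for the application (state and final status)
yarn application -status application_1550749966977_0002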


On 192.168.50.63, test reading input from HDFS and writing the result back to HDFS:

[iknow@data-hadoop-50-63 flink-1.7.2]$ hdfs dfs -put LICENSE /
[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024  ./examples/batch/WordCount.jar -input hdfs://192.168.50.63:9000/LICENSE -output hdfs://192.168.50.63:9000/wordcount-result.txt

The run log is as follows:

[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024  ./examples/batch/WordCount.jar -input hdfs://192.168.50.63:9000/LICENSE -output hdfs://192.168.50.63:9000/wordcount-result.txt
2019-02-22 11:20:26,060 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2019-02-22 11:20:26,154 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-02-22 11:20:26,154 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-02-22 11:20:26,161 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-02-22 11:20:26,161 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-02-22 11:20:26,273 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-02-22 11:20:26,581 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/xxx/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-02-22 11:20:28,005 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1550804667415_0003
2019-02-22 11:20:28,028 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1550804667415_0003
2019-02-22 11:20:28,028 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-02-22 11:20:28,030 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-02-22 11:20:31,563 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
Starting execution of program
Program execution finished
Job with JobID 16e83cc5a60f9cd82ed39d72bcc62110 has finished.
Job Runtime: 8281 ms

Checking the file wordcount-result.txt shows that it contains data.
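
One way to verify this from the shell, assuming the result was written as a single file (parallelism 1), is a minimal sketch like:

# Print the first lines of the result file the job wrote to HDFS
hdfs dfs -cat hdfs://192.168.50.63:9000/wordcount-result.txt | head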


Running a Job on a Specific yarn-session

You can specify a YARN application ID to run a job on a particular yarn-session.

First, look at the help for the ./bin/flink run command:

iknow@search-aa-4-59:~/xxx/flink-pangu $ ./bin/flink run --help
2019-02-26 20:32:50,218 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-02-26 20:32:50,218 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-02-26 20:32:50,540 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-02-26 20:32:50,542 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-02-26 20:32:50,543 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main" method or "getPlan()" method.
                                          Only needed if the JAR file does not
                                          specify the class in its manifest.
     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader  on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.
     -d,--detached                        If present, runs the job in detached
                                          mode
     -n,--allowNonRestoredState           Allow to skip savepoint state that
                                          cannot be restored. You need to allow
                                          this if you removed an operator from
                                          your program that was part of the
                                          program when the savepoint was
                                          triggered.
     -p,--parallelism <parallelism>       The parallelism with which to run the
                                          program. Optional flag to override the
                                          default value specified in the
                                          configuration.
     -q,--sysoutLogging                   If present, suppress logging output to
                                          standard out.
     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                          from (for example
                                          hdfs:///flink/savepoint-1537).
     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
                                          mode, perform a best-effort cluster
                                          shutdown when the CLI is terminated
                                          abruptly, e.g., in response to a user
                                          interrupt, such as typing Ctrl + C.
  Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached
                                          mode
     -m,--jobmanager <arg>                Address of the JobManager (master) to
                                          which to connect. Use this flag to
                                          connect to a different JobManager than
                                          the one specified in the
                                          configuration.
     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
                                          mode, perform a best-effort cluster
                                          shutdown when the CLI is terminated
                                          abruptly, e.g., in response to a user
                                          interrupt, such as typing Ctrl + C.
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached
                                          mode (deprecated; use non-YARN
                                          specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                          optional unit (default: MB)
     -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                          (=Number of Task Managers)
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                          application
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yst,--yarnstreaming                 Start Flink in streaming mode
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                          optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                          sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode

The -yid option (-yid,--yarnapplicationId <arg>: Attach to running YARN session) lets you run a job on a specific yarn-session.

Here we run the job on the yarn-session whose ID is application_1550579025929_62420:

./bin/flink run -yid application_1550579025929_62420 ./examples/batch/WordCount.jar -input hdfs://data-hadoop-112-16.bj.xxx.com:8020/flume/events-.1539684881482 -output hdfs://data-hadoop-112-16.bj.xxx.com:8020/flink/flink-test02.txt

The run log is as follows:

iknow@search-aa-4-59:~/xxx/flink-pangu $ ./bin/flink run -yid application_1550579025929_62420 ./examples/batch/WordCount.jar -input hdfs://data-hadoop-xx-xx.bj.xxx.com:8020/flume/events-.1539684881482 -output hdfs://data-hadoop-112-16.bj.xxx.com:8020/flink/flink-test02.txt
2019-02-26 20:33:48,393 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-02-26 20:33:48,393 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-02-26 20:33:48,723 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-02-26 20:33:48,725 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-02-26 20:33:48,726 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
2019-02-26 20:33:49,080 INFO  org.apache.hadoop.yarn.client.AHSProxy                        - Connecting to Application History server at data-hadoop-112-16.bjrs.xxx.com/192.168.112.16:10200
2019-02-26 20:33:49,094 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-02-26 20:33:49,094 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-02-26 20:33:49,105 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Looking for the active RM in [rm1, rm2]...
2019-02-26 20:33:49,265 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Found active RM [rm1]
2019-02-26 20:33:49,272 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'search-as-107-45.bj.xxx.com' and port '52901' from supplied application id 'application_1550579025929_62420'
Starting execution of program
2019-02-26 20:33:49,754 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-02-26 20:33:49,756 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-02-26 20:33:49,757 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
2019-02-26 20:33:50,108 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Program execution finished
Job with JobID 7331813a31914c4009493de57cc6e7e2 has finished.
Job Runtime: 26410 ms

The JobManager's web UI shows the job with JobID 7331813a31914c4009493de57cc6e7e2.


The file flink-test02.txt has also been created on HDFS.

