Flink On Yarn 架構(gòu)
Paste_Image.png
前提條件
首先需要配置YARN_CONF_DIR, HADOOP_CONF_DIR ,HADOOP_CONF_PATH其中一個(gè)用來(lái)確保Flink能夠訪問(wèn)HDFS和Yarn的RM拱燃。
主要啟動(dòng)流程
1. 啟動(dòng)進(jìn)程
首先我們通過(guò)下面的命令行啟動(dòng)flink on yarn的集群
bin/yarn-session.sh -n 3 -jm 1024 -nm 1024 -st
這里將產(chǎn)生總共五個(gè)進(jìn)程
- ** 1個(gè)FlinkYarnSessionCli ---> Yarn Client **
- ** 1個(gè)YarnApplicationMasterRunner ---> AM + JobManager**
- 3個(gè)YarnTaskManager --> TaskManager
即一個(gè)客戶端+4個(gè)container,1個(gè)container啟動(dòng)AM,3個(gè)container啟動(dòng)TaskManager来惧。
2.啟動(dòng)流程
- FlinkYarnSessionCli 啟動(dòng)的過(guò)程中首先會(huì)檢查Yarn上有沒(méi)有足夠的資源去啟動(dòng)所需要的container洽蛀,如果有,則上傳一些flink的jar和配置文件到HDFS见妒,這里主要是啟動(dòng)AM進(jìn)程和TaskManager進(jìn)程的相關(guān)依賴jar包和配置文件孤荣。
- 2.接著yarn client會(huì)首先向RM申請(qǐng)一個(gè)container來(lái) ApplicationMaster(YarnApplicationMasterRunner進(jìn)程),然后RM會(huì)通知其中一個(gè)NM啟動(dòng)這個(gè)container须揣,被分配到啟動(dòng)AM的NM會(huì)首先去HDFS上下載第一步上傳的jar包和配置文件到本地盐股,接著啟動(dòng)AM;在這個(gè)過(guò)程中會(huì)啟動(dòng)JobManager返敬,因?yàn)镴obManager和AM在同一進(jìn)程里面,它會(huì)把JobManager的地址重新作為一個(gè)文件上傳到HDFS上去遂庄,TaskManager在啟動(dòng)的過(guò)程中也會(huì)去下載這個(gè)文件獲取JobManager的地址,然后與其進(jìn)行通信劲赠;AM還負(fù)責(zé)Flink的web 服務(wù)涛目,F(xiàn)link里面用到的都是隨機(jī)端口,這樣就允許了用戶能夠啟動(dòng)多個(gè)yarn session凛澎。
啟動(dòng)命令:
Paste_Image.png - 3.AM 啟動(dòng)完成以后霹肝,就會(huì)向AM申請(qǐng)container去啟動(dòng)TaskManager,啟動(dòng)的過(guò)程中也是首先從HDFS上去下載一些包含TaskManager(yarn模式的話這里就是YarnTaskManager )主類 的jar和啟動(dòng)過(guò)程依賴的配置文件,如JobManager地址所在的文件塑煎,然后利用java cp的方式去啟動(dòng)YarnTaskManager 沫换,一旦這些準(zhǔn)備好,就可以接受任務(wù)了最铁。這個(gè)和spark on yarn的yarn cluster模式其實(shí)差不多讯赏,也是分為兩個(gè)部分垮兑,一個(gè)是準(zhǔn)備工人和工具(spark是啟動(dòng)sc的過(guò)程,flink是初始化ENV的過(guò)程)漱挎,另外一個(gè)就是給工人分配具體工作(都是執(zhí)行具體的操作系枪,action什么的觸發(fā))。
啟動(dòng)命令:
Paste_Image.png
進(jìn)程信息
- ** FlinkYarnSessionCli **
/home/hadoop/ym/jdk1.8.0_101/bin/java -Xmx512m -classpath /home/hadoop/ym/flink-1.1.3/lib/flink-dist_2.10-1.1.3.jar:/home/hadoop/ym/flink-1.1.3/lib/flink-python_2.10-1.1.3.jar:/home/hadoop/ym/flink-1.1.3/lib/log4j-1.2.17.jar:/home/hadoop/ym/flink-1.1.3/lib/slf4j-log4j12-1.7.7.jar::/home/hadoop/ym/hadoop-2.7.1/etc/hadoop: -Dlog.file=/home/hadoop/ym/flink-1.1.3/log/flink-xxxuser-yarn-session-db-180.photo.163.org.log -Dlog4j.configuration=file:/home/hadoop/ym/flink-1.1.3/conf/log4j-yarn-session.properties -Dlogback.configurationFile=file:/home/hadoop/ym/flink-1.1.3/conf/logback-yarn.xml org.apache.flink.yarn.cli.FlinkYarnSessionCli -j /home/hadoop/ym/flink-1.1.3/lib/flink-dist_2.10-1.1.3.jar -n 3 -jm 1024 -nm 1024 -st - ** YarnApplicationMasterRunner **
/home/hadoop/ym/jdk1.8.0_101/bin/java -Xmx424M -Dlog.file=/home/hadoop/ym/hadoop-2.7.1/hadoop/nm/application_1480493133223_0009/container_1480493133223_0009_01_000001/jobmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnApplicationMasterRunner - **個(gè)YarnTaskManager **
/home/hadoop/ym/jdk1.8.0_101/bin/java -Xms424m -Xmx424m -XX:MaxDirectMemorySize=424m -Dlog.file=/home/hadoop/ym/hadoop-2.7.1/hadoop/nm/application_1480493133223_0009/container_1480493133223_0009_01_000003/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskManager --configDir .
后面我會(huì)從源代碼的角度看下啟動(dòng)流程