Spark on Yarn
首先這部分分為源碼部分以及實(shí)例部分钞楼,例子中包括最基本的通過spark-submit提交以及程序中提交yarn
這里僅僅說明Spark on Yarn的第一部分匀归,分為三塊:
- 原理
- spark-submit提交yarn程序
- IDEA代碼提交yarn程序
1 原理
Spark yarn 模式有兩種衡未, yarn-client, yarn-cluster, 其中yarn-client適合測(cè)試環(huán)境歇式, yarn-cluster適合生產(chǎn)環(huán)境。
在詳細(xì)說明Yarn模式之前孩等, 需要先了解幾個(gè)名詞
ResourceManager: 整個(gè)集群只有一個(gè), 負(fù)責(zé)集群資源的統(tǒng)一管理和調(diào)度璃哟, 因?yàn)檎麄€(gè)集群只有一個(gè),所以也有單點(diǎn)問題喊递,
NodeManager: 它可以理解為集群中的每一臺(tái)slave
AM: application master, 對(duì)于每一個(gè)應(yīng)用程序都有一個(gè)AM, AM主要是向RM申請(qǐng)資源(資源其實(shí)就是Container随闪, 目前這個(gè)Container就是cpu cores, memory), 然后在每個(gè)NodeManager上啟動(dòng)Executors(進(jìn)一步分布資源給內(nèi)部任務(wù)), 監(jiān)控跟蹤應(yīng)用程序的進(jìn)程等骚勘。
這里就引入了YARN的調(diào)度框架問題: 雙層調(diào)度框架
(1)RM統(tǒng)一管理集群資源铐伴,分配資源給AM
(2)AM將資源進(jìn)一步分配給Tasks
1.1 Yarn-cluster模式
下面來具體說說Spark Yarn Cluster的流程:
(1) Client端啟動(dòng)應(yīng)用程序,提交APP到Y(jié)ARN RM
(2)RM收到請(qǐng)求之后俏讹, 就會(huì)在集群中隨機(jī)選擇一個(gè)NM盛杰, 為該應(yīng)用程序分配第一個(gè)Container, 然后在這個(gè)Contaiiner上啟動(dòng)AM藐石,AM則實(shí)現(xiàn)了SC等的初始化
(3)AM啟動(dòng)時(shí)會(huì)向RM注冊(cè)即供,并向RM申請(qǐng)資源
(4)AM一旦申請(qǐng)到資源也就是Container之后, 會(huì)在對(duì)應(yīng)的Container(Container信息里面會(huì)包含NM節(jié)點(diǎn)信息)啟動(dòng)Executor
(5)AM的SC會(huì)分配任務(wù)及給Executor進(jìn)行執(zhí)行(之前Executor會(huì)去向AM中的SC注冊(cè))于微, 同時(shí)EXecutor會(huì)向AM匯報(bào)運(yùn)行的狀態(tài)和進(jìn)度逗嫡,也就是上面綠色的通信
(6)AM向RM注冊(cè)之后, AM會(huì)定時(shí)向RM匯報(bào)程序的運(yùn)行狀態(tài)等信息株依,也就是上面紅色部分的通信驱证。
1.2 Yarn-client
其具體流程和上面的yarn-cluster很類似
(1) Client端啟動(dòng)應(yīng)用程序,提交APP到Y(jié)ARN RM恋腕, 這個(gè)過程則涉及到SC的初始化抹锄,SC啟動(dòng)時(shí)會(huì)去初始化DAGScheduler調(diào)度器, 使用反射方法去初始化YarnScheduler 和 YarnClientSchedulerBackend荠藤,最終Client會(huì)去向RM申請(qǐng)啟動(dòng)AM
(2)RM收到請(qǐng)求之后伙单, 就會(huì)在集群中隨機(jī)選擇一個(gè)NM, 為該應(yīng)用程序分配第一個(gè)Container哈肖, 然后在這個(gè)Contaiiner上啟動(dòng)AM吻育,AM則實(shí)現(xiàn)了SC等的初始化, 此處與yarn-cluster不同的是淤井,SC的初始化驅(qū)動(dòng)程序的啟動(dòng)并不在AM中布疼, 也就是AM并不是Driver端,但是AM會(huì)和SC通信來獲取其需要的資源情況(多少cpu, 多少memory)
(3)當(dāng)客戶端的SC與AM啟動(dòng)完畢币狠,會(huì)通信游两, AM可知道SC需要的資源情況, 然后AM會(huì)向RM注冊(cè)漩绵, 并向RM申請(qǐng)資源Container
(4)AM申請(qǐng)到資源Container之后贱案, 會(huì)與COntainer對(duì)應(yīng)的NodeManager通信, 要求他在其Container里面啟動(dòng)Executor渐行, 然后去向客戶端的SparkContext注冊(cè)轰坊, 并申請(qǐng)任務(wù)集Tasks
(5)客戶端的SC分配任務(wù)集給Executor铸董,
(6)應(yīng)用程序運(yùn)行結(jié)束之后,客戶端的SC會(huì)向RM申請(qǐng)資源釋放并去關(guān)閉自己肴沫,kill進(jìn)程等
上面的流程介紹完之后粟害, 來對(duì)比一下YARN-CLUSTER 與 yarn-client的區(qū)別:
其主要區(qū)別的是AM的作用不大一樣,
yarn-client模式下:AM僅僅向RM請(qǐng)求資源颤芬, 然后AM會(huì)在對(duì)應(yīng)的Container中要求其所屬NodeManager去啟動(dòng)Executor悲幅, Client會(huì)去與此Container Executor通信, 也就是整個(gè)程序運(yùn)行過程中站蝠, Client不能離開
yarn-cluster模式: Driver運(yùn)行在AM中汰具,也就是SC與Executor的所有通信操作都與Client無關(guān)了, 在提交完應(yīng)用程序之后菱魔,Client就可以離開了留荔。
2 spark-submit提交到y(tǒng)arn
首先安裝好Hadoop 并配置好Yarn, 之后啟動(dòng)sbin/start-yarn.sh
其次針對(duì)Spark的spark-env.sh里面增加:
export HADOOP_CONF_DIR=/home/kason/bigdata/hadoop-2.7.4/etc/hadoop
export YARN_CONF_DIR=/home/kason/bigdata/hadoop-2.7.4/etc/hadoop
下面說明一個(gè)最基本的應(yīng)用程序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkDEMO")
val sc: SparkContext = new SparkContext(conf)
val dataRDD: RDD[Int] = sc.parallelize(Array(1,2,3,4))
dataRDD.foreach(print)
print(dataRDD.count())
}
}
通過maven編譯成jar包澜倦, 然后通過spark-shell去提交此jar包到y(tǒng)arn上
./bin/spark-submit --class SparkDemo --master yarn --deploy-mode cluster --driver-memory 1G --executor-memory 1G --executor-cores 1 /home/kason/workspace/BigdataComponents/SparkLearn/target/SparkLearn-1.0-SNAPSHOT.jar
提交之后顯示:
根據(jù)Yarn上的application id找到對(duì)應(yīng)日志:
3 IDEA代碼提交到y(tǒng)arn
通過IDEA實(shí)現(xiàn)代碼提交其實(shí)很簡單聚蝶, 主要是設(shè)置Master, Yarn模式不像mesos以及standalone模式通過傳輸url來實(shí)現(xiàn)資源管理藻治, yarn模式實(shí)際上是Hadoop Yarn接管資源管理碘勉,具體代碼如下:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkDEMO")
.setMaster("yarn")
//.setMaster("spark://kason-pc:7077")
.set("spark.yarn.jars","hdfs://kason-pc:9000/system/spark/yarn/jars/*")
.setJars(List("/home/kason/workspace/BigdataComponents/out/artifacts/SparkLearn_jar/SparkLearn.jar"))
//.setJars(GETJars.getJars("/home/kason/workspace/BigdataComponents/spark-main/target/spark-main/WEB-INF/lib"))
val sc: SparkContext = new SparkContext(conf)
val dataRDD: RDD[Int] = sc.parallelize(Array(1,2,3,4))
val result = dataRDD.map(res => res * 2)
result.collect().foreach(println(_))
print(result.count())
}
}
運(yùn)行spark之后, 去8088yarn頁面去查看一下:
注意IDEA代碼提交到y(tǒng)arn只能使用yarn-client模式