Spark任務(wù)從提交到執(zhí)行完成有很多步驟戏罢,整體上可以劃分為三個階段:
應(yīng)用的提交烁兰;
執(zhí)行環(huán)境的準備减响;
任務(wù)的調(diào)度和執(zhí)行靖诗。
一、執(zhí)行流程概述
Spark有多種不同的運行模式支示,在不同模式下這三個階段的執(zhí)行流程也不太相同刊橘。
以on yarn模式為例,Spark應(yīng)用提交shell命令如下:
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
$SPARK_HOME/examples/jars/spark-examples*.jar
Spark應(yīng)用執(zhí)行過程可以劃分如下三個階段:
第一步:應(yīng)用的提交
Driver端:
解析參數(shù)颂鸿,驗證參數(shù)合法性
檢查和準備依賴jar包
確定運行的主類促绵,也就是應(yīng)用的入口
Executor端:未創(chuàng)建
第二步:執(zhí)行環(huán)境的準備
Driver端:
進入應(yīng)用的main函數(shù),開始執(zhí)行
首先創(chuàng)建SparkContext對象,在創(chuàng)建時會執(zhí)行
初始化各個服務(wù)模塊和通信的RPC環(huán)境
向cluster manager申請資源
Executor端:
在Worker節(jié)點啟動Executor
初始化Executor败晴,啟動各個服務(wù)模塊
連接到Driver端浓冒,匯報Executor的狀態(tài)
第三步:任務(wù)的調(diào)度和執(zhí)行
Driver端:
執(zhí)行處理任務(wù)代碼
Job分解為Stage,并將Stage劃分為Task
提交Task到Executor端
接受Executor端的狀態(tài)和結(jié)果信息
Executor端:
啟動TaskRunner線程位衩,執(zhí)行接收到的Task
向Driver端匯報執(zhí)行狀態(tài)
向Driver端返回執(zhí)行結(jié)果
二裆蒸、執(zhí)行流程詳解
以如下代碼為例,講解Spark應(yīng)用執(zhí)行的各個階段糖驴。
# HelloWorld.scala
import scala.math.random
import org.apache.spark.sql.SparkSession
object HelloWorld {
def main(args: Array[String]) {
val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
}
}
1僚祷、第一階段:應(yīng)用的提交
這個階段主要在Driver端完成,主要目標是:準備依賴jar包并確定Spark應(yīng)用的執(zhí)行主類贮缕。具體的任務(wù)包括:
解析任務(wù)提交的參數(shù)辙谜,并對參數(shù)進行解析和保存。
準備任務(wù)啟動參數(shù)制定的依賴文件或者程序包感昼。
根據(jù)Spark應(yīng)用的執(zhí)行模式和應(yīng)用的編寫語言装哆,來確定執(zhí)行的主類名稱。
實例化執(zhí)行主類定嗓,生成SparkApplication對象蜕琴,并調(diào)用SparkApplication.start()函數(shù)來運行Spark應(yīng)用(如果是Java/Scala代碼則執(zhí)行Spark應(yīng)用中的main函數(shù))。
注意:第1階段完成時宵溅,Driver端并沒有向資源管理平臺申請任何資源凌简,也沒有啟動任何Spark內(nèi)部的服務(wù)。
2恃逻、第二階段:執(zhí)行環(huán)境的準備
通過第1階段雏搂,已經(jīng)找到了運行在Driver端的Spark應(yīng)用的執(zhí)行主類,并創(chuàng)建了SparkApplication對象:app寇损。此時凸郑,在app.start()函數(shù)中會直接調(diào)用主類的main函數(shù)開始執(zhí)行應(yīng)用,從而進入第2階段矛市。
第二階段主要目標是:創(chuàng)建SparkSession(包括SparkContext和SparkEnv)芙沥,完成資源的申請和Executor的創(chuàng)建。第2階段完成后Task的執(zhí)行環(huán)境就準備好了浊吏。
也就是說而昨,第2階段不僅會在Driver端進行初始化,而且還要準備好Executor卿捎。這一階段的任務(wù)主要是在Driver端執(zhí)行創(chuàng)建SparkSession的代碼來完成配紫,也就是執(zhí)行下面一行代碼:
val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
第二階段的Driver端主要完成以下步驟:
- 創(chuàng)建SparkContext和SparkEnv對象,在創(chuàng)建這兩個對象時午阵,向Cluster Manager申請資源躺孝,啟動各個服務(wù)模塊享扔,并對服務(wù)模塊進行初始化。
- 這些服務(wù)模塊包括:DAG調(diào)度服務(wù)植袍,任務(wù)調(diào)度服務(wù)惧眠,shuffle服務(wù),文件傳輸服務(wù)于个,數(shù)據(jù)塊管理服務(wù)氛魁,內(nèi)存管理服務(wù)等。
第2階段的Executor端主要完成以下步驟:
- Driver端向Cluster Manager申請資源厅篓,若是Yarn模式會在NodeManager上創(chuàng)建ApplicationMaster秀存,并由ApplicationMaster向Cluster Manager來申請資源,并啟動Container羽氮,在Container中啟動Executor或链。
- 在啟動Executor時向Driver端注冊BlockManager服務(wù),并創(chuàng)建心跳服務(wù)RPC環(huán)境档押,通過該RPC環(huán)境向Driver匯報Executor的狀態(tài)信息澳盐。
第二階段執(zhí)行完成后的Spark集群狀態(tài)如下:
3、第三階段:任務(wù)的調(diào)度和執(zhí)行
通過第2階段已經(jīng)完成了Task執(zhí)行環(huán)境的初始化令宿,此時叼耙,在Driver端已經(jīng)完成了SparkContext和SparkEnv的創(chuàng)建,資源已經(jīng)申請到了粒没,并且已經(jīng)啟動了Executor筛婉。
這一階段會執(zhí)行接下來的數(shù)據(jù)處理的代碼:
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
第3階段Driver端主要完成以下步驟:
執(zhí)行Spark的處理代碼,當執(zhí)行map操作時革娄,生成新的RDD倾贰;
當執(zhí)行Action操作時冕碟,觸發(fā)Job的提交拦惋,此時會執(zhí)行以下步驟:
根據(jù)RDD的血緣,把Job劃分成相互依賴的Stage安寺;
把每個Stage拆分成一個或多個Task厕妖;
把這些Task提交給已經(jīng)創(chuàng)建好的Executor去執(zhí)行;
獲取Executor的執(zhí)行狀態(tài)信息挑庶,直到Executor完成所有Task的執(zhí)行言秸;
獲取執(zhí)行結(jié)果和最終的執(zhí)行狀態(tài)。