Spark的計算階段
我們可以對比來看竹椒。首先和MapReduce一個應(yīng)用一次只運(yùn)行一個map和一個reduce不同苹熏,Spark可以根據(jù)應(yīng)用的復(fù)雜程度,分割成更多的計算階段(stage),這些計算階段組成一個有向無環(huán)圖DAG共苛,Spark任務(wù)調(diào)度器可以根據(jù)DAG的依賴關(guān)系執(zhí)行計算階段。
從圖上看蜓萄,整個應(yīng)用被切分成3個階段隅茎,階段3需要依賴階段1和階段2,階段1和階段2互不依賴嫉沽。Spark在執(zhí)行調(diào)度的時候辟犀,先執(zhí)行階段1和階段2,完成以后绸硕,再執(zhí)行階段3堂竟。如果有更多的階段魂毁,Spark的策略也是一樣的。只要根據(jù)程序初始化好DAG跃捣,就建立了依賴關(guān)系漱牵,然后根據(jù)依賴關(guān)系順序執(zhí)行各個計算階段,Spark大數(shù)據(jù)應(yīng)用的計算就完成了疚漆。
上圖這個DAG對應(yīng)的Spark程序偽代碼如下酣胀。
rddB = rddA.groupBy(key)
rddD = rddC.map(func)
rddF = rddD.union(rddE)
rddG = rddB.join(rddF)
一個數(shù)據(jù)集中的多個數(shù)據(jù)分片需要進(jìn)行分區(qū)傳輸,寫入到另一個數(shù)據(jù)集的不同分片中娶聘,這種數(shù)據(jù)分區(qū)交叉?zhèn)鬏數(shù)牟僮魑畔猓覀冊贛apReduce的運(yùn)行過程中也看到過。
Spark也需要通過shuffle將數(shù)據(jù)進(jìn)行重新組合丸升,相同Key的數(shù)據(jù)放在一起铆农,進(jìn)行聚合、關(guān)聯(lián)等操作狡耻,因而每次shuffle都產(chǎn)生新的計算階段墩剖。這也是為什么計算階段會有依賴關(guān)系,它需要的數(shù)據(jù)來源于前面一個或多個計算階段產(chǎn)生的數(shù)據(jù)夷狰,必須等待前面的階段執(zhí)行完畢才能進(jìn)行shuffle岭皂,并得到數(shù)據(jù)。
計算階段劃分的依據(jù)是shuffle沼头,不是轉(zhuǎn)換函數(shù)的類型爷绘,有的函數(shù)有時候有shuffle,有時候沒有进倍。比如上圖例子中RDD B和RDD F進(jìn)行join土至,得到RDD G,這里的RDD F需要進(jìn)行shuffle猾昆,RDD B就不需要陶因。
因?yàn)镽DD B在前面一個階段,階段1的shuffle過程中垂蜗,已經(jīng)進(jìn)行了數(shù)據(jù)分區(qū)楷扬。分區(qū)數(shù)目和分區(qū)Key不變,就不需要再進(jìn)行shuffle么抗。
這種不需要進(jìn)行shuffle的依賴毅否,在Spark里被稱作窄依賴;相反的蝇刀,需要進(jìn)行shuffle的依賴螟加,被稱作寬依賴。跟MapReduce一樣,shuffle也是Spark最重要的一個環(huán)節(jié)捆探,只有通過shuffle然爆,相關(guān)數(shù)據(jù)才能互相計算,構(gòu)建起復(fù)雜的應(yīng)用邏輯黍图。
Spark的作業(yè)管理
RDD里面的每個數(shù)據(jù)分片曾雕,Spark都會創(chuàng)建一個計算任務(wù)去處理,所以一個計算階段會包含很多個計算任務(wù)(task)助被。
關(guān)于作業(yè)剖张、計算階段、任務(wù)的依賴和時間先后關(guān)系你可以通過下圖看到揩环。
圖中橫軸方向是時間搔弄,縱軸方向是任務(wù)。兩條粗黑線之間是一個作業(yè)丰滑,兩條細(xì)線之間是一個計算階段顾犹。一個作業(yè)至少包含一個計算階段。水平方向紅色的線是任務(wù)褒墨,每個階段由很多個任務(wù)組成炫刷,這些任務(wù)組成一個任務(wù)集合。
DAGScheduler根據(jù)代碼生成DAG圖以后郁妈,Spark的任務(wù)調(diào)度就以任務(wù)為單位進(jìn)行分配浑玛,將任務(wù)分配到分布式集群的不同機(jī)器上執(zhí)行。
Spark的執(zhí)行過程
Spark支持Standalone圃庭、Yarn锄奢、Mesos失晴、Kubernetes等多種部署方案剧腻,幾種部署方案原理也都一樣,只是不同組件角色命名不同涂屁,但是核心功能和運(yùn)行流程都差不多书在。
首先,Spark應(yīng)用程序啟動在自己的JVM進(jìn)程里拆又,即Driver進(jìn)程儒旬,啟動后調(diào)用SparkContext初始化執(zhí)行配置和輸入數(shù)據(jù)。SparkContext啟動DAGScheduler構(gòu)造執(zhí)行的DAG圖帖族,切分成最小的執(zhí)行單位也就是計算任務(wù)栈源。
然后Driver向Cluster Manager請求計算資源,用于DAG的分布式計算竖般。Cluster Manager收到請求以后甚垦,將Driver的主機(jī)地址等信息通知給集群的所有計算節(jié)點(diǎn)Worker。
Worker收到信息以后,根據(jù)Driver的主機(jī)地址艰亮,跟Driver通信并注冊闭翩,然后根據(jù)自己的空閑資源向Driver通報自己可以領(lǐng)用的任務(wù)數(shù)。Driver根據(jù)DAG圖開始向注冊的Worker分配任務(wù)迄埃。
Worker收到任務(wù)后疗韵,啟動Executor進(jìn)程開始執(zhí)行任務(wù)。Executor先檢查自己是否有Driver的執(zhí)行代碼侄非,如果沒有蕉汪,從Driver下載執(zhí)行代碼,通過Java反射加載后開始執(zhí)行逞怨。
Spark有三個主要特性:RDD的編程模型更簡單肤无,DAG切分的多階段計算過程更快速,使用內(nèi)存存儲中間計算結(jié)果更高效骇钦。這三個特性使得Spark相對Hadoop MapReduce可以有更快的執(zhí)行速度宛渐,以及更簡單的編程實(shí)現(xiàn)。