Apache Spark是一個開源的通用集群計算系統(tǒng)诵肛,它提供了High-level編程API分衫,支持Scala缭保、Java和Python三種編程語言却嗡。Spark內(nèi)核使用Scala語言編寫,通過基于Scala的函數(shù)式編程特性稍刀,在不同的計算層面進行抽象刨摩,代碼設(shè)計非常優(yōu)秀勇垛。
RDD抽象
RDD(Resilient Distributed Datasets)扎运,彈性分布式數(shù)據(jù)集瑟曲,它是對分布式數(shù)據(jù)集的一種內(nèi)存抽象,通過受限的共享內(nèi)存方式來提供容錯性豪治,同時這種內(nèi)存模型使得計算比傳統(tǒng)的數(shù)據(jù)流模型要高效洞拨。RDD具有5個重要的特性,如下圖所示:
上圖展示了2個RDD進行JOIN操作负拟,體現(xiàn)了RDD所具備的5個主要特性烦衣,如下所示:
一組分區(qū)
計算每一個數(shù)據(jù)分片的函數(shù)
RDD上的一組依賴
可選,對于鍵值對RDD齿椅,有一個Partitioner(通常是HashPartitioner)
可選琉挖,一組Preferred location信息(例如启泣,HDFS文件的Block所在location信息)
有了上述特性涣脚,能夠非常好地通過RDD來表達分布式數(shù)據(jù)集,并作為構(gòu)建DAG圖的基礎(chǔ):首先抽象一次分布式計算任務(wù)的邏輯表示寥茫,最終將任務(wù)在實際的物理計算環(huán)境中進行處理執(zhí)行遣蚀。
計算抽象
在描述Spark中的計算抽象,我們首先需要了解如下幾個概念:
Application
用戶編寫的Spark程序纱耻,完成一個計算任務(wù)的處理芭梯。它是由一個Driver程序和一組運行于Spark集群上的Executor組成。
Job
用戶程序中弄喘,每次調(diào)用Action時玖喘,邏輯上會生成一個Job,一個Job包含了多個Stage蘑志。
Stage
Stage包括兩類:ShuffleMapStage和ResultStage累奈,如果用戶程序中調(diào)用了需要進行Shuffle計算的Operator贬派,如groupByKey等,就會以Shuffle為邊界分成ShuffleMapStage和ResultStage澎媒。
TaskSet
基于Stage可以直接映射為TaskSet搞乏,一個TaskSet封裝了一次需要運算的、具有相同處理邏輯的Task戒努,這些Task可以并行計算请敦,粗粒度的調(diào)度是以TaskSet為單位的。
Task
Task是在物理節(jié)點上運行的基本單位储玫,Task包含兩類:ShuffleMapTask和ResultTask侍筛,分別對應(yīng)于Stage中ShuffleMapStage和ResultStage中的一個執(zhí)行基本單元。
下面撒穷,我們看一下勾笆,上面這些基本概念之間的關(guān)系,如下圖所示:
上圖桥滨,為了簡單窝爪,每個Job假設(shè)都很簡單,并且只需要進行一次Shuffle處理齐媒,所以都對應(yīng)2個Stage蒲每。實際應(yīng)用中,一個Job可能包含若干個Stage喻括,或者是一個相對復雜的Stage DAG邀杏。
在Standalone模式下,默認使用的是FIFO這種簡單的調(diào)度策略唬血,在進行調(diào)度的過程中望蜡,大概流程如下圖所示:
從用戶提交Spark程序,最終生成TaskSet拷恨,而在調(diào)度時脖律,通過TaskSetManager來管理一個TaskSet(包含一組可在物理節(jié)點上執(zhí)行的Task),這里面TaskSet必須要按照順序執(zhí)行才能保證計算結(jié)果的正確性腕侄,因為TaskSet之間是有序依賴的(上溯到ShuffleMapStage和ResultStage)小泉,只有一個TaskSet中的所有Task都運行完成后,才能調(diào)度下一個TaskSet中的Task去執(zhí)行冕杠。
集群模式
Spark集群在設(shè)計的時候微姊,并沒有在資源管理的設(shè)計上對外封閉,而是充分考慮了未來對接一些更強大的資源管理系統(tǒng)分预,如YARN兢交、Mesos等,所以Spark架構(gòu)設(shè)計將資源管理單獨抽象出一層笼痹,通過這種抽象能夠構(gòu)建一種適合企業(yè)當前技術(shù)棧的插件式資源管理模塊配喳,從而為不同的計算場景提供不同的資源分配與調(diào)度策略飘诗。Spark集群模式架構(gòu),如下圖所示:
下圖中界逛,Spark集群Cluster Manager目前支持如下三種模式:
Standalone模式
Standalone模式是Spark內(nèi)部默認實現(xiàn)的一種集群管理模式昆稿,這種模式是通過集群中的Master來統(tǒng)一管理資源,而與Master進行資源請求協(xié)商的是Driver內(nèi)部的StandaloneSchedulerBackend(實際上是其內(nèi)部的StandaloneAppClient真正與Master通信)息拜,后面會詳細說明溉潭。
YARN模式
YARN模式下,可以將資源的管理統(tǒng)一交給YARN集群的ResourceManager去管理少欺,選擇這種模式喳瓣,可以更大限度的適應(yīng)企業(yè)內(nèi)部已有的技術(shù)棧,如果企業(yè)內(nèi)部已經(jīng)在使用Hadoop技術(shù)構(gòu)建大數(shù)據(jù)處理平臺赞别。
Mesos模式
隨著Apache Mesos的不斷成熟畏陕,一些企業(yè)已經(jīng)在嘗試使用Mesos構(gòu)建數(shù)據(jù)中心的操作系統(tǒng)(DCOS),Spark構(gòu)建在Mesos之上仿滔,能夠支持細粒度惠毁、粗粒度的資源調(diào)度策略(Mesos的優(yōu)勢),也可以更好地適應(yīng)企業(yè)內(nèi)部已有技術(shù)棧崎页。
那么鞠绰,Spark中是怎么考慮滿足這一重要的設(shè)計決策的呢?也就是說飒焦,如何能夠保證Spark非常容易的讓第三方資源管理系統(tǒng)輕松地接入進來蜈膨。我們深入到類設(shè)計的層面看一下,如下圖類圖所示:
可以看出牺荠,Task調(diào)度直接依賴SchedulerBackend翁巍,SchedulerBackend與實際資源管理模塊交互實現(xiàn)資源請求。這里面休雌,CoarseGrainedSchedulerBackend是Spark中與資源調(diào)度相關(guān)的最重要的抽象灶壶,它需要抽象出與TaskScheduler通信的邏輯,同時還要能夠與各種不同的第三方資源管理系統(tǒng)無縫地交互挑辆。實際上例朱,CoarseGrainedSchedulerBackend內(nèi)部采用了一種ResourceOffer的方式來處理資源請求孝情。
RPC網(wǎng)絡(luò)通信抽象
Spark RPC層是基于優(yōu)秀的網(wǎng)絡(luò)通信框架Netty設(shè)計開發(fā)的鱼蝉,但是Spark提供了一種很好地抽象方式,將底層的通信細節(jié)屏蔽起來箫荡,而且也能夠基于此來設(shè)計滿足擴展性魁亦,比如,如果有其他不基于Netty的網(wǎng)絡(luò)通信框架的新的RPC接入需求羔挡,可以很好地擴展而不影響上層的設(shè)計洁奈。RPC層設(shè)計间唉,如下圖類圖所示:
任何兩個Endpoint只能通過消息進行通信,可以實現(xiàn)一個RpcEndpoint和一個RpcEndpointRef:想要與RpcEndpoint通信利术,需要獲取到該RpcEndpoint對應(yīng)的RpcEndpointRef即可呈野,而且管理RpcEndpoint和RpcEndpointRef創(chuàng)建及其通信的邏輯,統(tǒng)一在RpcEnv對象中管理印叁。
啟動Standalone集群
Standalone模式下被冒,Spark集群采用了簡單的Master-Slave架構(gòu)模式,Master統(tǒng)一管理所有的Worker轮蜕,這種模式很常見昨悼,我們簡單地看下Spark Standalone集群啟動的基本流程,如下圖所示:
可以看到跃洛,Spark集群采用的消息的模式進行通信率触,也就是EDA架構(gòu)模式,借助于RPC層的優(yōu)雅設(shè)計汇竭,任何兩個Endpoint想要通信葱蝗,發(fā)送消息并攜帶數(shù)據(jù)即可。上圖的流程描述如下所示:
Master啟動時首先創(chuàng)一個RpcEnv對象细燎,負責管理所有通信邏輯
Master通過RpcEnv對象創(chuàng)建一個Endpoint垒玲,Master就是一個Endpoint,Worker可以與其進行通信
Worker啟動時也是創(chuàng)一個RpcEnv對象
Worker通過RpcEnv對象創(chuàng)建一個Endpoint
Worker通過RpcEnv對找颓,建立到Master的連接合愈,獲取到一個RpcEndpointRef對象,通過該對象可以與Master通信
Worker向Master注冊击狮,注冊內(nèi)容包括主機名佛析、端口、CPU Core數(shù)量彪蓬、內(nèi)存數(shù)量
Master接收到Worker的注冊寸莫,將注冊信息維護在內(nèi)存中的Table中,其中還包含了一個到Worker的RpcEndpointRef對象引用
Master回復Worker已經(jīng)接收到注冊档冬,告知Worker已經(jīng)注冊成功
此時如果有用戶提交Spark程序膘茎,Master需要協(xié)調(diào)啟動Driver;而Worker端收到成功注冊響應(yīng)后酷誓,開始周期性向Master發(fā)送心跳
核心組件
集群處理計算任務(wù)的運行時(用戶提交了Spark程序)披坏,最核心的頂層組件就是Driver和Executor,它們內(nèi)部管理很多重要的組件來協(xié)同完成計算任務(wù)盐数,核心組件棧如下圖所示:
Driver和Executor都是運行時創(chuàng)建的組件棒拂,一旦用戶程序運行結(jié)束,他們都會釋放資源,等待下一個用戶程序提交到集群而進行后續(xù)調(diào)度帚屉。上圖谜诫,我們列出了大多數(shù)組件,其中SparkEnv是一個重量級組件攻旦,他們內(nèi)部包含計算過程中需要的主要組件喻旷,而且,Driver和Executor共同需要的組件在SparkEnv中也包含了很多牢屋。這里掰邢,我們不做過多詳述,后面交互流程等處會說明大部分組件負責的功能伟阔。
核心組件交互流程
在Standalone模式下辣之,Spark中各個組件之間交互還是比較復雜的,但是對于一個通用的分布式計算系統(tǒng)來說皱炉,這些都是非常重要而且比較基礎(chǔ)的交互怀估。首先,為了理解組件之間的主要交互流程合搅,我們給出一些基本要點:
一個Application會啟動一個Driver
一個Driver負責跟蹤管理該Application運行過程中所有的資源狀態(tài)和任務(wù)狀態(tài)
一個Driver會管理一組Executor
一個Executor只執(zhí)行屬于一個Driver的Task
核心組件之間的主要交互流程多搀,如下圖所示:
上圖中,通過不同顏色或類型的線條灾部,給出了如下6個核心的交互流程康铭,我們會詳細說明:
橙色:提交用戶Spark程序
用戶提交一個Spark程序,主要的流程如下所示:
用戶spark-submit腳本提交一個Spark程序赌髓,會創(chuàng)建一個ClientEndpoint對象从藤,該對象負責與Master通信交互
ClientEndpoint向Master發(fā)送一個RequestSubmitDriver消息,表示提交用戶程序
Master收到RequestSubmitDriver消息锁蠕,向ClientEndpoint回復SubmitDriverResponse夷野,表示用戶程序已經(jīng)完成注冊
ClientEndpoint向Master發(fā)送RequestDriverStatus消息,請求Driver狀態(tài)
如果當前用戶程序?qū)?yīng)的Driver已經(jīng)啟動荣倾,則ClientEndpoint直接退出悯搔,完成提交用戶程序
紫色:啟動Driver進程
當用戶提交用戶Spark程序后,需要啟動Driver來處理用戶程序的計算邏輯舌仍,完成計算任務(wù)妒貌,這時Master協(xié)調(diào)需要啟動一個Driver,具體流程如下所示:
Maser內(nèi)存中維護著用戶提交計算的任務(wù)Application铸豁,每次內(nèi)存結(jié)構(gòu)變更都會觸發(fā)調(diào)度灌曙,向Worker發(fā)送LaunchDriver請求
Worker收到LaunchDriver消息,會啟動一個DriverRunner線程去執(zhí)行LaunchDriver的任務(wù)
DriverRunner線程在Worker上啟動一個新的JVM實例推姻,該JVM實例內(nèi)運行一個Driver進程平匈,該Driver會創(chuàng)建SparkContext對象
紅色:注冊Application
Dirver啟動以后,它會創(chuàng)建SparkContext對象藏古,初始化計算過程中必需的基本組件增炭,并向Master注冊Application,流程描述如下:
創(chuàng)建SparkEnv對象拧晕,創(chuàng)建并管理一些數(shù)基本組件
創(chuàng)建TaskScheduler隙姿,負責Task調(diào)度
創(chuàng)建StandaloneSchedulerBackend,負責與ClusterManager進行資源協(xié)商
創(chuàng)建DriverEndpoint厂捞,其它組件可以與Driver進行通信
在StandaloneSchedulerBackend內(nèi)部創(chuàng)建一個StandaloneAppClient输玷,負責處理與Master的通信交互
StandaloneAppClient創(chuàng)建一個ClientEndpoint,實際負責與Master通信
ClientEndpoint向Master發(fā)送RegisterApplication消息靡馁,注冊Application
Master收到RegisterApplication請求后欲鹏,回復ClientEndpoint一個RegisteredApplication消息,表示已經(jīng)注冊成功
藍色:啟動Executor進程
Master向Worker發(fā)送LaunchExecutor消息臭墨,請求啟動Executor赔嚎;同時Master會向Driver發(fā)送ExecutorAdded消息,表示Master已經(jīng)新增了一個Executor(此時還未啟動)
Worker收到LaunchExecutor消息胧弛,會啟動一個ExecutorRunner線程去執(zhí)行LaunchExecutor的任務(wù)
Worker向Master發(fā)送ExecutorStageChanged消息尤误,通知Executor狀態(tài)已發(fā)生變化
Master向Driver發(fā)送ExecutorUpdated消息,此時Executor已經(jīng)啟動
粉色:啟動Task執(zhí)行
StandaloneSchedulerBackend啟動一個DriverEndpoint
DriverEndpoint啟動后结缚,會周期性地檢查Driver維護的Executor的狀態(tài)损晤,如果有空閑的Executor便會調(diào)度任務(wù)執(zhí)行
DriverEndpoint向TaskScheduler發(fā)送Resource Offer請求
如果有可用資源啟動Task,則DriverEndpoint向Executor發(fā)送LaunchTask請求
Executor進程內(nèi)部的CoarseGrainedExecutorBackend調(diào)用內(nèi)部的Executor線程的launchTask方法啟動Task
Executor線程內(nèi)部維護一個線程池红竭,創(chuàng)建一個TaskRunner線程并提交到線程池執(zhí)行
綠色:Task運行完成
Executor進程內(nèi)部的Executor線程通知CoarseGrainedExecutorBackend尤勋,Task運行完成
CoarseGrainedExecutorBackend向DriverEndpoint發(fā)送StatusUpdated消息,通知Driver運行的Task狀態(tài)發(fā)生變更
StandaloneSchedulerBackend調(diào)用TaskScheduler的updateStatus方法更新Task狀態(tài)
StandaloneSchedulerBackend繼續(xù)調(diào)用TaskScheduler的resourceOffers方法茵宪,調(diào)度其他任務(wù)運行
Block管理
Block管理斥黑,主要是為Spark提供的Broadcast機制提供服務(wù)支撐的。Spark中內(nèi)置采用TorrentBroadcast實現(xiàn)眉厨,該Broadcast變量對應(yīng)的數(shù)據(jù)(Task數(shù)據(jù))或數(shù)據(jù)集(如RDD)锌奴,默認會被切分成若干4M大小的Block,Task運行過程中讀取到該Broadcast變量憾股,會以4M為單位的Block為拉取數(shù)據(jù)的最小單位鹿蜀,最后將所有的Block合并成Broadcast變量對應(yīng)的完整數(shù)據(jù)或數(shù)據(jù)集。將數(shù)據(jù)切分成4M大小的Block服球,Task從多個Executor拉取Block茴恰,可以非常好地均衡網(wǎng)絡(luò)傳輸負載,提高整個計算集群的穩(wěn)定性斩熊。
通常往枣,用戶程序在編寫過程中,會對某個變量進行Broadcast,該變量稱為Broadcast變量分冈。在實際物理節(jié)點的Executor上執(zhí)行Task時圾另,需要讀取Broadcast變量對應(yīng)的數(shù)據(jù)集,那么此時會根據(jù)需要拉取DAG執(zhí)行流上游已經(jīng)生成的數(shù)據(jù)集雕沉。采用Broadcast機制集乔,可以有效地降低數(shù)據(jù)在計算集群環(huán)境中傳輸?shù)拈_銷。具體地坡椒,如果一個用戶對應(yīng)的程序中的Broadcast變量扰路,對應(yīng)著一個數(shù)據(jù)集,它在計算過程中需要拉取對應(yīng)的數(shù)據(jù)倔叼,如果在同一個物理節(jié)點上運行著多個Task汗唱,多個Task都需要該數(shù)據(jù),有了Broadcast機制丈攒,只需要拉取一份存儲在本地物理機磁盤即可哩罪,供多個Task計算共享。
另外肥印,用戶程序在進行調(diào)度過程中识椰,會根據(jù)調(diào)度策略將Task計算邏輯數(shù)據(jù)(代碼)移動到對應(yīng)的Worker節(jié)點上,最優(yōu)情況是對本地數(shù)據(jù)進行處理深碱,那么代碼(序列化格式)也需要在網(wǎng)絡(luò)上傳輸腹鹉,也是通過Broadcast機制進行傳輸,不過這種方式是首先將代碼序列化到Driver所在Worker節(jié)點敷硅,后續(xù)如果Task在其他Worker中執(zhí)行功咒,需要讀取對應(yīng)代碼的Broadcast變量,首先就是從Driver上拉取代碼數(shù)據(jù)绞蹦,接著其他晚一些被調(diào)度的Task可能直接從其他Worker上的Executor中拉取代碼數(shù)據(jù)力奋。
我們通過以Broadcast變量taskBinary為例,說明Block是如何管理的幽七,如下圖所示:
上圖中景殷,Driver負責管理所有的Broadcast變量對應(yīng)的數(shù)據(jù)所在的Executor,即一個Executor維護一個Block列表澡屡。在Executor中運行一個Task時猿挚,執(zhí)行到對應(yīng)的Broadcast變量taskBinary,如果本地沒有對應(yīng)的數(shù)據(jù)驶鹉,則會向Driver請求獲取Broadcast變量對應(yīng)的數(shù)據(jù)绩蜻,包括一個或多個Block所在的Executor列表,然后該Executor根據(jù)Driver返回的Executor列表室埋,直接通過底層的BlockTransferService組件向?qū)?yīng)Executor請求拉取Block办绝。Executor拉取到的Block會緩存到本地伊约,同時向Driver報告該Executor上存在的Block信息,以供其他Executor執(zhí)行Task時獲取Broadcast變量對應(yīng)的數(shù)據(jù)孕蝉。
參考內(nèi)容
http://spark.apache.org/docs/2.0.0/cluster-overview.html
Spark-2.0.0源碼