Spark內(nèi)核解析

Spark內(nèi)核概述

Spark內(nèi)核泛指Spark的核心運行機制曙砂,包括Spark核心組件的運行機制、Spark任務(wù)調(diào)度機制、Spark內(nèi)存管理機制辐董、Spark核心功能的運行原理等,熟練掌握Spark內(nèi)核原理禀综。

一简烘、Spark核心組件回顧

Driver

Spark驅(qū)動器節(jié)點,用于執(zhí)行Spark任務(wù)中的main方法定枷,負責實際代碼的執(zhí)行工作孤澎。Driver在Spark作業(yè)執(zhí)行時主要負責:

1、將用戶程序轉(zhuǎn)化為任務(wù)(Job)欠窒;

2覆旭、在Executor之間調(diào)度任務(wù)(task);

3、跟蹤Executor的執(zhí)行情況;

4、通過UI展示查詢運行情況贷掖。

Executor

Spark Executor節(jié)點是一個JVM進程,負責在Spark作業(yè)中運行具體任務(wù)丸凭,任務(wù)彼此之間相互獨立。Spark應用啟動時惊搏,Executor節(jié)點被同時啟動贮乳,并且始終伴隨著整個Spark應用的生命周期而存在。如果有Executor節(jié)點發(fā)生了故障或崩潰恬惯,Spark應用也可以繼續(xù)執(zhí)行向拆,會將出錯節(jié)點上的任務(wù)調(diào)度到其他Executor節(jié)點上繼續(xù)運行。

Executor有兩個核心功能:

1酪耳、負責運行組成Spark應用的任務(wù)浓恳,并將結(jié)果返回給Driver進程;

2碗暗、它們通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的RDD提供內(nèi)存式存儲颈将。RDD是直接緩存在Executor進程內(nèi)的,因此任務(wù)可以在運行時充分利用緩存數(shù)據(jù)加速運算言疗。

Spark通用運行流程概述

img

上圖為Spark通用運行流程晴圾,不論Spark以何種模式進行部署,任務(wù)提交后噪奄,都會先啟動Driver進程死姚,隨后Driver進程向集群管理器注冊應用程序,之后集群管理器根據(jù)此任務(wù)的配置文件分配Executor并啟動勤篮,當Driver所需的資源全部滿足后都毒,Driver開始執(zhí)行main函數(shù),Spark查詢?yōu)閼袌?zhí)行碰缔,當執(zhí)行到action算子時開始反向推算账劲,根據(jù)寬依賴進行stage的劃分,隨后每一個stage對應一個taskset金抡,taskset中有多個task瀑焦,根據(jù)本地化原則,task會被分發(fā)到指定的Executor去執(zhí)行梗肝,在任務(wù)執(zhí)行的過程中蝠猬,Executor也會不斷與Driver進行通信,報告任務(wù)運行情況统捶。

二、Spark部署模式

Spark支持三種集群管理器(Cluster Manager),分別為:

1喘鸟、Standalone:獨立模式匆绣,Spark原生的簡單集群管理器,自帶完整的服務(wù)什黑,可單獨部署到一個集群中崎淳,無需依賴任何其他資源管理系統(tǒng),使用Standalone可以很方便地搭建一個集群愕把;

2拣凹、Apache Mesos:一個強大的分布式資源管理框架,它允許多種不同的框架部署在其上恨豁,包括yarn嚣镜;

3、Hadoop YARN:統(tǒng)一的資源管理機制橘蜜,在上面可以運行多套計算框架菊匿,如map reduce、storm等计福,根據(jù)driver在集群中的位置不同跌捆,分為yarn client和yarn cluster。

實際上象颖,除了上述這些通用的集群管理器外佩厚,Spark內(nèi)部也提供了一些方便用戶測試和學習的簡單集群部署模式。由于在實際工廠環(huán)境下使用的絕大多數(shù)的集群管理器是Hadoop YARN说订,因此我們關(guān)注的重點是Hadoop YARN模式下的Spark集群部署抄瓦。

Spark的運行模式取決于傳遞給SparkContext的MASTER環(huán)境變量的值,個別模式還需要輔助的程序接口來配合使用克蚂,目前支持的Master字符串及URL包括:

img

用戶在提交任務(wù)給Spark處理時闺鲸,以下兩個參數(shù)共同決定了Spark的運行方式。

  • master MASTER_URL:決定了Spark任務(wù)提交給哪種集群處理埃叭。

  • deploy-mode DEPLOY_MODE:決定了Driver的運行方式摸恍,可選值為Client或者Cluster。

Standalone模式運行機制

Standalone集群有四個重要組成部分赤屋,分別是:

(1)Driver:是一個進程立镶,我們編寫的Spark應用程序就運行在Driver上,由Driver進程執(zhí)行类早;

(2)Master:是一個進程媚媒,主要負責資源調(diào)度和分配,并進行集群的監(jiān)控等職責涩僻;

(3)Worker:是一個進程缭召,一個Worker運行在集群中的一臺服務(wù)器上栈顷,主要負責兩個職責,一個是用自己的內(nèi)存存儲RDD的某個或某些partition嵌巷;另一個是啟動其他進程和線程(Executor)萄凤,對RDD上的partition進行并行的處理和計算。

(4)Executor:是一個進程搪哪,一個Worker上可以運行多個Executor靡努,Executor通過啟動多個線程(task)來執(zhí)行對RDD的partition進行并行計算,也就是執(zhí)行我們對RDD定義的例如map晓折、flatMap惑朦、reduce等算子操作。

Standalone Client模式

img

1漓概、在Standalone Client模式下漾月,Driver在任務(wù)提交的本地機器上運行;

2垛耳、Driver啟動后向Master注冊應用程序栅屏,Master根據(jù)submit腳本的資源需求找到內(nèi)部資源至少可以啟動一個Executor的所有Worker,然后在這些Worker之間分配Executor堂鲜;

3栈雳、Worker上的Executor啟動后會向Driver反向注冊;

4缔莲、當所有的Executor注冊完成后哥纫,Driver開始執(zhí)行main函數(shù);

5痴奏、之后執(zhí)行到Action算子時蛀骇,開始劃分stage;

6读拆、每個stage生成對應的taskSet擅憔,之后將task 分發(fā)到各個Executor上執(zhí)行。

Standalone Cluster模式

img

1檐晕、在Standalone Cluster模式下暑诸,任務(wù)提交后,Master會找到一個Worker啟動Driver進程辟灰;

2个榕、Driver啟動后向 Master注冊應用程序;

3芥喇、Master根據(jù)submit腳本的資源需求找到內(nèi)部資源至少可以啟動一個Executor的所有 Worker西采,然后在這些Worker之間分配Executor;

4继控、Worker上的Executor啟動后會向Driver反向注冊械馆;

5胖眷、所有的 Executor注冊完成后,Driver開始執(zhí)行main函數(shù)狱杰;

6瘦材、之后執(zhí)行到Action算子時,開始劃分stage仿畸,每個stage生成對應的taskSet;

7朗和、之后將task分發(fā)到各個Executor上執(zhí)行错沽。

注意,Standalone的兩種模式下(client/Cluster)眶拉,Master在接到Driver注冊Spark應用程序的請求后千埃,會獲取其所管理的剩余資源能夠啟動一個 Executor的所有Worker,然后在這些Worker之間分發(fā)Executor忆植,此時的分發(fā)只考慮Worker上的資源是否足夠使用放可,直到當前應用程序所需的所有Executor都分配完畢,Executor反向注冊完畢后朝刊,Driver開始執(zhí)行main程序耀里。

YARN模式運行機制

YARN Client模式

img

1、在YARN Client模式下拾氓,Driver在任務(wù)提交的本地機器上運行冯挎;

2、Driver啟動后會和ResourceManager通訊申請啟動ApplicationMaster咙鞍;

3房官、隨后ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster续滋,此時的ApplicationMaster的功能相當于一個ExecutorLaucher(執(zhí)行者發(fā)射器)翰守,只負責向ResourceManager申請Executor內(nèi)存;

4疲酌、ResourceManager接到ApplicationMaster的資源申請后會分配container蜡峰,然后ApplicationMaster在資源分配指定的NodeManager上啟動Executor進程;

5徐勃、Executor進程啟動后會向Driver反向注冊事示;

6、Executor全部注冊完成后Driver開始執(zhí)行main函數(shù)僻肖;

7肖爵、之后執(zhí)行到Action算子時,觸發(fā)一個job臀脏,并根據(jù)寬依賴開始劃分stage劝堪;

8冀自、每個stage生成對應的taskSet,之后將task分發(fā)到各個Executor上執(zhí)行秒啦。

YARN Cluster模式

img

1熬粗、在YARN Cluster模式下,任務(wù)提交后會和ResourceManager通訊申請啟動ApplicationMaster余境;

2驻呐、隨后ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster芳来;(此時的ApplicationMaster就是Driver)

3含末、Driver啟動后向ResourceManager申請Executor內(nèi)存,ResourceManager接到ApplicationMaster的資源申請后會分配container即舌,然后在合適的NodeManager上啟動Executor進程佣盒;

4、Executor進程啟動后會向Driver反向注冊顽聂;

5肥惭、Executor全部注冊完成后Driver開始執(zhí)行main函數(shù);

6紊搪、之后執(zhí)行到Action算子時蜜葱,觸發(fā)一個job,并根據(jù)寬依賴開始劃分stage嗦明;

7笼沥、每個stage生成對應的taskSet,之后將task分發(fā)到各個Executor上執(zhí)行娶牌。

三奔浅、Spark通訊架構(gòu)

Spark通信架構(gòu)概述

Spark2.x版本使用Netty通訊架構(gòu)作為內(nèi)部通訊組件。Spark基于Netty新的rpc框架借鑒了Akka中的設(shè)計诗良,它是基于Actor模型汹桦,如下圖所示:

img

Spark通訊框架中各個組件(Client/Master/Worker)可以認為是一個個獨立的實體,各個實體之間通過消息來進行通信鉴裹。具體各個組件之間的關(guān)系如下:

img

Endpoint(Client/Master/Worker)有一個InBox和N個OutBox(N>=1舞骆,N取決于當前Endpoint與多少其他的Endpoint進行通信,一個與其通訊的其他Endpoint對應一個OutBox)径荔,Endpoint接收到的消息被寫入InBox督禽,發(fā)送出去的消息寫入OutBox并被發(fā)送到其他Endpoint的InBox中。

Spark通訊架構(gòu)解析

Spark通信架構(gòu)如下圖所示:

img
  1. RpcEndpoint:RPC端點总处,Spark針對每個節(jié)點(Client/Master/Worker)都稱之為一個Rpc 端點狈惫,且都實現(xiàn)RpcEndpoint接口,內(nèi)部根據(jù)不同端點的需求鹦马,設(shè)計不同的消息和不同的業(yè)務(wù)處理胧谈,如果需要發(fā)送(詢問)則調(diào)用 Dispatcher忆肾;

  2. RpcEnv:RPC上下文環(huán)境,每個RPC端點運行時依賴的上下文環(huán)境稱為 RpcEnv菱肖;

  3. Dispatcher:消息分發(fā)器客冈,針對于RPC端點需要發(fā)送消息或者從遠程 RPC 接收到的消息,分發(fā)至對應的指令收件箱/發(fā)件箱稳强。如果指令接收方是自己則存入收件箱场仲,如果指令接收方不是自己,則放入發(fā)件箱退疫;

  4. Inbox:指令消息收件箱燎窘,一個本地RpcEndpoint對應一個收件箱,Dispatcher在每次向Inbox存入消息時蹄咖,都將對應EndpointData加入內(nèi)部ReceiverQueue中,另外Dispatcher創(chuàng)建時會啟動一個單獨線程進行輪詢ReceiverQueue付鹿,進行收件箱消息消費澜汤;

  5. RpcEndpointRef:RpcEndpointRef是對遠程RpcEndpoint的一個引用。當我 們需要向一個具體的RpcEndpoint發(fā)送消息時舵匾,一般我們需要獲取到該RpcEndpoint的引用俊抵,然后通過該應用發(fā)送消息。

  6. OutBox:指令消息發(fā)件箱坐梯,對于當前RpcEndpoint來說徽诲,一個目標RpcEndpoint對應一個發(fā)件箱,如果向多個目標RpcEndpoint發(fā)送信息吵血,則有多個OutBox谎替。當消息放入Outbox后,緊接著通過TransportClient將消息發(fā)送出去瓮具。消息放入發(fā)件箱以及發(fā)送過程是在同一個線程中進行官地;

  7. RpcAddress:表示遠程的RpcEndpointRef的地址厌处,Host + Port。

  8. TransportClient:Netty通信客戶端秩命,一個OutBox對應一個TransportClient,TransportClient不斷輪詢OutBox褒傅,根據(jù)OutBox消息的receiver信息弃锐,請求對應的遠程TransportServer;

  9. TransportServer:Netty通信服務(wù)端殿托,一個RpcEndpoint對應一個TransportServer霹菊,接受遠程消息后調(diào)用 Dispatcher分發(fā)消息至對應收發(fā)件箱;

根據(jù)上面的分析碌尔,Spark通信架構(gòu)的高層視圖如下圖所示:

img

四浇辜、SparkContext解析

在Spark中由SparkContext負責與集群進行通訊券敌、資源的申請以及任務(wù)的分配和監(jiān)控等。當 Worker節(jié)點中的Executor運行完畢Task后柳洋,Driver同時負責將SparkContext關(guān)閉待诅。

通常也可以使用SparkContext來代表驅(qū)動程序(Driver)。

img

SparkContext是用戶通往Spark集群的唯一入口熊镣,可以用來在Spark集群中創(chuàng)建RDD卑雁、累加器和廣播變量。

SparkContext也是整個Spark應用程序中至關(guān)重要的一個對象绪囱,可以說是整個Application運行調(diào)度的核心(不包括資源調(diào)度)测蹲。

SparkContext的核心作用是初始化Spark應用程序運行所需的核心組件,包括高層調(diào)度器(DAGScheduler)鬼吵、底層調(diào)度器(TaskScheduler)和調(diào)度器的通信終端(SchedulerBackend)扣甲,同時還會負責Spark程序向Cluster Manager的注冊等。

img

在實際的編碼過程中齿椅,我們會先創(chuàng)建SparkConf實例琉挖,并對SparkConf的屬性進行自定義設(shè)置,隨后涣脚,將SparkConf作為SparkContext類的唯一構(gòu)造參數(shù)傳入來完成SparkContext實例對象的創(chuàng)建示辈。SparkContext在實例化的過程中會初始化DAGScheduler、TaskScheduler和SchedulerBackend遣蚀,當RDD的action算子觸發(fā)了作業(yè)(Job)后矾麻,SparkContext會調(diào)用DAGScheduler根據(jù)寬窄依賴將Job劃分成幾個小的階段(Stage),TaskScheduler會調(diào)度每個Stage的任務(wù)(Task)芭梯,另外险耀,SchedulerBackend負責申請和管理集群為當前Application分配的計算資源(即Executor)。

如果我們將Spark Application比作汽車粥帚,那么SparkContext就是汽車的引擎胰耗,而SparkConf就是引擎的配置參數(shù)。

下圖描述了Spark-On-Yarn模式下在任務(wù)調(diào)度期間芒涡,ApplicationMaster柴灯、Driver以及Executor內(nèi)部模塊的交互過程:

img

Driver初始化SparkContext過程中,會分別初始化DAGScheduler费尽、TaskScheduler赠群、SchedulerBackend以及HeartbeatReceiver,并啟動SchedulerBackend以及HeartbeatReceiver旱幼。SchedulerBackend通過ApplicationMaster申請資源查描,并不斷從TaskScheduler中拿到合適的Task分發(fā)到Executor執(zhí)行。HeartbeatReceiver負責接收Executor的心跳信息,監(jiān)控Executor的存活狀況冬三,并通知到TaskScheduler匀油。

五、Spark任務(wù)調(diào)度機制

在工廠環(huán)境下勾笆,Spark集群的部署方式一般為YARN-Cluster模式敌蚜,之后的內(nèi)核分析內(nèi)容中我們默認集群的部署方式為YARN-Cluster模式。

Spark任務(wù)提交流程

img

Spark YARN-Cluster模式下的任務(wù)提交流程

下面的時序圖清晰地說明了一個Spark應用程序從提交到運行的完整流程:

img

1窝爪、提交一個Spark應用程序弛车,首先通過Client向ResourceManager請求啟動一個Application,同時檢查是否有足夠的資源滿足Application的需求蒲每,如果資源條件滿足纷跛,則準備ApplicationMaster的啟動上下文,交給ResourceManager邀杏,并循環(huán)監(jiān)控Application狀態(tài)贫奠。

2、當提交的資源隊列中有資源時望蜡,ResourceManager會在某個 NodeManager上啟動ApplicationMaster進程叮阅,ApplicationMaster會單獨啟動Driver后臺線程,當Driver啟動后泣特,ApplicationMaster會通過本地的RPC連接Driver,并開始向ResourceManager申請Container資源運行Executor進程(一個Executor對應與一個Container)挑随,當ResourceManager返回Container資源状您,ApplicationMaster則在對應的Container上啟動Executor。

3兜挨、Driver線程主要是初始化SparkContext對象膏孟,準備運行所需的上下文,然后一方面保持與ApplicationMaster的RPC連接拌汇,通過ApplicationMaster申請資源柒桑,另一方面根據(jù)用戶業(yè)務(wù)邏輯開始調(diào)度任務(wù),將任務(wù)下發(fā)到已有的空閑Executor上噪舀。

4魁淳、當ResourceManager向ApplicationMaster返回Container資源時,ApplicationMaster就嘗試在對應的Container上啟動Executor進程与倡,Executor進程起來后界逛,會向Driver反向注冊,注冊成功后保持與Driver的心跳纺座,同時等待Driver分發(fā)任務(wù)息拜,當分發(fā)的任務(wù)執(zhí)行完畢后,將任務(wù)狀態(tài)上報給 Driver。

從上述時序圖可知少欺,Client只負責提交Application并監(jiān)控Application 的狀態(tài)喳瓣。對于Spark的任務(wù)調(diào)度主要是集中在兩個方面: 資源申請和任務(wù)分發(fā),其主要是通過ApplicationMaster赞别、Driver以及Executor之間來完成畏陕。

Spark任務(wù)調(diào)度概述

當Driver起來后,Driver則會根據(jù)用戶程序邏輯準備任務(wù)氯庆,并根據(jù)Executor資源情況逐步分發(fā)任務(wù)蹭秋。在詳細闡述任務(wù)調(diào)度前,首先說明下Spark里的幾個概念堤撵。一個Spark應用程序包括Job仁讨、Stage以及Task三個概念:

Job是以Action方法為界,遇到一個Action方法則觸發(fā)一個Job实昨;

Stage是Job的子集洞豁,以RDD寬依賴(即 Shuffle)為界,遇到Shuffle做一次劃分荒给;

Task是Stage的子集丈挟,以并行度(分區(qū)數(shù))來衡量,分區(qū)數(shù)是多少志电,則有多少個task曙咽。

Spark的任務(wù)調(diào)度總體來說分兩路進行,一路是Stage級的調(diào)度挑辆,一路是Task級的調(diào)度例朱,總體調(diào)度流程如下圖所示:

img

Spark RDD通過其Transactions操作,形成了RDD血緣關(guān)系圖鱼蝉,即DAG洒嗤,最后通過Action的調(diào)用,觸發(fā)Job并調(diào)度執(zhí)行魁亦。DAGScheduler負責Stage級的調(diào)度渔隶,主要是將job切分成若干個Stage,并將每個Stage打包成TaskSet交給TaskScheduler調(diào)度洁奈。TaskScheduler負責Task級的調(diào)度间唉,將DAGScheduler給過來的TaskSet按照指定的調(diào)度策略分發(fā)到Executor上執(zhí)行,調(diào)度過程中SchedulerBackend負責提供可用資源利术,其中SchedulerBackend有多種實現(xiàn)终吼,分別對接不同的資源管理系統(tǒng)。

Spark Stage級調(diào)度

Spark的任務(wù)調(diào)度是從DAG切割開始氯哮,主要是由DAGScheduler來完成际跪。當遇到一個Action操作后就會觸發(fā)一個Job的計算商佛,并交給DAGScheduler來提交,下圖是涉及到Job提交的相關(guān)方法調(diào)用流程圖姆打。

img

Job由最終的RDD和Action方法封裝而成良姆,SparkContext 將Job交給DAGScheduler提交,它會根據(jù)RDD的血緣關(guān)系構(gòu)成的DAG進行切分幔戏,將一個Job劃分為若干Stages玛追,具體劃分策略是,由最終的RDD不斷通過依賴回溯判斷父依賴 是否是寬依賴闲延,即以Shuffle為界痊剖,劃分Stage,窄依賴的RDD之間被劃分到同一個Stage中垒玲,可以進行pipeline式的計算陆馁,如上圖紫色流程部分。劃分的Stages分兩類合愈,一類叫做ResultStage叮贩,為DAG最下游的Stage,由Action方法決定佛析,另一類叫做ShuffleMapStage益老,為下游Stage準備數(shù)據(jù),下面看一個簡單的例子WordCount寸莫。

img

Job由saveAsTextFile觸發(fā)捺萌,該Job由RDD-3和saveAsTextFile方法組成,根據(jù)RDD之間的依賴關(guān)系從RDD-3開始回溯搜索膘茎,直到?jīng)]有依賴的RDD-0互婿,在回溯搜索過程中,RDD-3依賴RDD-2辽狈,并且是寬依賴,所以在RDD-2和RDD-3之間劃分Stage呛牲,RDD-3被劃到最后一個Stage刮萌,即ResultStage中,RDD-2依賴RDD-1娘扩,RDD-1依賴RDD-0着茸,這些依賴都是窄依賴,所以將RDD-0琐旁、RDD-1和RDD-2劃分到同一個 Stage涮阔,即 ShuffleMapStage中,實際執(zhí)行的時候灰殴,數(shù)據(jù)記錄會一氣呵成地執(zhí)行RDD-0到RDD-2的轉(zhuǎn)化敬特。不難看出,其本質(zhì)上是一個深度優(yōu)先搜索算法。一個Stage是否被提交伟阔,需要判斷它的父Stage是否執(zhí)行辣之,只有在父Stage執(zhí)行完畢才能提交當前Stage,如果一個Stage沒有父Stage皱炉,那么從該Stage開始提交怀估。Stage提交時會將Task信息(分區(qū)信息以及方法等)序列化并被打包成TaskSet 交給TaskScheduler,一個Partition對應一個Task合搅,另一方面TaskScheduler會監(jiān)控Stage的運行狀態(tài)多搀,只有Executor丟失或者Task由于Fetch失敗才需要重新提交失敗的Stage以調(diào)度運行失敗的任務(wù),其他類型的Task失敗會在TaskScheduler的調(diào)度過程中重試灾部。相對來說DAGScheduler做的事情較為簡單康铭,僅僅是在Stage層面上劃分DAG,提交Stage并監(jiān)控相關(guān)狀態(tài)信息梳猪。TaskScheduler則相對較為復雜麻削,下面詳細闡述其細節(jié)。

Spark Task級調(diào)度

Spark Task的調(diào)度是由TaskScheduler來完成春弥,DAGScheduler將Stage打包到TaskSet交給TaskScheduler呛哟,TaskScheduler會將TaskSet封裝為TaskSetManager加入到調(diào)度隊列中,TaskSetManager結(jié)構(gòu)如下圖所示匿沛。

img

TaskSetManager負責監(jiān)控管理同一個Stage中的Tasks扫责,TaskScheduler就是以TaskSetManager為單元來調(diào)度任務(wù)。

TaskScheduler初始化后會啟動SchedulerBackend逃呼,它負責跟外界打交道鳖孤,接收Executor的注冊信息,并維護Executor的狀態(tài)抡笼,所以說SchedulerBackend是管“糧食”的苏揣,同時它在啟動后會定期地去“詢問”TaskScheduler有沒有任務(wù)要運行,也就是說推姻,它會定期地“問”TaskScheduler“我有這么余量平匈,你要不要啊”,TaskScheduler在SchedulerBackend“問 ”它的時候藏古,會從調(diào)度隊列中按照指定的調(diào)度策略選擇TaskSetManager去調(diào)度運行增炭,大致方法調(diào)用流程如下圖所示:

img

將TaskSetManager加入rootPool調(diào)度池中之后,調(diào)用SchedulerBackend的riviveOffers方法給driverEndpoint發(fā)送ReviveOffer消息拧晕;driverEndpoint收到ReviveOffer消息后調(diào)用makeOffers方法隙姿,過濾出活躍狀態(tài)的Executor(這些Executor都是任務(wù)啟動時反向注冊到Driver的Executor),然后將Executor封裝成WorkerOffer對象厂捞;準備好計算資源(WorkerOffer)后输玷,taskScheduler基于這些資源調(diào)用resourceOffer在Executor上分配task队丝。

六、調(diào)度策略

前面講到饲嗽,TaskScheduler會先把DAGScheduler給過來的TaskSet封裝成TaskSetManager扔到任務(wù)隊列里炭玫,然后再從任務(wù)隊列里按照一定的規(guī)則把它們?nèi)〕鰜碓赟chedulerBackend給過來的Executor上運行。這個調(diào)度過程實際上還是比較粗粒度的貌虾,是面向TaskSetManager的吞加。調(diào)度隊列的層次結(jié)構(gòu)如下圖所示:

img

TaskScheduler是以樹的方式來管理任務(wù)隊列,樹中的節(jié)點類型為Schdulable尽狠,葉子節(jié)點為TaskSetManager衔憨,非葉子節(jié)點為Pool,下圖是它們之間的繼承關(guān)系袄膏。

img

TaskScheduler支持兩種調(diào)度策略践图,一種是FIFO,也是默認的調(diào)度策略沉馆,另一種是FAIR码党。在TaskScheduler初始化過程中會實例化rootPool,表示樹的根節(jié)點斥黑,是Pool類型揖盘。

1、FIFO調(diào)度策略

FIFO調(diào)度策略執(zhí)行步驟如下:

1)對s1和s2兩個Schedulable的優(yōu)先級(Schedulable類的一個屬性锌奴,記為priority兽狭,值越小,優(yōu)先級越高)鹿蜀;

2)如果兩個Schedulable的優(yōu)先級相同箕慧,則對s1,s2所屬的Stage的身份進行標識進行比較(Schedulable類的一個屬性茴恰,記為priority倍奢,值越小打肝,優(yōu)先級越高)蕴忆;

3)如果比較的結(jié)果小于0剃毒,則優(yōu)先調(diào)度s1具练,否則優(yōu)先調(diào)度s2菱涤。

img

2晶通、FAIR 調(diào)度策略

FAIR 調(diào)度策略的樹結(jié)構(gòu)如下圖所示:

img

FAIR模式中有一個rootPool和多個子Pool项钮,各個子Pool中存儲著所有待分配的TaskSetMagager渣叛。

可以通過在Properties中指定spark.scheduler.pool屬性丈秩,指定調(diào)度池中的某個調(diào)度池作為TaskSetManager的父調(diào)度池,如果根調(diào)度池不存在此屬性值對應的調(diào)度池淳衙,會創(chuàng)建以此屬性值為名稱的調(diào)度池作為TaskSetManager的父調(diào)度池蘑秽,并將此調(diào)度池作為根調(diào)度池的子調(diào)度池饺著。

在FAIR模式中,需要先對子Pool進行排序肠牲,再對子Pool里面的TaskSetMagager進行排序幼衰,因為Pool和TaskSetMagager都繼承了Schedulable特質(zhì),因此使用相同的排序算法缀雳。

排序過程的比較是基于Fair-share來比較的渡嚣,每個要排序的對象包含三個屬性:runningTasks值(正在運行的Task數(shù))、minShare值肥印、weight值识椰,比較時會綜合考量runningTasks值,minShare值以及weight值深碱。

注意腹鹉,minShare、weight的值均在公平調(diào)度配置文件fairscheduler.xml中被指定敷硅,調(diào)度池在構(gòu)建階段會讀取此文件的相關(guān)配置功咒。

1)如果A對象的runningTasks大于它的minShare,B對象的runningTasks小于它的minShare绞蹦,那么B排在A前面力奋;(runningTasks比minShare小的先執(zhí)行)

2)如果A、B對象的runningTasks都小于它們的minShare坦辟,那么就比較runningTasks與minShare的比值(minShare使用率)刊侯,誰小誰排前面;(minShare使用率低的先執(zhí)行)

3)如果A锉走、B對象的runningTasks都大于它們的minShare滨彻,那么就比較runningTasks與weight的比值(權(quán)重使用率),誰小誰排前面挪蹭。(權(quán)重使用率低的先執(zhí)行)

4)如果上述比較均相等亭饵,則比較名字。

整體上來說就是通過minShare和weight這兩個參數(shù)控制比較過程梁厉,可以做到讓minShare使用率和權(quán)重使用率少(實際運行task比例較少)的先運行辜羊。

FAIR模式排序完成后,所有的TaskSetManager被放入一個ArrayBuffer里词顾,之后依次被取出并發(fā)送給Executor執(zhí)行八秃。

從調(diào)度隊列中拿到TaskSetManager后,由于TaskSetManager封裝了一個Stage的所有Task肉盹,并負責管理調(diào)度這些Task昔驱,那么接下來的工作就是TaskSetManager按照一定的規(guī)則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發(fā)到Executor上執(zhí)行上忍。

本地化調(diào)度

DAGScheduler切割Job骤肛,劃分Stage,通過調(diào)用submitStage來提交一個Stage對應的tasks纳本,submitStage會調(diào)用submitMissingTasks,submitMissingTasks確定每個需要計算的task的preferredLocations腋颠,通過調(diào)用getPreferrdeLocations()得到partition的優(yōu)先位置繁成,由于一個partition對應一個task,此partition的優(yōu)先位置就是task的優(yōu)先位置淑玫,對于要提交到TaskScheduler的TaskSet中的每一個task巾腕,該task優(yōu)先位置與其對應的partition對應的優(yōu)先位置一致。從調(diào)度隊列中拿到TaskSetManager后混移,那么接下來的工作就是TaskSetManager按照一定的規(guī)則一個個取出task給TaskScheduler祠墅,TaskScheduler再交給SchedulerBackend去發(fā)到Executor上執(zhí)行。前面也提到歌径,TaskSetManager封裝了一個Stage的所有task毁嗦,并負責管理調(diào)度這些task。根據(jù)每個task的優(yōu)先位置回铛,確定task的Locality級別狗准,Locality一共有五種,優(yōu)先級由高到低順序:

img

在調(diào)度執(zhí)行時茵肃,Spark調(diào)度總是會盡量讓每個task以最高的本地性級別來啟動腔长,當一個task以X本地性級別啟動,但是該本地性級別對應的所有節(jié)點都沒有空閑資源而啟動失敗验残,此時并不會馬上降低本地性級別啟動而是在某個時間長度內(nèi)再次以X本地性級別來啟動該task捞附,若超過限時時間則降級啟動,去嘗試下一個本地性級別您没,依次類推鸟召。可以通過調(diào)大每個類別的最大容忍延遲時間氨鹏,在等待階段對應的Executor可能就會有相應的資源去執(zhí)行此task欧募,這就在在一定程度上提到了運行性能。

失敗重試與黑名單機制

除了選擇合適的Task調(diào)度運行外仆抵,還需要監(jiān)控Task的執(zhí)行狀態(tài)跟继,前面也提到,與外部打交道的是SchedulerBackend镣丑,Task被提交到Executor啟動執(zhí)行后舔糖,Executor會將執(zhí)行狀態(tài)上報給SchedulerBackend,SchedulerBackend則告訴TaskScheduler莺匠,TaskScheduler找到該Task對應的TaskSetManager金吗,并通知到該TaskSetManager,這樣TaskSetManager就知道Task的失敗與成功狀態(tài),對于失敗的Task辽聊,會記錄它失敗的次數(shù),如果失敗次數(shù)還沒有超過最大重試次數(shù)期贫,那么就把它放回待調(diào)度的Task池子中跟匆,否則整個Application失敗。在記錄Task失敗次數(shù)過程中通砍,會記錄它上一次失敗所在的ExecutorId和Host玛臂,這樣下次再調(diào)度這個Task時,會使用黑名單機制封孙,避免它被調(diào)度到上一次失敗的節(jié)點上迹冤,起到一定的容錯作用。黑名單記錄Task上一次失敗所在的ExecutorId和Host虎忌,以及其對應的“拉黑”時間泡徙,“拉黑”時間是指這段時間內(nèi)不要再往這個節(jié)點上調(diào)度這個Task了。

七膜蠢、Spark Shuffle解析

ShuffleMapStage與FinalStage

img

在劃分stage時堪藐,最后一個stage成為FinalStage,它本質(zhì)上是一個ResultStage對象挑围,前面的所有stage被稱為ShuffleMapStage礁竞。

ShuffleMapStage的結(jié)束伴隨著shuffle文件的寫磁盤。

ResultStage基本上對應代碼中的action算子杉辙,即將一個函數(shù)應用在RDD的各個partition的數(shù)據(jù)集上模捂,意味著一個job的運行結(jié)束。

Shuffle中的任務(wù)個數(shù)

map端task個數(shù)的確定

Shuffle過程中的task個數(shù)由RDD分區(qū)數(shù)決定蜘矢,而RDD的分區(qū)個數(shù)與參數(shù)spark.default.parallelism有密切關(guān)系狂男。

在Yarn Cluster模式下,如果沒有手動設(shè)置spark.default.parallelism硼端,則有:

Others: total number of cores on all executor nodes or 2, whichever is larger. spark.default.parallelism = max(所有executor使用的core總數(shù)并淋,2)

如果進行了手動配置,則:

spark.default.parallelism = 配置值

還有一個重要的配置:

The maximum number of bytes to pack into a single partition when reading files. spark.files.maxPartitionBytes = 128 M (默認)

代表著rdd的一個分區(qū)能存放數(shù)據(jù)的最大字節(jié)數(shù)珍昨,如果一個400MB的文件县耽,只分了兩個區(qū),則在action時會發(fā)生錯誤镣典。

當一個spark應用程序執(zhí)行時兔毙,生成sparkContext,同時會生成兩個參數(shù)兄春,由上面得到的spark.default.parallelism推導出這兩個參數(shù)的值:

sc.defaultParallelism = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

當以上參數(shù)確定后澎剥,就可以推算RDD分區(qū)數(shù)目了。

(1)通過scala集合方式parallelize生成的RDD

val rdd = sc.parallelize(1 to 10)

這種方式下赶舆,如果在parallelize操作時沒有指定分區(qū)數(shù)哑姚,則有:

rdd的分區(qū)數(shù) = sc.defaultParallelism

(2)在本地文件系統(tǒng)通過textFile方式生成的RDD

val rdd = sc.textFile("path/file")

rdd的分區(qū)數(shù) = max(本地file的分片數(shù)祭饭,sc.defaultMinPartitions)

(3)在HDFS文件系統(tǒng)生成的RDD

rdd的分區(qū)數(shù) = max(HDFS文件的Block數(shù)目,sc.defaultMinPartitions)

(4)從HBase數(shù)據(jù)表獲取數(shù)據(jù)并轉(zhuǎn)換為RDD

rdd的分區(qū)數(shù) = Table的region個數(shù)

(5)通過獲取json(或者parquet等等)文件轉(zhuǎn)換成的DataFrame

rdd的分區(qū)數(shù) = 該文件在文件系統(tǒng)中存放的Block數(shù)目

(6)Spark Streaming獲取Kafka消息對應的分區(qū)數(shù)

基于Receiver:

在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關(guān)的叙量,所以如果我們加大每個topic的partition數(shù)量倡蝙,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理數(shù)據(jù)上的并行度绞佩。

基于DirectDStream:

Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition寺鸥,并且會并行從Kafka中讀取數(shù)據(jù),所以在Kafka partition和RDD partition之間品山,有一個一對一的映射關(guān)系胆建。

reduce端task個數(shù)的確定

Reduce端進行數(shù)據(jù)的聚合,一部分聚合算子可以手動指定reduce task的并行度肘交,如果沒有指定笆载,則以map端的最后一個RDD的分區(qū)數(shù)作為其分區(qū)數(shù),那么分區(qū)數(shù)就決定了reduce端的task的個數(shù)涯呻。

reduce端數(shù)據(jù)的讀取

根據(jù)stage的劃分我們知道宰译,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage魄懂,reduce task位于ResultStage沿侈,map task會先執(zhí)行,那么后執(zhí)行的reduce task如何知道從哪里去拉去map task落盤后的數(shù)據(jù)呢市栗?

reduce端的數(shù)據(jù)拉取過程如下:

1缀拭、map task執(zhí)行完畢后會將計算狀態(tài)以及磁盤小文件位置等信息封裝到mapStatue對象中,然后由本進程中的MapOutPutTrackerWorker對象將mapstatus對象發(fā)送給Driver進程的MapOutPutTrackerMaster對象填帽;

2蛛淋、在reduce task開始執(zhí)行之前會先讓本進程中的MapOutPutTrackerWorker向Driver進程中的MapOutPutTrackerMaster發(fā)動請求,請求磁盤小文件位置信息篡腌;

3褐荷、當所有的Map task執(zhí)行完畢后,Driver進程中的MapOutPutTrackerMaster就掌握了所有的磁盤小文件的位置信息嘹悼。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁盤小文件的位置信息叛甫;

4、完成之前的操作之后杨伙,由BlockerTransforService去Executor所在的節(jié)點拉數(shù)據(jù)其监,默認會啟動五個子線程。每次拉取的數(shù)據(jù)量不能超過48M(reduce task每次最多拉取48M數(shù)據(jù)限匣,將拉來的數(shù)據(jù)存儲到Executor內(nèi)存的20%內(nèi)存中)抖苦。

HashShuffle解析

以下的討論都假設(shè)每個Executor有一個CPU core。

1、未經(jīng)優(yōu)化的HashShuffleManager

shuffle write階段锌历,主要就是在一個stage結(jié)束計算之后贮庞,為了下一個stage可以執(zhí)行shuffle類的算子(比如reduceByKey),而將每個task處理的數(shù)據(jù)按key進行“劃分”究西。所謂“劃分”贸伐,就是對相同的key執(zhí)行hash算法,從而將相同key都寫入同一個磁盤文件中怔揩,而每一個磁盤文件都只屬于下游stage的一個task。在將數(shù)據(jù)寫入磁盤之前脯丝,會先將數(shù)據(jù)寫入內(nèi)存緩沖中商膊,當內(nèi)存緩沖填滿之后,才會溢寫到磁盤文件中去宠进。

下一個stage的task有多少個晕拆,當前stage的每個task就要創(chuàng)建多少份磁盤文件。比如下一個stage總共有100個task材蹬,那么當前stage的每個task都要創(chuàng)建100份磁盤文件实幕。如果當前stage有50個task,總共有10個Executor堤器,每個Executor執(zhí)行5個task昆庇,那么每個Executor上總共要創(chuàng)建500個磁盤文件,所有Executor上會創(chuàng)建5000個磁盤文件闸溃。由此可見整吆,未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤文件的數(shù)量是極其驚人的。

shuffle read階段辉川,通常就是一個stage剛開始時要做的事情表蝙。此時該stage的每一個task就需要將上一個stage的計算結(jié)果中的所有相同key,從各個節(jié)點上通過網(wǎng)絡(luò)都拉取到自己所在的節(jié)點上乓旗,然后進行key的集合或鏈接等操作府蛇。由于shuffle write的過程中,map task個下游stage的每個reduce task都創(chuàng)建了一個磁盤文件屿愚,因此shuffle read的過程中汇跨,每個reduce task只要從上游stage的所有map task所在的節(jié)點上,拉取屬于自己的那一個磁盤文件即可妆距。

shuffle read的拉取過程是一邊拉取一邊進行聚合的扰法。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù)毅厚,然后通過你村中的一個Map進行聚合等操作塞颁。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到buffer緩沖中進行聚合操作祠锣。以此類推酷窥,知道最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果伴网。

未經(jīng)優(yōu)化的HashShuffleManager工作原理如下圖所示:

img

2蓬推、優(yōu)化后的HashShuffleManager

為了優(yōu)化HashShuffleManager我們可以設(shè)置一個參數(shù),spark.shuffle.consolidateFiles澡腾,該參數(shù)默認值為false沸伏,將其設(shè)置為true即可開啟優(yōu)化機制,通常來說动分,如果我們使用HashShuffleManager毅糟,那么都建議開啟這個選項。

開啟consolidate機制之后澜公,在shuffle write過程中姆另,task就不是為了下游stage的每個task創(chuàng)建一個磁盤文件了,此時會出現(xiàn)shuffleFileGroup的概念坟乾,每個shuffleFileGroup會對應一批磁盤文件迹辐,磁盤文件的數(shù)量與下游stage的task數(shù)量是相同的。一個Executor上有多少個CPU core甚侣,就可以并行執(zhí)行多少個task明吩。而第一批并行執(zhí)行的每個task都會闖將一個shuffleFileGroup,并將數(shù)據(jù)寫入對應的磁盤文件內(nèi)殷费。

當Executor的CPU core執(zhí)行完一批task贺喝,接著執(zhí)行下一批task時,下一批task就會復用之前已有的shuffleFileGroup宗兼,包括其中的磁盤文件躏鱼,也就是說,此時task會將數(shù)據(jù)寫入已有的磁盤文件中殷绍,而不會寫入新的磁盤文件中染苛。因此,consolidate機制允許不同的task復用同一批磁盤文件主到,這樣就可以有效將多個task的磁盤文件進行一定程度上的合并茶行,從而大幅度減少磁盤文件的數(shù)量,進而提升shuffle write的性能登钥。

假設(shè)第二個stage有100個task畔师,第一個stage有50個task,總共還是有10個Executor(Executor CPU個數(shù)為1)牧牢,每個Executor執(zhí)行5個task看锉。那么原本使用未經(jīng)優(yōu)化的HashSHuffleManager時姿锭,每個Executor會產(chǎn)生500個磁盤文件,所有Executor會產(chǎn)生5000個磁盤文件的伯铣。但是此時經(jīng)過優(yōu)化之后呻此,每個Executor創(chuàng)建的磁盤文件的數(shù)量的計算公式為:CPU core的數(shù)量 * 下一個stage的task數(shù)量,也就是說腔寡,每個Executor此時只會創(chuàng)建100個磁盤文件跃巡,所有Executor只會創(chuàng)建1000個磁盤文件赁咙。

優(yōu)化后的HashShuffleManager工作原理如下圖所示:

img

SortShuffle解析

SortShuffleManager的運行機制主要分為兩種比肄,一種是普通運行機制痊焊,另一種是bypass運行機制。當shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(默認為200)凭语,就會啟用bypass機制葱她。

1、普通運行機制

在該模式下叽粹,數(shù)據(jù)會先寫入一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)中此時根據(jù)不同的shuffle算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)却舀,如果是reduceByKey這種聚合類的shuffle算子虫几,那么會選用Map數(shù)據(jù)結(jié)構(gòu),一邊通過Map進行聚合挽拔,一邊寫入內(nèi)存辆脸;如果是join這種普通的shuffle算子,那么會選用Array數(shù)據(jù)結(jié)構(gòu)螃诅,直接寫入內(nèi)存啡氢。接著,每寫一條數(shù)據(jù)進如內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后术裸,就會判斷一下倘是,是否達到了某個臨界閾值。如果達到臨界閾值的話袭艺,那么就會嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤搀崭,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。

在溢寫到磁盤文件之前猾编,會先根據(jù)key對內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進行排序瘤睹。排序過后,會分批將數(shù)據(jù)寫入磁盤文件答倡。默認的batch數(shù)量是10000條轰传,也就是說,排序好的數(shù)據(jù)瘪撇,會以每批1萬條數(shù)據(jù)的形式分批寫入磁盤文件获茬。寫入磁盤文件是通過Java的BufferedOutputStream實現(xiàn)的港庄。BufferedOutputStream是Java的緩沖輸出流,首先會將數(shù)據(jù)緩沖在內(nèi)存中锦茁,當內(nèi)存緩沖滿溢之后再一次寫入磁盤文件中攘轩,這樣可以減少磁盤IO次數(shù),提升性能码俩。

一個task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中度帮,會發(fā)生多次磁盤溢寫操作,也就會產(chǎn)生多個臨時文件稿存。最后會將之前所有的臨時磁盤文件都進行合并笨篷,這就是merge過程,此時會將之前所有臨時磁盤文件中的數(shù)據(jù)讀取出來瓣履,然后依次寫入最終的磁盤文件之中率翅。此外,由于一個task就只對應一個磁盤文件袖迎,也就意味著該task為下游stage的task準備的數(shù)據(jù)都在這一個文件中冕臭,一次你還會單獨寫一份索引文件,其中標識了下游各個task的數(shù)據(jù)在文件中的start offset與end offset燕锥。

SortShuffleManager由于有一個磁盤文件merge的過程辜贵,因此大大減少了文件數(shù)量。比如第一個stage有50個task归形,總共有10個Executor托慨,每個Executor執(zhí)行5個task,而第二個stage有100個task暇榴。由于每個task最終只有一個磁盤文件厚棵,因此此時每個Executor上只有5個磁盤文件,所有Executor只有50個磁盤文件蔼紧。

普通運行機制的SortShuffleManager工作原理如下圖所示:

img

2婆硬、bypass運行機制

bypass運行機制的觸發(fā)條件如下:

(1)shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值。

(2)不是聚合類的shuffle算子奸例。

此時柿祈,每個task會為每個下游task都創(chuàng)建一個臨時磁盤文件,并將數(shù)據(jù)按key進行hash然后根據(jù)key的hash值哩至,將key寫入對應的磁盤文件之中躏嚎。當然,寫入磁盤文件時也是先寫入內(nèi)存緩沖菩貌,緩沖寫滿之后再溢寫到磁盤文件的卢佣。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件箭阶,并創(chuàng)建一個單獨的索引文件虚茶。

該過程的磁盤寫機制其實跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的戈鲁,因為都要創(chuàng)建數(shù)量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已嘹叫。因此少量的最終磁盤文件婆殿,也讓該機制相對未經(jīng)優(yōu)化的HashShuffleManager來說,shuffleread的性能會更好罩扇。

而該機制與普通SortShuffleManager運行機制的不同在于:第一婆芦,磁盤寫機制不同;第二喂饥,不會進行排序消约。也就是說,啟用該機制的最大好處在于员帮,shuffle write過程中或粮,不需要進行數(shù)據(jù)的排序操作,也就節(jié)省掉了這部分的性能開銷捞高。

普通運行機制的SortShuffleManager工作原理如下圖所示:

img

八氯材、Spark內(nèi)存管理

在執(zhí)行Spark應用程序時,Spark集群會啟動Driver和Executor兩種JVM進程硝岗,前者為主控進程氢哮,負責創(chuàng)建Spark上下文,提交Spark作業(yè)(Job)辈讶,并將作業(yè)轉(zhuǎn)化為計算任務(wù)(Task)命浴,在各個Executor進程間協(xié)調(diào)任務(wù)的調(diào)度娄猫,后者負責在工作節(jié)點上執(zhí)行具體的計算任務(wù)贱除,并將結(jié)果返回給Driver,同時為需要持久化的RDD提供存儲功能媳溺。

堆內(nèi)和堆外內(nèi)存規(guī)劃

作為一個JVM進程月幌,Executor的內(nèi)存管理建立在JVM的內(nèi)存管理之上,Spark對JVM的堆內(nèi)(On-heap)空間進行了更為詳細的分配悬蔽,以充分利用內(nèi)存扯躺。同時,Spark引入了堆外(Off-heap)內(nèi)存蝎困,使之可以直接在工作節(jié)點的系統(tǒng)內(nèi)存中開辟空間录语,進一步優(yōu)化了內(nèi)存的使用。

堆內(nèi)內(nèi)存受到JVM統(tǒng)一管理禾乘,堆外內(nèi)存是直接向操作系統(tǒng)進行內(nèi)存的申請和釋放澎埠。

img

1、堆內(nèi)內(nèi)存

堆內(nèi)內(nèi)存的大小始藕,由Spark應用程序啟動時的- executor-memory或spark.executor.memory參數(shù)配置蒲稳。Executor內(nèi)運行的并發(fā)任務(wù)共享JVM堆內(nèi)內(nèi)存氮趋,這些任務(wù)在緩存RDD數(shù)據(jù)和廣播(Broadcast)數(shù)據(jù)時占用的內(nèi)存被規(guī)劃為存儲(Storage)內(nèi)存,而這些任務(wù)在執(zhí)行Shuffle時占用的內(nèi)存被規(guī)劃為執(zhí)行(Execution)內(nèi)存江耀,剩余的部分不做特殊規(guī)劃剩胁,那些Spark內(nèi)部的對象實例,或者用戶定義的Spark應用程序中的對象實例祥国,均占用剩余的空間昵观。不同的管理模式下,這三部分占用的空間大小各不相同系宫。

Spark對堆內(nèi)內(nèi)存的管理是一種邏輯上的俄“規(guī)劃式”的管理索昂,因為對象實例占用內(nèi)存的申請和釋放都由JVM完成,Spark只能在申請后和釋放前記錄這些內(nèi)存扩借。其具體流程如下:

1椒惨、Spark在代碼中new一個對象實例;

2潮罪、JVM從堆內(nèi)內(nèi)存分配空間康谆,創(chuàng)建對象并返回對象引用;

3嫉到、Spark保存該對象的引用沃暗,記錄該對象占用的內(nèi)存。

釋放內(nèi)存流程如下:

1何恶、Spark記錄該對象釋放的內(nèi)存孽锥,刪除該對象的引用;

2细层、等待JVM的垃圾回收機制釋放該對象占用的堆內(nèi)內(nèi)存惜辑。

我們知道,JVM的對象可以以序列化的方式存儲疫赎,序列化的過程是將對象轉(zhuǎn)換為二進制字節(jié)流盛撑,本質(zhì)上可以理解為將非連續(xù)空間的鏈式存儲轉(zhuǎn)化為連續(xù)空間或塊存儲,在訪問時則需要進行序列化的逆過程--反序列化捧搞,將字節(jié)流轉(zhuǎn)化為對象抵卫,序列化的方式可以節(jié)省存儲空間,但增加了存儲和讀取時候的計算開銷胎撇。

對于Spark中序列化的對象介粘,由于是字節(jié)流的形式,其占用的內(nèi)存大小可直接計算晚树,而對于非序列化的對象姻采,其占用的內(nèi)存是通過周期性地采樣近似估算而得,即并不是每次新增的數(shù)據(jù)項都會計算一次占用的內(nèi)存大小题涨,這種方法降低了時間開銷但是有可能誤差較大偎谁,導致某一時刻的實際內(nèi)存可能遠遠超出預期总滩。此外,在被Spark標記為釋放的對象實例巡雨,很有可能在實際上并沒有被JVM回收闰渔,導致實際可用的內(nèi)存小于Spark記錄的可用內(nèi)存。所以Spark并不能準確記錄實際可用的堆內(nèi)內(nèi)存铐望,從而也就無法完全避免內(nèi)存溢出(OOM冈涧,Out of Memory)的異常。

雖然不能精確控制堆內(nèi)內(nèi)存的申請和釋放正蛙,但Spark通過對存儲內(nèi)存和執(zhí)行內(nèi)存各自獨立的規(guī)劃管理督弓,可以決定是否要在存儲內(nèi)存里緩沖新的RDD,以及是否為新的任務(wù)分配執(zhí)行內(nèi)存乒验,在一定程度上可以提升內(nèi)存的利用率愚隧,減少異常的出現(xiàn)。

2锻全、堆外內(nèi)存

為了進一步優(yōu)化內(nèi)存的使用以及提高Shuffle時排序的效率狂塘,Spark引入了堆外(Off-heap)內(nèi)存,使之可以直接在工作節(jié)點的系統(tǒng)內(nèi)存中開辟空間鳄厌,存儲經(jīng)過序列化的二進制數(shù)據(jù)荞胡。

堆外內(nèi)存意味著把內(nèi)存對象分配在Java虛擬機的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是虛擬機)了嚎。這樣做的結(jié)果就是能保持一個較小的堆泪漂,以減少垃圾收集對應用的影響。

利用JDK Unsafe API(從spark2.0開始歪泳,在管理堆外的存儲內(nèi)存時不再基于Tachyon萝勤,而是與堆外的執(zhí)行內(nèi)存一樣,基于JDK Unsafe API實現(xiàn))夹囚,Spark可以直接操作系統(tǒng)堆外內(nèi)存纵刘,減少了不必要的內(nèi)存開銷邀窃,以及頻繁的GC掃描和回收荸哟,提升了處理性能。堆外內(nèi)存可以被精確地申請和釋放(堆外內(nèi)存之所以能夠被精確的申請和釋放瞬捕,是由于內(nèi)存的申請和釋放不再通過JVM機制鞍历,而是直接向操作系統(tǒng)申請,JVM對于內(nèi)存的清理是無法準確指定時間點的肪虎,因此無法實現(xiàn)精確的釋放)劣砍,而且序列化的數(shù)據(jù)占用的空間可以被精確計算,所以相比堆內(nèi)內(nèi)存來說降低了管理的難度扇救,也降低了誤差刑枝。

在默認情況下堆外內(nèi)存并不啟用香嗓,可以通過配置spark.memory.offHeap.enabled參數(shù)啟用,并由spark.memory.offHeap.size參數(shù)設(shè)定堆外空間的大小装畅。除了沒有other空間靠娱,堆外內(nèi)存與堆內(nèi)內(nèi)存的劃分方式相同,所有運行中的并發(fā)任務(wù)共享存儲內(nèi)存和執(zhí)行內(nèi)存掠兄。

(該部分內(nèi)存主要用于程序的共享庫像云,Perm Space、線程Stack和一些Memory mapping等蚂夕,或者類C方式allocate object)

內(nèi)存空間分配

1迅诬、靜態(tài)內(nèi)存管理

在Spark最初采用的靜態(tài)內(nèi)存管理機制下,存儲內(nèi)存婿牍、執(zhí)行內(nèi)存和其他內(nèi)存的大小在Spark應用程序運行期間均為固定的侈贷,但用戶可以應用程序啟動前進行配置,堆內(nèi)內(nèi)存的分配如下圖所示:

img

可以看到等脂,可用的堆內(nèi)內(nèi)存的大小需要按照代碼清單的方式計算:

可用的存儲內(nèi)存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction

可用的執(zhí)行內(nèi)存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction

其中systemMaxMemory取決于當前JVM堆內(nèi)內(nèi)存的大小铐维,最后可用的執(zhí)行內(nèi)存或者存儲內(nèi)存要在此基礎(chǔ)上與各自的memoryFraction參數(shù)和safetyFraction參數(shù)相乘得出。上述計算公式中的兩個safetyFraction參數(shù)慎菲,其意義在于在邏輯預留出1-safetyFraction這么一塊保險區(qū)域嫁蛇,降低因?qū)嶋H內(nèi)存超出當前預設(shè)范圍而導致OOM的風險(上文提到,對于非序列化對象的內(nèi)存采樣估算會產(chǎn)生誤差)露该。值得注意的是睬棚,這個預留的保險區(qū)域僅僅是一種邏輯上的規(guī)劃,再具體使用時Spark并沒有區(qū)別對待解幼,和“其他內(nèi)存”一樣交給了JVM去管理抑党。

Storage內(nèi)存和Executor內(nèi)存都有預留空間,目的是防止OOM撵摆,因為Spark堆內(nèi)內(nèi)存大小的記錄是不準確的底靠,需要留出保險區(qū)域。

堆外的空間分配較為簡單特铝,只有存儲內(nèi)存和執(zhí)行內(nèi)存暑中。可用的執(zhí)行內(nèi)存和存儲內(nèi)存占用的空間大小直接由參數(shù)spark.memory.storageFraction決定鲫剿,由于堆外內(nèi)存占用的空間可以被精確計算鳄逾,所以無需再設(shè)定保險區(qū)域。

img

靜態(tài)內(nèi)存管理機制實現(xiàn)起來較為簡單灵莲,但如果用戶不熟悉Spark的鵆機制雕凹,或沒有根據(jù)具體的數(shù)據(jù)規(guī)模和計算任務(wù)或做相應的配置,很容易造成“一般海水,一般火焰”的局面枚抵,即存儲內(nèi)存和執(zhí)行內(nèi)存中的一方剩余大量的空間线欲,而另一方卻早早被占滿,不得不淘汰或移出舊的內(nèi)容以存儲新的內(nèi)容汽摹。由于新的內(nèi)存管理機制的出現(xiàn)询筏,這種方式目前已經(jīng)很少有開發(fā)者使用,出于兼容舊版本的應用程序的目的竖慧,Spark依然保留了它的實現(xiàn)嫌套。

2、統(tǒng)一內(nèi)存管理

Spark1.6之后引入的統(tǒng)一內(nèi)存管理機制圾旨,與靜態(tài)內(nèi)存管理的區(qū)別在于存儲內(nèi)存和執(zhí)行內(nèi)存共享同一塊空間踱讨,可以動態(tài)占用對方的空閑區(qū)域,統(tǒng)一內(nèi)存管理的堆內(nèi)內(nèi)存結(jié)構(gòu)如下圖所示:

img

統(tǒng)一內(nèi)存管理的堆外內(nèi)存結(jié)構(gòu)如下圖所示:

img

其中最重要的優(yōu)化在于動態(tài)占用機制砍的,其規(guī)則如下:

1痹筛、設(shè)定基本的存儲內(nèi)存和執(zhí)行內(nèi)存區(qū)域(spark.storage.storageFraction參數(shù)),該設(shè)定確定了雙方各自擁有的空間的范圍廓鞠;

2帚稠、雙方的空間都不足時,則存儲到磁盤床佳;若己方空間不足而對方空余時滋早,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的Block)

3砌们、執(zhí)行內(nèi)存的空間被對方占用后杆麸,可讓對方將占用的部分轉(zhuǎn)存到磁盤,然后“歸還”借用的空間浪感;

4昔头、存儲內(nèi)存的空間被對方占用后,無法讓對方“歸還”影兽,因為需要考慮Shuffle過程中的很多因素揭斧,實現(xiàn)起來較為復雜。

統(tǒng)一內(nèi)存管理的動態(tài)占用機制如下圖所示:

img

憑借統(tǒng)一內(nèi)存管理機制峻堰,spark在一定程度上提高了堆內(nèi)和堆外內(nèi)存資源的利用率讹开,降低了開發(fā)者維護spark內(nèi)存的難度。如果存儲內(nèi)存的空間太大或者說緩存的數(shù)據(jù)過多茧妒,反而會導致頻繁的全量垃圾回收萧吠,降低任務(wù)執(zhí)行時的性能左冬,因為緩存的RDD數(shù)據(jù)通常都是長期主流內(nèi)存的桐筏。所以要想充分發(fā)揮Spark的性能,需要開發(fā)者進一步了解存儲內(nèi)存和執(zhí)行內(nèi)存各自管理方式和實現(xiàn)原理拇砰。

存儲內(nèi)存管理

1梅忌、RDD持久化機制

彈性分布式數(shù)據(jù)集(RDD)作為Spark最根本的數(shù)據(jù)抽象狰腌,是只讀的分區(qū)記錄(Partition)的集合,只能基于在穩(wěn)定物理存儲中的數(shù)據(jù)集上創(chuàng)建牧氮,或者在其他已有的RDD上執(zhí)行轉(zhuǎn)換(Transformation)操作產(chǎn)生一個新的RDD琼腔。轉(zhuǎn)換后的RDD與原始的RDD之間產(chǎn)生了依賴關(guān)系,構(gòu)成了血統(tǒng)(Lineage)踱葛。憑借血統(tǒng)丹莲,Spark保證了每一個RDD都可以被重新恢復。但是RDD的所有轉(zhuǎn)換都是有惰性的尸诽,即只有當一個返回結(jié)果給Driver的行動(Action)發(fā)生時甥材,Spark才會創(chuàng)建任務(wù)讀取RDD,然后真正觸發(fā)轉(zhuǎn)換的執(zhí)行性含。

Task在啟動之初讀取一個分區(qū)時洲赵,會先判斷這個分區(qū)是否已經(jīng)被持久化,如果沒有則需要檢查Checkpoint或按照血統(tǒng)重新計算商蕴。所以如果一個RDD上要執(zhí)行多次行動叠萍,可以在第一次行動中使用persist或cache方法,在內(nèi)存或磁盤中持久化或緩存這個RDD绪商,從而在后面的行動中提升計算速度苛谷。

事實上,cache方法是使用默認的MEMORY_ONLY的存儲級別將RDD持久化到內(nèi)存格郁,故緩存是一種特殊的持久化抄腔。堆內(nèi)和堆外存儲內(nèi)存的設(shè)計,便可以對緩存RDD時使用的內(nèi)存做統(tǒng)一的規(guī)劃和管理理张。

RDD的持久化由Spark的Storage模塊負責赫蛇,實現(xiàn)了RDD與物理存儲的解耦合。Storage模塊負責管理Spark在計算過程中產(chǎn)生的數(shù)據(jù)雾叭,將那些在內(nèi)存或磁盤悟耘、在本地或遠程存取數(shù)據(jù)的功能封裝了起來。在具體實現(xiàn)時Driver端和Executor端的Storage模塊構(gòu)成了主從式的架構(gòu)织狐,即Driver端的BlockManager為Master暂幼,Executor端的BlockManager為Slave。

Storage模塊在邏輯上以Block為基本存儲單位移迫,RDD的每個Partition經(jīng)過處理后位移對應一個Block(BlockId的格式為rdd_RDD-ID_PARTITION-ID)旺嬉。Driver端的Master負責整個Spark應用程序的Block的元數(shù)據(jù)信息的管理和維護,而Executor端的Slave需要將Block的更新等狀態(tài)上報到Master厨埋,同時接受Master的命令邪媳,例如新增或刪除一個RDD。

img

在對RDD持久化時,Spark規(guī)定了MEMORY_ONLY雨效、MEMORY_AND_DISK等7中不同的存儲級別迅涮,而存儲級別是以下5個變量的組合:

class StorageLevel private(

private var _useDisk: Boolean, //磁盤

private var _useMemory: Boolean, //這里其實是指堆內(nèi)內(nèi)存

private var _useOffHeap: Boolean, //堆外內(nèi)存

private var _deserialized: Boolean, //是否為非序列化

private var _replication: Int = 1 //副本個數(shù)

)

Spark中7中存儲級別如下:

img

通過對數(shù)據(jù)結(jié)構(gòu)的分析氮唯,可以看出存儲級別從三個維度定義了RDD的Partition(同時也就是Block)的存儲方式:

(1)存儲位置:磁盤/堆內(nèi)內(nèi)存/堆外內(nèi)存瀑晒。如MEMORY_AND_DISK是同時在磁盤和堆內(nèi)內(nèi)存上存儲触创,實現(xiàn)了冗余備份叫潦。OFF_HEAP則是只在堆外內(nèi)存存儲嫩痰,目前選擇堆外內(nèi)存時不能同時存儲到其他位置凿菩。

(2)存儲形式:Block緩存到存儲內(nèi)存后凄敢,是否為非序列化的形式韧衣。如MEMORY_ONLY是非序列化方式存儲极颓,OFF_HEAP是序列化方式存儲旷祸。

(3)副本數(shù)量:大于1時需要遠程冗余備份到其他節(jié)點。如DISK_ONLY_2需要遠程備份1個副本讼昆。

2托享、RDD的緩存過程

RDD在緩存到存儲內(nèi)存之前,Partition中的數(shù)據(jù)一般以迭代器(Iterator)的數(shù)據(jù)結(jié)構(gòu)來訪問浸赫,這是Scala語言中一種遍歷數(shù)據(jù)集合的方法闰围。通過Iterator可以獲取分區(qū)中每一條序列化或者非序列化的數(shù)據(jù)項(Record),這些Record的對象實例在邏輯上占用了JVM堆內(nèi)內(nèi)存的other部分的空間既峡,同一Partition的不同Record的存儲空間并不連續(xù)羡榴。

RDD在緩存到存儲內(nèi)存之后,Partition被轉(zhuǎn)換成Block运敢,Record在堆內(nèi)或堆外存儲內(nèi)存中占用一塊連續(xù)的空間校仑。將Partition由不連續(xù)的存儲空間轉(zhuǎn)換為連續(xù)存儲空間的過程,Spark稱之為“展開”(Unroll)传惠。

Block有序列化和非序列化兩種存儲格式迄沫,具體以哪種方式取決于該RDD的存儲級別。非序列化的Block以一種DeserializedMemoryEntry的數(shù)據(jù)結(jié)構(gòu)定義卦方,用一個數(shù)組存儲所有的對象實例羊瘩,序列化的Block則以SerializedMemoryEntry的數(shù)據(jù)結(jié)構(gòu)定義,用字節(jié)緩沖區(qū)(ByteBuffer)來存儲二進制數(shù)據(jù)盼砍。每個Executor的Storage模塊用一個鏈式Map結(jié)構(gòu)(LinkedHashMap)來管理堆內(nèi)和堆外存儲內(nèi)存中所有的Block對象的實例尘吗,對這個LinkedHashMap新增和刪除間接記錄了內(nèi)存的申請和釋放。

因為不能保證存儲空間可以一次容納Iterator中的所有數(shù)據(jù)浇坐,當前的計算任務(wù)在Unroll時要向MemoryManager申請足夠的Unroll空間來臨時占位睬捶,空間不足則Unroll失敗,空間足夠時可以繼續(xù)進行近刘。

對于序列化的Partition擒贸,其所需的Unroll空間可以直接累加計算臀晃,一次申請。

對于非序列化的Partition則要在便利Record的過程中一次申請酗宋,即每讀取一條Record积仗,采樣估算其所需的Unroll空間并進行申請疆拘,空間不足時可以中斷蜕猫,釋放已占用的Unroll空間。

如果最終Unroll成功哎迄,當前Partition所占用的Unroll空間被轉(zhuǎn)換為正常的緩存RDD的存儲空間回右,如下圖所示。

img

在靜態(tài)內(nèi)存管理時漱挚,Spark在存儲內(nèi)存中專門劃分了一塊Unroll空間翔烁,其大小是固定的,統(tǒng)一內(nèi)存管理時則沒有對Unroll空間進行特別區(qū)分旨涝,當存儲空間不足時會根據(jù)動態(tài)占用機制進行處理蹬屹。

3、淘汰與落盤

由于同一個Executor的所有的計算任務(wù)共享有限的存儲內(nèi)存空間白华,當有新的Block需要緩存單數(shù)剩余空間不足且無法動態(tài)占用時慨默,就要對LinkedHashMap中的舊Block進行淘汰(Eviction),而被淘汰的Block如果其存儲級別中同時包含存儲到磁盤的要求弧腥,則要對其進行落盤(Drop)厦取,否則直接刪除該Block。

存儲內(nèi)存的淘汰規(guī)則為:

被淘汰的舊Block要與新的Block的MemoryMode相同管搪,即同屬于堆外或堆內(nèi)內(nèi)存虾攻;

新舊Block不能屬于同一個RDD,避免循環(huán)淘汰更鲁;

舊Block所屬RDD不能處于被讀狀態(tài)霎箍,避免引發(fā)一致性問題;

遍歷LinkedHashMap中Block澡为,按照最近最少使用(LRU)的順序淘汰朋沮,直到滿足新Block所需的空間。其中LRU是LinkedHashMap的特性缀壤。

落盤的流程則比較簡單樊拓,如果其存儲級別符合useDisk為true的條件,再根據(jù)其deserialized判斷是否是非序列化的形式塘慕,若是則對其進行序列化筋夏,最后將數(shù)據(jù)存儲到磁盤,在Storage模塊中更新其信息图呢。

執(zhí)行內(nèi)存管理

執(zhí)行內(nèi)存主要用來存儲任務(wù)再在執(zhí)行Shuffle時占用的內(nèi)存条篷,Shuffle是按照一定規(guī)則對RDD數(shù)據(jù)重新分區(qū)的過程骗随,Shuffle的Write和Read兩階段對執(zhí)行內(nèi)存的使用:

Shuffle Write

在map端會采用ExternalSorter進行外排,在內(nèi)存中存儲數(shù)據(jù)時主要占用堆內(nèi)執(zhí)行空間赴叹。

Shuffle Read

(1)在對reduce端的數(shù)據(jù)進行聚合時鸿染,要將數(shù)據(jù)交給Aggregator處理,在內(nèi)存中存儲數(shù)據(jù)時占用堆內(nèi)執(zhí)行空間乞巧。

(2)如果需要進行最終結(jié)果排序涨椒,則要將再次將數(shù)據(jù)交給ExternalSorter處理,占用堆內(nèi)執(zhí)行空間绽媒。

在ExternalSorter和Aggregator中蚕冬,Spark會使用一種叫做AppendOnlyMap的哈希表在堆內(nèi)執(zhí)行內(nèi)存中存儲數(shù)據(jù),但是Shuffle過程中所有數(shù)據(jù)并不能都保存到該哈希表中是辕,當這個哈希表占用的內(nèi)存會進行周期性地采樣估算囤热,當其大到一定程度,無法再從MemoryManager申請到新的執(zhí)行內(nèi)存時获三,Spark就會將其全部內(nèi)容存儲到磁盤文件中旁蔼,這個過程被稱為溢存(Spill),溢存到磁盤的文件最后會被歸并(Merge)疙教。

Spark的存儲內(nèi)存和執(zhí)行內(nèi)存有著截然不同的管理方式:對于存儲內(nèi)存來說棺聊,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要緩存的RDD的Partition轉(zhuǎn)化而成松逊;而對于執(zhí)行內(nèi)存躺屁,Spark用AppendOnlyMap來存儲Shuffle過程中的數(shù)據(jù),在Tungsten排序中甚至抽象稱為頁式內(nèi)存管理经宏,開辟了全新的JVM內(nèi)存管理機制犀暑。

九、Spark核心組件解析

BlockManager數(shù)據(jù)存儲與管理機制

BlockManager是整個Spark底層負責數(shù)據(jù)存儲與管理的一個組件烁兰,Driver和Executor的所有數(shù)據(jù)都由對應的BlockManager進行管理耐亏。

Driver上有BlockManagerMaster,負責對各個節(jié)點上的BlockManager內(nèi)部管理的數(shù)據(jù)的元數(shù)據(jù)進行維護沪斟,比如block的增刪改等操作广辰,都會在這里維護好元數(shù)據(jù)的變更。

每個節(jié)點都有一個BlockManager主之,每個BlockManager創(chuàng)建之后择吊,第一件事即使去向BlockManagerMaster進行注冊,此時BlockManagerMaster會為其創(chuàng)建對應的BlockManagerInfo槽奕。

img

BlockManagerMaster與BlockManager的關(guān)系非常像NameNode與DataNode的關(guān)系几睛,BlockManagerMaster中保存BlockManager內(nèi)部管理數(shù)據(jù)的元數(shù)據(jù),進行維護粤攒,當BlockManager進行Block增刪改等操作時所森,都會在BlockManagerMaster中進行元數(shù)據(jù)的變更囱持,這與NameNode維護DataNode的元數(shù)據(jù)信息,DataNode中數(shù)據(jù)發(fā)生變化時NameNode中的元數(shù)據(jù)也會相應變化是一致的焕济。

每個節(jié)點上都有一個BlockManager纷妆,BlockManager中有三個非常重要的組件:

DisStore:負責對磁盤數(shù)據(jù)進行讀寫;

MemoryStore:負責對內(nèi)存數(shù)據(jù)進行讀寫晴弃;

BlockTransferService:負責建立BlockManager到遠程其他節(jié)點的BlockManager的連接掩幢,負責對遠程其他節(jié)點的BlockManager的數(shù)據(jù)進行讀寫;

每個BlockManager創(chuàng)建之后肝匆,做的第一件事就是向BlockManagerMaster進行注冊粒蜈,此時BlockManagerMaster會為其創(chuàng)建對應的BlockManagerInfo顺献。

使用BlockManager進行寫操作時旗国,比如說,RDD運行過程中的一些中間數(shù)據(jù)注整,或者我們手動指定了persist()能曾,會優(yōu)先將數(shù)據(jù)寫入內(nèi)存中,如果內(nèi)存大小不夠肿轨,會使用自己的算法寿冕,將內(nèi)存中的部分數(shù)據(jù)寫入磁盤;此外椒袍,如果persist()指定了要replica驼唱,那么會使用BlockTransferService將數(shù)據(jù)replicate一份到其他節(jié)點的BlockManager上去。

使用BlockManager進行讀操作時驹暑,比如說玫恳,shuffleRead操作,如果能從本地讀取优俘,就利用DisStore或者MemoryStore從本地讀取數(shù)據(jù)京办,但是本地沒有數(shù)據(jù)的話,那么會用BlockTransferService與有數(shù)據(jù)的BlockManager建立連接帆焕,然后用BlockTransferService從遠程BlockManager讀取數(shù)據(jù)惭婿;例如,shuffle Read操作中叶雹,很有可能要拉取的數(shù)據(jù)本地沒有财饥,那么此時就會從遠程有數(shù)據(jù)的節(jié)點上,找那個節(jié)點的BlockManager來拉取需要的數(shù)據(jù)折晦。

只要使用BlockManager執(zhí)行了數(shù)據(jù)增刪改的操作钥星,那么必須將Block的BlockStatus上報到BlockManagerMaster,在BlockManagerMaster上會對指定BlockManager的BlockManagerInfo內(nèi)部的BlockStatus進行增刪改操作筋遭,從而達到元數(shù)據(jù)的維護功能打颤。

Spark共享變量底層實現(xiàn)

Spark一個非常重要的特性就是共享變量暴拄。

默認情況下,如果在一個算子的函數(shù)中使用到了某個外部的變量编饺,那么這個變量的值會被拷貝到每個task中乖篷,此時每個task只能操作自己的那份變量副本。如果多個task想要共享某個變量透且,那么這種方式是做不到的撕蔼。

Spark為此提供了兩種共享變量,一種是Broadcast Variable(廣播變量)秽誊,另一種是Accumulator(累加變量)鲸沮。Broadcast Variable會將用到的變量,僅僅為每個節(jié)點拷貝一份锅论,即每個Executor拷貝一份讼溺,更大的用途是優(yōu)化性能,見上網(wǎng)絡(luò)傳輸以及內(nèi)存損耗最易。Accumulator則可以讓多個task共同操作一份變量怒坯,主要可以進行累加操作。Broadcast Variable是共享讀變量藻懒,task不能去修改它剔猿,而Accumulator可以讓多個task操作一個變量。

廣播變量

廣播變量允許編程者在每個Executor上暴力外部數(shù)據(jù)的只讀變量嬉荆,而不是給每個任務(wù)發(fā)送一個副本归敬。

每個task都會保存一份它所使用的外部變量的副本,當一個Executor上的多個task都使用一個外部變量時鄙早,對于Executor內(nèi)存的消耗是非常大的汪茧,因此,我們可以將大型外部變量封裝為廣播變量蝶锋,此時一個Executor保存一個變量副本陆爽,此Executor上的所有task共用此變量,不再是一個task單獨保存一個副本扳缕,這在一定程度上降低了Spark任務(wù)的內(nèi)存占用慌闭。

img

使用外部變量

img

使用廣播變量

Spark還嘗試使用高效的廣播算法分發(fā)廣播變量,以降低通信成本躯舔。

Spark提供的Broadcast Variable是只讀的驴剔,并且在每個Executor上只會有一個副本,而不會為每個task都拷貝一份副本粥庄,因此丧失,它的最大作用,就是減少變量到各個節(jié)點的網(wǎng)絡(luò)傳輸消耗惜互,以及在各個節(jié)點上的內(nèi)存消耗布讹。此外琳拭,Spark內(nèi)部也是用了高效的廣播算法來減少網(wǎng)絡(luò)消耗。

可以通過調(diào)用SparkContext的broadcast()方法來針對每個變量創(chuàng)建廣播變量描验。然后再算子的函數(shù)內(nèi)白嘁,使用到廣播變量時,每個Executor只會拷貝一份副本了膘流,每個task可以使用廣播變量的value()方法獲取值絮缅。

在任務(wù)運行時,Executor并不獲取廣播變量呼股,當task執(zhí)行到使用廣播變量的代碼時耕魄,會向Executor的內(nèi)存中請求廣播變量,如下圖所示:

img

之后Executor會通過BlockManager向Driver拉取廣播變量彭谁,然后提供給task進行使用吸奴,如下圖所示:

img

廣播大變量是Spark中常用的基礎(chǔ)優(yōu)化方法,通過減少內(nèi)存占用實現(xiàn)任務(wù)執(zhí)行性能的提升马靠。

累加器

累加器(accumulator):Accumulator是僅僅被相關(guān)操作累加的變量奄抽,因此可以在并行中被有效地支持蔼两。它們可用于實現(xiàn)計數(shù)器(如MapReduce)或總和計數(shù)甩鳄。

Accumulator是存在于Driver端的,集群上運行的task進行Accumulator的累加额划,隨后把值發(fā)送到Driver端妙啃,在Driver端匯總(Spark UI在SparkContext創(chuàng)建時被創(chuàng)建,即在Driver端被創(chuàng)建俊戳,因此它可以讀取Accumulator的數(shù)值)揖赴,由于Accumulator存在于Driver端,從節(jié)點讀取不到Accumulator的數(shù)值抑胎。

Spark提供的Accumulator主要用于多個節(jié)點對一個變量進行共享性的操作燥滑。Accumulator只提供了累加的功能,但是卻給我們提供了多個task對于同一個變量并行操作的功能阿逃,但是task只能對Accumulator進行累加操作铭拧,不能讀取它的值,只有Driver程序可以讀取Accumulator的值恃锉。

Accumulator的底層原理如下圖所示:

img

關(guān)注公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫搀菩,學習大數(shù)據(jù)技術(shù)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末破托,一起剝皮案震驚了整個濱河市肪跋,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌土砂,老刑警劉巖州既,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谜洽,死亡現(xiàn)場離奇詭異,居然都是意外死亡吴叶,警方通過查閱死者的電腦和手機褥琐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來晤郑,“玉大人敌呈,你說我怎么就攤上這事≡烨蓿” “怎么了磕洪?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長诫龙。 經(jīng)常有香客問我析显,道長,這世上最難降的妖魔是什么签赃? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任谷异,我火速辦了婚禮,結(jié)果婚禮上锦聊,老公的妹妹穿的比我還像新娘歹嘹。我一直安慰自己,他們只是感情好孔庭,可當我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布尺上。 她就那樣靜靜地躺著,像睡著了一般圆到。 火紅的嫁衣襯著肌膚如雪怎抛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天芽淡,我揣著相機與錄音马绝,去河邊找鬼。 笑死挣菲,一個胖子當著我的面吹牛富稻,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播己单,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼唉窃,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了纹笼?” 一聲冷哼從身側(cè)響起纹份,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蔓涧,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體件已,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年元暴,在試婚紗的時候發(fā)現(xiàn)自己被綠了篷扩。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡茉盏,死狀恐怖鉴未,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鸠姨,我是刑警寧澤铜秆,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站讶迁,受9級特大地震影響连茧,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜巍糯,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一啸驯、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧祟峦,春花似錦罚斗、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至咱筛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間杆故,已是汗流浹背迅箩。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留处铛,地道東北人饲趋。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像撤蟆,于是被迫代替她去往敵國和親奕塑。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容