Spark有幾個工具用于在計(jì)算之間調(diào)度資源禽作。Spark運(yùn)行的集群管理器(cluster manager)(主要包括 Standalone呆瞻、Mesos和YARN這三種)為跨應(yīng)用程序調(diào)度提供了便利溯泣,有靜態(tài)分區(qū)方式和動態(tài)資源分配方式兩種策幼。 其次摊唇,在各個Spark應(yīng)用內(nèi)部,Spark包含一個公平調(diào)度器來調(diào)度每個SparkContext中的資源租冠。
1. 跨應(yīng)用調(diào)度
1.1 使用靜態(tài)分區(qū)方式分配集群資源
所有集群管理器都可以使用的最簡單的選項(xiàng)是資源的靜態(tài)分區(qū)辞嗡。 這種方式就意味著闻书,每個Spark應(yīng)用都是設(shè)定一個最大可用資源總量堵未,并且該應(yīng)用在整個生命周期內(nèi)都會占住這個資源.茫负。這是Standalone和YARN模式以及粗粒度Mesos模式中使用的方法前域。
1.2 動態(tài)資源分配方式
Spark提供了一種機(jī)制译仗,可根據(jù)工作負(fù)載動態(tài)調(diào)整應(yīng)用程序占用的資源觉壶。 這意味著您的應(yīng)用程序可能會在資源不再使用時將資源返回給群集髓需,并在需求時可以再次請求使用玻褪。 如果多個應(yīng)用程序共享Spark群集中的資源肉渴,此功能特別有用。
此功能在默認(rèn)情況下處于禁用狀態(tài)带射,可用于所有粗粒度集群管理器同规,即獨(dú)立模式,YARN模式和Mesos粗粒度模式。
下面介紹配置和部署方式:
要使用這一特性有兩個前提條件券勺。首先绪钥,你的應(yīng)用必須設(shè)置spark.dynamicAllocation.enabled為true。其次关炼,你必須在每個節(jié)點(diǎn)上啟動external shuffle service程腹,并將spark.shuffle.service.enabled設(shè)為true。external shuffle service 的目的是在移除executor的時候儒拂,能夠保留executor輸出的shuffle文件寸潦。啟用external shuffle service 的方式在各個集群管理器上各不相同:
- 在Spark獨(dú)立部署的集群中,你只需要在worker啟動前設(shè)置spark.shuffle.service.enabled為true即可。
- 在Mesos粗粒度模式下,你需要在各個節(jié)點(diǎn)上運(yùn)行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh 并設(shè)置 spark.shuffle.service.enabled為true即可社痛。
- 在YARN模式下见转,需要按以下步驟在各個NodeManager上啟動:這里
動態(tài)資源分配的具體實(shí)現(xiàn):
總體上來說,Spark應(yīng)該在執(zhí)行器(executor)空閑時將其關(guān)閉,而在后續(xù)要用時再申請蒜哀。因?yàn)闆]有一個固定的方法,可以預(yù)測一個執(zhí)行器在后續(xù)是否馬上會被分配去執(zhí)行任務(wù)池户,或者一個新分配的執(zhí)行器實(shí)際上是空閑的,所以我們需要一個試探性的方法凡怎,來決定是否申請或是移除一個執(zhí)行器校焦。
- 請求executor的策略:
一個啟用了動態(tài)分配的Spark應(yīng)用會有等待任務(wù)需要調(diào)度的時候,申請額外的executors统倒。在這種情況下寨典,必定意味著已有的executors已經(jīng)不足以同時執(zhí)行所有未完成的任務(wù)。
Spark會分輪次來申請執(zhí)行器房匆。實(shí)際的資源申請耸成,會在任務(wù)掛起spark.dynamicAllocation.schedulerBacklogTimeout
秒后首次觸發(fā),其后如果等待隊(duì)列中仍有掛起的任務(wù)浴鸿,則每過spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒后觸發(fā)一次資源申請井氢。另外,每一輪申請的執(zhí)行器個數(shù)以指數(shù)形式增長岳链。例如:一個Spark應(yīng)用可能在首輪申請1個執(zhí)行器花竞,后續(xù)的輪次申請個數(shù)可能是2個、4個掸哑、8個….约急。
采用指數(shù)級增長策略的原因有兩個:第一,應(yīng)用程序應(yīng)該在開始時謹(jǐn)慎地請求執(zhí)行器苗分,以防只需要少數(shù)的執(zhí)行者就已經(jīng)足夠了厌蔽;第二,如果一旦Spark應(yīng)用確實(shí)需要申請多個執(zhí)行器的話摔癣,那么可以確保其所需的計(jì)算資源及時增長奴饮。 - 移除executor的策略:
移除執(zhí)行器的策略就簡單得多了纬向。Spark應(yīng)用會在某個執(zhí)行器空閑超過spark.dynamicAllocation.executorIdleTimeout
秒后將其刪除,在大多數(shù)情況下戴卜,執(zhí)行器的移除條件和申請條件都是互斥的罢猪,也就是說,執(zhí)行器在有等待執(zhí)行任務(wù)掛起時叉瘩,不應(yīng)該空閑膳帕。
非動態(tài)分配模式下,執(zhí)行器可能的退出原因有執(zhí)行失敗或是相關(guān)Spark應(yīng)用已經(jīng)退出薇缅。不管是哪種原因危彩,執(zhí)行器的所有狀態(tài)都已經(jīng)不再需要,可以丟棄掉泳桦。但是在動態(tài)分配的情況下汤徽,執(zhí)行器有可能在Spark應(yīng)用運(yùn)行期間被移除。這時候灸撰,如果Spark應(yīng)用嘗試去訪問該執(zhí)行器存儲的狀態(tài)谒府,就必須重算這一部分?jǐn)?shù)據(jù)。因此浮毯,Spark需要一種機(jī)制完疫,能夠優(yōu)雅的關(guān)閉執(zhí)行器,同時還保留其狀態(tài)數(shù)據(jù)债蓝。要解決這一問題壳鹤,就需要用到 external shuffle service ,該服務(wù)在 Spark 1.2 引入饰迹。該服務(wù)在每個節(jié)點(diǎn)上都會啟動一個不依賴于任何 Spark 應(yīng)用或執(zhí)行器的獨(dú)立進(jìn)程芳誓,就可以優(yōu)雅的來關(guān)閉executor。
2. 應(yīng)用內(nèi)調(diào)度
在指定的 Spark 應(yīng)用內(nèi)部(對應(yīng)同一 SparkContext 實(shí)例)啊鸭,多個線程可能并發(fā)地提交 Spark 作業(yè)(job)锹淌。Spark 調(diào)度器是完全線程安全的,并且能夠支持 Spark 應(yīng)用同時處理多個請求(比如 : 來自不同用戶的查詢)赠制。
2.1 FIFO 調(diào)度策略
默認(rèn)情況下赂摆,Spark 應(yīng)用內(nèi)部使用 FIFO 調(diào)度策略。每個作業(yè)被劃分為多個階段(stage)(例如 : map 階段和 reduce 階段)憎妙,第一個作業(yè)在其啟動后會優(yōu)先獲取所有的可用資源库正,然后是第二個作業(yè)再申請曲楚,再第三個……厘唾。如果前面的作業(yè)沒有把集群資源占滿,則后續(xù)的作業(yè)可以立即啟動運(yùn)行龙誊,否則抚垃,后提交的作業(yè)會有明顯的延遲等待。
2.2 公平(Fair)調(diào)度策略
不過從 Spark 0.8 開始,Spark 也能支持各個作業(yè)間的公平(Fair)調(diào)度鹤树。公平調(diào)度時铣焊,Spark 以輪詢的方式給每個作業(yè)分配資源,因此所有的作業(yè)獲得的資源大體上是平均分配罕伯。這意味著曲伊,即使有大作業(yè)在運(yùn)行,小的作業(yè)再提交也能立即獲得計(jì)算資源而不是等待前面的作業(yè)結(jié)束追他,大大減少了延遲時間坟募。這種模式特別適合于多用戶配置。 要啟用公平調(diào)度器邑狸,只需設(shè)置一下 SparkContext 中 spark.scheduler.mode 屬性為 FAIR 即可 :
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
如果啟用了公平調(diào)度策略懈糯,默認(rèn)情況下,各個資源池之間平分整個集群的資源单雾,但在資源池內(nèi)部赚哗,默認(rèn)情況下,作業(yè)是 FIFO 順序執(zhí)行的硅堆。舉例來說屿储,如果你為每個用戶創(chuàng)建了一個資源池,那么久意味著各個用戶之間共享整個集群的資源渐逃,但每個用戶自己提交的作業(yè)是按順序執(zhí)行的扩所,而不會出現(xiàn)后提交的作業(yè)搶占前面作業(yè)的資源。
默認(rèn)情況下朴乖,新提交的作業(yè)都會進(jìn)入到默認(rèn)(default)資源池中祖屏,不過作業(yè)對應(yīng)于哪個資源池,可以在提交作業(yè)的線程中用 SparkContext.setLocalProperty 設(shè)定 spark.scheduler.pool 屬性买羞。示例代碼如下 :
sc.setLocalProperty("spark.scheduler.pool", "pool1")
一旦設(shè)好了局部屬性袁勺,所有該線程所提交的作業(yè)(即 : 在該線程中調(diào)用action算子,如 : RDD.save/count/collect 等)都會使用這個資源池畜普。同樣期丰,如果需要清除資源池設(shè)置,只需在對應(yīng)線程中調(diào)用如下代碼 :
sc.setLocalProperty("spark.scheduler.pool", null)
當(dāng)然這一切都是可以修改的吃挑,比如:公平調(diào)度器還可以支持將作業(yè)分組放入資源池(pool)钝荡,然后給每個資源池配置不同的選項(xiàng)(如 : 權(quán)重)。這樣你就可以給一些比較重要的作業(yè)創(chuàng)建一個“高優(yōu)先級”資源池舶衬,或者你也可以把每個用戶的作業(yè)分到一組埠通,這樣一來就是各個用戶平均分享集群資源,而不是各個作業(yè)平分集群資源逛犹。下面就介紹一下具體的配置:
資源池屬性是一個 XML 文件端辱,可以基于 conf/fairscheduler.xml.template 修改梁剔,然后在 SparkConf 的 spark.scheduler.allocation.file 屬性指定文件路徑:
conf.set("spark.scheduler.allocation.file", "/path/to/file")
資源池 XML 配置文件格式如下,其中每個池子對應(yīng)一個 元素舞蔽,每個資源池可以有其獨(dú)立的配置 :
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
資源池的屬性需要通過配置文件來指定荣病。每個資源池都支持以下3個屬性 :
- schedulingMode: 控制資源池內(nèi)部的作業(yè)是如何調(diào)度的,可以是 FIFO 或 FAIR渗柿。
- weight: 控制資源池相對其他資源池可以分配到資源的比例个盆。默認(rèn)所有資源池的 weight 都是 1。如果你將某個資源池的 weight 設(shè)為 2朵栖,那么該資源池中的資源將是其他池子的2倍砾省。如果將 weight 設(shè)得很高,如 1000混槐,可以實(shí)現(xiàn)資源池之間的調(diào)度優(yōu)先級 编兄。 也就是說,weight=1000 的資源池總能立即啟動其對應(yīng)的作業(yè)声登。
- minShare: 除了整體 weight 之外狠鸳,每個資源池還能指定一個最小資源分配值(CPU 個數(shù)),管理員可能會需要這個設(shè)置悯嗓。公平調(diào)度器總是會嘗試優(yōu)先滿足所有活躍(active)資源池的最小資源分配值件舵,然后再根據(jù)各個池子的 weight 來分配剩下的資源。因此脯厨,minShare 屬性能夠確保每個資源池都能至少獲得一定量的集群資源铅祸。minShare 的默認(rèn)值是 0。
注意合武,沒有在配置文件中配置的資源池都會使用默認(rèn)配置(schedulingMode : FIFO临梗,weight : 1,minShare : 0)稼跳。