Spark 動(dòng)態(tài)資源分配下數(shù)據(jù)本地性導(dǎo)致的作業(yè)運(yùn)行緩慢

CoarseGrainedSchedulerBackend 以 spark.scheduler.revive.interval 默認(rèn)1s調(diào)用makeoffers(), 在分配到的executor上調(diào)度task撬碟;makeoffers() 中scheduler.resourceOffers(workerOffers)產(chǎn)生可執(zhí)行的task策略凭豪,包含task到executor的映射方面;
每一個(gè)stage對(duì)應(yīng)的tasks都由一個(gè)TaskSetManager管理氓栈,分配策略由以下生成:

for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels){
  do {
    launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers,   availableCpus, tasks)
  } while (launchedTask)}

其中myLocalityLevels是對(duì)應(yīng)taskSet中所包含的所有本地性偏好級(jí)別纺讲,包括PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY;
shuffledOffers是對(duì)executor offer的隨機(jī)化處理墩瞳,taskSet整個(gè)分配過(guò)程是兩層for循環(huán):
第一層for循環(huán)就是上面的maxLocality.taskSet.myLocalityLevels售滤;
第二層for循環(huán)在resourceOfferSingleTaskSet豹芯,對(duì)offer中的每一個(gè)executor進(jìn)行判斷悄雅,是否有一個(gè)task能夠滿足本地性偏好,和executor綁定一起铁蹈,形成一個(gè)執(zhí)行略宽闲;
其實(shí)還有第三層for循環(huán),就是dequeueTask握牧;
重點(diǎn)看TaskSetManager::resourceOffer:
一個(gè)關(guān)鍵的問(wèn)題就是如何通過(guò)延遲機(jī)制保證數(shù)據(jù)本地行容诬,其實(shí)現(xiàn)方法就在getAllowedLocalityLevel,spark關(guān)于延遲調(diào)度由三個(gè)參數(shù):
spark.locality.wait.process, spark.locality.wait.node, spark.locality.wait.rack, 默認(rèn)3s沿腰;
TaskSetManager中記錄了lastLaunchTime览徒,如果當(dāng)前時(shí)間減去lastLaunchTime大于上面的值,對(duì)應(yīng)getAllowedLocalityLevel就返回允許的本地偏好級(jí)別颂龙;
在遍歷過(guò)程中习蓬,第一層的for循環(huán)的locality和getAllowedLocalityLevel的返回值取最小值,然后執(zhí)行dequeueTask措嵌,如果dequeueTask如果返回Some(_)躲叼,則更新lastLaunchTime和currentLocalityIndex;

這就帶來(lái)一個(gè)問(wèn)題企巢,我們舉一個(gè)極端的例子:
有100個(gè)executor和100個(gè)task枫慷,每?jī)蓚€(gè)executor一個(gè)node,每四個(gè)executor一個(gè)rack,100個(gè)task的本地便好全都是到executor1的process或听,則整個(gè)調(diào)度過(guò)程如下:

  1. 第一次dequeue task1到executor1探孝,之后executor2~executor100的遍歷,由于本地性原因誉裆,全部調(diào)度失敗再姑,并且dequeueTask導(dǎo)致currentLocalityIndex=0;
  2. 3s過(guò)后找御,currentLocalityIndex加1元镀,getAllowedLocality返回NODE_LOCAL,導(dǎo)致task2被調(diào)度到executor2霎桅,但是executor3~executor100均調(diào)度失斊芤伞;
  3. 2s過(guò)后假如executor1執(zhí)行task1結(jié)束滔驶,executor1參與調(diào)度遇革,task3成功調(diào)度到executor1;
  4. dequeueTask返回Some(_)揭糕,currentLocalityIndex=0萝快,lastLaunch=curTime,其他executor調(diào)度失斨恰揪漩;
  5. 1s過(guò)后,類似步驟2的情況再次發(fā)生吏口;
  6. 假如上米娜四步循環(huán)發(fā)生奄容,會(huì)導(dǎo)致長(zhǎng)時(shí)間的executor處于idle狀態(tài),默認(rèn)60s产徊,idle的executor被系統(tǒng)釋放掉昂勒,
  7. stage被拖死;

應(yīng)對(duì)方法:

  1. 調(diào)整currentLocalityIndex和lastLaunchTime的更新策略舟铜,能夠提高task的調(diào)度效率戈盈;
  2. 減少spark.locaity.wait;

以上兩點(diǎn)均以犧牲數(shù)據(jù)本地性為代價(jià)谆刨。

補(bǔ)充寫(xiě)一點(diǎn)Spark關(guān)于本地偏好的機(jī)制塘娶,Spark通過(guò)RDD的依賴關(guān)系拓?fù)鋱D來(lái)描述整個(gè)一個(gè)Job的計(jì)算過(guò)程,整個(gè)拓?fù)鋱D通過(guò)shuffle dependency來(lái)劃分出各個(gè)stage痴荐,我們說(shuō)一個(gè)stage就是從一個(gè)shuffle-read開(kāi)始到一個(gè)shuffle-write血柳,task運(yùn)行的executor距離shuffle-read(或者讀取dfs, cache)數(shù)據(jù)物理距離越近,本地性就越強(qiáng)生兆,那hdfs距離难捌,RDD的HadoopPartition內(nèi)部就描述著split的位置信息膝宁,而這樣的信息會(huì)在DAGScheduler.submitMissingTasks時(shí)通過(guò)listenerBus以SparkListenerStageSubmitted的形式通知給ExecutorAllocationManager,ExecutorAllocation據(jù)此向ExecutorAllocationClient指示最終通過(guò)YarnAllocator如何申請(qǐng)executor根吁,申請(qǐng)獲得的executor以offer的形式最終分配給TaskScheduler员淫,offer和task最終在TaskSetManager內(nèi)部完成匹配;

TaskSetManager內(nèi)有幾個(gè)存儲(chǔ)結(jié)構(gòu)击敌,

pendingTasksForExecutor, 
pendingTasksForHost, 
pendingTasksForRack, 
pendingTasksWithNoPrefs, 
allPendingTasks介返;

由低到高,假如一個(gè)task存在于pendingTasksForExecutor沃斤,它一定存在于其他四種圣蝎,相應(yīng)的key就是executor所屬的Host,Rack等衡瓶;上面四個(gè)集合在tasks初始化的時(shí)候就根據(jù)task的location preference確定了徘公,TaskSetManager只要根據(jù)offer內(nèi)的executor,按照延遲分配的策略哮针,匹配出對(duì)應(yīng)的task即可关面,當(dāng)然,也有可能由于本地性的原因十厢,無(wú)法匹配出任何task等太;

下面終點(diǎn)講一下延遲匹配的函數(shù):

private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
......
  while (currentLocalityIndex < myLocalityLevels.length - 1) {    
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {      
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)      
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)      
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty      
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)     
    }    
    if (!moreTasks) {      
      // This is a performance optimization: if there are no more tasks that can      
      // be scheduled at a particular locality level, there is no point in waiting     
      // for the locality wait timeout (SPARK-4939).      
      lastLaunchTime = curTime      
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")      currentLocalityIndex += 1    
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {      
      // Jump to the next locality level, and reset lastLaunchTime so that the next locality      
      // wait timer doesn't immediately expire      
      lastLaunchTime += localityWaits(currentLocalityIndex)      
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +        s"${localityWaits(currentLocalityIndex)}ms")      
      currentLocalityIndex += 1    } 
    else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}

延遲策略的關(guān)鍵就是currentLocalityIndex的變化,終點(diǎn)是上面的else if蛮放,這里面一個(gè)地方有一些微妙缩抡,就是四個(gè)級(jí)別的先后順序是process, node, nopref, rack, any,有人可能會(huì)猜測(cè)nopref會(huì)被rack先選中筛武,其實(shí)不可能缝其,因?yàn)閚opref對(duì)應(yīng)的wait時(shí)間默認(rèn)是0s挎塌,所以while循環(huán)內(nèi)徘六,在遍歷到node之后,會(huì)自動(dòng)遍歷通過(guò)nopref榴都,并進(jìn)入下一次遍歷到rack待锈,所以如果我們本地性偏好幾種在rack,就可以把所有的wait值設(shè)成0嘴高,然后rack的wait值設(shè)成1~3s竿音,這樣能夠緩解因?yàn)楸镜仄玫絹?lái)的調(diào)度效率低下,極端情況下還是不能避免上面舉出的例子拴驮,但是生產(chǎn)情況下應(yīng)該會(huì)好很多春瞬。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市套啤,隨后出現(xiàn)的幾起案子宽气,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件萄涯,死亡現(xiàn)場(chǎng)離奇詭異绪氛,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)涝影,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門枣察,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人燃逻,你說(shuō)我怎么就攤上這事序目。” “怎么了伯襟?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵宛琅,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我逗旁,道長(zhǎng)嘿辟,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任片效,我火速辦了婚禮红伦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘淀衣。我一直安慰自己昙读,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布膨桥。 她就那樣靜靜地躺著蛮浑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪只嚣。 梳的紋絲不亂的頭發(fā)上沮稚,一...
    開(kāi)封第一講書(shū)人閱讀 51,763評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音册舞,去河邊找鬼蕴掏。 笑死,一個(gè)胖子當(dāng)著我的面吹牛调鲸,可吹牛的內(nèi)容都是我干的盛杰。 我是一名探鬼主播,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼藐石,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼即供!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起于微,我...
    開(kāi)封第一講書(shū)人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤逗嫡,失蹤者是張志新(化名)和其女友劉穎办素,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體祸穷,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡性穿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了雷滚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片需曾。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖祈远,靈堂內(nèi)的尸體忽然破棺而出呆万,到底是詐尸還是另有隱情,我是刑警寧澤车份,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布谋减,位于F島的核電站,受9級(jí)特大地震影響扫沼,放射性物質(zhì)發(fā)生泄漏出爹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一缎除、第九天 我趴在偏房一處隱蔽的房頂上張望严就。 院中可真熱鬧,春花似錦器罐、人聲如沸梢为。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)铸董。三九已至,卻和暖如春肴沫,著一層夾襖步出監(jiān)牢的瞬間粟害,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工樊零, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留我磁,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓驻襟,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親芋哭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子沉衣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容