Spark driver啟動(dòng)Task的流程

Spark內(nèi)部有兩大類操作,Transformation和Action叽粹;
Transformation又分窄依賴操作和寬依賴操作冀值,區(qū)分這兩種操作的很簡(jiǎn)單,RDD之間轉(zhuǎn)化過(guò)程中沒(méi)有shuffle的就是窄依賴杯道,有shuffle的就是寬依賴:

  1. RDD中數(shù)據(jù)是分partition的,每個(gè)partition分布在特定節(jié)點(diǎn)责蝠,shuffle就是指在RDD在轉(zhuǎn)化過(guò)程中党巾,一個(gè)partition中的數(shù)據(jù)需要被split成多個(gè)分片,傳入到下游RDD中的多個(gè)partition中去霜医,比如reduceByKey這類的操作齿拂,實(shí)際生產(chǎn)中,下游partition中的數(shù)據(jù)往往依賴于上游多個(gè)partition的數(shù)據(jù)肴敛,這樣就是會(huì)產(chǎn)生一個(gè)問(wèn)題署海,如果下游某個(gè)partition中的數(shù)據(jù)缺失吗购,需要重新計(jì)算上游多個(gè)partition的數(shù)據(jù),而重新計(jì)算的這些上游partition中又會(huì)同時(shí)含有下游缺失的數(shù)據(jù)partition和正常的partition砸狞,這就會(huì)造成計(jì)算的冗余捻勉;
  2. 與之相對(duì)的是窄依賴計(jì)算,上游一個(gè)或多個(gè)partition只對(duì)應(yīng)下游一個(gè)partition刀森,所以下游某個(gè)節(jié)點(diǎn)故障后踱启,某個(gè)partition缺失數(shù)據(jù),上游需要計(jì)算的所有partition不含有冗余計(jì)算研底,比如map埠偿,filter,union等等榜晦;

Spark數(shù)據(jù)集是RDD冠蒋,如果數(shù)據(jù)類型是Tuple2,還提供PariRDDFunctions的一些方法(是通過(guò)object RDD addToPariFuncitons隱式包含進(jìn)來(lái)的):

  1. 如果是窄依賴乾胶,比如map操作抖剿,生成
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF));
  2. 如果是寬依賴胚吁,比如reduceByKey操作牙躺,生成
new ShuffledRDD[K, V, C](self, partitioner) 
.setSerializer(serializer) 
.setAggregator(aggregator) 
.setMapSideCombine(mapSideCombine);

其中aggregator定義了數(shù)據(jù)merge的規(guī)則,這個(gè)merge包括在map端(類似hadoop中的combiner腕扶,也就是partition內(nèi)部數(shù)據(jù)merge的規(guī)則)和reduce端(partition之間數(shù)據(jù)merge的規(guī)則)孽拷,這個(gè)merge可以是多個(gè)value組成更大的集合,例如groupByKey半抱,也可以是多個(gè)value合并計(jì)算出新的value脓恕,比如word count作業(yè)中的reduceByKey,根據(jù)業(yè)務(wù)邏輯而定窿侈; mapSideCombine是一個(gè)boolean炼幔,表示是否需要在map端進(jìn)行merge操作,比如reduceByKey是true史简,groupByKey是false乃秀;
首先來(lái)看一下MapPartitionRDD:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](    
  var prev: RDD[T],
  f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
  preservesPartitioning: Boolean = false)  
 extends RDD[U](prev) {
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None         
  override def getPartitions: Array[Partition] = firstParent[T].partitions    
  override def compute(split: Partition, context: TaskContext): Iterator[U] =        
    f(context, split.index, firstParent[T].iterator(split, context)) 
  override def clearDependencies() {
    super.clearDependencies()    
    prev = null  
  }
}

其中partitioner(默認(rèn)HashPartitioner)定義如果存在shuffle,不同的key被shuffle到下游的那一個(gè)分片(partition)中去圆兵,對(duì)于MapPartitionsRDD跺讯,不存在這樣的情況;每個(gè)RDD都會(huì)維護(hù)自己的dependencies殉农,是一個(gè)Seq刀脏,這里的dependency可能是OneToOneDependency(一對(duì)一,例如map)超凳,可能是RangeDependency(例如union愈污,兩個(gè)RDD合并成一個(gè)耀态,但是partition不會(huì)發(fā)生merge,上游RDD多個(gè)partition會(huì)變成下游RDD的partition range)暂雹,也可能是ShuffleDependency首装;
firstParent這里就是dependencies中的第一個(gè),map操作擎析,上游只有一個(gè)RDD簿盅,也就是只有一個(gè)partition,compute操作很簡(jiǎn)單揍魂,通過(guò)split指定上游partition,對(duì)其執(zhí)行f函數(shù)棚瘟,返回的也是一個(gè)Iterator现斋,其中firstParent[T].iterator,如果沒(méi)有cache或checkpoint則也是一個(gè)compute實(shí)現(xiàn)偎蘸;
上面的結(jié)果就是RDD內(nèi)部層層套R(shí)DD庄蹋,最后計(jì)算(compute)的時(shí)候,由里到外迷雪,不斷的遍歷iterator限书,完成計(jì)算,這里直觀感覺(jué)上是遍歷多次章咧,但基于scala內(nèi)部的實(shí)現(xiàn)倦西,減少不必要的遍歷;
下面再看ShuffledRDD:
compute內(nèi)部通過(guò)shuffleManager獲取上游shuffle-write產(chǎn)生的數(shù)據(jù)赁严,根據(jù)split扰柠,返回iterator,并不包涵其他的函數(shù)計(jì)算:

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { 
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]       
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context).read().asInstanceOf[Iterator[(K, C)]]  
  }

下面來(lái)看一下調(diào)用rdd.collect(Spark的Action操作)之后stage的劃分疼约,最后直接的處理函數(shù)是handleJobSubmitted卤档,
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite);
Stage這里面涉及到兩種ResultStage和ShuffleMapStage程剥,每一個(gè)Stage都包含若干parent stages劝枣,對(duì)于這種一對(duì)一的RDD DAG 作業(yè),parent stages這個(gè)集合中织鲸,保存的都只是上游1個(gè)stage舔腾,是一個(gè)單鏈條;劃分Stage的方法很簡(jiǎn)單昙沦,就是通過(guò)ShuffleDependency來(lái)判斷琢唾;
stage都確定好之后,將stage轉(zhuǎn)化為task進(jìn)行計(jì)算盾饮,計(jì)算的條件就是一個(gè)stage的所有parent stges都已經(jīng)計(jì)算完成采桃,stage到task的邏輯是:submitMissingTasks;
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
首先得到需要計(jì)算哪些partition懒熙,然后根據(jù)ShuffleMapStage和ResultStage分別生成ShuffleMapTask和ResultTask,然后提交task:
taskScheduler.submitTasks(new TaskSet(tasks.toArray, [stage.id](http://stage.id), stage.latestInfo.attemptId, jobId, properties))
CoarseGrainedExecutorBackend收到LaunchTask之后:

case LaunchTask(data) =>  if (executor == null) {
  logError("Received LaunchTask command but executor was null")
  System.exit(1)  
} else {    
  val taskDesc = ser.deserialize[TaskDescription](data.value)    
  logInfo("Got assigned task " + taskDesc.taskId)    
  executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
}

通過(guò)executor執(zhí)行task普办。
補(bǔ)充說(shuō)明一點(diǎn)工扎,整個(gè)RDD的DAG圖根據(jù)ShuffleDependency劃分出若干stage之后,或者是一個(gè)ShuffleMapStage衔蹲,或者是一個(gè)ResultStage肢娘,對(duì)于ResultStage很簡(jiǎn)單,從上游讀取數(shù)據(jù)舆驶,聚合之后就可以返回了橱健,對(duì)于ShuffleMapStage,他的目標(biāo)就是完成Shuffle-Write操作沙廉,這個(gè)實(shí)在ShuffleMapTask中的runTask完成的拘荡,而需要讀取的數(shù)據(jù)由上游的RDD的iterator提供,上游如果是普通的RDD撬陵,比如MapPartitionsRDD珊皿,直接讀取,內(nèi)部的compute函數(shù)完成兩個(gè)事情巨税,第一提供iterator蟋定,第二完成上游RDD的計(jì)算邏輯,即用函數(shù)對(duì)iterator操作草添,而這個(gè)提供數(shù)據(jù)iterator又回繼續(xù)遞歸調(diào)用更上游的RDD的compute驶兜,但是如果是ShuffledRDD,則不會(huì)遞歸到更上游果元,而是去reader促王,讀取上游數(shù)據(jù),返回iterator而晒,僅此而已蝇狼,reader的實(shí)現(xiàn)中是到driver拿到mapstatuses,里面包括block的executor位置信息倡怎,然后連接executor進(jìn)行讀取迅耘。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市监署,隨后出現(xiàn)的幾起案子颤专,更是在濱河造成了極大的恐慌,老刑警劉巖钠乏,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件栖秕,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡晓避,警方通過(guò)查閱死者的電腦和手機(jī)簇捍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門只壳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人暑塑,你說(shuō)我怎么就攤上這事吼句。” “怎么了事格?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵惕艳,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我驹愚,道長(zhǎng)远搪,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任么鹤,我火速辦了婚禮终娃,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蒸甜。我一直安慰自己,他們只是感情好余佛,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布柠新。 她就那樣靜靜地躺著,像睡著了一般辉巡。 火紅的嫁衣襯著肌膚如雪恨憎。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,950評(píng)論 1 291
  • 那天郊楣,我揣著相機(jī)與錄音憔恳,去河邊找鬼。 笑死净蚤,一個(gè)胖子當(dāng)著我的面吹牛钥组,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播今瀑,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼程梦,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了橘荠?” 一聲冷哼從身側(cè)響起屿附,我...
    開(kāi)封第一講書(shū)人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎哥童,沒(méi)想到半個(gè)月后挺份,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贮懈,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年匀泊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了优训。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡探赫,死狀恐怖型宙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情伦吠,我是刑警寧澤妆兑,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站毛仪,受9級(jí)特大地震影響搁嗓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜箱靴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一腺逛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧衡怀,春花似錦棍矛、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至怖现,卻和暖如春茁帽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背屈嗤。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工潘拨, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人饶号。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓铁追,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親讨韭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子脂信,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容

  • 本文基于spark源碼2.11 1. 前言 shuffle是spark job中一個(gè)重要的階段,發(fā)生在map和re...
    aaron1993閱讀 11,694評(píng)論 1 12
  • 本文基于spark2.11 1. 前言 1.1 基本概念 RDD關(guān)于RDD已經(jīng)有很多文章了透硝,可以參考一下理解Spa...
    aaron1993閱讀 1,802評(píng)論 0 3
  • 1 數(shù)據(jù)傾斜調(diào)優(yōu) 1.1 調(diào)優(yōu)概述 有的時(shí)候狰闪,我們可能會(huì)遇到大數(shù)據(jù)計(jì)算中一個(gè)最棘手的問(wèn)題——數(shù)據(jù)傾斜,此時(shí)Spar...
    wisfern閱讀 2,934評(píng)論 0 23
  • Job 物理執(zhí)行圖 在 Overview 里我們初步介紹了 DAG 型的物理執(zhí)行圖濒生,里面包含 stages 和 t...
    Albert陳凱閱讀 1,554評(píng)論 0 3
  • Shuffle 過(guò)程 上一章里討論了 job 的物理執(zhí)行圖埋泵,也討論了流入 RDD 中的 records 是怎么被 ...
    Albert陳凱閱讀 4,105評(píng)論 1 10