Spark任務(wù)輸出追蹤器MapOutputTracker詳解

一.什么是shuffle

MapOutputTrancker用于跟蹤map任務(wù)的輸出狀態(tài)凡资,此狀態(tài)便于reduce任務(wù)定位到map輸出結(jié)果所在的節(jié)點地址秘狞,進而獲取中間輸出結(jié)果兼砖,每個map任務(wù)或者reduce任務(wù)谈秫,都會有其唯一的標(biāo)識惕橙,分別為mapid和reduceid雇庙,每個reduce任務(wù)的輸入可能是多個map任務(wù)的輸出谓形,因為reduce可能會到多個map任務(wù)所在的節(jié)點上去拉取Block,這一過程叫做shuffle疆前,每次shuffle的過程都有其唯一的標(biāo)識shuffleid.

二.MapOutputTrancker的創(chuàng)建方式

在Driver端和Executor端啟動的同時寒跳,都會創(chuàng)建MapOutputTracker的實例,不同的是Driver端創(chuàng)建的是MapOutputTrackerMaster竹椒,Executor端創(chuàng)建的是MapOutputTranckerWoker童太。

  • Driver端啟動時會創(chuàng)建MapOutputTrackerMaster,之后創(chuàng)建MapOutputTrackerMasterEndpoint,并且注冊到Dispatcher中,端點名稱為MapOutputTrancker书释。

  • Executor端創(chuàng)建MapOutputTranckerWorker翘贮,不僅會和Driver端一樣,注冊端點信息等爆惧,而且會從遠(yuǎn)端Driver獲取之前在NettyRpcEnv的Dispatcher中注冊好的MapOutputTrackerMasterEndpoint的引用狸页。

三.MapOutputTrancker的屬性
  • trackerEndpoint:持有Driver端上MapOutputTrackerMasterEndpoint的引用Ref
  • mapStatuses:用于維護各個map任務(wù)輸出的狀態(tài),類型為Map[Int扯再,Array[MapStatus]],key為shuffleid芍耘,Array存儲著各個map任務(wù)對于的狀態(tài)信息mapStatus。由于各個MapOutputTranckerWoker會不斷向MapOutputTranckerMaster匯報本節(jié)點的Executor運行的map任務(wù)狀態(tài)信息熄阻,因此MapOutputTranckerMaster中的mapStatuses中維護的信息是最新最全的斋竞。而MapOutputTrackerWorker的mapStatuses對于本節(jié)點上的map任務(wù)狀態(tài)是及時更新的,對于其他節(jié)點的map任務(wù)狀態(tài)則是一個緩沖秃殉,如果后續(xù)在獲取mapStatus時坝初,無法命中緩存,則向Drievr端的MapOutputTranckerMaster獲取最新的任務(wù)狀態(tài)信息钾军。
  • fetching:shuffle拉取的集合鳄袍,用來記錄當(dāng)前Executor正在從哪些Map輸出的位置拉取數(shù)據(jù)。
四.獲取mapStatus的流程

1.首先 從當(dāng)前Executor中的MapOutputTracker的mapStatuses緩存中巧颈,獲取MapStatus數(shù)組畦木,如果沒有則向遠(yuǎn)端Driver中的MapOutputTranckerMaster去獲取任務(wù)狀態(tài)信息袖扛。

2.然后 判斷fetching中是否已經(jīng)存在要獲取的shuffleid砸泛,如果有,這就說明有其他線程對此shuffleid的數(shù)據(jù)進行遠(yuǎn)程拉取了蛆封,這樣就等待其他線程拉取完畢唇礁,直到fetching中不存在要取的shuffleid,這時就從mapStatuses中再次獲取mapStatus集合惨篱。

3.如果還獲取不到盏筐,則說明其他線程拉取失敗了,則需要自己去拉取數(shù)據(jù)砸讳,首先將shuufleid加入fetching集合中琢融,表示當(dāng)前shuffleid的任務(wù)狀態(tài)信息,已經(jīng)有線程在拉取了簿寂,之后會調(diào)用ackTracker方法漾抬,向MapOutputTrackerMasterEndpoint發(fā)送消息去獲取map任務(wù)的狀態(tài)信息。

4.之后 MapOutputTrackerMaster接受到該消息之后常遂,將請求包裝成MapOutputMessage消息纳令,放入到消息隊列,異步的去處理該消息。

  • 首先會getSerializedMapOutputStatuses方法平绩,查詢本地記錄shuffle對應(yīng)的Map輸出狀態(tài)圈匆。

  • 在獲取的過程中需要為每個shuffleId分配一個分段鎖,因為這里支持并發(fā)調(diào)用捏雌,同一時間有多個線程需要獲取同一個shuffleId對應(yīng)的輸出跃赚,所以需要保證Map元數(shù)據(jù)信息只序列化或者廣播一次。所以在獲取鎖之前和得到鎖之后都需要再次查詢一下緩存性湿,可能有其他線程已經(jīng)緩存了MapStatus来累。

  • 如果緩存還是為空,則需要將MapStatus序列化或者包裝為Broadcast窘奏。對于序列化還是廣播嘹锁,通過比較序列化后的結(jié)果大小是否超出spark.shuffle.mapOutput.minSizeForBroadcast,默認(rèn)值為512K着裹。

  • 序列化完成后领猾,將此結(jié)果進行緩存,并向MapOutputTrackerWorker返回結(jié)果骇扇。

  • MapOutputTrackerWorker的askTracker接收到返回的結(jié)果后結(jié)束阻塞摔竿,將數(shù)據(jù)反序列化成mapStatus集合緩存下來,然后將shuffleid從fetching中移除少孝,喚醒哪些在fetching鎖上等待的線程继低,使這些線程可以獲取自己需要的MapStatus數(shù)組。

5.最后 返回任務(wù)狀態(tài)信息mapStatus數(shù)組稍走。

6.注意 MapOutputTrancker中會有線程池袁翁,區(qū)別于Dispatcher中的線程池,同時還有MessageLoop婿脸,和Dispatcher中非常相似粱胜。

五.ShuffleReader如何使用mapStatus

1.在ShuffleRDD的compute方法中,會獲取BlockStoreShuffleReader狐树,然后在BlockStoreShuffleReader中焙压,會調(diào)用mapOutputTracker.getMapSizesByExecutorId方法獲取一組二元組序列Seq[(BlockManagerId, Seq[(BlockId, Long)])],第一項代表了BlockManagerId抑钟,第二項描述了存儲于該BlockManager上的一組shuffle blocks涯曲。

2.getMapSizesByExecutorId會調(diào)用getStatuses方法獲取MapStatus集合,然后最后返回MapStatus集合在塔。

3.最后根據(jù)執(zhí)行的分區(qū)范圍[startPartition, endPartition]將返回的結(jié)果Array[MapStatus]轉(zhuǎn)換成Seq[(BlockManagerId, Seq[(BlockId, Long)])]幻件。

4.利用這個Seq[(BlockManagerId, Seq[(BlockId, Long)])],去指定的BlockManager中去拉取對應(yīng)的Block塊的數(shù)據(jù)用來迭代計算心俗。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末傲武,一起剝皮案震驚了整個濱河市蓉驹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌揪利,老刑警劉巖态兴,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異疟位,居然都是意外死亡瞻润,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進店門甜刻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绍撞,“玉大人,你說我怎么就攤上這事得院∩迪常” “怎么了?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵祥绞,是天一觀的道長非洲。 經(jīng)常有香客問我,道長蜕径,這世上最難降的妖魔是什么两踏? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮兜喻,結(jié)果婚禮上梦染,老公的妹妹穿的比我還像新娘。我一直安慰自己朴皆,他們只是感情好帕识,可當(dāng)我...
    茶點故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著车荔,像睡著了一般渡冻。 火紅的嫁衣襯著肌膚如雪戚扳。 梳的紋絲不亂的頭發(fā)上忧便,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天,我揣著相機與錄音帽借,去河邊找鬼珠增。 笑死,一個胖子當(dāng)著我的面吹牛砍艾,可吹牛的內(nèi)容都是我干的蒂教。 我是一名探鬼主播,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼脆荷,長吁一口氣:“原來是場噩夢啊……” “哼凝垛!你這毒婦竟也來了懊悯?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤梦皮,失蹤者是張志新(化名)和其女友劉穎炭分,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體剑肯,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡捧毛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了让网。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片呀忧。...
    茶點故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖溃睹,靈堂內(nèi)的尸體忽然破棺而出而账,到底是詐尸還是另有隱情,我是刑警寧澤因篇,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布福扬,位于F島的核電站,受9級特大地震影響惜犀,放射性物質(zhì)發(fā)生泄漏铛碑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一虽界、第九天 我趴在偏房一處隱蔽的房頂上張望汽烦。 院中可真熱鬧,春花似錦莉御、人聲如沸撇吞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽牍颈。三九已至,卻和暖如春琅关,著一層夾襖步出監(jiān)牢的瞬間煮岁,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工涣易, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留画机,地道東北人。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓新症,卻偏偏與公主長得像步氏,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子徒爹,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,647評論 2 354