一.什么是shuffle
MapOutputTrancker用于跟蹤map任務(wù)的輸出狀態(tài)凡资,此狀態(tài)便于reduce任務(wù)定位到map輸出結(jié)果所在的節(jié)點地址秘狞,進而獲取中間輸出結(jié)果兼砖,每個map任務(wù)或者reduce任務(wù)谈秫,都會有其唯一的標(biāo)識惕橙,分別為mapid和reduceid雇庙,每個reduce任務(wù)的輸入可能是多個map任務(wù)的輸出谓形,因為reduce可能會到多個map任務(wù)所在的節(jié)點上去拉取Block,這一過程叫做shuffle疆前,每次shuffle的過程都有其唯一的標(biāo)識shuffleid.
二.MapOutputTrancker的創(chuàng)建方式
在Driver端和Executor端啟動的同時寒跳,都會創(chuàng)建MapOutputTracker的實例,不同的是Driver端創(chuàng)建的是MapOutputTrackerMaster竹椒,Executor端創(chuàng)建的是MapOutputTranckerWoker童太。
Driver端啟動時會創(chuàng)建MapOutputTrackerMaster,之后創(chuàng)建MapOutputTrackerMasterEndpoint,并且注冊到Dispatcher中,端點名稱為MapOutputTrancker书释。
Executor端創(chuàng)建MapOutputTranckerWorker翘贮,不僅會和Driver端一樣,注冊端點信息等爆惧,而且會從遠(yuǎn)端Driver獲取之前在NettyRpcEnv的Dispatcher中注冊好的MapOutputTrackerMasterEndpoint的引用狸页。
三.MapOutputTrancker的屬性
- trackerEndpoint:持有Driver端上MapOutputTrackerMasterEndpoint的引用Ref
- mapStatuses:用于維護各個map任務(wù)輸出的狀態(tài),類型為Map[Int扯再,Array[MapStatus]],key為shuffleid芍耘,Array存儲著各個map任務(wù)對于的狀態(tài)信息mapStatus。由于各個MapOutputTranckerWoker會不斷向MapOutputTranckerMaster匯報本節(jié)點的Executor運行的map任務(wù)狀態(tài)信息熄阻,因此MapOutputTranckerMaster中的mapStatuses中維護的信息是最新最全的斋竞。而MapOutputTrackerWorker的mapStatuses對于本節(jié)點上的map任務(wù)狀態(tài)是及時更新的,對于其他節(jié)點的map任務(wù)狀態(tài)則是一個緩沖秃殉,如果后續(xù)在獲取mapStatus時坝初,無法命中緩存,則向Drievr端的MapOutputTranckerMaster獲取最新的任務(wù)狀態(tài)信息钾军。
- fetching:shuffle拉取的集合鳄袍,用來記錄當(dāng)前Executor正在從哪些Map輸出的位置拉取數(shù)據(jù)。
四.獲取mapStatus的流程
1.首先 從當(dāng)前Executor中的MapOutputTracker的mapStatuses緩存中巧颈,獲取MapStatus數(shù)組畦木,如果沒有則向遠(yuǎn)端Driver中的MapOutputTranckerMaster去獲取任務(wù)狀態(tài)信息袖扛。
2.然后 判斷fetching中是否已經(jīng)存在要獲取的shuffleid砸泛,如果有,這就說明有其他線程對此shuffleid的數(shù)據(jù)進行遠(yuǎn)程拉取了蛆封,這樣就等待其他線程拉取完畢唇礁,直到fetching中不存在要取的shuffleid,這時就從mapStatuses中再次獲取mapStatus集合惨篱。
3.如果還獲取不到盏筐,則說明其他線程拉取失敗了,則需要自己去拉取數(shù)據(jù)砸讳,首先將shuufleid加入fetching集合中琢融,表示當(dāng)前shuffleid的任務(wù)狀態(tài)信息,已經(jīng)有線程在拉取了簿寂,之后會調(diào)用ackTracker方法漾抬,向MapOutputTrackerMasterEndpoint發(fā)送消息去獲取map任務(wù)的狀態(tài)信息。
4.之后 MapOutputTrackerMaster接受到該消息之后常遂,將請求包裝成MapOutputMessage消息纳令,放入到消息隊列,異步的去處理該消息。
首先會getSerializedMapOutputStatuses方法平绩,查詢本地記錄shuffle對應(yīng)的Map輸出狀態(tài)圈匆。
在獲取的過程中需要為每個shuffleId分配一個分段鎖,因為這里支持并發(fā)調(diào)用捏雌,同一時間有多個線程需要獲取同一個shuffleId對應(yīng)的輸出跃赚,所以需要保證Map元數(shù)據(jù)信息只序列化或者廣播一次。所以在獲取鎖之前和得到鎖之后都需要再次查詢一下緩存性湿,可能有其他線程已經(jīng)緩存了MapStatus来累。
如果緩存還是為空,則需要將MapStatus序列化或者包裝為Broadcast窘奏。對于序列化還是廣播嘹锁,通過比較序列化后的結(jié)果大小是否超出spark.shuffle.mapOutput.minSizeForBroadcast,默認(rèn)值為512K着裹。
序列化完成后领猾,將此結(jié)果進行緩存,并向MapOutputTrackerWorker返回結(jié)果骇扇。
MapOutputTrackerWorker的askTracker接收到返回的結(jié)果后結(jié)束阻塞摔竿,將數(shù)據(jù)反序列化成mapStatus集合緩存下來,然后將shuffleid從fetching中移除少孝,喚醒哪些在fetching鎖上等待的線程继低,使這些線程可以獲取自己需要的MapStatus數(shù)組。
5.最后 返回任務(wù)狀態(tài)信息mapStatus數(shù)組稍走。
6.注意 MapOutputTrancker中會有線程池袁翁,區(qū)別于Dispatcher中的線程池,同時還有MessageLoop婿脸,和Dispatcher中非常相似粱胜。
五.ShuffleReader如何使用mapStatus
1.在ShuffleRDD的compute方法中,會獲取BlockStoreShuffleReader狐树,然后在BlockStoreShuffleReader中焙压,會調(diào)用mapOutputTracker.getMapSizesByExecutorId方法獲取一組二元組序列Seq[(BlockManagerId, Seq[(BlockId, Long)])],第一項代表了BlockManagerId抑钟,第二項描述了存儲于該BlockManager上的一組shuffle blocks涯曲。
2.getMapSizesByExecutorId會調(diào)用getStatuses方法獲取MapStatus集合,然后最后返回MapStatus集合在塔。
3.最后根據(jù)執(zhí)行的分區(qū)范圍[startPartition, endPartition]將返回的結(jié)果Array[MapStatus]轉(zhuǎn)換成Seq[(BlockManagerId, Seq[(BlockId, Long)])]幻件。
4.利用這個Seq[(BlockManagerId, Seq[(BlockId, Long)])],去指定的BlockManager中去拉取對應(yīng)的Block塊的數(shù)據(jù)用來迭代計算心俗。