架構(gòu)
前三章從 job 的角度介紹了用戶寫的 program 如何一步步地被分解和執(zhí)行整陌。這一章主要從架構(gòu)的角度來討論 master探入,worker蚯嫌,driver 和 executor 之間怎么協(xié)調(diào)來完成整個 job 的運(yùn)行贬蛙。
實(shí)在不想在文檔中貼過多的代碼长搀,這章貼這么多,只是為了方面自己回頭 debug 的時(shí)候可以迅速定位匿辩,不想看代碼的話,直接看圖和描述即可榛丢。
部署圖
重新貼一下 Overview 中給出的部署圖:
接下來分階段討論并細(xì)化這個圖铲球。
Job 提交
下圖展示了driver program(假設(shè)在 master node 上運(yùn)行)如何生成 job,并提交到 worker node 上執(zhí)行晰赞。
Driver 端的邏輯如果用代碼表示:
finalRDD.action()
=> sc.runJob()
// generate job, stages and tasks
=> dagScheduler.runJob()
=> dagScheduler.submitJob()
=> dagSchedulerEventProcessActor ! JobSubmitted
=> dagSchedulerEventProcessActor.JobSubmitted()
=> dagScheduler.handleJobSubmitted()
=> finalStage = newStage()
=> mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size)
=> dagScheduler.submitStage()
=> missingStages = dagScheduler.getMissingParentStages()
=> dagScheduler.subMissingTasks(readyStage)
// add tasks to the taskScheduler
=> taskScheduler.submitTasks(new TaskSet(tasks))
=> fifoSchedulableBuilder.addTaskSetManager(taskSet)
// send tasks
=> sparkDeploySchedulerBackend.reviveOffers()
=> driverActor ! ReviveOffers
=> sparkDeploySchedulerBackend.makeOffers()
=> sparkDeploySchedulerBackend.launchTasks()
=> foreach task
CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask)
代碼的文字描述:
當(dāng)用戶的 program 調(diào)用 val sc = new SparkContext(sparkConf)
時(shí)稼病,這個語句會幫助 program 啟動諸多有關(guān) driver 通信、job 執(zhí)行的對象掖鱼、線程然走、actor等,該語句確立了 program 的 driver 地位戏挡。
生成 Job 邏輯執(zhí)行圖
Driver program 中的 transformation() 建立 computing chain(一系列的 RDD)芍瑞,每個 RDD 的 compute() 定義數(shù)據(jù)來了怎么計(jì)算得到該 RDD 中 partition 的結(jié)果,getDependencies() 定義 RDD 之間 partition 的數(shù)據(jù)依賴褐墅。
生成 Job 物理執(zhí)行圖
每個 action() 觸發(fā)生成一個 job拆檬,在 dagScheduler.runJob() 的時(shí)候進(jìn)行 stage 劃分,在 submitStage() 的時(shí)候生成該 stage 包含的具體的 ShuffleMapTasks 或者 ResultTasks妥凳,然后將 tasks 打包成 TaskSet 交給 taskScheduler竟贯,如果 taskSet 可以運(yùn)行就將 tasks 交給 sparkDeploySchedulerBackend 去分配執(zhí)行。
分配 Task
sparkDeploySchedulerBackend 接收到 taskSet 后逝钥,會通過自帶的 DriverActor 將 serialized tasks 發(fā)送到調(diào)度器指定的 worker node 上的 CoarseGrainedExecutorBackend Actor上屑那。
Job 接收
Worker 端接收到 tasks 后,執(zhí)行如下操作
coarseGrainedExecutorBackend ! LaunchTask(serializedTask)
=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))
executor 將 task 包裝成 taskRunner艘款,并從線程池中抽取出一個空閑線程運(yùn)行 task持际。一個 CoarseGrainedExecutorBackend 進(jìn)程有且僅有一個 executor 對象。
Task 運(yùn)行
下圖展示了 task 被分配到 worker node 上后的執(zhí)行流程及 driver 如何處理 task 的 result磷箕。
Executor 收到 serialized 的 task 后选酗,先 deserialize 出正常的 task,然后運(yùn)行 task 得到其執(zhí)行結(jié)果 directResult岳枷,這個結(jié)果要送回到 driver 那里芒填。但是通過 Actor 發(fā)送的數(shù)據(jù)包不宜過大呜叫,如果 result 比較大(比如 groupByKey 的 result)先把 result 存放到本地的“內(nèi)存+磁盤”上,由 blockManager 來管理殿衰,只把存儲位置信息(indirectResult)發(fā)送給 driver朱庆,driver 需要實(shí)際的 result 的時(shí)候,會通過 HTTP 去 fetch闷祥。如果 result 不大(小于spark.akka.frameSize = 10MB
)萍桌,那么直接發(fā)送給 driver。
上面的描述還有一些細(xì)節(jié):如果 task 運(yùn)行結(jié)束生成的 directResult > akka.frameSize龙考,directResult 會被存放到由 blockManager 管理的本地“內(nèi)存+磁盤”上筋现。BlockManager 中的 memoryStore 開辟了一個 LinkedHashMap 來存儲要存放到本地內(nèi)存的數(shù)據(jù)。LinkedHashMap 存儲的數(shù)據(jù)總大小不超過 Runtime.getRuntime.maxMemory * spark.storage.memoryFraction(default 0.6)
悟衩。如果 LinkedHashMap 剩余空間不足以存放新來的數(shù)據(jù)剧罩,就將數(shù)據(jù)交給 diskStore 存放到磁盤上,但前提是該數(shù)據(jù)的 storageLevel 中包含“磁盤”座泳。
In TaskRunner.run()
// deserialize task, run it and then send the result to
=> coarseGrainedExecutorBackend.statusUpdate()
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)
=> directResult = new DirectTaskResult(ser.serialize(value))
=> if( directResult.size() > akkaFrameSize() )
indirectResult = blockManager.putBytes(taskId, directResult, MEMORY+DISK+SER)
else
return directResult
=> coarseGrainedExecutorBackend.statusUpdate(result)
=> driver ! StatusUpdate(executorId, taskId, result)
ShuffleMapTask 和 ResultTask 生成的 result 不一樣惠昔。ShuffleMapTask 生成的是 MapStatus,MapStatus 包含兩項(xiàng)內(nèi)容:一是該 task 所在的 BlockManager 的 BlockManagerId(實(shí)際是 executorId + host, port, nettyPort)挑势,二是 task 輸出的每個 FileSegment 大小镇防。ResultTask 生成的 result 的是 func 在 partition 上的執(zhí)行結(jié)果。比如 count() 的 func 就是統(tǒng)計(jì) partition 中 records 的個數(shù)潮饱。由于 ShuffleMapTask 需要將 FileSegment 寫入磁盤来氧,因此需要輸出流 writers,這些 writers 是由 blockManger 里面的 shuffleBlockManager 產(chǎn)生和控制的饼齿。
In task.run(taskId)
// if the task is ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits)
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context))
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)])
//If the task is ResultTask
=> return func(context, rdd.iterator(split, context))
Driver 收到 task 的執(zhí)行結(jié)果 result 后會進(jìn)行一系列的操作:首先告訴 taskScheduler 這個 task 已經(jīng)執(zhí)行完饲漾,然后去分析 result。由于 result 可能是 indirectResult缕溉,需要先調(diào)用 blockManager.getRemoteBytes() 去 fech 實(shí)際的 result考传,這個過程下節(jié)會詳解。得到實(shí)際的 result 后证鸥,需要分情況分析僚楞,如果是 ResultTask 的 result,那么可以使用 ResultHandler 對 result 進(jìn)行 driver 端的計(jì)算(比如 count() 會對所有 ResultTask 的 result 作 sum)枉层,如果 result 是 ShuffleMapTask 的 MapStatus泉褐,那么需要將 MapStatus(ShuffleMapTask 輸出的 FileSegment 的位置和大小信息)存放到 mapOutputTrackerMaster 中的 mapStatuses 數(shù)據(jù)結(jié)構(gòu)中以便以后 reducer shuffle 的時(shí)候查詢。如果 driver 收到的 task 是該 stage 中的最后一個 task鸟蜡,那么可以 submit 下一個 stage膜赃,如果該 stage 已經(jīng)是最后一個 stage,那么告訴 dagScheduler job 已經(jīng)完成揉忘。
After driver receives StatusUpdate(result)
=> taskScheduler.statusUpdate(taskId, state, result.value)
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
=> if result is IndirectResult
serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId)
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result)
=> taskSetManager.handleSuccessfulTask(tid, taskResult)
=> dagScheduler.taskEnded(result.value, result.accumUpdates)
=> dagSchedulerEventProcessActor ! CompletionEvent(result, accumUpdates)
=> dagScheduler.handleTaskCompletion(completion)
=> Accumulators.add(event.accumUpdates)
// If the finished task is ResultTask
=> if (job.numFinished == job.numPartitions)
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
=> job.listener.taskSucceeded(outputId, result)
=> jobWaiter.taskSucceeded(index, result)
=> resultHandler(index, result)
// if the finished task is ShuffleMapTask
=> stage.addOutputLoc(smt.partitionId, status)
=> if (all tasks in current stage have finished)
mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus])
mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
=> submitStage(stage)
Shuffle read
上一節(jié)描述了 task 運(yùn)行過程及 result 的處理過程跳座,這一節(jié)描述 reducer(需要 shuffle 的 task )是如何獲取到輸入數(shù)據(jù)的端铛。關(guān)于 reducer 如何處理輸入數(shù)據(jù)已經(jīng)在上一章的 shuffle read 中解釋了。
問題:reducer 怎么知道要去哪里 fetch 數(shù)據(jù)疲眷?
reducer 首先要知道 parent stage 中 ShuffleMapTask 輸出的 FileSegments 在哪個節(jié)點(diǎn)禾蚕。這個信息在 ShuffleMapTask 完成時(shí)已經(jīng)送到了 driver 的 mapOutputTrackerMaster,并存放到了 mapStatuses: HashMap<stageId, Array[MapStatus]> 里面狂丝,給定 stageId换淆,可以獲取該 stage 中 ShuffleMapTasks 生成的 FileSegments 信息 Array[MapStatus],通過 Array(taskId) 就可以得到某個 task 輸出的 FileSegments 位置(blockManagerId)及每個 FileSegment 大小几颜。
當(dāng) reducer 需要 fetch 輸入數(shù)據(jù)的時(shí)候倍试,會首先調(diào)用 blockStoreShuffleFetcher 去獲取輸入數(shù)據(jù)(FileSegments)的位置。blockStoreShuffleFetcher 通過調(diào)用本地的 MapOutputTrackerWorker 去完成這個任務(wù)蛋哭,MapOutputTrackerWorker 使用 mapOutputTrackerMasterActorRef 來與 mapOutputTrackerMasterActor 通信獲取 MapStatus 信息易猫。blockStoreShuffleFetcher 對獲取到的 MapStatus 信息進(jìn)行加工,提取出該 reducer 應(yīng)該去哪些節(jié)點(diǎn)上獲取哪些 FileSegment 的信息具壮,這個信息存放在 blocksByAddress 里面。之后哈蝇,blockStoreShuffleFetcher 將獲取 FileSegment 數(shù)據(jù)的任務(wù)交給 basicBlockFetcherIterator棺妓。
rdd.iterator()
=> rdd(e.g., ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)
=> blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)
basicBlockFetcherIterator 收到獲取數(shù)據(jù)的任務(wù)后,會生成一個個 fetchRequest炮赦,每個 fetchRequest 包含去某個節(jié)點(diǎn)獲取若干個 FileSegments 的任務(wù)怜跑。圖中展示了 reducer-2 需要從三個 worker node 上獲取所需的白色 FileSegment (FS)》涂保總的數(shù)據(jù)獲取任務(wù)由 blocksByAddress 表示性芬,要從第一個 node 獲取 4 個,從第二個 node 獲取 3 個剧防,從第三個 node 獲取 4 個植锉。
為了加快任務(wù)獲取過程,顯然要將總?cè)蝿?wù)劃分為子任務(wù)(fetchRequest)峭拘,然后為每個任務(wù)分配一個線程去 fetch俊庇。Spark 為每個 reducer 啟動 5 個并行 fetch 的線程(Hadoop 也是默認(rèn)啟動 5 個)。由于 fetch 來的數(shù)據(jù)會先被放到內(nèi)存作緩沖鸡挠,因此一次 fetch 的數(shù)據(jù)不能太多辉饱,Spark 設(shè)定不能超過 spark.reducer.maxMbInFlight=48MB
。注意這 48MB 的空間是由這 5 個 fetch 線程共享的拣展,因此在劃分子任務(wù)時(shí)彭沼,盡量使得 fetchRequest 不超過48MB / 5 = 9.6MB
。如圖在 node 1 中备埃,Size(FS0-2) + Size(FS1-2) < 9.6MB 但是 Size(FS0-2) + Size(FS1-2) + Size(FS2-2) > 9.6MB姓惑,因此要在 t1-r2 和 t2-r2 處斷開褐奴,所以圖中有兩個 fetchRequest 都是要去 node 1 fetch。那么會不會有 fetchRequest 超過 9.6MB挺益?當(dāng)然會有歉糜,如果某個 FileSegment 特別大,仍然需要一次性將這個 FileSegment fetch 過來望众。另外匪补,如果 reducer 需要的某些 FileSegment 就在本節(jié)點(diǎn)上,那么直接進(jìn)行 local read烂翰。最后夯缺,將 fetch 來的 FileSegment 進(jìn)行 deserialize,將里面的 records 以 iterator 的形式提供給 rdd.compute()甘耿,整個 shuffle read 結(jié)束踊兜。
In basicBlockFetcherIterator:
// generate the fetch requests
=> basicBlockFetcherIterator.initialize()
=> remoteRequests = splitLocalRemoteBlocks()
=> fetchRequests ++= Utils.randomize(remoteRequests)
// fetch remote blocks
=> sendRequest(fetchRequests.dequeue()) until Size(fetchRequests) > maxBytesInFlight
=> blockManager.connectionManager.sendMessageReliably(cmId,
blockMessageArray.toBufferMessage)
=> fetchResults.put(new FetchResult(blockId, sizeMap(blockId)))
=> dataDeserialize(blockId, blockMessage.getData, serializer)
// fetch local blocks
=> getLocalBlocks()
=> fetchResults.put(new FetchResult(id, 0, () => iter))
下面再討論一些細(xì)節(jié)問題:
reducer 如何將 fetchRequest 信息發(fā)送到目標(biāo)節(jié)點(diǎn)?目標(biāo)節(jié)點(diǎn)如何處理 fetchRequest 信息佳恬,如何讀取 FileSegment 并回送給 reducer捏境?
rdd.iterator() 碰到 ShuffleDependency 時(shí)會調(diào)用 BasicBlockFetcherIterator 去獲取 FileSegments。BasicBlockFetcherIterator 使用 blockManager 中的 connectionManager 將 fetchRequest 發(fā)送給其他節(jié)點(diǎn)的 connectionManager毁葱。connectionManager 之間使用 NIO 模式通信垫言。其他節(jié)點(diǎn),比如 worker node 2 上的 connectionManager 收到消息后倾剿,會交給 blockManagerWorker 處理筷频,blockManagerWorker 使用 blockManager 中的 diskStore 去本地磁盤上讀取 fetchRequest 要求的 FileSegments,然后仍然通過 connectionManager 將 FileSegments 發(fā)送回去前痘。如果使用了 FileConsolidation凛捏,diskStore 還需要 shuffleBlockManager 來提供 blockId 所在的具體位置。如果 FileSegment 不超過 spark.storage.memoryMapThreshold=8KB
芹缔,那么 diskStore 在讀取 FileSegment 的時(shí)候會直接將 FileSegment 放到內(nèi)存中坯癣,否則,會使用 RandomAccessFile 中 FileChannel 的內(nèi)存映射方法來讀取 FileSegment(這樣可以將大的 FileSegment 加載到內(nèi)存)乖菱。
當(dāng) BasicBlockFetcherIterator 收到其他節(jié)點(diǎn)返回的 serialized FileSegments 后會將其放到 fetchResults: Queue 里面坡锡,并進(jìn)行 deserialization,所以 fetchResults: Queue 就相當(dāng)于在 Shuffle details 那一章提到的 softBuffer窒所。如果 BasicBlockFetcherIterator 所需的某些 FileSegments 就在本地鹉勒,會通過 diskStore 直接從本地文件讀取,并放到 fetchResults 里面吵取。最后 reducer 一邊從 FileSegment 中邊讀取 records 一邊處理禽额。
After the blockManager receives the fetch request
=> connectionManager.receiveMessage(bufferMessage)
=> handleMessage(connectionManagerId, message, connection)
// invoke blockManagerWorker to read the block (FileSegment)
=> blockManagerWorker.onBlockMessageReceive()
=> blockManagerWorker.processBlockMessage(blockMessage)
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> fileSegment = diskManager.getBlockLocation(blockId)
=> shuffleManager.getBlockLocation()
=> if(fileSegment < minMemoryMapBytes)
buffer = ByteBuffer.allocate(fileSegment)
else
channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
每個 reducer 都持有一個 BasicBlockFetcherIterator,一個 BasicBlockFetcherIterator 理論上可以持有 48MB 的 fetchResults。每當(dāng) fetchResults 中有一個 FileSegment 被讀取完脯倒,就會一下子去 fetch 很多個 FileSegment实辑,直到 48MB 被填滿。
BasicBlockFetcherIterator.next()
=> result = results.task()
=> while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
=> result.deserialize()
Discussion
這一章寫了三天藻丢,也是我這個月來心情最不好的幾天剪撬。Anyway,繼續(xù)總結(jié)悠反。
架構(gòu)部分其實(shí)沒有什么好說的残黑,就是設(shè)計(jì)時(shí)盡量功能獨(dú)立,模塊獨(dú)立斋否,松耦合梨水。BlockManager 設(shè)計(jì)的不錯,就是管的東西太多(數(shù)據(jù)塊茵臭、內(nèi)存疫诽、磁盤、通信)旦委。
這一章主要探討了系統(tǒng)中各個模塊是怎么協(xié)同來完成 job 的生成奇徒、提交、運(yùn)行缨硝、結(jié)果收集逼龟、結(jié)果計(jì)算以及 shuffle 的。貼了很多代碼追葡,也畫了很多圖,雖然細(xì)節(jié)很多奕短,但遠(yuǎn)沒有達(dá)到源碼的細(xì)致程度宜肉。如果有地方不明白的,請根據(jù)描述閱讀一下源碼吧翎碑。
如果想進(jìn)一步了解 blockManager谬返,可以參閱 Jerry Shao 寫的 Spark源碼分析之-Storage模塊。