上兩節(jié)我們講了普通shuffle的操作原理,與優(yōu)化后的操作原理院尔。并對比了他們各自的特別蜻展。那么我就了解到spark shuffle其實是進行了兩步
第一步,ShuffleMapTask執(zhí)行后把計算出來的數(shù)據(jù)寫入ShuffleBlockFile里
第二步邀摆,ResultTask讀取這些數(shù)據(jù)文件進行計算纵顾。
節(jié)章節(jié)就是深入剖析這兩步的源碼。
我們在前面講過Executor在執(zhí)行Task時栋盹,調(diào)用runTask方法施逾,并返回MapStatus
try {
//獲取shuffleManager,在用shuffleManager獲取shuffleWriter對象
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
//最重要的代碼在這里
/**
* 首先調(diào)用了rdd.iterator方法,參數(shù)是傳入了當前 task要處理的partition
* 所以核心的邏輯,就是rdd的iterator方法中汉额,在這里沪饺,就實現(xiàn)了針對RDD的某個partitione,執(zhí)行
* 我們定義的算子或者函數(shù)
* 當執(zhí)行完我們自定義的算子或者函數(shù),是不是相當于針對rdd的partiton執(zhí)行了處理闷愤,那么是不是有返回值
* ok,返回的數(shù)據(jù)整葡,都是通過ShuffleWriter,經(jīng)過HashPartitioner進行分區(qū)后,寫入自己對應(yīng)的bucket
*/
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
/**
* 最后讥脐,結(jié)果返回MapStatus
* Mapstatus封裝了ShuffleMapTask計算 后的數(shù)據(jù)遭居,存儲在哪里,這其實就是BlockManager相關(guān)的信息
* BlockManager是spark底層的內(nèi)存旬渠,數(shù)據(jù)俱萍,磁盤管理的組件
* 講完shuffle,我們會講blockManager
*/
return writer.stop(success = true).get
}
里面講了
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
其實這個writer就是HashSuffleWriter.里write方法里,我們首先要判斷是否需要在Map端本地聚會告丢。是什么情況下可以聚合呢枪蘑,這要看我們實際的業(yè)務(wù),比如前面說的reduceBykey岖免。
//將每個ShuffleMapTask計算出來的rdd的partition數(shù)據(jù)岳颇,寫入本地磁盤
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
//首先要判斷,是否需要在map端本地聚合
//這里包括reduceByKey,這樣的操作颅湘,dep.aggreatetor.isDefined是true
//包括dep.mapSideCombine也是true
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
//這里是執(zhí)行本地聚會
//比如本地:(hello,1)(hello,1) ==> (hello,2)
dep.aggregator.get.combineValuesByKey(records, context)
} else {
records
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}
那么這里說到了本地聚合话侧,也可以叫組合(combine),有什么用呢闯参,大家想一相瞻鹏,是不是這里要走網(wǎng)絡(luò)傳遞了,要是本地聚合后鹿寨,大大減少了網(wǎng)絡(luò)流量 了
接著我們看源碼:
//如果本地聚會就本地聚會新博,
//然后遍歷數(shù)據(jù)
//對每個數(shù)據(jù),調(diào)用partitioner脚草,默認是HashPartitioner,生成bucketId
//也就是決定 了赫悄,每一份數(shù)據(jù)寫入哪個bucket中
for (elem <- iter) {
val bucketId = dep.partitioner.getPartition(elem._1)
// 調(diào)用shuffleBlockManager.forMapTask來生成bucketId對應(yīng)的Writer,然后把數(shù)據(jù)寫入bucket
shuffle.writers(bucketId).write(elem)
}
}
接著我們看forMapTask的方法。這個方法里就看到我們前面兩節(jié)講的普通shuffle與優(yōu)化后的shuffle寫入本地磁盤的區(qū)別玩讳。
ivate val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null
//這里很關(guān)鍵涩蜘,前面我們講過Shuffle有兩種,一種是普通的熏纯,一種是優(yōu)化后的
//這里會判斷,如果開啟了consolidate,就是consoldateShuffleFile是true
//這里不會為每個bucket都獲取一個獨立的文件
//而是為這個bucket,獲取一個ShuffleGroup的Writer
val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
//首先用shuffleId, mapId, bucketId來生成一個唯一的blockId
//然后用bucket獲取一個ShuffleGroup
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
//然后用BlockManager.getDiskWriter方法粤策,針對ShuffleGroup獲取一個Writer
/**
* 這樣我們就清楚了樟澜,如果開啟了consolidate機制
* 實際上,對于每一個bucket,都會獲取一個針對ShuffleFileGroup的writer
* 而不是一個獨立的ShuffleBlockFile的writer
*
* 這樣就實現(xiàn)了多個ShuffleMapTask的輸出 數(shù)據(jù)的合并
*/
blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
writeMetrics)
}}
上面講了當開啟了consolidate機制后,對于每一個bucket都會獲取一個ShuffleFileGroup的writer
而普通的shuffle的源碼如下:
else {
//如果沒有開啟consolicate機制秩贰,也就是普通的Shuffle
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
//同樣生成一個blockId
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
//然后調(diào)用 blockManager.diskBlockManager霹俺,獲取了一個代表要寫入磁盤文件的blockFile
val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
if (blockFile.exists) {
if (blockFile.delete()) {
logInfo(s"Removed existing shuffle file $blockFile")
} else {
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
//然后調(diào)用 這個方法,針對那個blockFile生成writer
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
/**
*所以對于普通的Shuffle操作的話
* 對于每個ShuffleMapTask輸出的bucket,都會在本地獲取一個單獨的ShuffleBlockFIle文件
*/
}
我們看到了不管是哪種的shuffle毒费,最終調(diào)用blockMamager.getDiskWriter方法寫數(shù)據(jù)到本地磁盤丙唧。這就是Shuffle的第一步,寫文件數(shù)據(jù)操作觅玻。
接下來我們看第二步想际,
在前面分析Task是不是分析了task的計算 ,調(diào)用了compute方法,里面是不是通過getReader方法得到ShuffleMapReader溪厘,調(diào)用read方法
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
override def read(): Iterator[Product2[K, C]] = {
val ser = Serializer.getSerializer(dep.serializer)
//這里和我們以前分析的圖串起來了吧DAGScheduler的MapoutputTrackerMaster中獲取自己想要的數(shù)據(jù)信息
//然后底層用 blockMamger拉取自己的數(shù)據(jù)
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
} else {
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}
用blockManager拉取自己的數(shù)據(jù)胡本,調(diào)用ftech方法:
def fetch[T](
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer)
: Iterator[T] =
{
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis
/**
* 拿到mapOutputTracker的引用 ,然后調(diào)用getServerStatuses
* suffleId表示這個stage的上一個stage的ID
* reduceId是bucketId
* 這兩個參數(shù)可以限制找到當前resultTask獲取所需要的那份數(shù)據(jù)
* getServerStatuses這個方法一定會走網(wǎng)絡(luò)通信的畸悬,因為要聯(lián)系dirver的
*/
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
//ShuffleBlockFetcherIterator構(gòu)造 以后侧甫,就直接根據(jù)拉取地睛位置信息,通過BlockMamager
//去遠程的ShuffleTask所以節(jié)點的blockManager去拉取數(shù)據(jù)
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
serializer,
SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
val itr = blockFetcherItr.flatMap(unpackBlock)
//對拉取的數(shù)據(jù)進行封裝
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
context.taskMetrics.updateShuffleReadMetrics()
})
這就是ResultTask讀取數(shù)據(jù)的過程蹋宦。在spark中Shuffle是重點披粟。