Related blog posts: https://blog.csdn.net/hkl15111093042/article/details/94376896
http://www.reibang.com/p/1d714f0c5e07
Short answer: [A Spark shuffle sits at a wide dependency. It provides a re-shuffling of data: records with the same key are routed to the same reducer for processing (i.e., the same class of data scattered across multiple nodes is gathered onto one node for computation).]
Why shuffle matters: in both Hadoop and Spark, the shuffle stage involves disk reads/writes and network transfer, so the performance of the shuffle directly determines the performance and throughput of the whole job.
1. Shuffle in Hadoop MapReduce
In MapReduce, shuffle is the bridge between Map and Reduce: the output of the Map phase must pass through shuffle before it reaches Reduce.
The shuffle process in MapReduce:
First, the input path is loaded:
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
The rest of the process, at a high level, involves the following components.
[Notes:]
partitioner: decides which partition a map output record is written to, i.e., which reducer will fetch it;
sort: guarantees that the map output enters the reducers in the specified sort order;
combiner: whether the data should be pre-aggregated (a mini reduce) on the map side;
group: within one reducer, gathers records with the same key into the same group;
[Tuning point for the MapReduce shuffle:] configure a combiner to cut the amount of data shipped from map to reduce and the associated I/O cost, as in the sketch below.
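As a minimal sketch of this tuning point (TokenizerMapper and IntSumReducer are hypothetical user-defined classes of a WordCount-style job, not part of this post), setting the combiner is a single call on the Job:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

// Reusing the reducer class as a combiner pre-aggregates on the map side,
// shrinking the data shuffled to the reducers. The mapper/reducer classes
// are assumed to exist elsewhere (hypothetical names).
val conf = new Configuration()
val job = Job.getInstance(conf, "word count")
job.setMapperClass(classOf[TokenizerMapper])
job.setCombinerClass(classOf[IntSumReducer]) // the map-side "mini reduce"
job.setReducerClass(classOf[IntSumReducer])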
Note, another common interview point:
What determines the number of mappers and reducers? The number of mappers cannot be set directly (it is driven by the number of input splits); the number of reducers can be set explicitly.
2. Shuffle write in Spark
2.1 Hash-based shuffle write
Spark avoids the extra sorting found in Hadoop (where data fetched before the Reduce phase must first be sorted) by providing a hash-based shuffle write.
1. [How it works:]
<1> Each Mapper creates one bucket per Reducer (so the total number of buckets is M * R);
<2> The Mapper's output is placed into the buckets according to the configured partitioner. A bucket is an abstract concept; in this mechanism each bucket corresponds to one file.
<3> When a Reduce task starts, it fetches the corresponding buckets, remotely or locally, based on its own task id and the ids of the Mappers it depends on, and uses them as its input.
2. [HashShuffleManager]
Blog post: https://blog.csdn.net/weixin_39216383/article/details/81174950
The M * R files here can be read as (number of map tasks, roughly the number of executors * the number of tasks per executor) * (number of reduce tasks).
Tuning point: when using HashShuffleManager, set spark.shuffle.consolidateFiles (default false) to true to enable the consolidation mechanism, as in the sketch below.
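A minimal configuration sketch (relevant to Spark 1.x, where the hash shuffle manager is still available):

import org.apache.spark.SparkConf

// Pick the hash shuffle manager and turn on file consolidation so that map tasks
// running on the same core reuse one group of output files.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "hash")
  .set("spark.shuffle.consolidateFiles", "true") // default is false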
3. [HashShuffleManager source-code flow]
See [Spark] HashShuffleManager
2.2 Sort-based shuffle write
1. [Shortcomings of hash shuffle:]
<1> The process creates M * R files, where M is the number of Shuffle Map Tasks and R is the number of downstream tasks.
If M and R are both 1000, 1,000,000 files are produced, which is a heavy burden on the file system; and when the shuffled data volume is small but the file count is large, the random writes degrade I/O performance.
<2> When writing the files, each writer handle needs a 100 KB buffer by default. Although shuffle writes from different tasks run at different times, the memory required is C * F * 100 KB, where C is the number of cores running in the Spark cluster and F is the number of downstream tasks (the number of reducers), so the buffer memory is not negligible either.
2. [How it works:]
The difference:
Each Shuffle Map Task no longer creates one file per downstream task; instead it writes all of its output into a single file and generates a matching index file for lookups.
The final number of files is therefore 2 * M (the number of Shuffle Map Tasks): each map task produces one data file plus one index file.
Execution modes:
a. the normal mode  b. the bypass mode
The bypass mode is used when the number of shuffle read tasks is no greater than the value of spark.shuffle.sort.bypassMergeThreshold (default 200) and the shuffle needs no map-side aggregation.
https://www.cnblogs.com/itboys/p/9201750.html
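A minimal sketch of setting spark.shuffle.sort.bypassMergeThreshold (200 is already the default; shown only to make the knob concrete):

import org.apache.spark.SparkConf

// If the number of reduce-side partitions is at or below this threshold and no
// map-side aggregation is needed, SortShuffleManager takes the bypass path.
val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "200")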
3. [BypassMergeSortShuffleWriter]
BypassMergeSortShuffleWriter:
When no aggregator is defined, map-side combine is not enabled, and the number of partitions is below the value of spark.shuffle.sort.bypassMergeThreshold (default 200), it simply writes one file per reduce partition and then concatenates them, avoiding the two rounds of serialization and deserialization that merging spill files would require.
Source-code flow:
(1) The first steps are the same as in HashShuffleWriter: obtain the rdd and the dependency, and look up the corresponding ShuffleManager from SparkEnv.
(2) The first argument of manager.getWriter is dep.shuffleHandle:
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
  shuffleId, _rdd.partitions.length, this)
The concrete implementation of registerShuffle:
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
The conditions for choosing BypassMergeSortShuffleHandle:
a. mapSideCombine is not enabled;
b. the number of partitions is below [spark.shuffle.sort.bypassMergeThreshold].
The reasoning: once map-side combine is defined, sorting cannot be avoided;
and when the partition count is below the threshold no sorting is needed, whereas above it the sort cannot be skipped. The check itself is sketched below.
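For reference, SortShuffleWriter.shouldBypassMergeSort is short; the following is a paraphrase of the Spark 2.x version (wording may differ slightly between releases):

import org.apache.spark.{ShuffleDependency, SparkConf}

// Paraphrase of SortShuffleWriter.shouldBypassMergeSort (Spark 2.x).
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    // Map-side aggregation needs the sort/combine machinery, so bypass is ruled out.
    false
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}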
(3) The write method of BypassMergeSortShuffleWriter:
a. First initialize the partitionWriters array according to the number of partitions.
b. Iterate over the records and write them out:
the target partition is the key's hash value modulo the number of partitions;
after writing, each writer is committed and closed.
c. Merge the per-partition files, write the data file and the index file, and return an updated MapStatus as the result.
partitionLengths: a long array holding the data length of each partition. It is used in the index file to mark where each partition starts; once the offsets are known, the data can be fetched from the data file.
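To make the index-file layout concrete, here is a tiny sketch (with hypothetical sizes) of how the per-partition lengths turn into offsets, mirroring what IndexShuffleBlockResolver does when writing the index file:

// Hypothetical per-partition sizes in bytes for one map task's output.
val partitionLengths = Array[Long](120L, 0L, 340L, 75L)
// The index file stores numPartitions + 1 cumulative offsets into the data file:
// Array(0, 120, 120, 460, 535). Reduce task r reads bytes [offsets(r), offsets(r + 1)).
val offsets = partitionLengths.scanLeft(0L)(_ + _)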
4. [SortShuffleWriter]
Difference from the bypass mode:
It does not, like the bypass mode, write many files up front (one per reduce partition for each Map Task) and then merge them; instead it spills to disk to produce temporary files and merges those.
Source-code flow:
(1) The first steps are the same as for bypass, up to the call to registerShuffle that returns the ShuffleHandle instance:
when neither BypassMergeSortShuffleHandle nor SerializedShuffleHandle applies,
the BaseShuffleHandle case is matched and the ShuffleWriter returned is SortShuffleWriter.
(2) Based on the rdd and dependency obtained, whether mapSideCombine is set and whether an aggregator is defined decide how the ExternalSorter is constructed. (In the second case it does not care whether records within each partition are sorted by key; for an operator like sortByKey the sorting is done on the reduce side.)
The class comment of ExternalSorter notes:
if map-side combine is required, a PartitionedAppendOnlyMap is used as the in-memory buffer; otherwise a PartitionedPairBuffer is used, as sketched below.
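The difference can be illustrated with a conceptual sketch in plain Scala (this is not Spark's code; PartitionedAppendOnlyMap and PartitionedPairBuffer are specialised, memory-tracked structures, but the insertion semantics are analogous):

import scala.collection.mutable

// With map-side combine: one entry per (partition, key) is updated in place,
// like PartitionedAppendOnlyMap.
val combined = mutable.Map.empty[(Int, String), Int]
def insertCombined(partition: Int, key: String, value: Int): Unit =
  combined((partition, key)) = combined.getOrElse((partition, key), 0) + value

// Without map-side combine: every record is simply appended, like PartitionedPairBuffer.
val appended = mutable.ArrayBuffer.empty[((Int, String), Int)]
def insertAppended(partition: Int, key: String, value: Int): Unit =
  appended += (((partition, key), value))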
(3) The sorter's insertAll method is called: data is written to the in-memory buffer first and spilled to disk when necessary.
(4) Inside insertAll:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
First, whether an aggregator is defined determines which in-memory buffer type is used. Records are written in one by one, and maybeSpillCollection checks whether they need to be spilled to disk; the actual decision is made in maybeSpill.
The condition boils down to: the current memory usage reaches the initial memory threshold [spark.shuffle.spill.initialMemoryThreshold] (default 5 * 1024 * 1024 bytes), or the number of buffered records reaches a forced limit.
maybeSpill in words: every 32 records it checks the memory estimate and asks the memory manager for more execution memory (granted is what was actually allocated); if the real memory usage still exceeds the new threshold, it spills.
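The decision itself lives in Spillable.maybeSpill; the following is a paraphrase of the Spark 2.x method (the fields are Spillable's own, so this is internal code shown for reference rather than something to call directly):

// Paraphrase of Spillable.maybeSpill (Spark 2.x).
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Try to double the memory budget; spill only if the request is not fully granted.
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  // A spill is also forced once too many elements have been buffered.
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  if (shouldSpill) {
    _spillCount += 1
    spill(collection)            // the subclass writes the collection to disk
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}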
(5) Spilling a file goes through ExternalSorter's spill method:
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  spills += spillFile
}
The destructiveSortedWritablePartitionedIterator call above returns a partitioned write iterator; depending on whether map-side combine is in play, there are two implementations:
a. PartitionedPairBuffer's partitionedDestructiveSortedIterator returns an iterator sorted only by partition id;
b. PartitionedAppendOnlyMap's partitionedDestructiveSortedIterator returns an iterator sorted by (partitionId, key).
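A conceptual sketch of the two sort orders in plain Scala (these are not Spark's comparator objects, just the same idea):

// Records are keyed by (partitionId, key). The buffer's iterator needs only the
// partition order; the map's iterator (map-side combine) orders by (partition, key).
def comparePartitionOnly[K](a: ((Int, K), Any), b: ((Int, K), Any)): Int =
  Integer.compare(a._1._1, b._1._1)

def comparePartitionThenKey[K](keyOrd: Ordering[K])(a: ((Int, K), Any), b: ((Int, K), Any)): Int = {
  val byPartition = Integer.compare(a._1._1, b._1._1)
  if (byPartition != 0) byPartition else keyOrd.compare(a._1._2, b._1._2)
}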
(6) After the data has been written to the in-memory buffer (and possibly spilled to disk), the data file is merged and the index file is created.
Walkthrough:
Build the blockId and call writePartitionedFile to start writing the data to disk:
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
writePartitionedFile:
a. If there are no spill files, no merging is needed: an iterator is built directly and each partition is written to the file as a block. The iterator is built the same way as in spill.
b. If there are spill files, the iterator this.partitionedIterator distinguishes three cases depending on whether there were spills and whether sorting is required:
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    if (!ordering.isDefined) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
In the third case, when there have been spills, the spilled data and the in-memory data must be merged:
if an aggregator is defined, the merge is driven by the aggregator;
if no aggregator is defined but an ordering exists (e.g. sortByKey), the data is sorted but not merged;
with neither an aggregator nor an ordering, the iterators are returned as-is.
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}
Finally, the iterators over each Map Task's data are traversed and the data is written into the data file; then the index file is written and the MapStatus is built.
5. [UnsafeShuffleWriter] ("Project Tungsten")
The execution flow is basically the same as SortShuffleWriter: write to an in-memory buffer first, then spill to disk.
Source-code flow:
(1) The first steps are the same as for bypass, up to the call to SortShuffleManager.registerShuffle that returns the ShuffleHandle instance:
besides not qualifying for BypassMergeSortShuffleHandle, the shuffle must satisfy: 1. the serializer supports relocation of serialized objects, 2. no aggregator is defined, 3. the number of partitions is at most [MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE] (16,777,216).
Note: the reason the partition count cannot exceed this number is that the serialized sorter packs the partition id into 24 bits of each 8-byte record pointer (PackedRecordPointer), so only 2^24 partition ids can be encoded.
Based on the ShuffleHandle type, the UnsafeShuffleWriter implementation is returned.
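The eligibility check is SortShuffleManager.canUseSerializedShuffle; paraphrased from Spark 2.x below (logging removed; these are internal APIs, shown for reference):

// Paraphrase of SortShuffleManager.canUseSerializedShuffle (Spark 2.x).
val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE: Int = 1 << 24 // 16,777,216

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    false // the serializer must allow serialized records to be reordered byte-wise
  } else if (dependency.aggregator.isDefined) {
    false // no map-side aggregation
  } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    false // too many partitions to encode in the packed record pointer
  } else {
    true
  }
}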
(2) The write method of UnsafeShuffleWriter:
boolean success = false;
try {
  while (records.hasNext()) {
    insertRecordIntoSorter(records.next());
  }
  closeAndWriteOutput();
  success = true;
} finally {
  if (sorter != null) {
    try {
      sorter.cleanupResources();
    } catch (Exception e) {
      // Only throw this error if we won't be masking another
      // error.
      if (success) {
        throw e;
      } else {
        logger.error("In addition to a failure during writing, we failed during " +
          "cleanup.", e);
      }
    }
  }
}
The insertRecordIntoSorter method:
a reusable serBuffer (reset for each record) of size 1024 * 1024 bytes (1 MB) is allocated; the record is written into the serialization stream backed by it and flushed.
Finally sorter.insertRecord is called; inside, it checks whether a spill is needed:
whether the number of records held in memory exceeds the configured [numElementsForSpillThreshold], set by the property spark.shuffle.spill.numElementsForceSpillThreshold.
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {
  // for tests
  assert(inMemSorter != null);
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    // A spill is required: call spill()
    spill();
  }
  // Grow the pointer array if necessary so it can accept the new record;
  // if there is not enough space, a spill is triggered
  growPointerArrayIfNecessary();
  // Need 4 bytes to store the record length.
  final int required = length + 4;
  // If the current page has no room, acquire a new page; if the memory cannot be
  // obtained, a spill is triggered. currentPage and pageCursor are updated
  // according to the number of bytes requested
  acquireNewPageIfNecessary(required);
  assert(currentPage != null);
  /**
   * Get the base for the memory address: a record is tracked either via a raw memory
   * address (off-heap) or via an object reference plus an offset (on-heap).
   * Addressing comparison:
   * on-heap: locate the object first, then apply the offset
   * off-heap: the address locates the data directly
   */
  final Object base = currentPage.getBaseObject();
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  Platform.putInt(base, pageCursor, length);
  pageCursor += 4;
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  pageCursor += length;
  inMemSorter.insertRecord(recordAddress, partitionId);
}
Whether the on-heap or the off-heap mode is used is decided by the memory manager's Tungsten memory mode.
The condition: off-heap is chosen only when spark.memory.offHeap.enabled is true and spark.memory.offHeap.size is set to a positive value; otherwise on-heap is used, as paraphrased below.
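Roughly, the choice is made once per executor when the MemoryManager is created; a paraphrase of its tungstenMemoryMode logic in Spark 2.x (the real code additionally requires a positive size whenever off-heap is enabled):

import org.apache.spark.SparkConf

// Paraphrase of MemoryManager.tungstenMemoryMode (Spark 2.x).
def tungstenMemoryMode(conf: SparkConf): String =
  if (conf.getBoolean("spark.memory.offHeap.enabled", false) &&
      conf.getSizeAsBytes("spark.memory.offHeap.size", "0") > 0) {
    "OFF_HEAP"
  } else {
    "ON_HEAP"
  }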
(3) Next, the spill method: it calls ShuffleExternalSorter's own spill, which in turn calls writeSortedFile.
(The method is fairly involved; a separate article will cover the details.)
(4) In writeSortedFile, the records are sorted first and then written to disk.
(5) When no spill is needed, it first checks whether there is enough room to write one record; if not, it grows the resources first and then writes the record into memory. If the memory still cannot be obtained, a spill is triggered.
(6) When merging the files, the remaining in-memory data is flushed to disk first, and then the files are merged.
Merging the spill files has a fast path and a slow path: when spark.shuffle.unsafe.fastMergeEnabled is true and compression is either disabled or uses snappy or LZF, the very efficient transferTo-based merge can be used; otherwise only the fileStream-based merge is available.
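As a minimal configuration sketch, these are the settings that keep UnsafeShuffleWriter on the fast transferTo merge path (fastMergeEnabled already defaults to true; the codec is shown explicitly only for illustration):

import org.apache.spark.SparkConf

// Fast merge requires fastMergeEnabled and either no compression or a codec whose
// compressed streams can be concatenated, e.g. snappy or LZF as described above.
val conf = new SparkConf()
  .set("spark.shuffle.unsafe.fastMergeEnabled", "true")
  .set("spark.shuffle.compress", "true")
  .set("spark.io.compression.codec", "snappy")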
(7) Finally the index file is produced and the MapStatus is updated.