Tungsten-sort 算不得一個全新的shuffle 方案攒霹,它在特定場景下基于類似現(xiàn)有的Sort Based Shuffle處理流程嗽冒,對內存/CPU/Cache使用做了非常大的優(yōu)化逛裤。帶來高效的同時,也就限定了自己的使用場景。如果Tungsten-sort 發(fā)現(xiàn)自己無法處理,則會自動使用 Sort Based Shuffle進行處理萤晴。
前言
看這篇文章前吐句,建議你先簡單看看Spark Sort Based Shuffle內存分析。
Tungsten 中文是鎢絲
的意思店读。 Tungsten Project 是 Databricks 公司提出的對Spark優(yōu)化內存和CPU使用的計劃嗦枢,該計劃初期似乎對Spark SQL優(yōu)化的最多。不過部分RDD API 還有Shuffle也因此受益屯断。
簡述
Tungsten-sort優(yōu)化點主要在三個方面:
- 直接在serialized binary data上sort而不是java objects文虏,減少了memory的開銷和GC的overhead。
- 提供cache-efficient sorter殖演,使用一個8bytes的指針氧秘,把排序轉化成了一個指針數(shù)組的排序。
- spill的merge過程也無需反序列化即可完成
這些優(yōu)化的實現(xiàn)導致引入了一個新的內存管理模型趴久,類似OS的Page丸相,對應的實際數(shù)據(jù)結構為MemoryBlock
,支持off-heap 以及 in-heap 兩種模式。為了能夠對Record 在這些MemoryBlock進行定位朋鞍,引入了Pointer(指針)的概念。
如果你還記得Sort Based Shuffle里存儲數(shù)據(jù)的對象PartitionedAppendOnlyMap
,這是一個放在JVM heap里普通對象妥箕,在Tungsten-sort中滥酥,他被替換成了類似操作系統(tǒng)內存頁的對象。如果你無法申請到新的Page,這個時候就要執(zhí)行spill操作畦幢,也就是寫入到磁盤的操作坎吻。具體觸發(fā)條件,和Sort Based Shuffle 也是類似的宇葱。
開啟條件
Spark 默認開啟的是Sort Based Shuffle,想要打開Tungsten-sort ,請設置
spark.shuffle.manager=tungsten-sort
對應的實現(xiàn)類是:
org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
名字的來源是因為使用了大量JDK Sun Unsafe API瘦真。
當且僅當下面條件都滿足時,才會使用新的Shuffle方式:
- Shuffle dependency 不能帶有aggregation 或者輸出需要排序
- Shuffle 的序列化器需要是 KryoSerializer 或者 Spark SQL's 自定義的一些序列化方式.
- Shuffle 文件的數(shù)量不能大于 16777216
- 序列化時黍瞧,單條記錄不能大于 128 MB
可以看到诸尽,能使用的條件還是挺苛刻的。
這些限制來源于哪里
參看如下代碼印颤,page的大心:
this.pageSizeBytes = (int) Math.min(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
shuffleMemoryManager.pageSizeBytes());
這就保證了頁大小不超過PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES
的值,該值就被定義成了128M年局。
而產生這個限制的具體設計原因际看,我們還要仔細分析下Tungsten的內存模型:
這張圖其實畫的是 on-heap 的內存邏輯圖,其中 #Page 部分為13bit, Offset 為51bit,你會發(fā)現(xiàn) 2^51 >>128M的矢否。但是在Shuffle的過程中仲闽,對51bit 做了壓縮,使用了27bit,具體如下:
[24 bit partition number][13 bit memory page number][27 bit offset in page]
這里預留出的24bit給了partition number,為了后面的排序用僵朗。上面的好幾個限制其實都是因為這個指針引起的:
- 一個是partition 的限制赖欣,前面的數(shù)字
16777216
就是來源于partition number 使用24bit 表示的屑彻。 - 第二個是page number
- 第三個是偏移量,最大能表示到2^27=128M畏鼓。那一個task 能管理到的內存是受限于這個指針的酱酬,最多是 2^13 * 128M 也就是1TB左右。
有了這個指針云矫,我們就可以定位和管理到off-heap 或者 on-heap里的內存了膳沽。這個模型還是很漂亮的,內存管理也非常高效让禀,記得之前的預估PartitionedAppendOnlyMap
的內存是非常困難的挑社,但是通過現(xiàn)在的內存管理機制,是非逞沧幔快速并且精確的痛阻。
對于第一個限制,那是因為后續(xù)Shuffle Write的sort 部分腮敌,只對前面24bit的partiton number 進行排序阱当,key的值沒有被編碼到這個指針,所以沒辦法進行ordering
同時糜工,因為整個過程是追求不反序列化的弊添,所以不能做aggregation。
Shuffle Write
核心類:
org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter
數(shù)據(jù)會通過 UnsafeShuffleExternalSorter.insertRecordIntoSorter
一條一條寫入到 serOutputStream
序列化輸出流捌木。
這里消耗內存的地方是
serBuffer = new MyByteArrayOutputStream(1024 * 1024)
默認是1M,類似于Sort Based Shuffle 中的ExternalSorter
油坝,在Tungsten Sort 對應的為UnsafeShuffleExternalSorter
,記錄序列化后就通過sorter.insertRecord
方法放到sorter里去了。
這里sorter 負責申請Page,釋放Page,判斷是否要進行spill都這個類里完成刨裆。代碼的架子其實和Sort Based 是一樣的澈圈。
(另外,值得注意的是帆啃,這張圖里進行spill操作的同時檢查內存可用而導致的Exeception 的bug 已經(jīng)在1.5.1版本被修復了,忽略那條路徑)
內存是否充足的條件依然shuffleMemoryManager
來決定瞬女,也就是所有task shuffle 申請的Page內存總和不能大于下面的值:
ExecutorHeapMemeory * 0.2 * 0.8
上面的數(shù)字可通過下面兩個配置來更改:
spark.shuffle.memoryFraction=0.2
spark.shuffle.safetyFraction=0.8
UnsafeShuffleExternalSorter 負責申請內存,并且會生成該條記錄最后的邏輯地址努潘,也就前面提到的 Pointer拆魏。
接著Record 會繼續(xù)流轉到UnsafeShuffleInMemorySorter
中,這個對象維護了一個指針數(shù)組:
private long[] pointerArray;
數(shù)組的初始大小為 4096慈俯,后續(xù)如果不夠了渤刃,則按每次兩倍大小進行擴充。
假設100萬條記錄贴膘,那么該數(shù)組大約是8M 左右卖子,所以其實還是很小的。一旦spill后該UnsafeShuffleInMemorySorter
就會被賦為null,被回收掉刑峡。
我們回過頭來看spill,其實邏輯上也異常簡單了洋闽,UnsafeShuffleInMemorySorter
會返回一個迭代器玄柠,該迭代器粒度每個元素就是一個指針,然后到根據(jù)該指針可以拿到真實的record,然后寫入到磁盤诫舅,因為這些record 在一開始進入UnsafeShuffleExternalSorter
就已經(jīng)被序列化了羽利,所以在這里就純粹變成寫字節(jié)數(shù)組了。形成的結構依然和Sort Based Shuffle 一致刊懈,一個文件里不同的partiton的數(shù)據(jù)用fileSegment來表示这弧,對應的信息存在一個index文件里。
另外寫文件的時候也需要一個 buffer :
spark.shuffle.file.buffer = 32k
另外從內存里拿到數(shù)據(jù)放到DiskWriter,這中間還要有個中轉虚汛,是通過
final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE=1024 * 1024];
來完成的匾浪,都是內存,所以很快卷哩。
Task結束前蛋辈,我們要做一次mergeSpills
操作,然后形成一個shuffle 文件将谊。這里面其實也挺復雜的冷溶,
如果開啟了
`spark.shuffle.unsafe.fastMergeEnabled=true`
并且沒有開啟
`spark.shuffle.compress=true`
或者壓縮方式為:
LZFCompressionCodec
則可以非常高效的進行合并,叫做transferTo
。不過無論是什么合并尊浓,都不需要進行反序列化逞频。
Shuffle Read
Shuffle Read 完全復用HashShuffleReader
,具體參看 Sort-Based Shuffle。
總結
我個人感覺眠砾,Tungsten-sort 實現(xiàn)了內存的自主管理虏劲,管理方式模擬了操作系統(tǒng)的方式托酸,通過Page可以使得大量的record被順序存儲在內存褒颈,整個shuffle write 排序的過程只需要對指針進行運算(二進制排序),并且無需反序列化励堡,整個過程非常高效谷丸,對于減少GC,提高內存訪問效率,提高CPU使用效率確實帶來了明顯的提升应结。