spark 為什么要集成 gluten
- 隨著 spark 版本的迭代首装,spark 的 benchmark 的提升逐漸趨于平緩
- 隨著硬件技術(shù)的發(fā)展仙逻,網(wǎng)絡(luò)磁盤(pán)都有較大的提升但是 cpu 并沒(méi)有很大的突破涧尿,cpu 可能會(huì)成為瓶頸
- 在 java 中 cpu 的計(jì)算集中度不夠高(cpu 用來(lái)純計(jì)算的時(shí)間占比較少)姑廉,沒(méi)有充分利用 cpu 的并行計(jì)算能力
- java 直接操作更高級(jí)的可并行化處理數(shù)據(jù)的 cpu 指令集困難
cpu 數(shù)據(jù)并行處理 & 向量化
并行處理
- 舉例說(shuō)明:
- 假如要對(duì)兩個(gè)數(shù)組的相同下標(biāo)的元素相加
- 不管寄存器有多長(zhǎng)對(duì)于兩個(gè)整型數(shù)據(jù)相加他的長(zhǎng)度只會(huì)占用前面的部分 bit (32個(gè)bit) 浪費(fèi)了后面的bit 長(zhǎng)度所以可以進(jìn)行優(yōu)化
- 將多個(gè)同質(zhì)的指令計(jì)算需求壓到同一個(gè)寄存器里面用一組指令來(lái)完成
向量化計(jì)算的好處
- 感覺(jué)向量化的前提就是列計(jì)算偎蘸。需要把同質(zhì)化的計(jì)算放一塊
- 減少虛函數(shù)的調(diào)用(java 面向接口編程迷雪,在計(jì)算時(shí)需要找到真正的實(shí)現(xiàn)方法),使得 cpu 的時(shí)間能夠更加專(zhuān)注在 計(jì)算這一塊
- 比如 spark sql 都會(huì)通過(guò) codegen 的方式去將 sql 邏輯代碼化倦西,而 codegen 出來(lái)的代碼就是一個(gè)個(gè)不一樣的 class. 沒(méi)有繼承多態(tài)等扰柠。這可能就是為啥要 codegen 而不再抽象公共類(lèi)的原因吧
- 能夠用上 cpu SIMD 指令集等技術(shù)對(duì)數(shù)據(jù)進(jìn)行并行處理
- 因?yàn)檩斎胧橇袛?shù)據(jù)所以會(huì)比較集中疼约,cpu cache 的數(shù)據(jù)可能都是要計(jì)算的列的數(shù)據(jù)程剥,相比行的數(shù)據(jù),空間更節(jié)省舔腾,所以可以 cache 更多的列數(shù)據(jù)搂擦。cache 的數(shù)據(jù)可能就是下一步計(jì)算需要的瀑踢,所以能夠提供 cpu 的緩存命中率。
note: 由于對(duì)硬件了解不多普办,理解可能也不對(duì)
gluten 與 spark 結(jié)合的簡(jiǎn)單實(shí)現(xiàn)
- 將 Spark 物理計(jì)劃轉(zhuǎn)換為 Substrate plain徘钥,然后 Substrait plain 通過(guò)JNI調(diào)用傳遞給 native。
- Substrait 是一種可以跨平臺(tái)將一種 plan 轉(zhuǎn)化成另一種 plan 的通用轉(zhuǎn)化格式舆驶。(gluten 作為 spark 和 native 向量化執(zhí)行引擎的中間層沙廉,為了方便拓展不同的 native 向量化執(zhí)行引擎所以采用了 Substrait)
- 在 native 層,構(gòu)建 native 算子操作鏈然后在 native 引擎里面去執(zhí)行珊皿。
- native 層與 spark 數(shù)據(jù)交互以 spark ColumnarBatch 的形式進(jìn)行交互
- 交互的數(shù)據(jù)格式可能會(huì)涉及到 apache arrow
- arrow 是一種通用的數(shù)據(jù)共享的數(shù)據(jù)結(jié)構(gòu)从绘,它再內(nèi)存中的格式就是序列化后的格式意推,傳輸過(guò)程不需要序列化和反序列化
-
由于 native 引擎對(duì)數(shù)據(jù)格式远寸,udf,算子等的不支持導(dǎo)致 stage 沒(méi)辦法在 native 中執(zhí)行肆资。而一個(gè) spark sql 作業(yè)中會(huì)根據(jù)寬依賴劃分有不同的 stage, 所以stage 間的 shuffle write/shuffle read 會(huì)遇到如下情況灶芝,針對(duì)這些場(chǎng)景是怎么實(shí)現(xiàn)的呢监署?
47b2570df007adb30f97b3a3f95a6a6.png
spark 集成 gluten 的 shuffle 實(shí)現(xiàn)
看 gluten shuffleManager 之前先看基于 spark 框架 shuffleManager 的主要作用
spark shuffleManager
- org.apache.spark.shuffle.ShuffleManager 是為 shuffle systems 提供的一種插件試的接口
- 接口核心有 3 個(gè)方法
registerShuffle钠乏、getWriter春塌、getReader - 在構(gòu)建 ShuffleDependency 時(shí)會(huì)調(diào)用 shuffle manager 的注冊(cè)方法只壳,注冊(cè)返回一個(gè) ShuffleHandle 之后 ShuffleDependency 會(huì)持有 ShuffleHandle 的屬性
- 在 shuffler write 階段會(huì)調(diào)用 manager.getWriter 并且會(huì)把這個(gè) ShuffleHandle 句柄加進(jìn)去去創(chuàng)建 shuffler write
shuffler write 會(huì)返回一個(gè) MapStatus ,這個(gè)在 spark3 的 AQE 里面會(huì)用到去動(dòng)態(tài)優(yōu)化 stage 的執(zhí)行計(jì)劃 - 在 shuffler read 階段會(huì)調(diào)用 manager.getReader 并且會(huì)把這個(gè) ShuffleHandle 句柄加進(jìn)去去創(chuàng)建 shuffler read
spark shuffler writer
- 在 spark diriver 會(huì)對(duì) stage 轉(zhuǎn)成成一組 task (ShuffleMapTask 或者 ResultTask) 給到 excutor 去執(zhí)行锅必,當(dāng) ShuffleMapTask 在 excutor 端調(diào)用起來(lái)后會(huì)執(zhí)行 runTask 方法搞隐,runTask 方法在最后會(huì) 調(diào)用 shuffleDependency.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
- 其實(shí)這都只是把鏈路打通了一切都是 lazy 都是流式的。 這個(gè)抽象的 rdd 一直 next . 這里可能會(huì)有疑問(wèn) 為什么到了 excutor 端了還他媽的是 rdd, 不應(yīng)該是一個(gè)具體的數(shù)據(jù)集了嗎逢捺? 沒(méi)錯(cuò)他還傳了個(gè) partition癞季,所以只會(huì)去計(jì)算這個(gè) rdd 的某個(gè)分區(qū)然后狗日的一直 next 進(jìn)行 shuffle 輸出绷柒。
- 那這個(gè)迭代器是怎么來(lái)的呢,就是 rdd 的compute 方法構(gòu)造出來(lái)的恨憎, 構(gòu)造邏輯就是 shufflerReader.read() 經(jīng)過(guò)某種 tramsform(比如執(zhí)行 codegen 的代碼)
- 總之就是從 shuffle read 里面讀數(shù)據(jù)執(zhí)行計(jì)算邏輯就得到了這個(gè)計(jì)算鏈路憔恳,鏈路是 lazy 的 净蚤。直到 write 調(diào)用 next 方法觸發(fā)鏈路數(shù)據(jù)流的計(jì)算
gluten shuffleManager 實(shí)現(xiàn)
shuffleManager 是 spark 框架的而 native 的執(zhí)行還是遵循這套框架, shuffle write /shuffe read 還是要走 spark 的框架走的 JVM 所以先實(shí)現(xiàn) ShuffleManager 接口再看數(shù)據(jù)在 JVM 和 native 層的流轉(zhuǎn)
- 實(shí)現(xiàn) org.apache.spark.shuffle.ShuffleManager 接口 的 registerShuffle今瀑、getWriter、getReader 方法
registerShuffle:
- spark 構(gòu)建ShuffleDependency 時(shí) -> 調(diào)用 registerShuffle
- registerShuffle 方法實(shí)現(xiàn)會(huì)判斷 ShuffleDependency 是否為 gluten 的 ColumnarShuffleDependency 如果是構(gòu)建出一個(gè) ColumnarShuffleHandle 返回屿附,如果不是就走原生 shuffleManager 的邏輯挺份,去創(chuàng)建 spark 自身的 BypassMergeSortShuffleHandle 或者 SerializedShuffleHandle 或者 BaseShuffleHandle
- ColumnarShuffleDependency 什么時(shí)候創(chuàng)建與spark ShuffleDependency 有什么不一樣贮懈?
- ColumnarShuffleDependency 的創(chuàng)建大概邏輯如下
1. gluten-core: org.apache.gluten.extension.columnar.TransformExchange
1.1. 如果 gluten 開(kāi)關(guān)開(kāi)啟會(huì)調(diào)用 BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(plan, child)
把 SparkPlan(ShuffleExchangeExec朵你, BroadcastExchangeExec) 進(jìn)行轉(zhuǎn)化
2. backends-velox: org.apache.gluten.backendsapi.velox.SparkPlanExecApiImpl#genColumnarShuffleExchange
2.1 創(chuàng)建 ColumnarShuffleExchangeExec
2.3 ColumnarShuffleExchangeExec extends ShuffleExchangeLike with GlutenPlan
3. 什么時(shí)候創(chuàng)建的 ColumnarShuffleExchangeExec
gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#columnarShuffleDependency
gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#prepareShuffleDependency
backends-velox: org.apache.gluten.backendsapi.velox.SparkPlanExecApiImpl#genShuffleDependency
gluten-data: org.apache.spark.sql.execution.utils.ExecUtil#genShuffleDependency
創(chuàng)建出了 ColumnarShuffleDependency
總結(jié)就是 sparkPlan 會(huì)轉(zhuǎn)化成 glutenPlan 時(shí)會(huì)創(chuàng)建出 ColumnarShuffleDependency
ColumnarShuffleDependency 與 spark ShuffleDependency 有什么不一樣抡医?
ColumnarShuffleDependency 針對(duì) shuffle 數(shù)據(jù)會(huì)有特別的序列化器,只用在讀取 shuffle 數(shù)據(jù)序列化器對(duì) shuflle 數(shù)據(jù)進(jìn)行反序列化大脉。 不需要序列化因?yàn)閿?shù)據(jù)是從 native 里面吐出來(lái)的箱靴,內(nèi)部應(yīng)該還是字節(jié)數(shù)組的形式
關(guān)于序列化器的創(chuàng)建和解釋
在構(gòu)建 shufflerDependcy 時(shí)會(huì)創(chuàng)建 反序列化器
1. gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#serializer
1.1 val serializer: Serializer = BackendsApiManager.getSparkPlanExecApiInstance
.createColumnarBatchSerializer(schema, metrics)
1.2 默認(rèn)場(chǎng)景
org.apache.gluten.vectorized.ColumnarBatchSerializer
rss 是 Celeborn 場(chǎng)景:
org.apache.spark.shuffle.CelebornColumnarBatchSerializer
2. org.apache.gluten.vectorized.ColumnarBatchSerializer 實(shí)現(xiàn)
2.1. 實(shí)際走的 ColumnarBatchSerializerInstance
2.2. 構(gòu)造 ColumnarBatchSerializerInstance 時(shí)會(huì)初始化一個(gè) Long 類(lèi)型的 shuffleReaderHandle
2.3. 這個(gè)東西 shuffleReaderHandle 有點(diǎn)抽象衡怀,他是 JVM 層和 native 數(shù)據(jù)轉(zhuǎn)化的重要媒介,native 吐給 JVM 層一般都是一個(gè) long 的數(shù)字(怎么理解呢够委? 內(nèi)存地址 or 偏移量怖现?)屈嗤,如果需要 JVM 層可以通過(guò)這個(gè)數(shù)字調(diào)用 native 方法返回一批數(shù)據(jù)。
2.4 反序列方法 deserializeStream(in: InputStream) 輸入流是 java 的輸入流
也就是 shuffle read 流
2.5 反序列化具體實(shí)現(xiàn):
2.5.1 private val byteIn: JniByteInputStream = JniByteInputStreams.create(in)
把 java 的數(shù)據(jù)流直接寫(xiě)入 native 的堆外內(nèi)存
2.5.2. 然后在 JVM 層提供一個(gè)對(duì) native 內(nèi)存迭代訪問(wèn)的包裝器
private val wrappedOut: GeneralOutIterator = new ColumnarBatchOutIterator(
Runtimes.contextInstance(),
ShuffleReaderJniWrapper
.create()
.readStream(shuffleReaderHandle, byteIn),
nmm)
2.5.3. 迭代器的 next 方法
public ColumnarBatch nextInternal() throws IOException {
// JVM 的next 實(shí)際是調(diào)用 native 的 next native 的next 返回一個(gè) long
long batchHandle = nativeNext(iterHandle);
if (batchHandle == -1L) {
return null; // stream ended
}
// long 會(huì)映射到一塊內(nèi)存直接包裝成 ColumnarBatches铁追,
// 這過(guò)程不涉及到數(shù)據(jù)的拷貝讀取琅束,數(shù)據(jù)還是在堆外 和 native 層還是一塊內(nèi)存
// 所以針對(duì) JVM 讀取 native 數(shù)據(jù)是輕量的
return ColumnarBatches.create(runtime, batchHandle);
}
2.6. 總之如果下游 stage 能夠放入 native 中執(zhí)行算谈,那么他就會(huì)通過(guò) shuffle reader 把 JVM 的輸入流通過(guò) JNI 的方式寫(xiě)入native 的堆外內(nèi)存然眼,然后在 JVM 層提供一個(gè)對(duì) native 數(shù)據(jù)的引用的輕量的迭代器進(jìn)行包裝
2.7. 如果 上游 stage 數(shù)據(jù)是 在 JVM 中執(zhí)行計(jì)算的罪治,那么還會(huì)有一個(gè) 行轉(zhuǎn)列的過(guò)程礁蔗,把行的數(shù)據(jù)轉(zhuǎn)成 ColumnBatch 再喂給 native
getShufflerWriter:
- 會(huì)根據(jù) shuffle Hander 去判斷是否為 glutlen 的 ColumnarShuffleHandle, 如果是則創(chuàng)建 gluten 的 writer, 否則走原生的邏輯 創(chuàng)建 BypassMergeSortShuffleWriter 或者 UnsafeShuffleWriter 或者 SortShuffleWriter
- 與原生 writer 的不一樣的地方
2.1. 最終用的是 org.apache.spark.shuffle.ColumnarShuffleWriter
2.2. shuffler write 的東西是 rdd compute 出來(lái)的數(shù)據(jù)我們看看 在gluten 里面有哪些rdd
org.apache.gluten.execution.BroadcastBuildSideRDD
org.apache.gluten.execution.VeloxBroadcastBuildSideRDD
org.apache.spark.sql.execution.VeloxColumnarWriteFilesRDD
org.apache.spark.sql.execution.ShuffledColumnarBatchRDD
2.3. 這些 rdd 返回的類(lèi)型都是 ColumnBatch, 也就是 c++ 吐出來(lái)的就是 ColumnBatch
2.4. ColumnBatch 里面裝的是 ColumnVector 集合 的第 0 個(gè)ColumnVector 是 IndicatorVector 之后的 ColumnVector 都是 PlaceholderVector
IndicatorVector 會(huì)有這個(gè) ColumnBatch 的 long 類(lèi)型的 Handle,還有一個(gè) runtime (JNI相關(guān)的) 可以根據(jù) 這個(gè) handle 和 runtime 去獲取 真實(shí)的數(shù)據(jù)
這種需要通過(guò) JNI 的方式把 native 數(shù)據(jù) load 到 JVM 里面的場(chǎng)景應(yīng)該是上游是在 native 中進(jìn)行計(jì)算,而下游必須在 JVM 里面進(jìn)行所以需要一次列轉(zhuǎn)行徒坡。
2.5. 最終數(shù)據(jù)在 native 層進(jìn)行數(shù)據(jù)溢寫(xiě)落盤(pán)之類(lèi)的喇完,對(duì)于 rss 場(chǎng)景他需要
在創(chuàng)建 shufflerWriter 時(shí)實(shí)例化一個(gè) pusher 傳入 JNI 里面剥啤,然后 JNI 會(huì)把 調(diào)用 pusher 實(shí)例把數(shù)據(jù) 突出來(lái),寫(xiě)入 rss. 此時(shí)數(shù)據(jù)是二進(jìn)制的刻诊。
getShufflerReader:
- 同 getShufflerWriter 一樣也會(huì)去判斷是否為 glutlen 的 ColumnarShuffleHandle则涯,如果是則創(chuàng)建 spark 的 BlockStoreShuffleReader冲簿,如果不是 ColumnarShuffleHandle 他也是會(huì)創(chuàng)建 spark 的 BlockStoreShuffleReader
- 不同點(diǎn)是 gluten 的 shuffler Reader 會(huì)傳入一個(gè) SerializerManager峦剔。 這與 spark 原生的不一樣的地方
- 他不會(huì)對(duì)輸入流加密和壓縮。
- 會(huì)有特有的反序列實(shí)現(xiàn) 這個(gè)在registerShuffle 環(huán)節(jié)已經(jīng)介紹了
針對(duì)最開(kāi)始的 3 個(gè)場(chǎng)景的總結(jié)
純屬自己的理解不一定對(duì)
- 場(chǎng)景1. shuffle write 是 native shuffle read 是 jvm
中間會(huì)經(jīng)過(guò)一次列轉(zhuǎn)行羊异。 native 該怎寫(xiě)還是怎么寫(xiě) - 場(chǎng)景2. shuffle write 是 jvm shuffle read 是 native
中間會(huì)經(jīng)過(guò)一次行轉(zhuǎn)列事秀, native 改怎么讀就怎讀 - 場(chǎng)景3. shuffle write 是 native shuffle read 是 native
按照 native 的方式讀寫(xiě)