1. 彈性分布式數(shù)據(jù)集
def createInstance(factDF: DataFrame, startDate: String, endDate: String): DataFrame = {
val instanceDF = factDF
.filter(col("eventDate") > lit(startDate) && col("eventDate") <= lit(endDate))
.groupBy("dim1", "dim2", "dim3", "event_date")
.agg("sum(value) as sum_value")
instanceDF
}
pairDF.collect.foreach{
case (startDate: String, endDate: String) =>
val instance = createInstance(factDF, startDate, endDate)
val outPath = s"${rootPath}/endDate=${endDate}/startDate=${startDate}"
instance.write.parquet(outPath)
}
單機(jī)思維娩贷,factDF是一個(gè)大數(shù)據(jù)集,每次foreach都會(huì)調(diào)用createInstance導(dǎo)致這個(gè)數(shù)據(jù)集被多次掃描
RDD
RDD 的 4 大屬性可以劃分為兩類谓苟,橫向?qū)傩院涂v向?qū)傩浴F渲行瑱M向?qū)傩藻^定數(shù)據(jù)分片實(shí)體涝焙,并規(guī)定了數(shù)據(jù)分片在分布式集群中如何分布;縱向?qū)傩杂糜谠诳v深方向構(gòu)建 DAG孕暇,通過(guò)提供重構(gòu) RDD 的容錯(cuò)能力保障內(nèi)存計(jì)算的穩(wěn)定性仑撞。
preferredLocations:移動(dòng)計(jì)算不移動(dòng)數(shù)據(jù),數(shù)據(jù)在哪就在哪計(jì)算妖滔,減少IO
2. 內(nèi)存計(jì)算
為什么Spark是內(nèi)存計(jì)算的隧哮,要回答這個(gè)問(wèn)題就要回答Spark中Stage的概念的意義是什么了:在同一 Stage 內(nèi)部,所有算子融合為一個(gè)函數(shù)(捏合)铛楣,Stage 的輸出結(jié)果由這個(gè)函數(shù)一次性作用在輸入數(shù)據(jù)集而產(chǎn)生近迁。這也說(shuō)明Shuffle(上下游的分區(qū)器是否是一致的)是多么的影響性能啊
所謂內(nèi)存計(jì)算,不僅僅是指數(shù)據(jù)可以緩存在內(nèi)存中簸州,更重要的是讓我們明白了,通過(guò)計(jì)算的融合( Stage內(nèi)的流水線式計(jì)算模式)來(lái)大幅提升數(shù)據(jù)在內(nèi)存中的轉(zhuǎn)換效率歧譬,進(jìn)而從整體上提升應(yīng)用的執(zhí)行性能 深入淺出 Spark:內(nèi)存計(jì)算的由來(lái)
3. 調(diào)度系統(tǒng)
調(diào)度系統(tǒng)中的核心組件
- DAGScheduler
一是把用戶 DAG 拆分為 Stages,二是在 Stage 內(nèi)創(chuàng)建計(jì)算任務(wù) Tasks瑰步,這些任務(wù)囊括了用戶通過(guò)組合不同算子實(shí)現(xiàn)的數(shù)據(jù)轉(zhuǎn)換邏輯矢洲。然后,執(zhí)行器 Executors 接收到 Tasks缩焦,會(huì)將其中封裝的計(jì)算函數(shù)應(yīng)用于分布式數(shù)據(jù)分片读虏,去執(zhí)行分布式的計(jì)算過(guò)程。 - SchedulerBackend
是對(duì)于資源調(diào)度器的封裝與抽象袁滥,為了支持多樣的資源調(diào)度模式如 Standalone盖桥、YARN 和 Mesos,SchedulerBackend 提供了對(duì)應(yīng)的實(shí)現(xiàn)類题翻。在運(yùn)行時(shí)揩徊,Spark 根據(jù)用戶提供的 MasterURL,來(lái)決定實(shí)例化哪種實(shí)現(xiàn)類的對(duì)象。對(duì)于集群中可用的計(jì)算資源塑荒,SchedulerBackend 會(huì)用一個(gè)叫做 ExecutorDataMap 的數(shù)據(jù)結(jié)構(gòu)熄赡,來(lái)記錄每一個(gè)計(jì)算節(jié)點(diǎn)中 Executors 的資源狀態(tài)。ExecutorDataMap 是一種 HashMap齿税,它的 Key 是標(biāo)記 Executor 的字符串彼硫,Value 是一種叫做 ExecutorData 的數(shù)據(jù)結(jié)構(gòu),ExecutorData 用于封裝 Executor 的資源狀態(tài)凌箕,如 RPC 地址拧篮、主機(jī)地址、可用 CPU 核數(shù)和滿配 CPU 核數(shù)等等陌知,它相當(dāng)于是對(duì) Executor 做的“資源畫(huà)像”他托。 - TaskScheduler
左邊有需求,右邊有供給仆葡,如果把 Spark 調(diào)度系統(tǒng)看作是一個(gè)交易市場(chǎng)的話赏参,那么中間還需要有個(gè)中介來(lái)幫它們對(duì)接意愿、撮合交易沿盅,從而最大限度地提升資源配置的效率把篓。在 Spark 調(diào)度系統(tǒng)中,這個(gè)中介就是 TaskScheduler腰涧。TaskScheduler 的職責(zé)是韧掩,基于既定的規(guī)則與策略達(dá)成供需雙方的匹配與撮合。TaskScheduler 的核心是任務(wù)調(diào)度的規(guī)則和策略窖铡,TaskScheduler 的調(diào)度策略分為兩個(gè)層次疗锐,一個(gè)是不同 Stages 之間的調(diào)度優(yōu)先級(jí),一個(gè)是 Stages 內(nèi)不同任務(wù)之間的調(diào)度優(yōu)先級(jí)费彼。
對(duì)于同一個(gè) Stages 內(nèi)部不同任務(wù)之間的調(diào)度優(yōu)先級(jí)滑臊,Stages 內(nèi)部的任務(wù)調(diào)度相對(duì)來(lái)說(shuō)簡(jiǎn)單得多。當(dāng) TaskScheduler 接收到來(lái)自 SchedulerBackend 的 WorkerOffer 后箍铲,TaskScheduler 會(huì)優(yōu)先挑選那些滿足本地性級(jí)別要求的任務(wù)進(jìn)行分發(fā)雇卷。
Spark 調(diào)度系統(tǒng)的原則是盡可能地讓數(shù)據(jù)呆在原地、保持不動(dòng)颠猴,同時(shí)盡可能地把承載計(jì)算任務(wù)的代碼分發(fā)到離數(shù)據(jù)最近的地方关划,從而最大限度地降低分布式系統(tǒng)中的網(wǎng)絡(luò)開(kāi)銷
spark的資源調(diào)度和任務(wù)調(diào)度是分開(kāi)的,也就是說(shuō)翘瓮,不論你用standalone贮折、yarn還是mesos,spark在申請(qǐng)硬件資源的時(shí)候春畔,以cpu脱货、memory量化申請(qǐng)executors的過(guò)程岛都,是先于任務(wù)調(diào)度的。用你的話說(shuō)振峻,executors已經(jīng)提前申請(qǐng)好了臼疫,申請(qǐng)executors的時(shí)候,只看cpu和memory是否滿足要求扣孟,不會(huì)考慮locality這些與任務(wù)有關(guān)的細(xì)節(jié)
4. 存儲(chǔ)系統(tǒng)
Spark 存儲(chǔ)系統(tǒng)是為誰(shuí)服務(wù)
- RDD 緩存
是將 RDD 以緩存的形式物化到內(nèi)存或磁盤的過(guò)程 - Shuffle 中間文件
Shuffle 中間文件實(shí)際上就是 Shuffle Map 階段的輸出結(jié)果烫堤,這些結(jié)果會(huì)以文件的形式暫存于本地磁盤。在集群范圍內(nèi)凤价,Reducer 想要拉取屬于自己的那部分中間數(shù)據(jù)鸽斟,就必須要知道這些數(shù)據(jù)都存儲(chǔ)在哪些節(jié)點(diǎn),以及什么位置利诺。而這些關(guān)鍵的元信息富蓄,正是由 Spark 存儲(chǔ)系統(tǒng)保存并維護(hù)的。 - 廣播變量
利用存儲(chǔ)系統(tǒng)慢逾,廣播變量可以在 Executors 進(jìn)程范疇內(nèi)保存全量數(shù)據(jù)立倍。這樣一來(lái),對(duì)于同一 Executors 內(nèi)的所有計(jì)算任務(wù)侣滩,應(yīng)用就能夠以 Process local 的本地性級(jí)別口注,來(lái)共享廣播變量中攜帶的全量數(shù)據(jù)
存儲(chǔ)系統(tǒng)的基本組件
與調(diào)度系統(tǒng)類似,Spark 存儲(chǔ)系統(tǒng)是一個(gè)囊括了眾多組件的復(fù)合系統(tǒng)君珠,如 BlockManager寝志、BlockManagerMaster、MemoryStore策添、DiskStore 和 DiskBlockManager 等等
- BlockManager
在 Executors 端負(fù)責(zé)統(tǒng)一管理和協(xié)調(diào)數(shù)據(jù)的本地存取與跨節(jié)點(diǎn)傳輸材部。
對(duì)外,BlockManager 與 Driver 端的 BlockManagerMaster 通信唯竹,不僅定期向 BlockManagerMaster 匯報(bào)本地?cái)?shù)據(jù)元信息败富,還會(huì)不定時(shí)按需拉取全局?jǐn)?shù)據(jù)存儲(chǔ)狀態(tài)
對(duì)內(nèi),BlockManager 通過(guò)組合存儲(chǔ)系統(tǒng)內(nèi)部組件的功能來(lái)實(shí)現(xiàn)數(shù)據(jù)的存與取摩窃、收與發(fā) - MemoryStore
MemoryStore 用來(lái)管理數(shù)據(jù)在內(nèi)存中的存取
MemoryStore 支持對(duì)象值和字節(jié)數(shù)組,統(tǒng)一采用 MemoryEntry 數(shù)據(jù)抽象對(duì)它們進(jìn)行封裝芬骄。對(duì)象值和字節(jié)數(shù)組二者之間存在著一種博弈關(guān)系猾愿,所謂的“以空間換時(shí)間”和“以時(shí)間換空間”,兩者的取舍還要看具體的應(yīng)用場(chǎng)景账阻。 - DiskStore
DiskStore 用來(lái)管理數(shù)據(jù)在磁盤中的存取蒂秘。
利用 DiskBlockManager 維護(hù)的數(shù)據(jù)塊與磁盤文件的對(duì)應(yīng)關(guān)系,來(lái)完成字節(jié)序列與磁盤文件之間的轉(zhuǎn)換淘太。
5. 內(nèi)存管理基礎(chǔ)
內(nèi)存的管理模式
在管理方式上姻僧,Spark 會(huì)區(qū)分堆內(nèi)內(nèi)存(On-heap Memory)和堆外內(nèi)存(Off-heap Memory)规丽。這里的“堆”指的是 JVM Heap,因此堆內(nèi)內(nèi)存實(shí)際上就是 Executor JVM 的堆內(nèi)存撇贺;堆外內(nèi)存指的是通過(guò) Java Unsafe API赌莺,像 C++ 那樣直接從操作系統(tǒng)中申請(qǐng)和釋放內(nèi)存空間
其中,堆內(nèi)內(nèi)存的申請(qǐng)與釋放統(tǒng)一由 JVM 代勞松嘶。比如說(shuō)艘狭,Spark 需要內(nèi)存來(lái)實(shí)例化對(duì)象,JVM 負(fù)責(zé)從堆內(nèi)分配空間并創(chuàng)建對(duì)象翠订,然后把對(duì)象的引用返回巢音,最后由 Spark 保存引用,同時(shí)記錄內(nèi)存消耗尽超。反過(guò)來(lái)也是一樣官撼,Spark 申請(qǐng)刪除對(duì)象會(huì)同時(shí)記錄可用內(nèi)存,JVM 負(fù)責(zé)把這樣的對(duì)象標(biāo)記為“待刪除”似谁,然后再通過(guò)垃圾回收(Garbage Collection傲绣,GC)機(jī)制將對(duì)象清除并真正釋放內(nèi)存。
堆外內(nèi)存則不同棘脐,Spark 通過(guò)調(diào)用 Unsafe 的 allocateMemory 和 freeMemory 方法直接在操作系統(tǒng)內(nèi)存中申請(qǐng)斜筐、釋放內(nèi)存空間這樣的內(nèi)存管理方式自然不再需要垃圾回收機(jī)制,也就免去了它帶來(lái)的頻繁掃描和回收引入的性能開(kāi)銷蛀缝。更重要的是顷链,空間的申請(qǐng)與釋放可以精確計(jì)算,因此 Spark 對(duì)堆外可用內(nèi)存的估算會(huì)更精確屈梁,對(duì)內(nèi)存的利用率也更有把握嗤练。
內(nèi)存區(qū)域的劃分
- 堆外內(nèi)存
Spark 把堆外內(nèi)存劃分為兩塊區(qū)域:一塊用于執(zhí)行分布式任務(wù),如 Shuffle在讶、Sort 和 Aggregate 等操作煞抬,這部分內(nèi)存叫做 Execution Memory;一塊用于緩存 RDD 和廣播變量等數(shù)據(jù)构哺,它被稱為 Storage Memory革答。 - 堆內(nèi)內(nèi)存
堆內(nèi)內(nèi)存的劃分方式和堆外差不多,Spark 也會(huì)劃分出用于執(zhí)行和緩存的兩份內(nèi)存空間曙强。不僅如此残拐,Spark 在堆內(nèi)還會(huì)劃分出一片叫做 User Memory 的內(nèi)存空間,它用于存儲(chǔ)開(kāi)發(fā)者自定義數(shù)據(jù)結(jié)構(gòu)碟嘴。
除此之外溪食,Spark 在堆內(nèi)還會(huì)預(yù)留出一小部分內(nèi)存空間,叫做 Reserved Memory娜扇,它被用來(lái)存儲(chǔ)各種 Spark 內(nèi)部對(duì)象错沃,例如存儲(chǔ)系統(tǒng)中的 BlockManager栅组、DiskBlockManager 等等。
執(zhí)行與緩存內(nèi)存(枢析?玉掸??)
在 1.6 版本之后登疗,Spark 推出了統(tǒng)一內(nèi)存管理模式排截。統(tǒng)一內(nèi)存管理指的是 Execution Memory 和 Storage Memory 之間可以相互轉(zhuǎn)化
- 如果對(duì)方的內(nèi)存空間有空閑,雙方就都可以搶占辐益;
- 對(duì)于 RDD 緩存任務(wù)搶占的執(zhí)行內(nèi)存断傲,當(dāng)執(zhí)行任務(wù)有內(nèi)存需要時(shí),RDD 緩存任務(wù)必須立即歸還搶占的內(nèi)存智政,涉及的 RDD 緩存數(shù)據(jù)要么落盤认罩、要么清除;
- 對(duì)于分布式計(jì)算任務(wù)搶占的 Storage Memory 內(nèi)存空間续捂,即便 RDD 緩存任務(wù)有收回內(nèi)存的需要垦垂,也要等到任務(wù)執(zhí)行完畢才能釋放。
6. 補(bǔ)充
- 提交spark任務(wù)的模式:standalone模式牙瓢,yarn client和yarn cluster模式的區(qū)別劫拗。主要是分清楚client,master矾克,worker页慷,executor,Driver(在哪里胁附,有什么用的問(wèn)題)
- 問(wèn)題:有網(wǎng)絡(luò)傳輸是否就是發(fā)生了shuffle酒繁,Shuffle的含義就是洗牌,將數(shù)據(jù)打散控妻,父RDD一個(gè)分區(qū)中的數(shù)據(jù)如果給了子RDD的多個(gè)分區(qū)(只要存在這種可能)州袒,就是shuffle。Shuffle會(huì)有網(wǎng)絡(luò)傳輸數(shù)據(jù)弓候,但是有網(wǎng)絡(luò)傳輸郎哭,并不意味著就是shuffle。菇存,看一下具體的示例:shuffle依賴
- shuffle機(jī)制和原理彰居,有哪些shuffle類型,有什么不同的特點(diǎn)或者優(yōu)勢(shì)
- HashShuffle和Sorted-Based Shuffle的比較撰筷,為什么Sorted-Based Shuffle比HashShuffle要好,從哪些層面考慮呢
- 回答為什么說(shuō)Spark是內(nèi)存式計(jì)算畦徘,回答DAG的問(wèn)題(血緣)血統(tǒng)(DAG)
- 一個(gè)案例:如果對(duì)一個(gè)dataframe Read以后做了一堆不會(huì)觸發(fā)shuffle 的操作毕籽,最后又調(diào)用了一下coalesce(1)抬闯,然后write ,那是不是就意味著從讀數(shù)據(jù)開(kāi)始的所有操作都會(huì)在一個(gè)executor上完成关筒?
shuffle = false溶握,就像你說(shuō)的,所有操作蒸播,從一開(kāi)始睡榆,并行度都是1,都在一個(gè)executor計(jì)算袍榆,顯然胀屿,這個(gè)時(shí)候,整個(gè)作業(yè)非常慢包雀,奇慢無(wú)比
shuffle = true宿崭,這個(gè)時(shí)候,coalesce就會(huì)引入shuffle才写,切割stage葡兑。coalesce之前,用源數(shù)據(jù)DataFrame的并行度赞草,這個(gè)時(shí)候是多個(gè)Executors真正的并行計(jì)算讹堤;coalesce之后,也就是shuffle之后厨疙,并行度下降為1洲守,所有父RDD的分區(qū),全部shuffle到一個(gè)executor轰异,交給一個(gè)task去計(jì)算岖沛。 - 任務(wù)調(diào)度的時(shí)候不考慮可用內(nèi)存大小嗎?Spark在做任務(wù)調(diào)度之前搭独,SchedulerBackend封裝的調(diào)度器婴削,比如Yarn、Mesos牙肝、Standalone唉俗,實(shí)際上已經(jīng)完成了資源調(diào)度,換句話說(shuō)配椭,整個(gè)集群有多少個(gè)containers/executors虫溜,已經(jīng)是一件確定的事情了。為什么ExecutorData不存儲(chǔ)于內(nèi)存相關(guān)的信息股缸。答案是:不需要衡楞。一來(lái),TaskScheduler要達(dá)到目的敦姻,它只需知道Executors是否有空閑CPU瘾境、有幾個(gè)空閑CPU就可以了歧杏,有這些信息就足以讓他決定是否把tasks調(diào)度到目標(biāo)Executors上去。二來(lái)迷守,每個(gè)Executors的內(nèi)存總大小犬绒,在Spark集群?jiǎn)?dòng)的時(shí)候就確定了,因此兑凿,ExecutorData自然是沒(méi)必要記錄像Total Memory這樣的冗余信息凯力。Spark對(duì)于內(nèi)存的預(yù)估不準(zhǔn),再者礼华,每個(gè)Executors的可用內(nèi)存都會(huì)隨著GC的執(zhí)行而動(dòng)態(tài)變化咐鹤,因此,ExecutorData記錄的Free Memory卓嫂,永遠(yuǎn)都是過(guò)時(shí)的信息慷暂,TaskScheduler拿到這樣的信息,也沒(méi)啥用晨雳。一者是不準(zhǔn)行瑞,二來(lái)確實(shí)沒(méi)用,因?yàn)門askScheduler拿不到數(shù)據(jù)分片大小這樣的信息餐禁,TaskScheduler在Driver端血久,而數(shù)據(jù)分片是在目標(biāo)Executors,所以TaskScheduler拿到Free Memory也沒(méi)啥用帮非,因?yàn)樗膊荒芘袛嗾f(shuō):task要處理的數(shù)據(jù)分片氧吐,是不是超過(guò)了目標(biāo)Executors的可用內(nèi)存。
- 資源調(diào)度和任務(wù)調(diào)度是分開(kāi)的末盔。資源調(diào)度主要看哪些節(jié)點(diǎn)可以啟動(dòng)executors筑舅,是否能滿足executors所需的cpu數(shù)量要求,這個(gè)時(shí)候陨舱,不會(huì)考慮任務(wù)翠拣、數(shù)據(jù)本地性這些因素。資源調(diào)度完成之后游盲,在任務(wù)調(diào)度階段误墓,spark負(fù)責(zé)計(jì)算每個(gè)任務(wù)的本地性,效果就是task明確知道自己應(yīng)該調(diào)度到哪個(gè)節(jié)點(diǎn)益缎,甚至是哪個(gè)executors谜慌。最后scheduler Backend會(huì)把task代碼,分發(fā)到目標(biāo)節(jié)點(diǎn)的目標(biāo)executors莺奔,完成任務(wù)調(diào)度欣范,實(shí)現(xiàn)數(shù)據(jù)不動(dòng)代碼動(dòng)。
- DAGScheduler 在創(chuàng)建 Tasks 的過(guò)程中,是如何設(shè)置每一個(gè)任務(wù)的本地性級(jí)別熙卡?DAGScheduler會(huì)嘗試獲取RDD的每個(gè)Partition的偏好位置信息杖刷,a.如果RDD被緩存,通過(guò)緩存的位置信息獲取每個(gè)分區(qū)的位置信息驳癌;b.如果RDD有preferredLocations屬性,通過(guò)preferredLocations獲取每個(gè)分區(qū)的位置信息役听;c. 遍歷RDD的所有是NarrowDependency的父RDD颓鲜,找到第一個(gè)滿足a,b條件的位置信息
- 關(guān)于等待時(shí)間和執(zhí)行時(shí)間的平衡?在調(diào)度了最契合locality的tasks后還有空閑executor典予。下一批task本來(lái)是有資源可用的甜滨,但最適合執(zhí)行task的executor已被占用,此時(shí)會(huì)評(píng)估下一批tasks等待時(shí)間和在空閑executor執(zhí)行數(shù)據(jù)傳輸時(shí)間瘤袖,如果等待時(shí)間大于數(shù)據(jù)傳輸則直接調(diào)度到空閑executor衣摩,否則繼續(xù)等待。把wait參數(shù)設(shè)置為0捂敌,則可以不進(jìn)行等待艾扮,有資源時(shí)直接調(diào)度執(zhí)行≌纪瘢看你具體場(chǎng)景泡嘴。
- 能在Driver段處理的就在Driver處理,比如作者提供的初始化字典的例子逆济,否則每個(gè)executor都要初始化一邊(Driver端把包含字典對(duì)象發(fā)給的executor)
- 透過(guò) RDD 緩存看 MemoryStore
- 透過(guò) Shuffle 看 DiskStore
- 為什么用到了LinkedHashMap存儲(chǔ)(Block ID, MemoryEntry)酌予?當(dāng)storage memory不足,spark需要?jiǎng)h除rdd cache的時(shí)候奖慌,遵循的是lru抛虫,那么問(wèn)題來(lái)了,它咋實(shí)現(xiàn)的lru简僧,答案就是它充分利用LinkedHashMap特性:訪問(wèn)有序
- spark 做shuffle的時(shí)候建椰,shuffle write 要寫(xiě)入磁盤,是否可以直接通過(guò)內(nèi)存?zhèn)鬏敚?/li>
- 結(jié)合 RDD 數(shù)據(jù)存儲(chǔ)到 MemoryStore 的過(guò)程涎劈,推演出通過(guò) MemoryStore 通過(guò) getValues/getBytes 方法去訪問(wèn) RDD 緩存內(nèi)容的過(guò)程嗎广凸?
- 考 RDD 緩存存儲(chǔ)的過(guò)程,推演出廣播變量存入 MemoryStore 的流程嗎蛛枚?
- 堆外內(nèi)存存在的意義是什么谅海,有什么場(chǎng)景是一定需要堆外內(nèi)存么?Spark不同任務(wù)在堆內(nèi)堆外內(nèi)存的使用選擇上的邏輯蹦浦?
- 分別計(jì)算不同內(nèi)存區(qū)域(Reserved扭吁、User、Execution、Storage)的具體大小