讀書計(jì)劃: spark源碼分析和學(xué)習(xí)
參考書記:《spark大數(shù)據(jù)處理技術(shù)》《 深入理解spark:核心思想和源碼分析》
Matei Zaharia寫的《An Architecture for Fast and General Data Processing on Large Clusters》
學(xué)習(xí)周期:1個(gè)月(2017.4.24—2017.5.24)
第一步:閱讀Matei Zaharia的論文命迈,回顧了解spark的設(shè)計(jì)辛辨。
第二部:依據(jù)以上兩本書分一下8部分來詳細(xì)了解spark的使用 設(shè)計(jì) 實(shí)現(xiàn)
- RDD
- Spark運(yùn)行模式及其原理
- spark調(diào)度管理原理
- spark存儲(chǔ)管理
- spark sql
- spark streaming
- graphx
論文
RDD概述
RDD 設(shè)計(jì)時(shí)的最大挑戰(zhàn)在于定義一個(gè)能提供高效容錯(cuò)能力的編程接口《渎啵現(xiàn)有的基于集群的內(nèi)存存儲(chǔ)抽象够挂,比如分布式共享內(nèi)存[79],鍵-值存儲(chǔ)[81],數(shù)據(jù)庫,以及 Piccolo[86],提供了一個(gè)對(duì)內(nèi)部狀態(tài)基于細(xì)粒度更新的接口(例如,表格里面的單元).在這樣的設(shè)計(jì)之下,提供容錯(cuò)性的方法就要么是在主機(jī)之間復(fù)制數(shù)據(jù),要么對(duì)各主機(jī)的更新情況做日志記錄。這兩種方法對(duì)于數(shù)據(jù)密集型的任務(wù)來說代價(jià)很高衔沼,因?yàn)樗鼈冃枰趲掃h(yuǎn)低于內(nèi)存的集群網(wǎng)絡(luò)間拷貝大量的數(shù)據(jù),同時(shí)還將產(chǎn)生大量的存儲(chǔ)開銷昔瞧。
與上述系統(tǒng)不同的是指蚁,RDD提供一種基于粗粒度變換(如, map, filter, join)的接口自晰,該接口會(huì)將相同的操作應(yīng)用到多個(gè)數(shù)據(jù)集上凝化。這使得他們可以通過記錄用來創(chuàng)建數(shù)據(jù)集的變換(lineage),而不需存儲(chǔ)真正的數(shù)據(jù)酬荞,進(jìn)而達(dá)到高效的容錯(cuò)性搓劫。1 當(dāng)一個(gè)RDD的某個(gè)分區(qū)丟失的時(shí)候,RDD記錄有足夠的信息記錄其如何通過其他的RDD進(jìn)行計(jì)算混巧,且只需重新計(jì)算該分區(qū)枪向。因此,丟失的數(shù)據(jù)可以被很快的恢復(fù)咧党,而不需要昂貴的復(fù)制代價(jià)秘蛔。
從形式上看,RDD是一個(gè)分區(qū)的只讀記錄的集合。RDD只能通過在(1)穩(wěn)定的存儲(chǔ)器或(2)其他RDD的數(shù)據(jù)上的確定性操作來創(chuàng)建缠犀。我們把這些操作稱作變換以區(qū)別其他類型的操作数苫。例如 map, filter, 和 join聪舒。2
RDD 在任何時(shí)候都不需要被"物化"(進(jìn)行實(shí)際的變換并最終寫入穩(wěn)定的存儲(chǔ)器上)辨液。實(shí)際上乌妒,一個(gè) RDD 有足夠的信息描述著其如何從其他穩(wěn)定的存儲(chǔ)器上的數(shù)據(jù)生成肖卧。它有一個(gè)強(qiáng)大的特性:從本質(zhì)上說,若 RDD 失效且不能重建什乙,程序?qū)⒉荒芤迷?RDD被辑。
最后燎悍,用戶可以控制 RDD 的其他兩個(gè)方面:持久化和分區(qū)。用戶可以選擇重用哪個(gè) RDD盼理,并為其制定存儲(chǔ)策略(比如谈山, 內(nèi)存存儲(chǔ))。也可以讓 RDD 中的數(shù)據(jù)根據(jù)記錄的 key 分布到集群的多個(gè)機(jī)器宏怔。這對(duì)位置優(yōu)化來說是有用的奏路,比如可用來保證兩個(gè)要Jion的數(shù)據(jù)集都使用了相同的哈希分區(qū)方式。
Spark直到RDD第一次調(diào)用一個(gè)動(dòng)作時(shí)才真正計(jì)算RDD臊诊。這也就使得Spark可以按序緩存多個(gè)變換鸽粉。
我們可以將 transformations 操作理解成一種惰性操作约谈,它只是定義了一個(gè)新的 RDD笔宿,而不是立即計(jì)算它。相反棱诱,actions 操作則是立即計(jì)算泼橘,并返回結(jié)果給程序,或者將結(jié)果寫入到外存儲(chǔ)中迈勋。
抽象RDD:
簡(jiǎn)而言之炬灭,我們提供了一個(gè)通用接口來抽象每個(gè) RDD,并提供 5 種信息:一組分區(qū)靡菇,他們是數(shù)據(jù)集的最小分片;一組 依賴關(guān)系重归,指向其父 RDD;一個(gè)函數(shù)米愿,基于父 RDD 進(jìn)行計(jì)算;以及劃分策略和數(shù)據(jù)位置的元數(shù)據(jù)。
在設(shè)計(jì)接口的過程中鼻吮,最有趣的問題在于如何表示 RDD 之間的依賴關(guān)系育苟。我們發(fā)現(xiàn),比較合理的方式是將依賴關(guān)系分成兩類:窄依賴:每個(gè)父 RDD 的分區(qū)都至多被一個(gè)子 RDD 的分區(qū)使用;寬依賴:多個(gè)子 RDD 的分區(qū)依賴一個(gè)父 RDD 的分區(qū)椎木。例如违柏,map 操作是一種窄依賴,而 join操作是一種寬依賴(除非父 RDD 已經(jīng)基于 Hash 策略被劃分過了)香椎。
一些RDD實(shí)現(xiàn)的概念:
HDFS 文件:在我們的例子中漱竖,HDFS 文件作為輸入 RDD。對(duì)于這些 RDD畜伐,partitions 代表文件中每個(gè)文件塊的分區(qū)(包含文件塊在每個(gè)分區(qū)對(duì)象中的偏移量)馍惹,preferredLocations 表示文件塊所在的節(jié)點(diǎn),而 iterator 讀取這些文件塊玛界。
map:在任何一個(gè)RDD上調(diào)用map操作將返回一個(gè)MappedRDD對(duì)象万矾。這個(gè)對(duì)象與其父對(duì)象具有相同的分區(qū)以及首選地點(diǎn)(preferredLocations),但在其迭代方法(iterator)中脚仔,傳遞給 map的函數(shù)會(huì)應(yīng)用到父對(duì)象記錄勤众。
union:在兩個(gè)RDD上調(diào)用union操作將返回一個(gè)RDD,這個(gè)RDD的分區(qū)為原始兩個(gè)RDD的父RDD的分區(qū)進(jìn)行union后的結(jié)果鲤脏。每個(gè)子分區(qū)都是通過窄依賴于同一個(gè)父級(jí)分區(qū)計(jì)算出來的们颜。7
sample:抽樣類似于映射。不同之處在于猎醇,RDD 會(huì)為每一個(gè)分區(qū)保存一個(gè)生成隨機(jī)數(shù)的種子來對(duì)確定如何對(duì)父級(jí)記錄進(jìn)行抽樣窥突。
join:連接兩個(gè) RDD 可能會(huì)產(chǎn)生兩個(gè)窄依賴,或兩個(gè)寬依賴硫嘶,或一個(gè)窄依賴和一個(gè)寬依賴阻问。如果兩個(gè) RDD 都是基于相同的 Hash/范圍劃分策略,那么就會(huì)產(chǎn)生窄依賴;如果一個(gè)父 RDD 具有某種劃分策略而另一個(gè)不具有沦疾,則會(huì)同時(shí)產(chǎn)生窄依賴和寬依賴称近。無論哪種情況,結(jié)果 RDD 都具有一個(gè)劃分策略(要么繼承自父 RDD哮塞,要么是一個(gè)默認(rèn)的 Hash 劃分策略)刨秆。
作業(yè)調(diào)度:
總的來說,我們的調(diào)度器與 Dryad 的[61]類似忆畅,但它額外會(huì)考慮被持久化(persist)的 RDD的那個(gè)分區(qū)保存在內(nèi)存中并可供使用衡未。當(dāng)用戶對(duì)一個(gè) RDD 執(zhí)行 Action(如 count 或 save)操作時(shí), 調(diào)度器會(huì)根據(jù)該 RDD 的 lineage,來構(gòu)建一個(gè)由若干 階段(stage) 組成的一個(gè) DAG(有向無環(huán)圖)以執(zhí)行程序缓醋,正如 2.5 所示如失。 每個(gè) stage 都包含盡可能多的連續(xù)的窄依賴型轉(zhuǎn)換。各個(gè)階段之間的分界則是寬依賴所需的 shuffle 操作送粱,或者是 DAG 中一個(gè)經(jīng)由該分區(qū)能更快到達(dá)父 RDD 的已計(jì)算分區(qū)褪贵。之后,調(diào)度器運(yùn)行多個(gè)任務(wù)來計(jì)算各個(gè)階段所缺失的分區(qū)葫督,直到最終得出目標(biāo) RDD竭鞍。
調(diào)度器向各機(jī)器的任務(wù)分配采用延時(shí)調(diào)度機(jī)制[117]并根據(jù)數(shù)據(jù)存儲(chǔ)位置(本地性)來確定板惑。若一個(gè)任務(wù)需要處理的某個(gè)分區(qū)剛好存儲(chǔ)在某個(gè)節(jié)點(diǎn)的內(nèi)存中橄镜,則該任務(wù)會(huì)分配給那個(gè)節(jié)點(diǎn)。否則冯乘,如果一個(gè)任務(wù)處理的某個(gè)分區(qū)洽胶,該分區(qū)含有的 RDD 提供較佳的位置(例如,一個(gè) HDFS 文件)裆馒,我們把該任務(wù)分配到這些位置姊氓。
"對(duì)應(yīng)寬依賴類的操作{比如w shuffle依賴),我們會(huì)將中間記錄物理化到保存父分區(qū)的節(jié)點(diǎn)上喷好。這和 MapReduce 物化 Map 的輸出類似翔横,能簡(jiǎn)化數(shù)據(jù)的故障恢復(fù)過程。"
針對(duì)調(diào)度器器自身失敗的容錯(cuò)哆键,拷貝相應(yīng) RDD 的 lineage 是比較直接的解決之道掘托。但現(xiàn)階段我們并不提供該類容錯(cuò)特性。
若某個(gè)任務(wù)執(zhí)行緩慢 (即"落后者"straggler)籍嘹,系統(tǒng)則會(huì)在其他節(jié)點(diǎn)上執(zhí)行該任務(wù)的拷貝這與 MapReduce 做法類似闪盔,并取最先得到的結(jié)果作為最終的結(jié)果。
scala解析器
Scala 解析器通常會(huì)為用戶輸入的每一行生成一個(gè)類辱士,把它導(dǎo)入 JVM 泪掀,調(diào)用上面的一個(gè)函數(shù)。Scala 解析器的解析通常有如下組成:
將用戶輸入的每一行編譯出其所對(duì)應(yīng)的一個(gè)類;
將該類載入到 JVM 中;
調(diào)用該類的某個(gè)函數(shù)识补。這個(gè)類包含一個(gè)單例對(duì)象族淮,對(duì)象中包含當(dāng)前行的變量或函數(shù),在初始化方法中包含運(yùn)行該行的代碼。例如祝辣,如果用戶鍵入 var x = 5贴妻,換一行再鍵入 println(x),那解析器會(huì)定義一個(gè)叫 Line1 的類蝙斜,該類包含 x名惩。第二行編譯成println(Line1.getInstance().x)。
內(nèi)存管理
Spark提供了三種對(duì)持久化RDD的存儲(chǔ)策略:未序列化Java對(duì)象存于內(nèi)存中孕荠、序列化后的數(shù)據(jù)存于內(nèi)存及磁盤存儲(chǔ)娩鹉。第一個(gè)選項(xiàng)的性能表現(xiàn)是最優(yōu)秀的,因?yàn)榭梢灾苯釉L問在JAVA虛擬機(jī)內(nèi)存里的RDD對(duì)象稚伍。在空間有限的情況下弯予,第二種方式可以讓用戶采用比JAVA對(duì)象圖更有效的內(nèi)存組織方式,代價(jià)是降低了性能个曙。8第三種策略適用于RDD太大難以存儲(chǔ)在內(nèi)存的情形锈嫩,但每次重新計(jì)算該RDD會(huì)帶來額外的資源開銷。
對(duì)于有限可用內(nèi)存垦搬,我們使用以 RDD 為對(duì)象的 LRU 回收算法來進(jìn)行管理呼寸。當(dāng)計(jì)算得到一個(gè)新的 RDD 分區(qū),但卻沒有足夠空間來存儲(chǔ)它時(shí)猴贰,系統(tǒng)會(huì)從最近最少使用的 RDD 中回收其一個(gè)分區(qū)的空間对雪。除非該 RDD 便是新分區(qū)對(duì)應(yīng)的 RDD,這種情況下米绕,Spark 會(huì)將舊的分區(qū)繼續(xù)保留在內(nèi)存瑟捣,防止同一個(gè) RDD 的分區(qū)被循環(huán)調(diào)入調(diào)出。這點(diǎn)很關(guān)鍵--因?yàn)榇蟛糠值牟僮鲿?huì)在一個(gè) RDD 的所有分區(qū)上進(jìn)行义郑,那么很有可能已經(jīng)存在內(nèi)存中的分區(qū)將會(huì)被再次使用蝶柿。到目前為止,這種默認(rèn)的策略在我們所有的應(yīng)用中都運(yùn)行很好非驮,當(dāng)然我們也為用戶提供了“持久化優(yōu)先級(jí)”選項(xiàng)來控制 RDD 的存儲(chǔ)交汤。