什么是RDD
RDD(Resilient Distributed Datasets)纹烹,彈性分布式數(shù)據(jù)集页滚,是Spark的基本數(shù)據(jù)結(jié)構(gòu)召边。
它是一個不可變的分布式對象集合。
RDD中的每個數(shù)據(jù)集被劃分為邏輯分區(qū)裹驰,其可以在集群的不同節(jié)點上計算隧熙。
RDD可以包含任何類型的Python,Java或Scala對象邦马,包括用戶定義的類贱鼻。
形式上,RDD是只讀的 分區(qū) 記錄集合滋将。 可以通過讀取外部存儲系統(tǒng)中的數(shù)據(jù)集(如HDFS邻悬,HBase或提供Hadoop輸入格式的任何數(shù)據(jù)源等)、轉(zhuǎn)換現(xiàn)有數(shù)據(jù)集合或?qū)ζ渌鸕DD的數(shù)據(jù)進行轉(zhuǎn)換來創(chuàng)建RDD随闽。
RDD是一個支持容錯集合父丰,可以并行操作。
RDD的主要屬性
從RDD的內(nèi)部定義來看掘宪,每個RDD擁有以下五個主要屬性:
- 分區(qū)列表
- 與其他RDD的依賴關(guān)系列表
- 計算分片(split)的函數(shù)
- (可選) 鍵值RDD中的分區(qū)器Partitioner (例如蛾扇,hash-partitioner)
- (可選) 用于計算每個分片的的優(yōu)選位置列表 (例如,HDFS文件的block位置)
RDD的組成
RDD主要由以下四部分組成
- 分區(qū)(Partitions):數(shù)據(jù)集的原子片段魏滚。 每個計算節(jié)點含有一個或多個分區(qū)镀首。
- 依賴關(guān)系(Dependencies):RDD的每個分區(qū)計算時依賴哪些父RDD的分區(qū)(如下圖)
- 函數(shù)/算子(Functions): 基于其父RDD的用于計算數(shù)據(jù)集的函數(shù)。
-
元數(shù)據(jù)(Metadata): RDD的分區(qū)方案和數(shù)據(jù)的存儲放置鼠次。
RDD的分區(qū)(Partition)
RDD中的數(shù)據(jù)被存儲在多個分區(qū)中更哄。
RDD分區(qū)的特征
- 分區(qū)永遠不會跨越多臺機器,即同一分區(qū)中的數(shù)據(jù)始終保證在同一臺機器上腥寇。
- 群集中的每個節(jié)點包含一個或多個分區(qū)成翩。
- 分區(qū)的數(shù)目是可以設(shè)置的。 默認情況下赦役,它等于所有執(zhí)行程序節(jié)點上的核心總數(shù)麻敌。 例如。 6個工作節(jié)點掂摔,每個具有4個核心术羔,RDD將被劃分為24個分區(qū)。
RDD分區(qū)與任務(wù)執(zhí)行的關(guān)系
在Map階段partition數(shù)目保持不變乙漓。
在Reduce階段级历,RDD的聚合會觸發(fā)shuffle操作,聚合后的RDD的partition數(shù)目跟具體操作有關(guān)簇秒,例如repartition操作會聚合成指定分區(qū)數(shù)鱼喉,還有一些算子是可配置的秀鞭。
RDD分區(qū)數(shù)的調(diào)整可以通過以下兩個函數(shù)完成:
- repartition
repartition函數(shù)相當于coalesce(numPartitions, shuffle = True), 不僅可以調(diào)整分區(qū)數(shù)目(增加或減少)趋观,也可以將partitioner調(diào)整為hash-partitioner扛禽,產(chǎn)生shuffle操作 - coalesce
coalesce函數(shù)可以控制是否shuffle,但是shuffle為False時皱坛,只能減少分區(qū)數(shù)编曼,無法增大。
RDD在計算的時候剩辟,每個分區(qū)都會啟動一個task掐场,RDD的分區(qū)數(shù)目決定了總的task數(shù)目。
申請的Executor數(shù)和Executor的CPU核數(shù)贩猎,決定了你同一時刻可以并行執(zhí)行的task數(shù)量熊户。
這里我們舉個例子來加深對RDD分區(qū)數(shù)量與task執(zhí)行的關(guān)系的理解</font></b>
比如的RDD有100個分區(qū),那么計算的時候就會生成100個task吭服,你的資源配置為10個計算節(jié)點嚷堡,每個兩2個核,同一時刻可以并行的task數(shù)目為20艇棕,計算這個RDD就需要5個輪次蝌戒。如果計算資源不變,你有101個task的話沼琉,就需要6個輪次北苟,在最后一輪中,只有一個task在執(zhí)行打瘪,其余核都在空轉(zhuǎn)友鼻。
<font color=black>partition數(shù)量太少會造成資源利用不夠充分。
例如瑟慈,在資源不變的情況桃移,你的RDD只有10個分區(qū),那么同一時刻只有10個task運行葛碧,其余10個核將空轉(zhuǎn)借杰。
通常在spark調(diào)優(yōu)中,可以增大RDD分區(qū)數(shù)目來增大任務(wù)并行度进泼。
但是partition數(shù)量太多則會造成task過多蔗衡,task的傳輸/序列化開銷增大,也可能會造成輸出過多的(小)文件乳绕。
<<b>spark.default.parallelism</b> 和 <b>spark.sql.shuffle.partitions</b> 這兩個參數(shù)很重要
RDD的分區(qū)器(Partitioner)
Spark中提供兩種分區(qū)器:
- 散列分區(qū) Hash partitioning
- 范圍分區(qū) Range partitioning
<font color=black>只有PairRDD支持自定義分區(qū)器绞惦。
RDD的邏輯執(zhí)行計劃(Lineage)
RDD Lineage,又叫做RDD運算符圖或RDD依賴圖洋措,是包含一個子RDD的所有父RDD的圖济蝉。每當我們執(zhí)行RDD轉(zhuǎn)換(transformation)操作,就會產(chǎn)生RDD Lineage并用于創(chuàng)建 邏輯執(zhí)行計劃。
Spark stages的DAG的執(zhí)行稱作 物理執(zhí)行計劃
邏輯執(zhí)行計劃從最初始的RDD (不依賴于其他RDD或引用緩存數(shù)據(jù)的RDD)開始王滤,以調(diào)用可以產(chǎn)生RDD結(jié)果的action算子結(jié)束贺嫂。
使用toDebugString函數(shù)可以顯示RDD Lineage
RDD Lineage是Spark中容錯的關(guān)鍵
我們可以通過RDD Lineage,追溯到丟失分區(qū)的父RDD雁乡,然后根據(jù)父RDD重新計算丟失分區(qū)第喳,使其從故障中恢復。
RDD的依賴關(guān)系(Dependencies)
RDD的每一個Transformation操作都會生成一個新的RDD踱稍,所以RDD之間就會形成類似流水線的前后依賴關(guān)系曲饱;在Spark中,RDD之間存在兩種類型的依賴關(guān)系:窄依賴(Narrow Dependency)和寬依賴(Wide Dependency)珠月;
窄依賴(Narrow Dependency)
窄依賴是指每個父RDD的一個Partition最多被子RDD的一個Partition所使用扩淀,例如map、filter啤挎、union等操作都會產(chǎn)生窄依賴引矩;
對于窄依賴,由于partition依賴關(guān)系的確定性侵浸,partition的轉(zhuǎn)換處理就可以在同一個線程里完成旺韭,這種轉(zhuǎn)換不會引起shuffle操作,速度快掏觉!
寬依賴(Wide Dependency)
寬依賴是指一個父RDD的Partition會被多個子RDD的Partition所使用区端,例如groupByKey、reduceByKey澳腹、sortByKey等操作都會產(chǎn)生寬依賴织盼;
這種轉(zhuǎn)換會引起shuffle操作,速度慢酱塔!
Shuffle是MapReduce框架中的一個特定的phase沥邻,介于Map phase和Reduce phase之間,當Map的輸出結(jié)果要被Reduce使用時羊娃,輸出結(jié)果需要按key哈希唐全,并且分發(fā)到每一個Reducer上去,這個過程就是shuffle蕊玷。由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸邮利,因此shuffle性能的高低直接影響到了整個程序的運行效率。
RDD與Task/Stage的關(guān)系
Task
Task是Spark中最小的任務(wù)執(zhí)行單元垃帅,每個RDD的transformation操作都會被翻譯成相應的task延届,分配到相應的executor節(jié)點上對相應的partition執(zhí)行。
RDD在計算的時候贸诚,每個分區(qū)都會啟動一個task方庭,RDD的分區(qū)數(shù)目決定了總的task數(shù)目厕吉。
Task的類型分為2種:ShuffleMapTask和ResultTask;
簡單來說械念,DAG的最后一個階段會為每個結(jié)果的partition生成一個ResultTask赴涵,即每個Stage里面的Task的數(shù)量是由該Stage中最后一個RDD的Partition的數(shù)量所決定的。
Stage
Stage是程序執(zhí)行時的物理订讼,是物理執(zhí)行計劃中的一個步驟。
Stage由一組有narrow transformation(無要shuffle)構(gòu)成的task組成扇苞, 不需要在節(jié)點間傳輸數(shù)據(jù)欺殿,可以被高效的執(zhí)行。
一個Stage只能在單個RDD的分區(qū)上工作鳖敷。
Stage的類型分為2種:
- ShuffleMapStage
- ResultStage
參考來源:
Wide vs Narrow Dependencies
Mastering Apache Spark
Partitioning
spark學習之RDD來源解密