RDD
RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個不可變昵济、可分區(qū)、里面的元素可并行計算的集合野揪。RDD具有數(shù)據(jù)流模型的特點:自動容錯访忿、位置感知性調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個查詢時顯式地將工作集緩存在內(nèi)存中斯稳,后續(xù)的查詢能夠重用工作集海铆,這極大地提升了查詢速度。
RDD5大特型
A list of partitions 一個分區(qū)列表
A function for computing each split 對每個切片的計算函數(shù)
A list of dependencies on other RDDs RDD內(nèi)部存放一個依賴列表(怎么變換來的)
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 分區(qū)器 可選的
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 數(shù)據(jù)本地化 可選的
特性一
-- A list of partitions:
每個RDD分區(qū)都分布在集群節(jié)點上挣惰,分區(qū)的多少涉及到RDD Computing 并行效率卧斟。其實,分區(qū)只是一個邏輯概念憎茂,前后變換的分區(qū)珍语,可能是物理機器上同一塊內(nèi)存或者存儲,用戶可以指定分區(qū)數(shù)竖幔,默認是CPU核數(shù)板乙。如果從HDFS文件創(chuàng)建時數(shù)據(jù)塊數(shù)。
Spark中的io也是不可避免的拳氢,但是網(wǎng)絡(luò)傳輸Spark進行了優(yōu)化募逞,Spark把Rdd進行分區(qū),放在集群上并行計算馋评。同一個RDD分片100個放接,10個節(jié)點,平均每個節(jié)點10個分區(qū)留特,當進行sum時纠脾,網(wǎng)絡(luò)傳輸非常小,各自節(jié)點將計算好的sum值shuffle到主程序進行全局sum磕秤。所以會很快乳乌,但是進行join時候需要把數(shù)據(jù)本身進行shuffle捧韵,網(wǎng)絡(luò)開銷很大市咆。如果時做了分區(qū),就會將相同key在同一個分區(qū)中再来,按key進行聚合的時就不需要再shuffle了蒙兰。
RDD 分區(qū)的一個分配原則是:盡可能使得分區(qū)的個數(shù)磷瘤,等于集群核心數(shù)目。
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
特性二
--A function for computing each split:
一個函數(shù)作用在一個分區(qū)上搜变。比如說一個分區(qū)有1采缚,2,3 在rdd1.map(_*10),把RDD里面的每一個元素取出來乘以10,每個分片都應(yīng)用這個map的函數(shù)挠他。
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
兩個參數(shù):1.分區(qū)對象 2.TaskContext對象
compute函數(shù)會對迭代器進行復合扳抽,不需要保存每次計算的結(jié)果
Spark支持兩個類型(算子)操作:Transformation和Action
主要做的是就是將一個已有的RDD生成另外一個RDD。Transformation具有l(wèi)azy特性(延遲加載)殖侵。Transformation算子的代碼不會真正被執(zhí)行贸呢。只有當我們的程序里面遇到一個action算子的時候,代碼才會真正的被執(zhí)行拢军。這種設(shè)計讓Spark更加有效率地運行楞陷。
特性三
--A list of dependencies on other RDDs RDD:
窄依賴:是指每個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map,filter,union茉唉。(獨生子女)
寬依賴:是指一個父RDD的Partition會被多個子RDD的Partition所使用固蛾,例如groupBykey,reduceBykey,sortBykey等操作都會產(chǎn)生寬依賴。(超生)
在這里我們是從父RDD的partition被使用的個數(shù)來定義窄依賴和寬依賴度陆,因此可以用一句話概括下:如果父RDD的一個Partition被子RDD的一個Partition所使用就是窄依賴艾凯,否則的話就是寬依賴。因為是確定的partition數(shù)量的依賴關(guān)系坚芜,所以RDD之間的依賴關(guān)系就是窄依賴览芳;由此我們可以得出一個推論:即窄依賴不僅包含一對一的窄依賴,還包含一對固定個數(shù)的窄依賴鸿竖。
一對固定個數(shù)的窄依賴的理解:即子RDD的partition對父RDD依賴的Partition的數(shù)量不會隨著RDD數(shù)據(jù)規(guī)模的改變而改變沧竟;換句話說,無論是有100T的數(shù)據(jù)量還是1P的數(shù)據(jù)量缚忧,在窄依賴中悟泵,子RDD所依賴的父RDD的partition的個數(shù)是確定的,而寬依賴是shuffle級別的闪水,數(shù)據(jù)量越大糕非,那么子RDD所依賴的父RDD的個數(shù)就越多,從而子RDD所依賴的父RDD的partition的個數(shù)也會變得越來越多球榆。
從后往前推朽肥,遇到寬依賴就斷開,劃分為一個stage持钉;遇到窄依賴就將這個RDD加入該stage中
特性四
-- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) :
可選:key-value型的RDD是根據(jù)哈希來分區(qū)的衡招,類似于mapreduce當中的paritioner接口,控制Key分到哪個reduce每强。
一個partitioner始腾,即RDD的分片函數(shù)州刽。當前Spark中實現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的HashPartitioner浪箭,另外一個基于范圍的RangePartitioner穗椅。只有對于key-value的RDD,才會有Partitioner奶栖,非key-value的RDD的Partitioner的值是None匹表。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量宣鄙。類似hadoop的預分區(qū)桑孩。
特性五
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) :
可選:每一分片的優(yōu)先計算位置,比如HDFS的block的所在位置應(yīng)該是優(yōu)先計算的位置框冀。
一個列表流椒,存儲存取每個Partition的優(yōu)先位置(preferred location)。對于一個HDFS文件來說明也,這個列表保存的就是每個Partition所在的塊的位置宣虾。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark在進行任務(wù)調(diào)度的時候温数,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置绣硝。
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]