什么是RDD
RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象恕刘,
它代表一個(gè)不可變香追、可分區(qū)固额、里面的元素可并行計(jì)算的集合。
RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)伏钠、位置感知性調(diào)度和可伸縮性钩杰。
RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中,
后續(xù)的查詢能夠重用工作集苛萎,這極大地提升了查詢速度奠伪。
RDD的屬性
1、一組分片(Partition)首懈,即數(shù)據(jù)集的基本組成單位绊率。
對于RDD來說,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理究履,并決定并行計(jì)算的粒度滤否。
用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定最仑,那么就會(huì)采用默認(rèn)值藐俺。
默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。
2泥彤、一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)欲芹。
Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的吟吝。
compute函數(shù)會(huì)對迭代器進(jìn)行復(fù)合菱父,不需要保存每次計(jì)算的結(jié)果。
3剑逃、RDD之間的依賴關(guān)系浙宜。
RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系蛹磺。
在部分分區(qū)數(shù)據(jù)丟失時(shí)粟瞬,Spark可以通過這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進(jìn)行重新計(jì)算萤捆。
4裙品、一個(gè)Partitioner,即RDD的分片函數(shù)俗或。
當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù)市怎,一個(gè)是基于哈希的HashPartitioner,
另外一個(gè)是基于范圍的RangePartitioner蕴侣。
只有對于key-value的RDD焰轻,才會(huì)有Partitioner,
非key-value的RDD的Parititioner的值是None昆雀。
Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量辱志,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。
5狞膘、一個(gè)列表揩懒,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。
對于一個(gè)HDFS文件來說挽封,這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置已球。
按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候辅愿,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置智亮。
生成rdd的兩種方式
#cd training/spark/bin
# ./spark-shell --master spark://hadoop21:7077 --executor-memory 512m --total-executor-cores 2
scala> val rdd1 = sc.textFile("hdfs://192.168.56.21:9000/wc")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.56.21:9000/wc MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd1.collect
res0: Array[String] = Array(hello tom, hello jerry, hello tom, hello kitty, hello tom, hello jerry, hello tom, hello jerry, hello lilei, hello hanmeimei, hello tom, hello tom, hello jerry, hello tom, hello tom, hello jerry, hello lilei, hello hanmeimei, hello tom, hello tom, hello jerry, hello tom)
scala> val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd2.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)