一丐怯、RDD及其特點(diǎn)
**1. **RDD(Resillient Distributed Dataset)彈性分布式數(shù)據(jù)集液斜,是spark提供的核心抽象混萝。它代表一個(gè)不可變拜秧、可分區(qū)、里面的元素可并行計(jì)算的集合
**2. **RDD在抽象上來(lái)說(shuō)是一種元素集合老赤,包含了數(shù)據(jù)轮洋。它是被分區(qū)的,分為多個(gè)分區(qū)抬旺,每個(gè)分區(qū)分布在集群中的不同節(jié)點(diǎn)上弊予,從而讓RDD中的數(shù)據(jù)可以被并行操作(分布式數(shù)據(jù)集)
**3. **RDD通常通過(guò)hadoop上的文件,即hdfs文件或者h(yuǎn)ive表來(lái)進(jìn)行創(chuàng)建开财,有時(shí)也可以通過(guò)應(yīng)用程序中的集合來(lái)創(chuàng)建汉柒。
**4. **RDD最重要的特性就是提供了容錯(cuò)性,可以自動(dòng)從節(jié)點(diǎn)失敗中恢復(fù)過(guò)來(lái)责鳍,即某節(jié)點(diǎn)上的RDD partition碾褂,因?yàn)楣?jié)點(diǎn)故障,導(dǎo)致數(shù)據(jù)丟了历葛,那么RDD會(huì)自動(dòng)通過(guò)自己數(shù)據(jù)來(lái)源重新計(jì)算該partition正塌,這一切對(duì)使用者是透明的
**5. **RDD的數(shù)據(jù)默認(rèn)情況下存在內(nèi)存中,但是在內(nèi)存資源不足是,spark會(huì)自動(dòng)將RDD數(shù)據(jù)寫入磁盤(彈性)
注意:RDD的每個(gè)partition乓诽,在spark節(jié)點(diǎn)上存儲(chǔ)時(shí)帜羊,默認(rèn)都是放在內(nèi)存中的,但是如果說(shuō)內(nèi)存放不下這么多數(shù)據(jù)鸠天,比如每個(gè)節(jié)點(diǎn)最多放5w數(shù)據(jù)逮壁,結(jié)果每個(gè)partition市10w數(shù)據(jù),那么就會(huì)把partition中的部分?jǐn)?shù)據(jù)寫入磁盤粮宛,進(jìn)行保存。而上述這一切卖宠,對(duì)于用戶來(lái)說(shuō)巍杈,都是完全透明的,也就是不用去管RDD的數(shù)據(jù)存放在內(nèi)存還是磁盤扛伍,只要關(guān)注你針對(duì)RDD來(lái)進(jìn)行計(jì)算和處理等操作即可筷畦。所以說(shuō)RDD這種自動(dòng)進(jìn)行內(nèi)存和磁盤之間權(quán)衡和切換的機(jī)制,就是RDD的彈性的特點(diǎn)所在刺洒。
二鳖宾、RDD的屬性
1. partitions(分區(qū))。每個(gè)RDD包括多個(gè)分區(qū), 這既是RDD的數(shù)據(jù)單位, 也是計(jì)算粒度, 每個(gè)分區(qū)是由一個(gè)Task線程處理. 在RDD創(chuàng)建的時(shí)候可以指定分區(qū)的個(gè)數(shù), 如果沒(méi)有指定, 那么默認(rèn)分區(qū)的個(gè)數(shù)是CPU的核數(shù)(standalone).
每一分區(qū)對(duì)應(yīng)一個(gè)內(nèi)存block, 由BlockManager分配.
2. partitioner(分區(qū)方法)逆航。這個(gè)屬性指的是RDD的partitioner函數(shù)(分片函數(shù)), 分區(qū)函數(shù)就是將數(shù)據(jù)分配到指定的分區(qū), 這個(gè)目前實(shí)現(xiàn)了HashPartitioner和RangePartitioner, 只有key-value的RDD才會(huì)有分片函數(shù), 否則為none.鼎文。分片函數(shù)不僅決定了當(dāng)前分片的個(gè)數(shù), 同時(shí)決定parent shuffle RDD的輸出的分區(qū)個(gè)數(shù)。
3. dependencies(依賴關(guān)系)因俐。Spark的運(yùn)行過(guò)程就是RDD之間的轉(zhuǎn)換, 因此, 必須記錄RDD之間的生成關(guān)系(新RDD是由哪個(gè)或哪幾個(gè)父RDD生成), 這就是所謂的依賴關(guān)系, 這樣既有助于階段和任務(wù)的劃分, 也有助于在某個(gè)分區(qū)出錯(cuò)的時(shí)候, 只需要重新計(jì)算與當(dāng)前出錯(cuò)的分區(qū)有關(guān)的分區(qū),而不需要計(jì)算所有的分區(qū)拇惋。
窄依賴:父 RDD 的 partition 至多被一個(gè)子 RDD partition 依賴(OneToOneDependency,RangeDependency)
寬依賴:父 RDD 的 partition 被多個(gè)子 RDD partitions 依賴(ShuffleDependency)
4. compute(獲取分區(qū)迭代列表)抹剩。計(jì)算屬性: 當(dāng)調(diào)用 RDD#iterator 方法無(wú)法從緩存或checkpoint中獲取指定 partition 的迭代器時(shí)撑帖,就需要調(diào)用 compute 方法來(lái)獲取RDD不僅包含有數(shù)據(jù), 還有在數(shù)據(jù)上的計(jì)算, 每個(gè)RDD以分區(qū)為計(jì)算粒度, 每個(gè)RDD會(huì)實(shí)現(xiàn)compute函數(shù), compute函數(shù)會(huì)和迭代器(RDD之間轉(zhuǎn)換的迭代器)進(jìn)行復(fù)合, 這樣就不需要保存每次compute運(yùn)行的結(jié)果.
5. preferedLocations(優(yōu)先分配節(jié)點(diǎn)列表)。對(duì)于分區(qū)而言返回?cái)?shù)據(jù)本地化計(jì)算的節(jié)點(diǎn)列表澳眷。也就是說(shuō), 每個(gè)RDD會(huì)報(bào)出一個(gè)列表(Seq), 而這個(gè)列表保存著分片優(yōu)先分配給哪個(gè)Worker節(jié)點(diǎn)計(jì)算, spark堅(jiān)持移動(dòng)計(jì)算而非移動(dòng)數(shù)據(jù)的原則. 也就是盡量在存儲(chǔ)數(shù)據(jù)的節(jié)點(diǎn)上進(jìn)行計(jì)算胡嘿。要注意的是,并不是每個(gè) RDD 都有 preferedLocation钳踊,比如從 Scala 集合中創(chuàng)建的 RDD 就沒(méi)有衷敌,而從 HDFS 讀取的 RDD 就有。
三箍土、創(chuàng)建RDD
進(jìn)行spark核心編程時(shí)逢享,先要?jiǎng)?chuàng)建一個(gè)初始的RDD,該RDD中吴藻,通常就代表和包含了spark應(yīng)用程序的輸入源數(shù)據(jù)瞒爬,然后在創(chuàng)建了初識(shí)的RDD之后,才可以通過(guò)spark core提供的transformation算子,對(duì)RDD進(jìn)行轉(zhuǎn)換侧但,來(lái)獲取其他的RDD矢空。
spark core提供了二種創(chuàng)建RDD的方式:
- 由已經(jīng)存在的Scala集合創(chuàng)建RDD
先要啟動(dòng)spark-shell
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
求和
scala> val sum = rdd1.reduce(_+_)
sum: Int = 55
- 由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建
使用本地文件創(chuàng)建RDD
使用HDFS文件創(chuàng)建RDD
scala> val rdd3 = sc.textFile("hdfs://master:9000/student/2016113012/spark/words.txt")
rdd3: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/student/2016113012/spark/words.txt MapPartitionsRDD[46] at textFile at <console>:24
統(tǒng)計(jì)文本字?jǐn)?shù)
scala> val count = rdd3.map(lines => lines.length()).reduce(_+_)
count: Int = 48
文件內(nèi)容如下
hello scala
hello java
hello python
hello wujiadong
spark默認(rèn)會(huì)為hdfs的每一個(gè)block創(chuàng)建一個(gè)partition,但是也可以通過(guò)textFile()的第二個(gè)參數(shù)手動(dòng)設(shè)置分區(qū)數(shù)量禀横,只能比block數(shù)量多屁药,不能比他更少。
四柏锄、RDD運(yùn)行流程
RDD在Spark中運(yùn)行大概分為以下三步:
- 創(chuàng)建RDD對(duì)象
- DAGScheduler模塊介入運(yùn)算酿箭,計(jì)算RDD之間的依賴關(guān)系,RDD之間的依賴關(guān)系就形成了DAG
-
每一個(gè)Job被分為多個(gè)Stage趾娃。劃分Stage的一個(gè)主要依據(jù)是當(dāng)前計(jì)算因子的輸入是否是確定的缭嫡,如果是則將其分在同一個(gè)Stage,避免多個(gè)Stage之間的消息傳遞開銷
五妇蛀、RDD如何操作
對(duì)于RDD可以有兩種操作算子:
1. 轉(zhuǎn)換(Transformation):Transformation操作是延遲計(jì)算的,也就是說(shuō)從一個(gè)RDD轉(zhuǎn)換生成另一個(gè)RDD的轉(zhuǎn)換操作不是馬上執(zhí)行笤成,需要等到有Action操作的時(shí)候才會(huì)真正觸發(fā)運(yùn)算评架。
2. 行動(dòng)(Action):Action算子會(huì)觸發(fā)Spark提交作業(yè)(Job),并將數(shù)據(jù)輸出Spark系統(tǒng)炕泳。
1.Transformation具體內(nèi)容:
2.Action具體內(nèi)容:
六纵诞、RDD的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
- 編譯時(shí)類型安全, 編譯時(shí)就能檢查出類型錯(cuò)誤。
- 面向?qū)ο蟮木幊田L(fēng)格, 直接通過(guò)類名點(diǎn)的方式來(lái)操作數(shù)據(jù)培遵。
缺點(diǎn):
- 序列化和反序列化的性能開銷, 無(wú)論是集群間的通信, 還是IO操作都需要對(duì)對(duì)象的結(jié)構(gòu)和數(shù)據(jù)進(jìn)行序列化和反序列化挣磨。
- GC的性能開銷, 頻繁的創(chuàng)建和銷毀對(duì)象, 勢(shì)必會(huì)增加GC。