一俊庇、RDD為什么出現(xiàn)狮暑?
在實際開發(fā)應用中,存在許多迭代式計算暇赤,這些應用場景的共同之處是心例,不同計算階段之間會重用中間結果宵凌,即一個階段的輸出結果會作為下一個階段的輸入鞋囊。
以前常用的MapReduce框架是把中間結果寫入到HDFS中,帶來了大量的數(shù)據(jù)復制瞎惫、磁盤IO和序列化開銷溜腐。
如果有一種方法,能將結果保存在內存當中瓜喇,就可以大量減少IO消耗挺益。RDD一種彈性分布數(shù)據(jù)集,就是為了滿足這種需求而出現(xiàn)的乘寒,它提供了一個抽象的數(shù)據(jù)架構望众,我們不必擔心底層數(shù)據(jù)的分布式特性,只需將具體的應用邏輯表達為一系列轉換處理伞辛。
不同RDD之間的轉換操作形成依賴關系烂翰,可以實現(xiàn)管道化,從而避免了中間結果的落地存儲蚤氏,大大降低了數(shù)據(jù)復制甘耿、磁盤IO和序列化開銷。
二竿滨、RDD是什么佳恬?
一個RDD就是一個分布式對象集合,本質上是一個只讀的分區(qū)記錄集合于游,每個RDD可以分成多個分區(qū)毁葱,每個分區(qū)就是一個數(shù)據(jù)集片段(HDFS上的塊),并且一個RDD的不同分區(qū)可以被保存到集群中不同的節(jié)點上贰剥,從而可以在集群中的不同節(jié)點上進行并行計算倾剿。
RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區(qū)的集合鸠澈,不能直接修改柱告,只能基于穩(wěn)定的物理存儲中的數(shù)據(jù)集來創(chuàng)建RDD截驮,或者通過在其他RDD上執(zhí)行確定的轉換操作(如map、join和groupBy)而創(chuàng)建得到新的RDD际度。
RDD提供了一組豐富的操作以支持常見的數(shù)據(jù)運算葵袭,分為“行動”(Action)和“轉換”(Transformation)兩種類型的算子,前者用于執(zhí)行計算并指定輸出的形式乖菱,后者指定RDD之間的相互依賴關系坡锡。
兩類操作的主要區(qū)別是,轉換操作(比如map窒所、filter鹉勒、groupBy、join等)接受RDD并返回RDD吵取,而行動操作(比如count禽额、collect等)接受RDD但是返回非RDD(即輸出一個值或結果)。
三皮官、RDD的執(zhí)行過程
1脯倒、RDD讀入外部數(shù)據(jù)源(或者內存中的數(shù)據(jù)集)進行創(chuàng)建;注意:RDD讀取數(shù)據(jù)時捺氢,一般默認2個分區(qū)藻丢。
2、RDD經(jīng)過一系列的“轉換”操作摄乒,每一次都會產(chǎn)生不同的RDD悠反,供給下一個“轉換”使用;
3馍佑、最后一個RDD經(jīng)“行動”操作進行處理斋否,并輸出到外部數(shù)據(jù)源,或者變成Scala/JAVA集合或變量挤茄。
值得注意的是如叼,RDD采用了惰性調用,即在RDD的執(zhí)行過程中穷劈,真正的計算發(fā)生在RDD的“行動”操作笼恰,對于“行動”之前的所有“轉換”操作,Spark只是記錄下“轉換”操作應用的一些基礎數(shù)據(jù)集以及RDD生成的軌跡歇终,即相互之間的依賴關系社证,而不會觸發(fā)真正的計算。
RDD血緣依賴轉換流程
從數(shù)據(jù)輸入评凝,到邏輯上生成A和C兩個RDD追葡,經(jīng)過一系列“轉換”操作,邏輯上生成了F,也是一個RDD宜肉。之所以說是邏輯上匀钧,是因為這時候計算并沒有發(fā)生,只是記錄了RDD之間的生成和依賴關系谬返。當F要進行輸出時之斯,也就是當F進行“行動”操作的時候,Spark才會根據(jù)RDD的依賴關系生成DAG遣铝,并從起點開始真正的計算佑刷。
這一處理過程:稱為一個“血緣關系(Lineage)”,即DAG拓撲排序的結果酿炸。
Spark采用惰性調用瘫絮,通過血緣關系連接起來的一系列RDD操作就可以實現(xiàn)管道化(pipeline),避免了多次轉換操作之間數(shù)據(jù)同步的等待填硕,而且不用擔心有過多的中間數(shù)據(jù)麦萤。
因為這些具有血緣關系的操作都管道化了,一個操作得到的結果不需要保存為中間數(shù)據(jù)廷支,而是直接管道式地流入到下一個操作進行處理频鉴。
同時,這種通過血緣關系就是把一系列操作進行管道化連接的設計方式恋拍,也使得管道中每次操作的計算變得相對簡單,保證了每個操作在處理邏輯上的單一性藕甩;相反施敢,在MapReduce的設計中,為了盡可能地減少MapReduce過程狭莱,在單個MapReduce中會寫入過多復雜的邏輯僵娃。
下面我們以具體代碼來講解RDD執(zhí)行過程:
object SparkTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf.setAppName("SparkTest").setMaster("local[*]")
val sparkContext = new SparkContext(conf)
val line :RDD = sparkContext.textFile("filepath")
val filt = line.filter(_.contains("spark"))
val data = filt.cache().count() println(count)
}
}
從上可以看出,一個Spark應用程序腋妙,基本是基于RDD的一系列計算操作默怨。
第1行代碼用于創(chuàng)建SparkContext對象,執(zhí)行上下文環(huán)境骤素;
第2行代碼從文件中讀取數(shù)據(jù)創(chuàng)建一個RDD匙睹;
第3行代碼對讀取的數(shù)據(jù),返回的RDD進行轉換操作得到一個新的RDD济竹,即filterRDD痕檬;
filt.cache()表示對lines進行持久化,把它保存在內存或磁盤中送浊。這里采用cache接口把數(shù)據(jù)集保存在內存中梦谜,方便后續(xù)重復使用。
注意:當數(shù)據(jù)被反復訪問時,比如查詢一些熱點數(shù)據(jù)唁桩、或者運行迭代算法時闭树,把數(shù)據(jù)緩存到內存中這是非常有用的。****而且通過cache()可以緩存非常大的數(shù)據(jù)集荒澡,支持跨越幾十甚至上百個節(jié)點蔼啦;****filt.count()是一個行動操作,用于計算一個RDD集合中包含的元素個數(shù)仰猖。
這個程序的執(zhí)行過程如下:
- 創(chuàng)建這個Spark程序的執(zhí)行上下文捏肢,即創(chuàng)建SparkContext對象;
- 從外部數(shù)據(jù)源中讀取數(shù)據(jù)創(chuàng)建fileRDD對象饥侵;
- 構建起fileRDD和filterRDD之間的依賴關系鸵赫,形成DAG圖,這時候并沒有發(fā)生真正的計算躏升,只是記錄轉換的軌跡辩棒,也就是血緣依賴關系;
- 執(zhí)行action代碼時膨疏,count()是一個行動類型的RDD一睁,觸發(fā)真正的計算。開始執(zhí)行從fileRDD到filterRDD的轉換操作佃却,并把結果持久化到內存中者吁,最后計算出filterRDD中包含的元素個數(shù)。