Spark-core:Spark的算子

目錄
一.RDD基礎
????1.什么是RDD
????2.RDD的屬性
????3.RDD的創(chuàng)建方式
????4.RDD的類型
????5.RDD的基本原理
二.Transformation
三.Action
四.RDD的緩存機制
五.RDD的Checkpoint(檢查點)機制:容錯機制
六.RDD的依賴關系和Spark任務中的Stage
????1.RDD的依賴關系
????2.Spark任務中的Stage
七.RDD基礎練習

一.RDD基礎

RDD
1.什么是RDD大渤?

????RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集绸罗,是Spark中最基本的數(shù)據(jù)抽象怎虫,它代表一個不可變邑商、可分區(qū)增淹、里面的元素可并行計算的集合椿访。RDD具有數(shù)據(jù)流模型的特點:自動容錯、位置感知性調度和可伸縮性虑润。RDD允許用戶在執(zhí)行多個查詢時顯式地將工作集緩存在內存中成玫,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度拳喻。

2.RDD的屬性(源碼中的一段話)

(a)是一組分區(qū)
????理解:RDD是由分區(qū)組成的哭当,每個分區(qū)運行在不同的Worker上,通過這種方式舞蔽,實現(xiàn)分布式計算荣病。

(b)split理解為分區(qū)
????在RDD中,有一系列函數(shù)渗柿,用于處理計算每個分區(qū)中的數(shù)據(jù)个盆。這里把函數(shù)叫做算子脖岛。

(c)RDD之間存在依賴關系。窄依賴颊亮,寬依賴
????需要用依賴關系來劃分Stage柴梆,任務是按照Stage來執(zhí)行的。

(d)可以自動以分區(qū)規(guī)則來創(chuàng)建RDD
????創(chuàng)建RDD時终惑,可以指定分區(qū)绍在,也可以自定義分區(qū)規(guī)則。

(e)優(yōu)先選擇離文件位置近的節(jié)點來執(zhí)行任務

3.RDD的創(chuàng)建方式

(1)通過外部的數(shù)據(jù)文件創(chuàng)建雹有,如HDFS

val rdd1 = sc.textFile(“hdfs://192.168.1.121:9000/data/data.txt”)

(2)通過sc.parallelize進行創(chuàng)建

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
4.RDD的類型:

????TransformationAction

5.RDD的基本原理
RDD的基本原理

二.Transformation

????RDD中的所有轉換都是延遲加載的偿渡,也就是說,它們并不會直接計算結果霸奕。相反的溜宽,它們只是記住這些應用到基礎數(shù)據(jù)集(例如一個文件)上的轉換動作。只有當發(fā)生一個要求返回結果給Driver的動作時质帅,這些轉換才會真正運行适揉。這種設計讓Spark更加有效率地運行。

轉換 含義
map(func) 返回一個新的RDD煤惩,該RDD由每一個輸入元素經過func函數(shù)轉換后組成
filter(func) 返回一個新的RDD嫉嘀,該RDD由經過func函數(shù)計算后返回值為true的輸入元素組成
flatMap(func) 類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列魄揉,而不是單一元素)
mapPartitions(func) 類似于map剪侮,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時什猖,func的函數(shù)類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似于mapPartitions票彪,但func帶有一個整數(shù)參數(shù)表示分片的索引值红淡,因此在類型為T的RDD上運行時不狮,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據(jù)fraction指定的比例對數(shù)據(jù)進行采樣,可以選擇是否使用隨機數(shù)進行替換在旱,seed用于指定隨機數(shù)生成器種子
union(otherDataset) 對源RDD和參數(shù)RDD求并集后返回一個新的RDD
intersection(otherDataset) 對源RDD和參數(shù)RDD求交集后返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重后返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上調用摇零,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD桶蝎,使用指定的reduce函數(shù)驻仅,將相同key的值聚合到一起,與groupByKey類似登渣,reduce任務的個數(shù)可以通過第二個可選的參數(shù)來設置
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上調用噪服,K必須實現(xiàn)Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似胜茧,但是更靈活
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調用粘优,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調用仇味,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
1.舉例:

(1)創(chuàng)建一個RDD,使用List

val rdd1 = sc.parallelize(List(1,2,3,4,5,8,34,100,79))

val rdd2 = rdd1.map(_*2)

rdd2.collect

rdd2.sortBy(x=>x,true)

rdd2.sortBy(x=>x,true).collect

rdd2.sortBy(x=>x,false).collect

rdd2.filter(_>20).collect

(2)創(chuàng)建一個RDD雹顺,字符串

val rdd4 = sc.parallelize(Array("a b c","d e f","x y z"))

val rdd5 = rdd4.flatMap(_.split(" "))

rdd5.collect

(3)集合操作

val rdd6 = sc.parallelize(List(5,6,7,8,9,10))

val rdd7 = sc.parallelize(List(1,2,3,4,5,6))

val rdd8 = rdd6.union(rdd7)

rdd8.collect

rdd8.distinct.collect

val rdd9 = rdd6.intersection(rdd7)

rdd9.collect

(4)分組操作:reduceByKey groupByKey

val rdd1 = sc.parallelize(List(("Tom",1000),("Jerry",3000),("Mary",2000)))

val rdd2 = sc.parallelize(List(("Jerry",1000),("Tom",3000),("Mike",2000)))

val rdd3 = rdd1 union rdd2

rdd3.collect

val rdd4 = rdd3.groupByKey

rdd4.collect

rdd3.reduceByKey(_+_).collect

注意:使用分組函數(shù)時丹墨,不推薦使用groupByKey,因為性能不好嬉愧,官方推薦reduceByKey

(5)cogroup操作

val rdd1 = sc.parallelize(List(("Tom",1),("Tom",2),("jerry",1),("Mike",2)))

val rdd2 = sc.parallelize(List(("jerry",2),("Tom",1),("Bob",2)))

val rdd3 = rdd1.cogroup(rdd2)

rdd3.collect

(6)reduce操作(reduce是一個Action)

val rdd1 = sc.parallelize(List(1,2,3,4,5))

val rdd2 = rdd1.reduce(_+_)

(7)需求:按value排序贩挣,SortByKey按照key排序。
做法:把Key Value交換位置没酣,并且交換兩次王财。
(a).第一步交換,把key value交換裕便,然后調用sortByKey
(b).調換位置

val rdd1 = sc.parallelize(List(("tom",1),("jerry",1),("kitty",2),("bob",1)))

val rdd2 = sc.parallelize(List(("jerry",2),("tom",3),("kitty",5),("bob",2)))

val rdd3 = rdd1 union(rdd2)

val rdd4 = rdd3.reduceByKey(_+_)

rdd4.collect

val rdd5 = rdd4.map(t => (t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))

rdd5.collect

三.Action

動??????????????????????????????????????????????????????????????作 含義
reduce(func) 通過func函數(shù)聚集RDD中的所有元素搪搏,這個功能必須是課交換且可并聯(lián)的
collect() 在驅動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素
count() 返回RDD的元素個數(shù)
first() 返回RDD的第一個元素(類似于take(1))
take(n) 返回一個由數(shù)據(jù)集的前n個元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個數(shù)組闪金,該數(shù)組由從數(shù)據(jù)集中隨機采樣的num個元素組成疯溺,可以選擇是否用隨機數(shù)替換不足的部分,seed用于指定隨機數(shù)生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng)哎垦,對于每個元素囱嫩,Spark將會調用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下漏设,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)
saveAsObjectFile(path)
countByKey() 針對(K,V)類型的RDD墨闲,返回一個(K,Int)的map,表示每一個key對應的元素個數(shù)
foreach(func) 在數(shù)據(jù)集的每一個元素上郑口,運行函數(shù)func進行更新

四.RDD的緩存機制

????RDD通過persist方法或cache方法可以將前面的計算結果緩存鸳碧,但是并不是這兩個方法被調用時立即緩存,而是觸發(fā)后面的action時犬性,該RDD將會被緩存在計算節(jié)點的內存中瞻离,并供后面重用。

????通過查看源碼發(fā)現(xiàn)cache最終也是調用了persist方法乒裆,默認的存儲級別都是僅在內存存儲一份套利,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的鹤耍。

????緩存有可能丟失肉迫,或者存儲存儲于內存的數(shù)據(jù)由于內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執(zhí)行稿黄。通過基于RDD的一系列轉換喊衫,丟失的數(shù)據(jù)會被重算,由于RDD的各個Partition是相對獨立的杆怕,因此只需要計算丟失的部分即可族购,并不需要重算全部Partition鼻听。
(1)Demo示例:

(2)通過UI進行監(jiān)控:

五.RDD的Checkpoint(檢查點)機制:容錯機制

????檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統(tǒng))做容錯的輔助,lineage過長會造成容錯成本過高联四,這樣就不如在中間階段做檢查點容錯撑碴,如果之后有節(jié)點出現(xiàn)問題而丟失分區(qū),從做檢查點的RDD開始重做Lineage朝墩,就會減少開銷醉拓。

????設置checkpoint的目錄,可以是本地的文件夾收苏、也可以是HDFS亿卤。一般是在具有容錯能力,高可靠的文件系統(tǒng)上(比如HDFS, S3等)設置一個檢查點路徑鹿霸,用于保存檢查點數(shù)據(jù)排吴。
????分別舉例說明:
(1)本地目錄
注意:這種模式,需要將spark-shell運行在本地模式

(2)HDFS的目錄

注意:這種模式懦鼠,需要將spark-shell運行在集群模式

(3)源碼中的一段話

六.RDD的依賴關系和Spark任務中的Stage

1.RDD的依賴關系

????RDD和它依賴的父RDD(s)的關系有兩種不同的類型钻哩,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

(1)窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用

總結:窄依賴我們形象的比喻為獨生子女

(2)寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition

總結:窄依賴我們形象的比喻為超生

2.Spark任務中的Stage

????DAG(Directed Acyclic Graph)叫做有向無環(huán)圖肛冶,原始的RDD通過一系列的轉換就就形成了DAG街氢,根據(jù)RDD之間的依賴關系的不同將DAG劃分成不同的Stage,對于窄依賴睦袖,partition的轉換處理在Stage中完成計算珊肃。對于寬依賴,由于有Shuffle的存在馅笙,只能在parent RDD處理完成后伦乔,才能開始接下來的計算,因此寬依賴是劃分Stage的依據(jù)

七.RDD基礎練習

(1)練習1:

//通過并行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//對rdd1里的每一個元素乘2然后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//過濾出大于等于十的元素
val rdd3 = rdd2.filter(_ >= 10)
//將元素以數(shù)組的方式在客戶端顯示
rdd3.collect

(2)練習2:

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//將rdd1里面的每一個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect

(3)練習3:

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求并集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect

(4)練習4:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求并集
val rdd4 = rdd1 union rdd2
//按key進行分組
rdd4.groupByKey
rdd4.collect

(5)練習5:

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup與groupByKey的區(qū)別
rdd3.collect

(6)練習6:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect

(7)練習7:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末董习,一起剝皮案震驚了整個濱河市烈和,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌阱飘,老刑警劉巖斥杜,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件虱颗,死亡現(xiàn)場離奇詭異沥匈,居然都是意外死亡,警方通過查閱死者的電腦和手機忘渔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門高帖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人畦粮,你說我怎么就攤上這事散址」哉螅” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵预麸,是天一觀的道長瞪浸。 經常有香客問我,道長吏祸,這世上最難降的妖魔是什么对蒲? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮贡翘,結果婚禮上蹈矮,老公的妹妹穿的比我還像新娘。我一直安慰自己鸣驱,他們只是感情好泛鸟,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著踊东,像睡著了一般北滥。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上闸翅,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天碑韵,我揣著相機與錄音,去河邊找鬼缎脾。 笑死祝闻,一個胖子當著我的面吹牛,可吹牛的內容都是我干的遗菠。 我是一名探鬼主播联喘,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼辙纬!你這毒婦竟也來了豁遭?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤贺拣,失蹤者是張志新(化名)和其女友劉穎蓖谢,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體譬涡,經...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡闪幽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了涡匀。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盯腌。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖陨瘩,靈堂內的尸體忽然破棺而出腕够,到底是詐尸還是另有隱情级乍,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布帚湘,位于F島的核電站玫荣,受9級特大地震影響,放射性物質發(fā)生泄漏大诸。R本人自食惡果不足惜崇决,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望底挫。 院中可真熱鬧恒傻,春花似錦、人聲如沸建邓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽官边。三九已至沸手,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間注簿,已是汗流浹背契吉。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留诡渴,地道東北人捐晶。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像妄辩,于是被迫代替她去往敵國和親惑灵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355