RDD與算子

什么是RDD

  • RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集镇辉,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)习劫、里面的元素可并行計(jì)算的集合。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動容錯(cuò)嚼隘、位置感知性調(diào)度和可伸縮性诽里。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢能夠重用工作集飞蛹,這極大地提升了查詢速度谤狡。

什么是DataFrame

  • DataFrame引入了schema和off-heap
    schema : RDD每一行的數(shù)據(jù), 結(jié)構(gòu)都是一樣的灸眼,這個(gè)結(jié)構(gòu)就存儲在schema中。 Spark通過schema就能夠讀懂?dāng)?shù)據(jù), 因此在通信和IO時(shí)就只需要序列化和反序列化數(shù)據(jù), 而結(jié)構(gòu)的部分就可以省略了墓懂。

什么是DataSet

  • DataSet結(jié)合了RDD和DataFrame的優(yōu)點(diǎn)焰宣,并帶來的一個(gè)新的概念Encoder。
    當(dāng)序列化數(shù)據(jù)時(shí)捕仔,Encoder產(chǎn)生字節(jié)碼與off-heap進(jìn)行交互匕积,能夠達(dá)到按需訪問數(shù)據(jù)的效果,而不用反序列化整個(gè)對象榜跌。Spark還沒有提供自定義Encoder的API闪唆,但是未來會加入。


    三者之間轉(zhuǎn)換

RDD的寬依賴和窄依賴

  • 由于RDD是粗粒度的操作數(shù)據(jù)集斜做,每個(gè)Transformation操作都會生成一個(gè)新的RDD苞氮,所以RDD之間就會形成類似流水線的前后依賴關(guān)系;RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型瓤逼,即窄依賴(narrow dependency)和寬依賴(wide dependency)笼吟。如圖所示顯示了RDD之間的依賴關(guān)系。
  • 窄依賴:是指每個(gè)父RDD的一個(gè)Partition最多被子RDD的一個(gè)Partition所使用霸旗,例如map贷帮、filter、union等操作都會產(chǎn)生窄依賴诱告;(獨(dú)生子女)

  • 寬依賴:是指一個(gè)父RDD的Partition會被多個(gè)子RDD的Partition所使用撵枢,例如groupByKey、reduceByKey精居、sortByKey等操作都會產(chǎn)生寬依賴锄禽;(超生)

依賴關(guān)系流下面的視圖

在spark中,會根據(jù)RDD之間的依賴關(guān)系將DAG圖(有向無環(huán)圖)劃分為不同的階段靴姿,對于窄依賴沃但,由于partition依賴關(guān)系的確定性,partition的轉(zhuǎn)換處理就可以在同一個(gè)線程里完成佛吓,窄依賴就被spark劃分到同一個(gè)stage中宵晚,而對于寬依賴,只能等父RDD shuffle處理完成后维雇,下一個(gè)stage才能開始接下來的計(jì)算淤刃。

因此spark劃分stage的整體思路是:從后往前推,遇到寬依賴就斷開吱型,劃分為一個(gè)stage逸贾;遇到窄依賴就將這個(gè)RDD加入該stage中。因此在圖2中RDD C,RDD D,RDD E,RDDF被構(gòu)建在一個(gè)stage中,RDD A被構(gòu)建在一個(gè)單獨(dú)的Stage中,而RDD B和RDD G又被構(gòu)建在同一個(gè)stage中。

在spark中耕陷,Task的類型分為2種:ShuffleMapTask和ResultTask掂名;

簡單來說,DAG的最后一個(gè)階段會為每個(gè)結(jié)果的partition生成一個(gè)ResultTask哟沫,即每個(gè)Stage里面的Task的數(shù)量是由該Stage中最后一個(gè)RDD的Partition的數(shù)量所決定的!而其余所有階段都會生成ShuffleMapTask锌介;之所以稱之為ShuffleMapTask是因?yàn)樗枰獙⒆约旱挠?jì)算結(jié)果通過shuffle到下一個(gè)stage中嗜诀;也就是說上圖中的stage1和stage2相當(dāng)于mapreduce中的Mapper,而ResultTask所代表的stage3就相當(dāng)于mapreduce中的reducer。

在之前動手操作了一個(gè)wordcount程序孔祸,因此可知隆敢,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不過區(qū)別在于:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根據(jù)Key進(jìn)行reduce崔慧,但spark除了這兩個(gè)算子還有其他的算子拂蝎;因此從這個(gè)意義上來說,Spark比Hadoop的計(jì)算算子更為豐富惶室。

RDD編程API(Transformation和Action)

  • Transformation(轉(zhuǎn)換)

轉(zhuǎn)換 含義
map(func) 返回一個(gè)新的RDD温自,該RDD由每一個(gè)輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
filter(func) 返回一個(gè)新的RDD,該RDD由經(jīng)過func函數(shù)計(jì)算后返回值為true的輸入元素組成
flatMap(func) 類似于map皇钞,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列悼泌,而不是單一元素)
mapPartitions(func) 類似于map,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行夹界,因此在類型為T的RDD上運(yùn)行時(shí)馆里,func的函數(shù)類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似于mapPartitions,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值可柿,因此在類型為T的RDD上運(yùn)行時(shí)鸠踪,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據(jù)fraction指定的比例對數(shù)據(jù)進(jìn)行采樣,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換复斥,seed用于指定隨機(jī)數(shù)生成器種子
union(otherDataset) 對源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD
intersection(otherDataset) 對源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD
distinct([numTasks])) 對源RDD進(jìn)行去重后返回一個(gè)新的RDD
groupByKey([numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用营密,返回一個(gè)(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K,V)的RDD永票,使用指定的reduce函數(shù)卵贱,將相同key的值聚合到一起,與groupByKey類似侣集,reduce任務(wù)的個(gè)數(shù)可以通過第二個(gè)可選的參數(shù)來設(shè)置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分區(qū)聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對k/y的RDD進(jìn)行操作
sortByKey([ascending], [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用键俱,K必須實(shí)現(xiàn)Ordered接口,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似世分,但是更靈活 第一個(gè)參數(shù)是根據(jù)什么排序 第二個(gè)是怎么排序 false倒序 第三個(gè)排序后分區(qū)數(shù) 默認(rèn)與原RDD一樣
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用编振,返回一個(gè)相同key對應(yīng)的所有元素對在一起的(K,(V,W))的RDD 相當(dāng)于內(nèi)連接(求交集)
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類型的RDD
cartesian(otherDataset) 兩個(gè)RDD的笛卡爾積 的成很多個(gè)K/V
pipe(command, [envVars]) 調(diào)用外部程序
coalesce(numPartitions) 重新分區(qū) 第一個(gè)參數(shù)是要分多少區(qū),第二個(gè)參數(shù)是否shuffle 默認(rèn)false 少分區(qū)變多分區(qū) true 多分區(qū)變少分區(qū) false
repartition(numPartitions) 重新分區(qū) 必須shuffle 參數(shù)是要分多少區(qū) 少變多
repartitionAndSortWithinPartitions(partitioner) 重新分區(qū)+排序 比先分區(qū)再排序效率高 對K/V的RDD進(jìn)行操作
foldByKey(zeroValue)(seqOp) 該函數(shù)用于K/V做折疊踪央,合并處理 臀玄,與aggregate類似 第一個(gè)括號的參數(shù)應(yīng)用于每個(gè)V值 第二括號函數(shù)是聚合例如:+
combineByKey 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
partitionBy(partitioner) 對RDD進(jìn)行分區(qū) partitioner是分區(qū)器 例如new HashPartition(2
cache persist RDD緩存,可以避免重復(fù)計(jì)算從而減少時(shí)間畅蹂,區(qū)別:cache內(nèi)部調(diào)用了persist算子健无,cache默認(rèn)就一個(gè)緩存級別MEMORY-ONLY ,而persist則可以選擇緩存級別
Subtract(rdd) 返回前rdd元素不在后rdd的rdd
leftOuterJoin leftOuterJoin類似于SQL中的左外關(guān)聯(lián)left outer join液斜,返回結(jié)果以前面的RDD為主累贤,關(guān)聯(lián)不上的記錄為空。只能用于兩個(gè)RDD之間的關(guān)聯(lián)少漆,如果要多個(gè)RDD關(guān)聯(lián)臼膏,多關(guān)聯(lián)幾次即可。
rightOuterJoin rightOuterJoin類似于SQL中的有外關(guān)聯(lián)right outer join示损,返回結(jié)果以參數(shù)中的RDD為主渗磅,關(guān)聯(lián)不上的記錄為空。只能用于兩個(gè)RDD之間的關(guān)聯(lián)检访,如果要多個(gè)RDD關(guān)聯(lián)始鱼,多關(guān)聯(lián)幾次即可
subtractByKey substractByKey和基本轉(zhuǎn)換操作中的subtract類似只不過這里是針對K的,返回在主RDD中出現(xiàn)烛谊,并且不在otherRDD中出現(xiàn)的元素
  • Action(動作)

動作 含義
reduce(func) 通過func函數(shù)聚集RDD中的所有元素风响,這個(gè)功能必須是課交換且可并聯(lián)的
collect() 在驅(qū)動程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
count() 返回RDD的元素個(gè)數(shù)
first() 返回RDD的第一個(gè)元素(類似于take(1))
take(n) 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個(gè)數(shù)組丹禀,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成状勤,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子
takeOrdered(n, [ordering]) *
saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng)双泪,對于每個(gè)元素持搜,Spark將會調(diào)用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下焙矛,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)葫盼。
saveAsObjectFile(path) *
countByKey() 針對(K,V)類型的RDD,返回一個(gè)(K,Int)的map村斟,表示每一個(gè)key對應(yīng)的元素個(gè)數(shù)贫导。
foreach(func) 在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新蟆盹。
aggregate 先對分區(qū)進(jìn)行操作孩灯,在總體操作
reduceByKeyLocally *
lookup *
top *
fold *
foreachPartition *

WordCount代碼編寫

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCountWithScala {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    /**
      * 如果這個(gè)參數(shù)不設(shè)置,默認(rèn)認(rèn)為你運(yùn)行的是集群模式
      * 如果設(shè)置成local代表運(yùn)行的是local模式
      */
    conf.setMaster("local")
    //設(shè)置任務(wù)名
    conf.setAppName("WordCount")
    //創(chuàng)建SparkCore的程序入口
    val sc = new SparkContext(conf)
    //讀取文件 生成RDD
    val file: RDD[String] = sc.textFile("E:\\hello.txt")
    //把每一行數(shù)據(jù)按照逾滥,分割
    val word: RDD[String] = file.flatMap(_.split(","))
    //讓每一個(gè)單詞都出現(xiàn)一次
    val wordOne: RDD[(String, Int)] = word.map((_,1))
    //單詞計(jì)數(shù)
    val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)
    //按照單詞出現(xiàn)的次數(shù) 降序排序
    val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2,false)
    //將最終的結(jié)果進(jìn)行保存
    sortRdd.saveAsTextFile("E:\\result")

    sc.stop()
  }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末峰档,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌讥巡,老刑警劉巖掀亩,帶你破解...
    沈念sama閱讀 211,348評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異欢顷,居然都是意外死亡槽棍,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評論 2 385
  • 文/潘曉璐 我一進(jìn)店門抬驴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刹泄,“玉大人,你說我怎么就攤上這事怎爵。” “怎么了盅蝗?”我有些...
    開封第一講書人閱讀 156,936評論 0 347
  • 文/不壞的土叔 我叫張陵鳖链,是天一觀的道長。 經(jīng)常有香客問我墩莫,道長芙委,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,427評論 1 283
  • 正文 為了忘掉前任狂秦,我火速辦了婚禮灌侣,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘裂问。我一直安慰自己侧啼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,467評論 6 385
  • 文/花漫 我一把揭開白布堪簿。 她就那樣靜靜地躺著痊乾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪椭更。 梳的紋絲不亂的頭發(fā)上哪审,一...
    開封第一講書人閱讀 49,785評論 1 290
  • 那天,我揣著相機(jī)與錄音虑瀑,去河邊找鬼湿滓。 笑死,一個(gè)胖子當(dāng)著我的面吹牛舌狗,可吹牛的內(nèi)容都是我干的叽奥。 我是一名探鬼主播,決...
    沈念sama閱讀 38,931評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼把夸,長吁一口氣:“原來是場噩夢啊……” “哼而线!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,696評論 0 266
  • 序言:老撾萬榮一對情侶失蹤膀篮,失蹤者是張志新(化名)和其女友劉穎嘹狞,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體誓竿,經(jīng)...
    沈念sama閱讀 44,141評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡磅网,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,483評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了筷屡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涧偷。...
    茶點(diǎn)故事閱讀 38,625評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖毙死,靈堂內(nèi)的尸體忽然破棺而出燎潮,到底是詐尸還是另有隱情,我是刑警寧澤扼倘,帶...
    沈念sama閱讀 34,291評論 4 329
  • 正文 年R本政府宣布确封,位于F島的核電站,受9級特大地震影響再菊,放射性物質(zhì)發(fā)生泄漏爪喘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,892評論 3 312
  • 文/蒙蒙 一纠拔、第九天 我趴在偏房一處隱蔽的房頂上張望秉剑。 院中可真熱鬧,春花似錦稠诲、人聲如沸侦鹏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽种柑。三九已至,卻和暖如春匹耕,著一層夾襖步出監(jiān)牢的瞬間聚请,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工稳其, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留驶赏,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓既鞠,卻偏偏與公主長得像煤傍,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子嘱蛋,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,492評論 2 348