SparkCore(一)(RDD和一些算子)

什么是RDD

RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集拇派,是Spark中最基本的數(shù)據(jù)抽象。代碼中是一個抽象類凿跳,它代表一個不可變件豌、可分區(qū)、里面的元素可并行計算的集合控嗜。

  • 一組分區(qū)(Partition)茧彤,即數(shù)據(jù)集的基本組成單位;

  • 一個計算每個分區(qū)的函數(shù);

  • RDD之間的依賴關(guān)系;

  • 一個Partitioner,即RDD的分片函數(shù);

  • 一個列表疆栏,存儲存取每個Partition的優(yōu)先位置(preferred location)曾掂。

RDD創(chuàng)建的方法

  • 從集合中創(chuàng)建 并行度一般為2
##makerdd或parallise都是根據(jù)totalcpucores和2比較最大值
##如果直接覆蓋makerdd或parallise的第二個分區(qū)個數(shù)的參數(shù)可以改變數(shù)量
 override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

#查看源碼所得
sc.parallelize

#makeRDD實際上是在內(nèi)部創(chuàng)建了一個parallelize
sc.makeRDD

  • 從文件中轉(zhuǎn)換
#從文件轉(zhuǎn)換RDD
sc.textFile

#從文件夾拉取多個文件
sc.wholeTextFiles("data/baseinput/ratings100/")

  • textFile在讀取小文件的時候,會參考小文件的個數(shù)壁顶,文件個數(shù)越多珠洗,分區(qū)個數(shù)越多

  • sc.textFile遇到小文件沒有辦法很好合并小文件的,即便重寫第二個參數(shù)也沒有作用

  • 用textFile時博助,它的partition的數(shù)量是與文件夾下的文件數(shù)量(實例中用3個xxx.log文件)相關(guān)险污,一個文件就是一個partition(既然3個文件就是:partition=3)。

  • wholeTextFiles的partition數(shù)量是根據(jù)用戶指定或者文件大小來(文件內(nèi)的數(shù)據(jù)量少 有hdfs源碼默認確定的)

  • 確定與hdfs目錄下的文件數(shù)量無關(guān)富岳!所以說:wholeTextFile通常用于讀取許多小文件的需求蛔糯。

查看RDD分區(qū)的shell命令

#從集合中創(chuàng)建
sc.parallelize(Seq(1,2,3,4))

#查看分區(qū)數(shù)量(并行數(shù)量)
res3.getNumPartitions

#查看分區(qū)并行數(shù)量的內(nèi)容
#將每一個分區(qū)形成一個數(shù)組,形成新的RDD類型時RDD[Array[T]]

res3.glom.collect

#查看分區(qū)數(shù)量(并行數(shù)量)
res3.partitions.length

關(guān)于DRR分區(qū)決定因素

  • 第一點:RDD分區(qū)的原則是使得分區(qū)的個數(shù)盡量等于集群中的CPU核心(core)數(shù)目窖式,這樣可以充分利用CPU的計算資源蚁飒;

  • 第二點:在實際中為了更加充分的壓榨CPU的計算資源,會把并行度設(shè)置為cpu核數(shù)的2~3倍萝喘;

  • 第三點:RDD分區(qū)數(shù)和啟動時指定的核數(shù)淮逻、調(diào)用方法時指定的分區(qū)數(shù)、如文件本身分區(qū)數(shù)有關(guān)

partitionBy 改變分區(qū)

解析:

  • 對RDD進行分區(qū)操作阁簸,如果原有的partionRDD和現(xiàn)有的partionRDD是一致的話就不進行分區(qū)爬早, 否則會生成ShuffleRDD.
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24

scala> rdd.partitions.size
res24: Int = 4

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26

scala> var rdd3 = rdd.partitionBy(new org.apache.spark.RangePartitioner(2,rdd))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[40] at partitionBy at <console>:25

scala> rdd2.partitions.size
res25: Int = 2
scala> rdd2.glom.collect
res26: Array[Array[(Int, String)]] = Array(Array((2,bbb), (4,ddd)), Array((1,aaa), (3,ccc)))
scala> rdd3.glom.collect
res27: Array[Array[(Int, String)]] = Array(Array((1,aaa), (2,bbb)), Array((3,ccc), (4,ddd)))

注意:Spark采用的分區(qū)有三種:

  • 水平分區(qū),也就是sc.makerdd按照下標元素劃分启妹,

  • Hash劃分根據(jù)數(shù)據(jù)確定性劃分到某個分區(qū)筛严,一般只給定分區(qū)數(shù)。

  • Range分區(qū)該方法一般按照元素大小進行劃分不同區(qū)域饶米,每個分區(qū)表示一個數(shù)據(jù)區(qū)域桨啃,如數(shù)組中每個數(shù)是[0,100]之間的隨機數(shù)车胡,Range劃分首先將區(qū)域劃分為10份,然后將數(shù)組中每個數(shù)字分發(fā)到不同的分區(qū)照瘾,比如將18分到(10,20]的分區(qū)匈棘,最后對每個分區(qū)進行排序。

RDD編程

在Spark中析命,RDD被表示為對象主卫,通過對象上的方法調(diào)用來對RDD進行轉(zhuǎn)換。經(jīng)過一系列的transformations定義RDD之后碳却,就可以調(diào)用actions觸發(fā)RDD的計算队秩,action可以是向應(yīng)用程序返回結(jié)果(count, collect等),或者是向存儲系統(tǒng)保存數(shù)據(jù)(saveAsTextFile等)昼浦。在Spark中馍资,只有遇到action,才會執(zhí)行RDD的計算(即延遲計算)关噪,這樣在運行時可以通過管道的方式傳輸多個轉(zhuǎn)換鸟蟹。

要使用Spark,開發(fā)者需要編寫一個Driver程序,它被提交到集群以調(diào)度運行Worker,如下圖所示磷醋。Driver中定義了一個或多個RDD宗侦,并調(diào)用RDD上的action蒸苇,Worker則執(zhí)行RDD分區(qū)計算任務(wù)。

RDD的轉(zhuǎn)化 ( 重點掌握 )

RDD整體上分為 TRANSFORMATIONS 跟 ACTIONS 兩種

Value類型

map(func) 重點

將RDD創(chuàng)建的集合轉(zhuǎn)換為另外一個映射集合,例如,如果將一個Array中的數(shù)全部 *2 輸出镐依,那么就會用到map方法。例如

//創(chuàng)建一個array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25


//集合內(nèi)每個元素*2
scala> res0.map(_*2)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


//打印輸出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)        
mapPartitions(func)

類似于map天试,但獨立地在RDD的每一個分片上運行槐壳,因此在類型為T的RDD上運行時,func的函數(shù)類型必須是Iterator[T] => Iterator[U]喜每。假設(shè)有N個元素务唐,有M個分區(qū),那么map的函數(shù)的將被調(diào)用N次,而mapPartitions被調(diào)用M次,一個函數(shù)一次處理所有分區(qū)带兜。同樣以上述的需求為例:

//創(chuàng)建一個array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25


//集合內(nèi)每個元素*2
scala> res0.mapPartitions(x=>x.map(_*2))
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


//打印輸出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)       
mapPartitionsWithIndex(func)

類似于mapPartitions枫笛,但func帶有一個整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運行時刚照,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]刑巧;

glom

將每一個分區(qū)形成一個數(shù)組,形成新的RDD類型時RDD[Array[T]]

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
groupBy(func) 重點

分組,按照傳入函數(shù)的返回值進行分組海诲。將相同的key對應(yīng)的值放入一個迭代器。

scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26

scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

上述例子解釋是創(chuàng)建一個1到4的序列檩互,然后把能被2整除的放進一個元祖中特幔,不能被2整除的放入另外一個元祖中。那么分組的條件就是%2

filter(func) 重點

過濾闸昨。返回一個新的RDD蚯斯,該RDD由經(jīng)過func函數(shù)計算后返回值為true的輸入元素組成。比如創(chuàng)建一個RDD(由字符串組成)饵较,過濾出一個新RDD(包含”xiao”子串)

scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)

scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26

scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

sortBy(func,[ascending], [numTasks]) 重點

使用func先對數(shù)據(jù)進行處理拍嵌,按照處理后的數(shù)據(jù)比較結(jié)果排序,默認為正序循诉。

//創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

//按照自身大小排序
scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)

//按照與3余數(shù)的大小排序
scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)

Key-Value類型

partitionBy

pairRDD進行分區(qū)操作横辆,如果原有的partionRDD和現(xiàn)有的partionRDD是一致的話就不進行分區(qū), 否則會生成ShuffleRDD茄猫,即會產(chǎn)生shuffle過程狈蚤。

groupByKey

作用:groupByKey也是對每個key進行操作,但只生成一個sequence划纽。

//創(chuàng)建一個pairRDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

//將相同key對應(yīng)值聚合到一個sequence中
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28

//打印結(jié)果
scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

//計算相同key對應(yīng)值的相加結(jié)果
scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31

//打印結(jié)果
scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調(diào)用脆侮,返回一個(K,V)的RDD,使用指定的reduce函數(shù)勇劣,將相同key的值聚合到一起靖避,reduce任務(wù)的個數(shù)可以通過第二個可選的參數(shù)來設(shè)置。

//創(chuàng)建一個pairRDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24

//算相同key對應(yīng)值的相加結(jié)果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26

//打印結(jié)果
scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))

reduceByKey和groupByKey的區(qū)別

1.reduceByKey:按照key進行聚合比默,在shuffle之前有combine(預(yù)聚合)操作幻捏,返回結(jié)果是RDD[k,v].

2.groupByKey:按照key進行分組,直接進行shuffle退敦。

aggregateByKey

在kv對的RDD中粘咖,,按key將value進行分組合并侈百,合并時瓮下,將每個value和初始值作為seq函數(shù)的參數(shù),進行計算钝域,返回的結(jié)果作為一個新的kv對讽坏,然后再將結(jié)果按照key進行合并,最后將每個分組的value傳遞給combine函數(shù)進行計算(先將前兩個value進行計算例证,將返回結(jié)果和下一個value傳給combine函數(shù)路呜,以此類推),將key與計算結(jié)果作為一個新的kv對輸出。

(1)zeroValue:給每一個分區(qū)中的每一個key一個初始值胀葱;

(2)seqOp:函數(shù)用于在每一個分區(qū)中用初始值逐步迭代value漠秋;

(3)combOp:函數(shù)用于合并每個分區(qū)中的結(jié)果。

//創(chuàng)建一個pairRDD
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

//取出每個分區(qū)相同key對應(yīng)值的最大值抵屿,然后相加
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26

//打印結(jié)果
scala> agg.collect()
res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
reduceByKey和groupByKey的區(qū)別
  1. reduceByKey:按照key進行聚合庆锦,在shuffle之前有combine(預(yù)聚合)操作,返回結(jié)果是RDD[k,v].
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
  1. groupByKey:按照key進行分組轧葛,直接進行shuffle搂抒。
val createCombiner = (v: V) => CompactBuffer(v) ,它把一個V變成一個C(例    如尿扯,創(chuàng)建一個單元素列表)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v 求晶,將一個V合并到一個C中(例如,將它添加到列表的末尾)
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 衷笋,將兩個C合并成一個C芳杏。
  1. 開發(fā)指導(dǎo):reduceByKey比groupByKey,建議使用右莱。但是需要注意是否會影響業(yè)務(wù)邏輯蚜锨。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市慢蜓,隨后出現(xiàn)的幾起案子亚再,更是在濱河造成了極大的恐慌,老刑警劉巖晨抡,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件氛悬,死亡現(xiàn)場離奇詭異,居然都是意外死亡耘柱,警方通過查閱死者的電腦和手機如捅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來调煎,“玉大人镜遣,你說我怎么就攤上這事∈堪溃” “怎么了悲关?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長娄柳。 經(jīng)常有香客問我寓辱,道長,這世上最難降的妖魔是什么赤拒? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任秫筏,我火速辦了婚禮诱鞠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘这敬。我一直安慰自己航夺,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布崔涂。 她就那樣靜靜地躺著敷存,像睡著了一般。 火紅的嫁衣襯著肌膚如雪堪伍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天觅闽,我揣著相機與錄音帝雇,去河邊找鬼。 笑死蛉拙,一個胖子當(dāng)著我的面吹牛尸闸,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播孕锄,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼吮廉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了畸肆?” 一聲冷哼從身側(cè)響起宦芦,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎轴脐,沒想到半個月后调卑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡大咱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年恬涧,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碴巾。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡溯捆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出厦瓢,到底是詐尸還是另有隱情提揍,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布旷痕,位于F島的核電站碳锈,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏欺抗。R本人自食惡果不足惜售碳,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧贸人,春花似錦间景、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至十拣,卻和暖如春封拧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背夭问。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工泽西, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人缰趋。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓捧杉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親味抖。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容