什么是RDD
RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集拇派,是Spark中最基本的數(shù)據(jù)抽象。代碼中是一個抽象類凿跳,它代表一個不可變件豌、可分區(qū)、里面的元素可并行計算的集合控嗜。
一組分區(qū)(Partition)茧彤,即數(shù)據(jù)集的基本組成單位;
一個計算每個分區(qū)的函數(shù);
RDD之間的依賴關(guān)系;
一個Partitioner,即RDD的分片函數(shù);
一個列表疆栏,存儲存取每個Partition的優(yōu)先位置(preferred location)曾掂。
RDD創(chuàng)建的方法
- 從集合中創(chuàng)建 并行度一般為2
##makerdd或parallise都是根據(jù)totalcpucores和2比較最大值
##如果直接覆蓋makerdd或parallise的第二個分區(qū)個數(shù)的參數(shù)可以改變數(shù)量
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
#查看源碼所得
sc.parallelize
#makeRDD實際上是在內(nèi)部創(chuàng)建了一個parallelize
sc.makeRDD
- 從文件中轉(zhuǎn)換
#從文件轉(zhuǎn)換RDD
sc.textFile
#從文件夾拉取多個文件
sc.wholeTextFiles("data/baseinput/ratings100/")
textFile在讀取小文件的時候,會參考小文件的個數(shù)壁顶,文件個數(shù)越多珠洗,分區(qū)個數(shù)越多
sc.textFile遇到小文件沒有辦法很好合并小文件的,即便重寫第二個參數(shù)也沒有作用
用textFile時博助,它的partition的數(shù)量是與文件夾下的文件數(shù)量(實例中用3個xxx.log文件)相關(guān)险污,一個文件就是一個partition(既然3個文件就是:partition=3)。
wholeTextFiles的partition數(shù)量是根據(jù)用戶指定或者文件大小來(文件內(nèi)的數(shù)據(jù)量少 有hdfs源碼默認確定的)
確定與hdfs目錄下的文件數(shù)量無關(guān)富岳!所以說:wholeTextFile通常用于讀取許多小文件的需求蛔糯。
查看RDD分區(qū)的shell命令
#從集合中創(chuàng)建
sc.parallelize(Seq(1,2,3,4))
#查看分區(qū)數(shù)量(并行數(shù)量)
res3.getNumPartitions
#查看分區(qū)并行數(shù)量的內(nèi)容
#將每一個分區(qū)形成一個數(shù)組,形成新的RDD類型時RDD[Array[T]]
res3.glom.collect
#查看分區(qū)數(shù)量(并行數(shù)量)
res3.partitions.length
關(guān)于DRR分區(qū)決定因素
第一點:RDD分區(qū)的原則是使得分區(qū)的個數(shù)盡量等于集群中的CPU核心(core)數(shù)目窖式,這樣可以充分利用CPU的計算資源蚁飒;
第二點:在實際中為了更加充分的壓榨CPU的計算資源,會把并行度設(shè)置為cpu核數(shù)的2~3倍萝喘;
第三點:RDD分區(qū)數(shù)和啟動時指定的核數(shù)淮逻、調(diào)用方法時指定的分區(qū)數(shù)、如文件本身分區(qū)數(shù)有關(guān)
partitionBy 改變分區(qū)
解析:
- 對RDD進行分區(qū)操作阁簸,如果原有的partionRDD和現(xiàn)有的partionRDD是一致的話就不進行分區(qū)爬早, 否則會生成ShuffleRDD.
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24
scala> rdd.partitions.size
res24: Int = 4
scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26
scala> var rdd3 = rdd.partitionBy(new org.apache.spark.RangePartitioner(2,rdd))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[40] at partitionBy at <console>:25
scala> rdd2.partitions.size
res25: Int = 2
scala> rdd2.glom.collect
res26: Array[Array[(Int, String)]] = Array(Array((2,bbb), (4,ddd)), Array((1,aaa), (3,ccc)))
scala> rdd3.glom.collect
res27: Array[Array[(Int, String)]] = Array(Array((1,aaa), (2,bbb)), Array((3,ccc), (4,ddd)))
注意:Spark采用的分區(qū)有三種:
水平分區(qū),也就是sc.makerdd按照下標元素劃分启妹,
Hash劃分根據(jù)數(shù)據(jù)確定性劃分到某個分區(qū)筛严,一般只給定分區(qū)數(shù)。
Range分區(qū)該方法一般按照元素大小進行劃分不同區(qū)域饶米,每個分區(qū)表示一個數(shù)據(jù)區(qū)域桨啃,如數(shù)組中每個數(shù)是[0,100]之間的隨機數(shù)车胡,Range劃分首先將區(qū)域劃分為10份,然后將數(shù)組中每個數(shù)字分發(fā)到不同的分區(qū)照瘾,比如將18分到(10,20]的分區(qū)匈棘,最后對每個分區(qū)進行排序。
RDD編程
在Spark中析命,RDD被表示為對象主卫,通過對象上的方法調(diào)用來對RDD進行轉(zhuǎn)換。經(jīng)過一系列的transformations定義RDD之后碳却,就可以調(diào)用actions觸發(fā)RDD的計算队秩,action可以是向應(yīng)用程序返回結(jié)果(count, collect等),或者是向存儲系統(tǒng)保存數(shù)據(jù)(saveAsTextFile等)昼浦。在Spark中馍资,只有遇到action,才會執(zhí)行RDD的計算(即延遲計算)关噪,這樣在運行時可以通過管道的方式傳輸多個轉(zhuǎn)換鸟蟹。
要使用Spark,開發(fā)者需要編寫一個Driver程序,它被提交到集群以調(diào)度運行Worker,如下圖所示磷醋。Driver中定義了一個或多個RDD宗侦,并調(diào)用RDD上的action蒸苇,Worker則執(zhí)行RDD分區(qū)計算任務(wù)。
RDD的轉(zhuǎn)化 ( 重點掌握 )
RDD整體上分為 TRANSFORMATIONS 跟 ACTIONS 兩種
Value類型
map(func) 重點
將RDD創(chuàng)建的集合轉(zhuǎn)換為另外一個映射集合,例如,如果將一個Array中的數(shù)全部 *2 輸出镐依,那么就會用到map方法。例如
//創(chuàng)建一個array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
//集合內(nèi)每個元素*2
scala> res0.map(_*2)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
//打印輸出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
mapPartitions(func)
類似于map天试,但獨立地在RDD的每一個分片上運行槐壳,因此在類型為T的RDD上運行時,func的函數(shù)類型必須是Iterator[T] => Iterator[U]喜每。假設(shè)有N個元素务唐,有M個分區(qū),那么map的函數(shù)的將被調(diào)用N次,而mapPartitions被調(diào)用M次,一個函數(shù)一次處理所有分區(qū)带兜。同樣以上述的需求為例:
//創(chuàng)建一個array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
//集合內(nèi)每個元素*2
scala> res0.mapPartitions(x=>x.map(_*2))
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
//打印輸出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
mapPartitionsWithIndex(func)
類似于mapPartitions枫笛,但func帶有一個整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運行時刚照,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]刑巧;
glom
將每一個分區(qū)形成一個數(shù)組,形成新的RDD類型時RDD[Array[T]]
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
groupBy(func) 重點
分組,按照傳入函數(shù)的返回值進行分組海诲。將相同的key對應(yīng)的值放入一個迭代器。
scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))
上述例子解釋是創(chuàng)建一個1到4的序列檩互,然后把能被2整除的放進一個元祖中特幔,不能被2整除的放入另外一個元祖中。那么分組的條件就是%2
filter(func) 重點
過濾闸昨。返回一個新的RDD蚯斯,該RDD由經(jīng)過func函數(shù)計算后返回值為true的輸入元素組成。比如創(chuàng)建一個RDD(由字符串組成)饵较,過濾出一個新RDD(包含”xiao”子串)
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
sortBy(func,[ascending], [numTasks]) 重點
使用func先對數(shù)據(jù)進行處理拍嵌,按照處理后的數(shù)據(jù)比較結(jié)果排序,默認為正序循诉。
//創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
//按照自身大小排序
scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)
//按照與3余數(shù)的大小排序
scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)
Key-Value類型
partitionBy
pairRDD進行分區(qū)操作横辆,如果原有的partionRDD和現(xiàn)有的partionRDD是一致的話就不進行分區(qū), 否則會生成ShuffleRDD茄猫,即會產(chǎn)生shuffle過程狈蚤。
groupByKey
作用:groupByKey也是對每個key進行操作,但只生成一個sequence划纽。
//創(chuàng)建一個pairRDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)
scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26
//將相同key對應(yīng)值聚合到一個sequence中
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28
//打印結(jié)果
scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
//計算相同key對應(yīng)值的相加結(jié)果
scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31
//打印結(jié)果
scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
reduceByKey(func, [numTasks])
在一個(K,V)的RDD上調(diào)用脆侮,返回一個(K,V)的RDD,使用指定的reduce函數(shù)勇劣,將相同key的值聚合到一起靖避,reduce任務(wù)的個數(shù)可以通過第二個可選的參數(shù)來設(shè)置。
//創(chuàng)建一個pairRDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24
//算相同key對應(yīng)值的相加結(jié)果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26
//打印結(jié)果
scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))
reduceByKey和groupByKey的區(qū)別
1.reduceByKey:按照key進行聚合比默,在shuffle之前有combine(預(yù)聚合)操作幻捏,返回結(jié)果是RDD[k,v].
2.groupByKey:按照key進行分組,直接進行shuffle退敦。
aggregateByKey
在kv對的RDD中粘咖,,按key將value進行分組合并侈百,合并時瓮下,將每個value和初始值作為seq函數(shù)的參數(shù),進行計算钝域,返回的結(jié)果作為一個新的kv對讽坏,然后再將結(jié)果按照key進行合并,最后將每個分組的value傳遞給combine函數(shù)進行計算(先將前兩個value進行計算例证,將返回結(jié)果和下一個value傳給combine函數(shù)路呜,以此類推),將key與計算結(jié)果作為一個新的kv對輸出。
(1)zeroValue:給每一個分區(qū)中的每一個key一個初始值胀葱;
(2)seqOp:函數(shù)用于在每一個分區(qū)中用初始值逐步迭代value漠秋;
(3)combOp:函數(shù)用于合并每個分區(qū)中的結(jié)果。
//創(chuàng)建一個pairRDD
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
//取出每個分區(qū)相同key對應(yīng)值的最大值抵屿,然后相加
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26
//打印結(jié)果
scala> agg.collect()
res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
reduceByKey和groupByKey的區(qū)別
- reduceByKey:按照key進行聚合庆锦,在shuffle之前有combine(預(yù)聚合)操作,返回結(jié)果是RDD[k,v].
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
- groupByKey:按照key進行分組轧葛,直接進行shuffle搂抒。
val createCombiner = (v: V) => CompactBuffer(v) ,它把一個V變成一個C(例 如尿扯,創(chuàng)建一個單元素列表)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v 求晶,將一個V合并到一個C中(例如,將它添加到列表的末尾)
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 衷笋,將兩個C合并成一個C芳杏。
- 開發(fā)指導(dǎo):reduceByKey比groupByKey,建議使用右莱。但是需要注意是否會影響業(yè)務(wù)邏輯蚜锨。