1 RDD編程
1.1 Action算子
1.1.1 reduce(func)
- 作用:通過func函數(shù)聚集RDD中的所有元素棋傍,先聚合分區(qū)內(nèi)數(shù)據(jù)雀彼,再聚合分區(qū)間數(shù)據(jù)滋将。
- 需求:創(chuàng)建一個RDD捏悬,將所有元素聚合得到結(jié)果屑彻。
(1)創(chuàng)建一個RDD[Int]
scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24
(2)聚合RDD[Int]所有元素
scala> rdd1.reduce(_+_)
res50: Int = 55
(3)創(chuàng)建一個RDD[String]
scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24
(4)聚合RDD[String]所有數(shù)據(jù)
scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)
1.1.2 collect()
- 作用:在驅(qū)動程序中验庙,以數(shù)組的形式返回數(shù)據(jù)集的所有元素。
- 需求:創(chuàng)建一個RDD社牲,并將RDD內(nèi)容收集到Driver端打印
(1)創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)將結(jié)果收集到Driver端
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
1.2.3 count()
- 作用:返回RDD中元素的個數(shù)
- 需求:創(chuàng)建一個RDD粪薛,統(tǒng)計該RDD的條數(shù)
(1)創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)統(tǒng)計該RDD的條數(shù)
scala> rdd.count
res1: Long = 10
1.1.4 first()案例
- 作用:返回RDD中的第一個元素
- 需求:創(chuàng)建一個RDD,返回該RDD中的第一個元素
(1)創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)統(tǒng)計該RDD的條數(shù)
scala> rdd.first
res2: Int = 1
1.1.5 take(n)案例
- 作用:返回一個由RDD的前n個元素組成的數(shù)組
- 需求:創(chuàng)建一個RDD搏恤,統(tǒng)計該RDD的條數(shù)
(1)創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
(2)統(tǒng)計該RDD的條數(shù)
scala> rdd.take(3)
res10: Array[Int] = Array(2, 5, 4)
1.1.6 takeOrdered(n)案例
- 作用:返回該RDD排序后的前n個元素組成的數(shù)組
- 需求:創(chuàng)建一個RDD违寿,統(tǒng)計該RDD的條數(shù)
(1)創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
(2)統(tǒng)計該RDD的條數(shù)
scala> rdd.takeOrdered(3)
res18: Array[Int] = Array(2, 3, 4)
1.1.7 aggregate
- 參數(shù):(zeroValue: U)(seqOp: (U, T) ? U, combOp: (U, U) ? U)
- 作用:aggregate函數(shù)將每個分區(qū)里面的元素通過seqOp和初始值進行聚合,然后用combine函數(shù)將每個分區(qū)的結(jié)果和初始值(zeroValue)進行combine操作熟空。這個函數(shù)最終返回的類型不需要和RDD中元素類型一致陨界。
- 需求:創(chuàng)建一個RDD,將所有元素相加得到結(jié)果
(1)創(chuàng)建一個RDD
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
(2)將該RDD所有元素相加得到結(jié)果
scala> rdd.aggregate(0)(_+_,_+_)
res22: Int = 55
1.1.8 fold(num)(func)
- 作用:折疊操作痛阻,aggregate的簡化操作菌瘪,seqop和combop一樣。
- 需求:創(chuàng)建一個RDD阱当,將所有元素相加得到結(jié)果
(1)創(chuàng)建一個RDD
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
(2)將該RDD所有元素相加得到結(jié)果
scala> rdd.fold(0)(_+_)
res24: Int = 55
1.1.9 saveAsTextFile(path)
作用:將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng)俏扩,對于每個元素,Spark將會調(diào)用toString方法弊添,將它裝換為文件中的文本
1.1.10 saveAsSequenceFile(path)
作用:將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下录淡,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。
1.1.11 saveAsObjectFile(path)
作用:用于將RDD中的元素序列化成對象油坝,存儲到文件中嫉戚。
1.1.12 countByKey()
- 作用:針對(K,V)類型的RDD,返回一個(K,Int)的map澈圈,表示每一個key對應(yīng)的元素個數(shù)彬檀。
- 需求:創(chuàng)建一個PairRDD,統(tǒng)計每種key的個數(shù)
(1)創(chuàng)建一個PairRDD
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24
(2)統(tǒng)計每種key的個數(shù)
scala> rdd.countByKey
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
1.1.13 foreach(func)
- 作用:在數(shù)據(jù)集的每一個元素上瞬女,運行函數(shù)func進行更新窍帝。
- 需求:創(chuàng)建一個RDD,對每個元素進行打印
(1)創(chuàng)建一個RDD
scala> var rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24
(2)對該RDD每個元素進行打印
scala> rdd.foreach(println(_))
3
4
5
1
2
1.2 RDD中的函數(shù)傳遞
在實際開發(fā)中我們往往需要自己定義一些對于RDD的操作诽偷,那么此時需要注意的是坤学,初始化工作是在Driver端進行的疯坤,而實際運行程序是在Executor端進行的,這就涉及到了跨進程通信深浮,是需要序列化的压怠。
1.2.1 傳遞一個方法
1.創(chuàng)建一個類
class Search(s:String){
//過濾出包含字符串的數(shù)據(jù)
def isMatch(s: String): Boolean = {
s.contains(query)
}
//過濾出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
//過濾出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
}
2.創(chuàng)建Spark主程序
object SeriTest {
def main(args: Array[String]): Unit = {
//1.初始化配置信息及SparkContext
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//2.創(chuàng)建一個RDD
val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))
//3.創(chuàng)建一個Search對象
val search = new Search()
//4.運用第一個過濾函數(shù)并打印結(jié)果
val match1: RDD[String] = search.getMatche1(rdd)
match1.collect().foreach(println)
}
}
3.運行程序
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at com.atguigu.Search.getMatche1(SeriTest.scala:39)
at com.atguigu.SeriTest$.main(SeriTest.scala:18)
at com.atguigu.SeriTest.main(SeriTest.scala)
Caused by: java.io.NotSerializableException: com.atguigu.Search
4.問題說明
//過濾出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
在這個方法中所調(diào)用的方法isMatch()是定義在Search這個類中的,實際上調(diào)用的是this. isMatch()飞苇,this表示Search這個類的對象刑峡,程序在運行過程中需要將Search對象序列化以后傳遞到Executor端。
5.解決方案
使類繼承scala.Serializable即可玄柠。
class Search() extends Serializable{...}
1.2.2 傳遞一個屬性
1.創(chuàng)建Spark主程序
object TransmitTest {
def main(args: Array[String]): Unit = {
//1.初始化配置信息及SparkContext
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//2.創(chuàng)建一個RDD
val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))
//3.創(chuàng)建一個Search對象
val search = new Search()
//4.運用第一個過濾函數(shù)并打印結(jié)果
val match1: RDD[String] = search.getMatche2(rdd)
match1.collect().foreach(println)
}
}
2.運行程序
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at com.atguigu.Search.getMatche1(SeriTest.scala:39)
at com.atguigu.SeriTest$.main(SeriTest.scala:18)
at com.atguigu.SeriTest.main(SeriTest.scala)
Caused by: java.io.NotSerializableException: com.atguigu.Search
3.問題說明
//過濾出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
在這個方法中所調(diào)用的方法query是定義在Search這個類中的字段突梦,實際上調(diào)用的是this. query,this表示Search這個類的對象,程序在運行過程中需要將Search對象序列化以后傳遞到Executor端。
4.解決方案
1)使類繼承scala.Serializable即可凶掰。
class Search() extends Serializable{...}
2)將類變量query賦值給局部變量
修改getMatche2為
//過濾出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
val query_ : String = this.query//將類變量賦值給局部變量
rdd.filter(x => x.contains(query_))
}
1.3 RDD依賴關(guān)系
1.3.1 Lineage
RDD只支持粗粒度轉(zhuǎn)換终抽,即在大量記錄上執(zhí)行的單個操作。將創(chuàng)建RDD的一系列Lineage(血統(tǒng))記錄下來,以便恢復(fù)丟失的分區(qū)。RDD的Lineage會記錄RDD的元數(shù)據(jù)信息和轉(zhuǎn)換行為,當該RDD的部分分區(qū)數(shù)據(jù)丟失時皇帮,它可以根據(jù)這些信息來重新運算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。
(1)讀取一個HDFS文件并將其中內(nèi)容映射成一個個元組
scala> val wordAndOne = sc.textFile("/fruit.tsv").flatMap(_.split("\t")).map((_,1))
wordAndOne: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:24
(2)統(tǒng)計每一種key對應(yīng)的個數(shù)
scala> val wordAndCount = wordAndOne.reduceByKey(_+_)
wordAndCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:26
(3)查看“wordAndOne”的Lineage
scala> wordAndOne.toDebugString
res5: String =
(2) MapPartitionsRDD[22] at map at <console>:24 []
| MapPartitionsRDD[21] at flatMap at <console>:24 []
| /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []
| /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []
(4)查看“wordAndCount”的Lineage
scala> wordAndCount.toDebugString
res6: String =
(2) ShuffledRDD[23] at reduceByKey at <console>:26 []
+-(2) MapPartitionsRDD[22] at map at <console>:24 []
| MapPartitionsRDD[21] at flatMap at <console>:24 []
| /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []
| /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []
(5)查看“wordAndOne”的依賴類型
scala> wordAndOne.dependencies
res7: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@5d5db92b)
(6)查看“wordAndCount”的依賴類型
scala> wordAndCount.dependencies
res8: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@63f3e6a8)
注意:RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型蛋辈,即窄依賴(narrow dependency)和寬依賴(wide dependency)属拾。
1.3..2 窄依賴
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用
1.3..3 寬依賴
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引起shuffle
1.3.4 DAG
DAG(Directed Acyclic Graph)叫做有向無環(huán)圖冷溶,原始的RDD通過一系列的轉(zhuǎn)換就就形成了DAG渐白,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage,對于窄依賴逞频,partition的轉(zhuǎn)換處理在Stage中完成計算纯衍。對于寬依賴,由于有Shuffle的存在苗胀,只能在parent RDD處理完成后襟诸,才能開始接下來的計算,因此寬依賴是劃分Stage的依據(jù)基协。
1.3.5 任務(wù)劃分
RDD任務(wù)切分中間分為:Application歌亲、Job、Stage和Task
1)Application:初始化一個SparkContext即生成一個Application
2)Job:一個Action算子就會生成一個Job
3)Stage:根據(jù)RDD之間的依賴關(guān)系的不同將Job劃分成不同的Stage堡掏,遇到一個寬依賴則劃分一個Stage应结。
4)Task:Stage是一個TaskSet,將Stage劃分的結(jié)果發(fā)送到不同的Executor執(zhí)行即為一個Task泉唁。
注意:Application->Job->Stage-> Task每一層都是1對n的關(guān)系鹅龄。
1.4 RDD緩存
RDD通過persist方法或cache方法可以將前面的計算結(jié)果緩存,默認情況下 persist() 會把數(shù)據(jù)以序列化的形式緩存在 JVM 的堆空間中亭畜。
但是并不是這兩個方法被調(diào)用時立即緩存扮休,而是觸發(fā)后面的action時,該RDD將會被緩存在計算節(jié)點的內(nèi)存中拴鸵,并供后面重用玷坠。
通過查看源碼發(fā)現(xiàn)cache最終也是調(diào)用了persist方法,默認的存儲級別都是僅在內(nèi)存存儲一份劲藐,Spark的存儲級別還有好多種八堡,存儲級別在object StorageLevel中定義的。
在存儲級別的末尾加上“_2”來把持久化數(shù)據(jù)存為兩份
緩存有可能丟失聘芜,或者存儲存儲于內(nèi)存的數(shù)據(jù)由于內(nèi)存不足而被刪除兄渺,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執(zhí)行。通過基于RDD的一系列轉(zhuǎn)換汰现,丟失的數(shù)據(jù)會被重算挂谍,由于RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可瞎饲,并不需要重算全部Partition口叙。
(1)創(chuàng)建一個RDD
scala> val rdd = sc.makeRDD(Array("spark"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24
(2)將RDD轉(zhuǎn)換為攜帶當前時間戳不做緩存
scala> val nocache = rdd.map(_.toString+System.currentTimeMillis)
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:25
(3)多次打印結(jié)果
scala> nocache.collect
res0: Array[String] = Array(spark1641648704486)
scala> nocache.collect
res1: Array[String] = Array(spark1641648706011)
scala> nocache.collect
res2: Array[String] = Array(spark1641648708796)
(4)將RDD轉(zhuǎn)換為攜帶當前時間戳并做緩存
scala> val cache = rdd.map(_.toString+System.currentTimeMillis).cache
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:25
(5)多次打印做了緩存的結(jié)果
scala> cache.collect
res4: Array[String] = Array(spark1641648778868)
scala> cache.collect
res5: Array[String] = Array(spark1641648778868)
scala> cache.collect
res6: Array[String] = Array(spark1641648778868)
1.5 RDD CheckPoint
Spark中對于數(shù)據(jù)的保存除了持久化操作之外,還提供了一種檢查點的機制嗅战,檢查點(本質(zhì)是通過將RDD寫入Disk做檢查點)是為了通過lineage做容錯的輔助妄田,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯驮捍,如果之后有節(jié)點出現(xiàn)問題而丟失分區(qū)形庭,從做檢查點的RDD開始重做Lineage,就會減少開銷厌漂。檢查點通過將數(shù)據(jù)寫入到HDFS文件系統(tǒng)實現(xiàn)了RDD的檢查點功能萨醒。
為當前RDD設(shè)置檢查點。該函數(shù)將會創(chuàng)建一個二進制的文件苇倡,并存儲到checkpoint目錄中富纸,該目錄是用SparkContext.setCheckpointDir()設(shè)置的。在checkpoint的過程中旨椒,該RDD的所有依賴于父RDD中的信息將全部被移除晓褪。對RDD進行checkpoint操作并不會馬上被執(zhí)行,必須執(zhí)行Action操作才能觸發(fā)综慎。
案例實操:
(1)設(shè)置檢查點
scala> sc.setCheckpointDir("hdfs://cluster/checkpoint")
(2)創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(Array("spark"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24
(3)將RDD轉(zhuǎn)換為攜帶當前時間戳并做checkpoint
scala> val ch = rdd.map(_+System.currentTimeMillis)
ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26
scala> ch.checkpoint
(4)多次打印結(jié)果
scala> ch.collect
res2: Array[String] = Array(spark1641649020404)
scala> ch.collect
res3: Array[String] = Array(spark1641649020525)
scala> ch.collect
res4: Array[String] = Array(spark1641649020525)
scala> ch.collect
res5: Array[String] = Array(spark1641649020525)
2 鍵值對RDD數(shù)據(jù)分區(qū)器
Spark目前支持Hash分區(qū)和Range分區(qū)涣仿,用戶也可以自定義分區(qū),Hash分區(qū)為當前的默認分區(qū),Spark中分區(qū)器直接決定了RDD中分區(qū)的個數(shù)好港、RDD中每條數(shù)據(jù)經(jīng)過Shuffle過程屬于哪個分區(qū)和Reduce的個數(shù)
注意:
(1)只有Key-Value類型的RDD才有分區(qū)器的愉镰,非Key-Value類型的RDD分區(qū)器的值是None
(2)每個RDD的分區(qū)ID范圍:0~numPartitions-1,決定這個值是屬于那個分區(qū)的钧汹。
2.1 獲取RDD分區(qū)
可以通過使用RDD的partitioner 屬性來獲取 RDD 的分區(qū)方式丈探。它會返回一個 scala.Option 對象, 通過get方法獲取其中的值拔莱。相關(guān)源碼如下:
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
(1)創(chuàng)建一個pairRDD
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
(2)查看RDD的分區(qū)器
scala> pairs.partitioner
res1: Option[org.apache.spark.Partitioner] = None
(3)導(dǎo)入HashPartitioner類
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
(4)使用HashPartitioner對RDD進行重新分區(qū)
scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27
(5)查看重新分區(qū)后RDD的分區(qū)器
scala> partitioned.partitioner
res2: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
2.2 Hash分區(qū)
HashPartitioner分區(qū)的原理:對于給定的key碗降,計算其hashCode,并除以分區(qū)的個數(shù)取余塘秦,如果余數(shù)小于0讼渊,則用余數(shù)+分區(qū)的個數(shù)(否則加0),最后返回的值就是這個key所屬的分區(qū)ID尊剔。
示例:
scala> nopar.partitioner
res20: Option[org.apache.spark.Partitioner] = None
scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala>nopar.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect
res0: Array[String] = Array("0 : ", 1 : (1,3), 2 : (1,2), 3 : (2,4), "4 : ", 5 : (2,3), 6 : (3,6), 7 : (3,8))
scala> val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))
hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[12] at partitionBy at <console>:26
scala> hashpar.count
res18: Long = 6
scala> hashpar.partitioner
res21: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)
scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()
res19: Array[Int] = Array(0, 3, 1, 2, 0, 0, 0)
2.3 Ranger分區(qū)
HashPartitioner分區(qū)弊端:可能導(dǎo)致每個分區(qū)中數(shù)據(jù)量的不均勻爪幻,極端情況下會導(dǎo)致某些分區(qū)擁有RDD的全部數(shù)據(jù)。
RangePartitioner作用:將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi)赋兵,盡量保證每個分區(qū)中數(shù)據(jù)量的均勻笔咽,而且分區(qū)與分區(qū)之間是有序的,一個分區(qū)中的元素肯定都是比另一個分區(qū)內(nèi)的元素小或者大霹期,但是分區(qū)內(nèi)的元素是不能保證順序的叶组。簡單的說就是將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi)。實現(xiàn)過程為:
第一步:先重整個RDD中抽取出樣本數(shù)據(jù)历造,將樣本數(shù)據(jù)排序甩十,計算出每個分區(qū)的最大key值,形成一個Array[KEY]類型的數(shù)組變量rangeBounds吭产;
第二步:判斷key在rangeBounds中所處的范圍侣监,給出該key值在下一個RDD中的分區(qū)id下標;該分區(qū)器要求RDD中的KEY類型必須是可以排序的
2.4 自定義分區(qū)
要實現(xiàn)自定義的分區(qū)器臣淤,你需要繼承 org.apache.spark.Partitioner 類并實現(xiàn)下面三個方法橄霉。
(1)numPartitions: Int:返回創(chuàng)建出來的分區(qū)數(shù)。
(2)getPartition(key: Any): Int:返回給定鍵的分區(qū)編號(0到numPartitions-1)邑蒋。
(3)equals():Java 判斷相等性的標準方法姓蜂。這個方法的實現(xiàn)非常重要,Spark 需要用這個方法來檢查你的分區(qū)器對象是否和其他分區(qū)器實例相同医吊,這樣 Spark 才可以判斷兩個 RDD 的分區(qū)方式是否相同钱慢。
需求:將相同后綴的數(shù)據(jù)寫入相同的文件,通過將相同后綴的數(shù)據(jù)分區(qū)到相同的分區(qū)并保存輸出來實現(xiàn)卿堂。
(1)創(chuàng)建一個pairRDD
scala> val data = sc.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
(2)定義一個自定義分區(qū)類
scala> :paste
// Entering paste mode (ctrl-D to finish)
class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
//覆蓋分區(qū)數(shù)
override def numPartitions: Int = numParts
//覆蓋分區(qū)號獲取函數(shù)
override def getPartition(key: Any): Int = {
val ckey: String = key.toString
ckey.substring(ckey.length-1).toInt%numParts
}
}
// Exiting paste mode, now interpreting.
defined class CustomerPartitioner
(3)將RDD使用自定義的分區(qū)類進行重新分區(qū)
scala> val par = data.partitionBy(new CustomerPartitioner(2))
par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:27
(4)查看重新分區(qū)后的數(shù)據(jù)分布
scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect
res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))
使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法即可束莫。Spark 中有許多依賴于數(shù)據(jù)混洗的方法懒棉,比如 join() 和 groupByKey(),它們也可以接收一個可選的 Partitioner 對象來控制輸出數(shù)據(jù)的分區(qū)方式览绿。
3 數(shù)據(jù)讀取與保存
Spark的數(shù)據(jù)讀取及數(shù)據(jù)保存可以從兩個維度來作區(qū)分:文件格式以及文件系統(tǒng)策严。
文件格式分為:Text文件、Json文件挟裂、Csv文件享钞、Sequence文件以及Object文件揍诽;
文件系統(tǒng)分為:本地文件系統(tǒng)诀蓉、HDFS、HBASE以及數(shù)據(jù)庫暑脆。
3.1 文件類數(shù)據(jù)讀取與保存
3.1.1 Text文件
1)數(shù)據(jù)讀取:textFile(String)
scala> val hdfsFile = sc.textFile("hdfs://cluster/test.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
2)數(shù)據(jù)保存: saveAsTextFile(String)
scala> hdfsFile.saveAsTextFile("/testOut")
3.1.2 Json文件
如果JSON文件中每一行就是一個JSON記錄渠啤,那么可以通過將JSON文件當做文本文件來讀取,然后利用相關(guān)的JSON庫對每一條數(shù)據(jù)進行JSON解析添吗。
注意:使用RDD讀取JSON文件處理很復(fù)雜沥曹,同時SparkSQL集成了很好的處理JSON文件的方式,所以應(yīng)用中多是采用SparkSQL處理JSON文件碟联。
(1)導(dǎo)入解析json所需的包
scala> import scala.util.parsing.json.JSON
(2)上傳json文件到HDFS
[hadoop@hadoop101 spark]$ hdfs dfs -put ./examples/src/main/resources/people.json /
(3)讀取文件
scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
(4)解析json數(shù)據(jù)
scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
(5)打印
scala> result.collect
res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
3.1.3 Sequence文件
SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設(shè)計的一種平面文件(Flat File)妓美。Spark 有專門用來讀取 SequenceFile 的接口。在 SparkContext 中鲤孵,可以調(diào)用 sequenceFile keyClass, valueClass壶栋。
注意:SequenceFile文件只針對PairRDD
(1)創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
(2)將RDD保存為Sequence文件
scala> rdd.saveAsSequenceFile("file:///opt/modules/spark/seqFile")
(3)查看該文件
[hadoop@hadoop101 seqFile]$ pwd
/opt/modules/spark/seqFile
[hadoop@hadoop101 seqFile]$ ll
總用量 8
-rw-r--r-- 1 hadoop hadoop 108 1月 9 10:29 part-00000
-rw-r--r-- 1 hadoop hadoop 124 1月 9 10:29 part-00001
-rw-r--r-- 1 hadoop hadoop 0 1月 9 10:29 _SUCCESS
[hadoop@hadoop101 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritable?
(4)讀取Sequence文件
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/modules/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
(5)打印讀取后的Sequence文件
scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
3.1.4 對象文件
對象文件是將對象序列化后保存的文件,采用Java的序列化機制普监」笫裕可以通過objectFilek,v 函數(shù)接收一個路徑,讀取對象文件凯正,返回對應(yīng)的 RDD毙玻,也可以通過調(diào)用saveAsObjectFile() 實現(xiàn)對對象文件的輸出。因為是序列化所以要指定類型廊散。
(1)創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
(2)將RDD保存為Object文件
scala> rdd.saveAsObjectFile("file:///opt/modules/spark/objectFile")
(3)查看該文件
[hadoop@hadoop101 objectFile]$ pwd
/opt/module/spark/objectFile
[hadoop@hadoop101 objectFile]$ ll
總用量 8
-rw-r--r-- 1 hadoop hadoop 142 1月 9 10:37 part-00000
-rw-r--r-- 1 hadoop hadoop 142 1月 9 10:37 part-00001
-rw-r--r-- 1 hadoop hadoop 0 1月 9 10:37 _SUCCESS
[hadoop@hadoop101 objectFile]$ cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l
(4)讀取Object文件
scala> val objFile = sc.objectFile[Int]("file:///opt/modules/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
(5)打印讀取后的Sequence文件
scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)
3.2 文件系統(tǒng)類數(shù)據(jù)讀取與保存
3.2.1 HDFS
Spark的整個生態(tài)系統(tǒng)與Hadoop是完全兼容的,所以對于Hadoop所支持的文件類型或者數(shù)據(jù)庫類型,Spark也同樣支持.另外,由于Hadoop的API有新舊兩個版本,所以Spark為了能夠兼容Hadoop所有的版本,也提供了兩套創(chuàng)建操作接口.對于外部存儲創(chuàng)建操作而言,hadoopRDD和newHadoopRDD是最為抽象的兩個函數(shù)接口,主要包含以下四個參數(shù).
1)輸入格式(InputFormat): 制定數(shù)據(jù)輸入的類型,如TextInputFormat等,新舊兩個版本所引用的版本分別是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
2)鍵類型: 指定[K,V]鍵值對中K的類型
3)值類型: 指定[K,V]鍵值對中V的類型
4)分區(qū)值: 指定由外部存儲生成的RDD的partition數(shù)量的最小值,如果沒有指定,系統(tǒng)會使用默認值defaultMinSplits
注意:其他創(chuàng)建操作的API接口都是為了方便最終的Spark程序開發(fā)者而設(shè)置的,是這兩個接口的高效實現(xiàn)版本.例如,對于textFile而言,只有path這個指定文件路徑的參數(shù),其他參數(shù)在系統(tǒng)內(nèi)部指定了默認值桑滩。
1.在Hadoop中以壓縮形式存儲的數(shù)據(jù),不需要指定解壓方式就能夠進行讀取,因為Hadoop本身有一個解壓器會根據(jù)壓縮文件的后綴推斷解壓算法進行解壓.
2.如果用Spark從Hadoop中讀取某種類型的數(shù)據(jù)不知道怎么讀取的時候,上網(wǎng)查找一個使用map-reduce的時候是怎么讀取這種這種數(shù)據(jù)的,然后再將對應(yīng)的讀取方式改寫成上面的hadoopRDD和newAPIHadoopRDD兩個類就行了
3.2.2 MySQL數(shù)據(jù)庫連接
支持通過Java JDBC訪問關(guān)系型數(shù)據(jù)庫。需要通過JdbcRDD進行允睹,示例如下:
(1)添加依賴
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
(2)Mysql讀仍俗肌:
object MysqlRDD {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建spark配置信息
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//2.創(chuàng)建SparkContext
val sc = new SparkContext(sparkConf)
//3.定義連接mysql的參數(shù)
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop101:3306/rdd"
val userName = "root"
val passWd = "root"
//創(chuàng)建JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select * from `rddtable` where `id`>=?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
)
//打印最后結(jié)果
println(rdd.count())
rdd.foreach(println)
sc.stop()
}
}
Mysql寫入:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val data = sc.parallelize(List("Female", "Male","Female"))
data.foreachPartition(insertData)
}
def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop101:3306/rdd", "root", "root")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.executeUpdate()
})
}
3.2.3 HBase數(shù)據(jù)庫
由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 類的實現(xiàn),Spark 可以通過Hadoop輸入格式訪問HBase擂找。這個輸入格式會返回鍵值對數(shù)據(jù)戳吝,其中鍵的類型為org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的類型為org.apache.hadoop.hbase.client.Result贯涎。
(1)添加依賴
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
(2)從HBase讀取數(shù)據(jù)
object HBaseSpark {
def main(args: Array[String]): Unit = {
//創(chuàng)建spark配置信息
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//創(chuàng)建SparkContext
val sc = new SparkContext(sparkConf)
//構(gòu)建HBase配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103")
conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
//從HBase讀取數(shù)據(jù)形成RDD
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
val count: Long = hbaseRDD.count()
println(count)
//對hbaseRDD進行處理
hbaseRDD.foreach {
case (_, result) =>
val key: String = Bytes.toString(result.getRow)
val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("color")))
println("RowKey:" + key + ",Name:" + name + ",Color:" + color)
}
//關(guān)閉連接
sc.stop()
}
}
3)往HBase寫入
def main(args: Array[String]) {
//獲取Spark配置信息并創(chuàng)建與spark的連接
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
//創(chuàng)建HBaseConf
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
//構(gòu)建Hbase表描述器
val fruitTable = TableName.valueOf("fruit_spark")
val tableDescr = new HTableDescriptor(fruitTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
//創(chuàng)建Hbase表
val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
}
admin.createTable(tableDescr)
//定義往Hbase插入數(shù)據(jù)的方法
def convert(triple: (Int, String, Int)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}
//創(chuàng)建一個RDD
val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
//將RDD內(nèi)容寫到HBase
val localData = initialRDD.map(convert)
localData.saveAsHadoopDataset(jobConf)
}
4 RDD編程進階
4.1 累加器
累加器用來對信息進行聚合听哭,通常在向 Spark傳遞函數(shù)時,比如使用 map() 函數(shù)或者用 filter() 傳條件時,可以使用驅(qū)動器程序中定義的變量陆盘,但是集群中運行的每個任務(wù)都會得到這些變量的一份新的副本普筹,更新這些副本的值也不會影響驅(qū)動器中的對應(yīng)變量。如果我們想實現(xiàn)所有分片處理時更新共享變量的功能隘马,那么累加器可以實現(xiàn)我們想要的效果太防。
4.1.1 系統(tǒng)累加器
針對一個輸入的日志文件,如果我們想計算文件中所有空行的數(shù)量酸员,我們可以編寫以下程序:
scala> val notice = sc.textFile("./NOTICE")
notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32
scala> val blanklines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blanklines: org.apache.spark.Accumulator[Int] = 0
scala> val tmp = notice.flatMap(line => {
| if (line == "") {
| blanklines += 1
| }
| line.split(" ")
| })
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36
scala> tmp.count()
res31: Long = 3213
scala> blanklines.value
res32: Int = 171
累加器的用法如下所示:
通過在驅(qū)動器中調(diào)用SparkContext.accumulator(initialValue)方法蜒车,創(chuàng)建出存有初始值的累加器。返回值為 org.apache.spark.Accumulator[T] 對象幔嗦,其中 T 是初始值 initialValue 的類型酿愧。Spark閉包里的執(zhí)行器代碼可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。 驅(qū)動器程序可以調(diào)用累加器的value屬性(在Java中使用value()或setValue())來訪問累加器的值邀泉。
注意:工作節(jié)點上的任務(wù)不能訪問累加器的值嬉挡。從這些任務(wù)的角度來看,累加器是一個只寫變量汇恤。
對于要在行動操作中使用的累加器庞钢,Spark只會把每個任務(wù)對各累加器的修改應(yīng)用一次。因此因谎,如果想要一個無論在失敗還是重復(fù)計算時都絕對可靠的累加器基括,我們必須把它放在 foreach() 這樣的行動操作中。轉(zhuǎn)化操作中累加器可能會發(fā)生不止一次更新蓝角。
4.1.2 自定義累加器
自定義累加器類型的功能在1.X版本中就已經(jīng)提供了阱穗,但是使用起來比較麻煩,在2.0版本后使鹅,累加器的易用性有了較大的改進揪阶,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現(xiàn)方式。實現(xiàn)自定義類型累加器需要繼承AccumulatorV2并至少覆寫下例中出現(xiàn)的方法患朱,下面這個累加器可以用于在程序運行過程中收集一些文本類信息鲁僚,最終以Set[String]的形式返回。
class LogAccumulator extends org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] {
private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
override def isZero: Boolean = {
_logArray.isEmpty
}
override def reset(): Unit = {
_logArray.clear()
}
override def add(v: String): Unit = {
_logArray.add(v)
}
override def merge(other: org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]]): Unit = {
other match {
case o: LogAccumulator => _logArray.addAll(o.value)
}
}
override def value: java.util.Set[String] = {
java.util.Collections.unmodifiableSet(_logArray)
}
override def copy():org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] = {
val newAcc = new LogAccumulator()
_logArray.synchronized{
newAcc._logArray.addAll(_logArray)
}
newAcc
}
}
// 過濾掉帶字母的
object LogAccumulator {
def main(args: Array[String]) {
val conf=new SparkConf().setAppName("LogAccumulator")
val sc=new SparkContext(conf)
val accum = new LogAccumulator
sc.register(accum, "logAccum")
val sum = sc.parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2).filter(line => {
val pattern = """^-?(\d+)"""
val flag = line.matches(pattern)
if (!flag) {
accum.add(line)
}
flag
}).map(_.toInt).reduce(_ + _)
println("sum: " + sum)
for (v <- accum.value) print(v + "")
println()
sc.stop()
}
}
4.2 廣播變量(調(diào)優(yōu)策略)
廣播變量用來高效分發(fā)較大的對象裁厅。向所有工作節(jié)點發(fā)送一個較大的只讀值冰沙,以供一個或多個Spark操作使用。比如执虹,如果你的應(yīng)用需要向所有節(jié)點發(fā)送一個較大的只讀查詢表拓挥,甚至是機器學習算法中的一個很大的特征向量,廣播變量用起來都很順手袋励。 在多個并行操作中使用同一個變量侥啤,但是 Spark會為每個任務(wù)分別發(fā)送当叭。
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)
scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)
使用廣播變量的過程如下:
(1) 通過對一個類型 T 的對象調(diào)用 SparkContext.broadcast 創(chuàng)建出一個 Broadcast[T] 對象。 任何可序列化的類型都可以這么實現(xiàn)盖灸。
(2) 通過 value 屬性訪問該對象的值(在 Java 中為 value() 方法)蚁鳖。
(3) 變量只會被發(fā)到各個節(jié)點一次,應(yīng)作為只讀值處理(修改這個值不會影響到別的節(jié)點)赁炎。
5 RDD相關(guān)概念關(guān)系
輸入可能以多個文件的形式存儲在HDFS上醉箕,每個File都包含了很多塊,稱為Block徙垫。當Spark讀取這些文件作為輸入時讥裤,會根據(jù)具體數(shù)據(jù)格式對應(yīng)的InputFormat進行解析,一般是將若干個Block合并成一個輸入分片松邪,稱為InputSplit坞琴,注意InputSplit不能跨越文件哨查。隨后將為這些輸入分片生成具體的Task逗抑。InputSplit與Task是一一對應(yīng)的關(guān)系。隨后這些具體的Task每個都會被分配到集群上的某個節(jié)點的某個Executor去執(zhí)行寒亥。
1)每個節(jié)點可以起一個或多個Executor邮府。
2)每個Executor由若干core組成,每個Executor的每個core一次只能執(zhí)行一個Task溉奕。
3)每個Task執(zhí)行的結(jié)果就是生成了目標RDD的一個partiton褂傀。
注意: 這里的core是虛擬的core而不是機器的物理CPU核,可以理解為就是Executor的一個工作線程加勤。而 Task被執(zhí)行的并發(fā)度 = Executor數(shù)目 * 每個Executor核數(shù)仙辟。至于partition的數(shù)目:
1)對于數(shù)據(jù)讀入階段,例如sc.textFile鳄梅,輸入文件被劃分為多少InputSplit就會需要多少初始Task叠国。
2)在Map階段partition數(shù)目保持不變。
3)在Reduce階段戴尸,RDD的聚合會觸發(fā)shuffle操作粟焊,聚合后的RDD的partition數(shù)目跟具體操作有關(guān),例如repartition操作會聚合成指定分區(qū)數(shù)孙蒙,還有一些算子是可配置的项棠。
RDD在計算的時候,每個分區(qū)都會起一個task挎峦,所以rdd的分區(qū)數(shù)目決定了總的的task數(shù)目香追。申請的計算節(jié)點(Executor)數(shù)目和每個計算節(jié)點核數(shù),決定了你同一時刻可以并行執(zhí)行的task坦胶。
比如RDD有100個分區(qū)透典,那么計算的時候就會生成100個task歪玲,你的資源配置為10個計算節(jié)點,每個2個核掷匠,同一時刻可以并行的task數(shù)目為20滥崩,計算這個RDD就需要5個輪次。如果計算資源不變讹语,你有101個task的話钙皮,就需要6個輪次,在最后一輪中顽决,只有一個task在執(zhí)行短条,其余核都在空轉(zhuǎn)。如果資源不變才菠,你的RDD只有2個分區(qū)茸时,那么同一時刻只有2個task運行,其余18個核空轉(zhuǎn)赋访,造成資源浪費可都。這就是在spark調(diào)優(yōu)中,增大RDD分區(qū)數(shù)目蚓耽,增大任務(wù)并行度的做法渠牲。