以前在工作中主要寫Spark SQL相關的代碼褐鸥,對于RDD的學習有些疏漏酬荞。本周工作中學習了一些簡單的RDD的知識刻像,主要是關于RDD分區(qū)相關的內容锹安。下面的內容都是自己親身實踐所得短荐,如果有錯誤的地方倚舀,還希望大家批評指正。
本文的目錄如下:
1忍宋、RDD特性-分區(qū)列表
2瞄桨、寬/窄依賴
3、RDD的創(chuàng)建
4讶踪、查看分區(qū)數(shù)據(jù):mapPartitionsWithIndex
5芯侥、笛卡爾積:cartesian
6、數(shù)據(jù)分組:groupByKey
7乳讥、重新分區(qū):repartition VS coalesce
8柱查、map Vs mapPartitions
在正式開始文章內容之前,先介紹一下咱們的背景云石“ぃ考慮一下機器學習中網格搜索策略,比如隨機森林中汹忠,我們想得到n_estimators和max_depth兩個參數(shù)的最優(yōu)組合淋硝,我們會對給出的參數(shù)取值范圍計算笛卡爾積,然后對每一種組合訓練得到一個效果宽菜,并選取效果最好的一組參數(shù)谣膳。假設我們想使用spark把這個過程并行化,但是參數(shù)組合數(shù)量太多铅乡,沒有足夠的計算資源继谚,只能一個task上運行幾組參數(shù)。
舉例來說阵幸,假設n_estimators有10個取值花履,max_depth有5個取值,共有5*10=50種組合挚赊,最好的方法就是并行50個task诡壁。但是由于資源不足,我們只能并行執(zhí)行10個task荠割,也就是說一個task上執(zhí)行五組參數(shù)組合妹卿。
好了,介紹完了背景涨共,是不是很簡單纽帖?接下來就介紹一下在這一過程中的一些學習收獲。
1举反、RDD特性-分區(qū)列表
Spark中的RDD是被分區(qū)的懊直,每一個分區(qū)都會被一個計算任務(Task處理),分區(qū)數(shù)決定了并行計算的數(shù)量火鼻。
2室囊、寬/窄依賴
RDD中的一些算子雕崩,會將一個RDD轉換為另一個RDD,在這一過程中融撞,由于RDD是分區(qū)的盼铁,就會產生不同的依賴關系,主要分為寬依賴和窄依賴尝偎。
2.1 窄依賴
窄依賴如下圖所示:
先定義一下饶火,上圖中每一組中左邊的稱做父RDD、右邊的稱為子RDD致扯,那么窄依賴就是說:每一個父RDD中的Partition最多被子RDD中的1個Partition所使用肤寝。窄依賴最常見的就是map算子。
2.2 寬依賴
寬依賴的示意圖如下:
在寬依賴中抖僵,一個父RDD的Partition會被多個子RDD所使用鲤看。寬依賴也很常見,如我們下文要介紹的groupByKey和repartition耍群。
介紹完一些簡單的知識之后义桂,我們開始進入正題了,你應該還沒有忘記剛才的背景介紹吧蹈垢!
3慷吊、RDD的創(chuàng)建
首先創(chuàng)建一個sparkSession的對象:
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.enableHiveSupport()
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
接下來,咱們建立兩個RDD,分別是n_estimators和max_depth對應的取值
val n_estimators = Array(10,20,30,40,50,60,70,80,90,100)
val max_depth = Array(3,4,5,6,7)
val n_estimators_rdd = spark.sparkContext.parallelize(n_estimators)
val max_depth_rdd = spark.sparkContext.parallelize(max_depth)
先來查看一下分區(qū)數(shù)量:
println(n_estimators_rdd.partitions.length)
println(max_depth_rdd.partitions.length)
輸出為:
1
1
可以看到兩個RDD的分區(qū)數(shù)量都是1,那是不是RDD的分區(qū)數(shù)量默認是1呢负拟?答案當然是否定的鸠信,有關于RDD默認的分區(qū)數(shù)量,可以參考:http://www.reibang.com/p/fe987f6d2018?utm_source=oschina-app
當然攒岛,我們也可以在創(chuàng)建時指定RDD的分區(qū)數(shù)量:
val n_estimators_rdd = spark.sparkContext.parallelize(n_estimators,10)
val max_depth_rdd = spark.sparkContext.parallelize(max_depth,5)
此時分區(qū)數(shù)量就是10和5了赖临。
4、查看分區(qū)數(shù)據(jù):mapPartitionsWithIndex
接下來你可能會想查看一下每個分區(qū)的內容灾锯,我們使用mapPartitionsWithIndex函數(shù)兢榨,先直接給出代碼,隨后再詳細介紹:
n_estimators_rdd.mapPartitionsWithIndex((partid,iter)=>{
iter.map(n_estimator => {
println(partid + "," + n_estimator)
n_estimator
})
}).collect()
結果輸出如下:
可以看到顺饮,每個分區(qū)對應了一個n_estimator的數(shù)值吵聪,關于上述代碼,有以下幾點需要注意的點:
1)mapPartitionsWithIndex對每一對(分區(qū)id,分區(qū)內容)進行操作兼雄,partid即是分區(qū)id吟逝,上面的iter即是分區(qū)內容。
2)分區(qū)內容iter是一個Iterator赦肋,在我們這里是Iterator[Int]類型的块攒,對其每個元素進行查看励稳,需要通過map操作。
5囱井、笛卡爾積:cartesian
接下來驹尼,我們需要計算兩組參數(shù)的笛卡爾積,RDD間的笛卡爾積操作示意圖如下:
可以看到庞呕,經過笛卡爾積后的RDD的Partition數(shù)量應該是兩個父RDD的分區(qū)數(shù)量的乘積:
val cartesian_rdd = n_estimators_rdd.cartesian(max_depth_rdd)
println(cartesian_rdd.partitions.length)
由于n_estimators_rdd的分區(qū)數(shù)量是10新翎,max_depth_rdd的分區(qū)數(shù)量是5,因此cartesian_rdd的分區(qū)數(shù)量是50住练。
6料祠、數(shù)據(jù)分組:groupByKey
在對n_estimators和max_depth計算笛卡爾積之后,我們已經得到了50組參數(shù)組合澎羞,并且分布在50個Partition上髓绽。無論是通過map還是mapPartitions去并行計算每組參數(shù)對應的GBDT擬合效果,都會創(chuàng)建50個task妆绞。如果我們只想要10個task顺呕,每個task去執(zhí)行5組參數(shù),那么需要對數(shù)據(jù)進行分組括饶,使用groupByKey算子株茶。代碼如下:
val cartesian_grp_rdd = cartesian_rdd
.zipWithIndex()
.map(row=>(row._2 / 5,row._1))
.groupByKey()
對于上一節(jié)中得到的RDD,我們首先使用zipWithIndex()為其添加了編號图焰,此時RDD中每一條數(shù)據(jù)分為兩部分启盛,假設一行用row表示,那么row._2 代表其對應的行號技羔,row._1代表一組實驗參數(shù)僵闯,類型為(Int,Int)。
接下來藤滥,使用map把我們的RDD轉換成K-V形式的pairRDD鳖粟,注意這里我們進行row._2 / 5,這樣5條數(shù)據(jù)的Key相同拙绊。
最后通過groupByKey()對每5條數(shù)據(jù)進行了分組向图。接下來我們來看下經過groupByKey()操作后RDD的分區(qū)情況:
cartesian_grp_rdd.mapPartitionsWithIndex((partid,iter)=>{
iter.map(data => {
println(partid + "," +data)
data
})
}).collect()
輸出如下:
可以看到,分區(qū)數(shù)量為10标沪,但是榄攀,每個分區(qū)僅有一條數(shù)據(jù),每條數(shù)據(jù)的Key是我們剛才計算的index金句,Value是一個包含5組實驗參數(shù)的CompactBuffer檩赢。此時如果我們想對每組參數(shù)進行操作的話,還需要將數(shù)據(jù)轉換為List趴梢,通過循環(huán)進行處理漠畜,如下:
cartesian_grp_rdd.mapPartitionsWithIndex((partid,iter)=>{
iter.map(data => {
println(partid + "," +data)
val dataList = data._2.toList
for(i <- 0 until dataList.length){
println("do something")
}
data
})
}).collect()
7币他、重新分區(qū):repartition VS coalesce
看到這里,你可能會想憔狞,能不能不通過groupByKey()來對數(shù)據(jù)進行重新分區(qū)蝴悉?這里我們介紹兩種方法repartition和coalesce。
7.1 coalesce
先上代碼:
val cartesian_coalesce_rdd = cartesian_rdd.coalesce(10)
cartesian_coalesce_rdd.mapPartitionsWithIndex((partid,iter)=>{
iter.map(data => {
println(partid + "," +data)
data
})
}).collect()
結果如下:
分區(qū)效果還不錯瘾敢,對吧拍冠。如果將一個分區(qū)較多的RDD重新分區(qū)為分區(qū)較少的RDD,默認的coalesce是不會進行shuffle過程的(參數(shù)中的shuffle默認值為false)簇抵,其過程類似于如下庆杜,是一個分區(qū)之間相互組合的過程(窄依賴):
但是如果想要分區(qū)較少的RDD轉換為分區(qū)較多的RDD,shuffle過程是會有的碟摆。
7.2 repartition
而對于repartition來說晃财,它其實就是shuffle=True的coalesce過程,看源碼也可以看出來:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
先看一下repartition的分區(qū)效果:
val repartition_coalesce_rdd = cartesian_rdd.repartition(10)
repartition_coalesce_rdd.mapPartitionsWithIndex((partid,iter)=>{
iter.map(data => {
println(partid + "," +data)
data
})
}).collect()
感覺分區(qū)效果很亂啊典蜕,其背后的分區(qū)原理是什么呢断盛?還是得看源碼:
從源碼中可以看出,它是基于HashPartitioner來進行分區(qū)的愉舔。HashPartitioner分區(qū)的原理很簡單钢猛,對于給定的key,計算其hashCode轩缤,并除于分區(qū)的個數(shù)取余命迈,如果余數(shù)小于0,則用余數(shù)+分區(qū)的個數(shù)火的,最后返回的值就是這個key所屬的分區(qū)ID壶愤。
好了, 既然是基于Key分區(qū)的卫玖,那我們先給RDD搞一個Key公你,看看是不是按照我們的想法去進行分區(qū)的:
val repartition_coalesce_rdd = cartesian_rdd
.zipWithIndex()
.map(row=>(row._2 / 5,row._1))
.repartition(10)
repartition_coalesce_rdd.mapPartitionsWithIndex((partid,iter)=>{
iter.map(data => {
println(partid + "," +data)
data
})
}).collect()
輸出如下:
結果并不是如我們所想,那么是為什么呢假瞬?原因在于repartition所使用的Key,并非是RDD中每一條數(shù)據(jù)的Key迂尝,而是它為每條數(shù)據(jù)重新生成了一個隨機數(shù)脱茉,作為此條數(shù)據(jù)的Key:
所以,在將一個分區(qū)較多的RDD重新分區(qū)為分區(qū)較少的RDD時垄开,盡量使用coalesce算子琴许。
8、map Vs mapPartitions
好了溉躲,我們已經重新將數(shù)據(jù)分為10個分區(qū)了榜田,接下來就是并行調度了益兄,這里有兩個方法,一個是map箭券,一個是mapPartitions净捅。二者有什么區(qū)別呢:
map是對rdd中的每一個元素進行操作;mapPartitions則是對rdd中的每個分區(qū)的迭代器進行操作辩块。假設我們想把RDD中所有的數(shù)變?yōu)閮杀痘琢绻灿?個Partition,每個Partition有1萬條數(shù)據(jù)废亭。那么map操作會執(zhí)行5萬次function国章,而mapPartitions操作只會執(zhí)行5次function。因此mapPartitions的性能較高豆村。
但要注意的一點是液兽,MapPartitions操作,對于大量數(shù)據(jù)來說掌动,比如一個Partition有100萬數(shù)據(jù)抵碟,一次傳入一個function以后,那么可能一下子內存不夠坏匪,但是又沒有辦法去騰出內存空間來拟逮,可能就OOM,內存溢出适滓。
而在我們的場景中敦迄,選擇mapPartitions即可。二者代碼示例如下:
def doubleMap(a:(Int,Int))= { (a._1 * 2,a._2 * 3) }
cartesian_coalesce_rdd.map(
data=>{
doubleMap(data)
}
).foreach(println)
def doubleMapPartition( iter : Iterator[(Int,Int)]) = {
var res = List[(Int,Int)]()
while (iter.hasNext){
val cur = iter.next()
res .::= (cur._1 * 2, cur._2 * 3)
}
res.iterator
}
cartesian_coalesce_rdd.mapPartitions(
iter=>{
doubleMapPartition(iter)
}
).foreach(println)
注意凭迹,mapPartitions方法接受的是一個Iterator罚屋,返回也必須是一個Iterator。
好了嗅绸,本文就整理到這里了脾猛!