Spark學習記錄|RDD分區(qū)的那些事

以前在工作中主要寫Spark SQL相關的代碼褐鸥,對于RDD的學習有些疏漏酬荞。本周工作中學習了一些簡單的RDD的知識刻像,主要是關于RDD分區(qū)相關的內容锹安。下面的內容都是自己親身實踐所得短荐,如果有錯誤的地方倚舀,還希望大家批評指正。

本文的目錄如下:

1忍宋、RDD特性-分區(qū)列表

2瞄桨、寬/窄依賴

3、RDD的創(chuàng)建

4讶踪、查看分區(qū)數(shù)據(jù):mapPartitionsWithIndex

5芯侥、笛卡爾積:cartesian

6、數(shù)據(jù)分組:groupByKey

7乳讥、重新分區(qū):repartition VS coalesce

8柱查、map Vs mapPartitions

在正式開始文章內容之前,先介紹一下咱們的背景云石“ぃ考慮一下機器學習中網格搜索策略,比如隨機森林中汹忠,我們想得到n_estimators和max_depth兩個參數(shù)的最優(yōu)組合淋硝,我們會對給出的參數(shù)取值范圍計算笛卡爾積,然后對每一種組合訓練得到一個效果宽菜,并選取效果最好的一組參數(shù)谣膳。假設我們想使用spark把這個過程并行化,但是參數(shù)組合數(shù)量太多铅乡,沒有足夠的計算資源继谚,只能一個task上運行幾組參數(shù)。

舉例來說阵幸,假設n_estimators有10個取值花履,max_depth有5個取值,共有5*10=50種組合挚赊,最好的方法就是并行50個task诡壁。但是由于資源不足,我們只能并行執(zhí)行10個task荠割,也就是說一個task上執(zhí)行五組參數(shù)組合妹卿。

好了,介紹完了背景涨共,是不是很簡單纽帖?接下來就介紹一下在這一過程中的一些學習收獲。

1举反、RDD特性-分區(qū)列表

Spark中的RDD是被分區(qū)的懊直,每一個分區(qū)都會被一個計算任務(Task處理),分區(qū)數(shù)決定了并行計算的數(shù)量火鼻。

2室囊、寬/窄依賴

RDD中的一些算子雕崩,會將一個RDD轉換為另一個RDD,在這一過程中融撞,由于RDD是分區(qū)的盼铁,就會產生不同的依賴關系,主要分為寬依賴和窄依賴尝偎。

2.1 窄依賴

窄依賴如下圖所示:

先定義一下饶火,上圖中每一組中左邊的稱做父RDD、右邊的稱為子RDD致扯,那么窄依賴就是說:每一個父RDD中的Partition最多被子RDD中的1個Partition所使用肤寝。窄依賴最常見的就是map算子。

2.2 寬依賴

寬依賴的示意圖如下:

在寬依賴中抖僵,一個父RDD的Partition會被多個子RDD所使用鲤看。寬依賴也很常見,如我們下文要介紹的groupByKey和repartition耍群。

介紹完一些簡單的知識之后义桂,我們開始進入正題了,你應該還沒有忘記剛才的背景介紹吧蹈垢!

3慷吊、RDD的創(chuàng)建

首先創(chuàng)建一個sparkSession的對象:

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .enableHiveSupport()
  .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

接下來,咱們建立兩個RDD,分別是n_estimators和max_depth對應的取值

val n_estimators = Array(10,20,30,40,50,60,70,80,90,100)
val max_depth = Array(3,4,5,6,7)

val n_estimators_rdd = spark.sparkContext.parallelize(n_estimators)
val max_depth_rdd =  spark.sparkContext.parallelize(max_depth)

先來查看一下分區(qū)數(shù)量:

println(n_estimators_rdd.partitions.length)
println(max_depth_rdd.partitions.length)

輸出為:

1
1

可以看到兩個RDD的分區(qū)數(shù)量都是1,那是不是RDD的分區(qū)數(shù)量默認是1呢负拟?答案當然是否定的鸠信,有關于RDD默認的分區(qū)數(shù)量,可以參考:http://www.reibang.com/p/fe987f6d2018?utm_source=oschina-app

當然攒岛,我們也可以在創(chuàng)建時指定RDD的分區(qū)數(shù)量:

val n_estimators_rdd = spark.sparkContext.parallelize(n_estimators,10)
val max_depth_rdd =  spark.sparkContext.parallelize(max_depth,5)

此時分區(qū)數(shù)量就是10和5了赖临。

4、查看分區(qū)數(shù)據(jù):mapPartitionsWithIndex

接下來你可能會想查看一下每個分區(qū)的內容灾锯,我們使用mapPartitionsWithIndex函數(shù)兢榨,先直接給出代碼,隨后再詳細介紹:

n_estimators_rdd.mapPartitionsWithIndex((partid,iter)=>{
      iter.map(n_estimator => {
        println(partid + "," + n_estimator)
        n_estimator
      })
    }).collect()

結果輸出如下:

可以看到顺饮,每個分區(qū)對應了一個n_estimator的數(shù)值吵聪,關于上述代碼,有以下幾點需要注意的點:

1)mapPartitionsWithIndex對每一對(分區(qū)id,分區(qū)內容)進行操作兼雄,partid即是分區(qū)id吟逝,上面的iter即是分區(qū)內容。
2)分區(qū)內容iter是一個Iterator赦肋,在我們這里是Iterator[Int]類型的块攒,對其每個元素進行查看励稳,需要通過map操作。

5囱井、笛卡爾積:cartesian

接下來驹尼,我們需要計算兩組參數(shù)的笛卡爾積,RDD間的笛卡爾積操作示意圖如下:

可以看到庞呕,經過笛卡爾積后的RDD的Partition數(shù)量應該是兩個父RDD的分區(qū)數(shù)量的乘積:

val cartesian_rdd = n_estimators_rdd.cartesian(max_depth_rdd)
println(cartesian_rdd.partitions.length)

由于n_estimators_rdd的分區(qū)數(shù)量是10新翎,max_depth_rdd的分區(qū)數(shù)量是5,因此cartesian_rdd的分區(qū)數(shù)量是50住练。

6料祠、數(shù)據(jù)分組:groupByKey

在對n_estimators和max_depth計算笛卡爾積之后,我們已經得到了50組參數(shù)組合澎羞,并且分布在50個Partition上髓绽。無論是通過map還是mapPartitions去并行計算每組參數(shù)對應的GBDT擬合效果,都會創(chuàng)建50個task妆绞。如果我們只想要10個task顺呕,每個task去執(zhí)行5組參數(shù),那么需要對數(shù)據(jù)進行分組括饶,使用groupByKey算子株茶。代碼如下:

val cartesian_grp_rdd = cartesian_rdd
        .zipWithIndex()
        .map(row=>(row._2 / 5,row._1))
        .groupByKey()

對于上一節(jié)中得到的RDD,我們首先使用zipWithIndex()為其添加了編號图焰,此時RDD中每一條數(shù)據(jù)分為兩部分启盛,假設一行用row表示,那么row._2 代表其對應的行號技羔,row._1代表一組實驗參數(shù)僵闯,類型為(Int,Int)。

接下來藤滥,使用map把我們的RDD轉換成K-V形式的pairRDD鳖粟,注意這里我們進行row._2 / 5,這樣5條數(shù)據(jù)的Key相同拙绊。

最后通過groupByKey()對每5條數(shù)據(jù)進行了分組向图。接下來我們來看下經過groupByKey()操作后RDD的分區(qū)情況:

cartesian_grp_rdd.mapPartitionsWithIndex((partid,iter)=>{
      iter.map(data => {
        println(partid + "," +data)
        data
      })
    }).collect()

輸出如下:

可以看到,分區(qū)數(shù)量為10标沪,但是榄攀,每個分區(qū)僅有一條數(shù)據(jù),每條數(shù)據(jù)的Key是我們剛才計算的index金句,Value是一個包含5組實驗參數(shù)的CompactBuffer檩赢。此時如果我們想對每組參數(shù)進行操作的話,還需要將數(shù)據(jù)轉換為List趴梢,通過循環(huán)進行處理漠畜,如下:

cartesian_grp_rdd.mapPartitionsWithIndex((partid,iter)=>{
      iter.map(data => {
        println(partid + "," +data)
        val dataList = data._2.toList
        for(i <- 0 until dataList.length){
          println("do something")
        }
        data
      })
    }).collect()

7币他、重新分區(qū):repartition VS coalesce

看到這里,你可能會想憔狞,能不能不通過groupByKey()來對數(shù)據(jù)進行重新分區(qū)蝴悉?這里我們介紹兩種方法repartition和coalesce。

7.1 coalesce

先上代碼:

val cartesian_coalesce_rdd = cartesian_rdd.coalesce(10)

    cartesian_coalesce_rdd.mapPartitionsWithIndex((partid,iter)=>{
      iter.map(data => {
        println(partid + "," +data)
        data
      })
    }).collect()

結果如下:

分區(qū)效果還不錯瘾敢,對吧拍冠。如果將一個分區(qū)較多的RDD重新分區(qū)為分區(qū)較少的RDD,默認的coalesce是不會進行shuffle過程的(參數(shù)中的shuffle默認值為false)簇抵,其過程類似于如下庆杜,是一個分區(qū)之間相互組合的過程(窄依賴):

但是如果想要分區(qū)較少的RDD轉換為分區(qū)較多的RDD,shuffle過程是會有的碟摆。

7.2 repartition

而對于repartition來說晃财,它其實就是shuffle=True的coalesce過程,看源碼也可以看出來:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

先看一下repartition的分區(qū)效果:


val repartition_coalesce_rdd = cartesian_rdd.repartition(10)

repartition_coalesce_rdd.mapPartitionsWithIndex((partid,iter)=>{
  iter.map(data => {
    println(partid + "," +data)
    data
  })
}).collect()

感覺分區(qū)效果很亂啊典蜕,其背后的分區(qū)原理是什么呢断盛?還是得看源碼:

從源碼中可以看出,它是基于HashPartitioner來進行分區(qū)的愉舔。HashPartitioner分區(qū)的原理很簡單钢猛,對于給定的key,計算其hashCode轩缤,并除于分區(qū)的個數(shù)取余命迈,如果余數(shù)小于0,則用余數(shù)+分區(qū)的個數(shù)火的,最后返回的值就是這個key所屬的分區(qū)ID壶愤。

好了, 既然是基于Key分區(qū)的卫玖,那我們先給RDD搞一個Key公你,看看是不是按照我們的想法去進行分區(qū)的:

val repartition_coalesce_rdd = cartesian_rdd
  .zipWithIndex()
  .map(row=>(row._2 / 5,row._1))
  .repartition(10)

repartition_coalesce_rdd.mapPartitionsWithIndex((partid,iter)=>{
  iter.map(data => {
    println(partid + "," +data)
    data
  })
}).collect()

輸出如下:

結果并不是如我們所想,那么是為什么呢假瞬?原因在于repartition所使用的Key,并非是RDD中每一條數(shù)據(jù)的Key迂尝,而是它為每條數(shù)據(jù)重新生成了一個隨機數(shù)脱茉,作為此條數(shù)據(jù)的Key:

所以,在將一個分區(qū)較多的RDD重新分區(qū)為分區(qū)較少的RDD時垄开,盡量使用coalesce算子琴许。

8、map Vs mapPartitions

好了溉躲,我們已經重新將數(shù)據(jù)分為10個分區(qū)了榜田,接下來就是并行調度了益兄,這里有兩個方法,一個是map箭券,一個是mapPartitions净捅。二者有什么區(qū)別呢:

map是對rdd中的每一個元素進行操作;mapPartitions則是對rdd中的每個分區(qū)的迭代器進行操作辩块。假設我們想把RDD中所有的數(shù)變?yōu)閮杀痘琢绻灿?個Partition,每個Partition有1萬條數(shù)據(jù)废亭。那么map操作會執(zhí)行5萬次function国章,而mapPartitions操作只會執(zhí)行5次function。因此mapPartitions的性能較高豆村。

但要注意的一點是液兽,MapPartitions操作,對于大量數(shù)據(jù)來說掌动,比如一個Partition有100萬數(shù)據(jù)抵碟,一次傳入一個function以后,那么可能一下子內存不夠坏匪,但是又沒有辦法去騰出內存空間來拟逮,可能就OOM,內存溢出适滓。

而在我們的場景中敦迄,選擇mapPartitions即可。二者代碼示例如下:

def doubleMap(a:(Int,Int))= { (a._1 * 2,a._2 * 3) }

    cartesian_coalesce_rdd.map(
      data=>{
        doubleMap(data)
      }
    ).foreach(println)
def doubleMapPartition( iter : Iterator[(Int,Int)]) = {
  var res = List[(Int,Int)]()
  while (iter.hasNext){
    val cur = iter.next()
    res .::= (cur._1 * 2, cur._2 * 3)
  }
  res.iterator
}

cartesian_coalesce_rdd.mapPartitions(
  iter=>{
    doubleMapPartition(iter)
  }
).foreach(println)

注意凭迹,mapPartitions方法接受的是一個Iterator罚屋,返回也必須是一個Iterator。

好了嗅绸,本文就整理到這里了脾猛!

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市鱼鸠,隨后出現(xiàn)的幾起案子猛拴,更是在濱河造成了極大的恐慌,老刑警劉巖蚀狰,帶你破解...
    沈念sama閱讀 212,080評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件愉昆,死亡現(xiàn)場離奇詭異,居然都是意外死亡麻蹋,警方通過查閱死者的電腦和手機跛溉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,422評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人芳室,你說我怎么就攤上這事专肪。” “怎么了堪侯?”我有些...
    開封第一講書人閱讀 157,630評論 0 348
  • 文/不壞的土叔 我叫張陵嚎尤,是天一觀的道長。 經常有香客問我抖格,道長诺苹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,554評論 1 284
  • 正文 為了忘掉前任雹拄,我火速辦了婚禮收奔,結果婚禮上,老公的妹妹穿的比我還像新娘滓玖。我一直安慰自己坪哄,他們只是感情好,可當我...
    茶點故事閱讀 65,662評論 6 386
  • 文/花漫 我一把揭開白布势篡。 她就那樣靜靜地躺著翩肌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪禁悠。 梳的紋絲不亂的頭發(fā)上念祭,一...
    開封第一講書人閱讀 49,856評論 1 290
  • 那天,我揣著相機與錄音碍侦,去河邊找鬼粱坤。 笑死,一個胖子當著我的面吹牛瓷产,可吹牛的內容都是我干的站玄。 我是一名探鬼主播,決...
    沈念sama閱讀 39,014評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼濒旦,長吁一口氣:“原來是場噩夢啊……” “哼株旷!你這毒婦竟也來了?” 一聲冷哼從身側響起尔邓,我...
    開封第一講書人閱讀 37,752評論 0 268
  • 序言:老撾萬榮一對情侶失蹤晾剖,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后铃拇,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體钞瀑,經...
    沈念sama閱讀 44,212評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,541評論 2 327
  • 正文 我和宋清朗相戀三年慷荔,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,687評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡显晶,死狀恐怖贷岸,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情磷雇,我是刑警寧澤偿警,帶...
    沈念sama閱讀 34,347評論 4 331
  • 正文 年R本政府宣布,位于F島的核電站唯笙,受9級特大地震影響螟蒸,放射性物質發(fā)生泄漏。R本人自食惡果不足惜崩掘,卻給世界環(huán)境...
    茶點故事閱讀 39,973評論 3 315
  • 文/蒙蒙 一七嫌、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧苞慢,春花似錦诵原、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,777評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至辑畦,卻和暖如春吗蚌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背纯出。 一陣腳步聲響...
    開封第一講書人閱讀 32,006評論 1 266
  • 我被黑心中介騙來泰國打工蚯妇, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人潦刃。 一個月前我還...
    沈念sama閱讀 46,406評論 2 360
  • 正文 我出身青樓侮措,卻偏偏與公主長得像,于是被迫代替她去往敵國和親乖杠。 傳聞我的和親對象是個殘疾皇子分扎,可洞房花燭夜當晚...
    茶點故事閱讀 43,576評論 2 349

推薦閱讀更多精彩內容