Spark廣播變量應(yīng)用

一蚊锹、廣播變量

1、廣播變量的優(yōu)點

不需要每個task帶上一份變量副本,而是變成每個節(jié)點的executor存一份副本慌盯。這樣的話周霉, 就可以讓變量產(chǎn)生的副本數(shù)量大大減少。

2亚皂、廣播變量的用法
//將mapRdd廣播后返回broadcastValue
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(mapRdd)
//獲取廣播變量的值
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
3俱箱、廣播變量的原理

初始的時候,在Driver端有一個副本數(shù)據(jù)灭必。廣播變量后狞谱,task運行的時候,在使用副本數(shù)據(jù)前禁漓,首先在所在本地Executor對應(yīng)的BlockManager中跟衅,嘗試獲取副本數(shù)據(jù);如果本地沒有璃饱,即從Driver端拉取副本數(shù)據(jù)与斤,并且保存在所在本地的BlockManager中;此后這個Executor上所有的task荚恶,都會直接使用本地BlockManager中的副本數(shù)據(jù)。另Executor的BlockManager除了從Driver端拉取數(shù)據(jù)磷支,也可能從其他節(jié)點的BlockManager中拉去副本數(shù)據(jù)谒撼。
BlockManager:負責管理某個Executor對應(yīng)的內(nèi)存和磁盤的數(shù)據(jù),嘗試本地BlockManager中招map數(shù)據(jù)雾狈。

4廓潜、優(yōu)化說明

假設(shè)有50個Executor,共1000個task善榛;若每個map數(shù)據(jù)10M辩蛋。默認情況下,1000個副本10M共10G數(shù)據(jù)移盆。在集群中悼院,通過網(wǎng)絡(luò)傳輸,耗費10G的內(nèi)存資源咒循;如果使用了廣播變量据途,50個Executor即50個副本10M共500M數(shù)據(jù)。而且Executor的BlockManager不一定都從Driver傳輸?shù)奖镜匦鸬椋€可能從最近的節(jié)點的Executor的BlockManager中拉取數(shù)據(jù)颖医,網(wǎng)絡(luò)傳輸速度大大增加,傳輸數(shù)據(jù)大大減少裆蒸。
10G/500M=20倍熔萧,極大的提高了性能。

二、代碼實例

1佛致、準備數(shù)據(jù)
//訂單數(shù)據(jù)
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
//產(chǎn)品數(shù)據(jù)
p0001,xiaomi,1000,2
p0002,appale,1000,3
p0003,samsung,1000,4

2遂赠、代碼開發(fā)

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkBroadCast {
  def main(args: Array[String]): Unit = {
    //構(gòu)造Spark程序執(zhí)行環(huán)境
    val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
    //如果集群運行,則不需要設(shè)置setMaster("local[*]")
    val sc = new SparkContext(conf)
    //設(shè)置日志級別
    sc.setLogLevel("WARN")
    //創(chuàng)建RDD,讀取產(chǎn)品信息數(shù)據(jù)
    //產(chǎn)品記錄樣例:p0001,xiaomi,1000,2
    val productRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\prts.txt")
    val productMapRdd: Array[(String, String)] = productRdd.map(x => {
      (x.split(",")(0), x)
    }).collect()
//    productMapRdd.foreach(println)
    /**
     * (p0001,p0001,xiaomi,1000,2)
     * (p0002,p0002,appale,1000,3)
     * (p0003,p0003,samsung,1000,4)
     */
    //將產(chǎn)品數(shù)據(jù)作為廣播變量
    val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(productMapRdd)

    //讀取訂單記錄:1001,20150710,p0001,2
    val ordersRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\orders.txt")
    //將訂單記錄按照分區(qū)處理
    val productAndOrderRdd: RDD[String] = ordersRdd.mapPartitions(eachPartition => {
      //獲取產(chǎn)品廣播變量的數(shù)據(jù)并轉(zhuǎn)換為map類型晌杰,目的是通過getOrElse獲取產(chǎn)品數(shù)據(jù)
      val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
      //處理分區(qū)內(nèi)的訂單數(shù)據(jù)記錄
      val finalStr = eachPartition.map(eachLine => {
        //將每條訂單記錄按照逗號拆分跷睦,返回集合類型
        val ordersGet: Array[String] = eachLine.split(",")
        //產(chǎn)品的map類型,通過key(訂單的產(chǎn)品id)獲取對應(yīng)的產(chǎn)品記錄肋演,返回產(chǎn)品數(shù)據(jù)記錄
        val getProductStr: String = getBroadCastMap.getOrElse(ordersGet(2), "")
        //訂單記錄拼接產(chǎn)品記錄
        eachLine + "\t" + getProductStr
      })
      finalStr
    })
    productAndOrderRdd.foreach(println)

    /**
     * 1001,20150710,p0001,2    p0001,xiaomi,1000,2
     * 1002,20150710,p0002,3    p0002,appale,1000,3
     * 1002,20150710,p0003,3    p0003,samsung,1000,4
     */


    //關(guān)閉Spark環(huán)境
    sc.stop()
  }

}

三抑诸、注意事項

  • 能不能將一個RDD使用廣播變量廣播出去?
    不能爹殊,因為RDD是不存儲數(shù)據(jù)的蜕乡。可以將RDD的結(jié)果廣播出去梗夸。
  • 廣播變量只能在Driver端定義层玲,不能在Executor端定義。
  • 在Driver端可以修改廣播變量的值反症,在Executor端無法修改廣播變量的值辛块。
  • 當Executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本铅碍。
  • 當Executor端用到了Driver的變量润绵,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末胞谈,一起剝皮案震驚了整個濱河市尘盼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌烦绳,老刑警劉巖卿捎,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異径密,居然都是意外死亡午阵,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門睹晒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來趟庄,“玉大人,你說我怎么就攤上這事伪很∑萆叮” “怎么了?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵锉试,是天一觀的道長猫十。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么拖云? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任贷笛,我火速辦了婚禮,結(jié)果婚禮上宙项,老公的妹妹穿的比我還像新娘乏苦。我一直安慰自己,他們只是感情好尤筐,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布汇荐。 她就那樣靜靜地躺著,像睡著了一般盆繁。 火紅的嫁衣襯著肌膚如雪掀淘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天油昂,我揣著相機與錄音革娄,去河邊找鬼。 笑死冕碟,一個胖子當著我的面吹牛拦惋,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播鸣哀,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼架忌,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了我衬?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤饰恕,失蹤者是張志新(化名)和其女友劉穎挠羔,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體埋嵌,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡破加,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了雹嗦。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片范舀。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖了罪,靈堂內(nèi)的尸體忽然破棺而出锭环,到底是詐尸還是另有隱情,我是刑警寧澤泊藕,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布辅辩,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏玫锋。R本人自食惡果不足惜蛾茉,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望撩鹿。 院中可真熱鬧谦炬,春花似錦、人聲如沸节沦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽散劫。三九已至稚机,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間获搏,已是汗流浹背赖条。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留常熙,地道東北人纬乍。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像裸卫,于是被迫代替她去往敵國和親仿贬。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內(nèi)容