一蚊锹、廣播變量
1、廣播變量的優(yōu)點
不需要每個task帶上一份變量副本,而是變成每個節(jié)點的executor存一份副本慌盯。這樣的話周霉, 就可以讓變量產(chǎn)生的副本數(shù)量大大減少。
2亚皂、廣播變量的用法
//將mapRdd廣播后返回broadcastValue
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(mapRdd)
//獲取廣播變量的值
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
3俱箱、廣播變量的原理
初始的時候,在Driver端有一個副本數(shù)據(jù)灭必。廣播變量后狞谱,task運行的時候,在使用副本數(shù)據(jù)前禁漓,首先在所在本地Executor對應(yīng)的BlockManager中跟衅,嘗試獲取副本數(shù)據(jù);如果本地沒有璃饱,即從Driver端拉取副本數(shù)據(jù)与斤,并且保存在所在本地的BlockManager中;此后這個Executor上所有的task荚恶,都會直接使用本地BlockManager中的副本數(shù)據(jù)。另Executor的BlockManager除了從Driver端拉取數(shù)據(jù)磷支,也可能從其他節(jié)點的BlockManager中拉去副本數(shù)據(jù)谒撼。
BlockManager:負責管理某個Executor對應(yīng)的內(nèi)存和磁盤的數(shù)據(jù),嘗試本地BlockManager中招map數(shù)據(jù)雾狈。
4廓潜、優(yōu)化說明
假設(shè)有50個Executor,共1000個task善榛;若每個map數(shù)據(jù)10M辩蛋。默認情況下,1000個副本10M共10G數(shù)據(jù)移盆。在集群中悼院,通過網(wǎng)絡(luò)傳輸,耗費10G的內(nèi)存資源咒循;如果使用了廣播變量据途,50個Executor即50個副本10M共500M數(shù)據(jù)。而且Executor的BlockManager不一定都從Driver傳輸?shù)奖镜匦鸬椋€可能從最近的節(jié)點的Executor的BlockManager中拉取數(shù)據(jù)颖医,網(wǎng)絡(luò)傳輸速度大大增加,傳輸數(shù)據(jù)大大減少裆蒸。
10G/500M=20倍熔萧,極大的提高了性能。
二、代碼實例
1佛致、準備數(shù)據(jù)
//訂單數(shù)據(jù)
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
//產(chǎn)品數(shù)據(jù)
p0001,xiaomi,1000,2
p0002,appale,1000,3
p0003,samsung,1000,4
2遂赠、代碼開發(fā)
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkBroadCast {
def main(args: Array[String]): Unit = {
//構(gòu)造Spark程序執(zhí)行環(huán)境
val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
//如果集群運行,則不需要設(shè)置setMaster("local[*]")
val sc = new SparkContext(conf)
//設(shè)置日志級別
sc.setLogLevel("WARN")
//創(chuàng)建RDD,讀取產(chǎn)品信息數(shù)據(jù)
//產(chǎn)品記錄樣例:p0001,xiaomi,1000,2
val productRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\prts.txt")
val productMapRdd: Array[(String, String)] = productRdd.map(x => {
(x.split(",")(0), x)
}).collect()
// productMapRdd.foreach(println)
/**
* (p0001,p0001,xiaomi,1000,2)
* (p0002,p0002,appale,1000,3)
* (p0003,p0003,samsung,1000,4)
*/
//將產(chǎn)品數(shù)據(jù)作為廣播變量
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(productMapRdd)
//讀取訂單記錄:1001,20150710,p0001,2
val ordersRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\orders.txt")
//將訂單記錄按照分區(qū)處理
val productAndOrderRdd: RDD[String] = ordersRdd.mapPartitions(eachPartition => {
//獲取產(chǎn)品廣播變量的數(shù)據(jù)并轉(zhuǎn)換為map類型晌杰,目的是通過getOrElse獲取產(chǎn)品數(shù)據(jù)
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
//處理分區(qū)內(nèi)的訂單數(shù)據(jù)記錄
val finalStr = eachPartition.map(eachLine => {
//將每條訂單記錄按照逗號拆分跷睦,返回集合類型
val ordersGet: Array[String] = eachLine.split(",")
//產(chǎn)品的map類型,通過key(訂單的產(chǎn)品id)獲取對應(yīng)的產(chǎn)品記錄肋演,返回產(chǎn)品數(shù)據(jù)記錄
val getProductStr: String = getBroadCastMap.getOrElse(ordersGet(2), "")
//訂單記錄拼接產(chǎn)品記錄
eachLine + "\t" + getProductStr
})
finalStr
})
productAndOrderRdd.foreach(println)
/**
* 1001,20150710,p0001,2 p0001,xiaomi,1000,2
* 1002,20150710,p0002,3 p0002,appale,1000,3
* 1002,20150710,p0003,3 p0003,samsung,1000,4
*/
//關(guān)閉Spark環(huán)境
sc.stop()
}
}
三抑诸、注意事項
- 能不能將一個RDD使用廣播變量廣播出去?
不能爹殊,因為RDD是不存儲數(shù)據(jù)的蜕乡。可以將RDD的結(jié)果廣播出去梗夸。 - 廣播變量只能在Driver端定義层玲,不能在Executor端定義。
- 在Driver端可以修改廣播變量的值反症,在Executor端無法修改廣播變量的值辛块。
- 當Executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本铅碍。
- 當Executor端用到了Driver的變量润绵,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。