背景
????????前段時間做的是一個流式項目里不傅,場景為:對于流式數(shù)據(jù)宝冕,使用過濾規(guī)則進行實時過濾并產(chǎn)出結(jié)果數(shù)據(jù)姓言。流式數(shù)據(jù)為源源不斷的IP寻狂,篩選出在合格IP集合中的數(shù)據(jù)岁经,傳輸?shù)较掠蜗⒅虚g件中。
技術(shù)選型
????????上游數(shù)據(jù)從消息中間件中讀取荆虱,處理采用Spark Streaming蒿偎,下游也采用消息中間件。
廣播變量
廣播變量的適用場景
????????在Spark這種分布式計算中怀读,如果每個算子都需要讀取一個變量诉位,并且變量的數(shù)據(jù)量最好在百級,則采用廣播變量菜枷,把這個變量廣播到各個executor算子中苍糠;
廣播變量相對于外部變量的優(yōu)點
- 減少了各個算子間數(shù)據(jù)的網(wǎng)絡(luò)傳輸;很明顯啤誊,廣播變量一旦廣播后岳瞭,在所有executor中都存在了,不隨task的變化而變化蚊锹。但是對于外部變量來說瞳筏,其隨著task的變化而變化,如果task中用到了牡昆,它則需要從各個節(jié)點中拉取/傳輸/刪除姚炕;
- 減少了內(nèi)存占用;廣播變量的數(shù)據(jù)存儲在executor的共享內(nèi)存中,即:一個executor中只存儲一份廣播變量柱宦;但是外部變量則是一個task中都存儲一份些椒,一個executor中分配到了多少task,則存儲多少份外部變量掸刊;
ps:Spark中executor的內(nèi)存管理見之前寫的文章:https://blog.csdn.net/qq_35583915/article/details/109359939
廣播變量的使用
val sparkConf = new SparkConf().setAppName("broadcast-in-spark")
sparkConf.set("spark-config-key","spark-config-value")
val sparkSession = SparkSession
.builder
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()
//注意免糕,因為一個Spark項目中只允許定義一個spark上下文,所以忧侧,后面用于廣播變量的sparkContext只能從前面定義的sparkSession中獲取石窑,以保證不出現(xiàn)兩個兩個上下文定義
val sparkContext = sparkSession.getSparkContext
val broadcastUse = sparkContext.broadcast(useValue)
println(s"此次廣播變量的內(nèi)容為${broadcastUse.value}")
廣播變量使用心得
- 最好能把廣播變量當(dāng)成一個常量一樣去使用。當(dāng)然不是說廣播變量不能修改蚓炬,只不過廣播變量的修改步驟為:1.刪除這個廣播變量尼斧;2.使用當(dāng)前名字重新定義新的廣播變量。所以說试吁,廣播變量最符合的使用場景是常量情況下棺棵;
- 不能在算子中定義/修改/刪除廣播變量。除讀操作外熄捍,廣播變量的其他操作需要借助項目的Spark上下文環(huán)境烛恤,即sparkContext。而Spark的上下文環(huán)境只能在Driver中定義和使用余耽,不能在Executor間進行序列化后傳輸缚柏。眾所周知,算子的執(zhí)行是在Executor中碟贾,但如外部變量币喧、設(shè)置Spark執(zhí)行環(huán)境等操作是在Driver中。[ps:之前總結(jié)的Driver/Executor等的作用見文章:https://blog.csdn.net/qq_35583915/article/details/109359346]因此袱耽,只能在算子中通過.value方法獲取廣播變量的值參與運算杀餐,不能對廣播變量進行修改等操作。