Broadcast Variables(廣播變量)
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
(廣播變量容許程序在每臺機器上保存一份只讀變量緩存碾盐,而不是為每一個任務(wù)發(fā)送一個副本。他們可以被使用,例如以一種高效的方式給每一個節(jié)點拷貝一份大的數(shù)據(jù)集。spark 試圖通過分布式廣播變量來有效低減少通信開銷。)
(以上直意,意思是這樣。就是spark執(zhí)行任務(wù)的時候會用到共享變量彼城。一般方式是為每個task拷貝一份共享變量。那么問題來了退个,如果一個機器上有上百個task那么就要拷貝上百次募壕,延遲不說,對內(nèi)存溢出造成隱患帜乞。那么使用廣播變量后司抱,所有任務(wù)公用一個只讀變量。有點類似于readonly黎烈,那么只需要傳輸一次习柠,且內(nèi)存保留一個副本匀谣,大大提高效率。)
那么廣播變量這么有用资溃,大家鐵定多用武翎,不知道有木有遇到空指正bug的。
spark Broadcast 空指正異常:(看下面代碼 查查他有沒有毛踩芏А)
package com.migu.dpi
import java.util
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
object Test2 {
//代碼純屬為了引出問題 不要較真
? var data: Broadcast[java.util.ArrayList[String]] =null
? def formatFlag(sc: SparkContext, datas: util.ArrayList[String]):Unit = {
data = sc.broadcast(datas)
}
def main(args: Array[String]):Unit = {
var datas: util.ArrayList[String] =new util.ArrayList[String]()
datas.add("mmp")
var conf =new SparkConf().setAppName(Test2.getClass.getName).setMaster("local[2]")
var sc =new SparkContext(conf)
formatFlag(sc, datas)
//...... args(0) is a path
sc.textFile(args(0)).map(x => {
x +data.value.get(0)
}).foreach(println)
? }
}
如果沒有那么這樣呢宝恶,提交到集群呢?
package com.migu.dpi
import java.util
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
object Test2 {
//代碼純屬為了引出問題 不要較真
? var data: Broadcast[java.util.ArrayList[String]] =null
? def formatFlag(sc: SparkContext, datas: util.ArrayList[String]):Unit = {
data = sc.broadcast(datas)
}
def main(args: Array[String]):Unit = {
var datas: util.ArrayList[String] =new util.ArrayList[String]()
datas.add("mmp")
var conf =new SparkConf().setAppName(Test2.getClass.getName)
var sc =new SparkContext(conf)
formatFlag(sc, datas)
//...... args(0) is a path
sc.textFile(args(0)).map(x => {
x +data.value.get(0)
}).foreach(println)
? }
}
這時候你會發(fā)現(xiàn)趴捅,第二種情況垫毙,報data廣播變量空指正異常。但是你會想拱绑,不是格式化過了嗎综芥?是的,我當(dāng)時也是這么想的猎拨。
但是 我們 來看看spark提交任務(wù)是怎么干的膀藐。
Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.
(意譯:spark行為是通過一個stage集執(zhí)行來完成的。stage是被分布式shuffle動作分割的红省。spark自動將廣播變量廣播到每一個需要的節(jié)點上额各。廣播變量廣播是在每一個運行任務(wù)之前完成的。下面沒用不譯了)
spark 對任務(wù)分發(fā)是通過 預(yù)讀碼 然后 根據(jù)編譯中代碼中行為動作和轉(zhuǎn)換動作來做shuffle切分的吧恃。切分好后把各個任務(wù)發(fā)送到各個節(jié)點虾啦,在任務(wù)執(zhí)行前,把廣播變量發(fā)過去蚜枢。具體誰前誰后缸逃,我猜是廣播變量在前针饥,因為task很多厂抽,有些廣播變量貫穿整個stage,所以我也就不讀源碼直接猜了丁眼。
那么問題來了筷凤,代碼沒運行你的廣播變量格式化個毛啊。所以除非本地運行苞七,否則其他機器全空指正異常藐守。
造成這個現(xiàn)象的根本原因是——全局變量的濫用。所以自我檢討一下蹂风,能用局部變量別用全局卢厂,除非局部變量造成內(nèi)存溢出,為了防止OOM惠啄。其他別為了省幾個變量的空間而使用全局變量慎恒。