共享變量
通常情況下怯屉,一個傳遞給 Spark 操作(例如 map
或 reduce
)的函數(shù) func 是在遠程的集群節(jié)點上執(zhí)行的蛤签。該函數(shù) func 在多個節(jié)點執(zhí)行過程中使用的變量厕吉,是同一個變量的多個副本。這些變量的以副本的方式拷貝到每個機器上,并且各個遠程機器上變量的更新并不會傳播回 driver program(驅(qū)動程序)狱窘。通用且支持 read-write(讀-寫) 的共享變量在任務(wù)間是不能勝任的陵且。所以裁僧,Spark 提供了兩種特定類型的共享變量 : broadcast variables(廣播變量)和 accumulators(累加器)个束。
Broadcast Variables (廣播變量)
Broadcast variables(廣播變量)允許程序員將一個 read-only(只讀的)變量緩存到每臺機器上,而不是給任務(wù)傳遞一個副本聊疲。它們是如何來使用呢茬底,例如,廣播變量可以用一種高效的方式給每個節(jié)點傳遞一份比較大的 input dataset(輸入數(shù)據(jù)集)副本获洲。在使用廣播變量時阱表,Spark 也嘗試使用高效廣播算法分發(fā) broadcast variables(廣播變量)以降低通信成本。
Spark 的 action(動作)操作是通過一系列的 stage(階段)進行執(zhí)行的贡珊,這些 stage(階段)是通過分布式的 "shuffle" 操作進行拆分的最爬。Spark 會自動廣播出每個 stage(階段)內(nèi)任務(wù)所需要的公共數(shù)據(jù)。這種情況下廣播的數(shù)據(jù)使用序列化的形式進行緩存门岔,并在每個任務(wù)運行前進行反序列化爱致。這也就意味著,只有在跨越多個 stage(階段)的多個任務(wù)會使用相同的數(shù)據(jù)寒随,或者在使用反序列化形式的數(shù)據(jù)特別重要的情況下糠悯,使用廣播變量會有比較好的效果。
廣播變量通過在一個變量 v 上調(diào)用 SparkContext.broadcast(v) 方法來進行創(chuàng)建妻往。廣播變量是 v 的一個 wrapper(包裝器)互艾,可以通過調(diào)用 value 方法來訪問它的值。代碼示例如下 :
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
在創(chuàng)建廣播變量之后讯泣,在集群上執(zhí)行的所有的函數(shù)中纫普,應(yīng)該使用該廣播變量代替原來的 v 值,所以節(jié)點上的 v 最多分發(fā)一次判帮。另外局嘁,對象 v 在廣播后不應(yīng)該再被修改,以保證分發(fā)到所有的節(jié)點上的廣播變量具有同樣的值(例如晦墙,如果以后該變量會被運到一個新的節(jié)點)悦昵。
Accumulators (累加器)
Accumulators(累加器)是一個僅可以執(zhí)行 added
(添加)的變量來通過一個關(guān)聯(lián)和交換操作,因此可以高效地執(zhí)行支持并行晌畅。累加器可以用于實現(xiàn) counter( 計數(shù)但指,類似在 MapReduce 中那樣)或者 sums(求和)。原生 Spark 支持數(shù)值型的累加器抗楔,并且程序員可以添加新的支持類型棋凳。
創(chuàng)建 accumulators(累加器)并命名之后,在 Spark 的 UI 界面上將會顯示它连躏。這樣可以幫助理解正在運行的階段的運行情況(注意 : 該特性在 Python 中還不支持)剩岳。
可以通過調(diào)用 SparkContext.longAccumulator()
或 SparkContext.doubleAccumulator()
方法創(chuàng)建數(shù)值類型的 accumulator(累加器)以分別累加 Long 或Double 類型的值。集群上正在運行的任務(wù)就可以使用
add 方法來累計數(shù)值`入热。然而拍棕,它們不能夠讀取它的值晓铆。只有 driver program(驅(qū)動程序)才可以使用 value 方法讀取累加器的值。
下面的代碼展示了一個 accumulator(累加器)被用于對一個數(shù)字中的元素求和绰播。
Scala
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
上面的代碼示例使用的是 Spark 內(nèi)置的 Long 類型的累加器骄噪,程序員可以通過繼承 AccumulatorV2 類創(chuàng)建新的累加器類型。AccumulatorV2抽象類有幾個需要 override(重寫)的方法 : reset 方法可將累加器重置為 0蠢箩,add 方法可將其它值添加到累加器中链蕊,merge方法可將其他同樣類型的累加器合并為一個。其他需要重寫的方法可參考 scala API 文檔谬泌。 例如滔韵,假設(shè)我們有一個表示數(shù)學上 vectors(向量)的 MyVector
類,我們可以寫成 :
Scala
object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
val vec_ : MyVector = MyVector.createZeroVector
def reset(): MyVector = {
vec_.reset()
}
def add(v1: MyVector, v2: MyVector): MyVector = {
vec_.add(v2)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
注意呵萨,在開發(fā)者定義自己的 AccumulatorV2 類型時奏属, resulting type(返回值類型)可能與添加的元素的類型不一致。
累加器的更新只發(fā)生在 action 操作中潮峦,Spark 保證每個任務(wù)只更新累加器一次囱皿,例如,重啟任務(wù)不會更新值忱嘹。在 transformations(轉(zhuǎn)換)中嘱腥, 用戶需要注意的是,如果 task(任務(wù))或 job stages(階段)重新執(zhí)行拘悦,每個任務(wù)的更新操作可能會執(zhí)行多次齿兔。
累加器不會改變 Spark lazy evaluation(懶加載)的模式。如果累加器在 RDD 中的一個操作中進行更新础米,它們的值僅被更新一次分苇,RDD 被作為 action 的一部分來計算。因此屁桑,在一個像 map()
這樣的 transformation(轉(zhuǎn)換)時医寿,累加器的更新并沒有執(zhí)行。下面的代碼片段證明了這個特性 :
Scala
val accum = sc.accumulator(0)
data.map { x => accum += x; x }
// 在這里蘑斧,accus 仍然為 0, 因為沒有 actions(動作)來讓 map 操作被計算靖秩。
為什么需要有共享變量?
對于每個task都會使用的一個變量竖瘾,假如變量比較大沟突,每個主機的沒份task都copy一份,占用的內(nèi)存空間會特別大捕传;如果做成共享變量惠拭,每個節(jié)點copy一份,這樣可以減少很多空間的開銷庸论;