共享變量
原文地址: http://spark.apache.org/docs/latest/programming-guide.html#shared-variables
僅限交流使用,轉(zhuǎn)載請注明出處趟畏。如有錯誤螺垢,歡迎指出!
Henvealf/譯
一般情況下, Spark 的 map 或者 reduce 操作(task)的方法是運行在遠(yuǎn)程的集群節(jié)點上的绷杜,且會在每一個操作上復(fù)制一份變量。因為節(jié)點之間的變量不會共享,所以在遠(yuǎn)程機器上的變量的更新不會傳播到驅(qū)動器程序上。通用的解決方法纹因,就是使用可以被全部的 task 讀寫的共享變量,但他會拖慢運行效率琳拨。然而瞭恰, Spark 還是為兩種普遍的使用模式提供了兩種共享變量的受限類型:廣播變量與增量器。
廣播變量(Broadcast Variables)
廣播變量允許在每個機器中的程序里維護一個只讀的緩存變量狱庇,而不是在每個 task 中傳送一個拷貝惊畏。他能夠被用于,舉例來說密任,他會使用一種高效率的方式為每個節(jié)點提供一份大的輸入數(shù)據(jù)集的拷貝颜启。Spark 也嘗試使用一個高效率的廣播算法去減少分發(fā)廣播變量時的交互開銷。
Spark 的 actions 的執(zhí)行會經(jīng)歷一系列通過分布式的 "shuffle" 操作來分離的階段浪讳。Spark 需要各個階段中的 task 來自動廣播公用的數(shù)據(jù)缰盏。數(shù)據(jù)廣播的方式緩存進序列化結(jié)構(gòu)和在運行每一個 task 之前進行反序列化。這意味著僅僅當(dāng)task經(jīng)過了需要相同數(shù)據(jù)的多個階段時,或者按照序列化結(jié)構(gòu)緩存數(shù)據(jù)是重要的時候口猜,才確定創(chuàng)建廣播變量负溪。
通過調(diào)用 SparkContext.boradcast(v), 就可以從變量 v 中創(chuàng)建出一個廣播變量济炎。變量 v 此時已經(jīng)被包裝進了廣播變量在中川抡,可以在調(diào)用 value() 函數(shù),來獲取到變量冻辩。示例代碼如下:
Scala
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Java
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
創(chuàng)建了一個廣播變量之后猖腕,在集群上運行的任何方法中拆祈,都應(yīng)該使用廣播變量來代替與 v 中的值相同的變量恨闪,這樣就可以避免 v 的值在節(jié)點上出現(xiàn)了兩次。另外放坏,v 的值不要去修改咙咽,確保在所有節(jié)點上的廣播變量都是相同的值。
增量器 (Accumulators)
增量器是一個只能進行加操作的變量淤年,用于操作之間的協(xié)作與交互钧敞,因此他也支持高效率的并發(fā)。很像 MapReduce 中的 Counter麸粮。原生的增量器支持?jǐn)?shù)字類型溉苛,現(xiàn)在程序可以支持新的類型。
如果你給了增量器一個名字弄诲,那個你就能在 Spark UI 上看到他愚战。通過他來理解程序運行的各個階段(注意,在 Python 中并不支持)齐遵。
Scala
一個數(shù)字增量器可以通過調(diào)用 SparkContext.longAccumulator() 或者 SparkContext.doubleAccumulator() 來創(chuàng)建寂玲。然后在集群上使用 add 方法來進行加操作。然而梗摇,集群不能讀取他的值拓哟。只可以在驅(qū)動器程序上讀取增量器的值。使用 value() 方法伶授。
下面就使用增量氣來將一個數(shù)組中的值加一塊:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
上面上使用原生的數(shù)字類型的增量器断序。下面看看如何創(chuàng)建一個自定義類型的增量器。你的增量器需要繼承 AccumulatorV2 抽象類糜烹。你需要重寫(override)他的幾個方法:
- reset : 將迭代器中的值設(shè)置為 0违诗。
- add:增加增量器中的值。
- merge: 合并其他同類型的增量器景图。
其他需要重寫的方法請看 Scala API 文檔较雕。
下面我們有一個 MyVector 類,代表了數(shù)學(xué)上的向量,下面就是 MyVector 的實現(xiàn)類:
object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
val vec_ : MyVector = MyVector.createZeroVector
def reset(): MyVector = {
vec_.reset()
}
def add(v1: MyVector, v2: MyVector): MyVector = {
vec_.add(v2)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
增量器的更新在內(nèi)部執(zhí)行的 僅僅是 action 亮蒋。 Spark 保證每個 task 在增量器上的更新僅僅被應(yīng)用一次扣典,也就是說,重啟 task 將不會更新值慎玖。在 transformations 中贮尖,用戶應(yīng)該知道,如果 job 的階段被重新運行趁怔,每一個 task 的更新會被應(yīng)用多次湿硝。
迭代器不會改變 Spark 的懶惰評估。如果更新操作是作用在一個 RDD 上润努,他的值將只會 在作為action 的一部分 來進行計算之后才進行更新关斜。隨之而來的,當(dāng)使用一個懶惰的 transformation(比如 map() )來更新值铺浇,增量器不會保證一定執(zhí)行更新, 下面的代碼就展示了所說的情況:
Scala
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// 這里 accum 將一直為 0痢畜, 因為沒有 action 來觸發(fā) `map` 任務(wù)的計算。
Java
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// 這里 accum 將一直為 0鳍侣, 因為沒有 action 來觸發(fā) `map` 任務(wù)的計算丁稀。
Python
accum = sc.accumulator(0)
def g(x):
accum.add(x)
return f(x)
data.map(g)