Spark 7. 共享變量

共享變量

原文地址: http://spark.apache.org/docs/latest/programming-guide.html#shared-variables
僅限交流使用,轉(zhuǎn)載請注明出處趟畏。如有錯誤螺垢,歡迎指出!

Henvealf/譯

一般情況下, Spark 的 map 或者 reduce 操作(task)的方法是運行在遠(yuǎn)程的集群節(jié)點上的绷杜,且會在每一個操作上復(fù)制一份變量。因為節(jié)點之間的變量不會共享,所以在遠(yuǎn)程機器上的變量的更新不會傳播到驅(qū)動器程序上。通用的解決方法纹因,就是使用可以被全部的 task 讀寫的共享變量,但他會拖慢運行效率琳拨。然而瞭恰, Spark 還是為兩種普遍的使用模式提供了兩種共享變量的受限類型:廣播變量與增量器。

廣播變量(Broadcast Variables)

廣播變量允許在每個機器中的程序里維護一個只讀的緩存變量狱庇,而不是在每個 task 中傳送一個拷貝惊畏。他能夠被用于,舉例來說密任,他會使用一種高效率的方式為每個節(jié)點提供一份大的輸入數(shù)據(jù)集的拷貝颜启。Spark 也嘗試使用一個高效率的廣播算法去減少分發(fā)廣播變量時的交互開銷。

Spark 的 actions 的執(zhí)行會經(jīng)歷一系列通過分布式的 "shuffle" 操作來分離的階段浪讳。Spark 需要各個階段中的 task 來自動廣播公用的數(shù)據(jù)缰盏。數(shù)據(jù)廣播的方式緩存進序列化結(jié)構(gòu)和在運行每一個 task 之前進行反序列化。這意味著僅僅當(dāng)task經(jīng)過了需要相同數(shù)據(jù)的多個階段時,或者按照序列化結(jié)構(gòu)緩存數(shù)據(jù)是重要的時候口猜,才確定創(chuàng)建廣播變量负溪。

通過調(diào)用 SparkContext.boradcast(v), 就可以從變量 v 中創(chuàng)建出一個廣播變量济炎。變量 v 此時已經(jīng)被包裝進了廣播變量在中川抡,可以在調(diào)用 value() 函數(shù),來獲取到變量冻辩。示例代碼如下:

Scala

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Java

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

創(chuàng)建了一個廣播變量之后猖腕,在集群上運行的任何方法中拆祈,都應(yīng)該使用廣播變量來代替與 v 中的值相同的變量恨闪,這樣就可以避免 v 的值在節(jié)點上出現(xiàn)了兩次。另外放坏,v 的值不要去修改咙咽,確保在所有節(jié)點上的廣播變量都是相同的值。

增量器 (Accumulators)

增量器是一個只能進行加操作的變量淤年,用于操作之間的協(xié)作與交互钧敞,因此他也支持高效率的并發(fā)。很像 MapReduce 中的 Counter麸粮。原生的增量器支持?jǐn)?shù)字類型溉苛,現(xiàn)在程序可以支持新的類型。

如果你給了增量器一個名字弄诲,那個你就能在 Spark UI 上看到他愚战。通過他來理解程序運行的各個階段(注意,在 Python 中并不支持)齐遵。

Scala

一個數(shù)字增量器可以通過調(diào)用 SparkContext.longAccumulator() 或者 SparkContext.doubleAccumulator() 來創(chuàng)建寂玲。然后在集群上使用 add 方法來進行加操作。然而梗摇,集群不能讀取他的值拓哟。只可以在驅(qū)動器程序上讀取增量器的值。使用 value() 方法伶授。

下面就使用增量氣來將一個數(shù)組中的值加一塊:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

上面上使用原生的數(shù)字類型的增量器断序。下面看看如何創(chuàng)建一個自定義類型的增量器。你的增量器需要繼承 AccumulatorV2 抽象類糜烹。你需要重寫(override)他的幾個方法:

  • reset : 將迭代器中的值設(shè)置為 0违诗。
  • add:增加增量器中的值。
  • merge: 合并其他同類型的增量器景图。

其他需要重寫的方法請看 Scala API 文檔较雕。

下面我們有一個 MyVector 類,代表了數(shù)學(xué)上的向量,下面就是 MyVector 的實現(xiàn)類:

object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
  val vec_ : MyVector = MyVector.createZeroVector
  def reset(): MyVector = {
    vec_.reset()
  }
  def add(v1: MyVector, v2: MyVector): MyVector = {
    vec_.add(v2)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

增量器的更新在內(nèi)部執(zhí)行的 僅僅是 action 亮蒋。 Spark 保證每個 task 在增量器上的更新僅僅被應(yīng)用一次扣典,也就是說,重啟 task 將不會更新值慎玖。在 transformations 中贮尖,用戶應(yīng)該知道,如果 job 的階段被重新運行趁怔,每一個 task 的更新會被應(yīng)用多次湿硝。

迭代器不會改變 Spark 的懶惰評估。如果更新操作是作用在一個 RDD 上润努,他的值將只會 在作為action 的一部分 來進行計算之后才進行更新关斜。隨之而來的,當(dāng)使用一個懶惰的 transformation(比如 map() )來更新值铺浇,增量器不會保證一定執(zhí)行更新, 下面的代碼就展示了所說的情況:

Scala

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// 這里 accum 將一直為 0痢畜, 因為沒有 action 來觸發(fā) `map` 任務(wù)的計算。

Java

LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// 這里 accum 將一直為 0鳍侣, 因為沒有 action 來觸發(fā) `map` 任務(wù)的計算丁稀。


Python

accum = sc.accumulator(0)
def g(x):
  accum.add(x)
  return f(x)
data.map(g)

End !倚聚!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末线衫,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子惑折,更是在濱河造成了極大的恐慌授账,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件唬复,死亡現(xiàn)場離奇詭異矗积,居然都是意外死亡,警方通過查閱死者的電腦和手機敞咧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門棘捣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人休建,你說我怎么就攤上這事乍恐。” “怎么了测砂?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵茵烈,是天一觀的道長。 經(jīng)常有香客問我砌些,道長呜投,這世上最難降的妖魔是什么加匈? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮仑荐,結(jié)果婚禮上雕拼,老公的妹妹穿的比我還像新娘。我一直安慰自己粘招,他們只是感情好啥寇,可當(dāng)我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著洒扎,像睡著了一般辑甜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上袍冷,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天磷醋,我揣著相機與錄音,去河邊找鬼难裆。 笑死子檀,一個胖子當(dāng)著我的面吹牛镊掖,可吹牛的內(nèi)容都是我干的乃戈。 我是一名探鬼主播,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼亩进,長吁一口氣:“原來是場噩夢啊……” “哼症虑!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起归薛,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤谍憔,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后主籍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體习贫,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年千元,在試婚紗的時候發(fā)現(xiàn)自己被綠了苫昌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡幸海,死狀恐怖祟身,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情物独,我是刑警寧澤袜硫,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站挡篓,受9級特大地震影響婉陷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一秽澳、第九天 我趴在偏房一處隱蔽的房頂上張望世杀。 院中可真熱鬧,春花似錦肝集、人聲如沸瞻坝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽所刀。三九已至,卻和暖如春捞挥,著一層夾襖步出監(jiān)牢的瞬間浮创,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工砌函, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留斩披,地道東北人。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓讹俊,卻偏偏與公主長得像垦沉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子仍劈,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容