Spark共享變量原理

共享變量

通常情況下怯屉,一個傳遞給 Spark 操作(例如 mapreduce)的函數(shù) func 是在遠程的集群節(jié)點上執(zhí)行的蛤签。該函數(shù) func 在多個節(jié)點執(zhí)行過程中使用的變量厕吉,是同一個變量的多個副本。這些變量的以副本的方式拷貝到每個機器上,并且各個遠程機器上變量更新并不會傳播回 driver program(驅(qū)動程序)狱窘。通用且支持 read-write(讀-寫) 的共享變量在任務(wù)間是不能勝任的陵且。所以裁僧,Spark 提供了兩種特定類型的共享變量 : broadcast variables(廣播變量)和 accumulators(累加器)个束。

Broadcast Variables (廣播變量)

Broadcast variables(廣播變量)允許程序員將一個 read-only(只讀的)變量緩存到每臺機器上,而不是給任務(wù)傳遞一個副本聊疲。它們是如何來使用呢茬底,例如,廣播變量可以用一種高效的方式給每個節(jié)點傳遞一份比較大的 input dataset(輸入數(shù)據(jù)集)副本获洲。在使用廣播變量時阱表,Spark 也嘗試使用高效廣播算法分發(fā) broadcast variables(廣播變量)以降低通信成本。

Spark 的 action(動作)操作是通過一系列的 stage(階段)進行執(zhí)行的贡珊,這些 stage(階段)是通過分布式的 "shuffle" 操作進行拆分的最爬。Spark 會自動廣播出每個 stage(階段)內(nèi)任務(wù)所需要的公共數(shù)據(jù)。這種情況下廣播的數(shù)據(jù)使用序列化的形式進行緩存门岔,并在每個任務(wù)運行前進行反序列化爱致。這也就意味著,只有在跨越多個 stage(階段)的多個任務(wù)會使用相同的數(shù)據(jù)寒随,或者在使用反序列化形式的數(shù)據(jù)特別重要的情況下糠悯,使用廣播變量會有比較好的效果。

廣播變量通過在一個變量 v 上調(diào)用 SparkContext.broadcast(v) 方法來進行創(chuàng)建妻往。廣播變量是 v 的一個 wrapper(包裝器)互艾,可以通過調(diào)用 value 方法來訪問它的值。代碼示例如下 :

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在創(chuàng)建廣播變量之后讯泣,在集群上執(zhí)行的所有的函數(shù)中纫普,應(yīng)該使用該廣播變量代替原來的 v 值,所以節(jié)點上的 v 最多分發(fā)一次判帮。另外局嘁,對象 v 在廣播后不應(yīng)該再被修改,以保證分發(fā)到所有的節(jié)點上的廣播變量具有同樣的值(例如晦墙,如果以后該變量會被運到一個新的節(jié)點)悦昵。

Accumulators (累加器)

Accumulators(累加器)是一個僅可以執(zhí)行 added(添加)的變量來通過一個關(guān)聯(lián)和交換操作,因此可以高效地執(zhí)行支持并行晌畅。累加器可以用于實現(xiàn) counter( 計數(shù)但指,類似在 MapReduce 中那樣)或者 sums(求和)。原生 Spark 支持數(shù)值型的累加器抗楔,并且程序員可以添加新的支持類型棋凳。

創(chuàng)建 accumulators(累加器)并命名之后,在 SparkUI 界面上將會顯示它连躏。這樣可以幫助理解正在運行的階段的運行情況(注意 : 該特性在 Python 中還不支持)剩岳。

image.png

可以通過調(diào)用 SparkContext.longAccumulator()SparkContext.doubleAccumulator()方法創(chuàng)建數(shù)值類型的 accumulator(累加器)以分別累加 LongDouble 類型的值。集群上正在運行的任務(wù)就可以使用add 方法來累計數(shù)值`入热。然而拍棕,它們不能夠讀取它的值晓铆。只有 driver program(驅(qū)動程序)才可以使用 value 方法讀取累加器的值。

下面的代碼展示了一個 accumulator(累加器)被用于對一個數(shù)字中的元素求和绰播。

Scala

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
 
scala> accum.value
res2: Long = 10

上面的代碼示例使用的是 Spark 內(nèi)置的 Long 類型的累加器骄噪,程序員可以通過繼承 AccumulatorV2 類創(chuàng)建新的累加器類型。AccumulatorV2抽象類有幾個需要 override(重寫)的方法 : reset 方法可將累加器重置為 0蠢箩,add 方法可將其它值添加到累加器中链蕊,merge方法可將其他同樣類型的累加器合并為一個。其他需要重寫的方法可參考 scala API 文檔谬泌。 例如滔韵,假設(shè)我們有一個表示數(shù)學上 vectors(向量)的 MyVector類,我們可以寫成 :

Scala

object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
  val vec_ : MyVector = MyVector.createZeroVector
  def reset(): MyVector = {
    vec_.reset()
  }
  def add(v1: MyVector, v2: MyVector): MyVector = {
    vec_.add(v2)
  }
  ...
}
 
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

注意呵萨,在開發(fā)者定義自己的 AccumulatorV2 類型時奏属, resulting type(返回值類型)可能與添加的元素的類型不一致。

累加器的更新只發(fā)生在 action 操作中潮峦,Spark 保證每個任務(wù)只更新累加器一次囱皿,例如,重啟任務(wù)不會更新值忱嘹。在 transformations(轉(zhuǎn)換)中嘱腥, 用戶需要注意的是,如果 task(任務(wù))或 job stages(階段)重新執(zhí)行拘悦,每個任務(wù)的更新操作可能會執(zhí)行多次齿兔。

累加器不會改變 Spark lazy evaluation(懶加載)的模式。如果累加器在 RDD 中的一個操作中進行更新础米,它們的值僅被更新一次分苇,RDD 被作為 action 的一部分來計算。因此屁桑,在一個像 map()這樣的 transformation(轉(zhuǎn)換)時医寿,累加器的更新并沒有執(zhí)行。下面的代碼片段證明了這個特性 :

Scala

val accum = sc.accumulator(0)
data.map { x => accum += x; x }
// 在這里蘑斧,accus 仍然為 0, 因為沒有 actions(動作)來讓 map 操作被計算靖秩。

為什么需要有共享變量?

image.png

對于每個task都會使用的一個變量竖瘾,假如變量比較大沟突,每個主機的沒份task都copy一份,占用的內(nèi)存空間會特別大捕传;如果做成共享變量惠拭,每個節(jié)點copy一份,這樣可以減少很多空間的開銷庸论;

廣播共享數(shù)據(jù)原理

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末求橄,一起剝皮案震驚了整個濱河市今野,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌罐农,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件催什,死亡現(xiàn)場離奇詭異涵亏,居然都是意外死亡,警方通過查閱死者的電腦和手機蒲凶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進店門气筋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人旋圆,你說我怎么就攤上這事宠默。” “怎么了灵巧?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵搀矫,是天一觀的道長。 經(jīng)常有香客問我刻肄,道長瓤球,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任敏弃,我火速辦了婚禮卦羡,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘麦到。我一直安慰自己绿饵,他們只是感情好,可當我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布瓶颠。 她就那樣靜靜地躺著拟赊,像睡著了一般。 火紅的嫁衣襯著肌膚如雪步清。 梳的紋絲不亂的頭發(fā)上要门,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天,我揣著相機與錄音廓啊,去河邊找鬼欢搜。 笑死,一個胖子當著我的面吹牛谴轮,可吹牛的內(nèi)容都是我干的炒瘟。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼第步,長吁一口氣:“原來是場噩夢啊……” “哼疮装!你這毒婦竟也來了缘琅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤廓推,失蹤者是張志新(化名)和其女友劉穎刷袍,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體樊展,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡呻纹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了专缠。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雷酪。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖涝婉,靈堂內(nèi)的尸體忽然破棺而出哥力,到底是詐尸還是另有隱情,我是刑警寧澤墩弯,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布吩跋,位于F島的核電站,受9級特大地震影響最住,放射性物質(zhì)發(fā)生泄漏钞澳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一涨缚、第九天 我趴在偏房一處隱蔽的房頂上張望轧粟。 院中可真熱鬧,春花似錦脓魏、人聲如沸兰吟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽混蔼。三九已至,卻和暖如春珊燎,著一層夾襖步出監(jiān)牢的瞬間惭嚣,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工悔政, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留晚吞,地道東北人。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓谋国,卻偏偏與公主長得像槽地,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容