一栽惶、前述
Spark中因為算子中的真正邏輯是發(fā)送到Executor中去運行的,所以當Executor中需要引用外部變量時疾嗅,需要使用廣播變量外厂。
累機器相當于統(tǒng)籌大變量,常用于計數(shù)代承,統(tǒng)計汁蝶。
二、具體原理
1论悴、廣播變量
- 廣播變量理解圖
image
- 注意事項
1掖棉、能不能將一個RDD使用廣播變量廣播出去?
不能膀估,因為RDD是不存儲數(shù)據(jù)的幔亥。可以將RDD的結果廣播出去。
2察纯、 廣播變量只能在Driver端定義帕棉,不能在Executor端定義针肥。
3、 在Driver端可以修改廣播變量的值香伴,在Executor端無法修改廣播變量的值祖驱。
4、如果executor端用到了Driver的變量瞒窒,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本捺僻。
5、如果Executor端用到了Driver的變量崇裁,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本匕坯。
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello","world")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("input/*")
lineRDD.filter { x => {println(broadCast.value);broadCast.value.contains(x)} }.collect().foreach { println}
sc.stop()
2、累加器
- 累加器理解圖
image
image
Scala代碼:
有問題
object acculateDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("sortDemo")
val sc = new SparkContext(conf)
var accumulator = 0
sc.textFile("input/*",2).foreach {//兩個變量
x =>{accumulator += x.toInt
println(accumulator)}}
println(accumulator)
sc.stop()
}
}
正確
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.longAccumulator
sc.textFile("./records.txt",2).foreach {//兩個變量
x =>{accumulator.add(1)
println(accumulator)}}
println(accumulator.value)
sc.stop()
}
}
結果:
image
package com.neusoft
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by ttc on 2018/10/17.
*/
object acculateDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("/root/words.txt",10)
val sum = sc.collectionAccumulator[String]
rdd.map(x=>{
sum.add(x)
}).collect()
println("sum is " + sum.value)
sc.stop()
}
}
注意事項
累加器在Driver端定義賦初始值拔稳,累加器只能在Driver端讀取最后的值葛峻,在Excutor端更新。