一、前述
Spark中因?yàn)樗阕又械恼嬲壿嬍前l(fā)送到Executor中去運(yùn)行的,所以當(dāng)Executor中需要引用外部變量時(shí)帅涂,需要使用廣播變量。
累機(jī)器相當(dāng)于統(tǒng)籌大變量尤蛮,常用于計(jì)數(shù)媳友,統(tǒng)計(jì)。
二产捞、具體原理
1醇锚、廣播變量
- 廣播變量理解圖
image
- 注意事項(xiàng)
1、能不能將一個(gè)RDD使用廣播變量廣播出去坯临?
不能焊唬,因?yàn)镽DD是不存儲(chǔ)數(shù)據(jù)的恋昼。可以將RDD的結(jié)果廣播出去。
2赶促、 廣播變量只能在Driver端定義液肌,不能在Executor端定義。
3芳杏、 在Driver端可以修改廣播變量的值矩屁,在Executor端無(wú)法修改廣播變量的值。
4爵赵、如果executor端用到了Driver的變量吝秕,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。
5空幻、如果Executor端用到了Driver的變量烁峭,如果使用廣播變量在每個(gè)Executor中只有一份Driver端的變量副本。
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello","world")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("input/*")
lineRDD.filter { x => {println(broadCast.value);broadCast.value.contains(x)} }.collect().foreach { println}
sc.stop()
2秕铛、累加器
- 累加器理解圖
image
image
Scala代碼:
有問(wèn)題
object acculateDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("sortDemo")
val sc = new SparkContext(conf)
var accumulator = 0
sc.textFile("input/*",2).foreach {//兩個(gè)變量
x =>{accumulator += x.toInt
println(accumulator)}}
println(accumulator)
sc.stop()
}
}
正確
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.longAccumulator
sc.textFile("./records.txt",2).foreach {//兩個(gè)變量
x =>{accumulator.add(1)
println(accumulator)}}
println(accumulator.value)
sc.stop()
}
}
結(jié)果:
image
package com.neusoft
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by ttc on 2018/10/17.
*/
object acculateDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("/root/words.txt",10)
val sum = sc.collectionAccumulator[String]
rdd.map(x=>{
sum.add(x)
}).collect()
println("sum is " + sum.value)
sc.stop()
}
}
注意事項(xiàng)
累加器在Driver端定義賦初始值约郁,累加器只能在Driver端讀取最后的值,在Excutor端更新但两。