Spark編程進(jìn)階
這一部分將介紹一些沒有提到的一些Spark的特性,都是非常有用的家妆,內(nèi)容之間關(guān)聯(lián)性不是很強(qiáng)鸵荠。主要包括,共享變量伤极,分區(qū)操作蛹找,調(diào)用腳本以及統(tǒng)計(jì)操作姨伤。
1. 共享變量之累加器
通常在向 Spark 傳遞函數(shù)時(shí),比如使用map()函數(shù)或者用filter()傳條件時(shí)庸疾,可以使用驅(qū) 動(dòng)器程序中定義的變量乍楚,但是集群中運(yùn)行的每個(gè)任務(wù)都會(huì)得到這些變量的一份新的副本, 更新這些副本的值也不會(huì)影響驅(qū)動(dòng)器中的對(duì)應(yīng)變量届慈。
下面這段scala程序讀取文件的同時(shí)統(tǒng)計(jì)了空白行的數(shù)量
val sc = new SparkContext(...)
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // 創(chuàng)建Accumulator[Int]并初始化為0
val callSigns = file.flatMap(line => {
if (line == "") {
blankLines += 1 // 累加器加1
}
line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)
注意徒溪,工作節(jié)點(diǎn)上的任務(wù)不能訪問累加器的值。我的理解是轉(zhuǎn)化操作都是惰性操作金顿,使用累加器的值存在同步問題臊泌。
2. 共享變量之廣播變量
Spark的第二種共享變量類型是廣播變量,它可以讓程序高效地向所有工作節(jié)點(diǎn)發(fā)送一個(gè) 較大的只讀值揍拆,以供一個(gè)或多個(gè)Spark操作使用
廣播變量為只讀變量渠概,它由運(yùn)行SparkContext的驅(qū)動(dòng)程序創(chuàng)建后發(fā)送給會(huì)參與計(jì)算的節(jié)點(diǎn)。
廣播變量可以被非驅(qū)動(dòng)程序所在的節(jié)點(diǎn)(即工作節(jié)點(diǎn))訪問嫂拴,訪問的方法是用該廣播變量的value方法播揪。
val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++ x).collect
3. 基于分區(qū)的操作
基于分區(qū)對(duì)數(shù)據(jù)進(jìn)行操作可以讓我們避免為每個(gè)數(shù)據(jù)元素進(jìn)行重復(fù)的配置工作。諸如打開 數(shù)據(jù)庫連接或創(chuàng)建隨機(jī)數(shù)生成器等操作顷牌,都是我們應(yīng)當(dāng)盡量避免為每個(gè)元素都配置一次的 工作剪芍。Spark 提供基于分區(qū)的map和foreach,讓你的部分代碼只對(duì)RDD的每個(gè)分區(qū)運(yùn)行 一次窟蓝,這樣可以幫助降低這些操作的代價(jià)罪裹。
4. 與外部程序間的管道
如果Scala、Java以及Python都不能實(shí)現(xiàn)你需要的功能运挫,那么Spark也為這種情況提供了一種通用機(jī)制状共,可以將數(shù)據(jù)通過管道傳給用其他語言編寫的程序,比如R語言腳本谁帕。
有點(diǎn)類似Hadoop的Streaming
5. 數(shù)值RDD操作
Spark 對(duì)包含數(shù)值數(shù)據(jù)的RDD提供了一些描述性的統(tǒng)計(jì)操作峡继。
之前已經(jīng)使用過的類似rdd.count()的操作,Spark提供了一系列的操作匈挖,如果需要用到多個(gè)操作的話建議先調(diào)用rdd.stats()將這些值通過一次遍歷全部計(jì)算出來碾牌。
Spark包含的統(tǒng)計(jì)值有:
count() RDD 中的元素個(gè)數(shù)
mean() 元素的平均值
sum() 總和
max() 最大值
min() 最小值
variance() 元素的方差
sampleVariance() 從采樣中計(jì)算出的方差
stdev() 標(biāo)準(zhǔn)差
sampleStdev() 采樣的標(biāo)準(zhǔn)差
下面這個(gè)例子用來篩選掉異常值
val distanceDouble = distance.map(string => string.toDouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev
val mean = stats.mean
val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev) println(reasonableDistance.collect().toList)