在Spark官方文檔中對aggregate函數(shù)定義如下
def aggregate[S](zeroValue: =>S)(seqop: (S, T) => S, combop: (S, S) => S): S
其中seqOp是聚合各分區(qū)中的元素,將元素類型從T轉(zhuǎn)變?yōu)镾,操作的初始值是zeroValue
combop是將聚合各分區(qū)中元素的結(jié)果再次進行聚合励背,操作的初始值也是zeroValue
舉個列子:
對List(1,2,3,4,5,6,7,8,9)取平均值
/**
* @author td
* @date 2017/11/23
*/
object Demo {
/**
* 對各分區(qū)的元素進行聚合,聚合的初始值為zeroValue
* @param tuple1
* @param num
*/
def sqlOp(tuple1: (Int, Int), num: Int): (Int, Int) = {
(tuple1._1+num,tuple1._2+1)
}
/**
* 對各分區(qū)元素聚合的結(jié)果再次進行聚合,聚合的初始值為zeroValue
* @param tuple1
* @return
*/
def combOp(tuple1: (Int,Int),tuple2: (Int,Int)): (Int,Int) = {
(tuple1._1+tuple2._1,tuple1._2+tuple2._2);
}
def main(args: Array[String]): Unit = {
val rdd = List(1,2,3,4,5,6,7,8,9)
val resutlt = rdd.par.aggregate((0,0))(sqlOp,combOp);
println(resutlt._1)
println(resutlt._2)
val avg = resutlt._1/resutlt._2;
println("平均值是"+avg)
}
}
其中sqlOp操作的過程是樣子:
第一步:將zeroValue作為初始值進行運算(0,0)
def sqlOp((0,0),1): (Int, Int) = {
(0+1,0+1)
}
第二步:將第一步的結(jié)果tuple作為參數(shù)傳入進去(1,1)
def sqlOp((1,1),2): (Int, Int) = {
(1+2,1+1)
}
以此類推sqlOp每一步的過程:
3+3, 2+1
6+4, 3+1
10+5, 4+1
15+6, 5+1
21+7, 6+1
28+8, 7+1
36+9, 8+1
上面說的spark單線程運行的情況卓箫,Spark在實際運行過程中是以分區(qū)多線程的形式運行
比如分為3個分區(qū)List(1,2,3,4)、List(5,6,7,8)贪庙、List(9)。然后combOp函數(shù)將sqlOp每個分區(qū)算出來的結(jié)果再次進行聚合。(0+1+2+3+4,4)烫扼、(0+5+6+7+8,4)哀澈、(0+9,1)
聚合后的結(jié)果是(0+1+2+3+4+0+5+6+7+8+0+9,4+4+1) 即(45,9)牌借。在求平均值就簡單了。