aggregate(RDD.scala)
大致的意思是aggregate接收兩個(gè)函數(shù)辟宗,和一個(gè)初始化值赊豌。seqOp函數(shù)用于聚集每一個(gè)分區(qū),combOp用于聚集所有分區(qū)聚集后的結(jié)果瓤湘。每一個(gè)分區(qū)的聚集瓢颅,和最后所有分區(qū)的聚集都需要初始化值的參與。
舉例如下:
集群環(huán)境:一臺(tái)Master弛说,三臺(tái)Worker,在spark-shell中測(cè)試
scala> val seqOp:(Int,Int)=>Int={(a,b)=>{println("seqOp"+a+"\t"+b);math.min(a,b)}}
seqOp: (Int, Int) => Int = <function2>
scala> val combOp:(Int,Int)=>Int={(a,b)=>{println("combOp"+a+"\t"+b);a+b}}
combOp: (Int, Int) => Int = <function2>
scala> val z=sc.parallelize(List(1,2,3,4,5,6,7,8),2)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> z.aggregate(3)(seqOp,combOp)
[Stage 13:> (0 + 0) / 2]combOp3 1
combOp 4 3
res13: Int = 7
為什么會(huì)等于7呢挽懦?
任務(wù)
可以看出有兩個(gè)任務(wù),原因是我們將List并發(fā)數(shù)設(shè)置為了2木人,Spark會(huì)將List拆分成2部分同時(shí)執(zhí)行信柿。再進(jìn)一步看這兩個(gè)任務(wù)的的統(tǒng)計(jì)信息:
任務(wù)的統(tǒng)計(jì)信息
可以看出這兩個(gè)任務(wù)在兩個(gè)worker上執(zhí)行,可以看到任務(wù)的啟動(dòng)時(shí)間醒第,執(zhí)行了多久等信息渔嚷。再進(jìn)一步看任務(wù)的stdout輸出日志:
任務(wù)一的輸出日志
任務(wù)二的輸出日志
可以看出spark將List拆分成了兩部分,啟動(dòng)兩個(gè)任務(wù)分別執(zhí)行稠曼。再來(lái)看看seqOp函數(shù)表達(dá)的意思形病,seqOp取的是兩個(gè)數(shù)中的較小值。如第一半部分List(1,2,3,4),spark會(huì)拿初始值3與這個(gè)List中的每一個(gè)元素分別比較蒲列,最后得出的結(jié)果是1.同時(shí),第二半部分List得出的結(jié)果是3,然后spark再將這兩部分得出的結(jié)果調(diào)用combOp處理搀罢,combOp是兩個(gè)數(shù)的相加蝗岖,spark首先將初始值3加上1得出4,再加上3得到7.
接下來(lái)再看一下使用aggregate方法編寫(xiě)wordcount例子榔至。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* User:cool coding
* Date:20171214
* Time:16:12:20
*
*/
object WordCount {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("wordcount").setMaster("local[2]")
val sc=new SparkContext(conf)
val data=sc.textFile("H:/hadoop/wordcount.txt");
val words: RDD[String] = data.flatMap(_.split(" "))
val countsMap=words.aggregate(mutable.HashMap[String,Int]())((agg:mutable.HashMap[String,Int], word)=>{
if(!agg.contains(word)){
agg.put(word,1)
}else{
agg.put(word,agg(word)+1)
}
agg
},(agg1:mutable.HashMap[String,Int],agg2:mutable.HashMap[String,Int])=> {
for((word,count)<-agg1){
if(!agg2.contains(word)){
agg2.put(word,1)
}else{
agg2.put(word,agg2(word)+count)
}
}
agg2
}
)
println(countsMap.toList)
}
}