突然聽同事提起棍丐,就隨便寫一下的掘譬。
reduceByKey==groupByKey().map()
做一個word count小例子寥掐,
val counts = pairs.groupByKey(count=>(count._1,count._2.sum))
groupByKey的過程 MapPartitionsRDD=>ShuffledRDD=>MapPartitionsRDD=>MapPartitionsRDD
也就是說胀屿,它是原封不動的糊昙,把ShuffleMapTask的輸出费奸,來去到ResultTask的內存中弥激,所以導致所有數(shù)據都進行了網絡傳輸
而如果是reduceByKey,看下shuffleMapTask的write的實現(xiàn)愿阐,判斷了是否有mapSideCombine微服,如果有,就先本地聚合缨历,再寫磁盤以蕴,再傳輸。
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
......
}
這到底是在干什么
談之前要有個共識辛孵,分布式系統(tǒng)丛肮,網絡傳輸是占時間比重高也非常影響效率的部分。
說些比較飄浮的內容魄缚,這其實是mapreduce比較經典的map端combine宝与,也就是說因為是分布式系統(tǒng)啊,首先把數(shù)據分散到各個節(jié)點并行計算冶匹,算完了再把數(shù)據傳到其他節(jié)點去做最終結果計算习劫。那么在第一次計算之前,如果能先做一些對最終結果計算有幫助的計算嚼隘,再去傳輸榜聂,就能節(jié)省一點網絡傳輸時間。
說些更飄浮的內容啊嗓蘑,mr這種計算是為了算結果须肆,也就是把數(shù)據的抽象程度變高了匿乃,那么,能越早的接近最終結果豌汇,越能節(jié)約時間幢炸。
適用場景
如果有hadoop基礎就知道,map端combine和reduce端combine邏輯一致才能得到最終結果拒贱。
如果不是,那就是如果需要對單key的所有value放在一起才能計算的邏輯不合適做這種優(yōu)化逻澳。