/**
- 業(yè)務(wù)場景:數(shù)據(jù)去重問題
- Created by YJ on 2017/2/7.
- 統(tǒng)計(jì)數(shù)據(jù),盡量用reduceByKey,不要用groupByKey,優(yōu)化點(diǎn)
- reduceByKey,在本機(jī)suffle后,再發(fā)送一個總map,發(fā)送到一個總機(jī)器上匯總,(匯總要壓力信布贰)
- groupByKey,發(fā)送本機(jī)所有的map,在一個機(jī)器上匯總(匯總壓力大)
/
/
數(shù)據(jù)格式
flie1:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
flie2:
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
*/
package ClassicCase
import org.apache.spark.{SparkConf, SparkContext}
object case2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("reduce")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
//獲取數(shù)據(jù)
val two = sc.textFile("hdfs://192.168.109.130:8020//user/flume/ClassicCase/case2/*")
two.filter(_.trim().length>0) //需要有空格。
.map(line=>(line.trim,""))//全部值當(dāng)key盒卸,(key value,"")
.groupByKey()//groupByKey,過濾重復(fù)的key value 英支,發(fā)送到總機(jī)器上匯總
.sortByKey() //按key value的自然順序排序
.keys.collect().foreach(println) //所有的keys變成數(shù)組再輸出
//第二種有風(fēng)險
two.filter(_.trim().length>0)
.map(line=>(line.trim,"1"))
.distinct()
.reduceByKey(_+_)
.sortByKey()
.foreach(println)
//reduceByKey,在本機(jī)suffle后,再發(fā)送一個總map随常,發(fā)送到一個總機(jī)器上匯總,(匯總要壓力姓到住)
//groupByKey,發(fā)送本機(jī)所有的map,在一個機(jī)器上匯總(匯總壓力大)
//如果數(shù)據(jù)在不同的機(jī)器上晒奕,則會出現(xiàn)先重復(fù)數(shù)據(jù),distinct名斟,reduceBykey脑慧,只是在本機(jī)上去重,謹(jǐn)慎一點(diǎn)的話蒸眠,在reduceByKey后面需要加多一個distinct
}
}
輸出結(jié)果
2012-3-1 a 2012-3-1 b 2012-3-2 a 2012-3-2 b 2012-3-3 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-6 c 2012-3-7 c 2012-3-7 d (2012-3-1 a,1) (2012-3-1 b,1) (2012-3-2 a,1) (2012-3-2 b,1) (2012-3-3 b,1) (2012-3-3 c,1) (2012-3-4 d,1) (2012-3-5 a,1) (2012-3-6 b,1) (2012-3-6 c,1) (2012-3-7 c,1) (2012-3-7 d,1)
reduceByKey和groupByKey區(qū)別與用法
(1)當(dāng)采用reduceByKeyt時漾橙,Spark可以在每個分區(qū)移動數(shù)據(jù)之前將待輸出數(shù)據(jù)與一個共用的key結(jié)合。借助下圖可以理解在reduceByKey里究竟發(fā)生了什么楞卡。 注意在數(shù)據(jù)對被搬移前同一機(jī)器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函數(shù))霜运。然后lamdba函數(shù)在每個區(qū)上被再次調(diào)用來將所有值reduce成一個最終結(jié)果。整個過程如下:
(2)當(dāng)采用groupByKey時蒋腮,由于它不接收函數(shù)淘捡,spark只能先將所有的鍵值對(key-value pair)都移動,這樣的后果是集群節(jié)點(diǎn)之間的開銷很大池摧,導(dǎo)致傳輸延時焦除。整個過程如下:
( 3 )區(qū)別
reduceByKey,在本機(jī)suffle后,再發(fā)送一個總map,發(fā)送到一個總機(jī)器上suffle匯總map作彤,(匯總要壓力斜炱恰)
groupByKey,發(fā)送本機(jī)所有的map,在一個機(jī)器上suffle匯總map(匯總壓力大)