/**
 * Business scenario: data deduplication
 * Created by YJ on 2017/2/7.
 * For aggregations, prefer reduceByKey over groupByKey (an optimization point):
 * reduceByKey combines values locally before the shuffle, so each node sends one merged map
 *   to the aggregating machine (low aggregation pressure).
 * groupByKey ships every local map to one machine for aggregation (high aggregation pressure).
 */
/*
Data format
file1:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
file2:
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
*/
package ClassicCase

import org.apache.spark.{SparkConf, SparkContext}

object case2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("reduce")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // Load the data
    val two = sc.textFile("hdfs://192.168.109.130:8020//user/flume/ClassicCase/case2/*")
    two.filter(_.trim().length > 0)      // drop blank lines
      .map(line => (line.trim, ""))      // treat the whole line as the key: ("date value", "")
      .groupByKey()                      // groupByKey collapses duplicate keys, shuffling every pair to the aggregating machine
      .sortByKey()                       // sort by the keys' natural order
      .keys.collect().foreach(println)   // collect the keys into an array and print them
    // The second approach is risky
    two.filter(_.trim().length > 0)
      .map(line => (line.trim, "1"))
      .distinct()
      .reduceByKey(_ + _)
      .sortByKey()
      .foreach(println)
    // reduceByKey: combines locally first, then each node sends a single merged map
    //   to the aggregating machine (low aggregation pressure).
    // groupByKey: ships every local map to one machine for aggregation (high aggregation pressure).
    // If the data is spread across machines, duplicates can still appear, since distinct and
    //   reduceByKey may only have deduplicated locally; to be cautious, add another distinct()
    //   after the reduceByKey.
  }
}
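A minimal sketch of the more cautious variant that the last comment recommends, reusing the two RDD from above; the placement of the extra distinct() follows the author's note, and everything else is illustrative:

two.filter(_.trim().length > 0)
  .map(line => (line.trim, "1"))
  .distinct()
  .reduceByKey(_ + _)
  .distinct()   // the extra pass the comment above recommends as a safeguard
  .sortByKey()
  .keys.collect().foreach(println)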
Method 2
import org.apache.spark.{SparkConf, SparkContext}

object FileDistinct {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("filedistinct").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.textFile("demo1/*")
    // 1. filter drops blank lines, map trims the whitespace, distinct removes duplicates
    rdd.filter(x => x.trim.length != 0).map(_.trim).distinct().foreach(println)
    // 2. groupByKey needs key-value pairs, so pair each line with a dummy value and keep the keys
    rdd.filter(x => x.trim.length != 0).map(x => (x.trim, 1)).groupByKey().keys.foreach(println)
    // 3. the same with reduceByKey: merge the dummy counts, then keep the keys
    rdd.filter(x => x.trim.length != 0).map(x => (x.trim, 1)).reduceByKey(_ + _).keys.foreach(println)
  }
}
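For a quick local test without HDFS or a demo1 directory, the same pipeline can be fed from an in-memory collection; a sketch, with the sample lines borrowed from file1 above:

val sample = Seq("2012-3-1 a", "2012-3-2 b", "2012-3-3 c", "  ", "2012-3-3 c")
val localRdd = sc.parallelize(sample)   // stands in for sc.textFile("demo1/*")
localRdd.filter(_.trim.length != 0).map(_.trim).distinct().foreach(println)
// prints 2012-3-1 a, 2012-3-2 b and 2012-3-3 c once each (order may vary)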
Output
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
(2012-3-1 a,1)
(2012-3-1 b,1)
(2012-3-2 a,1)
(2012-3-2 b,1)
(2012-3-3 b,1)
(2012-3-3 c,1)
(2012-3-4 d,1)
(2012-3-5 a,1)
(2012-3-6 b,1)
(2012-3-6 c,1)
(2012-3-7 c,1)
(2012-3-7 d,1)
Differences and usage of reduceByKey and groupByKey
(1) With reduceByKey, Spark can combine output values that share a key on each partition before any data is moved. Notice that pairs with the same key on the same machine are merged (using the lambda passed to reduceByKey) before the data is shuffled; the lambda is then called again on the shuffled partial results to reduce all values down to one final result per key.
(2) With groupByKey, since it accepts no combining function, Spark has to move every key-value pair first. The consequence is heavy traffic between cluster nodes and longer transfer delays.
(3) The difference in short:
reduceByKey: combines locally first, then each node sends a single merged map to the aggregating machine for the final shuffle merge (low aggregation pressure).
groupByKey: every local map is shipped to one machine and merged there in the shuffle (high aggregation pressure).
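To make the contrast concrete, here is a minimal sketch (the sample data and variable names are illustrative, not from the original code); both pipelines produce the same counts, but reduceByKey pre-merges values per partition before the shuffle, while groupByKey ships every single pair across the network first:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1)))
// reduceByKey: each partition first merges its own ("a", n) pairs, then shuffles the partial sums
val viaReduce = pairs.reduceByKey(_ + _)
// groupByKey: every individual ("a", 1) pair crosses the network before the values are summed
val viaGroup = pairs.groupByKey().mapValues(_.sum)
// both yield ("a", 3) and ("b", 2); viaReduce simply moves far less data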