This section introduces the following action operators:
foreach
foreachPartition
reduce
collect
count
first
take
takeSample
top
takeOrdered
saveAsTextFile
saveAsSequenceFile
saveAsObjectFile
countByKey
countByValue
aggregate
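(1) foreach, foreachPartition
- foreach: iterates over the elements of the RDD one at a time
- foreachPartition: iterates over the RDD one partition at a time; the function receives an iterator over that partition's elements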
val arr = Array(1,2,3,4,5,6)
val rdd = sc.makeRDD(arr,2)
rdd.foreach(x => {
println("===========")
println(x)
})
/*
===========
1
===========
2
===========
3
===========
4
===========
5
===========
6
*/
rdd.foreachPartition(x => {
println("===========")
while(x.hasNext) {
println(x.next())
}
})
/*
===========
1
2
3
===========
4
5
6
*/
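A common reason to prefer foreachPartition is that per-partition setup (for example, opening a connection) runs once per partition instead of once per element. The following is a minimal sketch of that idea, not a definitive implementation: the object name, the accumulator names, and the local[2] master are assumptions made for illustration, and the accumulators merely stand in for a real connection.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionSketch {
  // Returns (number of per-partition setups, number of elements handled).
  def run(sc: SparkContext): (Long, Long) = {
    val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
    // Accumulators let the driver observe what happened inside the tasks.
    val setups = sc.longAccumulator("setups")
    val handled = sc.longAccumulator("handled")
    rdd.foreachPartition { iter =>
      setups.add(1)                     // stand-in for opening one connection per partition
      iter.foreach(_ => handled.add(1)) // stand-in for writing each element through it
    }
    (setups.sum, handled.sum)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for illustration only.
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("foreachPartition-sketch").setMaster("local[2]"))
    try println(run(sc)) // two setups for six elements
    finally sc.stop()
  }
}
```

With plain foreach, the same setup would have to run inside the per-element function, that is, six times rather than twice.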
(2) reduce: aggregates the elements of the RDD using the specified function
val numArr = Array(1,2,3,4,5)
val rdd = sc.parallelize(numArr)
val sum = rdd.reduce(_+_)
println(sum)
/*
15
*/
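Spark applies the reduce function within each partition first and then merges the partial results, so the function should be commutative and associative. The sketch below shows two such functions (sum and max) on a two-partition RDD; the object name and the local[2] master are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReduceSketch {
  // Returns (sum, maximum) of the numbers 1 to 5.
  def run(sc: SparkContext): (Int, Int) = {
    val nums = sc.parallelize(Array(1, 2, 3, 4, 5), 2)
    val total = nums.reduce(_ + _)                      // addition: safe to reorder and regroup
    val largest = nums.reduce((a, b) => math.max(a, b)) // max: also commutative and associative
    (total, largest)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for illustration only.
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("reduce-sketch").setMaster("local[2]"))
    try println(run(sc)) // (15,5)
    finally sc.stop()
  }
}
```

An operation such as subtraction is neither commutative nor associative, so its result under reduce would depend on how the data happens to be partitioned.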
(3) collect: pulls the computed results back to the Driver
val numArr = Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3))
val rdd = sc.parallelize(numArr)
val sum = rdd.reduceByKey(_+_)
sum.collect().foreach(println)
/*
(1,6)
(2,6)
*/
(4) count, countByKey, countByValue
count: counts the number of elements in the RDD
countByKey: counts the number of elements for each key
countByValue: counts the number of occurrences of each distinct value
// -- count
val arr = Array("Tom","Jack","Tony","Bob","Kate")
val rdd = sc.makeRDD(arr)
println(rdd.count())
/*
5
*/
// -- countByKey
val rdd = sc.parallelize(Array(
("Sales","Tom"), ("Sales","Jack"),("Sales","Bob"),("Sales","Terry"),
("Logistics","Jack"),("Logistics","Selina"),("Logistics","Hebe"),
("HR","Ella"),("HR","Harry"),
("R&D","Allen")
))
val result = rdd.countByKey()
result.foreach(println)
/* (the iteration order of the map may vary)
(Logistics,3)
(R&D,1)
(Sales,4)
(HR,2)
*/
// -- countByValue
val rdd = sc.parallelize(Array(
"Tom","Jed","Tom",
"Tom","Jed","Jed",
"Tom","Tony","Jed"
))
val result = rdd.countByValue();
result.foreach(println)
/*
(Tom,4)
(Tony,1)
(Jed,4)
*/
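countByKey and countByValue return an ordinary Map on the Driver, so they are best suited to results with a small number of distinct keys. When the number of keys may be large, the same counts can be kept distributed with mapValues and reduceByKey. The sketch below compares both approaches on a small sample; the object name, the sample data, and the local[2] master are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CountByKeySketch {
  // Returns the per-key counts computed two ways: on the Driver and as an RDD.
  def run(sc: SparkContext): (scala.collection.Map[String, Long], Map[String, Long]) = {
    val staff = sc.parallelize(Seq(
      ("Sales", "Tom"), ("Sales", "Jack"), ("HR", "Ella"), ("HR", "Harry"), ("Sales", "Bob")))
    // Action: the whole result map is materialized on the Driver.
    val onDriver = staff.countByKey()
    // Transformation: the counts stay in an RDD and could be saved or processed further.
    val distributed = staff.mapValues(_ => 1L).reduceByKey(_ + _)
    // Collected here only so the two results can be compared in this small example.
    (onDriver, distributed.collect().toMap)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for illustration only.
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("countByKey-sketch").setMaster("local[2]"))
    try {
      val (onDriver, viaReduce) = run(sc)
      println(onDriver)  // Sales -> 3, HR -> 2 (map order may vary)
      println(viaReduce) // same counts
    } finally sc.stop()
  }
}
```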
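(5) first, take, takeSample
take(n): returns the first n elements of the RDD
first: equivalent to take(1), except that it returns the element itself rather than an array
takeSample(withReplacement, num, [seed]): randomly samples elements from the RDD
withReplacement: whether to sample with replacement
true means that once element A has been drawn, it can be drawn again
false means that once element A has been drawn, it will not be drawn again
num: the number of elements to sample
seed: the seed for the random number generator; on the same RDD, the same seed reproduces the same sample and different seeds give different samples. It can be set manually and defaults to a random Long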
val arr = Array(("Tom",88),("Bob",92),("Allen",86),("Kate",100),("Sandy",97))
val rdd = sc.makeRDD(arr)
// Sort by score in descending order, then take the first three
rdd.sortBy(_._2,false).take(3).foreach(println)
/*
(Kate,100)
(Sandy,97)
(Bob,92)
*/
// Sort, then take the top 1
rdd.sortBy(_._2,false).take(1).foreach(println) // (Kate,100)
println(rdd.sortBy(_._2,false).first()) // (Kate,100)
// Randomly sample 2 elements (the result differs from run to run)
rdd.takeSample(false, 2).foreach(println)
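The effect of the seed and withReplacement parameters can be sketched as follows. The example assumes a local[2] master and a hypothetical object name; the seed value 42 is arbitrary. No specific sampled elements are shown, because they depend on the seed and the partitioning.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TakeSampleSketch {
  type Score = (String, Int)

  // Returns two samples drawn with the same seed, plus one drawn with replacement.
  def run(sc: SparkContext): (Seq[Score], Seq[Score], Seq[Score]) = {
    val rdd = sc.makeRDD(Array(
      ("Tom", 88), ("Bob", 92), ("Allen", 86), ("Kate", 100), ("Sandy", 97)))
    // Same RDD and same seed: the two samples are identical.
    val a = rdd.takeSample(withReplacement = false, num = 2, seed = 42L)
    val b = rdd.takeSample(withReplacement = false, num = 2, seed = 42L)
    // With replacement, num may exceed the RDD size because elements can repeat.
    val c = rdd.takeSample(withReplacement = true, num = 8, seed = 42L)
    (a.toSeq, b.toSeq, c.toSeq)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for illustration only.
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("takeSample-sketch").setMaster("local[2]"))
    try {
      val (a, b, c) = run(sc)
      println(a == b)   // true: a fixed seed makes the sample reproducible
      println(c.length) // 8, although the RDD holds only 5 elements
    } finally sc.stop()
  }
}
```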
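(6) top, takeOrdered
top(n): returns the first n elements of the RDD in the default (descending) order or according to a specified ordering
takeOrdered(n): returns the first n elements of the RDD in the default (ascending) order or according to a specified ordering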
val rdd = sc.makeRDD(Array(10, 4, 2, 12, 3))
rdd.top(3).foreach(println) // 12 10 4 (descending)
rdd.takeOrdered(3).foreach(println) // 2 3 4 (ascending)
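Both operators take the ordering as a second parameter list, which allows a "top n by some field" query without sorting the whole RDD first. The sketch below ranks (name, score) pairs by score; the object name, the byScore value, and the local[2] master are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TopByScoreSketch {
  type Score = (String, Int)

  // Returns (two highest scores, two lowest scores).
  def run(sc: SparkContext): (Seq[Score], Seq[Score]) = {
    val rdd = sc.makeRDD(Array(
      ("Tom", 88), ("Bob", 92), ("Allen", 86), ("Kate", 100), ("Sandy", 97)))
    // Compare tuples by their second field only.
    val byScore = Ordering.by[Score, Int](_._2)
    val best = rdd.top(2)(byScore)          // largest first under byScore
    val worst = rdd.takeOrdered(2)(byScore) // smallest first under byScore
    (best.toSeq, worst.toSeq)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for illustration only.
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("top-sketch").setMaster("local[2]"))
    try {
      val (best, worst) = run(sc)
      best.foreach(println)  // (Kate,100) (Sandy,97)
      worst.foreach(println) // (Allen,86) (Tom,88)
    } finally sc.stop()
  }
}
```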
(7) saveAsTextFile, saveAsSequenceFile, saveAsObjectFile
- saveAsTextFile: saves the RDD as text files, one line per element
- saveAsSequenceFile: saves a key-value RDD as a Hadoop SequenceFile
- saveAsObjectFile: saves the RDD as a SequenceFile of serialized objects
val line = sc.textFile("hdfs://repo:9000/user/spark/wordcount/input/wordcount.txt")
line.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.sortBy(_._2,false)
// .foreach(t => println(t._1 + " " + t._2))
.saveAsTextFile("hdfs://repo:9000/user/spark/wordcount/output/")
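To compare the three formats, the following sketch writes the same small pair RDD with each save operator and reads it back with the matching SparkContext method (textFile, sequenceFile, objectFile). It is a sketch under stated assumptions rather than a definitive recipe: the object name and sample data are hypothetical, and the output goes to a fresh temporary directory on what is assumed to be the local default filesystem instead of the HDFS path used above.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object SaveRoundTripSketch {
  type Count = (String, Int)

  // Returns what was read back from the text, sequence, and object outputs.
  def run(sc: SparkContext): (Seq[String], Seq[Count], Seq[Count]) = {
    val counts = sc.parallelize(Seq(("spark", 3), ("hadoop", 2), ("hive", 1)), 2)
    // Assumption: a fresh temp directory; the save operators fail if the target path exists.
    val base = Files.createTempDirectory("spark-save-sketch").toString

    counts.saveAsTextFile(s"$base/text")    // one toString line per element
    counts.saveAsSequenceFile(s"$base/seq") // keys and values stored as Hadoop Writables
    counts.saveAsObjectFile(s"$base/obj")   // elements stored as serialized objects

    // Each output is a directory of part files; sort so the result is order-independent.
    val text = sc.textFile(s"$base/text").collect().sorted.toSeq
    val seq = sc.sequenceFile[String, Int](s"$base/seq").collect().sorted.toSeq
    val obj = sc.objectFile[Count](s"$base/obj").collect().sorted.toSeq
    (text, seq, obj)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for illustration only.
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("save-sketch").setMaster("local[2]"))
    try {
      val (text, seq, obj) = run(sc)
      println(text) // plain strings such as (hadoop,2); types are lost
      println(seq)  // typed (String, Int) pairs
      println(obj)  // typed (String, Int) pairs
    } finally sc.stop()
  }
}
```

The text output has to be parsed again to recover the original types, whereas the sequence and object outputs return typed pairs directly.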