SparkWC.scala
package day06

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWC {
  def main(args: Array[String]): Unit = {
    // Configuration: app name only; the master is supplied by spark-submit
    // (uncomment setMaster("local[*]") to run locally inside an IDE)
    val conf: SparkConf = new SparkConf().setAppName("SparkWC") //.setMaster("local[*]")
    // Context object: the entry point to the cluster
    val sc: SparkContext = new SparkContext(conf)
    // Read the input data; args(0) is the input path
    val lines = sc.textFile(args(0))
    // Process the data: split lines into words, pair each word with 1,
    // sum the counts per word, then sort by count in descending order
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val paired: RDD[(String, Int)] = words.map((_, 1))
    val reduced: RDD[(String, Int)] = paired.reduceByKey(_ + _)
    val res: RDD[(String, Int)] = reduced.sortBy(_._2, ascending = false)
    // Save the result; args(1) is the output path
    res.saveAsTextFile(args(1))
    // println(res.collect().toBuffer)
    // Shut down the job
    sc.stop()
  }
}
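Before packaging, the pipeline can be verified without a cluster by enabling the commented-out setMaster("local[*]") and printing the result instead of saving it. A minimal sketch under those assumptions (the object name SparkWCLocal and the input path data/wc1.log are illustrative, not from the original):

package day06

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWCLocal {
  def main(args: Array[String]): Unit = {
    // local[*] runs Spark in-process with one thread per core; no cluster needed
    val conf = new SparkConf().setAppName("SparkWCLocal").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // "data/wc1.log" is an illustrative local path
    val lines = sc.textFile("data/wc1.log")
    val res: RDD[(String, Int)] = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
    // Print to the console instead of writing to HDFS
    res.collect().foreach(println)
    sc.stop()
  }
}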
// Package the project into a jar and upload it to the cluster node
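A minimal sketch of this step, assuming an sbt build on the development machine (the original does not specify a build tool; Maven or an IDE artifact build works equally well, and the jar name below depends on the build configuration, so it is illustrative):

# sbt package
# scp target/scala-2.11/wc_2.11-1.0.jar root@192.168.56.21:/root/wc.jar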
# cd data
# vi wc1.log
hello tom
hello jerry
hello tom
hello kitty
hello tom
hello jerry
# vi wc2.log
hello tom
hello jerry
hello lilei
hello hanmeimei
hello tom
hello tom
hello jerry
hello tom
# vi wc3.log
hello tom
hello jerry
hello lilei
hello hanmeimei
hello tom
hello tom
hello jerry
hello tom
# hdfs dfs -mkdir /wc
# hdfs dfs -put ~/data/wc1.log /wc
# hdfs dfs -put ~/data/wc2.log /wc
# hdfs dfs -put ~/data/wc3.log /wc
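The upload can be confirmed with a standard HDFS listing before submitting the job:

# hdfs dfs -ls /wc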
# cd training/spark/bin
# ./spark-submit --class day06.SparkWC --master spark://192.168.56.21:7077 --executor-memory 1g --total-executor-cores 2 /root/wc.jar hdfs://192.168.56.21:9000/wc hdfs://192.168.56.21:9000/output
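Note that saveAsTextFile fails if the output directory already exists, so /output must not be present before the run. Once the job finishes, the sorted counts land in part files under /output and can be inspected with a standard HDFS command:

# hdfs dfs -cat /output/part-*

For the three input files above, the totals work out to (hello,22), (tom,11), (jerry,6), (lilei,2), (hanmeimei,2), (kitty,1); the relative order of lilei and hanmeimei may vary since they tie at 2.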