Spark Core（RDD 算子）實(shí)現(xiàn):
object WordCount {

  /**
   * Word count implemented with the Spark Core RDD API.
   * Reads text from the "in" path, splits lines on single spaces,
   * and prints each (word, count) pair to stdout.
   */
  def main(args: Array[String]): Unit = {
    // Run locally using all available cores.
    val config = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc = new SparkContext(config)

    // Read the input path as an RDD of lines.
    val lines: RDD[String] = sc.textFile("in")
    // Split each line into words (single-space delimiter).
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with 1, then sum the counts per word and collect to the driver.
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    val result: Array[(String, Int)] = wordOne.reduceByKey(_ + _).collect()
    result.foreach(println)

    // Release cluster resources (was missing in the original).
    sc.stop()
  }
} // NOTE(review): this closing brace for the object was missing in the original.
Spark SQL 實(shí)現(xiàn):
object WordCount {

  /**
   * Word count implemented with the Spark SQL Dataset API.
   * Reads "in/word.txt", splits lines on single spaces, counts
   * occurrences per word via SQL, and shows the result in
   * descending count order.
   */
  def main(args: Array[String]): Unit = {
    // Local SparkSession using all available cores.
    val spark: SparkSession = SparkSession.builder()
      .appName("wordCount")
      .master("local[*]")
      .getOrCreate()

    // Required for the implicit encoders used by Dataset.flatMap below.
    import spark.implicits._

    // Read the file and flatten every line into individual words.
    val words: Dataset[String] =
      spark.read.textFile("in/word.txt").flatMap(_.split(" "))

    // Expose the words as a temporary view so the aggregation can be written in SQL.
    words.createTempView("word")

    // Count occurrences per word, largest counts first.
    val counts: DataFrame =
      spark.sql("select value,count(*) count from word group by value order by count desc")
    counts.show()

    // Shut the session down cleanly.
    spark.stop()
  }
}
Java（Hadoop MapReduce）實(shí)現(xiàn):
Mapper:
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reusable output objects: allocated once and refilled per record
    // instead of constructing new Writables for every word.
    Text outKey = new Text();
    IntWritable one = new IntWritable(1);

    /**
     * Emits (word, 1) for every token of the input line.
     * The line is split on single spaces.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // value holds one line of input text.
        String line = value.toString();
        // Emit each word with a count of one.
        for (String word : line.split(" ")) {
            outKey.set(word);
            context.write(outKey, one);
        }
    }
}
Reducer:
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sums all partial counts received for one word and emits
     * (word, total). All values in the iterable share the same key.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate the per-mapper counts for this word.
        int total = 0;
        for (IntWritable partial : values) {
            total += partial.get();
        }
        // Emit the final count for the word.
        context.write(key, new IntWritable(total));
    }
}
Driver:
public class WordCountDriver {

    /**
     * Configures and submits the word-count MapReduce job.
     *
     * @param args args[0] = input path, args[1] = output path
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when paths are missing.
        if (args.length < 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(2);
        }

        // 1. Create the job from a fresh configuration.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Jar containing this driver (needed when submitting to a cluster).
        job.setJarByClass(WordCountDriver.class);

        // 3. Wire up the mapper and reducer classes.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Map output types — must match WordCountMapper's output signature.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Final (reducer) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Input and output paths from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit and block until completion; exit 0 on success, 1 on failure.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}