1、獲取文本內(nèi)最大的前三個數(shù)字
輸入數(shù)據(jù):
3
54
44
2
67
32
133
54
23
1
35
23
73
32
16
78
21
56
1)Java版top3:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
* 取最大的前3個數(shù)字
*/
/**
 * Reads one integer per line from a text file and prints the three
 * largest values in descending order, e.g. [133, 78, 73].
 */
public class Topn {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Topn")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("D:\\spark\\top3.txt");
        // Parse each line into an Integer.
        JavaRDD<Integer> numbers = lines.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return Integer.valueOf(s);
            }
        });
        // top(3) keeps only 3 elements per partition in a bounded priority
        // queue and returns them in descending order — cheaper than the
        // previous full sortByKey(false) + take(3), and no pair RDD needed.
        List<Integer> list = numbers.top(3);
        System.out.println(Arrays.toString(list.toArray()));
        sc.close();
    }
}
輸出結(jié)果:
[133, 78, 73]
2)Scala版top3
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Reads one integer per line from a text file and prints the three
 * largest values in descending order.
 */
object Topn {
  def main(args: Array[String]): Unit = {
    // Allow the path to be overridden on the command line; keep the
    // tutorial file as the default so existing runs are unchanged.
    val inputPath = if (args.nonEmpty) args(0) else "D:\\spark\\top3.txt"
    val conf = new SparkConf()
      .setAppName("Topn")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile(inputPath, 1)
    // top(3) returns the 3 largest values in descending order using a
    // bounded per-partition priority queue — cheaper than the previous
    // full sortByKey(false) + take(3), and no (key, value) pairing needed.
    val topNumbers = lines.map(_.toInt).top(3)
    topNumbers.foreach(println)
    sc.stop()
  }
}
輸出結(jié)果:
133
78
73
2、對每個班級的學(xué)生成績,取出前3名(分組取topn)
輸入數(shù)據(jù):
class1 90
class2 56
class1 87
class1 76
class2 88
class1 95
class1 74
class2 87
class2 67
class2 77
1)Java版分組取topn
/**
* 分組取Top3
*/
/**
 * Groups scores by class name and prints the top 3 scores per class in
 * descending order. Input lines look like "class1 90" (name, space, score).
 */
public class GroupTop3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GroupTop3") // fixed: was "Topn", a copy-paste leftover from the previous example
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("D:\\spark\\grouptop3.txt");
        // Parse "name score" lines into (className, score) pairs.
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] split = s.split(" ");
                return new Tuple2<>(split[0], Integer.valueOf(split[1]));
            }
        });
        JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
        JavaPairRDD<String, Iterable<Integer>> top3Scores = groupedPairs.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            @Override
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> classScores) throws Exception {
                // Fixed-size descending array: top3[0] holds the largest score.
                Integer[] top3 = new Integer[3];
                String className = classScores._1;
                Iterator<Integer> scores = classScores._2.iterator();
                while (scores.hasNext()) {
                    Integer score = scores.next();
                    for (int i = 0; i < top3.length; i++) {
                        if (top3[i] == null) {
                            // Empty slot: everything above it is larger, place here.
                            top3[i] = score;
                            break;
                        } else if (score > top3[i]) {
                            // Shift smaller entries down one slot to make room.
                            for (int j = top3.length - 1; j > i; j--) {
                                top3[j] = top3[j - 1];
                            }
                            top3[i] = score;
                            break;
                        }
                    }
                }
                // Fix: a class with fewer than 3 scores previously emitted
                // literal "null" lines; trim the unfilled tail instead.
                int filled = 0;
                while (filled < top3.length && top3[filled] != null) {
                    filled++;
                }
                return new Tuple2<String, Iterable<Integer>>(className, Arrays.asList(top3).subList(0, filled));
            }
        });
        top3Scores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> top3Score) throws Exception {
                System.out.println("class:" + top3Score._1);
                Iterator<Integer> integers = top3Score._2.iterator();
                while (integers.hasNext()) {
                    System.out.println(integers.next());
                }
                System.out.println("=================================");
            }
        });
        sc.close();
    }
}
輸出結(jié)果:
class:class1
95
90
87
=================================
class:class2
88
87
77
=================================
2)scala版分組取topn
/**
 * Groups scores by class name and prints the top 3 scores per class in
 * descending order. Input lines look like "class1 90" (name, space, score).
 */
object GroupTop3 {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\spark\\grouptop3.txt"
    val conf = new SparkConf()
      .setAppName("GroupTop3")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile(inputPath, 1)
    // Split each "name score" line once instead of twice per line.
    val pairs = lines.map { line =>
      val fields = line.split(" ")
      (fields(0), fields(1).toInt)
    }
    val groupedPairs = pairs.groupByKey()
    // Sort each class's scores descending and keep at most 3. This replaces
    // the manual null-padded Array[Integer] insertion sort, which carried a
    // dead, shadowed `var flag` and printed "null" for classes with fewer
    // than 3 scores.
    val groupScores = groupedPairs.map { case (className, scores) =>
      (className, scores.toList.sorted(Ordering[Int].reverse).take(3))
    }
    // collect() brings the small per-class result to the driver so the
    // println output is produced locally rather than inside executor tasks.
    for ((className, top3) <- groupScores.collect()) {
      println("class:" + className)
      top3.foreach(println)
      println("=============================")
    }
    sc.stop()
  }
}
輸出結(jié)果:
class:class1
95
90
87
=============================
class:class2
88
87
77
=============================