This chapter solves the problem of finding the Top N records. For example, given the pairs {(A,2),(B,7),(C,5)} and N=2, the answer is {(B,7),(C,5)}. The approaches covered are:
- Assume all input keys are unique, i.e. in the given input set {(K,V)} every K is distinct; solve it with MapReduce/Hadoop.
- Assume all input keys are unique, i.e. in the given input set {(K,V)} every K is distinct; solve it with Spark.
- Assume the input keys are not unique, i.e. in the given input set {(K,V)} the same K may repeat; solve it with Spark's powerful sorting functions such as top() and takeOrdered().
The core Top N function
The most common way to implement Top N in Java is to use a SortedMap<K,V> (in practice a TreeMap<K,V>): add every element of the input list L to topN, and whenever topN.size() > N, remove the first key (the smallest, for Top N) or the last key (the largest, for Bottom N).
// the heart of the Top N algorithm: keep at most N entries in a TreeMap keyed by frequency
static <T> SortedMap<Integer,T> topN(List<Tuple2<T,Integer>> L, int N) {
    if ((L == null) || (L.isEmpty())) {
        return null;
    }
    SortedMap<Integer,T> topN = new TreeMap<Integer,T>();
    for (Tuple2<T,Integer> element : L) {
        // the frequency (element._2) is the map key, so entries stay sorted by frequency
        topN.put(element._2, element._1);
        if (topN.size() > N) {
            // evict the entry with the smallest frequency
            topN.remove(topN.firstKey());
        }
    }
    return topN;
}
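A minimal usage sketch (the sample data is made up; scala.Tuple2 ships with the Spark jars):
import java.util.*;
import scala.Tuple2;

List<Tuple2<String,Integer>> L = Arrays.asList(
    new Tuple2<String,Integer>("A", 2),
    new Tuple2<String,Integer>("B", 7),
    new Tuple2<String,Integer>("C", 5));
// prints {5=C, 7=B}: the two largest frequencies, in ascending key order
System.out.println(topN(L, 2));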
Unique-key implementation with MapReduce
Class | Description |
---|---|
TopN_Driver | The driver that submits the job |
TopN_Mapper | Defines map() |
TopN_Reduce | Defines reduce() |
- Override setup() and cleanup(); each runs exactly once per mapper task: setup() runs before the first map() call and reads the value of N, while cleanup() runs after the last map() call and emits the mapper's local Top N to the reduce side.
// setup(): read N from the job configuration
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
this.N = context.getConfiguration().getInt("N", 10); // default is top 10
}
// cleanup(): emit this mapper's local Top N; NullWritable.get() always returns the same singleton key, so all mappers' output is routed to the same reducer
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
for (String str : top.values()) {
context.write(NullWritable.get(), new Text(str));
}
}
- map() maintains this mapper's local (per-split) Top N
@Override
public void map(Text key, IntWritable value, Context context)
throws IOException, InterruptedException {
String keyAsString = key.toString();
int frequency = value.get();
String compositeValue = keyAsString + "," + frequency;
top.put(frequency, compositeValue);
// keep only top N
if (top.size() > N) {
top.remove(top.firstKey());
}
}
- reduce() merges all the local Top N lists into the final Top N
private int N = 10; // default
private SortedMap<Integer, String> top = new TreeMap<Integer, String>();
// the same SortedMap<Integer, String> technique as in the mapper
@Override
public void reduce(NullWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
String valueAsString = value.toString().trim();
String[] tokens = valueAsString.split(",");
String url = tokens[0];
int frequency = Integer.parseInt(tokens[1]);
top.put(frequency, url);
// keep only top N
if (top.size() > N) {
top.remove(top.firstKey());
}
}
// emit the final top N, highest frequency first
List<Integer> keys = new ArrayList<Integer>(top.keySet());
for(int i=keys.size()-1; i>=0; i--){
context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i))));
}
}
// the reducer likewise reads N in setup() first
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
this.N = context.getConfiguration().getInt("N", 10); // default is top 10
}
- The driver class TopNDriver.java
Job job = new Job(getConf());
HadoopUtil.addJarsToDistributedCache(job, "/lib/");
int N = Integer.parseInt(args[0]); // top N
job.getConfiguration().setInt("N", N);
job.setJobName("TopNDriver");
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(TopNMapper.class);
job.setReducerClass(TopNReducer.class);
// use a single reducer, so that every mapper's local Top N arrives at the same reduce() call
job.setNumReduceTasks(1);
// map()'s output (K,V)
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
// reduce()'s output (K,V)
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
- Finding the Top 10 and the Bottom 10
// Top 10: evict the smallest key, keeping the 10 largest
if (top10Cats.size() > 10) {
    top10Cats.remove(top10Cats.firstKey());
}
// Bottom 10: evict the largest key, keeping the 10 smallest
if (top10Cats.size() > 10) {
    top10Cats.remove(top10Cats.lastKey());
}
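A minimal sketch of how these fragments sit inside a loop (allCats, a map from weight to category name, is an assumed data source):
SortedMap<Integer, String> top10Cats = new TreeMap<Integer, String>();
for (Map.Entry<Integer, String> e : allCats.entrySet()) {
    top10Cats.put(e.getKey(), e.getValue());
    if (top10Cats.size() > 10) {
        // firstKey() here keeps the Top 10; use lastKey() instead for the Bottom 10
        top10Cats.remove(top10Cats.firstKey());
    }
}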
Unique-key implementation with Spark
Spark function classes used by the Java API (note: since Spark 2.0 the FlatMap variants return Iterator<R> instead of Iterable<R>, which is the signature the code below relies on)
Spark Java class | Function type |
---|---|
Function<T,R> | T => R |
DoubleFunction<T> | T => Double |
PairFunction<T,K,V> | T => Tuple2<K,V> |
FlatMapFunction<T,R> | T => Iterable<R> |
DoubleFlatMapFunction<T> | T => Iterable<Double> |
PairFlatMapFunction<T,K,V> | T => Iterable<Tuple2<K,V>> |
Function2<T1,T2,R> | T1,T2 => R |
Using setup() and cleanup() in Spark
// template: mapPartitions() runs its body once per partition, so per-partition
// setup()/cleanup() logic goes before and after the iterator loop;
// the full program below instantiates this template
JavaRDD<SortedMap<Integer, String>> partitions =
    pairs.mapPartitions(
        new FlatMapFunction<Iterator<Tuple2<K,V>>, SortedMap<Integer, String>>() {
            @Override
            public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<K,V>> iter) {
                setup();
                while (iter.hasNext()) {
                    // per-element map() work goes here
                }
                cleanup();
                return <the-result>;
            }
        });
- Implementing Top N with Spark
public static void main(String[] args) throws Exception {
// handle input parameters
if (args.length < 1) {
System.err.println("Usage: Top10 <input-file>");
System.exit(1);
}
String inputPath = args[0];
System.out.println("args[0]: <input-path>="+inputPath);
// connect to the Spark master
JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
// read the input file from HDFS and create the first RDD
// <string-key><,><integer-value>,
JavaRDD<String> lines = ctx.textFile(inputPath, 1);
// create a pair RDD (JavaPairRDD<String,Integer>) from the existing JavaRDD<String>
// Spark Java class: PairFunction<T, K, V>
// function type: T => Tuple2<K, V>
// each element of the JavaPairRDD<String,Integer> is a Tuple2<String,Integer>
JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String,Integer> call(String s) {
String[] tokens = s.split(","); // cat7,234
return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
}
});
List<Tuple2<String,Integer>> debug1 = pairs.collect();
for (Tuple2<String,Integer> t2 : debug1) {
System.out.println("key="+t2._1 + "\t value= " + t2._2);
}
// create a local Top 10 list for each input partition
JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
@Override
public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
while (iter.hasNext()) {
Tuple2<String,Integer> tuple = iter.next();
top10.put(tuple._2, tuple._1);
// keep only top N
if (top10.size() > 10) {
top10.remove(top10.firstKey());
}
}
// wrap the local top 10 in a singleton list so this partition emits exactly one element
return Collections.singletonList(top10).iterator();
}
});
SortedMap<Integer, String> finaltop10 = new TreeMap<Integer, String>();
// collect all local Top 10 lists with collect()
List<SortedMap<Integer, String>> alltop10 = partitions.collect();
// merge the local lists into the final Top 10
for (SortedMap<Integer, String> localtop10 : alltop10) {
//System.out.println(tuple._1 + ": " + tuple._2);
// weight/count = tuple._1
// catname/URL = tuple._2
for (Map.Entry<Integer, String> entry : localtop10.entrySet()) {
// System.out.println(entry.getKey() + "--" + entry.getValue());
finaltop10.put(entry.getKey(), entry.getValue());
// keep only top 10
if (finaltop10.size() > 10) {
finaltop10.remove(finaltop10.firstKey());
}
}
}
// print the final Top 10 list
for (Map.Entry<Integer, String> entry : finaltop10.entrySet()) {
System.out.println(entry.getKey() + "--" + entry.getValue());
}
System.exit(0);
}
Specifying the Top N parameter globally
- Define the broadcast variable: final Broadcast<Integer> broadcastTopN = context.broadcast(topN);
- Read the value of N inside a task: final int topN = broadcastTopN.value();
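A minimal, self-contained sketch of the pattern (assuming an existing JavaSparkContext ctx; the RDD contents and the filter are illustrative only):
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;

// driver side: broadcast N once; every executor caches a read-only copy
final Broadcast<Integer> broadcastTopN = ctx.broadcast(10);
JavaRDD<Integer> counts = ctx.parallelize(Arrays.asList(3, 42, 7, 99));
// worker side: read the value inside any closure shipped to the executors
JavaRDD<Integer> atLeastN = counts.filter(c -> c >= broadcastTopN.value());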
Non-unique-key implementation with Spark
Algorithm outline
- First make the keys unique: map the input into JavaPairRDD<K,V> pairs and hand them to reduceByKey().
- Partition the now-unique (K,V) pairs into M partitions.
- Find the Top N of each partition (the local Top N).
- Combine all local Top N lists and find the final Top N.
The complete non-unique-key Spark program
public static void main(String[] args) throws Exception {
// handle input parameters
if (args.length < 2) {
System.err.println("Usage: Top10 <input-path> <topN>");
System.exit(1);
}
System.out.println("args[0]: <input-path>="+args[0]);
System.out.println("args[1]: <topN>="+args[1]);
final int N = Integer.parseInt(args[1]);
// create a Java Spark context object
JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
// broadcast the Top N threshold to all cluster nodes
final Broadcast<Integer> topN = ctx.broadcast(N);
// now topN is available to be read from all cluster nodes
// create the first RDD; input lines look like A,2 | B,2 | C,3
//<string-key><,><integer-value-count>
JavaRDD<String> lines = ctx.textFile(args[0], 1);
lines.saveAsTextFile("/output/1");
// coalesce returns a new RDD reduced to numPartitions partitions
// rule of thumb: use about (2 * num_executors * cores_per_executor) partitions in total across the cluster
JavaRDD<String> rdd = lines.coalesce(9);
// map the input (T) to (K,V) pairs
// PairFunction<T, K, V>
// T => Tuple2<K, V>
JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) {
String[] tokens = s.split(","); // url,789
return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
}
});
kv.saveAsTextFile("/output/2");
// reduce duplicate keys with a Function2 that sums their counts
JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
uniqueKeys.saveAsTextFile("/output/3");
// create a local Top N for each partition
JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(
new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
@Override
public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
final int N = topN.value();
SortedMap<Integer, String> localTopN = new TreeMap<Integer, String>();
while (iter.hasNext()) {
Tuple2<String,Integer> tuple = iter.next();
localTopN.put(tuple._2, tuple._1);
// keep only top N
if (localTopN.size() > N) {
localTopN.remove(localTopN.firstKey());
}
}
return Collections.singletonList(localTopN).iterator();
}
});
partitions.saveAsTextFile("/output/4");
// compute the final Top N
SortedMap<Integer, String> finalTopN = new TreeMap<Integer, String>();
// collect the local Top N lists of all partitions
List<SortedMap<Integer, String>> allTopN = partitions.collect();
for (SortedMap<Integer, String> localTopN : allTopN) {
for (Map.Entry<Integer, String> entry : localTopN.entrySet()) {
// count = entry.getKey()
// url = entry.getValue()
finalTopN.put(entry.getKey(), entry.getValue());
// keep only top N
if (finalTopN.size() > N) {
finalTopN.remove(finalTopN.firstKey());
}
}
}
// print the final Top N
for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
System.out.println(entry.getKey() + "--" + entry.getValue());
}
System.exit(0);
}
Non-unique-key implementation with takeOrdered()
// step 8: take the global Top N in a single call
List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);
// this requires supplying a serializable comparator:
static class MyTupleComparator implements Comparator<Tuple2<String, Integer>> ,Serializable {
final static MyTupleComparator INSTANCE = new MyTupleComparator();
@Override
public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
return -t1._2.compareTo(t2._2); // sorts RDD elements descending (use for Top-N)
// return t1._2.compareTo(t2._2); // sorts RDD elements ascending (use for Bottom-N)
}
}
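The chapter intro also mentioned top(). It belongs to the same JavaRDDLike API but returns the largest elements under the supplied ordering, so a sketch using it needs an ascending comparator rather than the negated one above (AscendingCountComparator is an illustrative name):
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import scala.Tuple2;

// ascending comparator: top() then returns the N tuples with the largest counts
static class AscendingCountComparator
        implements Comparator<Tuple2<String, Integer>>, Serializable {
    final static AscendingCountComparator INSTANCE = new AscendingCountComparator();
    @Override
    public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
        return t1._2.compareTo(t2._2);
    }
}
// equivalent to the takeOrdered() call above
List<Tuple2<String, Integer>> topNViaTop = uniqueKeys.top(N, AscendingCountComparator.INSTANCE);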
Solving the non-unique-key case with MapReduce
- First, much like word count, aggregate the input into unique <key,value> pairs (a sketch follows this list).
- Then apply the unique-key Top N job from the first section to the aggregated output.
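A minimal sketch of the aggregation phase (class names are illustrative; input lines are assumed to look like url,789 as in the Spark version):
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public static class AggregateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(","); // url,789
        context.write(new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1])));
    }
}
public static class AggregateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get(); // summing collapses duplicate keys, so each output key is unique
        }
        context.write(key, new IntWritable(sum));
    }
}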
Scala implementations for unique and non-unique keys
The unique-key implementation
def main(args: Array[String]): Unit = {
if (args.size < 1) {
println("Usage: TopN <input>")
sys.exit(1)
}
// create the SparkConf object
val sparkConf = new SparkConf().setAppName("TopN")
val sc = new SparkContext(sparkConf)
// broadcast the variable N
val N = sc.broadcast(10)
val path = args(0)
val input = sc.textFile(path)
// note: key and value are swapped; the numeric weight (tokens(2)) becomes the key so we can sort by it
val pair = input.map(line => {
val tokens = line.split(",")
(tokens(2).toInt, tokens)
})
import Ordering.Implicits._
val partitions = pair.mapPartitions(itr => {
// SortedMap orders its entries by key, i.e. by the numeric weight
var sortedMap = SortedMap.empty[Int, Array[String]]
itr.foreach { tuple =>
{
sortedMap += tuple
if (sortedMap.size > N.value) {
sortedMap = sortedMap.takeRight(N.value)
}
}
}
// keep the N largest entries of this partition
sortedMap.takeRight(N.value).toIterator
})
// collect the local Top N of every partition
val alltop10 = partitions.collect()
// merge all local lists into one SortedMap, so the combined entries are fully sorted
val finaltop10 = SortedMap.empty[Int, Array[String]].++:(alltop10)
val resultUsingMapPartition = finaltop10.takeRight(N.value)
//Prints result (top 10) on the console
resultUsingMapPartition.foreach {
case (k, v) => println(s"$k \t ${v.asInstanceOf[Array[String]].mkString(",")}")
}
// approach 2: sortByKey(false) sorts by key in descending order, then take the first N
val moreConciseApproach = pair.groupByKey().sortByKey(false).take(N.value)
//Prints result (top 10) on the console
moreConciseApproach.foreach {
case (k, v) => println(s"$k \t ${v.flatten.mkString(",")}")
}
// done
sc.stop()
}
The non-unique-key implementation
def main(args: Array[String]): Unit = {
if (args.size < 1) {
println("Usage: TopNNonUnique <input>")
sys.exit(1)
}
val sparkConf = new SparkConf().setAppName("TopNNonUnique")
val sc = new SparkContext(sparkConf)
val N = sc.broadcast(2)
val path = args(0)
val input = sc.textFile(path)
val kv = input.map(line => {
val tokens = line.split(",")
(tokens(0), tokens(1).toInt)
})
val uniqueKeys = kv.reduceByKey(_ + _)
import Ordering.Implicits._
val partitions = uniqueKeys.mapPartitions(itr => {
// SortedMap keeps its entries ordered by key
var sortedMap = SortedMap.empty[Int, String]
itr.foreach { tuple =>
{
// swap the tuple so the count becomes the key before adding it to the map
sortedMap += tuple.swap
if (sortedMap.size > N.value) {
sortedMap = sortedMap.takeRight(N.value)
}
}
}
sortedMap.takeRight(N.value).toIterator
})
val alltop10 = partitions.collect()
val finaltop10 = SortedMap.empty[Int, String].++:(alltop10)
val resultUsingMapPartition = finaltop10.takeRight(N.value)
//Prints result (top 10) on the console
resultUsingMapPartition.foreach {
case (k, v) => println(s"$k \t ${v.mkString(",")}")
}
// a more concise alternative using combineByKey
val createCombiner = (v: Int) => v
val mergeValue = (a: Int, b: Int) => (a + b)
val moreConciseApproach = kv.combineByKey(createCombiner, mergeValue, mergeValue)
.map(_.swap)
.groupByKey()
.sortByKey(false)
.take(N.value)
//Prints result (top 10) on the console
moreConciseApproach.foreach {
case (k, v) => println(s"$k \t ${v.mkString(",")}")
}
// done
sc.stop()
}