數(shù)據(jù)算法 Hadoop/Spark大數(shù)據(jù)處理---第三章

本章欲解決的問題為求TOP(N)脊髓,共用到的方法有:

  • 假設(shè)輸入鍵都是唯一的辫愉,也即給定的輸入集合{(K,V)},所有的K都是唯一的,用Mapreduce/Hadoop方法
  • 假設(shè)輸入鍵都是唯一的将硝,也即給定的輸入集合{(K,V)},所有的K都是唯一的恭朗,用spark方法
  • 假設(shè)輸入鍵都不是唯一的,也即給定的輸入集合{(K,V)},K是有重復的依疼,用spark強大的排序算法top()函數(shù)和takeOrdered()等

主要用到的TOP N函數(shù)

java中實現(xiàn)Top N的方法最常用的是適用SortedMap<K,V>和TreeMap<K,V>,然后將L的所有元素增加到topN中痰腮,如果topN.size()>N,則刪除第一個元素或最后一個元素

//TOP K 中最關(guān)鍵的算法
static SortMap<Integer,T> topN(List<Tuple2<T,Integer>> L,int N){
    if((L==null) || (L.isEmpty())){
        return null;
    }
    
    SortMap<Integer,T> topN = new TreeMap<Integer,T>();
    for(Tuple2<T,Integer> element : L){
        topN.put(element._1,element._2);
        if(topN.size() > N){
            topN.remove(topN.firstKey())
        }
    }
    return topN;
}


基于MapReduce實現(xiàn)的鍵唯一方法

類名 描述
TopN_Driver 提交作業(yè)的驅(qū)動器
TopN_Mapper 定義map()
TopN_Reduce 定義reduce()

  • 重寫setup和cleanup函數(shù),這里兩個函數(shù)再每次啟動映射器都會執(zhí)行一次律罢,setup用于獲取N的值膀值,cleanup用于發(fā)射每個映射器的TOP N到reduce端
  //獲取N的值
  @Override
   protected void setup(Context context) throws IOException,
         InterruptedException {
      this.N = context.getConfiguration().getInt("N", 10); // default is top 10
   }
   
    //將結(jié)果發(fā)射,其中NullWritable.get()獲取的值都相同误辑,也即都映射到相同的reduce端
   @Override
   protected void cleanup(Context context) throws IOException,
         InterruptedException {
      for (String str : top.values()) {
         context.write(NullWritable.get(), new Text(str));
      }
   }

- Map函數(shù)沧踏,完成分區(qū)的TOP N求值

 @Override
   public void map(Text key, IntWritable value, Context context)
         throws IOException, InterruptedException {

      String keyAsString = key.toString();
      int frequency =  value.get();
      String compositeValue = keyAsString + "," + frequency;
      top.put(frequency, compositeValue);
      // keep only top N
      if (top.size() > N) {
         top.remove(top.firstKey());
      }
   }

- Reduce函數(shù),完成所有的TOP N求值

 private int N = 10; // default
   private SortedMap<Integer, String> top = new TreeMap<Integer, String>();

   //同樣的SortedMap<Integer, String>操作
   @Override
   public void reduce(NullWritable key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException {
      for (Text value : values) {
         String valueAsString = value.toString().trim();
         String[] tokens = valueAsString.split(",");
         String url = tokens[0];
         int frequency =  Integer.parseInt(tokens[1]);
         top.put(frequency, url);
         // keep only top N
         if (top.size() > N) {
            top.remove(top.firstKey());
         }
      }
      
      // 發(fā)射最終的 final top N
        List<Integer> keys = new ArrayList<Integer>(top.keySet());
        for(int i=keys.size()-1; i>=0; i--){
         context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i))));
      }
   }
   
   //也先執(zhí)行setup獲得N的值
   @Override
   protected void setup(Context context) 
      throws IOException, InterruptedException {
      this.N = context.getConfiguration().getInt("N", 10); // default is top 10
   }

- 驅(qū)動程序類TopNDriver.java

 Job job = new Job(getConf());
      HadoopUtil.addJarsToDistributedCache(job, "/lib/");
      int N = Integer.parseInt(args[0]); // top N
      job.getConfiguration().setInt("N", N);
      job.setJobName("TopNDriver");

      job.setInputFormatClass(SequenceFileInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);

      job.setMapperClass(TopNMapper.class);
      job.setReducerClass(TopNReducer.class);
      //設(shè)置reduce的數(shù)目為1個巾钉,也即所有的TOP N都到同一個Reduce
      job.setNumReduceTasks(1);

      // map()'s output (K,V)
      job.setMapOutputKeyClass(NullWritable.class);   
      job.setMapOutputValueClass(Text.class);   
      
      // reduce()'s output (K,V)
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(Text.class);


- 查找TOP 10 和 Bottom 10

//查找top10 
if(top10Cats.size()>10){
    top10Cats.remove(top10Cats.firstKey())
}

//查找Bottom10
if(top10Cats.size()>10){
    top10Cats.remove(top10Cats.lastKey())
}

基于Spark實現(xiàn)的鍵唯一方法

Java API使用的spark函數(shù)類
spark java類 函數(shù)類型
Function<T,R> T=>R
DoubleFunction<T> T=>Double
PairFunction<T,K,V> T=>Tuple2<K,V>
FlatMapFunction<T,R> T=>Iterable<R>
DoubleFlatMapFunction<T> T=>Iterable<Double>
PairFlatMapFunction<T,K,V> T=>Iterable<Tuple2<K,v>>
Function2<T1,T2,R> T1,T2 => R
在spark中使用setUp()和cleanUp()
 JavaRDD<SortedMap<Integer, String>> partitions =
 //使用mapPartitions方法
 pairs.mapPartitions(
  new FlatMapFunction<Iterator<Tuple2<K,V>>, SortedMap<K1, K2>>() {
      @Override
         public Iterator<Tuple2<K,V>> call(Iterator<Tuple2<K,V>> iter) {
             setup();
             while(iter.hasNext()){
                 //map()功能
             }
             cleanUp();
             return <the-result>
         }
  })

- 采用spark實現(xiàn)TOP N

 public static void main(String[] args) throws Exception {
  
      // 輸入處理參數(shù)
      if (args.length < 1) {
         System.err.println("Usage: Top10 <input-file>");
         System.exit(1);
      }
      String inputPath = args[0];
      System.out.println("args[0]: <input-path>="+inputPath);

      // 連接到spark master
      JavaSparkContext ctx = SparkUtil.createJavaSparkContext();

      // 從HDFS中讀取文件并創(chuàng)建第一個RDD
      //  <string-key><,><integer-value>,
      JavaRDD<String> lines = ctx.textFile(inputPath, 1);

    
      // 從現(xiàn)有的JavaRDD<String>創(chuàng)建一個新的成對的RDDJavaPairRDD<String,Integer>
      // Spark Java類:PairFunction<T, K, V>
      // 函數(shù)類型:T => Tuple2<K, V>
      //其實每一個JavaPairRDD<String,Integer>也即是Tuple2<String,Integer>()
      JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
         @Override
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // cat7,234
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
         }
      });

      List<Tuple2<String,Integer>> debug1 = pairs.collect();
      for (Tuple2<String,Integer> t2 : debug1) {
         System.out.println("key="+t2._1 + "\t value= " + t2._2);
      }

    
      // 為各個輸入分區(qū)創(chuàng)建一個本地TOP 10列表
      JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
         new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
         @Override
         public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
             SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
             while (iter.hasNext()) {
                Tuple2<String,Integer> tuple = iter.next();
                top10.put(tuple._2, tuple._1);
                // keep only top N 
                if (top10.size() > 10) {
                   top10.remove(top10.firstKey());
                }  
             }
             //singletonList確保唯一性
             return Collections.singletonList(top10).iterator();
         }
      });

    
      SortedMap<Integer, String> finaltop10 = new TreeMap<Integer, String>();
      //使用collect得到所有TOP 10列表
      List<SortedMap<Integer, String>> alltop10 = partitions.collect();
      //獲得最終所有的TOP 10
      for (SortedMap<Integer, String> localtop10 : alltop10) {
          //System.out.println(tuple._1 + ": " + tuple._2);
          // weight/count = tuple._1
          // catname/URL = tuple._2
          for (Map.Entry<Integer, String> entry : localtop10.entrySet()) {
              //   System.out.println(entry.getKey() + "--" + entry.getValue());
              finaltop10.put(entry.getKey(), entry.getValue());
              // keep only top 10 
              if (finaltop10.size() > 10) {
                 finaltop10.remove(finaltop10.firstKey());
              }
          }
      }
    
      // 輸出最終的TOP 10列表
      for (Map.Entry<Integer, String> entry : finaltop10.entrySet()) {
         System.out.println(entry.getKey() + "--" + entry.getValue());
      }

      System.exit(0);
   }

全局指定TOP N 參數(shù)
  • 定義broadcastTopN:final Broadcast<Integer> broadcastTopN = context.broadcast(topN)
  • 獲取N的值:final int topN = broadcastTopN.value();

基于Spark實現(xiàn)的鍵不唯一的方法

算法過程
  1. 要保證K是唯一的翘狱,要把輸入映射到JavaPairRDD<K,V>對,然后交給reduceByKey()
  2. 將所有唯一的(K,V)對劃分為M個分區(qū)
  3. 找到各個分區(qū)的TOP N (本地TOP N)
  4. 找出所有本地TOP N的最終TOP N

基于Spark實現(xiàn)的非唯一鍵方法

public static void main(String[] args) throws Exception {
      // 輸入處理參數(shù)
      if (args.length < 2) {
         System.err.println("Usage: Top10 <input-path> <topN>");
         System.exit(1);
      }
      System.out.println("args[0]: <input-path>="+args[0]);
      System.out.println("args[1]: <topN>="+args[1]);
      final int N = Integer.parseInt(args[1]);

      // 創(chuàng)建一個javaSpark上下文對象
      JavaSparkContext ctx = SparkUtil.createJavaSparkContext();

      // 將TOP N 廣播到所有集群節(jié)點
      final Broadcast<Integer> topN = ctx.broadcast(N);
      // now topN is available to be read from all cluster nodes

      // 創(chuàng)建第一個RDD砰苍,格式是這樣的A,2 | B,2 |C,3這樣
      //<string-key><,><integer-value-count>
      JavaRDD<String> lines = ctx.textFile(args[0], 1);
      lines.saveAsTextFile("/output/1");
    
      // RDD分區(qū)盒蟆,返回一個新的RDD踏烙,歸約到numPartitions分區(qū)
      //分區(qū)的原則:每個執(zhí)行器使用(2*num_executors*cores_per_executor)個分區(qū)
      JavaRDD<String> rdd = lines.coalesce(9);
       
      // 將輸入(T)映射到(K,V)對
      // PairFunction<T, K, V>   
      // T => Tuple2<K, V>
      JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
         @Override
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // url,789
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
         }
      });
      kv.saveAsTextFile("/output/2");

      //用Function函數(shù)對重復鍵進行歸約
      JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
         @Override
         public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
         }
      });
      uniqueKeys.saveAsTextFile("/output/3");
    
      // 為本地的partitions創(chuàng)建本地的TOP N
      JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(
          new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
          @Override
          public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
             final int N = topN.value();
             SortedMap<Integer, String> localTopN = new TreeMap<Integer, String>();
             while (iter.hasNext()) {
                Tuple2<String,Integer> tuple = iter.next();
                localTopN.put(tuple._2, tuple._1);
                // keep only top N 
                if (localTopN.size() > N) {
                   localTopN.remove(localTopN.firstKey());
                } 
             }
             return Collections.singletonList(localTopN).iterator();
          }
      });
      partitions.saveAsTextFile("/output/4");

      // 獲得最終的TOP N
      SortedMap<Integer, String> finalTopN = new TreeMap<Integer, String>();
      //獲得所有分區(qū)的TOP N
      List<SortedMap<Integer, String>> allTopN = partitions.collect();
      for (SortedMap<Integer, String> localTopN : allTopN) {
         for (Map.Entry<Integer, String> entry : localTopN.entrySet()) {
             // count = entry.getKey()
             // url = entry.getValue()
             finalTopN.put(entry.getKey(), entry.getValue());
             // keep only top N 
             if (finalTopN.size() > N) {
                finalTopN.remove(finalTopN.firstKey());
             }
         }
      }
    
      //輸出最終的TOP N
      for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
         System.out.println(entry.getKey() + "--" + entry.getValue());
      }

      System.exit(0);
   }

基于takeOrdered實現(xiàn)的鍵不唯一的方法

//步驟8:獲取全局TOP 10的使用
 List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);
 
//但需要實現(xiàn)排序方法
static class MyTupleComparator implements Comparator<Tuple2<String, Integer>> ,Serializable {
       final static MyTupleComparator INSTANCE = new MyTupleComparator();
       @Override
       public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
          return -t1._2.compareTo(t2._2);     // sorts RDD elements descending (use for Top-N)
          // return t1._2.compareTo(t2._2);   // sorts RDD elements ascending (use for Bottom-N)
       }
   }

用mapreduce求不唯一的方法

  • 先類似wordCount求出唯一的<key,value>
  • 之后用第一節(jié)唯一鍵求TOP N即可

使用Scala實現(xiàn)唯一鍵和不唯一鍵

唯一鍵的實現(xiàn)方法
 def main(args: Array[String]): Unit = {
    if (args.size < 1) {
      println("Usage: TopN <input>")
      sys.exit(1)
    }
    //獲得sparkConf對象
    val sparkConf = new SparkConf().setAppName("TopN")
    val sc = new SparkContext(sparkConf)
    //廣播N變量
    val N = sc.broadcast(10)
    val path = args(0)

    val input = sc.textFile(path)
    //注意:key和value倒過來了
    val pair = input.map(line => {
      val tokens = line.split(",")
      (tokens(2).toInt, tokens)
    })

    import Ordering.Implicits._
    val partitions = pair.mapPartitions(itr => {
    //sortedMap是對key進行排序的师骗,也即對value排序了
      var sortedMap = SortedMap.empty[Int, Array[String]]
      itr.foreach { tuple =>
        {
          sortedMap += tuple
          if (sortedMap.size > N.value) {
            sortedMap = sortedMap.takeRight(N.value)
          }
        }
      }
      //獲得分區(qū)右邊的N個
      sortedMap.takeRight(N.value).toIterator
    })
    //獲得所有分區(qū)
    val alltop10 = partitions.collect()
    //把所有分區(qū)連接上SortedMap历等,也即可所有分區(qū)都排序好了
    val finaltop10 = SortedMap.empty[Int, Array[String]].++:(alltop10)
    val resultUsingMapPartition = finaltop10.takeRight(N.value)
    
    //Prints result (top 10) on the console
    resultUsingMapPartition.foreach {
      case (k, v) => println(s"$k \t ${v.asInstanceOf[Array[String]].mkString(",")}")
    }

    // 方法二:sortByKey對key進行排序,以降序的方式
    val moreConciseApproach = pair.groupByKey().sortByKey(false).take(N.value)

    //Prints result (top 10) on the console
    moreConciseApproach.foreach {
      case (k, v) => println(s"$k \t ${v.flatten.mkString(",")}")
    }
    
    // done
    sc.stop()
  }
不唯一鍵的實現(xiàn)方法
def main(args: Array[String]): Unit = {
    if (args.size < 1) {
      println("Usage: TopNNonUnique <input>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("TopNNonUnique")
    val sc = new SparkContext(sparkConf)

    val N = sc.broadcast(2)
    val path = args(0)

    val input = sc.textFile(path)
    val kv = input.map(line => {
      val tokens = line.split(",")
      (tokens(0), tokens(1).toInt)
    })

    val uniqueKeys = kv.reduceByKey(_ + _)
    import Ordering.Implicits._
    val partitions = uniqueKeys.mapPartitions(itr => {
      //SortedMap是一個對鍵進行排列
      var sortedMap = SortedMap.empty[Int, String]
      itr.foreach { tuple =>
        {
          //把元組的值相反再相加
          sortedMap += tuple.swap
          if (sortedMap.size > N.value) {
            sortedMap = sortedMap.takeRight(N.value)
          }
        }
      }
      sortedMap.takeRight(N.value).toIterator
    })

    val alltop10 = partitions.collect()
    val finaltop10 = SortedMap.empty[Int, String].++:(alltop10)
    val resultUsingMapPartition = finaltop10.takeRight(N.value)

    //Prints result (top 10) on the console
    resultUsingMapPartition.foreach { 
      case (k, v) => println(s"$k \t ${v.mkString(",")}") 
    }

    // Below is additional approach which is more concise
    val createCombiner = (v: Int) => v
    val mergeValue = (a: Int, b: Int) => (a + b)
    val moreConciseApproach = kv.combineByKey(createCombiner, mergeValue, mergeValue)
                                .map(_.swap)
                                .groupByKey()
                                .sortByKey(false)
                                .take(N.value)

    //Prints result (top 10) on the console
    moreConciseApproach.foreach {
      case (k, v) => println(s"$k \t ${v.mkString(",")}")
    }

    // done
    sc.stop()
  }

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末辟癌,一起剝皮案震驚了整個濱河市寒屯,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌黍少,老刑警劉巖寡夹,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異厂置,居然都是意外死亡菩掏,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門昵济,熙熙樓的掌柜王于貴愁眉苦臉地迎上來智绸,“玉大人,你說我怎么就攤上這事访忿∏评酰” “怎么了?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵海铆,是天一觀的道長迹恐。 經(jīng)常有香客問我,道長卧斟,這世上最難降的妖魔是什么殴边? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮珍语,結(jié)果婚禮上锤岸,老公的妹妹穿的比我還像新娘。我一直安慰自己廊酣,他們只是感情好能耻,可當我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著亡驰,像睡著了一般晓猛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上凡辱,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天戒职,我揣著相機與錄音,去河邊找鬼透乾。 笑死洪燥,一個胖子當著我的面吹牛磕秤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播捧韵,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼市咆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了再来?” 一聲冷哼從身側(cè)響起蒙兰,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎芒篷,沒想到半個月后搜变,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡针炉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年挠他,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片篡帕。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡殖侵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出赂苗,到底是詐尸還是另有隱情愉耙,我是刑警寧澤,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布拌滋,位于F島的核電站朴沿,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏败砂。R本人自食惡果不足惜赌渣,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望昌犹。 院中可真熱鬧坚芜,春花似錦、人聲如沸斜姥。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽铸敏。三九已至缚忧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間杈笔,已是汗流浹背闪水。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蒙具,地道東北人球榆。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓朽肥,卻偏偏與公主長得像,于是被迫代替她去往敵國和親持钉。 傳聞我的和親對象是個殘疾皇子衡招,可洞房花燭夜當晚...
    茶點故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內(nèi)容