Data Algorithms: Big Data Processing with Hadoop/Spark --- Chapter 5

This chapter introduces the order inversion pattern, implemented in five ways:

  1. Traditional MapReduce
  2. Traditional Spark (Java)
  3. Spark SQL (Java)
  4. Traditional Scala (Spark)
  5. Spark SQL with Scala

The problem addressed in this chapter

For each word, count the words that appear within two positions before or after it. Given the sequence w1,w2,w3,w4,w5,w6:

Word Neighborhood (±2)
W1 W2,W3
W2 W1,W3,W4
W3 W1,W2,W4,W5
W4 W2,W3,W5,W6
W5 W3,W4,W6
W6 W4,W5

The final output has the form (word, neighbor, relative frequency), where a neighbor's relative frequency is its count divided by the word's total neighbor count.
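For example, with the ±2 window above, W1 has the neighbors W2 and W3, each seen once, so its total is 2 and its output rows are (W1, W2, 0.5) and (W1, W3, 0.5); W3 has four neighbors, so each of its rows carries 1/4 = 0.25.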

Traditional MapReduce

Class Description
RelativeFrequencyDriver driver that submits the job
RelativeFrequencyMapper map() function
RelativeFrequencyReducer reduce() function
RelativeFrequencyCombiner combiner that reduces the data shuffled out of map()
OrderInversionPartitioner partitions by the word component of the key only
PairOfWords represents a word pair (Word1, Word2)
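The OrderInversionPartitioner is what makes the pattern work: it must partition on the word alone, so that a word's (word, "*") total and all of its (word, neighbor) pairs reach the same reducer. A minimal sketch of such a partitioner, assuming PairOfWords exposes getWord() (this is not the book's exact source):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderInversionPartitioner extends Partitioner<PairOfWords, IntWritable> {
    @Override
    public int getPartition(PairOfWords key, IntWritable value, int numPartitions) {
        // partition on the left word only, ignoring the neighbor component
        return (key.getWord().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}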
//map() function
 @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = StringUtils.split(value.toString(), " ");
        //String[] tokens = StringUtils.split(value.toString(), "\\s+");
        if ((tokens == null) || (tokens.length < 2)) {
            return;
        }
        //for each token, emit a (word, neighbor) pair for every neighbor inside the window
        for (int i = 0; i < tokens.length; i++) {
            tokens[i] = tokens[i].replaceAll("\\W+", "");

            if (tokens[i].equals("")) {
                continue;
            }

            pair.setWord(tokens[i]);

            int start = (i - neighborWindow < 0) ? 0 : i - neighborWindow;
            int end = (i + neighborWindow >= tokens.length) ? tokens.length - 1 : i + neighborWindow;
            for (int j = start; j <= end; j++) {
                if (j == i) {
                    continue;
                }
                pair.setNeighbor(tokens[j].replaceAll("\\W", ""));
                context.write(pair, ONE);
            }
            //emit the word's total neighbor count under the special "*" marker
            pair.setNeighbor("*");
            totalCount.set(end - start);
            context.write(pair, totalCount);
        }
    }


//reduce() function
 @Override
    protected void reduce(PairOfWords key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        //a neighbor of "*" marks the per-word total record; "*" sorts before any real neighbor
        //and the partitioner keeps one word's keys together, so this total arrives first
        if (key.getNeighbor().equals("*")) {
            if (key.getWord().equals(currentWord)) {
                totalCount += getTotalCount(values);
            } else {
                currentWord = key.getWord();
                totalCount = getTotalCount(values);
            }
        } else {
            //otherwise this is an ordinary (word, neighbor) pair: sum its counts and divide by totalCount to get the relative frequency
            int count = getTotalCount(values);
            relativeCount.set((double) count / totalCount);
            context.write(key, relativeCount);
        }

    }
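The getTotalCount helper called above is not listed; a minimal sketch, assuming it simply sums the IntWritable counts of one key (the combiner would use the same logic):

    //sums the counts of one key (a sketch; not the book's exact source)
    private int getTotalCount(Iterable<IntWritable> values) {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        return sum;
    }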

Traditional Spark (Java)

public static void main(String[] args) {
        if (args.length < 3) {
            System.out.println("Usage: RelativeFrequencyJava <neighbor-window> <input-dir> <output-dir>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("RelativeFrequency");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        int neighborWindow = Integer.parseInt(args[0]);
        String input = args[1];
        String output = args[2];

        final Broadcast<Integer> brodcastWindow = sc.broadcast(neighborWindow);

        JavaRDD<String> rawData = sc.textFile(input);

        /*
         * Transform the input to the format: (word, (neighbour, 1))
         */
        JavaPairRDD<String, Tuple2<String, Integer>> pairs = rawData.flatMapToPair(
                new PairFlatMapFunction<String, String, Tuple2<String, Integer>>() {
            private static final long serialVersionUID = -6098905144106374491L;

            @Override
            public java.util.Iterator<scala.Tuple2<String, scala.Tuple2<String, Integer>>> call(String line) throws Exception {
                List<Tuple2<String, Tuple2<String, Integer>>> list = new ArrayList<Tuple2<String, Tuple2<String, Integer>>>();
                String[] tokens = line.split("\\s");
                for (int i = 0; i < tokens.length; i++) {
                    int start = (i - brodcastWindow.value() < 0) ? 0 : i - brodcastWindow.value();
                    int end = (i + brodcastWindow.value() >= tokens.length) ? tokens.length - 1 : i + brodcastWindow.value();
                    for (int j = start; j <= end; j++) {
                        if (j != i) {
                            list.add(new Tuple2<String, Tuple2<String, Integer>>(tokens[i], new Tuple2<String, Integer>(tokens[j], 1)));
                        } else {
                            // do nothing
                            continue;
                        }
                    }
                }
                return list.iterator();
            }
        }
        );

        // (word, sum(word))
        //PairFunction<T, K, V> T => Tuple2<K, V>
        JavaPairRDD<String, Integer> totalByKey = pairs.mapToPair(

                new PairFunction<Tuple2<String, Tuple2<String, Integer>>, String, Integer>() {
            private static final long serialVersionUID = -213550053743494205L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Tuple2<String, Integer>> tuple) throws Exception {
                return new Tuple2<String, Integer>(tuple._1, tuple._2._2);
            }
        }).reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = -2380022035302195793L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return (v1 + v2);
                    }
                });

        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> grouped = pairs.groupByKey();

        // (word, (neighbour, 1)) -> (word, (neighbour, sum(neighbour)))
        //flatMapValues transforms only the values and leaves the keys unchanged
        JavaPairRDD<String, Tuple2<String, Integer>> uniquePairs = grouped.flatMapValues(
                //Function<T1, R> -> R call(T1 v1)
                new Function<Iterable<Tuple2<String, Integer>>, Iterable<Tuple2<String, Integer>>>() {
            private static final long serialVersionUID = 5790208031487657081L;
            
            @Override
            public Iterable<Tuple2<String, Integer>> call(Iterable<Tuple2<String, Integer>> values) throws Exception {
                Map<String, Integer> map = new HashMap<>();
                List<Tuple2<String, Integer>> list = new ArrayList<>();
                Iterator<Tuple2<String, Integer>> iterator = values.iterator();
                while (iterator.hasNext()) {
                    Tuple2<String, Integer> value = iterator.next();
                    int total = value._2;
                    if (map.containsKey(value._1)) {
                        total += map.get(value._1);
                    }
                    map.put(value._1, total);
                }
                for (Map.Entry<String, Integer> kv : map.entrySet()) {
                    list.add(new Tuple2<String, Integer>(kv.getKey(), kv.getValue()));
                }
                return list;
            }
        });
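As an aside, the same (word, (neighbour, sum)) result can be obtained without groupByKey, which buffers all of a word's neighbours in memory at once. A hedged sketch with Java 8 lambdas and a composite (word, neighbour) key (not the book's code):

        // ((word, neighbour), 1) -> ((word, neighbour), count) -> (word, (neighbour, count))
        JavaPairRDD<String, Tuple2<String, Integer>> uniquePairsAlt = pairs
                .mapToPair(t -> new Tuple2<>(new Tuple2<>(t._1, t._2._1), t._2._2))
                .reduceByKey(Integer::sum)
                .mapToPair(t -> new Tuple2<>(t._1._1, new Tuple2<>(t._1._2, t._2)));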

        // (word, ((neighbour, sum(neighbour)), sum(word)))
        JavaPairRDD<String, Tuple2<Tuple2<String, Integer>, Integer>> joined = uniquePairs.join(totalByKey);

        // ((key, neighbour), sum(neighbour)/sum(word))
        JavaPairRDD<Tuple2<String, String>, Double> relativeFrequency = joined.mapToPair(
                new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Integer>, Integer>>, Tuple2<String, String>, Double>() {
            private static final long serialVersionUID = 3870784537024717320L;

            @Override
            public Tuple2<Tuple2<String, String>, Double> call(Tuple2<String, Tuple2<Tuple2<String, Integer>, Integer>> tuple) throws Exception {
                return new Tuple2<Tuple2<String, String>, Double>(new Tuple2<String, String>(tuple._1, tuple._2._1._1), ((double) tuple._2._1._2 / tuple._2._2));
            }
        });

        // For saving the output in tab separated format
        // ((key, neighbour), relative_frequency)
        //format each record as a tab-separated String
        JavaRDD<String> formatResult_tab_separated = relativeFrequency.map(
                new Function<Tuple2<Tuple2<String, String>, Double>, String>() {
            private static final long serialVersionUID = 7312542139027147922L;

            @Override
            public String call(Tuple2<Tuple2<String, String>, Double> tuple) throws Exception {
                return tuple._1._1 + "\t" + tuple._1._2 + "\t" + tuple._2;
            }
        });

        // save output
        formatResult_tab_separated.saveAsTextFile(output);

        // done
        sc.close();

    }
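Note that this Spark version does not need the order inversion trick at all: instead of a custom partitioner and a special "*" key, it joins the per-(word, neighbour) sums with the per-word totals. For the six-word example above, the saved output would contain tab-separated lines such as "W1 W2 0.5" and "W1 W3 0.5".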

Spark SQL (Java)

 public static void main(String[] args) {
        if (args.length < 3) {
            System.out.println("Usage: SparkSQLRelativeFrequency <neighbor-window> <input-dir> <output-dir>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("SparkSQLRelativeFrequency");
        //create the SparkSession required by Spark SQL
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSQLRelativeFrequency")
                .config(sparkConf)
                .getOrCreate();

        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        int neighborWindow = Integer.parseInt(args[0]);
        String input = args[1];
        String output = args[2];

        final Broadcast<Integer> brodcastWindow = sc.broadcast(neighborWindow);

        /*
         * Register a schema; the frequency column is used by the SQL query below.
         * Schema (word, neighbour, frequency)
         */
        StructType rfSchema = new StructType(new StructField[]{
            new StructField("word", DataTypes.StringType, false, Metadata.empty()),
            new StructField("neighbour", DataTypes.StringType, false, Metadata.empty()),
            new StructField("frequency", DataTypes.IntegerType, false, Metadata.empty())});

        JavaRDD<String> rawData = sc.textFile(input);

        /*
         * Transform the input to the format: (word, (neighbour, 1))
         */
        JavaRDD<Row> rowRDD = rawData
                .flatMap(new FlatMapFunction<String, Row>() {
                    private static final long serialVersionUID = 5481855142090322683L;

                    @Override
                    public Iterator<Row> call(String line) throws Exception {
                        List<Row> list = new ArrayList<>();
                        String[] tokens = line.split("\\s");
                        for (int i = 0; i < tokens.length; i++) {
                            int start = (i - brodcastWindow.value() < 0) ? 0
                                    : i - brodcastWindow.value();
                            int end = (i + brodcastWindow.value() >= tokens.length) ? tokens.length - 1
                                    : i + brodcastWindow.value();
                            for (int j = start; j <= end; j++) {
                                if (j != i) {
                                    list.add(RowFactory.create(tokens[i], tokens[j], 1));
                                } else {
                                    // do nothing
                                    continue;
                                }
                            }
                        }
                        return list.iterator();
                    }
                });
        //create the DataFrame
        Dataset<Row> rfDataset = spark.createDataFrame(rowRDD, rfSchema);
        //register rfDataset as a temporary view so it can be queried with SQL
        rfDataset.createOrReplaceTempView("rfTable");

        String query = "SELECT a.word, a.neighbour, (a.feq_total/b.total) rf "
                + "FROM (SELECT word, neighbour, SUM(frequency) feq_total FROM rfTable GROUP BY word, neighbour) a "
                + "INNER JOIN (SELECT word, SUM(frequency) as total FROM rfTable GROUP BY word) b ON a.word = b.word";
        Dataset<Row> sqlResult = spark.sql(query);

        sqlResult.show(); // print first 20 records on the console
        sqlResult.write().parquet(output + "/parquetFormat"); // saves output in compressed Parquet format, recommended for large projects.
        sqlResult.rdd().saveAsTextFile(output + "/textFormat"); // to see output via cat command

        // done
        sc.close();
        spark.stop();

    }
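For reference, the same query can also be written with the DataFrame API instead of an SQL string; a minimal sketch, not from the book, reusing the rfDataset built above and assuming a static import of org.apache.spark.sql.functions.*:

        // group once per (word, neighbour) and once per word, then join on word
        Dataset<Row> pairTotals = rfDataset.groupBy("word", "neighbour")
                .agg(sum("frequency").alias("feq_total"));
        Dataset<Row> wordTotals = rfDataset.groupBy("word")
                .agg(sum("frequency").alias("total"));
        Dataset<Row> relativeFreq = pairTotals.join(wordTotals, "word")
                .select(col("word"), col("neighbour"),
                        col("feq_total").divide(col("total")).alias("rf"));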

Traditional Scala (Spark)

def main(args: Array[String]): Unit = {

    if (args.size < 3) {
      println("Usage: RelativeFrequency <neighbor-window> <input-dir> <output-dir>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("RelativeFrequency")
    val sc = new SparkContext(sparkConf)

    val neighborWindow = args(0).toInt
    val input = args(1)
    val output = args(2)

    val brodcastWindow = sc.broadcast(neighborWindow)

    val rawData = sc.textFile(input)

    /* 
     * Transform the input to the format:
     * (word, (neighbour, 1))
     */
    val pairs = rawData.flatMap(line => {
      val tokens = line.split("\\s")
      for {
        i <- 0 until tokens.length
        start = if (i - brodcastWindow.value < 0) 0 else i - brodcastWindow.value
        end = if (i + brodcastWindow.value >= tokens.length) tokens.length - 1 else i + brodcastWindow.value
        j <- start to end if (j != i)
        //yield collects the transformed tuples (word, (neighbour, 1))
      } yield (tokens(i), (tokens(j), 1))
    })

    // (word, sum(word))
    val totalByKey = pairs.map(t => (t._1, t._2._2)).reduceByKey(_ + _)

    val grouped = pairs.groupByKey()

    // (word, (neighbour, sum(neighbour)))
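    // groupBy(_._1) buckets this word's (neighbour, 1) pairs by neighbour; unzip._2.sum adds up the 1s in each bucket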
    val uniquePairs = grouped.flatMapValues(_.groupBy(_._1).mapValues(_.unzip._2.sum))
    //join the per-neighbour sums with the per-word totals
    // (word, ((neighbour, sum(neighbour)), sum(word)))
    val joined = uniquePairs join totalByKey

    // ((key, neighbour), sum(neighbour)/sum(word))
    val relativeFrequency = joined.map(t => {
      ((t._1, t._2._1._1), (t._2._1._2.toDouble / t._2._2.toDouble))
    })

    // For saving the output in tab separated format
    // ((key, neighbour), relative_frequency)
    val formatResult_tab_separated = relativeFrequency.map(t => t._1._1 + "\t" + t._1._2 + "\t" + t._2)
    formatResult_tab_separated.saveAsTextFile(output)

    // done
    sc.stop()
  }

Spark SQL with Scala

def main(args: Array[String]): Unit = {

    if (args.size < 3) {
      println("Usage: SparkSQLRelativeFrequency <neighbor-window> <input-dir> <output-dir>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SparkSQLRelativeFrequency")

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    val sc = spark.sparkContext

    val neighborWindow = args(0).toInt
    val input = args(1)
    val output = args(2)

    val brodcastWindow = sc.broadcast(neighborWindow)

    val rawData = sc.textFile(input)

    /*
    * Schema
    * (word, neighbour, frequency)
    */
    val rfSchema = StructType(Seq(
      StructField("word", StringType, false),
      StructField("neighbour", StringType, false),
      StructField("frequency", IntegerType, false)))

    /* 
     * Transform the input to the format:
     * Row(word, neighbour, 1)
     */
    //convert each line into Rows matching the schema above
    val rowRDD = rawData.flatMap(line => {
      val tokens = line.split("\\s")
      for {
        i <- 0 until tokens.length
        //the usual window computation; it differs slightly from the MapReduce version, which also strips non-word characters
        start = if (i - brodcastWindow.value < 0) 0 else i - brodcastWindow.value
        end = if (i + brodcastWindow.value >= tokens.length) tokens.length - 1 else i + brodcastWindow.value
        j <- start to end if (j != i)
      } yield Row(tokens(i), tokens(j), 1)
    })

    val rfDataFrame = spark.createDataFrame(rowRDD, rfSchema)
    //register the rfTable temporary view
    rfDataFrame.createOrReplaceTempView("rfTable")

    import spark.sql

    val query = "SELECT a.word, a.neighbour, (a.feq_total/b.total) rf " +
      "FROM (SELECT word, neighbour, SUM(frequency) feq_total FROM rfTable GROUP BY word, neighbour) a " +
      "INNER JOIN (SELECT word, SUM(frequency) as total FROM rfTable GROUP BY word) b ON a.word = b.word"

    val sqlResult = sql(query)
    sqlResult.show() // print first 20 records on the console
    sqlResult.write.save(output + "/parquetFormat") // saves output in compressed Parquet format, recommended for large projects.
    sqlResult.rdd.saveAsTextFile(output + "/textFormat") // to see output via cat command

    // done
    spark.stop()

  }
