Data Algorithms: Big Data Processing with Hadoop/Spark --- Chapter 7

The problem this chapter solves

The market basket problem is a popular data mining technique, commonly used to reveal associations between different products or groups of products. For example, if the pair (bread, butter) shows up in many transactions, the two products are likely to be bought together.

This chapter covers three implementations:

  1. Based on classic MapReduce
  2. Based on Spark -- can also derive the dependencies (association rules) between items
  3. Based on Scala
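
To make the goal concrete before the distributed versions, here is a minimal single-machine sketch (not from the book; the sample baskets and the comma-separated format are assumed for illustration) that counts how often each pair of items appears across baskets:

import java.util.*;

public class PairCountSketch {
   public static void main(String[] args) {
      // each line is one basket of comma-separated items (assumed format)
      List<String> transactions = Arrays.asList("a,b,c", "a,b", "b,c");
      Map<List<String>, Integer> counts = new HashMap<>();
      for (String line : transactions) {
         String[] items = line.split(",");
         Arrays.sort(items); // sort so (b,a) and (a,b) count as the same pair
         for (int i = 0; i < items.length; i++) {
            for (int j = i + 1; j < items.length; j++) {
               counts.merge(Arrays.asList(items[i], items[j]), 1, Integer::sum);
            }
         }
      }
      // e.g. {[a, b]=2, [b, c]=2, [a, c]=1} (map iteration order may vary)
      System.out.println(counts);
   }
}

The MapReduce and Spark versions below distribute exactly this kind of counting, and the Spark version additionally turns the counts into association rules.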

Implementation based on classic MapReduce

//In MBADriver the job setup is standard; the interesting part is the combiner
job.setMapperClass(MBAMapper.class);
//The combiner pre-aggregates identical keys on the map side before the data is
//shuffled to the reducers, cutting network traffic. Reusing MBAReducer as the
//combiner is safe because summing counts is associative and commutative, and
//the reducer's input and output types match.
job.setCombinerClass(MBAReducer.class);
job.setReducerClass(MBAReducer.class);
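
For reference, here is a minimal driver sketch. It is a reconstruction under stated assumptions, not the book's code: the configuration key "number.of.pairs", the argument order, and the Text/IntWritable output types are inferred from the mapper and reducer shown below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MBADriver {
   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // hypothetical key: one way the pair size could be handed to the mapper
      conf.setInt("number.of.pairs", Integer.parseInt(args[2]));
      Job job = Job.getInstance(conf, "market-basket-analysis");
      job.setJarByClass(MBADriver.class);
      job.setMapperClass(MBAMapper.class);
      job.setCombinerClass(MBAReducer.class);
      job.setReducerClass(MBAReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}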


//the map() function
 @Override
   public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {

      // input line
      String line = value.toString();
      //convertItemsToList turns each input line into a List<String>
      List<String> items = convertItemsToList(line);
      if ((items == null) || (items.isEmpty())) {
         // no mapper output will be generated
         return;
      }
      //emit the sorted combinations of the requested size
      generateMapperOutput(numberOfPairs, items, context);
   }
   
   private static List<String> convertItemsToList(String line) {
      if ((line == null) || (line.length() == 0)) {
         // no mapper output will be generated
         return null;
      }      
      String[] tokens = StringUtils.split(line, ",");   
      if ( (tokens == null) || (tokens.length == 0) ) {
         return null;
      }
      List<String> items = new ArrayList<String>();         
      for (String token : tokens) {
         if (token != null) {
             items.add(token.trim());
         }         
      }         
      return items;
   }
   
   
   private void generateMapperOutput(int numberOfPairs, List<String> items, Context context) 
      throws IOException, InterruptedException {
      //items is the source data; numberOfPairs is the size of the combinations to generate
      List<List<String>> sortedCombinations = Combination.findSortedCombinations(items, numberOfPairs);
      for (List<String> itemList: sortedCombinations) {
         System.out.println("itemlist="+itemList.toString());
         reducerKey.set(itemList.toString());
         context.write(reducerKey, NUMBER_ONE);
      }   
   }

//the reducer is a plain word count
@Override
   public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
      int sum = 0; // total items paired
      for (IntWritable value : values) {
         sum += value.get();
      }
      context.write(key, new IntWritable(sum));
   }


//findSortedCombinations returns all sorted combinations of n elements drawn from the given collection
public static <T extends Comparable<? super T>> List<List<T>> findSortedCombinations(Collection<T> elements, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        
        if (n == 0) {
            result.add(new ArrayList<T>());
            return result;
        }
        
        List<List<T>> combinations = findSortedCombinations(elements, n - 1);
        for (List<T> combination: combinations) {
            for (T element: elements) {
                if (combination.contains(element)) {
                    continue;
                }
                
                List<T> list = new ArrayList<T>();
                list.addAll(combination);
                
                if (list.contains(element)) {
                    continue;
                }
                
                list.add(element);
                //sort items not to duplicate the items
                //   example: (a, b, c) and (a, c, b) might become  
                //   different items to be counted if not sorted   
                Collections.sort(list);
                
                if (result.contains(list)) {
                    continue;
                }
                
                result.add(list);
            }
        }
        
        return result;
    }
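
A quick sanity check of what the recursion produces (a usage sketch, assuming the method sits in the Combination class referenced by the mapper):

import java.util.*;

List<String> items = Arrays.asList("a", "b", "c");
List<List<String>> pairs = Combination.findSortedCombinations(items, 2);
System.out.println(pairs); // prints [[a, b], [a, c], [b, c]]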

Implementation based on Spark -- can also derive the dependencies (association rules) between items

public static void main(String[] args) throws Exception {
      // STEP-1: handle input parameters
      if (args.length < 1) {
         System.err.println("Usage: FindAssociationRules <transactions>");
         System.exit(1);
      }
      String transactionsFileName =  args[0];

      // STEP-2: create a Spark context object
      JavaSparkContext ctx = new JavaSparkContext();
       
      // STEP-3: read all transactions from HDFS and create the first RDD 
      JavaRDD<String> transactions = ctx.textFile(transactionsFileName, 1);
      transactions.saveAsTextFile("/rules/output/1");
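      // NOTE: the saveAsTextFile(...) calls throughout this job are debugging
      // checkpoints; each target directory (/rules/output/1 .. /rules/output/6)
      // must not exist beforehand, or the Hadoop output committer will fail the job.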

      // STEP-4: generate frequent patterns
      // PairFlatMapFunction<T, K, V>     
      // T => Iterable<Tuple2<K, V>>

       /**
        * Output format: ([a],1), ([b],1), ([a,b],1), ([a,b,c],1), ([b,d],1), ...
        */
       JavaPairRDD<List<String>,Integer> patterns =
         transactions.flatMapToPair(new PairFlatMapFunction<
                                                             String,        // T
                                                             List<String>,  // K
                                                             Integer        // V
                                                           >() {
         @Override
         public Iterator<Tuple2<List<String>,Integer>> call(String transaction) {
            List<String> list = Util.toList(transaction);
            //calling findSortedCombinations without the size argument returns combinations of every size
            List<List<String>> combinations = Combination.findSortedCombinations(list);
            List<Tuple2<List<String>,Integer>> result = new ArrayList<Tuple2<List<String>,Integer>>();
            for (List<String> combList : combinations) {
                 if (combList.size() > 0) {
                     //emit every combination with a count of 1
                   result.add(new Tuple2<List<String>,Integer>(combList, 1));
                 }
            }
            return result.iterator();
         }
      });    
      patterns.saveAsTextFile("/rules/output/2");
    
      // aggregate the counts of identical keys
      JavaPairRDD<List<String>, Integer> combined = patterns.reduceByKey(new Function2<Integer, Integer, Integer>() {
         @Override
         public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
         }
      });    
      combined.saveAsTextFile("/rules/output/3");
    
      // now, we have: patterns(K,V)
      //      K = pattern as List<String>
      //      V = frequency of pattern
      // now given (K,V) as (List<a,b,c>, 2) we will 
      // generate the following (K2,V2) pairs:
      //
      //   (List<a,b,c>, T2(null, 2))
      //   (List<a,b>,   T2(List<a,b,c>, 2))
      //   (List<a,c>,   T2(List<a,b,c>, 2))
      //   (List<b,c>,   T2(List<a,b,c>, 2))


      // STEP-6: generate all sub-patterns
      // PairFlatMapFunction<T, K, V>     
      // T => Iterable<Tuple2<K, V>>
       /**
        * Sample output: ([a,b],(null,2)) ([b],([a,b],2)) ([a],([a,b],2))
        * ([a,b,d],(null,1)) ([b,d],([a,b,d],1)) ([a,d],([a,b,d],1)) ([a,b],([a,b,d],1))
        */
      JavaPairRDD<List<String>,Tuple2<List<String>,Integer>> subpatterns = 
         combined.flatMapToPair(new PairFlatMapFunction<
          Tuple2<List<String>, Integer>,   // T
          List<String>,                    // K
          Tuple2<List<String>,Integer>     // V
        >() {
       @Override
       public Iterator<Tuple2<List<String>,Tuple2<List<String>,Integer>>> 
          call(Tuple2<List<String>, Integer> pattern) {
            List<Tuple2<List<String>,Tuple2<List<String>,Integer>>> result = 
               new ArrayList<Tuple2<List<String>,Tuple2<List<String>,Integer>>>();
            List<String> list = pattern._1;
            Integer frequency = pattern._2;
            result.add(new Tuple2(list, new Tuple2(null,frequency)));
            if (list.size() == 1) {
               return result.iterator();
            }
            
            // pattern has more than one items
            // result.add(new Tuple2(list, new Tuple2(null,size)));
            for (int i=0; i < list.size(); i++) {
                //removeOneItem returns a copy of the list with the element at index i removed
               List<String> sublist = Util.removeOneItem(list, i);
               result.add(new Tuple2(sublist, new Tuple2(list, frequency)));
            }
            return result.iterator();
        }
      });
      subpatterns.saveAsTextFile("/rules/output/4");
        
      // group the sub-patterns by key
       /**
        * Output format: ([a,c],[([a,b,c],1),(null,1)])  --- key: [a,c]
        *                ([a,b,c],[(null,1)])            --- key: [a,b,c]
        *                ([b,c],[(null,3),([a,b,c],1)])  --- key: [b,c]
        */
      JavaPairRDD<List<String>,Iterable<Tuple2<List<String>,Integer>>> rules = subpatterns.groupByKey();       
      rules.saveAsTextFile("/rules/output/5");

      // STEP-7: generate association rules      
      // Now, use (K=List<String>, V=Iterable<Tuple2<List<String>,Integer>>) 
      // to generate association rules
      // JavaRDD<R> map(Function<T,R> f)
      // Return a new RDD by applying a function to all elements of this RDD.
       /**
        * Output format: [([a,b],[d],0.5),([a,b],[c],0.5)]
        *                []
        *                [([c],[b],1.0),([c],[a],0.33333)]
        */
      JavaRDD<List<Tuple3<List<String>,List<String>,Double>>> assocRules = rules.map(new Function<
          Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>>,     // T: input 
          List<Tuple3<List<String>,List<String>,Double>>                   // R: ( ac => b, 1/3): T3(List(a,c), List(b),  0.33)
                                                                           //    ( ad => c, 1/3): T3(List(a,d), List(c),  0.33)
         >() {
        @Override
        public List<Tuple3<List<String>,List<String>,Double>> call(Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>> in) {
            List<Tuple3<List<String>,List<String>,Double>> result = 
               new ArrayList<Tuple3<List<String>,List<String>,Double>>();
            //the [a,c] part of ([a,c],[([a,b,c],1),(null,1)])
            List<String> fromList = in._1;
            //the [([a,b,c],1),(null,1)] part of ([a,c],[([a,b,c],1),(null,1)])
            Iterable<Tuple2<List<String>,Integer>> to = in._2;
            List<Tuple2<List<String>,Integer>> toList = new ArrayList<Tuple2<List<String>,Integer>>();
            Tuple2<List<String>,Integer> fromCount = null;
            for (Tuple2<List<String>,Integer> t2 : to) {
               // find the "count" object
               if (t2._1 == null) {
                   //fromCount records the total count of the pattern itself
                    fromCount = t2;
               }
               else {
                  toList.add(t2);
               }
            }
            
            // Now, we have the required objects for generating association rules:
            //  "fromList", "fromCount", and "toList"
            if (toList.isEmpty()) {
               // no output generated, but since Spark does not like null objects, we will fake a null object
               return result; // an empty list
            } 
            
            // now using 3 objects: "from", "fromCount", and "toList",
            // create association rules:
            for (Tuple2<List<String>,Integer>  t2 : toList) {
               double confidence = (double) t2._2 / (double) fromCount._2;
               List<String> t2List = new ArrayList<String>(t2._1);
               //remove every element of fromList from t2List
               t2List.removeAll(fromList);
               //build the output tuple: (antecedent, consequent, confidence)
               result.add(new Tuple3(fromList, t2List, confidence));
            }
          return result;
        }
      });   
      assocRules.saveAsTextFile("/rules/output/6");
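      // Worked example of the confidence above: if pattern [a,b] occurs 2 times
      // in total (fromCount) and the superpattern [a,b,c] occurs 1 time, the rule
      // (a,b) => (c) gets confidence 1/2 = 0.5 -- hence the ([a,b],[c],0.5)
      // entries in the sample output.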

      // done
      ctx.close(); 
      
      System.exit(0);
   }
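
One way to submit the job (a sketch: the jar name and master are placeholders, and the six /rules/output/* directories must not exist yet):

spark-submit --class FindAssociationRules --master yarn \
    data-algorithms.jar /rules/input/transactions.txt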

Implementation based on Scala

//the Scala implementation
  def main(args: Array[String]): Unit = {

    if (args.size < 2) {
      println("Usage: FindAssociationRules <input-path> <output-path>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("market-basket-analysis")
    val sc = new SparkContext(sparkConf)

    val input = args(0)
    val output = args(1)

    val transactions = sc.textFile(input)

    val patterns = transactions.flatMap(line => {
      val items = line.split(",").toList
      //split each transaction on commas, generate every non-empty combination, and pair each with a count of 1
      (0 to items.size) flatMap items.combinations filter (xs => !xs.isEmpty)
    }).map((_, 1))
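    //e.g. the line "a,b,c" yields ([a],1), ([b],1), ([c],1), ([a,b],1),
    //([a,c],1), ([b,c],1) and ([a,b,c],1)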
    //aggregate the counts of identical keys
    val combined = patterns.reduceByKey(_ + _)
    //generate all sub-patterns
    val subpatterns = combined.flatMap(pattern => {
      val result = ListBuffer.empty[Tuple2[List[String], Tuple2[List[String], Int]]]
      //first emit (pattern, (Nil, count)) so the pattern's total count travels with it
      result += ((pattern._1, (Nil, pattern._2)))

      val sublist = for {
        i <- 0 until pattern._1.size
        //build the sublist with the i-th element removed
        xs = pattern._1.take(i) ++ pattern._1.drop(i + 1)
        if xs.size > 0
      } yield (xs, (pattern._1, pattern._2))
      result ++= sublist
      result.toList
    })
    //group the sub-patterns by key
    val rules = subpatterns.groupByKey()

    val assocRules = rules.map(in => {
      //find the total count (the tuple whose pattern part is Nil)
      val fromCount = in._2.find(p => p._1 == Nil).get
      val toList = in._2.filter(p => p._1 != Nil).toList
      if (toList.isEmpty) Nil
      else {
        val result =
          for {
            t2 <- toList
          //compute the confidence
            confidence = t2._2.toDouble / fromCount._2.toDouble
          //remove from t2._1 the elements it shares with in._1
            difference = t2._1 diff in._1
          } yield (((in._1, difference, confidence)))
        result
      }
    })

    // Formatting the result just for easy reading.
    val formatResult = assocRules.flatMap(f => {
      f.map(s => (s._1.mkString("[", ",", "]"), s._2.mkString("[", ",", "]"), s._3))
    })
    formatResult.saveAsTextFile(output)

    // done!
    sc.stop()
  }
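
With the final formatting step, each rule is written to the output path as a tuple of (antecedent, consequent, confidence), e.g. ([a,b],[c],0.5), analogous to the output of the Java version.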

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末速客,一起剝皮案震驚了整個濱河市叠赐,隨后出現(xiàn)的幾起案子欲账,更是在濱河造成了極大的恐慌,老刑警劉巖芭概,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赛不,死亡現(xiàn)場離奇詭異,居然都是意外死亡罢洲,警方通過查閱死者的電腦和手機踢故,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進店門文黎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人殿较,你說我怎么就攤上這事耸峭。” “怎么了淋纲?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵劳闹,是天一觀的道長。 經(jīng)常有香客問我洽瞬,道長本涕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任伙窃,我火速辦了婚禮菩颖,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘为障。我一直安慰自己位他,他們只是感情好,可當我...
    茶點故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布产场。 她就那樣靜靜地躺著鹅髓,像睡著了一般。 火紅的嫁衣襯著肌膚如雪京景。 梳的紋絲不亂的頭發(fā)上窿冯,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天,我揣著相機與錄音确徙,去河邊找鬼醒串。 笑死,一個胖子當著我的面吹牛鄙皇,可吹牛的內(nèi)容都是我干的芜赌。 我是一名探鬼主播,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼伴逸,長吁一口氣:“原來是場噩夢啊……” “哼缠沈!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起错蝴,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤洲愤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后顷锰,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柬赐,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年官紫,在試婚紗的時候發(fā)現(xiàn)自己被綠了肛宋。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片州藕。...
    茶點故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖酝陈,靈堂內(nèi)的尸體忽然破棺而出床玻,到底是詐尸還是另有隱情,我是刑警寧澤后添,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布笨枯,位于F島的核電站,受9級特大地震影響遇西,放射性物質(zhì)發(fā)生泄漏馅精。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一粱檀、第九天 我趴在偏房一處隱蔽的房頂上張望洲敢。 院中可真熱鬧,春花似錦茄蚯、人聲如沸压彭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽壮不。三九已至,卻和暖如春皱碘,著一層夾襖步出監(jiān)牢的瞬間询一,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工癌椿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留健蕊,地道東北人。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓踢俄,卻偏偏與公主長得像缩功,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子都办,可洞房花燭夜當晚...
    茶點故事閱讀 45,851評論 2 361

推薦閱讀更多精彩內(nèi)容