1柏卤、淺談數(shù)據(jù)挖掘中的關(guān)聯(lián)規(guī)則挖掘
2允跑、Hadoop/MapReduce購物籃分析:關(guān)聯(lián)規(guī)則挖掘
3凛俱、Spark購物籃分析
過程分析:
image.png
image.png
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer
object FindAssociationRules {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("market-basket-analysis").setMaster("local")
val sc = new SparkContext(sparkConf)
val input = "file:///media/chenjie/0009418200012FF3/ubuntu/mba2.txt"
val output = "file:///media/chenjie/0009418200012FF3/ubuntu/mba2"
val transactions = sc.textFile(input)
/*
* a,b,c
a,b,d
b,c
b,c
* */
/* val tests = transactions.flatMap(line => {
println("line=" + line)
val items = line.split(",").toList
// Converting to List is required because Spark doesn't partition on Array (as returned by split method)
//(0 to items.size) flatMap items.combinations filter (xs => !xs.isEmpty)
println("result=" + items.combinations(2).mkString(","))
val list = ListBuffer.empty[List[String]]
for(i <- 0 to items.size){
list.++= (items.combinations(i).toBuffer)
}
list.toList.filter(xs => !xs.isEmpty)
})
tests.foreach(println)*/
val patterns = transactions.flatMap(line => {
val items = line.split(",").toList
// Converting to List is required because Spark doesn't partition on Array (as returned by split method)
(0 to items.size) flatMap items.combinations filter (xs => !xs.isEmpty)
/*
combinations(n: Int): Iterator[List[A]] 取列表中的n個元素進行組合箩言,返回不重復(fù)的組合列表,結(jié)果一個迭代器
*/
/*
* 上句話等價于:
* val list = ListBuffer.empty[List[String]]
for(i <- 0 to items.size){
list.++= (items.combinations(i).toBuffer)
}
list.toList.filter(xs => !xs.isEmpty)
* 即對a,b,c
* 先取0個元素進行組合做盅,得到不重復(fù)的組合列表[]缤削,加入list中,list為[[]]
* 再取1個元素進行組合吹榴,得到不重復(fù)的組合列表[[a],[b],[c]]亭敢,加入list中,list為[[a],[b],[c]]
* 再取2個元素進行組合图筹,得到不重復(fù)的組合列表[[a,b],[a,c],[b,c]]帅刀,加入list中,list為[[],[a],[b],[c],[a,b],[a,c],[b,c]]
* 再取3個元素進行組合让腹,得到不重復(fù)的組合列表[[a,b,c]],加入list中,list為[[],[a],[b],[c],[a,b],[a,c],[b,c],[a,b,c]]
* 然后對其進行過濾扣溺,去掉其中為空的列表
* list為[[a],[b],[c],[a,b],[a,c],[b,c],[a,b,c]]
* 最后回到外層的flatMap骇窍,會將列表的列表拍扁成列表:
* [a],[b],[c],[a,b],[a,c],[b,c],[a,b,c]
* */
}).map((_, 1))
//到最外面的map,將列表映射為(列表锥余,1)的鍵值對
/*
* (List(a),1)
(List(b),1)
(List(c),1)
(List(a, b),1)
(List(a, c),1)
(List(b, c),1)
(List(a, b, c),1)
(List(a),1)
(List(b),1)
(List(d),1)
(List(a, b),1)
(List(a, d),1)
(List(b, d),1)
(List(a, b, d),1)
(List(b),1)
(List(c),1)
(List(b, c),1)
(List(b),1)
(List(c),1)
(List(b, c),1)
* */
val combined = patterns.reduceByKey(_ + _)//合并key值相同的鍵值對
/*
* (List(a, b, c),1)
(List(b),4)
(List(a, b, d),1)
(List(b, d),1)
(List(a, b),2)
(List(a),2)
(List(a, d),1)
(List(b, c),3)
(List(a, c),1)
(List(c),3)
(List(d),1)
*
* */
/*下面開始生成子模式
給定一個頻繁模式:(K=List<A1,A2,...,An>,V=Frequency)
創(chuàng)建如下的子模式(K2,V2)
(K2=K=List<A1,A2,...,An>,V2=Tuple(null,V))
即把K作為K2腹纳,Tuple(null,V))作為V2
(K2=List<A1,A2,...,An-1>),V2=Tuple(K,V))
(K2=List<A1,A2,...,An-2,An>),V2=Tuple(K,V))
...
(K2=List<A2,...,An-1,An>),V2=Tuple(K,V))
即把K的每一個元素拿掉一次作為K2,Tuple(K,V))作為V2
*/
val subpatterns = combined.flatMap(pattern => {
//pattern:(List(a, b, c),1)
val result = ListBuffer.empty[Tuple2[List[String], Tuple2[List[String], Int]]]
result += ((pattern._1, (Nil, pattern._2)))//即把K作為K2,Tuple(null,V))作為V2
val sublist = for {
i <- 0 until pattern._1.size
xs = pattern._1.take(i) ++ pattern._1.drop(i + 1)
if xs.size > 0
} yield (xs, (pattern._1, pattern._2))
//上段代碼等價于:
/*
for(i <- 0 to pattern._1.size){
val sublist = pattern._1.take(i) ++ pattern._1.drop(i + 1)
if(sublist.size > 0)
result += new Tuple2(sublist,new Tuple2(pattern._1,pattern._2))
}
即每次去掉一個元素驱犹,將剩下的元素集合作為K2
*/
result ++= sublist
result.toList
})
/*
* (List(a, b, c),(List(),1))
(List(b, c),(List(a, b, c),1))
(List(a, c),(List(a, b, c),1))
(List(a, b),(List(a, b, c),1))
(List(b),(List(),4))
(List(a, b, d),(List(),1))
(List(b, d),(List(a, b, d),1))
(List(a, d),(List(a, b, d),1))
(List(a, b),(List(a, b, d),1))
(List(b, d),(List(),1))
(List(d),(List(b, d),1))
(List(b),(List(b, d),1))
(List(a, b),(List(),2))
(List(b),(List(a, b),2))
(List(a),(List(a, b),2))
(List(a),(List(),2))
(List(a, d),(List(),1))
(List(d),(List(a, d),1))
(List(a),(List(a, d),1))
(List(b, c),(List(),3))
(List(c),(List(b, c),3))
(List(b),(List(b, c),3))
(List(a, c),(List(),1))
(List(c),(List(a, c),1))
(List(a),(List(a, c),1))
(List(c),(List(),3))
(List(d),(List(),1))
* */
val rules = subpatterns.groupByKey()
/*
* (List(a, b, c),CompactBuffer((List(),1)))
(List(b),CompactBuffer((List(),4), (List(b, d),1), (List(a, b),2), (List(b, c),3)))
(List(a, b),CompactBuffer((List(a, b, c),1), (List(a, b, d),1), (List(),2)))
(List(b, d),CompactBuffer((List(a, b, d),1), (List(),1)))
(List(a, b, d),CompactBuffer((List(),1)))
(List(a),CompactBuffer((List(a, b),2), (List(),2), (List(a, d),1), (List(a, c),1)))
(List(a, d),CompactBuffer((List(a, b, d),1), (List(),1)))
(List(b, c),CompactBuffer((List(a, b, c),1), (List(),3)))
(List(a, c),CompactBuffer((List(a, b, c),1), (List(),1)))
(List(c),CompactBuffer((List(b, c),3), (List(a, c),1), (List(),3)))
(List(d),CompactBuffer((List(b, d),1), (List(a, d),1), (List(),1)))
* */
val assocRules = rules.map(in => {
println("in=" + in)
//in:(List(b),CompactBuffer((List(),4), (List(b, d),1), (List(a, b),2), (List(b, c),3)))
val fromCount = in._2.find(p => p._1 == Nil).get//找到[b]的frequency:即(List(),4)
println("fromCount=" + fromCount)
val toList = in._2.filter(p => p._1 != Nil).toList//將規(guī)則集合去掉空的
println("toList=" + toList)
//toList:CompactBuffer((List(b, d),1), (List(a, b),2), (List(b, c),3))
if (toList.isEmpty) Nil
else {
val result =
for {
t2 <- toList
confidence = t2._2.toDouble / fromCount._2.toDouble
difference = t2._1 diff in._1
//diff(that: collection.Seq[A]): List[A] 保存列表中那些不在另外一個列表中的元素嘲恍,即從集合中減去與另外一個集合的交集
} yield (((in._1, difference, confidence)))
result
}
//等價于
/*if (toList.isEmpty) Nil
else {
val result = ListBuffer.empty[Tuple3[List[String],List[String],Double]]
for(t2 <- toList){
println("t2=" + t2)
//t2:(List(b, d),1)
val confidence = t2._2.toDouble / fromCount._2.toDouble
val difference = t2._1 diff in._1
println(Tuple3(in._1, difference, confidence))
result.+=(Tuple3(in._1, difference, confidence))
}
result
}*/
})
assocRules.foreach(println)
/*
List()
List((List(b),List(d),0.25), (List(b),List(a),0.5), (List(b),List(c),0.75))
List((List(a, b),List(c),0.5), (List(a, b),List(d),0.5))
List((List(b, d),List(a),1.0))
List()
List((List(a),List(b),1.0), (List(a),List(d),0.5), (List(a),List(c),0.5))
List((List(a, d),List(b),1.0))
List((List(b, c),List(a),0.3333333333333333))
List((List(a, c),List(b),1.0))
List((List(c),List(b),1.0), (List(c),List(a),0.3333333333333333))
List((List(d),List(b),1.0), (List(d),List(a),1.0))
* */
val formatResult = assocRules.flatMap(f => {
f.map(s => (s._1.mkString("[", ",", "]"), s._2.mkString("[", ",", "]"), s._3))
})
/*
* ([b],[d],0.25)
([b],[a],0.5)
([b],[c],0.75)
([a,b],[c],0.5)
([a,b],[d],0.5)
([b,d],[a],1.0)
([a],[b],1.0)
([a],[d],0.5)
([a],[c],0.5)
([a,d],[b],1.0)
([b,c],[a],0.3333333333333333)
([a,c],[b],1.0)
([c],[b],1.0)
([c],[a],0.3333333333333333)
([d],[b],1.0)
([d],[a],1.0)
* */
formatResult.saveAsTextFile(output)
sc.stop()
}
}