1 Project Introduction
2 Technologies Involved
3 Recommendation Flowchart
4 Takeaways
5 Problems
1 Project Introduction
- Build a movie recommendation system on the Spark framework;
- Apply data-mining algorithms to train a model that recommends movies matched to each user's tastes;
- Implement the recommendation system in two ways: offline (batch) and in real time.
2 Technologies Involved
Spark: an in-memory distributed computing framework.
Hadoop: a distributed batch (offline) computing framework.
Hive: a data-warehouse tool built on Hadoop. It maps structured data files to database tables, provides a simple SQL query interface, and translates SQL statements into MapReduce jobs for execution.
Kafka: a distributed, high-throughput message queue. It buffers the data collected by Flume and serves it to downstream computations at high concurrency.
HBase: a database that scales to billions of rows and millions of columns with millisecond-level queries; we use it for fast lookups of our computed results.
Phoenix: a SQL layer built on top of HBase. The Phoenix query engine compiles a SQL query into one or more HBase scans, runs them in parallel, and produces standard JDBC result sets.
3 Recommendation Flowchart
Explanation:
- Load the data from HDFS, process it, and store the results in Hive.
- Offline recommendation pipeline:
  - Load the training and test data from Hive.
  - Train a model with Spark MLlib's ALS (alternating least squares) algorithm.
  - Use the model to generate recommendation results.
  - Write the results to MySQL, Hive, and Phoenix + HBase.
- Real-time recommendation pipeline:
  - Pull the data out of Hive.
  - Take records from the test dataset and send them to Kafka.
  - Have Spark Streaming actively pull the messages from the Kafka queue and choose a recommendation strategy depending on whether the user is new:
    - New user: recommend the 5 movies with the most viewers in the training set (see the pop5result sketch after this list).
    - Existing user: use the trained model to recommend 5 movies.
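The fallback for new users reads a Hive table called pop5result, which is never built in this write-up. A minimal sketch of how it could be produced from the training data (the query itself is my assumption; table and column names follow the code in section 4):

// Minimal sketch (assumption, not part of the original project code):
// build the pop5result table read by the streaming job, i.e. the titles
// of the 5 most-rated movies in the training set. hc is a HiveContext
// as used throughout this project.
hc.sql("use moive_recommend")
val pop5 = hc.sql(
  """select m.title, count(*) as cnt
    |from trainingData t join movies m on t.movieId = m.movieId
    |group by m.title
    |order by cnt desc
    |limit 5""".stripMargin)
pop5.write.mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable("pop5result")

The streaming code later reads each recommended title back with getString(0).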
4 Takeaways
1 Setting up the big-data environment
(1) Single-node installation of Hadoop, Spark, Hive, and MySQL
Spark processes the HDFS data and stores the results in Hive.
Configure a Hive instance backed by a MySQL metastore.
2 Initial data preprocessing
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext

// Case classes describing the schema of each input file.
case class Links(movieId: Int, imdbId: Int, tmdbId: Int)
case class Movies(movieId: Int, title: String, genres: String)
case class Ratings(userId: Int, movieId: Int, rating: Double, timestamp: Int)
case class Tags(userId: Int, movieId: Int, tag: String, timestamp: Int)

object ETL {
  def main(args: Array[String]): Unit = {
    val localClusterURL = "local[2]" // local alternative to the cluster master below
    val clusterMasterURL = "spark://s1:7077"
    val conf = new SparkConf().setAppName("ETL").setMaster(clusterMasterURL)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val hc = new HiveContext(sc)
    import sqlContext.implicits._
    hc.sql("use moive_recommend")

    // As a rule of thumb, set the number of RDD partitions to an integer
    // multiple of the CPU cores allocated to the application.
    val minPartitions = 8

    // The schema is defined through case classes, which works when the schema
    // is known up front. For an unknown schema, use StructType instead; see:
    // http://spark.apache.org/docs/1.6.2/sql-programming-guide.html#programmatically-specifying-the-schema
    val links = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/links.txt", minPartitions)
      .filter(!_.endsWith(",")).map(_.split(","))
      .map(x => Links(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toInt)).toDF()
    val movies = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/movies.txt", minPartitions)
      .filter(!_.endsWith(",")).map(_.split(","))
      .map(x => Movies(x(0).trim.toInt, x(1).trim, x(2).trim)).toDF()
    val ratings = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/ratings.txt", minPartitions)
      .filter(!_.endsWith(",")).map(_.split(","))
      .map(x => Ratings(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toDouble, x(3).trim.toInt)).toDF()
    val tags = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/tags.txt", minPartitions)
      .filter(!_.endsWith(",")).map(rebuild)
      .map(_.split(",")).map(x => Tags(x(0).trim.toInt, x(1).trim.toInt, x(2).trim, x(3).trim.toInt)).toDF()

    // For each dataset: write it out as Parquet, recreate the Hive table,
    // and load the Parquet files into the table.
    links.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/links")
    hc.sql("drop table if exists links")
    hc.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")

    movies.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/movies")
    hc.sql("drop table if exists movies")
    hc.sql("create table if not exists movies(movieId int,title string,genres string) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/movies' overwrite into table movies")

    ratings.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/ratings")
    hc.sql("drop table if exists ratings")
    hc.sql("create table if not exists ratings(userId int,movieId int,rating double,timestamp int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/ratings' overwrite into table ratings")

    tags.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/tags")
    hc.sql("drop table if exists tags")
    hc.sql("create table if not exists tags(userId int,movieId int,tag string,timestamp int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/tags' overwrite into table tags")
  }

  // Most rows in tags.txt look like:
  //   4208,260,Action-packed,1438012536
  // but some contain a quoted tag field with embedded commas:
  //   4208,260,"Family,Action-packed",1438012562
  // Splitting such rows on "," breaks the Hive insert, so they are rewritten:
  //   4208,260,"Family,Action-packed",1438012562 => 4208,260,FamilyAction-packed,1438012562
  private def rebuild(input: String): String = {
    val a = input.split(",")
    val head = a.take(2).mkString(",") // first two fields (userId, movieId)
    val tail = a.takeRight(1).mkString // last field (timestamp)
    val b = a.drop(2).dropRight(1).mkString.replace("\"", "") // tag, quotes/commas removed
    head + "," + b + "," + tail
  }
}
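After the loads complete, the tables can be spot-checked from the same HiveContext, for example:

// Quick sanity checks on the freshly loaded tables.
hc.sql("select count(*) from ratings").show()
hc.sql("select * from tags limit 5").show()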
3 Using Hive
Configured a Hive instance backed by a MySQL metastore.
4 Using the Spark MLlib machine-learning library
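The code below reads trainingData and testData tables from Hive, but the write-up does not show how they are created. A minimal sketch, assuming a random 60/40 split of the ratings table (the split method and seed are my assumptions):

import org.apache.spark.sql.SaveMode

// Minimal sketch (assumption): create the trainingData / testData Hive
// tables via a random 60/40 split of ratings. hc is the HiveContext
// supplied by AppConf, as in the training code below.
hc.sql("use moive_recommend")
val Array(trainDF, testDF) = hc.sql("select userId, movieId, rating from ratings")
  .randomSplit(Array(0.6, 0.4), seed = 42L)
trainDF.write.mode(SaveMode.Overwrite).saveAsTable("trainingData")
testDF.write.mode(SaveMode.Overwrite).saveAsTable("testData")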
/**
 * Train the ALS recommendation model on the training set
 * (sc and hc are the SparkContext and HiveContext supplied by AppConf).
 */
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

object Spark_MovieTraining extends AppConf {
  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    // Training set: 60% of the full dataset.
    val ratingRDD = hc.sql("select * from trainingData")
      .rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))
    ratingRDD.persist()
    // Build the recommendation model using ALS.
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratingRDD, rank, numIterations, 0.01)
    // Evaluate the model on the rating data.
    val training = ratingRDD.map {
      case Rating(userid, movieid, rating) => (userid, movieid)
    }
    training.persist()
    val predictions = model.predict(training).map {
      case Rating(userid, movieid, rating) => ((userid, movieid), rating)
    }
    val ratesAndPreds = ratingRDD.map { case Rating(userid, movieid, rating) =>
      ((userid, movieid), rating)
    }.join(predictions)
    val MSE = ratesAndPreds.map { case ((userid, movieid), (r1, r2)) =>
      val err = r1 - r2
      err * err
    }.mean()
    println(s"Mean Squared Error = $MSE")
    // Save the model under a path keyed by its MSE; it can be reloaded later:
    // val sameModel = MatrixFactorizationModel.load(sc, "/home/spark/temp/moiveRec/BestModel/")
    model.save(sc, s"/home/spark/temp/moiveRec/BestModel1/$MSE")
  }
}
5 Real-time recommendation: Kafka + Spark Streaming + Phoenix + HBase stream processing
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
 * KafkaProducer pulls records from the test dataset and sends them to Kafka.
 */
object KafkaProducer extends AppConf {
  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    val testDF = hc.sql("select * from testData limit 10000")
    val prop = new Properties()
    // Kafka broker address (ip:port).
    prop.put("bootstrap.servers", "s1:9092")
    // Serialize the ProducerRecord key as a String.
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Serialize the ProducerRecord value as a String.
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val topic = "movie"
    val testData = testDF.map(
      x => (topic, x.getInt(0).toString + "," + x.getInt(1).toString + "," + x.getDouble(2).toString)
    )
    val producer = new KafkaProducer[String, String](prop)
    // toLocalIterator streams the RDD to the driver one partition at a time;
    // collecting everything at once could OOM if the server is short on memory.
    val messages = testData.toLocalIterator
    while (messages.hasNext) {
      val message = messages.next()
      // Key is the topic name; value is "userId,movieId,rating".
      val record = new ProducerRecord[String, String](topic, message._1, message._2)
      println(record)
      producer.send(record)
      // Pause 10 ms between sends.
      Thread.sleep(10)
    }
    producer.close()
  }
}
/**
 * Receive the data produced into Kafka and process it.
 */
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkDirectStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDirectStream").setMaster("spark://s1:7077")
    // Duration wraps a span of time, in milliseconds;
    // batchDuration is the batch interval of the stream.
    val batchDuration = new Duration(5000)
    val ssc = new StreamingContext(conf, batchDuration)
    val hc = new HiveContext(ssc.sparkContext)

    val modelpath = "/home/spark/temp/moiveRec/BestModel1/0.5366434001808432"
    val broker = "s1:9092"
    // val topics = "movie".split(",").toSet
    val topics = Set("movie")
    // val kafkaParams = Map("bootstrap.servers" -> "spark1:9092")
    val kafkaParams = Map("metadata.broker.list" -> broker)

    // Collect the set of users present in the training data once, on the
    // driver. Filtering against this plain Set keeps the (non-serializable)
    // HiveContext out of the executor closures.
    val userSet = hc.sql("select distinct(userid) from trainingdata")
      .rdd.map(_.getInt(0)).collect().toSet
    def exist(u: Int): Boolean = userSet.contains(u)

    // Strategy for users missing from the training data:
    // 1. recommend the most-watched movies (the strategy used here), or
    // 2. recommend the newest movies.
    val defaultrecresult = hc.sql("select * from pop5result").rdd.collect()

    // Spark Streaming can consume Kafka in two ways. This is the direct
    // approach: Spark Streaming itself actively queries the Kafka message
    // queue for data it has not yet received and pulls it in.
    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val model = MatrixFactorizationModel.load(ssc.sparkContext, modelpath)

    kafkaDirectStream.foreachRDD { rdd =>
      // Each message value is "userId,movieId,rating"; field 0 is the userId.
      val userrdd = rdd.map(x => x._2.split(",")).map(x => x(0)).map(_.toInt)
      val validusers = userrdd.filter(user => exist(user))
      val newusers = userrdd.filter(user => !exist(user))
      // Iterate locally on the driver to sidestep serialization problems.
      // To make this truly real-time, the per-user results could be written
      // to Redis or another fast cache, and the two streams could be split
      // into two separate Spark Streaming applications.
      val validusersIter = validusers.toLocalIterator
      val newusersIter = newusers.toLocalIterator
      while (validusersIter.hasNext) {
        val recresult = model.recommendProducts(validusersIter.next(), 5)
        println("below movies are recommended for you :")
        recresult.foreach(println)
      }
      while (newusersIter.hasNext) {
        newusersIter.next() // advance the iterator, or this loop never ends
        println("below movies are recommended for you :")
        for (i <- defaultrecresult) {
          println(i.getString(0))
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
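The heading of this part promises a Phoenix + HBase sink, but the code above only prints recommendations to stdout. Below is a minimal sketch of persisting one recommendation through Phoenix's JDBC driver; the ZooKeeper quorum s1:2181 and the recommendations table are assumptions, not part of the original project:

import java.sql.DriverManager

object PhoenixSink {
  // Hypothetical sink: upsert one (userId, movieId) pair into a Phoenix
  // table created beforehand, e.g.:
  //   create table recommendations (userid integer not null, movieid integer not null
  //     constraint pk primary key (userid, movieid))
  def write(userId: Int, movieId: Int): Unit = {
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
    val conn = DriverManager.getConnection("jdbc:phoenix:s1:2181")
    try {
      val ps = conn.prepareStatement("upsert into recommendations(userid, movieid) values(?, ?)")
      ps.setInt(1, userId)
      ps.setInt(2, movieId)
      ps.executeUpdate()
      conn.commit() // Phoenix connections do not auto-commit by default
    } finally conn.close()
  }
}

Calling PhoenixSink.write from inside the valid-user loop above would replace the println with a durable, SQL-queryable write.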
5 Problems
I am still not comfortable with Scala, so many things in Spark remain unfamiliar when I use it.
References
http://www.dajiangtai.com/course/56.do