Spark實(shí)時(shí)離線電影推薦系統(tǒng)

1 項(xiàng)目介紹
2 涉及的技術(shù)
3 推薦流程圖
4 收獲
5 問(wèn)題

1 項(xiàng)目介紹

  1. 使用Spark框架實(shí)現(xiàn)電影推薦系統(tǒng);
  2. 運(yùn)用數(shù)據(jù)挖掘的算法產(chǎn)生模型映凳,為用戶精準(zhǔn)推薦喜好的電影颊亮;
  3. 分別通過(guò)離線和實(shí)時(shí)兩種方式實(shí)現(xiàn)電影推薦系統(tǒng)糟港;

2 涉及技術(shù)

  1. Spark:基于內(nèi)存的分布式計(jì)算框架

  2. Hadoop:分布式離線計(jì)算框架

  3. Hive:基于Hadoop的一個(gè)數(shù)據(jù)倉(cāng)庫(kù)工具沮脖,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫(kù)表,并提供簡(jiǎn)單的sql查詢功能拍埠,可以將sql語(yǔ)句轉(zhuǎn)換為MapReduce任務(wù)進(jìn)行運(yùn)行

  4. Kafka:分布式高并發(fā)消息隊(duì)列,負(fù)責(zé)緩存Flume采集的數(shù)據(jù)并為下游的各種計(jì)算提供高并發(fā)的數(shù)據(jù)處理

  5. Hbase:億級(jí)行百萬(wàn)列并可毫秒級(jí)查詢的數(shù)據(jù)庫(kù)土居,可快速查詢我們的計(jì)算數(shù)據(jù)

  6. Phoenix:是構(gòu)建在HBase上的SQL中間層枣购,Phoenix查詢引擎會(huì)將SQL查詢轉(zhuǎn)換為一個(gè)或者多個(gè)HBase Scan,并行執(zhí)行以生成標(biāo)準(zhǔn)的JDBC結(jié)果集擦耀。

3 推薦流程圖

image.png

解釋如下:

  1. 加載HDFS數(shù)據(jù)棉圈,處理之后存儲(chǔ)到Hive中;
  2. 離線推薦部分技術(shù)處理思路眷蜓;
  • 從Hive中加載訓(xùn)練數(shù)據(jù)和測(cè)試數(shù)據(jù)
  • 使用SparkMLlib的ALS交替最小二乘法訓(xùn)練模型
  • 使用模型產(chǎn)生推薦結(jié)果
  • 將推薦結(jié)果寫(xiě)入到Mysql分瘾、Hive、Phoenix+Hbase中
  1. 實(shí)時(shí)推薦部分技術(shù)處理思路吁系;
  • 從Hive中拿出數(shù)據(jù)
  • 取出測(cè)試數(shù)據(jù)集中數(shù)據(jù)德召,send到Kafka中。
  • 通過(guò)SparkStreaming主動(dòng)Kafka消息隊(duì)列獲取數(shù)據(jù)汽纤,并根據(jù)用戶是否為新用戶制定推薦策略
  • 新用戶上岗,從訓(xùn)練數(shù)據(jù)集中取出瀏覽人數(shù)最多的電影的前5部作為推薦結(jié)果
  • 老用戶,使用推薦模型為用戶推薦5部電影
image.png

4 收獲

1 大數(shù)據(jù)環(huán)境搭建

(1)單機(jī)版Hadoop蕴坪、Spark肴掷、Hive、Mysql的搭建
Spark處理HDFS數(shù)據(jù)背传,并將結(jié)果存儲(chǔ)在Hive中
配置一臺(tái)Hive + Mysql元數(shù)據(jù)庫(kù)

2 數(shù)據(jù)初始預(yù)處理

object ETL {
  def main(args: Array[String]): Unit = {
    val localClusterURL = "local[2]"
    val clusterMasterURL = "spark://s1:7077"
    val conf = new SparkConf().setAppName("ETL").setMaster(clusterMasterURL)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val hc = new HiveContext(sc)
    import sqlContext.implicits._
    hc.sql("use moive_recommend")

    // 設(shè)置RDD的partition的數(shù)量一般以集群分配給應(yīng)用的CPU核數(shù)的整數(shù)倍為宜呆瞻。
    val minPartitions = 8
    // 通過(guò)case class來(lái)定義Links的數(shù)據(jù)結(jié)構(gòu),數(shù)據(jù)的schema径玖,適用于schama已知的數(shù)據(jù)
    // 也可以通過(guò)StructType的方式痴脾,適用于schema未知的數(shù)據(jù),具體參考文檔:
    //http://spark.apache.org/docs/1.6.2/sql-programming-guide.html#programmatically-specifying-the-schema
    val links = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/links.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Links(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toInt)).toDF()

    val movies = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/movies.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Movies(x(0).trim.toInt,x(1).trim(),x(2).trim())).toDF()

    val ratings = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/ratings.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Ratings(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toDouble,x(3).trim().toInt)).toDF()

    val tags = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/tags.txt", minPartitions).filter { !_.endsWith(",") }.map(x=>rebuild(x))
      .map(_.split(",")).map(x => Tags(x(0).trim().toInt, x(1).trim().toInt, x(2).trim(), x(3).trim().toInt)).toDF()


    links.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/links")
    hc.sql("drop table if exists links")
    hc.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")


    movies.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/movies")
    hc.sql("drop table if exists movies")
    hc.sql("create table if not exists movies(movieId int,title string,genres string) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/movies' overwrite into table movies")


    ratings.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/ratings")
    hc.sql("drop table if exists ratings")
    hc.sql("create table if not exists ratings(userId int,movieId int,rating double,timestamp int) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/ratings' overwrite into table ratings")


    tags.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/tags")
    hc.sql("drop table if exists tags")
    hc.sql("create table if not exists tags(userId int,movieId int,tag string,timestamp int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/tags' overwrite into table tags")
  }

  // tags中大部分?jǐn)?shù)據(jù)格式如下:
  //    4208,260,Action-packed,1438012536
  // 但會(huì)出現(xiàn)如下的數(shù)據(jù):
  //    4208,260,"Family,Action-packed",1438012562
  // 這樣對(duì)數(shù)據(jù)split后插入hive中就會(huì)出錯(cuò),需清洗數(shù)據(jù):
  //    4208,260,"Family,Action-packed",1438012562 => 4208,260,FamilyAction-packed,1438012562
  private def rebuild(input:String):String = {
    val a = input.split(",")
    val head = a.take(2).mkString(",")//提取列表的前2個(gè)元素
    val tail = a.takeRight(1).mkString//提取列表的最后1個(gè)元素
    val b = a.drop(2).dropRight(1).mkString.replace("\"", "")
    val output = head + "," + b + "," + tail
    output
  }
}

3 Hive的使用

配置一臺(tái)Hive + Mysql元數(shù)據(jù)庫(kù)

4 SparkMLlib機(jī)器學(xué)習(xí)算法庫(kù)的使用

/**
  * KafkaProducer從測(cè)試數(shù)據(jù)集中取出數(shù)據(jù)
  */
object Spark_MovieTraining extends AppConf {
  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    // 訓(xùn)練集挺狰,總數(shù)據(jù)集的60%
    val trainingData = hc.sql("select * from trainingData")
    val ratingRDD = hc.sql("select * from trainingData")
      .rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))

    // Build the recommendation model using ALS
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratingRDD, rank, numIterations, 0.01)

    // Evaluate the model on rating data
    val training = ratingRDD.map {
      case Rating(userid, movieid, rating) => (userid, movieid)
    }

    ratingRDD.persist()
    training.persist()

    val predictions =
      model.predict(training).map {
        case Rating(userid, movieid, rating) => ((userid, movieid), rating)
      }

    val ratesAndPreds = ratingRDD.map { case Rating(userid, movieid, rating) =>
      ((userid, movieid), rating)
    }.join(predictions)


    val MSE = ratesAndPreds.map { case ((userid, movieid), (r1, r2)) =>
      val err = (r1 - r2)
      err * err
    }.mean()

    println(s"Mean Squared Error = $MSE")

    // Save and load model
    model.save(sc, s"/home/spark/temp/moiveRec/BestModel1/$MSE")
    //val sameModel = MatrixFactorizationModel.load(sc, "/home/spark/temp/moiveRec/BestModel/")
  }
}

5 實(shí)時(shí)推薦部分Kafka + Streaming + Phoenix+Hbase流處理

object KafkaProducer extends AppConf {

  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    val testDF = hc.sql("select * from testData limit 10000")
    val prop = new Properties()
    // 指定kafka的 ip地址:端口號(hào)
    prop.put("bootstrap.servers", "s1:9092")
    // 設(shè)定ProducerRecord發(fā)送的key值為String類型
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 設(shè)定ProducerRecord發(fā)送的value值為String類型
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val topic = "movie"
    val testData = testDF.map(
      x => (topic, x.getInt(0).toString() + "," + x.getInt(1).toString + "," + x.getDouble(2).toString())
    )
    val producer = new KafkaProducer[String, String](prop)
    // 如果服務(wù)器內(nèi)存不夠明郭,會(huì)出現(xiàn)OOM錯(cuò)誤
    val messages = testData.toLocalIterator
    while (messages.hasNext) {
      val message = messages.next()
      val record = new ProducerRecord[String, String](topic, message._1, message._2)
      println(record)
      producer.send(record)
      // 延遲10毫秒
      Thread.sleep(10)
    }
    producer.close()
  }
}
/**
  * 接收kafka產(chǎn)生的數(shù)據(jù),進(jìn)行處理
  */
object SparkDirectStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDirectStream").setMaster("spark://s1:7077")
    // Duration對(duì)象中封裝了時(shí)間的一個(gè)對(duì)象丰泊,它的單位是ms.
    val batchDuration = new Duration(5000)
    // batchDuration為時(shí)間間隔
    val ssc = new StreamingContext(conf, batchDuration)
    val hc = new HiveContext(ssc.sparkContext)

    // 訓(xùn)練數(shù)據(jù)中是否有該用戶
    val validusers = hc.sql("select * from trainingData")
    val userlist = validusers.select("userId")

    val modelpath = "/home/spark/temp/moiveRec/BestModel1/0.5366434001808432"
    val broker = "s1:9092"

    // val topics = "movie".split(",").toSet
    val topics = Set("movie")
    // val kafkaParams = Map("bootstrap.servers" -> "spark1:9092")
    val kafkaParams = Map("metadata.broker.list" -> "s1:9092")

    def exist(u: Int): Boolean = {
      val userlist = hc.sql("select distinct(userid) from trainingdata").rdd.map(x => x.getInt(0)).toArray()
      userlist.contains(u)
    }

    // 為沒(méi)有登錄的用戶推薦電影的策略:
    // 1.推薦觀看人數(shù)較多的電影薯定,采用這種策略
    // 2.推薦最新的電影
    val defaultrecresult = hc.sql("select * from pop5result").rdd.toLocalIterator

    // 創(chuàng)建SparkStreaming接收kafka消息隊(duì)列數(shù)據(jù)的2種方式
    // 一種是Direct approache,通過(guò)SparkStreaming自己主動(dòng)去Kafka消息隊(duì)
    // 列中查詢還沒(méi)有接收進(jìn)來(lái)的數(shù)據(jù),并把他們拿到sparkstreaming中瞳购。
    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val model = MatrixFactorizationModel.load(ssc.sparkContext, modelpath)

    val messages = kafkaDirectStream.foreachRDD { rdd =>
//      println(rdd)
      val userrdd = rdd.map(x => x._2.split(",")).map(x => x(1)).map(_.toInt)
      val validusers = userrdd.filter(user => exist(user))
      val newusers = userrdd.filter(user => !exist(user))
      // 采用迭代器的方式來(lái)避開(kāi)對(duì)象不能序列化的問(wèn)題话侄。
      // 通過(guò)對(duì)RDD中的每個(gè)元素實(shí)時(shí)產(chǎn)生推薦結(jié)果,將結(jié)果寫(xiě)入到redis,或者其他高速緩存中年堆,來(lái)達(dá)到一定的實(shí)時(shí)性吞杭。
      // 2個(gè)流的處理分成2個(gè)sparkstreaming的應(yīng)用來(lái)處理。
      val validusersIter = validusers.toLocalIterator
      val newusersIter = newusers.toLocalIterator
      while (validusersIter.hasNext) {
        val recresult = model.recommendProducts(validusersIter.next, 5)
        println("below movies are recommended for you :")
        println(recresult)
      }
      while (newusersIter.hasNext) {
        println("below movies are recommended for you :")
        for (i <- defaultrecresult) {
          println(i.getString(0))
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

5 問(wèn)題

依舊不熟悉scala語(yǔ)言变丧,在使用Spark時(shí)很多東西依舊不知道

參考文獻(xiàn)

http://www.dajiangtai.com/course/56.do


end

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末芽狗,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子痒蓬,更是在濱河造成了極大的恐慌童擎,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件攻晒,死亡現(xiàn)場(chǎng)離奇詭異顾复,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)鲁捏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)芯砸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人给梅,你說(shuō)我怎么就攤上這事假丧。” “怎么了破喻?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,369評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵虎谢,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我曹质,道長(zhǎng)婴噩,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,799評(píng)論 1 285
  • 正文 為了忘掉前任羽德,我火速辦了婚禮几莽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘宅静。我一直安慰自己章蚣,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,910評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布姨夹。 她就那樣靜靜地躺著纤垂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪磷账。 梳的紋絲不亂的頭發(fā)上峭沦,一...
    開(kāi)封第一講書(shū)人閱讀 50,096評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音逃糟,去河邊找鬼吼鱼。 笑死蓬豁,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的菇肃。 我是一名探鬼主播地粪,決...
    沈念sama閱讀 39,159評(píng)論 3 411
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼琐谤!你這毒婦竟也來(lái)了蟆技?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,917評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤笑跛,失蹤者是張志新(化名)和其女友劉穎付魔,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體飞蹂,經(jīng)...
    沈念sama閱讀 44,360評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,673評(píng)論 2 327
  • 正文 我和宋清朗相戀三年翻屈,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了陈哑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,814評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡伸眶,死狀恐怖惊窖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情厘贼,我是刑警寧澤界酒,帶...
    沈念sama閱讀 34,509評(píng)論 4 334
  • 正文 年R本政府宣布,位于F島的核電站嘴秸,受9級(jí)特大地震影響毁欣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜岳掐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,156評(píng)論 3 317
  • 文/蒙蒙 一凭疮、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧串述,春花似錦执解、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至觅赊,卻和暖如春右蕊,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背茉兰。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,123評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工尤泽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,641評(píng)論 2 362
  • 正文 我出身青樓坯约,卻偏偏與公主長(zhǎng)得像熊咽,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子闹丐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,728評(píng)論 2 351