推薦系統(tǒng)之離線模塊

import java.sql.Date

import java.text.SimpleDateFormat

import org.apache.spark.SparkConf

import org.apache.spark.sql.{Dataset, SparkSession}

object StatoisticsRecommender {

val MONGO_URI:String ="mongodb://hadoop100:27017/recom3"

? ? val MONGODB_DATABASE:String ="recom3"

? ? val MONGODB_RATING_COLLECTION ="Rating"

? ? val MONGODB_MOVIE_COLLECTION ="Movie"

? ? //驅(qū)動(dòng)

? ? val MONGO_DRVIVE_CLASS:String ="com.mongodb.spark.sql"

? ? //優(yōu)質(zhì)電影

? ? val MONGODB_RATE_MORE_MOVIES_COLLECTION ="RateMoreMovies"

? ? //熱門(mén)電影

? ? val MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION ="RateMoreMoviesRecently"

? ? //平均評(píng)分

? ? val MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION ="AverageMoviesScore"

? ? val MONGODB_GENRES_TOP_MOVIES_COLLECTION ="GenresTopMovies"

? ? def main(args: Array[String]): Unit = {

//使用map封裝參數(shù)

? ? ? val conf =Map("spark.cores" ->"local[2]",

"mongo.uri" ->MONGO_URI,

"mongo.db" ->MONGODB_DATABASE)

//sparkconf

? ? ? val sparkConf =new SparkConf().setAppName("statisticsRecommender").setMaster(conf("spark.cores"))

//sparkSeesion

? ? ? val spark = SparkSession.builder().config(sparkConf).getOrCreate()

implicit val mongoConf =new MongoConfig(conf("mongo.uri"), conf("mongo.db"))

//從mongo 中讀取數(shù)據(jù)

? ? ? //導(dǎo)入sparkSession的隱式轉(zhuǎn)換

? ? ? import spark.implicits._

val ratings = spark.read

.option("uri", mongoConf.uri)

.option("collection",MONGODB_RATING_COLLECTION)

.format(MONGO_DRVIVE_CLASS)

.load()

.as[MoviesRating]

.cache

val movies = spark.read

.option("uri", mongoConf.uri)

.option("collection",MONGODB_MOVIE_COLLECTION)

.format(MONGO_DRVIVE_CLASS)

.load()

.as[Movie]

.cache

//把數(shù)據(jù)注冊(cè)成view[下面用到]

? ? ? ratings.createOrReplaceTempView("ratings")

//分析

? ? ? //1.優(yōu)質(zhì)電影=>總的評(píng)分個(gè)數(shù)最多的電影==>RateMoreMoveies

? ? ? rateMore(spark)

//2.熱門(mén)電影=>一個(gè)月內(nèi)評(píng)分最多的電影==>RateMoreRecentlyMovies

? ? ? rateMoreRecently(spark)

//3.電影的平均評(píng)分==>AverageMovies

? ? ? averageMovieScore(spark, movies)

//4.每類電影topN? ==>GenresTopMovies

? ? ? //關(guān)閉資源

? ? }

/**

? ? * 優(yōu)質(zhì)電影的計(jì)算

? ? *

? ? * @param spark

*/

? ? def rateMore(spark: SparkSession)(implicit mongoConf: MongoConfig): Unit = {

//select mid, count(mid) as count from ratings group by mid order by count desc

? ? ? //根據(jù)業(yè)務(wù)執(zhí)行sqlma

? ? ? val rateMoreDF = spark.sql("select mid, count(mid) as count from ratings group by mid order by count desc")

//把結(jié)果數(shù)據(jù)寫(xiě)入到mongodb對(duì)應(yīng)的表中

? ? ? rateMoreDF

.write

.option("uri", mongoConf.uri)

.option("collection",MONGODB_RATE_MORE_MOVIES_COLLECTION)

.mode("overwrite")

.format(MONGO_DRVIVE_CLASS)

.save()

}

/**

? ? * 熱門(mén)電影:月評(píng)分最多的電影

? ? *

? ? * @param spark

*/

? ? def rateMoreRecently(spark: SparkSession)(implicit mongoConf: MongoConfig): Unit = {

val simpleDateFormat =new SimpleDateFormat("yyyyMM")

//sparkSql自定義函數(shù),用于將時(shí)間戳轉(zhuǎn)化成年月的形式(乘上1000 是將秒為單位的轉(zhuǎn)化成毫秒)

? ? ? spark.udf.register("changDate", (x: Long) => simpleDateFormat.format(new Date(x *1000L)).toLong)

//根據(jù)業(yè)務(wù)執(zhí)行sql

? ? ? val yeahMonthOfRatings = spark.sql("select mid, uid, score, changDate(timestamp) as yeahmonth from ratings")

//將上一步得到的df注冊(cè)成表ymRatings

? ? ? yeahMonthOfRatings.createOrReplaceTempView("ymRatings")

//根據(jù)業(yè)務(wù)執(zhí)行sql

? ? ? val rateMoreRecentlyDF = spark.sql("select mid, count(mid) as count,yeahmonth from ymRatings group by yeahmonth,mid order by yeahmonth desc,count desc")

//將我們的結(jié)果數(shù)據(jù)寫(xiě)入到mongo的RateMoreMoviesRecently表中

? ? ? rateMoreRecentlyDF

.write

.option("uri", mongoConf.uri)

.option("collection",MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION)

.mode("overwrite")

.format(MONGO_DRVIVE_CLASS)

.save()

}

/**

? ? * 計(jì)算電影的平均評(píng)分

? ? *

? ? * @param spark

? ? * @param movies

? ? * @param mongoConf

*/

? ? def averageMovieScore(spark: SparkSession, movies: Dataset[Movie])(implicit mongoConf: MongoConfig): Unit = {

//求出每個(gè)電影的平均評(píng)分

? ? ? val averageMovieScoreDF = spark.sql("select mid, avg(score) as avg from ratings group by mid").cache()

//把結(jié)果數(shù)據(jù)寫(xiě)入到mongo的AverageMoviesScore表中

? ? ? averageMovieScoreDF

.write

.option("uri", mongoConf.uri)

.option("collection",MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION)

.mode("overwrite")

.format(MONGO_DRVIVE_CLASS)

.save()

import spark.implicits._

//電影里面所有的類別,使用list進(jìn)行封裝

? ? ? val genres =List("Action","Adventure","Animation","Comedy","Ccrime","Documentary","Drama","Family","Fantasy","Foreign","History","Horror","Music","Mystery"

? ? ? ? ,"Romance","Science","Tv","Thriller","War","Western")

//把電影里面的類別由list的類型轉(zhuǎn)化成rdd的類型

? ? ? val genresRdd = spark.sparkContext.makeRDD(genres)

// 統(tǒng)計(jì)每種類別最熱電影【每種類別中平均評(píng)分最高的10部電影】

? ? ? val moviesWithSocreDF = movies.join(averageMovieScoreDF,Seq("mid","mid")).select("mid","avg","genres").cache()

//類別.cartesian(電影數(shù)據(jù)集(含平均評(píng)分))

? ? ? val genresTopMovies = genresRdd.cartesian(moviesWithSocreDF.rdd).filter(x => {

xmatch {

//包含的就留下,不包含的就去掉

? ? ? ? ? case (genres, row) => {

row.getAs[String]("genres").toLowerCase().contains(genres.toLowerCase)

}

}

})

// mid avg genres

? ? ? ? .map {

//對(duì)數(shù)據(jù)的格式進(jìn)行一個(gè)調(diào)整

? ? ? ? ? case (genres, row) => {

(genres, (row.getAs[Int]("mid"), row.getAs[Double]("avg")))

}

}//按照電影的類別驚醒分組

? ? ? ? //? ? (key,((),()))

? ? ? ? .groupByKey()

.map {

case (genres, items) => {

GenresRecommendation(genres, items.toList.sortWith(_._2 > _._2).slice(0,10).map(x =>Recommendation(x._1, x._2)))

}

}.toDF

//把結(jié)果數(shù)據(jù)寫(xiě)入到mongo的GenresTopMovies表中

? ? ? genresTopMovies

.write

.option("uri", mongoConf.uri)

.option("collection",MONGODB_GENRES_TOP_MOVIES_COLLECTION)

.mode("overwrite")

.format(MONGO_DRVIVE_CLASS)

.save()

}

/**

? * 推薦項(xiàng)目

? *

? * @param rid 項(xiàng)目ID

? * @param r? 推薦分?jǐn)?shù)

? */

? case class Recommendation(rid: Int, r: Double)

/**

? * 電影種類推薦樣例類

? *

? * @param genres

? * @param recs

*/

? case class GenresRecommendation(genres:String, recs:Seq[Recommendation])

/**

? * MongoDB 配置對(duì)象

? *

? * @param uri MongoDB連接地址

? * @param db? 操作的MongoDB數(shù)據(jù)庫(kù)

? */

? case class MongoConfig(val uri:String,val db:String)

/**

? * Rating Class 電影的評(píng)分類

? *

? * @param uid? ? ? 用戶的ID

? * @param mid? ? ? 電影的ID

? * @param score? ? 用戶為該電影的評(píng)分

? * @param timestamp 用戶為該電影評(píng)分的時(shí)間

? */

? case class MoviesRating(val uid: Int,val mid: Int,val score: Double,val timestamp: Int)

/**

? * Movie Class 電影類

? *

? * @param mid? ? ? 電影的ID

? * @param name? ? ? 電影的名稱

? * @param descri? ? 電影的描述

? * @param timelong? 電影的時(shí)長(zhǎng)

? * @param issue? ? 電影的發(fā)行時(shí)間

? * @param shoot? ? 電影的拍攝時(shí)間

? * @param language? 電影的語(yǔ)言

? * @param genres? ? 電影的類別

? * @param actors? ? 電影的演員

? * @param directors 電影的導(dǎo)演

? */

? case class Movie(val mid: Int,val name:String,val descri:String,val timelong:String,val issue:String,val shoot:String,val language:String,val genres:String,val actors:String,val directors:String)

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子铣焊,更是在濱河造成了極大的恐慌,老刑警劉巖玄妈,帶你破解...
    沈念sama閱讀 216,919評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異髓梅,居然都是意外死亡拟蜻,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)枯饿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)酝锅,“玉大人,你說(shuō)我怎么就攤上這事鸭你∏牛” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,316評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵袱巨,是天一觀的道長(zhǎng)阁谆。 經(jīng)常有香客問(wèn)我,道長(zhǎng)愉老,這世上最難降的妖魔是什么场绿? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,294評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮嫉入,結(jié)果婚禮上焰盗,老公的妹妹穿的比我還像新娘。我一直安慰自己咒林,他們只是感情好熬拒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,318評(píng)論 6 390
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著垫竞,像睡著了一般澎粟。 火紅的嫁衣襯著肌膚如雪蛀序。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,245評(píng)論 1 299
  • 那天活烙,我揣著相機(jī)與錄音徐裸,去河邊找鬼。 笑死啸盏,一個(gè)胖子當(dāng)著我的面吹牛重贺,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播回懦,決...
    沈念sama閱讀 40,120評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼气笙,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了粉怕?” 一聲冷哼從身側(cè)響起健民,我...
    開(kāi)封第一講書(shū)人閱讀 38,964評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎贫贝,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體蛉谜,經(jīng)...
    沈念sama閱讀 45,376評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡稚晚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,592評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了型诚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片客燕。...
    茶點(diǎn)故事閱讀 39,764評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖狰贯,靈堂內(nèi)的尸體忽然破棺而出也搓,到底是詐尸還是另有隱情,我是刑警寧澤涵紊,帶...
    沈念sama閱讀 35,460評(píng)論 5 344
  • 正文 年R本政府宣布傍妒,位于F島的核電站,受9級(jí)特大地震影響摸柄,放射性物質(zhì)發(fā)生泄漏颤练。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,070評(píng)論 3 327
  • 文/蒙蒙 一驱负、第九天 我趴在偏房一處隱蔽的房頂上張望嗦玖。 院中可真熱鬧,春花似錦跃脊、人聲如沸宇挫。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,697評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)器瘪。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間娱局,已是汗流浹背彰亥。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,846評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留衰齐,地道東北人任斋。 一個(gè)月前我還...
    沈念sama閱讀 47,819評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像耻涛,于是被迫代替她去往敵國(guó)和親废酷。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,665評(píng)論 2 354