import java.sql.Date
import java.text.SimpleDateFormat

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * Offline statistics job for the movie recommender.
 *
 * Reads the Rating and Movie collections from MongoDB. It computes four statistics
 * (most-rated movies, most-rated movies per month, average score per movie, and top movies
 * per genre) and overwrites one MongoDB collection per statistic.
 */
object StatoisticsRecommender {

  val MONGO_URI: String = "mongodb://hadoop100:27017/recom3"
  val MONGODB_DATABASE: String = "recom3"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_MOVIE_COLLECTION = "Movie"
  // Data-source format name passed to spark.read/write.format for the MongoDB connector
  val MONGO_DRVIVE_CLASS: String = "com.mongodb.spark.sql"
  // High-quality movies: total number of ratings per movie
  val MONGODB_RATE_MORE_MOVIES_COLLECTION = "RateMoreMovies"
  // Popular movies: number of ratings per movie per year-month
  val MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION = "RateMoreMoviesRecently"
  // Average score per movie
  val MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION = "AverageMoviesScore"
  // Highest-scoring movies per genre
  val MONGODB_GENRES_TOP_MOVIES_COLLECTION = "GenresTopMovies"

  /**
   * Genres for which a top-N list is produced.
   * A movie belongs to a genre when its `genres` string contains the name (case-insensitive).
   */
  private val GENRES: List[String] = List(
    "Action", "Adventure", "Animation", "Comedy", "Crime", "Documentary", "Drama", "Family",
    "Fantasy", "Foreign", "History", "Horror", "Music", "Mystery", "Romance", "Science", "Tv",
    "Thriller", "War", "Western"
  )

  /** Number of movies kept per genre in GenresTopMovies. */
  private val GENRES_TOP_N: Int = 10

  def main(args: Array[String]): Unit = {
    // Runtime parameters bundled in one map
    val conf = Map(
      "spark.cores" -> "local[2]",
      "mongo.uri" -> MONGO_URI,
      "mongo.db" -> MONGODB_DATABASE
    )

    val sparkConf = new SparkConf().setAppName("statisticsRecommender").setMaster(conf("spark.cores"))
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    implicit val mongoConf: MongoConfig = MongoConfig(conf("mongo.uri"), conf("mongo.db"))

    try {
      // Provides the encoders required by .as[T]
      import spark.implicits._

      val ratings = readCollection(spark, MONGODB_RATING_COLLECTION).as[MoviesRating].cache()
      val movies = readCollection(spark, MONGODB_MOVIE_COLLECTION).as[Movie].cache()

      // The statistics below query ratings through SQL, so expose them as a temp view.
      ratings.createOrReplaceTempView("ratings")

      // 1. High-quality movies: most ratings overall ==> RateMoreMovies
      rateMore(spark)
      // 2. Popular movies: most ratings per month ==> RateMoreMoviesRecently
      rateMoreRecently(spark)
      // 3. Average score per movie ==> AverageMoviesScore
      // 4. Top-N movies per genre, ranked by that average ==> GenresTopMovies
      averageMovieScore(spark, movies)
    } finally {
      // Release Spark resources even if one of the steps fails.
      spark.stop()
    }
  }

  /**
   * High-quality movies: number of ratings per movie, most-rated first.
   * Reads the "ratings" temp view and overwrites MONGODB_RATE_MORE_MOVIES_COLLECTION.
   *
   * @param spark session in which the "ratings" temp view is registered
   */
  def rateMore(spark: SparkSession)(implicit mongoConf: MongoConfig): Unit = {
    val rateMoreDF = spark.sql("select mid, count(mid) as count from ratings group by mid order by count desc")
    writeCollection(rateMoreDF, MONGODB_RATE_MORE_MOVIES_COLLECTION)
  }

  /**
   * Popular movies: number of ratings per (year-month, movie).
   * Rows are ordered by month descending, then count descending.
   * Registers the UDF "changDate" and the temp view "ymRatings", and overwrites
   * MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION.
   *
   * @param spark session in which the "ratings" temp view is registered
   */
  def rateMoreRecently(spark: SparkSession)(implicit mongoConf: MongoConfig): Unit = {
    val simpleDateFormat = new SimpleDateFormat("yyyyMM")
    // Converts a timestamp in seconds (x1000 gives milliseconds) into a yyyyMM number.
    // Formatting uses the JVM default time zone.
    spark.udf.register("changDate", (x: Long) => simpleDateFormat.format(new Date(x * 1000L)).toLong)

    val yeahMonthOfRatings = spark.sql("select mid, uid, score, changDate(timestamp) as yeahmonth from ratings")
    yeahMonthOfRatings.createOrReplaceTempView("ymRatings")

    val rateMoreRecentlyDF = spark.sql("select mid, count(mid) as count,yeahmonth from ymRatings group by yeahmonth,mid order by yeahmonth desc,count desc")
    writeCollection(rateMoreRecentlyDF, MONGODB_RATE_MORE_MOVIES_RECENTLY_COLLECTION)
  }

  /**
   * Computes the average score of every rated movie and overwrites
   * MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION with it. It then uses those averages to build
   * the per-genre top-N lists in MONGODB_GENRES_TOP_MOVIES_COLLECTION.
   *
   * @param spark     session in which the "ratings" temp view is registered
   * @param movies    movie metadata (provides each movie's `genres`)
   * @param mongoConf MongoDB connection settings
   */
  def averageMovieScore(spark: SparkSession, movies: Dataset[Movie])(implicit mongoConf: MongoConfig): Unit = {
    // Cached because it is both written out and reused for the genre ranking.
    val averageMovieScoreDF = spark.sql("select mid, avg(score) as avg from ratings group by mid").cache()
    writeCollection(averageMovieScoreDF, MONGODB_AVERAGE_MOVIES_SCORE_COLLECTION)
    genresTopMovies(spark, movies, averageMovieScoreDF)
    averageMovieScoreDF.unpersist()
  }

  /**
   * For every genre in GENRES, keeps the GENRES_TOP_N movies with the highest average score.
   * Writes one GenresRecommendation document per genre that has at least one matching movie.
   *
   * @param averageScores DataFrame with columns `mid` and `avg`
   */
  private def genresTopMovies(spark: SparkSession, movies: Dataset[Movie], averageScores: DataFrame)
                             (implicit mongoConf: MongoConfig): Unit = {
    import spark.implicits._

    // Local copies so the closures below do not reach back into this object.
    val genresWithLower = GENRES.map(genre => (genre, genre.toLowerCase))
    val topN = GENRES_TOP_N

    // mid, avg, genres for every movie that has at least one rating
    val moviesWithScoreDF = movies.join(averageScores, Seq("mid")).select("mid", "avg", "genres")

    val genresTopMoviesDF = moviesWithScoreDF.rdd
      .flatMap { row =>
        // A movie without a genres value matches no genre instead of failing with an NPE.
        val movieGenres = Option(row.getAs[String]("genres")).map(_.toLowerCase).getOrElse("")
        val midAndScore = (row.getAs[Int]("mid"), row.getAs[Double]("avg"))
        genresWithLower.collect { case (genre, lower) if movieGenres.contains(lower) => (genre, midAndScore) }
      }
      .groupByKey()
      .map { case (genre, items) =>
        val top = items.toList
          .sortWith(_._2 > _._2)
          .take(topN)
          .map { case (mid, avg) => Recommendation(mid, avg) }
        GenresRecommendation(genre, top)
      }
      .toDF()

    writeCollection(genresTopMoviesDF, MONGODB_GENRES_TOP_MOVIES_COLLECTION)
  }

  /** Loads one MongoDB collection (addressed by `mongoConf.uri` + `collection`) as a DataFrame. */
  private def readCollection(spark: SparkSession, collection: String)(implicit mongoConf: MongoConfig): DataFrame =
    spark.read
      .option("uri", mongoConf.uri)
      .option("collection", collection)
      .format(MONGO_DRVIVE_CLASS)
      .load()

  /** Writes `df` to the given MongoDB collection with save mode "overwrite". */
  private def writeCollection(df: DataFrame, collection: String)(implicit mongoConf: MongoConfig): Unit =
    df.write
      .option("uri", mongoConf.uri)
      .option("collection", collection)
      .mode("overwrite")
      .format(MONGO_DRVIVE_CLASS)
      .save()

  /**
   * A recommended item.
   *
   * @param rid item (movie) ID
   * @param r   recommendation score (the movie's average rating in GenresTopMovies)
   */
  case class Recommendation(rid: Int, r: Double)

  /**
   * Top movies of one genre.
   *
   * @param genres genre name
   * @param recs   movies of that genre, highest average score first
   */
  case class GenresRecommendation(genres: String, recs: Seq[Recommendation])

  /**
   * MongoDB connection settings.
   *
   * @param uri MongoDB connection URI (used for every read and write in this object)
   * @param db  MongoDB database name (not read by the code in this object)
   */
  case class MongoConfig(uri: String, db: String)

  /**
   * A user's rating of a movie.
   *
   * @param uid       user ID
   * @param mid       movie ID
   * @param score     score the user gave the movie
   * @param timestamp time of the rating; treated as seconds since the epoch by rateMoreRecently
   */
  case class MoviesRating(uid: Int, mid: Int, score: Double, timestamp: Int)

  /**
   * A movie.
   *
   * @param mid       movie ID
   * @param name      movie title
   * @param descri    description
   * @param timelong  running time
   * @param issue     release date
   * @param shoot     shooting date
   * @param language  language(s)
   * @param genres    genres; matched against GENRES by case-insensitive substring
   * @param actors    actors
   * @param directors directors
   */
  case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
                   shoot: String, language: String, genres: String, actors: String, directors: String)
}