實(shí)現(xiàn):
First, create a Kafka DStream from the topic:
// Spark 1.x imports needed by these snippets
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
val sparkConf = new SparkConf().setAppName(appParams.appName)
val sc = new SparkContext(sparkConf)
val streamingContext = new StreamingContext(sc, Seconds(appParams.batchProcInterval))
val kafkaParams = Map[String, String]("metadata.broker.list" -> appParams.brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, Set[String](appParams.messageInTopic))
Create the time window (the window length equals the slide interval, so these are tumbling, non-overlapping windows):
val windows = messages.map(_._2).window(Seconds(appParams.windownTime), Seconds(appParams.windownTime))
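Passing the same value for both durations yields back-to-back windows; a smaller slide interval would make consecutive windows overlap. A hypothetical variant, not part of this job:
// Hypothetical: a 5-minute window recomputed every minute, so consecutive
// windows overlap; both durations must be multiples of batchProcInterval.
val sliding = messages.map(_._2).window(Seconds(300), Seconds(60))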
針對(duì)每個(gè)時(shí)間窗做計(jì)算
windows.foreachRDD { rdd =>
......
}
每個(gè)時(shí)間窗內(nèi)部的處理:
Define a case class for the parsed play events:
case class Record(channelid: String, starttime: Long)  // one user play event parsed from a Kafka message
Create the SQLContext (SQLContext.getOrCreate reuses a singleton instead of building a new context for every batch):
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
將當(dāng)前消息流轉(zhuǎn)換成DataFrame:
val df = rdd.map(_.split("\\|")).map(line => Record(line(5), line(2).toLong)).toDF()
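The parsing assumes a pipe-delimited message whose field 2 holds the play start time and field 5 the channel id. The concrete layout below is only a hypothetical illustration; the real format depends on the producer:
// Hypothetical sample message; only the positions of fields 2 and 5 matter here.
// index:       0    1     2 (starttime) 3  4   5 (channelid)
val sample = "u001|play|1458100800|sd|ok|CCTV1"
val parts = sample.split("\\|")
val rec = Record(parts(5), parts(2).toLong)  // Record("CCTV1", 1458100800L)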
注冊(cè)成一張表:
df.registerTempTable("UserPlayChannel")
讀取parquet數(shù)據(jù)秽晚,注冊(cè)成另一張表:
val parquetFile = sqlContext.read.parquet(filePath)
parquetFile.registerTempTable("ProgramMetaData")
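The join in the next step assumes ProgramMetaData carries at least the columns the SQL references: channelid, programid, starttime, and endtime (the program's airing interval, in the same time unit as Record.starttime). A quick sanity check on that assumed schema:
// Verify the Parquet columns match what the SQL below expects
parquetFile.printSchema()
// assumed columns: channelid, programid, starttime, endtime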
現(xiàn)在有了兩張表屡律,關(guān)聯(lián)查詢(xún)只需要寫(xiě)好sql語(yǔ)句就可以了宙拉,樣例:
select b.programid, count(b.programid) as count
from UserPlayChannel a
join ProgramMetaData b
  on a.channelid = b.channelid
  and a.starttime >= b.starttime
  and a.starttime <= b.endtime
group by b.programid
order by count DESC
代碼執(zhí)行:
val hotProgramList = sqlContext.sql(
  "select b.programid, count(b.programid) as count from UserPlayChannel a " +
  "join ProgramMetaData b on a.channelid = b.channelid " +
  "and a.starttime >= b.starttime and a.starttime <= b.endtime " +
  "group by b.programid order by count DESC")
現(xiàn)在hotProgramList就是關(guān)聯(lián)查詢(xún)出的結(jié)果谢揪。