I. Problem
Filter merchant traffic out of the real-time traffic logs, accumulate each merchant's traffic, and report each merchant's real-time cumulative traffic (PV).
When the clock passes 24:00, restart the accumulation so that the cumulative traffic covers only the current day.
II. Implementation Steps
1膝宁、采用Spark Streaming讀取Kafka中的實(shí)時(shí)日志流,生成DStream
2根吁、過濾其中的商戶頁流量员淫,生成DStream[k,v] (注:k為shopid, v為pv)
3、采用Spark Streaming中DStream[k,v]的mapWithState方法生成商戶累計(jì)流量MapWithStateDStream
4击敌、通過調(diào)用StreamingContext中的awaitTerminationOrTimeout(time) 方法設(shè)置當(dāng)前StreamingContext的終止時(shí)間實(shí)現(xiàn)在每天24時(shí)終止所有上述DStream計(jì)算介返。
5、調(diào)用StreamingContext中的stop方法沃斤,終止StreamingContext圣蝎。調(diào)用stop方法默認(rèn)會(huì)終止SparkContext,設(shè)置stop(stopSparkContext:Boolean = false,stopGracefully:Boolean = true)參數(shù)衡瓶,可以實(shí)現(xiàn)不終止SparkContext徘公,同時(shí)能夠保持StreamingContext已經(jīng)接受的Batch能夠處理完成后再終止StreamingContext
6、重復(fù)1~5哮针,即可以再次日0時(shí)自動(dòng)生成新的StreamingContext統(tǒng)計(jì)當(dāng)日商戶累計(jì)流量
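
A condensed sketch of the restart loop in steps 4-6, assuming an existing SparkContext sc, a 30-second batch interval, and a resetTime helper that returns the number of milliseconds until the next midnight (all of which appear in the full listing below):

while (true) {
  val ssc = new StreamingContext(sc, Seconds(30))
  // ... build the per-day DStream pipeline on ssc here ...
  ssc.start()
  // block until the next midnight, i.e. for resetTime milliseconds
  ssc.awaitTerminationOrTimeout(resetTime)
  // keep the SparkContext alive and let already-received batches finish before stopping
  ssc.stop(stopSparkContext = false, stopGracefully = true)
}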
III. Example Code
package com.demo.data
import java.util
import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{Logging, SparkConf}
import com.demo.data.kafka.KafkaService
import com.demo.data.util.Constants
/**
* Created by phycsgy on 17/2/13.
*/
object KafkaToRedis extends App with Logging {
  val conf = new SparkConf().setAppName("SparkStreamingKafka")
  val sc = new SparkContext(conf)

  // Keep only merchant-page PV records
  def shopTrafficFilter(log: String): Boolean = {
    (log contains "\"element_id\":\"pageview\"") &&
    (log contains "\"page_name\":\"shopinfo\"") &&
    ("\"shop_id\":\"[0-9]+\"".r findFirstIn log).nonEmpty
  }
  // Extract the shopid with a regular expression and emit a (shopid, 1) pair
  def shopInfoExtract(log: String): (String, Int) = {
    val pattern = "\"shop_id\":\"([0-9]+)\"".r
    val matchResult = pattern findFirstMatchIn log
    (matchResult.get.group(1), 1)
  }
  // Milliseconds from now until the next midnight
  def resetTime = {
    val now = new Date()
    val tomorrowMidnight = new Date(now.getYear, now.getMonth, now.getDate + 1)
    tomorrowMidnight.getTime - now.getTime
  }
  // State update function: add this batch's PV to the merchant's running total
  val mapFunction = (shopid: String, pv: Option[Int], state: State[Int]) => {
    val accuSum = pv.getOrElse(0) + state.getOption().getOrElse(0)
    val output = (shopid, accuSum)
    state.update(accuSum)
    output
  }
  val stateSpec = StateSpec.function(mapFunction)
  while (true) {
    val ssc = new StreamingContext(sc, Seconds(30))
    ssc.checkpoint("./")
    val kafkaService = new KafkaService
    val topicName = "log.traffic_data"
    // Read the log stream from Kafka
    val kafkaStream = kafkaService.getKafkaStream[String, StringDecoder](ssc, topicName, Constants.KAFKA_LARGEST_OFFSET)
    // Keep only merchant-page traffic and map it to (shopid, 1) pairs
    val shopTrafficStream = kafkaStream.map(msg => msg._2).filter(shopTrafficFilter).map(shopInfoExtract)
    // Snapshot of each merchant's cumulative traffic for the current day
    val shopTrafficUpdateStateDStream = shopTrafficStream.mapWithState(stateSpec).stateSnapshots()
    // Print the top 10 merchants by cumulative traffic
    shopTrafficUpdateStateDStream.foreachRDD {
      rdd => {
        rdd.top(10)(TopElementOrdering) // custom ordering, see the sketch after the listing
          .foreach(item => println(item))
      }
    }
    ssc.start()
    // Block until the next midnight, then shut this StreamingContext down
    ssc.awaitTerminationOrTimeout(resetTime)
    // Keep the SparkContext alive and finish in-flight batches before stopping
    ssc.stop(stopSparkContext = false, stopGracefully = true)
  }
}
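
The listing calls rdd.top(10) with a custom ordering named TopElementOrdering that is not shown. A minimal sketch, assuming the intent is to rank (shopid, pv) pairs by their accumulated PV:

object TopElementOrdering extends Ordering[(String, Int)] {
  // compare (shopid, pv) pairs by accumulated PV so that top(10) returns the highest-traffic shops
  override def compare(a: (String, Int), b: (String, Int)): Int = a._2.compareTo(b._2)
}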