Spark Streaming provides a high-level abstraction called DStream (Discretized Stream). A DStream represents a continuous stream of input data and can be created from sources such as Kafka, TCP sockets, or Flume. Internally, a DStream is simply a sequence of RDDs. Because Spark Streaming is built on the Spark platform, it inherits Spark's characteristics such as fault tolerance, scalability, and high throughput.
In Spark Streaming, each DStream holds the data items that arrive within a given time interval; you can think of each interval as one batch, and each batch forms one RDD. A DStream is therefore an ordered sequence of batches: time flows continuously, and the stream is cut into discrete RDDs at every batch interval.
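To make the batch-interval idea concrete, here is a minimal sketch of a DStream built from a TCP socket, one of the input sources mentioned above. The host localhost and port 9999 are placeholder values for illustration; with a 5-second interval, each 5 seconds of received lines becomes one RDD in the DStream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount")
    // Every 5 seconds, the lines received in that interval form one RDD of the DStream
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    // Standard word count computed independently on each batch
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}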
Put simply, it works like a timer that periodically consumes data from Kafka; the only difference is that the consumed data is held in RDDs.
import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserClickCountAnalytics {
  def main(args: Array[String]): Unit = {
    var masterUrl = "local[1]"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    // Create a StreamingContext with the given master URL and a 5-second batch interval
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Kafka configurations
    val topics = Set("user_events")
    val brokers = "10.10.4.126:9092,10.10.4.127:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
    // Redis database index and hash key used to store per-user click counts
    val dbIndex = 1
    val clickHashKey = "app::users::click"
--------------------------------------------------------------------------
    // Create a direct (receiver-less) Kafka stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // Each Kafka record is a (key, message) pair; parse the message body as JSON
    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })
// Alternatively, the stream can be created with the receiver-based API:
val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val numDStreams = 5
val topics = Map("zerg.hydra" -> 1)
val kafkaDStreams = (1 to numDStreams).map { _ => KafkaUtils.createStream(ssc, kafkaParams, topics, ...) }
//> collection of five *input* DStreams = handled by five receivers/tasks
val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
//> single DStream
val processingParallelism = 20
val processingDStream = unionDStream.repartition(processingParallelism)
//> single DStream but now with 20 partitions
---------------------------------------------------------------------
    // Compute user click counts
    /**
     * userClicks.foreachRDD gives you the RDD for one micro-batch.
     * rdd.foreachPartition gives you the partition of that batch held on each Spark node.
     * partitionOfRecords.foreach gives you each individual record within that partition.
     */
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    // Persist the per-user click counts via foreachRDD
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        // Borrow one Redis connection per partition rather than per record
        val jedis = RedisClient.pool.getResource
        jedis.select(dbIndex)
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          jedis.hincrBy(clickHashKey, uid, clickCount)
        })
        RedisClient.pool.returnResource(jedis)
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
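The example above relies on a RedisClient singleton that is not shown. A minimal sketch of such an object, assuming a Jedis 2.x client (whose pool still exposes returnResource, as used above) and placeholder connection settings, could look like this:

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

object RedisClient extends Serializable {
  // Placeholder connection settings; adjust to the actual Redis deployment
  val redisHost = "127.0.0.1"
  val redisPort = 6379
  val redisTimeout = 30000

  // lazy so the pool is created on the JVM that first uses it
  lazy val pool = new JedisPool(new GenericObjectPoolConfig, redisHost, redisPort, redisTimeout)
}

Because RedisClient is only referenced inside foreachPartition, each executor lazily creates its own pool instead of trying to serialize a live connection from the driver.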