sparkstreaming和kafka整合

Spark Streaming提供了一個(gè)叫做DStream(Discretized Stream)的高級(jí)抽象,DStream表示一個(gè)持續(xù)不斷輸入的數(shù)據(jù)流琉兜,可以基于Kafka、TCP Socket、Flume等輸入數(shù)據(jù)流創(chuàng)建葱绒。在內(nèi)部,一個(gè)DStream實(shí)際上是由一個(gè)RDD序列組成的斗锭。

Spark Streaming提供了一個(gè)叫做DStream(Discretized Stream)的高級(jí)抽象地淀,DStream表示一個(gè)持續(xù)不斷輸入的數(shù)據(jù)流,可以基于Kafka岖是、TCP Socket帮毁、Flume等輸入數(shù)據(jù)流創(chuàng)建。在內(nèi)部豺撑,一個(gè)DStream實(shí)際上是由一個(gè)RDD序列組成的烈疚。Sparking Streaming是基于Spark平臺(tái)的,也就繼承了Spark平臺(tái)的各種特性聪轿,如容錯(cuò)(Fault-tolerant)爷肝、可擴(kuò)展(Scalable)、高吞吐(High-throughput)等屹电。

在Spark Streaming中阶剑,每個(gè)DStream包含了一個(gè)時(shí)間間隔之內(nèi)的數(shù)據(jù)項(xiàng)的集合,我們可以理解為指定時(shí)間間隔之內(nèi)的一個(gè)batch危号,每一個(gè)batch就構(gòu)成一個(gè)RDD數(shù)據(jù)集牧愁,所以DStream就是一個(gè)個(gè)batch的有序序列,時(shí)間是連續(xù)的外莲,按照時(shí)間間隔將數(shù)據(jù)流分割成一個(gè)個(gè)離散的RDD數(shù)據(jù)集.

其實(shí)就是一個(gè)定時(shí)去kafka中消費(fèi)數(shù)據(jù)的定時(shí)器猪半,只不過(guò)數(shù)據(jù)是保存在rdd中而已兔朦。

object UserClickCountAnalytics {

def main(args: Array[String]): Unit = {

var masterUrl = "local[1]"

if (args.length > 0) {

masterUrl = args(0)

}

// Create a StreamingContext with the given master URL

val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")

val ssc = new StreamingContext(conf, Seconds(5))

// Kafka configurations

val topics = Set("user_events")

val brokers = "10.10.4.126:9092,10.10.4.127:9092"

val kafkaParams = Map[String, String](

"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

val dbIndex = 1

val clickHashKey = "app::users::click"

--------------------------------------------------------------------------

// Create a direct stream

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val events = kafkaStream.flatMap(line => {

val data = JSONObject.fromObject(line._2)

Some(data)

})

//也可以這樣建立stream對(duì)象

val ssc:StreamingContext=???

val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)

val numDStreams=5

val topics=Map("zerg.hydra"->1)

val kafkaDStreams=(1to numDStreams).map{_=>KafkaUtils.createStream(ssc,kafkaParams,topics,...)}

//> collection of five *input* DStreams = handled by five receivers/tasks

val unionDStream=ssc.union(kafkaDStreams)// often unnecessary, just showcasing how to do it

//> single?DStream

val processingParallelism=20

val processingDStream=unionDStream(processingParallelism)

//> single DStream but now with 20 partitions

---------------------------------------------------------------------

// Compute user click times

/**? ??

?userClicks.foreachRDD拿到的是微批處理一個(gè)批次數(shù)據(jù)? ?

?rdd.foreachPartition拿到的是一個(gè)批次在Spark各節(jié)點(diǎn)對(duì)應(yīng)的分區(qū)數(shù)據(jù)? ? partitionOfRecords.foreach拿到對(duì)應(yīng)分區(qū)的每條數(shù)據(jù) */

val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)

//通過(guò)foreachRDD來(lái)保存處理結(jié)果

userClicks.foreachRDD(rdd => {

rdd.foreachPartition(partitionOfRecords => {

partitionOfRecords.foreach(pair => {

val uid = pair._1

val clickCount = pair._2

val jedis = RedisClient.pool.getResource

jedis.select(dbIndex)

jedis.hincrBy(clickHashKey, uid, clickCount)

RedisClient.pool.returnResource(jedis)

})

})

})

ssc.start()

ssc.awaitTermination()

}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市磨确,隨后出現(xiàn)的幾起案子沽甥,更是在濱河造成了極大的恐慌,老刑警劉巖乏奥,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件摆舟,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡邓了,警方通過(guò)查閱死者的電腦和手機(jī)恨诱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)骗炉,“玉大人照宝,你說(shuō)我怎么就攤上這事【淇” “怎么了厕鹃?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)乍丈。 經(jīng)常有香客問(wèn)我剂碴,道長(zhǎng),這世上最難降的妖魔是什么诗赌? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任汗茄,我火速辦了婚禮,結(jié)果婚禮上铭若,老公的妹妹穿的比我還像新娘洪碳。我一直安慰自己,他們只是感情好叼屠,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布瞳腌。 她就那樣靜靜地躺著,像睡著了一般镜雨。 火紅的嫁衣襯著肌膚如雪嫂侍。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,475評(píng)論 1 312
  • 那天荚坞,我揣著相機(jī)與錄音挑宠,去河邊找鬼。 笑死颓影,一個(gè)胖子當(dāng)著我的面吹牛各淀,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播诡挂,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼碎浇,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼临谱!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起奴璃,我...
    開(kāi)封第一講書(shū)人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤悉默,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后苟穆,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體抄课,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年雳旅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了剖膳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡岭辣,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出甸饱,到底是詐尸還是另有隱情沦童,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布叹话,位于F島的核電站偷遗,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏驼壶。R本人自食惡果不足惜氏豌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望热凹。 院中可真熱鬧泵喘,春花似錦、人聲如沸般妙。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)碟渺。三九已至鲜锚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間苫拍,已是汗流浹背芜繁。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留绒极,地道東北人骏令。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像集峦,于是被迫代替她去往敵國(guó)和親伏社。 傳聞我的和親對(duì)象是個(gè)殘疾皇子抠刺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容