Spark Streaming學(xué)習(xí)六七八章筆記

通過詞頻統(tǒng)計(jì)功能學(xué)習(xí)Spark-submit的使用:

先打開一個(gè)命令窗口輸入nc -lk 9999


然后在另一個(gè)窗口呵哨,spark的bin文件夾下輸入

./spark-submit --master local[2] \

--class org.apache.spark.examples.streaming.NetworkWordCount \

--name NetworkWordCount \

/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999

在netcat窗口輸入a a a a b b之后再spark窗口的流式輸出會(huì)見到詞頻統(tǒng)計(jì)的結(jié)果吏祸。


sparkStreaming工作原理(粗粒度)

Spark Streaming接收到實(shí)時(shí)數(shù)據(jù)流褐墅,把數(shù)據(jù)按照指定的時(shí)間段切成一片片小的數(shù)據(jù)塊循签,然后把小的數(shù)據(jù)塊傳給Spark Engine處理厢钧。


sparkStreaming工作原理(細(xì)粒度)

細(xì)粒度工作原理

首先皿哨,spark應(yīng)用程序運(yùn)行在driver端长酗,driver需要在Executor(電腦)中啟動(dòng)Receiver接收器,接收數(shù)據(jù)流智玻,并且分模塊接收遂唧,可能還會(huì)以副本的方式存儲(chǔ),接收了一個(gè)周期之后吊奢,Executor會(huì)向spark應(yīng)用程序返回接收情況(分塊數(shù)量盖彭,副本數(shù)量等等)應(yīng)用程序會(huì)將任務(wù)分發(fā)到Executor中。

DStream概念:對(duì)DStream進(jìn)行操作,比如map/flatMap,其實(shí)底層會(huì)被翻譯為對(duì)DStream中的每個(gè)RDD都做相同的操作召边,因?yàn)橐粋€(gè)DStream是由不同批次的RDD所構(gòu)成的铺呵。

每一個(gè)輸入流Input DStreamings 都要對(duì)應(yīng)一個(gè)receivers來接收它,Input DStreamings的種類:文件系統(tǒng)隧熙,socket傳輸片挂,Kafka,F(xiàn)lume贞盯。

Output Operation 的種類:print()宴卖,saveAsTextFiles保存到文件系統(tǒng),saveAsHadoopFiles等邻悬。

實(shí)戰(zhàn):spark streaming 處理socket數(shù)據(jù)

object NetworkWorldCount {

? def main(args: Array[String]): Unit = {

? ? val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWorldCount")

? ? val ssc = new StreamingContext(sparkConf, Seconds(5))

? ? val lines = ssc.socketTextStream("localhost",6789)

? ? val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

? ? result.print()

? ? ssc.start()

? ? ssc.awaitTermination()

? }

}


在另外一個(gè)控制臺(tái)里輸入

nc -lk 6789

a a a a c c c d d d?

結(jié)果:


spark streaming 處理socket數(shù)據(jù)

實(shí)戰(zhàn):spark streaming 處理socket數(shù)據(jù)并寫入mysql數(shù)據(jù)庫(kù)

object ForeachRDDApp {

? def main(args: Array[String]): Unit = {

? ? val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")

? ? val ssc = new StreamingContext(sparkConf, Seconds(5))

? ? val lines = ssc.socketTextStream("localhost", 6789)

? ? val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//前幾行不變。随闽。

? ? result.foreachRDD(rdd => {? ? ? ?//循環(huán)每一個(gè)Rdd

? ? ? rdd.foreachPartition(partitionOfRecords => {? //在一個(gè)rdd里循環(huán)每一個(gè)partition

? ? ? ? val connection = createCOnnection()? ? ?//獲取mysql連接

? ? ? ? partitionOfRecords.foreach(record => {? ? ? ? //在每一個(gè)partition里獲取一條記錄

? ? ? ? ? val sql = "insert into wordcount(word, wordcount) values('" + record._1+ "'," + record._2+")"

? ? ? ? ? connection.createStatement().execute(sql)

? ? ? ? })

? ? ? ? connection.close()

? ? ? })

? ? })

? ? ssc.start()

? ? ssc.awaitTermination()

? }

? def createCOnnection() = {

? ? Class.forName("com.mysql.jdbc.Driver")

? ? DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark","root","root")

? }

結(jié)果:


結(jié)果


spark streaming從socket接收數(shù)據(jù)后根據(jù)標(biāo)準(zhǔn)過濾數(shù)據(jù)實(shí)戰(zhàn)(黑名單例子)




//構(gòu)建黑名單

object TransformApp {

? def main(args: Array[String]): Unit = {

? ? val sparkConf = new SparkConf().setAppName("TransformApp").setMaster("local[2]")

? ? val ssc = new StreamingContext(sparkConf, Seconds(5))

//跟前面一樣

? ? val blacks = List("zs","ls")? ? //構(gòu)建黑名單List

? ? val blackRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))? ? ? ? //將List轉(zhuǎn)成(zs,true)的這種RDD類型

? ? val lines = ssc.socketTextStream("localhost", 6789)? ? ? ? //lines是DSTream類型

? ? val clicklog = lines.map(x => (x.split(",")(1),x)).transform(rdd => {? ? ? ?

//lines是這種類型的數(shù)據(jù)(20160410,zs) 根據(jù)逗號(hào)分隔后重整為(zs:20160410,zs),即為(x.split(",")(1),x))的結(jié)果父丰,得到的結(jié)果仍然是RDD類型,transform函數(shù)是將每個(gè)Rdd拿出來操作掘宪。

? ? ? rdd.leftOuterJoin(blackRDD)? ?//每個(gè)rdd都跟blackRDD進(jìn)行l(wèi)eftOuterJoin蛾扇,得到(zs:[<20160410,zs>,<true>])這種類型的數(shù)據(jù)

? ? ? ? .filter(x=> x._2._2.getOrElse(false) != true)? ? ? ? ?//過濾,將參數(shù)的第二個(gè)中的第二個(gè)為true的過濾掉魏滚。

? ? ? ? .map(x =>x._2._1)? ? ? //重整,將結(jié)構(gòu)變?yōu)閞dd中第二個(gè)的第一個(gè)镀首,即為<20160410,zs>

? ? })

? ? clicklog.print()

? ? ssc.start()

? ? ssc.awaitTermination()

? }

}


在nc -lk 6789中輸入

20160410,zs

20160410,ls

20160410,ww

20160410,zs

20160410,ls

20160410,ww

20160410,zs

20160410,ls

20160410,ww

控制臺(tái)輸出


結(jié)果
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市鼠次,隨后出現(xiàn)的幾起案子更哄,更是在濱河造成了極大的恐慌,老刑警劉巖腥寇,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件成翩,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡赦役,警方通過查閱死者的電腦和手機(jī)麻敌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來掂摔,“玉大人术羔,你說我怎么就攤上這事∫依欤” “怎么了级历?”我有些...
    開封第一講書人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)簇秒。 經(jīng)常有香客問我鱼喉,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任扛禽,我火速辦了婚禮锋边,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘编曼。我一直安慰自己豆巨,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開白布掐场。 她就那樣靜靜地躺著往扔,像睡著了一般。 火紅的嫁衣襯著肌膚如雪熊户。 梳的紋絲不亂的頭發(fā)上萍膛,一...
    開封第一講書人閱讀 51,146評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音嚷堡,去河邊找鬼蝗罗。 笑死,一個(gè)胖子當(dāng)著我的面吹牛蝌戒,可吹牛的內(nèi)容都是我干的串塑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼北苟,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼桩匪!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起友鼻,我...
    開封第一講書人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤傻昙,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后桃移,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屋匕,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年借杰,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了过吻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蔗衡,死狀恐怖纤虽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情绞惦,我是刑警寧澤逼纸,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站济蝉,受9級(jí)特大地震影響杰刽,放射性物質(zhì)發(fā)生泄漏菠发。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一贺嫂、第九天 我趴在偏房一處隱蔽的房頂上張望滓鸠。 院中可真熱鬧,春花似錦第喳、人聲如沸糜俗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)悠抹。三九已至,卻和暖如春扩淀,著一層夾襖步出監(jiān)牢的瞬間楔敌,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工驻谆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留梁丘,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓旺韭,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親掏觉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子区端,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容