目錄介紹
StreamingContext:核心概念1
官方文檔:帶領(lǐng)著進(jìn)入Spark Streaming的開(kāi)發(fā)
http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext
Discretized(離散化) Streams(DStreams):核心概念2
對(duì)DStream的計(jì)算操作葱蝗,底層其實(shí)就是其中包含的RDD進(jìn)行操作比如做map/flatMap
Input DStreams and Receivers:核心概念3
Input DStreams是一種來(lái)自于網(wǎng)絡(luò)服務(wù)器的源源不斷的數(shù)據(jù)流芹橡,每一個(gè)Input DStreams都會(huì)被Receivers解析成一個(gè)對(duì)象赂蠢,并存儲(chǔ)在Spark的內(nèi)存中誓竿,但是除了文件流以外纳猫。
Transformations on DStreams:核心概念4
Output Operations on DStreams:核心概念5
Spark Streaming處理socket數(shù)據(jù)-案例
首先把java對(duì)應(yīng)的spark streaming的java版的jar包配置到項(xiàng)目的pom文件中
地址:https://search.maven.org/artifact/org.apache.spark/spark-sql_2.10/2.1.1/jar
linux往某個(gè)端口輸入數(shù)據(jù):
nc -lk 端口號(hào)
打包我們的java類(lèi)
kafka-test-1.0-SNAPSHOT.jar倔丈,并放在jar文件夾下/home/hadoop/waimai/selfjars/
提交到y(tǒng)arn上運(yùn)行
./spark-submit --class com.lppz.sparkstreaming.NetworkWordCount --master yarn /home/hadoop/waimai/selfjars/kafka-test-1.0-SNAPSHOT.jar 10.101.3.3 9999
Spark Streaming處理文件系統(tǒng)數(shù)據(jù)
文件系統(tǒng)方式讀取數(shù)的注意事項(xiàng):
文件夾可以是本地服務(wù)器文件夾,也可以是任何文件系統(tǒng)地址如:hdfs://namenode:8040/logs/状蜗,S3需五,NFS,ETC等
1轧坎、textFileStream中的文件夾會(huì)被Streaming監(jiān)控起來(lái)宏邮,并且會(huì)被處理。
2、被監(jiān)控的文件夾下的所有文件必須是同一中格式蜜氨。
3械筛、一旦文件被處理之后,即使文件有所變化飒炎,也不會(huì)被再次處理埋哟。也就是說(shuō)僅僅會(huì)被處理一次。
4郎汪、文件夾里的文件越多赤赊,所需要耗費(fèi)的掃描時(shí)間越多,即使文件沒(méi)有被更改過(guò)煞赢。
Spark Streaming帶狀態(tài)的算子:UpdateStateByKey
實(shí)戰(zhàn):
1砍鸠、計(jì)算到目前為止累計(jì)出現(xiàn)的單詞個(gè)數(shù)寫(xiě)入到MySql中
把結(jié)果寫(xiě)入mysql,我們 要用到DStream的output的操作
我自己實(shí)現(xiàn)的一個(gè)簡(jiǎn)陋的樣例耕驰,僅供參考
2、基于Window的統(tǒng)計(jì)
3朦肘、黑名單過(guò)濾
實(shí)現(xiàn)要求:
訪問(wèn)日志==》DStream
20190610饭弓、zhangsan??
20190610、lisi
20190610媒抠、wangwu
黑名單列表==》RDD
lisi弟断、wangwu
==》20190610、zhangsan??
其中l(wèi)isi和wangwu是黑名單趴生,需要過(guò)濾掉
只輸出20190610阀趴、zhangsan
那么我們?nèi)绾伟袲Stream與RDD的數(shù)據(jù)進(jìn)行關(guān)聯(lián)操作吶?
如下:
20190610苍匆、zhangsan??
20190610刘急、lisi
20190610、wangwu
===》轉(zhuǎn)化成(zhangsan:20190610,zhangsan)(lisi:20190610,lisi)(wangwu:20190610,wangwu)
黑名單列表lisi浸踩、wangwu
===》轉(zhuǎn)化成(lisi:true)(wangwu:true)
===>20190610叔汁、zhangsan??
使用leftjoin,就能得到如下列表
===》
(zhagnsan:<20190610,zhangsan>,<true>)
(lisi:<20190610,lisi>,<true>)
(wangwu:<20190610,wangwu>,<false>)
要想拿到20190610,wangwu,只需要===》tuple 1就可以了
接下來(lái)看下我們寫(xiě)的例子检碗。
我們需要用到Transform這個(gè)API