本文內(nèi)容基于Spark最新版1.6.1
Spark 最初只有Spark Core槐瑞,通過逐步的發(fā)展悼沿,現(xiàn)在已擴展出Spark SQL、Spark Streaming、Spark MLlib(machine learning)、GraphX(graph)、Spark R等。 而Spark Streaming本是Spark Core上的一個子框架,如果我們試著去精通這個子框架蠕搜,不僅僅能寫出非常復(fù)雜的應(yīng)用程序,還能夠很好的駕馭Spark掉伏,進而研究并達到精通Spark的地步,及其尋找到Spark問題的解決之道箍镜。
我們?yōu)槭裁磸腟park Streaming切入研究Spark源碼的定制薪夕,因為Spark SQL涉及到很多SQL語法解析和優(yōu)化的細節(jié)埂淮,對于我們集中精力研究Spark有所干擾;Spark R還不是很成熟痪蝇,支持功能有限;GraphX最近幾個版本基本沒有改進耙册,里面有許多數(shù)學(xué)算法;MLlib也涉及到相當(dāng)多的數(shù)學(xué)知識饶辙。
Spark Streaming的優(yōu)勢是在于可以結(jié)合SparkSQL蹋宦、圖計算蒿辙、機器學(xué)習(xí),使其功能更加強大。同時在Spark中Spark Streaming也是最容易出現(xiàn)問題的,因為它是不斷的運行,內(nèi)部比較復(fù)雜唉擂。掌握好Spark Streaming空扎,可以去窺視Spark的一切撮慨!
Spark Streaming到底是什么规伐? Spark Streaming是一個流式計算框架,運行在Spark Core之上吵护。這是一個流處理的時代用爪,一切數(shù)據(jù)如果不是以流式來處理或者跟流式的處理不相關(guān)的話颇玷,都將是次數(shù)據(jù)谒亦,我們必將處在一個流的數(shù)據(jù)處理時代锁摔。Spark Streaming很像是基于Spark Core之上的一個應(yīng)用程序十气。不像其他子框架籍胯,比如機器學(xué)習(xí)是把數(shù)學(xué)算法直接應(yīng)用在Spark的RDD之上妖爷,Spark Streaming更像一般的應(yīng)用程序那樣,感知流進來的數(shù)據(jù)并進行相應(yīng)的處理挪圾。很像順其自然的一種感知操作棚赔,利用自己獨有的“神經(jīng)元”來對數(shù)據(jù)進行各類操作捆毫。
Spark Streaming的幾大優(yōu)點
- 對源源不斷流進來的數(shù)據(jù)濒憋,能夠迅速響應(yīng)并立即給出你所要是反饋信息
- Spark非常強大的地方在于它的流式處理可以在線的利用機器學(xué)習(xí)黔夭、圖計算婚惫、Spark SQL或者Spark R的成果,這得益于Spark多元化、一體化的基礎(chǔ)架構(gòu)設(shè)計餐济。也就是說篙悯,在Spark技術(shù)堆棧中矮燎,Spark Streaming可以調(diào)用任何的API接口峡谊,不需要做任何的設(shè)置贤壁。這是Spark無可匹敵之處名船,也是Spark Streaming必將一統(tǒng)天下的根源迷扇。
- 如何清晰的看到數(shù)據(jù)的流入厨内、被處理的過程? 使用一個小技巧瞭亮,通過調(diào)節(jié)放大Batch Interval的方式唆缴,來降低批處理次數(shù)趟紊,以方便看清楚各個環(huán)節(jié)铛嘱。
我們從已寫過的廣告點擊的在線黑名單過濾的Spark Streaming應(yīng)用程序入手纹磺,看一下是具體的實驗源碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Scala開發(fā)集群運行的Spark 在線黑名單過濾程序
*
* 背景描述:在廣告點擊計費系統(tǒng)中橄杨,我們在線過濾掉黑名單的點擊式矫,
進而保護廣告商的利益衷佃,只進行有效的廣告點擊計費
* 或者在防刷評分(或者流量)系統(tǒng)锄列,過濾掉無效的投票或者評分或者流量竣况;
* 實現(xiàn)技術(shù):使用transform Api直接基于RDD編程丹泉,進行join操作
*
*/
object OnlineBlackListFilter {
def main(args: Array[String]){
/**
* 第1步:創(chuàng)建Spark的配置對象SparkConf摹恨,設(shè)置Spark程序的運行時的配置信息晒哄,
* 例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置
* 為local寝凌,則代表Spark程序在本地運行较木,特別適合于機器配置條件非常差(例如
* 只有1G的內(nèi)存)的初學(xué)者 *
*/
val conf = new SparkConf() //創(chuàng)建SparkConf對象
conf.setAppName("OnlineBlackListFilter") //設(shè)置應(yīng)用程序的名稱预侯,在程序運行的監(jiān)控界面可以看到名稱
conf.setMaster("spark://Master:7077") //此時雌桑,程序在Spark集群
val ssc = new StreamingContext(conf, Seconds(30))
/**
* 黑名單數(shù)據(jù)準(zhǔn)備校坑,實際上黑名單一般都是動態(tài)的耍目,例如在Redis或者數(shù)據(jù)庫中,黑名單的生成往往有復(fù)雜的業(yè)務(wù)
* 邏輯毅访,具體情況算法不同,但是在Spark Streaming進行處理的時候每次都能工訪問完整的信息
*/
val blackList = Array(("hadoop", true),("mahout", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
val adsClickStream = ssc.socketTextStream("Master", 9999)
/**
* 此處模擬的廣告點擊的每條數(shù)據(jù)的格式為:time、name
* 此處map操作的結(jié)果是name查乒、(time棚亩,name)的格式
*/
val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
adsClickStreamFormatted.transform(userClickRDD => {
//通過leftOuterJoin操作既保留了左側(cè)用戶廣告點擊內(nèi)容的RDD的所有內(nèi)容蔑舞,又獲得了相應(yīng)點擊內(nèi)容是否在黑名單中
val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
/**
* 進行filter過濾的時候从撼,其輸入元素是一個Tuple:(name,((time,name), boolean))
* 其中第一個元素是黑名單的名稱掏婶,第二元素的第二個元素是進行l(wèi)eftOuterJoin的時候是否存在在值
* 如果存在的話雄妥,表面當(dāng)前廣告點擊是黑名單老厌,需要過濾掉枝秤,否則的話則是有效點擊內(nèi)容丹壕;
*/
val validClicked = joinedBlackListRDD.filter(joinedItem => {
if(joinedItem._2._2.getOrElse(false))
{
false
} else {
true
}
})
validClicked.map(validClick => {validClick._2._1})
}).print
/**
* 計算后的有效數(shù)據(jù)一般都會寫入Kafka中菌赖,下游的計費系統(tǒng)會從kafka中pull到有效數(shù)據(jù)進行計費
*/
ssc.start()
ssc.awaitTermination()
}
}
集群中需要先執(zhí)行nc,啟動 9999端口
nc -lk 9999
將代碼打包上傳到集群運行
-
我們運行完程序薄啥,看到過濾結(jié)果以后,停止程序到逊,打開HistoryServer http://master:18080/
-
點擊App ID進去觉壶,打開,會看到如下圖所示的4個Job争剿,從實際執(zhí)行的Job是1個Job蚕苇,但是圖中顯示有4個Job涩笤,從這里可以看出Spark Streaming運行的時候自己會啟動一些Job辆它。
先看看job id 為0 的詳細信息
很明顯是我們定義的blackListRDD數(shù)據(jù)的生成。對應(yīng)的代碼為
val blackList = Array((“Hadoop”, true), (“Mathou”, true))
//把Array變成RDD
val blackListRDD = ssc.sparkContext.parallelize(blackList)
并且它做了reduceBykey的操作(代碼中并沒有此步操作片吊,SparkStreaming框架自行生成的)俏脊。
這里有兩個Stage爷贫,Stage 0和Stage 1
Job 1的詳細信息
一個makeRDD,這個RDD是receiver不斷的接收數(shù)據(jù)流中的數(shù)據(jù)腾务,在時間間隔達到batchInterval后岩瘦,將所有數(shù)據(jù)變成一個RDD启昧。并且它的耗時也是最長的59s
-
此處可以看出,receiver也是一個獨立的job苏遥。由此我們可以得出一個結(jié)論:我們在應(yīng)用程序中田炭,可以啟動多個job教硫,并且不用的job之間可以相互配合瞬矩,這就為我們編寫復(fù)雜的應(yīng)用程序打下了基礎(chǔ)涵叮。
我們點擊上面的start at OnlineBlackListFilter.scala:64查看詳細信息
- 根據(jù)上圖的信息割粮,只有一個Executor在接收數(shù)據(jù),最最重要的是紅色框中的數(shù)據(jù)本地性為PROCESS_LOCAL,由此可以知道receiver接收到數(shù)據(jù)后會保存到內(nèi)存中商架,只要內(nèi)存充足是不會寫到磁盤中的飞傀。
即便在創(chuàng)建receiver時,指定的存儲默認(rèn)策略為
MEMORY_AND_DISK_SER_2
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope(“socket text stream”) {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
-
job 2的詳細信息
Job 2 將前兩個job生成的RDD進行l(wèi)eftOuterJoin操作汰聋。
從Stage Id的編號就可以看出乾吻,它是依賴于上兩個Job的。
Receiver接收數(shù)據(jù)時是在spark-master節(jié)點上,但是Job 2在處理數(shù)據(jù)時袱巨,數(shù)據(jù)已經(jīng)到了spark-worker1上了(因為我的環(huán)境只有兩個worker嫉入,數(shù)據(jù)并沒有分散到所有worker節(jié)點爷光,worker節(jié)點如果多一點欢瞪,情況可能不一樣重贺,每個節(jié)點都會處理數(shù)據(jù))
點擊上面的Stage Id 3查看詳細信息:
Executor上運行,并且有5個Task 。
Job 3的詳細信息
-
總結(jié):我們可以看出型诚,一個batchInterval并不是僅僅觸發(fā)一個Job。
根據(jù)上面的描述,我們更細致的了解了DStream和RDD的關(guān)系了。DStream就是一個個batchInterval時間內(nèi)的RDD組成的嗦玖。只不過DStream帶上了時間維度酪术,是一個無邊界的集合。
以上的連續(xù)4個圖继阻,分別對應(yīng)以下4個段落的描述:
Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各種來源的實時輸入數(shù)據(jù),進行處理后,處理結(jié)果保存在HDFS、Databases等各種地方搪锣。
Spark Streaming接收這些實時輸入數(shù)據(jù)流堵幽,會將它們按批次劃分,然后交給Spark引擎處理溃肪,生成按照批次劃分的結(jié)果流厨钻。
Spark Streaming提供了表示連續(xù)數(shù)據(jù)流的苍蔬、高度抽象的被稱為離散流的DStream。DStream本質(zhì)上表示RDD的序列。任何對DStream的操作都會轉(zhuǎn)變?yōu)閷Φ讓覴DD的操作抓狭。
Spark Streaming使用數(shù)據(jù)源產(chǎn)生的數(shù)據(jù)流創(chuàng)建DStream苗桂,也可以在已有的DStream上使用一些操作來創(chuàng)建新的DStream木缝。
在我們前面的實驗中,每300秒會接收一批數(shù)據(jù)吱殉,基于這批數(shù)據(jù)會生成RDD友雳,進而觸發(fā)Job押赊,執(zhí)行處理流礁。
DStream是一個沒有邊界的集合,沒有大小的限制。
DStream代表了時空的概念。隨著時間的推移谜嫉,里面不斷產(chǎn)生RDD沐兰。
鎖定到時間片后住闯,就是空間的操作,也就是對本時間片的對應(yīng)批次的數(shù)據(jù)的處理插佛。
下面用實例來講解數(shù)據(jù)處理過程雇寇。
從Spark Streaming程序轉(zhuǎn)換為Spark執(zhí)行的作業(yè)的過程中锨侯,使用了DStreamGraph囚痴。
Spark Streaming程序中一般會有若干個對DStream的操作渡讼。DStreamGraph就是由這些操作的依賴關(guān)系構(gòu)成成箫。
對DStream的操作會構(gòu)建成DStream Graph
從每個foreach開始蹬昌,都會進行回溯皂贩。從后往前回溯這些操作之間的依賴關(guān)系,也就形成了DStreamGraph婴栽。
在每到batchInterval時間間隔后愚争,Job被觸發(fā)轰枝,DStream Graph將會被轉(zhuǎn)換成RDD Graph
空間維度確定之后,隨著時間不斷推進,會不斷實例化RDD Graph,然后觸發(fā)Job去執(zhí)行處理砾脑。