一、概述
Spark Streaming是對(duì)核心Spark API的一個(gè)擴(kuò)展河哑,它能夠?qū)崿F(xiàn)對(duì)實(shí)時(shí)數(shù)據(jù)流的流式處理,并具有很好的可擴(kuò)展性搭幻、高吞吐量和容錯(cuò)性姆吭。Spark Streaming支持從多種數(shù)據(jù)源提取數(shù)據(jù),如:Kafka痊末、Flume秒梅、Twitter、ZeroMQ舌胶、Kinesis以及TCP套接字捆蜀,并且可以提供一些高級(jí)API來(lái)表達(dá)復(fù)雜的處理算法,如:map、reduce辆它、join和window等誊薄。最后,Spark Streaming支持將處理完的數(shù)據(jù)推送到文件系統(tǒng)锰茉、數(shù)據(jù)庫(kù)或者實(shí)時(shí)儀表盤中展示呢蔫。實(shí)際上,你完全可以將Spark的機(jī)器學(xué)習(xí)(machine learning) 和 圖計(jì)算(graph processing)的算法應(yīng)用于Spark Streaming的數(shù)據(jù)流當(dāng)中飒筑。
二片吊、Spark Streaming基本原理
1)官方文檔對(duì)Spark Streaming的原理解讀
Spark Streaming從實(shí)時(shí)數(shù)據(jù)流接入數(shù)據(jù),再將其劃分為一個(gè)個(gè)小批量供后續(xù)Spark engine處理协屡,所以實(shí)際上俏脊,Spark Streaming是按一個(gè)個(gè)小批量來(lái)處理數(shù)據(jù)流的。下圖展示了Spark Streaming的內(nèi)部工作原理:
Spark Streaming為這種持續(xù)的數(shù)據(jù)流提供了的一個(gè)高級(jí)抽象肤晓,即:<font color="red">discretized stream(離散數(shù)據(jù)流)或者叫DStream</font>爷贫。DStream既可以從輸入數(shù)據(jù)源創(chuàng)建得來(lái),如:Kafka补憾、Flume或者Kinesis漫萄,也可以從其他DStream經(jīng)一些算子操作得到。其實(shí)在內(nèi)部盈匾,一個(gè)DStream就是包含了一系列RDDs腾务。
2)框架執(zhí)行流程
下面將從更細(xì)粒度架構(gòu)角度看Spark Streaming的執(zhí)行原理,這里先回顧一下Spark框架執(zhí)行流程削饵。
Spark計(jì)算平臺(tái)有兩個(gè)重要角色窑睁,Driver和executor,不論是Standlone模式還是Yarn模式葵孤,都是Driver充當(dāng)Application的master角色担钮,負(fù)責(zé)任務(wù)執(zhí)行計(jì)劃生成和任務(wù)分發(fā)及調(diào)度;executor充當(dāng)worker角色尤仍,負(fù)責(zé)實(shí)際執(zhí)行任務(wù)的task箫津,<font color="red">計(jì)算的結(jié)果返回Driver</font>。
下圖是Driver和Ececutor的執(zhí)行流程宰啦。
Driver負(fù)責(zé)生成邏輯查詢計(jì)劃苏遥、物理查詢計(jì)劃和把任務(wù)派發(fā)給executor,executor接受任務(wù)后進(jìn)行處理赡模,離線計(jì)算也是按這個(gè)流程進(jìn)行田炭。
- DAGScheduler:負(fù)責(zé)將Task拆分成不同Stage的具有依賴關(guān)系(包含RDD的依賴關(guān)系)的多批任務(wù),然后提交給TaskScheduler進(jìn)行具體處理漓柑。
- TaskScheduler:負(fù)責(zé)實(shí)際每個(gè)具體Task的物理調(diào)度執(zhí)行教硫。
下面看Spark Streaming實(shí)時(shí)計(jì)算的執(zhí)行流程:
- 從整體上看叨吮,實(shí)時(shí)計(jì)算與離線計(jì)算一樣,主要組件是Driver和Executor的瞬矩。不同的是多了數(shù)據(jù)采集和數(shù)據(jù)按時(shí)間分片過(guò)程茶鉴,數(shù)據(jù)采集依賴外部數(shù)據(jù)源,這里用MessageQueue表示景用,數(shù)據(jù)分片則依靠?jī)?nèi)部一個(gè)時(shí)鐘Clock涵叮,按batch interval來(lái)定時(shí)對(duì)數(shù)據(jù)分片,然后把每一個(gè)batch interval內(nèi)的數(shù)據(jù)提交處理伞插。
- Executor從MessageQueue獲取數(shù)據(jù)并交給BlockManager管理割粮,然后把元數(shù)據(jù)信息BlockID返給driver的Receiver Tracker,driver端的Job Jenerator對(duì)一個(gè)batch的數(shù)據(jù)生成JobSet媚污,最后把作業(yè)執(zhí)行計(jì)劃傳遞給executor處理舀瓢。
三、Spark Streaming核心API
SparkStreaming完整的API包括StreamingContext杠步、DStream輸入、DStream上的各種操作和動(dòng)作榜轿、DStream輸出幽歼、窗口操作等。
1)StreamingContext
為了初始化Spark Streaming程序谬盐,必須創(chuàng)建一個(gè)StreamingContext對(duì)象甸私,該對(duì)象是Spark Streaming所有流操作的主要入口。一個(gè)StreamingContext對(duì)象可以用SparkConf對(duì)象創(chuàng)建:
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
2)DStream輸入
DStream輸入表示從數(shù)據(jù)源獲取的原始數(shù)據(jù)流飞傀。每個(gè)輸入流DStream和一個(gè)接收器(receiver)對(duì)象相關(guān)聯(lián)皇型,這個(gè)Receiver從源中獲取數(shù)據(jù),并將數(shù)據(jù)存入內(nèi)存中用于處理砸烦。
Spark Streaming有兩類數(shù)據(jù)源:
- 基本源(basic source):在StreamingContext API中直接可用的源頭弃鸦,例如文件系統(tǒng)、套接字連接幢痘、Akka的actor等唬格。
- 高級(jí)源(advanced source):包括 Kafka、Flume颜说、Kinesis购岗、Tiwtter等,他們需要通過(guò)額外的類來(lái)使用门粪。
3)DStream的轉(zhuǎn)換
和RDD類似喊积,transformation用來(lái)對(duì)輸入DStreams的數(shù)據(jù)進(jìn)行轉(zhuǎn)換、修改等各種操作玄妈,當(dāng)然乾吻,DStream也支持很多在Spark RDD的transformation算子髓梅。
轉(zhuǎn)換操作(transformation) | 含義(Meaning) |
---|---|
map(func) | 利用函數(shù)func處理原DStream的每個(gè)元素,返回一個(gè)新的DStream. |
flatMap(func) | 與map相似溶弟,但是每個(gè)輸入項(xiàng)可用被映射0個(gè)或多個(gè)輸出項(xiàng) |
filter(func) | 返回一個(gè)新的DStream,它僅包含源DStream中滿足函數(shù)func的項(xiàng) |
repartition(numPartitions) | 通過(guò)創(chuàng)建更多或更少的的partition改變這個(gè)DStream的并行級(jí)別(level ofparallelism) |
union(otherStream) | 返回一個(gè)新的DStream,它包含源DStream和otherStream的聯(lián)合元素 |
count() | 通過(guò)計(jì)算源DStream中每個(gè)RDD的元素?cái)?shù)量女淑,返回一個(gè)包含單元素RDD的新DStream |
reduce(func) | 利用函數(shù)func聚集源DStream中每個(gè)RDD的元素,返回一個(gè)包含單元素RDD的新的DStream辜御。函數(shù)應(yīng)該是相關(guān)聯(lián)的鸭你,以使計(jì)算可以并行化 |
countByValue() | 這個(gè)算子應(yīng)用于元素類型為K的DStream上,返回一個(gè)(Kjong)前的新DStreamo每個(gè)鍵的值是在原DStream的每個(gè)RDD的頻率 |
reduceByKey(func, [numTasks]) | 當(dāng)在一個(gè)由(K,V)對(duì)組成的DStream上調(diào)用這個(gè)算子擒权,返回一個(gè)新的由(K,V)對(duì)組成的DStream,每一個(gè)key的值均有給定的reduce函數(shù)聚集起來(lái)袱巨。注意:在默認(rèn)情況下,這個(gè)算子利用了 Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組碳抄∮淅希可以用numTasks參數(shù)設(shè)置不同的任務(wù)數(shù) |
join(otherStream, [numTasks]) | 當(dāng)應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)對(duì),一個(gè)包含(K,W)對(duì)剖效,返回一個(gè)包含(K,(V,W))對(duì)的新的 DStream |
cogroup(otherStream, [numTasks]) | 當(dāng)應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)對(duì)嫉入,一個(gè)包含(K,W)對(duì),返回一個(gè)包含(K,Seq[VJSeq[WN 的元組 |
transform(func) | 通過(guò)對(duì)源DStream的每個(gè)RDD應(yīng)用RDD-to-RDD函數(shù)璧尸,創(chuàng)建一個(gè)新的DStreamo這個(gè)可以在DStream中的任何RDD操作中使用 |
updateStateByKey(func) | 利用給定的函數(shù)更新DStream狀態(tài)咒林,返回一個(gè)新“state”的DStream |
4)DStream的輸出
和RDD類似,Spark Streaming允許將DStream轉(zhuǎn)換后的結(jié)果發(fā)送到數(shù)據(jù)庫(kù)爷光、文件系統(tǒng)等外部系統(tǒng)中垫竞。目前,定義了Spark Streaming的輸出操作:
轉(zhuǎn)換操作(transformation) | 含義(Meaning) |
---|---|
print() | 在運(yùn)行流應(yīng)用程序的驅(qū)動(dòng)程序節(jié)點(diǎn)上打印數(shù)據(jù)流中每批數(shù)據(jù)的前十個(gè)元素蛀序。這對(duì)于開(kāi)發(fā)和調(diào)試非常有用欢瞪。Python API在Python API中稱為pprint()。 |
saveAsTextFiles(prefix, [suffix]) | 將此數(shù)據(jù)流的內(nèi)容另存為文本文件徐裸。每個(gè)批處理間隔的文件名基于前綴和后綴生成:“prefix-TIME_IN_MS[.suffix]”遣鼓。 |
saveAsObjectFiles(prefix, [suffix]) | 將此數(shù)據(jù)流的內(nèi)容另存為序列化Java對(duì)象的SequenceFile。每個(gè)批處理間隔的文件名基于前綴和后綴生成:“prefix-TIME_IN_MS[.suffix]”重贺。Python API這在Python API中不可用譬正。 |
saveAsHadoopFiles(prefix, [suffix]) | 將此數(shù)據(jù)流的內(nèi)容另存為Hadoop文件。每個(gè)批處理間隔的文件名基于前綴和后綴生成:“prefix-TIME_IN_MS[.suffix]”檬姥。Python API這在Python API中不可用曾我。 |
foreachRDD(func) | 對(duì)從流生成的每個(gè)RDD應(yīng)用函數(shù)func的最通用的輸出運(yùn)算符。此函數(shù)應(yīng)將每個(gè)RDD中的數(shù)據(jù)推送到外部系統(tǒng)健民,例如將RDD保存到文件中抒巢,或通過(guò)網(wǎng)絡(luò)將其寫入數(shù)據(jù)庫(kù)。請(qǐng)注意秉犹,函數(shù)func是在運(yùn)行流應(yīng)用程序的驅(qū)動(dòng)程序進(jìn)程中執(zhí)行的蛉谜,其中通常包含RDD操作稚晚,這些操作將強(qiáng)制計(jì)算流RDD。 |
5)窗口操作
Spark Streaming 還提供窗口計(jì)算型诚,允許您在數(shù)據(jù)的滑動(dòng)窗口上應(yīng)用轉(zhuǎn)換客燕。下圖說(shuō)明了這個(gè)滑動(dòng)窗口:
如圖所示,每次窗口滑過(guò)一個(gè)源 DStream 時(shí)狰贯,落入窗口內(nèi)的源 RDD 被組合并操作以產(chǎn)生窗口化 DStream 的 RDD也搓。在這種特定情況下,該操作應(yīng)用于最后 3 個(gè)時(shí)間單位的數(shù)據(jù)涵紊,并滑動(dòng) 2 個(gè)時(shí)間單位傍妒。這說(shuō)明任何窗口操作都需要指定兩個(gè)參數(shù)。
- windowLength:窗口的持續(xù)時(shí)間(圖中 3)摸柄。
-
slideInterval :執(zhí)行窗口操作的間隔(圖中為 2)颤练。
一些常見(jiàn)的窗口操作如下。所有這些操作都采用上述兩個(gè)參數(shù) - windowLength和slideInterval驱负。
轉(zhuǎn)換操作(transformation) | 含義(Meaning) |
---|---|
window(windowLength, slideInterval) | 返回一個(gè)新的 DStream嗦玖,它是根據(jù)源 DStream 的窗口批次計(jì)算的。 |
countByWindow(windowLength, slideInterval) | 返回流中元素的滑動(dòng)窗口計(jì)數(shù)跃脊。 |
reduceByWindow(func, windowLength, slideInterval) | 返回一個(gè)新的單元素流宇挫,它是通過(guò)使用func在滑動(dòng)間隔內(nèi)聚合流中的元素而創(chuàng)建的。該函數(shù)應(yīng)該是關(guān)聯(lián)的和可交換的匾乓,以便它可以被正確地并行計(jì)算捞稿。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 當(dāng)在 (K, V) 對(duì)的 DStream 上調(diào)用時(shí)又谋,返回一個(gè)新的 (K, V) 對(duì) DStream拼缝,其中每個(gè)鍵的值使用給定的 reduce 函數(shù)func 在滑動(dòng)窗口中的批次上聚合。<font color="red">注意:默認(rèn)情況下彰亥,這使用 Spark 的默認(rèn)并行任務(wù)數(shù)(本地模式為 2咧七,在集群模式下,數(shù)量由 config 屬性決定spark.default.parallelism)進(jìn)行分組任斋。您可以傳遞一個(gè)可選 numTasks參數(shù)來(lái)設(shè)置不同數(shù)量的任務(wù)继阻。</font> |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | reduceByKeyAndWindow()其中每個(gè)窗口的減少值是使用前一個(gè)窗口的減少值遞增計(jì)算的。這是通過(guò)減少進(jìn)入滑動(dòng)窗口的新數(shù)據(jù)废酷,并“逆減少”離開(kāi)窗口的舊數(shù)據(jù)來(lái)完成的瘟檩。一個(gè)例子是在窗口滑動(dòng)時(shí)“添加”和“減去”鍵的計(jì)數(shù)。但是澈蟆,它只適用于“可逆歸約函數(shù)”墨辛,即那些具有相應(yīng)“逆歸約”函數(shù)(作為參數(shù)invFunc)的歸約函數(shù)。跟reduceByKeyAndWindow一樣趴俘,reduce 任務(wù)的數(shù)量可通過(guò)可選參數(shù)進(jìn)行配置睹簇。<font color="red">請(qǐng)注意奏赘,必須啟用檢查點(diǎn)才能使用此操作<font>。 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 當(dāng)在 (K, V) 對(duì)的 DStream 上調(diào)用時(shí)太惠,返回一個(gè)新的 (K, Long) 對(duì) DStream磨淌,其中每個(gè)鍵的值是其在滑動(dòng)窗口內(nèi)的頻率。與 in 一樣 reduceByKeyAndWindow凿渊,reduce 任務(wù)的數(shù)量可通過(guò)可選參數(shù)進(jìn)行配置梁只。 |
更多操作詳情,請(qǐng)參考官方文檔:https://spark.apache.org/docs/latest/streaming-programming-guide.html
四嗽元、Spark下一代實(shí)時(shí)計(jì)算框架Structured Streaming
1)簡(jiǎn)介
從Spark 2.0開(kāi)始敛纲,Spark Streaming引入了一套新的流計(jì)算編程模型:Structured Streaming,開(kāi)發(fā)這套API的主要?jiǎng)右蚴亲許park 2.0之后剂癌,以RDD為核心的API逐步升級(jí)到Dataset/DataFrame上淤翔,而另一方面,以RDD為基礎(chǔ)的編程模型對(duì)開(kāi)發(fā)人員的要求較高佩谷,需要有足夠的編程背景才能勝任Spark Streaming的編程工作旁壮,而新引入的Structured Streaming模型是把數(shù)據(jù)流當(dāng)作一個(gè)沒(méi)有邊界的數(shù)據(jù)表來(lái)對(duì)待,這樣開(kāi)發(fā)人員可以在流上使用Spark SQL進(jìn)行流處理谐檀,這大大降低了流計(jì)算的編程門檻抡谐。
下圖為Structure Streaming邏輯數(shù)據(jù)結(jié)構(gòu)圖:
這里以wordcount為例的計(jì)算過(guò)程如下圖:
圖中Time橫軸是時(shí)間軸,隨著時(shí)間桐猬,在1麦撵、2、3秒分別輸入數(shù)據(jù)溃肪,進(jìn)入wordcount算法計(jì)算聚合免胃,輸出結(jié)果。更對(duì)關(guān)于Structure Streaming可以參考官網(wǎng):https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
2) Spark streaming 和 Spark Structured Streaming的對(duì)比
對(duì)比項(xiàng) | Spark Streaming | Structured Streaming |
---|---|---|
流模型 | Spark Streaming是spark最初的流處理框架惫撰,使用了微批的形式來(lái)進(jìn)行流處理羔沙,微批終究是批。每一個(gè)批處理間隔的為一個(gè)批厨钻,也就是一個(gè)RDD扼雏,我們對(duì)RDD進(jìn)行操作就可以源源不斷的接收、處理數(shù)據(jù)夯膀。 | Spark 2.X出來(lái)的流框架诗充,采用了無(wú)界表的概念,流數(shù)據(jù)相當(dāng)于往一個(gè)表上連續(xù)追加行诱建,流上的每一條數(shù)據(jù)都類似于將一行新數(shù)據(jù)添加到表中蝴蜓。 |
操作對(duì)象 | Dtream編程接口是RDD | 使用 DataFrame、DataSet 的編程接口涂佃,處理數(shù)據(jù)時(shí)可以使用Spark SQL中提供的方法 |
時(shí)延 | 接收到數(shù)據(jù)時(shí)間窗口励翼,秒級(jí) | 實(shí)時(shí)處理數(shù)據(jù)蜈敢,毫秒級(jí) |
可靠性 | Checkpoint 機(jī)制 | Checkpoint 機(jī)制 |
Sink | 提供了 foreachRDD()方法,通過(guò)自己編程實(shí)現(xiàn)將每個(gè)批的數(shù)據(jù)寫出 | 提供了一些 sink(Console Sink汽抚、File Sink抓狭、Kafka Sink等),只要通過(guò)option配置就可以使用造烁;對(duì)于需要自定義的Sink,提供了ForeachWriter的編程接口否过,實(shí)現(xiàn)相關(guān)方法就可以完成 |
Spark Streaming
Spark Streaming是spark最初的流處理框架,使用了微批的形式來(lái)進(jìn)行流處理惭蟋。
提供了基于RDDs的Dstream API苗桂,每個(gè)時(shí)間間隔內(nèi)的數(shù)據(jù)為一個(gè)RDD,源源不斷對(duì)RDD進(jìn)行處理來(lái)實(shí)現(xiàn)流計(jì)算告组。
Spark Streaming采用微批的處理方法煤伟,微批終究是批。每一個(gè)批處理間隔的為一個(gè)批木缝,也就是一個(gè)RDD便锨,我們對(duì)RDD進(jìn)行操作就可以源源不斷的接收、處理數(shù)據(jù)我碟。
Spark Structured Streaming
Spark 2.X出來(lái)的流框架放案,采用了無(wú)界表的概念,流數(shù)據(jù)相當(dāng)于往一個(gè)表上不斷追加行矫俺。
基于Spark SQL引擎實(shí)現(xiàn)吱殉,可以使用大多數(shù)Spark SQL的function。
Structured Streaming將實(shí)時(shí)數(shù)據(jù)當(dāng)做被連續(xù)追加的表厘托。流上的每一條數(shù)據(jù)都類似于將一行新數(shù)據(jù)添加到表中友雳。
3)對(duì)比其它實(shí)時(shí)計(jì)算框架
為了展示結(jié)構(gòu)化流的獨(dú)特之處,下表將其與其他幾個(gè)系統(tǒng)進(jìn)行了比較催烘。正如我們所討論的鲫构,Structured Streaming 對(duì)前綴完整性的強(qiáng)大保證使其等同于批處理作業(yè)俯树,并且易于集成到更大的應(yīng)用程序中泞歉。此外顽聂,在 Spark 上構(gòu)建可以與批處理和交互式查詢集成砌函。
從延遲看:Storm和Flink原生支持流計(jì)算冀墨,對(duì)每條記錄處理第焰,毫秒級(jí)延遲峻堰,是真正的實(shí)時(shí)計(jì)算咽袜,對(duì)延遲要求較高的應(yīng)用建議選擇這兩種丸卷。Spark Streaming的延遲是秒級(jí)。Flink是目前最火的實(shí)時(shí)計(jì)算引擎询刹,也是公司用的最多的實(shí)時(shí)計(jì)算引擎谜嫉,出來(lái)的晚萎坷,但是發(fā)展迅猛。
從容錯(cuò)看 :Spark Streaming和Flink都支持最高的exactly-once容錯(cuò)級(jí)別沐兰,Storm會(huì)有記錄重復(fù)計(jì)算的可能
從吞吐量看 :Spark Streaming是小批處理哆档,故吞吐量會(huì)相對(duì)更大。
從成熟度看: Storm最成熟住闯,Spark其次瓜浸,F(xiàn)link處于仍處于發(fā)展中,這三個(gè)項(xiàng)目都有公司生產(chǎn)使用比原,但畢竟開(kāi)源項(xiàng)目插佛,項(xiàng)目越不成熟,往往越要求公司大數(shù)據(jù)平臺(tái)研發(fā)水平量窘。
從整合性看:Storm與SQL雇寇、機(jī)器學(xué)習(xí)和圖計(jì)算的結(jié)合復(fù)雜性最高;而Spark和Flink都有生態(tài)圈內(nèi)對(duì)應(yīng)的SQL蚌铜、機(jī)器學(xué)習(xí)和圖計(jì)算谢床,與這些項(xiàng)目結(jié)合更容易。
【參考資料】