鏈接
和Spark類似呜达,Spark Streaming通過Maven Central提供。為編寫Spark Streaming程序拍摇,需要添加下面的依賴到你的SBT或者M(jìn)aven工程中递递。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.1"
從Kafka,F(xiàn)lume和Kinesis獲取數(shù)據(jù)不在Spark Streaming的核心API中返吻,需要添加相應(yīng)的artifact spark-streaming-xyz_2.11依賴。例如乎婿,常用的依賴如下:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 |
更新列表請參見Maven repository。
初始化StreamingContext
初始化Spark Streaming程序街佑,必須創(chuàng)建StreamingContext對象谢翎,作為Spark Streaming程序的主入口點。
從SparkConf對象創(chuàng)建StreamingContext對象沐旨。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName
參數(shù)是應(yīng)用程序名稱森逮,顯示在集群UI上。master
是一個Spark, Mesos或YARN集群URL磁携,或者一個特殊的"local[]"字符串褒侧,以本地模式運行。實際上,當(dāng)運行在集群上時闷供,是不希望在程序中硬編碼master
的烟央,而是使用spark-submit啟動應(yīng)用程序
接收master
參數(shù)。不過為了本地測試和單元測試歪脏,可以傳"local[]"來運行Spark Streaming(在本地系統(tǒng)檢測CPU核數(shù))疑俭。注意,這里內(nèi)部創(chuàng)建了一個SparkContext(Spark程序的起始點)婿失,可以通過ssc.sparkContext
訪問钞艇。
批時間間隔必須根據(jù)應(yīng)用程序的延遲需求和集群可用資源來設(shè)置。具體參見性能調(diào)優(yōu)豪硅。
StreamingContext
對象也可以通過已經(jīng)存在的SparkContext
對象創(chuàng)建哩照。
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
上下文定義完成后,必須做以下事情懒浮。
- 通過創(chuàng)建輸入DStream定義輸入源葡秒。
- 通過在DStream上應(yīng)用轉(zhuǎn)換和輸出操作定義流計算。
- 開始接收數(shù)據(jù)并使用
streamingContext.start()
進(jìn)行處理嵌溢。 - 使用
streamingContext.awaitTermination()
等待處理結(jié)束(手動或者因為錯誤結(jié)束)眯牧。 - 可使用
streamingContext.stop()
手動停用處理過程。
需要記住的幾點:
- 一旦上下文啟動赖草,就不能像其添加新的流計算了学少。
- 一旦上下文停用,不能重啟秧骑。
- 同一時間再JVM中只能有一個StreamingContext處于活躍狀態(tài)版确。
- StreamingContext的stop()方法也會停用SparkContext。只想停用StreamingContext乎折,設(shè)置
stop()
方法的可選參數(shù)stopSparkContext
為false绒疗。 - SparkContext可以進(jìn)行重用,創(chuàng)建多個StreamingContext骂澄,只要在創(chuàng)建下一個StreamingContext之前停用了前一個StreamingContext(但是沒有停用SparkContext)即可吓蘑。
離散流(DStreams)
DStream是Spark Streaming提供的基本抽象。代表一個連續(xù)的數(shù)據(jù)流坟冲,要么是從源接收的輸入數(shù)據(jù)流磨镶,要么是轉(zhuǎn)換輸入流生成的處理過的數(shù)據(jù)流。在Spark內(nèi)部健提,DStream代表一系列連續(xù)的RDD(不可變的分布式數(shù)據(jù)集琳猫,參見Spark編程指南(二)。DStream中的每個RDD包含一定時間間隔的數(shù)據(jù)私痹,顯示如下圖脐嫂。
應(yīng)用在DStream的任何操作都會轉(zhuǎn)換為潛在RDD的操作统刮。例如,在前面例子中將每行的數(shù)據(jù)流轉(zhuǎn)換為單詞账千,
flatMap
操作應(yīng)用在lines
DStream中的每個RDD上侥蒙,生成words
DStream的RDD。顯示如下圖蕊爵。
這些潛在的RDD轉(zhuǎn)換由Spark engine進(jìn)行計算辉哥。DStream操作會隱藏大部分這些細(xì)節(jié),然后提供給開發(fā)者一個高層API方便使用攒射。這些操作會在之后進(jìn)行詳細(xì)介紹醋旦。
輸入DStreams和Receivers
輸入DStream代表從源接收的輸入數(shù)據(jù)流。在前面快速示例中会放,lines
就是輸入DStream饲齐,它代表了從netcat服務(wù)器接收的數(shù)據(jù)流。每個輸入DStream(除了文件流咧最,會在下面進(jìn)行討論)都會關(guān)聯(lián)一個Receiver(Scala doc, Java doc)對象捂人,這個對象接收來自源的數(shù)據(jù)并將數(shù)據(jù)存儲在Spark的內(nèi)存中供后續(xù)處理。
Spark Streaming提供兩類內(nèi)置的源矢沿。
- Basic sources:在StreamingContext API中直接可用滥搭。例如:文件系統(tǒng)和socket連接。
- Advanced sources:如Kafka捣鲸,F(xiàn)lume瑟匆,Kinesis等。通過額外的工具類可用栽惶。這些需要鏈接到額外的依賴項愁溜,這個之前已經(jīng)討論過。
接下來討論每個類別中的一些源外厂。
注意冕象,如果你想在streaming應(yīng)用程序中并行接收多個數(shù)據(jù)流,可以創(chuàng)建多個輸入DStream(在之后性能調(diào)優(yōu)中進(jìn)行討論)汁蝶。這樣會創(chuàng)建多個receiver同時接收多個數(shù)據(jù)流渐扮。但是需要注意的是Spark worker/executor是長線任務(wù),它會占用分配給Spark Streaming應(yīng)用程序的一個CPU核穿仪。因此席爽,要記住Spark Streaming應(yīng)用程序需要分配足夠的CPU核(或線程,如果在本地運行)來處理接收到的數(shù)據(jù)以及運行receiver啊片。
需要記住的幾點
- 當(dāng)本地運行Spark Streaming程序時,不要使用"local"或者"local[1]"作為master URL玖像。這意味著本地只有一個線程用來運行任務(wù)紫谷。如果你使用基于receiver的輸入DStream(例如sockets胰坟,Kafka击吱,F(xiàn)lume等等),單線程會用于運行receiver,沒有線程會用于處理接收到的數(shù)據(jù)盟戏。因此,當(dāng)本地運行時但骨,要使用"local[n]"作為master URL恋技,其中n大于receiver的數(shù)量(可參考Spark Properties了解如何設(shè)置master)。
- 將邏輯擴展到集群上運行崇裁,分配給Spark Streaming應(yīng)用程序的核數(shù)必須比receiver的數(shù)量多匕坯。否則系統(tǒng)會接收數(shù)據(jù),但不能處理數(shù)據(jù)拔稳。
Basic Sources
在前面的示例中我們已經(jīng)看到過ssc.socketTextStream(...)
葛峻,從TCP socket連接接收文本數(shù)據(jù)并創(chuàng)建一個DStream。處理sockets巴比,StreamingContext API還提供了創(chuàng)建以文件作為源的DStream术奖。
- File Streams:從任意兼容HDFS API的文件系統(tǒng)(如HDFS, S3, NFS等等)的文件中讀取數(shù)據(jù),可以用如下方式創(chuàng)建DStream:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming會監(jiān)控目錄dataDirectory
轻绞,處理任意在那個目錄中創(chuàng)建的文件(不支持寫入嵌套目錄中的文件)采记。請注意:
- 文件必須是相同數(shù)據(jù)格式。
- 文件必須在
dataDirectory
中創(chuàng)建政勃,通過原子級地移動或重命名文件到數(shù)據(jù)目錄的方式唧龄。 - 一旦移動,文件必須不能修改稼病。如果文件正在不斷地進(jìn)行追加选侨,那么新數(shù)據(jù)不會被讀取。
對于簡單文本文件然走,有一個更簡單的方法streamingContext.textFileStream(dataDirectory)
援制。文件流不會要求運行receiver,因此不需要分配CPU核芍瑞。
基于自定義Receiver的流:可以通過自定義receiver接收數(shù)據(jù)流來創(chuàng)建DStream晨仑。詳細(xì)請參見Custom Receiver Guide。
RDD隊列作為流:對于使用測試數(shù)據(jù)測試一個Spark Streaming應(yīng)用程序來說拆檬,可以基于RDD隊列來創(chuàng)建DStream洪己,使用
streamingContext.queueStream(queueOfRDDs)
。添加到隊列中的每個RDD都會被當(dāng)做DStream中一個批次的數(shù)據(jù)竟贯,像數(shù)據(jù)流一樣進(jìn)行處理答捕。
對于來自socket和文件數(shù)據(jù)流的更多細(xì)節(jié)請參見相關(guān)的API文檔,scala請參見StreamingContext屑那,Java請參見JavaStreamingContext拱镐,Python請參見StreamingContext艘款。
Advanced Sources
這個分類的源要和外部非Spark庫交互,其中一些需要復(fù)雜的依賴(如Kafka和Flume)沃琅。因此哗咆,為了盡量減少和依賴庫版本沖突的問題,從這些源創(chuàng)建DStream的功能移到了單獨的庫中益眉,必要時可以顯式鏈接晌柬。
注意這些高級源在Spark shell中不可用,因此基于高級源的應(yīng)用程序不能在shell中測試郭脂。如果想要在Spark shell中使用高級源年碘,需要在下載對應(yīng)的Maven artifact’s JAR以及其依賴的庫,然后添加到classpath中朱庆。
一些高級源如下:
- Kafka:Spark Streaming 2.1.1和Kafka broker 0.8.2.1及更高版本兼容盛泡。具體參見Kafka Integration Guide。
- Flume:Spark Streaming 2.1.1和Flume 1.6.0版本兼容娱颊。具體參見Flume Integration Guide傲诵。
- Kinesis:Spark Streaming 2.1.1和Kinesis Client Library 1.2.1兼容。具體參見Kinesis Integration Guide箱硕。
Custom Sources
輸入DStream也可以通過自定義數(shù)據(jù)源創(chuàng)建拴竹。你需要實現(xiàn)用戶定義的receiver(下面會進(jìn)行說明)用于接收自定義源的數(shù)據(jù)并將數(shù)據(jù)存儲到Spark中。具體參見Custom Receiver Guide剧罩。
Receiver可靠性
有兩類基于可靠性的數(shù)據(jù)源栓拜。數(shù)據(jù)源(如Kafka和Flume)允許確認(rèn)傳輸?shù)臄?shù)據(jù)。如果系統(tǒng)從這些能確認(rèn)數(shù)據(jù)正確接收的可靠源接收數(shù)據(jù)惠昔,則可以確保不會有數(shù)據(jù)因為任何類型的失敗而丟失數(shù)據(jù)幕与。下面是兩類receiver:
- 可靠Receiver - 當(dāng)數(shù)據(jù)已經(jīng)接收并且按照副本要求存儲到Spark中時,可靠receiver會正確發(fā)送確認(rèn)信息給可靠數(shù)據(jù)源镇防。
- 不可靠Receiver - 不可靠數(shù)據(jù)源不能發(fā)送確認(rèn)信息給數(shù)據(jù)源啦鸣。這類receiver用于不支持確認(rèn)的數(shù)據(jù)源,或者對于可靠數(shù)據(jù)源来氧,不想或不需要進(jìn)行確認(rèn)诫给。
如何編寫可靠receiver在中Custom Receiver Guide進(jìn)行討論。