[譯]Spark Streaming編程指南(二)

鏈接

和Spark類似呜达,Spark Streaming通過Maven Central提供。為編寫Spark Streaming程序拍摇,需要添加下面的依賴到你的SBT或者M(jìn)aven工程中递递。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.1"

從Kafka,F(xiàn)lume和Kinesis獲取數(shù)據(jù)不在Spark Streaming的核心API中返吻,需要添加相應(yīng)的artifact spark-streaming-xyz_2.11依賴。例如乎婿,常用的依賴如下:

Source Artifact
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11

更新列表請參見Maven repository

初始化StreamingContext

初始化Spark Streaming程序街佑,必須創(chuàng)建StreamingContext對象谢翎,作為Spark Streaming程序的主入口點。

SparkConf對象創(chuàng)建StreamingContext對象沐旨。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName參數(shù)是應(yīng)用程序名稱森逮,顯示在集群UI上。master是一個Spark, Mesos或YARN集群URL磁携,或者一個特殊的"local[]"字符串褒侧,以本地模式運行。實際上,當(dāng)運行在集群上時闷供,是不希望在程序中硬編碼master的烟央,而是使用spark-submit啟動應(yīng)用程序
接收master參數(shù)。不過為了本地測試和單元測試歪脏,可以傳"local[
]"來運行Spark Streaming(在本地系統(tǒng)檢測CPU核數(shù))疑俭。注意,這里內(nèi)部創(chuàng)建了一個SparkContext(Spark程序的起始點)婿失,可以通過ssc.sparkContext訪問钞艇。

批時間間隔必須根據(jù)應(yīng)用程序的延遲需求和集群可用資源來設(shè)置。具體參見性能調(diào)優(yōu)豪硅。

StreamingContext對象也可以通過已經(jīng)存在的SparkContext對象創(chuàng)建哩照。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

上下文定義完成后,必須做以下事情懒浮。

  1. 通過創(chuàng)建輸入DStream定義輸入源葡秒。
  2. 通過在DStream上應(yīng)用轉(zhuǎn)換和輸出操作定義流計算。
  3. 開始接收數(shù)據(jù)并使用streamingContext.start()進(jìn)行處理嵌溢。
  4. 使用streamingContext.awaitTermination()等待處理結(jié)束(手動或者因為錯誤結(jié)束)眯牧。
  5. 可使用streamingContext.stop()手動停用處理過程。

需要記住的幾點:

  • 一旦上下文啟動赖草,就不能像其添加新的流計算了学少。
  • 一旦上下文停用,不能重啟秧骑。
  • 同一時間再JVM中只能有一個StreamingContext處于活躍狀態(tài)版确。
  • StreamingContext的stop()方法也會停用SparkContext。只想停用StreamingContext乎折,設(shè)置stop()方法的可選參數(shù)stopSparkContext為false绒疗。
  • SparkContext可以進(jìn)行重用,創(chuàng)建多個StreamingContext骂澄,只要在創(chuàng)建下一個StreamingContext之前停用了前一個StreamingContext(但是沒有停用SparkContext)即可吓蘑。

離散流(DStreams)

DStream是Spark Streaming提供的基本抽象。代表一個連續(xù)的數(shù)據(jù)流坟冲,要么是從源接收的輸入數(shù)據(jù)流磨镶,要么是轉(zhuǎn)換輸入流生成的處理過的數(shù)據(jù)流。在Spark內(nèi)部健提,DStream代表一系列連續(xù)的RDD(不可變的分布式數(shù)據(jù)集琳猫,參見Spark編程指南(二)。DStream中的每個RDD包含一定時間間隔的數(shù)據(jù)私痹,顯示如下圖脐嫂。

image.png

應(yīng)用在DStream的任何操作都會轉(zhuǎn)換為潛在RDD的操作统刮。例如,在前面例子中將每行的數(shù)據(jù)流轉(zhuǎn)換為單詞账千,flatMap操作應(yīng)用在lines DStream中的每個RDD上侥蒙,生成words DStream的RDD。顯示如下圖蕊爵。

image.png

這些潛在的RDD轉(zhuǎn)換由Spark engine進(jìn)行計算辉哥。DStream操作會隱藏大部分這些細(xì)節(jié),然后提供給開發(fā)者一個高層API方便使用攒射。這些操作會在之后進(jìn)行詳細(xì)介紹醋旦。

輸入DStreams和Receivers

輸入DStream代表從源接收的輸入數(shù)據(jù)流。在前面快速示例中会放,lines就是輸入DStream饲齐,它代表了從netcat服務(wù)器接收的數(shù)據(jù)流。每個輸入DStream(除了文件流咧最,會在下面進(jìn)行討論)都會關(guān)聯(lián)一個ReceiverScala doc, Java doc)對象捂人,這個對象接收來自源的數(shù)據(jù)并將數(shù)據(jù)存儲在Spark的內(nèi)存中供后續(xù)處理。

Spark Streaming提供兩類內(nèi)置的源矢沿。

  • Basic sources:在StreamingContext API中直接可用滥搭。例如:文件系統(tǒng)和socket連接。
  • Advanced sources:如Kafka捣鲸,F(xiàn)lume瑟匆,Kinesis等。通過額外的工具類可用栽惶。這些需要鏈接到額外的依賴項愁溜,這個之前已經(jīng)討論過。

接下來討論每個類別中的一些源外厂。

注意冕象,如果你想在streaming應(yīng)用程序中并行接收多個數(shù)據(jù)流,可以創(chuàng)建多個輸入DStream(在之后性能調(diào)優(yōu)中進(jìn)行討論)汁蝶。這樣會創(chuàng)建多個receiver同時接收多個數(shù)據(jù)流渐扮。但是需要注意的是Spark worker/executor是長線任務(wù),它會占用分配給Spark Streaming應(yīng)用程序的一個CPU核穿仪。因此席爽,要記住Spark Streaming應(yīng)用程序需要分配足夠的CPU核(或線程,如果在本地運行)來處理接收到的數(shù)據(jù)以及運行receiver啊片。

需要記住的幾點

  • 當(dāng)本地運行Spark Streaming程序時,不要使用"local"或者"local[1]"作為master URL玖像。這意味著本地只有一個線程用來運行任務(wù)紫谷。如果你使用基于receiver的輸入DStream(例如sockets胰坟,Kafka击吱,F(xiàn)lume等等),單線程會用于運行receiver,沒有線程會用于處理接收到的數(shù)據(jù)盟戏。因此,當(dāng)本地運行時但骨,要使用"local[n]"作為master URL恋技,其中n大于receiver的數(shù)量(可參考Spark Properties了解如何設(shè)置master)。
  • 將邏輯擴展到集群上運行崇裁,分配給Spark Streaming應(yīng)用程序的核數(shù)必須比receiver的數(shù)量多匕坯。否則系統(tǒng)會接收數(shù)據(jù),但不能處理數(shù)據(jù)拔稳。

Basic Sources
在前面的示例中我們已經(jīng)看到過ssc.socketTextStream(...)葛峻,從TCP socket連接接收文本數(shù)據(jù)并創(chuàng)建一個DStream。處理sockets巴比,StreamingContext API還提供了創(chuàng)建以文件作為源的DStream术奖。

  • File Streams:從任意兼容HDFS API的文件系統(tǒng)(如HDFS, S3, NFS等等)的文件中讀取數(shù)據(jù),可以用如下方式創(chuàng)建DStream:
 streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming會監(jiān)控目錄dataDirectory轻绞,處理任意在那個目錄中創(chuàng)建的文件(不支持寫入嵌套目錄中的文件)采记。請注意:

  • 文件必須是相同數(shù)據(jù)格式。
  • 文件必須在dataDirectory中創(chuàng)建政勃,通過原子級地移動或重命名文件到數(shù)據(jù)目錄的方式唧龄。
  • 一旦移動,文件必須不能修改稼病。如果文件正在不斷地進(jìn)行追加选侨,那么新數(shù)據(jù)不會被讀取。

對于簡單文本文件然走,有一個更簡單的方法streamingContext.textFileStream(dataDirectory)援制。文件流不會要求運行receiver,因此不需要分配CPU核芍瑞。

  • 基于自定義Receiver的流:可以通過自定義receiver接收數(shù)據(jù)流來創(chuàng)建DStream晨仑。詳細(xì)請參見Custom Receiver Guide

  • RDD隊列作為流:對于使用測試數(shù)據(jù)測試一個Spark Streaming應(yīng)用程序來說拆檬,可以基于RDD隊列來創(chuàng)建DStream洪己,使用streamingContext.queueStream(queueOfRDDs)。添加到隊列中的每個RDD都會被當(dāng)做DStream中一個批次的數(shù)據(jù)竟贯,像數(shù)據(jù)流一樣進(jìn)行處理答捕。

對于來自socket和文件數(shù)據(jù)流的更多細(xì)節(jié)請參見相關(guān)的API文檔,scala請參見StreamingContext屑那,Java請參見JavaStreamingContext拱镐,Python請參見StreamingContext艘款。

Advanced Sources
這個分類的源要和外部非Spark庫交互,其中一些需要復(fù)雜的依賴(如Kafka和Flume)沃琅。因此哗咆,為了盡量減少和依賴庫版本沖突的問題,從這些源創(chuàng)建DStream的功能移到了單獨的庫中益眉,必要時可以顯式鏈接晌柬。

注意這些高級源在Spark shell中不可用,因此基于高級源的應(yīng)用程序不能在shell中測試郭脂。如果想要在Spark shell中使用高級源年碘,需要在下載對應(yīng)的Maven artifact’s JAR以及其依賴的庫,然后添加到classpath中朱庆。

一些高級源如下:

Custom Sources
輸入DStream也可以通過自定義數(shù)據(jù)源創(chuàng)建拴竹。你需要實現(xiàn)用戶定義的receiver(下面會進(jìn)行說明)用于接收自定義源的數(shù)據(jù)并將數(shù)據(jù)存儲到Spark中。具體參見Custom Receiver Guide剧罩。

Receiver可靠性
有兩類基于可靠性的數(shù)據(jù)源栓拜。數(shù)據(jù)源(如Kafka和Flume)允許確認(rèn)傳輸?shù)臄?shù)據(jù)。如果系統(tǒng)從這些能確認(rèn)數(shù)據(jù)正確接收的可靠源接收數(shù)據(jù)惠昔,則可以確保不會有數(shù)據(jù)因為任何類型的失敗而丟失數(shù)據(jù)幕与。下面是兩類receiver:

  1. 可靠Receiver - 當(dāng)數(shù)據(jù)已經(jīng)接收并且按照副本要求存儲到Spark中時,可靠receiver會正確發(fā)送確認(rèn)信息給可靠數(shù)據(jù)源镇防。
  2. 不可靠Receiver - 不可靠數(shù)據(jù)源不能發(fā)送確認(rèn)信息給數(shù)據(jù)源啦鸣。這類receiver用于不支持確認(rèn)的數(shù)據(jù)源,或者對于可靠數(shù)據(jù)源来氧,不想或不需要進(jìn)行確認(rèn)诫给。

如何編寫可靠receiver在中Custom Receiver Guide進(jìn)行討論。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末啦扬,一起剝皮案震驚了整個濱河市中狂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌扑毡,老刑警劉巖胃榕,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異瞄摊,居然都是意外死亡勤晚,警方通過查閱死者的電腦和手機枉层,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進(jìn)店門泉褐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赐写,“玉大人,你說我怎么就攤上這事膜赃⊥ρ” “怎么了?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵跳座,是天一觀的道長端铛。 經(jīng)常有香客問我,道長疲眷,這世上最難降的妖魔是什么禾蚕? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮狂丝,結(jié)果婚禮上换淆,老公的妹妹穿的比我還像新娘。我一直安慰自己几颜,他們只是感情好倍试,可當(dāng)我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蛋哭,像睡著了一般县习。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上谆趾,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天躁愿,我揣著相機與錄音,去河邊找鬼沪蓬。 笑死彤钟,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的怜跑。 我是一名探鬼主播样勃,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼性芬!你這毒婦竟也來了峡眶?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤植锉,失蹤者是張志新(化名)和其女友劉穎辫樱,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體俊庇,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡狮暑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年鸡挠,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片搬男。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡拣展,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出缔逛,到底是詐尸還是另有隱情备埃,我是刑警寧澤,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布褐奴,位于F島的核電站按脚,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏敦冬。R本人自食惡果不足惜辅搬,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望脖旱。 院中可真熱鬧堪遂,春花似錦、人聲如沸夯缺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽踊兜。三九已至竿滨,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間捏境,已是汗流浹背于游。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留垫言,地道東北人贰剥。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像筷频,于是被迫代替她去往敵國和親蚌成。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容