Apache Spark 2.2.0 中文文檔 - Spark Streaming 編程指南 | ApacheCN

Spark Streaming 編程指南

概述

一個(gè)入門(mén)示例

基礎(chǔ)概念

依賴(lài)

初始化 StreamingContext

Discretized Streams (DStreams)(離散化流)

Input DStreams 和 Receivers(接收器)

DStreams 上的 Transformations(轉(zhuǎn)換)

DStreams 上的輸出操作

DataFrame 和 SQL 操作

MLlib 操作

緩存 / 持久性

Checkpointing

Accumulators, Broadcast 變量, 和 Checkpoint

應(yīng)用程序部署

Monitoring Applications (監(jiān)控應(yīng)用程序)

Performance Tuning (性能調(diào)優(yōu))

Reducing the Batch Processing Times (減少批處理時(shí)間)

Setting the Right Batch Interval (設(shè)置正確的批次間隔)

Memory Tuning (內(nèi)存調(diào)優(yōu))

Fault-tolerance Semantics (容錯(cuò)語(yǔ)義)

快速鏈接

概述

Spark Streaming 是 Spark Core API 的擴(kuò)展, 它支持彈性的, 高吞吐的, 容錯(cuò)的實(shí)時(shí)數(shù)據(jù)流的處理. 數(shù)據(jù)可以通過(guò)多種數(shù)據(jù)源獲取, 例如 Kafka, Flume, Kinesis 以及 TCP sockets, 也可以通過(guò)例如map,reduce,join,window等的高級(jí)函數(shù)組成的復(fù)雜算法處理. 最終, 處理后的數(shù)據(jù)可以輸出到文件系統(tǒng), 數(shù)據(jù)庫(kù)以及實(shí)時(shí)儀表盤(pán)中. 事實(shí)上, 你還可以在 data streams(數(shù)據(jù)流)上使用機(jī)器學(xué)習(xí)以及圖形處理算法.

在內(nèi)部, 它工作原理如下, Spark Streaming 接收實(shí)時(shí)輸入數(shù)據(jù)流并將數(shù)據(jù)切分成多個(gè) batch(批)數(shù)據(jù), 然后由 Spark 引擎處理它們以生成最終的 stream of results in batches(分批流結(jié)果).

Spark Streaming 提供了一個(gè)名為discretized stream或DStream的高級(jí)抽象, 它代表一個(gè)連續(xù)的數(shù)據(jù)流. DStream 可以從數(shù)據(jù)源的輸入數(shù)據(jù)流創(chuàng)建, 例如 Kafka, Flume 以及 Kinesis, 或者在其他 DStream 上進(jìn)行高層次的操作以創(chuàng)建. 在內(nèi)部, 一個(gè) DStream 是通過(guò)一系列的RDDs來(lái)表示.

本指南告訴你如何使用 DStream 來(lái)編寫(xiě)一個(gè) Spark Streaming 程序. 你可以使用 Scala , Java 或者 Python(Spark 1.2 版本后引進(jìn))來(lái)編寫(xiě) Spark Streaming 程序. 所有這些都在本指南中介紹. 您可以在本指南中找到標(biāo)簽, 讓您可以選擇不同語(yǔ)言的代碼段.

Note(注意):在 Python 有些 API 可能會(huì)有不同或不可用. 在本指南, 您將找到Python API的標(biāo)簽來(lái)高亮顯示不同的地方.

一個(gè)入門(mén)示例

在我們?cè)敿?xì)介紹如何編寫(xiě)你自己的 Spark Streaming 程序的細(xì)節(jié)之前, 讓我們先來(lái)看一看一個(gè)簡(jiǎn)單的 Spark Streaming 程序的樣子. 比方說(shuō), 我們想要計(jì)算從一個(gè)監(jiān)聽(tīng) TCP socket 的數(shù)據(jù)服務(wù)器接收到的文本數(shù)據(jù)(text data)中的字?jǐn)?shù). 你需要做的就是照著下面的步驟做.

Scala

Java

Python

首先, 我們導(dǎo)入了 Spark Streaming 類(lèi)和部分從 StreamingContext 隱式轉(zhuǎn)換到我們的環(huán)境的名稱(chēng), 目的是添加有用的方法到我們需要的其他類(lèi)(如 DStream).StreamingContext是所有流功能的主要入口點(diǎn). 我們創(chuàng)建了一個(gè)帶有 2 個(gè)執(zhí)行線(xiàn)程和間歇時(shí)間為 1 秒的本地 StreamingContext.

importorg.apache.spark._importorg.apache.spark.streaming._importorg.apache.spark.streaming.StreamingContext._// 自從 Spark 1.3 開(kāi)始, 不再是必要的了// 創(chuàng)建一個(gè)具有兩個(gè)工作線(xiàn)程(working thread)并且批次間隔為 1 秒的本地 StreamingContext .// master 需要 2 個(gè)核, 以防止饑餓情況(starvation scenario).valconf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")valssc=newStreamingContext(conf,Seconds(1))

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g.localhost) and port (e.g.9999). 使用該 context, 我們可以創(chuàng)建一個(gè)代表從 TCP 源流數(shù)據(jù)的離散流(DStream), 指定主機(jī)名(hostname)(例如 localhost)和端口(例如 9999).

// 創(chuàng)建一個(gè)將要連接到 hostname:port 的 DStream苍匆,如 localhost:9999vallines=ssc.socketTextStream("localhost",9999)

上一步的這個(gè)linesDStream 表示將要從數(shù)據(jù)服務(wù)器接收到的數(shù)據(jù)流. 在這個(gè)離散流(DStream)中的每一條記錄都是一行文本(text). 接下來(lái)面哼,我們想要通過(guò)空格字符(space characters)拆分這些數(shù)據(jù)行(lines)成單詞(words).

// 將每一行拆分成 words(單詞)valwords=lines.flatMap(_.split(" "))

flatMap是一種 one-to-many(一對(duì)多)的離散流(DStream)操作溅固,它會(huì)通過(guò)在源離散流(source DStream)中根據(jù)每個(gè)記錄(record)生成多個(gè)新紀(jì)錄的形式創(chuàng)建一個(gè)新的離散流(DStream). 在這種情況下堕担,在這種情況下乏苦,每一行(each line)都將被拆分成多個(gè)單詞(words)和代表單詞離散流(words DStream)的單詞流. 接下來(lái)努酸,我們想要計(jì)算這些單詞.

importorg.apache.spark.streaming.StreamingContext._// not necessary since Spark 1.3// 計(jì)算每一個(gè) batch(批次)中的每一個(gè) word(單詞)valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)// 在控制臺(tái)打印出在這個(gè)離散流(DStream)中生成的每個(gè) RDD 的前十個(gè)元素// 注意: 必需要觸發(fā) action(很多初學(xué)者會(huì)忘記觸發(fā) action 操作少孝,導(dǎo)致報(bào)錯(cuò):No output operations registered, so nothing to execute)wordCounts.print()

上一步的wordsDStream 進(jìn)行了進(jìn)一步的映射(一對(duì)一的轉(zhuǎn)換)為一個(gè) (word, 1) paris 的離散流(DStream)理茎,這個(gè) DStream 然后被規(guī)約(reduce)來(lái)獲得數(shù)據(jù)中每個(gè)批次(batch)的單詞頻率. 最后饿序,wordCounts.print()將會(huì)打印一些每秒生成的計(jì)數(shù).

Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call

請(qǐng)注意勉失,當(dāng)這些行(lines)被執(zhí)行的時(shí)候, Spark Streaming 僅僅設(shè)置了計(jì)算, 只有在啟動(dòng)時(shí)才會(huì)執(zhí)行原探,并沒(méi)有開(kāi)始真正地處理. 為了在所有的轉(zhuǎn)換都已經(jīng)設(shè)置好之后開(kāi)始處理戴质,我們?cè)谧詈笳{(diào)用:

ssc.start()// 開(kāi)始計(jì)算ssc.awaitTermination()// 等待計(jì)算被中斷

該部分完整的代碼可以在 Spark Streaming 示例NetworkWordCount中找到.

如果你已經(jīng)下載并且構(gòu)建Spark, 您可以使用如下方式來(lái)運(yùn)行該示例. 你首先需要運(yùn)行 Netcat(一個(gè)在大多數(shù)類(lèi) Unix 系統(tǒng)中的小工具)作為我們使用的數(shù)據(jù)服務(wù)器.

$ nc -lk9999

然后,在另一個(gè)不同的終端踢匣,你可以通過(guò)執(zhí)行如下命令來(lái)運(yùn)行該示例:

Scala

Java

Python

$ ./bin/run-example streaming.NetworkWordCount localhost9999

然后告匠,在運(yùn)行在 netcat 服務(wù)器上的終端輸入的任何行(lines),都將被計(jì)算离唬,并且每一秒都顯示在屏幕上后专,它看起來(lái)就像下面這樣:

# TERMINAL 1:# Running Netcat$ nc -lk9999hello world...

Scala

Java

Python

# TERMINAL 2: RUNNING NetworkWordCount$ ./bin/run-example streaming.NetworkWordCount localhost9999...-------------------------------------------Time:1357008430000ms-------------------------------------------(hello,1)(world,1)...

基礎(chǔ)概念

接下來(lái),我們了解完了簡(jiǎn)單的例子输莺,開(kāi)始闡述 Spark Streaming 的基本知識(shí)戚哎。

依賴(lài)

與 Spark 類(lèi)似,Spark Streaming 可以通過(guò) Maven 來(lái)管理依賴(lài). 為了編寫(xiě)你自己的 Spark Streaming 程序嫂用,你必須添加以下的依賴(lài)到你的 SBT 或者 Maven 項(xiàng)目中.

Maven

SBT

org.apache.spark

spark-streaming_2.11

2.2.0

針對(duì)從 Spark Streaming Core API 中不存在的數(shù)據(jù)源中獲取數(shù)據(jù)型凳,如 Kafka, Flume嘱函,Kinesis 甘畅,你必須添加相應(yīng)的坐標(biāo)spark-streaming-xyz_2.11到依賴(lài)中. 例如,有一些常見(jiàn)的依賴(lài)如下.

Source(數(shù)據(jù)源)Artifact(坐標(biāo))

Kafkaspark-streaming-kafka-0-8_2.11

Flumespark-streaming-flume_2.11

Kinesis

spark-streaming-kinesis-asl_2.11 [Amazon Software License]

想要查看一個(gè)實(shí)時(shí)更新的列表往弓,請(qǐng)參閱Maven repository來(lái)了解支持的 sources(數(shù)據(jù)源)和 artifacts(坐標(biāo))的完整列表疏唾。

初始化 StreamingContext

為了初始化一個(gè) Spark Streaming 程序, 一個(gè)StreamingContext對(duì)象必須要被創(chuàng)建出來(lái),它是所有的 Spark Streaming 功能的主入口點(diǎn)函似。

Scala

Java

Python

一個(gè)StreamingContext對(duì)象可以從一個(gè)SparkConf對(duì)象中來(lái)創(chuàng)建.

importorg.apache.spark._importorg.apache.spark.streaming._valconf=newSparkConf().setAppName(appName).setMaster(master)valssc=newStreamingContext(conf,Seconds(1))

這個(gè)appName參數(shù)是展示在集群 UI 界面上的應(yīng)用程序的名稱(chēng).master是一個(gè)Spark, Mesos or YARN cluster URL, 或者一個(gè)特殊的“l(fā)ocal[*]”字符串以使用 local mode(本地模式)來(lái)運(yùn)行. 在實(shí)踐中槐脏,當(dāng)在集群上運(yùn)行時(shí),你不會(huì)想在應(yīng)用程序中硬編碼master撇寞,而是使用spark-submit來(lái)啟動(dòng)應(yīng)用程序, 并且接受該參數(shù). 然而顿天,對(duì)于本地測(cè)試和單元測(cè)試堂氯,你可以傳遞 “l(fā)ocal[*]” 來(lái)運(yùn)行 Spark Streaming 進(jìn)程(檢測(cè)本地系統(tǒng)中內(nèi)核的個(gè)數(shù)). 請(qǐng)注意,做個(gè)內(nèi)部創(chuàng)建了一個(gè)SparkContext(所有 Spark 功能的出發(fā)點(diǎn))牌废,它可以像 ssc.sparkContext 這樣被訪問(wèn).

這個(gè) batch interval(批間隔)必須根據(jù)您的應(yīng)用程序和可用的集群資源的等待時(shí)間要求進(jìn)行設(shè)置. 更多詳情請(qǐng)參閱優(yōu)化指南部分.

一個(gè)StreamingContext對(duì)象也可以從一個(gè)現(xiàn)有的SparkContext對(duì)象來(lái)創(chuàng)建.

importorg.apache.spark.streaming._valsc=...// 已存在的 SparkContextvalssc=newStreamingContext(sc,Seconds(1))

在定義一個(gè) context 之后,您必須執(zhí)行以下操作.

通過(guò)創(chuàng)建輸入 DStreams 來(lái)定義輸入源.

通過(guò)應(yīng)用轉(zhuǎn)換和輸出操作 DStreams 定義流計(jì)算(streaming computations).

開(kāi)始接收輸入并且使用streamingContext.start()來(lái)處理數(shù)據(jù).

使用streamingContext.awaitTermination()等待處理被終止(手動(dòng)或者由于任何錯(cuò)誤).

使用streamingContext.stop()來(lái)手動(dòng)的停止處理.

需要記住的幾點(diǎn):

一旦一個(gè) context 已經(jīng)啟動(dòng)咽白,將不會(huì)有新的數(shù)據(jù)流的計(jì)算可以被創(chuàng)建或者添加到它。.

一旦一個(gè) context 已經(jīng)停止畔规,它不會(huì)被重新啟動(dòng).

同一時(shí)間內(nèi)在 JVM 中只有一個(gè) StreamingContext 可以被激活.

在 StreamingContext 上的 stop() 同樣也停止了 SparkContext 局扶。為了只停止 StreamingContext ,設(shè)置stop()的可選參數(shù),名叫stopSparkContext為 false.

一個(gè) SparkContext 就可以被重用以創(chuàng)建多個(gè) StreamingContexts,只要前一個(gè) StreamingContext 在下一個(gè)StreamingContext 被創(chuàng)建之前停止(不停止 SparkContext).

Discretized Streams (DStreams)(離散化流)

Discretized StreamorDStream是 Spark Streaming 提供的基本抽象. 它代表了一個(gè)連續(xù)的數(shù)據(jù)流, 無(wú)論是從 source(數(shù)據(jù)源)接收到的輸入數(shù)據(jù)流, 還是通過(guò)轉(zhuǎn)換輸入流所產(chǎn)生的處理過(guò)的數(shù)據(jù)流. 在內(nèi)部, 一個(gè) DStream 被表示為一系列連續(xù)的 RDDs, 它是 Spark 中一個(gè)不可改變的抽象, distributed dataset (的更多細(xì)節(jié)請(qǐng)看Spark 編程指南. 在一個(gè) DStream 中的每個(gè) RDD 包含來(lái)自一定的時(shí)間間隔的數(shù)據(jù)烹骨,如下圖所示.

應(yīng)用于 DStream 的任何操作轉(zhuǎn)化為對(duì)于底層的 RDDs 的操作. 例如,在先前的示例畴蒲,轉(zhuǎn)換一個(gè)行(lines)流成為單詞(words)中,flatMap 操作被應(yīng)用于在行離散流(lines DStream)中的每個(gè) RDD 來(lái)生成單詞離散流(words DStream)的 RDDs . 如下所示.

這些底層的 RDD 變換由 Spark 引擎(engine)計(jì)算对室。 DStream 操作隱藏了大多數(shù)這些細(xì)節(jié)并為了方便起見(jiàn)模燥,提供給了開(kāi)發(fā)者一個(gè)更高級(jí)別的 API 。這些操作細(xì)節(jié)會(huì)在后邊的章節(jié)中討論掩宜。

Input DStreams 和 Receivers(接收器)

輸入 DStreams 是代表輸入數(shù)據(jù)是從流的源數(shù)據(jù)(streaming sources)接收到的流的 DStream. 在一個(gè)入門(mén)示例中,lines是一個(gè) input DStream, 因?yàn)樗碇鴱?netcat 服務(wù)器接收到的數(shù)據(jù)的流. 每一個(gè) input DStream(除了 file stream 之外, 會(huì)在本章的后面來(lái)討論)與一個(gè)Receiver(Scala doc,Java doc) 對(duì)象關(guān)聯(lián), 它從 source(數(shù)據(jù)源)中獲取數(shù)據(jù)蔫骂,并且存儲(chǔ)它到 Sparl 的內(nèi)存中用于處理.

Spark Streaming 提供了兩種內(nèi)置的 streaming source(流的數(shù)據(jù)源).

Basic sources(基礎(chǔ)的數(shù)據(jù)源): 在 StreamingContext API 中直接可以使用的數(shù)據(jù)源. 例如: file systems 和 socket connections.

Advanced sources(高級(jí)的數(shù)據(jù)源): 像 Kafka, Flume, Kinesis, 等等這樣的數(shù)據(jù)源. 可以通過(guò)額外的 utility classes 來(lái)使用. 像在依賴(lài)中討論的一樣, 這些都需要額外的外部依賴(lài).

在本節(jié)的后邊,我們將討論每種類(lèi)別中的現(xiàn)有的一些數(shù)據(jù)源.

請(qǐng)注意, 如果你想要在你的流處理程序中并行的接收多個(gè)數(shù)據(jù)流, 你可以創(chuàng)建多個(gè) input DStreams(在性能優(yōu)化部分進(jìn)一步討論). 這將創(chuàng)建同時(shí)接收多個(gè)數(shù)據(jù)流的多個(gè) receivers(接收器). 但需要注意牺汤,一個(gè) Spark 的 worker/executor 是一個(gè)長(zhǎng)期運(yùn)行的任務(wù)(task)辽旋,因此它將占用分配給 Spark Streaming 的應(yīng)用程序的所有核中的一個(gè)核(core). 因此,要記住檐迟,一個(gè) Spark Streaming 應(yīng)用需要分配足夠的核(core)(或線(xiàn)程(threads)补胚,如果本地運(yùn)行的話(huà))來(lái)處理所接收的數(shù)據(jù),以及來(lái)運(yùn)行接收器(receiver(s)).

要記住的幾點(diǎn)

當(dāng)在本地運(yùn)行一個(gè) Spark Streaming 程序的時(shí)候追迟,不要使用 “l(fā)ocal” 或者 “l(fā)ocal[1]” 作為 master 的 URL. 這兩種方法中的任何一個(gè)都意味著只有一個(gè)線(xiàn)程將用于運(yùn)行本地任務(wù). 如果你正在使用一個(gè)基于接收器(receiver)的輸入離散流(input DStream)(例如溶其, sockets ,Kafka 敦间,F(xiàn)lume 等)瓶逃,則該單獨(dú)的線(xiàn)程將用于運(yùn)行接收器(receiver),而沒(méi)有留下任何的線(xiàn)程用于處理接收到的數(shù)據(jù). 因此每瞒,在本地運(yùn)行時(shí)金闽,總是用 “l(fā)ocal[n]” 作為 master URL ,其中的 n > 運(yùn)行接收器的數(shù)量(查看Spark 屬性來(lái)了解怎樣去設(shè)置 master 的信息).

將邏輯擴(kuò)展到集群上去運(yùn)行剿骨,分配給 Spark Streaming 應(yīng)用程序的內(nèi)核(core)的內(nèi)核數(shù)必須大于接收器(receiver)的數(shù)量。否則系統(tǒng)將接收數(shù)據(jù)埠褪,但是無(wú)法處理它.

基礎(chǔ)的 Sources(數(shù)據(jù)源)

我們已經(jīng)簡(jiǎn)單地了解過(guò)了在入門(mén)示例中ssc.socketTextStream(...)的例子浓利,例子中是通過(guò)從一個(gè) TCP socket 連接接收到的文本數(shù)據(jù)來(lái)創(chuàng)建了一個(gè)離散流(DStream). 除了 sockets 之外挤庇,StreamingContext API 也提供了根據(jù)文件作為輸入來(lái)源創(chuàng)建離散流(DStreams)的方法。

File Streams:用于從文件中讀取數(shù)據(jù)贷掖,在任何與 HDFS API 兼容的文件系統(tǒng)中(即嫡秕,HDFS,S3苹威,NFS 等)昆咽,一個(gè) DStream 可以像下面這樣創(chuàng)建:

Scala

Java

Python

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming 將監(jiān)控dataDirectory目錄并且該目錄中任何新建的文件 (寫(xiě)在嵌套目錄中的文件是不支持的). 注意

文件必須具有相同的數(shù)據(jù)格式.

文件必須被創(chuàng)建在dataDirectory目錄中, 通過(guò) atomically(院子的)moving(移動(dòng))或renaming(重命名)它們到數(shù)據(jù)目錄.

一旦移動(dòng),這些文件必須不能再更改牙甫,因此如果文件被連續(xù)地追加掷酗,新的數(shù)據(jù)將不會(huì)被讀取.

對(duì)于簡(jiǎn)單的文本文件,還有一個(gè)更加簡(jiǎn)單的方法streamingContext.textFileStream(dataDirectory). 并且文件流(file streams)不需要運(yùn)行一個(gè)接收器(receiver)窟哺,因此泻轰,不需要分配內(nèi)核(core)。

Python API在 Python API 中fileStream是不可用的, 只有textFileStream是可用的.

Streams based on Custom Receivers(基于自定義的接收器的流):DStreams 可以使用通過(guò)自定義的 receiver(接收器)接收到的數(shù)據(jù)來(lái)創(chuàng)建. 更多細(xì)節(jié)請(qǐng)參閱自定義 Receiver 指南.

Queue of RDDs as a Stream(RDDs 隊(duì)列作為一個(gè)流):為了使用測(cè)試數(shù)據(jù)測(cè)試 Spark Streaming 應(yīng)用程序且轨,還可以使用streamingContext.queueStream(queueOfRDDs)創(chuàng)建一個(gè)基于 RDDs 隊(duì)列的 DStream浮声,每個(gè)進(jìn)入隊(duì)列的 RDD 都將被視為 DStream 中的一個(gè)批次數(shù)據(jù),并且就像一個(gè)流進(jìn)行處理.

想要了解更多的關(guān)于從 sockets 和文件(files)創(chuàng)建的流的細(xì)節(jié), 請(qǐng)參閱相關(guān)函數(shù)的 API文檔, 它們?cè)?a target="_blank" rel="nofollow">StreamingContextfor Scala,JavaStreamingContextfor Java 以及StreamingContextfor Python 中.

高級(jí) Sources(數(shù)據(jù)源)

Python API從 Spark 2.2.0 開(kāi)始, 在 Python API 中的 Kafka, Kinesis 和 Flume 這樣的外部數(shù)據(jù)源都是可用的.

這一類(lèi)別的 sources(數(shù)據(jù)源)需要使用非 Spark 庫(kù)中的外部接口旋奢,它們中的其中一些還需要比較復(fù)雜的依賴(lài)關(guān)系(例如泳挥, Kafka 和 Flume). 因此,為了最小化有關(guān)的依賴(lài)關(guān)系的版本沖突的問(wèn)題至朗,這些資源本身不能創(chuàng)建 DStream 的功能屉符,它是通過(guò)依賴(lài)單獨(dú)的類(lèi)庫(kù)實(shí)現(xiàn)創(chuàng)建 DStream 的功能.

請(qǐng)注意, 這些高級(jí) sources(數(shù)據(jù)源)不能再 Spark shell 中使用, 因此,基于這些高級(jí) sources(數(shù)據(jù)源)的應(yīng)用程序不能在 shell 中被測(cè)試. 如果你真的想要在 Spark shell 中使用它們爽丹,你必須下載帶有它的依賴(lài)的相應(yīng)的 Maven 組件的 JAR 筑煮,并且將其添加到 classpath.

一些高級(jí)的 sources(數(shù)據(jù)源)如下.

Kafka:Spark Streaming 2.2.0 與 Kafka broker 版本 0.8.2.1 或更高是兼容的. 更多細(xì)節(jié)請(qǐng)參閱Kafka 集成指南.

Flume:Spark Streaming 2.2.0 與 Flume 1.6.0 相兼容. 更多細(xì)節(jié)請(qǐng)參閱Flume 集成指南.

Kinesis:Spark Streaming 2.2.0 與 Kinesis Client Library 1.2.1 相兼容. 更多細(xì)節(jié)請(qǐng)參閱Kinesis 集成指南.

自定義 Sources(數(shù)據(jù)源)

Python API在 Python 中還不支持這一功能.

Input DStreams 也可以從自定義數(shù)據(jù)源中創(chuàng)建. 如果您想這樣做, 需要實(shí)現(xiàn)一個(gè)用戶(hù)自定義的receiver(看下一節(jié)以了解它是什么), 它可以從自定義的 sources(數(shù)據(jù)源)中接收數(shù)據(jù)并且推送它到 Spark. 更多細(xì)節(jié)請(qǐng)參閱自定義 Receiver 指南.

Receiver Reliability(接收器的可靠性)

可以有兩種基于他們的reliability可靠性的數(shù)據(jù)源. 數(shù)據(jù)源(如 Kafka 和 Flume)允許傳輸?shù)臄?shù)據(jù)被確認(rèn). 如果系統(tǒng)從這些可靠的數(shù)據(jù)來(lái)源接收數(shù)據(jù),并且被確認(rèn)(acknowledges)正確地接收數(shù)據(jù)粤蝎,它可以確保數(shù)據(jù)不會(huì)因?yàn)槿魏晤?lèi)型的失敗而導(dǎo)致數(shù)據(jù)丟失. 這樣就出現(xiàn)了 2 種接收器(receivers):

Reliable Receiver(可靠的接收器)- 當(dāng)數(shù)據(jù)被接收并存儲(chǔ)在 Spark 中并帶有備份副本時(shí)真仲,一個(gè)可靠的接收器(reliable receiver)正確地發(fā)送確認(rèn)(acknowledgment)給一個(gè)可靠的數(shù)據(jù)源(reliable source).

Unreliable Receiver(不可靠的接收器)- 一個(gè)不可靠的接收器( unreliable receiver )不發(fā)送確認(rèn)(acknowledgment)到數(shù)據(jù)源。這可以用于不支持確認(rèn)的數(shù)據(jù)源初澎,或者甚至是可靠的數(shù)據(jù)源當(dāng)你不想或者不需要進(jìn)行復(fù)雜的確認(rèn)的時(shí)候.

自定義 Receiver 指南中描述了關(guān)于如何去編寫(xiě)一個(gè) reliable receiver(可靠的接收器)的細(xì)節(jié).

DStreams 上的 Transformations(轉(zhuǎn)換)

與 RDD 類(lèi)似秸应,transformation 允許從 input DStream 輸入的數(shù)據(jù)做修改. DStreams 支持很多在 RDD 中可用的 transformation 算子。一些常用的如下所示 :

與RDD類(lèi)似碑宴,類(lèi)似软啼,transformation 允許修改來(lái)自 input DStream 的數(shù)據(jù). DStreams 支持標(biāo)準(zhǔn)的 Spark RDD 上可用的許多轉(zhuǎn)換. 一些常見(jiàn)的如下.

Transformation(轉(zhuǎn)換)Meaning(含義)

map(func)利用函數(shù)func處理原 DStream 的每個(gè)元素,返回一個(gè)新的 DStream.

flatMap(func)與 map 相似延柠,但是每個(gè)輸入項(xiàng)可用被映射為 0 個(gè)或者多個(gè)輸出項(xiàng)祸挪。.

filter(func)返回一個(gè)新的 DStream,它僅僅包含原 DStream 中函數(shù)func返回值為 true 的項(xiàng).

repartition(numPartitions)通過(guò)創(chuàng)建更多或者更少的 partition 以改變這個(gè) DStream 的并行級(jí)別(level of parallelism).

union(otherStream)返回一個(gè)新的 DStream贞间,它包含源 DStream 和otherDStream的所有元素.

count()通過(guò) count 源 DStream 中每個(gè) RDD 的元素?cái)?shù)量贿条,返回一個(gè)包含單元素(single-element)RDDs 的新 DStream.

reduce(func)利用函數(shù)func聚集源 DStream 中每個(gè) RDD 的元素雹仿,返回一個(gè)包含單元素(single-element)RDDs 的新 DStream。函數(shù)應(yīng)該是相關(guān)聯(lián)的整以,以使計(jì)算可以并行化.

countByValue()在元素類(lèi)型為 K 的 DStream上胧辽,返回一個(gè)(K,long)pair 的新的 DStream,每個(gè) key 的值是在原 DStream 的每個(gè) RDD 中的次數(shù).

reduceByKey(func, [numTasks])當(dāng)在一個(gè)由 (K,V) pairs 組成的 DStream 上調(diào)用這個(gè)算子時(shí)公黑,返回一個(gè)新的, 由 (K,V) pairs 組成的 DStream邑商,每一個(gè) key 的值均由給定的 reduce 函數(shù)聚合起來(lái).注意:在默認(rèn)情況下,這個(gè)算子利用了 Spark 默認(rèn)的并發(fā)任務(wù)數(shù)去分組凡蚜。你可以用 numTasks 參數(shù)設(shè)置不同的任務(wù)數(shù)人断。

join(otherStream, [numTasks])當(dāng)應(yīng)用于兩個(gè) DStream(一個(gè)包含(K,V)對(duì),一個(gè)包含 (K,W) 對(duì))番刊,返回一個(gè)包含 (K, (V, W)) 對(duì)的新 DStream.

cogroup(otherStream, [numTasks])當(dāng)應(yīng)用于兩個(gè) DStream(一個(gè)包含(K,V)對(duì)含鳞,一個(gè)包含 (K,W) 對(duì)),返回一個(gè)包含 (K, Seq[V], Seq[W]) 的 tuples(元組).

transform(func)通過(guò)對(duì)源 DStream 的每個(gè) RDD 應(yīng)用 RDD-to-RDD 函數(shù)芹务,創(chuàng)建一個(gè)新的 DStream. 這個(gè)可以在 DStream 中的任何 RDD 操作中使用.

updateStateByKey(func)返回一個(gè)新的 "狀態(tài)" 的 DStream蝉绷,其中每個(gè) key 的狀態(tài)通過(guò)在 key 的先前狀態(tài)應(yīng)用給定的函數(shù)和 key 的新 valyes 來(lái)更新. 這可以用于維護(hù)每個(gè) key 的任意狀態(tài)數(shù)據(jù).

其中一些轉(zhuǎn)換值得深入討論.

UpdateStateByKey 操作

該updateStateByKey操作允許您維護(hù)任意狀態(tài),同時(shí)不斷更新新信息. 你需要通過(guò)兩步來(lái)使用它.

定義 state - state 可以是任何的數(shù)據(jù)類(lèi)型.

定義 state update function(狀態(tài)更新函數(shù)) - 使用函數(shù)指定如何使用先前狀態(tài)來(lái)更新?tīng)顟B(tài)枣抱,并從輸入流中指定新值.

在每個(gè) batch 中熔吗,Spark 會(huì)使用狀態(tài)更新函數(shù)為所有已有的 key 更新?tīng)顟B(tài),不管在 batch 中是否含有新的數(shù)據(jù)佳晶。如果這個(gè)更新函數(shù)返回一個(gè) none桅狠,這個(gè) key-value pair 也會(huì)被消除.

讓我們舉個(gè)例子來(lái)說(shuō)明. 在例子中,假設(shè)你想保持在文本數(shù)據(jù)流中看到的每個(gè)單詞的運(yùn)行計(jì)數(shù)轿秧,運(yùn)行次數(shù)用一個(gè) state 表示中跌,它的類(lèi)型是整數(shù), 我們可以使用如下方式來(lái)定義 update 函數(shù):

Scala

Java

Python

defupdateFunction(newValues:Seq[Int],runningCount:Option[Int]):Option[Int]={valnewCount=...// add the new values with the previous running count to get the new countSome(newCount)}

這里是一個(gè)應(yīng)用于包含 words(單詞)的 DStream 上(也就是說(shuō),在先前的示例中菇篡,該pairsDStream 包含了 (word, 1) pair).

valrunningCounts=pairs.updateStateByKey[Int](updateFunction_)

update 函數(shù)將會(huì)被每個(gè)單詞調(diào)用漩符,newValues擁有一系列的 1(來(lái)自 (word, 1) pairs),runningCount 擁有之前的次數(shù).

請(qǐng)注意, 使用updateStateByKey需要配置的checkpoint(檢查點(diǎn))的目錄驱还,這里是更詳細(xì)關(guān)于討論checkpointing的部分.

Transform Operation*(轉(zhuǎn)換操作)

transform 操作(以及它的變化形式如transformWith)允許在 DStream 運(yùn)行任何 RDD-to-RDD 函數(shù). 它能夠被用來(lái)應(yīng)用任何沒(méi)在 DStream API 中提供的 RDD 操作. 例如嗜暴,連接數(shù)據(jù)流中的每個(gè)批(batch)和另外一個(gè)數(shù)據(jù)集的功能并沒(méi)有在 DStream API 中提供,然而你可以簡(jiǎn)單的利用transform方法做到. 這使得有非常強(qiáng)大的可能性. 例如议蟆,可以通過(guò)將輸入數(shù)據(jù)流與預(yù)先計(jì)算的垃圾郵件信息(也可以使用 Spark 一起生成)進(jìn)行實(shí)時(shí)數(shù)據(jù)清理闷沥,然后根據(jù)它進(jìn)行過(guò)濾.

Scala

Java

Python

valspamInfoRDD=ssc.sparkContext.newAPIHadoopRDD(...)// RDD containing spam informationvalcleanedDStream=wordCounts.transform{rdd=>rdd.join(spamInfoRDD).filter(...)// join data stream with spam information to do data cleaning...}

請(qǐng)注意,每個(gè) batch interval(批間隔)提供的函數(shù)被調(diào)用. 這允許你做隨時(shí)間變動(dòng)的 RDD 操作, 即 RDD 操作, 分區(qū)的數(shù)量咐容,廣播變量舆逃,等等. batch 之間等可以改變。

Window Operations(窗口操作)

Spark Streaming 也支持windowed computations(窗口計(jì)算),它允許你在數(shù)據(jù)的一個(gè)滑動(dòng)窗口上應(yīng)用 transformation(轉(zhuǎn)換). 下圖說(shuō)明了這個(gè)滑動(dòng)窗口.

如上圖顯示颖侄,窗口在源 DStream 上slides(滑動(dòng))鸟雏,合并和操作落入窗內(nèi)的源 RDDs享郊,產(chǎn)生窗口化的 DStream 的 RDDs览祖。在這個(gè)具體的例子中,程序在三個(gè)時(shí)間單元的數(shù)據(jù)上進(jìn)行窗口操作炊琉,并且每?jī)蓚€(gè)時(shí)間單元滑動(dòng)一次展蒂。 這說(shuō)明,任何一個(gè)窗口操作都需要指定兩個(gè)參數(shù).

window length(窗口長(zhǎng)度)- 窗口的持續(xù)時(shí)間(圖 3).

sliding interval(滑動(dòng)間隔)- 執(zhí)行窗口操作的間隔(圖 2).

這兩個(gè)參數(shù)必須是 source DStream 的 batch interval(批間隔)的倍數(shù)(圖 1).

讓我們舉例以說(shuō)明窗口操作. 例如苔咪,你想擴(kuò)展前面的例子用來(lái)計(jì)算過(guò)去 30 秒的詞頻锰悼,間隔時(shí)間是 10 秒. 為了達(dá)到這個(gè)目的,我們必須在過(guò)去 30 秒的(wrod, 1)pairs 的pairsDStream 上應(yīng)用reduceByKey操作. 用方法reduceByKeyAndWindow實(shí)現(xiàn).

Scala

Java

Python

// Reduce last 30 seconds of data, every 10 secondsvalwindowedWordCounts=pairs.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(10))

一些常用的窗口操作如下所示团赏,這些操作都需要用到上文提到的兩個(gè)參數(shù) -windowLength(窗口長(zhǎng)度)和slideInterval(滑動(dòng)的時(shí)間間隔).

Transformation(轉(zhuǎn)換)Meaning(含義)

window(windowLength,slideInterval)返回一個(gè)新的 DStream, 它是基于 source DStream 的窗口 batch 進(jìn)行計(jì)算的.

countByWindow(windowLength,slideInterval)返回 stream(流)中滑動(dòng)窗口元素的數(shù)

reduceByWindow(func,windowLength,slideInterval)返回一個(gè)新的單元素 stream(流)箕般,它通過(guò)在一個(gè)滑動(dòng)間隔的 stream 中使用func來(lái)聚合以創(chuàng)建. 該函數(shù)應(yīng)該是 associative(關(guān)聯(lián)的)且 commutative(可交換的),以便它可以并行計(jì)算

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])在一個(gè) (K, V) pairs 的 DStream 上調(diào)用時(shí), 返回一個(gè)新的 (K, V) pairs 的 Stream, 其中的每個(gè) key 的 values 是在滑動(dòng)窗口上的 batch 使用給定的函數(shù)func來(lái)聚合產(chǎn)生的.Note(注意):默認(rèn)情況下, 該操作使用 Spark 的默認(rèn)并行任務(wù)數(shù)量(local model 是 2, 在 cluster mode 中的數(shù)量通過(guò)spark.default.parallelism來(lái)確定)來(lái)做 grouping. 您可以通過(guò)一個(gè)可選的numTasks參數(shù)來(lái)設(shè)置一個(gè)不同的 tasks(任務(wù))數(shù)量.

reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval, [numTasks])上述reduceByKeyAndWindow()的更有效的一個(gè)版本舔清,其中使用前一窗口的 reduce 值逐漸計(jì)算每個(gè)窗口的 reduce值. 這是通過(guò)減少進(jìn)入滑動(dòng)窗口的新數(shù)據(jù)丝里,以及 “inverse reducing(逆減)” 離開(kāi)窗口的舊數(shù)據(jù)來(lái)完成的. 一個(gè)例子是當(dāng)窗口滑動(dòng)時(shí)”添加” 和 “減” keys 的數(shù)量. 然而,它僅適用于 “invertible reduce functions(可逆減少函數(shù))”体谒,即具有相應(yīng) “inverse reduce(反向減少)” 函數(shù)的 reduce 函數(shù)(作為參數(shù)invFunc ). 像在reduceByKeyAndWindow中的那樣, reduce 任務(wù)的數(shù)量可以通過(guò)可選參數(shù)進(jìn)行配置. 請(qǐng)注意, 針對(duì)該操作的使用必須啟用checkpointing.

countByValueAndWindow(windowLength,slideInterval, [numTasks])在一個(gè) (K, V) pairs 的 DStream 上調(diào)用時(shí), 返回一個(gè)新的 (K, Long) pairs 的 DStream, 其中每個(gè) key 的 value 是它在一個(gè)滑動(dòng)窗口之內(nèi)的頻次. 像 code>reduceByKeyAndWindow 中的那樣, reduce 任務(wù)的數(shù)量可以通過(guò)可選參數(shù)進(jìn)行配置.

Join 操作

最后杯聚,它值得強(qiáng)調(diào)的是,您可以輕松地在 Spark Streaming 中執(zhí)行不同類(lèi)型的 join.

Stream-stream joins

Streams(流)可以非常容易地與其他流進(jìn)行 join.

Scala

Java

Python

valstream1:DStream[String,String]=...valstream2:DStream[String,String]=...valjoinedStream=stream1.join(stream2)

這里抒痒,在每個(gè) batch interval(批間隔)中幌绍,由stream1生成的 RDD 將與stream2生成的 RDD 進(jìn)行 jion. 你也可以做leftOuterJoin,rightOuterJoin故响,fullOuterJoin. 此外傀广,在 stream(流)的窗口上進(jìn)行 join 通常是非常有用的. 這也很容易做到.

Scala

Java

Python

valwindowedStream1=stream1.window(Seconds(20))valwindowedStream2=stream2.window(Minutes(1))valjoinedStream=windowedStream1.join(windowedStream2)

Stream-dataset joins

這在解釋DStream.transform操作時(shí)已經(jīng)在前面演示過(guò)了. 這是另一個(gè) join window stream(窗口流)與 dataset 的例子.

Scala

Java

Python

valdataset:RDD[String,String]=...valwindowedStream=stream.window(Seconds(20))...valjoinedStream=windowedStream.transform{rdd=>rdd.join(dataset)}

實(shí)際上,您也可以動(dòng)態(tài)更改要加入的 dataset. 提供給transform的函數(shù)是每個(gè) batch interval(批次間隔)進(jìn)行評(píng)估彩届,因此將使用dataset引用指向當(dāng)前的 dataset.

DStream 轉(zhuǎn)換的完整列表可在 API 文檔中找到. 針對(duì) Scala API伪冰,請(qǐng)看DStreamPairDStreamFunctions. 針對(duì) Java API,請(qǐng)看JavaDStreamJavaPairDStream. 針對(duì) Python API惨缆,請(qǐng)看DStream.

DStreams 上的輸出操作

輸出操作允許將 DStream 的數(shù)據(jù)推送到外部系統(tǒng), 如數(shù)據(jù)庫(kù)或文件系統(tǒng). 由于輸出操作實(shí)際上允許外部系統(tǒng)使用變換后的數(shù)據(jù), 所以它們觸發(fā)所有 DStream 變換的實(shí)際執(zhí)行(類(lèi)似于RDD的動(dòng)作). 目前, 定義了以下輸出操作:

Output OperationMeaning

print()在運(yùn)行流應(yīng)用程序的 driver 節(jié)點(diǎn)上的DStream中打印每批數(shù)據(jù)的前十個(gè)元素. 這對(duì)于開(kāi)發(fā)和調(diào)試很有用.

Python API這在 Python API 中稱(chēng)為pprint().

saveAsTextFiles(prefix, [suffix])將此 DStream 的內(nèi)容另存為文本文件. 每個(gè)批處理間隔的文件名是根據(jù)前綴和后綴:"prefix-TIME_IN_MS[.suffix]"生成的.

saveAsObjectFiles(prefix, [suffix])將此 DStream 的內(nèi)容另存為序列化 Java 對(duì)象的SequenceFiles. 每個(gè)批處理間隔的文件名是根據(jù)前綴和后綴:"prefix-TIME_IN_MS[.suffix]"生成的.

Python API這在Python API中是不可用的.

saveAsHadoopFiles(prefix, [suffix])將此 DStream 的內(nèi)容另存為 Hadoop 文件. 每個(gè)批處理間隔的文件名是根據(jù)前綴和后綴:"prefix-TIME_IN_MS[.suffix]"生成的.

Python API這在Python API中是不可用的.

foreachRDD(func)對(duì)從流中生成的每個(gè) RDD 應(yīng)用函數(shù)func的最通用的輸出運(yùn)算符. 此功能應(yīng)將每個(gè) RDD 中的數(shù)據(jù)推送到外部系統(tǒng), 例如將 RDD 保存到文件, 或?qū)⑵渫ㄟ^(guò)網(wǎng)絡(luò)寫(xiě)入數(shù)據(jù)庫(kù). 請(qǐng)注意, 函數(shù)func在運(yùn)行流應(yīng)用程序的 driver 進(jìn)程中執(zhí)行, 通常會(huì)在其中具有 RDD 動(dòng)作, 這將強(qiáng)制流式傳輸 RDD 的計(jì)算.

foreachRDD 設(shè)計(jì)模式的使用

dstream.foreachRDD是一個(gè)強(qiáng)大的原語(yǔ), 允許將數(shù)據(jù)發(fā)送到外部系統(tǒng).但是, 了解如何正確有效地使用這個(gè)原語(yǔ)很重要. 避免一些常見(jiàn)的錯(cuò)誤如下.

通常向外部系統(tǒng)寫(xiě)入數(shù)據(jù)需要?jiǎng)?chuàng)建連接對(duì)象(例如與遠(yuǎn)程服務(wù)器的 TCP 連接), 并使用它將數(shù)據(jù)發(fā)送到遠(yuǎn)程系統(tǒng).為此, 開(kāi)發(fā)人員可能會(huì)無(wú)意中嘗試在Spark driver 中創(chuàng)建連接對(duì)象, 然后嘗試在Spark工作人員中使用它來(lái)在RDD中保存記錄.例如(在 Scala 中):

Scala

Java

Python

dstream.foreachRDD{rdd=>valconnection=createNewConnection()// executed at the driverrdd.foreach{record=>connection.send(record)// executed at the worker}}

這是不正確的, 因?yàn)檫@需要將連接對(duì)象序列化并從 driver 發(fā)送到 worker. 這種連接對(duì)象很少能跨機(jī)器轉(zhuǎn)移. 此錯(cuò)誤可能會(huì)顯示為序列化錯(cuò)誤(連接對(duì)象不可序列化), 初始化錯(cuò)誤(連接對(duì)象需要在 worker 初始化)等. 正確的解決方案是在 worker 創(chuàng)建連接對(duì)象.

但是, 這可能會(huì)導(dǎo)致另一個(gè)常見(jiàn)的錯(cuò)誤 - 為每個(gè)記錄創(chuàng)建一個(gè)新的連接. 例如:

Scala

Java

Python

dstream.foreachRDD{rdd=>rdd.foreach{record=>valconnection=createNewConnection()connection.send(record)connection.close()}}

通常, 創(chuàng)建連接對(duì)象具有時(shí)間和資源開(kāi)銷(xiāo). 因此, 創(chuàng)建和銷(xiāo)毀每個(gè)記錄的連接對(duì)象可能會(huì)引起不必要的高開(kāi)銷(xiāo), 并可顯著降低系統(tǒng)的總體吞吐量. 一個(gè)更好的解決方案是使用rdd.foreachPartition- 創(chuàng)建一個(gè)連接對(duì)象, 并使用該連接在 RDD 分區(qū)中發(fā)送所有記錄.

Scala

Java

Python

dstream.foreachRDD{rdd=>rdd.foreachPartition{partitionOfRecords=>valconnection=createNewConnection()partitionOfRecords.foreach(record=>connection.send(record))connection.close()}}

這樣可以在多個(gè)記錄上分?jǐn)傔B接創(chuàng)建開(kāi)銷(xiāo).

最后, 可以通過(guò)跨多個(gè)RDD /批次重用連接對(duì)象來(lái)進(jìn)一步優(yōu)化. 可以維護(hù)連接對(duì)象的靜態(tài)池, 而不是將多個(gè)批次的 RDD 推送到外部系統(tǒng)時(shí)重新使用, 從而進(jìn)一步減少開(kāi)銷(xiāo).

Scala

Java

Python

dstream.foreachRDD{rdd=>rdd.foreachPartition{partitionOfRecords=>// ConnectionPool is a static, lazily initialized pool of connectionsvalconnection=ConnectionPool.getConnection()partitionOfRecords.foreach(record=>connection.send(record))ConnectionPool.returnConnection(connection)// return to the pool for future reuse}}

請(qǐng)注意, 池中的連接應(yīng)根據(jù)需要懶惰創(chuàng)建, 如果不使用一段時(shí)間, 則會(huì)超時(shí). 這實(shí)現(xiàn)了最有效地將數(shù)據(jù)發(fā)送到外部系統(tǒng).

其他要記住的要點(diǎn):

DStreams 通過(guò)輸出操作進(jìn)行延遲執(zhí)行, 就像 RDD 由 RDD 操作懶惰地執(zhí)行. 具體來(lái)說(shuō), DStream 輸出操作中的 RDD 動(dòng)作強(qiáng)制處理接收到的數(shù)據(jù).因此, 如果您的應(yīng)用程序沒(méi)有任何輸出操作, 或者具有dstream.foreachRDD()等輸出操作, 而在其中沒(méi)有任何 RDD 操作, 則不會(huì)執(zhí)行任何操作.系統(tǒng)將簡(jiǎn)單地接收數(shù)據(jù)并將其丟棄.

默認(rèn)情況下, 輸出操作是 one-at-a-time 執(zhí)行的. 它們按照它們?cè)趹?yīng)用程序中定義的順序執(zhí)行.

DataFrame 和 SQL 操作

您可以輕松地在流數(shù)據(jù)上使用DataFrames and SQL和 SQL 操作. 您必須使用 StreamingContext 正在使用的 SparkContext 創(chuàng)建一個(gè) SparkSession.此外, 必須這樣做, 以便可以在 driver 故障時(shí)重新啟動(dòng). 這是通過(guò)創(chuàng)建一個(gè)簡(jiǎn)單實(shí)例化的 SparkSession 單例實(shí)例來(lái)實(shí)現(xiàn)的.這在下面的示例中顯示.它使用 DataFrames 和 SQL 來(lái)修改早期的字?jǐn)?shù)示例以生成單詞計(jì)數(shù).將每個(gè) RDD 轉(zhuǎn)換為 DataFrame, 注冊(cè)為臨時(shí)表, 然后使用 SQL 進(jìn)行查詢(xún).

Scala

Java

Python

/** DataFrame operations inside your streaming program */valwords:DStream[String]=...words.foreachRDD{rdd=>// Get the singleton instance of SparkSessionvalspark=SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()importspark.implicits._// Convert RDD[String] to DataFramevalwordsDataFrame=rdd.toDF("word")// Create a temporary viewwordsDataFrame.createOrReplaceTempView("words")// Do word count on DataFrame using SQL and print itvalwordCountsDataFrame=spark.sql("select word, count(*) as total from words group by word")wordCountsDataFrame.show()}

請(qǐng)參閱完整的源代碼.

您還可以對(duì)來(lái)自不同線(xiàn)程的流數(shù)據(jù)(即異步運(yùn)行的 StreamingContext )上定義的表運(yùn)行 SQL 查詢(xún). 只需確保您將 StreamingContext 設(shè)置為記住足夠數(shù)量的流數(shù)據(jù), 以便查詢(xún)可以運(yùn)行. 否則, 不知道任何異步 SQL 查詢(xún)的 StreamingContext 將在查詢(xún)完成之前刪除舊的流數(shù)據(jù). 例如, 如果要查詢(xún)最后一個(gè)批次, 但是您的查詢(xún)可能需要5分鐘才能運(yùn)行, 則可以調(diào)用streamingContext.remember(Minutes(5))(以 Scala 或其他語(yǔ)言的等價(jià)物).

有關(guān)DataFrames的更多信息, 請(qǐng)參閱DataFrames 和 SQL 指南.

MLlib 操作

您還可以輕松使用MLlib提供的機(jī)器學(xué)習(xí)算法. 首先, 有 streaming 機(jī)器學(xué)習(xí)算法(例如:Streaming 線(xiàn)性回歸,Streaming KMeans等), 其可以同時(shí)從 streaming 數(shù)據(jù)中學(xué)習(xí), 并將該模型應(yīng)用于 streaming 數(shù)據(jù). 除此之外, 對(duì)于更大類(lèi)的機(jī)器學(xué)習(xí)算法, 您可以離線(xiàn)學(xué)習(xí)一個(gè)學(xué)習(xí)模型(即使用歷史數(shù)據(jù)), 然后將該模型在線(xiàn)應(yīng)用于流數(shù)據(jù).有關(guān)詳細(xì)信息, 請(qǐng)參閱MLlib指南.

緩存 / 持久性

與 RDD 類(lèi)似, DStreams 還允許開(kāi)發(fā)人員將流的數(shù)據(jù)保留在內(nèi)存中. 也就是說(shuō), 在 DStream 上使用persist()方法會(huì)自動(dòng)將該 DStream 的每個(gè) RDD 保留在內(nèi)存中. 如果 DStream 中的數(shù)據(jù)將被多次計(jì)算(例如, 相同數(shù)據(jù)上的多個(gè)操作), 這將非常有用. 對(duì)于基于窗口的操作, 如reduceByWindow和reduceByKeyAndWindow以及基于狀態(tài)的操作, 如updateStateByKey, 這是隱含的.因此, 基于窗口的操作生成的 DStream 會(huì)自動(dòng)保存在內(nèi)存中, 而不需要開(kāi)發(fā)人員調(diào)用persist().

對(duì)于通過(guò)網(wǎng)絡(luò)接收數(shù)據(jù)(例如: Kafka, Flume, sockets 等)的輸入流, 默認(rèn)持久性級(jí)別被設(shè)置為將數(shù)據(jù)復(fù)制到兩個(gè)節(jié)點(diǎn)進(jìn)行容錯(cuò).

請(qǐng)注意, 與 RDD 不同, DStreams 的默認(rèn)持久性級(jí)別將數(shù)據(jù)序列化在內(nèi)存中. 這在性能調(diào)優(yōu)部分進(jìn)一步討論. 有關(guān)不同持久性級(jí)別的更多信息, 請(qǐng)參見(jiàn)Spark編程指南.

Checkpointing

streaming 應(yīng)用程序必須 24/7 運(yùn)行, 因此必須對(duì)應(yīng)用邏輯無(wú)關(guān)的故障(例如, 系統(tǒng)故障, JVM 崩潰等)具有彈性. 為了可以這樣做, Spark Streaming 需要checkpoint足夠的信息到容錯(cuò)存儲(chǔ)系統(tǒng), 以便可以從故障中恢復(fù).checkpoint有兩種類(lèi)型的數(shù)據(jù).

Metadata checkpointing- 將定義 streaming 計(jì)算的信息保存到容錯(cuò)存儲(chǔ)(如 HDFS)中.這用于從運(yùn)行 streaming 應(yīng)用程序的 driver 的節(jié)點(diǎn)的故障中恢復(fù)(稍后詳細(xì)討論). 元數(shù)據(jù)包括:

Configuration- 用于創(chuàng)建流應(yīng)用程序的配置.

DStream operations- 定義 streaming 應(yīng)用程序的 DStream 操作集.

Incomplete batches- 批量的job 排隊(duì)但尚未完成.

Data checkpointing- 將生成的 RDD 保存到可靠的存儲(chǔ).這在一些將多個(gè)批次之間的數(shù)據(jù)進(jìn)行組合的狀態(tài)變換中是必需的.在這種轉(zhuǎn)換中, 生成的 RDD 依賴(lài)于先前批次的 RDD, 這導(dǎo)致依賴(lài)鏈的長(zhǎng)度隨時(shí)間而增加.為了避免恢復(fù)時(shí)間的這種無(wú)限增加(與依賴(lài)關(guān)系鏈成比例), 有狀態(tài)轉(zhuǎn)換的中間 RDD 會(huì)定期checkpoint到可靠的存儲(chǔ)(例如 HDFS)以切斷依賴(lài)關(guān)系鏈.

總而言之, 元數(shù)據(jù) checkpoint 主要用于從 driver 故障中恢復(fù), 而數(shù)據(jù)或 RDD checkpoint 對(duì)于基本功能(如果使用有狀態(tài)轉(zhuǎn)換)則是必需的.

何時(shí)啟用 checkpoint

對(duì)于具有以下任一要求的應(yīng)用程序, 必須啟用 checkpoint:

使用狀態(tài)轉(zhuǎn)換- 如果在應(yīng)用程序中使用updateStateByKey或reduceByKeyAndWindow(具有反向功能), 則必須提供 checkpoint 目錄以允許定期的 RDD checkpoint.

從運(yùn)行應(yīng)用程序的 driver 的故障中恢復(fù)- 元數(shù)據(jù) checkpoint 用于使用進(jìn)度信息進(jìn)行恢復(fù).

請(qǐng)注意, 無(wú)需進(jìn)行上述有狀態(tài)轉(zhuǎn)換的簡(jiǎn)單 streaming 應(yīng)用程序即可運(yùn)行, 無(wú)需啟用 checkpoint. 在這種情況下, 驅(qū)動(dòng)器故障的恢復(fù)也將是部分的(一些接收但未處理的數(shù)據(jù)可能會(huì)丟失). 這通常是可以接受的, 許多運(yùn)行 Spark Streaming 應(yīng)用程序. 未來(lái)對(duì)非 Hadoop 環(huán)境的支持預(yù)計(jì)會(huì)有所改善.

如何配置 checkpoint

可以通過(guò)在保存 checkpoint 信息的容錯(cuò), 可靠的文件系統(tǒng)(例如, HDFS, S3等)中設(shè)置目錄來(lái)啟用 checkpoint. 這是通過(guò)使用streamingContext.checkpoint(checkpointDirectory)完成的. 這將允許您使用上述有狀態(tài)轉(zhuǎn)換. 另外, 如果要使應(yīng)用程序從 driver 故障中恢復(fù), 您應(yīng)該重寫(xiě) streaming 應(yīng)用程序以具有以下行為.

當(dāng)程序第一次啟動(dòng)時(shí), 它將創(chuàng)建一個(gè)新的 StreamingContext, 設(shè)置所有流, 然后調(diào)用 start().

當(dāng)程序在失敗后重新啟動(dòng)時(shí), 它將從 checkpoint 目錄中的 checkpoint 數(shù)據(jù)重新創(chuàng)建一個(gè) StreamingContext.

Scala

Java

Python

使用StreamingContext.getOrCreate可以簡(jiǎn)化此行為. 這樣使用如下.

// Function to create and setup a new StreamingContextdeffunctionToCreateContext():StreamingContext={valssc=newStreamingContext(...)// new contextvallines=ssc.socketTextStream(...)// create DStreams...ssc.checkpoint(checkpointDirectory)// set checkpoint directoryssc}// Get StreamingContext from checkpoint data or create a new onevalcontext=StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext_)// Do additional setup on context that needs to be done,// irrespective of whether it is being started or restartedcontext....// Start the contextcontext.start()context.awaitTermination()

If thecheckpointDirectoryexists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., running for the first time), then the functionfunctionToCreateContextwill be called to create a new context and set up the DStreams. See the Scala exampleRecoverableNetworkWordCount. This example appends the word counts of network data into a file.

除了使用getOrCreate之外, 還需要確保在失敗時(shí)自動(dòng)重新啟動(dòng) driver 進(jìn)程. 這只能由用于運(yùn)行應(yīng)用程序的部署基礎(chǔ)架構(gòu)完成. 這在部署部分進(jìn)一步討論.

請(qǐng)注意, RDD 的 checkpoint 會(huì)導(dǎo)致保存到可靠存儲(chǔ)的成本. 這可能會(huì)導(dǎo)致 RDD 得到 checkpoint 的批次的處理時(shí)間增加. 因此, 需要仔細(xì)設(shè)置 checkpoint 的間隔. 在小批量大忻又怠(例如: 1秒), 檢查每個(gè)批次可能會(huì)顯著降低操作吞吐量. 相反, checkpoint 太少會(huì)導(dǎo)致譜系和任務(wù)大小增長(zhǎng), 這可能會(huì)產(chǎn)生不利影響. 對(duì)于需要 RDD checkpoint 的狀態(tài)轉(zhuǎn)換, 默認(rèn)間隔是至少10秒的批間隔的倍數(shù). 它可以通過(guò)使用dstream.checkpoint(checkpointInterval)進(jìn)行設(shè)置. 通常, DStream 的5到10個(gè)滑動(dòng)間隔的 checkpoint 間隔是一個(gè)很好的設(shè)置.

Accumulators, Broadcast 變量, 和 Checkpoint

在Spark Streaming中, 無(wú)法從 checkpoint 恢復(fù)AccumulatorsBroadcast 變量. 如果啟用 checkpoint 并使用AccumulatorsBroadcast 變量, 則必須為AccumulatorsBroadcast 變量創(chuàng)建延遲實(shí)例化的單例實(shí)例, 以便在 driver 重新啟動(dòng)失敗后重新實(shí)例化. 這在下面的示例中顯示:

Scala

Java

Python

objectWordBlacklist{@volatileprivatevarinstance:Broadcast[Seq[String]]=nulldefgetInstance(sc:SparkContext):Broadcast[Seq[String]]={if(instance==null){synchronized{if(instance==null){valwordBlacklist=Seq("a","b","c")instance=sc.broadcast(wordBlacklist)}}}instance}}objectDroppedWordsCounter{@volatileprivatevarinstance:LongAccumulator=nulldefgetInstance(sc:SparkContext):LongAccumulator={if(instance==null){synchronized{if(instance==null){instance=sc.longAccumulator("WordsInBlacklistCounter")}}}instance}}wordCounts.foreachRDD{(rdd:RDD[(String,Int)],time:Time)=>// Get or register the blacklist Broadcastvalblacklist=WordBlacklist.getInstance(rdd.sparkContext)// Get or register the droppedWordsCounter AccumulatorvaldroppedWordsCounter=DroppedWordsCounter.getInstance(rdd.sparkContext)// Use blacklist to drop words and use droppedWordsCounter to count themvalcounts=rdd.filter{case(word,count)=>if(blacklist.value.contains(word)){droppedWordsCounter.add(count)false}else{true}}.collect().mkString("[",", ","]")valoutput="Counts at time "+time+" "+counts})

請(qǐng)參閱完整的源代碼.

應(yīng)用程序部署

本節(jié)討論部署 Spark Streaming 應(yīng)用程序的步驟.

要求

要運(yùn)行 Spark Streaming 應(yīng)用程序, 您需要具備以下功能.

集群管理器集群- 這是任何 Spark 應(yīng)用程序的一般要求, 并在部署指南中詳細(xì)討論.

打包應(yīng)用程序 JAR- 您必須將 streaming 應(yīng)用程序編譯為 JAR. 如果您正在使用spark-submit啟動(dòng)應(yīng)用程序, 則不需要在 JAR 中提供 Spark 和 Spark Streaming.但是, 如果您的應(yīng)用程序使用高級(jí)資源(例如: Kafka, Flume), 那么您將必須將他們鏈接的額外工件及其依賴(lài)項(xiàng)打包在用于部署應(yīng)用程序的 JAR 中.例如, 使用KafkaUtils的應(yīng)用程序必須在應(yīng)用程序 JAR 中包含spark-streaming-kafka-0-8_2.11及其所有傳遞依賴(lài)關(guān)系.

為 executor 配置足夠的內(nèi)存- 由于接收到的數(shù)據(jù)必須存儲(chǔ)在內(nèi)存中, 所以 executor 必須配置足夠的內(nèi)存來(lái)保存接收到的數(shù)據(jù). 請(qǐng)注意, 如果您正在進(jìn)行10分鐘的窗口操作, 系統(tǒng)必須至少保留最近10分鐘的內(nèi)存中的數(shù)據(jù). 因此, 應(yīng)用程序的內(nèi)存要求取決于其中使用的操作.

配置 checkpoint- 如果 streaming 應(yīng)用程序需要它, 則 Hadoop API 兼容容錯(cuò)存儲(chǔ)(例如:HDFS, S3等)中的目錄必須配置為 checkpoint 目錄, 并且流程應(yīng)用程序以 checkpoint 信息的方式編寫(xiě) 用于故障恢復(fù). 有關(guān)詳細(xì)信息, 請(qǐng)參閱checkpoint部分.

配置應(yīng)用程序 driver 的自動(dòng)重新啟動(dòng)- 要從 driver 故障自動(dòng)恢復(fù), 用于運(yùn)行流應(yīng)用程序的部署基礎(chǔ)架構(gòu)必須監(jiān)視 driver 進(jìn)程, 并在 driver 發(fā)生故障時(shí)重新啟動(dòng) driver.不同的集群管理者有不同的工具來(lái)實(shí)現(xiàn)這一點(diǎn).

Spark Standalone- 可以提交 Spark 應(yīng)用程序 driver 以在Spark Standalone集群中運(yùn)行(請(qǐng)參閱集群部署模式), 即應(yīng)用程序 driver 本身在其中一個(gè)工作節(jié)點(diǎn)上運(yùn)行. 此外, 可以指示獨(dú)立的群集管理器來(lái)監(jiān)督 driver, 如果由于非零退出代碼而導(dǎo)致 driver 發(fā)生故障, 或由于運(yùn)行 driver 的節(jié)點(diǎn)發(fā)生故障, 則可以重新啟動(dòng)它. 有關(guān)詳細(xì)信息, 請(qǐng)參閱 [Spark Standalone 指南]](spark-standalone.html) 中的群集模式和監(jiān)督.

YARN- Yarn 支持類(lèi)似的機(jī)制來(lái)自動(dòng)重新啟動(dòng)應(yīng)用程序.有關(guān)詳細(xì)信息, 請(qǐng)參閱 YARN文檔.

Mesos-Marathon已被用來(lái)實(shí)現(xiàn)這一點(diǎn)與Mesos.

配置預(yù)寫(xiě)日志- 自 Spark 1.2 以來(lái), 我們引入了寫(xiě)入日志來(lái)實(shí)現(xiàn)強(qiáng)大的容錯(cuò)保證.如果啟用, 則從 receiver 接收的所有數(shù)據(jù)都將寫(xiě)入配置 checkpoint 目錄中的寫(xiě)入日志.這可以防止 driver 恢復(fù)時(shí)的數(shù)據(jù)丟失, 從而確保零數(shù)據(jù)丟失(在容錯(cuò)語(yǔ)義部分中詳細(xì)討論).可以通過(guò)將配置參數(shù)spark.streaming.receiver.writeAheadLog.enable設(shè)置為true來(lái)啟用此功能.然而, 這些更強(qiáng)的語(yǔ)義可能以單個(gè) receiver 的接收吞吐量為代價(jià).通過(guò)并行運(yùn)行更多的 receiver可以糾正這一點(diǎn), 以增加總吞吐量.另外, 建議在啟用寫(xiě)入日志時(shí), 在日志已經(jīng)存儲(chǔ)在復(fù)制的存儲(chǔ)系統(tǒng)中時(shí), 禁用在 Spark 中接收到的數(shù)據(jù)的復(fù)制.這可以通過(guò)將輸入流的存儲(chǔ)級(jí)別設(shè)置為StorageLevel.MEMORY_AND_DISK_SER來(lái)完成.使用 S3(或任何不支持刷新的文件系統(tǒng))寫(xiě)入日志時(shí), 請(qǐng)記住啟用spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.有關(guān)詳細(xì)信息, 請(qǐng)參閱Spark Streaming配.請(qǐng)注意, 啟用 I/O 加密時(shí), Spark 不會(huì)將寫(xiě)入寫(xiě)入日志的數(shù)據(jù)加密.如果需要對(duì)提前記錄數(shù)據(jù)進(jìn)行加密, 則應(yīng)將其存儲(chǔ)在本地支持加密的文件系統(tǒng)中.

設(shè)置最大接收速率- 如果集群資源不夠大, streaming 應(yīng)用程序能夠像接收到的那樣快速處理數(shù)據(jù), 則可以通過(guò)設(shè)置 記錄/秒 的最大速率限制來(lái)對(duì) receiver 進(jìn)行速率限制. 請(qǐng)參閱 receiver 的spark.streaming.receiver.maxRate和用于 Direct Kafka 方法的spark.streaming.kafka.maxRatePerPartition的配置參數(shù). 在Spark 1.5中, 我們引入了一個(gè)稱(chēng)為背壓的功能, 無(wú)需設(shè)置此速率限制, 因?yàn)镾park Streaming會(huì)自動(dòng)計(jì)算速率限制, 并在處理?xiàng)l件發(fā)生變化時(shí)動(dòng)態(tài)調(diào)整速率限制. 可以通過(guò)將配置參數(shù)spark.streaming.backpressure.enabled設(shè)置為true來(lái)啟用此 backpressure.

升級(jí)應(yīng)用程序代碼

如果運(yùn)行的 Spark Streaming 應(yīng)用程序需要使用新的應(yīng)用程序代碼進(jìn)行升級(jí), 則有兩種可能的機(jī)制.

升級(jí)后的 Spark Streaming 應(yīng)用程序與現(xiàn)有應(yīng)用程序并行啟動(dòng)并運(yùn)行.一旦新的(接收與舊的數(shù)據(jù)相同的數(shù)據(jù))已經(jīng)升溫并準(zhǔn)備好黃金時(shí)段, 舊的可以被關(guān)掉.請(qǐng)注意, 這可以用于支持將數(shù)據(jù)發(fā)送到兩個(gè)目的地(即較早和已升級(jí)的應(yīng)用程序)的數(shù)據(jù)源.

現(xiàn)有應(yīng)用程序正常關(guān)閉(請(qǐng)參閱StreamingContext.stop(...)JavaStreamingContext.stop(...)以獲取正常的關(guān)閉選項(xiàng)), 以確保已關(guān)閉的數(shù)據(jù)在關(guān)閉之前被完全處理.然后可以啟動(dòng)升級(jí)的應(yīng)用程序, 這將從較早的應(yīng)用程序停止的同一點(diǎn)開(kāi)始處理.請(qǐng)注意, 只有在支持源端緩沖的輸入源(如: Kafka 和 Flume)時(shí)才可以進(jìn)行此操作, 因?yàn)閿?shù)據(jù)需要在先前的應(yīng)用程序關(guān)閉并且升級(jí)的應(yīng)用程序尚未啟動(dòng)時(shí)進(jìn)行緩沖.從升級(jí)前代碼的早期 checkpoint 信息重新啟動(dòng)不能完成.checkpoint 信息基本上包含序列化的 Scala/Java/Python 對(duì)象, 并嘗試使用新的修改的類(lèi)反序列化對(duì)象可能會(huì)導(dǎo)致錯(cuò)誤.在這種情況下, 可以使用不同的 checkpoint 目錄啟動(dòng)升級(jí)的應(yīng)用程序, 也可以刪除以前的 checkpoint 目錄.

Monitoring Applications (監(jiān)控應(yīng)用程序)

除了 Spark 的monitoring capabilities(監(jiān)控功能), 還有其他功能特定于 Spark Streaming .當(dāng)使用 StreamingContext 時(shí),Spark web UI顯示一個(gè)額外的Streaming選項(xiàng)卡, 顯示 running receivers (運(yùn)行接收器)的統(tǒng)計(jì)信息(無(wú)論是 receivers (接收器)是否處于 active (活動(dòng)狀態(tài)), 接收到的 records (記錄)數(shù), receiver error (接收器錯(cuò)誤)等)并完成 batches (批次)(batch processing times (批處理時(shí)間), queueing delays (排隊(duì)延遲)等).這可以用來(lái)監(jiān)視 streaming application (流應(yīng)用程序)的進(jìn)度.

web UI 中的以下兩個(gè) metrics (指標(biāo))特別重要:

Processing Time (處理時(shí)間)- 處理每 batch (批)數(shù)據(jù)的時(shí)間 .

Scheduling Delay (調(diào)度延遲)- batch (批處理)在 queue (隊(duì)列)中等待處理 previous batches (以前批次)完成的時(shí)間.

如果 batch processing time (批處理時(shí)間)始終 more than (超過(guò)) batch interval (批間隔) and/or queueing delay (排隊(duì)延遲)不斷增加, 表示系統(tǒng)是無(wú)法快速 process the batches (處理批次), 并且正在 falling behind (落后). 在這種情況下, 請(qǐng)考慮reducing (減少)batch processing time (批處理時(shí)間).

Spark Streaming 程序的進(jìn)展也可以使用StreamingListener接口, 這允許您獲得 receiver status (接收器狀態(tài))和 processing times (處理時(shí)間).請(qǐng)注意, 這是一個(gè)開(kāi)發(fā)人員 API 并且將來(lái)可能會(huì)改善(即, 更多的信息報(bào)告).

Performance Tuning (性能調(diào)優(yōu))

在集群上的 Spark Streaming application 中獲得最佳性能需要一些調(diào)整.本節(jié)介紹了可調(diào)整的多個(gè) parameters (參數(shù))和 configurations (配置)提高你的應(yīng)用程序性能.在高層次上, 你需要考慮兩件事情:

通過(guò)有效利用集群資源, Reducing the processing time of each batch of data (減少每批數(shù)據(jù)的處理時(shí)間).

設(shè)置正確的 batch size (批量大小), 以便 batches of data (批量的數(shù)據(jù))可以像 received (被接收)處理一樣快(即 data processing (數(shù)據(jù)處理)與 data ingestion (數(shù)據(jù)攝扰髂)保持一致).

Reducing the Batch Processing Times (減少批處理時(shí)間)

在 Spark 中可以進(jìn)行一些優(yōu)化, 以 minimize the processing time of each batch (最小化每批處理時(shí)間).這些已在Tuning Guide (調(diào)優(yōu)指南)中詳細(xì)討論過(guò).本節(jié)突出了一些最重要的.

Level of Parallelism in Data Receiving (數(shù)據(jù)接收中的并行級(jí)別)

通過(guò)網(wǎng)絡(luò)接收數(shù)據(jù)(如Kafka, Flume, socket 等)需要 deserialized (反序列化)數(shù)據(jù)并存儲(chǔ)在 Spark 中.如果數(shù)據(jù)接收成為系統(tǒng)的瓶頸, 那么考慮一下 parallelizing the data receiving (并行化數(shù)據(jù)接收).注意每個(gè) input DStream 創(chuàng)建接收 single stream of data (單個(gè)數(shù)據(jù)流)的 single receiver (單個(gè)接收器)(在 work machine 上運(yùn)行). 因此, 可以通過(guò)創(chuàng)建多個(gè) input DStreams 來(lái)實(shí)現(xiàn) Receiving multiple data streams (接收多個(gè)數(shù)據(jù)流)并配置它們以從 source(s) 接收 data stream (數(shù)據(jù)流)的 different partitions (不同分區(qū)).例如, 接收 two topics of data (兩個(gè)數(shù)據(jù)主題)的單個(gè)Kafka input DStream 可以分為兩個(gè) Kafka input streams (輸入流), 每個(gè)只接收一個(gè) topic (主題).這將運(yùn)行兩個(gè) receivers (接收器), 允許 in parallel (并行)接收數(shù)據(jù), 從而提高 overall throughput (總體吞吐量).這些 multiple DStreams 可以 unioned (聯(lián)合起來(lái))創(chuàng)建一個(gè) single DStream .然后 transformations (轉(zhuǎn)化)為應(yīng)用于 single input DStream 可以應(yīng)用于 unified stream .如下這樣做.

Scala

Java

Python

valnumStreams=5valkafkaStreams=(1tonumStreams).map{i=>KafkaUtils.createStream(...)}valunifiedStream=streamingContext.union(kafkaStreams)unifiedStream.print()

應(yīng)考慮的另一個(gè)參數(shù)是 receiver’s block interval (接收器的塊間隔), 這由configuration parameter (配置參數(shù))的spark.streaming.blockInterval決定.對(duì)于大多數(shù) receivers (接收器), 接收到的數(shù)據(jù) coalesced (合并)在一起存儲(chǔ)在 Spark 內(nèi)存之前的 blocks of data (數(shù)據(jù)塊).每個(gè) batch (批次)中的 blocks (塊)數(shù)確定將用于處理接收到的數(shù)據(jù)以 map-like (類(lèi)似與 map 形式的) transformation (轉(zhuǎn)換)的 task (任務(wù))的數(shù)量.每個(gè) receiver (接收器)每 batch (批次)的任務(wù)數(shù)量將是大約( batch interval (批間隔)/ block interval (塊間隔)).例如, 200 ms的 block interval (塊間隔)每 2 秒 batches (批次)創(chuàng)建 10 個(gè) tasks (任務(wù)).如果 tasks (任務(wù))數(shù)量太少(即少于每個(gè)機(jī)器的內(nèi)核數(shù)量), 那么它將無(wú)效, 因?yàn)樗锌捎玫膬?nèi)核都不會(huì)被使用處理數(shù)據(jù).要增加 given batch interval (給定批間隔)的 tasks (任務(wù))數(shù)量, 請(qǐng)減少 block interval (塊間??隔).但是, 推薦的 block interval (塊間隔)最小值約為 50ms , 低于此任務(wù)啟動(dòng)開(kāi)銷(xiāo)可能是一個(gè)問(wèn)題.

使用 multiple input streams (多個(gè)輸入流)/ receivers (接收器)接收數(shù)據(jù)的替代方法是明確 repartition (重新分配) input data stream (輸入數(shù)據(jù)流)(使用inputStream.repartition()). 這會(huì)在 further processing (進(jìn)一步處理)之前將 received batches of data (收到的批次數(shù)據(jù)) distributes (分發(fā))到集群中指定數(shù)量的計(jì)算機(jī).

Level of Parallelism in Data Processing (數(shù)據(jù)處理中的并行度水平)

如果在任何 computation (計(jì)算)階段中使用 number of parallel tasks (并行任務(wù)的數(shù)量), 則 Cluster resources (集群資源)可能未得到充分利用. 例如, 對(duì)于 distributed reduce (分布式 reduce)操作, 如reduceByKey和reduceByKeyAndWindow, 默認(rèn)并行任務(wù)的數(shù)量由spark.default.parallelismconfiguration property控制. 您 可以通過(guò) parallelism (并行度)作為參數(shù)(見(jiàn)PairDStreamFunctions文檔 ), 或設(shè)置spark.default.parallelismconfiguration property更改默認(rèn)值.

Data Serialization (數(shù)據(jù)序列化)

可以通過(guò)調(diào)優(yōu) serialization formats (序列化格式)來(lái)減少數(shù)據(jù) serialization (序列化)的開(kāi)銷(xiāo).在 streaming 的情況下, 有兩種類(lèi)型的數(shù)據(jù)被 serialized (序列化).

Input data (輸入數(shù)據(jù)): 默認(rèn)情況下, 通過(guò) Receivers 接收的 input data (輸入數(shù)據(jù))通過(guò)StorageLevel.MEMORY_AND_DISK_SER_2存儲(chǔ)在 executors 的內(nèi)存中.也就是說(shuō), 將數(shù)據(jù) serialized (序列化)為 bytes (字節(jié))以減少 GC 開(kāi)銷(xiāo), 并復(fù)制以容忍 executor failures (執(zhí)行器故障).此外, 數(shù)據(jù)首先保留在內(nèi)存中, 并且只有在內(nèi)存不足以容納 streaming computation (流計(jì)算)所需的所有輸入數(shù)據(jù)時(shí)才會(huì) spilled over (溢出)到磁盤(pán).這個(gè) serialization (序列化)顯然具有開(kāi)銷(xiāo) - receiver (接收器)必須使接收的數(shù)據(jù) deserialize (反序列化), 并使用 Spark 的 serialization format (序列化格式)重新序列化它.

Persisted RDDs generated by Streaming Operations (流式操作生成的持久 RDDs): 通過(guò) streaming computations (流式計(jì)算)生成的 RDD 可能會(huì)持久存儲(chǔ)在內(nèi)存中.例如, window operations (窗口操作)會(huì)將數(shù)據(jù)保留在內(nèi)存中, 因?yàn)樗鼈儗⒈惶幚矶啻?但是, 與StorageLevel.MEMORY_ONLY的 Spark Core 默認(rèn)情況不同, 通過(guò)流式計(jì)算生成的持久化 RDD 將以StorageLevel.MEMORY_ONLY_SER(即序列化), 以最小化 GC 開(kāi)銷(xiāo).

在這兩種情況下, 使用 Kryo serialization (Kryo 序列化)可以減少 CPU 和內(nèi)存開(kāi)銷(xiāo).有關(guān)詳細(xì)信息, 請(qǐng)參閱Spark Tuning Guide.對(duì)于 Kryo , 請(qǐng)考慮 registering custom classes , 并禁用對(duì)象引用跟蹤(請(qǐng)參閱Configuration Guide中的 Kryo 相關(guān)配置).

在 streaming application 需要保留的數(shù)據(jù)量不大的特定情況下, 可以將數(shù)據(jù)(兩種類(lèi)型)作為 deserialized objects (反序列化對(duì)象)持久化, 而不會(huì)導(dǎo)致過(guò)多的 GC 開(kāi)銷(xiāo).例如, 如果您使用幾秒鐘的 batch intervals (批次間隔)并且沒(méi)有 window operations (窗口操作), 那么可以通過(guò)明確地相應(yīng)地設(shè)置 storage level (存儲(chǔ)級(jí)別)來(lái)嘗試禁用 serialization in persisted data (持久化數(shù)據(jù)中的序列化).這將減少由于序列化造成的 CPU 開(kāi)銷(xiāo), 潛在地提高性能, 而不需要太多的 GC 開(kāi)銷(xiāo).

Task Launching Overheads (任務(wù)啟動(dòng)開(kāi)銷(xiāo))

如果每秒啟動(dòng)的任務(wù)數(shù)量很高(比如每秒 50 個(gè)或更多), 那么這個(gè)開(kāi)銷(xiāo)向 slaves 發(fā)送任務(wù)可能是重要的, 并且將難以實(shí)現(xiàn) sub-second latencies (次要的延遲).可以通過(guò)以下更改減少開(kāi)銷(xiāo):

Execution mode (執(zhí)行模式): 以 Standalone mode (獨(dú)立模式)或 coarse-grained Mesos 模式運(yùn)行 Spark 比 fine-grained Mesos 模式更好的任務(wù)啟動(dòng)時(shí)間.有關(guān)詳細(xì)信息, 請(qǐng)參閱Running on Mesos guide.

這些更改可能會(huì)將 batch processing time (批處理時(shí)間)縮短 100 毫秒, 從而允許 sub-second batch size (次秒批次大屑呕恪)是可行的.

Setting the Right Batch Interval (設(shè)置正確的批次間隔)

對(duì)于在集群上穩(wěn)定地運(yùn)行的 Spark Streaming application, 該系統(tǒng)應(yīng)該能夠處理數(shù)據(jù)盡可能快地被接收.換句話(huà)說(shuō), 應(yīng)該處理批次的數(shù)據(jù)就像生成它們一樣快.這是否適用于 application 可以在monitoringstreaming web UI 中的 processing times 中被找到, processing time (批處理處理時(shí)間)應(yīng)小于 batch interval (批間隔).

取決于 streaming computation (流式計(jì)算)的性質(zhì), 使用的 batch interval (批次間隔)可能對(duì)處理由應(yīng)用程序持續(xù)一組固定的 cluster resources (集群資源)的數(shù)據(jù)速率有重大的影響.例如, 讓我們考慮早期的 WordCountNetwork 示例.對(duì)于特定的 data rate (數(shù)據(jù)速率), 系統(tǒng)可能能夠跟蹤每 2 秒報(bào)告 word counts (即 2 秒的 batch interval (批次間隔)), 但不能每 500 毫秒.因此, 需要設(shè)置 batch interval (批次間隔), 使預(yù)期的數(shù)據(jù)速率在生產(chǎn)可以持續(xù).

為您的應(yīng)用程序找出正確的 batch size (批量大小)的一個(gè)好方法是使用進(jìn)行測(cè)試 conservative batch interval (保守的批次間隔)(例如 5-10 秒)和 low data rate (低數(shù)據(jù)速率).驗(yàn)證是否系統(tǒng)能夠跟上 data rate (數(shù)據(jù)速率), 可以檢查遇到的 end-to-end delay (端到端延遲)的值通過(guò)每個(gè) processed batch (處理的批次)(在 Spark driver log4j 日志中查找 “Total delay” , 或使用StreamingListener接口). 如果 delay (延遲)保持與 batch size (批量大械啡尽)相當(dāng), 那么系統(tǒng)是穩(wěn)定的.除此以外, 如果延遲不斷增加, 則意味著系統(tǒng)無(wú)法跟上, 因此不穩(wěn)定.一旦你有一個(gè) stable configuration (穩(wěn)定的配置)的想法, 你可以嘗試增加 data rate and/or 減少 batch size .請(qǐng)注意, momentary increase (瞬時(shí)增加)由于延遲暫時(shí)增加只要延遲降低到 low value (低值), 臨時(shí)數(shù)據(jù)速率增加就可以很好(即, 小于 batch size (批量大薪景辍)).

Memory Tuning (內(nèi)存調(diào)優(yōu))

調(diào)整 Spark 應(yīng)用程序的內(nèi)存使用情況和 GC behavior 已經(jīng)有很多的討論在Tuning Guide中.我們強(qiáng)烈建議您閱讀一下.在本節(jié)中, 我們將在 Spark Streaming applications 的上下文中討論一些 tuning parameters (調(diào)優(yōu)參數(shù)).

Spark Streaming application 所需的集群內(nèi)存量在很大程度上取決于所使用的 transformations 類(lèi)型.例如, 如果要在最近 10 分鐘的數(shù)據(jù)中使用 window operation (窗口操作), 那么您的集群應(yīng)該有足夠的內(nèi)存來(lái)容納內(nèi)存中 10 分鐘的數(shù)據(jù).或者如果要使用大量 keys 的updateStateByKey, 那么必要的內(nèi)存將會(huì)很高.相反, 如果你想做一個(gè)簡(jiǎn)單的 map-filter-store 操作, 那么所需的內(nèi)存就會(huì)很低.

一般來(lái)說(shuō), 由于通過(guò) receivers (接收器)接收的數(shù)據(jù)與 StorageLevel.MEMORY_AND_DISK_SER_2 一起存儲(chǔ), 所以不適合內(nèi)存的數(shù)據(jù)將會(huì) spill over (溢出)到磁盤(pán)上.這可能會(huì)降低 streaming application (流式應(yīng)用程序)的性能, 因此建議您提供足夠的 streaming application (流量應(yīng)用程序)所需的內(nèi)存.最好仔細(xì)查看內(nèi)存使用量并相應(yīng)地進(jìn)行估算.

memory tuning (內(nèi)存調(diào)優(yōu))的另一個(gè)方面是 garbage collection (垃圾收集).對(duì)于需要低延遲的 streaming application , 由 JVM Garbage Collection 引起的大量暫停是不希望的.

有幾個(gè) parameters (參數(shù))可以幫助您調(diào)整 memory usage (內(nèi)存使用量)和 GC 開(kāi)銷(xiāo):

Persistence Level of DStreams (DStreams 的持久性級(jí)別): 如前面在Data Serialization部分中所述, input data 和 RDD 默認(rèn)保持為 serialized bytes (序列化字節(jié)).與 deserialized persistence (反序列化持久性)相比, 這減少了內(nèi)存使用量和 GC 開(kāi)銷(xiāo).啟用 Kryo serialization 進(jìn)一步減少了 serialized sizes (序列化大小)和 memory usage (內(nèi)存使用).可以通過(guò) compression (壓縮)來(lái)實(shí)現(xiàn)內(nèi)存使用的進(jìn)一步減少(參見(jiàn)Spark配置spark.rdd.compress), 代價(jià)是 CPU 時(shí)間.

Clearing old data (清除舊數(shù)據(jù)): 默認(rèn)情況下, DStream 轉(zhuǎn)換生成的所有 input data 和 persisted RDDs 將自動(dòng)清除. Spark Streaming 決定何時(shí)根據(jù)所使用的 transformations (轉(zhuǎn)換)來(lái)清除數(shù)據(jù).例如, 如果您使用 10 分鐘的 window operation (窗口操作), 則 Spark Streaming 將保留最近 10 分鐘的數(shù)據(jù), 并主動(dòng)丟棄舊數(shù)據(jù). 數(shù)據(jù)可以通過(guò)設(shè)置streamingContext.remember保持更長(zhǎng)的持續(xù)時(shí)間(例如交互式查詢(xún)舊數(shù)據(jù)).

CMS Garbage Collector (CMS垃圾收集器): 強(qiáng)烈建議使用 concurrent mark-and-sweep GC , 以保持 GC 相關(guān)的暫停始終如一.即使 concurrent GC 已知可以減少 系統(tǒng)的整體處理吞吐量, 其使用仍然建議實(shí)現(xiàn)更多一致的 batch processing times (批處理時(shí)間).確保在 driver (使用--driver-java-options在spark-submit中 )和 executors (使用Spark configurationspark.executor.extraJavaOptions)中設(shè)置 CMS GC.

Other tips (其他提示): 為了進(jìn)一步降低 GC 開(kāi)銷(xiāo), 以下是一些更多的提示.

使用OFF_HEAP存儲(chǔ)級(jí)別的保持 RDDs .在Spark Programming Guide中查看更多詳細(xì)信息.

使用更小的 heap sizes 的 executors.這將降低每個(gè) JVM heap 內(nèi)的 GC 壓力.

Important points to remember(要記住的要點(diǎn)):

DStream 與 single receiver (單個(gè)接收器)相關(guān)聯(lián).為了獲得讀取并行性, 需要?jiǎng)?chuàng)建多個(gè) receivers , 即 multiple DStreams .receiver 在一個(gè) executor 中運(yùn)行.它占據(jù)一個(gè) core (內(nèi)核).確保在 receiver slots are booked 后有足夠的內(nèi)核進(jìn)行處理, 即spark.cores.max應(yīng)該考慮 receiver slots . receivers 以循環(huán)方式分配給 executors .

當(dāng)從 stream source 接收到數(shù)據(jù)時(shí), receiver 創(chuàng)建數(shù)據(jù) blocks (塊).每個(gè) blockInterval 毫秒生成一個(gè)新的數(shù)據(jù)塊.在 N = batchInterval/blockInterval 的 batchInterval 期間創(chuàng)建 N 個(gè)數(shù)據(jù)塊.這些塊由當(dāng)前 executor 的 BlockManager 分發(fā)給其他執(zhí)行程序的 block managers .之后, 在驅(qū)動(dòng)程序上運(yùn)行的 Network Input Tracker (網(wǎng)絡(luò)輸入跟蹤器)通知有關(guān)進(jìn)一步處理的塊位置

在驅(qū)動(dòng)程序中為在 batchInterval 期間創(chuàng)建的塊創(chuàng)建一個(gè) RDD .在 batchInterval 期間生成的塊是 RDD 的 partitions .每個(gè)分區(qū)都是一個(gè) spark 中的 task. blockInterval == batchinterval 意味著創(chuàng)建 single partition (單個(gè)分區(qū)), 并且可能在本地進(jìn)行處理.

除非 non-local scheduling (非本地調(diào)度)進(jìn)行, 否則塊上的 map tasks (映射任務(wù))將在 executors (接收 block, 復(fù)制塊的另一個(gè)塊)中進(jìn)行處理.具有更大的 block interval (塊間隔)意味著更大的塊.spark.locality.wait的高值增加了處理 local node (本地節(jié)點(diǎn))上的塊的機(jī)會(huì).需要在這兩個(gè)參數(shù)之間找到平衡, 以確保在本地處理較大的塊.

而不是依賴(lài)于 batchInterval 和 blockInterval , 您可以通過(guò)調(diào)用inputDstream.repartition(n)來(lái)定義 number of partitions (分區(qū)數(shù)).這樣可以隨機(jī)重新組合 RDD 中的數(shù)據(jù), 創(chuàng)建 n 個(gè)分區(qū).是的, 為了更大的 parallelism (并行性).雖然是 shuffle 的代價(jià). RDD 的處理由 driver’s jobscheduler 作為一項(xiàng)工作安排.在給定的時(shí)間點(diǎn), 只有一個(gè) job 是 active 的.因此, 如果一個(gè)作業(yè)正在執(zhí)行, 則其他作業(yè)將排隊(duì).

如果您有兩個(gè) dstream , 將會(huì)有兩個(gè) RDD 形成, 并且將創(chuàng)建兩個(gè)將被安排在另一個(gè)之后的作業(yè).為了避免這種情況, 你可以聯(lián)合兩個(gè) dstream .這將確保為 dstream 的兩個(gè) RDD 形成一個(gè) unionRDD .這個(gè) unionRDD 然后被認(rèn)為是一個(gè) single job (單一的工作).但 RDD 的 partitioning (分區(qū))不受影響.

如果 batch processing time (批處理時(shí)間)超過(guò) batchinterval (批次間隔), 那么顯然 receiver 的內(nèi)存將會(huì)開(kāi)始填滿(mǎn), 最終會(huì)拋出 exceptions (最可能是 BlockNotFoundException ).目前沒(méi)有辦法暫停 receiver .使用 SparkConf 配置spark.streaming.receiver.maxRate, receiver 的 rate 可以受到限制.

Fault-tolerance Semantics (容錯(cuò)語(yǔ)義)

在本節(jié)中, 我們將討論 Spark Streaming applications 在該 event 中的行為的失敗.

Background(背景)

要了解 Spark Streaming 提供的語(yǔ)義, 請(qǐng)記住 Spark 的 RDD 的基本 fault-tolerance semantics (容錯(cuò)語(yǔ)義).

RDD 是一個(gè)不可變的, 確定性地可重新計(jì)算的分布式數(shù)據(jù)集.每個(gè)RDD 記住在容錯(cuò)輸入中使用的確定性操作的 lineage 數(shù)據(jù)集創(chuàng)建它.

如果 RDD 的任何 partition 由于工作節(jié)點(diǎn)故障而丟失, 則該分區(qū)可以是 從 original fault-tolerant dataset (原始容錯(cuò)數(shù)據(jù)集)中使用業(yè)務(wù)流程重新計(jì)算.

假設(shè)所有的 RDD transformations 都是確定性的, 最后的數(shù)據(jù)被轉(zhuǎn)換, 無(wú)論 Spark 集群中的故障如何, RDD 始終是一樣的.

Spark 運(yùn)行在容錯(cuò)文件系統(tǒng)(如 HDFS 或 S3 )中的數(shù)據(jù)上.因此, 從容錯(cuò)數(shù)據(jù)生成的所有 RDD 也都是容錯(cuò)的.但是, 這不是在大多數(shù)情況下, Spark Streaming 作為數(shù)據(jù)的情況通過(guò)網(wǎng)絡(luò)接收(除非fileStream被使用).為了為所有生成的 RDD 實(shí)現(xiàn)相同的 fault-tolerance properties (容錯(cuò)屬性), 接收的數(shù)據(jù)在集群中的工作節(jié)點(diǎn)中的多個(gè) Spark executors 之間進(jìn)行復(fù)制(默認(rèn) replication factor (備份因子)為 2).這導(dǎo)致了發(fā)生故障時(shí)需要恢復(fù)的系統(tǒng)中的兩種數(shù)據(jù):

Data received and replicated (數(shù)據(jù)接收和復(fù)制)- 這個(gè)數(shù)據(jù)在單個(gè)工作節(jié)點(diǎn)作為副本的故障中幸存下來(lái), 它存在于其他節(jié)點(diǎn)之一上.

Data received but buffered for replication (接收數(shù)據(jù)但緩沖進(jìn)行復(fù)制)- 由于不復(fù)制, 恢復(fù)此數(shù)據(jù)的唯一方法是從 source 重新獲取.

此外, 我們應(yīng)該關(guān)注的有兩種 failures:

Failure of a Worker Node (工作節(jié)點(diǎn)的故障)- 運(yùn)行 executors 的任何工作節(jié)點(diǎn)都可能會(huì)故障, 并且這些節(jié)點(diǎn)上的所有內(nèi)存中數(shù)據(jù)將丟失.如果任何 receivers 運(yùn)行在失敗節(jié)點(diǎn), 則它們的 buffered (緩沖)數(shù)據(jù)將丟失.

Failure of the Driver Node (Driver 節(jié)點(diǎn)的故障)- 如果運(yùn)行 Spark Streaming application 的 driver node 發(fā)生了故障, 那么顯然 SparkContext 丟失了, 所有的 executors 和其內(nèi)存中的數(shù)據(jù)也一起丟失了.

有了這個(gè)基礎(chǔ)知識(shí), 讓我們了解 Spark Streaming 的 fault-tolerance semantics (容錯(cuò)語(yǔ)義).

Definitions (定義)

streaming systems (流系統(tǒng))的語(yǔ)義通常是通過(guò)系統(tǒng)可以處理每個(gè)記錄的次數(shù)來(lái)捕獲的.系統(tǒng)可以在所有可能的操作條件下提供三種類(lèi)型的保證(盡管有故障等).

At most once (最多一次): 每個(gè) record (記錄)將被處理一次或根本不處理.

At least once (至少一次): 每個(gè) record (記錄)將被處理一次或多次.這比at-most once, 因?yàn)樗_保沒(méi)有數(shù)據(jù)將丟失.但可能有重復(fù).

Exactly once(有且僅一次): 每個(gè) record (記錄) 將被精確處理一次 - 沒(méi)有數(shù)據(jù)丟失, 數(shù)據(jù)不會(huì)被多次處理.這顯然是三者的最強(qiáng)保證.

Basic Semantics (基本語(yǔ)義)

在任何 stream processing system (流處理系統(tǒng))中, 廣義上說(shuō), 處理數(shù)據(jù)有三個(gè)步驟.

Receiving the data (接收數(shù)據(jù)): 使用 Receivers 或其他方式從數(shù)據(jù)源接收數(shù)據(jù).

Transforming the data (轉(zhuǎn)換數(shù)據(jù)): 使用 DStream 和 RDD transformations 來(lái) transformed (轉(zhuǎn)換)接收到的數(shù)據(jù).

Pushing out the data (推出數(shù)據(jù)): 最終的轉(zhuǎn)換數(shù)據(jù)被推出到 external systems (外部系統(tǒng)), 如 file systems (文件系統(tǒng)), databases (數(shù)據(jù)庫(kù)), dashboards (儀表板)等.

如果 streaming application 必須實(shí)現(xiàn) end-to-end exactly-once guarantees (端到端的一次且僅一次性保證), 那么每個(gè)步驟都必須提供 exactly-once guarantee .也就是說(shuō), 每個(gè)記錄必須被精確地接收一次, 轉(zhuǎn)換完成一次, 并被推送到下游系統(tǒng)一次.讓我們?cè)?Spark Streaming 的上下文中了解這些步驟的語(yǔ)義.

Receiving the data (接收數(shù)據(jù)): 不同的 input sources 提供不同的保證.這將在下一小節(jié)中詳細(xì)討論.

Transforming the data (轉(zhuǎn)換數(shù)據(jù)): 所有已收到的數(shù)據(jù)都將被處理exactly once, 這得益于 RDD 提供的保證.即使存在故障, 只要接收到的輸入數(shù)據(jù)可訪問(wèn), 最終變換的 RDD 將始終具有相同的內(nèi)容.

Pushing out the data (推出數(shù)據(jù)): 默認(rèn)情況下的輸出操作確保at-least once語(yǔ)義, 因?yàn)樗Q于輸出操作的類(lèi)型( idempotent (冪等))或 downstream system (下游系統(tǒng))的語(yǔ)義(是否支持 transactions (事務(wù))).但用戶(hù)可以實(shí)現(xiàn)自己的事務(wù)機(jī)制來(lái)實(shí)現(xiàn)exactly-once語(yǔ)義.這將在本節(jié)后面的更多細(xì)節(jié)中討論.

Semantics of Received Data (接收數(shù)據(jù)的語(yǔ)義)

不同的 input sources (輸入源)提供不同的保證, 范圍從at-least once到exactly once.

With Files

如果所有的 input data (輸入數(shù)據(jù))都已經(jīng)存在于 fault-tolerant file system (容錯(cuò)文件系統(tǒng))中 HDFS , Spark Streaming 可以隨時(shí)從任何故障中恢復(fù)并處理所有數(shù)據(jù).這給了exactly-once語(yǔ)義, 意味著無(wú)論什么故障, 所有的數(shù)據(jù)將被精確處理一次.

With Receiver-based Sources (使用基于接收器的數(shù)據(jù)源)

對(duì)于基于 receivers (接收器)的 input sources (輸入源), 容錯(cuò)語(yǔ)義取決于故障場(chǎng)景和接收器的類(lèi)型. 正如我們earlier討論的, 有兩種類(lèi)型的 receivers (接收器):

Reliable Receiver (可靠的接收器)- 這些 receivers (接收機(jī))只有在確認(rèn)收到的數(shù)據(jù)已被復(fù)制之后確認(rèn) reliable sources (可靠的源).如果這樣的接收器出現(xiàn)故障, source 將不會(huì)被接收對(duì)于 buffered (unreplicated) data (緩沖(未復(fù)制)數(shù)據(jù))的確認(rèn).因此, 如果 receiver 是重新啟動(dòng), source 將重新發(fā)送數(shù)據(jù), 并且不會(huì)由于故障而丟失數(shù)據(jù).

Unreliable Receiver (不可靠的接收器)- 這樣的接收器不會(huì)發(fā)送確認(rèn), 因此可能丟失數(shù)據(jù), 由于 worker 或 driver 故障.

根據(jù)使用的 receivers 類(lèi)型, 我們實(shí)現(xiàn)以下語(yǔ)義. 如果 worker node 出現(xiàn)故障, 則 reliable receivers 沒(méi)有數(shù)據(jù)丟失.unreliable receivers , 收到但未復(fù)制的數(shù)據(jù)可能會(huì)丟失.如果 driver node 失敗, 那么除了這些損失之外, 在內(nèi)存中接收和復(fù)制的所有過(guò)去的數(shù)據(jù)將丟失.這將影響 stateful transformations (有狀態(tài)轉(zhuǎn)換)的結(jié)果.

為避免過(guò)去收到的數(shù)據(jù)丟失, Spark 1.2 引入了_write ahead logs_ 將接收到的數(shù)據(jù)保存到 fault-tolerant storage (容錯(cuò)存儲(chǔ)).用write ahead logs enabled和 reliable receivers, 數(shù)據(jù)沒(méi)有丟失.在語(yǔ)義方面, 它提供 at-least once guarantee (至少一次保證).

下表總結(jié)了失敗的語(yǔ)義:

Deployment Scenario (部署場(chǎng)景)Worker Failure (Worker 故障)Driver Failure (Driver 故障)

Spark 1.1 或更早版本,或者

Spark 1.2 或者沒(méi)有 write ahead logs 的更高的版本Buffered data lost with unreliable receivers(unreliable receivers 的緩沖數(shù)據(jù)丟失)

Zero data loss with reliable receivers (reliable receivers 的零數(shù)據(jù)丟失)

At-least once semantics (至少一次性語(yǔ)義)Buffered data lost with unreliable receivers (unreliable receivers 的緩沖數(shù)據(jù)丟失)

Past data lost with all receivers (所有的 receivers 的過(guò)去的數(shù)據(jù)丟失)

Undefined semantics (未定義語(yǔ)義)

Spark 1.2 或者帶有 write ahead logs 的更高版本Zero data loss with reliable receivers(reliable receivers 的零數(shù)據(jù)丟失)

At-least once semantics (至少一次性語(yǔ)義)Zero data loss with reliable receivers and files (reliable receivers 和 files 的零數(shù)據(jù)丟失)

At-least once semantics (至少一次性語(yǔ)義)

With Kafka Direct API (使用 Kafka Direct API)

在 Spark 1.3 中, 我們引入了一個(gè)新的 Kafka Direct API , 可以確保所有的 Kafka 數(shù)據(jù)都被 Spark Streaming exactly once (一次)接收.與此同時(shí), 如果您實(shí)現(xiàn) exactly-once output operation (一次性輸出操作), 您可以實(shí)現(xiàn) end-to-end exactly-once guarantees (端到端的一次且僅一次性保證).在Kafka Integration Guide中進(jìn)一步討論了這種方法.

Semantics of output operations (輸出操作的語(yǔ)義)

Output operations (輸出操作)(如foreachRDD)具有at-least once語(yǔ)義, 也就是說(shuō), transformed data (變換后的數(shù)據(jù))可能會(huì)不止一次寫(xiě)入 external entity (外部實(shí)體)在一個(gè) worker 故障事件中.雖然這是可以接受的使用saveAs***Files操作(因?yàn)槲募⒈幌嗤臄?shù)據(jù)簡(jiǎn)單地覆蓋) 保存到文件系統(tǒng), 可能需要額外的努力來(lái)實(shí)現(xiàn) exactly-once (一次且僅一次)語(yǔ)義.有兩種方法.

Idempotent updates (冪等更新): 多次嘗試總是寫(xiě)入相同的數(shù)據(jù).例如,saveAs***Files總是將相同的數(shù)據(jù)寫(xiě)入生成的文件.

Transactional updates (事務(wù)更新): 所有更新都是事務(wù)性的, 以便更新完全按原子進(jìn)行.這樣做的一個(gè)方法如下.

使用批處理時(shí)間(在foreachRDD中可用)和 RDD 的 partition index (分區(qū)索引)來(lái)創(chuàng)建 identifier (標(biāo)識(shí)符).該標(biāo)識(shí)符唯一地標(biāo)識(shí) streaming application 中的 blob 數(shù)據(jù).

使用該 identifier (標(biāo)識(shí)符)blob transactionally (blob 事務(wù)地)更新 external system (外部系統(tǒng))(即, exactly once, atomically (一次且僅一次, 原子性地)).也就是說(shuō), 如果 identifier (標(biāo)識(shí)符)尚未提交, 則以 atomically (原子方式)提交 partition data (分區(qū)數(shù)據(jù))和 identifier (標(biāo)識(shí)符).否則, 如果已經(jīng)提交, 請(qǐng)?zhí)^(guò)更新.

dstream.foreachRDD { (rdd, time) =>

rdd.foreachPartition { partitionIterator =>

val partitionId = TaskContext.get.partitionId()

val uniqueId = generateUniqueId(time.milliseconds, partitionId)

// use this uniqueId to transactionally commit the data in partitionIterator

}

}

快速鏈接

附加指南

Kafka 集成指南

Kinesis 集成指南

自定義 Receiver(接收器)指南

第三方 DStream 數(shù)據(jù)源可以在第三方項(xiàng)目上查看.

API 文檔

Scala 文檔

StreamingContextDStream

KafkaUtils,FlumeUtils,KinesisUtils,

Java 文檔

JavaStreamingContext,JavaDStreamJavaPairDStream

KafkaUtils,FlumeUtils,KinesisUtils

Python 文檔

StreamingContextDStream

KafkaUtils

更多的示例在ScalaJavaPython

描述 Spark Streaming 的Papervideo.

我們一直在努力

apachecn/spark-doc-zh

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/streaming-programming-guide.html

網(wǎng)頁(yè)地址: http://spark.apachecn.org/

github: https://github.com/apachecn/spark-doc-zh(覺(jué)得不錯(cuò)麻煩給個(gè) Star耍攘,謝謝榕栏!~)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末畔勤,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子扒磁,更是在濱河造成了極大的恐慌庆揪,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件妨托,死亡現(xiàn)場(chǎng)離奇詭異缸榛,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)兰伤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)内颗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人敦腔,你說(shuō)我怎么就攤上這事均澳。” “怎么了符衔?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵找前,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我柏腻,道長(zhǎng)纸厉,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任五嫂,我火速辦了婚禮颗品,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘沃缘。我一直安慰自己躯枢,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布槐臀。 她就那樣靜靜地躺著锄蹂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪水慨。 梳的紋絲不亂的頭發(fā)上得糜,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音晰洒,去河邊找鬼朝抖。 笑死,一個(gè)胖子當(dāng)著我的面吹牛谍珊,可吹牛的內(nèi)容都是我干的治宣。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼侮邀!你這毒婦竟也來(lái)了坏怪?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤绊茧,失蹤者是張志新(化名)和其女友劉穎铝宵,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體按傅,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡捉超,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了唯绍。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡枝誊,死狀恐怖况芒,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情叶撒,我是刑警寧澤绝骚,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站祠够,受9級(jí)特大地震影響压汪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜古瓤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一止剖、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧落君,春花似錦穿香、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至纹冤,卻和暖如春洒宝,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背萌京。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工雁歌, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人枫夺。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓将宪,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子较坛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容