Apache Spark 2.2.0 中文文檔 - Structured Streaming 編程指南 | ApacheCN

Structured Streaming 編程指南

概述

快速示例

Programming Model (編程模型)

基本概念

處理 Event-time 和延遲數(shù)據(jù)

容錯語義

API 使用 Datasets 和 DataFrames

創(chuàng)建 streaming DataFrames 和 streaming Datasets

Input Sources (輸入源)

streaming DataFrames/Datasets 的模式接口和分區(qū)

streaming DataFrames/Datasets 上的操作

基礎(chǔ)操作 - Selection, Projection, Aggregation

Window Operations on Event Time (事件時間窗口操作)

處理 Late Data (遲到數(shù)據(jù))和 Watermarking (水酉芙肌)

Join 操作

Streaming Deduplication (Streaming 去重)

Arbitrary Stateful Operations (任意有狀態(tài)的操作)

不支持的操作

開始 Streaming Queries

Output Modes (輸出模式)

Output Sinks (輸出接收器)

使用 Foreach

管理 Streaming Queries

監(jiān)控 Streaming Queries

Interactive APIs

Asynchronous API

Recovering from Failures with Checkpointing (從檢查點恢復(fù)故障)

從這里去哪兒

概述

Structured Streaming (結(jié)構(gòu)化流)是一種基于 Spark SQL 引擎構(gòu)建的可擴展且容錯的 stream processing engine (流處理引擎)自赔。您可以以靜態(tài)數(shù)據(jù)表示批量計算的方式來表達 streaming computation (流式計算)。 Spark SQL 引擎將隨著 streaming data 持續(xù)到達而增量地持續(xù)地運行,并更新最終結(jié)果。您可以使用 Scala 壕吹, Java , Python 或 R 中的Dataset/DataFrame API來表示 streaming aggregations (流聚合)蓬蝶, event-time windows (事件時間窗口)读处, stream-to-batch joins (流到批處理連接) 等糊治。在同一個 optimized Spark SQL engine (優(yōu)化的 Spark SQL 引擎)上執(zhí)行計算。最后罚舱,系統(tǒng)通過 checkpointing (檢查點) 和 Write Ahead Logs (預(yù)寫日志)來確保 end-to-end exactly-once (端到端的完全一次性) 容錯保證井辜。簡而言之绎谦,Structured Streaming 提供快速,可擴展粥脚,容錯窃肠,end-to-end exactly-once stream processing (端到端的完全一次性流處理),而無需用戶理解 streaming 刷允。

在本指南中铭拧,我們將向您介紹 programming model (編程模型) 和 APIs 。首先恃锉,我們從一個簡單的例子開始 - 一個 streaming word count 搀菩。

快速示例

假設(shè)您想要保持從監(jiān)聽 TCP socket 的 data server (數(shù)據(jù)服務(wù)器) 接收的 text data (文本數(shù)據(jù))的運行的 word count 。 讓我們看看如何使用 Structured Streaming 表達這一點破托。你可以在Scala/Java/Python/R之中看到完整的代碼肪跋。 Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code inScala/Java/Python/R。 并且如果您下載 Spark土砂,您可以直接運行這個例子州既。在任何情況下,讓我們逐步了解示例并了解它的工作原理萝映。首先吴叶,我們必須導(dǎo)入必要的 classes 并創(chuàng)建一個本地的 SparkSession ,這是與 Spark 相關(guān)的所有功能的起點序臂。

Scala

Java

Python

R

importorg.apache.spark.sql.functions._importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()importspark.implicits._

接下來蚌卤,我們創(chuàng)建一個 streaming DataFrame ,它表示從監(jiān)聽 localhost:9999 的服務(wù)器上接收的 text data (文本數(shù)據(jù))奥秆,并且將 DataFrame 轉(zhuǎn)換以計算 word counts 逊彭。

Scala

Java

Python

R

// 創(chuàng)建表示從連接到 localhost:9999 的輸入行 stream 的 DataFramevallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()// 將 lines 切分為 wordsvalwords=lines.as[String].flatMap(_.split(" "))// 生成正在運行的 word countvalwordCounts=words.groupBy("value").count()

這個linesDataFrame 表示一個包含包含 streaming text data (流文本數(shù)據(jù)) 的無邊界表。此表包含了一列名為 “value” 的 strings 构订,并且 streaming text data 中的每一 line (行)都將成為表中的一 row (行)。請注意囊榜,這并不是正在接收的任何數(shù)據(jù)亥宿,因為我們只是設(shè)置 transformation (轉(zhuǎn)換),還沒有開始孔庭。接下來,我們使用.as[String]將 DataFrame 轉(zhuǎn)換為 String 的 Dataset 圆到,以便我們可以應(yīng)用flatMap操作將每 line (行)切分成多個 words 马绝。所得到的wordsDataset 包含所有的 words 富稻。最后椭赋,我們通過將 Dataset 中 unique values (唯一的值)進行分組并對它們進行計數(shù)來定義wordCountsDataFrame 。請注意,這是一個 streaming DataFrame 叉信,它表示 stream 的正在運行的 word counts 艘希。

我們現(xiàn)在已經(jīng)設(shè)置了關(guān)于 streaming data (流數(shù)據(jù))的 query (查詢)鸠姨。剩下的就是實際開始接收數(shù)據(jù)并計算 counts (計數(shù))连茧。為此客扎,我們將其設(shè)置為在每次更新時將完整地計數(shù)(由outputMode("complete")指定)發(fā)送到控制臺。然后使用start()啟動 streaming computation (流式計算)袱吆。

Scala

Java

Python

R

// 開始運行將 running counts 打印到控制臺的查詢valquery=wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()

執(zhí)行此代碼之后婶希, streaming computation (流式計算) 將在后臺啟動。query對象是該 active streaming query (活動流查詢)的 handle (句柄),并且我們決定使用awaitTermination()來等待查詢的終止龄砰,以防止查詢處于 active (活動)狀態(tài)時退出。

要實際執(zhí)行此示例代碼,您可以在您自己的Spark 應(yīng)用程序編譯代碼夕玩,或者簡單地運行示例一旦您下載了 Spark 。我們正在展示的是后者。您將首先需要運行 Netcat (大多數(shù)類 Unix 系統(tǒng)中的一個小型應(yīng)用程序)作為 data server 通過使用

$ nc -lk 9999

然后爆侣,在一個不同的終端,您可以啟動示例通過使用

Scala

Java

Python

R

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost9999

然后,在運行 netcat 服務(wù)器的終端中輸入的任何 lines 將每秒計數(shù)并打印在屏幕上榕吼。它看起來像下面這樣嗤堰。

# 終端 1:# 運行 Netcat$ nc -lk9999apache sparkapache hadoop...

Scala

Java

Python

R

# 終端 2: 運行 StructuredNetworkWordCount$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost9999-------------------------------------------Batch:0-------------------------------------------+------+-----+|value|count|+------+-----+|apache|1||spark|1|+------+-----+-------------------------------------------Batch:1-------------------------------------------+------+-----+|value|count|+------+-----+|apache|2||spark|1||hadoop|1|+------+-----+...

Programming Model (編程模型)

Structured Streaming 的關(guān)鍵思想是將 live data stream (實時數(shù)據(jù)流)視為一種正在不斷 appended (附加)的表戈抄。這形成了一個與 batch processing model (批處理模型)非常相似的新的 stream processing model (流處理模型)输莺。您會將您的 streaming computation (流式計算)表示為在一個靜態(tài)表上的 standard batch-like query (標(biāo)準(zhǔn)類批次查詢),并且 Spark 在unbounded(無界)輸入表上運行它作為incremental(增量)查詢嘱函。讓我們更加詳細地了解這個模型。

基本概念

將 input data stream (輸入數(shù)據(jù)流) 視為 “Input Table”(輸入表)。每個在 stream 上到達的 data item (數(shù)據(jù)項)就像是一個被 appended 到 Input Table 的新的 row 撇寞。

對輸入的查詢將生成 “Result Table” (結(jié)果表)祖灰。每個 trigger interval (觸發(fā)間隔)(例如,每 1 秒)三妈,新 row (行)將附加到 Input Table 悠鞍,最終更新 Result Table 。無論何時更新 result table ,我們都希望將 changed result rows (更改的結(jié)果行)寫入 external sink (外部接收器)浩嫌。

“Output(輸出)” 被定義為寫入 external storage (外部存儲器)的內(nèi)容溶其¤氤粒可以以不同的模式定義 output :

Complete Mode(完全模式)- 整個更新的 Result Table 將被寫入外部存儲。由 storage connector (存儲連接器)決定如何處理整個表的寫入。

Append Mode(附加模式)- 只有 Result Table 中自上次觸發(fā)后附加的新 rows(行) 將被寫入 external storage (外部存儲)钞速。這僅適用于不期望更改 Result Table 中現(xiàn)有行的查詢。

Update Mode(更新模式)- 只有自上次觸發(fā)后 Result Table 中更新的 rows (行)將被寫入 external storage (外部存儲)(從 Spark 2.1.1 之后可用)牙甫。請注意,這與 Complete Mode (完全模式),因為此模式僅輸出自上次觸發(fā)以來更改的 rows (行)旋奢。如果查詢不包含 aggregations (聚合)玷过,它將等同于 Append mode 粤蝎。

請注意,每種模式適用于特定模型的查詢碑宴。這將在later詳細討論锣披。

為了說明這個模型的使用增热,我們來了解一下上面章節(jié)的快速示例。第一個linesDataFrame 是 input table 帆调,并且最后的wordCountsDataFrame 是 result table 含鳞。請注意鸭廷,streaminglinesDataFrame 上的查詢生成wordCounts是exactly the same(完全一樣的)因為它將是一個 static DataFrame (靜態(tài) DataFrame )。但是,當(dāng)這個查詢啟動時, Spark 將從 socket 連接中持續(xù)檢查新數(shù)據(jù)一喘。如果有新數(shù)據(jù)闷沥,Spark 將運行一個 “incremental(增量)” 查詢撞鹉,它會結(jié)合以前的 running counts (運行計數(shù))與新數(shù)據(jù)計算更新的 counts 览祖,如下所示又活。

這種模式與許多其他 stream processing engines (流處理引擎)有著顯著不同。許多 streaming systems (流系統(tǒng))要求用戶本身保持運行 aggregations (聚合)舔清,因此必須要考慮容錯臼婆,和數(shù)據(jù)一致性(at-least-once(至少一次)故响, at-most-once (最多一次),exactly-once (完全一次))。在這個模型中,當(dāng)有新數(shù)據(jù)時, Spark 負責(zé)更新 Result Table 耍攘,從而減輕用戶對它的考慮庆揪。舉個例子吝羞,我們來看一下這個模型如何處理對于基于 event-time 的處理和 late arriving (遲到)的數(shù)據(jù)。

處理 Event-time 和延遲數(shù)據(jù)

Event-time 是數(shù)據(jù)本身 embedded (嵌入)的時間。對于很多應(yīng)用程序纸厉,您可能需要在此 event-time 進行操作肯尺。例如锄蹂,如果要每分鐘獲取 IoT devices (設(shè)備)生成的 events 數(shù)敬扛,則可能希望使用數(shù)據(jù)生成的時間(即數(shù)據(jù)中的 event-time )治宣,而不是 Spark 接收到它們的時間。這個 event-time 在這個模型中非常自然地表現(xiàn)出來 – 來自 devices (設(shè)備)的每個 event 都是表中的一 row(行),并且 event-time 是 row (行)中的 column value (列值)捉超。這允許 window-based aggregations (基于窗口的聚合)(例如每分鐘的 events 數(shù))僅僅是 event-time 列上的特殊類型的 group (分組)和 aggregation (聚合) – 每個 time window 是一個組,并且每一 row (行)可以屬于多個 windows/groups 。因此,可以在 static dataset (靜態(tài)數(shù)據(jù)集)(例如來自 collected device events logs (收集的設(shè)備事件日志))以及 data stream 上一致地定義 event-time-window-based aggregation queries (基于事件時間窗口的聚合查詢),從而使用戶的使用壽命更加容易。

此外绎速,這個模型自然地處理了比預(yù)計將根據(jù)它的 event-time 到達的數(shù)據(jù)晚到的數(shù)據(jù)焙蚓。由于 Spark 正在更新 Result Table 纹冤, Spark 有完整的控制對當(dāng)有遲到的數(shù)據(jù)時 updating old aggregates (更新舊的聚合),以及清理 old aggregates (舊聚合) 以限制 intermediate state data (中間體狀態(tài)數(shù)據(jù))的大小主届。自 Spark 2.1 以來,我們對于 watermarking 進行了支持君丁,允許用戶指定 late data 的閾值枫夺,并允許引擎相應(yīng)地清理舊狀態(tài)。這些將在后面的Window Operations部分解釋绘闷。

容錯語義

提供 end-to-end exactly-once semantics (端到端的完全一次性語義)是 Structured Streaming 設(shè)計背后的關(guān)鍵目標(biāo)之一橡庞。為了實現(xiàn)這一點,我們設(shè)計了 Structured Streaming sources 印蔗, sinks 和 execution engine (執(zhí)行引擎)扒最,以可靠的跟蹤處理確切進度,以便它可以通過 restarting and/or reprocessing (重新啟動和/或重新處理)來處理任何類型的故障华嘹。假設(shè)每個 streaming source 都具有 offsets (偏移量)(類似于 Kafka offsets 或 Kinesis sequence numbers (Kafka 偏移量或 Kinesis 序列號))來跟蹤 stream 中的 read position (讀取位置)吧趣。引擎使用 checkpointing (檢查點)并 write ahead logs (預(yù)寫日志)記錄每個 trigger (觸發(fā)器)中正在處理的數(shù)據(jù)的 offset range (偏移范圍)。 streaming sinks 設(shè)計為處理后處理的 idempotent (冪等)香缺。一起使用 replayable sources (可重放源)和 idempotent sinks (冪等接收器)郎楼, Structured Streaming 可以確保在任何故障下end-to-end exactly-once semantics(端對端完全一次性語義)。

API 使用 Datasets 和 DataFrames

自從 Spark 2.0 犬庇, DataFrame 和 Datasets 可以表示 static (靜態(tài))俯渤, bounded data(有界數(shù)據(jù))呆细,以及 streaming , unbounded data (無界數(shù)據(jù))八匠。類似于 static Datasets/DataFrames 絮爷,您可以使用常用的 entry point (入口點)SparkSession(Scala/Java/Python/R文檔) 來從 streaming sources 中創(chuàng)建 streaming DataFrames/Datasets ,并將其作為 static DataFrames/Datasets 應(yīng)用相同的操作梨树。如果您不熟悉 Datasets/DataFrames 坑夯,強烈建議您使用DataFrame/Dataset 編程指南來熟悉它們。

創(chuàng)建 streaming DataFrames 和 streaming Datasets

可以通過DataStreamReader的接口 (Scala/Java/Python文檔 )來創(chuàng)建 Streaming DataFrames 并由SparkSession.readStream()返回劝萤。在R中渊涝,使用read.stream()方法。與創(chuàng)建 static DataFrame 的 read interface (讀取接口)類似床嫌,您可以指定 source - data format (數(shù)據(jù)格式), schema (模式)胸私, options (選項)等的詳細信息厌处。

Input Sources (輸入源)

在 Spark 2.0 中,有一些內(nèi)置的 sources 岁疼。

File source(文件源)- 以文件流的形式讀取目錄中寫入的文件阔涉。支持的文件格式為 text , csv 捷绒, json 瑰排, parquet 。有關(guān)更多的 up-to-date 列表暖侨,以及每種文件格式的支持選項椭住,請參閱 DataStreamReader interface 的文檔。請注意字逗,文件必須以 atomically (原子方式)放置在給定的目錄中京郑,這在大多數(shù)文件系統(tǒng)中可以通過文件移動操作實現(xiàn)。

Kafka source(Kafka 源)- 來自 Kafka 的 Poll 數(shù)據(jù)葫掉。它與 Kafka broker 的 0.10.0 或者更高的版本兼容些举。有關(guān)詳細信息,請參閱Kafka Integration 指南俭厚。

Socket source (for testing) (Socket 源(用于測試))- 從一個 socket 連接中讀取 UTF8 文本數(shù)據(jù)户魏。 listening server socket (監(jiān)聽服務(wù)器 socket)位于 driver 。請注意,這只能用于測試叼丑,因為它不提供 end-to-end fault-tolerance (端到端的容錯)保證关翎。

某些 sources 是不容錯的,因為它們不能保證數(shù)據(jù)在使用 checkpointed offsets (檢查點偏移量)故障之后可以被重新使用幢码。參見前面的部分fault-tolerance semantics笤休。以下是 Spark 中所有 sources 的詳細信息。

SourceOptions(選項)Fault-tolerant(容錯)Notes(說明)

File source(文件源)path: 輸入路徑的目錄症副,并且與所有文件格式通用店雅。

maxFilesPerTrigger: 每個 trigger (觸發(fā)器)中要考慮的最大新文件數(shù)(默認(rèn)是: 無最大值)

latestFirst: 是否先處理最新的新文件,當(dāng)有大量積壓的文件時有用(默認(rèn): false)

fileNameOnly: 是否僅根據(jù)文件名而不是完整路徑檢查新文件(默認(rèn)值: false)贞铣。將此設(shè)置為 `true` 闹啦,以下文件將被視為相同的文件,因為它們的文件名 "dataset.txt" 是相同的:

· "file:///dataset.txt"

· "s3://a/dataset.txt"

· "s3n://a/b/dataset.txt"

· "s3a://a/b/c/dataset.txt"

有關(guān)特定于 file-format-specific (文件格式)的選項辕坝,請參閱DataStreamReader(Scala/Java/Python/R) 中的相關(guān)方法窍奋。例如,對于 "parquet" 格式選項請參閱DataStreamReader.parquet()Yes支持 glob 路徑酱畅,但是不支持多個逗號分隔的 paths/globs 琳袄。

Socket Source(Socket 源)host: 連接到的 host ,必須指定

port: 連接的 port (端口)纺酸,必須指定No

Kafka Source(Kafka 源)請查看Kafka Integration 指南.Yes

這里有一些例子窖逗。

Scala

Java

Python

R

valspark:SparkSession=...// 從 socket 讀取 textvalsocketDF=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()socketDF.isStreaming// 對于有 streaming sources 的 DataFrame 返回 TruesocketDF.printSchema// 讀取目錄內(nèi)原子寫入的所有 csv 文件valuserSchema=newStructType().add("name","string").add("age","integer")valcsvDF=spark.readStream.option("sep",";").schema(userSchema)// 指定 csv 文件的模式.csv("/path/to/directory")// 等同于 format("csv").load("/path/to/directory")

這些示例生成無類型的 streaming DataFrames ,這意味著在編譯時不會檢查 DataFrame 的模式餐蔬,僅在運行時在 query is submitted (查詢提交)的時候進行檢查碎紊。像map,flatMap等這樣的操作需要在編譯時知道這個類型樊诺。要做到這一點仗考,您可以使用與 static DataFrame 相同的方法將這些 untyped (無類型)的 streaming DataFrames 轉(zhuǎn)換為 typed streaming Datasets (類型化的 streaming Datasets )。有關(guān)詳細信息词爬,請參閱SQL 編程指南秃嗜。此外,有關(guān)支持的 streaming sources 的更多詳細信息將在文檔后面討論缸夹。

streaming DataFrames/Datasets 的模式接口和分區(qū)

默認(rèn)情況下痪寻,基于文件的 sources 的 Structured Streaming 需要您指定 schema (模式),而不是依靠 Spark 自動 infer 虽惭。這種 restriction 確保了 consistent schema (一致的模式)將被用于 streaming query (流式查詢)橡类,即使在出現(xiàn)故障的情況下也是如此。對于 ad-hoc use cases (特殊用例)芽唇,您可以通過將spark.sql.streaming.schemaInference設(shè)置為true來重新啟用 schema inference (模式接口)顾画。

當(dāng)存在名為/key=value/的子目錄并且列表將自動遞歸到這些目錄中時取劫,會發(fā)生 Partition discovery (分區(qū)發(fā)現(xiàn))。如果這些 columns (列)顯示在用戶提供的 schema 中研侣,則它們將根據(jù)正在讀取的文件路徑由 Spark 進行填充谱邪。 構(gòu)成 partitioning scheme (分區(qū)方案)的目錄 must be present when the query starts (必須在查詢開始時是存在的),并且必須保持 static 庶诡。例如惦银,當(dāng)/data/year=2015/存在時,可以添加/data/year=2016/末誓,但是更改 partitioning column (分區(qū)列)是無效的(即通過創(chuàng)建目錄/data/date=2016-04-17/)扯俱。

streaming DataFrames/Datasets 上的操作

您可以對 streaming DataFrames/Datasets 應(yīng)用各種操作 - 從 untyped (無類型), SQL-like operations (類似 SQL 的操作)(例如select喇澡,where迅栅,groupBy) 到 typed RDD-like operations (類型化的類似 RDD 的操作)(例如map,filter晴玖,flatMap)读存。有關(guān)詳細信息,請參閱SQL 編程指南呕屎。讓我們來看看可以使用的幾個示例操作让簿。

基礎(chǔ)操作 - Selection, Projection, Aggregation

streaming 支持 DataFrame/Dataset 上的大多數(shù)常見操作。不支持的少數(shù)操作discussed later將在本節(jié)中討論(稍后討論)秀睛。

Scala

Java

Python

R

caseclassDeviceData(device:String,deviceType:String,signal:Double,time:DateTime)valdf:DataFrame=...// streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }valds:Dataset[DeviceData]=df.as[DeviceData]// streaming Dataset with IOT device data// Select the devices which have signal more than 10df.select("device").where("signal > 10")// using untyped APIsds.filter(_.signal>10).map(_.device)// using typed APIs// Running count of the number of updates for each device typedf.groupBy("deviceType").count()// using untyped API// Running average signal for each device typeimportorg.apache.spark.sql.expressions.scalalang.typedds.groupByKey(_.deviceType).agg(typed.avg(_.signal))// using typed API

Window Operations on Event Time (事件時間窗口操作)

通過 Structured Streaming 拜英, sliding event-time window (滑動事件時間窗口)的 Aggregations (聚合)很簡單,與 grouped aggregations (分組聚合)非常相似琅催。在 grouped aggregation (分組聚合)中,為 user-specified grouping column (用戶指定的分組列)中的每個唯一值維護 aggregate values (聚合值)(例如 counts )虫给。在 window-based aggregations (基于窗口的聚合)的情況下藤抡,針對每個窗口的 event-time 維持 aggregate values (聚合值)。讓我們用一個例子來理解這一點抹估。

想象一下缠黍,我們的快速示例被修改,并且 stream 現(xiàn)在包含生成 line 的時間的 line 药蜻。不運行 word counts 瓷式,我們想 count words within 10 minute windows (在 10 分鐘內(nèi)的窗口計數(shù)單詞),每 5 分鐘更新一次语泽。也就是說贸典,在 10 minute windows (10 分鐘的窗口之間)收到的 word counts 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 等。請注意踱卵, 12:00 - 12:10 表示數(shù)據(jù)在 12:00 之后但在 12:10 之前抵達±韧眨現(xiàn)在据过,考慮在 12:07 收到一個 word 。這個 word 應(yīng)該增加對應(yīng)于兩個窗口的計數(shù) 12:00 - 12:10 和 12:05 - 12:15 妒挎。因此绳锅, counts 將被二者分組, grouping key (分組秘鑰)(即 word)和 window (窗口)(可以從 event-time 計算)來 indexed (索引)酝掩。

result tables 將如下所示鳞芙。

由于這個 windowing (窗口)類似于 grouping (分組),在代碼中期虾,您可以使用groupBy()和window()操作來表示 windowed aggregations (窗口化的聚合)原朝。您可以看到以下示例Scala/Java/Python的完整代碼。

Scala

Java

Python

importspark.implicits._valwords=...// streaming DataFrame of schema { timestamp: Timestamp, word: String }// Group the data by window and word and compute the count of each groupvalwindowedCounts=words.groupBy(window($"timestamp","10 minutes","5 minutes"),$"word").count()

處理 Late Data (遲到數(shù)據(jù))和 Watermarking (水映瓜)

現(xiàn)在考慮以下如果其中一個 event 遲到應(yīng)用程序會發(fā)生什么竿拆。例如,想象一下宾尚,在 12:04 (即 event time )生成的 word 可以在 12:11 被接收申請丙笋。應(yīng)用程序應(yīng)該使用 12:04 而不是 12:11 來更新 window12:00 - 12:10的較舊 counts 。發(fā)生這種情況自然就是在我們 window-based grouping (基于窗口的分組中) - Structured Streaming 可以保持intermediate state 對于部分 aggregates (聚合)長時間煌贴,以便后期數(shù)據(jù)可以 update aggregates of old windows correctly (更新聚合)舊窗口正確御板,如下圖所示。

但是牛郑,要運行此查詢幾天怠肋,系統(tǒng)必須綁定 the amount of intermediate in-memory state it accumulates (中間狀態(tài)累積的數(shù)量)。這意味著系統(tǒng)需要知道什么時候 old aggregate (老聚合)可以從內(nèi)存中的狀態(tài)丟失淹朋,因為這個應(yīng)用程序不會在繼續(xù)接收 aggregate (該聚合)的更多l(xiāng)ate data (后期的數(shù)據(jù))笙各。為了實現(xiàn)這一點,在 Spark 2.1 中础芍,我們介紹了watermarking(水予厩馈),讓引擎自動跟蹤數(shù)據(jù)中的 current event time (當(dāng)前事件時間)并試圖相應(yīng)地清理舊狀態(tài)仑性。您可以定義查詢的 watermark 指定 event time column (事件時間列)和數(shù)據(jù)預(yù)期的延遲閾值 event time (事件時間)惶楼。對于從T時間開始的特定窗口,引擎將保持狀態(tài)并允許 late data (延遲數(shù)據(jù))更新狀態(tài)直到(max event time seen by the engine - late threshold > T)诊杆。換句話說歼捐, threshold (閾值)內(nèi)的 late data (晚期數(shù)據(jù))將被 aggregated ,但數(shù)據(jù)晚于閾值將被丟棄晨汹。讓我們以一個例子來理解這一點豹储。我們可以使用withWatermark()可以輕松地定義上一個例子的 watermarking (水印)宰缤,如下所示颂翼。

Scala

Java

Python

importspark.implicits._valwords=...// streaming DataFrame of schema { timestamp: Timestamp, word: String }// Group the data by window and word and compute the count of each groupvalwindowedCounts=words.withWatermark("timestamp","10 minutes").groupBy(window($"timestamp","10 minutes","5 minutes"),$"word").count()

在這個例子中晃洒,我們正在定義查詢的 watermark 對 “timestamp” 列的值,并將 “10 minutes” 定義為允許數(shù)據(jù)延遲的閾值朦乏。如果這個查詢以 Update output mode (更新輸出模式)運行(稍后在Output Modes部分中討論)球及,引擎將不斷更新 Result Table 中窗口的 counts ,直到 window is older than the watermark (窗口比水印較舊)呻疹,它滯后于 current event time (當(dāng)前事件時間)列 “timestamp” 10分鐘吃引。這是一個例子。

如圖所示刽锤,maximum event time tracked (引擎跟蹤的最大事件時間)是藍色虛線镊尺,watermark 設(shè)置為(max event time - '10 mins')在每個觸發(fā)的開始處是紅線。例如并思,當(dāng)引擎觀察數(shù)據(jù)(12:14, dog)時庐氮,它為下一個觸發(fā)器設(shè)置 watermark 為12:04。該 watermark 允許 engine 保持 intermediate state (中間狀態(tài))另外 10 分鐘以允許延遲 late data to be counted (要計數(shù)的數(shù)據(jù))宋彼。例如弄砍,數(shù)據(jù)(12:09, cat)是 out of order and late (不正常的,而且延遲了)输涕,它落在了 windows12:05 - 12:15和12:10 - 12:20音婶。因為它仍然在 watermark12:04之前的觸發(fā)器,引擎仍然將 intermediate counts (中間計數(shù))保持為狀態(tài)并正確 updates the counts of the related windows (更新相關(guān)窗口的計數(shù))莱坎。然而衣式,當(dāng) watermark 更新為12:11時,window(12:00 - 12:10)的中間狀態(tài)被清除檐什,所有 subsequent data (后續(xù)數(shù)據(jù))(例如(12:04, donkey))被認(rèn)為是 “too late” 碴卧,因此被忽視。請注意乃正,每次觸發(fā)后螟深,寫入 updated counts (更新的計數(shù))(即紫色行)作為 trigger output 進行 sink ,如下 Update mode 所示烫葬。

某些 sinks (接收器)(例如 文件)可能不支持更新模式所需的 fine-grained updates (細粒度更新)。 與他們一起工作凡蜻,我們也支持 Append Mode (附加模式)搭综,只有final counts(最終計數(shù))被寫入 sink 。這如下所示划栓。

請注意兑巾,在 non-streaming Dataset (非流數(shù)據(jù)集)上使用withWatermark是不可行的。 由于 watermark 不應(yīng)該以任何方式影響任何批處理查詢忠荞,我們將直接忽略它蒋歌。

與之前的 Update Mode 類似帅掘,引擎維護 intermediate counts for each window (每個窗口的中間計數(shù))。但是堂油,partial counts (部分計數(shù))不會更新到 Result Table 修档,也不是寫入 sink 。 引擎等待遲到的 “10 mins” 計數(shù)府框,然后刪除 window < watermark 的 intermediate state (中間狀態(tài))吱窝,并追加最終 計數(shù)到 Result Table/sink 。 例如迫靖, window12:00 - 12:10的最終計數(shù)是僅在水印更新為12:11之后附加到 Result Table 院峡。

Conditions for watermarking to clean aggregation state(watermarking 清理聚合狀態(tài)的條件)重要的是要注意,watermarking 必須滿足以下清理聚合查詢中的狀態(tài)的條件(從 Spark 2.1.1 開始系宜,將來會更改)照激。

Output mode must be Append or Update.(輸出模式必須是追加或者更新)Complete mode 要求保留所有 aggregate data (聚合數(shù)據(jù)),因此不能使用 watermarking 去掉 intermediate state (中間狀態(tài))盹牧。參見Output Modes部分俩垃,詳細說明每種輸出模式的語義。

aggregation (聚合)必須具有 event-time column (事件時間列)或 event-time column 上的window欢策。

withWatermark必須被調(diào)用與聚合中使用的 timestamp column (時間戳列)相同的列吆寨。例如,df.withWatermark("time", "1 min").groupBy("time2").count()在 Append output mode 是無效的踩寇,因為 watermark 是從聚合列在不同的列上定義的啄清。

在使用 watermark details 的 aggregation (聚合)之前必須調(diào)用withWatermark。例如俺孙,df.groupBy("time").count().withWatermark("time", "1 min")在 Append output mode 中是無效的辣卒。

Join 操作

Streaming DataFrames 可以與 static DataFrames 連接,以創(chuàng)建新的 streaming DataFrames 睛榄。 這里有幾個例子荣茫。

Scala

Java

Python

valstaticDf=spark.read....valstreamingDf=spark.readStream....streamingDf.join(staticDf,"type")// inner equi-join with a static DFstreamingDf.join(staticDf,"type","right_join")// right outer join with a static DF

Streaming Deduplication (Streaming 去重)

您可以使用 events 中的 unique identifier (唯一標(biāo)識符)對 data streams 中的記錄進行重復(fù)數(shù)據(jù)刪除。 這與使用唯一標(biāo)識符列的 static 重復(fù)數(shù)據(jù)消除完全相同场靴。 該查詢將存儲先前記錄所需的數(shù)據(jù)量啡莉,以便可以過濾重復(fù)的記錄。 與 aggregations (聚合)類似旨剥,您可以使用帶有或不帶有 watermarking 的重復(fù)數(shù)據(jù)刪除功能咧欣。

With watermark(使用 watermark )- 如果重復(fù)記錄可能到達的時間有上限,則可以在 event time column (事件時間列)上定義 watermark 轨帜,并使用 guid 和 event time columns 進行重復(fù)數(shù)據(jù)刪除魄咕。 該查詢將使用 watermark 從以前的記錄中刪除舊的狀態(tài)數(shù)據(jù),這些記錄不會再受到任何重復(fù)蚌父。 這界定了查詢必須維護的狀態(tài)量哮兰。

Without watermark (不適用 watermark )- 由于當(dāng)重復(fù)記錄可能到達時沒有界限,查詢將來自所有過去記錄的數(shù)據(jù)存儲為狀態(tài)。

Scala

Java

Python

valstreamingDf=spark.readStream....// columns: guid, eventTime, ...// Without watermark using guid columnstreamingDf.dropDuplicates("guid")// With watermark using guid and eventTime columnsstreamingDf.withWatermark("eventTime","10 seconds").dropDuplicates("guid","eventTime")

Arbitrary Stateful Operations (任意有狀態(tài)的操作)

許多用例需要比 aggregations 更高級的狀態(tài)操作言蛇。例如婿斥,在許多用例中活鹰,您必須 track (跟蹤) data streams of events (事件數(shù)據(jù)流)中的 sessions (會話)。對于進行此類 sessionization (會話),您必須將 arbitrary types of data (任意類型的數(shù)據(jù))保存為 state (狀態(tài)),并在每個 trigger 中使用 state using the data stream events (數(shù)據(jù)流事件對狀態(tài))執(zhí)行 arbitrary operations 。自從 Spark 2.2 拥诡,可以使用mapGroupsWithState操作和更強大的操作flatMapGroupsWithState來完成仇祭。這兩個操作都允許您在 grouped Datasets (分組的數(shù)據(jù)集)上應(yīng)用用戶定義的代碼來更新用戶定義的狀態(tài)徙缴。有關(guān)更具體的細節(jié)考廉,請查看 API文檔(Scala/Java) 和例子 (Scala/Java)誓军。

不支持的操作

streaming DataFrames/Datasets 不支持一些 DataFrame/Dataset 操作。其中一些如下帽哑。

streaming Datasets 不支持 Multiple streaming aggregations (多個流聚合) (i.e. a chain of aggregations on a streaming DF)(即 streaming DF 上的聚合鏈)

streaming Datasets 不支持 Limit and take first N rows 。

streaming Datasets 上的 Distinct operations 不支持肄梨。

只有在 aggregation 和 Complete Output Mode 下,streaming Datasets 才支持排序操作蓖租。

有條件地支持 streaming 和 static Datasets 之間的 Outer joins 粱侣。

不支持使用 streaming Dataset 的 Full outer join

不支持在右側(cè)使用 streaming Dataset 的 Left outer join

不支持在左側(cè)使用 streaming Dataset 的 Right outer join

不支持兩種 streaming Datasets 之間的任何種類的 joins 。

此外蓖宦,還有一些 Dataset 方法將不適用于 streaming Datasets 齐婴。他們是立即運行查詢并返回結(jié)果的操作,這在 streaming Dataset 上沒有意義稠茂。相反柠偶,這些功能可以通過顯式啟動 streaming query 來完成(參見下一節(jié))。

count()- 無法從 streaming Dataset 返回 single count 。 而是使用ds.groupBy().count()返回一個包含 running count 的 streaming Dataset 诱担。

foreach()- 而是使用ds.writeStream.foreach(...)(參見下一節(jié)).

show()- 而是使用 console sink (參見下一節(jié)).

如果您嘗試任何這些操作鲫售,您將看到一個AnalysisException,如 “operation XYZ is not supported with streaming DataFrames/Datasets” 该肴。雖然其中一些可能在未來版本的 Spark 中得到支持,還有其他一些從根本上難以有效地實現(xiàn) streaming data 藐不。例如匀哄, input stream 的排序不受支持,因為它需要保留 track of all the data received in the stream (跟蹤流中接收到的所有數(shù)據(jù))雏蛮。 因此從根本上難以有效率地執(zhí)行涎嚼。

開始 Streaming Queries

一旦定義了 final result DataFrame/Dataset ,剩下的就是讓你開始 streaming computation 挑秉。 為此法梯,您必須使用DataStreamWriter(Scala/Java/Python文檔)通過Dataset.writeStream()返回。您將必須在此 interface 中指定以下一個或多個犀概。

Details of the output sink ( output sink 的詳細信息):Data format, location, etc.

Output mode (輸出模式):指定寫入 output sink 的內(nèi)容立哑。

Query name (查詢名稱):可選,指定用于標(biāo)識的查詢的唯一名稱姻灶。

Trigger interval (觸發(fā)間隔):可選铛绰,指定觸發(fā)間隔。 如果未指定产喉,則系統(tǒng)將在上一次處理完成后立即檢查新數(shù)據(jù)的可用性捂掰。 如果由于先前的處理尚未完成而導(dǎo)致觸發(fā)時間錯誤,則系統(tǒng)將嘗試在下一個觸發(fā)點觸發(fā)曾沈,而不是在處理完成后立即觸發(fā)这嚣。

Checkpoint location (檢查點位置):對于可以保證 end-to-end fault-tolerance (端對端容錯)能力的某些 output sinks ,請指定系統(tǒng)將寫入所有 checkpoint (檢查點)信息的位置塞俱。 這應(yīng)該是與 HDFS 兼容的容錯文件系統(tǒng)中的目錄姐帚。 檢查點的語義將在下一節(jié)中進行更詳細的討論。

Output Modes (輸出模式)

有幾種類型的輸出模式敛腌。

Append mode (default) (附加模式(默認(rèn)))- 這是默認(rèn)模式卧土,其中只有 自從 last trigger (上一次觸發(fā))以來,添加到 Result Table 的新行將會是 outputted to the sink 像樊。 只有添加到 Result Table 的行將永遠不會改變那些查詢才支持這一點尤莺。 因此,這種模式 保證每行只能輸出一次(假設(shè) fault-tolerant sink )生棍。例如颤霎,只有select,where,map,flatMap,filter,join等查詢支持 Append mode 。

Complete mode (完全模式)- 每次觸發(fā)后,整個 Result Table 將被輸出到 sink 友酱。 aggregation queries (聚合查詢)支持這一點晴音。

Update mode (更新模式)- (自 Spark 2.1.1 可用) 只有 Result Table rows 自上次觸發(fā)后更新將被輸出到 sink 。更多信息將在以后的版本中添加缔杉。

不同類型的 streaming queries 支持不同的 output modes 锤躁。 以下是兼容性矩陣。

Query Type(查詢類型)Supported Output Modes(支持的輸出模式)Notes(說明)

Queries with aggregation (使用聚合的查詢)Aggregation on event-time with watermark (使用 watermark 的 event-time 聚合 )Append, Update, Complete (附加或详,更新系羞,完全)Append mode 使用 watermark 來降低 old aggregation state (舊聚合狀態(tài))。 但輸出 windowed aggregation (窗口聚合)延遲在 `withWatermark()` 中指定的 late threshold (晚期閾值)模式語義霸琴,rows 只能在 Result Table 中添加一次在 finalized (最終確定)之后(即 watermark is crossed (水印交叉)后)椒振。 有關(guān)詳細信息,請參閱Late Data部分梧乘。

Update mode 使用 watermark 刪除 old aggregation state (舊的聚合狀態(tài))澎迎。

Complete mode (完全模式)不會刪除舊的聚合狀態(tài),因為從定義這個模式 ???????? 保留 Result Table 中的所有數(shù)據(jù)选调。

Other aggregations (其他聚合)Complete, Update (完全夹供,更新)由于沒有定義 watermark(僅在其他 category 中定義),舊的聚合狀態(tài)不會刪除学歧。

不支持 Append mode 罩引,因為 aggregates (聚合)可以更新,從而違反了這種模式的語義枝笨。

Queries withmapGroupsWithStateUpdate (更新)

Queries withflatMapGroupsWithStateAppend operation mode (附加操作模式)Append (附加)flatMapGroupsWithState之后允許 Aggregations (聚合)袁铐。

Update operation mode (更新操作模式)Update(更新)flatMapGroupsWithState之后不允許 Aggregations (聚合)。

Other queries (其他查詢)Append, Update (附加横浑,更新)不支持 Complete mode 剔桨,因為將所有未分組數(shù)據(jù)保存在 Result Table 中是不可行的 。

Output Sinks (輸出接收器)

有幾種類型的內(nèi)置輸出接收器徙融。

File sink (文件接收器)- 將輸出存儲到目錄中洒缀。

writeStream.format("parquet")// can be "orc", "json", "csv", etc..option("path","path/to/destination/dir").start()

Foreach sink- 對 output 中的記錄運行 arbitrary computation 。 有關(guān)詳細信息欺冀,請參閱本節(jié)后面部分树绩。

writeStream.foreach(...).start()

Console sink (for debugging) (控制臺接收器(用于調(diào)試))- 每次觸發(fā)時,將輸出打印到 console/stdout 隐轩。 都支持 Append 和 Complete 輸出模式饺饭。 這應(yīng)該用于低數(shù)據(jù)量的調(diào)試目的,因為在每次觸發(fā)后职车,整個輸出被收集并存儲在驅(qū)動程序的內(nèi)存中瘫俊。

writeStream.format("console").start()

Memory sink (for debugging) (內(nèi)存 sink (用于調(diào)試))- 輸出作為 in-memory table (內(nèi)存表)存儲在內(nèi)存中鹊杖。都支持 Append 和 Complete 輸出模式。 這應(yīng)該用于調(diào)試目的在低數(shù)據(jù)量下扛芽,整個輸出被收集并存儲在驅(qū)動程序的存儲器中骂蓖。因此,請謹(jǐn)慎使用川尖。

writeStream.format("memory").queryName("tableName").start()

某些 sinks 是不容錯的登下,因為它們不能保證輸出的持久性并且僅用于調(diào)試目的。參見前面的部分容錯語義叮喳。以下是 Spark 中所有接收器的詳細信息庐船。

Sink (接收器)Supported Output Modes (支持的輸出模式)Options (選項)Fault-tolerant (容錯)Notes (說明)

File Sink (文件接收器)Append (附加)path: 必須指定輸出目錄的路徑。

有關(guān)特定于文件格式的選項嘲更,請參閱 DataFrameWriter (Scala/Java/Python/R) 中的相關(guān)方法。 例如揩瞪,對于 "parquet" 格式選項赋朦,請參閱DataFrameWriter.parquet()Yes支持對 partitioned tables (分區(qū)表)的寫入。按時間 Partitioning (劃分)可能是有用的李破。

Foreach SinkAppend, Update, Compelete (附加宠哄,更新,完全)None取決于 ForeachWriter 的實現(xiàn)嗤攻。更多詳細信息在下一節(jié)

Console Sink (控制臺接收器)Append, Update, Complete (附加毛嫉,更新,完全)numRows: 每個觸發(fā)器需要打印的行數(shù)(默認(rèn):20)

truncate: 如果輸出太長是否截斷(默認(rèn): true)No

Memory Sink (內(nèi)存接收器)Append, Complete (附加妇菱,完全)None否承粤。但是在 Complete Mode 模式下,重新啟動的查詢將重新創(chuàng)建完整的表闯团。Table name is the query name.(表名是查詢的名稱)

請注意辛臊,您必須調(diào)用start()來實際啟動查詢的執(zhí)行。 這將返回一個 StreamingQuery 對象房交,它是連續(xù)運行的執(zhí)行的句柄彻舰。 您可以使用此對象來管理查詢,我們將在下一小節(jié)中討論候味。 現(xiàn)在刃唤,讓我們通過幾個例子了解所有這些。

Scala

Java

Python

R

// ========== DF with no aggregations ==========valnoAggDF=deviceDataDf.select("device").where("signal > 10")// Print new data to consolenoAggDF.writeStream.format("console").start()// Write new data to Parquet filesnoAggDF.writeStream.format("parquet").option("checkpointLocation","path/to/checkpoint/dir").option("path","path/to/destination/dir").start()// ========== DF with aggregation ==========valaggDF=df.groupBy("device").count()// Print updated aggregations to consoleaggDF.writeStream.outputMode("complete").format("console").start()// Have all the aggregates in an in-memory tableaggDF.writeStream.queryName("aggregates")// this query name will be the table name.outputMode("complete").format("memory").start()spark.sql("select * from aggregates").show()// interactively query in-memory table

使用 Foreach

foreach操作允許在輸出數(shù)據(jù)上計算 arbitrary operations 白群。從 Spark 2.1 開始尚胞,這只適用于 Scala 和 Java 。為了使用這個川抡,你必須實現(xiàn)接口ForeachWriter(Scala/Java文檔) 其具有在 trigger (觸發(fā)器)之后生成 sequence of rows generated as output (作為輸出的行的序列)時被調(diào)用的方法辐真。請注意以下要點须尚。

writer 必須是 serializable (可序列化)的,因為它將被序列化并發(fā)送給 executors 執(zhí)行侍咱。

所有這三個方法耐床,open,process和close都會在執(zhí)行器上被調(diào)用楔脯。

只有當(dāng)調(diào)用open方法時撩轰,writer 才能執(zhí)行所有的初始化(例如打開連接,啟動事務(wù)等)昧廷。請注意堪嫂,如果在創(chuàng)建對象時立即在類中進行任何初始化,那么該初始化將在 driver 中發(fā)生(因為這是正在創(chuàng)建的實例)木柬,這可能不是您打算的皆串。

version和partition是open中的兩個參數(shù),它們獨特地表示一組需要被 pushed out 的行眉枕。version是每個觸發(fā)器增加的單調(diào)遞增的 id 恶复。partition是一個表示輸出分區(qū)的 id ,因為輸出是分布式的速挑,將在多個執(zhí)行器上處理谤牡。

open可以使用version和partition來選擇是否需要寫入行的順序。因此姥宝,它可以返回true(繼續(xù)寫入)或false( 不需要寫入 )翅萤。如果返回false,那么process不會在任何行上被調(diào)用腊满。例如套么,在 partial failure (部分失敗)之后碳蛋,失敗的觸發(fā)器的一些輸出分區(qū)可能已經(jīng)被提交到數(shù)據(jù)庫违诗。基于存儲在數(shù)據(jù)庫中的 metadata (元數(shù)據(jù))疮蹦, writer 可以識別已經(jīng)提交的分區(qū)诸迟,因此返回 false 以跳過再次提交它們。

當(dāng)open被調(diào)用時愕乎,close也將被調(diào)用(除非 JVM 由于某些錯誤而退出)阵苇。即使open返回 false 也是如此。如果在處理和寫入數(shù)據(jù)時出現(xiàn)任何錯誤感论,那么close將被錯誤地調(diào)用绅项。您有責(zé)任清理以open創(chuàng)建的狀態(tài)(例如,連接比肄,事務(wù)等)快耿,以免資源泄漏囊陡。

管理 Streaming Queries

在啟動查詢時創(chuàng)建的StreamingQuery對象可用于 monitor and manage the query (監(jiān)視和管理查詢)。

Scala

Java

Python

R

valquery=df.writeStream.format("console").start()// get the query objectquery.id// get the unique identifier of the running query that persists across restarts from checkpoint dataquery.runId// get the unique id of this run of the query, which will be generated at every start/restartquery.name// get the name of the auto-generated or user-specified namequery.explain()// print detailed explanations of the queryquery.stop()// stop the queryquery.awaitTermination()// block until query is terminated, with stop() or with errorquery.exception// the exception if the query has been terminated with errorquery.recentProgress// an array of the most recent progress updates for this queryquery.lastProgress// the most recent progress update of this streaming query

您可以在單個 SparkSession 中啟動任意數(shù)量的查詢掀亥。 他們都將同時運行共享集群資源撞反。 您可以使用sparkSession.streams()獲取StreamingQueryManager(Scala/Java/Python文檔) 可用于管理 currently active queries (當(dāng)前活動的查詢)。

Scala

Java

Python

R

valspark:SparkSession=...spark.streams.active// get the list of currently active streaming queriesspark.streams.get(id)// get a query object by its unique idspark.streams.awaitAnyTermination()// block until any one of them terminates

監(jiān)控 Streaming Queries

有兩個用于 monitoring and debugging active queries (監(jiān)視和調(diào)試活動查詢) 的 API - interactively 和 asynchronously 搪花。

Interactive APIs

您可以直接獲取活動查詢的當(dāng)前狀態(tài)和指標(biāo)使用streamingQuery.lastProgress()和streamingQuery.status()遏片。lastProgress()返回一個StreamingQueryProgress對象 在ScalaJava和 Python 中具有相同字段的字典。它有所有的信息在 stream 的最后一個觸發(fā)器中取得的 progress - 處理了哪些數(shù)據(jù)撮竿,處理率是多少吮便,延遲等等。streamingQuery.recentProgress返回最后幾個進度的 array 幢踏。

另外髓需,streamingQuery.status()返回一個StreamingQueryStatus對象在ScalaJava和 Python 中具有相同字段的字典。它提供有關(guān)的信息立即執(zhí)行的查詢 - 觸發(fā)器是否 active 房蝉,數(shù)據(jù)是否正在處理等授账。

這里有幾個例子。

Scala

Java

Python

R

valquery:StreamingQuery=...println(query.lastProgress)/* Will print something like the following.{"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109","runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a","name" : "MyQuery","timestamp" : "2016-12-14T18:45:24.873Z","numInputRows" : 10,"inputRowsPerSecond" : 120.0,"processedRowsPerSecond" : 200.0,"durationMs" : {"triggerExecution" : 3,"getOffset" : 2},"eventTime" : {"watermark" : "2016-12-14T18:45:24.873Z"},"stateOperators" : [ ],"sources" : [ {"description" : "KafkaSource[Subscribe[topic-0]]","startOffset" : {"topic-0" : {"2" : 0,"4" : 1,"1" : 1,"3" : 1,"0" : 1}},"endOffset" : {"topic-0" : {"2" : 0,"4" : 115,"1" : 134,"3" : 21,"0" : 534}},"numInputRows" : 10,"inputRowsPerSecond" : 120.0,"processedRowsPerSecond" : 200.0} ],"sink" : {"description" : "MemorySink"}}*/println(query.status)/*? Will print something like the following.{"message" : "Waiting for data to arrive","isDataAvailable" : false,"isTriggerActive" : false}*/

Asynchronous API

您還可以 asynchronously monitor (異步監(jiān)視)與SparkSession相關(guān)聯(lián)的所有查詢 通過附加一個StreamingQueryListener(Scala/Javadocs) 惨驶。一旦你使用sparkSession.streams.attachListener()附加你的自定義StreamingQueryListener對象,當(dāng)您啟動查詢和當(dāng)有活動查詢有進度時停止時敛助,您將收到 callbacks (回調(diào))粗卜。 這是一個例子,

Scala

Java

Python

R

valspark:SparkSession=...spark.streams.addListener(newStreamingQueryListener(){overridedefonQueryStarted(queryStarted:QueryStartedEvent):Unit={println("Query started: "+queryStarted.id)}overridedefonQueryTerminated(queryTerminated:QueryTerminatedEvent):Unit={println("Query terminated: "+queryTerminated.id)}overridedefonQueryProgress(queryProgress:QueryProgressEvent):Unit={println("Query made progress: "+queryProgress.progress)}})

Recovering from Failures with Checkpointing (從檢查點恢復(fù)故障)

如果發(fā)生 failure or intentional shutdown (故障或故意關(guān)機)纳击,您可以恢復(fù)之前的查詢的進度和狀態(tài)续扔,并繼續(xù)停止的位置。 這是使用 checkpointing and write ahead logs (檢查點和預(yù)寫入日志)來完成的焕数。 您可以使用 checkpoint location (檢查點位置)配置查詢纱昧,并且查詢將保存所有進度信息(即,每個觸發(fā)器中處理的偏移范圍)和正在運行的 aggregates (聚合)(例如quick example中的 woed counts ) 到 checkpoint location (檢查點位置)堡赔。 此檢查點位置必須是 HDFS 兼容文件系統(tǒng)中的路徑识脆,并且可以在starting a query時將其設(shè)置為DataStreamWriter 中的選項。

Scala

Java

Python

R

aggDF.writeStream.outputMode("complete").option("checkpointLocation","path/to/HDFS/dir").format("memory").start()

從這里去哪兒

示例: 查看并運行Scala/Java/Python/R示例善已。

Spark Summit 2016 Talk -深入 Structured Streaming

我們一直在努力

apachecn/spark-doc-zh

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/structured-streaming-programming-guide.html

網(wǎng)頁地址: http://spark.apachecn.org/

github: https://github.com/apachecn/spark-doc-zh(覺得不錯麻煩給個 Star灼捂,謝謝!~)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末换团,一起剝皮案震驚了整個濱河市悉稠,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌艘包,老刑警劉巖的猛,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耀盗,死亡現(xiàn)場離奇詭異,居然都是意外死亡卦尊,警方通過查閱死者的電腦和手機叛拷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來猫牡,“玉大人胡诗,你說我怎么就攤上這事√视眩” “怎么了煌恢?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長震庭。 經(jīng)常有香客問我瑰抵,道長,這世上最難降的妖魔是什么器联? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任二汛,我火速辦了婚禮,結(jié)果婚禮上拨拓,老公的妹妹穿的比我還像新娘肴颊。我一直安慰自己,他們只是感情好渣磷,可當(dāng)我...
    茶點故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布婿着。 她就那樣靜靜地躺著,像睡著了一般醋界。 火紅的嫁衣襯著肌膚如雪竟宋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天形纺,我揣著相機與錄音丘侠,去河邊找鬼。 笑死逐样,一個胖子當(dāng)著我的面吹牛蜗字,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播脂新,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼秽澳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了戏羽?” 一聲冷哼從身側(cè)響起担神,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎始花,沒想到半個月后妄讯,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體孩锡,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年亥贸,在試婚紗的時候發(fā)現(xiàn)自己被綠了躬窜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,144評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡炕置,死狀恐怖荣挨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情朴摊,我是刑警寧澤默垄,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站甚纲,受9級特大地震影響痰腮,放射性物質(zhì)發(fā)生泄漏凿宾。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一铸题、第九天 我趴在偏房一處隱蔽的房頂上張望秘案。 院中可真熱鬧货裹,春花似錦腰吟、人聲如沸趟大。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽椰拒。三九已至,卻和暖如春癞尚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背乱陡。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工浇揩, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人憨颠。 一個月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓胳徽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親爽彤。 傳聞我的和親對象是個殘疾皇子养盗,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內(nèi)容