概述
Structured Streaming 是一個基于 Spark SQL 引擎的庙睡、可擴展的且支持容錯的流處理引擎诀艰。你可以像表達靜態(tài)數(shù)據(jù)上的批處理計算一樣表達流計算糠排。Spark SQL 引擎將隨著流式數(shù)據(jù)的持續(xù)到達而持續(xù)運行,并不斷更新結(jié)果史飞。你可以在Scala猫妙,Java瓷翻,Python或R中使用 Dataset/DataFrame API 來表示流聚合,事件時間窗口(event-time windows)吐咳,流到批處理連接(stream-to-batch joins)等。計算在相同的優(yōu)化的 Spark SQL 引擎上執(zhí)行元践。最后韭脊,通過 checkpoint 和 WAL,系統(tǒng)確保端到端的 exactly-once单旁。簡而言之沪羔,Structured Streaming 提供了快速、可擴展的象浑、容錯的蔫饰、端到端 exactly-once 的流處理。
在本指南中愉豺,我們將引導(dǎo)你熟悉編程模型和 API篓吁。首先,我們從一個簡單的例子開始:streaming word count蚪拦。
快速示例
假設(shè)要監(jiān)聽從本機 9999 端口發(fā)送的文本的 WordCount杖剪,讓我們看看如何使用結(jié)構(gòu)化流式表達這一點冻押。 首先,必須 import 必須的類并創(chuàng)建 SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
然后盛嘿,創(chuàng)建一個流式 Streaming DataFrame 來代表不斷從 localhost:9999
接收數(shù)據(jù)洛巢,并在該 DataFrame 上執(zhí)行 transform 來計算 word counts。
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
DataFrame lines
代表一個包含流數(shù)據(jù)的無限的表次兆。該表包含一個 string 類型的 value 列稿茉,流數(shù)據(jù)里的每條數(shù)據(jù)變成了該表中的一行。接下來芥炭,我們調(diào)用 .as[String]
將 DataFrame 轉(zhuǎn)化為 Dataset漓库,這樣我們就可以執(zhí)行 flatMap
來 split 一行為多個 words。返回值 Dataset words
包含所有的 words蚤认。最后米苹,執(zhí)行 words.groupBy("value").count()
得到 wordCounts
,注意砰琢,這是一個流式的 DataFrame蘸嘶,代表這個流持續(xù)運行中的 word counts。
現(xiàn)在我們設(shè)置好了要在流式數(shù)據(jù)上執(zhí)行的查詢陪汽,接下來要做的就是真正啟動數(shù)據(jù)接收和計算训唱。要做到這一點,我們設(shè)置了每當(dāng)結(jié)果有更新就輸出完整的結(jié)果(通過 outputMode("complete")
指定)至控制臺挚冤。然后調(diào)用 start
來啟動流計算况增。
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
當(dāng)上面的代碼運行起來后,流式計算會在后臺啟動训挡,.awaitTermination()
會一直等待到計算結(jié)束澳骤。
另外,需要執(zhí)行 Netcat 來向 localhost:9999
發(fā)送數(shù)據(jù)澜薄,比如:
$ nc -lk 9999
apache spark
apache hadoop
...
然后为肮,計算再接收到數(shù)據(jù)后會不斷打印出結(jié)果:
# TERMINAL 2: RUNNING StructuredNetworkWordCount
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
...
編程模型
Structured Streaming 的關(guān)鍵思想是將持續(xù)不斷的數(shù)據(jù)當(dāng)做一個不斷追加的表。這使得流式計算模型與批處理計算引擎十分相似肤京。你將使用類似對于靜態(tài)表的批處理方式來表達流計算颊艳,然后 Spark 以在無限表上的增量計算來運行。
基本概念
將輸入的流數(shù)據(jù)當(dāng)做一張 “輸入表”忘分。把每一條到達的數(shù)據(jù)作為輸入表的新的一行來追加棋枕。
在輸入表上執(zhí)行的查詢將會生成 “結(jié)果表”。每個觸發(fā)間隔(trigger interval)(例如 1s)妒峦,新的行追加到輸入表重斑,最終更新結(jié)果表。無論何時更新結(jié)果表肯骇,我們都希望將更改的結(jié)果行 output 到外部存儲/接收器(external sink)绸狐。
output 有以下三種模式:
- Complete Mode:整個更新的結(jié)果表將被寫入外部存儲卤恳。由存儲連接器(storage connector)決定如何處理整個表的寫入
- Append Mode:只有結(jié)果表中自上次觸發(fā)后附加的新行將被寫入外部存儲。這僅適用于不期望更改結(jié)果表中現(xiàn)有行的查詢寒矿。
- Update Mode:只有自上次觸發(fā)后結(jié)果表中更新的行將被寫入外部存儲(自 Spark 2.1.1 起可用)突琳。 請注意,這與完全模式不同符相,因為此模式僅輸出自上次觸發(fā)以來更改的行拆融。如果查詢不包含聚合操作,它將等同于附加模式啊终。
請注意镜豹,每種模式適用于某些類型的查詢。這將在后面詳細討論蓝牲。
為了說明這個模型的使用趟脂,讓我們來進一步理解上面的快速示例:
- 最開始的 DataFrame
lines
為輸入表 - 最后的 DataFrame
wordCounts
為結(jié)果表
在流上執(zhí)行的查詢將 DataFrame lines
轉(zhuǎn)化為 DataFrame wordCounts
與在靜態(tài) DataFrame 上執(zhí)行的操作完全相同。當(dāng)啟動計算后例衍,Spark 會不斷從 socket 連接接收數(shù)據(jù)昔期。如果有新的數(shù)據(jù)到達,Spark將運行一個 “增量” 查詢佛玄,將以前的 counts 與新數(shù)據(jù)相結(jié)合硼一,以計算更新的 counts,如下所示:
這種模式與許多其他流處理引擎有顯著差異梦抢。許多流處理引擎要求用戶自己維護運行的狀態(tài)般贼,因此必須對容錯和數(shù)據(jù)一致性(at-least-once, or at-most-once, or exactly-once)進行處理。 在這個模型中奥吩,當(dāng)有新數(shù)據(jù)時哼蛆,Spark負責(zé)更新結(jié)果表,從而減輕用戶的工作霞赫。作為例子腮介,我們來看看該模型如何處理 event-time 和延遲的數(shù)據(jù)。
處理 event-time 和延遲數(shù)據(jù)
event-time 是嵌入在數(shù)據(jù)中的時間绩脆。對于許多 application萤厅,你可能希望在 event-time 上進行操作橄抹。例如靴迫,如果要每分鐘獲取IoT設(shè)備生成的事件數(shù),則會希望使用數(shù)據(jù)生成的時間(即嵌入在數(shù)據(jù)中的 event-time)楼誓,而不是 Spark 接收到數(shù)據(jù)的時間玉锌。在該模型中 event-time 被非常自然的表達,來自設(shè)備的每個事件都是表中的一行疟羹,event-time 是行中的一列主守。這允許基于 window 的聚合(例如每分鐘的事件數(shù))僅僅是 event-time 列上的特殊類型的分組(grouping)和聚合(aggregation):每個時間窗口是一個組禀倔,并且每一行可以屬于多個窗口/組。因此参淫,可以在靜態(tài)數(shù)據(jù)集和數(shù)據(jù)流上進行基于事件時間窗口( event-time-window-based)的聚合查詢救湖,從而使用戶操作更加方便。
此外涎才,該模型也可以自然的處理接收到的時間晚于 event-time 的數(shù)據(jù)鞋既。因為 Spark 一直在更新結(jié)果表,所以它可以完全控制更新舊的聚合數(shù)據(jù)耍铜,或清除舊的聚合以限制中間狀態(tài)數(shù)據(jù)的大小邑闺。自 Spark 2.1 起,開始支持 watermark 來允許用于指定數(shù)據(jù)的超時時間(即接收時間比 event-time 晚多少)棕兼,并允許引擎相應(yīng)的清理舊狀態(tài)陡舅。這將在下文的 “窗口操作” 小節(jié)中進一步說明。
容錯語義
提供端到端的 exactly-once 語義是 Struectured Streaming 背后設(shè)計的關(guān)鍵目標之一伴挚。為了達到這點靶衍,設(shè)計了 Structured Streaming 的 sources(數(shù)據(jù)源)、sink(輸出)以及執(zhí)行引擎可靠的追蹤確切的執(zhí)行進度以便于通過重啟或重新處理來處理任何類型的故障章鲤。對于每個具有偏移量(類似于 Kafka 偏移量或 Kinesis 序列號)的 streaming source摊灭。引擎使用 checkpoint 和 WAL 來記錄每個 trigger 處理的 offset 范圍。streaming sinks 被設(shè)計為對重新處理是冪等的败徊。結(jié)合可以重放的 sources 和支持重復(fù)處理冪等的 sinks帚呼,不管發(fā)生什么故障 Structured Streaming 可以確保端到端的 exactly-once 語義。
使用 Datasets 和 DataFrames API
自 Spark 2.0 起皱蹦,Spark 可以代表靜態(tài)的煤杀、有限數(shù)據(jù)和流式的、無限數(shù)據(jù)沪哺。與靜態(tài)的 Datasets/DataFrames 類似沈自, 你可以使用 SparkSession 基于 streaming sources 來創(chuàng)建 DataFrames/Datasets,并且與靜態(tài) DataFrames/Datasets 使用相同的操作辜妓。
創(chuàng)建流式 DataFrames 和流式 Datasets
流式 DataFrames 可以通過 DataStreamReader 創(chuàng)建枯途,DataStreamReader 通過調(diào)用 SparkSession.readStream()
創(chuàng)建。與靜態(tài)的 read()
方法類似籍滴,你可以指定 source 的詳細信息:格式酪夷、schema、選項等孽惰。
輸入源
在 Spark 2.0 中晚岭,只有幾個內(nèi)置的 sources:
- File source:以文件流的形式讀取目錄中寫入的文件。支持的文件格式為text勋功,csv坦报,json库说,parquet。請注意片择,文件必須以原子方式放置在給定的目錄中潜的,這在大多數(shù)文件系統(tǒng)中可以通過文件移動操作實現(xiàn)。
- Kafka source:從 Kafka 拉取數(shù)據(jù)字管。兼容 Kafka 0.10.0 以及更高版本夏块。
- Socket source(僅做測試用):從 socket 讀取 UTF-8 文本數(shù)據(jù)。請注意纤掸,這只能用于測試脐供,因為它不提供端到端的容錯
某些 source 不是容錯的,因為它們不能保證在故障后可以重放數(shù)據(jù)借跪。以下是 Spark 中所有 sources 的詳細信息:
- File Source:
- options:
- path:輸入目錄的路徑政己,所有格式通用
- maxFilesPerTrigger:每次 trigger 最大文件數(shù)(默認無限大)
- latestFirst:是否首先處理最新的文件,當(dāng)有大量積壓的文件時很有用(默認 false)
- fileNameOnly:是否僅根據(jù)文件名而不是完整路徑檢查新文件(默認 false)掏愁。將此設(shè)置為“true”歇由,以下文件將被視為相同的文件,因為它們的文件名“dataset.txt”是相同的:
"file:///dataset.txt"果港、"s3://a/dataset.txt"沦泌、"s3n://a/b/dataset.txt"、"s3a://a/b/c/dataset.txt"
- 容錯:支持
- 注意:支持通配符路徑辛掠,但不支持逗號分隔的多個路徑/通配符路徑
- options:
- Socket Source:
- options:
- host: 要連接的 host, 必須指定
- port: 要連接的 port, 必須指定
- 容錯:不支持
- 注意:無
- options:
- Kafka Source:
- options:詳見Kafka Integration Guide
- 容錯:支持
- 注意:無
以下是一些例子:
val spark: SparkSession = ...
// Read text from socket
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
socketDF.isStreaming // Returns True for DataFrames that have streaming sources
socketDF.printSchema
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
這些示例生成的流 DataFrames 是無類型的谢谦,在編譯時并不會進行類型檢查,只在運行時進行檢查萝衩。某些操作回挽,比如 map、flatMap 等猩谊,需要在編譯時就知道類型千劈,這時你可以將 DataFrame 轉(zhuǎn)換為 Dataset(使用與靜態(tài)相同的方法)。
流式 DataFrames/Datasets 的 schema 推斷和分區(qū)
默認情況下牌捷,基于 File Source 需要你自行指定 schema墙牌,而不是依靠 Spark 自動推斷。這樣的限制確保了 streaming query 會使用確切的 schema暗甥。你也可以通過將spark.sql.streaming.schemaInference
設(shè)置為 true 來重新啟用 schema 推斷喜滨。
當(dāng)子目錄名為 /key=value/
時,會自動發(fā)現(xiàn)分區(qū)淋袖,并且對這些子目錄進行遞歸發(fā)現(xiàn)鸿市。如果這些列出現(xiàn)在提供的 schema 中锯梁,spark 會讀取相應(yīng)目錄的文件并填充這些列即碗⊙媲椋可以增加組成分區(qū)的目錄,比如當(dāng) /data/year=2015/
存在是可以增加 /data/year=2016/
稳强;但修改分區(qū)目錄是無效的榆苞,比如創(chuàng)建目錄 /data/date=2016-04-17/
雹姊。
流式 DataFrames/Datasets 上的操作
你可以在流式 DataFrames/Datasets 上應(yīng)用各種操作:從無類型,類似 SQL 的操作(比如 select验游、where、groupBy)保檐,到類似有類型的 RDD 操作(比如 map耕蝉、filter、flatMap)夜只。讓我們通過幾個例子來看看垒在。
基本操作 - Selection, Projection, Aggregation
大部分常見的 DataFrame/Dataset 操作也支持流式的 DataFrame/Dataset。少數(shù)不支持的操作將會在后面進行討論扔亥。
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count() // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
event-time(事件時間)上的 window 操作
使用 Structured Streaming 進行滑動的 event-time 窗口聚合是很簡單的场躯,與分組聚合非常類似。在分組聚合中旅挤,為用戶指定的分組列中的每個唯一值維護一個聚合值(例如計數(shù))踢关。在基于 window 的聚合的情況下,為每個 window 維護聚合(aggregate values)粘茄,流式追加的行根據(jù) event-time 落入相應(yīng)的聚合签舞。讓我們通過下圖來理解。
想象下柒瓣,我們的快速示例現(xiàn)在改成了包含數(shù)據(jù)生成的時間”窬現(xiàn)在我們想在 10 分鐘的 window 內(nèi)計算 word count,每 5 分鐘更新一次嘹朗。比如 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20
等师妙。12:00 - 12:10
是指數(shù)據(jù)在 12:00 之后 12:10 之前到達。現(xiàn)在屹培,考慮一個 word 在 12:07 的時候接收到默穴。該 word 應(yīng)當(dāng)增加 12:00 - 12:10
和 12:05 - 12:15
相應(yīng)的 counts。所以 counts 會被分組的 key 和 window 分組褪秀。
結(jié)果表將如下所示:
由于這里的 window 與 group 非常類似蓄诽,在代碼上,你可以使用 groupBy
和 window
來表達 window 聚合媒吗。例子如下:
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
Watermark 和延遲數(shù)據(jù)處理
現(xiàn)在考慮一個數(shù)據(jù)延遲到達會怎么樣仑氛。例如,一個在 12:04 生成的 word 在 12:11 被接收到。application 會使用 12:04 而不是 12:11 去更新 12:00 - 12:10
的 counts锯岖。這在基于 window 的分組中很常見介袜。Structured Streaming 會長時間維持部分聚合的中間狀態(tài),以便于后期數(shù)據(jù)可以正確更新舊 window 的聚合出吹,如下所示:
然后遇伞,當(dāng) query 運行了好幾天,系統(tǒng)必須限制其累積的內(nèi)存中中間狀態(tài)的數(shù)量捶牢。這意味著系統(tǒng)需要知道什么時候可以從內(nèi)存狀態(tài)中刪除舊的聚合鸠珠,因為 application 不會再為該聚合更晚的數(shù)據(jù)進行聚合操作。為啟動此功能秋麸,在Spark 2.1中渐排,引入了 watermark(水印)灸蟆,使引擎自動跟蹤數(shù)據(jù)中的當(dāng)前事件時間飞盆,并相應(yīng)地清理舊狀態(tài)。你可以通過指定事件時間列來定義一個 query 的 watermark 和 late threshold(延遲時間閾值)次乓。對于一個開始于 T 的 window吓歇,引擎會保持中間狀態(tài)并允許后期的數(shù)據(jù)對該狀態(tài)進行更新直到 max event time seen by the engine - late threshold > T
。換句話說票腰,在延遲時間閾值范圍內(nèi)的延遲數(shù)據(jù)會被聚合城看,但超過該閾值的數(shù)據(jù)會被丟棄。讓我們以一個例子來理解這一點杏慰。我們可以使用 withWatermark()
定義一個 watermark测柠,如下所示:
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
在這個例子中,我們定義了基于 timestamp 列定義了 watermark缘滥,并且將 10 分鐘定義為允許數(shù)據(jù)延遲的閾值轰胁。如果該數(shù)據(jù)以 update 輸出模式運行:
- 引擎將不斷更新結(jié)果表中 window 中的 counts 直到該 window 比 watermark 更舊
- 數(shù)據(jù)中的 timestamp 值比當(dāng)前的最大 event-time 落后 10 分鐘以上的數(shù)據(jù)將被丟棄
以下為示圖:
如圖所示,引擎跟蹤的最大 event-time 是藍色虛線朝扼,并且在每個 trigger 開始時設(shè)置 watermark 為 (max event time - '10 mins')
的紅線例如赃阀,當(dāng)引擎發(fā)現(xiàn) (12:14, dog)
時將下次 trigger 的 watermark 設(shè)置為 12:04。然后擎颖,當(dāng) watermark 更新為 12:11 時榛斯,window (12:00 - 12:10)
的中間狀態(tài)被清除,所有后續(xù)數(shù)據(jù)(例如(12:04搂捧,donkey)
)被認為是“太晚”驮俗,因此被丟棄。根據(jù) output 模式允跑,每次觸發(fā)后王凑,更新的計數(shù)(即紫色行)都將作為觸發(fā)輸出進行寫入到 sink搪柑。
某些 sink(例如文件)可能不支持 update mode 所需的細粒度更新。所以索烹,我們還支持 append 模式工碾,只有最后確定的計數(shù)被寫入。這如下圖所示术荤。
注意,在非流式 Dataset 上使用 withWatermark 是無效的空操作每篷。
與之前的 update mode 類似瓣戚,引擎維護每個 window 的中間計數(shù)。只有當(dāng) window < watermark
時才會刪除 window 的中間狀態(tài)數(shù)據(jù)焦读,并將該 window 最終的 counts 追加到結(jié)果表或 sink 中子库。例如,window 12:00 - 12:10
的最終結(jié)果將在 watermark 更新到 12:11 后再追加到結(jié)果表中矗晃。
watermark 清除聚合狀態(tài)的條件十分重要仑嗅,為了清理聚合狀態(tài),必須滿足以下條件(自 Spark 2.1.1 起张症,將來可能會有變化):
- output mode 必須為 append 或 update:complete mode 需要保留所有的聚合數(shù)據(jù)仓技,因此 watermark 不能用來清理聚合數(shù)據(jù)
- 聚合必須具有 event-time 列或基于 event-time 的 window
-
withWatermark 必須調(diào)用在用來聚合的時間列上。比如
df.withWatermark("time", "1 min").groupBy("time2").count()
是無效的 -
withWatermark 必須在調(diào)用聚合前調(diào)用來說明 watermark 的細節(jié)俗他。比如脖捻,
df.groupBy("time").count().withWatermark("time", "1 min")
是無效的
Join 操作
流式 DataFrames 可以與靜態(tài) DataFrames 進行 join 來創(chuàng)建新的流式 DataFrames。如下:
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF
流重復(fù)數(shù)據(jù)的刪除(去重)
你可以使用事件中的唯一標識符對數(shù)據(jù)流中的記錄進行重復(fù)數(shù)據(jù)刪除兆衅。這與使用唯一標識符列的靜態(tài)重復(fù)數(shù)據(jù)消除完全相同地沮。該查詢會存儲所需的一定量先前的數(shù)據(jù),以便可以過濾重復(fù)的記錄羡亩。類似于聚合摩疑,你可以使用或不使用 watermark 來刪除重復(fù)數(shù)據(jù),如下例子:
- 使用 watermark:如果重復(fù)記錄可能到達的時間有上限畏铆,則可以在事件時間列上定義 watermark雷袋,并使用 guid 和事件時間列進行重復(fù)數(shù)據(jù)刪除
- 不使用 watermark:由于重復(fù)記錄可能到達的時間沒有上限,會將來自過去所有記錄的數(shù)據(jù)存儲為狀態(tài)
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// Without watermark using guid column
streamingDf.dropDuplicates("guid")
// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime")
任意有狀態(tài)的操作
許多場景需要使用比聚合更復(fù)雜的狀態(tài)操作辞居,可能不得不把任意類型的數(shù)據(jù)保存為狀態(tài)片排,并使用每個 trigger 中的流式事件對狀態(tài)執(zhí)行任意操作。自 Spark2.2 起速侈,這可以通過調(diào)用 mapGroupWithState
和 flatMapGroupWithState
做到率寡。這兩個操作都允許你在分組的數(shù)據(jù)集上應(yīng)用用戶定義的代碼來更新用戶定義的狀態(tài),有關(guān)更具體的細節(jié)倚搬,請查看API文檔 GroupState 和 example冶共。
不支持的操作
DataFrame/Dataset 有一些操作是流式 DataFrame/Dataset 不支持的,其中的一些如下:
- 不支持多個流聚合
- 不支持 limit、first捅僵、take 這些取 N 條 Row 的操作
- 不支持 Distinct
- 只有當(dāng) output mode 為 complete 時才支持排序操作
- 有條件地支持流和靜態(tài)數(shù)據(jù)集之間的外連接:
- 不支持與流式 Dataset 的全外連接(full outer join)
- 不支持左側(cè)外連接(left outer join)與右側(cè)的流式 Dataset
- 右側(cè)外連接與左側(cè)的流式 Dataset 不支持
此外家卖,還有一些 Dataset 方法將不適用于流數(shù)據(jù)集。它們是立即運行查詢并返回結(jié)果的操作庙楚,這在流數(shù)據(jù)集上沒有意義上荡。相反,這些功能可以通過顯式啟動流式查詢來完成馒闷。
-
count()
:無法從流式 Dataset 返回單個計數(shù)。而是使用ds.groupBy().count()
返回一個包含運行計數(shù)的 streaming Dataset -
foreach()
:使用ds.writeStream.foreach(...)
代替 -
show()
:使用輸出到 console sink 代替
如果你執(zhí)行了這些操作纳账,你會看到一個 AnalysisException疏虫,像 operation XYZ is not supported with streaming DataFrames/Datasets”
永罚。雖然其中一些可能在未來版本的 Spark 中得到支持,還有其他一些從根本上難以有效地實現(xiàn)呢袱。例如翅敌,不支持對輸入流進行排序产捞,因為它需要跟蹤流中接收到的所有數(shù)據(jù),這從根本上是很難做到的哼御。
啟動流式查詢
一旦定義了最終的結(jié)果 DataFrame/Dataset
恋昼,剩下的就要啟動流計算。要做到這一點挟炬,必須使用通過調(diào)用 Dataset.writeStream()
返回的 DataStreamWriter嗦哆。必須指定以下的一個或多個:
- output sink 細節(jié):data format、location 等
- output mode
- query name:可選的粥喜,指定用于識別的查詢的唯一名稱
- trigger interval:可選的橘券,如果沒有指定,則系統(tǒng)將在上一次處理完成后立即檢查是否有新的可用數(shù)據(jù)锋华。如果由于上一次的觸發(fā)還未完成導(dǎo)致下一次的觸發(fā)時間錯過了毯焕,系統(tǒng)會在下一次的觸發(fā)時間進行觸發(fā)而不是在上一次觸發(fā)結(jié)束后立馬觸發(fā)
- checkpoint location:對于那些可以保證端到端容錯的 output sinks,系統(tǒng)會往指定的 location 寫入所有的 checkpoint 信息婆咸。該 location 必須是一個 HDFS 兼容的文件系統(tǒng)续担。checkpoint 會在下一節(jié)中進行更詳細得介紹
Output Modes
有幾種類型的輸出模式:
- Append mode(默認的):這是默認模式擅耽,其中只有從上次觸發(fā)后添加到結(jié)果表的新行將被輸出到 sink物遇。適用于那些添加到結(jié)果表中的行從不會更改的查詢憾儒。只有 select起趾、where、map眶根、flatMap属百、filter变姨、join 等查詢會支持 Append mode
- Complete mode:每次 trigger 后定欧,整個結(jié)果表將被輸出到 sink。聚合查詢(aggregation queries)支持該模式
- Update mode:(自 Spark 2.1.1 可用)扩氢。只有結(jié)果表中自上次 trigger 后更新的行將被輸出到 sink
不同類型的流式 query 支持不同的 output mode爷辱。以下是兼容性:
輸出接收器(Output sink)
有幾種類型的內(nèi)置輸出接收器。
- File sink:存儲輸出至目錄:
writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
- Foreach sink:對輸出中的記錄運行任意計算:
writeStream
.foreach(...)
.start()
- Console sink(用來調(diào)試):每次 trigger 將輸出打印到控制臺厚骗。支持 Append 和 Complete 模式领舰。僅適用于小數(shù)據(jù)量的調(diào)試之用迟螺,因為在每次 trigger 之后矩父,完整的輸出會被存儲在 driver 的內(nèi)存中,請謹慎使用:
writeStream
.format("console")
.start()
- Memory sink(用來調(diào)試):輸出作為內(nèi)存表存儲在內(nèi)存中民轴。支持 Append 和 Complete 模式后裸。僅適用于小數(shù)據(jù)量的調(diào)試之用冒滩,因為在每次 trigger 之后开睡,完整的輸出會被存儲在 driver 的內(nèi)存中,請謹慎使用:
writeStream
.format("memory")
.queryName("tableName")
.start()
某些接收器不容錯扶檐,因為它們不保證輸出的持久性蘸秘,僅用于調(diào)試目的蝗茁。請參閱上一節(jié)關(guān)于容錯語義的部分哮翘。以下是 Spark 中所有內(nèi)置接收器的詳細信息:
請注意饭寺,必須調(diào)用 start()
來實際啟動查詢的執(zhí)行叫挟。這將返回一個 StreamingQuery 對象抹恳,它是持續(xù)運行的查詢的句柄署驻。你可以使用該對象來管理查詢旺上,我們將在下一小節(jié)中討論⌒ǎ現(xiàn)在,讓我們通過幾個例子來了解:
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")
// Print new data to console
noAggDF
.writeStream
.format("console")
.start()
// Write new data to Parquet files
noAggDF
.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()
// Print updated aggregations to console
aggDF
.writeStream
.outputMode("complete")
.format("console")
.start()
// Have all the aggregates in an in-memory table
aggDF
.writeStream
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show() // interactively query in-memory table
使用 Foreach
foreach 操作允許在輸出數(shù)據(jù)上進行任意操作杭攻。在 Spark 2.1 中朴上,只有 Scala 和 Java 可用。要使用這個叼架,你必須實現(xiàn) ForeachWriter 接口乖订,其具有每次 trigger 后每當(dāng)有一系列行生成時會調(diào)用的方法乍构,注意一下幾個要點:
- writer 必須是可序列化的,因為它將被序列化并發(fā)送給 executor 執(zhí)行
- open岂丘、process 和 close 會在 executors 上被調(diào)用
- 只有當(dāng) open 方法被調(diào)用時 writer 才執(zhí)行所有的初始化眠饮。請注意仪召,如果在創(chuàng)建對象時立即進行任何初始化,那么該初始化將在 driver 中發(fā)生已旧,這可能不是你預(yù)期的
- open 方法可以使用 version 和 partition 來決定是否需要寫入序列的行运褪。可以返回 true(繼續(xù)寫入)或 false(無需寫入)胁后。如果返回 false攀芯,process 不會在任何行上被調(diào)用侣诺。例如氧秘,在部分失敗之后丸相,失敗的 trigger 的部分輸出分區(qū)可能已經(jīng)被提交到數(shù)據(jù)庫∩潘悖基于存儲在數(shù)據(jù)庫中的元數(shù)據(jù)涕蜂,可以識別已經(jīng)提交的分區(qū)映琳,因此返回 false 以避免再次提交它們萨西。
- 每當(dāng) open 被調(diào)用原杂,close 也會被調(diào)用(除非 JVM 因為意外退出)。即使 open 返回 false 也是如此年局。如果在處理和寫入數(shù)據(jù)的時候發(fā)生錯誤,close 會被調(diào)用仲闽。你有責(zé)任清理在 open 中創(chuàng)建的狀態(tài)(例如連接赖欣,事務(wù)等)顶吮,以免資源泄漏
管理流式查詢
當(dāng) query 啟動時粪薛,StreamingQuery 被創(chuàng)建违寿,可以用來監(jiān)控和管理該 query:
val query = df.writeStream.format("console").start() // get the query object
query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId // get the unique id of this run of the query, which will be generated at every start/restart
query.name // get the name of the auto-generated or user-specified name
query.explain() // print detailed explanations of the query
query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
query.exception // the exception if the query has been terminated with error
query.recentProgress // an array of the most recent progress updates for this query
query.lastProgress // the most recent progress update of this streaming query
可以在單個 SparkSession 中啟動任意數(shù)量的 query藤巢。他們都將同時運行共享集群資源掂咒。可以調(diào)用 sparkSession.streams()
來獲取 StreamingQueryManager糜工,可以用來管理當(dāng)前 active queries:
val spark: SparkSession = ...
spark.streams.active // get the list of currently active streaming queries
spark.streams.get(id) // get a query object by its unique id
spark.streams.awaitAnyTermination() // block until any one of them terminates
監(jiān)控流式查詢
有兩種 API 用于監(jiān)控和調(diào)試 active queries:以交互方式和異步方式。
交互式 APIs(Interactive APIs)
你可以調(diào)用 streamingQuery.lastProgress()
和 streamingQuery.status()
來直接獲取某個 query 的當(dāng)前的狀態(tài)和指標嫉戚。lastProgress
返回一個 StreamingQueryProgress
對象澈圈。它具有關(guān)于流最后一個 trigger 的進度的所有信息瞬女,包括處理哪些數(shù)據(jù)诽偷、處理速度疯坤、處理延遲等压怠。還有 streamingQuery.recentProgress
返回最后幾個進度的數(shù)組菌瘫。
另外布卡,streamingQuery.status()
返回一個 StreamingQueryStatus
忿等。它提供了有關(guān) query 執(zhí)行的信息这弧,比如是否有 trigger active,是否有數(shù)據(jù)正在被處理等皇帮。
以下是一些例子:
val query: StreamingQuery = ...
println(query.lastProgress)
/* Will print something like the following.
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
println(query.status)
/* Will print something like the following.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
異步 API
你還可以通過附加 StreamingQueryListener
異步監(jiān)控與 SparkSession 關(guān)聯(lián)的所有查詢属拾。一旦你通過 sparkSession.streams.attachListener()
附加了自定義的 StreamingQueryListener
對象渐白,當(dāng) query 啟動纯衍、結(jié)束襟诸、active 查詢有進展時就會被回調(diào)基协。下面是一個例子:
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
使用 checkpoint 從失敗中恢復(fù)
在失敗或主動 shutdown 的情況下澜驮,可以恢復(fù)之前的查詢進度和狀態(tài)并從該處繼續(xù)運行。這是依賴 checkpoint 和 WAL(write ahead logs) 來完成的揩慕。你可以配置一個 checkpoint 路徑扮休,query 會將進度信息(比如每個 trigger 處理的 offset ranger)和運行中的聚合寫入到 checkpoint 的位置玷坠。checkpoint 的路徑必須是一個 HDFS 兼容的文件系統(tǒng)八堡,并且需要在定義 query 的時候設(shè)置好,如下:
aggDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
作者:牛肉圓粉不加蔥
鏈接:http://www.reibang.com/p/ae07471c1f8d
來源:簡書
著作權(quán)歸作者所有缝龄。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán)叔壤,非商業(yè)轉(zhuǎn)載請注明出處炼绘。