Structured Streaming 編程指南

概述

Structured Streaming 是一個基于 Spark SQL 引擎的庙睡、可擴展的且支持容錯的流處理引擎诀艰。你可以像表達靜態(tài)數(shù)據(jù)上的批處理計算一樣表達流計算糠排。Spark SQL 引擎將隨著流式數(shù)據(jù)的持續(xù)到達而持續(xù)運行,并不斷更新結(jié)果史飞。你可以在Scala猫妙,Java瓷翻,Python或R中使用 Dataset/DataFrame API 來表示流聚合,事件時間窗口(event-time windows)吐咳,流到批處理連接(stream-to-batch joins)等。計算在相同的優(yōu)化的 Spark SQL 引擎上執(zhí)行元践。最后韭脊,通過 checkpoint 和 WAL,系統(tǒng)確保端到端的 exactly-once单旁。簡而言之沪羔,Structured Streaming 提供了快速、可擴展的象浑、容錯的蔫饰、端到端 exactly-once 的流處理。

在本指南中愉豺,我們將引導(dǎo)你熟悉編程模型和 API篓吁。首先,我們從一個簡單的例子開始:streaming word count蚪拦。

快速示例

假設(shè)要監(jiān)聽從本機 9999 端口發(fā)送的文本的 WordCount杖剪,讓我們看看如何使用結(jié)構(gòu)化流式表達這一點冻押。 首先,必須 import 必須的類并創(chuàng)建 SparkSession

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._

然后盛嘿,創(chuàng)建一個流式 Streaming DataFrame 來代表不斷從 localhost:9999 接收數(shù)據(jù)洛巢,并在該 DataFrame 上執(zhí)行 transform 來計算 word counts。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

DataFrame lines 代表一個包含流數(shù)據(jù)的無限的表次兆。該表包含一個 string 類型的 value 列稿茉,流數(shù)據(jù)里的每條數(shù)據(jù)變成了該表中的一行。接下來芥炭,我們調(diào)用 .as[String] 將 DataFrame 轉(zhuǎn)化為 Dataset漓库,這樣我們就可以執(zhí)行 flatMap 來 split 一行為多個 words。返回值 Dataset words 包含所有的 words蚤认。最后米苹,執(zhí)行 words.groupBy("value").count() 得到 wordCounts注意砰琢,這是一個流式的 DataFrame蘸嘶,代表這個流持續(xù)運行中的 word counts

現(xiàn)在我們設(shè)置好了要在流式數(shù)據(jù)上執(zhí)行的查詢陪汽,接下來要做的就是真正啟動數(shù)據(jù)接收和計算训唱。要做到這一點,我們設(shè)置了每當(dāng)結(jié)果有更新就輸出完整的結(jié)果(通過 outputMode("complete")指定)至控制臺挚冤。然后調(diào)用 start 來啟動流計算况增。

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

當(dāng)上面的代碼運行起來后,流式計算會在后臺啟動训挡,.awaitTermination() 會一直等待到計算結(jié)束澳骤。

另外,需要執(zhí)行 Netcat 來向 localhost:9999 發(fā)送數(shù)據(jù)澜薄,比如:

$ nc -lk 9999
apache spark
apache hadoop
...

然后为肮,計算再接收到數(shù)據(jù)后會不斷打印出結(jié)果:

# TERMINAL 2: RUNNING StructuredNetworkWordCount

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

編程模型

Structured Streaming 的關(guān)鍵思想是將持續(xù)不斷的數(shù)據(jù)當(dāng)做一個不斷追加的表。這使得流式計算模型與批處理計算引擎十分相似肤京。你將使用類似對于靜態(tài)表的批處理方式來表達流計算颊艳,然后 Spark 以在無限表上的增量計算來運行。

基本概念

將輸入的流數(shù)據(jù)當(dāng)做一張 “輸入表”忘分。把每一條到達的數(shù)據(jù)作為輸入表的新的一行來追加棋枕。

image

在輸入表上執(zhí)行的查詢將會生成 “結(jié)果表”。每個觸發(fā)間隔(trigger interval)(例如 1s)妒峦,新的行追加到輸入表重斑,最終更新結(jié)果表。無論何時更新結(jié)果表肯骇,我們都希望將更改的結(jié)果行 output 到外部存儲/接收器(external sink)绸狐。

image

output 有以下三種模式:

  • Complete Mode:整個更新的結(jié)果表將被寫入外部存儲卤恳。由存儲連接器(storage connector)決定如何處理整個表的寫入
  • Append Mode:只有結(jié)果表中自上次觸發(fā)后附加的新行將被寫入外部存儲。這僅適用于不期望更改結(jié)果表中現(xiàn)有行的查詢寒矿。
  • Update Mode:只有自上次觸發(fā)后結(jié)果表中更新的行將被寫入外部存儲(自 Spark 2.1.1 起可用)突琳。 請注意,這與完全模式不同符相,因為此模式僅輸出自上次觸發(fā)以來更改的行拆融。如果查詢不包含聚合操作,它將等同于附加模式啊终。

請注意镜豹,每種模式適用于某些類型的查詢。這將在后面詳細討論蓝牲。

為了說明這個模型的使用趟脂,讓我們來進一步理解上面的快速示例:

  • 最開始的 DataFrame lines 為輸入表
  • 最后的 DataFrame wordCounts 為結(jié)果表

在流上執(zhí)行的查詢將 DataFrame lines 轉(zhuǎn)化為 DataFrame wordCounts 與在靜態(tài) DataFrame 上執(zhí)行的操作完全相同。當(dāng)啟動計算后例衍,Spark 會不斷從 socket 連接接收數(shù)據(jù)昔期。如果有新的數(shù)據(jù)到達,Spark將運行一個 “增量” 查詢佛玄,將以前的 counts 與新數(shù)據(jù)相結(jié)合硼一,以計算更新的 counts,如下所示:

image

這種模式與許多其他流處理引擎有顯著差異梦抢。許多流處理引擎要求用戶自己維護運行的狀態(tài)般贼,因此必須對容錯和數(shù)據(jù)一致性(at-least-once, or at-most-once, or exactly-once)進行處理。 在這個模型中奥吩,當(dāng)有新數(shù)據(jù)時哼蛆,Spark負責(zé)更新結(jié)果表,從而減輕用戶的工作霞赫。作為例子腮介,我們來看看該模型如何處理 event-time 和延遲的數(shù)據(jù)。

處理 event-time 和延遲數(shù)據(jù)

event-time 是嵌入在數(shù)據(jù)中的時間绩脆。對于許多 application萤厅,你可能希望在 event-time 上進行操作橄抹。例如靴迫,如果要每分鐘獲取IoT設(shè)備生成的事件數(shù),則會希望使用數(shù)據(jù)生成的時間(即嵌入在數(shù)據(jù)中的 event-time)楼誓,而不是 Spark 接收到數(shù)據(jù)的時間玉锌。在該模型中 event-time 被非常自然的表達,來自設(shè)備的每個事件都是表中的一行疟羹,event-time 是行中的一列主守。這允許基于 window 的聚合(例如每分鐘的事件數(shù))僅僅是 event-time 列上的特殊類型的分組(grouping)和聚合(aggregation):每個時間窗口是一個組禀倔,并且每一行可以屬于多個窗口/組。因此参淫,可以在靜態(tài)數(shù)據(jù)集和數(shù)據(jù)流上進行基于事件時間窗口( event-time-window-based)的聚合查詢救湖,從而使用戶操作更加方便。

此外涎才,該模型也可以自然的處理接收到的時間晚于 event-time 的數(shù)據(jù)鞋既。因為 Spark 一直在更新結(jié)果表,所以它可以完全控制更新舊的聚合數(shù)據(jù)耍铜,或清除舊的聚合以限制中間狀態(tài)數(shù)據(jù)的大小邑闺。自 Spark 2.1 起,開始支持 watermark 來允許用于指定數(shù)據(jù)的超時時間(即接收時間比 event-time 晚多少)棕兼,并允許引擎相應(yīng)的清理舊狀態(tài)陡舅。這將在下文的 “窗口操作” 小節(jié)中進一步說明。

容錯語義

提供端到端的 exactly-once 語義是 Struectured Streaming 背后設(shè)計的關(guān)鍵目標之一伴挚。為了達到這點靶衍,設(shè)計了 Structured Streaming 的 sources(數(shù)據(jù)源)、sink(輸出)以及執(zhí)行引擎可靠的追蹤確切的執(zhí)行進度以便于通過重啟或重新處理來處理任何類型的故障章鲤。對于每個具有偏移量(類似于 Kafka 偏移量或 Kinesis 序列號)的 streaming source摊灭。引擎使用 checkpoint 和 WAL 來記錄每個 trigger 處理的 offset 范圍。streaming sinks 被設(shè)計為對重新處理是冪等的败徊。結(jié)合可以重放的 sources 和支持重復(fù)處理冪等的 sinks帚呼,不管發(fā)生什么故障 Structured Streaming 可以確保端到端的 exactly-once 語義。

使用 Datasets 和 DataFrames API

自 Spark 2.0 起皱蹦,Spark 可以代表靜態(tài)的煤杀、有限數(shù)據(jù)和流式的、無限數(shù)據(jù)沪哺。與靜態(tài)的 Datasets/DataFrames 類似沈自, 你可以使用 SparkSession 基于 streaming sources 來創(chuàng)建 DataFrames/Datasets,并且與靜態(tài) DataFrames/Datasets 使用相同的操作辜妓。

創(chuàng)建流式 DataFrames 和流式 Datasets

流式 DataFrames 可以通過 DataStreamReader 創(chuàng)建枯途,DataStreamReader 通過調(diào)用 SparkSession.readStream() 創(chuàng)建。與靜態(tài)的 read() 方法類似籍滴,你可以指定 source 的詳細信息:格式酪夷、schema、選項等孽惰。

輸入源

在 Spark 2.0 中晚岭,只有幾個內(nèi)置的 sources:

  • File source:以文件流的形式讀取目錄中寫入的文件。支持的文件格式為text勋功,csv坦报,json库说,parquet。請注意片择,文件必須以原子方式放置在給定的目錄中潜的,這在大多數(shù)文件系統(tǒng)中可以通過文件移動操作實現(xiàn)
  • Kafka source:從 Kafka 拉取數(shù)據(jù)字管。兼容 Kafka 0.10.0 以及更高版本夏块。
  • Socket source(僅做測試用):從 socket 讀取 UTF-8 文本數(shù)據(jù)。請注意纤掸,這只能用于測試脐供,因為它不提供端到端的容錯

某些 source 不是容錯的,因為它們不能保證在故障后可以重放數(shù)據(jù)借跪。以下是 Spark 中所有 sources 的詳細信息:

  • File Source:
    • options:
      • path:輸入目錄的路徑政己,所有格式通用
      • maxFilesPerTrigger:每次 trigger 最大文件數(shù)(默認無限大)
      • latestFirst:是否首先處理最新的文件,當(dāng)有大量積壓的文件時很有用(默認 false)
      • fileNameOnly:是否僅根據(jù)文件名而不是完整路徑檢查新文件(默認 false)掏愁。將此設(shè)置為“true”歇由,以下文件將被視為相同的文件,因為它們的文件名“dataset.txt”是相同的:"file:///dataset.txt"果港、"s3://a/dataset.txt"沦泌、"s3n://a/b/dataset.txt"、"s3a://a/b/c/dataset.txt"
    • 容錯:支持
    • 注意:支持通配符路徑辛掠,但不支持逗號分隔的多個路徑/通配符路徑
  • Socket Source:
    • options:
      • host: 要連接的 host, 必須指定
      • port: 要連接的 port, 必須指定
    • 容錯:不支持
    • 注意:無
  • Kafka Source:

以下是一些例子:

val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns True for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

這些示例生成的流 DataFrames 是無類型的谢谦,在編譯時并不會進行類型檢查,只在運行時進行檢查萝衩。某些操作回挽,比如 map、flatMap 等猩谊,需要在編譯時就知道類型千劈,這時你可以將 DataFrame 轉(zhuǎn)換為 Dataset(使用與靜態(tài)相同的方法)。

流式 DataFrames/Datasets 的 schema 推斷和分區(qū)

默認情況下牌捷,基于 File Source 需要你自行指定 schema墙牌,而不是依靠 Spark 自動推斷。這樣的限制確保了 streaming query 會使用確切的 schema暗甥。你也可以通過將spark.sql.streaming.schemaInference 設(shè)置為 true 來重新啟用 schema 推斷喜滨。

當(dāng)子目錄名為 /key=value/ 時,會自動發(fā)現(xiàn)分區(qū)淋袖,并且對這些子目錄進行遞歸發(fā)現(xiàn)鸿市。如果這些列出現(xiàn)在提供的 schema 中锯梁,spark 會讀取相應(yīng)目錄的文件并填充這些列即碗⊙媲椋可以增加組成分區(qū)的目錄,比如當(dāng) /data/year=2015/ 存在是可以增加 /data/year=2016/稳强;但修改分區(qū)目錄是無效的榆苞,比如創(chuàng)建目錄 /data/date=2016-04-17/雹姊。

流式 DataFrames/Datasets 上的操作

你可以在流式 DataFrames/Datasets 上應(yīng)用各種操作:從無類型,類似 SQL 的操作(比如 select验游、where、groupBy)保檐,到類似有類型的 RDD 操作(比如 map耕蝉、filter、flatMap)夜只。讓我們通過幾個例子來看看垒在。

基本操作 - Selection, Projection, Aggregation

大部分常見的 DataFrame/Dataset 操作也支持流式的 DataFrame/Dataset。少數(shù)不支持的操作將會在后面進行討論扔亥。

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

event-time(事件時間)上的 window 操作

使用 Structured Streaming 進行滑動的 event-time 窗口聚合是很簡單的场躯,與分組聚合非常類似。在分組聚合中旅挤,為用戶指定的分組列中的每個唯一值維護一個聚合值(例如計數(shù))踢关。在基于 window 的聚合的情況下,為每個 window 維護聚合(aggregate values)粘茄,流式追加的行根據(jù) event-time 落入相應(yīng)的聚合签舞。讓我們通過下圖來理解。

想象下柒瓣,我們的快速示例現(xiàn)在改成了包含數(shù)據(jù)生成的時間”窬現(xiàn)在我們想在 10 分鐘的 window 內(nèi)計算 word count,每 5 分鐘更新一次嘹朗。比如 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 等师妙。12:00 - 12:10 是指數(shù)據(jù)在 12:00 之后 12:10 之前到達。現(xiàn)在屹培,考慮一個 word 在 12:07 的時候接收到默穴。該 word 應(yīng)當(dāng)增加 12:00 - 12:1012:05 - 12:15 相應(yīng)的 counts。所以 counts 會被分組的 key 和 window 分組褪秀。

結(jié)果表將如下所示:

image

由于這里的 window 與 group 非常類似蓄诽,在代碼上,你可以使用 groupBywindow 來表達 window 聚合媒吗。例子如下:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

Watermark 和延遲數(shù)據(jù)處理

現(xiàn)在考慮一個數(shù)據(jù)延遲到達會怎么樣仑氛。例如,一個在 12:04 生成的 word 在 12:11 被接收到。application 會使用 12:04 而不是 12:11 去更新 12:00 - 12:10的 counts锯岖。這在基于 window 的分組中很常見介袜。Structured Streaming 會長時間維持部分聚合的中間狀態(tài),以便于后期數(shù)據(jù)可以正確更新舊 window 的聚合出吹,如下所示:

image

然后遇伞,當(dāng) query 運行了好幾天,系統(tǒng)必須限制其累積的內(nèi)存中中間狀態(tài)的數(shù)量捶牢。這意味著系統(tǒng)需要知道什么時候可以從內(nèi)存狀態(tài)中刪除舊的聚合鸠珠,因為 application 不會再為該聚合更晚的數(shù)據(jù)進行聚合操作。為啟動此功能秋麸,在Spark 2.1中渐排,引入了 watermark(水印)灸蟆,使引擎自動跟蹤數(shù)據(jù)中的當(dāng)前事件時間飞盆,并相應(yīng)地清理舊狀態(tài)。你可以通過指定事件時間列來定義一個 query 的 watermark 和 late threshold(延遲時間閾值)次乓。對于一個開始于 T 的 window吓歇,引擎會保持中間狀態(tài)并允許后期的數(shù)據(jù)對該狀態(tài)進行更新直到 max event time seen by the engine - late threshold > T。換句話說票腰,在延遲時間閾值范圍內(nèi)的延遲數(shù)據(jù)會被聚合城看,但超過該閾值的數(shù)據(jù)會被丟棄。讓我們以一個例子來理解這一點杏慰。我們可以使用 withWatermark() 定義一個 watermark测柠,如下所示:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

在這個例子中,我們定義了基于 timestamp 列定義了 watermark缘滥,并且將 10 分鐘定義為允許數(shù)據(jù)延遲的閾值轰胁。如果該數(shù)據(jù)以 update 輸出模式運行:

  • 引擎將不斷更新結(jié)果表中 window 中的 counts 直到該 window 比 watermark 更舊
  • 數(shù)據(jù)中的 timestamp 值比當(dāng)前的最大 event-time 落后 10 分鐘以上的數(shù)據(jù)將被丟棄

以下為示圖:

image

如圖所示,引擎跟蹤的最大 event-time 是藍色虛線朝扼,并且在每個 trigger 開始時設(shè)置 watermark 為 (max event time - '10 mins') 的紅線例如赃阀,當(dāng)引擎發(fā)現(xiàn) (12:14, dog) 時將下次 trigger 的 watermark 設(shè)置為 12:04。然后擎颖,當(dāng) watermark 更新為 12:11 時榛斯,window (12:00 - 12:10) 的中間狀態(tài)被清除,所有后續(xù)數(shù)據(jù)(例如(12:04搂捧,donkey))被認為是“太晚”驮俗,因此被丟棄。根據(jù) output 模式允跑,每次觸發(fā)后王凑,更新的計數(shù)(即紫色行)都將作為觸發(fā)輸出進行寫入到 sink搪柑。

某些 sink(例如文件)可能不支持 update mode 所需的細粒度更新。所以索烹,我們還支持 append 模式工碾,只有最后確定的計數(shù)被寫入。這如下圖所示术荤。

注意,在非流式 Dataset 上使用 withWatermark 是無效的空操作每篷。

image

與之前的 update mode 類似瓣戚,引擎維護每個 window 的中間計數(shù)。只有當(dāng) window < watermark 時才會刪除 window 的中間狀態(tài)數(shù)據(jù)焦读,并將該 window 最終的 counts 追加到結(jié)果表或 sink 中子库。例如,window 12:00 - 12:10 的最終結(jié)果將在 watermark 更新到 12:11 后再追加到結(jié)果表中矗晃。

watermark 清除聚合狀態(tài)的條件十分重要仑嗅,為了清理聚合狀態(tài),必須滿足以下條件(自 Spark 2.1.1 起张症,將來可能會有變化):

  • output mode 必須為 append 或 update:complete mode 需要保留所有的聚合數(shù)據(jù)仓技,因此 watermark 不能用來清理聚合數(shù)據(jù)
  • 聚合必須具有 event-time 列或基于 event-time 的 window
  • withWatermark 必須調(diào)用在用來聚合的時間列上。比如 df.withWatermark("time", "1 min").groupBy("time2").count() 是無效的
  • withWatermark 必須在調(diào)用聚合前調(diào)用來說明 watermark 的細節(jié)俗他。比如脖捻,df.groupBy("time").count().withWatermark("time", "1 min") 是無效的

Join 操作

流式 DataFrames 可以與靜態(tài) DataFrames 進行 join 來創(chuàng)建新的流式 DataFrames。如下:

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join")  // right outer join with a static DF

流重復(fù)數(shù)據(jù)的刪除(去重)

你可以使用事件中的唯一標識符對數(shù)據(jù)流中的記錄進行重復(fù)數(shù)據(jù)刪除兆衅。這與使用唯一標識符列的靜態(tài)重復(fù)數(shù)據(jù)消除完全相同地沮。該查詢會存儲所需的一定量先前的數(shù)據(jù),以便可以過濾重復(fù)的記錄羡亩。類似于聚合摩疑,你可以使用或不使用 watermark 來刪除重復(fù)數(shù)據(jù),如下例子:

  • 使用 watermark:如果重復(fù)記錄可能到達的時間有上限畏铆,則可以在事件時間列上定義 watermark雷袋,并使用 guid 和事件時間列進行重復(fù)數(shù)據(jù)刪除
  • 不使用 watermark:由于重復(fù)記錄可能到達的時間沒有上限,會將來自過去所有記錄的數(shù)據(jù)存儲為狀態(tài)
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")

任意有狀態(tài)的操作

許多場景需要使用比聚合更復(fù)雜的狀態(tài)操作辞居,可能不得不把任意類型的數(shù)據(jù)保存為狀態(tài)片排,并使用每個 trigger 中的流式事件對狀態(tài)執(zhí)行任意操作。自 Spark2.2 起速侈,這可以通過調(diào)用 mapGroupWithStateflatMapGroupWithState 做到率寡。這兩個操作都允許你在分組的數(shù)據(jù)集上應(yīng)用用戶定義的代碼來更新用戶定義的狀態(tài),有關(guān)更具體的細節(jié)倚搬,請查看API文檔 GroupStateexample冶共。

不支持的操作

DataFrame/Dataset 有一些操作是流式 DataFrame/Dataset 不支持的,其中的一些如下:

  • 不支持多個流聚合
  • 不支持 limit、first捅僵、take 這些取 N 條 Row 的操作
  • 不支持 Distinct
  • 只有當(dāng) output mode 為 complete 時才支持排序操作
  • 有條件地支持流和靜態(tài)數(shù)據(jù)集之間的外連接:
    • 不支持與流式 Dataset 的全外連接(full outer join)
    • 不支持左側(cè)外連接(left outer join)與右側(cè)的流式 Dataset
    • 右側(cè)外連接與左側(cè)的流式 Dataset 不支持

此外家卖,還有一些 Dataset 方法將不適用于流數(shù)據(jù)集。它們是立即運行查詢并返回結(jié)果的操作庙楚,這在流數(shù)據(jù)集上沒有意義上荡。相反,這些功能可以通過顯式啟動流式查詢來完成馒闷。

  • count():無法從流式 Dataset 返回單個計數(shù)。而是使用 ds.groupBy().count() 返回一個包含運行計數(shù)的 streaming Dataset
  • foreach():使用 ds.writeStream.foreach(...) 代替
  • show():使用輸出到 console sink 代替

如果你執(zhí)行了這些操作纳账,你會看到一個 AnalysisException疏虫,像 operation XYZ is not supported with streaming DataFrames/Datasets”永罚。雖然其中一些可能在未來版本的 Spark 中得到支持,還有其他一些從根本上難以有效地實現(xiàn)呢袱。例如翅敌,不支持對輸入流進行排序产捞,因為它需要跟蹤流中接收到的所有數(shù)據(jù),這從根本上是很難做到的哼御。

啟動流式查詢

一旦定義了最終的結(jié)果 DataFrame/Dataset恋昼,剩下的就要啟動流計算。要做到這一點挟炬,必須使用通過調(diào)用 Dataset.writeStream() 返回的 DataStreamWriter嗦哆。必須指定以下的一個或多個:

  • output sink 細節(jié):data format、location 等
  • output mode
  • query name:可選的粥喜,指定用于識別的查詢的唯一名稱
  • trigger interval:可選的橘券,如果沒有指定,則系統(tǒng)將在上一次處理完成后立即檢查是否有新的可用數(shù)據(jù)锋华。如果由于上一次的觸發(fā)還未完成導(dǎo)致下一次的觸發(fā)時間錯過了毯焕,系統(tǒng)會在下一次的觸發(fā)時間進行觸發(fā)而不是在上一次觸發(fā)結(jié)束后立馬觸發(fā)
  • checkpoint location:對于那些可以保證端到端容錯的 output sinks,系統(tǒng)會往指定的 location 寫入所有的 checkpoint 信息婆咸。該 location 必須是一個 HDFS 兼容的文件系統(tǒng)续担。checkpoint 會在下一節(jié)中進行更詳細得介紹

Output Modes

有幾種類型的輸出模式:

  • Append mode(默認的):這是默認模式擅耽,其中只有從上次觸發(fā)后添加到結(jié)果表的新行將被輸出到 sink物遇。適用于那些添加到結(jié)果表中的行從不會更改的查詢憾儒。只有 select起趾、where、map眶根、flatMap属百、filter变姨、join 等查詢會支持 Append mode
  • Complete mode:每次 trigger 后定欧,整個結(jié)果表將被輸出到 sink。聚合查詢(aggregation queries)支持該模式
  • Update mode:(自 Spark 2.1.1 可用)扩氢。只有結(jié)果表中自上次 trigger 后更新的行將被輸出到 sink

不同類型的流式 query 支持不同的 output mode爷辱。以下是兼容性:

image

輸出接收器(Output sink)

有幾種類型的內(nèi)置輸出接收器。

  • File sink:存儲輸出至目錄:
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()

  • Foreach sink:對輸出中的記錄運行任意計算:
writeStream
    .foreach(...)
    .start()

  • Console sink(用來調(diào)試):每次 trigger 將輸出打印到控制臺厚骗。支持 Append 和 Complete 模式领舰。僅適用于小數(shù)據(jù)量的調(diào)試之用迟螺,因為在每次 trigger 之后矩父,完整的輸出會被存儲在 driver 的內(nèi)存中,請謹慎使用:
writeStream
    .format("console")
    .start()

  • Memory sink(用來調(diào)試):輸出作為內(nèi)存表存儲在內(nèi)存中民轴。支持 Append 和 Complete 模式后裸。僅適用于小數(shù)據(jù)量的調(diào)試之用冒滩,因為在每次 trigger 之后开睡,完整的輸出會被存儲在 driver 的內(nèi)存中,請謹慎使用:
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

某些接收器不容錯扶檐,因為它們不保證輸出的持久性蘸秘,僅用于調(diào)試目的蝗茁。請參閱上一節(jié)關(guān)于容錯語義的部分哮翘。以下是 Spark 中所有內(nèi)置接收器的詳細信息:

image

請注意饭寺,必須調(diào)用 start() 來實際啟動查詢的執(zhí)行叫挟。這將返回一個 StreamingQuery 對象抹恳,它是持續(xù)運行的查詢的句柄署驻。你可以使用該對象來管理查詢旺上,我們將在下一小節(jié)中討論⌒ǎ現(xiàn)在,讓我們通過幾個例子來了解:

// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")   

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table

使用 Foreach

foreach 操作允許在輸出數(shù)據(jù)上進行任意操作杭攻。在 Spark 2.1 中朴上,只有 Scala 和 Java 可用。要使用這個叼架,你必須實現(xiàn) ForeachWriter 接口乖订,其具有每次 trigger 后每當(dāng)有一系列行生成時會調(diào)用的方法乍构,注意一下幾個要點:

  • writer 必須是可序列化的,因為它將被序列化并發(fā)送給 executor 執(zhí)行
  • open岂丘、process 和 close 會在 executors 上被調(diào)用
  • 只有當(dāng) open 方法被調(diào)用時 writer 才執(zhí)行所有的初始化眠饮。請注意仪召,如果在創(chuàng)建對象時立即進行任何初始化,那么該初始化將在 driver 中發(fā)生已旧,這可能不是你預(yù)期的
  • open 方法可以使用 version 和 partition 來決定是否需要寫入序列的行运褪。可以返回 true(繼續(xù)寫入)或 false(無需寫入)胁后。如果返回 false攀芯,process 不會在任何行上被調(diào)用侣诺。例如氧秘,在部分失敗之后丸相,失敗的 trigger 的部分輸出分區(qū)可能已經(jīng)被提交到數(shù)據(jù)庫∩潘悖基于存儲在數(shù)據(jù)庫中的元數(shù)據(jù)涕蜂,可以識別已經(jīng)提交的分區(qū)映琳,因此返回 false 以避免再次提交它們萨西。
  • 每當(dāng) open 被調(diào)用原杂,close 也會被調(diào)用(除非 JVM 因為意外退出)。即使 open 返回 false 也是如此年局。如果在處理和寫入數(shù)據(jù)的時候發(fā)生錯誤,close 會被調(diào)用仲闽。你有責(zé)任清理在 open 中創(chuàng)建的狀態(tài)(例如連接赖欣,事務(wù)等)顶吮,以免資源泄漏

管理流式查詢

當(dāng) query 啟動時粪薛,StreamingQuery 被創(chuàng)建违寿,可以用來監(jiān)控和管理該 query:

val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the name of the auto-generated or user-specified name

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query

可以在單個 SparkSession 中啟動任意數(shù)量的 query藤巢。他們都將同時運行共享集群資源掂咒。可以調(diào)用 sparkSession.streams() 來獲取 StreamingQueryManager糜工,可以用來管理當(dāng)前 active queries:

val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries

spark.streams.get(id)   // get a query object by its unique id

spark.streams.awaitAnyTermination()   // block until any one of them terminates

監(jiān)控流式查詢

有兩種 API 用于監(jiān)控和調(diào)試 active queries:以交互方式和異步方式。

交互式 APIs(Interactive APIs)

你可以調(diào)用 streamingQuery.lastProgress()streamingQuery.status() 來直接獲取某個 query 的當(dāng)前的狀態(tài)和指標嫉戚。lastProgress 返回一個 StreamingQueryProgress 對象澈圈。它具有關(guān)于流最后一個 trigger 的進度的所有信息瞬女,包括處理哪些數(shù)據(jù)诽偷、處理速度疯坤、處理延遲等压怠。還有 streamingQuery.recentProgress 返回最后幾個進度的數(shù)組菌瘫。

另外布卡,streamingQuery.status() 返回一個 StreamingQueryStatus忿等。它提供了有關(guān) query 執(zhí)行的信息这弧,比如是否有 trigger active,是否有數(shù)據(jù)正在被處理等皇帮。

以下是一些例子:

val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/

println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/

異步 API

你還可以通過附加 StreamingQueryListener 異步監(jiān)控與 SparkSession 關(guān)聯(lián)的所有查詢属拾。一旦你通過 sparkSession.streams.attachListener() 附加了自定義的 StreamingQueryListener 對象渐白,當(dāng) query 啟動纯衍、結(jié)束襟诸、active 查詢有進展時就會被回調(diào)基协。下面是一個例子:

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})

使用 checkpoint 從失敗中恢復(fù)

在失敗或主動 shutdown 的情況下澜驮,可以恢復(fù)之前的查詢進度和狀態(tài)并從該處繼續(xù)運行。這是依賴 checkpoint 和 WAL(write ahead logs) 來完成的揩慕。你可以配置一個 checkpoint 路徑扮休,query 會將進度信息(比如每個 trigger 處理的 offset ranger)和運行中的聚合寫入到 checkpoint 的位置玷坠。checkpoint 的路徑必須是一個 HDFS 兼容的文件系統(tǒng)八堡,并且需要在定義 query 的時候設(shè)置好,如下:

aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()

作者:牛肉圓粉不加蔥
鏈接:http://www.reibang.com/p/ae07471c1f8d
來源:簡書
著作權(quán)歸作者所有缝龄。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán)叔壤,非商業(yè)轉(zhuǎn)載請注明出處炼绘。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末俺亮,一起剝皮案震驚了整個濱河市疟呐,隨后出現(xiàn)的幾起案子启具,更是在濱河造成了極大的恐慌富纸,老刑警劉巖晓褪,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件涣仿,死亡現(xiàn)場離奇詭異,居然都是意外死亡愉镰,警方通過查閱死者的電腦和手機丈探,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門碗降,熙熙樓的掌柜王于貴愁眉苦臉地迎上來讼渊,“玉大人尊剔,你說我怎么就攤上這事须误。” “怎么了叶组?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵甩十,是天一觀的道長侣监。 經(jīng)常有香客問我橄霉,道長邑蒋,這世上最難降的妖魔是什么医吊? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任卿堂,我火速辦了婚禮,結(jié)果婚禮上策严,老公的妹妹穿的比我還像新娘妻导。我一直安慰自己怀各,他們只是感情好渠啤,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布沥曹。 她就那樣靜靜地躺著妓美,像睡著了一般壶栋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上琉兜,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天豌蟋,我揣著相機與錄音桑滩,去河邊找鬼运准。 笑死胁澳,一個胖子當(dāng)著我的面吹牛听哭,可吹牛的內(nèi)容都是我干的塘雳。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼太防,長吁一口氣:“原來是場噩夢啊……” “哼蜒车!你這毒婦竟也來了酿愧?” 一聲冷哼從身側(cè)響起邀泉,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤庞钢,失蹤者是張志新(化名)和其女友劉穎因谎,沒想到半個月后财岔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體风皿,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年匠璧,在試婚紗的時候發(fā)現(xiàn)自己被綠了桐款。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡患朱,死狀恐怖鲁僚,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情冰沙,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布执虹,位于F島的核電站拓挥,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏袋励。R本人自食惡果不足惜侥啤,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一当叭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧盖灸,春花似錦蚁鳖、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至徙垫,卻和暖如春讥裤,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背姻报。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工己英, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人吴旋。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓损肛,卻偏偏與公主長得像,于是被迫代替她去往敵國和親邮府。 傳聞我的和親對象是個殘疾皇子荧关,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容