第二十章 流處理基礎(chǔ)
什么是流處理
流處理是連續(xù)處理新到來的數(shù)據(jù)以更新計(jì)算結(jié)果的行為单山。在流處理中碍现,輸入數(shù)據(jù)是無邊界的,沒有預(yù)定的開始或結(jié)束饥侵。
流處理的挑戰(zhàn)
- 基于應(yīng)用程序時(shí)間戳處理無序數(shù)據(jù)
- 維持大量的狀態(tài)
- 支持高吞吐
- 即使有機(jī)器故障也僅需對(duì)事件進(jìn)行一次處理
- 處理負(fù)載不平衡和拖延者
- 快速響應(yīng)事件
- 與其他存儲(chǔ)中的數(shù)據(jù)進(jìn)行連接
- 確定新事件到達(dá)時(shí)如何更新輸出
流處理設(shè)計(jì)要點(diǎn)
記錄級(jí)別API和申明式API
- 流處理API最簡(jiǎn)單的實(shí)現(xiàn)方法就是將每個(gè)事件傳遞給應(yīng)用程序鸵赫,并使用自定義代碼進(jìn)行響應(yīng)。提供這種記錄級(jí)別的API的流處理系統(tǒng)只是給用戶提供一個(gè)獲取每條流數(shù)據(jù)記錄的接口躏升,這樣許多復(fù)雜狀態(tài)需要由應(yīng)用程序負(fù)責(zé)辩棒。
- 因此后續(xù)的流處理系統(tǒng)提供了聲明式API,應(yīng)用程序?yàn)榱隧憫?yīng)每個(gè)新事件指定要計(jì)算的內(nèi)容,而不是如何計(jì)算一睁,也不需要考慮如何從失敗中恢復(fù)钻弄。DStream API可以自動(dòng)追蹤每個(gè)操作處理的數(shù)據(jù)量,可靠地保存相關(guān)狀態(tài)者吁,并在需要的時(shí)候從失敗中恢復(fù)計(jì)算窘俺。
連續(xù)處理與微批處理
-
連續(xù)處理模式中,每個(gè)節(jié)點(diǎn)都不斷偵聽來自其他節(jié)點(diǎn)的消息并將新的更新輸出到其子節(jié)點(diǎn)复凳。map-reduce中瘤泪,map的每個(gè)節(jié)點(diǎn)將從輸入源一個(gè)一個(gè)地讀取記錄,根據(jù)計(jì)算邏輯將它們發(fā)送到相應(yīng)的reducer育八,reducer獲取新記錄時(shí)对途,將更新狀態(tài)
-
微批處理系統(tǒng)等待積累少量輸入數(shù)據(jù),然后使用分布式任務(wù)集合并行處理每個(gè)批次
Spark的流處理API
DStream API
- 它完全基于Java/Python對(duì)象函數(shù)髓棋,而不是DataFrame Dataset中的結(jié)構(gòu)化表概念
- 完全基于處理時(shí)間
- 僅支持微批處理
結(jié)構(gòu)化流處理
- Spark 2.2僅支持微批處理
- 提供結(jié)構(gòu)化處理流式數(shù)據(jù)的可能
第二十一章 結(jié)構(gòu)化流處理基礎(chǔ)
結(jié)構(gòu)化流處理概述
結(jié)構(gòu)化流處理背后的主要思想是將數(shù)據(jù)流視為連續(xù)追加數(shù)據(jù)的數(shù)據(jù)表
結(jié)構(gòu)化流即是以流處理方式處理的DataFrame
核心概念
轉(zhuǎn)換和動(dòng)作
同樣適用這兩個(gè)概念实檀,只是略微有一些限制
輸入源
- Kafka
- HDFS
- 用于測(cè)試的socket源
接收器
- 需要指定數(shù)據(jù)源來讀取數(shù)據(jù)流,接收器(sink)和執(zhí)行引擎還負(fù)責(zé)可靠地跟蹤號(hào)數(shù)據(jù)處理的進(jìn)度
- 支持的接收器:kafka按声、文件膳犹、用于測(cè)試的控制臺(tái)接收器、用于調(diào)試的內(nèi)存接收器签则、用于在輸出記錄上運(yùn)行任意計(jì)算的foreach接收器
輸出模式
- append (向輸出接收器中添加新記錄)
- update (更新有變化的記錄)
- complete (重寫所有的輸出)
觸發(fā)器
- 定義了數(shù)據(jù)何時(shí)被輸出
事件時(shí)間處理
- watermarks 允許你制定在事件時(shí)間內(nèi)查看數(shù)據(jù)的
- 支持事件時(shí)間的系統(tǒng)通常允許設(shè)置watermarks來限制它們記住舊數(shù)據(jù)的時(shí)長
結(jié)構(gòu)化流處理實(shí)例
val static = spark.read.json("/data/activity-data/")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema)
.option("maxFilesPerTrigger", 1).json("/data/activity-data")
val activityCounts = streaming.groupBy("gt").count()
spark.conf.set("spark.sql.shuffle.partitions", 5)
val activityQuery = activityCounts.writeStream.queryName("activity_counts")
.format("memory").outputMode("complete")
.start()
// 啟動(dòng)流式計(jì)算
activityQuery.awaitTermination()
// 查詢流數(shù)據(jù)
for( i <- 1 to 5 ) {
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}
數(shù)據(jù)輸入和輸出
// 讀取kafka
val ds3 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
// 寫入kafka
ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream.format("kafka")
.option("checkpointLocation", "/to/HDFS-compatible/dir")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// foreach接收器
datasetOfString.write.foreach(new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// open a database connection
}
def process(record: String) = {
// write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// close the connection
}
})
// 觸發(fā)器
import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
.format("console").outputMode("complete").start()
第二十二章 事件時(shí)間和有狀態(tài)處理
有狀態(tài)處理
- 當(dāng)你執(zhí)行有狀態(tài)操作時(shí)须床,Spark會(huì)為你處理所有復(fù)雜的事情。例如怀愧,在實(shí)現(xiàn)分組操作時(shí)侨颈,結(jié)構(gòu)化流處理會(huì)為你維護(hù)并更新信息,你只需指定處理邏輯芯义。在執(zhí)行有狀態(tài)操作時(shí)哈垢,Spark會(huì)將中間結(jié)果信息存儲(chǔ)在狀態(tài)存儲(chǔ)中。Spark當(dāng)前的狀態(tài)存儲(chǔ)實(shí)現(xiàn)是一個(gè)內(nèi)存狀態(tài)存儲(chǔ)扛拨,它通過將中間狀態(tài)存儲(chǔ)到檢查點(diǎn)目錄來實(shí)現(xiàn)容錯(cuò)耘分。
滾動(dòng)窗口
- 窗口不會(huì)發(fā)生重疊,指定窗口的間隔
-
時(shí)間窗口實(shí)際上是一個(gè)結(jié)構(gòu)體
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
滑動(dòng)窗口
-
窗口可以重疊
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
使用水位處理延遲數(shù)據(jù)
- 指定水位可以確定過期數(shù)據(jù)
- 指定水位的方式
import org.apache.spark.sql.functions.{window, col}
withEventTime
.withWatermark("event_time", "5 hours")
.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
在流中刪除重復(fù)項(xiàng)
dropDuplicates
import org.apache.spark.sql.functions.expr
withEventTime
.withWatermark("event_time", "5 seconds")
.dropDuplicates("User", "event_time")
.groupBy("User")
.count()
.writeStream
.queryName("deduplicated")
.format("memory")
.outputMode("complete")
.start()
任意有狀態(tài)處理
- 可以根據(jù)給定鍵的計(jì)數(shù)創(chuàng)建窗口
- 如果特定時(shí)間范圍發(fā)生多個(gè)特定事件則報(bào)警
- 如果不確定時(shí)間內(nèi)維護(hù)用戶會(huì)話绑警,保存這些會(huì)話一遍稍后進(jìn)行分析
執(zhí)行這類處理時(shí)需要做以下兩件事
- 映射數(shù)據(jù)中的組求泰,對(duì)每組數(shù)據(jù)進(jìn)行操作,并為每個(gè)組生成至多一行(mapGroups WithState)
- 映射數(shù)據(jù)中的組计盒,對(duì)每個(gè)組生成一行或多行(flatMapGroups WithState)
超時(shí)
可以通過GroupState.setTimeoutTimes tamp(...) API設(shè)置超時(shí)時(shí)間戳
輸出模式
- mapGroupWithState僅支持update更新
- flatMapGroupsWithState支持append追加輸出和update更新輸出
mapGroupsWithState
需要給出如下定義
- 三個(gè)類定義:輸入定義渴频、狀態(tài)定義、可選的輸出定義
- 基于鍵北启、事件迭代器和先前狀態(tài)的一個(gè)更新狀態(tài)函數(shù)
- 超時(shí)時(shí)間函數(shù)
// 類定義
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String,
var activity:String,
var start:java.sql.Timestamp,
var end:java.sql.Timestamp)
// 事件迭代器卜朗、狀態(tài)更新函數(shù)
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
if (Option(input.timestamp).isEmpty) {
return state
}
if (state.activity == input.activity) {
if (input.timestamp.after(state.end)) {
state.end = input.timestamp
}
if (input.timestamp.before(state.start)) {
state.start = input.timestamp
}
} else {
if (input.timestamp.after(state.end)) {
state.start = input.timestamp
state.end = input.timestamp
state.activity = input.activity
}
}
state
}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}
def updateAcrossEvents(user:String,
inputs: Iterator[InputRow],
oldState: GroupState[UserState]):UserState = {
var state:UserState = if (oldState.exists) oldState.get else UserState(user,
"",
new java.sql.Timestamp(6284160000000L),
new java.sql.Timestamp(6284160L)
)
// we simply specify an old date that we can compare against and
// immediately update based on the values in our data
for (input <- inputs) {
state = updateUserStateWithEvent(state, input)
oldState.update(state)
}
state
}
// 啟動(dòng)
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime
.selectExpr("User as user",
"cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity")
.as[InputRow]
.groupByKey(_.user)
.mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("update")
.start()
flatMapGroupsWithState
需要定義以下內(nèi)容
- 三個(gè)類定義:輸入定義拔第、狀態(tài)定義、可選的輸出定義
- 一個(gè)函數(shù)场钉,輸入?yún)?shù)為一個(gè)鍵蚊俺、一個(gè)多事件迭代器和先前狀態(tài)
- 超時(shí)時(shí)間函數(shù)
case class InputRow(uid:String, timestamp:java.sql.Timestamp, x:Double,
activity:String)
case class UserSession(val uid:String, var timestamp:java.sql.Timestamp,
var activities: Array[String], var values: Array[Double])
case class UserSessionOutput(val uid:String, var activities: Array[String],
var xAvg:Double)
def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
// handle malformed dates
if (Option(input.timestamp).isEmpty) {
return state
}
state.timestamp = input.timestamp
state.values = state.values ++ Array(input.x)
if (!state.activities.contains(input.activity)) {
state.activities = state.activities ++ Array(input.activity)
}
state
}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
GroupState}
def updateAcrossEvents(uid:String,
inputs: Iterator[InputRow],
oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {
inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
val state = if (oldState.exists) oldState.get else UserSession(
uid,
new java.sql.Timestamp(6284160000000L),
Array(),
Array())
val newState = updateWithEvent(state, input)
if (oldState.hasTimedOut) {
val state = oldState.get
oldState.remove()
Iterator(UserSessionOutput(uid,
state.activities,
newState.values.sum / newState.values.length.toDouble))
} else if (state.values.length > 1000) {
val state = oldState.get
oldState.remove()
Iterator(UserSessionOutput(uid,
state.activities,
newState.values.sum / newState.values.length.toDouble))
} else {
oldState.update(newState)
oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
Iterator()
}
}
}
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime.where("x is not null")
.selectExpr("user as uid",
"cast(Creation_Time/1000000000 as timestamp) as timestamp",
"x", "gt as activity")
.as[InputRow]
.withWatermark("timestamp", "5 seconds")
.groupByKey(_.uid)
.flatMapGroupsWithState(OutputMode.Append,
GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
.writeStream
.queryName("count_based_device")
.format("memory")
.start()