Spark權(quán)威指南讀書筆記(五):流處理

第二十章 流處理基礎(chǔ)

什么是流處理

流處理是連續(xù)處理新到來的數(shù)據(jù)以更新計(jì)算結(jié)果的行為单山。在流處理中碍现,輸入數(shù)據(jù)是無邊界的,沒有預(yù)定的開始或結(jié)束饥侵。

流處理的挑戰(zhàn)
  • 基于應(yīng)用程序時(shí)間戳處理無序數(shù)據(jù)
  • 維持大量的狀態(tài)
  • 支持高吞吐
  • 即使有機(jī)器故障也僅需對(duì)事件進(jìn)行一次處理
  • 處理負(fù)載不平衡和拖延者
  • 快速響應(yīng)事件
  • 與其他存儲(chǔ)中的數(shù)據(jù)進(jìn)行連接
  • 確定新事件到達(dá)時(shí)如何更新輸出

流處理設(shè)計(jì)要點(diǎn)

記錄級(jí)別API和申明式API
  • 流處理API最簡(jiǎn)單的實(shí)現(xiàn)方法就是將每個(gè)事件傳遞給應(yīng)用程序鸵赫,并使用自定義代碼進(jìn)行響應(yīng)。提供這種記錄級(jí)別的API的流處理系統(tǒng)只是給用戶提供一個(gè)獲取每條流數(shù)據(jù)記錄的接口躏升,這樣許多復(fù)雜狀態(tài)需要由應(yīng)用程序負(fù)責(zé)辩棒。
  • 因此后續(xù)的流處理系統(tǒng)提供了聲明式API,應(yīng)用程序?yàn)榱隧憫?yīng)每個(gè)新事件指定要計(jì)算的內(nèi)容,而不是如何計(jì)算一睁,也不需要考慮如何從失敗中恢復(fù)钻弄。DStream API可以自動(dòng)追蹤每個(gè)操作處理的數(shù)據(jù)量,可靠地保存相關(guān)狀態(tài)者吁,并在需要的時(shí)候從失敗中恢復(fù)計(jì)算窘俺。
連續(xù)處理與微批處理
  • 連續(xù)處理模式中,每個(gè)節(jié)點(diǎn)都不斷偵聽來自其他節(jié)點(diǎn)的消息并將新的更新輸出到其子節(jié)點(diǎn)复凳。map-reduce中瘤泪,map的每個(gè)節(jié)點(diǎn)將從輸入源一個(gè)一個(gè)地讀取記錄,根據(jù)計(jì)算邏輯將它們發(fā)送到相應(yīng)的reducer育八,reducer獲取新記錄時(shí)对途,將更新狀態(tài)


    連續(xù)處理
  • 微批處理系統(tǒng)等待積累少量輸入數(shù)據(jù),然后使用分布式任務(wù)集合并行處理每個(gè)批次


    微批處理

Spark的流處理API

DStream API
  • 它完全基于Java/Python對(duì)象函數(shù)髓棋,而不是DataFrame Dataset中的結(jié)構(gòu)化表概念
  • 完全基于處理時(shí)間
  • 僅支持微批處理
結(jié)構(gòu)化流處理
  • Spark 2.2僅支持微批處理
  • 提供結(jié)構(gòu)化處理流式數(shù)據(jù)的可能

第二十一章 結(jié)構(gòu)化流處理基礎(chǔ)

結(jié)構(gòu)化流處理概述

結(jié)構(gòu)化流處理背后的主要思想是將數(shù)據(jù)流視為連續(xù)追加數(shù)據(jù)的數(shù)據(jù)表
結(jié)構(gòu)化流即是以流處理方式處理的DataFrame

結(jié)構(gòu)化流處理的輸入

核心概念

轉(zhuǎn)換和動(dòng)作

同樣適用這兩個(gè)概念实檀,只是略微有一些限制

輸入源
  • Kafka
  • HDFS
  • 用于測(cè)試的socket源
接收器
  • 需要指定數(shù)據(jù)源來讀取數(shù)據(jù)流,接收器(sink)和執(zhí)行引擎還負(fù)責(zé)可靠地跟蹤號(hào)數(shù)據(jù)處理的進(jìn)度
  • 支持的接收器:kafka按声、文件膳犹、用于測(cè)試的控制臺(tái)接收器、用于調(diào)試的內(nèi)存接收器签则、用于在輸出記錄上運(yùn)行任意計(jì)算的foreach接收器
輸出模式
  • append (向輸出接收器中添加新記錄)
  • update (更新有變化的記錄)
  • complete (重寫所有的輸出)
觸發(fā)器
  • 定義了數(shù)據(jù)何時(shí)被輸出
事件時(shí)間處理
  • watermarks 允許你制定在事件時(shí)間內(nèi)查看數(shù)據(jù)的
  • 支持事件時(shí)間的系統(tǒng)通常允許設(shè)置watermarks來限制它們記住舊數(shù)據(jù)的時(shí)長

結(jié)構(gòu)化流處理實(shí)例

val static = spark.read.json("/data/activity-data/")
val dataSchema = static.schema

val streaming = spark.readStream.schema(dataSchema)
  .option("maxFilesPerTrigger", 1).json("/data/activity-data")

val activityCounts = streaming.groupBy("gt").count()

spark.conf.set("spark.sql.shuffle.partitions", 5)

val activityQuery = activityCounts.writeStream.queryName("activity_counts")
  .format("memory").outputMode("complete")
  .start()
// 啟動(dòng)流式計(jì)算
activityQuery.awaitTermination()
// 查詢流數(shù)據(jù)
for( i <- 1 to 5 ) {
    spark.sql("SELECT * FROM activity_counts").show()
    Thread.sleep(1000)
}
數(shù)據(jù)輸入和輸出
// 讀取kafka
val ds3 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
// 寫入kafka
ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .option("checkpointLocation", "/to/HDFS-compatible/dir")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

// foreach接收器
datasetOfString.write.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open a database connection
  }
  def process(record: String) = {
    // write string to connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
})

// 觸發(fā)器
import org.apache.spark.sql.streaming.Trigger

activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
  .format("console").outputMode("complete").start()

第二十二章 事件時(shí)間和有狀態(tài)處理

有狀態(tài)處理

  • 當(dāng)你執(zhí)行有狀態(tài)操作時(shí)须床,Spark會(huì)為你處理所有復(fù)雜的事情。例如怀愧,在實(shí)現(xiàn)分組操作時(shí)侨颈,結(jié)構(gòu)化流處理會(huì)為你維護(hù)并更新信息,你只需指定處理邏輯芯义。在執(zhí)行有狀態(tài)操作時(shí)哈垢,Spark會(huì)將中間結(jié)果信息存儲(chǔ)在狀態(tài)存儲(chǔ)中。Spark當(dāng)前的狀態(tài)存儲(chǔ)實(shí)現(xiàn)是一個(gè)內(nèi)存狀態(tài)存儲(chǔ)扛拨,它通過將中間狀態(tài)存儲(chǔ)到檢查點(diǎn)目錄來實(shí)現(xiàn)容錯(cuò)耘分。

滾動(dòng)窗口

  • 窗口不會(huì)發(fā)生重疊,指定窗口的間隔
  • 時(shí)間窗口實(shí)際上是一個(gè)結(jié)構(gòu)體


    滾動(dòng)窗口
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

滑動(dòng)窗口

  • 窗口可以重疊


    滑動(dòng)窗口
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

使用水位處理延遲數(shù)據(jù)

  • 指定水位可以確定過期數(shù)據(jù)
  • 指定水位的方式
import org.apache.spark.sql.functions.{window, col}
withEventTime
  .withWatermark("event_time", "5 hours")
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

在流中刪除重復(fù)項(xiàng)

dropDuplicates

import org.apache.spark.sql.functions.expr

withEventTime
  .withWatermark("event_time", "5 seconds")
  .dropDuplicates("User", "event_time")
  .groupBy("User")
  .count()
  .writeStream
  .queryName("deduplicated")
  .format("memory")
  .outputMode("complete")
  .start()

任意有狀態(tài)處理

  • 可以根據(jù)給定鍵的計(jì)數(shù)創(chuàng)建窗口
  • 如果特定時(shí)間范圍發(fā)生多個(gè)特定事件則報(bào)警
  • 如果不確定時(shí)間內(nèi)維護(hù)用戶會(huì)話绑警,保存這些會(huì)話一遍稍后進(jìn)行分析

執(zhí)行這類處理時(shí)需要做以下兩件事

  • 映射數(shù)據(jù)中的組求泰,對(duì)每組數(shù)據(jù)進(jìn)行操作,并為每個(gè)組生成至多一行(mapGroups WithState)
  • 映射數(shù)據(jù)中的組计盒,對(duì)每個(gè)組生成一行或多行(flatMapGroups WithState)
超時(shí)

可以通過GroupState.setTimeoutTimes tamp(...) API設(shè)置超時(shí)時(shí)間戳

輸出模式
  • mapGroupWithState僅支持update更新
  • flatMapGroupsWithState支持append追加輸出和update更新輸出
mapGroupsWithState

需要給出如下定義

  • 三個(gè)類定義:輸入定義渴频、狀態(tài)定義、可選的輸出定義
  • 基于鍵北启、事件迭代器和先前狀態(tài)的一個(gè)更新狀態(tài)函數(shù)
  • 超時(shí)時(shí)間函數(shù)
// 類定義
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String,
  var activity:String,
  var start:java.sql.Timestamp,
  var end:java.sql.Timestamp)

// 事件迭代器卜朗、狀態(tài)更新函數(shù)
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
  if (Option(input.timestamp).isEmpty) {
    return state
  }
  if (state.activity == input.activity) {

    if (input.timestamp.after(state.end)) {
      state.end = input.timestamp
    }
    if (input.timestamp.before(state.start)) {
      state.start = input.timestamp
    }
  } else {
    if (input.timestamp.after(state.end)) {
      state.start = input.timestamp
      state.end = input.timestamp
      state.activity = input.activity
    }
  }

  state
}

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}
def updateAcrossEvents(user:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserState]):UserState = {
  var state:UserState = if (oldState.exists) oldState.get else UserState(user,
        "",
        new java.sql.Timestamp(6284160000000L),
        new java.sql.Timestamp(6284160L)
    )
  // we simply specify an old date that we can compare against and
  // immediately update based on the values in our data

  for (input <- inputs) {
    state = updateUserStateWithEvent(state, input)
    oldState.update(state)
  }
  state
}

// 啟動(dòng)
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime
  .selectExpr("User as user",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity")
  .as[InputRow]
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("update")
  .start()
flatMapGroupsWithState

需要定義以下內(nèi)容

  • 三個(gè)類定義:輸入定義拔第、狀態(tài)定義、可選的輸出定義
  • 一個(gè)函數(shù)场钉,輸入?yún)?shù)為一個(gè)鍵蚊俺、一個(gè)多事件迭代器和先前狀態(tài)
  • 超時(shí)時(shí)間函數(shù)
case class InputRow(uid:String, timestamp:java.sql.Timestamp, x:Double,
  activity:String)
case class UserSession(val uid:String, var timestamp:java.sql.Timestamp,
  var activities: Array[String], var values: Array[Double])
case class UserSessionOutput(val uid:String, var activities: Array[String],
  var xAvg:Double)

def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
  // handle malformed dates
  if (Option(input.timestamp).isEmpty) {
    return state
  }

  state.timestamp = input.timestamp
  state.values = state.values ++ Array(input.x)
  if (!state.activities.contains(input.activity)) {
    state.activities = state.activities ++ Array(input.activity)
  }
  state
}

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
  GroupState}

def updateAcrossEvents(uid:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {

  inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
    val state = if (oldState.exists) oldState.get else UserSession(
    uid,
    new java.sql.Timestamp(6284160000000L),
    Array(),
    Array())
    val newState = updateWithEvent(state, input)

    if (oldState.hasTimedOut) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else if (state.values.length > 1000) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else {
      oldState.update(newState)
      oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
      Iterator()
    }

  }
}

import org.apache.spark.sql.streaming.GroupStateTimeout

withEventTime.where("x is not null")
  .selectExpr("user as uid",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "x", "gt as activity")
  .as[InputRow]
  .withWatermark("timestamp", "5 seconds")
  .groupByKey(_.uid)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("count_based_device")
  .format("memory")
  .start()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市逛万,隨后出現(xiàn)的幾起案子泳猬,更是在濱河造成了極大的恐慌,老刑警劉巖宇植,帶你破解...
    沈念sama閱讀 216,744評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件得封,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡当纱,警方通過查閱死者的電腦和手機(jī)呛每,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來坡氯,“玉大人,你說我怎么就攤上這事洋腮◇锪” “怎么了?”我有些...
    開封第一講書人閱讀 163,105評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵啥供,是天一觀的道長悯恍。 經(jīng)常有香客問我,道長伙狐,這世上最難降的妖魔是什么涮毫? 我笑而不...
    開封第一講書人閱讀 58,242評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮贷屎,結(jié)果婚禮上罢防,老公的妹妹穿的比我還像新娘。我一直安慰自己唉侄,他們只是感情好咒吐,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評(píng)論 6 389
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著属划,像睡著了一般恬叹。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上同眯,一...
    開封第一講書人閱讀 51,215評(píng)論 1 299
  • 那天绽昼,我揣著相機(jī)與錄音,去河邊找鬼须蜗。 笑死硅确,一個(gè)胖子當(dāng)著我的面吹牛目溉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播疏魏,決...
    沈念sama閱讀 40,096評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼停做,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了大莫?” 一聲冷哼從身側(cè)響起蛉腌,我...
    開封第一講書人閱讀 38,939評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎只厘,沒想到半個(gè)月后烙丛,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡羔味,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評(píng)論 2 333
  • 正文 我和宋清朗相戀三年河咽,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赋元。...
    茶點(diǎn)故事閱讀 39,745評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡忘蟹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出搁凸,到底是詐尸還是另有隱情媚值,我是刑警寧澤,帶...
    沈念sama閱讀 35,448評(píng)論 5 344
  • 正文 年R本政府宣布护糖,位于F島的核電站褥芒,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏嫡良。R本人自食惡果不足惜锰扶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評(píng)論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望寝受。 院中可真熱鬧坷牛,春花似錦、人聲如沸羡蛾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽痴怨。三九已至忙干,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間浪藻,已是汗流浹背捐迫。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留爱葵,地道東北人施戴。 一個(gè)月前我還...
    沈念sama閱讀 47,776評(píng)論 2 369
  • 正文 我出身青樓反浓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親赞哗。 傳聞我的和親對(duì)象是個(gè)殘疾皇子雷则,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容