spark structedStreaming是如何實(shí)現(xiàn)容錯(cuò)的

sss如何實(shí)現(xiàn)eoc的

spark structed Streaming簡(jiǎn)稱sss,它主要還是采用微批的模式提供端到端的eoc(exactly-once)語(yǔ)義,要實(shí)現(xiàn)eoc疗疟,需要3方面保證鞍匾,一個(gè)是可以replay的source,二是框架提供作業(yè)狀態(tài)的持久化能力瘸恼,三是sinker要實(shí)現(xiàn)冪等

DataSource

DataSource要replayable,就是指數(shù)據(jù)源可以追蹤當(dāng)前讀取的位置岩睁,并且能夠從上次失敗的位置重新消費(fèi)數(shù)據(jù)
這兩點(diǎn)可以保證能夠從持久化的狀態(tài)中恢復(fù)任務(wù)钞脂,比如apache kafka和Amazon Kinesis,kafka消費(fèi)可以commit offset捕儒,可以根據(jù)offset seek到指定的位置開始消費(fèi)冰啃;
如果是quickstart中的socket數(shù)據(jù)源類型,它就不能replay刘莹,也就無(wú)法實(shí)現(xiàn)eoc

追蹤數(shù)據(jù)處理的點(diǎn)位主要依賴spark提供的checkpoint機(jī)制阎毅,checkpoint保存的信息主要是當(dāng)前批次的數(shù)據(jù)源的點(diǎn)位等元數(shù)據(jù)信息

StreamingQueryManager的startQuery和createQuery方法,將checkpoint的位置傳給StreamExecution對(duì)象
StreamExecution初始化org.apache.spark.sql.execution.streaming.OffsetSeqLog点弯,這就是文檔中提到的wal日志扇调,從名字就可以猜測(cè)它的功能是順序?qū)憯?shù)據(jù)源的點(diǎn)位信息,類似于數(shù)據(jù)庫(kù)的事務(wù)日志哦抢肛,它記錄了已經(jīng)處理的每一批數(shù)據(jù)的點(diǎn)位信息狼钮,當(dāng)前批次N在處理完之前,就先把點(diǎn)位信息寫到OffsetSeqLog捡絮,第N批的點(diǎn)位寫進(jìn)OffsetSeqLog意味著第N-1批數(shù)據(jù)已經(jīng)正確的交給了sinker

org.apache.spark.sql.execution.streaming.MicroBatchExecution#constructNextBatch()
我們來(lái)看這個(gè)代碼片段

updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
  assert(offsetLog.add(
    currentBatchId,
    availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
    s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
    logInfo(s"Committed offsets for batch $currentBatchId. " +
    s"Metadata ${offsetSeqMetadata.toString}")
 
  // NOTE: The following code is correct because runStream() processes exactly one
  // batch at a time. If we add pipeline parallelism (multiple batches in flight at
  // the same time), this cleanup logic will need to change.
 
  // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
  // sources to discard data from the previous batch.
  if (currentBatchId != 0) {
    val prevBatchOff = offsetLog.get(currentBatchId - 1)
    if (prevBatchOff.isDefined) {
      prevBatchOff.get.toStreamProgress(sources).foreach {
        case (src: Source, off) => src.commit(off)
        case (reader: MicroBatchReader, off) =>
          reader.commit(reader.deserializeOffset(off.json))
      }
    } else {
      throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
    }
  }
   
  // It is now safe to discard the metadata beyond the minimum number to retain.
  // Note that purge is exclusive, i.e. it purges everything before the target ID.
  if (minLogEntriesToMaintain < currentBatchId) {
    offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
    commitLog.purge(currentBatchId - minLogEntriesToMaintain)
  }
}

作業(yè)恢復(fù)的時(shí)候熬芜,如果offset checkpoint文件存在,那么sss會(huì)解析當(dāng)前批次的id福稳,它對(duì)應(yīng)的要處理的offsets涎拉,以及已經(jīng)commit的N-1批次的offsets,

這時(shí)候sss再檢查這個(gè)第N批有沒(méi)有處理完畢的圆,即看一下最后commit的offset是不是跟N批的offset相同鼓拧,如果相同,那么就會(huì)執(zhí)行N+1新批次的處理越妈,比如上次最后執(zhí)行到1102批季俩,那么解析出來(lái)發(fā)現(xiàn)1102批處理完成了,那么批次從1103批開始執(zhí)行
否則重做1102批次

Data sinks

sinker要冪等梅掠,同時(shí)sss還提供一個(gè)commitLog酌住,它用來(lái)記錄所有的已經(jīng)完成的batch的id,跟offsetLog一樣瓤檐,這倆都可以retension


處理流程示意圖

trigger一批之后赂韵,總是先寫offsetlog,然后處理挠蛉,處理完之后寫commitlog

前面提到了offset的錯(cuò)誤恢復(fù)祭示,顯然,會(huì)有可能會(huì)重復(fù)消費(fèi)一批數(shù)據(jù)進(jìn)行處理谴古,導(dǎo)致到達(dá)sinker的數(shù)據(jù)出現(xiàn)重復(fù)质涛,eoc因此要求sinker必須實(shí)現(xiàn)冪等稠歉,sinker自己去重,比如寫key -value類型的數(shù)據(jù)庫(kù)汇陆,redis等怒炸,重復(fù)的數(shù)據(jù)并不影響結(jié)果

sss的狀態(tài)管理

狀態(tài)管理僅僅給應(yīng)用提供了at-least once的支持,要實(shí)現(xiàn)eoc還需要sinker是冪等的毡代,這跟flink的數(shù)據(jù)加barrier對(duì)齊后checkpoint不同阅羹,flink等于是框架層面實(shí)現(xiàn)了eoc,當(dāng)然flink也要求sinker必須是冪等的教寂,否則還是有可能有重復(fù)數(shù)據(jù)捏鱼,比如kafka-fllink-kafka,因?yàn)閷懡okafka sinker的那一批結(jié)果酪耕,雖然不用flink重新計(jì)算导梆,但是kafka如果沒(méi)有開啟事務(wù),那么夸session的producer無(wú)法保證冪等迂烁,也就是不知道要寫的結(jié)果數(shù)據(jù)看尼,到底成功寫進(jìn)去了幾條。

spark的狀態(tài)管理就是指持久化保存在checkpointLocation位置的那些數(shù)據(jù)

包括四類
1盟步、數(shù)據(jù)源的信息藏斩,因?yàn)橹С侄喾N數(shù)據(jù)源,所有要知道你用了什么數(shù)據(jù)源
2址芯、數(shù)據(jù)源的offsetLog
3灾茁、commitLog
4窜觉、應(yīng)用程序內(nèi)部的狀態(tài)(統(tǒng)計(jì)狀態(tài)等谷炸,用戶作業(yè)邏輯的狀態(tài),可能會(huì)非常多禀挫,非常大旬陡,比如你groupby url之類的試試)

前3個(gè)狀態(tài)信息,都是文本格式保存的语婴,帶有版本信息描孟,防止被不同版本的spark處理,破壞元數(shù)據(jù)等

故障恢復(fù)的流程

正常處理流程是先commit offset砰左,再處理匿醒,被sinker完全處理后 再寫commitlog
這種情況下,在恢復(fù)的時(shí)候缠导,加載最后commit的offsetLog,然后通過(guò)commitLog判斷那一批有沒(méi)有被正確處理

處理了廉羔,那么從新的offset開始處理,如果沒(méi)有僻造,那么憋他,就要重新做

應(yīng)該是按照微批做原子更新的孩饼,如果本批次沒(méi)有完成成功寫入commitLog,是可以回滾的

狀態(tài)管理跟flink的區(qū)別

整體上sss的這種狀態(tài)管理跟flink類似竹挡,spark因?yàn)槭俏⑴迫ⅲ敲淳涂梢宰雠脑訝顟B(tài)管理,flink因?yàn)槭沁B續(xù)的流揪罕,所以必須用barrier機(jī)制同步各個(gè)算子的狀態(tài)梯码,也相當(dāng)于利用barrier實(shí)現(xiàn)了微批,只不過(guò)flink的微批是“異步的”好啰,就是不用等你這一批執(zhí)行完忍些,就可以不斷的執(zhí)行下一批,而sss的是“同步的”坎怪,顯然異步的似乎效率更高一些罢坝,但是checkpoint如果太頻繁,頻繁等待barrier一致的話搅窿,也會(huì)有很多算子干等著啥也不干嘁酿,協(xié)調(diào)效率可能需要根據(jù)場(chǎng)景自行調(diào)優(yōu)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市男应,隨后出現(xiàn)的幾起案子闹司,更是在濱河造成了極大的恐慌,老刑警劉巖沐飘,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件游桩,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡耐朴,警方通過(guò)查閱死者的電腦和手機(jī)借卧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)筛峭,“玉大人铐刘,你說(shuō)我怎么就攤上這事∮跋” “怎么了镰吵?”我有些...
    開封第一講書人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)挂签。 經(jīng)常有香客問(wèn)我疤祭,道長(zhǎng),這世上最難降的妖魔是什么饵婆? 我笑而不...
    開封第一講書人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任勺馆,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘谓传。我一直安慰自己蜈项,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開白布续挟。 她就那樣靜靜地躺著紧卒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪诗祸。 梳的紋絲不亂的頭發(fā)上跑芳,一...
    開封第一講書人閱讀 51,604評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音直颅,去河邊找鬼博个。 笑死,一個(gè)胖子當(dāng)著我的面吹牛功偿,可吹牛的內(nèi)容都是我干的盆佣。 我是一名探鬼主播,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼械荷,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼共耍!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起吨瞎,我...
    開封第一講書人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤痹兜,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后颤诀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體字旭,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年崖叫,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了遗淳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡归露,死狀恐怖洲脂,靈堂內(nèi)的尸體忽然破棺而出斤儿,到底是詐尸還是另有隱情剧包,我是刑警寧澤,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布往果,位于F島的核電站疆液,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏陕贮。R本人自食惡果不足惜堕油,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧掉缺,春花似錦卜录、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至搜囱,卻和暖如春丑瞧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蜀肘。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工绊汹, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人扮宠。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓西乖,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親坛增。 傳聞我的和親對(duì)象是個(gè)殘疾皇子浴栽,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 內(nèi)容 sparkStreaming簡(jiǎn)介 spark Streaming和Storm區(qū)別 Spark Streami...
    SUSUR_28f6閱讀 2,534評(píng)論 0 0
  • 1.前言 目前實(shí)時(shí)計(jì)算的業(yè)務(wù)場(chǎng)景越來(lái)越多典鸡,實(shí)時(shí)計(jì)算引擎技術(shù)及生態(tài)也越來(lái)越成熟。以Flink和Spark為首的實(shí)時(shí)計(jì)...
    java菜閱讀 1,712評(píng)論 0 1
  • 與一朵花對(duì)話 與一朵花對(duì)話 與生命對(duì)話 生命宛如一場(chǎng)盛放 為了這場(chǎng)盛放 承受了多少寒潮的凄苦 承受了幾多欲望的煎熬...
    千里軒閱讀 142評(píng)論 1 2
  • 李老師是小艾高中的班主任坏晦,個(gè)子矮小萝玷,長(zhǎng)著一幅六棱角的臉型,不茍言笑昆婿,古板中透著幾分刻鼻虻铩!說(shuō)實(shí)話仓蛆,小艾不是很喜歡他睁冬,...
    浮生貧樂(lè)閱讀 206評(píng)論 0 0
  • 當(dāng)孩子頂撞你的時(shí)候 作者:沉香紅 坐在高鐵上,前面有一個(gè)三歲小男孩看疙,不斷的指責(zé)與批評(píng)媽媽豆拨,聲音很大,語(yǔ)言很不文明能庆,...
    爾文書舍閱讀 139評(píng)論 0 1