sss如何實(shí)現(xiàn)eoc的
spark structed Streaming簡(jiǎn)稱sss,它主要還是采用微批的模式提供端到端的eoc(exactly-once)語(yǔ)義,要實(shí)現(xiàn)eoc疗疟,需要3方面保證鞍匾,一個(gè)是可以replay的source,二是框架提供作業(yè)狀態(tài)的持久化能力瘸恼,三是sinker要實(shí)現(xiàn)冪等
DataSource
DataSource要replayable,就是指數(shù)據(jù)源可以追蹤當(dāng)前讀取的位置岩睁,并且能夠從上次失敗的位置重新消費(fèi)數(shù)據(jù)
這兩點(diǎn)可以保證能夠從持久化的狀態(tài)中恢復(fù)任務(wù)钞脂,比如apache kafka和Amazon Kinesis,kafka消費(fèi)可以commit offset捕儒,可以根據(jù)offset seek到指定的位置開始消費(fèi)冰啃;
如果是quickstart中的socket數(shù)據(jù)源類型,它就不能replay刘莹,也就無(wú)法實(shí)現(xiàn)eoc
追蹤數(shù)據(jù)處理的點(diǎn)位主要依賴spark提供的checkpoint機(jī)制阎毅,checkpoint保存的信息主要是當(dāng)前批次的數(shù)據(jù)源的點(diǎn)位等元數(shù)據(jù)信息
StreamingQueryManager的startQuery和createQuery方法,將checkpoint的位置傳給StreamExecution對(duì)象
StreamExecution初始化org.apache.spark.sql.execution.streaming.OffsetSeqLog点弯,這就是文檔中提到的wal日志扇调,從名字就可以猜測(cè)它的功能是順序?qū)憯?shù)據(jù)源的點(diǎn)位信息,類似于數(shù)據(jù)庫(kù)的事務(wù)日志哦抢肛,它記錄了已經(jīng)處理的每一批數(shù)據(jù)的點(diǎn)位信息狼钮,當(dāng)前批次N在處理完之前,就先把點(diǎn)位信息寫到OffsetSeqLog捡絮,第N批的點(diǎn)位寫進(jìn)OffsetSeqLog意味著第N-1批數(shù)據(jù)已經(jīng)正確的交給了sinker
org.apache.spark.sql.execution.streaming.MicroBatchExecution#constructNextBatch()
我們來(lái)看這個(gè)代碼片段
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
currentBatchId,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
s"Metadata ${offsetSeqMetadata.toString}")
// NOTE: The following code is correct because runStream() processes exactly one
// batch at a time. If we add pipeline parallelism (multiple batches in flight at
// the same time), this cleanup logic will need to change.
// Now that we've updated the scheduler's persistent checkpoint, it is safe for the
// sources to discard data from the previous batch.
if (currentBatchId != 0) {
val prevBatchOff = offsetLog.get(currentBatchId - 1)
if (prevBatchOff.isDefined) {
prevBatchOff.get.toStreamProgress(sources).foreach {
case (src: Source, off) => src.commit(off)
case (reader: MicroBatchReader, off) =>
reader.commit(reader.deserializeOffset(off.json))
}
} else {
throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
}
}
// It is now safe to discard the metadata beyond the minimum number to retain.
// Note that purge is exclusive, i.e. it purges everything before the target ID.
if (minLogEntriesToMaintain < currentBatchId) {
offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
}
}
作業(yè)恢復(fù)的時(shí)候熬芜,如果offset checkpoint文件存在,那么sss會(huì)解析當(dāng)前批次的id福稳,它對(duì)應(yīng)的要處理的offsets涎拉,以及已經(jīng)commit的N-1批次的offsets,
這時(shí)候sss再檢查這個(gè)第N批有沒(méi)有處理完畢的圆,即看一下最后commit的offset是不是跟N批的offset相同鼓拧,如果相同,那么就會(huì)執(zhí)行N+1新批次的處理越妈,比如上次最后執(zhí)行到1102批季俩,那么解析出來(lái)發(fā)現(xiàn)1102批處理完成了,那么批次從1103批開始執(zhí)行
否則重做1102批次
Data sinks
sinker要冪等梅掠,同時(shí)sss還提供一個(gè)commitLog酌住,它用來(lái)記錄所有的已經(jīng)完成的batch的id,跟offsetLog一樣瓤檐,這倆都可以retension
trigger一批之后赂韵,總是先寫offsetlog,然后處理挠蛉,處理完之后寫commitlog
前面提到了offset的錯(cuò)誤恢復(fù)祭示,顯然,會(huì)有可能會(huì)重復(fù)消費(fèi)一批數(shù)據(jù)進(jìn)行處理谴古,導(dǎo)致到達(dá)sinker的數(shù)據(jù)出現(xiàn)重復(fù)质涛,eoc因此要求sinker必須實(shí)現(xiàn)冪等稠歉,sinker自己去重,比如寫key -value類型的數(shù)據(jù)庫(kù)汇陆,redis等怒炸,重復(fù)的數(shù)據(jù)并不影響結(jié)果
sss的狀態(tài)管理
狀態(tài)管理僅僅給應(yīng)用提供了at-least once的支持,要實(shí)現(xiàn)eoc還需要sinker是冪等的毡代,這跟flink的數(shù)據(jù)加barrier對(duì)齊后checkpoint不同阅羹,flink等于是框架層面實(shí)現(xiàn)了eoc,當(dāng)然flink也要求sinker必須是冪等的教寂,否則還是有可能有重復(fù)數(shù)據(jù)捏鱼,比如kafka-fllink-kafka,因?yàn)閷懡okafka sinker的那一批結(jié)果酪耕,雖然不用flink重新計(jì)算导梆,但是kafka如果沒(méi)有開啟事務(wù),那么夸session的producer無(wú)法保證冪等迂烁,也就是不知道要寫的結(jié)果數(shù)據(jù)看尼,到底成功寫進(jìn)去了幾條。
spark的狀態(tài)管理就是指持久化保存在checkpointLocation位置的那些數(shù)據(jù)
包括四類
1盟步、數(shù)據(jù)源的信息藏斩,因?yàn)橹С侄喾N數(shù)據(jù)源,所有要知道你用了什么數(shù)據(jù)源
2址芯、數(shù)據(jù)源的offsetLog
3灾茁、commitLog
4窜觉、應(yīng)用程序內(nèi)部的狀態(tài)(統(tǒng)計(jì)狀態(tài)等谷炸,用戶作業(yè)邏輯的狀態(tài),可能會(huì)非常多禀挫,非常大旬陡,比如你groupby url之類的試試)
前3個(gè)狀態(tài)信息,都是文本格式保存的语婴,帶有版本信息描孟,防止被不同版本的spark處理,破壞元數(shù)據(jù)等
正常處理流程是先commit offset砰左,再處理匿醒,被sinker完全處理后 再寫commitlog
這種情況下,在恢復(fù)的時(shí)候缠导,加載最后commit的offsetLog,然后通過(guò)commitLog判斷那一批有沒(méi)有被正確處理
處理了廉羔,那么從新的offset開始處理,如果沒(méi)有僻造,那么憋他,就要重新做
應(yīng)該是按照微批做原子更新的孩饼,如果本批次沒(méi)有完成成功寫入commitLog,是可以回滾的
狀態(tài)管理跟flink的區(qū)別
整體上sss的這種狀態(tài)管理跟flink類似竹挡,spark因?yàn)槭俏⑴迫ⅲ敲淳涂梢宰雠脑訝顟B(tài)管理,flink因?yàn)槭沁B續(xù)的流揪罕,所以必須用barrier機(jī)制同步各個(gè)算子的狀態(tài)梯码,也相當(dāng)于利用barrier實(shí)現(xiàn)了微批,只不過(guò)flink的微批是“異步的”好啰,就是不用等你這一批執(zhí)行完忍些,就可以不斷的執(zhí)行下一批,而sss的是“同步的”坎怪,顯然異步的似乎效率更高一些罢坝,但是checkpoint如果太頻繁,頻繁等待barrier一致的話搅窿,也會(huì)有很多算子干等著啥也不干嘁酿,協(xié)調(diào)效率可能需要根據(jù)場(chǎng)景自行調(diào)優(yōu)