Structured Streaming基礎(chǔ)編程模型
structured streaming的核心理念献丑,就是將數(shù)據(jù)流抽象成一張表阳距,而源源不斷過(guò)來(lái)的數(shù)據(jù)是持續(xù)地添加到這個(gè)表中的结借。這就產(chǎn)生了一種全新的流式計(jì)算模型,與離線計(jì)算模型是很類似的船老。你可以使用與在一個(gè)靜態(tài)表中執(zhí)行離線查詢相同的方式來(lái)編寫流式查詢。spark會(huì)采用一種增量執(zhí)行的方式來(lái)對(duì)表中源源不斷的數(shù)據(jù)進(jìn)行查詢馍管。我們可以將輸入數(shù)據(jù)流想象成是一張input table薪韩。數(shù)據(jù)流中每條新到達(dá)的數(shù)據(jù),都可以想象成是一條添加到表中的新數(shù)據(jù)罗捎。
針對(duì)輸入數(shù)據(jù)執(zhí)行的查詢拉盾,會(huì)產(chǎn)生一張result table。每個(gè)trigger interval倒得,比如說(shuō)1秒鐘夭禽,添加到input table中的新數(shù)據(jù)行,都會(huì)被增量地執(zhí)行我們定義的查詢操作讹躯,產(chǎn)生的結(jié)果會(huì)更新到結(jié)果表中蜀撑。當(dāng)結(jié)果表被更新的時(shí)候剩彬,我們可能會(huì)希望將結(jié)果表中變化的行寫入一個(gè)外部存儲(chǔ)中。
我們可以定義每次結(jié)果表中的數(shù)據(jù)更新時(shí)沃饶,以何種方式,將哪些數(shù)據(jù)寫入外部存儲(chǔ)糊肤。我們有多種模式的output:
- complete mode,被更新后的整個(gè)結(jié)果表中的數(shù)據(jù)业舍,都會(huì)被寫入外部存儲(chǔ)升酣。具體如何寫入,是根據(jù)不同的外部存儲(chǔ)自身來(lái)決定的下面。
- append mode绩聘,只有最近一次trigger之后,新增加到result table中的數(shù)據(jù)凿菩,會(huì)被寫入外部存儲(chǔ)蓄髓。只有當(dāng)我們確定,result table中已有的數(shù)據(jù)是肯定不會(huì)被改變時(shí)会喝,才應(yīng)該使用append mode。
- update mode枉阵,只有最近一次trigger之后预茄,result table中被更新的數(shù)據(jù),包括增加的和修改的拙徽,會(huì)被寫入外部存儲(chǔ)中诗宣。spark 2.0中還不支持這種mode。這種mode和complete mode不同召庞,沒(méi)有改變的數(shù)據(jù)是不會(huì)寫入外部存儲(chǔ)的来破。
我們可以以上篇的wordcount例子作為背景來(lái)理解忘古,lines dataframe是一個(gè)input table,而wordcounts dataframe就是一個(gè)result table送朱。當(dāng)應(yīng)用啟動(dòng)后干旁,spark會(huì)周期性地check socket輸入源中是否有新數(shù)據(jù)到達(dá)。如果有新數(shù)據(jù)到達(dá)商乎,那么spark會(huì)將之前的計(jì)算結(jié)果與新到達(dá)的數(shù)據(jù)整合起來(lái)祭阀,以增量的方式來(lái)運(yùn)行我們定義的計(jì)算操作,進(jìn)而計(jì)算出最新的單詞計(jì)數(shù)結(jié)果抹凳。
這種模型跟其他很多流式計(jì)算引擎都不同伦腐。大多數(shù)流式計(jì)算引擎都需要開(kāi)發(fā)人員自己來(lái)維護(hù)新數(shù)據(jù)與歷史數(shù)據(jù)的整合并進(jìn)行聚合操作。然后我們就需要自己去考慮和實(shí)現(xiàn)容錯(cuò)機(jī)制幸冻、數(shù)據(jù)一致性的語(yǔ)義等咳焚。然而在structured streaming的這種模式下,spark會(huì)負(fù)責(zé)將新到達(dá)的數(shù)據(jù)與歷史數(shù)據(jù)進(jìn)行整合革半,并完成正確的計(jì)算操作又官,同時(shí)更新result table,不需要我們?nèi)タ紤]這些事情六敬。
Structured Streaming之event-time和late-data process
event-time指的是嵌入在數(shù)據(jù)自身內(nèi)部的一個(gè)時(shí)間。在很多流式計(jì)算應(yīng)用中崖疤,我們可能都需要根據(jù)event-time來(lái)進(jìn)行處理典勇。例如,可能我們需要獲取某個(gè)設(shè)備每分鐘產(chǎn)生的事件的數(shù)量权烧,那么我們就需要使用事件產(chǎn)生時(shí)的時(shí)間伤溉,而不是spark接受到這條數(shù)據(jù)的時(shí)間。設(shè)備產(chǎn)生的每個(gè)事件都是input table中的一行數(shù)據(jù)板祝,而event-time就是這行數(shù)據(jù)的一個(gè)字段走净。這就可以支持我們進(jìn)行基于時(shí)間窗口的聚合操作(例如每分鐘的事件數(shù)量),只要針對(duì)input table中的event-time字段進(jìn)行分組和聚合即可伏伯。每個(gè)時(shí)間窗口就是一個(gè)分組说搅,而每一行都可以落入不同行的分組內(nèi)。因此弄唧,類似這樣的基于時(shí)間窗口的分組聚合操作,既可以被定義在一份靜態(tài)數(shù)據(jù)上迂猴,也可以被定義在一個(gè)實(shí)時(shí)數(shù)據(jù)流上背伴。
此外,這種模型也天然支持延遲到達(dá)的數(shù)據(jù)息尺,late-data疾掰。spark會(huì)負(fù)責(zé)更新result table,因此它有決定的控制權(quán)來(lái)針對(duì)延遲到達(dá)的數(shù)據(jù)進(jìn)行聚合結(jié)果的重新計(jì)算炭懊。雖然目前在spark 2.0中還沒(méi)有實(shí)現(xiàn)這個(gè)feature,但是未來(lái)會(huì)基于event-time watermark(水游旮埂)來(lái)實(shí)現(xiàn)這個(gè)late-data processing的feature。
Structured Streaming容錯(cuò)語(yǔ)義
structured streaming的核心設(shè)計(jì)理念和目標(biāo)之一愈涩,就是支持一次且僅一次的語(yǔ)義加矛。為了實(shí)現(xiàn)這個(gè)目標(biāo),structured streaming設(shè)計(jì)將source毁腿、sink和execution engine來(lái)追蹤計(jì)算處理的進(jìn)度苛茂,這樣就可以在任何一個(gè)步驟出現(xiàn)失敗時(shí)自動(dòng)重試。每個(gè)streaming source都被設(shè)計(jì)成支持offset草戈,進(jìn)而可以讓spark來(lái)追蹤讀取的位置侍瑟。spark基于checkpoint和wal來(lái)持久化保存每個(gè)trigger interval內(nèi)處理的offset的范圍。sink被設(shè)計(jì)成可以支持在多次計(jì)算處理時(shí)保持冪等性费韭,就是說(shuō)庭瑰,用同樣的一批數(shù)據(jù),無(wú)論多少次去更新sink督暂,都會(huì)保持一致和相同的狀態(tài)穷吮。這樣的話,綜合利用基于offset的source捡鱼,基于checkpoint和wal的execution engine,以及基于冪等性的sink缠诅,可以支持完整的一次且僅一次的語(yǔ)義。