論文概要
該論文是Spark團(tuán)隊(duì)在2018年發(fā)表的一篇基于Spark引擎之上新構(gòu)建的一套Streaming api疆虚,Structured Streaming項(xiàng)目在2016年就已經(jīng)開始開發(fā)了要糊。
Structured Streaming采用了不同于Spark Streaming趁餐、Flink這類DataStream的角度來處理流數(shù)據(jù)凳枝。Structured Streaming視圖通過“增量查詢模型”來處理流數(shù)據(jù)夹姥,將流式數(shù)據(jù)作為增量數(shù)據(jù)(也就是分批)進(jìn)行處理。
待定狐血,看他API饲做。
流處理現(xiàn)狀與挑戰(zhàn)
雖然近些年分布式流處理系統(tǒng)取得了巨大的進(jìn)步,但是我們基于我們?cè)赟park Streaming上的經(jīng)驗(yàn),認(rèn)為在實(shí)際使用過程中仍然具有很多挑戰(zhàn)蜒灰。
復(fù)雜低級(jí)的API(Complex and Low-Level APIs)
流處理系統(tǒng)之所以在使用上相較批處理復(fù)雜很多弦蹂,主要的原因就是流處理系統(tǒng)中的復(fù)雜API語義。這些復(fù)雜的語義需要用戶感知和并且指定具體的物理執(zhí)行操作强窖,比如物理執(zhí)行計(jì)劃編排凸椿、at-least-once語義、state storage和觸發(fā)模式等翅溺。
比如在Google Dataflow模型中脑漫,能夠處理基于event-time聚合、窗口和亂序數(shù)據(jù)咙崎。但是需要用戶指定窗口模式优幸、觸發(fā)模式等,這樣就需要用戶手寫物理算子執(zhí)行計(jì)劃圖褪猛。向Spark Streaming和Apache Flink的DataStream API网杆,也需要編排任務(wù)的物理執(zhí)行DAG,并且管理復(fù)雜的狀態(tài)伊滋。
對(duì)于這類問題碳却,Structured Streaming使用增量查詢模型來解決。對(duì)于簡(jiǎn)單的流應(yīng)用笑旺,直接使用增量查詢模型來進(jìn)行表達(dá)追城;對(duì)于復(fù)雜的應(yīng)用,也可以在該模型之上很容易的添加自定Stateful Operator來滿足燥撞。(感覺就是Flink的High-level SQL和Stateful Stream Processing API)
Structured Streaming的自動(dòng)增量查詢模型就是通過對(duì)Spark SQL和DataFrame/DataSet API描述的靜態(tài)數(shù)據(jù)集進(jìn)行增量查詢。所以用戶只需要了解Spark 的批處理API就可以編寫流處理作業(yè)迷帜。
不太同意論文這里的描述物舒。首先,Structured Streaming增量模型就是直接使用的Saprk SQL和DataFrame API戏锹。但像Flink不止有DataStream 這類low level api冠胯,也很早就支持SQL、Table API這類High-level API了锦针。
端到端集成挑戰(zhàn)
在實(shí)際使用中荠察,流處理系統(tǒng)往往是大型業(yè)務(wù)系統(tǒng)的一部分,業(yè)務(wù)系統(tǒng)同時(shí)也包括批處理奈搜、join靜態(tài)數(shù)據(jù)和交互查詢等悉盆。傳統(tǒng)的流處理系統(tǒng)API主要關(guān)注的是讀取數(shù)據(jù)源數(shù)據(jù),并將流數(shù)據(jù)輸出到接收器中馋吗,但作為端到端的業(yè)務(wù)應(yīng)用程序焕盟,可能還需要執(zhí)行其他任務(wù)。
比如在ETL場(chǎng)景宏粤,任務(wù)可能需要去join另一個(gè)存儲(chǔ)系統(tǒng)或批計(jì)算轉(zhuǎn)換出來的靜態(tài)數(shù)據(jù)脚翘。這時(shí)候保證兩個(gè)系統(tǒng)數(shù)據(jù)的一致性灼卢,以及能夠使用一套API來將業(yè)務(wù)表達(dá)出來,就非常重要了来农。再比如在我一個(gè)流處理應(yīng)用中鞋真,我需要通過批出的能力來處理一些歷史數(shù)據(jù)(追數(shù)據(jù))。
對(duì)于這類問題沃于,我們將Structured Streaming和Spark 中的批處理和交互式API 進(jìn)行了緊密的集成涩咖。
我理解Google Dataflow模型視圖要解決的其中一個(gè)問題就是將通過一套執(zhí)行引擎來支持批、微批揽涮、流等場(chǎng)景抠藕,所以應(yīng)該能夠很容易做流批這類集成。但是目前Flink貌似確實(shí)不支持流批的集成蒋困。
運(yùn)維挑戰(zhàn)
部署完流應(yīng)用后盾似,如何管理和運(yùn)維這些任務(wù)也是一項(xiàng)非常大的挑戰(zhàn),比如失敗處理(Failover雪标,包括單點(diǎn)故障零院、優(yōu)雅停止作業(yè)、作業(yè)重啟繼續(xù)處理等)村刨、代碼更新(應(yīng)用代碼變更告抄、系統(tǒng)版本升級(jí),這類作業(yè)重啟后需要繼續(xù)從之前的位置進(jìn)行數(shù)據(jù)處理)嵌牺、彈性伸縮(Rescaling)打洼、熱點(diǎn)處理(Stragglers)和監(jiān)控等等。
對(duì)于運(yùn)維挑戰(zhàn)逆粹,Structured Streaming通過以下幾種方式來處理:
容錯(cuò)處理募疮,通過WAL和state store 進(jìn)行回滾和恢復(fù)。
節(jié)點(diǎn)crash僻弹、伸縮和慢節(jié)點(diǎn)處理阿浓,通過自動(dòng)新增節(jié)點(diǎn)來支持。
UDF代碼更新蹋绽,通過作業(yè)停止芭毙、重啟解決。
微批處理模式卸耘,通過自適應(yīng)批大小(adaptively batch)來應(yīng)對(duì)負(fù)載高峰和追歷史數(shù)據(jù)退敦。
成本和性能挑戰(zhàn)
流應(yīng)用往往是7 * 24h運(yùn)行,如果系統(tǒng)不具備動(dòng)態(tài)伸縮能力蚣抗,那么就需要以高峰流量的資源來長(zhǎng)時(shí)間運(yùn)行任務(wù)苛聘,即便具備動(dòng)態(tài)伸縮能力,連續(xù)計(jì)算結(jié)果的成本可能也比運(yùn)行定期批處理作業(yè)昂貴。
對(duì)于這類問題设哗,Structured Streaming復(fù)用Spark SQL中所有執(zhí)行優(yōu)化能力唱捣,來提升系統(tǒng)的吞吐能力。
Structured Streaming組件架構(gòu)
下面是Structured Streaming組件構(gòu)成圖网梢,主要包括:數(shù)據(jù)流輸入和輸出connector震缭、API、Structured Streaing執(zhí)行引擎战虏。
輸入輸出Connector
Structured Streaming 提供了用于I/O的各類輸入數(shù)據(jù)源和輸出接收器拣宰,并且提供了“exactly-once”語義的輸出和容錯(cuò)。為了保證“exactly-once”語義烦感,Structured Streaming系統(tǒng)和其它流系統(tǒng)基本類似巡社,對(duì)輸入source和輸出sink有進(jìn)行了限制。
- 輸入source必須具備重放功能(replayable)手趣,允許重復(fù)讀取最近數(shù)據(jù)晌该。
- 輸出sink必須支持冪等寫(idempotent writes),如果節(jié)點(diǎn)在寫入sink時(shí)發(fā)生fo绿渣,能夠確背海可靠恢復(fù)。
上面組件圖我們看到中符,Structured Streaming除了能夠接收外部數(shù)據(jù)流(比如Kafka)系統(tǒng)外姜胖,還支持接收Spark SQL中的Table。而且Structured Streaming也能將數(shù)據(jù)流輸出到Spark Table中淀散,這樣用戶就可以對(duì)該table進(jìn)行交互查詢了右莱。(這就是Structured Streaming中端到端集成的方式,能夠接收batch 類型的table档插,也能輸出table進(jìn)行交互查詢)慢蜓。
API
上面我們提到過,Structured Streaming作業(yè)是可以通過Spark SQL的batch API(SQL和DataFrame)來描述任務(wù)的阀捅。
Structured Streaming為了能夠支持一些Streaming場(chǎng)景,在原有Spark SQL API之上针余,增加了一些新API饲鄙,這些新API也能在batch場(chǎng)景工作。
觸發(fā)控制(Triggers control)圆雁,觸發(fā)引擎計(jì)算新的結(jié)果集并且更新到輸出接收器中忍级。
指定event time字段和設(shè)置watermark 策略。
Stateful operator操作伪朽,允許用戶實(shí)現(xiàn)復(fù)雜任務(wù)處理轴咱,類似Spark Streaming中的"updateStateByKey" API。
上面的1和2是Structured Streaming從Google Dataflow 模型中借鑒的思想。
執(zhí)行引擎
執(zhí)行引擎主要分為三部分:遞增器(Incrementalizer)朴肺、優(yōu)化器(Optimizer)和處理器窖剑。遞增器和優(yōu)化器用于對(duì)用戶編寫的query進(jìn)行處理。
Structured Streaming提供了兩類處理器:微批模式的Microbatch Execution和連續(xù)處理模式的Coniuous Processing戈稿。默認(rèn)系統(tǒng)使用微批模式西土,微批模式支持動(dòng)態(tài)負(fù)載、伸縮鞍盗、容錯(cuò)恢復(fù)和慢節(jié)點(diǎn)處理需了。這兩類處理器都能支持容錯(cuò),都是用兩種形式的持久存儲(chǔ)來實(shí)現(xiàn)容錯(cuò)
- WAL(write-ahead log)般甲,WAL用于跟蹤數(shù)據(jù)是否可靠處理完成肋乍。對(duì)于一些sink,可以集成WAL來實(shí)現(xiàn)原子接收器(sink atomic)敷存。
- 大規(guī)模狀態(tài)存儲(chǔ)(state store)墓造,采用狀態(tài)存儲(chǔ)的方式來保存長(zhǎng)時(shí)間運(yùn)行的聚合算子狀態(tài)快照。
值得吐槽的是Spark 2.3開始支持連續(xù)處理模式历帚,但是該執(zhí)行器一直處于試驗(yàn)階段滔岳。
無論WAL還是Checkpoint,都是記錄每次觸發(fā)器中正在處理數(shù)據(jù)的偏移范圍挽牢。
Structured Streaming編程模型
Structured Streaming的核心思想就是將數(shù)據(jù)流抽象為一個(gè)連續(xù)不斷追加的表谱煤。這是一種新流處理模型,通過使用類似批處理的查詢禽拔,來查詢這個(gè)靜態(tài)無界的表刘离,查詢過程就是Spark對(duì)表中的增量數(shù)據(jù)進(jìn)行查詢。
數(shù)據(jù)流中的數(shù)據(jù)睹栖,作為這個(gè)無界表的新紀(jì)錄供Spark進(jìn)行增量查詢處理硫惕。
下面Structured Streaming的增量編程模型,每個(gè)觸發(fā)間隔(比如下面的1s)野来,新數(shù)據(jù)追加到input table中恼除,然后對(duì)input table中的增量部分進(jìn)行查詢(這個(gè)查詢就是map、flatmap這類算子)曼氛,并將增量結(jié)果更新到result table中豁辉,最后將result table根據(jù)不同的輸出模式,將result table輸出舀患。
下面是這張帶有實(shí)例數(shù)據(jù)的圖徽级,更容易理解。
第一次觸發(fā)時(shí)聊浅,輸入數(shù)據(jù)“cat dog”和“dog dog”餐抢,查詢邏輯對(duì)新增數(shù)據(jù)進(jìn)行word count查詢計(jì)算现使,得到“cat=1, dog = 3”旷痕。
第二次輸入數(shù)據(jù)觸發(fā)碳锈,讀取“owl cat”加入到input table中(此時(shí)input table 中包括了原始數(shù)據(jù)“cat dog”、“dog dog”和增量數(shù)據(jù)“owl cat”)苦蒿,這時(shí)候Spark在進(jìn)行增量查詢(查詢新加入表中的“owl cat”)殴胧,并和result table進(jìn)行合并。
第三次輸入“dog” 和 “owl”佩迟,查詢計(jì)算邏輯和上面一樣团滥。
下面是一段Structured Streaming的DataFrame API代碼示例,我們從代碼上來理解Structured Streaming的編程模型:
讀取指定目錄下的json數(shù)據(jù)(注意這里是流式讀取报强,也就是這個(gè)目錄會(huì)一直新增json文件)灸姊,data就是對(duì)應(yīng)了上面input table。
對(duì)data中增量數(shù)據(jù)進(jìn)行聚合查詢計(jì)數(shù)(按照數(shù)據(jù)中的"contry"字段)秉溉,counts對(duì)應(yīng)了上面的result table力惯。
將result table以parquet文件格式輸出,輸出模型為“complete”召嘶。
上面的模型圖和實(shí)例代碼父晶,我們都看到在result table到輸出到外部存儲(chǔ)時(shí),有一個(gè)output mode參數(shù)弄跌。該參數(shù)定義了Structured Streaming的輸出模式甲喝。
complete mode,將整個(gè)更新的結(jié)果表全量寫入到外部存儲(chǔ)铛只。如何存儲(chǔ)整個(gè)表取決于sink connector埠胖。比如上面將每次更新的全量結(jié)果都寫到一個(gè)parquet文件中。
append mode淳玩,只將上次result table中追加的內(nèi)容寫入到外部存儲(chǔ)直撤。該模式只適用于結(jié)果表不會(huì)有更新行的查詢場(chǎng)景。
update mode蜕着,只將上次result table中更新的內(nèi)容寫入到外部存儲(chǔ)谋竖。
查詢計(jì)劃
Structured Streaming的查詢計(jì)劃是通過Spark SQL中的Catalyst可擴(kuò)展優(yōu)化器來實(shí)現(xiàn)的。查詢計(jì)劃的處理主要分為三個(gè)階段:
分析階段(Analysis)承匣,分析階段主要用于校驗(yàn)用戶查詢是否能夠被增量執(zhí)行引擎執(zhí)行和解析查詢引用中的屬性和數(shù)據(jù)類型蓖乘。比如對(duì)于append 模式的輸出類型,只能用于查詢輸出結(jié)果是單調(diào)的查詢算子悄雅,也就是輸出結(jié)果不能移除驱敲。
增量化(Incrementalization)铁蹈,增量化的過程就是遞增用戶提供的靜態(tài)查詢宽闲,以便更新結(jié)果來響應(yīng)新數(shù)據(jù)众眨。(我理解就是比如之前有一組查詢了,新的查詢要和之前查詢邏輯合并容诬,已達(dá)到更新處理邏輯的目的)娩梨。在這步通過Catalyst轉(zhuǎn)換規(guī)則,將將查詢映射執(zhí)行計(jì)算和狀態(tài)管理的算子樹中览徒,也就是翻譯成執(zhí)行樹(對(duì)應(yīng)Flink中的DAG)狈定。比如aggregation會(huì)翻譯為StatefulAggregate算子。
查詢優(yōu)化(Query Optimization)习蓬,直接使用Spark SQL中的優(yōu)化規(guī)則纽什,比如謂詞下推、簡(jiǎn)化表達(dá)之類的躲叼。
應(yīng)用執(zhí)行
狀態(tài)管理和容錯(cuò)
上面我們提到過芦缰,Structured Streaming目前支持兩種類型的執(zhí)行模式,微批執(zhí)行模式和連續(xù)處理模式枫慷。無論哪種模式都是通過兩種外部存儲(chǔ)來管理狀態(tài)让蕾,支持低延遲原子寫入的WAL和能夠存儲(chǔ)大量數(shù)據(jù)的state store,比如HDFS或听、S3等探孝。Structured Streaming這種狀態(tài)管理方式基本和Spark Streaming類似。
上圖是Structured Streaming的狀態(tài)管理邏輯圖誉裆,可以看到:
Input operator負(fù)責(zé)定義每個(gè)epoch(紀(jì)元顿颅、時(shí)代)并且保存這個(gè)epoch的相關(guān)信息到WAL,比如offset(有一點(diǎn)類似Flink中Checkpoint對(duì)應(yīng)的offset)找御。
Stateful operator根據(jù)當(dāng)前的epoch也異步檢查點(diǎn)狀態(tài)(這個(gè)操作其實(shí)是將算子內(nèi)存數(shù)據(jù)寫到state中)元镀,需要注意的是這里不會(huì)每個(gè)epoch都去做Checkpoint(epoch很短)。
Output operator記錄哪些epoch的數(shù)據(jù)成功輸出到sink中霎桅,成功的epoch數(shù)據(jù)會(huì)commit到WAL中栖疑。因?yàn)閑poch是串行的,也就是只有上一個(gè)epoch commit后才能commit下一個(gè)epoch滔驶,所以當(dāng)節(jié)點(diǎn)fo時(shí)遇革,最后一個(gè)epoch數(shù)據(jù)可能會(huì)重新寫。
當(dāng)作業(yè)重啟或恢復(fù)時(shí)揭糕,應(yīng)用從WAL讀取最后一次沒有成功commit的epoch萝快,包括start offset和end offset。對(duì)于Stateful operator會(huì)加載最近一次的epoch的狀態(tài)數(shù)據(jù)著角,上面我們說過Stateful operator并不會(huì)每次epoch都生成檢查點(diǎn)揪漩,所以Stateful operator從state store加載最近的狀態(tài)數(shù)據(jù)同時(shí),以這個(gè)狀態(tài)數(shù)據(jù)的offset重新進(jìn)行計(jì)算(這時(shí)候會(huì)禁掉output)吏口,等狀態(tài)數(shù)據(jù)恢復(fù)到和input source 相同offset后奄容,開始處理最后一次沒有commit成功的epoch數(shù)據(jù)冰更。
上面的狀態(tài)管理與容錯(cuò),對(duì)于用戶來說都是透明的昂勒,用戶代碼邏輯不需要針對(duì)這些做任何事情蜀细。
這種機(jī)制和Storm的ack機(jī)制有點(diǎn)類似。
微批執(zhí)行模式
微批執(zhí)行模式就是Spark Streaming中的離散流執(zhí)行模式戈盈,該模式能夠進(jìn)行dynamic load balancing奠衔、rescaling、緩解慢節(jié)點(diǎn)和細(xì)粒度容錯(cuò)恢復(fù)(無需全局回滾)塘娶。
該模式下epoch一般是幾百毫秒到幾秒鐘归斤,并且每個(gè)epoch就是一個(gè)傳統(tǒng)Spark job組成的 DAG任務(wù)(也就是攢一批數(shù)據(jù),啟動(dòng)一個(gè)DAG任務(wù))刁岸。比如一個(gè)查詢操作在select 后跟隨了一個(gè)aggregation操作官册,實(shí)際執(zhí)行時(shí)就select可能就轉(zhuǎn)換為了一組map task,aggregation轉(zhuǎn)換為了reduce task难捌,這時(shí)這組查詢就對(duì)應(yīng)了map和task組成的DAG膝宁。 reduce中的狀態(tài)數(shù)據(jù)是在內(nèi)存中的,并且會(huì)定期checkpoint到state store中根吁。
使用該模式的優(yōu)點(diǎn):
Dynamic load balancing员淫,因?yàn)槊總€(gè)operator都會(huì)被拆分為很小并且相互獨(dú)立的task,所以他們能夠調(diào)度到任意節(jié)點(diǎn)击敌,所以當(dāng)一個(gè)node執(zhí)行緩慢時(shí)介返,可以在其它節(jié)點(diǎn)啟動(dòng)滿節(jié)點(diǎn)的copy back task來執(zhí)行任務(wù)。
細(xì)粒度容錯(cuò)恢復(fù)沃斤,當(dāng)一個(gè)節(jié)點(diǎn)掛掉圣蝎,只需要重新執(zhí)行這個(gè)節(jié)點(diǎn)的task(因?yàn)樗褪莃atch task),而不用向其它長(zhǎng)時(shí)間存活operator的流處理引擎那樣衡瓶,全局恢復(fù)到上一次檢查點(diǎn)徘公。而且Structured Streaming恢復(fù)時(shí),可以增加并發(fā)度哮针,來加速恢復(fù)关面。
Rescaling,增加和刪除節(jié)點(diǎn)都很簡(jiǎn)單十厢,因?yàn)楫?dāng)下一次epoch dag調(diào)度的時(shí)候等太,只會(huì)調(diào)度到存活的節(jié)點(diǎn)。
規(guī)模和吞吐蛮放,以為直接復(fù)用Spark的批出引擎缩抡,所以spark的所有優(yōu)化都能利用,比如高性能shuffle實(shí)現(xiàn)等包颁。
該模式的主要缺點(diǎn)就是延遲高瞻想,因?yàn)槊看螁?dòng)DAG task時(shí)都存在開銷挎塌。
連續(xù)處理模式
連續(xù)處理模式是Spark 2.3版本時(shí)加入的,Structured Streaming任務(wù)使用long-lived operator來執(zhí)行内边。該模式延遲非常低,但是失去了靈活度待锈,比如限制了運(yùn)行時(shí)的rescaling漠其。
連續(xù)模式設(shè)計(jì)和提出的原因并不是想要提供一種獨(dú)立的執(zhí)行策略,而是想要推廣Structured Streaming API竿音。因?yàn)樵缙诘腟park Streaming API在編程模型上和屎,一些操作對(duì)用戶透露了微批語義,用戶很難將程序移動(dòng)到其它引擎之上春瞬。而Structured Streaming API是與引擎無關(guān)的實(shí)現(xiàn)柴信,能夠遷移到其它引擎之上。
略微牽強(qiáng)宽气,一方面很少見直接將當(dāng)前引擎代碼遷移到其它引擎之上随常,另一方面既然Structured Streaming API已經(jīng)與引擎不是強(qiáng)綁定,運(yùn)行在微批模式不也是ok的萄涯?
在這個(gè)版本的連續(xù)處理模式只支持簡(jiǎn)單類似map操作的任務(wù)绪氛,不支持shuffle操作。在目前最新的Structured Streaming文檔中(好像一直停留在2.4)涝影,該模式還是處于試驗(yàn)階段枣察,并且只能保證at-least-once,算子支持上也還不能支持shuffle類型燃逻。
下面是官網(wǎng)的在該模式下的demo序目,相較mico-batch模式,只是將Trigger修改為連續(xù)模式伯襟。下面的1秒代表猿涨,每秒觸發(fā)一次Checkpoint,F(xiàn)O恢復(fù)時(shí)從上一次Checkpoint恢復(fù)姆怪。
<pre data-language="java" id="ZSynd" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">mport org.apache.spark.sql.streaming.Trigger;
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start();</pre>
有文章說該模式也是使用的Chandy-Lamport分布式快照算法嘿辟,待確定。