作者 | yjshen缅刽,Anonymitaet編輯 | Linda批流現(xiàn)狀
在大規(guī)模并行數(shù)據(jù)分析領(lǐng)域藤滥,AMPLab 的『One stack to rule them all』提出用 Apache Spark 作為統(tǒng)一的引擎支持批處理底燎、流處理刃榨、交互查詢和機(jī)器學(xué)習(xí)等常見的數(shù)據(jù)處理場景弹砚。 2017 年 7 月双仍,Spark 2.2.0 版本正式推出的 Spark structured streaming 將 Spark SQL 作為流處理、批處理底層統(tǒng)一的執(zhí)行引擎桌吃,提供對無界表(無邊界的源源不斷到達(dá)的流數(shù)據(jù))和有界表(靜態(tài)歷史數(shù)據(jù))的優(yōu)化查詢朱沃,而向用戶提供 Dataset/DataFrame API 對批流數(shù)據(jù)聯(lián)合處理,進(jìn)一步模糊了批流數(shù)據(jù)處理的邊界茅诱。
另一方面逗物,Apache Flink 在 2016 年左右進(jìn)入大眾視野,憑借其當(dāng)時更優(yōu)的流處理引擎瑟俭,原生的 Watermark 支持『Exaclty Once』的數(shù)據(jù)一致性保證翎卓,和批流一體計算等各種場景的支持,成為 Spark 的有力挑戰(zhàn)者摆寄。無論是使用 Spark 還是 Flink失暴,用戶真正關(guān)心的是如何更好地使用數(shù)據(jù),更快地挖掘數(shù)據(jù)中的價值微饥,流數(shù)據(jù)和靜態(tài)數(shù)據(jù)不再是分離的個體逗扒,而是一份數(shù)據(jù)的兩種不同表征方式。
然而在實踐中欠橘,構(gòu)建一個批流一體的數(shù)據(jù)平臺并不只是計算引擎層的任務(wù)矩肩。因為在傳統(tǒng)解決方案中,近實時的流肃续、事件數(shù)據(jù)通常采用消息隊列(例如 RabbitMQ)黍檩、實時數(shù)據(jù)管道(例如 Apache Kafka)存儲,而批處理所需要的靜態(tài)數(shù)據(jù)通常使用文件系統(tǒng)始锚、對象存儲進(jìn)行保存建炫。這就意味著,一方面疼蛾,在數(shù)據(jù)分析過程中肛跌,為了保證結(jié)果的正確性和實時性,需要對分別存儲在兩類系統(tǒng)中數(shù)據(jù)進(jìn)行聯(lián)合查詢;另一方面衍慎,在運維過程中转唉,需要定期將流數(shù)據(jù)轉(zhuǎn)存到文件 / 對象存儲中,通過維持流形式的數(shù)據(jù)總量在閾值之下來保證消息隊列稳捆、數(shù)據(jù)管道的性能(因為這類系統(tǒng)的以分區(qū)為主的架構(gòu)設(shè)計緊耦合了消息服務(wù)和消息存儲赠法,而且多數(shù)都太過依賴文件系統(tǒng),隨著數(shù)據(jù)量的增加乔夯,系統(tǒng)性能會急劇下降)砖织,但人為的數(shù)據(jù)搬遷不但會提升系統(tǒng)的運維成本,而且搬遷過程中的數(shù)據(jù)清洗末荐、讀取侧纯、加載也是對集群資源的巨大消耗。
與此同時甲脏,從 Mesos 和 YARN 的流行眶熬、Docker 的興起到現(xiàn)在的 Kubernetes 被廣泛采用,整個基礎(chǔ)架構(gòu)正在全面地向容器化方向發(fā)展块请,傳統(tǒng)緊耦合消息服務(wù)和消息計算的架構(gòu)并不能很好地適應(yīng)容器化的架構(gòu)娜氏。以 Kafka 為例,其以分區(qū)為中心的架構(gòu)緊耦合了消息服務(wù)和消息存儲墩新。Kafka 的分區(qū)與一臺或者一組物理機(jī)強(qiáng)綁定贸弥,這帶來的問題是在機(jī)器失效或集群擴(kuò)容中,需要進(jìn)行昂貴且漫長的分區(qū)數(shù)據(jù)重新均衡的過程海渊;其以分區(qū)為粒度的存儲設(shè)計也不能很好利用已有的云存儲資源绵疲;此外,過于簡單的設(shè)計導(dǎo)致其為了進(jìn)行容器化需要解決多租戶管理切省、IO 隔離等方面很多架構(gòu)上的缺陷最岗。
Pulsar 簡介
Apache Pulsar 是一個多租戶、高性能的企業(yè)級消息發(fā)布訂閱系統(tǒng)朝捆,最初由 Yahoo 研發(fā)般渡, 2018 年 9 月從 Apache 孵化器畢業(yè),成為 Apache 基金會的頂級開源項目芙盘。Pulsar 基于發(fā)布訂閱模式(pub-sub)構(gòu)建驯用,生產(chǎn)者(producer)發(fā)布消息(message)到主題(topic),消費者可以訂閱主題儒老,處理收到的消息蝴乔,并在消息處理完成后發(fā)送確認(rèn)(Ack)。Pulsar 提供了四種訂閱類型驮樊,它們可以共存在同一個主題上薇正,以訂閱名進(jìn)行區(qū)分:
獨享(exclusive)訂閱——一個訂閱名下同時只能有一個消費者片酝。
共享(shared)訂閱——可以由多個消費者訂閱,每個消費者接收其中一部分消息挖腰。
失效備援(failover)訂閱——允許多個消費者連接到同一個主題雕沿,但只有一個消費者能夠接收消息。只有在當(dāng)前消費者發(fā)生失效時猴仑,其他消費者才開始接收消息审轮。
鍵劃分(key-shared)訂閱(測試版功能)——多個消費者連接到同一主題,相同 Key 總會發(fā)送給同一個消費者辽俗。
Pulsar 從設(shè)計之初就支持多租戶(multi-tenancy)的概念疾渣,租戶(tenant)可以橫跨多個集群(clusters),每個租戶都有其認(rèn)證和鑒權(quán)方式崖飘,租戶也是存儲配額榴捡、消息生存時間(TTL)和隔離策略的管理單元。Pulsar 多租戶的特性可以在 topic URL 上得到充分體現(xiàn)坐漏,其結(jié)構(gòu)是 persistent://tenant/namespace/topic
薄疚。命名空間(namespace)是 Pulsar 中最基本的管理單元碧信,我們可以設(shè)置權(quán)限赊琳、調(diào)整復(fù)制選項、管理跨集群的數(shù)據(jù)復(fù)制砰碴、控制消息的過期時間或執(zhí)行其他關(guān)鍵任務(wù)躏筏。
Pulsar 獨特架構(gòu)
Pulsar 和其他消息系統(tǒng)的最根本區(qū)別在于其采用計算和存儲分離的分層架構(gòu)。Pulsar 集群由兩層組成:無狀態(tài)服務(wù)層呈枉,它由一組接受和傳遞消息的 broker 組成趁尼;分布式存儲層,它由一組名為 bookies 的 Apache BookKeeper 存儲節(jié)點組成猖辫,具備高可用酥泞、強(qiáng)一致、低延時的特點啃憎。
和 Kafka 一樣芝囤,Pulsar 也是基于主題分區(qū)(Topic partition)的邏輯概念進(jìn)行主題數(shù)據(jù)的存儲。不同的是辛萍,Kafka 的物理存儲也是以分區(qū)為單位悯姊,每個 partition 必須作為一個整體(一個目錄)被存儲在一個 broker 上,而 Pulsar 的每個主題分區(qū)本質(zhì)上都是存儲在 BookKeeper 上的分布式日志贩毕,每個日志又被分成分段(Segment)悯许。每個 Segment 作為 BookKeeper 上的一個 Ledger,均勻分布并存儲在多個 bookie 中辉阶。存儲分層的架構(gòu)和以 Segment 為中心的分片存儲是 Pulsar 的兩個關(guān)鍵設(shè)計理念先壕。以此為基礎(chǔ)為 Pulsar 提供了很多重要的優(yōu)勢:無限制的主題分區(qū)瘩扼、存儲即時擴(kuò)展,無需數(shù)據(jù)遷移 垃僚、無縫 broker 故障恢復(fù)邢隧、無縫集群擴(kuò)展、無縫的存儲(Bookie)故障恢復(fù)和獨立的可擴(kuò)展性冈在。
消息系統(tǒng)解耦了生產(chǎn)者與消費者倒慧,但實際的消息本質(zhì)上仍是有結(jié)構(gòu)的,因此生產(chǎn)者和消費者之間需要一種協(xié)調(diào)機(jī)制包券,達(dá)到生產(chǎn)纫谅、消費過程中對消息結(jié)構(gòu)的共識,以達(dá)到類型安全的目的溅固。Pulsar 有內(nèi)置的 Schema 注冊方式在消息系統(tǒng)端提供傳輸消息類型約定的方式付秕,客戶端可以通過上傳 Schema 來約定主題級別的消息類型信息,而由 Pulsar 負(fù)責(zé)消息的類型檢查和有類型消息的自動序列化侍郭、反序列化询吴,從而降低多應(yīng)用間的消息解析代碼反復(fù)開發(fā)、維護(hù)的成本亮元。當(dāng)然猛计,Schema 定義與類型安全是一種可選的機(jī)制,并不會給非類型化消息的發(fā)布爆捞、消費產(chǎn)生任何性能開銷奉瘤。
在 Spark 中實現(xiàn)對 Pulsar 數(shù)據(jù)的讀寫
自 Spark 2.2 版本 Structured Streaming 正式發(fā)布,Spark 只保留了 SparkSession
作為主程序入口煮甥,你只需編寫 DataSet/DataFrame API 程序盗温,以聲明形式對數(shù)據(jù)的操作,而將具體的查詢優(yōu)化與批流處理執(zhí)行的細(xì)節(jié)交由 Spark SQL 引擎進(jìn)行處理成肘。對于一個數(shù)據(jù)處理作業(yè)卖局,需要定義 DataFrame 的產(chǎn)生、變換和寫出三個部分双霍,而將 Pulsar 作為流數(shù)據(jù)平臺與 Spark 進(jìn)行集成正是要解決如何從 Pulsar 中讀取數(shù)據(jù)(Source)和如何向 Pulsar 寫出運算結(jié)果(Sink)兩個問題砚偶。
為了實現(xiàn)以 Pulsar 為源讀取批流數(shù)據(jù)與支持批流數(shù)據(jù)向 Pulsar 的寫入,我們構(gòu)建了 Spark Pulsar Connector店煞。
對 Structured Streaming 的支持
上圖展示了 Structured Streaming(以下簡稱 SS )的主要組件:
輸入和輸出——為了提供細(xì)粒度的容錯蟹演,SS 要求輸入數(shù)據(jù)源(Source)是可重放(replayable)的;為了提供端到端的 Exactly-Once 的語義顷蟀,需要輸出(Sink)支持冪等寫出(一條消息被多次寫入與一次寫入效果一致酒请,可由 DBMS、KV 系統(tǒng)通過鍵約束的方式支持)鸣个。
API——用戶通過編寫 Spark SQL 的 batch API(SQL 或 DataFrame)指定對一個或多個流羞反、表的查詢布朦,并定義一個輸出表保存所有的輸出結(jié)果,而引擎內(nèi)部決定如何將結(jié)果增量地寫到 Sink 中昼窗。為了支持流處理是趴,SS 在原有的 Spark SQL API 上添加了一些接口:
觸發(fā)器(Trigger)——控制引擎觸發(fā)流處理執(zhí)行、在 Sink 中更新結(jié)果的頻率澄惊。
水印機(jī)制(Watermark policy)——用戶通過指定字段做 event time唆途,來決定對晚到數(shù)據(jù)的處理。
有狀態(tài)算子(Stateful operator)——用戶可以根據(jù) Key 跟蹤和更新算子內(nèi)部的可變狀態(tài)掸驱,完成復(fù)雜的業(yè)務(wù)需求(例如肛搬,基于會話的窗口)。
執(zhí)行層——當(dāng)收到一個查詢時毕贼,SS 決定它的增量執(zhí)行方式温赔,進(jìn)行優(yōu)化、并開始執(zhí)行鬼癣。SS 有兩種可選的執(zhí)行模型:
Microbatch model(微批處理模式)——默認(rèn)的執(zhí)行方式陶贼,與 Spark Streaming 的 DStream 類似,將流切成 micro batch待秃,對每個 batch 分別處理拜秧。這種模式支持動態(tài)負(fù)載均衡、故障恢復(fù)等機(jī)制锥余,適合將吞吐率作為主要性能指標(biāo)的應(yīng)用腹纳。
Continuous mode(持續(xù)模式)——在集群上啟動長時間運行的算子痢掠,適合處理較為簡單驱犹、延遲敏感類應(yīng)用。
Log 和 State Store —— SS 利用兩種持久化存儲來提供容錯保障:一個 Write-ahead-Log(WAL)足画,記錄被成功消費且持久化寫出的每個數(shù)據(jù)源中的位置雄驹;一個大規(guī)模的 state store, 存儲長期運行的聚集算子內(nèi)部的狀態(tài)快照淹辞。當(dāng)故障發(fā)生時医舆,SS 會根據(jù)快照的位置,通過重放之后的消息完成流處理狀態(tài)的恢復(fù)象缀。
具體到源碼層面蔬将,Source 接口定義了可重放數(shù)據(jù)源需要提供的功能。
trait Source { def schema: StructType def getOffset: Option[Offset] def getBatch(start: Option[Offset], end: Offset): DataFrame def commit(end: Offset): Unit def stop(): Unit}trait Sink { def addBatch(batchId: Long, data: DataFrame): Unit}
以 microbatch 執(zhí)行模式為例:
在每個 microbatch 的最開始央星,SS 會向 source 詢問當(dāng)前的最新進(jìn)度(
getOffset
)霞怀,并將其持久化到 WAL 中。隨后莉给,source 根據(jù) SS 提供的
start
end
偏移量毙石,提供區(qū)間范圍的數(shù)據(jù)(getBatch
)廉沮。SS 觸發(fā)計算邏輯的優(yōu)化和編譯,把計算結(jié)果寫出給 sink(addBatch)徐矩,這時才觸發(fā)實際的取數(shù)據(jù)操作以及計算過程滞时。
在數(shù)據(jù)完整寫出到 sink 后,SS 通知 source 可以廢棄數(shù)據(jù)(
commit
)滤灯,并將成功執(zhí)行的batchId
寫入內(nèi)部維護(hù)的 commitLog 中坪稽。
具體到 Pulsar 的 connector 實現(xiàn)中:
在所有批次開始執(zhí)行前,SS 會調(diào)用 schema 方法返回消息的結(jié)構(gòu)信息鳞骤,在 schema 方法內(nèi)部刽漂,我們從 Pulsar 的 Schema Registry 提取出所有主題的 Schema,并進(jìn)行一致性檢查弟孟。
隨后贝咙,我們?yōu)槊總€主題分區(qū)創(chuàng)建一個消費者,按照 (start, end] 返回主題分區(qū)中的數(shù)據(jù)拂募。
-
當(dāng)收到 SS 的 commit 通知時庭猩,通過
topics
中的resetCursor
向 Pulsar 標(biāo)志消息消費的完成。Sink 中構(gòu)建的生產(chǎn)者則將 addBatch 中獲取的實際數(shù)據(jù)以消息形式追加寫入相應(yīng)的主題中陈症。對批處理作業(yè)的支持
在某個時間點執(zhí)行的批作業(yè)蔼水,可以看作是對 Pulsar 平臺中的流數(shù)據(jù)在一個時間點的快照進(jìn)行的數(shù)據(jù)分析。Spark 對歷史數(shù)據(jù)的查詢是以 Relation 為單位录肯,Spark Pulsar Connector 提供 createRelation
方法的實現(xiàn)根據(jù)用戶指定的多個主題分區(qū)構(gòu)建表趴腋,并返回包含 Schema 信息的 DataSet。在查詢計劃階段论咏,Connector 的功能分成兩步:首先优炬,根據(jù)用戶提供的一個或多個主題,在 Pulsar Schema Registry 中查找主題 Schema厅贪,并檢查多個主題 Schema 的一致性蠢护;其次,將用戶指定的所有主題分區(qū)進(jìn)行任務(wù)劃分(Partition)养涮,得到的分片即是 Spark source task 的執(zhí)行粒度葵硕。
Pulsar 提供了兩層的接口對其中的數(shù)據(jù)進(jìn)行訪問,基于主題分區(qū)的 Consumer/Reader 接口贯吓,以傳統(tǒng)消息接收為語義的順序數(shù)據(jù)讀刃赴肌;Segment 級的讀接口悄谐,提供對 Segment 數(shù)據(jù)的直接讀取介评。因此,相應(yīng)地從 Pulsar 讀數(shù)據(jù)執(zhí)行批作業(yè)可以分成兩種粒度(即讀取數(shù)據(jù)的并行度)進(jìn)行:以主題分區(qū)為粒度(每個主題分區(qū)作為一個分片)尊沸;以 Segment 為粒度(將一個主題分區(qū)的多個 Segment 組織成一個分片威沫,因此一個主題分區(qū)會有多個對應(yīng)的分片)贤惯。你可以按照批作業(yè)的并行度需求和可分配計算資源選擇合適的消息讀取的并行粒度。另一方面棒掠,將批作業(yè)的執(zhí)行存儲到 Pulsar 也很直觀孵构,你只需指定寫入的主題和消息路由規(guī)則(RoundRobin 或者按 Key 劃分),在 Sink task 中創(chuàng)建的每個生產(chǎn)者會將待寫出的消息送至對應(yīng)的主題分區(qū)烟很。
如何使用 Spark Pulsar Connector
根據(jù)一個或多個主題創(chuàng)建流處理 Source颈墅。
val df = spark .readStream .format("pulsar") .option("service.url", "pulsar://localhost:6650") .option("admin.url", "http://localhost:8080") .option("topicsPattern", "topic.*") // Subscribe to a pattern // .option("topics", "topic1,topic2") // Subscribe to multiple topics // .option("topic", "topic1"). //subscribe to a single topic .option("startingOffsets", startingOffsets) .load()df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]
構(gòu)建批處理 Source。
val df = spark .read .format("pulsar") .option("service.url", "pulsar://localhost:6650") .option("admin.url", "http://localhost:8080") .option("topicsPattern", "topic.*") .option("startingOffsets", "earliest") .option("endingOffsets", "latest") .load()df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]
使用數(shù)據(jù)中本身的 topic 字段向多個主題進(jìn)行持續(xù) Sink雾袱。
val ds = df .selectExpr("topic", "CAST(__key AS STRING)", "CAST(value AS STRING)") .writeStream .format("pulsar") .option("service.url", "pulsar://localhost:6650") .start()
將批處理結(jié)果寫回 Pulsar恤筛。
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") .write .format("pulsar") .option("service.url", "pulsar://localhost:6650") .option("topic", "topic1") .save()
注意
由于 Spark Pulsar Connector 支持結(jié)構(gòu)化消息的消費和寫入,為了避免消息負(fù)載中字段和消息元數(shù)據(jù)(event time芹橡、publish time肠虽、key 和 messageId)的潛在命名沖突吝秕,消息元數(shù)據(jù)字段在 Spark schema 中以雙下劃線做為前綴(例如,__eventTime)。
參考資料
-
Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark
https://cs.stanford.edu/~matei/papers/2018/sigmod_structured_streaming.pdf
-
Structured Streaming 源碼解析系列
Pulsar 官網(wǎng) https://pulsar.apache.org/