本期內(nèi)容
- Exactly Once
- 輸出不重復(fù)
-
事務(wù)
銀行轉(zhuǎn)帳為例心剥,A用戶轉(zhuǎn)賬給B用戶邦尊,B用戶可能收到多筆錢,如何保證事務(wù)的一致性优烧,也就是說事務(wù)輸出蝉揍,能夠輸出且只會輸出一次,即A只轉(zhuǎn)一次畦娄,B只收一次又沾。
從事務(wù)視角解密SparkStreaming架構(gòu):
SparkStreaming應(yīng)用程序啟動,會分配資源熙卡,除非整個集群硬件資源崩潰杖刷,一般情況下都不會有問題。SparkStreaming程序分成兩部分驳癌,一部分是Driver滑燃,另外一部分是Executor。Receiver接收到數(shù)據(jù)后不斷發(fā)送元數(shù)據(jù)給Driver喂柒,Driver接收到元數(shù)據(jù)信息后進(jìn)行CheckPoint處理不瓶。其中CheckPoint包括:Configuration(含有Spark Conf、Spark Streaming等配置信息)灾杰、Block MetaData蚊丐、DStreamGraph、未處理完和等待中的Job艳吠。當(dāng)然Receiver可以在多個Executor節(jié)點的上執(zhí)行Job麦备,Job的執(zhí)行完全基于SparkCore的調(diào)度模式進(jìn)行的。
Executor只有函數(shù)處理邏輯和數(shù)據(jù)昭娩,外部InputStream流入到Receiver中通過BlockManager寫入磁盤凛篙、內(nèi)存、WAL進(jìn)行容錯栏渺。WAL先寫入磁盤然后寫入Executor中呛梆,失敗可能性不大。如果1G數(shù)據(jù)要處理磕诊,Executor一條一條接收填物,Receiver接收數(shù)據(jù)是積累到一定記錄后才會寫入WAL,如果Receiver線程失敗時霎终,數(shù)據(jù)有可能會丟失滞磺。
-
Driver處理元數(shù)據(jù)前會進(jìn)行CheckPoint,SparkStreaming獲取數(shù)據(jù)莱褒、產(chǎn)生作業(yè)击困,但沒有解決執(zhí)行的問題,執(zhí)行一定要經(jīng)過SparkContext广凸。Dirver級別的數(shù)據(jù)修復(fù)從Driver CheckPoint中需要把數(shù)據(jù)讀入阅茶,在其內(nèi)部會重新構(gòu)建SparkContext、StreamingContext炮障、SparkJob目派,再提交Spark集群運(yùn)行。Receiver的重新恢復(fù)時會通過磁盤的WAL從磁盤恢復(fù)過來胁赢。
SparkStreaming和Kafka結(jié)合不會出現(xiàn)WAL數(shù)據(jù)丟失的問題企蹭,SparkStreaming必須考慮外部流水線的方式處理。
怎么能完成完整的語義智末、事務(wù)的一致性谅摄,保證數(shù)據(jù)的零丟失,Exactly Once的事務(wù)處理:
怎么保證數(shù)據(jù)零丟失系馆?
必須要有可靠的數(shù)據(jù)來源和可靠的Receiver送漠、整個應(yīng)用程序的MetaData必須進(jìn)行CheckPoint、通過WAL來保證數(shù)據(jù)安全(生產(chǎn)環(huán)境下Receiver接收Kafka的數(shù)據(jù)由蘑,默認(rèn)情況下會在Executor中存在二份數(shù)據(jù)闽寡,且默認(rèn)情況下必須二份數(shù)據(jù)備份后才進(jìn)行計算代兵;如果Receiver接收數(shù)據(jù)時崩潰,沒有Copy副本爷狈,此時會重新從Kafka中進(jìn)行Copy植影,Copy的依據(jù)是zookeeper元數(shù)據(jù))。
大家可以將Kafka看作是一個簡單的文件存儲系統(tǒng)涎永,在Executor中Receiver確定受到Kafka的每一條記錄后進(jìn)行Replication到其他Executor成功后會通過ack向Kafka發(fā)送確認(rèn)收到的信息并繼續(xù)從Kafka中讀取下一條信息思币。-
Driver容錯如下圖所示:
-
再次思考數(shù)據(jù)在哪些地方可能丟失?
在Receiver收到數(shù)據(jù)且通過Driver的調(diào)度Executor開始計算數(shù)據(jù)的時候如果Driver突然崩潰羡微,則此時Executor會被Kill掉(Driver崩潰會導(dǎo)致Executor會被Kill掉)谷饿,那么Executor中的數(shù)據(jù)就會丟失,此時就必須通過例如WAL機(jī)制讓所有的數(shù)據(jù)通過例如HDFS的方式首先進(jìn)行安全性容錯處理妈倔,此時如果Executor中的數(shù)據(jù)丟失的話就可以通過WAL機(jī)制恢復(fù)回來博投。
數(shù)據(jù)的處理怎么保證有且僅有被處理一次?(重要)
數(shù)據(jù)零丟失并不能保證Exactly Once启涯,如果Receiver接收且保存起來后沒來得及更新updateOffsets時贬堵,就會導(dǎo)致數(shù)據(jù)被重復(fù)處理(重復(fù)消費)。
更詳細(xì)的說明數(shù)據(jù)重復(fù)讀取的場景:
在Receiver收到數(shù)據(jù)且保存到了HDFS等持久化引擎但是沒有來得及進(jìn)行updateOffsets结洼,此時Receiver崩潰后重新啟動后就會從管理Kafka的ZooKeeper中再次讀取元數(shù)據(jù)從而導(dǎo)致重復(fù)讀取元數(shù)據(jù)黎做;從SparkStreaming來看是成功的,但是Kafka認(rèn)為是失敗的(因為Receiver崩潰時沒有及時更新offsets到ZooKeeper中)重新恢復(fù)時會重新消費一次松忍,此時會導(dǎo)致數(shù)據(jù)重新消費的情況蒸殿。
性能補(bǔ)充:
通過WAL方式保證數(shù)據(jù)不丟失,但弊端是通過WAL方式會極大的損傷SparkStreaming中的Receiver接收數(shù)據(jù)的性能(現(xiàn)網(wǎng)生產(chǎn)環(huán)境通常會Kafka direct API直接處理)鸣峭。
需要注意到是:如果通過Kafka作為數(shù)據(jù)來源的話宏所,Kafka中有數(shù)據(jù),然后Receiver接受數(shù)據(jù)的時候又會有數(shù)據(jù)副本摊溶,這個時候其實是存儲資源的浪費爬骤。(重復(fù)讀取數(shù)據(jù)解決辦法,讀取數(shù)據(jù)時可以將元數(shù)據(jù)信息放入內(nèi)存數(shù)據(jù)庫中莫换,再次計算時檢查元數(shù)據(jù)是否被計算過)霞玄。Spark1.3的時候為了避免WAL的性能損失和實現(xiàn)Exactly Once而提供了Kafka direct API,把Kafka作為文件存儲系統(tǒng)@辍?谰纭!此時Kafka兼具有流的優(yōu)勢和文件系統(tǒng)的優(yōu)勢喊暖,至此惫企,Spark Streaming+Kafka就構(gòu)建了完美的流處理世界!A赀础狞尔!
數(shù)據(jù)不需要copy副本丛版,不需要WAL性能損耗,不需要Receiver偏序,而直接通過kafka direct api直接消費數(shù)據(jù)硼婿,所有的Executors通過kafka api直接消費數(shù)據(jù),直接管理offset禽车,所以也不會重復(fù)消費數(shù)據(jù);事務(wù)實現(xiàn)啦?场Q乘ぁ!
最后一個問題,關(guān)于Spark Streaming數(shù)據(jù)輸出多次重寫及解決方案:
為什么會有這個問題记焊,因為SparkStreaming在計算的時候基于SparkCore逸月,SparkCore天生會做以下事情導(dǎo)致SparkStreaming的結(jié)果(部分)重復(fù)輸出:
1.Task重試;
2.慢任務(wù)推測遍膜;
3.Stage重復(fù)碗硬;
4.Job重試;
會導(dǎo)致數(shù)據(jù)的丟失瓢颅。
對應(yīng)的解決方案:
1.一個任務(wù)失敗就是job 失敗恩尾,設(shè)置spark.task.maxFailures次數(shù)為1;
2.設(shè)置spark.speculation為關(guān)閉狀態(tài)(因為慢任務(wù)推測其實非常消耗性能挽懦,所以關(guān)閉后可以顯著的提高Spark Streaming處理性能)
3.Spark streaming on kafka的話翰意,假如job失敗后可以設(shè)置kafka的auto.offset.reset為largest的方式會自動恢復(fù)job的執(zhí)行。
最后再次強(qiáng)調(diào):
可以通過transform和foreachRDD基于業(yè)務(wù)邏輯代碼進(jìn)行邏輯控制來實現(xiàn)數(shù)據(jù)不重復(fù)消費和輸出不重復(fù)信柿!這二個方法類似于Spark Streaming的后門冀偶,可以做任意想象的控制操作!