本期內(nèi)容:
1. Exactly once容錯
2. 數(shù)據(jù)輸出不重復
一. Spark Streaming中的事務場景 :
在Spark Streaming中事務是指能夠處理且只處理一次,能夠輸出且只輸出一次.
以銀行轉(zhuǎn)帳一次為例臊岸,A用戶轉(zhuǎn)賬給B用戶疮茄,事務的一致性就是保證A用戶能夠轉(zhuǎn)出且只能轉(zhuǎn)出一次,B用戶能夠收到且只能收到一次丰介。
二. Exactly once容錯:
事務處理中如何保證能夠處理且只能處理一次舵抹,數(shù)據(jù)能夠輸出且只能輸出一次肪虎。
數(shù)據(jù)丟失的主要場景如下:
在Receiver收到數(shù)據(jù)且通過Driver的調(diào)度,Executor開始計算數(shù)據(jù)的時候如果Driver突然奔潰(導致Executor會被Kill掉)惧蛹,此時Executor會被Kill掉扇救,那么Executor中的數(shù)據(jù)就會丟失。
1. 事務處理如下圖 :
事務處理過程解析 :
01. InputStream : 輸入數(shù)據(jù) 香嗓;
02. Executor : 通過Receiver接收數(shù)據(jù)迅腔,當接收到數(shù)據(jù)后向Driver 匯報 ;
03. Driver : 通過StreamingContext接收到數(shù)據(jù)會啟動Job進行操作 靠娱;
2. 解決事務源數(shù)據(jù)接收的安全性 :
事務處理解析 :
01. Executor : 在Receiver接收來自Kafka數(shù)據(jù)首先通過BlockManager寫入內(nèi)存+磁盤或者通過WAL來保證數(shù)據(jù)的安全性沧烈;
02. Executor : 通過Replication完成后產(chǎn)生Ack信號;
03. Kafka : 確定收信息并讀取下一條數(shù)據(jù)像云,Kafka才會進行updateOffsets操作 锌雀;
04. 通過WAL機制讓所有的數(shù)據(jù)通過類似HDFS的方式進行安全性容錯處理,從而解決Executor被Kill掉后導致數(shù)據(jù)丟失可以通過WAL機制恢復回來迅诬。
3. 解決Driver數(shù)據(jù)輸出的安全性 :
數(shù)據(jù)的處理怎么保證有且僅有被處理一次腋逆?
數(shù)據(jù)零丟失并不能保證Exactly Once,如果Receiver接收且保存起來后沒來得及更新updateOffsets時侈贷,就會導致數(shù)據(jù)被重復處理惩歉。
01. 通過StreamingContext接收數(shù)據(jù)通過CheckPoint進行容錯 ;
02. logging the updates : 通過記錄跟蹤所有生成RDD的轉(zhuǎn)換(transformations)也就是記錄每個RDD的lineage(血統(tǒng))來重新計算生成丟失的分區(qū)數(shù)據(jù) ;
4. Exactly Once的事務處理 :
01柬泽、 數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來源和可靠的Receiver慎菲,且整個應用程序的metadata必須進行checkpoint,且通過WAL來保證數(shù)據(jù)安全锨并;
02露该、Spark Streaming 1.3的時候為了避免WAL的性能損失和實現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲系統(tǒng)5谥蟆解幼!
03、此時兼具有流的優(yōu)勢和文件系統(tǒng)的優(yōu)勢包警,Spark Streaming+Kafka就構建了完美的流處理世界D彀凇!害晦!
04特铝、 數(shù)據(jù)不需要copy副本,不需要WAL性能損耗壹瘟,不需要Receiver鲫剿,所有的Executors直接通過kafka direct api直接消費數(shù)據(jù),直接管理Offset稻轨,所以也不會重復消費數(shù)據(jù)灵莲;
三. Spark Streaming數(shù)據(jù)輸出多次重寫及解決方案:
1、 為什么會有這個問題殴俱,因為SparkStreaming在計算的時候基于SparkCore政冻,SparkCore天生會做以下事情導致SparkStreaming的結(jié)果(部分)重復輸出:
1、Task重試线欲;
2明场、慢任務推測;
3询筏、Stage重復榕堰;
4、Job重試嫌套;
等會導致數(shù)據(jù)的丟失。
2圾旨、 對應的解決方案:
1踱讨、一個任務失敗就是job 失敗,設置spark.task.maxFailures次數(shù)為1砍的;
2痹筛、設置spark.speculation為關閉狀態(tài)(因為慢任務推測其實非常消耗性能,所以關閉后可以顯著的提高Spark Streaming處理性能)
3、Spark streaming on kafka的話帚稠,假如job失敗后可以設置kafka的auto.offset.reset為largest的方式會自動恢復job的執(zhí)行谣旁。
最后再次強調(diào):
可以通過transform和foreachRDD基于業(yè)務邏輯代碼進行邏輯控制來實現(xiàn)數(shù)據(jù)不重復消費和輸出不重復!這二個方法類似于spark s的后門滋早,可以做任意想象的控制操作榄审!
備注:
資料來源于:DT_大數(shù)據(jù)夢工廠
更多私密內(nèi)容,請關注微信公眾號:DT_Spark
新浪微博:http://www.weibo.com/ilovepains
如果您對大數(shù)據(jù)Spark感興趣杆麸,可以免費聽由王家林老師每天晚上20:00開設的Spark永久免費公開課搁进,地址YY房間號:68917580