在Kafka张抄、Storm孤页、Flink、Spark Streaming等分布式流處理系統(tǒng)中(沒錯奕纫,Kafka本質(zhì)上是流處理系統(tǒng)提陶,不是單純的“消息隊列”),存在三種消息傳遞語義(message delivery semantics)匹层,分別是:
- at least once:每條消息會被收到1次或多次隙笆。例如發(fā)送方S在超時時間內(nèi)沒有收到接收方R的通知(如ack),或者收到了R的報錯升筏,就會不斷重發(fā)消息直至R傳回ack撑柔。
- at most once:每條消息會被收到0次或1次。也就是說S只負責(zé)向R發(fā)送消息您访,R也沒有任何通知機制铅忿。無論R最終是否收到,S都不會重發(fā)洋只。
- exactly once:是上面兩個的綜合辆沦,保證S發(fā)送的每一條消息,R都會“不重不漏”地恰好收到1次识虚。它是最強最精確的語義肢扯,也最難實現(xiàn)。
在我們的日常工作中担锤,90%的流處理業(yè)務(wù)都是通過Kafka+Spark Streaming+HDFS來實現(xiàn)的(這里Kafka的作用是消息隊列了)蔚晨。本篇探討保證exactly once語義的方法。
如上面的圖所示肛循,一個Spark Streaming程序由三步組成:輸入铭腕、處理邏輯、輸出多糠。要達到exactly once的理想狀態(tài)累舷,需要三步協(xié)同進行,而不是只與處理邏輯有關(guān)夹孔。Kafka與Spark Streaming集成時有兩種方法:舊的基于receiver的方法被盈,新的基于direct stream的方法。下面兩張圖可以清楚地說明搭伤。
-
基于receiver的方法
基于receiver的方法采用Kafka的高級消費者API只怎,每個executor進程都不斷拉取消息,并同時保存在executor內(nèi)存與HDFS上的預(yù)寫日志(write-ahead log/WAL)怜俐。當(dāng)消息寫入WAL后身堡,自動更新ZooKeeper中的offset。
它可以保證at least once語義拍鲤,但無法保證exactly once語義贴谎。雖然引入了WAL來確保消息不會丟失汞扎,但還有可能會出現(xiàn)消息已經(jīng)寫入WAL,但offset更新失敗的情況擅这,Kafka就會按上一次的offset重新發(fā)送消息佩捞。這種方式還會造成數(shù)據(jù)冗余(Kafka broker中一份,Spark executor中一份)蕾哟,使吞吐量和內(nèi)存利用率降低。現(xiàn)在基本都使用下面基于direct stream的方法了莲蜘。 -
基于direct stream的方法
基于direct stream的方法采用Kafka的簡單消費者API谭确,它的流程大大簡化了。executor不再從Kafka中連續(xù)讀取消息票渠,也消除了receiver和WAL逐哈。還有一個改進就是Kafka分區(qū)與RDD分區(qū)是一一對應(yīng)的,更可控问顷。
driver進程只需要每次從Kafka獲得批次消息的offset range昂秃,然后executor進程根據(jù)offset range去讀取該批次對應(yīng)的消息即可。由于offset在Kafka中能唯一確定一條消息杜窄,且在外部只能被Streaming程序本身感知到肠骆,因此消除了不一致性,達到了exactly once塞耕。
不過蚀腿,由于它采用了簡單消費者API,我們就需要自己來管理offset扫外。否則一旦程序崩潰莉钙,整個流只能從earliest或者latest點恢復(fù),這肯定是不穩(wěn)妥的筛谚。offset管理在之前的文章中提到過磁玉,這里不再贅述。
Kafka作為輸入源可以保證exactly once驾讲,那么處理邏輯呢蚊伞?答案是顯然的,Spark Streaming的處理邏輯天生具備exactly once語義蝎毡。
Spark RDD之所以被稱為“彈性分布式數(shù)據(jù)集”厚柳,是因為它具有不可變、可分區(qū)沐兵、可并行計算别垮、容錯的特征。一個RDD只能由穩(wěn)定的數(shù)據(jù)集生成扎谎,或者從其他RDD轉(zhuǎn)換(transform)得來碳想。如果在執(zhí)行RDD lineage的過程中失敗烧董,那么只要源數(shù)據(jù)不發(fā)生變化,無論重新執(zhí)行多少次lineage胧奔,都一定會得到同樣的逊移、確定的結(jié)果。
最后龙填,我們還需要保證輸出過程也符合exactly once語義胳泉。Spark Streaming的輸出一般是靠foreachRDD()算子來實現(xiàn),它默認是at least once的岩遗。如果輸出過程中途出錯扇商,那么就會重復(fù)執(zhí)行直到寫入成功。為了讓它符合exactly once宿礁,可以施加兩種限制之一:冪等性寫入(idempotent write)案铺、事務(wù)性寫入(transactional write)。
-
冪等性寫入
冪等原來是數(shù)學(xué)里的概念梆靖,即f(f(x))=f(x)控汉。冪等寫入就是寫入多次與寫入一次的結(jié)果完全相同,可以自動將at least once轉(zhuǎn)化為exactly once返吻。這對于自帶主鍵或主鍵組的業(yè)務(wù)比較合適(比如各類日志姑子、MySQL binlog等),并且實現(xiàn)起來比較簡單测僵。
但是它要求處理邏輯是map-only的壁酬,也就是只能包含轉(zhuǎn)換、過濾等操作恨课,不能包含shuffle舆乔、聚合等操作。如果條件更嚴格剂公,就只能采用事務(wù)性寫入方法希俩。
stream.foreachRDD { rdd =>
rdd.foreachPartition { iter =>
// make sure connection pool is set up on the executor before writing
SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
iter.foreach { case (key, msg) =>
DB.autoCommit { implicit session =>
// the unique key for idempotency is just the text of the message itself, for example purposes
sql"insert into idem_data(msg) values (${msg})".update.apply
}
}
}
}
-
事務(wù)性寫入
這里的事務(wù)與DBMS中的事務(wù)含義基本相同,就是對數(shù)據(jù)進行一系列訪問與更新操作所組成的邏輯塊纲辽。為了符合事務(wù)的ACID特性(https://en.wikipedia.org/wiki/ACID_(computer_science))颜武,必須引入一個唯一ID標(biāo)識當(dāng)前的處理邏輯,并且將計算結(jié)果與該ID一起落盤拖吼。ID可以由主題鳞上、分區(qū)、時間吊档、offset等共同組成篙议。
事務(wù)操作可以在foreachRDD()時進行。如果數(shù)據(jù)寫入失敗,或者offset寫入與當(dāng)前offset range不匹配鬼贱,那么這一批次數(shù)據(jù)都將失敗并且回滾移怯。
// localTx is transactional, if metric update or offset update fails, neither will be committed
DB.localTx { implicit session =>
// store metric data
val metricRows = sql"""
update txn_data set metric = metric + ${metric}
where topic = ${osr.topic}
""".update.apply()
if (metricRows != 1) {
throw new Exception("...")
}
// store offsets
val offsetRows = sql"""
update txn_offsets set off = ${osr.untilOffset}
where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
""".update.apply()
if (offsetRows != 1) {
throw new Exception("...")
}
}