Kafka+Spark Streaming如何保證exactly once語義

在Kafka张抄、Storm孤页、Flink、Spark Streaming等分布式流處理系統(tǒng)中(沒錯奕纫,Kafka本質(zhì)上是流處理系統(tǒng)提陶,不是單純的“消息隊列”),存在三種消息傳遞語義(message delivery semantics)匹层,分別是:

  • at least once:每條消息會被收到1次或多次隙笆。例如發(fā)送方S在超時時間內(nèi)沒有收到接收方R的通知(如ack),或者收到了R的報錯升筏,就會不斷重發(fā)消息直至R傳回ack撑柔。
  • at most once:每條消息會被收到0次或1次。也就是說S只負責(zé)向R發(fā)送消息您访,R也沒有任何通知機制铅忿。無論R最終是否收到,S都不會重發(fā)洋只。
  • exactly once:是上面兩個的綜合辆沦,保證S發(fā)送的每一條消息,R都會“不重不漏”地恰好收到1次识虚。它是最強最精確的語義肢扯,也最難實現(xiàn)。

在我們的日常工作中担锤,90%的流處理業(yè)務(wù)都是通過Kafka+Spark Streaming+HDFS來實現(xiàn)的(這里Kafka的作用是消息隊列了)蔚晨。本篇探討保證exactly once語義的方法。


如上面的圖所示肛循,一個Spark Streaming程序由三步組成:輸入铭腕、處理邏輯、輸出多糠。要達到exactly once的理想狀態(tài)累舷,需要三步協(xié)同進行,而不是只與處理邏輯有關(guān)夹孔。Kafka與Spark Streaming集成時有兩種方法:舊的基于receiver的方法被盈,新的基于direct stream的方法。下面兩張圖可以清楚地說明搭伤。

  • 基于receiver的方法


    基于receiver的方法采用Kafka的高級消費者API只怎,每個executor進程都不斷拉取消息,并同時保存在executor內(nèi)存與HDFS上的預(yù)寫日志(write-ahead log/WAL)怜俐。當(dāng)消息寫入WAL后身堡,自動更新ZooKeeper中的offset。
    它可以保證at least once語義拍鲤,但無法保證exactly once語義贴谎。雖然引入了WAL來確保消息不會丟失汞扎,但還有可能會出現(xiàn)消息已經(jīng)寫入WAL,但offset更新失敗的情況擅这,Kafka就會按上一次的offset重新發(fā)送消息佩捞。這種方式還會造成數(shù)據(jù)冗余(Kafka broker中一份,Spark executor中一份)蕾哟,使吞吐量和內(nèi)存利用率降低。現(xiàn)在基本都使用下面基于direct stream的方法了莲蜘。

  • 基于direct stream的方法


    基于direct stream的方法采用Kafka的簡單消費者API谭确,它的流程大大簡化了。executor不再從Kafka中連續(xù)讀取消息票渠,也消除了receiver和WAL逐哈。還有一個改進就是Kafka分區(qū)與RDD分區(qū)是一一對應(yīng)的,更可控问顷。
    driver進程只需要每次從Kafka獲得批次消息的offset range昂秃,然后executor進程根據(jù)offset range去讀取該批次對應(yīng)的消息即可。由于offset在Kafka中能唯一確定一條消息杜窄,且在外部只能被Streaming程序本身感知到肠骆,因此消除了不一致性,達到了exactly once塞耕。
    不過蚀腿,由于它采用了簡單消費者API,我們就需要自己來管理offset扫外。否則一旦程序崩潰莉钙,整個流只能從earliest或者latest點恢復(fù),這肯定是不穩(wěn)妥的筛谚。offset管理在之前的文章中提到過磁玉,這里不再贅述。

Kafka作為輸入源可以保證exactly once驾讲,那么處理邏輯呢蚊伞?答案是顯然的,Spark Streaming的處理邏輯天生具備exactly once語義蝎毡。
Spark RDD之所以被稱為“彈性分布式數(shù)據(jù)集”厚柳,是因為它具有不可變、可分區(qū)沐兵、可并行計算别垮、容錯的特征。一個RDD只能由穩(wěn)定的數(shù)據(jù)集生成扎谎,或者從其他RDD轉(zhuǎn)換(transform)得來碳想。如果在執(zhí)行RDD lineage的過程中失敗烧董,那么只要源數(shù)據(jù)不發(fā)生變化,無論重新執(zhí)行多少次lineage胧奔,都一定會得到同樣的逊移、確定的結(jié)果。

最后龙填,我們還需要保證輸出過程也符合exactly once語義胳泉。Spark Streaming的輸出一般是靠foreachRDD()算子來實現(xiàn),它默認是at least once的岩遗。如果輸出過程中途出錯扇商,那么就會重復(fù)執(zhí)行直到寫入成功。為了讓它符合exactly once宿礁,可以施加兩種限制之一:冪等性寫入(idempotent write)案铺、事務(wù)性寫入(transactional write)。

  • 冪等性寫入
    冪等原來是數(shù)學(xué)里的概念梆靖,即f(f(x))=f(x)控汉。冪等寫入就是寫入多次與寫入一次的結(jié)果完全相同,可以自動將at least once轉(zhuǎn)化為exactly once返吻。這對于自帶主鍵或主鍵組的業(yè)務(wù)比較合適(比如各類日志姑子、MySQL binlog等),并且實現(xiàn)起來比較簡單测僵。
    但是它要求處理邏輯是map-only的壁酬,也就是只能包含轉(zhuǎn)換、過濾等操作恨课,不能包含shuffle舆乔、聚合等操作。如果條件更嚴格剂公,就只能采用事務(wù)性寫入方法希俩。
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // make sure connection pool is set up on the executor before writing
        SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)

        iter.foreach { case (key, msg) =>
          DB.autoCommit { implicit session =>
            // the unique key for idempotency is just the text of the message itself, for example purposes
            sql"insert into idem_data(msg) values (${msg})".update.apply
          }
        }
      }
    }
  • 事務(wù)性寫入
    這里的事務(wù)與DBMS中的事務(wù)含義基本相同,就是對數(shù)據(jù)進行一系列訪問與更新操作所組成的邏輯塊纲辽。為了符合事務(wù)的ACID特性https://en.wikipedia.org/wiki/ACID_(computer_science))颜武,必須引入一個唯一ID標(biāo)識當(dāng)前的處理邏輯,并且將計算結(jié)果與該ID一起落盤拖吼。ID可以由主題鳞上、分區(qū)、時間吊档、offset等共同組成篙议。
    事務(wù)操作可以在foreachRDD()時進行。如果數(shù)據(jù)寫入失敗,或者offset寫入與當(dāng)前offset range不匹配鬼贱,那么這一批次數(shù)據(jù)都將失敗并且回滾移怯。
// localTx is transactional, if metric update or offset update fails, neither will be committed
    DB.localTx { implicit session =>
      // store metric data
      val metricRows = sql"""
    update txn_data set metric = metric + ${metric}
      where topic = ${osr.topic}
    """.update.apply()
      if (metricRows != 1) {
        throw new Exception("...")
      }

      // store offsets
      val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
      where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
    """.update.apply()
      if (offsetRows != 1) {
        throw new Exception("...")
      }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市这难,隨后出現(xiàn)的幾起案子舟误,更是在濱河造成了極大的恐慌,老刑警劉巖姻乓,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嵌溢,死亡現(xiàn)場離奇詭異,居然都是意外死亡蹋岩,警方通過查閱死者的電腦和手機堵腹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來星澳,“玉大人,你說我怎么就攤上這事旱易〗耍” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵阀坏,是天一觀的道長如暖。 經(jīng)常有香客問我,道長忌堂,這世上最難降的妖魔是什么盒至? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮士修,結(jié)果婚禮上枷遂,老公的妹妹穿的比我還像新娘。我一直安慰自己棋嘲,他們只是感情好酒唉,可當(dāng)我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著沸移,像睡著了一般痪伦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上雹锣,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天网沾,我揣著相機與錄音,去河邊找鬼蕊爵。 笑死辉哥,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的攒射。 我是一名探鬼主播证薇,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼度苔,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了浑度?” 一聲冷哼從身側(cè)響起寇窑,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎箩张,沒想到半個月后甩骏,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡先慷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年饮笛,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片论熙。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡福青,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出脓诡,到底是詐尸還是另有隱情无午,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布祝谚,位于F島的核電站宪迟,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏交惯。R本人自食惡果不足惜次泽,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望席爽。 院中可真熱鬧意荤,春花似錦、人聲如沸只锻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽炬藤。三九已至御铃,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間沈矿,已是汗流浹背上真。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留羹膳,地道東北人睡互。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親就珠。 傳聞我的和親對象是個殘疾皇子寇壳,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內(nèi)容