基于MapReduce的常駐Kafka Consumer程序的實(shí)現(xiàn)

這篇文章是對(duì)近期工作的一個(gè)總結(jié)脱货,雖然主要利用了一些開源系統(tǒng)和比較成熟的機(jī)制,但在業(yè)務(wù)實(shí)踐過(guò)程中還是遇到了一些坑。寫下來(lái)一是為了自我總結(jié)和梳理辛萍,另外也希望能夠給別人帶來(lái)一點(diǎn)點(diǎn)的啟發(fā)遇骑。

前言

KafkaConsumer一般作為實(shí)時(shí)數(shù)據(jù)流處理當(dāng)中的訂閱端卖毁,其主要工作是實(shí)時(shí)的從Kafka中訂閱數(shù)據(jù),進(jìn)行一些簡(jiǎn)單的ETL工作(不是必須)落萎,將數(shù)據(jù)存儲(chǔ)到其它存儲(chǔ)系統(tǒng)上(一般是HDFS亥啦、HBase、Mysql等)练链。除此之外翔脱,我們的KafkaConsumer還要求具有以下特性:

  1. 數(shù)據(jù)延遲是必須在秒級(jí)別
  2. 要保證數(shù)據(jù)的Exactly Once Semantics媒鼓,即不丟不重届吁。
  3. 需要盡量做到優(yōu)雅退出。

第一個(gè)問(wèn)題比較容易做到绿鸣,數(shù)據(jù)延遲在秒級(jí)疚沐,就要求我們的KafkaConsumer必須是一個(gè)7*24小時(shí)的常駐MapReduce程序,同時(shí)由于我們只需要處理訂閱和轉(zhuǎn)儲(chǔ)潮模,所以該MapReduce程序是一個(gè) mapper only 的亮蛔。所以后文我會(huì)重點(diǎn)介紹如何保證第二個(gè)特性,即數(shù)據(jù)的不丟不重擎厢。

如何保證數(shù)據(jù)的不丟

這里為了保證數(shù)據(jù)的不丟我們主要利用了Kafka本身的數(shù)據(jù)可回溯性和我們自己在Consumer程序中實(shí)現(xiàn)的checkpoint機(jī)制尔邓。

CheckPoint機(jī)制的實(shí)現(xiàn)

Kafka本身按照Topic劃分?jǐn)?shù)據(jù)流晾剖,我們的一個(gè)KafkaConsumer程序只會(huì)訂閱一個(gè)Topic的數(shù)據(jù)。同時(shí)Kafka的一個(gè)Topic下劃分為多個(gè)partition梯嗽,每個(gè)partition是一個(gè)實(shí)際的數(shù)據(jù)管道齿尽,我們的KafkaConsumer作為mapper only的,每個(gè)mapper會(huì)去實(shí)時(shí)的訂閱一個(gè)partition的數(shù)據(jù)灯节,所以mapper的個(gè)數(shù)是和partition的個(gè)數(shù)保持一樣的循头。如何實(shí)現(xiàn)一個(gè)Kafka的InputFormat,這塊網(wǎng)上已經(jīng)有很多文章炎疆,我就不具體介紹了卡骂。下面介紹如何進(jìn)行checkpoint

由于每個(gè)mapper對(duì)應(yīng)一個(gè)Kafka的partition形入,所以每個(gè)mapper需要單獨(dú)記錄自己的進(jìn)度全跨。我們?cè)趍ysql當(dāng)中創(chuàng)建這樣一張表:

CREATE TABLE `kafka_consumer_progress` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`kafka_partition_id` int(11) NOT NULL,
`current_offset` bigint(11) NOT NULL,
) ENGINE=InnoDB AUTO_INCREMENT=3419273 DEFAULT CHARSET=utf8mb4

其中id作為自增主鍵,kafka_partition_id記錄數(shù)據(jù)在kafka中對(duì)應(yīng)的partition亿遂,current_offset記錄最后寫入的數(shù)據(jù)在該partition下對(duì)應(yīng)的offset浓若。每個(gè)mapper在將一批(注意不是每條都寫,因?yàn)橥鵰ysql中記錄checkpoint也是一個(gè)很重的操作)數(shù)據(jù)寫入kudu成功之后蛇数,需要在mysql中更新對(duì)應(yīng)的進(jìn)度挪钓。同時(shí),mapper在失敗重啟的時(shí)候也都需要先從msyql當(dāng)中恢復(fù)進(jìn)度耳舅。代碼示例如下:

// 1. 重啟時(shí)需要先恢復(fù)進(jìn)度
currentOffset = metaClient.recoverOffset(partitionId);    
while(true) {
   //  2. 從kafka中訂閱數(shù)據(jù)
   Response response = consumer.fetch(currentOffset);
   // 3. 將數(shù)據(jù)寫入kudu
   foreach(Data data : response.getMessage()) {
        kuduWriter.write(data);
   }
   // 4. 更新進(jìn)度碌上,記錄checkpoint
   currentOffset = response.nextOffset();
   metaClient.updateProgress(partitionId, currentOffset);
}

如何保證數(shù)據(jù)的不重

進(jìn)行到這一步我們很容易發(fā)現(xiàn),即使在mapper剛剛寫入kudu浦徊,還沒來(lái)得及更新checkpoint時(shí)掛掉馏予,我們也能從上一個(gè)checkpoint恢復(fù)進(jìn)度,這樣保證了數(shù)據(jù)的at least once 語(yǔ)義盔性。但是如果該批次的數(shù)據(jù)恰好寫成功了而沒來(lái)得及更新checkpoint吗蚌,就會(huì)導(dǎo)致數(shù)據(jù)的重復(fù)寫入。如何保證數(shù)據(jù)的不重復(fù)纯出,這里我們根據(jù)具體的業(yè)務(wù)場(chǎng)景蚯妇,分為兩塊來(lái)看。

  1. 對(duì)于冪等的操作暂筝,采用主鍵去重箩言。
  2. 對(duì)于非冪等的操作,每條數(shù)據(jù)單獨(dú)記錄更新的offset焕襟。

冪等操作

針對(duì)我們的業(yè)務(wù)類型來(lái)說(shuō)陨收,數(shù)據(jù)分為Event和Profile兩種類型,其中Event類型的數(shù)據(jù)我們只支持插入這一種語(yǔ)義。所以Event類型的數(shù)據(jù)操作是冪等的务漩。對(duì)于冪等操作重復(fù)執(zhí)行不受影響拄衰,但是我們需要保證每條數(shù)據(jù)具有唯一ID。這里我們使用kafka的partition id + offset作為kudu表的主鍵饵骨,所有Event數(shù)據(jù)的寫入都相當(dāng)于對(duì)主鍵數(shù)據(jù)的replace翘悉。所以,Event數(shù)據(jù)的重復(fù)寫入相當(dāng)于在kudu層做了去重居触。

非冪等操作

Profile類型的數(shù)據(jù)妖混,是我們業(yè)務(wù)實(shí)體,每條數(shù)據(jù)具有自己的唯一ID轮洋。Profile類型的數(shù)據(jù)操作支持profile_set制市、profile_update、profile_increase弊予、profile_set_once等祥楣,所以Profile類型的操作屬于非冪等的。對(duì)于這種類型的數(shù)據(jù)我們?cè)趯?duì)應(yīng)的profile kudu表當(dāng)中增加了offset字段汉柒,記錄這條數(shù)據(jù)最后被更新的數(shù)據(jù)來(lái)源于kafka的哪一個(gè)offset误褪。這里可能會(huì)問(wèn),為什么不需要記錄partition id呢竭翠,這是因?yàn)槲覀兊腜rofile數(shù)據(jù)在導(dǎo)入Kafka的時(shí)候已經(jīng)是按照Profile的id進(jìn)行的hash取模振坚,所以相同id的Profile數(shù)據(jù)只會(huì)出現(xiàn)在相同的partition內(nèi)薇搁。
對(duì)Profile進(jìn)行更新時(shí)斋扰,我們會(huì)先從kudu當(dāng)中讀取是否已有對(duì)應(yīng)id的Profile數(shù)據(jù)存在,如果有的話啃洋,會(huì)比較kudu當(dāng)中Profile數(shù)據(jù)的offset和Kafka中的profile數(shù)據(jù)的offset传货,只有當(dāng)kudu當(dāng)中的offset小于Kafka中的offset的時(shí)候,才會(huì)對(duì)該條Profile數(shù)據(jù)進(jìn)行更新宏娄,從而完成了對(duì)非冪等的Profile操作的去重问裕。

如何做到常駐的MapReduce程序的優(yōu)雅退出。

這里有同學(xué)可能會(huì)問(wèn)兩個(gè)問(wèn)題:

  1. 你們的KafkaConsumer不是7*24小時(shí)常駐的嗎孵坚,為什么還需要退出粮宛?
  2. 上面兩步不是保證了數(shù)據(jù)的不丟不重嗎,為什么還需要優(yōu)雅退出卖宠?

對(duì)于問(wèn)題1巍杈,主要有兩種場(chǎng)景,一種是我們的程序在升級(jí)的時(shí)候扛伍,肯定是需要主動(dòng)退出并重啟的筷畦;二是由于各種異常(比如kudu掛掉等),我們需要讓KafkaConsumer退出刺洒,從而發(fā)出報(bào)警并進(jìn)行人工干預(yù)鳖宾。
對(duì)于問(wèn)題2吼砂,我們的場(chǎng)景是:需要在KafkaConsumer當(dāng)中對(duì)數(shù)據(jù)來(lái)源進(jìn)行一些統(tǒng)計(jì)工作,同時(shí)需要將統(tǒng)計(jì)信息記錄到mysql當(dāng)中鼎文,主要用處是當(dāng)數(shù)據(jù)異常時(shí)渔肩,可以進(jìn)行方便的debug工作。所以也只是做到盡量優(yōu)雅退出漂问,及時(shí)這部分?jǐn)?shù)據(jù)丟失了赖瞒,也不會(huì)影響我們程序的正常工作。

解決方案一 —— 捕獲kill信號(hào)

最開始我的想法是當(dāng)作業(yè)失敗蚤假,hadoop需要將mapper任務(wù)kill掉時(shí)栏饮,我們捕獲到相應(yīng)的信號(hào),然后進(jìn)行主動(dòng)退出磷仰。但是調(diào)研之后發(fā)現(xiàn)袍嬉,hadoop會(huì)先嘗試使用SIGTERM殺死m(xù)apper進(jìn)程,然后等待一段時(shí)間(默認(rèn)5000毫秒)灶平,如果進(jìn)程還沒有退出時(shí)伺通,會(huì)使用SIGTERM殺死進(jìn)程。我們雖然能夠捕獲到SIGTERM信號(hào)逢享,但是5000毫秒對(duì)于我們來(lái)說(shuō)往往來(lái)不及做剩下的clean up工作罐监。而這個(gè)超時(shí)配置又是yarn全局的,我們沒法為KafkaConsumer單獨(dú)修改瞒爬,所以這個(gè)方案被拋棄了弓柱。

解決方案二 —— 使用Zookeeper進(jìn)行同步

方案一走不通,我們必須使用自己的方法進(jìn)行mapper間的信息同步侧但。我們想到了Zookeeper(以下簡(jiǎn)稱ZK): 在KafkaConsumer啟動(dòng)時(shí)由本地進(jìn)程創(chuàng)建對(duì)應(yīng)的ZK節(jié)點(diǎn)矢空,同時(shí)每個(gè)mapper都會(huì)作為一個(gè)Watcher觀察這個(gè)節(jié)點(diǎn)。當(dāng)KafkaConsumer需要主動(dòng)退出時(shí)禀横,本地進(jìn)程會(huì)將對(duì)應(yīng)的ZK節(jié)點(diǎn)刪除屁药,同時(shí)所有mapper觀察到對(duì)應(yīng)的ZK節(jié)點(diǎn)變化之后,會(huì)進(jìn)行最后的clean up并優(yōu)雅退出柏锄。這樣就解決了我們?cè)谏?jí)的時(shí)候需要主動(dòng)退出KafkaConsumer的問(wèn)題酿箭。但是還有一個(gè)問(wèn)題也是我最開始沒有想到的,當(dāng)一個(gè)mapper執(zhí)行出現(xiàn)異常時(shí)趾娃,需要異常退出缭嫡,這時(shí)hadoop會(huì)標(biāo)記整個(gè)Job為失敗,并將其它正在運(yùn)行的mapper也kill掉茫舶。這就要求單個(gè)mapper異常時(shí)械巡,我們也需要使用ZK進(jìn)行優(yōu)雅退出。方案類似:當(dāng)mapper異常退出時(shí),會(huì)嘗試將對(duì)應(yīng)的ZK節(jié)點(diǎn)刪除讥耗。代碼示例如下:

private void doExit(Context context) {  
    // 進(jìn)行最后的clean up工作
    this.cleanup();  
    if (!Terminator.isShutdownBegan()) {
        logger.error("kafka consumer mapper run failed.");
        // 當(dāng)mapper因?yàn)楫惓M顺鰰r(shí),會(huì)將相應(yīng)的zk節(jié)點(diǎn)刪除,以通知其它mapper進(jìn)行安全退出    
        // 探測(cè)ZK節(jié)點(diǎn)是否存在
        boolean pathExists = true;
        try {
            pathExists = this.zookeeperClient.checkExists(zkNode);
            if (pathExists) {
                this.zookeeperClient.deletePath(this.zkNode);
                pathExists = false;
             }
          } catch (Exception e) {
              logger.error("delete path {} failed.", zkNode, e);
              try {
                  pathExists = this.zookeeperClient.checkExists(zkNode);
              } catch (Exception e1) {
                  logger.error("check zk path result failed.");
              }
          } 
          if (!pathExists) { 
              logger.info("kafka consumer mapper exit gracefully.");
          } else {
              logger.warn("kafka consumer mapper exit failed."); 
              System.exit(1);
          }
      } else {
          logger.info("mark shut down phase done.");    
          Terminator.markShutdownPhaseDone();
      }
}

通過(guò)方案二有勾,我們能夠解決90%的異常退出問(wèn)題,但還不能完全解決問(wèn)題古程。比如mapper爆內(nèi)存的時(shí)候蔼卡,hadoop還是會(huì)將mapper殺掉,這是我們還是無(wú)法進(jìn)行干預(yù)挣磨,所以這里的優(yōu)雅退出只能做到盡量準(zhǔn)雇逞,如果大家有什么好的方法,也可以告訴我茁裙。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末塘砸,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子晤锥,更是在濱河造成了極大的恐慌掉蔬,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件矾瘾,死亡現(xiàn)場(chǎng)離奇詭異女轿,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)壕翩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門蛉迹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人放妈,你說(shuō)我怎么就攤上這事北救。” “怎么了大猛?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵扭倾,是天一觀的道長(zhǎng)淀零。 經(jīng)常有香客問(wèn)我挽绩,道長(zhǎng),這世上最難降的妖魔是什么驾中? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任唉堪,我火速辦了婚禮,結(jié)果婚禮上肩民,老公的妹妹穿的比我還像新娘唠亚。我一直安慰自己,他們只是感情好持痰,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布灶搜。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪割卖。 梳的紋絲不亂的頭發(fā)上前酿,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天,我揣著相機(jī)與錄音鹏溯,去河邊找鬼罢维。 笑死,一個(gè)胖子當(dāng)著我的面吹牛丙挽,可吹牛的內(nèi)容都是我干的肺孵。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼颜阐,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼平窘!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起凳怨,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤初婆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后猿棉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體磅叛,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年萨赁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了弊琴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡杖爽,死狀恐怖敲董,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情慰安,我是刑警寧澤腋寨,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站化焕,受9級(jí)特大地震影響萄窜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜撒桨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一查刻、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧凤类,春花似錦穗泵、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)现诀。三九已至,卻和暖如春履肃,著一層夾襖步出監(jiān)牢的瞬間赶盔,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工榆浓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留于未,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓陡鹃,卻偏偏與公主長(zhǎng)得像烘浦,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子萍鲸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容