這篇文章是對(duì)近期工作的一個(gè)總結(jié)脱货,雖然主要利用了一些開源系統(tǒng)和比較成熟的機(jī)制,但在業(yè)務(wù)實(shí)踐過(guò)程中還是遇到了一些坑。寫下來(lái)一是為了自我總結(jié)和梳理辛萍,另外也希望能夠給別人帶來(lái)一點(diǎn)點(diǎn)的啟發(fā)遇骑。
前言
KafkaConsumer一般作為實(shí)時(shí)數(shù)據(jù)流處理當(dāng)中的訂閱端卖毁,其主要工作是實(shí)時(shí)的從Kafka中訂閱數(shù)據(jù),進(jìn)行一些簡(jiǎn)單的ETL工作(不是必須)落萎,將數(shù)據(jù)存儲(chǔ)到其它存儲(chǔ)系統(tǒng)上(一般是HDFS亥啦、HBase、Mysql等)练链。除此之外翔脱,我們的KafkaConsumer還要求具有以下特性:
- 數(shù)據(jù)延遲是必須在秒級(jí)別。
- 要保證數(shù)據(jù)的
Exactly Once Semantics
媒鼓,即不丟不重届吁。 - 需要盡量做到優(yōu)雅退出。
第一個(gè)問(wèn)題比較容易做到绿鸣,數(shù)據(jù)延遲在秒級(jí)疚沐,就要求我們的KafkaConsumer必須是一個(gè)7*24小時(shí)的常駐MapReduce程序,同時(shí)由于我們只需要處理訂閱和轉(zhuǎn)儲(chǔ)潮模,所以該MapReduce程序是一個(gè) mapper only
的亮蛔。所以后文我會(huì)重點(diǎn)介紹如何保證第二個(gè)特性,即數(shù)據(jù)的不丟不重擎厢。
如何保證數(shù)據(jù)的不丟
這里為了保證數(shù)據(jù)的不丟我們主要利用了Kafka本身的數(shù)據(jù)可回溯性和我們自己在Consumer程序中實(shí)現(xiàn)的checkpoint
機(jī)制尔邓。
CheckPoint機(jī)制的實(shí)現(xiàn)
Kafka本身按照Topic劃分?jǐn)?shù)據(jù)流晾剖,我們的一個(gè)KafkaConsumer程序只會(huì)訂閱一個(gè)Topic的數(shù)據(jù)。同時(shí)Kafka的一個(gè)Topic下劃分為多個(gè)partition梯嗽,每個(gè)partition是一個(gè)實(shí)際的數(shù)據(jù)管道齿尽,我們的KafkaConsumer作為mapper only
的,每個(gè)mapper會(huì)去實(shí)時(shí)的訂閱一個(gè)partition的數(shù)據(jù)灯节,所以mapper的個(gè)數(shù)是和partition的個(gè)數(shù)保持一樣的循头。如何實(shí)現(xiàn)一個(gè)Kafka的InputFormat,這塊網(wǎng)上已經(jīng)有很多文章炎疆,我就不具體介紹了卡骂。下面介紹如何進(jìn)行checkpoint
。
由于每個(gè)mapper對(duì)應(yīng)一個(gè)Kafka的partition形入,所以每個(gè)mapper需要單獨(dú)記錄自己的進(jìn)度全跨。我們?cè)趍ysql當(dāng)中創(chuàng)建這樣一張表:
CREATE TABLE `kafka_consumer_progress` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`kafka_partition_id` int(11) NOT NULL,
`current_offset` bigint(11) NOT NULL,
) ENGINE=InnoDB AUTO_INCREMENT=3419273 DEFAULT CHARSET=utf8mb4
其中id作為自增主鍵,kafka_partition_id記錄數(shù)據(jù)在kafka中對(duì)應(yīng)的partition亿遂,current_offset記錄最后寫入的數(shù)據(jù)在該partition下對(duì)應(yīng)的offset浓若。每個(gè)mapper在將一批(注意不是每條都寫,因?yàn)橥鵰ysql中記錄checkpoint
也是一個(gè)很重的操作)數(shù)據(jù)寫入kudu成功之后蛇数,需要在mysql中更新對(duì)應(yīng)的進(jìn)度挪钓。同時(shí),mapper在失敗重啟的時(shí)候也都需要先從msyql當(dāng)中恢復(fù)進(jìn)度耳舅。代碼示例如下:
// 1. 重啟時(shí)需要先恢復(fù)進(jìn)度
currentOffset = metaClient.recoverOffset(partitionId);
while(true) {
// 2. 從kafka中訂閱數(shù)據(jù)
Response response = consumer.fetch(currentOffset);
// 3. 將數(shù)據(jù)寫入kudu
foreach(Data data : response.getMessage()) {
kuduWriter.write(data);
}
// 4. 更新進(jìn)度碌上,記錄checkpoint
currentOffset = response.nextOffset();
metaClient.updateProgress(partitionId, currentOffset);
}
如何保證數(shù)據(jù)的不重
進(jìn)行到這一步我們很容易發(fā)現(xiàn),即使在mapper剛剛寫入kudu浦徊,還沒來(lái)得及更新checkpoint
時(shí)掛掉馏予,我們也能從上一個(gè)checkpoint
恢復(fù)進(jìn)度,這樣保證了數(shù)據(jù)的at least once
語(yǔ)義盔性。但是如果該批次的數(shù)據(jù)恰好寫成功了而沒來(lái)得及更新checkpoint
吗蚌,就會(huì)導(dǎo)致數(shù)據(jù)的重復(fù)寫入。如何保證數(shù)據(jù)的不重復(fù)纯出,這里我們根據(jù)具體的業(yè)務(wù)場(chǎng)景蚯妇,分為兩塊來(lái)看。
- 對(duì)于冪等的操作暂筝,采用主鍵去重箩言。
- 對(duì)于非冪等的操作,每條數(shù)據(jù)單獨(dú)記錄更新的offset焕襟。
冪等操作
針對(duì)我們的業(yè)務(wù)類型來(lái)說(shuō)陨收,數(shù)據(jù)分為Event和Profile兩種類型,其中Event類型的數(shù)據(jù)我們只支持插入這一種語(yǔ)義。所以Event類型的數(shù)據(jù)操作是冪等的务漩。對(duì)于冪等操作重復(fù)執(zhí)行不受影響拄衰,但是我們需要保證每條數(shù)據(jù)具有唯一ID。這里我們使用kafka的partition id + offset作為kudu表的主鍵饵骨,所有Event數(shù)據(jù)的寫入都相當(dāng)于對(duì)主鍵數(shù)據(jù)的replace翘悉。所以,Event數(shù)據(jù)的重復(fù)寫入相當(dāng)于在kudu層做了去重居触。
非冪等操作
Profile類型的數(shù)據(jù)妖混,是我們業(yè)務(wù)實(shí)體,每條數(shù)據(jù)具有自己的唯一ID轮洋。Profile類型的數(shù)據(jù)操作支持profile_set制市、profile_update、profile_increase弊予、profile_set_once等祥楣,所以Profile類型的操作屬于非冪等的。對(duì)于這種類型的數(shù)據(jù)我們?cè)趯?duì)應(yīng)的profile kudu表當(dāng)中增加了offset字段汉柒,記錄這條數(shù)據(jù)最后被更新的數(shù)據(jù)來(lái)源于kafka的哪一個(gè)offset误褪。這里可能會(huì)問(wèn),為什么不需要記錄partition id呢竭翠,這是因?yàn)槲覀兊腜rofile數(shù)據(jù)在導(dǎo)入Kafka的時(shí)候已經(jīng)是按照Profile的id進(jìn)行的hash取模振坚,所以相同id的Profile數(shù)據(jù)只會(huì)出現(xiàn)在相同的partition內(nèi)薇搁。
對(duì)Profile進(jìn)行更新時(shí)斋扰,我們會(huì)先從kudu當(dāng)中讀取是否已有對(duì)應(yīng)id的Profile數(shù)據(jù)存在,如果有的話啃洋,會(huì)比較kudu當(dāng)中Profile數(shù)據(jù)的offset和Kafka中的profile數(shù)據(jù)的offset传货,只有當(dāng)kudu當(dāng)中的offset小于Kafka中的offset的時(shí)候,才會(huì)對(duì)該條Profile數(shù)據(jù)進(jìn)行更新宏娄,從而完成了對(duì)非冪等的Profile操作的去重问裕。
如何做到常駐的MapReduce程序的優(yōu)雅退出。
這里有同學(xué)可能會(huì)問(wèn)兩個(gè)問(wèn)題:
- 你們的KafkaConsumer不是7*24小時(shí)常駐的嗎孵坚,為什么還需要退出粮宛?
- 上面兩步不是保證了數(shù)據(jù)的不丟不重嗎,為什么還需要優(yōu)雅退出卖宠?
對(duì)于問(wèn)題1巍杈,主要有兩種場(chǎng)景,一種是我們的程序在升級(jí)的時(shí)候扛伍,肯定是需要主動(dòng)退出并重啟的筷畦;二是由于各種異常(比如kudu掛掉等),我們需要讓KafkaConsumer退出刺洒,從而發(fā)出報(bào)警并進(jìn)行人工干預(yù)鳖宾。
對(duì)于問(wèn)題2吼砂,我們的場(chǎng)景是:需要在KafkaConsumer當(dāng)中對(duì)數(shù)據(jù)來(lái)源進(jìn)行一些統(tǒng)計(jì)工作,同時(shí)需要將統(tǒng)計(jì)信息記錄到mysql當(dāng)中鼎文,主要用處是當(dāng)數(shù)據(jù)異常時(shí)渔肩,可以進(jìn)行方便的debug工作。所以也只是做到盡量優(yōu)雅退出漂问,及時(shí)這部分?jǐn)?shù)據(jù)丟失了赖瞒,也不會(huì)影響我們程序的正常工作。
解決方案一 —— 捕獲kill信號(hào)
最開始我的想法是當(dāng)作業(yè)失敗蚤假,hadoop需要將mapper任務(wù)kill掉時(shí)栏饮,我們捕獲到相應(yīng)的信號(hào),然后進(jìn)行主動(dòng)退出磷仰。但是調(diào)研之后發(fā)現(xiàn)袍嬉,hadoop會(huì)先嘗試使用SIGTERM
殺死m(xù)apper進(jìn)程,然后等待一段時(shí)間(默認(rèn)5000毫秒)灶平,如果進(jìn)程還沒有退出時(shí)伺通,會(huì)使用SIGTERM
殺死進(jìn)程。我們雖然能夠捕獲到SIGTERM
信號(hào)逢享,但是5000毫秒對(duì)于我們來(lái)說(shuō)往往來(lái)不及做剩下的clean up工作罐监。而這個(gè)超時(shí)配置又是yarn全局的,我們沒法為KafkaConsumer單獨(dú)修改瞒爬,所以這個(gè)方案被拋棄了弓柱。
解決方案二 —— 使用Zookeeper進(jìn)行同步
方案一走不通,我們必須使用自己的方法進(jìn)行mapper間的信息同步侧但。我們想到了Zookeeper(以下簡(jiǎn)稱ZK): 在KafkaConsumer啟動(dòng)時(shí)由本地進(jìn)程創(chuàng)建對(duì)應(yīng)的ZK節(jié)點(diǎn)矢空,同時(shí)每個(gè)mapper都會(huì)作為一個(gè)Watcher觀察這個(gè)節(jié)點(diǎn)。當(dāng)KafkaConsumer需要主動(dòng)退出時(shí)禀横,本地進(jìn)程會(huì)將對(duì)應(yīng)的ZK節(jié)點(diǎn)刪除屁药,同時(shí)所有mapper觀察到對(duì)應(yīng)的ZK節(jié)點(diǎn)變化之后,會(huì)進(jìn)行最后的clean up并優(yōu)雅退出柏锄。這樣就解決了我們?cè)谏?jí)的時(shí)候需要主動(dòng)退出KafkaConsumer的問(wèn)題酿箭。但是還有一個(gè)問(wèn)題也是我最開始沒有想到的,當(dāng)一個(gè)mapper執(zhí)行出現(xiàn)異常時(shí)趾娃,需要異常退出缭嫡,這時(shí)hadoop會(huì)標(biāo)記整個(gè)Job為失敗,并將其它正在運(yùn)行的mapper也kill掉茫舶。這就要求單個(gè)mapper異常時(shí)械巡,我們也需要使用ZK進(jìn)行優(yōu)雅退出。方案類似:當(dāng)mapper異常退出時(shí),會(huì)嘗試將對(duì)應(yīng)的ZK節(jié)點(diǎn)刪除讥耗。代碼示例如下:
private void doExit(Context context) {
// 進(jìn)行最后的clean up工作
this.cleanup();
if (!Terminator.isShutdownBegan()) {
logger.error("kafka consumer mapper run failed.");
// 當(dāng)mapper因?yàn)楫惓M顺鰰r(shí),會(huì)將相應(yīng)的zk節(jié)點(diǎn)刪除,以通知其它mapper進(jìn)行安全退出
// 探測(cè)ZK節(jié)點(diǎn)是否存在
boolean pathExists = true;
try {
pathExists = this.zookeeperClient.checkExists(zkNode);
if (pathExists) {
this.zookeeperClient.deletePath(this.zkNode);
pathExists = false;
}
} catch (Exception e) {
logger.error("delete path {} failed.", zkNode, e);
try {
pathExists = this.zookeeperClient.checkExists(zkNode);
} catch (Exception e1) {
logger.error("check zk path result failed.");
}
}
if (!pathExists) {
logger.info("kafka consumer mapper exit gracefully.");
} else {
logger.warn("kafka consumer mapper exit failed.");
System.exit(1);
}
} else {
logger.info("mark shut down phase done.");
Terminator.markShutdownPhaseDone();
}
}
通過(guò)方案二有勾,我們能夠解決90%的異常退出問(wèn)題,但還不能完全解決問(wèn)題古程。比如mapper爆內(nèi)存的時(shí)候蔼卡,hadoop還是會(huì)將mapper殺掉,這是我們還是無(wú)法進(jìn)行干預(yù)挣磨,所以這里的優(yōu)雅退出只能做到盡量準(zhǔn)雇逞,如果大家有什么好的方法,也可以告訴我茁裙。