1、Kafka如何防止數(shù)據(jù)丟失
1)消費(fèi)端弄丟數(shù)據(jù)
? 消費(fèi)者在消費(fèi)完消息之后需要執(zhí)行消費(fèi)位移的提交,該消費(fèi)位移表示下一條需要拉取的消息的位置安拟。Kafka默認(rèn)位移提交方式是自動(dòng)提交馆截,但它不是在你每消費(fèi)一次數(shù)據(jù)之后就提交一次位移,而是每隔5秒將拉取到的每個(gè)分區(qū)中的最大的消費(fèi)位移進(jìn)行提交硼砰。自動(dòng)位移提交在正常情況下不會(huì)發(fā)生消息丟失或重復(fù)消費(fèi)的現(xiàn)象且蓬,唯一可能的情況,你拉取到消息后题翰,消費(fèi)者那邊剛好進(jìn)行了位移提交恶阴,Kafka那邊以為你已經(jīng)消費(fèi)了這條消息诈胜,其實(shí)你剛開始準(zhǔn)備對(duì)這條消息進(jìn)行業(yè)務(wù)處理,但你還沒處理完冯事,然后因?yàn)槟承┰蚪剐伲约簰斓袅耍?dāng)你服務(wù)恢復(fù)后再去消費(fèi)昵仅,那就是消費(fèi)下一條消息了缓熟,那么這條未處理的消息就相當(dāng)于丟失了。所以摔笤,很多時(shí)候并不是說拉取到消息就算消費(fèi)完成够滑,而是將消息寫入數(shù)據(jù)庫或緩存中,或者是更加復(fù)雜的業(yè)務(wù)處理吕世,在這些情況下彰触,所有的業(yè)務(wù)處理完成才能認(rèn)為消息被成功消費(fèi)。Kafka也提供了對(duì)位移提交進(jìn)行手動(dòng)提交的方式命辖,開啟手動(dòng)提交的前提是消費(fèi)者客戶端參數(shù)enable.auto.commit配置為false况毅,
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
? 消費(fèi)者端手動(dòng)提交方式提供了兩種,commitSync()同步提交方式和commitAsync()異步提交方式吮龄。commitSync()同步提交方式在調(diào)用時(shí)Consumer程序會(huì)處于阻塞狀態(tài)俭茧,直到遠(yuǎn)端的broker返回提交結(jié)果,這個(gè)狀態(tài)才會(huì)結(jié)束漓帚,這樣會(huì)對(duì)消費(fèi)者的性能有一定的影響母债。commitAsync()異步提交方式在執(zhí)行后會(huì)立刻返回,不會(huì)被阻塞尝抖,但是它也有相應(yīng)的問題產(chǎn)生毡们,如果異步提交失敗后,它雖然也有重試昧辽,但是重試提交的位移值可能早已經(jīng)“過期”或者不是最新的值了衙熔,因此異步提交的重試其實(shí)沒有意義。這里我們可以把同步提交和異步提交相結(jié)合搅荞,以達(dá)到最理想的效果红氯。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// 處理消息 record
}
consumer.commitAsync();
}
} catch (Exception e){
// 處理異常
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
2)Kafka端弄丟數(shù)據(jù)
? 如下圖,副本A為leader副本咕痛,副本B為follower副本痢甘,它們的HW和LEO都為4。
?? 此時(shí)茉贡,A中寫入一條消息塞栅,它的LEO更新為5,B從A中同步了這條數(shù)據(jù)腔丧,自己的LEO也更新為5
? 之后B再向A發(fā)起請(qǐng)求以拉取數(shù)據(jù)放椰,該FetchRequest請(qǐng)求中帶上了B中的LEO信息作烟,A在收到該請(qǐng)求后根據(jù)B的LEO值更新了自己的HW為5,A中雖然沒有更多的消息砾医,但還是在延時(shí)一段時(shí)間之后返回FetchRresponse拿撩,其中也包含了HW信息,最后B根據(jù)返回的HW信息更新自己的HW為5如蚜。
? 可以看到整個(gè)過程中兩者之間的HW同步有一個(gè)間隙绷雏,B在同步A中的消息之后需要再一輪的FetchRequest/FetchResponse才能更新自身的HW為5。如果在更新HW之前怖亭,B宕機(jī)了涎显,那么B在重啟之后會(huì)根據(jù)之前HW位置進(jìn)行日志截?cái)啵@樣便會(huì)將4這條消息截?cái)嘈诵桑缓笤傧駻發(fā)送請(qǐng)求拉取消息期吓。此時(shí)若A再宕機(jī),那么B就會(huì)被選舉為新的leader倾芝。B恢復(fù)之后會(huì)成為follower讨勤,由于follower副本的HW不能比leader副本的HW高,所以還會(huì)做一次日志截?cái)喑苛恚源藢W調(diào)整為4潭千。這樣一來4這條數(shù)據(jù)就丟失了(就算A不能恢復(fù),這條數(shù)據(jù)也同樣丟失了)借尿。
? 對(duì)于這種情況刨晴,一般要求起碼設(shè)置如下4個(gè)參數(shù):
1)給這個(gè)topic設(shè)置replication.factor參數(shù):這個(gè)值必須大于1,要求每個(gè)partition必須有至少2個(gè)副本
2)在kafka服務(wù)端設(shè)置min.insync.replicas參數(shù):這個(gè)值必須大于1路翻,這個(gè)是要求一個(gè)leader至少感知到有至少一個(gè)follower還跟自己保持聯(lián)系狈癞,沒掉隊(duì),這樣才能確保leader掛了還有一個(gè)follower
3)在producer端設(shè)置acks=all或-1:這個(gè)是要求每條數(shù)據(jù)茂契,必須是寫入所有replica之后蝶桶,才能認(rèn)為是寫成功了
4)在producer端設(shè)置retries為很大的一個(gè)值:這個(gè)是要求一旦寫入失敗,就無限重試掉冶,它默認(rèn)為0真竖,即在發(fā)生異常之后不進(jìn)行任何重試。
? 當(dāng)然厌小,設(shè)置了acks等于all或-1之后恢共,會(huì)影響一定的性能。Kafka從0.11.0.0(我們公司現(xiàn)在用的版本為0.10.0.0)開始引入了leader epoch的概念召锈,在需要截?cái)鄶?shù)據(jù)的時(shí)候使用leader epoch作為參考依據(jù)而不是原本的HW旁振。leader epoch代表leader的紀(jì)元信息获询,初始值為0涨岁,每當(dāng)leader變更一次拐袜,leader epoch的值就會(huì)加1,相當(dāng)于為leader增設(shè)了一個(gè)版本號(hào)梢薪。引入leader epoch很好的解決了前面所說的數(shù)據(jù)丟失問題蹬铺,也就不需要去設(shè)置acks=all了。
3)生產(chǎn)者端不會(huì)丟失數(shù)據(jù)
? 如果你配置了上面場(chǎng)景的參數(shù)秉撇,就是當(dāng)數(shù)據(jù)寫入leader副本和所有follower副本成功后才返回響應(yīng)給生產(chǎn)者甜攀,如果寫入不成功,生產(chǎn)者會(huì)不斷重試琐馆。
2规阀、Kafka 怎么防止重復(fù)消費(fèi)
? 消費(fèi)者的自動(dòng)位移提交方式會(huì)帶來重復(fù)消費(fèi)的問題。假設(shè)剛剛提交完一次消費(fèi)位移瘦麸,然后拉取一批消息進(jìn)行消費(fèi)谁撼,在下一次自動(dòng)位移提交之前,消費(fèi)者崩了滋饲,那么等消費(fèi)者恢復(fù)再來消費(fèi)消息的時(shí)候又得從上一次位移提交的地方重新開始厉碟,這樣便發(fā)生了重復(fù)消費(fèi)的現(xiàn)象。
? 其實(shí)這里可以類似上面消費(fèi)端丟失數(shù)據(jù)的情況屠缭,很多時(shí)候并不是說拉取到消息就算消費(fèi)完成箍鼓,而是將消息寫入數(shù)據(jù)庫或緩存中,或者是更加復(fù)雜的業(yè)務(wù)處理呵曹,重復(fù)消費(fèi)也同樣如此款咖,重復(fù)消費(fèi)不可怕,可怕的是你沒考慮到重復(fù)消費(fèi)之后奄喂,怎么保證冪等性之剧,通俗點(diǎn)說,就一個(gè)數(shù)據(jù)砍聊,或者一個(gè)請(qǐng)求背稼,給你重復(fù)來多次,你得確保對(duì)應(yīng)的數(shù)據(jù)是不會(huì)改變的玻蝌,不能出錯(cuò)蟹肘。這里防止重復(fù)消費(fèi),你可以像上面一樣把自動(dòng)提交改為手動(dòng)提交俯树,或者是保證消息消費(fèi)的冪等性帘腹。
保證消費(fèi)消息冪等性
1)如果你是要插入mysql中,可以對(duì)其設(shè)置唯一鍵许饿,插入重復(fù)的數(shù)據(jù)只會(huì)插入報(bào)錯(cuò)阳欲,不會(huì)有重復(fù)數(shù)據(jù)產(chǎn)生
2)如果你是要寫入redis中,每次都是set操作,可以保證冪等性
? 如何保證消息消費(fèi)是冪等性的球化,需要結(jié)合具體的業(yè)務(wù)來看秽晚。
3、Kafka為什么這么快筒愚?
1)消息壓縮
? Kafka在對(duì)消息進(jìn)行壓縮赴蝇,Producer 端壓縮、Broker 端保持巢掺、Consum進(jìn)行解壓縮句伶。它秉承了用時(shí)間去換空間的思想,具體來說就是用CPU時(shí)間去換磁盤空間或網(wǎng)絡(luò)I/O傳輸量陆淀,希望以較小的CPU開銷帶來更少的磁盤占用或更少的網(wǎng)絡(luò)I/O傳輸考余。Kafka支持多種壓縮算法,如GZIP轧苫、Snappy 和 LZ4秃殉。
2)數(shù)據(jù)讀寫
? Kafka會(huì)把收到的消息都寫入到磁盤中,它絕對(duì)不會(huì)丟失數(shù)據(jù)浸剩。因?yàn)榇疟P是機(jī)械結(jié)構(gòu)钾军,每次讀寫都會(huì)尋址->寫入,其中尋址是一個(gè)“機(jī)械動(dòng)作”绢要,它是最耗時(shí)的吏恭。所以磁盤最討厭隨機(jī)I/O,最喜歡順序I/O重罪。為了提高讀寫硬盤的速度樱哼,Kafka就是使用順序I/O。
? 如上圖剿配,每個(gè)partition在存儲(chǔ)層面可以看作一個(gè)可追加的日志文件搅幅,收到消息后Kafka會(huì)把數(shù)據(jù)順序?qū)懭胛募┪病?/p>
? 即便是順序?qū)懭氪疟P,磁盤的訪問速度還是不可能追上內(nèi)存呼胚。所以Kafka的數(shù)據(jù)并不是實(shí)時(shí)的寫入磁盤茄唐,它充分利用了現(xiàn)代操作系統(tǒng)的頁緩存,就是把磁盤中的數(shù)據(jù)緩存到內(nèi)存中蝇更,把對(duì)磁盤的訪問變?yōu)閷?duì)內(nèi)存的訪問沪编,來利用內(nèi)存提高I/O效率。
? 除了消息順序追加年扩、頁緩存等技術(shù)蚁廓,Kafka還使用了零拷貝(Zero-Copy)技術(shù)來進(jìn)一步提升性能。所謂的零拷貝是指將數(shù)據(jù)直接從磁盤文件復(fù)制到網(wǎng)卡設(shè)備中厨幻,而不需要經(jīng)由應(yīng)用程序之手相嵌,這樣大大提高了應(yīng)用程序的性能腿时,減少了內(nèi)核和用戶模式之間的上下文切換。
4饭宾、消息隊(duì)列時(shí)間開銷最大的在哪兒批糟?
? 根據(jù)上面對(duì)Kafka的分析,可以類推作為一個(gè)消息中間件所需的時(shí)間開銷主要在以下兩個(gè)方面:1)消息讀寫 2)網(wǎng)絡(luò)傳輸
5捏雌、Kafka跟其他消息隊(duì)列的差異與適應(yīng)的場(chǎng)景是哪些?
? 簡單介紹下比較常用的消息中間件:
? RabbitMQ是采用Erlang語言實(shí)現(xiàn)的AMQP協(xié)議的消息中間件笆搓,最初起源于金融系統(tǒng)性湿,用于在分布式系統(tǒng)中存儲(chǔ)和轉(zhuǎn)發(fā)消息。RabbitMQ發(fā)展到今天满败,被越來越多的人認(rèn)可肤频,這和它在可靠性、可用性算墨、擴(kuò)展性宵荒、功能豐富等方面的卓越表現(xiàn)是分不開的。
? RocketMQ是阿里開源的消息中間件净嘀,目前已捐獻(xiàn)給Apache基金會(huì)报咳,它是由Java語言開發(fā)的,具備高吞吐量挖藏、高可用性暑刃、適合大規(guī)模分布式系統(tǒng)應(yīng)用等特點(diǎn),經(jīng)歷過“雙十一”的洗禮膜眠,實(shí)力不容小覷岩臣。
從以下幾個(gè)方面來分析Kafka與其它常用的消息中間件的差異:
可靠性:Kafka的ISR機(jī)制保證其高可用,一主多從宵膨,leader副本掛掉后架谎,可以自動(dòng)選舉新的leader;RocketMQ也支持主從機(jī)制保證其高可用辟躏,通過設(shè)定brokerId=0來設(shè)置master谷扣,不支持主從切換,master失效以后捎琐,從slave中進(jìn)行消費(fèi)抑钟;RabbitMQ也是支持主從機(jī)制保證高可用,master掛掉以后野哭,最早加入集群的slave成為master在塔,支持主從自動(dòng)切換。
單機(jī)吞吐量:RabbitMQ單機(jī)吞吐量在萬級(jí)別之內(nèi)拨黔,吞吐量比RocketMQ和Kafka要低了一個(gè)數(shù)量級(jí)蛔溃;RocketMQ和Kafka單機(jī)吞吐量可以維持在十萬級(jí)別。
應(yīng)用場(chǎng)景:RabbitMQ在金融支付領(lǐng)域使用較多,而在日志處理贺待、大數(shù)據(jù)等方面Kafka使用居多徽曲,而RocketMQ目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值麸塞、流計(jì)算秃臣、消息推送、日志流式處理哪工、binglog分發(fā)等場(chǎng)景奥此,支撐了阿里多次雙十一活動(dòng)。
6雁比、Kafka在我們系統(tǒng)中的應(yīng)用稚虎,生產(chǎn)者、消費(fèi)者分別是什么偎捎?groupid是什么蠢终,topic是什么?
?我們的BinaryBinlogKafkaProducer
while (true) {
try {
if(BinlogUtil.isCache(binlog.getHeader().getTableName())){
if(!TopicUtil.isExit(binlog.getHeader().getSchemaName()+"."+binlog.getHeader().getTableName())){
//創(chuàng)建topic
TopicUtil.createTopic(binlog.getHeader().getSchemaName()+"."+binlog.getHeader().getTableName());
}
send(binlog.getHeader().getSchemaName()+"."+binlog.getHeader().getTableName(),binlog.toByteArray());
}else {
send(topic, binlog.toByteArray());
}
break;
} catch (FailedToSendMessageException e) {
// 處理異常
}
}
topic設(shè)置有個(gè)判斷茴她,如果環(huán)境變量中有設(shè)置cacheTable這個(gè)參數(shù)寻拂,則設(shè)置topic為“庫名.表名",若沒有丈牢,則使用“Binlog”作為topic
消費(fèi)者
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MessageConsumer {
String groupId() default "";
String zkHost() default "127.0.0.1";
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MessageConsumerAction {
String topic() default "Binlog";
String eventType();
}
消費(fèi)者的groupId設(shè)置在MessageConsumerServiceImpl中兜喻,設(shè)置的是className
String className = context.getIface() instanceof Proxy ? ((Class)
ifaceClass.getMethod("getTargetClass").invoke(context.getIface())).getName() : ifaceClass.getName();
groupId = "".equals(groupId) ? className : ifaceClass.getName();
消費(fèi)者的消費(fèi)是通過注解使用的
@MessageConsumer
@Transactional(value ="iplm_reportdb", rollbackFor = Array(classOf[Exception]))
class ReportBinlogServiceImplextends ReportBinlogService{
def init(): Unit ={
// 初始化加載緩存
}
@MessageConsumerAction(topic = "Binlog")
override def onReportModuleBinlogMessage(message: ByteBuffer): Unit = {
// 處理binlog
}
}