背景:
? ? ? 最近在用kafka做消息中間件钓瞭,producer從hive中讀取消息發(fā)送到kafka屡律,后端storm對消息分類發(fā)送到elasticsearch建立索引。
問題:
? ? ? hive表中總共350萬數(shù)據(jù)降淮,當(dāng)時整個全量索引結(jié)束后發(fā)現(xiàn)超埋,最后索引條數(shù)總共310萬左右。storm日志沒有任何錯誤日志佳鳖。
排查:
? ? ? 首先排查storm consumer的問題霍殴,由于發(fā)現(xiàn)storm日志沒有任何異常,所以第一步基本排除建索引程序的問題系吩。storm 消費kafka用的官方storm-kafka包来庭,而且已開啟ack,所以基本排除storm端的問題穿挨。
? ? 現(xiàn)在懷疑kafka里的數(shù)據(jù)本身只有310萬條數(shù)據(jù)月弛,寫了一個程序扔到了kafka集群上探查了一下,印證了自己的想法科盛。果然帽衙,數(shù)據(jù)只有310萬條。現(xiàn)在基本判斷問題的在kafka producer上贞绵。仔細(xì)查看了下producer代碼
props.put("acks","all");
props.put("retries",3);? ? ?
? ? ?"acks" 選項表示kafka 的ack級別:acks=0 意味著producer永遠(yuǎn)不會等待任何一個來自broker的ack厉萝,意味著不需要任何確實,發(fā)送及以為著成功。acks=1 意味著在leader replica已經(jīng)接收到數(shù)據(jù)后谴垫,producer會得到一個ack章母,這個選項對速度與安全性做一個平衡,但是不需要等其他副本確認(rèn)翩剪,如果發(fā)生leader掛了乳怎,其他副本還沒來得及同步,這時就會發(fā)生數(shù)據(jù)丟失的情況前弯。最后一種數(shù)據(jù)最安全的情況就是acks=al蚪缀,l意味著在所有的ISR都接收到數(shù)據(jù)后,producer才得到一個ack博杖。這個選項提供了最好的持久性椿胯,只要還有一個replica存活筷登,那么數(shù)據(jù)就不會丟失剃根,但是相應(yīng)的吞吐量會受到影響。本著對業(yè)務(wù)對數(shù)據(jù)可靠性的要求前方,我選擇了最高的可靠級別狈醉,這點沒毛病。
? ? "retries"選項大于0的值將使客戶端重新發(fā)送任何數(shù)據(jù)惠险,一旦這些數(shù)據(jù)發(fā)送失敗苗傅,會間隔一段時間重試,這個值設(shè)置的就是重試間隔時間班巩。初步懷疑這個值太小渣慕,如果磁盤卡頓,網(wǎng)絡(luò)中斷超過三秒抱慌,是否會丟數(shù)據(jù)逊桦。所以將這個參數(shù)調(diào)大到300。
? ? ?重新打包上傳到storm集群重新跑了一回抑进,數(shù)據(jù)還是丟了30多萬强经。場面一度尷尬。寺渗。問題陷入了僵局匿情。
轉(zhuǎn)機(jī):
? ? 現(xiàn)在的問題已經(jīng)超過了我的認(rèn)知,之前從來沒出現(xiàn)過如此嚴(yán)重的丟數(shù)據(jù)的問題信殊。在網(wǎng)上搜的資料大部分都看過炬称。理論上可靠性可以通過副本解決,沒有類似于我這個種問題涡拘。心想著如果不行转砖,只能更改broker 從page cache同步到硬盤的頻率了。鬼使神差下,我更改了下producer的壓縮格式府蔗,從snappy改到gzip晋控,這次kafka中的消息,竟然只少了2000姓赤。同樣的參數(shù)赡译,只改了下壓縮格式。我又查看下了前兩次用snapp格式不铆,kafka里的消息數(shù)蝌焚,發(fā)現(xiàn)了一個問題,兩次用snappy的時候誓斥,kafka消息數(shù)竟然一模一樣只洒。如果不是玄學(xué)的問題,理論上如果丟消息劳坑,350萬條毕谴,丟相同條數(shù)的信息概率簡直太小了。
? 現(xiàn)在問題似乎已經(jīng)很清晰了距芬,gzip壓縮率要比snappy高涝开,snappy優(yōu)勢在于壓縮速度。壓縮率高意味著單條數(shù)據(jù)要小】蜃校現(xiàn)在基本問題定位在單條數(shù)據(jù)大小的問題舀武。但是為什么producer端沒有異常日志呢。查看一下producer發(fā)送消息的源碼:“Future send(ProducerRecord var1)” producer 發(fā)送消息后會發(fā)揮一個future离斩,這種模式是異步發(fā)送方式银舱,當(dāng)broker返回異常信息時并不會拋出。跛梗,producer.send(producerRecord).get()寻馏,加上get(),將異步改同步茄袖,打包運(yùn)行果然發(fā)送到30萬條左右數(shù)據(jù)時就已經(jīng)拋出異常
kafka.common.MessageSizeTooLargeException
解決:
? 至此問題已經(jīng)定位到操软,下一步解決問題,搜了下stackoverflow宪祥,參考下最高票回答:
Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
? ? 已完美解決問題聂薪。