這篇文章是我在粗略學(xué)習(xí)kafka時遇到的問題和網(wǎng)上查閱過的資料。kafka和java我都不是很熟悉灵寺,只知道基本的概念曼库。文章中可能有疏漏和錯誤的地方,大家參考一下就好略板。有錯誤拜托大家指正毁枯!
broker端
- 配置文件需要配置的參數(shù)
- message.max.bytes : kafka 會接收單個消息size的最大限制, 默認(rèn)為1M左右叮称。如果producer發(fā)送比這個大的消息种玛,kafka默認(rèn)會丟掉。producer可以從callback函數(shù)中獲得錯誤碼:10瓤檐。
- log.segment.bytes : kafka數(shù)據(jù)文件的大小蒂誉。默認(rèn)為1G, 需要確保此值大于一個消息的最大大小。
- replica.fetch.max.bytes : broker可復(fù)制的消息的最大字節(jié)數(shù), 默認(rèn)為1M距帅。這個值應(yīng)該比message.max.bytes大右锨,否則broker會接收此消息,但無法將此消息復(fù)制出去碌秸,從而造成數(shù)據(jù)丟失绍移。
- bin目錄下的kafka-run-class.sh中需要配置的參數(shù)
kafka是由scala和java編寫的。所以需要調(diào)一些jvm的參數(shù)讥电。java的內(nèi)存分為堆內(nèi)內(nèi)存和堆外內(nèi)存蹂窖。
-Xms2048m, -Xmx2048m,設(shè)置的是堆內(nèi)內(nèi)存恩敌。-Xms是初始可用的最大堆內(nèi)內(nèi)存瞬测。-Xmx設(shè)置的是最大可用的堆內(nèi)內(nèi)存。二者設(shè)置成一樣是因為效率問題纠炮,可以讓jvm少做一些運算月趟。如果這兩個參數(shù)設(shè)置的太小,kafka會出現(xiàn)java.lang.OutOfMemoryError: Java heap space的錯誤恢口。
-
--XX:MaxDirectMemorySize=8192m孝宗。這個參數(shù)配置的太小,kafka會出現(xiàn)java.lang.OutOfMemoryError: Direct buffer memory的錯誤耕肩。 因為kafka的網(wǎng)絡(luò)IO使用了java的nio中的DirectMemory的方式,而這個申請的是堆外內(nèi)存因妇。
至于kafka的什么組件的哪些方法用了堆內(nèi)內(nèi)存還是堆外內(nèi)存问潭,還有用多少,我不清楚婚被,沒有學(xué)過java狡忙,也沒看過kafka源碼。只是碰到了這個問題址芯,然后記錄一下去枷。
producer端(php-rdkafka)
- message.max.bytes, 要設(shè)置大于發(fā)送最大數(shù)據(jù)的大小是复,否則會produce失敗删顶。
consumer端(php-rdkafka)
- receive.message.max.bytes : kafka 協(xié)議response 的最大長度,應(yīng)該保證次參數(shù)大于等于message.max.bytes淑廊。否則消費會失敗逗余。
另外,還需要注意一個問題季惩,版本過低的librdkafka的receive.message.max.bytes只支持1000到1000000000录粱。目前最新版本的可以支持到2147483647。使用此參數(shù)的時候還需要注意一個問題画拾,比如我在broker端設(shè)置的message.max.bytes為1000啥繁, 在consumer端設(shè)置的receive.message.max.bytes也為1000,但是除了數(shù)據(jù)青抛,response還有協(xié)議相關(guān)字段旗闽,這時候整個response的大小就會超過1000。而consumer就會收到“Receive failed: Invalid message size 1047207987 (0..1000000000): increase receive.message.max.bytes”這樣的錯誤蜜另。broker為何會返回總量為1000大小的數(shù)據(jù)呢适室?其實librdkafka有這樣一個參數(shù):fetch.max.bytes, 它有這樣的描述:Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). fetch.max.bytes is automatically adjusted upwards to be at least message.max.bytes (consumer config). 它會自動調(diào)整到message.max.bytes 這樣的大小,返回的數(shù)據(jù)大小甚至還可能超過這個大小举瑰。而你此時設(shè)置的
receive.message.max.bytes == message.max.bytes == 1000 就會出上面說的那個錯誤捣辆。所以應(yīng)該讓consumer端設(shè)置的receive.message.max.bytes大于broker端設(shè)置的 message.max.bytes ,我猜應(yīng)該大于單個最大數(shù)據(jù)的大小此迅,這樣就不會出錯了汽畴,但是沒有驗證。目前我的測試環(huán)境數(shù)據(jù)最大為500M左右耸序。message.max.bytes 我設(shè)置了900M忍些, receive.message.max.bytes設(shè)置了1000000000, 暫時沒有出現(xiàn)問題佑吝。
內(nèi)存方面需要考慮的問題
- 有一段這樣的描述 Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB, and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.
The same consideration applies for the consumer fetch.message.max.bytes setting. Ensure that you have enough memory for the largest message for each partition the consumer replicates. With larger messages, you might need to use fewer partitions or provide more RAM.
一些坑
- 如果一個消息需要的處理時間很長坐昙,broker會認(rèn)為consumer已經(jīng)掛了绳匀,把partition分配給其他的consumer芋忿,然后循環(huán)往復(fù)炸客, 這條record永遠(yuǎn)沒法消費。方法是增加max.poll.interval.ms 參數(shù)戈钢。關(guān)于此參數(shù)的一些討論:
https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0 - 當(dāng)開啟auto commit時痹仙,這一次poll得到消息對應(yīng)的offset其實是下一次調(diào)用poll時再提交的。即使過了auto.commit.interval.ms也不會提交殉了。關(guān)于此參數(shù)的一些討論:
https://stackoverflow.com/questions/38230862/need-clarification-about-kafka-auto-commit-and-auto-commit-interval-ms
- 當(dāng)開啟auto commit時痹仙,這一次poll得到消息對應(yīng)的offset其實是下一次調(diào)用poll時再提交的。即使過了auto.commit.interval.ms也不會提交殉了。關(guān)于此參數(shù)的一些討論:
性能調(diào)優(yōu)
https://www.cnblogs.com/SpeakSoftlyLove/p/6511547.html
https://blog.csdn.net/vegetable_bird_001/article/details/51858915
以上內(nèi)容參考資料:
- kafka的一些概念
http://f.dataguru.cn/thread-730782-1-1.html
https://www.cnblogs.com/huxi2b/p/6223228.html - kafka jvm 配置
https://blog.csdn.net/chen88358323/article/details/51824232
https://blog.csdn.net/xzj9581/article/details/72866225 - kafka 內(nèi)存考慮
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_performance.html#concept_exp_hzk_br__d22306e79
http://www.cnblogs.com/doubletree/p/4264969.html
http://www.voidcn.com/article/p-zmwswtfy-bbz.html
kafka的復(fù)制
https://blog.csdn.net/lizhitao/article/details/51718185 - kafka和DirectMemory的關(guān)系
http://www.360doc.com/content/13/0502/23/7669533_282552666.shtml - java的nio和內(nèi)存管理介紹
http://www.importnew.com/17781.html
https://blog.csdn.net/flyineagle/article/details/1399554
https://blog.csdn.net/evan_man/article/details/509105
http://www.importnew.com/21463.html42
http://blog.chinaunix.net/uid-26863299-id-3559878.html
https://blog.csdn.net/u010003835/article/details/52957904 - kafka出現(xiàn)的問題
https://blog.csdn.net/guoyuqi0554/article/details/48630907
http://blog.51cto.com/navyaijm/1931962