參考
kafka 技術(shù)分享
如何確定Kafka的分區(qū)數(shù),key和consumer線程數(shù)伤溉,以及不消費問題解決
kafka性能參數(shù)和壓力測試揭秘
kafka producer線程與吞吐量
1.partition數(shù)量配置
partition數(shù)量由topic的并發(fā)決定扰路,并發(fā)少則1個分區(qū)就可以猾骡,并發(fā)越高舅桩,分區(qū)數(shù)越多,可以提高吞吐量担猛。
創(chuàng)建topic時指定topic數(shù)量
bin/kafka-topics.sh --create --zookeeper 10.25.58.35:2181 --replication-factor 3 --partitions 3 --topic test8
2.日志保留策略設(shè)置
當kafka broker的被寫入海量消息后,會生成很多數(shù)據(jù)文件丢氢,占用大量磁盤空間傅联,kafka默認是保留7天,建議根據(jù)磁盤情況配置疚察,避免磁盤撐爆蒸走。
log.retention.hours=72
段文件配置1GB,有利于快速回收磁盤空間貌嫡,重啟kafka加載也會加快(如果文件過小比驻,則文件數(shù)量比較多该溯,kafka啟動時是單線程掃描目錄(log.dir)下所有數(shù)據(jù)文件)
log.segment.bytes=1073741824
3.文件刷盤策略
為了大幅度提高producer寫入吞吐量,需要定期批量寫文件别惦。建議配置:
每當producer寫入10000條消息時狈茉,刷數(shù)據(jù)到磁盤
log.flush.interval.messages=10000
每間隔1秒鐘時間,刷數(shù)據(jù)到磁盤
log.flush.interval.ms=1000
4.網(wǎng)絡(luò)和io操作線程配置優(yōu)化
一般num.network.threads主要處理網(wǎng)絡(luò)io掸掸,讀寫緩沖區(qū)數(shù)據(jù)氯庆,基本沒有io等待,配置線程數(shù)量為cpu核數(shù)加1.
broker處理消息的最大線程數(shù)
num.network.threads=xxx
num.io.threads主要進行磁盤io操作扰付,高峰期可能有些io等待堤撵,因此配置需要大些。配置線程數(shù)量為cpu核數(shù)2倍羽莺,最大不超過3倍.
broker處理磁盤IO的線程數(shù)
num.io.threads=xxx
加入隊列的最大請求數(shù),超過該值实昨,network thread阻塞
queued.max.requests=5000
server使用的send buffer大小。
socket.send.buffer.bytes=1024000
server使用的recive buffer大小。
socket.receive.buffer.bytes=1024000
5.異步提交(kafka.javaapi.producer)
采用同步:1000條8s扣墩;
采用異步:100條或3s異步寫入趾浅,速度提升為1w條2s(ProducerConfig)
request.required.acks=0
producer.type=async
##在異步模式下,一個batch發(fā)送的消息數(shù)量锐墙。producer會等待直到要發(fā)送的消息數(shù)量達到這個值,之后才會發(fā)送长酗。但如果消息數(shù)量不夠溪北,達到queue.buffer.max.ms時也會直接發(fā)送。
batch.num.messages=100
##默認值:200夺脾,當使用異步模式時之拨,緩沖數(shù)據(jù)的最大時間。例如設(shè)為100的話咧叭,會每隔100毫秒把所有的消息批量發(fā)送蚀乔。這會提高吞吐量,但是會增加消息的到達延時
queue.buffering.max.ms=100
##默認值:5000菲茬,在異步模式下吉挣,producer端允許buffer的最大消息數(shù)量,如果producer無法盡快將消息發(fā)送給broker婉弹,從而導致消息在producer端大量沉積睬魂,如果消息的條數(shù)達到此配置值,將會導致producer端阻塞或者消息被拋棄镀赌。
queue.buffering.max.messages=1000 ##發(fā)送隊列緩沖長度
##默認值:10000氯哮,當消息在producer端沉積的條數(shù)達到 queue.buffering.max.meesages 時,阻塞一定時間后商佛,隊列仍然沒有enqueue(producer仍然沒有發(fā)送出任何消息)喉钢。此時producer可以繼續(xù)阻塞或者將消息拋棄姆打,此timeout值用于控制阻塞的時間,如果值為-1(默認值)則 無阻塞超時限制出牧,消息不會被拋棄穴肘;如果值為0 則立即清空隊列,消息被拋棄舔痕。
queue.enqueue.timeout.ms=100
compression.codec=gzip
6.producer版本
參考
what's the difference between kafka.javaapi.* and org.apache.kafka.*?
Kafka new producer not behaving consistently
使用新producer發(fā)送少量消息時丟失
新producer:org.apache.kafka.clients.producer(KafkaProducer.java)
老producer:kafka.javaapi.producer(Producer.scala)
- 查閱資料后评抚,原因為使用producer時必須調(diào)用producer.close(),且在發(fā)送后Thread.sleep適當時間伯复,則不會丟失數(shù)據(jù)慨代。否則會造成資源泄露,導致數(shù)據(jù)丟失啸如。
- 當使用多個producer進行發(fā)送時(使用apache線程池)侍匙,當同時有多個producer并發(fā)發(fā)送時,依然會造成數(shù)據(jù)丟失叮雳。sleep后有好轉(zhuǎn)想暗,但仍然丟失。
- 使用老producer帘不,且compression.codec不為snappy時说莫,不會造成數(shù)據(jù)丟失。使用線程池也不會丟失寞焙。
7.性能測試
kafka 10 性能測試
kafka自帶的性能測試工具储狭,位于bin/kafka-producer-perf-test.sh。
8.生產(chǎn)端發(fā)送堵塞
- 調(diào)整producer緩沖區(qū)大小 queue.buffering.max.messages
- 增加通道數(shù)量:多建幾個producer捣郊,使用連接池管理producer
producer使用線程池
- buffer.memory設(shè)置的緩存是針對每個producerThread
針對每個producerThread辽狈,不應(yīng)設(shè)置高,以免影響內(nèi)存 - 線程池中線程數(shù)量如何設(shè)置呛牲?
監(jiān)視剩余線程數(shù)據(jù)刮萌,進行動態(tài)調(diào)整,并針對可能出現(xiàn)的峰值預(yù)留一定的線程娘扩。 - 使用tryAcquire()還是acquire()尊勿??阻塞或放棄消息畜侦??
使用apache的線程池即可躯保,設(shè)置阻塞時的等待時間旋膳,超過后則拋出異常。 - 是否對線程池容量進行動態(tài)調(diào)整途事?
使用apache的線程池即可验懊。 - 線程池最大線程數(shù)100擅羞,啟用50個thread同時發(fā)送日志,報錯:
kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(test12,null,null,........
報錯原因為生產(chǎn)速度大于發(fā)送速度(網(wǎng)絡(luò)傳輸?shù)葲Q定)义图,可設(shè)置繼續(xù)等待時間减俏,超過此時間后丟棄消息;或設(shè)置一直阻塞碱工,排隊等待消息發(fā)送完畢(會造成線程死鎖)娃承。