搬運(yùn)自:https://segmentfault.com/a/1190000021619832
前言
kafka針對(duì)broker, topic, producer, consumer的配置非常多,這里講一下常用的配置
1.broker相關(guān)配置
broker.id
broker在kafka集群中的唯一標(biāo)識(shí),必須是一個(gè)大于等于0的整數(shù),如果不寫(xiě)的話(huà)默認(rèn)從1001開(kāi)始吝沫。
建議:把它設(shè)置成與機(jī)器名具有相關(guān)性的整數(shù)须蜗。port
設(shè)置kafka的端口號(hào),默認(rèn)情況下是9092,不建議修改成1024以下的端口垢村,因?yàn)樾枰褂胷oot權(quán)限啟動(dòng)躏结。-
zookeeper.connect
設(shè)置zookeeper集群劫狠,該配置參數(shù)是一組用逗號(hào)隔開(kāi)的host:port/path列表- host是zookeeper服務(wù)器的機(jī)器名或ip地址
- port是zookeeper服務(wù)器的端口
- /path是可選的zookeeper路徑潜的,作為kafka集群的chroot環(huán)境骚揍,默認(rèn)是根路徑,如果指定的chroot路徑不存在啰挪,kafka會(huì)在啟動(dòng)的時(shí)候創(chuàng)建它信不。使用chroot使得zookeeper集群可以共享給其他應(yīng)用程序時(shí)而不會(huì)產(chǎn)生沖突。
log.dirs
配置kafka日志片段的目錄位置,多個(gè)路徑以逗號(hào)隔開(kāi)亡呵。如果配置了多個(gè)路徑抽活,kafka會(huì)根據(jù)最少使用原則,把統(tǒng)一分區(qū)的日志保存到同一路徑下锰什,注意的是下硕,kafka會(huì)往擁有最少數(shù)量分區(qū)的路徑新增分區(qū),而不是往擁有最小磁盤(pán)空間的路徑新增分區(qū)汁胆。-
auto.create.topics.enable
配置是否開(kāi)啟自動(dòng)創(chuàng)建topic梭姓,如果設(shè)置為true, kafka會(huì)在以下幾種場(chǎng)景自動(dòng)創(chuàng)建topic:- 當(dāng)一個(gè)producer開(kāi)始往topic寫(xiě)入消息時(shí)。
- 當(dāng)一個(gè)consumer開(kāi)始從topic消費(fèi)消息時(shí)嫩码。
- 當(dāng)一個(gè)client向topic發(fā)送元數(shù)據(jù)請(qǐng)求時(shí)糊昙。
num.partitions
配置創(chuàng)建主題時(shí)包含多少個(gè)分區(qū),默認(rèn)值為1谢谦,因?yàn)槲覀兡茉黾又黝}的分區(qū)數(shù)释牺,但是不能減少分區(qū)的個(gè)數(shù),所以回挽,如果要讓一個(gè)主題的分區(qū)個(gè)數(shù)少于num.partitions需要手動(dòng)創(chuàng)建該主題而不是通過(guò)自動(dòng)創(chuàng)建主題没咙。log.retention.hours
配置kafka保留數(shù)據(jù)的時(shí)間,默認(rèn)為168小時(shí)也就是7天千劈,效果等同log.retention.minutes和log.retention.ms,只是單位不一樣祭刚,分別是小時(shí),分鐘,和毫秒涡驮,推薦使用log.retention.ms,粒度更加細(xì)暗甥,如果三個(gè)參數(shù)都配置了則去數(shù)值最小的配置。log.retention.bytes
配置一個(gè)分區(qū)能保存最大的字節(jié)數(shù)捉捅,如果超出的部分就會(huì)被刪除撤防,同時(shí)配置了log.retention.hours/log.retention.minutes/log.retention.ms的話(huà),任一個(gè)滿(mǎn)足條件都會(huì)觸發(fā)數(shù)據(jù)刪除。message.max.bytes
配置消息的大小限制棒口,默認(rèn)為100000寄月,也就是1M,這里的大小是指在kafka壓縮后的大小无牵,也就是說(shuō)實(shí)際消息可以大于1M,如果消息超過(guò)這個(gè)限制漾肮,則會(huì)被kafka拒收。
2.producer相關(guān)配置
bootstrap.servers
配置broker的地址茎毁,格式為host:port,如果多個(gè)則以逗號(hào)隔開(kāi),不需要配置所有的broker,producer會(huì)從給定的broker查詢(xún)其他broker的信息克懊,不過(guò)建議至少填寫(xiě)兩個(gè),以防在一個(gè)宕機(jī)的情況還能從另外一個(gè)去獲取broker的信息七蜘。-
acks
acks指定了必須要有多少個(gè)分區(qū)副本收到消息谭溉,producer才會(huì)認(rèn)為消息寫(xiě)入是成功的。- acks=0 producer發(fā)送消息不等待任何來(lái)自服務(wù)器的響應(yīng)崔梗,所以會(huì)出現(xiàn)消息丟失而producer不感知的情況夜只,該模式下可獲取最大的吞吐量。
- acks=1 只要集群的leader節(jié)點(diǎn)收到消息蒜魄,producer就會(huì)收到一個(gè)服務(wù)器的成功響應(yīng)扔亥,如果消息無(wú)法達(dá)到leader節(jié)點(diǎn),那么producer就會(huì)獲取到一個(gè)錯(cuò)誤響應(yīng)谈为,這時(shí)候?yàn)榱吮苊庀⒌膩G失旅挤,producer可以選擇重發(fā),不過(guò)如果一個(gè)沒(méi)有收到消息的節(jié)點(diǎn)成為新的leader,那么消息還是會(huì)丟失伞鲫。
- acks=all 只有當(dāng)leader節(jié)點(diǎn)和follower節(jié)點(diǎn)都收到消息時(shí),producer才會(huì)收到成功的響應(yīng)粘茄,這是一個(gè)避免消息丟失最安全的做法,不過(guò)這種模式吞吐量最低
client.id
可以是任意字符串秕脓,標(biāo)識(shí)消息的來(lái)源max.in.flight.requests.per.connection
配置producer在收到服務(wù)器響應(yīng)前可以發(fā)送的消息個(gè)數(shù)柒瓣,值越高,吞吐量就會(huì)越高吠架,不過(guò)相應(yīng)的占用的內(nèi)存也會(huì)越多芙贫,設(shè)置為1可以保證消息可以按照發(fā)送的順序?qū)懭敕?wù),即便發(fā)生了重試傍药。max.request.size
配置producer單次發(fā)送的所有消息的總的大小限制磺平,例如設(shè)置為1M,單個(gè)消息大小為1K魂仍,那么單次可以發(fā)的上限是1000個(gè),最好跟message.max.bytes配置匹配拣挪,避免發(fā)送到broker的消息被拒絕擦酌。retries
該參數(shù)決定了producer可以重發(fā)消息的次數(shù),producer從broker收到的錯(cuò)誤可能是臨時(shí)性的,例如分區(qū)找不到首領(lǐng)菠劝,這種情況下赊舶,producer在進(jìn)行retries次重試后就會(huì)放棄重試并且返回錯(cuò)誤,默認(rèn)情況下闸英,重試的時(shí)間間隔為100ms锯岖,可以通過(guò)retry.backoff.ms參數(shù)配置介袜,建議在設(shè)置重試間隔之前最好測(cè)試一下恢復(fù)一個(gè)崩潰的節(jié)點(diǎn)要多長(zhǎng)時(shí)間甫何,重試的間隔最好比恢復(fù)時(shí)間要長(zhǎng)。batch.size
當(dāng)多個(gè)消息往同一個(gè)分區(qū)發(fā)送時(shí)遇伞,producer會(huì)把這些消息放到同一個(gè)分區(qū)辙喂,該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按字節(jié)數(shù)計(jì)算鸠珠,當(dāng)批次填滿(mǎn)時(shí)消息就會(huì)被發(fā)送出去巍耗,不過(guò)producer不一定等批次被填滿(mǎn)才會(huì)發(fā)送,甚至只有一個(gè)消息也會(huì)被發(fā)送渐排,所以就算把該值設(shè)置得很大也不會(huì)造成延遲炬太,只不過(guò)會(huì)占用內(nèi)存,但是如果設(shè)置太小的話(huà)驯耻,producer會(huì)很頻繁的發(fā)送亲族,增加一些額外的開(kāi)銷(xiāo)。linger.ms
指定producer發(fā)送批次之前要等待更多消息加入批次的時(shí)間可缚,producer會(huì)在批次填滿(mǎn)或者longer.ms到達(dá)上限時(shí)把批次發(fā)送出去霎迫,默認(rèn)情況下,只要有可用的線(xiàn)程帘靡,就算批次只有一個(gè)消息知给,producer也會(huì)把消息發(fā)送出去。把linger.ms設(shè)置成比0大的數(shù)描姚,讓producer在發(fā)送批次之前多等待一會(huì)涩赢,可以使得更多的消息可以加入到批次,雖然增加了延遲轩勘,但是同時(shí)也增加了吞吐量筒扒。
3.Consumer相關(guān)配置
fetch.min.bytes
配置Consumer從broker獲取記錄的最小字節(jié)數(shù),broker在收到Consumer的數(shù)據(jù)請(qǐng)求時(shí)赃阀,如果可用的數(shù)據(jù)量小于該配置霎肯,那么broker會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給Consumer擎颖。fetch.max.wait.ms
配置broker的等待時(shí)間,默認(rèn)為500ms观游,如果沒(méi)有足夠的數(shù)據(jù)流入搂捧,導(dǎo)致不滿(mǎn)足fetch.mis.bytes,最終會(huì)導(dǎo)致500ms的延遲懂缕。如果fetch.mis.bytes配置為1M,fetch.max.wait.ms配置為500ms,那么最終broker要么返回1M的數(shù)據(jù)允跑,要么等待500ms后返回所有可用的數(shù)據(jù),取決于哪個(gè)條件先得到滿(mǎn)足搪柑。max.partition.fetch.bytes
配置broker從每個(gè)分區(qū)返回給Consumer的最大字節(jié)數(shù),默認(rèn)為1MB,也就是說(shuō),KafkaConsumer.poll()方法從每個(gè)分區(qū)里面返回的記錄最多不超過(guò)該值聋丝,加入有10個(gè)分區(qū)5個(gè)消費(fèi)者,則每個(gè)消費(fèi)者需要2MB的內(nèi)存來(lái)接收消息工碾,需要注意的是弱睦,如果該值設(shè)置過(guò)大,導(dǎo)致消費(fèi)者處理的業(yè)務(wù)的時(shí)間過(guò)長(zhǎng)渊额,會(huì)有回話(huà)超時(shí)的風(fēng)險(xiǎn)况木。session.timeout.ms
配置了Consumer被認(rèn)定為死亡前可以與服務(wù)器斷開(kāi)連接的時(shí)間,默認(rèn)3秒旬迹,如果服務(wù)器在超過(guò)該值時(shí)間沒(méi)有收到Consumer的心跳火惊,就會(huì)認(rèn)定Consumer死亡,會(huì)觸發(fā)再均衡奔垦,把死亡Consumer的分區(qū)分配給其他Consumer屹耐,這個(gè)配置要跟heartbeat.interval.ms配合使用,heartbeat.interval.ms設(shè)置了poll()方法向協(xié)調(diào)器發(fā)送心跳的頻率椿猎。建議heartbeat.interval.ms的值為session.timeout.ms的三分之一惶岭。enable.auto.commit
指定了Consumer是否開(kāi)啟自動(dòng)提交偏移量,默認(rèn)為true鸵贬∷姿可以把它設(shè)置為false,由程序自己控制何時(shí)提交偏移量來(lái)避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失的情況阔逼。auto.offset.reset
指定在Consumer讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下的策略(因?yàn)橄M(fèi)者長(zhǎng)時(shí)間失效兆衅,包含偏移量的記錄已經(jīng)過(guò)時(shí)并刪除),默認(rèn)為latest嗜浮,表示會(huì)從最新的記錄開(kāi)始讀取(在消費(fèi)者啟動(dòng)之后生成的記錄羡亩,會(huì)出現(xiàn)漏消費(fèi)歷史記錄的情況),另外一個(gè)配置是earliest危融,表示從最早的消息開(kāi)始消費(fèi)(會(huì)出現(xiàn)重復(fù)消費(fèi)的情況)-
partition.assignment.strategy
分區(qū)分配給Consumer的策略畏铆,有兩種:- Range
把topic的若干個(gè)連續(xù)的分區(qū)分配給Consumer,假設(shè)有Consumer1和Consumer2吉殃,分別訂閱了topic1和topic2辞居,每個(gè)topic都有3個(gè)分區(qū)楷怒,那么Consumer1可能分配到topic1和topic2的分區(qū)0和分區(qū)1,Consumer2分配到topic1和topic2的分區(qū)2,這種策略會(huì)導(dǎo)致當(dāng)分區(qū)數(shù)量(針對(duì)單個(gè)topic,上面例子是3個(gè)分區(qū))無(wú)法被消費(fèi)者數(shù)量(上面例子是2個(gè)消費(fèi)者)整除時(shí)瓦灶,就會(huì)出現(xiàn)分區(qū)分布不均勻的情況鸠删。
- Range
- RoundRobin
該策略會(huì)把所有的分區(qū)逐個(gè)分配給Consumer,還是上面的例子,如果按這種策略分配那么Consumer1最終分到的是topic1的分區(qū)0贼陶,分區(qū)2和topic2的分區(qū)1刃泡,Consumer2最終分到的是topic1的分區(qū)1和topic2的分區(qū)0和分區(qū)2。
默認(rèn)使用org.apache.kafka.clients.consumer.RangeAssignor碉怔,這個(gè)類(lèi)實(shí)現(xiàn)了Range策略烘贴,RoundRabin的實(shí)現(xiàn)類(lèi)為org.apache.kafka.clients.consumer.RoundRobinAssignor,我們還可以自定義策略。
-
max.poll.records
指定單次調(diào)用poll()方法返回的記錄數(shù)量撮胧。