回顧:kafka極簡入門(三)--創(chuàng)建topic
前言
kafka針對broker, topic, producer, consumer的配置非常多,這里講一下常用的配置
1.broker相關配置
-
broker.id
broker在kafka集群中的唯一標識誊垢,必須是一個大于等于0的整數(shù),如果不寫的話默認從1001開始。
建議:把它設置成與機器名具有相關性的整數(shù)。 -
port
設置kafka的端口號,默認情況下是9092,不建議修改成1024以下的端口,因為需要使用root權限啟動萤悴。 -
zookeeper.connect
設置zookeeper集群,該配置參數(shù)是一組用逗號隔開的host:port/path列表- host是zookeeper服務器的機器名或ip地址
- port是zookeeper服務器的端口
- /path是可選的zookeeper路徑,作為kafka集群的chroot環(huán)境肴甸,默認是根路徑,如果指定的chroot路徑不存在囚巴,kafka會在啟動的時候創(chuàng)建它原在。使用chroot使得zookeeper集群可以共享給其他應用程序時而不會產生沖突。
-
log.dirs
配置kafka日志片段的目錄位置,多個路徑以逗號隔開彤叉。如果配置了多個路徑庶柿,kafka會根據(jù)最少使用原則,把統(tǒng)一分區(qū)的日志保存到同一路徑下秽浇,注意的是浮庐,kafka會往擁有最少數(shù)量分區(qū)的路徑新增分區(qū),而不是往擁有最小磁盤空間的路徑新增分區(qū)柬焕。 -
auto.create.topics.enable
配置是否開啟自動創(chuàng)建topic审残,如果設置為true, kafka會在以下幾種場景自動創(chuàng)建topic:- 當一個producer開始往topic寫入消息時梭域。
- 當一個consumer開始從topic消費消息時。
- 當一個client向topic發(fā)送元數(shù)據(jù)請求時维苔。
-
num.partitions
配置創(chuàng)建主題時包含多少個分區(qū)碰辅,默認值為1,因為我們能增加主題的分區(qū)數(shù)介时,但是不能減少分區(qū)的個數(shù)没宾,所以,如果要讓一個主題的分區(qū)個數(shù)少于num.partitions需要手動創(chuàng)建該主題而不是通過自動創(chuàng)建主題沸柔。 -
log.retention.hours
配置kafka保留數(shù)據(jù)的時間循衰,默認為168小時也就是7天,效果等同log.retention.minutes和log.retention.ms,只是單位不一樣褐澎,分別是小時会钝,分鐘,和毫秒工三,推薦使用log.retention.ms,粒度更加細迁酸,如果三個參數(shù)都配置了則去數(shù)值最小的配置。 -
log.retention.bytes
配置一個分區(qū)能保存最大的字節(jié)數(shù)俭正,如果超出的部分就會被刪除奸鬓,同時配置了log.retention.hours/log.retention.minutes/log.retention.ms的話,任一個滿足條件都會觸發(fā)數(shù)據(jù)刪除。 -
message.max.bytes
配置消息的大小限制掸读,默認為100000串远,也就是1M,這里的大小是指在kafka壓縮后的大小儿惫,也就是說實際消息可以大于1M,如果消息超過這個限制澡罚,則會被kafka拒收。
2.producer相關配置
-
bootstrap.servers
配置broker的地址肾请,格式為host:port,如果多個則以逗號隔開,不需要配置所有的broker,producer會從給定的broker查詢其他broker的信息留搔,不過建議至少填寫兩個,以防在一個宕機的情況還能從另外一個去獲取broker的信息铛铁。 -
acks
acks指定了必須要有多少個分區(qū)副本收到消息隔显,producer才會認為消息寫入是成功的。- acks=0 producer發(fā)送消息不等待任何來自服務器的響應避归,所以會出現(xiàn)消息丟失而producer不感知的情況荣月,該模式下可獲取最大的吞吐量管呵。
- acks=1 只要集群的leader節(jié)點收到消息梳毙,producer就會收到一個服務器的成功響應,如果消息無法達到leader節(jié)點捐下,那么producer就會獲取到一個錯誤響應账锹,這時候為了避免消息的丟失萌业,producer可以選擇重發(fā),不過如果一個沒有收到消息的節(jié)點成為新的leader,那么消息還是會丟失奸柬。
- acks=all 只有當leader節(jié)點和follower節(jié)點都收到消息時,producer才會收到成功的響應生年,這是一個避免消息丟失最安全的做法,不過這種模式吞吐量最低
-
client.id
可以是任意字符串廓奕,標識消息的來源 -
max.in.flight.requests.per.connection
配置producer在收到服務器響應前可以發(fā)送的消息個數(shù)抱婉,值越高,吞吐量就會越高桌粉,不過相應的占用的內存也會越多蒸绩,設置為1可以保證消息可以按照發(fā)送的順序寫入服務,即便發(fā)生了重試铃肯。 -
max.request.size
配置producer單次發(fā)送的所有消息的總的大小限制患亿,例如設置為1M,單個消息大小為1K,那么單次可以發(fā)的上限是1000個押逼,最好跟message.max.bytes配置匹配步藕,避免發(fā)送到broker的消息被拒絕。 -
retries
該參數(shù)決定了producer可以重發(fā)消息的次數(shù),producer從broker收到的錯誤可能是臨時性的挑格,例如分區(qū)找不到首領咙冗,這種情況下,producer在進行retries次重試后就會放棄重試并且返回錯誤恕齐,默認情況下乞娄,重試的時間間隔為100ms,可以通過retry.backoff.ms參數(shù)配置显歧,建議在設置重試間隔之前最好測試一下恢復一個崩潰的節(jié)點要多長時間仪或,重試的間隔最好比恢復時間要長。 -
batch.size
當多個消息往同一個分區(qū)發(fā)送時士骤,producer會把這些消息放到同一個分區(qū)范删,該參數(shù)指定了一個批次可以使用的內存大小,按字節(jié)數(shù)計算拷肌,當批次填滿時消息就會被發(fā)送出去到旦,不過producer不一定等批次被填滿才會發(fā)送,甚至只有一個消息也會被發(fā)送巨缘,所以就算把該值設置得很大也不會造成延遲添忘,只不過會占用內存,但是如果設置太小的話若锁,producer會很頻繁的發(fā)送搁骑,增加一些額外的開銷。 -
linger.ms
指定producer發(fā)送批次之前要等待更多消息加入批次的時間,producer會在批次填滿或者longer.ms到達上限時把批次發(fā)送出去仲器,默認情況下煤率,只要有可用的線程,就算批次只有一個消息乏冀,producer也會把消息發(fā)送出去蝶糯。把linger.ms設置成比0大的數(shù),讓producer在發(fā)送批次之前多等待一會辆沦,可以使得更多的消息可以加入到批次昼捍,雖然增加了延遲,但是同時也增加了吞吐量肢扯。
3.Consumer相關配置
-
fetch.min.bytes
配置Consumer從broker獲取記錄的最小字節(jié)數(shù)端三,broker在收到Consumer的數(shù)據(jù)請求時,如果可用的數(shù)據(jù)量小于該配置鹃彻,那么broker會等到有足夠的可用數(shù)據(jù)時才把它返回給Consumer郊闯。 -
fetch.max.wait.ms
配置broker的等待時間,默認為500ms蛛株,如果沒有足夠的數(shù)據(jù)流入团赁,導致不滿足fetch.mis.bytes,最終會導致500ms的延遲谨履。如果fetch.mis.bytes配置為1M,fetch.max.wait.ms配置為500ms,那么最終broker要么返回1M的數(shù)據(jù)欢摄,要么等待500ms后返回所有可用的數(shù)據(jù),取決于哪個條件先得到滿足笋粟。 -
max.partition.fetch.bytes
配置broker從每個分區(qū)返回給Consumer的最大字節(jié)數(shù),默認為1MB,也就是說,KafkaConsumer.poll()方法從每個分區(qū)里面返回的記錄最多不超過該值怀挠,加入有10個分區(qū)5個消費者,則每個消費者需要2MB的內存來接收消息害捕,需要注意的是绿淋,如果該值設置過大,導致消費者處理的業(yè)務的時間過長尝盼,會有回話超時的風險吞滞。 -
session.timeout.ms
配置了Consumer被認定為死亡前可以與服務器斷開連接的時間,默認3秒盾沫,如果服務器在超過該值時間沒有收到Consumer的心跳裁赠,就會認定Consumer死亡,會觸發(fā)再均衡赴精,把死亡Consumer的分區(qū)分配給其他Consumer佩捞,這個配置要跟heartbeat.interval.ms配合使用,heartbeat.interval.ms設置了poll()方法向協(xié)調器發(fā)送心跳的頻率蕾哟。建議heartbeat.interval.ms的值為session.timeout.ms的三分之一一忱。 -
enable.auto.commit
指定了Consumer是否開啟自動提交偏移量啊奄,默認為true∠瞥保可以把它設置為false,由程序自己控制何時提交偏移量來避免出現(xiàn)重復數(shù)據(jù)和數(shù)據(jù)丟失的情況琼富。 -
auto.offset.reset
指定在Consumer讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下的策略(因為消費者長時間失效仪吧,包含偏移量的記錄已經過時并刪除),默認為latest鞠眉,表示會從最新的記錄開始讀取(在消費者啟動之后生成的記錄薯鼠,會出現(xiàn)漏消費歷史記錄的情況),另外一個配置是earliest械蹋,表示從最早的消息開始消費(會出現(xiàn)重復消費的情況) -
partition.assignment.strategy
分區(qū)分配給Consumer的策略出皇,有兩種:-
Range
把topic的若干個連續(xù)的分區(qū)分配給Consumer,假設有Consumer1和Consumer2哗戈,分別訂閱了topic1和topic2郊艘,每個topic都有3個分區(qū),那么Consumer1可能分配到topic1和topic2的分區(qū)0和分區(qū)1唯咬,Consumer2分配到topic1和topic2的分區(qū)2,這種策略會導致當分區(qū)數(shù)量(針對單個topic,上面例子是3個分區(qū))無法被消費者數(shù)量(上面例子是2個消費者)整除時纱注,就會出現(xiàn)分區(qū)分布不均勻的情況。
Range.png -
RoundRobin
該策略會把所有的分區(qū)逐個分配給Consumer,還是上面的例子胆胰,如果按這種策略分配那么Consumer1最終分到的是topic1的分區(qū)0狞贱,分區(qū)2和topic2的分區(qū)1,Consumer2最終分到的是topic1的分區(qū)1和topic2的分區(qū)0和分區(qū)2蜀涨。
RoundRobin.png
-
默認使用org.apache.kafka.clients.consumer.RangeAssignor瞎嬉,這個類實現(xiàn)了Range策略,RoundRabin的實現(xiàn)類為org.apache.kafka.clients.consumer.RoundRobinAssignor,我們還可以自定義策略厚柳。
-
max.poll.records
指定單次調用poll()方法返回的記錄數(shù)量氧枣。