kafka配置參數(shù)詳解

Property Default Description
broker.id 每個broker都可以用一個唯一的非負(fù)整數(shù)id進(jìn)行標(biāo)識盟劫;這個id可以作為broker的“名字”,并且它的存在使得broker無須混淆consumers就可以遷移到不同的host/port上厌殉。你可以選擇任意你喜歡的數(shù)字作為id款咖,只要id是唯一的即可。
log.dirs /tmp/kafka-logs kafka存放數(shù)據(jù)的路徑买猖。這個路徑并不是唯一的刁笙,可以是多個破花,路徑之間只需要使用逗號分隔即可谦趣;每當(dāng)創(chuàng)建新partition時,都會選擇在包含最少partitions的路徑下進(jìn)行旧乞。
port 6667 server接受客戶端連接的端口
zookeeper.connect null ZooKeeper連接字符串的格式為:hostname:port蔚润,此處hostname和port分別是ZooKeeper集群中某個節(jié)點(diǎn)的host和port;為了當(dāng)某個host宕掉之后你能通過其他ZooKeeper節(jié)點(diǎn)進(jìn)行連接尺栖,你可以按照一下方式制定多個hosts: hostname1:port1, hostname2:port2, hostname3:port3. ZooKeeper 允許你增加一個“chroot”路徑嫡纠,將集群中所有kafka數(shù)據(jù)存放在特定的路徑下。當(dāng)多個Kafka集群或者其他應(yīng)用使用相同ZooKeeper集群時延赌,可以使用這個方式設(shè)置數(shù)據(jù)存放路徑除盏。這種方式的實現(xiàn)可以通過這樣設(shè)置連接字符串格式,如下所示: hostname1:port1挫以,hostname2:port2者蠕,hostname3:port3/chroot/path 這樣設(shè)置就將所有kafka集群數(shù)據(jù)存放在/chroot/path路徑下。注意掐松,在你啟動broker之前踱侣,你必須創(chuàng)建這個路徑,并且consumers必須使用相同的連接格式大磺。
message.max.bytes 1000000 server可以接收的消息最大尺寸抡句。重要的是,consumer和producer有關(guān)這個屬性的設(shè)置必須同步杠愧,否則producer發(fā)布的消息對consumer來說太大待榔。
num.network.threads 3 server用來處理網(wǎng)絡(luò)請求的網(wǎng)絡(luò)線程數(shù)目;一般你不需要更改這個屬性流济。
num.io.threads 8 server用來處理請求的I/O線程的數(shù)目锐锣;這個線程數(shù)目至少要等于硬盤的個數(shù)。
background.threads 4 用于后臺處理的線程數(shù)目绳瘟,例如文件刪除雕憔;你不需要更改這個屬性。
queued.max.requests 500 在網(wǎng)絡(luò)線程停止讀取新請求之前糖声,可以排隊等待I/O線程處理的最大請求個數(shù)橘茉。
host.name null broker的hostname;如果hostname已經(jīng)設(shè)置的話姨丈,broker將只會綁定到這個地址上;如果沒有設(shè)置擅腰,它將綁定到所有接口蟋恬,并發(fā)布一份到ZK
advertised.host.name null 如果設(shè)置,則就作為broker 的hostname發(fā)往producer趁冈、consumers以及其他brokers
advertised.port null 此端口將給與producers歼争、consumers拜马、以及其他brokers,它會在建立連接時用到沐绒; 它僅在實際端口和server需要綁定的端口不一樣時才需要設(shè)置俩莽。
socket.send.buffer.bytes 100 * 1024 SO_SNDBUFF 緩存大小,server進(jìn)行socket 連接所用
socket.receive.buffer.bytes 100 * 1024 SO_RCVBUFF緩存大小乔遮,server進(jìn)行socket連接時所用
socket.request.max.bytes 100 * 1024 * 1024 server允許的最大請求尺寸扮超; 這將避免server溢出,它應(yīng)該小于Java heap size
num.partitions 1 如果創(chuàng)建topic時沒有給出劃分partitions個數(shù)蹋肮,這個數(shù)字將是topic下partitions數(shù)目的默認(rèn)數(shù)值出刷。
log.segment.bytes 101410241024 topic partition的日志存放在某個目錄下諸多文件中,這些文件將partition的日志切分成一段一段的坯辩;這個屬性就是每個文件的最大尺寸馁龟;當(dāng)尺寸達(dá)到這個數(shù)值時,就會創(chuàng)建新文件漆魔。此設(shè)置可以由每個topic基礎(chǔ)設(shè)置時進(jìn)行覆蓋坷檩。 查看 the per-topic configuration section
log.roll.hours 24 * 7 即使文件沒有到達(dá)log.segment.bytes,只要文件創(chuàng)建時間到達(dá)此屬性改抡,就會創(chuàng)建新文件矢炼。這個設(shè)置也可以有topic層面的設(shè)置進(jìn)行覆蓋; 查看the per-topic configuration section
log.cleanup.policy delete
log.retention.minutes和log.retention.hours 7 days 每個日志文件刪除之前保存的時間雀摘。默認(rèn)數(shù)據(jù)保存時間對所有topic都一樣裸删。 log.retention.minutes 和 log.retention.bytes 都是用來設(shè)置刪除日志文件的,無論哪個屬性已經(jīng)溢出阵赠。 這個屬性設(shè)置可以在topic基本設(shè)置時進(jìn)行覆蓋涯塔。 查看the per-topic configuration section
log.retention.bytes -1 每個topic下每個partition保存數(shù)據(jù)的總量;注意清蚀,這是每個partitions的上限匕荸,因此這個數(shù)值乘以partitions的個數(shù)就是每個topic保存的數(shù)據(jù)總量。同時注意:如果log.retention.hours和log.retention.bytes都設(shè)置了枷邪,則超過了任何一個限制都會造成刪除一個段文件榛搔。 注意,這項設(shè)置可以由每個topic設(shè)置時進(jìn)行覆蓋东揣。 查看the per-topic configuration section
log.retention.check.interval.ms 5 minutes 檢查日志分段文件的間隔時間践惑,以確定是否文件屬性是否到達(dá)刪除要求。
log.cleaner.enable false 當(dāng)這個屬性設(shè)置為false時嘶卧,一旦日志的保存時間或者大小達(dá)到上限時尔觉,就會被刪除;如果設(shè)置為true芥吟,則當(dāng)保存屬性達(dá)到上限時侦铜,就會進(jìn)行log compaction专甩。
log.cleaner.threads 1 進(jìn)行日志壓縮的線程數(shù)
log.cleaner.io.max.bytes.per.second None 進(jìn)行l(wèi)og compaction時,log cleaner可以擁有的最大I/O數(shù)目钉稍。這項設(shè)置限制了cleaner涤躲,以避免干擾活動的請求服務(wù)。
log.cleaner.io.buffer.size 50010241024 log cleaner清除過程中針對日志進(jìn)行索引化以及精簡化所用到的緩存大小。最好設(shè)置大點(diǎn),以提供充足的內(nèi)存沦疾。
log.cleaner.io.buffer.load.factor 512*1024 進(jìn)行l(wèi)og cleaning時所需要的I/O chunk尺寸铣卡。你不需要更改這項設(shè)置。
log.cleaner.io.buffer.load.factor 0.9 log cleaning中所使用的hash表的負(fù)載因子;你不需要更改這個選項。
log.cleaner.backoff.ms 15000 進(jìn)行日志是否清理檢查的時間間隔
log.cleaner.min.cleanable.ratio 0.5 這項配置控制log compactor試圖清理日志的頻率(假定log compaction是打開的)。默認(rèn)避免清理壓縮超過50%的日志俐镐。這個比率綁定了備份日志所消耗的最大空間(50%的日志備份時壓縮率為50%)。更高的比率則意味著浪費(fèi)消耗更少哺哼,也就可以更有效的清理更多的空間佩抹。這項設(shè)置在每個topic設(shè)置中可以覆蓋。 查看the per-topic configuration section取董。
log.cleaner.delete.retention.ms 1day 保存時間棍苹;保存壓縮日志的最長時間;也是客戶端消費(fèi)消息的最長時間茵汰,榮log.retention.minutes的區(qū)別在于一個控制未壓縮數(shù)據(jù)枢里,一個控制壓縮后的數(shù)據(jù);會被topic創(chuàng)建時的指定時間覆蓋蹂午。
log.index.size.max.bytes 1010241024 每個log segment的最大尺寸栏豺。注意,如果log尺寸達(dá)到這個數(shù)值豆胸,即使尺寸沒有超過log.segment.bytes限制奥洼,也需要產(chǎn)生新的log segment。
log.index.interval.bytes 4096 當(dāng)執(zhí)行一次fetch后晚胡,需要一定的空間掃描最近的offset灵奖,設(shè)置的越大越好,一般使用默認(rèn)值就可以
log.flush.interval.messages Long.MaxValue log文件“sync”到磁盤之前累積的消息條數(shù)估盘。因為磁盤IO操作是一個慢操作瓷患,但又是一個“數(shù)據(jù)可靠性”的必要手段,所以檢查是否需要固化到硬盤的時間間隔遣妥。需要在“數(shù)據(jù)可靠性”與“性能”之間做必要的權(quán)衡擅编,如果此值過大,將會導(dǎo)致每次“發(fā)sync”的時間過長(IO阻塞)燥透,如果此值過小沙咏,將會導(dǎo)致“fsync”的時間較長(IO阻塞),如果此值過小班套,將會導(dǎo)致”發(fā)sync“的次數(shù)較多肢藐,這也就意味著整體的client請求有一定的延遲,物理server故障吱韭,將會導(dǎo)致沒有fsync的消息丟失吆豹。
log.flush.scheduler.interval.ms Long.MaxValue 檢查是否需要fsync的時間間隔
log.flush.interval.ms Long.MaxValue 僅僅通過interval來控制消息的磁盤寫入時機(jī),是不足的理盆,這個數(shù)用來控制”fsync“的時間間隔痘煤,如果消息量始終沒有達(dá)到固化到磁盤的消息數(shù),但是離上次磁盤同步的時間間隔達(dá)到閾值猿规,也將觸發(fā)磁盤同步衷快。
log.delete.delay.ms 60000 文件在索引中清除后的保留時間,一般不需要修改
auto.create.topics.enable true 是否允許自動創(chuàng)建topic姨俩。如果是真的蘸拔,則produce或者fetch 不存在的topic時,會自動創(chuàng)建這個topic环葵。否則需要使用命令行創(chuàng)建topic
controller.socket.timeout.ms 30000 partition管理控制器進(jìn)行備份時调窍,socket的超時時間。
controller.message.queue.size Int.MaxValue controller-to-broker-channles的buffer 尺寸
default.replication.factor 1 默認(rèn)備份份數(shù)张遭,僅指自動創(chuàng)建的topics
replica.lag.time.max.ms 10000 如果一個follower在這個時間內(nèi)沒有發(fā)送fetch請求邓萨,leader將從ISR重移除這個follower,并認(rèn)為這個follower已經(jīng)掛了
replica.lag.max.messages 4000 如果一個replica沒有備份的條數(shù)超過這個數(shù)值菊卷,則leader將移除這個follower缔恳,并認(rèn)為這個follower已經(jīng)掛了
replica.socket.timeout.ms 30*1000 leader 備份數(shù)據(jù)時的socket網(wǎng)絡(luò)請求的超時時間
replica.socket.receive.buffer.bytes 64*1024 備份時向leader發(fā)送網(wǎng)絡(luò)請求時的socket receive buffer
replica.fetch.max.bytes 1024*1024 備份時每次fetch的最大值
replica.fetch.min.bytes 500 leader發(fā)出備份請求時,數(shù)據(jù)到達(dá)leader的最長等待時間
replica.fetch.min.bytes 1 備份時每次fetch之后回應(yīng)的最小尺寸
num.replica.fetchers 1 從leader備份數(shù)據(jù)的線程數(shù)
replica.high.watermark.checkpoint.interval.ms 5000 每個replica檢查是否將最高水位進(jìn)行固化的頻率
fetch.purgatory.purge.interval.requests 1000 fetch 請求清除時的清除間隔
producer.purgatory.purge.interval.requests 1000 producer請求清除時的清除間隔
zookeeper.session.timeout.ms 6000 zookeeper會話超時時間的烁。
zookeeper.connection.timeout.ms 6000 客戶端等待和zookeeper建立連接的最大時間
zookeeper.sync.time.ms 2000 zk follower落后于zk leader的最長時間
controlled.shutdown.enable true 是否能夠控制broker的關(guān)閉褐耳。如果能夠,broker將可以移動所有l(wèi)eaders到其他的broker上渴庆,在關(guān)閉之前铃芦。這減少了不可用性在關(guān)機(jī)過程中。
controlled.shutdown.max.retries 3 在執(zhí)行不徹底的關(guān)機(jī)之前襟雷,可以成功執(zhí)行關(guān)機(jī)的命令數(shù)刃滓。
controlled.shutdown.retry.backoff.ms 5000 在關(guān)機(jī)之間的backoff時間
auto.leader.rebalance.enable true 如果這是true,控制者將會自動平衡brokers對于partitions的leadership
leader.imbalance.per.broker.percentage 10 每個broker所允許的leader最大不平衡比率
leader.imbalance.check.interval.seconds 300 檢查leader不平衡的頻率
offset.metadata.max.bytes 4096 允許客戶端保存他們offsets的最大個數(shù)
max.connections.per.ip Int.MaxValue 每個ip地址上每個broker可以被連接的最大數(shù)目
max.connections.per.ip.overrides 每個ip或者h(yuǎn)ostname默認(rèn)的連接的最大覆蓋
connections.max.idle.ms 600000 空連接的超時限制
log.roll.jitter.{ms,hours} 0 從logRollTimeMillis抽離的jitter最大數(shù)目
num.recovery.threads.per.data.dir 1 每個數(shù)據(jù)目錄用來日志恢復(fù)的線程數(shù)目
unclean.leader.election.enable true 指明了是否能夠使不在ISR中replicas設(shè)置用來作為leader
delete.topic.enable false 能夠刪除topic
offsets.topic.num.partitions 50 The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).
offsets.topic.retention.minutes 1440 存在時間超過這個時間限制的offsets都將被標(biāo)記為待刪除
offsets.retention.check.interval.ms 600000 offset管理器檢查陳舊offsets的頻率
offsets.topic.replication.factor 3 topic的offset的備份份數(shù)耸弄。建議設(shè)置更高的數(shù)字保證更高的可用性
offset.topic.segment.bytes 104857600 offsets topic的segment尺寸咧虎。
offsets.load.buffer.size 5242880 這項設(shè)置與批量尺寸相關(guān),當(dāng)從offsets segment中讀取時使用计呈。
offsets.commit.required.acks -1 在offset commit可以接受之前砰诵,需要設(shè)置確認(rèn)的數(shù)目征唬,一般不需要更改

offsets.commit.timeout.ms 5000 offset commit的延遲時間,這和producer request的超時時間相似茁彭。

更多細(xì)節(jié)可以在scala 類 kafka.server.KafkaConfig中找到总寒。

topic-level的配置

有關(guān)topics的配置既有全局的又有每個topic獨(dú)有的配置。如果沒有給定特定topic設(shè)置理肺,則應(yīng)用默認(rèn)的全局設(shè)置摄闸。這些覆蓋會在每次創(chuàng)建topic發(fā)生。下面的例子:創(chuàng)建一個topic妹萨,命名為my-topic年枕,自定義最大消息尺寸以及刷新比率為:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

需要刪除重寫時,可以按照以下來做:

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
--deleteConfig max.message.bytes

以下是topic-level的配置選項乎完。server的默認(rèn)配置在Server Default Property列下給出了熏兄,設(shè)定這些默認(rèn)值不會改變原有的設(shè)置

Property Default Server Default Property Description
cleanup.policy delete log.cleanup.policy 要么是”delete“要么是”compact“; 這個字符串指明了針對舊日志部分的利用方式囱怕;默認(rèn)方式("delete")將會丟棄舊的部分當(dāng)他們的回收時間或者尺寸限制到達(dá)時霍弹。”compact“將會進(jìn)行日志壓縮
delete.retention.ms 86400000 (24 hours) log.cleaner.delete.retention.ms 對于壓縮日志保留的最長時間娃弓,也是客戶端消費(fèi)消息的最長時間典格,通log.retention.minutes的區(qū)別在于一個控制未壓縮數(shù)據(jù),一個控制壓縮后的數(shù)據(jù)台丛。此項配置可以在topic創(chuàng)建時的置頂參數(shù)覆蓋
flush.messages None log.flush.interval.messages 此項配置指定時間間隔:強(qiáng)制進(jìn)行fsync日志耍缴。例如,如果這個選項設(shè)置為1挽霉,那么每條消息之后都需要進(jìn)行fsync防嗡,如果設(shè)置為5,則每5條消息就需要進(jìn)行一次fsync侠坎。一般來說蚁趁,建議你不要設(shè)置這個值。此參數(shù)的設(shè)置,需要在"數(shù)據(jù)可靠性"與"性能"之間做必要的權(quán)衡.如果此值過大,將會導(dǎo)致每次"fsync"的時間較長(IO阻塞),如果此值過小,將會導(dǎo)致"fsync"的次數(shù)較多,這也意味著整體的client請求有一定的延遲.物理server故障,將會導(dǎo)致沒有fsync的消息丟失.
flush.ms None log.flush.interval.ms 此項配置用來置頂強(qiáng)制進(jìn)行fsync日志到磁盤的時間間隔实胸;例如他嫡,如果設(shè)置為1000,那么每1000ms就需要進(jìn)行一次fsync庐完。一般不建議使用這個選項
index.interval.bytes 4096 log.index.interval.bytes 默認(rèn)設(shè)置保證了我們每4096個字節(jié)就對消息添加一個索引钢属,更多的索引使得閱讀的消息更加靠近,但是索引規(guī)模卻會由此增大门躯;一般不需要改變這個選項
max.message.bytes 1000000 max.message.bytes kafka追加消息的最大尺寸淆党。注意如果你增大這個尺寸,你也必須增大你consumer的fetch 尺寸,這樣consumer才能fetch到這些最大尺寸的消息染乌。
min.cleanable.dirty.ratio 0.5 min.cleanable.dirty.ratio 此項配置控制log壓縮器試圖進(jìn)行清除日志的頻率山孔。默認(rèn)情況下,將避免清除壓縮率超過50%的日志荷憋。這個比率避免了最大的空間浪費(fèi)
min.insync.replicas 1 min.insync.replicas 當(dāng)producer設(shè)置request.required.acks為-1時饱须,min.insync.replicas指定replicas的最小數(shù)目(必須確認(rèn)每一個repica的寫數(shù)據(jù)都是成功的),如果這個數(shù)目沒有達(dá)到台谊,producer會產(chǎn)生異常。
retention.bytes None log.retention.bytes 如果使用“delete”的retention 策略譬挚,這項配置就是指在刪除日志之前锅铅,日志所能達(dá)到的最大尺寸。默認(rèn)情況下减宣,沒有尺寸限制而只有時間限制
retention.ms 7 days log.retention.minutes 如果使用“delete”的retention策略盐须,這項配置就是指刪除日志前日志保存的時間。
segment.bytes 1GB log.segment.bytes kafka中l(wèi)og日志是分成一塊塊存儲的漆腌,此配置是指log日志劃分成塊的大小
segment.index.bytes 10MB log.index.size.max.bytes 此配置是有關(guān)offsets和文件位置之間映射的索引文件的大性舻恕;一般不需要修改這個配置
segment.ms 7 days log.roll.hours 即使log的分塊文件沒有達(dá)到需要刪除闷尿、壓縮的大小塑径,一旦log 的時間達(dá)到這個上限,就會強(qiáng)制新建一個log分塊文件
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.

3.2 Consumer Configs
consumer基本配置如下:
group.id
zookeeper.connect

Property Default Description
group.id 用來唯一標(biāo)識consumer進(jìn)程所在組的字符串填具,如果設(shè)置同樣的group id统舀,表示這些processes都是屬于同一個consumer group
zookeeper.connect 指定zookeeper的連接的字符串,格式是hostname:port劳景,此處host和port都是zookeeper server的host和port誉简,為避免某個zookeeper 機(jī)器宕機(jī)之后失聯(lián),你可以指定多個hostname:port盟广,使用逗號作為分隔: hostname1:port1闷串,hostname2:port2,hostname3:port3 可以在zookeeper連接字符串中加入zookeeper的chroot路徑筋量,此路徑用于存放他自己的數(shù)據(jù)烹吵,方式: hostname1:port1,hostname2:port2毛甲,hostname3:port3/chroot/path
consumer.id null 不需要設(shè)置年叮,一般自動產(chǎn)生
socket.timeout.ms 30*100 網(wǎng)絡(luò)請求的超時限制。真實的超時限制是 max.fetch.wait+socket.timeout.ms
socket.receive.buffer.bytes 64*1024 socket用于接收網(wǎng)絡(luò)請求的緩存大小
fetch.message.max.bytes 1024*1024 每次fetch請求中玻募,針對每次fetch消息的最大字節(jié)數(shù)只损。這些字節(jié)將會督導(dǎo)用于每個partition的內(nèi)存中,因此,此設(shè)置將會控制consumer所使用的memory大小跃惫。這個fetch請求尺寸必須至少和server允許的最大消息尺寸相等叮叹,否則,producer可能發(fā)送的消息尺寸大于consumer所能消耗的尺寸爆存。
num.consumer.fetchers 1 用于fetch數(shù)據(jù)的fetcher線程數(shù)
auto.commit.enable true 如果為真蛉顽,consumer所fetch的消息的offset將會自動的同步到zookeeper。這項提交的offset將在進(jìn)程掛掉時先较,由新的consumer使用
auto.commit.interval.ms 60*1000 consumer向zookeeper提交offset的頻率携冤,單位是秒
queued.max.message.chunks 2 用于緩存消息的最大數(shù)目,以供consumption闲勺。每個chunk必須和fetch.message.max.bytes相同
rebalance.max.retries 4 當(dāng)新的consumer加入到consumer group時曾棕,consumers集合試圖重新平衡分配到每個consumer的partitions數(shù)目。如果consumers集合改變了菜循,當(dāng)分配正在執(zhí)行時翘地,這個重新平衡會失敗并重入
fetch.min.bytes 1 每次fetch請求時,server應(yīng)該返回的最小字節(jié)數(shù)癌幕。如果沒有足夠的數(shù)據(jù)返回衙耕,請求會等待,直到足夠的數(shù)據(jù)才會返回勺远。
fetch.wait.max.ms 100 如果沒有足夠的數(shù)據(jù)能夠滿足fetch.min.bytes橙喘,則此項配置是指在應(yīng)答fetch請求之前,server會阻塞的最大時間胶逢。
rebalance.backoff.ms 2000 在重試reblance之前backoff時間
refresh.leader.backoff.ms 200 在試圖確定某個partition的leader是否失去他的leader地位之前渴杆,需要等待的backoff時間
auto.offset.reset largest zookeeper中沒有初始化的offset時,如果offset是以下值的回應(yīng): smallest:自動復(fù)位offset為smallest的offset largest:自動復(fù)位offset為largest的offset anything else:向consumer拋出異常
consumer.timeout.ms -1 如果沒有消息可用宪塔,即使等待特定的時間之后也沒有磁奖,則拋出超時異常
exclude.internal.topics true 是否將內(nèi)部topics的消息暴露給consumer
paritition.assignment.strategy range 選擇向consumer 流分配partitions的策略,可選值:range某筐,roundrobin
client.id group id value 是用戶特定的字符串比搭,用來在每次請求中幫助跟蹤調(diào)用。它應(yīng)該可以邏輯上確認(rèn)產(chǎn)生這個請求的應(yīng)用
zookeeper.session.timeout.ms 6000 zookeeper 會話的超時限制南誊。如果consumer在這段時間內(nèi)沒有向zookeeper發(fā)送心跳信息身诺,則它會被認(rèn)為掛掉了,并且reblance將會產(chǎn)生
zookeeper.connection.timeout.ms 6000 客戶端在建立通zookeeper連接中的最大等待時間
zookeeper.sync.time.ms 2000 ZK follower可以落后ZK leader的最大時間
offsets.storage zookeeper 用于存放offsets的地點(diǎn): zookeeper或者kafka
offset.channel.backoff.ms 1000 重新連接offsets channel或者是重試失敗的offset的fetch/commit請求的backoff時間
offsets.channel.socket.timeout.ms 10000 當(dāng)讀取offset的fetch/commit請求回應(yīng)的socket 超時限制抄囚。此超時限制是被consumerMetadata請求用來請求offset管理
offsets.commit.max.retries 5 重試offset commit的次數(shù)霉赡。這個重試只應(yīng)用于offset commits在shut-down之間。他
dual.commit.enabled true 如果使用“kafka”作為offsets.storage幔托,你可以二次提交offset到zookeeper(還有一次是提交到kafka)穴亏。在zookeeper-based的offset storage到kafka-based的offset storage遷移時蜂挪,這是必須的。對任意給定的consumer group來說嗓化,比較安全的建議是當(dāng)完成遷移之后就關(guān)閉這個選項
partition.assignment.strategy range 在“range”和“roundrobin”策略之間選擇一種作為分配partitions給consumer 數(shù)據(jù)流的策略棠涮; 循環(huán)的partition分配器分配所有可用的partitions以及所有可用consumer 線程。它會將partition循環(huán)的分配到consumer線程上刺覆。如果所有consumer實例的訂閱都是確定的严肪,則partitions的劃分是確定的分布。循環(huán)分配策略只有在以下條件滿足時才可以:(1)每個topic在每個consumer實力上都有同樣數(shù)量的數(shù)據(jù)流谦屑。(2)訂閱的topic的集合對于consumer group中每個consumer實例來說都是確定的驳糯。

更多細(xì)節(jié)可以查看 scala類: kafka.consumer.ConsumerConfig

3.3 Producer Configs
producer基本的配置屬性包含:
(1) metadata.broker.list
(2)request.required.acks
(3)producer.type
(4)serializer.class

Property Default Description
metadata.broker.list 服務(wù)于bootstrapping。producer僅用來獲取metadata(topics氢橙,partitions结窘,replicas)。發(fā)送實際數(shù)據(jù)的socket連接將基于返回的metadata數(shù)據(jù)信息而建立充蓝。格式是: host1:port1,host2:port2 這個列表可以是brokers的子列表或者是一個指向brokers的VIP
request.required.acks 0 此配置是表明當(dāng)一次produce請求被認(rèn)為完成時的確認(rèn)值喉磁。特別是谓苟,多少個其他brokers必須已經(jīng)提交了數(shù)據(jù)到他們的log并且向他們的leader確認(rèn)了這些信息。典型的值包括: 0: 表示producer從來不等待來自broker的確認(rèn)信息(和0.7一樣的行為)协怒。這個選擇提供了最小的時延但同時風(fēng)險最大(因為當(dāng)server宕機(jī)時涝焙,數(shù)據(jù)將會丟失)。 1:表示獲得leader replica已經(jīng)接收了數(shù)據(jù)的確認(rèn)信息孕暇。這個選擇時延較小同時確保了server確認(rèn)接收成功仑撞。 -1:producer會獲得所有同步replicas都收到數(shù)據(jù)的確認(rèn)。同時時延最大妖滔,然而隧哮,這種方式并沒有完全消除丟失消息的風(fēng)險,因為同步replicas的數(shù)量可能是1.如果你想確保某些replicas接收到數(shù)據(jù)座舍,那么你應(yīng)該在topic-level設(shè)置中選項min.insync.replicas設(shè)置一下沮翔。請閱讀一下設(shè)計文檔,可以獲得更深入的討論曲秉。
request.timeout.ms 10000 broker盡力實現(xiàn)request.required.acks需求時的等待時間采蚀,否則會發(fā)送錯誤到客戶端
producer.type sync 此選項置頂了消息是否在后臺線程中異步發(fā)送。正確的值: (1) async: 異步發(fā)送 (2) sync: 同步發(fā)送 通過將producer設(shè)置為異步承二,我們可以批量處理請求(有利于提高吞吐率)但是這也就造成了客戶端機(jī)器丟掉未發(fā)送數(shù)據(jù)的可能性
serializer.class kafka.serializer.DefaultEncoder 消息的序列化類別榆鼠。默認(rèn)編碼器輸入一個字節(jié)byte[],然后返回相同的字節(jié)byte[]
key.serializer.class 關(guān)鍵字的序列化類亥鸠。如果沒給與這項妆够,默認(rèn)情況是和消息一致
partitioner.class kafka.producer.DefaultPartitioner partitioner 類,用于在subtopics之間劃分消息。默認(rèn)partitioner基于key的hash表
compression.codec none 此項參數(shù)可以設(shè)置壓縮數(shù)據(jù)的codec责静,可選codec為:“none”袁滥, “gzip”, “snappy”
compressed.topics null 此項參數(shù)可以設(shè)置某些特定的topics是否進(jìn)行壓縮灾螃。如果壓縮codec是NoCompressCodec之外的codec题翻,則對指定的topics數(shù)據(jù)應(yīng)用這些codec。如果壓縮topics列表是空腰鬼,則將特定的壓縮codec應(yīng)用于所有topics嵌赠。如果壓縮的codec是NoCompressionCodec,壓縮對所有topics軍不可用熄赡。
message.send.max.retries 3 此項參數(shù)將使producer自動重試失敗的發(fā)送請求姜挺。此項參數(shù)將置頂重試的次數(shù)。注意:設(shè)定非0值將導(dǎo)致重復(fù)某些網(wǎng)絡(luò)錯誤:引起一條發(fā)送并引起確認(rèn)丟失
retry.backoff.ms 100 在每次重試之前彼硫,producer會更新相關(guān)topic的metadata炊豪,以此進(jìn)行查看新的leader是否分配好了。因為leader的選擇需要一點(diǎn)時間拧篮,此選項指定更新metadata之前producer需要等待的時間词渤。
topic.metadata.refresh.interval.ms 600*1000 producer一般會在某些失敗的情況下(partition missing,leader不可用等)更新topic的metadata串绩。他將會規(guī)律的循環(huán)缺虐。如果你設(shè)置為負(fù)值,metadata只有在失敗的情況下才更新礁凡。如果設(shè)置為0高氮,metadata會在每次消息發(fā)送后就會更新(不建議這種選擇,系統(tǒng)消耗太大)顷牌。重要提示: 更新是有在消息發(fā)送后才會發(fā)生剪芍,因此,如果producer從來不發(fā)送消息窟蓝,則metadata從來也不會更新紊浩。
queue.buffering.max.ms 5000 當(dāng)應(yīng)用async模式時,用戶緩存數(shù)據(jù)的最大時間間隔疗锐。例如坊谁,設(shè)置為100時,將會批量處理100ms之內(nèi)消息滑臊。這將改善吞吐率口芍,但是會增加由于緩存產(chǎn)生的延遲。
queue.buffering.max.messages 10000 當(dāng)使用async模式時雇卷,在在producer必須被阻塞或者數(shù)據(jù)必須丟失之前鬓椭,可以緩存到隊列中的未發(fā)送的最大消息條數(shù)
batch.num.messages 200 使用async模式時颠猴,可以批量處理消息的最大條數(shù)⌒∪荆或者消息數(shù)目已到達(dá)這個上線或者是queue.buffer.max.ms到達(dá)翘瓮,producer才會處理
send.buffer.bytes 100*1024 socket 寫緩存尺寸
client.id “” 這個client id是用戶特定的字符串,在每次請求中包含用來追蹤調(diào)用裤翩,他應(yīng)該邏輯上可以確認(rèn)是那個應(yīng)用發(fā)出了這個請求资盅。

更多細(xì)節(jié)需要查看 scala類
kafka.producer.ProducerConfig

3、4 New Producer Configs
我們正在努力替換現(xiàn)有的producer踊赠。代碼在trunk中是可用的呵扛,可以認(rèn)為beta版本。下面是新producer的配置

Name Type Default Importance Description
boostrap.servers list high 用于建立與kafka集群連接的host/port組筐带。數(shù)據(jù)將會在所有servers上均衡加載今穿,不管哪些server是指定用于bootstrapping。這個列表僅僅影響初始化的hosts(用于發(fā)現(xiàn)全部的servers)伦籍。這個列表格式: host1:port1,host2:port2,... 因為這些server僅僅是用于初始化的連接蓝晒,以發(fā)現(xiàn)集群所有成員關(guān)系(可能會動態(tài)的變化),這個列表不需要包含所有的servers(你可能想要不止一個server帖鸦,盡管這樣芝薇,可能某個server宕機(jī)了)。如果沒有server在這個列表出現(xiàn)富蓄,則發(fā)送數(shù)據(jù)會一直失敗,直到列表可用慢逾。
acks string 1 high producer需要server接收到數(shù)據(jù)之后發(fā)出的確認(rèn)接收的信號立倍,此項配置就是指procuder需要多少個這樣的確認(rèn)信號。此配置實際上代表了數(shù)據(jù)備份的可用性侣滩。以下設(shè)置為常用選項: (1)acks=0: 設(shè)置為0表示producer不需要等待任何確認(rèn)收到的信息口注。副本將立即加到socket buffer并認(rèn)為已經(jīng)發(fā)送。沒有任何保障可以保證此種情況下server已經(jīng)成功接收數(shù)據(jù)君珠,同時重試配置不會發(fā)生作用(因為客戶端不知道是否失斍拗尽)回饋的offset會總是設(shè)置為-1; (2)acks=1: 這意味著至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log策添,但是并沒有等待所有follower是否成功寫入材部。這種情況下,如果follower沒有成功備份數(shù)據(jù)唯竹,而此時leader又掛掉乐导,則消息會丟失。 (3)acks=all: 這意味著leader需要等待所有備份都成功寫入日志浸颓,這種策略會保證只要有一個備份存活就不會丟失數(shù)據(jù)物臂。這是最強(qiáng)的保證旺拉。 (4)其他的設(shè)置,例如acks=2也是可以的棵磷,這將需要給定的acks數(shù)量蛾狗,但是這種策略一般很少用。
buffer.memory long 33554432 high producer可以用來緩存數(shù)據(jù)的內(nèi)存大小仪媒。如果數(shù)據(jù)產(chǎn)生速度大于向broker發(fā)送的速度沉桌,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明规丽。 這項設(shè)置將和producer能夠使用的總內(nèi)存相關(guān)蒲牧,但并不是一個硬性的限制,因為不是producer使用的所有內(nèi)存都是用于緩存赌莺。一些額外的內(nèi)存會用于壓縮(如果引入壓縮機(jī)制)冰抢,同樣還有一些用于維護(hù)請求。
compression.type string none high producer用于壓縮數(shù)據(jù)的壓縮類型艘狭。默認(rèn)是無壓縮挎扰。正確的選項值是none、gzip巢音、snappy遵倦。 壓縮最好用于批量處理,批量處理消息越多官撼,壓縮性能越好梧躺。
retries int 0 high 設(shè)置大于0的值將使客戶端重新發(fā)送任何數(shù)據(jù),一旦這些數(shù)據(jù)發(fā)送失敗傲绣。注意掠哥,這些重試與客戶端接收到發(fā)送錯誤時的重試沒有什么不同。允許重試將潛在的改變數(shù)據(jù)的順序秃诵,如果這兩個消息記錄都是發(fā)送到同一個partition续搀,則第一個消息失敗第二個發(fā)送成功,則第二條消息會比第一條消息出現(xiàn)要早菠净。
batch.size int 16384 medium producer將試圖批處理消息記錄禁舷,以減少請求次數(shù)。這將改善client與server之間的性能毅往。這項配置控制默認(rèn)的批量處理消息字節(jié)數(shù)牵咙。 不會試圖處理大于這個字節(jié)數(shù)的消息字節(jié)數(shù)。 發(fā)送到brokers的請求將包含多個批量處理攀唯,其中會包含對每個partition的一個請求霜大。 較小的批量處理數(shù)值比較少用,并且可能降低吞吐量(0則會僅用批量處理)革答。較大的批量處理數(shù)值將會浪費(fèi)更多內(nèi)存空間战坤,這樣就需要分配特定批量處理數(shù)值的內(nèi)存大小曙强。
client.id string medium 當(dāng)向server發(fā)出請求時,這個字符串會發(fā)送給server途茫。目的是能夠追蹤請求源頭碟嘴,以此來允許ip/port許可列表之外的一些應(yīng)用可以發(fā)送信息。這項應(yīng)用可以設(shè)置任意字符串囊卜,因為沒有任何功能性的目的娜扇,除了記錄和跟蹤
linger.ms long 0 medium producer組將會匯總?cè)魏卧谡埱笈c發(fā)送之間到達(dá)的消息記錄一個單獨(dú)批量的請求。通常來說栅组,這只有在記錄產(chǎn)生速度大于發(fā)送速度的時候才能發(fā)生雀瓢。然而,在某些條件下玉掸,客戶端將希望降低請求的數(shù)量刃麸,甚至降低到中等負(fù)載一下。這項設(shè)置將通過增加小的延遲來完成--即司浪,不是立即發(fā)送一條記錄泊业,producer將會等待給定的延遲時間以允許其他消息記錄發(fā)送,這些消息記錄可以批量處理啊易。這可以認(rèn)為是TCP種Nagle的算法類似吁伺。這項設(shè)置設(shè)定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即發(fā)送而不顧這項設(shè)置租谈,然而如果我們獲得消息字節(jié)數(shù)比這項設(shè)置要小的多篮奄,我們需要“l(fā)inger”特定的時間以獲取更多的消息。 這個設(shè)置默認(rèn)為0割去,即沒有延遲窟却。設(shè)定linger.ms=5,例如劫拗,將會減少請求數(shù)目间校,但是同時會增加5ms的延遲矾克。
max.request.size int 1028576 medium 請求的最大字節(jié)數(shù)页慷。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋胁附,這些尺寸和這個設(shè)置不同酒繁。此項設(shè)置將會限制producer每次批量發(fā)送請求的數(shù)目,以防發(fā)出巨量的請求控妻。
receive.buffer.bytes int 32768 medium TCP receive緩存大小州袒,當(dāng)閱讀數(shù)據(jù)時使用
send.buffer.bytes int 131072 medium TCP send緩存大小,當(dāng)發(fā)送數(shù)據(jù)時使用
timeout.ms int 30000 medium 此配置選項控制server等待來自followers的確認(rèn)的最大時間弓候。如果確認(rèn)的請求數(shù)目在此時間內(nèi)沒有實現(xiàn)郎哭,則會返回一個錯誤他匪。這個超時限制是以server端度量的,沒有包含請求的網(wǎng)絡(luò)延遲
block.on.buffer.full boolean true low 當(dāng)我們內(nèi)存緩存用盡時夸研,必須停止接收新消息記錄或者拋出錯誤邦蜜。默認(rèn)情況下,這個設(shè)置為真亥至,然而某些阻塞可能不值得期待悼沈,因此立即拋出錯誤更好。設(shè)置為false則會這樣:producer會拋出一個異常錯誤:BufferExhaustedException姐扮, 如果記錄已經(jīng)發(fā)送同時緩存已滿
metadata.fetch.timeout.ms long 60000 low 是指我們所獲取的一些元素?fù)?jù)的第一個時間數(shù)據(jù)絮供。元素?fù)?jù)包含:topic,host茶敏,partitions壤靶。此項配置是指當(dāng)?shù)却負(fù)?jù)fetch成功完成所需要的時間,否則會跑出異常給客戶端睡榆。
metadata.max.age.ms long 300000 low 以微秒為單位的時間萍肆,是在我們強(qiáng)制更新metadata的時間間隔。即使我們沒有看到任何partition leadership改變胀屿。
metric.reporters list [] low 類的列表塘揣,用于衡量指標(biāo)。實現(xiàn)MetricReporter接口宿崭,將允許增加一些類亲铡,這些類在新的衡量指標(biāo)產(chǎn)生時就會改變。JmxReporter總會包含用于注冊JMX統(tǒng)計
metrics.num.samples int 2 low 用于維護(hù)metrics的樣本數(shù)
metrics.sample.window.ms long 30000 low metrics系統(tǒng)維護(hù)可配置的樣本數(shù)量葡兑,在一個可修正的window size奖蔓。這項配置配置了窗口大小,例如讹堤。我們可能在30s的期間維護(hù)兩個樣本吆鹤。當(dāng)一個窗口推出后,我們會擦除并重寫最老的窗口
recoonect.backoff.ms long 10 low 連接失敗時洲守,當(dāng)我們重新連接時的等待時間疑务。這避免了客戶端反復(fù)重連
retry.backoff.ms long 100 low 在試圖重試失敗的produce請求之前的等待時間。避免陷入發(fā)送-失敗的死循環(huán)中梗醇。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末知允,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子叙谨,更是在濱河造成了極大的恐慌温鸽,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件手负,死亡現(xiàn)場離奇詭異涤垫,居然都是意外死亡姑尺,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門蝠猬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來股缸,“玉大人,你說我怎么就攤上這事吱雏《匾觯” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵歧杏,是天一觀的道長镰惦。 經(jīng)常有香客問我,道長犬绒,這世上最難降的妖魔是什么旺入? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮凯力,結(jié)果婚禮上茵瘾,老公的妹妹穿的比我還像新娘。我一直安慰自己咐鹤,他們只是感情好拗秘,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著祈惶,像睡著了一般雕旨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上捧请,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天凡涩,我揣著相機(jī)與錄音,去河邊找鬼疹蛉。 笑死捷绒,一個胖子當(dāng)著我的面吹牛携丁,可吹牛的內(nèi)容都是我干的磁滚。 我是一名探鬼主播逼庞,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼筑舅!你這毒婦竟也來了座慰?” 一聲冷哼從身側(cè)響起陨舱,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤翠拣,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后游盲,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體误墓,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蛮粮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了谜慌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片然想。...
    茶點(diǎn)故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖欣范,靈堂內(nèi)的尸體忽然破棺而出变泄,到底是詐尸還是另有隱情,我是刑警寧澤恼琼,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布妨蛹,位于F島的核電站,受9級特大地震影響晴竞,放射性物質(zhì)發(fā)生泄漏蛙卤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一噩死、第九天 我趴在偏房一處隱蔽的房頂上張望颤难。 院中可真熱鬧,春花似錦已维、人聲如沸行嗤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽昂验。三九已至,卻和暖如春艾扮,著一層夾襖步出監(jiān)牢的瞬間既琴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工泡嘴, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留甫恩,地道東北人。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓酌予,卻偏偏與公主長得像磺箕,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子抛虫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容