一、kakfa整體架構(gòu)
二淆两、術(shù)語科普
producer & consumer
producer 是生產(chǎn)者锯玛,負(fù)責(zé)消息生產(chǎn),上游程序中按照標(biāo)準(zhǔn)的消息格式組裝(按照每個(gè)消息事件的字段定義)發(fā)送到指定的topic师骗。producer生產(chǎn)消息的時(shí)候,不會(huì)因?yàn)閏onsumer處理能力不夠讨惩,而阻塞producer的生產(chǎn)辟癌。consumer會(huì)從指定的topic 拉取消息,然后處理消費(fèi)步脓,并提交offset(消息處理偏移量,消費(fèi)掉的消息并不會(huì)主動(dòng)刪除,而是kafka系統(tǒng)根據(jù)保存周期自動(dòng)消除)靴患。
Topic
topic是消費(fèi)分類存儲(chǔ)的隊(duì)列仍侥,可以按照消息類型來分topic存儲(chǔ)。
replication
replication是topic復(fù)制副本個(gè)數(shù)鸳君,用于解決數(shù)據(jù)丟失农渊,防止leader topic宕機(jī)后,其他副本可以快代替或颊。
broker
broker是緩存代理砸紊,Kafka集群中的一臺(tái)或多臺(tái)服務(wù)器統(tǒng)稱broker,用來保存producer發(fā)送的消息囱挑。Broker沒有副本機(jī)制醉顽,一旦broker宕機(jī),該broker的消息將都不可用平挑。
partition
partition是topic的物理分組游添,在創(chuàng)建topic的時(shí)候,可以指定partition 數(shù)量通熄。每個(gè)partition是邏輯有序的唆涝,保證每個(gè)消息都是順序插入的,而且每個(gè)消息的offset在不同partition的是唯一不同的
offset
偏移量唇辨。kafka為每條在分區(qū)的消息保存一個(gè)偏移量offset廊酣,這也是消費(fèi)者在分區(qū)的位置。比如一個(gè)偏移量是5的消費(fèi)者赏枚,表示已經(jīng)消費(fèi)了從0-4偏移量的消息亡驰,下一個(gè)要消費(fèi)的消息的偏移量是5。每次消息處理完后嗡贺,要么主動(dòng)提交offset隐解,要么自動(dòng)提交,把offset偏移到下一位诫睬,如處理offset=6消息煞茫。在kafka配置中,如果enable_auto_commit=True和auto_commit_interval_ms=xx摄凡,那表示每xx 毫秒自動(dòng)提交偏移量
group
分組续徽。是指在消費(fèi)同一topic的不同consumer。每個(gè)consumer都有唯一的groupId亲澡,同一groupId 屬于同一個(gè)group钦扭。不同groupId的consumer相互不影響。對于一個(gè)topic床绪,同一個(gè)group的consumer數(shù)量不能超過 partition數(shù)量客情。比如其弊,Topic A 有 16個(gè)partition,某一個(gè)group下有2個(gè)consumer膀斋,那2個(gè)consumer分別消費(fèi)8個(gè)partition梭伐,而這個(gè)group的consumer數(shù)量最多不能超過16個(gè)。
三仰担、kafka的配置
kafka的配置主要分四類糊识,分別是zookeeper、server摔蓝、consumer赂苗、producer。其他的配置可以忽略贮尉。
zookeeper配置
zk的配置比較簡單拌滋,也可以默認(rèn)不改.dataDir是zk存儲(chǔ)節(jié)點(diǎn)配置的目錄地址,clientPort是zk啟動(dòng)的端口绘盟,默認(rèn)2181鸠真,maxClientCnxns是限制ip的連接此處,設(shè)置0表示無連接次數(shù)龄毡,一般情況根據(jù)業(yè)務(wù)部署情況吠卷,配置合理的值。
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
server配置
############################# Server Basics #############################
# 每一個(gè)broker在集群中的唯一表示沦零,要求是正數(shù)祭隔。當(dāng)該服務(wù)器的IP地址發(fā)生改變時(shí),broker.id沒有變化路操,則不會(huì)影響consumers的消息情況
broker.id=0
############################# Socket Server Settings #############################
#默認(rèn)kafka連接ip和端口
#listeners=PLAINTEXT://:9092
#broker處理消息的最大線程數(shù)疾渴,一般情況下不需要去修改
num.network.threads=3
# broker處理磁盤IO的線程數(shù),數(shù)值應(yīng)該大于你的硬盤數(shù)屯仗,默認(rèn)不修改
num.io.threads=8
# socket的發(fā)送緩沖區(qū)搞坝,根據(jù)可以服務(wù)性能可調(diào)
socket.send.buffer.bytes=102400
# socket的接受緩沖區(qū),根據(jù)可以服務(wù)性能可調(diào)
socket.receive.buffer.bytes=102400
# socket請求的最大數(shù)值魁袜,防止server 內(nèi)存溢出桩撮,message.max.bytes必然要小于socket.request.max.bytes,否則會(huì)被topic創(chuàng)建時(shí)的指定參數(shù)覆蓋
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 日志目錄
log.dirs=/tmp/kafka-logs
# 不指定partition數(shù)量情況下的默認(rèn)數(shù)量
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
# 默認(rèn)保留 7天的消息記錄
log.retention.hours=168
#消息總存儲(chǔ)達(dá)到的最大數(shù)量峰弹,如果超過這個(gè)字節(jié)數(shù)店量,無論是否達(dá)到7天,都會(huì)刪除
log.retention.bytes=1073741824
# topic的分區(qū)是以一堆segment文件存儲(chǔ)的鞠呈,這個(gè)控制每個(gè)segment的大小融师,會(huì)被topic創(chuàng)建時(shí)的指定參數(shù)覆蓋
log.segment.bytes=1073741824
# 日志清理策略選擇有:delete和compact主要針對過期數(shù)據(jù)的處理,或是日志文件達(dá)到限制的額度蚁吝,會(huì)被 topic創(chuàng)建時(shí)的指定參數(shù)覆蓋
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#zookeeper配置
zookeeper.connect=localhost:2181
# zookeeper的連接超時(shí)
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
producer配置
############################# Producer Basics #############################
#broker 列表旱爆;格式: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# 數(shù)據(jù)存儲(chǔ)的壓縮格式舀射,默認(rèn)是none,其他: none, gzip, snappy, lz4, zstd
compression.type=none
# 指定分區(qū)處理類怀伦。默認(rèn)kafka.producer.DefaultPartitioner
#partitioner.class=
# 寫請求的超時(shí)
#request.timeout.ms=
consumer配置
#(必需)zookeeper連接服務(wù)器地址
zookeeper.connect=zk01:2181,ka02:2181,zk03:2181
#zookeeper的session的過期時(shí)間
zookeeper.session.timeout.ms=5000
# zookeeper連接超時(shí)
zookeeper.connectiontimeout.ms=10000
#指定多久消費(fèi)者更新offset到zookeeper中
zookeeper.sync.time.ms=2000
#(必需)consumer group id
group.id=test-consumer-group
#自動(dòng)向zookeeper提交offset信息后控,和下面的auto.commit.interval.ms結(jié)合起來使用,合理的自動(dòng)提交間隔空镜,可以防止rebalance
auto.commit.enable=true
#自動(dòng)更新時(shí)間
auto.commit.interval.ms=1000
#當(dāng)前consumer的標(biāo)識(shí)
consumer.id=xxx
#消費(fèi)者客戶端編號(hào),用于區(qū)分不同客戶端捌朴,默認(rèn)客戶端程序自動(dòng)產(chǎn)生
client.id=xxx
#最大取多少塊緩存到消費(fèi)者(默認(rèn)10)
queued.max.message.chunks=50
#當(dāng)有新的consumer加入到group時(shí)吴攒,將會(huì)reblance.
rebalance.max.retries=5
#獲取消息的最大尺寸,broker不會(huì)向consumer輸出大于此值得chunk
fetch.min.bytes=655360
#當(dāng)消息尺寸不足時(shí),server阻塞的時(shí)間,如果超時(shí)砂蔽,立即發(fā)送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
#重置reset的極致洼怔,smallest:自動(dòng)設(shè)置reset到最小的offset.
auto.offset.reset=smallest