kafka架構(gòu):
kafka應(yīng)用場(chǎng)景:
1)大數(shù)據(jù)領(lǐng)域
2)數(shù)據(jù)集成
3)流計(jì)算集成
kafka消息生產(chǎn)者發(fā)送消息是批量發(fā)送捌省,默認(rèn)是16Kb發(fā)送一次
pros.put("batch.size", 16384) #16kb
批量發(fā)送等待時(shí)間
pros.put("linger.er", 5)
若5秒鐘內(nèi)未到達(dá)16kb也發(fā)送弟疆,批量發(fā)送時(shí)間和發(fā)送等待時(shí)間滿足一個(gè)即可發(fā)送
kakfa消息生產(chǎn)者只支持pull模式獲取消息
原因:因?yàn)閗afka使用于大量消息處理菇曲,在大量消息發(fā)送到broker中時(shí)若使用push模式推送消息給消息消費(fèi)者缚态,很容易由于消息生產(chǎn)者達(dá)到消費(fèi)能力極限蓝谨,導(dǎo)致消費(fèi)者宕機(jī)
kafka消費(fèi)者一次活動(dòng)多少條消息可配置:
conusmer.poll(Duration.ofMillis(1000)) #默認(rèn)每次獲取1000條消息
生產(chǎn)者和消費(fèi)者通過topic關(guān)聯(lián)
kafka消息在服務(wù)器上默認(rèn)的存儲(chǔ)位置:/tmp/kafka-logs
每個(gè)分區(qū)有一套文件:.index文件垛玻、.log文件摹察、.timeindex文件
具體消息存儲(chǔ)在.log文件內(nèi)冤留,消息會(huì)不斷追加到.log文件內(nèi)不會(huì)刪除(和rabbitmq不同碧囊,rabbitmq中消費(fèi)過的消息會(huì)被刪除)
這樣做的好處:可以實(shí)現(xiàn)順序?qū)懀ㄡ槍?duì)一個(gè)分區(qū)),速度高纤怒;提供了消費(fèi)歷史消息的能力
kafka topic 可分區(qū) partition
注意:
- kafka啟動(dòng)時(shí)糯而,設(shè)置副本數(shù)量不可比節(jié)點(diǎn)broker數(shù)量大,否則會(huì)報(bào)錯(cuò)
不同節(jié)點(diǎn)中副本有不同的角色(leader節(jié)點(diǎn)泊窘、fellow節(jié)點(diǎn))只有l(wèi)eader才提供讀(消費(fèi)者讀)寫(生產(chǎn)者寫)功能熄驼,fellow節(jié)點(diǎn)只有備份功能
為什么kafka的設(shè)計(jì)不實(shí)現(xiàn)不同節(jié)點(diǎn)讀寫分離功能:
避免了讀寫一致性問題
.log文件達(dá)到閾值kafka會(huì)對(duì)文件進(jìn)行分段segment,每個(gè)segment會(huì)有一條文件(index文件像寒、.log文件、.timeindex文件)
閾值大泄霞帧:log.sement.bytes = 1G
同一個(gè)kafka消息消費(fèi)者組內(nèi)的消費(fèi)者不會(huì)同時(shí)消費(fèi)同一個(gè)topic下相同的partition的消息诺祸;當(dāng)消費(fèi)者數(shù)量大于partition數(shù)量時(shí),多余的消費(fèi)者沒有可以消費(fèi)的partition祭芦;當(dāng)partition數(shù)量大于消費(fèi)者數(shù)量時(shí)筷笨,消費(fèi)者組中某個(gè)消費(fèi)者可以消費(fèi)多個(gè)partition
consumer offset:用于記錄消費(fèi)者消費(fèi)消息的位置,保存在_consumer_offset-0(存儲(chǔ)了某一個(gè)partition和某一個(gè)消費(fèi)者消費(fèi)消息的偏移量也就是位置)
kafka防止消息重復(fù)消費(fèi)問題(冪等),消息生產(chǎn)者配置開關(guān):
kafka只能保證一次會(huì)話里針對(duì)單個(gè)partition的消息冪等
props.put("enable.idempotence", true)
在broker會(huì)生成冪等標(biāo)志
kafka針對(duì)過個(gè)分區(qū)的冪等(kafka的事務(wù)实束,保證多消息的原子性):
解釋: 保證多個(gè)消息同時(shí)成功或者同時(shí)失敗
事務(wù)消息使用場(chǎng)景:
1奥秆、發(fā)送多條消息
2、發(fā)送消息到多個(gè)topic或者多個(gè)partition
3咸灿、消費(fèi)以后發(fā)出消息 consume-process-produce(解釋:從一個(gè)上游系統(tǒng)接收消息构订,同時(shí)又將消息發(fā)送到一個(gè)下游系統(tǒng))
代碼實(shí)現(xiàn)(偽代碼):
#初始化事務(wù)
producer.initTransactions();
try{
#開啟事務(wù)
producer.beginTransaction()
producer.send();
producer.send();
...
#提交事務(wù)
producer.commitTransaction()
} catch(KafkaException e){
#終止事務(wù)
producer.abortTransaction()
}
#應(yīng)用于上面場(chǎng)景3(消費(fèi)以后發(fā)出消息)
producer.sendOffsetsToTransaction()
producer.close();
kafka事務(wù)原理:
分布式事務(wù)的思想:2pc ,3pc避矢, TCC
1悼瘾、kafka應(yīng)用了分布式事務(wù)里的兩階段思想:第一階段是預(yù)提交,第二階段才是真正的提交审胸,
2亥宿、兩階段必須需要一個(gè)協(xié)調(diào)者的角色(Transaction Coordinator: 在kafka服務(wù)端)
3、事務(wù)日志:topic_transaction_state砂沛,當(dāng)服務(wù)異常等極端情況kafka服務(wù)重啟以后烫扼,會(huì)通過查找事務(wù)日志檢查每一個(gè)事務(wù)狀態(tài)(topic_transaction_state),處理未完成的事務(wù)消息碍庵。
4映企、生產(chǎn)者事務(wù)ID:transaction.id,極端情況下生成者掛了事務(wù)發(fā)送執(zhí)行到一半静浴,當(dāng)生產(chǎn)者重啟時(shí)堰氓,服務(wù)當(dāng)如何判斷是哪一個(gè)事務(wù)就是通過生產(chǎn)者事務(wù)ID來判斷的