分布式系統(tǒng)中,系統(tǒng)由多個(gè)子系統(tǒng)組成愈腾,數(shù)據(jù)需要在子系統(tǒng)中高性能憋活、低延遲的流轉(zhuǎn)。Kafka是"發(fā)布-訂閱"消息系統(tǒng)虱黄,是一個(gè)分布式的悦即、分區(qū)的日志服務(wù),用于處理活躍的流式數(shù)據(jù)
特點(diǎn)
- 高吞吐量:同時(shí)為發(fā)布和訂閱提供高吞吐量,讀速度為100MB/s辜梳,寫速度為50MB/s
- 持久化:消息持久化在磁盤上粱甫,同時(shí)replication防止數(shù)據(jù)丟失
- 分布式:所有的producer、broker和consumer都支持多個(gè)
- 消息處理狀態(tài)由consumer端使用offset維護(hù)作瞄,而不是由broker維護(hù)
Kafka中Zookeeper的作用
Zookeeper是1個(gè)分布式系統(tǒng)中的協(xié)調(diào)服務(wù)茶宵,主要用于集群中不同節(jié)點(diǎn)間的相互通信,解決節(jié)點(diǎn)一致性問題
Kafka中宗挥,Zookeeper用于存儲Consumer Group信息和偏移量offset乌庶,分區(qū)再平衡,實(shí)時(shí)監(jiān)控節(jié)點(diǎn)信息属韧,Broker leader選舉安拟,Producer和Consumer用Zookeeper來發(fā)現(xiàn)Broker列表
(Producer和Consumer需要知道哪些Broker是可用的,沒有zk宵喂,每個(gè)Producer和Consumer客戶端在生產(chǎn)和消費(fèi)之前都需要檢查Broker是否可用糠赦,效率太低)
Zookeeper不保存消息,消息保存在Broker上锅棕。從Kafka0.9開始拙泽,Consumer group和offset信息不保存在zk上,也保存在Broker上
- Broker
(1)Broker啟動時(shí)裸燎,向zk注冊顾瞻。在zk中,Broker是1個(gè)臨時(shí)節(jié)點(diǎn)znode德绿,當(dāng)Broker與zk斷開連接時(shí)衩茸,znode被刪除
/brokers/ids/brokerid
(2)Broker啟動時(shí)腥沽,向zk注冊持有的topic和partition,是1個(gè)臨時(shí)節(jié)點(diǎn)
/brokers/topics/topic_name/partition_index
(3)集群中的Broker在啟動時(shí),會進(jìn)行l(wèi)eader選舉狡恬。Broker會嘗試在zk上創(chuàng)建/controller臨時(shí)節(jié)點(diǎn)虚缎,成功創(chuàng)建的Broker會成為集群的leader蚣旱,其它Broker在leader確定時(shí)钦购,會收到"節(jié)點(diǎn)已存在"的消息,并在leader創(chuàng)建的/controller臨時(shí)節(jié)點(diǎn)上設(shè)置Watch都许。當(dāng)leader掛掉后稻薇,leader創(chuàng)建的臨時(shí)節(jié)點(diǎn)會被zk刪除,其它Broker會收到Watch通知胶征,并嘗試創(chuàng)建/controller臨時(shí)節(jié)點(diǎn)塞椎,創(chuàng)建成功的Broker成為新的leader
(4)Broker leader負(fù)責(zé)Partition leader的選舉
當(dāng)1個(gè)分區(qū)的leader所在的Broker掛掉時(shí),Broker會遍歷分區(qū)的所有副本睛低,在副本中選擇1個(gè)新的分區(qū)leader忱屑,簡單說蹬敲,就是在isr列表中選擇第1個(gè)副本作為新的分區(qū)leader。選出新的分區(qū)leader后莺戒,Broker leader向所有Broker通知這1變動,新的分區(qū)leader開始處理讀寫請求急波,分區(qū)follower開始同步新的分區(qū)leader的消息
- Producer
使用zk發(fā)現(xiàn)Broker列表从铲,和Topic下的Partition的leader建立連接
- Consumer
發(fā)現(xiàn)Broker列表,和Topic下的Partition的leader建立連接澄暮,注冊Consumer信息
Producer和Consumer需要知道哪些Broker是可用的名段,沒有zk,每個(gè)Producer和Consumer客戶端在生產(chǎn)和消費(fèi)之前都需要檢查Broker是否可用泣懊,效率太低
相關(guān)概念
- 生產(chǎn)者Producer伸辟,向Kafka集群發(fā)送消息
- 消費(fèi)者Consumer,與Kafka集群中的Broker實(shí)例建立長連接馍刮,不斷拉取消息信夫,然后進(jìn)行處理
- 主題Topic,通過對消息指定主題卡啰,來對消息分類
(1)1個(gè)主題Topic被認(rèn)為是1類消息静稻,每個(gè)Topic被分為多個(gè)分區(qū)Partition,Partition在存儲層面是1個(gè)增量的文件匈辱,發(fā)布到partition的消息會被追加到文件的末尾振湾,每條消息在文件中的位置稱為偏移量offset,是1long型的數(shù)字亡脸,唯一標(biāo)記1條消息
(2)創(chuàng)建分區(qū)時(shí)押搪,需要指定分區(qū)數(shù)partition和復(fù)制系數(shù)(默認(rèn)都是1)
分區(qū)數(shù)越大,吞吐量越大浅碾,需要的資源也就越多大州,Kafka集群在接收到Producer生產(chǎn)的消息時(shí),會根據(jù)均衡策略(輪詢及穗,Key-Hash摧茴,Random隨機(jī))將消息存儲到不同的partition。通過partition存儲消息埂陆,既可以存儲更多的數(shù)據(jù)(partition分布在多個(gè)Broker上苛白,避免文件達(dá)到單機(jī)磁盤上限),也可以提高讀寫效率(多個(gè)Broker處理讀寫操作的效率焚虱,肯定大于1個(gè)Broker)
復(fù)制系數(shù)的目的是冗余備份购裙,分為leader和follower,leader負(fù)責(zé)處理所有的讀寫請求鹃栽,follower會定期同步leader的數(shù)據(jù)躏率,當(dāng)leader掛掉,Kafka會選擇1個(gè)follower成為leader
創(chuàng)建Topic時(shí),Kafka集群會決定如何將partition及其副本分配到Broker上薇芝。規(guī)則是
(a)不同Partition的leader分配到不同的Broker上蓬抄,相同Partition的leader和follower分配到不同的Broker上
例如,有5個(gè)Broker夯到,Toipc有10個(gè)Partition嚷缭,復(fù)制系數(shù)為3,則需要在5個(gè)Broker上分配30個(gè)副本
(a)隨機(jī)選擇1個(gè)Broker耍贾,假如是Broker4阅爽,將Partition1的leader分配給Broker4,之后使用輪詢的方式將其他leader分配給Broker,Partition2的leader分配給Broker5荐开,Partition3的leader分配給Broker1
(b)分配完P(guān)artition leader付翁,會分配follower,需要保證1個(gè)Partition的leader和follower分配在不同的Broker
(3)被消費(fèi)的消息不是立刻被刪除晃听,文件會根據(jù)Broker的配置百侧,保留一段時(shí)間后才刪除。默認(rèn)為7天
- 偏移量offset杂伟,Consumer持有的元數(shù)據(jù)移层,offset是Consumer當(dāng)前消費(fèi)消息在Kafka文件中的位置
分組消費(fèi)
Consumer需要做1些高延遲的操作,例如數(shù)據(jù)寫入DB赫粥、使用數(shù)據(jù)進(jìn)行耗時(shí)計(jì)算观话。此時(shí),單個(gè)Consumer無法跟上Producer生產(chǎn)數(shù)據(jù)的速度越平,使用分組消費(fèi)模式频蛔,增加Consumer的個(gè)數(shù),提高處理能力
分區(qū)再平衡 Rebalance
分組消費(fèi)模式下秦叛,1個(gè)Consumer只能消費(fèi)1個(gè)Topic下的1個(gè)分區(qū)晦溪,Consumer和Partition是有對應(yīng)關(guān)系的,當(dāng)出現(xiàn)1些情況挣跋,例如
(1)Consumer加入Consumer Group
(2)Consumer退出Consumer Group或取消訂閱
(3)Topic下的partition個(gè)數(shù)增加
Consumer Group中的協(xié)調(diào)者Coordinator使用分區(qū)再平衡機(jī)制三圆,來調(diào)整Consumer和Partition的對應(yīng)關(guān)系Coordinator
Kafka0.9之后,Consumer的offset不再存放在zk上避咆,而是存在Broker上舟肉,存放的位置是使用Math.abs()對groupId.hashCode()取絕對值,再對offsets.topic.num.partitions(配置文件中的offset分區(qū)總數(shù)查库,默認(rèn)50)取余
Kafka0.9后路媚,每個(gè)Consumer Group都有1個(gè)coordinator,用來管理Consumer Group和offset樊销。coordinator就是當(dāng)前Consumer Group存放offset的這個(gè)partition的leader所在的Broker整慎。coordinator 負(fù)責(zé)與Consumer Group中的所有Consumer進(jìn)行協(xié)調(diào)通信
(1)定期發(fā)送心跳脏款,監(jiān)控Consumer存活情況
(2)Consumer離開Consumer Group時(shí),會告訴coordinator
(3)將parititon的分配情況裤园,通知組內(nèi)所有Consumer
Kafka提供了2套API撤师,High-Level Consumer API和SimpleConsumer API
High-Level Consumer API提供了對Consumer Group各種操作的封裝,在消費(fèi)消息時(shí)比然,不需要關(guān)注offset的提交丈氓,會自動提交;若使用多線程
(1)如果Consumer線程>partition强法,會有線程收不到消息
(2)如果Consumer線程<partition,會有線程收到多個(gè)partition的消息
(3)如果1個(gè)線程消費(fèi)多個(gè)partition湾笛,無法保證收到消息的順序饮怯,而1個(gè)partition中的消息時(shí)有序的
如果Consumer希望從頭開始消費(fèi)Partition的全量數(shù)據(jù)
(1)使用新的Consumer Group,即"group.id"
(2)指定"auto.offset.reset"為earliest
[KafkaConsumer is not safe for multi-threaded access]
SimpleConsumer API可以多Kafka進(jìn)行更精確的控制嚎研,但是需要自己編寫代碼蓖墅,處理offset的提交,leader的變更等
生產(chǎn)者
消息發(fā)送流程:
(1)創(chuàng)建KafkaProducer對象临扮,初始化組件论矾,例如緩沖區(qū),發(fā)送消息線程
(2)創(chuàng)建ProducerRecord消息對象杆勇,設(shè)置消息Topic贪壳,key和value,將key和value序列化為byte[]
(3)分區(qū)器為消息選擇合適的分區(qū)partition蚜退,默認(rèn)使用輪詢闰靴,可實(shí)現(xiàn)Partitioner接口的partition方法自定義分區(qū)
(4)消息會根據(jù)partition發(fā)送到不同的暫存區(qū)暫存
(5)后臺消息發(fā)送線程Sender,從暫存區(qū)中獲取消息钻注,發(fā)送給Broker
(6)如果消息成功發(fā)送到Broker蚂且,返回RecordMetadata對象,包含消息目標(biāo)分區(qū)幅恋,offset和時(shí)間戳杏死;如果發(fā)生失敗,且設(shè)置了重試次數(shù)捆交,進(jìn)行重試淑翼,否則返回失敗
創(chuàng)建配置對象Properties
(1)Broker地址 bootstrap.servers (localhost:9101)
(2)消息寫入成功策略 acks all (0,Producer不等待確認(rèn)零渐;1窒舟,等待分區(qū)leader寫入成功;all诵盼,等待isr所有副本寫入成功)
(3)重試次數(shù) retries 0
(4)緩存大小 batch.size 默認(rèn)16kb惠豺,Producer會將消息緩存银还,當(dāng)消息達(dá)到一定大小時(shí),才一起發(fā)送
(5)每次發(fā)送消息時(shí)洁墙,延遲一定時(shí)間 linger.ms 默認(rèn)0
(6)Producer可用內(nèi)存大小 buffer.memory 默認(rèn)32M
(7)Key和Value的序列化方式創(chuàng)建KafkaProducer對象
Producer<String蛹疯,String> producer = new KafkaProducer<>(props);使用KafkaProducer對象的send(),發(fā)送1個(gè)代表消息的ProducerRecord對象热监。send()是異步的捺弦,返回1個(gè)放在Future中的RecordMetadata對象,如果使用Future.get()獲取RecordMetadata對象孝扛,會阻塞直到有結(jié)果返回列吼;可以在send()上設(shè)置回調(diào)方法Callback實(shí)現(xiàn)無阻塞
消費(fèi)者
創(chuàng)建配置對象Properties
(1)Broker地址 bootstrap.servers
(2)組id group.id
(3)開啟自動提交 enable.auto.commit true
(4)offset重置方式 auto.offset.reset 默認(rèn)latest
earliest 之前提交過offset,從offset開始消費(fèi)苦始;未提交過寞钥,從頭開始消費(fèi)
latest 之前提交過offset,從offset開始消費(fèi)陌选;未提交過理郑,等待分區(qū)產(chǎn)生新的消息,從新消息開始消費(fèi)
none 之前提交過offset咨油,從offset開始消費(fèi)您炉;Consumer Group涉及的分區(qū),有1個(gè)未提交過offset役电,報(bào)錯(cuò)
(5)自動提交頻率 auto.commit.interval.ms 默認(rèn)5s
(6)Key和Value的序列化方式創(chuàng)建KafkaConsumer對象赚爵,訂閱主題
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("sonia"));Kafka High-Level Consumer API在KafkaConsumer對象的poll()中封裝了
(1)Consumer Group管理宴霸,調(diào)用poll()時(shí)囱晴,獲得coordinator和分區(qū)partition分布情況
(2)獲得分區(qū)重平衡后,Consumer和Partition的對應(yīng)信息
(3)向Broker發(fā)送心跳瓢谢,否則超時(shí)Broker會認(rèn)為Consumer死亡畸写,Consumer消費(fèi)的partition會被分配給其他的Consumer
(4)獲得消息
使用時(shí),將poll()放在while(true)循環(huán)中氓扛,不斷執(zhí)行poll()枯芬,不斷的從Broker pull消息;同時(shí)向Broker發(fā)送心跳采郎,維持與Broker之間的連接千所,實(shí)時(shí)感知分區(qū)再平衡后的Consumer與Partition的關(guān)系,并作出調(diào)整
[提交offset的問題]