Kafka是一個(gè)消息中間件,這是很多人對(duì)Kafka的第一印象觅够,而Apache Kafka將自己定位為一個(gè)分布式的流處理平臺(tái)(A distributed streaming platform)态蒂,其實(shí)縱觀Kafka發(fā)展的歷史,Kafka是從消息引擎起家的辽社,但是發(fā)展至今伟墙,它已經(jīng)不只是消息引擎了。針對(duì)“What is Kafka滴铅?”這個(gè)問(wèn)題戳葵,可以用一句話概括為,Apache Kafka是一個(gè)消息引擎系統(tǒng)汉匙,也是一個(gè)分布式的流處理平臺(tái)拱烁,除此之外,Kafka還能被用作分布式儲(chǔ)存系統(tǒng)噩翠。具體來(lái)說(shuō)戏自,在0.10.0.0版本之前,Kafka社區(qū)將其定位為一個(gè)分布式伤锚、分區(qū)化且?guī)浞莨δ艿奶峤蝗罩痉?wù)擅笔,Kafka社區(qū)在0.10.0.0版本正式推出了流處理組件Kafka Streams,從這個(gè)版本開(kāi)始Kafka變身成為分布式的流處理平臺(tái)屯援,至于作為分布式儲(chǔ)存系統(tǒng)猛们,這個(gè)鮮有在生產(chǎn)環(huán)境使用的案例。
Topic: 主題是消息的邏輯容器狞洋;
Broker: 一個(gè)broker代表一臺(tái)Kafka服務(wù)器弯淘,一個(gè)Kafka集群由多個(gè)broker組成;
Partition:每個(gè)topic分為多個(gè)分區(qū)吉懊,多個(gè)分區(qū)分布在不同的broker上庐橙,每個(gè)分區(qū)是一組有序的消息日志;
Producer:生產(chǎn)者借嗽,向主題發(fā)送消息應(yīng)用程序怕午;
Consumer:消費(fèi)者,從主題訂閱消息并消費(fèi)的應(yīng)用程序淹魄;
Replica:副本郁惜,每個(gè)分區(qū)都有一個(gè)或者多個(gè)副本,副本主要用作數(shù)據(jù)冗余和failover,副本分為L(zhǎng)eader和Follower兆蕉,Leader提供讀寫(xiě)服務(wù)羽戒,F(xiàn)ollower負(fù)責(zé)從Leader副本來(lái)去日志消息;
Record:消息虎韵,Kafka處理的主要對(duì)象易稠;
Consumer offset:消費(fèi)者位移,用于表征消費(fèi)者的消費(fèi)進(jìn)度包蓝;
Consumer group:多個(gè)消費(fèi)者實(shí)例公用一個(gè)groupId構(gòu)成一個(gè)消費(fèi)組驶社,可以同時(shí)消費(fèi)多個(gè)分區(qū)的消息;
Rebalance:消費(fèi)者組重新分配訂閱主題分區(qū)的過(guò)程测萎,是Kafka消費(fèi)端實(shí)現(xiàn)高可用的重要手段
1)為何要分區(qū)
分區(qū)提供負(fù)載均衡的能力亡电,實(shí)現(xiàn)了系統(tǒng)的高伸縮性,不同的分區(qū)可以放置到不同節(jié)點(diǎn)的機(jī)器上硅瞧,數(shù)據(jù)的讀寫(xiě)操作都是針對(duì)分區(qū)粒度進(jìn)行的份乒,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立的執(zhí)行各自分區(qū)的讀寫(xiě)操作,還能夠通過(guò)增加節(jié)點(diǎn)的方式來(lái)提高系統(tǒng)的吞吐量腕唧。
2)分區(qū)策略或辖,各有什么特點(diǎn)
常見(jiàn)的分區(qū)策略包括:輪詢(xún)策略(Round-robin)、隨機(jī)策略(Randomness)枣接、按消息key保存策略(Key-ordering)等
輪詢(xún)策略指的是將消息順序發(fā)送到不同的分區(qū)颂暇,比如一個(gè)topic有兩個(gè)分區(qū),那么第一條消息分送到分區(qū)0但惶,第二條消息發(fā)送到分區(qū)1耳鸯,第三條消息發(fā)送到分區(qū)0,以此類(lèi)推榆骚,輪詢(xún)策略具有非常優(yōu)秀的負(fù)載均衡表現(xiàn)片拍,能夠讓消息最大限度的均勻分布在不同的分區(qū)上煌集,這是Kafka Java API生產(chǎn)端默認(rèn)的分區(qū)策略妓肢;隨進(jìn)策略的思想是每次發(fā)送的消息時(shí)生成一個(gè)代表分區(qū)的隨機(jī)數(shù),力求將消息均勻的分布在各個(gè)分區(qū)苫纤,其負(fù)載均衡表現(xiàn)沒(méi)有輪詢(xún)策略穩(wěn)定碉钠;按消息key保存策略是求取key的哈希值對(duì)分區(qū)數(shù)取模來(lái)確定消息發(fā)送到那個(gè)分區(qū),Kafka允許對(duì)每條消息定義消息key卷拘,這個(gè)key一般是一個(gè)有著明確業(yè)務(wù)含義的字符串喊废,用于表示不同的業(yè)務(wù)邏輯,這種策略能夠保證同一個(gè)key的消息進(jìn)入同一個(gè)分區(qū)栗弟,從而保證消息的順序性污筷。
Kafka默認(rèn)的分區(qū)策略是,如果消息指定了key乍赫,則按照消息key保存策略進(jìn)行分區(qū)瓣蛀,如果沒(méi)有指定key陆蟆,則按照輪詢(xún)策略進(jìn)行消息分區(qū)。除了上述三種常用的分區(qū)策略之外惋增,Kafka提供了接口可以實(shí)現(xiàn)定制化的分區(qū)策略叠殷,比如通過(guò)消息所在的區(qū)域來(lái)進(jìn)行分區(qū)來(lái)降低網(wǎng)絡(luò)時(shí)延。
Kafka對(duì)已提交的消息作有限度的持久化保證诈皿,首先要理解什么是已提交的消息林束,當(dāng)若干個(gè)Broker成功接收到一條消息并寫(xiě)入日志文件后,會(huì)告訴生產(chǎn)者這條消息已成功提交稽亏,此時(shí)這條消息才是“已提交”的消息壶冒,有限度指的是保存消息的分區(qū)至少有一個(gè)存活。
消息丟失分為生產(chǎn)端消息丟失和消費(fèi)端消息丟失
1)生產(chǎn)端消息丟失
目前生產(chǎn)者API發(fā)送消息是異步的措左,
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
這個(gè)方法是立即返回的依痊,但是我們不能認(rèn)為此時(shí)消息發(fā)送成功了,如果網(wǎng)絡(luò)抖動(dòng)或者消息太大超過(guò)了broker的承受能力等原因都可能導(dǎo)致消息發(fā)送失敗怎披,對(duì)于Kafka來(lái)說(shuō)這條消息不是“已提交”的消息胸嘁,故不能保證它的持久性,如果沒(méi)有重試的補(bǔ)救措施凉逛,那么消息就“丟失”了性宏。
2)消費(fèi)端消息丟失
Consumer端用offset即位移的概念來(lái)表示消費(fèi)者當(dāng)前消費(fèi)到的Topic分區(qū)的位置,下圖清晰的展示了Consumer端的位移數(shù)據(jù)状飞。
這個(gè)位移就像是我們讀書(shū)時(shí)用到的書(shū)簽一樣毫胜,書(shū)簽標(biāo)記了我們讀書(shū)的位置,更新書(shū)簽的正確方式應(yīng)該是先看書(shū)然后更新書(shū)簽的位置诬辈,如果順序顛倒了酵使,即先更新了書(shū)簽,然后看書(shū)焙糟,若看書(shū)過(guò)程中被打斷口渔,下次過(guò)來(lái)從書(shū)簽的位置開(kāi)始看書(shū),就有可能丟失前面的內(nèi)容穿撮,Kafka消費(fèi)端消息丟失的場(chǎng)景和這個(gè)看書(shū)的場(chǎng)景是一樣的缺脉。我曾今遇到一個(gè)線上消費(fèi)端丟失消息的場(chǎng)景,采用的是自動(dòng)提交的方式悦穿,消費(fèi)端開(kāi)啟線程池來(lái)消費(fèi)消息攻礼,消息放在JVM的阻塞隊(duì)列中排隊(duì)消費(fèi),當(dāng)JVM進(jìn)程發(fā)生OOM等異常重啟時(shí)栗柒,阻塞隊(duì)列中的消息就全部丟失了礁扮。
Kafka無(wú)消息丟失的最佳實(shí)踐
- 永遠(yuǎn)使用帶有回調(diào)函數(shù)的send方法
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
- 設(shè)置acks=all,acks是Producer端的參數(shù),代表了對(duì)“已提交消息”的定義太伊,設(shè)置為all表示所有副本都要接收到消息负蠕,該消息才算“已提交”
- 設(shè)置retries為一個(gè)比較大的值,retries是Producer端的參數(shù)倦畅,表示發(fā)送失敗后的重試次數(shù)遮糖,可以一定程度的應(yīng)對(duì)網(wǎng)絡(luò)抖動(dòng)等原因造成的消息發(fā)送失敗
- 設(shè)置unclean.leader.election.enable=false,控制不允許Unclean領(lǐng)導(dǎo)者選舉
什么是Unclean領(lǐng)導(dǎo)者選舉叠赐?對(duì)于一個(gè)消息分區(qū)來(lái)說(shuō)欲账,kafka內(nèi)部會(huì)維護(hù)一個(gè)ISR副本集合,正常情況下芭概,這個(gè)副本集合至少包含一個(gè)副本赛不,即Leader副本,ISR集合為空時(shí)罢洲,說(shuō)明Leader副本不能對(duì)外提供服務(wù)踢故,此時(shí)需要進(jìn)行Leader選舉,Kafka把所有不在ISR副本集合中的其他存活副本統(tǒng)稱(chēng)為非同步副本惹苗,通常來(lái)說(shuō)殿较,非同步副本可能落后Leader副本太多,如果選擇這些副本中的一個(gè)作為新的Leader副本桩蓉,可能會(huì)造成數(shù)據(jù)丟失淋纲,unclean.leader.election.enable參數(shù)控制是否允許進(jìn)行Unclean領(lǐng)導(dǎo)者選舉,如果設(shè)置為開(kāi)啟院究,可以提高系統(tǒng)的可用性洽瞬,不會(huì)對(duì)外停止服務(wù),如果設(shè)置為關(guān)閉业汰,提高了分布式系統(tǒng)的一致性伙窃,根據(jù)CAP理論,Kafka把C和A的選擇權(quán)交給用戶(hù)样漆。建議設(shè)置為關(guān)閉为障,因?yàn)檫€有很多途徑實(shí)現(xiàn)高可用,數(shù)據(jù)丟失可能會(huì)造成一些無(wú)法挽回的損失氛濒。 - 設(shè)置replication.factor>=3产场,這個(gè)是broker端的參數(shù)鹅髓,設(shè)置合理的副本數(shù)舞竿,通過(guò)消息冗余防止消息丟失
- 設(shè)置min.insync.replicas>1,broker端的參數(shù)窿冯,規(guī)定消息至少要寫(xiě)入多少個(gè)副本才算是“已提交”
- 確保replication.factor>min.insync.replicas骗奖,如果replication.factor=min.insync.replicas,那么只要有一個(gè)副本掛掉了,整個(gè)分區(qū)就無(wú)法正常工作了执桌,可用性差
- 設(shè)置enable.auto.commit=false鄙皇,采用手動(dòng)提交的方式,確保消息消費(fèi)完成之后再提交
消費(fèi)者組(Consumer Group)是Kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)機(jī)制仰挣,多個(gè)消費(fèi)者實(shí)例共用一個(gè)GroupId構(gòu)成一個(gè)消費(fèi)組伴逸,組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起消費(fèi)訂閱主題的所有分區(qū),消費(fèi)組有三個(gè)特性:
- Consumer Group小可以有一個(gè)或者多個(gè)消費(fèi)者實(shí)例膘壶,實(shí)例可以是單獨(dú)的進(jìn)程错蝴,也可以是同一個(gè)進(jìn)程下的不同線程;
- Group ID是一個(gè)字符串颓芭,在Kafka集群中顷锰,它能夠唯一標(biāo)示一個(gè)消費(fèi)組;
- 單個(gè)分區(qū)只能被消費(fèi)組中的一個(gè)消費(fèi)者實(shí)例消費(fèi)亡问,當(dāng)然它能夠被其它消費(fèi)組的消費(fèi)者消費(fèi)官紫。
消費(fèi)者組是Kafka中比較亮眼的設(shè)計(jì),Kafka用消費(fèi)者組的機(jī)制實(shí)現(xiàn)了消息引擎系統(tǒng)的兩大模型州藕,即點(diǎn)對(duì)點(diǎn)的模型和發(fā)布訂閱模型束世,如果所有實(shí)例屬于同一個(gè)Group,那么它實(shí)現(xiàn)的是點(diǎn)對(duì)點(diǎn)的消息隊(duì)列模型床玻,如果所有實(shí)例屬于不同的消費(fèi)組良狈,那么它實(shí)現(xiàn)的發(fā)布訂閱模型。
Rebalance本質(zhì)上是一種協(xié)議笨枯,規(guī)定了消費(fèi)組的所有消費(fèi)者實(shí)例如何達(dá)成一致薪丁,來(lái)分配訂閱的topic的所有分區(qū)。Rebalance的觸發(fā)條件有三個(gè): - 消費(fèi)組成員發(fā)生變更馅精,如新加入消費(fèi)者實(shí)例严嗜,或者某個(gè)消費(fèi)者實(shí)例異常崩潰被踢出消費(fèi)組;
- 訂閱的主題數(shù)發(fā)生變化洲敢;
- 訂閱主題的分區(qū)數(shù)發(fā)生變化漫玄。
Rebalance的通知機(jī)制是通過(guò)心跳線程來(lái)完成的,當(dāng)協(xié)調(diào)者決定開(kāi)啟新一輪重平衡之后压彭,會(huì)將“REBALANCE_IN_PROCESS”封裝到心跳請(qǐng)求的響應(yīng)中睦优,返回給消費(fèi)者實(shí)例,當(dāng)消費(fèi)者實(shí)例發(fā)現(xiàn)心跳響應(yīng)中包含“REBALANCE_IN_PROCESS”壮不,就知道重平衡開(kāi)始了汗盘。消費(fèi)端參數(shù)heartbeat.interval.ms的作用是控制重平衡通知的頻率,如果想要消費(fèi)者實(shí)例快速的得到Rebalance的通知询一,可以將該參數(shù)設(shè)置為一個(gè)很小的值隐孽。Rebalance的過(guò)程對(duì)消費(fèi)過(guò)程有極大地影響癌椿,類(lèi)似于JVM垃圾收集器中的Stop the world,Rebalance的過(guò)程中菱阵,消費(fèi)組的所有消費(fèi)者實(shí)例都會(huì)停止消費(fèi)踢俄,等待Rebalance完成,應(yīng)該盡量避免Rebalance的發(fā)生晴及。
副本機(jī)制指的是分布式系統(tǒng)在多態(tài)網(wǎng)絡(luò)互連的機(jī)器上保存有相同的數(shù)據(jù)拷貝都办,副本機(jī)制的好處包括:
- 提供數(shù)據(jù)冗余,即使部分組件失效虑稼,系統(tǒng)依然能夠正常運(yùn)行脆丁,提高了系統(tǒng)的可用性和數(shù)據(jù)持久性;
- 提供高伸縮性动雹,支持橫向擴(kuò)展槽卫,能夠通過(guò)增加機(jī)器的方式來(lái)提升讀性能,進(jìn)而提高系統(tǒng)讀操作的吞吐量胰蝠;
- 改善數(shù)據(jù)局部性歼培,將數(shù)據(jù)放在離用戶(hù)比較近的地方,降低網(wǎng)絡(luò)時(shí)延茸塞。
對(duì)于Kafka而言只能享受到第一個(gè)好處躲庄,它是Kafka確保系統(tǒng)高可用和數(shù)據(jù)持久性的重要基石,Kafka采用的是基于領(lǐng)導(dǎo)者(Leader-based)的副本機(jī)制钾虐,工作原理如下圖所示
在Kafka中副本分為領(lǐng)導(dǎo)者副本(Leader Replica)和跟隨者副本(Follower Replica)噪窘,每個(gè)分區(qū)都要選舉一個(gè)領(lǐng)導(dǎo)者副本,其余的副本則為追隨者副本效扫,Kafka中的Follower Replica是不對(duì)外提供讀寫(xiě)服務(wù)的倔监,任何一個(gè)Follower Replica都不會(huì)響應(yīng)生產(chǎn)端和消費(fèi)端的讀寫(xiě)請(qǐng)求,所有的讀寫(xiě)請(qǐng)求都發(fā)生在Leader Replica所在的Broker菌仁,F(xiàn)ollower Replica的唯一任務(wù)就是從Leader Replica上異步拉取日志消息寫(xiě)入到自己的日志文件中浩习,從而實(shí)現(xiàn)與Leader Replica的同步。當(dāng)Leader Replica掛掉之后济丘,會(huì)開(kāi)啟新的一輪領(lǐng)導(dǎo)者的選舉從Follower Replica中選一個(gè)作為新的Leader谱秽,這里需要強(qiáng)調(diào)的一點(diǎn)是不是所有的Follower都有機(jī)會(huì)成為L(zhǎng)eader的,只有和老的Leader保持同步的Follower才能參加選舉摹迷,Kafka引入In-sync Replicas(ISR)來(lái)表示保持同步的一組Replica疟赊,需要明確的是Leader Replica天生在ISR中,也就是說(shuō)ISR不僅僅是Follower副本的集合峡碉,正常情況下近哟,ISR必然包括Leader副本。
Kafka是通過(guò)Broker端的參數(shù)replica.lag.time.max.ms來(lái)判斷某個(gè)副本是否處于同步的狀態(tài)异赫,F(xiàn)ollower副本能夠落后Leader副本的最長(zhǎng)時(shí)間間隔椅挣,默認(rèn)為10s,若Follower副本落后Leader副本的時(shí)間不超過(guò)10s塔拳,則認(rèn)為該Follower副本是和Leader副本同步的鼠证,如果超過(guò)10s,則認(rèn)為Follower副本是非同步的靠抑,會(huì)被踢出ISR副本集合量九。
- 順序讀寫(xiě)
Kafka采用順序讀寫(xiě)磁盤(pán)的方式 - 消息分區(qū)
分區(qū)提供負(fù)載均衡的能力,實(shí)現(xiàn)了系統(tǒng)的高伸縮性颂碧,不同的分區(qū)可以放置到不同節(jié)點(diǎn)的機(jī)器上荠列,數(shù)據(jù)的讀寫(xiě)操作都是針對(duì)分區(qū)粒度進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立的執(zhí)行各自分區(qū)的讀寫(xiě)操作载城,還能夠通過(guò)增加節(jié)點(diǎn)的方式來(lái)提高系統(tǒng)的吞吐量肌似。 - 零拷貝
如果線上Kafka服務(wù)器選用的是Linux操作系統(tǒng),那么Kafka能夠享受到零拷貝所帶來(lái)的快速數(shù)據(jù)傳輸特性诉瓦,Kafka客戶(hù)端底層使用了Java NIO中的Selector川队,其在Linux上的實(shí)現(xiàn)是epoll,而在Windows操作系統(tǒng)上實(shí)現(xiàn)是select睬澡,所以線上操作系統(tǒng)的選型推薦Linux - 數(shù)據(jù)壓縮
秉承時(shí)空換空間的思想固额,數(shù)據(jù)壓縮就是用CPU時(shí)間去換磁盤(pán)空間或者網(wǎng)絡(luò)IO的傳輸量,希望以較小的CPU開(kāi)銷(xiāo)帶來(lái)更小的磁盤(pán)占用或者更少的網(wǎng)絡(luò)IO傳輸煞聪。 - 批量發(fā)送
線上Kafka的性能調(diào)優(yōu)主要從四個(gè)層次和兩個(gè)方面來(lái)考慮斗躏,四個(gè)層次分別是操作系統(tǒng)層,JVM層昔脯,框架層和應(yīng)用程序?qū)幼牟凇?duì)操作系統(tǒng)的優(yōu)化主要包括操作系統(tǒng)選型和一些系統(tǒng)參數(shù)設(shè)置,如我們推薦使用Linux操作系統(tǒng)云稚,建議將ulimit -n設(shè)置為一個(gè)很大的值迈套,例如65535,來(lái)避免Too many files open這類(lèi)的錯(cuò)誤碱鳞,建議將vm.max_map_count設(shè)置為一個(gè)較大的值桑李,避免碰到OutOfMemoryError: Map failed的嚴(yán)重錯(cuò)誤等,Kafka進(jìn)程是普通的JVM進(jìn)程窿给,所以JVM層面的調(diào)優(yōu)同樣重要贵白,主要包括堆的大小設(shè)置,建議將JVM堆大小設(shè)置在6~8G崩泡,其次是垃圾回收算法設(shè)置禁荒,建議設(shè)置為G1〗亲玻框架層的調(diào)優(yōu)主要是合理設(shè)置Kafka集群的各種參數(shù)呛伴。應(yīng)用層調(diào)優(yōu)指優(yōu)化Kafka客戶(hù)端應(yīng)用程序代碼勃痴,比如使用合理的數(shù)據(jù)結(jié)構(gòu)或者復(fù)用構(gòu)建成本大的對(duì)象等。
兩個(gè)方面是從Kafka性能調(diào)優(yōu)的目標(biāo)來(lái)說(shuō)的热康,分別是吞吐量和時(shí)延沛申。
1)如何提高吞吐量
- Broker端
適量增加num.replica.fetchers的參數(shù)值,但是不要超過(guò)CPU核數(shù)姐军,該參數(shù)表示Follower用多少個(gè)線程從Leader上拉取消息铁材,如果CPU資源充足,適當(dāng)增大該值能夠提高Producer端的吞吐量奕锌;
調(diào)優(yōu)JVM參數(shù)減少full GC著觉; - Producer端
適當(dāng)增加batch.size和linger.ms參數(shù)值,使得Producer一次發(fā)送更多的消息
設(shè)置compression.type=lz4或zstd惊暴;
設(shè)置ack=0或1饼丘;
設(shè)置retries=0;
如果多線程共享同一個(gè)Producer辽话,就增加buffer.memory參數(shù)值葬毫; - Consumer端
采用多Consumer進(jìn)程或者線程同時(shí)消費(fèi)數(shù)據(jù);
增加fetch.min.bytes參數(shù)值
2)如何降低時(shí)延
- Broker端
適量增加num.replica.fetchers的參數(shù)值屡穗,但是不要超過(guò)CPU核數(shù)贴捡; - Producer端
設(shè)置linger.ms=0;
不啟用壓縮村砂,設(shè)置compression.typr=none烂斋;
設(shè)置acks=1; - Consumer端
設(shè)置fetch.min.bytes=1