Kafka原理總結(jié)

MQ(消息隊列)是跨進(jìn)程通信的方式之一膀钠,可理解為異步rpc,上游系統(tǒng)對調(diào)用結(jié)果的態(tài)度往往是重要不緊急竹勉。使用消息隊列有以下好處:業(yè)務(wù)解耦飞盆、流量削峰、靈活擴(kuò)展。接下來介紹消息中間件Kafka吓歇。

Kafka是什么孽水?

Kafka是一個分布式的消息引擎。具有以下特征

能夠發(fā)布和訂閱消息流(類似于消息隊列)

以容錯的城看、持久的方式存儲消息流

多分區(qū)概念女气,提高了并行能力

Kafka架構(gòu)總覽

Topic

消息的主題、隊列测柠,每一個消息都有它的topic炼鞠,Kafka通過topic對消息進(jìn)行歸類。Kafka中可以將Topic從物理上劃分成一個或多個分區(qū)(Partition)轰胁,每個分區(qū)在物理上對應(yīng)一個文件夾谒主,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區(qū)的所有消息(.log)和索引文件(.index)赃阀,這使得Kafka的吞吐率可以水平擴(kuò)展霎肯。

Partition

每個分區(qū)都是一個 順序的、不可變的消息隊列凹耙, 并且可以持續(xù)的添加;分區(qū)中的消息都被分了一個序列號姿现,稱之為偏移量(offset)肠仪,在每個分區(qū)中此偏移量都是唯一的肖抱。

producer在發(fā)布消息的時候,可以為每條消息指定Key异旧,這樣消息被發(fā)送到broker時意述,會根據(jù)分區(qū)算法把消息存儲到對應(yīng)的分區(qū)中(一個分區(qū)存儲多個消息),如果分區(qū)規(guī)則設(shè)置的合理吮蛹,那么所有的消息將會被均勻的分布到不同的分區(qū)中荤崇,這樣就實現(xiàn)了負(fù)載均衡。

Broker

Kafka server潮针,用來存儲消息术荤,Kafka集群中的每一個服務(wù)器都是一個Broker,消費者將從broker拉取訂閱的消息

Producer

向Kafka發(fā)送消息每篷,生產(chǎn)者會根據(jù)topic分發(fā)消息瓣戚。生產(chǎn)者也負(fù)責(zé)把消息關(guān)聯(lián)到Topic上的哪一個分區(qū)。最簡單的方式從分區(qū)列表中輪流選擇焦读。也可以根據(jù)某種算法依照權(quán)重選擇分區(qū)子库。算法可由開發(fā)者定義。

Cousumer

Consermer實例可以是獨立的進(jìn)程矗晃,負(fù)責(zé)訂閱和消費消息仑嗅。消費者用consumerGroup來標(biāo)識自己。同一個消費組可以并發(fā)地消費多個分區(qū)的消息,同一個partition也可以由多個consumerGroup并發(fā)消費仓技,但是在consumerGroup中一個partition只能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers鸵贬,Kafka將相應(yīng)Topic中的每個消息只發(fā)送給其中一個Consumer

Kafka producer 設(shè)計原理

發(fā)送消息的流程

1.序列化消息&&.計算partition

根據(jù)key和value的配置對消息進(jìn)行序列化,然后計算partition:

ProducerRecord對象中如果指定了partition,就使用這個partition脖捻。否則根據(jù)key和topic的partition數(shù)目取余恭理,如果key也沒有的話就隨機(jī)生成一個counter,使用這個counter來和partition數(shù)目取余郭变。這個counter每次使用的時候遞增颜价。

2發(fā)送到batch&&喚醒Sender 線程

根據(jù)topic-partition獲取對應(yīng)的batchs(Dueue<ProducerBatch>),然后將消息append到batch中.如果有batch滿了則喚醒Sender 線程诉濒。隊列的操作是加鎖執(zhí)行周伦,所以batch內(nèi)消息時有序的。后續(xù)的Sender操作當(dāng)前方法異步操作未荒。

3.Sender把消息有序發(fā)到?broker(tp replia leader)

3.1確定tp relica?leader 所在的broker?

Kafka中 每臺broker都保存了kafka集群的metadata信息专挪,metadata信息里包括了每個topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過metadata信息可以知道tp leader的brokerId

producer也保存了metada信息,同時根據(jù)metadata更新策略(定期更新metadata.max.age.ms片排、失效檢測寨腔,強(qiáng)制更新:檢查到metadata失效以后,調(diào)用metadata.requestUpdate()強(qiáng)制更新

publicclassPartitionInfo{privatefinalString topic;privatefinalint partition;privatefinalNode leader;privatefinalNode[] replicas;privatefinalNode[] inSyncReplicas;privatefinalNode[] offlineReplicas;}

3.2 冪等性發(fā)送

為實現(xiàn)Producer的冪等性率寡,Kafka引入了Producer ID(即PID)和Sequence Number迫卢。對于每個PID,該P(yáng)roducer發(fā)送消息的每個<Topic, Partition>都對應(yīng)一個單調(diào)遞增的Sequence Number冶共。同樣乾蛤,Broker端也會為每個<PID, Topic, Partition>維護(hù)一個序號,并且每Commit一條消息時將其對應(yīng)序號遞增捅僵。對于接收的每條消息家卖,如果其序號比Broker維護(hù)的序號)大一,則Broker會接受它庙楚,否則將其丟棄:

如果消息序號比Broker維護(hù)的序號差值比一大上荡,說明中間有數(shù)據(jù)尚未寫入,即亂序馒闷,此時Broker拒絕該消息酪捡,Producer拋出InvalidSequenceNumber

如果消息序號小于等于Broker維護(hù)的序號,說明該消息已被保存窜司,即為重復(fù)消息沛善,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber

Sender發(fā)送失敗后會重試塞祈,這樣可以保證每個消息都被發(fā)送到broker

4.?Sender處理broker發(fā)來的produce response

????????一旦broker處理完Sender的produce請求金刁,就會發(fā)送produce response給Sender,此時producer將執(zhí)行我們?yōu)閟end()設(shè)置的回調(diào)函數(shù)。至此producer的send執(zhí)行完畢尤蛮。

吞吐性&&延時:

?buffer.memory:buffer設(shè)置大了有助于提升吞吐性媳友,但是batch太大會增大延遲,可搭配linger_ms參數(shù)使用

linger_ms:如果batch太大,或者producer qps不高,batch添加的會很慢页畦,我們可以強(qiáng)制在linger_ms時間后發(fā)送batch數(shù)據(jù)

ack:producer收到多少broker的答復(fù)才算真的發(fā)送成功

0表示producer無需等待leader的確認(rèn)(吞吐最高、數(shù)據(jù)可靠性最差)

1代表需要leader確認(rèn)寫入它的本地log并立即確認(rèn)

-1/all 代表所有的ISR都完成后確認(rèn)(吞吐最低焊唬、數(shù)據(jù)可靠性最高)

Sender線程和長連接

每初始化一個producer實例,都會初始化一個Sender實例看靠,新增到broker的長連接赶促。

代碼角度:每初始化一次KafkaProducer,都賦一個空的client

public KafkaProducer(finalMap configs) {this(configs,null,null,null,null,null, Time.SYSTEM);? ? }


終端查看TCP連接數(shù):

lsof -p portNum -np | grep TCP挟炬,適當(dāng)增大producer數(shù)量能提升吞吐

Consumer設(shè)計原理

poll消息

消費者通過fetch線程拉消息(單線程)

消費者通過心跳線程來與broker發(fā)送心跳鸥滨。超時會認(rèn)為掛掉

每個consumer group在broker上都有一個coordnator來管理,消費者加入和退出谤祖,以及消費消息的位移都由coordnator處理婿滓。

位移管理

consumer的消息位移代表了當(dāng)前group對topic-partition的消費進(jìn)度,consumer宕機(jī)重啟后可以繼續(xù)從該offset開始消費粥喜。

在kafka0.8之前凸主,位移信息存放在zookeeper上,由于zookeeper不適合高并發(fā)的讀寫容客,新版本Kafka把位移信息當(dāng)成消息秕铛,發(fā)往__consumers_offsets 這個topic所在的broker,__consumers_offsets默認(rèn)有50個分區(qū)缩挑。

消息的key 是groupId+topic_partition,value 是offset.

Kafka?Group?狀態(tài)

Empty:初始狀態(tài),Group 沒有任何成員鬓梅,如果所有的 offsets 都過期的話就會變成 Dead

PreparingRebalance:Group 正在準(zhǔn)備進(jìn)行 Rebalance

AwaitingSync:Group 正在等待來 group leader 的 分配方案

Stable:穩(wěn)定的狀態(tài)(Group is stable)供置;

Dead:Group 內(nèi)已經(jīng)沒有成員,并且它的 Metadata 已經(jīng)被移除

注意

重平衡reblance

當(dāng)一些原因?qū)е耤onsumer對partition消費不再均勻時绽快,kafka會自動執(zhí)行reblance芥丧,使得consumer對partition的消費再次平衡。

什么時候發(fā)生rebalance坊罢?:

組訂閱topic數(shù)變更

topic partition數(shù)變更

consumer成員變更

consumer 加入群組或者離開群組的時候

consumer被檢測為崩潰的時候

reblance過程

舉例1?consumer被檢測為崩潰引起的reblance

比如心跳線程在timeout時間內(nèi)沒和broker發(fā)送心跳续担,此時coordnator認(rèn)為該group應(yīng)該進(jìn)行reblance。接下來其他consumer發(fā)來fetch請求后活孩,coordnator將回復(fù)他們進(jìn)行reblance通知物遇。當(dāng)consumer成員收到請求后,只有l(wèi)eader會根據(jù)分配策略進(jìn)行分配,然后把各自的分配結(jié)果返回給coordnator询兴。這個時候只有consumer leader返回的是實質(zhì)數(shù)據(jù)乃沙,其他返回的都為空。收到分配方法后诗舰,consumer將會把分配策略同步給各consumer

舉例2 consumer加入引起的reblance

使用join協(xié)議警儒,表示有consumer 要加入到group中

使用sync?協(xié)議,根據(jù)分配規(guī)則進(jìn)行分配

(上圖圖片摘自網(wǎng)絡(luò))

引申:以上reblance機(jī)制存在的問題

在大型系統(tǒng)中眶根,一個topic可能對應(yīng)數(shù)百個consumer實例蜀铲。這些consumer陸續(xù)加入到一個空消費組將導(dǎo)致多次的rebalance;此外consumer 實例啟動的時間不可控属百,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms)蝙茶,將會再次觸發(fā)rebalance,而每次rebalance的代價又相當(dāng)?shù)卮笾罾希驗楹芏酄顟B(tài)都需要在rebalance前被持久化隆夯,而在rebalance后被重新初始化。

新版本改進(jìn)

通過延遲進(jìn)入PreparingRebalance狀態(tài)減少reblance次數(shù)

新版本新增了group.initial.rebalance.delay.ms參數(shù)别伏√阒裕空消費組接受到成員加入請求時,不立即轉(zhuǎn)化到PreparingRebalance狀態(tài)來開啟reblance厘肮。當(dāng)時間超過group.initial.rebalance.delay.ms后愧口,再把group狀態(tài)改為PreparingRebalance(開啟reblance)。實現(xiàn)機(jī)制是在coordinator底層新增一個group狀態(tài):InitialReblance类茂。假設(shè)此時有多個consumer陸續(xù)啟動耍属,那么group狀態(tài)先轉(zhuǎn)化為InitialReblance,待group.initial.rebalance.delay.ms時間后巩检,再轉(zhuǎn)換為PreparingRebalance(開啟reblance)

Broker設(shè)計原理

Broker 是Kafka 集群中的節(jié)點厚骗。負(fù)責(zé)處理生產(chǎn)者發(fā)送過來的消息,消費者消費的請求兢哭。以及集群節(jié)點的管理等领舰。由于涉及內(nèi)容較多,先簡單介紹迟螺,后續(xù)專門抽出一篇文章分享?

broker zk注冊

broker消息存儲

Kafka的消息以二進(jìn)制的方式緊湊地存儲冲秽,節(jié)省了很大空間

此外消息存在ByteBuffer而不是堆,這樣broker進(jìn)程掛掉時矩父,數(shù)據(jù)不會丟失锉桑,同時避免了gc問題

通過零拷貝和順序?qū)ぶ罚屜⒋鎯妥x取速度都非城现辏快

處理fetch請求的時候通過zero-copy?加快速度

broker狀態(tài)數(shù)據(jù)

broker設(shè)計中民轴,每臺機(jī)器都保存了相同的狀態(tài)數(shù)據(jù)攻柠。主要包括以下:

controller所在的broker ID,即保存了當(dāng)前集群中controller是哪臺broker

集群中所有broker的信息:比如每臺broker的ID杉武、機(jī)架信息以及配置的若干組連接信息

集群中所有節(jié)點的信息:嚴(yán)格來說辙诞,它和上一個有些重復(fù),不過此項是按照broker ID和監(jiān)聽器類型進(jìn)行分組的轻抱。對于超大集群來說飞涂,使用這一項緩存可以快速地定位和查找給定節(jié)點信息,而無需遍歷上一項中的內(nèi)容祈搜,算是一個優(yōu)化吧

集群中所有分區(qū)的信息:所謂分區(qū)信息指的是分區(qū)的leader较店、ISR和AR信息以及當(dāng)前處于offline狀態(tài)的副本集合。這部分?jǐn)?shù)據(jù)按照topic-partitionID進(jìn)行分組容燕,可以快速地查找到每個分區(qū)的當(dāng)前狀態(tài)梁呈。(注:AR表示assigned replicas,即創(chuàng)建topic時為該分區(qū)分配的副本集合)

broker負(fù)載均衡

分區(qū)數(shù)量負(fù)載:各臺broker的partition數(shù)量應(yīng)該均勻

partition?Replica分配算法如下:

將所有Broker(假設(shè)共n個Broker)和待分配的Partition排序

將第i個Partition分配到第(i mod n)個Broker上

將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上

容量大小負(fù)載:每臺broker的硬盤占用大小應(yīng)該均勻

在kafka1.1之前蘸秘,Kafka能夠保證各臺broker上partition數(shù)量均勻官卡,但由于每個partition內(nèi)的消息數(shù)不同,可能存在不同硬盤之間內(nèi)存占用差異大的情況醋虏。在Kafka1.1中增加了副本跨路徑遷移功能kafka-reassign-partitions.sh寻咒,我們可以結(jié)合它和監(jiān)控系統(tǒng),實現(xiàn)自動化的負(fù)載均衡

Kafka高可用

在介紹kafka高可用之前先介紹幾個概念

同步復(fù)制:要求所有能工作的Follower都復(fù)制完颈嚼,這條消息才會被認(rèn)為commit毛秘,這種復(fù)制方式極大的影響了吞吐率

異步復(fù)制:Follower異步的從Leader pull數(shù)據(jù),data只要被Leader寫入log認(rèn)為已經(jīng)commit阻课,這種情況下如果Follower落后于Leader的比較多叫挟,如果Leader突然宕機(jī),會丟失數(shù)據(jù)

Isr

Kafka結(jié)合同步復(fù)制和異步復(fù)制限煞,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數(shù)據(jù)不丟失和吞吐率之間做了平衡抹恳。Producer只需把消息發(fā)送到Partition Leader,Leader將消息寫入本地Log晰骑。Follower則從Leader pull數(shù)據(jù)适秩。Follower在收到該消息向Leader發(fā)送ACK。一旦Leader收到了ISR中所有Replica的ACK硕舆,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW并且向Producer發(fā)送ACK骤公。這樣如果leader掛了抚官,只要Isr中有一個replica存活,就不會丟數(shù)據(jù)阶捆。

Isr動態(tài)更新

Leader會跟蹤ISR凌节,如果ISR中一個Follower宕機(jī)钦听,或者落后太多,Leader將把它從ISR中移除倍奢。這里所描述的“落后太多”指Follower復(fù)制的消息落后于Leader后的條數(shù)超過預(yù)定值(replica.lag.max.messages)或者Follower超過一定時間(replica.lag.time.max.ms)未向Leader發(fā)送fetch請求朴上。

? broker Nodes In Zookeeper?

/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息

Controller負(fù)責(zé)broker故障檢查&&故障轉(zhuǎn)移(fail/recover)

Controller在Zookeeper上注冊Watch,一旦有Broker宕機(jī)卒煞,其在Zookeeper對應(yīng)的znode會自動被刪除痪宰,Zookeeper會觸發(fā)?Controller注冊的watch,Controller讀取最新的Broker信息

Controller確定set_p畔裕,該集合包含了宕機(jī)的所有Broker上的所有Partition

對set_p中的每一個Partition衣撬,選舉出新的leader、Isr扮饶,并更新結(jié)果具练。

  3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該P(yáng)artition當(dāng)前的ISR

  3.2 決定該P(yáng)artition的新Leader和Isr。如果當(dāng)前ISR中有至少一個Replica還幸存甜无,則選擇其中一個作為新Leader扛点,新的ISR則包含當(dāng)前ISR中所有幸存的Replica。否則選擇該P(yáng)artition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數(shù)據(jù)丟失)

  3.3 更新Leader岂丘、ISR陵究、leader_epoch、controller_epoch:寫入/brokers/topics/[topic]/partitions/[partition]/state

直接通過RPC向set_p相關(guān)的Broker發(fā)送LeaderAndISRRequest命令元潘。Controller可以在一個RPC操作中發(fā)送多個命令從而提高效率畔乙。

Controller掛掉

每個 broker 都會在 zookeeper 的臨時節(jié)點?"/controller" 注冊 watcher,當(dāng) controller 宕機(jī)時?"/controller"?會消失翩概,觸發(fā)broker的watch牲距,每個 broker 都嘗試創(chuàng)建新的 controller path,只有一個競選成功并當(dāng)選為 controller钥庇。

使用Kafka如何保證冪等性

不丟消息

首先kafka保證了對已提交消息的at least保證

Sender有重試機(jī)制

producer業(yè)務(wù)方在使用producer發(fā)送消息時牍鞠,注冊回調(diào)函數(shù)。在onError方法中重發(fā)消息

consumer?拉取到消息后评姨,處理完畢再commit难述,保證commit的消息一定被處理完畢

不重復(fù)

consumer拉取到消息先保存,commit成功后刪除緩存數(shù)據(jù)

Kafka高性能

partition提升了并發(fā)

zero-copy

順序?qū)懭?/p>

消息聚集batch

頁緩存

業(yè)務(wù)方對 Kafka producer的優(yōu)化

增大producer數(shù)量

ack配置

batch

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吐句,一起剝皮案震驚了整個濱河市胁后,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嗦枢,老刑警劉巖攀芯,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異文虏,居然都是意外死亡侣诺,警方通過查閱死者的電腦和手機(jī)殖演,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來年鸳,“玉大人趴久,你說我怎么就攤上這事∩θ罚” “怎么了彼棍?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長妥箕。 經(jīng)常有香客問我滥酥,道長,這世上最難降的妖魔是什么畦幢? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任坎吻,我火速辦了婚禮,結(jié)果婚禮上宇葱,老公的妹妹穿的比我還像新娘瘦真。我一直安慰自己,他們只是感情好黍瞧,可當(dāng)我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布诸尽。 她就那樣靜靜地躺著,像睡著了一般印颤。 火紅的嫁衣襯著肌膚如雪您机。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天年局,我揣著相機(jī)與錄音际看,去河邊找鬼。 笑死矢否,一個胖子當(dāng)著我的面吹牛仲闽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播僵朗,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼赖欣,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了验庙?” 一聲冷哼從身側(cè)響起顶吮,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎粪薛,沒想到半個月后云矫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡汗菜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年让禀,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片陨界。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡巡揍,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出菌瘪,到底是詐尸還是另有隱情腮敌,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布俏扩,位于F島的核電站糜工,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏录淡。R本人自食惡果不足惜捌木,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望嫉戚。 院中可真熱鬧刨裆,春花似錦、人聲如沸彬檀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窍帝。三九已至努潘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間坤学,已是汗流浹背疯坤。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留拥峦,地道東北人贴膘。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像略号,于是被迫代替她去往敵國和親刑峡。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容