為什么需要使用kafka
從本質(zhì)上來講,是因為互聯(lián)網(wǎng)發(fā)展太快温峭,使用單體架構(gòu)無疑會是的體量巨大瓣蛀。而微服務(wù)架構(gòu)可以很好的解決這個問題,但是服務(wù)與服務(wù)之間會還是出現(xiàn)耦合雷厂、訪問控制等問題惋增。 消息隊列可以很好的滿足這些需要。它常用來實現(xiàn):異步處理改鲫、服務(wù)解耦诈皿、流量控制
異步處理
隨著業(yè)務(wù)的不斷增加,通常會在原有的服務(wù)上添加上新服務(wù)像棘,這樣會出現(xiàn)請求鏈路越來越長稽亏,鏈路latency也就逐步增加。例如:最開始的電商項目缕题,可能就是簡簡單單的扣庫存截歉、下單。慢慢地又加上了積分服務(wù)烟零、短信服務(wù)等瘪松。鏈路增長不可避免的latency增加。
相對于扣庫存锨阿、下單宵睦,積分和短信沒必要恢復(fù)的那么及時。所以只需要在下單結(jié)束的時候結(jié)束那個流程墅诡,把消息傳給消息隊列中就可以直接返回響應(yīng)了壳嚎。而且短信服務(wù)和積分服務(wù)可以并行的消費這條消息。這樣響應(yīng)的速度更快末早,用戶體驗更好烟馅;服務(wù)異步執(zhí)行,系統(tǒng)整體latency(相對使用同步機制而言)也下降了然磷。
服務(wù)解耦
上面說的新加了短信服務(wù)和積分服務(wù)焙糟,現(xiàn)在又需要添加數(shù)據(jù)分析服務(wù)、以后可能又加一個策略服務(wù)等样屠〈┐椋可以發(fā)現(xiàn)訂單的后續(xù)鏈路一直在增加缺脉,為了適配這些功能,就需要不斷的修改訂單服務(wù)悦穿,下游任何一個服務(wù)的接口改變都可能會影響到訂單服務(wù)攻礼。
這個時候可以采用消息隊列來解耦,訂單服務(wù)只需要把消息塞到消息隊列里面栗柒,下游服務(wù)誰要這個消息誰就訂閱響應(yīng)的topic礁扮。這樣訂單服務(wù)就不用被拿捏住了!瞬沦!
流量治理
后端服務(wù)相對而言是比較脆弱的太伊,因為業(yè)務(wù)較重,處理時間長逛钻。如果碰到高QPS情況僚焦,很容易頂不住。比如說題庫數(shù)據(jù)寫入到ES索引中曙痘,數(shù)據(jù)都是千萬級別的芳悲。這個時候使用中間件來做一層緩沖,消息隊列是個很不錯的選擇边坤。
變更的消息先存放到消息隊列中名扛,后端服務(wù)盡自己最大的努力去消費隊列中消費數(shù)據(jù)。
同時茧痒,對于一些不需要及時地響應(yīng)處理肮韧,且業(yè)務(wù)處理邏輯復(fù)雜、流程長旺订,那么數(shù)據(jù)放到消息隊列中惹苗,消費者按照自己的消費節(jié)奏走,也是很不錯的選擇耸峭。
上述分別對應(yīng)著 生產(chǎn)者生產(chǎn)過快 和 消費者消費過慢 兩種情況桩蓉,使用消息隊列都能很好的起到緩沖作用。
總結(jié)一下
kafka特點:
解耦合劳闹。消息隊列提供了借口院究,生產(chǎn)者和消費者能夠獨立的完成讀操作和寫操作。
高吞吐率本涕。即使是在廉價的商用機器上也能做到單機支持每秒100K條消息的傳輸
信息傳輸快业汰。以時間復(fù)雜度為
O(1)
的方式提供持久化能力,即使對TB級
以上數(shù)據(jù)也能保證常數(shù)時間的訪問性能可提供持久化菩颖。消息存儲在中間件上样漆,數(shù)據(jù)持久化,直到全部被處理完晦闰,通過這一方式規(guī)避了數(shù)據(jù)丟失的風險放祟。
kafka適用場景
根據(jù)上述功能和特點鳍怨,kafka主要有以下使用場景:
信息系統(tǒng)
Messaging
。 在這個領(lǐng)域中跪妥,kafka常常被拿來與傳統(tǒng)的消息中間件進行對比鞋喇,如RabbitMQ。網(wǎng)站活動追蹤
Website Activity Tracking
監(jiān)控
Metrics
日志收集
Log Aggregation
流處理
Stream Processing
事件溯源
Event Sourcing
提交日志
Commit Log
具體可見:使用場景
kafka組件
- Producer : 消息生產(chǎn)者眉撵,就是向 Kafka發(fā)送數(shù)據(jù) 侦香;
- Consumer : 消息消費者,從 Kafka broker 取消息的客戶端纽疟;
- Consumer Group (CG): 消費者組罐韩,由多個 consumer 組成。 消費者組內(nèi)每個消費者負責消費不同分區(qū)的數(shù)據(jù)污朽,一個分區(qū)只能由一個組內(nèi)消費者消費散吵;消費者組之間互不影響。 所有的消費者都屬于某個消費者組膘壶,即消費者組是邏輯上的一個訂閱者错蝴。
- Broker :經(jīng)紀人 一臺 Kafka 服務(wù)器就是一個 broker洲愤。一個集群由多個 broker 組成颓芭。一個 broker可以容納多個 topic。
- Topic : 話題柬赐,可以理解為一個隊列亡问, 生產(chǎn)者和消費者面向的都是一個 topic;
- Partition: 為了實現(xiàn)擴展性肛宋,一個非常大的 topic 可以分布到多個 broker(即服務(wù)器)上州藕,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列酝陈;如果一個topic中的partition有5個床玻,那么topic的并發(fā)度為5.
- Replica: 副本(Replication),為保證集群中的某個節(jié)點發(fā)生故障時沉帮, 該節(jié)點上的 partition 數(shù)據(jù)不丟失锈死,且 Kafka仍然能夠繼續(xù)工作, Kafka 提供了副本機制穆壕,一個 topic 的每個分區(qū)都有若干個副本待牵,一個 leader 和若干個 follower。
- Leader: 每個分區(qū)多個副本的“主”喇勋,生產(chǎn)者發(fā)送數(shù)據(jù)的對象缨该,以及消費者消費數(shù)據(jù)的對象都是 leader。
- Follower: 每個分區(qū)多個副本中的“從”川背,實時從 leader 中同步數(shù)據(jù)贰拿,保持和 leader 數(shù)據(jù)的同步蛤袒。 leader 發(fā)生故障時,某個 Follower 會成為新的 leader壮不。
-
Offset : 每個Consumer 消費的信息都會有自己的序號汗盘,我們稱作當前隊列的offset。即消費點位標識消費到的位置询一。每個消費組都會維護訂閱的Topic 下每個隊列的offset
基本配置
kafka基本使用方式
消費模型:
隊列模型:
生產(chǎn)者往某個隊列里面發(fā)送消息隐孽,一個隊列可以存儲多個消費者的信息。一個隊列也可以有多個消費者健蕊,但是消費者之間是競爭關(guān)系菱阵,一個消息只能被一個消費者消費。在消息被確認消費過后缩功,會被從消息隊列中移除掉晴及,即消費者不能再次消費到已經(jīng)被消費的數(shù)據(jù)。
發(fā)布/訂閱模式:
為了解決一條消息能被多個消費者消費的問題嫡锌,發(fā)布/訂閱模式是個很不錯的選擇虑稼。生產(chǎn)者將消息塞到消息隊列對應(yīng)的topic中,所有訂閱了這個topic的消費者都能消費這條消息势木。
借用看到例子蛛倦,可以這么理解,發(fā)布/訂閱模型等于我們都加入了一個群聊中啦桌,我發(fā)一條消息溯壶,加入了這個群聊的人都能收到這條消息。 那么隊列模型就是一對一聊天甫男,我發(fā)給你的消息且改,只能在你的聊天窗口彈出,是不可能彈出到別人的聊天窗口中的板驳。
而隊列模式實現(xiàn)給群聊中的所有人的發(fā)送的方案又跛,采用的是多隊列全量存儲的方式,但是出現(xiàn)數(shù)據(jù)冗余的情況若治。簡單來說就是一對一聊天中發(fā)送同樣的消息慨蓝。
kafka 和 RocketMQ使用的是發(fā)布訂閱模式,而RabbitMQ使用的是隊列模式直砂。
消息獲取方式
生產(chǎn)者
producer 采用push(推)模式向broker 中寫入數(shù)據(jù)菌仁。
pull (拉)模式需要kafka集群事先知曉 producer的信息,即producer需要先注冊后才能使用静暂。當有生產(chǎn)者實例宕機了济丘,可能會存在空等。若需要擴展新的producer,則需要先注冊摹迷,在后續(xù)的kafka版本中逐步地和zookeeper進行了解耦疟赊,注冊成為了一個麻煩的事情。
push(推)模式的優(yōu)點是 生產(chǎn)者有數(shù)據(jù)就塞給消息隊列峡碉,不用管其他的事情近哟,只用專注于生產(chǎn)數(shù)據(jù)。
消費者
consumer 采用 pull(拉) 模式從 broker 中讀取數(shù)據(jù)鲫寄。
push(推)模式很難適應(yīng)消費速率不同的消費者吉执,因為消息發(fā)送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息地来,但是這樣很容易造成 consumer 來不及處理消息戳玫,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) consumer 的消費能力以適當?shù)乃俾氏M消息未斑。
pull 模式不足之處是咕宿,如果 kafka 沒有數(shù)據(jù),消費者可能會陷入循環(huán)中蜡秽, 一直返回空數(shù)據(jù)府阀。 針對這一點, Kafka 的消費者在消費數(shù)據(jù)時會傳入一個時長參數(shù) timeout芽突,如果當前沒有數(shù)據(jù)可供消費试浙, consumer 會等待一段時間之后再返回,這段時長即為 timeout诉瓦。
輪詢:
那么消費者是如何知道生產(chǎn)者發(fā)送了數(shù)據(jù)呢川队?換一句話來說就是力细,消費者什么時候 pull 數(shù)據(jù)呢睬澡? 其實生產(chǎn)者產(chǎn)生的數(shù)據(jù)消費者是不知道的,KafkaConsumer 采用輪詢的方式定期去 Kafka Broker 中進行數(shù)據(jù)的檢索眠蚂,如果有數(shù)據(jù)就用來消費煞聪,如果沒有就再繼續(xù)輪詢等待。
文件存儲
先說結(jié)論逝慧,kafka存儲的數(shù)據(jù)是以追加的方式添加到隊列尾部昔脯。讀寫數(shù)據(jù)是順序讀寫。
由于生產(chǎn)者生產(chǎn)的消息會不斷追加到 log 文件末尾笛臣, 為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下云稚, Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment沈堡。
每個 segment對應(yīng)兩個文件——“.index”文件和“.log”文件静陈。 這些文件位于一個文件夾下, 該文件夾的命名規(guī)則為: topic 名稱+分區(qū)序號。例如鲸拥, first 這個 topic 有三個分區(qū)拐格,則其對應(yīng)的文件夾為 first-0,first-1,first-2。
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
index 和 log 文件以當前 segment 的第一條消息的 offset 命名刑赶。下圖為 index 文件和 log文件的結(jié)構(gòu)示意圖捏浊。
“.index”文件存儲大量的索引信息,“.log”文件存儲大量的數(shù)據(jù)撞叨,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中 message 的物理偏移地址金踪。
kafka四大API
Producer API,它允許應(yīng)用程序向一個或多個 topics 上發(fā)送消息記錄
Consumer API牵敷,允許應(yīng)用程序訂閱一個或多個 topics 并處理為其生成的記錄流
Streams API热康,它允許應(yīng)用程序作為流處理器,從一個或多個主題中消費輸入流并為其生成輸出流劣领,有效的將輸入流轉(zhuǎn)換為輸出流姐军。
Connector API,它允許構(gòu)建和運行將 Kafka 主題連接到現(xiàn)有應(yīng)用程序或數(shù)據(jù)系統(tǒng)的可用生產(chǎn)者和消費者尖淘。例如奕锌,關(guān)系數(shù)據(jù)庫的連接器可能會捕獲對表的所有更改
如何保證數(shù)據(jù)高可靠、不丟失
數(shù)據(jù)丟失的原因
可以看到一共有三個階段村生,分別是生產(chǎn)消息惊暴、存儲消息和消費消息。 那么這三個階段都是有可能丟失消息的趁桃。
(生產(chǎn)消息)如果出現(xiàn)了網(wǎng)絡(luò)不可用辽话、消息本身不合格等原因?qū)е孪⒏緵]有被 Broker 接收,那就相當于消息在生產(chǎn)者端就消失了卫病。
(存儲消息)Broker 端的消息丟失油啤,一般是由 Broker 服務(wù)不可用造成的,例如 Broker 都宕機了導(dǎo)致消息丟失
-
(消費消息)消費者在消費消息的過程中蟀苛,會同時更新消費者位移益咬,也就是「已經(jīng)消費到哪一條消息了」。這里就存在一個問題帜平,當消費一個消息的時候幽告,是先處理消息,成功后再更新位移裆甩,還是先更新位移冗锁,再處理消息。
如果先更新位移嗤栓,在處理消息冻河,當消息處理出現(xiàn)問題,或者更新完位移、消息還未處理芋绸,消費者出現(xiàn)宕機等問題的時候媒殉,消息就會丟失。
而如果先處理消息再更新位移摔敛,雖然可能會出現(xiàn)重復(fù)消費同一個消息的問題廷蓉,但是,我們可以通過消費者處理邏輯實現(xiàn)冪等的方式來解決马昙。
解決方案
producer 生產(chǎn)消息
ack 機制
生產(chǎn)者 acks參數(shù)指定了必須要有多少個分區(qū)副本收到消息桃犬,生產(chǎn)者才認為該消息是寫入成功的,這個參數(shù)對于消息是否丟失起著重要作用行楞。
ack 策略
現(xiàn)在我們已經(jīng)知道生產(chǎn)者發(fā)送消息有個確認的機制攒暇,那么Kafka里是何時確認呢?Kafka是通過配置acks的值確認機制的子房,這里一共提供了三種策略岖寞,對應(yīng)不同的ACK機制:
- acks=0迷守,生產(chǎn)者不等待broker的響應(yīng)。這種情況下延遲最低,但是有可能丟失數(shù)據(jù)勉耀,比較適合高吞吐量佩厚、接受消息丟失的場景骤素。
- acks=1侠鳄,生產(chǎn)者發(fā)送消息等待broker的響應(yīng),等待leader落盤成功后響應(yīng)確認送讲。這種情況下奸笤,如果是在leader完成同步消息給follower前發(fā)生故障,則可能發(fā)生消息丟失哼鬓。
- acks=-1监右,生產(chǎn)者發(fā)送消息等待broker的響應(yīng),直到leader和follower全部落盤成功后才會響應(yīng)確認魄宏。此機制能嚴格保證不丟失數(shù)據(jù)秸侣。但當所有的follower同步完成之后存筏,leader發(fā)送ack響應(yīng)之前宠互,leader發(fā)生了宕機,此時生產(chǎn)者會以為發(fā)送失敗了椭坚,然后會重新發(fā)送數(shù)據(jù)給新的leader予跌,因此該情況下會導(dǎo)致數(shù)據(jù)重復(fù)發(fā)送。
broker 存儲消息
存儲消息階段需要在消息刷盤之后再給生產(chǎn)者響應(yīng)善茎,假設(shè)消息寫入緩存中就返回響應(yīng)券册,那么機器突然斷電這消息就沒了,而生產(chǎn)者以為已經(jīng)發(fā)送成功了。
如果Broker
是集群部署烁焙,有多副本機制航邢,即消息不僅僅要寫入當前Broker
,還需要寫入副本機中。那配置成至少寫入兩臺機子后再給生產(chǎn)者響應(yīng)骄蝇。這樣基本上就能保證存儲的可靠了膳殷。所以broker 消息存儲主要是靠的是冗余副本,即多個Replica
ISR機制 和 AR機制
簡單來說九火,分區(qū)中的所有副本統(tǒng)稱為 AR
(Assigned Replicas)赚窃。所有與leader副本保持一定程度同步的副本(包括leader副本在內(nèi))組成 ISR
(In Sync Replicas)。 ISR 集合是 AR 集合的一個子集岔激。消息會先發(fā)送到leader副本勒极,然后follower副本才能從leader中拉取消息進行同步。同步期間虑鼎,follow副本相對于leader副本而言會有一定程度的滯后辱匿。前面所說的 ”一定程度同步“ 是指可忍受的滯后范圍,這個范圍可以通過參數(shù)進行配置炫彩。于leader副本同步滯后過多的副本(不包括leader副本)將組成 OSR
(Out-of-Sync Replied)由此可見掀鹅,AR = ISR + OSR。正常情況下媒楼,所有的follower副本都應(yīng)該與leader 副本保持 一定程度的同步乐尊,即AR=ISR,OSR集合為空划址。
leader副本負責維護和跟蹤 ISR
集合中所有follower副本的滯后狀態(tài)扔嵌,當follower副本落后太多或失效時,即follower長時間未向leader發(fā)送消息夺颤,leader副本會把它從 ISR 集合中剔除痢缎。如果 OSR
集合中所有follower副本“追上”了leader副本,那么leader副本會把它從 OSR 集合轉(zhuǎn)移至 ISR 集合【副本可以在OSP,ISR中來回移動】世澜。默認情況下独旷,當leader副本發(fā)生故障時,只有在 ISR 集合中的follower副本才有資格被選舉為新的leader寥裂,而在 OSR 集合中的副本則沒有任何機會(不過這個可以通過配置來改變)嵌洼。
broker恢復(fù)機制
- LEO:(Log End Offset)每個副本的最后一個offset
- HW:(High Watermark)高水位,指的是消費者能見到的最大的 offset封恰, ISR 隊列中最小的 LEO麻养。可以理解為短板效應(yīng)
follower 故障:follower 發(fā)生故障后會被臨時踢出 ISR诺舔,待該 follower 恢復(fù)后鳖昌, follower 會讀取本地磁盤記錄的上次的 HW备畦,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 leader 進行同步许昨。等該 follower 的 LEO 大于等于該 Partition 的 HW懂盐,即 follower 追上 leader 之后,就可以重新加入 ISR 了糕档。
leader 故障:leader 發(fā)生故障之后允粤,會從 ISR 中選出一個新的 leader,之后翼岁,為保證多個副本之間的數(shù)據(jù)一致性类垫, 其余的 follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader同步數(shù)據(jù)琅坡。
comsumer 消費消息
消費者拿到消息之后直接存入內(nèi)存隊列中就直接返回給Broker
消費成功悉患,這樣其實是不算消息消費成功的。我們需要考慮消息放在內(nèi)存之后消費者就宕機了怎么辦榆俺,若直接設(shè)置為消費成功售躁,當前情況下本條消息相當于丟失了。
所以我們應(yīng)該在消費者真正執(zhí)行完業(yè)務(wù)邏輯之后茴晋,再發(fā)送給Broker
消費成功陪捷,這才是真正的消費了。
如何保證消息有序
有序性分為:全局有序和局部有序
全局有序
如果要保證消息全局有序诺擅,首先只能由一個生產(chǎn)者往Topic發(fā)送消息市袖,并且一個Topic內(nèi)部只能有一個分區(qū)(partition)。消費者也必須是單線程的消費數(shù)據(jù)烁涌。這樣消息才會是全局有序的苍碟。
不過一般情況下,我們不需要全局有序撮执。一般消息的粒度不會很大微峰,例如,同步MySql BinLog 也只需要保證單表消息有序即可抒钱。
局部有序
絕大多數(shù)的需求的有序性的要求都是局部有序蜓肆,局部有序我們就可以將Topic內(nèi)部劃分成我們需要的分區(qū)數(shù),把消息通過分區(qū)策略發(fā)往固定的分區(qū)中谋币。每個partition對應(yīng)一個單線程處理的消費者仗扬,這樣既完成了部分有序的需求,又可以通過partition數(shù)量的并發(fā)來提高消息處理消息瑞信。
小結(jié)
每個分區(qū)內(nèi),每條消息都有offset,所以只能在同一分區(qū)內(nèi)有序,但不同的分區(qū)無法做到消息順序性
如何保證數(shù)據(jù)不重復(fù)
數(shù)據(jù)重復(fù)的原因
- (
Producer
-->Broker
) 生產(chǎn)者已經(jīng)往Broker
發(fā)送消息了厉颤,Broker
也收到了消息,并且寫入了凡简。當時響應(yīng)由于網(wǎng)絡(luò)原因生產(chǎn)者沒有收到逼友,然后生產(chǎn)者又重發(fā)了一次,此時消息就重復(fù)了秤涩。 - (
Broker
-->Consumer
)假設(shè)我們消費者拿到消息消費了帜乞,業(yè)務(wù)邏輯已經(jīng)走完了,事務(wù)提交了筐眷,此時需要更新Consumer offset
了黎烈,然后這個消費者掛了,另一個消費者頂上匀谣,此時Consumer offset
還沒更新照棋,于是又拿到剛才那條消息,業(yè)務(wù)又被執(zhí)行了一遍武翎。于是消息又重復(fù)了
解決方案
可以看到正常業(yè)務(wù)而言消息重復(fù)是不可避免的烈炭,因此我們只能從另一個角度來解決重復(fù)消息的問題。我們?nèi)绾伪WC消費重復(fù)消息后宝恶,最終的結(jié)果是一樣的符隙。
關(guān)鍵點就是冪等。既然我們不能防止重復(fù)消息的產(chǎn)生垫毙,那么我們只能在業(yè)務(wù)上處理重復(fù)消息所帶來的影響霹疫。
冪等性
冪等性定義:
用戶對于同一操作發(fā)起的一次請求或者多次請求的結(jié)果是一致的,不會因為多次點擊而產(chǎn)生了副作用综芥。
例如這條 SQL update t1 set money = 150 where id = 1 and money = 100;
執(zhí)行多少遍money
都是150丽蝎,這就叫冪等。
如何保證消息發(fā)送的冪等性
produce -- > broke
每個producer會分配一個唯一 的PID膀藐,發(fā)往同一個broker的消息會附帶一個Sequence Number征峦,broker端會對<PID,partitionId,Sequence Number>做一個緩存,當具有相同主鍵的消息提交時消请,Kafka只會持久化一條栏笆。
注意:
PID 會隨著生產(chǎn)者重啟而發(fā)生變化,并且不同的partition對應(yīng)的partitionId也不相同臊泰。
broke ---> comsumer
具體還需要參照業(yè)務(wù)細節(jié)來實現(xiàn)蛉加。這里提供一個參考,可以通過上面那條 SQL 一樣缸逃,做了個前置條件判斷针饥,即money = 100
情況,并且直接修改需频,更通用的是做個version
即版本號控制丁眼,對比消息中的版本號和數(shù)據(jù)庫中的版本號。
如何處理消息堆積
消息堆積的原因
消息的堆積往往是因為生產(chǎn)者的生產(chǎn)速度與消費者的消費速度不匹配昭殉。有可能是因為消息消費失敗反復(fù)重試造成的苞七,也有可能就是消費者消費能力弱藐守,漸漸地消息就積壓了。
解決方案
阻塞生產(chǎn)者消息
消費速度跟不上蹂风,那么阻塞住生產(chǎn)者不就可以了卢厂? 但是在使用場景中,業(yè)務(wù)方的數(shù)據(jù)是源源不斷的惠啄,阻塞住很有可能帶來損失慎恒,一般不采用這種方案。
增加Topic中partition數(shù)量
增加消費者數(shù)量
- 消費者數(shù)量 < partition的數(shù)量撵渡, 可以直接增加消費者數(shù)量
- 消費者數(shù)量 <= partition的數(shù)量融柬,注意隊列數(shù)一定要增加,不然新增加的消費者是沒東西消費的趋距。一個Topic中粒氧,一個partition只會分配給一個消費者。
臨時隊列
我們可能會遇到這樣的一種場景棚品,消費者宕機了好久靠欢。等到消費者恢復(fù)過來的時候,消息已經(jīng)堆積成山了铜跑。如果還按照以前的速度來進行消費门怪,肯定是不能滿足需求的。所以這個時候需要提速消費9摹掷空!
使用 臨時隊列 是一個不錯的選擇:
新建一個 Topic,設(shè)置為 20 個 Partition
Consumer 不再處理業(yè)務(wù)邏輯了囤锉,只負責搬運坦弟,把消息放到臨時 Topic 中
這 20 個 Partition 可以有 20個 Consumer 了,它們來處理原來的業(yè)務(wù)邏輯官地。
如何保證數(shù)據(jù)的一致性
數(shù)據(jù)的高可用性通常采用的是數(shù)據(jù)冗余的方式來實現(xiàn)的酿傍,而強一致性和高可用性相對應(yīng)。一致性需要保證副本之間的同步驱入。
LEO 和 HW
- LEO:(Log End Offset)每個副本的最后一個offset
- HW:(High Watermark)高水位赤炒,指的是消費者能見到的最大的 offset, ISR 隊列中最小的 LEO亏较≥喊可以理解為短板效應(yīng)
分區(qū)分配策略
分區(qū)的原因
- 方便在集群中擴展,每個 Partition 可以通過調(diào)整以適應(yīng)它所在的機器雪情,而一個 topic又可以有多個 Partition 組成遵岩,因此整個集群就可以適應(yīng)適合的數(shù)據(jù)了;
- 可以提高并發(fā)巡通,因為可以以 Partition 為單位讀寫了尘执。
生產(chǎn)者分區(qū)機制
Kafka 對于數(shù)據(jù)的讀寫是以分區(qū)
為粒度的舍哄,分區(qū)可以分布在多個主機(Broker)中,這樣每個節(jié)點能夠?qū)崿F(xiàn)獨立的數(shù)據(jù)寫入和讀取正卧,并且能夠通過增加新的節(jié)點來增加 Kafka 集群的吞吐量蠢熄,通過分區(qū)部署在多個 Broker 來實現(xiàn)負載均衡
的效果跪解。
由于消息是存在主題(topic)的分區(qū)(partition)中的炉旷,所以當 Producer 生產(chǎn)者發(fā)送產(chǎn)生一條消息發(fā)給 topic 的時候,你如何判斷這條消息會存在哪個分區(qū)中呢叉讥? 分區(qū)策略就是用來解決這個問題的窘行。
分區(qū)策略
Kafka 的分區(qū)策略指的就是將生產(chǎn)者發(fā)送到哪個分區(qū)的算法。
指定partition
指明partition時图仓,直接將該值作為partition值罐盔。
隨機輪詢
按key存儲
若未指明,但是有key的情況下救崔,將key的hash值與該topic下可用的分區(qū)數(shù)取余得到partition值惶看。
順序輪詢(round-robin)
若既未指明partition,也沒有key時六孵,在第一次調(diào)用時隨機生成一個整數(shù)(后續(xù)每次調(diào)用都會在這個整數(shù)上自增)纬黎,將該整數(shù)與topic下可用的分區(qū)數(shù)取余得到partition值,也就是常說的round-robin
算法劫窒。
消費者如何和Parition相對應(yīng)
Rebalance 消費者再平衡機制
所謂的再平衡本今,指的是在kafka consumer所訂閱的topic發(fā)生變化時發(fā)生的一種分區(qū)重分配機制。一般有三種情況會觸發(fā)再平衡:
- consumer group中的新增或刪除某個consumer主巍,導(dǎo)致其所消費的分區(qū)需要分配到組內(nèi)其他的consumer上冠息;
- consumer訂閱的topic發(fā)生變化,比如訂閱的topic采用的是正則表達式的形式孕索,如
test-*
此時如果有一個新建了一個topictest-user
逛艰,那么這個topic的所有分區(qū)也是會自動分配給當前的consumer的,此時就會發(fā)生再平衡搞旭; - consumer所訂閱的topic發(fā)生了新增分區(qū)的行為散怖,那么新增的分區(qū)就會分配給當前的consumer,此時就會觸發(fā)再平衡选脊。
Kafka提供的再平衡策略主要有三種:Round Robin
杭抠,Range
和Sticky
,默認使用的是Range
恳啥。這三種分配策略的主要區(qū)別在于:
Round Robin
:會采用輪詢的方式將當前所有的分區(qū)依次分配給所有的consumer偏灿;Range
:首先會計算每個consumer可以消費的分區(qū)個數(shù),然后按照順序?qū)⒅付▊€數(shù)范圍的分區(qū)分配給各個consumer钝的;-
Sticky
:這種分區(qū)策略是最新版本中新增的一種策略翁垂,其主要實現(xiàn)了兩個目的:將現(xiàn)有的分區(qū)盡可能均衡的分配給各個consumer铆遭,存在此目的的原因在于
Round Robin
和Range
分配策略實際上都會導(dǎo)致某幾個consumer承載過多的分區(qū),從而導(dǎo)致消費壓力不均衡沿猜;如果發(fā)生再平衡枚荣,那么重新分配之后在前一點的基礎(chǔ)上會盡力保證當前未宕機的consumer所消費的分區(qū)不會被分配給其他的consumer上;
Round Robin
關(guān)于Roudn Robin重分配策略啼肩,其主要采用的是一種輪詢的方式分配所有的分區(qū)橄妆,該策略主要實現(xiàn)的步驟如下。這里我們首先假設(shè)有三個topic:t0祈坠、t1和t2害碾,這三個topic擁有的分區(qū)數(shù)分別為1、2和3赦拘,那么總共有六個分區(qū)慌随,這六個分區(qū)分別為:t0-0、t1-0躺同、t1-1阁猜、t2-0、t2-1和t2-2蹋艺。這里假設(shè)我們有三個consumer:C0剃袍、C1和C2,它們訂閱情況為:C0訂閱t0车海,C1訂閱t0和t1笛园,C2訂閱t0、t1和t2侍芝。那么這些分區(qū)的分配步驟如下:
-
首先將所有的partition和consumer按照字典序進行排序研铆,所謂的字典序,就是按照其名稱的字符串順序州叠,那么上面的六個分區(qū)和三個consumer排序之后分別為:
- 然后依次以按順序輪詢的方式將這六個分區(qū)分配給三個consumer棵红,如果當前consumer沒有訂閱當前分區(qū)所在的topic,則輪詢的判斷下一個consumer:
- 嘗試將t0-0分配給C0咧栗,由于C0訂閱了t0逆甜,因而可以分配成功;
- 嘗試將t1-0分配給C1致板,由于C1訂閱了t1交煞,因而可以分配成功;
- 嘗試將t1-1分配給C2斟或,由于C2訂閱了t1素征,因而可以分配成功;
- 嘗試將t2-0分配給C0,由于C0沒有訂閱t2御毅,因而會輪詢下一個consumer根欧;
- 嘗試將t2-0分配給C1,由于C1沒有訂閱t2端蛆,因而會輪詢下一個consumer凤粗;
- 嘗試將t2-0分配給C2,由于C2訂閱了t2今豆,因而可以分配成功嫌拣;
- 同理由于t2-1和t2-2所在的topic都沒有被C0和C1所訂閱,因而都不會分配成功晚凿,最終都會分配給C2亭罪。
- 按照上述的步驟將所有的分區(qū)都分配完畢之后瘦馍,最終分區(qū)的訂閱情況如下:
從上面的步驟分析可以看出歼秽,輪詢的策略就是簡單的將所有的partition和consumer按照字典序進行排序之后,然后依次將partition分配給各個consumer情组,如果當前的consumer沒有訂閱當前的partition燥筷,那么就會輪詢下一個consumer,直至最終將所有的分區(qū)都分配完畢院崇。但是從上面的分配結(jié)果可以看出肆氓,輪詢的方式會導(dǎo)致每個consumer所承載的分區(qū)數(shù)量不一致,從而導(dǎo)致各個consumer壓力不均一底瓣。
Range
所謂的Range重分配策略谢揪,就是首先會計算各個consumer將會承載的分區(qū)數(shù)量,然后將指定數(shù)量的分區(qū)分配給該consumer捐凭。這里我們假設(shè)有兩個consumer:C0和C1拨扶,兩個topic:t0和t1,這兩個topic分別都有三個分區(qū)茁肠,那么總共的分區(qū)有六個:t0-0患民、t0-1、t0-2垦梆、t1-0匹颤、t1-1和t1-2。那么Range分配策略將會按照如下步驟進行分區(qū)的分配:
- 需要注意的是托猩,Range策略是按照topic依次進行分配的印蓖,比如我們以t0進行講解,其首先會獲取t0的所有分區(qū):t0-0京腥、t0-1和t0-2赦肃,以及所有訂閱了該topic的consumer:C0和C1,并且會將這些分區(qū)和consumer按照字典序進行排序;
- 然后按照平均分配的方式計算每個consumer會得到多少個分區(qū)摆尝,如果沒有除盡温艇,則會將多出來的分區(qū)依次計算到前面幾個consumer。比如這里是三個分區(qū)和兩個consumer堕汞,那么每個consumer至少會得到1個分區(qū)勺爱,而3除以2后還余1,那么就會將多余的部分依次算到前面幾個consumer讯检,也就是這里的1會分配給第一個consumer琐鲁,總結(jié)來說,那么C0將會從第0個分區(qū)開始人灼,分配2個分區(qū)围段,而C1將會從第2個分區(qū)開始,分配1個分區(qū)投放;
- 同理奈泪,按照上面的步驟依次進行后面的topic的分配。
- 最終上面六個分區(qū)的分配情況如下:
可以看到灸芳,如果按照Range
分區(qū)方式進行分配涝桅,其本質(zhì)上是依次遍歷每個topic,然后將這些topic的分區(qū)按照其所訂閱的consumer數(shù)量進行平均的范圍分配烙样。這種方式從計算原理上就會導(dǎo)致排序在前面的consumer分配到更多的分區(qū)冯遂,從而導(dǎo)致各個consumer的壓力不均衡。
Sticky
Sticky
策略是新版本中新增的策略谒获,顧名思義蛤肌,這種策略會保證再分配時已經(jīng)分配過的分區(qū)盡量保證其能夠繼續(xù)由當前正在消費的consumer繼續(xù)消費,當然批狱,前提是每個consumer所分配的分區(qū)數(shù)量都大致相同裸准,這樣能夠保證每個consumer消費壓力比較均衡。
消費者初始分配
初始分配使用的就是sticky
策略精耐,初始狀態(tài)分配的特點是狼速,所有的分區(qū)都還未分配到任意一個consumer上。
這里我們假設(shè)有三個consumer:C0卦停、C1和C2向胡,三個topic:t0、t1和t2惊完,這三個topic分別有1僵芹、2和3個分區(qū),那么總共的分區(qū)為:t0-0小槐、t1-0拇派、t1-1荷辕、t2-0、t2-1和t2-2件豌。關(guān)于訂閱情況疮方,這里C0訂閱了t0,C1訂閱了t0和1茧彤,C2則訂閱了t0骡显、t1和t2。這里的分區(qū)分配規(guī)則如下:
-
首先將所有的分區(qū)進行排序曾掂,排序方式為:首先按照當前分區(qū)所分配的consumer數(shù)量從低到高進行排序惫谤,如果consumer數(shù)量相同,則按照分區(qū)的字典序進行排序珠洗。這里六個分區(qū)由于所在的topic的訂閱情況各不相同溜歪,因而其排序結(jié)果如下:
-
然后將所有的consumer進行排序,其排序方式為:首先按照當前consumer已經(jīng)分配的分區(qū)數(shù)量有小到大排序许蓖,如果兩個consumer分配的分區(qū)數(shù)量相同蝴猪,則會按照其名稱的字典序進行排序。由于初始時蛔糯,這三個consumer都沒有分配任何分區(qū)拯腮,因而其排序結(jié)果即為其按照字典序進行排序的結(jié)果:
- 然后將各個分區(qū)依次遍歷分配給各個consumer,首先需要注意的是蚁飒,這里的遍歷并不是C0分配完了再分配給C1,而是每次分配分區(qū)的時候都整個的對所有的consumer從頭開始遍歷分配萝喘,如果當前consumer沒有訂閱當前分區(qū)淮逻,則會遍歷下一個consumer。然后需要注意的是阁簸,在整個分配的過程中爬早,各個consumer所分配的分區(qū)數(shù)是動態(tài)變化的,而這種變化是會體現(xiàn)在各個consumer的排序上的启妹,比如初始時C0是排在第一個的筛严,此時如果分配了一個分區(qū)給C0,那么C0就會排到最后饶米,因為其擁有的分區(qū)數(shù)是最多的桨啃,即始終按照所含分區(qū)數(shù)量從小到大排序。上面的六個分區(qū)整體的分配流程如下:
- 首先將t2-0嘗試分配給C0檬输,由于C0沒有訂閱t2照瘾,因而分配不成功,繼續(xù)輪詢下一個consumer丧慈;
- 然后將t2-0嘗試分配給C1析命,由于C1沒有訂閱t2,因而分配不成功,繼續(xù)輪詢下一個consumer鹃愤;
-
接著將t2-0嘗試分配給C2簇搅,由于C2訂閱了t2,因而分配成功软吐,此時由于C2分配的分區(qū)數(shù)發(fā)生變化馍资,各個consumer變更后的排序結(jié)果為:
接下來的t2-1和t2-2,由于也只有C2訂閱了t2关噪,因而其最終還是會分配給C2鸟蟹,最終在t2-0、t2-1和t2-2分配完之后使兔,各個consumer的排序以及其分區(qū)分配情況如下:
- 接著繼續(xù)分配t1-0建钥,首先嘗試將其分配給C0,由于C0沒有訂閱t1虐沥,因而分配不成功熊经,繼續(xù)輪詢下一個consumer;
-
然后嘗試將t1-0分配給C1欲险,由于C1訂閱了t1镐依,因而分配成功,此時各個consumer以及其分配的分區(qū)情況如下:
-
同理天试,接下來會分配t1-1槐壳,雖然C1和C2都訂閱了t1,但是由于C1排在C2前面喜每,因而該分區(qū)會分配給C1务唐,即:
-
最后,嘗試將t0-0分配給C0带兜,由于C0訂閱了t0枫笛,因而分配成功,最終的分配結(jié)果為:
上面的分配過程中刚照,需要始終注意的是刑巧,雖然示例中的consumer順序始終沒有變化,但這是由于各個分區(qū)分配之后正好每個consumer所分配的分區(qū)數(shù)量的排序結(jié)果與初始狀態(tài)一致无畔。這里讀者也可以比較一下這種分配方式與前面講解的Round Robin
進行對比啊楚,可以很明顯的發(fā)現(xiàn),Sticky
重分配策略分配得更加均勻一些檩互。
kafka日志壓縮
壓縮一詞簡單來講就是一種互換思想特幔,它是一種經(jīng)典的用 CPU 時間去換磁盤空間或者 I/O 傳輸量的思想,希望以較小的 CPU 開銷帶來更少的磁盤占用或更少的網(wǎng)絡(luò) I/O 傳輸。
Kafka 的消息分為兩層:消息集合 和 消息。一個消息集合中包含若干條日志項,而日志項才是真正封裝消息的地方竹观。Kafka 底層的消息日志由一系列消息集合日志項組成拍嵌。Kafka 通常不會直接操作具體的一條條消息遭赂,它總是在消息集合這個層面上進行寫入
操作。
在 Kafka 中横辆,壓縮會發(fā)生在兩個地方:Kafka Producer 和 Kafka Consumer撇他,為什么啟用壓縮?說白了就是消息太大狈蚤,需要變小一點
來使消息發(fā)的更快一些困肩。
有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息后并發(fā)送給服務(wù)器后脆侮,由 Consumer 消費者進行解壓縮锌畸,因為采用的何種壓縮算法是隨著 key、value 一起發(fā)送過去的靖避,所以消費者知道采用何種壓縮算法潭枣。
為什么kafka速度快?
Kafka 實現(xiàn)了零拷貝
原理來快速移動數(shù)據(jù)幻捏,避免了內(nèi)核之間的切換盆犁。Kafka 可以將數(shù)據(jù)記錄分批發(fā)送,從生產(chǎn)者到文件系統(tǒng)(Kafka 主題日志)到消費者篡九,可以端到端的查看這些批次的數(shù)據(jù)谐岁。
批處理能夠進行更有效的數(shù)據(jù)壓縮并減少 I/O 延遲,Kafka 采取順序?qū)懭氪疟P的方式瓮下,避免了隨機磁盤尋址的浪費翰铡。
總結(jié)一下其實就是四個要點
- 順序讀寫
- 零拷貝
- 消息壓縮
- 分批發(fā)送