簡介
kafka是一個分布式消息隊(duì)列罩阵。具有高性能竿秆、持久化、多副本備份稿壁、橫向擴(kuò)展能力幽钢。生產(chǎn)者往隊(duì)列里寫消息,消費(fèi)者從隊(duì)列里取消息進(jìn)行業(yè)務(wù)邏輯傅是。一般在架構(gòu)設(shè)計(jì)中起到解耦匪燕、削峰、異步處理的作用喧笔。
kafka對外使用topic的概念帽驯,生產(chǎn)者往topic里寫消息,消費(fèi)者從讀消息书闸。為了做到水平擴(kuò)展尼变,一個topic實(shí)際是由多個partition組成的,遇到瓶頸時浆劲,可以通過增加partition的數(shù)量來進(jìn)行橫向擴(kuò)容享甸。單個parition內(nèi)是保證消息有序。
每新寫一條消息梳侨,kafka就是在對應(yīng)的文件append寫,所以性能非常高日丹。
kafka的總體數(shù)據(jù)流是這樣的:
大概用法就是走哺,Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉去指定Topic的消息哲虾,然后進(jìn)行業(yè)務(wù)處理丙躏。圖中有兩個topic,topic 0有兩個partition束凑,topic 1有一個partition晒旅,三副本備份⊥羲撸可以看到consumer gourp 1中的consumer 2沒有分到partition處理废恋,這是有可能出現(xiàn)的谈秫,下面會講到。
關(guān)于broker鱼鼓、topics拟烫、partitions的一些元信息用zk來存,監(jiān)控和路由啥的也都會用到zk迄本。
生產(chǎn)
基本流程是這樣的:
創(chuàng)建一條記錄硕淑,記錄中一個要指定對應(yīng)的topic和value,key和partition可選嘉赎。先序列化置媳,然后按照topic和partition,放進(jìn)對應(yīng)的發(fā)送隊(duì)列中公条。kafka produce都是批量請求拇囊,會積攢一批,然后一起發(fā)送赃份,不是調(diào)send()就進(jìn)行立刻進(jìn)行網(wǎng)絡(luò)發(fā)包寂拆。如果partition沒填,那么情況會是這樣的:
key有填 按照key進(jìn)行哈希抓韩,相同key去一個partition纠永。(如果擴(kuò)展了partition的數(shù)量那么就不能保證了)
key沒填 round-robin來選partition
這些要發(fā)往同一個partition的請求按照配置,攢一波谒拴,然后由一個單獨(dú)的線程一次性發(fā)過去尝江。
API
有high level api,替我們把很多事情都干了英上,offset炭序,路由啥都替我們干了,用以來很簡單苍日。還有simple api惭聂,offset啥的都是要我們自己記錄。
partition
當(dāng)存在多副本的情況下相恃,會盡量把多個副本辜纲,分配到不同的broker上。kafka會為partition選出一個leader拦耐,之后所有該partition的請求耕腾,實(shí)際操作的都是leader,然后再同步到其他的follower杀糯。 當(dāng)一個broker歇菜后扫俺,所有l(wèi)eader在該broker上的partition都會重新選舉,選出一個leader固翰。(這里不像分布式文件存儲系統(tǒng)那樣會自動進(jìn)行復(fù)制保持副本數(shù))
然后這里就涉及兩個細(xì)節(jié):怎么分配partition狼纬,怎么選leader羹呵。
關(guān)于partition的分配,還有l(wèi)eader的選舉畸颅,總得有個執(zhí)行者担巩。在kafka中,這個執(zhí)行者就叫controller没炒。 kafka使用zk在broker中選出一個controller涛癌,用于partition分配和leader選舉。
partition的分配
將所有Broker(假設(shè)共n個Broker)和待分配的Partition排序
將第i個Partition分配到第(i mod n)個Broker上 (這個就是leader)
將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
leader容災(zāi)
controller會在Zookeeper的/brokers/ids節(jié)點(diǎn)上注冊Watch送火,一旦有broker宕機(jī)拳话,它就能知道。當(dāng)broker宕機(jī)后种吸,controller就會給受到影響的partition選出新leader弃衍。controller從zk的/brokers/topics/[topic]/partitions/[partition]/state中,讀取對應(yīng)partition的ISR(in-sync replica已同步的副本)列表坚俗,選一個出來做leader镜盯。選出leader后,更新zk猖败,然后發(fā)送LeaderAndISRRequest給受影響的broker速缆,讓它們改變知道這事。為什么這里不是使用zk通知恩闻,而是直接給broker發(fā)送rpc請求艺糜,我的理解可能是這樣做zk有性能問題吧。
如果ISR列表是空幢尚,那么會根據(jù)配置破停,隨便選一個replica做leader,或者干脆這個partition就是歇菜尉剩。如果ISR列表的有機(jī)器真慢,但是也歇菜了,那么還可以等ISR的機(jī)器活過來理茎。
多副本同步
這里的策略晤碘,服務(wù)端這邊的處理是follower從leader批量拉取數(shù)據(jù)來同步。但是具體的可靠性功蜓,是由生產(chǎn)者來決定的。生產(chǎn)者生產(chǎn)消息的時候宠蚂,通過request.required.acks參數(shù)來設(shè)置數(shù)據(jù)的可靠性式撼。
在acks=-1的時候,如果ISR少于min.insync.replicas指定的數(shù)目求厕,那么就會返回不可用著隆。
這里ISR列表中的機(jī)器是會變化的扰楼,根據(jù)配置replica.lag.time.max.ms,多久沒同步美浦,就會從ISR列表中剔除弦赖。以前還有根據(jù)落后多少條消息就踢出ISR,在1.0版本后就去掉了浦辨,因?yàn)檫@個值很難取蹬竖,在高峰的時候很容易出現(xiàn)節(jié)點(diǎn)不斷的進(jìn)出ISR列表。
從ISA中選出leader后流酬,follower會從把自己日志中上一個高水位后面的記錄去掉币厕,然后去和leader拿新的數(shù)據(jù)。因?yàn)樾碌膌eader選出來后芽腾,follower上面的數(shù)據(jù)旦装,可能比新leader多,所以要截取摊滔。這里高水位的意思阴绢,對于partition和leader,就是所有ISR中都有的最新一條記錄艰躺。消費(fèi)者最多只能讀到高水位呻袭;
從leader的角度來說高水位的更新會延遲一輪,例如寫入了一條新消息描滔,ISR中的broker都fetch到了棒妨,但是ISR中的broker只有在下一輪的fetch中才能告訴leader。
也正是由于這個高水位延遲一輪含长,在一些情況下券腔,kafka會出現(xiàn)丟數(shù)據(jù)和主備數(shù)據(jù)不一致的情況,0.11開始拘泞,使用leader epoch來代替高水位纷纫。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection)
消費(fèi)
訂閱topic是以一個消費(fèi)組來訂閱的,一個消費(fèi)組里面可以有多個消費(fèi)者陪腌。同一個消費(fèi)組中的兩個消費(fèi)者辱魁,不會同時消費(fèi)一個partition。換句話來說诗鸭,就是一個partition染簇,只能被消費(fèi)組里的一個消費(fèi)者消費(fèi) ,但是可以同時被多個消費(fèi)組消費(fèi)强岸。因此锻弓,如果消費(fèi)組內(nèi)的消費(fèi)者如果比partition多的話,那么就會有個別消費(fèi)者一直空閑蝌箍。
API
訂閱topic時青灼,可以用正則表達(dá)式暴心,如果有新topic匹配上,那能自動訂閱上杂拨。
offset的保存
一個消費(fèi)組消費(fèi)partition专普,需要保存offset記錄消費(fèi)到哪,以前保存在zk中弹沽,由于zk的寫性能不好檀夹,以前的解決方法都是consumer每隔一分鐘上報(bào)一次。這里zk的性能嚴(yán)重影響了消費(fèi)的速度贷币,而且很容易出現(xiàn)重復(fù)消費(fèi)击胜。在0.10版本后,kafka把這個offset的保存役纹,從zk總剝離偶摔,保存在一個名叫__consumeroffsets topic的topic中。寫進(jìn)消息的key由groupid促脉、topic辰斋、partition組成,value是偏移量offset瘸味。topic配置的清理策略是compact宫仗。總是保留最新的key旁仿,其余刪掉藕夫。一般情況下,每個key的offset都是緩存在內(nèi)存中枯冈,查詢的時候不用遍歷partition毅贮,如果沒有緩存,第一次就會遍歷partition建立緩存尘奏,然后查詢返回滩褥。
確定consumer group位移信息寫入__consumers_offsets的哪個partition,具體計(jì)算公式:
思考:如果正在跑的服務(wù)炫加,修改了offsets.topic.num.partitions瑰煎,那么offset的保存是不是就亂套了?
分配partition--reblance
生產(chǎn)過程中broker要分配partition俗孝,消費(fèi)過程這里酒甸,也要分配partition給消費(fèi)者。類似broker中選了一個controller出來赋铝,消費(fèi)也要從broker中選一個coordinator插勤,用于分配partition。下面從頂向下,分別闡述一下
- 怎么選coordinator饮六。
- 交互流程。
- reblance的流程苛蒲。
選coordinator
看offset保存在那個partition
該partition leader所在的broker就是被選定的coordinator
這里我們可以看到卤橄,consumer group的coordinator,和保存consumer group offset的partition leader是同一臺機(jī)器臂外。
交互流程
把coordinator選出來之后窟扑,就是要分配了 整個流程是這樣的:
consumer啟動、或者coordinator宕機(jī)了漏健,consumer會任意請求一個broker嚎货,發(fā)送ConsumerMetadataRequest請求,broker會按照上面說的方法蔫浆,選出這個consumer對應(yīng)coordinator的地址殖属。
consumer 發(fā)送heartbeat請求給coordinator,返回IllegalGeneration的話瓦盛,就說明consumer的信息是舊的了洗显,需要重新加入進(jìn)來,進(jìn)行reblance原环。返回成功挠唆,那么consumer就從上次分配的partition中繼續(xù)執(zhí)行。
reblance流程
consumer給coordinator發(fā)送JoinGroupRequest請求嘱吗。
這時其他consumer發(fā)heartbeat請求過來時玄组,coordinator會告訴他們,要reblance了谒麦。
其他consumer發(fā)送JoinGroupRequest請求俄讹。
所有記錄在冊的consumer都發(fā)了JoinGroupRequest請求之后,coordinator就會在這里consumer中隨便選一個leader弄匕。然后回JoinGroupRespone颅悉,這會告訴consumer你是follower還是leader,對于leader迁匠,還會把follower的信息帶給它剩瓶,讓它根據(jù)這些信息去分配partition
5、consumer向coordinator發(fā)送SyncGroupRequest城丧,其中l(wèi)eader的SyncGroupRequest會包含分配的情況延曙。6、coordinator回包亡哄,把分配的情況告訴consumer枝缔,包括leader。
當(dāng)partition或者消費(fèi)者的數(shù)量發(fā)生變化時,都得進(jìn)行reblance愿卸。列舉一下會reblance的情況:
增加partition
增加消費(fèi)者
消費(fèi)者主動關(guān)閉
消費(fèi)者宕機(jī)了
coordinator自己也宕機(jī)了
消息投遞語義
kafka支持3種消息投遞語義 At most once:最多一次灵临,消息可能會丟失,但不會重復(fù) At least once:最少一次趴荸,消息不會丟失儒溉,可能會重復(fù) Exactly once:只且一次,消息不丟失不重復(fù)发钝,只且消費(fèi)一次(0.11中實(shí)現(xiàn)顿涣,僅限于下游也是kafka)
在業(yè)務(wù)中,常常都是使用At least once的模型酝豪,如果需要可重入的話涛碑,往往是業(yè)務(wù)自己實(shí)現(xiàn)。
At least once
先獲取數(shù)據(jù)孵淘,再進(jìn)行業(yè)務(wù)處理蒲障,業(yè)務(wù)處理成功后commit offset。1夺英、生產(chǎn)者生產(chǎn)消息異常晌涕,消息是否成功寫入不確定,重做痛悯,可能寫入重復(fù)的消息 2余黎、消費(fèi)者處理消息,業(yè)務(wù)處理成功后载萌,更新offset失敗惧财,消費(fèi)者重啟的話,會重復(fù)消費(fèi)
At most once
先獲取數(shù)據(jù)扭仁,再commit offset垮衷,最后進(jìn)行業(yè)務(wù)處理。1乖坠、生產(chǎn)者生產(chǎn)消息異常搀突,不管,生產(chǎn)下一個消息熊泵,消息就丟了 2仰迁、消費(fèi)者處理消息,先更新offset顽分,再做業(yè)務(wù)處理徐许,做業(yè)務(wù)處理失敗,消費(fèi)者重啟卒蘸,消息就丟了
Exactly once
思路是這樣的雌隅,首先要保證消息不丟,再去保證不重復(fù)。所以盯著At least once的原因來搞恰起。首先想出來的:
生產(chǎn)者重做導(dǎo)致重復(fù)寫入消息----生產(chǎn)保證冪等性
消費(fèi)者重復(fù)消費(fèi)---消滅重復(fù)消費(fèi)修械,或者業(yè)務(wù)接口保證冪等性重復(fù)消費(fèi)也沒問題
由于業(yè)務(wù)接口是否冪等,不是kafka能保證的检盼,所以kafka這里提供的exactly once是有限制的祠肥,消費(fèi)者的下游也必須是kafka。 所以一下討論的梯皿,沒特殊說明,消費(fèi)者的下游系統(tǒng)都是kafka(注:使用kafka conector县恕,它對部分系統(tǒng)做了適配东羹,實(shí)現(xiàn)了exactly once)。
生產(chǎn)者冪等性好做忠烛,沒啥問題属提。
解決重復(fù)消費(fèi)有兩個方法:
下游系統(tǒng)保證冪等性,重復(fù)消費(fèi)也不會導(dǎo)致多條記錄美尸。
把commit offset和業(yè)務(wù)處理綁定成一個事務(wù)冤议。
本來exactly once實(shí)現(xiàn)第1點(diǎn)就ok了。
但是在一些使用場景下师坎,我們的數(shù)據(jù)源可能是多個topic恕酸,處理后輸出到多個topic,這時我們會希望輸出時要么全部成功胯陋,要么全部失敗蕊温。這就需要實(shí)現(xiàn)事務(wù)性。 既然要做事務(wù)遏乔,那么干脆把重復(fù)消費(fèi)的問題從根源上解決义矛,把commit offset和輸出到其他topic綁定成一個事務(wù)。
生產(chǎn)冪等性
思路是這樣的盟萨,為每個producer分配一個pid凉翻,作為該producer的唯一標(biāo)識。producer會為每一個<topic,partition>維護(hù)一個單調(diào)遞增的seq捻激。類似的制轰,broker也會為每個<pid,topic,partition>記錄下最新的seq。當(dāng)req_seq == broker_seq+1時铺罢,broker才會接受該消息艇挨。因?yàn)椋?/p>
消息的seq比broker的seq大超過時,說明中間有數(shù)據(jù)還沒寫入韭赘,即亂序了缩滨。
消息的seq不比broker的seq小,那么說明該消息已被保存。
事務(wù)性/原子性廣播
場景是這樣的:
- 先從多個源topic中獲取數(shù)據(jù)脉漏。
- 做業(yè)務(wù)處理苞冯,寫到下游的多個目的topic。
-
更新多個源topic的offset侧巨。
其中第2舅锄、3點(diǎn)作為一個事務(wù),要么全成功司忱,要么全失敗皇忿。這里得益與offset實(shí)際上是用特殊的topic去保存,這兩點(diǎn)都?xì)w一為寫多個topic的事務(wù)性處理坦仍。
基本思路是這樣的:引入tid(transaction id)鳍烁,和pid不同,這個id是應(yīng)用程序提供的繁扎,用于標(biāo)識事務(wù)炭晒,和producer是誰并沒關(guān)系野建。就是任何producer都可以使用這個tid去做事務(wù)怀浆,這樣進(jìn)行到一半就死掉的事務(wù)治宣,可以由另一個producer去恢復(fù)。同時為了記錄事務(wù)的狀態(tài)提澎,類似對offset的處理姚垃,引入transaction coordinator用于記錄transaction log。在集群中會有多個transaction coordinator盼忌,每個tid對應(yīng)唯一一個transaction coordinator莉炉。注:transaction log刪除策略是compact,已完成的事務(wù)會標(biāo)記成null碴犬,compact后不保留絮宁。
做事務(wù)時,先標(biāo)記開啟事務(wù)服协,寫入數(shù)據(jù)绍昂,全部成功就在transaction log中記錄為prepare commit狀態(tài),否則寫入prepare abort的狀態(tài)偿荷。之后再去給每個相關(guān)的partition寫入一條marker(commit或者abort)消息窘游,標(biāo)記這個事務(wù)的message可以被讀取或已經(jīng)廢棄。成功后在transaction log記錄下commit/abort狀態(tài)跳纳,至此事務(wù)結(jié)束忍饰。
1.首先使用tid請求任意一個broker(代碼中寫的是負(fù)載最小的broker),找到對應(yīng)的transaction coordinator寺庄。
2.請求transaction coordinator獲取到對應(yīng)的pid艾蓝,和pid對應(yīng)的epoch力崇,這個epoch用于防止僵死進(jìn)程復(fù)活導(dǎo)致消息錯亂,當(dāng)消息的epoch比當(dāng)前維護(hù)的epoch小時赢织,拒絕掉亮靴。tid和pid有一一對應(yīng)的關(guān)系,這樣對于同一個tid會返回相同的pid于置。
3.client先請求transaction coordinator記錄<topic,partition>的事務(wù)狀態(tài)茧吊,初始狀態(tài)是BEGIN,如果是該事務(wù)中第一個到達(dá)的<topic,partition>八毯,同時會對事務(wù)進(jìn)行計(jì)時搓侄;client輸出數(shù)據(jù)到相關(guān)的partition中;client再請求transaction coordinator記錄offset的<topic,partition>事務(wù)狀態(tài)话速;client發(fā)送offset commit到對應(yīng)offset partition休讳。
4.client發(fā)送commit請求,transaction coordinator記錄prepare commit/abort尿孔,然后發(fā)送marker給相關(guān)的partition。全部成功后筹麸,記錄commit/abort的狀態(tài)活合,最后這個記錄不需要等待其他replica的ack,因?yàn)閜repare不丟就能保證最終的正確性了物赶。
這里prepare的狀態(tài)主要是用于事務(wù)恢復(fù)白指,例如給相關(guān)的partition發(fā)送控制消息,沒發(fā)完就宕機(jī)了酵紫,備機(jī)起來后告嘲,producer發(fā)送請求獲取pid時,會把未完成的事務(wù)接著完成奖地。
當(dāng)partition中寫入commit的marker后橄唬,相關(guān)的消息就可被讀取。所以kafka事務(wù)在prepare commit到commit這個時間段內(nèi)参歹,消息是逐漸可見的仰楚,而不是同一時刻可見。
消費(fèi)事務(wù)
前面都是從生產(chǎn)的角度看待事務(wù)犬庇。還需要從消費(fèi)的角度去考慮一些問題僧界。消費(fèi)時,partition中會存在一些消息處于未commit狀態(tài)臭挽,即業(yè)務(wù)方應(yīng)該看不到的消息捂襟,需要過濾這些消息不讓業(yè)務(wù)看到,kafka選擇在消費(fèi)者進(jìn)程中進(jìn)行過來欢峰,而不是在broker中過濾葬荷,主要考慮的還是性能涨共。kafka高性能的一個關(guān)鍵點(diǎn)是zero copy,如果需要在broker中過濾闯狱,那么勢必需要讀取消息內(nèi)容到內(nèi)存煞赢,就會失去zero copy的特性。
文件組織
kafka的數(shù)據(jù)哄孤,實(shí)際上是以文件的形式存儲在文件系統(tǒng)的照筑。topic下有partition,partition下有segment瘦陈,segment是實(shí)際的一個個文件凝危,topic和partition都是抽象概念。
在目錄/${topicName}-{$partitionid}/
下晨逝,存儲著實(shí)際的log文件(即segment)蛾默,還有對應(yīng)的索引文件。
每個segment文件大小相等捉貌,文件名以這個segment中最小的offset命名支鸡,文件擴(kuò)展名是.log;segment對應(yīng)的索引的文件名字一樣趁窃,擴(kuò)展名是.index牧挣。有兩個index文件,一個是offset index用于按offset去查message醒陆,一個是time index用于按照時間去查瀑构,其實(shí)這里可以優(yōu)化合到一起,下面只說offset index刨摩∷律危總體的組織是這樣的:
為了減少索引文件的大小,降低空間使用澡刹,方便直接加載進(jìn)內(nèi)存中呻征,這里的索引使用稀疏矩陣,不會每一個message都記錄下具體位置罢浇,而是每隔一定的字節(jié)數(shù)怕犁,再建立一條索引。索引包含兩部分己莺,分別是baseOffset奏甫,還有position。
baseOffset:意思是這條索引對應(yīng)segment文件中的第幾條message凌受。這樣做方便使用數(shù)值壓縮算法來節(jié)省空間阵子。例如kafka使用的是varint。
position:在segment中的絕對位置胜蛉。
查找offset對應(yīng)的記錄時挠进,會先用二分法色乾,找出對應(yīng)的offset在哪個segment中,然后使用索引领突,在定位出offset在segment中的大概位置暖璧,再遍歷查找message。
常用配置項(xiàng)
broker配置
topic配置
關(guān)于日志清理君旦,默認(rèn)當(dāng)前正在寫的日志澎办,是怎么也不會清理掉的。還有0.10之前的版本金砍,時間看的是日志文件的mtime局蚀,但這個指是不準(zhǔn)確的,有可能文件被touch一下恕稠,mtime就變了琅绅。因此在0.10版本開始,改為使用該文件最新一條消息的時間來判斷鹅巍。按大小清理這里也要注意千扶,Kafka在定時任務(wù)中嘗試比較當(dāng)前日志量總大小是否超過閾值至少一個日志段的大小。如果超過但是沒超過一個日志段骆捧,那么就不會刪除澎羞。