07 | Kakfa_生產(chǎn)者消息分區(qū)機制原理剖析

我們在使用 Apache Kafka 生產(chǎn)和消費消息的時候,肯定是希望能夠?qū)?shù)據(jù)均勻地分配到所有服務(wù)器上。比如很多公司使用 Kafka 收集應(yīng)用服務(wù)器的日志數(shù)據(jù)伟桅,這種數(shù)據(jù)都是很多的雕蔽,特別是對于那種大批量機器組成的集群環(huán)境刊侯,每分鐘產(chǎn)生的日志量都能以 GB 數(shù)声离,因此如何將這么大的數(shù)據(jù)量均勻地分配到 Kafka 的各個 Broker 上芒炼,就成為一個非常重要的問題。

今天我就來和你說說 Kafka 生產(chǎn)者如何實現(xiàn)這個需求术徊,我會以 Java API 為例進行分析本刽,但實際上其他語言的實現(xiàn)邏輯也是類似的。

為什么分區(qū)赠涮?

如果你對 Kafka 分區(qū)(Partition)的概念還不熟悉子寓。前面我說過 Kafka 有主題(Topic)的概念,它是承載真實數(shù)據(jù)的邏輯容器笋除,而在主題之下還分為若干個分區(qū)斜友,也就是說 Kafka 的消息組織方式實際上是三級結(jié)構(gòu):主題 - 分區(qū) - 消息。主題下的每條消息只會保存在某一個分區(qū)中垃它,而不會在多個分區(qū)中被保存多份鲜屏。官網(wǎng)上的這張圖非常清晰地展示了 Kafka 的三級結(jié)構(gòu),如下所示:


現(xiàn)在我拋出一個問題你可以先思考一下:你覺得為什么 Kafka 要做這樣的設(shè)計国拇?為什么使用分區(qū)的概念而不是直接使用多個主題呢墙歪?

其實分區(qū)的作用就是提供負載均衡的能力,或者說對數(shù)據(jù)進行分區(qū)的主要原因贝奇,就是為了實現(xiàn)系統(tǒng)的高伸縮性(Scalability)虹菲。不同的分區(qū)能夠被放置到不同節(jié)點的機器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進行的掉瞳,這樣每個節(jié)點的機器都能獨立地執(zhí)行各自分區(qū)的讀寫請求處理毕源。并且,我們還可以通過添加新的節(jié)點機器來增加整體系統(tǒng)的吞吐量陕习。

實際上分區(qū)的概念以及分區(qū)數(shù)據(jù)庫早在 1980 年就已經(jīng)有大牛們在做了霎褐,比如那時候有個叫 Teradata 的數(shù)據(jù)庫就引入了分區(qū)的概念。

值得注意的是该镣,不同的分布式系統(tǒng)對分區(qū)的叫法也不盡相同冻璃。比如在 Kafka 中叫分區(qū),在 MongoDB 和 Elasticsearch 中就叫分片 Shard损合,而在 HBase 中則叫 Region省艳,在 Cassandra 中又被稱作 vnode。從表面看起來它們實現(xiàn)原理可能不盡相同嫁审,但對底層分區(qū)(Partitioning)的整體思想?yún)s從未改變跋炕。

除了提供負載均衡這種最核心的功能之外,利用分區(qū)也可以實現(xiàn)其他一些業(yè)務(wù)級別的需求律适,比如實現(xiàn)業(yè)務(wù)級別的消息順序的問題辐烂,這一點我今天也會分享一個具體的案例來說明遏插。

都有哪些分區(qū)策略?

下面我們說說 Kafka 生產(chǎn)者的分區(qū)策略纠修。所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個分區(qū)的算法胳嘲。Kafka 為我們提供了默認的分區(qū)策略,同時它也支持你自定義分區(qū)策略扣草。

如果要自定義分區(qū)策略了牛,你需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class。這個參數(shù)該怎么設(shè)定呢德召?方法很簡單,在編寫生產(chǎn)者程序時汽纤,你可以編寫一個具體的類實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口上岗。這個接口也很簡單,只定義了兩個方法:partition()close()蕴坪,通常你只需要實現(xiàn)最重要的 partition 方法肴掷。我們來看看這個方法的方法簽名:


int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

這里的topickey背传、keyBytes呆瞻、valuevalueBytes都屬于消息數(shù)據(jù),cluster則是集群信息(比如當前 Kafka 集群共有多少主題径玖、多少 Broker 等)痴脾。Kafka 給你這么多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區(qū)梳星,計算出它要被發(fā)送到哪個分區(qū)中赞赖。只要你自己的實現(xiàn)類定義好了 partition 方法,同時設(shè)置partitioner.class參數(shù)為你自己實現(xiàn)類的 Full Qualified Name冤灾,那么生產(chǎn)者程序就會按照你的代碼邏輯對消息進行分區(qū)前域。雖說可以有無數(shù)種分區(qū)的可能,但比較常見的分區(qū)策略也就那么幾種韵吨,下面我來詳細介紹一下匿垄。

輪詢策略

也稱 Round-robin 策略,即順序分配归粉。比如一個主題下有 3 個分區(qū)椿疗,那么第一條消息被發(fā)送到分區(qū) 0,第二條被發(fā)送到分區(qū) 1糠悼,第三條被發(fā)送到分區(qū) 2变丧,以此類推。當生產(chǎn)第 4 條消息時又會重新開始绢掰,即將其分配到分區(qū) 0痒蓬,就像下面這張圖展示的那樣童擎。

這就是所謂的輪詢策略。輪詢策略是 Kafka Java 生產(chǎn)者 API 默認提供的分區(qū)策略攻晒。如果你未指定partitioner.class參數(shù)顾复,那么你的生產(chǎn)者程序會按照輪詢的方式在主題的所有分區(qū)間均勻地“碼放”消息。

輪詢策略有非常優(yōu)秀的負載均衡表現(xiàn)鲁捏,它總是能保證消息最大限度地被平均分配到所有分區(qū)上芯砸,故默認情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一给梅。

隨機策略

也稱 Randomness 策略假丧。所謂隨機就是我們隨意地將消息放置到任意一個分區(qū)上,如下面這張圖所示动羽。


如果要實現(xiàn)隨機策略版的 partition 方法包帚,很簡單,只需要兩行代碼即可:


List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分區(qū)數(shù)运吓,然后隨機地返回一個小于它的正整數(shù)渴邦。

本質(zhì)上看隨機策略也是力求將數(shù)據(jù)均勻地打散到各個分區(qū),但從實際表現(xiàn)來看拘哨,它要遜于輪詢策略谋梭,所以如果追求數(shù)據(jù)的均勻分布,還是使用輪詢策略比較好倦青。事實上瓮床,隨機策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了产镐。

按消息鍵保序策略

也稱 Key-ordering 策略纤垂。有點尷尬的是,這個名詞是我自己編的磷账,Kafka 官網(wǎng)上并無這樣的提法峭沦。

Kafka 允許為每條消息定義消息鍵,簡稱為 Key逃糟。這個 Key 的作用非常大吼鱼,它可以是一個有著明確業(yè)務(wù)含義的字符串,比如客戶代碼绰咽、部門編號或是業(yè)務(wù) ID 等菇肃;也可以用來表征消息元數(shù)據(jù)。特別是在 Kafka 不支持時間戳的年代取募,在一些場景中琐谤,工程師們都是直接將消息創(chuàng)建時間封裝進 Key 里面的。一旦消息被定義了 Key玩敏,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面斗忌,由于每個分區(qū)下的消息處理都是有順序的质礼,故這個策略被稱為按消息鍵保序策略,如下圖所示织阳。

實現(xiàn)這個策略的 partition 方法同樣簡單眶蕉,只需要下面兩行代碼即可:


List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

return Math.abs(key.hashCode()) % partitions.size();

前面提到的 Kafka 默認分區(qū)策略實際上同時實現(xiàn)了兩種策略:如果指定了 Key,那么默認實現(xiàn)按消息鍵保序策略唧躲;如果沒有指定 Key造挽,則使用輪詢策略。

在你了解了 Kafka 默認的分區(qū)策略之后弄痹,我來給你講一個真實的案例饭入,希望能加強你對分區(qū)策略重要性的理解。

我曾經(jīng)給一個國企進行過 Kafka 培訓肛真,當時碰到的一個問題就是如何實現(xiàn)消息的順序問題谐丢。這家企業(yè)發(fā)送的 Kafka 的消息是有因果關(guān)系的,故處理因果關(guān)系也必須要保證有序性毁欣,否則先處理了“果”后處理“因”必然造成業(yè)務(wù)上的混亂庇谆。

當時那家企業(yè)的做法是給 Kafka 主題設(shè)置單分區(qū)岳掐,也就是 1 個分區(qū)凭疮。這樣所有的消息都只在這一個分區(qū)內(nèi)讀寫,因此保證了全局的順序性串述。這樣做雖然實現(xiàn)了因果關(guān)系的順序性执解,但也喪失了 Kafka 多分區(qū)帶來的高吞吐量和負載均衡的優(yōu)勢。

后來經(jīng)過了解和調(diào)研纲酗,我發(fā)現(xiàn)這種具有因果關(guān)系的消息都有一定的特點衰腌,比如在消息體中都封裝了固定的標志位,后來我就建議他們對此標志位設(shè)定專門的分區(qū)策略觅赊,保證同一標志位的所有消息都發(fā)送到同一分區(qū)右蕊,這樣既可以保證分區(qū)內(nèi)的消息順序,也可以享受到多分區(qū)帶來的性能紅利吮螺。

這種基于個別字段的分區(qū)策略本質(zhì)上就是按消息鍵保序的思想饶囚,其實更加合適的做法是把標志位數(shù)據(jù)提取出來統(tǒng)一放到 Key 中,這樣更加符合 Kafka 的設(shè)計思想鸠补。經(jīng)過改造之后萝风,這個企業(yè)的消息處理吞吐量一下提升了 40 多倍,從這個案例你也可以看到自定制分區(qū)策略的效果可見一斑紫岩。

其他分區(qū)策略

上面這幾種分區(qū)策略都是比較基礎(chǔ)的策略规惰,除此之外你還能想到哪些有實際用途的分區(qū)策略?其實還有一種比較常見的泉蝌,即所謂的基于地理位置的分區(qū)策略歇万。當然這種策略一般只針對那些大規(guī)模的 Kafka 集群揩晴,特別是跨城市、跨國家甚至是跨大洲的集群堕花。

我就拿“極客時間”舉個例子吧文狱,假設(shè)極客時間的所有服務(wù)都部署在北京的一個機房(這里我假設(shè)它是自建機房,不考慮公有云方案缘挽。其實即使是公有云瞄崇,實現(xiàn)邏輯也差不多),現(xiàn)在極客時間考慮在南方找個城市(比如廣州)再創(chuàng)建一個機房壕曼;另外從兩個機房中選取一部分機器共同組成一個大的 Kafka 集群苏研。顯然,這個集群中必然有一部分機器在北京腮郊,另外一部分機器在廣州摹蘑。

假設(shè)極客時間計劃為每個新注冊用戶提供一份注冊禮品,比如南方的用戶注冊極客時間可以免費得到一碗“甜豆腐腦”轧飞,而北方的新注冊用戶可以得到一碗“咸豆腐腦”衅鹿。如果用 Kafka 來實現(xiàn)則很簡單,只需要創(chuàng)建一個雙分區(qū)的主題过咬,然后再創(chuàng)建兩個消費者程序分別處理南北方注冊用戶邏輯即可大渤。

但問題是你需要把南北方注冊用戶的注冊消息正確地發(fā)送到位于南北方的不同機房中,因為處理這些消息的消費者程序只可能在某一個機房中啟動著掸绞。換句話說泵三,送甜豆腐腦的消費者程序只在廣州機房啟動著,而送咸豆腐腦的程序只在北京的機房中衔掸,如果你向廣州機房中的 Broker 發(fā)送北方注冊用戶的消息烫幕,那么這個用戶將無法得到禮品!

此時我們就可以根據(jù) Broker 所在的 IP 地址實現(xiàn)定制化的分區(qū)策略敞映。比如下面這段代碼:


List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

我們可以從所有分區(qū)中找出那些 Leader 副本在南方的所有分區(qū)较曼,然后隨機挑選一個進行消息發(fā)送。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末振愿,一起剝皮案震驚了整個濱河市捷犹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌埃疫,老刑警劉巖伏恐,帶你破解...
    沈念sama閱讀 218,640評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異栓霜,居然都是意外死亡翠桦,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來销凑,“玉大人丛晌,你說我怎么就攤上這事《酚祝” “怎么了澎蛛?”我有些...
    開封第一講書人閱讀 165,011評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蜕窿。 經(jīng)常有香客問我谋逻,道長,這世上最難降的妖魔是什么桐经? 我笑而不...
    開封第一講書人閱讀 58,755評論 1 294
  • 正文 為了忘掉前任毁兆,我火速辦了婚禮,結(jié)果婚禮上阴挣,老公的妹妹穿的比我還像新娘气堕。我一直安慰自己,他們只是感情好畔咧,可當我...
    茶點故事閱讀 67,774評論 6 392
  • 文/花漫 我一把揭開白布茎芭。 她就那樣靜靜地躺著,像睡著了一般誓沸。 火紅的嫁衣襯著肌膚如雪梅桩。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,610評論 1 305
  • 那天蔽介,我揣著相機與錄音摘投,去河邊找鬼煮寡。 笑死虹蓄,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的幸撕。 我是一名探鬼主播薇组,決...
    沈念sama閱讀 40,352評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼坐儿!你這毒婦竟也來了律胀?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,257評論 0 276
  • 序言:老撾萬榮一對情侶失蹤貌矿,失蹤者是張志新(化名)和其女友劉穎炭菌,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體逛漫,經(jīng)...
    沈念sama閱讀 45,717評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡黑低,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,894評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片克握。...
    茶點故事閱讀 40,021評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡蕾管,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出菩暗,到底是詐尸還是另有隱情掰曾,我是刑警寧澤,帶...
    沈念sama閱讀 35,735評論 5 346
  • 正文 年R本政府宣布停团,位于F島的核電站旷坦,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏佑稠。R本人自食惡果不足惜塞蹭,卻給世界環(huán)境...
    茶點故事閱讀 41,354評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望讶坯。 院中可真熱鬧番电,春花似錦、人聲如沸辆琅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽婉烟。三九已至娩井,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間似袁,已是汗流浹背洞辣。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留昙衅,地道東北人扬霜。 一個月前我還...
    沈念sama閱讀 48,224評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像而涉,于是被迫代替她去往敵國和親著瓶。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,974評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 本文目標 Topic&Partition 消息分發(fā)策略 消息消費原理 消息的存儲策略 Partition 副本機制...
    JavaEdge閱讀 1,073評論 1 3
  • 簡介 ? Kafka起初是由LinkedIn公司采用Scala語言開發(fā)的一個多分區(qū)啼县、多副本且基于Zookeep...
    四夕_y閱讀 829評論 0 1
  • 本章我們將會討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的材原。Kafka項目有一個生產(chǎn)者客戶端,我們可以通過這個客...
    printf200閱讀 7,964評論 0 3
  • 本章我們將會討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的季眷。Kafka項目有一個生產(chǎn)者客戶端余蟹,我們可以通過這個客...
    zwb_jianshu閱讀 475評論 0 0
  • 媽媽,我生日的時候子刮,你送我禮物嗎威酒?一聽這話,我怎么有被挖陷阱的感覺呢?立刻打起十二萬分的精神兼搏,目不轉(zhuǎn)睛地盯著女兒卵慰。...
    碎碎妖閱讀 315評論 2 6