我們在使用 Apache Kafka 生產(chǎn)和消費消息的時候,肯定是希望能夠?qū)?shù)據(jù)均勻地分配到所有服務(wù)器上。比如很多公司使用 Kafka 收集應(yīng)用服務(wù)器的日志數(shù)據(jù)伟桅,這種數(shù)據(jù)都是很多的雕蔽,特別是對于那種大批量機器組成的集群環(huán)境刊侯,每分鐘產(chǎn)生的日志量都能以 GB 數(shù)声离,因此如何將這么大的數(shù)據(jù)量均勻地分配到 Kafka 的各個 Broker 上芒炼,就成為一個非常重要的問題。
今天我就來和你說說 Kafka 生產(chǎn)者如何實現(xiàn)這個需求术徊,我會以 Java API 為例進行分析本刽,但實際上其他語言的實現(xiàn)邏輯也是類似的。
為什么分區(qū)赠涮?
如果你對 Kafka 分區(qū)(Partition)的概念還不熟悉子寓。前面我說過 Kafka 有主題(Topic)的概念,它是承載真實數(shù)據(jù)的邏輯容器笋除,而在主題之下還分為若干個分區(qū)斜友,也就是說 Kafka 的消息組織方式實際上是三級結(jié)構(gòu):主題 - 分區(qū) - 消息。主題下的每條消息只會保存在某一個分區(qū)中垃它,而不會在多個分區(qū)中被保存多份鲜屏。官網(wǎng)上的這張圖非常清晰地展示了 Kafka 的三級結(jié)構(gòu),如下所示:
現(xiàn)在我拋出一個問題你可以先思考一下:你覺得為什么 Kafka 要做這樣的設(shè)計国拇?為什么使用分區(qū)的概念而不是直接使用多個主題呢墙歪?
其實分區(qū)的作用就是提供負載均衡的能力,或者說對數(shù)據(jù)進行分區(qū)的主要原因贝奇,就是為了實現(xiàn)系統(tǒng)的高伸縮性(Scalability)虹菲。不同的分區(qū)能夠被放置到不同節(jié)點的機器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進行的掉瞳,這樣每個節(jié)點的機器都能獨立地執(zhí)行各自分區(qū)的讀寫請求處理毕源。并且,我們還可以通過添加新的節(jié)點機器來增加整體系統(tǒng)的吞吐量陕习。
實際上分區(qū)的概念以及分區(qū)數(shù)據(jù)庫早在 1980 年就已經(jīng)有大牛們在做了霎褐,比如那時候有個叫 Teradata 的數(shù)據(jù)庫就引入了分區(qū)的概念。
值得注意的是该镣,不同的分布式系統(tǒng)對分區(qū)的叫法也不盡相同冻璃。比如在 Kafka 中叫分區(qū),在 MongoDB 和 Elasticsearch 中就叫分片 Shard损合,而在 HBase 中則叫 Region省艳,在 Cassandra 中又被稱作 vnode。從表面看起來它們實現(xiàn)原理可能不盡相同嫁审,但對底層分區(qū)(Partitioning)的整體思想?yún)s從未改變跋炕。
除了提供負載均衡這種最核心的功能之外,利用分區(qū)也可以實現(xiàn)其他一些業(yè)務(wù)級別的需求律适,比如實現(xiàn)業(yè)務(wù)級別的消息順序的問題辐烂,這一點我今天也會分享一個具體的案例來說明遏插。
都有哪些分區(qū)策略?
下面我們說說 Kafka 生產(chǎn)者的分區(qū)策略纠修。所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個分區(qū)的算法胳嘲。Kafka 為我們提供了默認的分區(qū)策略,同時它也支持你自定義分區(qū)策略扣草。
如果要自定義分區(qū)策略了牛,你需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class
。這個參數(shù)該怎么設(shè)定呢德召?方法很簡單,在編寫生產(chǎn)者程序時汽纤,你可以編寫一個具體的類實現(xiàn)org.apache.kafka.clients.producer.Partitioner
接口上岗。這個接口也很簡單,只定義了兩個方法:partition()
和close()
蕴坪,通常你只需要實現(xiàn)最重要的 partition 方法肴掷。我們來看看這個方法的方法簽名:
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
這里的topic
、key
背传、keyBytes
呆瞻、value
和valueBytes
都屬于消息數(shù)據(jù),cluster
則是集群信息(比如當前 Kafka 集群共有多少主題径玖、多少 Broker 等)痴脾。Kafka 給你這么多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區(qū)梳星,計算出它要被發(fā)送到哪個分區(qū)中赞赖。只要你自己的實現(xiàn)類定義好了 partition 方法,同時設(shè)置partitioner.class
參數(shù)為你自己實現(xiàn)類的 Full Qualified Name冤灾,那么生產(chǎn)者程序就會按照你的代碼邏輯對消息進行分區(qū)前域。雖說可以有無數(shù)種分區(qū)的可能,但比較常見的分區(qū)策略也就那么幾種韵吨,下面我來詳細介紹一下匿垄。
輪詢策略
也稱 Round-robin 策略,即順序分配归粉。比如一個主題下有 3 個分區(qū)椿疗,那么第一條消息被發(fā)送到分區(qū) 0,第二條被發(fā)送到分區(qū) 1糠悼,第三條被發(fā)送到分區(qū) 2变丧,以此類推。當生產(chǎn)第 4 條消息時又會重新開始绢掰,即將其分配到分區(qū) 0痒蓬,就像下面這張圖展示的那樣童擎。
這就是所謂的輪詢策略。輪詢策略是 Kafka Java 生產(chǎn)者 API 默認提供的分區(qū)策略攻晒。如果你未指定partitioner.class
參數(shù)顾复,那么你的生產(chǎn)者程序會按照輪詢的方式在主題的所有分區(qū)間均勻地“碼放”消息。
輪詢策略有非常優(yōu)秀的負載均衡表現(xiàn)鲁捏,它總是能保證消息最大限度地被平均分配到所有分區(qū)上芯砸,故默認情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一给梅。
隨機策略
也稱 Randomness 策略假丧。所謂隨機就是我們隨意地將消息放置到任意一個分區(qū)上,如下面這張圖所示动羽。
如果要實現(xiàn)隨機策略版的 partition 方法包帚,很簡單,只需要兩行代碼即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
先計算出該主題總的分區(qū)數(shù)运吓,然后隨機地返回一個小于它的正整數(shù)渴邦。
本質(zhì)上看隨機策略也是力求將數(shù)據(jù)均勻地打散到各個分區(qū),但從實際表現(xiàn)來看拘哨,它要遜于輪詢策略谋梭,所以如果追求數(shù)據(jù)的均勻分布,還是使用輪詢策略比較好倦青。事實上瓮床,隨機策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了产镐。
按消息鍵保序策略
也稱 Key-ordering 策略纤垂。有點尷尬的是,這個名詞是我自己編的磷账,Kafka 官網(wǎng)上并無這樣的提法峭沦。
Kafka 允許為每條消息定義消息鍵,簡稱為 Key逃糟。這個 Key 的作用非常大吼鱼,它可以是一個有著明確業(yè)務(wù)含義的字符串,比如客戶代碼绰咽、部門編號或是業(yè)務(wù) ID 等菇肃;也可以用來表征消息元數(shù)據(jù)。特別是在 Kafka 不支持時間戳的年代取募,在一些場景中琐谤,工程師們都是直接將消息創(chuàng)建時間封裝進 Key 里面的。一旦消息被定義了 Key玩敏,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面斗忌,由于每個分區(qū)下的消息處理都是有順序的质礼,故這個策略被稱為按消息鍵保序策略,如下圖所示织阳。
實現(xiàn)這個策略的 partition 方法同樣簡單眶蕉,只需要下面兩行代碼即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
前面提到的 Kafka 默認分區(qū)策略實際上同時實現(xiàn)了兩種策略:如果指定了 Key,那么默認實現(xiàn)按消息鍵保序策略唧躲;如果沒有指定 Key造挽,則使用輪詢策略。
在你了解了 Kafka 默認的分區(qū)策略之后弄痹,我來給你講一個真實的案例饭入,希望能加強你對分區(qū)策略重要性的理解。
我曾經(jīng)給一個國企進行過 Kafka 培訓肛真,當時碰到的一個問題就是如何實現(xiàn)消息的順序問題谐丢。這家企業(yè)發(fā)送的 Kafka 的消息是有因果關(guān)系的,故處理因果關(guān)系也必須要保證有序性毁欣,否則先處理了“果”后處理“因”必然造成業(yè)務(wù)上的混亂庇谆。
當時那家企業(yè)的做法是給 Kafka 主題設(shè)置單分區(qū)岳掐,也就是 1 個分區(qū)凭疮。這樣所有的消息都只在這一個分區(qū)內(nèi)讀寫,因此保證了全局的順序性串述。這樣做雖然實現(xiàn)了因果關(guān)系的順序性执解,但也喪失了 Kafka 多分區(qū)帶來的高吞吐量和負載均衡的優(yōu)勢。
后來經(jīng)過了解和調(diào)研纲酗,我發(fā)現(xiàn)這種具有因果關(guān)系的消息都有一定的特點衰腌,比如在消息體中都封裝了固定的標志位,后來我就建議他們對此標志位設(shè)定專門的分區(qū)策略觅赊,保證同一標志位的所有消息都發(fā)送到同一分區(qū)右蕊,這樣既可以保證分區(qū)內(nèi)的消息順序,也可以享受到多分區(qū)帶來的性能紅利吮螺。
這種基于個別字段的分區(qū)策略本質(zhì)上就是按消息鍵保序的思想饶囚,其實更加合適的做法是把標志位數(shù)據(jù)提取出來統(tǒng)一放到 Key 中,這樣更加符合 Kafka 的設(shè)計思想鸠补。經(jīng)過改造之后萝风,這個企業(yè)的消息處理吞吐量一下提升了 40 多倍,從這個案例你也可以看到自定制分區(qū)策略的效果可見一斑紫岩。
其他分區(qū)策略
上面這幾種分區(qū)策略都是比較基礎(chǔ)的策略规惰,除此之外你還能想到哪些有實際用途的分區(qū)策略?其實還有一種比較常見的泉蝌,即所謂的基于地理位置的分區(qū)策略歇万。當然這種策略一般只針對那些大規(guī)模的 Kafka 集群揩晴,特別是跨城市、跨國家甚至是跨大洲的集群堕花。
我就拿“極客時間”舉個例子吧文狱,假設(shè)極客時間的所有服務(wù)都部署在北京的一個機房(這里我假設(shè)它是自建機房,不考慮公有云方案缘挽。其實即使是公有云瞄崇,實現(xiàn)邏輯也差不多),現(xiàn)在極客時間考慮在南方找個城市(比如廣州)再創(chuàng)建一個機房壕曼;另外從兩個機房中選取一部分機器共同組成一個大的 Kafka 集群苏研。顯然,這個集群中必然有一部分機器在北京腮郊,另外一部分機器在廣州摹蘑。
假設(shè)極客時間計劃為每個新注冊用戶提供一份注冊禮品,比如南方的用戶注冊極客時間可以免費得到一碗“甜豆腐腦”轧飞,而北方的新注冊用戶可以得到一碗“咸豆腐腦”衅鹿。如果用 Kafka 來實現(xiàn)則很簡單,只需要創(chuàng)建一個雙分區(qū)的主題过咬,然后再創(chuàng)建兩個消費者程序分別處理南北方注冊用戶邏輯即可大渤。
但問題是你需要把南北方注冊用戶的注冊消息正確地發(fā)送到位于南北方的不同機房中,因為處理這些消息的消費者程序只可能在某一個機房中啟動著掸绞。換句話說泵三,送甜豆腐腦的消費者程序只在廣州機房啟動著,而送咸豆腐腦的程序只在北京的機房中衔掸,如果你向廣州機房中的 Broker 發(fā)送北方注冊用戶的消息烫幕,那么這個用戶將無法得到禮品!
此時我們就可以根據(jù) Broker 所在的 IP 地址實現(xiàn)定制化的分區(qū)策略敞映。比如下面這段代碼:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
我們可以從所有分區(qū)中找出那些 Leader 副本在南方的所有分區(qū)较曼,然后隨機挑選一個進行消息發(fā)送。