Kafka的分區(qū)數(shù)是不是越多越好骏掀?
分區(qū)多的優(yōu)點(diǎn)
kafka使用分區(qū)將topic的消息打散到多個(gè)分區(qū)分布保存在不同的broker上突诬,實(shí)現(xiàn)了producer和consumer消息處理的高吞吐量伞插。Kafka的producer和consumer都可以多線程地并行操作,而每個(gè)線程處理的是一個(gè)分區(qū)的數(shù)據(jù)。因此分區(qū)實(shí)際上是調(diào)優(yōu)Kafka并行度的最小單元写妥。對(duì)于producer而言,它實(shí)際上是用多個(gè)線程并發(fā)地向不同分區(qū)所在的broker發(fā)起Socket連接同時(shí)給這些分區(qū)發(fā)送消息审姓;而consumer珍特,同一個(gè)消費(fèi)組內(nèi)的所有consumer線程都被指定topic的某一個(gè)分區(qū)進(jìn)行消費(fèi)。
所以說魔吐,如果一個(gè)topic分區(qū)越多扎筒,理論上整個(gè)集群所能達(dá)到的吞吐量就越大。
分區(qū)不是越多越好
分區(qū)是否越多越好呢酬姆?顯然也不是嗜桌,因?yàn)槊總€(gè)分區(qū)都有自己的開銷:
一、客戶端/服務(wù)器端需要使用的內(nèi)存就越多 Kafka0.8.2之后轴踱,在客戶端producer有個(gè)參數(shù)batch.size症脂,默認(rèn)是16KB。它會(huì)為每個(gè)分區(qū)緩存消息淫僻,一旦滿了就打包將消息批量發(fā)出诱篷。看上去這是個(gè)能夠提升性能的設(shè)計(jì)雳灵。不過很顯然棕所,因?yàn)檫@個(gè)參數(shù)是分區(qū)級(jí)別的,如果分區(qū)數(shù)越多悯辙,這部分緩存所需的內(nèi)存占用也會(huì)更多琳省。假設(shè)你有10000個(gè)分區(qū)迎吵,按照默認(rèn)設(shè)置,這部分緩存需要占用約157MB的內(nèi)存针贬。而consumer端呢击费?我們拋開獲取數(shù)據(jù)所需的內(nèi)存不說,只說線程的開銷桦他。如果還是假設(shè)有10000個(gè)分區(qū)蔫巩,同時(shí)consumer線程數(shù)要匹配分區(qū)數(shù)(大部分情況下是最佳的消費(fèi)吞吐量配置)的話,那么在consumer client就要?jiǎng)?chuàng)建10000個(gè)線程快压,也需要?jiǎng)?chuàng)建大約10000個(gè)Socket去獲取分區(qū)數(shù)據(jù)圆仔。這里面的線程切換的開銷本身已經(jīng)不容小覷了。
服務(wù)器端的開銷也不小蔫劣,如果閱讀Kafka源碼的話可以發(fā)現(xiàn)坪郭,服務(wù)器端的很多組件都在內(nèi)存中維護(hù)了分區(qū)級(jí)別的緩存,比如controller脉幢,F(xiàn)etcherManager等歪沃,因此分區(qū)數(shù)越多,這種緩存的成本就越大鸵隧。
二绸罗、文件句柄的開銷 每個(gè)分區(qū)在底層文件系統(tǒng)都有屬于自己的一個(gè)目錄。該目錄下通常會(huì)有兩個(gè)文件: base_offset.log和base_offset.index豆瘫。Kafak的controller和ReplicaManager會(huì)為每個(gè)broker都保存這兩個(gè)文件句柄(file handler)珊蟀。很明顯,如果分區(qū)數(shù)越多外驱,所需要保持打開狀態(tài)的文件句柄數(shù)也就越多育灸,最終可能會(huì)突破你的ulimit -n的限制。
三昵宇、降低高可用性 Kafka通過副本(replica)機(jī)制來保證高可用磅崭。具體做法就是為每個(gè)分區(qū)保存若干個(gè)副本(replica_factor指定副本數(shù))。每個(gè)副本保存在不同的broker上瓦哎。期中的一個(gè)副本充當(dāng)leader 副本砸喻,負(fù)責(zé)處理producer和consumer請(qǐng)求。其他副本充當(dāng)follower角色蒋譬,由Kafka controller負(fù)責(zé)保證與leader的同步割岛。如果leader所在的broker掛掉了,contorller會(huì)檢測(cè)到然后在zookeeper的幫助下重選出新的leader——這中間會(huì)有短暫的不可用時(shí)間窗口犯助,雖然大部分情況下可能只是幾毫秒級(jí)別癣漆。但如果你有10000個(gè)分區(qū),10個(gè)broker剂买,也就是說平均每個(gè)broker上有1000個(gè)分區(qū)惠爽。此時(shí)這個(gè)broker掛掉了癌蓖,那么zookeeper和controller需要立即對(duì)這1000個(gè)分區(qū)進(jìn)行l(wèi)eader選舉。比起很少的分區(qū)leader選舉而言婚肆,這必然要花更長(zhǎng)的時(shí)間租副,并且通常不是線性累加的。如果這個(gè)broker還同時(shí)是controller情況就更糟了旬痹。
如何確定分區(qū)數(shù)量呢
可以遵循一定的步驟來嘗試確定分區(qū)數(shù):創(chuàng)建一個(gè)只有1個(gè)分區(qū)的topic附井,然后測(cè)試這個(gè)topic的producer吞吐量和consumer吞吐量。假設(shè)它們的值分別是Tp和Tc两残,單位可以是MB/s。然后假設(shè)總的目標(biāo)吞吐量是Tt把跨,那么分區(qū)數(shù) = Tt / max(Tp, Tc)
說明:Tp表示producer的吞吐量人弓。測(cè)試producer通常是很容易的,因?yàn)樗倪壿嫹浅:?jiǎn)單着逐,就是直接發(fā)送消息到Kafka就好了崔赌。Tc表示consumer的吞吐量。測(cè)試Tc通常與應(yīng)用的關(guān)系更大耸别, 因?yàn)門c的值取決于你拿到消息之后執(zhí)行什么操作健芭,因此Tc的測(cè)試通常也要麻煩一些。
一條消息如何知道要被發(fā)送到哪個(gè)分區(qū)秀姐?
按照key值分配
默認(rèn)情況下慈迈,Kafka根據(jù)傳遞消息的key來進(jìn)行分區(qū)的分配,即hash(key) % numPartitions:
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
這保證了相同key的消息一定會(huì)被路由到相同的分區(qū)省有。
key為null時(shí)痒留,從緩存中取分區(qū)id或者隨機(jī)取一個(gè)
如果你沒有指定key,那么Kafka是如何確定這條消息去往哪個(gè)分區(qū)的呢蠢沿?
if(key == null) { // 如果沒有指定key
val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有沒有緩存的現(xiàn)成的分區(qū)Id
id match {
case Some(partitionId) =>
partitionId // 如果有的話直接使用這個(gè)分區(qū)Id就好了
case None => // 如果沒有的話伸头,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分區(qū)的leader所在的broker
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 從中隨機(jī)挑一個(gè)
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下一次直接使用
partitionId
}
}
不指定key時(shí),Kafka幾乎就是隨機(jī)找一個(gè)分區(qū)發(fā)送無key的消息舷蟀,然后把這個(gè)分區(qū)號(hào)加入到緩存中以備后面直接使用——當(dāng)然了恤磷,Kafka本身也會(huì)清空該緩存(默認(rèn)每10分鐘或每次請(qǐng)求topic元數(shù)據(jù)時(shí))。
Consumer個(gè)數(shù)與分區(qū)數(shù)有什么關(guān)系野宜?
topic下的一個(gè)分區(qū)只能被同一個(gè)consumer group下的一個(gè)consumer線程來消費(fèi)扫步,但反之并不成立,即一個(gè)consumer線程可以消費(fèi)多個(gè)分區(qū)的數(shù)據(jù)速缨,比如Kafka提供的ConsoleConsumer锌妻,默認(rèn)就只是一個(gè)線程來消費(fèi)所有分區(qū)的數(shù)據(jù)。
即分區(qū)數(shù)決定了同組消費(fèi)者個(gè)數(shù)的上限
所以旬牲,如果你的分區(qū)數(shù)是N仿粹,那么最好線程數(shù)也保持為N搁吓,這樣通常能夠達(dá)到最大的吞吐量。超過N的配置只是浪費(fèi)系統(tǒng)資源吭历,因?yàn)槎喑龅木€程不會(huì)被分配到任何分區(qū)堕仔。
Consumer消費(fèi)Partition的分配策略
Kafka提供的兩種分配策略: range和roundrobin,由參數(shù)partition.assignment.strategy指定晌区,默認(rèn)是range策略摩骨。
當(dāng)以下事件發(fā)生時(shí),Kafka 將會(huì)進(jìn)行一次分區(qū)分配:
- 同一個(gè) Consumer Group 內(nèi)新增消費(fèi)者
- 消費(fèi)者離開當(dāng)前所屬的Consumer Group朗若,包括shuts down 或 crashes
- 訂閱的主題新增分區(qū)
將分區(qū)的所有權(quán)從一個(gè)消費(fèi)者移到另一個(gè)消費(fèi)者稱為重新平衡(rebalance)恼五,如何rebalance就涉及到本文提到的分區(qū)分配策略。
下面我們將詳細(xì)介紹 Kafka 內(nèi)置的兩種分區(qū)分配策略哭懈。本文假設(shè)我們有個(gè)名為 T1 的主題灾馒,其包含了10個(gè)分區(qū),然后我們有兩個(gè)消費(fèi)者(C1遣总,C2)
來消費(fèi)這10個(gè)分區(qū)里面的數(shù)據(jù)睬罗,而且 C1 的 num.streams = 1,C2 的 num.streams = 2旭斥。
Range strategy
Range策略是對(duì)每個(gè)主題而言的容达,首先對(duì)同一個(gè)主題里面的分區(qū)按照序號(hào)進(jìn)行排序,并對(duì)消費(fèi)者按照字母順序進(jìn)行排序垂券。在我們的例子里面花盐,排完序的分區(qū)將會(huì)是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費(fèi)者線程排完序?qū)?huì)是C1-0, C2-0, C2-1圆米。然后將partitions的個(gè)數(shù)除于消費(fèi)者線程的總數(shù)來決定每個(gè)消費(fèi)者線程消費(fèi)幾個(gè)分區(qū)卒暂。如果除不盡,那么前面幾個(gè)消費(fèi)者線程將會(huì)多消費(fèi)一個(gè)分區(qū)娄帖。在我們的例子里面也祠,我們有10個(gè)分區(qū),3個(gè)消費(fèi)者線程近速, 10 / 3 = 3诈嘿,而且除不盡,那么消費(fèi)者線程 C1-0 將會(huì)多消費(fèi)一個(gè)分區(qū)削葱,所以最后分區(qū)分配的結(jié)果看起來是這樣的:
- C1-0 將消費(fèi) 0, 1, 2, 3 分區(qū)
- C2-0 將消費(fèi) 4, 5, 6 分區(qū)
- C2-1 將消費(fèi) 7, 8, 9 分區(qū)
假如我們有11個(gè)分區(qū)奖亚,那么最后分區(qū)分配的結(jié)果看起來是這樣的:
- C1-0 將消費(fèi) 0, 1, 2, 3 分區(qū)
- C2-0 將消費(fèi) 4, 5, 6, 7 分區(qū)
- C2-1 將消費(fèi) 8, 9, 10 分區(qū)
假如我們有2個(gè)主題(T1和T2),分別有10個(gè)分區(qū)析砸,那么最后分區(qū)分配的結(jié)果看起來是這樣的:
- C1-0 將消費(fèi) T1主題的 0, 1, 2, 3 分區(qū)以及 T2主題的 0, 1, 2, 3分區(qū)
- C2-0 將消費(fèi) T1主題的 4, 5, 6 分區(qū)以及 T2主題的 4, 5, 6分區(qū)
- C2-1 將消費(fèi) T1主題的 7, 8, 9 分區(qū)以及 T2主題的 7, 8, 9分區(qū)
可以看出昔字,C1-0 消費(fèi)者線程比其他消費(fèi)者線程多消費(fèi)了2個(gè)分區(qū),這就是Range strategy的一個(gè)很明顯的弊端。
RoundRobin strategy
使用RoundRobin策略有兩個(gè)前提條件必須滿足:
- 同一個(gè)Consumer Group里面的所有消費(fèi)者的num.streams必須相等作郭;
- 每個(gè)消費(fèi)者訂閱的主題必須相同陨囊。
所以這里假設(shè)前面提到的2個(gè)消費(fèi)者的num.streams = 2。RoundRobin策略的工作原理:將所有主題的分區(qū)組成 TopicAndPartition 列表夹攒,然后對(duì) TopicAndPartition 列表按照 hashCode 進(jìn)行排序蜘醋,看下面的代碼應(yīng)該會(huì)明白:
val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
info("Consumer %s rebalancing the following partitions for topic %s: %s"
.format(ctx.consumerId, topic, partitions))
partitions.map(partition => {
TopicAndPartition(topic, partition)
})
}.toSeq.sortWith((topicPartition1, topicPartition2) => {
/*
* Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
* up on one consumer (if it has a high enough stream count).
*/
topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
})
最后按照round-robin風(fēng)格將分區(qū)分別分配給不同的消費(fèi)者線程。
在這個(gè)的例子里面咏尝,假如按照 hashCode 排序完的topic-partitions組依次為T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9压语,我們的消費(fèi)者線程排序?yàn)镃1-0, C1-1, C2-0, C2-1,最后分區(qū)分配的結(jié)果為:
- C1-0 將消費(fèi) T1-5, T1-2, T1-6 分區(qū)编检;
- C1-1 將消費(fèi) T1-3, T1-1, T1-9 分區(qū)胎食;
- C2-0 將消費(fèi) T1-0, T1-4 分區(qū);
- C2-1 將消費(fèi) T1-8, T1-7 分區(qū)允懂;
多個(gè)主題的分區(qū)分配和單個(gè)主題類似斥季。遺憾的是,目前我們還不能自定義分區(qū)分配策略累驮,只能通過partition.assignment.strategy參數(shù)選擇 range 或 roundrobin。
本文轉(zhuǎn)載自https://blog.csdn.net/OiteBody/article/details/80595971