各類消息中間件對順序消息實現(xiàn)的做法是將具有順序性的一類消息發(fā)往相同的主題分區(qū)中蕾域,只需要將這類消息設置相同的 Key 即可,而 Kafka 會在任意時刻保證一個消費組同時只能有一個消費者監(jiān)聽消費,因此可在消費時按分區(qū)進行順序消費氓英,保證每個分區(qū)的消息具備局部順序性馋吗。由于需要確保分區(qū)消息的順序性,并不能并發(fā)地消費消費胯府,對消費的吞吐量會造成一定的影響介衔。那么,如何在保證消息順序性的前提下骂因,最大限度的提高消費者的消費能力炎咖?
本文將會對 Kafka 消費者拉取消息流程進行深度分析之后,對 Kafka 消費者順序消費線程模型進行一次實踐與優(yōu)化寒波。
Kafka 消費者拉取消息流程分析
在講實現(xiàn) Kafka 順序消費線程模型之前乘盼,我們需要先深入分析 Kafka 消費者的消息拉取機制,只有當你對 Kafka 消費者拉取消息的整個流程有深入的了解之后俄烁,你才能夠很好地理解本次線程模型改造的方案绸栅。
我先給大家模擬一下消息拉取的實際現(xiàn)象,這里 max.poll.records = 500页屠。
1粹胯、消息沒有堆積時:
可以發(fā)現(xiàn),在消息沒有堆積時辰企,消費者拉取時风纠,如果某個分區(qū)沒有的消息不足 500 條,會從其他分區(qū)湊夠 500 條后再返回牢贸。
2竹观、多個分區(qū)都有堆積時:
在消息有堆積時,可以發(fā)現(xiàn)每次返回的都是同一個分區(qū)的消息十减,但經(jīng)過不斷 debug栈幸,消費者在拉取過程中并不是等某個分區(qū)消費完沒有堆積了愤估,再拉取下一個分區(qū)的消息,而是不斷循環(huán)的拉取各個分區(qū)的消息速址,但是這個循環(huán)并不是說分區(qū) p0 拉取完 500 條玩焰,后面一定會拉取分區(qū) p1 的消息,很有可能后面還會拉取 p0 分區(qū)的消息芍锚,為了弄明白這種現(xiàn)象昔园,我仔細閱讀了相關源碼。
org.apache.kafka.clients.consumer.KafkaConsumer#poll
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n376" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
try {
// poll for new data until the timeout expires
do {
// 客戶端拉取消息核心邏輯
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// 在返回數(shù)據(jù)之前, 發(fā)送下次的 fetch 請求, 避免用戶在下次獲取數(shù)據(jù)時線程阻塞
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
// 調用 ConsumerNetworkClient#poll 方法將 FetchRequest 發(fā)送出去并炮。
client.pollNoWakeup();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
}
}</pre>
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n378" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">consumer.poll(Duration.ofMillis(3000));</pre>
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n385" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;
try {
while (recordsRemaining > 0) {
// 如果當前獲取消息的 PartitionRecords 為空默刚,或者已經(jīng)拉取完畢
// 則需要從 completedFetches 重新獲取 completedFetch 并解析成 PartitionRecords
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
// 如果上一個分區(qū)緩存中的數(shù)據(jù)已經(jīng)拉取完了,直接中斷本次循環(huán)拉取逃魄,并返回空的消息列表
// 直至有緩存數(shù)據(jù)為止
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
try {
// CompletedFetch 即拉取消息的本地緩存數(shù)據(jù)
// 緩存數(shù)據(jù)中 CompletedFetch 解析成 PartitionRecords
nextInLineRecords = parseCompletedFetch(completedFetch);
} catch (Exception e) {
// ...
}
completedFetches.poll();
} else {
// 從分區(qū)緩存中獲取指定條數(shù)的消息
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
// ...
fetched.put(partition, records);
recordsRemaining -= records.size();
}
}
}
} catch (KafkaException e) {
// ...
}
return fetched;
}</pre>
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n394" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">public synchronized int sendFetches() {
// 解析本次可拉取的分區(qū)
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
// 構建請求對象
final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget());
// 發(fā)送請求荤西,但不是真的發(fā)送,而是將請求保存在 unsent 中
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
synchronized (Fetcher.this) {
?
// ... ...
?
// 創(chuàng)建 CompletedFetch, 并緩存到 completedFetches 隊列中
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
}
?
}
}
// ... ...
});
}
return fetchRequestMap.size();
}</pre>
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n398" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">private List<TopicPartition> fetchablePartitions() {
Set<TopicPartition> exclude = new HashSet<>();
List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
exclude.add(nextInLineRecords.partition);
}
for (CompletedFetch completedFetch : completedFetches) {
exclude.add(completedFetch.partition);
}
fetchable.removeAll(exclude);
return fetchable;
}</pre>
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="" cid="n47" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">final CountDownLatch countDownLatch = new CountDownLatch(records.count());</pre>
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n443" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">// 暫停分區(qū)消費(即暫停該分區(qū)發(fā)送拉取消息請求)
org.apache.kafka.clients.consumer.KafkaConsumer#pause
// 恢復分區(qū)消費(即恢復該分區(qū)發(fā)送拉取消息請求)
org.apache.kafka.clients.consumer.KafkaConsumer#resume</pre>
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n455" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">private boolean isFetchable() {
return !paused && hasValidPosition();
}</pre>
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="" cid="n969" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-size: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-top-left-radius: 3px; border-top-right-radius: 3px; border-bottom-right-radius: 3px; border-bottom-left-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; background-position: inherit inherit; background-repeat: inherit inherit;">KafkaConsumer is not safe for multi-threaded access</pre>
ZMS GitHub 地址:https://github.com/ZTO-Express/zms
同時希望更多的開源愛好者加入到該項目中伍俘,共同打造一體化的智能消息運維平臺邪锌。
中通科技正式開源內(nèi)部的消息 Pass 云平臺化產(chǎn)品 ZMS(ZTO Messaging Service),它可以屏蔽底層消息中間件類型癌瘾,封裝了包括 Kafka 消費線模型在內(nèi)的具體實現(xiàn)觅丰,提供統(tǒng)一的對外 API,彌補了 Kafka 消費者這部分支持的不足妨退。同時還提供了通過唯一標識動態(tài)路由消息妇萄,為開發(fā)運維人員提供自動化部署運維集群,主題咬荷、消費組申請與審批冠句、實時監(jiān)控、自動告警萍丐、容災遷移等功能轩端。
我們知道 RocketMQ 本身已經(jīng)實現(xiàn)了具體的消費線程模型,用戶不需要關心具體實現(xiàn)逝变,只需要實現(xiàn)消息消費邏輯即可基茵,而 Kafka 消息者僅提供 KafkaConsumer#poll 一個方法,消費線程模型的實現(xiàn)則完全交由用戶去實現(xiàn)壳影。
寫在最后
通過本文深度分析拱层,我們已經(jīng)認識到順序消息會給消費吞吐量帶來怎么樣的影響,因此用戶在業(yè)務的實現(xiàn)上不能重度依賴順序消費去實現(xiàn)宴咧,能避免則避免根灯,如果一定要使用到順序消費,需要知道 Kafka 并不能保證嚴格的順序消費,在消費組重平衡過程中很可能就會將消息的順序性打亂烙肺,而且順序消費會影響消費吞吐量纳猪,用戶需要權衡這種需求的利弊。
總結
通過以上場景分析桃笙,該優(yōu)化方案不是提高順序消費吞吐量的銀彈氏堤,它有很大的局限性,用戶在業(yè)務的實現(xiàn)上不能重度依賴順序消費去實現(xiàn)搏明,以免影響業(yè)務性能上的需求鼠锈。
優(yōu)化前,ZMS 可保證整個分區(qū)消息的順序性星著,優(yōu)化后可根據(jù)消息 Key 在分區(qū)的基礎上不打亂相同 Key 消息的順序性前提下進行并發(fā)消費购笆,有效地提升了單分區(qū)的消費吞吐量;優(yōu)化前虚循,有很大的概率會退化成同一時刻單線程消費同欠,優(yōu)化后盡可能至少保證每個分區(qū)一條線程消費,情況好的時候每個分區(qū)可多條線程消費邮丰。
綜合對比:
2行您、如果該分區(qū)相同 Key 的消息過于集中,會導致每次拉取都是相同 key 的一批消息剪廉,同樣并行度退化成 orderlyConsumePartitionParallelism = 1。
1炕檩、如果該分區(qū)所有消息的 Key 都相同斗蒋,則消費的 Key 取模都分配都同一條線程當中,并行度退化成 orderlyConsumePartitionParallelism = 1笛质;
注意泉沾,當 orderlyConsumePartitionParallelism > 1 時,分區(qū)消費線程的有效使用率取決于該分區(qū)消息的 Key:
以上線程模型妇押,需要增加一個參數(shù) orderlyConsumePartitionParallelism跷究,用于設置分區(qū)消費并行度,假設某個消費組被分配 5 個分區(qū)進行消費敲霍,則每個分區(qū)默認啟動一條線程消費俊马,一共 5 * 1 = 5 條消費線程,當 orderlyConsumePartitionParallelism = 3肩杈,則每個分區(qū)啟動 3 條線程消費柴我,一共 5 * 3 = 15 條消費線程。orderlyConsumePartitionParallelism = 1 時扩然,則說明該分區(qū)所有消息都處在順序(串行)消費艘儒;當 orderlyConsumePartitionParallelism > 1 時,則根據(jù)分區(qū)消息的 Key 進行取模分配線程消費,保證不了整個分區(qū)順序消費界睁,但保證相同 Key 的消息順序消費觉增。
以上優(yōu)化改造的核心是在不打亂消息順序的前提下利用消息 Key 盡可能地并發(fā)消費,但如果遇到分區(qū)中的消息都是相同 Key翻斟,并且在有一定的積壓下每次拉取都是同一個分區(qū)的消息時逾礁,以上模型可能沒有理想情況下的那么好。這時是否可以將 fetch.max.bytes 與 max.partition.fetch.bytes 參數(shù)設置小一點杨赤,讓每個分區(qū)的本地緩存都不足 500 條敞斋,這樣每次 poll 的消息列表都可以包含多個分區(qū)的消息了,但這樣又會導致 RPC 請求增多疾牲,這就需要針對業(yè)務消息大小植捎,對這些參數(shù)進行調優(yōu)。
因此我們只需要利用好這個特性阳柔,就可以實現(xiàn)拉取限流焰枢,消費者主線程的 Comsumer#poll 方法依然是異步不斷地從緩存中獲取消息,同時不會造成兩次 poll 之間的時間過大導致消費者被踢出消費組舌剂。
只需要確保 KafkaConsumer 相關方法在 KafkaConsumer#poll 方法線程中調用即可济锄,具體做法可以設置一個線程安全上下文容器,異步線程操作 KafkaConsumer 相關方法是霍转,只需要將具體的分區(qū)放到上下文容器即可荐绝,后續(xù)統(tǒng)一由 poll 線程執(zhí)行。
由于 KafkaConsumer 是非線程安全的避消,如果我們在異步線程 KafkaConsumer 相關的類低滩,會報如下錯誤:
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#isFetchable
以上兩個方法,其實就是改變了消費者的訂閱分區(qū)的狀態(tài)值 paused岩喷,當 paused = true 時恕沫,暫停分區(qū)消費,當 paused = false 時纱意,恢復分區(qū)消費婶溯,這個參數(shù)是在哪里使用到呢?上面在分析 Kafka 消費者拉取消息流程時我們有提到發(fā)送拉取請求之前偷霉,會對可拉取的分區(qū)進行篩選迄委,其中一個條件即分區(qū) paused = false:
上面在分析 Kafka 消費者拉取消息流程時,我們知道消費者在發(fā)送拉取請求時腾它,首先會判斷本地緩存中是否存在該分區(qū)的緩存跑筝,如果存在,則不發(fā)送拉取請求瞒滴,但由于 ZMS 需要改造成異步拉取的形式曲梗,由于 Comsumer#poll 不再等待消息消費完再進行下一輪拉取赞警,因此 Kafka 的本地緩存中幾乎不會存在數(shù)據(jù)了,導致 Kafka 每次都會發(fā)送拉取請求虏两,相當于將 Kafka 的本地緩存放到 ZMS 中愧旦,因此我們需要 ZMS 層面上對消息拉取進行限流,Kafka 消費者有兩個方法可以設置訂閱的分區(qū)是否可以發(fā)送拉取請求:
異步拉取有個問題定罢,就是如果節(jié)點消費跟不上笤虫,而拉取消息過多地保存在本地,很可能會造成內(nèi)存溢出祖凫,因此我們需要對消息拉取進行限流琼蚯,當本地消息緩存量達到一定量時,阻止消息拉取惠况。
3遭庶、異步拉取與限流
由于 ZMS 目前是手動提交位移,目前每次拉取消息必須先消費完才能進行位移提交稠屠,既然已經(jīng)對分區(qū)消息進行指定的線程池消費了峦睡,由于分區(qū)之間的位移先后提交不影響,那么我們可以將位移提交交給每個分區(qū)進行管理权埠,這樣拉取主線程不必等到是否消費完才進行下一輪的消息拉取榨了。
2、細化位移提交粒度
之前的做法是將每個分區(qū)單獨一條線程消費攘蔽,無法再繼續(xù)在分區(qū)之上增加消費能力龙屉,我們知道業(yè)務方發(fā)送順序消息時,會將同一類型具有順序性的消息給一個相同的 Key满俗,以保證這類消息發(fā)送到同一個分區(qū)進行消費叔扼,從而達到消息順序消費的目的,而同一個分區(qū)會接收多種類型(即不同 Key)的消息漫雷,每次拉取的消息具有很大可能是不同類型的,那么我們就可以將同一個分區(qū)的消息鳍咱,分配一個獨立的線程池降盹,再利用消息 Key 進行取模放入對應的線程中消費集畅,達到并發(fā)消費的目的则果,且不打亂消息的順序性。
1枫慷、細化消息順序粒度
經(jīng)過對 ZMS 的消費線程模型以及對 Kafka 消費者拉取消息流程的深入了解之后丑念,我想到了如下幾個方面對 ZMS 的消費線程模型進行優(yōu)化:
如何提高 Kafka 順序消費的并發(fā)度涡戳?
在消息流量大的時候,順序消息消費時卻退化成單線程消費了脯倚。
那如果每個分區(qū)的積壓都超過了 500 條消息呢渔彰?這種實際的情況會更加多嵌屎,因為消息中間件其中一個重要功能就是用于流量削峰,流量洪峰那段時間積壓幾百上千萬條消息還是經(jīng)常能夠遇到的恍涂,那么此時每次拉取的消息中宝惰,很大概率就只剩下一個分區(qū)了,我用如下圖表示:
以上再沧,由于某些分區(qū)的消息堆積量少于 500 條(Kafka 默認每次從 Broker 拉取 500 條消息)尼夺,因此會繼續(xù)從其它分區(qū)湊夠 500 條消息,此時拉取的 500 條消息會包含 3 個分區(qū)的消息炒瘸,ZMS 根據(jù)利用分區(qū)取模將同一個分區(qū)的消息放到指定的線程池中(線程池只有一條線程)進行消費淤堵,以上圖來看,總共有 3 條線程在消費本次拉取的 500 條消息顷扩。
以上就是目前 ZMS 順序消費的線程模型拐邪,用圖表示以上代碼邏輯:
com.zto.consumer.KafkaConsumerProxy#submitRecords
消費邏輯中,在 finally 進行 countDown 操作屎即,最后會在本次消費主線程當中阻塞等待本次消息消費完成:
為了保證手動提交位移的正確性庙睡,我們必須保證本次拉取的消息消費完之后才會進行位移提交,因此 ZMS 在消費前會創(chuàng)建一個 count 為本次消息數(shù)量的 CountDownLatch:
ZMS 會對消息分區(qū)進行取模技俐,根據(jù)取模后的序號從線程池列表緩存中獲取一個線程池乘陪,從而使得相同分區(qū)的消息會被分配到相同線程池中執(zhí)行,對于順序消費來說至關重要雕擂,前面我也說了啡邑,當用戶配置了順序消費時,每個線程池只會分配一個線程井赌,如果相同分區(qū)的消息分配到同一個線程池中執(zhí)行谤逼,也就意味著相同分區(qū)的消息會串行執(zhí)行,實現(xiàn)消息消費的順序性仇穗。
com.zto.consumer.KafkaConsumerProxy#submitRecords
首先在初始化的時候流部,會對消費線程池進行初始化,具體是根據(jù) threadsNumMax 的數(shù)量創(chuàng)建若干個單個線程的線程池纹坐,單個線程的線程池就是為了保證每個分區(qū)取模后拿到線程池是串行消費的枝冀,但這里創(chuàng)建 threadsNumMax 個線程池是不合理的,后面我會說到耘子。
com.zto.consumer.KafkaConsumerProxy#addUserDefinedProperties
接下來我們來看下 ZMS 是怎么實現(xiàn)順序消費線程模型的果漾,目前 ZMS 的順序消費線程模型為每個分區(qū)單線程消費模式:
那么,如果在使用第二種消費模型的前提下谷誓,實現(xiàn)消息順序消費呢绒障?
這種消費模型獎 KafkaConsumer 實例與消息消費邏輯解耦,我們不需要創(chuàng)建多個 KafkaConsumer 實例就可進行多線程消費捍歪,還可根據(jù)消費的負載情況動態(tài)調整 worker 線程户辱,具有很強的獨立擴展性鸵钝,在公司內(nèi)部使用的多線程消費模型就是用的單 KafkaConsumer 實例 + 多 worker 線程模型。但是通常情況下焕妙,這種消費模型無法保證消費的順序性蒋伦。
2、單 KafkaConsumer 實例 + 多 worker 線程
但是缺點是無法提升單個分區(qū)的消費能力焚鹊,如果一個主題分區(qū)數(shù)量很多痕届,只能通過增加 KafkaConsumer 實例提高消費能力,這樣一來線程數(shù)量過多末患,導致項目 Socket 連接開銷巨大研叫,項目中一般不用該線程模型去消費。
這種消費模型創(chuàng)建多個 KafkaConsumer 對象璧针,每個線程維護一個 KafkaConsumer嚷炉,從而實現(xiàn)線程隔離消費,由于每個分區(qū)同一時刻只能有一個消費者消費探橱,所以這種消費模型天然支持順序消費申屹。
1、每個線程維護一個 KafkaConsumer
kafka 的消費類 KafkaConsumer 是非線程安全的隧膏,因此用戶無法在多線程中共享一個 KafkaConsumer 實例哗讥,且 KafkaConsumer 本身并沒有實現(xiàn)多線程消費邏輯,如需多線程消費胞枕,還需要用戶自行實現(xiàn)杆煞,在這里我會講到 Kafka 兩種多線程消費模型:
Kafka 順序消費線程模型的實現(xiàn)
從以上流程可看出,Kafka 消費者自身已經(jīng)實現(xiàn)了拉取限流的機制腐泻。
假設某消費者監(jiān)聽三個分區(qū)决乎,每個分區(qū)每次從 Broker 中拉取 4 條消息,用戶每次從本地緩存中獲取 2 條消息:
為了更加清晰的表達這段邏輯派桩,我舉個例子并將整個流程用圖表達出來:
當緩存中還存在中還存在某個分區(qū)的消息數(shù)據(jù)時构诚,消費者不會繼續(xù)對該分區(qū)進行拉取請求,直到該分區(qū)的本地緩存被消費完铆惑,才會繼續(xù)發(fā)送拉取請求唤反。
我們可以很清楚的得出結論:
nextInLineRecords 即我們上面提到的根據(jù)某個分區(qū)緩存 CompletedFetch 解析得到的,如果 nextInLineRecords 中的緩存還沒拉取完鸭津,則不從 broker 中拉取消息了,以及如果此時 completedFetches 緩存中存在該分區(qū)的緩存肠缨,也不進行拉取消息逆趋。
org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions
prepareFetchRequests 方法會調用 Fetcher#fetchablePartitions 篩選可拉取的分區(qū),我們來看下 Kafka 消費者是如何進行篩選的:
以上代碼邏輯很好理解晒奕,在發(fā)送拉取請求前闻书,先檢查哪些分區(qū)可拉取名斟,接著為每個分區(qū)構建一個 FetchRequest 對象,F(xiàn)etchRequest 中的 minBytes 和 maxBytes魄眉,分別可通過 fetch.min.bytes 和 fetch.max.bytes 參數(shù)設置砰盐。這也是每次從 Broker 中拉取的消息不一定等于 max.poll.records 的原因。
org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches
發(fā)送拉取請求邏輯:
那 Kafka 消費者是如何循環(huán)地拉取它監(jiān)聽的分區(qū)呢坑律?我們接著往下分析岩梳。
答案顯然不會,不信你打開 Kafka-manager 觀察每個分區(qū)的消費進度情況晃择,每個分區(qū)都會有消費者在消費中冀值。
我們再想一下,假設某個分區(qū)的消息一直都處于堆積狀態(tài)宫屠,Kafka 會每次都拉取這個分區(qū)直至將該分區(qū)消費完畢嗎列疗?(根據(jù)假設,Kafka 消費者每次都會從這個分區(qū)拉取消息浪蹂,并將消息存到分區(qū)關聯(lián)的 CompletedFetch 緩存中抵栈,根據(jù)以上代碼邏輯,nextInLineRecords 一直處于還沒拉取完的狀態(tài)坤次,導致每次拉取都會從該分區(qū)中拉取消息古劲。)
以上代碼即可解釋為什么消息有堆積的情況下,每次拉取的消息很大概率是同一個分區(qū)的消息浙踢,因為緩存 CompletedFetch 緩存中的消息很大概率會多余每次拉取消息數(shù)量绢慢,Kafka 客戶端每次從 Broker 拉取的消息數(shù)據(jù)并不是通過 max.poll.records 決定的,該參數(shù)僅決定用戶每次從本地緩存中獲取多少條數(shù)據(jù)洛波,真正決定從 Broker 拉取的消息數(shù)據(jù)量是通過 fetch.min.bytes胰舆、max.partition.fetch.bytes、fetch.max.bytes 等參數(shù)決定的蹬挤。
maxPollRecords 為本次拉取的最大消息數(shù)量缚窿,該值可通過 max.poll.records 參數(shù)配置,默認為 500 條焰扳,該方法每次從 completedFetches 中取出一個 CompletedFetch 并解析成可以拉取的 PartitionRecords 對象倦零,即方法中的 nextInLineRecords,請注意吨悍,PartitionRecords 中的消息數(shù)量可能大與 500 條扫茅,因此可能本次可能一次性從 PartitionRecords 獲取 500 條消息后即返回,如果 PartitionRecords 中消息數(shù)量不足 500 條育瓜,會從 completedFetches 緩存中取出下一個要拉取的分區(qū)消息葫隙,recordsRemaining 會記錄本次剩余還有多少消息沒拉取,通過循環(huán)不斷地從 completedFetches 緩存中取消息躏仇,直至 recordsRemaining 為 0恋脚。
completedFetches 是拉取到的消息緩存腺办,以上代碼邏輯就是圍繞著如何從 completedFetches 緩存中獲取消息的,從以上代碼邏輯可以看出:
pollForFetches 方法會調用 Fetcher#fetchedRecords 方法從緩存中獲取并解析消息:
從 KafkaConsumer#poll 方法源碼可以看出來糟描,其實 Kafka 消費者在拉取消息過程中怀喉,有兩條線程在工作,其中用戶主線程調用 pollForFetches 方法從緩存中獲取消息消費船响,在獲取消息后躬拢,會再調用 ConsumerNetworkClient#poll 方法從 Broker 發(fā)送拉取請求,然后將拉取到的消息緩存到本地灿意,這里為什么在拉取完消息后估灿,會主動調用 ConsumerNetworkClient#poll 方法呢?我想這里的目的是為了下次 poll 的時候可以立即從緩存中拉取消息缤剧。
fetcher.sendFetches() 在構建 FetchRequest 前馅袁,會對當前可拉取分區(qū)進行篩選,而這個也是決定多分區(qū)拉取消息規(guī)律的核心荒辕,后面我會講到汗销。
fetcher.sendFetches() 經(jīng)過源碼閱讀后,得知該方法目的是為了構建拉取請求 FetchRequest 并進行發(fā)送抵窒,但是這里的發(fā)送并不是真正的發(fā)送弛针,而是將 FetchRequest 請求對象存放在 unsend 緩存當中,然后會在 ConsumerNetworkClient#poll 方法調用時才會被真正地執(zhí)行發(fā)送李皇。
pollForFetches 方法是客戶端拉取消息核心邏輯削茁,但并不是真正去 broker 中拉取,而是從緩存中去獲取消息掉房。在 pollForFetches 拉取消息后茧跋,如果消息不為零,還會調用 fetcher.sendFetches() 與 client.pollNoWakeup()卓囚,調用這兩個方法究竟有什么用呢瘾杭?
從以上代碼邏輯可以看出來,用戶給定的這個時間哪亿,目的是為了等待消息湊夠 max.poll.records 條消息后再返回粥烁,即使消息條數(shù)不夠 max.poll.records 消息,時間到了用戶給定的等待時間后蝇棉,也會返回讨阻。
我們使用 Kafka consumer 進行消費的時候通常會給一個時間,比如:
作者簡介
作者張乘輝篡殷,擅長消息中間件技能变勇,負責公司百萬 TPS 級別 Kafka 集群的維護,作者維護的公號「后端進階」不定期分享 Kafka、RocketMQ 系列不講概念直接真刀真槍的實戰(zhàn)總結以及細節(jié)上的源碼分析搀绣;同時作者也是阿里開源分布式事務框架 Seata Contributor,因此也會分享關于 Seata 的相關知識戳气;當然公號也會分享 WEB 相關知識比如 Spring 全家桶等链患。內(nèi)容不一定面面俱到,但一定讓你感受到作者對于技術的追求是認真的瓶您!
公眾號:后端進階
GitHub:https://github.com/objcoding/