消費(fèi)者網(wǎng)絡(luò)客戶端輪詢:ConsumerNetworkClient雅采。ConsumerNetworkClient是對(duì)NetworkClient的封裝爵憎。
客戶端發(fā)送請(qǐng)求后慨亲,不知道服務(wù)端什么時(shí)候返回響應(yīng)。所以客戶端獲取結(jié)果有三種輪詢方式:
1宝鼓,客戶端不阻塞刑棵,設(shè)置超時(shí)時(shí)間為0,表示請(qǐng)求發(fā)送完成后馬上返回到調(diào)用者的主線程中愚铡。
2蛉签,客戶端設(shè)置超時(shí)時(shí)間,如果在指定時(shí)間內(nèi)沒(méi)有結(jié)果茂附,返回返回到調(diào)用者的主線程中正蛙。
3,客戶端設(shè)置超時(shí)時(shí)間為最大值(可以理解為一直阻塞)营曼,如果沒(méi)有結(jié)果乒验,就會(huì)一直阻塞,不會(huì)返回到調(diào)用者的主線程中蒂阱。
NetworkClient發(fā)送請(qǐng)求過(guò)程
1锻全,client.ready(node):客戶端連接上目標(biāo)節(jié)點(diǎn),并準(zhǔn)備好發(fā)送請(qǐng)求录煤。
2岸夯,client.send(request):發(fā)送請(qǐng)求酵颁,將請(qǐng)求設(shè)置到網(wǎng)絡(luò)通道中。
3,client.poll(timeout):客戶端輪詢獲取結(jié)果短纵。
ConsumerNetworkClient發(fā)送請(qǐng)求過(guò)程
1,send():創(chuàng)建客戶端請(qǐng)求抹镊,并緩存到未發(fā)送的請(qǐng)求集合(unsent)中任斋。
2,poll():處理客戶端請(qǐng)求露筒。
3呐伞,trySend():調(diào)用NetworkClient.send(),暫時(shí)把請(qǐng)求放到網(wǎng)絡(luò)通道中慎式。
4伶氢,NetworkClient.poll():真正發(fā)送請(qǐng)求。
trySend()
處理unsent:Map<節(jié)點(diǎn)瘪吏,List<請(qǐng)求對(duì)象>>中保存的請(qǐng)求癣防,把各個(gè)節(jié)點(diǎn)的請(qǐng)求設(shè)置到各個(gè)節(jié)點(diǎn)對(duì)應(yīng)的通道(KafkaChannel)中,并注冊(cè)該通道的寫(xiě)事件肪虎。注:每個(gè)節(jié)點(diǎn)對(duì)應(yīng)的通道一次只會(huì)處理一個(gè)請(qǐng)求劣砍,所以如果一個(gè)節(jié)點(diǎn)的上一個(gè)發(fā)送請(qǐng)求還沒(méi)有發(fā)送,那么當(dāng)前此節(jié)點(diǎn)的當(dāng)前請(qǐng)求就不會(huì)設(shè)置到通道中扇救。成功設(shè)置到節(jié)點(diǎn)對(duì)應(yīng)的通道后刑枝,從集合中刪除香嗓,以防重復(fù)發(fā)送。
NetworkClient.poll()
真正把請(qǐng)求發(fā)送到網(wǎng)絡(luò)中装畅。trySend()方法中靠娱,為節(jié)點(diǎn)對(duì)應(yīng)的通道注冊(cè)了寫(xiě)事件。在輪詢方法中掠兄,寫(xiě)事件準(zhǔn)備就緒像云,處理通道的寫(xiě)操作將數(shù)據(jù)寫(xiě)到網(wǎng)絡(luò)中。trySend()方法可能設(shè)置多個(gè)節(jié)點(diǎn)的請(qǐng)求蚂夕,所以就會(huì)有多個(gè)通道的寫(xiě)事件準(zhǔn)備就緒迅诬,因此輪詢方法中就會(huì)發(fā)送多個(gè)請(qǐng)求。kafka的處理方式不是每次調(diào)用NetworkClient.send()方法就調(diào)用一次NetworkClient.poll()婿牍。而是把所有準(zhǔn)備好的客戶端請(qǐng)求都設(shè)置到對(duì)應(yīng)的網(wǎng)絡(luò)通道后執(zhí)行一次輪詢:把所有寫(xiě)事件準(zhǔn)備就緒的通道找出來(lái)侈贷,執(zhí)行寫(xiě)操作。
NetworkClient:無(wú)鏡--kafka之生產(chǎn)者(三) - 簡(jiǎn)書(shū)?有比較詳細(xì)的介紹等脂。
心跳任務(wù)
每個(gè)消費(fèi)者都需要定時(shí)的向服務(wù)端的協(xié)調(diào)者發(fā)送心跳俏蛮,以表明自己是存活的。如果消費(fèi)者在一段時(shí)間內(nèi)沒(méi)有發(fā)送心跳到服務(wù)端的協(xié)調(diào)者上遥,那么服務(wù)端的協(xié)調(diào)者就會(huì)認(rèn)為消費(fèi)者掛掉搏屑。就會(huì)將掛掉的消費(fèi)者上的分區(qū)分給消費(fèi)組中的其他消費(fèi)者。
發(fā)送心跳是在消費(fèi)者的協(xié)調(diào)者上完成的粉楚,消費(fèi)者在加入消費(fèi)組時(shí)辣恋,啟動(dòng)發(fā)送心跳線程。ConsumerCoordinator.poll-->ensureActiveGroup()-->startHeartbeatThreadIfNeeded()-->HeartbeatThread().start()模软。HeartbeatThread是AbstractCoordinator的內(nèi)部類抑党。HeartbeatThread采用死循環(huán)來(lái)不斷的發(fā)送心跳請(qǐng)求。
發(fā)送心跳請(qǐng)求采用組合模式撵摆,每個(gè)消費(fèi)者都只有一個(gè)心跳任務(wù),心跳對(duì)象記錄了心跳任務(wù)的元數(shù)據(jù)害晦。
public final class Heartbeat {
private final long sessionTimeout;? //?會(huì)話超時(shí)的時(shí)間特铝,超過(guò)表示會(huì)話失敗
private final long heartbeatInterval; //?心跳間隔,表示多久發(fā)送一次心跳
private final long maxPollInterval;
private final long retryBackoffMs;
private volatile long lastHeartbeatSend; //?發(fā)送心跳請(qǐng)求時(shí)壹瘟,記錄發(fā)送時(shí)間
private long lastHeartbeatReceive; //?接收心跳結(jié)果后鲫剿,記錄接收時(shí)間
private long lastSessionReset; //?上一次的會(huì)話重置時(shí)間
private long lastPoll;
private boolean heartbeatFailed;
public boolean shouldHeartbeat(long now) { return timeToNextHeartbeat(now) == 0; }
/*** 計(jì)算下一次發(fā)送心跳的時(shí)間 * * @param now * @return 0:表示立即發(fā)送,不等于:表示離下一次發(fā)送心跳的時(shí)間 */
public long timeToNextHeartbeat(long now) {
// 從上次發(fā)送心跳后到現(xiàn)在一共過(guò)去了多長(zhǎng)時(shí)間
long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
final long delayToNextHeartbeat;
if (heartbeatFailed)
delayToNextHeartbeat = retryBackoffMs;
else
delayToNextHeartbeat = heartbeatInterval;
// 從上次發(fā)送心跳后到現(xiàn)在一共過(guò)去了多長(zhǎng)時(shí)間大于了心跳間隔時(shí)間稻轨,表明要立即發(fā)送心跳灵莲。返回0
if (timeSinceLastHeartbeat > delayToNextHeartbeat)
return 0;
else // 否則返回還有多久發(fā)送下一次心跳請(qǐng)求
return delayToNextHeartbeat - timeSinceLastHeartbeat;
}
}
消費(fèi)者和服務(wù)端的協(xié)調(diào)者進(jìn)行交互,必須確保消費(fèi)者連接上協(xié)調(diào)者所在的節(jié)點(diǎn)殴俱。但是在交互過(guò)程中兩邊都會(huì)出現(xiàn)問(wèn)題政冻。比如協(xié)調(diào)者可能會(huì)掛掉枚抵,那么服務(wù)端應(yīng)該給消費(fèi)組重新選擇一個(gè)協(xié)調(diào)者,那么后面消費(fèi)組里面的消費(fèi)者就需要去連接新的協(xié)調(diào)者了明场。所以在消費(fèi)者的發(fā)送心跳線程里面必須針對(duì)服務(wù)端返回的不同的錯(cuò)誤碼處理不同的業(yè)務(wù):
if (coordinatorUnknown()) { // 沒(méi)有連接上服務(wù)端協(xié)調(diào)者
if (findCoordinatorFuture == null)
lookupCoordinator(); // 發(fā)送GroupCoordinator請(qǐng)求
else
AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) { // 在會(huì)話超時(shí)時(shí)間內(nèi)沒(méi)有收到心跳應(yīng)答汽摹,客戶端認(rèn)為協(xié)調(diào)者掛了.
coordinatorDead(); // 處理協(xié)調(diào)者掛掉的邏輯,比如:處理unsent變量中保存的請(qǐng)求的處理器中的onFailure方法
} else if (heartbeat.pollTimeoutExpired(now)) { //?查看消費(fèi)者客戶端的輪詢時(shí)不是超過(guò)了心跳最大的輪詢等待時(shí)間
maybeLeaveGroup(); // 發(fā)送離開(kāi)組請(qǐng)求
} else if (!heartbeat.shouldHeartbeat(now)) { // 現(xiàn)在不需要發(fā)送心跳苦锨,下一次循環(huán)再檢查 AbstractCoordinator.this.wait(retryBackoffMs);
}
消費(fèi)者提交偏移量
消費(fèi)組發(fā)生再平衡時(shí)分區(qū)會(huì)被分配給新的消費(fèi)者逼泣,為了保證新的消費(fèi)者能夠從分區(qū)的上一次消費(fèi)位置繼續(xù)拉取并處理消息的話,那么每個(gè)消費(fèi)者都需要將所消費(fèi)的分區(qū)的消費(fèi)進(jìn)度定時(shí)的同步給消費(fèi)組對(duì)應(yīng)的服務(wù)端協(xié)調(diào)者節(jié)點(diǎn)上舟舒。
在KafkaConsumer中提供了兩種偏移量的提交方式:同步和異步拉庶。
異步
如果用戶設(shè)置了自動(dòng)提交偏移量(enable.auto.commit=true),客戶端在每一次輪詢的時(shí)候秃励,都會(huì)自定提交偏移量氏仗。
KafkaConsumer.poll()-->KafkaConsumer.pollOnce()-->ConsumerCoordinator.poll()-->ConsumerCoordinator.maybeAutoCommitOffsetsAsync()
可以看出提交偏移量請(qǐng)求還是通過(guò)客戶端協(xié)調(diào)者發(fā)送的。每次的輪詢?cè)诎l(fā)送拉取請(qǐng)求之前莺治,在客戶端協(xié)調(diào)者的輪詢方法中廓鞠,除了檢查心跳,就是要提交偏移量谣旁。也就是說(shuō)只要消費(fèi)者要去消費(fèi)消息床佳,就會(huì)執(zhí)行提交偏移量的動(dòng)作(enable.auto.commit=true的前提下)。
疑問(wèn):如果一個(gè)消費(fèi)者消費(fèi)了一次消息之后榄审,就不消費(fèi)了或者就掛機(jī)了砌们。這時(shí)提交偏移量還沒(méi)有提交給服務(wù)端的,那么在再平衡后把此分區(qū)分給了消費(fèi)組中的其他消費(fèi)者后就會(huì)出現(xiàn)重復(fù)消費(fèi)了搁进。當(dāng)然還有一種情況就是一個(gè)主題下的一個(gè)分區(qū)只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者所消費(fèi)浪感,但是可以被其他消費(fèi)組中的消費(fèi)者進(jìn)行消費(fèi),這樣情況是kafka所準(zhǔn)許的饼问,本身就會(huì)出現(xiàn)重復(fù)消費(fèi)的情況影兽,kafka只保證分區(qū)在同一個(gè)消費(fèi)組中的有序性,不保證在不同消費(fèi)組中的有序性莱革,所以以上情況下也就算是一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者掛了或者不消費(fèi)消息了峻堰,對(duì)其他消費(fèi)組是沒(méi)有任何影響的。
那么kafka是如何來(lái)處理這個(gè)問(wèn)題的或者是有沒(méi)有處理這個(gè)問(wèn)題喃盅视?
ConsumerCoordinator.maybeAutoCommitOffsetsAsync()
private void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
if (coordinatorUnknown()) {
this.nextAutoCommitDeadline = now + retryBackoffMs;
} else if (now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs; doAutoCommitOffsetsAsync();
}
}
}
/** * 執(zhí)行提交偏移量請(qǐng)求 */
private void doAutoCommitOffsetsAsync() { commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
if (exception instanceof RetriableException)
nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
} else {
}
}
});
}
/** * @param offsets 使用分區(qū)的拉取偏移量作為分區(qū)的提交偏移量提交到服務(wù)端中 * @param callback */
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
if (!coordinatorUnknown()) { // 確保連接上服務(wù)端的協(xié)調(diào)者捐名,執(zhí)行提交偏移量請(qǐng)求 doCommitOffsetsAsync(offsets, callback);
} else {
// 發(fā)送連接服務(wù)端的協(xié)調(diào)者請(qǐng)求,并在監(jiān)聽(tīng)器中執(zhí)行提交偏移量請(qǐng)求 lookupCoordinator().addListener(new RequestFutureListener<Void>() {
public void onSuccess(Void value) {
doCommitOffsetsAsync(offsets, callback);
}
public void onFailure(RuntimeException e) {
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
}
});
}
client.pollNoWakeup();
}
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
this.subscriptions.needRefreshCommits(); // 通知訂閱狀態(tài)需要拉取提交偏移量 RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<Void>() {
public void onSuccess(Void value) {
if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}
public void onFailure(RuntimeException e) {
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException)); }
});
}
/** * 發(fā)送提交偏移量的請(qǐng)求 * @param 分區(qū)的拉取偏移量作為提交偏移量 */
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty())
return RequestFuture.voidSuccess();
Node coordinator = coordinator();
if (coordinator == null)
return RequestFuture.coordinatorNotAvailable();
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { OffsetAndMetadata offsetAndMetadata = entry.getValue();
offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData( offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
}
final Generation generation;
if (subscriptions.partitionsAutoAssigned())
generation = generation();
else
generation = Generation.NO_GENERATION;
if (generation == null)
return RequestFuture.failure(new CommitFailedException());
OffsetCommitRequest req = new OffsetCommitRequest( this.groupId, generation.generationId, generation.memberId, OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsetData);
return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req) .compose(new OffsetCommitResponseHandler(offsets));
}
采用組合模式發(fā)送提交偏移量請(qǐng)求(OFFSET_COMMIT)闹击。提交偏移量請(qǐng)求其實(shí)就是把拉取偏移量的值提交到服務(wù)端去保存镶蹋。消費(fèi)者收到提交偏移量請(qǐng)求的響應(yīng)后由OffsetCommitResponseHandler(組合模式中的適配器)處理,更新消費(fèi)者的訂閱狀態(tài)中的提交偏移量。
在組合模式異步請(qǐng)求的監(jiān)聽(tīng)器的回調(diào)方法中贺归,把完成的請(qǐng)求存放到completedOffsetCommits變量中淆两。在ConsumerCoordinator.poll的方法中第一句會(huì)調(diào)用completedOffsetCommits變量中保存完成的提交偏移量請(qǐng)求的callback方法。
以上代碼中offsets參數(shù)來(lái)自于訂閱狀態(tài)(SubscriptionState)的allConsumed()方法牧氮。消費(fèi)者所有消費(fèi)的分區(qū)偏移量實(shí)際上是分區(qū)狀態(tài)對(duì)象(TopicPartitionState)的拉取偏移量(position變量)琼腔,而不是提交偏移量(committed)
拉取偏移量和提交偏移量的關(guān)系
談到拉取偏移量就會(huì)想到拉取請(qǐng)求,發(fā)送一次拉取請(qǐng)求踱葛,在客戶端輪詢方法返回拉取的記錄集之前丹莲,會(huì)計(jì)算出下一次發(fā)送拉取請(qǐng)求時(shí)用到的拉取偏移量的值,并更新分區(qū)狀態(tài)的拉取偏移量尸诽。在這個(gè)時(shí)候并沒(méi)有更新提交偏移量甥材,所以拉取偏移量也能代表分區(qū)的消費(fèi)進(jìn)度。
疑問(wèn):如果客戶端返回的記錄集后面就出現(xiàn)異承院或者宕機(jī)了洲赵。那么最新計(jì)算的拉取偏移量還沒(méi)有賦值給提交偏移量和提交到服務(wù)端的協(xié)調(diào)者節(jié)點(diǎn)中保存,那么等消費(fèi)者重新啟動(dòng)的時(shí)候商蕴,獲取拉取偏移量就是老的叠萍,這樣拉取的消息就是消費(fèi)過(guò)的,出現(xiàn)重復(fù)消費(fèi)绪商。
消費(fèi)者客戶端在輪詢方法中返回記錄集的時(shí)候就計(jì)算出下一次拉取請(qǐng)求的偏移量苛谷,并更新分區(qū)狀態(tài)的拉取偏移量。之后的處理記錄集的業(yè)務(wù)是用戶自己保證格郁。如果在下一次輪詢前腹殿,客戶端掛掉,那么沒(méi)有把已經(jīng)處理過(guò)的偏移量提交到服務(wù)端協(xié)調(diào)者例书,那么等客戶端下一次啟動(dòng)的時(shí)候锣尉,從服務(wù)端協(xié)調(diào)者獲取的偏移量就是老的。這樣就會(huì)出現(xiàn)重復(fù)消費(fèi)决采。這點(diǎn)kafka把處理方式留給了業(yè)務(wù)系統(tǒng)自沧。
拉取記錄集(enable.auto.commit=true):
1,先提交拉取偏移量的值到服務(wù)端的協(xié)調(diào)者树瞭。提交請(qǐng)求成功在回調(diào)方法中更新分區(qū)狀態(tài)的提交偏移量暂幼。注意這里的拉取偏移量要么是從服務(wù)端獲得的拉取偏移量,要么就是上一次拉取到記錄后重新計(jì)算出的拉取偏移量移迫。
2,獲取記錄集:
? ? ? 2-1有記錄:計(jì)算出下一次拉取請(qǐng)求的拉取偏移量并更新分區(qū)狀態(tài)的拉取偏移量的值管行。
? ? ? 2-2無(wú)記錄:使用從服務(wù)端獲得的拉取偏移量創(chuàng)建拉取請(qǐng)求厨埋。
發(fā)送提交拉取偏移量的請(qǐng)求是在發(fā)送拉取請(qǐng)求之前,也就是說(shuō)使用的拉取偏移量在拉取記錄之前就保持到了服務(wù)器的協(xié)調(diào)者中捐顷。試想把這兩個(gè)步驟反過(guò)來(lái)荡陷,先發(fā)送拉取請(qǐng)求再發(fā)送提交拉取偏移量的請(qǐng)求雨效,會(huì)出現(xiàn)什么情況喃?
這個(gè)引出了消息處理語(yǔ)義:至多一次废赞,至少一次徽龟,正好一次。
至多一次:消息最多被處理一次唉地,可能會(huì)丟失据悔,但絕不會(huì)重復(fù)消費(fèi)。
至少一次:消息至少被處理一次耘沼,不可能丟失极颓,但可能會(huì)重復(fù)消費(fèi)。
正好一次:消息正好被處理一次群嗤,不可能丟失菠隆,但絕不會(huì)重復(fù)消費(fèi)。
至多一次
現(xiàn)象:先發(fā)送提交拉取偏移量的請(qǐng)求保存消費(fèi)進(jìn)度狂秘,再獲取記錄集處理消息骇径。這樣可能會(huì)出現(xiàn):消費(fèi)者發(fā)送提交拉取偏移量請(qǐng)求在服務(wù)端保存完消費(fèi)進(jìn)度后,再處理消息之前掛掉者春。那么在再平衡后新的消費(fèi)者獲取的拉取偏移量在這個(gè)位置之前的消息可能沒(méi)有被真正的處理破衔。這樣就是出現(xiàn)消息丟失了(沒(méi)有被處理,消息還在服務(wù)器里)碧查。
kafka實(shí)現(xiàn)至多一次的方式:設(shè)置消費(fèi)者自動(dòng)提交偏移量运敢,并且設(shè)置較短的提交時(shí)間間隔。
至少一次
現(xiàn)象:先獲取記錄集處理消息忠售。再發(fā)送提交拉取偏移量的請(qǐng)求保存消費(fèi)進(jìn)度传惠。這樣可能會(huì)出現(xiàn):消費(fèi)者處理完消息,但是在發(fā)送提交拉取偏移量的請(qǐng)求保存消費(fèi)進(jìn)度的時(shí)候掛了稻扬。那么在再平衡后新的消費(fèi)者獲取的拉取偏移量在這個(gè)位置后面的消息可能被處理過(guò)了卦方,那么新的消費(fèi)者又會(huì)重新處理一次,這樣消息就被重復(fù)處理了泰佳。
kafka實(shí)現(xiàn)至少一次的方式:設(shè)置消費(fèi)者自動(dòng)提交偏移量盼砍,但設(shè)置很長(zhǎng)的提交時(shí)間間隔;或者關(guān)閉消費(fèi)者自動(dòng)提交偏移量逝她,處理完消息后手動(dòng)同步提交偏移量浇坐。
正好一次
正好一次其實(shí)就是保證處理記錄集和保存偏移量的請(qǐng)求必須是一個(gè)原子操作。要么同時(shí)成功要么同時(shí)失敗黔宛。
kafka實(shí)現(xiàn)至少一次的方式:關(guān)閉消費(fèi)者自動(dòng)提交偏移量近刘,訂閱主題時(shí)設(shè)置自定義的消費(fèi)者再平衡監(jiān)聽(tīng)器:發(fā)送再平衡時(shí),獲取偏移量就從關(guān)心數(shù)據(jù)庫(kù)或者是文件中獲取。
同步
消費(fèi)者同步提交偏移量的做法:在最外層用一個(gè)死循環(huán)來(lái)確保必須收到服務(wù)端的響應(yīng)結(jié)果才能結(jié)束觉渴。
public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) { invokeCompletedOffsetCommitCallbacks();
if (offsets.isEmpty())
return;
while (true) {
ensureCoordinatorReady();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
client.poll(future);
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return;
}
if (!future.isRetriable())
throw future.exception();
time.sleep(retryBackoffMs);
}
}
同步方式提交偏移量通常是存在依賴條件介劫,必須等待偏移量提交完成后才能繼續(xù)往下執(zhí)行。在加入消費(fèi)組或者是重新加入消費(fèi)組的時(shí)候案淋,如果enable.auto.commit=true座韵,那么就會(huì)用阻塞的方式完成一次提交偏移量請(qǐng)求。把自己本地保存的最新的拉取偏移量提交到服務(wù)器端的協(xié)調(diào)者保存踢京。這樣后面分到分區(qū)的消費(fèi)者獲取拉取偏移量就可以從最新的消費(fèi)點(diǎn)開(kāi)始消費(fèi)誉碴。
參考資料:
Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計(jì)與實(shí)現(xiàn)