紹圣--kafka之消費(fèi)者(四)

消費(fèi)者網(wǎng)絡(luò)客戶端輪詢:ConsumerNetworkClient雅采。ConsumerNetworkClient是對(duì)NetworkClient的封裝爵憎。

客戶端發(fā)送請(qǐng)求后慨亲,不知道服務(wù)端什么時(shí)候返回響應(yīng)。所以客戶端獲取結(jié)果有三種輪詢方式:

1宝鼓,客戶端不阻塞刑棵,設(shè)置超時(shí)時(shí)間為0,表示請(qǐng)求發(fā)送完成后馬上返回到調(diào)用者的主線程中愚铡。

2蛉签,客戶端設(shè)置超時(shí)時(shí)間,如果在指定時(shí)間內(nèi)沒(méi)有結(jié)果茂附,返回返回到調(diào)用者的主線程中正蛙。

3,客戶端設(shè)置超時(shí)時(shí)間為最大值(可以理解為一直阻塞)营曼,如果沒(méi)有結(jié)果乒验,就會(huì)一直阻塞,不會(huì)返回到調(diào)用者的主線程中蒂阱。

NetworkClient發(fā)送請(qǐng)求過(guò)程

1锻全,client.ready(node):客戶端連接上目標(biāo)節(jié)點(diǎn),并準(zhǔn)備好發(fā)送請(qǐng)求录煤。

2岸夯,client.send(request):發(fā)送請(qǐng)求酵颁,將請(qǐng)求設(shè)置到網(wǎng)絡(luò)通道中。

3,client.poll(timeout):客戶端輪詢獲取結(jié)果短纵。

ConsumerNetworkClient發(fā)送請(qǐng)求過(guò)程

1,send():創(chuàng)建客戶端請(qǐng)求抹镊,并緩存到未發(fā)送的請(qǐng)求集合(unsent)中任斋。

2,poll():處理客戶端請(qǐng)求露筒。

3呐伞,trySend():調(diào)用NetworkClient.send(),暫時(shí)把請(qǐng)求放到網(wǎng)絡(luò)通道中慎式。

4伶氢,NetworkClient.poll():真正發(fā)送請(qǐng)求。

trySend()

處理unsent:Map<節(jié)點(diǎn)瘪吏,List<請(qǐng)求對(duì)象>>中保存的請(qǐng)求癣防,把各個(gè)節(jié)點(diǎn)的請(qǐng)求設(shè)置到各個(gè)節(jié)點(diǎn)對(duì)應(yīng)的通道(KafkaChannel)中,并注冊(cè)該通道的寫(xiě)事件肪虎。注:每個(gè)節(jié)點(diǎn)對(duì)應(yīng)的通道一次只會(huì)處理一個(gè)請(qǐng)求劣砍,所以如果一個(gè)節(jié)點(diǎn)的上一個(gè)發(fā)送請(qǐng)求還沒(méi)有發(fā)送,那么當(dāng)前此節(jié)點(diǎn)的當(dāng)前請(qǐng)求就不會(huì)設(shè)置到通道中扇救。成功設(shè)置到節(jié)點(diǎn)對(duì)應(yīng)的通道后刑枝,從集合中刪除香嗓,以防重復(fù)發(fā)送。

NetworkClient.poll()

真正把請(qǐng)求發(fā)送到網(wǎng)絡(luò)中装畅。trySend()方法中靠娱,為節(jié)點(diǎn)對(duì)應(yīng)的通道注冊(cè)了寫(xiě)事件。在輪詢方法中掠兄,寫(xiě)事件準(zhǔn)備就緒像云,處理通道的寫(xiě)操作將數(shù)據(jù)寫(xiě)到網(wǎng)絡(luò)中。trySend()方法可能設(shè)置多個(gè)節(jié)點(diǎn)的請(qǐng)求蚂夕,所以就會(huì)有多個(gè)通道的寫(xiě)事件準(zhǔn)備就緒迅诬,因此輪詢方法中就會(huì)發(fā)送多個(gè)請(qǐng)求。kafka的處理方式不是每次調(diào)用NetworkClient.send()方法就調(diào)用一次NetworkClient.poll()婿牍。而是把所有準(zhǔn)備好的客戶端請(qǐng)求都設(shè)置到對(duì)應(yīng)的網(wǎng)絡(luò)通道后執(zhí)行一次輪詢:把所有寫(xiě)事件準(zhǔn)備就緒的通道找出來(lái)侈贷,執(zhí)行寫(xiě)操作。

NetworkClient:無(wú)鏡--kafka之生產(chǎn)者(三) - 簡(jiǎn)書(shū)?有比較詳細(xì)的介紹等脂。

心跳任務(wù)

每個(gè)消費(fèi)者都需要定時(shí)的向服務(wù)端的協(xié)調(diào)者發(fā)送心跳俏蛮,以表明自己是存活的。如果消費(fèi)者在一段時(shí)間內(nèi)沒(méi)有發(fā)送心跳到服務(wù)端的協(xié)調(diào)者上遥,那么服務(wù)端的協(xié)調(diào)者就會(huì)認(rèn)為消費(fèi)者掛掉搏屑。就會(huì)將掛掉的消費(fèi)者上的分區(qū)分給消費(fèi)組中的其他消費(fèi)者。

發(fā)送心跳是在消費(fèi)者的協(xié)調(diào)者上完成的粉楚,消費(fèi)者在加入消費(fèi)組時(shí)辣恋,啟動(dòng)發(fā)送心跳線程。ConsumerCoordinator.poll-->ensureActiveGroup()-->startHeartbeatThreadIfNeeded()-->HeartbeatThread().start()模软。HeartbeatThread是AbstractCoordinator的內(nèi)部類抑党。HeartbeatThread采用死循環(huán)來(lái)不斷的發(fā)送心跳請(qǐng)求。

發(fā)送心跳請(qǐng)求采用組合模式撵摆,每個(gè)消費(fèi)者都只有一個(gè)心跳任務(wù),心跳對(duì)象記錄了心跳任務(wù)的元數(shù)據(jù)害晦。

public final class Heartbeat {

private final long sessionTimeout;? //?會(huì)話超時(shí)的時(shí)間特铝,超過(guò)表示會(huì)話失敗

private final long heartbeatInterval; //?心跳間隔,表示多久發(fā)送一次心跳

private final long maxPollInterval;

private final long retryBackoffMs;

private volatile long lastHeartbeatSend; //?發(fā)送心跳請(qǐng)求時(shí)壹瘟,記錄發(fā)送時(shí)間

private long lastHeartbeatReceive; //?接收心跳結(jié)果后鲫剿,記錄接收時(shí)間

private long lastSessionReset; //?上一次的會(huì)話重置時(shí)間

private long lastPoll;

private boolean heartbeatFailed;

public boolean shouldHeartbeat(long now) { return timeToNextHeartbeat(now) == 0; }

/*** 計(jì)算下一次發(fā)送心跳的時(shí)間 * * @param now * @return 0:表示立即發(fā)送,不等于:表示離下一次發(fā)送心跳的時(shí)間 */

public long timeToNextHeartbeat(long now) {

// 從上次發(fā)送心跳后到現(xiàn)在一共過(guò)去了多長(zhǎng)時(shí)間

long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);

final long delayToNextHeartbeat;

if (heartbeatFailed)

delayToNextHeartbeat = retryBackoffMs;

else

delayToNextHeartbeat = heartbeatInterval;

// 從上次發(fā)送心跳后到現(xiàn)在一共過(guò)去了多長(zhǎng)時(shí)間大于了心跳間隔時(shí)間稻轨,表明要立即發(fā)送心跳灵莲。返回0

if (timeSinceLastHeartbeat > delayToNextHeartbeat)

return 0;

else // 否則返回還有多久發(fā)送下一次心跳請(qǐng)求

return delayToNextHeartbeat - timeSinceLastHeartbeat;

}

}

消費(fèi)者和服務(wù)端的協(xié)調(diào)者進(jìn)行交互,必須確保消費(fèi)者連接上協(xié)調(diào)者所在的節(jié)點(diǎn)殴俱。但是在交互過(guò)程中兩邊都會(huì)出現(xiàn)問(wèn)題政冻。比如協(xié)調(diào)者可能會(huì)掛掉枚抵,那么服務(wù)端應(yīng)該給消費(fèi)組重新選擇一個(gè)協(xié)調(diào)者,那么后面消費(fèi)組里面的消費(fèi)者就需要去連接新的協(xié)調(diào)者了明场。所以在消費(fèi)者的發(fā)送心跳線程里面必須針對(duì)服務(wù)端返回的不同的錯(cuò)誤碼處理不同的業(yè)務(wù):

if (coordinatorUnknown()) { // 沒(méi)有連接上服務(wù)端協(xié)調(diào)者

if (findCoordinatorFuture == null)

lookupCoordinator(); // 發(fā)送GroupCoordinator請(qǐng)求

else

AbstractCoordinator.this.wait(retryBackoffMs);

} else if (heartbeat.sessionTimeoutExpired(now)) { // 在會(huì)話超時(shí)時(shí)間內(nèi)沒(méi)有收到心跳應(yīng)答汽摹,客戶端認(rèn)為協(xié)調(diào)者掛了.

coordinatorDead(); // 處理協(xié)調(diào)者掛掉的邏輯,比如:處理unsent變量中保存的請(qǐng)求的處理器中的onFailure方法

} else if (heartbeat.pollTimeoutExpired(now)) { //?查看消費(fèi)者客戶端的輪詢時(shí)不是超過(guò)了心跳最大的輪詢等待時(shí)間

maybeLeaveGroup(); // 發(fā)送離開(kāi)組請(qǐng)求

} else if (!heartbeat.shouldHeartbeat(now)) { // 現(xiàn)在不需要發(fā)送心跳苦锨,下一次循環(huán)再檢查 AbstractCoordinator.this.wait(retryBackoffMs);

}

消費(fèi)者提交偏移量

消費(fèi)組發(fā)生再平衡時(shí)分區(qū)會(huì)被分配給新的消費(fèi)者逼泣,為了保證新的消費(fèi)者能夠從分區(qū)的上一次消費(fèi)位置繼續(xù)拉取并處理消息的話,那么每個(gè)消費(fèi)者都需要將所消費(fèi)的分區(qū)的消費(fèi)進(jìn)度定時(shí)的同步給消費(fèi)組對(duì)應(yīng)的服務(wù)端協(xié)調(diào)者節(jié)點(diǎn)上舟舒。

在KafkaConsumer中提供了兩種偏移量的提交方式:同步和異步拉庶。

異步

如果用戶設(shè)置了自動(dòng)提交偏移量(enable.auto.commit=true),客戶端在每一次輪詢的時(shí)候秃励,都會(huì)自定提交偏移量氏仗。

KafkaConsumer.poll()-->KafkaConsumer.pollOnce()-->ConsumerCoordinator.poll()-->ConsumerCoordinator.maybeAutoCommitOffsetsAsync()

可以看出提交偏移量請(qǐng)求還是通過(guò)客戶端協(xié)調(diào)者發(fā)送的。每次的輪詢?cè)诎l(fā)送拉取請(qǐng)求之前莺治,在客戶端協(xié)調(diào)者的輪詢方法中廓鞠,除了檢查心跳,就是要提交偏移量谣旁。也就是說(shuō)只要消費(fèi)者要去消費(fèi)消息床佳,就會(huì)執(zhí)行提交偏移量的動(dòng)作(enable.auto.commit=true的前提下)。

疑問(wèn):如果一個(gè)消費(fèi)者消費(fèi)了一次消息之后榄审,就不消費(fèi)了或者就掛機(jī)了砌们。這時(shí)提交偏移量還沒(méi)有提交給服務(wù)端的,那么在再平衡后把此分區(qū)分給了消費(fèi)組中的其他消費(fèi)者后就會(huì)出現(xiàn)重復(fù)消費(fèi)了搁进。當(dāng)然還有一種情況就是一個(gè)主題下的一個(gè)分區(qū)只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者所消費(fèi)浪感,但是可以被其他消費(fèi)組中的消費(fèi)者進(jìn)行消費(fèi),這樣情況是kafka所準(zhǔn)許的饼问,本身就會(huì)出現(xiàn)重復(fù)消費(fèi)的情況影兽,kafka只保證分區(qū)在同一個(gè)消費(fèi)組中的有序性,不保證在不同消費(fèi)組中的有序性莱革,所以以上情況下也就算是一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者掛了或者不消費(fèi)消息了峻堰,對(duì)其他消費(fèi)組是沒(méi)有任何影響的。

那么kafka是如何來(lái)處理這個(gè)問(wèn)題的或者是有沒(méi)有處理這個(gè)問(wèn)題喃盅视?


ConsumerCoordinator.maybeAutoCommitOffsetsAsync()

private void maybeAutoCommitOffsetsAsync(long now) {

if (autoCommitEnabled) {

if (coordinatorUnknown()) {

this.nextAutoCommitDeadline = now + retryBackoffMs;

} else if (now >= nextAutoCommitDeadline) {

this.nextAutoCommitDeadline = now + autoCommitIntervalMs; doAutoCommitOffsetsAsync();

}

}

}

/** * 執(zhí)行提交偏移量請(qǐng)求 */

private void doAutoCommitOffsetsAsync() { commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {

public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

if (exception != null) {

if (exception instanceof RetriableException)

nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);

} else {

}

}

});

}

/** * @param offsets 使用分區(qū)的拉取偏移量作為分區(qū)的提交偏移量提交到服務(wù)端中 * @param callback */

public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {

invokeCompletedOffsetCommitCallbacks();

if (!coordinatorUnknown()) { // 確保連接上服務(wù)端的協(xié)調(diào)者捐名,執(zhí)行提交偏移量請(qǐng)求 doCommitOffsetsAsync(offsets, callback);

} else {

// 發(fā)送連接服務(wù)端的協(xié)調(diào)者請(qǐng)求,并在監(jiān)聽(tīng)器中執(zhí)行提交偏移量請(qǐng)求 lookupCoordinator().addListener(new RequestFutureListener<Void>() {

public void onSuccess(Void value) {

doCommitOffsetsAsync(offsets, callback);

}

public void onFailure(RuntimeException e) {

completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));

}

});

}

client.pollNoWakeup();

}

private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {

this.subscriptions.needRefreshCommits(); // 通知訂閱狀態(tài)需要拉取提交偏移量 RequestFuture<Void> future = sendOffsetCommitRequest(offsets);

final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<Void>() {

public void onSuccess(Void value) {

if (interceptors != null)

interceptors.onCommit(offsets);

completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));

}

public void onFailure(RuntimeException e) {

completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException)); }

});

}

/** * 發(fā)送提交偏移量的請(qǐng)求 * @param 分區(qū)的拉取偏移量作為提交偏移量 */

private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {

if (offsets.isEmpty())

return RequestFuture.voidSuccess();

Node coordinator = coordinator();

if (coordinator == null)

return RequestFuture.coordinatorNotAvailable();

Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());

for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { OffsetAndMetadata offsetAndMetadata = entry.getValue();

offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData( offsetAndMetadata.offset(), offsetAndMetadata.metadata()));

}

final Generation generation;

if (subscriptions.partitionsAutoAssigned())

generation = generation();

else

generation = Generation.NO_GENERATION;

if (generation == null)

return RequestFuture.failure(new CommitFailedException());

OffsetCommitRequest req = new OffsetCommitRequest( this.groupId, generation.generationId, generation.memberId, OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsetData);

return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req) .compose(new OffsetCommitResponseHandler(offsets));

}

采用組合模式發(fā)送提交偏移量請(qǐng)求(OFFSET_COMMIT)闹击。提交偏移量請(qǐng)求其實(shí)就是把拉取偏移量的值提交到服務(wù)端去保存镶蹋。消費(fèi)者收到提交偏移量請(qǐng)求的響應(yīng)后由OffsetCommitResponseHandler(組合模式中的適配器)處理,更新消費(fèi)者的訂閱狀態(tài)中的提交偏移量。

在組合模式異步請(qǐng)求的監(jiān)聽(tīng)器的回調(diào)方法中贺归,把完成的請(qǐng)求存放到completedOffsetCommits變量中淆两。在ConsumerCoordinator.poll的方法中第一句會(huì)調(diào)用completedOffsetCommits變量中保存完成的提交偏移量請(qǐng)求的callback方法。

以上代碼中offsets參數(shù)來(lái)自于訂閱狀態(tài)(SubscriptionState)的allConsumed()方法牧氮。消費(fèi)者所有消費(fèi)的分區(qū)偏移量實(shí)際上是分區(qū)狀態(tài)對(duì)象(TopicPartitionState)的拉取偏移量(position變量)琼腔,而不是提交偏移量(committed)

拉取偏移量和提交偏移量的關(guān)系

談到拉取偏移量就會(huì)想到拉取請(qǐng)求,發(fā)送一次拉取請(qǐng)求踱葛,在客戶端輪詢方法返回拉取的記錄集之前丹莲,會(huì)計(jì)算出下一次發(fā)送拉取請(qǐng)求時(shí)用到的拉取偏移量的值,并更新分區(qū)狀態(tài)的拉取偏移量尸诽。在這個(gè)時(shí)候并沒(méi)有更新提交偏移量甥材,所以拉取偏移量也能代表分區(qū)的消費(fèi)進(jìn)度。

疑問(wèn):如果客戶端返回的記錄集后面就出現(xiàn)異承院或者宕機(jī)了洲赵。那么最新計(jì)算的拉取偏移量還沒(méi)有賦值給提交偏移量和提交到服務(wù)端的協(xié)調(diào)者節(jié)點(diǎn)中保存,那么等消費(fèi)者重新啟動(dòng)的時(shí)候商蕴,獲取拉取偏移量就是老的叠萍,這樣拉取的消息就是消費(fèi)過(guò)的,出現(xiàn)重復(fù)消費(fèi)绪商。

消費(fèi)者客戶端在輪詢方法中返回記錄集的時(shí)候就計(jì)算出下一次拉取請(qǐng)求的偏移量苛谷,并更新分區(qū)狀態(tài)的拉取偏移量。之后的處理記錄集的業(yè)務(wù)是用戶自己保證格郁。如果在下一次輪詢前腹殿,客戶端掛掉,那么沒(méi)有把已經(jīng)處理過(guò)的偏移量提交到服務(wù)端協(xié)調(diào)者例书,那么等客戶端下一次啟動(dòng)的時(shí)候锣尉,從服務(wù)端協(xié)調(diào)者獲取的偏移量就是老的。這樣就會(huì)出現(xiàn)重復(fù)消費(fèi)决采。這點(diǎn)kafka把處理方式留給了業(yè)務(wù)系統(tǒng)自沧。

拉取記錄集(enable.auto.commit=true):

1,先提交拉取偏移量的值到服務(wù)端的協(xié)調(diào)者树瞭。提交請(qǐng)求成功在回調(diào)方法中更新分區(qū)狀態(tài)的提交偏移量暂幼。注意這里的拉取偏移量要么是從服務(wù)端獲得的拉取偏移量,要么就是上一次拉取到記錄后重新計(jì)算出的拉取偏移量移迫。

2,獲取記錄集:

? ? ? 2-1有記錄:計(jì)算出下一次拉取請(qǐng)求的拉取偏移量并更新分區(qū)狀態(tài)的拉取偏移量的值管行。

? ? ? 2-2無(wú)記錄:使用從服務(wù)端獲得的拉取偏移量創(chuàng)建拉取請(qǐng)求厨埋。

發(fā)送提交拉取偏移量的請(qǐng)求是在發(fā)送拉取請(qǐng)求之前,也就是說(shuō)使用的拉取偏移量在拉取記錄之前就保持到了服務(wù)器的協(xié)調(diào)者中捐顷。試想把這兩個(gè)步驟反過(guò)來(lái)荡陷,先發(fā)送拉取請(qǐng)求再發(fā)送提交拉取偏移量的請(qǐng)求雨效,會(huì)出現(xiàn)什么情況喃?

這個(gè)引出了消息處理語(yǔ)義:至多一次废赞,至少一次徽龟,正好一次。

至多一次:消息最多被處理一次唉地,可能會(huì)丟失据悔,但絕不會(huì)重復(fù)消費(fèi)。

至少一次:消息至少被處理一次耘沼,不可能丟失极颓,但可能會(huì)重復(fù)消費(fèi)。

正好一次:消息正好被處理一次群嗤,不可能丟失菠隆,但絕不會(huì)重復(fù)消費(fèi)。

至多一次

現(xiàn)象:先發(fā)送提交拉取偏移量的請(qǐng)求保存消費(fèi)進(jìn)度狂秘,再獲取記錄集處理消息骇径。這樣可能會(huì)出現(xiàn):消費(fèi)者發(fā)送提交拉取偏移量請(qǐng)求在服務(wù)端保存完消費(fèi)進(jìn)度后,再處理消息之前掛掉者春。那么在再平衡后新的消費(fèi)者獲取的拉取偏移量在這個(gè)位置之前的消息可能沒(méi)有被真正的處理破衔。這樣就是出現(xiàn)消息丟失了(沒(méi)有被處理,消息還在服務(wù)器里)碧查。

kafka實(shí)現(xiàn)至多一次的方式:設(shè)置消費(fèi)者自動(dòng)提交偏移量运敢,并且設(shè)置較短的提交時(shí)間間隔。

至少一次

現(xiàn)象:先獲取記錄集處理消息忠售。再發(fā)送提交拉取偏移量的請(qǐng)求保存消費(fèi)進(jìn)度传惠。這樣可能會(huì)出現(xiàn):消費(fèi)者處理完消息,但是在發(fā)送提交拉取偏移量的請(qǐng)求保存消費(fèi)進(jìn)度的時(shí)候掛了稻扬。那么在再平衡后新的消費(fèi)者獲取的拉取偏移量在這個(gè)位置后面的消息可能被處理過(guò)了卦方,那么新的消費(fèi)者又會(huì)重新處理一次,這樣消息就被重復(fù)處理了泰佳。

kafka實(shí)現(xiàn)至少一次的方式:設(shè)置消費(fèi)者自動(dòng)提交偏移量盼砍,但設(shè)置很長(zhǎng)的提交時(shí)間間隔;或者關(guān)閉消費(fèi)者自動(dòng)提交偏移量逝她,處理完消息后手動(dòng)同步提交偏移量浇坐。

正好一次

正好一次其實(shí)就是保證處理記錄集和保存偏移量的請(qǐng)求必須是一個(gè)原子操作。要么同時(shí)成功要么同時(shí)失敗黔宛。

kafka實(shí)現(xiàn)至少一次的方式:關(guān)閉消費(fèi)者自動(dòng)提交偏移量近刘,訂閱主題時(shí)設(shè)置自定義的消費(fèi)者再平衡監(jiān)聽(tīng)器:發(fā)送再平衡時(shí),獲取偏移量就從關(guān)心數(shù)據(jù)庫(kù)或者是文件中獲取。

同步

消費(fèi)者同步提交偏移量的做法:在最外層用一個(gè)死循環(huán)來(lái)確保必須收到服務(wù)端的響應(yīng)結(jié)果才能結(jié)束觉渴。

public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) { invokeCompletedOffsetCommitCallbacks();

if (offsets.isEmpty())

return;

while (true) {

ensureCoordinatorReady();

RequestFuture<Void> future = sendOffsetCommitRequest(offsets);

client.poll(future);

if (future.succeeded()) {

if (interceptors != null)

interceptors.onCommit(offsets);

return;

}

if (!future.isRetriable())

throw future.exception();

time.sleep(retryBackoffMs);

}

}

同步方式提交偏移量通常是存在依賴條件介劫,必須等待偏移量提交完成后才能繼續(xù)往下執(zhí)行。在加入消費(fèi)組或者是重新加入消費(fèi)組的時(shí)候案淋,如果enable.auto.commit=true座韵,那么就會(huì)用阻塞的方式完成一次提交偏移量請(qǐng)求。把自己本地保存的最新的拉取偏移量提交到服務(wù)器端的協(xié)調(diào)者保存踢京。這樣后面分到分區(qū)的消費(fèi)者獲取拉取偏移量就可以從最新的消費(fèi)點(diǎn)開(kāi)始消費(fèi)誉碴。

參考資料:

Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計(jì)與實(shí)現(xiàn)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市漱挚,隨后出現(xiàn)的幾起案子翔烁,更是在濱河造成了極大的恐慌,老刑警劉巖旨涝,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蹬屹,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡白华,警方通過(guò)查閱死者的電腦和手機(jī)慨默,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)弧腥,“玉大人厦取,你說(shuō)我怎么就攤上這事」芴拢” “怎么了虾攻?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)更鲁。 經(jīng)常有香客問(wèn)我霎箍,道長(zhǎng),這世上最難降的妖魔是什么澡为? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任漂坏,我火速辦了婚禮,結(jié)果婚禮上媒至,老公的妹妹穿的比我還像新娘顶别。我一直安慰自己,他們只是感情好拒啰,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布驯绎。 她就那樣靜靜地躺著,像睡著了一般谋旦。 火紅的嫁衣襯著肌膚如雪条篷。 梳的紋絲不亂的頭發(fā)上骗随,一...
    開(kāi)封第一講書(shū)人閱讀 51,631評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音赴叹,去河邊找鬼。 笑死指蚜,一個(gè)胖子當(dāng)著我的面吹牛乞巧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播摊鸡,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼绽媒,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了免猾?” 一聲冷哼從身側(cè)響起是辕,我...
    開(kāi)封第一講書(shū)人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎猎提,沒(méi)想到半個(gè)月后获三,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡锨苏,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年疙教,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片伞租。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡贞谓,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出葵诈,到底是詐尸還是另有隱情裸弦,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布作喘,位于F島的核電站理疙,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏徊都。R本人自食惡果不足惜沪斟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望暇矫。 院中可真熱鬧主之,春花似錦、人聲如沸李根。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)房轿。三九已至粤攒,卻和暖如春所森,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背夯接。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工焕济, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人盔几。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓晴弃,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親逊拍。 傳聞我的和親對(duì)象是個(gè)殘疾皇子上鞠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 此篇開(kāi)始進(jìn)入kafka的另外一側(cè):消費(fèi)者。kafka中的消費(fèi)者比生產(chǎn)者要復(fù)雜的多芯丧,里面涉及到的消費(fèi)組芍阎,偏移量等概念...
    紹圣閱讀 1,918評(píng)論 0 0
  • 上回消費(fèi)者已經(jīng)把拉取消息之前的準(zhǔn)備工作已經(jīng)完成了,接下來(lái)就進(jìn)行消息的拉取了缨恒。 輪詢流程 首先看看最外層的poll方...
    紹圣閱讀 511評(píng)論 0 0
  • 消費(fèi)者輪詢通過(guò)拉取器(Fetcher)發(fā)送拉取請(qǐng)求谴咸,拉取器會(huì)調(diào)用消費(fèi)者網(wǎng)絡(luò)客戶端的發(fā)送方法(send)和網(wǎng)絡(luò)輪詢方...
    紹圣閱讀 755評(píng)論 0 0
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)肿轨,斷路器寿冕,智...
    卡卡羅2017閱讀 134,659評(píng)論 18 139
  • 這是四天 接下來(lái)還有56天 據(jù)說(shuō):“每個(gè)人從0開(kāi)始到成為一個(gè)領(lǐng)域的專家,需要7年的時(shí)間椒袍⊥粘” 有人算了一筆,說(shuō)如果我...
    蔣曉玉閱讀 282評(píng)論 0 2