Consumer消費消息之后不需要手動提交区端,consumer客戶端會自動提交已經(jīng)消費的消息的offset芭碍。
相關(guān)參數(shù)設(shè)置:
// 是否自動提交偏移量
props.put("enable.auto.commit", "true");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動提交偏移量
props.put("auto.commit.interval.ms", "5000");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // 默認(rèn)為5000ms
自動提交可能執(zhí)行的時機(jī)
1颓屑、消費者手動指定自己需要消費的分區(qū)(此處是異步提交)
調(diào)用棧為:
KafkaConsumer#assign
ConsumerCoordinator#maybeAutoCommitOffsetsNow
public void maybeAutoCommitOffsetsNow() {
// 必須要設(shè)置自動提交且已經(jīng)和服務(wù)端的協(xié)調(diào)者建立連接
// 1官脓、如果消費者還沒有開始消費指定分區(qū)是不會觸發(fā)自動提交位移
// 2仿贬、如果消費者在消費的過程中受到一條KafkaConsumer#assign的指令茁瘦,此時消
// 費訂閱的分區(qū)極有可能發(fā)生改變品抽,所以一定要將之前訂閱的分區(qū)相關(guān)信息提交
// 給服務(wù)端的協(xié)調(diào)者。
if (autoCommitEnabled && !coordinatorUnknown())
doAutoCommitOffsetsAsync();
}
}
2甜熔、消費者拉取消息的時候(此處是異步提交)
調(diào)用棧為:
KafkaConsumer#poll
KafkaConsumer#pollOnce
ConsumerCoordinator#poll
ConsumerCoordinator#maybeAutoCommitOffsetsAsync
private void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
if (coordinatorUnknown()) {
this.nextAutoCommitDeadline = now + retryBackoffMs;
// 并不是每次poll的時候會調(diào)用自動提交位移
// 條件為:now > oldNow + auto.commit.interval.ms
// 觸發(fā)條件和用戶設(shè)置的auto.commit.interval.ms有關(guān)圆恤,設(shè)置時間長
// 則觸發(fā)的次數(shù)少,設(shè)置時間短則觸發(fā)次數(shù)多
} else if (now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
}
3腔稀、消費者以消費者組模式啟動盆昙,加入組重新rebalance之前(此處是同步提交)
調(diào)用棧為:
KafkaConsumer#poll
KafkaConsumer#pollOnce
ConsumerCoordinator#poll
AbstractCoordinator#ensureActiveGroup
AbstractCoordinator#joinGroupIfNeeded
AbstractCoordinator#onJoinPrepare
ConsumerCoordinator#maybeAutoCommitOffsetsSync
只要開啟了自動提交,此處是一定會向協(xié)調(diào)者同步提交位移焊虏,因為需要重新rebalance淡喜,消費者組中各個消費者的分區(qū)既有可能會發(fā)生改變,重新消費之前一定要獲取最新的唯一诵闭,盡最大努力避免重復(fù)消費炼团。
4、消費者關(guān)閉的時候(此處是同步提交)
調(diào)用棧為:
KafkaConsumer#close
ConsumerCoordinator#close
關(guān)閉的時候肯定是要同步提交消費位移的疏尿。