A look at the Kafka client's auto commit

This article walks through how the Kafka client implements auto commit.

maven

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>
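
Before diving into the source, here is a minimal consumer loop with auto commit enabled, just to set the stage. This is an illustrative sketch, not taken from the article's original code: the broker address, group id and topic name are placeholders, while enable.auto.commit and auto.commit.interval.ms are the two settings whose handling the rest of this article traces.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "demo-group");                 // placeholder group id
        props.put("enable.auto.commit", "true");             // auto commit on (this is the default)
        props.put("auto.commit.interval.ms", "5000");        // commit at most once every 5s
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                // each poll() call also drives heartbeats and the periodic auto commit
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d, value=%s%n", record.offset(), record.value());
            }
        }
    }
}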

poll

kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/KafkaConsumer.java

public ConsumerRecords<K, V> poll(long timeout) {
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

            // poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0)
                        client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

poll delegates to the pollOnce method:

/**
     * Do one round of polling. In addition to checking for new data, this does any needed offset commits
     * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
     * @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
     * @return The fetched records (may be empty)
     */
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.poll(time.milliseconds());

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();

        return fetcher.fetchedRecords();
    }

Note that the first thing pollOnce does is call coordinator.poll(time.milliseconds());

ConsumerCoordinator#poll

kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

/**
     * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
     * has joined the group (if it is using group management). This also handles periodic offset commits
     * if they are enabled.
     *
     * @param now current time in milliseconds
     */
    public void poll(long now) {
        invokeCompletedOffsetCommitCallbacks();

        if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
            ensureCoordinatorReady();
            now = time.milliseconds();
        }

        if (needRejoin()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription())
                client.ensureFreshMetadata();

            ensureActiveGroup();
            now = time.milliseconds();
        }

        pollHeartbeat(now);
        maybeAutoCommitOffsetsAsync(now);
    }

Three methods are worth a closer look here: ensureActiveGroup, pollHeartbeat, and maybeAutoCommitOffsetsAsync.
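
To make the connection to configuration explicit, the sketch below lists the client settings behind each of these three code paths. It is illustrative only; the values are examples, not recommendations.

import java.util.Properties;

public class CoordinatorPollConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        // ensureActiveGroup: how long the broker keeps a member in the group without heartbeats
        props.put("session.timeout.ms", "10000");
        // HeartbeatThread: how often the background thread sends heartbeats to the coordinator
        props.put("heartbeat.interval.ms", "3000");
        // maybeAutoCommitOffsetsAsync only does anything when this is true (the default)
        props.put("enable.auto.commit", "true");
        // minimum gap between two automatic commits
        props.put("auto.commit.interval.ms", "5000");
        // backoff applied to the next auto-commit attempt while the coordinator is unknown
        props.put("retry.backoff.ms", "100");
        return props;
    }
}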

pollHeartbeat

protected synchronized void pollHeartbeat(long now) {
        if (heartbeatThread != null) {
            if (heartbeatThread.hasFailed()) {
                // set the heartbeat thread to null and raise an exception. If the user catches it,
                // the next call to ensureActiveGroup() will spawn a new heartbeat thread.
                RuntimeException cause = heartbeatThread.failureCause();
                heartbeatThread = null;
                throw cause;
            }

            heartbeat.poll(now);
        }
    }

This is the poll-driven part of the heartbeat handling: it surfaces any failure from the heartbeat thread and records the poll time via heartbeat.poll(now).

Of course, relying on poll alone to drive heartbeats would not be reliable; that would be going back down the single-threaded road of earlier client versions. The newer client has a dedicated background thread to keep heartbeats going:
kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

private class HeartbeatThread extends KafkaThread {
        private boolean enabled = false;
        private boolean closed = false;
        private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);

        private HeartbeatThread() {
            super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? "" : " | " + groupId), true);
        }

        //......
}

    /**
     * Ensure that the group is active (i.e. joined and synced)
     */
    public void ensureActiveGroup() {
        // always ensure that the coordinator is ready because we may have been disconnected
        // when sending heartbeats and does not necessarily require us to rejoin the group.
        ensureCoordinatorReady();
        startHeartbeatThreadIfNeeded();
        joinGroupIfNeeded();
    }

    private synchronized void startHeartbeatThreadIfNeeded() {
        if (heartbeatThread == null) {
            heartbeatThread = new HeartbeatThread();
            heartbeatThread.start();
        }
    }        

The ensureActiveGroup method makes sure the heartbeatThread has been started (via startHeartbeatThreadIfNeeded).

maybeAutoCommitOffsetsAsync

private void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled) {
            if (coordinatorUnknown()) {
                this.nextAutoCommitDeadline = now + retryBackoffMs;
            } else if (now >= nextAutoCommitDeadline) {
                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
                doAutoCommitOffsetsAsync();
            }
        }
    }

private void doAutoCommitOffsetsAsync() {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
        log.debug("Sending asynchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);

        commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    log.warn("Auto-commit of offsets {} failed for group {}: {}", offsets, groupId,
                            exception.getMessage());
                    if (exception instanceof RetriableException)
                        nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
                } else {
                    log.debug("Completed auto-commit of offsets {} for group {}", offsets, groupId);
                }
            }
        });
    }    

Here the next auto-commit deadline is computed from the configured auto.commit.interval.ms, and it is pushed back by retry.backoff.ms while the coordinator is unknown or after a retriable commit failure.
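
Below is a standalone sketch (not the Kafka source) of how that deadline scheduling behaves, using illustrative values of 5000ms for auto.commit.interval.ms and 100ms for retry.backoff.ms.

public class AutoCommitDeadlineSketch {
    private static final long AUTO_COMMIT_INTERVAL_MS = 5000;
    private static final long RETRY_BACKOFF_MS = 100;

    private long nextAutoCommitDeadline = 0;

    // mirrors maybeAutoCommitOffsetsAsync: commit at most once per interval,
    // and push the deadline back by the retry backoff while the coordinator is unknown
    void maybeCommit(long now, boolean coordinatorUnknown) {
        if (coordinatorUnknown) {
            nextAutoCommitDeadline = now + RETRY_BACKOFF_MS;
        } else if (now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + AUTO_COMMIT_INTERVAL_MS;
            System.out.println("commit at t=" + now + ", next deadline t=" + nextAutoCommitDeadline);
        }
    }

    public static void main(String[] args) {
        AutoCommitDeadlineSketch sketch = new AutoCommitDeadlineSketch();
        // simulate the application calling poll() once per second for 12 seconds
        for (long now = 0; now <= 12_000; now += 1000)
            sketch.maybeCommit(now, false);
        // prints commits at t=0, t=5000 and t=10000, i.e. once per auto.commit.interval.ms
    }
}

Note that this asynchronous auto commit is driven entirely by coordinator.poll(), i.e. by the application's own calls to KafkaConsumer.poll(); if the application stops polling, no automatic commits are sent (the coordinator does attempt a synchronous commit when the consumer is closed or a rebalance begins).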
