KafkaConsumer.java

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    // Generator for client ids
    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    // Unique id of this consumer
    private final String clientId;
    // Handles the communication logic between the consumer and the server-side GroupCoordinator
    private final ConsumerCoordinator coordinator;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // Intercepts records before poll() returns them to the user, and intercepts commit responses from the broker
    private final ConsumerInterceptors<K, V> interceptors;
    // Responsible for network communication between the consumer and the brokers
    private final ConsumerNetworkClient client;
    // Tracks the consumer's subscription and consumption state
    private final SubscriptionState subscriptions;
    // Kafka cluster metadata
    private final Metadata metadata;
    // Id of the thread currently using this KafkaConsumer
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // Reentrancy count; together with currentThread it detects concurrent access from multiple threads
    private final AtomicInteger refcount = new AtomicInteger(0);

    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire(); // guard against concurrent access from multiple threads
        try {
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                // If any records were fetched, return them and exit poll
                if (!records.isEmpty()) {
                    // For efficiency, send the next FetchRequest before handing the records to the user,
                    // so that while the user thread processes these records, the next FetchRequest and
                    // FetchResponse are already travelling across the network in parallel
                    fetcher.sendFetches();

                    // This poll must not be interruptible:
                    // since the consumed position has already been updated (by the preceding pollOnce), we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            // The timeout expired without any records being received, so return an empty result
            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.ensureCoordinatorReady();

        if (subscriptions.partitionsAutoAssigned())
            // In auto-assignment mode, the rebalance must complete first
            coordinator.ensurePartitionAssignment();

        // If any subscribed TopicPartition has no position, the corresponding TopicPartitionState in SubscriptionState must be restored:
        // if the committed offset is missing, fetch it from the broker and then sync it into the position
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        long now = time.milliseconds();

        // Run scheduled tasks (HeartbeatTask and AutoCommitTask):
        // pull from the delayedTasks queue the tasks scheduled to run at or before the current time
        client.executeDelayedTasks(now);

        // Try to get already-parsed records from the completedFetches buffer
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        fetcher.sendFetches();
        client.poll(timeout, now);
        return fetcher.fetchedRecords();
    }

    private void acquire() {
        ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            // The consumer is held by another thread, so fail fast with an exception
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        refcount.incrementAndGet();
    }

    // Release the hold on the consumer
    private void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

    private void updateFetchPositions(Set<TopicPartition> partitions) {
        // Refresh the committed offsets stored in the local SubscriptionState.TopicPartitionState
        coordinator.refreshCommittedOffsetsIfNeeded();
        // If a committed offset is still missing, reset the offset according to the reset strategy
        fetcher.updateFetchPositions(partitions);
    }
}
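
As a reference point for how acquire()/release() and the pipelined sendFetches() look from the caller's side, here is a minimal single-threaded poll loop against the public KafkaConsumer API. The broker address, group id and topic name are placeholders; the consumer must stay on one thread, otherwise acquire() above throws ConcurrentModificationException.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // KafkaConsumer is not thread safe: poll() and commits must all run on this single thread
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                // Each poll() call runs pollOnce() until records arrive or the timeout expires
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
            }
        }
    }
}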

public class ConsumerNetworkClient implements Closeable {
    // NetworkClient
    private final KafkaClient client;
    // Set by threads other than the consumer thread to signal that the consumer thread should be woken up
    private final AtomicBoolean wakeup = new AtomicBoolean(false);
    // Queue of scheduled tasks, mainly the heartbeat task;
    // backed by a PriorityQueue
    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
    // Buffer of requests that have not been sent yet, keyed by destination node
    private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
    // Cluster metadata
    private final Metadata metadata;
    // How long a request may stay buffered in unsent before it expires
    private final long unsentExpiryMs;
    // Incremented each time the consumer enters a non-interruptible method and decremented on exit;
    // used to decide whether waking up the blocked selector is allowed
    private int wakeupDisabledCount = 0;

    // Wrap the outgoing request in a ClientRequest and store it in unsent
    public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
        long now = time.milliseconds();
        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
        RequestHeader header = client.nextRequestHeader(api);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        put(node, new ClientRequest(now, true, send, future));
        return future;
    }

    public void poll(RequestFuture<?> future) {
        while (!future.isDone()) // block synchronously until the future has a response
            poll(Long.MAX_VALUE);
    }

    // Poll without blocking and without allowing wakeups
    public void pollNoWakeup() {
        disableWakeups();
        try {
            poll(0, time.milliseconds(), false);
        } finally {
            enableWakeups();
        }
    }

    private void poll(long timeout, long now, boolean executeDelayedTasks) {
        // Try to send the requests buffered in unsent
        trySend(now);

        // Cap the timeout so that scheduled tasks are not delayed
        timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
        // Do the actual network I/O; if the wakeup flag is true, an exception is thrown to interrupt consumer.poll
        clientPoll(timeout, now);
        now = time.milliseconds();

        // If a connection has dropped, remove its requests from unsent and then invoke their callbacks
        checkDisconnects(now);

        // Run scheduled tasks
        if (executeDelayedTasks)
            delayedTasks.poll(now);

        // Connections to some nodes may have been established in the meantime, so try sending again
        trySend(now);

        // For requests in unsent that have expired, invoke their callbacks and remove them from unsent
        failExpiredRequests(now);
    }

    private boolean trySend(long now) {
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                // Check the connection and the number of in-flight requests for this node
                if (client.ready(node, now)) {
                    // Copy the request into the KafkaChannel's send field
                    client.send(request, now);
                    iterator.remove();
                    requestsSent = true;
                }
            }
        }
        return requestsSent;
    }

    // Block synchronously with the maximum timeout until the metadata has been updated
    public void awaitMetadataUpdate() {
        int version = this.metadata.requestUpdate();
        do {
            poll(Long.MAX_VALUE);
        } while (this.metadata.version() == version);
    }

    // Wait until all requests to the node in unsent and InFlightRequests have completed
    public void awaitPendingRequests(Node node) {
        while (pendingRequestCount(node) > 0)
            poll(retryBackoffMs);
    }

    public static class RequestFutureCompletionHandler extends RequestFuture<ClientResponse> implements RequestCompletionHandler {

        // whether the request has completed
        private boolean isDone = false;
        // the successful response value, mutually exclusive with exception
        private T value;
        // the exception that caused the failure
        private RuntimeException exception;
        // listeners notified of the outcome through their onSuccess and onFailure methods
        // (value, exception, listeners and compose() actually belong to the generic base class RequestFuture<T>; they are shown here for brevity)
        private List<RequestFutureListener<T>> listeners = new ArrayList<>();

        @Override
        public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {
                // these request fields are only used for a debug log in the full source
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header().apiKey());
                int correlation = send.header().correlationId();
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response);
            }
        }

        // Adapt this future's generic type T into S
        public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
            final RequestFuture<S> adapted = new RequestFuture<S>();
            addListener(new RequestFutureListener<T>() {
                @Override
                public void onSuccess(T value) {
                    adapter.onSuccess(value, adapted);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    adapter.onFailure(e, adapted);
                }
            });
            return adapted;
        }
    }
}
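
The delayedTasks field above is essentially a priority queue keyed by the scheduled execution time. A minimal sketch of that idea, using hypothetical class names rather than the actual Kafka implementation:

import java.util.PriorityQueue;

// Hypothetical sketch of a delayed-task queue ordered by scheduled execution time,
// mirroring how ConsumerNetworkClient.delayedTasks schedules the heartbeat/auto-commit tasks.
public class DelayedTaskQueueSketch {
    interface DelayedTask { void run(long now); }

    private static class Entry implements Comparable<Entry> {
        final long at;          // scheduled execution time in ms
        final DelayedTask task;
        Entry(long at, DelayedTask task) { this.at = at; this.task = task; }
        public int compareTo(Entry o) { return Long.compare(at, o.at); }
    }

    private final PriorityQueue<Entry> tasks = new PriorityQueue<>();

    public void schedule(DelayedTask task, long at) { tasks.add(new Entry(at, task)); }

    // Time until the earliest task is due; used to cap the network poll timeout
    public long nextTimeout(long now) {
        return tasks.isEmpty() ? Long.MAX_VALUE : Math.max(0, tasks.peek().at - now);
    }

    // Run every task whose scheduled time is not after 'now'
    public void poll(long now) {
        while (!tasks.isEmpty() && tasks.peek().at <= now)
            tasks.poll().task.run(now);
    }
}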

public class SubscriptionState {

    private enum SubscriptionType {
        NONE,
        AUTO_TOPICS,  // subscribed by explicit topic names
        AUTO_PATTERN,  // subscribed by a pattern matched against topic names
        USER_ASSIGNED // partitions assigned manually by the user
    };

    private SubscriptionType subscriptionType; // the subscription mode
    private Pattern subscribedPattern; // the pattern used in AUTO_PATTERN mode
    private final Set<String> subscription; // all subscribed topic names
    private final Set<String> groupSubscription; // on the group leader this holds every topic subscribed by the whole consumer group; followers only hold their own topics
    private final Set<TopicPartition> userAssignment; // TopicPartitions manually assigned to this consumer, mutually exclusive with subscription
    // Partitions currently assigned to this consumer,
    // with the consumption state (e.g. offsets) of each TopicPartition
    private final Map<TopicPartition, TopicPartitionState> assignment;
    // Whether a partition (re)assignment is needed; needsRejoin() is driven by this flag.
    // Set to true when the consumer subscribes to a topic
    private boolean needsPartitionAssignment;
    // Whether committed offsets need to be fetched; set to true when offsets are committed asynchronously or partitions are rebalanced
    private boolean needsFetchCommittedOffsets;
    private final OffsetResetStrategy defaultResetStrategy; // default offset reset strategy
    private ConsumerRebalanceListener listener; // listener notified of partition assignment changes

    private static class TopicPartitionState {
        private Long position; // the consumption position, i.e. the offset of the next message to fetch
        private OffsetAndMetadata committed; // the last committed offset
        private boolean paused; // whether the partition is paused
        private OffsetResetStrategy resetStrategy; // the offset reset strategy for this partition
    }

    // Called when the consumer subscribes to topics
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        setSubscriptionType(SubscriptionType.AUTO_TOPICS);
        // the default listener is NoOpConsumerRebalanceListener
        this.listener = listener;
        // Update subscription and groupSubscription, and set needsPartitionAssignment = true
        changeSubscription(topics);
    }
}
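
The listener stored here is the public ConsumerRebalanceListener, whose callbacks are invoked from onJoinPrepare() and onJoinComplete() (shown further below in ConsumerCoordinator). A minimal sketch of registering one; the topic name is a placeholder:

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListenerDemo {
    static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // invoked before the old assignment is given up,
                // a good place to commit offsets for the revoked partitions
                System.out.println("revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // invoked after the new assignment has been installed
                System.out.println("assigned: " + partitions);
            }
        });
    }
}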

public final class ConsumerCoordinator extends AbstractCoordinator {
    // The JoinGroupRequest sent by each consumer carries the PartitionAssignors it supports;
    // the GroupCoordinator picks one strategy supported by all consumers and tells the leader to use it for the assignment
    private final List<PartitionAssignor> assignors;
    private final Metadata metadata;
    private final SubscriptionState subscriptions;
    private final boolean autoCommitEnabled;
    private final AutoCommitTask autoCommitTask; // scheduled task that automatically commits offsets
    private final ConsumerInterceptors<?, ?> interceptors;
    private final boolean excludeInternalTopics; // whether internal topics are excluded
    // Used to detect changes in the number of partitions of the subscribed topics
    private MetadataSnapshot metadataSnapshot;

    // Constructor
    public ConsumerCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs,
                               List<PartitionAssignor> assignors, Metadata metadata, SubscriptionState subscriptions,
                               Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs,
                               OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled,
                               long autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean excludeInternalTopics) {
        super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, time, retryBackoffMs);
        this.metadata = metadata;

        this.metadata.requestUpdate();
        this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
        this.subscriptions = subscriptions;
        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
        this.autoCommitEnabled = autoCommitEnabled;
        this.assignors = assignors;
        // Register a listener for metadata updates
        addMetadataListener();

        if (autoCommitEnabled) {
            this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
            this.autoCommitTask.reschedule();
        } else {
            this.autoCommitTask = null;
        }

        this.interceptors = interceptors;
        this.excludeInternalTopics = excludeInternalTopics;
    }

    // Listener for metadata updates
    private void addMetadataListener() {
        this.metadata.addListener(new Metadata.Listener() {
            @Override
            public void onMetadataUpdate(Cluster cluster) {
                // Pattern-subscription mode
                if (subscriptions.hasPatternSubscription()) {

                    final List<String> topicsToSubscribe = new ArrayList<>();
                    for (String topic : cluster.topics())
                        if (filterTopic(topic)) // topic matches the subscription pattern
                            topicsToSubscribe.add(topic);
                    // Update the subscription, groupSubscription and assignment collections
                    subscriptions.changeSubscription(topicsToSubscribe);
                    // Update the topic set tracked by the metadata
                    metadata.setTopics(subscriptions.groupSubscription());
                } else if (!cluster.unauthorizedTopics().isEmpty()) {
                    throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
                }

                // Not manually assigned, i.e. AUTO_TOPICS or AUTO_PATTERN
                if (subscriptions.partitionsAutoAssigned()) {
                    MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
                    // metadataSnapshot is essentially a map of topic -> partition count;
                    // if the snapshots differ, the partitions have changed and a rebalance is needed
                    if (!snapshot.equals(metadataSnapshot)) {
                        metadataSnapshot = snapshot;
                        subscriptions.needReassignment();
                    }
                }

            }
        });
    }

    // Entry point for joining the group, i.e. the rebalance
    public void ensurePartitionAssignment() {
        // Only automatically assigned subscriptions need a rebalance
        if (subscriptions.partitionsAutoAssigned()) {
            if (subscriptions.hasPatternSubscription())
                // For pattern subscriptions, also check whether the metadata needs refreshing,
                // to avoid rebalancing against stale metadata
                client.ensureFreshMetadata();

            ensureActiveGroup();
        }
    }

    @Override
    protected void onJoinPrepare(int generation, String memberId) {
        // If auto-commit is enabled, commit offsets synchronously
        maybeAutoCommitOffsetsSync();

        ConsumerRebalanceListener listener = subscriptions.listener();
        Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
        // Invoke the callback for the partitions about to be revoked
        listener.onPartitionsRevoked(revoked);

        assignmentSnapshot = null;
        // Shrink groupSubscription back to this member's own subscription
        // and set needsPartitionAssignment = true
        subscriptions.needReassignment();
    }

    // 收到JoinGroupResponse后兜叨,被指定為join leader的consumer忆首,執(zhí)行分配策略
    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions) {
        // The default is the range assignment strategy
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);

        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, Subscription> subscriptions = new HashMap<>();
        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }

        // The leader must track the topics subscribed by the entire consumer group;
        // new topics may have been added, so the metadata needs updating
        this.subscriptions.groupSubscribe(allSubscribedTopics);
        metadata.setTopics(this.subscriptions.groupSubscription());

        client.ensureFreshMetadata();
        assignmentSnapshot = metadataSnapshot;

        // By default this invokes RangeAssignor
        // Assignment result: memberId -> assignment
        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }

        return groupAssignment;
    }

    // Handle the SyncGroupResponse
    @Override
    protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
        // The snapshot differs from the latest metadata, so the partitions must be reassigned
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
            subscriptions.needReassignment();
            return;
        }

        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

        // Mark that the latest committed offsets must be fetched from the broker
        subscriptions.needRefreshCommits();

        // Install the partitions assigned to this consumer
        subscriptions.assignFromSubscribed(assignment.partitions());

        // Restart the AutoCommitTask
        if (autoCommitEnabled)
            autoCommitTask.reschedule();

        // rebalance后執(zhí)行callback
        ConsumerRebalanceListener listener = subscriptions.listener();
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    }
}
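
The MetadataSnapshot comparison in the metadata listener boils down to comparing a topic -> partition-count map before and after an update. A minimal sketch of that check, using a hypothetical helper class rather than the real inner class:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: detect a partition-count change for the subscribed topics,
// which is what ConsumerCoordinator uses to decide that a rebalance is needed.
public class MetadataSnapshotSketch {
    private final Map<String, Integer> partitionsPerTopic;

    public MetadataSnapshotSketch(Map<String, Integer> partitionsPerTopic) {
        this.partitionsPerTopic = new HashMap<>(partitionsPerTopic);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MetadataSnapshotSketch
                && partitionsPerTopic.equals(((MetadataSnapshotSketch) o).partitionsPerTopic);
    }

    @Override
    public int hashCode() {
        return partitionsPerTopic.hashCode();
    }

    public static void main(String[] args) {
        Map<String, Integer> before = new HashMap<>();
        before.put("demo-topic", 3);
        Map<String, Integer> after = new HashMap<>(before);
        after.put("demo-topic", 6); // partitions were expanded

        boolean needsRebalance = !new MetadataSnapshotSketch(after).equals(new MetadataSnapshotSketch(before));
        System.out.println("needs rebalance: " + needsRebalance); // true
    }
}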

public class RangeAssignor extends AbstractPartitionAssignor {
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        // (excerpt: the construction of consumersPerTopic and of the empty per-member assignment map is omitted)
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();
            int numPartitionsForTopic = partitionsPerTopic.get(topic);

            // Sort the consumers so the assignment is deterministic
            Collections.sort(consumersForTopic);

            // Number of partitions each consumer gets for this topic
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // The first consumersWithExtraPartition consumers each take one extra partition from the remainder
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}
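
As a worked example of the range arithmetic above: with one topic of 7 partitions and 3 sorted consumers, 7 / 3 = 2 partitions go to everyone and the first 7 % 3 = 1 consumer takes one extra. A standalone sketch of just that start/length computation:

import java.util.Arrays;
import java.util.List;

// Worked example of the range-assignment arithmetic: 7 partitions, 3 consumers
// -> C0 gets p0..p2, C1 gets p3..p4, C2 gets p5..p6.
public class RangeMathDemo {
    public static void main(String[] args) {
        List<String> consumersForTopic = Arrays.asList("C0", "C1", "C2");
        int numPartitionsForTopic = 7;

        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();   // 2
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); // 1

        for (int i = 0; i < consumersForTopic.size(); i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            System.out.printf("%s -> partitions [%d, %d)%n", consumersForTopic.get(i), start, start + length);
        }
    }
}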

public abstract class AbstractPartitionAssignor implements PartitionAssignor {
    // Performs the partition assignment
    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, List<String>> topicSubscriptions = new HashMap<>();
        // This base class drops the userData and does not process it;
        // subclasses that need userData must implement PartitionAssignor's assign method themselves
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            List<String> topics = subscriptionEntry.getValue().topics();
            allSubscribedTopics.addAll(topics);
            topicSubscriptions.put(subscriptionEntry.getKey(), topics);
        }

        // Count the number of partitions of each subscribed topic
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : allSubscribedTopics) {
            Integer numPartitions = metadata.partitionCountForTopic(topic);
            if (numPartitions != null && numPartitions > 0)
                partitionsPerTopic.put(topic, numPartitions);
        }


        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);

        Map<String, Assignment> assignments = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
        return assignments;
    }
}

public interface PartitionAssignor {

    // Subscription information of each group member
    class Subscription {
        private final List<String> topics; // subscribed topics
        private final ByteBuffer userData;
    }

    class Assignment {
        private final List<TopicPartition> partitions; // the partitions assigned to this member
        private final ByteBuffer userData;
    }
}

public abstract class AbstractCoordinator implements Closeable {
    private final Heartbeat heartbeat; // helper that tracks heartbeat timing
    private final HeartbeatTask heartbeatTask; // scheduled task that sends heartbeats and handles the responses
    protected final String groupId; // consumer group id
    protected final ConsumerNetworkClient client; // network communication

    private boolean needsJoinPrepare = true; // whether the preparation work before sending a JoinGroupRequest still needs to run
    // One of the conditions for re-sending a JoinGroupRequest:
    // set to true when a response error code indicates that a rebalance is needed,
    // and set back to false once a JoinGroupResponse has been handled;
    // defaults to true
    private boolean rejoinNeeded = true;

    protected Node coordinator; // the node hosting the server-side GroupCoordinator
    protected String memberId; // the unique member id assigned to this consumer by the GroupCoordinator
    protected int generation; // effectively a version number of each rebalance, used to reject stale rebalance traffic

    private class HeartbeatTask implements DelayedTask {

        // Called from outside to (re)trigger the heartbeat task
        public void reset() {
            long now = time.milliseconds();
            heartbeat.resetSessionTimeout(now);
            client.unschedule(this);

            if (!requestInFlight)
                client.schedule(this, now);
        }

        @Override
        public void run(final long now) {
            // Skip the heartbeat if no generation has been established yet,
            // a rejoin (rebalance) is pending,
            // or the server-side GroupCoordinator is unknown
            if (generation < 0 || needRejoin() || coordinatorUnknown()) {
                return;
            }

            if (heartbeat.sessionTimeoutExpired(now)) {
                // The session timed out, so assume the server-side GroupCoordinator is dead
                coordinatorDead();
                return;
            }

            if (!heartbeat.shouldHeartbeat(now)) {
                // The next heartbeat interval has not arrived yet, so no request is sent (this run ends here);
                // instead, schedule a new task at the next trigger time
                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
            } else {
                heartbeat.sentHeartbeat(now);
                requestInFlight = true; // prevent duplicate heartbeat requests

                // Send the heartbeat request
                RequestFuture<Void> future = sendHeartbeatRequest();
                // Register a callback for the response
                future.addListener(new RequestFutureListener<Void>() {
                    // Schedule the next run once this request completes
                    @Override
                    public void onSuccess(Void value) {
                        requestInFlight = false;
                        long now = time.milliseconds();
                        heartbeat.receiveHeartbeat(now);
                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        requestInFlight = false;
                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                    }
                });
            }
        }
    }

    // Handles the heartbeat response
    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {

        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            Errors error = Errors.forCode(heartbeatResponse.errorCode());
            if (error == Errors.NONE) {
                // Success: propagate the completion
                future.complete(null);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                coordinatorDead();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                // The coordinator has already started a rebalance,
                // so set the flag that triggers a new JoinGroupRequest
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

    protected void coordinatorDead() {
        if (this.coordinator != null) {
            // Clear this coordinator's requests from the unsent buffer and invoke their callbacks with the exception
            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
            // Setting the coordinator to null forces a new GroupCoordinator lookup
            this.coordinator = null;
        }
    }

    // Look up the server-side GroupCoordinator;
    // the subsequent rebalances and offset fetches/commits all go through the GroupCoordinator
    public void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupCoordinatorRequest();
            // block until the future has a response
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                coordinatorDead();
                // sleep to space out the reconnect attempts
                time.sleep(retryBackoffMs);
            }
        }
    }

    // Handle the response to the GroupCoordinator lookup:
    // set the coordinator field, connect to the coordinator and restart the heartbeat task
    private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {

        if (!coordinatorUnknown()) {
            // The consumer has already found a GroupCoordinator, so this response is ignored
            future.complete(null);
        } else {
            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
            Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
            if (error == Errors.NONE) {
                this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                        groupCoordinatorResponse.node().host(),
                        groupCoordinatorResponse.node().port());

                client.tryConnect(coordinator);

                if (generation > 0)
                    heartbeatTask.reset();
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(error);
            }
        }
    }

    public void ensureActiveGroup() {
        if (!needRejoin())
            return;

        if (needsJoinPrepare) {
            onJoinPrepare(generation, memberId);
            needsJoinPrepare = false;
        }

        while (needRejoin()) {
            // Make sure the server-side GroupCoordinator is known and connected
            ensureCoordinatorReady();

            // If there are still requests pending for the GroupCoordinator, block until they get responses,
            // i.e. until the unsent and InFlightRequests queues for that node are empty
            if (client.pendingRequestCount(this.coordinator) > 0) {
                client.awaitPendingRequests(this.coordinator);
                continue;
            }

            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
            future.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset();
                }

                @Override
                public void onFailure(RuntimeException e) {
                }
            });
            client.poll(future);

            if (future.failed()) {
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;
                // sleep to space out the retries
                time.sleep(retryBackoffMs);
            }
        }
    }

    // Build the JoinGroupRequest and queue it in the unsent buffer
    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        JoinGroupRequest request = new JoinGroupRequest( groupId, this.sessionTimeoutMs,
                this.memberId, protocolType(), metadata());

        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                .compose(new JoinGroupResponseHandler());
    }

    // Handles the JoinGroupResponse
    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                // Update the local state
                AbstractCoordinator.this.memberId = joinResponse.memberId();
                AbstractCoordinator.this.generation = joinResponse.generationId();
                AbstractCoordinator.this.rejoinNeeded = false;
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                // Check whether this member was chosen as the group leader
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                // retriable error, the caller will retry
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                coordinatorDead();
                future.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                    || error == Errors.INVALID_SESSION_TIMEOUT
                    || error == Errors.INVALID_GROUP_ID) {
                log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
                future.raise(error);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }

        // Logic executed by the group leader
        private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
            // Perform the partition assignment
            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                    joinResponse.members());
            // Send the SyncGroupRequest carrying the assignment result
            SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
            return sendSyncGroupRequest(request);
        }
    }
}
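
sendJoinGroupRequest() above relies on the compose() adapter shown earlier on RequestFuture to turn a RequestFuture<ClientResponse> into a RequestFuture<ByteBuffer>. A minimal generic sketch of that listener-plus-adapter pattern, using a hypothetical mini-future rather than the Kafka classes:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical mini-future illustrating the listener + compose() adapter pattern
// that RequestFuture uses to convert a ClientResponse future into a typed result future.
public class MiniFuture<T> {
    public interface Listener<T> { void onSuccess(T value); void onFailure(RuntimeException e); }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private boolean done;
    private T value;
    private RuntimeException exception;

    public synchronized void addListener(Listener<T> listener) {
        if (done) fire(listener); else listeners.add(listener);
    }

    public synchronized void complete(T value) {
        this.value = value; this.done = true;
        for (Listener<T> l : listeners) fire(l);
    }

    public synchronized void raise(RuntimeException e) {
        this.exception = e; this.done = true;
        for (Listener<T> l : listeners) fire(l);
    }

    private void fire(Listener<T> l) {
        if (exception != null) l.onFailure(exception); else l.onSuccess(value);
    }

    // compose(): adapt this future's type T into S, analogous to RequestFuture.compose(RequestFutureAdapter)
    public <S> MiniFuture<S> compose(Function<T, S> adapter) {
        MiniFuture<S> adapted = new MiniFuture<>();
        addListener(new Listener<T>() {
            public void onSuccess(T v) { adapted.complete(adapter.apply(v)); }
            public void onFailure(RuntimeException e) { adapted.raise(e); }
        });
        return adapted;
    }
}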

public final class Heartbeat {
    private final long timeout; // session timeout
    private final long interval; // interval between two heartbeats, 3000 ms by default

    private long lastHeartbeatSend; // time the last heartbeat request was sent
    private long lastHeartbeatReceive; // time the last heartbeat response was received
    private long lastSessionReset; // time the session was last reset

    // Time remaining until the next heartbeat should be sent
    public long timeToNextHeartbeat(long now) {
        long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);

        if (timeSinceLastHeartbeat > interval)
            return 0;
        else
            return interval - timeSinceLastHeartbeat;
    }

    // Whether the session has timed out
    public boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
    }
}
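
A quick worked example of the two timing checks above, using interval = 3000 ms and timeout = 30000 ms; the timestamps are purely illustrative:

// Worked example of the heartbeat timing math (illustrative numbers only).
public class HeartbeatMathDemo {
    public static void main(String[] args) {
        long interval = 3000;   // heartbeat interval
        long timeout = 30000;   // session timeout

        long lastSessionReset = 0;
        long lastHeartbeatSend = 10_000;
        long lastHeartbeatReceive = 10_050;

        long now = 11_000;
        long sinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset); // 1000
        long timeToNext = sinceLastHeartbeat > interval ? 0 : interval - sinceLastHeartbeat;
        System.out.println("timeToNextHeartbeat = " + timeToNext + " ms"); // 2000: wait before sending

        long later = 41_000;
        boolean expired = later - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout; // 30950 > 30000
        System.out.println("sessionTimeoutExpired = " + expired); // true -> coordinatorDead()
    }
}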

// Fetches messages from the brokers
public class Fetcher<K, V> {
    // network communication
    private final ConsumerNetworkClient client;
    // 服務端收到FetchRequest后并不是立即響應,當返回的消息積累到至少minBytes個字節(jié)才響應, 提高網(wǎng)絡有效負載
    // 服務端根據(jù)請求里的minBytes決定啥時候返回?
    private final int minBytes;
    // 等待FetchResponse的最長時長睬愤,服務端根據(jù)此事件決定何時響應
    private final int maxWaitMs;
    // Maximum number of bytes fetched per partition in one request
    private final int fetchSize;
    // Maximum number of records returned by one poll
    private final int maxPollRecords;
    private final Metadata metadata; // Kafka cluster metadata
    private final SubscriptionState subscriptions; // consumption state of each TopicPartition
    // Each FetchResponse is first turned into CompletedFetch objects and buffered in this queue; the messages are parsed later
    private final List<CompletedFetch> completedFetches;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // 保存了CompletedFetch解析后的結果集合
    // CompletedFetch里的消息只是ByteBuffer,經(jīng)過offset+size確定長度,然后反序列拿到實際結構消息
    // 泛型的ConsumerRecord集合存放在nextInLineRecords里戴涝,也是最終KafkaConsumer返回的結果
    private PartitionRecords<K, V> nextInLineRecords = null;

    private static class PartitionRecords<K, V> {
        private long fetchOffset; // offset of the first message in records
        private TopicPartition partition;
        private List<ConsumerRecord<K, V>> records; // the parsed records
    }

    // Build the FetchRequests
    private Map<Node, FetchRequest> createFetchRequests() {
        Cluster cluster = metadata.fetch();
        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
        // Iterate over the partitions that can be fetched
        for (TopicPartition partition : fetchablePartitions()) {
            // node hosting the leader replica
            Node node = cluster.leaderFor(partition);
            if (node == null) {
                metadata.requestUpdate();
            }
            // 如果leader副本對應的unsent或InFlightRequest隊列里還有請求為發(fā)送
            // 就不對這個node請求fetch消息
            else if (this.client.pendingRequestCount(node) == 0) {
                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                if (fetch == null) {
                    fetch = new HashMap<>();
                    fetchable.put(node, fetch);
                }

                // Look up each partition's position in SubscriptionState
                // and wrap it into a PartitionData object
                long position = this.subscriptions.position(partition);
                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
            }
        }

        Map<Node, FetchRequest> requests = new HashMap<>();
        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            // Combine all TopicPartitions destined for the same node into a single FetchRequest
            Node node = entry.getKey();
            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
            requests.put(node, fetch);
        }
        return requests;
    }

    private Set<TopicPartition> fetchablePartitions() {
        // Start with the partitions that are currently fetchable for this consumer
        Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
        // Partitions present in the two buffers below already have fetched data, so they are not fetched again
        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
            fetchable.remove(nextInLineRecords.partition);
        for (CompletedFetch completedFetch : completedFetches)
            fetchable.remove(completedFetch.partition);
        return fetchable;
    }

    // Send the fetch requests
    public void sendFetches() {
        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
            final FetchRequest request = fetchEntry.getValue();
            client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            FetchResponse response = new FetchResponse(resp.responseBody());
                            // Buffer the received FetchResponse data into the completedFetches queue
                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                TopicPartition partition = entry.getKey();
                                long fetchOffset = request.fetchData().get(partition).offset;
                                FetchResponse.PartitionData fetchData = entry.getValue();
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
                            }
                        }
                    });
        }
    }

    // 處理completedFetches隊列里的緩存
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {
            // A rebalance is needed, so return nothing
            return Collections.emptyMap();
        } else {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
            int recordsRemaining = maxPollRecords; // maximum number of records to return in one call
            Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();

            while (recordsRemaining > 0) {
                // 先把completedFetches轉移到nextInLineRecords里
                if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
                    if (!completedFetchesIterator.hasNext())
                        break;

                    CompletedFetch completion = completedFetchesIterator.next();
                    completedFetchesIterator.remove();
                    nextInLineRecords = parseFetchedData(completion);
                } else {
                    recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
                }
            }

            return drained;
        }
    }

    // Parse a CompletedFetch
    private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
        TopicPartition tp = completedFetch.partition;
        FetchResponse.PartitionData partition = completedFetch.partitionData;
        long fetchOffset = completedFetch.fetchedOffset;
        int bytes = 0;
        int recordsCount = 0;
        PartitionRecords<K, V> parsedRecords = null;

        if (!subscriptions.isFetchable(tp)) {
            // the partition is no longer fetchable (e.g. paused or reassigned), so the fetched data is dropped
        } else if (partition.errorCode == Errors.NONE.code()) {
            Long position = subscriptions.position(tp);

            ByteBuffer buffer = partition.recordSet;
            MemoryRecords records = MemoryRecords.readableRecords(buffer);
            List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
            boolean skippedRecords = false;
            for (LogEntry logEntry : records) {
                if (logEntry.offset() >= position) {
                    parsed.add(parseRecord(tp, logEntry));
                    bytes += logEntry.size();
                } else {
                    // Skip messages that precede the locally recorded position
                    skippedRecords = true;
                }
            }

            recordsCount = parsed.size();

            if (!parsed.isEmpty()) {
                // 解析后的Record集合封裝成PartitionRecords對象
                parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
            }
        }

        return parsedRecords;
    }

    private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained, PartitionRecords<K, V> partitionRecords, int maxRecords) {
        long position = subscriptions.position(partitionRecords.partition);
        if (partitionRecords.fetchOffset == position) {
            // Take at most maxRecords records
            List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
            long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; // offset of the last message plus one, i.e. the next position

            // Append the records to the drained map
            List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
            if (records == null) {
                records = partRecords;
                drained.put(partitionRecords.partition, records);
            } else {
                records.addAll(partRecords);
            }

            // Advance the position of this TopicPartition
            subscriptions.position(partitionRecords.partition, nextOffset);
            return partRecords.size();
        }

        return 0;
    }

    // Reset or restore the position of the given TopicPartitions
    public void updateFetchPositions(Set<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
                continue; // skip partitions that already have a valid position

            if (subscriptions.isOffsetResetNeeded(tp)) {
                // Reset the position using the requested strategy
                resetOffset(tp);
            } else if (subscriptions.committed(tp) == null) {
                // The consumer has no committed offset, so reset using the default strategy
                subscriptions.needOffsetReset(tp);
                resetOffset(tp);
            } else {
                // Otherwise set the position to the committed offset
                long committed = subscriptions.committed(tp).offset();
                subscriptions.seek(tp, committed);
            }
        }
    }

    private void resetOffset(TopicPartition partition) {
        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
        final long timestamp;
        if (strategy == OffsetResetStrategy.EARLIEST)
            timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
        else if (strategy == OffsetResetStrategy.LATEST)
            timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
        else
            throw new NoOffsetForPartitionException(partition);

        // Send a ListOffsetRequest with this timestamp to the partition's leader node
        long offset = listOffset(partition, timestamp);

        if (subscriptions.isAssigned(partition))
            this.subscriptions.seek(partition, offset); // update the position
    }
}
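
fetchedRecords() together with append() implements a "drain at most maxPollRecords across the buffered fetches, keep the leftovers for the next call" loop. A minimal standalone sketch of that draining pattern, with plain lists standing in for the Fetcher's CompletedFetch/PartitionRecords types:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the draining pattern used by Fetcher.fetchedRecords():
// return at most maxPollRecords records per call, keeping any leftover batch buffered.
public class DrainDemo {
    public static List<Integer> drain(Deque<List<Integer>> completedBatches, int maxPollRecords) {
        List<Integer> drained = new ArrayList<>();
        int remaining = maxPollRecords;
        while (remaining > 0 && !completedBatches.isEmpty()) {
            List<Integer> batch = completedBatches.pollFirst();
            int take = Math.min(remaining, batch.size());
            drained.addAll(batch.subList(0, take));
            if (take < batch.size())
                completedBatches.addFirst(new ArrayList<>(batch.subList(take, batch.size()))); // leftover stays buffered
            remaining -= take;
        }
        return drained;
    }

    public static void main(String[] args) {
        Deque<List<Integer>> batches = new ArrayDeque<>();
        batches.add(Arrays.asList(1, 2, 3, 4));
        batches.add(Arrays.asList(5, 6, 7));
        System.out.println(drain(batches, 5)); // [1, 2, 3, 4, 5]
        System.out.println(drain(batches, 5)); // [6, 7]
    }
}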