/**
 * Excerpt of the Kafka consumer front end: holds the consumer's collaborators and implements the
 * {@link #poll(long)} loop.
 *
 * <p>The consumer is not meant to be shared between threads. {@link #acquire()} /
 * {@link #release()} record the owning thread and throw
 * {@link ConcurrentModificationException} when a different thread enters while it is owned.
 *
 * <p>Excerpt: members such as {@code time}, {@code fetcher}, {@code NO_CURRENT_THREAD} and
 * {@code ensureNotClosed()} are declared in the omitted part of the class.
 */
public class KafkaConsumer<K, V> implements Consumer<K, V> {
    // Generator for default client ids.
    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    // Unique id of this consumer.
    private final String clientId;
    // Drives the communication between this consumer and the server-side group coordinator.
    private final ConsumerCoordinator coordinator;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // Intercepts records before poll() hands them to the user (and commit responses); may be null.
    private final ConsumerInterceptors<K, V> interceptors;
    // Network layer between this consumer and the brokers.
    private final ConsumerNetworkClient client;
    // Consumption state of the consumer (subscription, assignment, positions).
    private final SubscriptionState subscriptions;
    // Kafka cluster metadata.
    private final Metadata metadata;
    // Id of the thread currently using this consumer, or NO_CURRENT_THREAD.
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // Re-entrance count of the owning thread; together with currentThread it detects
    // concurrent multi-threaded use of the consumer.
    private final AtomicInteger refcount = new AtomicInteger(0);

    /**
     * Fetches records for the subscribed/assigned partitions, waiting up to {@code timeout} ms.
     *
     * @param timeout maximum time in milliseconds to wait for records; must not be negative
     * @return the fetched records (run through the interceptors when configured), or an empty
     *     set when nothing arrived before the timeout elapsed
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws ConcurrentModificationException if another thread is currently using this consumer
     */
    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire(); // guard against multi-threaded use
        try {
            // Fail fast instead of letting a negative timeout reach the network layer, which
            // only happens when no records are already buffered.
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // Pipelining: send the next FetchRequests before handing these records to the
                    // user, so the fetch round-trip overlaps with the user's processing.
                    fetcher.sendFetches();
                    // Since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched
                    // records, hence the non-interruptible poll.
                    client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            // Nothing was received before the timeout elapsed.
            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

    /**
     * Performs one round of coordinator discovery, rebalance, position recovery and fetching.
     *
     * @param timeout maximum time in ms the network poll may block
     * @return records that are ready to be returned; possibly empty
     */
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.ensureCoordinatorReady();

        // With automatic partition assignment, the rebalance must be complete first.
        if (subscriptions.partitionsAutoAssigned())
            coordinator.ensurePartitionAssignment();

        // Some assigned partitions may have no position yet; restore them from the committed
        // offsets (fetched from the server if needed) or reset them.
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        long now = time.milliseconds();

        // Run delayed tasks due at or before 'now' (e.g. heartbeat and auto-commit tasks).
        client.executeDelayedTasks(now);

        // Serve already-buffered fetch responses first.
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        fetcher.sendFetches();
        client.poll(timeout, now);
        return fetcher.fetchedRecords();
    }

    /**
     * Marks the current thread as the owner of this consumer (re-entrant for the same thread).
     *
     * @throws ConcurrentModificationException if another thread currently owns the consumer
     */
    private void acquire() {
        ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            // Owned by another thread.
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        refcount.incrementAndGet();
    }

    /** Releases one level of ownership; the last release clears the owning thread. */
    private void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

    /**
     * Restores positions for {@code partitions}: refreshes committed offsets if needed, then lets
     * the fetcher seek to them (or reset when no committed offset exists).
     */
    private void updateFetchPositions(Set<TopicPartition> partitions) {
        coordinator.refreshCommittedOffsetsIfNeeded();
        fetcher.updateFetchPositions(partitions);
    }
}
/**
 * Consumer-side network layer on top of {@link KafkaClient}: buffers outgoing requests per node
 * in {@code unsent}, drives the underlying client, runs delayed tasks and completes
 * {@link RequestFuture}s with responses.
 *
 * <p>Excerpt: members such as {@code time}, {@code retryBackoffMs}, {@code put},
 * {@code poll(long)}, {@code clientPoll}, {@code checkDisconnects}, {@code failExpiredRequests},
 * {@code disableWakeups}/{@code enableWakeups} and {@code pendingRequestCount} are declared in the
 * omitted part of the class.
 */
public class ConsumerNetworkClient implements Closeable {
    // Underlying network client (NetworkClient).
    private final KafkaClient client;
    // Set from a thread other than the consumer's to request that the consumer thread be woken up.
    private final AtomicBoolean wakeup = new AtomicBoolean(false);
    // Delayed (scheduled) tasks, mainly the heartbeat task.
    // NOTE(review): described as PriorityQueue-backed; implementation not shown here.
    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
    // Per-node buffer of requests not yet handed to the underlying client.
    private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
    // Cluster metadata.
    private final Metadata metadata;
    // How long a request may stay buffered in unsent before it expires.
    private final long unsentExpiryMs;
    // Incremented on entering and decremented on leaving a non-interruptible section; presumably
    // wakeups of the blocking poll are suppressed while it is positive.
    private int wakeupDisabledCount = 0;

    /**
     * Wraps {@code request} into a ClientRequest and buffers it (per the original note, in
     * {@code unsent}); it is actually transmitted by a later poll.
     *
     * @return future completed when the response (or a disconnect) arrives
     */
    public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
        long now = time.milliseconds();
        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
        RequestHeader header = client.nextRequestHeader(api);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        put(node, new ClientRequest(now, true, send, future));
        return future;
    }

    /** Blocks, polling with an unbounded timeout, until {@code future} is done. */
    public void poll(RequestFuture<?> future) {
        while (!future.isDone()) // keep polling until the future has a result
            poll(Long.MAX_VALUE);
    }

    /** Polls once without blocking (timeout 0), without running delayed tasks and with wakeups disabled. */
    public void pollNoWakeup() {
        disableWakeups();
        try {
            poll(0, time.milliseconds(), false);
        } finally {
            enableWakeups();
        }
    }

    /**
     * One poll cycle: send what can be sent, do network I/O, handle disconnects, optionally run
     * delayed tasks, retry sending and expire old buffered requests.
     */
    private void poll(long timeout, long now, boolean executeDelayedTasks) {
        // Hand buffered requests to the client for every ready node.
        trySend(now);

        // Never block past the next delayed task's due time.
        timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
        // Actual network I/O. NOTE(review): per the original note, this throws to interrupt
        // consumer.poll when the wakeup flag is set.
        clientPoll(timeout, now);
        now = time.milliseconds();

        // For disconnected nodes, remove their requests from unsent and invoke their callbacks.
        checkDisconnects(now);

        if (executeDelayedTasks)
            delayedTasks.poll(now);

        // New connections may have been established during the I/O above; try sending again.
        trySend(now);

        // Fail (invoke callbacks for) and remove requests that stayed in unsent too long.
        failExpiredRequests(now);
    }

    /**
     * Hands every buffered request whose node is ready to the underlying client.
     *
     * @return true if at least one request was handed over
     */
    private boolean trySend(long now) {
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                // Per the original note: checks the connection and the in-flight request count.
                if (client.ready(node, now)) {
                    // Per the original note: copies the request into the KafkaChannel's send.
                    client.send(request, now);
                    iterator.remove();
                    requestsSent = true;
                }
            }
        }
        return requestsSent;
    }

    /** Requests a metadata update and blocks (unbounded poll timeout) until the metadata version changes. */
    public void awaitMetadataUpdate() {
        int version = this.metadata.requestUpdate();
        do {
            poll(Long.MAX_VALUE);
        } while (this.metadata.version() == version);
    }

    /** Blocks until no requests for {@code node} are pending (presumably both buffered and in flight). */
    public void awaitPendingRequests(Node node) {
        while (pendingRequestCount(node) > 0)
            poll(retryBackoffMs);
    }

    /**
     * Future that doubles as the completion handler of a ClientRequest.
     *
     * <p>NOTE(review): the fields and {@code compose} below use a type parameter {@code T} that
     * this class does not declare; they appear to be copied from {@code RequestFuture<T>} by the
     * excerpt's author.
     */
    public static class RequestFutureCompletionHandler extends RequestFuture<ClientResponse> implements RequestCompletionHandler {
        // Whether the request has completed.
        private boolean isDone = false;
        // Successful result; mutually exclusive with exception.
        private T value;
        // Failure cause.
        private RuntimeException exception;
        // Listeners notified via onSuccess / onFailure.
        private List<RequestFutureListener<T>> listeners = new ArrayList<>();

        /** Fails the future with DisconnectException on disconnect, otherwise completes it with the response. */
        @Override
        public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {
                // NOTE(review): request/send/api/correlation are computed but never used
                // (likely remnants of a removed log statement).
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header().apiKey());
                int correlation = send.header().correlationId();
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response);
            }
        }

        /** Adapts this future of type T into a future of type S via {@code adapter}. */
        public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
            final RequestFuture<S> adapted = new RequestFuture<S>();
            addListener(new RequestFutureListener<T>() {
                @Override
                public void onSuccess(T value) {
                    adapter.onSuccess(value, adapted);
                }
                @Override
                public void onFailure(RuntimeException e) {
                    adapter.onFailure(e, adapted);
                }
            });
            return adapted;
        }
    }
}
/**
 * Subscription bookkeeping of a consumer: what it subscribes to (topic list, pattern or manual
 * assignment) and, for every assigned partition, how far it has consumed.
 *
 * <p>Excerpt: the constructor and most methods (e.g. {@code setSubscriptionType},
 * {@code changeSubscription}) are omitted.
 */
public class SubscriptionState {

    /** The ways a consumer can obtain its partitions. */
    private enum SubscriptionType {
        NONE,
        AUTO_TOPICS,   // subscribed to an explicit list of topic names
        AUTO_PATTERN,  // subscribed to all topics matching a regular expression
        USER_ASSIGNED  // partitions chosen manually by the user
    }

    /** Consumption state kept for one assigned partition. */
    private static class TopicPartitionState {
        private Long position;                  // current consumed position
        private OffsetAndMetadata committed;    // last committed offset
        private boolean paused;                 // whether fetching is paused
        private OffsetResetStrategy resetStrategy; // pending offset-reset strategy
    }

    // --- subscription ---
    private SubscriptionType subscriptionType;  // active subscription mode
    private Pattern subscribedPattern;          // regex used by AUTO_PATTERN
    private final Set<String> subscription;     // topics this consumer subscribed to
    // Topics of the whole group. Only the group leader tracks every member's topics here;
    // followers only hold their own.
    private final Set<String> groupSubscription;
    // Manually assigned partitions; mutually exclusive with subscription.
    private final Set<TopicPartition> userAssignment;
    private ConsumerRebalanceListener listener; // notified on partition (re)assignment

    // --- assignment ---
    // Partitions currently assigned to this consumer, with their consumption state.
    private final Map<TopicPartition, TopicPartitionState> assignment;
    // Whether a partition (re)assignment is needed; set when topics are subscribed.
    private boolean needsPartitionAssignment;
    // Whether committed offsets must be fetched again (e.g. after an async commit or a rebalance).
    private boolean needsFetchCommittedOffsets;
    private final OffsetResetStrategy defaultResetStrategy; // default offset-reset strategy

    /**
     * Subscribes to an explicit list of topics.
     *
     * @param topics topic names to subscribe to
     * @param listener rebalance callback to remember for this subscription
     */
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        setSubscriptionType(SubscriptionType.AUTO_TOPICS);
        this.listener = listener;
        // Updates subscription/groupSubscription and flags a partition assignment as needed.
        changeSubscription(topics);
    }
}
/**
 * Consumer-specific group coordination: joins the consumer group, computes the partition
 * assignment when this member is the group leader, applies the assignment it receives and
 * schedules offset auto-commit.
 *
 * <p>Excerpt: members such as {@code assignmentSnapshot}, {@code defaultOffsetCommitCallback},
 * {@code lookupAssignor}, {@code filterTopic} and {@code maybeAutoCommitOffsetsSync} are declared
 * in the omitted part of the class.
 */
public final class ConsumerCoordinator extends AbstractCoordinator {
    // Assignors supported by this consumer. They are advertised in the JoinGroupRequest; the
    // server-side GroupCoordinator chooses one of the members' strategies and the group leader
    // uses it to compute the assignment.
    private final List<PartitionAssignor> assignors;
    private final Metadata metadata;
    private final SubscriptionState subscriptions;
    private final boolean autoCommitEnabled;
    private final AutoCommitTask autoCommitTask; // periodic offset auto-commit task; null when auto-commit is disabled
    private final ConsumerInterceptors<?, ?> interceptors;
    private final boolean excludeInternalTopics; // whether internal topics are excluded (presumably from pattern matching)
    // Snapshot of the subscribed topics' partition counts; used to detect partition-count changes.
    private MetadataSnapshot metadataSnapshot;

    /** Creates the coordinator, requests a metadata update and registers the metadata listener. */
    public ConsumerCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs,
                               List<PartitionAssignor> assignors, Metadata metadata, SubscriptionState subscriptions,
                               Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs,
                               OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled,
                               long autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean excludeInternalTopics) {
        super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, time, retryBackoffMs);
        this.metadata = metadata;
        this.metadata.requestUpdate();
        this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
        this.subscriptions = subscriptions;
        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
        this.autoCommitEnabled = autoCommitEnabled;
        this.assignors = assignors;
        // React to metadata updates (pattern re-matching, partition-count changes).
        addMetadataListener();
        if (autoCommitEnabled) {
            this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
            this.autoCommitTask.reschedule();
        } else {
            this.autoCommitTask = null;
        }
        this.interceptors = interceptors;
        this.excludeInternalTopics = excludeInternalTopics;
    }

    /**
     * Registers a metadata listener that re-evaluates pattern subscriptions and flags a
     * reassignment when the subscribed topics' partition counts change.
     */
    private void addMetadataListener() {
        this.metadata.addListener(new Metadata.Listener() {
            @Override
            public void onMetadataUpdate(Cluster cluster) {
                // Pattern subscription: recompute the matching topics.
                if (subscriptions.hasPatternSubscription()) {
                    final List<String> topicsToSubscribe = new ArrayList<>();
                    for (String topic : cluster.topics())
                        if (filterTopic(topic)) // regex match
                            topicsToSubscribe.add(topic);
                    // Per the original note: updates subscription, groupSubscription and assignment.
                    subscriptions.changeSubscription(topicsToSubscribe);
                    // Track the group's topics in the metadata.
                    metadata.setTopics(subscriptions.groupSubscription());
                } else if (!cluster.unauthorizedTopics().isEmpty()) {
                    throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
                }
                // Automatic assignment (AUTO_TOPICS or AUTO_PATTERN).
                if (subscriptions.partitionsAutoAssigned()) {
                    MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
                    // Per the original note, the snapshot maps topic -> partition count; a
                    // difference means partitions changed and a rebalance is needed.
                    if (!snapshot.equals(metadataSnapshot)) {
                        metadataSnapshot = snapshot;
                        subscriptions.needReassignment();
                    }
                }
            }
        });
    }

    /** Entry point of JoinGroup, i.e. the rebalance; only used with automatic partition assignment. */
    public void ensurePartitionAssignment() {
        if (subscriptions.partitionsAutoAssigned()) {
            if (subscriptions.hasPatternSubscription())
                // With pattern subscription, refresh metadata first so the rebalance does not
                // use stale topic metadata.
                client.ensureFreshMetadata();
            ensureActiveGroup();
        }
    }

    /**
     * Runs before (re)joining: synchronously auto-commits offsets if enabled, notifies the
     * rebalance listener of revoked partitions and flags that a reassignment is needed.
     */
    @Override
    protected void onJoinPrepare(int generation, String memberId) {
        maybeAutoCommitOffsetsSync();
        ConsumerRebalanceListener listener = subscriptions.listener();
        Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
        // NOTE(review): an exception thrown by the user callback propagates out of onJoinPrepare.
        listener.onPartitionsRevoked(revoked);
        assignmentSnapshot = null;
        // Per the original note: shrinks groupSubscription back to this member's subscription
        // and sets needsPartitionAssignment = true.
        subscriptions.needReassignment();
    }

    /**
     * Called on the member chosen as group leader after the JoinGroupResponse: collects all
     * members' subscriptions, refreshes metadata for the group's topics and runs the selected
     * assignor.
     *
     * @return member id -> serialized assignment
     */
    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions) {
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, Subscription> subscriptions = new HashMap<>();
        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }
        // The leader tracks the topics of the whole group; new topics may have appeared, so
        // metadata is refreshed before assigning.
        this.subscriptions.groupSubscribe(allSubscribedTopics);
        metadata.setTopics(this.subscriptions.groupSubscription());
        client.ensureFreshMetadata();
        assignmentSnapshot = metadataSnapshot;
        // Result: member id -> assignment.
        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }
        return groupAssignment;
    }

    /** Applies the assignment received in the SyncGroupResponse. */
    @Override
    protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
        // Metadata changed since the leader computed the assignment: request another rebalance.
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
            subscriptions.needReassignment();
            return;
        }
        // NOTE(review): 'assignor' is looked up but not used in this excerpt.
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
        // Committed offsets must be fetched from the server again.
        subscriptions.needRefreshCommits();
        // Install the partitions assigned to this consumer.
        subscriptions.assignFromSubscribed(assignment.partitions());
        // Restart the auto-commit task.
        if (autoCommitEnabled)
            autoCommitTask.reschedule();
        // Notify the rebalance listener of the new assignment.
        ConsumerRebalanceListener listener = subscriptions.listener();
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    }
}
/**
 * Range assignor: for every topic, sorts the subscribed members by id and hands each one a
 * contiguous range of that topic's partitions. When the partitions do not divide evenly, the
 * first {@code numPartitions % numMembers} members (in sorted order) get one extra partition.
 *
 * <p>Example: a topic with 7 partitions and members [c0, c1, c2] yields
 * c0 -> p0..p2, c1 -> p3..p4, c2 -> p5..p6.
 */
public class RangeAssignor extends AbstractPartitionAssignor {

    /**
     * Computes the range assignment.
     *
     * @param partitionsPerTopic partition count per topic; subscribed topics missing from this
     *     map (no partition metadata) are skipped
     * @param subscriptions member id -> topics subscribed by that member
     * @return member id -> assigned partitions; every member in {@code subscriptions} has an
     *     entry, possibly empty
     */
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        Map<String, List<String>> consumersPerTopic = membersByTopic(subscriptions);

        // Pre-create an entry per member so members without partitions still get an assignment
        // and the addAll below never hits a missing key.
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue; // no partition metadata for this topic

            // Deterministic order so every leader computes the same ranges.
            Collections.sort(consumersForTopic);

            // Base number of partitions per member ...
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // ... and how many leading members receive one extra partition.
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }

    /** Inverts member id -> topics into topic -> member ids subscribed to it. */
    private static Map<String, List<String>> membersByTopic(Map<String, List<String>> subscriptions) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
            for (String topic : subscriptionEntry.getValue()) {
                List<String> members = result.get(topic);
                if (members == null) {
                    members = new ArrayList<>();
                    result.put(topic, members);
                }
                members.add(subscriptionEntry.getKey());
            }
        }
        return result;
    }
}
/**
 * Base class for assignors that only need per-topic partition counts and member topic lists.
 * It resolves partition counts from cluster metadata and delegates the distribution to
 * {@link #assign(Map, Map)}. The {@code userData} of subscriptions is ignored; assignors that
 * need it should implement {@link PartitionAssignor#assign(Cluster, Map)} themselves.
 */
public abstract class AbstractPartitionAssignor implements PartitionAssignor {

    /**
     * Distributes partitions among members.
     *
     * @param partitionsPerTopic partition count of every subscribed topic that has metadata
     * @param subscriptions member id -> topics subscribed by that member
     * @return member id -> assigned partitions
     */
    public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                             Map<String, List<String>> subscriptions);

    /**
     * Computes the group assignment from cluster metadata and the members' subscriptions.
     *
     * @param metadata current cluster metadata, used to look up partition counts
     * @param subscriptions member id -> subscription
     * @return member id -> assignment (without user data)
     */
    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, List<String>> topicSubscriptions = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            List<String> topics = subscriptionEntry.getValue().topics();
            allSubscribedTopics.addAll(topics);
            topicSubscriptions.put(subscriptionEntry.getKey(), topics);
        }

        // Partition count per subscribed topic; topics without (positive) metadata are left out.
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : allSubscribedTopics) {
            Integer numPartitions = metadata.partitionCountForTopic(topic);
            if (numPartitions != null && numPartitions > 0)
                partitionsPerTopic.put(topic, numPartitions);
        }

        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);

        Map<String, Assignment> assignments = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
        return assignments;
    }

    /**
     * Lists partitions {@code 0 .. numPartitions - 1} of {@code topic}, in order.
     *
     * @param topic topic name
     * @param numPartitions number of partitions; 0 yields an empty list
     * @return a new mutable list of partitions
     */
    protected static List<TopicPartition> partitions(String topic, int numPartitions) {
        List<TopicPartition> partitions = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++)
            partitions.add(new TopicPartition(topic, i));
        return partitions;
    }
}
/**
 * Strategy used by the group leader to distribute partitions among the group's members.
 * (Excerpt: other members of the interface are omitted.)
 */
public interface PartitionAssignor {

    /**
     * Computes the assignment for the whole group.
     *
     * @param metadata current cluster metadata
     * @param subscriptions member id -> that member's subscription
     * @return member id -> that member's assignment
     */
    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

    /** Subscription information of a single group member. */
    class Subscription {
        private final List<String> topics;  // subscribed topics
        private final ByteBuffer userData;  // opaque assignor-specific data

        public Subscription(List<String> topics, ByteBuffer userData) {
            this.topics = topics;
            this.userData = userData;
        }

        /** Creates a subscription with empty user data. */
        public Subscription(List<String> topics) {
            this(topics, ByteBuffer.wrap(new byte[0]));
        }

        public List<String> topics() {
            return topics;
        }

        public ByteBuffer userData() {
            return userData;
        }
    }

    /** Assignment result for a single group member. */
    class Assignment {
        private final List<TopicPartition> partitions;  // partitions assigned to the member
        private final ByteBuffer userData;              // opaque assignor-specific data

        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
            this.partitions = partitions;
            this.userData = userData;
        }

        /** Creates an assignment with empty user data. */
        public Assignment(List<TopicPartition> partitions) {
            this(partitions, ByteBuffer.wrap(new byte[0]));
        }

        public List<TopicPartition> partitions() {
            return partitions;
        }

        public ByteBuffer userData() {
            return userData;
        }
    }
}
/**
 * Base class for clients taking part in Kafka's group-membership protocol. It locates the
 * server-side GroupCoordinator, runs the JoinGroup/SyncGroup rebalance and keeps the membership
 * alive with heartbeats. Subclasses provide the protocol-specific hooks ({@code onJoinPrepare},
 * {@code performAssignment}, {@code onJoinComplete}, ...).
 *
 * <p>Excerpt: members such as {@code time}, {@code retryBackoffMs}, {@code sessionTimeoutMs},
 * {@code protocol}, {@code requestInFlight}, {@code log}, {@code needRejoin()},
 * {@code coordinatorUnknown()} and the request senders are declared in the omitted part.
 */
public abstract class AbstractCoordinator implements Closeable {
    private final Heartbeat heartbeat;            // heartbeat timing bookkeeping
    private final HeartbeatTask heartbeatTask;    // delayed task that sends heartbeats and handles their responses
    protected final String groupId;               // consumer group id
    protected final ConsumerNetworkClient client; // network layer
    // Whether onJoinPrepare must run before the next JoinGroupRequest.
    private boolean needsJoinPrepare = true;
    // One of the conditions for (re)sending a JoinGroupRequest: set when a response error code
    // requires a rebalance, cleared by a successful JoinGroupResponse. Starts as true.
    private boolean rejoinNeeded = true;
    protected Node coordinator;  // node hosting the GroupCoordinator; cleared by coordinatorDead()
    protected String memberId;   // member id assigned by the GroupCoordinator
    protected int generation;    // rebalance generation; lets stale rebalance requests be rejected

    /** Periodic heartbeat sender, run from the network client's delayed-task queue. */
    private class HeartbeatTask implements DelayedTask {

        /** Resets the session timer and reschedules this task now, unless a heartbeat is in flight. */
        public void reset() {
            long now = time.milliseconds();
            heartbeat.resetSessionTimeout(now);
            client.unschedule(this);
            if (!requestInFlight)
                client.schedule(this, now);
        }

        @Override
        public void run(final long now) {
            // Stop (without rescheduling) when there is no generation yet, a rejoin is pending,
            // or the coordinator is unknown; reset() schedules the task again later.
            if (generation < 0 || needRejoin() || coordinatorUnknown()) {
                return;
            }

            if (heartbeat.sessionTimeoutExpired(now)) {
                // No heartbeat response within the session timeout: treat the coordinator as dead.
                coordinatorDead();
                return;
            }

            if (!heartbeat.shouldHeartbeat(now)) {
                // Not due yet: reschedule this task for when the next heartbeat is due.
                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
            } else {
                heartbeat.sentHeartbeat(now);
                requestInFlight = true; // keeps reset() from scheduling a duplicate heartbeat

                RequestFuture<Void> future = sendHeartbeatRequest();
                future.addListener(new RequestFutureListener<Void>() {
                    @Override
                    public void onSuccess(Void value) {
                        requestInFlight = false;
                        long now = time.milliseconds();
                        heartbeat.receiveHeartbeat(now);
                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        requestInFlight = false;
                        // Retry after the back-off.
                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                    }
                });
            }
        }
    }

    /** Maps a HeartbeatResponse error code onto the future and the coordinator's state. */
    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            Errors error = Errors.forCode(heartbeatResponse.errorCode());
            if (error == Errors.NONE) {
                future.complete(null);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                coordinatorDead();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                // The coordinator has started a rebalance: rejoin the group.
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                // Separate branch: a future may only be completed once, so this must not share
                // the UNKNOWN_MEMBER_ID branch.
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

    /** Forgets the current coordinator and fails the requests still buffered for it. */
    protected void coordinatorDead() {
        if (this.coordinator != null) {
            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
            // A new GroupCoordinator will have to be looked up.
            this.coordinator = null;
        }
    }

    /**
     * Blocks until the GroupCoordinator is known (rebalance and offset commits go through it).
     *
     * @throws RuntimeException the lookup's failure when it is not retriable
     */
    public void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupCoordinatorRequest();
            client.poll(future); // block until the lookup completes

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // Found a coordinator but could not connect: forget it and retry after the back-off.
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
        }
    }

    /**
     * Handles the GroupCoordinator lookup response: records the coordinator, starts connecting
     * to it and restarts heartbeats for an existing generation.
     */
    private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
        if (!coordinatorUnknown()) {
            // A coordinator is already known; ignore this response.
            future.complete(null);
        } else {
            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
            Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
            if (error == Errors.NONE) {
                // NOTE(review): the id is remapped to MAX_VALUE - id, presumably so the
                // coordinator gets its own connection separate from the regular one to the same
                // broker — confirm.
                this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                        groupCoordinatorResponse.node().host(),
                        groupCoordinatorResponse.node().port());
                client.tryConnect(coordinator);

                if (generation > 0)
                    heartbeatTask.reset();
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(error);
            }
        }
    }

    /**
     * Ensures this member is part of an active group generation, (re)joining while needed.
     *
     * @throws RuntimeException a non-retriable join failure
     */
    public void ensureActiveGroup() {
        if (!needRejoin())
            return;

        if (needsJoinPrepare) {
            onJoinPrepare(generation, memberId);
            needsJoinPrepare = false;
        }

        while (needRejoin()) {
            ensureCoordinatorReady();

            // Wait until no requests to the coordinator are pending before joining.
            if (client.pendingRequestCount(this.coordinator) > 0) {
                client.awaitPendingRequests(this.coordinator);
                continue;
            }

            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
            future.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset();
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // Failures are handled below, after client.poll(future) returns.
                }
            });
            client.poll(future);

            if (future.failed()) {
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue; // rejoin immediately
                else if (!future.isRetriable())
                    throw exception;
                time.sleep(retryBackoffMs); // back off before retrying
            }
        }
    }

    /** Sends a JoinGroupRequest to the coordinator; the future yields the SyncGroup assignment. */
    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        JoinGroupRequest request = new JoinGroupRequest(groupId, this.sessionTimeoutMs,
                this.memberId, protocolType(), metadata());
        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                .compose(new JoinGroupResponseHandler());
    }

    /** Handles the JoinGroupResponse and continues with the leader or follower SyncGroup step. */
    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                // Record the membership granted by the coordinator.
                AbstractCoordinator.this.memberId = joinResponse.memberId();
                AbstractCoordinator.this.generation = joinResponse.generationId();
                AbstractCoordinator.this.rejoinNeeded = false;
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                // Retriable.
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                coordinatorDead();
                future.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                    || error == Errors.INVALID_SESSION_TIMEOUT
                    || error == Errors.INVALID_GROUP_ID) {
                log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
                future.raise(error);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }

        /** Leader path: compute the group assignment and send it in a SyncGroupRequest. */
        private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                    joinResponse.members());
            SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
            return sendSyncGroupRequest(request);
        }
    }
}
/**
 * Timing bookkeeping for group-membership heartbeats: decides when the next heartbeat is due and
 * whether the session has expired. All times are caller-supplied milliseconds.
 */
public final class Heartbeat {
    private final long timeout;          // session timeout (ms)
    private final long interval;         // desired interval between two heartbeats (ms)
    private long lastHeartbeatSend;      // time the last heartbeat request was sent
    private long lastHeartbeatReceive;   // time the last heartbeat response was received
    private long lastSessionReset;       // time the session timer was last reset

    /**
     * @param timeout session timeout in ms
     * @param interval heartbeat interval in ms; must be lower than {@code timeout}
     * @param now current time in ms; starts the session timer
     * @throws IllegalArgumentException if {@code interval >= timeout}
     */
    public Heartbeat(long timeout, long interval, long now) {
        if (interval >= timeout)
            throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
        this.timeout = timeout;
        this.interval = interval;
        this.lastSessionReset = now;
    }

    /** Records that a heartbeat request was sent at {@code now}. */
    public void sentHeartbeat(long now) {
        this.lastHeartbeatSend = now;
    }

    /** Records that a heartbeat response was received at {@code now}. */
    public void receiveHeartbeat(long now) {
        this.lastHeartbeatReceive = now;
    }

    /** Restarts the session timer at {@code now}. */
    public void resetSessionTimeout(long now) {
        this.lastSessionReset = now;
    }

    /** Returns true when the next heartbeat is due. */
    public boolean shouldHeartbeat(long now) {
        return timeToNextHeartbeat(now) == 0;
    }

    /**
     * Returns the time in ms until the next heartbeat is due, measured from the later of the
     * last send and the last session reset; 0 if it is already due.
     */
    public long timeToNextHeartbeat(long now) {
        long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
        if (timeSinceLastHeartbeat > interval)
            return 0;
        else
            return interval - timeSinceLastHeartbeat;
    }

    /**
     * Returns true when more than {@code timeout} ms have passed since the later of the last
     * session reset and the last received heartbeat response.
     */
    public boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
    }
}
/**
 * Fetches records from brokers for the consumer. Fetch responses are queued as raw
 * {@code CompletedFetch} entries and only parsed/deserialized lazily in {@link #fetchedRecords()}.
 *
 * <p>Excerpt: members such as {@code metricAggregator}, {@code parseRecord}, {@code listOffset},
 * {@code CompletedFetch} and the PartitionRecords constructor/{@code isEmpty}/{@code take} are
 * declared in the omitted part of the class.
 */
public class Fetcher<K, V> {
    // Network layer.
    private final ConsumerNetworkClient client;
    // Minimum bytes the broker should accumulate before answering a FetchRequest. Sent with each
    // request so the broker can batch small responses and improve payload efficiency.
    private final int minBytes;
    // Maximum time (ms) the broker may wait (for minBytes) before answering.
    private final int maxWaitMs;
    // Maximum bytes requested per partition in one fetch.
    private final int fetchSize;
    // Maximum number of records returned by one fetchedRecords() call.
    private final int maxPollRecords;
    private final Metadata metadata;               // Kafka cluster metadata
    private final SubscriptionState subscriptions; // per-partition consumption state
    // Raw fetch responses waiting to be parsed.
    private final List<CompletedFetch> completedFetches;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // Parsed (deserialized) records of the partition currently being drained. Completed fetches
    // are parsed one at a time into this holder, and records are handed out from it.
    private PartitionRecords<K, V> nextInLineRecords = null;

    /** Parsed records of one partition from one fetch. */
    private static class PartitionRecords<K, V> {
        private long fetchOffset;                    // offset these records are expected to start at
        private TopicPartition partition;
        private List<ConsumerRecord<K, V>> records;  // parsed records
    }

    /** Builds one FetchRequest per leader node covering all fetchable partitions led by it. */
    private Map<Node, FetchRequest> createFetchRequests() {
        Cluster cluster = metadata.fetch();
        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
        for (TopicPartition partition : fetchablePartitions()) {
            Node node = cluster.leaderFor(partition);
            if (node == null) {
                // Leader unknown: ask for a metadata refresh.
                metadata.requestUpdate();
            } else if (this.client.pendingRequestCount(node) == 0) {
                // Only fetch from a node with no pending (buffered or in-flight) requests.
                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                if (fetch == null) {
                    fetch = new HashMap<>();
                    fetchable.put(node, fetch);
                }
                // Fetch from the partition's current position.
                long position = this.subscriptions.position(partition);
                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
            }
        }

        // One FetchRequest per node, covering all of that node's partitions.
        Map<Node, FetchRequest> requests = new HashMap<>();
        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            Node node = entry.getKey();
            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
            requests.put(node, fetch);
        }
        return requests;
    }

    /** Fetchable partitions minus those that still have buffered data waiting to be returned. */
    private Set<TopicPartition> fetchablePartitions() {
        Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
            fetchable.remove(nextInLineRecords.partition);
        for (CompletedFetch completedFetch : completedFetches)
            fetchable.remove(completedFetch.partition);
        return fetchable;
    }

    /** Sends FetchRequests; each response is queued per partition in completedFetches. */
    public void sendFetches() {
        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
            final FetchRequest request = fetchEntry.getValue();
            client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            FetchResponse response = new FetchResponse(resp.responseBody());
                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                TopicPartition partition = entry.getKey();
                                long fetchOffset = request.fetchData().get(partition).offset;
                                FetchResponse.PartitionData fetchData = entry.getValue();
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
                            }
                        }
                    });
        }
    }

    /**
     * Drains up to {@code maxPollRecords} records from the buffered fetch responses and advances
     * the consumed positions accordingly.
     *
     * @return partition -> records; empty while a partition reassignment is pending
     */
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {
            // A rebalance is pending: return nothing.
            return Collections.emptyMap();
        }

        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        int recordsRemaining = maxPollRecords;
        Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();
        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
                // Parse the next raw response into nextInLineRecords.
                if (!completedFetchesIterator.hasNext())
                    break;
                CompletedFetch completion = completedFetchesIterator.next();
                completedFetchesIterator.remove();
                nextInLineRecords = parseFetchedData(completion);
            } else {
                int appended = append(drained, nextInLineRecords, recordsRemaining);
                if (appended == 0) {
                    // The buffered records no longer start at the consumed position (e.g. the
                    // position was changed by a seek), so they are stale. Drop them: keeping
                    // them would make this loop spin forever, since recordsRemaining would
                    // never decrease.
                    nextInLineRecords = null;
                } else {
                    recordsRemaining -= appended;
                }
            }
        }
        return drained;
    }

    /**
     * Deserializes one completed fetch.
     *
     * @return the records at or after the partition's current position, or null when the
     *     partition is no longer fetchable, the response carries an error, or nothing is left
     */
    private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
        TopicPartition tp = completedFetch.partition;
        FetchResponse.PartitionData partition = completedFetch.partitionData;
        long fetchOffset = completedFetch.fetchedOffset;

        if (!subscriptions.isFetchable(tp)) {
            // The partition stopped being fetchable after the request was sent; drop the data.
            return null;
        }
        if (partition.errorCode != Errors.NONE.code()) {
            // NOTE(review): partition-level errors are dropped silently in this excerpt.
            return null;
        }

        Long position = subscriptions.position(tp);
        ByteBuffer buffer = partition.recordSet;
        MemoryRecords records = MemoryRecords.readableRecords(buffer);
        List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
        for (LogEntry logEntry : records) {
            // Skip records before the locally consumed position.
            if (logEntry.offset() >= position)
                parsed.add(parseRecord(tp, logEntry));
        }

        if (parsed.isEmpty())
            return null;
        return new PartitionRecords<>(fetchOffset, tp, parsed);
    }

    /**
     * Moves up to {@code maxRecords} records from {@code partitionRecords} into {@code drained}
     * and advances the partition's position past the last moved record.
     *
     * @return number of records moved; 0 when the records do not start at the current position
     */
    private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained, PartitionRecords<K, V> partitionRecords, int maxRecords) {
        long position = subscriptions.position(partitionRecords.partition);
        if (partitionRecords.fetchOffset == position) {
            List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
            // Next position: one past the offset of the last returned record.
            long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;

            List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
            if (records == null) {
                records = partRecords;
                drained.put(partitionRecords.partition, records);
            } else {
                records.addAll(partRecords);
            }

            subscriptions.position(partitionRecords.partition, nextOffset);
            return partRecords.size();
        }
        return 0;
    }

    /**
     * Sets a position for every assigned, not-yet-fetchable partition in {@code partitions}:
     * resets it when a reset is pending or no committed offset exists, otherwise seeks to the
     * committed offset.
     */
    public void updateFetchPositions(Set<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            // Skip unassigned partitions and those that are already fetchable.
            if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
                continue;

            if (subscriptions.isOffsetResetNeeded(tp)) {
                // Reset using the requested strategy.
                resetOffset(tp);
            } else if (subscriptions.committed(tp) == null) {
                // Nothing committed: reset using the default strategy.
                subscriptions.needOffsetReset(tp);
                resetOffset(tp);
            } else {
                // Resume from the committed offset.
                long committed = subscriptions.committed(tp).offset();
                subscriptions.seek(tp, committed);
            }
        }
    }

    /**
     * Resets the position of {@code partition} using its reset strategy.
     *
     * @throws NoOffsetForPartitionException if the strategy is neither EARLIEST nor LATEST
     */
    private void resetOffset(TopicPartition partition) {
        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
        final long timestamp;
        if (strategy == OffsetResetStrategy.EARLIEST)
            timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
        else if (strategy == OffsetResetStrategy.LATEST)
            timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
        else
            throw new NoOffsetForPartitionException(partition);

        // Ask the partition leader for the offset matching the timestamp.
        long offset = listOffset(partition, timestamp);

        // The partition may have been revoked while waiting.
        if (subscriptions.isAssigned(partition))
            this.subscriptions.seek(partition, offset);
    }
}
KafkaConsumer.java
©著作權歸作者所有,轉載或內容合作請聯系作者
- 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事。” “怎么了?”我有些...
- 文/不壞的土叔 我叫張陵,是天一觀的道長。 經常有香客問我,道長,這世上最難降的妖魔是什么? 我笑而不...
- 正文 為了忘掉前任,我火速辦了婚禮,結果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好,可當我...
- 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭髮上,一...
- 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側響起,我...
- 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響,放射性物質發生泄漏。R本人自食惡果不足惜,卻給世界環境...
- 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春,著一層夾襖步出監牢的瞬間,已是汗流浹背。 一陣腳步聲響...
推薦閱讀更多精彩內容
- org.springframework.beans.factory.UnsatisfiedDependencyEx...
- 1.有很多人是通過自學,比如買些專業的java書籍、或者通過網上免費視頻學習、進入相應的java論壇等等。ja...
- 我不知道你會不會這樣,在某個多愁善感的夜里,有些思緒會時不時地冒出來,你會為那個有些遙遠卻會到來的日子擔憂,但好在...
- 在今年6月份的全球開發者大會上,蘋果公布了ARKit——供開發者為iPhone和iPad開發增強現實應用的軟件開發...