- Every broker in the cluster holds the same complete copy of the cluster-wide metadata;
- The metadata covers every partition of every topic: leader, leader_epoch, controller_epoch, isr, replicas, and so on;
- A Kafka client can fetch the metadata it needs from any broker.
Where the metadata is stored: the MetadataCache component
- The KafkaServer object on each broker creates a MetadataCache component, which caches all of the metadata:
val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
- Source file: core/src/main/scala/kafka/server/MetadataCache.scala
- All metadata is stored in a map keyed by topic; each value is itself a map whose key is the partition id and whose value is a PartitionStateInfo:
private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
  new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
- PartitionStateInfo: holds a LeaderIsrAndControllerEpoch and the collection of replicas; the readFrom method below builds a PartitionStateInfo object from a received buffer:
def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
  val controllerEpoch = buffer.getInt
  val leader = buffer.getInt
  val leaderEpoch = buffer.getInt
  val isrSize = buffer.getInt
  val isr = for(i <- 0 until isrSize) yield buffer.getInt
  val zkVersion = buffer.getInt
  val replicationFactor = buffer.getInt
  val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
  PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch),
                     replicas.toSet)
}
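To make the wire layout that readFrom consumes easier to see, here is a minimal round-trip sketch. It is not Kafka's code: `PartitionState`, `writeTo`, and `roundTrip` are hypothetical names introduced for illustration, and the nested Kafka case classes are flattened into one. The only thing taken from the source is the field order (controller epoch, leader, leader epoch, ISR size and entries, zkVersion, replication factor and replica ids).

```scala
import java.nio.ByteBuffer

object PartitionStateBufferSketch {
  // Hypothetical, flattened stand-in for PartitionStateInfo.
  final case class PartitionState(
      controllerEpoch: Int,
      leader: Int,
      leaderEpoch: Int,
      isr: List[Int],
      zkVersion: Int,
      replicas: Set[Int])

  // Writes the fields in the same order that readFrom consumes them.
  def writeTo(buffer: ByteBuffer, s: PartitionState): Unit = {
    buffer.putInt(s.controllerEpoch)
    buffer.putInt(s.leader)
    buffer.putInt(s.leaderEpoch)
    buffer.putInt(s.isr.size)                  // length prefix for the ISR list
    s.isr.foreach(id => buffer.putInt(id))
    buffer.putInt(s.zkVersion)
    buffer.putInt(s.replicas.size)             // replication factor
    s.replicas.foreach(id => buffer.putInt(id))
  }

  // Mirrors the readFrom shown above, using the simplified type.
  def readFrom(buffer: ByteBuffer): PartitionState = {
    val controllerEpoch = buffer.getInt()
    val leader = buffer.getInt()
    val leaderEpoch = buffer.getInt()
    val isrSize = buffer.getInt()
    val isr = (0 until isrSize).map(_ => buffer.getInt()).toList
    val zkVersion = buffer.getInt()
    val replicationFactor = buffer.getInt()
    val replicas = (0 until replicationFactor).map(_ => buffer.getInt()).toSet
    PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas)
  }

  // Serializes into a right-sized buffer, then reads it back.
  def roundTrip(s: PartitionState): PartitionState = {
    // Six fixed Int fields plus one Int per ISR entry and per replica.
    val buffer = ByteBuffer.allocate(4 * (6 + s.isr.size + s.replicas.size))
    writeTo(buffer, s)
    buffer.flip()
    readFrom(buffer)
  }

  def main(args: Array[String]): Unit = {
    val state = PartitionState(3, 1, 7, List(1, 2), 12, Set(1, 2, 3))
    println(roundTrip(state) == state)
  }
}
```

Both variable-length fields are length-prefixed, which is why readFrom has to read `isrSize` and `replicationFactor` before it can loop over the entries.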
- MetadataCache also keeps the information about live brokers that is pushed to it:
private var aliveBrokers: Map[Int, Broker] = Map()
How MetadataCache obtains and updates metadata
- The KafkaApis object handles UpdateMetadataRequest:
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
- handleUpdateMetadataRequest:
val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
authorizeClusterAction(request)
replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, updateMetadataResponse)))
As shown, this calls ReplicaManager.maybeUpdateMetadataCache, which in turn calls MetadataCache.updateCache.
- MetadataCache.updateCache:
aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
  if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
    removePartitionInfo(tp.topic, tp.partition)
  } else {
    addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
  }
}
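It does three things:
- Updates aliveBrokers;
- If a partition's leader is LeaderAndIsr.LeaderDuringDelete (that is, the leader is no longer valid because the partition is being deleted), calls removePartitionInfo(tp.topic, tp.partition);
- Otherwise adds or updates the information for that partition of the topic via addOrUpdatePartitionInfo(tp.topic, tp.partition, info), which saves the metadata into the cache object of MetadataCache.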
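The three steps can be sketched against the nested topic-to-partition map as follows. This is a simplified illustration, not the Kafka implementation: `TopicAndPartition`, `PartitionState`, `Cache`, `partitionsOf`, and `brokerIds` are hypothetical stand-ins, brokers are reduced to plain strings, and the sentinel value assigned to `LeaderDuringDelete` is an assumption.

```scala
import scala.collection.mutable

object MetadataCacheSketch {
  // Hypothetical, simplified stand-ins for the Kafka types.
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class PartitionState(leader: Int, isr: List[Int], replicas: Set[Int])

  // Sentinel leader id meaning "partition is being deleted" (value assumed here).
  val LeaderDuringDelete: Int = -2

  class Cache {
    // topic -> (partition id -> state), same shape as the cache field shown earlier.
    private val cache = mutable.Map.empty[String, mutable.Map[Int, PartitionState]]
    private var aliveBrokers: Map[Int, String] = Map.empty

    private def addOrUpdate(topic: String, partition: Int, state: PartitionState): Unit =
      cache.getOrElseUpdate(topic, mutable.Map.empty[Int, PartitionState])
        .update(partition, state)

    private def remove(topic: String, partition: Int): Unit =
      cache.get(topic).foreach { partitions =>
        partitions.remove(partition)
        // Sketch choice: drop the topic entry once its last partition is gone.
        if (partitions.isEmpty) cache.remove(topic)
      }

    def updateCache(brokers: Map[Int, String],
                    states: Map[TopicAndPartition, PartitionState]): Unit = {
      aliveBrokers = brokers                        // step 1: replace the live-broker view
      states.foreach { case (tp, state) =>
        if (state.leader == LeaderDuringDelete)
          remove(tp.topic, tp.partition)            // step 2: partition is being deleted
        else
          addOrUpdate(tp.topic, tp.partition, state) // step 3: insert or overwrite
      }
    }

    def partitionsOf(topic: String): Set[Int] = cache.get(topic) match {
      case Some(partitions) => partitions.keySet.toSet
      case None             => Set.empty
    }

    def brokerIds: Set[Int] = aliveBrokers.keySet
  }

  def main(args: Array[String]): Unit = {
    val c = new Cache
    c.updateCache(Map(1 -> "host1:9092"),
      Map(TopicAndPartition("t", 0) -> PartitionState(1, List(1), Set(1))))
    println(c.partitionsOf("t"))
  }
}
```

Note that the broker map is replaced wholesale on every request, while partition entries are updated one at a time, so a request only needs to carry the partitions that changed.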
Where the metadata comes from
- This question is really asking who sends UpdateMetadataRequest, and when;
- The sender is the KafkaController;
- Broker changes, topic creation, partition additions, and similar events all require a metadata update.
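Who uses the metadata
- Mainly clients: a client reads a topic's partition information from the metadata and learns which broker is the leader, and only then can it produce and consume messages;
- The KafkaApis object handles MetadataRequest: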
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
- KafkaApis.handleTopicMetadataRequest:
val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
//if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
val topics = if (metadataRequest.topics.isEmpty) {
  val topicResponses = metadataCache.getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
  topicResponses.map(_.topic).filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))).toSet
} else {
  metadataRequest.topics.toSet
}
//when topics is empty this will be a duplicate authorization check but given this should just be a cache lookup, it should not matter.
var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
if (!authorizedTopics.isEmpty) {
  val topicResponses = metadataCache.getTopicMetadata(authorizedTopics, request.securityProtocol)
  if (config.autoCreateTopicsEnable && topicResponses.size != authorizedTopics.size) {
    val nonExistentTopics: Set[String] = topics -- topicResponses.map(_.topic).toSet
    authorizer.foreach {
      az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
        authorizedTopics --= nonExistentTopics
        unauthorizedTopics ++= nonExistentTopics
      }
    }
  }
}
val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))
val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
val brokers = metadataCache.getAliveBrokers
val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata ++ unauthorizedTopicMetaData, metadataRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
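It looks like a lot of code, but the logic is fairly simple:
- First determine which topics' metadata is needed; if the request does not name any topic, fetch the metadata of all current topics;
- Authorization check: split the topics into authorizedTopics and unauthorizedTopics;
- Fetch the metadata for authorizedTopics. The getTopicMetadata method is the key here: it first picks out the topics that do not exist yet and, if auto.create.topics.enable=true, calls AdminUtils.createTopic to create them. At that point the PartitionStateInfo of such a topic is still empty, but the topic is still returned to the client as part of the Metadata Response.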
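The decision logic in these steps can be isolated from the request plumbing. The sketch below is a pure-function approximation under stated assumptions: `Plan`, `plan`, `canDescribe`, and `canCreate` are hypothetical names, the authorizer is reduced to a predicate and a boolean, and no topic is actually created; the function only reports which topics would be authorized, rejected, or auto-created.

```scala
object TopicMetadataFlowSketch {
  // Hypothetical result type: which topics get metadata, which get an
  // authorization error, and which would be auto-created first.
  final case class Plan(
      authorized: Set[String],
      unauthorized: Set[String],
      toCreate: Set[String])

  def plan(requested: Set[String],
           existing: Set[String],
           canDescribe: String => Boolean,
           canCreate: Boolean,
           autoCreateEnabled: Boolean): Plan = {
    // Step 1: an empty request means "every existing topic the caller may describe".
    val topics = if (requested.isEmpty) existing.filter(canDescribe) else requested

    // Step 2: split by Describe permission.
    val (authorized, unauthorized) = topics.partition(canDescribe)

    // Step 3: authorized topics that do not exist yet are auto-create candidates.
    val missing = authorized -- existing

    if (!autoCreateEnabled || missing.isEmpty)
      Plan(authorized, unauthorized, Set.empty[String])
    else if (canCreate)
      Plan(authorized, unauthorized, missing)
    else
      // Without Create permission, missing topics move to the unauthorized side.
      Plan(authorized -- missing, unauthorized ++ missing, Set.empty[String])
  }

  def main(args: Array[String]): Unit = {
    val result = plan(
      requested = Set("orders", "payments", "secret"),
      existing = Set("orders"),
      canDescribe = topic => topic != "secret",
      canCreate = true,
      autoCreateEnabled = true)
    println(result)
  }
}
```

Keeping the authorization split separate from the creation step mirrors the handler's structure: a caller who may describe a topic but lacks cluster-level Create permission gets an authorization error for a missing topic instead of triggering its creation.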