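Original post on my blog: http://flychao88.iteye.com/blog/2266611
This chapter analyzes the internal logic of the Kafka Producer. Both the message-distribution (partitioning) logic and the load-balancing logic are maintained in the Producer.
I. Overall Kafka architecture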
(Figure: overall Kafka architecture diagram; image not reproduced)
II. Producer source code analysis
class Producer[K,V](val config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V])  // only for unit testing
  extends Logging {
  private val hasShutdown = new AtomicBoolean(false)
  // queue used for asynchronous sending
  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
  private var sync: Boolean = true
  // background thread that handles asynchronous sending
  private var producerSendThread: ProducerSendThread[K,V] = null
  private val lock = new Object()
  // config wraps the settings loaded from the configuration file in a ProducerConfig.
  // Decide whether sending is sync or async; in async mode, start a background send thread.
  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread =
        new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                    queue,
                                    eventHandler,
                                    config.queueBufferingMaxMs,
                                    config.batchNumMessages,
                                    config.clientId)
      producerSendThread.start()
  }
  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
  KafkaMetricsReporter.startReporters(config.props)
  AppInfo.registerInfo()
  def this(config: ProducerConfig) =
    this(config,
         new DefaultEventHandler[K,V](config,
                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                      new ProducerPool(config)))
  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   * @param messages the producer data object that encapsulates the topic, key and message data
   */
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
      }
    }
  }
  private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
      producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
    }
  }
  // asynchronous send path:
  // put the messages on the queue, where the async send thread picks them up
  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0 =>
          queue.offer(message)
        case _ =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
              case true =>
                queue.put(message)
                true
              case _ =>
                queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }
  /**
   * Close API to close the producer pool connections to all Kafka brokers. Also closes
   * the zookeeper client connection if one exists
   */
  def close() = {
    lock synchronized {
      val canShutdown = hasShutdown.compareAndSet(false, true)
      if(canShutdown) {
        info("Shutting down producer")
        val startTime = System.nanoTime()
        KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
        if (producerSendThread != null)
          producerSendThread.shutdown
        eventHandler.close
        info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
      }
    }
  }
}
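Notes:
I added comments to many parts of the code above. The constructor first initializes a set of fields: the asynchronous message queue queue, the sync flag sync, and the asynchronous send thread ProducerSendThread. The key piece is ProducerSendThread, which takes messages off the queue and has kafka.producer.EventHandler send them to the broker. The code is short but does a lot: config.producerType decides whether sending is synchronous or asynchronous, and each mode is backed by its own classes. In sync mode, send() calls eventHandler.handle directly; in async mode, asyncSend() only enqueues the messages. The two modes are covered in turn below.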
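To make the asynchronous path concrete, here is a minimal, self-contained sketch of its two halves. enqueue mirrors the policy asyncSend applies through queueEnqueueTimeoutMs (0: non-blocking offer, negative: blocking put, positive: offer with a timeout). drainBatch is an assumed simplification of what a send thread like ProducerSendThread does: collect messages until batchNumMessages is reached or queueBufferingMaxMs has elapsed. The object and method names are hypothetical, and since ProducerSendThread's source is not shown in this article, the batching loop is an approximation of its behavior, not its code.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object AsyncQueueSketch {
  // Enqueue policy modeled on asyncSend: 0 = non-blocking offer,
  // negative = block until there is room, positive = wait up to that many ms.
  // Returns false when the message could not be queued (the Producer would then
  // count a dropped message and throw QueueFullException).
  def enqueue[T](queue: LinkedBlockingQueue[T], msg: T, enqueueTimeoutMs: Long): Boolean =
    if (enqueueTimeoutMs == 0) queue.offer(msg)
    else
      try {
        if (enqueueTimeoutMs < 0) { queue.put(msg); true }
        else queue.offer(msg, enqueueTimeoutMs, TimeUnit.MILLISECONDS)
      } catch {
        case _: InterruptedException => false
      }

  // Assumed simplification of one send-thread batch: take messages until either
  // batchSize is reached or maxBufferMs has elapsed since this call started.
  def drainBatch[T](queue: LinkedBlockingQueue[T], batchSize: Int, maxBufferMs: Long): Seq[T] = {
    val batch = ArrayBuffer.empty[T]
    val deadline = System.currentTimeMillis() + maxBufferMs
    var timedOut = false
    while (batch.size < batchSize && !timedOut) {
      val remaining = deadline - System.currentTimeMillis()
      val next =
        if (remaining > 0) Option(queue.poll(remaining, TimeUnit.MILLISECONDS))
        else None
      next match {
        case Some(m) => batch += m
        case None    => timedOut = true
      }
    }
    batch.toSeq
  }
}
```

A full batch is flushed as soon as it fills; a partial batch waits at most maxBufferMs, which is the trade-off between latency and batching that the async producer exposes through its configuration.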
1. Synchronous sending
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  // partition the messages and collate them by broker
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        case t: Throwable => error("Failed to send messages", t)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}
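Notes:
This method involves two important pieces. The first is partitionAndCollate, which determines the topic, partition, and broker for each message; it is central and is analyzed below. The second is groupMessagesToSet, which packs each partition's messages into a message set and applies the configured compression. Messages whose partitions fail to send are collected in failedProduceRequests and returned to the caller; if partitionAndCollate itself fails (returns None), all messages are returned as failed.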
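The failure bookkeeping in dispatchSerializedData is easy to miss: only the messages of partitions that a broker reports as failed go back into failedProduceRequests. The following is a minimal sketch of that shape with plain types. TopicPartition, dispatch, and the sendToBroker function are hypothetical stand-ins for TopicAndPartition, dispatchSerializedData, and DefaultEventHandler.send; compression (groupMessagesToSet) and the try/catch around the loop are left out.

```scala
import scala.collection.mutable.ArrayBuffer

object DispatchSketch {
  // Hypothetical stand-in for Kafka's TopicAndPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // `partitioned` plays the role of partitionAndCollate's result (None = collation failed);
  // `sendToBroker` sends one broker's batch and returns the partitions that failed.
  def dispatch[M](
      all: Seq[M],
      partitioned: Option[Map[Int, Map[TopicPartition, Seq[M]]]],
      sendToBroker: (Int, Map[TopicPartition, Seq[M]]) => Seq[TopicPartition]
  ): Seq[M] =
    partitioned match {
      case None => all // nothing could be routed, so every message counts as failed
      case Some(byBroker) =>
        val failed = ArrayBuffer.empty[M]
        for ((brokerId, perPartition) <- byBroker) {
          val failedPartitions = sendToBroker(brokerId, perPartition)
          // only the messages of the failed partitions are handed back for a retry
          failedPartitions.foreach(tp => perPartition.get(tp).foreach(msgs => failed ++= msgs))
        }
        failed.toSeq
    }
}
```

Returning the failed subset rather than throwing lets the caller retry just those messages, while batches that reached other brokers are not resent.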
Before looking at partitionAndCollate, let's first look at the following class structure:
TopicMetadata --> PartitionMetadata
case class PartitionMetadata(partitionId: Int,
                             val leader: Option[Broker],
                             replicas: Seq[Broker],
                             isr: Seq[Broker] = Seq.empty,
                             errorCode: Short = ErrorMapping.NoError)
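In other words, topic metadata contains partition metadata, and partition metadata contains the partitionId, the leader (the broker that hosts the partition's leader replica), the replicas (the brokers that hold copies of the partition), the ISR (in-sync replicas), and so on.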
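A trimmed model of these two classes, keeping only the fields discussed here, shows how each partition's leader broker id is derived, or None if no leader has been elected yet. The case classes below are hypothetical simplifications (Broker reduced to id, host, and port; errorCode dropped), and leaderIds is an illustrative helper, not part of Kafka.

```scala
object MetadataSketch {
  // Trimmed, hypothetical versions of Kafka's classes (errorCode and other fields omitted).
  final case class Broker(id: Int, host: String, port: Int)

  final case class PartitionMetadata(
      partitionId: Int,
      leader: Option[Broker],      // broker hosting the leader replica; None if no leader yet
      replicas: Seq[Broker],       // brokers holding a copy of this partition
      isr: Seq[Broker] = Seq.empty // in-sync replicas
  )

  // One topic's metadata holds the metadata of all of its partitions.
  final case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata])

  // partitionId -> id of the leader broker, if one has been elected
  def leaderIds(tm: TopicMetadata): Map[Int, Option[Int]] =
    tm.partitionsMetadata.map(p => p.partitionId -> p.leader.map(_.id)).toMap
}
```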
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
  val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
  try {
    for (message <- messages) {
      // get the topic's partition list
      val topicPartitionsList = getPartitionListForTopic(message)
      // use the partitioner (a hash of the key) to decide which partition the message goes to
      val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
      val brokerPartition = topicPartitionsList(partitionIndex)
      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
      val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
      var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
      ret.get(leaderBrokerId) match {
        case Some(element) =>
          dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
        case None =>
          dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
          ret.put(leaderBrokerId, dataPerBroker)
      }
      val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
      var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
      dataPerBroker.get(topicAndPartition) match {
        case Some(element) =>
          dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
        case None =>
          dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
      }
      dataPerTopicPartition.append(message)
    }
    Some(ret)
  }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
    case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
    case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
    case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
  }
}
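Notes:
partitionAndCollate groups the messages by topic and partition and distributes them into dataPerBroker maps, one per broker. Then, for each broker, the corresponding SyncProducer.send is called to send that broker's messages as a batch; SyncProducer wraps the NIO network operations.
The main job of partitionAndCollate is to find the leaderBrokerId of each partition (that is, the broker on which that partition's leader lives), create a HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]], and group the messages by broker id, preparing the data so that each SyncProducer can send its batch.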
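The grouping step can be sketched with immutable collections: choose a partition for each message, look up that partition's leader, and bucket the message under leaderId -> (topic, partition), using -1 when there is no leader so that the failure surfaces later at send time, as the source comment describes. The partitioner here is an assumption in the spirit of the 0.8 default partitioner (a non-negative key hash modulo the partition count); the sketch requires a non-null key and does not model the separate path getPartition takes for messages without a key. Msg, TopicAndPartition, partitionFor, and collate are hypothetical names local to this sketch.

```scala
object CollateSketch {
  // Hypothetical simplified types.
  final case class Msg(topic: String, key: String, value: String)
  final case class TopicAndPartition(topic: String, partition: Int)

  // Non-negative hash of the key modulo the partition count (key must be non-null).
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // leaders(topic)(partitionId) is the leader broker id, or None if there is no leader.
  // An unknown topic throws NoSuchElementException here; Kafka raises its own exceptions instead.
  def collate(
      messages: Seq[Msg],
      leaders: Map[String, IndexedSeq[Option[Int]]]
  ): Map[Int, Map[TopicAndPartition, Seq[Msg]]] = {
    val located = messages.map { m =>
      val partitionLeaders = leaders(m.topic)
      val p = partitionFor(m.key, partitionLeaders.size)
      val leaderId = partitionLeaders(p).getOrElse(-1) // defer the failure to send time
      (leaderId, TopicAndPartition(m.topic, p), m)
    }
    // brokerId -> (topic, partition) -> messages, preserving message order within each group
    located.groupBy(_._1).map { case (leaderId, entries) =>
      leaderId -> entries.groupBy(_._2).map { case (tp, es) => tp -> es.map(_._3) }
    }
  }
}
```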
Let's step into getPartitionListForTopic and see what it does.
private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
  val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
  debug("Broker partitions registered for topic: %s are %s"
    .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
  val totalNumPartitions = topicPartitionsList.length
  if(totalNumPartitions == 0)
    throw new NoBrokersForPartitionException("Partition key = " + m.key)
  topicPartitionsList
}
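Notes: there is not much to this method itself; it mainly delegates to getBrokerPartitionInfo and throws NoBrokersForPartitionException if the topic has no partitions. KeyedMessage is the message we want to send, and the return value is Seq[PartitionAndLeader].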
def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
  debug("Getting broker partition info for topic %s".format(topic))
  // check if the cache has metadata for this topic
  val topicMetadata = topicPartitionInfo.get(topic)
  val metadata: TopicMetadata =
    topicMetadata match {
      case Some(m) => m
      case None =>
        // refresh the topic metadata cache
        updateInfo(Set(topic), correlationId)
        val topicMetadata = topicPartitionInfo.get(topic)
        topicMetadata match {
          case Some(m) => m
          case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)
        }
    }
  val partitionMetadata = metadata.partitionsMetadata
  if(partitionMetadata.size == 0) {
    if(metadata.errorCode != ErrorMapping.NoError) {
      throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
    } else {
      throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
    }
  }
  partitionMetadata.map { m =>
    m.leader match {
      case Some(leader) =>
        debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
        new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
      case None =>
        debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
        new PartitionAndLeader(topic, m.partitionId, None)
    }
  }.sortWith((s, t) => s.partitionId < t.partitionId)
}
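Notes:
This method is important. First look at the topicPartitionInfo object: it is a HashMap[String, TopicMetadata], where the key is the topic name and the value is the topic's metadata.
The method looks up the topic's metadata in this map and pattern-matches on the result. If there is a value (Some(m)), it is assigned to metadata; if not (None), the producer connects to the server over NIO to refresh the topic metadata (updateInfo) and looks the topic up again, throwing a KafkaException if it is still missing. A KafkaException is also thrown if the topic has no partition metadata. Finally, each PartitionMetadata is mapped to a PartitionAndLeader, and the list is sorted by partitionId.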
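This cache-then-refresh lookup is a cache-aside pattern. Below is a minimal sketch, assuming a hypothetical fetchMetadata function that stands in for the remote metadata request made by updateInfo. PartitionInfo, PartitionAndLeader (with simplified fields), and MetadataCache are illustrative types, and IllegalStateException replaces Kafka's KafkaException so the block stays self-contained.

```scala
import scala.collection.mutable

object MetadataCacheSketch {
  // Simplified, hypothetical types; the real code works with TopicMetadata/PartitionMetadata.
  final case class PartitionInfo(partitionId: Int, leaderId: Option[Int])
  final case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

  // `fetchMetadata` stands in for the remote metadata request that updateInfo performs.
  final class MetadataCache(fetchMetadata: String => Option[Seq[PartitionInfo]]) {
    private val topicPartitionInfo = mutable.HashMap.empty[String, Seq[PartitionInfo]]
    private var refreshCount = 0
    def refreshes: Int = refreshCount // number of remote refreshes, for illustration

    def partitionsFor(topic: String): Seq[PartitionAndLeader] = {
      val metadata = topicPartitionInfo.get(topic) match {
        case Some(m) => m // cache hit
        case None =>      // cache miss: refresh, then look again
          refreshCount += 1
          fetchMetadata(topic).foreach(m => topicPartitionInfo.put(topic, m))
          topicPartitionInfo.getOrElse(topic,
            throw new IllegalStateException("Failed to fetch topic metadata for topic: " + topic))
      }
      if (metadata.isEmpty)
        throw new IllegalStateException(s"Topic $topic has empty partition metadata")
      metadata
        .map(p => PartitionAndLeader(topic, p.partitionId, p.leaderId))
        .sortWith(_.partitionId < _.partitionId)
    }
  }
}
```

Only the first lookup for a topic pays for the remote round trip; later sends hit the map, which matters because getBrokerPartitionInfo runs for every message in partitionAndCollate.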
See the flowchart below: