Learn Kafka Source Code with Me: Producer Analysis

My original blog post is at: http://flychao88.iteye.com/blog/2266611

This chapter analyzes the business logic of Kafka's Producer. Both the dispatch logic and the load-balancing logic are maintained inside the Producer.

I. Overall Architecture of Kafka

(Architecture diagram omitted from the repost.)

II. Producer Source Code Analysis

class Producer[K,V](val config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V])  // only for unit testing
  extends Logging {

  private val hasShutdown = new AtomicBoolean(false)

  // queue that buffers messages for asynchronous sends
  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

  private var sync: Boolean = true

  // background thread that drains the queue in async mode
  private var producerSendThread: ProducerSendThread[K,V] = null
  private val lock = new Object()

  // config is built from the properties loaded from the configuration file (ProducerConfig).
  // Decide whether sends are synchronous or asynchronous; in async mode, start the send thread.
  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                       queue,
                                                       eventHandler,
                                                       config.queueBufferingMaxMs,
                                                       config.batchNumMessages,
                                                       config.clientId)
      producerSendThread.start()
  }

  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)

  KafkaMetricsReporter.startReporters(config.props)
  AppInfo.registerInfo()

  def this(config: ProducerConfig) =
    this(config,
         new DefaultEventHandler[K,V](config,
                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                      new ProducerPool(config)))

  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   * @param messages the producer data object that encapsulates the topic, key and message data
   */
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
      }
    }
  }

  private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
      producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
    }
  }

  // Asynchronous send path: put the messages on the queue and let the send thread pick them up.
  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0 =>
          queue.offer(message)
        case _ =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
              case true =>
                queue.put(message)
                true
              case _ =>
                queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

  /**
   * Close API to close the producer pool connections to all Kafka brokers. Also closes
   * the zookeeper client connection if one exists
   */
  def close() = {
    lock synchronized {
      val canShutdown = hasShutdown.compareAndSet(false, true)
      if(canShutdown) {
        info("Shutting down producer")
        val startTime = System.nanoTime()
        KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
        if (producerSendThread != null)
          producerSendThread.shutdown
        eventHandler.close
        info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
      }
    }
  }
}

Explanation:

I have annotated the key points of the code above. The constructor first initializes a series of fields: the asynchronous message queue (queue), the sync flag indicating synchronous vs. asynchronous mode, and the asynchronous send thread ProducerSendThread. The real centerpiece is the ProducerSendThread class, which takes messages off the queue and has kafka.producer.EventHandler send them to the brokers. There is not much code here, but it covers a lot: config.producerType determines whether sends are synchronous or asynchronous, and each mode is backed by its own supporting classes. A quick usage sketch follows, and then we look at the two modes in detail.
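Here is a minimal usage sketch (mine, not from the original post) showing how the mode is selected from client code against the old Scala producer API. The broker address, topic name, and tuning values are assumptions for illustration; setting producer.type to "async" is what triggers the ProducerSendThread branch shown above ("sync" is the default):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object AsyncProducerDemo extends App {
  val props = new Properties()
  props.put("metadata.broker.list", "localhost:9092")             // assumed local broker
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("producer.type", "async")                             // selects the async branch above
  props.put("queue.buffering.max.ms", "500")                      // becomes config.queueBufferingMaxMs
  props.put("batch.num.messages", "200")                          // becomes config.batchNumMessages

  val producer = new Producer[String, String](new ProducerConfig(props))
  producer.send(new KeyedMessage[String, String]("demo-topic", "key-1", "hello kafka"))
  producer.close()                                                // drains the queue and stops the send thread
}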

1. Synchronous Send

private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  // partition the messages and collate them by broker
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        case t: Throwable => error("Failed to send messages", t)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}

Explanation:

This method touches on two important pieces. The first is partitionAndCollate, which works out the topic, partition, and broker for each message; it is the key method here and is analyzed below. The other important method is groupMessagesToSet, which batches the outgoing data and applies the configured compression settings.
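Since groupMessagesToSet itself is not reproduced in this post, here is a simplified sketch of the idea (my own reconstruction, not the verbatim Kafka source; the real method also honours the compressed.topics whitelist): for each TopicAndPartition, wrap the raw messages in a ByteBufferMessageSet, applying the codec configured via compression.codec:

import kafka.common.TopicAndPartition
import kafka.message.{ByteBufferMessageSet, CompressionCodec, Message}

def groupToSets(perPartition: Map[TopicAndPartition, Seq[Message]],
                codec: CompressionCodec): Map[TopicAndPartition, ByteBufferMessageSet] =
  perPartition.map { case (tp, msgs) =>
    // the batch is compressed as a whole when codec != NoCompressionCodec
    tp -> new ByteBufferMessageSet(codec, msgs: _*)
  }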

Before digging into partitionAndCollate, first get familiar with the following class structure:

TopicMetadata --> PartitionMetadata

case class PartitionMetadata(partitionId: Int,
                             val leader: Option[Broker],
                             replicas: Seq[Broker],
                             isr: Seq[Broker] = Seq.empty,
                             errorCode: Short = ErrorMapping.NoError)

In other words, the topic metadata contains the partition metadata, and each partition's metadata records its partitionId, its leader (which broker hosts the leader partition), the replicas (which brokers hold the backup partitions), the ISR set, and so on.
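As a small illustration of navigating this structure, the following sketch (assuming a kafka.api.TopicMetadata value has already been fetched) prints which broker leads each partition by matching on the leader Option, the same pattern getBrokerPartitionInfo uses below:

import kafka.api.TopicMetadata

def describeLeaders(metadata: TopicMetadata): Unit =
  metadata.partitionsMetadata.foreach { pm =>
    pm.leader match {
      case Some(broker) => println("partition %d is led by broker %d".format(pm.partitionId, broker.id))
      case None         => println("partition %d currently has no leader".format(pm.partitionId))
    }
  }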

def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
  val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
  try {
    for (message <- messages) {
      // fetch the partition list for the message's topic
      val topicPartitionsList = getPartitionListForTopic(message)
      // hash the partition key to decide which partition the message goes to
      val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
      val brokerPartition = topicPartitionsList(partitionIndex)

      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
      val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

      var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
      ret.get(leaderBrokerId) match {
        case Some(element) =>
          dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
        case None =>
          dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
          ret.put(leaderBrokerId, dataPerBroker)
      }

      val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
      var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
      dataPerBroker.get(topicAndPartition) match {
        case Some(element) =>
          dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
        case None =>
          dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
      }
      dataPerTopicPartition.append(message)
    }
    Some(ret)
  } catch {    // Swallow recoverable exceptions and return None so that they can be retried.
    case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
    case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
    case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
  }
}

Explanation:

partitionAndCollate groups the messages by topic and partition: messages are assigned to dataPerBroker (one map per distinct broker), and a separate SyncProducer.send is then invoked per broker to send the message data in batches; SyncProducer wraps the NIO network operations. The main job of partitionAndCollate is to find the leaderBrokerId of each partition's leader (that is, which broker the leader of that partition lives on), build a HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]], and group the messages by brokerId so that everything is staged for the per-broker SyncProducer sends. A self-contained sketch of that structure follows.
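To make the shape of the result concrete, here is a self-contained sketch using plain Scala collections and dummy types of my own (TP and Msg stand in for TopicAndPartition and KeyedMessage; leaderFor stands in for the metadata lookup). It groups messages the same way: hash the key to pick a partition, find that partition's leader broker, then nest the message under brokerId -> (topic, partition):

import scala.collection.mutable

case class TP(topic: String, partition: Int)                 // stand-in for TopicAndPartition
case class Msg(topic: String, key: String, payload: String)  // stand-in for KeyedMessage

def collate(messages: Seq[Msg], numPartitions: Int, leaderFor: Int => Int)
    : mutable.Map[Int, mutable.Map[TP, mutable.ArrayBuffer[Msg]]] = {
  val ret = mutable.Map[Int, mutable.Map[TP, mutable.ArrayBuffer[Msg]]]()
  for (m <- messages) {
    val partition = (m.key.hashCode & 0x7fffffff) % numPartitions  // like the DefaultPartitioner's hash-modulo
    val brokerId  = leaderFor(partition)                           // leader broker hosting that partition
    val perBroker = ret.getOrElseUpdate(brokerId, mutable.Map())
    perBroker.getOrElseUpdate(TP(m.topic, partition), mutable.ArrayBuffer()) += m
  }
  ret
}

// e.g. collate(Seq(Msg("t", "a", "1"), Msg("t", "b", "2")), numPartitions = 4, leaderFor = p => p % 2)
// returns a map keyed by broker id, whose values map each (topic, partition) to its buffered messages.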

Let's step into getPartitionListForTopic and see what it does.

private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
  val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
  debug("Broker partitions registered for topic: %s are %s"
    .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
  val totalNumPartitions = topicPartitionsList.length
  if(totalNumPartitions == 0)
    throw new NoBrokersForPartitionException("Partition key = " + m.key)
  topicPartitionsList
}

Explanation: there is not much to this method itself; the interesting part is getBrokerPartitionInfo. The KeyedMessage argument is the message we want to send, and the return value is Seq[PartitionAndLeader].

def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
  debug("Getting broker partition info for topic %s".format(topic))
  // check if the cache has metadata for this topic
  val topicMetadata = topicPartitionInfo.get(topic)
  val metadata: TopicMetadata =
    topicMetadata match {
      case Some(m) => m
      case None =>
        // refresh the topic metadata cache
        updateInfo(Set(topic), correlationId)
        val topicMetadata = topicPartitionInfo.get(topic)
        topicMetadata match {
          case Some(m) => m
          case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)
        }
    }
  val partitionMetadata = metadata.partitionsMetadata
  if(partitionMetadata.size == 0) {
    if(metadata.errorCode != ErrorMapping.NoError) {
      throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
    } else {
      throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
    }
  }
  partitionMetadata.map { m =>
    m.leader match {
      case Some(leader) =>
        debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
        new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
      case None =>
        debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
        new PartitionAndLeader(topic, m.partitionId, None)
    }
  }.sortWith((s, t) => s.partitionId < t.partitionId)
}

Explanation:

This method matters. First look at the topicPartitionInfo object: it is a HashMap[String, TopicMetadata], where the key is the topic name and the value is the topic metadata. The method looks the topic up in this hash structure and pattern-matches the result: if metadata is present (Some(m)), it is bound to metadata; if it is absent (None), the client connects to the broker over NIO via updateInfo to refresh the topic metadata, then reads the cache again.
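The lookup-then-refresh pattern here is worth isolating. Below is a minimal, self-contained sketch of it with hypothetical names (a String stands in for TopicMetadata, and fetchFromBroker stands in for updateInfo's remote NIO metadata request):

import scala.collection.mutable

val topicCache = mutable.HashMap[String, String]()  // topic name -> metadata

def fetchFromBroker(topic: String): String =        // stub for the remote metadata call
  "metadata-for-" + topic

def metadataFor(topic: String): String =
  topicCache.get(topic) match {
    case Some(m) => m                               // cache hit: use the cached metadata
    case None =>                                    // cache miss: refresh the cache, then re-read it
      topicCache.put(topic, fetchFromBroker(topic))
      topicCache.getOrElse(topic,
        throw new RuntimeException("Failed to fetch topic metadata for topic: " + topic))
  }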

Finally, the end-to-end flow is illustrated in the following flow chart. (Flow chart omitted from the repost.)
