Kafka解惑之Old Producer(2)——Sync Analysis


歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》会钝,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。

上接:1. Kafka解惑之Old Producer(1)—— Beginning

上篇文章結(jié)尾一下子擴(kuò)展的有點(diǎn)多塘雳,我們還是先回到DefaultEventHandler上來(lái),當(dāng)調(diào)用producer.send方法發(fā)送消息的時(shí)候,緊接著就是調(diào)用DefaultEventHandler的handle方法。下面是handle方法的主要內(nèi)容,雖然行數(shù)有點(diǎn)多,但是這是Producer中最最核心的一塊恃泪,需要反復(fù)研磨郑兴,方能一探究竟:

def handle(events: Seq[KeyedMessage[K,V]]) {
  val serializedData = serialize(events)
  var outstandingProduceRequests = serializedData
  var remainingRetries = config.messageSendMaxRetries + 1
  val correlationIdStart = correlationId.get()
  while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
    topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
    if (topicMetadataRefreshInterval >= 0 &&
        Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
      CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      topicMetadataToRefresh.clear
      lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
    }
    outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
    if (outstandingProduceRequests.nonEmpty) {
      Thread.sleep(config.retryBackoffMs)
      CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      remainingRetries -= 1
      producerStats.resendRate.mark()
    }
  }
}

注意handle方法的參數(shù)是個(gè)Seq[KeyedMessage]類型的,而不是KeyedMessage贝乎。雖然Demo中用的只是單個(gè)KeyedMessage情连,最后調(diào)用底層的handle方法都是轉(zhuǎn)換為Seq類型,你可以把Seq看成是java中的List览效,在Scala中表示序列却舀,指的是一類具有一定長(zhǎng)度的可迭代訪問(wèn)的對(duì)象,其中每個(gè)元素均帶有一個(gè)從0開(kāi)始計(jì)數(shù)的固定索引位置锤灿。

這個(gè)handle方法中首先是調(diào)用serialize(events)方法對(duì)消息進(jìn)行序列化操作挽拔,這個(gè)容易理解,就是通過(guò)serializer.class參數(shù)指定的序列化類進(jìn)行序列化但校。

其次獲取所發(fā)送消息對(duì)應(yīng)的元數(shù)據(jù)信息螃诅,然后將一坨消息(也有可能是一條)轉(zhuǎn)換為HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]格式,其中key:Int表示broker的id状囱,value是TopicAndPartition與消息集的Map术裸,對(duì)應(yīng)的方法為dispatchSerializedData()。因?yàn)榭蛻舳税l(fā)消息是發(fā)到對(duì)應(yīng)的broker上亭枷,所以要對(duì)每個(gè)消息找出對(duì)應(yīng)的leader副本所在的broker的位置袭艺,然后將要發(fā)送的消息集分類,每個(gè)broker對(duì)應(yīng)其各自所要接收的消息叨粘。而TopicAndPartition是針對(duì)broker上的存儲(chǔ)層的猾编,每個(gè)TopicAndPartition對(duì)應(yīng)特定的當(dāng)前的存儲(chǔ)文件(Segment文件),將消息寫(xiě)入到存儲(chǔ)文件中宣鄙。

獲取元數(shù)據(jù)信息并不是每次發(fā)送消息都要向metadata.broker.list所指定地址中的服務(wù)索要拉取袍镀,而是向緩存中的元數(shù)據(jù)進(jìn)行拉取,拉取失敗后才向metadata.broker.list所指定地址中的服務(wù)發(fā)送元數(shù)據(jù)更新的請(qǐng)求進(jìn)行拉取冻晤。很多朋友會(huì)把metadata.broker.list看成是broker的地址苇羡,這個(gè)不完全正確,官網(wǎng)解釋:

This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.

因?yàn)檫@個(gè)地址只提供給客戶端拉取元數(shù)據(jù)信息之用鼻弧,而剩下的動(dòng)作比如發(fā)送消息是通過(guò)與元數(shù)據(jù)信息中的broker地址建立連接之后再進(jìn)行操作设江,這也就意味著metadata.broker.list可以和broker的真正地址沒(méi)有任何交集。你完全可以為metadata.broker.list配置一個(gè)“偽裝”接口地址攘轩,這個(gè)接口配合kafka的傳輸格式并提供相應(yīng)的元數(shù)據(jù)信息叉存,這樣方便集中式的配置管理(可以集成到配置中心中)。為了簡(jiǎn)化說(shuō)明度帮,我們姑且可以狹義的認(rèn)為metadata.broker.list指的就是kafka broker的地址歼捏。

緩存中的元數(shù)據(jù)每隔topic.metadata.refresh.interval.ms才去broker拉取元數(shù)據(jù)信息稿存,可以參考上面大段源碼中的if語(yǔ)句:

 if (topicMetadataRefreshInterval >= 0 &&
        Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval)

topic.metadata.refresh.interval.ms參數(shù)的默認(rèn)值是600*1000ms,也就是10分鐘瞳秽。如果設(shè)置為0瓣履,則每次發(fā)送消息時(shí)都要先向broker拉取元數(shù)據(jù)信息;如果設(shè)置為負(fù)數(shù)练俐,那么只有在元數(shù)據(jù)獲取失敗的情況下才會(huì)請(qǐng)求元數(shù)據(jù)信息袖迎。由于這個(gè)老版的Scala的Producer請(qǐng)求元數(shù)據(jù)和發(fā)送消息是在同一個(gè)線程中完成的,所以此處會(huì)有延遲的隱患腺晾,具體的筆者會(huì)在后面的案例分析環(huán)節(jié)為大家詳細(xì)介紹燕锥。

接下去所要做的工作就是查看是否需要壓縮,如果客戶端設(shè)置了壓縮悯蝉,則根據(jù)compression.type參數(shù)配置的壓縮方式對(duì)消息進(jìn)行壓縮處理归形。0.8.2.x版本支持gzip和snappy的壓縮方式,1.0.0版本還支持lz4的壓縮方式泉粉。compression.type參數(shù)的默認(rèn)值值none连霉,即不需要壓縮。

最后根據(jù)brokerId分組發(fā)送消息嗡靡。這個(gè)分組發(fā)送的過(guò)程就與ProducerPool有關(guān)了跺撼,我們前面提到在實(shí)例化Producer的時(shí)候引入了DefaultEventHandler和ProducerPool。這個(gè)ProducerPool保存的是生產(chǎn)者和broker的連接讨彼,每個(gè)連接對(duì)應(yīng)一個(gè)SyncProducer對(duì)象歉井。SyncProducer包裝了NIO網(wǎng)絡(luò)層的操作,每個(gè)SyncProducer都是一個(gè)與對(duì)應(yīng)broker的socket連接哈误,是真正發(fā)送消息至broker中的執(zhí)行者哩至。

@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerPool(val config: ProducerConfig) extends Logging {
  private val syncProducers = new HashMap[Int, SyncProducer]

當(dāng)調(diào)用最上層的send方法發(fā)送消息的時(shí)候,下面的執(zhí)行順序?yàn)镈efaultEventHandler.handle()->DefaultEventHandler.dispatchSerializedData()->DefaultEventHandler.send()蜜自。在底層的DefaultEventHandler.send方法定義為:

private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])

這個(gè)方法就需要根據(jù)brokerId從ProducerPool中的HashMap中找到對(duì)應(yīng)SyncProducer菩貌,然后在將“messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]”這個(gè)消息發(fā)送到SyncProducer對(duì)應(yīng)的broker上。如果在獲取緩存中的元數(shù)據(jù)失敗的時(shí)候就需要重新向broker拉取元數(shù)據(jù)重荠,或者定時(shí)(topic.metadata.refresh.interval.ms)向broker端請(qǐng)求元數(shù)據(jù)的數(shù)據(jù)箭阶,都會(huì)有可能更新ProducerPool的信息,對(duì)應(yīng)的方法為ProducerPool.updateProducer():

def updateProducer(topicMetadata: Seq[TopicMetadata]) {
  val newBrokers = new collection.mutable.HashSet[BrokerEndPoint]
  topicMetadata.foreach(tmd => {
    tmd.partitionsMetadata.foreach(pmd => {
      if(pmd.leader.isDefined) {
        newBrokers += pmd.leader.get
      }
    })
  })
  lock synchronized {
    newBrokers.foreach(b => {
      if(syncProducers.contains(b.id)){
        syncProducers(b.id).close()
        syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      } else
        syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
    })
  }
}

會(huì)Java的讀者看這段代碼的時(shí)候應(yīng)該能看出來(lái)個(gè)90%以上戈鲁,解釋下這段代碼:首先是找到更新的元數(shù)據(jù)中所有的brorker(更具體的來(lái)說(shuō)是broker的id仇参、主機(jī)地址host和端口號(hào)port三元組信息);之后在查到原有的ProducerPool中是否有相應(yīng)的SyncProducer婆殿,如果有則關(guān)閉之后再重新建立诈乒;如果沒(méi)有則新建。SyncProducer底層是阻塞式的NIO婆芦,所以關(guān)閉再建立會(huì)有一定程度上的開(kāi)銷(xiāo)怕磨,相關(guān)細(xì)節(jié)如下:

channel = SocketChannel.open()
if(readBufferSize > 0)
  channel.socket.setReceiveBufferSize(readBufferSize)
if(writeBufferSize > 0)
  channel.socket.setSendBufferSize(writeBufferSize)
channel.configureBlocking(true)
channel.socket.setSoTimeout(readTimeoutMs)
channel.socket.setKeepAlive(true)
channel.socket.setTcpNoDelay(true)
channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

玩過(guò)NIO的讀者對(duì)這段代碼相比很是熟絡(luò)喂饥,雖然是scala版的。如果沒(méi)有接觸過(guò)NIO肠鲫,那么可以先看看這一篇:攻破JAVA NIO技術(shù)壁壘仰泻。

說(shuō)道這里我們用一副結(jié)構(gòu)圖來(lái)說(shuō)明下Old Producer的大致脈絡(luò)(注:圖中的所有操作都是在一個(gè)線程中執(zhí)行的):


Sync

歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客滩届。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市被啼,隨后出現(xiàn)的幾起案子帜消,更是在濱河造成了極大的恐慌,老刑警劉巖浓体,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件泡挺,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡命浴,警方通過(guò)查閱死者的電腦和手機(jī)娄猫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)生闲,“玉大人媳溺,你說(shuō)我怎么就攤上這事“叮” “怎么了悬蔽?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)捉兴。 經(jīng)常有香客問(wèn)我蝎困,道長(zhǎng),這世上最難降的妖魔是什么倍啥? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任禾乘,我火速辦了婚禮,結(jié)果婚禮上虽缕,老公的妹妹穿的比我還像新娘始藕。我一直安慰自己,他們只是感情好彼宠,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布鳄虱。 她就那樣靜靜地躺著,像睡著了一般凭峡。 火紅的嫁衣襯著肌膚如雪拙已。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,754評(píng)論 1 307
  • 那天摧冀,我揣著相機(jī)與錄音倍踪,去河邊找鬼系宫。 笑死,一個(gè)胖子當(dāng)著我的面吹牛建车,可吹牛的內(nèi)容都是我干的扩借。 我是一名探鬼主播,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼缤至,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼潮罪!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起领斥,我...
    開(kāi)封第一講書(shū)人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤嫉到,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后月洛,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體何恶,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年嚼黔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了细层。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡唬涧,死狀恐怖疫赎,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情碎节,我是刑警寧澤虚缎,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站钓株,受9級(jí)特大地震影響实牡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜轴合,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一创坞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧受葛,春花似錦题涨、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至闰渔,卻和暖如春席函,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背冈涧。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工茂附, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留正蛙,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓营曼,卻偏偏與公主長(zhǎng)得像乒验,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蒂阱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理锻全,服務(wù)發(fā)現(xiàn),斷路器录煤,智...
    卡卡羅2017閱讀 134,672評(píng)論 18 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,723評(píng)論 13 425
  • kafka的定義:是一個(gè)分布式消息系統(tǒng)虱痕,由LinkedIn使用Scala編寫(xiě),用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,324評(píng)論 1 15
  • Kafka入門(mén)經(jīng)典教程-Kafka-about云開(kāi)發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語(yǔ)閱讀 10,833評(píng)論 4 54
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,471評(píng)論 0 34