歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》会钝,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
上接:1. Kafka解惑之Old Producer(1)—— Beginning
上篇文章結(jié)尾一下子擴(kuò)展的有點(diǎn)多塘雳,我們還是先回到DefaultEventHandler上來(lái),當(dāng)調(diào)用producer.send方法發(fā)送消息的時(shí)候,緊接著就是調(diào)用DefaultEventHandler的handle方法。下面是handle方法的主要內(nèi)容,雖然行數(shù)有點(diǎn)多,但是這是Producer中最最核心的一塊恃泪,需要反復(fù)研磨郑兴,方能一探究竟:
def handle(events: Seq[KeyedMessage[K,V]]) {
val serializedData = serialize(events)
var outstandingProduceRequests = serializedData
var remainingRetries = config.messageSendMaxRetries + 1
val correlationIdStart = correlationId.get()
while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
if (topicMetadataRefreshInterval >= 0 &&
Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
topicMetadataToRefresh.clear
lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
}
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
if (outstandingProduceRequests.nonEmpty) {
Thread.sleep(config.retryBackoffMs)
CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
remainingRetries -= 1
producerStats.resendRate.mark()
}
}
}
注意handle方法的參數(shù)是個(gè)Seq[KeyedMessage]類型的,而不是KeyedMessage贝乎。雖然Demo中用的只是單個(gè)KeyedMessage情连,最后調(diào)用底層的handle方法都是轉(zhuǎn)換為Seq類型,你可以把Seq看成是java中的List览效,在Scala中表示序列却舀,指的是一類具有一定長(zhǎng)度的可迭代訪問(wèn)的對(duì)象,其中每個(gè)元素均帶有一個(gè)從0開(kāi)始計(jì)數(shù)的固定索引位置锤灿。
這個(gè)handle方法中首先是調(diào)用serialize(events)方法對(duì)消息進(jìn)行序列化操作挽拔,這個(gè)容易理解,就是通過(guò)serializer.class參數(shù)指定的序列化類進(jìn)行序列化但校。
其次獲取所發(fā)送消息對(duì)應(yīng)的元數(shù)據(jù)信息螃诅,然后將一坨消息(也有可能是一條)轉(zhuǎn)換為HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]格式,其中key:Int表示broker的id状囱,value是TopicAndPartition與消息集的Map术裸,對(duì)應(yīng)的方法為dispatchSerializedData()。因?yàn)榭蛻舳税l(fā)消息是發(fā)到對(duì)應(yīng)的broker上亭枷,所以要對(duì)每個(gè)消息找出對(duì)應(yīng)的leader副本所在的broker的位置袭艺,然后將要發(fā)送的消息集分類,每個(gè)broker對(duì)應(yīng)其各自所要接收的消息叨粘。而TopicAndPartition是針對(duì)broker上的存儲(chǔ)層的猾编,每個(gè)TopicAndPartition對(duì)應(yīng)特定的當(dāng)前的存儲(chǔ)文件(Segment文件),將消息寫(xiě)入到存儲(chǔ)文件中宣鄙。
獲取元數(shù)據(jù)信息并不是每次發(fā)送消息都要向metadata.broker.list所指定地址中的服務(wù)索要拉取袍镀,而是向緩存中的元數(shù)據(jù)進(jìn)行拉取,拉取失敗后才向metadata.broker.list所指定地址中的服務(wù)發(fā)送元數(shù)據(jù)更新的請(qǐng)求進(jìn)行拉取冻晤。很多朋友會(huì)把metadata.broker.list看成是broker的地址苇羡,這個(gè)不完全正確,官網(wǎng)解釋:
This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
因?yàn)檫@個(gè)地址只提供給客戶端拉取元數(shù)據(jù)信息之用鼻弧,而剩下的動(dòng)作比如發(fā)送消息是通過(guò)與元數(shù)據(jù)信息中的broker地址建立連接之后再進(jìn)行操作设江,這也就意味著metadata.broker.list可以和broker的真正地址沒(méi)有任何交集。你完全可以為metadata.broker.list配置一個(gè)“偽裝”接口地址攘轩,這個(gè)接口配合kafka的傳輸格式并提供相應(yīng)的元數(shù)據(jù)信息叉存,這樣方便集中式的配置管理(可以集成到配置中心中)。為了簡(jiǎn)化說(shuō)明度帮,我們姑且可以狹義的認(rèn)為metadata.broker.list指的就是kafka broker的地址歼捏。
緩存中的元數(shù)據(jù)每隔topic.metadata.refresh.interval.ms才去broker拉取元數(shù)據(jù)信息稿存,可以參考上面大段源碼中的if語(yǔ)句:
if (topicMetadataRefreshInterval >= 0 &&
Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval)
topic.metadata.refresh.interval.ms參數(shù)的默認(rèn)值是600*1000ms,也就是10分鐘瞳秽。如果設(shè)置為0瓣履,則每次發(fā)送消息時(shí)都要先向broker拉取元數(shù)據(jù)信息;如果設(shè)置為負(fù)數(shù)练俐,那么只有在元數(shù)據(jù)獲取失敗的情況下才會(huì)請(qǐng)求元數(shù)據(jù)信息袖迎。由于這個(gè)老版的Scala的Producer請(qǐng)求元數(shù)據(jù)和發(fā)送消息是在同一個(gè)線程中完成的,所以此處會(huì)有延遲的隱患腺晾,具體的筆者會(huì)在后面的案例分析環(huán)節(jié)為大家詳細(xì)介紹燕锥。
接下去所要做的工作就是查看是否需要壓縮,如果客戶端設(shè)置了壓縮悯蝉,則根據(jù)compression.type參數(shù)配置的壓縮方式對(duì)消息進(jìn)行壓縮處理归形。0.8.2.x版本支持gzip和snappy的壓縮方式,1.0.0版本還支持lz4的壓縮方式泉粉。compression.type參數(shù)的默認(rèn)值值none连霉,即不需要壓縮。
最后根據(jù)brokerId分組發(fā)送消息嗡靡。這個(gè)分組發(fā)送的過(guò)程就與ProducerPool有關(guān)了跺撼,我們前面提到在實(shí)例化Producer的時(shí)候引入了DefaultEventHandler和ProducerPool。這個(gè)ProducerPool保存的是生產(chǎn)者和broker的連接讨彼,每個(gè)連接對(duì)應(yīng)一個(gè)SyncProducer對(duì)象歉井。SyncProducer包裝了NIO網(wǎng)絡(luò)層的操作,每個(gè)SyncProducer都是一個(gè)與對(duì)應(yīng)broker的socket連接哈误,是真正發(fā)送消息至broker中的執(zhí)行者哩至。
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerPool(val config: ProducerConfig) extends Logging {
private val syncProducers = new HashMap[Int, SyncProducer]
當(dāng)調(diào)用最上層的send方法發(fā)送消息的時(shí)候,下面的執(zhí)行順序?yàn)镈efaultEventHandler.handle()->DefaultEventHandler.dispatchSerializedData()->DefaultEventHandler.send()蜜自。在底層的DefaultEventHandler.send方法定義為:
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
這個(gè)方法就需要根據(jù)brokerId從ProducerPool中的HashMap中找到對(duì)應(yīng)SyncProducer菩貌,然后在將“messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]”這個(gè)消息發(fā)送到SyncProducer對(duì)應(yīng)的broker上。如果在獲取緩存中的元數(shù)據(jù)失敗的時(shí)候就需要重新向broker拉取元數(shù)據(jù)重荠,或者定時(shí)(topic.metadata.refresh.interval.ms)向broker端請(qǐng)求元數(shù)據(jù)的數(shù)據(jù)箭阶,都會(huì)有可能更新ProducerPool的信息,對(duì)應(yīng)的方法為ProducerPool.updateProducer():
def updateProducer(topicMetadata: Seq[TopicMetadata]) {
val newBrokers = new collection.mutable.HashSet[BrokerEndPoint]
topicMetadata.foreach(tmd => {
tmd.partitionsMetadata.foreach(pmd => {
if(pmd.leader.isDefined) {
newBrokers += pmd.leader.get
}
})
})
lock synchronized {
newBrokers.foreach(b => {
if(syncProducers.contains(b.id)){
syncProducers(b.id).close()
syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
} else
syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
})
}
}
會(huì)Java的讀者看這段代碼的時(shí)候應(yīng)該能看出來(lái)個(gè)90%以上戈鲁,解釋下這段代碼:首先是找到更新的元數(shù)據(jù)中所有的brorker(更具體的來(lái)說(shuō)是broker的id仇参、主機(jī)地址host和端口號(hào)port三元組信息);之后在查到原有的ProducerPool中是否有相應(yīng)的SyncProducer婆殿,如果有則關(guān)閉之后再重新建立诈乒;如果沒(méi)有則新建。SyncProducer底層是阻塞式的NIO婆芦,所以關(guān)閉再建立會(huì)有一定程度上的開(kāi)銷(xiāo)怕磨,相關(guān)細(xì)節(jié)如下:
channel = SocketChannel.open()
if(readBufferSize > 0)
channel.socket.setReceiveBufferSize(readBufferSize)
if(writeBufferSize > 0)
channel.socket.setSendBufferSize(writeBufferSize)
channel.configureBlocking(true)
channel.socket.setSoTimeout(readTimeoutMs)
channel.socket.setKeepAlive(true)
channel.socket.setTcpNoDelay(true)
channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
玩過(guò)NIO的讀者對(duì)這段代碼相比很是熟絡(luò)喂饥,雖然是scala版的。如果沒(méi)有接觸過(guò)NIO肠鲫,那么可以先看看這一篇:攻破JAVA NIO技術(shù)壁壘仰泻。
說(shuō)道這里我們用一副結(jié)構(gòu)圖來(lái)說(shuō)明下Old Producer的大致脈絡(luò)(注:圖中的所有操作都是在一個(gè)線程中執(zhí)行的):
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客滩届。