這篇文章重點(diǎn)講一下kafka producer
端一個(gè)重要的組件——RecordAccmulator
。首先,我們以一個(gè)結(jié)構(gòu)簡圖以及一個(gè)流程圖來簡單描述一下RecordAccumulator
是如何存放一條KafkaRecord
的。
整體流程
上面的流程圖中只是簡單描述了一下追加的過程话速,讓大家對
RecordAccumulator
先有一個(gè)初步的認(rèn)識偎窘,其實(shí)實(shí)際在追加消息的過程中還有很多很嚴(yán)謹(jǐn)?shù)牟襟E,在這一小節(jié)中我們先不體現(xiàn)呻率。不過從這個(gè)簡略的流程中,我們可以看到這個(gè)流程主要就涉及到RecordAccumulator
內(nèi)部batches
以及BufferPool
兩個(gè)重要的成員變量呻引。下面我們就詳細(xì)地介紹一下這兩個(gè)核心成員變量礼仗;
核心成員變量
兩個(gè)核心成員變量在RecordAccumulator
中的形式:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
private final BufferPool free;
batches
batches這個(gè)組件我們在源碼分析1文章中提到過,是一個(gè)以TopicPartition為key的map苞七,其中每一個(gè)分區(qū)有一個(gè)自己的隊(duì)列藐守,隊(duì)列中的元素稱之為ProducerBatch
,代表多個(gè)ProducerRecord
的集合蹂风。
上面的那副簡圖在每一個(gè)ProducerBatch
中畫了一個(gè)ByteBuffer
內(nèi)存空間用于存儲record
的信息卢厂,那我們進(jìn)一步來看看ProducerBatch
這個(gè)類內(nèi)部是一個(gè)什么樣的結(jié)構(gòu)。
這個(gè)類內(nèi)部我們也是重點(diǎn)關(guān)注兩個(gè)成員變量:
final TopicPartition topicPartition;
private final MemoryRecordsBuilder recordsBuilder;
topicPartition
就是代表這個(gè)ProducerBatch
屬于哪一個(gè)分區(qū)惠啄。那么recordsBuilder
這個(gè)變量自然就是用于存儲“多個(gè)ProducerRecord
的組件啦慎恒。發(fā)現(xiàn)其實(shí)并沒有我們上面提到的ByteBuffer
,反而多出來一個(gè)recordsBuilder
的變量撵渡,那我們猜測應(yīng)該是recordsBuilder
內(nèi)部應(yīng)該維護(hù)了這個(gè)ByteBuffer
融柬。所以我們再稍微深入一下,簡單來看一下這個(gè)MemoryRecordsBuilder
內(nèi)部又是一個(gè)什么樣子趋距。
MemoryrecordsBuilder
這個(gè)類內(nèi)部稍微有些復(fù)雜粒氧,不過可以理解包含兩部分內(nèi)容,一部分內(nèi)容是下面這些元數(shù)據(jù)信息:
private final byte magic;
private final long logAppendTime;
private final int partitionLeaderEpoch;
private boolean isTransactional;
private long producerId;
private short producerEpoch;
這些內(nèi)容大家可以先不關(guān)心节腐,之后用到的時(shí)候會慢慢說外盯。
還有一部分內(nèi)容是真正用于存放每一個(gè)ProducerRecord
的key以及value的位置
private DataOutputStream appendStream;
private final ByteBufferOutputStream bufferStream;
到這里我們還是沒有找到ByteBuffer
的身影摘盆,不要著急,我們可以稍微注意一下里面的bufferStream
這個(gè)成員變量饱苟,它的類型是一個(gè)ByteBufferOutputStream
孩擂,這是kafka自定義的一個(gè)OutputStream
,其實(shí)其內(nèi)部就是簡單維護(hù)了一個(gè)ByteBuffer
用于真實(shí)的存放序列化好的數(shù)據(jù)箱熬。
public class ByteBufferOutputStream extends OutputStream {
private static final float REALLOCATION_FACTOR = 1.1f;
private final int initialCapacity;
private final int initialPosition;
private ByteBuffer buffer;
好的类垦,到現(xiàn)在為止,我們一層一層的向內(nèi)深入城须,終于到了實(shí)際存儲數(shù)據(jù)的這個(gè)ByteBuffer
所在的位置蚤认。但是這個(gè)過程經(jīng)過了很多類,比如MemoryRecordsBuilder
酿傍,比如ByteBufferOutputStream
烙懦,這些類我們之后在真正一行一行分析源代碼的時(shí)候會慢慢說驱入,這里大家只要弄懂了ByteBuffer
存放的位置就可以啦赤炒。
BufferPool
下面讓我們將目光移回到RecordAccmulator
上,因?yàn)檫€有一個(gè)核心成員變量還沒有說亏较,它就是
private final BufferPool free;
其實(shí)上面也簡單提了一下莺褒,我們每次新建一個(gè)ProducerBatch
的時(shí)候都需要向BufferPool
申請一塊ByteBuffer
,然后將這個(gè)ByteBuffer
一層一層的注入到ProducerBatch
的內(nèi)部雪情,那我們接下來就來看看這個(gè)BufferPool
內(nèi)部是什么樣子的遵岩,它又是如何分配ByteBuffer
的。
關(guān)于BufferPool
巡通,如果想要展開講的話尘执,又需要很大的篇幅,我這里專門為這個(gè)組件開了一篇文章宴凉,大家可以參考我的kafka producer源碼分析3這篇文章誊锭。
這里提一下,為什么
RecordAccumulator
內(nèi)部存儲ProducerBatch
是以我們常見的容器存儲的弥锄,像什么map丧靡、queue等,但是到了ProducerBatch
內(nèi)部就把所有的ProducerRecord
都塞進(jìn)了一個(gè)ByteBuffer中呢籽暇?
我們可以這樣想温治,kafka在producer端不管中間過程經(jīng)歷了哪些步驟,最終的目的都是將消息發(fā)送給broker戒悠。那么是不是可以來一條消息就發(fā)一條消息呢熬荆?當(dāng)時(shí)是可以的,但是這樣效率比較低绸狐,其中一個(gè)原因就是需要頻繁的IO操作卤恳,網(wǎng)絡(luò)開銷是很大的捏顺,而且這樣同步發(fā)送,發(fā)送的IO操作勢必會拖慢業(yè)務(wù)線程纬黎;于是想到了可以攢一批消息一起發(fā)送幅骄,這個(gè)一批的概念就是ProducerBatch
,那么在哪里攢呢本今?于是有了RecordAccumulator
拆座。由此可見,ProducerBatch
相當(dāng)于是網(wǎng)絡(luò)發(fā)送的最小消息單元冠息。所以為了使消息在內(nèi)存中的存儲更加緊湊挪凑,使用ByteBuffer
來存儲ProducerBatch
內(nèi)的數(shù)據(jù)無疑是最好的選擇了。
那為什么RecordAccumulator
中沒有直接搞一個(gè)大的ByteBuffer
存儲所有待發(fā)送的數(shù)據(jù)呢逛艰?因?yàn)槲覀冞€是需要一些數(shù)據(jù)的分類的躏碳,比如不同的TopicPartition
的分類,這樣搞一個(gè)容器存儲就更加方便了散怖。