kafka producer源碼分析2 -- RecordAccumulator

這篇文章重點(diǎn)講一下kafka producer端一個(gè)重要的組件——RecordAccmulator。首先,我們以一個(gè)結(jié)構(gòu)簡圖以及一個(gè)流程圖來簡單描述一下RecordAccumulator是如何存放一條KafkaRecord的。

整體流程

RecordAccumulator內(nèi)部結(jié)構(gòu)簡圖

追加消息的簡單流程圖

上面的流程圖中只是簡單描述了一下追加的過程话速,讓大家對RecordAccumulator先有一個(gè)初步的認(rèn)識偎窘,其實(shí)實(shí)際在追加消息的過程中還有很多很嚴(yán)謹(jǐn)?shù)牟襟E,在這一小節(jié)中我們先不體現(xiàn)呻率。不過從這個(gè)簡略的流程中,我們可以看到這個(gè)流程主要就涉及到RecordAccumulator內(nèi)部batches以及BufferPool兩個(gè)重要的成員變量呻引。下面我們就詳細(xì)地介紹一下這兩個(gè)核心成員變量礼仗;

核心成員變量

兩個(gè)核心成員變量在RecordAccumulator中的形式:

    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final BufferPool free;

batches

batches這個(gè)組件我們在源碼分析1文章中提到過,是一個(gè)以TopicPartition為key的map苞七,其中每一個(gè)分區(qū)有一個(gè)自己的隊(duì)列藐守,隊(duì)列中的元素稱之為ProducerBatch,代表多個(gè)ProducerRecord的集合蹂风。
上面的那副簡圖在每一個(gè)ProducerBatch中畫了一個(gè)ByteBuffer內(nèi)存空間用于存儲record的信息卢厂,那我們進(jìn)一步來看看ProducerBatch這個(gè)類內(nèi)部是一個(gè)什么樣的結(jié)構(gòu)。
這個(gè)類內(nèi)部我們也是重點(diǎn)關(guān)注兩個(gè)成員變量:

    final TopicPartition topicPartition;
    private final MemoryRecordsBuilder recordsBuilder;

topicPartition就是代表這個(gè)ProducerBatch屬于哪一個(gè)分區(qū)惠啄。那么recordsBuilder這個(gè)變量自然就是用于存儲“多個(gè)ProducerRecord的組件啦慎恒。發(fā)現(xiàn)其實(shí)并沒有我們上面提到的ByteBuffer,反而多出來一個(gè)recordsBuilder的變量撵渡,那我們猜測應(yīng)該是recordsBuilder內(nèi)部應(yīng)該維護(hù)了這個(gè)ByteBuffer融柬。所以我們再稍微深入一下,簡單來看一下這個(gè)MemoryRecordsBuilder內(nèi)部又是一個(gè)什么樣子趋距。
MemoryrecordsBuilder這個(gè)類內(nèi)部稍微有些復(fù)雜粒氧,不過可以理解包含兩部分內(nèi)容,一部分內(nèi)容是下面這些元數(shù)據(jù)信息:

    private final byte magic;
    private final long logAppendTime;
    private final int partitionLeaderEpoch;
    private boolean isTransactional;
    private long producerId;
    private short producerEpoch;

這些內(nèi)容大家可以先不關(guān)心节腐,之后用到的時(shí)候會慢慢說外盯。
還有一部分內(nèi)容是真正用于存放每一個(gè)ProducerRecord的key以及value的位置

    private DataOutputStream appendStream;
    private final ByteBufferOutputStream bufferStream;

到這里我們還是沒有找到ByteBuffer的身影摘盆,不要著急,我們可以稍微注意一下里面的bufferStream這個(gè)成員變量饱苟,它的類型是一個(gè)ByteBufferOutputStream孩擂,這是kafka自定義的一個(gè)OutputStream,其實(shí)其內(nèi)部就是簡單維護(hù)了一個(gè)ByteBuffer用于真實(shí)的存放序列化好的數(shù)據(jù)箱熬。

public class ByteBufferOutputStream extends OutputStream {

    private static final float REALLOCATION_FACTOR = 1.1f;

    private final int initialCapacity;
    private final int initialPosition;
    private ByteBuffer buffer;

好的类垦,到現(xiàn)在為止,我們一層一層的向內(nèi)深入城须,終于到了實(shí)際存儲數(shù)據(jù)的這個(gè)ByteBuffer所在的位置蚤认。但是這個(gè)過程經(jīng)過了很多類,比如MemoryRecordsBuilder酿傍,比如ByteBufferOutputStream烙懦,這些類我們之后在真正一行一行分析源代碼的時(shí)候會慢慢說驱入,這里大家只要弄懂了ByteBuffer存放的位置就可以啦赤炒。

BufferPool

下面讓我們將目光移回到RecordAccmulator上,因?yàn)檫€有一個(gè)核心成員變量還沒有說亏较,它就是

    private final BufferPool free;

其實(shí)上面也簡單提了一下莺褒,我們每次新建一個(gè)ProducerBatch的時(shí)候都需要向BufferPool申請一塊ByteBuffer,然后將這個(gè)ByteBuffer一層一層的注入到ProducerBatch的內(nèi)部雪情,那我們接下來就來看看這個(gè)BufferPool內(nèi)部是什么樣子的遵岩,它又是如何分配ByteBuffer的。
關(guān)于BufferPool巡通,如果想要展開講的話尘执,又需要很大的篇幅,我這里專門為這個(gè)組件開了一篇文章宴凉,大家可以參考我的kafka producer源碼分析3這篇文章誊锭。

這里提一下,為什么RecordAccumulator內(nèi)部存儲ProducerBatch是以我們常見的容器存儲的弥锄,像什么map丧靡、queue等,但是到了ProducerBatch內(nèi)部就把所有的ProducerRecord都塞進(jìn)了一個(gè)ByteBuffer中呢籽暇?
我們可以這樣想温治,kafka在producer端不管中間過程經(jīng)歷了哪些步驟,最終的目的都是將消息發(fā)送給broker戒悠。那么是不是可以來一條消息就發(fā)一條消息呢熬荆?當(dāng)時(shí)是可以的,但是這樣效率比較低绸狐,其中一個(gè)原因就是需要頻繁的IO操作卤恳,網(wǎng)絡(luò)開銷是很大的捏顺,而且這樣同步發(fā)送,發(fā)送的IO操作勢必會拖慢業(yè)務(wù)線程纬黎;于是想到了可以攢一批消息一起發(fā)送幅骄,這個(gè)一批的概念就是ProducerBatch,那么在哪里攢呢本今?于是有了RecordAccumulator拆座。由此可見,ProducerBatch 相當(dāng)于是網(wǎng)絡(luò)發(fā)送的最小消息單元冠息。所以為了使消息在內(nèi)存中的存儲更加緊湊挪凑,使用ByteBuffer來存儲ProducerBatch內(nèi)的數(shù)據(jù)無疑是最好的選擇了。
那為什么RecordAccumulator中沒有直接搞一個(gè)大的ByteBuffer存儲所有待發(fā)送的數(shù)據(jù)呢逛艰?因?yàn)槲覀冞€是需要一些數(shù)據(jù)的分類的躏碳,比如不同的TopicPartition的分類,這樣搞一個(gè)容器存儲就更加方便了散怖。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末菇绵,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子镇眷,更是在濱河造成了極大的恐慌咬最,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件欠动,死亡現(xiàn)場離奇詭異永乌,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)具伍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進(jìn)店門翅雏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人人芽,你說我怎么就攤上這事望几。” “怎么了啼肩?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵橄妆,是天一觀的道長。 經(jīng)常有香客問我祈坠,道長害碾,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任赦拘,我火速辦了婚禮慌随,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己阁猜,他們只是感情好丸逸,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著剃袍,像睡著了一般黄刚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上民效,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天憔维,我揣著相機(jī)與錄音,去河邊找鬼畏邢。 笑死业扒,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的舒萎。 我是一名探鬼主播程储,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼臂寝!你這毒婦竟也來了章鲤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤交煞,失蹤者是張志新(化名)和其女友劉穎咏窿,沒想到半個(gè)月后斟或,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體素征,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年萝挤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了御毅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,438評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡怜珍,死狀恐怖端蛆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情酥泛,我是刑警寧澤今豆,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站柔袁,受9級特大地震影響呆躲,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜捶索,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一插掂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦辅甥、人聲如沸酝润。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽要销。三九已至,卻和暖如春夏块,著一層夾襖步出監(jiān)牢的瞬間蕉陋,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工拨扶, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留凳鬓,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓患民,卻偏偏與公主長得像缩举,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子匹颤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,446評論 2 359

推薦閱讀更多精彩內(nèi)容