[Kafka] Producer發(fā)送消息機(jī)制解析

1. 前言
  • Sync Producer:低延遲踩窖,低吞吐率娱节,無(wú)數(shù)據(jù)丟失
  • Async Producer:高延遲辫红,高吞吐率,可能會(huì)有數(shù)據(jù)丟失
2.Producer發(fā)送消息

通過(guò)KafkaProducer的send方法發(fā)送消息田盈,send方法有兩種重載:

Future<RecordMetadata> send(ProducerRecord<K, V> record)
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

消息是被封裝成ProducerRecord,那么ProducerRecord是怎樣的呢缴阎?


核心屬性:

  • String topic 消息所屬的主題
  • Integer partition 消息所處的主題的分區(qū)數(shù)允瞧,可以人為指定,如果指定了 key 的話(huà)蛮拔,會(huì)使用 key 的 hashCode 與隊(duì)列總數(shù)進(jìn)行取模來(lái)選擇分區(qū)述暂,如果前面兩者都未指定,則會(huì)輪詢(xún)主題下的所有分區(qū)
  • Headers headers 該消息的額外屬性對(duì)建炫,與消息體分開(kāi)存儲(chǔ)
  • K key 消息鍵畦韭,如果指定該值,則會(huì)使用該值的 hashcode 與 隊(duì)列數(shù)進(jìn)行取模來(lái)選擇分區(qū)
  • V value 消息體
  • Long timestamp 消息時(shí)間戳肛跌,根據(jù) topic 的配置信息 message.timestamp.type 的值來(lái)賦予不同的值
    CreateTime:發(fā)送客戶(hù)端發(fā)送消息時(shí)的時(shí)間戳
    LogAppendTime:消息在broker追加時(shí)的時(shí)間戳
3.源碼分析消息追加流程

KafkaProducer的send方法艺配,并不會(huì)直接向broker發(fā)送消息,kafka將消息發(fā)送異步化衍慎,即分解成兩個(gè)步驟转唉,send 方法的職責(zé)是將消息追加到內(nèi)存中(分區(qū)的緩存隊(duì)列中),然后會(huì)由專(zhuān)門(mén)的 Send 線(xiàn)程異步將緩存中的消息批量發(fā)送到 Kafka Broker 中稳捆。

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
3.1 首先執(zhí)行消息發(fā)送的攔截器

攔截器是通過(guò)屬性interceptor.classes指定赠法,

轉(zhuǎn)成List<String>類(lèi)型,每一個(gè)元素就是攔截器的全類(lèi)路徑限定名乔夯,默認(rèn)是空的(即沒(méi)有默認(rèn)攔截器存在)


3.2 執(zhí)行doSend方法

step1:

// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);

獲取元數(shù)據(jù)信息砖织,包括topic可用的分區(qū)列表款侵,如果本地沒(méi)有該topic的分區(qū)信息,則需要向遠(yuǎn)端 broker 獲取镶苞,該方法會(huì)返回拉取元數(shù)據(jù)所耗費(fèi)的時(shí)間喳坠。在消息發(fā)送時(shí)的最大等待時(shí)間時(shí)會(huì)扣除該部分損耗的時(shí)間。

step2:

            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }

對(duì)key和value進(jìn)行序列化茂蚓,雖然序列化方法傳入了topic壕鹉、headers兩個(gè)屬性,但是參與序列化的只是key和value聋涨。

step3:

int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

根據(jù)分區(qū)負(fù)載算法計(jì)算本次消息發(fā)送該發(fā)往的分區(qū)晾浴。其默認(rèn)實(shí)現(xiàn)類(lèi)為 DefaultPartitioner,路由算法如下:

  • 如果消息指定了分區(qū)牍白,則將消息發(fā)送到對(duì)應(yīng)分區(qū)脊凰。
  • 如果指定了 key ,則使用 key 的 hashcode 與分區(qū)數(shù)取模茂腥。
  • 如果未指定 key狸涌,則輪詢(xún)所有的分區(qū)。

step4:

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

將消息頭信息設(shè)置為只讀最岗,根據(jù)使用的版本號(hào)帕胆,按照消息協(xié)議來(lái)計(jì)算消息的長(zhǎng)度,并是否超過(guò)指定長(zhǎng)度般渡,如果超過(guò)則拋出異常懒豹,先初始化消息時(shí)間戳,并對(duì)傳入的 Callable(回調(diào)函數(shù)) 加入到攔截器鏈中驯用。如果事務(wù)處理器不為空脸秽,執(zhí)行事務(wù)管理相關(guān)的。

step5:重要

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly

將消息追加到緩存區(qū)蝴乔,如果當(dāng)前緩存區(qū)寫(xiě)滿(mǎn)或者創(chuàng)建了一個(gè)新的緩存區(qū)记餐,則喚醒Sender(即消息發(fā)送線(xiàn)程),將緩存區(qū)中的消息發(fā)送到broker服務(wù)器淘这,最終返回future剥扣。


3.3 RecordAccumulator的append方法
public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
  • TopicPartition tp topic 與分區(qū)信息,即發(fā)送到哪個(gè) topic 的那個(gè)分區(qū)铝穷。
  • long timestamp 客戶(hù)端發(fā)送時(shí)的時(shí)間戳钠怯。
  • byte[] key 消息的 key。
  • byte[] value 消息體曙聂。
  • Header[] headers 消息頭晦炊,可以理解為額外消息屬性。
  • Callback callback 回調(diào)方法。
  • long maxTimeToBlock 消息追加超時(shí)時(shí)間断国。

step1:

            // check if we have an in-progress batch
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

使用getOrCreateDeque嘗試根據(jù)topic和分區(qū)獲取一個(gè)雙端隊(duì)列贤姆,如果不存在,則創(chuàng)建一個(gè)稳衬,然后調(diào)用tryAppend方法將消息追加到緩存中(即消息隊(duì)列中)霞捡。
Kafka會(huì)為每一個(gè)topic的每一個(gè)分區(qū)創(chuàng)建一個(gè)消息緩存區(qū)消息先追加到緩存中,然后消息發(fā)送 API 立即返回薄疚,然后由單獨(dú)的線(xiàn)程 Sender 將緩存區(qū)中的消息定時(shí)發(fā)送到 broker 碧信。這里的緩存區(qū)的實(shí)現(xiàn)使用的是 ArrayQeque。然后調(diào)用 tryAppend 方法嘗試將消息追加到其緩存區(qū)街夭,如果追加成功砰碴,則返回結(jié)果。

Kafka雙端隊(duì)列ArrayDeque:

step2:

            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxTimeToBlock);

如果第一步未追加成功板丽,說(shuō)明當(dāng)前沒(méi)有可用的 ProducerBatch呈枉,則需要?jiǎng)?chuàng)建一個(gè) ProducerBatch,故先從 BufferPool 中申請(qǐng) batch.size 的內(nèi)存空間埃碱,為創(chuàng)建 ProducerBatch 做準(zhǔn)備猖辫,如果由于 BufferPool 中未有剩余內(nèi)存,則最多等待 maxTimeToBlock 砚殿,如果在指定時(shí)間內(nèi)未申請(qǐng)到內(nèi)存住册,則拋出異常。

step3:

synchronized (dq) {
    // Need to check if producer is closed again after grabbing the dequeue lock.
    if (closed)
        throw new KafkaException("Producer closed while send in progress");
    // 省略部分代碼
    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
    dq.addLast(batch);
    incomplete.add(batch);
    // Don't deallocate this buffer in the finally block as it's being used in the record batch
    buffer = null;
    return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}

創(chuàng)建一個(gè)新的批次ProducerBatch瓮具,并且將消息寫(xiě)入該批次中,并返回追加結(jié)果凡人,

  • 創(chuàng)建 ProducerBatch 名党,其內(nèi)部持有一個(gè) MemoryRecordsBuilder對(duì)象,該對(duì)象負(fù)責(zé)將消息寫(xiě)入到內(nèi)存中挠轴,即寫(xiě)入到 ProducerBatch 內(nèi)部持有的內(nèi)存传睹,大小等于 batch.size。
  • 將消息追加到 ProducerBatch 中岸晦。
  • 將新創(chuàng)建的 ProducerBatch 添加到雙端隊(duì)列的末尾欧啤。
  • 將該批次加入到 incomplete 容器中,該容器存放未完成發(fā)送到 broker 服務(wù)器中的消息批次启上,當(dāng) Sender 線(xiàn)程將消息發(fā)送到 broker 服務(wù)端后邢隧,會(huì)將其移除并釋放所占內(nèi)存。

總結(jié):整個(gè)append的過(guò)程冈在,基本上就是從雙端隊(duì)列獲取一個(gè)未填充完畢的ProducerBatch(消息批次)倒慧,然后嘗試將其寫(xiě)入到該批次中(緩存、內(nèi)存中),如果追加失敗纫谅,則創(chuàng)建一個(gè)新的ProducerBatch然后繼續(xù)追加炫贤。

3.4 ProducerBatch tryAppend方法

上面RecordAccumulator的append方法,會(huì)去調(diào)用器tryAppend方法付秕,然后會(huì)再去調(diào)用ProducerBatch的tryAppend方法兰珍。

    /**
     * Append the record to the current record set and return the relative offset within that record set
     *
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return null;
        } else {
            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                    recordsBuilder.compressionType(), key, value, headers));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            // we have to keep every future returned to the users in case the batch needs to be
            // split to several new batches and resent.
            thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

step1:
首先去判斷ProducerBatch是否還能夠容納消息,如果剩余內(nèi)存不足询吴,將直接返回 null掠河。如果返回 null ,會(huì)嘗試再創(chuàng)建一個(gè)新的ProducerBatch汰寓。
step2:
通過(guò) MemoryRecordsBuilder 將消息寫(xiě)入按照 Kafka 消息格式寫(xiě)入到內(nèi)存中口柳,即寫(xiě)入到 在創(chuàng)建 ProducerBatch 時(shí)申請(qǐng)的 ByteBuffer 中。
step3:
更新 ProducerBatch 的 maxRecordSize有滑、lastAppendTime 屬性跃闹,分別表示該批次中最大的消息長(zhǎng)度與最后一次追加消息的時(shí)間。
step4:
構(gòu)建 FutureRecordMetadata 對(duì)象毛好,這里是典型的 Future模式望艺,里面主要包含了該條消息對(duì)應(yīng)的批次的 produceFuture、消息在該批消息的下標(biāo)肌访,key 的長(zhǎng)度找默、消息體的長(zhǎng)度以及當(dāng)前的系統(tǒng)時(shí)間。
step5:
將 callback 吼驶、本條消息的憑證(Future) 加入到該批次的 thunks 中惩激,該集合存儲(chǔ)了 一個(gè)批次中所有消息的發(fā)送回執(zhí)。

4 總結(jié)

Kafka的send方法蟹演,并不會(huì)直接向broker發(fā)送消息风钻,而是首先追加到生產(chǎn)者的內(nèi)存緩存中,其內(nèi)存存儲(chǔ)結(jié)構(gòu)如下:ConcurrentMap<TopicPartition酒请,Deque<ProducerBatch>> batches骡技,Kafka的生產(chǎn)者會(huì)為每一個(gè)topic的每一個(gè)分區(qū)單獨(dú)維護(hù)一個(gè)隊(duì)列,即ArrayDeque羞反,內(nèi)部存放的元素就是ProducerBatch布朦,即代表一個(gè)批次,即Kafka消息發(fā)送時(shí)按批發(fā)送的昼窗。

KafkaProducer的send方法是趴,最終返回的就是Future的子類(lèi),F(xiàn)uture模式 FutureRecordMetadata膏秫。所以kafka的消息發(fā)送如何實(shí)現(xiàn)異步右遭,同步發(fā)送也就是這個(gè)返回值決定的做盅。

  • 若需要同步發(fā)送,只要拿到send方法的返回結(jié)果后窘哈,調(diào)用get方法吹榴,此時(shí)如果消息未發(fā)送到Broker上,該方法就會(huì)被阻塞滚婉,等到 broker 返回消息發(fā)送結(jié)果后該方法會(huì)被喚醒并得到消息發(fā)送結(jié)果图筹。
  • 若需要異步發(fā)送,則建議使用send(ProducerRecord< K, V > record, Callback callback)让腹,但是不能調(diào)用get方法远剩,Callback 會(huì)在收到 broker 的響應(yīng)結(jié)果后被調(diào)用,并且支持?jǐn)r截器骇窍。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末瓜晤,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子腹纳,更是在濱河造成了極大的恐慌痢掠,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嘲恍,死亡現(xiàn)場(chǎng)離奇詭異足画,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)佃牛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)淹辞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人俘侠,你說(shuō)我怎么就攤上這事象缀。” “怎么了爷速?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵攻冷,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我遍希,道長(zhǎng),這世上最難降的妖魔是什么里烦? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任凿蒜,我火速辦了婚禮,結(jié)果婚禮上胁黑,老公的妹妹穿的比我還像新娘废封。我一直安慰自己,他們只是感情好丧蘸,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布漂洋。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪刽漂。 梳的紋絲不亂的頭發(fā)上演训,一...
    開(kāi)封第一講書(shū)人閱讀 49,842評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音贝咙,去河邊找鬼样悟。 笑死,一個(gè)胖子當(dāng)著我的面吹牛庭猩,可吹牛的內(nèi)容都是我干的窟她。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼蔼水,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼震糖!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起趴腋,我...
    開(kāi)封第一講書(shū)人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤吊说,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后于样,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體疏叨,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年穿剖,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蚤蔓。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡糊余,死狀恐怖秀又,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情贬芥,我是刑警寧澤吐辙,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站蘸劈,受9級(jí)特大地震影響昏苏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜威沫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一贤惯、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧棒掠,春花似錦孵构、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蜡镶。三九已至,卻和暖如春恤筛,著一層夾襖步出監(jiān)牢的瞬間官还,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工叹俏, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留妻枕,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓粘驰,卻偏偏與公主長(zhǎng)得像屡谐,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蝌数,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容

  • 一愕掏、入門(mén)1、簡(jiǎn)介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,345評(píng)論 0 9
  • Design 1. Motivation 我們?cè)O(shè)計(jì)Kafka用來(lái)作為統(tǒng)一的平臺(tái)來(lái)處理大公司可能擁有的所有實(shí)時(shí)數(shù)據(jù)源...
    BlackManba_24閱讀 1,361評(píng)論 0 8
  • 安裝kafka tar -zxvf 進(jìn)入到config目錄下修改server.propertiesbroker.i...
    猿日記閱讀 896評(píng)論 0 0
  • 一顶伞、Kafka簡(jiǎn)介 Kafka (科技術(shù)語(yǔ))饵撑。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者規(guī)...
    邊學(xué)邊記閱讀 1,725評(píng)論 0 14
  • MQ(消息隊(duì)列)是跨進(jìn)程通信的方式之一唆貌,可理解為異步rpc滑潘,上游系統(tǒng)對(duì)調(diào)用結(jié)果的態(tài)度往往是重要不緊急。使用消息隊(duì)列...
    allin8116閱讀 502評(píng)論 0 0