1. 前言
- Sync Producer:低延遲踩窖,低吞吐率娱节,無(wú)數(shù)據(jù)丟失
- Async Producer:高延遲辫红,高吞吐率,可能會(huì)有數(shù)據(jù)丟失
2.Producer發(fā)送消息
通過(guò)KafkaProducer的send方法發(fā)送消息田盈,send方法有兩種重載:
Future<RecordMetadata> send(ProducerRecord<K, V> record)
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
消息是被封裝成ProducerRecord,那么ProducerRecord是怎樣的呢缴阎?
核心屬性:
- String topic 消息所屬的主題
- Integer partition 消息所處的主題的分區(qū)數(shù)允瞧,可以人為指定,如果指定了 key 的話(huà)蛮拔,會(huì)使用 key 的 hashCode 與隊(duì)列總數(shù)進(jìn)行取模來(lái)選擇分區(qū)述暂,如果前面兩者都未指定,則會(huì)輪詢(xún)主題下的所有分區(qū)
- Headers headers 該消息的額外屬性對(duì)建炫,與消息體分開(kāi)存儲(chǔ)
- K key 消息鍵畦韭,如果指定該值,則會(huì)使用該值的 hashcode 與 隊(duì)列數(shù)進(jìn)行取模來(lái)選擇分區(qū)
- V value 消息體
- Long timestamp 消息時(shí)間戳肛跌,根據(jù) topic 的配置信息 message.timestamp.type 的值來(lái)賦予不同的值
CreateTime:發(fā)送客戶(hù)端發(fā)送消息時(shí)的時(shí)間戳
LogAppendTime:消息在broker追加時(shí)的時(shí)間戳
3.源碼分析消息追加流程
KafkaProducer的send方法艺配,并不會(huì)直接向broker發(fā)送消息,kafka將消息發(fā)送異步化衍慎,即分解成兩個(gè)步驟转唉,send 方法的職責(zé)是將消息追加到內(nèi)存中(分區(qū)的緩存隊(duì)列中),然后會(huì)由專(zhuān)門(mén)的 Send 線(xiàn)程異步將緩存中的消息批量發(fā)送到 Kafka Broker 中稳捆。
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
3.1 首先執(zhí)行消息發(fā)送的攔截器
攔截器是通過(guò)屬性interceptor.classes
指定赠法,
轉(zhuǎn)成List<String>類(lèi)型,每一個(gè)元素就是攔截器的全類(lèi)路徑限定名乔夯,默認(rèn)是空的(即沒(méi)有默認(rèn)攔截器存在)
3.2 執(zhí)行doSend方法
step1:
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
獲取元數(shù)據(jù)信息砖织,包括topic可用的分區(qū)列表款侵,如果本地沒(méi)有該topic的分區(qū)信息,則需要向遠(yuǎn)端 broker 獲取镶苞,該方法會(huì)返回拉取元數(shù)據(jù)所耗費(fèi)的時(shí)間喳坠。在消息發(fā)送時(shí)的最大等待時(shí)間時(shí)會(huì)扣除該部分損耗的時(shí)間。
step2:
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
對(duì)key和value進(jìn)行序列化茂蚓,雖然序列化方法傳入了topic壕鹉、headers兩個(gè)屬性,但是參與序列化的只是key和value聋涨。
step3:
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
根據(jù)分區(qū)負(fù)載算法計(jì)算本次消息發(fā)送該發(fā)往的分區(qū)晾浴。其默認(rèn)實(shí)現(xiàn)類(lèi)為 DefaultPartitioner,路由算法如下:
- 如果消息指定了分區(qū)牍白,則將消息發(fā)送到對(duì)應(yīng)分區(qū)脊凰。
- 如果指定了 key ,則使用 key 的 hashcode 與分區(qū)數(shù)取模茂腥。
- 如果未指定 key狸涌,則輪詢(xún)所有的分區(qū)。
step4:
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
將消息頭信息設(shè)置為只讀最岗,根據(jù)使用的版本號(hào)帕胆,按照消息協(xié)議來(lái)計(jì)算消息的長(zhǎng)度,并是否超過(guò)指定長(zhǎng)度般渡,如果超過(guò)則拋出異常懒豹,先初始化消息時(shí)間戳,并對(duì)傳入的 Callable(回調(diào)函數(shù)) 加入到攔截器鏈中驯用。如果事務(wù)處理器不為空脸秽,執(zhí)行事務(wù)管理相關(guān)的。
step5:重要
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
將消息追加到緩存區(qū)蝴乔,如果當(dāng)前緩存區(qū)寫(xiě)滿(mǎn)或者創(chuàng)建了一個(gè)新的緩存區(qū)记餐,則喚醒Sender(即消息發(fā)送線(xiàn)程),將緩存區(qū)中的消息發(fā)送到broker服務(wù)器淘这,最終返回future剥扣。
3.3 RecordAccumulator的append方法
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
- TopicPartition tp topic 與分區(qū)信息,即發(fā)送到哪個(gè) topic 的那個(gè)分區(qū)铝穷。
- long timestamp 客戶(hù)端發(fā)送時(shí)的時(shí)間戳钠怯。
- byte[] key 消息的 key。
- byte[] value 消息體曙聂。
- Header[] headers 消息頭晦炊,可以理解為額外消息屬性。
- Callback callback 回調(diào)方法。
- long maxTimeToBlock 消息追加超時(shí)時(shí)間断国。
step1:
// check if we have an in-progress batch
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
使用getOrCreateDeque
嘗試根據(jù)topic和分區(qū)獲取一個(gè)雙端隊(duì)列贤姆,如果不存在,則創(chuàng)建一個(gè)稳衬,然后調(diào)用tryAppend
方法將消息追加到緩存中(即消息隊(duì)列中)霞捡。
Kafka會(huì)為每一個(gè)topic的每一個(gè)分區(qū)創(chuàng)建一個(gè)消息緩存區(qū)消息先追加到緩存中,然后消息發(fā)送 API 立即返回薄疚,然后由單獨(dú)的線(xiàn)程 Sender 將緩存區(qū)中的消息定時(shí)發(fā)送到 broker 碧信。這里的緩存區(qū)的實(shí)現(xiàn)使用的是 ArrayQeque。然后調(diào)用 tryAppend 方法嘗試將消息追加到其緩存區(qū)街夭,如果追加成功砰碴,則返回結(jié)果。
Kafka雙端隊(duì)列ArrayDeque:
step2:
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
如果第一步未追加成功板丽,說(shuō)明當(dāng)前沒(méi)有可用的 ProducerBatch呈枉,則需要?jiǎng)?chuàng)建一個(gè) ProducerBatch,故先從 BufferPool 中申請(qǐng) batch.size 的內(nèi)存空間埃碱,為創(chuàng)建 ProducerBatch 做準(zhǔn)備猖辫,如果由于 BufferPool 中未有剩余內(nèi)存,則最多等待 maxTimeToBlock 砚殿,如果在指定時(shí)間內(nèi)未申請(qǐng)到內(nèi)存住册,則拋出異常。
step3:
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 省略部分代碼
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
創(chuàng)建一個(gè)新的批次ProducerBatch瓮具,并且將消息寫(xiě)入該批次中,并返回追加結(jié)果凡人,
- 創(chuàng)建 ProducerBatch 名党,其內(nèi)部持有一個(gè) MemoryRecordsBuilder對(duì)象,該對(duì)象負(fù)責(zé)將消息寫(xiě)入到內(nèi)存中挠轴,即寫(xiě)入到 ProducerBatch 內(nèi)部持有的內(nèi)存传睹,大小等于 batch.size。
- 將消息追加到 ProducerBatch 中岸晦。
- 將新創(chuàng)建的 ProducerBatch 添加到雙端隊(duì)列的末尾欧啤。
- 將該批次加入到 incomplete 容器中,該容器存放未完成發(fā)送到 broker 服務(wù)器中的消息批次启上,當(dāng) Sender 線(xiàn)程將消息發(fā)送到 broker 服務(wù)端后邢隧,會(huì)將其移除并釋放所占內(nèi)存。
總結(jié):整個(gè)append的過(guò)程冈在,基本上就是從雙端隊(duì)列獲取一個(gè)未填充完畢的ProducerBatch(消息批次)倒慧,然后嘗試將其寫(xiě)入到該批次中(緩存、內(nèi)存中),如果追加失敗纫谅,則創(chuàng)建一個(gè)新的ProducerBatch然后繼續(xù)追加炫贤。
3.4 ProducerBatch tryAppend方法
上面RecordAccumulator的append方法,會(huì)去調(diào)用器tryAppend方法付秕,然后會(huì)再去調(diào)用ProducerBatch的tryAppend方法兰珍。
/**
* Append the record to the current record set and return the relative offset within that record set
*
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
// we have to keep every future returned to the users in case the batch needs to be
// split to several new batches and resent.
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
step1:
首先去判斷ProducerBatch是否還能夠容納消息,如果剩余內(nèi)存不足询吴,將直接返回 null掠河。如果返回 null ,會(huì)嘗試再創(chuàng)建一個(gè)新的ProducerBatch汰寓。
step2:
通過(guò) MemoryRecordsBuilder 將消息寫(xiě)入按照 Kafka 消息格式寫(xiě)入到內(nèi)存中口柳,即寫(xiě)入到 在創(chuàng)建 ProducerBatch 時(shí)申請(qǐng)的 ByteBuffer 中。
step3:
更新 ProducerBatch 的 maxRecordSize有滑、lastAppendTime 屬性跃闹,分別表示該批次中最大的消息長(zhǎng)度與最后一次追加消息的時(shí)間。
step4:
構(gòu)建 FutureRecordMetadata 對(duì)象毛好,這里是典型的 Future模式望艺,里面主要包含了該條消息對(duì)應(yīng)的批次的 produceFuture、消息在該批消息的下標(biāo)肌访,key 的長(zhǎng)度找默、消息體的長(zhǎng)度以及當(dāng)前的系統(tǒng)時(shí)間。
step5:
將 callback 吼驶、本條消息的憑證(Future) 加入到該批次的 thunks 中惩激,該集合存儲(chǔ)了 一個(gè)批次中所有消息的發(fā)送回執(zhí)。
4 總結(jié)
Kafka的send方法蟹演,并不會(huì)直接向broker發(fā)送消息风钻,而是首先追加到生產(chǎn)者的內(nèi)存緩存中,其內(nèi)存存儲(chǔ)結(jié)構(gòu)如下:ConcurrentMap<TopicPartition酒请,Deque<ProducerBatch>> batches
骡技,Kafka的生產(chǎn)者會(huì)為每一個(gè)topic的每一個(gè)分區(qū)單獨(dú)維護(hù)一個(gè)隊(duì)列,即ArrayDeque羞反,內(nèi)部存放的元素就是ProducerBatch布朦,即代表一個(gè)批次,即Kafka消息發(fā)送時(shí)按批發(fā)送的昼窗。
KafkaProducer的send方法是趴,最終返回的就是Future的子類(lèi),F(xiàn)uture模式 FutureRecordMetadata
膏秫。所以kafka的消息發(fā)送如何實(shí)現(xiàn)異步右遭,同步發(fā)送也就是這個(gè)返回值決定的做盅。
- 若需要同步發(fā)送,只要拿到send方法的返回結(jié)果后窘哈,調(diào)用get方法吹榴,此時(shí)如果消息未發(fā)送到Broker上,該方法就會(huì)被阻塞滚婉,等到 broker 返回消息發(fā)送結(jié)果后該方法會(huì)被喚醒并得到消息發(fā)送結(jié)果图筹。
- 若需要異步發(fā)送,則建議使用send(ProducerRecord< K, V > record, Callback callback)让腹,但是不能調(diào)用get方法远剩,Callback 會(huì)在收到 broker 的響應(yīng)結(jié)果后被調(diào)用,并且支持?jǐn)r截器骇窍。