When a program sends messages through the Kafka producer, a call to the send method does not immediately push the message to the broker over the network. Instead, multiple messages are grouped into a batch, and it is these batches that are sent to the broker; moreover, the actual sending is not done by the producer thread. How messages are grouped into batches, and what a batch looks like, is the job of the record accumulator (RecordAccumulator).
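The batching idea can be illustrated with a minimal sketch (simplified, hypothetical classes, not Kafka's actual code): records accumulate in a batch until a byte budget, analogous to batch.size, is exhausted, and only whole batches are handed to the network layer.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of producer-side batching (hypothetical, not Kafka's real classes):
// records accumulate in a batch until its byte budget is used up,
// and only whole batches are handed to the network layer.
class SimpleBatch {
    private final int maxBytes;          // analogous to the batch.size config
    private int usedBytes = 0;
    private final List<byte[]> records = new ArrayList<>();

    SimpleBatch(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    // returns false when the batch cannot take this record any more
    boolean tryAppend(byte[] record) {
        if (!records.isEmpty() && usedBytes + record.length > maxBytes)
            return false;
        records.add(record);
        usedBytes += record.length;
        return true;
    }

    int recordCount() {
        return records.size();
    }

    boolean isFull() {
        return usedBytes >= maxBytes;
    }
}

public class BatchDemo {
    public static void main(String[] args) {
        SimpleBatch batch = new SimpleBatch(10);
        System.out.println(batch.tryAppend(new byte[4])); // true
        System.out.println(batch.tryAppend(new byte[4])); // true
        System.out.println(batch.tryAppend(new byte[4])); // false: 12 bytes > 10
        System.out.println(batch.recordCount());          // 2
    }
}
```

When tryAppend returns false, the caller must start a new batch; Kafka's real append method faces exactly this decision, as shown below.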
Let's look at how the accumulator handles messages from the source code's perspective, and at how it works together with the partitioner. The following is the implementation logic of the doSend method; only the accumulator-related part is excerpted here:
// preceding code omitted
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true);

if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false);
}

if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);

if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;
After the message key and value have been serialized and a partition has been chosen by the partitioner, the accumulator's append method is called, so the implementation of append deserves a close look:
/**
 * @param abortOnNewBatch whether to abort instead of creating a new batch. Each partition has
 *        its own deque whose elements are batches; a new message is appended to the last batch
 *        in the deque. If that batch has reached its capacity, a new batch would by default be
 *        created and added to the deque; when this parameter is true, the append aborts
 *        instead of creating that new batch.
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch) throws InterruptedException {
    // track the number of append calls currently in progress
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // fetch the deque for this partition, creating one (and registering it in the map) if absent
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // try to append the message to the last batch in the deque; on success, return immediately
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // If the append failed and abortOnNewBatch is true, return without creating a new batch.
        // The outer doSend passes true on its first call: the sender thread (distinct from the
        // producer thread) sends one batch at a time, so to pack as many messages as possible
        // into each batch, the producer prefers to switch to another partition's deque when the
        // last batch of the current partition's deque cannot take any more messages.
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }

        // estimate the memory this message will need
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // check again whether the producer was closed while we were allocating memory
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            // Why retry the append here? Multiple threads may append to the same partition
            // concurrently, and another thread may already have created a new batch and added
            // it to the deque; so we try the append again instead of creating a batch right away.
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // only if that retry also fails do we build a new batch and add it to the deque
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, time.milliseconds()));

            dq.addLast(batch);
            // every new batch is also added to the incomplete set, which tracks batches
            // that have not yet been sent and acknowledged by the broker
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
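The interesting part of this method is the lock-release-lock dance around the buffer allocation: allocation may block, so it happens outside the lock, and by the time the lock is re-acquired another thread may already have created a fresh batch, which is why tryAppend is attempted a second time before a new batch is built. The same pattern can be shown in a stripped-down sketch (hypothetical names, a StringBuilder standing in for a batch, no real buffer pool):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Stripped-down sketch of append's double-checked pattern (hypothetical names):
// 1) under the lock, try the last batch; 2) outside the lock, do the expensive
// allocation; 3) re-acquire the lock and try again before creating a new batch,
// because a concurrent appender may have created one while we were unlocked.
class SketchAccumulator {
    private final int batchCapacity;
    private final Deque<StringBuilder> deque = new ArrayDeque<>();

    SketchAccumulator(int batchCapacity) {
        this.batchCapacity = batchCapacity;
    }

    // must be called while holding the deque lock
    private boolean tryAppendLast(String record) {
        StringBuilder last = deque.peekLast();
        if (last != null && last.length() + record.length() <= batchCapacity) {
            last.append(record);
            return true;
        }
        return false;
    }

    void append(String record) {
        synchronized (deque) {
            if (tryAppendLast(record))    // fast path: last batch has room
                return;
        }
        // slow path: the expensive "allocation" happens outside the lock
        StringBuilder fresh = new StringBuilder(batchCapacity);
        synchronized (deque) {
            if (tryAppendLast(record))    // another thread may have added a batch meanwhile
                return;
            fresh.append(record);
            deque.addLast(fresh);         // only now create the new batch
        }
    }

    int batchCount() {
        synchronized (deque) {
            return deque.size();
        }
    }
}

public class AccumulatorDemo {
    public static void main(String[] args) throws InterruptedException {
        SketchAccumulator acc = new SketchAccumulator(8);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) acc.append("ab");
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        // 4 threads x 100 records x 2 chars = 800 chars; batches fill to exactly 8
        System.out.println(acc.batchCount()); // 100
    }
}
```

Without the second tryAppendLast under the lock, two threads racing through the slow path would each add a batch, leaving half-empty batches in the deque; the re-check is what keeps batches tightly packed.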
The RecordAppendResult object returned by append contains the following fields:
public final FutureRecordMetadata future; // metadata of the appended record
public final boolean batchIsFull;         // the last batch is full, or the deque holds more than one batch
public final boolean newBatchCreated;     // whether a new batch was created
public final boolean abortForNewBatch;    // the append was aborted rather than creating a new batch; the caller needs to append again
Among them, abortForNewBatch determines whether doSend calls append a second time:
if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false);
}
In the code above, the partitioner's onNewBatch method is invoked to pick a new sticky partition for the topic, and the data is then appended to that new partition. The reason for switching to a different partition was explained in the append implementation above.
When a batch is full, or a new batch has just been created, doSend wakes up the sender thread. One thing worth noting here is that the Kafka producer thread and the sender thread are separate: the producer thread appends messages into batch objects in the underlying deques, while the sender thread keeps taking batches from the deques and sending them to the broker, decoupling message construction from message sending.
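That decoupling can be modeled with a tiny producer/sender pair (a simplified sketch, not Kafka's actual Sender class): the calling thread only enqueues batches, and a dedicated sender thread drains the queue and performs the slow "network" work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Tiny model of the producer/sender split (hypothetical, not Kafka's Sender class):
// the calling thread only enqueues batches; a dedicated sender thread drains
// the queue and performs the slow "network" work, decoupling the two.
public class SenderDemo {
    private static final String POISON = "__CLOSE__"; // shutdown signal

    static List<String> runPipeline(int batches) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> sent = new ArrayList<>();

        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String batch = queue.take(); // blocks until a batch is ready
                    if (batch.equals(POISON))
                        break;
                    sent.add(batch);             // stand-in for the network send
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sender");
        sender.start();

        // "producer" side: enqueue batches without ever touching the network
        for (int i = 0; i < batches; i++)
            queue.put("batch-" + i);
        queue.put(POISON);
        sender.join(); // join gives a happens-before edge, so 'sent' is safely visible
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPipeline(3)); // [batch-0, batch-1, batch-2]
    }
}
```

The blocking take here plays the role that Sender.wakeup() plays in Kafka: the sender sleeps while there is nothing ready and is roused as soon as a full or newly created batch appears.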