public final class RecordAccumulator {
    // Set when the producer is shutting down; ready() then treats every non-empty partition as sendable.
    private volatile boolean closed;
    // Target size in bytes of a single RecordBatch.
    private final int batchSize;
    // Per topic-partition queue of batches waiting to be sent.
    // The Deques are not thread-safe, so every access below synchronizes on the deque itself.
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    // Memory pool that supplies the ByteBuffer backing each new batch.
    private final BufferPool free;
    // Batches that have been created but not yet completed (backed by a Set<RecordBatch>).
    private final IncompleteRecordBatches incomplete;
    // Position in a node's partition list where the previous drain() stopped. The next drain()
    // resumes from here so that partitions late in the list are not starved.
    private int drainIndex;

    /**
     * Appends a record to the last batch queued for {@code tp}, creating a new batch when the
     * last batch has no room (or the queue is empty).
     *
     * @param tp             partition the record is destined for
     * @param timestamp      record timestamp
     * @param key            serialized key, may be null
     * @param value          serialized value, may be null
     * @param callback       user callback run when the batch completes, may be null
     * @param maxTimeToBlock maximum time in ms to block waiting for buffer memory
     * @return the append result; the caller uses it to decide whether to wake up the Sender
     * @throws InterruptedException if interrupted while waiting for buffer memory
     */
    public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            // Fast path: try to append to the last batch already in the queue.
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // No room in the existing batch: allocate a new buffer. This is done OUTSIDE the deque
        // lock because allocate() may block, and holding the lock would stall other appenders.
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        try {
            synchronized (dq) {
                // Another thread may have added a batch with room while we were allocating.
                // If so, use it; the unused buffer is returned to the pool in the finally block.
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult;

                // Build a new batch on top of the freshly allocated buffer.
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
                dq.addLast(batch);
                // The buffer is now owned by the queued batch and must not be returned to the pool.
                buffer = null;
                incomplete.add(batch);
                // Signal "wake the Sender" when there is more than one batch or this one is already full.
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            // Give the buffer back if it was not handed to a queued batch: either the retry above
            // succeeded, or building the batch threw. Otherwise the pool would leak that memory.
            if (buffer != null)
                free.deallocate(buffer);
        }
    }

    /**
     * Determines which nodes have at least one partition whose first batch is ready to send.
     *
     * @param cluster current cluster metadata
     * @param nowMs   current time in ms
     * @return the ready nodes, the delay until the next readiness check, and whether any
     *         partition with queued batches has an unknown leader
     */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;
        // Threads queued on the BufferPool are waiting for memory; sending batches frees it.
        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                // No known leader: report it so the caller can request a metadata update.
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    // Only the oldest batch of the partition decides readiness.
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Full: more than one batch is queued, or the first batch has no room left.
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        // Expired: the batch has waited at least timeToWaitMs.
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // flushInProgress(): some thread is waiting for a flush to complete.
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            // Earliest time this partition could become ready; this bounds how long
                            // the Sender's selector.select() may block.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }

    /**
     * Converts the per-partition queues into per-node lists of batches to send. At most one
     * batch is taken per partition per call, and the total size per node is kept within
     * {@code maxSize} (except that the first batch is always taken).
     *
     * @param cluster current cluster metadata
     * @param nodes   nodes to drain batches for
     * @param maxSize maximum total size in bytes of the batches drained for a single node
     * @param now     current time in ms
     * @return map from node id to the batches drained for that node
     */
    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        Map<Integer, List<RecordBatch>> batchesByNode = new HashMap<>();
        for (Node node : nodes) {
            int size = 0;
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<RecordBatch> ready = new ArrayList<>();
            // Resume from where the previous drain stopped, so no partition is starved.
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                if (!muted.contains(tp)) {
                    Deque<RecordBatch> deque = getDeque(tp);
                    if (deque != null) {
                        synchronized (deque) {
                            RecordBatch first = deque.peekFirst();
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                if (!backoff) {
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        // This request (one per node) would exceed maxSize: stop
                                        // draining this node.
                                        break;
                                    } else {
                                        // Take only one batch per partition, then move on to the
                                        // next partition so every partition gets a turn.
                                        RecordBatch batch = deque.pollFirst();
                                        // Close the records' output stream before the batch is sent.
                                        batch.records.close();
                                        size += batch.records.sizeInBytes();
                                        ready.add(batch);
                                        batch.drainedMs = now;
                                    }
                                }
                            }
                        }
                    }
                }
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            batchesByNode.put(node.id(), ready);
        }
        return batchesByNode;
    }
}
public final class BufferPool {
    // Total memory managed by the pool, in bytes.
    private final long totalMemory;
    // Size of every ByteBuffer kept in the free list; equal to batchSize.
    private final int poolableSize;
    // Guards free, waiters and availableMemory against concurrent allocate/deallocate calls.
    private final ReentrantLock lock;
    // Free list of recycled buffers of exactly poolableSize bytes, so that a batch-sized
    // allocation can skip ByteBuffer.allocate.
    private final Deque<ByteBuffer> free;
    // Conditions of threads blocked in allocate() waiting for memory, in arrival (FIFO) order.
    private final Deque<Condition> waiters;
    // Memory that is neither handed out nor held in the free list.
    private long availableMemory;

    /**
     * Allocates a buffer of {@code size} bytes, blocking while the pool does not have enough memory.
     *
     * @param size             number of bytes to allocate
     * @param maxTimeToBlockMs maximum time in ms to block waiting for memory
     * @return a buffer of at least {@code size} bytes
     * @throws InterruptedException if interrupted while blocked
     * @throws TimeoutException     if the memory could not be obtained within {@code maxTimeToBlockMs}
     */
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        this.lock.lock();
        try {
            // Fast path: an exactly batch-sized request is served from the free list.
            // (This is why records should not exceed batchSize.)
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            int freeListSize = this.free.size() * this.poolableSize;
            if (this.availableMemory + freeListSize >= size) {
                // Enough memory overall: move pooled buffers into availableMemory until size fits,
                // reserve it, and allocate the buffer outside the lock.
                freeUp(size);
                this.availableMemory -= size;
                lock.unlock();
                return ByteBuffer.allocate(size);
            }

            // Not enough memory: wait (FIFO) until deallocations make up the difference.
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory);
            try {
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    boolean waitingTimeElapsed;
                    try {
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        // Charge the time actually spent waiting against the remaining budget.
                        remainingTimeToBlockNs -= Math.max(0L, time.nanoseconds() - startWaitNs);
                    }
                    if (waitingTimeElapsed)
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");

                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // A whole pooled buffer became available and is exactly what we need.
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // Take whatever is available now and keep waiting for the rest.
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }
                // Success: the accumulated memory now belongs to the caller.
                accumulated = 0;
            } finally {
                // On timeout or interrupt, return any partially accumulated memory so the pool
                // does not shrink permanently; in every case drop this thread's waiter entry.
                this.availableMemory += accumulated;
                this.waiters.remove(moreMemory);
                // If memory is left over, wake the next waiter in line.
                if ((this.availableMemory > 0 || !this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }
            lock.unlock();
            return buffer != null ? buffer : ByteBuffer.allocate(size);
        } finally {
            // The lock may already have been released on the success paths above.
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }

    /**
     * Returns a buffer to the pool, using its capacity as the size to release.
     *
     * @param buffer the buffer to return
     */
    public void deallocate(ByteBuffer buffer) {
        deallocate(buffer, buffer.capacity());
    }

    /**
     * Returns {@code size} bytes to the pool. A buffer of exactly poolableSize bytes is
     * cleared and kept in the free list. Otherwise only its size is credited back to
     * availableMemory and the buffer itself is not retained.
     *
     * @param buffer the buffer to return
     * @param size   number of bytes to credit back to the pool
     */
    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                this.availableMemory += size;
            }
            // Wake the longest-waiting allocator, if any.
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of threads currently blocked in allocate() waiting for memory.
     *
     * @return the waiter count
     */
    public int queued() {
        lock.lock();
        try {
            return this.waiters.size();
        } finally {
            lock.unlock();
        }
    }

    // Moves pooled buffers into availableMemory until at least size bytes are available or the
    // free list is empty. Caller must hold the lock.
    private void freeUp(int size) {
        while (!this.free.isEmpty() && this.availableMemory < size)
            this.availableMemory += this.free.pollLast().capacity();
    }
}
public final class RecordBatch {
    // Number of records appended to this batch.
    public int recordCount = 0;
    // Size in bytes of the largest record appended so far.
    public int maxRecordSize = 0;
    // Number of times sending this batch has been attempted.
    public volatile int attempts = 0;
    // Timestamp (ms) of the last send attempt.
    public long lastAttemptMs;
    // Timestamp (ms) at which RecordAccumulator.drain() took this batch off its queue.
    public long drainedMs;
    // Storage for the appended records.
    public final MemoryRecords records;
    // Topic-partition this batch is destined for.
    public final TopicPartition topicPartition;
    // Batch-level result, completed in done() and shared by every record's FutureRecordMetadata.
    public final ProduceRequestResult produceFuture;
    // Timestamp (ms) of the last append to this batch.
    public long lastAppendTime;
    // One entry per record appended with a non-null callback.
    private final List<Thunk> thunks;
    // Offset of the next record relative to the start of the batch.
    private long offsetCounter = 0L;
    // Whether this batch is being retried.
    private boolean retry;

    /**
     * Appends a record to this batch.
     *
     * @return the record's future, or {@code null} if the batch has no room for the record
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value))
            return null;

        // MemoryRecords does the actual write into the buffer.
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        // The per-record future is backed by the batch's produceFuture.
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }

    /**
     * Completes the batch after a successful response, a timeout, or producer shutdown. All
     * record callbacks are run, then the batch's produceFuture is completed.
     * A callback that throws does not prevent the remaining callbacks from running or the
     * future from completing. The first such exception is rethrown afterwards, with any
     * later ones attached as suppressed.
     *
     * @param baseOffset offset the broker assigned to the first record of the batch
     * @param timestamp  broker timestamp, or Record.NO_TIMESTAMP to keep each record's own timestamp
     * @param exception  the failure, or {@code null} on success
     */
    public void done(long baseOffset, long timestamp, RuntimeException exception) {
        RuntimeException callbackFailure = null;
        for (Thunk thunk : this.thunks) {
            try {
                if (exception == null) {
                    RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(),
                                                                 timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                                                                 thunk.future.checksum(),
                                                                 thunk.future.serializedKeySize(),
                                                                 thunk.future.serializedValueSize());
                    thunk.callback.onCompletion(metadata, null);
                } else {
                    thunk.callback.onCompletion(null, exception);
                }
            } catch (RuntimeException e) {
                // Do not let one user callback starve the others or leave the future pending forever.
                if (callbackFailure == null)
                    callbackFailure = e;
                else
                    callbackFailure.addSuppressed(e);
            }
        }
        // Completes the shared result and releases threads blocked in Future.get().
        this.produceFuture.done(topicPartition, baseOffset, exception);
        if (callbackFailure != null)
            throw callbackFailure;
    }

    private static final class Thunk {
        // User callback for one record.
        final Callback callback;
        // Future returned to the user for that record.
        final FutureRecordMetadata future;

        Thunk(Callback callback, FutureRecordMetadata future) {
            this.callback = callback;
            this.future = future;
        }
    }
}
// A Future<RecordMetadata> for a single record, backed by the ProduceRequestResult of the batch
// the record was appended to.
public final class FutureRecordMetadata implements Future<RecordMetadata> {
    // Result of the batch containing this record; shared with the batch's other records.
    private final ProduceRequestResult result;
    // Offset of this record within its batch.
    private final long relativeOffset;

    /**
     * Waits for the batch to complete and returns this record's metadata.
     *
     * @throws ExecutionException if the batch failed
     */
    @Override
    public RecordMetadata get() throws InterruptedException, ExecutionException {
        // Delegates the blocking wait to the batch result.
        this.result.await();
        return valueOrError();
    }

    // Returns the metadata, or throws the batch failure wrapped in an ExecutionException.
    RecordMetadata valueOrError() throws ExecutionException {
        if (this.result.error() != null)
            throw new ExecutionException(this.result.error());
        return value();
    }

    // Builds this record's metadata from the batch's base offset and this record's relative offset.
    RecordMetadata value() {
        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
                                  this.timestamp, this.checksum, this.serializedKeySize, this.serializedValueSize);
    }

    /**
     * Completes the batch result this future is backed by. This class holds no completion
     * state of its own, so the call is forwarded to the shared ProduceRequestResult.
     * NOTE(review): this completes the result for every record of the batch, not only this one.
     */
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.result.done(topicPartition, baseOffset, error);
    }
}
// Completion result shared by all records of a RecordBatch. It does not implement Future;
// a CountDownLatch provides the "block until done" behavior.
public final class ProduceRequestResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    // Partition the batch was sent to; set by done().
    private volatile TopicPartition topicPartition;
    // Offset the broker assigned to the first record of the batch; -1 until done() is called.
    // Each record's offset is baseOffset plus its offset relative to the batch.
    private volatile long baseOffset = -1L;
    // Non-null if the batch failed.
    private volatile RuntimeException error;

    /**
     * Records the outcome of the batch and releases every thread blocked in {@link #await()}.
     *
     * @param topicPartition partition the batch was sent to
     * @param baseOffset     offset of the batch's first record
     * @param error          the failure, or {@code null} on success
     */
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        this.error = error;
        this.latch.countDown();
    }

    /** Blocks until {@link #done} has been called. */
    public void await() throws InterruptedException {
        latch.await();
    }

    /** Returns the partition set by {@link #done}, or {@code null} before completion. */
    public TopicPartition topicPartition() {
        return topicPartition;
    }

    /** Returns the base offset set by {@link #done}, or -1 before completion. */
    public long baseOffset() {
        return baseOffset;
    }

    /** Returns the failure set by {@link #done}, or {@code null} on success or before completion. */
    public RuntimeException error() {
        return error;
    }
}
public class MemoryRecords implements Records {
// Writes (optionally compressed) records into buffer. Compression mainly pays off when network bandwidth is the bottleneck.
private final Compressor compressor;
// Maximum number of bytes that may be written into buffer once it already holds a record (see hasRoomFor).
private final int writeLimit;
// Buffer holding the record data.
private ByteBuffer buffer;
// Whether records may still be appended. Per the original notes, it is set to false (read-only)
// before the records are sent. NOTE(review): the code that clears it is not shown here.
private boolean writable;
// Private constructor; instances are created through emptyRecords(). Body elided in this excerpt.
private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
...
}
// The only way to create a MemoryRecords: returns a new, writable instance over buffer.
public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
return new MemoryRecords(buffer, type, true, writeLimit);
}
// Appends record at the given offset.
// Throws IllegalStateException if this MemoryRecords is no longer writable.
public void append(long offset, Record record) {
// Reject appends once read-only.
if (!writable)
throw new IllegalStateException("Memory records is not writable");
// Write offset, size and the record bytes through the compressor into the buffer.
int size = record.size();
compressor.putLong(offset);
compressor.putInt(size);
compressor.put(record.buffer());
compressor.recordWritten(size + Records.LOG_OVERHEAD);
// Reset the record's buffer position to 0 after it was consumed by put(), presumably so the Record can be read again.
record.buffer().rewind();
}
// Estimates whether a record with this key/value still fits; always false once read-only.
// For the first record, only initialCapacity is checked (presumably the buffer's capacity at
// construction; not shown here). Afterwards, the compressor's estimate of bytes written must stay
// within writeLimit. Because this is an estimate of compressed size, it can be low, in which case
// the underlying ByteBuffer is grown by ByteBufferOutputStream.
public boolean hasRoomFor(byte[] key, byte[] value) {
if (!this.writable)
return false;
return this.compressor.numRecordsWritten() == 0 ?
this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
}
public class Compressor {
    // Stream records are written to; bufferStream wrapped with the configured compression codec.
    private final DataOutputStream appendStream;
    // Wraps the ByteBuffer. ByteBufferOutputStream grows the buffer automatically when a write
    // exceeds its capacity.
    private final ByteBufferOutputStream bufferStream;

    /**
     * Creates a compressor that writes into {@code buffer} using the given compression type.
     *
     * @param buffer destination buffer (the MemoryRecords buffer)
     * @param type   compression type configured on the producer (the producer default is NONE)
     */
    public Compressor(ByteBuffer buffer, CompressionType type) {
        this.type = type;
        bufferStream = new ByteBufferOutputStream(buffer);
        // Decorator chain: compressor.put -> appendStream (codec) -> bufferStream -> buffer,
        // so all data ends up in the MemoryRecords buffer.
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
    }

    /**
     * Wraps {@code buffer} in a DataOutputStream that compresses with the given codec.
     *
     * @param buffer     stream to write compressed bytes into
     * @param type       compression codec
     * @param bufferSize internal buffer size for codecs that take one (GZIP, SNAPPY)
     * @return the wrapping stream
     * @throws IllegalStateException    if the codec's output stream cannot be created
     * @throws IllegalArgumentException if {@code type} is not a supported compression type
     */
    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                // GZIPOutputStream ships with the JDK, so no reflection is needed. Its constructor
                // writes the GZIP header and therefore declares IOException.
                try {
                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                } catch (java.io.IOException e) {
                    throw new IllegalStateException("Failed to create GZIP output stream", e);
                }
            case SNAPPY:
                // Snappy is an optional dependency. Creating its stream reflectively keeps this class
                // compiling and loading when the library is absent and Snappy is not used.
                try {
                    OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new IllegalStateException("Failed to create SNAPPY output stream", e);
                }
            case LZ4:
                // Also created reflectively, like SNAPPY.
                try {
                    OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new IllegalStateException("Failed to create LZ4 output stream", e);
                }
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    }

    // Writes one record into the stream by delegating to Record.write, tracking the largest timestamp seen.
    private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        Record.write(this, crc, attributes, timestamp, key, value, valueOffset, valueSize);
    }
}
RecordAccumulator.java
©著作權歸作者所有,轉載或內容合作請聯系作者
- 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事。” “怎么了?”我有些...
- 文/不壞的土叔 我叫張陵,是天一觀的道長。 經常有香客問我,道長,這世上最難降的妖魔是什么? 我笑而不...
- 正文 為了忘掉前任,我火速辦了婚禮,結果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好,可當我...
- 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發上,一...
- 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側響起,我...
- 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響,放射性物質發生泄漏。R本人自食惡果不足惜,卻給世界環境...
- 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春,著一層夾襖步出監牢的瞬間,已是汗流浹背。 一陣腳步聲響...
推薦閱讀更多精彩內容
- org.springframework.beans.factory.UnsatisfiedDependencyEx...
- 1.有很多人是通過自學,比如買些專業的java書籍、或者通過網上免費視頻學習、進入相應的java論壇等等。ja...
- 因為生病,床上整整躺了兩天,感覺自己快羽化登仙了。作為一個矯情的人,不免會想起在家時的種種好,真是懷念又讓人心酸...
- 你好,我是玉石玩家張麗云,每天寫一篇原創文章分享我的經驗和觀察,希望能給你一些啟發和幫助。這是第77篇原創文章。 ...
- 背景是遇到需要實現選擇不同的選項后跳轉頁面,有一些是跳的外鏈,所以需要在新的選項卡打開。 Location可以實現...