public final class RecordAccumulator {

    // True once the accumulator is being closed (the Sender thread is preparing to shut down).
    // While set, ready() treats first batches as sendable (unless they are backing off).
    private volatile boolean closed;
    // Target batch size in bytes: the minimum buffer size append() requests from the pool and
    // the write limit handed to each new MemoryRecords.
    private final int batchSize;
    // Buffered record batches, one deque per TopicPartition.
    // A Deque is not thread-safe, so every access below is made while synchronized on the deque.
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    // Memory pool that append() allocates new batch buffers from.
    private final BufferPool free;
    // Batches that have not finished sending. The original note says this is backed by a
    // Set<RecordBatch>; that is not visible in this excerpt.
    private final IncompleteRecordBatches incomplete;
    // Bookmark for drain(): the index in a node's partition list where the previous drain stopped.
    // The next drain resumes from here so that later partitions are not starved.
    private int drainIndex;

    // Appends one record to the partition's deque of batches.
    // It first tries the last existing batch. If that fails, it allocates a buffer from the pool
    // (outside the deque lock). Then, holding the lock again, it either retries the append or
    // builds a new batch around the allocated buffer.
    // NOTE(review): this excerpt omits several lines (closing braces, returning the unused
    // buffer to the pool, adding the new batch to the deque/incomplete); it does not compile as shown.
    public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // Look up (or create) the deque for this topic-partition.
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            // Try to append to the last batch in the deque.
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                // Appended to an existing batch: done.
                return appendResult;

        // The append failed, so request buffer memory for a new batch.
        // This runs outside the synchronized block. The dq lock is released while this thread
        // may block in allocate(), so other threads (e.g. ones whose records still fit) can keep
        // appending in the meantime.
        // The buffer is at least batchSize bytes, or larger if this single record needs more.
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // A non-null result means the second tryAppend succeeded: while the dq lock was
                // released for the allocation, another thread made room in the deque.
                // The buffer allocated above is therefore unused and must go back to the pool.
                // NOTE(review): the statement returning `buffer` to `free` is not in this excerpt.
                return appendResult;
            // Build a new batch around the freshly allocated buffer.
            // The buffer obtained from `free` becomes the storage of the new MemoryRecords.
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            // The new batch is supposed to be added to the deque and to `incomplete` here.
            // NOTE(review): those statements are not in this excerpt.
            // The second argument reports whether a batch is full (more than one batch queued,
            // or the new batch is already full). Per the original note, the caller uses it to
            // decide whether to wake up the Sender thread.
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);

    // Determines which nodes currently have sendable data.
    // Returns the ready nodes, the delay until the next readiness check, and whether any
    // partition has no known leader.
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;

        // Pool exhausted: other threads are queued waiting for the BufferPool to free memory,
        // so batches should be sent in order to release memory.
        // NOTE(review): the left operand of `> 0` is missing in this excerpt (presumably the
        // number of waiters queued in `free`); this line does not compile as shown.
        boolean exhausted = > 0;
        // Visit every topic-partition.
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();

            // Look up the partition's leader node.
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                // Leader unknown: report it (presumably so the caller refreshes metadata).
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    // Peek at (without removing) the first batch while holding the deque lock.
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Full: the deque holds more than one batch, or the first batch is full.
                        // (Per the original note, a batch is full once its buffer position
                        // exceeds writeLimit; isFull() itself is not shown here.)
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        // Expired: the batch has waited at least lingerMs (or retryBackoffMs when backing off).
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // flushInProgress(): some thread is waiting for a flush operation to complete.
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            // The leader qualifies as a ready node.
                            // NOTE(review): the statement adding `leader` to readyNodes is missing
                            // from this excerpt.
                        } else {
                            // Track the shortest remaining wait. Per the original note, this is one
                            // input to how long the caller blocks before calling ready() again.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);

    // Converts the per-partition batches into a map of node id -> batches to send to that node.
    // For each given node, it visits the node's partitions round-robin starting at drainIndex,
    // taking at most one batch per partition. It is meant to stop collecting once the next batch
    // would push the node's total above maxSize; no batch is rejected for size while `ready` is
    // still empty.
    // NOTE(review): several statements are missing from this excerpt (method arguments,
    // `break`, adding to `ready`, closing braces); it does not compile as shown.
     public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {

        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
        // Visit every node that has data to send.
        for (Node node : nodes) {
            int size = 0;
            // The partitions led by this node.
            // NOTE(review): the argument is missing here (presumably node.id()).
            List<PartitionInfo> parts = cluster.partitionsForNode(;
            // Batches collected for this node.
            List<RecordBatch> ready = new ArrayList<>();
            // drainIndex is the "resume from here" bookmark left by the previous drain.
            // NOTE(review): `% parts.size()` throws ArithmeticException if the node has no partitions.
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                if (!muted.contains(tp)) {
                    // Get this partition's deque of batches.
                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                    if (deque != null) {
                        synchronized (deque) {
                            // Peek at the first batch.
                            RecordBatch first = deque.peekFirst();
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                if (!backoff) {
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        // Adding this batch would exceed maxSize (the configured size
                                        // limit of a single request), so stop collecting. The outer
                                        // loop is over nodes, so the limit applies per node.
                                        // NOTE(review): the `break` is missing from this excerpt.
                                    } else {
                                        // Take only one batch per partition per pass. drainIndex then
                                        // advances to the next partition, so that one busy partition
                                        // does not starve the others.
                                        RecordBatch batch = deque.pollFirst();
                                        // Per the original note, the batch's output stream is closed here.
                                        // NOTE(review): that close, and adding `batch` to `ready`, are
                                        // not in this excerpt.
                                        size += batch.records.sizeInBytes();
                                        batch.drainedMs = now;
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            // NOTE(review): the map key is missing here (presumably node.id()).
            batches.put(, ready);
        return batches;

public final class BufferPool {
    // Total amount of memory managed by this pool, in bytes.
    private final long totalMemory;
    // Size of each ByteBuffer kept in the `free` queue (the original note says it equals batchSize).
    // Only requests of exactly this size are served from `free`.
    private final int poolableSize;
    // Guards concurrent allocation and deallocation of ByteBuffers by multiple threads.
    private final ReentrantLock lock;
    // The pool proper: a queue of cached ByteBuffers.
    // Reusing pooled buffers avoids the cost of creating them with ByteBuffer.allocate.
    private final Deque<ByteBuffer> free;
    // Threads blocked because they could not obtain enough memory, recorded as the Condition
    // each blocked thread waits on.
    private final Deque<Condition> waiters;
    // Unpooled memory still available (original note: "totalMemory - free"). allocate() deducts
    // from it and deallocate() credits non-pooled sizes back to it.
    private long availableMemory;

    // Called when appending records.
    // Allocates a buffer of `size` bytes from the pool, blocking the calling thread when not
    // enough memory is available.
    // size: number of bytes requested; maxTimeToBlockMs: maximum time to wait for memory.
    // Throws TimeoutException if the memory cannot be obtained within maxTimeToBlockMs.
    // NOTE(review): this excerpt omits several statements (lock acquisition, condition operands,
    // waiter registration, signal, unlock, closing braces); it does not compile as shown.
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        // Acquire the lock. NOTE(review): the lock() call is not in this excerpt.
        try {
            if (size == poolableSize && !
                // The request is exactly one pooled ByteBuffer: take one straight from the queue.
                // This is why a single record should not exceed batchSize: larger requests can
                // never be served from the pool.
                // NOTE(review): the rest of this condition and its body are missing here.

            // Bytes currently held by pooled buffers.
            // NOTE(review): the left operand (presumably the number of buffers in `free`) is missing.
            int freeListSize = * this.poolableSize;
            // Pooled buffers plus remaining unpooled memory are enough for the request.
            if (this.availableMemory + freeListSize >= size) {
                // First release pooled ByteBuffers from `free` into availableMemory, one at a time,
                // until `size` is covered; then deduct `size` from availableMemory.
                // NOTE(review): the call releasing pooled buffers is not in this excerpt.
                this.availableMemory -= size;
                return ByteBuffer.allocate(size);
            } else {
                int accumulated = 0;
                ByteBuffer buffer = null;
                // Condition this thread blocks on until enough memory is available.
                Condition moreMemory = this.lock.newCondition();
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                // Register in the waiters queue. NOTE(review): the add is not in this excerpt.
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    // NOTE(review): timeNs is never assigned in this excerpt (presumably the time
                    // elapsed since startWaitNs); as shown it is used uninitialized below.
                    long timeNs;
                    // Block on the condition; await returns false if the waiting time elapsed.
                    boolean waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);

                    if (waitingTimeElapsed) {
                        // The wait timed out: give up by throwing.
                        // NOTE(review): no removal of moreMemory from `waiters` is visible on this path.
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");

                    // Some memory may have been released, but possibly not enough for `size`,
                    // so deduct the time already waited and keep waiting.
                    remainingTimeToBlockNs -= timeNs;
                    // The request is exactly one pooled ByteBuffer and nothing has been accumulated
                    // yet: take a pooled buffer. NOTE(review): the expressions here are missing.
                    if (accumulated == 0 && size == this.poolableSize && ! {
                        buffer =;
                        accumulated = size;
                    } else {
                        // Otherwise, presumably release pooled buffers via freeUp(), then take as
                        // much unpooled memory as is available, up to what is still missing.
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;

                // The waiters queue is FIFO. Its head is removed here (presumably this thread's own
                // condition; the excerpt does not check that `removed == moreMemory`).
                Condition removed = this.waiters.removeFirst();
                // If memory remains, wake up the next thread blocked waiting for memory.
                // NOTE(review): the rest of this condition and the signal call are missing here.
                if (this.availableMemory > 0 || ! {
                    if (!this.waiters.isEmpty())

                // No pooled buffer was obtained: allocate a new one of the requested size.
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                    // Otherwise return the pooled buffer (an `else` appears lost in this excerpt).
                    return buffer;
        } finally {
            // Release the lock if this thread still holds it.
            // NOTE(review): the unlock() call is not in this excerpt.
            if (lock.isHeldByCurrentThread())

    // Returns memory to the pool.
    // A buffer whose size and capacity both equal poolableSize is meant to go back into `free`;
    // any other size is only credited back to availableMemory.
    // NOTE(review): this excerpt omits the lock/unlock, the statement re-queuing the buffer, the
    // signal call and closing braces; it does not compile as shown.
    public void deallocate(ByteBuffer buffer, int size) {
        try {
            // Exactly one pooled-size ByteBuffer: put it back into the `free` queue.
            if (size == this.poolableSize && size == buffer.capacity()) {
            } else {
                // Any other size: add it to availableMemory; the buffer itself is not put back
                // into the `free` queue (the original author flagged this with a question mark).
                this.availableMemory += size;
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                // Wake up the first thread blocked waiting for available memory.
        } finally {

public final class RecordBatch {
    // Number of records in this batch.
    public int recordCount = 0;
    // Size in bytes of the largest record appended so far.
    public int maxRecordSize = 0;
    // Number of attempts made to send this batch.
    public volatile int attempts = 0;
    // Timestamp of the last attempt to send this batch.
    public long lastAttemptMs;
    // Where the record data is ultimately stored.
    public final MemoryRecords records;
    // The topic partition this batch is sent to.
    public final TopicPartition topicPartition;
    // Future-like object representing this batch's state; every record's FutureRecordMetadata
    // created in tryAppend() refers to it.
    public final ProduceRequestResult produceFuture;
    // Timestamp of the last append to this batch.
    public long lastAppendTime;
    // Per-record callbacks paired with their futures; invoked by done().
    private final List<Thunk> thunks;
    // Relative offset to assign to the next record appended to this batch.
    private long offsetCounter = 0L;
    // Whether this batch is being retried.
    private boolean retry;

    // Appends one record to this batch.
    // Returns null when the underlying MemoryRecords has no room for it. Otherwise it writes the
    // record and returns a FutureRecordMetadata bound to this batch's produceFuture.
    // NOTE(review): closing braces are missing from this excerpt.
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            // No room left in the buffer.
            return null;
        } else {
            // Delegate the actual write to MemoryRecords; each record gets the next relative offset.
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            // The per-record future delegates to produceFuture. Per the original note, produceFuture's
            // latch is what provides the asynchronous Future behaviour. recordCount is passed as
            // (presumably) the record's relative offset.
            // NOTE(review): recordCount is not incremented anywhere in this excerpt. If that
            // statement is genuinely missing, every record gets the same relative offset.
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                // thunks collects the callbacks of all records in this batch.
                thunks.add(new Thunk(callback, future));
            return future;

    // Completes this batch. Per the original note, it is called when the batch receives a normal
    // response, expires, or the producer is closed.
    // It invokes every record's callback with either its RecordMetadata or the exception, then
    // completes produceFuture.
    // NOTE(review): the tail of the RecordMetadata argument list and closing braces are missing
    // from this excerpt.
    public void done(long baseOffset, long timestamp, RuntimeException exception) {
        // Run the callback of every record in the batch.
        for (int i = 0; i < this.thunks.size(); i++) {
            Thunk thunk = this.thunks.get(i);
            if (exception == null) {
                // Build the record's metadata from the server-assigned base offset and the record's
                // relative offset. When the response timestamp is NO_TIMESTAMP, the record's own
                // timestamp is used instead.
                RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset(),
                                                             timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                thunk.callback.onCompletion(metadata, null);
            } else {
                thunk.callback.onCompletion(null, exception);
        // Complete the batch-level future (per the original note, this counts down its latch).
        this.produceFuture.done(topicPartition, baseOffset, exception);

    // Pairs a record's user callback with that record's future.
    final private static class Thunk {
        // Callback supplied for the corresponding record.
        final Callback callback;
        // Future of the corresponding record; used by done() to build its RecordMetadata.
        final FutureRecordMetadata future;

// As the class name literally says, a Future holding RecordMetadata: the per-record Future whose
// result is derived from the batch-level ProduceRequestResult.
public final class FutureRecordMetadata implements Future<RecordMetadata> {

    // Reference to the produceFuture of the RecordBatch this record belongs to.
    private final ProduceRequestResult result;
    // Offset of this record within its batch.
    private final long relativeOffset;

    // Implements Future.get().
    // NOTE(review): closing braces are missing from this excerpt.
    public RecordMetadata get() throws InterruptedException, ExecutionException {
        // Per the original note, this ultimately delegates to produceFuture's latch.await().
        // NOTE(review): that wait is not visible in this excerpt; only valueOrError() is called.
        return valueOrError();

    // Throws an ExecutionException wrapping the batch error if one was recorded; otherwise returns
    // this record's metadata.
    RecordMetadata valueOrError() throws ExecutionException {
        if (this.result.error() != null)
            throw new ExecutionException(this.result.error());
            // Reached only when there is no error; despite the indentation, this is not inside the if.
            return value();

    // Builds this record's RecordMetadata. Once the producer has received the batch's response,
    // this is what get() returns.
    // NOTE(review): timestamp, checksum, serializedKeySize and serializedValueSize are fields that
    // are not declared in this excerpt.
    RecordMetadata value() {
        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
                                  this.timestamp, this.checksum, this.serializedKeySize, this.serializedValueSize);

    // Per the original note, called while the Sender processes the server's response and invokes
    // the callbacks.
    // NOTE(review): this mirrors ProduceRequestResult.done. It writes topicPartition, baseOffset
    // and error, none of which are among this class's visible fields; confirm it belongs here.
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        this.error = error;
        // Notify the future that a result is available.
        // NOTE(review): the notification statement is missing from this excerpt.

public final class ProduceRequestResult {
    // This class does not implement Future; a CountDownLatch(1) provides the Future-like
    // "wait until done" behaviour.
    private final CountDownLatch latch = new CountDownLatch(1);
    // Offset the server assigned to the first record of the RecordBatch.
    // Each record in the batch derives its own offset in the server-side partition from this
    // base offset (plus its relative offset).
    private volatile long baseOffset = -1L;
    // Error from the response; null when the response was normal.
    private volatile RuntimeException error;

    // Called by RecordBatch to report the response for the whole batch.
    // NOTE(review): the topicPartition field and the latch count-down are not visible in this
    // excerpt, and closing braces are missing.
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        // Distinguishes a normal response (null) from an error response.
        this.error = error;
        // Wake up the threads blocked in latch.await().

public class MemoryRecords implements Records {

    // Writes (optionally compressed) records into `buffer`. Compression mainly pays off when
    // network bandwidth is the bottleneck.
    private final Compressor compressor;
    // Maximum number of bytes that may be written into the buffer.
    private final int writeLimit;
    // Holds the record data.
    private ByteBuffer buffer;
    // Set to false (read-only) before this MemoryRecords is sent.
    private boolean writable;

    // Private constructor: instances are created only through emptyRecords().
    // NOTE(review): the constructor body and closing braces are missing from this excerpt.
    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {

    // The only way to create a MemoryRecords instance: an empty, writable one over `buffer`.
    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
        return new MemoryRecords(buffer, type, true, writeLimit);

    // Appends a record at the given offset.
    // Throws IllegalStateException if this MemoryRecords is read-only.
    public void append(long offset, Record record) {
        // Reject writes once read-only.
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");

        // Write the record into the ByteBuffer through the compressor.
        // NOTE(review): the statements that actually write the offset, size and record bytes are
        // missing from this excerpt; only the written-size bookkeeping is shown.
        int size = record.size();
        compressor.recordWritten(size + Records.LOG_OVERHEAD);

    // Estimates whether there is room left to write a record with this key and value. The estimate
    // matters mainly for compressed data; per the original note, an inaccurate estimate causes the
    // underlying ByteBuffer to be expanded.
    // Always false once read-only. The first record is checked against initialCapacity; later ones
    // are checked against writeLimit using the compressor's estimate of bytes written so far.
    // NOTE(review): initialCapacity is not declared in this excerpt.
    public boolean hasRoomFor(byte[] key, byte[] value) {
        if (!this.writable)
            return false;

        return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);


public class Compressor {
    // Output stream with compression applied; it wraps bufferStream.
    private final DataOutputStream appendStream;
    // Wraps the ByteBuffer. Per the original note, ByteBufferOutputStream expands automatically
    // when written data exceeds the ByteBuffer's capacity.
    private final ByteBufferOutputStream bufferStream;

    // Sets up the stream chain that writes (optionally compressed) data into `buffer`.
    // NOTE(review): the `type` field and closing braces are not in this excerpt.
    public Compressor(ByteBuffer buffer, CompressionType type) {
        // Compression type passed in by KafkaProducer.
        this.type = type;
        // create the stream
        bufferStream = new ByteBufferOutputStream(buffer);
        // Create the stream that matches the compression type (per the original note, the default
        // is NONE, i.e. no compression).
        // Decorator pattern: a DataOutputStream layered over the ByteBufferOutputStream.
        // Resulting chain: compressor.put -> appendStream -> bufferStream -> buffer
        // so all data ends up in the MemoryRecords' buffer.
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);

    // Builds the output stream for the given compression type on top of `buffer`.
    // NOTE(review): as shown, `stream` is declared in both the SNAPPY and LZ4 cases. All cases of
    // a switch block share one scope in Java, so this does not compile; the full source presumably
    // scopes them separately. The excerpt also lacks closing braces.
    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                // GZIPOutputStream ships with the JDK, so no reflection is needed.
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                // Snappy requires an extra dependency. So that the code compiles without that
                // dependency when Snappy is not used, the stream is created reflectively.
                OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                return new DataOutputStream(stream);
            case LZ4:
                // Likewise created reflectively, via lz4OutputStreamSupplier.
                OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                return new DataOutputStream(stream);

    // Writes one record into the buffer: updates maxTimestamp, then delegates the actual writing
    // to the static Record.write method.
    private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        Record.write(this, crc, attributes, timestamp, key, value, valueOffset, valueSize);
