RocketMQ 存儲機制源碼解析

producer 發(fā)送消息后析既,broker端開始存儲消息者疤,會調(diào)用 store 模塊的 DefaultMessageStore.putMessage 進行存儲消息畜埋。

DefaultMessageStore.putMessage

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // 判斷運行狀態(tài)
    if (this.shutdown) {
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    // 如果當前節(jié)點為 SLAVE 則禁止寫入
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    // 判斷是否有可寫權(quán)限
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }
    // 判斷Topic 長度不能超過 127
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
    // properties 屬性長度不能超過 32767
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }
    // 判斷當前系統(tǒng)是否正忙
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    // 調(diào)用 commitLog.putMessage 插入消息
    PutMessageResult result = this.commitLog.putMessage(msg);

    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    // 添加統(tǒng)計消息
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}

判斷系統(tǒng)是否繁忙

public boolean isOSPageCacheBusy() {
    // 獲取 putMessage 寫入鎖的時間,如果當前沒有正在寫的消息谤碳,則該值為0
    long begin = this.getCommitLog().getBeginTimeInLock();
    long diff = this.systemClock.now() - begin;
    return diff < 10000000
            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}

如果當前沒有正在寫入消息佑钾,則 begin = 0;
如果當前有正在寫入的消息伞辛,則 begin 為寫入消息獲取開始時間烂翰。

CommitLog.putMessage()

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // 設(shè)置存儲時間
        msg.setStoreTimestamp(System.currentTimeMillis());
        // 設(shè)置消息的校驗信息
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // 添加 延遲消息
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            
            if (msg.getDelayTimeLevel() > 0) {
                // 如果設(shè)置的延遲消息級別大于系統(tǒng)默認的最大值,則設(shè)置默認的最大值(默認最大級別為18)
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                // 根據(jù)延遲的級別獲取對應(yīng)延遲隊列的 queue id
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 把延時消息先存儲到延時的消息隊列文件中
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        // 獲取最后一個 mappedFile 文件(最后一個是可寫入的)
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // put message 需要獲取鎖蚤氏,配置文件中可以配置使用自旋鎖或重入鎖
        putMessageLock.lock();
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // 設(shè)置存儲時間甘耿,為了確保全局有序
            msg.setStoreTimestamp(beginLockTimestamp);

            // 如果 mappedFile 為空或已滿,則創(chuàng)建一個新的 mappedFile
            if (null == mappedFile || mappedFile.isFull()) {
                // 該方法中會判斷 lastMappedFile 是否可用竿滨,如果沒有可用的則創(chuàng)建
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            // 向文件中添加信息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // 文件不足寫下這條消息佳恬,創(chuàng)建一個新的 commitlog 文件
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }
        // 存儲消息超過500毫秒,打印警告信息
        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
        // 設(shè)置返回結(jié)果
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // 添加統(tǒng)計信息
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        // 刷盤策略(異步刷盤還是同步刷盤)
        handleDiskFlush(result, putMessageResult, msg);
        // 同步策略(消息同步Slave:實時同步還是異步同步)
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }

延時消息

如果發(fā)送的消息是延時消息于游,則把消息的 topic 和 queueid 修改為延時消息隊列的信息毁葱。

發(fā)送消息的時候通過 setDelayTimeLevel(int level) 方法設(shè)置延遲級別。這個級別目前只支持1-18贰剥。

每一個級別對應(yīng)一個固定的延時時間倾剿,代碼如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
public boolean parseDelayLevel() {
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}

可以看出 最大級別是18,最大延時的時間是2小時鸠澈。

獲取要寫入的 CommitLog 文件

調(diào)用this.mappedFileQueue.getLastMappedFile(0);獲取要寫入的 CommitLog 文件
因為 CommitLog 是順序?qū)懙闹妫詈笠粋€ CommitLog 肯定是要寫如的文件截驮。
如果獲取不到,或者最后的一個CommitLog 文件已經(jīng)寫滿則創(chuàng)建一個际度。

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    // 獲取要寫入的CommitLog文件對應(yīng)的 MappedFile
    MappedFile mappedFileLast = getLastMappedFile();

    //mappedFileLast 為空或者最后一個對象對應(yīng)的文件已經(jīng)寫滿葵袭,則創(chuàng)建一個新的文件(即新的 MapedFile 對象) ;
    //計算出新文件的起始偏移量(起始偏移量就是文件名稱)
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    if (createOffset != -1 && needCreate) {
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;
         // 判斷 allocateMappedFileService 服務(wù)是否初始化乖菱,并創(chuàng)建下一個文件和下下一個文件
        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            // 如果allocateMappedFileService 沒有創(chuàng)建坡锡,則直接創(chuàng)建文件
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }
        //最后將創(chuàng)建或返回的 MapedFile 對象存入 MapedFileQueue 的 MapedFile 列表中,并返回該 MapedFile 對象給調(diào)用者
        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}
  • 1窒所、獲取要寫入的CommitLog文件對應(yīng)的 MappedFile鹉勒。
  • 2、如果 needCreate = false吵取,則直接返回 mappedFileLast 禽额。
  • 3、如果 needCreate = true
    3.1皮官、如果allocateMappedFileService 已創(chuàng)建脯倒,構(gòu)建需要創(chuàng)建的 MappedFile 對應(yīng)的文件路徑。這里連續(xù)構(gòu)建兩個 MappedFile捺氢。
    然后調(diào)用AllocateMappedFileService.putRequestAndReturnMappedFile去創(chuàng)建 MappedFile藻丢。
    3.2、如果allocateMappedFileService 未創(chuàng)建摄乒,則直接使用new MappedFile()創(chuàng)建CommitLog 文件悠反。

MappedFile 對應(yīng)的是一個映射文件。
RocketMQ 中 CommitLog馍佑、ConsumerQueue斋否、index等都是使用的 MappedFile進行創(chuàng)建的。這里分析的是 CommitLog 的創(chuàng)建過程挤茄,所以 MappedFile 對應(yīng)的就是一個 CommitLog文件如叼。

putRequestAndReturnMappedFile

AllocateMappedFileService.putRequestAndReturnMappedFile

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    int canSubmitRequests = 2;
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
            canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
        }
    }

    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    // 嘗試向 ConcurrentHashMap 中存放 nextReq ,如果存放失敗說明有別的線程已經(jīng)創(chuàng)建文件
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    //存放成功穷劈,則進行向requestQueue中存放數(shù)據(jù)
    if (nextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        canSubmitRequests--;
    }
    // 再創(chuàng)建一個 CommitLog 文件
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }

    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }

    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }

    return null;
}

創(chuàng)建下一個 CommitLog 文件和下下一個CommitLog文件笼恰。
1、構(gòu)建 AllocateRequest nexReq 對象歇终,并把該對象存放到 requestQueue 隊列中
2社证、構(gòu)建 AllocateRequest nextNextReq 對象,并把該對象存放到 requestQueue 隊列中

然后在 AllocateMappedFileService.run() 方法中從 requestQueue 阻塞中獲取需要創(chuàng)建的任務(wù)评凝,并創(chuàng)建文件追葡。

創(chuàng)建文件的優(yōu)先級

AllocateRequest 對象實現(xiàn)了 Comparable 接口,把 AllocateRequest 保存到 requestQueue 隊列中,該隊列是 PriorityBlockingQueue 優(yōu)先級隊列宜肉≡染可以根據(jù)優(yōu)先級進行創(chuàng)建文件。
優(yōu)先級的根據(jù) 先根據(jù)文件的大小進行排序谬返,大的文件先創(chuàng)建之斯;
如果文件大小相同,則根據(jù)文件的起始偏移量進行排序遣铝,偏移量小的優(yōu)先創(chuàng)建佑刷。

為什么大文件先創(chuàng)建呢?

因為大文件肯定是 CommitLog 文件酿炸。只要優(yōu)先把CommitLog 文件創(chuàng)建完成那么就可以接受 Producer端的請求瘫絮。然后再根據(jù) CommitLog 中的數(shù)據(jù)生存 ConsumerQueue 和 index 相關(guān)的數(shù)據(jù)。

創(chuàng)建 CommitLog 文件

AllocateMappedFileService.run() 方法

public void run() {
    log.info(this.getServiceName() + " service started");
    // 循環(huán)獲取待創(chuàng)建文件的請求任務(wù)填硕,并創(chuàng)建文件
    while (!this.isStopped() && this.mmapOperation()) {
    }
    log.info(this.getServiceName() + " service end");
}

private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        // 從 requestQueue 阻塞隊列中獲取 AllocateRequest  任務(wù)麦萤。
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();

            MappedFile mappedFile;
            // 判斷是否開啟 isTransientStorePoolEnable ,如果開啟則使用直接內(nèi)存進行寫入數(shù)據(jù)廷支,最后從直接內(nèi)存中 commit 到 FileChannel 中频鉴。
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                // 使用 mmap 方式創(chuàng)建 MappedFile
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }

            long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
            if (eclipseTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }

            // 預(yù)寫入數(shù)據(jù)。按照系統(tǒng)的 pagesize 進行每個pagesize 寫入一個字節(jié)數(shù)據(jù)恋拍。
            //為了把mmap 方式映射的文件都加載到內(nèi)存中。
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMapedFileSizeCommitLog()
                &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }

            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true;
        if (null != req) {
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}

初始化 MappedFile

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

使用 RandomAccessFile 獲取 可讀性 channel 進行讀寫消息藕甩。

核心存儲機制

該方法mappedFile.appendMessage最終會調(diào)用到 DefaultAppendMessageCallback.doAppend(...)進行存儲信息施敢。

代碼如下:

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();

    this.resetByteBuffer(hostHolder, 8);
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queuec
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    /**
     * Serialize message
     */
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
    //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}

就是把消息寫入到 byteBuffer 中。

字段 字段大小(字節(jié)) 字段含義
msgSize 4 代表這個消息的大小
MAGICCODE 4 MAGICCODE = daa320a7
BODY CRC 4 消息體 BODY CRC
queueId 4 queueId
flag 4 系統(tǒng)消息標志(是否壓縮消息狭莱、是否有tag僵娃、是否事務(wù)等)
QUEUEOFFSET 8 這個值是個自增值不是真正的 consume queue 的偏移量,可以代表這個consumeQueue 隊列或者 tranStateTable 隊列中消息的個數(shù)
SYSFLAG 4 指明消息是事物事物狀態(tài)等消息特征腋妙,二進制為四個字節(jié)從右往左數(shù):當 4 個字節(jié)均為 0(值為 0)時表示非事務(wù)消息默怨;當?shù)?1 個字 節(jié)為 1(值為 1)時表示表示消息是壓縮的(Compressed);當?shù)?2 個字節(jié)為 1(值為 2) 表示多消息(MultiTags)骤素;當?shù)?3 個字節(jié)為 1(值為 4)時表示 prepared 消息匙睹;當?shù)?4 個字節(jié)為 1(值為 8)時表示commit 消息; 當?shù)?3/4 個字節(jié)均為 1 時(值為 12)時表示 rollback 消息;當?shù)?3/4 個字節(jié)均為 0 時表 示非事務(wù)消息济竹;
BORNTIMESTAMP 8 消息產(chǎn)生端(producer)的時間戳
BORNHOST 8 消息產(chǎn)生端(producer)地址(address:port)
STORETIMESTAMP 8 消息在broker存儲時間
STOREHOSTADDRESS 8 消息存儲到broker的地址(address:port)
RECONSUMETIMES 8 消息被某個訂閱組重新消費了幾次(訂閱組 之間獨立計數(shù)),因為重試消息發(fā)送到了topic 名字為%retry%groupName的隊列 queueId=0的隊列中去了痕檬,成功消費一次記 錄為0;
PreparedTransaction Offset 8 表示是prepared狀態(tài)的事物消息
messagebodyLength 4 消息體大小值
messagebody bodyLength 消息體內(nèi)容
topicLength 1 topic名稱內(nèi)容大小
topic topicLength topic的內(nèi)容值
propertiesLength 2 屬性值大小
properties propertiesLength propertiesLength大小的屬性數(shù)據(jù)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末送浊,一起剝皮案震驚了整個濱河市梦谜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖唁桩,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件闭树,死亡現(xiàn)場離奇詭異,居然都是意外死亡荒澡,警方通過查閱死者的電腦和手機蔼啦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來仰猖,“玉大人捏肢,你說我怎么就攤上這事〖⑶郑” “怎么了鸵赫?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長躏升。 經(jīng)常有香客問我辩棒,道長,這世上最難降的妖魔是什么膨疏? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任一睁,我火速辦了婚禮,結(jié)果婚禮上佃却,老公的妹妹穿的比我還像新娘者吁。我一直安慰自己,他們只是感情好饲帅,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布复凳。 她就那樣靜靜地躺著,像睡著了一般灶泵。 火紅的嫁衣襯著肌膚如雪育八。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天赦邻,我揣著相機與錄音髓棋,去河邊找鬼。 笑死惶洲,一個胖子當著我的面吹牛按声,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播湃鹊,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼儒喊,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了币呵?” 一聲冷哼從身側(cè)響起怀愧,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤侨颈,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后芯义,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體哈垢,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年扛拨,在試婚紗的時候發(fā)現(xiàn)自己被綠了耘分。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡绑警,死狀恐怖求泰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情计盒,我是刑警寧澤渴频,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏莉恼。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一场钉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧懈涛,春花似錦逛万、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至价匠,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間呛每,已是汗流浹背踩窖。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留晨横,地道東北人洋腮。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像手形,于是被迫代替她去往敵國和親啥供。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 0.前言 RMQ對于消息持久化的方式是順序?qū)懙奖镜卮疟P文件库糠,相對于持久化到遠程的數(shù)據(jù)庫或者KV來說伙狐,往本地磁盤文件...
    lambdacalculus閱讀 2,623評論 1 6
  • 本來想將broker和client分開寫涮毫。但是他們的每個功能都是共同協(xié)作完成的,寫broker的時候贷屎,難免會涉及到...
    msrpp閱讀 3,538評論 1 7
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴謹 對...
    cosWriter閱讀 11,089評論 1 32
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 31,898評論 2 89
  • 不要逼我寫文章 我就愛聊八卦 每周一篇公眾號文章罢防,對不起,我憋~憋~憋不出來唉侄! 我喜歡的就是和lulane咒吐、zho...
    喵馬烏閱讀 206評論 0 0