rocketMQ HA實(shí)現(xiàn)

rocketMQ在實(shí)現(xiàn)消息穩(wěn)定,不丟失等高可用特性時(shí)括授,采用了2種技術(shù)方案生蚁,一種是經(jīng)典Master/Slave設(shè)計(jì)風(fēng)格,但是有個(gè)最致命的缺點(diǎn)在出現(xiàn)主節(jié)點(diǎn)故障時(shí)针炉,無法切換某個(gè)slave作為主節(jié)點(diǎn)钱雷,導(dǎo)致該節(jié)點(diǎn)集群不可使用,必須將master節(jié)點(diǎn)恢復(fù)后才能使用。第二種是通過Dledger框架舶沛,不僅實(shí)現(xiàn)了數(shù)據(jù)消息的備份,也可以實(shí)現(xiàn)故障轉(zhuǎn)移窗价,保證節(jié)點(diǎn)集群可以繼續(xù)使用如庭。
本文主要熟悉Master/Slave是如何實(shí)現(xiàn),并且rocketMQ也是有自身實(shí)現(xiàn)代碼撼港,并沒有引入第三方框架坪它。
作為Broker服務(wù),他有2種角色帝牡,分別是Master往毡,Slave。當(dāng)然生產(chǎn)消息主要與Master交互靶溜,但是消息消費(fèi)2者都能使用开瞭。為了消息的穩(wěn)定,安全罩息,持久化嗤详,將消息備份在異地服務(wù)器是重要手段,所以Slave節(jié)點(diǎn)主要是保持與Master中的消息持續(xù)獲取瓷炮,并且保持在本地磁盤中葱色。當(dāng)然Slave與Master同步數(shù)據(jù),不僅僅只有消息娘香,還有很多其他的數(shù)據(jù)苍狰。

SlaveSynchronize Slave同步服務(wù)

在BrokerController中,在start啟動(dòng)方法時(shí)烘绽,針對broker角色進(jìn)行處理

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        }

當(dāng)然采用Master/Slaver模式才會(huì)執(zhí)行代碼淋昭,在handleSlaveSynchronize方法中

    private void handleSlaveSynchronize(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.slaveSynchronize.syncAll();
                    }
                    catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            //handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
    }

在角色為Slave時(shí),會(huì)開啟定時(shí)任務(wù)诀姚,執(zhí)行slaveSynchronize對象的syncAll同步方法

    public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }

在這塊同步代碼塊中响牛,同步了很多內(nèi)容,包括同步topic主題配置數(shù)據(jù)赫段,消費(fèi)進(jìn)度數(shù)據(jù)呀打,消息延遲隊(duì)列數(shù)據(jù)同步贬丛,和訂閱組配置同步给涕。舉例其中topic配置同步內(nèi)容

    private void syncTopicConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {

                    this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                    this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                    this.brokerController.getTopicConfigManager().persist();

                    log.info("Update slave topic config from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
            }
        }
    }

從master服務(wù)器上獲取到了topicWrapper包裝好的topic配置內(nèi)容豺憔,然后更新自身brokerController中的相關(guān)的topic數(shù)據(jù)恭应,最終進(jìn)行持久化抄邀。其他的同步數(shù)據(jù),也是一樣境肾,需要持久化到本地文件中奥喻。在這里环鲤,目前只是同步了一些外圍的配置數(shù)據(jù)憎兽,消費(fèi)進(jìn)度數(shù)據(jù)等唇兑,但是并沒有說明消息是如何同步的扎附。

消息同步

消息同步留夜,設(shè)計(jì)到存儲(chǔ)相關(guān)碍粥,所以將這部分核心代碼放在了store模塊下嚼摩,其中HAService是核心服務(wù)枕面。
首先了解一下HAService服務(wù)內(nèi)部類HAClient潮秘,他是最為Slave客戶端枕荞,發(fā)起請求同步Master消息數(shù)據(jù)躏精。

HAClient

        private final AtomicReference<String> masterAddress = new AtomicReference<>(); // master地址
        private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
        private SocketChannel socketChannel;
        private Selector selector;
        private long lastWriteTimestamp = System.currentTimeMillis();

        private long currentReportedOffset = 0;
        private int dispatchPosition = 0; // 該位置矗烛,是不停的增長的高诺,當(dāng)byteBufferRead寫滿虱而,會(huì)重置為0
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
        private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

這部分是client的基本數(shù)據(jù)牡拇,其中記錄了masterAddress地址惠呼,currentReportedOffset 當(dāng)前上報(bào)的偏移量剔蹋,dispatchPosition臨時(shí)記錄存儲(chǔ)的位置泣崩。由于HAClient是實(shí)現(xiàn)了ServiceThread類矫付,所以他也是一個(gè)線程類

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    if (this.connectMaster()) {
                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }
                        this.selector.select(1000);
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }
                        // 上報(bào)一下slave的最大偏移量進(jìn)度
                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }
                        long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaHousekeepingInterval()) {
                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                + "] expired, " + interval);
                            this.closeMaster();
                            log.warn("HAClient, master not response some time, so close connection");
                        }
                    } else {
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }
            log.info(this.getServiceName() + " service end");
        }

作為客戶端妨马,他需要主動(dòng)發(fā)起連接master烘跺,所以在嘗試得到獲取數(shù)據(jù)時(shí)液荸,確認(rèn)master是否連接成功娇钱,在這里文搂,rocketMQ自己實(shí)現(xiàn)了基于jdk的NIO實(shí)現(xiàn)通信煤蹭。

        private boolean connectMaster() throws ClosedChannelException {
            if (null == socketChannel) {
                String addr = this.masterAddress.get();
                if (addr != null) {

                    SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                    if (socketAddress != null) {
                        this.socketChannel = RemotingUtil.connect(socketAddress);
                        if (this.socketChannel != null) {
                            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                        }
                    }
                }

                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                this.lastWriteTimestamp = System.currentTimeMillis();
            }

            return this.socketChannel != null;
        }

在連接master時(shí)硝皂,確認(rèn)masterAddress地址是否存在稽物,然后同步該地址得到channel贝或,并且注冊到selector中咪奖,進(jìn)行讀寫監(jiān)聽羊赵。當(dāng)然在連接初始化時(shí)慷垮,得到currentReportedOffset值料身,即為當(dāng)前slave的存儲(chǔ)消息的最大偏移量芹血。在連接master成功后幔烛,都會(huì)定時(shí)進(jìn)行上報(bào)master當(dāng)前偏移量

        private boolean reportSlaveMaxOffset(final long maxOffset) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            this.reportOffset.putLong(maxOffset);
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            // 上報(bào)一個(gè)slave最大的偏移量
            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    log.error(this.getServiceName()
                        + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }

            lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            return !this.reportOffset.hasRemaining();
        }

上報(bào)內(nèi)容也很簡單,將reportOffset 字節(jié)緩存進(jìn)行重置狡恬,并且添加8個(gè)子節(jié)點(diǎn)maxOffset值弟劲,然后寫入到socketChannel中兔乞。這樣master就能直到當(dāng)前slave的上報(bào)進(jìn)度了庸追。當(dāng)然定時(shí)上報(bào)還有另外好處淡溯,可以用這樣的方式代替實(shí)時(shí)心跳檢測血筑,保證channel長時(shí)間連接豺总。
在master下發(fā)數(shù)據(jù)時(shí)候喻喳,slave就能監(jiān)聽到讀事件表伦,然后channel就能從通道中讀取字節(jié)流蹦哼。

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }

processReadEvent 就是用以處理讀事件的妆丘,其中byteBufferRead是用來保存讀取channel通道中的字節(jié)流勺拣。當(dāng)讀取到內(nèi)容時(shí)药有,readSize是大于0的苇经。讀取完內(nèi)容后塑陵,就開始執(zhí)行分發(fā)讀請求 dispatchReadRequest方法令花。

        private boolean dispatchReadRequest() {
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            int readSocketPos = this.byteBufferRead.position();

            while (true) {
                int diff = this.byteBufferRead.position() - this.dispatchPosition;
                if (diff >= msgHeaderSize) {
                    // 得到物理偏移量
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                    // 得到body的大小
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                    if (slavePhyOffset != 0) {
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }
                    if (diff >= (msgHeaderSize + bodySize)) { // 至少是一個(gè)完整的包
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                        this.byteBufferRead.get(bodyData);
                        // 寫入到slave中去   
                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                        this.byteBufferRead.position(readSocketPos);
                        this.dispatchPosition += msgHeaderSize + bodySize; // dispatchPosition 記錄的是n個(gè)完整的包長度
                        if (!reportSlaveMaxOffsetPlus()) {
                            // 上報(bào)一下slave的最大偏移量
                            return false;
                        }
                        continue;
                    }
                }
                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }

定義了一個(gè)msgHeaderSize 消息頭長度,readSocketPos是當(dāng)前byteBufferRead的位置扮碧,首先明確慎王,byteBufferRead如果從channel中讀取數(shù)據(jù)時(shí)候,他的pos位置也是增長的咱旱。所以readSocketPos不是讀取消息的位置吐限,當(dāng)前消息的終點(diǎn)位置诸典。那么怎么得到讀取消息的起始位置呢搂赋,在HAClient中,有個(gè)屬性dispatchPosition 就是用來保存上一次讀取的位置幅慌。在master發(fā)送消息數(shù)據(jù)的時(shí)候胰伍,基本協(xié)議的結(jié)構(gòu)是 消息頭(包括物理偏移量骂租,消息body的長度)+ 數(shù)據(jù)內(nèi)容渗饮。在準(zhǔn)備讀取內(nèi)容時(shí)私蕾,確認(rèn)上一次讀取的位置dispatchPosition與當(dāng)前byteBufferRead的位置的長度差大于等于一個(gè)消息頭的大小踩叭。先從上一個(gè)起始點(diǎn)開始讀取8個(gè)字節(jié)容贝,即為masterPhyOffset最大偏移量斤富,然后讀取4個(gè)字節(jié)bodySize茂缚,消息內(nèi)容長度脚囊。當(dāng)前會(huì)進(jìn)行判斷slave服務(wù)中當(dāng)前最大的物理偏移量slavePhyOffset 是否與master推送過來的masterPhyOffset是否一致。這樣的目的是保證slave與master數(shù)據(jù)同步一致性衬以,保證是不會(huì)丟失看峻。在diff >= (msgHeaderSize + bodySize)比較中互妓,確定是否存在一個(gè)完整的數(shù)據(jù)包冯勉。然后聲明長度為bodySize的bodyData字節(jié)數(shù)組宛瞄,并且重置了byteBufferRead的pos位置份汗,然后將數(shù)據(jù)復(fù)制給bodyData裸影,然后將數(shù)據(jù)添加到commitLog中轩猩。數(shù)據(jù)添加完成后,又將byteBufferRead的pos位置重置為readSocketPos彤委,并且dispatchPosition 又增加了一個(gè)完整數(shù)據(jù)包的長度焦影,最后及時(shí)上報(bào)master服務(wù)當(dāng)前slave的最大物理偏移量斯辰。
byteBufferRead最為存儲(chǔ)通信數(shù)據(jù)的載體,長度為4M闸氮。在目前數(shù)據(jù)獲取時(shí)蒲跨,byteBufferRead是一直在增加财骨,即pos在增加隆箩,即有讀取到新得數(shù)據(jù),byteBufferRead的pos一直變大理澎,直到limit=pos時(shí)無法添加內(nèi)容糠爬。為保證數(shù)據(jù)持續(xù)獲取,肯定需要將byteBufferRead重置镀琉,然后再讀取屋摔。所以才會(huì)有reallocateByteBuffer方法

        private void reallocateByteBuffer() {
            int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
            if (remain > 0) {
                this.byteBufferRead.position(this.dispatchPosition);

                this.byteBufferBackup.position(0);
                this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
                this.byteBufferBackup.put(this.byteBufferRead); // 將read沒有讀取的部分放入到backup中
            }

            this.swapByteBuffer();

            this.byteBufferRead.position(remain);
            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
            this.dispatchPosition = 0;
        }

當(dāng)byteBufferRead數(shù)據(jù)存滿,但是任然會(huì)出現(xiàn) 最后一個(gè)數(shù)據(jù)包是不完整的弓熏。肯定需要將最后一段數(shù)據(jù)進(jìn)行保存起來滞谢,下次繼續(xù)使用狮杨。在這里清寇,他采用了byteBufferBackup一個(gè)備份的字節(jié)緩存华烟。remain是指一個(gè)數(shù)據(jù)包的部分?jǐn)?shù)據(jù)長度盔夜。當(dāng)remain大于0,說明包不完整椭微,就會(huì)將byteBufferRead剩余部分復(fù)制給重置后的byteBufferBackup,此時(shí)byteBufferBackup是存在數(shù)據(jù)瓢剿,并且當(dāng)前pos為remain间狂。swapByteBuffer方法就是將byteBufferRead和byteBufferBackup執(zhí)行對象互相交換,即現(xiàn)在的byteBufferRead就是原來的byteBufferBackup對象纺弊。然后又重置了byteBufferRead的limit值,并且dispatchPosition的位置也變成0了犹菱。下次byteBufferRead再從socketChannel中讀取的位置就是從remain開始了腊脱。
這是作為slave端陕凹,如何主動(dòng)發(fā)起連接搜骡,并且上報(bào)最大偏移量和消息數(shù)據(jù)獲取并存儲(chǔ)的邏輯。

Master服務(wù)端

最為服務(wù)端,首先他可以由多個(gè)slave同時(shí)同步消息數(shù)據(jù)店茶,所以需要有個(gè)管理客戶端的服務(wù)。

AcceptSocketService

該類是HAService內(nèi)部類两嘴,他是監(jiān)聽管理連接slave的connection服務(wù)
在配置通信服務(wù)端時(shí)

        public void beginAccept() throws Exception {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.selector = RemotingUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        }

綁定端口丛楚,配置非阻塞,并且注冊了接收事件憔辫,即接受客戶端連接請求趣些。
在開啟線程時(shí)

        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    Set<SelectionKey> selected = this.selector.selectedKeys();

                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                // 有請求連接
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());

                                    try {
                                        // 創(chuàng)建一個(gè)客戶端的連接,
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        conn.start();
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }

                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

就是實(shí)時(shí)監(jiān)聽連接請求贰您,selected 即為得到事件的所以連接通道,然后判斷確認(rèn)存在OP_ACCEPT事件锦亦,將客戶端SocketChannel封裝成HAConnection并且添加到HAService中connectionList集合中舶替。當(dāng)然AcceptSocketService只是管理連接客戶端數(shù),服務(wù)器真正讀寫邏輯在HAConnection完成的杠园。

HAConnection

    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
        this.haService = haService;
        this.socketChannel = socketChannel;
        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
        this.socketChannel.configureBlocking(false);
        this.socketChannel.socket().setSoLinger(false, -1);
        this.socketChannel.socket().setTcpNoDelay(true);
        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
        this.socketChannel.socket().setSendBufferSize(1024 * 64);
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        this.readSocketService = new ReadSocketService(this.socketChannel);
        this.haService.getConnectionCount().incrementAndGet();
    }

構(gòu)成器中創(chuàng)建了readSocketService讀服務(wù)顾瞪,和writeSocketService寫服務(wù)。當(dāng)然讀服務(wù)主要任務(wù)是讀取 slave上報(bào)的最大偏移量抛蚁,而寫服務(wù)是寫入消息數(shù)據(jù)到通道中陈醒。

ReadSocketService

讀服務(wù)只要監(jiān)聽讀事件即可,并且通過線程篮绿,實(shí)時(shí)監(jiān)聽數(shù)據(jù)孵延,然后執(zhí)行處理讀請求

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;

            if (!this.byteBufferRead.hasRemaining()) {
                this.byteBufferRead.flip();
                this.processPosition = 0;
            }

            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                            // slave上報(bào)的最大偏移量占8個(gè)子節(jié),正常情況下byteBufferRead的position是8的倍數(shù)亲配,但是不能確定出現(xiàn)粘包情況的出現(xiàn)
                            // 所以pos重新計(jì)算尘应,規(guī)避該情況的出現(xiàn)惶凝,保證獲取的位置是正確的
                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                            long readOffset = this.byteBufferRead.getLong(pos - 8);
                            this.processPosition = pos;

                            HAConnection.this.slaveAckOffset = readOffset;
                            if (HAConnection.this.slaveRequestOffset < 0) {
                                HAConnection.this.slaveRequestOffset = readOffset;
                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                            }

                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.error("processReadEvent exception", e);
                    return false;
                }
            }

            return true;
        }
    }

byteBufferRead 存儲(chǔ)最大長度為1M,但是當(dāng)byteBufferRead滿后犬钢,就直接重置苍鲜,并沒有處理未完整包,為什么呢玷犹?因?yàn)閟lave上報(bào)最大偏移量時(shí)混滔,占用8個(gè)字節(jié),所以byteBufferRead的長度肯定能存儲(chǔ)完整的包歹颓。processPosition是上一次讀取數(shù)據(jù)的位置坯屿。從socketChannel讀取內(nèi)容時(shí)都會(huì)復(fù)制到byteBufferRead中,然后與上一次去讀位置比較巍扛,因?yàn)橐粋€(gè)包只要8個(gè)字節(jié)领跛,所以差值大于等于8即可。但是在獲取slave上報(bào)的偏移量時(shí)撤奸,他只是獲取了最近的一個(gè)包吠昭,即如果this.byteBufferRead.position() - this.processPosition是8的好幾倍,他也是讀取最某位的數(shù)據(jù)胧瓜。不關(guān)心矢棚,中間包的數(shù)據(jù)情況。
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); 該pos獲取的方式府喳,就是為了保證pos是8的倍數(shù)蒲肋,剔除掉某位可能出現(xiàn)不完整包的情況。然后readOffset的偏移量就是pos-8 的位置讀取8個(gè)字節(jié)的數(shù)據(jù)劫拢。并且重置了processPosition的位置肉津。slaveAckOffset 就是認(rèn)為slave存儲(chǔ)的數(shù)據(jù)也是持久化成功的確認(rèn)位置,然后執(zhí)行haService.notifyTransferSome服務(wù)(待會(huì)聊舱沧,為什么需要這個(gè)通知服務(wù))妹沙。其中slaveRequestOffset 是HAConnection的屬性,當(dāng)小于0時(shí)需要將此次讀取到readOffset賦值給slaveRequestOffset熟吏。
這是一個(gè)讀取服務(wù)距糖,既可以直到當(dāng)前slave的消息持久的位置,也能保證心跳連接方式牵寺。

WriteSocketService

該服務(wù)肯定是推送消息數(shù)據(jù)給slave悍引,并且注冊了寫事件。在slave實(shí)現(xiàn)讀消息時(shí)帽氓,已經(jīng)知道一個(gè)包的組成結(jié)構(gòu)趣斤,消息頭(起始偏移量+消息長度)+ 消息內(nèi)容。在實(shí)現(xiàn)線程接口方法時(shí)黎休,分開講解浓领,一部分 初始情況怎么做玉凯,第二部數(shù)據(jù)是怎么寫的,如果沒有寫完怎么做

第一部分 初始操作


                    if (-1 == HAConnection.this.slaveRequestOffset) {
                        Thread.sleep(10);
                        continue;
                    }
                    if (-1 == this.nextTransferFromWhere) {
                        // 初始化
                        if (0 == HAConnection.this.slaveRequestOffset) {
                            // slave 沒有任何請求偏移量联贩,那么默認(rèn)從一個(gè)文件開始
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            masterOffset =
                                masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                    .getMappedFileSizeCommitLog());

                            if (masterOffset < 0) {
                                masterOffset = 0;
                            }

                            this.nextTransferFromWhere = masterOffset;
                        } else {
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                        }

                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }

slaveRequestOffset 等于-1 說明slave還沒有發(fā)送當(dāng)前最大偏移量漫仆,作為master,是無法確定給slave推送消息數(shù)據(jù)的起始位置泪幌,所以不能繼續(xù)執(zhí)行盲厌。
當(dāng)nextTransferFromWhere為-1時(shí),說明master還沒有開始推送祸泪,需要確認(rèn)推送給slave的起始位置吗浩。如果slaveRequestOffset等于0,說明slave當(dāng)前狀況完全是新服務(wù)浴滴,沒有本地存儲(chǔ)的消息拓萌,那么master推送給slave的起始位置是存儲(chǔ)消息的最后一個(gè)文件開始位置。masterOffset是當(dāng)前master最大偏移量升略,最終取模相減之后,即為最后文件的起始位置屡限。
如果說slaveRequestOffset大于0品嚣,那么master推送給slave起始位置,即為slave當(dāng)前最大的位置slaveRequestOffset钧大。
這是初始工作翰撑,最重要的任務(wù)就是需要確認(rèn)master推送數(shù)據(jù)時(shí)的起始位置。

第二部分 推送數(shù)據(jù)

                    SelectMappedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                    if (selectResult != null) {
                        int size = selectResult.getSize();
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }

                        long thisOffset = this.nextTransferFromWhere;
                        this.nextTransferFromWhere += size;

                        selectResult.getByteBuffer().limit(size);
                        this.selectMappedBufferResult = selectResult;

                        // Build Header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);
                        this.byteBufferHeader.putLong(thisOffset);
                        this.byteBufferHeader.putInt(size);
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                    } else {

                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                    }

通過nextTransferFromWhere位置截取SelectMappedBufferResult數(shù)據(jù)啊央,由于受到推送最大長度限制眶诈,默認(rèn)最大size為32k數(shù)據(jù)。需要對byteBuffer進(jìn)行l(wèi)imit 限制最大為size長度瓜饥,然后將selectResult對象指向給當(dāng)前屬性的selectMappedBufferResult逝撬。開始先定義頭數(shù)據(jù)byteBufferHeader ,放入了nextTransferFromWhere值8個(gè)字節(jié)乓土,和內(nèi)容長度size 4個(gè)字節(jié)宪潮。最后執(zhí)行傳輸數(shù)據(jù)方法transferData

        private boolean transferData() throws Exception {
            int writeSizeZeroTimes = 0;
            // Write Header
            while (this.byteBufferHeader.hasRemaining()) {
                int writeSize = this.socketChannel.write(this.byteBufferHeader);
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write header error < 0");
                }
            }

            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }

            writeSizeZeroTimes = 0;

            // Write Body
            if (!this.byteBufferHeader.hasRemaining()) {
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write body error < 0");
                    }
                }
            }

            boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                this.selectMappedBufferResult.release();
                this.selectMappedBufferResult = null;
            }

            return result;
        }

寫數(shù)據(jù)分2部分,一部分是頭數(shù)據(jù)趣苏,一部分是寫消息數(shù)據(jù)狡相。首先保證byteBufferHeader寫完,才能寫入消息內(nèi)容食磕。在寫完消息數(shù)據(jù)后尽棕,有個(gè)注意點(diǎn),就是將selectMappedBufferResult進(jìn)行release釋放彬伦。因?yàn)閺腸ommitLog中獲取selectMappedBufferResult時(shí)滔悉,標(biāo)記為使用狀態(tài)伊诵。如果用完,需要主動(dòng)釋放氧敢。假設(shè)此次沒有寫完時(shí)日戈,在下個(gè)循環(huán)會(huì)繼續(xù)執(zhí)行。
有個(gè)問題需要思考一下孙乖,在master推送數(shù)據(jù)時(shí)浙炼,推送的數(shù)據(jù)都是完整的消息內(nèi)容嗎?
答案肯定是否定唯袄,首先.規(guī)定了消息最大推送長度為32k弯屈,假設(shè)slave是全新的服務(wù)器或者長時(shí)間沒有同步,導(dǎo)致slave與master服務(wù)的最大偏移量差會(huì)很大恋拷。那么每次推送的數(shù)據(jù)中肯定都不是非常完整的消息內(nèi)容了资厉。所以只要偏差小于32k,基本上就能推送完整的消息了蔬顾。

GroupTransferService

在master讀取slave最大偏移量時(shí)宴偿,會(huì)執(zhí)行HAService中的notifyTransferSome方法

    public void notifyTransferSome(final long offset) {
        for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
            // 通知一下slave已經(jīng)更新到了最大的偏移量,有些同步等待的可以確認(rèn)slave成功了
            boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
            if (ok) {
                this.groupTransferService.notifyTransferSome();
                break;
            } else {
                value = this.push2SlaveMaxOffset.get();
            }
        }
    }

因?yàn)橐粋€(gè)master同時(shí)保持多個(gè)slave連接诀豁,并且每個(gè)slave上報(bào)的偏移量可能都不太一致窄刘,所以該放入通過原子長整型對象push2SlaveMaxOffset,來保證原子性舷胜。當(dāng)offset確實(shí)大于了value值娩践,并且push2SlaveMaxOffset更新成功,那么才能執(zhí)行g(shù)roupTransferService.notifyTransferSome();方法烹骨。如果更新失敗翻伺,那么從新判斷更新。
那么GroupTransferService最用是什么呢沮焕?在之前消息存儲(chǔ)講到吨岭,當(dāng)broker是同步狀態(tài)時(shí)候,一條消息存儲(chǔ)成功遇汞,是需要消息在本地broker持久化成功未妹,第二如果存在Slave,保證slave也需要將消息持久化成功空入。
在CommitLog類handleHA方法

    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

    }

通過DefaulteMessageStore中獲取到HAService服務(wù)络它,并且slave都正常,然后創(chuàng)建GroupCommitRequest 對象放入到HAservice中歪赢,即為GroupTransferService中化戳,然后進(jìn)行等待,等待核心就是通過CountDownLatch實(shí)現(xiàn)。
GroupTransferService 服務(wù)也是實(shí)現(xiàn)ServiceThread点楼,

        private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
        private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
        private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();

其中requestsWrite 用于放置提交請求的扫尖,requestsRead是用來去讀請求,通過swapRequests方法進(jìn)行讀寫轉(zhuǎn)換掠廓。swapRequests方法是通過等待結(jié)束后會(huì)調(diào)用onWaitEnd方法换怖,然后執(zhí)行讀寫轉(zhuǎn)換。
首先是放入提交請求方法蟀瞧。

        public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

首先對requestsWrite進(jìn)行鎖定沉颂,然后放入提交請求。如果當(dāng)前線程沒有通知悦污,那么將hasNotified標(biāo)記為已經(jīng)通知铸屉,然后喚醒線程,因?yàn)榫€程await是通過waitPoint實(shí)現(xiàn)的切端。

        private void doWaitTransfer() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        for (int i = 0; !transferOK && i < 5; i++) {
                            this.notifyTransferObject.waitForRunning(1000);
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }

                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }

                        req.wakeupCustomer(transferOK);
                    }

                    this.requestsRead.clear();
                }
            }
        }

doWaitTransfer方法是核心彻坛,首先鎖住requestsRead,然后就行遍歷踏枣。當(dāng)push2SlaveMaxOffset值大于等于了req中物理偏移量值(該值是消息的起始物理偏移量+消息長度)昌屉,那么需要喚醒寫入線程,并且標(biāo)記成功茵瀑。如何transferOK =false怠益,則會(huì)進(jìn)行等待,等待時(shí)長最多 5s瘾婿。如果失敗,然后會(huì)寫入喚醒線程烤咧,但是標(biāo)記為失敗偏陪。寫入線程喚醒后,通過狀態(tài)進(jìn)行反饋煮嫌,如果flushOK為false 笛谦,putMessageResult放置狀態(tài)FLUSH_SLAVE_TIMEOUT。
那么有沒有可能昌阿,存在request丟失的情況饥脑,不會(huì)出現(xiàn)。極端情況懦冰,當(dāng)put請求時(shí)當(dāng)前對象為write灶轰,但此時(shí)GroupTransferService線程正好等待結(jié)束,進(jìn)行讀寫轉(zhuǎn)換了刷钢,此時(shí)讀的對象變成了寫笋颤,寫的對象變成了讀,但是放置線程對象執(zhí)行了讀的遍歷内地。因?yàn)樵诜胖脮r(shí)伴澄,還是讀取遍歷時(shí)赋除,都進(jìn)行對對象鎖定。保證其他線程不能修改了非凌。但是會(huì)出現(xiàn)一種情況举农,就是讀取線程遍歷完后,會(huì)清空讀請求敞嗡,然后釋放鎖颁糟,此時(shí)put線程就獲取到該對象,然后放置了request對象秸妥。那么該對象何時(shí)會(huì)被讀取到呢滚停?需要進(jìn)行2次swap后才能被讀取到,但是不會(huì)被清空粥惧。


同步流程

作為broker 節(jié)點(diǎn) slave角色芽世,需要同步很多數(shù)據(jù),但是其中有2個(gè)數(shù)據(jù)不同步损搬,一個(gè)時(shí)消費(fèi)隊(duì)列ConsumeQueue數(shù)據(jù)崎淳,一個(gè)是索引數(shù)據(jù)IndexFile。那么回顧一下這2個(gè)數(shù)據(jù)是如何實(shí)現(xiàn)的咏删,在BrokerController中惹想,有一個(gè)線程服務(wù)ReputMessageService,會(huì)不停的從commitLog中獲取消息督函,然后進(jìn)行分發(fā)嘀粱,從而會(huì)添加消費(fèi)隊(duì)列數(shù)據(jù)和索引數(shù)據(jù)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末辰狡,一起剝皮案震驚了整個(gè)濱河市锋叨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌宛篇,老刑警劉巖娃磺,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異叫倍,居然都是意外死亡偷卧,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進(jìn)店門吆倦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來听诸,“玉大人,你說我怎么就攤上這事逼庞∩吒” “怎么了?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長派任。 經(jīng)常有香客問我砸逊,道長,這世上最難降的妖魔是什么掌逛? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任师逸,我火速辦了婚禮,結(jié)果婚禮上豆混,老公的妹妹穿的比我還像新娘篓像。我一直安慰自己,他們只是感情好皿伺,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布员辩。 她就那樣靜靜地躺著,像睡著了一般鸵鸥。 火紅的嫁衣襯著肌膚如雪奠滑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天妒穴,我揣著相機(jī)與錄音宋税,去河邊找鬼。 笑死讼油,一個(gè)胖子當(dāng)著我的面吹牛杰赛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播矮台,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼乏屯,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了瘦赫?” 一聲冷哼從身側(cè)響起瓶珊,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎耸彪,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體忘苛,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蝉娜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了扎唾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片召川。...
    茶點(diǎn)故事閱讀 40,146評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖胸遇,靈堂內(nèi)的尸體忽然破棺而出荧呐,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布倍阐,位于F島的核電站概疆,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏峰搪。R本人自食惡果不足惜岔冀,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望概耻。 院中可真熱鬧使套,春花似錦、人聲如沸鞠柄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽厌杜。三九已至奉呛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間期奔,已是汗流浹背侧馅。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留呐萌,地道東北人馁痴。 一個(gè)月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像肺孤,于是被迫代替她去往敵國和親罗晕。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內(nèi)容