??【Alibaba中間件技術(shù)系列】「RocketMQ技術(shù)專題」RocketMQ消息發(fā)送的全部流程和落盤原理分析

前言介紹

RocketMQ目前在國(guó)內(nèi)應(yīng)該是比較流行的MQ 了绳锅,目前本人也在公司的項(xiàng)目中進(jìn)行使用和研究西饵,借著這個(gè)機(jī)會(huì),分析一下RocketMQ 發(fā)送一條消息到存儲(chǔ)一條消息的過(guò)程鳞芙,這樣會(huì)對(duì)以后大家分析和研究RocketMQ相關(guān)的問(wèn)題有一定的幫助眷柔。

技術(shù)范圍

分析的總體技術(shù)范圍發(fā)送到存儲(chǔ),本文的主要目的是主要是為了認(rèn)識(shí)一條消息并分析被發(fā)出且被存儲(chǔ)的原朝,代碼中驯嘱,關(guān)于 MQ 文件系統(tǒng)的優(yōu)化,設(shè)計(jì)等喳坠。

現(xiàn)在出發(fā)

來(lái)自官方源碼example的一段發(fā)送代碼:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();

send發(fā)送的分析

直接看看send方法鞠评,send 方法會(huì)設(shè)置一個(gè)默認(rèn)的 timeout:3秒。默認(rèn)使用 SYNC 模式壕鹉,另外有Async和OneWay模式剃幌。需要處理方法簽名中的 Client 端的異常,網(wǎng)絡(luò)異常御板,Broker 端的異常锥忿,線程中斷異常牛郑。

sendDefaultImpl核心實(shí)現(xiàn)類

DefaultMQProducerImpl 的 sendDefaultImpl方法就是發(fā)送的主要邏輯怠肋。

代碼里,有個(gè)地方可以提一下淹朋,關(guān)于更新故障時(shí)間的策略笙各,RocketMQ有一個(gè)類 MQFaultStrategy,用來(lái)處理MQ錯(cuò)誤础芍,然后對(duì) MQ Server 進(jìn)行服務(wù)降級(jí)杈抢。

服務(wù)降級(jí)策略

如果發(fā)送一條消息在550ms以內(nèi),那么就不用降級(jí)仑性,如果550毫秒以外惶楼,就進(jìn)行容錯(cuò)降級(jí)(熔斷)30 秒,以此類推。

sendKernelImpl核心方法

再看DefaultMQProducerImpl 的 sendKernelImpl發(fā)送到內(nèi)核的方法實(shí)現(xiàn)歼捐。

先找到broker的地址何陆。嘗試壓縮大于4M 的消息(批量消息不壓縮),然后執(zhí)行各種鉤子豹储。

  • Request對(duì)象(存放數(shù)據(jù))
  • Context 上下文對(duì)象(存放調(diào)用上下文)贷盲。

這里會(huì)設(shè)置一個(gè)消息生成時(shí)間,即bornTimestamp剥扣,后面使用消息軌跡的時(shí)候巩剖,可以查看。

同步模式的核心處理

默認(rèn)情況下:如果采用SYNC 模式钠怯,就調(diào)用 MQClientAPIImpl 來(lái)發(fā)送消息佳魔,這一層還是在 Client 模塊里,在這一層呻疹,會(huì)設(shè)置更詳細(xì)的消息細(xì)節(jié)吃引,構(gòu)造命令對(duì)象。最后調(diào)用 remotingClient的 invokeSync 發(fā)送消息刽锤。

MQClientAPIImpl的sendMessage

MQClientAPIImpl的sendMessage這一層镊尺,會(huì)給命令對(duì)象設(shè)置一個(gè)CmdCode,叫SEND_MESSAGE并思,這個(gè)東西就是一個(gè)和Broker的契約庐氮,Broker會(huì)根據(jù)這個(gè)Code進(jìn)行不同的策略。

RPC的實(shí)現(xiàn)方式
  1. 如果這里用RPC的方式宋彼,例如弄砍,使用一個(gè)接口的抽象方法。
  2. 然后输涕,Broker對(duì)抽象方法進(jìn)行 RPC 調(diào)用音婶,這樣可不可以呢?
  3. 最后莱坎,看看 remotingClient的invokeSync是如何實(shí)現(xiàn)的衣式。

Remoting模塊發(fā)送消息實(shí)現(xiàn)

invokeSync方法

  1. 首先,執(zhí)行 RPCBefore 鉤子檐什,類似Spring的各種Bean擴(kuò)展組件
  2. 然后碴卧,就是對(duì)超時(shí)進(jìn)行判斷。
  3. 最后乃正,幾乎每個(gè)方法都有對(duì)超時(shí)的判斷住册,超時(shí)判斷和超時(shí)處理在分布式場(chǎng)景非常重要。
  4. 根據(jù)addr找到對(duì)應(yīng)的Socket Channel
  5. 然后執(zhí)行invokeSyncImpl方法瓮具。
  6. 這里其實(shí)和其他大部分的RPC框架都是類似的了荧飞,生產(chǎn)一個(gè)永遠(yuǎn)自增的Request ID凡人,創(chuàng)建一個(gè)Feature對(duì)象和這個(gè)ID綁定,方便Netty返回?cái)?shù)據(jù)對(duì)這個(gè)ID對(duì)應(yīng)的線程進(jìn)行喚醒叹阔。
  7. 然后調(diào)用Netty的writeAndFlush方法划栓,將數(shù)據(jù)寫進(jìn)Socket,同時(shí)添加一個(gè)監(jiān)聽(tīng)器条获,如果發(fā)送失敗忠荞,喚醒當(dāng)前線程。
  8. 發(fā)送完畢之后帅掘,當(dāng)前線程進(jìn)行等待委煤,使用CountDownLatch.wait方法實(shí)現(xiàn),當(dāng)Netty返回?cái)?shù)據(jù)時(shí)修档,使用CountDownLatch.countDown進(jìn)行喚醒
  9. 然后返回從 Broker 寫入的結(jié)果碧绞,可能成功,也可能失敗吱窝,需要到上層(Client 層)解析讥邻,網(wǎng)絡(luò)層只負(fù)責(zé)網(wǎng)絡(luò)的事情。

Netty 會(huì)使用 Handler 處理出去的數(shù)據(jù)和返回的數(shù)據(jù)院峡,我們看看 Client 端 Netty 有哪些 Handler.

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

使用了一個(gè) Encoder兴使,Decoder,空閑處理器照激,連接管理器发魄,ClientHandler。

XXCoder就是對(duì)Cmd對(duì)象進(jìn)行序列化和反序列化的俩垃,這里的空閑使用的讀寫最大空閑時(shí)間為120s励幼,超過(guò)這個(gè),就會(huì)觸發(fā)空閑事件口柳。

連接管理器
  • RocketMQ就會(huì)關(guān)閉Channel 連接苹粟。而針對(duì)空閑事件進(jìn)行處理的就是連接管理器了。

  • 連接管理器處理空閑跃闹、Close嵌削、Connect、異常等事件辣卒,使用監(jiān)聽(tīng)器模式掷贾,不同的監(jiān)聽(tīng)器對(duì)不同的事件進(jìn)行處理睛榄。另外荣茫,這里也許可以借鑒 EventBus,每個(gè)事件可以設(shè)置多個(gè)監(jiān)聽(tīng)器场靴。

如何處理返回值

看了RocketMQ中 Netty 的設(shè)計(jì)啡莉,再看看返回值處理就簡(jiǎn)單了港准,NettyClientHandler 會(huì)在 channelRead0 方法處理 Netty Server 的返回值。對(duì)應(yīng) RMQ咧欣,則是 processMessageReceived 方法浅缸。該方法很簡(jiǎn)潔:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

其實(shí),這是一個(gè)模板方法魄咕,固定算法衩椒,由子類實(shí)現(xiàn),分為 Request 實(shí)現(xiàn)和 Response 實(shí)現(xiàn)哮兰。我們看看 Response 實(shí)現(xiàn)毛萌。

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        // 找到 Response .
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);
            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {// 返回結(jié)果
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

通過(guò) cmd 對(duì)象的 Request ID 找到 Feature,執(zhí)行 responseFuture.putResponse喝滞,設(shè)置返回值阁将,喚醒阻塞等待的發(fā)送線程。

這里還有一個(gè) release 調(diào)用右遭,這個(gè)和異步發(fā)送有關(guān)做盅,默認(rèn)最大同時(shí) 65535 個(gè)異步請(qǐng)求,具體就不展開了窘哈。

到這里吹榴,喚醒阻塞的發(fā)送線程,返回?cái)?shù)據(jù)滚婉,客戶端層面的發(fā)送就結(jié)束了腊尚。

Broker端如何處理消息

看源碼,看到有個(gè) SEND_MESSAGE Code满哪,是 Client 和 Broker Server 的一個(gè)約定代碼婿斥,我們看看這個(gè)代碼在哪里用的。

在 broker 模塊的 BrokerController 類中哨鸭,有個(gè) registerProcessor 方法民宿,會(huì)將 SEND_MESSAGE Code 和一個(gè) SendMessageProcessor 對(duì)象綁定。

NettyServerHandler

NettyRemotingServer是處理Request 的類像鸡,ServerBootstrap 會(huì)在 pipeline 中添加一個(gè) NettyServerHandler處理器活鹰,這個(gè)處理器的channelRead0方法會(huì)調(diào)用 NettyRemotingServer的父類processMessageReceived 方法。

processMessageReceived

從processorTable 里只估,根據(jù) Cmd Code志群,也就是 SEND_MESSAGE 獲取對(duì)應(yīng)的 Processor

Processor 由 2 部分組成,

一部分是處理數(shù)據(jù)的對(duì)象蛔钙,一部分是這個(gè)對(duì)象所對(duì)應(yīng)的線程池锌云。用于異步處理邏輯,防止阻塞 Netty IO線程吁脱。

doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);// 處理.
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

前后都是執(zhí)行一些鉤子桑涎,例如 ACL

RocketMQ會(huì)有一個(gè) BrokerController 類彬向,會(huì)注冊(cè) Code 和 Processor 的綁定關(guān)系,BrokerController 也會(huì)把這些綁定攻冷,注冊(cè)到 Netty Server 中娃胆,當(dāng) Netty Server 從 Socket 收到 Cmd 對(duì)象,根據(jù) Cmd 對(duì)象的 Code等曼,就可以找到對(duì)應(yīng) Processor 類里烦,對(duì)數(shù)據(jù)進(jìn)行處理。

中間是處理 Request請(qǐng)求的禁谦。這個(gè) processRequest 方法招驴,有很多的實(shí)現(xiàn),SendMessageProcessor的sendMessage 是處理消息的主要邏輯枷畏。

消息存儲(chǔ)引擎别厘,這里我們看DefaultMessageStore的putMessage 實(shí)現(xiàn)。

putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

由于RocketMQ寫數(shù)據(jù)是PageCache里面寫的拥诡,因此触趴,如果寫的慢,就是 PageCache 忙渴肉,這里忙的標(biāo)準(zhǔn)是冗懦,如果鎖文件的時(shí)間,超過(guò)了 1 秒仇祭,那就是忙披蕉。

if (this.isOSPageCacheBusy()) {// 檢查 mmp 忙不忙.
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

最后調(diào)用 PutMessageResult result = this.commitLog.putMessage(msg) 寫數(shù)據(jù)。如果耗時(shí)超過(guò) 500 毫秒乌奇,就會(huì)打印日志没讲。這樣我們排查問(wèn)題的時(shí)候,可以看看 storeStats 的日志礁苗。

commitLog 的 putMessage 方法
  1. 先拿到最新的MappedFile 文件爬凑,MappedFile 文件的命名是用 offset 命名的,一個(gè)文件默認(rèn) 1gb试伙,這個(gè)大小和 mmp 的機(jī)制有關(guān)嘁信,通常不能過(guò)大。

  2. 然后上鎖疏叨,這段代碼是可以說(shuō)整個(gè) RocketMQ Server 的熱點(diǎn)區(qū)域潘靖,

  3. 這里上鎖會(huì)記錄上鎖的時(shí)間,方便前面做 PageCache Busy 的判斷蚤蔓。

寫入代碼
result = mappedFile.appendMessage(msg, this.appendMessageCallback)

寫完之后卦溢,釋放鎖,如果超過(guò) 500 毫秒,打印 cost time 日志既绕。

統(tǒng)計(jì)

處理刷盤和slave 同步,這里看刷盤策略和同步策略涮坐,是 SYNC 還是 ASYNC凄贩。經(jīng)過(guò)我的測(cè)試,同步刷盤和異步刷盤的性能差距是 10 倍袱讹。

而 Slave 的數(shù)據(jù)同步疲扎,如果用 SYNC 模式,tps 最高也就 2000 多一丟度捷雕,為什么椒丧??jī)?nèi)網(wǎng),兩臺(tái)機(jī)器 ping 一下都要 0.2 毫秒救巷,一秒最多 5000 次壶熏,再加上處理邏輯, 2000 已經(jīng)到頂了浦译,網(wǎng)絡(luò)成了瓶頸棒假。

我們看看 mappedFile.appendMessage 方法的實(shí)現(xiàn)。一路追蹤精盅,有個(gè)關(guān)鍵邏輯帽哑, 在 appendMessagesInner 里:

int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result = null;
    if (messageExt instanceof MessageExtBrokerInner) {
        // 寫數(shù)據(jù)到 緩存
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    this.wrotePosition.addAndGet(result.getWroteBytes());
    this.storeTimestamp = result.getStoreTimestamp();
    return result;
}

代碼中,使用了 mappedFile 從 Linux 映射的 MMap buffer叹俏,對(duì)數(shù)據(jù)進(jìn)行寫入妻枕。我們看看 doAppend 方法。

總長(zhǎng)度粘驰、魔數(shù)屡谐、CRC 校驗(yàn)、隊(duì)列 ID蝌数、各種 flag康嘉、存儲(chǔ)時(shí)間,物理 offset籽前、存儲(chǔ) IP亭珍、時(shí)間戳、擴(kuò)展屬性等等枝哄。最終肄梨,這條消息會(huì)被寫入到 MMap 中。

那什么時(shí)候刷盤
  • 如果是 SYNC 模式挠锥,執(zhí)行 CommitLog 的 handleDiskFlush 的方法時(shí)众羡,就會(huì)立刻刷盤并等待刷盤結(jié)果。
  • 如果是 ASYNC 模式蓖租,執(zhí)行 CommitLog 的 handleDiskFlush 的方法時(shí)粱侣,會(huì)通知異步線程進(jìn)行刷盤羊壹,但不等待結(jié)果。

如果沒(méi)有新數(shù)據(jù)齐婴,則為 500ms 執(zhí)行一次刷盤策略油猫。

簡(jiǎn)單說(shuō)下異步刷盤:

默認(rèn)刷盤 4 頁(yè),Linux 一頁(yè)是 4kb 數(shù)據(jù)柠偶,4頁(yè)就是 16kb情妖。

如果寫的數(shù)據(jù)減去已經(jīng)刷的數(shù)據(jù),剩下的數(shù)據(jù)大于等于 4 頁(yè)诱担,就執(zhí)行刷盤毡证,執(zhí)行 mappedByteBuffer.force() 或者 fileChannel.force(false);

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蔫仙,隨后出現(xiàn)的幾起案子料睛,更是在濱河造成了極大的恐慌,老刑警劉巖摇邦,帶你破解...
    沈念sama閱讀 219,110評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秦效,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡涎嚼,警方通過(guò)查閱死者的電腦和手機(jī)阱州,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)法梯,“玉大人苔货,你說(shuō)我怎么就攤上這事×⒀疲” “怎么了夜惭?”我有些...
    開封第一講書人閱讀 165,474評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)铛绰。 經(jīng)常有香客問(wèn)我诈茧,道長(zhǎng),這世上最難降的妖魔是什么捂掰? 我笑而不...
    開封第一講書人閱讀 58,881評(píng)論 1 295
  • 正文 為了忘掉前任敢会,我火速辦了婚禮,結(jié)果婚禮上这嚣,老公的妹妹穿的比我還像新娘鸥昏。我一直安慰自己,他們只是感情好姐帚,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評(píng)論 6 392
  • 文/花漫 我一把揭開白布吏垮。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪膳汪。 梳的紋絲不亂的頭發(fā)上唯蝶,一...
    開封第一講書人閱讀 51,698評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音遗嗽,去河邊找鬼粘我。 笑死,一個(gè)胖子當(dāng)著我的面吹牛媳谁,可吹牛的內(nèi)容都是我干的涂滴。 我是一名探鬼主播友酱,決...
    沈念sama閱讀 40,418評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼晴音,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了缔杉?” 一聲冷哼從身側(cè)響起锤躁,我...
    開封第一講書人閱讀 39,332評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎或详,沒(méi)想到半個(gè)月后系羞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡霸琴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評(píng)論 3 337
  • 正文 我和宋清朗相戀三年椒振,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片梧乘。...
    茶點(diǎn)故事閱讀 40,110評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡澎迎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出选调,到底是詐尸還是另有隱情夹供,我是刑警寧澤,帶...
    沈念sama閱讀 35,792評(píng)論 5 346
  • 正文 年R本政府宣布仁堪,位于F島的核電站哮洽,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏弦聂。R本人自食惡果不足惜鸟辅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望莺葫。 院中可真熱鬧剔桨,春花似錦、人聲如沸徙融。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至树绩,卻和暖如春萨脑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背饺饭。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工渤早, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人瘫俊。 一個(gè)月前我還...
    沈念sama閱讀 48,348評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像扛芽,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子登下,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容