RocketMQ 發(fā)送消息(一條消息從發(fā)送到存儲的過程)

目錄:

  • 前言
  • quickStart
  • 單刀直入
  • Remoting 模塊發(fā)送消息實現(xiàn)
  • 如何處理返回值
  • Broker Server 處理消息流程

前言

RocketMQ 目前在國內(nèi)應(yīng)該是比較流行的 MQ 了效五,樓主目前也在使用中,今天借著本文柳沙,理理 RocketMQ 發(fā)送一條消息到存儲一條消息的過程霹菊。

注意:本文主線是發(fā)送到存儲报腔,因此孟岛,閱讀源碼時沸手,其他和這條線相關(guān)度不高的代碼内边,會酌情閱讀。另外索绪,本文的目的是為了看清一條消息是如何被發(fā)出且被存儲的湖员,代碼中,關(guān)于 MQ 文件系統(tǒng)的優(yōu)化瑞驱,設(shè)計等娘摔,并不會花很多篇幅介紹。

quickStart

來自官方源碼 example 的一段發(fā)送代碼:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();

Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

producer.shutdown();

單刀直入

我們直接看看 send 方法唤反。

send 方法會設(shè)置一個默認(rèn)的 timeout凳寺, 3 秒。
默認(rèn)使用 SYNC 模式拴袭,另外有 Async 和 OneWay 模式读第。
我們需要處理方法簽名中的 Client 端的異常,網(wǎng)絡(luò)異常拥刻,Broker 端的異常,線程中斷異常父泳。

DefaultMQProducerImpl # sendDefaultImpl 方法就是發(fā)送的主要邏輯般哼。

image

這端代碼里,有個有趣的地方惠窄,可以提一下蒸眠,關(guān)于更新故障時間的策略,RMQ 有一個類 MQFaultStrategy杆融,用來處理 MQ 錯誤楞卡,然后對 MQ Server 進(jìn)行服務(wù)降級。

對照圖:

image

這個策略具體內(nèi)容:如果發(fā)送一條消息在 550 毫秒以內(nèi)脾歇,那么就不用降級蒋腮,如果550 毫秒以外,就進(jìn)行容錯降級(熔斷)30 秒藕各,以此類推池摧。

再看 DefaultMQProducerImpl # sendKernelImpl 發(fā)送到內(nèi)核的方法實現(xiàn)。

先找到 broker 的地址激况。嘗試壓縮大于 4M 的消息(批量消息不壓縮)作彤。執(zhí)行各種鉤子膘魄。構(gòu)造 Request 對象(存放數(shù)據(jù)),Context 上下文對象(存放調(diào)用上下文)竭讳。

這里會設(shè)置一個消息生成時間创葡,即 bornTimestamp。后面使用消息軌跡的時候绢慢,可以查看蹈丸。

最后,如果是 SYNC 模式呐芥,就調(diào)用 MQClientAPIImpl 來發(fā)送消息逻杖,這一層還是在 Client 模塊里,在這一層思瘟,會設(shè)置更詳細(xì)的消息細(xì)節(jié)荸百,構(gòu)造命令對象。最后調(diào)用 remotingClient # invokeSync 發(fā)送消息滨攻。

注意够话,在 MQClientAPIImpl # sendMessage 這一層,會給命令對象設(shè)置一個 CmdCode光绕,叫 SEND_MESSAGE女嘲,這個東西就是一個和 Broker 的契約,Broker 會根據(jù)這個 Code 進(jìn)行不同的策略诞帐。另外欣尼,如果這里用 RPC 的方式,例如停蕉,使用一個接口的抽象方法愕鼓,然后 Broker 對抽象方法進(jìn)行 RPC 調(diào)用,這樣可不可以呢慧起?

最后菇晃,看看 remotingClient # invokeSync 是如何實現(xiàn)的。

Remoting 模塊發(fā)送消息實現(xiàn)

invokeSync 方法首先執(zhí)行 RPCBefore 鉤子蚓挤,類似 Spring 的各種 Bean 擴展組件磺送,然后就是對超時進(jìn)行判斷〔右猓可以看到估灿,每個方法幾乎都有對超時的判斷,超時判斷和超時處理在分布式場景非常重要脾歧。

然后根據(jù) addr 找到對應(yīng)的 Socket Channel甲捏。然后執(zhí)行 invokeSyncImpl 方法。

這里其實和其他大部分的 RPC 框架都是類似的了鞭执,生產(chǎn)一個永遠(yuǎn)自增的 Request ID司顿,創(chuàng)建一個 Feature 對象芒粹,和這個 ID 綁定,方便 Netty 返回數(shù)據(jù)對這個 ID 對應(yīng)的線程進(jìn)行喚醒大溜。
然后調(diào)用 Netty 的 writeAndFlush 方法化漆,將數(shù)據(jù)寫進(jìn) Socket,同時添加一個監(jiān)聽器钦奋,如果發(fā)送失敗座云,喚醒當(dāng)前線程。

發(fā)送完畢之后付材,當(dāng)前線程進(jìn)行等待朦拖,使用 CountDownLatch.wait 方法實現(xiàn),當(dāng) Netty 返回數(shù)據(jù)時厌衔,使用 CountDownLatch.countDown 進(jìn)行喚醒璧帝,然后返回從 Broker 寫入的結(jié)果,可能成功富寿,也可能失敗睬隶,需要到上層(Client 層)解析,網(wǎng)絡(luò)層只負(fù)責(zé)網(wǎng)絡(luò)的事情页徐。

我們知道苏潜, Netty 會使用 Handler 處理出去的數(shù)據(jù)和返回的數(shù)據(jù),我們看看 Client 端 Netty 有哪些 Handler.

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

我們看到变勇,這里使用了一個 Encoder恤左,Decoder,空閑處理器贰锁,連接管理器赃梧,ClientHandler。

XXCoder 就是對 Cmd 對象進(jìn)行序列化和反序列化的豌熄。這里的空閑使用的讀寫最大空閑時間為 120s,超過這個物咳,就會觸發(fā)空閑事件锣险。RMQ 就會關(guān)閉 Channel 連接。而針對空閑事件進(jìn)行處理的就是連接管理器了览闰。

連接管理器處理空閑芯肤、Close、Connect压鉴、異常等事件崖咨,使用監(jiān)聽器模式,不同的監(jiān)聽器對不同的事件進(jìn)行處理油吭。另外击蹲,這里也許可以借鑒 EventBus署拟,每個事件可以設(shè)置多個監(jiān)聽器。

如何處理返回值

我們看了 RMQ 中 Netty 的設(shè)計歌豺,再看看返回值處理就簡單了推穷,NettyClientHandler 會在 channelRead0 方法處理 Netty Server 的返回值。對應(yīng) RMQ类咧,則是 processMessageReceived 方法馒铃。該方法很簡潔:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }    

其實,這是一個模板方法痕惋,固定算法区宇,由子類實現(xiàn),分為 Request 實現(xiàn)和 Response 實現(xiàn)值戳。我們看看 Response 實現(xiàn)议谷。

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        // 找到 Response .
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {// 返回結(jié)果
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

這里,通過 cmd 對象的 Request ID 找到 Feature,執(zhí)行 responseFuture.putResponse述寡,設(shè)置返回值柿隙,喚醒阻塞等待的發(fā)送線程。這里還有一個 release 調(diào)用鲫凶,這個和異步發(fā)送有關(guān)禀崖,默認(rèn)最大同時 65535 個異步請求,具體就不展開了螟炫。

好波附,到這里,喚醒阻塞的發(fā)送線程昼钻,返回數(shù)據(jù)掸屡,客戶端層面的發(fā)送就結(jié)束了,我們小結(jié)一下然评。根據(jù)模塊層次仅财,我們記錄一下 sendMessage 的過程:

image

層次還是比較清晰的。

我們再來看看 Server 端如何處理一條消息的碗淌。

Broker Server 處理消息流程

從哪里入手呢盏求?

我們上面看源碼,看到有個 SEND_MESSAGE Code亿眠,是 Client 和 Broker Server 的一個約定代碼碎罚,我們看看這個代碼在哪里用的。

在 broker 模塊的 BrokerController 類中纳像,有個 registerProcessor 方法荆烈,會將 SEND_MESSAGE Code 和一個 SendMessageProcessor 對象綁定。

這一步我們停一下竟趾,再去看看 netty Server 端的 Handler憔购。

NettyRemotingServer 是處理 Request 的類宫峦,他的 ServerBootstrap 會在 pipeline 中添加一個 NettyServerHandler 處理器,這個處理器的 channelRead0 方法會調(diào)用 NettyRemotingServer 的父類 processMessageReceived 方法倦始。

這個方法會從 processorTable 里斗遏,根據(jù) Cmd Code,也就是 SEND_MESSAGE 獲取對應(yīng)的 Processor鞋邑, Processor 由 2 部分組成诵次,一部分是處理數(shù)據(jù)的對象,一部分是這個對象所對應(yīng)的線程池枚碗。用于異步處理邏輯逾一,防止阻塞 Netty IO 線程。

關(guān)鍵代碼:

doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);// 處理.
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

前后都是執(zhí)行一些鉤子肮雨,例如 ACL 啥的遵堵。

這里我們小結(jié)一下,RMQ 會有一個 BrokerController 類怨规,會注冊 Code 和 Processor 的綁定關(guān)系陌宿,BrokerController 也會把這些綁定,注冊到 Netty Server 中波丰,當(dāng) Netty Server 從 Socket 收到 Cmd 對象壳坪,根據(jù) Cmd 對象的 Code,就可以找到對應(yīng) Processor 類掰烟,對數(shù)據(jù)進(jìn)行處理爽蝴。

中間是處理 Request 請求的。這個 processRequest 方法纫骑,有很多的實現(xiàn)蝎亚,如下圖,我們主要看 SendMessageProcessor 的實現(xiàn)先馆。

image

SendMessageProcessor # sendMessage 是處理消息的主要邏輯发框。

關(guān)鍵代碼:
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

消息存儲引擎,這里我們看 DefaultMessageStore 的 putMessage 實現(xiàn)煤墙。

首先一堆校驗缤底。注意,其中有一個地方:

if (this.isOSPageCacheBusy()) {// 檢查 mmp 忙不忙.
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

由于 RMQ 寫數(shù)據(jù)是王 PageCache 里面寫的番捂,因此,如果寫的慢江解,就是 PageCache 忙设预,這里忙的標(biāo)準(zhǔn)是,如果鎖文件的時間犁河,超過了 1 秒鳖枕,那就是忙魄梯。

最后調(diào)用 PutMessageResult result = this.commitLog.putMessage(msg) 寫數(shù)據(jù)。

如果耗時超過 500 毫秒宾符,就會打印日志酿秸。這樣我們排查問題的時候,可以看看 storeStats 的日志魏烫。

看看 commitLog 的 putMessage 方法實現(xiàn)辣苏。

先拿到最新的 MappedFile 文件,MappedFile 文件的命名是用 offset 命名的哄褒,一個文件默認(rèn) 1gb稀蟋,這個大小和 mmp 的機制有關(guān),通常不能過大呐赡。

然后上鎖退客,這段代碼是可以說整個 RMQ Server 的熱點區(qū)域,

這里上鎖會記錄上鎖的時間链嘀,方便前面做 PageCache Busy 的判斷萌狂。

寫入代碼:

result = mappedFile.appendMessage(msg, this.appendMessageCallback)

寫完之后,釋放鎖怀泊,如果超過 500 毫秒茫藏,打印 cost time 日志。

統(tǒng)計包个。

處理刷盤和slave 同步刷允,這里看刷盤策略和同步策略,是 SYNC 還是 ASYNC碧囊。

經(jīng)過我的測試树灶,同步刷盤和異步刷盤的性能差距是 10 倍。
而 Slave 的數(shù)據(jù)同步糯而,如果用 SYNC 模式天通,tps 最高也就 2000 多一丟度,為什么熄驼?內(nèi)網(wǎng)像寒,兩臺機器 ping 一下都要 0.2 毫秒,一秒最多 5000 次瓜贾,再加上處理邏輯诺祸, 2000 已經(jīng)到頂了,網(wǎng)絡(luò)成了瓶頸祭芦。

如果用全異步的話筷笨,我的 4c8g 的機器,單機 tps 最高能 2 萬多。美滋滋胃夏。

跑題了轴或。

我們看看 mappedFile.appendMessage 方法的實現(xiàn)。

一路追蹤仰禀,有個關(guān)鍵邏輯照雁, 在 appendMessagesInner 里:

int currentPos = this.wrotePosition.get();

if (currentPos < this.fileSize) {
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result = null;
    if (messageExt instanceof MessageExtBrokerInner) {
        // 寫數(shù)據(jù)到 緩存
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    this.wrotePosition.addAndGet(result.getWroteBytes());
    this.storeTimestamp = result.getStoreTimestamp();
    return result;
}

代碼中,使用了 mappedFile 從 Linux 映射的 MMap buffer答恶,對數(shù)據(jù)進(jìn)行寫入饺蚊。我們看看 doAppend 方法。

我們可以看到亥宿,一條消息有太多的內(nèi)容:
總長度卸勺、魔數(shù)、CRC 校驗烫扼、隊列 ID曙求、各種 flag、存儲時間映企,物理 offset悟狱、存儲 IP砰诵、時間戳穿撮、擴展屬性等等瑰步。

最終旁赊,這條消息會被寫入到 MMap 中。

那什么時候刷盤呢蛀柴?

如果是 SYNC 模式郭卫,執(zhí)行 CommitLog 的 handleDiskFlush 的方法時比然,就會立刻刷盤并等待刷盤結(jié)果囤攀。
如果是 ASYNC 模式软免,執(zhí)行 CommitLog 的 handleDiskFlush 的方法時,會通知異步線程進(jìn)行刷盤焚挠,但不等待結(jié)果膏萧。

另外,如果沒有新數(shù)據(jù)蝌衔,則為 500ms 執(zhí)行一次刷盤策略榛泛。

簡單說下異步刷盤:

  1. 默認(rèn)刷盤 4 頁,Linux 一頁是 4kb 數(shù)據(jù)噩斟,4頁就是 16kb曹锨。
  2. 如果寫的數(shù)據(jù)減去已經(jīng)刷的數(shù)據(jù),剩下的數(shù)據(jù)大于等于 4 頁剃允,就執(zhí)行刷盤.
  3. 執(zhí)行 mappedByteBuffer.force() 或者 fileChannel.force(false);

我們這里小結(jié)一下艘希,看看 RMQ Server 處理處理一條消息的:

image.png

總結(jié)

來張大圖總結(jié)一下硼身。

image.png

篇幅有限,下篇再一起看看 RMQ 如何消費消息覆享。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市营袜,隨后出現(xiàn)的幾起案子撒顿,更是在濱河造成了極大的恐慌,老刑警劉巖荚板,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凤壁,死亡現(xiàn)場離奇詭異,居然都是意外死亡跪另,警方通過查閱死者的電腦和手機拧抖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來免绿,“玉大人唧席,你說我怎么就攤上這事〕凹荩” “怎么了淌哟?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長辽故。 經(jīng)常有香客問我徒仓,道長,這世上最難降的妖魔是什么誊垢? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任掉弛,我火速辦了婚禮,結(jié)果婚禮上喂走,老公的妹妹穿的比我還像新娘殃饿。我一直安慰自己,他們只是感情好缴啡,可當(dāng)我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布壁晒。 她就那樣靜靜地躺著,像睡著了一般业栅。 火紅的嫁衣襯著肌膚如雪秒咐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天碘裕,我揣著相機與錄音携取,去河邊找鬼。 笑死帮孔,一個胖子當(dāng)著我的面吹牛雷滋,可吹牛的內(nèi)容都是我干的不撑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼晤斩,長吁一口氣:“原來是場噩夢啊……” “哼焕檬!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起澳泵,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤实愚,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后兔辅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體腊敲,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年维苔,在試婚紗的時候發(fā)現(xiàn)自己被綠了碰辅。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡介时,死狀恐怖没宾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情潮尝,我是刑警寧澤榕吼,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站勉失,受9級特大地震影響羹蚣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜乱凿,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一顽素、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧徒蟆,春花似錦胁出、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至寺枉,卻和暖如春抑淫,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背姥闪。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工始苇, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人筐喳。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓催式,卻偏偏與公主長得像函喉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子荣月,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,033評論 2 355