消息中間件—RocketMQ的RPC通信(一)

文章摘要:借用小廝的一句話“消息隊列的本質在于消息的發(fā)送驮樊、存儲和接收”。那么,對于一款消息隊列來說囚衔,如何做到消息的高效發(fā)送與接收是重點和關鍵

一挖腰、RocketMQ中Remoting通信模塊概覽

RocketMQ消息隊列的整體部署架構如下圖所示:

RocketMQ整體的架構集群圖.jpg

先來說下RocketMQ消息隊列集群中的幾個角色:
(1)NameServer:在MQ集群中做的是做命名服務,更新和路由發(fā)現(xiàn) broker服務练湿;
(2)Broker-Master:broker 消息主機服務器猴仑;
(3)Broker-Slave:broker 消息從機服務器;
(4)Producer:消息生產(chǎn)者肥哎;
(5)Consumer:消息消費者辽俗;

其中,RocketMQ集群的一部分通信如下:
(1)Broker啟動后需要完成一次將自己注冊至NameServer的操作篡诽;隨后每隔30s時間定期向NameServer上報Topic路由信息崖飘;
(2)消息生產(chǎn)者Producer作為客戶端發(fā)送消息時候霞捡,需要根據(jù)Msg的Topic從本地緩存的TopicPublishInfoTable獲取路由信息碧信。如果沒有則更新路由信息會從NameServer上重新拉扰椴辍;
(3)消息生產(chǎn)者Producer根據(jù)(2)中獲取的路由信息選擇一個隊列(MessageQueue)進行消息發(fā)送;Broker作為消息的接收者收消息并落盤存儲酥泞;
從上面(1)~(3)中可以看出在消息生產(chǎn)者, Broker和NameServer之間都會發(fā)生通信(這里只說了MQ的部分通信)啃憎,因此如何設計一個良好的網(wǎng)絡通信模塊在MQ中至關重要辛萍,它將決定RocketMQ集群整體的消息傳輸能力與最終的性能贩毕。
rocketmq-remoting 模塊是 RocketMQ消息隊列中負責網(wǎng)絡通信的模塊辉阶,它幾乎被其他所有需要網(wǎng)絡通信的模塊(諸如rocketmq-client、rocketmq-server邢隧、rocketmq-namesrv)所依賴和引用冈在。為了實現(xiàn)客戶端與服務器之間高效的數(shù)據(jù)請求與接收包券,RocketMQ消息隊列自定義了通信協(xié)議并在Netty的基礎之上擴展了通信模塊溅固。
ps:鑒于RocketMQ的通信模塊是建立在Netty基礎之上的侍郭,因此在閱讀RocketMQ的源碼之前亮元,讀者最好先對Netty的多線程模型爆捞、JAVA NIO模型均有一定的了解煮甥,這樣子理解RocketMQ源碼會較為快一些成肘。
作者閱讀的RocketMQ版本是4.2.0, 依賴的netty版本是4.0.42.Final. RocketMQ的代碼結構圖如下:

RocketMQ的Remoting源代碼目錄結構.png

源碼部分主要可以分為rocketmq-broker双霍,rocketmq-client店煞,rocketmq-common顷蟀,rocketmq-filterSrv鸣个,rocketmq-namesrv和rocketmq-remoting等模塊囤萤,通信框架就封裝在rocketmq-remoting模塊中涛舍。
本文主要從RocketMQ的協(xié)議格式富雅,消息編解碼没佑,通信方式(同步/異步/單向)蛤奢、通信流程和Remoting模塊的Netty多線程處理架構等方面介紹RocketMQ的通信模塊啤贩。

二瓜晤、RocketMQ中Remoting通信模塊的具體實現(xiàn)

1痢掠、Remoting通信模塊的類結構圖

RocketMQ的Remoting模塊類結構圖.png

從類層次結構來看:
(1)RemotingService:為最上層的接口足画,提供了三個方法:

void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);

(2)RemotingClient/RemotingSever:兩個接口繼承了最上層接口—RemotingService医舆,分別各自為Client和Server提供所必需的方法蔬将,下面所列的是RemotingServer的方法:

/**
     * 同RemotingClient端一樣
     *
     * @param requestCode
     * @param processor
     * @param executor
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    /**
     * 注冊默認的處理器
     *
     * @param processor
     * @param executor
     */
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    /**
     * 根據(jù)請求code來獲取不同的處理Pair
     *
     * @param requestCode
     * @return
     */
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    /**
     * 同RemotingClient端一樣,同步通信,有返回RemotingCommand
     * @param channel
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    /**
     * 同RemotingClient端一樣,異步通信,無返回RemotingCommand
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    /**
     * 同RemotingClient端一樣,單向通信毙石,諸如心跳包
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

(3)NettyRemotingAbstract:Netty通信處理的抽象類徐矩,定義并封裝了Netty處理的公共處理方法滤灯;
(4)NettyRemotingClient/NettyRemotingServer:分別實現(xiàn)了RemotingClient和RemotingServer, 都繼承了NettyRemotingAbstract抽象類力喷。RocketMQ中其他的組件(如client、nameServer拂募、broker在進行消息的發(fā)送和接收時均使用這兩個組件)

2、消息的協(xié)議設計與編碼解碼

在Client和Server之間完成一次消息發(fā)送時录肯,需要對發(fā)送的消息進行一個協(xié)議約定论咏,因此就有必要自定義RocketMQ的消息協(xié)議颁井。同時养涮,為了高效地在網(wǎng)絡中傳輸消息和對收到的消息讀取贯吓,就需要對消息進行編解碼悄谐。在RocketMQ中威沫,RemotingCommand這個類在消息傳輸過程中對所有數(shù)據(jù)內(nèi)容的封裝棒掠,不但包含了所有的數(shù)據(jù)結構,還包含了編碼解碼操作雾袱。
RemotingCommand類的部分成員變量如下:

Header字段 類型 Request說明 Response說明
code int 請求操作碼芹橡,應答方根據(jù)不同的請求碼進行不同的業(yè)務處理 應答響應碼望伦。0表示成功,非0則表示各種錯誤
language LanguageCode 請求方實現(xiàn)的語言 應答方實現(xiàn)的語言
version int 請求方程序的版本 應答方程序的版本
opaque int 相當于reqeustId劣摇,在同一個連接上的不同請求標識碼钧惧,與響應消息中的相對應 應答不做修改直接返回
flag int 區(qū)分是普通RPC還是onewayRPC得標志 區(qū)分是普通RPC還是onewayRPC得標志
remark String 傳輸自定義文本信息 傳輸自定義文本信息
extFields HashMap<String, String> 請求自定義擴展信息 響應自定義擴展信息

這里展示下Broker向NameServer發(fā)送一次心跳注冊的報文:

[
code=103,//這里的103對應的code就是broker向nameserver注冊自己的消息
language=JAVA,
version=137,
opaque=58,//這個就是requestId
flag(B)=0,
remark=null,
extFields={
    brokerId=0,
    clusterName=DefaultCluster,
    brokerAddr=ip1: 10911,
    haServerAddr=ip1: 10912,
    brokerName=LAPTOP-SMF2CKDN
},
serializeTypeCurrentRPC=JSON

下面來看下RocketMQ通信協(xié)議的格式:

RocketMQ中Remoting協(xié)議格式.png

可見傳輸內(nèi)容主要可以分為以下4部分:
(1)消息長度:總長度追逮,四個字節(jié)存儲,占用一個int類型眼滤;
(2)序列化類型&消息頭長度:同樣占用一個int類型漾唉,第一個字節(jié)表示序列化類型分衫,后面三個字節(jié)表示消息頭長度铐懊;
(3)消息頭數(shù)據(jù):經(jīng)過序列化后的消息頭數(shù)據(jù)壁畸;
(4)消息主體數(shù)據(jù):消息主體的二進制字節(jié)數(shù)據(jù)內(nèi)容;
消息的編碼和解碼分別在RemotingCommand類的encode和decode方法中完成腻异,下面是消息編碼encode方法的具體實現(xiàn):

public ByteBuffer encode() {
    // 1> header length size
    int length = 4;    //消息總長度

    // 2> header data length
    //將消息頭編碼成byte[]
    byte[] headerData = this.headerEncode(); 
    //計算頭部長度 
    length += headerData.length;              

    // 3> body data length
    if (this.body != null) {
        //消息主體長度
        length += body.length;                
    }
    //分配ByteBuffer, 這邊加了4, 
    //這是因為在消息總長度的計算中沒有將存儲頭部長度的4個字節(jié)計算在內(nèi)
    ByteBuffer result = ByteBuffer.allocate(4 + length);  

    // length
    //將消息總長度放入ByteBuffer
    result.putInt(length);   

    // header length
    //將消息頭長度放入ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 

    // header data
    //將消息頭數(shù)據(jù)放入ByteBuffer
    result.put(headerData);    

    // body data;
    if (this.body != null) {
        //將消息主體放入ByteBuffer
        result.put(this.body); 
    }
    //重置ByteBuffer的position位置
    result.flip();     

    return result;
}

    /**
     * markProtocolType方法是將RPC類型和headerData長度編碼放到一個byte[4]數(shù)組中
     *
     * @param source
     * @param type
     * @return
     */
    public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];

        result[0] = type.getCode();
        //右移16位后再和255與->“16-24位”
        result[1] = (byte) ((source >> 16) & 0xFF);
        //右移8位后再和255與->“8-16位”
        result[2] = (byte) ((source >> 8) & 0xFF);
        //右移0位后再和255與->“8-0位”
        result[3] = (byte) (source & 0xFF);
        return result;
    }

消息解碼decode方法是編碼的逆向過程机打,其具體實現(xiàn)如下:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        //獲取byteBuffer的總長度
        int length = byteBuffer.limit();

        //獲取前4個字節(jié)皆辽,組裝int類型,該長度為總長度
        int oriHeaderLen = byteBuffer.getInt();

        //獲取消息頭的長度,這里和0xFFFFFF做與運算扼菠,編碼時候的長度即為24位
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

3、消息的通信方式和通信流程

在RocketMQ消息隊列中支持通信的方式主要有以下三種:
(1)同步(sync)
(2)異步(async)
(3)單向(oneway)
其中“同步”通信模式相對簡單,一般用在發(fā)送心跳包場景下,無需關注其Response椎组。本文將主要介紹RocketMQ的異步通信流程(限于篇幅弱贼,讀者可以按照同樣的模式進行分析同步通信流程)溪烤。
下面先給出了RocketMQ異步通信的整體流程圖:

RocketMQ異步通信的整體時序圖.png

下面兩小節(jié)內(nèi)容主要介紹了Client端發(fā)送請求消息和Server端接收消息的具體實現(xiàn)责嚷,其中對于Client端的回調可以參考RocketMQ的源碼來分析這里就不做詳細介紹。

3.1、Client發(fā)送請求消息的具體實現(xiàn)

當客戶端調用異步通信接口—invokeAsync時候棍鳖,先由RemotingClient的實現(xiàn)類—NettyRemotingClient根據(jù)addr獲取相應的channel(如果本地緩存中沒有則創(chuàng)建),隨后調用invokeAsyncImpl方法,將數(shù)據(jù)流轉給抽象類NettyRemotingAbstract處理(真正做完發(fā)送請求動作的是在NettyRemotingAbstract抽象類的invokeAsyncImpl方法里面)。具體發(fā)送請求消息的源代碼如下所示:

/**
     * invokeAsync(異步調用)
     * 
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        //相當于request ID, RemotingCommand會為每一個request產(chǎn)生一個request ID, 從0開始, 每次加1

        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            //根據(jù)request ID構建ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            //將ResponseFuture放入responseTable
            this.responseTable.put(opaque, responseFuture);
            try {
                //使用Netty的channel發(fā)送請求數(shù)據(jù)
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    //消息發(fā)送后執(zhí)行
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            //如果發(fā)送消息成功給Server锥债,那么這里直接Set后return
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            //執(zhí)行回調
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            //釋放信號量
                            responseFuture.release();
                        }

                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                //異常處理
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

在Client端發(fā)送請求消息時有個比較重要的數(shù)據(jù)結構需要注意下:
(1)responseTable—保存請求碼與響應關聯(lián)映射

protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable 

opaque表示請求發(fā)起方在同個連接上不同的請求標識代碼鸦致,每次發(fā)送一個消息的時候总棵,可以選擇同步阻塞/異步非阻塞的方式迄汛。無論是哪種通信方式专酗,都會保存請求操作碼至ResponseFuture的Map映射—responseTable中疗隶。
(2)ResponseFuture—保存返回響應(包括回調執(zhí)行方法和信號量)

public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
        SemaphoreReleaseOnlyOnce once) {
        this.opaque = opaque;
        this.timeoutMillis = timeoutMillis;
        this.invokeCallback = invokeCallback;
        this.once = once;
    }

對于同步通信來說猎荠,第三、四個參數(shù)為null;而對于異步通信來說横堡,invokeCallback是在收到消息響應的時候能夠根據(jù)responseTable找到請求碼對應的回調執(zhí)行方法道宅,semaphore參數(shù)用作流控葬项,當多個線程同時往一個連接寫數(shù)據(jù)時可以通過信號量控制permit同時寫許可的數(shù)量嚷量。
(3)異常發(fā)送流程處理—定時掃描responseTable本地緩存
在發(fā)送消息時候,如果遇到異常情況(比如服務端沒有response返回給客戶端或者response因網(wǎng)絡而丟失)落包,上面所述的responseTable的本地緩存Map將會出現(xiàn)堆積情況巷查。這個時候需要一個定時任務來專門做responseTable的清理回收崇败。在RocketMQ的客戶端/服務端啟動時候會產(chǎn)生一個頻率為1s調用一次來的定時任務檢查所有的responseTable緩存中的responseFuture變量,判斷是否已經(jīng)得到返回, 并進行相應的處理贡避。

public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

3.2琉历、Server端接收消息并進行處理的具體實現(xiàn)

Server端接收消息的處理入口在NettyServerHandler類的channelRead0方法中拄踪,其中調用了processMessageReceived方法(這里省略了Netty服務端消息流轉的大部分流程和邏輯)姚糊。其中服務端最為重要的處理請求方法實現(xiàn)如下:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //根據(jù)RemotingCommand中的code獲取processor和ExecutorService
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //rpc hook
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }
                    //processor處理請求
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    //rpc hook
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                PLOG.error("process request over, but response failed", e);
                                PLOG.error(cmd.toString());
                                PLOG.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                        .equals(e.getClass().getCanonicalName())) {
                        PLOG.error("process request exception", e);
                        PLOG.error(cmd.toString());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            //封裝requestTask
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            //想線程池提交requestTask
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
                    + pair.getObject2().toString() //
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        //構建response
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

上面的請求處理方法中根據(jù)RemotingCommand的請求業(yè)務碼來匹配到相應的業(yè)務處理器贸辈;然后生成一個新的線程提交至對應的業(yè)務線程池進行異步處理。
(1)processorTable—請求業(yè)務碼與業(yè)務處理肠槽、業(yè)務線程池的映射變量

    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

我想RocketMQ這種做法是為了給不同類型的請求業(yè)務碼指定不同的處理器Processor處理擎淤,同時消息實際的處理并不是在當前線程,而是被封裝成task放到業(yè)務處理器Processor對應的線程池中完成異步執(zhí)行秸仙。(在RocketMQ中能看到很多地方都是這樣的處理嘴拢,這樣的設計能夠最大程度的保證異步,保證每個線程都專注處理自己負責的東西

三寂纪、總結

剛開始看RocketMQ源碼—RPC通信模塊可能覺得略微有點復雜席吴,但是只要能夠抓住Client端發(fā)送請求消息、Server端接收消息并處理的流程以及回調過程來分析和梳理捞蛋,那么整體來說并不復雜孝冒。RPC通信部分也是RocketMQ源碼中最重要的部分之一,想要對其中的全過程和細節(jié)有更為深刻的理解襟交,還需要多在本地環(huán)境Debug和分析對應的日志迈倍。同時,鑒于篇幅所限捣域,本篇還沒有來得及對RocketMQ的Netty多線程模型進行介紹啼染,將在消息中間件—RocketMQ的RPC通信(二)篇中來做詳細地介紹。
在此順便為自己打個Call焕梅,有興趣的朋友可以關注下我的個人公眾號:“匠心獨運的博客”迹鹅,對于Java并發(fā)、Spring贞言、數(shù)據(jù)庫和消息隊列的一些細節(jié)斜棚、問題的文章將會在這個公眾號上發(fā)布,歡迎交流與討論该窗。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末弟蚀,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子酗失,更是在濱河造成了極大的恐慌义钉,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件规肴,死亡現(xiàn)場離奇詭異捶闸,居然都是意外死亡夜畴,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門删壮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來贪绘,“玉大人,你說我怎么就攤上這事央碟∷肮啵” “怎么了?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵硬耍,是天一觀的道長垄琐。 經(jīng)常有香客問我,道長经柴,這世上最難降的妖魔是什么狸窘? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮坯认,結果婚禮上翻擒,老公的妹妹穿的比我還像新娘。我一直安慰自己牛哺,他們只是感情好陋气,可當我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著引润,像睡著了一般巩趁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上淳附,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天议慰,我揣著相機與錄音,去河邊找鬼奴曙。 笑死别凹,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的洽糟。 我是一名探鬼主播炉菲,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼坤溃!你這毒婦竟也來了拍霜?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤薪介,失蹤者是張志新(化名)和其女友劉穎沉御,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體昭灵,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡吠裆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了烂完。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片试疙。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖抠蚣,靈堂內(nèi)的尸體忽然破棺而出祝旷,到底是詐尸還是另有隱情,我是刑警寧澤嘶窄,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布怀跛,位于F島的核電站,受9級特大地震影響柄冲,放射性物質發(fā)生泄漏吻谋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一现横、第九天 我趴在偏房一處隱蔽的房頂上張望漓拾。 院中可真熱鬧,春花似錦戒祠、人聲如沸骇两。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽低千。三九已至,卻和暖如春馏颂,著一層夾襖步出監(jiān)牢的瞬間示血,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工饱亮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留矾芙,地道東北人。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓近上,卻偏偏與公主長得像剔宪,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子壹无,可洞房花燭夜當晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容