Dubbo的服務調用(心跳抄罕、編碼允蚣、解碼)

本系列主要參考官網(wǎng)文檔、芋道源碼的源碼解讀和《深入理解Apache Dubbo與實戰(zhàn)》一書呆贿。Dubbo版本為2.6.1嚷兔。
本篇用以分析心跳、編碼做入、解碼相關的代碼冒晰。

文章內容順序:
1.心跳
  1.1 為什么要有心跳?
  1.2 HeaderExchangeClient
  1.3 HeaderExchangeClient#startHeatbeatTimer
  1.4 HeartBeatTask#run
2.編碼
  2.1 編碼的鏈路:
  2.2 消息頭的字段的意義
  2.3 ExchangeCodec
  2.4 序列化的多種實現(xiàn)
3.解碼
  3.1 InternalDecoder#messageReceived
  3.2 DubboCountCodec#decode
  3.3 DubboCodec.decode(channel,buffer)
  3.4 DubboCodec#decodeBody
  3.5 DecodeableRpcInvocation#decode
  3.6 解碼的方法調用順序

首先是心跳相關

1.心跳

在上一篇服務調用的消費端中竟块,我們介紹到HeaderExchangeClient的構造方法中會有心跳的一些邏輯壶运,在那邊一筆帶過了,在這篇文章來詳細看看浪秘。

1.1為什么要有心跳蒋情?

心跳間隔,對于長連接耸携,當物理層斷開時恕出,比如拔網(wǎng)線,TCP的FIN消息來不及發(fā)送违帆,對方收不到斷開事件浙巫,此時需要心跳來幫助檢查連接是否已斷開

這里仍舊貼一下HeaderExchangeClient的構造方法

1.2HeaderExchangeClient

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        
        // 創(chuàng)建 HeaderExchangeChannel 對象
        this.channel = new HeaderExchangeChannel(client);
        
        // 以下代碼均與心跳檢測邏輯有關
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            // 開啟心跳檢測定時器
            startHeartbeatTimer();
        }
    }

此方法最后調用 #startHeatbeatTimer() 方法,發(fā)起心跳定時器刷后。直接來看startHeatbeatTimer()方法的實現(xiàn)吧

1.3HeaderExchangeClient#startHeatbeatTimer

    private void startHeatbeatTimer() {
        // 停止原有定時任務
        stopHeartbeatTimer();
        // 發(fā)起新的定時任務
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

可以看到這邊就是直接調用線程池進行了一個定時任務HeartBeatTask對象的畴,scheduleWithFixedDelay方法意為當當前任務執(zhí)行完畢后再隔多少秒進行下一個任務。

接下來我們來看一下HeartBeatTask的實現(xiàn)尝胆。

1.4HeartBeatTask#run

    public void run() {
        try {
            long now = System.currentTimeMillis();
            for (Channel channel : channelProvider.getChannels()) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // 最后讀寫的時間丧裁,任一超過心跳間隔,發(fā)送心跳
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        Request req = new Request();
                        req.setVersion("2.0.0");
                        req.setTwoWay(true); // 需要響應
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
                    // 最后讀的時間含衔,超過心跳超時時間
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // 客戶端側煎娇,重新連接服務端
                        if (channel instanceof Client) {
                            try {
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        // 服務端側,關閉客戶端連接
                        } else {
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }
  • 【任務一】:最后讀或寫的時間贪染,任一超過心跳間隔 heartbeat 缓呛,發(fā)送心跳。
  • 【任務二】:最后讀的時間杭隙,超過心跳超時時間 heartbeatTimeout 哟绊,分成兩種情況:
    客戶端側,重連連接服務端痰憎。
    服務端側票髓,關閉客戶端連接攀涵。

至此,心跳的分析告一段落洽沟。

接下來來看看編碼的操作以故。

2.編碼

2.1編碼的鏈路

image.png

看上面這張圖,當運行到NettyChannel#send的這一行后裆操,就會跳進Netty的執(zhí)行邏輯怒详,最后由NettyCodecAdapter的內部類調用編碼類執(zhí)行編碼操作

image.png

上圖中繼承的OneToOneEncoderNetty的抽象方法,那這個InternalEncoder是什么時候傳進來的呢跷车?

NettyClient#doOpen()方法中有如下代碼,會在編碼器橱野、解碼器等一并設置進來朽缴,從而使得最后編碼、解碼邏輯能交由Dubbo自己的類實現(xiàn)水援。

image.png

再貼一張編碼的鏈路密强。

NettyChannel#send
->一系列Netty內部的方法
->NettyCodecAdapter內部類(繼承了netty抽象類)#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeRequest

image.png

可以看到最后是交由ExchangeCodec來執(zhí)行編碼的邏輯了。
那么這個鏈路就簡單介紹到這蜗元,直接來看他是怎么實現(xiàn)的編碼吧或渤。

2.2消息頭的字段的意義

先簡單列舉一下消息頭的內容,其中的魔數(shù)是用來分割處理粘包問題的奕扣。

image.png

接下來就直接來看看ExchangeCodec的實現(xiàn)

2.3 ExchangeCodec

public class ExchangeCodec extends TelnetCodec {

    // 消息頭長度
    protected static final int HEADER_LENGTH = 16;
    // 魔數(shù)內容
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 對 Request 對象進行編碼
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 對 Response 對象進行編碼薪鹦,后面分析
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);

        // 創(chuàng)建消息頭字節(jié)數(shù)組,長度為 16
        byte[] header = new byte[HEADER_LENGTH];

        // 設置魔數(shù)
        Bytes.short2bytes(MAGIC, header);

        // 設置數(shù)據(jù)包類型(Request/Response)和序列化器編號
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 設置通信方式(單向/雙向)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        
        // 設置事件標識
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // 設置請求編號惯豆,8個字節(jié)池磁,從第4個字節(jié)開始設置
        Bytes.long2bytes(req.getId(), header, 4);

        // 獲取 buffer 當前的寫位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,為消息頭預留 16 個字節(jié)的空間
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 創(chuàng)建序列化器楷兽,比如 Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // 對事件數(shù)據(jù)進行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
            // 對請求數(shù)據(jù)進行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        
        // 獲取寫入的字節(jié)數(shù)地熄,也就是消息體長度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 將消息體長度寫入到消息頭中
        Bytes.int2bytes(len, header, 12);

        // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
        buffer.writerIndex(savedWriteIndex);
        // 從 savedWriteIndex 下標處寫入消息頭
        buffer.writeBytes(header);
        // 設置新的 writerIndex芯杀,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
    
    // 省略其他方法
}

以上就是請求對象的編碼過程端考,該過程首先會通過位運算將消息頭寫入到 header 數(shù)組中。然后對 Request對象的 data字段執(zhí)行序列化操作揭厚,序列化后的數(shù)據(jù)最終會存儲到ChannelBuffer 中却特。序列化操作執(zhí)行完后,可得到數(shù)據(jù)序列化后的長度 len筛圆,緊接著將 len寫入到 header指定位置處核偿。最后再將消息頭字節(jié)數(shù)組 header寫入到 ChannelBuffer 中,整個編碼過程就結束了顽染。

這里我們可以再來關注一下encodeEventData方法(encodeRequestData也是一樣的實現(xiàn)漾岳。)

2.4 序列化的多種實現(xiàn)

image.png

通過一系列重載方法轰绵,我們可以看到最后調用了out.writeObeject
而這個out,則是在encodeRequest方法中通過url傳過來的參數(shù)設置的尼荆,有多種不同的實現(xiàn)左腔。
image.png

image.png

有關序列化的協(xié)議,可以簡單參照下這篇博文: Dubbo協(xié)議及序列化

說完了編碼捅儒,再來說說解碼液样。

同樣的,還是從解碼的鏈路開始說起

3.解碼

3.1 InternalDecoder#messageReceived

NettyCodecAdapter的內部類InternalDecoder#messageReceived方法
同樣的巧还,這個類也與編碼的類一樣鞭莽,都是通過netty的pipeline設置進來的,上文已經(jīng)介紹過了麸祷。

image.png

調用了DubboCountCodec#decode方法

3.2 DubboCountCodec#decode

public final class DubboCountCodec implements Codec2 {
    private DubboCodec codec = new DubboCodec();
 public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        // 記錄當前讀位置
        int save = buffer.readerIndex();
        // 創(chuàng)建 MultiMessage 對象
        MultiMessage result = MultiMessage.create();
        do {
            // 解碼
            Object obj = codec.decode(channel, buffer);
            // 輸入不夠澎怒,重置讀進度
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                buffer.readerIndex(save);
                break;
            // 解析到消息
            } else {
                // 添加結果消息
                result.addMessage(obj);
                // 記錄消息長度到隱式參數(shù)集合,用于 MonitorFilter 監(jiān)控
                logMessageLength(obj, buffer.readerIndex() - save);
                // 記錄當前讀位置
                save = buffer.readerIndex();
            }
        } while (true);
        // 需要更多的輸入
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        // 返回解析到的消息
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }
//省略其他代碼
}

這邊的codec指的就是DubboCodec阶牍。調用的是DubboCodec.decode(channel,buffer)

3.3 DubboCodec.decode(channel,buffer)

public class ExchangeCodec extends TelnetCodec {

    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        // 讀取 Header 數(shù)組
        int readable = buffer.readableBytes();
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        buffer.readBytes(header);
        // 解碼
        return decode(channel, buffer, readable, header);
    }

    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 非 Dubbo 協(xié)議喷面,目前是 Telnet 命令。
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
            // 將 buffer 完全復制到 `header` 數(shù)組中走孽。因為惧辈,上面的 `#decode(channel, buffer)` 方法,可能未讀全
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            // 【TODO 8026 】header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW 磕瓷?
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            // 提交給父類( Telnet ) 處理盒齿,目前是 Telnet 命令。
            return super.   decode(channel, buffer, readable, header);
        }
        // Header 長度不夠困食,返回需要更多的輸入
        // check length.
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // `[96 - 127]`:Body 的**長度**县昂。通過該長度,讀取 Body 陷舅。
        // get data length.
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        // 總長度不夠倒彰,返回需要更多的輸入
        int tt = len + HEADER_LENGTH;
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // 解析 Header + Body
        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
        try {
      //子類重寫的方法
            return decodeBody(channel, is, header);
        } finally {
            // skip 未讀完的流,并打印錯誤日志
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
//省略其他代碼
}

這邊對于再提一下莱睁,實際上這個方法是DubboCodec里的方法待讳,但是ExchangeCodecDubboCodec的父類,并且在DubboCodec沒有重寫這個方法仰剿,所以debug會跳到父類的方法行(因為代碼邏輯寫在父類里)创淡。

上面方法通過檢測消息頭中的魔數(shù)是否與規(guī)定的魔數(shù)相等,提前攔截掉非常規(guī)數(shù)據(jù)包南吮,比如通過 telnet命令行發(fā)出的數(shù)據(jù)包琳彩。接著再對消息體長度,以及可讀字節(jié)數(shù)進行檢測。最后調用 decodeBody方法進行后續(xù)的解碼工作露乏。

注意在最后的try塊中碧浊,會調用到DubboCodec的實現(xiàn)——DubboCodec#decodeBody。注意瘟仿,從頭到尾我們調用的都是DubboCodec類箱锐。

3.4 DubboCodec#decodeBody

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2];
        // 獲得 Serialization 對象
        byte proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲得請求||響應編號
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        // 解析響應
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            // 若是心跳事件,進行設置
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // 設置狀態(tài)
            // get status.
            byte status = header[3];
            res.setStatus(status);
            // 正常響應狀態(tài)
            if (status == Response.OK) {
                try {
                    Object data;
                    // 解碼心跳事件
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    // 解碼其它事件
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    // 解碼普通響應
                    } else {
                        DecodeableRpcResult result;
                        // 在通信框架(例如劳较,Netty)的 IO 線程驹止,解碼
                        if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
                            result.decode();
                        // 在 Dubbo ThreadPool 線程,解碼观蜗,使用 DecodeHandler
                        } else {
                            result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    // 設置結果
                    res.setResult(data);
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode response failed: " + t.getMessage(), t);
                    }
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            // 異常響應狀態(tài)
            } else {
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        // 解析請求
        } else {
            // decode request.
            Request req = new Request(id);
            req.setVersion("2.0.0");
            // 是否需要響應
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            // 若是心跳事件臊恋,進行設置
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                // 解碼心跳事件
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                // 解碼其它事件
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                // 解碼普通請求
                } else {
                    // 在通信框架(例如,Netty)的 IO 線程墓捻,解碼
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    // 在 Dubbo ThreadPool 線程抖仅,解碼,使用 DecodeHandler
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }

如上毙替,decodeBody對部分字段進行了解碼岸售,并將解碼得到的字段封裝到 Request 中践樱。隨后會調用 DecodeableRpcInvocation#decode 方法進行后續(xù)的解碼工作袱院。
要么是在本線程內解碼环肘,要么是交由work線程池執(zhí)行复哆,會在Dubbo的線程模型益涧、handler
講解如何交給其執(zhí)行久免,又是怎么執(zhí)行的摔握。

再來看一下DecodeableRpcInvocation#decode方法

3.5 DecodeableRpcInvocation#decode

public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
    /**
     * 是否已經(jīng)解碼完成
     */
    private volatile boolean hasDecoded;

    @Override
    public void decode() {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {
                decode(channel, inputStream);
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
                }
                request.setBroken(true);
                request.setData(e);
            } finally {
                hasDecoded = true;
            }
        }
    }

    
    @Override
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        // 通過反序列化得到 dubbo version,并保存到 attachments 變量中
        String dubboVersion = in.readUTF();
        request.setVersion(dubboVersion);
        setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

        // 通過反序列化得到 path,version,并保存到 attachments 變量中
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());

        // 通過反序列化得到調用方法名
        setMethodName(in.readUTF());
        try {
            Object[] args;
            Class<?>[] pts;
            // 通過反序列化得到參數(shù)類型字符串逻恐,比如 Ljava/lang/String;
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                // 將 desc 解析為參數(shù)類型數(shù)組
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        // 解析運行時參數(shù)
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            
            // 設置參數(shù)類型數(shù)組
            setParameterTypes(pts);

            // 通過反序列化得到原 attachment 的內容
            Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap<String, String>();
                }
                // 將 map 與當前對象中的 attachment 集合進行融合
                attachment.putAll(map);
                setAttachments(attachment);
            }
            
            // 對 callback 類型的參數(shù)進行處理
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            // 設置參數(shù)列表
            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        } finally {
            if (in instanceof Cleanable) {
                ((Cleanable) in).cleanup();
            }
        }
        return this;
    }
}

可以看到DecodeableRpcInvocation #decode會先判斷是否已經(jīng)解碼完成(這很重要,在交由Dubbo線程池執(zhí)行的時候也會進到這個方法,如果已經(jīng)解碼過,就不進行下面的流程,如果已經(jīng)沒解碼過,那么就會幫助執(zhí)行解碼操作),如果沒有解碼過,調用decode的重載方法。
重載方法通過反序列化將諸如 pathversion途戒、調用方法名继准、參數(shù)列表等信息依次解析出來,并設置到相應的字段中毡鉴,最終得到一個具有完整調用信息的 DecodeableRpcInvocation 對象崔泵。

3.6解碼的方法調用順序

所以解碼調用的順序為:

NettyCodecAdapter的內部類InternalDecoder#messageReceived
  ->DubboCountCodec#decode
    ->DubboCodec#decode(channel,buffer)//父類實現(xiàn)的方法
      ->DubboCodec#decode(channel,buffer,readable,header)//父類實現(xiàn)的方法
        ->Dubbo#decodeBody
          ->DecodeableRpcInvocation#decode//交由本線程或者業(yè)務線程池執(zhí)行
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市猪瞬,隨后出現(xiàn)的幾起案子憎瘸,更是在濱河造成了極大的恐慌,老刑警劉巖陈瘦,帶你破解...
    沈念sama閱讀 218,607評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件幌甘,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機锅风,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評論 3 395
  • 文/潘曉璐 我一進店門酥诽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人皱埠,你說我怎么就攤上這事肮帐。” “怎么了边器?”我有些...
    開封第一講書人閱讀 164,960評論 0 355
  • 文/不壞的土叔 我叫張陵训枢,是天一觀的道長。 經(jīng)常有香客問我忘巧,道長肮砾,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,750評論 1 294
  • 正文 為了忘掉前任袋坑,我火速辦了婚禮仗处,結果婚禮上,老公的妹妹穿的比我還像新娘枣宫。我一直安慰自己婆誓,他們只是感情好,可當我...
    茶點故事閱讀 67,764評論 6 392
  • 文/花漫 我一把揭開白布也颤。 她就那樣靜靜地躺著洋幻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪翅娶。 梳的紋絲不亂的頭發(fā)上文留,一...
    開封第一講書人閱讀 51,604評論 1 305
  • 那天,我揣著相機與錄音竭沫,去河邊找鬼燥翅。 笑死,一個胖子當著我的面吹牛蜕提,可吹牛的內容都是我干的森书。 我是一名探鬼主播,決...
    沈念sama閱讀 40,347評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼谎势,長吁一口氣:“原來是場噩夢啊……” “哼凛膏!你這毒婦竟也來了?” 一聲冷哼從身側響起脏榆,我...
    開封第一講書人閱讀 39,253評論 0 276
  • 序言:老撾萬榮一對情侶失蹤猖毫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后须喂,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吁断,經(jīng)...
    沈念sama閱讀 45,702評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡典唇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,893評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了胯府。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片介衔。...
    茶點故事閱讀 40,015評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖骂因,靈堂內的尸體忽然破棺而出炎咖,到底是詐尸還是另有隱情,我是刑警寧澤寒波,帶...
    沈念sama閱讀 35,734評論 5 346
  • 正文 年R本政府宣布乘盼,位于F島的核電站,受9級特大地震影響俄烁,放射性物質發(fā)生泄漏绸栅。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,352評論 3 330
  • 文/蒙蒙 一页屠、第九天 我趴在偏房一處隱蔽的房頂上張望粹胯。 院中可真熱鬧,春花似錦辰企、人聲如沸风纠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽竹观。三九已至,卻和暖如春潜索,著一層夾襖步出監(jiān)牢的瞬間臭增,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評論 1 270
  • 我被黑心中介騙來泰國打工竹习, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留誊抛,地道東北人。 一個月前我還...
    沈念sama閱讀 48,216評論 3 371
  • 正文 我出身青樓由驹,卻偏偏與公主長得像芍锚,于是被迫代替她去往敵國和親昔园。 傳聞我的和親對象是個殘疾皇子蔓榄,可洞房花燭夜當晚...
    茶點故事閱讀 44,969評論 2 355