Dubbo編碼解碼

前兩章中講到了Dubbo的服務(wù)啟動以及服務(wù)引用過程梭依,之后的篇章開始講解一下Dubbo的其他模塊,開始在在來的基礎(chǔ)上講一下細(xì)節(jié)的內(nèi)容典尾。本章主要講解Dubbo的編碼解碼內(nèi)容役拴。

編碼(encode):我們可以粗略的把這里的編碼粗略地理解為Java的序列化,即把POJO對象轉(zhuǎn)化為byte數(shù)據(jù)在網(wǎng)絡(luò)里傳輸钾埂。
解碼(decode):我們可以粗略的把這里的解碼粗略地理解為Java的反序列化河闰,即把從網(wǎng)絡(luò)中接收到的byte數(shù)據(jù)轉(zhuǎn)化為對應(yīng)的POJO對象供業(yè)務(wù)方使用。

說到Dubbo的編碼解碼褥紫,一定要理解的是Dubbo的數(shù)據(jù)包模型姜性。


Dubbo的包結(jié)構(gòu)

理解上述內(nèi)容的前提是對網(wǎng)絡(luò)有一定認(rèn)識,如果對基本的網(wǎng)絡(luò)沒有概念的話請先自行查詢相關(guān)資料看一下TCP/IP協(xié)議髓考。
因為Socket的底層已經(jīng)幫我們處理了IP/TCP的分包部念,我們不需要解析IP/TCP的包結(jié)構(gòu),處理包頭信息等。但是Dubbo作為一個封裝性的協(xié)議儡炼,需要自己處理一下包頭妓湘,包體的內(nèi)容處理,粘包也需要自行進行處理乌询。
我們首先明確一下Dubbo的包頭都有哪些成分:

  • 2個字節(jié)的魔數(shù)(標(biāo)示Dubbo的協(xié)議)
  • 1個字節(jié)的消息標(biāo)志位
    • 5個比特位的序列化ID
    • 1個比特位的事件類型(區(qū)分心跳還是正常的請求/響應(yīng)信息)
    • 1個比特表示是oneWay還是twoWay請求
    • 1個比特表示是request還是response
  • 1個字節(jié)的狀態(tài)位(標(biāo)示Request和Response的狀態(tài))
  • 8個字節(jié)的消息ID(requestId/responseId)
  • 4個字節(jié)的數(shù)據(jù)長度

從這里看到消息頭的長度固定是16個字節(jié)多柑,然后我們根據(jù)header中的數(shù)據(jù)長度字段就能知道真是的body長度,這樣就可以解析到具體的body信息了楣责。

明白了上面的內(nèi)容竣灌,我們還要再看一下Dubbo中對于數(shù)據(jù)緩存區(qū)的處理,Dubbo在使用數(shù)據(jù)緩存區(qū)的時候用到了自己的類:ChannelBuffer(不同于Netty的ChannelBuffer)秆麸,在處理編解碼的過程中經(jīng)常用到這個數(shù)據(jù)緩存區(qū)初嘹,所以首先對于這個類做一定介紹:

  •  | discardable bytes |  readable bytes  |  writable bytes  |
    
  •  +-------------------+------------------+------------------+
    
  •  |                   |                  |                  |
    
  •  0      <=      readerIndex   <=   writerIndex    <=    capacity
    

ChannelBuffer中主要有兩個游標(biāo),readerIndex和writerIndex沮趣,基本含義就是readerIndex記錄上一次讀取到的數(shù)據(jù)的位置屯烦,writerIndex記錄上一次寫數(shù)據(jù)的終點位置,初始情況下readerIndex=wirterIndex=0房铭,讀數(shù)據(jù)和寫數(shù)據(jù)的時候游標(biāo)都會自動移動驻龟。下面看一下基本的操作:

  • void clear(); 將readerIndex和writerIndex都設(shè)置為0,但是不清除原來的數(shù)據(jù)
  • void discardReadBytes(); 將readerIndex和writerIndex對應(yīng)的數(shù)據(jù)都往前移動(readerIndex個單位)
  • byte getByte(int index); 獲得特定位置的byte值
  • void getBytes(int index, byte[] dst); 將從index開始的數(shù)據(jù)copy到dst中
  • boolean readable(); 判斷緩存區(qū)是否有可讀取的內(nèi)容
  • int readableBytes(); 計算緩存區(qū)中可讀取的數(shù)據(jù)長度
  • byte readByte(); 在當(dāng)前的游標(biāo)位置讀一個字節(jié)的數(shù)據(jù)
  • void readBytes(byte[] dst); 從當(dāng)前位置讀取dst.length長度的內(nèi)容到dst中

其余的write*方法跟read*方法都是對應(yīng)的缸匪,不再一一介紹翁狐。
因為NettyServer和NettyClient的編碼解碼器是同一套,所以我們就選擇NettyServer為例來講解這個問題凌蔬。

編碼

編碼器主要就是NettyCodecAdapter.getDecoder()對象進行的露懒,下面還是直接擼代碼:

    //編碼的起始位置就是這個Encoder的encode方法
    private class InternalEncoder extends OneToOneEncoder {

        @Override
        protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
            //dynamicBuffer表示動態(tài)的buffer空間,擴自動擴展空間大小
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
            //如果存在相關(guān)通道就直接獲取砂心,否則就新建
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                //將編碼后的結(jié)果存入buffer中
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
            //將編碼后可讀的信息重新包裝后再往后傳遞
            return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
        }
    }
    //有dubbo的默認(rèn)配置直到這里的Codec就是DubboCountCodec懈词,所以我們接著看一下DubboCountCodec.encode()
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        //還是回到了DubboCodec
        codec.encode(channel, buffer, msg);
    }
    
    //首先進入到DubboCountCode的夫類ExchangeCodec中的encode方法
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        //根據(jù)msg的類型確定是編碼請求,響應(yīng)或者talnet信息辩诞,然后找到對應(yīng)的編碼方法
        //在具體的實現(xiàn)上因為encodeResponse跟encodeRequest差不多坎弯,所以就以encodeRequest為例往下面繼續(xù)講解
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }
    
    //dubbo自己封裝了一個buffer使用,并沒有使用netty的buffer译暂,為了避免耦合
    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        //默認(rèn)使用Hession序列化
        Serialization serialization = getSerialization(channel);
        // 16個字節(jié)的header.
        byte[] header = new byte[HEADER_LENGTH];
        // 2個字節(jié)的魔數(shù)
        Bytes.short2bytes(MAGIC, header);

        // 5個bit的Serialization id
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
        if (req.isEvent()) header[2] |= FLAG_EVENT;

        // 8個字節(jié)的RequestId
        Bytes.long2bytes(req.getId(), header, 4);

        // 先對body進行編碼才能確定實際的body長度
        int savedWriteIndex = buffer.writerIndex();
        // 控制頭部的長度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            //實際上干的事情就是把body編碼的數(shù)據(jù)放到buffer中驯绎,bos只是buffer的裝飾者
            encodeRequestData(channel, out, req.getData());
        }
        out.flushBuffer();
        bos.flush();
        bos.close();
        //寫到buf中實際大小
        int len = bos.writtenBytes();
        //默認(rèn)只支持8M的最大大小
        checkPayload(channel, len);
        //計算實際的body長度并寫到buf中
        Bytes.int2bytes(len, header, 12);

        //定位到原始的write節(jié)點(header的起始位置)
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header);
        //更新buffer的write為body結(jié)束的位置
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
    
    //encode方法因為不不要考慮消息的邊界問題宇姚,所以比較簡單
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
        //序列化消息體(不包含消息頭)的實際內(nèi)容
        RpcInvocation inv = (RpcInvocation) data;
        //dubbo版本信息
        out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
        //接口名稱
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        //服務(wù)版本
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
        //接口方法名稱
        out.writeUTF(inv.getMethodName());
        //參數(shù)類型
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        //具體的參數(shù)值
        if (args != null)
        for (int i = 0; i < args.length; i++){
            //encodeInvocationArgument主要處理回調(diào)相關(guān)的信息
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
        //附屬信息
        out.writeObject(inv.getAttachments());
    }

編碼的邏輯相對于解碼來說是比較簡單的闸拿,因為并不涉及處理粘包信息以及多包信息针肥,只是純粹的將包信息編碼之后然后發(fā)出去衰抑,但是這了我們并沒有提到hession的具體編碼步驟象迎,因為本身編碼的過程就是一個非常深的話題,涉及到比較多的數(shù)學(xué)邏輯,也不是這次講解的重點砾淌,所以這里只看到了dubbo在其之上有做多少內(nèi)容啦撮。


解碼

解碼器的入口也是在NettyServer中的NettyCodecAdaptor,解碼主要需要的注意地方就是:每次收到的數(shù)據(jù)包可能是不完整的汪厨,有可能收到的數(shù)據(jù)包只包含數(shù)據(jù)頭赃春,或者數(shù)據(jù)頭都是不完整的,也有可能包含消息體劫乱,但是消息體是不完整的织中,這些都是需要考慮的事情,下面我們看一下具體的解碼實現(xiàn):

    private class InternalDecoder extends SimpleChannelUpstreamHandler {
        //內(nèi)部公用的數(shù)據(jù)緩存區(qū)
        private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
            com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            Object o = event.getMessage();
            //編碼的結(jié)果就是ChannelBuffer
            if (! (o instanceof ChannelBuffer)) {
                ctx.sendUpstream(event);
                return;
            }

            ChannelBuffer input = (ChannelBuffer) o;
            int readable = input.readableBytes();
            //如果信息的載體衷戈,input中并沒有更多的可讀數(shù)據(jù)那就直接返回
            if (readable <= 0) {
                return;
            }

            com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
            //buffer在初次使用的時候是空的狭吼,但是之后就有可能不為空
            if (buffer.readable()) {
                //每次都復(fù)用buffer,并將之前遺留的buffer與input的內(nèi)容匯總起來一并合成message存起來
                if (buffer instanceof DynamicChannelBuffer) {
                    buffer.writeBytes(input.toByteBuffer());
                    message = buffer;
                } else {
                    //如果buffer不是動態(tài)的話要生成一個動態(tài)的buffer用來存放新的匯總信息
                    int size = buffer.readableBytes() + input.readableBytes();
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                        size > bufferSize ? size : bufferSize);
                    message.writeBytes(buffer, buffer.readableBytes());
                    message.writeBytes(input.toByteBuffer());
                }
            } else {
                //接到第一個dubbo包的話buffer還是空的殖妇,所以在這里會把讀到的消息放到buffer中
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                    input.toByteBuffer());
            }
            //經(jīng)過上面的邏輯處理之后message就是新的匯總信息(上次未處理的+本次新收到的)
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            Object msg;
            int saveReaderIndex;
            try {
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        //DubboCountCodec.decode(代碼見下)
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    //目前的信息解析不出來一個完成的dubbo刁笙,因此會跳出循環(huán),直到下個數(shù)據(jù)包的到來
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        //兩次的readerIndex一樣說明decode沒有進行
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        //如果有讀到解碼的信息谦趣,將解碼后的信息繼續(xù)往下游發(fā)送
                        //這時候的msg就是對應(yīng)的Request疲吸,Response或者心跳
                        if (msg != null) {
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());
            } finally {
                if (message.readable()) {
                    //清除已經(jīng)讀過的空間
                    message.discardReadBytes();
                    buffer = message;
                } else {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                }
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }
    }
    //DubboCountCodec.decode
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        //可能由于網(wǎng)絡(luò)原因,一次接收到比較多的的數(shù)據(jù)包
        do {
            Object obj = codec.decode(channel, buffer);
            //如果結(jié)果表示為需要更多的數(shù)據(jù)的話前鹅,就將原本保持的readerIndex設(shè)置回去摘悴,因為在decode的時候可能改變了readerIndex
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                buffer.readerIndex(save);
                break;
            } else {
                result.addMessage(obj);
                //將解碼后的結(jié)果大小記錄到result中
                logMessageLength(obj, buffer.readerIndex() - save);
                save = buffer.readerIndex();
            }
        } while (true);
        //表示收到的數(shù)據(jù)不完整或者本次編碼結(jié)束,等待下一個數(shù)據(jù)到來
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }
    
    //ExchangeCodec.decode()
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        //如果readable小于HEADER_LENGTH舰绘,說明是一個不全的dubbo消息
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        buffer.readBytes(header);
        return decode(channel, buffer, readable, header);
    }
    //ExchangeCodec.decode()
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 如果header的前兩個字節(jié)不是魔數(shù)的話
        if (readable > 0 && header[0] != MAGIC_HIGH 
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            //說明頭部是完整的
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            //逐步檢查buffer中是否有魔數(shù)開頭的header烦租,有的話就直接讀取到header數(shù)組中
            for (int i = 1; i < header.length - 1; i ++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            //處理父類telnet的解碼
            return super.decode(channel, buffer, readable, header);
        }
        // 數(shù)據(jù)不全(頭部不完整)
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        //獲取body長度,檢查8M最大限制
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        // 數(shù)據(jù)不全(body不完整)
        if( readable < tt ) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            //DubboCodec.decodeBody
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
    
    //DubboCodec.decodeBody
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲得請求的ID
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // 如果是response類型
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // 獲得狀態(tài)字段
            byte status = header[3];
            res.setStatus(status);
            if (status == Response.OK) {
                try {
                    Object data;
                    //如果是心跳事件的話除盏,直接通過解碼器讀取數(shù)據(jù)
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    } else {
                        //將原來的RPCResult轉(zhuǎn)換為DecodeableRpcResult叉橱,轉(zhuǎn)換后的DecodeableRpcResult里面的data為解碼后的結(jié)果信息
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is,
                                                             (Invocation)getRequestData(id), proto);
                            result.decode();
                        } else {
                        //如果解碼操作不在IO線程中做的話可以到線程池中做,具體的操作就是DecodeHandler處理類者蠕,處理的步驟跟上面一樣窃祝,只不過時機不同
                            result = new DecodeableRpcResult(channel, res,
                                                             new UnsafeByteArrayInputStream(readMessageData(is)),
                                                             (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    res.setResult(data);
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode response failed: " + t.getMessage(), t);
                    }
                    //解碼Response是客戶端的內(nèi)容,所以設(shè)置錯誤碼為CLIENT_ERROR
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            } else {
                //如果Response狀態(tài)不正常的話就直接設(shè)置解碼信息為錯誤信息
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        } else {
            //解碼請求
            Request req = new Request(id);
            req.setVersion("2.0.0");
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                //如果請求時心跳事件的話就直接通過解碼器解碼
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    } else {
                    //如果解碼操作不在IO線程中做的話可以到線程池中做踱侣,具體的操作就是DecodeHandler處理類粪小,處理的步驟跟上面一樣,只不過時機不同
                        inv = new DecodeableRpcInvocation(channel, req,
                                                          new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }
    //DecodeableRpcResult.decode()
    public Object decode(Channel channel, InputStream input) throws IOException {
        //根據(jù)serializationType獲得對應(yīng)的Hession序列化類
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

        //這個flag的讀取需要深入到Hession2的內(nèi)部理解
        //這里的in其實就是封裝類具體的序列化框架裝飾者
        byte flag = in.readByte();
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE:
                break;
            case DubboCodec.RESPONSE_VALUE:
                try {
                    //返回結(jié)果:Type[]{method.getReturnType(), method.getGenericReturnType()}
                    Type[] returnType = RpcUtils.getReturnTypes(invocation);
                    setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                                 (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                     : in.readObject((Class<?>) returnType[0], returnType[1])));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION:
                try {
                    Object obj = in.readObject();
                    if (obj instanceof Throwable == false)
                        throw new IOException("Response data error, expect Throwable, but get " + obj);
                    setException((Throwable) obj);
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }
        return this;
    }
    
    //DecodeableRpcInvocation.decode
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);
        //這里的讀取順序不能變抡句,一定要跟編碼的順序保持一致探膊,否則信息將錯亂
        //依次從緩存區(qū)讀取dubbo版本信息,path和version
        setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());
        //讀取dubbo的方法
        setMethodName(in.readUTF());
        try {
            Object[] args;
            Class<?>[] pts;
            //依次構(gòu)建方法參數(shù)類型即實際方法參數(shù)
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            setParameterTypes(pts);

            //讀取attachments信息并且設(shè)置到DecodeableRpcInvocation
            Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap<String, String>();
                }
                attachment.putAll(map);
                setAttachments(attachment);
            }
            //decode argument ,may be callback
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        }
        return this;
    }

到此待榔,dubbo的編碼解碼都已經(jīng)涉及到了逞壁。我們從中可以感受到設(shè)計的巧妙之處流济,我覺得最巧妙的地方就是:不會多處理無用的內(nèi)容,例如在序列化的時候只是將request對應(yīng)的數(shù)據(jù)內(nèi)容(方法信息腌闯,版本信息绳瘟,必要的attachments等)進行編碼然后傳輸,而不是直接request對象直接作為序列化的對象進行序列化姿骏,這么做既較少了傳輸?shù)膬?nèi)容糖声,還削減了序列化的難度(最起碼不用考慮request自身的內(nèi)容)。在解碼的時候通過requestId重新構(gòu)建出對應(yīng)的Request信息分瘦,然后從網(wǎng)絡(luò)中將接收到的信息一一進行反序列化之后重新塞到新構(gòu)建的Request對象中蘸泻。正是因為序列化和反序化都是耗時操作,所以能減輕的話就減輕其操作步驟嘲玫,這里還是十分科學(xué)的蟋恬,不由得感嘆:工程師對于架構(gòu)的把控力果然牛逼!

插播一條花邊新聞:關(guān)于request和response的映射都是通過ID來做的趁冈,并在創(chuàng)建ID時候會在一個全局的FUTURE里面存入信息(我們可以把FUTURE理解為一個redis)歼争,在創(chuàng)建DefaultFuture的時候?qū)⑵浞湃雛edis(請求的處理生命周期內(nèi)),然后在之后的任何時間都可以隨時拿到這個請求ID(在請求中帶了request ID渗勘,還記得嗎沐绒?),然后在客戶端收到Response的時候通過ID找到Future旺坠,再把結(jié)果設(shè)置到Future中乔遮,這樣就完成了同步和異步的轉(zhuǎn)換,是不是很牛叉取刃。

今天的內(nèi)容就這樣子了蹋肮,撒花~~~啦啦啦~~~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市璧疗,隨后出現(xiàn)的幾起案子坯辩,更是在濱河造成了極大的恐慌,老刑警劉巖崩侠,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件漆魔,死亡現(xiàn)場離奇詭異,居然都是意外死亡却音,警方通過查閱死者的電腦和手機改抡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來系瓢,“玉大人阿纤,你說我怎么就攤上這事∫穆” “怎么了欠拾?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵胰锌,是天一觀的道長。 經(jīng)常有香客問我清蚀,道長,這世上最難降的妖魔是什么爹谭? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任枷邪,我火速辦了婚禮,結(jié)果婚禮上诺凡,老公的妹妹穿的比我還像新娘东揣。我一直安慰自己,他們只是感情好腹泌,可當(dāng)我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布嘶卧。 她就那樣靜靜地躺著,像睡著了一般凉袱。 火紅的嫁衣襯著肌膚如雪芥吟。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天专甩,我揣著相機與錄音钟鸵,去河邊找鬼。 笑死涤躲,一個胖子當(dāng)著我的面吹牛棺耍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播种樱,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蒙袍,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了嫩挤?” 一聲冷哼從身側(cè)響起害幅,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎岂昭,沒想到半個月后矫限,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡佩抹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年叼风,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片棍苹。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡无宿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出枢里,到底是詐尸還是另有隱情孽鸡,我是刑警寧澤蹂午,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站彬碱,受9級特大地震影響豆胸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜巷疼,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一晚胡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧嚼沿,春花似錦估盘、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至攀细,卻和暖如春箫踩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谭贪。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工班套, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人故河。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓吱韭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鱼的。 傳聞我的和親對象是個殘疾皇子理盆,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 編碼問題一直困擾著開發(fā)人員,尤其在 Java 中更加明顯凑阶,因為 Java 是跨平臺語言猿规,不同平臺之間編碼之間的切換...
    x360閱讀 2,478評論 1 20
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)宙橱,斷路器姨俩,智...
    卡卡羅2017閱讀 134,652評論 18 139
  • 為什么要編碼 不知道大家有沒有想過一個問題,那就是為什么要編碼师郑?我們能不能不編碼环葵?要回答這個問題必須要回到計算機是...
    艾小天兒閱讀 17,303評論 0 2
  • 我不記得什么時候開始和家人分開了,和爸爸媽媽宝冕,弟弟妹妹张遭,和他們之間變的越來越疏遠。 我們可以好幾年不聯(lián)系地梨,他們都不...
    拉鋸之戰(zhàn)閱讀 177評論 0 0
  • 隨著社會的發(fā)展菊卷,現(xiàn)在的整形醫(yī)院缔恳,微創(chuàng),半永久定妝等等特別多洁闰,女人都想要讓自己更漂亮歉甚。 7月份去學(xué)習(xí)平衡易容術(shù)的時候...
    紫翼天葵閱讀 218評論 0 6