前兩章中講到了Dubbo的服務(wù)啟動以及服務(wù)引用過程梭依,之后的篇章開始講解一下Dubbo的其他模塊,開始在在來的基礎(chǔ)上講一下細(xì)節(jié)的內(nèi)容典尾。本章主要講解Dubbo的編碼解碼內(nèi)容役拴。
編碼(encode):我們可以粗略的把這里的編碼粗略地理解為Java的序列化,即把POJO對象轉(zhuǎn)化為byte數(shù)據(jù)在網(wǎng)絡(luò)里傳輸钾埂。
解碼(decode):我們可以粗略的把這里的解碼粗略地理解為Java的反序列化河闰,即把從網(wǎng)絡(luò)中接收到的byte數(shù)據(jù)轉(zhuǎn)化為對應(yīng)的POJO對象供業(yè)務(wù)方使用。
說到Dubbo的編碼解碼褥紫,一定要理解的是Dubbo的數(shù)據(jù)包模型姜性。
理解上述內(nèi)容的前提是對網(wǎng)絡(luò)有一定認(rèn)識,如果對基本的網(wǎng)絡(luò)沒有概念的話請先自行查詢相關(guān)資料看一下TCP/IP協(xié)議髓考。
因為Socket的底層已經(jīng)幫我們處理了IP/TCP的分包部念,我們不需要解析IP/TCP的包結(jié)構(gòu),處理包頭信息等。但是Dubbo作為一個封裝性的協(xié)議儡炼,需要自己處理一下包頭妓湘,包體的內(nèi)容處理,粘包也需要自行進行處理乌询。
我們首先明確一下Dubbo的包頭都有哪些成分:
- 2個字節(jié)的魔數(shù)(標(biāo)示Dubbo的協(xié)議)
- 1個字節(jié)的消息標(biāo)志位
- 5個比特位的序列化ID
- 1個比特位的事件類型(區(qū)分心跳還是正常的請求/響應(yīng)信息)
- 1個比特表示是oneWay還是twoWay請求
- 1個比特表示是request還是response
- 1個字節(jié)的狀態(tài)位(標(biāo)示Request和Response的狀態(tài))
- 8個字節(jié)的消息ID(requestId/responseId)
- 4個字節(jié)的數(shù)據(jù)長度
從這里看到消息頭的長度固定是16個字節(jié)多柑,然后我們根據(jù)header中的數(shù)據(jù)長度字段就能知道真是的body長度,這樣就可以解析到具體的body信息了楣责。
明白了上面的內(nèi)容竣灌,我們還要再看一下Dubbo中對于數(shù)據(jù)緩存區(qū)的處理,Dubbo在使用數(shù)據(jù)緩存區(qū)的時候用到了自己的類:ChannelBuffer(不同于Netty的ChannelBuffer)秆麸,在處理編解碼的過程中經(jīng)常用到這個數(shù)據(jù)緩存區(qū)初嘹,所以首先對于這個類做一定介紹:
| discardable bytes | readable bytes | writable bytes |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
ChannelBuffer中主要有兩個游標(biāo),readerIndex和writerIndex沮趣,基本含義就是readerIndex記錄上一次讀取到的數(shù)據(jù)的位置屯烦,writerIndex記錄上一次寫數(shù)據(jù)的終點位置,初始情況下readerIndex=wirterIndex=0房铭,讀數(shù)據(jù)和寫數(shù)據(jù)的時候游標(biāo)都會自動移動驻龟。下面看一下基本的操作:
- void clear(); 將readerIndex和writerIndex都設(shè)置為0,但是不清除原來的數(shù)據(jù)
- void discardReadBytes(); 將readerIndex和writerIndex對應(yīng)的數(shù)據(jù)都往前移動(readerIndex個單位)
- byte getByte(int index); 獲得特定位置的byte值
- void getBytes(int index, byte[] dst); 將從index開始的數(shù)據(jù)copy到dst中
- boolean readable(); 判斷緩存區(qū)是否有可讀取的內(nèi)容
- int readableBytes(); 計算緩存區(qū)中可讀取的數(shù)據(jù)長度
- byte readByte(); 在當(dāng)前的游標(biāo)位置讀一個字節(jié)的數(shù)據(jù)
- void readBytes(byte[] dst); 從當(dāng)前位置讀取dst.length長度的內(nèi)容到dst中
其余的write*方法跟read*方法都是對應(yīng)的缸匪,不再一一介紹翁狐。
因為NettyServer和NettyClient的編碼解碼器是同一套,所以我們就選擇NettyServer為例來講解這個問題凌蔬。
編碼
編碼器主要就是NettyCodecAdapter.getDecoder()對象進行的露懒,下面還是直接擼代碼:
//編碼的起始位置就是這個Encoder的encode方法
private class InternalEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
//dynamicBuffer表示動態(tài)的buffer空間,擴自動擴展空間大小
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
//如果存在相關(guān)通道就直接獲取砂心,否則就新建
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
//將編碼后的結(jié)果存入buffer中
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
//將編碼后可讀的信息重新包裝后再往后傳遞
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}
//有dubbo的默認(rèn)配置直到這里的Codec就是DubboCountCodec懈词,所以我們接著看一下DubboCountCodec.encode()
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
//還是回到了DubboCodec
codec.encode(channel, buffer, msg);
}
//首先進入到DubboCountCode的夫類ExchangeCodec中的encode方法
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
//根據(jù)msg的類型確定是編碼請求,響應(yīng)或者talnet信息辩诞,然后找到對應(yīng)的編碼方法
//在具體的實現(xiàn)上因為encodeResponse跟encodeRequest差不多坎弯,所以就以encodeRequest為例往下面繼續(xù)講解
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
//dubbo自己封裝了一個buffer使用,并沒有使用netty的buffer译暂,為了避免耦合
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
//默認(rèn)使用Hession序列化
Serialization serialization = getSerialization(channel);
// 16個字節(jié)的header.
byte[] header = new byte[HEADER_LENGTH];
// 2個字節(jié)的魔數(shù)
Bytes.short2bytes(MAGIC, header);
// 5個bit的Serialization id
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// 8個字節(jié)的RequestId
Bytes.long2bytes(req.getId(), header, 4);
// 先對body進行編碼才能確定實際的body長度
int savedWriteIndex = buffer.writerIndex();
// 控制頭部的長度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
//實際上干的事情就是把body編碼的數(shù)據(jù)放到buffer中驯绎,bos只是buffer的裝飾者
encodeRequestData(channel, out, req.getData());
}
out.flushBuffer();
bos.flush();
bos.close();
//寫到buf中實際大小
int len = bos.writtenBytes();
//默認(rèn)只支持8M的最大大小
checkPayload(channel, len);
//計算實際的body長度并寫到buf中
Bytes.int2bytes(len, header, 12);
//定位到原始的write節(jié)點(header的起始位置)
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header);
//更新buffer的write為body結(jié)束的位置
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
//encode方法因為不不要考慮消息的邊界問題宇姚,所以比較簡單
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
//序列化消息體(不包含消息頭)的實際內(nèi)容
RpcInvocation inv = (RpcInvocation) data;
//dubbo版本信息
out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
//接口名稱
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
//服務(wù)版本
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
//接口方法名稱
out.writeUTF(inv.getMethodName());
//參數(shù)類型
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
//具體的參數(shù)值
if (args != null)
for (int i = 0; i < args.length; i++){
//encodeInvocationArgument主要處理回調(diào)相關(guān)的信息
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
//附屬信息
out.writeObject(inv.getAttachments());
}
編碼的邏輯相對于解碼來說是比較簡單的闸拿,因為并不涉及處理粘包信息以及多包信息针肥,只是純粹的將包信息編碼之后然后發(fā)出去衰抑,但是這了我們并沒有提到hession的具體編碼步驟象迎,因為本身編碼的過程就是一個非常深的話題,涉及到比較多的數(shù)學(xué)邏輯,也不是這次講解的重點砾淌,所以這里只看到了dubbo在其之上有做多少內(nèi)容啦撮。
解碼
解碼器的入口也是在NettyServer中的NettyCodecAdaptor,解碼主要需要的注意地方就是:每次收到的數(shù)據(jù)包可能是不完整的汪厨,有可能收到的數(shù)據(jù)包只包含數(shù)據(jù)頭赃春,或者數(shù)據(jù)頭都是不完整的,也有可能包含消息體劫乱,但是消息體是不完整的织中,這些都是需要考慮的事情,下面我們看一下具體的解碼實現(xiàn):
private class InternalDecoder extends SimpleChannelUpstreamHandler {
//內(nèi)部公用的數(shù)據(jù)緩存區(qū)
private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
Object o = event.getMessage();
//編碼的結(jié)果就是ChannelBuffer
if (! (o instanceof ChannelBuffer)) {
ctx.sendUpstream(event);
return;
}
ChannelBuffer input = (ChannelBuffer) o;
int readable = input.readableBytes();
//如果信息的載體衷戈,input中并沒有更多的可讀數(shù)據(jù)那就直接返回
if (readable <= 0) {
return;
}
com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
//buffer在初次使用的時候是空的狭吼,但是之后就有可能不為空
if (buffer.readable()) {
//每次都復(fù)用buffer,并將之前遺留的buffer與input的內(nèi)容匯總起來一并合成message存起來
if (buffer instanceof DynamicChannelBuffer) {
buffer.writeBytes(input.toByteBuffer());
message = buffer;
} else {
//如果buffer不是動態(tài)的話要生成一個動態(tài)的buffer用來存放新的匯總信息
int size = buffer.readableBytes() + input.readableBytes();
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
size > bufferSize ? size : bufferSize);
message.writeBytes(buffer, buffer.readableBytes());
message.writeBytes(input.toByteBuffer());
}
} else {
//接到第一個dubbo包的話buffer還是空的殖妇,所以在這里會把讀到的消息放到buffer中
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
input.toByteBuffer());
}
//經(jīng)過上面的邏輯處理之后message就是新的匯總信息(上次未處理的+本次新收到的)
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
Object msg;
int saveReaderIndex;
try {
do {
saveReaderIndex = message.readerIndex();
try {
//DubboCountCodec.decode(代碼見下)
msg = codec.decode(channel, message);
} catch (IOException e) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
//目前的信息解析不出來一個完成的dubbo刁笙,因此會跳出循環(huán),直到下個數(shù)據(jù)包的到來
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//兩次的readerIndex一樣說明decode沒有進行
if (saveReaderIndex == message.readerIndex()) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
//如果有讀到解碼的信息谦趣,將解碼后的信息繼續(xù)往下游發(fā)送
//這時候的msg就是對應(yīng)的Request疲吸,Response或者心跳
if (msg != null) {
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
}
}
} while (message.readable());
} finally {
if (message.readable()) {
//清除已經(jīng)讀過的空間
message.discardReadBytes();
buffer = message;
} else {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
}
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
}
//DubboCountCodec.decode
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
//可能由于網(wǎng)絡(luò)原因,一次接收到比較多的的數(shù)據(jù)包
do {
Object obj = codec.decode(channel, buffer);
//如果結(jié)果表示為需要更多的數(shù)據(jù)的話前鹅,就將原本保持的readerIndex設(shè)置回去摘悴,因為在decode的時候可能改變了readerIndex
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);
//將解碼后的結(jié)果大小記錄到result中
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();
}
} while (true);
//表示收到的數(shù)據(jù)不完整或者本次編碼結(jié)束,等待下一個數(shù)據(jù)到來
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
//ExchangeCodec.decode()
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
//如果readable小于HEADER_LENGTH舰绘,說明是一個不全的dubbo消息
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}
//ExchangeCodec.decode()
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 如果header的前兩個字節(jié)不是魔數(shù)的話
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
//說明頭部是完整的
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
//逐步檢查buffer中是否有魔數(shù)開頭的header烦租,有的話就直接讀取到header數(shù)組中
for (int i = 1; i < header.length - 1; i ++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
//處理父類telnet的解碼
return super.decode(channel, buffer, readable, header);
}
// 數(shù)據(jù)不全(頭部不完整)
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
//獲取body長度,檢查8M最大限制
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
// 數(shù)據(jù)不全(body不完整)
if( readable < tt ) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//DubboCodec.decodeBody
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
//DubboCodec.decodeBody
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 獲得請求的ID
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 如果是response類型
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 獲得狀態(tài)字段
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
Object data;
//如果是心跳事件的話除盏,直接通過解碼器讀取數(shù)據(jù)
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (res.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
//將原來的RPCResult轉(zhuǎn)換為DecodeableRpcResult叉橱,轉(zhuǎn)換后的DecodeableRpcResult里面的data為解碼后的結(jié)果信息
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation)getRequestData(id), proto);
result.decode();
} else {
//如果解碼操作不在IO線程中做的話可以到線程池中做,具體的操作就是DecodeHandler處理類者蠕,處理的步驟跟上面一樣窃祝,只不過時機不同
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
//解碼Response是客戶端的內(nèi)容,所以設(shè)置錯誤碼為CLIENT_ERROR
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
} else {
//如果Response狀態(tài)不正常的話就直接設(shè)置解碼信息為錯誤信息
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
} else {
//解碼請求
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
//如果請求時心跳事件的話就直接通過解碼器解碼
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
//如果解碼操作不在IO線程中做的話可以到線程池中做踱侣,具體的操作就是DecodeHandler處理類粪小,處理的步驟跟上面一樣,只不過時機不同
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
//DecodeableRpcResult.decode()
public Object decode(Channel channel, InputStream input) throws IOException {
//根據(jù)serializationType獲得對應(yīng)的Hession序列化類
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
//這個flag的讀取需要深入到Hession2的內(nèi)部理解
//這里的in其實就是封裝類具體的序列化框架裝飾者
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
break;
case DubboCodec.RESPONSE_VALUE:
try {
//返回結(jié)果:Type[]{method.getReturnType(), method.getGenericReturnType()}
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " + obj);
setException((Throwable) obj);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
}
return this;
}
//DecodeableRpcInvocation.decode
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
//這里的讀取順序不能變抡句,一定要跟編碼的順序保持一致探膊,否則信息將錯亂
//依次從緩存區(qū)讀取dubbo版本信息,path和version
setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
setAttachment(Constants.PATH_KEY, in.readUTF());
setAttachment(Constants.VERSION_KEY, in.readUTF());
//讀取dubbo的方法
setMethodName(in.readUTF());
try {
Object[] args;
Class<?>[] pts;
//依次構(gòu)建方法參數(shù)類型即實際方法參數(shù)
String desc = in.readUTF();
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
setParameterTypes(pts);
//讀取attachments信息并且設(shè)置到DecodeableRpcInvocation
Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
attachment.putAll(map);
setAttachments(attachment);
}
//decode argument ,may be callback
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
setArguments(args);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
}
return this;
}
到此待榔,dubbo的編碼解碼都已經(jīng)涉及到了逞壁。我們從中可以感受到設(shè)計的巧妙之處流济,我覺得最巧妙的地方就是:不會多處理無用的內(nèi)容,例如在序列化的時候只是將request對應(yīng)的數(shù)據(jù)內(nèi)容(方法信息腌闯,版本信息绳瘟,必要的attachments等)進行編碼然后傳輸,而不是直接request對象直接作為序列化的對象進行序列化姿骏,這么做既較少了傳輸?shù)膬?nèi)容糖声,還削減了序列化的難度(最起碼不用考慮request自身的內(nèi)容)。在解碼的時候通過requestId重新構(gòu)建出對應(yīng)的Request信息分瘦,然后從網(wǎng)絡(luò)中將接收到的信息一一進行反序列化之后重新塞到新構(gòu)建的Request對象中蘸泻。正是因為序列化和反序化都是耗時操作,所以能減輕的話就減輕其操作步驟嘲玫,這里還是十分科學(xué)的蟋恬,不由得感嘆:工程師對于架構(gòu)的把控力果然牛逼!
插播一條花邊新聞:關(guān)于request和response的映射都是通過ID來做的趁冈,并在創(chuàng)建ID時候會在一個全局的FUTURE里面存入信息(我們可以把FUTURE理解為一個redis)歼争,在創(chuàng)建DefaultFuture的時候?qū)⑵浞湃雛edis(請求的處理生命周期內(nèi)),然后在之后的任何時間都可以隨時拿到這個請求ID(在請求中帶了request ID渗勘,還記得嗎沐绒?),然后在客戶端收到Response的時候通過ID找到Future旺坠,再把結(jié)果設(shè)置到Future中乔遮,這樣就完成了同步和異步的轉(zhuǎn)換,是不是很牛叉取刃。
今天的內(nèi)容就這樣子了蹋肮,撒花~~~啦啦啦~~~