本系列主要參考官網(wǎng)文檔、芋道源碼的源碼解讀和《深入理解Apache Dubbo與實戰(zhàn)》一書呆贿。Dubbo版本為2.6.1嚷兔。
本篇用以分析心跳、編碼做入、解碼相關的代碼冒晰。
文章內容順序:
1.心跳
1.1 為什么要有心跳?
1.2 HeaderExchangeClient
1.3 HeaderExchangeClient#startHeatbeatTimer
1.4 HeartBeatTask#run
2.編碼
2.1 編碼的鏈路:
2.2 消息頭的字段的意義
2.3 ExchangeCodec
2.4 序列化的多種實現(xiàn)
3.解碼
3.1 InternalDecoder#messageReceived
3.2 DubboCountCodec#decode
3.3 DubboCodec.decode(channel,buffer)
3.4 DubboCodec#decodeBody
3.5 DecodeableRpcInvocation#decode
3.6 解碼的方法調用順序
首先是心跳相關
1.心跳
在上一篇服務調用的消費端中竟块,我們介紹到HeaderExchangeClient
的構造方法中會有心跳的一些邏輯壶运,在那邊一筆帶過了,在這篇文章來詳細看看浪秘。
1.1為什么要有心跳蒋情?
心跳間隔,對于長連接耸携,當物理層斷開時恕出,比如拔網(wǎng)線,TCP的FIN消息來不及發(fā)送违帆,對方收不到斷開事件浙巫,此時需要心跳來幫助檢查連接是否已斷開
這里仍舊貼一下HeaderExchangeClient的構造方法
1.2HeaderExchangeClient
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
// 創(chuàng)建 HeaderExchangeChannel 對象
this.channel = new HeaderExchangeChannel(client);
// 以下代碼均與心跳檢測邏輯有關
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (needHeartbeat) {
// 開啟心跳檢測定時器
startHeartbeatTimer();
}
}
此方法最后調用 #startHeatbeatTimer()
方法,發(fā)起心跳定時器刷后。直接來看startHeatbeatTimer()
方法的實現(xiàn)吧
1.3HeaderExchangeClient#startHeatbeatTimer
private void startHeatbeatTimer() {
// 停止原有定時任務
stopHeartbeatTimer();
// 發(fā)起新的定時任務
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
可以看到這邊就是直接調用線程池進行了一個定時任務HeartBeatTask
對象的畴,scheduleWithFixedDelay
方法意為當當前任務執(zhí)行完畢后再隔多少秒進行下一個任務。
接下來我們來看一下HeartBeatTask的實現(xiàn)尝胆。
1.4HeartBeatTask#run
public void run() {
try {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
// 最后讀寫的時間丧裁,任一超過心跳間隔,發(fā)送心跳
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true); // 需要響應
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
// 最后讀的時間含衔,超過心跳超時時間
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
// 客戶端側煎娇,重新連接服務端
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
// 服務端側,關閉客戶端連接
} else {
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
- 【任務一】:最后讀或寫的時間贪染,任一超過心跳間隔 heartbeat 缓呛,發(fā)送心跳。
- 【任務二】:最后讀的時間杭隙,超過心跳超時時間 heartbeatTimeout 哟绊,分成兩種情況:
客戶端側,重連連接服務端痰憎。
服務端側票髓,關閉客戶端連接攀涵。
至此,心跳的分析告一段落洽沟。
接下來來看看編碼的操作以故。
2.編碼
2.1編碼的鏈路
看上面這張圖,當運行到NettyChannel#send
的這一行后裆操,就會跳進Netty
的執(zhí)行邏輯怒详,最后由NettyCodecAdapter
的內部類調用編碼類執(zhí)行編碼操作
上圖中繼承的
OneToOneEncoder
是Netty
的抽象方法,那這個InternalEncoder
是什么時候傳進來的呢跷车?
在NettyClient#doOpen()
方法中有如下代碼,會在編碼器橱野、解碼器等一并設置進來朽缴,從而使得最后編碼、解碼邏輯能交由Dubbo
自己的類實現(xiàn)水援。
再貼一張編碼的鏈路密强。
NettyChannel#send
->一系列Netty內部的方法
->NettyCodecAdapter內部類(繼承了netty抽象類)#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeRequest
可以看到最后是交由
ExchangeCodec
來執(zhí)行編碼的邏輯了。那么這個鏈路就簡單介紹到這蜗元,直接來看他是怎么實現(xiàn)的編碼吧或渤。
2.2消息頭的字段的意義
先簡單列舉一下消息頭的內容,其中的魔數(shù)是用來分割處理粘包問題的奕扣。
接下來就直接來看看
ExchangeCodec
的實現(xiàn)
2.3 ExchangeCodec
public class ExchangeCodec extends TelnetCodec {
// 消息頭長度
protected static final int HEADER_LENGTH = 16;
// 魔數(shù)內容
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
public Short getMagicCode() {
return MAGIC;
}
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
// 對 Request 對象進行編碼
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 對 Response 對象進行編碼薪鹦,后面分析
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// 創(chuàng)建消息頭字節(jié)數(shù)組,長度為 16
byte[] header = new byte[HEADER_LENGTH];
// 設置魔數(shù)
Bytes.short2bytes(MAGIC, header);
// 設置數(shù)據(jù)包類型(Request/Response)和序列化器編號
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
// 設置通信方式(單向/雙向)
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
// 設置事件標識
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// 設置請求編號惯豆,8個字節(jié)池磁,從第4個字節(jié)開始設置
Bytes.long2bytes(req.getId(), header, 4);
// 獲取 buffer 當前的寫位置
int savedWriteIndex = buffer.writerIndex();
// 更新 writerIndex,為消息頭預留 16 個字節(jié)的空間
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 創(chuàng)建序列化器楷兽,比如 Hessian2ObjectOutput
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 對事件數(shù)據(jù)進行序列化操作
encodeEventData(channel, out, req.getData());
} else {
// 對請求數(shù)據(jù)進行序列化操作
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 獲取寫入的字節(jié)數(shù)地熄,也就是消息體長度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 將消息體長度寫入到消息頭中
Bytes.int2bytes(len, header, 12);
// 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
buffer.writerIndex(savedWriteIndex);
// 從 savedWriteIndex 下標處寫入消息頭
buffer.writeBytes(header);
// 設置新的 writerIndex芯杀,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
// 省略其他方法
}
以上就是請求對象的編碼過程端考,該過程首先會通過位運算將消息頭寫入到 header
數(shù)組中。然后對 Request
對象的 data
字段執(zhí)行序列化操作揭厚,序列化后的數(shù)據(jù)最終會存儲到ChannelBuffer
中却特。序列化操作執(zhí)行完后,可得到數(shù)據(jù)序列化后的長度 len
筛圆,緊接著將 len
寫入到 header
指定位置處核偿。最后再將消息頭字節(jié)數(shù)組 header
寫入到 ChannelBuffer
中,整個編碼過程就結束了顽染。
這里我們可以再來關注一下encodeEventData
方法(encodeRequestData
也是一樣的實現(xiàn)漾岳。)
2.4 序列化的多種實現(xiàn)
image.png
通過一系列重載方法轰绵,我們可以看到最后調用了out.writeObeject
而這個out
,則是在encodeRequest
方法中通過url傳過來的參數(shù)設置的尼荆,有多種不同的實現(xiàn)左腔。
image.pngimage.png
有關序列化的協(xié)議,可以簡單參照下這篇博文: Dubbo協(xié)議及序列化
說完了編碼捅儒,再來說說解碼液样。
同樣的,還是從解碼的鏈路開始說起
3.解碼
3.1 InternalDecoder#messageReceived
NettyCodecAdapter
的內部類InternalDecoder#messageReceived
方法
同樣的巧还,這個類也與編碼的類一樣鞭莽,都是通過netty的pipeline
設置進來的,上文已經(jīng)介紹過了麸祷。
調用了DubboCountCodec#decode
方法
3.2 DubboCountCodec#decode
public final class DubboCountCodec implements Codec2 {
private DubboCodec codec = new DubboCodec();
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 記錄當前讀位置
int save = buffer.readerIndex();
// 創(chuàng)建 MultiMessage 對象
MultiMessage result = MultiMessage.create();
do {
// 解碼
Object obj = codec.decode(channel, buffer);
// 輸入不夠澎怒,重置讀進度
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
// 解析到消息
} else {
// 添加結果消息
result.addMessage(obj);
// 記錄消息長度到隱式參數(shù)集合,用于 MonitorFilter 監(jiān)控
logMessageLength(obj, buffer.readerIndex() - save);
// 記錄當前讀位置
save = buffer.readerIndex();
}
} while (true);
// 需要更多的輸入
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
// 返回解析到的消息
if (result.size() == 1) {
return result.get(0);
}
return result;
}
//省略其他代碼
}
這邊的codec
指的就是DubboCodec
阶牍。調用的是DubboCodec.decode(channel,buffer)
3.3 DubboCodec.decode(channel,buffer)
public class ExchangeCodec extends TelnetCodec {
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 讀取 Header 數(shù)組
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
// 解碼
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 非 Dubbo 協(xié)議喷面,目前是 Telnet 命令。
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
// 將 buffer 完全復制到 `header` 數(shù)組中走孽。因為惧辈,上面的 `#decode(channel, buffer)` 方法,可能未讀全
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
// 【TODO 8026 】header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW 磕瓷?
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 提交給父類( Telnet ) 處理盒齿,目前是 Telnet 命令。
return super. decode(channel, buffer, readable, header);
}
// Header 長度不夠困食,返回需要更多的輸入
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// `[96 - 127]`:Body 的**長度**县昂。通過該長度,讀取 Body 陷舅。
// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 總長度不夠倒彰,返回需要更多的輸入
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// 解析 Header + Body
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//子類重寫的方法
return decodeBody(channel, is, header);
} finally {
// skip 未讀完的流,并打印錯誤日志
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
//省略其他代碼
}
這邊對于再提一下莱睁,實際上這個方法是DubboCodec
里的方法待讳,但是ExchangeCodec
是DubboCodec
的父類,并且在DubboCodec
沒有重寫這個方法仰剿,所以debug
會跳到父類的方法行(因為代碼邏輯寫在父類里)创淡。
上面方法通過檢測消息頭中的魔數(shù)是否與規(guī)定的魔數(shù)相等,提前攔截掉非常規(guī)數(shù)據(jù)包南吮,比如通過 telnet
命令行發(fā)出的數(shù)據(jù)包琳彩。接著再對消息體長度,以及可讀字節(jié)數(shù)進行檢測。最后調用 decodeBody
方法進行后續(xù)的解碼工作露乏。
注意在最后的try塊中碧浊,會調用到DubboCodec的實現(xiàn)——DubboCodec#decodeBody
。注意瘟仿,從頭到尾我們調用的都是DubboCodec
類箱锐。
3.4 DubboCodec#decodeBody
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2];
// 獲得 Serialization 對象
byte proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 獲得請求||響應編號
// get request id.
long id = Bytes.bytes2long(header, 4);
// 解析響應
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
// 若是心跳事件,進行設置
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 設置狀態(tài)
// get status.
byte status = header[3];
res.setStatus(status);
// 正常響應狀態(tài)
if (status == Response.OK) {
try {
Object data;
// 解碼心跳事件
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
// 解碼其它事件
} else if (res.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
// 解碼普通響應
} else {
DecodeableRpcResult result;
// 在通信框架(例如劳较,Netty)的 IO 線程驹止,解碼
if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
result.decode();
// 在 Dubbo ThreadPool 線程,解碼观蜗,使用 DecodeHandler
} else {
result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
}
data = result;
}
// 設置結果
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 異常響應狀態(tài)
} else {
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
// 解析請求
} else {
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
// 是否需要響應
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// 若是心跳事件臊恋,進行設置
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
// 解碼心跳事件
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
// 解碼其它事件
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
// 解碼普通請求
} else {
// 在通信框架(例如,Netty)的 IO 線程墓捻,解碼
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
// 在 Dubbo ThreadPool 線程抖仅,解碼,使用 DecodeHandler
} else {
inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
如上毙替,decodeBody
對部分字段進行了解碼岸售,并將解碼得到的字段封裝到 Request 中践樱。隨后會調用 DecodeableRpcInvocation#decode
方法進行后續(xù)的解碼工作袱院。
要么是在本線程內解碼环肘,要么是交由work
線程池執(zhí)行复哆,會在Dubbo的線程模型益涧、handler
講解如何交給其執(zhí)行久免,又是怎么執(zhí)行的摔握。
再來看一下DecodeableRpcInvocation#decode
方法
3.5 DecodeableRpcInvocation#decode
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
/**
* 是否已經(jīng)解碼完成
*/
private volatile boolean hasDecoded;
@Override
public void decode() {
if (!hasDecoded && channel != null && inputStream != null) {
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
}
request.setBroken(true);
request.setData(e);
} finally {
hasDecoded = true;
}
}
}
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 通過反序列化得到 dubbo version,并保存到 attachments 變量中
String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);
// 通過反序列化得到 path,version,并保存到 attachments 變量中
setAttachment(Constants.PATH_KEY, in.readUTF());
setAttachment(Constants.VERSION_KEY, in.readUTF());
// 通過反序列化得到調用方法名
setMethodName(in.readUTF());
try {
Object[] args;
Class<?>[] pts;
// 通過反序列化得到參數(shù)類型字符串逻恐,比如 Ljava/lang/String;
String desc = in.readUTF();
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
// 將 desc 解析為參數(shù)類型數(shù)組
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
// 解析運行時參數(shù)
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
// 設置參數(shù)類型數(shù)組
setParameterTypes(pts);
// 通過反序列化得到原 attachment 的內容
Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
// 將 map 與當前對象中的 attachment 集合進行融合
attachment.putAll(map);
setAttachments(attachment);
}
// 對 callback 類型的參數(shù)進行處理
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
// 設置參數(shù)列表
setArguments(args);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
}
可以看到DecodeableRpcInvocation #decode
會先判斷是否已經(jīng)解碼完成(這很重要,在交由Dubbo線程池執(zhí)行的時候也會進到這個方法,如果已經(jīng)解碼過,就不進行下面的流程,如果已經(jīng)沒解碼過,那么就會幫助執(zhí)行解碼操作),如果沒有解碼過,調用decode的重載方法。
重載方法通過反序列化將諸如 path
、version
途戒、調用方法名继准、參數(shù)列表等信息依次解析出來,并設置到相應的字段中毡鉴,最終得到一個具有完整調用信息的 DecodeableRpcInvocation
對象崔泵。
3.6解碼的方法調用順序
所以解碼調用的順序為:
NettyCodecAdapter的內部類InternalDecoder#messageReceived
->DubboCountCodec#decode
->DubboCodec#decode(channel,buffer)//父類實現(xiàn)的方法
->DubboCodec#decode(channel,buffer,readable,header)//父類實現(xiàn)的方法
->Dubbo#decodeBody
->DecodeableRpcInvocation#decode//交由本線程或者業(yè)務線程池執(zhí)行