簡介
在使用了消息隊列的通信方之間, 總體的通信架構圖如下:
在消息生產者, broker和消息消費者之間都會發(fā)生通信, RocketMQ的通信層是基于通信框架netty之上做了簡單的協(xié)議封裝. 本人閱讀的RocketMQ版本是4.1.0-incubating-SNAPSHOT, 依賴的netty版本是4.0.36.Final. RocketMQ的代碼結構圖如下:
大體上分為broker, client, common filtersrv, namesrv和remoting等模塊, 通信框架就封裝在remoting模塊中.
本文從協(xié)議格式, 消息編解碼, 通信方式(同步, 異步, 單向)和通信流程(詳細介紹同步調用流程)等方面介紹RocketMQ的通信模塊.
設計要素
對于一個消息隊列的RPC網(wǎng)絡通信來說尚胞,要求并不像服務框架那樣苛刻, 滿足一下幾點即可:
- 編解碼處理(負責通信中的編碼和解碼, 序列化, 通信協(xié)議設計等必要功能)
- 雙向消息處理(包括同步或異步, MQ中有異步消息的功能)
- 單向消息處理(一般指心跳消息或者注冊消息這樣的類型)
類圖
以RemotingService為最上層接口,提供了三個接口:
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
RemotingClient和RemotingServer都繼承了RemotingService接口, 并增加了自己特有的接口.
RemotingClient:
void updateNameServerAddressList(final List<String> addrs);
List<String> getNameServerAddressList();
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
boolean isChannelWriteable(final String addr);
NettyRemotingClient和NettyRemotingServer分別實現(xiàn)了RemotingClient和RemotingServer, 并且都繼承了NettyRemotingAbstract類. NettyRemotingAbstract這個抽象類包含了很多公共數(shù)據(jù)處理,也包含了很多重要的數(shù)據(jù)結構, 這個稍后介紹.
其它還有NettyEvent, NettyEncoder, NettyDecoder和RemotingCommand等一系列通信過程中使用到的類.
協(xié)議設計與編碼解碼
在分析具體的api接口之前, 先介紹一下RocketMQ的通信協(xié)議是如何設計的.
具體的通信協(xié)議格式如下(重點理解, 能根據(jù)通信協(xié)議格式來對網(wǎng)絡中讀取的二進制數(shù)據(jù)進行編解碼):
消息共分為四個部分:
- 1.消息長度(總長度, 四個字節(jié)存儲, 占用一個int類型)
- 2.序列化類型&消息頭長度(同樣占用一個int類型, 第一個字節(jié)表示序列化類型, 后面三個字節(jié)表示消息頭長度)
- 3.消息頭數(shù)據(jù)
- 4.消息主體數(shù)據(jù)
消息編碼過程由類RemotingCommand中的encode()方法完成. 代碼如下:
public ByteBuffer encode() {
// 1> header length size
int length = 4; //消息總長度
// 2> header data length
//將消息頭編碼成byte[]
byte[] headerData = this.headerEncode();
//計算頭部長度
length += headerData.length;
// 3> body data length
if (this.body != null) {
//消息主體長度
length += body.length;
}
//分配ByteBuffer, 這邊加了4,
//這是因為在消息總長度的計算中沒有將存儲頭部長度的4個字節(jié)計算在內
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
//將消息總長度放入ByteBuffer
result.putInt(length);
// header length
//將消息頭長度放入ByteBuffer
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
//將消息頭數(shù)據(jù)放入ByteBuffer
result.put(headerData);
// body data;
if (this.body != null) {
//將消息主體放入ByteBuffer
result.put(this.body);
}
//重置ByteBuffer的position位置
result.flip();
return result;
}
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
相應的,decode的代碼如下(也在類RemotingCommand中):
public static RemotingCommand decode(final byte[] array) {
ByteBuffer byteBuffer = ByteBuffer.wrap(array);
return decode(byteBuffer);
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
//計算消息頭長度
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
通信方式和通信流程
接下來看一下RocketMQ的通信方式, RocketMQ支持三種方式的通信:
- 同步(sync)
- 異步(async)
- 單向(oneway)
下面以Remoting模塊中的一個單元測試為例, 說明同步調用的通信過程.
首先看一下同步調用的整體流程(客戶端):
下面詳細分析流程圖中涉及的源代碼.
先由RemotingClient的實現(xiàn)類NettyRemotingClient構造請求(一個RemotingCommand實例), 然后根據(jù)addr獲取相應的channel, 調用invokeSyncImpl方法, 將數(shù)據(jù)流轉給抽象類NettyRemotingAbstract處理. 那么NettyRemotingClient是如何啟動的呢, 示例代碼如下:
//類:org.apache.rocketmq.remoting.RemotingServerTest
public static RemotingClient createRemotingClient() {
NettyClientConfig config = new NettyClientConfig();
RemotingClient client = new NettyRemotingClient(config);
client.start();
return client;
}
先實例化RemotingClient, 其構造函數(shù)如下:
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
final ChannelEventListener channelEventListener) {
//調用父類的構造函數(shù), 主要是設置單向調用和異步調用兩種模式下的最大并發(fā)數(shù)
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
//NettyEventExecuter處理線程會不斷從eventQueue中讀取消息, 調用注冊的ChannelEventListener進行處理
this.channelEventListener = channelEventListener;
//執(zhí)行用戶回調函數(shù)的線程數(shù)
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
//執(zhí)行用戶回調函數(shù)的線程池
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
//netty eventLoopGroupWorker
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
}
接著啟動NettyRemotingClient, 代碼如下:
public void start() {
//構建一個DefaultEventExecutorGroup, 用于處理netty handler中的操作
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
nettyClientConfig.getClientWorkerThreads(), //
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
//初始化netty, 對netty的用法不做介紹
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
//編碼handler
new NettyEncoder(),
//解碼handler
new NettyDecoder(),
//心跳檢測
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
//連接管理handler,處理connect, disconnect, close等事件
new NettyConnectManageHandler(),
//處理接收到RemotingCommand消息后的事件, 收到服務器端響應后的相關操作
new NettyClientHandler());
}
});
//定時掃描responseTable,獲取返回結果,并且處理超時
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
}
接下來看消息同步調用的邏輯.
同步調用的單元測試:
public void testInvokeSync() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
//消息頭
RequestHeader requestHeader = new RequestHeader();
requestHeader.setCount(1);
requestHeader.setMessageTitle("Welcome");
//構建請求
RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
//同步發(fā)送消息
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
assertTrue(response != null);
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(response.getExtFields()).hasSize(2);
}
class RequestHeader implements CommandCustomHeader {
@CFNullable
private Integer count;
@CFNullable
private String messageTitle;
@Override
public void checkFields() throws RemotingCommandException {
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public String getMessageTitle() {
return messageTitle;
}
public void setMessageTitle(String messageTitle) {
this.messageTitle = messageTitle;
}
}
public interface CommandCustomHeader {
void checkFields() throws RemotingCommandException;
}
首先構建消息頭, 然后調用RemotingCommand.createRequestCommand創(chuàng)建一個request(一個RemotingCommand實例),然后調用remotingClient.invokeSync發(fā)送請求.
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
//根據(jù)addr獲得channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
//RocketMQ允許用戶定義rpc hook,可在發(fā)送請求前,或者接受響應后執(zhí)行
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
//將數(shù)據(jù)流轉給抽象類NettyRemotingAbstract
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
//rpc hook
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
可以看到, 真正發(fā)送請求的是invokeSyncImpl方法, 該方法定義在類NettyRemotingAbstract中, 代碼如下:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
//相當于request ID, RemotingCommand會為每一個request產生一個request ID, 從0開始, 每次加1
final int opaque = request.getOpaque();
try {
//根據(jù)request ID構建ResponseFuture
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
//將ResponseFuture放入responseTable
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
//刷出數(shù)據(jù)
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
//消息發(fā)送后執(zhí)行
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
PLOG.warn("send a request command to channel <" + addr + "> failed.");
}
});
//等待服務器端響應結果
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
從代碼中可以看到, 即使是同步調用模式, 在RocketMQ內部依然是采用異步的方式完成. 客戶端的流程大體就是如此, 下面介紹服務器端接收到請求后的處理流程.
先看流程圖:
先由Netty接收消息, 接著由handler將數(shù)據(jù)流轉給NettyRemotingServer處理. 先看如何初始化一個RemotingServer. 示例代碼如下:
public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
//初始化RemotingServer, 此處的邏輯與RemotingClient大體相當
RemotingServer remotingServer = new NettyRemotingServer(config);
//注冊一個處理器,根據(jù)requestCode, 獲取處理器,處理請求
remotingServer.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.channel().remoteAddress());
return request;
}
@Override
public boolean rejectRequest() {
return false;
}
}, Executors.newCachedThreadPool());
//啟動RemotingServer
remotingServer.start();
return remotingServer;
}
首先實例化一個NettyRemotingServer對象, 此邏輯與NettyRemotingClient大致相當. 接著, 在remotingServer啟動之前注冊一個processor用于處理對應requestcode的處理器, 示例中用的是0, 這與remotingClient示例中的code是對應的(RemotingCommand.createRequestCommand(0, requestHeader)), 當remotingServer收到code=0的請求時,會使用這個processor去處理請求. 接著
啟動RemotingServer, 這個過程大致就是一個啟動netty ServerBootstrap的過程, 代碼如下:
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler());
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
整個啟動過程與RemotingClient類似, 使用的handler也類似, 區(qū)別在于RemotingServerz. 服務器端以后添加的handler是NettyServerHandler(客戶端用的是NettyClientHandler). 服務器端接收到請求后, 對消息的處理邏輯在NettyRemotingAbstract中(此處省略了一大波netty框架接收到消息后的數(shù)據(jù)流轉過程), 如下:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//根據(jù)RemotingCommand中的code獲取processor和ExecutorService
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
//rpc hook
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
//processor處理請求
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
//rpc hook
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
PLOG.error("process request over, but response failed", e);
PLOG.error(cmd.toString());
PLOG.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
.equals(e.getClass().getCanonicalName())) {
PLOG.error("process request exception", e);
PLOG.error(cmd.toString());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
//封裝requestTask
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
//想線程池提交requestTask
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+ ", too many requests and system thread pool busy, RejectedExecutionException " //
+ pair.getObject2().toString() //
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
//構建response
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
服務器端的流程大體如此.
將客戶端和服務器端聯(lián)合起來的流程圖如下:
異步調用和單項調用的原理與同步調用大致相當, 此處不再重復介紹.
其他部分的處理過程
前文中講到, 每次有消息需要發(fā)送, 就會生成resposneFuture用于接收消息回應, 但是如果始終沒有收到回應, Map(scanResponseTable)中的responseFuture就會堆積.
這個時候就需要一個線程來專門做Map的清理回收, 即前文提到的定時掃描responseTable的任務, 這個線程會1s調用一次來檢查所有的responseFuture, 判斷是否有效, 是否已經得到返回, 并進行相應的處理. 代碼如下:
//類NettyRemotingAbstract
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
PLOG.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
PLOG.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
總結
消息隊列的網(wǎng)絡通信模塊總的來說并不復雜, 比較關鍵的幾個部分就是協(xié)議格式的設計, 維護request ID和responseFuture的對應關系, 超時處理等幾個方面.