一、概述
本篇主要跟蹤下producer的發(fā)送流程责语,以SYNC同步模式為例炮障,假定producer已經(jīng)start了
二、主線流程圖
三坤候、流程深入一丟丟
再次說明下胁赢,這里是以SYNC為例子
只對邏輯相對較多的幾個方法做講解
方法2:
增加了一個timeout,發(fā)送超時時間白筹,默認時間 3 秒
SendResult send(Message msg,long timeout)
方法3:sendDefaultImpl()
方法內(nèi)智末,會根據(jù)策略獲取待發(fā)送的隊列谅摄,然后調(diào)用sendKernelImpl發(fā)送消息,如果發(fā)送失敗系馆,會嘗試 1 + 重試次數(shù)(默認為2) = 3次
方法4 sendKernelImpl()
- 首先為消息添加主鍵送漠,格式如下:
UNIQ_KEY : 0BCDF1716BEC18B4AAC27F26B89A0000 - 壓縮消息
- 執(zhí)行hook的before方法(如果有的話)
- 組織requestHeader作為下個方法的參數(shù)
方法6 invokeSync
這個方法在調(diào)用 invokeSyncImpl 的前后,分別調(diào)用了doBeforeRpcHooks及doAfterRpcHooks的hooks方法由蘑,切入RPC調(diào)用
方法7 invokeSyncImpl
這個是最終和broker通訊的代碼闽寡,通過netty的channel.writeAndFlush(request)方法將消息發(fā)送給broker,并通過ChannelFutureListener回調(diào)函數(shù)獲取broker的反饋
通過下面的代碼讓阻塞線程尼酿,其實內(nèi)部就是一個length=1的CountDownLatch
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
然后在ChannelFutureListener回調(diào)函數(shù)的putResponse方法中釋放爷狈,latch - 1,保證獲取到回饋再返回
具體的源代碼如下:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// 在這里阻塞 等待響應(yīng)
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}