Rocketmq
使用 Netty
實現(xiàn)了 remoting
模塊(即 RPC
模塊)抒蚜。
一. RemotingClient
和 RemotingServer
接口
1.1 RemotingService
接口
/**
* 遠(yuǎn)程RPC調(diào)用服務(wù)接口
*/
public interface RemotingService {
// 服務(wù)開啟
void start();
// 服務(wù)停止
void shutdown();
// 注冊RPC調(diào)用的鉤子對象RPCHook垮刹, 可以監(jiān)控RPC調(diào)用請求和響應(yīng)數(shù)據(jù)陕壹。
void registerRPCHook(RPCHook rpcHook);
}
RemotingService
是 RemotingClient
和 RemotingServer
接口公共父接口苟鸯,表示遠(yuǎn)程RPC
調(diào)用服務(wù)接口列林,提供了三個方法痰滋。
1.2 RemotingClient
接口
/**
* 遠(yuǎn)程RPC調(diào)用服務(wù)客戶端
*/
public interface RemotingClient extends RemotingService {
/**
* 更新 NameServer 服務(wù)器的地址列表
* @param addrs
*/
void updateNameServerAddressList(final List<String> addrs);
/**
* 獲取 NameServer 服務(wù)器的地址列表
* @return
*/
List<String> getNameServerAddressList();
/**
* 向遠(yuǎn)程服務(wù)器 addr 地址發(fā)送數(shù)據(jù)予权,并同步阻塞等待響應(yīng)些楣。
* 超過給的時間脂凶,沒有數(shù)據(jù)響應(yīng),就拋出異常愁茁。
*/
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
/**
* 向遠(yuǎn)程服務(wù)器 addr 地址發(fā)送數(shù)據(jù)蚕钦,立刻返回。
* 當(dāng)遠(yuǎn)程服務(wù)器有響應(yīng)埋市,那么就回調(diào) InvokeCallback 的方法冠桃,傳遞響應(yīng)數(shù)據(jù);
* 如果超過給的時間道宅,也會回調(diào) InvokeCallback 的方法食听,響應(yīng)失敗供置。
*/
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
/**
* 注冊遠(yuǎn)程請求命令處理程序伯复,
*/
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/**
* 設(shè)置異步請求響應(yīng)回調(diào) InvokeCallback 的方法線程池執(zhí)行器
*/
void setCallbackExecutor(final ExecutorService callbackExecutor);
ExecutorService getCallbackExecutor();
/**
* 通道是否可寫
*/
boolean isChannelWritable(final String addr);
}
RemotingClient
是RPC
服務(wù)的客戶端接口,有如下方法:
-
invokeSync
,invokeAsync
和invokeOneway
: 都是向遠(yuǎn)程服務(wù)器addr
地址發(fā)送數(shù)據(jù)闪萄。區(qū)別是
invokeSync
同步阻塞等待響應(yīng)泞当;invokeAsync
異步發(fā)送迹蛤,在InvokeCallback
回調(diào)方法中傳遞響應(yīng)結(jié)果;invokeOneway
只是發(fā)送數(shù)據(jù)襟士,不管響應(yīng)結(jié)果盗飒。
注意如果addr
值為null
,就表示向NameServer
服務(wù)器地址發(fā)送數(shù)據(jù)陋桂。 -
updateNameServerAddressList
和getNameServerAddressList
: 更新和獲取NameServer
服務(wù)器的地址列表逆趣。 -
registerProcessor
: 注冊遠(yuǎn)程請求命令處理程序。注意客戶端不只是接收到服務(wù)端的響應(yīng)結(jié)果嗜历,也會接收到服務(wù)端的請求數(shù)據(jù)的宣渗,一般都是服務(wù)端主動通知客戶端的數(shù)據(jù)信息抖所;在
MQClientAPIImpl
類中調(diào)用了這個方法。 -
setCallbackExecutor
和getCallbackExecutor
: 異步請求響應(yīng)回調(diào)InvokeCallback
方法的線程池執(zhí)行器痕囱。 -
isChannelWritable
: 通道是否可寫田轧。
1.3 RemotingServer
接口
/**
* 遠(yuǎn)程RPC調(diào)用服務(wù)服務(wù)端
*/
public interface RemotingServer extends RemotingService {
/**
* 注冊特定請求(requestCode)的處理器和對應(yīng)線程池執(zhí)行器
*/
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/**
* 注冊默認(rèn)請求的處理器和對應(yīng)線程池執(zhí)行器
*/
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
/**
* 服務(wù)端監(jiān)聽的端口
*/
int localListenPort();
/**
* 根據(jù)請求 requestCode,獲取對應(yīng)請求命令處理器和線程池
*/
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
/**
* 向指定客戶端 `channel` 發(fā)送數(shù)據(jù)鞍恢,并同步阻塞等待響應(yīng)傻粘。
* 超過給的時間,沒有數(shù)據(jù)響應(yīng)有序,就拋出異常抹腿。
*/
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
/**
* 向指定客戶端 `channel` 發(fā)送數(shù)據(jù),立刻返回旭寿。
* 當(dāng)遠(yuǎn)程服務(wù)器有響應(yīng)警绩,那么就回調(diào) InvokeCallback 的方法,傳遞響應(yīng)數(shù)據(jù)盅称;
* 如果超過給的時間肩祥,也會回調(diào) InvokeCallback 的方法,響應(yīng)失敗缩膝。
*/
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
/**
* 向指定客戶端 `channel` 發(fā)送數(shù)據(jù)混狠,不管響應(yīng)結(jié)果。
*/
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
RemotingServer
是 RPC
服務(wù)的服務(wù)端接口疾层,有如下方法:
-
registerProcessor
: 注冊特定請求(requestCode
)的處理器和對應(yīng)線程池執(zhí)行器将饺。 -
registerDefaultProcessor
: 注冊默認(rèn)請求的處理器和對應(yīng)線程池執(zhí)行器。 -
localListenPort
: 獲取服務(wù)端監(jiān)聽的端口痛黎。 -
getProcessorPair
: 根據(jù)請求requestCode
予弧,獲取對應(yīng)請求命令處理器和線程池。 -
invokeSync
,invokeAsync
和invokeOneway
: 向指定客戶端channel
發(fā)送數(shù)據(jù)湖饱。
二. NettyRemotingAbstract
類
NettyRemotingAbstract
是 RPC
服務(wù)基礎(chǔ)抽樣類掖蛤,客戶端和服務(wù)端實現(xiàn)類都繼承這個抽樣類。
2.1 重要成員變量
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
*
* 就是為了限制 invokeOneway(...) 方法的最大請求數(shù)量井厌,保護系統(tǒng)內(nèi)存占用蚓庭。
*/
protected final Semaphore semaphoreOneway;
/**
* Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
*
* 就是為了限制 invokeAsync(...) 方法的最大請求數(shù)量,保護系統(tǒng)內(nèi)存占用仅仆。
*/
protected final Semaphore semaphoreAsync;
/**
* This map caches all on-going requests.
*
* 緩存所有正在進行的請求器赞。
*/
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
*
* 儲存所有請求命令(requestCode)對應(yīng)的處理器 NettyRequestProcessor 和 線程池執(zhí)行器,處理請求命令返回響應(yīng)結(jié)果
*/
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
*
* 將netty事件提供給用戶定義 ChannelEventListener 的后臺服務(wù)執(zhí)行器墓拜。
*/
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
* The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
*
* 默認(rèn)的所有請求命令對應(yīng)的處理器拳魁;
* 當(dāng)請求命令沒有精確匹配到處理器時使用。
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
* SSL context via which to create {@link SslHandler}.
*
* 用于處理 SSL
*/
protected volatile SslContext sslContext;
/**
* custom rpc hooks
*
* 用戶自定義的 RPC 回調(diào)鉤子
*/
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
-
semaphoreOneway
和semaphoreAsync
: 通過信號量Semaphore
來限制oneway
和async
請求的數(shù)量撮弧,因為這兩種請求都是異步潘懊。同步請求不需要這個,因為同步請求本身就是阻塞的贿衍。
-
responseTable
: 記錄所有正在進行的請求授舟,包括同步請求和異步請求,但是沒有oneway
類型請求贸辈。 -
processorTable
: 儲存指定請求命令(requestCode
)對應(yīng)的處理器NettyRequestProcessor
和 線程池執(zhí)行器释树。即通過registerProcessor
方法注冊的。 -
defaultRequestProcessor
: 默認(rèn)的所有請求命令對應(yīng)的處理器擎淤,當(dāng)processorTable
中沒有找到對應(yīng)的處理器時奢啥,就會使用這個。注意這個值只在服務(wù)端 (
NettyRemotingServer
) 實現(xiàn)中賦值了嘴拢;在客戶端(NettyRemotingClient
) 實現(xiàn)中這個值就是null
桩盲,也就是說客戶端只能處理指定requestCode
請求命令。 -
nettyEventExecutor
: 將Netty
事件提供給用戶定義ChannelEventListener
接口的后臺服務(wù)執(zhí)行器席吴。 -
sslContext
: 用于處理SSL
赌结。 -
rpcHooks
: 用戶自定義的RPC
回調(diào)鉤子。
2.2 重要方法
2.2.1 processMessageReceived
方法
// 處理遠(yuǎn)程命令孝冒,包括請求命令和響應(yīng)命令
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
// 請求命令
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
// 響應(yīng)命令
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
這個方法一般都是在 Netty
接收到數(shù)據(jù)柬姚,轉(zhuǎn)成 RemotingCommand
對象,然后調(diào)用這個方法庄涡;分為請求命令和響應(yīng)命令量承。
2.2.2 processRequestCommand
方法
/**
* Process incoming request command issued by remote peer.
*
* 處理遠(yuǎn)程請求命令
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 通過請求命令的 code 獲取對應(yīng)的處理器
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 如果沒有匹配到處理器,就用默認(rèn)的處理器
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// 這次請求的 opaque穴店,用這個值來實現(xiàn)響應(yīng)和請求一一對應(yīng)
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 請求處理開始撕捍,回調(diào) RPC 鉤子方法
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
// 請求處理結(jié)束,回調(diào) RPC 鉤子方法
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
// 請求命令不是一次性命令 Oneway
if (!cmd.isOnewayRPC()) {
if (response != null) {
// 設(shè)置響應(yīng)的 opaque 就是請求的 opaque
response.setOpaque(opaque);
response.markResponseType();
try {
// 將響應(yīng)發(fā)送回去
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
// 不是 Oneway 類型迹鹅,就要有響應(yīng)卦洽,返回 SYSTEM_ERROR 響應(yīng)
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
// 請求處理器 NettyRequestProcessor,是不是拒絕處理請求
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
// 設(shè)置響應(yīng)的 opaque 就是請求的 opaque
response.setOpaque(opaque);
// 將響應(yīng)發(fā)送回去
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 請求處理過程是在請求處理器對應(yīng)的線程執(zhí)行器ExecutorService 中運行
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
// RejectedExecutionException 表示線程池拒絕執(zhí)行任務(wù)斜棚。
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
// 異常處理的響應(yīng)
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
// 沒有對應(yīng)請求處理器的響應(yīng)
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
// 將響應(yīng)數(shù)據(jù) response 返回
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
處理遠(yuǎn)程請求阀蒂,這個方法雖然長,但是流程其實很簡單:
- 通過
processorTable
和defaultRequestProcessor
來得到這個請求命令對應(yīng)的處理器NettyRequestProcessor
和 線程執(zhí)行器ExecutorService
弟蚀。 - 獲取請求的
opaque
, 這個值很重要蚤霞,用這個值來實現(xiàn)響應(yīng)和請求一一對應(yīng)。 - 如果沒有對應(yīng)處理器
NettyRequestProcessor
义钉,那么就返回code
是RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED
的響應(yīng)昧绣。 - 創(chuàng)建一個
run
來包裝實際處理請求的代碼,因為請求處理過程要放在指定線程執(zhí)行器ExecutorService
中執(zhí)行捶闸。通過
NettyRequestProcessor
的processRequest
方法或者asyncProcessRequest
方法處理請求夜畴,獲取響應(yīng)結(jié)果response
, 最后通過ctx.writeAndFlush(response)
方法拖刃,將響應(yīng)返回給請求端。 - 判斷請求處理器
NettyRequestProcessor
贪绘,是否拒絕處理請求兑牡。 - 在
ExecutorService
的線程池中執(zhí)行run
。
2.2.3 processResponseCommand
方法
/**
* Process response from remote peer to the previous issued requests.
*
* 處理響應(yīng)
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
// 通過響應(yīng)的 opaque税灌,來獲取對應(yīng)請求的 ResponseFuture
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
// 設(shè)置響應(yīng)結(jié)果
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
// 異步請求均函,設(shè)置響應(yīng)
executeInvokeCallback(responseFuture);
} else {
// 同步請求,設(shè)置響應(yīng)
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
// 收到響應(yīng)菱涤,卻沒有對應(yīng)匹配的請求
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
處理響應(yīng)結(jié)果苞也,方法流程:
- 通過響應(yīng)的
opaque
,從responseTable
中獲取對應(yīng)請求的ResponseFuture
粘秆。 - 設(shè)置響應(yīng)值如迟,并從
responseTable
中移除這個ResponseFuture
。 - 如果是異步請求翻擒,那么通過
executeInvokeCallback
方法氓涣,在回調(diào)線程池中響應(yīng)回調(diào)。 - 如果是同步請求陋气,通過
responseFuture.putResponse(cmd)
方法劳吠,喚醒正在阻塞等待的線程。
2.2.4 invokeSyncImpl
方法
/**
* 同步發(fā)送請求
*/
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// 獲取這次請的 opaque
final int opaque = request.getOpaque();
try {
// 構(gòu)建一個響應(yīng)ResponseFuture
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
// 將這個請求存入 responseTable 中
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// 通過 channel.writeAndFlush(...) 方法將請求發(fā)送到遠(yuǎn)端
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 這里的回調(diào)巩趁,說明請求已經(jīng)發(fā)送出去了
if (f.isSuccess()) {
// 設(shè)置請求發(fā)送成功痒玩,直接返回
responseFuture.setSendRequestOK(true);
return;
} else {
// 設(shè)置請求發(fā)送失敗,也就是不需要等待響應(yīng)了议慰,
// 這次請求就直接失敗了
responseFuture.setSendRequestOK(false);
}
// 執(zhí)行到這里蠢古,表明請求發(fā)送失敗了。
// 從正在請求集合中移除這次請求
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
// 調(diào)用 putResponse 方法别凹,喚醒通過
// `responseFuture.waitResponse(timeoutMillis)` 等待請求返回的線程
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// 同步發(fā)送請求草讶,這里需要等待遠(yuǎn)端的響應(yīng)。
// 用到了countDownLatch 來阻塞當(dāng)前線程炉菲,
// 等待響應(yīng)回來之后喚醒堕战,或者超時喚醒
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
// null == responseCommand,說明響應(yīng)沒有,需要拋出異常
if (responseFuture.isSendRequestOK()) {
// 如果 isSendRequestOK 是true拍霜,說明請求發(fā)送出去了嘱丢,但是響應(yīng)沒有,響應(yīng)超時
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
// 如果 isSendRequestOK 是false祠饺,說明請求都沒有發(fā)送出去越驻,發(fā)送請求超時。
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
發(fā)送同步請求,方法流程:
- 創(chuàng)建
ResponseFuture
對象缀旁,并存入到responseTable
中记劈。 - 通過
channel.writeAndFlush()
方法,將請求發(fā)送到遠(yuǎn)端诵棵。并添加
ChannelFutureListener
監(jiān)控抠蚣,如果請求發(fā)送成功,那么設(shè)置responseFuture
的sendRequestOK
為true
履澳;如果請求發(fā)送失敗,那么從responseTable
移除responseFuture
怀跛,并通過putResponse
方法喚醒阻塞等待的線程距贷。 - 調(diào)用
responseFuture.waitResponse(timeoutMillis)
方法,同步阻塞等待響應(yīng)吻谋。 - 如果
responseCommand
為null
忠蝗,那么就拋出異常。
2.2.5 invokeAsyncImpl
方法
/**
* 異步發(fā)送請求
*/
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 開始時間
long beginStartTime = System.currentTimeMillis();
// 獲取這次請的 opaque
final int opaque = request.getOpaque();
// 通過 semaphoreAsync 來限制異步請求最大數(shù)量
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
// 通過 SemaphoreReleaseOnlyOnce 保證異步請求漓拾,
// 只會釋放一個 semaphoreAsync 的許可
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
// 已經(jīng)超時阁最,釋放許可,并拋出異常
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
// 構(gòu)建一個響應(yīng)ResponseFuture骇两,
// 注意異步請求就會有 invokeCallback 對象,
// 也要將 SemaphoreReleaseOnlyOnce 對象傳遞進去速种,用于釋放semaphoreAsync 許可。
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
// 將這個請求存入 responseTable 中
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 這里的回調(diào)低千,說明請求已經(jīng)發(fā)送出去了
if (f.isSuccess()) {
// 設(shè)置請求發(fā)送成功配阵,直接返回
responseFuture.setSendRequestOK(true);
return;
}
// 執(zhí)行到這里,表明請求發(fā)送失敗了示血。
requestFail(opaque);
// 這里沒有調(diào)用 responseFuture.setCause(f.cause());棋傍, ChannelFuture的異常丟失了
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
// 釋放semaphoreAsync 許可。
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// 執(zhí)行到這里难审,表示沒有獲取到發(fā)送異步請求的許可瘫拣,直接拋出超時異常
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
發(fā)送異步請求,方法流程:
- 通過
semaphoreAsync
來限制異步請求最大數(shù)量告喊。 - 如果沒有獲取到許可麸拄,那么就拋出異常。
- 獲取到許可葱绒,先創(chuàng)建
SemaphoreReleaseOnlyOnce
對象感帅,保證只會釋放一次semaphoreAsync
的許可。 - 創(chuàng)建
ResponseFuture
對象地淀,并存入到responseTable
中失球。 - 通過
channel.writeAndFlush(request)
方法,將請求送到到遠(yuǎn)端。并添加
ChannelFutureListener
監(jiān)控实苞,如果請求發(fā)送成功豺撑,那么設(shè)置responseFuture
的sendRequestOK
為true
;如果請求發(fā)送失敗黔牵,那么調(diào)用requestFail()
方法聪轿,進行失敗通知。 - 與
invokeSyncImpl
方法不同猾浦,就是不會阻塞等待陆错,通過processResponseCommand
方法,調(diào)用executeInvokeCallback
方法金赦,通知異步請求的響應(yīng)結(jié)果音瓷。
2.2.6 invokeOnewayImpl
方法
/**
* 發(fā)送一次性請求
*/
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 設(shè)置請求是一次性請求 RPC_ONEWAY
request.markOnewayRPC();
// 通過 semaphoreAsync 來限制一次性請求最大數(shù)量
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
// 通過SemaphoreReleaseOnlyOnce保證,只會釋放一個 semaphoreOneway 的許可
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 請求發(fā)送成功夹抗,就表示完成了绳慎,不需要等待響應(yīng)
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// 執(zhí)行到這里,表示沒有獲取到發(fā)送一次性請求的許可漠烧,直接拋出超時異常
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
發(fā)送一次性請求杏愤,與 invokeAsyncImpl
方法相比,就是不需要創(chuàng)建ResponseFuture
對象已脓,存入到 responseTable
中珊楼。
2.2.7 scanResponseTable
方法
/**
* <p>
* This method is periodically invoked to scan and expire deprecated request.
* </p>
*
* 掃描所有正在進行的請求,發(fā)現(xiàn)超時的請求摆舟,就移除它亥曹,并進行失敗通知
*/
public void scanResponseTable() {
// 記錄所有過期的請求
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
// 遍歷所有正在進行的請求
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
// 如果這個請求的時間已經(jīng)超過設(shè)置的超時時間TimeoutMillis,
// 那么就要從 responseTable 中移除它恨诱,添加到 rfList 集合中媳瞪,進行失敗通知。
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
// 遍歷超時的請求照宝,通知它們
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
掃描所有正在進行的請求蛇受,發(fā)現(xiàn)超時的請求,就從 responseTable
中移除它厕鹃,并進行失敗通知兢仰。
注意這里雖然掃描所有正在進行請求(包括同步請求和異步請求) 的超時情況,但是只調(diào)用了
executeInvokeCallback()
方法剂碴,進行異步請求的通知把将;而沒有調(diào)用responseFuture.putResponse()
方法,喚醒同步請求忆矛,因為同步請求waitResponse()
方法察蹲,等超時了會自動喚醒请垛。
2.3 ResponseFuture
類
// 請求編號
private final int opaque;
// 發(fā)送請求的通道 Channel
private final Channel processChannel;
// 請求超時時間
private final long timeoutMillis;
// 異步請求回調(diào)接口實例
private final InvokeCallback invokeCallback;
// 開始時間,用于判斷是否超時
private final long beginTimestamp = System.currentTimeMillis();
// 用于同步請求洽议,阻塞當(dāng)前線程
private final CountDownLatch countDownLatch = new CountDownLatch(1);
// 用于保證只釋放一次許可 Semaphore
private final SemaphoreReleaseOnlyOnce once;
// 保證異步回調(diào)只調(diào)用一次
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
// 響應(yīng)結(jié)果對象
private volatile RemotingCommand responseCommand;
// 發(fā)送請求成功
private volatile boolean sendRequestOK = true;
// 失敗原因
private volatile Throwable cause;
通過 countDownLatch
來實現(xiàn)同步阻塞宗收,通過 executeCallbackOnlyOnce
保證異步回調(diào)只調(diào)用一次。
注:這里
executeCallbackOnlyOnce
不應(yīng)該使用AtomicBoolean
類型亚兄,因為ResponseFuture
對象每次請求的時候都會創(chuàng)建混稽,使用AtomicBoolean
對象,非常占用內(nèi)存审胚,應(yīng)該使用AtomicIntegerFieldUpdater
+volatile int
的模式匈勋。
2.4 小結(jié)
-
通過
processMessageReceived()
方法,處理遠(yuǎn)程命令包括請求命令和響應(yīng)命令菲盾。- 通過
processRequestCommand()
方法處理請求命令颓影,根據(jù)請求code
獲取對應(yīng)的請求處理器和線程池執(zhí)行器,在對應(yīng)線程池中處理請求命令懒鉴。 - 通過
processResponseCommand()
方法處理響應(yīng),如果是同步請求碎浇,就是喚醒阻塞等待線程临谱,并獲取響應(yīng)結(jié)果;如果是異步線程奴璃, 就在異步線程池執(zhí)行器getCallbackExecutor()
中悉默,將響應(yīng)結(jié)果回調(diào)。
- 通過
invokeSyncImpl()
方法苟穆,發(fā)送同步請求抄课。創(chuàng)建ResponseFuture
對象放入responseTable
集合中,通過channel.writeAndFlush(request)
方法發(fā)送請求數(shù)據(jù)雳旅,通過responseFuture.waitResponse()
方法阻塞當(dāng)前線程跟磨,等待響應(yīng)結(jié)果。invokeAsyncImpl
方法攒盈,發(fā)送異步請求抵拘。通過semaphoreAsync
限制異步請求并發(fā)數(shù),然后創(chuàng)建ResponseFuture
對象放入responseTable
集合中型豁,通過channel.writeAndFlush(request)
方法發(fā)送請求數(shù)據(jù)僵蛛。invokeOnewayImpl
方法,發(fā)送一次性請求迎变。通過semaphoreOneway
限制異步請求并發(fā)數(shù)充尉,通過channel.writeAndFlush(request)
方法發(fā)送請求數(shù)據(jù)。scanResponseTable
方法衣形,定時巡查超時請求驼侠,并進行通知。
三. NettyRemotingClient
類
這個是RPC
服務(wù)的客戶端具體實現(xiàn)類。
3.1 重要成員屬性
// Netty的配置項
private final NettyClientConfig nettyClientConfig;
// Netty客戶端引導(dǎo)類
private final Bootstrap bootstrap = new Bootstrap();
// 用來處理當(dāng)前客戶端所有 Socket連接的 IO事件
private final EventLoopGroup eventLoopGroupWorker;
// 控制 channelTables 并發(fā)修改的鎖
private final Lock lockChannelTables = new ReentrantLock();
// 緩存地址 addr 對應(yīng)的通道 channel泪电,這樣可以直接通過地址獲取 channel般妙,進行數(shù)據(jù)傳輸
private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
// 用于檢查請求是否過期的定時器
private final Timer timer = new Timer("ClientHouseKeepingService", true);
// namesrv 的地址列表
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
// 當(dāng)前被選中 namesrv 地址
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
// 記錄當(dāng)前選中 namesrv 的索引值
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
// 用于并發(fā)修改 namesrvAddrList 和 namesrvAddrChoosed 值的鎖
private final Lock lockNamesrvChannel = new ReentrantLock();
// 公共線程池執(zhí)行器,
// 如果調(diào)用 `registerProcessor(...)`方法注冊請求處理器NettyRequestProcessor時相速,沒有設(shè)置ExecutorService,那么就是publicExecutor碟渺;
// 如果沒有設(shè)置異步請求響應(yīng)回調(diào)處理線程池 callbackExecutor,那么也直接使用這個公共線程池 publicExecutor突诬。
private final ExecutorService publicExecutor;
/**
* 異步請求響應(yīng)回調(diào)處理線程池 callbackExecutor
*/
private ExecutorService callbackExecutor;
// Netty 事件的監(jiān)聽接口
private final ChannelEventListener channelEventListener;
// 用來處理 ChannelHandler 的方法苫拍,線程數(shù)是 NettyClientConfig 中的 clientWorkerThreads 值
private DefaultEventExecutorGroup defaultEventExecutorGroup;
-
nettyClientConfig
:Netty
的一些配置項值。 -
bootstrap
:Netty
客戶端引導(dǎo)類旺隙。 -
lockChannelTables
和channelTables
: 緩存地址addr
對應(yīng)的通道channel
绒极。 -
timer
: 用于檢查請求是否過期的定時器。 -
namesrvAddrList
,namesrvAddrChoosed
,namesrvIndex
和lockNamesrvChannel
: 記錄namesrv
地址列表蔬捷,和當(dāng)前選中的namesrv
地址垄提。 -
publicExecutor
: 公共線程池執(zhí)行器。 -
callbackExecutor
: 異步請求響應(yīng)回調(diào)處理線程池執(zhí)行器周拐。 -
channelEventListener
:Netty
事件的監(jiān)聽接口铡俐。 -
eventLoopGroupWorker
: 用來處理當(dāng)前客戶端所有Socket
連接的IO
事件,只需要一個線程就可以了妥粟。 -
defaultEventExecutorGroup
: 用來處理ChannelHandler
的方法审丘,線程數(shù)是NettyClientConfig
中的clientWorkerThreads
值。
3.2 重要方法
3.2.1 構(gòu)造方法
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
// 獲取公共線程池的線程數(shù)
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 創(chuàng)建公共線程池 publicExecutor勾给,線程名都是 NettyClientPublicExecutor_ 開頭
// 如果調(diào)用 `registerProcessor(...)`方法注冊 請求處理器NettyRequestProcessor滩报,
// 沒有設(shè)置線程池,那么就用這個公共線程池播急,也就是處理請求的操作就在 publicExecutor 線程池中執(zhí)行脓钾。
// 如果沒有設(shè)置異步請求響應(yīng)回調(diào)處理線程池 callbackExecutor,那么也直接使用這個公共線程池 publicExecutor
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 用來處理當(dāng)前客戶端所有 Socket連接的 IO事件旅择,就使用一個線程處理
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
// 是否要使用 SSL
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
就是創(chuàng)建 publicExecutor
和 eventLoopGroupWorker
對象惭笑,如果支持 SSL
, 那么再創(chuàng)建 sslContext
對象。
3.2.2 start
方法
@Override
public void start() {
// 這個線程池 defaultEventExecutorGroup 是用來處理 ChannelHandler 的方法
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
// 通過 eventLoopGroupWorker 線程來接收 IO 事件生真,
// 然后交給 defaultEventExecutorGroup 線程沉噩,來進行事件處理,
// 這樣不會阻塞 處理 IO 事件線程柱蟀。
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
// Socket 的 keepalive 是false
.option(ChannelOption.SO_KEEPALIVE, false)
// Socket 連接建立的超時時間
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
// Socket 發(fā)送緩存區(qū)大小
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
// Socket 接收緩存區(qū)大小
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
// 是否需要使用 SSL 加密
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
// 使用 defaultEventExecutorGroup 里面的線程
// 來處理接收到Socket IO事件數(shù)據(jù)的解析和處理
pipeline.addLast(
defaultEventExecutorGroup,
// 將命令RemotingCommand 對象轉(zhuǎn)成緩存區(qū)對象ByteBuf川蒙,以便發(fā)送到遠(yuǎn)端
new NettyEncoder(),
// 將接收到數(shù)據(jù)對象ByteBuf 解析成命令RemotingCommand 對象
new NettyDecoder(),
// 進行心跳處理,當(dāng)當(dāng)前通道Channel 超過一定時間沒有發(fā)送或者讀取到數(shù)據(jù),就當(dāng)失效處理
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// 主要是做監(jiān)控用的,用來發(fā)送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件的
new NettyConnectManageHandler(),
// 這個就是用來處理解析后得到的 遠(yuǎn)程命令RemotingCommand瘟判,
// 其實就是調(diào)用了 processMessageReceived(...) 方法
new NettyClientHandler());
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 每隔三秒掃描有沒有過期請求
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
這個方法很長乳讥,但是流程其實很簡單:
- 創(chuàng)建
defaultEventExecutorGroup
線程池執(zhí)行器,用于執(zhí)行ChannelHandler
方法尚辑。 - 初始化
bootstrap
對象贱勃,使用eventLoopGroupWorker
處理通道的IO
事件刚操,使用defaultEventExecutorGroup
執(zhí)行添加到ChannelPipeline
的處理器ChannelHandler
的方法恬汁。 - 定時器
timer
每隔三秒掃描有沒有過期請求伶椿。 - 如果
channelEventListener
不為null
, 那么開啟nettyEventExecutor
線程, 將Netty
事件提供給用戶定義ChannelEventListener
氓侧。
3.2.3 closeChannel
方法
public void closeChannel(final String addr, final Channel channel) {
if (null == channel)
return;
// 從通道channel 獲取對應(yīng)的遠(yuǎn)端地址
final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
try {
// 加鎖脊另,因為要改變 channelTables 集合數(shù)據(jù)
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean removeItemFromTable = true;
// 得到channelTables 中的通道 ChannelWrapper
final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
// 如果 prevCW 沒有,或者和關(guān)閉的通道 channel 不是同一個约巷,都不用移除
if (null == prevCW) {
log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
removeItemFromTable = false;
} else if (prevCW.getChannel() != channel) {
log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
addrRemote);
removeItemFromTable = false;
}
if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
}
// 關(guān)閉通道
RemotingUtil.closeChannel(channel);
} catch (Exception e) {
log.error("closeChannel: close the channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.error("closeChannel exception", e);
}
}
將這個 channel
從 channelTables
中移除偎痛,并通過 lockChannelTables
鎖進行并發(fā)修改控制。
3.2.4 getAndCreateChannel
方法
/**
* 根據(jù)遠(yuǎn)程地址 addr 獲取通道Channel独郎,和遠(yuǎn)端進行交互
*/
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
if (null == addr) {
// 如果地址 addr 為null踩麦,那么就從 namesrvAddrList 列表中選擇一個
return getAndCreateNameserverChannel();
}
// 記錄了每個地址對應(yīng)的通道
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
// 如果這個通道還能用,那么直接返回
return cw.getChannel();
}
// 根據(jù)地址addr 創(chuàng)建通道 Channel
return this.createChannel(addr);
}
- 如果
addr == null
氓癌,那么就是namesrv
地址靖榕,通過getAndCreateNameserverChannel()
方法,獲取對應(yīng)的通道Channel
顽铸。 - 根據(jù)地址
addr
從channelTables
中獲取對應(yīng)的Channel
。 - 如果沒有可用通道
Channel
料皇,通過createChannel()
方法谓松,創(chuàng)建這個地址addr
的通道。
3.2.5 createChannel
方法
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
// 如果 channelTables 中有地址addr 對應(yīng)的通道践剂,并且還是能用的直接返回
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
// 如果 channelTables 中有對應(yīng)的通道
if (cw.isOK()) {
// 通道還能用鬼譬,直接返回
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
// 通道還沒有創(chuàng)建完成,那么也不需要再創(chuàng)建了逊脯,等待它創(chuàng)建完成
createNewConnection = false;
} else {
// 說明通道壞了优质,移除它,設(shè)置createNewConnection為 true军洼,重新創(chuàng)建
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}
if (createNewConnection) {
// 調(diào)用 bootstrap.connect(...) 方法創(chuàng)建通道
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
// 存入 channelTables 中巩螃。
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
// 等待通道創(chuàng)建完成
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
}
} else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
}
return null;
}
通過 this.bootstrap.connect()
方法創(chuàng)建通道,但是要保證不能多次創(chuàng)建匕争,所以通過 lockChannelTables
進行并發(fā)控制避乏。
3.2.6 invokeSync
方法
/**
* 向遠(yuǎn)程服務(wù)器 addr 地址發(fā)送數(shù)據(jù),并同步阻塞等待響應(yīng)甘桑。
*/
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
// 根據(jù)遠(yuǎn)程地址 addr 獲取通道Channel拍皮,和遠(yuǎn)端進行交互
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
// 通道還是活躍的歹叮,能進行數(shù)據(jù)交互
try {
// 執(zhí)行鉤子
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
// 超時就拋出異常
throw new RemotingTimeoutException("invokeSync call timeout");
}
// 調(diào)用 NettyRemotingAbstract 的invokeSyncImpl 方法,發(fā)送請求
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
// 執(zhí)行鉤子
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
// 發(fā)生異常铆帽,關(guān)閉通道
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
// 如果通道不活躍了咆耿,就關(guān)閉通道
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
- 通過
getAndCreateChannel()
方法,獲取地址addr
對應(yīng)的通道channel
爹橱。 - 如果通道不活躍了萨螺,就關(guān)閉通道,拋出異常宅荤。
- 通道可用屑迂,就調(diào)用父類
NettyRemotingAbstract
的invokeSyncImpl()
方法。
3.2.7 invokeAsync
方法
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
// 根據(jù)遠(yuǎn)程地址 addr 獲取通道Channel冯键,和遠(yuǎn)端進行交互
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
// 通道還是活躍的惹盼,能進行數(shù)據(jù)交互
try {
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
// 超時拋出異常
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
}
// 調(diào)用 NettyRemotingAbstract 的 invokeAsyncImpl 方法,發(fā)送異步請求
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
// 如果獲取的通道 Channel 不能用了惫确,那么從 channelTables 中移除手报,并拋出異常。
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
與 invokeSync
方法流程類似改化。
3.3 相關(guān) Netty
的 ChannelHandler
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
// 是否需要使用 SSL 加密
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
// 使用 defaultEventExecutorGroup 里面的線程
// 來處理接收到Socket IO事件數(shù)據(jù)的解析和處理
pipeline.addLast(
defaultEventExecutorGroup,
// 將命令RemotingCommand 對象轉(zhuǎn)成緩存區(qū)對象ByteBuf掩蛤,以便發(fā)送到遠(yuǎn)端
new NettyEncoder(),
// 將接收到數(shù)據(jù)對象ByteBuf 解析成命令RemotingCommand 對象
new NettyDecoder(),
// 進行心跳處理,當(dāng)當(dāng)前通道Channel 超過一定時間沒有發(fā)送或者讀取到數(shù)據(jù)陈肛,就當(dāng)失效處理
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// 主要是做監(jiān)控用的揍鸟,用來發(fā)送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件的
new NettyConnectManageHandler(),
// 這個就是用來處理解析后得到的 遠(yuǎn)程命令RemotingCommand,
// 其實就是調(diào)用了 processMessageReceived(...) 方法
new NettyClientHandler());
}
-
SslHandler
: 處理SSL
加密句旱,通過sslContext.newHandler(ch.alloc())
創(chuàng)建阳藻。 -
NettyEncoder
: 將命令RemotingCommand
對象轉(zhuǎn)成緩存區(qū)對象ByteBuf
,以便發(fā)送到遠(yuǎn)端谈撒。 -
NettyDecoder
: 將接收到數(shù)據(jù)對象ByteBuf
解析成命令RemotingCommand
對象腥泥。 -
IdleStateHandler
: 進行心跳處理,當(dāng)通道Channel
超過一定時間沒有發(fā)送或者讀取到數(shù)據(jù)啃匿,就會發(fā)送事件進行提醒蛔外。 -
NettyConnectManageHandler
: 主要是做監(jiān)控用的,用來發(fā)送Netty
的CONNECT
,CLOSE
,IDLE
,EXCEPTION
事件溯乒。 -
NettyClientHandler
: 調(diào)用父類NettyRemotingAbstract
的processMessageReceived
方法夹厌,處理遠(yuǎn)程命令。
3.3.1 NettyEncoder
類
/**
* 將 RemotingCommand 轉(zhuǎn)成緩存區(qū) ByteBuffer 對象橙数,
*/
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
// 得到遠(yuǎn)程命令頭信息數(shù)據(jù)
ByteBuffer header = remotingCommand.encodeHeader();
// 先寫頭數(shù)據(jù)
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
// 再寫內(nèi)容體數(shù)據(jù)
out.writeBytes(body);
}
} catch (Exception e) {
// 發(fā)生錯誤尊流,那么就關(guān)閉通道 Channel。
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
通過 encodeHeader()
方法獲取頭信息header
, 寫入頭信息灯帮,然后再寫入體信息 body
崖技。
// 在 RemotingCommand 類中 encodeHeader 方法逻住。
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
// 1. 整個遠(yuǎn)程命令 RemotingCommand 的總長度
int length = 4;
// 2> header data length
// 2. 得到頭數(shù)據(jù)
byte[] headerData;
headerData = this.headerEncode();
// 增加頭數(shù)據(jù)長度
length += headerData.length;
// 3> body data length
// 3. 增加內(nèi)容體數(shù)據(jù)長度
length += bodyLength;
// 內(nèi)容體先不用添加,
// 那么 ByteBuffer 大小就是 4(總長度) + 4(頭長度) + 數(shù)據(jù)頭內(nèi)容
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// 先將總長度存入
result.putInt(length);
// header length
// 頭數(shù)據(jù)長度 headerData.length 存入迎献,要進行處理
// 第一個字節(jié)儲存類型瞎访,后三個字節(jié)儲存頭長度 headerData.length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
}
請求命令RemotingCommand
轉(zhuǎn)成ByteBuffer
的數(shù)據(jù)格式:
4個字節(jié)(總長度) + 4個字節(jié)(數(shù)據(jù)頭長度) + 數(shù)據(jù)頭(header)字節(jié)內(nèi)容 + 數(shù)據(jù)體(body)字節(jié)內(nèi)容
其中4個字節(jié)數(shù)據(jù)頭長度中,第一個字節(jié)儲存類型吁恍,后三個字節(jié)儲存頭長度扒秸,也就是說 數(shù)據(jù)頭長度不會超過三個字節(jié)大小。
3.3.2 NettyDecoder
類
/**
* 將接收到數(shù)據(jù)對象 `ByteBuf` 解析成命令 `RemotingCommand` 對象
*/
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
/**
* 0->4 用4個字節(jié)表示整個內(nèi)容幀的總長度
* initialBytesToStrip == 4冀瓦,表示最后得到的數(shù)據(jù)伴奥,是跳過這個總長度字段。
*/
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
// 得到完整的遠(yuǎn)程命令數(shù)據(jù)對應(yīng)的緩存區(qū)ByteBuf
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
// 從緩存區(qū)ByteBuf 中解析出一個遠(yuǎn)程命令RemotingCommand對象
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
}
繼承自 LengthFieldBasedFrameDecoder
類翼闽,得到完整的遠(yuǎn)程命令數(shù)據(jù)對應(yīng)的緩存區(qū) ByteBuf
拾徙,再通過RemotingCommand.decode(byteBuffer)
從緩存區(qū) ByteBuf
中解析出一個遠(yuǎn)程命令RemotingCommand
對象。
關(guān)于
LengthFieldBasedFrameDecoder
用法感局,請看Netty源碼_編解碼器尼啡。
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
// length 去掉了表示總長度的4個字節(jié),也就是說只包括 4(頭長度) + 頭內(nèi)容 + 體內(nèi)容
int length = byteBuffer.limit();
// 一個四個字節(jié)询微,最高一個字節(jié)記錄類型崖瞭,即 JSON 或者 ROCKETMQ
// 剩下三個字節(jié)才代表頭數(shù)據(jù)長度,即 oriHeaderLen & 0xFFFFFF
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
// 將頭數(shù)據(jù)存入 字節(jié)數(shù)組headerData 中
byteBuffer.get(headerData);
// 解析頭內(nèi)容
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
// 得到數(shù)據(jù)體的字節(jié)長度
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
根據(jù)數(shù)據(jù)頭長度和數(shù)據(jù)體長度撑毛,從緩存區(qū)中解析出书聚,數(shù)據(jù)頭和數(shù)據(jù)頭的內(nèi)容。
3.3.3 NettyConnectManageHandler
類
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
super.connect(ctx, remoteAddress, localAddress, promise);
// 發(fā)送連接 CONNECT 事件通知
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
}
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
// 關(guān)閉通道
closeChannel(ctx.channel());
super.disconnect(ctx, promise);
// 發(fā)送關(guān)閉 CLOSE 事件通知
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
// 關(guān)閉通道
closeChannel(ctx.channel());
super.close(ctx, promise);
NettyRemotingClient.this.failFast(ctx.channel());
// 發(fā)送關(guān)閉 CLOSE 事件通知
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 接收到通道長時間空閑事件藻雌,即心跳檢測
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
// 關(guān)閉通道
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
// 發(fā)送空閑 IDLE 事件通知
NettyRemotingClient.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
// 關(guān)閉通道
closeChannel(ctx.channel());
// 發(fā)送異常 EXCEPTION 事件通知
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
}
}
主要是做監(jiān)控用的寺惫,用來發(fā)送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件。
3.3.4 NettyClientHandler
類
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
// 處理遠(yuǎn)程命令
processMessageReceived(ctx, msg);
}
}
3.4 小結(jié)
NettyRemotingClient
主要功能:
- 通過
bootstrap
創(chuàng)建連接通道channel
蹦疑,并使用channelTables
緩存地址addr
和 通道channel
對應(yīng)關(guān)系,不需要每次都創(chuàng)建通道萨驶。 - 記錄
namesrv
的地址列表歉摧,當(dāng)發(fā)送請求時,沒有寫地址addr
腔呜,那么就向namesrv
的地址發(fā)送請求叁温。 - 通過
invokeSync
,invokeAsync
和invokeOneway
發(fā)送請求,其實就是先根據(jù)地址addr
獲取可使用的通道channel
核畴,調(diào)用父類對應(yīng)方法發(fā)送請求數(shù)據(jù)膝但。
四. NettyRemotingServer
類
這個是 RPC
服務(wù)的服務(wù)端具體實現(xiàn)類。
4.1 重要的成員屬性
// Netty 服務(wù)端引導(dǎo)類
private final ServerBootstrap serverBootstrap;
// 處理連接上服務(wù)端的所有 Socket 的IO 事件
private final EventLoopGroup eventLoopGroupSelector;
// 處理服務(wù)端接收客戶端連接的線程池
private final EventLoopGroup eventLoopGroupBoss;
// Netty的配置項
private final NettyServerConfig nettyServerConfig;
// 公共線程池
private final ExecutorService publicExecutor;
// Netty 事件的監(jiān)聽接口
private final ChannelEventListener channelEventListener;
4.2 重要方法
4.2.1 構(gòu)造方法
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
loadSslContext();
}
創(chuàng)建公共線程池 publicExecutor
谤草,根據(jù)是否使用 useEpoll()
跟束,創(chuàng)建不同的 eventLoopGroupBoss
和 eventLoopGroupSelector
實現(xiàn)莺奸。
4.2.2 start
方法
public void start() {
// 這個線程池 defaultEventExecutorGroup 是用來處理 ChannelHandler 的方法
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 創(chuàng)建共享的 ChannelHandler
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 服務(wù)端綁定監(jiān)控端口
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 每隔三秒掃描有沒有過期請求
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
- 創(chuàng)建
defaultEventExecutorGroup
線程池。 - 創(chuàng)建共享的
ChannelHandler
實例冀宴。 - 初始化
serverBootstrap
服務(wù)端灭贷。添加的
ChannelHandler
與NettyRemotingClient
中的類似,這里就不再展開分析了略贮。 - 服務(wù)端
serverBootstrap
綁定監(jiān)控端口甚疟。 - 定時器
timer
每隔三秒掃描有沒有過期請求。