RocketMQ uses Netty as its underlying network communication framework. The previous post, "rocketmq rpc communication (part 1): codec", covered how RocketMQ encodes and decodes commands; this post focuses on the thread model and communication patterns of RocketMQ's Netty layer.
1副砍、為什么使用netty
- Netty wraps Java's NIO instead of using traditional blocking I/O (BIO). It is built on the Reactor thread model, so a handful of threads can comfortably handle tens of thousands of socket connections.
- Netty works around the JDK epoll empty-poll bug and provides many utility classes for handling TCP packet sticking and splitting, which makes development more efficient.
- Netty's API is relatively simple. Developers are not bothered by the various raw NIO events, and the well-designed pipeline mechanism makes it easy to plug in custom handlers, so developers can focus on business logic.
- Netty is asynchronous, which allows it to handle higher concurrency; many well-known middleware projects use Netty for network communication.
2捻撑、rocketMq 使用netty的線程模型介紹
RocketMQ uses four categories of threads, summarized in the table below.
Threads | Name | Description |
---|---|---|
1 | BossThread | Reactor main thread; handles TCP connection establishment |
3 | SelectorWorkerThread | Reactor worker threads; select network events and hand them off to the codec threads |
8 | CodecThread | Netty encode/decode threads; the source code uses 8 |
N | ProcessorThread | Business processing thread pools; handle the actual work such as reading and writing messages |
Note: in RocketMQ the Netty worker threads only select socket events and forward them to the codec threads, which is why the source configures just 3 worker threads. Going down the table, the per-task processing time grows at each stage. The interaction between these threads is shown in the figure.
Another figure, taken from the RocketMQ community, explains more clearly how the Netty pipeline and the server side handle a request.
3画恰、server端流程
3.1、Server startup
TCP options used when the server starts:
- ChannelOption.SO_BACKLOG corresponds to the backlog parameter of the TCP listen(int socketfd, int backlog) call, which initializes the server's queue of pending connections. The server accepts connection requests sequentially, so only one can be handled at a time; when several clients arrive together, the ones that cannot be handled yet wait in this queue, and backlog sets its size.
- ChannelOption.SO_REUSEADDR corresponds to the SO_REUSEADDR socket option, which allows the local address and port to be reused. For example, if a server process was listening on TCP port 80, binding that port again would normally fail; with this option set, the port can be shared, so it is commonly enabled in server programs.
- ChannelOption.SO_KEEPALIVE corresponds to the SO_KEEPALIVE socket option, which enables TCP keep-alive probing on connections that may go a long time without traffic: once set, if no data is exchanged for two hours, TCP automatically sends a probe packet to test the connection. Since RocketMQ has its own heartbeat mechanism, this option is set to false.
- ChannelOption.TCP_NODELAY disables Nagle's algorithm, which suits the immediate transmission of small packets.
The server-side constructor is shown below; it is all standard Netty API, so some familiarity with Netty helps.
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// whether to use epoll
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
loadSslContext();
}
The server start() method:
@Override
public void start() {
// thread pool that handles encoding/decoding
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) // use epoll if available
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
// these handlers share the defaultEventExecutorGroup thread pool
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler() // actual request handling
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
Note: as the constructor shows, when the boss and worker event loop groups are created, the code checks whether epoll can be used. Broadly speaking, here is why epoll beats select.
The biggest limitation of select is that the number of fds it can watch is bounded (1024 by default on Linux), and sockets are scanned linearly, i.e. by polling, which is inefficient: when there are many sockets, every call walks through all of them no matter which ones are active. select also keeps a data structure of fds that has to be copied between user space and kernel space on each call, which is likewise costly.
epoll supports both level-triggered and edge-triggered modes; its distinguishing feature is edge triggering, where the process is told only about the fds that have just become ready, and only once. epoll also uses event-based readiness notification: fds are registered via epoll_ctl, and once an fd becomes ready the kernel activates it through a callback-like mechanism so that epoll_wait is notified. epoll has no hard limit on the number of connections (it stores fds in a red-black tree) and avoids copying the whole fd set between user space and kernel space on every call, so it is considerably more efficient.
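For reference, the useEpoll() check that drives this branch is tiny. A rough sketch of what it looks like in NettyRemotingServer (based on the 4.x source; details may differ across versions) is:
private boolean useEpoll() {
    // native epoll transport only on Linux, only when the config switch is on, and only if netty reports it as available
    return RemotingUtil.isLinuxPlatform()
        && nettyServerConfig.isUseEpollNativeSelector()
        && Epoll.isAvailable();
}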
3.2芽淡、server端處理請求
As the class diagram at the top shows, NettyRemotingServer extends NettyRemotingAbstract. The server only handles request commands; once processing completes, it writes the response back to the channel and returns it to the client.
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
// this method is defined in NettyRemotingAbstract
processMessageReceived(ctx, msg);
}
}
// how the server handles a request; the code lives in NettyRemotingAbstract
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// processors and their executor thread pools are registered when the server starts
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// request id (opaque) generated by the client; the server echoes it back unchanged
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
// run the before-RPC hooks
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
// let the matched processor handle the request
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
// run the after-RPC hooks
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
// echo the client's opaque back; the client uses it to look up the matching ResponseFuture in its map so its waiting thread can stop waiting
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
// ask the processor whether it rejects the request; this runs on the codec thread, so it must not be expensive
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// submit to the processor's own thread pool
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
// for oneway RPCs the client does not care about the result, so nothing is returned
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
// unsupported request code; return an error to the client
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
Note: after the server receives a client request, it is first handled by the defaultEventExecutorGroup registered above, i.e. the codec threads. The request code of the RemotingCommand is used to look up the matching pair (key: processor, value: the executor thread pool for that processor); the codec thread submits the task to that executor, and once the processor finishes, the result is written back into the pipeline for the remaining handlers to send out.
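To make the lookup concrete, here is an illustrative sketch of registering a processor together with its executor via registerProcessor, assuming a NettyRemotingServer instance named remotingServer; the request code, pool size, and processor body are made up for the example, only the registerProcessor call itself is the real API:
// illustrative only: a made-up request code and a trivial processor
final int MY_REQUEST_CODE = 1001; // hypothetical request code for this example
ExecutorService myExecutor = Executors.newFixedThreadPool(8);
remotingServer.registerProcessor(MY_REQUEST_CODE, new NettyRequestProcessor() {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        // build a success response; processRequestCommand will copy the opaque and write it back
        return RemotingCommand.createResponseCommand(RemotingSysResponseCode.SUCCESS, null);
    }
    @Override
    public boolean rejectRequest() {
        return false; // never reject in this toy example
    }
}, myExecutor);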
4硅急、client端流程
The client-side class hierarchy is shown in the figure.
4.1覆享、client端rpc通信方式
- sync: the client's calling thread blocks and waits for the response
- async: the client's calling thread does not wait; it is notified through a callback via Netty's asynchronous programming model
- oneway: the client only sends the request to the server and does not care whether it succeeds (a usage sketch follows this list)
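From the caller's point of view, the three modes correspond to invokeSync / invokeAsync / invokeOneway on RemotingClient. A hedged usage sketch, assuming a started NettyRemotingClient named client; the address, request code, and timeouts are made up and error handling is omitted:
// hypothetical address and request
String addr = "127.0.0.1:8888";
RemotingCommand request = RemotingCommand.createRequestCommand(1001, null); // made-up code, no custom header
// sync: the calling thread blocks until the response arrives or the timeout elapses
RemotingCommand response = client.invokeSync(addr, request, 3000);
// async: returns immediately; the callback runs when the response arrives (or the cleanup scan times it out)
client.invokeAsync(addr, request, 3000, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand resp = responseFuture.getResponseCommand(); // may be null on timeout
    }
});
// oneway: fire and forget, no response is expected
client.invokeOneway(addr, request, 3000);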
4.1.1凤壁、client端handler處理
As the class diagram above shows, NettyRemotingClient also extends NettyRemotingAbstract. NettyRemotingAbstract#processMessageReceived handles the RESPONSE_COMMAND case, which is exactly where the client processes the server's response.
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
// the client takes this branch
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
// async mode: run the callback
executeInvokeCallback(responseFuture);
} else {
// sync mode: wake up the client's sending thread, which then returns the response RemotingCommand
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
4.1.2、Sync invocation
The call flow is shown in the figure.
The code is analyzed below.
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
// when created, the ResponseFuture holds an internal CountDownLatch countDownLatch = new CountDownLatch(1);
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
// put opaque -> ResponseFuture into the map, so the future can be looked up by opaque when the server's response comes back
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
// send succeeded: mark sendRequestOK = true and return
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
// the following handles the failure case: remove the future for this opaque and wake up the client's sending thread
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
// putResponse calls countDownLatch.countDown(), which wakes up the client's sending thread
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// the client's sending thread waits synchronously here, i.e. countDownLatch.await inside ResponseFuture
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
Note: during a sync call, countDownLatch.countDown() on the ResponseFuture is executed in two places: in the listener registered when the request is written, and in NettyClientHandler. Either one wakes up the client's sending thread; the listener path corresponds to an RPC send failure, the NettyClientHandler path to a successful request.
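The synchronization itself lives in ResponseFuture. Stripped down to just the latch (a simplified sketch, not the full class), the waiting and waking sides look like this:
// simplified sketch of the part of ResponseFuture used on the sync path
public class ResponseFuture {
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile RemotingCommand responseCommand;
    // called by the client's sending thread: blocks until putResponse() or the timeout
    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }
    // called from NettyClientHandler on success, or from the write listener with null on failure
    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }
}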
4.1.3业栅、async調(diào)用
The call flow is shown in the figure.
The code is as follows.
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
// acquire a semaphore permit to keep the client from sending too fast; it is released after the callback has run
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
// once.release() releases the semaphore permit exactly once
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
// send succeeded: mark sendRequestOK = true and return; the callback is invoked later from NettyClientHandler
responseFuture.setSendRequestOK(true);
return;
}
// send failed: pass the failure on to the callback
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
Note: an async call simply passes in a callback, which saves the client's calling thread from blocking; once the server has processed the request, Netty's asynchronous machinery invokes the callback, so the call is fully asynchronous and throughput improves. Be aware that an async call may lose data (if the server crashes suddenly, the data is lost). To keep fast senders from overloading the server, RocketMQ guards async calls with a Semaphore on the client: a permit is acquired before the call and released after the callback has finished.
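SemaphoreReleaseOnlyOnce exists precisely to make that release idempotent: whichever path calls release() first (callback completion, send failure, or the cleanup scan) returns the permit, and later calls are no-ops. A simplified sketch of the idea:
// simplified sketch: release the wrapped semaphore at most once
public class SemaphoreReleaseOnlyOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;
    public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }
    public void release() {
        // compareAndSet guarantees only the first caller actually releases the permit
        if (this.semaphore != null && this.released.compareAndSet(false, true)) {
            this.semaphore.release();
        }
    }
}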
4.1.4、Oneway invocation
Note: with a oneway call the client does not care whether the server processed the request successfully, so this mode suits calls that are not important. Likewise, when the server hits an exception and sees the request is oneway, it does not send any result back to the client.
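The oneway path (invokeOnewayImpl in NettyRemotingAbstract) is therefore much simpler than sync and async: mark the command as oneway, throttle with a semaphore, write, and release the permit in the write listener. A trimmed sketch; the real method's timeout handling is simplified here:
// trimmed sketch of the oneway path
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingSendRequestException {
    request.markOnewayRPC(); // tell the server not to write a response back
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                once.release(); // give the permit back as soon as the write completes
                if (!f.isSuccess()) {
                    log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                }
            }
        });
    } else {
        // same flow-control style as async: fail fast when too many oneway calls are in flight
        throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
    }
}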
4.1.4潮尝、async調(diào)用定時清理
With async calls the server may crash, in which case the client will never receive a response. The ResponseFuture that was put into the map at call time would then sit in client memory forever, and, just as importantly, the semaphore permit acquired for the call would never be released, so eventually async calls could no longer be made at all. These stale entries therefore have to be cleaned up; the code is below.
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
// release the semaphore permit
rep.release();
// remove the entry from the map
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
// run the callback (asynchronously)
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
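scanResponseTable is not called inline; it runs periodically. In the RocketMQ source it is driven by a Timer started in NettyRemotingClient#start() (the server side does the same), roughly along these lines (the delays are how the 4.x source configures them, so treat them as approximate):
// roughly how the periodic cleanup is scheduled in NettyRemotingClient#start()
this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingClient.this.scanResponseTable(); // drop timed-out ResponseFutures and release their permits
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000); // first run after 3 seconds, then every second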
5、Client connection management
When the server starts, it sets the TCP option ChannelOption.SO_KEEPALIVE to false, i.e. it disables TCP keep-alive, because RocketMQ has its own heartbeat mechanism.
When the server initializes the pipeline, it adds an IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()). getServerChannelMaxIdleTimeSeconds() defaults to 120 seconds: if no read or write happens on a channel for 120 seconds, Netty creates an IdleStateEvent and passes it along the pipeline; NettyConnectManageHandler listens for this event and removes the channel. Let's look at the source.
IdleStateHandler is a Netty class. Internally it updates lastReadTime and lastWriteTime whenever a read or write occurs on the channel, and schedules a task that compares them with the current time to detect whether the 120 seconds configured via allIdleTimeNanos have been exceeded.
// when the IdleStateHandler is created: readerIdleTime = 0, writerIdleTime = 0, allIdleTime = 120
// this task fires once more than 120 seconds have passed without any read or write
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
long nextDelay = allIdleTimeNanos;
if (!reading) {
nextDelay -= System.nanoTime() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = ctx.executor().schedule(
this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
try {
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, firstAllIdleEvent);
if (firstAllIdleEvent) {
firstAllIdleEvent = false;
}
// fire the IdleStateEvent into the pipeline; NettyConnectManageHandler will handle it
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
}
}
// handling in NettyConnectManageHandler
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
// channel connect event
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
super.channelInactive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
// channel close event
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// handle the event fired by IdleStateHandler
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
// wrap it as a NettyEvent and put it on the queue; another thread takes events from the queue and handles them
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (NettyRemotingServer.this.channelEventListener != null) {
// exception event
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
RemotingUtil.closeChannel(ctx.channel());
}
}
// the thread that takes events off the queue
class NettyEventExecutor extends ServiceThread {
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000;
public void putNettyEvent(final NettyEvent event) {
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
} else {
log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
while (!this.isStopped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return NettyEventExecutor.class.getSimpleName();
}
}
Note: when a client connects, closes, or goes without read/write activity for the configured period, NettyConnectManageHandler catches the corresponding event and puts it on a blocking queue; a dedicated NettyEventExecutor thread takes the events off the queue and invokes the matching listener methods.
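The listener itself is just an implementation of the ChannelEventListener interface handed to the server (or client) constructor. A minimal, illustrative implementation that only logs might look like the sketch below; the broker's real listener (e.g. ClientHousekeepingService) additionally cleans up producer/consumer registrations for the closed channel:
// illustrative listener; real listeners do housekeeping such as unregistering producers/consumers
public class LoggingChannelEventListener implements ChannelEventListener {
    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
        log.info("channel connected: {}", remoteAddr);
    }
    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        log.info("channel closed: {}", remoteAddr);
    }
    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        log.warn("channel exception: {}", remoteAddr);
    }
    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        log.warn("channel idle too long, closed: {}", remoteAddr);
    }
}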
6鸟缕、總結(jié)
This post covered the thread model RocketMQ builds on Netty: the client sends data, the server receives it, looks up the processor registered for the request code, submits the work to that processor's thread pool, and writes the result back to the client when done. The client supports three invocation modes, and we also looked at how channels are managed. Netty's internals are fairly complex and deserve further study. Everything above comes from reading the RocketMQ RPC module source; if there are mistakes, corrections and discussion are welcome.