rocketmq rpc 通信(二)

??rocketMq底層使用netty作為網(wǎng)絡(luò)通信框架纵寝,在上一篇 rocketmq rpc 通信(一) 編解碼 介紹了rocketMq如何編解碼髓需,而本篇將重點(diǎn)介紹rocketMq netty底層的線程模型和通信方式。

1副砍、為什么使用netty

  • netty封裝了java底層的nio,而不是使用傳統(tǒng)的bio斯入,底層使用reactor線程模型胰默,使用幾個線程可以輕松處理上萬socket連接粱栖,效率更加高效
  • netty底層解決了jdk epoll 空輪訓(xùn)的bug,同時提供了許多處理網(wǎng)絡(luò)粘包拆包的工具類扩灯,開發(fā)更加高效
  • netty的api相對簡單媚赖,開發(fā)人員不用被nio的各種事件困擾,底層優(yōu)秀的pipeline機(jī)制更加利于各種handler的擴(kuò)展珠插,使開發(fā)人員更加注重業(yè)務(wù)邏輯的實(shí)現(xiàn)
  • netty是異步編程惧磺,可以處理更高的并發(fā),很多優(yōu)秀的中間件都是基于netty做網(wǎng)絡(luò)通信

2捻撑、rocketMq 使用netty的線程模型介紹

??rocketMq中一個有4類線程磨隘,如圖表所示

線程數(shù) name 說明
1 BossThread reactor主線程缤底,處理tcp連接建立
3 SelectorWorkerThread reactor worker線程,負(fù)責(zé)select網(wǎng)絡(luò)事件番捂,之后轉(zhuǎn)發(fā)到編解碼線程處理
8 CodecThread netty 的編解碼線程个唧,源碼中數(shù)量為8
N ProcessorThread 業(yè)務(wù)處理線程池,負(fù)責(zé)各種業(yè)務(wù)處理设预,如讀寫消息

說明:在rocketMq 對應(yīng)的netty 的worker線程徙歼,實(shí)際工作就是select socket 事件,之后轉(zhuǎn)發(fā)給code線程鳖枕,所以源碼中的worker線程只設(shè)置了3個線程鲁沥。上圖中的線程處理時間長短是逐漸增大的。各個線程之間的交互如圖所示

netty線程模型

還有一張圖更加清晰的解釋了 netty 的pipeline 和 server 端的如何做出處理的耕魄,此圖來自rocketMq社區(qū)
image.png

3画恰、server端流程

server端類繼承圖

3.1、server端啟動流程

??服務(wù)端啟動時吸奴,tcp參數(shù)說明

  1. ChannelOption.SO_BACKLOG對應(yīng)的是tcp/ip協(xié)議listen函數(shù)中的backlog參數(shù)允扇,函數(shù)listen(int socketfd,int backlog)用來初始化服務(wù)端可連接隊(duì)列, 服務(wù)端處理客戶端連接請求是順序處理的则奥,所以同一時間只能處理一個客戶端連接考润,多個客戶端來的時候,服務(wù)端將不能處理的客戶端連接請求放在隊(duì)列中等待處理读处,backlog參數(shù)指定了隊(duì)列的大小
  2. ChanneOption.SO_REUSEADDR對應(yīng)于套接字選項(xiàng)中的SO_REUSEADDR糊治,這個參數(shù)表示允許重復(fù)使用本地地址和端口,比如罚舱,某個服務(wù)器進(jìn)程占用了TCP的80端口進(jìn)行監(jiān)聽井辜,此時再次監(jiān)聽該端口就會返回錯誤,使用該參數(shù)就可以解決問題管闷,該參數(shù)允許共用該端口粥脚,這個在服務(wù)器程序中比較常使用
  3. Channeloption.SO_KEEPALIVE參數(shù)對應(yīng)于套接字選項(xiàng)中的SO_KEEPALIVE,該參數(shù)用于設(shè)置TCP連接包个,當(dāng)設(shè)置該選項(xiàng)以后刷允,連接會測試鏈接的狀態(tài),這個選項(xiàng)用于可能長時間沒有數(shù)據(jù)交流的連接碧囊。當(dāng)設(shè)置該選項(xiàng)以后树灶,如果在兩小時內(nèi)沒有數(shù)據(jù)的通信時,TCP會自動發(fā)送一個活動探測數(shù)據(jù)報(bào)文糯而。由于rocketMq有心跳機(jī)制天通,所以此參數(shù)設(shè)置為false。
  4. ChannelOption.TCP_NODELAY 該參數(shù)的作用就是禁止使用Nagle算法歧蒋,使用于小數(shù)據(jù)即時傳輸
    server端構(gòu)造方法土砂,這些都是netty的一下api州既,需要對netty相對熟悉
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
        // 是否使用epoll
        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        loadSslContext();
    }

服務(wù)端啟動方法

@Override
    public void start() {
        // 處理編解碼的線程池
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class :NioServerSocketChannel.class)// 是否使用epoll
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                                // 這些handler共用defaultEventExecutorGroup這個線程池
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                new NettyServerHandler()// 具體處理
                            );
                    }
                });
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

??說明:可以看到,在構(gòu)造函數(shù)中初始化boss和worker線程池時萝映,判斷是否可以使用epoll吴叶,下面大體上來說明下epoll比select好在哪里~
??select的最大缺陷是能打開fd的個數(shù)是有限的,linux上默認(rèn)為1024序臂,對socket的掃描是線性掃描蚌卤,就是采用輪訓(xùn)的方法效率低下,當(dāng)socket比較多時并且不管哪個socket是活躍的奥秆,都會遍歷一遍逊彭。select需要存儲一個維護(hù)fd的數(shù)據(jù)結(jié)構(gòu),在用戶態(tài)和內(nèi)核態(tài)切換時需要拷貝构订,效率也比較低下侮叮。
??epoll支持水平觸發(fā)和邊緣觸發(fā),最大的特點(diǎn)在于邊緣觸發(fā)悼瘾,它只告訴進(jìn)程哪些fd剛剛變?yōu)榫途w態(tài)囊榜,并且只會通知一次。還有一個特點(diǎn)是亥宿,epoll使用“事件”的就緒通知方式卸勺,通過epoll_ctl注冊fd,一旦該fd就緒烫扼,內(nèi)核就會采用類似callback的回調(diào)機(jī)制來激活該fd曙求,epoll_wait便可以收到通知。epoll沒有最大連接數(shù)限制存儲使用紅黑樹映企,沒有用戶態(tài)和內(nèi)核態(tài)之間的內(nèi)存拷貝悟狱,使用mmap做內(nèi)存映射,效率更加高效卑吭。

3.2芽淡、server端處理請求

??由最上面類繼承圖可知,NettyRemotingServer繼承了NettyRemotingAbstract豆赏,server只會處理request請求,處理完成后將response寫回到channel返給客戶端富稻。

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // 這個方法是NettyRemotingAbstract的方法
            processMessageReceived(ctx, msg);
        }
    }
// 看下server端如何處理請求,代碼在NettyRemotingAbstract
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // 服務(wù)端啟動時會注冊相應(yīng)的processor和處理processor對應(yīng)的線程池
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        // client生成的一個請求碼掷邦,server端原路返回
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 執(zhí)行before鉤子方法
                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        // 具體的processor處理
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        // 執(zhí)行after鉤子方法
                        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                // 將客戶端請求碼原路返回,之后client根據(jù)opaque從map中獲取相應(yīng)的ResponseFuture椭赋,讓client端的線程不在等待
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };
            // 執(zhí)行對應(yīng)的processor方法抚岗,看是否拒絕,這里是在code線程中執(zhí)行的哪怔,所以這個方法不能太耗時
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                // 提交到對應(yīng)的processor線程池中執(zhí)行
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }
                // 由于Oneway 類型的rpc 客戶端不關(guān)心結(jié)果宣蔚,所以不用返回客戶端信息
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            // 非法的code碼向抢,返回客戶端錯誤
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

??說明:服務(wù)端在接收到客戶端的請求后,由上面注冊的defaultEventExecutorGroup線程也就是 編解碼線程執(zhí)行胚委,通過RemotingCommand的code碼獲取相應(yīng)的pair key : processor,value : processor對應(yīng)的執(zhí)行線程池挟鸠,編解碼線程提交到processor對應(yīng)的執(zhí)行線程池中執(zhí)行,線程池執(zhí)行完成之后亩冬,將結(jié)果寫入pipeline中艘希,做后續(xù)處理。

4硅急、client端流程

client端的類繼承圖如圖所示


client端類繼承圖

4.1覆享、client端rpc通信方式

  1. sync,客戶端線程同步等待
  2. async营袜,客戶端線程不同步等待撒顿,通過netty異步編程的callback進(jìn)行通知
  3. oneway,客戶端只像server端請求荚板,不關(guān)心請求是否成功
4.1.1凤壁、client端handler處理

??由上面的類繼承圖,NettyRemotingClient也繼承了NettyRemotingAbstract啸驯,在NettyRemotingAbstract#processMessageReceived方法中聲明了RESPONSE_COMMAND客扎,就是客戶端收到server端的響應(yīng)具體處理的邏輯

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // client端調(diào)用這個方法
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                // async模式下,執(zhí)行回調(diào)方法
                executeInvokeCallback(responseFuture);
            } else {
                // sync模式下罚斗,喚醒client端發(fā)送線程徙鱼,發(fā)送線程返回RemotingCommand 對應(yīng)的response
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }
4.1.2、sync調(diào)用

調(diào)用流程圖如圖所示


同步調(diào)用流程

代碼分析如下

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            // 初始化時针姿,ResponseFuture 內(nèi)部成員變量 CountDownLatch countDownLatch = new CountDownLatch(1);
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            // 將客戶端請求碼和ResponseFuture put 到map中袱吆,在server端返回結(jié)果回調(diào)Listener時可以根據(jù)客戶端請求碼獲取ResponseFuture
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        // 成功,將future設(shè)置SendRequestOK為true距淫,直接返回
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    // 以下操作是失敗的情況绞绒,刪除客戶端請求碼對應(yīng)的future,喚醒client端發(fā)送線程
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    // 這個方法執(zhí)行了countDownLatch.countDown()榕暇,因此會喚醒client端發(fā)送線程蓬衡。
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // 客戶端發(fā)送線程同步等待,就是調(diào)用了ResponseFuture 中的 countDownLatch.await
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

??說明:在同步調(diào)用過程中彤枢,有兩個地方執(zhí)行了responseFuture中countDownLatch的countdown操作狰晚,一個是在發(fā)送數(shù)據(jù)注冊的Listener中,一個是在NettyClientHandler中缴啡,這兩個地方都會喚醒client端發(fā)送線程壁晒,Listener中對應(yīng)的是rpc通信失敗,NettyClientHandler對應(yīng)的是請求成功

4.1.3业栅、async調(diào)用

調(diào)用流程圖如圖所示


異步調(diào)用流程

代碼如下

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
       final InvokeCallback invokeCallback)
       throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
       long beginStartTime = System.currentTimeMillis();
       final int opaque = request.getOpaque();
       // 獲取一個信號量秒咐,防止發(fā)送太快谬晕,release的邏輯在執(zhí)行完callback方法后
       boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
       if (acquired) {
           // 在 once 的release 方法里,會將信號量release 減一
           final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
           long costTime = System.currentTimeMillis() - beginStartTime;
           if (timeoutMillis < costTime) {
               once.release();
               throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
           }

           final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
           this.responseTable.put(opaque, responseFuture);
           try {
               channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                   @Override
                   public void operationComplete(ChannelFuture f) throws Exception {
                       if (f.isSuccess()) {
                           // 這里是執(zhí)行成功了携取,設(shè)置為sendRequestOK為true攒钳,直接返回,回調(diào)方法在NettyClientHandler執(zhí)行了
                           responseFuture.setSendRequestOK(true);
                           return;
                       }
                       // 這里執(zhí)行失敗歹茶,將失敗結(jié)果傳遞給回調(diào)方法執(zhí)行
                       requestFail(opaque);
                       log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                   }
               });
           } catch (Exception e) {
               responseFuture.release();
               log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
               throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
           }
       } else {
           if (timeoutMillis <= 0) {
               throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
           } else {
               String info =
                   String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                       timeoutMillis,
                       this.semaphoreAsync.getQueueLength(),
                       this.semaphoreAsync.availablePermits()
                   );
               log.warn(info);
               throw new RemotingTimeoutException(info);
           }
       }
   }

??說明:異步調(diào)用其實(shí)就是傳入了一個callback夕玩,這樣做的好處是不用阻塞client端的調(diào)用線程,在server端處理成功后惊豺,利用netty的異步編程燎孟,調(diào)用回調(diào)函數(shù),完全實(shí)現(xiàn)了異步化尸昧,吞吐量有所上升揩页,注意:異步調(diào)用有可能會丟失數(shù)據(jù)(server端突然宕機(jī),數(shù)據(jù)會丟失)烹俗。rocketMq異步調(diào)用為了防止發(fā)送過快服務(wù)端壓力過大爆侣,在client端設(shè)置了信號量 Semaphore,調(diào)用時先獲取一個幢妄,在回調(diào)成功執(zhí)行完callback函數(shù)后會將信號量 release兔仰。

4.1.3、oneway調(diào)用

??說明:oneway調(diào)用客戶端不關(guān)心server端是否處理成功蕉鸳,這種模式適合哪種不重要的調(diào)用乎赴,同時server端發(fā)生異常時發(fā)現(xiàn)是oneway調(diào)用,也不會向client端發(fā)送結(jié)果數(shù)據(jù)

4.1.4潮尝、async調(diào)用定時清理

??異步調(diào)用時榕吼,有可能服務(wù)端宕機(jī),這時候客戶端會永遠(yuǎn)接收不到服務(wù)端的響應(yīng)數(shù)據(jù)勉失,因此async調(diào)用時put到map中的ResponseFuture就會一直在客戶端的內(nèi)存中羹蚣,還有重要的一點(diǎn)是async調(diào)用時獲取了一個信號量也得不到釋放,最終async有可能調(diào)用不了乱凿,這時候需要把這些數(shù)據(jù)清除掉顽素,代碼如下

public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                // 釋放信號量
                rep.release();
                // 從map中remove
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                // 異步執(zhí)行callback方法
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

5、客戶端連接管理

??在server端啟動時徒蟆,設(shè)置了一個TCP選項(xiàng) ChannelOption.SO_KEEPALIVE, false,表示禁用tcp的keepalive戈抄,由于rocketMq有心跳機(jī)制,所以取消了這個選項(xiàng)后专。
在server端初始化pipeline時,添加了一個 IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),nettyServerConfig.getServerChannelMaxIdleTimeSeconds()默認(rèn)是120秒输莺,表示120秒沒有發(fā)生讀寫請求戚哎,netty 會實(shí)例化一個 IdleStateEvent 沿著pipeline傳輸裸诽,NettyConnectManageHandler中監(jiān)聽了這個事件,會將這個channel remove掉型凳,具體看下源碼丈冬。
IdleStateHandler是netty里面的一個類,內(nèi)部邏輯就是在channel有讀或?qū)懯录r甘畅,更新一下lastReadTime和lastWriteTime埂蕊,然后起一個定時任務(wù)檢測和當(dāng)前時間比較是否超過了allIdleTimeNanos設(shè)置的120秒。

// 初始化IdleStateHandler時疏唾,readerIdleTime = 0蓄氧,writerIdleTime = 0,allIdleTime = 120
// 在超過120秒沒有讀寫請求后槐脏,這是
public void run() {
            if (!ctx.channel().isOpen()) {
                return;
            }

            long nextDelay = allIdleTimeNanos;
            if (!reading) {
                nextDelay -= System.nanoTime() - Math.max(lastReadTime, lastWriteTime);
            }
            if (nextDelay <= 0) {
                // Both reader and writer are idle - set a new timeout and
                // notify the callback.
                allIdleTimeout = ctx.executor().schedule(
                        this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, firstAllIdleEvent);
                    if (firstAllIdleEvent) {
                        firstAllIdleEvent = false;
                    }
                    // 發(fā)送IdleStateEvent事件到pipeline中喉童,NettyConnectManageHandler會處理這個事件
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Either read or write occurred before the timeout - set a new
                // timeout with shorter delay.
                allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
// NettyConnectManageHandler處理
class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
            super.channelRegistered(ctx);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
            super.channelUnregistered(ctx);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                // channel 連接事件
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                // channel關(guān)閉事件
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // 這里處理IdleStateHandler 中發(fā)出來的事件
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        // 封裝NettyEvent,放入隊(duì)列顿天,別的線程從隊(duì)列中取相應(yīng)的事件做處理
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }

            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

            if (NettyRemotingServer.this.channelEventListener != null) {
                // 發(fā)生異常事件
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }

            RemotingUtil.closeChannel(ctx.channel());
        }
    }
// 從隊(duì)列中取事件的線程
class NettyEventExecutor extends ServiceThread {
        private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
        private final int maxSize = 10000;

        public void putNettyEvent(final NettyEvent event) {
            if (this.eventQueue.size() <= maxSize) {
                this.eventQueue.add(event);
            } else {
                log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
            }
        }

        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();

            while (!this.isStopped()) {
                try {
                    NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (event != null && listener != null) {
                        switch (event.getType()) {
                            case IDLE:
                                listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
                                break;
                            case CLOSE:
                                listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
                                break;
                            case CONNECT:
                                listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
                                break;
                            case EXCEPTION:
                                listener.onChannelException(event.getRemoteAddr(), event.getChannel());
                                break;
                            default:
                                break;

                        }
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return NettyEventExecutor.class.getSimpleName();
        }
    }

??說明:當(dāng)有客戶端連接或關(guān)閉或超過一定時間沒有讀寫請求時堂氯,NettyConnectManageHandler會捕捉到相應(yīng)的事件,之后放到阻塞隊(duì)列中牌废,有一個單獨(dú)的NettyEventExecutor從隊(duì)列中獲取事件咽白,調(diào)用相應(yīng)的listener做處理。

6鸟缕、總結(jié)

??本文介紹了rocketMq 使用netty的線程模型晶框,client端發(fā)送數(shù)據(jù),server端接收數(shù)據(jù)并根據(jù)相應(yīng)的請求code碼找到相應(yīng)的processor提交到線程池中處理叁扫,處理完成后將結(jié)果寫會客戶端三妈。客戶端調(diào)用又分為3中模式莫绣,還介紹了channel的管理機(jī)制畴蒲,netty底層其實(shí)還是挺復(fù)雜的,還需要深入學(xué)習(xí)对室。以上就是通過看rocketMq rpc模塊相關(guān)代碼總結(jié)出來的模燥,如有錯誤,歡迎指出討論~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末掩宜,一起剝皮案震驚了整個濱河市蔫骂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌牺汤,老刑警劉巖辽旋,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡补胚,警方通過查閱死者的電腦和手機(jī)码耐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來溶其,“玉大人骚腥,你說我怎么就攤上這事∑刻樱” “怎么了束铭?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長厢绝。 經(jīng)常有香客問我契沫,道長,這世上最難降的妖魔是什么代芜? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任埠褪,我火速辦了婚禮,結(jié)果婚禮上挤庇,老公的妹妹穿的比我還像新娘钞速。我一直安慰自己,他們只是感情好嫡秕,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布渴语。 她就那樣靜靜地躺著,像睡著了一般昆咽。 火紅的嫁衣襯著肌膚如雪驾凶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天掷酗,我揣著相機(jī)與錄音调违,去河邊找鬼。 笑死泻轰,一個胖子當(dāng)著我的面吹牛技肩,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播浮声,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼虚婿,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了泳挥?” 一聲冷哼從身側(cè)響起然痊,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎屉符,沒想到半個月后剧浸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體锹引,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年辛蚊,在試婚紗的時候發(fā)現(xiàn)自己被綠了粤蝎。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡袋马,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出秸应,到底是詐尸還是另有隱情虑凛,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布软啼,位于F島的核電站桑谍,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏祸挪。R本人自食惡果不足惜锣披,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望贿条。 院中可真熱鬧雹仿,春花似錦、人聲如沸整以。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽公黑。三九已至邑商,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間凡蚜,已是汗流浹背人断。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留朝蜘,地道東北人恶迈。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像芹务,于是被迫代替她去往敵國和親蝉绷。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 文章摘要:如何設(shè)計(jì)RPC通信層模型是任何一款性能強(qiáng)勁的MQ所要重點(diǎn)考慮的問題在(一)篇中主要介紹了RocketMQ...
    癲狂俠閱讀 5,231評論 0 16
  • 文章摘要:借用小廝的一句話“消息隊(duì)列的本質(zhì)在于消息的發(fā)送枣抱、存儲和接收”熔吗。那么,對于一款消息隊(duì)列來說佳晶,如何做到消息的...
    癲狂俠閱讀 19,704評論 6 31
  • 發(fā)現(xiàn)自己近一年有些毛病桅狠,自己也算是研習(xí)了不少的源代碼,看了不少的技術(shù)書籍,但是自己就是記憶力不行中跌,總是過段時間就會...
    kevinfuture閱讀 9,498評論 0 21
  • 展會信息: 展會時間:2017.1.24-26 展會地址:美國紐約javits會展中心 主...
    jiexing1100閱讀 311評論 0 0
  • 我喜歡上一個姑娘 她的身上總是淡淡的梔子花香 每當(dāng)她經(jīng)過我的身旁 甜蜜就充斥了我的心房 ...
    牟大萌啊閱讀 337評論 1 0