Rocketmq源碼-remoting模塊詳解

Rocketmq 使用 Netty 實現(xiàn)了 remoting模塊(即 RPC 模塊)抒蚜。

一. RemotingClientRemotingServer 接口

1.1 RemotingService 接口

/**
 * 遠(yuǎn)程RPC調(diào)用服務(wù)接口
 */
public interface RemotingService {
    // 服務(wù)開啟
    void start();

    // 服務(wù)停止
    void shutdown();

    // 注冊RPC調(diào)用的鉤子對象RPCHook垮刹, 可以監(jiān)控RPC調(diào)用請求和響應(yīng)數(shù)據(jù)陕壹。
    void registerRPCHook(RPCHook rpcHook);
}

RemotingServiceRemotingClientRemotingServer 接口公共父接口苟鸯,表示遠(yuǎn)程RPC調(diào)用服務(wù)接口列林,提供了三個方法痰滋。

1.2 RemotingClient 接口

/**
 * 遠(yuǎn)程RPC調(diào)用服務(wù)客戶端
 */
public interface RemotingClient extends RemotingService {

    /**
     * 更新 NameServer 服務(wù)器的地址列表
     * @param addrs
     */
    void updateNameServerAddressList(final List<String> addrs);

    /**
     * 獲取 NameServer 服務(wù)器的地址列表
     * @return
     */
    List<String> getNameServerAddressList();

    /**
     * 向遠(yuǎn)程服務(wù)器 addr 地址發(fā)送數(shù)據(jù)予权,并同步阻塞等待響應(yīng)些楣。
     * 超過給的時間脂凶,沒有數(shù)據(jù)響應(yīng),就拋出異常愁茁。
     */
    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;

    /**
     * 向遠(yuǎn)程服務(wù)器 addr 地址發(fā)送數(shù)據(jù)蚕钦,立刻返回。
     * 當(dāng)遠(yuǎn)程服務(wù)器有響應(yīng)埋市,那么就回調(diào) InvokeCallback 的方法冠桃,傳遞響應(yīng)數(shù)據(jù);
     * 如果超過給的時間道宅,也會回調(diào) InvokeCallback 的方法食听,響應(yīng)失敗供置。
     */
    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;

    /**
     * 注冊遠(yuǎn)程請求命令處理程序伯复,
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    /**
     * 設(shè)置異步請求響應(yīng)回調(diào) InvokeCallback 的方法線程池執(zhí)行器
     */
    void setCallbackExecutor(final ExecutorService callbackExecutor);

    ExecutorService getCallbackExecutor();

    /**
     * 通道是否可寫
     */
    boolean isChannelWritable(final String addr);
}

RemotingClientRPC 服務(wù)的客戶端接口,有如下方法:

  1. invokeSync, invokeAsyncinvokeOneway : 都是向遠(yuǎn)程服務(wù)器 addr 地址發(fā)送數(shù)據(jù)闪萄。

    區(qū)別是 invokeSync 同步阻塞等待響應(yīng)泞当;invokeAsync異步發(fā)送迹蛤,在 InvokeCallback 回調(diào)方法中傳遞響應(yīng)結(jié)果;invokeOneway 只是發(fā)送數(shù)據(jù)襟士,不管響應(yīng)結(jié)果盗飒。
    注意如果 addr 值為 null,就表示向 NameServer 服務(wù)器地址發(fā)送數(shù)據(jù)陋桂。

  2. updateNameServerAddressListgetNameServerAddressList : 更新和獲取 NameServer 服務(wù)器的地址列表逆趣。
  3. registerProcessor : 注冊遠(yuǎn)程請求命令處理程序。

    注意客戶端不只是接收到服務(wù)端的響應(yīng)結(jié)果嗜历,也會接收到服務(wù)端的請求數(shù)據(jù)的宣渗,一般都是服務(wù)端主動通知客戶端的數(shù)據(jù)信息抖所;在 MQClientAPIImpl 類中調(diào)用了這個方法。

  4. setCallbackExecutorgetCallbackExecutor : 異步請求響應(yīng)回調(diào) InvokeCallback 方法的線程池執(zhí)行器痕囱。
  5. isChannelWritable : 通道是否可寫田轧。

1.3 RemotingServer 接口

/**
 * 遠(yuǎn)程RPC調(diào)用服務(wù)服務(wù)端
 */
public interface RemotingServer extends RemotingService {

    /**
     * 注冊特定請求(requestCode)的處理器和對應(yīng)線程池執(zhí)行器
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    /**
     * 注冊默認(rèn)請求的處理器和對應(yīng)線程池執(zhí)行器
     */
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    /**
     * 服務(wù)端監(jiān)聽的端口
     */
    int localListenPort();

    /**
     * 根據(jù)請求 requestCode,獲取對應(yīng)請求命令處理器和線程池
     */
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    /**
     * 向指定客戶端 `channel` 發(fā)送數(shù)據(jù)鞍恢,并同步阻塞等待響應(yīng)傻粘。
     * 超過給的時間,沒有數(shù)據(jù)響應(yīng)有序,就拋出異常抹腿。
     */
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    /**
     * 向指定客戶端 `channel` 發(fā)送數(shù)據(jù),立刻返回旭寿。
     * 當(dāng)遠(yuǎn)程服務(wù)器有響應(yīng)警绩,那么就回調(diào) InvokeCallback 的方法,傳遞響應(yīng)數(shù)據(jù)盅称;
     * 如果超過給的時間肩祥,也會回調(diào) InvokeCallback 的方法,響應(yīng)失敗缩膝。
     */
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    /**
     * 向指定客戶端 `channel` 發(fā)送數(shù)據(jù)混狠,不管響應(yīng)結(jié)果。
     */
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}

RemotingServerRPC 服務(wù)的服務(wù)端接口疾层,有如下方法:

  1. registerProcessor : 注冊特定請求(requestCode)的處理器和對應(yīng)線程池執(zhí)行器将饺。
  2. registerDefaultProcessor : 注冊默認(rèn)請求的處理器和對應(yīng)線程池執(zhí)行器。
  3. localListenPort : 獲取服務(wù)端監(jiān)聽的端口痛黎。
  4. getProcessorPair : 根據(jù)請求 requestCode予弧,獲取對應(yīng)請求命令處理器和線程池。
  5. invokeSync, invokeAsyncinvokeOneway : 向指定客戶端 channel 發(fā)送數(shù)據(jù)湖饱。

二. NettyRemotingAbstract

NettyRemotingAbstractRPC 服務(wù)基礎(chǔ)抽樣類掖蛤,客戶端和服務(wù)端實現(xiàn)類都繼承這個抽樣類。

2.1 重要成員變量

    /**
     * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
     *
     * 就是為了限制 invokeOneway(...) 方法的最大請求數(shù)量井厌,保護系統(tǒng)內(nèi)存占用蚓庭。
     */
    protected final Semaphore semaphoreOneway;

    /**
     * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
     *
     * 就是為了限制 invokeAsync(...) 方法的最大請求數(shù)量,保護系統(tǒng)內(nèi)存占用仅仆。
     */
    protected final Semaphore semaphoreAsync;

    /**
     * This map caches all on-going requests.
     *
     * 緩存所有正在進行的請求器赞。
     */
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

    /**
     * This container holds all processors per request code, aka, for each incoming request, we may look up the
     * responding processor in this map to handle the request.
     *
     * 儲存所有請求命令(requestCode)對應(yīng)的處理器 NettyRequestProcessor 和 線程池執(zhí)行器,處理請求命令返回響應(yīng)結(jié)果
     */
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

    /**
     * Executor to feed netty events to user defined {@link ChannelEventListener}.
     *
     * 將netty事件提供給用戶定義 ChannelEventListener 的后臺服務(wù)執(zhí)行器墓拜。
     */
    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();

    /**
     * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
     *
     * 默認(rèn)的所有請求命令對應(yīng)的處理器拳魁;
     * 當(dāng)請求命令沒有精確匹配到處理器時使用。
     */
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;

    /**
     * SSL context via which to create {@link SslHandler}.
     *
     * 用于處理 SSL
     */
    protected volatile SslContext sslContext;

    /**
     * custom rpc hooks
     *
     * 用戶自定義的 RPC 回調(diào)鉤子
     */
    protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();

  1. semaphoreOnewaysemaphoreAsync : 通過信號量 Semaphore 來限制 onewayasync 請求的數(shù)量撮弧,因為這兩種請求都是異步潘懊。

    同步請求不需要這個,因為同步請求本身就是阻塞的贿衍。

  2. responseTable : 記錄所有正在進行的請求授舟,包括同步請求和異步請求,但是沒有 oneway 類型請求贸辈。
  3. processorTable : 儲存指定請求命令(requestCode)對應(yīng)的處理器 NettyRequestProcessor 和 線程池執(zhí)行器释树。即通過 registerProcessor 方法注冊的。
  4. defaultRequestProcessor : 默認(rèn)的所有請求命令對應(yīng)的處理器擎淤,當(dāng) processorTable 中沒有找到對應(yīng)的處理器時奢啥,就會使用這個。

    注意這個值只在服務(wù)端 (NettyRemotingServer) 實現(xiàn)中賦值了嘴拢;在客戶端(NettyRemotingClient) 實現(xiàn)中這個值就是 null桩盲,也就是說客戶端只能處理指定 requestCode 請求命令。

  5. nettyEventExecutor : 將 Netty 事件提供給用戶定義ChannelEventListener 接口的后臺服務(wù)執(zhí)行器席吴。
  6. sslContext : 用于處理 SSL赌结。
  7. rpcHooks : 用戶自定義的 RPC 回調(diào)鉤子。

2.2 重要方法

2.2.1 processMessageReceived 方法

// 處理遠(yuǎn)程命令孝冒,包括請求命令和響應(yīng)命令
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    // 請求命令
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    // 響應(yīng)命令
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

這個方法一般都是在 Netty 接收到數(shù)據(jù)柬姚,轉(zhuǎn)成 RemotingCommand 對象,然后調(diào)用這個方法庄涡;分為請求命令和響應(yīng)命令量承。

2.2.2 processRequestCommand 方法

    /**
     * Process incoming request command issued by remote peer.
     *
     * 處理遠(yuǎn)程請求命令
     * @param ctx channel handler context.
     * @param cmd request command.
     */
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // 通過請求命令的 code 獲取對應(yīng)的處理器
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        // 如果沒有匹配到處理器,就用默認(rèn)的處理器
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        // 這次請求的 opaque穴店,用這個值來實現(xiàn)響應(yīng)和請求一一對應(yīng)
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 請求處理開始撕捍,回調(diào) RPC 鉤子方法
                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        final RemotingResponseCallback callback = new RemotingResponseCallback() {
                            @Override
                            public void callback(RemotingCommand response) {
                                // 請求處理結(jié)束,回調(diào) RPC 鉤子方法
                                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                                // 請求命令不是一次性命令 Oneway
                                if (!cmd.isOnewayRPC()) {
                                    if (response != null) {
                                        // 設(shè)置響應(yīng)的 opaque 就是請求的 opaque
                                        response.setOpaque(opaque);
                                        response.markResponseType();
                                        try {
                                            // 將響應(yīng)發(fā)送回去
                                            ctx.writeAndFlush(response);
                                        } catch (Throwable e) {
                                            log.error("process request over, but response failed", e);
                                            log.error(cmd.toString());
                                            log.error(response.toString());
                                        }
                                    } else {
                                    }
                                }
                            }
                        };
                        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                            processor.asyncProcessRequest(ctx, cmd, callback);
                        } else {
                            NettyRequestProcessor processor = pair.getObject1();
                            RemotingCommand response = processor.processRequest(ctx, cmd);
                            callback.callback(response);
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        // 不是 Oneway 類型迹鹅,就要有響應(yīng)卦洽,返回 SYSTEM_ERROR 響應(yīng)
                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            // 請求處理器 NettyRequestProcessor,是不是拒絕處理請求
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                // 設(shè)置響應(yīng)的 opaque 就是請求的 opaque
                response.setOpaque(opaque);
                // 將響應(yīng)發(fā)送回去
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                // 請求處理過程是在請求處理器對應(yīng)的線程執(zhí)行器ExecutorService 中運行
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                // RejectedExecutionException 表示線程池拒絕執(zhí)行任務(wù)斜棚。
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                // 異常處理的響應(yīng)
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            // 沒有對應(yīng)請求處理器的響應(yīng)
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            // 將響應(yīng)數(shù)據(jù) response 返回
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

處理遠(yuǎn)程請求阀蒂,這個方法雖然長,但是流程其實很簡單:

  1. 通過 processorTabledefaultRequestProcessor 來得到這個請求命令對應(yīng)的處理器NettyRequestProcessor 和 線程執(zhí)行器ExecutorService 弟蚀。
  2. 獲取請求的 opaque, 這個值很重要蚤霞,用這個值來實現(xiàn)響應(yīng)和請求一一對應(yīng)。
  3. 如果沒有對應(yīng)處理器NettyRequestProcessor义钉,那么就返回 codeRemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED 的響應(yīng)昧绣。
  4. 創(chuàng)建一個 run 來包裝實際處理請求的代碼,因為請求處理過程要放在指定線程執(zhí)行器ExecutorService 中執(zhí)行捶闸。

    通過 NettyRequestProcessorprocessRequest 方法或者 asyncProcessRequest 方法處理請求夜畴,獲取響應(yīng)結(jié)果 response, 最后通過 ctx.writeAndFlush(response) 方法拖刃,將響應(yīng)返回給請求端。

  5. 判斷請求處理器 NettyRequestProcessor贪绘,是否拒絕處理請求兑牡。
  6. ExecutorService 的線程池中執(zhí)行 run

2.2.3 processResponseCommand 方法

    /**
     * Process response from remote peer to the previous issued requests.
     *
     * 處理響應(yīng)
     *
     * @param ctx channel handler context.
     * @param cmd response command instance.
     */
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        // 通過響應(yīng)的 opaque税灌,來獲取對應(yīng)請求的 ResponseFuture
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            // 設(shè)置響應(yīng)結(jié)果
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                // 異步請求均函,設(shè)置響應(yīng)
                executeInvokeCallback(responseFuture);
            } else {
                // 同步請求,設(shè)置響應(yīng)
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            // 收到響應(yīng)菱涤,卻沒有對應(yīng)匹配的請求
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

處理響應(yīng)結(jié)果苞也,方法流程:

  1. 通過響應(yīng)的 opaque,從 responseTable 中獲取對應(yīng)請求的ResponseFuture粘秆。
  2. 設(shè)置響應(yīng)值如迟,并從 responseTable 中移除這個ResponseFuture
  3. 如果是異步請求翻擒,那么通過 executeInvokeCallback 方法氓涣,在回調(diào)線程池中響應(yīng)回調(diào)。
  4. 如果是同步請求陋气,通過 responseFuture.putResponse(cmd) 方法劳吠,喚醒正在阻塞等待的線程。

2.2.4 invokeSyncImpl 方法

   /**
     * 同步發(fā)送請求
     */
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        // 獲取這次請的 opaque
        final int opaque = request.getOpaque();

        try {
            // 構(gòu)建一個響應(yīng)ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            // 將這個請求存入 responseTable 中
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // 通過 channel.writeAndFlush(...) 方法將請求發(fā)送到遠(yuǎn)端
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // 這里的回調(diào)巩趁,說明請求已經(jīng)發(fā)送出去了

                    if (f.isSuccess()) {
                        // 設(shè)置請求發(fā)送成功痒玩,直接返回
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        // 設(shè)置請求發(fā)送失敗,也就是不需要等待響應(yīng)了议慰,
                        // 這次請求就直接失敗了
                        responseFuture.setSendRequestOK(false);
                    }

                    // 執(zhí)行到這里蠢古,表明請求發(fā)送失敗了。

                    // 從正在請求集合中移除這次請求
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    // 調(diào)用 putResponse 方法别凹,喚醒通過
                    // `responseFuture.waitResponse(timeoutMillis)` 等待請求返回的線程
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });


            // 同步發(fā)送請求草讶,這里需要等待遠(yuǎn)端的響應(yīng)。
            // 用到了countDownLatch 來阻塞當(dāng)前線程炉菲,
            // 等待響應(yīng)回來之后喚醒堕战,或者超時喚醒
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                // null == responseCommand,說明響應(yīng)沒有,需要拋出異常

                if (responseFuture.isSendRequestOK()) {
                    // 如果 isSendRequestOK 是true拍霜,說明請求發(fā)送出去了嘱丢,但是響應(yīng)沒有,響應(yīng)超時
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    // 如果 isSendRequestOK 是false祠饺,說明請求都沒有發(fā)送出去越驻,發(fā)送請求超時。
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

發(fā)送同步請求,方法流程:

  1. 創(chuàng)建 ResponseFuture 對象缀旁,并存入到 responseTable 中记劈。
  2. 通過 channel.writeAndFlush() 方法,將請求發(fā)送到遠(yuǎn)端诵棵。

    并添加 ChannelFutureListener 監(jiān)控抠蚣,如果請求發(fā)送成功,那么設(shè)置 responseFuturesendRequestOKtrue履澳;如果請求發(fā)送失敗,那么從 responseTable 移除 responseFuture 怀跛,并通過 putResponse 方法喚醒阻塞等待的線程距贷。

  3. 調(diào)用responseFuture.waitResponse(timeoutMillis) 方法,同步阻塞等待響應(yīng)吻谋。
  4. 如果 responseCommandnull忠蝗,那么就拋出異常。

2.2.5 invokeAsyncImpl 方法

    /**
     * 異步發(fā)送請求
     */
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        // 開始時間
        long beginStartTime = System.currentTimeMillis();
        // 獲取這次請的 opaque
        final int opaque = request.getOpaque();
        // 通過 semaphoreAsync 來限制異步請求最大數(shù)量
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            // 通過 SemaphoreReleaseOnlyOnce 保證異步請求漓拾,
            // 只會釋放一個 semaphoreAsync 的許可
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                // 已經(jīng)超時阁最,釋放許可,并拋出異常
                once.release();
                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
            }

            // 構(gòu)建一個響應(yīng)ResponseFuture骇两,
            // 注意異步請求就會有 invokeCallback 對象,
            // 也要將 SemaphoreReleaseOnlyOnce 對象傳遞進去速种,用于釋放semaphoreAsync 許可。
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            // 將這個請求存入 responseTable 中
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        // 這里的回調(diào)低千,說明請求已經(jīng)發(fā)送出去了

                        if (f.isSuccess()) {
                            // 設(shè)置請求發(fā)送成功配阵,直接返回
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        // 執(zhí)行到這里,表明請求發(fā)送失敗了示血。
                        requestFail(opaque);
                        // 這里沒有調(diào)用 responseFuture.setCause(f.cause());棋傍, ChannelFuture的異常丟失了
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                // 釋放semaphoreAsync 許可。
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            // 執(zhí)行到這里难审,表示沒有獲取到發(fā)送異步請求的許可瘫拣,直接拋出超時異常
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

發(fā)送異步請求,方法流程:

  1. 通過 semaphoreAsync 來限制異步請求最大數(shù)量告喊。
  2. 如果沒有獲取到許可麸拄,那么就拋出異常。
  3. 獲取到許可葱绒,先創(chuàng)建 SemaphoreReleaseOnlyOnce 對象感帅,保證只會釋放一次 semaphoreAsync 的許可。
  4. 創(chuàng)建 ResponseFuture 對象地淀,并存入到 responseTable 中失球。
  5. 通過 channel.writeAndFlush(request) 方法,將請求送到到遠(yuǎn)端。

    并添加 ChannelFutureListener 監(jiān)控实苞,如果請求發(fā)送成功豺撑,那么設(shè)置 responseFuturesendRequestOKtrue;如果請求發(fā)送失敗黔牵,那么調(diào)用 requestFail() 方法聪轿,進行失敗通知。

  6. invokeSyncImpl 方法不同猾浦,就是不會阻塞等待陆错,通過 processResponseCommand 方法,調(diào)用 executeInvokeCallback 方法金赦,通知異步請求的響應(yīng)結(jié)果音瓷。

2.2.6 invokeOnewayImpl 方法

    /**
     * 發(fā)送一次性請求
     */
    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        // 設(shè)置請求是一次性請求 RPC_ONEWAY
        request.markOnewayRPC();
        // 通過 semaphoreAsync 來限制一次性請求最大數(shù)量
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            // 通過SemaphoreReleaseOnlyOnce保證,只會釋放一個 semaphoreOneway 的許可
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        // 請求發(fā)送成功夹抗,就表示完成了绳慎,不需要等待響應(yīng)
                        once.release();
                        if (!f.isSuccess()) {
                            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            // 執(zhí)行到這里,表示沒有獲取到發(fā)送一次性請求的許可漠烧,直接拋出超時異常
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

發(fā)送一次性請求杏愤,與 invokeAsyncImpl 方法相比,就是不需要創(chuàng)建ResponseFuture 對象已脓,存入到 responseTable 中珊楼。

2.2.7 scanResponseTable 方法

    /**
     * <p>
     * This method is periodically invoked to scan and expire deprecated request.
     * </p>
     *
     * 掃描所有正在進行的請求,發(fā)現(xiàn)超時的請求摆舟,就移除它亥曹,并進行失敗通知
     */
    public void scanResponseTable() {
        // 記錄所有過期的請求
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();

        // 遍歷所有正在進行的請求
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();
            
            // 如果這個請求的時間已經(jīng)超過設(shè)置的超時時間TimeoutMillis,
            // 那么就要從 responseTable 中移除它恨诱,添加到 rfList 集合中媳瞪,進行失敗通知。
            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }
        // 遍歷超時的請求照宝,通知它們
        for (ResponseFuture rf : rfList) {
            try {
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

掃描所有正在進行的請求蛇受,發(fā)現(xiàn)超時的請求,就從 responseTable 中移除它厕鹃,并進行失敗通知兢仰。

注意這里雖然掃描所有正在進行請求(包括同步請求和異步請求) 的超時情況,但是只調(diào)用了 executeInvokeCallback() 方法剂碴,進行異步請求的通知把将;而沒有調(diào)用 responseFuture.putResponse() 方法,喚醒同步請求忆矛,因為同步請求 waitResponse() 方法察蹲,等超時了會自動喚醒请垛。

2.3 ResponseFuture

    // 請求編號
    private final int opaque;
    // 發(fā)送請求的通道 Channel
    private final Channel processChannel;
    // 請求超時時間
    private final long timeoutMillis;
    // 異步請求回調(diào)接口實例
    private final InvokeCallback invokeCallback;
    // 開始時間,用于判斷是否超時
    private final long beginTimestamp = System.currentTimeMillis();
    // 用于同步請求洽议,阻塞當(dāng)前線程
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    // 用于保證只釋放一次許可 Semaphore
    private final SemaphoreReleaseOnlyOnce once;

    // 保證異步回調(diào)只調(diào)用一次
    private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
    // 響應(yīng)結(jié)果對象
    private volatile RemotingCommand responseCommand;
    // 發(fā)送請求成功
    private volatile boolean sendRequestOK = true;
    // 失敗原因
    private volatile Throwable cause;

通過 countDownLatch 來實現(xiàn)同步阻塞宗收,通過 executeCallbackOnlyOnce 保證異步回調(diào)只調(diào)用一次。

注:這里 executeCallbackOnlyOnce 不應(yīng)該使用 AtomicBoolean 類型亚兄,因為 ResponseFuture 對象每次請求的時候都會創(chuàng)建混稽,使用AtomicBoolean 對象,非常占用內(nèi)存审胚,應(yīng)該使用 AtomicIntegerFieldUpdater + volatile int 的模式匈勋。

2.4 小結(jié)

  1. 通過 processMessageReceived() 方法,處理遠(yuǎn)程命令包括請求命令和響應(yīng)命令菲盾。

    • 通過 processRequestCommand() 方法處理請求命令颓影,根據(jù)請求 code 獲取對應(yīng)的請求處理器和線程池執(zhí)行器,在對應(yīng)線程池中處理請求命令懒鉴。
    • 通過 processResponseCommand() 方法處理響應(yīng),如果是同步請求碎浇,就是喚醒阻塞等待線程临谱,并獲取響應(yīng)結(jié)果;如果是異步線程奴璃, 就在異步線程池執(zhí)行器 getCallbackExecutor() 中悉默,將響應(yīng)結(jié)果回調(diào)。
  2. invokeSyncImpl() 方法苟穆,發(fā)送同步請求抄课。創(chuàng)建 ResponseFuture 對象放入 responseTable 集合中,通過channel.writeAndFlush(request) 方法發(fā)送請求數(shù)據(jù)雳旅,通過 responseFuture.waitResponse() 方法阻塞當(dāng)前線程跟磨,等待響應(yīng)結(jié)果。

  3. invokeAsyncImpl 方法攒盈,發(fā)送異步請求抵拘。通過 semaphoreAsync 限制異步請求并發(fā)數(shù),然后創(chuàng)建 ResponseFuture 對象放入 responseTable 集合中型豁,通過channel.writeAndFlush(request) 方法發(fā)送請求數(shù)據(jù)僵蛛。

  4. invokeOnewayImpl 方法,發(fā)送一次性請求迎变。通過 semaphoreOneway 限制異步請求并發(fā)數(shù)充尉,通過channel.writeAndFlush(request) 方法發(fā)送請求數(shù)據(jù)。

  5. scanResponseTable 方法衣形,定時巡查超時請求驼侠,并進行通知。

三. NettyRemotingClient

這個是RPC 服務(wù)的客戶端具體實現(xiàn)類。

3.1 重要成員屬性

    // Netty的配置項
    private final NettyClientConfig nettyClientConfig;
    // Netty客戶端引導(dǎo)類
    private final Bootstrap bootstrap = new Bootstrap();
    // 用來處理當(dāng)前客戶端所有 Socket連接的 IO事件
    private final EventLoopGroup eventLoopGroupWorker;
    // 控制 channelTables 并發(fā)修改的鎖
    private final Lock lockChannelTables = new ReentrantLock();
    // 緩存地址 addr 對應(yīng)的通道 channel泪电,這樣可以直接通過地址獲取 channel般妙,進行數(shù)據(jù)傳輸
    private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();

    // 用于檢查請求是否過期的定時器
    private final Timer timer = new Timer("ClientHouseKeepingService", true);

    // namesrv 的地址列表
    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
    // 當(dāng)前被選中 namesrv 地址
    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
    // 記錄當(dāng)前選中 namesrv 的索引值
    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
    // 用于并發(fā)修改 namesrvAddrList 和 namesrvAddrChoosed 值的鎖
    private final Lock lockNamesrvChannel = new ReentrantLock();


    // 公共線程池執(zhí)行器,
    // 如果調(diào)用 `registerProcessor(...)`方法注冊請求處理器NettyRequestProcessor時相速,沒有設(shè)置ExecutorService,那么就是publicExecutor碟渺;
    // 如果沒有設(shè)置異步請求響應(yīng)回調(diào)處理線程池 callbackExecutor,那么也直接使用這個公共線程池 publicExecutor突诬。
    private final ExecutorService publicExecutor;

    /**
     * 異步請求響應(yīng)回調(diào)處理線程池 callbackExecutor
     */
    private ExecutorService callbackExecutor;
    // Netty 事件的監(jiān)聽接口
    private final ChannelEventListener channelEventListener;
    // 用來處理 ChannelHandler 的方法苫拍,線程數(shù)是 NettyClientConfig 中的 clientWorkerThreads 值
    private DefaultEventExecutorGroup defaultEventExecutorGroup;
  1. nettyClientConfig : Netty 的一些配置項值。
  2. bootstrap : Netty 客戶端引導(dǎo)類旺隙。
  3. lockChannelTableschannelTables : 緩存地址 addr 對應(yīng)的通道 channel绒极。
  4. timer : 用于檢查請求是否過期的定時器。
  5. namesrvAddrList, namesrvAddrChoosed, namesrvIndexlockNamesrvChannel: 記錄 namesrv 地址列表蔬捷,和當(dāng)前選中的 namesrv 地址垄提。
  6. publicExecutor : 公共線程池執(zhí)行器。
  7. callbackExecutor : 異步請求響應(yīng)回調(diào)處理線程池執(zhí)行器周拐。
  8. channelEventListener : Netty 事件的監(jiān)聽接口铡俐。
  9. eventLoopGroupWorker : 用來處理當(dāng)前客戶端所有 Socket 連接的 IO 事件,只需要一個線程就可以了妥粟。
  10. defaultEventExecutorGroup : 用來處理 ChannelHandler 的方法审丘,線程數(shù)是 NettyClientConfig 中的 clientWorkerThreads 值。

3.2 重要方法

3.2.1 構(gòu)造方法

   public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, null);
    }

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
        this.nettyClientConfig = nettyClientConfig;
        this.channelEventListener = channelEventListener;

        // 獲取公共線程池的線程數(shù)
        int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        // 創(chuàng)建公共線程池 publicExecutor勾给,線程名都是 NettyClientPublicExecutor_ 開頭
        // 如果調(diào)用 `registerProcessor(...)`方法注冊 請求處理器NettyRequestProcessor滩报,
        // 沒有設(shè)置線程池,那么就用這個公共線程池播急,也就是處理請求的操作就在 publicExecutor 線程池中執(zhí)行脓钾。

        // 如果沒有設(shè)置異步請求響應(yīng)回調(diào)處理線程池 callbackExecutor,那么也直接使用這個公共線程池 publicExecutor
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        // 用來處理當(dāng)前客戶端所有 Socket連接的 IO事件旅择,就使用一個線程處理
        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
            }
        });

        // 是否要使用 SSL
        if (nettyClientConfig.isUseTLS()) {
            try {
                sslContext = TlsHelper.buildSslContext(true);
                log.info("SSL enabled for client");
            } catch (IOException e) {
                log.error("Failed to create SSLContext", e);
            } catch (CertificateException e) {
                log.error("Failed to create SSLContext", e);
                throw new RuntimeException("Failed to create SSLContext", e);
            }
        }
    }

就是創(chuàng)建 publicExecutoreventLoopGroupWorker 對象惭笑,如果支持 SSL, 那么再創(chuàng)建 sslContext 對象。

3.2.2 start 方法

    @Override
    public void start() {
        // 這個線程池 defaultEventExecutorGroup 是用來處理 ChannelHandler 的方法
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        // 通過 eventLoopGroupWorker 線程來接收 IO 事件生真,
        // 然后交給 defaultEventExecutorGroup 線程沉噩,來進行事件處理,
        // 這樣不會阻塞 處理 IO 事件線程柱蟀。
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            // Socket 的 keepalive 是false
            .option(ChannelOption.SO_KEEPALIVE, false)
            // Socket 連接建立的超時時間
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            // Socket 發(fā)送緩存區(qū)大小
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            // Socket 接收緩存區(qū)大小
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        // 是否需要使用 SSL 加密
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    // 使用 defaultEventExecutorGroup 里面的線程
                    // 來處理接收到Socket IO事件數(shù)據(jù)的解析和處理
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        // 將命令RemotingCommand 對象轉(zhuǎn)成緩存區(qū)對象ByteBuf川蒙,以便發(fā)送到遠(yuǎn)端
                        new NettyEncoder(),
                        // 將接收到數(shù)據(jù)對象ByteBuf 解析成命令RemotingCommand 對象
                        new NettyDecoder(),
                        // 進行心跳處理,當(dāng)當(dāng)前通道Channel 超過一定時間沒有發(fā)送或者讀取到數(shù)據(jù),就當(dāng)失效處理
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        // 主要是做監(jiān)控用的,用來發(fā)送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件的
                        new NettyConnectManageHandler(),
                        // 這個就是用來處理解析后得到的 遠(yuǎn)程命令RemotingCommand瘟判,
                        // 其實就是調(diào)用了 processMessageReceived(...) 方法
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    // 每隔三秒掃描有沒有過期請求
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

這個方法很長乳讥,但是流程其實很簡單:

  1. 創(chuàng)建 defaultEventExecutorGroup 線程池執(zhí)行器,用于執(zhí)行 ChannelHandler 方法尚辑。
  2. 初始化 bootstrap 對象贱勃,使用 eventLoopGroupWorker 處理通道的 IO 事件刚操,使用 defaultEventExecutorGroup 執(zhí)行添加到 ChannelPipeline 的處理器ChannelHandler的方法恬汁。
  3. 定時器 timer 每隔三秒掃描有沒有過期請求伶椿。
  4. 如果 channelEventListener 不為 null, 那么開啟 nettyEventExecutor 線程, 將 Netty 事件提供給用戶定義 ChannelEventListener氓侧。

3.2.3 closeChannel 方法

  public void closeChannel(final String addr, final Channel channel) {
        if (null == channel)
            return;

        // 從通道channel 獲取對應(yīng)的遠(yuǎn)端地址
        final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;

        try {
            // 加鎖脊另,因為要改變 channelTables 集合數(shù)據(jù)
            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    boolean removeItemFromTable = true;
                    // 得到channelTables 中的通道 ChannelWrapper
                    final ChannelWrapper prevCW = this.channelTables.get(addrRemote);

                    log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);

                    // 如果 prevCW 沒有,或者和關(guān)閉的通道 channel 不是同一個约巷,都不用移除
                    if (null == prevCW) {
                        log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
                        removeItemFromTable = false;
                    } else if (prevCW.getChannel() != channel) {
                        log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
                            addrRemote);
                        removeItemFromTable = false;
                    }

                    if (removeItemFromTable) {
                        this.channelTables.remove(addrRemote);
                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
                    }

                    // 關(guān)閉通道
                    RemotingUtil.closeChannel(channel);
                } catch (Exception e) {
                    log.error("closeChannel: close the channel exception", e);
                } finally {
                    this.lockChannelTables.unlock();
                }
            } else {
                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.error("closeChannel exception", e);
        }
    }

將這個 channelchannelTables 中移除偎痛,并通過 lockChannelTables 鎖進行并發(fā)修改控制。

3.2.4 getAndCreateChannel 方法

 /**
     * 根據(jù)遠(yuǎn)程地址 addr 獲取通道Channel独郎,和遠(yuǎn)端進行交互
     */
    private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
        if (null == addr) {
            // 如果地址 addr 為null踩麦,那么就從 namesrvAddrList 列表中選擇一個
            return getAndCreateNameserverChannel();
        }

        // 記錄了每個地址對應(yīng)的通道
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            // 如果這個通道還能用,那么直接返回
            return cw.getChannel();
        }

        // 根據(jù)地址addr 創(chuàng)建通道 Channel
        return this.createChannel(addr);
    }
  1. 如果 addr == null氓癌,那么就是 namesrv 地址靖榕,通過 getAndCreateNameserverChannel() 方法,獲取對應(yīng)的通道 Channel 顽铸。
  2. 根據(jù)地址 addrchannelTables 中獲取對應(yīng)的 Channel
  3. 如果沒有可用通道 Channel料皇,通過 createChannel()方法谓松,創(chuàng)建這個地址 addr 的通道。

3.2.5 createChannel 方法

    private Channel createChannel(final String addr) throws InterruptedException {
        ChannelWrapper cw = this.channelTables.get(addr);
        // 如果 channelTables 中有地址addr 對應(yīng)的通道践剂,并且還是能用的直接返回
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }

        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                boolean createNewConnection;
                cw = this.channelTables.get(addr);
                if (cw != null) {
                    // 如果 channelTables 中有對應(yīng)的通道
                    if (cw.isOK()) {
                        // 通道還能用鬼譬,直接返回
                        return cw.getChannel();
                    } else if (!cw.getChannelFuture().isDone()) {
                        // 通道還沒有創(chuàng)建完成,那么也不需要再創(chuàng)建了逊脯,等待它創(chuàng)建完成
                        createNewConnection = false;
                    } else {
                        // 說明通道壞了优质,移除它,設(shè)置createNewConnection為 true军洼,重新創(chuàng)建
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    }
                } else {
                    createNewConnection = true;
                }

                if (createNewConnection) {
                    // 調(diào)用 bootstrap.connect(...) 方法創(chuàng)建通道
                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                    log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                    cw = new ChannelWrapper(channelFuture);
                    // 存入 channelTables 中巩螃。
                    this.channelTables.put(addr, cw);
                }
            } catch (Exception e) {
                log.error("createChannel: create channel exception", e);
            } finally {
                this.lockChannelTables.unlock();
            }
        } else {
            log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        if (cw != null) {
            ChannelFuture channelFuture = cw.getChannelFuture();
            // 等待通道創(chuàng)建完成
            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
                if (cw.isOK()) {
                    log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                    return cw.getChannel();
                } else {
                    log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
                }
            } else {
                log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
                    channelFuture.toString());
            }
        }

        return null;
    }

通過 this.bootstrap.connect() 方法創(chuàng)建通道,但是要保證不能多次創(chuàng)建匕争,所以通過 lockChannelTables 進行并發(fā)控制避乏。

3.2.6 invokeSync 方法

    /**
     * 向遠(yuǎn)程服務(wù)器 addr 地址發(fā)送數(shù)據(jù),并同步阻塞等待響應(yīng)甘桑。
     */
    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        // 根據(jù)遠(yuǎn)程地址 addr 獲取通道Channel拍皮,和遠(yuǎn)端進行交互
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            // 通道還是活躍的歹叮,能進行數(shù)據(jù)交互
            try {
                // 執(zhí)行鉤子
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    // 超時就拋出異常
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                // 調(diào)用 NettyRemotingAbstract 的invokeSyncImpl 方法,發(fā)送請求
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                // 執(zhí)行鉤子
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                // 發(fā)生異常铆帽,關(guān)閉通道
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            // 如果通道不活躍了咆耿,就關(guān)閉通道
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
  1. 通過 getAndCreateChannel() 方法,獲取地址 addr 對應(yīng)的通道 channel爹橱。
  2. 如果通道不活躍了萨螺,就關(guān)閉通道,拋出異常宅荤。
  3. 通道可用屑迂,就調(diào)用父類 NettyRemotingAbstractinvokeSyncImpl() 方法。

3.2.7 invokeAsync 方法

    @Override
    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        // 根據(jù)遠(yuǎn)程地址 addr 獲取通道Channel冯键,和遠(yuǎn)端進行交互
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            // 通道還是活躍的惹盼,能進行數(shù)據(jù)交互
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    // 超時拋出異常
                    throw new RemotingTooMuchRequestException("invokeAsync call timeout");
                }
                // 調(diào)用 NettyRemotingAbstract 的 invokeAsyncImpl 方法,發(fā)送異步請求
                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            // 如果獲取的通道 Channel 不能用了惫确,那么從 channelTables 中移除手报,并拋出異常。
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

invokeSync 方法流程類似改化。

3.3 相關(guān) NettyChannelHandler

 .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        // 是否需要使用 SSL 加密
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    // 使用 defaultEventExecutorGroup 里面的線程
                    // 來處理接收到Socket IO事件數(shù)據(jù)的解析和處理
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        // 將命令RemotingCommand 對象轉(zhuǎn)成緩存區(qū)對象ByteBuf掩蛤,以便發(fā)送到遠(yuǎn)端
                        new NettyEncoder(),
                        // 將接收到數(shù)據(jù)對象ByteBuf 解析成命令RemotingCommand 對象
                        new NettyDecoder(),
                        // 進行心跳處理,當(dāng)當(dāng)前通道Channel 超過一定時間沒有發(fā)送或者讀取到數(shù)據(jù)陈肛,就當(dāng)失效處理
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        // 主要是做監(jiān)控用的揍鸟,用來發(fā)送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件的
                        new NettyConnectManageHandler(),
                        // 這個就是用來處理解析后得到的 遠(yuǎn)程命令RemotingCommand,
                        // 其實就是調(diào)用了 processMessageReceived(...) 方法
                        new NettyClientHandler());
                }
  1. SslHandler : 處理 SSL 加密句旱,通過sslContext.newHandler(ch.alloc()) 創(chuàng)建阳藻。
  2. NettyEncoder : 將命令 RemotingCommand 對象轉(zhuǎn)成緩存區(qū)對象ByteBuf,以便發(fā)送到遠(yuǎn)端谈撒。
  3. NettyDecoder : 將接收到數(shù)據(jù)對象 ByteBuf 解析成命令RemotingCommand 對象腥泥。
  4. IdleStateHandler : 進行心跳處理,當(dāng)通道 Channel 超過一定時間沒有發(fā)送或者讀取到數(shù)據(jù)啃匿,就會發(fā)送事件進行提醒蛔外。
  5. NettyConnectManageHandler : 主要是做監(jiān)控用的,用來發(fā)送 NettyCONNECT, CLOSE, IDLE, EXCEPTION 事件溯乒。
  6. NettyClientHandler : 調(diào)用父類 NettyRemotingAbstractprocessMessageReceived 方法夹厌,處理遠(yuǎn)程命令。

3.3.1 NettyEncoder

/**
 * 將 RemotingCommand 轉(zhuǎn)成緩存區(qū) ByteBuffer 對象橙数,
 */
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            // 得到遠(yuǎn)程命令頭信息數(shù)據(jù)
            ByteBuffer header = remotingCommand.encodeHeader();
            // 先寫頭數(shù)據(jù)
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                // 再寫內(nèi)容體數(shù)據(jù)
                out.writeBytes(body);
            }
        } catch (Exception e) {
            // 發(fā)生錯誤尊流,那么就關(guān)閉通道 Channel。
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

通過 encodeHeader() 方法獲取頭信息header, 寫入頭信息灯帮,然后再寫入體信息 body崖技。

  // 在 RemotingCommand 類中 encodeHeader 方法逻住。

    public ByteBuffer encodeHeader() {
        return encodeHeader(this.body != null ? this.body.length : 0);
    }

    public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size
        // 1. 整個遠(yuǎn)程命令 RemotingCommand 的總長度
        int length = 4;

        // 2> header data length
        // 2. 得到頭數(shù)據(jù)
        byte[] headerData;
        headerData = this.headerEncode();

        // 增加頭數(shù)據(jù)長度
        length += headerData.length;

        // 3> body data length
        // 3. 增加內(nèi)容體數(shù)據(jù)長度
        length += bodyLength;

        // 內(nèi)容體先不用添加,
        // 那么 ByteBuffer 大小就是 4(總長度) + 4(頭長度) + 數(shù)據(jù)頭內(nèi)容
        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

        // 先將總長度存入
        result.putInt(length);

        // header length
        // 頭數(shù)據(jù)長度 headerData.length 存入迎献,要進行處理
        // 第一個字節(jié)儲存類型瞎访,后三個字節(jié)儲存頭長度 headerData.length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        result.flip();

        return result;
    }

請求命令RemotingCommand 轉(zhuǎn)成ByteBuffer 的數(shù)據(jù)格式:

4個字節(jié)(總長度) + 4個字節(jié)(數(shù)據(jù)頭長度) + 數(shù)據(jù)頭(header)字節(jié)內(nèi)容 + 數(shù)據(jù)體(body)字節(jié)內(nèi)容

其中4個字節(jié)數(shù)據(jù)頭長度中,第一個字節(jié)儲存類型吁恍,后三個字節(jié)儲存頭長度扒秸,也就是說 數(shù)據(jù)頭長度不會超過三個字節(jié)大小。

3.3.2 NettyDecoder

/**
 * 將接收到數(shù)據(jù)對象 `ByteBuf` 解析成命令 `RemotingCommand` 對象
 */
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        /**
         * 0->4 用4個字節(jié)表示整個內(nèi)容幀的總長度
         * initialBytesToStrip == 4冀瓦,表示最后得到的數(shù)據(jù)伴奥,是跳過這個總長度字段。
         */
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            // 得到完整的遠(yuǎn)程命令數(shù)據(jù)對應(yīng)的緩存區(qū)ByteBuf
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            // 從緩存區(qū)ByteBuf 中解析出一個遠(yuǎn)程命令RemotingCommand對象
            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}

繼承自 LengthFieldBasedFrameDecoder 類翼闽,得到完整的遠(yuǎn)程命令數(shù)據(jù)對應(yīng)的緩存區(qū) ByteBuf拾徙,再通過RemotingCommand.decode(byteBuffer) 從緩存區(qū) ByteBuf 中解析出一個遠(yuǎn)程命令RemotingCommand對象。

關(guān)于 LengthFieldBasedFrameDecoder 用法感局,請看Netty源碼_編解碼器尼啡。

    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        // length 去掉了表示總長度的4個字節(jié),也就是說只包括 4(頭長度) + 頭內(nèi)容 + 體內(nèi)容
        int length = byteBuffer.limit();
        // 一個四個字節(jié)询微,最高一個字節(jié)記錄類型崖瞭,即 JSON 或者 ROCKETMQ
        // 剩下三個字節(jié)才代表頭數(shù)據(jù)長度,即 oriHeaderLen & 0xFFFFFF
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        // 將頭數(shù)據(jù)存入 字節(jié)數(shù)組headerData 中
        byteBuffer.get(headerData);

        // 解析頭內(nèi)容
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        // 得到數(shù)據(jù)體的字節(jié)長度
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

根據(jù)數(shù)據(jù)頭長度和數(shù)據(jù)體長度撑毛,從緩存區(qū)中解析出书聚,數(shù)據(jù)頭和數(shù)據(jù)頭的內(nèi)容。

3.3.3 NettyConnectManageHandler

class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
            final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
            final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
            log.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);

            super.connect(ctx, remoteAddress, localAddress, promise);

            // 發(fā)送連接 CONNECT 事件通知
            if (NettyRemotingClient.this.channelEventListener != null) {
                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
            }
        }

        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
            // 關(guān)閉通道
            closeChannel(ctx.channel());
            super.disconnect(ctx, promise);

            // 發(fā)送關(guān)閉 CLOSE 事件通知
            if (NettyRemotingClient.this.channelEventListener != null) {
                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
            // 關(guān)閉通道
            closeChannel(ctx.channel());
            super.close(ctx, promise);
            NettyRemotingClient.this.failFast(ctx.channel());
            // 發(fā)送關(guān)閉 CLOSE 事件通知
            if (NettyRemotingClient.this.channelEventListener != null) {
                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // 接收到通道長時間空閑事件藻雌,即心跳檢測
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
                    // 關(guān)閉通道
                    closeChannel(ctx.channel());
                    if (NettyRemotingClient.this.channelEventListener != null) {
                        // 發(fā)送空閑 IDLE 事件通知
                        NettyRemotingClient.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }

            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
            // 關(guān)閉通道
            closeChannel(ctx.channel());
            // 發(fā)送異常 EXCEPTION 事件通知
            if (NettyRemotingClient.this.channelEventListener != null) {
                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }
        }
    }

主要是做監(jiān)控用的寺惫,用來發(fā)送 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件。

3.3.4 NettyClientHandler

    class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // 處理遠(yuǎn)程命令
            processMessageReceived(ctx, msg);
        }
    }

3.4 小結(jié)

NettyRemotingClient 主要功能:

  1. 通過 bootstrap 創(chuàng)建連接通道 channel蹦疑,并使用 channelTables 緩存地址addr 和 通道 channel 對應(yīng)關(guān)系,不需要每次都創(chuàng)建通道萨驶。
  2. 記錄 namesrv 的地址列表歉摧,當(dāng)發(fā)送請求時,沒有寫地址 addr 腔呜,那么就向 namesrv 的地址發(fā)送請求叁温。
  3. 通過 invokeSync,invokeAsyncinvokeOneway 發(fā)送請求,其實就是先根據(jù)地址 addr 獲取可使用的通道 channel核畴,調(diào)用父類對應(yīng)方法發(fā)送請求數(shù)據(jù)膝但。

四. NettyRemotingServer

這個是 RPC 服務(wù)的服務(wù)端具體實現(xiàn)類。

4.1 重要的成員屬性

    // Netty 服務(wù)端引導(dǎo)類
    private final ServerBootstrap serverBootstrap;
    // 處理連接上服務(wù)端的所有 Socket 的IO 事件
    private final EventLoopGroup eventLoopGroupSelector;
    // 處理服務(wù)端接收客戶端連接的線程池
    private final EventLoopGroup eventLoopGroupBoss;
    // Netty的配置項
    private final NettyServerConfig nettyServerConfig;

    // 公共線程池
    private final ExecutorService publicExecutor;
    // Netty 事件的監(jiān)聽接口
    private final ChannelEventListener channelEventListener;

4.2 重要方法

4.2.1 構(gòu)造方法

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
        this(nettyServerConfig, null);
    }

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }

        loadSslContext();
    }

創(chuàng)建公共線程池 publicExecutor谤草,根據(jù)是否使用 useEpoll()跟束,創(chuàng)建不同的 eventLoopGroupBosseventLoopGroupSelector 實現(xiàn)莺奸。

4.2.2 start 方法

   public void start() {
        // 這個線程池 defaultEventExecutorGroup 是用來處理 ChannelHandler 的方法
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        // 創(chuàng)建共享的 ChannelHandler
        prepareSharableHandlers();

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 服務(wù)端綁定監(jiān)控端口
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    // 每隔三秒掃描有沒有過期請求
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
  1. 創(chuàng)建 defaultEventExecutorGroup 線程池。
  2. 創(chuàng)建共享的 ChannelHandler 實例冀宴。
  3. 初始化 serverBootstrap 服務(wù)端灭贷。

    添加的 ChannelHandlerNettyRemotingClient 中的類似,這里就不再展開分析了略贮。

  4. 服務(wù)端 serverBootstrap 綁定監(jiān)控端口甚疟。
  5. 定時器 timer 每隔三秒掃描有沒有過期請求。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末逃延,一起剝皮案震驚了整個濱河市览妖,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌揽祥,老刑警劉巖讽膏,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異盔然,居然都是意外死亡桅打,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門愈案,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挺尾,“玉大人,你說我怎么就攤上這事站绪≡馄蹋” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵恢准,是天一觀的道長魂挂。 經(jīng)常有香客問我,道長馁筐,這世上最難降的妖魔是什么涂召? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮敏沉,結(jié)果婚禮上果正,老公的妹妹穿的比我還像新娘。我一直安慰自己盟迟,他們只是感情好秋泳,可當(dāng)我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著攒菠,像睡著了一般迫皱。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辖众,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天卓起,我揣著相機與錄音和敬,去河邊找鬼。 笑死既绩,一個胖子當(dāng)著我的面吹牛概龄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播饲握,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼私杜,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了救欧?” 一聲冷哼從身側(cè)響起衰粹,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎笆怠,沒想到半個月后铝耻,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡蹬刷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年瓢捉,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片办成。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡泡态,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出迂卢,到底是詐尸還是另有隱情某弦,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布而克,位于F島的核電站靶壮,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏员萍。R本人自食惡果不足惜腾降,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望碎绎。 院中可真熱鬧蜂莉,春花似錦、人聲如沸混卵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽幕随。三九已至,卻和暖如春宿接,著一層夾襖步出監(jiān)牢的瞬間赘淮,已是汗流浹背辕录。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留梢卸,地道東北人走诞。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像蛤高,于是被迫代替她去往敵國和親蚣旱。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 簡介 在使用了消息隊列的通信方之間, 總體的通信架構(gòu)圖如下: 在消息生產(chǎn)者, broker和消息消費者之間都會發(fā)生...
    _呆瓜_閱讀 7,188評論 5 19
  • rocketmq網(wǎng)絡(luò)部分的整體的架構(gòu) remoting 模塊是 mq 的基礎(chǔ)通信模塊戴陡,理解通信層的原理對理解模塊間...
    millions_chan閱讀 3,499評論 2 5
  • 文章摘要:借用小廝的一句話“消息隊列的本質(zhì)在于消息的發(fā)送塞绿、存儲和接收”。那么恤批,對于一款消息隊列來說异吻,如何做到消息的...
    癲狂俠閱讀 19,704評論 6 31
  • RocketMQ使用Netty進行底層通信,包括NameServer喜庞、Broker(Master/Slave)诀浪、P...
    shallowinggg閱讀 1,119評論 0 0
  • 由于rocketMQ采用netty通信組件進行服務(wù)互通,所以需要定義好協(xié)議的格式延都。有些協(xié)議采用google的pro...
    挺ASir閱讀 1,595評論 0 0