RocketMQ源碼閱讀(二)-通信模塊

簡介

在使用了消息隊列的通信方之間, 總體的通信架構圖如下:

在消息生產者, broker和消息消費者之間都會發(fā)生通信, RocketMQ的通信層是基于通信框架netty之上做了簡單的協(xié)議封裝. 本人閱讀的RocketMQ版本是4.1.0-incubating-SNAPSHOT, 依賴的netty版本是4.0.36.Final. RocketMQ的代碼結構圖如下:

大體上分為broker, client, common filtersrv, namesrv和remoting等模塊, 通信框架就封裝在remoting模塊中.

本文從協(xié)議格式, 消息編解碼, 通信方式(同步, 異步, 單向)和通信流程(詳細介紹同步調用流程)等方面介紹RocketMQ的通信模塊.

設計要素

對于一個消息隊列的RPC網(wǎng)絡通信來說尚胞,要求并不像服務框架那樣苛刻, 滿足一下幾點即可:

  • 編解碼處理(負責通信中的編碼和解碼, 序列化, 通信協(xié)議設計等必要功能)
  • 雙向消息處理(包括同步或異步, MQ中有異步消息的功能)
  • 單向消息處理(一般指心跳消息或者注冊消息這樣的類型)

類圖

類層次結構

以RemotingService為最上層接口,提供了三個接口:

void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);

RemotingClient和RemotingServer都繼承了RemotingService接口, 并增加了自己特有的接口.
RemotingClient:

void updateNameServerAddressList(final List<String> addrs);

List<String> getNameServerAddressList();

RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;

void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;

void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

boolean isChannelWriteable(final String addr);

NettyRemotingClient和NettyRemotingServer分別實現(xiàn)了RemotingClient和RemotingServer, 并且都繼承了NettyRemotingAbstract類. NettyRemotingAbstract這個抽象類包含了很多公共數(shù)據(jù)處理,也包含了很多重要的數(shù)據(jù)結構, 這個稍后介紹.
其它還有NettyEvent, NettyEncoder, NettyDecoder和RemotingCommand等一系列通信過程中使用到的類.

協(xié)議設計與編碼解碼

在分析具體的api接口之前, 先介紹一下RocketMQ的通信協(xié)議是如何設計的.
具體的通信協(xié)議格式如下(重點理解, 能根據(jù)通信協(xié)議格式來對網(wǎng)絡中讀取的二進制數(shù)據(jù)進行編解碼):

協(xié)議格式

消息共分為四個部分:

  • 1.消息長度(總長度, 四個字節(jié)存儲, 占用一個int類型)
  • 2.序列化類型&消息頭長度(同樣占用一個int類型, 第一個字節(jié)表示序列化類型, 后面三個字節(jié)表示消息頭長度)
  • 3.消息頭數(shù)據(jù)
  • 4.消息主體數(shù)據(jù)

消息編碼過程由類RemotingCommand中的encode()方法完成. 代碼如下:

public ByteBuffer encode() {
    // 1> header length size
    int length = 4;    //消息總長度

    // 2> header data length
    //將消息頭編碼成byte[]
    byte[] headerData = this.headerEncode(); 
    //計算頭部長度 
    length += headerData.length;              

    // 3> body data length
    if (this.body != null) {
        //消息主體長度
        length += body.length;                
    }
    //分配ByteBuffer, 這邊加了4, 
    //這是因為在消息總長度的計算中沒有將存儲頭部長度的4個字節(jié)計算在內
    ByteBuffer result = ByteBuffer.allocate(4 + length);  

    // length
    //將消息總長度放入ByteBuffer
    result.putInt(length);   

    // header length
    //將消息頭長度放入ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 

    // header data
    //將消息頭數(shù)據(jù)放入ByteBuffer
    result.put(headerData);    

    // body data;
    if (this.body != null) {
        //將消息主體放入ByteBuffer
        result.put(this.body); 
    }
    //重置ByteBuffer的position位置
    result.flip();     

    return result;
}

public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}

相應的,decode的代碼如下(也在類RemotingCommand中):

public static RemotingCommand decode(final byte[] array) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(array);
    return decode(byteBuffer);
}

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();
    int oriHeaderLen = byteBuffer.getInt();
    //計算消息頭長度
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}
public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
}

通信方式和通信流程

接下來看一下RocketMQ的通信方式, RocketMQ支持三種方式的通信:

  • 同步(sync)
  • 異步(async)
  • 單向(oneway)

下面以Remoting模塊中的一個單元測試為例, 說明同步調用的通信過程.

首先看一下同步調用的整體流程(客戶端):


同步調用客戶端流程圖

下面詳細分析流程圖中涉及的源代碼.
先由RemotingClient的實現(xiàn)類NettyRemotingClient構造請求(一個RemotingCommand實例), 然后根據(jù)addr獲取相應的channel, 調用invokeSyncImpl方法, 將數(shù)據(jù)流轉給抽象類NettyRemotingAbstract處理. 那么NettyRemotingClient是如何啟動的呢, 示例代碼如下:

//類:org.apache.rocketmq.remoting.RemotingServerTest
public static RemotingClient createRemotingClient() {
    NettyClientConfig config = new NettyClientConfig();
    RemotingClient client = new NettyRemotingClient(config);
    client.start();
    return client;
}

先實例化RemotingClient, 其構造函數(shù)如下:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
    this(nettyClientConfig, null);
}

public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
    final ChannelEventListener channelEventListener) {
    //調用父類的構造函數(shù), 主要是設置單向調用和異步調用兩種模式下的最大并發(fā)數(shù)
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    this.nettyClientConfig = nettyClientConfig;
    //NettyEventExecuter處理線程會不斷從eventQueue中讀取消息, 調用注冊的ChannelEventListener進行處理
    this.channelEventListener = channelEventListener;

    //執(zhí)行用戶回調函數(shù)的線程數(shù)
    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
    //執(zhí)行用戶回調函數(shù)的線程池
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    //netty eventLoopGroupWorker
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });
}

接著啟動NettyRemotingClient, 代碼如下:

public void start() {
    //構建一個DefaultEventExecutorGroup, 用于處理netty handler中的操作
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
        nettyClientConfig.getClientWorkerThreads(), //
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    //初始化netty, 對netty的用法不做介紹
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(
                    defaultEventExecutorGroup,
                    //編碼handler
                    new NettyEncoder(),
                    //解碼handler
                    new NettyDecoder(),
                    //心跳檢測
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    //連接管理handler,處理connect, disconnect, close等事件
                    new NettyConnectManageHandler(),
                    //處理接收到RemotingCommand消息后的事件, 收到服務器端響應后的相關操作
                    new NettyClientHandler());
            }
        });

    //定時掃描responseTable,獲取返回結果,并且處理超時
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Exception e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        this.nettyEventExecuter.start();
    }
}

接下來看消息同步調用的邏輯.
同步調用的單元測試:

public void testInvokeSync() throws InterruptedException, RemotingConnectException,
    RemotingSendRequestException, RemotingTimeoutException {
    //消息頭
    RequestHeader requestHeader = new RequestHeader();
    requestHeader.setCount(1);
    requestHeader.setMessageTitle("Welcome");
    //構建請求
    RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
    //同步發(fā)送消息
    RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
    assertTrue(response != null);
    assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
    assertThat(response.getExtFields()).hasSize(2);
}

class RequestHeader implements CommandCustomHeader {
    @CFNullable
    private Integer count;

    @CFNullable
    private String messageTitle;

    @Override
    public void checkFields() throws RemotingCommandException {
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public String getMessageTitle() {
        return messageTitle;
    }

    public void setMessageTitle(String messageTitle) {
        this.messageTitle = messageTitle;
    }
}
public interface CommandCustomHeader {
    void checkFields() throws RemotingCommandException;
}

首先構建消息頭, 然后調用RemotingCommand.createRequestCommand創(chuàng)建一個request(一個RemotingCommand實例),然后調用remotingClient.invokeSync發(fā)送請求.

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    //根據(jù)addr獲得channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            //RocketMQ允許用戶定義rpc hook,可在發(fā)送請求前,或者接受響應后執(zhí)行
            if (this.rpcHook != null) {
                this.rpcHook.doBeforeRequest(addr, request);
            }
            //將數(shù)據(jù)流轉給抽象類NettyRemotingAbstract
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
            //rpc hook
            if (this.rpcHook != null) {
                this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            }
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

可以看到, 真正發(fā)送請求的是invokeSyncImpl方法, 該方法定義在類NettyRemotingAbstract中, 代碼如下:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        //相當于request ID, RemotingCommand會為每一個request產生一個request ID, 從0開始, 每次加1
        final int opaque = request.getOpaque();

        try {
            //根據(jù)request ID構建ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            //將ResponseFuture放入responseTable
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            //刷出數(shù)據(jù)
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                //消息發(fā)送后執(zhí)行
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            //等待服務器端響應結果
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

從代碼中可以看到, 即使是同步調用模式, 在RocketMQ內部依然是采用異步的方式完成. 客戶端的流程大體就是如此, 下面介紹服務器端接收到請求后的處理流程.
先看流程圖:


服務器端消息處理流程圖

先由Netty接收消息, 接著由handler將數(shù)據(jù)流轉給NettyRemotingServer處理. 先看如何初始化一個RemotingServer. 示例代碼如下:

public static RemotingServer createRemotingServer() throws InterruptedException {
    NettyServerConfig config = new NettyServerConfig();
    //初始化RemotingServer, 此處的邏輯與RemotingClient大體相當
    RemotingServer remotingServer = new NettyRemotingServer(config);
    //注冊一個處理器,根據(jù)requestCode, 獲取處理器,處理請求
    remotingServer.registerProcessor(0, new NettyRequestProcessor() {
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
            request.setRemark("Hi " + ctx.channel().remoteAddress());
            return request;
        }

        @Override
        public boolean rejectRequest() {
            return false;
        }
    }, Executors.newCachedThreadPool());
    //啟動RemotingServer
    remotingServer.start();

    return remotingServer;
}

首先實例化一個NettyRemotingServer對象, 此邏輯與NettyRemotingClient大致相當. 接著, 在remotingServer啟動之前注冊一個processor用于處理對應requestcode的處理器, 示例中用的是0, 這與remotingClient示例中的code是對應的(RemotingCommand.createRequestCommand(0, requestHeader)), 當remotingServer收到code=0的請求時,會使用這個processor去處理請求. 接著
啟動RemotingServer, 這個過程大致就是一個啟動netty ServerBootstrap的過程, 代碼如下:

public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        new NettyConnetManageHandler(),
                        new NettyServerHandler());
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecuter.start();
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Exception e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

整個啟動過程與RemotingClient類似, 使用的handler也類似, 區(qū)別在于RemotingServerz. 服務器端以后添加的handler是NettyServerHandler(客戶端用的是NettyClientHandler). 服務器端接收到請求后, 對消息的處理邏輯在NettyRemotingAbstract中(此處省略了一大波netty框架接收到消息后的數(shù)據(jù)流轉過程), 如下:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //根據(jù)RemotingCommand中的code獲取processor和ExecutorService
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //rpc hook
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }
                    //processor處理請求
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    //rpc hook
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                PLOG.error("process request over, but response failed", e);
                                PLOG.error(cmd.toString());
                                PLOG.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                        .equals(e.getClass().getCanonicalName())) {
                        PLOG.error("process request exception", e);
                        PLOG.error(cmd.toString());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            //封裝requestTask
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            //想線程池提交requestTask
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
                    + pair.getObject2().toString() //
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        //構建response
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

服務器端的流程大體如此.
將客戶端和服務器端聯(lián)合起來的流程圖如下:

同步調用整體流程圖

異步調用和單項調用的原理與同步調用大致相當, 此處不再重復介紹.

其他部分的處理過程

前文中講到, 每次有消息需要發(fā)送, 就會生成resposneFuture用于接收消息回應, 但是如果始終沒有收到回應, Map(scanResponseTable)中的responseFuture就會堆積.
這個時候就需要一個線程來專門做Map的清理回收, 即前文提到的定時掃描responseTable的任務, 這個線程會1s調用一次來檢查所有的responseFuture, 判斷是否有效, 是否已經得到返回, 并進行相應的處理. 代碼如下:

//類NettyRemotingAbstract
public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();

        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            PLOG.warn("remove timeout request, " + rep);
        }
    }

    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            PLOG.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}

總結

消息隊列的網(wǎng)絡通信模塊總的來說并不復雜, 比較關鍵的幾個部分就是協(xié)議格式的設計, 維護request ID和responseFuture的對應關系, 超時處理等幾個方面.

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末帜慢,一起剝皮案震驚了整個濱河市笼裳,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌粱玲,老刑警劉巖躬柬,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異抽减,居然都是意外死亡楔脯,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進店門胯甩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來昧廷,“玉大人,你說我怎么就攤上這事偎箫∧炯恚” “怎么了?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵淹办,是天一觀的道長眉枕。 經常有香客問我,道長,這世上最難降的妖魔是什么速挑? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任谤牡,我火速辦了婚禮,結果婚禮上姥宝,老公的妹妹穿的比我還像新娘翅萤。我一直安慰自己,他們只是感情好腊满,可當我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布套么。 她就那樣靜靜地躺著,像睡著了一般碳蛋。 火紅的嫁衣襯著肌膚如雪胚泌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天肃弟,我揣著相機與錄音玷室,去河邊找鬼。 笑死笤受,一個胖子當著我的面吹牛穷缤,可吹牛的內容都是我干的。 我是一名探鬼主播感论,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼紊册!你這毒婦竟也來了比肄?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤囊陡,失蹤者是張志新(化名)和其女友劉穎芳绩,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體撞反,經...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡妥色,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了遏片。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嘹害。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖吮便,靈堂內的尸體忽然破棺而出笔呀,到底是詐尸還是另有隱情,我是刑警寧澤髓需,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布许师,位于F島的核電站,受9級特大地震影響,放射性物質發(fā)生泄漏微渠。R本人自食惡果不足惜搭幻,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望逞盆。 院中可真熱鬧檀蹋,春花似錦、人聲如沸纳击。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽焕数。三九已至纱昧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間堡赔,已是汗流浹背识脆。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留善已,地道東北人灼捂。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像换团,于是被迫代替她去往敵國和親悉稠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理艘包,服務發(fā)現(xiàn)的猛,斷路器,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,139評論 25 707
  • 【日精進:小煜媽打卡第173天】-20171125 今天和小叔小姑等兩三家人一起回老家想虎,一來是“池宰穑回家看看...
    b79ddab78458閱讀 42評論 0 0
  • 悠悠孤月輪,寥寥星河浪舌厨。 日日守如一岂却,千年猶在傍。 今夜短相逢裙椭,喜鵲連橋唱躏哩。 明朝久離別,唯愿人無恙揉燃。 若無兩心知...
    君懷璧閱讀 959評論 37 52
  • 聽陜北當?shù)鼐用裾f震庭,他們修窯不打地基,直接就在原土上砌你雌,頂多打個灰土已經算好的了器联,不會存在坍塌問題二汛。 于是,我們就照...
    劉冰杰閱讀 2,383評論 0 0