Dubbo——Transporter 層核心實(shí)現(xiàn)(上)

前言

dubbo-remoting 模塊提供了多種客戶端和服務(wù)端通信的功能。在 Dubbo 的整體架構(gòu)設(shè)計(jì)圖中疹吃,我們可以看到最底層紅色框選中的部分即為 Remoting 層萨驶,其中包括了 Exchange、Transport和Serialize 三個(gè)子層次核畴。這里我們要介紹的 dubbo-remoting 模塊主要對(duì)應(yīng) Exchange 和 Transport 兩層谤草。

本文從Transporter 層的 RemotingServer、Client温学、Channel枫浙、ChannelHandler 等核心接口出發(fā),介紹這些核心接口的實(shí)現(xiàn)紧帕。

AbstractPeer 抽象類

AbstractPeer抽象類是嗜,它同時(shí)實(shí)現(xiàn)了 Endpoint 接口和 ChannelHandler 接口,如下圖所示丽柿,它也是 AbstractChannel甫题、AbstractEndpoint 抽象類的父類坠非。

AbstractPeer 繼承關(guān)系

Netty 中也有 ChannelHandler盟迟、Channel 等接口攒菠,但無(wú)特殊說(shuō)明的情況下,這里的接口指的都是 Dubbo 中定義的接口新娜。

AbstractPeer 中有四個(gè)字段:一個(gè)是表示該端點(diǎn)自身的 URL 類型的字段,還有兩個(gè) Boolean 類型的字段(closing 和 closed)用來(lái)記錄當(dāng)前端點(diǎn)的狀態(tài)私杜,這三個(gè)字段都與 Endpoint 接口相關(guān)衰粹;第四個(gè)字段指向了一個(gè) ChannelHandler 對(duì)象,AbstractPeer 對(duì) ChannelHandler 接口的所有實(shí)現(xiàn)瓢捉,都是委托給了這個(gè) ChannelHandler 對(duì)象泡态。從上面的繼承關(guān)系圖中某弦,我們可以得出這樣一個(gè)結(jié)論:AbstractChannel惊科、AbstractServer馆截、AbstractClient 都是要關(guān)聯(lián)一個(gè) ChannelHandler 對(duì)象的蜡娶。

AbstractEndpoint 抽象類

我們順著上圖的繼承關(guān)系繼續(xù)向下看,AbstractEndpoint 繼承了 AbstractPeer 這個(gè)抽象類宿接。AbstractEndpoint 中維護(hù)了一個(gè) Codec2 對(duì)象(codec 字段)和兩個(gè)超時(shí)時(shí)間(timeout 字段和 connectTimeout 字段),在 AbstractEndpoint 的構(gòu)造方法中會(huì)根據(jù)傳入的 URL 初始化這三個(gè)字段:

public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

    private Codec2 codec;

    private int timeout;

    private int connectTimeout;

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        // 調(diào)用父類AbstractPeer的構(gòu)造方法
        super(url, handler);
        // 根據(jù)URL中的codec參數(shù)值副女,確定此處具體的Codec2實(shí)現(xiàn)類
        this.codec = getChannelCodec(url);
        // 根據(jù)URL中的timeout參數(shù)確定timeout字段的值,默認(rèn)1000
        this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // 根據(jù)URL中的connect.timeout參數(shù)確定connectTimeout字段的值塞绿,默認(rèn)3000
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }
}

由于Codec2 接口是一個(gè) SPI 擴(kuò)展點(diǎn)拷窜,這里的 AbstractEndpoint.getChannelCodec() 方法就是基于 Dubbo SPI 選擇其擴(kuò)展實(shí)現(xiàn)的篮昧,具體實(shí)現(xiàn)如下:

public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

    protected static Codec2 getChannelCodec(URL url) {
        // 根據(jù)URL的codec參數(shù)獲取擴(kuò)展名
        String codecName = url.getProtocol(); // codec extension name must stay the same with protocol name
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            // 通過(guò)ExtensionLoader加載并實(shí)例化Codec2的具體擴(kuò)展實(shí)現(xiàn)
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            // Codec2接口不存在相應(yīng)的擴(kuò)展名,就嘗試從Codec這個(gè)老接口的擴(kuò)展名中查找春宣,目前Codec接口已經(jīng)廢棄了
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                    .getExtension(codecName));
        }
    }
}

另外躏惋,AbstractEndpoint 還實(shí)現(xiàn)了 Resetable 接口(只有一個(gè) reset() 方法需要實(shí)現(xiàn))距误,雖然 AbstractEndpoint 中的 reset() 方法比較長(zhǎng)准潭,但是邏輯非常簡(jiǎn)單,就是根據(jù)傳入的 URL 參數(shù)重置 AbstractEndpoint 的三個(gè)字段泼掠。下面是重置 codec 字段的代碼片段,還是調(diào)用 getChannelCodec() 方法實(shí)現(xiàn)的:

public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

    public void reset(URL url) {
        // 檢測(cè)當(dāng)前AbstractEndpoint是否已經(jīng)關(guān)閉(略)
        // 省略重置timeout沐鼠、connectTimeout兩個(gè)字段的邏輯
        try {
            if (url.hasParameter(Constants.CODEC_KEY)) {
                this.codec = getChannelCodec(url);
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }
}

Server 繼承路線分析

AbstractServer 和 AbstractClient 都實(shí)現(xiàn)了 AbstractEndpoint 抽象類,我們先來(lái)看 AbstractServer 的實(shí)現(xiàn)焰檩。AbstractServer 在繼承了 AbstractEndpoint 的同時(shí),還實(shí)現(xiàn)了 RemotingServer 接口衩侥,如下圖所示:


AbstractServer 繼承關(guān)系圖

AbstractServer

AbstractServer 是對(duì)服務(wù)端的抽象,實(shí)現(xiàn)了服務(wù)端的公共邏輯峦萎。AbstractServer 的核心字段有下面幾個(gè)爱榔。

  • localAddress详幽、bindAddress(InetSocketAddress 類型):分別對(duì)應(yīng)該 Server 的本地地址和綁定的地址悴能,都是從 URL 中的參數(shù)中獲取。bindAddress 默認(rèn)值與 localAddress 一致炒嘲。

  • accepts(int 類型):該 Server 能接收的最大連接數(shù),從 URL 的 accepts 參數(shù)中獲取夭拌,默認(rèn)值為 0,表示沒(méi)有限制。

  • executorRepository(ExecutorRepository 類型):負(fù)責(zé)管理線程池骡和,后面我們會(huì)深入介紹 ExecutorRepository 的具體實(shí)現(xiàn)。

  • executor(ExecutorService 類型):當(dāng)前 Server 關(guān)聯(lián)的線程池婆赠,由上面的 ExecutorRepository 創(chuàng)建并管理页藻。

在 AbstractServer 的構(gòu)造方法中會(huì)根據(jù)傳入的 URL初始化上述字段,并調(diào)用 doOpen() 這個(gè)抽象方法完成該 Server 的啟動(dòng)份帐,具體實(shí)現(xiàn)如下:

public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {

    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
    ExecutorService executor;
    private InetSocketAddress localAddress;
    private InetSocketAddress bindAddress;
    private int accepts;
    private int idleTimeout;

    private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        // 調(diào)用父類的構(gòu)造方法
        super(url, handler);
        // 根據(jù)傳入的URL初始化localAddress和bindAddress
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        
        // 初始化accepts等字段
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {
            // 調(diào)用doOpen()這個(gè)抽象方法璃吧,啟動(dòng)該Server
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        
         // 獲取該Server關(guān)聯(lián)的線程池
        executor = executorRepository.createExecutorIfAbsent(url);
    }
}

ExecutorRepository

在繼續(xù)分析 AbstractServer 的具體實(shí)現(xiàn)類之前,我們先來(lái)了解一下 ExecutorRepository 這個(gè)接口废境。

ExecutorRepository 負(fù)責(zé)創(chuàng)建并管理 Dubbo 中的線程池畜挨,該接口雖然是個(gè) SPI 擴(kuò)展點(diǎn),但是只有一個(gè)默認(rèn)實(shí)現(xiàn)—— DefaultExecutorRepository噩凹。在該默認(rèn)實(shí)現(xiàn)中維護(hù)了一個(gè) ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> 集合(data 字段)緩存已有的線程池逮刨,第一層 Key 值表示線程池屬于 Provider 端還是 Consumer 端,第二層 Key 值表示線程池關(guān)聯(lián)服務(wù)的端口尤辱。

DefaultExecutorRepository.createExecutorIfAbsent() 方法會(huì)根據(jù) URL 參數(shù)創(chuàng)建相應(yīng)的線程池并緩存在合適的位置娄涩,具體實(shí)現(xiàn)如下:

public class DefaultExecutorRepository implements ExecutorRepository {

    public synchronized ExecutorService createExecutorIfAbsent(URL url) {
        // 根據(jù)URL中的side參數(shù)值決定第一層key
        String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
        
        // 根據(jù)URL中的port值確定第二層key
        Integer portKey = url.getPort();
        ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
        // If executor has been shut down, create a new one
        if (executor.isShutdown() || executor.isTerminated()) {
            executors.remove(portKey);
            // 如果緩存中相應(yīng)的線程池已關(guān)閉咽斧,則同樣需要調(diào)用createExecutor()方法
            // 創(chuàng)建新的線程池,并替換掉緩存中已關(guān)閉的線程持
            executor = createExecutor(url);
            executors.put(portKey, executor);
        }
        return executor;
    }
}

在 createExecutor() 方法中替蔬,會(huì)通過(guò) Dubbo SPI 查找 ThreadPool 接口的擴(kuò)展實(shí)現(xiàn)格嗅,并調(diào)用其 getExecutor() 方法創(chuàng)建線程池。ThreadPool 接口被 @SPI 注解修飾苔悦,默認(rèn)使用 FixedThreadPool 實(shí)現(xiàn)蟋座,但是 ThreadPool 接口中的 getExecutor() 方法被 @Adaptive 注解修飾驯遇,動(dòng)態(tài)生成的適配器類會(huì)優(yōu)先根據(jù) URL 中的 threadpool 參數(shù)選擇 ThreadPool 的擴(kuò)展實(shí)現(xiàn)好乐。

@SPI("fixed")
public interface ThreadPool {

    @Adaptive({THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

ThreadPool 接口的實(shí)現(xiàn)類如下圖所示:


不同實(shí)現(xiàn)會(huì)根據(jù) URL 參數(shù)創(chuàng)建不同特性的線程池淮蜈,這里以CacheThreadPool為例進(jìn)行分析:

public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // 核心線程數(shù)量
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        // 最大線程數(shù)量
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        // 緩沖隊(duì)列的最大長(zhǎng)度
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // 非核心線程的最大空閑時(shí)長(zhǎng),當(dāng)非核心線程空閑時(shí)間超過(guò)該值時(shí)旦袋,會(huì)被回收
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        // 下面就是依賴JDK的ThreadPoolExecutor創(chuàng)建指定特性的線程池并返回
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
  • LimitedThreadPool:與 CacheThreadPool 一樣,可以指定核心線程數(shù)、最大線程數(shù)以及緩沖隊(duì)列長(zhǎng)度嗅剖。區(qū)別在于赏胚,LimitedThreadPool 創(chuàng)建的線程池的非核心線程不會(huì)被回收。

  • FixedThreadPool:核心線程數(shù)和最大線程數(shù)一致,且不會(huì)被回收革为。

上述三種類型的線程池都是基于 JDK ThreadPoolExecutor 線程池,在核心線程全部被占用的時(shí)候,會(huì)優(yōu)先將任務(wù)放到緩沖隊(duì)列中緩存勒葱,在緩沖隊(duì)列滿了之后糠聪,才會(huì)嘗試創(chuàng)建新線程來(lái)處理任務(wù)。

EagerThreadPool 創(chuàng)建的線程池是 EagerThreadPoolExecutor(繼承了 JDK 提供的 ThreadPoolExecutor),使用的隊(duì)列是 TaskQueue(繼承了LinkedBlockingQueue)。該線程池與 ThreadPoolExecutor 不同的是:在線程數(shù)沒(méi)有達(dá)到最大線程數(shù)的前提下,EagerThreadPoolExecutor 會(huì)優(yōu)先創(chuàng)建線程來(lái)執(zhí)行任務(wù),而不是放到緩沖隊(duì)列中;當(dāng)線程數(shù)達(dá)到最大值時(shí)蠢涝,EagerThreadPoolExecutor 會(huì)將任務(wù)放入緩沖隊(duì)列堡距,等待空閑線程衬吆。

EagerThreadPoolExecutor 覆蓋了 ThreadPoolExecutor 中的兩個(gè)方法:execute() 方法和 afterExecute() 方法蟀架,具體實(shí)現(xiàn)如下色徘,我們可以看到其中維護(hù)了一個(gè) submittedTaskCount 字段(AtomicInteger 類型)溪猿,用來(lái)記錄當(dāng)前在線程池中的任務(wù)總數(shù)(正在線程中執(zhí)行的任務(wù)數(shù)+隊(duì)列中等待的任務(wù)數(shù))群井。

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
    
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // 任務(wù)提交之前,遞增submittedTaskCount
        submittedTaskCount.incrementAndGet();
        try {
            // 提交任務(wù)
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                // 任務(wù)被拒絕之后蛙埂,會(huì)嘗試再次放入隊(duì)列中緩存宏怔,等待空閑線程執(zhí)行
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    // 再次入隊(duì)被拒絕抓艳,則隊(duì)列已滿位他,無(wú)法執(zhí)行任務(wù)
                    // 遞減submittedTaskCount
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                // 再次入隊(duì)列異常靡菇,遞減submittedTaskCount
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            // 任務(wù)提交異常馍惹,遞減submittedTaskCount
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 任務(wù)指定結(jié)束遇西,遞減submittedTaskCount
        submittedTaskCount.decrementAndGet();
    }
}

看到這里馅精,你可能會(huì)有些疑惑:沒(méi)有看到優(yōu)先創(chuàng)建線程執(zhí)行任務(wù)的邏輯啊。其實(shí)重點(diǎn)在關(guān)聯(lián)的 TaskQueue 實(shí)現(xiàn)中粱檀,它覆蓋了 LinkedBlockingQueue.offer() 方法硫嘶,會(huì)判斷線程池的 submittedTaskCount 值是否已經(jīng)達(dá)到最大線程數(shù),如果未超過(guò)梧税,則會(huì)返回 false沦疾,迫使線程池創(chuàng)建新線程來(lái)執(zhí)行任務(wù)。示例代碼如下:

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private EagerThreadPoolExecutor executor;
    
    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        // 獲取當(dāng)前線程池中的活躍線程數(shù)
        int currentPoolThreadSize = executor.getPoolSize();
        
         // 當(dāng)前有線程空閑第队,直接將任務(wù)提交到隊(duì)列中哮塞,空閑線程會(huì)直接從中獲取任務(wù)執(zhí)行
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // 當(dāng)前沒(méi)有空閑線程,但是還可以創(chuàng)建新線程凳谦,則返回false忆畅,迫使線程池創(chuàng)建
        // 新線程來(lái)執(zhí)行任務(wù)
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // 當(dāng)前線程數(shù)已經(jīng)達(dá)到上限,只能放到隊(duì)列中緩存了
        return super.offer(runnable);
    }
}

線程池最后一個(gè)相關(guān)的小細(xì)節(jié)是 AbortPolicyWithReport 尸执,它繼承了 ThreadPoolExecutor.AbortPolicy家凯,覆蓋的 rejectedExecution 方法中會(huì)輸出包含線程池相關(guān)信息的 WARN 級(jí)別日志,然后進(jìn)行 dumpJStack() 方法如失,最后才會(huì)拋出RejectedExecutionException 異常绊诲。

NettyServer

回到 Server 的繼承線上,下面來(lái)看基于 Netty 4 實(shí)現(xiàn)的 NettyServer褪贵,它繼承了前文介紹的 AbstractServer掂之,實(shí)現(xiàn)了 doOpen() 方法和 doClose() 方法。這里重點(diǎn)看 doOpen() 方法脆丁,如下所示:

public class NettyServer extends AbstractServer implements RemotingServer {

    private Map<String, Channel> channels;
    
    private ServerBootstrap bootstrap;
    
    private io.netty.channel.Channel channel;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @Override
    protected void doOpen() throws Throwable {
        // 創(chuàng)建ServerBootstrap
        bootstrap = new ServerBootstrap();
        // 創(chuàng)建boss EventLoopGroup
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        // 創(chuàng)建worker EventLoopGroup
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        // 創(chuàng)建NettyServerHandler世舰,它是一個(gè)Netty中的ChannelHandler實(shí)現(xiàn),
        // 不是Dubbo Remoting層的ChannelHandler接口的實(shí)現(xiàn)
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        
        // 獲取當(dāng)前NettyServer創(chuàng)建的所有Channel槽卫,這里的channels集合中的
        // Channel不是Netty中的Channel對(duì)象跟压,而是Dubbo Remoting層的Channel對(duì)象    
        channels = nettyServerHandler.getChannels();
        // 初始化ServerBootstrap歼培,指定boss和worker EventLoopGroup
        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        // 連接空閑超時(shí)時(shí)間
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        // NettyCodecAdapter中會(huì)創(chuàng)建Decoder和Encoder
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                // 注冊(cè)Decoder和Encoder
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                // 注冊(cè)IdleStateHandler
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                // 注冊(cè)NettyServerHandler
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        // 綁定指定的地址和端口
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        // 等待bind操作完成
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }
}

看完 NettyServer 實(shí)現(xiàn)的 doOpen() 方法之后,你會(huì)發(fā)現(xiàn)它和普通Netty 的 Server 端基本流程類似:初始化 ServerBootstrap梗搅、創(chuàng)建 Boss EventLoopGroup 和 Worker EventLoopGroup、創(chuàng)建 ChannelInitializer 指定如何初始化 Channel 上的 ChannelHandler 等一系列 Netty 使用的標(biāo)準(zhǔn)化流程。

其實(shí)在 Transporter 這一層看,功能的不同其實(shí)就是注冊(cè)在 Channel 上的 ChannelHandler 不同泪掀。

核心 ChannelHandler

下面我們來(lái)逐個(gè)看看這四個(gè) ChannelHandler 的核心功能塔拳。

首先是decoder 和 encoder,它們都是 NettyCodecAdapter 的內(nèi)部類,如下圖所示弯予,分別繼承了 Netty 中的 ByteToMessageDecoder 和 MessageToByteEncoder:

還記得 AbstractEndpoint 抽象類中的 codec 字段(Codec2 類型)嗎呼寸?InternalDecoder 和 InternalEncoder 會(huì)將真正的編解碼功能委托給 NettyServer 關(guān)聯(lián)的這個(gè) Codec2 對(duì)象去處理,這里以 InternalDecoder 為例進(jìn)行分析:

final public class NettyCodecAdapter {

    private final Codec2 codec;

    private final URL url;

    private final org.apache.dubbo.remoting.ChannelHandler handler;

    private class InternalDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
            // 將ByteBuf封裝成統(tǒng)一的ChannelBuffer
            ChannelBuffer message = new NettyBackedChannelBuffer(input);
            // 拿到關(guān)聯(lián)的Channel
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            // decode object.
            do {
                // 記錄當(dāng)前readerIndex的位置
                int saveReaderIndex = message.readerIndex();
                // 委托給Codec2進(jìn)行解碼
                Object msg = codec.decode(channel, message);
                // 當(dāng)前接收到的數(shù)據(jù)不足一個(gè)消息的長(zhǎng)度捐祠,會(huì)返回NEED_MORE_INPUT,
                // 這里會(huì)重置readerIndex,繼續(xù)等待接收更多的數(shù)據(jù)             
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    //is it possible to go here ?
                    if (saveReaderIndex == message.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    if (msg != null) {
                         // 將讀取到的消息傳遞給后面的Handler處理
                        out.add(msg);
                    }
                }
            } while (message.readable());
        }
    }
}

IdleStateHandler

IdleStateHandler靴寂,它是 Netty 提供的一個(gè)工具型 ChannelHandler剖踊,用于定時(shí)心跳請(qǐng)求的功能或是自動(dòng)關(guān)閉長(zhǎng)時(shí)間空閑連接的功能梆造。它的原理到底是怎樣的呢忽肛?在 IdleStateHandler 中通過(guò) lastReadTime烂斋、lastWriteTime 等幾個(gè)字段屹逛,記錄了最近一次讀/寫事件的時(shí)間煎源,IdleStateHandler 初始化的時(shí)候,會(huì)創(chuàng)建一個(gè)定時(shí)任務(wù)柄错,定時(shí)檢測(cè)當(dāng)前時(shí)間與最后一次讀/寫時(shí)間的差值。如果超過(guò)我們?cè)O(shè)置的閾值(也就是上面 NettyServer 中設(shè)置的 idleTimeout)颂跨,就會(huì)觸發(fā) IdleStateEvent 事件池颈,并傳遞給后續(xù)的 ChannelHandler 進(jìn)行處理。后續(xù) ChannelHandler 的 userEventTriggered() 方法會(huì)根據(jù)接收到的 IdleStateEvent 事件钓丰,決定是關(guān)閉長(zhǎng)時(shí)間空閑的連接躯砰,還是發(fā)送心跳探活。

NettyServerHandler

最后來(lái)看NettyServerHandler斑粱,它繼承了 ChannelDuplexHandler弃揽,這是 Netty 提供的一個(gè)同時(shí)處理 Inbound 數(shù)據(jù)和 Outbound 數(shù)據(jù)的 ChannelHandler,從下面的繼承圖就能看出來(lái)则北。

NettyServerHandler 繼承關(guān)系圖

在 NettyServerHandler 中有 channels 和 handler 兩個(gè)核心字段。

  • channels(Map<String,Channel>集合):記錄了當(dāng)前 Server 創(chuàng)建的所有 Channel痕慢,從下圖中可以看到尚揣,連接創(chuàng)建(觸發(fā) channelActive() 方法)、連接斷開(kāi)(觸發(fā) channelInactive()方法)會(huì)操作 channels 集合進(jìn)行相應(yīng)的增刪掖举。
  • handler(ChannelHandler 類型):NettyServerHandler 內(nèi)幾乎所有方法都會(huì)觸發(fā)該 Dubbo ChannelHandler 對(duì)象(如下圖)快骗。

這里以 write() 方法為例進(jìn)行簡(jiǎn)單分析:

public class NettyServerHandler extends ChannelDuplexHandler {

    private final URL url;

    private final ChannelHandler handler;
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 將發(fā)送的數(shù)據(jù)繼續(xù)向下傳遞
        super.write(ctx, msg, promise);
        // 并不影響消息的繼續(xù)發(fā)送,只是觸發(fā)sent()方法進(jìn)行相關(guān)的處理,這也是方法
        // 名稱是動(dòng)詞過(guò)去式的原因方篮,可以仔細(xì)體會(huì)一下名秀。其他方法可能沒(méi)有那么明顯
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.sent(channel, msg);
    }
}

在 NettyServer 創(chuàng)建 NettyServerHandler 的時(shí)候,可以看到下面的這行代碼:

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

其中第二個(gè)參數(shù)傳入的是 NettyServer 這個(gè)對(duì)象藕溅,你可以追溯一下 NettyServer 的繼承結(jié)構(gòu)匕得,會(huì)發(fā)現(xiàn)它的最頂層父類 AbstractPeer 實(shí)現(xiàn)了 ChannelHandler,并且將所有的方法委托給其中封裝的 ChannelHandler 對(duì)象巾表,如下圖所示:


也就是說(shuō)汁掠,NettyServerHandler 會(huì)將數(shù)據(jù)委托給這個(gè) ChannelHandler。

到此為止集币,Server 這條繼承線就介紹完了考阱。你可以回顧一下,從 AbstractPeer 開(kāi)始往下鞠苟,一路繼承下來(lái)乞榨,NettyServer 擁有了 Endpoint、ChannelHandler 以及RemotingServer多個(gè)接口的能力当娱,關(guān)聯(lián)了一個(gè) ChannelHandler 對(duì)象以及 Codec2 對(duì)象吃既,并最終將數(shù)據(jù)委托給這兩個(gè)對(duì)象進(jìn)行處理。所以趾访,上層調(diào)用方只需要實(shí)現(xiàn) ChannelHandler 和 Codec2 這兩個(gè)接口就可以了态秧。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市扼鞋,隨后出現(xiàn)的幾起案子申鱼,更是在濱河造成了極大的恐慌,老刑警劉巖云头,帶你破解...
    沈念sama閱讀 221,888評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件捐友,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡溃槐,警方通過(guò)查閱死者的電腦和手機(jī)匣砖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)昏滴,“玉大人猴鲫,你說(shuō)我怎么就攤上這事∫ナ猓” “怎么了拂共?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,386評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)姻几。 經(jīng)常有香客問(wèn)我宜狐,道長(zhǎng)势告,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,726評(píng)論 1 297
  • 正文 為了忘掉前任抚恒,我火速辦了婚禮咱台,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘俭驮。我一直安慰自己回溺,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,729評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布表鳍。 她就那樣靜靜地躺著馅而,像睡著了一般。 火紅的嫁衣襯著肌膚如雪譬圣。 梳的紋絲不亂的頭發(fā)上瓮恭,一...
    開(kāi)封第一講書(shū)人閱讀 52,337評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音厘熟,去河邊找鬼屯蹦。 笑死,一個(gè)胖子當(dāng)著我的面吹牛绳姨,可吹牛的內(nèi)容都是我干的登澜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,902評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼飘庄,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼脑蠕!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起跪削,我...
    開(kāi)封第一講書(shū)人閱讀 39,807評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤谴仙,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后碾盐,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體晃跺,經(jīng)...
    沈念sama閱讀 46,349評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,439評(píng)論 3 340
  • 正文 我和宋清朗相戀三年毫玖,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了掀虎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,567評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡付枫,死狀恐怖烹玉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情阐滩,我是刑警寧澤春霍,帶...
    沈念sama閱讀 36,242評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站叶眉,受9級(jí)特大地震影響址儒,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜衅疙,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,933評(píng)論 3 334
  • 文/蒙蒙 一莲趣、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧饱溢,春花似錦喧伞、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,420評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至肋杖,卻和暖如春溉仑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背状植。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,531評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工浊竟, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人津畸。 一個(gè)月前我還...
    沈念sama閱讀 48,995評(píng)論 3 377
  • 正文 我出身青樓振定,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親肉拓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子后频,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,585評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容