netty源碼分析-注冊及連接

線程池都準(zhǔn)備好了媒咳,我們需要利用起來了。我們一客戶端的connect為例講述這個(gè)過程种远。下面是我們觸發(fā)了鏈接這個(gè)動(dòng)作

ChannelFuture f = b.connect(host, port).sync();

他里面是怎樣的邏輯呢?

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    //利用反射創(chuàng)建channel類顽耳,并且初始化它
    final ChannelFuture regFuture = initAndRegister(); 
    ... 
    //真正的鏈接服務(wù)端
    return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); 
    }
}

中間省略了好多代碼坠敷,只留下關(guān)鍵的代碼。首先我們回憶一下NIO的經(jīng)典操作射富。首先創(chuàng)建一個(gè)channel膝迎,然后在selector上注冊,并指明感興趣的事件胰耗,隨后selector就select了限次,等待感興趣的事件到來,事件到達(dá)柴灯,處理請求卖漫。這是原生的NIO的處理過程,既然netty是基于nio的赠群,頂多是幫助我們封裝了這些操作而已羊始,讓我們可以更加舒服的利用netty的api處理網(wǎng)絡(luò)的請求〔槊瑁看看上面的注釋突委,基本上和我們的了解一致,至于是不是真的一致冬三,那么久得繼續(xù)往下看了匀油。

final ChannelFuture initAndRegister() {
    Channel channel = null;
    channel = channelFactory.newChannel();  //利用反射創(chuàng)建對象
    init(channel);  //初始化,添加邏輯處理器勾笆,設(shè)置channel的Option與屬性Attribute
    ...
    ChannelFuture regFuture = config().group().register(channel);  
    ...
    return regFuture;
}

利用反射創(chuàng)建了代碼中我們指定的channel敌蚜,init初始化,添加邏輯處理器匠襟,設(shè)置channel的Option與屬性Attribute钝侠。我們更為關(guān)鍵的是看一下如果進(jìn)行注冊上篇文章也介紹了groupMultithreadEventLoopGroup的實(shí)例。

### io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

這個(gè)next方法就是我們的選擇器發(fā)揮作用了酸舍,選擇一個(gè)孩子來進(jìn)行處理(負(fù)載均衡的考慮)帅韧。具體的是NioEventLoop的事例進(jìn)行的register操作,他沒有復(fù)寫父類的方法啃勉,所以由父類SingleThreadEventLoop來具體處理

### io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

將channel包裝成了DefaultChannelPromise的對象進(jìn)行操作忽舟。

### io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...  
    AbstractChannel.this.eventLoop = eventLoop;
    ...
    eventLoop.execute(new Runnable() {  //有具體的線程池進(jìn)行處理,參數(shù)傳遞過來的
                @Override
                public void run() {
                    register0(promise);
                }
            });
      ...
    }
}

老樣子,省略好多代碼叮阅,只留下重點(diǎn)刁品。eventLoop是NioEnevtLoop的實(shí)例,所以看一下他的execute,同樣的他沒有復(fù)寫這個(gè)方法浩姥,所以還是由父類提供

### io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
        ...
        startThread();   //開啟線程
        addTask(task);   //處理請求
        ...  
}

### io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {
   ...
            doStartThread();
    ...
}

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {  //重點(diǎn)關(guān)注這個(gè)executor
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();  //SingleThreadEventExecutor.this是NioEventLoop的事例
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

這里有一個(gè)細(xì)節(jié)點(diǎn)不能忽略就是executor.execute挑随,我們要知道這個(gè)executor是啥志群,再創(chuàng)建NioEventLoopGroup時(shí)笛园,有這樣的邏輯

### io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
if (executor == null) {
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
 
protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());
}

### io.netty.util.concurrent.DefaultThreadFactory
@Override
public Thread newThread(Runnable r) {
    Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
    try {
        if (t.isDaemon() != daemon) {
            t.setDaemon(daemon);
        }

        if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
    } catch (Exception ignored) {
        // Doesn't matter even if failed to set.
    }
    return t;
}
 
private static final class DefaultRunnableDecorator implements Runnable {

    private final Runnable r;

    DefaultRunnableDecorator(Runnable r) {
        this.r = r;
    }

    @Override
    public void run() {
        try {
            r.run();
        } finally {
            FastThreadLocal.removeAll();
        }
    }
}

線程工廠創(chuàng)建線程的邏輯,線程池里面設(shè)置了線程工廠葬燎,那么線程池運(yùn)行多線程任務(wù)的時(shí)候眯分,其實(shí)是利用線程工廠創(chuàng)建線程來運(yùn)行

### io.netty.util.concurrent.ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

當(dāng)線程池有任務(wù)過來時(shí)拌汇,會(huì)調(diào)用線程工廠創(chuàng)建線程,并且啟動(dòng)該線程來處理,我們看一下NioEventLoop的run方法


@Override
protected void run() {
    for (;;) {
         ...
         processSelectedKeys();   //處理Nio中的SelectedKeys   
         ...          
    }
}
### 
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {  //如何是鏈接的請求弊决,調(diào)用unsafe的finishConnect
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {  
            unsafe.read();  //讀取數(shù)據(jù)
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

好像終于和我們的NIO有點(diǎn)聯(lián)系了噪舀。無非也就是等感興趣的事件來了就處理,調(diào)用unsafe來處理飘诗,首先我們說一下unsafe,他是NioSocketChannelUnsafe的事例与倡,而這個(gè)類繼承了NioByteUnsafe,并且大部分的方法都是在NioByteUnSafe,我們比較關(guān)心她的讀取數(shù)據(jù)的過程

@Override
    public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);     //觸發(fā)pipeline的生命周期方法疚察,接收消息蒸走,處理消息
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

調(diào)用pipeline的生命周期方,同時(shí)將數(shù)據(jù)傳遞過去貌嫡,handler開始處理了比驻。以上皆是處理了SelectionKey的過程。注冊搞好了岛抄,我們就可以開始連接别惦。在我們追蹤下來,connect核心的代碼

doConnect(remoteAddress, localAddress)

### io.netty.channel.socket.nio.NioSocketChannel#doConnect
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

socket 鏈接遠(yuǎn)程服務(wù)器夫椭,因?yàn)槭钱惒芥溄拥УВ詂onnected為false,那么就注冊了OP_CONNECT事件,這樣蹭秋,當(dāng)連接事件做好之后扰付,在線程組中會(huì)有無限循環(huán),查詢準(zhǔn)備好的事件仁讨,連接事件好了羽莺,就會(huì)進(jìn)行處理,同時(shí)觸發(fā)聲明周期的方法洞豁,進(jìn)行流程的流轉(zhuǎn)盐固。
以上荒给。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市刁卜,隨后出現(xiàn)的幾起案子志电,更是在濱河造成了極大的恐慌,老刑警劉巖蛔趴,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挑辆,死亡現(xiàn)場離奇詭異,居然都是意外死亡孝情,警方通過查閱死者的電腦和手機(jī)之拨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來咧叭,“玉大人,你說我怎么就攤上這事烁竭》撇纾” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵派撕,是天一觀的道長婉弹。 經(jīng)常有香客問我,道長终吼,這世上最難降的妖魔是什么镀赌? 我笑而不...
    開封第一講書人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮际跪,結(jié)果婚禮上商佛,老公的妹妹穿的比我還像新娘。我一直安慰自己姆打,他們只是感情好良姆,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著幔戏,像睡著了一般玛追。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上闲延,一...
    開封第一講書人閱讀 52,475評(píng)論 1 312
  • 那天痊剖,我揣著相機(jī)與錄音,去河邊找鬼垒玲。 笑死陆馁,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的侍匙。 我是一名探鬼主播氮惯,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼叮雳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了妇汗?” 一聲冷哼從身側(cè)響起帘不,我...
    開封第一講書人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎杨箭,沒想到半個(gè)月后寞焙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡互婿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年捣郊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片慈参。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡呛牲,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出驮配,到底是詐尸還是另有隱情娘扩,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布壮锻,位于F島的核電站琐旁,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏猜绣。R本人自食惡果不足惜灰殴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望掰邢。 院中可真熱鬧牺陶,春花似錦、人聲如沸辣之。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽召烂。三九已至碱工,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間奏夫,已是汗流浹背怕篷。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留酗昼,地道東北人廊谓。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像麻削,于是被迫代替她去往敵國和親蒸痹。 傳聞我的和親對象是個(gè)殘疾皇子春弥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容