5.1 Netty客戶端底層與Java NIO對(duì)應(yīng)關(guān)系
在講解Netty客戶端程序時(shí)候我們提到指定NioSocketChannel用于創(chuàng)建客戶端NIO套接字通道的實(shí)例先巴,下面我們來看NioSocketChannel是如何創(chuàng)建一個(gè)Java NIO里面的SocketChannel的。
首先我們來看NioSocketChannel的構(gòu)造函數(shù):
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
其中DEFAULT_SELECTOR_PROVIDER定義如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
然后繼續(xù)看
//這里的provider為DEFAULT_SELECTOR_PROVIDER
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
其中newSocket代碼如下:
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
所以NioSocketChannel內(nèi)部是管理一個(gè)客戶端的SocketChannel的啥么,這個(gè)SocketChannel就是講Java NIO時(shí)候的SocketChannel,也就是創(chuàng)建NioSocketChannel實(shí)例對(duì)象時(shí)候相當(dāng)于執(zhí)行了Java NIO中:
SocketChannel socketChannel = SocketChannel.open();
另外在NioSocketChannel的父類AbstractNioChannel的構(gòu)造函數(shù)里面默認(rèn)會(huì)記錄隊(duì)op_read事件感興趣,這個(gè)后面當(dāng)鏈接完成后會(huì)使用到:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
另外在NioSocketChannel的父類AbstractNioChannel的構(gòu)造函數(shù)里面設(shè)置了該套接字為非阻塞的
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
...
}
}
下面我們看Netty里面是哪里創(chuàng)建的NioSocketChannel實(shí)例栖榨,哪里注冊(cè)到選擇器的靡挥。
下面我們看下Bootstrap的connect操作代碼:
public ChannelFuture connect(InetAddress inetHost, int inetPort) {
return connect(new InetSocketAddress(inetHost, inetPort));
}
類似Java NIO傳遞了一個(gè)InetSocketAddress對(duì)象用來記錄服務(wù)端ip和端口:
public ChannelFuture connect(SocketAddress remoteAddress) {
...
return doResolveAndConnect(remoteAddress, config.localAddress());
}
下面我們看下doResolveAndConnect的代碼:
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//(1)
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
//(2)
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
}
...
}
}
首先我們來看代碼(1)initAndRegister:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//(1.1)
channel = channelFactory.newChannel();
//(1.2)
init(channel);
} catch (Throwable t) {
...
}
//(1.3)
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
}
其中(1.1)作用就是創(chuàng)建一個(gè)NioSocketChannel的實(shí)例,代碼(1.2)是具體設(shè)置內(nèi)部套接字的選項(xiàng)的蜈抓。
代碼(1.3)則是具體注冊(cè)客戶端套接字到選擇器的启绰,其首先會(huì)調(diào)用NioEventLoop的register方法,最后調(diào)用NioSocketChannelUnsafe的register方法:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
其中 register0內(nèi)部調(diào)用doRegister沟使,其代碼如下:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//注冊(cè)客戶端socket到當(dāng)前eventloop的selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
到這里代碼(1)initAndRegister的流程講解完畢了委可,下面我們來看代碼(2)的
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
try {
...
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
。格带。撤缴。
}
} catch (Throwable t) {
...
}
}
其中doConnect代碼如下:
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
...
boolean success = false;
try {
//2.1
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
//2.2
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
其中2.1具體調(diào)用客戶端套接字的connect方法,等價(jià)于Java NIO里面的叽唱。
代碼2.2 由于connect 方法是異步的屈呕,所以類似JavaNIO調(diào)用connect方法進(jìn)行判斷,如果當(dāng)前沒有完成鏈接則設(shè)置對(duì)op_connect感興趣棺亭。
最后一個(gè)點(diǎn)就是何處進(jìn)行的從選擇器獲取就緒的事件的,具體是在該客戶端套接關(guān)聯(lián)的NioEventLoop里面的做的虎眨,每個(gè)NioEventLoop里面有一個(gè)線程用來循環(huán)從選擇器里面獲取就緒的事件,然后進(jìn)行處理:
protected void run() {
for (;;) {
try {
...
select(wakenUp.getAndSet(false));
...
processSelectedKeys();
...
} catch (Throwable t) {
handleLoopException(t);
}
...
}
}
其中select代碼如下:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
...
for (;;) {
...
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
...
} catch (CancelledKeyException e) {
...
}
}
可知會(huì)從選擇器選取就緒的事件镶摘,其中processSelectedKeys代碼如下:
private void processSelectedKeys() {
...
processSelectedKeysPlain(selector.selectedKeys());
...
}
可知會(huì)獲取已經(jīng)就緒的事件集合嗽桩,然后交給processSelectedKeysPlain處理,后者循環(huán)調(diào)用processSelectedKey具體處理每個(gè)事件凄敢,代碼如下:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
...
try {
//(3)如果是op_connect事件
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//3.1
unsafe.finishConnect();
}
//4
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//5
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
代碼(3)如果當(dāng)前事件key為op_connect則去掉op_connect碌冶,然后調(diào)用NioSocketChannel的doFinishConnect:
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
可知是調(diào)用了客戶端套接字的finishConnect方法,最后會(huì)調(diào)用NioSocketChannel的doBeginRead方法設(shè)置對(duì)op_read事件感興趣:
protected void doBeginRead() throws Exception {
...
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
這里interestOps為op_read,上面在講解NioSocketChannel的構(gòu)造函數(shù)時(shí)候提到過涝缝。
代碼(5)如果當(dāng)前是op_accept事件說明是服務(wù)器監(jiān)聽套接字獲取到了一個(gè)鏈接套接字扑庞,如果是op_read,則說明可以讀取客戶端發(fā)來的數(shù)據(jù)了,如果是后者則會(huì)激活管線里面的所有handler的channelRead方法拒逮,這里會(huì)激活我們自定義的NettyClientHandler的channelRead讀取客戶端發(fā)來的數(shù)據(jù)罐氨,然后在向客戶端寫入數(shù)據(jù)。
5.2 總結(jié)
本節(jié)講解了Netty客戶端底層如何使用Java NIO進(jìn)行實(shí)現(xiàn)的滩援,可見與我們前面講解的Java NIO設(shè)計(jì)的客戶端代碼步驟是一致的栅隐,只是netty對(duì)其進(jìn)行了封裝,方便了我們使用,了解了這些對(duì)深入研究netty源碼提供了一個(gè)骨架指導(dǎo)租悄。
想了解JDK NIO和更多Netty基礎(chǔ)的可以單擊我
更多關(guān)于分布式系統(tǒng)中服務(wù)降級(jí)策略的知識(shí)可以單擊 單擊我
想系統(tǒng)學(xué)dubbo的單擊我
想學(xué)并發(fā)的童鞋可以 單擊我