本文首發(fā)于貓叔的博客 | MySelf巍棱,如需轉(zhuǎn)載,請申明出處.
假設(shè)
假設(shè)你已經(jīng)了解并實現(xiàn)過了一些OIO消息服務(wù)端辐棒,并對異步消息服務(wù)端更有興趣,那么本文或許能帶你更好的入門,并了解JDK部分源碼的關(guān)系流程漾根,正如題目所說泰涂,筆者將竟可能還原,以初學(xué)者能理解的角度辐怕,講訴并構(gòu)建一個NIO消息服務(wù)端逼蒙。
啟動通道并注冊選擇器
啟動模式
感謝Java一直在持續(xù)更新,對應(yīng)的各個API也做得越來越好了寄疏,我們本次生成 服務(wù)端套接字通道 也是使用到JDK提供的一個方式 open 是牢,我們將啟動一個 ServerSocketChannel ,他是一個 支持同步異步模式 的 服務(wù)端套接字通道 陕截。
它是一個抽象類驳棱,官方給了推薦的方式 open 來開啟一個我們需要的 服務(wù)端套接字通道實例 。(如下的官方源碼相關(guān)注釋)
/**
* A selectable channel for stream-oriented listening sockets.
*/
public abstract class ServerSocketChannel
extends AbstractSelectableChannel
implements NetworkChannel
{
/**
* Opens a server-socket channel.
*/
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
}
那么好了农曲,我們現(xiàn)在可以確定我們第一步的代碼是什么樣子的了社搅!沒錯,和你想象中的一樣乳规,這很簡單形葬。
public class NioServer {
public void server(int port) throws IOException{
//1、打開服務(wù)器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
}
}
本節(jié)的重點是 啟動模式 暮的,那么這意味著笙以,我們需要向 ServerSocketChannel 進(jìn)行標(biāo)識,那么它是否提供了對用的方法設(shè)置 同步異步(阻塞非阻塞) 呢冻辩?
這很明顯源织,它是提供的,這也是它的核心功能之一微猖,其實應(yīng)該是它繼承的 父抽象類AbstractSelectableChannel 的實現(xiàn)方法: configgureBlocking(Boolean),這個方法將標(biāo)識我們的 服務(wù)端套接字通道 是否阻塞模式缘屹。(如下的官方源碼相關(guān)注釋)
/**
* Base implementation class for selectable channels.
*/
public abstract class AbstractSelectableChannel
extends SelectableChannel
{
/**
* Adjusts this channel's blocking mode.
*/
public final SelectableChannel configureBlocking(boolean block)
throws IOException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if (blocking == block)
return this;
if (block && haveValidKeys())
throw new IllegalBlockingModeException();
implConfigureBlocking(block);
blocking = block;
}
return this;
}
}
那么凛剥,我們現(xiàn)在可以進(jìn)行 啟動模式的配置 了,讀者很聰明轻姿。我們的項目Demo可以這樣寫: false為非阻塞模式犁珠、true為阻塞模式 。
public class NioServer {
public void server(int port) throws IOException{
//1互亮、打開服務(wù)器套接字通道
ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open();
//2犁享、設(shè)定為非阻塞、調(diào)整此通道的阻塞模式豹休。
serverSocketChannel.configureBlocking(false);
}
}
若未配置阻塞模式炊昆,注冊選擇器 會報
java.nio.channels.IllegalBlockingModeException
異常,相關(guān)將于該小節(jié)大致講解說明。
套接字地址端口綁定
做過消息通訊服務(wù)器的朋友應(yīng)該都清楚凤巨,我們需要向服務(wù)端 指定IP與端口 视乐,即使是NIO服務(wù)器也是一樣的,否則敢茁,我們的客戶端會報 java.net.ConnectException: Connection refused: connect
異常
對于NIO的地址端口綁定佑淀,我們也需要用到 ServerSocket服務(wù)器套接字 。我們知道在寫OIO服務(wù)端的時候彰檬,我們可能僅僅需要寫一句即可伸刃,如下。
//將服務(wù)器綁定到指定端口
final ServerSocket socket = new ServerSocket(port);
當(dāng)然逢倍,JDK在實現(xiàn)NIO的時候就已經(jīng)想到了捧颅,同樣,我們可以使用 服務(wù)器套接字通道 來獲取一個 ServerSocket服務(wù)器套接字 瓶堕。這時的它并沒有綁定端口隘道,我們需要對應(yīng)綁定地址,這個類自身就有一個 bind 方法郎笆。(如下源碼相關(guān)注釋)
/**
* This class implements server sockets. A server socket waits for
* requests to come in over the network. It performs some operation
* based on that request, and then possibly returns a result to the requester.
*/
public class ServerSocket implements java.io.Closeable {
/**
*
* Binds the {@code ServerSocket} to a specific address
* (IP address and port number).
*/
public void bind(SocketAddress endpoint) throws IOException {
bind(endpoint, 50);
}
}
通過源碼谭梗,我們知道,綁定iP與端口 需要一個SocketAddress類宛蚓,我們僅需要將 IP與端口配置到對應(yīng)的SocketAddress類 中即可激捏。其實JDK中,已經(jīng)有了一個更加方便且繼承了SocketAddress的類:InetSocketAddress凄吏。
InetSocketAddress有一個需要一個port為參數(shù)的構(gòu)造方法远舅,它將創(chuàng)建 一個ip為通配符、端口為指定值的套接字地址 痕钢。這很方便我們的開發(fā)图柏,對吧?(如下源碼相關(guān)注釋)
/**
*
* This class implements an IP Socket Address (IP address + port number)
* It can also be a pair (hostname + port number), in which case an attempt
* will be made to resolve the hostname. If resolution fails then the address
* is said to be <I>unresolved</I> but can still be used on some circumstances
* like connecting through a proxy.
*/
public class InetSocketAddress
extends SocketAddress
{
/**
* Creates a socket address where the IP address is the wildcard address
* and the port number a specified value.
*/
public InetSocketAddress(int port) {
this(InetAddress.anyLocalAddress(), port);
}
}
好了任连,那么接下來我們的項目代碼可以繼續(xù)添加綁定IP與端口了蚤吹,我想聰明的你應(yīng)該有所感覺了。
public class NioServer {
public void server(int port) throws IOException{
//1随抠、打開服務(wù)器套接字通道
ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open();
//2裁着、設(shè)定為非阻塞、調(diào)整此通道的阻塞模式拱她。
serverSocketChannel.configureBlocking(false);
//3二驰、檢索與此通道關(guān)聯(lián)的服務(wù)器套接字。
ServerSocket serverSocket = serverSocketChannel.socket();
//4秉沼、此類實現(xiàn) ip 套接字地址 (ip 地址 + 端口號)
InetSocketAddress address = new InetSocketAddress(port);
//5桶雀、將服務(wù)器綁定到選定的套接字地址
serverSocket.bind(address);
}
}
正如開頭我們所說的矿酵,你的項目中不添加3-5環(huán)節(jié)的代碼并沒有問題,但是當(dāng)客戶端接入時背犯,則會報錯坏瘩,因為客戶端將要 接入的地址是連接不到的 ,如會報這樣的錯誤漠魏。
java.net.ConnectException: Connection refused: connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.Net.connect(Net.java:449)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
at com.github.myself.WebClient.main(WebClient.java:16)
注冊選擇器
接下來會是 NIO實現(xiàn)的重點 倔矾,可能有點難理解,如果希望大家能一次理解柱锹,完全深入有點難講明白哪自,不過先大致點一下。
首先要先介紹以下JDK實現(xiàn)NIO的核心:多路復(fù)用器(Selector)——選擇器
先簡單并抽象的理解下禁熏,Java通過 選擇器來實現(xiàn)處理多個Channel鏈接 壤巷,將空閑未進(jìn)行數(shù)據(jù)操作的擱置,優(yōu)先執(zhí)行有需求的數(shù)據(jù)傳輸瞧毙,即 通過一個選擇器來選擇誰需要誰不需要使用共享的線程 胧华。
由此,理所當(dāng)然宙彪,這樣的選擇器應(yīng)該也有Java自己定義的獲取方法矩动, 其自身的 open 就是啟動一個這樣的選擇器。(如下源碼相關(guān)注釋)
/**
* A multiplexor of {@link SelectableChannel} objects.
*/
public abstract class Selector implements Closeable {
/**
* Opens a selector.
*/
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
}
那么現(xiàn)在释漆,我們還要考慮一件事情悲没,我們的 服務(wù)器套接字通道 要如何與 選擇器 相關(guān)聯(lián)呢?
ServerSocketChannel 有一個注冊的方法男图,這個方法就是將它們兩個進(jìn)行了關(guān)聯(lián)示姿,同時這個注冊方法 除了關(guān)聯(lián)選擇器外,還標(biāo)識了注冊的狀態(tài) 逊笆,讓我們先看看源碼吧栈戳。
以下的 ServerSocketChannel 繼承 ---》 AbstractSelectableChannel 繼承 ---》 SelectableChannel
/**
* A channel that can be multiplexed via a {@link Selector}.
*/
public abstract class SelectableChannel
extends AbstractInterruptibleChannel
implements Channel
{
/**
* Registers this channel with the given selector, returning a selection
* key.
*/
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException
{
return register(sel, ops, null);
}
}
我們一般需要將選擇器注冊上去,并將 ServerSocketChannel 標(biāo)識為 接受連接 的狀態(tài)难裆。我們先看看我們的項目代碼應(yīng)該如何寫荧琼。
public class NioServer {
public void server(int port) throws IOException{
//1、打開服務(wù)器套接字通道
ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open();
//2差牛、設(shè)定為非阻塞、調(diào)整此通道的阻塞模式堰乔。
serverSocketChannel.configureBlocking(false);
//3偏化、檢索與此通道關(guān)聯(lián)的服務(wù)器套接字。
ServerSocket serverSocket = serverSocketChannel.socket();
//4镐侯、此類實現(xiàn) ip 套接字地址 (ip 地址 + 端口號)
InetSocketAddress address = new InetSocketAddress(port);
//5侦讨、將服務(wù)器綁定到選定的套接字地址
serverSocket.bind(address);
//6驶冒、打開Selector來處理Channel
Selector selector = Selector.open();
//7、將ServerSocket注冊到Selector已接受連接韵卤,注冊會判斷是否為非阻塞模式
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer readBuff = ByteBuffer.allocate(1024);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
while(true){
//下方代碼.....
}
}
}
注意: 我們前面說到,如果 ServerSocketChannel 沒有啟動非阻塞模式,那么我們在啟動的時候會報 java.lang.IllegalArgumentException
異常父款,這是為什么呢浓恶? 我想我們可能需要更深入底層去看看 register 這個方法(如下源碼注釋)
/**
* Base implementation class for selectable channels.
*/
public abstract class AbstractSelectableChannel
extends SelectableChannel
{
/**
* Registers this channel with the given selector, returning a selection key.
*/
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
}
我想我們終于真相大白了,原來注冊這個方法會對 ServerSocketChannel 的一系列參數(shù)進(jìn)行 校驗 胸嘴,只有通過耳奕,才能注冊成功邪乍,所以我們也明白了,為什么 非阻塞是false姐刁,同時我們也可以看到芥牌,它還對我們所給的標(biāo)識做了校驗,一點要優(yōu)先注冊 接受連接(OP_ACCEPT) 這個狀態(tài)才行聂使,不然依舊會報 java.lang.IllegalArgumentException
異常壁拉。
這里解釋一下,之所以只接受 OP_ACCEPT 柏靶,是因為如果沒有一個接受其他鏈接的主服務(wù)弃理,那么通信根本無從說起,同時這樣的標(biāo)識在我們的NIO服務(wù)端中 只允許標(biāo)識一次(一個ServerSocketChannel) 屎蜓。
可能大家還會好奇有什么標(biāo)識痘昌,我想源碼的說明確實寫的很清楚了。
/**
* Operation-set bit for read operations.
*/
public static final int OP_READ = 1 << 0;
/**
* Operation-set bit for write operations.
*/
public static final int OP_WRITE = 1 << 2;
/**
* Operation-set bit for socket-connect operations.
*/
public static final int OP_CONNECT = 1 << 3;
/**
* Operation-set bit for socket-accept operations.
*/
public static final int OP_ACCEPT = 1 << 4;
好了炬转,這里給一個調(diào)試截圖辆苔,希望大家也可以慢慢的摸索一下。
注意這里的服務(wù)端并沒有構(gòu)建完成哦扼劈,我們還需要下面的幾個步驟驻啤。
NIO選擇實例與興趣點
客戶端代碼
說到這里,我們暫時先休息下荐吵,轉(zhuǎn)頭看看 客戶端的代碼 吧骑冗,這里就簡單的介紹下,我們將建立 一個針對服務(wù)地址端口的連接 先煎,然后不停的循環(huán) 寫操作與讀操作 贼涩,沒有對客戶端進(jìn)行 關(guān)閉操作。
大家如果有興趣的話薯蝎,也可以自己調(diào)試遥倦,并看看部分類的JDK源碼,如下給出本項目案例的客戶端代碼良风。
public class WebClient {
public static void main(String[] args) throws IOException {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("0.0.0.0",8090));
ByteBuffer writeBuffer = ByteBuffer.allocate(32);
ByteBuffer readBuffer = ByteBuffer.allocate(32);
writeBuffer.put("hello".getBytes());
writeBuffer.flip();
while (true){
writeBuffer.rewind();
socketChannel.write(writeBuffer);
readBuffer.clear();
socketChannel.read(readBuffer);
readBuffer.flip();
System.out.println(new String(readBuffer.array()));
}
}catch (IOException e){
e.printStackTrace();
}
}
}
準(zhǔn)備IO接入操作
這里有點復(fù)雜谊迄,我也盡可能的思考了表達(dá)的方式,首先我們先明確一下烟央,所有的連接都會被Selector所囊括统诺,即我們要獲取新接入的連接,也要通過Selector來獲取疑俭,我們一開始啟動的 服務(wù)器套接字通道ServerSocketChannel 起到一個接入\入口(或許不夠準(zhǔn)確)的作用粮呢,客戶端連接通過IP與端口進(jìn)入后,會 被注冊的Selector所獲取 到钞艇,成為 Selector 其中的一員啄寡。
但是這里的一員 并不會包括一開始注冊并被標(biāo)志為接收連接 的 ServerSocketChannel 。
Selector有這樣一個方法哩照,它會自動去等待新的連接事件挺物,如果沒有連接接入,那么它將一直處于阻塞狀態(tài)飘弧。通過字面意思我們可以大致這樣寫代碼识藤。
while(true){
try{
//1、等到需要處理的新事件:阻塞將一直持續(xù)到下一個傳入事件
selector.select();
}catch(IOException e){
e.printStackTrace();
break;
}
}
那么這樣寫好像有點像樣次伶,畢竟異常我們也捕獲了痴昧,同時也使用了剛剛 開啟并注冊完畢的選擇器Selector。
讓我們看看源碼中對于這個方法 select 的注釋吧冠王。
/**
* A multiplexor of {@link SelectableChannel} objects.
*/
public abstract class Selector implements Closeable {
/**
* Selects a set of keys whose corresponding channels are ready for I/O
* operations.
*/
public abstract int select() throws IOException;
}
好的赶撰,看樣子是對的,它將返回一組套接字通道已經(jīng)準(zhǔn)備好執(zhí)行I/O操作的鍵柱彻。那么這個Key究竟是什么呢豪娜?
這里可能直觀的感受下會更好。如下圖是我調(diào)試下看到的key對象绒疗,我想大家應(yīng)該可以理解了侵歇,這個Key中也會 存放對應(yīng)連接的Channel與Selector 。
具體的內(nèi)部更深層的就探討了吓蘑。那么這也解決了我們接下來的 一個疑問 惕虑,我們要怎么向Selector拿連接進(jìn)來的實例呢?
答案很明顯磨镶,我們僅需要 獲取到這個Keys 就好了溃蔫。
選擇鍵集合操作
對于獲取Keys這個現(xiàn)在應(yīng)該已經(jīng)不是什么問題了,通過上面章節(jié)的了解琳猫,我想大家也可以想到這樣的大致語法伟叛。
//獲取所有接收事件的SelectionKey實例
Set<SelectionKey> readykeys = selector.selectedKeys();
大家或許會好奇,這里的Key對象居然是前面的 SelectionKey.OP_ACCEPT
對象脐嫂,是的统刮,這也是接下來要講的紊遵,這很奇妙,也很好玩侥蒙。
前面說到的標(biāo)識暗膜,這是每一個Key自有的,并且是可以 改變的狀態(tài) 鞭衩,在剛剛連接的時候学搜,或許我應(yīng)該大致的描述一下 一個新連接進(jìn)入選擇器后的流程 :select方法將接受到新接入的連接事件,它會被Selector以Key的形式存儲论衍,這時我們需要 對其進(jìn)行判斷 瑞佩,是否是已經(jīng)就緒可以被接受的連接,如果是坯台,這時我們需要 獲取這個連接 炬丸,同時也將其設(shè)定為 非阻塞的狀態(tài) ,并將它 注冊到選擇器上(當(dāng)然捂人,這時的標(biāo)識就不能是一開始的 OP_ACCEPT
)御雕,你可以選擇性的 注冊它的標(biāo)識 ,之后我們可以通過循環(huán)遍歷Keys來滥搭,讓 某一標(biāo)識的連接去執(zhí)行對應(yīng)的操作 酸纲。
說到這里,我想部分新手可能會有點模糊瑟匆,我想我還是把接下來的代碼都一起放出來吧闽坡,大家先看看是否能夠再次結(jié)合文本進(jìn)行了解。
while (true){
try {
//等到需要處理的新事件:阻塞將一直持續(xù)到下一個傳入事件
selector.select();
}catch (IOException e){
e.printStackTrace();
break;
}
//獲取所有接收事件的SelectionKey實例
Set<SelectionKey> readykeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readykeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
try {
//檢查事件是否是一個新的已經(jīng)就緒可以被接受的連接
if (key.isAcceptable()){
//channel:返回為其創(chuàng)建此鍵的通道愁溜。 即使在取消密鑰后, 此方法仍將繼續(xù)返回通道疾嗅。
ServerSocketChannel server = (ServerSocketChannel)key.channel();
//可選擇的通道, 用于面向流的連接插槽。
SocketChannel client = server.accept();
//設(shè)定為非阻塞
client.configureBlocking(false);
//接受客戶端冕象,并將它注冊到選擇器代承,并添加附件
client.register(selector,SelectionKey.OP_WRITE | SelectionKey.OP_READ,msg.duplicate());
System.out.println("Accepted connection from " + client);
}
//檢查套接字是否已經(jīng)準(zhǔn)備好讀數(shù)據(jù)
if (key.isReadable()){
SocketChannel client = (SocketChannel)key.channel();
readBuff.clear();
client.read(readBuff);
readBuff.flip();
System.out.println("received:"+new String(readBuff.array()));
//將此鍵的興趣集設(shè)置為給定的值。 OP_WRITE
key.interestOps(SelectionKey.OP_WRITE);
}
//檢查套接字是否已經(jīng)準(zhǔn)備好寫數(shù)據(jù)
if (key.isWritable()){
SocketChannel client = (SocketChannel)key.channel();
//attachment : 檢索當(dāng)前附件
ByteBuffer buffer = (ByteBuffer)key.attachment();
buffer.rewind();
client.write(buffer);
//將此鍵的興趣集設(shè)置為給定的值渐扮。 OP_READ
key.interestOps(SelectionKey.OP_READ);
}
}catch (IOException e){
e.printStackTrace();
}
}
}
提示:讀到此處论悴,還請各位讀者能運行整個demo,并調(diào)試下墓律,看看與自己理解的是否有差別膀估。
流程效果
以下我簡單敘述一下,我在調(diào)試時的理解與效果耻讽。
1察纯、啟動服務(wù)端后,運行到
selector.select();
后阻塞,因為沒有監(jiān)聽到新的連接饼记。2香伴、啟動客戶端后,
selector.select()
監(jiān)聽到新連接具则,往下執(zhí)行獲取到的Keys的size為1瞒窒,進(jìn)入Key標(biāo)識分支判斷3、
key.isAcceptable()
首次接入為true乡洼,設(shè)置為非阻塞,并注釋到選擇器中修改標(biāo)識為SelectionKey.OP_WRITE | SelectionKey.OP_READ
匕坯,同時添加附件信息msg.duplicate()
束昵,首次循環(huán)結(jié)束4、二次循環(huán)葛峻,連接未關(guān)閉锹雏,獲取到的Keys的size為1,進(jìn)入Key標(biāo)識分支判斷术奖。
5礁遵、由于第一次該Key標(biāo)識改變,所以這次
key.isAcceptable()
為false采记,而由于改了標(biāo)識佣耐,所以接下來的key.isReadable()
、key.isWritable()
都為true唧龄,執(zhí)行讀寫操作兼砖,循環(huán)結(jié)束。6既棺、接下來的循環(huán)讽挟,基本上是
key.isReadable()
、key.isWritable()
都為true丸冕,執(zhí)行讀寫操作耽梅。7、設(shè)想一下胖烛,如果多加一條鏈接是什么效果眼姐。
回顧
這里給出幾個代碼的注意點,希望大家可以自己去了解學(xué)習(xí)洪己。
- 1妥凳、關(guān)于 ByteBuffer 本文并不重點講解,大家可以自行了解
- 2答捕、關(guān)于Key標(biāo)識判斷的代碼逝钥,以下兩句的刪減是否會對代碼有所影響呢?
key.interestOps(SelectionKey.OP_WRITE);
key.interestOps(SelectionKey.OP_READ);
- 3、如果刪除了2中的代碼艘款,并把客戶端注冊選擇器并給標(biāo)識的代碼改為以下持际,那么項目運行效果怎么樣呢?
client.register(selector, SelectionKey.OP_READ,msg.duplicate());
- 4哗咆、如果改了3的代碼蜘欲,可是不刪除2的代碼,那么效果又是怎么樣呢晌柬?
答案留給讀者去揭曉吧姥份,如果你有答案,歡迎留言年碘。
個人相關(guān)項目
InChat : 一個輕量級澈歉、高效率的支持多端(應(yīng)用與硬件Iot)的異步網(wǎng)絡(luò)應(yīng)用通訊框架