1馏锡、Selector(選擇器)是Java NIO中能夠檢測(cè)一到多個(gè)NIO渠道,并能夠知曉渠道是否為諸如讀寫事件做好準(zhǔn)備的組件署海。這樣丰辣,一個(gè)單獨(dú)的線程可以管理多個(gè)channel笙什,從而管理多個(gè)網(wǎng)絡(luò)連接腕扶。
2棉磨、selector 操作的過程
2.1 首先創(chuàng)建Selector
Selector selector = Selector.open();
2.2 向selector注冊(cè)channel,和感興趣的事件聪建,
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_READ);
注意register()方法的第二個(gè)參數(shù)钙畔, 這是一個(gè)interest集合,意思是在通過Selector監(jiān)聽Channel時(shí)對(duì)什么事件感興趣金麸∏嫖觯可以監(jiān)聽以下4中不同類型的事件:
服務(wù)端接收客戶端連接事件 SelectionKey.OP_ACCEPT
客戶端連接服務(wù)端事件 SelectionKey.OP_CONNECT
讀事件 SelectionKey.OP_READ
寫事件 SelectionKey.OP_WRITE
如果你對(duì)不止一種事件感興趣,那么可以用“位或”操作符將常量連接起來如下:
SelectionKey key = channel.register(selector,Selectionkey.OP_READ|SelectionKey.OP_WRITE);
當(dāng)向Selector注冊(cè)Channel時(shí)挥下,registor()方法會(huì)返回一個(gè)SelectorKey對(duì)象揍魂。這個(gè)對(duì)象包含了一些你感興趣的屬性。
interest集合
ready集合
Channel
Selector
附加的對(duì)象(可選)
2.2.1 interest集合
就像向Selector注冊(cè)通道一節(jié)中所描述的棚瘟,interest集合是你所選擇的感興趣的事件集合现斋。可以通過SelectionKey讀寫interest集合偎蘸,像這樣:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
可以看到庄蹋,用“位與”操作interest 集合和給定的SelectionKey常量,可以確定某個(gè)確定的事件是否在interest集合中迷雪。
2.2.2 ready集合
ready 集合是通道已經(jīng)準(zhǔn)備就緒的操作的集合限书。在一次選擇(Selection)之后,你會(huì)首先訪問這個(gè)ready set振乏。Selection將在下一小節(jié)進(jìn)行解釋蔗包。可以這樣訪問ready集合:
int readySet = selectionKey.readyOps();
可以用像檢測(cè)interest集合那樣的方法慧邮,來檢測(cè)channel中什么事件或操作已經(jīng)就緒调限。但是,也可以使用以下四個(gè)方法误澳,它們都會(huì)返回一個(gè)布爾類型:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
2.2.3 Channel + Selector
從SelectionKey訪問Channel和Selector很簡(jiǎn)單耻矮。如下:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
2.2.4 附加對(duì)象
可以將一個(gè)對(duì)象或者更多信息附著到SelectionKey上,這樣就能方便的識(shí)別某個(gè)給定的通道忆谓。例如裆装,可以附加 與通道一起使用的Buffer,或是包含聚集數(shù)據(jù)的某個(gè)對(duì)象倡缠。使用方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
還可以在用register()方法向Selector注冊(cè)Channel的時(shí)候附加對(duì)象哨免。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
2.3 通過Selector選擇通道
一旦向Selector注冊(cè)了一或多個(gè)通道,就可以調(diào)用幾個(gè)重載的select()方法昙沦。這些方法返回你所感興趣的事件(如連接琢唾、接受、讀或?qū)懀┮呀?jīng)準(zhǔn)備就緒的那些通道盾饮。換句話說采桃,如果你對(duì)“讀就緒”的通道感興趣懒熙,select()方法會(huì)返回讀事件已經(jīng)就緒的那些通道。
下面是select()方法:(該方法是阻塞方法)
int select()
int select(long timeout)
int selectNow()
select()阻塞到至少有一個(gè)通道在你注冊(cè)的事件上就緒了普办。
select(long timeout)和select()一樣工扎,除了最長會(huì)阻塞timeout毫秒(參數(shù))。
selectNow()不會(huì)阻塞衔蹲,不管什么通道就緒都立刻返回(譯者注:此方法執(zhí)行非阻塞的選擇操作肢娘。如果自從前一次選擇操作后,沒有通道變成可選擇的踪危,則此方法直接返回零蔬浙。)猪落。
select()方法返回的int值表示有多少通道已經(jīng)就緒贞远。亦即,自上次調(diào)用select()方法后有多少通道變成就緒狀態(tài)笨忌。如果調(diào)用select()方法蓝仲,因?yàn)橛幸粋€(gè)通道變成就緒狀態(tài),返回了1官疲,若再次調(diào)用select()方法袱结,如果另一個(gè)通道就緒了,它會(huì)再次返回1途凫。如果對(duì)第一個(gè)就緒的channel沒有做任何操作垢夹,現(xiàn)在就有兩個(gè)就緒的通道,但在每次select()方法調(diào)用之間维费,只有一個(gè)通道就緒了果元。
2.3.1 selectedKeys()
一旦調(diào)用了select()方法,并且返回值表明有一個(gè)或更多個(gè)通道就緒了犀盟,然后可以通過調(diào)用selector的selectedKeys()方法而晒,訪問“已選擇鍵集(selected key set)”中的就緒通道。如下所示:
Set selectedKeys = selector.selectedKeys();
當(dāng)像Selector注冊(cè)Channel時(shí)阅畴,Channel.register()方法會(huì)返回一個(gè)SelectionKey 對(duì)象倡怎。這個(gè)對(duì)象代表了注冊(cè)到該Selector的通道〖妫可以通過SelectionKey的selectedKeySet()方法訪問這些對(duì)象监署。
可以遍歷這個(gè)已選擇的鍵集合來訪問就緒的通道。如下:
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
這個(gè)循環(huán)遍歷已選擇鍵集中的每個(gè)鍵纽哥,并檢測(cè)各個(gè)鍵所對(duì)應(yīng)的通道的就緒事件钠乏。
注意每次迭代末尾的keyIterator.remove()調(diào)用。Selector不會(huì)自己從已選擇鍵集中移除SelectionKey實(shí)例昵仅。必須在處理完通道時(shí)自己移除缓熟。下次該通道變成就緒時(shí)累魔,Selector會(huì)再次將其放入已選擇鍵集中。
SelectionKey.channel()方法返回的通道需要轉(zhuǎn)型成你要處理的類型够滑,如ServerSocketChannel或SocketChannel等垦写。
wakeUp()
某個(gè)線程調(diào)用select()方法后阻塞了,即使沒有通道已經(jīng)就緒彰触,也有辦法讓其從select()方法返回梯投。只要讓其它線程在第一個(gè)線程調(diào)用select()方法的那個(gè)對(duì)象上調(diào)用Selector.wakeup()方法即可。阻塞在select()方法上的線程會(huì)立馬返回况毅。如果有其它線程調(diào)用了wakeup()方法分蓖,但當(dāng)前沒有線程阻塞在select()方法上,下個(gè)調(diào)用select()方法的線程會(huì)立即“醒來(wake up)”尔许。
close()
用完Selector后調(diào)用其close()方法會(huì)關(guān)閉該Selector么鹤,且使注冊(cè)到該Selector上的所有SelectionKey實(shí)例無效。通道本身并不會(huì)關(guān)閉味廊。
總結(jié):
1蒸甜、創(chuàng)建Selector,
2、創(chuàng)建Channel余佛,()
3柠新、向Selector中注冊(cè)Channel,及感興趣的事件,
4.1辉巡、等待注冊(cè)的事件到達(dá)恨憎,不然就一直等待 selector.select()
4.2、獲取selector中選中項(xiàng)的迭代器郊楣,選中的項(xiàng)為注冊(cè)的事件 Iterator ite = this.selector.selectedKeys().iterator();
4.3憔恳、針對(duì)選中的事件對(duì)感興趣的事件進(jìn)行處理。
5痢甘、要想快速理解selector過程喇嘱,結(jié)合代碼來學(xué)習(xí)。
2.3.2 server端實(shí)例代碼
package com.anders.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
// 通道管理器
private Selector selector;
public void initServer(int port) throws Exception {
// 獲得一個(gè)ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 設(shè)置通道為 非阻塞
serverChannel.configureBlocking(false);
// 將該通道對(duì)于的serverSocket綁定到port端口
serverChannel.socket().bind(new InetSocketAddress(port));
// 獲得一耳光通道管理器
this.selector = Selector.open();
// 將通道管理器和該通道綁定塞栅,并為該通道注冊(cè)selectionKey.OP_ACCEPT事件
// 注冊(cè)該事件后者铜,當(dāng)事件到達(dá)的時(shí)候,selector.select()會(huì)返回放椰,
// 如果事件沒有到達(dá)selector.select()會(huì)一直阻塞
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
// 采用輪訓(xùn)的方式監(jiān)聽selector上是否有需要處理的事件作烟,如果有,進(jìn)行處理
public void listen() throws Exception {
System.out.println("start server");
// 輪詢?cè)L問selector
while (true) {
// 當(dāng)注冊(cè)事件到達(dá)時(shí)砾医,方法返回拿撩,否則該方法會(huì)一直阻塞
selector.select();
// 獲得selector中選中的相的迭代器,選中的相為注冊(cè)的事件
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 刪除已選的key 以防重負(fù)處理
ite.remove();
// 客戶端請(qǐng)求連接事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 獲得和客戶端連接的通道
SocketChannel channel = server.accept();
// 設(shè)置成非阻塞
channel.configureBlocking(false);
// 在這里可以發(fā)送消息給客戶端
channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
// 在客戶端 連接成功之后如蚜,為了可以接收到客戶端的信息压恒,需要給通道設(shè)置讀的權(quán)限
channel.register(this.selector, SelectionKey.OP_READ);
// 獲得了可讀的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
// 處理 讀取客戶端發(fā)來的信息事件
private void read(SelectionKey key) throws Exception {
// 服務(wù)器可讀消息影暴,得到事件發(fā)生的socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 穿件讀取的緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("server receive from client: " + msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);
}
public static void main(String[] args) throws Throwable {
NIOServer server = new NIOServer();
server.initServer(8989);
server.listen();
}
}
2.3.3 client 實(shí)例代碼
package com.anders.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NIOClient {
// 通道管理器
private Selector selector;
/**
* * // 獲得一個(gè)Socket通道,并對(duì)該通道做一些初始化的工作 * @param ip 連接的服務(wù)器的ip // * @param port
* 連接的服務(wù)器的端口號(hào) * @throws IOException
*/
public void initClient(String ip, int port) throws IOException { // 獲得一個(gè)Socket通道
SocketChannel channel = SocketChannel.open(); // 設(shè)置通道為非阻塞
channel.configureBlocking(false); // 獲得一個(gè)通道管理器
this.selector = Selector.open(); // 客戶端連接服務(wù)器,其實(shí)方法執(zhí)行并沒有實(shí)現(xiàn)連接探赫,需要在listen()方法中調(diào)
// 用channel.finishConnect();才能完成連接
channel.connect(new InetSocketAddress(ip, port));
// 將通道管理器和該通道綁定型宙,并為該通道注冊(cè)SelectionKey.OP_CONNECT事件。
channel.register(selector, SelectionKey.OP_CONNECT);
}
/**
* * // 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件伦吠,如果有妆兑,則進(jìn)行處理 * @throws // IOException
* @throws Exception
*/
@SuppressWarnings("unchecked")
public void listen() throws Exception { // 輪詢?cè)L問selector
while (true) {
// 選擇一組可以進(jìn)行I/O操作的事件,放在selector中,客戶端的該方法不會(huì)阻塞毛仪,
// 這里和服務(wù)端的方法不一樣搁嗓,查看api注釋可以知道,當(dāng)至少一個(gè)通道被選中時(shí)箱靴,
// selector的wakeup方法被調(diào)用腺逛,方法返回,而對(duì)于客戶端來說刨晴,通道一直是被選中的
selector.select(); // 獲得selector中選中的項(xiàng)的迭代器
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next(); // 刪除已選的key,以防重復(fù)處理
ite.remove(); // 連接事件發(fā)生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel(); // 如果正在連接屉来,則完成連接
if (channel.isConnectionPending()) {
channel.finishConnect();
} // 設(shè)置成非阻塞
channel.configureBlocking(false);
// 在這里可以給服務(wù)端發(fā)送信息哦
channel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));
// 在和服務(wù)端連接成功之后路翻,為了可以接收到服務(wù)端的信息狈癞,需要給通道設(shè)置讀的權(quán)限。
channel.register(this.selector, SelectionKey.OP_READ); // 獲得了可讀的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
private void read(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
// 穿件讀取的緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("client receive msg from server:" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);
}
/**
* * // 啟動(dòng)客戶端測(cè)試 * @throws IOException
* @throws Exception
*/
public static void main(String[] args) throws Exception {
NIOClient client = new NIOClient();
client.initClient("localhost", 8989);
client.listen();
}
}