最后介紹一下Selector主卫,選擇器提供選擇執(zhí)行已經(jīng)就緒的任務(wù)的能力墨坚,這使得多元I/O成為了可能殴泰,就緒執(zhí)行和多元選擇使得單線程能夠有效地同時(shí)管理多個(gè)I/O通道隔躲。選擇器的執(zhí)行主要分為以下幾個(gè)步驟:
1芒涡、創(chuàng)建一個(gè)或者多個(gè)可選擇的通道(SelectableChannel)
2柴灯、將這些創(chuàng)建的通道注冊到選擇器對象中
3卖漫、選擇器會記住開發(fā)者關(guān)心的通道,它們也會追蹤對應(yīng)的通道是否已經(jīng)就緒
4弛槐、開發(fā)者調(diào)用一個(gè)選擇器對象的select()方法懊亡,當(dāng)方法從阻塞狀態(tài)返回時(shí),選擇鍵會被更新
5乎串、獲取選擇鍵的集合店枣,找到當(dāng)時(shí)已經(jīng)就緒的通道,通過遍歷這些鍵叹誉,開發(fā)者可以選擇對已就緒的通道要做的操作
選擇器
選擇器的作用是管理了被注冊的通道集合和它們的就緒狀態(tài)鸯两,假設(shè)我們有四個(gè)Socket通道的選擇器,可以通過下面方式創(chuàng)建:
Selector selector = Selector.open();
channel1.register(selector, SelectionKey.OP_READ);
channel2.register(selector, SelectionKey.OP_WRITE);
channel3.register(selector, SelectionKey.OP_READ | OP_WRITE);
channel4.register(selector, SelectionKey.OP_READ | OP_ACCEPT);
ready = selector.select(10000);
select()方法在將線程置于睡眠狀態(tài)直到這些感興趣的事件中的一個(gè)發(fā)生或者10秒鐘過去长豁,這就是所謂的事件驅(qū)動(dòng)钧唐。
通道是調(diào)用register方法注冊到選擇器上的,從代碼里面可以看到register()方法接受一個(gè)Selector對象作為參數(shù)匠襟,以及一個(gè)名為ops的整數(shù)型參數(shù)钝侠,第二個(gè)參數(shù)表示關(guān)心的通道操作。有四種被定義的可選擇操作:讀(read)酸舍、寫(write)帅韧、連接(connect)和接受(accept)。
注意并非所有的操作都在所有的可選擇通道上被支持啃勉,例如SocketChannel就不支持accept忽舟。
選擇鍵
一個(gè)鍵表示一個(gè)特定的通道對象和一個(gè)特定的選擇器對象之間的注冊關(guān)系。
public abstract class SelectionKey
{
public static final int OP_READ;
public static final int OP_WRITE;
public static final int OP_CONNECT;
public static final int OP_ACCEPT;
public abstract SelectableChannel channel();
public abstract Selector selector();
public abstract void cancel();
public abstract boolean isValid();
public abstract int interestOps();
public abstract void iterestOps(int ops);
public abstract int readyOps();
public final boolean isReadable();
public final boolean isWritable();
public final boolean isConnectable();
public final boolean isAcceptable();
public final Object attach(Object ob);
public final Object attachment();
}
選擇器維護(hù)著注冊過的通道的集合淮阐,并且這些注冊關(guān)系中的任意一個(gè)都是封裝在SelectionKey對象中的叮阅。每一個(gè)Selector對象維護(hù)三種鍵的集合:
public abstract class Selector
{
...
public abstract Set keys();
public abstract Set selectedKeys();
public abstract int select() throws IOException;
public abstract int select(long timeout) throws IOException;
public abstract int selectNow() throws IOException;
public abstract void wakeup();
...
}
已注冊的鍵的集合(Registered key set)
與選擇器關(guān)聯(lián)的已經(jīng)注冊的鍵的集合,并不是所有注冊過的鍵都有效泣特,這個(gè)集合通過keys()方法返回浩姥,并且可能是空的。這些鍵的集合是不可以直接修改的状您,試圖這么做將引發(fā)java.lang.UnsupportedOperationException及刻。
已選擇的鍵的集合(Selected key set)
已注冊的鍵的集合的子集,這個(gè)集合的每個(gè)成員都是相關(guān)的通道被選擇器判斷為已經(jīng)準(zhǔn)備好的并且包含于鍵的interest集合中的操作竞阐。這個(gè)集合通過selectedKeys()方法返回(有可能是空的)缴饭。鍵可以直接從這個(gè)集合中移除,但不能添加骆莹。試圖向已選擇的鍵的集合中添加元素將拋java.lang.UnsupportedOperationException颗搂。
已取消的鍵的集合(Cancelled key set)
已注冊的鍵的集合的子集,這個(gè)集合包含了cancel()方法被調(diào)用過的鍵(這個(gè)鍵已經(jīng)被無效化)幕垦,但它們還沒有被注銷丢氢。這個(gè)集合是選擇器對象的私有成員傅联,因而無法直接訪問。
下面結(jié)合之前的Channel和Buffer疚察,看一下如何寫和使用選擇器實(shí)現(xiàn)服務(wù)端Socket數(shù)據(jù)接收的程序蒸走。
服務(wù)端
1 public class SelectorServer
2 {
3 private static int PORT = 1234;
4
5 public static void main(String[] args) throws Exception
6 {
7 // 先確定端口號
8 int port = PORT;
9 if (args != null && args.length > 0)
10 {
11 port = Integer.parseInt(args[0]);
12 }
13 // 打開一個(gè)ServerSocketChannel
14 ServerSocketChannel ssc = ServerSocketChannel.open();
15 // 獲取ServerSocketChannel綁定的Socket
16 ServerSocket ss = ssc.socket();
17 // 設(shè)置ServerSocket監(jiān)聽的端口
18 ss.bind(new InetSocketAddress(port));
19 // 設(shè)置ServerSocketChannel為非阻塞模式
20 ssc.configureBlocking(false);
21 // 打開一個(gè)選擇器
22 Selector selector = Selector.open();
23 // 將ServerSocketChannel注冊到選擇器上去并監(jiān)聽accept事件
24 ssc.register(selector, SelectionKey.OP_ACCEPT);
25 while (true)
26 {
27 // 這里會發(fā)生阻塞,等待就緒的通道
28 int n = selector.select();
29 // 沒有就緒的通道則什么也不做
30 if (n == 0)
31 {
32 continue;
33 }
34 // 獲取SelectionKeys上已經(jīng)就緒的通道的集合
35 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
36 // 遍歷每一個(gè)Key
37 while (iterator.hasNext())
38 {
39 SelectionKey sk = iterator.next();
40 // 通道上是否有可接受的連接
41 if (sk.isAcceptable())
42 {
43 ServerSocketChannel ssc1 = (ServerSocketChannel)sk.channel();
44 SocketChannel sc = ssc1.accept();
45 sc.configureBlocking(false);
46 sc.register(selector, SelectionKey.OP_READ);
47 }
48 // 通道上是否有數(shù)據(jù)可讀
49 else if (sk.isReadable())
50 {
51 readDataFromSocket(sk);
52 }
53 iterator.remove();
54 }
55 }
56 }
57
58 private static ByteBuffer bb = ByteBuffer.allocate(1024);
59
60 // 從通道中讀取數(shù)據(jù)
61 protected static void readDataFromSocket(SelectionKey sk) throws Exception
62 {
63 SocketChannel sc = (SocketChannel)sk.channel();
64 bb.clear();
65 while (sc.read(bb) > 0)
66 {
67 bb.flip();
68 while (bb.hasRemaining())
69 {
70 System.out.print((char)bb.get());
71 }
72 System.out.println();
73 bb.clear();
74 }
75 }
76 }
滿足isAcceptable()則表示該通道上有數(shù)據(jù)到來了貌嫡,此時(shí)我們做的事情不是獲取該通道—>創(chuàng)建一個(gè)線程來讀取該通道上的數(shù)據(jù)比驻,這么做就和BIO沒有區(qū)別了。我們做的事情只是簡單地將對應(yīng)的SocketChannel注冊到選擇器上岛抄,通過傳入OP_READ標(biāo)記别惦,告訴選擇器我們關(guān)心新的Socket通道什么時(shí)候可以準(zhǔn)備好讀數(shù)據(jù)。
滿足isReadable()則表示新注冊的Socket通道已經(jīng)可以讀取數(shù)據(jù)了夫椭,此時(shí)調(diào)用readDataFromSocket方法讀取SocketChannel中的數(shù)據(jù)掸掸。
客戶端
選擇器客戶端的代碼,沒什么要求蹭秋,只要向服務(wù)器端發(fā)送數(shù)據(jù)就可以了扰付。
1 public class SelectorClient
2 {
3 private static final String STR = "Hello World!";
4 private static final String REMOTE_IP = "127.0.0.1";
5 private static final int THREAD_COUNT = 5;
6
7 private static class NonBlockingSocketThread extends Thread
8 {
9 public void run()
10 {
11 try
12 {
13 int port = 1234;
14 SocketChannel sc = SocketChannel.open();
15 sc.configureBlocking(false);
16 sc.connect(new InetSocketAddress(REMOTE_IP, port));
17 while (!sc.finishConnect())
18 {
19 System.out.println("同" + REMOTE_IP + "的連接正在建立,請稍等仁讨!");
20 Thread.sleep(10);
21 }
22 System.out.println("連接已建立悯周,待寫入內(nèi)容至指定ip+端口!時(shí)間為" + System.currentTimeMillis());
23 String writeStr = STR + this.getName();
24 ByteBuffer bb = ByteBuffer.allocate(writeStr.length());
25 bb.put(writeStr.getBytes());
26 bb.flip(); // 寫緩沖區(qū)的數(shù)據(jù)之前一定要先反轉(zhuǎn)(flip)
27 sc.write(bb);
28 bb.clear();
29 sc.close();
30 }
31 catch (IOException e)
32 {
33 e.printStackTrace();
34 }
35 catch (InterruptedException e)
36 {
37 e.printStackTrace();
38 }
39 }
40 }
41
42 public static void main(String[] args) throws Exception
43 {
44 NonBlockingSocketThread[] nbsts = new NonBlockingSocketThread[THREAD_COUNT];
45 for (int i = 0; i < THREAD_COUNT; i++)
46 nbsts[i] = new NonBlockingSocketThread();
47 for (int i = 0; i < THREAD_COUNT; i++)
48 nbsts[i].start();
49 // 一定要join保證線程代碼先于sc.close()運(yùn)行陪竿,否則會有AsynchronousCloseException
50 for (int i = 0; i < THREAD_COUNT; i++)
51 nbsts[i].join();
52 }
53 }