本文詳細(xì)介紹組成非阻塞通信的幾大類:Buffer、Channel位衩、Selector梨熙、SelectionKey
非阻塞通信的流程
- ServerSocketChannel通過open方法獲取ServerSocketChannel,通過ServerSocketChannel設(shè)置為非阻塞模式,再通過ServerSocketChannel獲取socket巍糯,綁定服務(wù)進(jìn)程監(jiān)聽端口。服務(wù)啟動(dòng)成功客扎。
- 然后就是非阻塞通信的精髓了祟峦,Selector通過靜態(tài)的open()方法獲取到Selector,然后ServerSocketChannel注冊(cè)Selection.OP_ACCEPT事件到Selector上徙鱼。
- Selector就會(huì)監(jiān)控事件發(fā)生宅楞,Selector通過select()監(jiān)控已發(fā)生的SelectionKey對(duì)象的數(shù)目,通過selectKeys()方法返回對(duì)應(yīng)的selectionKey對(duì)象集合袱吆。遍歷該集合得到相應(yīng)的selectionKey對(duì)象厌衙,通過該對(duì)象的channel()方法獲取關(guān)聯(lián)的ServerSocketChannel對(duì)象, 通過selector()方法就可以獲取關(guān)聯(lián)的Selector對(duì)象绞绒。
- 通過上面獲取的ServerSocketChannel執(zhí)行accept()方法獲取SocketChannel婶希,再通過SocketChannel設(shè)置為非阻塞模式,在將SocketChannel注冊(cè)到上面創(chuàng)建的Selector上蓬衡,注冊(cè)
SelectionKey.OP_READ |SelectionKey.OP_WRITE
事件喻杈。 - Selector將在監(jiān)控對(duì)應(yīng)上面綁定的事件,監(jiān)控到對(duì)應(yīng)的事件的話執(zhí)行讀和寫的操作狰晚。
示例代碼:
上面描述了服務(wù)端非阻塞方式通信的一個(gè)流程筒饰,下面通過具體代碼實(shí)現(xiàn):
/**
* 非阻塞模式
*
*/
public class EchoServer2 {
private Selector selector = null;
private ServerSocketChannel serverSocketChannel = null;
private int port = 8001;
private Charset charset = Charset.forName("UTF-8");
public EchoServer2() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
//服務(wù)器重啟的時(shí)候,重用端口
serverSocketChannel.socket().setReuseAddress(true);
//設(shè)置非阻塞模式
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
System.out.println("服務(wù)器啟動(dòng)成功");
}
/**
* 服務(wù)方法
*/
public void service() throws IOException {
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set readyKes = selector.selectedKeys();
Iterator it = readyKes.iterator();
while (it.hasNext()) {
SelectionKey key = null;
try {
key = (SelectionKey) it.next();
it.remove();
if (key.isAcceptable()) {
System.out.println("連接事件");
//連接事件
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = ssc.accept();
System.out.println("接收到客戶連接壁晒,來自:" + socketChannel.socket().getInetAddress() +
" : " + socketChannel.socket().getPort());
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.register(selector, SelectionKey.OP_READ |
SelectionKey.OP_WRITE, buffer);
} else if (key.isReadable()) {
//接收數(shù)據(jù)
receive(key);
} else if (key.isWritable()) {
//發(fā)送數(shù)據(jù)
send(key);
}
} catch (IOException e) {
e.printStackTrace();
try {
if (key != null) {
key.cancel();
key.channel().close();
}
}catch (IOException ex){
ex.printStackTrace();
}
}
}
}
}
private void send(SelectionKey key) throws IOException {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
buffer.flip(); //把極限設(shè)置為位置瓷们,把位置設(shè)置為0
String data = decode(buffer);
if (data.indexOf("\r\n") == -1) {
return;
}
String outputData = data.substring(0, data.indexOf("\n") + 1);
System.out.println("請(qǐng)求數(shù)據(jù):" + outputData);
ByteBuffer outputBuffer = encode("echo:" + outputData);
while (outputBuffer.hasRemaining()) {
channel.write(outputBuffer);
}
ByteBuffer temp = encode(outputData);
buffer.position(temp.limit());
buffer.compact();
if (outputData.equals("bye\r\n")) {
key.cancel();
channel.close();
System.out.println("關(guān)閉與客戶的連接");
}
}
private String decode(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toString();
}
private ByteBuffer encode(String s) {
return charset.encode(s);
}
private void receive(SelectionKey key) throws IOException {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
buffer.limit(buffer.capacity());
buffer.put(readBuff);
}
public static void main(String[] args) throws IOException {
new EchoServer2().service();
}
}
/**
* 創(chuàng)建非阻塞客戶端
*
*/
public class EchoClient2 {
private SocketChannel socketChannel;
private int port = 8001;
private Selector selector;
private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
private Charset charset = Charset.forName("UTF-8");
public EchoClient2() throws IOException {
socketChannel = SocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
socketChannel.connect(inetSocketAddress);//
socketChannel.configureBlocking(false);//設(shè)置為非阻塞模式
System.out.println("與服務(wù)器連接成功");
selector = Selector.open();
}
public static void main(String[] args) throws IOException {
final EchoClient2 client = new EchoClient2();
Thread receiver = new Thread(new Runnable() {
@Override
public void run() {
client.receiveFromUser();
}
});
receiver.start();
client.talk();
}
private void receiveFromUser() {
try {
System.out.println("請(qǐng)輸入數(shù)據(jù):");
BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
String msg = null;
while ((msg = localReader.readLine()) != null) {
System.out.println("用戶輸入的數(shù)據(jù):" + msg);
synchronized (sendBuffer) {
sendBuffer.put(encode(msg + "\r\n"));
}
if (msg.equalsIgnoreCase("bye")) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private ByteBuffer encode(String s) {
return charset.encode(s);
}
private void talk() throws IOException {
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = null;
try {
key = it.next();
it.remove();
if (key.isReadable()) {
//System.out.println("讀事件");
//讀事件
receive(key);
}
if (key.isWritable()) {
// System.out.println("寫事件");
//寫事件
send(key);
}
} catch (IOException e) {
e.printStackTrace();
if (key != null) {
key.cancel();
key.channel().close();
}
}
}
}
}
private void send(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
synchronized (sendBuffer) {
sendBuffer.flip();//把極限設(shè)為位置,把位置設(shè)為零
channel.write(sendBuffer);
sendBuffer.compact();//刪除已經(jīng)發(fā)送的數(shù)據(jù)秒咐。
}
}
private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
channel.read(receiveBuffer);
receiveBuffer.flip();//將limit的值設(shè)置為position的值谬晕,將position的值設(shè)置為0
String receiveData = decode(receiveBuffer);
if (receiveData.indexOf("\n") == -1) {
return;
}
String outputData = receiveData.substring(0, receiveData.indexOf("\n") + 1);
System.out.println("響應(yīng)數(shù)據(jù):" + outputData);
if (outputData.equalsIgnoreCase("echo:bye\r\n")) {
key.cancel();
socketChannel.close();
;
System.out.println("關(guān)閉與服務(wù)器的連接");
selector.close();
System.exit(0);
}
ByteBuffer temp = encode(outputData);
receiveBuffer.position(temp.limit());
receiveBuffer.compact();//刪除已經(jīng)打印的數(shù)據(jù)
}
private String decode(ByteBuffer receiveBuffer) {
CharBuffer buffer = charset.decode(receiveBuffer);
return buffer.toString();
}
}
實(shí)現(xiàn)非阻塞通信的方式
- 緩沖區(qū)
- 通道
- Selector
緩沖區(qū)
作用:減少物理讀寫次數(shù),減少內(nèi)存創(chuàng)建和銷毀次數(shù)携取。 緩沖區(qū)的屬性:capacity(最大容量)攒钳、limit(實(shí)際容量)、position(當(dāng)前位置)歹茶。<font color='red'><b>PS:</b></font>其他地方是翻譯成capacity(容量)夕玩、limit(極限)、position位置)惊豺,我個(gè)人覺得翻譯成上面的更好理解燎孟,為啥通過下面的方法解析和圖解就可明白。當(dāng)然最好通過英文表達(dá)這樣最清楚尸昧。
三個(gè)屬性的關(guān)系為:capacity≥limit≥position≥0
圖解關(guān)系如下:
緩沖區(qū)類結(jié)構(gòu):
java.nio.ByteBuffer類是一個(gè)抽象類揩页,不能被實(shí)例化。但是提供了8個(gè)具體的實(shí)現(xiàn)類烹俗,其中最基本的的緩沖區(qū)是ByteBuffer爆侣,它存放的數(shù)據(jù)單元是字節(jié)。
常用方法:
clear():把limit設(shè)置為capacity幢妄,再把位置設(shè)為0
flip():把limit設(shè)置為position兔仰,再把位置設(shè)置為0。
rewind():不改變limit蕉鸳,把位置設(shè)為0乎赴。
allocate():創(chuàng)建一個(gè)緩沖中,方法參數(shù)指定緩沖區(qū)大小
compact():將緩沖區(qū)的當(dāng)前位置和界限之間的字節(jié)(如果有)復(fù)制到緩沖區(qū)的開始處潮尝。
測(cè)試上述方法:
測(cè)試clear()方法
@Test
public void testClear() {
//創(chuàng)建一個(gè)10chars大小的緩沖區(qū)榕吼,默認(rèn)情況下limit和capacity是相等的
CharBuffer buffer = CharBuffer.allocate(10);
System.out.println("創(chuàng)建默認(rèn)情況");
printBufferInfo(buffer);
buffer.limit(8);//修改limit的值
System.out.println("修改limit后");
printBufferInfo(buffer);
// clear():把limit設(shè)置為capacity,再把位置設(shè)為0
buffer.clear();
System.out.println("執(zhí)行clear()方法后");
printBufferInfo(buffer);
}
執(zhí)行結(jié)果如下:
測(cè)試flip()方法:
@Test
public void testFlip() {
CharBuffer buffer = CharBuffer.allocate(10);
System.out.println("創(chuàng)建默認(rèn)情況");
printBufferInfo(buffer);
//put的方法會(huì)修改position的值
buffer.put('H');
buffer.put('E');
buffer.put('L');
buffer.put('L');
buffer.put('O');
System.out.println("調(diào)用put方法后:");
printBufferInfo(buffer);
//flip():把limit設(shè)置為position勉失,再把位置設(shè)置為0羹蚣。
buffer.flip();
System.out.println("調(diào)用flip方法后:");
printBufferInfo(buffer);
}
執(zhí)行結(jié)果如下:
測(cè)試rewind()方法
@Test
public void testRewind() {
CharBuffer buffer = CharBuffer.allocate(10);
System.out.println("創(chuàng)建默認(rèn)情況");
printBufferInfo(buffer);
//put的方法會(huì)修改position的值
buffer.put('H');
buffer.put('E');
buffer.put('L');
buffer.put('L');
buffer.put('O');
buffer.limit(8);
System.out.println("調(diào)用put、limit方法后:");
printBufferInfo(buffer);
//rewind():不改變limit乱凿,把位置設(shè)為0顽素。
buffer.rewind();
System.out.println("調(diào)用rewind方法后:");
printBufferInfo(buffer);
}
執(zhí)行結(jié)果如下:
測(cè)試compact()方法
@Test
public void testCompact(){
CharBuffer buffer = CharBuffer.allocate(10);
System.out.println("創(chuàng)建默認(rèn)情況");
printBufferInfo(buffer);
//put的方法會(huì)修改position的值
buffer.put('H');
buffer.put('E');
buffer.put('L');
buffer.put('L');
buffer.put('O');
buffer.limit(8);//修改limit的值
System.out.println("調(diào)用put和limit方法后:");
printBufferInfo(buffer);
System.out.println("調(diào)用compact方法后:");
//將緩沖區(qū)的當(dāng)前位置和界限之間的字節(jié)(如果有)復(fù)制到緩沖區(qū)的開始處。
buffer.compact();
printBufferInfo(buffer);
}
這是JDK中介紹該方法的作用:
將緩沖區(qū)的當(dāng)前位置和界限之間的字節(jié)(如果有)復(fù)制到緩沖區(qū)的開始處告匠。即將索引 p = position() 處的字節(jié)復(fù)制到索引 0 處戈抄,將索引 p + 1 處的字節(jié)復(fù)制到索引 1 處,依此類推后专,直到將索引 limit() - 1 處的字節(jié)復(fù)制到索引 n = limit() - 1 - p 處划鸽。然后將緩沖區(qū)的位置設(shè)置為 n+1,并將其界限設(shè)置為其容量戚哎。如果已定義了標(biāo)記裸诽,則丟棄它。
官方表示的太難理解了:
將緩沖區(qū)的當(dāng)前位置和界限之間的字節(jié)(如果有)復(fù)制到緩沖區(qū)的開始處型凳。并將limit(實(shí)際容量)設(shè)置為 capacity(最大容量)丈冬。執(zhí)行compact()方法前,limit的值是:8甘畅,position的值是:5埂蕊。按照上面描述的執(zhí)行完compact()后往弓,position的值計(jì)算方式是:n+1;n=limit-1-p;所有n=8-1-5=2,最后position的值為:2+1=3。和程序運(yùn)行的結(jié)果一致蓄氧。
可以在這種情況:從緩沖區(qū)寫入數(shù)據(jù)之后調(diào)用此方法函似,以防寫入不完整。
buf.clear(); // Prepare buffer for use
while (in.read(buf) >= 0 || buf.position != 0) {
buf.flip();
out.write(buf);
buf.compact(); // In case of partial write
}
如果out.write()方法沒有將緩存中的數(shù)據(jù)讀取完喉童,這個(gè)時(shí)候的position位置指向的是剩余數(shù)據(jù)的位置撇寞。達(dá)到防止寫入不完整。
通道
作用: 連接緩沖區(qū)與數(shù)據(jù)源或數(shù)據(jù)目的地堂氯。
常用類:
Channel
接口有下面兩個(gè)子接口ReadableByteChannel和WritableByteChannel和一個(gè)抽象實(shí)現(xiàn)類SelectableChannel蔑担。
在ReadableByteChannel接口中申明了read(ByteBuffer
dst)方法。在WritableByteChannel接口中申明了write(ByteBuffer[]
srcs):方法咽白。SelectableChannel抽象類中主要方法啤握,configureBlocking(boolean
block)、register();方法晶框。 ByteChannel
接口繼承了ReadableChannel和WritableChannel恨统。所以ByteChannel具有讀和寫的功能。ServerSocketChannel繼承了SelectableChannel類抽象類三妈,所以SocketChannel具有設(shè)置是否是阻塞模式畜埋、向selector注冊(cè)事件功能。
SocketChannel也繼承了SelectableChannel類還實(shí)現(xiàn)ByteChannel接口畴蒲,所以SocketChannel具有設(shè)置是否是阻塞模式悠鞍、向selector注冊(cè)事件、從緩沖區(qū)讀寫數(shù)據(jù)的功能模燥。
通過類圖展現(xiàn):
Selector類:
作用:只要ServerSocketChannel及SocketChannel向Selector注冊(cè)了特定的事件咖祭,Selector就會(huì)監(jiān)聽這些事件的發(fā)生。
流程:
Selector通過靜態(tài)的open()方法創(chuàng)建一個(gè)Selector對(duì)象蔫骂,SelectableChannel類向Selector注冊(cè)了特定的事件么翰。Selector就會(huì)監(jiān)控這些事件發(fā)生,Selector通過select()監(jiān)控已發(fā)生的SelectionKey對(duì)象的數(shù)目辽旋,通過selectKeys()方法返回對(duì)應(yīng)的selectionKey對(duì)象集合浩嫌。遍歷該集合得到相應(yīng)的selectionKey對(duì)象,通過該對(duì)象的channel()方法獲取關(guān)聯(lián)的SelectableChannel對(duì)象补胚,
通過selector()方法就可以獲取關(guān)聯(lián)的Selector對(duì)象码耐。
<font color='red'><b>Note:</b></font>
當(dāng)Selector的select()方法還有一個(gè)重載方式:select(long timeout)。并且該方法采用阻塞的工作方式溶其,如果相關(guān)事件的selectionKey對(duì)象的數(shù)目一個(gè)也沒有骚腥,就進(jìn)入阻塞狀態(tài)。知道出現(xiàn)以下情況之一瓶逃,才從select()方法中返回束铭。
- 至少有一個(gè)SelectionKey的相關(guān)事件已經(jīng)發(fā)生廓块。
- 其他線程調(diào)用了Selector的wakeup()方法,導(dǎo)致執(zhí)行select()方法的線程立即返回契沫。
- 當(dāng)前執(zhí)行的select()方法的線程被中斷剿骨。
- 超出了等待時(shí)間。僅限調(diào)用select(long timeout)方法時(shí)出現(xiàn)埠褪。如果沒有設(shè)置超時(shí)時(shí)間,則永遠(yuǎn)不會(huì)超時(shí)挤庇。
Selector類有兩個(gè)非常重要的方法: 靜態(tài)方法open()钞速,這是Selector的靜態(tài)工廠方法,創(chuàng)建一個(gè)Selector對(duì)象嫡秕。
selectedKeys()方法返回被Selector捕獲的SelectionKey的集合渴语。
SelectionKey類
作用:
ServerSocketChannel或SocketChannel通過register()方法向Selector注冊(cè)事件時(shí),register()方法會(huì)創(chuàng)建一個(gè)SelectionKey對(duì)象昆咽,該對(duì)象是用來跟蹤注冊(cè)事件的句柄驾凶。在SelectionKey對(duì)象的有效期間,Selector會(huì)一直監(jiān)控與SelectionKey對(duì)象相關(guān)的事件掷酗,如果事件發(fā)生调违,就會(huì)把SelectionKey對(duì)象添加到Selected-keys集合中。SelectionKey中定義的事件: 定義了4種事件:
1泻轰、SelectionKey.OP_ACCEPT:接收連接就緒事件技肩,表示服務(wù)器監(jiān)聽到了客戶連接,服務(wù)器可以接收這個(gè)連接了浮声。常量值為16.
2虚婿、SelectionKey.OP_CONNECT:連接就緒事件,表示客戶與服務(wù)器的連接已經(jīng)建立成功泳挥。常量值為8.
3然痊、SelectionKey.OP_READ:讀就緒事件,表示通道中已經(jīng)有了可讀數(shù)據(jù)可以執(zhí)行讀操作屉符。常量值為1.
4剧浸、SelectionKey.OP_WRITE:寫就緒事件,表示已經(jīng)可以向通道寫數(shù)據(jù)了矗钟。常量值為4.常用方法:
channel()方法:返回與它關(guān)聯(lián)的SelectedChannel(包括ServerSocketChannel和SocketChannel)辛蚊。
selector()方法:返回與它關(guān)聯(lián)的Selector對(duì)象。
它們之間的關(guān)系如下:
<a >視頻和電子版書籍</a>
歡迎關(guān)注微信公眾號(hào) 在路上的coder
每天分享優(yōu)秀的Java技術(shù)文章真仲!
掃描二維碼關(guān)注: