NIO 優(yōu)勢(shì)在于使用了Selector/Channel,Selector可以根據(jù)注冊(cè)到其中的channel,判斷key,來(lái)執(zhí)行不同的事件饰剥。
服務(wù)端:
- 創(chuàng)建ServerSocketChannel, 并設(shè)置為非阻塞模式
- 綁定端口
- 獲取Selector,并將ServerSocketChannel注冊(cè)到Selector中
- 獲取客戶端讀取事件薛匪,將讀取的內(nèi)容分發(fā)到其他客戶端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* 群聊轉(zhuǎn)發(fā)
*/
public class Server {
private Selector selector;
private ServerSocketChannel ssChannel;
private static final int PORT = 8888;
public Server() {
try {
ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.bind(new InetSocketAddress(PORT));
selector = Selector.open();
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void listen() {
try {
while (selector.select() > 0) {
System.out.println("selector準(zhǔn)備就緒");
// 獲取當(dāng)前選擇器種所有注冊(cè)的“事件key”
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey sk = iterator.next();
if (sk.isAcceptable()) {
System.out.println("selector有數(shù)據(jù) isAcceptable:" + sk.isAcceptable());
// 準(zhǔn)備就緒捐川,則獲取客戶端連接
SocketChannel channel = ssChannel.accept();
// 切換為非阻塞
channel.configureBlocking(false);
// 將該通道注冊(cè)到選擇器上
channel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
System.out.println("selector有數(shù)據(jù) isReadable:" + sk.isReadable());
// 轉(zhuǎn)發(fā)
readClientData(sk);
}
// 移除該事件
iterator.remove();
}
}
} catch (Exception e) {
System.out.println("socket處理異常");
e.printStackTrace();
}
}
private void readClientData(SelectionKey sk) throws IOException {
SocketChannel sChannel = null;
try {
sChannel = (SocketChannel) sk.channel();
// 讀取
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = sChannel.read(buffer);
if (len > 0) {
buffer.flip();
// 轉(zhuǎn)發(fā)到其他客戶端
String msg = new String(buffer.array(), 0, len);
System.out.println("接收到客戶端消息"+ msg);
sendMsgToAll(msg, sChannel);
}
} catch (Exception e) {
try {
System.out.println("用戶下線" + sChannel.getRemoteAddress());
sk.cancel();
sChannel.close();
}catch (Exception ex) {
System.out.println("關(guān)閉客戶端");
}
e.printStackTrace();
}
}
private void sendMsgToAll(String buffer, SocketChannel sChannel) throws IOException {
// 獲取全部的在線channel
Set<SelectionKey> keys = selector.keys();
for (SelectionKey selectionKey : keys) {
// 獲取channel
Channel channel = selectionKey.channel();
if (channel instanceof SocketChannel && channel != sChannel) {
// 緩沖區(qū)
ByteBuffer msg = ByteBuffer.wrap(buffer.getBytes());
((SocketChannel)channel).write(msg);
}
}
}
public static void main(String[] args) throws IOException {
Server server = new Server();
server.listen();
}
}
客戶端:
- 獲取Selector,
- 獲取SocketChannel
- 將SocketChannel注冊(cè)到Selector
- selector監(jiān)聽(tīng)OP_READ事件
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class Client {
private Selector selector;
private SocketChannel sChannel;
private static final int PORT = 8888;
private String clientName = "client_1";
public Client() {
try {
selector = Selector.open();
sChannel = SocketChannel.open(new InetSocketAddress(PORT));
sChannel.configureBlocking(false);
sChannel.register(selector, SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "啟動(dòng)成功");
} catch (IOException e) {
e.printStackTrace();
}
}
public Client(String name) {
try {
clientName = name;
selector = Selector.open();
sChannel = SocketChannel.open(new InetSocketAddress(PORT));
sChannel.configureBlocking(false);
sChannel.register(selector, SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "啟動(dòng)成功");
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
Client client = new Client("client_2");
new Thread(new Runnable() {
@Override
public void run() {
try {
client.readInfo();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String str = scanner.nextLine();
client.sendMessage2Server(client.clientName, str);
}
}
private void sendMessage2Server(String clientName, String msg) throws IOException {
sChannel.write(ByteBuffer.wrap((clientName + ": " + msg).getBytes()));
}
private void readInfo() throws IOException {
while (selector.select() > 0) {
// 獲取當(dāng)前選擇器種所有注冊(cè)的“事件key”
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey sk = iterator.next();
if (sk.isReadable()) {
// 獲取選擇器上 讀就緒 的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
// 讀取
ByteBuffer buffer = ByteBuffer.allocate(1024);
sChannel.read(buffer);
System.out.println(new String(buffer.array()).trim());
}
// 移除該事件
iterator.remove();
}
}
}
}