在第二章中用BIO編程模型缓呛,簡(jiǎn)單的實(shí)現(xiàn)了一個(gè)聊天室。但是其最大的問(wèn)題在解釋BIO時(shí)就已經(jīng)說(shuō)了:ServerSocket接收請(qǐng)求時(shí)(accept()方法)杭隙、InputStream哟绊、OutputStream(輸入輸出流的讀和寫)都是阻塞的。還有一個(gè)問(wèn)題就是線程池痰憎,線程多了票髓,服務(wù)器性能耗不起攀涵。線程少了,在聊天室這種場(chǎng)景下洽沟,讓用戶等待連接肯定不可取以故。今天要說(shuō)到的NIO編程模型就很好的解決了這幾個(gè)問(wèn)題。有兩個(gè)主要的替換地方:
1.用Channel代替Stream裆操。2.使用Selector監(jiān)控多條Channel怒详,起到類似線程池的作用,但是它只需一條線程踪区。
既然要用NIO編程模型昆烁,那就要說(shuō)說(shuō)它的三個(gè)主要核心:Selector、Channel缎岗、Buffer静尼。它們的關(guān)系是:一個(gè)Selector管理多個(gè)Channel,一個(gè)Channel可以往Buffer中寫入和讀取數(shù)據(jù)传泊。Buffer名叫緩沖區(qū)鼠渺,底層其實(shí)是一個(gè)數(shù)組,會(huì)提供一些方法往數(shù)組寫入讀取數(shù)據(jù)或渤。
Buffer:
不太了解Buffer的可以看看這個(gè):https://blog.csdn.net/czx2018/article/details/89502699
常用API:
allocate() - 初始化一塊緩沖區(qū)
put() - 向緩沖區(qū)寫入數(shù)據(jù)
get() - 向緩沖區(qū)讀數(shù)據(jù)
filp() - 將緩沖區(qū)的讀寫模式轉(zhuǎn)換
clear() - 這個(gè)并不是把緩沖區(qū)里的數(shù)據(jù)清除系冗,而是利用后來(lái)寫入的數(shù)據(jù)來(lái)覆蓋原來(lái)寫入的數(shù)據(jù),以達(dá)到類似清除了老的數(shù)據(jù)的效果
compact() - 從讀數(shù)據(jù)切換到寫模式薪鹦,數(shù)據(jù)不會(huì)被清空掌敬,會(huì)將所有未讀的數(shù)據(jù)copy到緩沖區(qū)頭部,后續(xù)寫數(shù)據(jù)不會(huì)覆蓋池磁,而是在這些數(shù)據(jù)之后寫數(shù)據(jù)
mark() - 對(duì)position做出標(biāo)記奔害,配合reset使用
reset() - 將position置為標(biāo)記值
簡(jiǎn)單地說(shuō):Buffer實(shí)質(zhì)上是個(gè)數(shù)組,有兩個(gè)關(guān)鍵的指針地熄,一個(gè)position代表當(dāng)前數(shù)據(jù)寫入到哪了华临、一個(gè)limit代表限制。初始化時(shí)設(shè)置了數(shù)組長(zhǎng)度端考,這limit就是數(shù)組的長(zhǎng)度雅潭。如:設(shè)置intBuffer.allocate(10),最大存儲(chǔ)10個(gè)int數(shù)據(jù)却特,寫入5五個(gè)數(shù)據(jù)后扶供,需要讀取數(shù)據(jù)了。用filp()轉(zhuǎn)換讀寫模式后裂明,limit=position椿浓,position=0。也就是說(shuō)從0開(kāi)始讀,只能讀到第五個(gè)扳碍。讀完后這個(gè)緩沖區(qū)就需要clear()了提岔,實(shí)際上并沒(méi)有真的去清空數(shù)據(jù),而是position和limit兩個(gè)指針又回到了初始化的位置笋敞,接著又可以寫入數(shù)據(jù)了碱蒙,反正數(shù)組下標(biāo)相同重新寫入數(shù)據(jù)會(huì)覆蓋,就沒(méi)必要真的去清空了夯巷。
Channel:
Channel(通道)主要用于傳輸數(shù)據(jù)振亮,然后從Buffer中寫入或讀取。它們兩個(gè)結(jié)合起來(lái)雖然和流有些相似鞭莽,但主要有以下幾點(diǎn)區(qū)別:
1.流是單向的,可以發(fā)現(xiàn)Stream的輸入流和輸出流是獨(dú)立的麸祷,它們只能輸入或輸出澎怒。而通道既可以讀也可以寫。
2.通道本身不能存放數(shù)據(jù)阶牍,只能借助Buffer喷面。
3.Channel支持異步。
Channel有如下三個(gè)常用的類:FileChannel走孽、SocketChannel惧辈、ServerSocketChannel。從名字也可以看出區(qū)別磕瓷,第一個(gè)是對(duì)文件數(shù)據(jù)的讀寫盒齿,后面兩個(gè)則是針對(duì)Socket和ServerSocket,這里我們只是用后面兩個(gè)困食。更詳細(xì)的用法可以看:https://www.cnblogs.com/snailclimb/p/9086335.html边翁,下面的代碼中也會(huì)用到,會(huì)有詳細(xì)的注釋硕盹。
Selector
多個(gè)Channel可以注冊(cè)到Selector符匾,就可以直接通過(guò)一個(gè)Selector管理多個(gè)通道。Channel在不同的時(shí)間或者不同的事件下有不同的狀態(tài)瘩例,Selector會(huì)通過(guò)輪詢來(lái)達(dá)到監(jiān)視的效果啊胶,如果查到Channel的狀態(tài)正好是我們注冊(cè)時(shí)聲明的所要監(jiān)視的狀態(tài),我們就可以查出這些通道垛贤,然后做相應(yīng)的處理焰坪。這些狀態(tài)如下:
1.客戶端的SocketChannel和服務(wù)器端建立連接,SocketChannel狀態(tài)就是Connect南吮。
2.服務(wù)器端的ServerSocketChannel接收了客戶端的請(qǐng)求琳彩,ServerSocketChannel狀態(tài)就是Accept。
3.當(dāng)SocketChannel有數(shù)據(jù)可讀,那么它們的狀態(tài)就是Read露乏。
4.當(dāng)我們需要向Channel中寫數(shù)據(jù)時(shí)碧浊,那么它們的狀態(tài)就是Write。
具體的使用見(jiàn)下面代碼注釋或看https://www.cnblogs.com/snailclimb/p/9086334.html
NIO編程模型
NIO編程模型工作流程:
1.首先會(huì)創(chuàng)建一個(gè)Selector瘟仿,用來(lái)監(jiān)視管理各個(gè)不同的Channel箱锐,也就是不同的客戶端。相當(dāng)于取代了原來(lái)BIO的線程池劳较,但是它只需一個(gè)線程就可以處理多個(gè)Channel驹止,沒(méi)有了線程上下文切換帶來(lái)的消耗,很好的優(yōu)化了性能观蜗。
2.創(chuàng)建一個(gè)ServerSocketChannel監(jiān)聽(tīng)通信端口臊恋,并注冊(cè)到Selector,讓Seletor監(jiān)視這個(gè)通道的Accept狀態(tài)墓捻,也就是接收客戶端請(qǐng)求的狀態(tài)抖仅。
3.此時(shí)客戶端ClientA請(qǐng)求服務(wù)器,那么Selector就知道了有客戶端請(qǐng)求進(jìn)來(lái)砖第。這時(shí)候我們可以得到客戶端的SocketChannel撤卢,并為這個(gè)通道注冊(cè)Read狀態(tài),也就是Selector會(huì)監(jiān)聽(tīng)ClientA發(fā)來(lái)的消息梧兼。
4.一旦接收到ClientA的消息放吩,就會(huì)用其他客戶端的SocketChannel的Write狀態(tài),向它們轉(zhuǎn)發(fā)ClientA的消息羽杰。
上代碼之前渡紫,還是先說(shuō)說(shuō)各個(gè)類的作用:
相比較BIO的代碼,NIO的代碼還少了一個(gè)類忽洛,那就是服務(wù)器端的工作線程類腻惠。沒(méi)了線程池,自然也不需要一個(gè)單獨(dú)的線程去服務(wù)客戶端欲虚〖啵客戶端還是需要一個(gè)單獨(dú)的線程去等待用戶輸入,因?yàn)橛脩綦S時(shí)都可能輸入信息复哆,這個(gè)沒(méi)法預(yù)見(jiàn)欣喧,只能阻塞式的等待。
ChatServer:服務(wù)器端的唯一的類梯找,作用就是通過(guò)Selector監(jiān)聽(tīng)Read和Accept事件唆阿,并針對(duì)這些事件的類型,進(jìn)行不同的處理锈锤,如連接驯鳖、轉(zhuǎn)發(fā)闲询。
ChatClient:客戶端,通過(guò)Selector監(jiān)聽(tīng)Read和Connect事件浅辙。Read事件就是獲取服務(wù)器轉(zhuǎn)發(fā)的消息然后顯示出來(lái)扭弧;Connect事件就是和服務(wù)器建立連接,建立成功后就可以發(fā)送消息记舆。
UserInputHandler:專門等待用戶輸入的線程鸽捻,和BIO沒(méi)區(qū)別。
ChatServer
public class ChatServer {
//設(shè)置緩沖區(qū)的大小泽腮,這里設(shè)置為1024個(gè)字節(jié)
private static final int BUFFER = 1024;
//Channel都要配合緩沖區(qū)進(jìn)行讀寫御蒲,所以這里創(chuàng)建一個(gè)讀緩沖區(qū)和一個(gè)寫緩沖區(qū)
//allocate()靜態(tài)方法就是設(shè)置緩存區(qū)大小的方法
private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
//為了監(jiān)聽(tīng)端口更靈活,再不寫死了诊赊,用一個(gè)構(gòu)造函數(shù)設(shè)置需要監(jiān)聽(tīng)的端口號(hào)
private int port;
public ChatServer(int port) {
this.port = port;
}
private void start() {
//創(chuàng)建ServerSocketChannel和Selector并打開(kāi)
try (ServerSocketChannel server = ServerSocketChannel.open(); Selector selector = Selector.open()) {
//【重點(diǎn),實(shí)現(xiàn)NIO編程模型的關(guān)鍵】configureBlocking設(shè)置ServerSocketChannel為非阻塞式調(diào)用,Channel默認(rèn)的是阻塞的調(diào)用方式
server.configureBlocking(false);
//綁定監(jiān)聽(tīng)端口,這里不是給ServerSocketChannel綁定厚满,而是給ServerSocket綁定,socket()就是獲取通道原生的ServerSocket或Socket
server.socket().bind(new InetSocketAddress(port));
//把server注冊(cè)到Selector并監(jiān)聽(tīng)Accept事件
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("啟動(dòng)服務(wù)器碧磅,監(jiān)聽(tīng)端口:" + port);
while (true) {
//select()會(huì)返回此時(shí)觸發(fā)了多少個(gè)Selector監(jiān)聽(tīng)的事件
if(selector.select()>0) {
//獲取這些已經(jīng)觸發(fā)的事件,selectedKeys()返回的是觸發(fā)事件的所有信息
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//循環(huán)處理這些事件
for (SelectionKey key : selectionKeys) {
handles(key, selector);
}
//處理完后清空selectedKeys痰滋,避免重復(fù)處理
selectionKeys.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//處理事件的方法
private void handles(SelectionKey key, Selector selector) throws IOException {
//當(dāng)觸發(fā)了Accept事件,也就是有客戶端請(qǐng)求進(jìn)來(lái)
if (key.isAcceptable()) {
//獲取ServerSocketChannel
ServerSocketChannel server = (ServerSocketChannel) key.channel();
//然后通過(guò)accept()方法接收客戶端的請(qǐng)求续崖,這個(gè)方法會(huì)返回客戶端的SocketChannel,這一步和原生的ServerSocket類似
SocketChannel client = server.accept();
client.configureBlocking(false);
//把客戶端的SocketChannel注冊(cè)到Selector团搞,并監(jiān)聽(tīng)Read事件
client.register(selector, SelectionKey.OP_READ);
System.out.println("客戶端[" + client.socket().getPort() + "]上線啦严望!");
}
//當(dāng)觸發(fā)了Read事件,也就是客戶端發(fā)來(lái)了消息
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
//獲取消息
String msg = receive(client);
System.out.println("客戶端[" + client.socket().getPort() + "]:" + msg);
//把消息轉(zhuǎn)發(fā)給其他客戶端
sendMessage(client, msg, selector);
//判斷用戶是否退出
if (msg.equals("quit")) {
//解除該事件的監(jiān)聽(tīng)
key.cancel();
//更新Selector
selector.wakeup();
System.out.println("客戶端[" + client.socket().getPort() + "]下線了逻恐!");
}
}
}
//編碼方式設(shè)置為utf-8像吻,下面字符和字符串互轉(zhuǎn)時(shí)用得到
private Charset charset = Charset.forName("UTF-8");
//接收消息的方法
private String receive(SocketChannel client) throws IOException {
//用緩沖區(qū)之前先清空一下,避免之前的信息殘留
read_buffer.clear();
//把通道里的信息讀取到緩沖區(qū),用while循環(huán)一直讀取复隆,直到讀完所有消息拨匆。因?yàn)闆](méi)有明確的類似\n這樣的結(jié)尾,所以要一直讀到?jīng)]有字節(jié)為止
while (client.read(read_buffer) > 0) ;
//把消息讀取到緩沖區(qū)后挽拂,需要轉(zhuǎn)換buffer的讀寫狀態(tài)惭每,不明白的看看前面的Buffer的講解
read_buffer.flip();
return String.valueOf(charset.decode(read_buffer));
}
//轉(zhuǎn)發(fā)消息的方法
private void sendMessage(SocketChannel client, String msg, Selector selector) throws IOException {
msg = "客戶端[" + client.socket().getPort() + "]:" + msg;
//獲取所有客戶端,keys()與前面的selectedKeys不同,這個(gè)是獲取所有已經(jīng)注冊(cè)的信息亏栈,而selectedKeys獲取的是觸發(fā)了的事件的信息
for (SelectionKey key : selector.keys()) {
//排除服務(wù)器和本客戶端并且保證key是有效的台腥,isValid()會(huì)判斷Selector監(jiān)聽(tīng)是否正常、對(duì)應(yīng)的通道是保持連接的狀態(tài)等
if (!(key.channel() instanceof ServerSocketChannel) && !client.equals(key.channel()) && key.isValid()) {
SocketChannel otherClient = (SocketChannel) key.channel();
write_buffer.clear();
write_buffer.put(charset.encode(msg));
write_buffer.flip();
//把消息寫入到緩沖區(qū)后绒北,再把緩沖區(qū)的內(nèi)容寫到客戶端對(duì)應(yīng)的通道中
while (write_buffer.hasRemaining()) {
otherClient.write(write_buffer);
}
}
}
}
public static void main(String[] args) {
new ChatServer(8888).start();
}
}
ChatClient
public class ChatClient {
private static final int BUFFER = 1024;
private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
//聲明成全局變量是為了方便下面一些工具方法的調(diào)用黎侈,就不用try with resource了
private SocketChannel client;
private Selector selector;
private Charset charset = Charset.forName("UTF-8");
private void start() {
try {
client=SocketChannel.open();
selector=Selector.open();
client.configureBlocking(false);
//注冊(cè)channel,并監(jiān)聽(tīng)SocketChannel的Connect事件
client.register(selector, SelectionKey.OP_CONNECT);
//請(qǐng)求服務(wù)器建立連接
client.connect(new InetSocketAddress("127.0.0.1", 8888));
//和服務(wù)器一樣闷游,不停的獲取觸發(fā)事件峻汉,并做相應(yīng)的處理
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
handle(key);
}
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
}catch (ClosedSelectorException e){
//當(dāng)用戶輸入quit時(shí)贴汪,在send()方法中,selector會(huì)被關(guān)閉休吠,而在上面的無(wú)限while循環(huán)中扳埂,可能會(huì)使用到已經(jīng)關(guān)閉了的selector。
//所以這里捕捉一下異常蛛碌,做正常退出處理就行了聂喇。不會(huì)對(duì)服務(wù)器造成影響
}
}
private void handle(SelectionKey key) throws IOException {
//當(dāng)觸發(fā)connect事件,也就是服務(wù)器和客戶端建立連接
if (key.isConnectable()) {
SocketChannel client = (SocketChannel) key.channel();
//finishConnect()返回true蔚携,說(shuō)明和服務(wù)器已經(jīng)建立連接希太。如果是false,說(shuō)明還在連接中酝蜒,還沒(méi)完全連接完成
if(client.finishConnect()){
//新建一個(gè)新線程去等待用戶輸入
new Thread(new UserInputHandler(this)).start();
}
//連接建立完成后誊辉,注冊(cè)read事件,開(kāi)始監(jiān)聽(tīng)服務(wù)器轉(zhuǎn)發(fā)的消息
client.register(selector,SelectionKey.OP_READ);
}
//當(dāng)觸發(fā)read事件亡脑,也就是獲取到服務(wù)器的轉(zhuǎn)發(fā)消息
if(key.isReadable()){
SocketChannel client = (SocketChannel) key.channel();
//獲取消息
String msg = receive(client);
System.out.println(msg);
//判斷用戶是否退出
if (msg.equals("quit")) {
//解除該事件的監(jiān)聽(tīng)
key.cancel();
//更新Selector
selector.wakeup();
}
}
}
//獲取消息
private String receive(SocketChannel client) throws IOException{
read_buffer.clear();
while (client.read(read_buffer)>0);
read_buffer.flip();
return String.valueOf(charset.decode(read_buffer));
}
//發(fā)送消息
public void send(String msg) throws IOException{
if(!msg.isEmpty()){
write_buffer.clear();
write_buffer.put(charset.encode(msg));
write_buffer.flip();
while (write_buffer.hasRemaining()){
client.write(write_buffer);
}
if(msg.equals("quit")){
selector.close();
}
}
}
public static void main(String[] args) {
new ChatClient().start();
}
}
UserInputHandler
public class UserInputHandler implements Runnable {
ChatClient client;
public UserInputHandler(ChatClient chatClient) {
this.client=chatClient;
}
@Override
public void run() {
BufferedReader read=new BufferedReader(
new InputStreamReader(System.in)
);
while (true){
try {
String input=read.readLine();
client.send(input);
if(input.equals("quit"))
break;
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
測(cè)試運(yùn)行:之前用的是win10的終端運(yùn)行的堕澄,以后直接用IDEA運(yùn)行,方便些霉咨。不過(guò)一個(gè)類同時(shí)運(yùn)行多個(gè)蛙紫,以實(shí)現(xiàn)多個(gè)客戶端的場(chǎng)景,需要先做以下設(shè)置
[圖片上傳中...(image-47ea3c-1641479374845-3)]
[圖片上傳中...(image-8c393-1641479374845-2)]
[圖片上傳中...(image-b9b40-1641479374845-1)]
[圖片上傳中...(image-de9dea-1641479374845-6)]
設(shè)置完后途戒,就可以同時(shí)運(yùn)行兩個(gè)ChatClient了坑傅,上圖中得Unnamed就是第二個(gè)ChatClient,選中后點(diǎn)擊右邊運(yùn)行按鈕就行了喷斋。效果如下:
[圖片上傳中...(image-f48630-1641479374845-0)]
[圖片上傳中...(image-6a67cf-1641479374845-5)]
[圖片上傳中...(image-9a7c45-1641479374845-4)]