前言
在上一篇博客中,我們使用了BIO,也就是同步阻塞式IO實(shí)現(xiàn)了Socket通信蔗怠。
Java Socket編程那些事(1)
現(xiàn)在我們使用jdk1.4之后的NIO來實(shí)現(xiàn)墩弯,NIO(new io / no-blocking io),同步非阻塞IO寞射。
基本原理
服務(wù)端打開一個(gè)通道(ServerSocketChannel)渔工,并向通道中注冊一個(gè)選擇器(Selector),這個(gè)選擇器是與一些感興趣的操作的標(biāo)識(SelectionKey桥温,即通過這個(gè)標(biāo)識可以定位到具體的操作引矩,從而進(jìn)行響應(yīng)的處理)相關(guān)聯(lián)的,然后基于選擇器(Selector)輪詢通道(ServerSocketChannel)上注冊的事件侵浸,并進(jìn)行相應(yīng)的處理旺韭。
客戶端在請求與服務(wù)端通信時(shí),也可以向服務(wù)器端一樣注冊(比服務(wù)端少了一個(gè)SelectionKey.OP_ACCEPT操作集合)掏觉,并通過輪詢來處理指定的事件区端,而不必阻塞。
服務(wù)端
package com.richstonedt.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
* @author zhangguoji
* @date 2017/9/8 20:47
*/
public class NIOServer {
private Selector selector;
/**
* 獲得一個(gè)ServerSocket通道履腋,并對該通道做一些初始化的工作
* @param port 綁定的端口號
* @throws IOException
*/
public void initServer(int port) throws IOException {
// 獲得一個(gè)ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 設(shè)置通道為非阻塞
serverChannel.configureBlocking(false);
// 將該通道對應(yīng)的ServerSocket綁定到本地port端口
serverChannel.socket().bind(new InetSocketAddress(port));
// 獲得一個(gè)通道管理器
this.selector = Selector.open();
//將通道管理器和該通道綁定珊燎,并為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件后,
//當(dāng)該事件到達(dá)時(shí)遵湖,selector.select()會(huì)返回悔政,如果該事件沒到達(dá)selector.select()會(huì)一直阻塞。
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
/**
* 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件延旧,如果有谋国,則進(jìn)行處理
*
* @throws IOException
*/
public void listen() throws IOException {
System.out.println("服務(wù)端啟動(dòng)成功!");
// 輪詢訪問selector
while (true) {
//當(dāng)注冊的事件到達(dá)時(shí)迁沫,方法返回芦瘾;否則,該方法會(huì)一直阻塞
selector.select();
// 獲得selector中選中的項(xiàng)的迭代器,選中的項(xiàng)為注冊的事件
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 刪除已選的key,以防重復(fù)處理
iterator.remove();
// 客戶端請求連接事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
// 獲得和客戶端連接的通道
SocketChannel channel = server.accept();
// 設(shè)置成非阻塞
channel.configureBlocking(false);
//在這里可以給客戶端發(fā)送信息
channel.write(ByteBuffer.wrap("向客戶端發(fā)送了一條信息".getBytes()));
//在和客戶端連接成功之后集畅,為了可以接收到客戶端的信息近弟,需要給通道設(shè)置讀的權(quán)限。
channel.register(this.selector, SelectionKey.OP_READ);
// 獲得了可讀的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
/**
* 處理讀取客戶端發(fā)來的信息 的事件
* @param key
* @throws IOException
*/
public void read(SelectionKey key) throws IOException {
// 服務(wù)器可讀取消息:得到事件發(fā)生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 創(chuàng)建讀取的緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(512);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("服務(wù)端收到信息:" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);// 將消息回送給客戶端
}
/**
* 啟動(dòng)服務(wù)端測試
*
* @throws IOException
*/
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8000);
server.listen();
}
}
服務(wù)端連接過程
1挺智、創(chuàng)建ServerSocketChannel實(shí)例serverSocketChannel祷愉,并bind到指定端口。
2赦颇、創(chuàng)建Selector實(shí)例selector二鳄;
3、將serverSocketChannel注冊到selector媒怯,并指定事件OP_ACCEPT订讼。
4、while循環(huán)執(zhí)行:
4.1扇苞、調(diào)用select方法欺殿,該方法會(huì)阻塞等待寄纵,直到有一個(gè)或多個(gè)通道準(zhǔn)備好了I/O操作或等待超時(shí)。
4.2祈餐、獲取選取的鍵列表擂啥;
4.3、循環(huán)鍵集中的每個(gè)鍵:
4.3.a帆阳、獲取通道,并從鍵中獲取附件(如果添加了附件)屋吨;
4.3.b蜒谤、確定準(zhǔn)備就緒的操縱并執(zhí)行,如果是accept操作至扰,將接收的信道設(shè)置為非阻塞模式鳍徽,并注冊到選擇器;
4.3.c敢课、如果需要,修改鍵的興趣操作集;
4.3.d筝野、從已選鍵集中移除鍵
客戶端
package com.richstonedt.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* NIO客戶端
*
* @author zhangguoji
* @date 2017/9/8 21:43
*/
public class NIOClient {
//通道管理器
private Selector selector;
/**
* 獲得一個(gè)Socket通道焕议,并對該通道做一些初始化的工作
*
* @param ip 連接的服務(wù)器的ip
* @param port 連接的服務(wù)器的端口號
* @throws IOException
*/
public void initClient(String ip, int port) throws IOException {
// 獲得一個(gè)Socket通道
SocketChannel channel = SocketChannel.open();
// 設(shè)置通道為非阻塞
channel.configureBlocking(false);
// 獲得一個(gè)通道管理器
this.selector = Selector.open();
// 客戶端連接服務(wù)器,其實(shí)方法執(zhí)行并沒有實(shí)現(xiàn)連接,需要在listen()方法中調(diào)
//用channel.finishConnect();才能完成連接
channel.connect(new InetSocketAddress(ip, port));
//將通道管理器和該通道綁定壁肋,并為該通道注冊SelectionKey.OP_CONNECT事件。
channel.register(selector, SelectionKey.OP_CONNECT);
}
/**
* 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件髓帽,如果有必盖,則進(jìn)行處理
*
* @throws IOException
*/
public void listen() throws IOException {
// 輪詢訪問selector
while (true) {
selector.select();
// 獲得selector中選中的項(xiàng)的迭代器
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 刪除已選的key,以防重復(fù)處理
iterator.remove();
// 連接事件發(fā)生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key
.channel();
// 如果正在連接,則完成連接
if (channel.isConnectionPending()) {
channel.finishConnect();
}
// 設(shè)置成非阻塞
channel.configureBlocking(false);
//在這里可以給服務(wù)端發(fā)送信息哦
channel.write(ByteBuffer.wrap("向服務(wù)端發(fā)送了一條信息".getBytes()));
//在和服務(wù)端連接成功之后装盯,為了可以接收到服務(wù)端的信息,需要給通道設(shè)置讀的權(quán)限甲馋。
channel.register(this.selector, SelectionKey.OP_READ);
// 獲得了可讀的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
/**
* 處理讀取服務(wù)端發(fā)來的信息 的事件
*
* @param key
* @throws IOException
*/
public void read(SelectionKey key) throws IOException {
// 客戶端可讀取消息:得到事件發(fā)生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 創(chuàng)建讀取的緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(512);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("客戶端收到信息:" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);// 將消息回送給服務(wù)端
}
/**
* 啟動(dòng)客戶端測試
*
* @throws IOException
*/
public static void main(String[] args) throws IOException {
NIOClient client = new NIOClient();
client.initClient("localhost", 8000);
client.listen();
}
}
客戶端連接過程:(和服務(wù)器端類似)
1埂奈、創(chuàng)建SocketChannel實(shí)例socketChannel,并連接到服務(wù)器端口
2定躏、創(chuàng)建Selector實(shí)例selector账磺;
3、將socketChannel注冊到selector痊远,并指定事件OP_CONNECT垮抗。
4、while循環(huán)執(zhí)行:
4.1碧聪、調(diào)用select方法冒版,該方法會(huì)阻塞等待逞姿,直到有一個(gè)或多個(gè)通道準(zhǔn)備好了I/O操作或等待超時(shí)辞嗡。
4.2续室、獲取選取的鍵列表挺狰;
4.3明郭、循環(huán)鍵集中的每個(gè)鍵:
4.3.a、獲取通道她渴,并從鍵中獲取附件(如果添加了附件)达址;
4.3.b、確定準(zhǔn)備就緒的操縱并執(zhí)行趁耗,如果是accept操作,將接收的信道設(shè)置為非阻塞模式疆虚,并注冊到選擇器苛败;
4.3.c、如果需要径簿,修改鍵的興趣操作集罢屈;
4.3.d、從已選鍵集中移除鍵
運(yùn)行結(jié)果
最終這兩段代碼的運(yùn)行結(jié)果就是客戶端和服務(wù)器之間不斷發(fā)送信息
server:
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
服務(wù)端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
...
client:
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
客戶端收到信息:向服務(wù)端發(fā)送了一條信息向客戶端發(fā)送了一條信息
...
實(shí)現(xiàn)原理
其實(shí)Java的NIO使用了IO多路復(fù)用篇亭,缠捌,I/O多路復(fù)用就是通過一種機(jī)制,一個(gè)進(jìn)程可以監(jiān)視多個(gè)描述符译蒂,一旦某個(gè)描述符就緒(一般是讀就緒或者寫就緒)曼月,能夠通知程序進(jìn)行相應(yīng)的讀寫操作。
目前支持的IO多路復(fù)用有select,poll和epoll柔昼。
與多進(jìn)程和多線程技術(shù)相比哑芹,I/O多路復(fù)用技術(shù)的最大優(yōu)勢是系統(tǒng)開銷小,系統(tǒng)不必創(chuàng)建進(jìn)程/線程捕透,也不必維護(hù)這些進(jìn)程/線程聪姿,從而大大減小了系統(tǒng)的開銷。
select
select本質(zhì)上是通過設(shè)置或者檢查存放fd標(biāo)志位的數(shù)據(jù)結(jié)構(gòu)來進(jìn)行下一步處理乙嘀。這樣所帶來的缺點(diǎn)是:
- select最大的缺陷就是單個(gè)進(jìn)程所打開的FD是有一定限制的末购,它由FD_SETSIZE設(shè)置,默認(rèn)值是1024虎谢。
一般來說這個(gè)數(shù)目和系統(tǒng)內(nèi)存關(guān)系很大盟榴,具體數(shù)目可以cat /proc/sys/fs/file-max察看。32位機(jī)默認(rèn)是1024個(gè)嘉冒。64位機(jī)默認(rèn)是2048. - 對socket進(jìn)行掃描時(shí)是線性掃描曹货,即采用輪詢的方法咆繁,效率較低。
當(dāng)套接字比較多的時(shí)候顶籽,每次select()都要通過遍歷FD_SETSIZE個(gè)Socket來完成調(diào)度玩般,不管哪個(gè)Socket是活躍的,都遍歷一遍礼饱。這會(huì)浪費(fèi)很多CPU時(shí)間坏为。如果能給套接字注冊某個(gè)回調(diào)函數(shù),當(dāng)他們活躍時(shí)镊绪,自動(dòng)完成相關(guān)操作匀伏,那就避免了輪詢,這正是epoll與kqueue做的蝴韭。 - 需要維護(hù)一個(gè)用來存放大量fd的數(shù)據(jù)結(jié)構(gòu)够颠,這樣會(huì)使得用戶空間和內(nèi)核空間在傳遞該結(jié)構(gòu)時(shí)復(fù)制開銷大。
poll
基本原理:poll本質(zhì)上和select沒有區(qū)別榄鉴,它將用戶傳入的數(shù)組拷貝到內(nèi)核空間履磨,然后查詢每個(gè)fd對應(yīng)的設(shè)備狀態(tài),如果設(shè)備就緒則在設(shè)備等待隊(duì)列中加入一項(xiàng)并繼續(xù)遍歷庆尘,如果遍歷完所有fd后沒有發(fā)現(xiàn)就緒設(shè)備剃诅,則掛起當(dāng)前進(jìn)程,直到設(shè)備就緒或者主動(dòng)超時(shí)驶忌,被喚醒后它又要再次遍歷fd矛辕。這個(gè)過程經(jīng)歷了多次無謂的遍歷。
它沒有最大連接數(shù)的限制付魔,原因是它是基于鏈表來存儲(chǔ)的聊品,但是同樣有一個(gè)缺點(diǎn):
- 大量的fd的數(shù)組被整體復(fù)制于用戶態(tài)和內(nèi)核地址空間之間,而不管這樣的復(fù)制是不是有意義抒抬。
- poll還有一個(gè)特點(diǎn)是“水平觸發(fā)”杨刨,如果報(bào)告了fd后,沒有被處理擦剑,那么下次poll時(shí)會(huì)再次報(bào)告該fd妖胀。
epoll
epoll是在2.6內(nèi)核中提出的,是之前的select和poll的增強(qiáng)版本惠勒。相對于select和poll來說赚抡,epoll更加靈活,沒有描述符限制纠屋。epoll使用一個(gè)文件描述符管理多個(gè)描述符涂臣,將用戶關(guān)系的文件描述符的事件存放到內(nèi)核的一個(gè)事件表中,這樣在用戶空間和內(nèi)核空間的copy只需一次。
基本原理:epoll支持水平觸發(fā)和邊緣觸發(fā)赁遗,最大的特點(diǎn)在于邊緣觸發(fā)署辉,它只告訴進(jìn)程哪些fd剛剛變?yōu)榫途w態(tài),并且只會(huì)通知一次岩四。還有一個(gè)特點(diǎn)是哭尝,epoll使用“事件”的就緒通知方式,通過epoll_ctl注冊fd剖煌,一旦該fd就緒材鹦,內(nèi)核就會(huì)采用類似callback的回調(diào)機(jī)制來激活該fd,epoll_wait便可以收到通知耕姊。
epoll的優(yōu)點(diǎn):
- 沒有最大并發(fā)連接的限制桶唐,能打開的FD的上限遠(yuǎn)大于1024(1G的內(nèi)存上能監(jiān)聽約10萬個(gè)端口)。
- 效率提升茉兰,不是輪詢的方式尤泽,不會(huì)隨著FD數(shù)目的增加效率下降。
只有活躍可用的FD才會(huì)調(diào)用callback函數(shù)规脸;即Epoll最大的優(yōu)點(diǎn)就在于它只管你“活躍”的連接安吁,而跟連接總數(shù)無關(guān),因此在實(shí)際的網(wǎng)絡(luò)環(huán)境中燃辖,Epoll的效率就會(huì)遠(yuǎn)遠(yuǎn)高于select和poll。 - 內(nèi)存拷貝网棍,利用mmap()文件映射內(nèi)存加速與內(nèi)核空間的消息傳遞黔龟;即epoll使用mmap減少復(fù)制開銷。
epoll原理
epoll是Linux下的一種IO多路復(fù)用技術(shù)滥玷,可以非常高效的處理數(shù)以百萬計(jì)的socket句柄氏身。
c封裝后的3個(gè)epoll系統(tǒng)調(diào)用
-
int epoll_create(int size)
epoll_create建立一個(gè)epoll對象。參數(shù)size是內(nèi)核保證能夠正確處理的最大句柄數(shù)惑畴,多于這個(gè)最大數(shù)時(shí)內(nèi)核可不保證效果蛋欣。 - *nt epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
epoll_ctl可以操作epoll_create創(chuàng)建的epoll,如將socket句柄加入到epoll中讓其監(jiān)控如贷,或把epoll正在監(jiān)控的某個(gè)socket句柄移出epoll陷虎。 - *int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
epoll_wait在調(diào)用時(shí),在給定的timeout時(shí)間內(nèi)杠袱,所監(jiān)控的句柄中有事件發(fā)生時(shí)尚猿,就返回用戶態(tài)的進(jìn)程。
大概看看epoll內(nèi)部是怎么實(shí)現(xiàn)的:
epoll初始化時(shí)楣富,會(huì)向內(nèi)核注冊一個(gè)文件系統(tǒng)凿掂,用于存儲(chǔ)被監(jiān)控的句柄文件,調(diào)用epoll_create時(shí)纹蝴,會(huì)在這個(gè)文件系統(tǒng)中創(chuàng)建一個(gè)file節(jié)點(diǎn)庄萎。同時(shí)epoll會(huì)開辟自己的內(nèi)核高速緩存區(qū)踪少,以紅黑樹的結(jié)構(gòu)保存句柄,以支持快速的查找糠涛、插入援奢、刪除。還會(huì)再建立一個(gè)list鏈表脱羡,用于存儲(chǔ)準(zhǔn)備就緒的事件萝究。
當(dāng)執(zhí)行epoll_ctl時(shí),除了把socket句柄放到epoll文件系統(tǒng)里file對象對應(yīng)的紅黑樹上之外锉罐,還會(huì)給內(nèi)核中斷處理程序注冊一個(gè)回調(diào)函數(shù)帆竹,告訴內(nèi)核,如果這個(gè)句柄的中斷到了脓规,就把它放到準(zhǔn)備就緒list鏈表里栽连。所以,當(dāng)一個(gè)socket上有數(shù)據(jù)到了侨舆,內(nèi)核在把網(wǎng)卡上的數(shù)據(jù)copy到內(nèi)核中后秒紧,就把socket插入到就緒鏈表里。
當(dāng)epoll_wait調(diào)用時(shí)挨下,僅僅觀察就緒鏈表里有沒有數(shù)據(jù)熔恢,如果有數(shù)據(jù)就返回,否則就sleep臭笆,超時(shí)時(shí)立刻返回叙淌。
epoll的兩種工作模式:
LT:level-trigger,水平觸發(fā)模式愁铺,只要某個(gè)socket處于readable/writable狀態(tài)鹰霍,無論什么時(shí)候進(jìn)行epoll_wait都會(huì)返回該socket。
ET:edge-trigger茵乱,邊緣觸發(fā)模式茂洒,只有某個(gè)socket從unreadable變?yōu)閞eadable或從unwritable變?yōu)閣ritable時(shí),epoll_wait才會(huì)返回該socket瓶竭。