首先給大家分享一個(gè)Linux下的OpenJDK1.8源碼纸巷,這個(gè)源碼里包含了sun包源碼 裙戏,自己要去找挺難找的凸主,下面的源碼分析就用到了缴川。
OpenJDK1.8 提取碼:xbae
一茉稠、NIO簡(jiǎn)介
Java NIO主要由三個(gè)部分組成,Channel把夸、Buffer和Selector而线,Channel。
借用賽哥的一句話:NIO的本質(zhì)模型就是等待消息的到來(lái),處理到來(lái)的消息吞获。
等待消息的到來(lái)這一部分由Selector去監(jiān)聽(tīng)况凉。
處理到來(lái)的消息由Channel和Buffer處理谚鄙,通道(Channel)類(lèi)似于Java中的流(IO Stream)各拷,但是通道是雙向的(流是單向),通道中的數(shù)據(jù)必須先讀入到Buffer中闷营,在從Buffer中進(jìn)行讀取烤黍,或者先把數(shù)據(jù)寫(xiě)入到Buffer中,在把Buffer中的 數(shù)據(jù)寫(xiě)入到通道傻盟。
二速蕊、Channel
Java NIO中提供以下4種Channel:
FileChannel:從文件中讀寫(xiě)數(shù)據(jù)
DatagramChannel:通過(guò)UDP協(xié)議讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)
SocketChannel:通過(guò)TCP協(xié)議讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)
ServerSocketChannel:在服務(wù)器端可以監(jiān)聽(tīng)新進(jìn)來(lái)的TCP連接,像WEB服務(wù)器那樣娘赴,對(duì)每一個(gè)新進(jìn)來(lái)的請(qǐng)求創(chuàng)建一個(gè)SocketChannel
三规哲、Buffer
Java NIO 有以下Buffer類(lèi)型
ByteBuffer
MappedByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
我們主要是通過(guò)對(duì)Buffer進(jìn)行讀寫(xiě)操作,將數(shù)據(jù)寫(xiě)入Channel中诽表。通過(guò)研究Buffer源碼可以發(fā)現(xiàn)唉锌,Buffer其實(shí)是數(shù)組,有以下幾個(gè)屬性竿奏。
索引 | 說(shuō)明 |
---|---|
capacity | 緩沖區(qū)數(shù)組的總長(zhǎng)度 |
position | 下一個(gè)要操作的數(shù)據(jù)元素的位置 |
limit | 緩沖區(qū)數(shù)組中不可操作的下一個(gè)元素的位置:limit<=capacity |
mark | 用于記錄當(dāng)前position的前一個(gè)位置或者默認(rèn)是-1 |
Buffer的設(shè)計(jì)非常簡(jiǎn)單袄简,通過(guò)以上幾個(gè)簡(jiǎn)單的屬性就可以完成讀寫(xiě)操作,當(dāng)然泛啸,簡(jiǎn)單帶來(lái)壞處就是使用起來(lái)有點(diǎn)麻煩绿语。不過(guò)熟練后還是使用起來(lái)還是很簡(jiǎn)單的。
我們看一個(gè)非常簡(jiǎn)單的例子:
初始狀態(tài)的一個(gè)ByteBuffer(總長(zhǎng)度為10):
向Buffer中寫(xiě)入5個(gè)字節(jié):
讀取的時(shí)候 候址,調(diào)用Buffer.filp()吕粹,此時(shí)postion變?yōu)?,limit變?yōu)?5岗仑,也就是能從0讀取到4:
Buffer.compact()方法將所有未讀的數(shù)據(jù)拷貝到Buffer起始處昂芜。然后將position設(shè)到最后一個(gè)未讀元素正后面。
Buffer.rewind()方法將position設(shè)回0
Buffer.mark()方法赔蒲,可以標(biāo)記Buffer中的一個(gè)特定的position泌神,之后可以通過(guò)調(diào)用Buffer.reset()方法恢復(fù)到這個(gè)position
Buffer.rewind()方法將position設(shè)回0,所以你可以重讀Buffer中的所有數(shù)據(jù)舞虱。limit保持不變欢际,仍然表示能從Buffer中讀取多少個(gè)元素。
四矾兜、NIO的一個(gè)小Demo
首先是服務(wù)端的代碼:
public class ServerConnect {
private static final int BUF_SIZE = 1024;
private static final int PORT = 8080;
private static final int TIMEOUT = 3000;
public static void selector() {
Selector selector = null;
ServerSocketChannel ssc = null;
try {
// 打開(kāi)一個(gè)Slectore
selector = Selector.open();
// 打開(kāi)一個(gè)Channel
ssc = ServerSocketChannel.open();
// 將Channel綁定端口
ssc.socket().bind(new InetSocketAddress(PORT));
// 設(shè)置Channel為非阻塞损趋,如果設(shè)置為阻塞,其實(shí)和BIO差不多了椅寺。
ssc.configureBlocking(false);
// 向selector中注冊(cè)Channel和感興趣的事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// selector監(jiān)聽(tīng)事件浑槽,select會(huì)被阻塞蒋失,直到selector監(jiān)聽(tīng)的channel中有事件發(fā)生或者超時(shí),會(huì)返回一個(gè)事件數(shù)量
//TIMEOUT就是超時(shí)時(shí)間桐玻,selector初始化的時(shí)候會(huì)添加一個(gè)用于主動(dòng)喚醒的pipe篙挽,待會(huì)源碼分析會(huì)說(shuō)
if (selector.select(TIMEOUT) == 0) {
System.out.println("==");
continue;
}
/**
* SelectionKey的組成是selector和Channel
* 有事件發(fā)生的channel會(huì)被包裝成selectionKey添加到selector的publicSelectedKeys屬性中
* publicSelectedKeys是SelectionKey的Set集合
*下面這一部分遍歷,就是遍歷有事件的channel
*/
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
handleAccept(key);
}
if (key.isReadable()) {
handleRead(key);
}
if (key.isWritable() && key.isValid()) {
handleWrite(key);
}
if (key.isConnectable()) {
System.out.println("isConnectable = true");
}
//每次使用完镊靴,必須將該SelectionKey移除铣卡,否則會(huì)一直存儲(chǔ)在publicSelectedKeys中
//下一次遍歷又會(huì)重復(fù)處理
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (selector != null) {
selector.close();
}
if (ssc != null) {
ssc.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
}
public static void handleRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = sc.read(buf);
while (bytesRead > 0) {
buf.flip();
while (buf.hasRemaining()) {
System.out.print((char) buf.get());
}
System.out.println();
buf.clear();
bytesRead = sc.read(buf);
}
if (bytesRead == -1) {
sc.close();
}
}
public static void handleWrite(SelectionKey key) throws IOException {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel sc = (SocketChannel) key.channel();
while (buf.hasRemaining()) {
sc.write(buf);
}
buf.compact();
}
}
客戶端代碼:
public class Client {
public static void client() {
// 申請(qǐng)一塊空間
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = null;
Thread.currentThread().setName("client");
try {
// 打開(kāi)一個(gè)Channel
socketChannel = SocketChannel.open();
//設(shè)置為非阻塞
socketChannel.configureBlocking(false);
//連接IP和端口號(hào)
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
if (socketChannel.finishConnect()) {
int i = 0;
while (true) {
// 為了不讓消息發(fā)送太快,每發(fā)一條睡1s
TimeUnit.SECONDS.sleep(1);
String info = Thread.currentThread().getName()+":I'm " + i++ + "-th information from client";
//清空Buffer
buffer.clear();
//寫(xiě)入到Buffer中
buffer.put(info.getBytes());
//進(jìn)行flip操作偏竟,為了下面可以將buffer中數(shù)據(jù)讀取到channel中煮落。
buffer.flip();
// 將buffer中的數(shù)據(jù)寫(xiě)入到channel中
while (buffer.hasRemaining()) {
System.out.println(Thread.currentThread().getName()+":"+buffer);
int write = socketChannel.write(buffer);
System.out.println(Thread.currentThread().getName()+":"+write);
}
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (socketChannel != null) {
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
五、NIO源碼分析
1.NIO中selector.open()是調(diào)用了SelectorProvider.provider().openSelector()踊谋,ServerSocketChannel.open()是調(diào)用SelectorProvider.provider().openServerSocketChannel()蝉仇,兩個(gè)主要組件的開(kāi)啟都是SelectorProvider.provider()的提供,我們先看一下這個(gè)源碼殖蚕。
public static SelectorProvider provider() {
//很明顯provider是單例的
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
//會(huì)根據(jù)不同的操作系統(tǒng)創(chuàng)建不同的provider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
/**
* 根據(jù)不同系統(tǒng)返回不同SelectorProvider.
*/
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
這一部分設(shè)計(jì)的非常好轿衔,因?yàn)镹IO需要操作系統(tǒng)底層提供支持,這一部分代碼可以根據(jù)不同的操作系統(tǒng)提供不同的實(shí)現(xiàn)嫌褪,可以讓我們不用關(guān)系底層如何實(shí)現(xiàn)的呀枢。
2.ServerSocketChannel.open()最終是new ServerSocketChannelImpl(this);this是SelectorProvider的實(shí)現(xiàn)類(lèi)的實(shí)例化對(duì)象
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//獲取文件描述符
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
3.selector.open()會(huì)根據(jù)操作系統(tǒng)的不同,得到的selector也會(huì)不同笼痛,如果是windows系統(tǒng)獲取的是WindowsSelectorImpl裙秋,Linux系統(tǒng)是EPollSelectorImpl,還有一些操作系統(tǒng)會(huì)提供PollSelectorImpl缨伊。這里的EPollSelectorImpl和PollSelectorImpl對(duì)應(yīng)這上次NIO基礎(chǔ)原理中的EPoll模型和Poll模型摘刑。
首先我們來(lái)分析一下PollSelectorImpl。
PollSelectorImpl(SelectorProvider sp) {
super(sp, 1, 1);
// 本地方法刻坊,新建一個(gè)pipe枷恕,返回以long編碼的管道的兩個(gè)文件描述符。
//管道的讀端以高32位返回谭胚,
//而寫(xiě)入結(jié)束以低32位返回徐块。
//這個(gè)pipe主要用途是用來(lái)喚醒selector的
long pipeFds = IOUtil.makePipe(false);
//讀文件描述符
fd0 = (int) (pipeFds >>> 32);
//寫(xiě)文件描述符
fd1 = (int) pipeFds;
try {
// 新建一個(gè)存fd的數(shù)組
pollWrapper = new PollArrayWrapper(INIT_CAP);
// 初始化,將pipe的fd放入數(shù)組中
pollWrapper.initInterrupt(fd0, fd1);
// 新建一個(gè)存放SelectionKey的數(shù)組
channelArray = new SelectionKeyImpl[INIT_CAP];
} catch (Throwable t) {
......
}
}
再來(lái)看看EPollSelectorImpl有什么不一樣:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
try {
//這里不再是一個(gè)數(shù)組了
pollWrapper = new EPollArrayWrapper();
//初始化灾而,添加用于中斷的pipe
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
} catch (Throwable t) {
......
}
}
EPollArrayWrapper() throws IOException {
// 創(chuàng)建epoll的文件描述符
epfd = epollCreate();
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
//開(kāi)辟一個(gè)數(shù)組胡控,存儲(chǔ)來(lái)自epoll_wait的結(jié)果的epoll_event數(shù)組
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
//文件描述符> 64k時(shí)需要使用eventHigh
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
4.我們?cè)賮?lái)分析一下如何注冊(cè)事件的。注冊(cè)的時(shí)候旁趟,首先把selector和channel封裝成一個(gè)SelectionKeyImpl昼激,最終調(diào)用implRegister(),把fd添加到pollWrapper,把key添加到keys中橙困。因?yàn)閜ollWrapper數(shù)據(jù)結(jié)構(gòu)的不同瞧掺,所以添加方式也有點(diǎn)區(qū)別。
poll:
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (closed)
throw new ClosedSelectorException();
//檢測(cè)容量是否夠用
if (channelArray.length == totalChannels) {
// 新建一個(gè)更大的數(shù)組
int newSize = pollWrapper.totalChannels * 2;
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
//拷貝
for (int i=channelOffset; i<totalChannels; i++)
temp[i] = channelArray[i];
channelArray = temp;
//擴(kuò)容存儲(chǔ)fd的數(shù)組
pollWrapper.grow(newSize);
}
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.addEntry(ski.channel);
totalChannels++;
keys.add(ski);
}
}
epoll
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
5.初始化準(zhǔn)備好了凡傅,我們?cè)賮?lái)分析一下select()是如何監(jiān)聽(tīng)channel的辟狈,select()最終是調(diào)用doSelect(long timeout)方法,里面調(diào)用本地方法像捶,本地方法調(diào)用的系統(tǒng)提供的操作上陕,這些操作對(duì)應(yīng)NIO基礎(chǔ)原理中的三個(gè)模型桩砰。
首先來(lái)PollSelectorImpl的:
protected int doSelect(long timeout)
throws IOException
{
if (channelArray == null)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(totalChannels, 0, timeout);
} finally {
end();
}
//清理那些已經(jīng)cancelled的SelectionKey
processDeregisterQueue();
//統(tǒng)計(jì)有事件發(fā)生的SelectionKey數(shù)量拓春,并把符合條件發(fā)生事件的SelectionKey添加到selectedKeys哈希表中,提供給后續(xù)使用
int numKeysUpdated = updateSelectedKeys();
// 第零個(gè)位置使用來(lái)中斷的亚隅,如果不為0硼莽,則pipe中寫(xiě)入了數(shù)據(jù),用于中斷煮纵,這里進(jìn)行重置
if (pollWrapper.getReventOps(0) != 0) {
// Clear the wakeup pipe
pollWrapper.putReventOps(0, 0);
synchronized (interruptLock) {
//將fd0的數(shù)據(jù)全部讀完
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
/**
* poll0是一個(gè)本地方法懂鸵,調(diào)用系統(tǒng)底層的實(shí)現(xiàn)了
* 對(duì)應(yīng)poll模型
*/
int poll(int numfds, int offset, long timeout) {
return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
numfds, timeout);
}
EPollSelectorImpl:
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
//清理那些已經(jīng)cancelled的SelectionKey,底層會(huì)調(diào)用epoll_ctl方法移除被epoll所監(jiān)聽(tīng)的文件描述符
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
//清理那些已經(jīng)cancelled的SelectionKey,底層會(huì)調(diào)用epoll_ctl方法移除被epoll所監(jiān)聽(tīng)的文件描述符
processDeregisterQueue();
//更新epoll已選擇fd的密鑰。 將就緒密鑰添加到就緒隊(duì)列行疏。
int numKeysUpdated = updateSelectedKeys();
//判斷是否為中斷匆光,如果中斷了,則清除記錄的中斷位置的內(nèi)容
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
/**
* epollWait也是一個(gè)本地方法
* 對(duì)應(yīng)epoll模型
*/
int poll(long timeout) throws IOException {
updateRegistrations();
// 調(diào)用系統(tǒng)底層的實(shí)現(xiàn)酿联,會(huì)將有事件的fd放在pollArray中
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
// 查詢是否存在中斷终息,并且記錄中斷事件的位置
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
6.NIO還有一個(gè)小細(xì)節(jié),就是我們最開(kāi)始建立selelctor的時(shí)候贞让,會(huì)創(chuàng)建一個(gè)pipe周崭,我之前也提到了這個(gè)pipe是用來(lái)喚醒selector,selector調(diào)用select()方法后喳张,會(huì)進(jìn)入阻塞狀態(tài)续镇,如果沒(méi)有事件他會(huì)一直阻塞,那么我們?nèi)绾沃鲃?dòng)喚醒呢销部,于是就用到了這個(gè)pipe摸航。
PollSelectorImpl和EPollSelectorImpl實(shí)現(xiàn)都是如下方式:
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
pollWrapper.interrupt();
interruptTriggered = true;
}
}
return this;
}
我對(duì)Linux下的pipe不太了解,我猜應(yīng)該是使用了中斷指令舅桩。
在Windows下酱虎,WindowsSelectorImpl的實(shí)現(xiàn)方式:
public Selector wakeup() {
synchronized(this.interruptLock) {
if (!this.interruptTriggered) {
this.setWakeupSocket();
this.interruptTriggered = true;
}
return this;
}
}
其中setWakeupSocket()方法會(huì)調(diào)用一個(gè)本地方法setWakeupSocket0(),這個(gè)本地方法會(huì)想pipe中發(fā)送一個(gè)字節(jié)江咳,selector就能夠監(jiān)聽(tīng)到這個(gè)pipe中有讀事件逢净,然后selector就被喚醒了。
貼一張NIO各個(gè)組件之間的關(guān)系圖,看完源碼后可以仔細(xì)看一下這幅圖爹土,再自己跟著源碼走一遍甥雕。
6、總結(jié)和反思
NIO源碼分析到此結(jié)束了胀茵,此次閱讀源碼過(guò)程還是有點(diǎn)困難社露,但從中獲取到了很多的新知識(shí)。
1.最開(kāi)始琼娘,我以為大學(xué)學(xué)的操作系統(tǒng)沒(méi)太大用處峭弟,現(xiàn)在發(fā)現(xiàn),涉及到底層原理時(shí)脱拼,離不開(kāi)操作系統(tǒng)的知識(shí)瞒瘸。以后得抽個(gè)時(shí)間把操作系統(tǒng)在好好看一遍。
2.NIO中用到了Reactor設(shè)計(jì)模式熄浓,有效的解決基于輪詢方式的效率低的問(wèn)題
3.select情臭、poll和epoll底層數(shù)據(jù)各不相同,poll采用鏈表赌蔑,解決了fd數(shù)量的限制俯在,epoll底層使用的是紅黑樹(shù),能夠有效的提升效率娃惯。
4.NIO并不一定是非常高效的跷乐,在連接數(shù)量大,且連接比較短的情況下趾浅,NIO效率非常高愕提,但是在連接數(shù)量小,且一次性發(fā)送大量數(shù)據(jù)的情況下潮孽,可以選擇BIO加多線程的方式處理揪荣。
5.除了NIO,還有一個(gè)AIO往史,以后有空可以研究研究仗颈。