一.Selector是什么
- Selector選擇器類管理著一個被注冊的通道集合的信息和它們的就緒狀態(tài)铺然。通道是和選擇器一起被注冊的,并且使用選擇器來更新通道的就緒狀態(tài)赡鲜。
- 一個通道可以被注冊到多個Selector選擇器上飞崖,但對每個選擇器而言只能被注冊一次。
- SelectionKey:選擇鍵封裝了特定的通道與特定的Selector選擇器的注冊關(guān)系
connect:客戶端連接服務(wù)端事件隘截,對應(yīng)值為SelectionKey.OP_CONNECT
accept:服務(wù)端接收客戶端連接事件,對應(yīng)值為SelectionKey.OP_ACCEPT
read:讀事件,對應(yīng)值為SelectionKey.OP_READ
write:寫事件婶芭,對應(yīng)值為SelectionKey.OP_WRITE
二.層次圖
- 1.Selector實現(xiàn)open方法东臀,我們發(fā)現(xiàn)SelectorProvider類。SocketChannel犀农、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現(xiàn)
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
- 2.AbstractSelector取消的key放在一個set集合中惰赋,對集合進行添加操作時,必須同步取消key set集合呵哨。反注冊選擇key完成的實際工作是赁濒,將key從key對應(yīng)的通道的選擇key數(shù)組中移除。
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
void cancel(SelectionKey k) { // package-private
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
- 3.SelectorImpl:
- 其他線程獲取選擇器的就緒key和key集合孟害,實際上返回的是key集合的代理publicKeys和就緒key集合的代理publicSelectedKeys拒炎。
protected Set selectedKeys;//已經(jīng)操作事件準(zhǔn)備就緒的選擇key(為解決1.4bug而存在)
protected HashSet keys;//與選擇器關(guān)聯(lián)的key集合(為解決1.4bug而存在)
private Set publicKeys;//外部訪問key集合的代理,真正使用的
private Set publicSelectedKeys;//外部訪問就緒key集合代理,真正使用的 ;
public Set<SelectionKey> selectedKeys() {...return this.publicSelectedKeys; }}
public Set<SelectionKey> keys() { ....return this.publicKeys;}
- select方法(準(zhǔn)備就緒事件數(shù)):委托給為lockAndDoSelect同步方法挨务,獲取key集合代理publicKeys和就緒key代理集合publicSelectedKeys击你,然后交給doSelect(long l)方法,這個方法為抽象方法谎柄,待子類擴展丁侄。
public int select() throws IOException {
return this.select(0L);
}
private int lockAndDoSelect(long var1//**超時時間**/) throws IOException {
synchronized(this) {
if(!this.isOpen()) {
throw new ClosedSelectorException();
} else {
Set var4 = this.publicKeys;
int var10000;
synchronized(this.publicKeys) {
Set var5 = this.publicSelectedKeys;
synchronized(this.publicSelectedKeys) {
var10000 = this.doSelect(var1);
}
}
return var10000;
}
}
}
protected abstract int doSelect(long var1) throws IOException;
- register方法:
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
if(!(var1 instanceof SelChImpl)) {
throw new IllegalSelectorException();
} else {
SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
//設(shè)置key的附加物
var4.attach(var3);
Set var5 = this.publicKeys;
synchronized(this.publicKeys) {
//完成實際的注冊工作
this.implRegister(var4);
}
//設(shè)置key的興趣事件集 ,var2是 SelectionKey關(guān)系值
var4.interestOps(var2);
return var4;
}
}
//待子類實現(xiàn)
protected abstract void implRegister(SelectionKeyImpl selectionkeyimpl);
背景知識
1.FileDescriptor:文件描述符用于描述系統(tǒng)底層的特殊的結(jié)構(gòu)句柄,可以被用來表示開放文件谷誓、開放
套接字等,包含 private int fd;//文件描述值 和private long handle;//初始化文件描述句柄
2.句柄:標(biāo)識應(yīng)用程序中的不同對象應(yīng)用程序能夠通過句柄訪問相應(yīng)的對象的信息绒障,但是句柄不是指針,程序不能利用句柄來直接閱讀文件中的信息捍歪。如果句柄不在IO文件中户辱,它是毫無用處的。 句柄是Windows用來標(biāo)志應(yīng)用程序中建立的或是使用的唯一整數(shù)糙臼,大量使用了句柄來標(biāo)識對象庐镐。
- 4.WindowsSelectorImpl
全局變量
private final int INIT_CAP = 8;//選擇key集合,key包裝集合初始化容量
private static final int MAX_SELECTABLE_FDS = 1024;//最大選擇key數(shù)量
private SelectionKeyImpl channelArray[];//選擇器關(guān)聯(lián)通道集合
private PollArrayWrapper pollWrapper;//存放所有文件描述對象(選擇key变逃,喚醒管道的源與sink通道)的集合
private final List threads = new ArrayList();//選擇操作線程集合
private final FdMap fdMap = new FdMap();//存放選擇key文件句柄與選擇key映射關(guān)系的Map
private final SubSelector subSelector = new SubSelector();//子選擇器
private int totalChannels;//注冊到選擇器的通道數(shù)量
private int threadsCount;//選擇線程數(shù)
private final Pipe wakeupPipe = Pipe.open();//喚醒等待選擇操的管道
private final int wakeupSourceFd;//喚醒管道源通道文件句柄
private final int wakeupSinkFd;//喚醒管道sink通道文件句柄
//四個同步鎖
private Object closeLock;//選擇器關(guān)閉同步鎖
private final Object interruptLock = new Object();//中斷同步鎖必逆,在喚醒選擇操作線程時,用于同步
private final StartLock startLock = new StartLock();//選擇操作開始鎖
private final FinishLock finishLock = new FinishLock();//選擇操作結(jié)束鎖
//初始化
WindowsSelectorImpl(SelectorProvider var1) throws IOException {
super(var1);
this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();//獲取句柄
SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
var2.sc.socket().setTcpNoDelay(true);
this.wakeupSinkFd = var2.getFDVal();//獲取句柄
this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}
fdMap:存放選擇key文件句柄與選擇key的HashMap
private WindowsSelectorImpl.MapEntry put(SelectionKeyImpl var1) {
return (WindowsSelectorImpl.MapEntry)this.put(new Integer(var1.channel.getFDVal()), new WindowsSelectorImpl.MapEntry(var1));
}
pollWrapper: 存放選擇key和通道及其相關(guān)興趣事件到本地內(nèi)存
private static final short EVENT_OFFSET = 4;//興趣事件開始位置
static short SIZE_POLLFD = 8;//句柄長度int(4)+興趣事件(4)
void addWakeupSocket(int var1//索引, int var2//句柄) {
this.putDescriptor(var2, var1);
this.putEventOps(var2, 1);
}
//將文件描述放在索引var2上
void putDescriptor(int var1, int var2) {
this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
}
//存放索引文件描述信息的興趣操作事件
void putEventOps(int var1, int var2) {
this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
}
背景知識:
在JDK1.5 update10和linux core2.6以上版本揽乱,sun優(yōu)化了Selctor的實現(xiàn)名眉,底層使用epoll替換了select/poll。在linux2.6(準(zhǔn)確來說是2.5.44)由內(nèi)核直接支持的方法凰棉。epoll解決了select和poll的缺點损拢。epoll每次注冊新的事件到epoll中,會把所有的fd(文件標(biāo)識符)拷貝進內(nèi)核撒犀,而不是在等待的時候重復(fù)拷貝福压,保證了每個fd在整個過程中只會拷貝1次掏秩。epoll它所支持的fd上限是最大可以打開文件的數(shù)目,具體數(shù)目可以cat /proc/sys/fs/file-max查看荆姆,一般來說這個數(shù)目和系統(tǒng)內(nèi)存關(guān)系比較大蒙幻。epoll在注冊新的事件時,為每個fd指定一個回調(diào)函數(shù)胆筒,當(dāng)設(shè)備就緒的時候邮破,調(diào)用這個回調(diào)函數(shù),這個回調(diào)函數(shù)就會把就緒的fd加入一個就緒表中腐泻。(所以epoll實際只需要遍歷就緒表)决乎。
參考:Linux下I/O多路復(fù)用select, poll, epoll 三種模型的Python使用
doSelect方法:其中 subSelector.poll() 是select的核心,由native函數(shù)poll0實現(xiàn)派桩,SubSelector主要有兩個方法以poll從pollWrapper拉取關(guān)注讀寫事件的選擇key。每個SelectThread使用蚌斩,SubSelector從當(dāng)前注冊到選擇器的通道中選取SubSelector索引所對應(yīng)的批次的通道已經(jīng)就緒的通道并更新操作事件铆惑。整個選擇過程有startLock和finishLock來控制。再有在一個選擇操作的所有子選擇線程執(zhí)行完送膳,才釋放finishLock
protected int doSelect(long var1) throws IOException {
......
this.subSelector.poll();
.....
}
private int poll() throws IOException {
return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
}
private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);
SelectionKeyImpl保存注冊時的channel员魏、selector、event以及保存在pollWrapper的偏移位置index叠聋。
implRegister方法:首先同步關(guān)閉鎖撕阎,以防在注冊的過程中,選擇器被關(guān)閉碌补;檢查選擇器是否關(guān)閉虏束,沒有關(guān)閉,則檢查是否擴容厦章,需要則擴容為pollWrapper為原來的兩倍镇匀;檢查過后,添加選擇key到選擇器通道集合袜啃,設(shè)置key在選擇器通道集合的索引汗侵,添加選擇key到文件描述fdMap,添加key到key集合群发,將選擇key添加到文件描述信息及關(guān)注操作事件包裝集合pollWrapper晰韵,通道計數(shù)器自增。
protected void implRegister(SelectionKeyImpl selectionkeyimpl)
{
//同步關(guān)閉鎖熟妓,以防在注冊的過程中雪猪,選擇器被關(guān)閉
synchronized(closeLock)
{
if(pollWrapper == null)
//文件描述包裝集合為null,即選器已關(guān)閉
throw new ClosedSelectorException();
growIfNeeded();//
channelArray[totalChannels] = selectionkeyimpl;//添加到選擇器通道集合
selectionkeyimpl.setIndex(totalChannels);//設(shè)置key在選擇器通道集合的索引
fdMap.put(selectionkeyimpl);//添加選擇key到文件描述fdMap
keys.add(selectionkeyimpl);//添加key到key集合
//將選擇key添加到文件描述信息及關(guān)注操作事件包裝集合pollWrapper
pollWrapper.addEntry(totalChannels, selectionkeyimpl);
totalChannels++;//通道計數(shù)器自增
}
}
void addEntry(int var1, SelectionKeyImpl var2) {
//epoll每次注冊新的事件到epoll中滑蚯,會把所有的fd(文件標(biāo)識符)拷貝進內(nèi)核浪蹂,
this.putDescriptor(var1, var2.channel.getFDVal());
}
參考:
NIO源碼分析
深入淺出NIO Socket實現(xiàn)機制
三.總結(jié)
- Selector是通過implRegister方法把每次注冊新的SelectionKeyImpl事件拷貝到pollWrapper內(nèi)存數(shù)組中抵栈,通過doSelect()的native函數(shù)poll0()拉取讀寫就緒的SelectionKeyImpl事件,如果之前沒有發(fā)生事件坤次,程序就阻塞在select處古劲,當(dāng)然不會一直阻塞,因為epoll在timeout時間內(nèi)如果沒有事件缰猴,也會返回产艾。
- ServerSocketChannelImpl的初始化主要是初始化ServerSocket通道線程thread,地址綁定滑绒,接受連接同步鎖闷堡,默認創(chuàng)建ServerSocketChannelImpl的狀態(tài)為未初始化,文件描述和文件描述id疑故,如果使用本地地址杠览,則獲取本地地址。bind首先檢查ServerSocket是否關(guān)閉纵势,是否綁定地址踱阿,如果既沒有綁定也沒關(guān)閉,則檢查綁定的socketaddress是否正確或合法钦铁;然后通過Net工具類的bind(native)和listen(native)软舌,完成實際的ServerSocket地址綁定和開啟監(jiān)聽,如果綁定是開啟的參數(shù)小于1牛曹,則默認接受50個連接佛点。accept方法主要是調(diào)用accept0(native)方法接受連接,并根據(jù)接受來接
一旦有對應(yīng)的事件發(fā)生黎比,poll0方法就會返回超营。 - SocketChannelImpl構(gòu)造主要是初始化讀寫及狀態(tài)鎖和通道socket文件描述。
connect連接方法首先同步讀鎖和寫鎖焰手,確保socket通道打開糟描,并沒有連接;然后檢查socket地址的正確性與合法性书妻,然后檢查當(dāng)前線程是否有Connect方法的訪問控制權(quán)限船响,最后嘗試連接socket地址。從緩沖區(qū)讀取字節(jié)序列寫到通道write(ByteBuffer)躲履,首先確保通道打開见间,且輸出流沒有關(guān)閉,然后委托給IOUtil寫字節(jié)序列工猜;IOUtil寫字節(jié)流過程為首先通過Util從當(dāng)前線程的緩沖區(qū)獲取可以容下字節(jié)序列的臨時緩沖區(qū)(DirectByteBuffer)米诉,如果沒有則創(chuàng)建一個DirectByteBuffer,將字節(jié)序列寫到臨時的DirectByteBuffer中篷帅,然后將寫操作委托給nativedispatcher(SocketDispatcher)史侣,將DirectByteBuffer添加到當(dāng)前線程的緩沖區(qū)拴泌, 以便重用,因為DirectByteBuffer實際上是存在物理內(nèi)存中惊橱,頻繁的分配將會消耗更多的資源蚪腐。
channel的源碼筆者還未細看,有興趣的可以看參考部分税朴。
參考:SocketChannelImpl 解析一(通道連接回季,發(fā)送數(shù)據(jù))
四.實例(聊天室的實現(xiàn))
public class TestNonBlockingNIO1 {
//客戶端
@Test
public void client() throws IOException{
//1. 獲取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
//2. 切換非阻塞模式
sChannel.configureBlocking(false);
//3. 分配緩沖區(qū)
ByteBuffer buf = ByteBuffer.allocate(1024);
//4. 發(fā)送數(shù)據(jù)給服務(wù)端
Scanner scan = new Scanner(System.in);
while(scan.hasNext()){
String str = scan.next();
buf.put((new Date().toString() + "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
//5. 關(guān)閉通道
sChannel.close();
}
//服務(wù)端
@Test
public void server() throws IOException{
//1. 獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 切換非阻塞模式
ssChannel.configureBlocking(false);
//3. 綁定端口號
ssChannel.bind(new InetSocketAddress(9898));
//4. 獲取選擇器
Selector selector = Selector.open();
//5. 將通道注冊到選擇器上, 并且指定“監(jiān)聽事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 輪詢監(jiān)聽選擇器上的“準(zhǔn)備就緒”的事件
while(selector.select() > 0){
//7. 獲取當(dāng)前選擇器上所有“準(zhǔn)備就緒”的選擇鍵(監(jiān)聽事件)
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
//8. 獲取當(dāng)前準(zhǔn)備就緒的選擇鍵
SelectionKey sk = it.next();
//9. 判斷具體是哪個事件“準(zhǔn)備就緒”
if(sk.isAcceptable()){
//10.若接收狀態(tài)就緒,獲取當(dāng)前客戶端的連接
SocketChannel sChannel = ssChannel.accept();
//11.切換非阻塞式
sChannel.configureBlocking(false);
//12.將該通道注冊到選擇器上
sChannel.register(selector, SelectionKey.OP_READ);
}else if(sk.isReadable()){
//13.若“讀就緒”正林,獲取當(dāng)前選擇器上就緒狀態(tài)的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
//14.讀取數(shù)據(jù)
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0;
while((len = sChannel.read(buf)) > 0){
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
}
//15.取消選擇鍵
it.remove();
}
}
}
}