五.Selector

一.Selector是什么

  • Selector選擇器類管理著一個被注冊的通道集合的信息和它們的就緒狀態(tài)铺然。通道是和選擇器一起被注冊的,并且使用選擇器來更新通道的就緒狀態(tài)赡鲜。
  • 一個通道可以被注冊到多個Selector選擇器上飞崖,但對每個選擇器而言只能被注冊一次。
  • SelectionKey:選擇鍵封裝了特定的通道與特定的Selector選擇器的注冊關(guān)系
connect:客戶端連接服務(wù)端事件隘截,對應(yīng)值為SelectionKey.OP_CONNECT
accept:服務(wù)端接收客戶端連接事件,對應(yīng)值為SelectionKey.OP_ACCEPT
read:讀事件,對應(yīng)值為SelectionKey.OP_READ
write:寫事件婶芭,對應(yīng)值為SelectionKey.OP_WRITE

二.層次圖

image.png
  • 1.Selector實現(xiàn)open方法东臀,我們發(fā)現(xiàn)SelectorProvider類。SocketChannel犀农、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現(xiàn)
  public static Selector open() throws IOException {  
        return SelectorProvider.provider().openSelector();  
    } 
  • 2.AbstractSelector取消的key放在一個set集合中惰赋,對集合進行添加操作時,必須同步取消key set集合呵哨。反注冊選擇key完成的實際工作是赁濒,將key從key對應(yīng)的通道的選擇key數(shù)組中移除。
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
 void cancel(SelectionKey k) {                       // package-private
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }
  • 3.SelectorImpl:
    • 其他線程獲取選擇器的就緒key和key集合孟害,實際上返回的是key集合的代理publicKeys和就緒key集合的代理publicSelectedKeys拒炎。
protected Set selectedKeys;//已經(jīng)操作事件準(zhǔn)備就緒的選擇key(為解決1.4bug而存在) 
protected HashSet keys;//與選擇器關(guān)聯(lián)的key集合(為解決1.4bug而存在)  
private Set publicKeys;//外部訪問key集合的代理,真正使用的  
private Set publicSelectedKeys;//外部訪問就緒key集合代理,真正使用的    ;
public Set<SelectionKey> selectedKeys() {...return this.publicSelectedKeys; }}
public Set<SelectionKey> keys() { ....return this.publicKeys;}
  • select方法(準(zhǔn)備就緒事件數(shù)):委托給為lockAndDoSelect同步方法挨务,獲取key集合代理publicKeys和就緒key代理集合publicSelectedKeys击你,然后交給doSelect(long l)方法,這個方法為抽象方法谎柄,待子類擴展丁侄。
 public int select() throws IOException {
        return this.select(0L);
    }
  private int lockAndDoSelect(long var1//**超時時間**/) throws IOException {
        synchronized(this) {
            if(!this.isOpen()) {
                throw new ClosedSelectorException();
            } else {
                Set var4 = this.publicKeys;
                int var10000;
                synchronized(this.publicKeys) {
                    Set var5 = this.publicSelectedKeys;
                    synchronized(this.publicSelectedKeys) {
                        var10000 = this.doSelect(var1);
                    }
                }

                return var10000;
            }
        }
    }
 protected abstract int doSelect(long var1) throws IOException;
  • register方法:
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
        if(!(var1 instanceof SelChImpl)) {
            throw new IllegalSelectorException();
        } else {
            SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
           //設(shè)置key的附加物  
            var4.attach(var3);
            Set var5 = this.publicKeys;
            synchronized(this.publicKeys) {
            //完成實際的注冊工作 
                this.implRegister(var4);
            }
            //設(shè)置key的興趣事件集 ,var2是 SelectionKey關(guān)系值
            var4.interestOps(var2);
            return var4;
        }
    }
    //待子類實現(xiàn)  
   protected abstract void implRegister(SelectionKeyImpl selectionkeyimpl);

背景知識

1.FileDescriptor:文件描述符用于描述系統(tǒng)底層的特殊的結(jié)構(gòu)句柄,可以被用來表示開放文件谷誓、開放
套接字等,包含 private int fd;//文件描述值 和private long handle;//初始化文件描述句柄
2.句柄:標(biāo)識應(yīng)用程序中的不同對象應(yīng)用程序能夠通過句柄訪問相應(yīng)的對象的信息绒障,但是句柄不是指針,程序不能利用句柄來直接閱讀文件中的信息捍歪。如果句柄不在IO文件中户辱,它是毫無用處的。 句柄是Windows用來標(biāo)志應(yīng)用程序中建立的或是使用的唯一整數(shù)糙臼,大量使用了句柄來標(biāo)識對象庐镐。

  • 4.WindowsSelectorImpl
    全局變量
    private final int INIT_CAP = 8;//選擇key集合,key包裝集合初始化容量  
    private static final int MAX_SELECTABLE_FDS = 1024;//最大選擇key數(shù)量  
    private SelectionKeyImpl channelArray[];//選擇器關(guān)聯(lián)通道集合  
    private PollArrayWrapper pollWrapper;//存放所有文件描述對象(選擇key变逃,喚醒管道的源與sink通道)的集合  
   private final List threads = new ArrayList();//選擇操作線程集合  
    private final FdMap fdMap = new FdMap();//存放選擇key文件句柄與選擇key映射關(guān)系的Map  
   private final SubSelector subSelector = new SubSelector();//子選擇器  
    private int totalChannels;//注冊到選擇器的通道數(shù)量  
    private int threadsCount;//選擇線程數(shù)  
    private final Pipe wakeupPipe = Pipe.open();//喚醒等待選擇操的管道  
    private final int wakeupSourceFd;//喚醒管道源通道文件句柄
    private final int wakeupSinkFd;//喚醒管道sink通道文件句柄
  //四個同步鎖
    private Object closeLock;//選擇器關(guān)閉同步鎖  
    private final Object interruptLock = new Object();//中斷同步鎖必逆,在喚醒選擇操作線程時,用于同步  
    private final StartLock startLock = new StartLock();//選擇操作開始鎖  
    private final FinishLock finishLock = new FinishLock();//選擇操作結(jié)束鎖  
  //初始化
 WindowsSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();//獲取句柄
        SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
        var2.sc.socket().setTcpNoDelay(true);
        this.wakeupSinkFd = var2.getFDVal();//獲取句柄
        this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
    }

fdMap:存放選擇key文件句柄與選擇key的HashMap

   private WindowsSelectorImpl.MapEntry put(SelectionKeyImpl var1) {
            return (WindowsSelectorImpl.MapEntry)this.put(new Integer(var1.channel.getFDVal()), new WindowsSelectorImpl.MapEntry(var1));
        }

pollWrapper: 存放選擇key和通道及其相關(guān)興趣事件到本地內(nèi)存

 private static final short EVENT_OFFSET = 4;//興趣事件開始位置  
 static short SIZE_POLLFD = 8;//句柄長度int(4)+興趣事件(4) 
 void addWakeupSocket(int var1//索引, int var2//句柄) {
        this.putDescriptor(var2, var1);
        this.putEventOps(var2, 1);
    }
//將文件描述放在索引var2上
void putDescriptor(int var1, int var2) {
        this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
    }
//存放索引文件描述信息的興趣操作事件 
void putEventOps(int var1, int var2) {
        this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
    }

image.png

背景知識
在JDK1.5 update10和linux core2.6以上版本揽乱,sun優(yōu)化了Selctor的實現(xiàn)名眉,底層使用epoll替換了select/poll。在linux2.6(準(zhǔn)確來說是2.5.44)由內(nèi)核直接支持的方法凰棉。epoll解決了select和poll的缺點损拢。epoll每次注冊新的事件到epoll中,會把所有的fd(文件標(biāo)識符)拷貝進內(nèi)核撒犀,而不是在等待的時候重復(fù)拷貝福压,保證了每個fd在整個過程中只會拷貝1次掏秩。epoll它所支持的fd上限是最大可以打開文件的數(shù)目,具體數(shù)目可以cat /proc/sys/fs/file-max查看荆姆,一般來說這個數(shù)目和系統(tǒng)內(nèi)存關(guān)系比較大蒙幻。epoll在注冊新的事件時,為每個fd指定一個回調(diào)函數(shù)胆筒,當(dāng)設(shè)備就緒的時候邮破,調(diào)用這個回調(diào)函數(shù),這個回調(diào)函數(shù)就會把就緒的fd加入一個就緒表中腐泻。(所以epoll實際只需要遍歷就緒表)决乎。
參考:Linux下I/O多路復(fù)用select, poll, epoll 三種模型的Python使用

doSelect方法:其中 subSelector.poll() 是select的核心,由native函數(shù)poll0實現(xiàn)派桩,SubSelector主要有兩個方法以poll從pollWrapper拉取關(guān)注讀寫事件的選擇key。每個SelectThread使用蚌斩,SubSelector從當(dāng)前注冊到選擇器的通道中選取SubSelector索引所對應(yīng)的批次的通道已經(jīng)就緒的通道并更新操作事件铆惑。整個選擇過程有startLock和finishLock來控制。再有在一個選擇操作的所有子選擇線程執(zhí)行完送膳,才釋放finishLock

 protected int doSelect(long var1) throws IOException {
       ......
       this.subSelector.poll();        
       .....
    }
 private int poll() throws IOException {
        return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
 }
 private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);

SelectionKeyImpl保存注冊時的channel员魏、selector、event以及保存在pollWrapper的偏移位置index叠聋。

implRegister方法:首先同步關(guān)閉鎖撕阎,以防在注冊的過程中,選擇器被關(guān)閉碌补;檢查選擇器是否關(guān)閉虏束,沒有關(guān)閉,則檢查是否擴容厦章,需要則擴容為pollWrapper為原來的兩倍镇匀;檢查過后,添加選擇key到選擇器通道集合袜啃,設(shè)置key在選擇器通道集合的索引汗侵,添加選擇key到文件描述fdMap,添加key到key集合群发,將選擇key添加到文件描述信息及關(guān)注操作事件包裝集合pollWrapper晰韵,通道計數(shù)器自增。

protected void implRegister(SelectionKeyImpl selectionkeyimpl)  
{  
    //同步關(guān)閉鎖熟妓,以防在注冊的過程中雪猪,選擇器被關(guān)閉  
    synchronized(closeLock)  
    {  
        if(pollWrapper == null)  
            //文件描述包裝集合為null,即選器已關(guān)閉  
            throw new ClosedSelectorException();  
        growIfNeeded();//  
        channelArray[totalChannels] = selectionkeyimpl;//添加到選擇器通道集合  
        selectionkeyimpl.setIndex(totalChannels);//設(shè)置key在選擇器通道集合的索引  
        fdMap.put(selectionkeyimpl);//添加選擇key到文件描述fdMap  
        keys.add(selectionkeyimpl);//添加key到key集合  
     //將選擇key添加到文件描述信息及關(guān)注操作事件包裝集合pollWrapper  
        pollWrapper.addEntry(totalChannels, selectionkeyimpl);  
        totalChannels++;//通道計數(shù)器自增  
    }  
}  
void addEntry(int var1, SelectionKeyImpl var2) {
      //epoll每次注冊新的事件到epoll中滑蚯,會把所有的fd(文件標(biāo)識符)拷貝進內(nèi)核浪蹂,
        this.putDescriptor(var1, var2.channel.getFDVal());
    }

參考:
NIO源碼分析
深入淺出NIO Socket實現(xiàn)機制

三.總結(jié)

  • Selector是通過implRegister方法把每次注冊新的SelectionKeyImpl事件拷貝到pollWrapper內(nèi)存數(shù)組中抵栈,通過doSelect()的native函數(shù)poll0()拉取讀寫就緒的SelectionKeyImpl事件,如果之前沒有發(fā)生事件坤次,程序就阻塞在select處古劲,當(dāng)然不會一直阻塞,因為epoll在timeout時間內(nèi)如果沒有事件缰猴,也會返回产艾。
  • ServerSocketChannelImpl的初始化主要是初始化ServerSocket通道線程thread,地址綁定滑绒,接受連接同步鎖闷堡,默認創(chuàng)建ServerSocketChannelImpl的狀態(tài)為未初始化,文件描述和文件描述id疑故,如果使用本地地址杠览,則獲取本地地址。bind首先檢查ServerSocket是否關(guān)閉纵势,是否綁定地址踱阿,如果既沒有綁定也沒關(guān)閉,則檢查綁定的socketaddress是否正確或合法钦铁;然后通過Net工具類的bind(native)和listen(native)软舌,完成實際的ServerSocket地址綁定和開啟監(jiān)聽,如果綁定是開啟的參數(shù)小于1牛曹,則默認接受50個連接佛点。accept方法主要是調(diào)用accept0(native)方法接受連接,并根據(jù)接受來接
    一旦有對應(yīng)的事件發(fā)生黎比,poll0方法就會返回超营。
  • SocketChannelImpl構(gòu)造主要是初始化讀寫及狀態(tài)鎖和通道socket文件描述。
    connect連接方法首先同步讀鎖和寫鎖焰手,確保socket通道打開糟描,并沒有連接;然后檢查socket地址的正確性與合法性书妻,然后檢查當(dāng)前線程是否有Connect方法的訪問控制權(quán)限船响,最后嘗試連接socket地址。從緩沖區(qū)讀取字節(jié)序列寫到通道write(ByteBuffer)躲履,首先確保通道打開见间,且輸出流沒有關(guān)閉,然后委托給IOUtil寫字節(jié)序列工猜;IOUtil寫字節(jié)流過程為首先通過Util從當(dāng)前線程的緩沖區(qū)獲取可以容下字節(jié)序列的臨時緩沖區(qū)(DirectByteBuffer)米诉,如果沒有則創(chuàng)建一個DirectByteBuffer,將字節(jié)序列寫到臨時的DirectByteBuffer中篷帅,然后將寫操作委托給nativedispatcher(SocketDispatcher)史侣,將DirectByteBuffer添加到當(dāng)前線程的緩沖區(qū)拴泌, 以便重用,因為DirectByteBuffer實際上是存在物理內(nèi)存中惊橱,頻繁的分配將會消耗更多的資源蚪腐。
    channel的源碼筆者還未細看,有興趣的可以看參考部分税朴。
    參考:SocketChannelImpl 解析一(通道連接回季,發(fā)送數(shù)據(jù))

四.實例(聊天室的實現(xiàn))

public class TestNonBlockingNIO1 {

    //客戶端
    @Test
    public void client() throws IOException{
        //1. 獲取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

        //2. 切換非阻塞模式
        sChannel.configureBlocking(false);

        //3. 分配緩沖區(qū)
        ByteBuffer buf = ByteBuffer.allocate(1024);

        //4. 發(fā)送數(shù)據(jù)給服務(wù)端
        Scanner scan = new Scanner(System.in);

        while(scan.hasNext()){
            String str = scan.next();

            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }

        //5. 關(guān)閉通道
        sChannel.close();
    }

    //服務(wù)端
    @Test
    public void server() throws IOException{
        //1. 獲取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();

        //2. 切換非阻塞模式
        ssChannel.configureBlocking(false);

        //3. 綁定端口號
        ssChannel.bind(new InetSocketAddress(9898));

        //4. 獲取選擇器
        Selector selector = Selector.open();

        //5. 將通道注冊到選擇器上, 并且指定“監(jiān)聽事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);

        //6. 輪詢監(jiān)聽選擇器上的“準(zhǔn)備就緒”的事件
        while(selector.select() > 0){

            //7. 獲取當(dāng)前選擇器上所有“準(zhǔn)備就緒”的選擇鍵(監(jiān)聽事件)
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while(it.hasNext()){
                //8. 獲取當(dāng)前準(zhǔn)備就緒的選擇鍵
                SelectionKey sk = it.next();

                //9. 判斷具體是哪個事件“準(zhǔn)備就緒”
                if(sk.isAcceptable()){
                    //10.若接收狀態(tài)就緒,獲取當(dāng)前客戶端的連接
                    SocketChannel sChannel = ssChannel.accept();

                    //11.切換非阻塞式
                    sChannel.configureBlocking(false);

                    //12.將該通道注冊到選擇器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                }else if(sk.isReadable()){
                    //13.若“讀就緒”正林,獲取當(dāng)前選擇器上就緒狀態(tài)的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();

                    //14.讀取數(shù)據(jù)
                    ByteBuffer buf = ByteBuffer.allocate(1024);

                    int len = 0;
                    while((len = sChannel.read(buf)) > 0){
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }

                //15.取消選擇鍵
                it.remove();
            }
        }
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末泡一,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子觅廓,更是在濱河造成了極大的恐慌鼻忠,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件杈绸,死亡現(xiàn)場離奇詭異粥烁,居然都是意外死亡,警方通過查閱死者的電腦和手機蝇棉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來芥永,“玉大人篡殷,你說我怎么就攤上這事÷窠В” “怎么了板辽?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長棘催。 經(jīng)常有香客問我劲弦,道長,這世上最難降的妖魔是什么醇坝? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任邑跪,我火速辦了婚禮,結(jié)果婚禮上呼猪,老公的妹妹穿的比我還像新娘画畅。我一直安慰自己,他們只是感情好宋距,可當(dāng)我...
    茶點故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布轴踱。 她就那樣靜靜地躺著,像睡著了一般谚赎。 火紅的嫁衣襯著肌膚如雪淫僻。 梳的紋絲不亂的頭發(fā)上诱篷,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天,我揣著相機與錄音雳灵,去河邊找鬼棕所。 笑死,一個胖子當(dāng)著我的面吹牛细办,可吹牛的內(nèi)容都是我干的橙凳。 我是一名探鬼主播,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼笑撞,長吁一口氣:“原來是場噩夢啊……” “哼岛啸!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起茴肥,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤坚踩,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后瓤狐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瞬铸,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年础锐,在試婚紗的時候發(fā)現(xiàn)自己被綠了嗓节。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡皆警,死狀恐怖拦宣,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情信姓,我是刑警寧澤鸵隧,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站意推,受9級特大地震影響豆瘫,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜菊值,卻給世界環(huán)境...
    茶點故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一外驱、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧俊性,春花似錦略步、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至典徊,卻和暖如春杭煎,著一層夾襖步出監(jiān)牢的瞬間恩够,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工羡铲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蜂桶,地道東北人。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓也切,卻偏偏與公主長得像扑媚,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子雷恃,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,876評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理疆股,服務(wù)發(fā)現(xiàn),斷路器倒槐,智...
    卡卡羅2017閱讀 134,716評論 18 139
  • 作者: 一字馬胡 轉(zhuǎn)載標(biāo)志 【2017-11-24】 更新日志 一旬痹、Java OIO Java OIO (Jav...
    一字馬胡閱讀 1,357評論 0 12
  • Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API,可以替代標(biāo)準(zhǔn)的Java I...
    JackChen1024閱讀 7,557評論 1 143
  • NIO 操作系統(tǒng)背景知識 unix提供了5中io模型讨越,其中java的底層實現(xiàn)依賴的是操作系統(tǒng)的io復(fù)用模型两残。lin...
    江江的大豬閱讀 557評論 0 2
  • 生活中總有許多我們搞不清楚的事情,那么在別人忙著自己的事情顧不上我們的時候把跨,你會大膽向他們詢問嗎人弓? 如果是我來回答...
    一只正在成長的獅子閱讀 245評論 1 2