Mina框架會(huì)話讀寫源碼分析

一個(gè)IoSession的I/O事件是注冊(cè)在一個(gè)Selector對(duì)象上儡循,并且每個(gè)Processor線程只輪詢一個(gè)Selector對(duì)象混滔,即每一個(gè)鏈接只有一個(gè)線程處理I/O事件洒疚,這樣能保證同一IoSession數(shù)據(jù)的有序性。

下面就從部分源碼探究其中的原理坯屿,以NioAcceptor為例子:

public NioSocketAcceptor() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

這里的NioProcessor.class就是Processor的具體類型油湖。

protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
    }

SimpleIoProcessorPool是Processor的線程池,使用NioProcessor創(chuàng)建具體的線程领跛。

跳過Acceptor的初始化過程乏德,當(dāng)客戶端請(qǐng)求建立鏈接,服務(wù)端Acceptor線程會(huì)執(zhí)行以下代碼:

private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                // Associates a new created connection to a processor,
                // and get back a session
                S session = accept(processor, handle); //這里的processor是processor線程池
                if (session == null) {
                    continue;
                }
                initSession(session, null, null);
                // add the session to the SocketIoProcessor
                session.getProcessor().add(session);
            }
        }

@Override
    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
        SelectionKey key = null;
        if (handle != null) {
            key = handle.keyFor(selector);
        }
        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }
        // accept the connection from the client
        SocketChannel ch = handle.accept();
        if (ch == null) {
            return null;
        }
        return new NioSocketSession(this, processor, ch);
    }

這里創(chuàng)建了NioSocketSession將Processor線程池與SocketChannel綁定在一起。然后通過 session.getProcessor().add(session)將會(huì)話注冊(cè)到SimpleIoProcessorPool線程池中的一個(gè)Processor對(duì)象內(nèi)部的Selector對(duì)象喊括。

為什么這里的processor是線程池胧瓜?還記得NioSocketAcceptor的構(gòu)造函數(shù)中的SimpleIoProcessorPool,processor就是它的實(shí)例郑什。

看以下NioSocketSession的getProcessor()方法:

public IoProcessor<NioSession> getProcessor() {
        return processor;
    }

返回的就是與它關(guān)聯(lián)的SimpleIoProcessorPool線程池對(duì)象.再看SimpleIoProcessorPool的addI()方法:

public final void add(S session) {
        getProcessor(session).add(session);
    }


 private IoProcessor<S> getProcessor(S session) {
        IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
        if (processor == null) {
            if (disposed || disposing) {
                throw new IllegalStateException("A disposed processor cannot be accessed.");
            }
            processor = pool[Math.abs((int) session.getId()) % pool.length];
            if (processor == null) {
                throw new IllegalStateException("A disposed processor cannot be accessed.");
            }
            session.setAttributeIfAbsent(PROCESSOR, processor);
        }
        return processor;
    }

getProcessor()這個(gè)方法是SimpleIoProcessorPool中的府喳,負(fù)責(zé)根據(jù)Session返回一個(gè)與之關(guān)聯(lián)的Processor線程,這里用了session id對(duì)線程池中的線程總數(shù)取模的算法蘑拯。與Session關(guān)聯(lián)的Processor被添加到Session的Attribute中以便下次直接取出劫拢。

到這一部還沒有看到Session內(nèi)部的SocketChannel的IO事件是怎么注冊(cè)到Processor線程的Selector對(duì)象上的,繼續(xù)分析Processor的add()方法:

@Override
    public final void add(S session) {
        if (disposed || disposing) {
            throw new IllegalStateException("Already disposed.");
        }
        // Adds the session to the newSession queue and starts the worker
        newSessions.add(session);
        startupProcessor();
    }

private void startupProcessor() {
        Processor processor = processorRef.get();
        if (processor == null) {
            processor = new Processor();
            if (processorRef.compareAndSet(null, processor)) {
                executor.execute(new NamePreservingRunnable(processor, threadName));
            }
        }
        // Just stop the select() and start it again, so that the processor
        // can be activated immediately.
        wakeup();
    }

//NamePreservingRunnable的run方法强胰,顯示給線程命名,然后執(zhí)行Processor的run方法妹沙。
public void run() {
        Thread currentThread = Thread.currentThread();
        String oldName = currentThread.getName();

        if (newName != null) {
            setName(currentThread, newName);
        }

        try {
            runnable.run();
        } finally {
            setName(currentThread, oldName);
        }
    }

private class Processor implements Runnable {
        public void run() {
            assert (processorRef.get() == this);

            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();
            int nbTries = 10;

            for (;;) {
                try {
                    ...
                    int selected = select(SELECT_TIMEOUT);
                    ...
                    nSessions += handleNewSessions();
                    ...
                    if (selected > 0) {
                        // LOG.debug("Processing ..."); // This log hurts one of
                        // the MDCFilter test...
                        process();
                    }
                    ...
               
                    }
                } catch (ClosedSelectorException cse) {
                    ExceptionMonitor.getInstance().exceptionCaught(cse);
                    break;
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }
        }
    }

這里開始顯露一些馬腳了偶洋,先是把session添加到newSessions這個(gè)隊(duì)列中。然后建立了Processor實(shí)例距糖,這就是具體的Processor線程玄窝。通過executor的execute()方法先是執(zhí)行了NamePreservingRunnable的run()方法,其內(nèi)部執(zhí)行了Processor的run()方法悍引。

執(zhí)行Processor的run()中的select()其實(shí)就是調(diào)用其內(nèi)部Selector對(duì)象的select()方法恩脂,會(huì)導(dǎo)致Processor線程的阻塞:

protected int select(long timeout) throws Exception {
        return selector.select(timeout);
    }

然后調(diào)用了Processor內(nèi)部的Selector對(duì)象的wakeup()方法,wakeup()這個(gè)方法是當(dāng)Selector對(duì)象執(zhí)行select()方法阻塞時(shí),立即返回趣斤。

@Override
    protected void wakeup() {
        wakeupCalled.getAndSet(true);
        selector.wakeup();
    }

于是后續(xù)就執(zhí)行了:

private int handleNewSessions() {
        int addedSessions = 0;
        for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
            if (addNow(session)) {
                // A new session has been created
                addedSessions++;
            }
        }
        return addedSessions;
    }

private boolean addNow(S session) {
        boolean registered = false;

        try {
            init(session);
            registered = true;

            // Build the filter chain of this session.
            IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
            chainBuilder.buildFilterChain(session.getFilterChain());

            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
            // in AbstractIoFilterChain.fireSessionOpened().
            // Propagate the SESSION_CREATED event up to the chain
            IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
            listeners.fireSessionCreated(session);
        } catch (Exception e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);

            try {
                destroy(session);
            } catch (Exception e1) {
                ExceptionMonitor.getInstance().exceptionCaught(e1);
            } finally {
                registered = false;
            }
        }

        return registered;
    }

@Override
    protected void init(NioSession session) throws Exception {
        SelectableChannel ch = (SelectableChannel) session.getChannel();
        ch.configureBlocking(false);
        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
    }

到此終于理清了俩块,Processor線程先是阻塞的,由Acceptor線程把session添加到newSessions隊(duì)列浓领,然后通過wakeup將Processor從Selector對(duì)象的select()方法返回執(zhí)行到handleNewSessions()方法玉凯,此方法會(huì)取出newSessions隊(duì)列中的session然后通過addNow()方法執(zhí)行NioProcessor的init()方法,由init()方法將session中的Channel的OP_READ事件注冊(cè)到Selector對(duì)象上。

所以一個(gè)IoSession對(duì)應(yīng)的是一個(gè)Proceccor線程联贩,也是一個(gè)Selector對(duì)象漫仆,每個(gè)IoSession的讀取數(shù)據(jù)處理一定是同步的。

既然有讀就一定有寫泪幌,記得上述代碼中有一段:

private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                // Associates a new created connection to a processor,
                // and get back a session
                S session = accept(processor, handle); //這里的processor是processor線程池
                if (session == null) {
                    continue;
                }
                initSession(session, null, null);
                // add the session to the SocketIoProcessor
                session.getProcessor().add(session);
            }
        }

重點(diǎn)是initSession()方法:

 protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
...
((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
                    .getWriteRequestQueue(session));
...
}

這里為session添加了WriteRequestQueue其實(shí)就是session的消息寫入隊(duì)列盲厌,當(dāng)session被暫停或者WriteRequestQueue隊(duì)列非空寫入的消息會(huì)添加到這個(gè)隊(duì)列里:

if (!s.isWriteSuspended()) {
                if (writeRequestQueue.isEmpty(session)) {
                    // We can write directly the message
                    s.getProcessor().write(s, writeRequest);
                } else {
                    s.getWriteRequestQueue().offer(s, writeRequest);
                    s.getProcessor().flush(s);
                }
            } else {
                s.getWriteRequestQueue().offer(s, writeRequest);
            }

而如果隊(duì)列是空的則會(huì)執(zhí)行write()方法祸泪,其實(shí)也是將寫入請(qǐng)求插入隊(duì)列然后直接執(zhí)行flush()方法吗浩。

 @Override
    public void write(S session, WriteRequest writeRequest) {
        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        writeRequestQueue.offer(session, writeRequest);

        if (!session.isWriteSuspended()) {
            this.flush(session);
        }
    }

flush()方法會(huì)在flushingSessions隊(duì)列添加session并通過wakeup()方法將Processor線程從阻塞中恢復(fù):

@Override
    public final void flush(S session) {
        // add the session to the queue if it's not already
        // in the queue, then wake up the select()
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            wakeup();
        }
    }

在Processor線程中會(huì)執(zhí)行flush(long currentTime)方法,依次取出隊(duì)列的每個(gè)session,注意這里的隊(duì)列是ConcurrentLinkedQueue浴滴,所以不管在任何線程調(diào)用IoSession的write()方法寫入消息拓萌,最終都會(huì)同步的插入到這個(gè)隊(duì)列。

通過flushNow(session, currentTime)方法先是取出session的WriteRequestQueue隊(duì)列(每個(gè)session都有一個(gè)寫入消息的同步隊(duì)列)升略,然后依次取出其中的寫消息請(qǐng)求微王,然后調(diào)用writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)屡限,最終調(diào)用write(NioSession session, IoBuffer buf, int length) 通過session關(guān)聯(lián)的Channel的write()方法將字節(jié)流發(fā)送。由于代碼過多只貼出最終部分:

@Override
    protected int write(NioSession session, IoBuffer buf, int length) throws IOException {
        if (buf.remaining() <= length) {
            return session.getChannel().write(buf.buf());
        }

        int oldLimit = buf.limit();
        buf.limit(buf.position() + length);
        try {
            return session.getChannel().write(buf.buf());
        } finally {
            buf.limit(oldLimit);
        }
    }

到此分析Processor線程讀寫終于結(jié)束了炕倘,可以得出結(jié)論钧大,會(huì)話的讀寫都是在Processor線程池中的一個(gè)Processor線程執(zhí)行的。其中讀消息是按事件順序依次完成的罩旋,寫消息可以由多個(gè)線程同時(shí)寫啊央,但是寫入的請(qǐng)求一定是同步地插入到Session地寫消息隊(duì)列中,然后由Processor線程按順序依次完成發(fā)送涨醋。擔(dān)心Mina框架讀寫的并發(fā)問題可以打住了瓜饥。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市浴骂,隨后出現(xiàn)的幾起案子乓土,更是在濱河造成了極大的恐慌,老刑警劉巖溯警,帶你破解...
    沈念sama閱讀 206,602評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件趣苏,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡梯轻,警方通過查閱死者的電腦和手機(jī)食磕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來喳挑,“玉大人彬伦,你說我怎么就攤上這事∫了校” “怎么了媚朦?”我有些...
    開封第一講書人閱讀 152,878評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)日戈。 經(jīng)常有香客問我询张,道長(zhǎng),這世上最難降的妖魔是什么浙炼? 我笑而不...
    開封第一講書人閱讀 55,306評(píng)論 1 279
  • 正文 為了忘掉前任份氧,我火速辦了婚禮,結(jié)果婚禮上弯屈,老公的妹妹穿的比我還像新娘蜗帜。我一直安慰自己,他們只是感情好资厉,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評(píng)論 5 373
  • 文/花漫 我一把揭開白布厅缺。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪湘捎。 梳的紋絲不亂的頭發(fā)上诀豁,一...
    開封第一講書人閱讀 49,071評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音窥妇,去河邊找鬼舷胜。 笑死,一個(gè)胖子當(dāng)著我的面吹牛活翩,可吹牛的內(nèi)容都是我干的烹骨。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼材泄,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼沮焕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起拉宗,我...
    開封第一講書人閱讀 37,006評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤遇汞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后簿废,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,512評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡络它,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評(píng)論 2 325
  • 正文 我和宋清朗相戀三年族檬,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片化戳。...
    茶點(diǎn)故事閱讀 38,094評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡单料,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出点楼,到底是詐尸還是另有隱情扫尖,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評(píng)論 4 323
  • 正文 年R本政府宣布掠廓,位于F島的核電站换怖,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蟀瞧。R本人自食惡果不足惜沉颂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望悦污。 院中可真熱鬧铸屉,春花似錦、人聲如沸切端。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至昌屉,卻和暖如春钙蒙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背怠益。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工仪搔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蜻牢。 一個(gè)月前我還...
    沈念sama閱讀 45,536評(píng)論 2 354
  • 正文 我出身青樓烤咧,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親抢呆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子煮嫌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)抱虐,斷路器昌阿,智...
    卡卡羅2017閱讀 134,600評(píng)論 18 139
  • 從三月份找實(shí)習(xí)到現(xiàn)在,面了一些公司恳邀,掛了不少懦冰,但最終還是拿到小米、百度谣沸、阿里刷钢、京東、新浪乳附、CVTE内地、樂視家的研發(fā)崗...
    時(shí)芥藍(lán)閱讀 42,187評(píng)論 11 349
  • 轉(zhuǎn)至元數(shù)據(jù)結(jié)尾創(chuàng)建: 董瀟偉,最新修改于: 十二月 23, 2016 轉(zhuǎn)至元數(shù)據(jù)起始第一章:isa和Class一....
    40c0490e5268閱讀 1,681評(píng)論 0 9
  • kafka的定義:是一個(gè)分布式消息系統(tǒng)赋除,由LinkedIn使用Scala編寫阱缓,用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,302評(píng)論 1 15
  • 回想起我的高中荆针,除了比大學(xué)要輕松的日常,還有我的那個(gè)心愛的姑娘颁糟。 我這個(gè)人祭犯,性格算是開朗的,但是唯獨(dú)對(duì)女生(...
    不必徒勞閱讀 237評(píng)論 0 1