Reactor模型-多線程程版

1.概述

Reactor單線程版本的設(shè)計中瞄沙,I/O任務(wù)乃至業(yè)務(wù)邏輯都由Reactor線程來完成塑猖,這無疑增加了Reactor線程的負(fù)擔(dān)积仗,高負(fù)載情況下必然會出現(xiàn)性能瓶頸。此外寝并,對于多處理器的服務(wù)器來說箫措,單個Reactor線程也發(fā)揮不了多CPU的最大功效。下面我們對之前單線程版的Reactor進行改進食茎。

改進方向
  1. 接受客戶端連接請求的不在是單個線程-Acceptor蒂破,而是一個NIO線程池。
  2. I/O處理也不再是單個線程處理别渔,而是交給一個I/O線程池進行處理。

其實改進方向很明確:就是針對可能的系統(tǒng)瓶頸惧互,由單線程改進為多線程處理。這樣的方案帶來的好處顯而易見艾猜,增加可靠性的同時也發(fā)揮多線程的優(yōu)勢淤毛,在高負(fù)載的情況下能夠從容應(yīng)對蔗蹋。

Key Word

Java NIO 事件驅(qū)動 主從Reactor模型


2.code未動皂吮,test先行

首先定義服務(wù)端用于處理請求的Handler候齿,通過實現(xiàn)ChannelHandler接口完成亚皂。

public class SimpleServerChannelHandler implements ChannelHandler {
    
    private static Logger LOG = LoggerFactory.getLogger(SimpleServerChannelHandler.class);
    
    //記錄接受消息的次數(shù)
    public volatile int receiveSize;
    
    //記錄拋出的異常
    public volatile Throwable t;
    
    @Override
    public void channelActive(NioChannel channel) {
        if(LOG.isDebugEnabled()){
            LOG.debug("ChannelActive");
        }
    }

    @Override
    public void channelRead(NioChannel channel, Object msg) throws Exception {
        
        ByteBuffer bb = (ByteBuffer)msg;

        byte[] con = new byte[bb.remaining()];
        bb.get(con);

        String str = new String(con,0,con.length);

        String resp = "";
        switch(str){
        case "request1":resp = "response1";break;
        case "request2":resp = "response2";break;
        case "request3":resp = "response3";break;
        default :resp = "Hello Client";
        }

        ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length);
        buf.put(resp.getBytes());
        
        receiveSize++;
        
        channel.sendBuffer(buf);
    }

    @Override
    public void exceptionCaught(NioChannel channel, Throwable t)
            throws Exception {
        this.t = t;
        channel.close();
    }

}

Junit測試用例孵睬,setUp用于啟動Server端和Client端烁试。

public class ReactorTest extends BaseTest{
    private static final Logger LOG = LoggerFactory.getLogger(ReactorTest.class);

    private static String HOST = "localhost";

    private static int PORT = 8888;

    private static Client client;
    private static Server server;

    static SimpleServerChannelHandler h;

    @BeforeClass
    public static void setUp() throws Exception {
        startServer();
        startClient();
    }
    private static void startServer() throws Exception{
        server = new Server();
        ReactorPool mainReactor = new ReactorPool();
        ReactorPool subReactor = new ReactorPool();

        h = new SimpleServerChannelHandler();
        server.reactor(mainReactor, subReactor)
        .handler(h)
        .bind(new InetSocketAddress(HOST,PORT));
    }
    private static void startClient() throws SocketException{
        client = new Client();
        client.socket().setTcpNoDelay(true);
        client.connect(
                new InetSocketAddress(HOST,PORT));
    }
    @Test
    public void test() {
        LOG.info("Sucessful configuration");
    }

    @Test
    public void testBaseFunction(){
        LOG.debug("testBaseFunction()");

        String msg ="Hello Reactor";
        ByteBuffer resp = client.syncSend(ByteBuffer.wrap(msg.getBytes()));
        byte[] res = new byte[resp.remaining()];
        resp.get(res);

        Assert.assertEquals("Hello Client", new String(res,0,res.length));
    }

    @Test
    public void testMultiSend(){

        int sendSize = 1024;

        for(int i = 0; i < sendSize; i++){
            ByteBuffer bb = ByteBuffer.wrap("Hello Reactor".getBytes());
            ByteBuffer resp = client.syncSend(bb);
            byte[] res = new byte[resp.remaining()];
            resp.get(res);

            Assert.assertEquals("Hello Client", new String(res,0,res.length));
        }
        Assert.assertEquals(sendSize, h.receiveSize);
    }
    @Test
    public void testTooLongReceivedByteSizeEexception(){
        LOG.debug("testTooLongReceivedByteSizeEexception()");

        int threshold = 1024;
        byte[] dest = new byte[threshold + 1];
        Random r = new Random();
        r.nextBytes(dest);
        client.syncSend(ByteBuffer.wrap(dest));
        
        Assert.assertEquals(IllegalArgumentException.class, h.t.getClass());
        
        Assert.assertEquals("Illegal data length, len:" + (threshold+1), h.t.getMessage());
    }
    @AfterClass
    public static void tearDown() throws Exception {
        server.close();
        client.close();
    }
}

一共進行三項基本測試:

testBaseFunction

實現(xiàn)了基本發(fā)送接收消息的功能据途。

testMultiSend

重復(fù)發(fā)送消息糖驴,并且記錄消息收發(fā)的次數(shù)罐脊。

testTooLongReceivedByteSizeEexception

測試server端在接收到異常碼流的情況下,是否拋出異常爹殊。

3.設(shè)計及實現(xiàn)

3.1 Reactor和ReactorPool

Reactor作用就是不斷進行輪詢并檢查是否有已經(jīng)就緒的事件梗夸,如果有尘盼,那么就將事件分發(fā)給對應(yīng)的Handler進行處理午阵。這個角色其實就是NIO編程中的多路復(fù)用器java.nio.channels.Selector坤按。因此惧眠,Reactor聚合一個Selector類型成員變量籽懦。輪詢的過程如下:

public class Reactor extends Thread{

//...

    private Selector selector;

    private volatile boolean isShutdown;

    Reactor(){
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new RuntimeException("failed to open a new selector", e);
        }
    }
    
@Override
    public void run() {
        for(;;){
            try {
                getSelector().select(wakenUp);
                Set<SelectionKey> keys;
                synchronized(this){
                    keys = getSelector().selectedKeys();
                }
                Iterator<SelectionKey> it = keys.iterator();
                while(it.hasNext()){
                    SelectionKey key = it.next();
                    processSelectedKey(key);
                    it.remove();
                }
                if(isShutdown()){
                    break;
                }
            } catch (Throwable e) {
                LOG.warn("Unexpected exception in the selector loop.", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) { }
            }
        }
    }
}

processSelectedKey(key)中進行的就是根據(jù)就緒事件key.readyOps()進行相應(yīng)操作:

    private void processSelectedKey(SelectionKey key){
        try {       
            NioChannel nioChannel = (NioChannel)key.attachment();

            if (!nioChannel.isOpen()) {
                LOG.warn("trying to do i/o on a null socket");
                return;
            }

            int readyOps = key.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                nioChannel.sink().doRead();
            }
            if((readyOps & SelectionKey.OP_WRITE) != 0){
                nioChannel.sink().doSend();
            }
            if((readyOps & SelectionKey.OP_CONNECT) != 0){
                //remove OP_CONNECT
                key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT));
            }
        }catch (Throwable t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Throwable stack trace", t);
            }
            closeSocket();
        }
    }

這里的NioChannel是抽象類,是對NIO編程中的Channel語義的抽象(后面會有分析)氛魁。

此外暮顺,Reactor肯定要提供一個注冊接口啦。呆盖。拖云。

    public SelectionKey register(final NioChannel sc, final int interestOps, Object attachment){
        if(sc == null){
            throw new NullPointerException("SelectableChannel");
        }
        if(interestOps == 0){
            throw new IllegalArgumentException("interestOps must be non-zero.");
        }
        SelectionKey key;
        try {
            key = sc.channel().register(getSelector(), interestOps, attachment);
        } catch (ClosedChannelException e) {
            throw new RuntimeException("failed to register a channel", e);
        }
        return key;
    }

ReactorPool是一個Reactor的線程池,這里就通過簡單的數(shù)組形式進行模擬:

public class ReactorPool {

    private static final Logger LOG = LoggerFactory.getLogger(ReactorPool.class);

    private Reactor[] reactors;

    private AtomicInteger index = new AtomicInteger();
    
    //線程數(shù)默認(rèn)為CPU數(shù)*2
    private final int DEFAULT_THREADS = Runtime.getRuntime().availableProcessors() * 2;

    public ReactorPool (){
        this(0);
    }
    public ReactorPool(int nThreads){
        if(nThreads < 0){
            throw new IllegalArgumentException("nThreads must be nonnegative number");
        }
        if(nThreads == 0){
            nThreads = DEFAULT_THREADS;
        }
        reactors = new Reactor[nThreads];
        for(int i = 0; i < nThreads; i++){
            boolean succeed = false;
            try{
                reactors[i] = new Reactor();
                succeed = true;
            }catch(Exception e){
                throw new IllegalStateException("failed to create a Reactor", e);
            }finally{
                if (!succeed) {
                    for (int j = 0; j < i; j ++) {
                        reactors[j].close();
                    }
                }
            }
        }
    }

    public Reactor next(){
        return reactors[index.incrementAndGet() % reactors.length];
    }

    public void close(){
        for(int i = 0; i < reactors.length; i++){
            reactors[i].setShutdown(true);
            reactors[i].close();
        }
    }
}

3.2 NioChannel和NioChannelSink

在進行Java原生Nio編程的過程中应又,會涉及到兩種類型的Channel:

  • java.nio.channels.SocketChannel
  • java.nio.channels.ServerSocketChannel

其分別作為客戶端和服務(wù)端調(diào)用接口宙项。為了統(tǒng)一其公共行為,這里抽象出一個抽象類NioChannel株扛,其成員組成如下:

  • 聚合一個SelectableChannel類型(SocketChannel和ServerSocketChannel的公共父類)的成員變量尤筐。
  • 持有一個所屬Reactor對象的引用
  • 聚合一個NioChannelSink類型成員變量。

NioChannelSink是將NioChannel的底層讀寫功能獨立出來洞就。一方面使NioChannel避免集成過多功能而顯得臃腫盆繁,另一方面分離出底層傳輸協(xié)議,為以后底層傳輸協(xié)議的切換做準(zhǔn)備旬蟋。(TCP vs UDP油昂,NIO、OIO、AIO)從這種意義上說冕碟,NioChannel取名為Channel貌似更合理拦惋。

public abstract class NioChannel {

    protected Reactor reactor;

    protected SelectableChannel sc;

    protected SelectionKey selectionKey;

    private NioChannelSink sink;

    protected volatile ChannelHandler handler;
    
    public NioChannel(SelectableChannel sc, int interestOps){
        this.sc = sc;
        try {
            sc.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
        }
        sink = nioChannelSink();
    }
    
    protected void fireChannelRead(ByteBuffer bb){
        try {
            handler.channelRead(this, bb);
        } catch (Exception e) {
            fireExceptionCaught(e);
        }
    }
    protected void fireExceptionCaught(Throwable t){
        try {
            handler.exceptionCaught(this, t);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //。安寺。厕妖。
    
    public abstract NioChannelSink nioChannelSink();

    public interface NioChannelSink{

        void doRead();

        void doSend();

        void sendBuffer(ByteBuffer bb);
        
        void close();
    }
}

再來分析下NioChannel需要提供哪些功能:

首先,NIO編程中SocketChannel或ServerSocketChannel需要注冊到多路復(fù)用器Selector中挑庶。那么這里就抽象成了NioChannel和Reactor的交互言秸。

public void register(Reactor reactor, int interestOps){
    this.reactor = reactor;
    try {
        selectionKey = sc.register(reactor().getSelector(), interestOps, this);
    } catch (ClosedChannelException e) {
        e.printStackTrace();
    }
}

這里將NioChannel對象作為附件,在Reactor中心輪詢到ready事件后迎捺,會根據(jù)事件的類型(OP_ACCEPT OP_READ等)举畸,從SelectionKey中取出綁定的附件NioChannel

NioChannel nioChannel = (NioChannel)key.attachment();

然后根據(jù)進行key.readyOps()做相應(yīng)操作。這在Reactor中已經(jīng)做過分析破加。

其次俱恶,作為Channel肯定要提供綁定bind和連接connect的功能了:

public abstract void bind(InetSocketAddress remoteAddress) throws Exception;
    
public abstract void connect(InetSocketAddress remoteAddress) throws Exception;

這里用抽象方法是要將實現(xiàn)交由子類來完成。

最后范舀,是用戶通過NioChannel發(fā)送的消息的函數(shù):

public void sendBuffer(ByteBuffer bb){
    sink().sendBuffer(bb);
}

protected final void enableWrite(){
    int i = selectionKey.interestOps();
    if((i & SelectionKey.OP_WRITE) == 0){
        selectionKey.interestOps(i | SelectionKey.OP_WRITE);
    }
}
protected final void disableWrite(){
    int i = selectionKey.interestOps();
    if((i & SelectionKey.OP_WRITE) == 1){
        selectionKey.interestOps(i & (~SelectionKey.OP_WRITE));         
    }
}

3.3 NioServerSocketChannel和NioSocketChannel

NioServerSocketChannel和NioSocketChannel是抽象類NioChannel的一個子類合是,NioServerSocketChannel和java.nio.channels.ServerSocketChannel的語義是一致的,供服務(wù)端使用锭环,綁定指定端口聪全,監(jiān)聽客戶端發(fā)起的連接請求,并交由相應(yīng)Handler處理辅辩。而NioSocketChannel和java.nio.channels.NioSocketChannel語義一致难礼,作為通信的一個通道。

public class NioServerSocketChannel extends NioChannel{

    private static final Logger LOG = LoggerFactory.getLogger(NioServerSocketChannel.class);
    
    public NioServerSocketChannel(){
        super(newSocket());
    }
    
    public static ServerSocketChannel newSocket(){
        ServerSocketChannel socketChannel = null;
        try {
            socketChannel = ServerSocketChannel.open();
        } catch (IOException e) {
            LOG.error("Unexpected exception occur when open ServerSocketChannel");
        }
        return socketChannel;
    }
    
    @Override
    public NioChannelSink nioChannelSink() {
        return new NioServerSocketChannelSink();
    }
    
    class NioServerSocketChannelSink implements NioChannelSink{
        //玫锋。蛾茉。。
    }
        @Override
    public void bind(InetSocketAddress remoteAddress) throws Exception {
        ServerSocketChannel ssc = (ServerSocketChannel)sc;
        ssc.bind(remoteAddress);
    }
    @Override
    public void connect(InetSocketAddress remoteAddress) throws Exception {
        throw new UnsupportedOperationException();
    }
}

這里獲取ServerSocketChannel實例的方式是通過ServerSocketChannel.open()撩鹿,其實也可以通過反射來獲取谦炬,這樣就能將ServerSocketChannel和SocketChannel的實例化邏輯進行統(tǒng)一,我們只需要在實例化Channel的時候?qū)erverSocketChannel.class 或 SocketChannel.class當(dāng)作參數(shù)傳入即可节沦。

NioSocketChannel的實現(xiàn)如下:

public class NioSocketChannel extends NioChannel{

    private static final Logger LOG = LoggerFactory.getLogger(NioSocketChannel.class);

    public NioSocketChannel() throws IOException{
        super( newSocket());
    }
    public NioSocketChannel(SocketChannel sc) throws IOException{
        super(sc);
    }
    public static SocketChannel newSocket(){
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
        } catch (IOException e) {
        }
        return socketChannel;
    }

    @Override
    public NioChannelSink nioChannelSink() {
        return new NioSocketChannelSink();
    }
    
    class NioSocketChannelSink implements NioChannelSink{
        //键思。。甫贯。
    }
    
    @Override
    public void bind(InetSocketAddress remoteAddress) throws Exception {
        throw new UnsupportedOperationException();
    }
    @Override
    public void connect(InetSocketAddress remoteAddress) throws Exception {
        SocketChannel socketChannel = (SocketChannel)sc;
        socketChannel.connect(remoteAddress);
    }
}

3.4 NioServerSocketChannelSink和NioSocketChannelSink

通過上面分析可知吼鳞,NioChannel的只向上提供了操作接口,而具體的底層讀寫等功能全部代理給了NioChannelSink完成叫搁。接下來分析下NioChannelSink的兩個子類NioServerSocketChannelSink和NioSocketChannelSink赔桌。

首先再看下NioChannelSink的接口:

    public interface NioChannelSink{

        void doRead();

        void doSend();

        void sendBuffer(ByteBuffer bb);
        
        void close();
    }

對于NioChannelSink的兩個實現(xiàn)類來說供炎,每個方法所對應(yīng)的語義如下:

doRead()

  • NioServerSocketChannelSink:通過accept()接受客戶端的請求。
  • NioSocketChannelSink:讀取NioChannel中的數(shù)據(jù)

doSend()

  • NioServerSocketChannelSink:不支持纬乍。
  • NioSocketChannelSink:將緩沖區(qū)中數(shù)據(jù)寫入NioChannel

sendBuffer()

  • NioServerSocketChannelSink:不支持碱茁。
  • NioSocketChannelSink:發(fā)送數(shù)據(jù)裸卫,其實就是將待發(fā)送數(shù)據(jù)加入緩沖隊列中仿贬。

close()

  • NioServerSocketChannelSink:關(guān)閉Channel。
  • NioSocketChannelSink:同上墓贿。

當(dāng)然了茧泪,作為網(wǎng)絡(luò)編程中的Channel所提供的功能原比這里要多且復(fù)雜,作為學(xué)習(xí)Demo聋袋,這里只實現(xiàn)了最常用的幾個功能队伟。

下面看下NioServerSocketChannelSink的實現(xiàn):

public class NioServerSocketChannel extends NioChannel{

    //。幽勒。嗜侮。
    
    class NioServerSocketChannelSink implements NioChannelSink{

        public void doRead() {
            try {
                ServerSocketChannel ssc = (ServerSocketChannel)sc;
                handler.channelRead(NioServerSocketChannel.this,
                        new NioSocketChannel(ssc.accept()));
                if(LOG.isDebugEnabled()){
                    LOG.debug("Dispatch the SocketChannel to SubReactorPool");
                }
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }

        public void doSend(){
            throw new UnsupportedOperationException();
        }

        @Override
        public void sendBuffer(ByteBuffer bb) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {
            try {
                if(sc != null){
                    sc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }// end NioChannelSink
    
    //。啥容。锈颗。
}

下面是NioSocketChannelSink實現(xiàn):

public class NioSocketChannel extends NioChannel{
    
    //。咪惠。击吱。
    
    class NioSocketChannelSink implements NioChannelSink{
        
        private static final int MAX_LEN = 1024;
        
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);

        ByteBuffer inputBuffer = lenBuffer;

        ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64);

        LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>();

        public void close(){
            //clear buffer
            outputDirectBuffer = null;

            try {
                if(sc != null){
                    sc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void doRead() {
            
            SocketChannel socketChannel = (SocketChannel)sc;

            int byteSize;
            try {
                byteSize = socketChannel.read(inputBuffer);

                if(byteSize < 0){
                    LOG.error("Unable to read additional data");
                    throw new RuntimeException("Unable to read additional data");
                }
                if(!inputBuffer.hasRemaining()){

                    if(inputBuffer == lenBuffer){
                        //read length
                        lenBuffer.flip();
                        int len = lenBuffer.getInt();
                        if(len < 0 || len > MAX_LEN){
                            throw new IllegalArgumentException("Illegal data length, len:" + len);
                        }
                        //prepare for receiving data
                        inputBuffer = ByteBuffer.allocate(len);
                        inputBuffer.clear();
                    }else{
                        //read data
                        if(inputBuffer.hasRemaining()){
                            socketChannel.read(inputBuffer);
                        }
                        if(!inputBuffer.hasRemaining()){
                            inputBuffer.flip();
                            
                            fireChannelRead(inputBuffer);
                            
                            //clear lenBuffer and waiting for next reading operation 
                            lenBuffer.clear();
                            inputBuffer = lenBuffer;
                        }
                    }
                }
            } catch (Throwable t) {
                if(LOG.isDebugEnabled()){
                    LOG.debug("Exception :" + t);
                }
                fireExceptionCaught(t);
            }
        }

        public void doSend(){
            /**
             * write data to channel:
             * step 1: write the length of data(occupy 4 byte)
             * step 2: data content
             */
            try {
                if(outputQueue.size() > 0){
                    ByteBuffer directBuffer = outputDirectBuffer;
                    directBuffer.clear();
                    for(ByteBuffer buf : outputQueue){
                        buf.flip();

                        if(buf.remaining() > directBuffer.remaining()){
                            //prevent BufferOverflowException
                            buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining());
                        }
                        //transfers the bytes remaining in buf into  directBuffer
                        int p = buf.position();
                        directBuffer.put(buf);
                        //reset position
                        buf.position(p);

                        if(!directBuffer.hasRemaining()){
                            break;
                        }
                    }
                    directBuffer.flip();
                    int sendSize = ((SocketChannel)sc).write(directBuffer);

                    while(!outputQueue.isEmpty()){
                        ByteBuffer buf = outputQueue.peek();
                        int left = buf.remaining() - sendSize;
                        if(left > 0){
                            buf.position(buf.position() + sendSize);
                            break;
                        }
                        sendSize -= buf.remaining();
                        outputQueue.remove();
                    }
                }

                synchronized(reactor){
                    if(outputQueue.size() == 0){
                        //disable write
                        disableWrite();
                    }else{
                        //enable write
                        enableWrite();
                    }
                }
            } catch (Throwable t) {
                fireExceptionCaught(t);
            }
        }
        private ByteBuffer wrapWithHead(ByteBuffer bb){
            bb.flip();
            lenBuffer.clear();
            int len = bb.remaining();
            lenBuffer.putInt(len);
            ByteBuffer resp = ByteBuffer.allocate(len+4);

            lenBuffer.flip();
            resp.put(lenBuffer);
            resp.put(bb);

            return resp;
        }
        public void sendBuffer(ByteBuffer bb){
            try{
                synchronized(this){
                    //wrap ByteBuffer with length header
                    ByteBuffer wrapped = wrapWithHead(bb);

                    outputQueue.add(wrapped);

                    enableWrite();
                }
            }catch(Exception e){
                LOG.error("Unexcepted Exception: ", e);
            }
        }
    }// end NioSocketChannelSink
    
    //。遥昧。覆醇。
}

NioSocketChannelSink中的讀寫功能在Reactor單線程版本里已經(jīng)分析過,這里就不再贅述炭臭。

3.5 ChannelHandler

ChannelHandler是Reactor框架提供給用戶進行自定義的接口永脓。接口提供了常用的接口:

public interface ChannelHandler {
    
    void channelActive(NioChannel channel);
    
    void channelRead(NioChannel channel, Object msg) throws Exception;
    
    void exceptionCaught(NioChannel channel, Throwable t) throws Exception;
}

4. 總結(jié)

4.1 軟件設(shè)計中的一些注意點

時刻緊繃一根弦:資源是有限的

比如在網(wǎng)絡(luò)編程中,每建立一個Socket連接都會消耗一定資源鞋仍,當(dāng)回話結(jié)束后一定要關(guān)閉常摧。此外,必須考慮非正常流程時的情況凿试。比如發(fā)生異常排宰,可能執(zhí)行不到關(guān)閉資源的操作。 如ReactorPool的實例化過程:

    public ReactorPool(int nThreads){
        //那婉。板甘。
        reactors = new Reactor[nThreads];
        for(int i = 0; i < nThreads; i++){
            boolean succeed = false;
            try{
                reactors[i] = new Reactor();
                succeed = true;
            }catch(Exception e){
                throw new IllegalStateException("failed to create a Reactor", e);
            }finally{
                if (!succeed) {
                    for (int j = 0; j < i; j ++) {
                        reactors[j].close();
                    }
                }
            }
        }
    }

當(dāng)實例化過程中發(fā)送異常時,記得要及時回收已占用資源详炬。

又比如在通信一端接受字節(jié)流的時候需要注意對異常碼流的處理盐类,避免碼流過大而耗盡內(nèi)存寞奸,導(dǎo)致OOM。

并發(fā)操作分析

  • 這個類是線程安全的嗎在跳?
  • 這個方法是在哪個線程中執(zhí)行的枪萄?
  • 是否是熱點區(qū)域?
  • 是否存在并發(fā)修改的可能猫妙?
  • 并發(fā)修改是否可見瓷翻?

在單線程版的Reactor模型中,所有的邏輯都由Reactor單個線程執(zhí)行割坠,不存在多線程并發(fā)操作的情況齐帚,那么在我們添加了線程池workerPool后,情況又會怎么樣呢彼哼?

一般我們在分析并發(fā)性問題对妄,通常的做法是先找到可能被多個線程共同訪問的類,再分析下這個類是否是線程安全的敢朱。如何判斷某個類是否是線程安全的剪菱?

  1. 該類是否是有狀態(tài)的,無狀態(tài)的類一定是線程安全的拴签。
  2. 如果有狀態(tài)孝常,是否可變。如果一個類狀態(tài)不可變篓吁,那么肯定也是線程安全的茫因。

所謂的狀態(tài)暫可以簡單理解為是否有成員變量,不管是靜態(tài)成員變量還是普通成員變量杖剪。

關(guān)于"單一職責(zé)"

單一職責(zé)原則是面向?qū)ο筌浖O(shè)計的基本原則之一冻押,難點在于接口的職責(zé)如何劃分,而職責(zé)的劃分又需要具體問題具體考慮盛嘿。拿本次這個小Demo來說洛巢,NioChannel的職責(zé)是作為數(shù)據(jù)傳輸通道,而通道中數(shù)據(jù)傳輸方式可能有很多種次兆,那么這里就抽象出一個NioChannelSink接口負(fù)責(zé)具體傳輸方式的實現(xiàn)稿茉。

職責(zé)粒度的劃分需要根據(jù)需求好好把控。過粗不利于擴展芥炭,過細(xì)不利于實現(xiàn)漓库。

后記

長路漫漫。园蝠。渺蒿。繼續(xù)前進!1胙Α茂装!

gitbub完整源碼

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末怠蹂,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子少态,更是在濱河造成了極大的恐慌城侧,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件彼妻,死亡現(xiàn)場離奇詭異嫌佑,居然都是意外死亡,警方通過查閱死者的電腦和手機澳骤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門歧强,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人为肮,你說我怎么就攤上這事》艟” “怎么了颊艳?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長忘分。 經(jīng)常有香客問我棋枕,道長,這世上最難降的妖魔是什么妒峦? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任重斑,我火速辦了婚禮,結(jié)果婚禮上肯骇,老公的妹妹穿的比我還像新娘窥浪。我一直安慰自己,他們只是感情好笛丙,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布漾脂。 她就那樣靜靜地躺著,像睡著了一般胚鸯。 火紅的嫁衣襯著肌膚如雪骨稿。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天姜钳,我揣著相機與錄音坦冠,去河邊找鬼。 笑死哥桥,一個胖子當(dāng)著我的面吹牛辙浑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播泰讽,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼例衍,長吁一口氣:“原來是場噩夢啊……” “哼昔期!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起佛玄,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤硼一,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后梦抢,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體般贼,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年奥吩,在試婚紗的時候發(fā)現(xiàn)自己被綠了哼蛆。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡霞赫,死狀恐怖腮介,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情端衰,我是刑警寧澤叠洗,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站旅东,受9級特大地震影響灭抑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜抵代,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一腾节、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧荤牍,春花似錦案腺、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至涎才,卻和暖如春鞋既,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背耍铜。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工邑闺, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人棕兼。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓陡舅,卻偏偏與公主長得像,于是被迫代替她去往敵國和親伴挚。 傳聞我的和親對象是個殘疾皇子靶衍,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容