Reactor 線程模型實(shí)現(xiàn)原理

java線程模型演進(jìn)過程

單線程
十幾年前,那時主流的 CPU 大都還是單核仑氛,CPU 的核心頻率是機(jī)器最重要的指標(biāo)之一祝钢。在 Java 領(lǐng)域當(dāng)時比較流行的是單線程編程,對于 CPU 密集型的應(yīng)用程序而言守屉,頻繁的通過多線程進(jìn)行協(xié)作和搶占時間片反而會降低性能。

多線程
隨著硬件性能的提升蒿辙,CPU 的核數(shù)越來越越多拇泛,很多服務(wù)器標(biāo)配已經(jīng)達(dá)到 32 或 64 核滨巴。通過多線程并發(fā)編程,可以充分利用多核 CPU 的處理能力俺叭,提升系統(tǒng)的處理效率和并發(fā)性能恭取。

從 2005 年開始,隨著多核處理器的逐步普及熄守,java 的多線程并發(fā)編程也逐漸流行起來蜈垮,當(dāng)時商用主流的 JDK 版本是 1.4,用戶可以通過 new Thread()的方式創(chuàng)建新的線程裕照。

由于 JDK1.4 并沒有提供類似線程池這樣的線程管理容器攒发,多線程之間的同步、協(xié)作晋南、創(chuàng)建和銷毀等工作都需要用戶自己實(shí)現(xiàn)惠猿。由于創(chuàng)建和銷毀線程是個相對比較重量級的操作,因此负间,這種原始的多線程編程效率和性能都不高偶妖。

線程池
為了提升 Java 多線程編程的效率和性能,降低用戶開發(fā)難度政溃。JDK1.5 推出了 java.util.concurrent 并發(fā)編程包趾访。在并發(fā)編程類庫中,提供了線程池董虱、線程安全容器扼鞋、原子類等新的類庫,極大的提升了 Java 多線程編程的效率空扎,降低了開發(fā)難度藏鹊。從 JDK1.5 開始,基于線程池的并發(fā)編程已經(jīng)成為 Java 多核編程的主流转锈。

Reactor 模型
無論是 C++ 還是 Java 編寫的網(wǎng)絡(luò)框架,大多數(shù)都是基于 Reactor 模式進(jìn)行設(shè)計和開發(fā)楚殿,Reactor 模式基于事件驅(qū)動撮慨,特別適合處理海量的 I/O 事件。

單線程模型
Reactor 單線程模型脆粥,指的是所有的 IO 操作都在同一個 NIO 線程上面完成砌溺,

image.png

上圖中Reactor是一個典型的事件驅(qū)動中心,客戶端發(fā)起請求并建立連接時变隔,會觸發(fā)注冊在多路復(fù)用器Selector上的SelectionKey.OP_ACCEPT事件规伐,綁定在該事件上的Acceptor對象的職責(zé)就是接受請求,為接下來的讀寫操作做準(zhǔn)備匣缘。

public class Reactor implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Reactor.class);
    
    private Selector selector;
    
    private ServerSocketChannel ssc;

    private Handler DEFAULT_HANDLER = new Handler(){
        @Override
        public void processRequest(Processor processor, ByteBuffer msg) {
            //NOOP
        }
    };
    private Handler handler = DEFAULT_HANDLER;
    
    
    /**
     * 啟動階段
     * @param port
     * @throws IOException
     */
    public Reactor(int port, int maxClients, Handler serverHandler) throws IOException{
        selector = Selector.open();
        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.socket().bind(new InetSocketAddress(port));
        
        this.handler = serverHandler;
        SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }
    /**
     * 輪詢階段
     */
    @Override
    public void run() {
        while(!ssc.socket().isClosed()){
            try {
                selector.select(1000);
                Set<SelectionKey> keys;
                synchronized(this){
                    keys = selector.selectedKeys();
                }
                Iterator<SelectionKey> it = keys.iterator();
                while(it.hasNext()){
                    SelectionKey key = it.next();
                    dispatch(key);
                    it.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        close();
    }
    
    public void dispatch(SelectionKey key){
        Runnable r = (Runnable)key.attachment();
        if(r != null)
            r.run();
    }
    /**
     * 用于接受TCP連接的Acceptor
     * 
     */
    class Acceptor implements Runnable{

        @Override
        public void run() {
            SocketChannel sc;
            try {
                sc = ssc.accept();
                if(sc != null){
                    new Processor(Reactor.this,selector,sc);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public void close(){
        try {
            selector.close();
            if(LOG.isDebugEnabled()){
                LOG.debug("Close selector");
            }
        } catch (IOException e) {
            LOG.warn("Ignoring exception during close selector, e=" + e);
        }
    }
    public void processRequest(Processor processor, ByteBuffer msg){
        if(handler != DEFAULT_HANDLER){
            handler.processRequest(processor, msg);
        }
    }
}

上面就是典型的單線程版本的Reactor實(shí)現(xiàn)猖闪,實(shí)例化Reactor對象的過程中鲜棠,在當(dāng)前多路復(fù)用器Selector上注冊了OP_ACCEPT事件,當(dāng)OP_ACCEPT事件發(fā)生后培慌,Reactor通過dispatch方法執(zhí)行Acceptor的run方法豁陆,Acceptor類的主要功能就是接受請求,建立連接吵护,并將代表連接建立的SocketChannel以參數(shù)的形式構(gòu)造Processor對象盒音。

Processor的任務(wù)就是進(jìn)行I/O操作。

下面是Processor的源碼:

/**
 * Server Processor
 */
public class Processor implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Processor.class);

    Reactor reactor;

    private SocketChannel sc;

    private final SelectionKey sk;

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);

    private ByteBuffer inputBuffer = lenBuffer;

    private ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64);

    private LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>();

    public Processor(Reactor reactor, Selector sel,SocketChannel channel) throws IOException{
        this.reactor = reactor;
        sc = channel;
        sc.configureBlocking(false);
        sk = sc.register(sel, SelectionKey.OP_READ);
        sk.attach(this);
        sel.wakeup();
    }

    @Override
    public void run() {
        if(sc.isOpen() && sk.isValid()){
            if(sk.isReadable()){
                doRead();
            }else if(sk.isWritable()){
                doSend();
            }
        }else{
            LOG.error("try to do read/write operation on null socket");
            try {
                if(sc != null)
                    sc.close();
            } catch (IOException e) {}
        }
    }
    private void doRead(){
        try {
            int byteSize = sc.read(inputBuffer);
            
            if(byteSize < 0){
                LOG.error("Unable to read additional data");
            }
            if(!inputBuffer.hasRemaining()){
                
                if(inputBuffer == lenBuffer){
                    //read length
                    inputBuffer.flip();
                    int len = inputBuffer.getInt();
                    if(len < 0){
                        throw new IllegalArgumentException("Illegal data length");
                    }
                    //prepare for receiving data
                    inputBuffer = ByteBuffer.allocate(len);
                }else{
                    //read data
                    if(inputBuffer.hasRemaining()){
                        sc.read(inputBuffer);
                    }
                    if(!inputBuffer.hasRemaining()){
                        inputBuffer.flip();
                        processRequest();
                        //clear lenBuffer and waiting for next reading operation 
                        lenBuffer.clear();
                        inputBuffer = lenBuffer;
                    }
                }
            }
        } catch (IOException e) {
            LOG.error("Unexcepted Exception during read. e=" + e);
            try {
                if(sc != null)
                    sc.close();
            } catch (IOException e1) {
                LOG.warn("Ignoring exception when close socketChannel");
            }
        }
    }

    /**
     * process request and get response
     * 
     * @param request
     * @return
     */
    private void processRequest(){
        reactor.processRequest(this,inputBuffer);
    }
    private void doSend(){
        try{
            /**
             * write data to channel:
             * step 1: write the length of data(occupy 4 byte)
             * step 2: data content
             */
            if(outputQueue.size() > 0){
                ByteBuffer directBuffer = outputDirectBuffer;
                directBuffer.clear();
                
                for(ByteBuffer buf : outputQueue){
                    buf.flip();
                    
                    if(buf.remaining() > directBuffer.remaining()){
                        //prevent BufferOverflowException
                        buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining());
                    }
                    //transfers the bytes remaining in buf into  directBuffer
                    int p = buf.position();
                    directBuffer.put(buf);
                    //reset position
                    buf.position(p);

                    if(!directBuffer.hasRemaining()){
                        break;
                    }
                }
                directBuffer.flip();
                int sendSize = sc.write(directBuffer);
                
                while(!outputQueue.isEmpty()){
                    ByteBuffer buf = outputQueue.peek();
                    int left = buf.remaining() - sendSize;
                    if(left > 0){
                        buf.position(buf.position() + sendSize);
                        break;
                    }
                    sendSize -= buf.remaining();
                    outputQueue.remove();
                }
            }
            synchronized(reactor){
                if(outputQueue.size() == 0){
                    //disable write
                    disableWrite();
                }else{
                    //enable write
                    enableWrite();
                }
            }
        } catch (CancelledKeyException e) {
            LOG.warn("CancelledKeyException occur e=" + e);
        } catch (IOException e) {
            LOG.warn("Exception causing close, due to " + e);
        }
    }
    public void sendBuffer(ByteBuffer bb){
        try{
            synchronized(this.reactor){
                if(LOG.isDebugEnabled()){
                    LOG.debug("add sendable bytebuffer into outputQueue");
                }
                //wrap ByteBuffer with length header
                ByteBuffer wrapped = wrap(bb);
                
                outputQueue.add(wrapped);
                
                enableWrite();
            }
        }catch(Exception e){
            LOG.error("Unexcepted Exception: ", e);
        }
    }
    
    private ByteBuffer wrap(ByteBuffer bb){
        bb.flip();
        lenBuffer.clear();
        int len = bb.remaining();
        lenBuffer.putInt(len);
        ByteBuffer resp = ByteBuffer.allocate(len+4);
        lenBuffer.flip();
        
        resp.put(lenBuffer);
        resp.put(bb);
        return resp;
    }
    private void enableWrite(){
        int i = sk.interestOps();
        if((i & SelectionKey.OP_WRITE) == 0){
            sk.interestOps(i | SelectionKey.OP_WRITE);
        }
    }
    private void disableWrite(){
        int i = sk.interestOps();
        if((i & SelectionKey.OP_WRITE) == 4){
            sk.interestOps(i & (~SelectionKey.OP_WRITE));           
        }
    }
}

其實(shí)Processor要做的事情很簡單馅而,就是向selector注冊感興趣的讀寫時間祥诽,OP_READ或OP_WRITE,然后等待事件觸發(fā)瓮恭,做相應(yīng)的操作原押。

    @Override
    public void run() {
        if(sc.isOpen() && sk.isValid()){
            if(sk.isReadable()){
                doRead();
            }else if(sk.isWritable()){
                doSend();
            }
        }else{
            LOG.error("try to do read/write operation on null socket");
            try {
                if(sc != null)
                    sc.close();
            } catch (IOException e) {}
        }
    }

而doRead()和doSend()方法稍微復(fù)雜了一點(diǎn),這里其實(shí)處理了用TCP協(xié)議進(jìn)行通信時必須要解決的問題:TCP粘包拆包問題偎血。

TCP粘包拆包問題
我們都知道TCP協(xié)議是面向字節(jié)流的诸衔,而字節(jié)流是連續(xù)的,無法有效識別應(yīng)用層數(shù)據(jù)的邊界颇玷。如下圖:

image.png

上圖顯示的應(yīng)用層有三個數(shù)據(jù)包笨农,D1,D2帖渠,D3.當(dāng)應(yīng)用層數(shù)據(jù)傳到傳輸層后谒亦,可能會出現(xiàn)粘包拆包現(xiàn)象。
TCP協(xié)議的基本傳輸單位是報文段空郊,而每個報文段最大有效載荷是有限制的,一般以太網(wǎng)MTU為1500份招,去除IP頭20B,TCP頭20B狞甚,那么剩下的1460B就是傳輸層最大報文段的有效載荷锁摔。如果應(yīng)用層數(shù)據(jù)大于該值(如上圖中的數(shù)據(jù)塊D2),那么傳輸層就會進(jìn)行拆分重組哼审。

解決方案

  1. 每個消息之間加分割符(缺點(diǎn):消息編解碼耗時谐腰,并且如果消息體中本省就包含分隔字符,需要進(jìn)行轉(zhuǎn)義涩盾,效率低)
  2. 每個數(shù)據(jù)包加個HeaderJ!4夯簟(header中指定后面數(shù)據(jù)的長度砸西,這就是Tcp、Ip協(xié)議通用的做法)
image.png

header區(qū)占用4B,內(nèi)容為數(shù)據(jù)的長度芹枷。

doRead
inputBuffer負(fù)責(zé)接受數(shù)據(jù)衅疙,lenBuffer負(fù)責(zé)接受數(shù)據(jù)長度,初始化的時候杖狼,將lenBuffer賦給inputBuffer炼蛤,定義如下:

private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
private ByteBuffer inputBuffer = lenBuffer;
  1. 如果inputBuffer == lenBuffer,那么從inputBuffer中讀取出一個整型值len,這個值就是接下來要接受的數(shù)據(jù)的長度蝶涩。同時分配一個大小為len的內(nèi)存空間理朋,并復(fù)制給inputBuffer,表示準(zhǔn)備接受數(shù)據(jù)绿聘。
    private void doRead(){
        try {
            int byteSize = sc.read(inputBuffer);
            
            if(byteSize < 0){
                LOG.error("Unable to read additional data");
            }
            if(!inputBuffer.hasRemaining()){
                
                if(inputBuffer == lenBuffer){
                    //read length
                    inputBuffer.flip();
                    int len = inputBuffer.getInt();
                    if(len < 0){
                        throw new IllegalArgumentException("Illegal data length");
                    }
                    //prepare for receiving data
                    inputBuffer = ByteBuffer.allocate(len);
                else{...}
  1. 如果inputBuffer 嗽上!= lenBuffer,那么開始接受數(shù)據(jù)吧熄攘!
if(inputBuffer == lenBuffer){
        //兽愤。。挪圾。
}else{
    //read data
    if(inputBuffer.hasRemaining()){
        sc.read(inputBuffer);
    }
    if(!inputBuffer.hasRemaining()){
        inputBuffer.flip();
        processRequest();
        //clear lenBuffer and waiting for next reading operation 
        lenBuffer.clear();
        inputBuffer = lenBuffer;
    }
}

note

  1. 必須保證緩沖區(qū)是滿的浅萧,即inputBuffer.hasRemaining()=false
  2. processRequest后,將inputBuffer重新賦值為lenBuffer哲思,為下一次讀操作做準(zhǔn)備洼畅。

doWrite
用戶調(diào)用sendBuffer方法發(fā)送數(shù)據(jù),其實(shí)就是將數(shù)據(jù)加入outputQueue棚赔,這個outputQueue就是一個發(fā)送緩沖隊列帝簇。

public void sendBuffer(ByteBuffer bb){
        try{
            synchronized(this.reactor){
                if(LOG.isDebugEnabled()){
                    LOG.debug("add sendable bytebuffer into outputQueue");
                }
                //wrap ByteBuffer with length header
                ByteBuffer wrapped = wrap(bb);
                
                outputQueue.add(wrapped);
                
                enableWrite();
            }
        }catch(Exception e){
            LOG.error("Unexcepted Exception: ", e);
        }
    }

doSend方法就很好理解了,無非就是不斷從outputQueue中取數(shù)據(jù)靠益,然后寫入channel中即可丧肴。過程如下:

將發(fā)送隊列outputQueue中的數(shù)據(jù)寫入緩沖區(qū)outputDirectBuffer:

  1. 清空outputDirectBuffer,為發(fā)送數(shù)據(jù)做準(zhǔn)備
  2. 將outputQueue數(shù)據(jù)寫入outputDirectBuffer
  3. 調(diào)用socketChannel.write(outputDirectBuffer);將outputDirectBuffer寫入socket緩沖區(qū)

執(zhí)行步驟2的時候胧后,我們可能會遇到這么幾種情況:

  1. 某個數(shù)據(jù)包大小超過了outputDirectBuffer剩余空間大小
  2. outputDirectBuffer已被填滿芋浮,但是outputQueue仍有待發(fā)送的數(shù)據(jù)

執(zhí)行步驟3的時候,也可能出現(xiàn)下面兩種情況:

  1. outputDirectBuffer被全部寫入socket緩沖區(qū)
  2. outputDirectBuffer只有部分?jǐn)?shù)據(jù)或者壓根就沒有數(shù)據(jù)被寫入socket緩沖區(qū)

結(jié)合代碼:

為什么需要重置buf的position

int p = buf.position();
directBuffer.put(buf);
//reset position
buf.position(p);

寫入directBuffer的數(shù)據(jù)是即將被寫入SocketChannel的數(shù)據(jù)绩卤,問題就在于:當(dāng)我們調(diào)用

int sendSize = sc.write(directBuffer);

的時候途样,directBuffer中的數(shù)據(jù)都被寫入Channel了嗎?明顯是不確定的(具體可以看java.nio.channels.SocketChannel.write(ByteBuffer src)的doc文檔)

那如何解決濒憋?
思路很簡單,根據(jù)write方法返回值sendSize陶夜,遍歷outputQueue中的ByteBuffer凛驮,根據(jù)buf.remaining()和sendSize的大小,才可以確定buf是否真的被發(fā)送了条辟。如下所示:

while(!outputQueue.isEmpty()){
    ByteBuffer buf = outputQueue.peek();
    int left = buf.remaining() - sendSize;
    if(left > 0){
        buf.position(buf.position() + sendSize);
        break;
    }
    sendSize -= buf.remaining();
    outputQueue.remove();
}

網(wǎng)絡(luò)通信基本解決黔夭,上面的處理思路是參照Zookeeper網(wǎng)絡(luò)模塊的實(shí)現(xiàn)宏胯。

Test

Server端:

public class ServerTest {

    private static int PORT = 8888;
    
    public static void main(String[] args) throws IOException, InterruptedException {
        
        Thread t = new Thread(new Reactor(PORT,1024,new MyHandler()));
        t.start();
        System.out.println("server start");
        t.join();
    }
}

用戶自定義Handler:

public class MyHandler implements Handler {
    
    @Override
    public void processRequest(Processor processor, ByteBuffer msg) {
        byte[] con = new byte[msg.remaining()];
        msg.get(con);
        
        String str = new String(con,0,con.length);
        
        String resp = "";
        switch(str){
        case "request1":resp = "response1";break;
        case "request2":resp = "response2";break;
        case "request3":resp = "response3";break;
        default :resp = "";
        }
        
        ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length);
        buf.put(resp.getBytes());
        
        processor.sendBuffer(buf);
    }
}

client端

public class ClientTest {

    private static String HOST = "localhost";
    private static int PORT = 8888;

    public static void main(String[] args) throws IOException {
        
        Client client = new Client();
        client.socket().setTcpNoDelay(true);
        
        client.connect(
                new InetSocketAddress(HOST,PORT));
        
        ByteBuffer msg;
        for(int i = 1; i <= 3; i++){
            msg = ByteBuffer.wrap(("request" + i).getBytes());
            System.out.println("send-" + "request" + i);
            
            ByteBuffer resp = client.send(msg);
            byte[] retVal = new byte[resp.remaining()];
            resp.get(retVal);

            System.out.println("receive-" + new String(retVal,0,retVal.length));
            
        }
    }
}

輸出:

send-request1
receive-response1
send-request2
receive-response2
send-request3
receive-response3

小結(jié)

在這種實(shí)現(xiàn)方式中,dispatch方法是同步阻塞的1纠选<缗邸!所有的IO操作和業(yè)務(wù)邏輯處理都在NIO線程(即Reactor線程)中完成婚惫。如果業(yè)務(wù)處理很快氛赐,那么這種實(shí)現(xiàn)方式?jīng)]什么問題,不用切換到用戶線程先舷。但是艰管,想象一下如果業(yè)務(wù)處理很耗時(涉及很多數(shù)據(jù)庫操作、磁盤操作等)蒋川,那么這種情況下Reactor將被阻塞牲芋,這肯定是我們不希望看到的。解決方法很簡單捺球,業(yè)務(wù)邏輯進(jìn)行異步處理,即交給用戶線程處理缸浦。單線程reactor模式缺點(diǎn)如下:

  1. 自始自終都只有一個Reactor線程,缺點(diǎn)很明顯:Reactor意外掛了氮兵,整個系統(tǒng)也就無法正常工作裂逐,可靠性太差〉ň纾可靠性問題:一旦 NIO 線程意外跑飛絮姆,或者進(jìn)入死循環(huán),會導(dǎo)致整個系統(tǒng)通信模塊不可用秩霍,不能接收和處理外部消息篙悯,造成節(jié)點(diǎn)故障。
  2. 單線程的另外一個問題是在大負(fù)載的情況下铃绒,Reactor的處理速度必然會成為系統(tǒng)性能的瓶頸鸽照。一個 NIO 線程同時處理成百上千的鏈路,性能上無法支撐颠悬,即便 NIO 線程的 CPU 負(fù)荷達(dá)到 100%矮燎,也無法滿足海量消息的編碼、解碼赔癌、讀取和發(fā)送诞外;當(dāng) NIO 線程負(fù)載過重之后,處理速度將變慢灾票,這會導(dǎo)致大量客戶端連接超時峡谊,超時之后往往會進(jìn)行重發(fā),這更加重了 NIO 線程的負(fù)載,最終會導(dǎo)致大量消息積壓和處理超時既们,成為系統(tǒng)的性能瓶頸濒析;

為了解決這些問題,演進(jìn)出了 Reactor 多線程模型啥纸。

多線程模型

在Reactor單線程模型中号杏,I/0任務(wù)和業(yè)務(wù)邏輯都由Reactor線程完成,這增加了Reactor線程的負(fù)擔(dān)斯棒,高負(fù)載情況下容易出現(xiàn)性能瓶頸盾致,并且無法利用cpu多核或者多cpu的功能,所以就有了多線程版本的reactor模型名船。

改進(jìn)點(diǎn)

  • 接受客戶端連接請求的不在是單個線程-Acceptor绰上,而是一個NIO線程池。
  • I/O處理也不再是單個線程處理渠驼,而是交給一個I/O線程池進(jìn)行處理蜈块。

首先定義服務(wù)端用于處理請求的Handler,通過實(shí)現(xiàn)ChannelHandler接口完成迷扇。

public class SimpleServerChannelHandler implements ChannelHandler {
    
    private static Logger LOG = LoggerFactory.getLogger(SimpleServerChannelHandler.class);
    
    //記錄接受消息的次數(shù)
    public volatile int receiveSize;
    
    //記錄拋出的異常
    public volatile Throwable t;
    
    @Override
    public void channelActive(NioChannel channel) {
        if(LOG.isDebugEnabled()){
            LOG.debug("ChannelActive");
        }
    }

    @Override
    public void channelRead(NioChannel channel, Object msg) throws Exception {
        
        ByteBuffer bb = (ByteBuffer)msg;

        byte[] con = new byte[bb.remaining()];
        bb.get(con);

        String str = new String(con,0,con.length);

        String resp = "";
        switch(str){
        case "request1":resp = "response1";break;
        case "request2":resp = "response2";break;
        case "request3":resp = "response3";break;
        default :resp = "Hello Client";
        }

        ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length);
        buf.put(resp.getBytes());
        
        receiveSize++;
        
        channel.sendBuffer(buf);
    }

    @Override
    public void exceptionCaught(NioChannel channel, Throwable t)
            throws Exception {
        this.t = t;
        channel.close();
    }

}

Junit測試用例粪狼,setUp用于啟動Server端和Client端偿衰。

public class ReactorTest extends BaseTest{
    private static final Logger LOG = LoggerFactory.getLogger(ReactorTest.class);

    private static String HOST = "localhost";

    private static int PORT = 8888;

    private static Client client;
    private static Server server;

    static SimpleServerChannelHandler h;

    @BeforeClass
    public static void setUp() throws Exception {
        startServer();
        startClient();
    }
    private static void startServer() throws Exception{
        server = new Server();
        ReactorPool mainReactor = new ReactorPool();
        ReactorPool subReactor = new ReactorPool();

        h = new SimpleServerChannelHandler();
        server.reactor(mainReactor, subReactor)
        .handler(h)
        .bind(new InetSocketAddress(HOST,PORT));
    }
    private static void startClient() throws SocketException{
        client = new Client();
        client.socket().setTcpNoDelay(true);
        client.connect(
                new InetSocketAddress(HOST,PORT));
    }
    @Test
    public void test() {
        LOG.info("Sucessful configuration");
    }

    @Test
    public void testBaseFunction(){
        LOG.debug("testBaseFunction()");

        String msg ="Hello Reactor";
        ByteBuffer resp = client.syncSend(ByteBuffer.wrap(msg.getBytes()));
        byte[] res = new byte[resp.remaining()];
        resp.get(res);

        Assert.assertEquals("Hello Client", new String(res,0,res.length));
    }

    @Test
    public void testMultiSend(){

        int sendSize = 1024;

        for(int i = 0; i < sendSize; i++){
            ByteBuffer bb = ByteBuffer.wrap("Hello Reactor".getBytes());
            ByteBuffer resp = client.syncSend(bb);
            byte[] res = new byte[resp.remaining()];
            resp.get(res);

            Assert.assertEquals("Hello Client", new String(res,0,res.length));
        }
        Assert.assertEquals(sendSize, h.receiveSize);
    }
    @Test
    public void testTooLongReceivedByteSizeEexception(){
        LOG.debug("testTooLongReceivedByteSizeEexception()");

        int threshold = 1024;
        byte[] dest = new byte[threshold + 1];
        Random r = new Random();
        r.nextBytes(dest);
        client.syncSend(ByteBuffer.wrap(dest));
        
        Assert.assertEquals(IllegalArgumentException.class, h.t.getClass());
        
        Assert.assertEquals("Illegal data length, len:" + (threshold+1), h.t.getMessage());
    }
    @AfterClass
    public static void tearDown() throws Exception {
        server.close();
        client.close();
    }
}

一共進(jìn)行三項(xiàng)基本測試:

testBaseFunction
實(shí)現(xiàn)了基本發(fā)送接收消息的功能。testMultiSend
重復(fù)發(fā)送消息,并且記錄消息收發(fā)的次數(shù)吆寨。

testTooLongReceivedByteSizeEexception
測試server端在接收到異常碼流的情況下庭再,是否拋出異常

原理分析

Reactor和ReactorPool

Reactor作用就是不斷進(jìn)行輪詢并檢查是否有已經(jīng)就緒的事件膳汪,如果有茄袖,那么就將事件分發(fā)給對應(yīng)的Handler進(jìn)行處理。這個角色其實(shí)就是NIO編程中的多路復(fù)用器java.nio.channels.Selector雏胃。因此请毛,Reactor聚合一個Selector類型成員變量。輪詢的過程如下:

public class Reactor extends Thread{

//...

    private Selector selector;

    private volatile boolean isShutdown;

    Reactor(){
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new RuntimeException("failed to open a new selector", e);
        }
    }
    
@Override
    public void run() {
        for(;;){
            try {
                getSelector().select(wakenUp);
                Set<SelectionKey> keys;
                synchronized(this){
                    keys = getSelector().selectedKeys();
                }
                Iterator<SelectionKey> it = keys.iterator();
                while(it.hasNext()){
                    SelectionKey key = it.next();
                    processSelectedKey(key);
                    it.remove();
                }
                if(isShutdown()){
                    break;
                }
            } catch (Throwable e) {
                LOG.warn("Unexpected exception in the selector loop.", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) { }
            }
        }
    }
}

processSelectedKey(key)中進(jìn)行的就是根據(jù)就緒事件key.readyOps()進(jìn)行相應(yīng)操作:

    private void processSelectedKey(SelectionKey key){
        try {       
            NioChannel nioChannel = (NioChannel)key.attachment();

            if (!nioChannel.isOpen()) {
                LOG.warn("trying to do i/o on a null socket");
                return;
            }

            int readyOps = key.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                nioChannel.sink().doRead();
            }
            if((readyOps & SelectionKey.OP_WRITE) != 0){
                nioChannel.sink().doSend();
            }
            if((readyOps & SelectionKey.OP_CONNECT) != 0){
                //remove OP_CONNECT
                key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT));
            }
        }catch (Throwable t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Throwable stack trace", t);
            }
            closeSocket();
        }
    }

這里的NioChannel是抽象類瞭亮,是對NIO編程中的Channel語義的抽象.
此外方仿,Reactor肯定要提供一個注冊接口。

    public SelectionKey register(final NioChannel sc, final int interestOps, Object attachment){
        if(sc == null){
            throw new NullPointerException("SelectableChannel");
        }
        if(interestOps == 0){
            throw new IllegalArgumentException("interestOps must be non-zero.");
        }
        SelectionKey key;
        try {
            key = sc.channel().register(getSelector(), interestOps, attachment);
        } catch (ClosedChannelException e) {
            throw new RuntimeException("failed to register a channel", e);
        }
        return key;
    }

ReactorPool是一個Reactor的線程池统翩,這里就通過簡單的數(shù)組形式進(jìn)行模擬:

public class ReactorPool {

    private static final Logger LOG = LoggerFactory.getLogger(ReactorPool.class);

    private Reactor[] reactors;

    private AtomicInteger index = new AtomicInteger();
    
    //線程數(shù)默認(rèn)為CPU數(shù)*2
    private final int DEFAULT_THREADS = Runtime.getRuntime().availableProcessors() * 2;

    public ReactorPool (){
        this(0);
    }
    public ReactorPool(int nThreads){
        if(nThreads < 0){
            throw new IllegalArgumentException("nThreads must be nonnegative number");
        }
        if(nThreads == 0){
            nThreads = DEFAULT_THREADS;
        }
        reactors = new Reactor[nThreads];
        for(int i = 0; i < nThreads; i++){
            boolean succeed = false;
            try{
                reactors[i] = new Reactor();
                succeed = true;
            }catch(Exception e){
                throw new IllegalStateException("failed to create a Reactor", e);
            }finally{
                if (!succeed) {
                    for (int j = 0; j < i; j ++) {
                        reactors[j].close();
                    }
                }
            }
        }
    }

    public Reactor next(){
        return reactors[index.incrementAndGet() % reactors.length];
    }

    public void close(){
        for(int i = 0; i < reactors.length; i++){
            reactors[i].setShutdown(true);
            reactors[i].close();
        }
    }
}
NioChannel和NioChannelSink

在進(jìn)行Java原生Nio編程的過程中仙蚜,會涉及到兩種類型的Channel:

java.nio.channels.SocketChannel
java.nio.channels.ServerSocketChannel

其分別作為客戶端和服務(wù)端調(diào)用接口。為了統(tǒng)一其公共行為厂汗,這里抽象出一個抽象類NioChannel委粉,其成員組成如下:

  • 聚合一個SelectableChannel類型(SocketChannel和ServerSocketChannel的公共父類)的成員變量。
  • 持有一個所屬Reactor對象的引用
  • 聚合一個NioChannelSink類型成員變量娶桦。

NioChannelSink是將NioChannel的底層讀寫功能獨(dú)立出來艳丛。一方面使NioChannel避免集成過多功能而顯得臃腫匣掸,另一方面分離出底層傳輸協(xié)議趟紊,為以后底層傳輸協(xié)議的切換做準(zhǔn)備氮双。(TCP vs UDP,NIO霎匈、OIO戴差、AIO)從這種意義上說,NioChannel取名為Channel貌似更合理铛嘱。

public abstract class NioChannel {

    protected Reactor reactor;

    protected SelectableChannel sc;

    protected SelectionKey selectionKey;

    private NioChannelSink sink;

    protected volatile ChannelHandler handler;
    
    public NioChannel(SelectableChannel sc, int interestOps){
        this.sc = sc;
        try {
            sc.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
        }
        sink = nioChannelSink();
    }
    
    protected void fireChannelRead(ByteBuffer bb){
        try {
            handler.channelRead(this, bb);
        } catch (Exception e) {
            fireExceptionCaught(e);
        }
    }
    protected void fireExceptionCaught(Throwable t){
        try {
            handler.exceptionCaught(this, t);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //暖释。。墨吓。
    
    public abstract NioChannelSink nioChannelSink();

    public interface NioChannelSink{

        void doRead();

        void doSend();

        void sendBuffer(ByteBuffer bb);
        
        void close();
    }
}

再來看下NioChannel需要提供哪些功能:
首先球匕,NIO編程中SocketChannel或ServerSocketChannel需要注冊到多路復(fù)用器Selector中。那么這里就抽象成了NioChannel和Reactor的交互帖烘。

public void register(Reactor reactor, int interestOps){
    this.reactor = reactor;
    try {
        selectionKey = sc.register(reactor().getSelector(), interestOps, this);
    } catch (ClosedChannelException e) {
        e.printStackTrace();
    }
}

這里將NioChannel對象作為附件亮曹,在Reactor中心輪詢到ready事件后,會根據(jù)事件的類型(OP_ACCEPT OP_READ等)秘症,從SelectionKey中取出綁定的附件NioChannel

NioChannel nioChannel = (NioChannel)key.attachment();

然后根據(jù)進(jìn)行key.readyOps()做相應(yīng)操作照卦。其次,作為Channel肯定要提供綁定bind和連接connect的功能了:

public abstract void bind(InetSocketAddress remoteAddress) throws Exception;
    
public abstract void connect(InetSocketAddress remoteAddress) throws Exception;

這里用抽象方法是要將實(shí)現(xiàn)交由子類來完成乡摹。

最后役耕,是用戶通過NioChannel發(fā)送的消息的函數(shù):

public void sendBuffer(ByteBuffer bb){
    sink().sendBuffer(bb);
}

protected final void enableWrite(){
    int i = selectionKey.interestOps();
    if((i & SelectionKey.OP_WRITE) == 0){
        selectionKey.interestOps(i | SelectionKey.OP_WRITE);
    }
}
protected final void disableWrite(){
    int i = selectionKey.interestOps();
    if((i & SelectionKey.OP_WRITE) == 1){
        selectionKey.interestOps(i & (~SelectionKey.OP_WRITE));         
    }
}
NioServerSocketChannel和NioSocketChannel

NioServerSocketChannel和NioSocketChannel是抽象類NioChannel的一個子類,NioServerSocketChannel和java.nio.channels.ServerSocketChannel的語義是一致的聪廉,供服務(wù)端使用瞬痘,綁定指定端口,監(jiān)聽客戶端發(fā)起的連接請求板熊,并交由相應(yīng)Handler處理框全。而NioSocketChannel和java.nio.channels.NioSocketChannel語義一致,作為通信的一個通道邻邮。

public class NioServerSocketChannel extends NioChannel{

    private static final Logger LOG = LoggerFactory.getLogger(NioServerSocketChannel.class);
    
    public NioServerSocketChannel(){
        super(newSocket());
    }
    
    public static ServerSocketChannel newSocket(){
        ServerSocketChannel socketChannel = null;
        try {
            socketChannel = ServerSocketChannel.open();
        } catch (IOException e) {
            LOG.error("Unexpected exception occur when open ServerSocketChannel");
        }
        return socketChannel;
    }
    
    @Override
    public NioChannelSink nioChannelSink() {
        return new NioServerSocketChannelSink();
    }
    
    class NioServerSocketChannelSink implements NioChannelSink{
        //竣况。。筒严。
    }
        @Override
    public void bind(InetSocketAddress remoteAddress) throws Exception {
        ServerSocketChannel ssc = (ServerSocketChannel)sc;
        ssc.bind(remoteAddress);
    }
    @Override
    public void connect(InetSocketAddress remoteAddress) throws Exception {
        throw new UnsupportedOperationException();
    }
}

這里獲取ServerSocketChannel實(shí)例的方式是通過ServerSocketChannel.open()丹泉,其實(shí)也可以通過反射來獲取,這樣就能將ServerSocketChannel和SocketChannel的實(shí)例化邏輯進(jìn)行統(tǒng)一鸭蛙,我們只需要在實(shí)例化Channel的時候?qū)erverSocketChannel.class 或 SocketChannel.class當(dāng)作參數(shù)傳入即可摹恨。(netty就是這么干的)

NioSocketChannel的實(shí)現(xiàn)如下:

public class NioSocketChannel extends NioChannel{

    private static final Logger LOG = LoggerFactory.getLogger(NioSocketChannel.class);

    public NioSocketChannel() throws IOException{
        super( newSocket());
    }
    public NioSocketChannel(SocketChannel sc) throws IOException{
        super(sc);
    }
    public static SocketChannel newSocket(){
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
        } catch (IOException e) {
        }
        return socketChannel;
    }

    @Override
    public NioChannelSink nioChannelSink() {
        return new NioSocketChannelSink();
    }
    
    class NioSocketChannelSink implements NioChannelSink{
        //。娶视。晒哄。
    }
    
    @Override
    public void bind(InetSocketAddress remoteAddress) throws Exception {
        throw new UnsupportedOperationException();
    }
    @Override
    public void connect(InetSocketAddress remoteAddress) throws Exception {
        SocketChannel socketChannel = (SocketChannel)sc;
        socketChannel.connect(remoteAddress);
    }
}
NioServerSocketChannelSink和NioSocketChannelSink

通過上面分析可知睁宰,NioChannel的只向上提供了操作接口,而具體的底層讀寫等功能全部代理給了NioChannelSink完成寝凌。接下來分析下NioChannelSink的兩個子類NioServerSocketChannelSink和NioSocketChannelSink柒傻。

    public interface NioChannelSink{

        void doRead();

        void doSend();

        void sendBuffer(ByteBuffer bb);
        
        void close();
    }

對于NioChannelSink的兩個實(shí)現(xiàn)類來說,每個方法所對應(yīng)的語義如下:

doRead()
NioServerSocketChannelSink:通過accept()接受客戶端的請求较木。
NioSocketChannelSink:讀取NioChannel中的數(shù)據(jù)

doSend()
NioServerSocketChannelSink:不支持红符。
NioSocketChannelSink:將緩沖區(qū)中數(shù)據(jù)寫入NioChannel

sendBuffer()
NioServerSocketChannelSink:不支持。
NioSocketChannelSink:發(fā)送數(shù)據(jù)伐债,其實(shí)就是將待發(fā)送數(shù)據(jù)加入緩沖隊列中

close()
NioServerSocketChannelSink:關(guān)閉Channel预侯。
NioSocketChannelSink:關(guān)閉Channel。

作為網(wǎng)絡(luò)編程中的Channel所提供的功能原比這里要多且復(fù)雜峰锁,作為學(xué)習(xí)Demo萎馅,這里只實(shí)現(xiàn)了最常用的幾個功能。

下面看下NioServerSocketChannelSink的實(shí)現(xiàn):

public class NioServerSocketChannel extends NioChannel{

    //虹蒋。糜芳。。
    
    class NioServerSocketChannelSink implements NioChannelSink{

        public void doRead() {
            try {
                ServerSocketChannel ssc = (ServerSocketChannel)sc;
                handler.channelRead(NioServerSocketChannel.this,
                        new NioSocketChannel(ssc.accept()));
                if(LOG.isDebugEnabled()){
                    LOG.debug("Dispatch the SocketChannel to SubReactorPool");
                }
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }

        public void doSend(){
            throw new UnsupportedOperationException();
        }

        @Override
        public void sendBuffer(ByteBuffer bb) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {
            try {
                if(sc != null){
                    sc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }// end NioChannelSink
    
    //千诬。耍目。。
}

下面是NioSocketChannelSink實(shí)現(xiàn):

public class NioSocketChannel extends NioChannel{
    
    //徐绑。邪驮。。
    
    class NioSocketChannelSink implements NioChannelSink{
        
        private static final int MAX_LEN = 1024;
        
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);

        ByteBuffer inputBuffer = lenBuffer;

        ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64);

        LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>();

        public void close(){
            //clear buffer
            outputDirectBuffer = null;

            try {
                if(sc != null){
                    sc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void doRead() {
            
            SocketChannel socketChannel = (SocketChannel)sc;

            int byteSize;
            try {
                byteSize = socketChannel.read(inputBuffer);

                if(byteSize < 0){
                    LOG.error("Unable to read additional data");
                    throw new RuntimeException("Unable to read additional data");
                }
                if(!inputBuffer.hasRemaining()){

                    if(inputBuffer == lenBuffer){
                        //read length
                        lenBuffer.flip();
                        int len = lenBuffer.getInt();
                        if(len < 0 || len > MAX_LEN){
                            throw new IllegalArgumentException("Illegal data length, len:" + len);
                        }
                        //prepare for receiving data
                        inputBuffer = ByteBuffer.allocate(len);
                        inputBuffer.clear();
                    }else{
                        //read data
                        if(inputBuffer.hasRemaining()){
                            socketChannel.read(inputBuffer);
                        }
                        if(!inputBuffer.hasRemaining()){
                            inputBuffer.flip();
                            
                            fireChannelRead(inputBuffer);
                            
                            //clear lenBuffer and waiting for next reading operation 
                            lenBuffer.clear();
                            inputBuffer = lenBuffer;
                        }
                    }
                }
            } catch (Throwable t) {
                if(LOG.isDebugEnabled()){
                    LOG.debug("Exception :" + t);
                }
                fireExceptionCaught(t);
            }
        }

        public void doSend(){
            /**
             * write data to channel:
             * step 1: write the length of data(occupy 4 byte)
             * step 2: data content
             */
            try {
                if(outputQueue.size() > 0){
                    ByteBuffer directBuffer = outputDirectBuffer;
                    directBuffer.clear();
                    for(ByteBuffer buf : outputQueue){
                        buf.flip();

                        if(buf.remaining() > directBuffer.remaining()){
                            //prevent BufferOverflowException
                            buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining());
                        }
                        //transfers the bytes remaining in buf into  directBuffer
                        int p = buf.position();
                        directBuffer.put(buf);
                        //reset position
                        buf.position(p);

                        if(!directBuffer.hasRemaining()){
                            break;
                        }
                    }
                    directBuffer.flip();
                    int sendSize = ((SocketChannel)sc).write(directBuffer);

                    while(!outputQueue.isEmpty()){
                        ByteBuffer buf = outputQueue.peek();
                        int left = buf.remaining() - sendSize;
                        if(left > 0){
                            buf.position(buf.position() + sendSize);
                            break;
                        }
                        sendSize -= buf.remaining();
                        outputQueue.remove();
                    }
                }

                synchronized(reactor){
                    if(outputQueue.size() == 0){
                        //disable write
                        disableWrite();
                    }else{
                        //enable write
                        enableWrite();
                    }
                }
            } catch (Throwable t) {
                fireExceptionCaught(t);
            }
        }
        private ByteBuffer wrapWithHead(ByteBuffer bb){
            bb.flip();
            lenBuffer.clear();
            int len = bb.remaining();
            lenBuffer.putInt(len);
            ByteBuffer resp = ByteBuffer.allocate(len+4);

            lenBuffer.flip();
            resp.put(lenBuffer);
            resp.put(bb);

            return resp;
        }
        public void sendBuffer(ByteBuffer bb){
            try{
                synchronized(this){
                    //wrap ByteBuffer with length header
                    ByteBuffer wrapped = wrapWithHead(bb);

                    outputQueue.add(wrapped);

                    enableWrite();
                }
            }catch(Exception e){
                LOG.error("Unexcepted Exception: ", e);
            }
        }
    }// end NioSocketChannelSink
    
    //傲茄。毅访。。
}

NioSocketChannelSink中的讀寫功能在Reactor單線程版本里已經(jīng)分析過盘榨,這里就不再贅述喻粹。

ChannelHandler

ChannelHandler是Reactor框架提供給用戶進(jìn)行自定義的接口。接口提供了常用的接口:

public interface ChannelHandler {
    
    void channelActive(NioChannel channel);
    
    void channelRead(NioChannel channel, Object msg) throws Exception;
    
    void exceptionCaught(NioChannel channel, Throwable t) throws Exception;
}

多線程模型小結(jié)

在網(wǎng)絡(luò)編程中草巡,每建立一個Socket連接都會消耗一定資源守呜,當(dāng)回話結(jié)束后一定要關(guān)閉。此外山憨,必須考慮非正常流程時的情況查乒。比如發(fā)生異常,可能執(zhí)行不到關(guān)閉資源的操作郁竟。 如ReactorPool的實(shí)例化過程:

    public ReactorPool(int nThreads){
        //玛迄。。
        reactors = new Reactor[nThreads];
        for(int i = 0; i < nThreads; i++){
            boolean succeed = false;
            try{
                reactors[i] = new Reactor();
                succeed = true;
            }catch(Exception e){
                throw new IllegalStateException("failed to create a Reactor", e);
            }finally{
                if (!succeed) {
                    for (int j = 0; j < i; j ++) {
                        reactors[j].close();
                    }
                }
            }
        }
    }

當(dāng)實(shí)例化過程中發(fā)送異常時棚亩,記得要及時回收已占用資源蓖议。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末虏杰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子勒虾,更是在濱河造成了極大的恐慌纺阔,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件从撼,死亡現(xiàn)場離奇詭異州弟,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)低零,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拯杠,“玉大人掏婶,你說我怎么就攤上這事√杜悖” “怎么了雄妥?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長依溯。 經(jīng)常有香客問我老厌,道長,這世上最難降的妖魔是什么黎炉? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任枝秤,我火速辦了婚禮,結(jié)果婚禮上慷嗜,老公的妹妹穿的比我還像新娘淀弹。我一直安慰自己,他們只是感情好庆械,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布薇溃。 她就那樣靜靜地躺著,像睡著了一般缭乘。 火紅的嫁衣襯著肌膚如雪沐序。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天堕绩,我揣著相機(jī)與錄音策幼,去河邊找鬼。 笑死逛尚,一個胖子當(dāng)著我的面吹牛垄惧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绰寞,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼到逊,長吁一口氣:“原來是場噩夢啊……” “哼铣口!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起觉壶,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤脑题,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后铜靶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體叔遂,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年争剿,在試婚紗的時候發(fā)現(xiàn)自己被綠了已艰。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡蚕苇,死狀恐怖哩掺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情涩笤,我是刑警寧澤嚼吞,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站蹬碧,受9級特大地震影響舱禽,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜恩沽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一誊稚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧飒筑,春花似錦片吊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至肤晓,卻和暖如春爷贫,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背补憾。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工漫萄, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人盈匾。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓腾务,卻偏偏與公主長得像,于是被迫代替她去往敵國和親削饵。 傳聞我的和親對象是個殘疾皇子岩瘦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Reactor模型是典型的事件驅(qū)動模型未巫。在網(wǎng)絡(luò)編程中,所謂的事件當(dāng)然就是read启昧、write叙凡、bind、conne...
    topgunviper閱讀 4,014評論 0 9
  • 1.概述 在Reactor單線程版本的設(shè)計中密末,I/O任務(wù)乃至業(yè)務(wù)邏輯都由Reactor線程來完成握爷,這無疑增加了Re...
    topgunviper閱讀 4,929評論 0 7
  • 作者:李林鋒 原文:http://www.infoq.com/cn/articles/netty-high-per...
    楊鑫科閱讀 3,976評論 0 64
  • 今天來聊聊我的午餐-煎餅。 首先我慷慨的傳授給大家我們家的獨(dú)門做法(so easy): 主角:面粉 配角:木薯粉(...
    大嘴企鵝閱讀 182評論 2 1
  • 容易心動的年齡常遇那些令人感動的事严里,今天和大家說一些我眼中看到的那些愛情新啼。 郎才女貌,男帥女靚必然是人們羨慕的愛...
    修清姝閱讀 230評論 0 0