Java IO, NIO, AIO和Netty

背景

最近在回顧一下Java IO相關(guān)的知識(shí),順帶寫一下入門級(jí)別的文章跌穗。感覺工作以后很少寫文章订晌,一直想寫點(diǎn)高質(zhì)量的文章導(dǎo)致最后一篇文章都很難寫。所以不寫原理蚌吸,只寫實(shí)踐锈拨,隨大流,有問題請(qǐng)留言羹唠。(后續(xù)有時(shí)間再補(bǔ)充原理性的東西奕枢,從硬件到操作系統(tǒng)到JVM到JDK)

實(shí)現(xiàn)案例

創(chuàng)建一個(gè)server,可以接受多個(gè)client端的連接佩微,接收到信息后返回一個(gè)接收到的信息缝彬。

傳統(tǒng)IO實(shí)現(xiàn)

傳統(tǒng)的IO就是我們所說的BIO(block io),

server端源碼如下

package tech.sohocoder.postman.io;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class Server {

    private ServerSocket serverSocket;


    private void start() throws IOException, ClassNotFoundException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 9000);
        serverSocket = new ServerSocket();
        serverSocket.bind(inetSocketAddress);
        ExecutorService executorService = Executors.newCachedThreadPool(new CaughtExceptionsThreadFactory());
        while (true) {
            Socket socket = serverSocket.accept();
            System.out.println("accept socket: " + socket.getRemoteSocketAddress());
            executorService.submit(new SocketHandler(socket));
        }
    }

    private static class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override public void uncaughtException(Thread t, Throwable e) {
            e.printStackTrace();
        }
    }

    private class SocketHandler implements Runnable {

        private Socket socket;

        public SocketHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                    String message = ois.readObject().toString();
                    System.out.println("Message Received: " + message);
                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    //write object to Socket
                    oos.writeObject("Hi Client " + message);
                    if (message.equals("quit")) {
                        ois.close();
                        oos.close();
                        socket.close();
                        break;
                    }
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Server server = new Server();
        server.start();
    }
}

client端源碼如下

package tech.sohocoder.postman.io;

import java.io.*;
import java.net.Socket;

public class Client {

    private Socket socket;

    public void start() throws IOException, ClassNotFoundException {
        socket = new Socket("localhost", 9000);
        if(socket.isConnected()) {
            System.out.println("socket is connected");
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                final String line = input != null ? input.trim() : null;
                if (null == line || line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                oos.writeObject(line);
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                System.out.println("Message: " + ois.readObject());
                if(line.equals("quit")) {
                    oos.close();
                    ois.close();
                    socket.close();
                    break;
                }
            }
        }
        System.out.println("Bye");
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Client client = new Client();
        client.start();
    }
}

NIO的阻塞實(shí)現(xiàn)

NIO實(shí)際上就是面向緩存及通道的新型IO(由JSR 51定義哺眯,后面JSR 203進(jìn)行了擴(kuò)展跌造,有興趣閱讀一下這兩個(gè)JSR)可以支持阻塞和非阻塞方式。先實(shí)現(xiàn)一下阻塞方式

client

package tech.sohocoder.nio.block;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import static java.lang.System.out;

public class Client {

    private SocketChannel socketChannel;

    public void start() throws IOException {
        socketChannel = SocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
        socketChannel.connect(socketAddress);

        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (;;) {
            final String input = in.readLine();
            final String line = input != null ? input.trim() : null;
            if (null == line || line.isEmpty()) { // skip `enter` or `enter` with spaces.
                continue;
            }

            ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
            socketChannel.write(byteBuffer);

            if(line.equals("quit")) {
                out.println("quit!");
                socketChannel.close();
                break;
            }

            ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(returnByteBuffer);
            String message = new String(returnByteBuffer.array()).trim();
            out.println("Receive message: " + message);
        }
    }

    public static void main(String[] args) throws IOException {
        tech.sohocoder.nio.noblock.Client client = new tech.sohocoder.nio.noblock.Client();
        client.start();
    }
}

server

package tech.sohocoder.nio.block;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Server {

    private ServerSocketChannel serverSocketChannel;

    private void start() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress(9000);
        serverSocketChannel.bind(socketAddress);

        while (true) {
            System.out.println("listening...");
            SocketChannel socketChannel = serverSocketChannel.accept();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int readLength = socketChannel.read(byteBuffer);
            if(readLength != -1) {
                String receiveStr = new String(byteBuffer.array()).trim();
                System.out.println(receiveStr);
                socketChannel.write(byteBuffer);
            }
            socketChannel.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.start();
    }
}

NIO的非阻塞方式

NIO如果需要非阻塞族购,需要使用到selector壳贪。selector是在JDK1.4加入,主要是用于支持IO多路復(fù)用寝杖,Linux下jdk實(shí)現(xiàn)就是基于epoll违施。

client端代碼保存一致。

server端實(shí)際上就是使用一個(gè)線程來支持多個(gè)連接

package tech.sohocoder.nio.noblock;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import static java.lang.System.out;

public class Server {

    private ServerSocketChannel serverSocketChannel;

    private Selector selector;

    private void start() throws IOException, InterruptedException {
        serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress(9000);
        serverSocketChannel.bind(socketAddress);

        serverSocketChannel.configureBlocking(false);

        int opSelectionKey = serverSocketChannel.validOps();

        selector = Selector.open();

        SelectionKey selectionKey = serverSocketChannel.register(selector, opSelectionKey);

        out.println(selector);
        out.println(selectionKey);
        while(true) {
            out.println("waiting for connected...");
            selector.select();
            Set<SelectionKey> set  = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()) {
                SelectionKey mySelectionKey = iterator.next();
                if(mySelectionKey.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    SelectionKey selectionKey1 = socketChannel.register(selector, SelectionKey.OP_READ);
                    out.println("socket channel selectionkey: " + selectionKey1);
                    out.println("connect from : " + socketChannel.getRemoteAddress());
                }else if(mySelectionKey.isReadable()){
                    SocketChannel socketChannel = (SocketChannel) mySelectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    socketChannel.read(byteBuffer);
                    String message = new String(byteBuffer.array()).trim();
                    out.println("Receive message: " + message);
                    if(message.equals("quit")) {
                        out.println("close connection: " + socketChannel.getRemoteAddress());
                        socketChannel.close();
                        mySelectionKey.cancel();
                    }else {
                        ByteBuffer returnByteBuffer = ByteBuffer.wrap(" receive your message".getBytes());
                        socketChannel.write(returnByteBuffer);
                    }
                }
                iterator.remove();
            }
        }

    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start();
    }
}

AIO實(shí)現(xiàn)

上面的IO瑟幕,NIO的阻塞實(shí)際上是同步阻塞的方式磕蒲,NIO的非阻塞是同步非阻塞方式留潦。AIO(asynchronous I/O))是異步IO,實(shí)現(xiàn)是異步非阻塞方式辣往,在jdk1.7中引入兔院。

server端源碼如下:

package tech.sohocoder.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static java.lang.System.out;

public class Server {

    private AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    private void start() throws IOException, InterruptedException {
        // worker thread pool
        AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 4);
        asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
        int port = 9000;
        InetSocketAddress socketAddress = new InetSocketAddress("localhost", port);
        asynchronousServerSocketChannel.bind(socketAddress);

        out.println("Starting listening on port " + port);
        // add handler
        asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object o) {

                try {
                    out.println("connect from : " + asynchronousSocketChannel.getRemoteAddress());
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // accept next connection
                asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, this);
                while (true) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    Future<Integer> future = asynchronousSocketChannel.read(byteBuffer);
                    try {
                        future.get();
                        String message = new String(byteBuffer.array()).trim();
                        out.println("Receive message: " + message);
                        if (message.equals("quit")) {
                            out.println("close client: " + asynchronousSocketChannel.getRemoteAddress());
                            asynchronousSocketChannel.close();
                            break;
                        }

                        ByteBuffer returnByteBuffer = ByteBuffer.wrap("receive your message".getBytes());
                        Future<Integer> returnFuture = asynchronousSocketChannel.write(returnByteBuffer);
                        returnFuture.get();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            public void failed(Throwable throwable, Object o) {
                out.println("error to accept: " + throwable.getMessage());
            }
        });
        asynchronousChannelGroup.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start();
    }
}

Netty實(shí)現(xiàn)

Netty是java中使用很廣泛的庫,既可以實(shí)現(xiàn)NIO也可以實(shí)現(xiàn)AIO站削,還是針對(duì)上面的例子來實(shí)現(xiàn)一下

server端

package tech.sohocoder.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import static java.lang.System.out;

public class Server {

    private void start() throws InterruptedException {
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // add handler into pipeline
                            socketChannel.pipeline()
                                    .addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new ServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
            out.println("listening...");
            channelFuture.channel().closeFuture().sync();
        }finally {
           bossEventLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        Server server = new Server();
        server.start();
    }

}

這里面需要使用到ServerHandler坊萝,具體代碼如下

package tech.sohocoder.netty;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.net.SocketAddress;

import static java.lang.System.out;

public class ServerHandler extends ChannelDuplexHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        out.println("Receive message: " + msg);
        String message = "receive your message";
        ctx.writeAndFlush(message);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        out.println("connect from: " + ctx.channel().remoteAddress().toString());
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        out.println("close connection: " + ctx.channel().remoteAddress().toString());
        super.channelInactive(ctx);
    }

}

client端也用netty寫一下

package tech.sohocoder.aio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import static java.lang.System.out;

public class Client {

    private SocketChannel socketChannel;

    public void start() throws IOException {
        socketChannel = SocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
        socketChannel.connect(socketAddress);
        if(socketChannel.isConnected()) {
            out.println("connect to " + socketChannel.getRemoteAddress());
        }

        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (;;) {
            final String input = in.readLine();
            final String line = input != null ? input.trim() : null;
            if (null == line || line.isEmpty()) { // skip `enter` or `enter` with spaces.
                continue;
            }

            ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
            socketChannel.write(byteBuffer);

            if(line.equals("quit")) {
                out.println("quit!");
                socketChannel.close();
                break;
            }

            ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(returnByteBuffer);
            String message = new String(returnByteBuffer.array()).trim();
            out.println("Receive message: " + message);
        }
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}

同樣要實(shí)現(xiàn)一個(gè)ClientHandler

package tech.sohocoder.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import static java.lang.System.out;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        out.println("Receive message: " + s);
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市许起,隨后出現(xiàn)的幾起案子十偶,更是在濱河造成了極大的恐慌,老刑警劉巖园细,帶你破解...
    沈念sama閱讀 217,542評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惦积,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡猛频,警方通過查閱死者的電腦和手機(jī)狮崩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鹿寻,“玉大人厉亏,你說我怎么就攤上這事×液停” “怎么了爱只?”我有些...
    開封第一講書人閱讀 163,912評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)招刹。 經(jīng)常有香客問我恬试,道長(zhǎng),這世上最難降的妖魔是什么疯暑? 我笑而不...
    開封第一講書人閱讀 58,449評(píng)論 1 293
  • 正文 為了忘掉前任训柴,我火速辦了婚禮,結(jié)果婚禮上妇拯,老公的妹妹穿的比我還像新娘幻馁。我一直安慰自己,他們只是感情好越锈,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,500評(píng)論 6 392
  • 文/花漫 我一把揭開白布仗嗦。 她就那樣靜靜地躺著,像睡著了一般甘凭。 火紅的嫁衣襯著肌膚如雪稀拐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,370評(píng)論 1 302
  • 那天丹弱,我揣著相機(jī)與錄音德撬,去河邊找鬼铲咨。 笑死,一個(gè)胖子當(dāng)著我的面吹牛蜓洪,可吹牛的內(nèi)容都是我干的纤勒。 我是一名探鬼主播,決...
    沈念sama閱讀 40,193評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼隆檀,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼摇天!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起刚操,我...
    開封第一講書人閱讀 39,074評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤闸翅,失蹤者是張志新(化名)和其女友劉穎再芋,沒想到半個(gè)月后菊霜,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,505評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡济赎,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,722評(píng)論 3 335
  • 正文 我和宋清朗相戀三年鉴逞,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片司训。...
    茶點(diǎn)故事閱讀 39,841評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡构捡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出壳猜,到底是詐尸還是另有隱情勾徽,我是刑警寧澤,帶...
    沈念sama閱讀 35,569評(píng)論 5 345
  • 正文 年R本政府宣布统扳,位于F島的核電站喘帚,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏咒钟。R本人自食惡果不足惜吹由,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,168評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望朱嘴。 院中可真熱鬧倾鲫,春花似錦、人聲如沸萍嬉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽壤追。三九已至玫荣,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間大诸,已是汗流浹背捅厂。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工贯卦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人焙贷。 一個(gè)月前我還...
    沈念sama閱讀 47,962評(píng)論 2 370
  • 正文 我出身青樓撵割,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親辙芍。 傳聞我的和親對(duì)象是個(gè)殘疾皇子啡彬,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,781評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容