Java網(wǎng)絡(luò)編程與NIO詳解2:JAVA NIO 一步步構(gòu)建I/O多路復(fù)用的請(qǐng)求模型

<font color="#333333" face="PingFangSC, helvetica neue, hiragino sans gb, arial, microsoft yahei ui, microsoft yahei, simsun, sans-serif">微信公眾號(hào)【黃小斜】作者是螞蟻金服 JAVA 工程師讹俊,專注于 JAVA 后端技術(shù)棧:SpringBoot垦沉、SSM全家桶、MySQL劣像、分布式、中間件摧玫、微服務(wù)耳奕,同時(shí)也懂點(diǎn)投資理財(cái)绑青,堅(jiān)持學(xué)習(xí)和寫作,相信終身學(xué)習(xí)的力量屋群!關(guān)注公眾號(hào)后回復(fù)”架構(gòu)師“即可領(lǐng)取 Java基礎(chǔ)闸婴、進(jìn)階、項(xiàng)目和架構(gòu)師等免費(fèi)學(xué)習(xí)資料芍躏,更有數(shù)據(jù)庫(kù)邪乍、分布式、微服務(wù)等熱門技術(shù)學(xué)習(xí)視頻对竣,內(nèi)容豐富庇楞,兼顧原理和實(shí)踐,另外也將贈(zèng)送作者原創(chuàng)的Java學(xué)習(xí)指南否纬、Java程序員面試指南等干貨資源吕晌。</font>

當(dāng)前環(huán)境

  1. jdk == 1.8

代碼地址

git 地址:https://github.com/jasonGeng88/java-network-programming

知識(shí)點(diǎn)

  • nio 下 I/O 阻塞與非阻塞實(shí)現(xiàn)
  • SocketChannel 介紹
  • I/O 多路復(fù)用的原理
  • 事件選擇器與 SocketChannel 的關(guān)系
  • 事件監(jiān)聽類型
  • 字節(jié)緩沖 ByteBuffer 數(shù)據(jù)結(jié)構(gòu)

場(chǎng)景

接著上一篇中的站點(diǎn)訪問問題,如果我們需要并發(fā)訪問10個(gè)不同的網(wǎng)站临燃,我們?cè)撊绾翁幚恚?/p>

在上一篇中睛驳,我們使用了java.net.socket類來實(shí)現(xiàn)了這樣的需求,以一線程處理一連接的方式膜廊,并配以線程池的控制乏沸,貌似得到了當(dāng)前的最優(yōu)解∽希可是這里也存在一個(gè)問題蹬跃,連接處理是同步的,也就是并發(fā)數(shù)量增大后钥勋,大量請(qǐng)求會(huì)在隊(duì)列中等待炬转,或直接異常拋出。

為解決這問題算灸,我們發(fā)現(xiàn)元兇處在“一線程一請(qǐng)求”上扼劈,如果一個(gè)線程能同時(shí)處理多個(gè)請(qǐng)求,那么在高并發(fā)下性能上會(huì)大大改善菲驴。這里就借住 JAVA 中的 nio 技術(shù)來實(shí)現(xiàn)這一模型荐吵。

nio 的阻塞實(shí)現(xiàn)

關(guān)于什么是 nio,從字面上理解為 New IO赊瞬,就是為了彌補(bǔ)原本 I/O 上的不足先煎,而在 JDK 1.4 中引入的一種新的 I/O 實(shí)現(xiàn)方式弹惦。簡(jiǎn)單理解务嫡,就是它提供了 I/O 的阻塞與非阻塞的兩種實(shí)現(xiàn)方式(當(dāng)然姥饰,默認(rèn)實(shí)現(xiàn)方式是阻塞的峦阁。)隐解。

下面退渗,我們先來看下 nio 以阻塞方式是如何處理的今艺。

建立連接

有了上一篇 socket 的經(jīng)驗(yàn)挺身,我們的第一步一定也是建立 socket 連接。只不過消略,這里不是采用 new socket() 的方式堡称,而是引入了一個(gè)新的概念 SocketChannel。它可以看作是 socket 的一個(gè)完善類艺演,除了提供 Socket 的相關(guān)功能外却紧,還提供了許多其他特性,如后面要講到的向選擇器注冊(cè)的功能胎撤。

類圖如下:

建立連接代碼實(shí)現(xiàn):

<pre>// 初始化 socket晓殊,建立 socket 與 channel 的綁定關(guān)系
SocketChannel socketChannel = SocketChannel.open();
// 初始化遠(yuǎn)程連接地址
SocketAddress remote = new InetSocketAddress(this.host, port);
// I/O 處理設(shè)置阻塞,這也是默認(rèn)的方式哩照,可不設(shè)置
socketChannel.configureBlocking(true);
// 建立連接
socketChannel.connect(remote);</pre>

獲取 socket 連接

因?yàn)槭峭瑯邮?I/O 阻塞的實(shí)現(xiàn)挺物,所以后面的關(guān)于 socket 輸入輸出流的處理,和上一篇的基本相同飘弧。唯一差別是识藤,這里需要通過 channel 來獲取 socket 連接。

  • 獲取 socket 連接

<pre>Socket socket = socketChannel.socket();</pre>

  • 處理輸入輸出流

<pre>PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());</pre>

完整示例

<pre>package com.jason.network.mode.nio;

import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;

public class NioBlockingHttpClient {

private SocketChannel socketChannel;
private String host;

public static void main(String[] args) throws IOException {

    for (String host: HttpConstant.HOSTS) {

        NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT);
        client.request();

    }

}

public NioBlockingHttpClient(String host, int port) throws IOException {
    this.host = host;
    socketChannel = SocketChannel.open();
    socketChannel.socket().setSoTimeout(5000);
    SocketAddress remote = new InetSocketAddress(this.host, port);
    this.socketChannel.connect(remote);
}

public void request() throws IOException {
    PrintWriter pw = getWriter(socketChannel.socket());
    BufferedReader br = getReader(socketChannel.socket());

    pw.write(HttpUtil.compositeRequest(host));
    pw.flush();
    String msg;
    while ((msg = br.readLine()) != null){
        System.out.println(msg);
    }
}

private PrintWriter getWriter(Socket socket) throws IOException {
    OutputStream out = socket.getOutputStream();
    return new PrintWriter(out);
}

private BufferedReader getReader(Socket socket) throws IOException {
    InputStream in = socket.getInputStream();
    return new BufferedReader(new InputStreamReader(in));
}

}</pre>

nio 的非阻塞實(shí)現(xiàn)

原理分析

nio 的阻塞實(shí)現(xiàn)次伶,基本與使用原生的 socket 類似痴昧,沒有什么特別大的差別。

下面我們來看看它真正強(qiáng)大的地方冠王。到目前為止赶撰,我們將的都是阻塞 I/O。何為阻塞 I/O柱彻,看下圖:

我們主要觀察圖中的前三種 I/O 模型豪娜,關(guān)于異步 I/O,一般需要依靠操作系統(tǒng)的支持哟楷,這里不討論瘤载。

從圖中可以發(fā)現(xiàn),阻塞過程主要發(fā)生在兩個(gè)階段上:

  • 第一階段:等待數(shù)據(jù)就緒卖擅;
  • 第二階段:將已就緒的數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶空間鸣奔;

這里產(chǎn)生了一個(gè)從內(nèi)核到用戶空間的拷貝,主要是為了系統(tǒng)的性能優(yōu)化考慮惩阶。假設(shè)挎狸,從網(wǎng)卡讀到的數(shù)據(jù)直接返回給用戶空間,那勢(shì)必會(huì)造成頻繁的系統(tǒng)中斷断楷,因?yàn)閺木W(wǎng)卡讀到的數(shù)據(jù)不一定是完整的锨匆,可能斷斷續(xù)續(xù)的過來。通過內(nèi)核緩沖區(qū)作為緩沖冬筒,等待緩沖區(qū)有足夠的數(shù)據(jù)恐锣,或者讀取完結(jié)后紊遵,進(jìn)行一次的系統(tǒng)中斷,將數(shù)據(jù)返回給用戶侥蒙,這樣就能避免頻繁的中斷產(chǎn)生。

了解了 I/O 阻塞的兩個(gè)階段匀奏,下面我們進(jìn)入正題鞭衩。看看一個(gè)線程是如何實(shí)現(xiàn)同時(shí)處理多個(gè) I/O 調(diào)用的娃善。從上圖中的非阻塞 I/O 可以看出论衍,僅僅只有第二階段需要阻塞,第一階段的數(shù)據(jù)等待過程聚磺,我們是不需要關(guān)心的坯台。不過該模型是頻繁地去檢查是否就緒,造成了 CPU 無效的處理瘫寝,反而效果不好蜒蕾。如果有一種類似的好萊塢原則— “不要給我們打電話,我們會(huì)打給你” 焕阿。這樣一個(gè)線程可以同時(shí)發(fā)起多個(gè) I/O 調(diào)用咪啡,并且不需要同步等待數(shù)據(jù)就緒。在數(shù)據(jù)就緒完成的時(shí)候暮屡,會(huì)以事件的機(jī)制撤摸,來通知我們。這樣不就實(shí)現(xiàn)了單線程同時(shí)處理多個(gè) IO 調(diào)用的問題了嗎褒纲?即所說的“I/O 多路復(fù)用模型”准夷。


廢話講了一大堆,下面就來實(shí)際操刀一下莺掠。

創(chuàng)建選擇器

由上面分析可以衫嵌,我們得有一個(gè)選擇器,它能監(jiān)聽所有的 I/O 操作汁蝶,并且以事件的方式通知我們哪些 I/O 已經(jīng)就緒了渐扮。

代碼如下:

<pre>import java.nio.channels.Selector;

...

private static Selector selector;
static {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
</pre>

創(chuàng)建非阻塞 I/O

下面,我們來創(chuàng)建一個(gè)非阻塞的 SocketChannel掖棉,代碼與阻塞實(shí)現(xiàn)類型墓律,唯一不同是socketChannel.configureBlocking(false)

注意:只有在socketChannel.configureBlocking(false)之后的代碼幔亥,才是非阻塞的耻讽,如果socketChannel.connect()在設(shè)置非阻塞模式之前,那么連接操作依舊是阻塞調(diào)用的帕棉。

<pre>SocketChannel socketChannel = SocketChannel.open();
SocketAddress remote = new InetSocketAddress(host, port);
// 設(shè)置非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.connect(remote);</pre>

建立選擇器與 socket 的關(guān)聯(lián)

選擇器與 socket 都創(chuàng)建好了针肥,下一步就是將兩者進(jìn)行關(guān)聯(lián)饼记,好讓選擇器和監(jiān)聽到 Socket 的變化。這里采用了以 SocketChannel 主動(dòng)注冊(cè)到選擇器的方式進(jìn)行關(guān)聯(lián)綁定慰枕,這也就解釋了具则,為什么不直接new Socket(),而是以SocketChannel的方式來創(chuàng)建 socket具帮。

代碼如下:

<pre>socketChannel.register(selector,
SelectionKey.OP_CONNECT
| SelectionKey.OP_READ
| SelectionKey.OP_WRITE);</pre>

上面代碼博肋,我們將 socketChannel 注冊(cè)到了選擇器中,并且對(duì)它的連接蜂厅、可讀匪凡、可寫事件進(jìn)行了監(jiān)聽。

具體的事件監(jiān)聽類型如下:

操作類型 描述 所屬對(duì)象
OP_READ 1 << 0 讀操作 SocketChannel
OP_WRITE 1 << 2 寫操作 SocketChannel
OP_CONNECT 1 << 3 連接socket操作 SocketChannel
OP_ACCEPT 1 << 4 接受socket操作 ServerSocketChannel

選擇器監(jiān)聽 socket 變化

現(xiàn)在掘猿,選擇器已經(jīng)與我們關(guān)心的 socket 進(jìn)行了關(guān)聯(lián)病游。下面就是感知事件的變化,然后調(diào)用相應(yīng)的處理機(jī)制稠通。

這里與 Linux 下的 selector 有點(diǎn)不同衬衬,nio 下的 selecotr 不會(huì)去遍歷所有關(guān)聯(lián)的 socket。我們?cè)谧?cè)時(shí)設(shè)置了我們關(guān)心的事件類型改橘,每次從選擇器中獲取的佣耐,只會(huì)是那些符合事件類型,并且完成就緒操作的 socket唧龄,減少了大量無效的遍歷操作兼砖。

public void select() throws IOException {
    // 獲取就緒的 socket 個(gè)數(shù)
    while (selector.select() > 0){

        // 獲取符合的 socket 在選擇器中對(duì)應(yīng)的事件句柄 key
        Set keys = selector.selectedKeys();

        // 遍歷所有的key
        Iterator it = keys.iterator();
        while (it.hasNext()){

            // 獲取對(duì)應(yīng)的 key,并從已選擇的集合中移除
            SelectionKey key = (SelectionKey)it.next();
            it.remove();

            if (key.isConnectable()){
                // 進(jìn)行連接操作
                connect(key);
            }
            else if (key.isWritable()){
                // 進(jìn)行寫操作
                write(key);
            }
            else if (key.isReadable()){
                // 進(jìn)行讀操作
                receive(key);
            }
        }
    }
}

注意:這里的selector.select()是同步阻塞的既棺,等待有事件發(fā)生后讽挟,才會(huì)被喚醒。這就防止了 CPU 空轉(zhuǎn)的產(chǎn)生丸冕。當(dāng)然耽梅,我們也可以給它設(shè)置超時(shí)時(shí)間,selector.select(long timeout)來結(jié)束阻塞過程胖烛。

處理連接就緒事件

下面眼姐,我們分別來看下,一個(gè) socket 是如何來處理連接佩番、寫入數(shù)據(jù)和讀取數(shù)據(jù)的(這些操作都是阻塞的過程众旗,只是我們將等待就緒的過程變成了非阻塞的了)。

處理連接代碼:

<pre>// SelectionKey 代表 SocketChannel 在選擇器中注冊(cè)的事件句柄
private void connect(SelectionKey key) throws IOException {
// 獲取事件句柄對(duì)應(yīng)的 SocketChannel
SocketChannel channel = (SocketChannel) key.channel();

// 真正的完成 socket 連接
channel.finishConnect();

// 打印連接信息
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port));
}</pre>

處理寫入就緒事件

<pre>// 字符集處理類
private Charset charset = Charset.forName("utf8");

private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();

// 獲取 HTTP 請(qǐng)求趟畏,同上一篇
String request = HttpUtil.compositeRequest(host);

// 向 SocketChannel 寫入事件 
channel.write(charset.encode(request));

// 修改 SocketChannel 所關(guān)心的事件
key.interestOps(SelectionKey.OP_READ);

}</pre>

這里有兩個(gè)地方需要注意:

  • 第一個(gè)是使用 channel.write(charset.encode(request)); 進(jìn)行數(shù)據(jù)寫入贡歧。有人會(huì)說,為什么不能像上面同步阻塞那樣,通過PrintWriter包裝類進(jìn)行操作利朵。因?yàn)?code>PrintWriter的 write() 方法是阻塞的律想,也就是說要等數(shù)據(jù)真正從 socket 發(fā)送出去后才返回。

這與我們這里所講的阻塞是不一致的绍弟,這里的操作雖然也是阻塞的技即,但它發(fā)生的過程是在數(shù)據(jù)從用戶空間到內(nèi)核緩沖區(qū)拷貝過程。至于系統(tǒng)將緩沖區(qū)的數(shù)據(jù)通過 socket 發(fā)送出去樟遣,這不在阻塞范圍內(nèi)姥份。也解釋了為什么要用 Charset 對(duì)寫入內(nèi)容進(jìn)行編碼了,因?yàn)榫彌_區(qū)接收的格式是ByteBuffer年碘。

  • 第二,選擇器用來監(jiān)聽事件變化的兩個(gè)參數(shù)是 interestOpsreadyOps展鸡。

    • interestOps:表示 SocketChannel 所關(guān)心的事件類型屿衅,也就是告訴選擇器,當(dāng)有這幾種事件發(fā)生時(shí)莹弊,才來通知我涤久。這里通過key.interestOps(SelectionKey.OP_READ);告訴選擇器,之后我只關(guān)心“讀就緒”事件忍弛,其他的不用通知我了响迂。

    • readyOps:表示 SocketChannel 當(dāng)前就緒的事件類型。以key.isReadable()為例细疚,判斷依據(jù)就是:return (readyOps() & OP_READ) != 0;

處理讀取就緒事件

<pre>private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString();

// 當(dāng)再?zèng)]有數(shù)據(jù)可讀時(shí)蔗彤,取消在選擇器中的關(guān)聯(lián),并關(guān)閉 socket 連接
if ("".equals(receiveData)) {
    key.cancel();
    channel.close();
    return;
}

System.out.println(receiveData);

}</pre>

這里的處理基本與寫入一致疯兼,唯一要注意的是然遏,這里我們需要自行處理去緩沖區(qū)讀取數(shù)據(jù)的操作。首先會(huì)分配一個(gè)固定大小的緩沖區(qū)吧彪,然后從內(nèi)核緩沖區(qū)中待侵,拷貝數(shù)據(jù)至我們剛分配固定緩沖區(qū)上。這里存在兩種情況:

  • 我們分配的緩沖區(qū)過大姨裸,那多余的部分以0補(bǔ)充(初始化時(shí)秧倾,其實(shí)會(huì)自動(dòng)補(bǔ)0)。
  • 我們分配的緩沖去過小傀缩,因?yàn)檫x擇器會(huì)不停的遍歷那先。只要 SocketChannel 處理讀就緒狀態(tài),那下一次會(huì)繼續(xù)讀取赡艰。當(dāng)然胃榕,分配過小,會(huì)增加遍歷次數(shù)。

最后勋又,將一下 ByteBuffer 的結(jié)構(gòu)苦掘,它主要有 position, limit,capacity 以及 mark 屬性。以 buffer.flip(); 為例楔壤,講下各屬性的作用(mark 主要是用來標(biāo)記之前 position 的位置鹤啡,是在當(dāng)前 postion 無法滿足的情況下使用的,這里不作討論)蹲嚣。

從圖中看出递瑰,

  • 容量(capacity):表示緩沖區(qū)可以保存的數(shù)據(jù)容量;
  • 極限(limit):表示緩沖區(qū)的當(dāng)前終點(diǎn)隙畜,即寫入抖部、讀取都不可超過該重點(diǎn);
  • 位置(position):表示緩沖區(qū)下一個(gè)讀寫單元的位置议惰;

完整代碼

<pre>package com.jason.network.mode.nio;

import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class NioNonBlockingHttpClient {

private static Selector selector;
private Charset charset = Charset.forName("utf8");

static {
    try {
        selector = Selector.open();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) throws IOException {

    NioNonBlockingHttpClient client = new NioNonBlockingHttpClient();

    for (String host: HttpConstant.HOSTS) {

        client.request(host, HttpConstant.PORT);

    }

    client.select();

}

public void request(String host, int port) throws IOException {
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.socket().setSoTimeout(5000);
    SocketAddress remote = new InetSocketAddress(host, port);
    socketChannel.configureBlocking(false);
    socketChannel.connect(remote);
    socketChannel.register(selector,
                    SelectionKey.OP_CONNECT
                    | SelectionKey.OP_READ
                    | SelectionKey.OP_WRITE);
}

public void select() throws IOException {
    while (selector.select(500) > 0){
        Set keys = selector.selectedKeys();

        Iterator it = keys.iterator();

        while (it.hasNext()){

            SelectionKey key = (SelectionKey)it.next();
            it.remove();

            if (key.isConnectable()){
                connect(key);
            }
            else if (key.isWritable()){
                write(key);
            }
            else if (key.isReadable()){
                receive(key);
            }
        }
    }
}

private void connect(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    channel.finishConnect();
    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
    String host = remote.getHostName();
    int port = remote.getPort();
    System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port));
}

private void write(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
    String host = remote.getHostName();

    String request = HttpUtil.compositeRequest(host);
    System.out.println(request);

    channel.write(charset.encode(request));
    key.interestOps(SelectionKey.OP_READ);
}

private void receive(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);
    buffer.flip();
    String receiveData = charset.decode(buffer).toString();

    if ("".equals(receiveData)) {
        key.cancel();
        channel.close();
        return;
    }

    System.out.println(receiveData);
}

}
</pre>

示例效果

總結(jié)

本文從 nio 的阻塞方式講起慎颗,介紹了阻塞 I/O 與非阻塞 I/O 的區(qū)別,以及在 nio 下是如何一步步構(gòu)建一個(gè) IO 多路復(fù)用的模型的客戶端言询。文中需要理解的內(nèi)容比較多俯萎,如果有理解錯(cuò)誤的地方,歡迎指正~

補(bǔ)充1:基于NIO的多路復(fù)用客戶端(線程池版)

<pre>public static void main(String[] args) {
基于線程池的偽異步NIO模型 a = new 基于線程池的偽異步NIO模型();
a.startServer(); }
private Charset charset = Charset.forName("utf8"); class WriteThread implements Runnable {
private SelectionKey key;
public WriteThread(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) key.channel();
Socket socket = socketChannel.socket();
try {
socketChannel.finishConnect();
} catch (IOException e) {
e.printStackTrace();
}
InetSocketAddress remote = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port)); }
}
class ReadThread implements Runnable {
private SelectionKey key;
public ReadThread(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
socketChannel.read(buffer);
} catch (IOException e) {
e.printStackTrace();
}
buffer.flip();
String receiveData = null;
try {
receiveData = new String(buffer.array(), "utf8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}

    if ("".equals(receiveData)) {
        key.cancel();

try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
return;
}

    System._out_.println(receiveData);

}
}
class ConnectThread implements Runnable {
private SelectionKey key;
public ConnectThread(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = charset.encode("hello world");
try {
socketChannel.write(byteBuffer);
System.out.println("hello world");
} catch (IOException e) {
e.printStackTrace();
}
key.interestOps(SelectionKey.OP_READ);
}
}
public void startServer() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
try {
SocketChannel socketChannel = SocketChannel.open();
Selector selector = Selector.open(); socketChannel.configureBlocking(false);
InetSocketAddress inetAddress = new InetSocketAddress(1234); socketChannel.connect(inetAddress);
socketChannel.register(selector, SelectionKey.OP_CONNECT |
SelectionKey.OP_READ |
SelectionKey.OP_WRITE); while (selector.select(500) > 0) {
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
if (key.isConnectable()) {
executorService.submit(new ConnectThread(key));
}else if(key.isReadable()) {
executorService.submit(new ReadThread(key));
}else {
executorService.submit(new WriteThread(key));
}
}
}

} catch (IOException e) {
    e.printStackTrace();

}
}</pre>

補(bǔ)充2:基于NIO的多路復(fù)用服務(wù)端

<pre>class NioNonBlockingHttpServer {

private static Selector _selector_;

private Charset charset = Charset.forName("utf8"); static {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException {

    NioNonBlockingHttpServer httpServer = new NioNonBlockingHttpServer();

httpServer.select(); }

public void request(int port) throws IOException {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().setSoTimeout(5000);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8383)); // serverSocketChannel.register(selector, // SelectionKey.OP_CONNECT // | SelectionKey.OP_READ // | SelectionKey.OP_WRITE);
}

public void select() throws IOException {
    while (_selector_.select(500) > 0) {
        Set keys = _selector_.selectedKeys();    Iterator it = keys.iterator();   while (it.hasNext()) {

            SelectionKey key = (SelectionKey) it.next();

it.remove(); if (key.isAcceptable()) {
accept(key);
} else if (key.isWritable()) {
write(key);
} else if (key.isReadable()) {
receive(key);
}
}
}
}

private void accept(SelectionKey key) throws IOException {
    SocketChannel socketChannel;

ServerSocketChannel channel = (ServerSocketChannel) key.channel();
socketChannel = channel.accept();//接受連接請(qǐng)求
socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); InetSocketAddress local = (InetSocketAddress) channel.socket().getLocalSocketAddress();
String host = local.getHostName();
int port = local.getPort();
System.out.println(String.format("請(qǐng)求地址: %s:%s 接收成功!", host, port)); }

private void write(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();    InetSocketAddress local = (InetSocketAddress) channel.socket().getRemoteSocketAddress();

String host = local.getHostName();
String msg = "hello Client";
channel.write(charset.encode(msg)); System.out.println(msg);
key.interestOps(SelectionKey.OP_READ);
}

private void receive(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString(); if ("".equals(receiveData)) {
key.cancel();
channel.close();
return; }

    System._out_.println(receiveData);

}
}</pre>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末运杭,一起剝皮案震驚了整個(gè)濱河市夫啊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌辆憔,老刑警劉巖撇眯,帶你破解...
    沈念sama閱讀 218,451評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異虱咧,居然都是意外死亡叛本,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門彤钟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來来候,“玉大人,你說我怎么就攤上這事逸雹∮粒” “怎么了?”我有些...
    開封第一講書人閱讀 164,782評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵梆砸,是天一觀的道長(zhǎng)转质。 經(jīng)常有香客問我,道長(zhǎng)帖世,這世上最難降的妖魔是什么休蟹? 我笑而不...
    開封第一講書人閱讀 58,709評(píng)論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上赂弓,老公的妹妹穿的比我還像新娘绑榴。我一直安慰自己,他們只是感情好盈魁,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評(píng)論 6 392
  • 文/花漫 我一把揭開白布翔怎。 她就那樣靜靜地躺著,像睡著了一般杨耙。 火紅的嫁衣襯著肌膚如雪赤套。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,578評(píng)論 1 305
  • 那天珊膜,我揣著相機(jī)與錄音容握,去河邊找鬼。 笑死车柠,一個(gè)胖子當(dāng)著我的面吹牛剔氏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播堪遂,決...
    沈念sama閱讀 40,320評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼萌庆!你這毒婦竟也來了溶褪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,241評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤践险,失蹤者是張志新(化名)和其女友劉穎猿妈,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體巍虫,經(jīng)...
    沈念sama閱讀 45,686評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡彭则,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了占遥。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俯抖。...
    茶點(diǎn)故事閱讀 39,992評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖瓦胎,靈堂內(nèi)的尸體忽然破棺而出芬萍,到底是詐尸還是另有隱情,我是刑警寧澤搔啊,帶...
    沈念sama閱讀 35,715評(píng)論 5 346
  • 正文 年R本政府宣布柬祠,位于F島的核電站,受9級(jí)特大地震影響负芋,放射性物質(zhì)發(fā)生泄漏漫蛔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望莽龟。 院中可真熱鬧蠕嫁,春花似錦、人聲如沸轧房。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)奶镶。三九已至迟赃,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間厂镇,已是汗流浹背纤壁。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留捺信,地道東北人酌媒。 一個(gè)月前我還...
    沈念sama閱讀 48,173評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像迄靠,于是被迫代替她去往敵國(guó)和親秒咨。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容