JAVA NIO
始于Java1.4, 提供了新的Java IO 操作非阻塞API。目的是替代Java IO 和 JAVA Networking相關(guān)的API艘款。
NIO中有三個核心的組件:
- Buffer 緩沖區(qū)
- Channel 通道
- Selector 選擇器
1. Buffer 緩沖區(qū)
緩沖區(qū)本質(zhì)上是一個可以寫入數(shù)據(jù)的內(nèi)存塊(類似數(shù)組)离熏,然后可以再次讀取。此內(nèi)存塊包含在NIO Buffer對象中凤壁,該對象提供了一組方法,可以更輕松地使用內(nèi)存塊跪另。
相比較直接對數(shù)組的操作拧抖,Buffer API更加容易操作和管理。
使用Buffer進行數(shù)據(jù)寫入與讀取免绿,需要進行如下四個步驟:
- 將數(shù)據(jù)寫入緩沖區(qū)
- 調(diào)用buffer.flip()唧席,轉(zhuǎn)換為讀取模式
- 緩沖區(qū)讀取數(shù)據(jù)
- 調(diào)用buffer.clear() 或 buffer.compact() 清楚緩沖區(qū)
1.1 Buffer工作原理
Buffer 三個重要屬性:
- capacity 容量:作為一個內(nèi)存塊,Buffer具有一定的固定大小嘲驾,也叫容量
- position 位置:寫入模式時代表的是寫數(shù)據(jù)的位置淌哟。讀取模式時代表的是讀取數(shù)據(jù)的位置。
limit 限制:寫入模式辽故,limit等于buffer的容量徒仓。讀取模式下,limit等于寫入的數(shù)據(jù)量誊垢。
Buffer工作原理
1.2 Buffer的例子代碼
package cn.lazyfennec.net.nio;
import java.nio.ByteBuffer;
public class BufferDemo {
public static void main(String[] args) {
// 構(gòu)建一個byte字節(jié)緩沖區(qū)掉弛,容量是4
ByteBuffer byteBuffer = ByteBuffer.allocate(4);
// 默認寫入模式,查看三個重要的指標(biāo)
System.out.println(String.format("初始化:capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 寫入2字節(jié)的數(shù)據(jù)
byteBuffer.put((byte) 1);
byteBuffer.put((byte) 2);
byteBuffer.put((byte) 3);
// 再看數(shù)據(jù)
System.out.println(String.format("寫入3字節(jié)后喂走,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 轉(zhuǎn)換為讀取模式(不調(diào)用flip方法殃饿,也是可以讀取數(shù)據(jù)的,但是position記錄讀取的位置不對)
System.out.println("#######開始讀取");
byteBuffer.flip();
byte a = byteBuffer.get();
System.out.println(a);
byte b = byteBuffer.get();
System.out.println(b);
System.out.println(String.format("讀取2字節(jié)數(shù)據(jù)后芋肠,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 繼續(xù)寫入3字節(jié)壁晒,此時讀模式下,limit=3,position=2.繼續(xù)寫入只能覆蓋寫入一條數(shù)據(jù)
// clear()方法清除整個緩沖區(qū)秒咐。compact()方法僅清除已閱讀的數(shù)據(jù)谬晕。轉(zhuǎn)為寫入模式
byteBuffer.compact(); // buffer : 1 , 3
byteBuffer.put((byte) 3);
byteBuffer.put((byte) 4);
byteBuffer.put((byte) 5);
System.out.println(String.format("最終的情況,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// rewind() 重置position為0
// mark() 標(biāo)記position的位置
// reset() 重置position為上次mark()標(biāo)記的位置
}
}
運行結(jié)果
初始化:capacity容量:4, position位置:0, limit限制:4
寫入3字節(jié)后携取,capacity容量:4, position位置:3, limit限制:4
#######開始讀取
1
2
讀取2字節(jié)數(shù)據(jù)后攒钳,capacity容量:4, position位置:2, limit限制:3
最終的情況,capacity容量:4, position位置:4, limit限制:4
1.3 ByteBuffer的內(nèi)存類型
ByteBuffer為性能關(guān)鍵型代碼提供了直接內(nèi)存(direct堆外)和非直接內(nèi)存(heap堆)兩種實現(xiàn)雷滋。堆外內(nèi)存獲取的方式: ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(noBytes);
好處:
1不撑、進行網(wǎng)絡(luò)IO或者文件IO時比heapBuffer少一次拷貝。(file/socket ----OS memory ---- jvm heap)GC會移動對象內(nèi)存晤斩,在寫file或socket的過程中焕檬,JVM的實現(xiàn)中,會先把數(shù)據(jù)復(fù)制到堆外澳泵,再進行寫入实愚。
2、GC范圍之外兔辅,降低GC壓力腊敲,但實現(xiàn)了自動管理。DirectByteBuffer中有一個Cleaner對象(PhantomReference)维苔,Cleaner被GC前會執(zhí)行clean方法碰辅,觸發(fā)DirectByteBuffer中定義的Deallocator建議:
1、性能確實可觀的時候才去使用;分配給大型介时、長壽命没宾;(網(wǎng)絡(luò)傳輸、文件讀寫場景)
2沸柔、通過虛擬機參數(shù)MaxDirectMemorySize限制大小榕吼,防止耗盡整個機器的內(nèi)存;使用堆外內(nèi)存的方式:
// 構(gòu)建一個byte字節(jié)緩沖區(qū),容量是4勉失,使用堆外內(nèi)存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4);
2. Channel 通道
- Channel的API涵蓋了UDP/TCP網(wǎng)絡(luò)和文件IO
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
- 和標(biāo)準IO Stream操作的區(qū)別:
- 在一個通道內(nèi)進行讀取和寫入
- stream通常是單向的(input或output)
- 可以非阻塞讀取和寫入通道
- 通道始終讀取或?qū)懭刖彌_區(qū)
2.1 SocketChannel
- 用于建立TCO網(wǎng)絡(luò)連接
- 兩種創(chuàng)建socketChannel的形式:
1.客戶端主動發(fā)起和服務(wù)器的連接羹蚣。
2.服務(wù)端獲取的新鏈接。
// 客戶端主動發(fā)起連接的方式
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
socketChannel.write(byteBuffer);// 發(fā)送請求數(shù)據(jù) - 向通道寫入數(shù)據(jù)
int bytesRead = socketChannel.read(byteBuffer); // 讀取服務(wù)端返回 - 讀取緩沖區(qū)的數(shù)據(jù)
socketChannel.close(); // 關(guān)閉連接
write寫:write()在尚未寫入任何內(nèi)容時就可能返回了乱凿。需要在循環(huán)中調(diào)用write()顽素。
read讀: read()方法可能直接返回而根本不讀取任何數(shù)據(jù),根據(jù)返回的int值判斷讀取了多少字節(jié)徒蟆。
2.2 ServerSocketChannel
ServerSocketChannel可以監(jiān)聽新建的TCP連接通道胁出,類似ServerSocket。
// 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
if (socketChannel != null) {
// tcp請求 讀取/響應(yīng)
}
}
serverSocketChannel.accept():如果該通道處于非阻塞模式段审,那么如果沒有掛起的連接全蝶,該方法將立即返回null。必須檢查返回的SocketChannel是否為null。
- 代碼示例
服務(wù)端
package cn.lazyfennec.net.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* 直接基于非阻塞的寫法
*/
public class NIOServer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
System.out.println("啟動成功");
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
// tcp請求 讀取/響應(yīng)
if (socketChannel != null) {
System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false); // 默認是阻塞的,一定要設(shè)置為非阻塞
try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) continue; // 如果沒數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來自:" + socketChannel.getRemoteAddress());
// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);// 非阻塞
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 用到了非阻塞的API, 在設(shè)計上,和BIO可以有很大的不同.繼續(xù)改進
}
}
客戶端
package cn.lazyfennec.net.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class NIOClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
while (!socketChannel.finishConnect()) {
// 沒連接上,則一直等待
Thread.yield();
}
Scanner scanner = new Scanner(System.in);
System.out.println("請輸入:");
// 發(fā)送內(nèi)容
String msg = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// 讀取響應(yīng)
System.out.println("收到服務(wù)端響應(yīng):");
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
scanner.close();
socketChannel.close();
}
}
以上的Server類中抑淫,連接依舊會阻塞绷落,可以做以下調(diào)整升級:
- 輪詢通道的方式
package cn.lazyfennec.net.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
/**
* 直接基于非阻塞的寫法,一個線程處理輪詢所有請求
*/
public class NIOServer1 {
/**
* 已經(jīng)建立連接的集合
*/
private static ArrayList<SocketChannel> channels = new ArrayList<>();
public static void main(String[] args) throws Exception {
// 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
System.out.println("啟動成功");
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
// tcp請求 讀取/響應(yīng)
if (socketChannel != null) {
System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false); // 默認是阻塞的,一定要設(shè)置為非阻塞
channels.add(socketChannel);
} else {
// 沒有新連接的情況下,就去處理現(xiàn)有連接的數(shù)據(jù),處理完的就刪除掉
Iterator<SocketChannel> iterator = channels.iterator();
while (iterator.hasNext()) {
SocketChannel ch = iterator.next();
try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
if (ch.read(requestBuffer) == 0) {
// 等于0,代表這個通道沒有數(shù)據(jù)需要處理,那就待會再處理
continue;
}
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) continue; // 如果沒數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來自:" + ch.getRemoteAddress());
// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
iterator.remove();
} catch (IOException e) {
e.printStackTrace();
iterator.remove();
}
}
}
}
// 用到了非阻塞的API, 再設(shè)計上,和BIO可以有很大的不同
// 問題: 輪詢通道的方式,低效,浪費CPU
}
}
Selector 選擇器
可以檢查一個或多個NIO通道,并確定哪些通道已準備好進行讀取或?qū)懭搿?strong>實現(xiàn)了單線程管理多個通道始苇,從而管理多個網(wǎng)絡(luò)連接砌烁。
1.1 Selector監(jiān)聽多個channel的多個事件:
- 四個事件對應(yīng)的四個SelectionKey常量:
- Connect連接(SelectionKey.OP_CONNECT)
- Accept準備就緒(OP_ACCEPT)
- Read讀取(OP_READ)
- Write 寫入(OP_WRITE)
- 使用Selector 避免輪詢處理
package cn.lazyfennec.net.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 結(jié)合Selector實現(xiàn)的非阻塞服務(wù)端(放棄對channel的輪詢,借助消息通知機制)
*/
public class NIOServerV2 {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建網(wǎng)絡(luò)服務(wù)端ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
// 2. 構(gòu)建一個Selector選擇器,并且將channel注冊上去
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);// 將serverSocketChannel注冊到selector
selectionKey.interestOps(SelectionKey.OP_ACCEPT); // 對serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)
// 3. 綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("啟動成功");
while (true) {
// 不再輪詢通道,改用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會有返回
selector.select();
// 獲取事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍歷查詢結(jié)果e
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()) {
// 被封裝的查詢結(jié)果
SelectionKey key = iter.next();
iter.remove();
// 關(guān)注 Read 和 Accept兩個事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.attachment();
// 將拿到的客戶端連接通道,注冊到selector上面
SocketChannel clientSocketChannel = server.accept(); // mainReactor 輪詢accept
clientSocketChannel.configureBlocking(false);
clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
System.out.println("收到新連接 : " + clientSocketChannel.getRemoteAddress());
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.attachment();
try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) continue; // 如果沒數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來自:" + socketChannel.getRemoteAddress());
// TODO 業(yè)務(wù)操作 數(shù)據(jù)庫 接口調(diào)用等等
// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
} catch (IOException e) {
// e.printStackTrace();
key.cancel(); // 取消事件訂閱
}
}
}
selector.selectNow();
}
// 問題: 此處一個selector監(jiān)聽所有事件,一個線程處理所有請求事件. 會成為瓶頸! 要有多線程的運用
}
}
NIO 對比 BIO
NIO與多線程結(jié)合的改進方案
Doug Lea的著名文章《 Scalable l0 in Java》
- Reactors 的方式
package com.study.hc.net.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* NIO selector 多路復(fù)用reactor線程模型
*/
public class NIOServerV3 {
/** 處理業(yè)務(wù)操作的線程 */
private static ExecutorService workPool = Executors.newCachedThreadPool();
/**
* 封裝了selector.select()等事件輪詢的代碼
*/
abstract class ReactorThread extends Thread {
Selector selector;
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
/**
* Selector監(jiān)聽到有事件后,調(diào)用這個方法
*/
public abstract void handler(SelectableChannel channel) throws Exception;
private ReactorThread() throws IOException {
selector = Selector.open();
}
volatile boolean running = false;
@Override
public void run() {
// 輪詢Selector事件
while (running) {
try {
// 執(zhí)行隊列中的任務(wù)
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
selector.select(1000);
// 獲取查詢結(jié)果
Set<SelectionKey> selected = selector.selectedKeys();
// 遍歷查詢結(jié)果
Iterator<SelectionKey> iter = selected.iterator();
while (iter.hasNext()) {
// 被封裝的查詢結(jié)果
SelectionKey key = iter.next();
iter.remove();
int readyOps = key.readyOps();
// 關(guān)注 Read 和 Accept兩個事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
try {
SelectableChannel channel = (SelectableChannel) key.attachment();
channel.configureBlocking(false);
handler(channel);
if (!channel.isOpen()) {
key.cancel(); // 如果關(guān)閉了,就取消這個KEY的訂閱
}
} catch (Exception ex) {
key.cancel(); // 如果有異常,就取消這個KEY的訂閱
}
}
}
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private SelectionKey register(SelectableChannel channel) throws Exception {
// 為什么register要以任務(wù)提交的形式,讓reactor線程去處理催式?
// 因為線程在執(zhí)行channel注冊到selector的過程中函喉,會和調(diào)用selector.select()方法的線程爭用同一把鎖
// 而select()方法實在eventLoop中通過while循環(huán)調(diào)用的,爭搶的可能性很高荣月,為了讓register能更快的執(zhí)行管呵,就放到同一個線程來處理
FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
taskQueue.add(futureTask);
return futureTask.get();
}
private void doStart() {
if (!running) {
running = true;
start();
}
}
}
private ServerSocketChannel serverSocketChannel;
// 1、創(chuàng)建多個線程 - accept處理reactor線程 (accept線程)
private ReactorThread[] mainReactorThreads = new ReactorThread[1];
// 2哺窄、創(chuàng)建多個線程 - io處理reactor線程 (I/O線程)
private ReactorThread[] subReactorThreads = new ReactorThread[8];
/**
* 初始化線程組
*/
private void newGroup() throws IOException {
// 創(chuàng)建IO線程,負責(zé)處理客戶端連接以后socketChannel的IO讀寫
for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] = new ReactorThread() {
@Override
public void handler(SelectableChannel channel) throws IOException {
// work線程只負責(zé)處理IO處理捐下,不處理accept事件
SocketChannel ch = (SocketChannel) channel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 長連接情況下,需要手動判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個簡單的判斷: 超過0字節(jié)就認為請求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) return; // 如果沒數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(Thread.currentThread().getName() + "收到數(shù)據(jù),來自:" + ch.getRemoteAddress());
// TODO 業(yè)務(wù)操作 數(shù)據(jù)庫、接口...
workPool.submit(() -> {
});
// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}
// 創(chuàng)建mainReactor線程, 只負責(zé)處理serverSocketChannel
for (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] = new ReactorThread() {
AtomicInteger incr = new AtomicInteger(0);
@Override
public void handler(SelectableChannel channel) throws Exception {
// 只做請求分發(fā)堂氯,不做具體的數(shù)據(jù)讀取
ServerSocketChannel ch = (ServerSocketChannel) channel;
SocketChannel socketChannel = ch.accept();
socketChannel.configureBlocking(false);
// 收到連接建立的通知之后,分發(fā)給I/O線程繼續(xù)去讀取數(shù)據(jù)
int index = incr.getAndIncrement() % subReactorThreads.length;
ReactorThread workEventLoop = subReactorThreads[index];
workEventLoop.doStart();
SelectionKey selectionKey = workEventLoop.register(socketChannel);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
}
};
}
}
/**
* 初始化channel,并且綁定一個eventLoop線程
*
* @throws IOException IO異常
*/
private void initAndRegister() throws Exception {
// 1牌废、 創(chuàng)建ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2咽白、 將serverSocketChannel注冊到selector
int index = new Random().nextInt(mainReactorThreads.length);
mainReactorThreads[index].doStart();
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
/**
* 綁定端口
*
* @throws IOException IO異常
*/
private void bind() throws IOException {
// 1、 正式綁定端口鸟缕,對外服務(wù)
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("啟動完成晶框,端口8080");
}
public static void main(String[] args) throws Exception {
NIOServerV3 nioServerV3 = new NIOServerV3();
nioServerV3.newGroup(); // 1、 創(chuàng)建main和sub兩組線程
nioServerV3.initAndRegister(); // 2懂从、 創(chuàng)建serverSocketChannel授段,注冊到mainReactor線程上的selector上
nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
}
}
如果覺得有收獲就點個贊吧番甩,更多知識侵贵,請點擊關(guān)注查看我的主頁信息哦~