OSI網(wǎng)絡(luò)七層模型
- 低三層
物料層:使用原始數(shù)據(jù)比特流能再物理介質(zhì)上傳輸
數(shù)據(jù)鏈路層: 通過(guò)校驗(yàn)留美、確認(rèn)和反饋重發(fā)等手段,行程穩(wěn)定的數(shù)據(jù)鏈路
網(wǎng)絡(luò)層:進(jìn)行路由選擇和流量控制(IP協(xié)議) - 傳輸層:(TCP/UDP協(xié)議)提供可靠的端口到端口的數(shù)據(jù)傳輸服務(wù)
- 高三層:一般都指JAVA程序
會(huì)話層:負(fù)責(zé)建立伸刃、管理和終止進(jìn)程之間的會(huì)話和數(shù)據(jù)交換
表示層:負(fù)責(zé)數(shù)據(jù)格式轉(zhuǎn)換谎砾、數(shù)據(jù)加密與解密抚太、壓縮與解壓等
應(yīng)用層:為用戶的應(yīng)用進(jìn)程提供網(wǎng)絡(luò)服務(wù)
傳輸控制協(xié)議TCP
面向連接谨究、可靠、有序麻献、字節(jié)流傳輸服務(wù)隘道,應(yīng)用程序在使用TCP前必須建立TCP連接
-
TCP三次握手
- 客戶端發(fā)送消息至服務(wù)端等待確認(rèn)
- 服務(wù)端收到症歇,并發(fā)送請(qǐng)求等待確認(rèn)
- 客戶端收到,建立連接
-
TCP四次揮手
- 客戶端發(fā)送斷開(kāi)連接至服務(wù)端等待確認(rèn)
- 服務(wù)端收到谭梗,半關(guān)閉狀態(tài)忘晤,服務(wù)端等待釋放
- 服務(wù)端發(fā)送等待確認(rèn)
- 客戶端等待一會(huì),發(fā)送確認(rèn)關(guān)閉激捏,等待一會(huì)關(guān)閉
用戶數(shù)據(jù)報(bào)協(xié)議UDP
UDP是Internet傳輸協(xié)議设塔,提供無(wú)連接、不可靠远舅、數(shù)據(jù)報(bào)盡力傳輸服務(wù)
- 無(wú)需建立連接
- 無(wú)需連接狀態(tài)
- 數(shù)據(jù)不可靠
Socket編程
- 數(shù)據(jù)報(bào)類型套接字SOCK_DGRAM(面向UDP接口)
- 流式套接字SOCK_STREAM(面向TCP接口)
- 原始套接字SOCK_RAW(面向網(wǎng)絡(luò)層協(xié)議接口IP闰蛔、ICMP等)
-
Socket API及其調(diào)用過(guò)程
-
Socket API函數(shù)定義
- listen()、accept()函數(shù)只能用于服務(wù)端
- connect()函數(shù)只能用于客戶端
- socket() 图柏、bind()序六、send()、recv()蚤吹、sendto()例诀、recvfrom()随抠、close()
網(wǎng)絡(luò)編程
阻塞(bloking)IO
資源不可用時(shí),IO請(qǐng)求一直阻塞繁涂,直到反饋結(jié)果(有數(shù)據(jù)或超時(shí))非阻塞(non-bloking)IO
資源不可用時(shí)拱她,IO請(qǐng)求離開(kāi)返回,返回?cái)?shù)據(jù)標(biāo)識(shí)資源不可用同步(synchronous)IO
應(yīng)用阻塞在發(fā)送或接受數(shù)據(jù)的狀態(tài)扔罪,直到數(shù)據(jù)成功傳輸或返回失敗異步(asynchronous)IO
應(yīng)用發(fā)送或接受數(shù)據(jù)后立即返回秉沼,實(shí)際處理是異步執(zhí)行
BIO網(wǎng)絡(luò)編程
客戶端阻塞 - OutputStream.write()
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Scanner;
public class BIOClient {
private static Charset charset = Charset.forName("UTF-8");
public static void main(String[] args) throws Exception {
Socket s = new Socket("localhost", 8080);
OutputStream out = s.getOutputStream();
Scanner scanner = new Scanner(System.in);
System.out.println("請(qǐng)輸入:");
String msg = scanner.nextLine();
out.write(msg.getBytes(charset)); // 阻塞,寫完成
scanner.close();
s.close();
}
}
服務(wù)端阻塞 - ServerSocket.accept() 與 BufferedReader.readLine()
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
public class BIOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8080);
System.out.println("服務(wù)器啟動(dòng)成功");
while (!serverSocket.isClosed()) {
Socket request = serverSocket.accept();// 阻塞
System.out.println("收到新連接 : " + request.toString());
try {
// 接收數(shù)據(jù)步势、打印
InputStream inputStream = request.getInputStream(); // net + i/o
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
String msg;
while ((msg = reader.readLine()) != null) { // 沒(méi)有數(shù)據(jù)氧猬,阻塞
if (msg.length() == 0) {
break;
}
System.out.println(msg);
}
System.out.println("收到數(shù)據(jù),來(lái)自:"+ request.toString());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
request.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
serverSocket.close();
}
}
服務(wù)端 - 加入多線程并模擬HTTP請(qǐng)求協(xié)議返回?cái)?shù)據(jù) - 子線程阻塞
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BIOServer2 {
private static ExecutorService threadPool = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8080);
System.out.println("服務(wù)器啟動(dòng)成功");
while (!serverSocket.isClosed()) {
Socket request = serverSocket.accept();
System.out.println("收到新連接 : " + request.toString());
threadPool.execute(() -> {
try {
// 接收數(shù)據(jù)背犯、打印
InputStream inputStream = request.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
String msg;
while ((msg = reader.readLine()) != null) { // 沒(méi)有數(shù)據(jù)阻塞
if (msg.length() == 0) {
break;
}
System.out.println(msg);
}
System.out.println("收到數(shù)據(jù),來(lái)自:"+ request.toString());
// 響應(yīng)結(jié)果 200
OutputStream outputStream = request.getOutputStream();
outputStream.write("HTTP/1.1 200 OK\r\n".getBytes());
outputStream.write("Content-Length: 11\r\n\r\n".getBytes());
outputStream.write("Hello World".getBytes());
outputStream.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
request.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
serverSocket.close();
}
}
NIO網(wǎng)絡(luò)編程
NIO中三個(gè)核心組件:Buffer緩沖區(qū)坏瘩、Channel通道、Sellector選擇器
Buffer緩沖區(qū)
緩沖區(qū)是一個(gè)可以寫入數(shù)據(jù)的內(nèi)存塊漠魏,然后可以再次讀取倔矾,Buffer三個(gè)重要屬性:
- capacity容量:做位一個(gè)內(nèi)存塊,Buffer具有一定的固定大小
- position位置:寫入模式時(shí)代表寫數(shù)據(jù)的位置柱锹,讀取模式時(shí)代表讀取數(shù)據(jù)的位置
- limit限制:寫入模式哪自,限制等于buffer的容量;讀取模式下禁熏,limit等于寫入的數(shù)據(jù)量
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
public class BufferDemo {
public static void main(String[] args) {
// 構(gòu)建一個(gè)byte字節(jié)緩沖區(qū)壤巷,容量是4
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4);
// 默認(rèn)寫入模式,查看三個(gè)重要的指標(biāo)
System.out.println(String.format("初始化:capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 寫入2字節(jié)的數(shù)據(jù)
byteBuffer.put((byte) 1);
byteBuffer.put((byte) 2);
byteBuffer.put((byte) 3);
// 再看數(shù)據(jù)
System.out.println(String.format("寫入3字節(jié)后瞧毙,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 轉(zhuǎn)換為讀取模式(不調(diào)用flip方法胧华,也是可以讀取數(shù)據(jù)的,但是position記錄讀取的位置不對(duì))
System.out.println("#######開(kāi)始讀取");
byteBuffer.flip();
byte a = byteBuffer.get();
System.out.println(a);
byte b = byteBuffer.get();
System.out.println(b);
System.out.println(String.format("讀取2字節(jié)數(shù)據(jù)后宙彪,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// 繼續(xù)寫入3字節(jié)矩动,此時(shí)讀模式下,limit=3释漆,position=2.繼續(xù)寫入只能覆蓋寫入一條數(shù)據(jù)
// clear()方法清除整個(gè)緩沖區(qū)悲没。compact()方法僅清除已閱讀的數(shù)據(jù)。轉(zhuǎn)為寫入模式
byteBuffer.compact(); // buffer : 1 , 3
byteBuffer.put((byte) 3);
byteBuffer.put((byte) 4);
byteBuffer.put((byte) 5);
System.out.println(String.format("最終的情況男图,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
byteBuffer.position(), byteBuffer.limit()));
// rewind() 重置position為0
// mark() 標(biāo)記position的位置
// reset() 重置position為上次mark()標(biāo)記的位置
}
}
ByteBuffer內(nèi)存類型
ByteBuffer為性能關(guān)鍵型代碼提供了直接內(nèi)存(direct堆外)和非直接內(nèi)存(heap堆)兩種實(shí)現(xiàn)
堆外內(nèi)存獲取方式:ByteBuffer buffer = ByteBuffer.allocateDirect(noBytes);
堆外內(nèi)存的好處
- 進(jìn)行網(wǎng)絡(luò)IO或者文件IO時(shí)比heapBuffer少一次拷貝示姿。因?yàn)镚C會(huì)移動(dòng)對(duì)象內(nèi)存,在寫file或者socket的過(guò)程中逊笆,JVM的實(shí)現(xiàn)中會(huì)把數(shù)據(jù)復(fù)制到堆外栈戳,再進(jìn)行寫入
- 堆外內(nèi)存不收GC控制,能降低GC的壓力览露,實(shí)現(xiàn)自動(dòng)管理荧琼。DirectByteBuffer中有一個(gè)Cleaner對(duì)象(PhantomReference),Cleaner被GC前會(huì)執(zhí)行clean方法,能觸發(fā)DirectByteBuffer中定義的Deallocator
Channel通道
Channel的API涵蓋了UDP/TCP網(wǎng)絡(luò)和文件IO,他可以創(chuàng)建連接與傳輸數(shù)據(jù),可以非阻塞的讀取和寫入通道命锄,通道始終讀取或吸入緩沖區(qū)
SocketChannel
SocketChannel用戶建立TCP網(wǎng)絡(luò)連接堰乔,有兩種創(chuàng)建形式:
- 客戶端主動(dòng)發(fā)起服務(wù)器連接
- 服務(wù)器獲取新的連接
讀和寫都變成了非阻塞的方法,wirte()在尚未寫數(shù)據(jù)就可能返回了脐恩,read()可能返回空數(shù)據(jù)镐侯。所以需要再循環(huán)中使用
ServerSocketChannel
ServerSocketChannel可監(jiān)聽(tīng)新建立的TCP連接通道,通過(guò)open()方法創(chuàng)建
ServerSocketChannel.accept() : 如果該通道處于非阻塞模式驶冒,那么如果沒(méi)有掛起的連接苟翻,該方法將立即返回NULL
NIO服務(wù)端 - 基礎(chǔ)實(shí)現(xiàn)
- 循環(huán)內(nèi)只能實(shí)現(xiàn)單線程處理一個(gè)任務(wù)
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* 直接基于非阻塞的寫法
*/
public class NIOServer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式 - 默認(rèn)都是阻塞的
serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
System.out.println("啟動(dòng)成功");
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
// tcp請(qǐng)求 讀取/響應(yīng)
if (socketChannel != null) {
System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false); // 默認(rèn)是阻塞的,一定要設(shè)置為非阻塞
try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
if(requestBuffer.position() == 0) continue; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來(lái)自:"+ socketChannel.getRemoteAddress());
// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);// 非阻塞
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 用到了非阻塞的API, 在設(shè)計(jì)上,和BIO可以有很大的不同.繼續(xù)改進(jìn)
}
}
NIO客戶端
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class NIOClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
while (!socketChannel.finishConnect()) {
// 沒(méi)連接上,則一直等待
Thread.yield();
}
Scanner scanner = new Scanner(System.in);
System.out.println("請(qǐng)輸入:");
// 發(fā)送內(nèi)容
String msg = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// 讀取響應(yīng)
System.out.println("收到服務(wù)端響應(yīng):");
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
scanner.close();
socketChannel.close();
}
}
NIO服務(wù)端優(yōu)化
- 用到了非阻塞的API, 再設(shè)計(jì)上,和BIO可以有很大的不同
- 問(wèn)題: 輪詢通道的方式,低效,浪費(fèi)CPU
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
/**
* 直接基于非阻塞的寫法,一個(gè)線程處理輪詢所有請(qǐng)求
*/
public class NIOServer1 {
/**
* 已經(jīng)建立連接的集合
*/
private static ArrayList<SocketChannel> channels = new ArrayList<>();
public static void main(String[] args) throws Exception {
// 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
System.out.println("啟動(dòng)成功");
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道
// tcp請(qǐng)求 讀取/響應(yīng)
if (socketChannel != null) {
System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false); // 默認(rèn)是阻塞的,一定要設(shè)置為非阻塞
channels.add(socketChannel);
} else {
// 沒(méi)有新連接的情況下,就去處理現(xiàn)有連接的數(shù)據(jù),處理完的就刪除掉
Iterator<SocketChannel> iterator = channels.iterator();
while (iterator.hasNext()) {
SocketChannel ch = iterator.next();
try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
if (ch.read(requestBuffer) == 0) {
// 等于0,代表這個(gè)通道沒(méi)有數(shù)據(jù)需要處理,那就待會(huì)再處理
continue;
}
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
if(requestBuffer.position() == 0) continue; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來(lái)自:" + ch.getRemoteAddress());
// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
iterator.remove();
} catch (IOException e) {
e.printStackTrace();
iterator.remove();
}
}
}
}
}
}
Selector選擇器
Selector可以堅(jiān)持一個(gè)或者多個(gè)NIO通道,并確定哪些通道已經(jīng)準(zhǔn)備好進(jìn)行讀取或?qū)懭肫郏瑢?shí)現(xiàn)一個(gè)線程處理多個(gè)通道的核心概念理解:事件驅(qū)動(dòng)機(jī)制
通過(guò)Selector 事件輪詢實(shí)現(xiàn) NIO服務(wù)端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 結(jié)合Selector實(shí)現(xiàn)的非阻塞服務(wù)端(放棄對(duì)channel的輪詢,借助消息通知機(jī)制)
*/
public class NIOServerV2 {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建網(wǎng)絡(luò)服務(wù)端ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 設(shè)置為非阻塞模式
// 2. 構(gòu)建一個(gè)Selector選擇器,并且將channel注冊(cè)上去
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);// 將serverSocketChannel注冊(cè)到selector
selectionKey.interestOps(SelectionKey.OP_ACCEPT); // 對(duì)serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)
// 3. 綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("啟動(dòng)成功");
while (true) {
// 不再輪詢通道,改用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會(huì)有返回
selector.select();
// 獲取事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍歷查詢結(jié)果e
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()) {
// 被封裝的查詢結(jié)果
SelectionKey key = iter.next();
iter.remove();
// 關(guān)注 Read 和 Accept兩個(gè)事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.attachment();
// 將拿到的客戶端連接通道,注冊(cè)到selector上面
SocketChannel clientSocketChannel = server.accept(); // mainReactor 輪詢accept
clientSocketChannel.configureBlocking(false);
clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel); //再次注冊(cè)事件
System.out.println("收到新連接 : " + clientSocketChannel.getRemoteAddress());
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.attachment();
try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
if(requestBuffer.position() == 0) continue; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來(lái)自:" + socketChannel.getRemoteAddress());
// TODO 業(yè)務(wù)操作 數(shù)據(jù)庫(kù) 接口調(diào)用等等
// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
} catch (IOException e) {
// e.printStackTrace();
key.cancel(); // 取消事件訂閱
}
}
}
selector.selectNow();
}
// 問(wèn)題: 此處一個(gè)selector監(jiān)聽(tīng)所有事件,一個(gè)線程處理所有請(qǐng)求事件. 會(huì)成為瓶頸! 要有多線程的運(yùn)用
}
}
Reactor模式
Reactor模式是一種典型的事件驅(qū)動(dòng)的編程模型崇猫,是一種為處理并發(fā)服務(wù)請(qǐng)求,并將請(qǐng)求提交到一個(gè)或
者多個(gè)服務(wù)處理程序的事件設(shè)計(jì)模式
NIO與多線程結(jié)合 Reactor模式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* NIO selector 多路復(fù)用reactor線程模型
*/
public class NIOServerV3 {
/** 處理業(yè)務(wù)操作的線程 */
private static ExecutorService workPool = Executors.newCachedThreadPool();
/**
* 封裝了selector.select()等事件輪詢的代碼
*/
abstract class ReactorThread extends Thread {
Selector selector;
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
/**
* Selector監(jiān)聽(tīng)到有事件后,調(diào)用這個(gè)方法
*/
public abstract void handler(SelectableChannel channel) throws Exception;
private ReactorThread() throws IOException {
selector = Selector.open();
}
volatile boolean running = false;
@Override
public void run() {
// 輪詢Selector事件
while (running) {
try {
// 執(zhí)行隊(duì)列中的任務(wù)
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
selector.select(1000);
// 獲取查詢結(jié)果
Set<SelectionKey> selected = selector.selectedKeys();
// 遍歷查詢結(jié)果
Iterator<SelectionKey> iter = selected.iterator();
while (iter.hasNext()) {
// 被封裝的查詢結(jié)果
SelectionKey key = iter.next();
iter.remove();
int readyOps = key.readyOps();
// 關(guān)注 Read 和 Accept兩個(gè)事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
try {
SelectableChannel channel = (SelectableChannel) key.attachment();
channel.configureBlocking(false);
handler(channel);
if (!channel.isOpen()) {
key.cancel(); // 如果關(guān)閉了,就取消這個(gè)KEY的訂閱
}
} catch (Exception ex) {
key.cancel(); // 如果有異常,就取消這個(gè)KEY的訂閱
}
}
}
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private SelectionKey register(SelectableChannel channel) throws Exception {
// 為什么register要以任務(wù)提交的形式需忿,讓reactor線程去處理诅炉?
// 因?yàn)榫€程在執(zhí)行channel注冊(cè)到selector的過(guò)程中,會(huì)和調(diào)用selector.select()方法的線程爭(zhēng)用同一把鎖
// 而select()方法實(shí)在eventLoop中通過(guò)while循環(huán)調(diào)用的屋厘,爭(zhēng)搶的可能性很高涕烧,為了讓register能更快的執(zhí)行,就放到同一個(gè)線程來(lái)處理
FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
taskQueue.add(futureTask);
return futureTask.get();
}
private void doStart() {
if (!running) {
running = true;
start();
}
}
}
private ServerSocketChannel serverSocketChannel;
// 1汗洒、創(chuàng)建多個(gè)線程 - accept處理reactor線程 (accept線程)
private ReactorThread[] mainReactorThreads = new ReactorThread[1];
// 2议纯、創(chuàng)建多個(gè)線程 - io處理reactor線程 (I/O線程)
private ReactorThread[] subReactorThreads = new ReactorThread[8];
/**
* 初始化線程組
*/
private void newGroup() throws IOException {
// 創(chuàng)建IO線程,負(fù)責(zé)處理客戶端連接以后socketChannel的IO讀寫
for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] = new ReactorThread() {
@Override
public void handler(SelectableChannel channel) throws IOException {
// work線程只負(fù)責(zé)處理IO處理,不處理accept事件
SocketChannel ch = (SocketChannel) channel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) return; // 如果沒(méi)數(shù)據(jù)了, 則不繼續(xù)后面的處理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(Thread.currentThread().getName() + "收到數(shù)據(jù),來(lái)自:" + ch.getRemoteAddress());
// TODO 業(yè)務(wù)操作 數(shù)據(jù)庫(kù)溢谤、接口...
workPool.submit(() -> {
});
// 響應(yīng)結(jié)果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}
// 創(chuàng)建mainReactor線程, 只負(fù)責(zé)處理serverSocketChannel
for (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] = new ReactorThread() {
AtomicInteger incr = new AtomicInteger(0);
@Override
public void handler(SelectableChannel channel) throws Exception {
// 只做請(qǐng)求分發(fā)瞻凤,不做具體的數(shù)據(jù)讀取
ServerSocketChannel ch = (ServerSocketChannel) channel;
SocketChannel socketChannel = ch.accept();
socketChannel.configureBlocking(false);
// 收到連接建立的通知之后,分發(fā)給I/O線程繼續(xù)去讀取數(shù)據(jù)
int index = incr.getAndIncrement() % subReactorThreads.length;
ReactorThread workEventLoop = subReactorThreads[index];
workEventLoop.doStart();
SelectionKey selectionKey = workEventLoop.register(socketChannel);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
}
};
}
}
/**
* 初始化channel,并且綁定一個(gè)eventLoop線程
*
* @throws IOException IO異常
*/
private void initAndRegister() throws Exception {
// 1溯香、 創(chuàng)建ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2鲫构、 將serverSocketChannel注冊(cè)到selector
int index = new Random().nextInt(mainReactorThreads.length);
mainReactorThreads[index].doStart();
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
/**
* 綁定端口
*
* @throws IOException IO異常
*/
private void bind() throws IOException {
// 1、 正式綁定端口玫坛,對(duì)外服務(wù)
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("啟動(dòng)完成结笨,端口8080");
}
public static void main(String[] args) throws Exception {
NIOServerV3 nioServerV3 = new NIOServerV3();
nioServerV3.newGroup(); // 1、 創(chuàng)建main和sub兩組線程
nioServerV3.initAndRegister(); // 2湿镀、 創(chuàng)建serverSocketChannel炕吸,注冊(cè)到mainReactor線程上的selector上
nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
}
}