一酪耕、IO模型
IO模型就是說用什么樣的通道進行數(shù)據(jù)的發(fā)送和接收呜象,Java共支持3種網(wǎng)絡編程IO模式:BIO孽惰,NIO晚岭,AIO
BIO(Blocking IO)
同步阻塞模型,一個客戶端連接對應一個處理線程
BIO代碼示例:
package com.tuling.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class SocketServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
System.out.println("等待連接灰瞻。腥例。");
//阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客戶端連接了。酝润。");
new Thread(new Runnable() {
@Override
public void run() {
try {
handler(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
private static void handler(Socket clientSocket) throws IOException {
byte[] bytes = new byte[1024];
System.out.println("準備read燎竖。。");
//接收客戶端的數(shù)據(jù)要销,阻塞方法构回,沒有數(shù)據(jù)可讀時就阻塞
int read = clientSocket.getInputStream().read(bytes);
System.out.println("read完畢。疏咐。");
if (read != -1) {
System.out.println("接收到客戶端的數(shù)據(jù):" + new String(bytes, 0, read));
}
clientSocket.getOutputStream().write("HelloClient".getBytes());
clientSocket.getOutputStream().flush();
}
}
//客戶端代碼
public class SocketClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("localhost", 9000);
//向服務端發(fā)送數(shù)據(jù)
socket.getOutputStream().write("HelloServer".getBytes());
socket.getOutputStream().flush();
System.out.println("向服務端發(fā)送數(shù)據(jù)結束");
byte[] bytes = new byte[1024];
//接收服務端回傳的數(shù)據(jù)
socket.getInputStream().read(bytes);
System.out.println("接收到服務端的數(shù)據(jù):" + new String(bytes));
socket.close();
}
}
缺點:
- IO代碼里read操作是阻塞操作纤掸,如果連接不做數(shù)據(jù)讀寫操作會導致線程阻塞,浪費資源浑塞。
- 如果線程很多借跪,會導致服務器線程太多,壓力太大酌壕,比如C10K問題掏愁。
應用場景:
BIO 方式適用于連接數(shù)目比較小且固定的架構, 這種方式對服務器資源要求比較高卵牍, 但程序簡單易理解果港。
NIO(Non Blocking IO)
同步非阻塞,服務器實現(xiàn)模式為一個線程可以處理多個請求(連接)糊昙,客戶端發(fā)送的連接請求都會注冊到多路復用器selector上辛掠,多路復用器輪詢到連接有IO請求就進行處理,JDK1.4開始引入释牺。
應用場景:
NIO方式適用于連接數(shù)目多且連接比較短(輕操作) 的架構萝衩, 比如聊天服務器回挽, 彈幕系統(tǒng), 服務器間通訊欠气,編程比較復雜
NIO非阻塞代碼示例:
package com.tuling.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class NioServer {
// 保存客戶端連接
static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) throws IOException, InterruptedException {
// 創(chuàng)建NIO ServerSocketChannel,與BIO的serverSocket類似
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 設置ServerSocketChannel為非阻塞
serverSocket.configureBlocking(false);
System.out.println("服務啟動成功");
while (true) {
// 非阻塞模式accept方法不會阻塞厅各,否則會阻塞
// NIO的非阻塞是由操作系統(tǒng)內(nèi)部實現(xiàn)的,底層調(diào)用了linux內(nèi)核的accept函數(shù)
SocketChannel socketChannel = serverSocket.accept();
if (socketChannel != null) { // 如果有客戶端進行連接
System.out.println("連接成功");
// 設置SocketChannel為非阻塞
socketChannel.configureBlocking(false);
// 保存客戶端連接在List中
channelList.add(socketChannel);
}
// 遍歷連接進行數(shù)據(jù)讀取
Iterator<SocketChannel> iterator = channelList.iterator();
while (iterator.hasNext()) {
SocketChannel sc = iterator.next();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 非阻塞模式read方法不會阻塞预柒,否則會阻塞
int len = sc.read(byteBuffer);
// 如果有數(shù)據(jù)队塘,把數(shù)據(jù)打印出來
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) { // 如果客戶端斷開,把socket從集合中去掉
iterator.remove();
System.out.println("客戶端斷開連接");
}
}
}
}
}
總結:如果連接數(shù)太多的話宜鸯,會有大量的無效遍歷憔古,假如有10000個連接,其中只有1000個連接有寫數(shù)據(jù)淋袖,但是由于其他9000個連接并沒有斷開鸿市,我們還是要每次輪詢遍歷一萬次,其中有十分之九的遍歷都是無效的即碗,這顯然不是一個讓人很滿意的狀態(tài)焰情。
NIO引入多路復用器代碼示例:
package com.tuling.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioSelectorServer {
public static void main(String[] args) throws IOException, InterruptedException {
// 創(chuàng)建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 設置ServerSocketChannel為非阻塞
serverSocket.configureBlocking(false);
// 打開Selector處理Channel,即創(chuàng)建epoll
Selector selector = Selector.open();
// 把ServerSocketChannel注冊到selector上剥懒,并且selector對客戶端accept連接操作感興趣
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務啟動成功");
while (true) {
// 阻塞等待需要處理的事件發(fā)生
selector.select();
// 獲取selector中注冊的全部事件的 SelectionKey 實例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍歷SelectionKey對事件進行處理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件内舟,則進行連接獲取和事件注冊
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 這里只注冊了讀事件,如果需要給客戶端發(fā)送數(shù)據(jù)可以注冊寫事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客戶端連接成功");
} else if (key.isReadable()) { // 如果是OP_READ事件初橘,則進行讀取和打印
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
int len = socketChannel.read(byteBuffer);
// 如果有數(shù)據(jù)验游,把數(shù)據(jù)打印出來
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) { // 如果客戶端斷開連接,關閉Socket
System.out.println("客戶端斷開連接");
socketChannel.close();
}
}
//從事件集合里刪除本次處理的key保檐,防止下次select重復處理
iterator.remove();
}
}
}
}
NIO 有三大核心組件: Channel(通道)耕蝉, Buffer(緩沖區(qū)),Selector(多路復用器)
- channel 類似于流夜只,每個 channel 對應一個 buffer緩沖區(qū)垒在,buffer 底層就是個數(shù)組
- channel 會注冊到 selector 上,由 selector 根據(jù) channel 讀寫事件的發(fā)生將其交由某個空閑的線程處理
- NIO 的 Buffer 和 channel 都是既可以讀也可以寫
注意:多路復用器selector包含了所有channel的accpet扔亥、read監(jiān)聽事件爪膊,若有事件發(fā)生便遍歷所有的channel確定有事件的channel,而后進行的accpet砸王、read操作就會暢通無阻,這也是NIO非阻塞的前提原因峦阁。
注意:多路復用器selector監(jiān)聽ServerSocketChannel的accpet事件發(fā)生后才會創(chuàng)建SocketChannel谦铃,同時創(chuàng)建的SocketChannel還要注冊read事件。
注意:SocketChannel對象客戶端和服務端兩側都有榔昔,同時對應的buffer也是驹闰。
NIO底層在JDK1.4版本是用linux的內(nèi)核函數(shù)select()或poll()來實現(xiàn)瘪菌,跟上面的NioServer代碼類似,selector每次都會輪詢所有的sockchannel看下哪個channel有讀寫事件嘹朗,有的話就處理师妙,沒有就繼續(xù)遍歷,JDK1.5開始引入了epoll基于事件響應機制來優(yōu)化NIO屹培。
NioSelectorServer 代碼里如下幾個方法非常重要默穴,我們從Hotspot與Linux內(nèi)核函數(shù)級別來理解下
Selector.open() //創(chuàng)建多路復用器
socketChannel.register(selector, SelectionKey.OP_READ) //將channel注冊到多路復用器上
selector.select() //阻塞等待需要處理的事件發(fā)生
問題:NIO中selector.select()使用epoll形式監(jiān)聽事件的時候,事件依次過來后直接觸發(fā)線程依次進行處理褪秀,為什么還需要循環(huán)遍歷事件處理呢蓄诽?
- 因為selector.select()使用epoll形式監(jiān)聽事件的時候,有事件觸發(fā)后不會馬上進行處理媒吗,而是有時間延后的仑氛,期間有多個事件觸發(fā)的話,會統(tǒng)一放在緩存列表中時間一到統(tǒng)一處理闸英。
select | poll | epoll(jdk 1.5及以上) | |
---|---|---|---|
操作方式 | 遍歷 | 遍歷 | 回調(diào) |
底層實現(xiàn) | 數(shù)組 | 鏈表 | 哈希表 |
IO效率 | 每次調(diào)用都進行線性遍歷锯岖,時間復雜度為O(n) | 每次調(diào)用都進行線性遍歷,時間復雜度為O(n) | 事件通知方式甫何,每當有IO事件就緒出吹,系統(tǒng)注冊的回調(diào)函數(shù)就會被調(diào)用,時間復雜度O(1) |
最大連接 | 有上限 | 無上限 | 無上限 |
Redis線程模型
Redis就是典型的基于epoll的NIO線程模型(nginx也是)沛豌,epoll實例收集所有事件(連接與讀寫事件)趋箩,由一個服務端線程連續(xù)處理所有事件命令。
AIO(NIO 2.0)
異步非阻塞加派, 由操作系統(tǒng)完成后回調(diào)通知服務端程序啟動線程去處理叫确, 一般適用于連接數(shù)較多且連接時間較長的應用
應用場景:
AIO方式適用于連接數(shù)目多且連接比較長(重操作)的架構,JDK7 開始支持
AIO代碼示例:
package com.tuling.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AIOServer {
public static void main(String[] args) throws Exception {
final AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
try {
System.out.println("2--"+Thread.currentThread().getName());
// 再此接收客戶端連接芍锦,如果不寫這行代碼后面的客戶端連接連不上服務端
serverChannel.accept(attachment, this);
System.out.println(socketChannel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
System.out.println("3--"+Thread.currentThread().getName());
buffer.flip();
System.out.println(new String(buffer.array(), 0, result));
socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
System.out.println("1--"+Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
}
}
package com.tuling.aio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
public class AIOClient {
public static void main(String... args) throws Exception {
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000)).get();
socketChannel.write(ByteBuffer.wrap("HelloServer".getBytes()));
ByteBuffer buffer = ByteBuffer.allocate(512);
Integer len = socketChannel.read(buffer).get();
if (len != -1) {
System.out.println("客戶端收到信息:" + new String(buffer.array(), 0, len));
}
}
}
BIO | NIO | AIO | |
---|---|---|---|
IO模型 | 同步阻塞 | 同步非阻塞(多路復用) | 異步非阻塞 |
編程難度 | 簡單 | 復雜 | 復雜 |
可靠性 | 差 | 好 | 好 |
吞吐量 | 低 | 高 | 高 |
為什么Netty使用NIO而不是AIO竹勉?
在Linux系統(tǒng)上,AIO的底層實現(xiàn)仍使用Epoll娄琉,沒有很好實現(xiàn)AIO次乓,因此在性能上沒有明顯的優(yōu)勢,而且被JDK封裝了一層不容易深度優(yōu)化孽水,Linux上AIO還不夠成熟票腰。Netty是異步非阻塞框架,Netty在NIO上做了很多異步的封裝女气。