Java中的IO潘鲫、NIO翁逞、AIO:
BIO:在Java1.4之前,我們建立網(wǎng)絡(luò)連接均使用BIO溉仑,屬于同步阻塞IO挖函。默認(rèn)情況下,當(dāng)有一條請求接入就有一條線程專門接待浊竟。所以怨喘,在客戶端向服務(wù)端請求時津畸,會詢問是否有空閑線程進(jìn)行接待,如若沒有則一直等待或拒接必怜。當(dāng)并發(fā)量小時還可以接受肉拓,當(dāng)請求量一多起來則會有許多線程生成,在Java中梳庆,多線程的上下文切換會消耗計算機(jī)有限的資源和性能暖途,造成資源浪費(fèi)。
NIO:NIO的出現(xiàn)是為了解決再BIO下的大并發(fā)量問題膏执。其特點(diǎn)是能用一條線程管理所有連接驻售。如下圖所示:
NIO是同步非阻塞模型,通過一條線程控制選擇器(Selector)來管理多個Channel更米,減少創(chuàng)建線程和上下文切換的浪費(fèi)欺栗。當(dāng)線程通過選擇器向某條Channel請求數(shù)據(jù)但其沒有數(shù)據(jù)時,是不會阻塞的壳快,直接返回纸巷,繼續(xù)干其他事。而一旦某Channel就緒眶痰,線程就能去調(diào)用請求數(shù)據(jù)等操作瘤旨。當(dāng)該線程對某條Channel進(jìn)行寫操作時同樣不會被阻塞,所以該線程能夠?qū)Χ鄠€Channel進(jìn)行管理竖伯。
NIO是面向緩沖流的存哲,即數(shù)據(jù)寫入寫出都是通過 Channel —— Buffer 這一途徑。(雙向流通)
AIO:與之前兩個IO模型不同的是七婴,AIO屬于異步非阻塞模型祟偷。當(dāng)進(jìn)行讀寫操作時只須調(diào)用api的read方法和write方法,這兩種方法均是異步打厘。對于讀方法來說修肠,當(dāng)有流可讀取時,操作系統(tǒng)會將可讀的流傳入read方法的緩沖區(qū)户盯,并通知應(yīng)用程序嵌施;對于寫操作而言,當(dāng)操作系統(tǒng)將write方法傳遞的流寫入完畢時莽鸭,操作系統(tǒng)主動通知應(yīng)用程序吗伤。換言之就是當(dāng)調(diào)用完api后,操作系統(tǒng)完成后會調(diào)用回調(diào)函數(shù)硫眨。
總結(jié):一般IO分為同步阻塞模型(BIO)足淆,同步非阻塞模型(NIO),異步阻塞模型,異步非阻塞模型(AIO)
同步阻塞模型指的是當(dāng)調(diào)用io操作時必須等到其io操作結(jié)束
同步非阻塞模型指當(dāng)調(diào)用io操作時不必等待可以繼續(xù)干其他事巧号,但必須不斷詢問io操作是否完成族奢。
異步阻塞模型指應(yīng)用調(diào)用io操作后,由操作系統(tǒng)完成io操作裂逐,但應(yīng)用必須等待或去詢問操作系統(tǒng)是否完成歹鱼。
異步非阻塞指應(yīng)用調(diào)用io操作后,由操作系統(tǒng)完成io操作并調(diào)用回調(diào)函數(shù)卜高,應(yīng)用完成放手不管弥姻。
NIO的小Demo之服務(wù)端
首先,先看下服務(wù)端的大體代碼
public class ServerHandle implements Runnable{
//帶參數(shù)構(gòu)造函數(shù)
public ServerHandle(int port){
}
//停止方法
public void shop(){
}
//寫方法
private void write(SocketChannel socketChannel, String response)throws IOException{
}
//當(dāng)有連接進(jìn)來時的處理方法
private void handleInput(SelectionKey key) throws IOException{
}
//服務(wù)端運(yùn)行主體方法
@Override
public void run() {
}
}
首先我們先看看該服務(wù)端的構(gòu)造函數(shù)的實(shí)現(xiàn):
public ServerHandle(int port){
try {
//創(chuàng)建選擇器
selector = Selector.open();
//打開監(jiān)聽通道
serverSocketChannel = ServerSocketChannel.open();
//設(shè)置為非阻塞模式
serverSocketChannel.configureBlocking(false);
//傳入端口掺涛,并設(shè)定連接隊(duì)列最大為1024
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//監(jiān)聽客戶端請求
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//標(biāo)記啟動標(biāo)志
started = true;
System.out.println("服務(wù)器已啟動庭敦,端口號為:" + port);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
在這里創(chuàng)建了選擇器和監(jiān)聽通道,并將該監(jiān)聽通道注冊到選擇器上并選擇其感興趣的事件(accept)薪缆。后續(xù)其他接入的連接都將通過該 監(jiān)聽通道 傳入秧廉。
然后就是寫方法的實(shí)現(xiàn):
private void doWrite(SocketChannel channel, String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
wirteBuffer.put(bytes);
//將寫模式改為讀模式
wirteBuffer.flip();
//寫入管道
channel.write(wirteBuffer);
}
其次是當(dāng)由事件傳入時,即對連接進(jìn)來的鏈接的處理方法
private void handleInput(SelectionKey key) throws IOException{
//當(dāng)該鍵可用時
if (key.isValid()){
if (key.isAcceptable()){
//返回該密鑰創(chuàng)建的通道拣帽。
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
通過該通道獲取鏈接進(jìn)來的通道
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()){
//返回該密鑰創(chuàng)建的通道疼电。
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0){
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String expression = new String(bytes, "UTF-8");
System.out.println("服務(wù)器收到的信息:" + expression);
//此處是為了區(qū)別打印在工作臺上的數(shù)據(jù)是由客戶端產(chǎn)生還是服務(wù)端產(chǎn)生
doWrite(socketChannel, "+++++" + expression + "+++++");
} else if(readBytes == 0){
//無數(shù)據(jù),忽略
}else if (readBytes < 0){
//資源關(guān)閉
key.cancel();
socketChannel.close();
}
}
}
}
這里要說明的是减拭,只要ServerSocketChannel及SocketChannel向Selector注冊了特定的事件蔽豺,Selector就會監(jiān)控這些事件是否發(fā)生。
如在構(gòu)造方法中有一通道serverSocketChannel注冊了accept事件拧粪。當(dāng)其就緒時就可以通過調(diào)用selector的selectorKeys()方法修陡,訪問”已選擇鍵集“中的就緒通道。
壓軸方法:
@Override
public void run() {
//循環(huán)遍歷
while (started) {
try {
//當(dāng)沒有就緒事件時阻塞
selector.select();
//返回就緒通道的鍵
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
SelectionKey key;
while (iterator.hasNext()){
key = iterator.next();
//獲取后必須移除可霎,否則會陷入死循環(huán)
iterator.remove();
try {
//對就緒通道的處理方法魄鸦,上述有描述
handleInput(key);
} catch (Exception e){
if (key != null){
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}catch (Throwable throwable){
throwable.printStackTrace();
}
}
}
此方法為服務(wù)端的主體方法。大致流程如下:
- 打開ServerSocketChannel癣朗,監(jiān)聽客戶端連接
- 綁定監(jiān)聽端口拾因,設(shè)置連接為非阻塞模式(阻塞模式下不能注冊到選擇器)
- 創(chuàng)建Reactor線程,創(chuàng)建選擇器并啟動線程
- 將ServerSocketChannel注冊到Reactor線程中的Selector上旷余,監(jiān)聽ACCEPT事件
- Selector輪詢準(zhǔn)備就緒的key
- Selector監(jiān)聽到新的客戶端接入绢记,處理新的接入請求,完成TCP三次握手荣暮,簡歷物理鏈路
- 設(shè)置客戶端鏈路為非阻塞模式
- 將新接入的客戶端連接注冊到Reactor線程的Selector上,監(jiān)聽讀操作罩驻,讀取客戶端發(fā)送的網(wǎng)絡(luò)消息
異步讀取客戶端消息到緩沖區(qū) - 調(diào)用write將消息異步發(fā)送給客戶端
NIO的小Demo之客戶端
public class ClientHandle implements Runnable{
//構(gòu)造函數(shù)穗酥,構(gòu)造時順便綁定
public ClientHandle(String ip, int port){
}
//處理就緒通道
private void handleInput(SelectionKey key) throws IOException{
}
//寫方法(與服務(wù)端的寫方法一致)
private void doWrite(SocketChannel channel,String request) throws IOException{
}
//連接到服務(wù)端
private void doConnect() throws IOException{
}
//發(fā)送信息
public void sendMsg(String msg) throws Exception{
}
}
首先先看構(gòu)造函數(shù)的實(shí)現(xiàn):
public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//創(chuàng)建選擇器
selector = Selector.open();
//打開監(jiān)聽通道
socketChannel = SocketChannel.open();
//如果為 true,則此通道將被置于阻塞模式;如果為 false砾跃,則此通道將被置于非阻塞模式
socketChannel.configureBlocking(false);
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
接下來看對就緒通道的處理辦法:
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
//這里的作用將在后面的代碼(doConnect方法)說明
if(sc.finishConnect()){
System.out.println("已連接事件");
}
else{
System.exit(1);
}
}
//讀消息
if(key.isReadable()){
//創(chuàng)建ByteBuffer骏啰,并開辟一個1k的緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//讀取請求碼流,返回讀取到的字節(jié)數(shù)
int readBytes = sc.read(buffer);
//讀取到字節(jié)抽高,對字節(jié)進(jìn)行編解碼
if(readBytes>0){
buffer.flip();
//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
byte[] bytes = new byte[buffer.remaining()];
//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客戶端收到消息:" + result);
}lse if(readBytes==0){
//忽略
}else if(readBytes<0){
//鏈路已經(jīng)關(guān)閉判耕,釋放資源
key.cancel();
sc.close();
}
}
}
}
在run方法之前需先看下此方法的實(shí)現(xiàn):
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port))){
System.out.println("connect");
}
else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
System.out.println("register");
}
}
當(dāng)SocketChannel工作于非阻塞模式下時,調(diào)用connect()時會立即返回:
如果連接建立成功則返回的是true(比如連接localhost時翘骂,能立即建立起連接)壁熄,否則返回false。
在非阻塞模式下碳竟,返回false后草丧,必須要在隨后的某個地方調(diào)用finishConnect()方法完成連接。
當(dāng)SocketChannel處于阻塞模式下時莹桅,調(diào)用connect()時會進(jìn)入阻塞昌执,直至連接建立成功或者發(fā)生IO錯誤時,才從阻塞狀態(tài)中退出诈泼。
所以該代碼在connect服務(wù)端后返回false(但還是有作用的)懂拾,并在else語句將該通道注冊在選擇器上并選擇connect事件。
客戶端的run方法:
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循環(huán)遍歷selector
while(started){
try{
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key ;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector關(guān)閉后會自動釋放里面管理的資源
if(selector != null){
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
}
發(fā)送信息到服務(wù)端的方法:
public void sendMsg(String msg) throws Exception{
//覆蓋其之前感興趣的事件(connect)铐达,將其更改為OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
完整代碼:
服務(wù)端:
/**
* Created by innoyiya on 2018/8/20.
*/
public class Service {
private static int DEFAULT_POST = 12345;
private static ServerHandle serverHandle;
public static void start(){
start(DEFAULT_POST);
}
public static synchronized void start(int post) {
if (serverHandle != null){
serverHandle.shop();
}
serverHandle = new ServerHandle(post);
new Thread(serverHandle,"server").start();
}
}
服務(wù)端主體:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Created by innoyiya on 2018/8/20.
*/
public class ServerHandle implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean started;
public ServerHandle(int port){
try {
//創(chuàng)建選擇器
selector = Selector.open();
//打開監(jiān)聽通道
serverSocketChannel = ServerSocketChannel.open();
//設(shè)置為非阻塞模式
serverSocketChannel.configureBlocking(false);
//判定端口岖赋,并設(shè)定連接隊(duì)列最大為1024
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//監(jiān)聽客戶端請求
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//標(biāo)記啟動標(biāo)志
started = true;
System.out.println("服務(wù)器已啟動,端口號為:" + port);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void shop(){
started = false;
}
private void doWrite(SocketChannel channel, String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
wirteBuffer.put(bytes);
wirteBuffer.flip();
channel.write(wirteBuffer);
}
private void handleInput(SelectionKey key) throws IOException{
if (key.isValid()){
if (key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0){
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String expression = new String(bytes, "UTF-8");
System.out.println("服務(wù)器收到的信息:" + expression);
doWrite(socketChannel, "+++++" + expression + "+++++");
} else if (readBytes < 0){
key.cancel();
socketChannel.close();
}
}
}
}
@Override
public void run() {
//循環(huán)遍歷
while (started) {
try {
selector.select();
//System.out.println(selector.select());
Set<SelectionKey> keys = selector.selectedKeys();
//System.out.println(keys.size());
Iterator<SelectionKey> iterator = keys.iterator();
SelectionKey key;
while (iterator.hasNext()){
key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e){
if (key != null){
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}catch (Throwable throwable){
throwable.printStackTrace();
}
}
}
}
客戶端:
/**
* Created by innoyiya on 2018/8/20.
*/
public class Client {
private static String DEFAULT_HOST = "localhost";
private static int DEFAULT_PORT = 12345;
private static ClientHandle clientHandle;
private static final String EXIT = "exit";
public static void start() {
start(DEFAULT_HOST, DEFAULT_PORT);
}
public static synchronized void start(String ip, int port) {
if (clientHandle != null){
clientHandle.stop();
}
clientHandle = new ClientHandle(ip, port);
new Thread(clientHandle, "Server").start();
}
//向服務(wù)器發(fā)送消息
public static boolean sendMsg(String msg) throws Exception {
if (msg.equals(EXIT)){
return false;
}
clientHandle.sendMsg(msg);
return true;
}
}
客戶端主體代碼:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Created by innoyiya on 2018/8/20.
*/
public class ClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean started;
public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//創(chuàng)建選擇器
selector = Selector.open();
//打開監(jiān)聽通道
socketChannel = SocketChannel.open();
//如果為 true娶桦,則此通道將被置于阻塞模式贾节;如果為 false,則此通道將被置于非阻塞模式
socketChannel.configureBlocking(false);
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
System.out.println("已連接事件");
}
else{
System.exit(1);
}
}
//讀消息
if(key.isReadable()){
//創(chuàng)建ByteBuffer衷畦,并開辟一個1M的緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//讀取請求碼流栗涂,返回讀取到的字節(jié)數(shù)
int readBytes = sc.read(buffer);
//讀取到字節(jié),對字節(jié)進(jìn)行編解碼
if(readBytes>0){
//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0祈争,用于后續(xù)對緩沖區(qū)的讀取操作
buffer.flip();
//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
byte[] bytes = new byte[buffer.remaining()];
//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客戶端收到消息:" + result);
} else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//異步發(fā)送消息
private void doWrite(SocketChannel channel,String request) throws IOException{
byte[] bytes = request.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//發(fā)送緩沖區(qū)的字節(jié)數(shù)組
channel.write(writeBuffer);
}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port))){
System.out.println("connect");
}
else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
System.out.println("register");
}
}
public void sendMsg(String msg) throws Exception{
//覆蓋其之前感興趣的事件斤程,將其更改為OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循環(huán)遍歷selector
while(started){
try{
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key ;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector關(guān)閉后會自動釋放里面管理的資源
if(selector != null){
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
}
}
測試類:
import java.util.Scanner;
/**
* Created by innoyiya on 2018/8/20.
*/
public class Test {
public static void main(String[] args) throws Exception {
Service.start();
Thread.sleep(1000);
Client.start();
while(Client.sendMsg(new Scanner(System.in).nextLine()));
}
}
控制臺打印:
服務(wù)器已啟動菩混,端口號為:12345
register
已連接事件
1234
服務(wù)器收到的信息:1234
客戶端收到消息:+++++1234+++++
5678
服務(wù)器收到的信息:5678
客戶端收到消息:+++++5678+++++
如有不妥之處忿墅,請告訴我。