同步:任務(wù)一的完成需要依賴任務(wù)二巡蘸,只有等待任務(wù)二完成,任務(wù)一才算完成擂送。
異步:任務(wù)一會(huì)通知任務(wù)二完成什么任務(wù)悦荒,但是兩個(gè)任務(wù)是互不等待,都會(huì)進(jìn)行嘹吨。任務(wù)二完成之后會(huì)告訴任務(wù)一搬味。
阻塞:CPU停下來等待一個(gè)慢的操作完成才繼續(xù)后面的工作。
非阻塞:CPU遇到這個(gè)慢的操作會(huì)先去執(zhí)行其他的命令蟀拷,等慢的動(dòng)作完成之后在處理慢操作對(duì)應(yīng)的命令碰纬。
接下來我們說說同步阻塞,同步非阻塞和異步非阻塞
之前看過一位大牛的博客问芬,他舉了個(gè)例子來解釋三個(gè)概念悦析,我覺得收益匪淺。小時(shí)候媽媽讓去燒水此衅,然后自己拿著水壺去了强戴,在燒水的過程中一直等水燒開,這個(gè)就是同步阻塞炕柔。后來發(fā)現(xiàn)燒水需要很長時(shí)間酌泰,便在燒水的過程中去干別的事,時(shí)不時(shí)的來看看水是不是燒開了匕累,這個(gè)模型就是同步非阻塞陵刹。再后來水壺有了燒開水之后發(fā)聲的功能,那么燒水的時(shí)候欢嘿,我可以不用時(shí)不時(shí)的去查看衰琐,只要聽到聲音了就知道水燒開了也糊,這個(gè)模型就是異步非阻塞。接下來我們用代碼看看下三種模型的具體實(shí)現(xiàn)羡宙。
BIO:同步阻塞
數(shù)據(jù)的讀取寫入必須阻塞在一個(gè)線程內(nèi)等待其完成狸剃,在java中這樣的模型簡單容易理解,每次來一個(gè)請(qǐng)求狗热,服務(wù)器都會(huì)開啟一個(gè)線程去處理钞馁,當(dāng)在連接數(shù)小于1000時(shí),可以讓每一個(gè)連接專注于自己的 I/O匿刮,不用過多考慮系統(tǒng)的過載僧凰、限流等問題。在搭配線程池的使用熟丸,可以很好的解決服務(wù)端連接異常的問題训措。但是當(dāng)連接數(shù)達(dá)到萬級(jí)別之后,線程之間切換帶來請(qǐng)求處理慢的問題逐漸體現(xiàn)光羞。
服務(wù)端
public class BioServer {
final static int PROT = 7788;
public static void main(String[] args) throws IOException {
ServerSocket server = null;
server = new ServerSocket(PROT);
System.out.println(" server start .. ");
while(true) {
//進(jìn)行阻塞
socket = server.accept();
//新建一個(gè)線程執(zhí)行客戶端的任務(wù)
HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
executorPool.execute(new ServerHandler(socket));
}
}
}
class HandlerExecutorPool {
private ExecutorService executor;
public HandlerExecutorPool(int maxPoolSize, int queueSize){
this.executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
this.executor.execute(task);
}
}
class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while (true) {
body = in.readLine();
if (body == null) break;
System.out.println("Server :" + body);
out.println("服務(wù)器端回送響的應(yīng)數(shù)據(jù).");
}
}catch (Exception e){
}
}
}
客戶端
public class Client implements Runnable {
final static String ADDRESS = "127.0.0.1";
final static int PORT = 8088;
public static void main(String[] args) throws IOException {
new Thread(new Client()).start();
}
@Override
public void run() {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
//向服務(wù)器端發(fā)送數(shù)據(jù)
while (true) {
out.println("接收到客戶端的請(qǐng)求數(shù)據(jù)...");
out.println("接收到客戶端的請(qǐng)求數(shù)據(jù)1111...");
String response = in.readLine();
System.out.println("Client: " + response);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
NIO:同步非阻塞
jdk1.7以后引入了NIO的變成模式绩鸣。首先有三個(gè)概念需要了解。
buffer緩存區(qū):NIO是將所有數(shù)據(jù)都用到緩沖區(qū)數(shù)組中纱兑,
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 7788);//創(chuàng)建連接的地址
SocketChannel sc = null;//聲明連接通道
ByteBuffer buf = ByteBuffer.allocate(1024);//建立緩沖區(qū)
sc = SocketChannel.open();//打開通道
sc.connect(address);//進(jìn)行連接
while(true){
//定義一個(gè)字節(jié)數(shù)組呀闻,然后使用系統(tǒng)錄入功能:
byte[] bytes = new byte[1024];
System.in.read(bytes);
buf.put(bytes);//把數(shù)據(jù)放到緩沖區(qū)中
buf.flip();//對(duì)緩沖區(qū)進(jìn)行復(fù)位
sc.write(buf);//寫出數(shù)據(jù)
buf.clear();//清空緩沖區(qū)數(shù)據(jù)
}
···
Channel 通道
NIO支持網(wǎng)絡(luò)數(shù)據(jù)從Channel中讀取,Channel是區(qū)別與傳統(tǒng)的輸入輸出流的萍启,傳統(tǒng)輸入輸出流只支持單向數(shù)據(jù)流動(dòng)总珠,而Channel同時(shí)支持讀取和寫入,有多種狀態(tài)位可以被識(shí)別勘纯。
Selector 多路復(fù)用選擇器
NIO模型中一個(gè)連接就是一個(gè)Channel,所有的Channel都注冊(cè)在Selector 中,Selector多路復(fù)用器選擇器輪詢查看Channel的狀態(tài)位钓瞭,當(dāng)Channel發(fā)生讀寫操作時(shí)驳遵。便處于就緒狀態(tài),selector多路選擇復(fù)用器會(huì)將所有處于就緒狀態(tài)的Channel輪詢出來山涡,以繼續(xù)后面的io操作堤结,一個(gè)Selector可以負(fù)責(zé)上萬級(jí)別的Channel,沒有上限,這也是JDK使用epoll代替了傳統(tǒng)的selector實(shí)現(xiàn)鸭丛。
服務(wù)端代碼
public class NioServer implements Runnable {
private Selector seletor;
//2 建立緩沖區(qū)
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//3
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public NioServer(int port) {
try {
//1 打開多路復(fù)用器
this.seletor = Selector.open();
//2 打開服務(wù)器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 設(shè)置服務(wù)器通道為非阻塞模式
ssc.configureBlocking(false);
//4 綁定地址
ssc.bind(new InetSocketAddress(port));
//5 把服務(wù)器通道注冊(cè)到多路復(fù)用器上竞穷,并且監(jiān)聽阻塞事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
//1 必須要讓多路復(fù)用器開始監(jiān)聽
this.seletor.select();
//2 返回多路復(fù)用器已經(jīng)選擇的結(jié)果集
Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
//3 進(jìn)行遍歷
while (keys.hasNext()) {
//4 獲取一個(gè)選擇的元素
SelectionKey key = keys.next();
//5 直接從容器中移除就可以了
keys.remove();
//6 如果是有效的
if (key.isValid()) {
//7 如果為阻塞狀態(tài)
if (key.isAcceptable()) {
this.accept(key);
}
//8 如果為可讀狀態(tài)
if (key.isReadable()) {
this.read(key);
}
//9 寫數(shù)據(jù)
if (key.isWritable()) {
this.write(key); //ssc
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key) throws ClosedChannelException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
ssc.register(this.seletor, SelectionKey.OP_WRITE);
}
private void read(SelectionKey key) {
try {
//1 清空緩沖區(qū)舊的數(shù)據(jù)
this.readBuf.clear();
//2 獲取之前注冊(cè)的socket通道對(duì)象
SocketChannel sc = (SocketChannel) key.channel();
//3 讀取數(shù)據(jù)
int count = sc.read(this.readBuf);
//4 如果沒有數(shù)據(jù)
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有數(shù)據(jù)則進(jìn)行讀取 讀取之前需要進(jìn)行復(fù)位方法(把position 和limit進(jìn)行復(fù)位)
this.readBuf.flip();
//6 根據(jù)緩沖區(qū)的數(shù)據(jù)長度創(chuàng)建相應(yīng)大小的byte數(shù)組笨触,接收緩沖區(qū)的數(shù)據(jù)
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收緩沖區(qū)數(shù)據(jù)
this.readBuf.get(bytes);
//8 打印結(jié)果
String body = new String(bytes).trim();
System.out.println("Server : " + body);
// 9..可以寫回給客戶端數(shù)據(jù)
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//1 獲取服務(wù)通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//2 執(zhí)行阻塞方法
SocketChannel sc = ssc.accept();
//3 設(shè)置阻塞模式
sc.configureBlocking(false);
//4 注冊(cè)到多路復(fù)用器上辖试,并設(shè)置讀取標(biāo)識(shí)
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new NioServer(8088)).start();;
}
}
客戶端
class NioClient{
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 8088;
private static ClientHandle clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
clientHandle.stop();
clientHandle = new ClientHandle(ip,port);
new Thread(clientHandle,"Server").start();
}
//向服務(wù)器發(fā)送消息
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("q")) return false;
clientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args){
start();
}
}
class ClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean started;
public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//創(chuàng)建選擇器
selector = Selector.open();
//打開監(jiān)聽通道
socketChannel = SocketChannel.open();
//如果為 true携茂,則此通道將被置于阻塞模式轮锥;如果為 false偏塞,則此通道將被置于非阻塞模式
socketChannel.configureBlocking(false);//開啟非阻塞模式
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循環(huán)遍歷selector
while(started){
try{
//無論是否有讀寫事件發(fā)生,selector每隔1s被喚醒一次
selector.select(1000);
//阻塞,只有當(dāng)至少一個(gè)注冊(cè)的事件發(fā)生的時(shí)候才會(huì)繼續(xù).
// selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector關(guān)閉后會(huì)自動(dòng)釋放里面管理的資源
if(selector != null)
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect());
else System.exit(1);
}
//讀消息
if(key.isReadable()){
//創(chuàng)建ByteBuffer航罗,并開辟一個(gè)1M的緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//讀取請(qǐng)求碼流欺栗,返回讀取到的字節(jié)數(shù)
int readBytes = sc.read(buffer);
//讀取到字節(jié),對(duì)字節(jié)進(jìn)行編解碼
if(readBytes>0){
//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0允蚣,用于后續(xù)對(duì)緩沖區(qū)的讀取操作
buffer.flip();
//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
byte[] bytes = new byte[buffer.remaining()];
//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客戶端收到消息:" + result);
}
//沒有讀取到字節(jié) 忽略
// else if(readBytes==0);
//鏈路已經(jīng)關(guān)閉于颖,釋放資源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//異步發(fā)送消息
private void doWrite(SocketChannel channel,String request) throws IOException{
//將消息編碼為字節(jié)數(shù)組
byte[] bytes = request.getBytes();
//根據(jù)數(shù)組容量創(chuàng)建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//將字節(jié)數(shù)組復(fù)制到緩沖區(qū)
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//發(fā)送緩沖區(qū)的字節(jié)數(shù)組
channel.write(writeBuffer);
//****此處不含處理“寫半包”的代碼
}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port)));
else socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
public void sendMsg(String msg) throws Exception{
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
}
AIO:異步非阻塞
AIO基于事件和回調(diào)機(jī)制,不需要過多的Selector對(duì)注冊(cè)的通道進(jìn)行輪詢即可實(shí)現(xiàn)異步讀寫,從而簡化了NIO的編程模型嚷兔。
服務(wù)端
public class AioServer {
public static void main(String[] args) {
// AIO線程復(fù)用版
Thread sThread = new Thread(new Runnable() {
@Override
public void run() {
AsynchronousChannelGroup group = null;
try {
group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(InetAddress.getLocalHost(), 8088));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
@Override
public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
server.accept(null, this); // 接收下一個(gè)請(qǐng)求
try {
Future<Integer> f = result.write(Charset.defaultCharset().encode("你好森渐,世界"));
f.get();
System.out.println("服務(wù)端發(fā)送時(shí)間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
result.close();
} catch (InterruptedException | ExecutionException | IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
}
});
group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
});
sThread.start();
}
}
客戶端
class AioClient {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
// Socket 客戶端
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
Future<Void> future = client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8088));
future.get();
ByteBuffer buffer = ByteBuffer.allocate(100);
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("客戶端打印:" + new String(buffer.array()));
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
Thread.sleep(10 * 1000);
}
}