個人博客:haichenyi.com章蚣。感謝關(guān)注
??傳統(tǒng)的同步阻塞I/O通訊模型推励,導致的結(jié)果就是只要有一方處理數(shù)據(jù)緩慢父晶,都會影響另外一方的處理性能曲梗。按照故障設(shè)計原則赞警,一方的處理出現(xiàn)問題,不應該影響到另外一方才對虏两。但是愧旦,在同步阻塞的模式下面,這樣的情況是無法避免的定罢,很難通過業(yè)務層去解決笤虫。既然同步無法避免,為了避免就產(chǎn)生了異步祖凫。Netty框架就一個完全異步非阻塞的I/O通訊方式
同步阻塞式I/O編程
??簡單的來說琼蚯,傳統(tǒng)同步阻塞的I/O通訊模式,服務器端處理的方式是惠况,每當有一個新用戶接入的時候遭庶,就new一個新的線程,一個線程只能處理一個客戶端的連接稠屠,在高性能方面峦睡,并發(fā)高的情景下無法滿足。偽代碼如下:
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author 海晨憶
* @date 2018/2/9
* @desc
*/
public class SocketServer {
private int port = 8080;
private Socket socket = null;
public SocketServer(int port) {
this.port = port;
}
public void connect() {
ServerSocket server = null;
try {
server = new ServerSocket(port);
while (true) {
socket = server.accept();
new Thread(new Runnable() {
@Override
public void run() {
new TimerServerHandler(socket).run();
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//釋放資源
if (server != null) {
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
server = null;
}
}
}
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* @author 海晨憶
* @date 2018/2/9
* @desc
*/
public class TimerServerHandler implements Runnable {
private Socket socket;
public TimerServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String currentTime = null;
String body = null;
while (true) {
body = in.readLine();
if (body == null)
break;
}
} catch (IOException e) {
e.printStackTrace();
//釋放in权埠,out榨了,socket資源
}
}
}
??上面這個就是最原始的服務端IO的代碼,這里我就給出的是最簡化的弊知,當有新的客戶端接入的時候阻逮,服務端是怎么處理線程的,可以看出秩彤,每當有新的客戶端接入的時候叔扼,總是回新創(chuàng)建一個線程去服務這個新的客戶端
偽異步式編程
??后來慢慢演化出一個版本“偽異步”模型,新增加一個線程池或者消息隊列漫雷,滿足一個線程或者多個線程滿足N個客戶端瓜富,通過線程池可以靈活的調(diào)用線程資源。通過設(shè)置線程池的最大值降盹,防止海量并發(fā)接入造成的線程耗盡与柑,它的底層實現(xiàn)依然是同步阻塞模型,偽代碼如下:
import com.example.zwang.mysocket.server.TimerServerHandler;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author 海晨憶
* @date 2018/2/9
* @desc
*/
public class SocketServer {
private int port = 8080;
private Socket socket = null;
public SocketServer(int port) {
this.port = port;
}
private void connect() {
ServerSocket server = null;
try {
server = new ServerSocket(port);
TimeServerHandlerExecutePool executePool = new TimeServerHandlerExecutePool(50, 1000);
while (true) {
socket = server.accept();
executePool.execute(new TimerServerHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
}finally {
//釋放資源
}
}
}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author 海晨憶
* @date 2018/2/9
* @desc
*/
public class TimeServerHandlerExecutePool {
private ExecutorService executor;
public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize,
120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task) {
executor.execute(task);
}
}
??“偽異步”的代碼和傳統(tǒng)同步的唯一區(qū)別就是在于,首先先創(chuàng)建了一個時間服務處理類的線程池价捧,當有新的客戶端接入的時候丑念,先將socket請求封裝成task,然后調(diào)用線程池的execute方法執(zhí)行结蟋,從而避免了每一個新請求創(chuàng)建一個新線程脯倚。由于線程池和消息隊列都是有限的,因此嵌屎,無論客戶端的并發(fā)量多大推正,它都不會導致線程個數(shù)過于大,而造成的內(nèi)存溢出宝惰。相對于傳統(tǒng)的同步阻塞植榕,是一種改良。
??但是他沒有從更本上解決同步的問題尼夺,偽異步的問題在于尊残,他還是有一方處理出現(xiàn)問題還是會影響到另一方。因為:
- 當對socket的輸入流進行讀取操作的時候淤堵,它會一直阻塞直到一下三種方式發(fā)生:
?1. 有數(shù)據(jù)可讀
?2. 可讀數(shù)據(jù)已經(jīng)讀取完
?3. 發(fā)生空指針或者I/O異常夜郁。
這意味者,當讀取inputstream方處理速度緩慢(不管是什么原因造成的速度緩慢)粘勒,另一方會一直同步阻塞,直到這一方把數(shù)據(jù)處理完. - 當調(diào)用outputstream的write方法寫輸出流的時候屎即,它將會被阻塞庙睡,直到所有要發(fā)送的字節(jié)全部寫入完畢,或者發(fā)生異常技俐。學過TCP/IP相關(guān)知識的人都直到乘陪,當消息的接收方處理消息緩慢讯柔,不能及時的從TCP緩沖區(qū)讀取數(shù)據(jù)邦邦,這將會導致發(fā)送方的TCP緩沖區(qū)的size一直減少杠愧,直到0.緩沖區(qū)為0虐急,那么發(fā)消息的一方將無法將消息寫入緩沖區(qū)懈糯,直到緩沖區(qū)的size大于0
??通過以上壕曼。我們了解到讀和寫的操作都是同步阻塞的再芋,阻塞的時間取決于對方的I/O線程的處理速度和網(wǎng)絡(luò)I/O的傳送速度戈盈。從本質(zhì)上面看仇穗,我們無法保證對方的處理速度和網(wǎng)絡(luò)傳送速度流部。如果,我們的程序依靠與對方的處理速度纹坐,那么枝冀,他的可靠性將會非常差。
NIO編程
??官方叫法new I/O,也就是新的IO編程果漾,更多的人喜歡稱它為:Non-block IO即非阻塞IO球切。
??與Socket和serverSocket類對應,NIO提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道實現(xiàn)绒障,這兩種都支持阻塞式編程和非阻塞式編程吨凑。開發(fā)人員可以根據(jù)自己的需求選擇合適的編程模式。一般低負載端盆,低并發(fā)的應用程序選擇同步阻塞的方式以降低編程的復雜度怀骤。高負載,高并發(fā)的不用想了焕妙,非阻塞就是為了解決這個問題的
- 緩沖區(qū)Buffer
??Buffer是一個對象蒋伦,它包含一些寫入或者讀出的數(shù)據(jù)。再NIO中加入buffer對象焚鹊,體現(xiàn)了新庫和舊庫的一個重要區(qū)別痕届。在面向流的io中,可以直接把數(shù)據(jù)讀取或者寫入到stream對象中末患。在NIO庫中研叫,所有數(shù)據(jù)操作都是通過緩沖區(qū)處理的。
緩沖區(qū)實質(zhì)上是一個數(shù)組璧针,通常是一個字節(jié)數(shù)組(ByteBuffer)嚷炉,基本數(shù)據(jù)類型除了boolean沒有,其他都有探橱,如ShortBuffer,CharBuffer等等 - 通道Channel
??Channel是一個通道申屹,雙向通道,網(wǎng)絡(luò)數(shù)據(jù)都是通過Channel讀取隧膏,寫入的哗讥。是的,沒錯胞枕,Channel它既可以進行讀操作杆煞,也可以進行寫操作。而流只能是一個方向腐泻。只能讀操作或者只能寫操作决乎,而channel是全雙工,讀寫可以同時進行派桩。channel可以分為兩大類:網(wǎng)絡(luò)讀寫的SelectableChannel和文件操作的FileChannel瑞驱。我們前面提到的SocketChannel和ServerSocketChannel都是SelectableChannel的子類。 - 多路復用器Selector
??selector多路復用器窄坦,他是java NIO編程的基礎(chǔ)唤反,熟練的掌握selector對于NIO編程至關(guān)重要凳寺。多路復用器提供選擇已經(jīng)就緒的任務的能力。簡單的講就是他會不斷的輪詢注冊的channel彤侍,如果一個Channel發(fā)生了讀寫操作肠缨,這個Chnnel就會處于就緒狀態(tài),會被selector輪詢出來盏阶,通過SelectorKey獲取就緒Channel集合晒奕,進行后續(xù)的IO操作。一個selector對應多個Channel
??由于原生NIO編碼比較麻煩和復雜名斟,我這里就給出了思路的偽代碼脑慧。下一篇我們將用NIO中的Netty框架實現(xiàn)Socket通信,編碼簡單砰盐,一行代碼解決煩人粘包闷袒、拆包問題。
/**
* 服務端nio過程的偽代碼
*
* @param port 端口號
* @throws IOException IOException
*/
private void init(int port) throws IOException {
//第一步:打開ServerSocketChannel岩梳,用于監(jiān)聽客戶端連接囊骤,它是所有客戶端連接的父管道
ServerSocketChannel socketChannel = ServerSocketChannel.open();
//第二步:監(jiān)聽綁定端口,設(shè)置連接模式為非阻塞模式冀值,
socketChannel.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"), port));
socketChannel.configureBlocking(false);
//第三步:創(chuàng)建Reactor線程也物,創(chuàng)建多路復用器,并啟動線程列疗。
Selector selector = Selector.open();
new Thread().start();
//第四步:將ServerSocketChannel注冊到Reactor線程的多路復用器上滑蚯,監(jiān)聽accept事件
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_ACCEPT/*,ioHandler*/);
//第五步:多路復用器在線程run方法的無線循環(huán)體內(nèi)輪詢準備就緒的key
int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey next = it.next();
//deal with io event...
}
//第六步:多路復用器檢測到有新客戶端接入,處理新的接入請求抵栈,完成TCP三次握手膘魄,建立物理鏈路
SocketChannel channel = socketChannel.accept();
//第七步:設(shè)置客戶端為非阻塞模式
channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
//第八步:將新接入的客戶端注冊到reactor線程的多路復用器上,監(jiān)聽讀操作竭讳,讀取客戶端發(fā)送的消息
SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_ACCEPT/*,ioHandler*/);
//第九步:異步讀取客戶端消息到緩沖區(qū),
/*int readNumber = channel.read("receivebuff");*/
//第十步:對byteBuffer進行編解碼浙踢,如果有半包信息指針reset绢慢,繼續(xù)讀取到后續(xù)的報文,將解碼成功消息封裝成task洛波,投遞到業(yè)務線程池胰舆,進行業(yè)務邏輯編排
Object massage = null;
while (buff.hasRemain()) {
buff.mark();
Object massage1 = decode(btyeBuffer);
if (massage1 == null) {
byteBuffer.reset();
break;
}
massageList.add(massage1);
}
if (!byteBuffer.hasRemain()) {
byteBuffer.clean();
} else {
byteBuffer.compact();
}
if (massageList != null && !massageList.isEmpty()) {
for (Object massage3 : massageList){
handlerTask(massage3);
}
}
//第十一步:將POJO對象encode成ByteBuff,調(diào)用SocketChannel的異步write接口蹬挤,將異步消息發(fā)送到客戶端
socketChannel.write(buffer);
}
結(jié)束8苛!焰扳!