傳統(tǒng)的BIO模式
class Server {
public static void main() {
ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//線程池
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(8088);
while(!Thread.currentThread.isInturrupted()){//主線程死循環(huán)等待新連接到來
Socket socket = serverSocket.accept();
executor.submit(new ConnectIOnHandler(socket));//為新的連接創(chuàng)建新的線程
}
}
static class ConnectIOnHandler implements Runnable {
private Socket socket;
public ConnectIOnHandler(Socket socket){
this.socket = socket;
}
public void run(){
while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){
String someThing = socket.read();//讀取數(shù)據(jù)
if(someThing!=null){
......//處理數(shù)據(jù)
socket.write()....//寫數(shù)據(jù)
}
}
}
}
}
上面的代碼中懂版,我們在主線程中處理客戶端的連接請求,然后為每個建立的連接分配一個線程去執(zhí)行躏率。socket.read()躯畴、socket.write()是同步阻塞的,我們開啟了多線程薇芝,就可以讓CPU去處理更多的連接蓬抄,這也是多線程的本質:
利用了多核的并行處理能力
當io阻塞系統(tǒng),但CPU空閑時恩掷,利用多線程使用CPU資源
上面的方案也有其致命缺陷倡鲸,因為其本質還是依賴線程:
線程創(chuàng)建和銷毀的代價很高
線程很占內存
線程的切換帶來的資源消耗。有可能恰好輪到一個線程的時間片黄娘,但此時這個線程被io阻塞峭状,這時會發(fā)生線程切換(無意義的損耗)
上面的線程池定義了100個線程,意味著同時只能為100個用戶服務逼争。倘若服務器同故障節(jié)點通信优床,由于其io是阻塞的,如果所有可用線程被故障節(jié)點阻塞誓焦,那么新的請求在隊列中排隊,直到連接超時胆敞。
所以,當面對數(shù)十萬的連接請求杂伟,傳統(tǒng)的BIO是無能為力的移层。
NIO工作原理
回顧前面的學習內容 Linux網(wǎng)絡IO模型
BIO的read過程:發(fā)起系統(tǒng)調用,試圖從內核空間讀取數(shù)據(jù)到用戶空間赫粥,如果數(shù)據(jù)沒有就緒(數(shù)據(jù)還沒有從硬件拷貝到內核)观话,一直阻塞,直到返回數(shù)據(jù)
NIO的處理過程:發(fā)起系統(tǒng)調用越平,試圖從內核空間讀取數(shù)據(jù)到用戶空間频蛔,如果數(shù)據(jù)沒有就緒,直接返回0秦叛,永遠也不會阻塞
需要注意的是:
從內核拷貝數(shù)據(jù)到用戶空間這個io操作是阻塞的晦溪,而且需要消耗CPU(性能非常高,基本不耗時)
BIO等待內核數(shù)據(jù)就緒的過程是空等挣跋,不需要CPU
Reactor與NIO相結合
所謂的Reactor模式三圆,核心就是事件驅動,或者j叫回調的方式。這種方式就是嫌术,應用業(yè)務向一個中間人注冊一個回調(event handler)哀澈,當IO就緒后,就這個中間人產(chǎn)生一個事件度气,并通知此handler進行處理割按。
那么由誰來充當這個中間人呢?是由一個不斷等待和循環(huán)的單獨進程(線程)來做這件事磷籍,它接受所有handler的注冊适荣,并負責先操作系統(tǒng)查詢IO是否就緒,在就緒后就調用指定handler進行處理院领,這個角色的名字就叫做Reactor弛矛。
回想一下 Linux網(wǎng)絡IO模型 中提到的 IO復用,一個線程可以同時處理多個Connection比然,是不是正好契合Reactor的思想丈氓。所以,在java中可以使用NIO來實現(xiàn)Reactor模型强法。
單線程Reactor
Reactor:負責響應事件万俗,將事件分發(fā)給綁定了該事件的Handler處理;
Handler:事件處理器饮怯,綁定了某類事件闰歪,負責執(zhí)行對應事件的Task對事件進行處理;
Acceptor:Handler的一種蓖墅,綁定了connect事件库倘。當客戶端發(fā)起connect請求時,Reactor會將accept事件分發(fā)給Acceptor處理论矾。
看一下其對應的實現(xiàn):
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { //Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false); //非阻塞
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //分步處理,第一步,接收accept事件
sk.attach(new Acceptor()); //attach callback object, Acceptor
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next()); //Reactor負責dispatch收到的事件
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment()); //調用之前注冊的callback對象
if (r != null)
r.run();
}
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}
catch(IOException ex) { /* ... */ }
}
}
}
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c; c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this); //將Handler作為callback對象
sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel(); //write完就結束了, 關閉select key
}
}
NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件教翩,找到可以進行讀寫的網(wǎng)絡描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可干的事情必須要阻塞)贪壳,剩余的I/O操作都是純CPU操作迂曲,沒有必要開啟多線程。
缺點:
一個連接里完整的網(wǎng)絡處理過程一般分為accept寥袭、read、decode关霸、process(compute)传黄、encode、send這幾步队寇,如果在process這個過程中需要處理大量的耗時業(yè)務膘掰,比如連接DB或者進行耗時的計算等,整個線程都被阻塞,無法處理其他的鏈路
單線程识埋,不能充分利用多核處理器
單線程處理I/O的效率確實非常高凡伊,沒有線程切換,只是拼命的讀窒舟、寫系忙、選擇事件。但是如果有成千上萬個鏈路惠豺,即使不停的處理银还,一個線程也無法支撐
單線程,一旦線程意外進入死循環(huán)或者拋出未捕獲的異常洁墙,整個系統(tǒng)就掛掉了
對于缺點1蛹疯,通常的解決辦法是將decode、process热监、encode扔到后臺業(yè)務線程池中執(zhí)行捺弦,避免阻塞reactor。但對于缺點2孝扛、3列吼、4,單線程的reactor是無能為力的疗琉。
多線程的Reactor
有專門一個reactor線程用于監(jiān)聽服務端ServerSocketChannel冈欢,接收客戶端的TCP連接請求;
網(wǎng)絡IO的讀/寫操作等由一個worker reactor線程池負責盈简,由線程池中的NIO線程負責監(jiān)聽SocketChannel事件凑耻,進行消息的讀取、解碼柠贤、編碼和發(fā)送香浩。
一個NIO線程可以同時處理N條鏈路,但是一個鏈路只注冊在一個NIO線程上處理臼勉,防止發(fā)生并發(fā)操作問題邻吭。
注意,socketchannel宴霸、selector囱晴、thread三者的對應關系是:
socketchannel只能注冊到一個selector上,但是一個selector可以被多個socketchannel注冊瓢谢;
selector與thread一般為一一對應畸写。
Selector[] selectors; // 一個selector對應一個線程
int next = 0;
class Acceptor {
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length) next = 0;
}
}
主從多線程Reactor
在絕大多數(shù)場景下,Reactor多線程模型都可以滿足性能需求氓扛;但是在極個別特殊場景中枯芬,一個NIO線程負責監(jiān)聽和處理所有的客戶端連接可能會存在性能問題。比如,建立連接時需要進行復雜的驗證和授權工作等千所。
服務端用于接收客戶端連接的不再是個1個單獨的reactor線程狂魔,而是一個boss reactor線程池;
服務端啟用多個ServerSocketChannel監(jiān)聽不同端口時淫痰,每個ServerSocketChannel的監(jiān)聽工作可以由線程池中的一個NIO線程完成最楷。
NIO實戰(zhàn)
參考老外寫的一個 Java-NIO-Server:Java NIO: Non-blocking Server,代碼在 github上黑界。不錯的一個參考管嬉,解決了NIO中半包粘包的問題,但是代碼可讀性不高朗鸠;
另外一個NIO-Server蚯撩,代碼比較簡單,可讀性較高烛占,代碼風格值得學習胎挎。但避開了半包粘包的問題,也不算是正真意義上的Reactor模型忆家。