Reactor反應(yīng)器模式是高性能網(wǎng)絡(luò)編程在設(shè)計(jì)和架構(gòu)層面的基礎(chǔ)模式。為什么呢?只有徹底了解反應(yīng)器的原理,才能真正構(gòu)建好高性能的網(wǎng)絡(luò)應(yīng)用驴党,才能輕松地學(xué)習(xí)和掌握Netty框架。同時(shí)获茬,反應(yīng)器模式也是BAT級(jí)別大公司必不可少的面試題港庄。
4.1 Reactor 反應(yīng)器模式為何如此重要
在詳細(xì)介紹什么是Reactor反應(yīng)器模式之前倔既,首先說(shuō)明一下它的重要性。
到目前為止鹏氧,高性能網(wǎng)絡(luò)編程都繞不開(kāi)反應(yīng)器模式叉存。很多著名的服務(wù)器軟件或者中間件都是基于反應(yīng)器模式實(shí)現(xiàn)的。
比如說(shuō)度帮,“ 全宇宙最有名的歼捏、最高性能”的Web服務(wù)器Nginx,就是基于反應(yīng)器模式的笨篷;如雷貫耳的Redis瞳秽,作為最高性能的緩存服務(wù)器之一, 也是基于反應(yīng)器模式的率翅;目前火得“一塌糊涂”练俐、在開(kāi)源項(xiàng)目中應(yīng)用極為廣泛的高性能通信中間件Netty,更是基于反應(yīng)器模式的冕臭。
從開(kāi)發(fā)的角度來(lái)說(shuō)腺晾,如果要完成和勝任高性能的服務(wù)器開(kāi)發(fā),反應(yīng)器模式是必須學(xué)會(huì)和掌握的辜贵。從學(xué)習(xí)的角度來(lái)說(shuō)悯蝉,反應(yīng)器模式相當(dāng)于高性能、高并發(fā)的一項(xiàng)非常重要的基礎(chǔ)知識(shí)托慨,只有掌握了它鼻由,才能真正掌握Nginx、Redis厚棵、 Netty 等這些大名鼎鼎的中間件技術(shù)蕉世。正因?yàn)槿绱耍诖蟮幕ヂ?lián)網(wǎng)公司如阿里婆硬、騰訊狠轻、京東的面試過(guò)程中,反應(yīng)器模式相關(guān)的問(wèn)題是經(jīng)常出現(xiàn)的面試問(wèn)題彬犯。
總之向楼,反應(yīng)器模式是高性能網(wǎng)絡(luò)編程的必知、必會(huì)的模式躏嚎。
4.1.1 為什么首先學(xué)習(xí)Reactor反應(yīng)器模式
本書(shū)的目標(biāo)蜜自,是學(xué)習(xí)基于Netty的開(kāi)發(fā)高性能通信服務(wù)器。為什么在學(xué)習(xí)Netty之前卢佣,首先要學(xué)習(xí)Reactor反應(yīng)器模式呢重荠?
寫(xiě)多了代碼的程序員都知道,Java程序不是按照順序執(zhí)行的邏輯來(lái)組織的虚茶。代碼中所用到的設(shè)計(jì)模式戈鲁,在一定程度上已經(jīng)演變成了代碼的組織方式仇参。越是高水平的Java代碼,抽象的層次越高婆殿,到處都是高度抽象和面向接口的調(diào)用诈乒,大量用到繼承、多態(tài)的設(shè)計(jì)模式婆芦。
在閱讀別人的源代碼時(shí)怕磨,如果不了解代碼所使用的設(shè)計(jì)模式,往往會(huì)暈頭轉(zhuǎn)向消约,不知身在何處肠鲫,很難讀懂別人的代碼,對(duì)代碼跟蹤很成問(wèn)題或粮。反過(guò)來(lái)导饲,如果先了解代碼的設(shè)計(jì)模式,再去看代碼氯材,就會(huì)閱讀得很輕松渣锦,不會(huì)那么難懂了。
當(dāng)然氢哮,在寫(xiě)代碼時(shí)袋毙,不了解設(shè)計(jì)模式,也很難寫(xiě)出高水平的Java代碼命浴。
本書(shū)的重要使命之一娄猫,就是幫助大家學(xué)習(xí)和掌握Netty贱除。Netty本身很抽象生闲,大量應(yīng)用了設(shè)計(jì)模式。學(xué)習(xí)像Netty這樣的“精品中的精品”月幌,肯定也是需要先從設(shè)計(jì)模式入手的碍讯。而Netty的整體架構(gòu),就是基于這個(gè)著名反應(yīng)器模式扯躺。
總之捉兴,反應(yīng)器模式非常重要。首先學(xué)習(xí)和掌握反應(yīng)器模式录语,對(duì)于學(xué)習(xí)Netty的人來(lái)說(shuō)倍啥,一定是磨刀不誤砍柴工。
4.1.2 Reactor反應(yīng)器模式簡(jiǎn)介
什么是Reactor反應(yīng)器模式呢澎埠?本文站在巨人的肩膀上虽缕,引用一下Doug Lea(那是一位讓人無(wú)限景仰的大師,Java中Concurrent并發(fā)包的重要作者之一)在文章《Scalable IO in Java》中對(duì)反應(yīng)器模式的定義蒲稳,具體如下:
反應(yīng)器模式由Reactor反應(yīng)器線程氮趋、Handlers處理器兩大角色組成:
- Reactor反應(yīng)器線程的職責(zé):負(fù)責(zé)響應(yīng)IO事件伍派,并且分發(fā)到Handlers處理器。
- Handlers處理器的職責(zé):非阻塞的執(zhí)行業(yè)務(wù)處理邏輯剩胁。
在這里诉植,為了方便大家學(xué)習(xí),將Doug Lea著名的文章《Scalable IO in Java》的鏈接地址貼出來(lái):http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf昵观,建議大家去閱讀一下晾腔,提升自己的基礎(chǔ)知識(shí),開(kāi)闊下眼界啊犬。
從上面的反應(yīng)器模式定義建车,看不出這種模式有什么神奇的地方。當(dāng)然椒惨,從簡(jiǎn)單到復(fù)雜缤至,反應(yīng)器模式也有很多版本。根據(jù)前面的定義康谆,僅僅是最為簡(jiǎn)單的一個(gè)版本领斥。
如果需要徹底了解反應(yīng)器模式,還得從最原始的OIO編程開(kāi)始講起沃暗。
4.1.3 多線程O(píng)IO的致命缺陷
在Java的OIO編程中月洛,最初和最原始的網(wǎng)絡(luò)服務(wù)器程序,是用一個(gè)while循環(huán)孽锥,不斷地監(jiān)聽(tīng)端口是否有新的連接嚼黔。如果有,那么就調(diào)用一個(gè)處理函數(shù)來(lái)處理惜辑,示例代碼如下:
while(true){
socket = accept(); //阻塞唬涧,接收連接
handle(socket) ; //讀取數(shù)據(jù)、業(yè)務(wù)處理盛撑、寫(xiě)入結(jié)果
}
這種方法的最大問(wèn)題是:如果前一個(gè)網(wǎng)絡(luò)連接的handle(socket)沒(méi)有處理完碎节,那么后面的連接請(qǐng)求沒(méi)法被接收,于是后面的請(qǐng)求通通會(huì)被阻塞住抵卫,服務(wù)器的吞吐量就太低了狮荔。對(duì)于服務(wù)器來(lái)說(shuō),這是一個(gè)嚴(yán)重的問(wèn)題介粘。
為了解決這個(gè)嚴(yán)重的連接阻塞問(wèn)題殖氏,出現(xiàn)了一個(gè)極為經(jīng)典模式:Connection Per Thread(一個(gè)線程處理一個(gè)連接)模式姻采。示例代碼如下:
//...省略: 導(dǎo)入的Java類(lèi)
class ConnectionPerThread implements Runnable {
public void run() {
try {
//服務(wù)器監(jiān)聽(tīng)socket
ServerSocketserverSocket =
new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
while (!Thread.interrupted()) {
Socket socket = serverSocket.accept();
//接收一個(gè)連接后席函,為socket連接营曼,新建一個(gè)專(zhuān)屬的處理器對(duì)象
Handler handler = new Handler(socket);
//創(chuàng)建新線程,專(zhuān)門(mén)負(fù)責(zé)一個(gè)連接的處理
new Thread(handler).start();
}
} catch (IOException ex) { /* 處理異常 */ }
}
//處理器對(duì)象
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
while (true) {
try {
byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
/* 讀取數(shù)據(jù) */
socket.getInputStream().read(input);
/* 處理業(yè)務(wù)邏輯,獲取處理結(jié)果*/
byte[] output =null;
/* 寫(xiě)入結(jié)果 */
socket.getOutputStream().write(output);
} catch (IOException ex) { /*處理異常*/ }
}
}
}
}
對(duì)于每一個(gè)新的網(wǎng)絡(luò)連接都分配給一個(gè)線程邀窃。每個(gè)線程都獨(dú)自處理自己負(fù)責(zé)的輸入和輸出惧蛹。當(dāng)然沧烈,服務(wù)器的監(jiān)聽(tīng)線程也是獨(dú)立的惩歉,任何的socket連接的輸入和輸出處理俏蛮,不會(huì)阻塞到后面新socket連接的監(jiān)聽(tīng)和建立柬泽。早期版本的Tomcat服務(wù)器,就是這樣實(shí)現(xiàn)的嫁蛇。
Connection Per Thread模式(一個(gè)線程處理一個(gè)連接)的優(yōu)點(diǎn)是:解決了前面的新連接被嚴(yán)重阻塞的問(wèn)題锨并,在一定程度上,極大地提高了服務(wù)器的吞吐量睬棚。
這里有個(gè)問(wèn)題:如果一個(gè)線程同時(shí)負(fù)責(zé)處理多個(gè)socket連接的輸入和輸入第煮,行不行呢?
看上去抑党,沒(méi)有什么不可以包警。但是,實(shí)際上沒(méi)有用底靠。為什么害晦?傳統(tǒng)OIO編程中每一個(gè)socket的IO讀寫(xiě)操作,都是阻塞的暑中。在同一時(shí)刻壹瘟,一個(gè)線程里只能處理一個(gè)socket,前一個(gè)socket被阻塞了鳄逾,后面連接的IO操作是無(wú)法被并發(fā)執(zhí)行的稻轨。所以,不論怎么處理雕凹,OIO中一個(gè)線程也只能是處理一個(gè)連接的IO操作殴俱。
Connection Per Thread模式的缺點(diǎn)是:對(duì)應(yīng)于大量的連接政冻,需要耗費(fèi)大量的線程資源,對(duì)線程資源要求太高线欲。在系統(tǒng)中明场,線程是比較昂貴的系統(tǒng)資源。如果線程數(shù)太多李丰,系統(tǒng)無(wú)法承受苦锨。而且,線程的反復(fù)創(chuàng)建嫌套、銷(xiāo)毀逆屡、線程的切換也需要代價(jià)。因此踱讨,在高并發(fā)的應(yīng)用場(chǎng)景下魏蔗,多線程O(píng)IO的缺陷是致命的。
如何解決Connection Per Thread模式的巨大缺陷呢痹筛?一個(gè)有效路徑是:使用Reactor反應(yīng)器模式莺治。用反應(yīng)器模式對(duì)線程的數(shù)量進(jìn)行控制,做到一個(gè)線程處理大量的連接帚稠。它是如何做到呢谣旁?首先來(lái)看簡(jiǎn)單的版本——單線程的Reactor反應(yīng)器模式。
4.2 單線程Reactor反應(yīng)器模式
總體來(lái)說(shuō)滋早,Reactor反應(yīng)器模式有點(diǎn)兒類(lèi)似事件驅(qū)動(dòng)模式榄审。
在事件驅(qū)動(dòng)模式中,當(dāng)有事件觸發(fā)時(shí)杆麸,事件源會(huì)將事件dispatch分發(fā)到handler處理器進(jìn)行事件處理搁进。反應(yīng)器模式中的反應(yīng)器角色,類(lèi)似于事件驅(qū)動(dòng)模式中的dispatcher事件分發(fā)器角色昔头。
前面已經(jīng)提到饼问,在反應(yīng)器模式中,有Reactor反應(yīng)器和Handler處理器兩個(gè)重要的組件:
- Reactor反應(yīng)器:負(fù)責(zé)查詢(xún)IO事件揭斧,當(dāng)檢測(cè)到一個(gè)IO事件莱革,將其發(fā)送給相應(yīng)的Handler處理器去處理。這里的IO事件讹开,就是NIO中選擇器監(jiān)控的通道IO事件盅视。
- Handler處理器:與IO事件(或者選擇鍵)綁定,負(fù)責(zé)IO事件的處理萧吠。完成真正的連接建立左冬、通道的讀取、處理業(yè)務(wù)邏輯纸型、負(fù)責(zé)將結(jié)果寫(xiě)出到通道等。
4.2.1 什么是單線程Reactor反應(yīng)器
什么是單線程版本的Reactor反應(yīng)器模式呢?簡(jiǎn)單地說(shuō)狰腌,Reactor反應(yīng)器和Handers處理器處于一個(gè)線程中執(zhí)行除破。它是最簡(jiǎn)單的反應(yīng)器模型,如圖4-1所示琼腔。
基于Java NIO瑰枫,如何實(shí)現(xiàn)簡(jiǎn)單的單線程版本的反應(yīng)器模式呢?需要用到SelectionKey選擇鍵的幾個(gè)重要的成員方法:
方法一:void attach(Object o)
此方法可以將任何的Java POJO對(duì)象丹莲,作為附件添加到SelectionKey實(shí)例光坝,相當(dāng)于附件屬性的setter方法。這方法非常重要甥材,因?yàn)樵趩尉€程版本的反應(yīng)器模式中盯另,需要將Handler處理器實(shí)例,作為附件添加到SelectionKey實(shí)例洲赵。
方法二:Object attachment()
此方法的作用是取出之前通過(guò)attach(Object o)添加到SelectionKey選擇鍵實(shí)例的附件鸳惯,相當(dāng)于附件屬性的getter方法,與attach(Object o)配套使用叠萍。
這個(gè)方法同樣非常重要芝发,當(dāng)IO事件發(fā)生,選擇鍵被select方法選到苛谷,可以直接將事件的附件取出辅鲸,也就是之前綁定的Handler處理器實(shí)例,通過(guò)該Handler腹殿,完成相應(yīng)的處理独悴。
總之,在反應(yīng)器模式中赫蛇,需要進(jìn)行attach和attachment結(jié)合使用:在選擇鍵注冊(cè)完成之后绵患,調(diào)用attach方法,將Handler處理器綁定到選擇鍵悟耘;當(dāng)事件發(fā)生時(shí)落蝙,調(diào)用attachment方法,可以從選擇鍵取出Handler處理器暂幼,將事件分發(fā)到Handler處理器中筏勒,完成業(yè)務(wù)處理。
4.2.2 單線程Reactor反應(yīng)器的參考代碼
Doug Lea在《Scalable IO in Java》中旺嬉,實(shí)現(xiàn)了一個(gè)單線程Reactor反應(yīng)器模式的參考代碼管行。這里,我們站在巨人的肩膀上邪媳,借鑒Doug Lea的實(shí)現(xiàn)捐顷,對(duì)其進(jìn)行介紹荡陷。為了方便說(shuō)明,對(duì)Doug Lea的參考代碼進(jìn)行一些適當(dāng)?shù)男薷难镐獭>唧w的參考代碼如下:
//...
class Reactor implements Runnable {
Selector selector;
ServerSocketChannelserverSocket;
EchoServerReactor() throws IOException {
//....省略:打開(kāi)選擇器废赞、serverSocket連接監(jiān)聽(tīng)通道
//注冊(cè)serverSocket的accept事件
SelectionKeysk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//將新連接處理器作為附件,綁定到sk選擇鍵
sk.attach(new AcceptorHandler());
}
public void run() {
//選擇器輪詢(xún)
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//反應(yīng)器負(fù)責(zé)dispatch收到的事件
SelectionKeysk=it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) { ex.printStackTrace(); }
}
//反應(yīng)器的分發(fā)方法
void dispatch(SelectionKey k) {
Runnable handler = (Runnable) (k.attachment());
//調(diào)用之前綁定到選擇鍵的handler處理器對(duì)象
if (handler != null) {
handler.run();
}
}
// 新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
//接受新連接
//需要為新連接叮姑,創(chuàng)建一個(gè)輸入輸出的handler處理器
}
}
//….
}
在上面的代碼中唉地,設(shè)計(jì)了一個(gè)Handler處理器,叫作AcceptorHandler處理器传透,它是一個(gè)內(nèi)部類(lèi)耘沼。在注冊(cè)serverSocket服務(wù)監(jiān)聽(tīng)連接的接受事件之后,創(chuàng)建一個(gè)AcceptorHandler新連接處理器的實(shí)例朱盐,作為附件群嗤,被設(shè)置(attach)到了SelectionKey中。
//注冊(cè)serverSocket的接受(accept)事件
SelectionKeysk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//將新連接處理器作為附件托享,綁定到sk選擇鍵
sk.attach(new AcceptorHandler());
當(dāng)新連接事件發(fā)生后骚烧,取出了之前attach到SelectionKey中的Handler業(yè)務(wù)處理器,進(jìn)行socket的各種IO處理
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//調(diào)用之前綁定到選擇鍵的處理器對(duì)象
if (r != null) {
r.run();
}
}
AcceptorHandler處理器的兩大職責(zé):一是接受新連接闰围,二是在為新連接創(chuàng)建一個(gè)輸入輸出的Handler處理器赃绊,稱(chēng)之為IOHandler。
// 新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
// 接受新連接
// 需要為新連接創(chuàng)建一個(gè)輸入輸出的handler處理器
}
}
IOHandler羡榴,顧名思義碧查,就是負(fù)責(zé)socket的數(shù)據(jù)輸入、業(yè)務(wù)處理校仑、結(jié)果輸出忠售。示例代碼如下:
//...
class IOHandler implements Runnable {
final SocketChannel channel;
final SelectionKeysk;
IOHandler (Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//僅僅取得選擇鍵,稍候設(shè)置感興趣的IO事件
sk = channel.register(selector, 0);
//將Handler處理器作為選擇鍵的附件
sk.attach(this);
//注冊(cè)讀寫(xiě)就緒事件
sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
public void run() {
//...處理輸入和輸出
}
}
在IOHandler的構(gòu)造器中迄沫,有兩點(diǎn)比較重要:
- 將新的SocketChannel傳輸通道稻扬,注冊(cè)到了反應(yīng)器Reactor類(lèi)的同一個(gè)選擇器中。這樣保證了Reactor類(lèi)和Handler類(lèi)在同一個(gè)線程中執(zhí)行羊瘩。
- Channel傳輸通道注冊(cè)完成后泰佳,將IOHandler自身作為附件,attach到了選擇鍵中尘吗。這樣逝她,在Reactor類(lèi)分發(fā)事件(選擇鍵)時(shí),能執(zhí)行到IOHandler的run方法睬捶。
如果上面的示例代碼比較繞口黔宛,不要緊。為了徹底地理解個(gè)中妙處擒贸,自己動(dòng)手開(kāi)發(fā)一個(gè)可以執(zhí)行的實(shí)例臀晃。下面基于反應(yīng)器模式觉渴,實(shí)現(xiàn)了一個(gè)EchoServer回顯服務(wù)器實(shí)例。仔細(xì)閱讀和運(yùn)行這個(gè)實(shí)例积仗,就可以明白上面這段繞口的程序代碼的真正含義了疆拘。
4.2.3 一個(gè)Reactor反應(yīng)器版本的EchoServer實(shí)踐案例
EchoServer回顯服務(wù)器的功能很簡(jiǎn)單:讀取客戶(hù)端的輸入蜕猫,回顯到客戶(hù)端寂曹,所以也叫回顯服務(wù)器』赜遥基于Reactor反應(yīng)器模式來(lái)實(shí)現(xiàn)隆圆,設(shè)計(jì)3個(gè)重要的類(lèi):
- 設(shè)計(jì)一個(gè)反應(yīng)器類(lèi):EchoServerReactor類(lèi)。
- 設(shè)計(jì)兩個(gè)處理器類(lèi):AcceptorHandler新連接處理器翔烁、EchoHandler回顯處理器渺氧。
反應(yīng)器類(lèi)EchoServerReactor的實(shí)現(xiàn)思路和前面的示例代碼基本上相同,具體如下:
//.....
//反應(yīng)器
class EchoServerReactor implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
EchoServerReactor() throws IOException {
//...獲取選擇器蹬屹、開(kāi)啟serverSocket服務(wù)監(jiān)聽(tīng)通道
//...綁定AcceptorHandler新連接處理器到selectKey
}
//輪詢(xún)和分發(fā)事件
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
//反應(yīng)器負(fù)責(zé)dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKeysk) {
Runnable handler = (Runnable) sk.attachment();
//調(diào)用之前attach綁定到選擇鍵的handler處理器對(duì)象
if (handler != null) {
handler.run();
}
}
// Handler:新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null)
new EchoHandler(selector, channel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Thread(new EchoServerReactor()).start();
}
}
EchoHandler回顯處理器侣背,主要是完成客戶(hù)端的內(nèi)容讀取和回顯,具體如下:
//...
class EchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKeysk;
final ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
EchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//取得選擇鍵慨默,再設(shè)置感興趣的IO事件
sk = channel.register(selector, 0);
//將Handler自身作為選擇鍵的附件
sk.attach(this);
//注冊(cè)Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
try {
if (state == SENDING) {
//寫(xiě)入通道
channel.write(byteBuffer);
//寫(xiě)完后,準(zhǔn)備開(kāi)始從通道讀,byteBuffer切換成寫(xiě)入模式
byteBuffer.clear();
//寫(xiě)完后,注冊(cè)read就緒事件
sk.interestOps(SelectionKey.OP_READ);
//寫(xiě)完后,進(jìn)入接收的狀態(tài)
state = RECIEVING;
} else if (state == RECIEVING) {
//從通道讀
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//讀完后贩耐,準(zhǔn)備開(kāi)始寫(xiě)入通道,byteBuffer切換成讀取模式
byteBuffer.flip();
//讀完后,注冊(cè)write就緒事件
sk.interestOps(SelectionKey.OP_WRITE);
//讀完后,進(jìn)入發(fā)送的狀態(tài)
state = SENDING;
}
//處理結(jié)束了, 這里不能關(guān)閉select key厦取,需要重復(fù)使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
以上兩個(gè)類(lèi)潮太,是一個(gè)基于反應(yīng)器模式的EchoServer回顯服務(wù)器的完整實(shí)現(xiàn)。它是一個(gè)單線程版本的反應(yīng)器模式虾攻,Reactor反應(yīng)器和所有的Handler處理器铡买,都執(zhí)行在同一條線程中。
運(yùn)行EchoServerReactor類(lèi)中的main方法霎箍,可以啟動(dòng)回顯服務(wù)器奇钞。如果要看到回顯輸出,還需要啟動(dòng)客戶(hù)端漂坏【鞍#客戶(hù)端的代碼,在同一個(gè)包下樊拓,類(lèi)名為EchoClient纠亚,負(fù)責(zé)數(shù)據(jù)的發(fā)送。代碼如下:
public class EchoClient {
public void start() throws IOException {
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
// 1筋夏、獲取通道(channel)
SocketChannel socketChannel = SocketChannel.open(address);
// 2蒂胞、切換成非阻塞模式
socketChannel.configureBlocking(false);
//不斷的自旋、等待連接完成条篷,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
Print.tcfo("客戶(hù)端啟動(dòng)成功骗随!");
//啟動(dòng)接受線程
Processer processer = new Processer(socketChannel);
new Thread(processer).start();
}
static class Processer implements Runnable {
final Selector selector;
final SocketChannel channel;
Processer(SocketChannel channel) throws IOException {
//Reactor初始化
selector = Selector.open();
this.channel = channel;
channel.register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) {
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Print.tcfo("請(qǐng)輸入發(fā)送內(nèi)容:");
if (scanner.hasNext()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
String next = scanner.next();
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
buffer.flip();
// 操作三:通過(guò)DatagramChannel數(shù)據(jù)報(bào)通道發(fā)送數(shù)據(jù)
socketChannel.write(buffer);
buffer.clear();
}
}
if (sk.isReadable()) {
// 若選擇鍵的IO事件是“可讀”事件,讀取數(shù)據(jù)
SocketChannel socketChannel = (SocketChannel) sk.channel();
//讀取數(shù)據(jù)
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
}
//處理結(jié)束了, 這里不能關(guān)閉select key蛤织,需要重復(fù)使用
//selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new EchoClient().start();
}
}
代碼測(cè)試,client類(lèi)運(yùn)行結(jié)果如下:
[main|EchoClient.start]:客戶(hù)端啟動(dòng)成功鸿染!
[Thread-0|EchoClient$Processer.run]:請(qǐng)輸入發(fā)送內(nèi)容:
hello
server類(lèi)運(yùn)行結(jié)果如下:
[Thread-0|EchoHandler.run] |> 2019-11-19 02:39:51 >>hello
4.2.4 單線程Reactor反應(yīng)器模式的缺點(diǎn)
單線程Reactor反應(yīng)器模式指蚜,是基于Java的NIO實(shí)現(xiàn)的。相對(duì)于傳統(tǒng)的多線程O(píng)IO涨椒,反應(yīng)器模式不再需要啟動(dòng)成千上萬(wàn)條線程摊鸡,效率自然是大大提升了。
在單線程反應(yīng)器模式中蚕冬,Reactor反應(yīng)器和Handler處理器免猾,都執(zhí)行在同一條線程上。這樣囤热,帶來(lái)了一個(gè)問(wèn)題:當(dāng)其中某個(gè)Handler阻塞時(shí)猎提,會(huì)導(dǎo)致其他所有的Handler都得不到執(zhí)行。在這種場(chǎng)景下旁蔼,如果被阻塞的Handler不僅僅負(fù)責(zé)輸入和輸出處理的業(yè)務(wù)锨苏,還包括負(fù)責(zé)連接監(jiān)聽(tīng)的AcceptorHandler處理器。這個(gè)是非常嚴(yán)重的問(wèn)題棺聊。
為什么伞租?一旦AcceptorHandler處理器阻塞,會(huì)導(dǎo)致整個(gè)服務(wù)不能接收新的連接躺屁,使得服務(wù)器變得不可用肯夏。因?yàn)檫@個(gè)缺陷,因此單線程反應(yīng)器模型用得比較少犀暑。
另外驯击,目前的服務(wù)器都是多核的,單線程反應(yīng)器模式模型不能充分利用多核資源耐亏』捕迹總之,在高性能服務(wù)器應(yīng)用場(chǎng)景中广辰,單線程反應(yīng)器模式實(shí)際使用的很少暇矫。
4.3 多線程的Reactor反應(yīng)器模式
既然Reactor反應(yīng)器和Handler處理器,擠在一個(gè)線程會(huì)造成非常嚴(yán)重的性能缺陷择吊。那么李根,可以使用多線程,對(duì)基礎(chǔ)的反應(yīng)器模式進(jìn)行改造和演進(jìn)几睛。
4.3.1 多線程池Reactor反應(yīng)器演進(jìn)
多線程池Reactor反應(yīng)器的演進(jìn)房轿,分為兩個(gè)方面:
- 首先是升級(jí)Handler處理器。既要使用多線程,又要盡可能的高效率囱持,則可以考慮使用線程池夯接。
- 其次是升級(jí)Reactor反應(yīng)器》鬃保可以考慮引入多個(gè)Selector選擇器盔几,提升選擇大量通道的能力。
總體來(lái)說(shuō)掩幢,多線程池反應(yīng)器的模式逊拍,大致如下:
- 將負(fù)責(zé)輸入輸出處理的IOHandler處理器的執(zhí)行,放入獨(dú)立的線程池中粒蜈。這樣顺献,業(yè)務(wù)處理線程與負(fù)責(zé)服務(wù)監(jiān)聽(tīng)和IO事件查詢(xún)的反應(yīng)器線程相隔離,避免服務(wù)器的連接監(jiān)聽(tīng)受到阻塞枯怖。
- 如果服務(wù)器為多核的CPU,可以將反應(yīng)器線程拆分為多個(gè)子反應(yīng)器(SubReactor)線程能曾;同時(shí)度硝,引入多個(gè)選擇器,每一個(gè)SubReactor子線程負(fù)責(zé)一個(gè)選擇器寿冕。這樣蕊程,充分釋放了系統(tǒng)資源的能力;也提高了反應(yīng)器管理大量連接驼唱,提升選擇大量通道的能力藻茂。
4.3.2 多線程Reactor反應(yīng)器的實(shí)踐案例
在前面的“回顯服務(wù)器”(EchoServer)的基礎(chǔ)上,完成多線程Reactor反應(yīng)器的升級(jí)玫恳。多線程反應(yīng)器的實(shí)踐案例設(shè)計(jì)如下:
- 引入多個(gè)選擇器辨赐。
- 設(shè)計(jì)一個(gè)新的子反應(yīng)器(SubReactor)類(lèi)箱歧,一個(gè)子反應(yīng)器負(fù)責(zé)查詢(xún)一個(gè)選擇器界赔。
- 開(kāi)啟多個(gè)反應(yīng)器的處理線程关贵,一個(gè)線程負(fù)責(zé)執(zhí)行一個(gè)子反應(yīng)器(SubReactor)赦役。
為了提升效率括尸,建議SubReactor的數(shù)量和選擇器的數(shù)量一致驾诈。避免多個(gè)線程負(fù)責(zé)一個(gè)選擇器灵妨,導(dǎo)致需要進(jìn)行線程同步着憨,引起的效率降低财饥。這個(gè)實(shí)踐案例的代碼如下:
//....反應(yīng)器
class MultiThreadEchoServerReactor {
ServerSocketChannelserverSocket;
AtomicInteger next = new AtomicInteger(0);
//選擇器集合,引入多個(gè)選擇器
Selector[] selectors = new Selector[2];
//引入多個(gè)子反應(yīng)器
SubReactor[] subReactors = null;
MultiThreadEchoServerReactor() throws IOException {
//初始化多個(gè)選擇器
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
//非阻塞
serverSocket.configureBlocking(false);
//第一個(gè)選擇器,負(fù)責(zé)監(jiān)控新連接事件
SelectionKeysk =
serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
//綁定Handler:attach新連接監(jiān)控handler處理器到SelectionKey(選擇鍵)
sk.attach(new AcceptorHandler());
//第一個(gè)子反應(yīng)器换吧,一子反應(yīng)器負(fù)責(zé)一個(gè)選擇器
SubReactor subReactor1 = new SubReactor(selectors[0]);
//第二個(gè)子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個(gè)選擇器
SubReactor subReactor2 = new SubReactor(selectors[1]);
subReactors = new SubReactor[]{subReactor1, subReactor2};
}
private void startService() {
// 一子反應(yīng)器對(duì)應(yīng)一個(gè)線程
new Thread(subReactors[0]).start();
new Thread(subReactors[1]).start();
}
//子反應(yīng)器
class SubReactor implements Runnable {
//每個(gè)線程負(fù)責(zé)一個(gè)選擇器的查詢(xún)和選擇
final Selector selector;
public SubReactor(Selector selector) {
this.selector = selector;
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
while (it.hasNext()) {
//反應(yīng)器負(fù)責(zé)dispatch收到的事件
SelectionKeysk = it.next();
dispatch(sk);
}
keySet.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKeysk) {
Runnable handler = (Runnable) sk.attachment();
//調(diào)用之前attach綁定到選擇鍵的handler處理器對(duì)象
if (handler != null) {
handler.run();
}
}
}
// Handler:新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null)
new MultiThreadEchoHandler(selectors[next.get()], channel);
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == selectors.length) {
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
上面是反應(yīng)器的演進(jìn)代碼钥星,再來(lái)看看Handler處理器的多線程演進(jìn)實(shí)踐沾瓦。
4.3.3 多線程Handler處理器的實(shí)踐案例
基于前面的單線程反應(yīng)器的EchoHandler回顯處理器的程序代碼,予以改進(jìn),新的回顯處理器為:MultiThreadEchoHandler暴拄。主要的升級(jí)是引入了一個(gè)線程池(ThreadPool)漓滔,業(yè)務(wù)處理的代碼執(zhí)行在自己的線程池中,徹底地做到業(yè)務(wù)處理線程和反應(yīng)器IO事件線程的完全隔離乖篷。這個(gè)實(shí)踐案例的代碼如下:
//...
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKeysk;
final ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入線程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//取得選擇鍵响驴,、再設(shè)置感興趣的IO事件
sk = channel.register(selector, 0);
//將本Handler作為sk選擇鍵的附件撕蔼,方便事件分發(fā)(dispatch)
sk.attach(this);
//向sk選擇鍵注冊(cè)Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
//異步任務(wù)豁鲤,在獨(dú)立的線程池中執(zhí)行
pool.execute(new AsyncTask());
}
//業(yè)務(wù)處理,不在反應(yīng)器線程中執(zhí)行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//寫(xiě)入通道
channel.write(byteBuffer);
//寫(xiě)完后,準(zhǔn)備開(kāi)始從通道讀,byteBuffer切換成寫(xiě)入模式
byteBuffer.clear();
//寫(xiě)完后,注冊(cè)read就緒事件
sk.interestOps(SelectionKey.OP_READ);
//寫(xiě)完后,進(jìn)入接收的狀態(tài)
state = RECIEVING;
} else if (state == RECIEVING) {
//從通道讀
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//讀完后鲸沮,準(zhǔn)備開(kāi)始寫(xiě)入通道,byteBuffer切換成讀取模式
byteBuffer.flip();
//讀完后琳骡,注冊(cè)write就緒事件
sk.interestOps(SelectionKey.OP_WRITE);
//讀完后,進(jìn)入發(fā)送的狀態(tài)
state = SENDING;
}
//處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//異步任務(wù)的內(nèi)部類(lèi)
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
代碼中設(shè)計(jì)了一個(gè)內(nèi)部類(lèi)AsyncTask讼溺,是一個(gè)簡(jiǎn)單的異步任務(wù)的提交類(lèi)楣号。它使得異步業(yè)務(wù)asyncRun方法,可以獨(dú)立地提交到線程池中怒坯。另外炫狱,既然業(yè)務(wù)處理異步執(zhí)行,需要在asyncRun方法的前面加上synchronized同步修飾符剔猿。
至此视译,多線程版本的反應(yīng)器模式,實(shí)踐案例的代碼就演示完了归敬。執(zhí)行新版本的多線程MultiThreadEchoServerReactor服務(wù)器酷含,可以使用之前的EchoClient客戶(hù)端與之配置,完成整個(gè)回顯(echo)的通信演示汪茧。
演示的輸出和之前單線程版本的EchoServer回顯服務(wù)器示例椅亚,是一模一樣的。
客戶(hù)端演示結(jié)果:
[main|EchoClient.start]:客戶(hù)端啟動(dòng)成功陆爽!
[Thread-0|EchoClient$Processer.run]:請(qǐng)輸入發(fā)送內(nèi)容:
Multi helloworld
服務(wù)器端演示結(jié)果:
[pool-1-thread-1|MultiThreadEchoHandler.asyncRun] |> 2019-11-19 03:13:28 >>Multi
4.4 Reactor反應(yīng)器模式小結(jié)
在總結(jié)反應(yīng)器模式前什往,首先看看和其他模式的對(duì)比,加強(qiáng)一下對(duì)它的理解慌闭。
- 反應(yīng)器模式和生產(chǎn)者消費(fèi)者模式對(duì)比
相似之處:在一定程度上别威,反應(yīng)器模式有點(diǎn)類(lèi)似生產(chǎn)者消費(fèi)者模式。在生產(chǎn)者消費(fèi)者模式中驴剔,一個(gè)或多個(gè)生產(chǎn)者將事件加入到一個(gè)隊(duì)列中省古,一個(gè)或多個(gè)消費(fèi)者主動(dòng)地從這個(gè)隊(duì)列中提取(Pull)事件來(lái)處理丧失。
不同之處在于:反應(yīng)器模式是基于查詢(xún)的豺妓,沒(méi)有專(zhuān)門(mén)的隊(duì)列去緩沖存儲(chǔ)IO事件,查詢(xún)到IO事件之后,反應(yīng)器會(huì)根據(jù)不同IO選擇鍵(事件)將其分發(fā)給對(duì)應(yīng)的Handler處理器來(lái)處理琳拭。
- 反應(yīng)器模式和觀察者模式(Observer Pattern)對(duì)比
相似之處在于:在反應(yīng)器模式中训堆,當(dāng)查詢(xún)到IO事件后,服務(wù)處理程序使用單路/多路分發(fā)(Dispatch)策略白嘁,同步地分發(fā)這些IO事件坑鱼。觀察者模式(Observer Pattern)也被稱(chēng)作發(fā)布/訂閱模式,它定義了一種依賴(lài)關(guān)系絮缅,讓多個(gè)觀察者同時(shí)監(jiān)聽(tīng)某一個(gè)主題(Topic)鲁沥。這個(gè)主題對(duì)象在狀態(tài)發(fā)生變化時(shí),會(huì)通知所有觀察者耕魄,它們能夠執(zhí)行相應(yīng)的處理画恰。
不同之處在于:在反應(yīng)器模式中,Handler處理器實(shí)例和IO事件(選擇鍵)的訂閱關(guān)系吸奴,基本上是一個(gè)事件綁定到一個(gè)Handler處理器允扇;每一個(gè)IO事件(選擇鍵)被查詢(xún)后,反應(yīng)器會(huì)將事件分發(fā)給所綁定的Handler處理器奄抽;而在觀察者模式中蔼两,同一個(gè)時(shí)刻,同一個(gè)主題可以被訂閱過(guò)的多個(gè)觀察者處理逞度。
最后,總結(jié)一下反應(yīng)器模式的優(yōu)點(diǎn)和缺點(diǎn)妙啃。作為高性能的IO模式档泽,反應(yīng)器模式的優(yōu)點(diǎn)如下:
響應(yīng)快,雖然同一反應(yīng)器線程本身是同步的揖赴,但不會(huì)被單個(gè)連接的同步IO所阻塞馆匿;
編程相對(duì)簡(jiǎn)單,最大程度避免了復(fù)雜的多線程同步燥滑,也避免了多線程的各個(gè)進(jìn)程之間切換的開(kāi)銷(xiāo)渐北;
可擴(kuò)展,可以方便地通過(guò)增加反應(yīng)器線程的個(gè)數(shù)來(lái)充分利用CPU資源铭拧。
反應(yīng)器模式的缺點(diǎn)如下:
反應(yīng)器模式增加了一定的復(fù)雜性赃蛛,因而有一定的門(mén)檻,并且不易于調(diào)試搀菩。
反應(yīng)器模式需要操作系統(tǒng)底層的IO多路復(fù)用的支持呕臂,如Linux中的epoll。如果操作系統(tǒng)的底層不支持IO多路復(fù)用肪跋,反應(yīng)器模式不會(huì)有那么高效歧蒋。
同一個(gè)Handler業(yè)務(wù)線程中,如果出現(xiàn)一個(gè)長(zhǎng)時(shí)間的數(shù)據(jù)讀寫(xiě),會(huì)影響這個(gè)反應(yīng)器中其他通道的IO處理谜洽。例如在大文件傳輸時(shí)萝映,IO操作就會(huì)影響其他客戶(hù)端(Client)的響應(yīng)時(shí)間。因而對(duì)于這種操作阐虚,還需要進(jìn)一步對(duì)反應(yīng)器模式進(jìn)行改進(jìn)序臂。
4.5 本章小結(jié)
反應(yīng)器(Reactor)模式是高性能網(wǎng)絡(luò)編程在設(shè)計(jì)和架構(gòu)層面的基礎(chǔ)模式。同時(shí)敌呈,反應(yīng)器模式贸宏,也是BAT級(jí)別大公司必不可少的面試題。
本章首先從最簡(jiǎn)單的Connection Per Thread(一個(gè)線程處理一個(gè)連接)模式入手磕洪,介紹了該模式的嚴(yán)重缺陷吭练,從而引出來(lái)了單線程的反應(yīng)器模式。
為了充分利用系統(tǒng)資源析显,最大限度地減少阻塞鲫咽,在單線程的反應(yīng)器模式的基礎(chǔ)上,又演進(jìn)出來(lái)了多線程的反應(yīng)器模式實(shí)現(xiàn)谷异。
本章的反應(yīng)器模式的實(shí)現(xiàn)分尸,僅僅是拋磚引玉,在充分利用系統(tǒng)資源歹嘹、最大限度地減少阻塞兩個(gè)維度箩绍,都有很大的提升空間,建議大家自行嘗試尺上。