Redis的性能由哪些因素決定?
-
內(nèi)存
由于Redis是基于內(nèi)存的操作,因此內(nèi)存大小是決定其性能的一個重要因素髓考。
-
CPU
CPU 是另一個重要的影響因素,由于是單線程模型汞贸,Redis 更喜歡大緩存快速 CPU绳军, 而不是多核。
-
網(wǎng)絡(luò)通信
網(wǎng)絡(luò)帶寬和延遲通常是最大短板矢腻。
網(wǎng)絡(luò)通信模型
最終目標: 增加客戶端的訪問連接數(shù)量
-
BIO(阻塞IO模型)
ServerSocket
Socket
-
阻塞體現(xiàn)在兩個地方:
連接阻塞
IO阻塞
-
使用場景
zookeeper的leader選舉(3個節(jié)點门驾, 5個節(jié)點)
nacos的注冊地址信息同步
-
NIO(非阻塞IO)
把連接阻塞和IO阻塞改成非阻塞
Redis 為什么那么快
Redis的高性能主要依賴于幾個方面。
- C語言實現(xiàn)多柑,C語言在一定程度上還是比Java語言性能要高一些奶是,因為C語言不需要經(jīng)過JVM進行翻譯。
- 純內(nèi)存I/O竣灌,內(nèi)存I/O比磁盤I/O性能更快
- I/O多路復(fù)用聂沙,基于epoll的I/O多路復(fù)用技術(shù),實現(xiàn)高吞吐網(wǎng)絡(luò)I/O
- 單線程模型初嘹,單線程無法利用到多核CPU及汉,但是在Redis中,性能瓶頸并不是在計算上屯烦,而是在I/O
- 能力坷随,所以單線程能夠滿足高并發(fā)的要求。 從另一個層面來說驻龟,單線程可以避免多線程的頻繁上
- 下文切換以及同步鎖機制帶來的性能開銷温眉。
- 下面我們分別從上述幾個方面進行展開說明,先來看網(wǎng)絡(luò)I/O的多路復(fù)用模型翁狐。
從請求處理開始分析
當我們在客戶端向Redis Server發(fā)送一條指令类溢,并且得到Redis回復(fù)的整個過程中,Redis做了什么呢露懒?
要處理命令闯冷,則redis必須完整地接收客戶端的請求砂心,并將命令解析出來,再將結(jié)果讀出來蛇耀,通過網(wǎng)絡(luò)回
寫到客戶端计贰。整個工序分為以下幾個部分:
- 接收,通過TCP接收到命令蒂窒,可能會歷經(jīng)多次TCP包、ack荞怒、IO操作
- 解析洒琢,將命令取出來
- 執(zhí)行,到對應(yīng)的地方將value讀出來
- 返回褐桌,將value通過TCP返回給客戶端衰抑,如果value較大,則IO負荷會更重
其中解析和執(zhí)行是純cpu/內(nèi)存操作荧嵌,而接收和返回主要是IO操作呛踊,首先我們先來看通信的過程。
網(wǎng)絡(luò)IO的通信原理
同樣啦撮,用一幅圖來描述網(wǎng)絡(luò)數(shù)據(jù)的傳輸流程
- 首先谭网,對于TCP通信來說,每個TCP Socket的內(nèi)核中都有一個發(fā)送緩沖區(qū)和一個接收緩沖區(qū)接收緩沖區(qū)把數(shù)據(jù)緩存到內(nèi)核赃春,若應(yīng)用進程一直沒有調(diào)用Socket的read方法進行讀取愉择,那么該數(shù)據(jù)會一直被緩存在接收緩沖區(qū)內(nèi)。不管進程是否讀取Socket织中,對端發(fā)來的數(shù)據(jù)都會經(jīng)過內(nèi)核接收并緩存到Socket的內(nèi)核接收緩沖區(qū)锥涕。
- read所要做的工作,就是把內(nèi)核接收緩沖區(qū)中的數(shù)據(jù)復(fù)制到應(yīng)用層用戶的Buffer里狭吼。
- 進程調(diào)用Socket的send發(fā)送數(shù)據(jù)的時候层坠,一般情況下是將數(shù)據(jù)從應(yīng)用層用戶的Buffer里復(fù)制到Socket的內(nèi)核發(fā)送緩沖區(qū),然后send就會在上層返回刁笙。換句話說破花,send返回時,數(shù)據(jù)不一定會被發(fā)送到對端采盒。
- 網(wǎng)卡中的緩沖區(qū)既不屬于內(nèi)核空間旧乞,也不屬于用戶空間。它屬于硬件緩沖磅氨,允許網(wǎng)卡與操作系統(tǒng)之間有
個緩沖尺栖; 內(nèi)核緩沖區(qū)在內(nèi)核空間,在內(nèi)存中烦租,用于內(nèi)核程序延赌,做為讀自或?qū)懲布臄?shù)據(jù)緩沖區(qū)除盏; 用
戶緩沖區(qū)在用戶空間,在內(nèi)存中挫以,用于用戶程序者蠕,做為讀自或?qū)懲布臄?shù)據(jù)緩沖區(qū)。 - 網(wǎng)卡芯片收到網(wǎng)絡(luò)數(shù)據(jù)會以中斷的方式通知CPU掐松,我有數(shù)據(jù)了踱侣,存在我的硬件緩沖里了,來讀我啊大磺。
CPU收到這個中斷信號后抡句,會調(diào)用相應(yīng)的驅(qū)動接口函數(shù)從網(wǎng)卡的硬件緩沖里把數(shù)據(jù)讀到內(nèi)核緩沖區(qū),正
常情況下會向上傳遞給TCP/IP模塊一層一層的處理杠愧。
IO多路復(fù)用機制
Redis的通信采用的是多路復(fù)用機制待榔,什么是多路復(fù)用機制呢?
- 由于Redis是C語言實現(xiàn)流济,為了方便理解锐锣,我們采用Java語言來描述這個過程。
在理解多路復(fù)用之前绳瘟,我們先來了解一下BIO雕憔。
BIO模型
在Java中,如果要實現(xiàn)網(wǎng)絡(luò)通信稽荧,我們會采用Socket套接字來完成橘茉。
Socket不是一個協(xié)議,而是一個通信模型姨丈。其實它最初是BSD發(fā)明的畅卓,主要用于一臺電腦的兩個進程間通信,后來把它用到了兩臺電腦的進程間通信蟋恬。所以翁潘,可以把它簡單理解為進程間通信,不是什么高級的東西歼争。主要做的事情不就是:
- A發(fā)包:發(fā)請求包給某個已經(jīng)綁定的端口(所以我們經(jīng)常會訪問這樣的地址182.13.15.16:1235拜马,1235就是端口);收到B的允許沐绒;然后正式發(fā)送俩莽;發(fā)送完了,告訴B要斷開鏈接乔遮;收到斷開允許扮超,馬上斷開,然后發(fā)送已經(jīng)斷開信息給B。
- B收包:綁定端口和IP出刷;然后在這個端口監(jiān)聽璧疗;接收到A的請求,發(fā)允許給A馁龟,并做好接收準備崩侠,主要就是清理緩存等待接收新數(shù)據(jù);然后正式接收坷檩;接收到斷開請求却音,允許斷開;確認斷開后矢炼,繼續(xù)監(jiān)聽其它請求僧家。
可見,Socket其實就是I/O操作裸删,Socket并不僅限于網(wǎng)絡(luò)通信,在網(wǎng)絡(luò)通信中阵赠,它涵蓋了網(wǎng)絡(luò)層涯塔、傳輸層、會話層清蚀、表示層匕荸、應(yīng)用層——其實這都不需要記,因為Socket通信時候用到了IP和端口枷邪,僅這兩個就表明了它用到了網(wǎng)絡(luò)層和傳輸層榛搔;而且它無視多臺電腦通信的系統(tǒng)差別,所以它涉及了表示層东揣;一般Socket都是基于一個應(yīng)用程序的践惑,所以會涉及到會話層和應(yīng)用層。
構(gòu)建基礎(chǔ)的BIO通信模型
@Slf4j
public class BIOServerSocket {
public static final int PORT = 8080;
public static void main(String[] args) {
try(ServerSocket serverSocket = new ServerSocket(PORT)) {
log.info("啟動服務(wù):監(jiān)聽端口:{}",PORT);
//阻塞等待監(jiān)聽一個客戶端連接,返回的socket表示連接的客戶端信息
Socket socket = serverSocket.accept();
log.info("客戶端:{}連接成功",socket.getPort());
//阻塞(InputStream是阻塞的)等待獲取客戶端請求報文
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = bufferedReader.readLine();
log.info("收到客戶端發(fā)送的消息:{}",clientStr);
//構(gòu)建輸出流,寫回客戶端
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("server return message:" + clientStr + "\n");
//清空緩沖區(qū)嘶卧,發(fā)送消息
bufferedWriter.flush();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
測試效果:
我們通過對BIOServerSocket進行改造尔觉,關(guān)注case1和case2部分。
- case1: 增加了while循環(huán)芥吟,實現(xiàn)重復(fù)監(jiān)聽
- case2: 當服務(wù)端收到客戶端的請求后侦铜,不直接返回,而是等待20s钟鸵。
@Slf4j
public class BIOServerSocket2 {
public static final int PORT = 8080;
public static void main(String[] args) {
try(ServerSocket serverSocket = new ServerSocket(PORT)) {
log.info("啟動服務(wù):監(jiān)聽端口:{}",PORT);
//循環(huán)接收請求
while (true){
//阻塞等待監(jiān)聽一個客戶端連接,返回的socket表示連接的客戶端信息
Socket socket = serverSocket.accept();
log.info("客戶端:{}連接成功",socket.getPort());
//阻塞(InputStream是阻塞的)等待獲取客戶端請求報文
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = bufferedReader.readLine();
log.info("收到客戶端發(fā)送的消息:{}",clientStr);
//等待20秒
Thread.sleep(20*1000);
//構(gòu)建輸出流,寫回客戶端
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("server return message:" + clientStr + "\n");
//清空緩沖區(qū)钉稍,發(fā)送消息
bufferedWriter.flush();
}
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
客戶端代碼:BIOClientSocket
@Slf4j
public class BIOClientSocket {
public static final String HOST = "127.0.0.1";
public static final int PORT = 8080;
public static void main(String[] args) {
try(Socket socket = new Socket(HOST,PORT)) {
log.info("客戶端連接端口:{}:{}",HOST,PORT);
//構(gòu)建輸出流,請求服務(wù)端
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("client1 msg: hello \n");
//清空緩沖區(qū),發(fā)送消息
bufferedWriter.flush();
//阻塞(InputStream是阻塞的)等待獲取客戶端請求報文
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = bufferedReader.readLine();
log.info("收到服務(wù)端返回的消息:{}",clientStr);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
接著棺耍,把BIOClientSocket復(fù)制兩份(client1贡未、client2),同時向BIOServerSocket發(fā)起請求。
現(xiàn)象: client1先發(fā)送請求到Server端羞秤,由于Server端等待20s才返回缸托,導(dǎo)致client2的請求一直被阻塞。
這個情況會導(dǎo)致一個問題瘾蛋,如果服務(wù)端在同一個時刻只能處理一個客戶端的連接俐镐,而如果一個網(wǎng)站同時有1000個用戶訪問,那么剩下的999個用戶都需要等待哺哼,而這個等待的耗時取決于前面的請求的處理時長佩抹,如圖所示。
基于多線程優(yōu)化BIO
為了讓服務(wù)端能夠同時處理更多的客戶端連接取董,避免因為某個客戶端連接阻塞導(dǎo)致后續(xù)請求被阻塞棍苹,于是引入多線程技術(shù),代碼如下茵汰。
BIOServerSocketWithThread
@Slf4j
public class BIOServerSocketWithThread {
public static TaskExecutor taskExecutor = new TaskExecutor() {
ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
@Override
public void execute(Runnable task) {
executorService.execute(task);
}
};
public static final int PORT = 8080;
public static void main(String[] args) {
try(ServerSocket serverSocket = new ServerSocket(PORT)) {
log.info("啟動服務(wù):監(jiān)聽端口:{}",PORT);
//循環(huán)接收請求
while (true){
//阻塞等待監(jiān)聽一個客戶端連接,返回的socket表示連接的客戶端信息
Socket socket = serverSocket.accept();
log.info("客戶端:{}連接成功",socket.getPort());
// I/O異步執(zhí)行
taskExecutor.execute(new SocketThread(socket));
}
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
SocketThread
@Slf4j
public class SocketThread implements Runnable{
private Socket socket;
public SocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
//阻塞(InputStream是阻塞的)等待獲取客戶端請求報文
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = bufferedReader.readLine();
log.info("收到客戶端發(fā)送的消息:{}",clientStr);
//等待20秒
Thread.sleep(20*1000);
//構(gòu)建輸出流,寫回客戶端
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("server return message:" + clientStr + "\n");
//清空緩沖區(qū)枢里,發(fā)送消息
bufferedWriter.flush();
}catch (Exception e){
e.printStackTrace();
}finally {
//TODO 關(guān)閉IO流
}
}
}
結(jié)果:
當引入了多線程之后,每個客戶端的鏈接(Socket)蹂午,我們可以直接給到線程池去執(zhí)行栏豺,而由于這個過程是異步的,所以并不會同步阻塞影響后續(xù)鏈接的監(jiān)聽豆胸,因此在一定程度上可以提升服務(wù)端鏈接的處理數(shù)量奥洼。
NIO非阻塞IO
使用多線程的方式來解決這個問題,仍然有一個缺點晚胡,線程的數(shù)量取決于硬件配置灵奖,所以線程數(shù)量是有限的,如果請求量比較大的時候估盘,線程本身會收到限制從而并發(fā)量也不會太高瓷患。那怎么辦呢,我們可以采用非阻塞IO遣妥。
-
NIO 從JDK1.4 提出的尉尾,本意是New IO,它的出現(xiàn)為了彌補原本IO的不足燥透,提供了更高效的方式沙咏,提出一個通道(channel)的概念,在IO中它始終以流的形式對數(shù)據(jù)的傳輸和接收班套,下面我們演示一下NIO的使用肢藐。
NIOServerSocket
@Slf4j public class NIOServerSocket { public static final int PORT = 8080; public static void main(String[] args) { try { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //設(shè)置連接非阻塞 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); while(true){ //獲得一個客戶端連接,非阻塞 SocketChannel socketChannel = serverSocketChannel.accept(); if(socketChannel != null){ log.info("客戶端連接:{}",socketChannel.getRemoteAddress()); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); log.info("Server Receive Msg:"+new String(byteBuffer.array())); Thread.sleep(10000); //反轉(zhuǎn) byteBuffer.flip(); socketChannel.write(byteBuffer); }else{ Thread.sleep(1000); log.info("客戶端未連接"); } } } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }
NIOClientSocket
@Slf4j public class NIOClientSocket { public static final String HOST = "127.0.0.1"; public static final int PORT = 8080; public static void main(String[] args) { try(SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(HOST,PORT))) { socketChannel.configureBlocking(false); log.info("客戶端連接端口:{}:{}",HOST,PORT); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); String hello = "NIOClient msg: hello \n"; byteBuffer.put(hello.getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); while(true) { byteBuffer.clear(); int i = socketChannel.read(byteBuffer); if (i > 0) { log.info("收到服務(wù)端的數(shù)據(jù):{}",new String(byteBuffer.array())); } else { log.info("服務(wù)端數(shù)據(jù)未準備好"); Thread.sleep(1000); } } } catch (IOException | InterruptedException ioException) { ioException.printStackTrace(); } } }
演示效果:
服務(wù)端
客戶端
- 所謂的NIO(非阻塞IO),其實就是取消了IO阻塞和連接阻塞吱韭,當服務(wù)端不存在阻塞的時候吆豹,就可以不斷輪詢處理客戶端的請求鱼的,如圖所示,表示NIO下的運行流程痘煤。
上述這種NIO的使用方式凑阶,仍然存在一個問題,就是客戶端或者服務(wù)端需要通過一個線程不斷輪詢才能
獲得結(jié)果衷快,而這個輪詢過程中會浪費線程資源宙橱。
多路復(fù)用IO
大家站在全局的角度再思考一下整個過程,有哪些地方可以優(yōu)化呢蘸拔?
-
我們回到NIOClientSocket中下面這段代碼师郑,當客戶端通過 read 方法去讀取服務(wù)端返回的數(shù)據(jù)時,如果此時服務(wù)端數(shù)據(jù)未準備好调窍,對于客戶端來說就是一次無效的輪詢宝冕。
while(true) { byteBuffer.clear(); int i = socketChannel.read(byteBuffer); if (i > 0) { log.info("收到服務(wù)端的數(shù)據(jù):{}",new String(byteBuffer.array())); } else { log.info("服務(wù)端數(shù)據(jù)未準備好"); Thread.sleep(1000); } }
我們能不能夠設(shè)計成,當客戶端調(diào)用 read 方法之后邓萨,不僅僅不阻塞地梨,同時也不需要輪詢。而是等到服務(wù)端的數(shù)據(jù)就緒之后缔恳, 告訴客戶端湿刽。然后客戶端再去讀取服務(wù)端返回的數(shù)據(jù)呢?所以為了優(yōu)化這個問題褐耳,引入了多路復(fù)用機制。
-
I/O多路復(fù)用的本質(zhì)是通過一種機制(系統(tǒng)內(nèi)核緩沖I/O數(shù)據(jù))渴庆,讓單個進程可以監(jiān)視多個文件描述符铃芦,一旦某個描述符就緒(一般是讀就緒或?qū)懢途w),能夠通知程序進行相應(yīng)的讀寫操作襟雷。
什么是文件描述符(fd):在linux中刃滓,內(nèi)核把所有的外部設(shè)備都當成是一個文件來操作,對一個文件的讀寫會調(diào)用內(nèi)核提供的系統(tǒng)命令耸弄,返回一個fd(文件描述符)咧虎。而對于一個socket的讀寫也會有相應(yīng)的文件描述符,成為socketfd计呈。
-
常見的IO多路復(fù)用方式有【select砰诵、poll、epoll】捌显,都是Linux API提供的IO復(fù)用方式茁彭,那么接下來重點講一下select、和epoll這兩個模型扶歪。
-
select:進程可以通過把一個或者多個fd傳遞給select系統(tǒng)調(diào)用理肺,進程會阻塞在select操作上,這樣select可以幫我們檢測多個fd是否處于就緒狀態(tài),這個模式有兩個缺點
由于他能夠同時監(jiān)聽多個文件描述符妹萨,假如說有1000個年枕,這個時候如果其中一個fd 處于就緒狀態(tài)了,那么當前進程需要線性輪詢所有的fd乎完,也就是監(jiān)聽的fd越多熏兄,性能開銷越大。
同時囱怕,select在單個進程中能打開的fd是有限制的霍弹,默認是1024,對于那些需要支持單機上萬的TCP連接來說確實有點少娃弓。
-
epoll:linux還提供了epoll的系統(tǒng)調(diào)用典格,epoll是基于事件驅(qū)動方式來代替順序掃描,因此性能相對來說更高台丛,主要原理是耍缴,當被監(jiān)聽的fd中,有fd就緒時挽霉,會告知當前進程具體哪一個fd就緒防嗡,那么當前進程只需要去從指定的fd上讀取數(shù)據(jù)即可,另外侠坎,epoll所能支持的fd上線是操作系統(tǒng)的最大文件句柄蚁趁,這個數(shù)字要遠遠大于1024。
由于epoll能夠通過事件告知應(yīng)用進程哪個fd是可讀的实胸,所以我們也稱這種IO為異步非阻塞IO他嫡,當然它是偽異步的,因為它還需要去把數(shù)據(jù)從內(nèi)核同步復(fù)制到用戶空間中庐完,真正的異步非阻塞钢属,應(yīng)該是數(shù)據(jù)已經(jīng)完全準備好了,我只需要從用戶空間讀就行门躯。
-
I/O多路復(fù)用的好處是可以通過把多個I/O的阻塞復(fù)用到同一個select的阻塞上淆党,從而使得系統(tǒng)在單線程的情況下可以同時處理多個客戶端請求。它的最大優(yōu)勢是系統(tǒng)開銷小讶凉,并且不需要創(chuàng)建新的進程或者線程染乌,降低了系統(tǒng)的資源開銷,它的整體實現(xiàn)思想如圖所示
-
客戶端請求到服務(wù)端后懂讯,此時客戶端在傳輸數(shù)據(jù)過程中慕匠,為了避免Server端在read客戶端數(shù)據(jù)過程中阻塞,服務(wù)端會把該請求注冊到Selector復(fù)路器上域醇,服務(wù)端此時不需要等待台谊,只需要啟動一個線程蓉媳,通過selector.select()阻塞輪詢復(fù)路器上就緒的channel即可,也就是說锅铅,如果某個客戶端連接數(shù)據(jù)傳輸完成酪呻,那么select()方法會返回就緒的channel,然后執(zhí)行相關(guān)的處理即可盐须。代碼如下:
@Slf4j public class NIOSelectorServerSocket implements Runnable{ public static final int PORT = 8080; Selector selector; ServerSocketChannel serverSocketChannel; public NIOSelectorServerSocket(int port) throws IOException { selector = Selector.open(); serverSocketChannel=ServerSocketChannel.open(); //如果采用selector模型玩荠,必須要設(shè)置非阻塞 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } @Override public void run() { while(!Thread.interrupted()){ try { //阻塞等待事件就緒 selector.select(); //事件列表 Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while(it.hasNext()){ //說明有連接進來 dispatch((SelectionKey) it.next()); //移除當前就緒的事件 it.remove(); } } catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey key) throws IOException { //是連接事件? if(key.isAcceptable()){ log.info("客戶端注冊事件:{}",key); register(key); }else if(key.isReadable()){ log.info("讀事件:{}",key); //讀事件 read(key); }else if(key.isWritable()){ log.info("寫事件:{}",key); //寫事件 write(key); } } private void register(SelectionKey key) throws IOException { //客戶端連接 ServerSocketChannel channel= (ServerSocketChannel) key.channel(); //獲得客戶端連接 SocketChannel socketChannel = channel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); } private void read(SelectionKey key) throws IOException { //得到的是socketChannel SocketChannel channel= (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); channel.read(byteBuffer); log.info("Server Receive Msg:{}",new String(byteBuffer.array())); //反轉(zhuǎn) byteBuffer.flip(); channel.write(byteBuffer); } private void write(SelectionKey key) throws IOException { //得到的是socketChannel } public static void main(String[] args) throws IOException { NIOSelectorServerSocket selectorServerSocket=new NIOSelectorServerSocket(PORT); new Thread(selectorServerSocket).start(); } }
-
事實上NIO已經(jīng)解決了上述BIO暴露的下面兩個問題:
- 同步阻塞IO贼邓,讀寫阻塞阶冈,線程等待時間過長。
- 在制定線程策略的時候塑径,只能根據(jù)CPU的數(shù)目來限定可用線程資源女坑,不能根據(jù)連接并發(fā)數(shù)目來制定,也就是連接有限制统舀。否則很難保證對客戶端請求的高效和公平匆骗。
到這里為止,通過NIO的多路復(fù)用機制誉简,解決了IO阻塞導(dǎo)致客戶端連接處理受限的問題碉就,服務(wù)端只需要一個線程就可以維護多個客戶端,并且客戶端的某個連接如果準備就緒時闷串,會通過事件機制告訴應(yīng)用程序某個channel可用瓮钥,應(yīng)用程序通過select方法選出就緒的channel進行處理。
單線程Reactor 模型(高性能I/O設(shè)計模式)
-
了解了NIO多路復(fù)用后烹吵,就有必要再和大家說一下Reactor多路復(fù)用高性能I/O設(shè)計模式碉熄,Reactor本質(zhì)上就是基于NIO多路復(fù)用機制提出的一個高性能IO設(shè)計模式,它的核心思想是把響應(yīng)IO事件和業(yè)務(wù)處理進行分離年叮,通過一個或者多個線程來處理IO事件,然后將就緒得到事件分發(fā)到業(yè)務(wù)處理handlers線程去異步非阻塞處理玻募,如圖所示只损。
Reactor模型有三個重要的組件:
Reactor :將I/O事件發(fā)派給對應(yīng)的Handler
Acceptor :處理客戶端連接請求
Handlers :執(zhí)行非阻塞讀/寫
-
一個單線程的Reactor模型,代碼如下:
ReactorMain
public class ReactorMain { public static final int PORT = 8080; public static void main(String[] args) throws IOException { new Thread(new Reactor(PORT),"Main-Thread").start(); } }
Reactor
@Slf4j public class Reactor implements Runnable{ private final Selector selector; private final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { selector=Selector.open(); serverSocketChannel= ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,new Acceptor(selector,serverSocketChannel)); } @Override public void run() { while(!Thread.interrupted()){ try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while(iterator.hasNext()){ dispatch(iterator.next()); iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey key){ //可能拿到的對象有兩個 // Acceptor // Handler Runnable runnable = (Runnable)key.attachment(); if(runnable != null){ runnable.run(); } } }
Acceptor
@Slf4j public class Acceptor implements Runnable{ private final Selector selector; private final ServerSocketChannel serverSocketChannel; public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) { this.selector = selector; this.serverSocketChannel = serverSocketChannel; } @Override public void run() { SocketChannel channel; try { //得到一個客戶端連接 channel = serverSocketChannel.accept(); log.info("{}:收到一個客戶端連接",channel.getRemoteAddress()); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ,new Handler(channel)); } catch (IOException e) { e.printStackTrace(); } } }
Handler
@Slf4j public class Handler implements Runnable{ SocketChannel channe; public Handler(SocketChannel channe) { this.channe = channe; } @Override public void run() { log.info("{}------",Thread.currentThread().getName()); ByteBuffer buffer=ByteBuffer.allocate(1024); /*try { Thread.sleep(1000000); } catch (InterruptedException e) { e.printStackTrace(); }*/ int len=0,total=0; String msg=""; try { do { len = channe.read(buffer); if(len>0){ total+=len; msg+=new String(buffer.array()); } } while (len > buffer.capacity()); log.info("total:{}",total); //msg=表示通信傳輸報文 //耗時2s //登錄: username:password //ServetRequets: 請求信息 //數(shù)據(jù)庫的判斷 //返回數(shù)據(jù)七咧,通過channel寫回到客戶端 log.info("{}: Server receive Msg:{}",channe.getRemoteAddress(),msg); }catch (Exception e){ e.printStackTrace(); }finally { if(channe!=null){ try { channe.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
代碼是最基本的單Reactor單線程模型(整體的I/O操作是由同一個線程完成的)跃惫。
其中Reactor線程,負責(zé)多路分離套接字艾栋,有新連接到來觸發(fā)connect 事件之后爆存,交由Acceptor進行處理,有IO讀寫事件之后交給hanlder 處理蝗砾。
Acceptor主要任務(wù)就是構(gòu)建handler 先较,在獲取到和client相關(guān)的SocketChannel之后 携冤,綁定到相應(yīng)的hanlder上隐孽,對應(yīng)的SocketChannel有讀寫事件之后耐薯,基于racotor 分發(fā),hanlder就可以處理了(所有的IO事件都綁定到selector上攒发,由Reactor分發(fā))手幢。Reactor 模式本質(zhì)上指的是使用 I/O 多路復(fù)用(I/O multiplexing) + 非阻塞 I/O(nonblocking I/O) 的模式琳钉。
多線程單Reactor模型
-
單線程Reactor這種實現(xiàn)方式有存在著缺點籍铁,從實例代碼中可以看出冕碟,handler的執(zhí)行是串行的滚粟,如果其中一個handler處理線程阻塞將導(dǎo)致其他的業(yè)務(wù)處理阻塞癌幕。由于handler和reactor在同一個線程中的執(zhí)行衙耕,這也將導(dǎo)致新的無法接收新的請求,我們做一個小實驗:
- 在上述Reactor代碼的DispatchHandler的run方法中勺远,增加一個Thread.sleep()橙喘。
- 打開多個客戶端窗口連接到Reactor Server端,其中一個窗口發(fā)送一個信息后被阻塞谚中,另外一個窗口再發(fā)信息時由于前面的請求阻塞導(dǎo)致后續(xù)請求無法被處理渴杆。
為了解決這種問題,有人提出使用多線程的方式來處理業(yè)務(wù)宪塔,也就是在業(yè)務(wù)處理的地方加入線程池異步處理磁奖,將reactor和handler在不同的線程來執(zhí)行,如圖所示某筐。
多線程改造-MultiDispatchHandler比搭,我們直接將前面的Reactor單線程模型改成多線程,其實我們就是把IO阻塞的問題通過異步的方式做了優(yōu)化南誊,源碼如下:
@Slf4j
public class MutilDispatchHandler implements Runnable{
SocketChannel channel;
private Executor executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public MutilDispatchHandler(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
processor();
}
private void processor(){
executor.execute(new ReaderHandler(channel));
}
static class ReaderHandler implements Runnable{
private SocketChannel channel;
public ReaderHandler(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
log.info("{}------",Thread.currentThread().getName());
ByteBuffer buffer=ByteBuffer.allocate(1024);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int len=0,total=0;
String msg="";
try {
do {
len = channel.read(buffer);
if(len>0){
total+=len;
msg+=new String(buffer.array());
}
} while (len > buffer.capacity());
System.out.println("total:"+total);
//msg=表示通信傳輸報文
//耗時2s
//登錄: username:password
//ServetRequets: 請求信息
//數(shù)據(jù)庫的判斷
//返回數(shù)據(jù)身诺,通過channel寫回到客戶端
log.info("{}: Server receive Msg:{}",channel.getRemoteAddress(),msg);
}catch (Exception e){
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
- 在多線程Reactor模型中,添加了一個工作者線程池抄囚,并將非I/O操作從Reactor線程中移出轉(zhuǎn)交給工作者線程池來執(zhí)行霉赡。這樣能夠提高Reactor線程的I/O響應(yīng),不至于因為一些耗時的業(yè)務(wù)邏輯而延遲對后面I/O請求的處理幔托。