我們一直說Redis的性能很快,那為什么快运敢?Redis為了達(dá)到性能最大化,做了哪些方面的優(yōu)化呢忠售?
在深度解析Redis的數(shù)據(jù)結(jié)構(gòu)
這篇文章中者冤,其實從數(shù)據(jù)結(jié)構(gòu)上分析了Redis性能高的一方面原因。
在目前的k-v數(shù)據(jù)庫的技術(shù)選型中档痪,Redis幾乎是首選的用來實現(xiàn)高性能緩存的方案,它的性能有多快呢邢滑?
根據(jù)官方的基準(zhǔn)測試數(shù)據(jù)腐螟,一臺普通硬件配置的Linux機(jī)器上運(yùn)行單個Redis實例愿汰,處理簡單命令(O(n)或者O(logn)),QPS可以達(dá)到8W乐纸,如果使用pipeline批處理功能衬廷,QPS最高可以達(dá)到10W。
Redis 為什么那么快
Redis的高性能主要依賴于幾個方面汽绢。
- C語言實現(xiàn)吗跋,C語言在一定程度上還是比Java語言性能要高一些,因為C語言不需要經(jīng)過JVM進(jìn)行翻譯宁昭。
- 純內(nèi)存I/O跌宛,內(nèi)存I/O比磁盤I/O性能更快
- I/O多路復(fù)用,基于epoll的I/O多路復(fù)用技術(shù)积仗,實現(xiàn)高吞吐網(wǎng)絡(luò)I/O
- 單線程模型疆拘,單線程無法利用到多核CPU,但是在Redis中寂曹,性能瓶頸并不是在計算上哎迄,而是在I/O能力,所以單線程能夠滿足高并發(fā)的要求隆圆。 從另一個層面來說漱挚,單線程可以避免多線程的頻繁上下文切換以及同步鎖機(jī)制帶來的性能開銷。
下面我們分別從上述幾個方面進(jìn)行展開說明渺氧,先來看網(wǎng)絡(luò)I/O的多路復(fù)用模型旨涝。
從請求處理開始分析
當(dāng)我們在客戶端向Redis Server發(fā)送一條指令,并且得到Redis回復(fù)的整個過程中阶女,Redis做了什么呢颊糜?
<center>圖4-1</center>
要處理命令,則redis必須完整地接收客戶端的請求秃踩,并將命令解析出來衬鱼,再將結(jié)果讀出來,通過網(wǎng)絡(luò)回寫到客戶端憔杨。整個工序分為以下幾個部分:
- 接收鸟赫,通過TCP接收到命令,可能會歷經(jīng)多次TCP包消别、ack抛蚤、IO操作
- 解析,將命令取出來
- 執(zhí)行寻狂,到對應(yīng)的地方將value讀出來
- 返回岁经,將value通過TCP返回給客戶端,如果value較大蛇券,則IO負(fù)荷會更重
其中解析和執(zhí)行是純cpu/內(nèi)存操作缀壤,而接收和返回主要是IO操作樊拓,首先我們先來看通信的過程。
網(wǎng)絡(luò)IO的通信原理
同樣塘慕,我也畫了一幅圖來描述網(wǎng)絡(luò)數(shù)據(jù)的傳輸流程
首先筋夏,對于TCP通信來說,每個TCP Socket的內(nèi)核中都有一個發(fā)送緩沖區(qū)和一個接收緩沖區(qū)
接收緩沖區(qū)把數(shù)據(jù)緩存到內(nèi)核图呢,若應(yīng)用進(jìn)程一直沒有調(diào)用Socket的read方法進(jìn)行讀取条篷,那么該數(shù)據(jù)會一直被緩存在接收緩沖區(qū)內(nèi)。不管進(jìn)程是否讀取Socket蛤织,對端發(fā)來的數(shù)據(jù)都會經(jīng)過內(nèi)核接收并緩存到Socket的內(nèi)核接收緩沖區(qū)赴叹。
read所要做的工作,就是把內(nèi)核接收緩沖區(qū)中的數(shù)據(jù)復(fù)制到應(yīng)用層用戶的Buffer里瞳筏。
進(jìn)程調(diào)用Socket的send發(fā)送數(shù)據(jù)的時候稚瘾,一般情況下是將數(shù)據(jù)從應(yīng)用層用戶的Buffer里復(fù)制到Socket的內(nèi)核發(fā)送緩沖區(qū),然后send就會在上層返回姚炕。換句話說摊欠,send返回時,數(shù)據(jù)不一定會被發(fā)送到對端柱宦。
網(wǎng)卡中的緩沖區(qū)既不屬于內(nèi)核空間些椒,也不屬于用戶空間。它屬于硬件緩沖掸刊,允許網(wǎng)卡與操作系統(tǒng)之間有個緩沖免糕;
內(nèi)核緩沖區(qū)在內(nèi)核空間,在內(nèi)存中忧侧,用于內(nèi)核程序石窑,做為讀自或?qū)懲布臄?shù)據(jù)緩沖區(qū);
用戶緩沖區(qū)在用戶空間蚓炬,在內(nèi)存中松逊,用于用戶程序,做為讀自或?qū)懲布臄?shù)據(jù)緩沖區(qū)
網(wǎng)卡芯片收到網(wǎng)絡(luò)數(shù)據(jù)會以中斷的方式通知CPU肯夏,我有數(shù)據(jù)了经宏,存在我的硬件緩沖里了,來讀我啊驯击。
CPU收到這個中斷信號后烁兰,會調(diào)用相應(yīng)的驅(qū)動接口函數(shù)從網(wǎng)卡的硬件緩沖里把數(shù)據(jù)讀到內(nèi)核緩沖區(qū),正常情況下會向上傳遞給TCP/IP模塊一層一層的處理徊都。
NIO多路復(fù)用機(jī)制
Redis的通信采用的是多路復(fù)用機(jī)制沪斟,什么是多路復(fù)用機(jī)制呢?
由于Redis是C語言實現(xiàn)暇矫,為了簡化大家的理解币喧,我們采用Java語言來描述這個過程轨域。
在理解多路復(fù)用之前,我們先來了解一下BIO杀餐。
BIO模型
在Java中,如果要實現(xiàn)網(wǎng)絡(luò)通信朱巨,我們會采用Socket套接字來完成史翘。
Socket這不是一個協(xié)議,而是一個通信模型冀续。其實它最初是BSD發(fā)明的琼讽,主要用來一臺電腦的兩個進(jìn)程間通信,然后把它用到了兩臺電腦的進(jìn)程間通信洪唐。所以钻蹬,可以把它簡單理解為進(jìn)程間通信,不是什么高級的東西凭需。主要做的事情不就是:
A發(fā)包:發(fā)請求包給某個已經(jīng)綁定的端口(所以我們經(jīng)常會訪問這樣的地址182.13.15.16:1235问欠,1235就是端口);收到B的允許粒蜈;然后正式發(fā)送顺献;發(fā)送完了,告訴B要斷開鏈接枯怖;收到斷開允許注整,馬上斷開,然后發(fā)送已經(jīng)斷開信息給B度硝。
B收包:綁定端口和IP肿轨;然后在這個端口監(jiān)聽;接收到A的請求蕊程,發(fā)允許給A椒袍,并做好接收準(zhǔn)備,主要就是清理緩存等待接收新數(shù)據(jù)存捺;然后正式接收槐沼;接受到斷開請求,允許斷開捌治;確認(rèn)斷開后岗钩,繼續(xù)監(jiān)聽其它請求。
可見肖油,Socket其實就是I/O操作兼吓,Socket并不僅限于網(wǎng)絡(luò)通信,在網(wǎng)絡(luò)通信中森枪,它涵蓋了網(wǎng)絡(luò)層视搏、傳輸層审孽、會話層、表示層浑娜、應(yīng)用層——其實這都不需要記佑力,因為Socket通信時候用到了IP和端口,僅這兩個就表明了它用到了網(wǎng)絡(luò)層和傳輸層筋遭;而且它無視多臺電腦通信的系統(tǒng)差別打颤,所以它涉及了表示層;一般Socket都是基于一個應(yīng)用程序的漓滔,所以會涉及到會話層和應(yīng)用層编饺。
構(gòu)建基礎(chǔ)的BIO通信模型
BIOServerSocket
public class BIOServerSocket {
//先定義一個端口號,這個端口的值是可以自己調(diào)整的响驴。
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException {
//先定義一個端口號透且,這個端口的值是可以自己調(diào)整的。
//在服務(wù)器端豁鲤,我們需要使用ServerSocket秽誊,所以我們先聲明一個ServerSocket變量
ServerSocket serverSocket=null;
//接下來,我們需要綁定監(jiān)聽端口, 那我們怎么做呢畅形?只需要創(chuàng)建使用serverSocket實例
//ServerSocket有很多構(gòu)造重載养距,在這里,我們把前邊定義的端口傳入日熬,表示當(dāng)前
//ServerSocket監(jiān)聽的端口是8080
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務(wù)棍厌,監(jiān)聽端口:"+DEFAULT_PORT);
//回顧一下前面我們講的內(nèi)容,接下來我們就需要開始等待客戶端的連接了竖席。
//所以我們要使用的是accept這個函數(shù)耘纱,并且當(dāng)accept方法獲得一個客戶端請求時,會返回
//一個socket對象毕荐, 這個socket對象讓服務(wù)器可以用來和客戶端通信的一個端點束析。
//開始等待客戶端連接,如果沒有客戶端連接憎亚,就會一直阻塞在這個位置
Socket socket=serverSocket.accept();
//很可能有多個客戶端來發(fā)起連接员寇,為了區(qū)分客戶端,咱們可以輸出客戶端的端口號
System.out.println("客戶端:"+socket.getPort()+"已連接");
//一旦有客戶端連接過來第美,我們就可以用到IO來獲得客戶端傳過來的數(shù)據(jù)蝶锋。
//使用InputStream來獲得客戶端的輸入數(shù)據(jù)
//bufferedReader大家還記得吧,他維護(hù)了一個緩沖區(qū)可以減少數(shù)據(jù)源讀取的頻率
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr=bufferedReader.readLine(); //讀取一行信息
System.out.println("客戶端發(fā)了一段消息:"+clientStr);
//服務(wù)端收到數(shù)據(jù)以后什往,可以給到客戶端一個回復(fù)扳缕。這里咱們用到BufferedWriter
BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經(jīng)收到你的消息了\n");
bufferedWriter.flush(); //清空緩沖區(qū)觸發(fā)消息發(fā)送
}
}
BIOClientSocket
public class BIOClientSocket {
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException {
//在客戶端這邊,咱們使用socket來連接到指定的ip和端口
Socket socket=new Socket("localhost",8080);
//使用BufferedWriter,像服務(wù)器端寫入一個消息
BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我是客戶端Client-01\n");
bufferedWriter.flush();
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
String serverStr=bufferedReader.readLine(); //通過bufferedReader讀取服務(wù)端返回的消息
System.out.println("服務(wù)端返回的消息:"+serverStr);
}
}
上述代碼構(gòu)建了一個簡單的BIO通信模型躯舔,也就是服務(wù)端建立一個監(jiān)聽驴剔,客戶端向服務(wù)端發(fā)送一個消息,實現(xiàn)簡單的網(wǎng)絡(luò)通信粥庄,那BIO有什么弊端呢丧失?
我們通過對BIOServerSocket進(jìn)行改造,關(guān)注case1和case2部分飒赃。
- case1: 增加了while循環(huán)利花,實現(xiàn)重復(fù)監(jiān)聽
- case2: 當(dāng)服務(wù)端收到客戶端的請求后,不直接返回载佳,而是等待20s。
public class BIOServerSocket {
//先定義一個端口號臀栈,這個端口的值是可以自己調(diào)整的蔫慧。
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocket serverSocket=null;
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務(wù),監(jiān)聽端口:"+DEFAULT_PORT);
while(true) { //case1: 增加循環(huán)权薯,允許循環(huán)接收請求
Socket socket = serverSocket.accept();
System.out.println("客戶端:" + socket.getPort() + "已連接");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = bufferedReader.readLine(); //讀取一行信息
System.out.println("客戶端發(fā)了一段消息:" + clientStr);
Thread.sleep(20000); //case2: 修改:增加等待時間
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經(jīng)收到你的消息了\n");
bufferedWriter.flush(); //清空緩沖區(qū)觸發(fā)消息發(fā)送
}
}
}
接著姑躲,把BIOClientSocket復(fù)制兩份(client1、client2)盟蚣,同時向BIOServerSocket發(fā)起請求黍析。
運(yùn)行后看到的現(xiàn)象應(yīng)該是: client1先發(fā)送請求到Server端,由于Server端等待20s才返回屎开,導(dǎo)致client2的請求一直被阻塞阐枣。
這個情況會導(dǎo)致一個問題,如果服務(wù)端在同一個時刻只能處理一個客戶端的連接奄抽,而如果一個網(wǎng)站同時有1000個用戶訪問蔼两,那么剩下的999個用戶都需要等待,而這個等待的耗時取決于前面的請求的處理時長逞度,如圖4-2所示额划。
<center>圖4-2</center>
基于多線程優(yōu)化BIO
為了讓服務(wù)端能夠同時處理更多的客戶端連接,避免因為某個客戶端連接阻塞導(dǎo)致后續(xù)請求被阻塞档泽,于是引入多線程技術(shù)俊戳,代碼如下。
ServerSocket
public static void main(String[] args) throws IOException, InterruptedException {
final int DEFAULT_PORT=8080;
ServerSocket serverSocket=null;
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務(wù)馆匿,監(jiān)聽端口:"+DEFAULT_PORT);
ExecutorService executorService= Executors.newFixedThreadPool(5);
while(true) {
Socket socket = serverSocket.accept();
executorService.submit(new SocketThread(socket));
}
}
SocketThread
public class SocketThread implements Runnable{
Socket socket;
public SocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
System.out.println("客戶端:" + socket.getPort() + "已連接");
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = null; //讀取一行信息
clientStr = bufferedReader.readLine();
System.out.println("客戶端發(fā)了一段消息:" + clientStr);
Thread.sleep(20000);
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經(jīng)收到你的消息了\n");
bufferedWriter.flush(); //清空緩沖區(qū)觸發(fā)消息發(fā)送
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如圖4-3所示抑胎,當(dāng)引入了多線程之后,每個客戶端的鏈接(Socket)甜熔,我們可以直接給到線程池去執(zhí)行圆恤,而由于這個過程是異步的,所以并不會同步阻塞影響后續(xù)鏈接的監(jiān)聽,因此在一定程度上可以提升服務(wù)端鏈接的處理數(shù)量盆昙。
<center>圖4-3</center>
NIO非阻塞IO
使用多線程的方式來解決這個問題羽历,仍然有一個缺點,線程的數(shù)量取決于硬件配置淡喜,所以線程數(shù)量是有限的秕磷,如果請求量比較大的時候,線程本身會收到限制從而并發(fā)量也不會太高炼团。那怎么辦呢澎嚣,我們可以采用非阻塞IO。
NIO 從JDK1.4 提出的瘟芝,本意是New IO易桃,它的出現(xiàn)為了彌補(bǔ)原本IO的不足,提供了更高效的方式锌俱,提出一個通道(channel)的概念晤郑,在IO中它始終以流的形式對數(shù)據(jù)的傳輸和接受,下面我們演示一下NIO的使用贸宏。
NioServerSocket
public class NioServerSocket {
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
//讀取數(shù)據(jù)
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
System.out.println(new String(buffer.array()));
//寫出數(shù)據(jù)
Thread.sleep(10000); //阻塞一段時間
//當(dāng)數(shù)據(jù)讀取到緩沖區(qū)之后造寝,接下來就需要把緩沖區(qū)的數(shù)據(jù)寫出到通道,而在寫出之前必須要調(diào)用flip方法吭练,實際上就是重置一個有效字節(jié)范圍诫龙,然后把這個數(shù)據(jù)接觸到通道。
buffer.flip();
socketChannel.write(buffer);//寫出數(shù)據(jù)
} else {
Thread.sleep(1000);
System.out.println("連接未就緒");
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
NioClientSocket
public class NioClientSocket {
public static void main(String[] args) {
try {
SocketChannel socketChannel= SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost",8080));
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
}
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
byteBuffer.put("Hello I'M SocketChannel Client".getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
//讀取服務(wù)端數(shù)據(jù)
byteBuffer.clear();
while(true) {
int i = socketChannel.read(byteBuffer);
if (i > 0) {
System.out.println("收到服務(wù)端的數(shù)據(jù):" + new String(byteBuffer.array()));
} else {
System.out.println("服務(wù)端數(shù)據(jù)未準(zhǔn)備好");
Thread.sleep(1000);
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
所謂的NIO(非阻塞IO)鲫咽,其實就是取消了IO阻塞和連接阻塞签赃,當(dāng)服務(wù)端不存在阻塞的時候,就可以不斷輪詢處理客戶端的請求浑侥,如圖4-4所示姊舵,表示NIO下的運(yùn)行流程。
<center>圖4-4</center>
上述這種NIO的使用方式寓落,仍然存在一個問題括丁,就是客戶端或者服務(wù)端需要通過一個線程不斷輪詢才能獲得結(jié)果,而這個輪詢過程中會浪費(fèi)線程資源伶选。
多路復(fù)用IO
大家站在全局的角度再思考一下整個過程史飞,有哪些地方可以優(yōu)化呢?
我們回到NIOClientSocket中下面這段代碼仰税,當(dāng)客戶端通過read
方法去讀取服務(wù)端返回的數(shù)據(jù)時构资,如果此時服務(wù)端數(shù)據(jù)未準(zhǔn)備好,對于客戶端來說就是一次無效的輪詢陨簇。
我們能不能夠設(shè)計成吐绵,當(dāng)客戶端調(diào)用read
方法之后,不僅僅不阻塞,同時也不需要輪詢己单。而是等到服務(wù)端的數(shù)據(jù)就緒之后唉窃, 告訴客戶端。然后客戶端再去讀取服務(wù)端返回的數(shù)據(jù)呢纹笼?
就像點外賣一樣纹份,我們在網(wǎng)上下單之后,繼續(xù)做其他事情廷痘,等到外賣到了公司蔓涧,外賣小哥主動打電話告訴你,你直接去前臺取餐即可笋额。
while(true) {
int i = socketChannel.read(byteBuffer);
if (i > 0) {
System.out.println("收到服務(wù)端的數(shù)據(jù):" + new String(byteBuffer.array()));
} else {
System.out.println("服務(wù)端數(shù)據(jù)未準(zhǔn)備好");
Thread.sleep(1000);
}
}
所以為了優(yōu)化這個問題元暴,引入了多路復(fù)用機(jī)制。
I/O多路復(fù)用的本質(zhì)是通過一種機(jī)制(系統(tǒng)內(nèi)核緩沖I/O數(shù)據(jù))兄猩,讓單個進(jìn)程可以監(jiān)視多個文件描述符昨寞,一旦某個描述符就緒(一般是讀就緒或?qū)懢途w),能夠通知程序進(jìn)行相應(yīng)的讀寫操作
什么是fd:在linux中厦滤,內(nèi)核把所有的外部設(shè)備都當(dāng)成是一個文件來操作,對一個文件的讀寫會調(diào)用內(nèi)核提供的系統(tǒng)命令歼狼,返回一個fd(文件描述符)掏导。而對于一個socket的讀寫也會有相應(yīng)的文件描述符,成為socketfd羽峰。
常見的IO多路復(fù)用方式有【select趟咆、poll、epoll】梅屉,都是Linux API提供的IO復(fù)用方式值纱,那么接下來重點講一下select、和epoll這兩個模型
-
select:進(jìn)程可以通過把一個或者多個fd傳遞給select系統(tǒng)調(diào)用坯汤,進(jìn)程會阻塞在select操作上虐唠,這樣select可以幫我們檢測多個fd是否處于就緒狀態(tài),這個模式有兩個缺點
- 由于他能夠同時監(jiān)聽多個文件描述符惰聂,假如說有1000個疆偿,這個時候如果其中一個fd 處于就緒狀態(tài)了,那么當(dāng)前進(jìn)程需要線性輪詢所有的fd搓幌,也就是監(jiān)聽的fd越多杆故,性能開銷越大。
- 同時溉愁,select在單個進(jìn)程中能打開的fd是有限制的处铛,默認(rèn)是1024,對于那些需要支持單機(jī)上萬的TCP連接來說確實有點少
epoll:linux還提供了epoll的系統(tǒng)調(diào)用,epoll是基于事件驅(qū)動方式來代替順序掃描撤蟆,因此性能相對來說更高奕塑,主要原理是,當(dāng)被監(jiān)聽的fd中枫疆,有fd就緒時爵川,會告知當(dāng)前進(jìn)程具體哪一個fd就緒,那么當(dāng)前進(jìn)程只需要去從指定的fd上讀取數(shù)據(jù)即可息楔,另外寝贡,epoll所能支持的fd上線是操作系統(tǒng)的最大文件句柄,這個數(shù)字要遠(yuǎn)遠(yuǎn)大于1024
【由于epoll能夠通過事件告知應(yīng)用進(jìn)程哪個fd是可讀的值依,所以我們也稱這種IO為異步非阻塞IO圃泡,當(dāng)然它是偽異步的,因為它還需要去把數(shù)據(jù)從內(nèi)核同步復(fù)制到用戶空間中愿险,真正的異步非阻塞颇蜡,應(yīng)該是數(shù)據(jù)已經(jīng)完全準(zhǔn)備好了,我只需要從用戶空間讀就行】
I/O多路復(fù)用的好處是可以通過把多個I/O的阻塞復(fù)用到同一個select的阻塞上辆亏,從而使得系統(tǒng)在單線程的情況下可以同時處理多個客戶端請求风秤。它的最大優(yōu)勢是系統(tǒng)開銷小,并且不需要創(chuàng)建新的進(jìn)程或者線程扮叨,降低了系統(tǒng)的資源開銷缤弦,它的整體實現(xiàn)思想如圖4-5所示。
客戶端請求到服務(wù)端后彻磁,此時客戶端在傳輸數(shù)據(jù)過程中碍沐,為了避免Server端在read客戶端數(shù)據(jù)過程中阻塞,服務(wù)端會把該請求注冊到Selector復(fù)路器上衷蜓,服務(wù)端此時不需要等待累提,只需要啟動一個線程,通過selector.select()阻塞輪詢復(fù)路器上就緒的channel即可磁浇,也就是說斋陪,如果某個客戶端連接數(shù)據(jù)傳輸完成,那么select()方法會返回就緒的channel扯夭,然后執(zhí)行相關(guān)的處理即可鳍贾。
<center>圖4-5</center>
NIOServer的實現(xiàn)如下
測試訪問的時候,直接在cmd中通過telnet連接NIOServer交洗,便可發(fā)送信息骑科。
public class NIOServer implements Runnable{
Selector selector;
ServerSocketChannel serverSocketChannel;
public NIOServer(int port) throws IOException {
selector=Selector.open(); //多路復(fù)用器
serverSocketChannel=ServerSocketChannel.open();
//綁定監(jiān)聽端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);//非阻塞配置
//針對serverSocketChannel注冊一個ACCEPT連接監(jiān)聽事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while(!Thread.interrupted()){
try {
selector.select(); //阻塞等待事件就緒
Set selected=selector.selectedKeys(); //得到事件列表
Iterator it=selected.iterator();
while(it.hasNext()){
dispatch((SelectionKey) it.next()); //分發(fā)事件
it.remove(); //移除當(dāng)前時間
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey key) throws IOException {
if(key.isAcceptable()){ //如果是客戶端的連接事件,則需要針對該連接注冊讀寫事件
register(key);
}else if(key.isReadable()){
read(key);
}else if(key.isWritable()){
write(key);
}
}
private void register(SelectionKey key) throws IOException {
//得到事件對應(yīng)的連接
ServerSocketChannel server=(ServerSocketChannel)key.channel();
SocketChannel channel=server.accept(); //獲得客戶端的鏈接
channel.configureBlocking(false);
//把當(dāng)前客戶端連接注冊到selector上构拳,注冊事件為READ咆爽,
// 也就是當(dāng)前channel可讀時梁棠,就會觸發(fā)事件,然后讀取客戶端的數(shù)據(jù)
channel.register(this.selector,SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
channel.read(byteBuffer); //把數(shù)據(jù)從channel讀取到緩沖區(qū)
System.out.println("server receive msg:"+new String(byteBuffer.array()));
}
private void write(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
//寫一個信息給到客戶端
channel.write(ByteBuffer.wrap("hello Client,I'm NIO Server\r\n".getBytes()));
}
public static void main(String[] args) throws IOException {
NIOServer server=new NIOServer(8888);
new Thread(server).start();
}
}
事實上NIO已經(jīng)解決了上述BIO暴露的下面兩個問題:
- 同步阻塞IO斗埂,讀寫阻塞符糊,線程等待時間過長。
- 在制定線程策略的時候呛凶,只能根據(jù)CPU的數(shù)目來限定可用線程資源男娄,不能根據(jù)連接并發(fā)數(shù)目來制定,也就是連接有限制漾稀。否則很難保證對客戶端請求的高效和公平模闲。
到這里為止,通過NIO的多路復(fù)用機(jī)制崭捍,解決了IO阻塞導(dǎo)致客戶端連接處理受限的問題尸折,服務(wù)端只需要一個線程就可以維護(hù)多個客戶端,并且客戶端的某個連接如果準(zhǔn)備就緒時殷蛇,會通過事件機(jī)制告訴應(yīng)用程序某個channel可用实夹,應(yīng)用程序通過select方法選出就緒的channel進(jìn)行處理。
單線程Reactor 模型(高性能I/O設(shè)計模式)
了解了NIO多路復(fù)用后粒梦,就有必要再和大家說一下Reactor多路復(fù)用高性能I/O設(shè)計模式亮航,Reactor本質(zhì)上就是基于NIO多路復(fù)用機(jī)制提出的一個高性能IO設(shè)計模式,它的核心思想是把響應(yīng)IO事件和業(yè)務(wù)處理進(jìn)行分離匀们,通過一個或者多個線程來處理IO事件塞赂,然后將就緒得到事件分發(fā)到業(yè)務(wù)處理handlers線程去異步非阻塞處理,如圖4-6所示昼蛀。
Reactor模型有三個重要的組件:
- Reactor :將I/O事件發(fā)派給對應(yīng)的Handler
- Acceptor :處理客戶端連接請求
- Handlers :執(zhí)行非阻塞讀/寫
<center>圖4-6</center>
下面演示一個單線程的Reactor模型。
Reactor
Reactor 負(fù)責(zé)響應(yīng)IO事件圆存,一旦發(fā)生叼旋,廣播發(fā)送給相應(yīng)的Handler去處理。
public class Reactor implements Runnable{
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException {
//創(chuàng)建選擇器
selector= Selector.open();
//創(chuàng)建NIO-Server
serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey key=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 綁定一個附加對象
key.attach(new Acceptor(selector,serverSocketChannel));
}
@Override
public void run() {
while(!Thread.interrupted()){
try {
selector.select(); //阻塞等待就緒事件
Set selectionKeys=selector.selectedKeys();
Iterator it=selectionKeys.iterator();
while(it.hasNext()){
dispatch((SelectionKey) it.next());
it.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void dispatch(SelectionKey key){
//調(diào)用之前注冊時附加的對象沦辙,也就是attach附加的acceptor
Runnable r=(Runnable)key.attachment();
if(r!=null){
r.run();
}
}
public static void main(String[] args) throws IOException {
new Thread(new Reactor(8888)).start();
}
}
Acceptor
public class Acceptor implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
SocketChannel channel;
try {
channel=serverSocketChannel.accept();
System.out.println(channel.getRemoteAddress()+": 收到一個客戶端連接");
channel.configureBlocking(false);
//當(dāng)channel連接中數(shù)據(jù)就緒時夫植,調(diào)用DispatchHandler來處理channel
//巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發(fā)生事件的channel鏈接在一起油讯,當(dāng)發(fā)生事件時详民,可以立即觸發(fā)相應(yīng)鏈接的Handler。
channel.register(selector, SelectionKey.OP_READ,new DispatchHandler(channel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
Handler
public class DispatchHandler implements Runnable{
private SocketChannel channel;
public DispatchHandler(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"---handler"); //case: 打印當(dāng)前線程名稱陌兑,證明I/O是同一個線程來處理沈跨。
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len=0,total=0;
String msg="";
try {
do {
len = channel.read(buffer);
if (len > 0) {
total += len;
msg += new String(buffer.array());
}
buffer.clear();
} while (len > buffer.capacity());
System.out.println(channel.getRemoteAddress()+":Server Receive msg:"+msg);
}catch (Exception e){
e.printStackTrace();
if(channel!=null){
try {
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
}
演示方式,通過window的cmd窗口兔综,使用telnet 192.168.1.102 8888 連接到Server端進(jìn)行數(shù)據(jù)通信饿凛;也可以通過下面這樣一個客戶端程序來訪問狞玛。
ReactorClient
public class ReactorClient {
private static Selector selector;
public static void main(String[] args) throws IOException {
selector=Selector.open();
//創(chuàng)建一個連接通道連接指定的server
SocketChannel socketChannel= SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("192.168.1.102",8888));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while(true){
selector.select();
Set<SelectionKey> selectionKeys=selector.selectedKeys();
Iterator<SelectionKey> iterator=selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key=iterator.next();
iterator.remove();
if(key.isConnectable()){
handleConnection(key);
}else if(key.isReadable()){
handleRead(key);
}
}
}
}
private static void handleConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel=(SocketChannel)key.channel();
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
}
socketChannel.configureBlocking(false);
while(true) {
Scanner in = new Scanner(System.in);
String msg = in.nextLine();
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
channel.read(byteBuffer);
System.out.println("client receive msg:"+new String(byteBuffer.array()));
}
}
這是最基本的單Reactor單線程模型(整體的I/O操作是由同一個線程完成的)。
其中Reactor線程涧窒,負(fù)責(zé)多路分離套接字心肪,有新連接到來觸發(fā)connect 事件之后,交由Acceptor進(jìn)行處理纠吴,有IO讀寫事件之后交給hanlder 處理硬鞍。
Acceptor主要任務(wù)就是構(gòu)建handler ,在獲取到和client相關(guān)的SocketChannel之后 戴已,綁定到相應(yīng)的hanlder上固该,對應(yīng)的SocketChannel有讀寫事件之后,基于racotor 分發(fā),hanlder就可以處理了(所有的IO事件都綁定到selector上恭陡,有Reactor分發(fā))
Reactor 模式本質(zhì)上指的是使用 I/O 多路復(fù)用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)的模式蹬音。
多線程單Reactor模型
單線程Reactor這種實現(xiàn)方式有存在著缺點,從實例代碼中可以看出休玩,handler的執(zhí)行是串行的著淆,如果其中一個handler處理線程阻塞將導(dǎo)致其他的業(yè)務(wù)處理阻塞。由于handler和reactor在同一個線程中的執(zhí)行拴疤,這也將導(dǎo)致新的無法接收新的請求永部,我們做一個小實驗:
- 在上述Reactor代碼的DispatchHandler的run方法中,增加一個Thread.sleep()呐矾。
- 打開多個客戶端窗口連接到Reactor Server端苔埋,其中一個窗口發(fā)送一個信息后被阻塞,另外一個窗口再發(fā)信息時由于前面的請求阻塞導(dǎo)致后續(xù)請求無法被處理蜒犯。
為了解決這種問題组橄,有人提出使用多線程的方式來處理業(yè)務(wù)钳踊,也就是在業(yè)務(wù)處理的地方加入線程池異步處理糟趾,將reactor和handler在不同的線程來執(zhí)行企孩,如圖4-7所示桶唐。
<center>圖4-7</center>
多線程改造-MultiDispatchHandler
我們直接將4.2.5小節(jié)中的Reactor單線程模型改成多線程舌界,其實我們就是把IO阻塞的問題通過異步的方式做了優(yōu)化呛伴,代碼如下忧换,
public class MultiDispatchHandler implements Runnable{
private SocketChannel channel;
public MultiDispatchHandler(SocketChannel channel) {
this.channel = channel;
}
private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() << 1);
@Override
public void run() {
processor();
}
private void processor(){
executor.execute(new ReaderHandler(channel));
}
public static class ReaderHandler implements Runnable{
private SocketChannel channel;
public ReaderHandler(SocketChannel socketChannel) {
this.channel = socketChannel;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"---handler"); //case: 打印當(dāng)前線程名稱嫩码,證明I/O是同一個線程來處理潮改。
ByteBuffer buffer= ByteBuffer.allocate(1024);
int len=0;
String msg="";
try {
do {
len = channel.read(buffer);
if (len > 0) {
msg += new String(buffer.array());
}
buffer.clear();
} while (len > buffer.capacity());
if(len>0) {
System.out.println(channel.getRemoteAddress() + ":Server Receive msg:" + msg);
}
}catch (Exception e){
e.printStackTrace();
if(channel!=null){
try {
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
}
}
Acceptor
public class Acceptor implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
SocketChannel channel;
try {
channel=serverSocketChannel.accept();
System.out.println(channel.getRemoteAddress()+": 收到一個客戶端連接");
channel.configureBlocking(false);
//當(dāng)channel連接中數(shù)據(jù)就緒時狭郑,調(diào)用DispatchHandler來處理channel
//巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發(fā)生事件的channel鏈接在一起汇在,當(dāng)發(fā)生事件時翰萨,可以立即觸發(fā)相應(yīng)鏈接的Handler。
channel.register(selector, SelectionKey.OP_READ,new MultiDispatchHandler(channel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
多線程Reactor總結(jié)
在多線程Reactor模型中糕殉,添加了一個工作者線程池缨历,并將非I/O操作從Reactor線程中移出轉(zhuǎn)交給工作者線程池來執(zhí)行以蕴。這樣能夠提高Reactor線程的I/O響應(yīng),不至于因為一些耗時的業(yè)務(wù)邏輯而延遲對后面I/O請求的處理辛孵。
多Reactor多線程模式(主從多Reactor模型)
在多線程單Reactor模型中丛肮,我們發(fā)現(xiàn)所有的I/O操作是由一個Reactor來完成,而Reactor運(yùn)行在單個線程中魄缚,它需要處理包括Accept()
/read()
/write
/connect
操作宝与,對于小容量的場景,影響不大冶匹。但是對于高負(fù)載习劫、大并發(fā)或大數(shù)據(jù)量的應(yīng)用場景時,容易成為瓶頸嚼隘,主要原因如下:
- 一個NIO線程同時處理成百上千的鏈路诽里,性能上無法支撐,即便NIO線程的CPU負(fù)荷達(dá)到100%飞蛹,也無法滿足海量消息的讀取和發(fā)送谤狡;
- 當(dāng)NIO線程負(fù)載過重之后,處理速度將變慢卧檐,這會導(dǎo)致大量客戶端連接超時墓懂,超時之后往往會進(jìn)行重發(fā),這更加重了NIO線程的負(fù)載霉囚,最終會導(dǎo)致大量消息積壓和處理超時捕仔,成為系統(tǒng)的性能瓶頸;
所以盈罐,我們還可以更進(jìn)一步優(yōu)化榜跌,引入多Reactor多線程模式,如圖4-8所示盅粪,Main Reactor負(fù)責(zé)接收客戶端的連接請求斜做,然后把接收到的請求傳遞給SubReactor(其中subReactor可以有多個),具體的業(yè)務(wù)IO處理由SubReactor完成湾揽。
Multiple Reactors 模式通常也可以等同于 Master-Workers 模式,比如 Nginx 和 Memcached 等就是采用這種多線程模型笼吟,雖然不同的項目實現(xiàn)細(xì)節(jié)略有區(qū)別库物,但總體來說模式是一致的。
<center>圖4-8</center>
Acceptor贷帮,請求接收者戚揭,在實踐時其職責(zé)類似服務(wù)器,并不真正負(fù)責(zé)連接請求的建立撵枢,而只將其請求委托 Main Reactor 線程池來實現(xiàn)民晒,起到一個轉(zhuǎn)發(fā)的作用精居。
Main Reactor,主 Reactor 線程組潜必,主要負(fù)責(zé)連接事件靴姿,并將IO讀寫請求轉(zhuǎn)發(fā)到 SubReactor 線程池。
Sub Reactor磁滚,Main Reactor 通常監(jiān)聽客戶端連接后會將通道的讀寫轉(zhuǎn)發(fā)到 Sub Reactor 線程池中一個線程(負(fù)載均衡)佛吓,負(fù)責(zé)數(shù)據(jù)的讀寫。在 NIO 中 通常注冊通道的讀(OP_READ)垂攘、寫事件(OP_WRITE)维雇。
MultiplyReactor
public class MultiplyReactor {
public static void main(String[] args) throws IOException {
MultiplyReactor mr = new MultiplyReactor(8888);
mr.start();
}
private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
// Reactor(Selector) 線程池,其中一個線程被 mainReactor 使用晒他,剩余線程都被 subReactor 使用
static Executor mainReactorExecutor = Executors.newFixedThreadPool(POOL_SIZE);
// 主 Reactor吱型,接收連接,把 SocketChannel 注冊到從 Reactor 上
private Reactor mainReactor;
private int port;
public MultiplyReactor(int port) {
try {
this.port = port;
mainReactor = new Reactor();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 啟動主從 Reactor陨仅,初始化并注冊 Acceptor 到主 Reactor
*/
public void start() throws IOException {
new Acceptor(mainReactor.getSelector(), port); // 將 ServerSocketChannel 注冊到 mainReactor
mainReactorExecutor.execute(mainReactor); //使用線程池來處理main Reactor的連接請求
}
}
Reactor
public class Reactor implements Runnable{
private ConcurrentLinkedQueue<AsyncHandler> events=new ConcurrentLinkedQueue<>();
private final Selector selector;
public Reactor() throws IOException {
this.selector = Selector.open();
}
public Selector getSelector(){
return selector;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
AsyncHandler handler;
while ((handler = events.poll()) != null) {
handler.getChannel().configureBlocking(false);
SelectionKey sk=handler.getChannel().register(selector, SelectionKey.OP_READ);
sk.attach(handler);
handler.setSk(sk);
}
selector.select(); //阻塞
Set<SelectionKey> selectionKeys=selector.selectedKeys();
Iterator<SelectionKey> it=selectionKeys.iterator();
while(it.hasNext()){
SelectionKey key=it.next();
//獲取attach方法傳入的附加對象
Runnable runnable=(Runnable)key.attachment();
if(runnable!=null){
runnable.run();
}
it.remove();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
public void register(AsyncHandler asyncHandler){
events.offer(asyncHandler);
selector.wakeup();
}
}
Acceptor
public class Acceptor implements Runnable{
final Selector sel;
final ServerSocketChannel serverSocket;
int handleNext = 0;
private final int POOL_SIZE=Runtime.getRuntime().availableProcessors();
private Executor subReactorExecutor= Executors.newFixedThreadPool(POOL_SIZE);
private Reactor[] subReactors=new Reactor[POOL_SIZE-1];
public Acceptor(Selector sel, int port) throws IOException {
this.sel = sel;
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port)); // 綁定端口
// 設(shè)置成非阻塞模式
serverSocket.configureBlocking(false);
// 注冊到 選擇器 并設(shè)置處理 socket 連接事件
serverSocket.register(sel, SelectionKey.OP_ACCEPT,this);
init();
System.out.println("mainReactor-" + "Acceptor: Listening on port: " + port);
}
public void init() throws IOException {
for (int i = 0; i < subReactors.length; i++) {
subReactors[i]=new Reactor();
subReactorExecutor.execute(subReactors[i]);
}
}
@Override
public synchronized void run() {
try {
// 接收連接津滞,非阻塞模式下,沒有連接直接返回 null
SocketChannel sc = serverSocket.accept();
if (sc != null) {
// 把提示發(fā)到界面
sc.write(ByteBuffer.wrap("Multiply Reactor Pattern Example\r\nreactor> ".getBytes()));
System.out.println(Thread.currentThread().getName()+":Main-Reactor-Acceptor: " + sc.socket().getLocalSocketAddress() +" 注冊到 subReactor-" + handleNext);
// 如何解決呢掂名,直接調(diào)用 wakeup据沈,有可能還沒有注冊成功又阻塞了。這是一個多線程同步的問題饺蔑,可以借助隊列進(jìn)行處理
Reactor subReactor = subReactors[handleNext];
subReactor.register(new AsyncHandler(sc));
if(++handleNext == subReactors.length) {
handleNext = 0;
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
AsyncHandler
public class AsyncHandler implements Runnable{
private SocketChannel channel;
private SelectionKey sk;
ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
ByteBuffer outputBuffer=ByteBuffer.allocate(1024);
StringBuilder builder=new StringBuilder(); //存儲客戶端的完整消息
public AsyncHandler(SocketChannel channel){
this.channel=channel;
}
public SocketChannel getChannel() {
return channel;
}
public void setSk(SelectionKey sk) {
this.sk = sk;
}
@Override
public void run() {
try {
if (sk.isReadable()) {
read();
} else if (sk.isWritable()) {
write();
}
}catch (Exception e){
try {
this.sk.channel().close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
protected void read() throws IOException {
inputBuffer.clear();
int n=channel.read(inputBuffer);
if(inputBufferComplete(n)){
System.out.println(Thread.currentThread().getName()+":Server端收到客戶端的請求消息:"+builder.toString());
outputBuffer.put(builder.toString().getBytes(StandardCharsets.UTF_8));
this.sk.interestOps(SelectionKey.OP_WRITE); //更改服務(wù)的邏輯狀態(tài)以及處理的事件類型
}
}
private boolean inputBufferComplete(int bytes) throws EOFException {
if(bytes>0){
inputBuffer.flip(); //轉(zhuǎn)化成讀取模式
while(inputBuffer.hasRemaining()){ //判斷緩沖區(qū)中是否還有元素
byte ch=inputBuffer.get(); //得到輸入的字符
if(ch==3){ //表示Ctrl+c 關(guān)閉連接
throw new EOFException();
}else if(ch=='\r'||ch=='\n'){ //表示換行符
return true;
}else{
builder.append((char)ch); //拼接讀取到的數(shù)據(jù)
}
}
}else if(bytes==-1){
throw new EOFException(); //客戶端關(guān)閉了連接
}
return false;
}
private void write() throws IOException {
int written=-1;
outputBuffer.flip(); //轉(zhuǎn)化為讀模式锌介,判斷是否有數(shù)據(jù)需要發(fā)送
if(outputBuffer.hasRemaining()){
written=channel.write(outputBuffer); //把數(shù)據(jù)寫回客戶端
}
outputBuffer.clear();
builder.delete(0,builder.length());
if(written<=0){ //表示客戶端沒有輸信息
this.sk.channel().close();
}else{
channel.write(ByteBuffer.wrap("\r\nreactor>".getBytes()));
this.sk.interestOps(SelectionKey.OP_READ);
}
}
}