? ? ? ?Netty是一個(gè)高效穩(wěn)定的NIO應(yīng)用通信框架,筆者在本專題將帶領(lǐng)大家分析Netty底層源碼蔫浆,徹底理解底層通信原理姐叁。
注意外潜,本專題只適宜了解java多線程和java io知識(shí)的小伙伴閱讀。
IO
? ? ? ?在計(jì)算機(jī)系統(tǒng)中I/O就是輸入(Input)和輸出(Output)的意思嘱吗,針對(duì)不同的操作對(duì)象谒麦,可以劃分為磁盤I/O模型哆致,網(wǎng)絡(luò)I/O模型摊阀,內(nèi)存映射I/O, Direct I/O、數(shù)據(jù)庫I/O等城丧,只要具有輸入輸出類型的交互系統(tǒng)都可以認(rèn)為是I/O系統(tǒng)亡哄,也可以說I/O是整個(gè)操作系統(tǒng)數(shù)據(jù)交換與人機(jī)交互的通道布疙,這個(gè)概念與選用的開發(fā)語言沒有關(guān)系灵临,是一個(gè)通用的概念。
? ? ? ?在如今的系統(tǒng)中I/O卻擁有很重要的位置宦焦,現(xiàn)在系統(tǒng)都有可能處理大量文件波闹,大量數(shù)據(jù)庫操作,而這些操作都依賴于系統(tǒng)的I/O性能孵淘,也就造成了現(xiàn)在系統(tǒng)的瓶頸往往都是由于I/O性能造成的瘫证。因此庄撮,為了解決磁盤I/O性能慢的問題洞斯,系統(tǒng)架構(gòu)中添加了緩存來提高響應(yīng)速度;或者有些高端服務(wù)器從硬件級(jí)入手扭仁,使用了固態(tài)硬盤(SSD)來替換傳統(tǒng)機(jī)械硬盤;在大數(shù)據(jù)方面刀闷,Spark越來越多的承擔(dān)了實(shí)時(shí)性計(jì)算任務(wù)仰迁,而傳統(tǒng)的Hadoop體系則大多應(yīng)用在了離線計(jì)算與大量數(shù)據(jù)存儲(chǔ)的場景徐许,這也是由于磁盤I/O性能遠(yuǎn)不如內(nèi)存I/O性能而造成的格局(Spark更多的使用了內(nèi)存雌隅,而MapReduece更多的使用了磁盤)。因此修械,一個(gè)系統(tǒng)的優(yōu)化空間肯污,往往都在低效率的I/O環(huán)節(jié)上,很少看到一個(gè)系統(tǒng)CPU哄芜、內(nèi)存的性能是其整個(gè)系統(tǒng)的瓶頸忠烛。也正因?yàn)槿绱巳ǘ海琂ava在I/O上也一直在做持續(xù)的優(yōu)化斟薇,從JDK 1.4開始便引入了NIO模型堪滨,大大的提高了以往BIO模型下的操作效率。
BIO遏乔、NIO盟萨、AIO
BIO (Blocking I/O):同步阻塞I/O模式了讨,數(shù)據(jù)的讀取寫入必須阻塞在一個(gè)線程內(nèi)等待其完成前计。這里使用那個(gè)經(jīng)典的燒開水例子男杈,這里假設(shè)一個(gè)燒開水的場景,有一排水壺在燒開水旺垒,BIO的工作模式就是袖牙, 叫一個(gè)線程停留在一個(gè)水壺那舅锄,直到這個(gè)水壺?zé)_,才去處理下一個(gè)水壺坦仍。但是實(shí)際上線程在等待水壺?zé)_的時(shí)間段什么都沒有做繁扎。
NIO (New I/O):同時(shí)支持阻塞與非阻塞模式糊闽,但這里我們以其同步非阻塞I/O模式來說明右犹,那么什么叫做同步非阻塞念链?如果還拿燒開水來說,NIO的做法是叫一個(gè)線程不斷的輪詢每個(gè)水壺的狀態(tài)谦纱,看看是否有水壺的狀態(tài)發(fā)生了改變跨嘉,從而進(jìn)行下一步的操作偿荷。
AIO ( Asynchronous I/O):異步非阻塞I/O模型唠椭。異步非阻塞與同步非阻塞的區(qū)別在哪里贪嫂?異步非阻塞無需一個(gè)線程去輪詢所有IO操作的狀態(tài)改變力崇,在相應(yīng)的狀態(tài)改變后赢织,系統(tǒng)會(huì)通知對(duì)應(yīng)的線程來處理于置。對(duì)應(yīng)到燒開水中就是,為每個(gè)水壺上面裝了一個(gè)開關(guān)瞄桨,水燒開之后讶踪,水壺會(huì)自動(dòng)通知我水燒開了乳讥。
進(jìn)程中的IO調(diào)用
進(jìn)程中的IO調(diào)用步驟大致可以分為以下四步:
? ?1. 進(jìn)程向操作系統(tǒng)請(qǐng)求數(shù)據(jù) ;
? ?2. 操作系統(tǒng)把外部數(shù)據(jù)加載到內(nèi)核的緩沖區(qū)中;
? ?3. 操作系統(tǒng)把內(nèi)核的緩沖區(qū)拷貝到進(jìn)程的緩沖區(qū) ;
? ?4. 進(jìn)程獲得數(shù)據(jù)完成自己的功能 ;
? ? ? ?當(dāng)操作系統(tǒng)在把外部數(shù)據(jù)放到進(jìn)程緩沖區(qū)的這段時(shí)間(即上述的第二云石,三步)留晚,如果應(yīng)用進(jìn)程是掛起等待的,那么就是同步IO奖地,反之参歹,就是異步IO犬庇,也就是AIO 臭挽。
異步咬腕、異步涨共、阻塞举反、非阻塞
- 同步阻塞I/O(BIO):
? ?? ?同步阻塞I/O火鼻,服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線程瘦陈,即客戶端有連接請(qǐng)求時(shí)服務(wù)器就需要啟動(dòng)一個(gè)線程進(jìn)行處理晨逝,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開銷捉貌,可以通過線程池機(jī)制來改善趁窃。BIO方式適用于連接數(shù)目比較小且固定的架構(gòu)醒陆,這種方式對(duì)服務(wù)端資源要求比較高裆针,并發(fā)局限于應(yīng)用中世吨,在jdk1.4以前是唯一的io現(xiàn)在耘婚,但程序直觀簡單易理解 - 同步非阻塞I/O(NIO):
? ?? ?同步非阻塞I/O沐祷,服務(wù)器實(shí)現(xiàn)模式為一個(gè)請(qǐng)求一個(gè)線程,即客戶端發(fā)送的連接請(qǐng)求都會(huì)注冊(cè)到多路復(fù)用器上胞锰,多路復(fù)用器輪詢到連接有IO請(qǐng)求時(shí)才啟動(dòng)一個(gè)線程進(jìn)行處理胜蛉。NIO方式適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器暖璧,并發(fā)局限于應(yīng)用中澎办,編程比較復(fù)雜局蚀,jdk1,4開始支持 - 異步非阻塞I/O(AIO):
? ?? ?異步非阻塞I/O,服務(wù)器實(shí)現(xiàn)模式為一個(gè)有效請(qǐng)求一個(gè)線程扶欣,客戶端的IO請(qǐng)求都是由操作系統(tǒng)先完成了再通知服務(wù)器用其啟動(dòng)線程進(jìn)行處理料祠。AIO方式適用于連接數(shù)目多且連接比較長(重操作)的架構(gòu)髓绽,比如相冊(cè)服務(wù)器顺呕,充分調(diào)用OS參與并發(fā)操作塘匣,編程比較復(fù)雜巷帝,jdk1.7開始支持楞泼。 - IO與NIO區(qū)別:
- IO面向流堕阔,NIO面向緩沖區(qū)
- IO的各種流是阻塞的超陆,NIO是非阻塞模式
? ?? ?Java NIO的選擇允許一個(gè)單獨(dú)的線程來監(jiān)視多個(gè)輸入通道时呀,可以注冊(cè)多個(gè)通道使用一個(gè)選擇器谨娜,然后使用一個(gè)單獨(dú)的線程來“選擇”通道:這些通道里已經(jīng)有可以處理的輸入或選擇已準(zhǔn)備寫入的通道趴梢。這種選擇機(jī)制币他,使得一個(gè)單獨(dú)的線程很容易來管理多個(gè)通道
- 同步與異步的區(qū)別:
? ?? ?同步:發(fā)送一個(gè)請(qǐng)求蝴悉,等待返回辫封,再發(fā)送下一個(gè)請(qǐng)求倦微,同步可以避免出現(xiàn)死鎖欣福,臟讀的發(fā)生
? ?? ?異步:發(fā)送一個(gè)請(qǐng)求拓劝,不等待返回郑临,隨時(shí)可以再發(fā)送下一個(gè)請(qǐng)求厢洞,可以提高效率躺翻,保證并發(fā)
- 同步異步關(guān)注點(diǎn)在于消息通信機(jī)制公你,阻塞與非阻塞關(guān)注的是程序在等待調(diào)用結(jié)果時(shí)(消息陕靠、返回值)的狀態(tài)剪芥。阻塞調(diào)用是指調(diào)用結(jié)果返回之前粗俱,當(dāng)前線程會(huì)被掛起局嘁。調(diào)用線程只有在得到結(jié)果之后才會(huì)返回厚满。非阻塞調(diào)用指在不能立刻得到結(jié)果之前邦鲫,該調(diào)用不會(huì)阻塞當(dāng)前線程
- 不同層次:
CPU層次:操作系統(tǒng)進(jìn)行IO或任務(wù)調(diào)度層次庆捺,現(xiàn)代操作系統(tǒng)通常使用異步非阻塞方式進(jìn)行IO(有少部分IO可能會(huì)使用同步非阻塞)滔以,即發(fā)出IO請(qǐng)求后你画,并不等待IO操作完成坏匪,而是繼續(xù)執(zhí)行接下來的指令(非阻塞)适滓,IO操作和CPU指令互不干擾(異步)凭迹,最后通過中斷的方式通知IO操作的完成結(jié)果蕊苗。
線程層次:操作系統(tǒng)調(diào)度單元的層次朽砰,操作系統(tǒng)為了減輕程序員的思考負(fù)擔(dān)瞧柔,將底層的異步非阻塞的IO方式進(jìn)行封裝造锅,把相關(guān)系統(tǒng)調(diào)用(如read和write)以同步的方式展現(xiàn)出來哥蔚,然而同步阻塞IO會(huì)使線程掛起,同步非阻塞IO會(huì)消耗CPU資源在輪詢上,3個(gè)解決方法诺苹;
? ?? ?1. 多線程(同步阻塞)
? ?? ?2. IO多路復(fù)用(select收奔、poll、epoll)
? ?? ?3. 直接暴露出異步的IO接口滓玖,kernel-aio和IOCP(異步非阻塞)
傳統(tǒng)BIO創(chuàng)建服務(wù)
? ? ? ?JNIO是jdk1.4以后才有的坪哄,之前JAVA IO一直是BIO,C呢撞、C++程序員為什么看不起java程序員损姜?我想BIO的低性能就是其中一個(gè)重要的原因吧!
? ? ? ?Java BIO其實(shí)就是同步阻塞殊霞,高并發(fā)處理效率低摧阅,我們利用JAVA BIO開始一個(gè)服務(wù)端程序绷蹲。
public class BioServer {
public static void main(String[] args) throws IOException {
//端口
int port=8080;
ServerSocket serverSocket=null;
try {
//綁定端口
serverSocket=new ServerSocket(port);
while (true){
//主線程main會(huì)阻塞在這里棒卷,等待客戶端鏈接
Socket socket = serverSocket.accept();
processClient(socket);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}finally {
if(serverSocket!=null){
serverSocket.close();
}
}
}
public static void processClient(Socket socket) throws InterruptedException {
//模擬處理socket
Thread.sleep(1000);
}
}
? ? ? ?這段代始終在main線程中執(zhí)行,就好比公司創(chuàng)建初期只有老板一個(gè)人祝钢,確實(shí)能完成客戶端的鏈接及請(qǐng)求處理比规,運(yùn)行程序代碼會(huì)阻塞在serverSocket=new ServerSocket(port);
,一直等到客戶端鏈接成功后拦英,才執(zhí)行處理函數(shù)processClient(socket);
蜒什,處理結(jié)束后,繼續(xù)循環(huán)疤估,此時(shí)程序繼續(xù)阻塞在Socket socket = serverSocket.accept();
等待新的客戶端鏈接灾常。processClient
花了10秒鐘才處理完畢,在此期間铃拇,如果其他客戶端請(qǐng)鏈接服務(wù)器是不成功的钞瀑,它必須等上一個(gè)客戶端請(qǐng)求處理完成了才能繼續(xù)。假如有1000個(gè)客戶請(qǐng)求呢慷荔?10000個(gè)呢雕什?想想你在瀏覽器頁面等待一天才下單成功...于是,這家電商公司倒閉了!
? ? ? ?客戶端發(fā)鏈接請(qǐng)求贷岸,希望你服務(wù)器立馬處理我的請(qǐng)求壹士,而不是等你處理完畢了別人的事情再來搭理我!時(shí)間很寶貴好嗎凰盔?服務(wù)器很委婉墓卦,表示人手不夠倦春,沒辦法處理別人事情的同時(shí)再處理你的事情户敬,畢竟一心不可二用。
? ? ? ?那就增加人手睁本!于是線程臨危受命(公司開始招人)尿庐,服務(wù)器派主線程接收請(qǐng)求(相當(dāng)于公司前臺(tái)),然后將請(qǐng)求交給另一線程(相當(dāng)于業(yè)務(wù)人員)處理呢堰,服務(wù)器繼續(xù)等待連接抄瑟,這樣的話新的客戶端能立馬鏈接上服務(wù)器,而不用等待服務(wù)器處理完別人的事情再來接待我了枉疼,代碼如下:
public class BioServer {
public static void main(String[] args) throws IOException {
//端口
int port=8080;
ServerSocket serverSocket=null;
try {
//綁定端口
serverSocket=new ServerSocket(port);
while (true){
//主線程main會(huì)阻塞在這里皮假,等待客戶端鏈接
Socket socket = serverSocket.accept();
//請(qǐng)求處理交給別人,主線程繼續(xù)接待客戶端的請(qǐng)求
new Thread(()->{
processClient(socket);
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if(serverSocket!=null){
serverSocket.close();
}
}
}
public static void processClient(Socket socket) {
//模擬處理socket
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
? ? ? ?客戶端鏈接成功后骂维,交給一個(gè)線程處理請(qǐng)求惹资,主線程繼續(xù)循環(huán)等待客戶端的鏈接?如果航闺,有10000個(gè)人來鏈接褪测,那服務(wù)器就要開10000個(gè)線程,如果10萬呢潦刃?你開10萬個(gè)線程侮措?哇,你服務(wù)器性能好高耶乖杠!線程的創(chuàng)建與銷毀很耗資源的好嗎分扎?就好比你的公司,你招10000萬個(gè)業(yè)務(wù)人員處理客戶需求胧洒?正常的做法是畏吓,招10個(gè)業(yè)務(wù)人員,輪詢處理客戶請(qǐng)求略荡,每一個(gè)業(yè)務(wù)人員處理完客戶請(qǐng)求后等待服務(wù)器分給他下一單任務(wù)庵佣,于是,線程池登場了:
public class BioServer {
public static void main(String[] args) throws IOException {
//端口
int port=8080;
ServerSocket serverSocket=null;
try {
//綁定端口
serverSocket=new ServerSocket(port);
//創(chuàng)建一個(gè)線程池汛兜,相當(dāng)于一個(gè)固定規(guī)模的業(yè)務(wù)團(tuán)隊(duì)
TimeServerHandlerExecutorPool pool = new TimeServerHandlerExecutorPool(50, 1000);
while (true){
//主線程main會(huì)阻塞在這里巴粪,等待客戶端鏈接
Socket socket = serverSocket.accept();
pool.execute(()->{processClient(socket);});
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if(serverSocket!=null){
serverSocket.close();
}
}
}
public static void processClient(Socket socket) {
//模擬處理socket
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class TimeServerHandlerExecutorPool implements Executor{
private ExecutorService executorService;
public TimeServerHandlerExecutorPool(int maxPoolSize,int queueSize) {
/**
* @param corePoolSize 核心線程數(shù)量
* @param maximumPoolSize 線程創(chuàng)建最大數(shù)量
* @param keepAliveTime 當(dāng)創(chuàng)建到了線程池最大數(shù)量時(shí) 多長時(shí)間線程沒有處理任務(wù),則線程銷毀
* @param unit keepAliveTime時(shí)間單位
* @param workQueue 此線程池使用什么隊(duì)列
*/
System.out.println(Runtime.getRuntime().availableProcessors());
this.executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
maxPoolSize,120L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));
}
@Override
public void execute(Runnable command) {
executorService.execute(command);
}
}
? ? ? ?OK,現(xiàn)在這個(gè)公司有模有樣了肛根,一個(gè)前臺(tái)辫塌,N個(gè)業(yè)務(wù)人員,只要有訂單派哲,這N個(gè)業(yè)務(wù)人員可以不睡覺臼氨!
? ? ? ?公司規(guī)模日益增長,用戶量越來越大芭届,N個(gè)業(yè)務(wù)人員已經(jīng)加班加點(diǎn)累吐血了储矩,公司到了一個(gè)瓶頸期,急需改變現(xiàn)狀褂乍。
? ? ? ?大家有沒有發(fā)現(xiàn)持隧,當(dāng)一件事參與的人多了以后,溝通往往會(huì)成為事情發(fā)展的最大障礙逃片,前臺(tái)人員需要不斷的與業(yè)務(wù)人員溝通屡拨,業(yè)務(wù)人員來回不斷的找前臺(tái)溝通,前臺(tái)在多個(gè)業(yè)務(wù)人員之間不斷的進(jìn)行腦力切換褥实。如果前臺(tái)正在跟業(yè)務(wù)人員A溝通呀狼,這時(shí)業(yè)務(wù)人員B插進(jìn)來了,前臺(tái)轉(zhuǎn)而去跟B溝通损离,溝通完后需要回憶剛才跟A溝通到哪里了哥艇,前臺(tái)想,太累了草冈,有跟業(yè)務(wù)人員解釋的時(shí)間還不如我自己干她奥。其實(shí),這就是多線程的上下文切換怎棱,CPU通過時(shí)間片分配算法來循環(huán)執(zhí)行任務(wù)哩俭,當(dāng)前任務(wù)執(zhí)行一個(gè)時(shí)間片后會(huì)切換到下一個(gè)任務(wù)。但是拳恋,在切換前會(huì)保存上一個(gè)任務(wù)的狀態(tài)凡资,以便下次切換回這個(gè)任務(wù)時(shí),可以再次加載這個(gè)任務(wù)的狀態(tài)谬运,從任務(wù)保存到再加載的過程就是一次上下文切換隙赁。線程切換時(shí)需要知道在這之前當(dāng)前線程已經(jīng)執(zhí)行到哪條指令了,所以需要記錄程序計(jì)數(shù)器的值梆暖,另外比如說線程正在進(jìn)行某個(gè)計(jì)算的時(shí)候被掛起了伞访,那么下次繼續(xù)執(zhí)行的時(shí)候需要知道之前掛起時(shí)變量的值時(shí)多少,因此需要記錄CPU寄存器的狀態(tài)轰驳。所以一般來說厚掷,線程上下文切換過程中會(huì)記錄程序計(jì)數(shù)器弟灼、CPU寄存器狀態(tài)等數(shù)據(jù)。
公司不得不進(jìn)行改革冒黑,對(duì)前臺(tái)人員進(jìn)行業(yè)務(wù)培訓(xùn)田绑。前臺(tái)記錄多個(gè)用戶需求,搜集到一定程度后抡爹,暫停收集掩驱,對(duì)這些需求進(jìn)行篩選,大部分短期自己能做的任務(wù)自己做了冬竟,難度大且耗時(shí)的任務(wù)交給業(yè)務(wù)人員處理欧穴。隨著規(guī)模的增大,可以分成多個(gè)組诱咏,每組一個(gè)前臺(tái)和多個(gè)業(yè)務(wù)人員苔可。這就是NIO單線程Reactor模型和多線程Reactor模型缴挖,上面的比喻可能不恰當(dāng)袋狞,后面會(huì)通過代碼的形式詳細(xì)講解NIO。
利用傳統(tǒng)BIO手寫一個(gè)Redis客戶端
? ? ? ?Redis作為高性能的緩存數(shù)據(jù)庫映屋,想必大家都用過苟鸯,應(yīng)用程序通過Jedis客戶端來鏈接redies,我們就利用java BIO來模擬一個(gè)Jedis客戶端來向redis發(fā)送請(qǐng)求數(shù)據(jù)棚点。
? ? ? ?通信需要雙方定好通信協(xié)議和數(shù)據(jù)格式早处,這里通信協(xié)議就是TCP,我們主要關(guān)系數(shù)據(jù)格式瘫析,方法就是查看Jedis發(fā)送數(shù)據(jù)的格式砌梆。
- 首先,準(zhǔn)備服務(wù)程序贬循,用于接收并查看Jedis發(fā)送來的數(shù)據(jù):
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(9999);
while (true) {
Socket socket = serverSocket.accept();
System.out.println("客戶端" + socket.getRemoteSocketAddress().toString() + "來連接了");
InputStream inputStream = null;
OutputStream outputStream = null;
try {
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
int count = 0;
byte[] bytes = new byte[1024];
while ((count = inputStream.read(bytes)) != -1) {
String line = new String(bytes, 0, count, "utf-8");
//打印jedis發(fā)送過來的數(shù)據(jù)
System.out.println(line);
outputStream.write("ok".getBytes());
outputStream.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
socket.close();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
serverSocket.close();
}
}
}
- 準(zhǔn)備Jedis鏈接程序:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
//模擬jedis
public class RedisClient {
public static void main(String[] args) {
Jedis redisClient=new Jedis("localhost",9999);
System.out.println(redisClient.set("yuanma", "123456"));
redisClient.close();
}
}
服務(wù)程序打印結(jié)果:
客戶端/127.0.0.1:65257來連接了
*3
$3
SET
$6
yuanma
$6
123456
? ? ? ?上面的服務(wù)程序不是真正的Redis服務(wù)器咸包,我們只是為了查看Jedis發(fā)送的數(shù)據(jù)格式。Jedis的請(qǐng)求是set yuanma 123456
杖虾,意思是設(shè)置鍵yuanma
的值為123456
烂瘫,Jedis將這個(gè)請(qǐng)求封裝成了上面的數(shù)據(jù)格式。我們根據(jù)這個(gè)數(shù)據(jù)格式奇适,利用BIO模擬Jedis向真正的Redis服務(wù)器發(fā)請(qǐng)求坟比,然后再根據(jù)鍵從redis服務(wù)器獲取值,看是否能成功嚷往。
? ? ? ?先分析下上面的數(shù)據(jù)格式葛账,*3
的意思是發(fā)送的參數(shù)有3個(gè),即set
皮仁、yuanma
和123456
籍琳,$3
表示第一個(gè)參數(shù)長度是3茄茁,SET
表示的就是第一個(gè)參數(shù);以此類推巩割,$6
表示第二個(gè)參數(shù)長度是6裙顽,yuanma
表示第二個(gè)參數(shù);$6
表示第三個(gè)參數(shù)長度是6宣谈,123456
表示第三個(gè)參數(shù)愈犹。
? ? ? ?舉一反三,如果我想Redis發(fā)送'set name netty
這條命令闻丑,數(shù)據(jù)格式應(yīng)該是這樣的:
*3
$3
SET
$4
name
$5
netty
? ? ? ?如果漩怎,向Redis服務(wù)器發(fā)送 get name
(即獲取name的值),數(shù)據(jù)格式應(yīng)該是這樣的:
*2
$3
GET
$4
name
? ? ? ?OK嗦嗡,數(shù)據(jù)格式我們研究清楚了勋锤,下面就是模擬Jedis向服務(wù)其發(fā)送請(qǐng)求并接收返回的數(shù)據(jù)。
模擬Jedis客戶端來鏈接Redis服務(wù)器
第一步侥祭,定義向Redis發(fā)送請(qǐng)求的客戶端API:
import redis.clients.jedis.Jedis;
//模擬jedis
public class RedisClient {
//發(fā)送set key value命令
public String set(String key, String value){
reutrn null;
}
//發(fā)送get key命令
public String get(String key){
return null;
}
//發(fā)送incr key命令
public String incr(String key){
return null;
}
}
第二步叁执,定義Socket通信層:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
//socket通信
public class LubanSocket {
private Socket socket;
private InputStream inputStream;
private OutputStream outputStream;
//構(gòu)造函數(shù),鏈接Redis服務(wù)器矮冬,拿到輸入流和輸出流
public LubanSocket(String ip,int prot) {
try {
if(!isCon()){
socket=new Socket(ip,prot);
inputStream=socket.getInputStream();
outputStream=socket.getOutputStream();
}
} catch (IOException e) {
e.printStackTrace();
}
}
//發(fā)送請(qǐng)求
public void send(String str){
try {
outputStream.write(str.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
//讀取Redis返回的數(shù)據(jù)
public String read(){
byte[] b=new byte[1024];
int count=0;
try {
count= inputStream.read(b);
} catch (IOException e) {
e.printStackTrace();
}
return new String(b,0,count);
}
//判斷鏈接是否斷開
public boolean isCon(){
return socket!=null && !socket.isClosed() && socket.isConnected();
}
//關(guān)閉連接
public void close(){
if(outputStream!=null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(inputStream!=null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(socket!=null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
第三步谈宛,定義數(shù)據(jù)協(xié)議層
public class Resp {
/**
* redis網(wǎng)絡(luò)通信協(xié)議,比如set("name","congzhizhi")
* *3
* $3
* set
* $4
* name
* $10
* congzhizhi
* 其中胎署,*3表示發(fā)了3個(gè)參數(shù)吆录,$3表示下面的參數(shù)3個(gè)字符,以此類推
*
*
*/
public static final String star="*";
public static final String crlf="\r\n";
public static final String lengthStart="$";
//枚舉類琼牧,定義指令恢筝,這里有SET指令、GET指令巨坊、INCR指令
public static enum command{
SET,GET,INCR
}
}
下一步就是完善第一步的客戶端撬槽,組裝發(fā)送命令。代碼也很簡單抱究,直接看:
//模擬jedis
public class RedisClient {
private LubanSocket lubanSocket;
//構(gòu)造函數(shù)恢氯,鏈接Redis服務(wù)器
public RedisClient(String ip,int prot) {
this.lubanSocket=new LubanSocket(ip,prot);
}
//發(fā)送set命令
public String set(String key, String value){
lubanSocket.send(commandStrUtil(Resp.command.SET,key.getBytes(),value.getBytes()));
return lubanSocket.read();
}
//關(guān)閉鏈接
public void close(){
lubanSocket.close();
}
//發(fā)送get命令
public String get(String key){
lubanSocket.send(commandStrUtil(Resp.command.GET,key.getBytes()));
return lubanSocket.read();
}
//發(fā)送incr命令
public String incr(String key){
lubanSocket.send(commandStrUtil(Resp.command.INCR,key.getBytes()));
return lubanSocket.read();
}
//組裝命令
public String commandStrUtil(Resp.command command, byte[]... bytes){
StringBuilder stringBuilder=new StringBuilder();
//拼接*3,set key value鼓寺,總共3個(gè)勋拟,bytes代表鍵和值參數(shù),注意拼接完要加回車換行
stringBuilder.append(Resp.star).append(1+bytes.length).append(Resp.crlf);
//拼接SET的長度妈候,$3
stringBuilder.append(Resp.lengthStart).append(command.toString().getBytes().length).append(Resp.crlf);
//拼接SET字符串
stringBuilder.append(command.toString()).append(Resp.crlf);
//拼接鍵和值
for (byte[] aByte : bytes) {
stringBuilder.append(Resp.lengthStart).append(aByte.length).append(Resp.crlf);
stringBuilder.append(new String(aByte)).append(Resp.crlf);
}
return stringBuilder.toString();
}
}
上面的代碼很簡單敢靡,不細(xì)講了,下面我們來做個(gè)測試:
- 首先苦银,我們先啟動(dòng)redis啸胧,小編為演示赶站,在這里啟動(dòng)一個(gè)windows版本的redis,到安裝目錄下通過命令
redis-server.exe "redis.windows.conf"
即可啟動(dòng)纺念,端口號(hào)默認(rèn)為6379:
- 編寫測試
public static void main(String[] args) {
RedisClient redisClient=new RedisClient("localhost",6379);
System.out.println(redisClient.set("yuanma", "123456"));
System.out.println(redisClient.get("yuanma"));
redisClient.close();
}
打印結(jié)果:
這說明贝椿,我們成功向Redis發(fā)送的set和get命令,并成功接收了Redis返回的數(shù)據(jù)陷谱。
為進(jìn)一步證明烙博,我們有可視化客戶端連接Redis,然后查看我們剛才set的數(shù)據(jù)
完美烟逊!其實(shí)渣窜,不光SET 和GET命令,Redis中大部分常用的命令都可以使用咱們這個(gè)手寫的客戶端都可以宪躯。Mysql驅(qū)動(dòng)連接數(shù)據(jù)庫也是這個(gè)原理啦乔宿,就是TCP通信,只不過數(shù)據(jù)協(xié)議和IO模型(以后詳講)不同而已访雪。這就是傳統(tǒng)的JAVA AIO編程详瑞,他是同步阻塞的,無法滿足高并發(fā)鏈接冬阳,下一節(jié)我們就開始講高并發(fā)網(wǎng)絡(luò)通信基礎(chǔ)NIO蛤虐。