Java編寫基于netty的RPC框架

一 簡單概念

RPC:?(?Remote Procedure Call),遠程調用過程,是通過網(wǎng)絡調用遠程計算機的進程中某個方法,從而獲取到想要的數(shù)據(jù),過程如同調用本地的方法一樣.

阻塞IO?:當阻塞I/O在調用InputStream.read()方法是阻塞的,一直等到數(shù)據(jù)到來時才返回,同樣ServerSocket.accept()方法時,也是阻塞,直到有客戶端連接才返回,I/O通信模式如下:

缺點:當客戶端多時,會創(chuàng)建大量的處理線程,并且為每一個線程分配一定的資源;阻塞可能帶來頻繁切換上下文,這時引入NIO

NIO?:? jdk1.4引入的(NEW Input/Output),是基于通過和緩存區(qū)的I/O方式,(插入一段題外話,學的多忘得也多,之前有認真研究過NIO,后來用到的時候,忘得一干二凈,所以學習一些東西,經(jīng)常返回看看),NIO是一種非阻塞的IO模型,通過不斷輪詢IO事件是否就緒,非阻塞是指線程在等待IO的時候,可以做其他的任務,同步的核心是Selector,Selector代替線程本省的輪詢IO事件,避免了阻塞同時減少了不必要的線程消耗;非阻塞的核心是通道和緩存區(qū),當IO事件的就緒時,可以將緩存區(qū)的數(shù)據(jù)寫入通道

其工作原理:

1? 由專門的線程來處理所有的IO事件,并且負責轉發(fā)

2? 事件驅動機制:事件到的時候才觸發(fā),而不是同步監(jiān)視

3? 線程通訊:線程之間通訊通過wait,notify等方式通訊,保證每次上下文切換都是有意義的,減少沒必要的線程切換

通道?:? 是對原I/O包中流的模擬,所有數(shù)據(jù)必須通過Channel對象,常見的通道FileChannel,SocketChannel,ServerSocketChannel,DatagramChannel(UDP協(xié)議向網(wǎng)絡連接的兩端讀寫數(shù)據(jù))

Buffer緩存區(qū)?:實際上是一個容器,一個連續(xù)的數(shù)組,任何讀寫的數(shù)據(jù)都經(jīng)過Buffer

Netty?:是由JBOSS提供的一個java開源框架,是一個高性能,異步事件驅動的NIO框架,基于JAVA NIO提供的API實現(xiàn),他提供了TCP UDP和文件傳輸?shù)闹С?,所有操作都是異步非阻塞的.通過Futrue-Listener機制,本質就是Reactor模式的現(xiàn)實,Selector作為多路復用器,EventLoop作為轉發(fā)器,而且,netty對NIO中buffer做優(yōu)化,大大提高了性能

二? Netty 客戶端和服務端的

Netty中Bootstrap和Channel的生命周期

Bootstrap簡介

Bootstarp:引導程序,將ChannelPipeline,ChannelHandler,EventLoop進行整體關聯(lián)

Bootstrap具體分為兩個實現(xiàn)

ServerBootstrap:用于服務端,使用一個ServerChannel接收客戶端的連接,并創(chuàng)建對應的子Channel

Bootstrap:用于客戶端,只需要一個單獨的Channel,配置整個Netty程序,串聯(lián)起各個組件

二者的主要區(qū)別:

1 ServerBootstrap用于Server端,通過調用bind()綁定一個端口監(jiān)聽連接,Bootstrap用于Client端,需要調用connect()方法來連接服務器端,我們也可以調用bind()方法接收返回ChannelFuture中Channel

2 客戶端的Bootstrap一般用一個EventLoopGroup,而服務器的ServerBootstrap會用兩個第一個EventLoopGroup專門負責綁定到端口監(jiān)聽連接事件,而第二個EventLoopGroup專門用來處處理每個接收的連接,這樣大大提高了并發(fā)量

public class Server {

? ? public static void main(String[] args) throws Exception {

? ? ? ? // 1 創(chuàng)建線兩個事件循環(huán)組

? ? ? ? // 一個是用于處理服務器端接收客戶端連接的

? ? ? ? // 一個是進行網(wǎng)絡通信的(網(wǎng)絡讀寫的)

? ? ? ? EventLoopGroup pGroup = new NioEventLoopGroup();

? ? ? ? EventLoopGroup cGroup = new NioEventLoopGroup();

? ? ? ? // 2 創(chuàng)建輔助工具類ServerBootstrap铆遭,用于服務器通道的一系列配置

? ? ? ? ServerBootstrap b = new ServerBootstrap();

? ? ? ? b.group(pGroup, cGroup) // 綁定倆個線程組

? ? ? ? ? ? ? ? .channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel對應TCP, NioDatagramChannel對應UDP

? ? ? ? ? ? ? ? .option(ChannelOption.SO_BACKLOG, 1024) // 設置TCP緩沖區(qū)

? ? ? ? ? ? ? ? .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 設置發(fā)送緩沖大小

? ? ? ? ? ? ? ? .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 這是接收緩沖大小

? ? ? ? ? ? ? ? .option(ChannelOption.SO_KEEPALIVE, true) // 保持連接

? ? ? ? ? ? ? ? .childHandler(new ChannelInitializer<SocketChannel>() {

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? protected void initChannel(SocketChannel sc) throws Exception {? //SocketChannel建立連接后的管道

? ? ? ? ? ? ? ? ? ? ? ? // 3 在這里配置 通信數(shù)據(jù)的處理邏輯, 可以addLast多個...

? ? ? ? ? ? ? ? ? ? ? ? sc.pipeline().addLast(new ServerHandler());

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

? ? ? ? // 4 綁定端口, bind返回future(異步), 加上sync阻塞在獲取連接處

? ? ? ? ChannelFuture cf1 = b.bind(8765).sync();

? ? ? ? //ChannelFuture cf2 = b.bind(8764).sync();? ?//可以綁定多個端口

? ? ? ? // 5 等待關閉, 加上sync阻塞在關閉請求處

? ? ? ? cf1.channel().closeFuture().sync();

? ? ? ? //cf2.channel().closeFuture().sync();

? ? ? ? pGroup.shutdownGracefully();

? ? ? ? cGroup.shutdownGracefully();

? ? }

}

public class ServerHandler extends ChannelHandlerAdapter {

? ? @Override

? ? public void channelActive(ChannelHandlerContext ctx) throws Exception {

? ? ? ? System.out.println("server channel active... ");

? ? }

? ? @Override

? ? public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

? ? ? ? ? ? ByteBuf buf = (ByteBuf) msg;

? ? ? ? ? ? byte[] req = new byte[buf.readableBytes()];

? ? ? ? ? ? buf.readBytes(req);

? ? ? ? ? ? String body = new String(req, "utf-8");

? ? ? ? ? ? System.out.println("Server :" + body );

? ? ? ? ? ? String response = "返回給客戶端的響應:" + body ;

? ? ? ? ? ? ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));

? ? ? ? ? ? // future完成后觸發(fā)監(jiān)聽器, 此處是寫完即關閉(短連接). 因此需要關閉連接時, 要通過server端關閉. 直接關閉用方法ctx[.channel()].close()

? ? ? ? ? ? //.addListener(ChannelFutureListener.CLOSE);

? ? }

? ? @Override

? ? public void channelReadComplete(ChannelHandlerContext ctx)

? ? ? ? ? ? throws Exception {

? ? ? ? System.out.println("讀完了");

? ? ? ? ctx.flush();

? ? }

? ? @Override

? ? public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)

? ? ? ? ? ? throws Exception {

? ? ? ? ctx.close();

? ? }

}

public class Client {

? ? public static void main(String[] args) throws Exception {


? ? ? ? EventLoopGroup group = new NioEventLoopGroup();

? ? ? ? Bootstrap b = new Bootstrap();

? ? ? ? b.group(group)

? ? ? ? .channel(NioSocketChannel.class)

? ? ? ? .handler(new ChannelInitializer<SocketChannel>() {

? ? ? ? ? ? @Override

? ? ? ? ? ? protected void initChannel(SocketChannel sc) throws Exception {?

? ? ? ? ? ? ? ? sc.pipeline().addLast(new ClientHandler());

? ? ? ? ? ? }

? ? ? ? });


? ? ? ? ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();

? ? ? ? //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();? //可以使用多個端口

? ? ? ? //發(fā)送消息, Buffer類型. write需要flush才發(fā)送, 可用writeFlush代替

? ? ? ? cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));

? ? ? ? cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));

? ? ? ? Thread.sleep(2000);

? ? ? ? cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));

? ? ? ? //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));


? ? ? ? cf1.channel().closeFuture().sync();

? ? ? ? //cf2.channel().closeFuture().sync();

? ? ? ? group.shutdownGracefully();

? ? }

}

public class ClientHandler extends ChannelHandlerAdapter{

? ? @Override

? ? public void channelActive(ChannelHandlerContext ctx) throws Exception {

? ? }

? ? @Override

? ? public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

? ? ? ? try {

? ? ? ? ? ? ByteBuf buf = (ByteBuf) msg;

? ? ? ? ? ? byte[] req = new byte[buf.readableBytes()];

? ? ? ? ? ? buf.readBytes(req);

? ? ? ? ? ? String body = new String(req, "utf-8");

? ? ? ? ? ? System.out.println("Client :" + body );

? ? ? ? } finally {

? ? ? ? ? ? // 記得釋放xxxHandler里面的方法的msg參數(shù): 寫(write)數(shù)據(jù), msg引用將被自動釋放不用手動處理; 但只讀數(shù)據(jù)時,!必須手動釋放引用數(shù)

? ? ? ? ? ? ?ReferenceCountUtil.release(msg);

? ? ? ? }

? ? }

? ? @Override

? ? public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

? ? }

? ? @Override

? ? public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

? ? ? ? ? ? throws Exception {

? ? ? ? ctx.close();

? ? }

}

其他組件:

Handle:?為了支持各種協(xié)議和處理數(shù)據(jù)的方式,可以是連接,數(shù)據(jù)接收,異常,數(shù)據(jù)格式轉換等

ChannelHandler

ChannelInboundHandler?:最常用的Handler,作用是處理接收數(shù)據(jù)的事件,來處理我們的核心業(yè)務邏輯瞧掺。

ChannelInitializer?:局雄,當一個鏈接建立時,我們需要知道怎么來接收或者發(fā)送數(shù)據(jù),當然蒲拉,我們有各種各樣的Handler實現(xiàn)來處理它,那么ChannelInitializer便是用來配置這些Handler,它會提供一個ChannelPipeline,并把Handler加入到ChannelPipeline唬血。

ChannelPipeline?:一個Netty應用基于ChannelPipeline機制,這種機制依賴EventLoop和EventLoopGroup,這三個都和事件或者事件處理相關

EventLoop?: 為Channel處理IO操作,一個EventLoop可以為多個Channel服務

EventLoopGroup?:包含多個EventLoop

Channel?:代表一個Socket連接

Future?:在Netty中所有的IO操作都是異步的,,因此我們不知道,過來的請求是否被處理了,所以我們注冊一個監(jiān)聽,當操作執(zhí)行成功或者失敗時監(jiān)聽自動觸發(fā),所有操作都會返回一個ChannelFutrue

ChannelFuture

Netty?是一個非阻塞的,事件驅動的,網(wǎng)絡編程框架,我們通過一張圖理解一下,Channel,EventLoop以及EventLoopGroup之間的關系

解釋一下,當一個連接過來,Netty首先會注冊一個channel,然后EventLoopGroup會分配一個EventLoop綁定到這個channel,在這個channel的整個生命周期過程中,這個EventLoop一直為他服務,這個玩意就是一個線程

這下聊一下Netty如何處理數(shù)據(jù)?

前面有講到,handler數(shù)據(jù)處理核心,,而ChannelPipeline負責安排Handler的順序和執(zhí)行,我們可以這樣理解,數(shù)據(jù)在ChannelPipeline中流動,其中ChannelHandler就是一個個閥門,這些數(shù)據(jù)都會經(jīng)過每一個ChannelHandler并且被他處理,其中ChannelHandler的兩個子類ChannelOutboundHandler和ChannelInboundHandler,根據(jù)不同的流向,選擇不同的Handler

由圖可以看出,一個數(shù)據(jù)流進入ChannelPipeline時,一個一個handler挨著執(zhí)行,各個handler的數(shù)據(jù)傳遞,這需要調用方法中ChannelHandlerContext來操作,而這個ChannelHandlerContext可以用來讀寫Netty中的數(shù)據(jù)流

三 Netty中的業(yè)務處理

netty中會有很多Handler.具體哪一種Handler還要看繼承是InboundAdapter還是OutboundAdapter,Netty中提供一系列的Adapter來幫助我們簡化開發(fā),在ChannelPipeline中的每一個handler都負責把Event傳遞個洗下一個handler,有這些adapter,這些工作可以自動完成,,我們只需覆蓋我們真正實現(xiàn)的部分即可,接下來比較常用的三種ChannelHandler

Encoders和Decodeers

我們在網(wǎng)絡傳輸只能傳輸字節(jié)流,在發(fā)送數(shù)據(jù)時,把我們的message轉換成bytes這個過程叫Encode(編碼),相反,接收數(shù)據(jù),需要把byte轉換成message,這個過程叫Decode(解碼)

Domain Logic

我們真正關心的如何處理解碼以后的數(shù)據(jù),我們真正的業(yè)務邏輯便是接收處理的數(shù)據(jù),Netty提供一個常用的基類就是SimpleChannelInboundHandler<T>,其中T就是Handler處理的數(shù)據(jù)類型,消息到達這個Handler,會自動調用這個Handler中的channelRead0(ChannelHandlerContext,T)方法,T就是傳過來的數(shù)據(jù)對象

四 基于netty實現(xiàn)的Rpc的例子

這是我的github上項目的位置

https://github.com/developerxiaofeng/rpcByNetty

項目目錄結構如下

歡迎工作一到五年的Java工程師朋友們加入Java架構開發(fā): 855835163

群內(nèi)提供免費的Java架構學習資料(里面有高可用、高并發(fā)唤崭、高性能及分布式拷恨、Jvm性能調優(yōu)、Spring源碼谢肾,MyBatis腕侄,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰芦疏!趁年輕冕杠,使勁拼,給未來的自己一個交代酸茴!

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末分预,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子薪捍,更是在濱河造成了極大的恐慌笼痹,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件酪穿,死亡現(xiàn)場離奇詭異凳干,居然都是意外死亡,警方通過查閱死者的電腦和手機被济,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門救赐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人溉潭,你說我怎么就攤上這事净响。” “怎么了喳瓣?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵馋贤,是天一觀的道長。 經(jīng)常有香客問我畏陕,道長配乓,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮犹芹,結果婚禮上崎页,老公的妹妹穿的比我還像新娘。我一直安慰自己腰埂,他們只是感情好飒焦,可當我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著屿笼,像睡著了一般牺荠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上驴一,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天休雌,我揣著相機與錄音,去河邊找鬼肝断。 笑死杈曲,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的胸懈。 我是一名探鬼主播担扑,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼箫荡!你這毒婦竟也來了魁亦?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤羔挡,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后间唉,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绞灼,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年呈野,在試婚紗的時候發(fā)現(xiàn)自己被綠了低矮。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡被冒,死狀恐怖军掂,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情昨悼,我是刑警寧澤蝗锥,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站率触,受9級特大地震影響终议,放射性物質發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一穴张、第九天 我趴在偏房一處隱蔽的房頂上張望细燎。 院中可真熱鬧,春花似錦皂甘、人聲如沸玻驻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽璧瞬。三九已至,卻和暖如春益老,著一層夾襖步出監(jiān)牢的瞬間彪蓬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工捺萌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留档冬,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓桃纯,卻偏偏與公主長得像酷誓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子态坦,可洞房花燭夜當晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容