轉自:https://zhuanlan.zhihu.com/p/28910259?utm_source=qq&utm_medium=social
首先我們先來看一下netty中的概念,熟悉了基本概念后再來進行netty高性能的分析闲孤。
netty中的概念
- ChannelEvent
因為Netty是基于事件驅動的,ChannelEvent就相當于某一個事件,比如說連接成功時打印一句話。 - ChannelPipeline
Pipeline意味著管道、傳輸途徑赌渣。也就是說,他是控制ChannelEvent事件分發(fā)和傳遞的昌犹。事件在管道中流轉坚芜,第一站到哪,第二站到哪斜姥,到哪是終點鸿竖,就是用這個ChannelPipeline處理的。比如:開發(fā)某項目的流程是先給A設計铸敏,然后給B開發(fā)缚忧。這一套流程相當于是ChannelPipeline。 - ChannelHandler
剛說Pipeline負責把事件分發(fā)到相應的站點去處理杈笔,這個站點就是指ChannelHandler闪水。事件到了ChannelHandler這里,就要被具體處理了蒙具,也就是說球榆,我們具體的業(yè)務邏輯一般都是從這里開始的。 - Channel
有了各部門的協調處理禁筏,我們還需要一個從整體把握形勢的的部門:channel持钉。它夠告訴你當前通道的狀態(tài),是連通還是關閉篱昔、獲取通道相關的配置信息右钾、得到Pipeline等。Channel的實現類型決定了你這個通道是同步的還是異步的,例如舀射,NioServerSocketChannel窘茁。 - EventLoopGroup
group:群組,Loop:循環(huán)脆烟,Event:事件山林,Netty內部都是通過線程在處理各種數據,EventLoopGroup就是用來管理調度他們的邢羔,相當于線程池驼抹,一般使用NioEventLoopGroup,默認的NioEventLoop個數為cpu核數*2拜鹤。它是4.x版本提出來的一個新概念框冀,類似于3.x版本中的線程。
小demo
上面了解到了netty中的某些概念敏簿,下面通過一個demo來熟悉下netty的使用流程明也。
server端代碼
public class HelloServer {
//服務端監(jiān)聽的端口地址
private static final int port = 9527;
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
//使用主從Reactor多線程模型
b.group(bossGroup, workerGroup);
//使用nio傳輸方式
b.channel(NioServerSocketChannel.class);
//添加我們自己的handler處理流程
b.childHandler(new HelloServerInitializer());
// 服務器綁定端口監(jiān)聽
ChannelFuture f = b.bind(port).sync();
// 監(jiān)聽服務器關閉監(jiān)聽
f.channel().closeFuture().sync();
// 可以簡寫為
/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
HelloServerInitializer :用于定義我們自己的處理流程
public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
/**
* 獲取pipeline,并定義流程
*/
ChannelPipeline pipeline = socketChannel.pipeline();
// 以("\n")為結尾分割的 解碼器
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// 字符串解碼 和 編碼
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 自己的邏輯Handler
pipeline.addLast("handler", new HelloServerHandler());
}
}
HelloServerHandler :server端自己的handler處理邏輯
public class HelloServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 每一次收到消息時執(zhí)行此方法
*
* 字符串最后面的"\n"是必須的惯裕。
* 因為我們在前面的解碼器DelimiterBasedFrameDecoder是一個根據字符串結尾為“\n”來結尾的温数。
* 假如沒有這個字符的話。解碼會出現問題蜻势。
*
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 收到消息直接打印輸出
System.out.println(ctx.channel().remoteAddress() + " Say : " + msg);
// 返回客戶端消息 - 我已經接收到了你的消息
ctx.writeAndFlush("Received your message !\n");
}
/**
* 覆蓋 channelActive 方法
* 在channel被啟用的時候觸發(fā) (在建立連接的時候)
*
* 這里channeActive的意思是當連接活躍(建立)的時候觸發(fā).輸出消息源的遠程地址撑刺。并返回歡迎消息。
*
* 在3.x版本中此處有很大區(qū)別握玛。在3.x版本中write()方法是自動flush的够傍。
* 在4.x版本的前面幾個版本也是一樣的。但是在4.0.9之后修改為WriteAndFlush挠铲。
* 普通的write方法將不會發(fā)送消息王带。需要手動在write之后flush()一次
*
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
ctx.write( "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
ctx.flush();
super.channelActive(ctx);
}
}
server端詳解
server首先綁定了端口9527,接著在main函數開始的位置定義了兩個工作線程group市殷,一個命名為WorkerGroup愕撰,另一個命名為BossGroup,Worker線程用于管理線程為Boss線程服務醋寝,都是實例化NioEventLoopGroup搞挣。
接下來實例化ServerBootstrap、設置group音羞、設置channel為NioServerSocketChannel囱桨,也就是使用nio的傳輸方式。
接著設置childHandler嗅绰,在這里使用實例化一個HelloServerInitializer類來實現舍肠,繼承ChannelInitializer搀继。主要作用是設置相關的字節(jié)解碼編碼器和代碼處理邏輯。Handler的具體分析會在下文列出翠语。
設置好Handler綁定端口7878叽躯,并調用函數sync(),監(jiān)聽端口等待客戶端連接和發(fā)送消息肌括,監(jiān)聽端口關閉点骑。
在main函數的結尾用到了EventLoopGroup提供的便捷的方法shutdownGraceFully,我們可以在DefaultEventExecutorGroup的父類MultithreadEventExecutorGroup中看到它的實現代碼谍夭。關閉了全部EventExecutor數組child里面子元素黑滴。相比于3.x版本這是一個比較重大的改動〗羲鳎可以很輕松的全部關閉袁辈,而不需要擔心出現內存泄露。
client端代碼
public class HelloClient {
public static String host = "127.0.0.1";
public static int port = 9527;
public static void main(String[] args) throws InterruptedException, IOException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//使用nio傳輸方式并綁定自己的處理流程
b.group(group).channel(NioSocketChannel.class).handler(new HelloClientInitializer());
// 連接服務端
Channel ch = b.connect(host, port).sync().channel();
// 控制臺輸入
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
String line = in.readLine();
if (line == null) {
continue;
}
/**
* 向服務端發(fā)送在控制臺輸入的文本 并用"\r\n"結尾
* 之所以用\r\n結尾 是因為我們在handler中添加了 DelimiterBasedFrameDecoder 幀解碼珠漂。
* 這個解碼器是一個根據\n符號位分隔符的解碼器晚缩。所以每條消息的最后必須加上\n否則無法識別和解碼
*/
ch.writeAndFlush(line + "\r\n");
}
} finally {
// The connection is closed automatically on shutdown.
group.shutdownGracefully();
}
}
}
HelloClientInitializer : 我們定義的客戶端處理流程
public class HelloClientInitializer extends ChannelInitializer<SocketChannel> {
/**
* 初始化HelloClientInitializer時執(zhí)行此方法,并定義流程
*/
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
/**
* 這個地方的 必須和服務端對應上甘磨。否則無法正常解碼和編碼
*/
// 以("\n")為結尾分割的 解碼器
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// 字符串解碼 和 編碼
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 自己的邏輯Handler
pipeline.addLast("handler", new HelloClientHandler());
}
}
HelloClientHandler : 客戶端自己的處理邏輯
public class HelloClientHandler extends SimpleChannelInboundHandler<String> {
//服務端發(fā)送信息時觸發(fā)此方法
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("Server say : " + s);
}
//連接成功時觸發(fā)此方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client active ");
super.channelActive(ctx);
}
//服務器關閉時觸發(fā)此方法
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client close ");
super.channelInactive(ctx);
}
}
client端詳解
相比于服務端的代碼,客戶端要精簡一些眯停,客戶端僅僅只需要一個worker的EventLoopGroup
唯一不同的可能就是客戶端的connect方法济舆。服務端的綁定并監(jiān)聽端口,客戶端是連接指定的地址莺债。Sync().channel()是為了返回這個連接服務端的channel滋觉,并用于后面代碼的調用。
當用戶輸入一行內容并回車之后齐邦。循環(huán)的讀取每一行內容椎侠。然后使用writeAndFlush向服務端發(fā)送消息。
netty高性能分析
很早看到一個問題:實現rpc高性能的三個話題措拇,傳輸我纪、協議、線程模型丐吓。下面就先對照代碼浅悉,從這三方面談談netty的設計。
- 傳輸:IO模型在很大程度上決定了框架的性能券犁,相比于bio术健,netty建議采用異步通信模式,因為nio一個線程可以并發(fā)處理N個客戶端連接和讀寫操作粘衬,這從根本上解決了傳統同步阻塞IO一連接一線程模型荞估,架構的性能咳促、彈性伸縮能力和可靠性都得到了極大的提升。正如代碼中所示勘伺,使用的是NioEventLoopGroup和NioSocketChannel來提升傳輸效率跪腹。
- 協議:采用什么樣的通信協議,對系統的性能極其重要娇昙,netty默認提供了對Google Protobuf的支持尺迂,也可以通過擴展Netty的編解碼接口,用戶可以實現其它的高性能序列化框架冒掌。
- 線程:netty使用了Reactor線程模型噪裕,但Reactor模型不同,對性能的影響也非常大股毫,下面介紹常用的Reactor線程模型有三種膳音,分別如下:
- Reactor單線程模型
單線程模型的線程即作為NIO服務端接收客戶端的TCP連接,又作為NIO客戶端向服務端發(fā)起TCP連接铃诬,即讀取通信對端的請求或者應答消息祭陷,又向通信對端發(fā)送消息請求或者應答消息。理論上一個線程可以獨立處理所有IO相關的操作趣席,但一個NIO線程同時處理成百上千的鏈路兵志,性能上無法支撐,即便NIO線程的CPU負荷達到100%宣肚,也無法滿足海量消息的編碼想罕、解碼、讀取和發(fā)送霉涨,又因為當NIO線程負載過重之后按价,處理速度將變慢,這會導致大量客戶端連接超時笙瑟,超時之后往往會進行重發(fā)楼镐,這更加重了NIO線程的負載,最終會導致大量消息積壓和處理超時往枷,NIO線程會成為系統的性能瓶頸框产。 - Reactor多線程模型
有專門一個NIO線程用于監(jiān)聽服務端,接收客戶端的TCP連接請求错洁;網絡IO操作(讀寫)由一個NIO線程池負責茅信,線程池可以采用標準的JDK線程池實現。但百萬客戶端并發(fā)連接時墓臭,一個nio線程用來監(jiān)聽和接受明顯不夠蘸鲸,因此有了主從多線程模型。 - 主從Reactor多線程模型
利用主從NIO線程模型窿锉,可以解決1個服務端監(jiān)聽線程無法有效處理所有客戶端連接的性能不足問題酌摇,即把監(jiān)聽服務端膝舅,接收客戶端的TCP連接請求分給一個線程池。因此窑多,在代碼中可以看到仍稀,我們在server端選擇的就是這種方式,并且也推薦使用該線程模型埂息。在啟動類中創(chuàng)建不同的EventLoopGroup實例并通過適當的參數配置技潘,就可以支持上述三種Reactor線程模型。
因此netty可以作為現今主流的IO通信框架千康。
源碼分析 -- EventLoopGroup
在netty高性能淺析中我們寫了一個小demo享幽,其中服務端和客戶端在最開始都要建立一個或多個 EventLoopGroup,用來管理調度線程處理各種數據拾弃,但它具體是如何實現的呢值桩?讓我們進入源碼詳細看看。
繼承結構
源碼分析
EventExecutorGroup
先來看下中間層的EventExecutorGroup
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
boolean isShuttingDown();
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long var1, long var3, TimeUnit var5);
Future<?> terminationFuture();
/** @deprecated */
@Deprecated
void shutdown();
/** @deprecated */
@Deprecated
List<Runnable> shutdownNow();
EventExecutor next();
Iterator<EventExecutor> iterator();
Future<?> submit(Runnable var1);
<T> Future<T> submit(Runnable var1, T var2);
<T> Future<T> submit(Callable<T> var1);
ScheduledFuture<?> schedule(Runnable var1, long var2, TimeUnit var4);
<V> ScheduledFuture<V> schedule(Callable<V> var1, long var2, TimeUnit var4);
ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable var1, long var2, long var4, TimeUnit var6);
}
這個接口定義比較清晰豪椿,首先它繼承了ScheduleEcecutorService 奔坟,因此可以把它看成是一個Task的執(zhí)行器,另外又實現了Iterable接口搭盾,就可以看出它的目的也可以當成EventExecutor的容器方法名也清晰代表了其意思咳秉,像是一些提交任務,取出可用的EventExecutor鸯隅,關閉執(zhí)行器等
EventExecutor
接著來看EventExecutorGroup的其中一個子類EventExecutor
public interface EventExecutor extends EventExecutorGroup {
EventExecutor next();
EventExecutorGroup parent();
boolean inEventLoop();
boolean inEventLoop(Thread var1);
<V> Promise<V> newPromise();
<V> ProgressivePromise<V> newProgressivePromise();
<V> Future<V> newSucceededFuture(V var1);
<V> Future<V> newFailedFuture(Throwable var1);
}
它繼承了EventExecutorGroup類澜建,因此也可以將它看成是一個任務的執(zhí)行器,不過稍微有點不同的是它的next方法返回的是自己的一個引用滋迈,還有就是可以判斷某線程任務是否被EventLoop執(zhí)行了霎奢。
EventLoopGroup
再來看EventExecutorGroup的另一個子類EventLoopGroup
public interface EventLoopGroup extends EventExecutorGroup {
EventLoop next();
ChannelFuture register(Channel var1);
ChannelFuture register(Channel var1, ChannelPromise var2);
}
它的定義也比較簡單户誓,不過這里的next方法返回的是空閑的EventLoop饼灿,也可以看出它的特有功能就是注冊某channel
EventLoop
最后來看EventLoop的定義:
public interface EventLoop extends EventExecutor, EventLoopGroup {
EventLoopGroup parent();
}
該接口同時繼承了EventExecutor和EventLoopGroup,因此可以看做是容器里的一個線程執(zhí)行器
NioEventLoopGroup
然后我們看一下在上節(jié)demo中用到的EventLoopGroup實現類NioEventLoopGroup帝美,他也是netty推薦使用的EventLoopGroup實現類碍彭,其實NioEventLoopGroup和NioEventLoop都可以。但是前者使用的是線程池
group中默認的NioEventLoop個數為cpu核數*2
最終對于用戶悼潭,一般都以EventLoopGroup作為使用的入口庇忌,也就是所謂的事件循環(huán)皆疹,主要是用于事件的注冊