Netty介紹
Netty是由JBOSS提供的一個java開源框架,是業(yè)界最流行的NIO框架蝙眶,整合了多種協(xié)議(包括FTP失晴、SMTP靡狞、HTTP等各種二進制文本協(xié)議)的實現(xiàn)經(jīng)驗龙致,精心設計的框架对人,在多個大型商業(yè)項目中得到充分驗證孽椰。
那些主流框架產(chǎn)品在用飒筑?
- 搜索引擎框架 ElasticSerach
- Hadopp子項目Avro項目渴丸,使用Netty作為底層通信框架
- 阿里巴巴開源的RPC框架 Dubbo
補充:netty4是dubbo2.5.6后引入的懂盐,2.5.6之前的netty用的是netty3Netty在Dubbo里面使用的地址 https://github.com/apache/incubator-dubbo/tree/master/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4
BIO時間返回器
public class BioServer {
public static final int PORT=3456;
public static void main(String[] args) throws IOException {
ServerSocket server=null;
try {
server=new ServerSocket(PORT);
Socket socket=null;
while (true) {
socket= server.accept();
new Thread(new TimerServerHandler(socket)).start();
}
} catch (Exception e) {
e.printStackTrace();
}finally {
if (server != null) {
server.close();
}
}
}
}
public class TimerServerHandler implements Runnable {
private Socket socket;
public TimerServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body=null;
while ((body = in.readLine()) != null && body.length() != 0) {
System.out.println("客戶端發(fā)送:"+body);
out.println(new Date().toString());
}
} catch (Exception e) {
} finally {
if (in!=null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out!=null) {
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (this.socket != null) {
try {
this.socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class BioClient {
public static final int PORT=3456;
public static void main(String[] args) {
Socket socket=null;
BufferedReader in=null;
PrintWriter out=null;
try {
socket=new Socket("127.0.0.1",PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("i am client");
String s = in.readLine();
System.out.println("服務器當前時間:"+s);
} catch (Exception e) {
} finally {
if (in!=null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out!=null) {
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
BIO優(yōu)缺點
-
優(yōu)點
- 模型簡單
- 編碼簡單
缺點:性能瓶頸冶忱,請求數(shù)和線程數(shù) N:N關(guān)系高并發(fā)情況下袱箱,CPU切換線程上下文損耗大
案例:web服務器Tomcat7之前庆揪,都是使用BIO式曲,7之后就使用NIO
改進:偽NIO,使用線程池去處理業(yè)務邏輯
網(wǎng)絡IO模型
同步異步、堵塞和非堵塞
-
洗衣機洗衣服
- 洗衣機洗衣服(無論阻塞式IO還是非阻塞式IO缸榛,都是同步IO模型)
同步阻塞:你把衣服丟到洗衣機洗吝羞,然后看著洗衣機洗完,洗好后再去晾衣服(你就干等内颗,啥都不做钧排,阻塞在那邊)
同步非阻塞:你把衣服丟到洗衣機洗,然后會客廳做其他事情均澳,定時去陽臺看洗衣機是不是洗完了恨溜,洗好后再去晾衣服,這之間可以干其他事情
異步阻塞: 你把衣服丟到洗衣機洗,然后看著洗衣機洗完找前,洗好后再去晾衣服(幾乎沒這個情況糟袁,幾乎沒這個說法,可以忽略)
異步非阻塞:你把衣服丟到洗衣機洗躺盛,然后會客廳做其他事情项戴,洗衣機洗好后會自動去晾衣服,晾完成后放個音樂告訴你洗好衣服并晾好了
IO詳解
- IO操作分兩步:發(fā)起IO請求等待數(shù)據(jù)準備槽惫,實際IO操作(洗衣服周叮,晾衣服)同步須要主動讀寫數(shù)據(jù),在讀寫數(shù)據(jù)的過程中還是會阻塞(好比晾衣服阻塞了你) 異步僅僅須要I/O操作完畢的通知界斜。并不主動讀寫數(shù)據(jù)则吟,由操作系統(tǒng)內(nèi)核完畢數(shù)據(jù)的讀寫(機器人幫你自動晾衣服)
- 五種IO的模型:阻塞IO、非阻塞IO锄蹂、多路復用IO氓仲、信號驅(qū)動IO和異步IO,前四種都是同步IO,在內(nèi)核數(shù)據(jù)copy到用戶空間時都是阻塞的
權(quán)威:RFC標準,或者書籍 《UNIX Network Programming》中文名《UNIX網(wǎng)絡編程-卷一》第六章
1)阻塞式I/O敬扛;
2)非阻塞式I/O晰洒;
3)I/O復用(select,poll啥箭,epoll...)谍珊;
I/O多路復用是阻塞在select,epoll這樣的系統(tǒng)調(diào)用沒有阻塞在真正的I/O系統(tǒng)調(diào)用如recvfrom進程受阻于select,等待可能多個套接口中的任一個變?yōu)榭勺x
IO多路復用使用兩個系統(tǒng)調(diào)用(select和recvfrom)
blocking IO只調(diào)用了一個系統(tǒng)調(diào)用(recvfrom)
select/epoll 核心是可以同時處理多個connection急侥,而不是更快砌滞,所以連接數(shù)不高的話,性能不一定比多線程+阻塞IO好
多路復用模型中坏怪,每一個socket贝润,設置為non-blocking,
阻塞是在select這
信號驅(qū)動式I/O(SIGIO)
異步I/O(POSIX的aio_系列函數(shù))Future-Listener機制
-
IO操作分為兩步
- 發(fā)起IO請求,等待數(shù)據(jù)準備(Waiting for the data to be ready)
- 實際的IO操作铝宵,將數(shù)據(jù)從內(nèi)核拷貝到進程中(Copying the data from the kernel to the process)
前四種IO模型都是同步IO操作打掘,區(qū)別在于第一階段,而他們的第二階段是一樣的:在數(shù)據(jù)從內(nèi)核復制到應用緩沖區(qū)期間(用戶空間)鹏秋,進程阻塞于recvfrom調(diào)用或者select()函數(shù)尊蚁。相反,異步I/O模型在這兩個階段都要處理侣夷。
阻塞IO和非阻塞IO的區(qū)別在于第一步横朋,發(fā)起IO請求是否會被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞IO百拓,如果不阻塞叶撒,那么就是非阻塞IO。同步IO和異步IO的區(qū)別就在于第二個步驟是否阻塞耐版,如果實際的IO讀寫阻塞請求進程祠够,那么就是同步IO,因此阻塞IO粪牲、非阻塞IO古瓤、IO復用、信號驅(qū)動IO都是同步IO腺阳,如果不阻塞落君,而是操作系統(tǒng)幫你做完IO操作再將結(jié)果返回給你,那么就是異步IO亭引。
幾個核心點:
阻塞非阻塞說的是線程的狀態(tài)(重要)
同步和異步說的是消息的通知機制(重要)
同步需要主動讀寫數(shù)據(jù),異步是不需要主動讀寫數(shù)據(jù)
同步IO和異步IO是針對用戶應用程序和內(nèi)核的交互
異步需要內(nèi)核層次的支持
IO多路復用技術(shù)
什么是IO多路復用:I/O多路復用绎速,I/O是指網(wǎng)絡I/O, 多路指多個TCP連接(即socket或者channel),復用指復用一個或幾個線程焙蚓。簡單來說:就是使用一個或者幾個線程處理多個TCP連接,最大優(yōu)勢是減少系統(tǒng)開銷小纹冤,不必創(chuàng)建過多的進程/線程洒宝,也不必維護這些進程/線程
select:
基本原理:監(jiān)視文件3類描述符: writefds、readfds萌京、和exceptfds,調(diào)用后select
函數(shù)會阻塞住雁歌,等有數(shù)據(jù) 可讀、可寫知残、出異常 或者 超時 就會返回,select函數(shù)正常返回后靠瞎,通過遍歷fdset整個數(shù)組才能發(fā)現(xiàn)哪些句柄發(fā)生了事件,來找到
就緒的描述符fd求妹,然后進行對應的IO操作,幾乎在所有的平臺上支持乏盐,跨平臺支持性好
缺點:
1)select采用輪詢的方式掃描文件描述符,全部掃描制恍,隨著文件描述符FD數(shù)量增多而性能下降
2)每次調(diào)用 select()父能,需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài),并進行遍歷(消息傳遞都是從內(nèi)核到用戶空間)
3)最大的缺陷就是單個進程打開的FD有限制吧趣,默認是1024法竞,這個指的是jvm的限制耙厚,而不是linux的限制(可修改宏定義强挫,但是效率仍然慢)
static final int MAX_FD = 1024
poll:
基本流程:
select() 和 poll() 系統(tǒng)調(diào)用的大體一樣,處理多個描述符也是使用輪詢的方式薛躬,根據(jù)描述符的狀態(tài)進行處理,一樣需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài)俯渤,并進行遍歷。最大區(qū)別是: poll沒有最大文件描述符限制(使用鏈表的方式存儲fd)
select和poll基本沒啥區(qū)別型宝,主要是一個鏈表一個數(shù)組八匠。
Epoll講解
epoll 基本原理:
在2.6內(nèi)核中提出的,對比select和poll趴酣,epoll更加靈活梨树,沒有描述符限制,用戶態(tài)拷貝到內(nèi)核態(tài)只需要一次
使用事件通知岖寞,通過epoll_ctl注冊fd抡四,一旦該fd就緒,內(nèi)核就會采用callback的回調(diào)機制來激活對應的fd
優(yōu)點:
1)沒fd這個限制仗谆,所支持的FD上限是操作系統(tǒng)的最大文件句柄數(shù)指巡,1G內(nèi)存大概支持10萬個句柄
2)效率提高,使用回調(diào)通知而不是輪詢的方式隶垮,不會隨著FD數(shù)目的增加效率下降
3)通過callback機制通知藻雪,內(nèi)核和用戶空間mmap同一塊內(nèi)存實現(xiàn)
Linux內(nèi)核核心函數(shù)
1)epoll_create() 在Linux內(nèi)核里面申請一個文件系統(tǒng) B+樹,返回epoll對象狸吞,也是一個fd
2)epoll_ctl() 操作epoll對象勉耀,在這個對象里面修改添加刪除對應的鏈接fd, 綁定一個callback函數(shù)
3)epoll_wait() 判斷并完成對應的IO操作
缺點:
編程模型比select/poll 復雜
例子:100萬個連接指煎,里面有1萬個連接是活躍,在 select瑰排、poll贯要、epoll分別是怎樣的表現(xiàn)
select:不修改宏定義,則需要 1000個進程才可以支持 100萬連接
poll:100萬個鏈接椭住,遍歷都響應不過來了崇渗,還有空間的拷貝消耗大量的資源
epoll:通過回調(diào)通知,性能相比之下提升很大
Java的I/O演進歷史
- jdk1.4之前是采用同步阻塞模型京郑,也就是BIO 大型服務一般采用C或者C++, 因為可以直接操作系統(tǒng)提供的異步IO,AIO
- jdk1.4推出NIO,支持非阻塞IO宅广,jdk1.7升級,推出NIO2.0,提供AIO的功能,支持文件和網(wǎng)絡套接字的異步IO
Netty線程模型和Reactor模式
- 設計模式——Reactor模式(反應器設計模式)些举,是一種基于事件驅(qū)動的設計模式跟狱,在事件驅(qū)動的應用中,將一個或多個客戶的服務請求分離(demultiplex)和調(diào)度(dispatch)給應用程序户魏。在事件驅(qū)動的應用中驶臊,同步地、有序地處理同時接收的多個服務請求一般出現(xiàn)在高并發(fā)系統(tǒng)中叼丑,比如Netty关翎,Redis等
- 優(yōu)點
- 1)響應快,不會因為單個同步而阻塞鸠信,雖然Reactor本身依然是同步的
- 2)編程相對簡單纵寝,最大程度的避免復雜的多線程及同步問題,并且避免了多線程/進程的切換開銷星立;
- 3)可擴展性爽茴,可以方便的通過增加Reactor實例個數(shù)來充分利用CPU資源;
- 缺點
- 1)相比傳統(tǒng)的簡單模型绰垂,Reactor增加了一定的復雜性室奏,因而有一定的門檻,并且不易于調(diào)試劲装。
- 2)Reactor模式需要系統(tǒng)底層的的支持胧沫,比如Java中的Selector支持,操作系統(tǒng)的select系統(tǒng)調(diào)用支持
- 通俗理解:KTV例子前臺接待酱畅,服務人員帶領去開機器
- Reactor模式基于事件驅(qū)動琳袄,適合處理海量的I/O事件,屬于同步非阻塞IO(NIO)
- Reactor單線程模型(比較少用)
- 1)作為NIO服務端纺酸,接收客戶端的TCP連接窖逗;作為NIO客戶端,向服務端發(fā)起TCP連接餐蔬;
- 2)服務端讀請求數(shù)據(jù)并響應碎紊;客戶端寫請求并讀取響應
使用場景: 對應小業(yè)務則適合佑附,編碼簡單;對于高負載仗考、大并發(fā)的應用場景不適合音同,一個NIO線程處理太多請求,則負載過高秃嗜,并且可能響應變慢权均,導致大量請求超時,而且萬一線程掛了锅锨,則不可用了
- Reactor多線程模型
- 內(nèi)容:Acceptor不在是一個線程叽赊,而是一組NIO線程;IO線程也是一組NIO線程必搞,這樣就是兩個線程池去處理接入連接和處理IO
- 使用場景:滿足目前的大部分場景必指,也是Netty推薦使用的線程模型
實際上的Reactor模式,是基于Java NIO的恕洲,在他的基礎上塔橡,抽象出來兩個組件——Reactor和Handler兩個組件:
(1)Reactor:負責響應IO事件,當檢測到一個新的事件霜第,將其發(fā)送給相應的Handler去處理葛家;新的事件包含連接建立就緒、讀就緒庶诡、寫就緒等惦银。
(2)Handler:將自身(handler)與事件綁定咆课,負責事件的處理末誓,完成channel的讀入,完成處理業(yè)務邏輯后书蚪,負責將結(jié)果寫出channel喇澡。
總結(jié):上面的單線程Reactor其實就可以看著一個特殊的handler。而多線程Reactor則分為兩部分殊校,一部分是Reactor(可以為多線程晴玖,線程組或者單線程),而handler也就是上面說的IO線程为流,必須是線程組或者多線程呕屎。
附屬資料:
為什么Netty使用NIO而不是AIO,是同步非阻塞還是異步非阻塞敬察?
答案:
在Linux系統(tǒng)上秀睛,AIO的底層實現(xiàn)仍使用EPOLL,與NIO相同莲祸,因此在性能上沒有明顯的優(yōu)勢
Netty整體架構(gòu)是reactor模型蹂安,采用epoll機制椭迎,所以往深的說,還是IO多路復用模式田盈,所以也可說netty是同步非阻塞模型(看的層次不一樣)
很多人說這是netty是基于Java NIO 類庫實現(xiàn)的異步通訊框架
特點:異步非阻塞畜号、基于事件驅(qū)動,性能高允瞧,高可靠性和高可定制性简软。
參考資料:
https://github.com/netty/netty/issues/2515
基于netty搭建echo服務
常用服務組件
- EventLoop和EventLoopGroup
- Bootstrapt啟動引導類
- Channel 生命周期,狀態(tài)變化
- ChannelHandler和ChannelPipline
代碼
public class EchoServer {
private int port;
public EchoServer(int port) {
this.port = port;
}
/**
* 啟動流程
*/
public void run() throws InterruptedException {
//配置服務端線程組
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
try {
//啟動類
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.option(ChannelOption.TCP_NODELAY,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//串聯(lián)很多要處理的handler
ch.pipeline().addLast(new EchoHandler());
}
});
//綁定端口述暂,同步等待成功
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//等待服務端監(jiān)聽端口關(guān)閉
channelFuture.channel().closeFuture().sync();
}finally {
//優(yōu)雅退出替饿,釋放線程池
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port=8080;
if (args.length > 0) {
port=Integer.parseInt(args[0]);
}
new EchoServer(port).run();
}
}
public class EchoHandler extends ChannelInboundHandlerAdapter {
//讀取數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Channel channel = ctx.channel();
// channel.writeAndFlush()
// ChannelPipeline pipeline = ctx.pipeline();
// pipeline.writeAndFlush()
ByteBuf data= (ByteBuf) msg;
System.out.println("服務端收到數(shù)據(jù):"+data.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(data);
}
//讀取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("EchoServerHandler channelReadComplete");
}
//異常捕獲
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();//關(guān)閉管道
}
}
public class EchoClient {
private String host;
private int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
//https://blog.csdn.net/fd2025/article/details/79740226
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new EchoClientHandler());
}
});
//連接到服務端,connect是異步連接贸典,再調(diào)用同步async,等待連接成功從
ChannelFuture channelFuture = bootstrap.connect().sync();
//阻塞视卢,直到客戶端通道關(guān)閉
channelFuture.channel().closeFuture().sync();
} finally {
//優(yōu)雅退出,釋放nio線程
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient("127.0.0.1", 8080).start();
}
}
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
System.out.println("Client Received: "+msg.toString(CharsetUtil.UTF_8));
}
//channel激活的時候
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
ctx.writeAndFlush(Unpooled.copiedBuffer("哈哈測試",CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("EchoClientHandler Complate");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.xdclass</groupId>
<artifactId>echo-project</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
</dependencies>
</project>
Netty的核心鏈路源碼
剖析EventLoop和EventLoopGroup線程模型
- 高性能RPC框架的3個要素:IO模型(linux的IO模型五種)廊驼、數(shù)據(jù)協(xié)議(http,rpc等)据过、線程模型
線程模型
1. 傳統(tǒng)IO模型:
每個請求都分配一個線程用來處理該請求,關(guān)于該請求
的read妒挎,handle绳锅,和send都放在一個線程中進行處理
2. 基于線程池的偽異步IO模型
針對傳統(tǒng)IO模型中會造成線程資源極大浪費的缺點,通
過線程池來復用線程處理客戶端連接和數(shù)據(jù)處理.
* 會有一個阻塞線程負責socket連接酝掩,即acceptor鳞芙;
*會有一個線程池維護n個活躍線程和一個消息隊列,來
處理socketTask期虾,所以資源是可控的原朝,所以無論客戶端
多少并發(fā)連接,都會導致系統(tǒng)資源耗盡和宕機镶苞;
缺點:
- 無法解決通信阻塞的問題喳坠,因為socket.read()方法是
流式數(shù)據(jù)讀取,因此只能讀取完所有數(shù)據(jù)后才能正確處理茂蚓,如果一個socket發(fā)送數(shù)據(jù)需要60秒那么該線程處理數(shù)
據(jù)至少要60秒壕鹉,那么這段時間內(nèi)的io事件,該線程是
無法及時處理的聋涨,如果這樣的io事件出現(xiàn)多次晾浴,很可
能造成消息隊列阻塞;
- 只有一個acceptor負責socket連接牍白,如果線程池阻塞隊列阻塞之后脊凰,那么所有新的客戶端連接也將會被拒絕;如果大量連接拒絕淹朋,就可能會認定為系統(tǒng)故障笙各;
3. Reactor模型(實時響應)
前面已經(jīng)講過這個模型钉答;
IO復用結(jié)合線程池復用就是Reactor模型設計的基本思想
總結(jié):線程模型其實就是IO模型的相關(guān)運用,可能還會搭配線程池服用杈抢,例如Reactor模型
EventLoop好比一個線程数尿,1個EventLoop可以服務多個Channel,1個Channel只有一個EventLoop可以創(chuàng)建多個 EventLoop 來優(yōu)化資源利用惶楼,也就是EventLoopGroup
-
EventLoopGroup 負責分配 EventLoop 到新創(chuàng)建的 Channel右蹦,里面包含多個EventLoop
- EventLoopGroup -> 多個 EventLoop
- EventLoop -> 維護一個Selector(其實就是遍歷器)
- 學習資料:http://ifeve.com/selectors/
EventLoopGroup默認線程池數(shù)量是系統(tǒng)核數(shù)*2
Bootstrap模塊講解
- 服務器啟動引導類ServerBootstrap
- group :設置線程組模型,Reactor線程模型對比EventLoopGroup
- 單線程
- 多線程
- 主從線程
- 參考:https://blog.csdn.net/QH_JAVA/article/details/78443646
- group :設置線程組模型,Reactor線程模型對比EventLoopGroup
- channel:
設置channel通道類型NioServerSocketChannel歼捐、OioServerSocketChannel
-
option: 作用于每個新建立的channel何陆,設置TCP連接中的一些參數(shù),如下
- ChannelOption.SO_BACKLOG: 存放已完成三次握手的請求的等待隊列的最大長度;
- Linux服務器TCP連接底層知識:
- syn queue:半連接隊列,洪水攻擊豹储,tcp_max_syn_backlog
- accept queue:全連接隊列贷盲, net.core.somaxconn
- 系統(tǒng)默認的somaxconn參數(shù)要足夠大 ,如果backlog比somaxconn大剥扣,則會優(yōu)先用后者 https://github.com/netty/netty/blob/4.1/common/src/main/java/io/netty/util/NetUtil.java#L250
- ChannelOption.TCP_NODELAY: 為了解決Nagle的算法問題巩剖,默認是false, 要求高實時性,有數(shù)據(jù)時馬上發(fā)送钠怯,就將該選項設置為true關(guān)閉Nagle算法佳魔;如果要減少發(fā)送次數(shù),就設置為false晦炊,會累積一定大小后再發(fā)送
- 知識拓展: https://baike.baidu.com/item/Nagle%E7%AE%97%E6%B3%95/5645172 https://www.2cto.com/article/201309/241096.html
childOption: 作用于被accept之后的連接
childHandler: 用于對每個通道里面的數(shù)據(jù)處理
粗略的理解為option是給bossGroup配置的鞠鲜,childOption是給workerGroup配置的;這兩個線程組對應reactor模型的Acceptor和handler
- 客戶端啟動引導類Bootstrap
- remoteAddress: 服務端地址
- handler:和服務端通信的處理器
Channel模塊
- 什么是Channel: 客戶端和服務端建立的一個連接通道
- 什么是ChannelHandler: 負責Channel的邏輯處理
- 什么是ChannelPipeline:負責管理ChannelHandler的有序容器
- 他們是什么關(guān)系
一個Channel包含一個ChannelPipeline,所有ChannelHandler都會順序加入到ChannelPipeline中 創(chuàng)建Channel時會自動創(chuàng)建一個ChannelPipeline断国,每個Channel都有一個管理它的pipeline贤姆,這關(guān)聯(lián)是永久性的
- Channel當狀態(tài)出現(xiàn)變化,就會觸發(fā)對應的事件
- 狀態(tài):
- channelRegistered: channel注冊到一個EventLoop
- channelActive: 變?yōu)榛钴S狀態(tài)(連接到了遠程主機)并思,可以接受和發(fā)送數(shù)據(jù)
- channelInactive: channel處于非活躍狀態(tài)庐氮,沒有連接到遠程主機
- channelUnregistered: channel已經(jīng)創(chuàng)建语稠,但是未注冊到一個EventLoop里面宋彼,也就是沒有和Selector綁定
- 狀態(tài):
特別注意:執(zhí)行順序channelRegistered-》channelActive=》channelInactive=》channelUnregistered
ChannelHandler和ChannelPipeline模塊講解
- 方法: handlerAdded : 當 ChannelHandler 添加到 ChannelPipeline 調(diào)用; handlerRemoved : 當 ChannelHandler 從 ChannelPipeline 移除時調(diào)用; exceptionCaught : 執(zhí)行拋出異常時調(diào)用;
- ChannelHandler下主要是兩個子接口
- ChannelInboundHandler:(入站) 處理輸入數(shù)據(jù)和Channel狀態(tài)類型改變, 適配器ChannelInboundHandlerAdapter(適配器設計模式) 常用的:SimpleChannelInboundHandler
- ChannelOutboundHandler:(出站) 處理輸出數(shù)據(jù)仙畦,適配器ChannelOutboundHandlerAdapter
- ChannelPipeline: 好比廠里的流水線一樣输涕,可以在上面添加多個ChannelHandler,也可看成是一串 ChannelHandler實例慨畸,攔截穿過 Channel 的輸入輸出 event,ChannelPipeline實現(xiàn)了攔截器的一種高級形式莱坎,使得用戶可以對事件的處理以及ChannelHanler之間交互獲得完全的控制權(quán)
ChannelHandlerContext模塊
- ChannelHandlerContext是連接ChannelHandler和ChannelPipeline的橋梁,ChannelHandlerContext部分方法和Channel及ChannelPipeline重合,好比調(diào)用write方法
- Channel寸士、ChannelPipeline檐什、ChannelHandlerContext 都可以調(diào)用此方法碴卧,前兩者都會在整個管道流里傳播,而ChannelHandlerContext就只會在后續(xù)的Handler里面?zhèn)鞑?/li>
- AbstractChannelHandlerContext類雙向鏈表結(jié)構(gòu)乃正,next/prev分別是后繼節(jié)點住册,和前驅(qū)節(jié)點
- DefaultChannelHandlerContext 是實現(xiàn)類,但是大部分都是父類那邊完成瓮具,這個只是簡單的實現(xiàn)一些方法 主要就是判斷Handler的類型
- ChannelInboundHandler之間的傳遞荧飞,主要通過調(diào)用ctx里面的FireXXX()方法來實現(xiàn)下個handler的調(diào)用
入站出站Handler執(zhí)行順序
- InboundHandler順序執(zhí)行,OutboundHandler逆序執(zhí)行
- InboundHandler之間傳遞數(shù)據(jù)名党,通過ctx.fireChannelRead(msg)
- InboundHandler通過ctx.write(msg)叹阔,則會傳遞到outboundHandler
- 使用ctx.write(msg)傳遞消息,Inbound需要放在結(jié)尾传睹,在Outbound之后耳幢,不然outboundhandler會不執(zhí)行;但是使用channel.write(msg)欧啤、pipline.write(msg)情況會不一致帅掘,outboundhandler都會執(zhí)行
- outBound和Inbound誰先執(zhí)行,針對客戶端和服務端而言堂油,客戶端是發(fā)起請求再接受數(shù)據(jù)修档,先outbound再inbound,服務端則相反
總結(jié):需要保證最后一個outhandler的的上下文可以有next的指向府框,否則最后一個outhandler就不會執(zhí)行了吱窝,也就是說最后一個inhanlder之后的outhandler都不會執(zhí)行。所以一般最后都要有一個inhandler迫靖。
模塊ChannelFuture
- Netty中的所有I/O操作都是異步的,這意味著任何I/O調(diào)用都會立即返回院峡,而ChannelFuture會提供有關(guān)的信息I/O操作的結(jié)果或狀態(tài)。
- ChannelFuture狀態(tài)
- 未完成:當I/O操作開始時系宜,將創(chuàng)建一個新的對象照激,新的最初是未完成的 - 它既沒有成功,也沒有成功盹牧,也沒有被取消俩垃,因為I/O操作尚未完成。
- 已完成:當I/O操作完成汰寓,不管是成功口柳、失敗還是取消,F(xiàn)uture都是標記為已完成的, 失敗的時候也有具體的信息有滑,例如原因失敗跃闹,但請注意,即使失敗和取消屬于完成狀態(tài)
- 注意:不要在IO線程內(nèi)調(diào)用future對象的sync或者await方法。不能在channelHandler中調(diào)用sync或者await方法望艺,會阻塞
- ChannelPromise:繼承于ChannelFuture苛秕,進一步拓展用于設置IO操作的結(jié)果
Netty網(wǎng)絡數(shù)據(jù)傳輸編解碼
- 最開始接觸的編碼碼:java序列化/反序列化(就是編解碼)、url編碼找默、base64編解碼
- 為啥jdk有編解碼想帅,還要netty自己開發(fā)編解碼?
- java自帶序列化的缺點
1)無法跨語言
2) 序列化后的碼流太大啡莉,也就是數(shù)據(jù)包太大
3) 序列化和反序列化性能比較差
- 業(yè)界里面也有其他編碼框架: google的 protobuf(PB)港准、Facebook的Trift、Jboss的Marshalling咧欣、Kyro等
- Netty里面的編解碼:
- 解碼器:負責處理“入站 InboundHandler”數(shù)據(jù)
- 編碼器:負責“出站 OutboundHandler” 數(shù)據(jù)
- Netty里面提供默認的編解碼器浅缸,也支持自定義編解碼器
- Encoder:編碼器
- Decoder:解碼器
- Codec:編解碼器
解碼器Decoder
- Decoder對應的就是ChannelInboundHandler,主要就是字節(jié)數(shù)組轉(zhuǎn)換為消息對象
- 主要是兩個方法 decode decodeLast
- 抽象解碼器
- ByteToMessageDecoder用于將字節(jié)轉(zhuǎn)為消息魄咕,需要檢查緩沖區(qū)是否有足夠的字節(jié)
- ReplayingDecoder繼承ByteToMessageDecoder衩椒,不需要檢查緩沖區(qū)是否有足夠的字節(jié),但是ReplayingDecoder速度略滿于ByteToMessageDecoder哮兰,不是所有的ByteBuf都支持
- 選擇:項目復雜性高則使用ReplayingDecoder毛萌,否則使用 ByteToMessageDecoder
- MessageToMessageDecoder用于從一種消息解碼為另外一種消息(例如POJO到POJO)
- 解碼器具體的實現(xiàn),用的比較多的是(更多是為了解決TCP底層的粘包和拆包問題)
- DelimiterBasedFrameDecoder: 指定消息分隔符的解碼器
- LineBasedFrameDecoder: 以換行符為結(jié)束標志的解碼器
- FixedLengthFrameDecoder:固定長度解碼器
- LengthFieldBasedFrameDecoder:message = header+body, 基于長度解碼的通用解碼器
- StringDecoder:文本解碼器喝滞,將接收到的對象轉(zhuǎn)化為字符串阁将,一般會與上面的進行配合,然后在后面添加業(yè)務handle
編碼器Encoder
- Encoder對應的就是ChannelOutboundHandler右遭,消息對象轉(zhuǎn)換為字節(jié)數(shù)組
- Netty本身未提供和解碼一樣的編碼器做盅,是因為場景不同,兩者非對等的(也就是不見得是一對一的關(guān)系)
- MessageToByteEncoder消息轉(zhuǎn)為字節(jié)數(shù)組,調(diào)用write方法窘哈,會先判斷當前編碼器是否支持需要發(fā)送的消息類型吹榴,如果不支持,則透傳滚婉;
- MessageToMessageEncoder用于從一種消息編碼為另外一種消息(例如POJO到POJO)
編解碼器類Codec
組合解碼器和編碼器图筹,以此提供對于字節(jié)和消息都相同的操作
優(yōu)點:成對出現(xiàn),編解碼都是在一個類里面完成
缺點:耦合在一起让腹,拓展性不佳
Codec:組合編解碼
1)ByteToMessageCodec
2)MessageToMessageCodec
decoder:解碼
1)ByteToMessageDecoder
2)MessageToMessageDecoder
encoder:編碼
1)ByteToMessageEncoder
2)MessageToMessageEncoder
TCP粘包拆包
什么是粘包拆包
1)TCP拆包: 一個完整的包可能會被TCP拆分為多個包進行發(fā)送
2)TCP粘包: 把多個小的包封裝成一個大的數(shù)據(jù)包發(fā)送, client發(fā)送的若干數(shù)據(jù)包 Server接收時粘成一包
發(fā)送方和接收方都可能出現(xiàn)這個原因
發(fā)送方的原因:TCP默認會使用Nagle算法
接收方的原因: TCP接收到數(shù)據(jù)放置緩存中远剩,應用程序從緩存中讀取
UDP: 是沒有粘包和拆包的問題,有邊界協(xié)議
TCP半包讀寫常見解決方案
發(fā)送方:可以關(guān)閉Nagle算法
接受方: TCP是無界的數(shù)據(jù)流哨鸭,并沒有處理粘包現(xiàn)象的機制, 且協(xié)議本身無法避免粘包民宿,半包讀寫的發(fā)生需要在應用層進行處理
應用層解決半包讀寫的辦法
1)設置定長消息 (10字符)
xdclass000xdclass000xdclass000xdclass000
2)設置消息的邊界 ($$ 切割)
sdfafwefqwefwe$$dsafadfadsfwqehidwuehfiw$$879329832r89qweew$$
3)使用帶消息頭的協(xié)議,消息頭存儲消息開始標識及消息的長度信息
Header+Body
Netty自帶解決TCP半包讀寫方案
DelimiterBasedFrameDecoder: 指定消息分隔符的解碼器
- LineBasedFrameDecoder:以換行符為結(jié)束標志的解碼器
- FixedLengthFrameDecoder:固定長度解碼器
- LengthFieldBasedFrameDecoder:message = header+body, 基于長度解碼的通用解碼器
public void run() throws Exception{
//配置服務端的線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ServerHandler());
}
});
System.out.println("Echo 服務器啟動");
//綁定端口像鸡,同步等待成功
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//等待服務端監(jiān)聽端口關(guān)閉
channelFuture.channel().closeFuture().sync();
}finally {
//優(yōu)雅退出,釋放線程池
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
LineBasedFrameDecoder解決TCP半包讀寫
- LineBaseFrameDecoder 以換行符為結(jié)束標志的解碼器 ,構(gòu)造函數(shù)里面的數(shù)字表示最長遍歷的幀數(shù)
- StringDecoder解碼器將對象轉(zhuǎn)成字符串
自定義分隔符解決TCP讀寫問題
- maxLength:表示一行最大的長度,如果超過這個長度依然沒有檢測自定義分隔符只估,將會拋出TooLongFrameException
- failFast:如果為true志群,則超出maxLength后立即拋出TooLongFrameException,不進行繼續(xù)解碼.如果為false蛔钙,則等到完整的消息被解碼后锌云,再拋出TooLongFrameException異常
- stripDelimiter:解碼后的消息是否去除掉分隔符
- delimiters:分隔符,ByteBuf類型
自定義長度半包讀寫器LengthFieldBasedFrameDecoder
maxFrameLength 數(shù)據(jù)包的最大長度
lengthFieldOffset 長度字段的偏移位吁脱,長度字段開始的地方桑涎,意思是跳過指定長度個字節(jié)之后的才是消息體字段
lengthFieldLength 長度字段占的字節(jié)數(shù), 幀數(shù)據(jù)長度的字段本身的長度
lengthAdjustment
一般 Header + Body,添加到長度字段的補償值,如果為負數(shù)兼贡,開發(fā)人員認為這個 Header的長度字段是整個消息包的長度攻冷,則Netty應該減去對應的數(shù)字
initialBytesToStrip 從解碼幀中第一次去除的字節(jié)數(shù), 獲取完一個完整的數(shù)據(jù)包之后,忽略前面的指定位數(shù)的長度字節(jié)遍希,應用解碼器拿到的就是不帶長度域的數(shù)據(jù)包
failFast 是否快速失敗
緩沖ByteBuf
ByteBuf是為解決ByteBuffer的問題和滿足網(wǎng)絡應用程序開發(fā)人員的日常需求而設計的
JDK ByteBuffer的缺點:
無法動態(tài)擴容:長度固定等曼,不能動態(tài)擴展和收縮,當數(shù)據(jù)大于ByteBuffer容量時凿蒜,會發(fā)生索引越界異常
API使用復雜:讀寫的時候需要手工調(diào)用flip()和rewind()等方法禁谦,使用時需要非常謹慎的使用這些API,否則很容易出現(xiàn)錯誤
ByteBuf:是數(shù)據(jù)容器(字節(jié)容器)
JDK ByteBuffer:共用讀寫索引废封,每次讀寫操作都需要Flip(復位州泊,因為讀索引和寫索引是同一個)擴容麻煩,而且擴容后容易造成浪費
Netty ByteBuf: 讀寫使用不同的索引漂洋,所以操作便捷自動擴容平斩,使用便捷
增強
- API操作便捷性
- 動態(tài)擴容
- 多種ByteBuf實現(xiàn)
- 高效的零拷貝機制
ByteBuf操作
ByteBuf動態(tài)擴容
capacity默認值:256字節(jié),最大值:Integet.MAX_VALUE(2GB)
write*方法調(diào)用時终畅,通過AbstractByteBuf.ensureWritable0進行檢查
容量計算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求砸捏,capacity最大值)
根據(jù)新capacity的最小值要求,對應有兩套計算方法:
沒超過4M:從64字節(jié)開始爽冕,每次增加一倍仇祭,直至計算出來的newCpacity滿足新容量最小要求
示例:當前大小256,寫250颈畸,繼續(xù)寫10字節(jié)數(shù)據(jù)乌奇,需要的容量最小要求是261,則新容量是6422*2=512
超過4M:新容量=新容量最小要求/4M*4M+4M
示例:當前大小3M眯娱,已寫3M礁苗,繼續(xù)寫2M數(shù)據(jù),需要的容量最小要求是5M徙缴,則新容量是9M(不能超過最大值)
4M的來源:一個固定的閾值AbstractByteBufAllocator.CALCULATE_THRESHOLD
ByteBuf實現(xiàn)
所謂池化试伙,其實就是內(nèi)存復用
Unsafe的實現(xiàn)
PooledByteBuf對象、內(nèi)存復用
零拷貝機制
Netty的零拷貝機制,是一種應用層的實現(xiàn)疏叨。和底層JVM潘靖、操作系統(tǒng)內(nèi)存機制并無過多的關(guān)聯(lián)。
使用ByteBuf時netty高性能很重喲的一個原因蚤蔓。
說明:例如2.就是buffer持有array的引用卦溢,實際上數(shù)據(jù)沒動,3也是秀又,數(shù)據(jù)沒動单寂,只是其中l(wèi)l的引用被buffer持有;還有1吐辙,如果是常規(guī)jdk的數(shù)組合并宣决,其實是拷貝數(shù)據(jù),同時新開內(nèi)存生成新的數(shù)組
ByteBuf創(chuàng)建方法和常用的模式
ByteBuf:傳遞字節(jié)數(shù)據(jù)的容器
ByteBuf的創(chuàng)建方法
1)ByteBufAllocator
池化(Netty4.x版本后默認使用 PooledByteBufAllocator提高性能并且最大程度減少內(nèi)存碎片
非池化UnpooledByteBufAllocator: 每次返回新的實例
2)Unpooled: 提供靜態(tài)方法創(chuàng)建未池化的ByteBuf袱讹,可以創(chuàng)建堆內(nèi)存和直接內(nèi)存緩沖區(qū)
?
ByteBuf使用模式
堆緩存區(qū)HEAP BUFFER:
優(yōu)點:存儲在JVM的堆空間中疲扎,可以快速的分配和釋放
缺點:每次使用前會拷貝到直接緩存區(qū)(也叫堆外內(nèi)存)
直接緩存區(qū)DIRECR BUFFER:
優(yōu)點:存儲在堆外內(nèi)存上,堆外分配的直接內(nèi)存捷雕,不會占用堆空間
缺點:內(nèi)存的分配和釋放椒丧,比在堆緩沖區(qū)更復雜
復合緩沖區(qū)COMPOSITE BUFFER:
可以創(chuàng)建多個不同的ByteBuf,然后放在一起救巷,但是只是一個視圖
選擇:大量IO數(shù)據(jù)讀寫壶熏,用“直接緩存區(qū)”; 業(yè)務消息編解碼用“堆緩存區(qū)”
Netty內(nèi)部設計模式
Builder構(gòu)造器模式:ServerBootstap
責任鏈設計模式:pipeline的事件傳播
工廠模式: 創(chuàng)建Channel
適配器模式:HandlerAdapter
單機百萬連接
必備知識
- 網(wǎng)絡IO模型
- Linux文件描述符
- 單進程文件句柄數(shù)(默認1024浦译,不同系統(tǒng)不一樣棒假,每個進程都有最大的文件描述符限制)
- 全局文件句柄數(shù)
- 如何確定一個唯一的TCP連接.
- TCP四元組:源IP地址、源端口精盅、目的ip帽哑、目的端口
Netty單機百萬連接Linux內(nèi)核參數(shù)優(yōu)化
局部文件句柄限制(單個進程最大文件打開數(shù))
ulimit -n 一個進程最大打開的文件數(shù) fd 不同系統(tǒng)有不同的默認值
root身份編輯 vim /etc/security/limits.conf
增加下面
root soft nofile 1000000
root hard nofile 1000000
* soft nofile 1000000
* hard nofile 1000000
* 表示當前用戶,修改后要重啟
全局文件句柄限制(所有進程最大打開的文件數(shù)叹俏,不同系統(tǒng)是不一樣妻枕,可以直接echo臨時修改)
查看命令
cat /proc/sys/fs/file-max
永久修改全局文件句柄, 修改后生效 sysctl -p
vim /etc/sysctl.conf
增加 fs.file-max = 1000000
啟動
java -jar millionServer-1.0-SNAPSHOT.jar -Xms5g -Xmx5g -XX:NewSize=3g -XX:MaxNewSize=3g