IO模型
上述5種IO模型洛波,前4種模型-阻塞IO额获、非阻塞IO、IO復(fù)用、信號(hào)驅(qū)動(dòng)IO都是同步I/O模型安疗,因?yàn)槠渲姓嬲腎/O操作(recvfrom)將阻塞進(jìn)程,在內(nèi)核數(shù)據(jù)copy到用戶空間時(shí)都是阻塞的够委。
一荐类、NIO原理
Netty 是基于Java NIO 封裝的網(wǎng)絡(luò)通訊框架,只有充分理解了 Java NIO 才能理解好Netty的底層設(shè)計(jì)茁帽。Java NIO 由三個(gè)核心組件組件:
Buffer:固定數(shù)量的數(shù)據(jù)的容器玉罐。在 Java NIO 中,任何時(shí)候訪問 NIO 中的數(shù)據(jù)潘拨,都需要通過緩沖區(qū)(Buffer)進(jìn)行操作吊输。NIO 最常用的緩沖區(qū)則是 ByteBuffer。
Channel:是一個(gè)通道铁追,它就像自來水管一樣季蚂,網(wǎng)絡(luò)數(shù)據(jù)通過 Channel 這根水管讀取和寫入。傳統(tǒng)的 IO 是基于流進(jìn)行操作的琅束,Channle 和流類似癣蟋,但又有些不同:
#傳統(tǒng)IO:FileInputStream
public static void method2(){
InputStream in = null;
try{
in = new BufferedInputStream(new FileInputStream("src/nomal_io.txt"));
byte [] buf = new byte[1024];
int bytesRead = in.read(buf);
while(bytesRead != -1)
{
for(int i=0;i<bytesRead;i++)
System.out.print((char)buf[i]);
bytesRead = in.read(buf);
}
}catch (IOException e)
{
e.printStackTrace();
}finally{
try{
if(in != null){
in.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
#NIO
public static void method1(){
RandomAccessFile aFile = null;
try{
aFile = new RandomAccessFile("src/nio.txt","rw");
FileChannel fileChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int bytesRead = fileChannel.read(buf);
System.out.println(bytesRead);
while(bytesRead != -1)
{
buf.flip();
while(buf.hasRemaining())
{
System.out.print((char)buf.get());
}
buf.compact();
bytesRead = fileChannel.read(buf);
}
}catch (IOException e){
e.printStackTrace();
}finally{
try{
if(aFile != null){
aFile.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
//使用Buffer一般遵循下面幾個(gè)步驟:
//分配空間(ByteBuffer buf = ByteBuffer.allocate(1024); 還有一種allocateDirector后面再陳述)
//寫入數(shù)據(jù)到Buffer(int bytesRead = fileChannel.read(buf);)
//調(diào)用filp()方法( buf.flip();)
//從Buffer中讀取數(shù)據(jù)(System.out.print((char)buf.get());)
//調(diào)用clear()方法或者compact()方法
Channel 必須要配合 Buffer 一起使用,通過從 Channel 讀取數(shù)據(jù)到 Buffer 中或者從 Buffer 寫入數(shù)據(jù)到 Channel 中狰闪,如下:
Selector:
多路復(fù)用器 Selector疯搅,它是 Java NIO 編程的基礎(chǔ),Selector 提供了詢問Channel是否已經(jīng)準(zhǔn)備好執(zhí)行每個(gè) I/O 操作的能力埋泵。簡單來講幔欧,Selector 會(huì)不斷地輪詢注冊(cè)在其上的 Channel罪治,如果某個(gè) Channel 上面發(fā)生了讀或者寫事件,這個(gè) Channel 就處于就緒狀態(tài)礁蔗,會(huì)被 Selector 輪詢出來觉义,然后通過 SelectionKey 可以獲取就緒 Channel 的集合,進(jìn)行后續(xù)的 I/O 操作浴井。
- Acceptor為服務(wù)端Channel注冊(cè)Selector晒骇,監(jiān)聽accept事件
- 當(dāng)客戶端連接后,觸發(fā)accept事件
- 服務(wù)器構(gòu)建對(duì)應(yīng)的客戶端Channel磺浙,并在其上注冊(cè)Selector洪囤,監(jiān)聽讀寫事件
- 當(dāng)發(fā)生讀寫事件后,進(jìn)行相應(yīng)的讀寫處理
TCP服務(wù)端實(shí)例-NIO實(shí)現(xiàn)
NIO客戶端代碼(連接)
//獲取socket通道
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
//獲得通道管理器
selector=Selector.open();
channel.connect(new InetSocketAddress(serverIp, port));
//為該通道注冊(cè)SelectionKey.OP_CONNECT事件
channel.register(selector, SelectionKey.OP_CONNECT);
NIO客戶端代碼(監(jiān)聽)
while(true){
//選擇注冊(cè)過的io操作的事件(第一次為SelectionKey.OP_CONNECT)
selector.select();
while(SelectionKey key : selector.selectedKeys()){
if(key.isConnectable()){
SocketChannel channel=(SocketChannel)key.channel();
if(channel.isConnectionPending()){
channel.finishConnect();//如果正在連接撕氧,則完成連接
}
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){ //有可讀數(shù)據(jù)事件瘤缩。
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String message = new String(data);
System.out.println("recevie message from server:, size:"
+ buffer.position() + " msg: " + message);
}
}
}
NIO服務(wù)端代碼(連接)
//獲取一個(gè)ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
//獲取通道管理器
selector = Selector.open();
//將通道管理器與通道綁定,并為該通道注冊(cè)SelectionKey.OP_ACCEPT事件伦泥,
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
NIO服務(wù)端代碼(監(jiān)聽)
while(true){
//當(dāng)有注冊(cè)的事件到達(dá)時(shí)剥啤,方法返回,否則阻塞不脯。
selector.select();
for(SelectionKey key : selector.selectedKeys()){
if(key.isAcceptable()){
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel channel = server.accept();
channel.write(ByteBuffer.wrap(
new String("send message to client").getBytes()));
//在與客戶端連接成功后府怯,為客戶端通道注冊(cè)SelectionKey.OP_READ事件。
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){//有可讀數(shù)據(jù)事件
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(10);
int read = channel.read(buffer);
byte[] data = buffer.array();
String message = new String(data);
System.out.println("receive message from client, size:"
+ buffer.position() + " msg: " + message);
}
}
}
二防楷、netty
1牺丙、netty特點(diǎn)
- 一個(gè)高性能、異步事件驅(qū)動(dòng)的NIO框架域帐,它提供了對(duì)TCP赘被、UDP和文件傳輸?shù)闹С?/li>
- 使用更高效的socket底層,對(duì)epoll空輪詢引起的cpu占用飆升在內(nèi)部進(jìn)行了處理肖揣,避免了直接使用NIO的陷阱民假,簡化了NIO的處理方式。
- 采用多種decoder/encoder 支持龙优,對(duì)TCP粘包/分包進(jìn)行自動(dòng)化處理
- 可使用接受/處理線程池羊异,提高連接效率,對(duì)重連彤断、心跳檢測的簡單支持
- 可配置IO線程數(shù)野舶、TCP參數(shù), TCP接收和發(fā)送緩沖區(qū)使用直接內(nèi)存代替堆內(nèi)存宰衙,通過內(nèi)存池的方式循環(huán)利用ByteBuf
- 通過引用計(jì)數(shù)器及時(shí)申請(qǐng)釋放不再引用的對(duì)象平道,降低了GC頻率
- 使用單線程串行化的方式,高效的Reactor線程模型
- 大量使用了volitale供炼、使用了CAS和原子類一屋、線程安全類的使用窘疮、讀寫鎖的使用
2、netty線程模型
netty基于Reactor模型冀墨,是對(duì)NIO模型的一種改進(jìn)闸衫。
-
單線程Reactor模型
image.png
這個(gè)模型和上面的NIO流程很類似,只是將消息相關(guān)處理獨(dú)立到了Handler中去了诽嘉!雖然上面說到NIO一個(gè)線程就可以支持所有的IO處理蔚出。但是瓶頸也是顯而易見的!我們看一個(gè)客戶端的情況虫腋,如果這個(gè)客戶端多次進(jìn)行請(qǐng)求骄酗,如果在Handler中的處理速度較慢,那么后續(xù)的客戶端請(qǐng)求都會(huì)被積壓岔乔,導(dǎo)致響應(yīng)變慢酥筝!所以引入了Reactor多線程模型!
-
多線程Reactor模型
image.png
Reactor多線程模型就是將Handler中的IO操作和非IO操作分開滚躯,操作IO的線程稱為IO線程雏门,非IO操作的線程稱為工作線程!這樣的話,客戶端的請(qǐng)求會(huì)直接被丟到線程池中掸掏,客戶端發(fā)送請(qǐng)求就不會(huì)堵塞茁影!
3、netty核心組件
netty服務(wù)端 代碼示例
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
//EventLoopGroup繼承線程池ScheduledExecutorService
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);//利用反射構(gòu)造NioServerSocketChannel實(shí)例
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//backlog指定了內(nèi)核為此套接口排隊(duì)的最大連接個(gè)數(shù)
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new LoggingServerHandler());//handler與childHandler不同
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyChannelHandler1());
ch.pipeline().addLast(new MyChannelHandler2());
ch.pipeline().addLast(new MyChannelHandler3());
}
});
ChannelFuture f = bootstrap.bind(port).sync();//bind方法實(shí)現(xiàn)
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
//啟動(dòng)成功
}
});
f.channel().closeFuture().sync();
class MyChannelHandler1 extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
}
Channel
Channel 是 Netty 網(wǎng)絡(luò)操作抽象類丧凤,它除了包括基本的 I/O 操作募闲,如 bind、connect愿待、read浩螺、write 之外,還包括了 Netty 框架相關(guān)的一些功能仍侥。
EventLoop
Netty 基于事件驅(qū)動(dòng)模型要出,使用不同的事件來通知我們狀態(tài)的改變或者操作狀態(tài)的改變。它定義了在整個(gè)連接的生命周期里當(dāng)有事件發(fā)生的時(shí)候處理的核心抽象农渊。
Channel 為Netty 網(wǎng)絡(luò)操作抽象類患蹂,EventLoop 主要是為Channel 處理 I/O 操作,兩者配合參與 I/O 操作砸紊。
上圖為Channel传于、EventLoop、Thread醉顽、EventLoopGroup之間的關(guān)系沼溜。一個(gè) EventLoop 在它的生命周期內(nèi)只能與一個(gè)Thread綁定,一個(gè) EventLoop 可被分配至一個(gè)或多個(gè) Channel 游添,輪流處理系草。
ChannelFuture
Netty 為異步非阻塞弹惦,即所有的 I/O 操作都為異步的,因此悄但,我們不能立刻得知消息是否已經(jīng)被處理了棠隐。Netty 提供了 ChannelFuture 接口,通過該接口的 addListener() 方法注冊(cè)一個(gè) ChannelFutureListener檐嚣,當(dāng)操作執(zhí)行成功或者失敗時(shí)助泽,監(jiān)聽就會(huì)自動(dòng)觸發(fā)返回結(jié)果。
ChannelFuture f = bootstrap.bind(port).sync();
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
//啟動(dòng)成功
}
});
f.channel().closeFuture().sync();
ChannelHandler
ChannelHandler 為 Netty 中最核心的組件嚎京,它充當(dāng)了所有處理入站和出站數(shù)據(jù)的應(yīng)用程序邏輯的容器嗡贺。ChannelHandler 主要用來處理各種事件,這里的事件很廣泛鞍帝,比如可以是連接诫睬、數(shù)據(jù)接收、異常帕涌、數(shù)據(jù)轉(zhuǎn)換等摄凡。
ChannelHandler 有兩個(gè)核心子類 ChannelInboundHandler 和 ChannelOutboundHandler,其中 ChannelInboundHandler 用于接收蚓曼、處理入站數(shù)據(jù)和事件亲澡,而 ChannelOutboundHandler 則相反。
ChannelPipeline
ChannelPipeline 為 ChannelHandler 鏈纫版,提供了一個(gè)容器并定義了用于沿著鏈傳播入站和出站事件流的 API床绪。一個(gè)數(shù)據(jù)或者事件可能會(huì)被多個(gè) Handler 處理,在這個(gè)過程中其弊,數(shù)據(jù)或者事件經(jīng)流 ChannelPipeline癞己,由 ChannelHandler 處理。在這個(gè)處理過程中梭伐,一個(gè) ChannelHandler 接收數(shù)據(jù)后處理完成后交給下一個(gè) ChannelHandler痹雅,或者什么都不做直接交給下一個(gè) ChannelHandler。
當(dāng)一個(gè)數(shù)據(jù)流進(jìn)入 ChannlePipeline 時(shí)籽御,它會(huì)從 ChannelPipeline 頭部開始傳給第一個(gè) ChannelInboundHandler 练慕,當(dāng)?shù)谝粋€(gè)處理完后再傳給下一個(gè),一直傳遞到管道的尾部技掏。與之相對(duì)應(yīng)的是铃将,當(dāng)數(shù)據(jù)被寫出時(shí),它會(huì)從管道的尾部開始哑梳,先經(jīng)過管道尾部的 “最后” 一個(gè)ChannelOutboundHandler劲阎,當(dāng)它處理完成后會(huì)傳遞給前一個(gè) ChannelOutboundHandler 。
附錄