參考自大神文章檩赢,大神原文貼上http://www.reibang.com/p/c5068caab217
介紹
netty是一款基于NIO的高性能網(wǎng)絡(luò)框架前计,它對JDK中的NIO做了封裝和優(yōu)化吱七,提供了更好的性能的同時(shí)埋心,降低了使用的難度正塌。netty支持NIO中的select嘀略、poll、epoll(僅Linux)等乓诽。關(guān)于這三者及BIO帜羊、NIO、AIO的介紹請看https://segmentfault.com/a/1190000003063859
同時(shí)鸠天,netty還支持HTTP讼育、HTTPS、websocket協(xié)議稠集,并提供了開箱即用的一些基類奶段。
官網(wǎng)服務(wù)端最佳實(shí)踐
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
- NioEventLoopGroup是一個(gè)多線程事件驅(qū)動(dòng)IO操作類。netty對不同的傳輸提供了多樣的EventLoopGroup實(shí)現(xiàn)剥纷,我們這里是將netty作為服務(wù)端使用痹籍,因此這里有兩個(gè)NioEventLoopGroup。其中一個(gè)稱為boss晦鞋,負(fù)責(zé)接收連接請求蹲缠,另外一個(gè)稱為worker,負(fù)責(zé)處理具體的channel悠垛。EventLoopGroup是基于JDK的線程池進(jìn)行封裝的實(shí)現(xiàn)线定,boss和worker可以設(shè)置不同的線程數(shù),默認(rèn)情況下确买,boss線程池中線程數(shù)量為1個(gè)斤讥,worker中的線程數(shù)量為2*CPU。
- ServerBootstrap是服務(wù)端啟動(dòng)的引導(dǎo)類湾趾。
- 指定使用NioServerSocketChannel來建立請求連接芭商。
- 構(gòu)造一系列channelHandler處理鏈來組成ChannelPipeline。
- option用來配置一些channel的參數(shù)搀缠,配置的參數(shù)會被ChannelConfig使用铛楣。
- 另外:示例中的DiscardServerHandler是繼承了ChannelInboundHandlerAdapter,復(fù)寫了channelRead和exceptionCaught方法胡嘿。
官網(wǎng)客戶端最佳實(shí)踐
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
- 客戶端引導(dǎo)類使用Bootstrap蛉艾。
- 客戶端只需要一個(gè)NioEventLoopGroup即可,因?yàn)闊o需單獨(dú)處理連接和IO操作。
- 指定建立連接的方式為NioSocketChannel。
- option用來配置一些channel的參數(shù)勿侯,配置的參數(shù)會被ChannelConfig使用拓瞪。
- 連接到指定的IP和端口
Netty的核心組件簡介
-
ServerBootStrap、BootStrap
ServerBootStrap是服務(wù)端應(yīng)用啟動(dòng)引導(dǎo)類助琐,使用這個(gè)類只需綁定本地端口即可祭埂,簡單的使用方法即是使用main方法調(diào)用即可,無需依賴Tomcat兵钮、WebLogic容器等蛆橡。
BootStrap是客戶端應(yīng)用啟動(dòng)引導(dǎo)類,此類需要綁定連接到服務(wù)端的host和port掘譬,也可以使用main方法直接啟動(dòng)即可泰演,不依賴于Tomcat、Jboss容器葱轩。
-
NioEventLoopGroup睦焕、NioEventLoop
服務(wù)端一般可使用一個(gè)或兩個(gè)NioEventLoopGroup(推薦)來處理channel(等同于JDK中的socket)的連接及IO操作。NioEventLoopGroup是NIO的EventLoopGroup具體實(shí)現(xiàn)靴拱,而EventLoopGroup也繼承了ScheduledExecutorService垃喊,即NioEventLoopGroup本身是一個(gè)線程池,那么NioEventLoopGroup也提供了一些構(gòu)造方法去構(gòu)造NioEventLoopGroup袜炕。一般boss的NioEventLoopGroup的線程大小為1本谜,worker的NioEventLoopGroup為2*cpu。boss的NioEventLoopGroup負(fù)責(zé)處理 客戶端的連接請求偎窘,worker的NioEventLoopGroup負(fù)責(zé)處理具體的IO操作等乌助。
EventLoop 定義了 Netty 的核心抽象,用于處理連接的生命周期中所發(fā)生的事件评架。NioEventLoop是具體處理channel連接及IO操作的類眷茁,一個(gè)channel在其生命周期只和一個(gè)NioEventLoop綁定炕泳,一個(gè)NioEventLoop可以同時(shí)處理多個(gè)channel虫埂。NioEventLoop使用select()不斷輪詢其管理的channel缴川,直至接收到客戶端的數(shù)據(jù)。
-
NioServerSocketChannel、NioSocketChannel
NioServerSocketChannel是netty為非阻塞同步IO提供的服務(wù)端socket連接類烂斋,NioSocketChannel是netty為非阻塞同步IO提供的客戶端socket連接類。
bootstrap類在引導(dǎo)服務(wù)時(shí)指定channel類型乾蓬,服務(wù)端接收新連接時(shí)便使用NioServerSocketChannel創(chuàng)建新的channel施戴,客戶端使用NioSocketChannel建立連接。
-
ChannelHandler皇耗、ChannelInboundHandler南窗、ChannelOutboundHandler
- ChannelHandler接口為處理入站和出站數(shù)據(jù)的應(yīng)用程序邏輯的容器。
- ChannelInboundHandler、ChannelOutboundHandler是ChannelHandler的子接口万伤,分別是進(jìn)站時(shí)處理類的接口和出站時(shí)處理類的接口窒悔。即ChannelInboundHandler提供了服務(wù)端接收數(shù)據(jù)時(shí)處理的接口,提供了一些方法敌买,如channelRegisteredchannelUnregistered简珠,channelActive,channelInactive虹钮,channelRead方法等聋庵,ChannelOutboundHandler提供了服務(wù)端返回?cái)?shù)據(jù)時(shí)做一些處理的接口。
-
ChannelInboundHandlerAdapter芙粱、ChannelOutboundHandlerAdapter
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter分別是ChannelInboundHandler和ChannelOutboundHandler的基礎(chǔ)實(shí)現(xiàn)類祭玉,我們自定義的handler最好繼承這兩個(gè),盡可能只重寫自己需要的方法春畔。
-
ChannelPipeline
ChannelPipeline控制著ChannelHandler的流轉(zhuǎn)攘宙,所有的ChannelHandler都在ChannelPipeline上流轉(zhuǎn)。
-
ChannelHandlerContext
ChannelHandlerContext是ChannelPipeline與ChannelHandler之間的上下文拐迁。
-
ByteBuf
ByteBuf是netty提供的操作字節(jié)碼的類蹭劈,比JDK中的ByteBuffer提供了更為方便的操作字節(jié)碼的方法。ByteBuf中同時(shí)保存了readIndex和writeIndex线召,讀寫互不干擾铺韧。
-
ChannelFuture
ChannelFuture是netty中異步IO操作的結(jié)果,在netty中所有的IO操作都是異步的缓淹,在使用ChannelFuture時(shí)需先判斷操作已結(jié)束且成功哈打。即isDone&&isSuccess。
SocketChannel讯壶、ServerSocketChannel
SocketChannel等同于JDK中的socket料仗,ServerSocketChannel等同于JDK中的ServerSocket。
服務(wù)端啟動(dòng)解析
源碼分析參考自美團(tuán)大神的文章http://www.reibang.com/p/c5068caab217
官網(wǎng)的netty服務(wù)端demo中伏蚊,(3)立轧、(4)、(5)躏吊、(6)均為服務(wù)端的一些配置氛改。
ChannelFuture f = b.bind(port).sync(); // (7) 這里才是真正的啟動(dòng)過程。
點(diǎn)進(jìn)去查看源碼比伏。
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
創(chuàng)建了一個(gè)InetSocketAddress新對象胜卤,并調(diào)用了bind()方法。
public ChannelFuture bind(SocketAddress localAddress) {
//校驗(yàn)赁项,EventLoopGroup和ChannelFactory不能為空葛躏。
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
首先就是validate方法澈段,校驗(yàn)EventLoopGroup和ChannelFactory不能為空。EventLoopGroup是我們在(2)就已經(jīng)綁定了的舰攒,ChannelFactory先不用管均蜜。先繼續(xù)看大致流程。
然后調(diào)用了doBind()方法芒率。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
//.......省略些判斷等細(xì)枝末節(jié)
final Channel channel = regFuture.channel();
//.......省略些判斷等細(xì)枝末節(jié)
doBind0(regFuture, channel, localAddress, promise);
}
從源碼中可以看出囤耳,doBind()方法主要做了兩件事情,第一件就是initAndRegister偶芍,從方法名可猜測是初始化以及注冊充择,另一件是綁定了什么。
先看下initAndRegister方法匪蟀。
final ChannelFuture initAndRegister() {
//...創(chuàng)建了一個(gè)新的channel
Channel channel = channelFactory.newChannel();
//...初始化channel
init(channel);
//...
ChannelFuture regFuture = config().group().register(channel);
}
initAndRegister方法主要做了三件事椎麦,一是創(chuàng)建了新的channel,二是初始化channel材彪,三是注冊了channel观挎。每個(gè)方法我們都進(jìn)去細(xì)看一下。
注意段化,通過debug可以發(fā)現(xiàn)嘁捷,這個(gè)channelFactory在之前就已new出來了,實(shí)現(xiàn)類為ReflectiveChannelFactory显熏,我們看下這個(gè)實(shí)現(xiàn)類的newChannel方法雄嚣。
public T newChannel() {
try {
//這個(gè)class就是之前指定的channel類型,NioServerSocketChannel
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
這個(gè)方法其實(shí)就是調(diào)用了默認(rèn)構(gòu)造函數(shù)喘蟆,new了一個(gè)新對象缓升。
這里看一下NioServerSocketChannel的默認(rèn)構(gòu)造函數(shù)。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
再看一下這里調(diào)用的super父類方法蕴轨。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
//readInterestOp->SelectionKey.OP_ACCEPT
this.readInterestOp = readInterestOp;
//...
ch.configureBlocking(false);//設(shè)置為非阻塞IO港谊,典型的NIO
}
這里的 readInterestOp
即前面層層傳入的 SelectionKey.OP_ACCEPT
,接下來重點(diǎn)分析 super(parent);
(這里的parent其實(shí)是null橙弱,由前面寫死傳入)歧寺。
再看一下 這里的super()方法。
protected AbstractChannel(Channel parent) {
//AbstractChannel與ChannelId膘螟、Unsafe成福、ChannelPipeline綁定
this.parent = parent;
id = newId();//DefaultChannelId
unsafe = newUnsafe();//NioMessageUnsafe
pipeline = newChannelPipeline();//DefaultChannelPipeline
}
這里new了三個(gè)對象碾局,分別是ChannelId荆残,Unsafe,DefaultChannelPipeline净当。這里只是new了三個(gè)對象内斯,先不著急他們是做什么的蕴潦,繼續(xù)追代碼。
再看一下init()方法俘闯。
void init(Channel channel) throws Exception {
//這里的channel就是剛剛創(chuàng)建的channel->NioServerSocketChannel
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//這個(gè)pipeline是NioServerSocketChannel的父類的構(gòu)造方法中new出來的DefaultChannelPipeline
ChannelPipeline p = channel.pipeline();
//這里的childGroup就是new出來的workerEventLoopGroup
final EventLoopGroup currentChildGroup = childGroup;
//childHandler--->server端 new出的ChannelInitializer
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
這個(gè)方法太長了潭苞,分成幾個(gè)部分理解。
這一部分主要是先分別通過options0方法和attrs方法獲取到options和attrs真朗,再設(shè)置到ChannelConfig或Channel中此疹。
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
再看這一部分。
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
//這里的config是ServerBootstrapConfig遮婶,綁定了當(dāng)前的ServerBootstrap
//這里的config.handler蝗碎,其實(shí)就是ServerBootstrap中調(diào)用.handler()方法綁定的那個(gè)handler,可能為空旗扑,不調(diào)用時(shí)就為空蹦骑。
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
可以看到,這里構(gòu)造的currentChildGroup臀防、currentChildHandler眠菇、currentChildOptions、currentChildAttrs都是用來構(gòu)造ServerBootstrapAcceptor的袱衷,這個(gè)ServerBootstrapAcceptor從名字上就可以看出來捎废,這是一個(gè)接入器,專門接受新請求致燥,把新的請求扔給某個(gè)事件循環(huán)器缕坎,我們先不做過多分析。
總結(jié)一下init方法:
- 構(gòu)造出NioServerSocketChannel篡悟,channel內(nèi)部綁定了channelId谜叹、channelPipeline、以及Unsafe搬葬。
- 將ServerBootstrap綁定的option設(shè)置到channel綁定的ChannelConfig中
- 將ServerBootstrap綁定的attr設(shè)置到channel的attr中
- 如果ServerBootstrap調(diào)用了handler方法荷腊,將綁定的handler添加到pipeline最后面
- 異步構(gòu)造一個(gè)ServerBootstrapAcceptor,并將其添加到pipeline最后面急凰。
register方法分析女仰。
//這里的config是ServerBootstrapConfig,綁定了當(dāng)前的ServerBootstrap抡锈,調(diào)用了group方法就是獲取到workerEvenetLoopGroup
ChannelFuture regFuture = config().group().register(channel);
點(diǎn)擊register方法
SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
//構(gòu)造了一個(gè)DefaultChannelPromise疾忍,包含了當(dāng)前NioServerSocketChannel以及當(dāng)前ServerBootstrap
//DefaultChannelPromise是ChannelFuture的一個(gè)實(shí)現(xiàn)類
return register(new DefaultChannelPromise(channel, this));
}
構(gòu)造了一個(gè)DefaultChannelPromise對象再調(diào)用register方法。
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//這里的unsafe具體實(shí)現(xiàn)類是NioMessageUnsafe
promise.channel().unsafe().register(this, promise);
return promise;
}
從上面的方法可以看到床三,unsafe()方法的返回值再次調(diào)用了register方法一罩,點(diǎn)擊查看詳情。
AbstractUnsafe,AbstractChannel的內(nèi)部類
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//....
//eventLoop.inEventLoop()就是判斷是否是當(dāng)前線程
if (eventLoop.inEventLoop()) {
//是當(dāng)前線程就直接處理
register0(promise);
} else {
try {
//不是當(dāng)前線程就開個(gè)線程再處理
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
不同的邏輯都調(diào)用了register0方法
private void register0(ChannelPromise promise) {
try {
//.....
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
//handle added
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//handle registered
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//javaChannel()方法返回的是NioServerSocketChannel
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
//這里的att是AbstractNioChannel
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
register方法總結(jié):
- 構(gòu)造了一個(gè)DefaultChannelPromise類撇簿,這個(gè)類是ChannelFuture的實(shí)現(xiàn)類
- 判斷當(dāng)前線程是否是啟動(dòng)時(shí)的線程聂渊,是則直接調(diào)用register0方法差购,否則新開線程調(diào)用register0方法汉嗽,原因贊不理解。
- 在某種條件下(未知,詳見AbstractSelectableChannel的register方法和findKey方法),將AbstractNioChannel的實(shí)現(xiàn)類(這里其實(shí)是NioServerSocketChannel)綁定到SelectionKey
- 調(diào)用handler()方法綁定的channelhandler中的channelAdded、channelRegister等方法
總結(jié):
netty啟動(dòng)一個(gè)服務(wù)經(jīng)過的所有過程:
1.設(shè)置啟動(dòng)參數(shù),如負(fù)責(zé)接收請求的boss線程組奈虾,負(fù)責(zé)處理請求的worker線程組蜡塌,設(shè)置channel類型劳曹,綁定端口等房资。
2.創(chuàng)建server對應(yīng)的channel鳖擒,創(chuàng)建各大組件馆蠕,包括ChannelConfig颂郎,ChannelId,ChannelPipeline寄雀,ChannelHandler急膀,Unsafe等墅茉。
3.初始化server對應(yīng)的channel,設(shè)置attr及options等绷旗,給server的channel添加接入器,觸發(fā)register事件心剥,調(diào)用handler()方法中綁定的channelhandler的channelAdd、channelRegister等方法背桐。
4.調(diào)用到j(luò)dk底層做端口綁定优烧,并觸發(fā)active事件,active觸發(fā)的時(shí)候链峭,真正做服務(wù)端口綁定
參考自大神博客畦娄,大神原文貼上http://www.reibang.com/p/c5068caab217