先來說說為什么要寫netty源碼分析的文章傻谁,一個方面是自己看了一些源碼孝治,卻找不到了解原理的方式,一個方面是萬一bat哪個大派蟠牛看到我寫的文章谈飒,給我一個5k的工作呢。不開玩笑了态蒂,在學(xué)習(xí)netty之前杭措,需要先學(xué)習(xí)java nio的知識,如果有不了解java nio的钾恢,可以先學(xué)習(xí)一下java nio方面的知識手素。下面我先介紹一個netty:
Netty是一個高性能、異步事件驅(qū)動的NIO框架赘那,提供了對TCP刑桑、UDP和文件傳輸?shù)闹С郑鳛橐粋€異步NIO框架募舟,Netty的所有IO操作都是異步非阻塞的祠斧,通過Future-Listener機制,用戶可以方便的主動獲取或者通過通知機制獲得IO操作結(jié)果拱礁。
作為當(dāng)前最流行的NIO框架琢锋,Netty在互聯(lián)網(wǎng)領(lǐng)域、大數(shù)據(jù)分布式計算領(lǐng)域呢灶、游戲行業(yè)吴超、通信行業(yè)等獲得了廣泛的應(yīng)用,一些業(yè)界著名的開源組件也基于Netty構(gòu)建鸯乃,比如RPC框架鲸阻、zookeeper等跋涣。
netty性能高的原因是netty的reactor線程模型
先來一個netty的基本實現(xiàn):
server 代碼實現(xiàn)
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(port).sync(); // (5)
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new EchoServer(port).run();
}
}
EchoServerHandler 實現(xiàn)
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoServerHandler.class.getName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
client 代碼實現(xiàn)
public class EchoClient {
private final String host;
private final int port;
private final int firstMessageSize;
public EchoClient(String host, int port, int firstMessageSize) {
this.host = host;
this.port = port;
this.firstMessageSize = firstMessageSize;
}
public void run() throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoClientHandler(firstMessageSize));
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
final String host = args[0];
final int port = Integer.parseInt(args[1]);
final int firstMessageSize;
if (args.length == 3) {
firstMessageSize = Integer.parseInt(args[2]);
} else {
firstMessageSize = 256;
}
new EchoClient(host, port, firstMessageSize).run();
}
}
EchoClientHandler 實現(xiàn)
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) {
throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
}
firstMessage = Unpooled.buffer(firstMessageSize);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
1、NioEventLoopGroup是用來處理I/O操作的線程池鸟悴,Netty對 EventLoopGroup 接口針對不同的傳輸協(xié)議提供了不同的實現(xiàn)陈辱。在本例子中,需要實例化兩個NioEventLoopGroup细诸,通常第一個稱為“boss”沛贪,用來accept客戶端連接,另一個稱為“worker”震贵,處理客戶端數(shù)據(jù)的讀寫操作利赋。 2、ServerBootstrap是啟動服務(wù)的輔助類猩系,有關(guān)socket的參數(shù)可以通過ServerBootstrap進行設(shè)置媚送。 3、這里指定NioServerSocketChannel類初始化channel用來接受客戶端請求寇甸。 4季希、通常會為新SocketChannel通過添加一些handler,來設(shè)置ChannelPipeline幽纷。ChannelInitializer 是一個特殊的handler,其中initChannel方法可以為SocketChannel 的pipeline添加指定handler博敬。 5友浸、通過綁定端口8080,就可以對外提供服務(wù)了偏窝。
下面我們要說的就是netty的啟動輔助類ServerBootStrap收恢。
ServerBootStrap主要就是包含兩個NioEventLoopGroup,每個NioEventLoopGroup由一個或多個NioEventLoop組成,我們來看看ServerBootStrap的結(jié)構(gòu):
其實很簡單祭往,就是繼承自AbstractBootstrap伦意,這個類有一些通用的屬性,比如NioEventLoopGroup等硼补。
我們主要來看doBind方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化并注冊一個 Channel 對象驮肉,因為注冊是異步的過程,所以返回一個 ChannelFuture 對象已骇。
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) { // 若發(fā)生異常离钝,直接進行返回。
return regFuture;
}
// 綁定 Channel 的端口褪储,并注冊 Channel 到 SelectionKey 中卵渴。
if (regFuture.isDone()) { // 未
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise); // 綁定
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println(Thread.currentThread() + ": PendingRegistrationPromise");
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise); // 綁定
}
}
});
return promise;
}
}
通過doBind方法綁定好端口后,調(diào)用initAndRegister方法鲤竹,1浪读、方法initAndRegister返回一個ChannelFuture實例regFuture,通過regFuture可以判斷initAndRegister執(zhí)行結(jié)果。 2碘橘、如果regFuture.isDone()為true互订,說明initAndRegister已經(jīng)執(zhí)行完,則直接執(zhí)行doBind0進行socket綁定蛹屿。 3屁奏、否則regFuture添加一個ChannelFutureListener監(jiān)聽,當(dāng)initAndRegister執(zhí)行完成時错负,調(diào)用operationComplete方法并執(zhí)行doBind0進行socket綁定坟瓢。
Channel channel = null;
try {
// 創(chuàng)建 Channel 對象
channel = channelFactory.newChannel();
// 初始化 Channel 配置
init(channel);
} catch (Throwable t) {
if (channel != null) { // 已創(chuàng)建 Channel 對象
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly(); // 強制關(guān)閉 Channel
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(),
GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 注冊 Channel 到 EventLoopGroup 中
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly(); // 強制關(guān)閉 Channel
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
initAndRegister創(chuàng)建了NioServerSocketChannel實例,并為NioServerSocketChannel的pipeline添加handler,再將NioServerSocketChannel注冊到Selector上犹撒。
在NioServerSockerChannel創(chuàng)建完成后折联,調(diào)用pipeline的Head結(jié)點的read方法。服務(wù)端啟動的流程就分析完了识颊。