1燎字、Reactor模式:NIO網(wǎng)絡框架的典型模式
Reactor是網(wǎng)絡編程中的一種設計模式牺弄,reactor會解耦并發(fā)請求的服務并分發(fā)給對應的事件處理器來處理。目前广凸,許多流行的開源框架都用到了reactor模式典鸡,如:netty被廓、node.js、Cindy等萝玷,包括java的nio。
何為Reactor線程模型昆婿?
Reactor模式是事件驅動的球碉,有一個或多個并發(fā)輸入源,有一個Service Handler仓蛆,有多個Request Handlers睁冬;這個Service Handler會同步的將輸入的請求(Event)多路復用的分發(fā)給相應的Request Handler
Reactor模式的三種形式
1、Reactor 單線程模式:
這種實現(xiàn)方式看疙,和第一章java NIO中單線程NIO實現(xiàn)是一樣的豆拨,一個Reactor處理所有的事情。
2能庆、Reactor 多線程模式:
編解碼及業(yè)務處理使用線程池施禾,這樣的話,可以避免IO阻塞(IO阻塞的代價是非常大的)搁胆。
3弥搞、Reactors 主從模式:
把Reactor分為兩個,一個負責接收渠旁,一個負責讀寫攀例,業(yè)務處理可以用線程池,在服務端啟動時配置(也可以選擇不用線程池顾腊,這個看具體業(yè)務需求)
2粤铭、Netty中如何使用Reactor模式
- 單線程Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,bossGroup )
- 多線程 Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )
//Handler使用線程池進行處理
- 主從Reactors 模式(官方推薦)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )
EventLoopGroup初始化是創(chuàng)建創(chuàng)建兩個NioEventLoopGroup類型的Reactor線程池bossGroup和workGroup分別用來處理客戶端的連接請求(bossGroup)和通道IO事件(workerGroup);
注:new NioEventLoopGroup()默認創(chuàng)建cpu核數(shù)*2的線程數(shù)
主從模式的好處有:
1杂靶、業(yè)務解耦:一個reactor用來處理客戶端連接梆惯,一個reactor用來處理業(yè)務
2、安全性:業(yè)務解耦以后伪煤,就可以在bossGroup中做一些SSL校驗加袋、ip黑名單、登錄之類的安全性校驗
3抱既、性能提升:只有通過安全性校驗的客戶端才能繼續(xù)進行業(yè)務處理职烧,這樣也能提升處理性能,否則大量的無效客戶端接入和正常的業(yè)務處理混雜在一起,影響業(yè)務處理性能蚀之。
使用demo:
serverBootstrap.group(boss,worker).handler(new RuleBasedIpFilter()).childHandler(new NettyServerInitializer(this.factoryCode));
3蝗敢、Netty EventLoop源碼解析
1、NioEventLoopGroup整體結構
EventExecutorGroup視圖
new NioEventLoopGroup源碼
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//EventExecutorGroup里面有一個EventExecutor數(shù)組足删,保存了多個EventExecutor;
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//初始化EventExecutor數(shù)組寿谴,數(shù)組是NioEventLoop,見下面
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//EventExecutorChooser.next()定義選擇EventExecutor的策略失受;
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
@Override
public EventExecutor next() {
return chooser.next();
}
NioEventLoopGroup.class
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop.class
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
//創(chuàng)建selector
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
- EventExecutorGroup里面有一個EventExecutor數(shù)組讶泰,保存了多個EventExecutor(NIOEventLoop);
- EventExecutorGroup是不干什么事情的,當收到一個請后拂到,他就調(diào)用next()獲得一個它里面的EventExecutor痪署,再調(diào)用這個executor的方法;
- EventExecutorChooserFactory.EventExecutorChooser.next()定義選擇EventExecutor的策略(有兩種兄旬,都是輪詢)狼犯;
2、NioEventLoopGroup創(chuàng)建分析
bossGroup
注:從圖中可以看出领铐,一個NioEventLoopGroup中包含多個NioEventLoop悯森,一個NioEventLoop中包含一個Selector,Selector監(jiān)聽NioServerSocketChannel绪撵,當NioServerSocketChannel上有客戶端channel連接后瓢姻,觸發(fā)Acceptor事件,在ServerBootstrapAcceptor handler中轉發(fā)給workGroup
workerGroup
當客戶端channel初次連接時莲兢,將其注冊到workGroup中的NioEventLoop上(通過EventExecuorChooser.next()獲取workGroup中的一個NioEventLoop)汹来,然后NioEventLoop中的Selector不斷輪詢其所管理的NioSocketChannel,如果其中有讀寫事件準備好改艇,則由DefaultChannelPipeline處理收班。
3、ServerBootstrap啟動流程分析
4谒兄、ServerBootstrap執(zhí)行流程分析
// 配置服務端的NIO線程組
// 主線程組, 用于接受客戶端的連接摔桦,但是不做任何具體業(yè)務處理,像老板一樣承疲,
//負責接待客戶邻耕,不具體服務客戶
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作線程組, 老板線程組會把任務丟給他,讓手下線程組去做任務燕鸽,服務客戶
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
//欲加到NioServerSocketChannel Pipeline的handler
.handler(new LoggingHandler(LogLevel.INFO))
//欲加到NioSocketChannel(accept()返回的)Pipeline的handler
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// 綁定端口兄世,開始接收進來的連接
ChannelFuture f = b.bind(port).sync(); // (7)