概述
Netty的線程模型沒什么出彩的地方,舊瓶裝新酒晰骑,其就是基于Reactor模式
Reactor模式結構
首先用一張許多人都看過的圖來開始說明Reactor模式
這張圖估計在許多博客和帖子都會看到继薛,但是許多博客卻沒有詳細說明及解釋這張圖在netty的架構上反應出來
Reactor模式角色定義
1:MainReactor: 負責響應client的連接請求乞娄,并建立連接惧互,可以有1個或者多個,維護一個獨立的NIO Selector
2:SubReactor: 負責對client的讀寫請求進行處理鳍刷,可以1個或者多個,并也維護一個獨立的NIO Selector
3:Acceptor: 負責MainReactor和SubReactor的橋梁左右理盆,已經準備好的連接轉發(fā)到SubReactor中進行處理Netty中Reactor模式角色定義
1:MainReactorEventLoopGroup痘煤,在Netty4以前叫做BossGroup;
2:SubReactor:EventLoopGroup, 在Netty4以前叫做WorkGroup猿规;
3:Acceptor:ServerBootstrapAcceptor:一個系統(tǒng)自帶的ChannelInboundHandler事件攔截器衷快,真正的將已準備好的Channel注冊到SubReactor中;
SubReactor:EventLoopGroup
subReactor的任務比較簡單姨俩,接收Acceptor的Channel蘸拔,后將Channel重新進行注冊,并觸發(fā)自定義的Handler來處理邏輯
1)接收Acceptor傳遞過來的Channel通道
2)注冊到相應的selector环葵。
private void register0(ChannelPromise promise) {
try {
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
}
}
3)調用eventLoop.execute用以執(zhí)行注冊任務
4)啟動子線程调窍。即啟動了subReactor
注意:pipeline.fireChannelRegistered()觸發(fā)的事件,其實現(xiàn)原理就是初始化不同的ChannelInitializer對象张遭,對不同類型的Channel添加不同的攔截處理
- 啟動時邓萨,Channel是NioServerSocketChannel,調用的是ServerBootstrapAcceptor
- 連接時菊卷,Channel是NioSocketChannel缔恳,調用的是用戶自定義的InboundHandler
例如:
ServerBootstrap.childHandler(new GearmanServerInitializer())//此用戶自定義的的ChannelInitializer
public class GearmanServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast("decoder", new Decoder());
pipeline.addLast("encoder", new Encoder());
pipeline.addLast("handler", new PacketHandler(networkManager));
}
}
Acceptor:ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
// ...... 省略無關代碼
try {
childGroup.register(child);
} catch (Throwable t) {
// ......省略無關代碼
}
}
說明:
當Channel已經Ready后,就會ServerBootstrapAcceptor#channelRead
1:首先把用戶自定義的handler注冊到pipleline中
2:將已準備好的Channel與childGroup的烁,觸發(fā)點就是childGroup.register(child);
如果看了上篇文章褐耳,會發(fā)現(xiàn)兩者都是存在注冊通道的原理,其實是不同的
- 在server啟動時渴庆,通過回調bind的監(jiān)聽會把Selector注冊事件改為electionKey.OP_ACCEPT
- 而當有連接進來的時候铃芦,通過重新注冊又把Selector注冊事件改為了0
在這一點有點勞民傷財?shù)奈兜馈#ㄆ鋵嵅皇墙罄祝谙旅鏁iT有注意點提到這點)
MainReactor:EventLoopGroup
如果看了上篇文章刃滓,應該知道在server啟動時,會啟動MainReactor耸弄,一直循環(huán)執(zhí)行IO任務和非IO任務
protected void run() {
for (;;) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select();
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
//..........省略無關代碼
} catch (Throwable t) {
//...........省略無關代碼
}
}
}
在上篇文章主要提的是runAllTasks這個方法咧虎,主要執(zhí)行非IO任務
這里主要是來說明下IO任務,selectedKeys不為空
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
執(zhí)行processSelectedKeysOptimized(selectedKeys.flip());
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
注意的地方就是 final Object a = k.attachment();這個attachment是從哪里來的,看如下selectionKey =javaChannel().register(eventLoop().selector, 0, this);
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
- 對于當server啟動時计呈,由于該當前對象是NioServerSocketChannel
- 對于當連接進來是砰诵,由于當前對象是NioSocketChannel
在注冊整個Selector選擇器的時候,把當前通道(Channel)也注冊進去了捌显,上面那個勞民傷財其實是句玩笑茁彭,在這里體現(xiàn)出兩次注冊的用意來了
繼續(xù)以上processSelectedKeysOptimized,其中processSelectedKey就是處理網(wǎng)絡Io事件扶歪,把該事件發(fā)給Acceptor的主要觸發(fā)點,而有點要
代碼如下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
int readyOps = -1;
try {
readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
//........省略無關代碼
} catch (CancelledKeyException e) {
//.......省略無關代碼
}
}
而組裝channel理肺,bytebuffer等網(wǎng)絡數(shù)據(jù)是在NioMessageUnsafe#read()中
public void read() {
//... 省略無關代碼
try {
for (;;) {
int localRead = doReadMessages(readBuf);
// .......省略無關代碼
}
} catch (Throwable t) {
}
for (int i = 0; i < readBuf.size(); i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
// ..... 省略無關代碼
}
}
- doReadMessages(readBuf);
把當前請求連接封裝成一個Channel(其實就是NioSocketChannel) - pipeline.fireChannelRead(readBuf.get(i));
通知Acceptor來讀取,其實就是通知ServerBootstrapAcceptor#channelRead