io.netty.channel.Channel是Netty網(wǎng)絡(luò)操作抽象類,它聚合了一組功能烤礁,包括但不限于網(wǎng)路的讀讼积,寫,客戶端發(fā)起連接脚仔,主動(dòng)關(guān)閉連接勤众,鏈路關(guān)閉,獲取通信雙方的網(wǎng)絡(luò)地址等鲤脏。它也包含了Netty框架相關(guān)的一些功能们颜,包括獲取該Channel的EventLoop,獲取緩沖分配器ByteBufAllocator和pipeline等猎醇。
Unsafe是個(gè)內(nèi)部接口窥突,聚合在Channel中協(xié)助進(jìn)行網(wǎng)絡(luò)讀寫相關(guān)的操作,因?yàn)樗脑O(shè)計(jì)初衷就是Channel的內(nèi)部輔助類硫嘶,不應(yīng)該被Netty框架的上層使用者調(diào)用波岛,所以被命名為Unsafe。這里不能僅從字面理解認(rèn)為它是不安全的操作音半,而要從這個(gè)架構(gòu)的設(shè)計(jì)層面體會它的設(shè)計(jì)初衷和職責(zé)则拷。
1贡蓖、Channel功能說明
1.1、netty自定義Channel的原因
- JDK的SocketChannel和ServerSocketChannel沒有統(tǒng)一的Channel接口供業(yè)務(wù)開發(fā)者使用煌茬,對于用戶而言斥铺,沒有統(tǒng)一的操作視圖,使用起來不方便坛善。
- JDK的SocketChannel和ServerSocketChannel的主要職責(zé)就是網(wǎng)絡(luò)I/O操作晾蜘,由于它們是SPI類接口,由具體的虛擬機(jī)廠家來提供眠屎,所以通過繼承SPI功能類來擴(kuò)展其功能的難度很大剔交;直接實(shí)現(xiàn)ServerSocketChannel和SocketChannel抽象類,其工作量和重新開發(fā)一個(gè)新的Channel功能類是差不多的改衩。
- Netty的Channel需要能夠跟Netty的整體架構(gòu)融合在一起岖常,例如I/O模型,基于ChannelPipeline的定制模型葫督,以及基于元數(shù)據(jù)描述配置化的TCP參數(shù)等竭鞍,這些JDK的SocketChannel和ServerSocketChanel都沒有提供,需要重新封裝橄镜。
- 自定義的Channel偎快,功能實(shí)現(xiàn)更加靈活。
1.2洽胶、netty自定義Channel的設(shè)計(jì)理念
- 在Channel接口層晒夹,采用Facade模式進(jìn)行統(tǒng)一封裝,將網(wǎng)絡(luò)I/O操作姊氓,網(wǎng)絡(luò)I/O相關(guān)的其他操作封裝起來惋戏,統(tǒng)一對外提供。
- Channel接口的定義盡量大而全他膳,為SocketChannel和ServerSocketChannel提供統(tǒng)一的視圖响逢,由不同子類實(shí)現(xiàn)不同的功能,公共功能在抽象父類中實(shí)現(xiàn)棕孙,最大程度地實(shí)現(xiàn)功能和接口的重用舔亭。
- 具體實(shí)現(xiàn)采用聚合而非包含的方式,將相關(guān)的功能類聚合在Channel中蟀俊,由Channel統(tǒng)一分配和調(diào)度钦铺,功能實(shí)現(xiàn)更加靈活。
1.3肢预、Channel主要API
接口名 | 描述 |
---|---|
EventLoop eventLoop() | Channel需要注冊到EventLoop的多路復(fù)用器上矛洞,用于處理I/O事件,通過eventLoop()方法可以獲取到Channel注冊的EventLoop。EventLoop本質(zhì)上就是處理網(wǎng)絡(luò)讀寫事件的Reactor線程沼本。在Netty中噩峦,它不僅僅用來處理網(wǎng)絡(luò)事件,也可以用來執(zhí)行定時(shí)任務(wù)和用戶自定義NioTask等任務(wù)抽兆。 |
ChannelPipeline pipeline() | 返回channel分配的ChannelPipeline |
boolean isActive() | 判斷channel是否激活识补。激活的意義取決于底層的傳輸類型。例如辫红,一個(gè)Socket傳輸一旦連接到了遠(yuǎn)程節(jié)點(diǎn)便是活動(dòng)的凭涂,而一個(gè)Datagram傳輸一旦被打開便是活動(dòng)的 |
boolean isOpen() | 判斷Channel是否已經(jīng)打開 |
boolean isRegistered() | 判斷Channel是否已經(jīng)在對應(yīng)的EventLoop中注冊 |
SocketAddress localAddress() | 返回本地的socket地址 |
SocketAddress remoteAddress() | 返回遠(yuǎn)程的socket地址 |
Channel flush() | 將之前已寫的數(shù)據(jù)沖刷到底層Channel上去 |
boolean isWritable() | 當(dāng)且僅當(dāng)I/O線程可以立即處理寫請求時(shí),返回true贴妻;當(dāng)本方法返回false時(shí)切油,任何寫操作將進(jìn)行入隊(duì),直到i/o線程準(zhǔn)備好處理隊(duì)列中的寫請求 |
ChannelMetadata metadata() | 熟悉TCP協(xié)議的讀者可能知道名惩,當(dāng)創(chuàng)建Socket的時(shí)候需要指定TCP參數(shù)澎胡,例如接收和發(fā)送的TCP緩沖區(qū)大小,TCP的超時(shí)時(shí)間绢片。是否重用地址等滤馍。在Netty中岛琼,每個(gè)Channel對應(yīng)一個(gè)物理鏈接底循,每個(gè)連接都有自己的TCP參數(shù)配置。所以槐瑞,Channel會聚合一個(gè)ChannelMetadata用來對TCP參數(shù)提供元數(shù)據(jù)描述信息熙涤,通過metadata()方法就可以獲取當(dāng)前Channel的TCP參數(shù)配置。 |
Channel read() | 從Channel中讀取數(shù)據(jù)到第一個(gè)inBound緩沖區(qū)困檩,當(dāng)讀取完畢祠挫,觸發(fā)Handler的channelRead()事件,同時(shí)觸發(fā)Handler的channelReadComplete()事件悼沿,以讓Handler決定是否繼續(xù)進(jìn)行數(shù)據(jù)讀取等舔。如果有正在讀取的操作,則此方法不做任何操作糟趾。 |
ChannelFuture closeFuture() | 當(dāng)Channel關(guān)閉時(shí)慌植,通知對應(yīng)的ChannelFuture。此方法總是返回同一個(gè)實(shí)例 |
Unsafe unsafe() | 提供一個(gè)內(nèi)部使用的類义郑,此類實(shí)現(xiàn)了Unsafae相關(guān)接口 |
Channel parent() | 對于服務(wù)端Channel而言蝶柿,它的父Channel為空;對于客戶端Channel非驮,它的父Channel就是創(chuàng)建它的ServerSocketChannel交汤。 |
ChannelId id() | 返回ChannelId對象,ChannelId是Channel的唯一標(biāo)識劫笙。 |
ChannelConfig config() | 獲取當(dāng)前Channel的配置信息芙扎,例如CONNECT_TIMEOUT_MILLS星岗。 |
long bytesBeforeUnwritable() | isWritable()返回false時(shí),其返回可寫的字節(jié)數(shù)纵顾;否則返回0 |
long bytesBeforeWritable() | isWritable()返回true時(shí)伍茄,其返回底層緩存未寫的的字節(jié)數(shù);否則返回0 |
ByteBufAllocator alloc() | 返回內(nèi)存分配器 |
2施逾、Channel類繼承圖
Channel類繼承圖如下:
從類繼承圖可以看出:
(1)Channel是所有通用類的基礎(chǔ)接口敷矫,ServerChannel是所有服務(wù)端Channel的通用接口;
(2)AbstractChannel為Channel的基礎(chǔ)抽象類汉额,其對Channel的一些通用功能做了簡單實(shí)現(xiàn)曹仗;
(3)從AbstractChannel繼承出基于不同協(xié)議及I/O類型的Channel實(shí)現(xiàn)類;
- NioSocketChannel:異步I/O的客戶端 TCP Socket 實(shí)現(xiàn)
- NioServerSocketChannel:異步I/O的服務(wù)端 TCP Socket 實(shí)現(xiàn)
- NioDatagramChannel:異步I/O的 UDP Socket 實(shí)現(xiàn)
- NioSctpChannel:異步I/O的客戶端 Sctp Socket 實(shí)現(xiàn)
- NioSctpServerChannel:異步I/O的服務(wù)端 Sctp Socket 實(shí)現(xiàn)
- EpollSocketChannel:基于linux的Epoll實(shí)現(xiàn)的事件驅(qū)動(dòng)的客戶端TCP Socket實(shí)現(xiàn)蠕搜;
- EpollServerSocketChannel:基于linux的Epoll實(shí)現(xiàn)事件驅(qū)動(dòng)的服務(wù)端TCP Socket實(shí)現(xiàn)怎茫;
- OioSocketChannel:同步I/O的客戶端 TCP Socket 實(shí)現(xiàn)
- OioServerSocketChannel:同步I/O的服務(wù)端 TCP Socket 實(shí)現(xiàn)
- OioDatagramChannel:同步I/O的 UDP Socket 實(shí)現(xiàn)
- OioSctpChannel:同步I/O的客戶端 Sctp Socket 實(shí)現(xiàn)
- OioSctpServerChannel:同步I/O的服務(wù)端 Sctp Socket 實(shí)現(xiàn)
3、AbstractChannel源碼分析
AbstractChannel 是Channel的部分實(shí)現(xiàn)妓灌,維護(hù)了一個(gè)通道相關(guān)的資源轨蛤,如channel id, pipeline等;而且實(shí)現(xiàn)了對該套接字的IO操作虫埂,以及設(shè)置interestOps祥山;這里還沒有牽扯到底層的細(xì)節(jié),只是這個(gè)框架的結(jié)構(gòu)掉伏。
3.1缝呕、成員變量
private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
- FLUSH0_CLOSED_CHANNEL_EXCEPTION:當(dāng)Channel已關(guān)閉時(shí)調(diào)用AbstractUnsafe的flush0(),設(shè)置此異常斧散;
- ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION :當(dāng)Channel已關(guān)閉時(shí)調(diào)用AbstractUnsafe的ensureOpen()供常,設(shè)置此異常;
- CLOSE_CLOSED_CHANNEL_EXCEPTION :當(dāng)Channel已關(guān)閉時(shí)調(diào)用AbstractUnsafe的close()鸡捐,設(shè)置此異常栈暇;
- WRITE_CLOSED_CHANNEL_EXCEPTION:當(dāng)Channel已關(guān)閉時(shí)調(diào)用AbstractUnsafe的write(),設(shè)置此異常箍镜;
- FLUSH0_NOT_YET_CONNECTED_EXCEPTION :當(dāng)Channel未連接時(shí)調(diào)用AbstractUnsafe的flush0()源祈,設(shè)置此異常;
- parent:父Channel鹿寨;
- id:Channel對應(yīng)的全局唯一ID新博;
- unsafe:Unsafe實(shí)例;
- pipeline:當(dāng)前Channel對應(yīng)的DefaultChannelPipeline脚草;
- unsafeVoidPromise :異常通知赫悄,默認(rèn)不使用。當(dāng)我們不需要異常在pipeline中傳播時(shí),無需設(shè)置埂淮;
- closeFuture:Channel關(guān)閉通知姑隅;
- localAddress:本地地址;
- remoteAddress:遠(yuǎn)程地址倔撞;
- eventLoop:當(dāng)前Channel注冊的EventLoop讲仰;
- registered:是否已經(jīng)注冊到EventLoop;
- closeInitiated:關(guān)閉時(shí)的基礎(chǔ)參數(shù)是否已設(shè)置痪蝇;
- strValActive:是否已經(jīng)緩存Channel變成active之后的toString()值鄙陡;
- strVal:Channel的toString()值的緩存;
3.2躏啰、構(gòu)造函數(shù)
Channel構(gòu)造函數(shù)如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
- id:通過newId()接口實(shí)現(xiàn)趁矾,默認(rèn)實(shí)現(xiàn)方式為DefaultChannelId的newInstance()實(shí)現(xiàn),DefaultChannelId的ID生成規(guī)則為:機(jī)器ID+處理器ID+序列號+時(shí)間戳+隨機(jī)碼给僵;newId()為protected類型接口毫捣,可由子類重載;
- unsafe:通過newUnsafe()接口實(shí)現(xiàn)帝际,此方法為抽象方法蔓同,具體實(shí)現(xiàn)由子類實(shí)現(xiàn);
- pipeline:通過newChannelPipeline()接口實(shí)現(xiàn)蹲诀,默認(rèn)實(shí)現(xiàn)類為DefaultChannelPipeline斑粱,其設(shè)定了pipeline的頭處理器(head)和尾處理器(tail)等基本信息;newChannelPipeline()接口未protected類型侧甫,可由子類重載珊佣;
3.3蹋宦、核心方法
AbstractChannel的接口實(shí)現(xiàn)都比較簡單披粟,其具體實(shí)現(xiàn)基本都交由pipeline和unsafe進(jìn)行處理。AbstractUnsafe是AbstractChannel的對應(yīng)Unsafe接口實(shí)現(xiàn)冷冗。此處對AbstractUnsafe的核心接口實(shí)現(xiàn)進(jìn)行分析守屉。
3.3.1、register事件框架
register()實(shí)現(xiàn)源碼:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
此處對一些基本信息進(jìn)行檢查蒿辙,如是否已經(jīng)注冊拇泛,EventLoop是否兼容等;同時(shí)判斷當(dāng)前線程是否與EventLoop在同一線程中思灌,如果是則進(jìn)行注冊(register0())俺叭,否則將以任務(wù)方式將注冊事件放入EventLoop的執(zhí)行隊(duì)列中,以防止多線程多線程并發(fā)情況泰偿。具體注冊處理交由register0()熄守。
register0()實(shí)現(xiàn)源碼:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
注冊流程如下:
- 設(shè)置不可取消表示,并確保Channel已經(jīng)打開;
- 具體注冊細(xì)節(jié)由子類的doRegister()實(shí)現(xiàn)裕照;
- 確保用戶的handler已經(jīng)添加到pipeline中攒发;
- 異步設(shè)置注冊成功通知,并調(diào)用fireChannelRegistered()方法異步通知register事件晋南;
- 對于服務(wù)端接受的客戶端連接惠猿,如果首次注冊,觸發(fā)Channel的Active事件负间,如果已設(shè)置autoRead偶妖,則調(diào)用beginRead()開始讀取數(shù)據(jù)。
beginRead()實(shí)現(xiàn)源碼:
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
對EventLoop及Channel是否為active進(jìn)行檢查政溃,調(diào)用doBeginRead()模板方法執(zhí)行具體的處理餐屎;若處理異常,則異步調(diào)用fireExceptionCaught()方法玩祟,進(jìn)行異常通知腹缩。
3.3.2、bind事件框架
bind()實(shí)現(xiàn)源碼:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
處理流程:
- 對基礎(chǔ)的EventLoop空扎、Channel是否打開等進(jìn)行檢查藏鹊;
- 廣播檢測、地址檢查转锈、平臺相關(guān)檢查盘寡、權(quán)限檢查等;
- 調(diào)用doBind()進(jìn)行實(shí)際的綁定撮慨,具體有由子類實(shí)現(xiàn)竿痰;
- 如果是首次Active,則異步進(jìn)行fireChannelActive()通知砌溺;
3.3.3影涉、disconnect事件框架
disconnect()實(shí)現(xiàn)源碼:
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
處理流程:
- EventLoop檢查,設(shè)置不可取消等规伐;
- 調(diào)用doDisconnect()進(jìn)行實(shí)際的斷開連接處理蟹倾;
- Channel從非Inactive變?yōu)锳ctive時(shí),異步調(diào)用fireChannelInactive()進(jìn)行Inactive通知猖闪;
3.3.4鲜棠、close事件框架
close()實(shí)現(xiàn)源碼:
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
處理流程:
- 判斷關(guān)閉的初始化信息是否設(shè)置,若未設(shè)置培慌,當(dāng)Channel已經(jīng)關(guān)閉時(shí)豁陆,設(shè)置關(guān)閉成功通知;否則添加異步關(guān)閉完成監(jiān)聽器吵护;
- 獲取輸出緩沖區(qū)ChannelOutboundBuffer及關(guān)閉執(zhí)行線程盒音;
- 若執(zhí)行線程不為空竖配,則在執(zhí)行線程中添加任務(wù),進(jìn)行實(shí)際的關(guān)閉處理里逆,并處理輸出緩沖區(qū)的沖刷數(shù)據(jù)失敗處理及fireChannelInactiveAndDeregister()事件處理进胯;
- 若執(zhí)行線程為空,則直接在本線程中執(zhí)行關(guān)閉Channel的處理原押,并處理輸出緩沖區(qū)的沖刷數(shù)據(jù)失敗處理;
- 當(dāng)inFlush0為true胁镐,即當(dāng)前輸出緩沖區(qū)正在沖刷數(shù)據(jù),則異步進(jìn)行fireChannelInactiveAndDeregister()事件通知诸衔,否則直接進(jìn)行通知盯漂;
doColse0()實(shí)現(xiàn)源碼:
private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
doClose()由具體子類實(shí)現(xiàn)。
3.3.5笨农、deregister事件框架
deregister()實(shí)現(xiàn)源碼:
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);
}
}
});
}
處理流程:
- 如果還未注冊就缆,則直接設(shè)置成功通知;否則異步進(jìn)行注銷處理谒亦;
- 異步調(diào)用doDeregister()進(jìn)行注銷處理竭宰,進(jìn)行inactive事件通知處理,進(jìn)行unregister的事件通知處理份招;
3.3.6切揭、write事件框架
write()實(shí)現(xiàn)源碼:
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
如果outboundBuffer為空,表示Channel正在關(guān)閉锁摔,則不進(jìn)行寫處理廓旬,直接設(shè)置寫失敗,并釋放msg谐腰;filterOutboundMessage()為消息過濾器孕豹,由子類實(shí)現(xiàn);outboundBuffer.addMessage()將消息添加到輸出緩沖區(qū)中十气;
3.3.7励背、flush事件框架
flush()實(shí)現(xiàn)源碼:
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
對輸出緩沖區(qū)器進(jìn)行檢查,為空則不進(jìn)行處理桦踊;沖刷unflushedEntry中的數(shù)據(jù)椅野;沖刷輸出緩沖區(qū)的數(shù)據(jù)终畅;
flush0()實(shí)現(xiàn)源碼:
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
處理流程:
- 若正在沖刷數(shù)據(jù)籍胯,則不進(jìn)行處理;
- 若輸出緩沖區(qū)為空离福,則不進(jìn)行處理杖狼;
- 設(shè)置inFlush0為true,表示正在沖刷數(shù)據(jù)妖爷;
- 若Channel為Inactive狀態(tài)蝶涩,則設(shè)置沖刷數(shù)據(jù)失斃砼蟆;
- 執(zhí)行doWrite()進(jìn)行實(shí)際的寫數(shù)據(jù)绿聘,若寫異常嗽上,設(shè)置相應(yīng)的寫失敗熄攘;
- 最后設(shè)置inFlush0為false兽愤,表示完成數(shù)據(jù)沖刷;
4挪圾、AbstractNioChannel源碼分析
AbstractNioChannel類繼承圖:
4.1浅萧、成員變量
private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
DO_CLOSE_CLOSED_CHANNEL_EXCEPTION:調(diào)用doClose()時(shí)錯(cuò)誤的異常;
ch:NioSocketChannel和ServerSocketChannel的公共父類哲思,用于設(shè)置參數(shù)和進(jìn)行I/O操作洼畅;
readInterestOp:Read事件,服務(wù)端OP_ACCEPT棚赔,其他OP_READ
selectionKey:Channel注冊到EventLoop后返回的選擇鍵帝簇;
readPending:底層讀事件標(biāo)記
clearReadPendingRunnable:清除底層讀事件標(biāo)記任務(wù)
connectPromise:鏈接的異步結(jié)果
connectTimeoutFuture:連接超時(shí)檢測任務(wù)異步結(jié)果
requestedRemoteAddress:連接的遠(yuǎn)端地址
4.2、核心方法
AbstractNioChannel的方法實(shí)現(xiàn)很簡單靠益,其主要是通過NioUnsafe接口進(jìn)行實(shí)現(xiàn)的己儒,而AbstractNioUnsafe為NioUnsafe的具體實(shí)現(xiàn)類,其接口繼承圖如下:
故只需要分析AbstractNioUnsafe的核心方法即可捆毫。
4.2.1闪湾、connect處理
connect()實(shí)現(xiàn)源碼:
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
處理流程:
- 設(shè)置不可取消標(biāo)志,確保Channel已經(jīng)打開绩卤,檢查連接異步監(jiān)聽是否已存在等途样;
- 調(diào)用doConnect()進(jìn)行實(shí)際連接處理,此方法為抽象方法濒憋,由子類實(shí)現(xiàn)何暇;
- 如果連接成功,則設(shè)置連接成功的異步通知凛驮;
- 如果設(shè)置連接超時(shí)時(shí)間裆站,則添加超時(shí)任務(wù),進(jìn)行連接超時(shí)的異步通知黔夭;
- 添加異步連接通知的監(jiān)聽器宏胯,若異步結(jié)果被取消,則取消連接超時(shí)任務(wù)本姥,清除連接的異步結(jié)果肩袍;
fulfileConnectPromise()實(shí)現(xiàn)源碼:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
fulfileConnectPromise()主要設(shè)置異步結(jié)果為成功,并出發(fā)Channel的Active事件婚惫。
finishConnect()實(shí)現(xiàn)源碼:
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
finishConnect()只由EventLoop處理就緒selectionKey的OP_CONNECT事件時(shí)調(diào)用氛赐,從而完成連接操作魂爪。注意:連接操作被取消或者超時(shí)不會使該方法被調(diào)用。
4.2.2艰管、doRegister處理
doRegister()實(shí)現(xiàn)源碼:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
對于Register事件滓侍,將Channel注冊到給定NioEventLoop的selector上即可。注意牲芋,其中第二個(gè)參數(shù)0表示注冊時(shí)不關(guān)心任何事件粗井,第三個(gè)參數(shù)為Netty的NioChannel對象本身。如果拋出CancelledKeyException 表示當(dāng)前的Channel對應(yīng)的SelectionKey已經(jīng)被取消街图,此時(shí)立即對Selector進(jìn)行select操作浇衬,其原因如下:
當(dāng)select( )中的任意一種被調(diào)用時(shí),如下步驟將被執(zhí)行:
(1)已取消的鍵的集合將會被檢查餐济。如果它是非空的耘擂,每個(gè)已取消的鍵的集合中的鍵將從另外兩個(gè)集合中移除,并且相關(guān)的通道將被注銷絮姆。這個(gè)步驟結(jié)束后它掂,已取消的鍵的集合將是空的魄揉。
(2)已注冊的鍵的集合中的鍵的interest集合將被檢查。在這個(gè)步驟中的檢查執(zhí)行過后,對interest集合的改動(dòng)不會影響剩余的檢查過程瞳收。
對于Deregister事件插爹,選擇鍵執(zhí)行cancle()操作情妖,選擇鍵表示JDK Channel和selctor的關(guān)系码秉,調(diào)用cancle()終結(jié)這種關(guān)系,從而實(shí)現(xiàn)從NioEventLoop中Deregister矮燎。需要注意的是:cancle操作調(diào)用后定血,注冊關(guān)系不會立即生效,而會將cancle的key移入selector的一個(gè)取消鍵集合诞外,當(dāng)下次調(diào)用select相關(guān)方法或一個(gè)正在進(jìn)行的select調(diào)用結(jié)束時(shí)澜沟,會從取消鍵集合中移除該選擇鍵,此時(shí)注銷才真正完成峡谊。一個(gè)Cancle的選擇鍵為無效鍵茫虽,調(diào)用它相關(guān)的方法會拋出CancelledKeyException。
4.2.3既们、doRead處理
doRead()實(shí)現(xiàn)源碼:
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
doRead()操作只是簡單將read事件添加到SelectionKey中濒析。
5、AbstractNioByteChannel源碼分析
AbstractNioByteChannel類繼承圖如下:
其對應(yīng)的Unsafe類繼承圖如下:
AbstractNioByteChannel的實(shí)現(xiàn)很簡單贤壁,本處主要分析其對應(yīng)的NioByteUnsafe類悼枢。其是 NioSocketChannel 的父類,只有一個(gè)成員變量 flushTask脾拆,負(fù)責(zé)寫半包消息馒索。
5.1、核心方法
5.1.1名船、read處理
read()實(shí)現(xiàn)源碼:
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
處理流程:
- 檢查輸入流是否已關(guān)閉或遠(yuǎn)端已經(jīng)關(guān)閉绰上,是則清除讀處理;
- 調(diào)用doReadBytes()抽象方法讀取數(shù)據(jù)渠驼;
- 若數(shù)據(jù)長度小于等于0蜈块,表示無數(shù)據(jù)可讀,釋放緩沖區(qū)迷扇;若讀取的數(shù)據(jù)長度小于0表讀出錯(cuò)百揭,設(shè)置關(guān)閉相關(guān)標(biāo)志;
- 數(shù)據(jù)讀取完成后蜓席,pipeline進(jìn)行read事件通知器一;
- 若需要關(guān)閉Channel,則調(diào)用closeOnRead()進(jìn)行關(guān)閉處理厨内;
- 若讀處理異常祈秕,則調(diào)用handleReadException()進(jìn)行讀異常處理;
closeOnRead()實(shí)現(xiàn)源碼:
private void closeOnRead(ChannelPipeline pipeline) {
if (!isInputShutdown0()) {
if (isAllowHalfClosure(config())) {
shutdownInput();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
} else {
inputClosedSeenErrorOnRead = true;
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}
當(dāng)Input流已未關(guān)閉雏胃,且Channel參數(shù)ALLOW_HALF_CLOSURE為True時(shí)请毛,會觸發(fā)用戶事件ChannelInputShutdownEvent,否則瞭亮,直接關(guān)閉該Channel方仿;否則會觸發(fā)用戶事件ChannelInputShutdownReadComplete。
handleReadException()實(shí)現(xiàn)源碼:
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
當(dāng)緩存區(qū)有數(shù)據(jù)時(shí)统翩,會觸發(fā)read事件兼丰,觸發(fā)讀完成時(shí)間,觸發(fā)異常時(shí)間唆缴;
5.1.2鳍征、write處理
doWrite()實(shí)現(xiàn)源碼:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
Object msg = in.current();
//無數(shù)據(jù)需要寫,則清除SelectionKey的寫標(biāo)志
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
//調(diào)用doWriteInternal做實(shí)際的寫處理
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
此處處理比較簡單面徽,如果無數(shù)據(jù)可寫艳丛,則清除SelectionKeyd的寫事件;否則調(diào)用doWriteInternal()進(jìn)行數(shù)據(jù)的寫入趟紊;之后調(diào)用incompleteWrite()對是否寫完成進(jìn)行處理氮双。
doWriteInternal()實(shí)現(xiàn)源碼:
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
//msg類型為ByteBuf類型?
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
in.remove();
return 0;
}
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
//msg類型為FileRegion類型霎匈?表示寫文件數(shù)據(jù)
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
此處主要對兩種緩沖區(qū)進(jìn)行了區(qū)分戴差,ByteBuf和FileRegion;如果為ByteBuf铛嘱,則最終調(diào)用doWriteBytes()進(jìn)行數(shù)據(jù)的寫入暖释;如果為fileRegion則最終調(diào)用doWriteFileRegion()對文件進(jìn)行寫入袭厂。兩個(gè)寫入方法都為抽象方法,由子類實(shí)現(xiàn)球匕。
incompleteWrite()實(shí)現(xiàn)源碼:
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
// setOpWrite 為 true表示數(shù)據(jù)未寫完纹磺,設(shè)置 SelectionKey 寫標(biāo)志位
if (setOpWrite) {
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
// 清除寫標(biāo)志
clearOpWrite();
// Schedule flush again later so other tasks can be picked up in the meantime
// 啟動(dòng)清除flushTask任務(wù),繼續(xù)寫半包消息
eventLoop().execute(flushTask);
}
}
6亮曹、NioSocketChannel源碼分析
NioSocketChannel實(shí)現(xiàn)源碼:
NioSocketChannel為Netty的NIO底層客戶端的具體實(shí)現(xiàn)類橄杨,一些具體的底層處理在此處實(shí)現(xiàn)。
6.1照卦、核心方法
6.1.1式矫、doBind操作
doBind()實(shí)現(xiàn)源碼:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
doBind0(localAddress);
}
private void doBind0(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}
}
此處調(diào)用底層JDK進(jìn)行Channel的綁定操作。
6.1.2役耕、doConnect操作
doConnect()實(shí)現(xiàn)源碼:
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
JDK中的Channel在非阻塞模式下調(diào)用connect()方法時(shí)采转,會立即返回結(jié)果:成功建立連接返回True,操作還在進(jìn)行時(shí)返回False蹄葱。返回False時(shí)氏义,需要在底層OP_CONNECT事件就緒時(shí),調(diào)用finishConnect()方法完成連接操作图云。
6.1.3惯悠、doWrite操作
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
本段代碼做的優(yōu)化是:當(dāng)輸出緩沖區(qū)中有多個(gè)buffer時(shí),采用Gathering Writes將數(shù)據(jù)從這些buffer寫入到同一個(gè)channel竣况。同時(shí)動(dòng)態(tài)調(diào)整一次寫的最大緩沖區(qū)大小克婶。
7、AbstractNioMessageChannel源碼分析
AbstractNioMessageChannel類繼承圖如下:
7.1丹泉、核心方法
AbstractNioMessageChannel 是 NioServerSocketChannel情萤、NioDatagramChannel 的父類。其主要方法也是 doWrite摹恨,功能和 AbstractNioByteChannel 的 doWrite 也類似筋岛,區(qū)別只是后者只處理 ByteBuf 和 FileRegion,前者無此限制晒哄,處理所有 Object睁宰。
7.1.1、doWrite
doWrite實(shí)現(xiàn)源碼:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
//無數(shù)據(jù)可寫則清除SelectionKey的寫事件標(biāo)志
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
//doWriteMessage為抽象方法寝凌,由子類實(shí)現(xiàn)
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
//未寫完則設(shè)置SelectionKey的寫標(biāo)志位柒傻,等待底層可寫時(shí)繼續(xù)寫入
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
}
doWriteMessage 方法在 NioServerSocketChannel 中實(shí)現(xiàn)如下所示,是因?yàn)?NioServerSocketChannel 只是用來監(jiān)聽端口较木,接收客戶端請求红符,不負(fù)責(zé)傳輸實(shí)際數(shù)據(jù)。
NioServerSocketChannel中doWriteMessage()實(shí)現(xiàn):
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
7.1.2、read實(shí)現(xiàn)
read()實(shí)現(xiàn)源碼:
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//doReadMessages為抽象方法预侯,由子類實(shí)現(xiàn)
int localRead = doReadMessages(readBuf);
//無數(shù)據(jù)可讀致开?
if (localRead == 0) {
break;
}
//讀取數(shù)據(jù)出錯(cuò)?
if (localRead < 0) {
closed = true;
break;
}
//增加消息數(shù)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//觸發(fā)pipeline的read事件雌桑,將讀取到的數(shù)據(jù)交由應(yīng)用層handler進(jìn)行處理
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
//觸發(fā)pipeline的讀取完成事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
//觸發(fā)pipeline的異常事件
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
8喇喉、NioServerSocketChannel源碼分析
NioServerSocketChannel類繼承圖:
NioServerSocketChannel 是服務(wù)端 Channel 的實(shí)現(xiàn)類祖今,有一個(gè)用于配置 TCP 參數(shù)的 ServerSocketChannelConfig校坑。
8.1、核心方法
NioServerSocketChannel為Netty的服務(wù)端NIO的實(shí)現(xiàn)千诬,其只支持bind耍目、read和close操作。
8.1.1徐绑、doBind實(shí)現(xiàn)
doBind()實(shí)現(xiàn)源碼:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
bind()的實(shí)現(xiàn)都是基于底層JDK實(shí)現(xiàn)的邪驮。
8.1.2、doReadMessage實(shí)現(xiàn)
doReadMessage()實(shí)現(xiàn)源碼:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
NioServerSocketChannel用于服務(wù)端的連接監(jiān)聽傲茄,此處的doReadMessages()方法每次最多返回一個(gè)消息(客戶端連接)毅访,由此可知NioServerSocketChannel的read操作一次至多處理的連接數(shù)為config.getMaxMessagesPerRead(),也就是參數(shù)值MAX_MESSAGES_PER_READ盘榨。
8.1.3喻粹、doClose實(shí)現(xiàn)
doClose()實(shí)現(xiàn)源碼:
protected void doClose() throws Exception {
javaChannel().close();
}
調(diào)用底層JDK的Channel進(jìn)行連接的關(guān)閉。
9草巡、其他實(shí)現(xiàn)類
通過Channel的相關(guān)類繼承圖知道守呜,Netty還有其他不同實(shí)現(xiàn)的Channel,除了NIO類型的山憨,還有基于OIO的OioSocketChannel和OioServerSocketChannel查乒,基于Epoll的EpollSocketChannel和EpollServerSocketChannel等,在此不多贅述郁竟。
相關(guān)閱讀:
Netty源碼愫讀(一)ByteBuf相關(guān)源碼學(xué)習(xí) 【http://www.reibang.com/p/016daa404957】
Netty源碼愫讀(三)ChannelPipeline玛迄、ChannelHandlerContext相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/be82d0fcdbcc】
Netty源碼愫讀(四)ChannelHandler相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/6ee0a3b9d73a】
Netty源碼愫讀(五)EventLoop與EventLoopGroup相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/05096995d296】
Netty源碼愫讀(六)ServerBootstrap相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/a71a9a0291f3】
參考書籍:
《Netty權(quán)威指南》第二版
參考博客:
http://www.reibang.com/p/9258af254e1d
https://blog.csdn.net/vonzhoufz/article/details/39159193
https://hk.saowen.com/a/6cb46b200931fa05dbb222736de625a1d31e664e336a139f1f4794ff83d038ad
https://blog.csdn.net/lz710117239/article/details/77651209
https://juejin.im/post/5bda9ed6f265da3913474110