一. AbstractChannel
1.1 構(gòu)造方法
/**
* 創(chuàng)建一個新實例。
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 創(chuàng)建
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
/**
* 創(chuàng)建一個新實例。
*/
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
可以看出在構(gòu)造方法中幼东,就綁定了這個通道的四個成員變量
parent
,id
,unsafe
,pipeline
。
protected ChannelId newId() {
return DefaultChannelId.newInstance();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
/**
* 由子類來實現(xiàn)肮街,創(chuàng)建對應(yīng)的 Unsafe 類型實例
*/
protected abstract AbstractUnsafe newUnsafe();
id
和pipeline
都是直接創(chuàng)建林束,默認是DefaultChannelId
和DefaultChannelPipeline
類型。newUnsafe()
是抽樣方法蹄梢,有子類才能創(chuàng)建對應(yīng)的Unsafe
類型實例疙筹。
1.2 ChannelOutboundInvoker
接口方法
Channel
還繼承了ChannelOutboundInvoker
接口,也就是說通道是可以發(fā)送出站IO
操作的禁炒。
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
........
你會發(fā)現(xiàn)基本上都是調(diào)用 ChannelPipeline
對應(yīng)的方法而咆。
- 也就是說直接調(diào)用通道
Channel
的發(fā)送出站IO
事件的方法,和調(diào)用管道pipeline()
發(fā)送出站IO
事件的方法是一樣的幕袱。- 根據(jù) DefaultChannelPipeline 的分析暴备,我們知道這些出站
IO
事件最后都會調(diào)用到該通道的Unsafe
屬性對應(yīng)方法進行處理。
1.3 抽樣方法
AbstractChannel
還有幾個需要子類實現(xiàn)抽樣方法们豌,由子類提供不同的處理邏輯:
-
AbstractUnsafe newUnsafe()
不同類型的
Channel
有自己特定的Unsafe
類型涯捻。 -
boolean isCompatible(EventLoop loop)
判斷給定的事件輪詢器
EventLoop
和當前的通道類型是不是兼容阁危。每種類型的通道Channel
都有自己特定的事件輪詢器。 -
SocketAddress localAddress0()
和SocketAddress remoteAddress0()
通道綁定的本地地址和通道連接的遠程地址汰瘫。
-
void doBind(SocketAddress localAddress)
進行綁定操作狂打,每種類型的通道綁定處理是不一樣的。
-
void doDisconnect()
進行連接操作混弥。
-
void doClose()
進行關(guān)閉連接操作趴乡。
-
void doBeginRead()
將通道設(shè)為開始讀操作。
-
void doWrite(ChannelOutboundBuffer in)
進行寫操作蝗拿。
AbstractChannel
真正重點的操作都是在 AbstractUnsafe
中實現(xiàn)的啊晾捏,下面講解 AbstractUnsafe
。
二. AbstractUnsafe 類
2.1 成員屬性
// 寫緩沖區(qū) ChannelOutboundBuffer
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
// 用于在接收數(shù)據(jù)時分配緩存區(qū) ByteBuf
private RecvByteBufAllocator.Handle recvHandle;
// 當前是否正在刷新數(shù)據(jù)哀托,防止重復刷新數(shù)據(jù)
private boolean inFlush0;
// 如果通道從未被注冊惦辛,則為true,否則為false
private boolean neverRegistered = true;
2.2 注冊 register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
// 當前通道已經(jīng)注冊仓手,失敗胖齐,調(diào)用 promise 的setFailure方法進行通知
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
// 這個事件輪詢器和當前通道不兼容,失敗贬蛙,調(diào)用 promise 的setFailure方法進行通知
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
// 當前線程就是通道 事件輪詢器線程,直接調(diào)用 register0 方法
register0(promise);
} else {
try {
// 通過 eventLoop.execute 方法,
// 保證 register0 方法在通道事件輪詢器線程中調(diào)用
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
// 發(fā)生異常蝉稳,要關(guān)閉通道饿这,并進行相關(guān)通知
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// 檢查通道是否仍然打開
// 當注冊操作在 eventLoop 線程之外調(diào)用的話舅列,
// 有可能這時通道被別的線程關(guān)閉了
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 第一次注冊
boolean firstRegistration = neverRegistered;
// 調(diào)用 AbstractChannel 的 doRegister 方法,進行注冊操作
doRegister();
neverRegistered = false;
// 設(shè)置 AbstractChannel 的 registered 成員屬性摩渺,表示已經(jīng)注冊
registered = true;
// 確保在通道未注冊前添加到管道上的 ChannelHandler 的 handlerAdded(…) 也會被調(diào)用
// 這是必需的,因為用戶可能已經(jīng)通過ChannelFutureListener中的管道觸發(fā)了事件翼虫。
pipeline.invokeHandlerAddedIfNeeded();
// 注冊成功的通知
safeSetSuccess(promise);
// 發(fā)送注冊 入站IO事件
pipeline.fireChannelRegistered();
// 只有在通道從未注冊的情況下才觸發(fā) channelActive 事件珍剑。
// 這可以防止在通道被取消注冊和重新注冊時觸發(fā)多個通道 channelActive 事件掸宛。
if (isActive()) {
if (firstRegistration) {
// 第一次注冊時,才會發(fā)送 channelActive 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 此通道在注冊之前招拙,設(shè)置了 autoRead()唧瘾。
// 這意味著我們需要重新設(shè)置開始讀取操作,以便介紹入站數(shù)據(jù)迫像。
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// 發(fā)生異常劈愚,要關(guān)閉通道,并進行相關(guān)通知
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
將通道注冊到事件輪詢器EventLoop
上:
- 如果當前通道已經(jīng)注冊闻妓,或者當前通道和事件輪詢器不兼容菌羽,那么注冊失敗,調(diào)用
promise
的setFailure
方法進行通知由缆。- 保證在事件輪詢器線程調(diào)用實際注冊
register0
方法注祖。- 調(diào)用
AbstractChannel
的doRegister
方法,進行注冊操作均唉,發(fā)送注冊事件是晨。- 如果通道已活躍,第一次注冊的時候舔箭,就會發(fā)送
channelActive
事件;- 如果不是罩缴,那么就可能設(shè)置開始讀的操作。
- 如果這期間發(fā)生異常层扶,就關(guān)閉通道箫章,并進行相關(guān)通知。
2.3 取消注冊 deregister
@Override
public final void deregister(final ChannelPromise promise) {
assertEventLoop();
deregister(promise, false);
}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
// 如果 promise不是 不可取消的镜会,那么直接返回
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
// 當前通道沒有注冊檬寂,那么也表示取消注冊成功,進行成功通知
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
// See:
// https://github.com/netty/netty/issues/4435
// 通過 invokeLater 方法戳表,將 doDeregister() 方法放在下一個事件輪詢周期進行
invokeLater(new Runnable() {
@Override
public void run() {
try {
// 調(diào)用 AbstractChannel 的 doDeregister 方法桶至,進行取消注冊操作
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
// 如果通道之前是注冊成功了,
// 這里才發(fā)送取消注冊的 IO 事件
registered = false;
pipeline.fireChannelUnregistered();
}
// 取消綁定成功通知
safeSetSuccess(promise);
}
}
});
}
重點就是調(diào)用
AbstractChannel
的doDeregister
方法匾旭,進行取消注冊操作镣屹。
如果fireChannelInactive == true
,將發(fā)送ChannelInactive
事件季率。
2.4 綁定 bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
// 檢查通道是否仍然打開
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// 調(diào)用 AbstractChannel 的 doBind 方法野瘦,進行綁定操作
doBind(localAddress);
} catch (Throwable t) {
// 綁定失敗的通知
safeSetFailure(promise, t);
// doBind(localAddress) 方法有可能關(guān)閉這個通道,
// 就可能需要進行關(guān)閉通道的通知
closeIfClosed();
return;
}
// 如果綁定操作后,通道從不活躍變成活躍鞭光,就要發(fā)送 ChannelActive 事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// 綁定成功的通知
safeSetSuccess(promise);
}
- 這個方法邏輯比較簡單吏廉,重點就是調(diào)用
AbstractChannel
的doBind
方法,進行綁定操作惰许。- 如果綁定操作后席覆,通道從不活躍變成活躍,就要發(fā)送
ChannelActive
事件汹买。
2.5 取消連接 disconnect
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
// 如果 promise不是 不可取消的佩伤,那么直接返回
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
// 重置 remoteAddress and localAddress
remoteAddress = null;
localAddress = null;
} catch (Throwable t) {
safeSetFailure(promise, t);
// doDisconnect() 方法有可能關(guān)閉這個通道,
// 就可能需要進行關(guān)閉通道的通知
closeIfClosed();
return;
}
// 如果取消連接后晦毙,通道從活躍變成不活躍生巡,就要發(fā)送 ChannelInactive 事件
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
// doDisconnect() 方法有可能關(guān)閉這個通道,
// 就可能需要進行關(guān)閉通道的通知
closeIfClosed();
}
- 這個方法邏輯比較簡單见妒,重點就是調(diào)用
AbstractChannel
的doDisconnect()
方法孤荣,進行取消連接操作。- 如果取消連接操作成功后须揣,通道從活躍變成不活躍盐股,就要發(fā)送
ChannelInactive
事件。
2.6關(guān)閉 close
public void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException =
StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
close(promise, closedChannelException, closedChannelException, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
// 如果 promise不是 不可取消的耻卡,那么直接返回
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {
// closeInitiated == true疯汁,已經(jīng)調(diào)用過關(guān)閉操作了,就要return 返回了卵酪。
if (closeFuture.isDone()) {
// 已經(jīng)通道已經(jīng)關(guān)閉了幌蚊,通知 promise
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// 當前通道正在關(guān)閉,那么就添加一個監(jiān)聽器溃卡,當關(guān)閉成功后霹肝,再通知 promise
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
// 保證關(guān)閉方法只調(diào)用一次,不能重復調(diào)用
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 禁止再向?qū)懢彌_區(qū) outboundBuffer 添加任何消息和刷新操作塑煎。
this.outboundBuffer = null;
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// 使寫緩沖區(qū)中所有排隊的消息失敗
outboundBuffer.failFlushed(cause, notify);
// 關(guān)閉寫緩沖區(qū)
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// 使寫緩沖區(qū)中所有排隊的消息失敗
outboundBuffer.failFlushed(cause, notify);
// 關(guān)閉寫緩沖區(qū)
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
// 如果正在刷新操作,那么就讓 fireChannelInactiveAndDeregister 操作臭蚁,
// 放到下一個事件輪詢周期中處理
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
// 取消注冊和可能發(fā)送 ChannelInactive 事件
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
deregister(voidPromise(), wasActive && !isActive());
}
方法流程:
- 通過
closeInitiated
成員屬性保證關(guān)閉方法只調(diào)用一次最铁,不能重復調(diào)用。 - 因為關(guān)閉連接垮兑,需要考慮寫緩沖區(qū)
ChannelOutboundBuffer
中的待寫入數(shù)據(jù)的問題冷尉。 - 通過
prepareToClose()
方法,返回一個關(guān)閉通道的事件執(zhí)行器系枪。- 如果不為空雀哨,那么就在這個事件執(zhí)行器中進行接下來的關(guān)閉操作。
- 如果為空,那么就在當前線程進行接下來的關(guān)閉操作雾棺。
- 調(diào)用
doClose0(promise)
方法膊夹,進行關(guān)閉以及操作成功或失敗的相關(guān)通知。 - 處理寫緩沖區(qū)
outboundBuffer
中的數(shù)據(jù)捌浩,并關(guān)閉寫緩沖區(qū)放刨。 - 最后調(diào)用
fireChannelInactiveAndDeregister
方法,取消管道注冊尸饺,以及可能會發(fā)送ChannelInactive
事件进统。如果在
doClose()
方法之后,通道從活躍變成不活躍的情況下浪听,才會發(fā)送ChannelInactive
事件螟碎。
2.7 shutdownOutput
@UnstableApi
public final void shutdownOutput(final ChannelPromise promise) {
assertEventLoop();
shutdownOutput(promise, null);
}
/**
* 關(guān)閉相應(yīng)通道的輸出部分。
* 例如迹栓,這將清理ChannelOutboundBuffer并不再允許任何寫操作掉分。
*/
private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
if (!promise.setUncancellable()) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
promise.setFailure(new ClosedChannelException());
return;
}
// 禁止再向?qū)懢彌_區(qū) outboundBuffer 添加任何消息和刷新操作。
this.outboundBuffer = null;
final Throwable shutdownCause = cause == null ?
new ChannelOutputShutdownException("Channel output shutdown") :
new ChannelOutputShutdownException("Channel output shutdown", cause);
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 調(diào)用 AbstractChannel 的 doShutdownOutput 方法迈螟,
// 進行 shutdown 操作
doShutdownOutput();
// 操作成功通知
promise.setSuccess();
// 操作失敗通知
} catch (Throwable err) {
promise.setFailure(err);
} finally {
// Dispatch to the EventLoop
eventLoop().execute(new Runnable() {
@Override
public void run() {
// 在 Shutdown 的時候叉抡,關(guān)閉寫緩沖區(qū) ChannelOutboundBuffer,
// 并發(fā)送用戶通知事件
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
});
}
}
});
} else {
try {
// 調(diào)用 AbstractChannel 的 doShutdownOutput 方法,
// 進行 shutdown 操作
doShutdownOutput();
// 操作成功通知
promise.setSuccess();
} catch (Throwable err) {
// 操作失敗通知
promise.setFailure(err);
} finally {
// 在 Shutdown 的時候答毫,關(guān)閉寫緩沖區(qū) ChannelOutboundBuffer,
// 并發(fā)送用戶通知事件
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
}
}
/**
* 在 Shutdown 的時候褥民,關(guān)閉寫緩沖區(qū) ChannelOutboundBuffer,
* 并發(fā)送用戶通知事件
*/
private void closeOutboundBufferForShutdown(
ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
// 使寫緩沖區(qū)中所有排隊的消息失敗
buffer.failFlushed(cause, false);
// 關(guān)閉寫緩沖區(qū)
buffer.close(cause, true);
// 發(fā)送一個通道 Shutdown 的用戶通知事件
pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
}
shutdown
的方法流程和 close
很像洗搂,區(qū)別點:
shutdown
是調(diào)用AbstractChannel
的doShutdownOutput()
方法進行相關(guān)操作消返,而close
是調(diào)用AbstractChannel
的doClose()
方法。close
最后會取消注冊耘拇,以及可能會發(fā)送ChannelInactive
事件撵颊。- 而
shutdown
會發(fā)送一個ChannelOutputShutdownEvent.INSTANCE
用戶自定義的通知事件。
2.8 強制關(guān)閉 closeForcibly
@Override
public final void closeForcibly() {
assertEventLoop();
try {
doClose();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
}
你會發(fā)現(xiàn)只調(diào)用了
AbstractChannel
的doClose()
方法進行關(guān)閉操作惫叛,不觸發(fā)任何事件倡勇,也不處理寫緩沖區(qū)。只可能在某些特殊情況下調(diào)用嘉涌,例如嘗試注冊失敗的時候妻熊。
2.9 開始讀 beginRead
@Override
public final void beginRead() {
assertEventLoop();
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
調(diào)用
AbstractChannel
的doBeginRead()
方法設(shè)置通道開始讀取數(shù)據(jù)。
2.10 寫操作 write
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// 寫緩沖區(qū)為 null仑最,
try {
// 現(xiàn)在釋放資源扔役,以防止資源泄漏
ReferenceCountUtil.release(msg);
} finally {
// 如果outboundBuffer為空,我們就知道通道被關(guān)閉了警医,所以立即進行失敗通知亿胸。
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}
int size;
try {
// 進行消息的轉(zhuǎn)換坯钦,例如將堆緩沖區(qū)變成直接緩沖區(qū)
msg = filterOutboundMessage(msg);
// 估算數(shù)據(jù)的大小
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
try {
// 失敗時需要釋放資源,以防止資源泄漏
ReferenceCountUtil.release(msg);
} finally {
// 進行操作失敗的通知
safeSetFailure(promise, t);
}
return;
}
// 將數(shù)據(jù)添加到寫緩沖區(qū) outboundBuffer 中
outboundBuffer.addMessage(msg, size, promise);
}
方法流程
- 先判斷寫緩沖區(qū)
outboundBuffer
是不是為null
侈玄,為空說明通道已關(guān)閉婉刀,進行失敗通知。 - 通過
filterOutboundMessage(msg)
方法進行數(shù)據(jù)轉(zhuǎn)換拗馒,例如將堆緩沖區(qū)變成直接緩沖區(qū)路星。 - 估算數(shù)據(jù)大小。
- 通過
outboundBuffer.addMessage(...)
方法诱桂,將數(shù)據(jù)添加到寫緩沖區(qū)outboundBuffer
中洋丐。 - 如果發(fā)送異常,記得釋放數(shù)據(jù)
msg
的引用挥等,防止內(nèi)存泄露友绝,并進行操作失敗通知。
2.11 刷新 flush
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 寫緩沖區(qū)為空肝劲,直接返回
if (outboundBuffer == null) {
return;
}
// 將寫緩沖區(qū)中的消息都標記成待刷新
outboundBuffer.addFlush();
// 進行刷新操作
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// 避免重復刷新
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 當前寫緩沖區(qū)沒有數(shù)據(jù)迁客,那么直接返回
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
// 避免重復刷新
inFlush0 = true;
// 如果通道處于非活動狀態(tài),則將所有掛起的寫請求標記為失敗辞槐。
if (!isActive()) {
try {
// Check if we need to generate the exception at all.
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
// 刷新操作完成掷漱,將 inFlush0重新設(shè)置為 false,以便下次刷新榄檬。
inFlush0 = false;
}
return;
}
try {
// 將給定緩沖區(qū)的內(nèi)容刷新到遠端
doWrite(outboundBuffer);
} catch (Throwable t) {
handleWriteError(t);
} finally {
// 刷新操作完成卜范,將 inFlush0重新設(shè)置為 false,以便下次刷新鹿榜。
inFlush0 = false;
}
}
- 通過
inFlush0
成員屬性海雪,來避免重復刷新。- 如果通道處于非活動狀態(tài)舱殿,則將所有掛起的寫請求標記為失敗奥裸。
- 通過
AbstractChannel
的doWrite(outboundBuffer)
方法,將緩沖區(qū)的內(nèi)容刷新到遠端沪袭。
2.12 小結(jié)
對比 Unsafe
的方法湾宙,你會發(fā)現(xiàn) AbstractUnsafe
中沒有實現(xiàn) connect(...)
連接方法。
對比發(fā)送入站IO
事件:
-
ChannelRegistered
和ChannelUnregistered
-
register
方法會發(fā)送ChannelRegistered
事件冈绊。 -
deregister
方法只有在通道之前已經(jīng)注冊之后创倔,才會發(fā)送ChannelUnregistered
事件。
-
-
ChannelActive
和ChannelInactive
- 一般都是通道
Channel
從不活躍變成活躍焚碌,要發(fā)送ChannelActive
事件;可能引起這個變化的操作有bind
和connect
操作霸妹。 - 通道
Channel
從活躍變成不活躍十电,就要發(fā)送ChannelInactive
事件;可能引起這個變化的操作有disconnect
,close
和shutdown
。 - 最后如果第一次注冊時鹃骂,且當前通道是活躍狀態(tài)台盯,也會發(fā)送
ChannelActive
事件。
- 一般都是通道
三. ChannelOutboundBuffer
在 AbstractChannel.Unsafe
中看到用戶調(diào)用write(...)
方法寫的數(shù)據(jù)畏线,會先添加到寫緩沖區(qū) ChannelOutboundBuffer
中静盅,然后調(diào)用 flush()
方法,才將寫緩沖區(qū)中的數(shù)據(jù)發(fā)送到遠端寝殴。
3.1 重要成員屬性
// 在鏈表結(jié)構(gòu)中第一個被刷新的節(jié)點
private Entry flushedEntry;
// 在鏈表結(jié)構(gòu)中第一個未刷新的節(jié)點
private Entry unflushedEntry;
// 表示鏈表中最后一個節(jié)點
private Entry tailEntry;
// 等待刷新節(jié)點的數(shù)量
private int flushed;
寫緩沖區(qū)通過鏈表來儲存數(shù)據(jù)(依靠 Entry.next
來實現(xiàn)鏈表)蒿叠,鏈表形式 Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
flushedEntry
表示第一個被刷新的節(jié)點,在鏈表頭蚣常,當然也是通過addFlush()
方法設(shè)置的市咽。unflushedEntry
表示第一個未刷新的節(jié)點,表示還沒有被標記刷新的第一個節(jié)點抵蚊。tailEntry
最后一個節(jié)點施绎。flushed
刷新節(jié)點的數(shù)量,這個屬性很重要贞绳,靠它來標記刷新節(jié)點谷醉,也就是說從flushedEntry
開始,flushed
數(shù)量的節(jié)點都被標記為刷新節(jié)點了冈闭。
3.2 重要方法
3.2.1 添加數(shù)據(jù)
這個方法一般在 AbstractChannel.AbstractUnsafe
的 write(...)
方法中調(diào)用俱尼。
/**
* 將給定的消息 msg 添加到ChannelOutboundBuffer中。
* 一旦消息寫入拒秘,給定的ChannelPromise將被通知号显。
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 將給定消息封裝成一個節(jié)點
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
// 將新消息節(jié)點添加到隊列尾
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
// 如果未刷新節(jié)點為空,說明隊列節(jié)點都變成刷新節(jié)點了躺酒,
// 那么這個新添加的節(jié)點押蚤,就是未刷新節(jié)點的頭了。
unflushedEntry = entry;
}
// See https://github.com/netty/netty/issues/1619
// 向未刷新的數(shù)組添加消息后羹应,增加掛起的字節(jié)數(shù)揽碘。
incrementPendingOutboundBytes(entry.pendingSize, false);
}
- 先將數(shù)據(jù)
msg
封裝成一個節(jié)點entry
,并將節(jié)點添加到鏈表尾园匹。- 如果
unflushedEntry
是null
,那么這個節(jié)點就是第一個未刷新節(jié)點雳刺。incrementPendingOutboundBytes(...)
方法,增加掛起的字節(jié)數(shù)裸违,看是否需要改變通道的 可寫屬性掖桦。
3.2.2 標記刷新
這個方法一般在 AbstractChannel.AbstractUnsafe
的 flush()
方法中調(diào)用。
/**
* 向此ChannelOutboundBuffer添加刷新供汛。
* 這意味著所有以前添加的消息都被標記為刷新枪汪,因此您將能夠處理它們涌穆。
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
// 未刷新節(jié)點后面的鏈表示新添加的節(jié)點列表,都是要加入到刷新中
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
// 將所有要刷新的節(jié)點變成不可取消的
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
// 掛起消息被取消雀久,所以確保我們釋放內(nèi)存并通知釋放的字節(jié)
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// 節(jié)點都變成已刷新的了宿稀,未刷新節(jié)點就設(shè)置為 null
unflushedEntry = null;
}
}
- 將從
unflushedEntry
未刷新節(jié)點開始到鏈表尾的所有節(jié)點都標記為刷新。通過flushed++
來增加刷新節(jié)點數(shù)量赖捌。- 調(diào)用
setUncancellable(...)
要寫入的節(jié)點是不可取消的祝沸,如果設(shè)置失敗,就要取消掛起數(shù)據(jù)越庇,并調(diào)用decrementPendingOutboundBytes(...)
減少掛起字節(jié)數(shù)罩锐,看是否需要改變通道的 可寫屬性。
3.2.3 刪除節(jié)點
/**
* 將刪除當前消息悦荒,將其ChannelPromise標記為success并返回true唯欣。
* 如果在調(diào)用此方法時不存在刷新的消息,則返回false搬味,表示沒有準備好處理的消息境氢。
*/
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
e.recycle();
return true;
}
private void removeEntry(Entry e) {
if (-- flushed == 0) {
// flushed == 0, 表示所有刷新節(jié)點都被處理了
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
// 將下一個節(jié)點變成刷新節(jié)點
flushedEntry = e.next;
}
}
當緩存區(qū)當前刷新節(jié)點數(shù)據(jù)被寫入到遠端了,那么調(diào)用這個
remove()
方法碰纬,移除當前節(jié)點萍聊,得到下一個刷新節(jié)點。