前言
dubbo-remoting 模塊提供了多種客戶端和服務(wù)端通信的功能。在 Dubbo 的整體架構(gòu)設(shè)計(jì)圖中疹吃,我們可以看到最底層紅色框選中的部分即為 Remoting 層萨驶,其中包括了 Exchange、Transport和Serialize 三個(gè)子層次核畴。這里我們要介紹的 dubbo-remoting 模塊主要對(duì)應(yīng) Exchange 和 Transport 兩層谤草。
本文從Transporter 層的 RemotingServer、Client温学、Channel枫浙、ChannelHandler 等核心接口出發(fā),介紹這些核心接口的實(shí)現(xiàn)紧帕。
AbstractPeer 抽象類
AbstractPeer抽象類是嗜,它同時(shí)實(shí)現(xiàn)了 Endpoint 接口和 ChannelHandler 接口,如下圖所示丽柿,它也是 AbstractChannel甫题、AbstractEndpoint 抽象類的父類坠非。
Netty 中也有 ChannelHandler盟迟、Channel 等接口攒菠,但無(wú)特殊說(shuō)明的情況下,這里的接口指的都是 Dubbo 中定義的接口新娜。
AbstractPeer 中有四個(gè)字段:一個(gè)是表示該端點(diǎn)自身的 URL 類型的字段,還有兩個(gè) Boolean 類型的字段(closing 和 closed)用來(lái)記錄當(dāng)前端點(diǎn)的狀態(tài)私杜,這三個(gè)字段都與 Endpoint 接口相關(guān)衰粹;第四個(gè)字段指向了一個(gè) ChannelHandler 對(duì)象,AbstractPeer 對(duì) ChannelHandler 接口的所有實(shí)現(xiàn)瓢捉,都是委托給了這個(gè) ChannelHandler 對(duì)象泡态。從上面的繼承關(guān)系圖中某弦,我們可以得出這樣一個(gè)結(jié)論:AbstractChannel惊科、AbstractServer馆截、AbstractClient 都是要關(guān)聯(lián)一個(gè) ChannelHandler 對(duì)象的蜡娶。
AbstractEndpoint 抽象類
我們順著上圖的繼承關(guān)系繼續(xù)向下看,AbstractEndpoint 繼承了 AbstractPeer 這個(gè)抽象類宿接。AbstractEndpoint 中維護(hù)了一個(gè) Codec2 對(duì)象(codec 字段)和兩個(gè)超時(shí)時(shí)間(timeout 字段和 connectTimeout 字段),在 AbstractEndpoint 的構(gòu)造方法中會(huì)根據(jù)傳入的 URL 初始化這三個(gè)字段:
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {
private Codec2 codec;
private int timeout;
private int connectTimeout;
public AbstractEndpoint(URL url, ChannelHandler handler) {
// 調(diào)用父類AbstractPeer的構(gòu)造方法
super(url, handler);
// 根據(jù)URL中的codec參數(shù)值副女,確定此處具體的Codec2實(shí)現(xiàn)類
this.codec = getChannelCodec(url);
// 根據(jù)URL中的timeout參數(shù)確定timeout字段的值,默認(rèn)1000
this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// 根據(jù)URL中的connect.timeout參數(shù)確定connectTimeout字段的值塞绿,默認(rèn)3000
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
}
由于Codec2 接口是一個(gè) SPI 擴(kuò)展點(diǎn)拷窜,這里的 AbstractEndpoint.getChannelCodec() 方法就是基于 Dubbo SPI 選擇其擴(kuò)展實(shí)現(xiàn)的篮昧,具體實(shí)現(xiàn)如下:
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {
protected static Codec2 getChannelCodec(URL url) {
// 根據(jù)URL的codec參數(shù)獲取擴(kuò)展名
String codecName = url.getProtocol(); // codec extension name must stay the same with protocol name
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
// 通過(guò)ExtensionLoader加載并實(shí)例化Codec2的具體擴(kuò)展實(shí)現(xiàn)
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
// Codec2接口不存在相應(yīng)的擴(kuò)展名,就嘗試從Codec這個(gè)老接口的擴(kuò)展名中查找春宣,目前Codec接口已經(jīng)廢棄了
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
.getExtension(codecName));
}
}
}
另外躏惋,AbstractEndpoint 還實(shí)現(xiàn)了 Resetable 接口(只有一個(gè) reset() 方法需要實(shí)現(xiàn))距误,雖然 AbstractEndpoint 中的 reset() 方法比較長(zhǎng)准潭,但是邏輯非常簡(jiǎn)單,就是根據(jù)傳入的 URL 參數(shù)重置 AbstractEndpoint 的三個(gè)字段泼掠。下面是重置 codec 字段的代碼片段,還是調(diào)用 getChannelCodec() 方法實(shí)現(xiàn)的:
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {
public void reset(URL url) {
// 檢測(cè)當(dāng)前AbstractEndpoint是否已經(jīng)關(guān)閉(略)
// 省略重置timeout沐鼠、connectTimeout兩個(gè)字段的邏輯
try {
if (url.hasParameter(Constants.CODEC_KEY)) {
this.codec = getChannelCodec(url);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
Server 繼承路線分析
AbstractServer 和 AbstractClient 都實(shí)現(xiàn)了 AbstractEndpoint 抽象類,我們先來(lái)看 AbstractServer 的實(shí)現(xiàn)焰檩。AbstractServer 在繼承了 AbstractEndpoint 的同時(shí),還實(shí)現(xiàn)了 RemotingServer 接口衩侥,如下圖所示:
AbstractServer
AbstractServer 是對(duì)服務(wù)端的抽象,實(shí)現(xiàn)了服務(wù)端的公共邏輯峦萎。AbstractServer 的核心字段有下面幾個(gè)爱榔。
localAddress详幽、bindAddress(InetSocketAddress 類型):分別對(duì)應(yīng)該 Server 的本地地址和綁定的地址悴能,都是從 URL 中的參數(shù)中獲取。bindAddress 默認(rèn)值與 localAddress 一致炒嘲。
accepts(int 類型):該 Server 能接收的最大連接數(shù),從 URL 的 accepts 參數(shù)中獲取夭拌,默認(rèn)值為 0,表示沒(méi)有限制。
executorRepository(ExecutorRepository 類型):負(fù)責(zé)管理線程池骡和,后面我們會(huì)深入介紹 ExecutorRepository 的具體實(shí)現(xiàn)。
executor(ExecutorService 類型):當(dāng)前 Server 關(guān)聯(lián)的線程池婆赠,由上面的 ExecutorRepository 創(chuàng)建并管理页藻。
在 AbstractServer 的構(gòu)造方法中會(huì)根據(jù)傳入的 URL初始化上述字段,并調(diào)用 doOpen() 這個(gè)抽象方法完成該 Server 的啟動(dòng)份帐,具體實(shí)現(xiàn)如下:
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
ExecutorService executor;
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
private int idleTimeout;
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 調(diào)用父類的構(gòu)造方法
super(url, handler);
// 根據(jù)傳入的URL初始化localAddress和bindAddress
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 初始化accepts等字段
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// 調(diào)用doOpen()這個(gè)抽象方法璃吧,啟動(dòng)該Server
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
// 獲取該Server關(guān)聯(lián)的線程池
executor = executorRepository.createExecutorIfAbsent(url);
}
}
ExecutorRepository
在繼續(xù)分析 AbstractServer 的具體實(shí)現(xiàn)類之前,我們先來(lái)了解一下 ExecutorRepository 這個(gè)接口废境。
ExecutorRepository 負(fù)責(zé)創(chuàng)建并管理 Dubbo 中的線程池畜挨,該接口雖然是個(gè) SPI 擴(kuò)展點(diǎn),但是只有一個(gè)默認(rèn)實(shí)現(xiàn)—— DefaultExecutorRepository噩凹。在該默認(rèn)實(shí)現(xiàn)中維護(hù)了一個(gè) ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> 集合(data 字段)緩存已有的線程池逮刨,第一層 Key 值表示線程池屬于 Provider 端還是 Consumer 端,第二層 Key 值表示線程池關(guān)聯(lián)服務(wù)的端口尤辱。
DefaultExecutorRepository.createExecutorIfAbsent() 方法會(huì)根據(jù) URL 參數(shù)創(chuàng)建相應(yīng)的線程池并緩存在合適的位置娄涩,具體實(shí)現(xiàn)如下:
public class DefaultExecutorRepository implements ExecutorRepository {
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
// 根據(jù)URL中的side參數(shù)值決定第一層key
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
// 根據(jù)URL中的port值確定第二層key
Integer portKey = url.getPort();
ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
// 如果緩存中相應(yīng)的線程池已關(guān)閉咽斧,則同樣需要調(diào)用createExecutor()方法
// 創(chuàng)建新的線程池,并替換掉緩存中已關(guān)閉的線程持
executor = createExecutor(url);
executors.put(portKey, executor);
}
return executor;
}
}
在 createExecutor() 方法中替蔬,會(huì)通過(guò) Dubbo SPI 查找 ThreadPool 接口的擴(kuò)展實(shí)現(xiàn)格嗅,并調(diào)用其 getExecutor() 方法創(chuàng)建線程池。ThreadPool 接口被 @SPI 注解修飾苔悦,默認(rèn)使用 FixedThreadPool 實(shí)現(xiàn)蟋座,但是 ThreadPool 接口中的 getExecutor() 方法被 @Adaptive 注解修飾驯遇,動(dòng)態(tài)生成的適配器類會(huì)優(yōu)先根據(jù) URL 中的 threadpool 參數(shù)選擇 ThreadPool 的擴(kuò)展實(shí)現(xiàn)好乐。
@SPI("fixed")
public interface ThreadPool {
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url);
}
ThreadPool 接口的實(shí)現(xiàn)類如下圖所示:
不同實(shí)現(xiàn)會(huì)根據(jù) URL 參數(shù)創(chuàng)建不同特性的線程池淮蜈,這里以CacheThreadPool為例進(jìn)行分析:
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
// 核心線程數(shù)量
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
// 最大線程數(shù)量
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
// 緩沖隊(duì)列的最大長(zhǎng)度
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
// 非核心線程的最大空閑時(shí)長(zhǎng),當(dāng)非核心線程空閑時(shí)間超過(guò)該值時(shí)旦袋,會(huì)被回收
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
// 下面就是依賴JDK的ThreadPoolExecutor創(chuàng)建指定特性的線程池并返回
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
LimitedThreadPool:與 CacheThreadPool 一樣,可以指定核心線程數(shù)、最大線程數(shù)以及緩沖隊(duì)列長(zhǎng)度嗅剖。區(qū)別在于赏胚,LimitedThreadPool 創(chuàng)建的線程池的非核心線程不會(huì)被回收。
FixedThreadPool:核心線程數(shù)和最大線程數(shù)一致,且不會(huì)被回收革为。
上述三種類型的線程池都是基于 JDK ThreadPoolExecutor 線程池,在核心線程全部被占用的時(shí)候,會(huì)優(yōu)先將任務(wù)放到緩沖隊(duì)列中緩存勒葱,在緩沖隊(duì)列滿了之后糠聪,才會(huì)嘗試創(chuàng)建新線程來(lái)處理任務(wù)。
EagerThreadPool 創(chuàng)建的線程池是 EagerThreadPoolExecutor(繼承了 JDK 提供的 ThreadPoolExecutor),使用的隊(duì)列是 TaskQueue(繼承了LinkedBlockingQueue)。該線程池與 ThreadPoolExecutor 不同的是:在線程數(shù)沒(méi)有達(dá)到最大線程數(shù)的前提下,EagerThreadPoolExecutor 會(huì)優(yōu)先創(chuàng)建線程來(lái)執(zhí)行任務(wù),而不是放到緩沖隊(duì)列中;當(dāng)線程數(shù)達(dá)到最大值時(shí)蠢涝,EagerThreadPoolExecutor 會(huì)將任務(wù)放入緩沖隊(duì)列堡距,等待空閑線程衬吆。
EagerThreadPoolExecutor 覆蓋了 ThreadPoolExecutor 中的兩個(gè)方法:execute() 方法和 afterExecute() 方法蟀架,具體實(shí)現(xiàn)如下色徘,我們可以看到其中維護(hù)了一個(gè) submittedTaskCount 字段(AtomicInteger 類型)溪猿,用來(lái)記錄當(dāng)前在線程池中的任務(wù)總數(shù)(正在線程中執(zhí)行的任務(wù)數(shù)+隊(duì)列中等待的任務(wù)數(shù))群井。
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
// 任務(wù)提交之前,遞增submittedTaskCount
submittedTaskCount.incrementAndGet();
try {
// 提交任務(wù)
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
// 任務(wù)被拒絕之后蛙埂,會(huì)嘗試再次放入隊(duì)列中緩存宏怔,等待空閑線程執(zhí)行
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
// 再次入隊(duì)被拒絕抓艳,則隊(duì)列已滿位他,無(wú)法執(zhí)行任務(wù)
// 遞減submittedTaskCount
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
// 再次入隊(duì)列異常靡菇,遞減submittedTaskCount
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
// 任務(wù)提交異常馍惹,遞減submittedTaskCount
submittedTaskCount.decrementAndGet();
throw t;
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 任務(wù)指定結(jié)束遇西,遞減submittedTaskCount
submittedTaskCount.decrementAndGet();
}
}
看到這里馅精,你可能會(huì)有些疑惑:沒(méi)有看到優(yōu)先創(chuàng)建線程執(zhí)行任務(wù)的邏輯啊。其實(shí)重點(diǎn)在關(guān)聯(lián)的 TaskQueue 實(shí)現(xiàn)中粱檀,它覆蓋了 LinkedBlockingQueue.offer() 方法硫嘶,會(huì)判斷線程池的 submittedTaskCount 值是否已經(jīng)達(dá)到最大線程數(shù),如果未超過(guò)梧税,則會(huì)返回 false沦疾,迫使線程池創(chuàng)建新線程來(lái)執(zhí)行任務(wù)。示例代碼如下:
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private EagerThreadPoolExecutor executor;
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
// 獲取當(dāng)前線程池中的活躍線程數(shù)
int currentPoolThreadSize = executor.getPoolSize();
// 當(dāng)前有線程空閑第队,直接將任務(wù)提交到隊(duì)列中哮塞,空閑線程會(huì)直接從中獲取任務(wù)執(zhí)行
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// 當(dāng)前沒(méi)有空閑線程,但是還可以創(chuàng)建新線程凳谦,則返回false忆畅,迫使線程池創(chuàng)建
// 新線程來(lái)執(zhí)行任務(wù)
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// 當(dāng)前線程數(shù)已經(jīng)達(dá)到上限,只能放到隊(duì)列中緩存了
return super.offer(runnable);
}
}
線程池最后一個(gè)相關(guān)的小細(xì)節(jié)是 AbortPolicyWithReport 尸执,它繼承了 ThreadPoolExecutor.AbortPolicy家凯,覆蓋的 rejectedExecution 方法中會(huì)輸出包含線程池相關(guān)信息的 WARN 級(jí)別日志,然后進(jìn)行 dumpJStack() 方法如失,最后才會(huì)拋出RejectedExecutionException 異常绊诲。
NettyServer
回到 Server 的繼承線上,下面來(lái)看基于 Netty 4 實(shí)現(xiàn)的 NettyServer褪贵,它繼承了前文介紹的 AbstractServer掂之,實(shí)現(xiàn)了 doOpen() 方法和 doClose() 方法。這里重點(diǎn)看 doOpen() 方法脆丁,如下所示:
public class NettyServer extends AbstractServer implements RemotingServer {
private Map<String, Channel> channels;
private ServerBootstrap bootstrap;
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@Override
protected void doOpen() throws Throwable {
// 創(chuàng)建ServerBootstrap
bootstrap = new ServerBootstrap();
// 創(chuàng)建boss EventLoopGroup
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
// 創(chuàng)建worker EventLoopGroup
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
// 創(chuàng)建NettyServerHandler世舰,它是一個(gè)Netty中的ChannelHandler實(shí)現(xiàn),
// 不是Dubbo Remoting層的ChannelHandler接口的實(shí)現(xiàn)
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
// 獲取當(dāng)前NettyServer創(chuàng)建的所有Channel槽卫,這里的channels集合中的
// Channel不是Netty中的Channel對(duì)象跟压,而是Dubbo Remoting層的Channel對(duì)象
channels = nettyServerHandler.getChannels();
// 初始化ServerBootstrap歼培,指定boss和worker EventLoopGroup
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
// 連接空閑超時(shí)時(shí)間
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
// NettyCodecAdapter中會(huì)創(chuàng)建Decoder和Encoder
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
// 注冊(cè)Decoder和Encoder
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
// 注冊(cè)IdleStateHandler
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
// 注冊(cè)NettyServerHandler
.addLast("handler", nettyServerHandler);
}
});
// bind
// 綁定指定的地址和端口
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
// 等待bind操作完成
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
看完 NettyServer 實(shí)現(xiàn)的 doOpen() 方法之后,你會(huì)發(fā)現(xiàn)它和普通Netty 的 Server 端基本流程類似:初始化 ServerBootstrap梗搅、創(chuàng)建 Boss EventLoopGroup 和 Worker EventLoopGroup、創(chuàng)建 ChannelInitializer 指定如何初始化 Channel 上的 ChannelHandler 等一系列 Netty 使用的標(biāo)準(zhǔn)化流程。
其實(shí)在 Transporter 這一層看,功能的不同其實(shí)就是注冊(cè)在 Channel 上的 ChannelHandler 不同泪掀。
核心 ChannelHandler
下面我們來(lái)逐個(gè)看看這四個(gè) ChannelHandler 的核心功能塔拳。
首先是decoder 和 encoder,它們都是 NettyCodecAdapter 的內(nèi)部類,如下圖所示弯予,分別繼承了 Netty 中的 ByteToMessageDecoder 和 MessageToByteEncoder:
還記得 AbstractEndpoint 抽象類中的 codec 字段(Codec2 類型)嗎呼寸?InternalDecoder 和 InternalEncoder 會(huì)將真正的編解碼功能委托給 NettyServer 關(guān)聯(lián)的這個(gè) Codec2 對(duì)象去處理,這里以 InternalDecoder 為例進(jìn)行分析:
final public class NettyCodecAdapter {
private final Codec2 codec;
private final URL url;
private final org.apache.dubbo.remoting.ChannelHandler handler;
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
// 將ByteBuf封裝成統(tǒng)一的ChannelBuffer
ChannelBuffer message = new NettyBackedChannelBuffer(input);
// 拿到關(guān)聯(lián)的Channel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
// decode object.
do {
// 記錄當(dāng)前readerIndex的位置
int saveReaderIndex = message.readerIndex();
// 委托給Codec2進(jìn)行解碼
Object msg = codec.decode(channel, message);
// 當(dāng)前接收到的數(shù)據(jù)不足一個(gè)消息的長(zhǎng)度捐祠,會(huì)返回NEED_MORE_INPUT,
// 這里會(huì)重置readerIndex,繼續(xù)等待接收更多的數(shù)據(jù)
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
// 將讀取到的消息傳遞給后面的Handler處理
out.add(msg);
}
}
} while (message.readable());
}
}
}
IdleStateHandler
IdleStateHandler靴寂,它是 Netty 提供的一個(gè)工具型 ChannelHandler剖踊,用于定時(shí)心跳請(qǐng)求的功能或是自動(dòng)關(guān)閉長(zhǎng)時(shí)間空閑連接的功能梆造。它的原理到底是怎樣的呢忽肛?在 IdleStateHandler 中通過(guò) lastReadTime烂斋、lastWriteTime 等幾個(gè)字段屹逛,記錄了最近一次讀/寫事件的時(shí)間煎源,IdleStateHandler 初始化的時(shí)候,會(huì)創(chuàng)建一個(gè)定時(shí)任務(wù)柄错,定時(shí)檢測(cè)當(dāng)前時(shí)間與最后一次讀/寫時(shí)間的差值。如果超過(guò)我們?cè)O(shè)置的閾值(也就是上面 NettyServer 中設(shè)置的 idleTimeout)颂跨,就會(huì)觸發(fā) IdleStateEvent 事件池颈,并傳遞給后續(xù)的 ChannelHandler 進(jìn)行處理。后續(xù) ChannelHandler 的 userEventTriggered() 方法會(huì)根據(jù)接收到的 IdleStateEvent 事件钓丰,決定是關(guān)閉長(zhǎng)時(shí)間空閑的連接躯砰,還是發(fā)送心跳探活。
NettyServerHandler
最后來(lái)看NettyServerHandler斑粱,它繼承了 ChannelDuplexHandler弃揽,這是 Netty 提供的一個(gè)同時(shí)處理 Inbound 數(shù)據(jù)和 Outbound 數(shù)據(jù)的 ChannelHandler,從下面的繼承圖就能看出來(lái)则北。
在 NettyServerHandler 中有 channels 和 handler 兩個(gè)核心字段。
- channels(Map<String,Channel>集合):記錄了當(dāng)前 Server 創(chuàng)建的所有 Channel痕慢,從下圖中可以看到尚揣,連接創(chuàng)建(觸發(fā) channelActive() 方法)、連接斷開(kāi)(觸發(fā) channelInactive()方法)會(huì)操作 channels 集合進(jìn)行相應(yīng)的增刪掖举。
- handler(ChannelHandler 類型):NettyServerHandler 內(nèi)幾乎所有方法都會(huì)觸發(fā)該 Dubbo ChannelHandler 對(duì)象(如下圖)快骗。
這里以 write() 方法為例進(jìn)行簡(jiǎn)單分析:
public class NettyServerHandler extends ChannelDuplexHandler {
private final URL url;
private final ChannelHandler handler;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 將發(fā)送的數(shù)據(jù)繼續(xù)向下傳遞
super.write(ctx, msg, promise);
// 并不影響消息的繼續(xù)發(fā)送,只是觸發(fā)sent()方法進(jìn)行相關(guān)的處理,這也是方法
// 名稱是動(dòng)詞過(guò)去式的原因方篮,可以仔細(xì)體會(huì)一下名秀。其他方法可能沒(méi)有那么明顯
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.sent(channel, msg);
}
}
在 NettyServer 創(chuàng)建 NettyServerHandler 的時(shí)候,可以看到下面的這行代碼:
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
其中第二個(gè)參數(shù)傳入的是 NettyServer 這個(gè)對(duì)象藕溅,你可以追溯一下 NettyServer 的繼承結(jié)構(gòu)匕得,會(huì)發(fā)現(xiàn)它的最頂層父類 AbstractPeer 實(shí)現(xiàn)了 ChannelHandler,并且將所有的方法委托給其中封裝的 ChannelHandler 對(duì)象巾表,如下圖所示:
也就是說(shuō)汁掠,NettyServerHandler 會(huì)將數(shù)據(jù)委托給這個(gè) ChannelHandler。
到此為止集币,Server 這條繼承線就介紹完了考阱。你可以回顧一下,從 AbstractPeer 開(kāi)始往下鞠苟,一路繼承下來(lái)乞榨,NettyServer 擁有了 Endpoint、ChannelHandler 以及RemotingServer多個(gè)接口的能力当娱,關(guān)聯(lián)了一個(gè) ChannelHandler 對(duì)象以及 Codec2 對(duì)象吃既,并最終將數(shù)據(jù)委托給這兩個(gè)對(duì)象進(jìn)行處理。所以趾访,上層調(diào)用方只需要實(shí)現(xiàn) ChannelHandler 和 Codec2 這兩個(gè)接口就可以了态秧。