通過(guò)前面的源碼系列文章中的netty reactor線程三部曲枕磁,我們已經(jīng)知道,netty的reactor線程就像是一個(gè)發(fā)動(dòng)機(jī)平项,驅(qū)動(dòng)著整個(gè)netty框架的運(yùn)行玖翅,而服務(wù)端的綁定和新連接的建立正是發(fā)動(dòng)機(jī)的導(dǎo)火線,將發(fā)動(dòng)機(jī)點(diǎn)燃
netty在服務(wù)端端口綁定和新連接建立的過(guò)程中會(huì)建立相應(yīng)的channel东且,而與channel的動(dòng)作密切相關(guān)的是pipeline這個(gè)概念启具,pipeline像是可以看作是一條流水線,原始的原料(字節(jié)流)進(jìn)來(lái)珊泳,經(jīng)過(guò)加工鲁冯,最后輸出
本文,我將以新連接的建立為例分為以下幾個(gè)部分給你介紹netty中的pipeline是怎么玩轉(zhuǎn)起來(lái)的
- pipeline 初始化
- pipeline 添加節(jié)點(diǎn)
- pipeline 刪除節(jié)點(diǎn)
pipeline 初始化
在新連接的建立這篇文章中色查,我們已經(jīng)知道了創(chuàng)建NioSocketChannel
的時(shí)候會(huì)將netty的核心組件創(chuàng)建出來(lái)
pipeline是其中的一員薯演,在下面這段代碼中被創(chuàng)建
AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
pipeline中保存了channel的引用,創(chuàng)建完pipeline之后综慎,整個(gè)pipeline是這個(gè)樣子的
pipeline中的每個(gè)節(jié)點(diǎn)是一個(gè)ChannelHandlerContext
對(duì)象涣仿,每個(gè)context節(jié)點(diǎn)保存了它包裹的執(zhí)行器 ChannelHandler
執(zhí)行操作所需要的上下文,其實(shí)就是pipeline,因?yàn)閜ipeline包含了channel的引用好港,可以拿到所有的context信息
默認(rèn)情況下愉镰,一條pipeline會(huì)有兩個(gè)節(jié)點(diǎn),head和tail钧汹,后面的文章我們具體分析這兩個(gè)特殊的節(jié)點(diǎn)丈探,今天我們重點(diǎn)放在pipeline
pipeline添加節(jié)點(diǎn)
下面是一段非常常見(jiàn)的客戶端代碼
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Spliter())
p.addLast(new Decoder());
p.addLast(new BusinessHandler())
p.addLast(new Encoder());
}
});
首先,用一個(gè)spliter將來(lái)源TCP數(shù)據(jù)包拆包拔莱,然后將拆出來(lái)的包進(jìn)行decoder碗降,傳入業(yè)務(wù)處理器BusinessHandler,業(yè)務(wù)處理完encoder塘秦,輸出
整個(gè)pipeline結(jié)構(gòu)如下
我用兩種顏色區(qū)分了一下pipeline中兩種不同類型的節(jié)點(diǎn)讼渊,一個(gè)是 ChannelInboundHandler
,處理inBound事件尊剔,最典型的就是讀取數(shù)據(jù)流爪幻,加工處理;還有一種類型的Handler是 ChannelOutboundHandler
, 處理outBound事件须误,比如當(dāng)調(diào)用writeAndFlush()
類方法時(shí)挨稿,就會(huì)經(jīng)過(guò)該種類型的handler
不管是哪種類型的handler,其外層對(duì)象 ChannelHandlerContext
之間都是通過(guò)雙向鏈表連接京痢,而區(qū)分一個(gè) ChannelHandlerContext
到底是in還是out奶甘,在添加節(jié)點(diǎn)的時(shí)候我們就可以看到netty是怎么處理的
DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1.檢查是否有重復(fù)handler
checkMultiplicity(handler);
// 2.創(chuàng)建節(jié)點(diǎn)
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加節(jié)點(diǎn)
addLast0(newCtx);
}
// 4.回調(diào)用戶方法
callHandlerAdded0(handler);
return this;
}
這里簡(jiǎn)單地用synchronized
方法是為了防止多線程并發(fā)操作pipeline底層的雙向鏈表
我們還是逐步分析上面這段代碼
1.檢查是否有重復(fù)handler
在用戶代碼添加一條handler的時(shí)候,首先會(huì)查看該handler有沒(méi)有添加過(guò)
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
netty使用一個(gè)成員變量added
標(biāo)識(shí)一個(gè)channel是否已經(jīng)添加祭椰,上面這段代碼很簡(jiǎn)單臭家,如果當(dāng)前要添加的Handler是非共享的,并且已經(jīng)添加過(guò)吭产,那就拋出異常侣监,否則鸭轮,標(biāo)識(shí)該handler已經(jīng)添加
由此可見(jiàn)臣淤,一個(gè)Handler如果是sharable的,就可以無(wú)限次被添加到pipeline中窃爷,我們客戶端代碼如果要讓一個(gè)Handler被共用邑蒋,只需要加一個(gè)@Sharable標(biāo)注即可,如下
@Sharable
public class BusinessHandler {
}
而如果Handler是sharable的按厘,一般就通過(guò)spring的注入的方式使用医吊,不需要每次都new 一個(gè)
isSharable()
方法正是通過(guò)該Handler對(duì)應(yīng)的類是否標(biāo)注@Sharable來(lái)實(shí)現(xiàn)的
ChannelHandlerAdapter
public boolean isSharable() {
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
這里也可以看到,netty為了性能優(yōu)化到極致逮京,還使用了ThreadLocal來(lái)緩存Handler的狀態(tài)卿堂,高并發(fā)海量連接下,每次有新連接添加Handler都會(huì)創(chuàng)建調(diào)用此方法
2.創(chuàng)建節(jié)點(diǎn)
回到主流程,看創(chuàng)建上下文這段代碼
newCtx = newContext(group, filterName(name, handler), handler);
這里我們需要先分析 filterName(name, handler)
這段代碼草描,這個(gè)函數(shù)用于給handler創(chuàng)建一個(gè)唯一性的名字
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
顯然览绿,我們傳入的name為null,netty就給我們生成一個(gè)默認(rèn)的name穗慕,否則饿敲,檢查是否有重名,檢查通過(guò)的話就返回
netty創(chuàng)建默認(rèn)name的規(guī)則為 簡(jiǎn)單類名#0
逛绵,下面我們來(lái)看些具體是怎么實(shí)現(xiàn)的
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
private String generateName(ChannelHandler handler) {
// 先查看緩存中是否有生成過(guò)默認(rèn)name
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
// 沒(méi)有生成過(guò)怀各,就生成一個(gè)默認(rèn)name,加入緩存
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
// 生成完了术浪,還要看默認(rèn)name有沒(méi)有沖突
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1);
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
netty使用一個(gè) FastThreadLocal
(后面的文章會(huì)細(xì)說(shuō))變量來(lái)緩存Handler的類和默認(rèn)名稱的映射關(guān)系瓢对,在生成name的時(shí)候,首先查看緩存中有沒(méi)有生成過(guò)默認(rèn)name(簡(jiǎn)單類名#0
)胰苏,如果沒(méi)有生成沥曹,就調(diào)用generateName0()
生成默認(rèn)name,然后加入緩存
接下來(lái)還需要檢查name是否和已有的name有沖突碟联,調(diào)用context0()
妓美,查找pipeline里面有沒(méi)有對(duì)應(yīng)的context
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
context0()
方法鏈表遍歷每一個(gè) ChannelHandlerContext
,只要發(fā)現(xiàn)某個(gè)context的名字與待添加的name相同鲤孵,就返回該context壶栋,最后拋出異常,可以看到普监,這個(gè)其實(shí)是一個(gè)線性搜索的過(guò)程
如果context0(name) != null
成立仍律,說(shuō)明現(xiàn)有的context里面已經(jīng)有了一個(gè)默認(rèn)name,那么就從 簡(jiǎn)單類名#1
往上一直找金矛,直到找到一個(gè)唯一的name秸抚,比如簡(jiǎn)單類名#3
如果用戶代碼在添加Handler的時(shí)候指定了一個(gè)name,那么要做到事僅僅為檢查一下是否有重復(fù)
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
處理完name之后廊散,就進(jìn)入到創(chuàng)建context的過(guò)程桑滩,由前面的調(diào)用鏈得知,group
為null允睹,因此childExecutor(group)
也返回null
DefaultChannelPipeline
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
//..
}
DefaultChannelHandlerContext
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
構(gòu)造函數(shù)中运准,DefaultChannelHandlerContext
將參數(shù)回傳到父類,保存Handler的引用缭受,進(jìn)入到其父類
AbstractChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
}
netty中用兩個(gè)字段來(lái)表示這個(gè)channelHandlerContext
屬于inBound
還是outBound
胁澳,或者兩者都是,兩個(gè)boolean是通過(guò)下面兩個(gè)小函數(shù)來(lái)判斷(見(jiàn)上面一段代碼)
DefaultChannelHandlerContext
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
通過(guò)instanceof
關(guān)鍵字根據(jù)接口類型來(lái)判斷米者,因此韭畸,如果一個(gè)Handler實(shí)現(xiàn)了兩類接口宇智,那么他既是一個(gè)inBound類型的Handler,又是一個(gè)outBound類型的Handler胰丁,比如下面這個(gè)類
常用的普筹,將decode操作和encode操作合并到一起的codec,一般會(huì)繼承 MessageToMessageCodec
隘马,而MessageToMessageCodec
就是繼承ChannelDuplexHandler
MessageToMessageCodec
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
throws Exception;
}
context 創(chuàng)建完了之后太防,接下來(lái)終于要將創(chuàng)建完畢的context加入到pipeline中去了
3.添加節(jié)點(diǎn)
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev; // 1
newCtx.next = tail; // 2
prev.next = newCtx; // 3
tail.prev = newCtx; // 4
}
用下面這幅圖可見(jiàn)簡(jiǎn)單的表示這段過(guò)程,說(shuō)白了酸员,其實(shí)就是一個(gè)雙向鏈表的插入操作
操作完畢蜒车,該context就加入到pipeline中
到這里,pipeline添加節(jié)點(diǎn)的操作就完成了幔嗦,你可以根據(jù)此思路掌握所有的addxxx()系列方法
4.回調(diào)用戶方法
AbstractChannelHandlerContext
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
}
到了第四步酿愧,pipeline中的新節(jié)點(diǎn)添加完成,于是便開(kāi)始回調(diào)用戶代碼 ctx.handler().handlerAdded(ctx);
邀泉,常見(jiàn)的用戶代碼如下
AbstractChannelHandlerContext
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 節(jié)點(diǎn)被添加完畢之后回調(diào)到此
// do something
}
}
接下來(lái)嬉挡,設(shè)置該節(jié)點(diǎn)的狀態(tài)
AbstractChannelHandlerContext
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
用cas修改節(jié)點(diǎn)的狀態(tài)至:REMOVE_COMPLETE(說(shuō)明該節(jié)點(diǎn)已經(jīng)被移除) 或者 ADD_COMPLETE
pipeline刪除節(jié)點(diǎn)
netty 有個(gè)最大的特性之一就是Handler可插拔,做到動(dòng)態(tài)編織pipeline汇恤,比如在首次建立連接的時(shí)候庞钢,需要通過(guò)進(jìn)行權(quán)限認(rèn)證,在認(rèn)證通過(guò)之后因谎,就可以將此context移除基括,下次pipeline在傳播事件的時(shí)候就就不會(huì)調(diào)用到權(quán)限認(rèn)證處理器
下面是權(quán)限認(rèn)證Handler最簡(jiǎn)單的實(shí)現(xiàn),第一個(gè)數(shù)據(jù)包傳來(lái)的是認(rèn)證信息财岔,如果校驗(yàn)通過(guò)风皿,就刪除此Handler,否則匠璧,直接關(guān)閉連接
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
if (verify(authDataPacket)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
private boolean verify(ByteBuf byteBuf) {
//...
}
}
重點(diǎn)就在 ctx.pipeline().remove(this)
這段代碼
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
remove操作相比add簡(jiǎn)單不少桐款,分為三個(gè)步驟:
1.找到待刪除的節(jié)點(diǎn)
2.調(diào)整雙向鏈表指針刪除
3.回調(diào)用戶函數(shù)
1.找到待刪除的節(jié)點(diǎn)
DefaultChannelPipeline
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
這里為了找到Handler對(duì)應(yīng)的context,照樣是通過(guò)依次遍歷雙向鏈表的方式夷恍,直到某一個(gè)context的Handler和當(dāng)前Handler相同魔眨,便找到了該節(jié)點(diǎn)
2.調(diào)整雙向鏈表指針刪除
DefaultChannelPipeline
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
// 2.調(diào)整雙向鏈表指針刪除
remove0(ctx);
}
// 3.回調(diào)用戶函數(shù)
callHandlerRemoved0(ctx);
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next; // 1
next.prev = prev; // 2
}
經(jīng)歷的過(guò)程要比添加節(jié)點(diǎn)要簡(jiǎn)單,可以用下面一幅圖來(lái)表示
最后的結(jié)果為
結(jié)合這兩幅圖裁厅,可以很清晰地了解權(quán)限驗(yàn)證Handler的工作原理冰沙,另外侨艾,被刪除的節(jié)點(diǎn)因?yàn)闆](méi)有對(duì)象引用到执虹,果過(guò)段時(shí)間就會(huì)被gc自動(dòng)回收
3.回調(diào)用戶函數(shù)
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
}
到了第三步,pipeline中的節(jié)點(diǎn)刪除完成唠梨,于是便開(kāi)始回調(diào)用戶代碼 ctx.handler().handlerRemoved(ctx);
袋励,常見(jiàn)的代碼如下
AbstractChannelHandlerContext
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 節(jié)點(diǎn)被刪除完畢之后回調(diào)到此,可做一些資源清理
// do something
}
}
最后,將該節(jié)點(diǎn)的狀態(tài)設(shè)置為removed
final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
removexxx系列的其他方法族大同小異茬故,你可以根據(jù)上面的思路展開(kāi)其他的系列方法盖灸,這里不再贅述
總結(jié)
1.以新連接創(chuàng)建為例,新連接創(chuàng)建的過(guò)程中創(chuàng)建channel磺芭,而在創(chuàng)建channel的過(guò)程中創(chuàng)建了該channel對(duì)應(yīng)的pipeline赁炎,創(chuàng)建完pipeline之后,自動(dòng)給該pipeline添加了兩個(gè)節(jié)點(diǎn)钾腺,即ChannelHandlerContext徙垫,ChannelHandlerContext中有用pipeline和channel所有的上下文信息。
2.pipeline是雙向個(gè)鏈表結(jié)構(gòu)放棒,添加和刪除節(jié)點(diǎn)均只需要調(diào)整鏈表結(jié)構(gòu)
3.pipeline中的每個(gè)節(jié)點(diǎn)包著具體的處理器ChannelHandler
姻报,節(jié)點(diǎn)根據(jù)ChannelHandler
的類型是ChannelInboundHandler
還是ChannelOutboundHandler
來(lái)判斷該節(jié)點(diǎn)屬于in還是out或者兩者都是
下一篇文章將繼續(xù)pipeline的分析,敬請(qǐng)期待间螟!
如果你覺(jué)得看的不過(guò)癮吴旋,想系統(tǒng)學(xué)習(xí)Netty原理,那么你一定不要錯(cuò)過(guò)我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html