1.Pipeline流水線
Netty的業(yè)務(wù)處理器流水線ChannelPipeline是基于責(zé)任鏈設(shè)計(jì)模式(Chain of Responsibility)來設(shè)計(jì)的旭寿,內(nèi)部是一個(gè)雙向鏈表結(jié)構(gòu),能夠支持動(dòng)態(tài)地添加和刪除Handler業(yè)務(wù)處理器赁项。
-
入站操作
image.png -
出站操作
出站流水從后往前
image.png
1.1ChanneHandlerContext上下文
- 不論定義那種類型的Handler業(yè)務(wù)處理器,最終形式都是以雙向鏈表方式保存的
- 在Handler業(yè)務(wù)處理器被添加到流水線中時(shí)涩僻,會創(chuàng)建一個(gè)通道處理器上下文ChannelHandlerContext晌涕,它代表了ChannelHandler通道處理器和ChannelPipeline通道流水線之間的關(guān)聯(lián)。
- 主要分類
1.獲取上下文所關(guān)聯(lián)的Netty組件實(shí)例撞鹉,如所關(guān)聯(lián)的通道嗜诀、所關(guān)聯(lián)的流水線、上下文內(nèi)部Handler業(yè)務(wù)處理器實(shí)例等孔祸;
- 是入站和出站處理方法
- Channel隆敢、Handler、ChannelHandlerContext三者的關(guān)系為:
Channel通道擁有一條ChannelPipeline通道流水線崔慧,每一個(gè)流水線節(jié)點(diǎn)為一個(gè)ChannelHandlerContext通道處理器上下文對象拂蝎,每一個(gè)上下文中包裹了一個(gè)ChannelHandler通道處理器。在ChannelHandler通道處理器的入站/出站處理方法中惶室,Netty都會傳遞一個(gè)Context上下文實(shí)例作為實(shí)際參數(shù)温自。通過Context實(shí)例的實(shí)參,在業(yè)務(wù)處理中皇钞,可以獲取ChannelPipeline通道流水線的實(shí)例或者Channel通道的實(shí)例悼泌。
1.2階段流水線的處理方式
- 1.入站在channelRead方法中,不去調(diào)用父類的channelRead入站方法
-2.出站處理流程中夹界,只要開始就不能被階段
2.ByteBuf緩沖區(qū)
- ByteBuf的優(yōu)勢:
- Pooling (池化馆里,這點(diǎn)減少了內(nèi)存復(fù)制和GC,提升了效率)
- 復(fù)合緩沖區(qū)類型可柿,支持零復(fù)制
- 不需要調(diào)用flip()方法去切換讀/寫模式
- 擴(kuò)展性好鸠踪,例如StringBuffer
- 可以自定義緩沖區(qū)類型
- 讀取和寫入索引分開
- 方法的鏈?zhǔn)秸{(diào)用
- 可以進(jìn)行引用計(jì)數(shù),方便重復(fù)使用
-
ByteBuf是一個(gè)字節(jié)容器复斥,內(nèi)部是一個(gè)字節(jié)數(shù)組
image.png
2.1ByteBuf的重要屬性
- readerIndex(讀指針):指示讀取的起始位置营密。每讀取一個(gè)字節(jié),readerIndex自動(dòng)增加1目锭。一旦readerIndex與writerIndex相等评汰,則表示ByteBuf不可讀了纷捞。
- writerIndex(寫指針):指示寫入的起始位置。每寫一個(gè)字節(jié)被去,writerIndex自動(dòng)增加1兰绣。一旦增加到writerIndex與capacity()容量相等,則表示ByteBuf已經(jīng)不可寫了编振。capacity()是一個(gè)成員方法缀辩,不是一個(gè)成員屬性,它表示ByteBuf中可以寫入的容量踪央。注意臀玄,它不是最大容量maxCapacity。
- maxCapacity(最大容量):表示ByteBuf可以擴(kuò)容的最大容量畅蹂。當(dāng)向ByteBuf寫數(shù)據(jù)的時(shí)候健无,如果容量不足,可以進(jìn)行擴(kuò)容液斜。擴(kuò)容的最大限度由maxCapacity的值來設(shè)定累贤,超過maxCapacity就會報(bào)錯(cuò)。
2.2ByteBuf的三組方法
- 容量系列:capacity()少漆、maxCapacity()
- 寫入系列: isWritable()臼膏、writableBytes() 等
- 讀取系列: isReadable( )、 readableBytes( )等
2.3代碼示例
public class WriteReadTest {
@Test
public void testWriteRead() {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
print("動(dòng)作:分配 ByteBuf(9, 100)", buffer);
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("動(dòng)作:寫入4個(gè)字節(jié) (1,2,3,4)", buffer);
Logger.info("start==========:get==========");
getByteBuf(buffer);
print("動(dòng)作:取數(shù)據(jù) ByteBuf", buffer);
Logger.info("start==========:read==========");
readByteBuf(buffer);
print("動(dòng)作:讀完 ByteBuf", buffer);
}
//讀取一個(gè)字節(jié)
private void readByteBuf(ByteBuf buffer) {
while (buffer.isReadable()) {
Logger.info("讀取一個(gè)字節(jié):" + buffer.readByte());
}
}
//讀取一個(gè)字節(jié)示损,不改變指針
private void getByteBuf(ByteBuf buffer) {
for (int i = 0; i < buffer.readableBytes(); i++) {
Logger.info("讀取一個(gè)字節(jié):" + buffer.getByte(i));
}
}
}
日志打由酢:
[main|PrintAttribute.print] |> after ===========動(dòng)作:分配 ByteBuf(9, 100)============
[main|PrintAttribute.print] |> 1.0 isReadable(): false
[main|PrintAttribute.print] |> 1.1 readerIndex(): 0
[main|PrintAttribute.print] |> 1.2 readableBytes(): 0
[main|PrintAttribute.print] |> 2.0 isWritable(): true
[main|PrintAttribute.print] |> 2.1 writerIndex(): 0
[main|PrintAttribute.print] |> 2.2 writableBytes(): 256
[main|PrintAttribute.print] |> 3.0 capacity(): 256
[main|PrintAttribute.print] |> 3.1 maxCapacity(): 2147483647
[main|PrintAttribute.print] |> 3.2 maxWritableBytes(): 2147483647
[main|PrintAttribute.print] |> after ===========動(dòng)作:寫入4個(gè)字節(jié) (1,2,3,4)============
[main|PrintAttribute.print] |> 1.0 isReadable(): true
[main|PrintAttribute.print] |> 1.1 readerIndex(): 0
[main|PrintAttribute.print] |> 1.2 readableBytes(): 4
[main|PrintAttribute.print] |> 2.0 isWritable(): true
[main|PrintAttribute.print] |> 2.1 writerIndex(): 4
[main|PrintAttribute.print] |> 2.2 writableBytes(): 252
[main|PrintAttribute.print] |> 3.0 capacity(): 256
[main|PrintAttribute.print] |> 3.1 maxCapacity(): 2147483647
[main|PrintAttribute.print] |> 3.2 maxWritableBytes(): 2147483643
[main|WriteReadTest.testWriteRead] |> start==========:get==========
[main|WriteReadTest.getByteBuf] |> 讀取一個(gè)字節(jié):1
[main|WriteReadTest.getByteBuf] |> 讀取一個(gè)字節(jié):2
[main|WriteReadTest.getByteBuf] |> 讀取一個(gè)字節(jié):3
[main|WriteReadTest.getByteBuf] |> 讀取一個(gè)字節(jié):4
[main|PrintAttribute.print] |> after ===========動(dòng)作:取數(shù)據(jù) ByteBuf============
[main|PrintAttribute.print] |> 1.0 isReadable(): true
[main|PrintAttribute.print] |> 1.1 readerIndex(): 0
[main|PrintAttribute.print] |> 1.2 readableBytes(): 4
[main|PrintAttribute.print] |> 2.0 isWritable(): true
[main|PrintAttribute.print] |> 2.1 writerIndex(): 4
[main|PrintAttribute.print] |> 2.2 writableBytes(): 252
[main|PrintAttribute.print] |> 3.0 capacity(): 256
[main|PrintAttribute.print] |> 3.1 maxCapacity(): 2147483647
[main|PrintAttribute.print] |> 3.2 maxWritableBytes(): 2147483643
[main|WriteReadTest.testWriteRead] |> start==========:read==========
[main|WriteReadTest.readByteBuf] |> 讀取一個(gè)字節(jié):1
[main|WriteReadTest.readByteBuf] |> 讀取一個(gè)字節(jié):2
[main|WriteReadTest.readByteBuf] |> 讀取一個(gè)字節(jié):3
[main|WriteReadTest.readByteBuf] |> 讀取一個(gè)字節(jié):4
[main|PrintAttribute.print] |> after ===========動(dòng)作:讀完 ByteBuf============
[main|PrintAttribute.print] |> 1.0 isReadable(): false
[main|PrintAttribute.print] |> 1.1 readerIndex(): 4
[main|PrintAttribute.print] |> 1.2 readableBytes(): 0
[main|PrintAttribute.print] |> 2.0 isWritable(): true
[main|PrintAttribute.print] |> 2.1 writerIndex(): 4
[main|PrintAttribute.print] |> 2.2 writableBytes(): 252
[main|PrintAttribute.print] |> 3.0 capacity(): 256
[main|PrintAttribute.print] |> 3.1 maxCapacity(): 2147483647
[main|PrintAttribute.print] |> 3.2 maxWritableBytes(): 2147483643
使用get取數(shù)據(jù)不會影想ByteBuf的指針
2.4ByteBuf的引用計(jì)數(shù)
- Netty的ByteBuf的內(nèi)存回收工作是通過引用計(jì)數(shù)的方式管理的
- Netty采用“計(jì)數(shù)器”來追蹤ByteBuf的生命周期,一是對PooledByteBuf的支持检访,二是能夠盡快地“發(fā)現(xiàn)”那些可以回收的ByteBuf(非Pooled)始鱼,以便提升ByteBuf的分配和銷毀的效率
- 什么是Pooled(池化)的ByteBuf緩沖區(qū)呢?
在通信程序的執(zhí)行過程中脆贵,Buffer緩沖區(qū)實(shí)例會被頻繁創(chuàng)建医清、使用、釋放卖氨。大家都知道会烙,頻繁創(chuàng)建對象、內(nèi)存分配双泪、釋放內(nèi)存持搜,系統(tǒng)的開銷大、性能低焙矛,如何提升性能、提高Buffer實(shí)例的使用率呢残腌?從Netty4版本開始村斟,新增了對象池化的機(jī)制贫导。即創(chuàng)建一個(gè)Buffer對象池,將沒有被引用的Buffer對象蟆盹,放入對象緩存池中孩灯;當(dāng)需要時(shí),則重新從對象緩存池中取出逾滥,而不需要重新創(chuàng)建
- 引用計(jì)數(shù)的大致規(guī)則如下:
在默認(rèn)情況下峰档,當(dāng)創(chuàng)建完一個(gè)ByteBuf時(shí),它的引用為1寨昙;每次調(diào)用retain()方法讥巡,它的引用就加1;每次調(diào)用release()方法舔哪,就是將引用計(jì)數(shù)減1欢顷;如果引用為0,再次訪問這個(gè)ByteBuf對象捉蚤,將會拋出異常抬驴;如果引用為0,表示這個(gè)ByteBuf沒有哪個(gè)進(jìn)程引用它缆巧,它占用的內(nèi)存需要回收布持。
-
在Netty中,引用計(jì)數(shù)為0的緩沖區(qū)不能再繼續(xù)使用
image.png - 為了確保引用計(jì)數(shù)不會混亂陕悬,在Netty的業(yè)務(wù)處理器開發(fā)過程中鳖链,應(yīng)該堅(jiān)持一個(gè)原則:retain和release方法應(yīng)該結(jié)對使用。
- 當(dāng)引用計(jì)數(shù)已經(jīng)為0, Netty會進(jìn)行ByteBuf的回收墩莫。分為兩種情況:
(1)Pooled池化的ByteBuf內(nèi)存芙委,回收方法是:放入可以重新分配的ByteBuf池子,等待下一次分配狂秦。(2)Unpooled未池化的ByteBuf緩沖區(qū)灌侣,回收分為兩種情況:如果是堆(Heap)結(jié)構(gòu)緩沖,會被JVM的垃圾回收機(jī)制回收裂问;如果是Direct類型侧啼,調(diào)用本地方法釋放外部內(nèi)存(unsafe.freeMemory)
2.5 ByteBuf的Allocator分配器
Netty通過ByteBufAllocator分配器來創(chuàng)建緩沖區(qū)和分配內(nèi)存空間。Netty提供了ByteBufAllocator的兩種實(shí)現(xiàn):PoolByteBufAllocator和UnpooledByteBufAllocator堪簿。
- PoolByteBufAllocator(池化ByteBuf分配器)將ByteBuf實(shí)例放入池中痊乾,提高了性能,將內(nèi)存碎片減少到最型指哪审;這個(gè)池化分配器采用了jemalloc高效內(nèi)存分配的策略,該策略被好幾種現(xiàn)代操作系統(tǒng)所采用虑瀑。
- UnpooledByteBufAllocator是普通的未池化ByteBuf分配器湿滓,它沒有把ByteBuf放入池中滴须,每次被調(diào)用時(shí),返回一個(gè)新的ByteBuf實(shí)例叽奥;通過Java的垃圾回收機(jī)制回收扔水。
- 內(nèi)存管理的策略可以靈活調(diào)整,這是使用Netty所帶來的又一個(gè)好處朝氓。只需一行簡單的配置魔市,就能獲得到池化緩沖區(qū)帶來的好處
public class AllocatorTest {
@Test
public void showAlloc() {
ByteBuf buffer = null;
//方法一:默認(rèn)分配器,分配初始容量為9赵哲,最大容量100的緩沖
buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
//方法二:默認(rèn)分配器待德,分配初始為256,最大容量Integer.MAX_VALUE 的緩沖
buffer = ByteBufAllocator.DEFAULT.buffer();
//方法三:非池化分配器誓竿,分配基于Java的堆內(nèi)存緩沖區(qū)
buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
//方法四:池化分配器磅网,分配基于操作系統(tǒng)的管理的直接內(nèi)存緩沖區(qū)
buffer = PooledByteBufAllocator.DEFAULT.directBuffer();
//…..其他方法
}
}
-
根據(jù)內(nèi)存的管理方不同,分為堆緩存區(qū)和直接緩存區(qū)筷屡,也就是Heap ByteBuf和Direct ByteBuf涧偷。
image.png
2.6 ByteBuf的自動(dòng)釋放
- TailHandler自動(dòng)釋放
如果每個(gè)InboundHandler入站處理器,把最初的ByteBuf數(shù)據(jù)包一路往下傳毙死,那么TailHandler末尾處理器會自動(dòng)釋放掉入站的ByteBuf實(shí)例
(1)手動(dòng)釋放ByteBuf燎潮。具體的方式為調(diào)用byteBuf.release()。
(2)調(diào)用父類的入站方法將msg向后傳遞扼倘,依賴后面的處理器釋放ByteBuf确封。具體的方式為調(diào)用基類的入站處理方法super.channelRead(ctx,msg)。
- SimpleChannelInboundHandler自動(dòng)釋放
· 手動(dòng)釋放ByteBuf實(shí)例再菊。
· 繼承SimpleChannelInboundHandler爪喘,利用它的自動(dòng)釋放功能。