The 東華車管 (Donghua vehicle management) data-collection platform I built earlier kept losing data now and then. It was infrequent, but the cause still needed looking into, so today I raised Netty's log level and set out to find where the problem was. The code to raise the level:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 2048)
 .handler(new LoggingHandler(LogLevel.DEBUG))
 .childHandler(new ChildChannelHandler());
Setting the LogLevel to DEBUG is all it takes.
Then I settled in to watch the logs (in the entries below, 消息主體 is the message body and 入緩存隊列操作結果 is the result of pushing it onto the cache queue):
2017-01-19 10:04:46 [ nioEventLoopGroup-1-0:1625429 ] - [ INFO ] 消息主體:60160308049620860021010707190117020453395443491162627407087d081f00002e37008801008c00f9
2017-01-19 10:04:49 [ nioEventLoopGroup-1-0:1628830 ] - [ ERROR ] LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel() See http://netty.io/wiki/reference-counted-objects.html for more information.
2017-01-19 10:04:49 [ nioEventLoopGroup-1-0:1628845 ] - [ INFO ] 入緩存隊列操作結果:9
2017-01-19 10:04:49 [ nioEventLoopGroup-1-0:1628845 ] - [ INFO ] 消息主體:601603080496208600210107071901170204573954434611626262170f88091f00002e37008801008c00fa
2017-01-19 10:04:53 [ nioEventLoopGroup-1-0:1632839 ] - [ INFO ] 入緩存隊列操作結果:9
2017-01-19 10:04:53 [ nioEventLoopGroup-1-0:1632839 ] - [ INFO ] 消息主體:60160308049620860021010707190117020501395443581162624817108a091f00002e37008801008c00fb
2017-01-19 10:04:55 [ nioEventLoopGroup-1-0:1634196 ] - [ INFO ] 入緩存隊列操作結果:9
2017-01-19 10:04:55 [ nioEventLoopGroup-1-0:1634196 ] - [ INFO ] 消息主體:601603080496208600210107071901170205023954436011626244571288091f00002e37008801008c00fc
2017-01-19 10:04:56 [ nioEventLoopGroup-1-0:1635288 ] - [ INFO ] 入緩存隊列操作結果:9
2017-01-19 10:04:56 [ nioEventLoopGroup-1-0:1635288 ] - [ INFO ] 消息主體:60160308049620860021010707190117020503395443651162624107118a091f00002e37008801008c00fd
2017-01-19 10:04:57 [ nioEventLoopGroup-1-0:1636443 ] - [ INFO ] 入緩存隊列操作結果:9
2017-01-19 10:04:57 [ nioEventLoopGroup-1-0:1636443 ] - [ INFO ] 消息主體:601603080496208600210107071901170205053954437111626234671088091f00002e37008801008c00fe
Note this line:
LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel() See http://netty.io/wiki/reference-counted-objects.html for more information.
From this message we can tell that we only need to add
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
to raise the leak-detection level to ADVANCED and get a detailed report of where the leak occurred (the message also mentions the equivalent JVM flag -Dio.netty.leakDetectionLevel=advanced).
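A minimal placement sketch, assuming the level is set once at startup before the bootstrap binds; the class and main method wrapped around the call are illustrative, not from the original code:

import io.netty.util.ResourceLeakDetector;

public class ObdServer {  // illustrative class name
    public static void main(String[] args) throws Exception {
        // Must run before any ByteBuf is allocated, i.e. before the server bootstrap binds.
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
        // ... build the ServerBootstrap shown above and bind the listening port ...
    }
}

With advanced leak reporting enabled, the next look at the logs showed: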
2017-01-19 10:35:59 [ nioEventLoopGroup-1-0:665092 ] - [ ERROR ] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 5
#5:
io.netty.buffer.AdvancedLeakAwareByteBuf.readBytes(AdvancedLeakAwareByteBuf.java:435)
com.dhcc.ObdServer.ObdServerHandler.channelRead(ObdServerHandler.java:31)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
#4:
Hint: 'ObdServerHandler#0' will handle the message from this point.
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:387)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
#3:
io.netty.buffer.AdvancedLeakAwareByteBuf.release(AdvancedLeakAwareByteBuf.java:721)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:237)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
#2:
io.netty.buffer.AdvancedLeakAwareByteBuf.retain(AdvancedLeakAwareByteBuf.java:693)
io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:277)
io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:216)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.skipBytes(AdvancedLeakAwareByteBuf.java:465)
io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:272)
io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:216)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:250)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:113)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
The trace points into my own code here:
ByteBuf buff = (ByteBuf) msg;
byte[] req = new byte[buff.readableBytes()];
So the problem is confirmed to be a ByteBuf memory leak. Investigating from that angle, I found that Netty 5 allocates ByteBufs with PooledByteBufAllocator by default, so the buffer has to be released manually; otherwise it leaks.
The fix, then, is simply to release the ByteBuf:
ReferenceCountUtil.release(buff);
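Putting it together, a sketch of the corrected handler. Only the two quoted lines, the class name ObdServerHandler (taken from the stack trace) and the release call come from this post; the rest of the method body, including the readBytes call and the finally block, is my reconstruction of the usual pattern, written against the Netty 5 alpha API this post uses (in Netty 4 the base class would be ChannelInboundHandlerAdapter):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ObdServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buff = (ByteBuf) msg;
        try {
            byte[] req = new byte[buff.readableBytes()];
            buff.readBytes(req);
            // ... parse req and push it onto the cache queue ...
        } finally {
            // Drop our reference so the pooled buffer can go back to the pool.
            ReferenceCountUtil.release(buff);
        }
    }
}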
Here is one netizen's explanation of ReferenceCountUtil.release():
ReferenceCountUtil.release() is really just a wrapper around ByteBuf.release() (inherited from the ReferenceCounted interface). ByteBuf in Netty 4 uses reference counting (Netty 4 implements an optional ByteBuf pool): every newly allocated ByteBuf has a reference count of 1; each additional reference to the ByteBuf requires a call to ByteBuf.retain(), and each dropped reference requires a call to ByteBuf.release(). When the reference count reaches 0, the object can be reclaimed. I am only using ByteBuf as the example here; other objects also implement ReferenceCounted, and the same rules apply to them.
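To make the counting rules concrete, a small illustration of my own (not part of the quoted explanation):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RefCountDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);  // a new ByteBuf starts with refCnt() == 1
        buf.retain();                       // add a reference:   refCnt() == 2
        buf.release();                      // drop a reference:  refCnt() == 1
        boolean freed = buf.release();      // count reaches 0 -> buffer deallocated, returns true
        System.out.println("deallocated: " + freed);
    }
}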
While checking the problem I also wondered whether the data loss might be caused by my Netty setup using UDP, so here is how to tell whether a Netty service is running over TCP or UDP:
About TCP and UDP
A socket can sit on top of either TCP or UDP. The difference is that UDP does not guarantee every datagram is received correctly, so it performs better but is less fault-tolerant; TCP guarantees correct delivery, so its performance is not as good.
UDP is basically only suitable for things like live video streaming; our requirement should be TCP.
So how do the two differ in code? Here is the explanation I found online (a sketch of the API it refers to follows at the end of this section):
For the ChannelFactory, UDP communication uses NioDatagramChannelFactory, while for TCP we choose NioServerSocketChannelFactory;
for the Bootstrap, UDP uses ConnectionlessBootstrap, while TCP uses ServerBootstrap.
For the decoder, the encoder and the ChannelPipelineFactory, UDP development is no different from TCP, so they are not covered in detail here.
The ChannelHandler is where UDP and TCP really differ. As everyone knows, UDP is connectionless: you can still obtain the current channel through the MessageEvent parameter's getChannel() method, but its isConnected() will always return false.
In UDP development, once the message-received callback has given you the channel, you can send data to the peer directly with channel.write(message, remoteAddress); the first argument is still the message object to send,
and the second is the peer's SocketAddress.
The point that needs the most attention is the SocketAddress: in TCP communication we can get it from channel.getRemoteAddress(), but in UDP we must obtain the peer's SocketAddress by calling getRemoteAddress() on the MessageEvent.
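The class names in that explanation (ChannelFactory, ConnectionlessBootstrap, MessageEvent) belong to the older Netty 3.x API, not the Netty 4/5 API used in the rest of this post. For reference, a minimal Netty 3-style UDP sketch of what the explanation describes; the port number and echo-style handler body are illustrative only:

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;

public class UdpServerSketch {
    public static void main(String[] args) {
        // UDP: ConnectionlessBootstrap + NioDatagramChannelFactory
        // (TCP would use ServerBootstrap + NioServerSocketChannelFactory instead).
        ConnectionlessBootstrap b = new ConnectionlessBootstrap(
                new NioDatagramChannelFactory(Executors.newCachedThreadPool()));

        b.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new SimpleChannelUpstreamHandler() {
                    @Override
                    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
                        // UDP is connectionless: e.getChannel().isConnected() stays false,
                        // so the peer address has to come from the MessageEvent itself.
                        e.getChannel().write(e.getMessage(), e.getRemoteAddress());
                    }
                });
            }
        });

        b.bind(new InetSocketAddress(9999));
    }
}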