Netty server example:
public class NettyServer {

    public static void main(String[] args) throws Exception {
        // Create two event loop groups, bossGroup and workerGroup. By default each group holds
        // twice as many NioEventLoop child threads as there are CPU cores.
        // bossGroup only handles connection requests; the actual client I/O and business logic
        // is handed over to workerGroup.
        EventLoopGroup bossGroup = new NioEventLoopGroup(3);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            // Create the server bootstrap object
            ServerBootstrap bootstrap = new ServerBootstrap();
            // Configure parameters with the chained (fluent) API
            bootstrap.group(bossGroup, workerGroup) // set the two event loop groups
                    // Use NioServerSocketChannel as the server channel implementation
                    .channel(NioServerSocketChannel.class)
                    // Initialize the pending-connection queue size. The server accepts client
                    // connections sequentially, so only one connection is processed at a time;
                    // when several clients arrive at once, the connections that cannot be
                    // processed yet wait in this queue.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Create the channel initializer and set the initial parameters; it runs
                    // before the SocketChannel is fully set up
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Set the handler for the SocketChannels handled by workerGroup
                            // (NettyServerHandler is sketched after this class)
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start..");
            // Bind a port and sync. bind() is asynchronous and returns a ChannelFuture;
            // methods such as isDone() report the state of the asynchronous operation,
            // and sync() waits until the bind has completed.
            ChannelFuture cf = bootstrap.bind(9000).sync();
            // Register a listener on cf for the events we care about
            /*cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("Bound port 9000 successfully");
                    } else {
                        System.out.println("Failed to bind port 9000");
                    }
                }
            });*/
            // Wait until the server channel is closed. closeFuture() is asynchronous;
            // sync() blocks until the close has completed (internally via Object.wait()).
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
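The pipeline above registers a NettyServerHandler, which is not shown in this section. A minimal sketch of what such an inbound handler could look like (a hypothetical example, not the original author's handler):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // msg arrives as a ByteBuf because no decoder is configured in the pipeline
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("received from client: " + buf.toString(CharsetUtil.UTF_8));
        buf.release(); // release the buffer since it is not propagated further
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // write a response back to the client once the current read burst is done
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}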
1. NioEventLoopGroup and NioEventLoop
public NioEventLoopGroup() {
this(0);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
The default number of threads (DEFAULT_EVENT_LOOP_THREADS) is twice the number of CPU cores.
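DEFAULT_EVENT_LOOP_THREADS is set in a static initializer of MultithreadEventLoopGroup, roughly like the following (it can also be overridden with the io.netty.eventLoopThreads system property; the exact code varies slightly between Netty versions):

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}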
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
Let's look more closely at newChild():
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
NioEventLoop has two core components:
- 1) The taskQueue, initialized in the parent constructor SingleThreadEventExecutor(); depending on configuration it may be a LinkedBlockingQueue, an MpscUnboundedArrayQueue, or an MpscUnboundedAtomicArrayQueue
- 2) The Selector: selectorTuple = openSelector();
2. ServerBootstrap
Configuration parameters:
- ServerBootstrap#group(parentGroup, childGroup): sets this.group = parentGroup and this.childGroup = childGroup.
- AbstractBootstrap#channel: sets this.channelFactory
- AbstractBootstrap#option: sets channel options
- ServerBootstrap#childHandler: sets this.childHandler
2.1 The server registers the ACCEPT event with the selector and binds the address/port
AbstractBootstrap#bind(int)
- bind(SocketAddress)
- AbstractBootstrap#doBind
1) initAndRegister();
  1-1) channel = channelFactory.newChannel(); this calls ReflectiveChannelFactory#newChannel, which invokes the configured channel class's constructor via constructor.newInstance();
    The NioServerSocketChannel() constructor:
    this(newSocket(DEFAULT_SELECTOR_PROVIDER)); where newSocket() calls SelectorProvider.provider().openServerSocketChannel() to create the ServerSocketChannel.
    pipeline = newChannelPipeline(); creates the DefaultChannelPipeline.
    super(null, channel, SelectionKey.OP_ACCEPT);
    this.readInterestOp = readInterestOp; records interest in the ACCEPT event;
    ch.configureBlocking(false); sets non-blocking mode;
  1-2) init(channel); the key step is adding one ChannelHandler to the pipeline (a one-shot initializer, ChannelInitializer) whose job is to add the ServerBootstrapAcceptor handler and then remove itself. ServerBootstrapAcceptor is the handler that accepts client connections and initializes each newly created connection.
  1-3) config().group().register(channel);
    MultithreadEventLoopGroup#register()
    SingleThreadEventLoop#register()
    AbstractChannel.AbstractUnsafe#register
    eventLoop.execute() submits a register0() task.
      AbstractNioChannel#doRegister calls selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
      pipeline.invokeHandlerAddedIfNeeded(); this invokes the ChannelInitializer#initChannel method set above, which removes the initializer and adds the ServerBootstrapAcceptor. (See DefaultChannelPipeline#addLast() -> callHandlerCallbackLater(newCtx, true) -> PendingHandlerAddedTask -> ChannelInitializer#handlerAdded)
      pipeline.fireChannelRegistered();
      beginRead() calls AbstractNioChannel#doBeginRead, which executes selectionKey.interestOps(interestOps | readInterestOp), i.e. registers interest in the ACCEPT event.
2) doBind0(regFuture, channel, localAddress, promise);
  channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
2.2 NioEventLoop#run
Let's analyze SingleThreadEventExecutor#execute:
- 1) addTask(task); adds the task to the queue via taskQueue.offer(task);
- 2) startThread();
SingleThreadEventExecutor#doStartThread then calls executor.execute() to run a Runnable whose core is:
SingleThreadEventExecutor.this.run();
NioEventLoop#run
The executor above is a ThreadPerTaskExecutor, created in the MultithreadEventExecutorGroup constructor shown earlier:
if (executor == null) {
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
The key logic is in NioEventLoop#run:
- 1) SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); at its core this calls int selectedKeys = selector.select(timeoutMillis);
- 2) processSelectedKeys();
- 3) runAllTasks(); takes tasks off the taskQueue and executes them (a simplified sketch of the whole loop follows).
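A highly simplified sketch of that loop structure (not Netty's actual code; selector, timeoutMillis, processSelectedKeys() and runAllTasks() stand in for the members discussed above):

for (;;) {
    // 1) block on the selector until an I/O event, a wakeup, or the timeout occurs
    int selectedKeys = selector.select(timeoutMillis);
    // 2) dispatch the ready SelectionKeys to their channels
    processSelectedKeys();
    // 3) drain and run the tasks queued in taskQueue
    runAllTasks();
}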
Lock-free serialization design
After Netty's NioEventLoop reads a message, it calls ChannelPipeline#fireChannelRead(Object msg) directly. As long as the user does not deliberately switch threads, the NioEventLoop thread runs the call chain all the way into the user's handlers with no thread switching in between. This serialized processing avoids the lock contention caused by multi-threaded access and is optimal from a performance point of view.
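One practical consequence: blocking business logic should not run on the NioEventLoop thread, or every channel bound to that loop stalls. If a thread switch really is needed, a handler can be bound to a separate EventExecutorGroup when it is added to the pipeline. A brief sketch, assuming a DefaultEventExecutorGroup (io.netty.util.concurrent) and a StringDecoder (io.netty.handler.codec.string); the handler names are illustrative:

// Bind only the business handler to a separate executor group; I/O handlers stay on the NioEventLoop.
EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(16);
ch.pipeline()
        .addLast(new StringDecoder())                                  // still runs on the NioEventLoop thread
        .addLast(businessGroup, "business", new NettyServerHandler()); // runs on a businessGroup thread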
3. The pipeline chain of responsibility
Let's look at the pipeline's chain-of-responsibility call flow:
- DefaultChannelPipeline#fireChannelRegistered
- AbstractChannelHandlerContext.invokeChannelRegistered(head);
- which calls head.invokeChannelRegistered()
- which calls HeadContext#channelRegistered
- The core is findContextInbound(MASK_CHANNEL_REGISTERED): it finds the next handler context that matches MASK_CHANNEL_REGISTERED and then calls AbstractChannelHandlerContext.invokeChannelRegistered() again, which is what implements the chain-of-responsibility traversal
AbstractChannelHandlerContext.invokeChannelRegistered:
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
AbstractChannelHandlerContext#invokeChannelRegistered()
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
DefaultChannelPipeline.HeadContext#channelRegistered
public void channelRegistered(ChannelHandlerContext ctx) {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
AbstractChannelHandlerContext#fireChannelRegistered
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
The core is AbstractChannelHandlerContext#findContextInbound:
- It walks forward through the AbstractChannelHandlerContext linked list until it finds a context whose executionMask matches the given mask (see the example after the code below)
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
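As a hedged illustration of how this traversal behaves (the handler names are made up), inbound events walk the inbound handlers from head to tail, while outbound operations walk the outbound handlers back towards the head. Uses ChannelInboundHandlerAdapter, ChannelOutboundHandlerAdapter, and ChannelPromise from io.netty.channel:

// Hypothetical pipeline. For an inbound message the output is: inbound1, inbound2, outbound1.
ch.pipeline()
        .addLast("outbound1", new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("outbound1");
                ctx.write(msg, promise); // keep propagating towards the head
            }
        })
        .addLast("inbound1", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.out.println("inbound1");
                ctx.fireChannelRead(msg); // findContextInbound() picks the next inbound handler (inbound2)
            }
        })
        .addLast("inbound2", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.out.println("inbound2");
                ctx.writeAndFlush(msg); // outbound operation: travels back towards the head and reaches outbound1
            }
        });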
4. The server Channel registers and handles the ACCEPT event
NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
if (selectedKeys != null) {
// avoid the JDK's selector.selectedKeys(): slightly better performance (1%-2%) and less garbage
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
// corresponds to the 'this' attachment used during channel registration, e.g. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// handle read requests (including disconnects) or accept new connections
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
Let's focus on how the server handles SelectionKey.OP_ACCEPT: unsafe.read();
This is actually AbstractNioMessageChannel.NioMessageUnsafe#read
- 1) doReadMessages() first calls serverSocketChannel.accept() and wraps the accepted connection in a NioSocketChannel.
  The NioSocketChannel constructor creates a DefaultChannelPipeline, records interest in the SelectionKey.OP_READ event (assigned to this.readInterestOp), and sets non-blocking mode via ch.configureBlocking(false).
- 2) pipeline.fireChannelRead(readBuf.get(i)); this fires the handlers in the server channel's pipeline; the key one is ServerBootstrapAcceptor#channelRead().
  2-1) child.pipeline().addLast(childHandler) adds the ChannelInitializer written during server bootstrap to the client SocketChannel's pipeline;
  2-2) childGroup.register(child).addListener(), which is similar to the server channel registration above.
    A) Registers the read event with the selector of one NioEventLoop in the workerGroup (this happens in pipeline.fireChannelActive() -> DefaultChannelPipeline.HeadContext#read -> AbstractChannel.AbstractUnsafe#beginRead, which registers the read interest passed in when the channel was constructed); the NioEventLoop#run loop then keeps watching that event;
    B) ChannelInitializer#initChannel runs, removes the initializer itself, and adds the NettyServerHandler.
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
5. The client Channel handles the READ event
NioEventLoop#processSelectedKey()
-> AbstractNioByteChannel.NioByteUnsafe#read
- 1) byteBuf = allocHandle.allocate(allocator); allocates a ByteBuf
- 2) allocHandle.lastBytesRead(doReadBytes(byteBuf)); reads data from the channel;
- 3) pipeline.fireChannelRead(byteBuf); propagates the buffer along the pipeline; this is where the business logic is executed (see the sketch below)
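Because the ByteBuf passed to fireChannelRead is typically a pooled direct buffer (see the next section), business handlers must release it. A hedged sketch using SimpleChannelInboundHandler, which releases the message automatically after the handler returns (the handler name is hypothetical):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        // msg is released automatically by SimpleChannelInboundHandler once this method returns
        System.out.println("read: " + msg.toString(CharsetUtil.UTF_8));
        // retain before writing back, because the write will release the buffer again
        ctx.writeAndFlush(msg.retain());
    }
}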
6. Direct memory, zero-copy, and the ByteBuf memory pool
The ByteBuf allocation above already uses direct memory:
RecvByteBufAllocator.DelegatingHandle#allocate
public ByteBuf allocate(ByteBufAllocator alloc) {
return delegate.allocate(alloc);
}
DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#allocate
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
AbstractByteBufAllocator#ioBuffer(int)
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe() || isDirectBufferPooled()) {
return directBuffer(initialCapacity);
}
return heapBuffer(initialCapacity);
}
PooledByteBufAllocator#newDirectBuffer
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
Pros and cons of direct memory:
Pros:
- It does not occupy heap space, which reduces GC pressure
- In the JVM implementation, native I/O operates on direct memory directly (direct memory => system call => disk/NIC), whereas heap memory needs an extra copy (heap memory => direct memory => system call => disk/NIC)
Cons:
- Initial allocation is slower
- It is not managed directly by the JVM, so it is easier to run out of memory. To prevent direct memory from exhausting physical memory when no Full GC happens for a long time, a cap can be set with -XX:MaxDirectMemorySize; when the threshold is reached, System.gc() is called to trigger a Full GC, which indirectly reclaims unused direct memory.
Allocating and freeing off-heap direct memory is an expensive operation. To reuse buffers as much as possible, Netty provides a buffer-reuse mechanism based on a ByteBuf memory pool: a ByteBuf is taken from the pool when needed and returned to the pool after use.
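For illustration, taking a buffer from the pool and returning it manually looks like this (a minimal sketch; inside Netty the allocation happens automatically along the call chain traced below):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class PooledBufferDemo {
    public static void main(String[] args) {
        // take a direct ByteBuf from the pooled allocator
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(256);
        try {
            buf.writeBytes("hello".getBytes());
            System.out.println("readable bytes: " + buf.readableBytes());
        } finally {
            // release() returns the buffer to the pool once the reference count drops to 0
            buf.release();
        }
    }
}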
PooledByteBufAllocator#newDirectBuffer
-> PoolArena#allocate()
-> PoolArena.DirectArena#newByteBuf
-> PooledUnsafeDirectByteBuf#newInstance
-> finally the ByteBuf object is obtained from the RECYCLER object pool; a non-pooled implementation would simply create a new ByteBuf object.
PoolArena#allocate()
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
PoolArena.DirectArena#newByteBuf
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
PooledUnsafeDirectByteBuf#newInstance
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
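The RECYCLER above is Netty's io.netty.util.Recycler, a lightweight thread-local object pool. A hedged sketch of the general usage pattern (the pooled class here is invented for the example):

import io.netty.util.Recycler;

public class PooledEntry {
    private static final Recycler<PooledEntry> RECYCLER = new Recycler<PooledEntry>() {
        @Override
        protected PooledEntry newObject(Handle<PooledEntry> handle) {
            return new PooledEntry(handle);
        }
    };

    private final Recycler.Handle<PooledEntry> handle;
    private Object value;

    private PooledEntry(Recycler.Handle<PooledEntry> handle) {
        this.handle = handle;
    }

    public static PooledEntry newInstance(Object value) {
        PooledEntry entry = RECYCLER.get(); // reuse a pooled instance or create a new one
        entry.value = value;
        return entry;
    }

    public void recycle() {
        value = null;
        handle.recycle(this); // return this instance to the pool
    }
}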
7. ByteBuf expansion mechanism
ByteBuf.writeByte() -> AbstractByteBuf
AbstractByteBuf#writeByte
public ByteBuf writeByte(int value) {
ensureWritable0(1);
_setByte(writerIndex++, value);
return this;
}
AbstractByteBuf#ensureWritable0
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
if (minWritableBytes <= writableBytes()) {
return;
}
final int writerIndex = writerIndex();
if (checkBounds) {
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
}
// Normalize the current capacity to the power of 2.
int minNewCapacity = writerIndex + minWritableBytes;
int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
int fastCapacity = writerIndex + maxFastWritableBytes();
// Grow by a smaller amount if it will avoid reallocation
if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
newCapacity = fastCapacity;
}
// Adjust to the new capacity.
capacity(newCapacity);
}
AbstractByteBufAllocator#calculateNewCapacity
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
Netty's ByteBuf grows dynamically on demand. The expansion process uses a default threshold of 4 MiB (an empirical value; other scenarios may use a different one):
- If the required capacity equals the threshold, the threshold itself is used as the new buffer capacity.
- If the required capacity is above the threshold, the buffer grows in 4 MiB steps ((required capacity / 4 MiB) * 4 MiB plus one more 4 MiB step); the result is then compared against maxCapacity, and maxCapacity is used if the computed value exceeds it.
- If the required capacity is below the threshold, the capacity doubles starting from 64 bytes: 64 -> 128 -> 256 ... until the doubled value is greater than or equal to the required capacity.
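The rule can be condensed into a small helper that mirrors calculateNewCapacity above (a sketch for illustration, not Netty's code):

// Mirrors the growth rule: double from 64 bytes up to the 4 MiB threshold,
// then grow in 4 MiB steps, always capped by maxCapacity.
static int newCapacity(int minNewCapacity, int maxCapacity) {
    final int threshold = 4 * 1024 * 1024; // 4 MiB
    if (minNewCapacity == threshold) {
        return threshold;
    }
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold; // round down to a 4 MiB multiple
        if (newCapacity > maxCapacity - threshold) {
            return maxCapacity;
        }
        return newCapacity + threshold; // step up by one more 4 MiB page
    }
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1; // double: 64 -> 128 -> 256 -> ...
    }
    return Math.min(newCapacity, maxCapacity);
}
// Example: newCapacity(65, Integer.MAX_VALUE) == 128; newCapacity(5 * 1024 * 1024, Integer.MAX_VALUE) == 8 MiB.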
8. Handling the epoll empty-polling bug
NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// compute the select timeout based on the next scheduled task's deadline
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { // a scheduled task is already due, or the maximum wait time has elapsed
if (selectCnt == 0) {
// non-blocking; returns 0 if nothing is ready
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// the select below blocks, but can be woken up by another thread
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
If the Selector poll returns nothing, and there was no wakeup and no new message to process, an empty poll has occurred; when this repeats, CPU usage climbs to 100%.
Netty's workaround:
- 1. Track the select cycles: every empty select operation increments a counter; if N consecutive empty polls happen within one period, the epoll spin bug has been triggered.
- 2. Rebuild the Selector: check whether another thread already initiated the rebuild; if not, deregister the SocketChannels from the old Selector, re-register them with a new Selector, and close the old Selector (a simplified sketch follows after the steps below).
Concrete steps of Netty's workaround:
- 1. Record the current time currentTimeNanos.
- 2. Compute the minimum time timeoutMillis that the select should take.
- 3. Increment selectCnt on every iteration.
- 4. If the select actually ran for at least that minimum time, reset selectCnt to 1 (this filters out the normal select-timeout case).
- 5. Once selectCnt reaches the SELECTOR_AUTO_REBUILD_THRESHOLD, rebuild the selector to work around the problem.
- 6. The default threshold is 512.
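A simplified sketch of the rebuild step from point 2 above, written with plain JDK NIO (java.nio.channels); this is not Netty's actual rebuildSelector implementation:

// Move every valid registration from the old Selector to a new one, then close the old Selector.
static Selector rebuildSelector(Selector oldSelector) throws IOException {
    Selector newSelector = Selector.open();
    for (SelectionKey key : oldSelector.keys()) {
        if (!key.isValid()) {
            continue;
        }
        int interestOps = key.interestOps();
        Object attachment = key.attachment();
        key.cancel();                                                   // deregister from the old selector
        key.channel().register(newSelector, interestOps, attachment);   // re-register on the new selector
    }
    oldSelector.close();
    return newSelector;
}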
References
- Tuling (圖靈) VIP course, https://vip.tulingxueyuan.cn/