經(jīng)過前面的學(xué)習(xí)纳像,我們已經(jīng)學(xué)會(huì)了Netty的使用。本章節(jié)開始我們要進(jìn)行一些細(xì)節(jié)方面的學(xué)習(xí)拯勉,使其能更好的運(yùn)用在我們以后的工作當(dāng)中竟趾。
一、什么是option宫峦?
前面學(xué)習(xí)了Netty的服務(wù)端岔帽,和客戶端,知道了創(chuàng)建服務(wù)要分別使用ServerBootStrap和BootStrap导绷,不知道有沒有關(guān)注到其中有一個(gè)方法叫做Option的犀勒?
我們分別看下ServerBootStrap和BootStrap的option:
1)ServerBootStrap
如上圖所示,有兩種option妥曲,一個(gè)是自己的option(ServerSocketChannel)贾费,一個(gè)childOption(ScoketChannel的option)。
2)BootStrap
只有一個(gè)option方法檐盟。
無論是上述哪兩種褂萧,參數(shù)都是ChannelOption<T>,而這個(gè)ChannelOption Netty已經(jīng)幫我們準(zhǔn)備好了葵萎,可以直接使用导犹。
下面我們會(huì)針對(duì)幾種重要的配置講解一下。
二陌宿、常用參數(shù)
2.1 CONNECT_TIMEOUT_MILLIS
ScoketChannel的參數(shù)锡足。
用在客戶端建立連接時(shí),如果超過指定的時(shí)間仍未連接壳坪,則拋出timeout異常舶得。
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,500);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
worker.shutdownGracefully();
}
}
只啟動(dòng)客戶端,拋出如下異常:
Exception in thread "main" io.netty.channel.ConnectTimeoutException: connection timed out: /127.0.0.1:8080
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
若果該參數(shù)設(shè)置過長(zhǎng)爽蝴,且服務(wù)端確實(shí)沒啟動(dòng)沐批,則會(huì)拋出java層面的異常,拒絕連接:
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
2.2 SO_TIMEOUT
這個(gè)參數(shù)適用于阻塞IO蝎亚,比如阻塞IO當(dāng)中的read九孩,accept等方法,修飾阻塞的,如果不想一直阻塞吁津,可以通過改參數(shù)設(shè)置超時(shí)時(shí)間。
不要與CONNECT_TIMEOUT_MILLIS弄混了馍刮。
2.3 SO_BACKLOG
ServerSocketChannal 參數(shù)宪拥。
在了解這個(gè)參數(shù)之前仿野,要先了解下TCP的三次握手,sync_queue(半連接隊(duì)列)和accept_queue(全連接隊(duì)列)她君。
其中半連接隊(duì)列是在首次握手時(shí)脚作,將請(qǐng)求放入半連接隊(duì)列,當(dāng)三次握手全部成功后缔刹,將請(qǐng)求從半連接隊(duì)列放入全連接隊(duì)列球涛。
下圖展示netty和三次握手的關(guān)系:
- 第一次握手,client 發(fā)送 SYN 到 server校镐,狀態(tài)修改為 SYN_SEND亿扁,server 收到,狀態(tài)改變?yōu)?SYN_REVD灭翔,并將該請(qǐng)求放入 sync queue 隊(duì)列
- 第二次握手魏烫,server 回復(fù) SYN + ACK 給 client,client 收到肝箱,狀態(tài)改變?yōu)?ESTABLISHED哄褒,并發(fā)送 ACK 給 server
- 第三次握手,server 收到 ACK煌张,狀態(tài)改變?yōu)?ESTABLISHED呐赡,將該請(qǐng)求從 sync queue 放入 accept queue。
在上面的過程中骏融,提到的sync_queue和accept_queue是我們本篇文章需要關(guān)注的重點(diǎn)链嘀。
在linux2.2之前,backlog包括了兩個(gè)隊(duì)列的大小档玻。在之后的版本當(dāng)中怀泊,由如下兩個(gè)參數(shù)來控制:
-
sync queue - 半連接隊(duì)列
- 大小通過 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
syncookies
啟用的情況下误趴,邏輯上沒有最大值限制霹琼,這個(gè)設(shè)置便被忽略
- 大小通過 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
-
accept queue - 全連接隊(duì)列
- 其大小通過 /proc/sys/net/core/somaxconn 指定,在使用 listen 函數(shù)時(shí)凉当,內(nèi)核會(huì)根據(jù)傳入的 backlog 參數(shù)與系統(tǒng)參數(shù)比較枣申,取二者的較小值。
- 如果 accpet queue 隊(duì)列滿了看杭,server 將發(fā)送一個(gè)拒絕連接的錯(cuò)誤信息到 client忠藤。
下面回歸正題,在netty當(dāng)中楼雹,通過ChannelOption.SO_BACKLOG設(shè)置大小模孩,如下所示:
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
//此處為了模擬尖阔,設(shè)置為2
serverBootstrap.option(ChannelOption.SO_BACKLOG,2);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
如上代碼所示,設(shè)置了一個(gè)backlog為2的值榨咐。然后我們需要啟動(dòng)至少三個(gè)客戶端看結(jié)果诺祸。
通過前面的三次握手的圖,可以知道祭芦,只有當(dāng)服務(wù)端處理不過來時(shí),才會(huì)使用全連接隊(duì)列憔鬼,并將其占滿龟劲,否則會(huì)直接走accept()方法,導(dǎo)致我們看不到測(cè)試結(jié)果轴或。
所以我們這里不做測(cè)試了昌跌。
我們看下這個(gè)backlog的默認(rèn)值在nio當(dāng)中是多少:
在NIO當(dāng)中backlog在ServerSocketChannel當(dāng)中的bind方法被調(diào)用,所以我們從這里跟蹤進(jìn)去找到bind方法:
public final ServerSocketChannel bind(SocketAddress local)
throws IOException
{
return bind(local, 0);
}
查看bind被哪些地方調(diào)用照雁,NioServerSocketChannel:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
跟蹤config.getBacklog():
private final ServerSocketChannelConfig config;
這個(gè)config是接口蚕愤,直接看它的實(shí)現(xiàn)DefaultServerSocketChannelConfig:
private volatile int backlog = NetUtil.SOMAXCONN;
找SOMAXCONN:
public static final int SOMAXCONN;
找到SOMAXCONN賦值的位置,默認(rèn)是windows200饺蚊,Linux或mac默認(rèn)128萍诱,如果有前面我們提到的文件/proc/sys/net/core/somaxconn,則走此配置中的文件:
SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
@Override
public Integer run() {
// Determine the default somaxconn (server socket backlog) value of the platform.
// The known defaults:
// - Windows NT Server 4.0+: 200
// - Linux and Mac OS X: 128
int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
File file = new File("/proc/sys/net/core/somaxconn");
BufferedReader in = null;
try {
// file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
// try / catch block.
// See https://github.com/netty/netty/issues/4936
if (file.exists()) {
in = new BufferedReader(new FileReader(file));
somaxconn = Integer.parseInt(in.readLine());
if (logger.isDebugEnabled()) {
logger.debug("{}: {}", file, somaxconn);
}
} else {
// Try to get from sysctl
Integer tmp = null;
if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) {
tmp = sysctlGetInt("kern.ipc.somaxconn");
if (tmp == null) {
tmp = sysctlGetInt("kern.ipc.soacceptqueue");
if (tmp != null) {
somaxconn = tmp;
}
} else {
somaxconn = tmp;
}
}
if (tmp == null) {
logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file,
somaxconn);
}
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}",
file, somaxconn, e);
}
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e) {
// Ignored.
}
}
}
return somaxconn;
}
});
2.4 ulimit
屬于操作系統(tǒng)參數(shù)污呼。
使用ulimit -n 可以查看當(dāng)前的最大打開文件數(shù)裕坊。使用ulimit -a 可以查看當(dāng)前系統(tǒng)的所有限制值。
linux默認(rèn)1024燕酷,當(dāng)服務(wù)器負(fù)載較大時(shí)籍凝,會(huì)發(fā)生too many open files的錯(cuò)誤,所以我們?yōu)榱颂峁┎l(fā)量苗缩,需要手動(dòng)將其調(diào)整饵蒂。
使用如下命令可以將其調(diào)整,但是是臨時(shí)性的酱讶,可以考慮將其放在啟動(dòng)腳本當(dāng)中:
ulimit -n 4096
2.5 TCP_NODELAY
屬于 SocketChannal 參數(shù)退盯。
在前面的文章當(dāng)中,我們提到過TCP的nagle算法浴麻,我們使用netty時(shí)得问,它的默認(rèn)開始的。
nagle算法會(huì)使我們某些較小的數(shù)據(jù)包造成延遲软免,因?yàn)闉榱颂嵘使常琻agle會(huì)等到收集到一定數(shù)據(jù)后進(jìn)行發(fā)送,這樣可能造成我們消息的延遲膏萧。
可以通過如下方式設(shè)置漓骚,開啟無延遲的配置:
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
2.6 SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 屬于 SocketChannal 參數(shù)
SO_RCVBUF 既可用于 SocketChannal 參數(shù)蝌衔,也可以用于 ServerSocketChannal 參數(shù)
這兩個(gè)參數(shù)不建議我們手動(dòng)進(jìn)行設(shè)置,因?yàn)椴僮飨到y(tǒng)會(huì)根據(jù)當(dāng)前占用蝌蹂,進(jìn)行自動(dòng)的調(diào)整噩斟。
2.7 ALLOCATOR
屬于 SocketChannal 參數(shù)。
ByteBuf的分配器孤个。
serverBootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
這個(gè)參數(shù)只有一個(gè)DEFAULT可以使用剃允。
這個(gè)參數(shù)與ch.alloc().buffer()命令有關(guān),關(guān)系著我們分配的buf是池化還是非池化齐鲤,是直接內(nèi)存還是堆內(nèi)存斥废。
我們從上面的Default跟蹤進(jìn)去:
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
繼續(xù)跟蹤DEFAULT_ALLOCATOR:
static final ByteBufAllocator DEFAULT_ALLOCATOR;
找到對(duì)其賦值的位置,發(fā)現(xiàn)了如下的靜態(tài)代碼塊给郊,此處就是設(shè)置buf是pooled還是unpooled牡肉,通過環(huán)境變量:"io.netty.allocator.type" 指定,我們可以在啟動(dòng)項(xiàng)目時(shí)指定-Dio.netty.allocator.type=unpooled設(shè)置成非池化淆九。從源碼可以看到统锤,安卓是unpooled,其他事pooled炭庙。
static {
MAX_BYTES_PER_CHAR_UTF8 = (int)CharsetUtil.encoder(CharsetUtil.UTF_8).maxBytesPerChar();
String allocType = SystemPropertyUtil.get("io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
Object alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
DEFAULT_ALLOCATOR = (ByteBufAllocator)alloc;
THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);
MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16384);
logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
FIND_NON_ASCII = new ByteProcessor() {
public boolean process(byte value) {
return value >= 0;
}
};
}
如上代碼中的UnpooledByteBufAllocator.DEFAULT和PooledByteBufAllocator.DEFAULT就指定了我們使用的直接內(nèi)存還是堆內(nèi)存饲窿,跟蹤其中的UnpooledByteBufAllocator.DEFAULT看一下:
public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
跟蹤PlatformDependent.directBufferPreferred():
private static final boolean DIRECT_BUFFER_PREFERRED;
找DIRECT_BUFFER_PREFERRED賦值的位置:
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
重點(diǎn)關(guān)注上述代碼后半段!SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);,我們可用通過-Dio.netty.noPreferDirect=true環(huán)境變量指定我們使用堆內(nèi)存煤搜。
2.8 RCVBUF_ALLOCATOR
屬于 SocketChannal 參數(shù)免绿。
控制 netty 接收緩沖區(qū)大小。
這個(gè)RCVBUF_ALLOCATOR不要與前面的ALLOCATOR弄混擦盾。
負(fù)責(zé)入站數(shù)據(jù)的分配嘲驾,決定入站緩沖區(qū)的大小(并可動(dòng)態(tài)調(diào)整)迹卢,統(tǒng)一采用 direct 直接內(nèi)存辽故,具體池化還是非池化由 allocator 決定。
通俗的講在handler內(nèi)部分配的byteBuf可以是直接內(nèi)存腐碱,也可以是堆內(nèi)存誊垢,但是經(jīng)過網(wǎng)絡(luò)io的內(nèi)存,netty會(huì)強(qiáng)制為直接內(nèi)存症见。
我們啟動(dòng)一個(gè)客戶端和服務(wù)端去驗(yàn)證一下上述的描述:
服務(wù)端:
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
客戶端:
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf byteBuf = ctx.alloc().buffer();
byteBuf.writeBytes("hello world!".getBytes());
System.out.println(byteBuf);
ctx.writeAndFlush(byteBuf);
super.channelActive(ctx);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);
//阻塞等待連接
channelFuture.sync();
//阻塞等待釋放連接
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
// 釋放EventLoopGroup
worker.shutdownGracefully();
}
}
我們重點(diǎn)關(guān)注服務(wù)端接收到的Object msg喂走,它應(yīng)該是一個(gè)Bytebuf對(duì)象,那么他是什么類型的谋作?如何生成的芋肠?
我們通過打斷點(diǎn)的方式,找到線程調(diào)用的堆棧遵蚜,發(fā)現(xiàn)第一個(gè)channel的位置:
點(diǎn)擊進(jìn)去發(fā)現(xiàn)如下代碼帖池,并且就是我們得到的ByteBuf:
重點(diǎn)關(guān)注這部分有注釋代碼:
ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline();
// 上一小節(jié)提到的allocator奈惑,負(fù)責(zé)ByteBuf是池化還是非池化
ByteBufAllocator allocator = config.getAllocator();
//此處Handle是RecvByteBufAllocator內(nèi)部類
Handle allocHandle = this.recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//allocate方法創(chuàng)建byteBuf
byteBuf = allocHandle.allocate(allocator);
下面重點(diǎn)關(guān)注這個(gè)allocate方法,這里會(huì)分配一個(gè)ioBuffer睡汹,即直接內(nèi)存buffer:
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(this.guess());
}
如上的guess()會(huì)根據(jù)數(shù)據(jù)量大小肴甸,動(dòng)態(tài)分配buffer的大小,范圍如下囚巴,自適應(yīng)的AdaptiveRecvByteBufAllocator原在,最小不會(huì)小于64,最大不會(huì)超過65536:
public AdaptiveRecvByteBufAllocator() {
this(64, 1024, 65536);
}
關(guān)于主要參數(shù)就介紹這么多彤叉,有用的話幫忙點(diǎn)個(gè)贊吧~~