Apache RocketMQ作為阿里開(kāi)源的一款高性能没陡、高吞吐量的分布式消息中間件衩藤,它的通信能力已得到業(yè)界的公認(rèn)叠赦。它的通信層是借助于異常網(wǎng)絡(luò)框架Netty實(shí)現(xiàn)的纲缓。在開(kāi)發(fā)網(wǎng)絡(luò)游戲的時(shí)候痘拆,Netty也常用于游戲服務(wù)器或網(wǎng)關(guān)的通信層框架仰禽,所以,可以通過(guò)學(xué)習(xí)RocketMQ是如何使用Netty框架,從中借鑒一此應(yīng)用技巧吐葵。
在RocketMQ中的rocketmq-remoting項(xiàng)目就是針對(duì)網(wǎng)絡(luò)層封裝的項(xiàng)目规揪,其實(shí)使用Netty的時(shí)候,最主要的就是以下幾個(gè)部分:
- 線程池的劃分
- 是否使用Epoll
- Netty服務(wù)啟動(dòng)的相關(guān)配置
- Netty內(nèi)存池的使用温峭。
- 使用共享的Handler
- 服務(wù)優(yōu)雅的關(guān)閉
線程池的劃分
RocketMQ的服務(wù)端Netty啟動(dòng)類(lèi)是NettyRemotingServer
猛铅,在這里聲明了四個(gè)線程池
//這個(gè)其實(shí)就是netty中的work線程池,默認(rèn)用來(lái)處理Handler方法的調(diào)用
private final EventLoopGroup eventLoopGroupSelector;
// Netty的Boss線程
private final EventLoopGroup eventLoopGroupBoss;
// 公共線程池凤藏,這里用來(lái)處理RocketMQ的業(yè)務(wù)調(diào)用奸忽,這個(gè)有Netty沒(méi)有什么關(guān)系
private final ExecutorService publicExecutor;
// 用來(lái)處理Handler的線程池
private DefaultEventExecutorGroup defaultEventExecutorGroup;
RocketMQ的Netty啟動(dòng)代碼
下面是RocketMQ的Netty服務(wù)啟動(dòng)代碼,如果我們自己想要?jiǎng)?chuàng)建一個(gè)Netty服務(wù)揖庄,直接抄下面的代碼栗菜,修改一下相關(guān)的配置和Handler就可以了:
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
盡量使用Epoll
epoll是Linux下多路復(fù)用IO接口select/poll的增強(qiáng)版本,它能顯著減少程序在大量并發(fā)連接中只有少量活躍的情況下的系統(tǒng)CPU利用率蹄梢。先不管epoll的原理是什么疙筹,這都是底層的實(shí)現(xiàn),只需要知道epoll的效率比其它的方式高就可以检号,在能使用epoll的情況下腌歉,就應(yīng)該選擇使用Netty socket的Epoll模式蛙酪。從RocketMQ的Netty啟動(dòng)代碼來(lái)看齐苛,在選擇使用的ServerSocketChannel時(shí),它使用了一個(gè)方法來(lái)判斷是否使用Epoll桂塞。
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform()
&& nettyServerConfig.isUseEpollNativeSelector()
&& Epoll.isAvailable();
}
nettyServerConfig.isUseEpollNativeSelector可以讓使用者在可以使用epoll的環(huán)境下凹蜂,強(qiáng)制不使用epoll。
使用Netty的ByteBuf內(nèi)存池
使用Netty的內(nèi)存池可以減少對(duì)象的創(chuàng)建和內(nèi)存分配阁危,進(jìn)而減少gc的量玛痊。在RocketMQ的服務(wù)中,默認(rèn)是netty開(kāi)啟使用netty的內(nèi)存池的狂打。
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
使用共享的Handler
在netty中擂煞,使用Handler處理網(wǎng)絡(luò)的消息,不管是接收網(wǎng)絡(luò)消息趴乡,還是返回網(wǎng)絡(luò)消息对省,都會(huì)經(jīng)過(guò)Handler的處理方法。在一個(gè)網(wǎng)絡(luò)連接創(chuàng)建channel時(shí)晾捏,channel初始化時(shí)就會(huì)添加相應(yīng)的Handler蒿涎。如果每個(gè)Handler內(nèi)都沒(méi)有共用的對(duì)象,那么這些Handler最好標(biāo)記為共享的惦辛,這樣可以減少Handler對(duì)象的創(chuàng)建劳秋。比如RocketMQ的編碼Handler
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
在服務(wù)啟動(dòng)時(shí),會(huì)只在NettyRemotingServer創(chuàng)建一個(gè)NettyEncoder對(duì)象實(shí)例,所有的channel實(shí)例共同使用這個(gè)編碼實(shí)例玻淑。
服務(wù)優(yōu)雅的關(guān)閉
所謂服務(wù)優(yōu)雅的關(guān)閉嗽冒,是指在服務(wù)需要關(guān)閉的時(shí)候,在關(guān)閉之前补履,需要把任務(wù)處理完辛慰,而且在收到關(guān)閉時(shí),不再接收新的任務(wù)干像。在所有的Netty業(yè)務(wù)中帅腌,有業(yè)務(wù)相關(guān)的線程池就是NettyRemotingServer中創(chuàng)建的四個(gè)線程池,所以在關(guān)閉服務(wù)的時(shí)候麻汰,只需要關(guān)閉這幾個(gè)線程池即可速客。并等待線程池中的任務(wù)處理完。
在NettyRemotingServer中有一個(gè)shutdown()方法五鲫。
@Override
public void shutdown() {
try {
if (this.timer != null) {
this.timer.cancel();
}
this.eventLoopGroupBoss.shutdownGracefully();
this.eventLoopGroupSelector.shutdownGracefully();
if (this.nettyEventExecutor != null) {
this.nettyEventExecutor.shutdown();
}
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown();
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
}
}
這里面調(diào)用的shutdownGracefully()就是
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
}
它會(huì)等待執(zhí)行完線程池中當(dāng)前已所有任務(wù)溺职,然后再關(guān)閉線程池。
那么位喂,何時(shí)調(diào)用的這個(gè)shutdown()方法呢浪耘。在RocketMQ服務(wù)啟動(dòng)的時(shí)候,會(huì)添加一個(gè)回調(diào)鉤子塑崖,比如Namesrv服務(wù)在啟動(dòng)的時(shí)候會(huì)執(zhí)行下面的代碼:
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
這樣在服務(wù)器關(guān)閉的時(shí)候七冲,就會(huì)觸發(fā)controller.shudown()。然后執(zhí)行關(guān)閉線程池的操作规婆。
注意澜躺,關(guān)閉服務(wù)器一般使用kill pid的命令,RocketMQ的發(fā)布包里面的bin下面抒蚜,有一個(gè)mqshutdown的腳本掘鄙,就是使用的kill pid 命令。
pid=`ps ax | grep -i 'org.apache.rocketmq.namesrv.NamesrvStartup' |grep java | grep -v grep | awk '{print $1}'`
if [ -z "$pid" ] ; then
echo "No mqnamesrv running."
exit -1;
fi
echo "The mqnamesrv(${pid}) is running..."
kill ${pid}
首先是使用腳本獲取進(jìn)程名的pid,然后使用 kill pid將進(jìn)程殺死嗡髓。
但是不能使用kill -9 pid操漠,因?yàn)檫@個(gè)命令不會(huì)給進(jìn)程執(zhí)行回調(diào)的機(jī)會(huì),就把進(jìn)程殺死了饿这。