RocketMQ中這樣使用的Netty框架

Apache RocketMQ作為阿里開(kāi)源的一款高性能没陡、高吞吐量的分布式消息中間件衩藤,它的通信能力已得到業(yè)界的公認(rèn)叠赦。它的通信層是借助于異常網(wǎng)絡(luò)框架Netty實(shí)現(xiàn)的纲缓。在開(kāi)發(fā)網(wǎng)絡(luò)游戲的時(shí)候痘拆,Netty也常用于游戲服務(wù)器或網(wǎng)關(guān)的通信層框架仰禽,所以,可以通過(guò)學(xué)習(xí)RocketMQ是如何使用Netty框架,從中借鑒一此應(yīng)用技巧吐葵。

在RocketMQ中的rocketmq-remoting項(xiàng)目就是針對(duì)網(wǎng)絡(luò)層封裝的項(xiàng)目规揪,其實(shí)使用Netty的時(shí)候,最主要的就是以下幾個(gè)部分:

  1. 線程池的劃分
  2. 是否使用Epoll
  3. Netty服務(wù)啟動(dòng)的相關(guān)配置
  4. Netty內(nèi)存池的使用温峭。
  5. 使用共享的Handler
  6. 服務(wù)優(yōu)雅的關(guān)閉

線程池的劃分

RocketMQ的服務(wù)端Netty啟動(dòng)類(lèi)是NettyRemotingServer猛铅,在這里聲明了四個(gè)線程池

    //這個(gè)其實(shí)就是netty中的work線程池,默認(rèn)用來(lái)處理Handler方法的調(diào)用
    private final EventLoopGroup eventLoopGroupSelector;
   // Netty的Boss線程
    private final EventLoopGroup eventLoopGroupBoss;
   // 公共線程池凤藏,這里用來(lái)處理RocketMQ的業(yè)務(wù)調(diào)用奸忽,這個(gè)有Netty沒(méi)有什么關(guān)系
    private final ExecutorService publicExecutor;
   // 用來(lái)處理Handler的線程池
    private DefaultEventExecutorGroup defaultEventExecutorGroup;

RocketMQ的Netty啟動(dòng)代碼

下面是RocketMQ的Netty服務(wù)啟動(dòng)代碼,如果我們自己想要?jiǎng)?chuàng)建一個(gè)Netty服務(wù)揖庄,直接抄下面的代碼栗菜,修改一下相關(guān)的配置和Handler就可以了:

 ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

盡量使用Epoll

epoll是Linux下多路復(fù)用IO接口select/poll的增強(qiáng)版本,它能顯著減少程序在大量并發(fā)連接中只有少量活躍的情況下的系統(tǒng)CPU利用率蹄梢。先不管epoll的原理是什么疙筹,這都是底層的實(shí)現(xiàn),只需要知道epoll的效率比其它的方式高就可以检号,在能使用epoll的情況下腌歉,就應(yīng)該選擇使用Netty socket的Epoll模式蛙酪。從RocketMQ的Netty啟動(dòng)代碼來(lái)看齐苛,在選擇使用的ServerSocketChannel時(shí),它使用了一個(gè)方法來(lái)判斷是否使用Epoll桂塞。

    private boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform()
            && nettyServerConfig.isUseEpollNativeSelector()
            && Epoll.isAvailable();
    }

nettyServerConfig.isUseEpollNativeSelector可以讓使用者在可以使用epoll的環(huán)境下凹蜂,強(qiáng)制不使用epoll。

使用Netty的ByteBuf內(nèi)存池

使用Netty的內(nèi)存池可以減少對(duì)象的創(chuàng)建和內(nèi)存分配阁危,進(jìn)而減少gc的量玛痊。在RocketMQ的服務(wù)中,默認(rèn)是netty開(kāi)啟使用netty的內(nèi)存池的狂打。

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

使用共享的Handler

在netty中擂煞,使用Handler處理網(wǎng)絡(luò)的消息,不管是接收網(wǎng)絡(luò)消息趴乡,還是返回網(wǎng)絡(luò)消息对省,都會(huì)經(jīng)過(guò)Handler的處理方法。在一個(gè)網(wǎng)絡(luò)連接創(chuàng)建channel時(shí)晾捏,channel初始化時(shí)就會(huì)添加相應(yīng)的Handler蒿涎。如果每個(gè)Handler內(nèi)都沒(méi)有共用的對(duì)象,那么這些Handler最好標(biāo)記為共享的惦辛,這樣可以減少Handler對(duì)象的創(chuàng)建劳秋。比如RocketMQ的編碼Handler

@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

在服務(wù)啟動(dòng)時(shí),會(huì)只在NettyRemotingServer創(chuàng)建一個(gè)NettyEncoder對(duì)象實(shí)例,所有的channel實(shí)例共同使用這個(gè)編碼實(shí)例玻淑。

服務(wù)優(yōu)雅的關(guān)閉

所謂服務(wù)優(yōu)雅的關(guān)閉嗽冒,是指在服務(wù)需要關(guān)閉的時(shí)候,在關(guān)閉之前补履,需要把任務(wù)處理完辛慰,而且在收到關(guān)閉時(shí),不再接收新的任務(wù)干像。在所有的Netty業(yè)務(wù)中帅腌,有業(yè)務(wù)相關(guān)的線程池就是NettyRemotingServer中創(chuàng)建的四個(gè)線程池,所以在關(guān)閉服務(wù)的時(shí)候麻汰,只需要關(guān)閉這幾個(gè)線程池即可速客。并等待線程池中的任務(wù)處理完。
在NettyRemotingServer中有一個(gè)shutdown()方法五鲫。

  @Override
    public void shutdown() {
        try {
            if (this.timer != null) {
                this.timer.cancel();
            }

            this.eventLoopGroupBoss.shutdownGracefully();

            this.eventLoopGroupSelector.shutdownGracefully();

            if (this.nettyEventExecutor != null) {
                this.nettyEventExecutor.shutdown();
            }

            if (this.defaultEventExecutorGroup != null) {
                this.defaultEventExecutorGroup.shutdownGracefully();
            }
        } catch (Exception e) {
            log.error("NettyRemotingServer shutdown exception, ", e);
        }

        if (this.publicExecutor != null) {
            try {
                this.publicExecutor.shutdown();
            } catch (Exception e) {
                log.error("NettyRemotingServer shutdown exception, ", e);
            }
        }
    }

這里面調(diào)用的shutdownGracefully()就是

    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(2, 15, TimeUnit.SECONDS);
    }

它會(huì)等待執(zhí)行完線程池中當(dāng)前已所有任務(wù)溺职,然后再關(guān)閉線程池。
那么位喂,何時(shí)調(diào)用的這個(gè)shutdown()方法呢浪耘。在RocketMQ服務(wù)啟動(dòng)的時(shí)候,會(huì)添加一個(gè)回調(diào)鉤子塑崖,比如Namesrv服務(wù)在啟動(dòng)的時(shí)候會(huì)執(zhí)行下面的代碼:

        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

這樣在服務(wù)器關(guān)閉的時(shí)候七冲,就會(huì)觸發(fā)controller.shudown()。然后執(zhí)行關(guān)閉線程池的操作规婆。
注意澜躺,關(guān)閉服務(wù)器一般使用kill pid的命令,RocketMQ的發(fā)布包里面的bin下面抒蚜,有一個(gè)mqshutdown的腳本掘鄙,就是使用的kill pid 命令。

  pid=`ps ax | grep -i 'org.apache.rocketmq.namesrv.NamesrvStartup' |grep java | grep -v grep | awk '{print $1}'`
    if [ -z "$pid" ] ; then
            echo "No mqnamesrv running."
            exit -1;
    fi

    echo "The mqnamesrv(${pid}) is running..."

    kill ${pid}

首先是使用腳本獲取進(jìn)程名的pid,然后使用 kill pid將進(jìn)程殺死嗡髓。
但是不能使用kill -9 pid操漠,因?yàn)檫@個(gè)命令不會(huì)給進(jìn)程執(zhí)行回調(diào)的機(jī)會(huì),就把進(jìn)程殺死了饿这。


歡迎關(guān)注
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末浊伙,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蛹稍,更是在濱河造成了極大的恐慌吧黄,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,544評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件唆姐,死亡現(xiàn)場(chǎng)離奇詭異拗慨,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)赵抢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)剧蹂,“玉大人,你說(shuō)我怎么就攤上這事烦却〕璧穑” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,764評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵其爵,是天一觀的道長(zhǎng)冒冬。 經(jīng)常有香客問(wèn)我,道長(zhǎng)摩渺,這世上最難降的妖魔是什么简烤? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,193評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮摇幻,結(jié)果婚禮上横侦,老公的妹妹穿的比我還像新娘。我一直安慰自己绰姻,他們只是感情好枉侧,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,216評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著狂芋,像睡著了一般榨馁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上银酗,一...
    開(kāi)封第一講書(shū)人閱讀 51,182評(píng)論 1 299
  • 那天辆影,我揣著相機(jī)與錄音,去河邊找鬼黍特。 笑死,一個(gè)胖子當(dāng)著我的面吹牛锯蛀,可吹牛的內(nèi)容都是我干的灭衷。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼旁涤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼翔曲!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起劈愚,我...
    開(kāi)封第一講書(shū)人閱讀 38,917評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤瞳遍,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后菌羽,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體掠械,經(jīng)...
    沈念sama閱讀 45,329評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,543評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了猾蒂。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片均唉。...
    茶點(diǎn)故事閱讀 39,722評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖肚菠,靈堂內(nèi)的尸體忽然破棺而出舔箭,到底是詐尸還是另有隱情,我是刑警寧澤蚊逢,帶...
    沈念sama閱讀 35,425評(píng)論 5 343
  • 正文 年R本政府宣布层扶,位于F島的核電站,受9級(jí)特大地震影響烙荷,放射性物質(zhì)發(fā)生泄漏怒医。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,019評(píng)論 3 326
  • 文/蒙蒙 一奢讨、第九天 我趴在偏房一處隱蔽的房頂上張望稚叹。 院中可真熱鬧,春花似錦拿诸、人聲如沸扒袖。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,671評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)季率。三九已至,卻和暖如春描沟,著一層夾襖步出監(jiān)牢的瞬間飒泻,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,825評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工吏廉, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留泞遗,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,729評(píng)論 2 368
  • 正文 我出身青樓席覆,卻偏偏與公主長(zhǎng)得像史辙,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子佩伤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,614評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容