Netty實(shí)現(xiàn)簡易的應(yīng)用層協(xié)議

前一陣在工作中用到了RabbitMQ塞祈,因此對(duì)幾種常見的消息隊(duì)列產(chǎn)生了興趣栈妆。首先從GitHub上下載了RocketMQ的源碼打算一探究竟。在閱讀remoting這個(gè)模塊時(shí)遇到了很大的障礙。RocketMQ的網(wǎng)絡(luò)編程完全基于Netty析校,而本人對(duì)Netty的理解還只停留在了知道這是一款封裝了NIO的優(yōu)秀框架上。于是正好就借此機(jī)會(huì)先揭開Netty的面紗铜涉。
閱讀完《Netty IN ACTION》后有些手癢智玻,就用Netty實(shí)現(xiàn)了一個(gè)簡易的應(yīng)用層協(xié)議以及一個(gè)同步調(diào)用的方法。
github:https://github.com/ztglcy/netty-protocol

整體結(jié)構(gòu)

程序結(jié)構(gòu)

圖里demo中是client和server的簡易demo芙代;handler中則是自定義協(xié)議的編碼器和解碼器吊奢;message中是與傳輸?shù)南⑾嚓P(guān)的類;processor是服務(wù)端的業(yè)務(wù)處理類纹烹;service中則是client和server的啟動(dòng)類页滚。

傳輸消息格式及編解碼

傳輸?shù)南⒏袷?/div>

length是一個(gè)表示消息大小的int型數(shù)字,自定義長度解碼器解決TCP黏包問題铺呵。headerLength則是表示消息頭大小的int型數(shù)字裹驰,用以將傳輸?shù)南㈩^與消息體分開進(jìn)行序列化。header和content分別存儲(chǔ)消息的消息頭以及消息體片挂。

public class MessageHeader{

    private int messageId;
    private int clientId;
    private int serverId;
    private int code;

    private MessageHeader(){}

    public MessageHeader(int code) {
        this.code = code;
    }

    public int getCode() {
        return code;
    }
    public int getMessageId() {
        return messageId;
    }

    public void setMessageId(int messageId) {
        this.messageId = messageId;
    }

    public int getClientId() {
        return clientId;
    }

    public void setClientId(int clientId) {
        this.clientId = clientId;
    }

    public int getServerId() {
        return serverId;
    }

    public void setServerId(int serverId) {
        this.serverId = serverId;
    }

    public void setCode(int code) {
        this.code = code;
    }

}

消息頭包含messageId幻林,clientId贞盯,serverId,code四個(gè)參數(shù)沪饺,分別用以表征Message的ID躏敢,客戶端ID,服務(wù)端ID整葡,以及消息體的格式code(維護(hù)在一個(gè)常量中)件余。公有的構(gòu)造方法中必傳消息體格式code,私有的構(gòu)造方法用于fastjson的反序列化遭居。

public class Message {

    private MessageHeader messageHeader;

    private byte[] content;

    private Message(){
    }

    public static Message createMessage(MessageHeader messageHeader){
        Message message = new Message();
        message.messageHeader = messageHeader;
        return message;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    public MessageHeader getMessageHeader() {
        return messageHeader;
    }
     //還有編碼與解碼的方法
    ......
}

Message的結(jié)構(gòu)比較直白蛾扇,包含消息頭和消息體以及編碼與解碼的方法。解碼與編碼的方法在解碼器與編碼器中進(jìn)行調(diào)用魏滚。先來介紹一下編碼的過程镀首。

public class ProtocolEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {

        ByteBuffer byteBuffer = message.encode();
        byteBuf.writeBytes(byteBuffer);

    }
}

編碼器繼承了MessageToByteEncoder并實(shí)現(xiàn)了其encode()方法進(jìn)行編碼。編碼器直接調(diào)用傳進(jìn)來的Message自己的編碼方法鼠次,將編碼后的ByteBuffer寫入ByteBuf中更哄。再來看一下Message怎么實(shí)現(xiàn)這個(gè)方法的。

 public ByteBuffer encode(){
        int length = 4;
        byte[] bytes = SerializableHelper.encode(messageHeader);
        if(bytes != null){
            length += bytes.length;
        }
        if(content!=null){
            length += content.length;
        }

        ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4);
        byteBuffer.putInt(length);
        if(bytes != null){
            byteBuffer.putInt(bytes.length);
            byteBuffer.put(bytes);
        }else{
            byteBuffer.putInt(0);
        }
        if(content!=null){
            byteBuffer.put(content);
        }
        byteBuffer.flip();

        return byteBuffer;
    }

length用以表示整個(gè)消息大小腥寇,計(jì)算方式為表示消息頭大小的int+序列化后的消息頭大小+消息體大小成翩。計(jì)算完成后申請一塊length+4大小的ByteBuffer(因?yàn)閘ength本身存儲(chǔ)也要4個(gè)字節(jié))。將消息內(nèi)容按照上面給出的格式依次寫入Buffer中赦役。

public class ProtocolDecoder extends LengthFieldBasedFrameDecoder {

    public ProtocolDecoder() {
        super(16777216, 0, 4,0,4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf byteBuf = (ByteBuf) super.decode(ctx, in);
        if(byteBuf == null){
            return null;
        }
        ByteBuffer byteBuffer = byteBuf.nioBuffer();
        return Message.decode(byteBuffer);
    }

}

解碼器繼承了LengthFieldBasedFrameDecoder用以解決粘包的問題麻敌。構(gòu)造函數(shù)中的參數(shù)分別表示包的最大值、長度字段的偏移量掂摔、長度字段占的字節(jié)數(shù)术羔、添加到長度字段的補(bǔ)償值以及從解碼幀中第一次去除的字節(jié)數(shù)。因?yàn)橄⒌念^部存儲(chǔ)了4個(gè)字節(jié)的表示消息大小的Int型乙漓,所以后4個(gè)參數(shù)為0级历、4、0叭披、4寥殖。經(jīng)過處理后的消息已經(jīng)剝離掉了最消息頭部的Int型。再調(diào)用Message自身的decode()方法進(jìn)行解碼涩蜘。

public static Message decode(ByteBuffer byteBuffer){
        int length = byteBuffer.limit();
        int headerLength = byteBuffer.getInt();

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);
        MessageHeader messageHeader = SerializableHelper.decode(headerData,MessageHeader.class);

        byte[] content = new byte[length - headerLength -4];
        byteBuffer.get(content);

        Message message = Message.createMessage(messageHeader);
        message.setContent(content);
        return message;
    }

解碼時(shí)首先將消息頭的長度從ByteBuffer中取出嚼贡,然后讀取該長度的字節(jié)作為消息頭進(jìn)行反序列化,其他部分則作為消息體同诫,重新組裝成新的Message粤策。

服務(wù)端與客戶端的引導(dǎo)

public interface ProtocolService {

    void start();

    void shutdown();
    
}

服務(wù)端與客戶端都繼承了ProtocolService接口,實(shí)現(xiàn)了start()和shutdown()兩個(gè)方法剩辟。

public class NettyProtocolServer implements ProtocolService {

    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    private Map<Integer,ProtocolProcessor> processorMap = new HashMap<>();

    @Override
    public void start(){
        try{
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(8888)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new ProtocolEncoder()
                                            ,new ProtocolDecoder()
                                            ,new ProtocolServerProcessor()
                                    );
                        }
                    });
            ChannelFuture cf = bootstrap.bind().sync();
        } catch (InterruptedException e) {

        }
    }

    @Override
    public void shutdown() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    public void registerProcessor(Integer code,ProtocolProcessor protocolProcessor){
        processorMap.put(code,protocolProcessor);
    }

    public class ProtocolServerProcessor extends SimpleChannelInboundHandler<Message> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
            Integer code = message.getMessageHeader().getCode();
            ProtocolProcessor processor = processorMap.get(code);
            if(processor != null){
                Message response = processor.process(message);
                channelHandlerContext.writeAndFlush(response);
            }
        }
    }
}

服務(wù)端的start()方法中完成了服務(wù)端的初始化掐场,很常見的netty寫法往扔,將編碼器、解碼器以及業(yè)務(wù)處理器ProtocolServerProcessor加入了Worker的Pipeline中熊户。這里業(yè)務(wù)處理器也可以放在線程池里執(zhí)行萍膛,防止業(yè)務(wù)處理時(shí)間太長造成堵塞。shutdown()方法則將兩個(gè)EventLoopGroup進(jìn)行關(guān)閉嚷堡,防止資源泄露蝗罗。registerProcessor()方法則是將業(yè)務(wù)處理器以KV的形式注冊到服務(wù)端中,ProtocolServerProcessor會(huì)根據(jù)消息頭中的code在map中查找對(duì)應(yīng)的業(yè)務(wù)處理器進(jìn)行業(yè)務(wù)的處理蝌戒。

public class DemoProcessor implements ProtocolProcessor{

    @Override
    public Message process(Message message) {
        byte[] bodyDate = message.getContent();
        DemoMessageBody messageBody = SerializableHelper.decode(bodyDate,DemoMessageBody.class);
        System.out.println(messageBody.getDemo());

        MessageHeader messageHeader = new MessageHeader(1);
        messageHeader.setMessageId(message.getMessageHeader().getMessageId());
        DemoMessageBody responseBody = new DemoMessageBody();
        responseBody.setDemo("I received!");

        Message response = Message.createMessage(messageHeader);
        response.setContent(SerializableHelper.encode(responseBody));
        return response;

    }
}

DemoProcessor是一個(gè)示例的業(yè)務(wù)處理器串塑,將傳來的消息體解碼后返回一個(gè)I received!的回復(fù),這里注意的是messageId要與請求的消息一致北苟,用以表征這是哪個(gè)請求的返回桩匪。

public class NettyProtocolClient implements ProtocolService {

    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    private Bootstrap bootstrap = new Bootstrap();
    private ConcurrentHashMap<Integer,Response> responseMap = new ConcurrentHashMap<>();

    @Override
    public void start() {
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new ProtocolDecoder(),
                                        new ProtocolEncoder(),
                                        new ProtocolClientProcessor());
                    }
                });
    }

    @Override
    public void shutdown() {
        eventLoopGroup.shutdownGracefully();
    }

    public Message send(String address, Message message){
        try {
            Response response = new Response();
            responseMap.put(message.getMessageHeader().getMessageId(),response);
            Channel channel = bootstrap.connect(address,8888).sync().channel();

            channel.writeAndFlush(message);
            Message responseMessage = response.waitResponse();
            responseMap.remove(message.getMessageHeader().getMessageId());
            channel.close();
            return responseMessage;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public class ProtocolClientProcessor extends SimpleChannelInboundHandler<Message> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
            Response response = responseMap.get(message.getMessageHeader().getMessageId());
            if (response != null){
                response.putResponse(message);
            }
        }
    }
}

客戶端與服務(wù)端的start()方法和shutdown()方法類似∮驯牵客戶端提供了一個(gè)send()方法用以消息的同步調(diào)用傻昙,send()方法在發(fā)送信息后以消息的messageId為key生成一個(gè)Response的實(shí)例緩存在responseMap中,調(diào)用Response中的countDownLatch.await()方法堵塞住等待返回(這里應(yīng)該加一個(gè)時(shí)間限制以防止線程無限期地堵塞撞嗜印)妆档。ProtocolClientProcessor會(huì)處理返回的消息,將其存入對(duì)應(yīng)的Response中虫碉,并調(diào)用countDownLatch.countDown()贾惦。這樣客戶端線程就可以收到結(jié)果同步返回。還可以改進(jìn)的一點(diǎn)在于保持客戶端與服務(wù)端的長連接敦捧,將其緩存在客戶端中须板,每次發(fā)送消息都用已緩存的連接,減少開銷绞惦。

DEMO

最后分別編寫一個(gè)客戶端與服務(wù)端的demo用以測試我們的協(xié)議逼纸。

public class ServerDemo {

    public static void main(String[] args) {
        NettyProtocolServer nettyProtocolServer = new NettyProtocolServer();
        nettyProtocolServer.registerProcessor(1,new DemoProcessor());
        nettyProtocolServer.start();
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        nettyProtocolServer.shutdown();
    }
}

服務(wù)端的demo首先構(gòu)建一個(gè)NettyProtocolServer的實(shí)例洋措,將DemoProcessor注冊到服務(wù)端中之后掛起主線程等待客戶端的消息济蝉,最后shutdown掉NettyProtocolServer。

public class ClientDemo {
    public static void main(String[] args) {
        NettyProtocolClient client = new NettyProtocolClient();
        client.start();
        Message message = demoMessage();
        Message messageResponse = client.send("localhost",message);
        System.out.println(SerializableHelper.decode(messageResponse.getContent(),DemoMessageBody.class).getDemo());
        client.shutdown();
    }

    private static Message demoMessage(){
        MessageHeader messageHeader = new MessageHeader(1);
        messageHeader.setMessageId(1);
        messageHeader.setClientId(1);
        messageHeader.setServerId(1);
        Message message =Message.createMessage(messageHeader);
        DemoMessageBody responseBody = new DemoMessageBody();
        responseBody.setDemo("Hello World!");
        message.setContent(SerializableHelper.encode(responseBody));
        return message;
    }
}

客戶端的demo也很簡單菠发。構(gòu)建一個(gè)NettyProtocolClient的實(shí)例王滤,拼裝一個(gè)消息,調(diào)用send()方法滓鸠,再對(duì)返回的消息稍加處理就OK啦(客戶端拼裝和處理消息可以再抽出一個(gè)中間層)雁乡。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市糜俗,隨后出現(xiàn)的幾起案子踱稍,更是在濱河造成了極大的恐慌曲饱,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件珠月,死亡現(xiàn)場離奇詭異扩淀,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)啤挎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門驻谆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人庆聘,你說我怎么就攤上這事胜臊。” “怎么了伙判?”我有些...
    開封第一講書人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵象对,是天一觀的道長。 經(jīng)常有香客問我宴抚,道長织盼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任酱塔,我火速辦了婚禮沥邻,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘羊娃。我一直安慰自己唐全,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開白布蕊玷。 她就那樣靜靜地躺著邮利,像睡著了一般。 火紅的嫁衣襯著肌膚如雪垃帅。 梳的紋絲不亂的頭發(fā)上延届,一...
    開封第一講書人閱讀 51,562評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音贸诚,去河邊找鬼方庭。 笑死,一個(gè)胖子當(dāng)著我的面吹牛酱固,可吹牛的內(nèi)容都是我干的械念。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼运悲,長吁一口氣:“原來是場噩夢啊……” “哼龄减!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起班眯,我...
    開封第一講書人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤希停,失蹤者是張志新(化名)和其女友劉穎烁巫,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宠能,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡程拭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了棍潘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片恃鞋。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖亦歉,靈堂內(nèi)的尸體忽然破棺而出恤浪,到底是詐尸還是另有隱情,我是刑警寧澤肴楷,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布水由,位于F島的核電站,受9級(jí)特大地震影響赛蔫,放射性物質(zhì)發(fā)生泄漏砂客。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一呵恢、第九天 我趴在偏房一處隱蔽的房頂上張望鞠值。 院中可真熱鬧,春花似錦渗钉、人聲如沸彤恶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽声离。三九已至,卻和暖如春瘫怜,著一層夾襖步出監(jiān)牢的瞬間术徊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來泰國打工鲸湃, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留纹坐,地道東北人灭美。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓痕惋,卻偏偏與公主長得像防嗡,于是被迫代替她去往敵國和親拙吉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子脊僚,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355