使用Netty搭建udp服務(wù)和http服務(wù)

一琐旁、 組件介紹

  • Channel
    在 Netty 中, Channel 是一個 Socket 連接的抽象, 它為用戶提供了關(guān)于底層 Socket 狀態(tài)(是否是連接還是斷開) 以及對 Socket 的讀寫等操作缭乘。
    服務(wù)器連接監(jiān)聽的channel 婆廊,也叫 parent channel密强。 對應(yīng)每一個客戶端連接讀寫的channel,叫 child channel
  • EventLoopGroup
    Netty的調(diào)度模塊稱為EvenLoopGroup,它包含一組EventLoop盆佣,Channel通過注冊到EventLoop中執(zhí)行操作。默認EventLoop個數(shù)為cpu核數(shù)的兩倍
    BossGroup(boss線程組):相當(dāng)于mainReactor械荷,負責(zé)建立連接并且把連接注冊到WorkGroup中共耍。
    WorkerGroup(worker線程組):相當(dāng)于subReactor,WorkGroup負責(zé)處理連接對應(yīng)的讀寫事件吨瞎。
    boss group和worker goup相當(dāng)于多Reactor多線程設(shè)計模式痹兜。
    EvenLoopGroup.png
  • Channel類型
    NioDatagramChannel,表示異步的UDP連接颤诀;
    NioSocketChannel表示異步的TCP連接
    NioServerSocketChannel表示異步的TCP Socket連接字旭,對每一個新進來的連接都會創(chuàng)建一個SocketChannel对湃。
  • Bootstrap和ServerBootstrap
    啟動器,完成Netty客戶端或服務(wù)端的初始化谐算;
    因為UDP是無連接的熟尉,所有直接使用Bootstrap归露;Http則使用ServerBootstrap
    啟動流程:
    BootstrapStart.png

二洲脂、主要邏輯

udp服務(wù)啟動相關(guān)邏輯:

public void startUdpServer(){
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)  //配置一個線程組
                .channel(NioDatagramChannel.class)  //設(shè)置channel類型為UPD
                .option(ChannelOption.SO_BROADCAST, true)   //支持廣播
                .option(ChannelOption.SO_RCVBUF, 2048 * 1024)// 設(shè)置channel讀緩沖區(qū)大小
                .option(ChannelOption.SO_SNDBUF, 2048 * 1024)// 設(shè)置channel寫緩沖區(qū)大小
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {       //裝配handler流水線
                        ChannelPipeline pipeline = ch.pipeline();    //Handler將按pipeline中添加的順序執(zhí)行
                        pipeline.addLast(new UdpServerHandler(serviceFactory));   //自定義的處理器
                    }
                });
        //綁定端口(默認是異步的,可以加ChannelFunture的監(jiān)聽事件)剧包,sync()同步阻塞等待連接成功恐锦;客戶端使用.connect(host, port)連接
        ChannelFuture channelFuture = bootstrap.bind(port).sync();     
        log.info("udp服務(wù)器啟動,端口為"+port);
        nettyUtil.setChannel(channelFuture.channel());
        //sync()同步阻塞等待channel關(guān)閉
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }finally{
        //關(guān)閉資源
        group.shutdownGracefully();
    }
}

udp自定義handler相關(guān)邏輯疆液,注意這里泛型為DatagramPacket一铅,表示接收處理UDP報文

@Slf4j
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    
    private ServiceFactory serviceFactory;
    //創(chuàng)建可緩存線程池
    ExecutorService executorService = Executors.newCachedThreadPool();
    
    public ServerHandler(ServiceFactory serviceFactory){
        this.serviceFactory = serviceFactory;
    }

    //監(jiān)聽channel的消息,注意此時的handler為單線程處理堕油,可以把請求加到線程池中提升效率
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        DatagramPacket p = packet.copy();
        //將請求放在線程池處理
        executorService.execute(new UdpHandlerRunnable(p));
    }
    
    class UdpHandlerRunnable implements Runnable{
        DatagramPacket packet;
        UdpHandlerRunnable(DatagramPacket packet){
            this.packet = packet;
        }
        public void run(){
            ByteBuf byteBuf = packet.content();
            byteBuf.retain();       // byteBuf引用計數(shù)加1潘飘,避免報引用為0異常
            String content = new String(ByteBufUtil.getBytes(byteBuf));
            log.info("得到來自 "+packet.sender()+" 的請求, content = " + content);
            // 業(yè)務(wù)邏輯
            // ...
            byteBuf.release();  //注意釋放byteBuf和packet
            packet.release();
        }
    }

}

http服務(wù)啟動相關(guān)邏輯

public void startHttpServer(){
    EventLoopGroup bossGroup = new NioEventLoopGroup();      //boss工作組
    EventLoopGroup workerGroup = new NioEventLoopGroup();  //worker工作組
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)                //設(shè)置channel類型為NioServerSocketChannel
                .childOption(ChannelOption.SO_RCVBUF, 2048 * 1024)// 設(shè)置channel讀緩沖區(qū)為2M
                .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)// 設(shè)置channel寫緩沖區(qū)為1M
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {       //裝配handler流水線
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new HttpResponseEncoder());    //編碼器
                        pipeline.addLast(new HttpRequestDecoder());      //解碼器
                        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//設(shè)置塊的最大字節(jié)數(shù)
                        pipeline.addLast(new HttpServerHandler(serviceFactory));   //自定義的處理器
                    }
                });
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();      //綁定端口,sync()同步阻塞等待連接成功
        log.info("http服務(wù)器啟動掉缺,端口為"+port);
        nettyUtil.setChannel(channelFuture.channel());
        //sync()同步阻塞等待channel關(guān)閉
        channelFuture.channel().closeFuture().sync();
    }catch (InterruptedException e) {
        e.printStackTrace();
    }finally{
        //關(guān)閉資源
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

http自定義handler相關(guān)邏輯

@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private ServiceFactory serviceFactory;
    public HttpServerHandler(ServiceFactory serviceFactory){
        this.serviceFactory = serviceFactory;
    }

    //監(jiān)聽channel的消息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        String method = request.method().name();
        String uri = request.uri();
        String body = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
        log.info("method = {}, uri = {}, body = {}", method, uri, body);
        //具體業(yè)務(wù)邏輯...
    }

    //異常捕獲
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        NettyUtil.httpResponse(ctx.channel(), Error.error("錯誤請求"));
    }
    
}

發(fā)送請求的工具類

/**
     * udp單播發(fā)送
     * @param host
     * @param port
     * @param pushMsg   待發(fā)送的信息
     */
    public void singleCast(String host, int port, String pushMsg){
        InetSocketAddress remoteAddress = new InetSocketAddress(host, port);    //遠程地址
        ByteBuf byteBuf1 = new UnpooledByteBufAllocator(false).buffer();
        byteBuf1.writeCharSequence(pushMsg, CharsetUtil.UTF_8);
        DatagramPacket packet = new DatagramPacket(byteBuf1, remoteAddress);
        this.channel.writeAndFlush(packet);
    }

    /**
     * http響應(yīng)
     * @param channel
     * @param sendMsg   響應(yīng)的信息
     */
    public void httpResponse(Channel channel, String sendMsg){
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK,
                Unpooled.wrappedBuffer(sendMsg.getBytes(CharsetUtil.UTF_8)));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        channel.writeAndFlush(response);
    }

    /**
     * 參數(shù)轉(zhuǎn)map
     * @param fullReq
     * @return
     */
    public Map<String, String> parse(FullHttpRequest fullReq) {
        try{
            HttpMethod method = fullReq.method();
            Map<String, String> parmMap = new HashMap<>();
            if (HttpMethod.GET == method) {
                // 是GET請求
                QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
                decoder.parameters().entrySet().forEach( entry -> {
                    // entry.getValue()是一個List, 只取第一個元素
                    parmMap.put(entry.getKey(), entry.getValue().get(0));
                });
            } else if (HttpMethod.POST == method) {
                // 是POST請求
                HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(fullReq);
                decoder.offer(fullReq);
                List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
                for (InterfaceHttpData parm : parmList) {

                    Attribute data = (Attribute) parm;
                    parmMap.put(data.getName(), data.getValue());
                }

            } else {
                // 不支持其它方法
                log.error("暫不支持該方法");
            }
            return parmMap;
        }catch (IOException e) {
            return null;
        }
    }

    /**
     * 文件下載
     * @param ctx
     * @param fileName
     * @param type
     */
    public void responseExportFile(ChannelHandlerContext ctx, String fileName, String type) {

        try {
            StringBuilder path = new StringBuilder();
            if( type.equals("1") ){ //apk
                path.append(".").append(File.separator).append("upload").append(File.separator).append("apk").append(File.separator).append(fileName);
            }
            if( type.equals("2") ){ //圖片
                path.append(".").append(File.separator).append("upload").append(File.separator).append("img").append(File.separator).append(fileName);
            }
            File file = new File(path.toString()).getCanonicalFile();
            if(!file.exists()){
                log.error("下載文件不存在, path = {}", path);
                this.httpResponse(ctx.channel(), JSON.toJSONString(Result.failure("下載文件不存在")));
                return;
            }
            //隨機讀取文件
            final RandomAccessFile raf = new RandomAccessFile(file, "r");
            long fileLength = raf.length();
            //定義response對象
            HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            //設(shè)置請求頭部
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream; charset=UTF-8");
            response.headers().add(HttpHeaderNames.CONTENT_DISPOSITION,
                    "attachment; filename=\"" + URLEncoder.encode(file.getName(), "UTF-8") + "\";");
            ctx.write(response);
            //設(shè)置事件通知對象
            ChannelFuture sendFileFuture = ctx
                    .write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
                //文件傳輸完成執(zhí)行監(jiān)聽器
                @Override
                public void operationComplete(ChannelProgressiveFuture future)
                        throws Exception {
                   log.info("文件 {} 下載成功.", fileName);
                }
                //文件傳輸進度監(jiān)聽器
                @Override
                public void operationProgressed(ChannelProgressiveFuture future,
                                                long progress, long total) throws Exception {
                }
            });
            //刷新緩沖區(qū)數(shù)據(jù)卜录,文件結(jié)束標(biāo)志符
            ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市眶明,隨后出現(xiàn)的幾起案子艰毒,更是在濱河造成了極大的恐慌,老刑警劉巖搜囱,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件丑瞧,死亡現(xiàn)場離奇詭異,居然都是意外死亡蜀肘,警方通過查閱死者的電腦和手機绊汹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扮宠,“玉大人西乖,你說我怎么就攤上這事『眩” “怎么了浴栽?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長轿偎。 經(jīng)常有香客問我典鸡,道長,這世上最難降的妖魔是什么坏晦? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任萝玷,我火速辦了婚禮嫁乘,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘球碉。我一直安慰自己蜓斧,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布睁冬。 她就那樣靜靜地躺著挎春,像睡著了一般。 火紅的嫁衣襯著肌膚如雪豆拨。 梳的紋絲不亂的頭發(fā)上直奋,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天,我揣著相機與錄音施禾,去河邊找鬼脚线。 笑死,一個胖子當(dāng)著我的面吹牛弥搞,可吹牛的內(nèi)容都是我干的邮绿。 我是一名探鬼主播,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼攀例,長吁一口氣:“原來是場噩夢啊……” “哼船逮!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起肛度,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤傻唾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后承耿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冠骄,經(jīng)...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年加袋,在試婚紗的時候發(fā)現(xiàn)自己被綠了凛辣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡职烧,死狀恐怖扁誓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蚀之,我是刑警寧澤蝗敢,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站足删,受9級特大地震影響寿谴,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜失受,卻給世界環(huán)境...
    茶點故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一讶泰、第九天 我趴在偏房一處隱蔽的房頂上張望咏瑟。 院中可真熱鬧,春花似錦痪署、人聲如沸码泞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽余寥。三九已至,卻和暖如春辜王,著一層夾襖步出監(jiān)牢的瞬間劈狐,已是汗流浹背罐孝。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工呐馆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人莲兢。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓汹来,卻偏偏與公主長得像,于是被迫代替她去往敵國和親改艇。 傳聞我的和親對象是個殘疾皇子收班,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,630評論 2 359

推薦閱讀更多精彩內(nèi)容