Android簡單使用netty及谷歌的protobuf

文章主要內(nèi)容

  1. netty TCP NIO服務(wù)端紊搪、客戶端簡單使用
  2. netty UDP NIO使用
  3. 谷歌protobuf使用
  4. netty 結(jié)合谷歌protobuf使用
  5. 只是netty protobuf的使用试伙,更適合Android開發(fā)人員看楞遏,原理那些沒有

protobuf簡介

protobuf 是 Google 用于序列化結(jié)構(gòu)化數(shù)據(jù)的語言,語言中立岁忘、平臺中立决侈、可擴(kuò)展機(jī)制——相對于 XML史飞,更小猫妙、更快瓷翻、更簡單。您只需定義一次數(shù)據(jù)的結(jié)構(gòu)化方式吐咳,然后就可以使用特殊生成的源代碼輕松地將結(jié)構(gòu)化數(shù)據(jù)寫入和讀取各種數(shù)據(jù)流逻悠,并使用各種語言。(翻譯的韭脊,自己去官網(wǎng)看)
個(gè)人理解是用于數(shù)據(jù)壓縮,減少IO數(shù)據(jù)傳輸量单旁。
protobuf 官網(wǎng)
protobuf github
protobuf java編寫文檔

protobuf使用

  1. 編寫.proto結(jié)尾的文件沪羔,如:SocketData.proto,用于生成Java類。
// 真正的實(shí)體類蔫饰,擴(kuò)展數(shù)據(jù)為了通用準(zhǔn)備
// required 必需的
// optional 可選
message SocketData{
  required int32 type = 1;// int 類型琅豆,可以根據(jù)這個(gè)來處理不同的消息
  optional bytes  data = 2; // byte[] 真正的數(shù)據(jù)
  optional string ext_s = 3;// String 擴(kuò)展數(shù)據(jù)
  optional int64  ext_l = 4;// long 擴(kuò)展數(shù)據(jù)
  optional float  ext_f = 5;// float 擴(kuò)展數(shù)據(jù)
  optional string ext_s1 = 6;// String 擴(kuò)展數(shù)據(jù)
  optional int64  ext_l1 = 7;// long 擴(kuò)展數(shù)據(jù)
  optional double  ext_d = 8;// double 擴(kuò)展數(shù)據(jù)
  optional double  ext_d1 = 9;// double 擴(kuò)展數(shù)據(jù)
}
  1. 下載protobuf代碼生成器,找到系統(tǒng)對應(yīng)的版本進(jìn)行下載篓吁,解壓
    protobuf代碼生成器下載地址
    protobuf代碼生成器.png
  2. 生成java代碼
  • 解壓protobuf代碼生成器茫因,將編寫好的SocketData.proto文件復(fù)制到bin目錄下
  • 在cmd執(zhí)行命令: protoc --java_out=lite:./ SocketData.proto
  • 將生成的代碼復(fù)制到項(xiàng)目,即可使用


    proto生成java文件.png
  • protoc -h 查看更多編譯內(nèi)容

netty簡介

Netty 官網(wǎng)
netty github
Netty 是一個(gè) NIO 客戶端服務(wù)器框架杖剪,可以快速輕松地開發(fā)協(xié)議服務(wù)器和客戶端等網(wǎng)絡(luò)應(yīng)用程序冻押。它極大地簡化和流線了網(wǎng)絡(luò)編程,例如 TCP 和 UDP 套接字服務(wù)器盛嘿。(翻譯的洛巢,自己去官網(wǎng)看)
個(gè)人理解,Netty是對java BIO NIO使用的統(tǒng)一封裝次兆,便于快速編寫服務(wù)端和客戶端的網(wǎng)絡(luò)應(yīng)用程序稿茉。

netty使用

netty官方事例

android項(xiàng)目 引入 netty protobuf

android {
    // ...
    // 解決引入報(bào)錯(cuò)
    packagingOptions {
        exclude 'META-INF/*******'
        exclude 'META-INF/INDEX.LIST'
        exclude 'META-INF/io.netty.versions.properties'
    }
}
dependencies {
    // netty
    implementation 'io.netty:netty-transport:4.1.79.Final' // 傳輸相關(guān)
    implementation 'io.netty:netty-codec:4.1.79.Final' // 編解碼相關(guān)
    // 谷歌protobuf
    implementation 'com.google.protobuf:protobuf-javalite:3.21.2'
}
  • 如果有需要可以引入 'io.netty:netty-all:4.1.79.Final' 包含所有

netty protobuf tcp 服務(wù)端

  • EchoServer 服務(wù)端代碼
/**
 * Netty nio 服務(wù)端
 */
public final class EchoServer {
    // 啟動的端口號
    public static int PORT = 10303;
    private static final String tag = "TAG";
    // 啟動的標(biāo)志
    private AtomicBoolean isStart = new AtomicBoolean(false);
    // 管道:主要是讀寫消息
    private Channel channel;
    // 處理接收到的連接、消息等
    private EchoServerHandler serverHandler;
    // 消息接收回調(diào)
    private OnReceiveListener onReceiveListener;
    // 線程數(shù)芥炭,如果是0漓库,Netty底層會根據(jù)CPU核心數(shù)*2進(jìn)行創(chuàng)建
    private int threads;

    public EchoServer(int threads) {
        this.threads = threads;
    }

    public EchoServer() {
        this(0);
    }

    /**
     * 啟動服務(wù)
     */
    public void start() {
        // 防止并發(fā)啟動
        if (isStart.compareAndSet(false, true)) {
            // 使用線程池啟動
            Executors.newSingleThreadExecutor().execute(() -> doStart());
        }

    }

    /**
     * 執(zhí)行啟動服務(wù)的真正代碼
     */
    private void doStart() {
        // 主要處理Accept請求
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 處理其它 Connect Read Write
        EventLoopGroup workerGroup = new NioEventLoopGroup(threads);
    // 處理鏈接及消息
        serverHandler = new EchoServerHandler();
        serverHandler.setOnReceiveListener(onReceiveListener);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 64)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // protobuf 解碼
                            p.addLast(new ProtobufVarint32FrameDecoder());
                            p.addLast(new ProtobufDecoder(SocketData.getDefaultInstance()));
                            // protobuf 編碼
                            p.addLast(new ProtobufVarint32LengthFieldPrepender());
                            p.addLast(new ProtobufEncoder());
                            p.addLast(serverHandler);
                        }
                    });

            // 綁定端口號
            ChannelFuture f = b.bind(PORT).sync();

            // 拿到Channel
            channel = f.channel();

            // 阻塞等待關(guān)閉
            channel.closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 關(guān)閉所有事件循環(huán)以終止所有線程
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            // 關(guān)閉Channel
            close();
        }

    }

    /**
     * 判斷Channel是否打開
     */
    private boolean isOpen() {
        return channel != null && channel.isOpen();
    }

    /**
     * 關(guān)閉及回收
     */
    public void close() {
        if (isStart.compareAndSet(true, false)) {
            if (isOpen()) {
                channel.close();
            }
            channel = null;
            serverHandler = null;
        }
    }

    /**
     * 群發(fā)消息,所有客戶端會收到
     */
    public void send(SocketData packet) {
        if (isOpen()) {
            serverHandler.send(packet);
        }
    }
    
    // 消息回調(diào)
    public void setOnReceiveListener(OnReceiveListener listener) {
        this.onReceiveListener = listener;
    }
}
  • EchoServerHandler 服務(wù)端消息處理
/**
 * 服務(wù)端消息處理园蝠,處理客戶端連接及發(fā)過來的消息
 */
@Sharable
public class EchoServerHandler extends SimpleChannelInboundHandler<SocketData> {
    // 接收到客戶端消息回調(diào)
    private OnReceiveListener listener;
    // 保存所有客戶端的連接
    private ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SocketData msg) throws Exception {
        if (listener != null) {
            listener.onReceive(msg, ctx.channel());
        }
    }
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        group.add(ctx.channel()); 
    }

    /**
     * 群發(fā)消息
     */
    public void send(Object packet) {
        // 群發(fā)消息
        group.writeAndFlush(packet);
    }


    public void setOnReceiveListener(OnReceiveListener listener) {
        this.listener = listener;
    }
}

start方法是啟動服務(wù)米苹、close是關(guān)閉、send是群發(fā)消息砰琢,其它如果需要蘸嘶,自己編寫即可。

netty protobuf tcp 客戶端

  • EchoClient 客戶端
/**
 * netty nio 客戶端
 */
public final class EchoClient {
    // 啟動的標(biāo)志
    private AtomicBoolean isStart = new AtomicBoolean(false);
    // 服務(wù)端地址
    private InetAddress address;
    // 服務(wù)端端口號
    private int port;
    // 管道:主要是讀寫消息
    private Channel channel;
    // 消息接收回調(diào)
    private OnReceiveListener onReceiveListener;

    /**
     * 連接服務(wù)端端
     *
     * @param address 服務(wù)端地址
     * @param port    服務(wù)端端口號
     */
    public void start(InetAddress address, int port) {
        if (address == null) {
            return;
        }
        this.address = address;
        this.port = port;
        if (isStart.compareAndSet(false, true)) {
            Executors.newSingleThreadExecutor().execute(() -> doStart());
        }
    }

    private void doStart() {
        // 處理所有的事件
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 處理服務(wù)端發(fā)過來的消息陪汽。
            final EchoClientHandler clientHandler = new EchoClientHandler();
            // 處理服務(wù)端發(fā)過來的消息回調(diào)
            clientHandler.setOnReceiveListener(onReceiveListener);
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_REUSEADDR, true)// 地址復(fù)用
                    .option(ChannelOption.SO_KEEPALIVE, true)// 保持連接
                    .option(ChannelOption.TCP_NODELAY, true)// 不緩遲
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // protobuf 解碼
                            p.addLast(new ProtobufVarint32FrameDecoder());
                            p.addLast(new ProtobufDecoder(SocketData.getDefaultInstance()));
                            // protobuf 編碼
                            p.addLast(new ProtobufVarint32LengthFieldPrepender());
                            p.addLast(new ProtobufEncoder());
                            // 自己的Handler
                            p.addLast(clientHandler);
                        }
                    });

            // 連接服務(wù)端
            ChannelFuture f = b.connect(address, port).sync();
            // 拿到Channel
            channel = f.channel();
            // 阻塞等待連接關(guān)閉
            channel.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
            close();
        }
    }

    /**
     * 判斷Channel是否打開
     */
    public boolean isOpen() {
        return channel != null && channel.isOpen();
    }

    /**
     * 關(guān)閉及回收
     */
    public void close() {
        if (isStart.compareAndSet(true, false)) {
            if (onStateListener != null) {
                onStateListener.onClose(channel);
            }
            if (isOpen()) {
                channel.close();
            }
            channel = null;
            address = null;
        }
    }

    /**
     * 向服務(wù)端發(fā)發(fā)送消息
     */
    public void send(SocketData packet) {
        if (isOpen()) {
            channel.writeAndFlush(packet);
        }
    }

    /**
     * 設(shè)置消息回調(diào)
     */
    public void setOnReceiveListener(OnReceiveListener onReceiveListener) {
        this.onReceiveListener = onReceiveListener;
    }
}
  • EchoClientHandler 客戶端消息處理
/**
 * 處理客戶端消息
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<SocketData> {
    // 消息回調(diào)
    private OnReceiveListener onReceiveListener;

    /**
     * 接收到服務(wù)端的消息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SocketData msg) throws Exception {
        if (onReceiveListener != null) {
            onReceiveListener.onReceive(msg, ctx.channel());
        }
    }

    // 設(shè)置消息回調(diào)
    public void setOnReceiveListener(OnReceiveListener onReceiveListener) {
        this.onReceiveListener = onReceiveListener;
    }
}

netty protobuf udp 代碼

  • NioUdpServer UDP啟動代碼
/**
 * netty UDP代碼
 */
public class NioUdpServer {
    public static final int TYPE_IN = 1000; // 創(chuàng)建服務(wù)训唱,發(fā)個(gè)創(chuàng)建廣播
    public static final int TYPE_OUT = 1001; // 退出服務(wù),發(fā)個(gè)退出廣播
    public static final int TYPE_REC = 1002; // 這個(gè)是發(fā)個(gè)接收到消息的廣播

    public static int PORT = 10505;
    // 管道:主要是讀寫操作
    private Channel channel;
    // 消息回調(diào)
    private OnReceiveListener onReceiveListener;
    // 消息處理
    private NioUdpServerHandler serverHandler;
    private Handler handler = new Handler(Looper.getMainLooper());

    public void start() {
        if (!isOpen()) {
            Executors.newSingleThreadExecutor()
                    .execute(() -> doStart());
        }
    }

    public void setOnReceiveListener(OnReceiveListener onReceiveListener) {
        this.onReceiveListener = onReceiveListener;
    }

    private void doStart() {
        // 處理事件
        EventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            // 消息處理
            serverHandler = new NioUdpServerHandler(this);
            // 消息回調(diào)設(shè)置
            serverHandler.setOnReceiveListener(onReceiveListener);
            bootstrap.group(group)
                    .channel(NioDatagramChannel.class) // udp設(shè)置
                    .option(ChannelOption.SO_BROADCAST, true)// 廣播設(shè)置
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .handler(new ChannelInitializer<DatagramChannel>() {
                        @Override
                        protected void initChannel(DatagramChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(serverHandler);
                        }
                    });
            // 綁定端口
            ChannelFuture future = bootstrap.bind(PORT).sync();
            // 拿到Channel
            channel = future.channel();
            // 發(fā)一個(gè)啟動廣播
            sendBroadcast(TYPE_IN);
            // 阻塞等待關(guān)閉
            channel.closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            Log.e("TAG", "udp server error " + e.getMessage());
        } finally {
            close();
            // 關(guān)閉
            group.shutdownGracefully();
        }

    }

    /**
     * 發(fā)送廣播
     */
    public void sendBroadcast(int type) {
        sendBroadcast(SocketData.newBuilder()
                .setType(type)
                .setExtS(Build.DEVICE)
                .build());
    }

    /**
     * 發(fā)送廣播
     */
    public void sendBroadcast(SocketData data) {
        if (isOpen()) {
            // 發(fā)送udp包
            DatagramPacket packet = new DatagramPacket(Unpooled.copiedBuffer(data.toByteArray()),
                    new InetSocketAddress("255.255.255.255", PORT));
            channel.writeAndFlush(packet);
        }
    }

    /**
     * 是否打開
     */
    private boolean isOpen() {
        return channel != null && channel.isOpen();
    }

    /**
     * 關(guān)閉回收
     */
    public void close() {
        if (isOpen()) {
            // 發(fā)退出廣播 udp容易丟包挚冤,發(fā)2次沒啥大問題
            sendBroadcast(TYPE_OUT);
            // 發(fā)退出廣播
            handler.postDelayed(() -> sendBroadcast(TYPE_OUT), 100);

            // 真正退出
            handler.postDelayed(() -> {
                channel.close();
                channel = null;
            }, 300);

        }
    }
}
  • NioUdpServer UDP消息處理代碼
/**
 * 這個(gè)是適配器
 */
public class NioUdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    private OnReceiveListener onReceiveListener;


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
        try {
            // 發(fā)送端的信息
            InetSocketAddress sender = msg.sender();
            String ip = sender.getHostName();
            // 解析數(shù)據(jù)
            ByteBuf buf = msg.content();
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
            // 數(shù)據(jù)轉(zhuǎn)換成 protobuf  SocketData
            SocketData socketData = SocketData.parseFrom(data);
            int type = socketData.getType();
            String name = socketData.getExtS();
            // 消息回調(diào)
            if (onReceiveListener != null) {
                onReceiveListener.onReceive(socketData, ctx.channel());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void setOnReceiveListener(OnReceiveListener onReceiveListener) {
        this.onReceiveListener = onReceiveListener;
    }

}

只是寫了廣播消息(可以獲取到同一路由下的其它設(shè)置的IP信息)况增,具體向哪個(gè)服務(wù)端發(fā)消息,其實(shí)也一樣训挡。

總結(jié)

android使用Netty的應(yīng)用場景

  • wifi 局域網(wǎng)下澳骤,數(shù)據(jù)傳輸(文件 音視頻數(shù)據(jù)等)
  • wifi p2p,數(shù)據(jù)傳輸(文件 音視頻數(shù)據(jù)等)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末澜薄,一起剝皮案震驚了整個(gè)濱河市为肮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌肤京,老刑警劉巖颊艳,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡棋枕,警方通過查閱死者的電腦和手機(jī)白修,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來重斑,“玉大人兵睛,你說我怎么就攤上這事】耍” “怎么了祖很?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長寒矿。 經(jīng)常有香客問我突琳,道長,這世上最難降的妖魔是什么符相? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任拆融,我火速辦了婚禮,結(jié)果婚禮上啊终,老公的妹妹穿的比我還像新娘镜豹。我一直安慰自己,他們只是感情好蓝牲,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布趟脂。 她就那樣靜靜地躺著,像睡著了一般例衍。 火紅的嫁衣襯著肌膚如雪昔期。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天佛玄,我揣著相機(jī)與錄音硼一,去河邊找鬼。 笑死梦抢,一個(gè)胖子當(dāng)著我的面吹牛般贼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播奥吩,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼哼蛆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了霞赫?” 一聲冷哼從身側(cè)響起腮介,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎绩脆,沒想到半個(gè)月后萤厅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體橄抹,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡靴迫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年惕味,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片玉锌。...
    茶點(diǎn)故事閱讀 38,617評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡名挥,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出主守,到底是詐尸還是另有隱情禀倔,我是刑警寧澤,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布参淫,位于F島的核電站救湖,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏涎才。R本人自食惡果不足惜鞋既,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望耍铜。 院中可真熱鬧邑闺,春花似錦、人聲如沸棕兼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽伴挚。三九已至靶衍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間茎芋,已是汗流浹背颅眶。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留败徊,地道東北人帚呼。 一個(gè)月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像皱蹦,于是被迫代替她去往敵國和親煤杀。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容