文章主要內(nèi)容
- netty TCP NIO服務(wù)端紊搪、客戶端簡單使用
- netty UDP NIO使用
- 谷歌protobuf使用
- netty 結(jié)合谷歌protobuf使用
- 只是netty protobuf的使用试伙,更適合Android開發(fā)人員看楞遏,原理那些沒有
protobuf簡介
protobuf 是 Google 用于序列化結(jié)構(gòu)化數(shù)據(jù)
的語言,語言中立岁忘、平臺中立决侈、可擴(kuò)展機(jī)制——相對于 XML史飞,更小猫妙、更快瓷翻、更簡單。您只需定義一次數(shù)據(jù)的結(jié)構(gòu)化方式吐咳,然后就可以使用特殊生成的源代碼輕松地將結(jié)構(gòu)化數(shù)據(jù)寫入和讀取各種數(shù)據(jù)流逻悠,并使用各種語言。(翻譯的韭脊,自己去官網(wǎng)看)
個(gè)人理解是用于數(shù)據(jù)壓縮,減少IO數(shù)據(jù)傳輸量单旁。
protobuf 官網(wǎng)
protobuf github
protobuf java編寫文檔
protobuf使用
- 編寫.proto結(jié)尾的文件沪羔,如:SocketData.proto,用于生成Java類。
// 真正的實(shí)體類蔫饰,擴(kuò)展數(shù)據(jù)為了通用準(zhǔn)備
// required 必需的
// optional 可選
message SocketData{
required int32 type = 1;// int 類型琅豆,可以根據(jù)這個(gè)來處理不同的消息
optional bytes data = 2; // byte[] 真正的數(shù)據(jù)
optional string ext_s = 3;// String 擴(kuò)展數(shù)據(jù)
optional int64 ext_l = 4;// long 擴(kuò)展數(shù)據(jù)
optional float ext_f = 5;// float 擴(kuò)展數(shù)據(jù)
optional string ext_s1 = 6;// String 擴(kuò)展數(shù)據(jù)
optional int64 ext_l1 = 7;// long 擴(kuò)展數(shù)據(jù)
optional double ext_d = 8;// double 擴(kuò)展數(shù)據(jù)
optional double ext_d1 = 9;// double 擴(kuò)展數(shù)據(jù)
}
- 下載protobuf代碼生成器,找到系統(tǒng)對應(yīng)的版本進(jìn)行下載篓吁,解壓
protobuf代碼生成器下載地址
- 生成java代碼
- 解壓protobuf代碼生成器茫因,將編寫好的SocketData.proto文件復(fù)制到bin目錄下
- 在cmd執(zhí)行命令: protoc --java_out=lite:./ SocketData.proto
-
將生成的代碼復(fù)制到項(xiàng)目,即可使用
- protoc -h 查看更多編譯內(nèi)容
netty簡介
Netty 官網(wǎng)
netty github
Netty 是一個(gè) NIO 客戶端服務(wù)器框架
杖剪,可以快速輕松地開發(fā)協(xié)議服務(wù)器和客戶端等網(wǎng)絡(luò)應(yīng)用程序冻押。它極大地簡化和流線了網(wǎng)絡(luò)編程,例如 TCP 和 UDP 套接字服務(wù)器盛嘿。(翻譯的洛巢,自己去官網(wǎng)看)
個(gè)人理解,Netty是對java BIO NIO使用的統(tǒng)一封裝次兆,便于快速編寫服務(wù)端和客戶端的網(wǎng)絡(luò)應(yīng)用程序稿茉。
netty使用
android項(xiàng)目 引入 netty protobuf
android {
// ...
// 解決引入報(bào)錯(cuò)
packagingOptions {
exclude 'META-INF/*******'
exclude 'META-INF/INDEX.LIST'
exclude 'META-INF/io.netty.versions.properties'
}
}
dependencies {
// netty
implementation 'io.netty:netty-transport:4.1.79.Final' // 傳輸相關(guān)
implementation 'io.netty:netty-codec:4.1.79.Final' // 編解碼相關(guān)
// 谷歌protobuf
implementation 'com.google.protobuf:protobuf-javalite:3.21.2'
}
- 如果有需要可以引入 'io.netty:netty-all:4.1.79.Final' 包含所有
netty protobuf tcp 服務(wù)端
- EchoServer 服務(wù)端代碼
/**
* Netty nio 服務(wù)端
*/
public final class EchoServer {
// 啟動的端口號
public static int PORT = 10303;
private static final String tag = "TAG";
// 啟動的標(biāo)志
private AtomicBoolean isStart = new AtomicBoolean(false);
// 管道:主要是讀寫消息
private Channel channel;
// 處理接收到的連接、消息等
private EchoServerHandler serverHandler;
// 消息接收回調(diào)
private OnReceiveListener onReceiveListener;
// 線程數(shù)芥炭,如果是0漓库,Netty底層會根據(jù)CPU核心數(shù)*2進(jìn)行創(chuàng)建
private int threads;
public EchoServer(int threads) {
this.threads = threads;
}
public EchoServer() {
this(0);
}
/**
* 啟動服務(wù)
*/
public void start() {
// 防止并發(fā)啟動
if (isStart.compareAndSet(false, true)) {
// 使用線程池啟動
Executors.newSingleThreadExecutor().execute(() -> doStart());
}
}
/**
* 執(zhí)行啟動服務(wù)的真正代碼
*/
private void doStart() {
// 主要處理Accept請求
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 處理其它 Connect Read Write
EventLoopGroup workerGroup = new NioEventLoopGroup(threads);
// 處理鏈接及消息
serverHandler = new EchoServerHandler();
serverHandler.setOnReceiveListener(onReceiveListener);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 64)
.option(ChannelOption.SO_REUSEADDR, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// protobuf 解碼
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(SocketData.getDefaultInstance()));
// protobuf 編碼
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(serverHandler);
}
});
// 綁定端口號
ChannelFuture f = b.bind(PORT).sync();
// 拿到Channel
channel = f.channel();
// 阻塞等待關(guān)閉
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關(guān)閉所有事件循環(huán)以終止所有線程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
// 關(guān)閉Channel
close();
}
}
/**
* 判斷Channel是否打開
*/
private boolean isOpen() {
return channel != null && channel.isOpen();
}
/**
* 關(guān)閉及回收
*/
public void close() {
if (isStart.compareAndSet(true, false)) {
if (isOpen()) {
channel.close();
}
channel = null;
serverHandler = null;
}
}
/**
* 群發(fā)消息,所有客戶端會收到
*/
public void send(SocketData packet) {
if (isOpen()) {
serverHandler.send(packet);
}
}
// 消息回調(diào)
public void setOnReceiveListener(OnReceiveListener listener) {
this.onReceiveListener = listener;
}
}
- EchoServerHandler 服務(wù)端消息處理
/**
* 服務(wù)端消息處理园蝠,處理客戶端連接及發(fā)過來的消息
*/
@Sharable
public class EchoServerHandler extends SimpleChannelInboundHandler<SocketData> {
// 接收到客戶端消息回調(diào)
private OnReceiveListener listener;
// 保存所有客戶端的連接
private ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, SocketData msg) throws Exception {
if (listener != null) {
listener.onReceive(msg, ctx.channel());
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
group.add(ctx.channel());
}
/**
* 群發(fā)消息
*/
public void send(Object packet) {
// 群發(fā)消息
group.writeAndFlush(packet);
}
public void setOnReceiveListener(OnReceiveListener listener) {
this.listener = listener;
}
}
start方法是啟動服務(wù)米苹、close是關(guān)閉、send是群發(fā)消息砰琢,其它如果需要蘸嘶,自己編寫即可。
netty protobuf tcp 客戶端
- EchoClient 客戶端
/**
* netty nio 客戶端
*/
public final class EchoClient {
// 啟動的標(biāo)志
private AtomicBoolean isStart = new AtomicBoolean(false);
// 服務(wù)端地址
private InetAddress address;
// 服務(wù)端端口號
private int port;
// 管道:主要是讀寫消息
private Channel channel;
// 消息接收回調(diào)
private OnReceiveListener onReceiveListener;
/**
* 連接服務(wù)端端
*
* @param address 服務(wù)端地址
* @param port 服務(wù)端端口號
*/
public void start(InetAddress address, int port) {
if (address == null) {
return;
}
this.address = address;
this.port = port;
if (isStart.compareAndSet(false, true)) {
Executors.newSingleThreadExecutor().execute(() -> doStart());
}
}
private void doStart() {
// 處理所有的事件
EventLoopGroup group = new NioEventLoopGroup();
try {
// 處理服務(wù)端發(fā)過來的消息陪汽。
final EchoClientHandler clientHandler = new EchoClientHandler();
// 處理服務(wù)端發(fā)過來的消息回調(diào)
clientHandler.setOnReceiveListener(onReceiveListener);
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)// 地址復(fù)用
.option(ChannelOption.SO_KEEPALIVE, true)// 保持連接
.option(ChannelOption.TCP_NODELAY, true)// 不緩遲
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// protobuf 解碼
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(SocketData.getDefaultInstance()));
// protobuf 編碼
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
// 自己的Handler
p.addLast(clientHandler);
}
});
// 連接服務(wù)端
ChannelFuture f = b.connect(address, port).sync();
// 拿到Channel
channel = f.channel();
// 阻塞等待連接關(guān)閉
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
close();
}
}
/**
* 判斷Channel是否打開
*/
public boolean isOpen() {
return channel != null && channel.isOpen();
}
/**
* 關(guān)閉及回收
*/
public void close() {
if (isStart.compareAndSet(true, false)) {
if (onStateListener != null) {
onStateListener.onClose(channel);
}
if (isOpen()) {
channel.close();
}
channel = null;
address = null;
}
}
/**
* 向服務(wù)端發(fā)發(fā)送消息
*/
public void send(SocketData packet) {
if (isOpen()) {
channel.writeAndFlush(packet);
}
}
/**
* 設(shè)置消息回調(diào)
*/
public void setOnReceiveListener(OnReceiveListener onReceiveListener) {
this.onReceiveListener = onReceiveListener;
}
}
- EchoClientHandler 客戶端消息處理
/**
* 處理客戶端消息
*/
public class EchoClientHandler extends SimpleChannelInboundHandler<SocketData> {
// 消息回調(diào)
private OnReceiveListener onReceiveListener;
/**
* 接收到服務(wù)端的消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, SocketData msg) throws Exception {
if (onReceiveListener != null) {
onReceiveListener.onReceive(msg, ctx.channel());
}
}
// 設(shè)置消息回調(diào)
public void setOnReceiveListener(OnReceiveListener onReceiveListener) {
this.onReceiveListener = onReceiveListener;
}
}
netty protobuf udp 代碼
- NioUdpServer UDP啟動代碼
/**
* netty UDP代碼
*/
public class NioUdpServer {
public static final int TYPE_IN = 1000; // 創(chuàng)建服務(wù)训唱,發(fā)個(gè)創(chuàng)建廣播
public static final int TYPE_OUT = 1001; // 退出服務(wù),發(fā)個(gè)退出廣播
public static final int TYPE_REC = 1002; // 這個(gè)是發(fā)個(gè)接收到消息的廣播
public static int PORT = 10505;
// 管道:主要是讀寫操作
private Channel channel;
// 消息回調(diào)
private OnReceiveListener onReceiveListener;
// 消息處理
private NioUdpServerHandler serverHandler;
private Handler handler = new Handler(Looper.getMainLooper());
public void start() {
if (!isOpen()) {
Executors.newSingleThreadExecutor()
.execute(() -> doStart());
}
}
public void setOnReceiveListener(OnReceiveListener onReceiveListener) {
this.onReceiveListener = onReceiveListener;
}
private void doStart() {
// 處理事件
EventLoopGroup group = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
// 消息處理
serverHandler = new NioUdpServerHandler(this);
// 消息回調(diào)設(shè)置
serverHandler.setOnReceiveListener(onReceiveListener);
bootstrap.group(group)
.channel(NioDatagramChannel.class) // udp設(shè)置
.option(ChannelOption.SO_BROADCAST, true)// 廣播設(shè)置
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
// 綁定端口
ChannelFuture future = bootstrap.bind(PORT).sync();
// 拿到Channel
channel = future.channel();
// 發(fā)一個(gè)啟動廣播
sendBroadcast(TYPE_IN);
// 阻塞等待關(guān)閉
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
Log.e("TAG", "udp server error " + e.getMessage());
} finally {
close();
// 關(guān)閉
group.shutdownGracefully();
}
}
/**
* 發(fā)送廣播
*/
public void sendBroadcast(int type) {
sendBroadcast(SocketData.newBuilder()
.setType(type)
.setExtS(Build.DEVICE)
.build());
}
/**
* 發(fā)送廣播
*/
public void sendBroadcast(SocketData data) {
if (isOpen()) {
// 發(fā)送udp包
DatagramPacket packet = new DatagramPacket(Unpooled.copiedBuffer(data.toByteArray()),
new InetSocketAddress("255.255.255.255", PORT));
channel.writeAndFlush(packet);
}
}
/**
* 是否打開
*/
private boolean isOpen() {
return channel != null && channel.isOpen();
}
/**
* 關(guān)閉回收
*/
public void close() {
if (isOpen()) {
// 發(fā)退出廣播 udp容易丟包挚冤,發(fā)2次沒啥大問題
sendBroadcast(TYPE_OUT);
// 發(fā)退出廣播
handler.postDelayed(() -> sendBroadcast(TYPE_OUT), 100);
// 真正退出
handler.postDelayed(() -> {
channel.close();
channel = null;
}, 300);
}
}
}
- NioUdpServer UDP消息處理代碼
/**
* 這個(gè)是適配器
*/
public class NioUdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private OnReceiveListener onReceiveListener;
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
try {
// 發(fā)送端的信息
InetSocketAddress sender = msg.sender();
String ip = sender.getHostName();
// 解析數(shù)據(jù)
ByteBuf buf = msg.content();
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
// 數(shù)據(jù)轉(zhuǎn)換成 protobuf SocketData
SocketData socketData = SocketData.parseFrom(data);
int type = socketData.getType();
String name = socketData.getExtS();
// 消息回調(diào)
if (onReceiveListener != null) {
onReceiveListener.onReceive(socketData, ctx.channel());
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void setOnReceiveListener(OnReceiveListener onReceiveListener) {
this.onReceiveListener = onReceiveListener;
}
}
只是寫了廣播消息(可以獲取到同一路由下的其它設(shè)置的IP信息)况增,具體向哪個(gè)服務(wù)端發(fā)消息,其實(shí)也一樣训挡。
總結(jié)
android使用Netty的應(yīng)用場景
- wifi 局域網(wǎng)下澳骤,數(shù)據(jù)傳輸(文件 音視頻數(shù)據(jù)等)
- wifi p2p,數(shù)據(jù)傳輸(文件 音視頻數(shù)據(jù)等)