基于4.1.10.Final
目前為止踩坑最多一喘,踩netty之前嗜暴,要先踩Java NIO闷沥,因為netty還是基于Java NIO的api開發(fā)的狐赡,事件模型什么的需要有基礎疟丙,這只是一個初步的研究享郊,畢竟只是出于興趣炊琉,比較好的坑在這里又活。Java NIO 系列教程柳骄。
netty的基本組件與各組件功能耐薯,netty核心就不介紹了曲初,網(wǎng)上各種大牛的源碼分析臼婆,認真看完基本就通了。本文是記錄踩坑故响,不是教學被去。思路按著這個思維導圖來走,實現(xiàn)一下簡單的功能糜值。
1.字符串傳輸寂汇。
netty是端對端的傳輸骄瓣,最簡單的可以使用嵌套字傳輸,基本功能就是hello word畔勤。假定一個場景庆揪,有一個服務端一直開著接收字符串缸榛,客戶端想發(fā)送字符串到服務端内颗,怎么做?
首先明確一點恨溜,Netty中的消息傳遞,都必須以字節(jié)的形式负懦,以ByeBuff為載體傳遞筒捺。簡單的說,就是你想直接寫個字符串過去纸厉,不行系吭,收到都是亂碼,雖然Netty定義的writer的接口參數(shù)是Object的颗品,這就是比較坑的地方了肯尺。
有了這個分析,就有思路了躯枢。
客戶端就這么發(fā):
ByteBuf buffer = Unpooled.copiedBuffer(msg, Charset.forName("UTF-8"));
ChannelFuture future = ctx.writeAndFlush(buffer);
future.addListener(f -> ctx.close());
服務器這么收:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg;
String message = m.toString(CharsetUtil.UTF_8);
System.out.println(message);
ctx.close();
}
是的可以直接強轉,原理不明锄蹂,不知道哪里進行了裝箱氓仲。---------- 遺留問題1
但是每次都要這么寫是不是有點麻煩?不是有定義好的編碼與解碼器嗎得糜,那么就可以先加一下處理敬扛,兩邊都這么加
new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
}
對的直接放上去,編碼與解碼不用關心順序朝抖,處理類放在最后就好了啥箭。
客戶端:
ChannelFuture future = ctx.writeAndFlush("hello world");
future.addListener(f -> ctx.close());
服務器端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
ctx.close();
}
對,直接就是收到一個String治宣。至此字符串就可以互相傳遞了急侥,但是還有問題砌滞,netty中在傳送字符串的長度有限制,超過1024個字節(jié)就截斷了坏怪,導致接收的信息不完整贝润,ok要這么處理一下。
new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
ch.pipeline().addLast("encoder", new LengthFieldPrepender(4, false));
ch.pipeline().addLast(new ServerHandler());
}
自定義長度陕悬, 4個字節(jié)32位题暖,足夠存了。
2.傳遞簡單對象
有幾種方法可以實現(xiàn)對象的傳遞捉超,這里用的是protocol buffers編解碼器進行對象傳遞胧卤。
首先要了解什么是protocol buffers,這東西就相當于xsd對于xml拼岳,是一個規(guī)則文件枝誊,通過.proto文件通過官方提供的工具就可以生成java類,他有兩個常用方法
public byte[] toByteArray(){};
T parseFrom(byte[]){};
他提供了序列化方法惜纸,直接把類轉化為字節(jié)數(shù)組叶撒,再把數(shù)據(jù)轉為java類,十分方便耐版。netty是天生支持這種序列化方式的
服務器端:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
* 采用Base 128 Varints進行編碼祠够,在消息頭上加上32個整數(shù),來標注數(shù)據(jù)的長度粪牲。
*/
ch.pipeline().addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast("protobufDecoder", new ProtobufDecoder(AddressBookProtos.AddressBook.getDefaultInstance()));
/**
* 對采用Base 128 Varints進行編碼的數(shù)據(jù)解碼
*/
ch.pipeline().addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast("protobufEncoder", new ProtobufEncoder());
ch.pipeline().addLast(new ServerHandler());
}
增加已經提供了的解碼古瓤、編碼器,在業(yè)務處理的handle中可以這么拿數(shù)據(jù)腺阳。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
AddressBookProtos.AddressBook addressBookProtos = (AddressBookProtos.AddressBook)msg;
List<AddressBookProtos.Person> list = addressBookProtos.getPeopleList();
}
任然是可以直接強轉成目標對象落君。然后獲取里面的成員變量。
客戶端:
在管道里加上那4個編碼亭引、解碼器绎速。然后在業(yè)務代碼中這樣定義數(shù)據(jù)并且直接塞到ctx中就可以了。其余的根本不用操心焙蚓,都封裝好了纹冤,我們只需要關心自己的業(yè)務實現(xiàn)。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
AddressBookProtos.AddressBook pb = AddressBookProtos.AddressBook.newBuilder()
.addPeople(
AddressBookProtos.Person.newBuilder().setEmail("345@qq.com").setId(34).setName("zhangsn")
)
.addPeople(
AddressBookProtos.Person.newBuilder().setEmail("123@163.com").setId(12).setName("lisi")
)
.build();
ChannelFuture future = ctx.writeAndFlush(pb);
future.addListener(f -> ctx.close());
}
3.使用http協(xié)議
Netty對http協(xié)議有自己的抽象购公,把一個FullHttpRequest抽象成了HttpRequest赵哲、HttpContent、LastHttpContent君丁。生成一個http request也有點不同。例子演示了将宪,客戶端發(fā)送http請求绘闷,服務端接收并發(fā)送http響應到客戶橡庞,客戶端接收響應之后斷開連接。
服務端:
new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/*
* https部分
File certificate = new File("/Users/public.crt"); // 證書
File privateKey = new File("/Users/private.pem"); // 私鑰
final SslContext context = SslContextBuilder.forServer(certificate, privateKey).build();
SSLEngine engine = context.newEngine(ch.alloc());
ch.pipeline().addLast(new SslHandler(engine));
*/
// ch.pipeline().addLast("decoder", new HttpRequestDecoder());
// ch.pipeline().addLast("encoder", new HttpResponseEncoder());
ch.pipeline().addLast(new HttpServerCodec()); //等于上面那兩個
ch.pipeline().addLast(new HttpObjectAggregator(512 * 1024)); //聚合把頭部和content聚合在了一起
ch.pipeline().addLast(new HttpServiceHandle());
}
}
如果不使用聚合印蔗,那么在接收的時候會多次觸發(fā)read方法扒最,第一次接收HttpRequest,之后接收HttpContent內容华嘹。使用聚合HttpServerCodec之后吧趣,接收的參數(shù)即有HttpRequest也有HttpContent。
客戶端發(fā)送與接收:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
URI uri = new URI("http://127.0.0.1:8889");
String msg = "Message from client";
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
// 構建http請求
request.headers().set(HttpHeaders.Names.HOST, "127.0.0.1");
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
// 發(fā)送http請求
ctx.writeAndFlush(request);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
}
if(msg instanceof HttpContent) {
HttpContent content = (HttpContent)msg;
ByteBuf buf = content.content();
System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
buf.release();
}
}
服務端接收:
private HttpRequest request;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//http 如果請求沒有聚合耙厚,則分段傳輸過來
if (msg instanceof HttpRequest) {
request = (HttpRequest) msg;
String uri = request.uri();
System.out.println("Uri:" + uri);
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
ByteBuf buf = content.content();
System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
buf.release();
String res = "response from server";
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
OK, Unpooled.wrappedBuffer(res.getBytes("UTF-8")));
response.headers().set(CONTENT_TYPE, "text/plain");
response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
if (HttpHeaders.isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response);
ctx.flush();
}
}
4.心跳
這個東西應該是與HTTP長連接或者是websocket一起的强挫,這里獨立出來了。
服務端:
{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new WebSocketServiceHandle());
}
}
handle薛躬,如果10秒內沒有觸發(fā)讀俯渤,那么就會觸發(fā)userEventTriggered方法。
int dealTime = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
String ip = socketAddress.getHostName() + ":" + socketAddress.getPort();
ByteBuf byteBuf = (ByteBuf)msg;
String message = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println(ip + ":" + message);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (dealTime == 2){
System.out.println("關嘍");
ctx.channel().close();
}
dealTime++;
String recall = "are you alive?" ;
ByteBuf buffer = Unpooled.copiedBuffer(recall, Charset.forName("UTF-8"));
ctx.writeAndFlush(buffer);
super.userEventTriggered(ctx, evt);
}
客戶端:就是一個簡單的應該型宝,連接上之后什么也不干八匠,干等10秒,等待服務端發(fā)來詢問趴酣。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf)msg;
String message = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("message from service: " + message);
String recall = "hello service i am alive";
ByteBuf buffer = Unpooled.copiedBuffer(recall, Charset.forName("UTF-8"));
ctx.writeAndFlush(buffer);
}
文件讀寫梨树、websocket、最終demo岖寞。抡四。咕咕咕
相關連接:
[netty]--最通用TCP黏包解決方案:LengthFieldBasedFrameDecoder和LengthFieldPrepender
Protocol Buffer的基本使用(六)
Protocol Buffer 語法(syntax)