寫一個(gè)丟棄服務(wù)器
世界上最簡單的協(xié)議并非是 hello world ,而是 丟棄 ,這個(gè)協(xié)議丟棄所有收到的數(shù)據(jù)沒有任何返回厕诡。
我們可以直接使用handler實(shí)現(xiàn)
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Discard the received data silently.
((ByteBuf) msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
- DiscardServerHandler 繼承了ChannelInboundHandlerAdapter , ChannelInboundHandlerAdapter 是 ChannelInboundHandler的實(shí)現(xiàn)類营勤。
ChannelInboundHandlerAdapter 提供了多個(gè) 可以重寫的事件處理方法灵嫌,當(dāng)前已經(jīng)足夠使用而非自己實(shí)現(xiàn)接口。 - 我們重寫了channelRead() 方法葛作,該方法會(huì)在接收到消息的時(shí)候被調(diào)用寿羞。在該例子中接收到的消息 是ByteBuf。
- 為了實(shí)現(xiàn) DISCARD 協(xié)議赂蠢,這個(gè)handler 忽略收到的消息绪穆,ByteBuf 是一個(gè)引用計(jì)數(shù)對(duì)象,應(yīng)該通過 release() 方法 顯式的被釋放虱岂。handler 有責(zé)任釋放 所有傳遞過來的引用計(jì)數(shù)對(duì)象玖院。通常,這個(gè)方法應(yīng)該像下面那樣重寫
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
exceptionCaught 會(huì)在 Netty 處理I O 錯(cuò)誤 或者 handler 處理事件時(shí)發(fā)生異常 被調(diào)用第岖。在大多數(shù)情況下难菌,應(yīng)該記錄異常,對(duì)應(yīng)的channel 應(yīng)該關(guān)閉蔑滓。然而 郊酒,我們可以有不同的實(shí)現(xiàn)依賴于我們想怎么處理此異常情況。例如我們發(fā)送一個(gè)待error code 的響應(yīng)键袱,在關(guān)閉鏈接之前燎窘。
目前為止,我們已經(jīng)實(shí)現(xiàn)了一半的DISCARDSERVER , 新建一個(gè)main () 啟動(dòng) handler蹄咖。
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception{
int port = 8888 ;
if (args.length >0 ) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
private void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 1
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // 2
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 3
.childHandler(new ChannelInitializer<SocketChannel>() { // 4
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 5
.childOption(ChannelOption.SO_KEEPALIVE, true); // 6
// bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // 7
// wait until the server socket is closed
// you can do that to gracefully
// shutdown you server
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
- NioEventLoopGroup 是一個(gè)處理IO操作的 多線程 event loop荠耽。Netty 提供了多種傳輸?shù)?EventLoopGroup 實(shí)現(xiàn)。 在這個(gè)例子中我們實(shí)現(xiàn)了服務(wù)端應(yīng)用程序比藻,使用了兩個(gè)NioEventLoopGroup 铝量。第一個(gè) boss 用來接受 傳入連接倘屹。第二個(gè)是worker 處理已經(jīng)接受的連接,注冊(cè)已經(jīng)接受的連接到worker慢叨。多少線程被創(chuàng)建纽匙,他們?cè)趺从成涞絼?chuàng)建的channels 取決于EventLoopGroup 的實(shí)現(xiàn),此外他們甚至是能通過構(gòu)造器設(shè)置拍谐。
2.ServerBootstrap 是一個(gè)幫助類 創(chuàng)立 一個(gè)server 烛缔。你能直接通過使用channels ,那是一個(gè)繁雜的過程,大多數(shù)情況下不需要那樣做轩拨。
3.這里我們指定 NioServerSocketChannel 這個(gè)class 來實(shí)例化channel 來接受新鏈接 - ChannelInitializer 是一個(gè)特殊的handler 目的是幫助用戶設(shè)置新的channel 践瓷。最可能的情況是 你想為新的 channel 設(shè)置 channelPipeline ,通過增加一些handler 例如 DiscardServerHandler 提升網(wǎng)絡(luò)應(yīng)用程序。隨著應(yīng)用程序的復(fù)雜亡蓉,你可能在pipeline中 添加更多的handler晕翠,最終抽出匿名類放入 頂級(jí)類。
5.在channel 實(shí)現(xiàn)中可以設(shè)置參數(shù)砍濒。我們寫的是 TCP/IP Server 淋肾。我們?cè)试S設(shè)置 例如 tcpNoDelay 、keepAlive 的socket options爸邢。請(qǐng)參照 ChannelOption 的 api 文檔 和 channel Config 具體實(shí)現(xiàn)樊卓,了解 ChannelOptions 的概況。
6.注意 option 和 childOption, option() 是 NioServerSocketChannel 接受進(jìn)入的連接杠河,childOption() 是被父 ServerChannel 接受的 channels 碌尔。這里是NioSocketChannel
7.綁定端口啟動(dòng)server, 這里我們綁定 所有網(wǎng)卡 8080 端口,我們可以多次調(diào)用bind(), 綁定不同的地址券敌。
觀察接受到的數(shù)據(jù)
現(xiàn)在我們寫了第一個(gè)服務(wù)器唾戚,我們需要測(cè)試它是否工作。最簡單的方式是 使用Telnet 命令陪白,我們可以在 命令行中輸入 telnet localhost 8080。
然而怎么才能知道我們的服務(wù)是正常工作的膳灶,我們不知道因?yàn)檫@是一個(gè) discard server. 你將不會(huì)收到任何回應(yīng)咱士。我們需要修改server 打印收到數(shù)據(jù)。
我們需要修改 DiscardServerHandler 的 channelRead() 方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Discard the received data silently.
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // 1
System.out.println((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // 2
}
}
- 這個(gè)低效的loop 可以被簡化轧钓, System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
- 此外 這里可用 in.release()
寫一個(gè)響應(yīng)Server
目前我們消費(fèi)數(shù)據(jù)沒有任何回應(yīng)序厉,通常一個(gè)服務(wù)應(yīng)該回應(yīng)這個(gè)請(qǐng)求,讓我們學(xué)習(xí)響應(yīng)消息給客服端毕箍,實(shí)現(xiàn)ECHO 協(xié)議弛房,回寫收到的消息。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg); // 1
ctx.flush(); // 2
}
- ChannelHandlerContext 提供了多個(gè)操作 可以讓你去觸發(fā)不同的IO 事件和操作而柑,這里調(diào)用 write(object) 逐字返回寫入的數(shù)據(jù)文捶。我不不用像 DISCARD 那樣 release 荷逞。因?yàn)閚etty 釋放當(dāng)我寫流到網(wǎng)絡(luò)
- ctx.write(Object) 不會(huì)寫消息到網(wǎng)絡(luò)。他會(huì)被緩存 當(dāng)調(diào)用ctx.flush() 會(huì)被 flush粹排。此外簡單的調(diào)用 ctx.writeAndFlush(msg)
寫一個(gè) Time Server
在這個(gè)例子中种远,我們學(xué)習(xí)怎么構(gòu)建和發(fā)送消息,完成時(shí)關(guān)閉連接顽耳。因?yàn)槲覀兒雎允盏降南⒍グl(fā)送消息一旦連接建立坠敷。我們不能用channelRead(),應(yīng)該 重寫 channelActive()。實(shí)現(xiàn)如下
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // 1
final ByteBuf time = ctx.alloc().buffer(4); // 2
time.writeInt( (int) (System.currentTimeMillis() / 1000L + 2208988800L));
ChannelFuture f = ctx.writeAndFlush(time);// 3
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assert f == future;
ctx.close();
}
});// 4
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- channelActive 被調(diào)用在連接建立準(zhǔn)備產(chǎn)生流量時(shí)射富,寫一個(gè)32位整數(shù)代表這個(gè)方法的當(dāng)前時(shí)間膝迎。
2.為了發(fā)送消息,我們需要分配一個(gè)buffer 承載這個(gè)消息胰耗,我們準(zhǔn)備寫一個(gè)32位整數(shù)限次,我們至少要分配一個(gè)4 bytes ,得到當(dāng)前的 ByteBufAllocator 通過 ChannelHandlerContext.alloc(),分配一個(gè)新 buffer.
3.通常我們寫 構(gòu)造好的消息
稍等宪郊,flip在哪掂恕,我們 在NIO 不需要調(diào)用 java.nio.ByteBuffer.flip() ? ByteBuf 沒有 類似的方法,因?yàn)樗袃蓚€(gè)指針弛槐,一個(gè)讀操作懊亡,一個(gè)寫操作。當(dāng)寫的時(shí)候?qū)懼羔樤黾佣x指針不變乎串,寫和讀指針各自 start 和 end店枣。另一點(diǎn)需要注意的是 ChannelHandlerContext.write() (and writeAndFlush()) method 返回ChannelFuture
另一個(gè)需要注意的點(diǎn)是 ChannelHandlerContext.write() 和 writeAndFlush() 返回一個(gè) ChannelFuture ,ChannelFuture 代表一個(gè)還未發(fā)生的I/O 操作叹誉,因?yàn)檫@個(gè)是異步的鸯两,例如 消息尚未被發(fā)送可能連接已經(jīng)關(guān)閉了。
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此我們應(yīng)該在 ChannelFuture 被完成以后調(diào)用close() .操作完成完成后會(huì)通知 listener ,請(qǐng)注意 close() 方法不會(huì)立即關(guān)閉連接长豁,他返回一個(gè) ChannelFuture钧唐。
- 我們?cè)趺吹玫酵ㄖ谝粋€(gè)寫請(qǐng)求結(jié)束時(shí),我們可以在 返回的ChannelFuture上加一個(gè) ChannelFutureListener 操作完成是關(guān)閉 channel匠襟《巯溃或者直接使用 預(yù)定義的 f.addListener(ChannelFutureListener.CLOSE);
寫一個(gè)時(shí)間客戶端
不像DISCARD 和ECHO 服務(wù)端, 我們需要一個(gè)時(shí)間客戶端翻譯int 成為日期.
用netty 實(shí)現(xiàn)客戶端和服務(wù)端的最大不同是使用的 Bootstrap 和 Channel 實(shí)現(xiàn)
public class TimeClient {
public static void main(String[] args) throws Exception{
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // 1
b.group(worker); // 2
b.channel(NioSocketChannel.class); // 3
b.option(ChannelOption.SO_KEEPALIVE, true); // 4
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// start the client
ChannelFuture f = b.connect(host, port).sync();// 5
// wait until the connection is closed
f.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
}
}
}
- Bootstrap 是一個(gè)類似 ServerBootstrap , 為 無連接 或者 客戶端 Channel 使用。
- 我們只定義一個(gè)EventLoopGroup ,他將被用作 boss 和 worker group酸舍,盡管boss worker 在客戶端未被使用帅韧。
- 使用NioSocketChannel 替換 NioServerSocketChannel 生成客戶端 channel .
- 注意 我們沒有使用childOption() ,因?yàn)榭蛻舳?SocketChannel 沒有 父級(jí)。
- 我們用 connect() 方法替換 bind() 方法
正如你所看到的啃勉,和服務(wù)端的代碼差別不大忽舟,ChannelHandler 是怎么實(shí)現(xiàn)的,接受32位整數(shù)翻譯成可讀的日期,打印翻譯時(shí)間 關(guān)閉連接
class TimeClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg; // 1
try {
long time = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(time));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- 在 TCP/IP 協(xié)議中叮阅,Netty將讀到數(shù)據(jù)轉(zhuǎn)為 ByteBuf
這個(gè)看起來 非常簡單刁品,和服務(wù)端例子看來沒什么不同,然而 這個(gè)handler 有時(shí)會(huì)拒絕工作 拋出 IndexOutOfBoundsException 異常
處理基于流的的協(xié)議
一個(gè)小的 Socket Buffer 備注
在基于流傳輸?shù)膮f(xié)議 例如TCP/IP 會(huì)把接受到數(shù)據(jù)放在緩沖區(qū)帘饶,不幸的是基于流的傳輸是基于字節(jié)的隊(duì)列而非包的隊(duì)列哑诊,甚至你發(fā)送的消息是兩個(gè)對(duì)立的包 但是操作系統(tǒng)把他們當(dāng)做一堆字節(jié),不能保證你收到的一定是遠(yuǎn)端寫的及刻,假設(shè)操作系統(tǒng)發(fā)送了三個(gè)包
有很大的概率應(yīng)用收到 如下的 片端
因此镀裤,作為接受端,無論是客戶還是服務(wù)端缴饭,應(yīng)該整理收到的數(shù)據(jù)轉(zhuǎn)換為1個(gè)或多個(gè)有意義的片段 暑劝,在上面的例子中,接受到的數(shù)據(jù)應(yīng)該分割成如下
第一個(gè)方案
一個(gè)簡單的方法是創(chuàng)建內(nèi)部增長的buffer颗搂,直到所有的四個(gè)字節(jié)被內(nèi)部緩沖收到担猛,下面的TimeClientHandler 解決了這個(gè)問題。
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
buf = ctx.alloc().buffer(4); // 1
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
buf.release(); // 1
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // 2
m.release();
if (buf.readableBytes() > 4) { // 3
long time = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(time));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- 一個(gè)ChannelHandler 可以有2個(gè)生命周期函數(shù)丢氢,1個(gè)是handlerAdded() 另一個(gè)是handlerRemoved()傅联,可以執(zhí)行一個(gè)任意任務(wù)只要不阻塞太長時(shí)間。
- 所有的數(shù)據(jù)都被積累轉(zhuǎn)入buf
- Handler 必須檢測(cè)buf 有充足的數(shù)據(jù)疚察,如例子中的四字節(jié)蒸走,進(jìn)行實(shí)際的業(yè)務(wù)邏輯,否則Netty 將會(huì)在收到更多數(shù)據(jù)的時(shí)候再次調(diào)用channelRead() 貌嫡,最后所有的四個(gè)字節(jié)都被收集
第二個(gè)方案
盡管第一個(gè)解決方案解決了這個(gè)問題比驻,改動(dòng)不夠簡潔,想象一個(gè)復(fù)雜的協(xié)議有個(gè)不同的長度的復(fù)雜字段岛抄,我們的handler 不會(huì)被很好的維護(hù)别惦。
我們可能想ChannelPipeline 中添加超過1個(gè)的ChannelHandler ,我們分割一個(gè)巨大的ChannelHandler 成多個(gè)模塊化的handler 降低復(fù)雜度,TimeClientHandler可以拆分為兩個(gè)夫椭。
- TimeDecoder 解決分包問題
- TimeClientHandler最簡單版本
幸運(yùn)的是掸掸,Netty 提供了可擴(kuò)展的類,第一步我們可以開箱即用
public class TimeDecoder extends ByteToMessageDecoder { //1
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //2
if (in.readableBytes() < 4) { // 3
return;
}
out.add(in.readBytes(4)); // 4
}
}
- ByteToMessageDecoder是ChannelInboundHandler 的一個(gè)實(shí)現(xiàn)蹭秋,易于解決分包問題扰付。
- ByteToMessageDecoder 當(dāng)收到信息時(shí)調(diào)用decode() 方法 內(nèi)部維護(hù)一個(gè)可增長的buffer。
- 當(dāng)buffer沒有收到足夠數(shù)據(jù)時(shí)直接返回不作處理,當(dāng)收到更多消息時(shí)會(huì)再次調(diào)用 decode() 方法
- 如果decode() 添加一個(gè)對(duì)象到 out 上感凤,意味著decoder 成功解碼了一個(gè)消息悯周,ByteToMessageDecoder 會(huì)拋棄緩沖區(qū)中已讀部分粒督,請(qǐng)記住你不需要解析多個(gè)消息陪竿,ByteToMessageDecoder 會(huì)decode 并加入out,直到?jīng)]有數(shù)據(jù)。
我們需要將另一個(gè)Handler 插入 ChannelPipeline, 我們需要修改ChannelInitializer 實(shí)現(xiàn)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果你是一個(gè)富有冒險(xiǎn)精神的族跛,你可以嘗試使用ReplayingDecoder 進(jìn)一步簡化這個(gè) decoder ,你需要參考API 文檔闰挡。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty 提供了大量的開箱即用的解碼器
- io.netty.example.factorial 二進(jìn)制協(xié)議
- io.netty.example.telnet 基于文本的協(xié)議
使用POJO替換 ByteBuf
在應(yīng)用中使用POJO的優(yōu)勢(shì)是很明顯的礁哄,Handler是可維護(hù)和可復(fù)用的 將數(shù)據(jù) ByteBuf 抽取出來长酗,我們用ByteBuf 僅僅是讀取一個(gè)32位整數(shù), 分割是非常有必要的實(shí)現(xiàn)真實(shí)的協(xié)議桐绒。
第一步創(chuàng)建一個(gè)POJO
public class UnixTime {
private Long value;
public UnixTime(Long value) {
this.value = value;
}
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public Long value() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
TimeDecoder 使用 UnixTime 替換 ByteBuf
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //2
if (in.readableBytes() < 4) { // 3
return;
}
out.add(new UnixTime(in.readUnsignedInt())); // 4
}
隨著 decoder 升級(jí) TimeClientHandler 更新
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
UnixTime m = (UnixTime) msg; // 1
System.out.println(m);
ctx.close();
}
是不是優(yōu)雅簡單很多夺脾,服務(wù)端使用相同的技術(shù) 更新TimeServerHandler
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // 1
ChannelFuture f = ctx.writeAndFlush(new UnixTime());// 3
f.addListener(ChannelFutureListener.CLOSE);
}
唯一剩下的一塊是 encoder 是 ChannelOutboundHandler 的實(shí)現(xiàn),翻譯UnixTime 成 ByteBuf ,它比decode 簡單茉继,因?yàn)樗恍枰鸢M裝
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int) m.value());
ctx.write(encoded, promise); //1
}
}
- 我們使用 ChannelPromise 咧叭,在我們編碼 數(shù)據(jù) 寫入網(wǎng)絡(luò)時(shí),netty 讓它變?yōu)槌晒蚴烁竭。?br> 我們不調(diào)用 ctx.flush() 菲茬,這個(gè)handler 有一個(gè) flush() 方法,可以重寫 flush操作
我們可以進(jìn)一步簡化 派撕,利用 MessageToByteEncoder
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
最后一個(gè)任務(wù) TimeEncoder 插入 ChannelPipeline 在 TimeServerHandler的左側(cè)
關(guān)閉應(yīng)用程序
Netty 關(guān)閉應(yīng)用程序 EventLoopGroup調(diào)用 shutdownGracefully婉弹,所有的 EventLoopGroup 和 所有屬于group 的 Channel也被關(guān)閉