如何使用Socket在客戶端實現(xiàn)長連接

? 長連接貌似是一個很高深莫測的知識,但是只要你做直播、IM饲化、游戲吃靠、彈幕里面的任何一種巢块,或者是你的app想要實時的接收某些消息,你就會要接觸到長連接技術(shù)越走。本文主要教你如何在客戶端如何使用Socket實現(xiàn)長連接廊敌。

Socket背景知識

?? 要做長連接的話,是不能用http協(xié)議來做的肋殴,因為http協(xié)議已經(jīng)是應用層協(xié)議了护锤,并且http協(xié)議是無狀態(tài)的,而我們要做長連接修陡,肯定是需要在應用層封裝自己的業(yè)務(wù)魄鸦,所以就需要基于TCP協(xié)議來做旺罢,而基于TCP協(xié)議的話扁达,就要用到Socket了。

Socket是java針對tcp層通信封裝的一套網(wǎng)絡(luò)方案

TCP協(xié)議我們知道叉讥,是基于ip(或者域名)和端口對指定機器進行的點對點訪問图仓,他的連接成功有兩個條件,就是對方ip可以到達和端口是開放的

Socket能幫完成TCP三次握手六孵,而應用層的頭部信息需要自己去解析,也就是說,自己要制定好協(xié)議懂拾,并且要去解析byte

Socket使用方式

Socket看上去不是很好用,因為他是基于java.io來實現(xiàn)的唐断,你要直接跟InputStream和OutputStream打交道恳啥,也就是直接跟byte[]打交道钝的,所以用起來并不是這么友好硝桩。

下面通過一個簡單的例子,往一臺服務(wù)器發(fā)\01 \00 \00 \00 \00這一串字節(jié)衙伶,服務(wù)器也返回相同的字節(jié)流,上代碼

? ? public void testSocketChannelBlock() throws Exception {

? ? ? ? final SocketChannel channel = SocketChannel.open(address);

? ? ? ? ByteBuffer output = ByteBuffer.allocate(5);

? ? ? ? output.put((byte) 1);

? ? ? ? output.putInt(0);

? ? ? ? output.flip();

? ? ? ? channel.write(output);

? ? ? ? logger.debug("write complete, start read");

? ? ? ? ByteBuffer input = ByteBuffer.allocate(5);

? ? ? ? int readByte = channel.read(input);

? ? ? ? logger.debug("readByte " + readByte);

? ? ? ? input.flip();

? ? ? ? if (readByte == -1) {

? ? ? ? ? ? logger.debug("readByte == -1, return!");

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? for (int i = 0; i < readByte; i++) {

? ? ? ? ? ? logger.debug("read [" + i + "]:" + input.get());

? ? ? ? }

? ? }

Selector

我們知道,傳統(tǒng)io是阻塞的花嘶,也就是說,一個線程只能處理一個io流隘击,也就是一個Socket埋同。有了Selector之后,一個線程就能處理多個SocketChannel虱肄。

Selector的原理是斟或,他能接受多個SocketChannel,然后不斷的遍歷每一個Channel的狀態(tài)平斩,如果有Channel已經(jīng)ready了绘面,他就能通過他自身提供的方法,通知到線程瘦馍,讓線程去處理對應的業(yè)務(wù)。流程圖如下:


Netty對nio這一套有比較好的封裝,里面就涉及到了Selector,

Netty 優(yōu)點

1.并發(fā)高

2.傳輸快

3.封裝好

(1)Netty為什么并發(fā)高

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)開發(fā)的網(wǎng)絡(luò)通信框架捐凭,對比于BIO(Blocking I/O,阻塞IO)凳鬓,他的并發(fā)性能得到了很大提高柑营,兩張圖讓你了解BIO和NIO的區(qū)別:



從這兩圖可以看出,NIO的單線程能處理連接的數(shù)量比BIO要高出很多村视,而為什么單線程能處理更多的連接呢?原因就是圖二中出現(xiàn)的Selector酒奶。

當一個連接建立之后蚁孔,他有兩個步驟要做,第一步是接收完客戶端發(fā)過來的全部數(shù)據(jù)杠氢,第二步是服務(wù)端處理完請求業(yè)務(wù)之后返回response給客戶端站刑。NIO和BIO的區(qū)別主要是在第一步。

在BIO中鼻百,等待客戶端發(fā)數(shù)據(jù)這個過程是阻塞的绞旅,這樣就造成了一個線程只能處理一個請求的情況,而機器能支持的最大線程數(shù)是有限的温艇,這就是為什么BIO不能支持高并發(fā)的原因因悲。

而NIO中,當一個Socket建立好之后勺爱,Thread并不會阻塞去接受這個Socket晃琳,而是將這個請求交給Selector,Selector會不斷的去遍歷所有的Socket琐鲁,一旦有一個Socket建立完成卫旱,他會通知Thread,然后Thread處理完數(shù)據(jù)再返回給客戶端——這個過程是不阻塞的围段,這樣就能讓一個Thread處理更多的請求了顾翼。

下面兩張圖是基于BIO的處理流程和netty的處理流程,輔助你理解兩種方式的差別:



除了BIO和NIO之外奈泪,還有一些其他的IO模型适贸,下面這張圖就表示了五種IO模型的處理流程:


BIO,同步阻塞IO段磨,阻塞整個步驟取逾,如果連接少,他的延遲是最低的苹支,因為一個線程只處理一個連接砾隅,適用于少連接且延遲低的場景,比如說數(shù)據(jù)庫連接债蜜。

NIO晴埂,同步非阻塞IO,阻塞業(yè)務(wù)處理但不阻塞數(shù)據(jù)接收寻定,適用于高并發(fā)且處理簡單的場景儒洛,比如聊天軟件。

多路復用IO狼速,他的兩個步驟處理是分開的琅锻,也就是說,一個連接可能他的數(shù)據(jù)接收是線程a完成的,數(shù)據(jù)處理是線程b完成的恼蓬,他比BIO能處理更多請求惊完。

信號驅(qū)動IO,這種IO模型主要用在嵌入式開發(fā)处硬,不參與討論小槐。

異步IO,他的數(shù)據(jù)請求和數(shù)據(jù)處理都是異步的荷辕,數(shù)據(jù)請求一次返回一次凿跳,適用于長連接的業(yè)務(wù)場景。

(2)Netty為什么傳輸快

Netty的傳輸快其實也是依賴了NIO的一個特性——零拷貝疮方。我們知道控嗜,Java的內(nèi)存有堆內(nèi)存、棧內(nèi)存和字符串常量池等等案站,其中堆內(nèi)存是占用內(nèi)存空間最大的一塊躬审,也是Java對象存放的地方,一般我們的數(shù)據(jù)如果需要從IO讀取到堆內(nèi)存蟆盐,中間需要經(jīng)過Socket緩沖區(qū)承边,也就是說一個數(shù)據(jù)會被拷貝兩次才能到達他的的終點,如果數(shù)據(jù)量大石挂,就會造成不必要的資源浪費博助。

Netty針對這種情況,使用了NIO中的另一大特性——零拷貝痹愚,當他需要接收數(shù)據(jù)的時候富岳,他會在堆內(nèi)存之外開辟一塊內(nèi)存,數(shù)據(jù)就直接從IO讀到了那塊內(nèi)存中去拯腮,在netty里面通過ByteBuf可以直接對這些數(shù)據(jù)進行直接操作窖式,從而加快了傳輸速度。


傳統(tǒng)數(shù)據(jù)拷貝


零拷貝

(3)為什么說Netty封裝好动壤?

阻塞I/O

public class PlainOioServer {

? ? public void serve(int port) throws IOException {

? ? ? ? final ServerSocket socket = new ServerSocket(port);? ? //1

? ? ? ? try {

? ? ? ? ? ? for (;;) {

? ? ? ? ? ? ? ? final Socket clientSocket = socket.accept();? ? //2

? ? ? ? ? ? ? ? System.out.println("Accepted connection from " + clientSocket);

? ? ? ? ? ? ? ? new Thread(new Runnable() {? ? ? ? ? ? ? ? ? ? ? ? //3

? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? public void run() {

? ? ? ? ? ? ? ? ? ? ? ? OutputStream out;

? ? ? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? ? ? out = clientSocket.getOutputStream();

? ? ? ? ? ? ? ? ? ? ? ? ? ? out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));? ? ? ? ? ? ? ? ? ? ? ? ? ? //4

? ? ? ? ? ? ? ? ? ? ? ? ? ? out.flush();

? ? ? ? ? ? ? ? ? ? ? ? ? ? clientSocket.close();? ? ? ? ? ? ? ? //5

? ? ? ? ? ? ? ? ? ? ? ? } catch (IOException e) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? clientSocket.close();

? ? ? ? ? ? ? ? ? ? ? ? ? ? } catch (IOException ex) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // ignore on close

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }).start();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //6

? ? ? ? ? ? }

? ? ? ? } catch (IOException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

}


非阻塞IO

public class PlainNioServer {

? ? public void serve(int port) throws IOException {

? ? ? ? ServerSocketChannel serverChannel = ServerSocketChannel.open();

? ? ? ? serverChannel.configureBlocking(false);

? ? ? ? ServerSocket ss = serverChannel.socket();

? ? ? ? InetSocketAddress address = new InetSocketAddress(port);

? ? ? ? ss.bind(address);? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //1

? ? ? ? Selector selector = Selector.open();? ? ? ? ? ? ? ? ? ? ? ? //2

? ? ? ? serverChannel.register(selector, SelectionKey.OP_ACCEPT);? ? //3

? ? ? ? final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());

? ? ? ? for (;;) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? selector.select();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //4

? ? ? ? ? ? } catch (IOException ex) {

? ? ? ? ? ? ? ? ex.printStackTrace();

? ? ? ? ? ? ? ? // handle exception

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? }

? ? ? ? ? ? Set<SelectionKey> readyKeys = selector.selectedKeys();? ? //5

? ? ? ? ? ? Iterator<SelectionKey> iterator = readyKeys.iterator();

? ? ? ? ? ? while (iterator.hasNext()) {

? ? ? ? ? ? ? ? SelectionKey key = iterator.next();

? ? ? ? ? ? ? ? iterator.remove();

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? if (key.isAcceptable()) {? ? ? ? ? ? ? ? //6

? ? ? ? ? ? ? ? ? ? ? ? ServerSocketChannel server =

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (ServerSocketChannel)key.channel();

? ? ? ? ? ? ? ? ? ? ? ? SocketChannel client = server.accept();

? ? ? ? ? ? ? ? ? ? ? ? client.configureBlocking(false);

? ? ? ? ? ? ? ? ? ? ? ? client.register(selector, SelectionKey.OP_WRITE |

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? SelectionKey.OP_READ, msg.duplicate());? ? //7

? ? ? ? ? ? ? ? ? ? ? ? System.out.println(

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "Accepted connection from " + client);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? if (key.isWritable()) {? ? ? ? ? ? ? ? //8

? ? ? ? ? ? ? ? ? ? ? ? SocketChannel client =

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (SocketChannel)key.channel();

? ? ? ? ? ? ? ? ? ? ? ? ByteBuffer buffer =

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (ByteBuffer)key.attachment();

? ? ? ? ? ? ? ? ? ? ? ? while (buffer.hasRemaining()) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? if (client.write(buffer) == 0) {? ? ? ? //9

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? break;

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? client.close();? ? ? ? ? ? ? ? ? ? //10

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } catch (IOException ex) {

? ? ? ? ? ? ? ? ? ? key.cancel();

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? key.channel().close();

? ? ? ? ? ? ? ? ? ? } catch (IOException cex) {

? ? ? ? ? ? ? ? ? ? ? ? // 在關(guān)閉時忽略

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? }

}


Netty

public class NettyOioServer {

? ? public void server(int port) throws Exception {

? ? ? ? final ByteBuf buf = Unpooled.unreleasableBuffer(

? ? ? ? ? ? ? ? Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));

? ? ? ? EventLoopGroup group = new OioEventLoopGroup();

? ? ? ? try {

? ? ? ? ? ? ServerBootstrap b = new ServerBootstrap();? ? ? ? //1

? ? ? ? ? ? b.group(group)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //2

? ? ? ? ? ? .channel(OioServerSocketChannel.class)

? ? ? ? ? ? .localAddress(new InetSocketAddress(port))

? ? ? ? ? ? .childHandler(new ChannelInitializer<SocketChannel>() {//3

? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? public void initChannel(SocketChannel ch)

? ? ? ? ? ? ? ? ? ? throws Exception {

? ? ? ? ? ? ? ? ? ? ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {? ? ? ? ? ? //4

? ? ? ? ? ? ? ? ? ? ? ? @Override

? ? ? ? ? ? ? ? ? ? ? ? public void channelActive(ChannelHandlerContext ctx) throws Exception {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? });

? ? ? ? ? ? ? ? }

? ? ? ? ? ? });

? ? ? ? ? ? ChannelFuture f = b.bind().sync();? //6

? ? ? ? ? ? f.channel().closeFuture().sync();

? ? ? ? } finally {

? ? ? ? ? ? group.shutdownGracefully().sync();? ? ? ? //7

? ? ? ? }

? ? }

}


Channel

數(shù)據(jù)傳輸流萝喘,與channel相關(guān)的概念有以下四個,上一張圖讓你了解netty里面的Channel琼懊。


Channel阁簸,表示一個連接,可以理解為每一個請求哼丈,就是一個Channel启妹。

ChannelHandler,核心處理業(yè)務(wù)就在這里醉旦,用于處理業(yè)務(wù)請求饶米。

ChannelHandlerContext桨啃,用于傳輸業(yè)務(wù)數(shù)據(jù)。

ChannelPipeline咙崎,用于保存處理過程需要用到的ChannelHandler和ChannelHandlerContext优幸。

ByteBuf

ByteBuf是一個存儲字節(jié)的容器,最大特點就是使用方便褪猛,它既有自己的讀索引和寫索引,方便你對整段字節(jié)緩存進行讀寫羹饰,也支持get/set伊滋,方便你對其中每一個字節(jié)進行讀寫,他的數(shù)據(jù)結(jié)構(gòu)如下圖所示:

ByteBuf數(shù)據(jù)結(jié)構(gòu)

Heap Buffer 堆緩沖區(qū)

堆緩沖區(qū)是ByteBuf最常用的模式队秩,他將數(shù)據(jù)存儲在堆空間笑旺。

Direct Buffer 直接緩沖區(qū)

直接緩沖區(qū)是ByteBuf的另外一種常用模式,他的內(nèi)存分配都不發(fā)生在堆馍资,jdk1.4引入的nio的ByteBuffer類允許jvm通過本地方法調(diào)用分配內(nèi)存筒主,這樣做有兩個好處

通過免去中間交換的內(nèi)存拷貝, 提升IO處理速度; 直接緩沖區(qū)的內(nèi)容可以駐留在垃圾回收掃描的堆區(qū)以外。

DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的內(nèi)存, GC對此”無能為力”,也就意味著規(guī)避了在高負載下頻繁的GC過程對應用線程的中斷影響.

Composite Buffer 復合緩沖區(qū)

復合緩沖區(qū)相當于多個不同ByteBuf的視圖鸟蟹,這是netty提供的乌妙,jdk不提供這樣的功能。

Codec

Netty中的編碼/解碼器建钥,通過他你能完成字節(jié)與pojo藤韵、pojo與pojo的相互轉(zhuǎn)換,從而達到自定義協(xié)議的目的熊经。

在Netty里面最有名的就是HttpRequestDecoder和HttpResponseEncoder了泽艘。

認識Http請求

在動手寫Netty框架之前,我們先要了解http請求的組成镐依,如下圖:


HTTP Request 第一部分是包含的頭信息

HttpContent 里面包含的是數(shù)據(jù)匹涮,可以后續(xù)有多個 HttpContent 部分

LastHttpContent 標記是 HTTP request 的結(jié)束,同時可能包含頭的尾部信息

完整的 HTTP request槐壳,由1然低,2,3組成


HTTP response 第一部分是包含的頭信息

HttpContent 里面包含的是數(shù)據(jù)宏粤,可以后續(xù)有多個 HttpContent 部分

LastHttpContent 標記是 HTTP response 的結(jié)束脚翘,同時可能包含頭的尾部信息

完整的 HTTP response,由1绍哎,2来农,3組成

從request的介紹我們可以看出來,一次http請求并不是通過一次對話完成的崇堰,他中間可能有很次的連接沃于。通過上一章我們隊netty的了解涩咖,每一次對話都會建立一個channel,并且一個ChannelInboundHandler一般是不會同時去處理多個Channel的繁莹。

如何在一個Channel里面處理一次完整的Http請求檩互?這就要用到我們上圖提到的FullHttpRequest,我們只需要在使用netty處理channel的時候咨演,只處理消息是FullHttpRequest的Channel闸昨,這樣我們就能在一個ChannelHandler中處理一個完整的Http請求了。

什么是Decoder和Encoder

?? 在學習Decoder和Encoder之前薄风,首先要了解他們在具體是個什么東西饵较。在Netty里面,有四個核心概念遭赂,這個在第一篇文章提到的循诉,他們的分別是:

Channel,一個客戶端與服務(wù)器通信的通道

ChannelHandler撇他,業(yè)務(wù)邏輯處理器茄猫,分為ChannelInboundHandler和ChannelOutboundHandler

ChannelInboundHandler,輸入數(shù)據(jù)處理器

ChannelOutboundHandler困肩,輸出業(yè)務(wù)處理器

通常情況下划纽,業(yè)務(wù)邏輯都是存在于ChannelHandler之中

ChannelPipeline,用于存放ChannelHandler的容器

ChannelContext僻弹,通信管道的上下文

他們之間的交流流程如下圖:

Channel關(guān)系圖

他們的交互流程是:

事件傳遞給 ChannelPipeline 的第一個 ChannelHandler

ChannelHandler 通過關(guān)聯(lián)的 ChannelHandlerContext 傳遞事件給 ChannelPipeline 中的 下一個

ChannelHandler 通過關(guān)聯(lián)的 ChannelHandlerContext 傳遞事件給 ChannelPipeline 中的 下一個

而我們本文所需要詳細講的Decoder和Encoder阿浓,他們分別就是ChannelInboundHandler和ChannelOutboundHandler,分別用于在數(shù)據(jù)流進來的時候?qū)⒆止?jié)碼轉(zhuǎn)換為消息對象和數(shù)據(jù)流出去的時候?qū)⑾ο筠D(zhuǎn)換為字節(jié)碼蹋绽。

Encoder

?? Encoder最重要的實現(xiàn)類是MessageToByteEncoder<T>芭毙,這個類的作用就是將消息實體T從對象轉(zhuǎn)換成byte,寫入到ByteBuf卸耘,然后再丟給剩下的ChannelOutboundHandler傳給客戶端退敦,流程圖如下:


Encoder流程圖

encode方法是繼承MessageToByteEncoder唯一需要重寫的方法,可見其簡單程度蚣抗。也是因為Encoder相比于Decoder更為簡單侈百,在這里也不多做贅述,直接上代碼:

public class ShortToByteEncoder extends? ?MessageToByteEncoder<Short> {? //1

? ? @Override

? ? public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out)

? ? ? ? ? ? throws Exception {

? ? ? ? out.writeShort(msg);? //2

? ? }

}

Decoder

?? 和Encoder一樣翰铡,decoder就是在服務(wù)端收到數(shù)據(jù)的時候钝域,將字節(jié)流轉(zhuǎn)換為實體對象Message。但是和Encoder的處理邏輯不一樣锭魔,數(shù)據(jù)傳到服務(wù)端有可能不是一次請求就能完成的例证,中間可能需要經(jīng)過幾次數(shù)據(jù)傳輸,并且每一次傳輸傳多少數(shù)據(jù)也是不確定的迷捧,所以它有兩個重要方法:

decode和decodeLast的不同之處织咧,在于他們的調(diào)用時機不同胀葱,正如描述所說,decodeLast只有在Channel的生命周期結(jié)束之前會調(diào)用一次笙蒙,默認是調(diào)用decode方法抵屿。

?? 同樣是ToInteger的解碼器,他的代碼如下:


public class ToIntegerDecoder extends ByteToMessageDecoder { //1

? ? @Override

? ? public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)

? ? ? ? ? ? throws Exception {

? ? ? ? if (in.readableBytes() >= 4) {? //2

? ? ? ? ? ? out.add(in.readInt());? //3

? ? ? ? }

? ? }

}

從這段代碼可以看出捅位,因為不知道這次請求發(fā)過來多少數(shù)據(jù)轧葛,所以每次都要判斷byte長度夠不夠4,如果你的數(shù)據(jù)長度更長绿渣,且不固定的話朝群,這里的邏輯會變得非常復雜。所以在這里介紹另一個我們常用的解碼器 :ReplayingDecoder中符。

ReplayingDecoder

?? ReplayingDecoder 是 byte-to-message 解碼的一種特殊的抽象基類,讀取緩沖區(qū)的數(shù)據(jù)之前需要檢查緩沖區(qū)是否有足夠的字節(jié)誉帅,使用ReplayingDecoder就無需自己檢查淀散;若ByteBuf中有足夠的字節(jié),則會正常讀妊料恰档插;若沒有足夠的字節(jié)則會停止解碼。

?? RelayingDecoder在使用的時候需要搞清楚的兩個方法是checkpoint(S s)和state()亚再,其中checkpoint的參數(shù)S,代表的是ReplayingDecoder所處的狀態(tài)氛悬,一般是枚舉類型则剃。RelayingDecoder是一個有狀態(tài)的Handler,狀態(tài)表示的是它目前讀取到了哪一步如捅,checkpoint(S s)是設(shè)置當前的狀態(tài)棍现,state()是獲取當前的狀態(tài)。

?? 在這里我們模擬一個簡單的Decoder镜遣,假設(shè)每個包包含length:int和content:String兩個數(shù)據(jù)己肮,其中l(wèi)ength可以為0,代表一個空包悲关,大于0的時候代表content的長度谎僻。代碼如下:

public class LiveDecoder extends ReplayingDecoder<LiveDecoder.LiveState> { //1

? ? public enum LiveState { //2

? ? ? ? LENGTH,

? ? ? ? CONTENT

? ? }

? ? private LiveMessage message = new LiveMessage();

? ? public LiveDecoder() {

? ? ? ? super(LiveState.LENGTH); // 3

? ? }

? ? @Override

? ? protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

? ? ? ? switch (state()) { // 4

? ? ? ? ? ? case LENGTH:

? ? ? ? ? ? ? ? int length = byteBuf.readInt();

? ? ? ? ? ? ? ? if (length > 0) {

? ? ? ? ? ? ? ? ? ? checkpoint(LiveState.CONTENT); // 5

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? list.add(message); // 6

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? case CONTENT:

? ? ? ? ? ? ? ? byte[] bytes = new byte[message.getLength()];

? ? ? ? ? ? ? ? byteBuf.readBytes(bytes);

? ? ? ? ? ? ? ? String content = new String(bytes);

? ? ? ? ? ? ? ? message.setContent(content);

? ? ? ? ? ? ? ? list.add(message);

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? default:

? ? ? ? ? ? ? ? throw new IllegalStateException("invalid state:" + state());

? ? ? ? }

? ? }

}

繼承ReplayingDecoder,泛型LiveState寓辱,用來表示當前讀取的狀態(tài)

描述LiveState艘绍,有讀取長度和讀取內(nèi)容兩個狀態(tài)

初始化的時候設(shè)置為讀取長度的狀態(tài)

讀取的時候通過state()方法來確定當前處于什么狀態(tài)

如果讀取出來的長度大于0,則設(shè)置為讀取內(nèi)容狀態(tài)讶舰,下一次讀取的時候則從這個位置開始

讀取完成鞍盗,往結(jié)果里面放解析好的數(shù)據(jù)

?? 以上就是ReplayingDecoder的使用方法需了,他比ByteToMessageDecoder更加靈活,能夠通過巧妙的方式來處理復雜的業(yè)務(wù)邏輯般甲,但是也是因為這個原因肋乍,使得ReplayingDecoder帶有一定的局限性:

不是所有的標準 ByteBuf 操作都被支持,如果調(diào)用一個不支持的操作會拋出 UnreplayableOperationException

ReplayingDecoder 略慢于 ByteToMessageDecoder

所以敷存,如果不引入過多的復雜性 使用 ByteToMessageDecoder 墓造。否則,使用ReplayingDecoder。

MessageToMessage

?? Encoder和Decoder除了能完成Byte和Message的相互轉(zhuǎn)換之外锚烦,為了處理復雜的業(yè)務(wù)邏輯觅闽,還能幫助使用者完成Message和Message的相互轉(zhuǎn)換,我們熟悉的Http協(xié)議的處理涮俄,其中就用到了很多MessageToMessage的派生類蛉拙。

Netty如何實現(xiàn)長連接

一個簡單的長連接demo分為以下幾個步驟:


長連接流程

1.創(chuàng)建連接(Channel)

2.發(fā)心跳包

3.發(fā)消息,并通知其他用戶

4.一段時間沒收到心跳包或者用戶主動關(guān)閉之后關(guān)閉連接

? 看似簡單的步驟彻亲,里面有兩個技術(shù)難點:

1.如何保存已創(chuàng)建的Channel

這里我們是將Channel放在一個Map中孕锄,以Channel.hashCode()作為key

其實這樣做有一個劣勢,就是不適合水平擴展苞尝,每個機器都有一個連接數(shù)的上線畸肆,如果需要實現(xiàn)多用戶實時在線,對機器的數(shù)量要求會很高宙址,在這里我們不多做討論轴脐,不同的業(yè)務(wù)場景,設(shè)計方案也是不同的抡砂,可以在長連接方案和客戶端輪詢方案中進行選擇大咱。

2.如何自動關(guān)閉沒有心跳的連接

Netty有一個比較好的Feature,就是ScheduledFuture舀患,他可以通過ChannelHandlerContext.executor().schedule()創(chuàng)建徽级,支持延時提交,也支持取消任務(wù)聊浅,這就給我們心跳包的自動關(guān)閉提供了一個很好的實現(xiàn)方案餐抢。

開始動手

?? 首先,我們需要用一個JavaBean來封裝通信的協(xié)議內(nèi)容低匙,在這里我們只需要三個數(shù)據(jù)就行了:

1.type : byte旷痕,表示消息的類型,有心跳類型和內(nèi)容類型

2.length : int顽冶,表示消息的長度

3.content : String欺抗,表示消息的內(nèi)容(心跳包在這里沒有內(nèi)容)

?? 然后,因為我們需要將Channel和ScheduledFuture緩存在Map里面强重,所以需要將兩個對象組合成一個JavaBean绞呈。

?? 接著贸人,需要完成輸入輸出流的解析和轉(zhuǎn)換,我們需要重寫Decoder和Encoder佃声,

服務(wù)端:

import io.netty.channel.Channel;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.util.concurrent.ScheduledFuture;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.TimeUnit;

/**

* Created by RoyDeng on 17/7/20.

*/

public class LiveHandler extends SimpleChannelInboundHandler<LiveMessage> { // 1

? ? private static Map<Integer, LiveChannelCache> channelCache = new HashMap<>();

? ? private Logger logger = LoggerFactory.getLogger(LiveHandler.class);

? ? @Override

? ? protected void channelRead0(ChannelHandlerContext ctx, LiveMessage msg) throws Exception {

? ? ? ? Channel channel = ctx.channel();

? ? ? ? final int hashCode = channel.hashCode();

? ? ? ? System.out.println("channel hashCode:" + hashCode + " msg:" + msg + " cache:" + channelCache.size());

? ? ? ? if (!channelCache.containsKey(hashCode)) {

? ? ? ? ? ? System.out.println("channelCache.containsKey(hashCode), put key:" + hashCode);

? ? ? ? ? ? channel.closeFuture().addListener(future -> {

? ? ? ? ? ? ? ? System.out.println("channel close, remove key:" + hashCode);

? ? ? ? ? ? ? ? channelCache.remove(hashCode);

? ? ? ? ? ? });

? ? ? ? ? ? ScheduledFuture scheduledFuture = ctx.executor().schedule(

? ? ? ? ? ? ? ? ? ? () -> {

? ? ? ? ? ? ? ? ? ? ? ? System.out.println("schedule runs, close channel:" + hashCode);

? ? ? ? ? ? ? ? ? ? ? ? channel.close();

? ? ? ? ? ? ? ? ? ? }, 10, TimeUnit.SECONDS);

? ? ? ? ? ? channelCache.put(hashCode, new LiveChannelCache(channel, scheduledFuture));

? ? ? ? }

? ? ? ? switch (msg.getType()) {

? ? ? ? ? ? case LiveMessage.TYPE_HEART: {

? ? ? ? ? ? ? ? LiveChannelCache cache = channelCache.get(hashCode);

? ? ? ? ? ? ? ? ScheduledFuture scheduledFuture = ctx.executor().schedule(

? ? ? ? ? ? ? ? ? ? ? ? () -> channel.close(), 5, TimeUnit.SECONDS);

? ? ? ? ? ? ? ? cache.getScheduledFuture().cancel(true);

? ? ? ? ? ? ? ? cache.setScheduledFuture(scheduledFuture);

? ? ? ? ? ? ? ? ctx.channel().writeAndFlush(msg);

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? }

? ? ? ? ? ? case LiveMessage.TYPE_MESSAGE: {

? ? ? ? ? ? ? ? channelCache.entrySet().stream().forEach(entry -> {

? ? ? ? ? ? ? ? ? ? Channel otherChannel = entry.getValue().getChannel();

? ? ? ? ? ? ? ? ? ? otherChannel.writeAndFlush(msg);

? ? ? ? ? ? ? ? });

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? @Override

? ? public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

? ? ? ? logger.debug("channelReadComplete");

? ? ? ? super.channelReadComplete(ctx);

? ? }

? ? @Override

? ? public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

? ? ? ? logger.debug("exceptionCaught");

? ? ? ? if(null != cause) cause.printStackTrace();

? ? ? ? if(null != ctx) ctx.close();

? ? }

}

2.客戶端

package com.dz.test;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.net.Socket;

import java.nio.ByteBuffer;

import java.util.Scanner;


public class LongConnection {

? ? private Logger logger = LoggerFactory.getLogger(LongConnection.class);

? ? String host = "localhost";

? ? int port = 8080;

? ? public void testLongConnection() throws Exception {

? ? ? ? logger.debug("start");

? ? ? ? final Socket socket = new Socket();

? ? ? ? socket.connect(new InetSocketAddress(host, port));

? ? ? ? Scanner scanner = new Scanner(System.in);

? ? ? ? new Thread(() -> {

? ? ? ? ? ? while (true) {

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? byte[] input = new byte[64];

? ? ? ? ? ? ? ? ? ? int readByte = socket.getInputStream().read(input);

? ? ? ? ? ? ? ? ? ? logger.debug("readByte " + readByte);

? ? ? ? ? ? ? ? } catch (IOException e) {

? ? ? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }).start();

? ? ? ? int code;

? ? ? ? while (true) {

? ? ? ? ? ? code = scanner.nextInt();

? ? ? ? ? ? logger.debug("input code:" + code);

? ? ? ? ? ? if (code == 0) {

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? } else if (code == 1) {

? ? ? ? ? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate(5);

? ? ? ? ? ? ? ? byteBuffer.put((byte) 1);

? ? ? ? ? ? ? ? byteBuffer.putInt(0);

? ? ? ? ? ? ? ? socket.getOutputStream().write(byteBuffer.array());

? ? ? ? ? ? ? ? logger.debug("write heart finish!");

? ? ? ? ? ? } else if (code == 2) {

? ? ? ? ? ? ? ? byte[] content = ("hello, I'm" + hashCode()).getBytes();

? ? ? ? ? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate(content.length + 5);

? ? ? ? ? ? ? ? byteBuffer.put((byte) 2);

? ? ? ? ? ? ? ? byteBuffer.putInt(content.length);

? ? ? ? ? ? ? ? byteBuffer.put(content);

? ? ? ? ? ? ? ? socket.getOutputStream().write(byteBuffer.array());

? ? ? ? ? ? ? ? logger.debug("write content finish!");

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? socket.close();

? ? }

? ? // 因為Junit不支持用戶輸入,所以用main的方式來執(zhí)行用例

? ? public static void main(String[] args) throws Exception {

? ? ? ? new LongConnection().testLongConn();

? ? }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末艺智,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子圾亏,更是在濱河造成了極大的恐慌十拣,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件志鹃,死亡現(xiàn)場離奇詭異夭问,居然都是意外死亡,警方通過查閱死者的電腦和手機曹铃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進店門缰趋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人陕见,你說我怎么就攤上這事埠胖。” “怎么了淳玩?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長非竿。 經(jīng)常有香客問我蜕着,道長,這世上最難降的妖魔是什么红柱? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任承匣,我火速辦了婚禮,結(jié)果婚禮上锤悄,老公的妹妹穿的比我還像新娘韧骗。我一直安慰自己,他們只是感情好零聚,可當我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布袍暴。 她就那樣靜靜地躺著,像睡著了一般隶症。 火紅的嫁衣襯著肌膚如雪政模。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天蚂会,我揣著相機與錄音淋样,去河邊找鬼。 笑死胁住,一個胖子當著我的面吹牛趁猴,可吹牛的內(nèi)容都是我干的刊咳。 我是一名探鬼主播,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼儡司,長吁一口氣:“原來是場噩夢啊……” “哼娱挨!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起枫慷,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤让蕾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后或听,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體探孝,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年誉裆,在試婚紗的時候發(fā)現(xiàn)自己被綠了顿颅。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡足丢,死狀恐怖粱腻,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情斩跌,我是刑警寧澤绍些,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站耀鸦,受9級特大地震影響柬批,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜袖订,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一氮帐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧洛姑,春花似錦上沐、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至产徊,卻和暖如春昂勒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背舟铜。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工戈盈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓塘娶,卻偏偏與公主長得像归斤,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子刁岸,可洞房花燭夜當晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容