《netty in action》讀書筆記 PART1

6. ChannelHandler and ChannelPipeline

6.1 The ChannelHandler family

6.1.1 Channel的生命周期

ChannelUnregistered

已創(chuàng)建魁衙,但是還沒有被注冊(cè)到EventLoop上。

ChannelRegistered

已創(chuàng)建,并且已經(jīng)注冊(cè)到EventLoop。

ChannelActive

連接上遠(yuǎn)程主機(jī)。

ChannelActive

沒有連接到遠(yuǎn)程主機(jī)太雨。

Channel狀態(tài)的變化會(huì)觸發(fā)相應(yīng)的事件。

6.1.2 ChannelHandler的生命周期

handlerAdd

添加handler

handlerRemove

刪除handler

exceptionCaught

發(fā)生異常

ChannelHandler有兩個(gè)重要的子接口:ChannelInboundHandlerChannelOutboundHandler

6.1.3 ChannelInboundHandler接口

接受到數(shù)據(jù)或者Channel的狀態(tài)發(fā)生改變會(huì)調(diào)用ChannelInboundHandler中的方法芦圾。注意,當(dāng)ChannelInboundHandler中的channelRead()方法被overwrite俄认,需要對(duì)ByteBuf實(shí)例持有的資源進(jìn)行顯示釋放个少。

public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}

可以使用SimpleChannelInboundHandler,它會(huì)自動(dòng)釋放資源眯杏,無(wú)需人工干預(yù):

@Sharable
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// No need to do anything special
}
}

6.1.4 ChannelOutboundHandler接口

它一個(gè)比較強(qiáng)大的功能是延遲執(zhí)行夜焦。

CHANNELPROMISE VS. CHANNELFUTURE

CHANNELPROMISE是CHANNELFUTURE的子接口,CHANNELFUTURE是不可寫的岂贩,CHANNELPROMISE是可寫的(例如setSuccess(),setFailure()方法)

6.1.5 ChannelHandler adapters

關(guān)系圖

6.1.6 資源管理

要注意ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()要釋放相應(yīng)的資源茫经,否則會(huì)產(chǎn)生內(nèi)存泄漏。netty使用引用計(jì)數(shù)法來(lái)管理內(nèi)存資源萎津⌒渡。可以使用netty提供的ResourceLeakDetector來(lái)發(fā)現(xiàn)潛在的內(nèi)存泄漏問題。

java -Dio.netty.leakDetectionLevel=ADVANCED

leakDetectionLevel可以為DISABLED锉屈、SIMPLE(默認(rèn))荤傲、ADVANCED和PARANOID。

6.2 ChannelPipeline接口

ChannelPipeline可以看成由ChannelHandler組成的鏈表部念,I/O事件會(huì)在ChannelPipeline上傳播弃酌。每個(gè)新Channel會(huì)綁定一個(gè)新ChannelPipeline,兩者是一對(duì)一關(guān)系儡炼。


pipeline中的事件傳播

事件傳播的時(shí)候妓湘,會(huì)判斷ChannelHandler的類型(implements Inbound還是OutBound的接口)和事件傳播的方向是否一致,不一致跳過乌询。

6.2.1 ChannelPipeline修改

ChannelPipeline中ChannelHandler可以動(dòng)態(tài)地被添加榜贴、刪除或者替換。


ChannelPipeline中操作ChannelHandler

6.2.2 Firing events

會(huì)調(diào)用ChannelPipeline中下一個(gè)ChannelHandler里的方法。


代碼示例:


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class HttpServer {
    
    public static void main(String[] args) throws InterruptedException {
        
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup,workGroup)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<Channel>() {
    
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new MyHandler());
                            ch.pipeline().addLast(new MyHandler2());
                        }
                         
                    });
            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

}

class MyHandler extends SimpleChannelInboundHandler<String>{

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("in MyHandler1 , messageReceived invoked");
        for(int i = 0;i < 10 ; i++) {
            ctx.fireChannelInactive();//調(diào)用fireChannelInactive 10次
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("in MyHandler1 , channelInactive invoked");
    }
}

class MyHandler2 extends SimpleChannelInboundHandler<String>{
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("in MyHandler2 ,messageReceived invoked");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("in MyHandler2 , channelInactive invoked");
    }
}

輸出:

控制臺(tái)輸出

6.3 ChannelHandlerContext接口

ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之間的聯(lián)系唬党,無(wú)論何時(shí)鹃共,添加一個(gè)ChannelHandler到ChannelPipeline就會(huì)創(chuàng)建一個(gè)ChannelHandlerContext。ChannelHandlerContext的主要功能是和所在ChannelPipeline的其他ChannelHandler交互驶拱。

ChannelHandlerContext有很多方法霜浴,大部分方法在Channel和ChannelPipeline里都出現(xiàn)過,但是這里有一個(gè)非常大的區(qū)別蓝纲,調(diào)用Channel和ChannelPipeline里的方法阴孟,會(huì)在整個(gè)pipeline里傳播(從頭到尾),而ChannelHandlerContext里同名的方法税迷,是從當(dāng)前ChannelHandler開始傳播永丝。

6.3.1 Using ChannelHandlerContext

概念關(guān)系圖

6.3.2 ChannelHandler和ChannelHandlerContext的高級(jí)用法。

  1. ChannelHandlerContext的pipeline()方法可以獲取ChannelPipeline的引用箭养,這樣我們可以通過這個(gè)引用操作ChannelHandler慕嚷,實(shí)現(xiàn)動(dòng)態(tài)協(xié)議
  2. 可以把ChannelHandlerContext的引用緩存起來(lái)毕泌,在ChannelHandler方法外面用喝检,甚至在一個(gè)不同的線程里使用。下面提供了一個(gè)示例懈词。
引用緩存實(shí)例
  1. 可以將一個(gè)ChannelHandler實(shí)例可能會(huì)被添加到不同的ChannelPipeline里蛇耀,但是需要使用@Sharable注解,此外還需注意的是坎弯,這個(gè)Sharable的ChannelHandler需要是線程安全的纺涤。

為什么需要@Sharable的ChannelHandler,一個(gè)需求就是通過這個(gè)@Sharable來(lái)統(tǒng)計(jì)多個(gè)Channel的數(shù)據(jù)抠忘。

6.4 異常處理

6.4.1 Inbound異常處理

Inbound異常處理

由于exception默認(rèn)會(huì)從觸發(fā)異常的ChannelHandler繼續(xù)向后流動(dòng)撩炊,所以圖中的這種處理邏輯,我們一般放在最后ChannelPipeline的末尾崎脉。這樣就可以確保拧咳,無(wú)論是哪個(gè)ChannelHandler觸發(fā)異常,都能夠被捕獲并處理囚灼。如果不對(duì)異常做捕獲處理操作骆膝,netty會(huì)打印異常未被捕獲的日志。

6.4.2 outbound異常處理

進(jìn)行outbound操作灶体,要想知道結(jié)果(正常完成還是發(fā)生異常)阅签,需要這樣做:

  1. 每個(gè)outbound操作都會(huì)返回一個(gè)ChannelFuture。添加到ChannelFuture上的監(jiān)聽器會(huì)收到成功或者錯(cuò)誤通知蝎抽。

  2. ChannelOutboundHandler中的方法絕大多數(shù)都會(huì)ChannelPromise類型的參數(shù)政钟。ChannelPromise也可以添加監(jiān)聽來(lái)接受異步通知。ChannelPromise是可寫的,可以通過它的setSucess()方法或者setFailure(Throwable cause)立即發(fā)布通知养交。

如果ChannelOutboundHandler自己拋出異常精算,netty會(huì)通知添加到ChannelPromise上的監(jiān)聽器。

7. EventLoop and threading model

7.1 Threading model overview

JDK早期版本多線程編程的方式是create新線程再start碎连。JDK5推出了Executor API灰羽,它的線程池技術(shù)通過緩存和重用大大提高了性能。

  1. 有任務(wù)(Runnable實(shí)現(xiàn))的時(shí)候破花,從線程池里挑選出一個(gè)空閑線程谦趣,把任務(wù)submit給它疲吸。
  2. 任務(wù)執(zhí)行完畢了座每,線程變成空閑,回到線程池摘悴,等待下一次挑選使用峭梳。
線程池技術(shù)

線程池不能解決上下文切換開銷的問題,上下文的開銷在heavy load下會(huì)很大蹂喻。

7.2 EventLoop接口

EventLoop是一個(gè)用來(lái)處理事件的任務(wù)葱椭,基本思想如下圖所示:

image.png

EventLoop接口的API分為兩類:concurrent和networking。

  1. concurrent
    基于java.util.concurrent包口四,提供thread executors
  2. networking
    io.netty.channel繼承了EventLoop接口孵运,提供了和Channel事件交互的能力。

7.2.1 Netty 4中I/O事件的處理

7.3.1 JDK 任務(wù)調(diào)度API

JDK5之前蔓彩,任務(wù)調(diào)度只能用java.util.Timer治笨,Timer就是一個(gè)后臺(tái)線程,有很多限制:

  1. 如果執(zhí)行多個(gè)定時(shí)任務(wù)赤嚼,一個(gè)任務(wù)發(fā)生異常沒有捕獲旷赖,整個(gè)Timer線程會(huì)掛掉(其他所有任務(wù)都會(huì)down掉)
  2. 假如某個(gè)任務(wù)的執(zhí)行時(shí)間過長(zhǎng),超過一些任務(wù)的間隔時(shí)間更卒,會(huì)導(dǎo)致這些任務(wù)執(zhí)行推遲等孵。

JDK后續(xù)推出了java.util.concurrent,其中定義的ScheduleExecutorService克服了這些缺陷蹂空。

ScheduledExecutorService executor =Executors.newScheduledThreadPool(10);
ScheduledFuture<?> future = executor.schedule(
  new Runnable() {
  @Override
  public void run() {
  System.out.println("60 seconds later");
}
}, 60, TimeUnit.SECONDS);
//to do
executor.shutdown();

盡管ScheduledExecutorSevice挺好用的俯萌,但是在負(fù)載大的時(shí)候有較大的性能耗費(fèi),netty進(jìn)行了優(yōu)化上枕。

7.3.2 使用EventLoop進(jìn)行任務(wù)調(diào)度

ScheduledExecutorService也有一些限制咐熙,例如會(huì)創(chuàng)建額外創(chuàng)建一些線程來(lái)管理線程池,這在任務(wù)調(diào)度非常激烈的情況下姿骏,會(huì)成為性能的瓶頸糖声。netty沒有直接使用ScheduledExecutorService,使用了繼承于ScheduledExecutorService,自己實(shí)現(xiàn)的EventLoop蘸泻。

Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().schedule(
  new Runnable() {
  @Override
  public void run() {
    System.out.println("60 seconds later");
  }
}, 60, TimeUnit.SECONDS);

重復(fù)定時(shí)執(zhí)行:

Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
  new Runnable() {
  @Override
  public void run() {
    System.out.println("Run every 60 seconds");
  }
}, 60, 60, TimeUnit.Seconds);

7.4 實(shí)現(xiàn)細(xì)節(jié)

7.4.1 線程管理

netty線程模型的優(yōu)越之處是在于它會(huì)確定當(dāng)前執(zhí)行線程的身份琉苇,再進(jìn)行相應(yīng)操作。如果當(dāng)前執(zhí)行線程被綁定到當(dāng)前的ChannelEventLoop悦施,會(huì)被直接執(zhí)行并扇,否則會(huì)被放到EventLoop的隊(duì)列里,每個(gè)EventLoop有自己?jiǎn)为?dú)的隊(duì)列抡诞。

EventLoop 執(zhí)行邏輯

Never put a long-running task in the execution queue, because it will block any other task from executing on the same thread.” If you must make blocking calls or execute long-running tasks, we advise the use of a dedicated EventExecutor.

7.4.2 EventLoop/Thread分配

EventLoopGroup包含了EventLoopsChannels穷蛹,EventLoops創(chuàng)建方式取決于使用哪種I/O.

異步I/O

異步I/O僅僅使用少量的EventLoops,這些EventLoops被很多的Channels共享昼汗,這樣就可以用最少的線程接受很多的Channels,而不是一個(gè)線程一個(gè)Channel肴熏。

阻塞I/O

共同點(diǎn):每個(gè)Channel的I/O事件只會(huì)被一個(gè)線程處理。

8. Bootstrapping

bootstrapping an application is the process of configuring it to run

8.1 Bootstrap classes

Namely, a server devotes a parent channel to accepting connections from clients and
creating child channels for conversing with them, whereas a client will most likely
require only a single, non-parent channel for all network interactions. (As we’ll see, this
applies also to connectionless transports such as UDP , because they don’t require a
channel for each connection.)

server需要一個(gè)parent channel來(lái)接受客戶端連接顷窒,需要?jiǎng)?chuàng)建多個(gè)child channels來(lái)應(yīng)答客戶端蛙吏。

client只需要一個(gè)單獨(dú)的channel,不需要parent channel鞋吉。

服務(wù)端處理使用ServerBootstrap,客戶端使用Bootstrap鸦做。

Why are the bootstrap classes Cloneable?
You’ll sometimes need to create multiple channels that have similar or identical settings. To support this pattern without requiring a new bootstrap instance to be created and configured for each channel, AbstractBootstrap has been marked Cloneable . Calling clone() on an already configured bootstrap will return another bootstrap instance that’s immediately usable. Note that this creates only a shallow copy of the bootstrap’s EventLoopGroup , so the latter will be shared among all of the cloned channels. This is acceptable, as the cloned channels are often short-lived, a typical case being a channel created to make an HTTP request.

8.2 Bootstrapping clients and connectionless protocols

Bootstrap主要用來(lái)給客戶端和使用面向無(wú)連接的應(yīng)用創(chuàng)建Channels

Bootstraping a client:

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
    @Override
    protected void channeRead0(
    ChannelHandlerContext channelHandlerContext,
    ByteBuf byteBuf) throws Exception {
        System.out.println("Received data");
    }
} );
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture)throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Connection established");
        } else {
            System.err.println("Connection attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
} );

8.2.2 Channel和EventLoopGroup的兼容性

you can’t mix components having different
prefixes, such as NioEventLoopGroup and OioSocketChannel . The following listing
shows an attempt to do just that.

ChannelEventLoopGroup的前綴要一樣谓着。否則會(huì)拋出IllegalStateException

8.3 Bootstraping servers

ServerBootstrap類

A ServerBootstrap creating a ServerChannel on bind() , and the ServerChannel managing a number of child Channels.

相比 Bootstrap類泼诱,增加了childHandler(),childAttr(),childOption()方法。ServerChannel來(lái)創(chuàng)建許許多多的子Channel赊锚,代表接受的連接治筒。ServerBootstrap提供了這些方法來(lái)簡(jiǎn)化對(duì)子Channel的配置。

NioEventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,ByteBuf byteBuf) throw Exception {
    System.out.println("Received data");
}
} );

ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Server bound");
        } else {
            System.err.println("Bound attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
} );

8.4 Bootstrapping clients from a Channel

Suppose your server is processing a client request that requires it to act as a client to
a third system. This can happen when an application, such as a proxy server, has to
integrate with an organization’s existing systems, such as web services or databases. In
such cases you’ll need to bootstrap a client Channel from a ServerChannel

作為服務(wù)端接受連接改抡,同時(shí)又作為客戶端矢炼,請(qǐng)求遠(yuǎn)程服務(wù)器(類似于proxy),最容易想到的辦法是再創(chuàng)建一個(gè)客戶端的Bootstrap阿纤,但是這樣需要另外一個(gè)EventLoop來(lái)處理客戶端角色的Channel句灌,發(fā)生在服務(wù)端Channel和客戶端Channel之間數(shù)據(jù)交換引起的上文切換也會(huì)帶來(lái)額外的性能損耗。

最好的辦法是創(chuàng)建的客戶端Channel和服務(wù)端Channel欠拾,共享同一個(gè)EventLoop:

    ServerBootstrap bootstrap = new ServerBootstrap();
//Sets the EventLoopGroups that provide EventLoops for processing Channel events
        bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class)
                .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
                    ChannelFuture connectFuture;

                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  //Creates a Bootstrap to connect to remote host
                        Bootstrap bootstrap = new Bootstrap();
                        bootstrap.channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
                                System.out.println("Received data");
                            }
                        });
//Uses the same EventLoop as the one assigned to the accepted channel
                        bootstrap.group(ctx.channel().eventLoop());
                        connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
                    }

                    @Override
                    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
                            throws Exception {
                        if (connectFuture.isDone()) {
// do something with the data
//When the connection is complete performs some data operation (such as proxying)   
                        }
                    }
                });
        ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("Server bound");
                } else {
                    System.err.println("Bind attempt failed");
                    channelFuture.cause().printStackTrace();
                }
            }
        });

8.5 Adding multiple ChannelHandlers during a bootstrap

bootstrap的時(shí)候胰锌,如何添加多個(gè)ChannelHandler?

netty提供了ChannelInboundHandlerAdapter的特殊子類ChannelInitializer:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter

ChannelInitializer提供了initChannel()可以輕松添加ChannelHandlersChannelPipeline

protected abstract void initChannel(C ch) throws Exception;

一旦Channel注冊(cè)到EventLoop,我們實(shí)現(xiàn)的initChannel()就會(huì)被調(diào)用藐窄。當(dāng)initChannel()返回的時(shí)候资昧,ChannelInitializer實(shí)例會(huì)把自己從ChannelPipeline中刪除。

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializerImpl());

        ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
        future.sync();

對(duì)應(yīng)ChannelInitializerImpl的實(shí)現(xiàn):

final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpClientCodec());
        pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
    }
}

8.6 Using Netty ChannelOptions and attributes

不需要我們手工配置每個(gè)Channel,netty提供了option()方法來(lái)把ChannelOptions應(yīng)用到bootstrap荆忍,ChannelOptions中的配置會(huì)自動(dòng)地應(yīng)用到所有Channel

Netty的Channelbootstrap類格带,提供了AttributeMap抽象集合和AttributeKey<T>泛型類,用來(lái)insert和retrieve屬性值撤缴。使用這些工具,我們可以安全地把任意類型的數(shù)據(jù)和Channel關(guān)聯(lián)起來(lái)叽唱。

Attribute的一個(gè)使用場(chǎng)景是屈呕,服務(wù)端應(yīng)用需要追蹤用戶和Channels的關(guān)系」淄ぃ可以把用戶的ID作為一個(gè)屬性存到Channel里虎眨。這樣就可以實(shí)現(xiàn)根據(jù)ID來(lái)路由消息和Channel不活躍自動(dòng)關(guān)閉等功能。

final AttributeKey<Integer> id = new AttributeKey<Integer>("ID");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
        .handler(new SimpleChannelInboundHandler<ByteBuf>() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                Integer idValue = ctx.channel().attr(id).get();
                // do something with the idValue
            }

            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
                    throws Exception {
                System.out.println("Received data");
            }
        });
bootstrap.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
bootstrap.attr(id, 123456);
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.syncUninterruptibly();

8.7 Bootstrapping DatagramChannels

之前的bootstrap示例代碼都是基于TCP-based的SocketChannel镶摘,bootstrap也可以配置為無(wú)連接協(xié)議嗽桩。

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new OioEventLoopGroup()).channel(OioDatagramChannel.class)
        .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
            @Override
            public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                // Do something with the packet
            }
        });
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Channel bound");
        } else {
            System.err.println("Bind attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
});

8.8 Shutdown

Alternatively, you can call Channel.close() explicitly on all active channels before calling EventLoopGroup.shutdownGracefully() . But in all cases, remember to shut down the EventLoopGroup itself.

EventLoopGroup.shutdownGracefully(),它的返回值是一個(gè)future,這也是一個(gè)異步操作。

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();

9 Unit testing

Netty提供了embedded transport來(lái)測(cè)試ChannelHandlers,embedded transportEmbeddedChannel (一種特殊的Channel實(shí)現(xiàn)) 的特色功能凄敢,可以簡(jiǎn)單地實(shí)現(xiàn)在pipeline中傳播事件碌冶。

我們可以寫入inbound或者outbound數(shù)據(jù)到EmbeddedChannel,然后檢查是否有東西傳輸?shù)?code>ChannelPipeline的末尾贡未。我們還可以確定消息是否被編解碼种樱,是否有ChannelHandler被觸發(fā)。

Inbound data會(huì)被ChannelInboundHandlers處理俊卤,代表著從遠(yuǎn)程主機(jī)讀取的數(shù)據(jù)。

outbound data會(huì)被ChannelOutboundHandlers處理害幅,代表將要發(fā)送到遠(yuǎn)程主機(jī)的數(shù)據(jù)消恍。

相關(guān)API:

圖9.1展示了數(shù)據(jù)在EmbededChannel的流動(dòng)情況。我們可以:

  1. 使用writeOutbound(),寫入消息到Channel,讓消息以outbound方向在pipeline中傳遞以现。后續(xù)狠怨,我們可以使用readOutbound()讀取處理過后的數(shù)據(jù),判斷結(jié)果是否與預(yù)期一致邑遏。

  2. 使用writeInbound(),寫入消息到Channel,讓消息以inbound方向在pipeline中傳遞佣赖。后續(xù),我們可以使用readInbound()讀取處理過后的數(shù)據(jù)记盒,判斷結(jié)果是否與預(yù)期一致憎蛤。

9.2 Testing ChannelHandlers with EmbeddedChannel

9.2.1 Testing inbound messages

圖9.2 展示了一個(gè)簡(jiǎn)單的ByteToMessageDecoder實(shí)現(xiàn)。如果有足夠的數(shù)據(jù)纪吮,這個(gè)Decoder會(huì)產(chǎn)生固定大小的frame俩檬。如果沒有足夠的數(shù)據(jù),沒有達(dá)到這個(gè)固定的size值碾盟,它會(huì)等待接下來(lái)的數(shù)據(jù)棚辽,繼續(xù)判斷能否接著產(chǎn)生frame。

具體代碼實(shí)現(xiàn)如下:

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException(
                    "frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
            List<Object> out) throws Exception {
        while (in.readableBytes() >= frameLength) {
            ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }
}

那么如何進(jìn)行單元測(cè)試呢冰肴,測(cè)試代碼如下:

public class FixedLengthFrameDecoderTest {
    @Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(
                new FixedLengthFrameDecoder(3));
        // write bytes
        assertTrue(channel.writeInbound(input.retain()));
        assertTrue(channel.finish());
        // read messages
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        assertNull(channel.readInbound());
        buf.release();
    }

    @Test
    public void testFramesDecoded2() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(
                new FixedLengthFrameDecoder(3));
        assertFalse(channel.writeInbound(input.readBytes(2)));
        assertTrue(channel.writeInbound(input.readBytes(7)));
        assertTrue(channel.finish());
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        assertNull(channel.readInbound());
        buf.release();
    }
}

9.2.2 Testing outbound messages

我們需要測(cè)試一個(gè)編碼器:AbsIntegerEncoder,它是Netty的MessageToMessageEncode的一個(gè)實(shí)現(xiàn)屈藐,功能是將整數(shù)取絕對(duì)值榔组。

我們的流程如下:

  1. EmbeddedChannel會(huì)將一個(gè)四字節(jié)負(fù)數(shù)按照outbound方向?qū)懭?code>Channel。

  2. 編碼器會(huì)從到來(lái)的ByteBuf讀取每個(gè)負(fù)數(shù)联逻,調(diào)用Math.abs()獲得絕對(duì)值瓷患。

  3. 編碼器將絕對(duì)值寫入到ChannelHandlerPipe

編碼器代碼實(shí)現(xiàn):

public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
            ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= 4) {
            int value = Math.abs(in.readInt());
            out.add(value);
        }
    }
}

怎么測(cè)試遣妥?請(qǐng)看下文:

public class AbsIntegerEncoderTest {
    @Test
    public void testEncoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 1; i < 10; i++) {
            buf.writeInt(i * -1);
        }
        EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
        assertTrue(channel.writeOutbound(buf));
        assertTrue(channel.finish());
        // read bytes
        for (int i = 1; i < 10; i++) {
            assertEquals(i, channel.readOutbound());
        }
        assertNull(channel.readOutbound());
    }
}

9.3 Testing exception handling

為了測(cè)試異常處理擅编,我們有如下的示例。
為防止資源耗盡箫踩,當(dāng)我們讀取到的數(shù)據(jù)多于某個(gè)數(shù)值爱态,我們會(huì)拋出一個(gè)TooLongFrameException


在圖9.4中,最大frame的大小為3字節(jié)境钟,當(dāng)一個(gè)frame的字節(jié)數(shù)大于3锦担,它會(huì)被忽略,并且會(huì)拋出TooLongFrameException慨削,其他的pipeline里的其他ChannelHandlers要么覆寫exceptionCaught()進(jìn)行捕獲處理洞渔,要么會(huì)忽略這個(gè)異常。

解碼器代碼:

public class FrameChunkDecoder extends ByteToMessageDecoder {
    private final int maxFrameSize;

    public FrameChunkDecoder(int maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
            List<Object> out) throws Exception {
        int readableBytes = in.readableBytes();
        if (readableBytes > maxFrameSize) {
            // discard the bytes
            in.clear();
            throw new TooLongFrameException();
        }
        ByteBuf buf = in.readBytes(readableBytes);
        out.add(buf);
    }
}

如何測(cè)試缚态,請(qǐng)看:

public class FrameChunkDecoderTest {
    @Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
        assertTrue(channel.writeInbound(input.readBytes(2)));
        try {
            channel.writeInbound(input.readBytes(4));
            Assert.fail();
        } catch (TooLongFrameException e) {
            // expected exception
        }
        assertTrue(channel.writeInbound(input.readBytes(3)));
        assertTrue(channel.finish());
        // Read frames
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(2), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.skipBytes(4).readSlice(3), read);
        read.release();
        buf.release();
    }
}

10.The codec framework

encoder,將outbound消息轉(zhuǎn)換成易于傳輸?shù)姆绞?大部分是字節(jié)流)磁椒。
decoder,將inbound網(wǎng)絡(luò)字節(jié)流轉(zhuǎn)回成應(yīng)用程序消息格式。

10.2 Decoders

兩種場(chǎng)景需要使用到Decoders:

  1. 將字節(jié)流解碼成消息--ByteToMessageDecoderReplayingDecoder
  2. 將一種消息類型解碼成另一種類型--MessageToMessageDecoder

10.2.1 ByteToMessageDecoder抽象類

功能: 將字節(jié)流解碼成消息或者另一種字節(jié)流玫芦。

使用示例ToIntegerDecoder

每次從ByteBuf讀取四個(gè)字節(jié)浆熔,解碼成int,添加到List里。當(dāng)沒有更多的數(shù)據(jù)添加到List,List里的內(nèi)容會(huì)傳遞到下一個(gè)ChannelInboundHandler桥帆。

public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {
            out.add(in.readInt());
        }
    }
}

編解碼框架里医增,消息處理完了,會(huì)自動(dòng)調(diào)用ReferenceCountUtil.release(message)老虫,資源會(huì)自動(dòng)釋放叶骨。

Reference counting in codecs
As we mentioned in chapters 5 and 6, reference counting requires special attention. In the case of encoders and decoders, the procedure is quite simple: once a mes- sage has been encoded or decoded, it will automatically be released by a call to ReferenceCountUtil.release(message) . If you need to keep a reference for later use you can call ReferenceCountUtil.retain(message) . This increments the reference count, preventing the message from being released.

10.2.2 ReplayingDecoder抽象類

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

ReplayingDecoder繼承于ByteToMessageDecoder,特點(diǎn)是我們不再需要調(diào)用readableBytes(),省了判斷數(shù)據(jù)是否足夠的邏輯祈匙。

注意:

  1. 不是所有的ByteBuf的操作都被支持忽刽。如果不支持會(huì)拋出UnsupportedOperationException異常。

  2. ReplayingDecoder會(huì)比ByteToMessageDecoder稍慢菊卷。

ToIntegerDecoder2:

public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readInt());
    }
}

更多的解碼工具可以在io.netty.handler.codec下找到缔恳。

  1. io.netty.handler.codec.LineBasedFrameDecoder,通過換行符(\n或者\r\n)來(lái)解析消息洁闰。

  2. io.netty.handler.codec.http.HttpObjectDecoder歉甚,解析HTTP數(shù)據(jù)。

10.2.3 MessageToMessageDecoder抽象類

消息格式互相轉(zhuǎn)換扑眉,如把一種類型的POJO轉(zhuǎn)換成另外一種纸泄。

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

API差不多

示例:IntegerToStringDecoder

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

一個(gè)更貼切詳細(xì)的例子是io.netty.handler.codec.http.HttpObjectAggregator

TooLongFrameException防止資源耗盡:

public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
    private static final int MAX_FRAME_SIZE = 1024;

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readable = in.readableBytes();
        if (readable > MAX_FRAME_SIZE) {
            in.skipBytes(readable);
            throw new TooLongFrameException("Frame too big!");
        }
        // do something
    }
}

10.3 Encoders

與解碼器類似赖钞,Encoders分為兩種:

  1. 將消息編碼成字節(jié)流。
  2. 將一種消息編碼成另一種格式的消息聘裁。

10.3.1 MessageToByteEncoder抽象類

示例ShortToByteEncoder

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        out.writeShort(msg);
    }
}

更具體的應(yīng)用實(shí)踐可以參見io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder

10.4 編解碼抽象類

既能encode,又能decode雪营,二合一。

10.4.1 ByteToMessageCodec抽象類

Any request/response protocol could be a good candidate for using the ByteToMessageCodec . For example, in an SMTP implementation, the codec would read incoming bytes and decode them to a custom message type, say SmtpRequest . On the receiving side, when a response is created, an SmtpResponse will be produced, which will be encoded back to bytes for transmission.

10.4.2 MessageToMessageCodec抽象類

public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
    @Override
    protected void encode(ChannelHandlerContext ctx,
        WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out)
        throws Exception {
        ByteBuf payload = msg.getData().duplicate().retain();

        switch (msg.getType()) {
        case BINARY:
            out.add(new BinaryWebSocketFrame(payload));

            break;

        case TEXT:
            out.add(new TextWebSocketFrame(payload));

            break;

        case CLOSE:
            out.add(new CloseWebSocketFrame(true, 0, payload));

            break;

        case CONTINUATION:
            out.add(new ContinuationWebSocketFrame(payload));

            break;

        case PONG:
            out.add(new PongWebSocketFrame(payload));

            break;

        case PING:
            out.add(new PingWebSocketFrame(payload));

            break;

        default:
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg,
        List<Object> out) throws Exception {
        ByteBuf payload = msg.getData().duplicate().retain();

        if (msg instanceof BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY,
                    payload));
        } else if (msg instanceof CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE,
                    payload));
        } else if (msg instanceof PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING,
                    payload));
        } else if (msg instanceof PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG,
                    payload));
        } else if (msg instanceof TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT,
                    payload));
        } else if (msg instanceof ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.CONTINUATION, payload));
        } else {
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    public static final class MyWebSocketFrame {
        private final FrameType type;
        private final ByteBuf data;

        public WebSocketFrame(FrameType type, ByteBuf data) {
            this.type = type;
            this.data = data;
        }

        public FrameType getType() {
            return type;
        }

        public ByteBuf getData() {
            return data;
        }
        public enum FrameType {BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION;
        }
    }
}

10.4.3 CombinedChannelDuplexHandler類

將編碼器解碼器放在一塊影響代碼的重用性衡便。CombinedChannelDuplexHandler可以解決這個(gè)問題献起。我們可以使用它而不直接使用codec抽象類。

方法簽名:

public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>

下面是一個(gè)使用范例:
解碼器例子ByteToCharDecoder
功能是一次讀取2個(gè)字節(jié)镣陕,解碼成char寫到List

public class ByteToCharDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
        while (in.readableBytes() >= 2) {
            out.add(in.readChar());
        }
    }
}

編碼器例子CharToByteEncoder

public class CharToByteEncoder extends MessageToByteEncoder<Character> {
    @Override
    public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out)
        throws Exception {
        out.writeChar(msg);
    }
}

是時(shí)候combine了:

public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    public CombinedByteCharCodec() {
        super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末谴餐,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子呆抑,更是在濱河造成了極大的恐慌岂嗓,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,743評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鹊碍,死亡現(xiàn)場(chǎng)離奇詭異厌殉,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)侈咕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門公罕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人乎完,你說我怎么就攤上這事熏兄。” “怎么了树姨?”我有些...
    開封第一講書人閱讀 157,285評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)桥状。 經(jīng)常有香客問我帽揪,道長(zhǎng),這世上最難降的妖魔是什么辅斟? 我笑而不...
    開封第一講書人閱讀 56,485評(píng)論 1 283
  • 正文 為了忘掉前任转晰,我火速辦了婚禮,結(jié)果婚禮上士飒,老公的妹妹穿的比我還像新娘查邢。我一直安慰自己,他們只是感情好酵幕,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,581評(píng)論 6 386
  • 文/花漫 我一把揭開白布扰藕。 她就那樣靜靜地躺著,像睡著了一般芳撒。 火紅的嫁衣襯著肌膚如雪邓深。 梳的紋絲不亂的頭發(fā)上未桥,一...
    開封第一講書人閱讀 49,821評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音芥备,去河邊找鬼冬耿。 笑死,一個(gè)胖子當(dāng)著我的面吹牛萌壳,可吹牛的內(nèi)容都是我干的亦镶。 我是一名探鬼主播,決...
    沈念sama閱讀 38,960評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼袱瓮,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼缤骨!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起懂讯,我...
    開封第一講書人閱讀 37,719評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤荷憋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后褐望,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體勒庄,經(jīng)...
    沈念sama閱讀 44,186評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,516評(píng)論 2 327
  • 正文 我和宋清朗相戀三年瘫里,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了实蔽。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,650評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡谨读,死狀恐怖局装,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情劳殖,我是刑警寧澤铐尚,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站哆姻,受9級(jí)特大地震影響宣增,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜矛缨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,936評(píng)論 3 313
  • 文/蒙蒙 一爹脾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧箕昭,春花似錦灵妨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至筋量,卻和暖如春烹吵,著一層夾襖步出監(jiān)牢的瞬間碉熄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工肋拔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留锈津,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,370評(píng)論 2 360
  • 正文 我出身青樓凉蜂,卻偏偏與公主長(zhǎng)得像琼梆,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子窿吩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,527評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容