Netty 中的 handler 和 Pipeline

上一節(jié)我們講了 Netty 的啟動(dòng)流程,從啟動(dòng)流程入手分析了 Reactor 模型的第一步:channel 如何綁定 Selector正驻。然后講到了 EventLoop 在啟動(dòng)的時(shí)候發(fā)揮了什么作用劲藐。整個(gè)啟動(dòng)類我們從頭到尾過了一遍锌钮,今天我們來解決上節(jié)遺留的問題:Selector 如何將請求交給對應(yīng)的 handler處理。

1. handler 的初始化

還是先從啟動(dòng)類入手:

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
  .channel(NioServerSocketChannel.class)
  .childHandler(new ServerChannelInitializer());

跟到childHandler(ChannelHandler childHandler)方法里面:

public ServerBootstrap childHandler(ChannelHandler childHandler) {
  if (childHandler == null) {
    throw new NullPointerException("childHandler");
  }
  this.childHandler = childHandler;
  return this;
}

這里做了一個(gè)賦值的操作扑浸。那么 childHandler是在哪里被使用呢烧给?用 idea 的查看引用功能可以看到:

2.png

這里有個(gè) int()方法看似比較關(guān)鍵,繼續(xù)跟進(jìn):

// 這是ServerBootStrapt對 他父類初始化 channel的實(shí)現(xiàn), 用于初始化 NioServerSocketChannel
void init(Channel channel) throws Exception {
  //ChannelOption 是在配置 Channel 的 ChannelConfig 的信息
  final Map<ChannelOption<?>, Object> options = options0();
  synchronized (options) {
    // 把 NioserverSocketChannel 和 options Map傳遞進(jìn)去, 給Channel里面的屬性賦值
    // 這些常量值全是關(guān)于和諸如TCP協(xié)議相關(guān)的信息
    channel.config().setOptions(options);
  }
      //再次一波 給Channel里面的屬性賦值  attrs0()是獲取到用戶自定義的業(yè)務(wù)邏輯屬性 --  AttributeKey
  final Map<AttributeKey<?>, Object> attrs = attrs0();
  // 這個(gè)map中維護(hù)的是 程序運(yùn)行時(shí)的 動(dòng)態(tài)的 業(yè)務(wù)數(shù)據(jù) , 可以實(shí)現(xiàn)讓業(yè)務(wù)數(shù)據(jù)隨著
  //netty的運(yùn)行原來存進(jìn)去的數(shù)據(jù)還能取出來
  synchronized (attrs) {
    for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
      @SuppressWarnings("unchecked")
      AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
      channel.attr(key).set(e.getValue());
    }
  }
    
    // ChannelPipeline 本身 就是一個(gè)重要的組件, 他里面是一個(gè)一個(gè)的處理器, 
  //說他是高級過濾器,交互的數(shù)據(jù) 會一層一層經(jīng)過它
    // 下面直接就調(diào)用了 p , 說明,在channel調(diào)用pipeline方法之前, pipeline已經(jīng)被創(chuàng)建出來了
    // 在創(chuàng)建NioServerSocketChannel這個(gè)通道對象時(shí),在他的頂級抽象父類(AbstractChannel)中創(chuàng)建了一個(gè)默認(rèn)的pipeline對象
    // ChannelHandlerContext 是 ChannelHandler和Pipeline 交互的橋梁
  ChannelPipeline p = channel.pipeline();
    // workerGroup 處理IO線程
  final EventLoopGroup currentChildGroup = childGroup;
  //添加我們自定義的 Initializer
  final ChannelHandler currentChildHandler = childHandler;
  final Entry<ChannelOption<?>, Object>[] currentChildOptions;
  final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
  synchronized (childOptions) {
    currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
  }
  synchronized (childAttrs) {
    currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
  }
   //默認(rèn) 往NioServerSocketChannel的管道里面添加了一個(gè) ChannelInitializer
  p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(Channel ch) throws Exception {
      final ChannelPipeline pipeline = ch.pipeline();
      //這個(gè)handler 針對bossgroup的Channel 
      //給他添加上我們在server類中添加的handler()里面添加處理器
      ChannelHandler handler = config.handler();
      if (handler != null) {
        //將ServerSocketChannel的Handler添加到pipeline
        pipeline.addLast(handler);
      }

      ch.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
          //將ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline
          pipeline.addLast(new ServerBootstrapAcceptor(
            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
      });
    }
  });
}

init()方法的傳參是 channel喝噪,感覺很好奇是被哪里調(diào)用的础嫡,往上找它的調(diào)用鏈:

-AbstractBootstrap.doBind()/Bootstrap.doResolveAndConnect()->
  -AbstractBootstrap.initAndRegister()->
    -AbstractBootstrap.init(Channel channel)

從調(diào)用鏈上看是不是很熟悉,是我們上一節(jié)分析過了的初始化里面的邏輯酝惧。上一節(jié) 在分析Bootstrap#initAndRegister方法的時(shí)候驰吓,在這里初始化了 channel 并 綁定到 Reactor 線程上。但是唯獨(dú)漏掉了 init() 方法:

3.png

因?yàn)檫@里的東西太多系奉,還是單獨(dú)抽出來一節(jié)來聊才通透檬贰。

可以看到 init()傳入的是某個(gè) ServerSocketChannel創(chuàng)建連接時(shí)候創(chuàng)建的 channel,回到init()方法內(nèi)部:

ChannelPipeline p = channel.pipeline();

從 channel 中獲取ChannelPipeline 對象缺亮,即一個(gè) channel 是跟一個(gè)特定的 ChannelPipeline 綁定的翁涤。這里又引申出另一個(gè)關(guān)鍵點(diǎn):ChannelPipeline桥言,我們喝口茶再討論。

2. ChannelPipeline

ChannelPipeline 是一個(gè)接口葵礼,實(shí)現(xiàn)類有兩個(gè):

DefaultChannelPipeline
EmbeddedChannelPipeline  

其中 EmbeddedChannelPipeline 又繼承了 DefaultChannelPipeline号阿,所以我們來看一下 DefaultChannelPipeline的實(shí)現(xiàn),首先看默認(rèn)的構(gòu)造方法:

protected DefaultChannelPipeline(Channel channel) {
  this.channel = ObjectUtil.checkNotNull(channel, "channel");
  succeededFuture = new SucceededChannelFuture(channel, null);
  voidPromise =  new VoidChannelPromise(channel, true);

  tail = new TailContext(this);
  head = new HeadContext(this);

  head.next = tail;
  tail.prev = head;
}

從構(gòu)造方法上看鸳粉,這很顯然是在構(gòu)造一個(gè)鏈表的結(jié)構(gòu)扔涧。不難看出 ChannelPipeline 內(nèi)部應(yīng)該是保存了一個(gè)鏈?zhǔn)降拇鎯壿嫛D擎湵碇斜4娴挠质鞘裁茨亟焯福渴呛芏嗟腃hannelPipeline 對象嗎枯夜?我們看看 tail 是什么類型:

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

很顯然 ChannelPipeline 里面又包裝了一層,即我們插入到 ChannelPipeline 中的 handler 被包裝為 ChannelHandlerContext 對象艰山。

回想到我們自己的 ServerChannelInitializer湖雹,是不是通過 pipeline.addLast()方法來添加各種 handler 的處理邏輯呢,ChannelPipeline 的類聲明上有很大一段 doc曙搬,不看白不看:


/**
 * A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a
 * {@link Channel}.  {@link ChannelPipeline} implements an advanced form of the
 * <a >Intercepting Filter</a> pattern
 * to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline
 * interact with each other.
 *
 * <h3>Creation of a pipeline</h3>
 * 每一個(gè)channel 都有自己的pipeline摔吏,就是在channel 創(chuàng)建的時(shí)候自動(dòng)創(chuàng)建一個(gè)pipeline
 * Each channel has its own pipeline and it is created automatically when a new channel is created.
 *ChannelPipeline是一個(gè)處理或者攔截Channel的出棧事件或者入棧操作的ChannelHandler列表,
 *ChannelPipeline實(shí)現(xiàn)了一種高效的攔截過濾器模式的形式來讓用戶完全控制一個(gè)事件怎樣處理
 *和pipeline的ChannelHandler怎樣和其他ChannelHandler交互纵装。
 *
 * <h3>How an event flows in a pipeline</h3>
 * 下面的圖描述了一個(gè)I/O事件一般是怎樣在ChannelPipeline里的ChannelHandler處理的征讲,
 * 一個(gè)I/O事件要么被ChannelInboundHandler處理,要么里邊的事件傳播方法轉(zhuǎn)發(fā)給最近的一個(gè)處理器橡娄,比如
 * ChannelHandlerContext#fireChannelRead(Object)和ChannelHandlerContext#write(Object)诗箍。
 * The following diagram describes how I/O events are processed by {@link ChannelHandler}s in a {@link ChannelPipeline}
 * typically. An I/O event is handled by either a {@link ChannelInboundHandler} or a {@link ChannelOutboundHandler}
 * and be forwarded to its closest handler by calling the event propagation methods defined in
 * {@link ChannelHandlerContext}, such as {@link ChannelHandlerContext#fireChannelRead(Object)} and
 * {@link ChannelHandlerContext#write(Object)}.
 *
 * <pre>
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
 * </pre>
 * 通過上圖可以看到入棧的和出棧的處理器互不干擾。
 *在左圖瀑踢,一個(gè)入棧事件是從下到上的順序被綁定的處理器處理的,一個(gè)入棧處理器通常處理從I/O線程生成的數(shù)據(jù)才避,
 *一個(gè)入棧事件是從下到上的順序被綁定的處理器處理的橱夭,一個(gè)入棧處理器通常處理從I/O線程生成的數(shù)據(jù),
 * An inbound event is handled by the inbound handlers in the bottom-up direction as shown on the left side of the
 * diagram.  An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the
 * diagram.  The inbound data is often read from a remote peer via the actual input operation such as
 * {@link SocketChannel#read(ByteBuffer)}.  If an inbound event goes beyond the top inbound handler, it is discarded
 * silently, or logged if it needs your attention.
 * <p>
 *右圖桑逝,一個(gè)出棧事件會被出棧處理器處理棘劣,一個(gè)出棧處理器生成或者傳輸出棧數(shù)據(jù),比如寫請求楞遏,
 *如果一個(gè)出棧事件超出最底層的處理器茬暇,那么他將會被I/O,線程處理,與其關(guān)聯(lián)的SocketChannel#write(ByteBuffer)操作寡喝。
 * An outbound event is handled by the outbound handler in the top-down direction as shown on the right side of the
 * diagram.  An outbound handler usually generates or transforms the outbound traffic such as write requests.
 * If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the
 * {@link Channel}. The I/O thread often performs the actual output operation such as
 * {@link SocketChannel#write(ByteBuffer)}.
 * <p>
 * For example, let us assume that we created the following pipeline:
 * <pre>
 * {@link ChannelPipeline} p = ...;
 * p.addLast("1", new InboundHandlerA());//入棧處理器
 * p.addLast("2", new InboundHandlerB());//入棧處理器
 * p.addLast("3", new OutboundHandlerA());//出棧處理器
 * p.addLast("4", new OutboundHandlerB());//出棧處理器
 * p.addLast("5", new InboundOutboundHandlerX());//既是入棧又是出棧處理器
 * </pre>
 *在前邊提到的例子中糙俗,以Inbound開頭的都是入棧處理器,以O(shè)utbound開頭的都是出棧處理器预鬓。
 * In the example above, the class whose name starts with {@code Inbound} means it is an inbound handler.
 * The class whose name starts with {@code Outbound} means it is a outbound handler.
 * <p>
 * In the given example configuration, the handler evaluation order is 1, 2, 3, 4, 5 when an event goes inbound.
 * When an event goes outbound, the order is 5, 4, 3, 2, 1.  On top of this principle, {@link ChannelPipeline} skips
 * the evaluation of certain handlers to shorten the stack depth:
 * <ul>
 * <li>3 and 4 don't implement {@link ChannelInboundHandler}, and therefore the actual evaluation order of an inbound
 *     event will be: 1, 2, and 5.</li>
 * <li>1 and 2 don't implement {@link ChannelOutboundHandler}, and therefore the actual evaluation order of a
 *     outbound event will be: 5, 4, and 3.</li>
 * <li>If 5 implements both {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}, the evaluation order of
 *     an inbound and a outbound event could be 125 and 543 respectively.</li>
 * </ul>
 *
 * <h3>Forwarding an event to the next handler</h3>
 *
 * As you might noticed in the diagram shows, a handler has to invoke the event propagation methods in
 * {@link ChannelHandlerContext} to forward an event to its next handler.  Those methods include:
 *一個(gè)處理器調(diào)用ChannelHandlerContext的事件傳播方法轉(zhuǎn)發(fā)給下一個(gè)處理器巧骚,這些方法包括:
 * <ul>
 * <li>Inbound event propagation methods:
 *     <ul>
 * 入棧事件傳播方法
 *     <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelActive()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li>
 *     <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li>
 *     <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelInactive()}</li>
 *     <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li>
 *     </ul>
 * </li>
 * <li>Outbound event propagation methods:
 *     <ul>
 * 出棧事件傳播方法
 *     <li>{@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#write(Object, ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#flush()}</li>
 *     <li>{@link ChannelHandlerContext#read()}</li>
 *     <li>{@link ChannelHandlerContext#disconnect(ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#close(ChannelPromise)}</li>
 *     <li>{@link ChannelHandlerContext#deregister(ChannelPromise)}</li>
 *     </ul>
 * </li>
 * </ul>
 *
 * and the following example shows how the event propagation is usually done:
 *
 * <pre>
 * public class MyInboundHandler extends {@link ChannelInboundHandlerAdapter} {
 *     {@code @Override}
 *     public void channelActive({@link ChannelHandlerContext} ctx) {
 *         System.out.println("Connected!");
 *         ctx.fireChannelActive();
 *     }
 * }
 *
 * public clas MyOutboundHandler extends {@link ChannelOutboundHandlerAdapter} {
 *     {@code @Override}
 *     public void close({@link ChannelHandlerContext} ctx, {@link ChannelPromise} promise) {
 *         System.out.println("Closing ..");
 *         ctx.close(promise);
 *     }
 * }
 * </pre>
 *
 * <h3>Building a pipeline</h3>
 * <p>
 * A user is supposed to have one or more {@link ChannelHandler}s in a pipeline to receive I/O events (e.g. read) and
 * to request I/O operations (e.g. write and close).  For example, a typical server will have the following handlers
 * in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the
 * protocol and business logic:
 *
 * <ol>
 * <li>Protocol Decoder - translates binary data (e.g. {@link ByteBuf}) into a Java object.</li>
 * <li>Protocol Encoder - translates a Java object into binary data.</li>
 * <li>Business Logic Handler - performs the actual business logic (e.g. database access).</li>
 * </ol>
 *
 * and it could be represented as shown in the following example:
 *
 * <pre>
 * static final {@link EventExecutorGroup} group = new {@link DefaultEventExecutorGroup}(16);
 * ...
 *
 * {@link ChannelPipeline} pipeline = ch.pipeline();
 *
 * pipeline.addLast("decoder", new MyProtocolDecoder());
 * pipeline.addLast("encoder", new MyProtocolEncoder());
 *
 * // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
 * // in a different thread than an I/O thread so that the I/O thread is not blocked by
 * // a time-consuming task.
 * //如果你的業(yè)務(wù)邏輯是完全同步的或者完成的非常快,你不需要添加提供特別的線程池去異步執(zhí)行劈彪,反之竣蹦,
 * //你就需要一個(gè)異步的處理邏輯以保證不要阻塞IO的速度
 * // If your business logic is fully asynchronous or finished very quickly, you don't
 * // need to specify a group.
 * pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
 * </pre>
 *
 * <h3>Thread safety</h3>
 * <p>
 * A {@link ChannelHandler} can be added or removed at any time because a {@link ChannelPipeline} is thread safe.
 * For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it
 * after the exchange.
 */

上面的注釋解釋了 ChannelPipeline 做了什么事情:這是一個(gè) handler 的 list,handler 用于處理或攔截入站事件和出站事件沧奴,pipeline 實(shí)現(xiàn)了過濾器的高級形式痘括,以便用戶完全控制事件如何處理以及 handler 在 pipeline 中如何交互。

再把視線收回到 init()方法滔吠,在 init()中調(diào)用 p.addLast()方法纲菌,將 ChannelInitializer 插入到鏈表的末端。

接著看addLast()方法:

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
  final AbstractChannelHandlerContext newCtx;
  synchronized (this) {
    //檢查 Channelhandler 的名字是否重復(fù), 如果不重復(fù), 則調(diào)用 newContext() 方法為這個(gè)Handler 創(chuàng)建一個(gè)對應(yīng)的 DefaultChannelHandlerContext 實(shí)例
    // 檢查handler是否是@Sharable屠凶,是否已添加
    checkMultiplicity(handler);
        //為了添加一個(gè) handler 到pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext
    newCtx = newContext(group, filterName(name, handler), handler);

    addLast0(newCtx);

    // 如果 registered 為 false,則表示這個(gè)channel還未注冊到EventLoop上.
    // 在這種情況下驰后,我們添加一個(gè)Task到PendingHandlerCallback中,
    // 等到這個(gè)channel注冊成功之后矗愧,將會調(diào)用立即調(diào)用 ChannelHandler.handlerAdded(...) 方法灶芝,已達(dá)到channel添加的目的

    if (!registered) {
      newCtx.setAddPending();
      callHandlerCallbackLater(newCtx, true);
      return this;
    }

    EventExecutor executor = newCtx.executor();
    if (!executor.inEventLoop()) {
      newCtx.setAddPending();
      executor.execute(new Runnable() {
        @Override
        public void run() {
          callHandlerAdded0(newCtx);
        }
      });
      return this;
    }
  }
  callHandlerAdded0(newCtx);
  return this;
}

// 檢查是否重復(fù)
private static void checkMultiplicity(ChannelHandler handler) {
  // handler是否為ChannelHandlerAdapter類型,不是則不做處理
  if (handler instanceof ChannelHandlerAdapter) {
    ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
    // 判斷handler是否添加了Sharable注解 && 是否添加過了
    if (!h.isSharable() && h.added) {
      throw new ChannelPipelineException(
        h.getClass().getName() +
        " is not a @Sharable handler, so can't be added or removed multiple times.");
    }
    h.added = true;
  }
}

// 創(chuàng)建新的節(jié)點(diǎn)
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
  // 調(diào)用DefaultChannelHandlerContext的構(gòu)造函數(shù)
  return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

addLast() 方法中, 首先檢查 Channelhandler 的名字是否重復(fù)唉韭,如果不重復(fù)則調(diào)用 newContext()方法為這個(gè) Handler 創(chuàng)建一個(gè)對應(yīng)的 DefaultChannelHandlerContext 實(shí)例夜涕,并與之關(guān)聯(lián)起來( Context 中有一個(gè) handler 屬性保存著對應(yīng)的 handler 實(shí)例)。

checkMultiplicity()方法中使用一個(gè)成員變量 added 標(biāo)識一個(gè) Channel 是否已經(jīng)添加過属愤,如果當(dāng)前要添加的Handler 是非共享并且已經(jīng)添加過那就拋出異常女器,否則標(biāo)識該 Handler 已經(jīng)添加。由此可見一個(gè) Handler 如果是 Sharable 的就可以無限次被添加到 Pipeline 中住诸,我們客戶端代碼如果要讓一個(gè) Handler 被共用驾胆,只需要加一個(gè) @Sharable 標(biāo)注即。

為了添加一個(gè) handler 到 pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext贱呐。先看一下創(chuàng)建 ChannelHandlerContext 的這一句:

newCtx = newContext(group, filterName(name, handler), handler);

filterName()這個(gè)方法其實(shí)作用還是挺重要丧诺,即你再 addLast()方法中給當(dāng)前 handler 取的名字,在這里做檢查:

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        // 1.如果傳入的name為空 則生成
        // Netty生成的name默認(rèn)為=> 簡單類名#0
        // 如果簡單類名#0已存在則將基數(shù)+1 生成name為簡單類名#1 以此遞增
        return generateName(handler);
    }
    // 2.檢查是否有重名 檢查通過則返回
    checkDuplicateName(name);
    return name;
}

這個(gè)方法用于給handler創(chuàng)建一個(gè)唯一性的名字奄薇。

繼續(xù)跟進(jìn)newContext()驳阎,在上面的代碼中我們看到新實(shí)例化了一個(gè) newCtx 對象,并將 handler 作為參數(shù)傳遞到構(gòu)造方法中。 那么 DefaultChannelHandlerContext 會做什么呢馁蒂?繼續(xù)看看:

private final ChannelHandler handler;

DefaultChannelHandlerContext(
  DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
  super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
  if (handler == null) {
    throw new NullPointerException("handler");
  }
  this.handler = handler;
}

private static boolean isInbound(ChannelHandler handler) {
  return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
  return handler instanceof ChannelOutboundHandler;
}

上面是 DefaultChannelHandlerContext 的構(gòu)造函數(shù)呵晚,我們看到在調(diào)用父類構(gòu)造器中傳入兩個(gè)方法:

isInbound(handler)
isOutbound(handler)

從方法名稱上大概可以知道:一個(gè)是判斷當(dāng)前 handler 是否是進(jìn)入事件的處理器,另一個(gè)是判斷當(dāng)前 handler 是否是返回事件的處理器沫屡。這兩個(gè) boolean 變量會傳遞到父類 AbstractChannelHandlerContext 中, 并初始化父類的這兩個(gè)字段:inbound 與 outbound饵隙。

  • inbound 為真時(shí), 表示對應(yīng)的 ChannelHandler 實(shí)現(xiàn)了 ChannelInboundHandler 方法;
  • outbound 為真時(shí), 表示對應(yīng)的 ChannelHandler 實(shí)現(xiàn)了 ChannelOutboundHandler 方法沮脖。

插播要點(diǎn):

Netty是如何判斷ChannelHandler類型的癞季?

在添加ChannelHandler并創(chuàng)建 ChannelHandlerContext 的時(shí)候劫瞳,通過 instanceof 判斷 handler 是否是 ChannelInboundHandler 和 ChannelOutboundHanler,并將結(jié)果保存到 AbstractChannelHandlerContext 的 inbound 和 outbound 兩個(gè)boolean 變量中绷柒。

在添加ChannelHandler并創(chuàng)建ChannelHandlerContext的時(shí)候志于,通過instanceof判斷handler是否是ChannelInboundHandler和ChannelOutboundHanler,并將結(jié)果保存到AbstractChannelHandlerContext的inbound和outbound兩個(gè)boolean變量中废睦。

而我們在自定義的 ServerChannelInitializer 類中是繼承了 ChannelInitializer

public class ServerChannelInitializer  extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 字符串解碼 和 編碼
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 自己的邏輯Handler
        pipeline.addLast("handler", new HwServerHandler());
        }

}

Netty 中通過ChannelPipeline來保證 ChannelHandler 之間的處理順序伺绽。每一個(gè) Channel 對象創(chuàng)建的時(shí)候,都會自動(dòng)創(chuàng)建一個(gè)關(guān)聯(lián)的 ChannelPipeline 對象嗜湃,我們可以通過 io.netty.channel.Channel對象的pipeline()方法獲取這個(gè)對象實(shí)例奈应。

1.png

ChannelInitializer 又是實(shí)現(xiàn)ChannelInboundHandler接口,那這就意味著

DefaultChannelHandlerContext 的 inbound = true购披,outbound = false杖挣。

即在初始化 handler 的時(shí)候默認(rèn)所有的 handler 都是入棧處理器。

當(dāng)創(chuàng)建好 Context 之后刚陡,就將這個(gè)Context 插入到 Pipeline 的雙向鏈表中:

private void addLast0(AbstractChannelHandlerContext newCtx) {
  AbstractChannelHandlerContext prev = tail.prev;
  newCtx.prev = prev;
  newCtx.next = tail;
  prev.next = newCtx;
  tail.prev = newCtx;
}

以上就是新增 handler 插入pipeline 的過程惩妇,下面接著看 Pipeline 節(jié)點(diǎn)的刪除功能。

netty 有個(gè)最大的特性之一就是 Handler 可插拔筐乳,做到動(dòng)態(tài)編織 pipeline歌殃,比如在首次建立連接的時(shí)候,需要通過進(jìn)行權(quán)限認(rèn)證蝙云,在認(rèn)證通過之后氓皱,就可以將此 context 移除,下次 pipeline 在傳播事件的時(shí)候就就不會調(diào)用到權(quán)限認(rèn)證處理器勃刨。

下面是權(quán)限認(rèn)證 Handler 最簡單的實(shí)現(xiàn)波材,第一個(gè)數(shù)據(jù)包傳來的是認(rèn)證信息,如果校驗(yàn)通過身隐,就刪除此 Handler廷区,否則,直接關(guān)閉連接:

// 鑒權(quán)Handler
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
    ...
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("AuthHandler has been removed ! ");
    }
}

我們來看看 DefaultChannelPipeline 中的 remove 方法:

public class DefaultChannelPipeline implements ChannelPipeline {
    
    ...
    
    // 從Pipeline中刪除ChannelHandler
    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }
    
    ...
    
    // 獲取 ChannelHandler 抡医,獲取不到就拋出異常
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
    
    ...
    
    // 刪除
    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        // ctx不能為heand與tail
        assert ctx != head && ctx != tail;

        synchronized (this) {
            // 從pipeline中刪除ChannelHandlerContext節(jié)點(diǎn)
            remove0(ctx);
            
            // 如果為false躲因,則表明channel還沒有注冊到eventloop上
            // 在刪除這種場景下早敬,我們先添加一個(gè)Task忌傻,一旦channel注冊成功就會調(diào)用這個(gè)Task,這個(gè)Task就會立即調(diào)用ChannelHandler.handlerRemoved(...)方法搞监,來從pipeline中刪除context水孩。
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 回調(diào) handlerRemoved 方法
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        // 回調(diào) handlerRemoved 方法
        callHandlerRemoved0(ctx);
        return ctx;
    }
    
    ...
 
    // 刪除節(jié)點(diǎn) ChannelHandlerContext
    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
    
    ...
        
    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            try {
                // 回調(diào) handlerRemoved 方法
                // 也就是我們前面例子 AuthHandler 中的 handlerRemoved() 方法
                ctx.handler().handlerRemoved(ctx);
            } finally {
                // 設(shè)置為ctx 狀態(tài)為 REMOVE_COMPLETE 
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }
    
    ...
    
}    

好了, 刪除的邏輯就分析到這里了琐驴。通過以上的分析俘种,我們先總結(jié)一下:能發(fā)現(xiàn)在 handler 初始化的時(shí)候有三個(gè)關(guān)鍵的對象:

  1. ChannelHandler
  2. ChannelPipeline
  3. ChannelHandlerContext

他們之間的關(guān)系是:ChannelHandler 通過 ChannelPipeline 來封裝一個(gè)雙向鏈表維持各個(gè) handler 關(guān)系秤标。在 ChannelPipeline 中 不能包裝 ChannelPipeline 對象,原因是處理的邏輯都是有上下文關(guān)聯(lián)的宙刘,所以封裝了 ChannelHandlerContext 對象來包裝 handler 苍姜,維護(hù)處理邏輯以及他們之間的上下文關(guān)系。

另外悬包,ChannelHandler 又分為兩個(gè)大類的事件:

  1. ChannelInboundHandler:處理輸入數(shù)據(jù)和所有類型的狀態(tài)變化
  2. ChannelOutboundHandler:處理輸出數(shù)據(jù)衙猪,可以攔截所有操作

除了分開的 輸入 和 輸出 事件以外, Netty 還提供了一個(gè) 支持同時(shí)處理輸入和輸出事件的全能處理器:ChannelDuplexHandler布近,ChannelDuplexHandler 則同時(shí)實(shí)現(xiàn)了 ChannelInboundHandler 和 ChannelOutboundHandler 接口垫释。如果一個(gè)所需的 ChannelHandler 既要處理入站事件又要處理出站事件,推薦繼承此類撑瞧。

從 ChannelPipeline 類的 doc 文檔中給的注釋看:

 * <pre>
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+

inbound 事件和 outbound 事件的流向是不一樣的棵譬,inbound 的傳遞方式是通過調(diào)用相應(yīng)的 ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是通過調(diào)用 ChannelHandlerContext.OUT_EVT() 方法预伺。

inbound 事件

ChannelInboundHandler 中有如下方法:

5.png

ChannelInboundHandler 中的方法具體說明見下表:

方法名 方法說明
channelInactive 激活事件订咸,綁定端口成功后調(diào)用pipeline.fireChannelActive()
channelInactive 非激活事件,連接關(guān)閉后調(diào)用pipeline.fireChannelInactive()
channelRead 讀事件扭屁,channel 有數(shù)據(jù)時(shí)調(diào)用pipeline.fireChannelRead()
channelReadComplete 讀完事件算谈,channel 讀完之后調(diào)用pipeline.fireChannelReadComplete()
channelRegistered 注冊事件,channel 注冊到 EventLoop 上后調(diào)用料滥,例如服務(wù)崗啟動(dòng)時(shí)pipeline.fireChannelRegistered()
channelUnregistered 注銷事件然眼,channel 從 EventLoop 上注銷后調(diào)用,例如關(guān)閉連接成功后pipeline.fireChannelUnregistered()
channelWritabilityChanged 可寫狀態(tài)變更事件葵腹,當(dāng)一個(gè) Channel 的可寫的狀態(tài)發(fā)生改變的時(shí)候執(zhí)行高每,可以保證寫的操作不要太快,防止 OOM践宴,pipeline.fireChannelWritabilityChanged()
exceptionCaught 異常事件 說明:我們可以看出鲸匿,Inbound 事件都是由 I/O 線程觸發(fā),用戶實(shí)現(xiàn)部分關(guān)注的事件被動(dòng)調(diào)用
userEventTriggered 用戶事件觸發(fā)阻肩,例如心跳檢測ctx.fireUserEventTriggered(evt)

我們可以看出带欢,Inbound 事件都是由I/O線程觸發(fā),Netty 為了更好的處理 channel 中的數(shù)據(jù),給 JDK 原生的channel 添加了 pipeline 組件烤惊,Netty 會把原生 JDK 的 channel 中的數(shù)據(jù)導(dǎo)向這個(gè) pipeline乔煞,從 pipeline 中的 header 開始往下傳播,用戶對這個(gè)過程擁有百分百的控制權(quán)柒室,可以把數(shù)據(jù)拿出來處理也可以往下傳播渡贾。一直傳播到tail節(jié)點(diǎn),tail節(jié)點(diǎn)會進(jìn)行回收雄右。如果在傳播的過程中最終沒到尾節(jié)點(diǎn)自己也沒回收就會面臨內(nèi)存泄露的問題空骚。

所以對于 inbound 事件纺讲,操作者是有 100% 的主動(dòng)權(quán)來控制應(yīng)該做什么,在什么時(shí)候做什么囤屹。下面以channelRead() 事件的傳播為例熬甚,來分析 inbound 事件的傳播。

我們在客戶端新增3個(gè) handler 用于處理事件:

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA: " + msg);
        ctx.fireChannelRead(msg);
    }
}
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerB: " + msg);
        ctx.fireChannelRead(msg);
    }
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.channel().pipeline().fireChannelRead("hello world");  // 1
              // ctx.fireChannelRead("hello world"); // 2
    }
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerC: " + msg);
        ctx.fireChannelRead(msg);
    }
}

此刻客戶端處理事件的順序?yàn)椋?/p>

7.jpg

在新連接接入時(shí)肋坚,會先回調(diào) channelActive()方法则涯,此時(shí) InBoundHandlerB 的 channelActive() 方法得到執(zhí)行,觸發(fā)客戶端 pipeline.fireChannelRead()方法冲簿,將 channlRead 事件傳播至 pipeline粟判。在實(shí)際工作中一般是由 NioEventLoop 輪詢到讀IO事件,并觸發(fā)NioByteUnsafe.read()操作峦剔。

這里為了分析方便档礁,使用 InBoundHandlerB 的 channelActive() 方法模擬觸發(fā)客戶端 channel 讀取到數(shù)據(jù)并傳播至 pipeline 的邏輯,并分為兩種情況分析:

  • ctx.channel().pipeline().fireChannelRead("hello world");: 調(diào)用 pipeline#fireChannelRead()方法傳播事件吝沫;
  • ctx.fireChannelRead("hello world");: 調(diào)用 ChannelHandlerContext#fireChannelRead() 方法傳播事件呻澜。

分析兩種觸發(fā)方式的區(qū)別。

  1. ctx.channel().pipeline().fireChannelRead("hello world");: 調(diào)用 pipeline#fireChannelRead()方法傳播事件惨险。
// DefaultChannelPipeline
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
// HeadContext
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

DefaultChannelPipeline.fireChannelRead()方法首先調(diào)用到 HeadContext.channelRead() 方法:

// HeadContext
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

HeadContext.channelRead()方法將事件往后傳播:

// HeadContext
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}
private AbstractChannelHandlerContext findContextInbound() {
  // 遍歷鏈表
    AbstractChannelHandlerContext ctx = this; 
    do {
      // 往后查找inbound handler
        ctx = ctx.next; 
    } while (!ctx.inbound);
    return ctx;
}

此時(shí)找到 InBoundHandlerA羹幸,并調(diào)用invokeChannelRead()方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

就這樣,InBoundHandlerA 的 channelRead()方法就會回調(diào)到辫愉。類似 InBoundHandlerC栅受、InBoundHandlerB的 channelRead()方法也會回調(diào)到。最終 channelRead()事件到達(dá) TailContext:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

根據(jù)上面的分析可知恭朗,通過 pipleline 觸發(fā) inbound 事件傳播時(shí)屏镊,從 HeadContext 開始傳播。對于 inbound 事件痰腮,會按照 ChannelInboundHandler 添加的順序處理該事件而芥,HeadContext 首先處理該事件,然后依次傳遞到 pipeline 中的 ChannelInboundHandler 中膀值。

  1. ctx.fireChannelRead("hello world");: 調(diào)用 ChannelHandlerContext#fireChannelRead()方法傳播事件棍丐。
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

通過調(diào)用 ChannelHandlerContext#fireChannelRead()方法傳播 channelRead 事件時(shí),直接查找到當(dāng)前節(jié)點(diǎn)的下一個(gè) inbound節(jié)點(diǎn)沧踏,將事件傳播至該節(jié)點(diǎn)歌逢,不會從 HeadContext 開始傳遞。

以上就是 inbound 事件的大概傳播過程悦冀,我們要注意一點(diǎn):inbound 事件是按照 addLast()的添加順序依次執(zhí)行的趋翻。下面接著看 outbound 事件睛琳。

outbound事件

ChannelOutboundHandler 中有如下方法:

4.png

ChannelOutboundHandler 中的事件看起來見名知意:

方法名 方法說明
bind 綁定端口
close 關(guān)閉 channel
connect 用于客戶端盒蟆,連接一個(gè)遠(yuǎn)程機(jī)器
disconnect 關(guān)閉遠(yuǎn)程連接
deregister 執(zhí)行斷開連接 disconnect 操作后調(diào)用踏烙,將 channelEventLoop 中注銷
flush 將通道排隊(duì)的數(shù)據(jù)刷新到遠(yuǎn)程機(jī)器上
read 用于新接入連接時(shí),注冊成功多路復(fù)用器上后历等,修改監(jiān)聽為 OP_READ 操作位
write 向通道寫數(shù)據(jù)

從 ChannelOutboundHandler 接口的定義可以看出讨惩,outbound 事件包括端口綁定 bind、連接 connect寒屯、斷開連接 disconnect荐捻、關(guān)閉連接 close、取消 channel 在 EventLoop 的注冊寡夹、讀寫數(shù)據(jù)处面、刷新數(shù)據(jù)等。這些操作一般都是由用戶主動(dòng)觸發(fā)的菩掏,這與 inbound 事件(如 channelRead)被動(dòng)觸發(fā)的情況不同魂角。

下面以write事件的傳播為例,來分析outbound事件的傳播細(xì)節(jié)智绸。在服務(wù)端新增3個(gè)事件處理器:

public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerA: " + msg);
        ctx.write(msg, promise);
    }
}
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerB: " + msg);
        ctx.write(msg, promise);
    }
    public void handlerAdded(final ChannelHandlerContext ctx) {
        // 定時(shí)任務(wù)野揪。模擬用戶寫操作
        ctx.executor().schedule(() -> { 
            ctx.channel().write("hello world"); // 1
                // ctx.write("hello world"); // 2
        }, 3, TimeUnit.SECONDS);
    }
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerC: " + msg);
        ctx.write(msg, promise);
    }
}

此時(shí)客戶端 channel pipeline 的結(jié)構(gòu)如下圖:

8.jpg

還記得我們上面看 ChannelPipeline 的 doc 分析, 入站事件是從前到后的順序執(zhí)行瞧栗,出站事件是從后往前執(zhí)行的嗎斯稳?按照這邏輯,可以看到在新連接接入時(shí)迹恐,會先回調(diào) OutBoundHandlerB#handlerAdded()方法挣惰,該方法會調(diào)度一個(gè)定時(shí)任務(wù),模擬用戶觸發(fā)的寫操作殴边,將 write 事件傳播至 pipeline通熄。

這里分兩種情況進(jìn)行分析,介紹兩種觸發(fā)方式的區(qū)別:

  • ctx.channel().write("hello world");: 調(diào)用 channel(也即pipeline)#write()方法傳播事件找都;
  • ctx.write("hello world");: 調(diào)用 ChannelHandlerContext#write()方法傳播事件唇辨。
  1. ctx.channel().write("hello world");: 調(diào)用 channel(也即pipeline)#write()方法傳播事件。
// DefaultChannelPipeline
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}
// TailContext
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
// TailContext
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    try {
        if (!validatePromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    // 這里
    write(msg, false, promise); 

    return promise;
}
// TailContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
    // 找到下一個(gè)outbound handler
    AbstractChannelHandlerContext next = findContextOutbound(); 
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) { 
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise); 
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

執(zhí)行 ctx.channel().write("hello world");時(shí)能耻,會調(diào)用到 TailContxt.write()方法赏枚。TailContxt.write()方法中首先找出下一個(gè) outbound handler:

// TailContext
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        // 反向遍歷
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

findContextOutbound()方法通過鏈表反向遍歷的方式查找下一個(gè) outbound handler,這里是找到了OutBoundHandlerB晓猛,并調(diào)用 OutBoundHandlerB.invokeWrite()方法饿幅。

// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}
// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
// OutBoundHandlerB
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    System.out.println("OutBoundHandlerB: " + msg);
    ctx.write(msg, promise);
}

在調(diào)用完 OutBoundHandlerB.write() 方法后,通過ctx.write(msg, promise);繼續(xù)傳播事件:

// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    try {
        if (!validatePromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    write(msg, false, promise);

    return promise;
}
// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

跟上面類似的邏輯戒职,調(diào)用ctx.write(msg, promise);時(shí)直接查找下一個(gè) outbound handler栗恩,這里是OutBoundHandlerC。接下來是通過next.invokeWrite(m, promise);調(diào)用 OutBoundHandlerC.write()方法洪燥,與上面相同磕秤。就這樣乳乌,write 事件將傳播至 HeadContext。

// HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

HeadContext 將調(diào)用 NioByteUnsafe.write()方法市咆,最終處理這個(gè)寫出的數(shù)據(jù):

public final void write(Object msg, ChannelPromise promise) {

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        // mas轉(zhuǎn)換汉操,比如堆內(nèi)存轉(zhuǎn)為直接內(nèi)存
        msg = filterOutboundMessage(msg); 
        // 計(jì)算消息的大小
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
        // 消息添加到ChannelOutboundBuffer
    outboundBuffer.addMessage(msg, size, promise); 
}

根據(jù)上面的分析可知,通過 channel(也即pipleline)觸發(fā) outbound 事件傳播時(shí)蒙兰,從 TailContext 開始傳播磷瘤。對于 outbound 事件,會按照 ChannelOutboundHandler 添加的順序逆序處理該事件搜变,TailContext 由于是 inbound 類型的 ChannelHandler采缚,它直接將 outbound 事件傳播至下一個(gè) outbound 節(jié)點(diǎn),然后逐漸傳遞到 pipeline 中的 HeadContext 節(jié)點(diǎn)挠他,最終事件由 HeadContext 節(jié)點(diǎn)處理仰担。

  1. ctx.write("hello world");: 調(diào)用 ChannelHandlerContext#write()方法傳播事件。
// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
   
    // ... 省略
  
    write(msg, false, promise);

    return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
    // 直接查找下一個(gè)outbound handler
    AbstractChannelHandlerContext next = findContextOutbound(); 
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        } 
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

調(diào)用 ChannelHandlerContext#write()方法傳播 outbound 事件時(shí)绩社,直接從當(dāng)前節(jié)點(diǎn)開始反向遍歷 context 鏈表摔蓝,查找下一個(gè) outbound handler,并調(diào)用其 write 方法愉耙,然后將 write 事件傳播至 HeadContext贮尉。outbound事件傳播我們就先分析到這里。

記住一個(gè)要點(diǎn)就是:Outbound類型的事件是從鏈表的 tail 開始傳播的,所以執(zhí)行的順序和我們的添加進(jìn)去的順序相反朴沿。

異常事件的傳播

上面都是正常的事件猜谚,那如果是發(fā)生異常的情況,異常會以什么樣的方式傳播下去呢赌渣?我們來寫個(gè)demo魏铅。

在服務(wù)端添加如下的 handler:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .childOption(ChannelOption.TCP_NODELAY, true)
  .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
      ch.pipeline().addLast(new InBoundHandlerA());
      ch.pipeline().addLast(new InBoundHandlerB());
      ch.pipeline().addLast(new InBoundHandlerC());
      ch.pipeline().addLast(new OutBoundHandlerA());
      ch.pipeline().addLast(new OutBoundHandlerB());
      ch.pipeline().addLast(new OutBoundHandlerC());
    }
  });

Handler 如下:

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InBoundHandlerA.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 拋出異常
        throw new RunTimeException("from InBoundHandlerB"); 
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InBoundHandlerB.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InBoundHandlerC.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutBoundHandlerA.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutBoundHandlerB.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutBoundHandlerC.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
public class BusinessException extends Exception {
    public BusinessException(String message) {
        super(message);
    }
}

從 InBoundHandlerB 的定義可以看出,在接收到 channelRead 事件時(shí)將拋出 RunTimeException坚芜,這種情況模擬了 inbound 事件在 pipeline 傳播以及處理過程中發(fā)生的異常览芳。下面先來分析 inbound 事件傳播過程中發(fā)生異常時(shí),異常事件傳播的細(xì)節(jié)鸿竖。

1.inbound事件傳播過程中發(fā)生異常

假設(shè) channel 讀取到了一定數(shù)據(jù)沧竟,并回調(diào)了InBoundHandlerB.channelRead()方法,此時(shí)拋出RunTimeException 異常:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    throw new BusinessException("from InBoundHandlerB");
}
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // handler(): InBoundHandlerB
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

此時(shí)將進(jìn)入notifyHandlerException(t);:

private void notifyHandlerException(Throwable cause) {
    if (inExceptionCaught(cause)) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "An exception was thrown by a user handler " +
                            "while handling an exceptionCaught event", cause);
        }
        return;
    }

    invokeExceptionCaught(cause);
}

notifyHandlerException()方法直接調(diào)用invokeExceptionCaught(cause);傳播異常事件:

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            // ...
        }
    } else {
        fireExceptionCaught(cause);
    }
}

在 inbound 事件傳播過程中發(fā)生異常時(shí)缚忧,首先調(diào)用發(fā)生異常所在 handler 的 exceptionCaught 方法悟泵,即 InBoundHandlerB.exceptionCaught()

// InBoundHandlerB
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("InBoundHandlerB.exceptionCaught()");
    ctx.fireExceptionCaught(cause);
}

然后調(diào)用ctx.fireExceptionCaught(cause);繼續(xù)傳播異常事件:

// InBoundHandlerB對應(yīng)的ChannelHandlerContext
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    // next: InBoundHandlerC
     // 直接調(diào)用next節(jié)點(diǎn)的exceptionCaught方法
    invokeExceptionCaught(next, cause);
    return this;
}
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeExceptionCaught(cause);
    } else {
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeExceptionCaught(cause);
                }
            });
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to submit an exceptionCaught() event.", t);
                logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
            }
        }
    }
}
private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause); // InBoundHandlerC
        } catch (Throwable error) {
            // ..
        }
    } else {
        fireExceptionCaught(cause);
    }
}

可以看到,在異常發(fā)生節(jié)點(diǎn) InBoundHandlerB 繼續(xù)傳播事件時(shí)闪水,是直接調(diào)用了 InBoundHandlerB 對應(yīng) context 節(jié)點(diǎn)的 next 節(jié)點(diǎn)InBoundHandlerC#exceptionCaught() 方法糕非,而不管下一個(gè)節(jié)點(diǎn)是 inbound 還是 outbound 類型。異常事件按順序經(jīng)過 InBoundHandlerB、InBoundHandlerC朽肥、OutBoundHandlerA禁筏、OutBoundHandlerB、OutBoundHandlerC鞠呈,最終到達(dá) TailContext:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    onUnhandledInboundException(cause);
}
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}

如果 TailContext 之前的 handler 都未處理該異常事件,在 TailContext 將以 warn 日志的方式記錄該異常信息右钾,并釋放內(nèi)存蚁吝。

2.outbound事件傳播過程中發(fā)生異常

下面以channel.writeAndFlush()事件的傳播為例,分析 outbound 事件傳播過程中發(fā)生異常時(shí)舀射,異常事件的傳播細(xì)節(jié)窘茁。

// AbstractChannelHandlerContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
         // 某個(gè)outbound handler
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise); 
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        // 異常捕獲
        invokeFlush0(); 
    } else {
        writeAndFlush(msg, promise);
    }
}

假設(shè) writeAndFlush 事件傳播至 context 鏈表中的某個(gè)節(jié)點(diǎn),因此將調(diào)用以上的invokeWriteAndFlush() 方法脆烟。繼續(xù)看 invokeWrite0(msg, promise);中的邏輯:

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
    if (!(promise instanceof VoidChannelPromise)) {
        // promise設(shè)置為失敗
        PromiseNotificationUtil.tryFailure(promise, cause, logger); 
    }
}

可見,如果在 ChannelOutboundHandler.write() 方法中發(fā)生異常,只是調(diào)用notifyOutboundHandlerException()方法倔矾,將promise設(shè)置為失敗狀態(tài)浮还,不拋出任何異常。

再來看invokeFlush0();的邏輯:

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

可見拜鹤,如果在ChannelOutboundHandler.flush()方法中發(fā)生異常框冀,將調(diào)用notifyHandlerException()方法:

private void notifyHandlerException(Throwable cause) {
    // ...省略

    invokeExceptionCaught(cause);
}
 private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            // 省略...
        }
    } else {
        fireExceptionCaught(cause);
    }
}

同樣的,會觸發(fā)異常事件從當(dāng)前節(jié)點(diǎn)向后傳播敏簿,最后到達(dá)TailContext明也。

插播要點(diǎn):

對于ChannelHandler的添加應(yīng)該遵循什么樣的順序?

對于 inbound 事件的傳播惯裕,事件的處理順序與 ChannelInboundHandler 的添加順序相同温数;

對于 outbound 事件的傳播,事件的處理順序與 ChannelOutboundHandler 的添加順序相反蜻势。對于異常事件的傳播撑刺,事件的處理順序與 ChannelHandler 的添加順序相同,與 inbound握玛、outbound 無關(guān)猜煮。

用戶手動(dòng)觸發(fā)事件傳播,不同的觸發(fā)方式有什么樣的區(qū)別败许?
  1. ctx.channel.xxx(): 對于 inbound 事件王带,從 pipeline 頭部節(jié)點(diǎn) head 開始傳播;對于 outbound 事件市殷,從 pipeline尾 部節(jié)點(diǎn) tail 開始傳播愕撰。
  2. ctx.xxx() : 對于 inbound 事件,從當(dāng)前節(jié)點(diǎn)下一節(jié)點(diǎn)開始傳播(指向尾部tail);對于 outbound 事件搞挣,從當(dāng)前節(jié)點(diǎn)下一節(jié)點(diǎn)開始傳播(指向頭部head)带迟。

xxx()方法指 fireChannelRead、write 等方法囱桨。

無論我們是調(diào)用 inbound 中的方法仓犬,還是 outbound 中的方法,當(dāng)你在某一個(gè) handler 中捕獲了一個(gè)事件舍肠,如果你還想繼續(xù)讓該事件傳下去搀继,那么你就應(yīng)該在處理邏輯之后調(diào)用 ChannelHandlerContext 繼續(xù)將事件傳遞下去。

這里再重新畫一張完整的圖翠语,展示EventLoop叽躯,Channel,以及 handler 的關(guān)系:

6.png

3. 自定義 handler 如何實(shí)現(xiàn)

Netty 自身提供了一些 handler肌括,包括編解碼處理点骑,心跳檢測,超時(shí)檢測等等谍夭。那如果是我們自定義的處理器改如何實(shí)現(xiàn)呢黑滴?

拿 inbound 事件來舉例,Netty 提供兩種方式供我們實(shí)現(xiàn) ChannelHandler紧索,一種是繼承 ChannelInboundHandlerAdapter跷跪,一種是繼承 SimpleChannelInboundHandler

// ChannelInboundHandlerAdapter
public class ServerHandler1 extends ChannelInboundHandlerAdapter {
    // todo
}


// SimpleChannelInboundHandler
public class ServerHandler2 extends SimpleChannelInboundHandler<Object> {
   // todo
}

這兩個(gè)類從名字上可知道大概的區(qū)別齐板,ChannelInboundHandlerAdapter相當(dāng)于是一個(gè)簡寫版吵瞻,SimpleChannelInboundHandler是完整版,各取所需甘磨。SimpleChannelInboundHandler 是一個(gè)泛型類橡羞,而 ChannelInboundHandlerAdapter 并不是一個(gè)泛型類。這里的泛型主要用于消息格式轉(zhuǎn)換使用济舆。在使用 ChannelInboundHandlerAdapter 進(jìn)行操作時(shí)卿泽,我們需要自己轉(zhuǎn)換消息格式,而 SimpleChannelInboundHandler 就是 Netty 幫我們轉(zhuǎn)換消息格式滋觉,只要我們在泛型中聲明轉(zhuǎn)換的類型签夭,并且這個(gè)類型支持被轉(zhuǎn)換,那么我們就可以使用這個(gè)轉(zhuǎn)換后的消息進(jìn)行操作椎侠。

本節(jié)我們就到這里打住第租,沒說到的也不說了,口干舌燥我纪!自己有點(diǎn)暈菜了慎宾。代碼太繞丐吓,大家努力學(xué)習(xí)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末趟据,一起剝皮案震驚了整個(gè)濱河市券犁,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌汹碱,老刑警劉巖粘衬,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異咳促,居然都是意外死亡稚新,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門等缀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來枷莉,“玉大人娇昙,你說我怎么就攤上這事尺迂。” “怎么了冒掌?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵噪裕,是天一觀的道長。 經(jīng)常有香客問我股毫,道長膳音,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任铃诬,我火速辦了婚禮祭陷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘趣席。我一直安慰自己兵志,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布宣肚。 她就那樣靜靜地躺著想罕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪霉涨。 梳的紋絲不亂的頭發(fā)上按价,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天,我揣著相機(jī)與錄音笙瑟,去河邊找鬼楼镐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛往枷,可吹牛的內(nèi)容都是我干的鸠蚪。 我是一名探鬼主播今阳,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼茅信!你這毒婦竟也來了盾舌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蘸鲸,失蹤者是張志新(化名)和其女友劉穎妖谴,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酌摇,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡膝舅,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了窑多。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片仍稀。...
    茶點(diǎn)故事閱讀 40,102評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖埂息,靈堂內(nèi)的尸體忽然破棺而出技潘,到底是詐尸還是另有隱情,我是刑警寧澤千康,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布享幽,位于F島的核電站,受9級特大地震影響拾弃,放射性物質(zhì)發(fā)生泄漏值桩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一豪椿、第九天 我趴在偏房一處隱蔽的房頂上張望奔坟。 院中可真熱鬧,春花似錦搭盾、人聲如沸咳秉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽滴某。三九已至,卻和暖如春滋迈,著一層夾襖步出監(jiān)牢的瞬間霎奢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工饼灿, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留幕侠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓碍彭,卻偏偏與公主長得像晤硕,于是被迫代替她去往敵國和親悼潭。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評論 2 355