Spring Webflux 源碼閱讀之 socket包

Package org.springframework.web.reactive.socket

反應(yīng)性WebSocket交互的抽象和支持類该互。

WebSocketHandler

一個(gè)WebSocket會(huì)話處理程序聪轿。

public interface WebSocketHandler {

/**
 * 返回此處理程序支持的子協(xié)議列表。
 * 默認(rèn)情況下返回一個(gè)空列表笛质。
 */
default List<String> getSubProtocols() {
    return Collections.emptyList();
}

/**
 * 處理WebSocket會(huì)話。
 * @param session the session to handle
 * @return completion {@code Mono<Void>} to indicate the outcome of the
 * WebSocket session handling.
 */
Mono<Void> handle(WebSocketSession session);

}

WebSocketSession

表示具有反應(yīng)流輸入和輸出的WebSocket會(huì)話捞蚂。

在服務(wù)器端妇押,可以通過(guò)將請(qǐng)求映射到WebSocketHandler來(lái)處理WebSocket會(huì)話,并確保在Spring配置中注冊(cè)了WebSocketHandlerAdapter策略姓迅。在客戶端敲霍,可以將WebSocketHandler提供給WebSocketClient。

public interface WebSocketSession {

/**
 * Return the id for the session.
 * 返回會(huì)話的ID丁存。
 */
String getId();

/**
 *從握手請(qǐng)求中返回信息肩杈。
 * Return information from the handshake request.
 */
HandshakeInfo getHandshakeInfo();

/**
 * 返回一個(gè)DataBuffer Factory來(lái)創(chuàng)建消息有效載荷。
 * Return a {@code DataBuffer} Factory to create message payloads.
 * @return the buffer factory for the session
 */
DataBufferFactory bufferFactory();

/**
 * 獲取傳入消息的流解寝。
 * Get the flux of incoming messages.
 */
Flux<WebSocketMessage> receive();

/**
 * 將給定的消息寫(xiě)入WebSocket連接扩然。
 * Write the given messages to the WebSocket connection.
 * @param messages the messages to write
 */
Mono<Void> send(Publisher<WebSocketMessage> messages);

/**
 * 用CloseStatus.NORMAL關(guān)閉WebSocket會(huì)話。
 * Close the WebSocket session with {@link CloseStatus#NORMAL}.
 */
default Mono<Void> close() {
    return close(CloseStatus.NORMAL);
}

/**
 * 關(guān)閉具有給定狀態(tài)的WebSocket會(huì)話编丘。
 * Close the WebSocket session with the given status.
 * @param status the close status
 */
Mono<Void> close(CloseStatus status);


// WebSocketMessage factory methods

/**Factory方法使用會(huì)話的bufferFactory()創(chuàng)建文本W(wǎng)ebSocketMessag * e与学。
 */
WebSocketMessage textMessage(String payload);

/**
  * Factory方法使用會(huì)話的bufferFactory()創(chuàng)建二進(jìn)制WebSocketMes* sage彤悔。
 */
WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

/**
 * 工廠方法使用會(huì)話的bufferFactory()創(chuàng)建一個(gè)ping  *WebSocketMessage。
 */
WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

/**
 * Factory方法使用會(huì)話的bufferFactory()創(chuàng)建pong *WebSocketMessage索守。
 */
WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

}

WebSocketMessage

WebSocket消息的表示晕窑。

請(qǐng)參閱WebSocketSession中的靜態(tài)工廠方法抹沪,以便為會(huì)話創(chuàng)建帶有DataBufferFactory的消息基括。

public class WebSocketMessage {

private final Type type;

private final DataBuffer payload;


/**
 * WebSocketMessage的構(gòu)造函數(shù).
 * 請(qǐng)參閱WebSocketSession中的靜態(tài)工廠方法,或使用 WebSocketSession.bufferFactory()創(chuàng)建有效內(nèi)容野来,然后調(diào)用此構(gòu)造函       數(shù)截汪。
 */
public WebSocketMessage(Type type, DataBuffer payload) {
    Assert.notNull(type, "'type' must not be null");
    Assert.notNull(payload, "'payload' must not be null");
    this.type = type;
    this.payload = payload;
}


/**
 * Return the message type (text, binary, etc).
 */
public Type getType() {
    return this.type;
}

/**
 * Return the message payload.
 */
public DataBuffer getPayload() {
    return this.payload;
}

/**
 * Return the message payload as UTF-8 text. This is a useful for text
 * WebSocket messages.
 */
public String getPayloadAsText() {
    byte[] bytes = new byte[this.payload.readableByteCount()];
    this.payload.read(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
}

/**
 * 保留消息有效載荷的數(shù)據(jù)緩沖區(qū)疾牲,這在運(yùn)行時(shí)(例如Netty)和池緩沖區(qū)中很有用。一個(gè)快捷方式:
 */
public WebSocketMessage retain() {
    DataBufferUtils.retain(this.payload);
    return this;
}

/**
釋放在運(yùn)行時(shí)(如Netty)使用池緩沖區(qū)(如Netty)有用的有效載荷DataBuffer衙解。一個(gè)快捷方式:
 */
public void release() {
    DataBufferUtils.release(this.payload);
}


@Override
public boolean equals(Object other) {
    if (this == other) {
        return true;
    }
    if (!(other instanceof WebSocketMessage)) {
        return false;
    }
    WebSocketMessage otherMessage = (WebSocketMessage) other;
    return (this.type.equals(otherMessage.type) &&
            ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload));
}

@Override
public int hashCode() {
    return this.type.hashCode() * 29 + this.payload.hashCode();
}


/**
 * WebSocket 消息類型.
 */
public enum Type { TEXT, BINARY, PING, PONG }

}

CloseStatus

表示W(wǎng)ebSocket“關(guān)閉”的狀態(tài)碼和原因阳柔。 1xxx范圍內(nèi)的狀態(tài)碼由協(xié)議預(yù)先定義。


public final class CloseStatus {

    /**
     * "1000 indicates a normal closure, meaning that the purpose for which the connection
     * was established has been fulfilled."
     */
    public static final CloseStatus NORMAL = new CloseStatus(1000);

    /**
     * "1001 indicates that an endpoint is "going away", such as a server going down or a
     * browser having navigated away from a page."
     */
    public static final CloseStatus GOING_AWAY = new CloseStatus(1001);

    /**
     * "1002 indicates that an endpoint is terminating the connection due to a protocol
     * error."
     */
    public static final CloseStatus PROTOCOL_ERROR  = new CloseStatus(1002);

    /**
     * "1003 indicates that an endpoint is terminating the connection because it has
     * received a type of data it cannot accept (e.g., an endpoint that understands only
     * text data MAY send this if it receives a binary message)."
     */
    public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);

    // 10004: Reserved.
    // The specific meaning might be defined in the future.

    /**
     * "1005 is a reserved value and MUST NOT be set as a status code in a Close control
     * frame by an endpoint. It is designated for use in applications expecting a status
     * code to indicate that no status code was actually present."
     */
    public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);

    /**
     * "1006 is a reserved value and MUST NOT be set as a status code in a Close control
     * frame by an endpoint. It is designated for use in applications expecting a status
     * code to indicate that the connection was closed abnormally, e.g., without sending
     * or receiving a Close control frame."
     */
    public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);

    /**
     * "1007 indicates that an endpoint is terminating the connection because it has
     * received data within a message that was not consistent with the type of the message
     * (e.g., non-UTF-8 [RFC3629] data within a text message)."
     */
    public static final CloseStatus BAD_DATA = new CloseStatus(1007);

    /**
     * "1008 indicates that an endpoint is terminating the connection because it has
     * received a message that violates its policy. This is a generic status code that can
     * be returned when there is no other more suitable status code (e.g., 1003 or 1009)
     * or if there is a need to hide specific details about the policy."
     */
    public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);

    /**
     * "1009 indicates that an endpoint is terminating the connection because it has
     * received a message that is too big for it to process."
     */
    public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);

    /**
     * "1010 indicates that an endpoint (client) is terminating the connection because it
     * has expected the server to negotiate one or more extension, but the server didn't
     * return them in the response message of the WebSocket handshake. The list of
     * extensions that are needed SHOULD appear in the /reason/ part of the Close frame.
     * Note that this status code is not used by the server, because it can fail the
     * WebSocket handshake instead."
     */
    public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);

    /**
     * "1011 indicates that a server is terminating the connection because it encountered
     * an unexpected condition that prevented it from fulfilling the request."
     */
    public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);

    /**
     * "1012 indicates that the service is restarted. A client may reconnect, and if it
     * chooses to do, should reconnect using a randomized delay of 5 - 30s."
     */
    public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);

    /**
     * "1013 indicates that the service is experiencing overload. A client should only
     * connect to a different IP (when there are multiple for the target) or reconnect to
     * the same IP upon user action."
     */
    public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);

    /**
     * "1015 is a reserved value and MUST NOT be set as a status code in a Close control
     * frame by an endpoint. It is designated for use in applications expecting a status
     * code to indicate that the connection was closed due to a failure to perform a TLS
     * handshake (e.g., the server certificate can't be verified)."
     */
    public static final CloseStatus TLS_HANDSHAKE_FAILURE = new CloseStatus(1015);


    private final int code;

    @Nullable
    private final String reason;


    /**
     * Create a new {@link CloseStatus} instance.
     * @param code the status code
     */
    public CloseStatus(int code) {
        this(code, null);
    }

    /**
     * Create a new {@link CloseStatus} instance.
     * @param code the status code
     * @param reason the reason
     */
    public CloseStatus(int code, @Nullable String reason) {
        Assert.isTrue((code >= 1000 && code < 5000), "Invalid status code");
        this.code = code;
        this.reason = reason;
    }


    /**
     * Return the status code.
     */
    public int getCode() {
        return this.code;
    }

    /**
     * Return the reason, or {@code null} if none.
     */
    @Nullable
    public String getReason() {
        return this.reason;
    }

    /**
     * Create a new {@link CloseStatus} from this one with the specified reason.
     * @param reason the reason
     * @return a new {@link CloseStatus} instance
     */
    public CloseStatus withReason(String reason) {
        Assert.hasText(reason, "Reason must not be empty");
        return new CloseStatus(this.code, reason);
    }


    public boolean equalsCode(CloseStatus other) {
        return (this.code == other.code);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CloseStatus)) {
            return false;
        }
        CloseStatus otherStatus = (CloseStatus) other;
        return (this.code == otherStatus.code &&
                ObjectUtils.nullSafeEquals(this.reason, otherStatus.reason));
    }

    @Override
    public int hashCode() {
        return this.code * 29 + ObjectUtils.nullSafeHashCode(this.reason);
    }

    @Override
    public String toString() {
        return "CloseStatus[code=" + this.code + ", reason=" + this.reason + "]";
    }

}
狀態(tài)碼 常量 表示
1000 NORMAL 1000表示一個(gè)正常的閉包蚓峦,這意味著建立連接的目的已經(jīng)完成
1001 GOING_AWAY 1001表示端點(diǎn)正在“消失”舌剂,比如服務(wù)器關(guān)閉或?yàn)g覽器從頁(yè)面上離開(kāi)。
1002 PROTOCOL_ERROR 1002表示由于協(xié)議錯(cuò)誤暑椰,端點(diǎn)正在終止連接
1003 NOT_ACCEPTABLE 1003表示端點(diǎn)正在終止連接霍转,因?yàn)樗邮樟艘环N無(wú)法接受的數(shù)據(jù)類型(例如,一個(gè)只理解文本數(shù)據(jù)的端點(diǎn)一汽,如果接收到二進(jìn)制消息避消,則可以發(fā)送此數(shù)據(jù)
1004 Reserved 保留,未來(lái)可能在定義
1005 NO_STATUS_CODE 1005是一個(gè)保留值召夹,不能在端點(diǎn)的閉合控制幀中設(shè)置為狀態(tài)碼岩喷。
1006 NO_CLOSE_FRAME 1006是一個(gè)保留值,不能在端點(diǎn)的閉合控制幀中設(shè)置為狀態(tài)碼
1007 BAD_DATA 1007表示端點(diǎn)正在終止連接戳鹅,因?yàn)樗谝粭l消息中接收到與消息類型不一致的數(shù)據(jù)(例如均驶,文本消息中的非utf - 8[RFC3629]數(shù)據(jù))。
1008 POLICY_VIOLATION 1008表示端點(diǎn)正在終止連接枫虏,因?yàn)樗盏搅诉`反其策略的消息。
1009 TOO_BIG_TO_PROCESS 1009表示端點(diǎn)正在終止連接爬虱,因?yàn)樗盏搅艘粋€(gè)太大的消息隶债,無(wú)法處理。
10010 REQUIRED_EXTENSION 1010表示端點(diǎn)(客戶端)終止了連接跑筝,因?yàn)樗谕?wù)器可以協(xié)商一個(gè)或多個(gè)擴(kuò)展死讹,但是服務(wù)器并沒(méi)有在WebSocket握手的響應(yīng)消息中返回它們
10011 SERVER_ERROR 1011表明服務(wù)器正在終止連接,因?yàn)樗龅搅艘粋€(gè)意想不到的情況曲梗,阻止它完成請(qǐng)求
10012 SERVICE_RESTARTED 1012表示該服務(wù)重新啟動(dòng)
10013 SERVICE_OVERLOAD 1013顯示服務(wù)正在經(jīng)歷過(guò)載赞警。
10015 TLS_HANDSHAKE_FAILURE 1015是一個(gè)保留的值妓忍,不能在端點(diǎn)的閉環(huán)控制幀中設(shè)置為狀態(tài)碼

HandshakeInfo

與啟動(dòng)WebSocketSession會(huì)話的握手請(qǐng)求相關(guān)的簡(jiǎn)單信息容器

public class HandshakeInfo {

private final URI uri;

private final Mono<Principal> principalMono;

private final HttpHeaders headers;

@Nullable
private final String protocol;


/**
 * Constructor with information about the handshake.
 * @param uri the endpoint URL
 * @param headers request headers for server or response headers or client
 * @param principal the principal for the session
 * @param protocol the negotiated sub-protocol (may be {@code null})
 */
public HandshakeInfo(URI uri, HttpHeaders headers, Mono<Principal> principal, @Nullable String protocol) {
    Assert.notNull(uri, "URI is required");
    Assert.notNull(headers, "HttpHeaders are required");
    Assert.notNull(principal, "Principal is required");
    this.uri = uri;
    this.headers = headers;
    this.principalMono = principal;
    this.protocol = protocol;
}


/**
* 返回WebSocket端點(diǎn)的URL
 * Return the URL for the WebSocket endpoint.
 */
public URI getUri() {
    return this.uri;
}

/**
 * Return the handshake HTTP headers. Those are the request headers for a
 * server session and the response headers for a client session.
 */
public HttpHeaders getHeaders() {
    return this.headers;
}

/**
 *  返回與握手HTTP請(qǐng)求相關(guān)的主體。
 * Return the principal associated with the handshake HTTP request.
 */
public Mono<Principal> getPrincipal() {
    return this.principalMono;
}

/**
* 在握手時(shí)協(xié)商的子協(xié)議愧旦,如果沒(méi)有世剖,則為null。
 * The sub-protocol negotiated at handshake time, or {@code null} if none.
 * @see <a >
 * https://tools.ietf.org/html/rfc6455#section-1.9</a>
 */
@Nullable
public String getSubProtocol() {
    return this.protocol;
}


@Override
public String toString() {
    return "HandshakeInfo[uri=" + this.uri + ", headers=" + this.headers + "]";
}

}

Package org.springframework.web.reactive.socket.adapter

將Spring的Reactive WebSocket API與WebSocket運(yùn)行時(shí)相適配的類笤虫。

AbstractWebSocketSession<T>

WebSocketSession實(shí)現(xiàn)的便捷基類旁瘫,包含公共字段并暴露給外界來(lái)訪問(wèn)。還實(shí)現(xiàn)了WebSocketMessage工廠方法琼蚯。

AbstractListenerWebSocketSession

在事件偵聽(tīng)器WebSocket API之間架設(shè)的WebSocketSession實(shí)現(xiàn)的基類

public abstract class AbstractWebSocketSession<T> implements WebSocketSession {

private final T delegate;

private final String id;

private final HandshakeInfo handshakeInfo;

private final DataBufferFactory bufferFactory;


/**
 * Create a new instance and associate the given attributes with it.
 */
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
        DataBufferFactory bufferFactory) {

    Assert.notNull(delegate, "Native session is required.");
    Assert.notNull(id, "Session id is required.");
    Assert.notNull(handshakeInfo, "HandshakeInfo is required.");
    Assert.notNull(bufferFactory, "DataBuffer factory is required.");

    this.delegate = delegate;
    this.id = id;
    this.handshakeInfo = handshakeInfo;
    this.bufferFactory = bufferFactory;
}


protected T getDelegate() {
    return this.delegate;
}

@Override
public String getId() {
    return this.id;
}

@Override
public HandshakeInfo getHandshakeInfo() {
    return this.handshakeInfo;
}

// 返回一個(gè)DataBuffer Factory來(lái)創(chuàng)建消息有效載荷酬凳。
@Override
public DataBufferFactory bufferFactory() {
    return this.bufferFactory;
}

//獲取傳入消息的流。
@Override
public abstract Flux<WebSocketMessage> receive();

//將給定的消息寫(xiě)入WebSocket連接遭庶。
@Override
public abstract Mono<Void> send(Publisher<WebSocketMessage> messages);


// WebSocketMessage factory methods

@Override
public WebSocketMessage textMessage(String payload) {
    byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
    DataBuffer buffer = bufferFactory().wrap(bytes);
    return new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer);
}


// Factory方法使用WebSocketSession.bufferFactory()為會(huì)話創(chuàng)建二進(jìn)制WebSocketMessage宁仔。
@Override
public WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
    DataBuffer payload = payloadFactory.apply(bufferFactory());
    return new WebSocketMessage(WebSocketMessage.Type.BINARY, payload);
}

//工廠方法使用WebSocketSession.bufferFactory()為會(huì)話創(chuàng)建一個(gè)ping WebSocketMessage。

@Override
public WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
    DataBuffer payload = payloadFactory.apply(bufferFactory());
    return new WebSocketMessage(WebSocketMessage.Type.PING, payload);
}

//工廠方法創(chuàng)建一個(gè)使用WebSocketSession.bufferFactory pong WebSocketMessage會(huì)話()峦睡。
@Override
public WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
    DataBuffer payload = payloadFactory.apply(bufferFactory());
    return new WebSocketMessage(WebSocketMessage.Type.PONG, payload);
}


@Override
public String toString() {
    return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getHandshakeInfo().getUri() + "]";
}

}

AbstractListenerWebSocketSession

在事件偵聽(tīng)器WebSocket API(例如Java WebSocket API JSR-356台诗,Jetty,Undertow)和Reactive Streams之間進(jìn)行橋接的WebSocketSession實(shí)現(xiàn)的基類赐俗。

也是訂閱者的實(shí)現(xiàn)拉队,因此,它可以用作會(huì)話處理的完成訂閱


public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T>
        implements Subscriber<Void> {

    /**
     * The "back-pressure" buffer size to use if the underlying WebSocket API
     * does not have flow control for receiving messages.
     */
    private static final int RECEIVE_BUFFER_SIZE = 8192;


    @Nullable
    private final MonoProcessor<Void> completionMono;

    private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher();

    @Nullable
    private volatile WebSocketSendProcessor sendProcessor;

    private final AtomicBoolean sendCalled = new AtomicBoolean();


    /**
     * Base constructor.
     * @param delegate the native WebSocket session, channel, or connection
     * @param id the session id
     * @param handshakeInfo the handshake info
     * @param bufferFactory the DataBuffer factor for the current connection
     */
    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
            DataBufferFactory bufferFactory) {

        this(delegate, id, handshakeInfo, bufferFactory, null);
    }

    /**
     * Alternative constructor with completion {@code Mono&lt;Void&gt;} to propagate
     * the session completion (success or error) (for client-side use).
     */
    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
            DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) {

        super(delegate, id, handshakeInfo, bufferFactory);
        this.completionMono = completionMono;
    }


    protected WebSocketSendProcessor getSendProcessor() {
        WebSocketSendProcessor sendProcessor = this.sendProcessor;
        Assert.state(sendProcessor != null, "No WebSocketSendProcessor available");
        return sendProcessor;
    }

    @Override
    public Flux<WebSocketMessage> receive() {
        return canSuspendReceiving() ?
                Flux.from(this.receivePublisher) :
                Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE);
    }

    @Override
    public Mono<Void> send(Publisher<WebSocketMessage> messages) {
        if (this.sendCalled.compareAndSet(false, true)) {
            WebSocketSendProcessor sendProcessor = new WebSocketSendProcessor();
            this.sendProcessor = sendProcessor;
            return Mono.from(subscriber -> {
                    messages.subscribe(sendProcessor);
                    sendProcessor.subscribe(subscriber);
            });
        }
        else {
            return Mono.error(new IllegalStateException("send() has already been called"));
        }
    }

    /**
     * 底層的WebSocket API是否具有流量控制功能阻逮,可以暫停和恢復(fù)接收消息粱快。
     */
    protected abstract boolean canSuspendReceiving();

    /**
     * Suspend receiving until received message(s) are processed and more demand
     * is generated by the downstream Subscriber.
     * <p><strong>Note:</strong> if the underlying WebSocket API does not provide
     * flow control for receiving messages, and this method should be a no-op
     * and {@link #canSuspendReceiving()} should return {@code false}.
     */
    protected abstract void suspendReceiving();

    /**
     * Resume receiving new message(s) after demand is generated by the
     * downstream Subscriber.
     * <p><strong>Note:</strong> if the underlying WebSocket API does not provide
     * flow control for receiving messages, and this method should be a no-op
     * and {@link #canSuspendReceiving()} should return {@code false}.
     */
    protected abstract void resumeReceiving();

    /**
     * Send the given WebSocket message.
     */
    protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;


    // WebSocketHandler adapter delegate methods

    /** Handle a message callback from the WebSocketHandler adapter */
    void handleMessage(Type type, WebSocketMessage message) {
        this.receivePublisher.handleMessage(message);
    }

    /** Handle an error callback from the WebSocketHandler adapter */
    void handleError(Throwable ex) {
        this.receivePublisher.onError(ex);
        WebSocketSendProcessor sendProcessor = this.sendProcessor;
        if (sendProcessor != null) {
            sendProcessor.cancel();
            sendProcessor.onError(ex);
        }
    }

    /** Handle a close callback from the WebSocketHandler adapter */
    void handleClose(CloseStatus reason) {
        this.receivePublisher.onAllDataRead();
        WebSocketSendProcessor sendProcessor = this.sendProcessor;
        if (sendProcessor != null) {
            sendProcessor.cancel();
            sendProcessor.onComplete();
        }
    }


    // Subscriber<Void> implementation

    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Void aVoid) {
        // no op
    }

    @Override
    public void onError(Throwable ex) {
        if (this.completionMono != null) {
            this.completionMono.onError(ex);
        }
        int code = CloseStatus.SERVER_ERROR.getCode();
        close(new CloseStatus(code, ex.getMessage()));
    }

    @Override
    public void onComplete() {
        if (this.completionMono != null) {
            this.completionMono.onComplete();
        }
        close();
    }


    private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {

        @Nullable
        private volatile WebSocketMessage webSocketMessage;

        @Override
        protected void checkOnDataAvailable() {
            if (this.webSocketMessage != null) {
                onDataAvailable();
            }
        }

        @Override
        @Nullable
        protected WebSocketMessage read() throws IOException {
            if (this.webSocketMessage != null) {
                WebSocketMessage result = this.webSocketMessage;
                this.webSocketMessage = null;
                resumeReceiving();
                return result;
            }

            return null;
        }

        void handleMessage(WebSocketMessage webSocketMessage) {
            this.webSocketMessage = webSocketMessage;
            suspendReceiving();
            onDataAvailable();
        }
    }


    protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {

        private volatile boolean isReady = true;

        @Override
        protected boolean write(WebSocketMessage message) throws IOException {
            return sendMessage(message);
        }

        @Override
        protected void releaseData() {
            this.currentData = null;
        }

        @Override
        protected boolean isDataEmpty(WebSocketMessage message) {
            return (message.getPayload().readableByteCount() == 0);
        }

        @Override
        protected boolean isWritePossible() {
            return (this.isReady && this.currentData != null);
        }

        /**
         * Sub-classes can invoke this before sending a message (false) and
         * after receiving the async send callback (true) effective translating
         * async completion callback into simple flow control.
         */
        public void setReadyToSend(boolean ready) {
            this.isReady = ready;
        }
    }

}

我們可以看到這兩個(gè)類是由子類來(lái)實(shí)現(xiàn)的。不同的服務(wù)器有不同的實(shí)現(xiàn)叔扼。
@Override
public abstract Flux<WebSocketMessage> receive();

@Override
public abstract Mono<Void> send(Publisher<WebSocketMessage> messages);

又分為兩個(gè)分支事哭,一個(gè)是NettyWebSocketSessionSupport下的ReactorNettyWebSocketSession

NettyWebSocketSessionSupport

基于Netty的WebSocketSession適配器的基類,它提供了將Netty WebSocketFrames轉(zhuǎn)換為WebSocketMessages和從WebSocketMessages轉(zhuǎn)換的便利方法瓜富。

public abstract class NettyWebSocketSessionSupport<T> extends AbstractWebSocketSession<T> {

/**
* 默認(rèn)的最大大小用于聚集入站W(wǎng)ebSocket幀鳍咱。
 * The default max size for aggregating inbound WebSocket frames.
 */
protected static final int DEFAULT_FRAME_MAX_SIZE = 64 * 1024;


private static final Map<Class<?>, WebSocketMessage.Type> MESSAGE_TYPES;

static {
    MESSAGE_TYPES = new HashMap<>(4);
    MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
    MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
    MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
    MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);
}


protected NettyWebSocketSessionSupport(T delegate, HandshakeInfo info, NettyDataBufferFactory factory) {
    super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory);
}


//返回一個(gè)DataBuffer Factory來(lái)創(chuàng)建消息有效載荷。
@Override
public NettyDataBufferFactory bufferFactory() {
    return (NettyDataBufferFactory) super.bufferFactory();
}


protected WebSocketMessage toMessage(WebSocketFrame frame) {
    DataBuffer payload = bufferFactory().wrap(frame.content());
    return new WebSocketMessage(MESSAGE_TYPES.get(frame.getClass()), payload);
}

protected WebSocketFrame toFrame(WebSocketMessage message) {
    ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
    if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
        return new TextWebSocketFrame(byteBuf);
    }
    else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
        return new BinaryWebSocketFrame(byteBuf);
    }
    else if (WebSocketMessage.Type.PING.equals(message.getType())) {
        return new PingWebSocketFrame(byteBuf);
    }
    else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
        return new PongWebSocketFrame(byteBuf);
    }
    else {
        throw new IllegalArgumentException("Unexpected message type: " + message.getType());
    }
}

}

ReactorNettyWebSocketSession

Spring WebSocketSession實(shí)現(xiàn)与柑,可以適應(yīng)反應(yīng)堆Netty的WebSocket NettyInbound和NettyOutbound谤辜。

public class ReactorNettyWebSocketSession
    extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {


public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
        HandshakeInfo info, NettyDataBufferFactory bufferFactory) {

    super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
}


//獲取傳入消息的流。
@Override
public Flux<WebSocketMessage> receive() {
    return getDelegate().getInbound()
            .aggregateFrames(DEFAULT_FRAME_MAX_SIZE)
            .receiveFrames()
            .map(super::toMessage);
}

//將給定的消息寫(xiě)入WebSocket連接价捧。
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
    Flux<WebSocketFrame> frames = Flux.from(messages).map(this::toFrame);
    return getDelegate().getOutbound()
            .options(NettyPipeline.SendOptions::flushOnEach)
            .sendObject(frames)
            .then();
}

//關(guān)閉具有給定狀態(tài)的WebSocket會(huì)話丑念。
@Override
public Mono<Void> close(CloseStatus status) {
    return Mono.error(new UnsupportedOperationException(
            "Currently in Reactor Netty applications are expected to use the " +
                    "Cancellation returned from subscribing to the \"receive\"-side Flux " +
                    "in order to close the WebSocket session."));
}


/**
 * Simple container for {@link NettyInbound} and {@link NettyOutbound}.
 */
public static class WebSocketConnection {

    private final WebsocketInbound inbound;

    private final WebsocketOutbound outbound;


    public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) {
        this.inbound = inbound;
        this.outbound = outbound;
    }

    public WebsocketInbound getInbound() {
        return this.inbound;
    }

    public WebsocketOutbound getOutbound() {
        return this.outbound;
    }
}

}

另外基于AbstractListenerWebSocketSession的集中不同的實(shí)現(xiàn):

StandardWebSocketSession

為標(biāo)準(zhǔn)Java(JSR 356)會(huì)話跳轉(zhuǎn)WebSocketSession適配器。

StandardWebSocketHandlerAdapter

Java WebSocket API(JSR-356)的適配器结蟋,它將事件委托給一個(gè)被動(dòng)的WebSocketHandler及其會(huì)話脯倚。

另外幾個(gè)也是差不多,都有不同的實(shí)現(xiàn)嵌屎。

websocket.jpg

package org.springframework.web.reactive.socket.client;

WebSocketClient

反應(yīng)式風(fēng)格處理WebSocket會(huì)話的協(xié)議推正。

public interface WebSocketClient {

// 具有自定義標(biāo)頭的執(zhí)行
Mono<Void> execute(URI url, WebSocketHandler handler);

//對(duì)給定的URL執(zhí)行握手請(qǐng)求恍涂,并使用給定的handler處理生成的WebSocket會(huì)話。
Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler);

}

WebSocketClientSupport

WebSocketClient實(shí)現(xiàn)的基類植榕。


public class WebSocketClientSupport {

    private static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";


    protected final Log logger = LogFactory.getLog(getClass());


    protected List<String> beforeHandshake(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
        if (logger.isDebugEnabled()) {
            logger.debug("Executing handshake to " + url);
        }
        return handler.getSubProtocols();
    }

    protected HandshakeInfo afterHandshake(URI url, HttpHeaders responseHeaders) {
        if (logger.isDebugEnabled()) {
            logger.debug("Handshake response: " + url + ", " + responseHeaders);
        }
        String protocol = responseHeaders.getFirst(SEC_WEBSOCKET_PROTOCOL);
        return new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
    }

}

ReactorNettyWebSocketClient

用于Reactor Netty的WebSocketClient實(shí)現(xiàn)再沧。

public class ReactorNettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient {

private final HttpClient httpClient;


/**
 * Default constructor.
 */
public ReactorNettyWebSocketClient() {
    this(options -> {});
}

/**
 * Constructor that accepts an {@link HttpClientOptions.Builder} consumer
 * to supply to {@link HttpClient#create(Consumer)}.
 */
public ReactorNettyWebSocketClient(Consumer<? super HttpClientOptions.Builder> clientOptions) {
    this.httpClient = HttpClient.create(clientOptions);
}


/**
 * Return the configured {@link HttpClient}.
 */
public HttpClient getHttpClient() {
    return this.httpClient;
}

//對(duì)給定的URL執(zhí)行握手請(qǐng)求,并使用給定的處理程序處理生成的WebSocket會(huì)話内贮。
@Override
public Mono<Void> execute(URI url, WebSocketHandler handler) {
    return execute(url, new HttpHeaders(), handler);
}

@Override
public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
    List<String> protocols = beforeHandshake(url, headers, handler);

    return getHttpClient()
            .ws(url.toString(),
                    nettyHeaders -> setNettyHeaders(headers, nettyHeaders),
                    StringUtils.collectionToCommaDelimitedString(protocols))
            .flatMap(response -> {
                HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
                ByteBufAllocator allocator = response.channel().alloc();
                NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
                return response.receiveWebsocket((in, out) -> {
                    WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory);
                    return handler.handle(session);
                });
            });
}

private void setNettyHeaders(HttpHeaders headers, io.netty.handler.codec.http.HttpHeaders nettyHeaders) {
    headers.forEach(nettyHeaders::set);
}

private HttpHeaders toHttpHeaders(HttpClientResponse response) {
    HttpHeaders headers = new HttpHeaders();
    response.responseHeaders().forEach(entry -> {
        String name = entry.getKey();
        headers.put(name, response.responseHeaders().getAll(name));
    });
    return headers;
}

}
webscoketClient.jpg

org.springframework.web.reactive.socket.server.support;

WebSocket請(qǐng)求的服務(wù)器端支持類产园。

RequestUpgradeStrategy

根據(jù)底層網(wǎng)絡(luò)運(yùn)行時(shí)將HTTP請(qǐng)求升級(jí)到WebSocket會(huì)話的策略。

public interface RequestUpgradeStrategy {

/**
 * 升級(jí)到WebSocket會(huì)話并使用給定的處理程序處理它夜郁。
 * @param exchange the current exchange
 * @param webSocketHandler handler for the WebSocket session
 * @param subProtocol the selected sub-protocol got the handler
 * @return completion {@code Mono<Void>} to indicate the outcome of the
 * WebSocket session handling.
 */
Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler, @Nullable String subProtocol);

}

ReactorNettyRequestUpgradeStrategy

持有RequestUpgradeStrategy的實(shí)現(xiàn)什燕。

public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {

@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol) {
    ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse();
    HandshakeInfo info = getHandshakeInfo(exchange, subProtocol);
    NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();

    return response.getReactorResponse().sendWebsocket(subProtocol,
            (in, out) -> handler.handle(new ReactorNettyWebSocketSession(in, out, info, bufferFactory)));
}

private HandshakeInfo getHandshakeInfo(ServerWebExchange exchange, @Nullable String protocol) {
    ServerHttpRequest request = exchange.getRequest();
    Mono<Principal> principal = exchange.getPrincipal();
    return new HandshakeInfo(request.getURI(), request.getHeaders(), principal, protocol);
}

}

我們可以看到這里的upgrade方法把接收到的http請(qǐng)求協(xié)議轉(zhuǎn)換為websocket協(xié)議。

還有其他幾種竞端。


image.png

WebSocketService

一種委托websocket相關(guān)的HTTP請(qǐng)求的服務(wù)屎即。

對(duì)于WebSocket端點(diǎn),這意味著要處理初始的WebSocket HTTP握手請(qǐng)求事富。對(duì)于SockJS端點(diǎn)技俐,這可能意味著處理所有在SockJS協(xié)議中定義的HTTP請(qǐng)求。

public interface WebSocketService {

/**
 * 處理HTTP請(qǐng)求并使用給定的WebSocketHandler统台。
 * @param exchange the current exchange
 * @param webSocketHandler handler for WebSocket session
 * @return a completion Mono for the WebSocket session handling
 */
Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler webSocketHandler);

}

HandshakeWebSocketService

WebSocketService的實(shí)現(xiàn)雕擂。

public class HandshakeWebSocketService implements WebSocketService, Lifecycle {

private static final String SEC_WEBSOCKET_KEY = "Sec-WebSocket-Key";

private static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";


private static final boolean tomcatPresent = ClassUtils.isPresent(
        "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler",
        HandshakeWebSocketService.class.getClassLoader());

private static final boolean jettyPresent = ClassUtils.isPresent(
        "org.eclipse.jetty.websocket.server.WebSocketServerFactory",
        HandshakeWebSocketService.class.getClassLoader());

private static final boolean undertowPresent = ClassUtils.isPresent(
        "io.undertow.websockets.WebSocketProtocolHandshakeHandler",
        HandshakeWebSocketService.class.getClassLoader());

private static final boolean reactorNettyPresent = ClassUtils.isPresent(
        "reactor.ipc.netty.http.server.HttpServerResponse",
        HandshakeWebSocketService.class.getClassLoader());


protected static final Log logger = LogFactory.getLog(HandshakeWebSocketService.class);


private final RequestUpgradeStrategy upgradeStrategy;

private volatile boolean running = false;


/**
    * 默認(rèn)構(gòu)造函數(shù)自動(dòng),基于類路徑檢測(cè)的RequestUpgradeStrategy的發(fā)現(xiàn)使用贱勃。
 * Default constructor automatic, classpath detection based discovery of the
 * {@link RequestUpgradeStrategy} to use.
 */
public HandshakeWebSocketService() {
    this(initUpgradeStrategy());
}

/**
 * 使用RequestUpgradeStrategy的替代構(gòu)造函數(shù)井赌。
 * @param upgradeStrategy the strategy to use
 */
public HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy) {
    Assert.notNull(upgradeStrategy, "RequestUpgradeStrategy is required");
    this.upgradeStrategy = upgradeStrategy;
}

private static RequestUpgradeStrategy initUpgradeStrategy() {
    String className;
    if (tomcatPresent) {
        className = "TomcatRequestUpgradeStrategy";
    }
    else if (jettyPresent) {
        className = "JettyRequestUpgradeStrategy";
    }
    else if (undertowPresent) {
        className = "UndertowRequestUpgradeStrategy";
    }
    else if (reactorNettyPresent) {
        // As late as possible (Reactor Netty commonly used for WebClient)
        className = "ReactorNettyRequestUpgradeStrategy";
    }
    else {
        throw new IllegalStateException("No suitable default RequestUpgradeStrategy found");
    }

    try {
        className = "org.springframework.web.reactive.socket.server.upgrade." + className;
        Class<?> clazz = ClassUtils.forName(className, HandshakeWebSocketService.class.getClassLoader());
        return (RequestUpgradeStrategy) ReflectionUtils.accessibleConstructor(clazz).newInstance();
    }
    catch (Throwable ex) {
        throw new IllegalStateException(
                "Failed to instantiate RequestUpgradeStrategy: " + className, ex);
    }
}


/**
 * Return the {@link RequestUpgradeStrategy} for WebSocket requests.
 */
public RequestUpgradeStrategy getUpgradeStrategy() {
    return this.upgradeStrategy;
}

@Override
public boolean isRunning() {
    return this.running;
}

@Override
public void start() {
    if (!isRunning()) {
        this.running = true;
        doStart();
    }
}

protected void doStart() {
    if (getUpgradeStrategy() instanceof Lifecycle) {
        ((Lifecycle) getUpgradeStrategy()).start();
    }
}

@Override
public void stop() {
    if (isRunning()) {
        this.running = false;
        doStop();
    }
}

protected void doStop() {
    if (getUpgradeStrategy() instanceof Lifecycle) {
        ((Lifecycle) getUpgradeStrategy()).stop();
    }
}


    //處理HTTP請(qǐng)求并使用給定的WebSocketHandler。
@Override
public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) {
    ServerHttpRequest request = exchange.getRequest();
    HttpMethod method = request.getMethod();
    HttpHeaders headers = request.getHeaders();

    if (logger.isDebugEnabled()) {
        logger.debug("Handling " + request.getURI() + " with headers: " + headers);
    }

    if (HttpMethod.GET != method) {
        return Mono.error(new MethodNotAllowedException(
                request.getMethodValue(), Collections.singleton(HttpMethod.GET)));
    }

    if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
        return handleBadRequest("Invalid 'Upgrade' header: " + headers);
    }

    List<String> connectionValue = headers.getConnection();
    if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) {
        return handleBadRequest("Invalid 'Connection' header: " + headers);
    }

    String key = headers.getFirst(SEC_WEBSOCKET_KEY);
    if (key == null) {
        return handleBadRequest("Missing \"Sec-WebSocket-Key\" header");
    }

    String protocol = selectProtocol(headers, handler);
    return this.upgradeStrategy.upgrade(exchange, handler, protocol);
}

private Mono<Void> handleBadRequest(String reason) {
    if (logger.isDebugEnabled()) {
        logger.debug(reason);
    }
    return Mono.error(new ServerWebInputException(reason));
}

@Nullable
private String selectProtocol(HttpHeaders headers, WebSocketHandler handler) {
    String protocolHeader = headers.getFirst(SEC_WEBSOCKET_PROTOCOL);
    if (protocolHeader != null) {
        List<String> supportedProtocols = handler.getSubProtocols();
        for (String protocol : StringUtils.commaDelimitedListToStringArray(protocolHeader)) {
            if (supportedProtocols.contains(protocol)) {
                return protocol;
            }
        }
    }
    return null;
}

}

WebSocketService實(shí)現(xiàn)通過(guò)委托給RequestUpgradeStrategy處理WebSocket HTTP握手請(qǐng)求贵扰,該請(qǐng)求可以從類路徑自動(dòng)檢測(cè)(無(wú)參數(shù)構(gòu)造函數(shù))仇穗,但也可以顯式配置。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末戚绕,一起剝皮案震驚了整個(gè)濱河市纹坐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌舞丛,老刑警劉巖耘子,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異瓷马,居然都是意外死亡拴还,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門欧聘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人端盆,你說(shuō)我怎么就攤上這事怀骤》逊猓” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵蒋伦,是天一觀的道長(zhǎng)弓摘。 經(jīng)常有香客問(wèn)我,道長(zhǎng)痕届,這世上最難降的妖魔是什么韧献? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮研叫,結(jié)果婚禮上锤窑,老公的妹妹穿的比我還像新娘。我一直安慰自己嚷炉,他們只是感情好渊啰,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著申屹,像睡著了一般绘证。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上哗讥,一...
    開(kāi)封第一講書(shū)人閱讀 49,046評(píng)論 1 285
  • 那天嚷那,我揣著相機(jī)與錄音,去河邊找鬼杆煞。 笑死魏宽,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的索绪。 我是一名探鬼主播湖员,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼瑞驱!你這毒婦竟也來(lái)了娘摔?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤唤反,失蹤者是張志新(化名)和其女友劉穎凳寺,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體彤侍,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肠缨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了盏阶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片晒奕。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出脑慧,到底是詐尸還是另有隱情魄眉,我是刑警寧澤,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布闷袒,位于F島的核電站坑律,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏囊骤。R本人自食惡果不足惜晃择,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望也物。 院中可真熱鬧宫屠,春花似錦、人聲如沸焦除。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)膘魄。三九已至乌逐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間创葡,已是汗流浹背浙踢。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留灿渴,地道東北人洛波。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像骚露,于是被迫代替她去往敵國(guó)和親蹬挤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容