A look at Flink's StateTtlConfig

This article takes a look at Flink's StateTtlConfig.

Example

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
  • Here a builder is used to create the StateTtlConfig, which is then handed to the state through the StateDescriptor's enableTimeToLive method
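
To make the effect of these settings concrete, here is a minimal sketch in plain Java of how a TTL-guarded value might behave. It is not Flink's implementation: TtlValueSketch, its nested enums and the injected clock are hypothetical names made up for illustration, and the expiry rule (expired once the TTL has fully elapsed since the last access timestamp) is a simplifying assumption.

```java
import java.util.function.LongSupplier;

/** Simplified, hypothetical model of a TTL-guarded value; not Flink's TtlValueState. */
final class TtlValueSketch<T> {
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }
    enum Visibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final Visibility visibility;
    private final LongSupplier clock; // injectable processing-time source

    private T value;         // null means "no state stored"
    private long lastAccess; // timestamp the TTL is measured from

    TtlValueSketch(long ttlMillis, UpdateType updateType, Visibility visibility, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
        this.clock = clock;
    }

    /** Every write stores the value and restarts the TTL. */
    void update(T newValue) {
        value = newValue;
        lastAccess = clock.getAsLong();
    }

    /** Reads the value, applying the expiry, visibility and update-type rules. */
    T value() {
        if (value == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (now - lastAccess >= ttlMillis) {
            T stale = value;
            value = null; // expired entries are dropped when a read finds them
            return visibility == Visibility.ReturnExpiredIfNotCleanedUp ? stale : null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            lastAccess = now; // reads also prolong the TTL
        }
        return value;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // manually advanced clock
        TtlValueSketch<String> state = new TtlValueSketch<>(
            1000L, UpdateType.OnCreateAndWrite, Visibility.NeverReturnExpired, () -> now[0]);
        state.update("hello");
        now[0] = 500L;
        System.out.println(state.value()); // still within the 1 s TTL
        now[0] = 1000L;
        System.out.println(state.value()); // TTL elapsed, value is hidden
    }
}
```

With OnCreateAndWrite only writes restart the timer, so a value that is read often but never rewritten still expires; OnReadAndWrite would keep it alive for as long as it keeps being read.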

StateTtlConfig

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java

/**
 * Configuration of state TTL logic.
 *
 * <p>Note: The map state with TTL currently supports {@code null} user values
 * only if the user value serializer can handle {@code null} values.
 * If the serializer does not support {@code null} values,
 * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
 * at the cost of an extra byte in the serialized form.
 */
public class StateTtlConfig implements Serializable {

    private static final long serialVersionUID = -7592693245044289793L;

    public static final StateTtlConfig DISABLED =
        newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
        OnReadAndWrite
    }

    /**
     * This option configures whether expired user value can be returned or not.
     */
    public enum StateVisibility {
        /** Return expired user value if it is not cleaned up yet. */
        ReturnExpiredIfNotCleanedUp,
        /** Never return expired user value. */
        NeverReturnExpired
    }

    /**
     * This option configures time scale to use for ttl.
     */
    public enum TimeCharacteristic {
        /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
        ProcessingTime
    }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Time ttl;
    private final CleanupStrategies cleanupStrategies;

    private StateTtlConfig(
        UpdateType updateType,
        StateVisibility stateVisibility,
        TimeCharacteristic timeCharacteristic,
        Time ttl,
        CleanupStrategies cleanupStrategies) {
        this.updateType = Preconditions.checkNotNull(updateType);
        this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        this.ttl = Preconditions.checkNotNull(ttl);
        this.cleanupStrategies = cleanupStrategies;
        Preconditions.checkArgument(ttl.toMilliseconds() > 0,
            "TTL is expected to be positive");
    }

    @Nonnull
    public UpdateType getUpdateType() {
        return updateType;
    }

    @Nonnull
    public StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    @Nonnull
    public Time getTtl() {
        return ttl;
    }

    @Nonnull
    public TimeCharacteristic getTimeCharacteristic() {
        return timeCharacteristic;
    }

    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    @Nonnull
    public CleanupStrategies getCleanupStrategies() {
        return cleanupStrategies;
    }

    @Override
    public String toString() {
        return "StateTtlConfig{" +
            "updateType=" + updateType +
            ", stateVisibility=" + stateVisibility +
            ", timeCharacteristic=" + timeCharacteristic +
            ", ttl=" + ttl +
            '}';
    }

    @Nonnull
    public static Builder newBuilder(@Nonnull Time ttl) {
        return new Builder(ttl);
    }

    /**
     * Builder for the {@link StateTtlConfig}.
     */
    public static class Builder {

        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = ProcessingTime;
        private Time ttl;
        private CleanupStrategies cleanupStrategies = new CleanupStrategies();

        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }

        /**
         * Sets the ttl update type.
         *
         * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
         */
        @Nonnull
        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }

        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        /**
         * Sets the state visibility.
         *
         * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
         */
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }

        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }

        /**
         * Sets the time characteristic.
         *
         * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
         */
        @Nonnull
        public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
            return this;
        }

        @Nonnull
        public Builder useProcessingTime() {
            return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }

        /** Cleanup expired state in full snapshot on checkpoint. */
        @Nonnull
        public Builder cleanupFullSnapshot() {
            cleanupStrategies.strategies.put(
                CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                new CleanupStrategies.CleanupStrategy() {  });
            return this;
        }

        /**
         * Sets the ttl time.
         * @param ttl The ttl time.
         */
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }

        @Nonnull
        public StateTtlConfig build() {
            return new StateTtlConfig(
                updateType,
                stateVisibility,
                timeCharacteristic,
                ttl,
                cleanupStrategies);
        }
    }

    /**
     * TTL cleanup strategies.
     *
     * <p>This class configures when to cleanup expired state with TTL.
     * By default, state is always cleaned up on explicit read access if found expired.
     * Currently cleanup of state full snapshot can be additionally activated.
     */
    public static class CleanupStrategies implements Serializable {
        private static final long serialVersionUID = -1617740467277313524L;

        /** Fixed strategies ordinals in {@code strategies} config field. */
        enum Strategies {
            FULL_STATE_SCAN_SNAPSHOT
        }

        /** Base interface for cleanup strategies configurations. */
        interface CleanupStrategy extends Serializable {

        }

        final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

        public boolean inFullSnapshot() {
            return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
        }
    }
}
  • StateTtlConfig sets the TTL properties of a state. It defines three enums: UpdateType (Disabled, OnCreateAndWrite, OnReadAndWrite), StateVisibility (ReturnExpiredIfNotCleanedUp, NeverReturnExpired) and TimeCharacteristic (ProcessingTime)
  • StateTtlConfig also defines CleanupStrategies, the cleanup strategies for TTL state. By default, expired state is cleaned up when a read finds it; in addition there is currently an option to clean up on FULL_STATE_SCAN_SNAPSHOT (expired state is dropped from the full snapshot taken at checkpoint time)
  • StateTtlConfig provides a Builder for conveniently setting UpdateType, StateVisibility, TimeCharacteristic, the TTL (Time) and CleanupStrategies. The Builder defaults are OnCreateAndWrite, NeverReturnExpired and ProcessingTime, with no extra cleanup strategy enabled
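
The builder-with-defaults pattern described above can be sketched without any Flink dependency. The class below is a trimmed-down imitation, not the real StateTtlConfig: TtlConfigSketch and its members are hypothetical names, the TTL is a plain long in milliseconds instead of Time, and an EnumSet stands in for the EnumMap of strategy configurations.

```java
import java.util.EnumSet;

/** Hypothetical, trimmed-down imitation of StateTtlConfig and its Builder. */
final class TtlConfigSketch {
    enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }
    enum CleanupStrategy { FULL_STATE_SCAN_SNAPSHOT }

    final long ttlMillis;
    final UpdateType updateType;
    final StateVisibility stateVisibility;
    private final EnumSet<CleanupStrategy> cleanup;

    private TtlConfigSketch(Builder b) {
        // Same validation idea as the original constructor: the TTL must be positive.
        if (b.ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL is expected to be positive");
        }
        this.ttlMillis = b.ttlMillis;
        this.updateType = b.updateType;
        this.stateVisibility = b.stateVisibility;
        this.cleanup = EnumSet.copyOf(b.cleanup); // defensive copy
    }

    boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    boolean inFullSnapshot() {
        return cleanup.contains(CleanupStrategy.FULL_STATE_SCAN_SNAPSHOT);
    }

    static Builder newBuilder(long ttlMillis) {
        return new Builder(ttlMillis);
    }

    static final class Builder {
        private final long ttlMillis;
        // Defaults mirror the ones visible in the Builder listing above.
        private UpdateType updateType = UpdateType.OnCreateAndWrite;
        private StateVisibility stateVisibility = StateVisibility.NeverReturnExpired;
        private final EnumSet<CleanupStrategy> cleanup = EnumSet.noneOf(CleanupStrategy.class);

        private Builder(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        Builder setStateVisibility(StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        /** Opt-in strategy, analogous to cleanupFullSnapshot() in the listing. */
        Builder cleanupFullSnapshot() {
            cleanup.add(CleanupStrategy.FULL_STATE_SCAN_SNAPSHOT);
            return this;
        }

        TtlConfigSketch build() {
            return new TtlConfigSketch(this);
        }
    }

    public static void main(String[] args) {
        TtlConfigSketch config = TtlConfigSketch.newBuilder(1000L).cleanupFullSnapshot().build();
        System.out.println(config.updateType + " " + config.stateVisibility
            + " fullSnapshot=" + config.inFullSnapshot());
    }
}
```

Because every optional field has a default, the shortest useful call is newBuilder(ttl).build(); each setter returns the builder so that calls can be chained.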

AbstractKeyedStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java

    /**
     * @see KeyedStateBackend
     */
    @Override
    @SuppressWarnings("unchecked")
    public <N, S extends State, V> S getOrCreateKeyedState(
            final TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, V> stateDescriptor) throws Exception {
        checkNotNull(namespaceSerializer, "Namespace serializer");
        checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
                "This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
            keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }
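  • The getOrCreateKeyedState method of AbstractKeyedStateBackend uses TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState, and caches the result in keyValueStatesByName under the descriptor's name so that later lookups reuse it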
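
The get-or-create step can be illustrated in isolation. This is only a sketch of the caching idea with hypothetical names (StateCacheSketch, getOrCreate, creations); the real method additionally initializes serializers and publishes queryable state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of a name-keyed state cache; not Flink code. */
final class StateCacheSketch {
    private final Map<String, Object> statesByName = new HashMap<>();
    int creations = 0; // counts how often the factory actually ran

    /** Returns the cached state for the name, creating it on first access. */
    Object getOrCreate(String name, Function<String, Object> factory) {
        Object state = statesByName.get(name);
        if (state == null) {
            state = factory.apply(name); // the TTL wrapping would happen here
            creations++;
            statesByName.put(name, state);
        }
        return state;
    }

    public static void main(String[] args) {
        StateCacheSketch backend = new StateCacheSketch();
        Object first = backend.getOrCreate("text state", n -> new Object());
        Object second = backend.getOrCreate("text state", n -> new Object());
        System.out.println(first == second);   // same instance is reused
        System.out.println(backend.creations); // factory ran once
    }
}
```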

TtlStateFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java

/**
 * This state factory wraps state objects, produced by backends, with TTL logic.
 */
public class TtlStateFactory<N, SV, S extends State, IS extends S> {
    public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer);
        Preconditions.checkNotNull(stateDesc);
        Preconditions.checkNotNull(originalStateFactory);
        Preconditions.checkNotNull(timeProvider);
        return  stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<N, SV, S, IS>(
                namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
                .createState() :
            originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
    }

    private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;

    private final TypeSerializer<N> namespaceSerializer;
    private final StateDescriptor<S, SV> stateDesc;
    private final KeyedStateFactory originalStateFactory;
    private final StateTtlConfig ttlConfig;
    private final TtlTimeProvider timeProvider;
    private final long ttl;

    private TtlStateFactory(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) {
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDesc;
        this.originalStateFactory = originalStateFactory;
        this.ttlConfig = stateDesc.getTtlConfig();
        this.timeProvider = timeProvider;
        this.ttl = ttlConfig.getTtl().toMilliseconds();
        this.stateFactories = createStateFactories();
    }

    private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
        return Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
            Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
            Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
            Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
            Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
            Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    }

    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), TtlStateFactory.class);
            throw new FlinkRuntimeException(message);
        }
        return stateFactory.get();
    }

    @SuppressWarnings("unchecked")
    private IS createValueState() throws Exception {
        ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlValueState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <T> IS createListState() throws Exception {
        ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
        ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
        return (IS) new TtlListState<>(
            originalStateFactory.createInternalState(
                namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, listStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <UK, UV> IS createMapState() throws Exception {
        MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
        MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
            stateDesc.getName(),
            mapStateDesc.getKeySerializer(),
            new TtlSerializer<>(mapStateDesc.getValueSerializer()));
        return (IS) new TtlMapState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, mapStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private IS createReducingState() throws Exception {
        ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
        ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
            stateDesc.getName(),
            new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlReducingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <IN, OUT> IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
            (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
        TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
            aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
        AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
            stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlAggregatingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    }

    @SuppressWarnings({"deprecation", "unchecked"})
    private <T> IS createFoldingState() throws Exception {
        FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
        SV initAcc = stateDesc.getDefaultValue();
        TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
        FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
            stateDesc.getName(),
            ttlInitAcc,
            new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlFoldingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    //......
}
  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method decides how to create the state based on stateDesc.getTtlConfig().isEnabled(): if TTL is enabled it calls new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState(), otherwise it calls originalStateFactory.createInternalState(namespaceSerializer, stateDesc)
  • createStateFactories builds a map from each StateDescriptor class to its creation method. createState then looks up the SupplierWithException for the descriptor's class and invokes it, which avoids an if/else chain; an unsupported descriptor type results in a FlinkRuntimeException
  • ValueStateDescriptor maps to createValueState, which creates a TtlValueState; ListStateDescriptor maps to createListState, which creates a TtlListState; MapStateDescriptor maps to createMapState, which creates a TtlMapState; ReducingStateDescriptor maps to createReducingState, which creates a TtlReducingState; AggregatingStateDescriptor maps to createAggregatingState, which creates a TtlAggregatingState; FoldingStateDescriptor maps to createFoldingState, which creates a TtlFoldingState
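
The two decisions described above (wrap only if TTL is enabled, then dispatch on the descriptor class through a map of suppliers) can be sketched as follows. Everything here is hypothetical and stdlib-only: the Descriptor classes stand in for Flink's StateDescriptor subclasses, and plain strings stand in for the created state objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of class-keyed factory dispatch; not Flink's TtlStateFactory. */
final class TtlDispatchSketch {
    /** Stand-in for StateDescriptor, carrying only the "TTL enabled" flag. */
    static class Descriptor {
        final boolean ttlEnabled;
        Descriptor(boolean ttlEnabled) { this.ttlEnabled = ttlEnabled; }
    }
    static final class ValueDescriptor extends Descriptor {
        ValueDescriptor(boolean ttlEnabled) { super(ttlEnabled); }
    }
    static final class ListDescriptor extends Descriptor {
        ListDescriptor(boolean ttlEnabled) { super(ttlEnabled); }
    }
    static final class UnsupportedDescriptor extends Descriptor {
        UnsupportedDescriptor(boolean ttlEnabled) { super(ttlEnabled); }
    }

    // One supplier per descriptor class replaces an if/else or instanceof chain.
    private static final Map<Class<? extends Descriptor>, Supplier<String>> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put(ValueDescriptor.class, () -> "TtlValueState");
        FACTORIES.put(ListDescriptor.class, () -> "TtlListState");
    }

    static String createStateAndWrapWithTtlIfEnabled(Descriptor desc) {
        if (!desc.ttlEnabled) {
            return "PlainState"; // TTL disabled: use the original state unwrapped
        }
        Supplier<String> factory = FACTORIES.get(desc.getClass());
        if (factory == null) {
            throw new IllegalStateException("State " + desc.getClass() + " is not supported");
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(createStateAndWrapWithTtlIfEnabled(new ValueDescriptor(true)));
        System.out.println(createStateAndWrapWithTtlIfEnabled(new ListDescriptor(false)));
    }
}
```

One consequence of keying the map by the exact class is that a subclass of a registered descriptor would not be found; the lookup matches getClass() only, not the type hierarchy.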

Summary

  • StateTtlConfig sets the TTL properties of a state, chiefly UpdateType, StateVisibility, TimeCharacteristic, the TTL (Time) and CleanupStrategies
  • The getOrCreateKeyedState method of AbstractKeyedStateBackend uses TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState
  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method creates the state according to stateDesc.getTtlConfig().isEnabled(); when TTL is enabled, TtlStateFactory's createState creates the TTL state that matches the type of the StateDescriptor

