Flink源碼5- task執(zhí)行及state和checkpoint

一 、 task執(zhí)行入口 0:15

接 上期 回顧
★ ——7 》 TaskExecutor#submitTask()

第一個入口:Task 構(gòu)造函數(shù)
——》Task 構(gòu)造函數(shù)()

* 注釋: 當(dāng)前任務(wù)的 Task 信息
*/
this.taskInfo = new TaskInfo()
......
* 注釋: 初始化 ResultPartition 和 ResultSubpartition 關(guān)于輸出的抽象
* ResultSubpartition具體實(shí)現(xiàn)為 PipelinedSubpartition
*/
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment
.createResultPartitionWriters()
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator
......
* 注釋: 初始化 InputGate 輸入的 對象,
inputchanle 從上游一個task節(jié)點(diǎn)拉取數(shù)據(jù)
*/
final IndexedInputGate[] gates = shuffleEnvironment.createInputGates()
* 注釋:創(chuàng)建 Task 的線程 但是不執(zhí)行 run() 方法
*/
executingThread = new Thread(TASK_THREADS_GROUP,

第二個入口:task.startTaskThread(); 通過一個線程來運(yùn)行 Task 0:45
—— 》Task #run()
—— ★》Task #dorun()

Task run過程

★ 重要步驟 反射實(shí)例化StreamTask實(shí)例


image.png

dorun:1 if(transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING))
dorun:2 setupPartitionsAndGates(consumableNotifyingPartitionWriters,
dorun:3 Environment env = new RuntimeEnvironment(jobId, vertexId,
dorun:4 invokable = loadAndInstantiateInvokable(userCodeClassLoader,
dorun: 5 invokable.invoke()豺旬;

—— dorun:4》Task #loadAndInstantiateInvokable()

反射調(diào)用構(gòu)造函數(shù)
statelessCtor = invokableClass.getConstructor(Environment.class);

//#1 SourceStreamTask 帶RuntimeEnvvironment的構(gòu)造函數(shù)
//#2 OneInputStreamTask 帶RuntimeEnvvironment的構(gòu)造函數(shù)

Constructor<? extends AbstractInvokable> statelessCtor;
try {
statelessCtor = invokableClass.getConstructor(Environment.class);
}

——#1 SourceStreamTask(Environment env) 構(gòu)造函數(shù)
——#2 OneInputStreamTask (Environment env) 構(gòu)造函數(shù)

               //1,2最后都到 父類構(gòu)造函數(shù) StreamTask()  

    ——★  》 父類構(gòu)造函數(shù) StreamTask()
           【streamtask 截圖或者筆記】
           4件大事

StreamTask:1 this.recordWriter = createRecordWriterDelegate(configuration,
StreamTask:2 this.mailboxProcessor = new MailboxProcessor(this::processInput,
StreamTask:3 this.stateBackend = createStateBackend();
StreamTask:4 this.subtaskCheckpointCoordinator = new

—— StreamTask:1 》 StreamTask. createRecordWriterDelegate 1:04
——》1》StreamTask.createRecordWriters()
——》1》StreamTask.createRecordWriter()
——》1》StreamTask.createRecordWriters

// TODO_MA 注釋: 初始化一個 ArrayList 容器用來存放創(chuàng)建出來的 RecordWriter
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
// TODO_MA 注釋: 獲取該 StreamTask 的輸出 StreamEdge 集合
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder
// TODO_MA 注釋: 一個 out StreamEdge 來構(gòu)建 一個 RecordWriter
// TODO_MA 注釋: 大概率 createRecordWriter() 方法的返回值是:
ChannelSelectorRecordWriter
recordWriters.add(createRecordWriter(edge, i,
——》1》StreamTask.createRecordWriters()

  • 1轻专、如果上游 StreamNode 和 下游 StreamNode 的并行度一樣魔眨,則使用: ForwardPartitioner 數(shù)據(jù)分發(fā)策略
  • 2滴劲、如果上游 StreamNode 和 下游 StreamNode 的并行度不一樣苏章,則使用: RebalancePartitioner 數(shù)據(jù)分發(fā)策略 
    StreamPartitioner<OUT> outputPartitioner = null; 
    try {
        outputPartitioner = InstantiationUtil.clone(
    
    // TODO_MA 注釋: 其實(shí)這個 output 就是負(fù)責(zé)幫您完成這個 StrewamTask 的所有數(shù)據(jù)的輸出
    // TODO_MA 注釋: 輸出到 ResultPartition
    // TODO_MA 注釋: 初始化輸出 ChannelSelectorRecordWriter
    RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output
    = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>() .setChannelSelector(outputPartitioner)
    .setTimeout(bufferTimeout)
    .setTaskName(taskName) // TODO_MA 注釋: 構(gòu)建一個 RecordWriter 返回
    .build(bufferWriter);
    ▲回到父類構(gòu)造函數(shù) StreamTask ▲

StreamTask:2——》 this.mailboxProcessor = new
MailboxProcessor(this::processInput,
//#1 SourceStreamTask 帶RuntimeEnvvironment的構(gòu)造函數(shù)
//#2 OneInputStreamTask 帶RuntimeEnvvironment的構(gòu)造函數(shù)
▲回到 父類構(gòu)造函數(shù) StreamTask ▲

StreamTask:3 ——》 this.stateBackend = createStateBackend(); 1:26 ~
state簡介 部分

——》 StreamTask.createStateBackend() 1:36

* 注釋: 根據(jù)配置獲取 StateBackend
* 一般情況下寂嘉,我們在生產(chǎn)環(huán)境中奏瞬, 在 flink-conf.yaml 文件中進(jìn)行配置:
* 1、state.backend: filesystem
* 2泉孩、state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
* 一般有三種方式:
* 1硼端、state.backend: filesystem = FsStateBackend
* 2、state.backend: jobmanager = MemoryStateBackend
* 3寓搬、state.backend: rocksdb = RocksDBStateBackend
* 也可以在程序中泽疆,進(jìn)行設(shè)置 這種方式會覆蓋配置文件中的配置:
* StreamExecutionEnvironment.setStateBackend(StateBackend backend)
*/
return StateBackendLoader.fromApplicationOrConfigOrDefault
▲ 回 父類構(gòu)造函數(shù) StreamTask▲

******創(chuàng)建 CheckpointStorage
* 1恭应、FsStateBackend = FsCheckpointStorage
*/
stateBackend.createCheckpointStorage(getEnvironment()

FsStateBackendcreateCheckpointStorage

****** 注釋: 創(chuàng)建 Channel 的 IO 線程池
*/
this.channelIOExecutor = Executors.newSingleThreadExecutor
▲ 回 父類構(gòu)造函數(shù) StreamTask▲

               ▲回     ★》Task #dorun()下 的  dorun: 5   ▲ 

*** dorun: 5 invokable.invoke()埂陆;
* 注釋: 運(yùn)行任務(wù)钢坦, 在流式應(yīng)用程序中虏劲,都是 StreamTask 的子類
* AbstractInvokable 是 Task 執(zhí)行的主要邏輯颓影,也是所有被執(zhí)行的任務(wù)的基類赁炎,包括 Streaming 模式和 Batch 模式擎场。
* 在 Streaming 模式下瞒御,所有任務(wù)都繼承自 StreamTask父叙,
* 包括 StreamTask 的子類包括 SourceStreamTask, OneInputStreamTask, TwoInputStreamTask,
* 以及用于迭代模式下的 StreamIterationHead 和 StreamIterationTail。
* 每一個 StreamNode 在添加到 StreamGraph 的時候都會有一個關(guān)聯(lián)的 jobVertexClass 屬性肴裙,
* 這個屬性就是該 StreamNode 對應(yīng)的 StreamTask 類型趾唱;對于一個 OperatorChain 而言,它所對應(yīng)的
* StreamTask 就是其 head operator 對應(yīng)的 StreamTask蜻懦。
*/
// run the invokable
invokable.invoke();

——》StreamTask#invoke()

inv 1: beforeInvoke()
inv 2: runMailboxLoop();
inv 3: afterInvoke();
inv 4: cleanUpInvoke();

——inv 1 》StreamTask#beforeInvoke()

  • 注釋: 構(gòu)建 OperatorChain 對象甜癞,里面會做很多事情
    * 初始化 output 輸出對象
    * 主要做三件事情:
    * 1、調(diào)用createStreamOutput()創(chuàng)建對應(yīng)的下游輸出RecordWriterOutput
    * 2宛乃、調(diào)用createOutputCollector()將優(yōu)化邏輯計劃當(dāng)中Chain中的StreamConfig(也就是數(shù)據(jù))寫入到第三步創(chuàng)建的RecordWriterOutput中
    * 3悠咱、通過調(diào)用getChainedOutputs()輸出結(jié)果RecordWriterOutput
    */
    operatorChain = new OperatorChain<>(this, recordWriter);

                   ——inv 1 》OperatorChain 構(gòu)造函數(shù)
                                                    ▼
      // TODO_MA 注釋: 遍歷每個輸出邊,給每個 outEdge 構(gòu)造一個 
          RecordWriterOutput 實(shí)例
    

    for(int i = 0; i < outEdgesInOrder.size(); i++) {
    * 為每一個 Operator 構(gòu)造 RecordWriterOutput
    RecordWriterOutput<?> streamOutput =
    createStreamOutput(recordWriterDelegate
    //放到map 里面
    streamOutputMap.put(outEdge, streamOutput);
    }

  • 注釋: 為每一個 Operator 創(chuàng)建 OutputCollector
    */
    this.chainEntryPoint = createOutputCollector(

  • 注釋: 創(chuàng)建 Operator
    Tuple2<OP, Optional<ProcessingTimeService>> headOperatorAndTimeService =

    • 注釋: 創(chuàng)建 OperatorWrapper
      this.headOperatorWrapper = createOperatorWrapper(
      // TODO_MA 注釋: 所有 OperatorWrapper 對象集合征炼,把 headOperatorWrapper 放入到最后
      // TODO_MA 注釋: 其實(shí)一個 OperatorChain 中析既,包含了多個 Operator,最終都被封裝成 OperatorWrapper 放入這個集合中
      // add head operator to end of chain
      allOpWrappers.add(headOperatorWrapper);

// TODO_MA 注釋: tailOperatorWrapper 在 ArrayList 的最前面
this.tailOperatorWrapper = allOpWrappers.get(0);

  • 注釋: 以 forward topological order 鏈接全部的 operator wrappers為一張圖
    */
    linkOperatorWrappers(allOpWrappers);


    operatorchain.png

    ▲OperatorChain 構(gòu)造函數(shù)結(jié)束▲

                             ▲回inv 1 》StreamTask#beforeInvoke()方法 里面
    

// TODO_MA 注釋: 獲取 OperatorChain 的第一個 Operator
// TODO_MA 注釋: 可以認(rèn)為 接收數(shù)據(jù)線程中谆奥,用到的 headOpeartor 終于被初始化
// TODO_MA 注釋: 其實(shí)到此為止眼坏,可以認(rèn)為,在當(dāng)前 OperatorChain 中要用到的各種組件都已經(jīng)創(chuàng)建好了酸些,可以接收數(shù)據(jù)宰译,然后開始流式處理了。
headOperator = operatorChain.getHeadOperator();

  • 注釋: 執(zhí)行 StreamTask 的初始化
  • 1魄懂、可能是 SourceStreamTask沿侈, 對于 SourceStreamTask 來說,只是注冊一個 savepoint 鉤子
  • 2市栗、也可能是 OneInputStreamTask
    */
    init();
  • 注釋: 狀態(tài)恢復(fù)入口
    operatorChain.initializeStateAndOpenOperators(
  • 注釋: 初始化 Mail
    * 這個地方主要是初始化 InputGate 等輸入相關(guān)的細(xì)節(jié)
    */
    readRecoveredChannelState();

----------------------至此 所有準(zhǔn)備工作完成缀拭, 準(zhǔn)備真正執(zhí)行task數(shù)據(jù)流處理到此為止,Task 初始化和預(yù)執(zhí)行相關(guān)的,都基本到位了智厌,然后就開始從我們的 SourceStreamTask 的HeadOperator 的數(shù)據(jù)接收線程诲泌,開始流式處理。-------------

中場休息

▲回到 inv 2: runMailboxLoop(); 開始 2:17

inv 2: runMailboxLoop();

——inv 2》MailboxProcessor.runMailboxLoop()

—— 跳轉(zhuǎn)到父類入口 》SourceStreamTask#LegacySourceFunctionThread.run()
headOperator.run(lock,
——★ 》StreamSource#run()

  • 注釋: 獲取 Operator 的執(zhí)行上下文對象
    */
    this.ctx = StreamSourceContexts.getSourceContext(timeCharacteristic

  • 注釋: 真正運(yùn)行用戶的 Operator
    * 1铣鹏、如果你使用:env.socketTextStream() 則調(diào)用:
    SocketTextStreamFunction
    * 2敷扫、如果你使用:Kafka數(shù)據(jù)源, 則調(diào)用: FlinkKafkaConsumerBase
    * ......
    * function --> transformation ---> streamOperator
    * headOperator.run();
    */
    userFunction.run(ctx);
    ↓ 有很多source 诚卸,選擇SocketTextStreamFunction 為例
    —— 》SocketTextStreamFunction.run()

// TODO_MA 注釋: 沒有數(shù)據(jù)葵第,則阻塞在這兒
// TODO_MA 注釋: 在 SourceStreamTask 初始化的時候,SourceThread 的代碼能執(zhí)行到這兒
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
// TODO_MA 注釋: 一直讀到有分隔符
while (buffer.length() >= delimiter.length()
.......
***** 把讀取到的數(shù)據(jù)傳給ctx 上下文處理****
ctx.collect(record);
↓ 選擇NonTimestamp - 時間戳為 processtime
StreamSourceContexts#NonTimestampContext#collect()

* output = CountingOutput
* -
* element 收到的 一條數(shù)據(jù)
* reuse 裝一條數(shù)據(jù)合溺,待序列化的容器
* collect 執(zhí)行 collect 會對 reuse 對象執(zhí)行序列化
*/
output.collect(reuse.replace(element));

—— 》 CountingOutput.collect()
* output = ChainingOutput
*/
output.collect(record);

—— 》OperatorChain#ChainingOutput.collect()
—— 》 pushToOperator(record);

  • 注釋: 調(diào)用 Operator 的 processElement 來處理 castRecord 數(shù)據(jù)記錄
    * 假設(shè)下一個算子是 keyBy卒密, 則跳轉(zhuǎn)到 : KeyedProcessOperator
    * 因?yàn)橹笠?shuffle 了,所以之后就沒有其他的 Operator 了
    * map() = StreamOperator = StreamMap = operator
    */
    operator.processElement(castRecord);

    —— 》StreamMap.processElement()

    • 注釋: 通過 Output 收集處理之后的結(jié)果數(shù)據(jù)
      • 1棠赛、userFunction.map(element.getValue()) 這是用戶自定義的 map 邏輯
      • 2哮奇、然后計算完的結(jié)果替換掉當(dāng)前 Opeartor 中的成員變量
      • 3、然后被 StreamMap 這個 StreamOperator 繼續(xù)收集
      • 記拙υ肌:因?yàn)楫?dāng)前是 map 操作鼎俘,所以下一步肯定不是 shuffle
      • 但是,如果是 shuffle 算子辩涝,則會執(zhí)行輸出了贸伐。
        */
        output.collect(element.replace(userFunction.map(element.getValue())));

        —— 》▲回到 OperatorChain#ChainingOutput..collect() 遞歸式鏈接調(diào)用
        // TODO_MA 注釋: 跳轉(zhuǎn)到下一個 Operator 來處理元素
        // TODO_MA 注釋: StreamMap
        // TODO_MA 注釋: keyBy
        pushToOperator(record);

        operator.processElement(castRecord);

        —— 》 KeyedProcessOperator.processElement(castRecord);
  • 注釋: 處理元素
    userFunction.processElement(element.getValue(), context, collector);
    ↓ keyby就是聚合
    —— 》 GroupAggFunction.processElement()

  • 注釋: 狀態(tài)初始化 拿到初始值
    */
    RowData accumulators = accState.value();
    // 設(shè)置累加器到第一個
    function.setAccumulators(accumulators);
    // 得到上一個值
    RowData prevAggValue = function.getValue();
    // TODO_MA 注釋: 累加 或者 縮回,其實(shí)都是聚合
    // update aggregate result and set to the newRow
    if (isAccumulateMsg(input)) {
    // accumulate input
    function.accumulate(input);
    } else {
    // retract input
    function.retract(input);
    }
    // TODO_MA 注釋: 計算完畢怔揩,得到新的結(jié)果
    // get current aggregate result
    RowData newAggValue = function.getValue();
    // get accumulator
    accumulators = function.getAccumulators();
    // TODO_MA 注釋: 講計算得到的結(jié)果捉邢,替換到當(dāng)前的 resultRow 成員變量中
    // prepare UPDATE_BEFORE message for previous row
    resultRow.replace(currentKey, prevAggValue)
    .setRowKind(RowKind.UPDATE_BEFORE);
    // TODO_MA 注釋: 寫出
    out.collect(resultRow);

    RecordWriterOutput.collect(resultRow)
    —— 》 pushToRecordWriter(StreamRecord<X> record)
    * 注釋: 發(fā)送
    */
    recordWriter.emit(serializationDelegate);
    ↓ 一般不是廣播
    —— 》 ChannelSelectorRecordWriter.emit()

    * 注釋: channelSelector確定目標(biāo)channel

    • channelSelector 的作用,就和 mapreduce 框架中的 Partitioner 是一樣的作用:
      • 用來決定 record 到底被分發(fā)到那個一個分區(qū)
      • channelSelector.selectChannel(record) = partitioner.getPartition()
        */
        emit(record, channelSelector.selectChannel(record));

        // TODO_MA 注釋: 序列化
        serializer.serializeRecord(record);
    // TODO_MA 注釋: 將序列化器中的序列化結(jié)果寫入目標(biāo) channel      
    if (copyFromSerializerToTargetChannel(targetChannel)) {
    

—— 》 ChannelSelectorRecordWriter.copyFromSerializerToTargetChannel()
flushTargetPartition(targetChannel);
↓ 一般不是廣播
—— 》ResultPartitionWriter#flushTargetPartition()
* 注釋: flush 到對應(yīng)的 ResultPartition 中
* targetChannel = InputChannel
* targetPartition = ResultPartition
*/
targetPartition.flush(targetChannel);

ResultPartition.flush()

PipelinedSubpartition.flush()
—— 》 PipelinedSubpartition# notifyDataAvailable();

// TODO_MA 注釋: readView 是 ResultSubPartition 的消費(fèi)者視圖 對象
// TODO_MA 注釋: 下游的一個Task 可能會消費(fèi)上游的多個Task的某一個分區(qū)的數(shù)據(jù)商膊。
/ TODO_MA 注釋: 上游個任意一個Task的任意一個分區(qū)叫做: ResultSubPartition伏伐,
// TODO_MA 注釋: 這個 ResultSubPartition 對應(yīng)一個消費(fèi)者:
PipelinedSubpartitionView

             readView.notifyDataAvailable();

—— 》PipelinedSubpartitionView#notifyDataAvailable()

—— 》LocalInputChannel#notifyDataAvailable()
* 注釋:
*/
inputGate.notifyChannelNonEmpty(this);
—— 》SingleInputGate.notifyChannelNonEmpty()
* 注釋: 某個 channel 有可寫入數(shù)了,該干活了翘狱。
*/
queueChannel(checkNotNull(channel));
—— 》SingleInputGate.queueChannel()

  • 注釋: 加入隊(duì)列中
    * 既然將 有數(shù)據(jù)可用的channel 加入到 inputChannelsWithData秘案,
    * 那就證明,一定有其他的什么角色來從這個隊(duì)列中獲取 可用的channel 來消費(fèi)數(shù)據(jù)
    */
    inputChannelsWithData.add(channel);

  • 注釋: 發(fā)送信號潦匈!
    */
    if(availableChannels == 0) {
    // TODO_MA 注釋: 如果之前隊(duì)列中沒有channel
    //阱高,這個channel加入后,通知等待的線程
    inputChannelsWithData.notifyAll(); 2:50

以上線程會接到 steamTask啟動時wait的 線程傳到這個方法*
—— 》SingleInputGate.getChannel()
——主數(shù)據(jù)處理執(zhí)行這個 》 StreamTaskNetworkInput.emitNext()
—— 輸出》 OneInputStreamTask#StreamTaskNetworkOutput.emitRecord()

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末茬缩,一起剝皮案震驚了整個濱河市赤惊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌凰锡,老刑警劉巖未舟,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件圈暗,死亡現(xiàn)場離奇詭異,居然都是意外死亡裕膀,警方通過查閱死者的電腦和手機(jī)员串,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來昼扛,“玉大人寸齐,你說我怎么就攤上這事〕常” “怎么了渺鹦?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蛹含。 經(jīng)常有香客問我毅厚,道長,這世上最難降的妖魔是什么浦箱? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任吸耿,我火速辦了婚禮,結(jié)果婚禮上憎茂,老公的妹妹穿的比我還像新娘珍语。我一直安慰自己,他們只是感情好竖幔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著是偷,像睡著了一般拳氢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蛋铆,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天馋评,我揣著相機(jī)與錄音,去河邊找鬼刺啦。 笑死留特,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的玛瘸。 我是一名探鬼主播蜕青,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼糊渊!你這毒婦竟也來了右核?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤渺绒,失蹤者是張志新(化名)和其女友劉穎贺喝,沒想到半個月后菱鸥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡躏鱼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年氮采,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片染苛。...
    茶點(diǎn)故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡扳抽,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出殖侵,到底是詐尸還是另有隱情贸呢,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布拢军,位于F島的核電站楞陷,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏茉唉。R本人自食惡果不足惜固蛾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望度陆。 院中可真熱鬧艾凯,春花似錦、人聲如沸懂傀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蹬蚁。三九已至恃泪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間犀斋,已是汗流浹背贝乎。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留叽粹,地道東北人览效。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像虫几,于是被迫代替她去往敵國和親锤灿。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容