Flink DataStream API 批處理能力演進之路

摘要:本文由阿里云 Flink 團隊郭偉杰老師撰寫,旨在向 Flink Batch 社區(qū)用戶介紹 Flink DataStream API 批處理能力的演進之路症脂。內(nèi)容主要分為以下三個部分:

  1. 批處理語義和性能優(yōu)化
  2. Batch API 功能增強
  3. 總結(jié)

最近在和一個朋友閑聊時谚赎,他問了一個很有意思的問題:Flink 是如何在流處理引擎上支持批處理能力的淫僻?

鑒于 Flink 已經(jīng)成為了流處理領(lǐng)域的事實標準,可能很多人都不知道壶唤,F(xiàn)link 在誕生的第一天起就是支持批處理的雳灵。DataSet API 也是從那時起就被引入的,它被用來支持對有界數(shù)據(jù)的批處理操作闸盔。隨著 Flink 社區(qū)逐步意識到基于 Pipeline 的架構(gòu)非常適合流處理悯辙,因此發(fā)展出了 DataStream API[1],它是為無界的流式應(yīng)用開發(fā)的蕾殴,引入了狀態(tài)笑撞,事件時間和窗口等特殊概念岛啸。

但隨著對兩套 API 本質(zhì)的深入思考钓觉,F(xiàn)link 社區(qū)逐漸發(fā)現(xiàn):DataStream API 其實完全可以成為 DataSet API 的超集。

  • 概念上:有界數(shù)據(jù)集只是無限數(shù)據(jù)流的一種特例坚踩。
  • 語義上:DataStream API 覆蓋了 DataSet API 的大部分荡灾,同時還有針對實時流計算的擴展,只有少數(shù)和分區(qū)有關(guān)的語義暫時沒有支持瞬铸。具體的差異見下圖:

同時維護兩套 API 也對社區(qū)造成了很大的困擾批幌,并且用戶開發(fā)作業(yè)前必須提前在兩種 API 中作出選擇。對于用戶來說嗓节,離線和實時作業(yè)具有相同的處理邏輯是很常見的荧缘。如果只編寫一次代碼,就能達到分別開發(fā)流和批兩個作業(yè)的相同效果拦宣,將會帶來極大的便利截粗。鑒于以上諸多原因,F(xiàn)link 最終走上了基于 DataStream 的流批一體的發(fā)展道路鸵隧。也正是如此绸罗,F(xiàn)link 社區(qū)早在 1.12 版本就開始逐步棄用 DataSet API,將會在 Flink 2.0 中完全移除 DataSet 相關(guān)代碼豆瘫。同時珊蟀,不斷提升 DataStream 上的批處理能力,以 DataStream 為核心打造流批一體的 API外驱。

流批一體是一個相對寬泛的概念育灸,它包含 API,調(diào)度,Shuffle赐劣,容錯等多個維度筋帖,本文主要關(guān)注于 API 及其底層算子執(zhí)行上 DataStream 對批處理所做的工作,其他細節(jié)可以參考文章《Flink 執(zhí)行引擎:流批一體的融合之路》[2]绽诚。下面我們將沿著批處理語義和性能優(yōu)化以及 Batch API 功能增強兩個大的方向回顧 Flink DataStream API 批處理能力演進之路。

1. 批處理語義和性能優(yōu)化

DataStream API 雖然理論上可以覆蓋絕大多數(shù) DataSet API 上的語義和操作,但在一些細微之處還是存在一些差異恩够。下面我們從幾個方面詳細介紹一些 Flink 社區(qū)在這方面所做的努力卒落。

1.1 輸出語義

為了最大化數(shù)據(jù)的實時性,DataStream 上算子的輸出是增量式的蜂桶。例如:KeyedStream.reduce儡毕,它會在每次到來一條新的數(shù)據(jù)時更新內(nèi)部維護的狀態(tài),并向下游發(fā)送當前最新的聚合值扑媚。用數(shù)據(jù)庫的術(shù)語來說腰湾,它產(chǎn)生了一個 Upsert 流作為輸出: 如果一個鍵有 10 個輸入元素,那么也會得到 10 條輸出記錄疆股。

而對于實時性往往沒有這么強要求的批作業(yè)來說费坊,這些中間的增量輸出會極大地增加下游算子的計算負擔。由于批作業(yè)的算子不需要感知數(shù)據(jù)的 Changelog, 其更期望的是一種 All-or-Nothing 式的輸出語義旬痹,即僅僅在每個 Key 最后一條數(shù)據(jù)到來后附井,才向下游發(fā)送數(shù)據(jù)。因此两残,我們需要在批模式下對一些 API(例如:KeyedStream#reduce, sum,min,max,minBy,maxBy) 的行為做出改變永毅,使其僅在輸入結(jié)束時輸出最終結(jié)果。

下表描述了 Sum 操作在流和批兩種模式下的輸入輸出情況:

(假設(shè)它們具有相同的 Key人弓,4 為該 Key 的最后一條數(shù)據(jù))

輸入 流模式輸出 批模式輸出
1 1
2 3
3 6
4 10 10

1.2 狀態(tài)訪問和更新算法

對于有狀態(tài)算子沼死,DataSet 算子在迭代數(shù)據(jù)時直接在內(nèi)存中維護最新的狀態(tài)值。在 DataStream API 中崔赌,狀態(tài)的訪問和更新則是通過與 StateBackend 交互所進行的意蛀。實現(xiàn)流批一體的統(tǒng)一架構(gòu),就意味著 DataStream API 在流和批模式下要盡可能復(fù)用相同的算子實現(xiàn)峰鄙。但是與 RocksDB 等 StateBackend 交互會帶來不小的 IO 開銷浸间,站在 Flink 開發(fā)者的視角上,該如何解決這個問題呢吟榴?讓我們更深入地思考一下他們之間的本質(zhì)差異魁蒜。

流模式下 DataStream API 上的聚合算法其實可以類比為基于 Hash 的聚合,StateBackend 在這里扮演著哈希表的角色吩翻。下圖展示了在流模式下一個 KeyedOperator 的輸入數(shù)據(jù)和狀態(tài)存儲的關(guān)系:

(綠色部分表示新數(shù)據(jù)到來后狀態(tài)存儲的更新)

我們可以看到:在流模式下兜看,狀態(tài)存儲必須維持一個哈希表,為每個 Key 存儲一條 Item狭瞎。值得注意的是细移,該狀態(tài)并不是完全存儲在內(nèi)存中的,達到一定閾值后需要溢寫到磁盤熊锭。由于批作業(yè)是沒有 Checkpoint 的弧轧,并且其 Shuffle 的中間數(shù)據(jù)直接寫入到了磁盤中雪侥,發(fā)生 Failover 后直接從上一個 Stage 的數(shù)據(jù)重新計算狀態(tài)即可,因此并不需要對狀態(tài)進行持久化存儲精绎,理論上狀態(tài)完全可以放在內(nèi)存中速缨。

接下來要考慮的是內(nèi)存是否有 OOM 風(fēng)險:對于單個 Key 來說,其狀態(tài)不會非常大代乃。由于批作業(yè)的數(shù)據(jù)是有界的旬牲,如果我們能對 key 進行分組,就可以在同一時間只追蹤單一 Key 的狀態(tài)搁吓。沿著這個思路原茅,我們可以把基于 Hash 的狀態(tài)訪問算法變?yōu)榛谂判虻摹R虼硕樽校現(xiàn)link 在批執(zhí)行模式下會對 KeyOperator 的所有輸入數(shù)據(jù)按 Key 進行排序擂橘,并且在該模式下使用一種特殊的 StateBackend,它在內(nèi)存中追蹤當前 Key 所對應(yīng)的狀態(tài)贮预,當 Key 發(fā)生切換時清除上一個 Key 的狀態(tài)值贝室。

批執(zhí)行模式下契讲,一個 KeyedOperator 的輸入數(shù)據(jù)和狀態(tài)存儲的關(guān)系如下圖所示:

需要注意的是仿吞,這種方式引入了額外的數(shù)據(jù)排序開銷,當狀態(tài)訪問的頻率比較低捡偏,狀態(tài)的數(shù)據(jù)量比較小時唤冈,對性能會有負面影響。但是考慮到絕大多數(shù)的批處理作業(yè)規(guī)模都比較大银伟,其中的有狀態(tài)算子往往需要 per-record 的訪問和更新狀態(tài)你虹。比如對常見的 Join、Group Agg 等彤避,往往存在很多重復(fù) Key 的數(shù)據(jù)傅物,該優(yōu)化帶來的收益通常比排序帶來的開銷要大的多。

1.3 EventTime 和 Watermark

實時數(shù)據(jù)流中事件可能是亂序的琉预,即時間戳為 T 的事件可能出現(xiàn)在時間戳為 T+1 的事件之后董饰。此外,系統(tǒng)無法確定是否將來還有時間戳為 t < T 的元素到來圆米。因此卒暂,F(xiàn)link 的流處理模式是建立在事件的順序無法得到保證的前提下的。為了消除這種無序性帶來的影響娄帖,F(xiàn)link 引入了一種名為 Watermark 的標記也祠。一個時間戳為 T 的 Watermark 到來,表示不會再收到或者可以直接忽略任何 t < T 的數(shù)據(jù)近速。

但在批執(zhí)行模式下诈嘿,數(shù)據(jù)是有界的堪旧,我們明確知道每一條數(shù)據(jù)的時間,因此可以認為不存在無法預(yù)知的遲到數(shù)據(jù)奖亚。發(fā)送中間的 Watermark 是沒有意義的, 反而只會增加網(wǎng)絡(luò)傳輸?shù)膲毫拖掠翁幚磉@些 Watermark 的復(fù)雜度崎场。由于定時器和窗口的閉合都需要 Watermark 來觸發(fā),因此我們可以只在輸入結(jié)束時發(fā)送 MAX_WATERMARK遂蛀,或者在每個 Key 結(jié)束時發(fā)送 MAX_WATERMARK谭跨。這樣既不會引入太多開銷,又可以統(tǒng)一流批算子對于 EventTime 的處理李滴。

2. Batch API 功能增強

需要注意的是螃宙,DataStream API 和 DataSet API 所支持的操作并非完全一一對應(yīng)。Flink 社區(qū)有一個官方遷移文檔來專門講解如何從 DataSet 作業(yè)遷移到 DataStream 作業(yè)[3] (下文簡稱文檔)所坯。在該文檔中谆扎,根據(jù)遷移所帶來的代碼改動和執(zhí)行效率的差異,把 DataSet API 分成了四大類:

  1. 在 DataStream 有等價的 API芹助,只需要很少的方法名改動就可以完成遷移堂湖。
  2. 通過 DataStream 上不等價的其他 API 可以實現(xiàn)同樣的行為,遷移雖然需要進行代碼改動状土,但是執(zhí)行效率和 DataSet 相同无蜂。
  3. 通過 DataStream 上不等價的其他 API 可以實現(xiàn)同樣的行為,遷移不僅需要進行代碼改動蒙谓,而且執(zhí)行效率可能會存在一些差異斥季。
  4. 目前 DataStream 沒有支持,且沒有簡單的 Workaround 的 API累驮。

按照目前 DataStream 上對這些操作的支持情況酣倾,我們又可以把它們進一步分為下面兩大類:

2.1 完美支持或者可以通過 Workaround 支持

上述四類中,第1和第2類都屬于可以無痛遷移的谤专,第3類可以通過 Workaround 來實現(xiàn)躁锡,但是在執(zhí)行效率上有比較大的差異。因此置侍,我們主要關(guān)注于第三和第四類映之。

第三類主要有兩種操作:全量 Partition 處理以及笛卡爾積。DataStream 上可以通過 Window 機制來支持這類需求墅垮,但是其中主要存在以下兩個問題:

(1)需要明確知道輸入在什么時候結(jié)束惕医,在拿到全量數(shù)據(jù)后才能進行處理。

Flink 目前內(nèi)置的窗口一般都是隨著時間推進到某個具體的點算色,或者輸入數(shù)據(jù)的量達到某個具體的值來觸發(fā)的抬伺。并沒有一種能夠感知輸入是否結(jié)束的窗口實現(xiàn)。文檔通過自定義的 WindowAssigner 和 Window Trigger 實現(xiàn)了一種僅在輸入結(jié)束時才觸發(fā)計算的窗口灾梦。

隨著用戶作業(yè)的遷移峡钓,我們看到這種需求其實廣泛存在妓笙,因此 Flink 社區(qū)在 FLIP-331[4] 中提出了EndOfStreamWindow 的概念,并會在 Flink 1.20 版本中進行支持能岩,你可以通過如下方式來使用:

input.window(GlobalWindows.createWithEndOfStreamTrigger())
                .apply(
                        new WindowFunction<T, R, KEY, GlobalWindow>() {
                            @Override
                            public void apply(
                                    KEY key,
                                    GlobalWindow window,
                                    Iterable<T> input,
                                    Collector<R> out)
                                    throws Exception {
                                // do something with the iterable input, It has all the input data.
                            }
                        },
                        resultType);

(2)Non-Keyed Stream 上不支持窗口操作

Flink 中的窗口是基于 State 來實現(xiàn)的寞宫,而不同 Key 的 State 是不屬于同一個命名空間的,因此窗口只有在能明確定義 Key 的流上才有意義拉鹃。文檔中引入如下函數(shù)來給數(shù)據(jù)附加上當前分區(qū)(并行度)的信息辈赋,然后以該字段作為數(shù)據(jù)的 Key。

public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
    @Override
    public Tuple2<String, T> map(T value) {
        return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
    }
}

這種方式雖然可以產(chǎn)生正確結(jié)果膏燕,但也引入了per-record的額外開銷钥屈。為了優(yōu)化這個問題,F(xiàn)link 社區(qū)在 FLIP-380[5] 中引入了對 Non-Keyed 全量分區(qū)處理的原生支持坝辫。下面一一介紹這幾個 API 的使用方式和注意事項:

2.1.1Map Partition

該 API 用來對一個分區(qū)的數(shù)據(jù)做全量處理篷就,并在獲取所有數(shù)據(jù)后進行輸出。

假如我們需要計算每個分區(qū)內(nèi)數(shù)據(jù)的條數(shù)近忙,并輸出給下游算子竭业。可以使用如下方式來實現(xiàn):

inputStream.fullWindowPartition()
                .mapPartition(
         new MapPartitionFunction<Record, Long>() {
                             @Override
                    public void mapPartition(
                            Iterable<Record> values, Collector<Long> out)
                            throws Exception {
                        long counter = 0;
                        for (T value : values) {
                            counter++;
                        }
                        out.collect(counter));
                    }
          })

它與 map 的主要區(qū)別如下:

MapPartition Map
計算觸發(fā)時機 所有輸入結(jié)束后觸發(fā)一次 每條輸入數(shù)據(jù)都會觸發(fā)一次
輸入數(shù)據(jù)類型 包含所有數(shù)據(jù)的 Iterable 對象 每條數(shù)據(jù)自身

值得注意的是:MapPartition 雖然給調(diào)用者提供了一個基于全量數(shù)據(jù)的 Iterable 對象及舍,但它并不會把全量數(shù)據(jù)都加載到內(nèi)存未辆。該 API 的底層實現(xiàn)充分利用了 Flink 執(zhí)行引擎的反壓機制,在對 Iterable 對象進行迭代時只會按需把數(shù)據(jù)加載到內(nèi)存击纬。

2.1.2Reduce/Aggregate Partition

該 API 主要用于對分區(qū)內(nèi)的數(shù)據(jù)做全量聚合鼎姐,分別需要傳入 ReduceFunction 和 AggregateFunction。ReduceFunction 描述了兩條輸入數(shù)據(jù)如何合并產(chǎn)生同樣類型的輸出數(shù)據(jù)更振,而 AggregateFunction 是更通用的 ReduceFunction, 它通過引入一個中間的 Accumulator, 支持產(chǎn)生不同類型的輸出。

下面的例子展示了如何在一個雙字段的 Tuple 數(shù)據(jù)流上對其第二個字段做全量聚合

inputStream.fullWindowPartition()
       .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(
                              Tuple2<String, Integer> value1,
                              Tuple2<String, Integer> value2) throws Exception {
                          return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                                    }
                                })

2.1.3 Sort Partition

另一種比較重要的操作就是排序饭尝,對分區(qū)內(nèi)數(shù)據(jù)進行排序的需求在批處理中是廣泛存在的肯腕。理論上,我們可以通過 MapPartition 來輕松實現(xiàn)全內(nèi)存的排序钥平,但是在大規(guī)模 Batch 作業(yè)中实撒,把數(shù)據(jù)全部加載到內(nèi)存中往往是不現(xiàn)實的。sortPartition API 支持外部排序涉瘾,在數(shù)據(jù)量到達一定閾值后會溢寫磁盤知态,因此無需擔心內(nèi)存的 OOM 問題。

下面是一個對分區(qū)內(nèi)數(shù)據(jù)做全量升序排列的示例代碼:

DataStreamSource<Tuple2<String, Integer>> source = xxxxx
 // 按照 tuple 的第一個字段進行排序
 SingleOutputStreamOperator<Tuple2<String, Integer>> sortedPartition =
                source.fullWindowPartition().sortPartition(1, Order.ASCENDING);

注意:排序算子會使用 Flink Managed Memory立叛。內(nèi)存的大小會影響排序的效率负敏,過小的內(nèi)存會導(dǎo)致數(shù)據(jù)頻繁地寫入和讀出磁盤。如果你的一些排序操作相對較重(數(shù)據(jù) Record 比較大秘蛇,數(shù)據(jù)量比較多)其做,建議調(diào)大“execution.sort-partition.memory”值來提升性能顶考。

2.2 目前還不支持

上述第四類代表目前 Flink DataStream API 還沒有支持的操作。主要有兩種: RangePartition 和 GroupCombine.

其中 GroupCombine 會把數(shù)據(jù)分成多個批次妖泄,對每個批次的數(shù)據(jù)進行合并驹沿。它并不是用戶的業(yè)務(wù)需求,是引擎為了提高執(zhí)行效率而對用戶的需求蹈胡,因此Flink 社區(qū)暫時沒有計劃支持該操作渊季。而 RangePartition 基于現(xiàn)有的 DataStream API 可以實現(xiàn),但是相對復(fù)雜(需要用戶實現(xiàn)復(fù)雜的采樣算法)罚渐,筆者所在的團隊已經(jīng)對此在做 PoC 實現(xiàn)了梭域,未來會在合適的時機貢獻回社區(qū)。

3. 總結(jié)

本文回顧了 Flink 在批處理能力上從 DataSet API 到流批一體的 DataStream API 的演進搅轿,并從批處理語義&性能優(yōu)化以及 Batch API 功能增強兩大方面分別展示了 Flink 社區(qū)是如何思考和提升 DataStream 批處理能力的病涨,相信隨著社區(qū)的不斷努力,F(xiàn)link Batch 會越來越好璧坟。Flink DataStream API 的流批一體能力也將在數(shù)據(jù)處理領(lǐng)域扮演越來越重要的角色既穆。

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741#FLIP131:ConsolidatetheuserfacingDataflowSDKs/APIs(anddeprecatetheDataSetAPI)-WhydoesFlinkhavethreeAPIs

[2] https://developer.aliyun.com/article/783112

[3] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/dataset_migration/

[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment

[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream

歡迎大家加入 Flink Batch 交流釘釘群。本群旨在為 Flink Batch 愛好者提供一個交流技術(shù)和傳遞資訊的平臺雀鹃,在這里:

  • 你可以掌握Flink Batch前沿的資訊幻工,可以與 Flink 開發(fā)者及 Committer 面對面交流
  • Flink Batch 的問題集中解決,各位開發(fā)者及 Committer 及時解決你的 Blocker

“Flink Batch 交流群”群的釘釘群號: 34817520

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末黎茎,一起剝皮案震驚了整個濱河市囊颅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌傅瞻,老刑警劉巖踢代,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異嗅骄,居然都是意外死亡胳挎,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進店門溺森,熙熙樓的掌柜王于貴愁眉苦臉地迎上來慕爬,“玉大人,你說我怎么就攤上這事屏积∫搅” “怎么了?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵炊林,是天一觀的道長姥卢。 經(jīng)常有香客問我,道長铛铁,這世上最難降的妖魔是什么隔显? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任却妨,我火速辦了婚禮,結(jié)果婚禮上括眠,老公的妹妹穿的比我還像新娘彪标。我一直安慰自己,他們只是感情好掷豺,可當我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布捞烟。 她就那樣靜靜地躺著,像睡著了一般当船。 火紅的嫁衣襯著肌膚如雪题画。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天德频,我揣著相機與錄音苍息,去河邊找鬼。 笑死壹置,一個胖子當著我的面吹牛竞思,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播钞护,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼盖喷,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了难咕?” 一聲冷哼從身側(cè)響起课梳,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎余佃,沒想到半個月后暮刃,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡咙冗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年沾歪,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雾消。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖挫望,靈堂內(nèi)的尸體忽然破棺而出立润,到底是詐尸還是另有隱情,我是刑警寧澤媳板,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布桑腮,位于F島的核電站,受9級特大地震影響蛉幸,放射性物質(zhì)發(fā)生泄漏破讨。R本人自食惡果不足惜丛晦,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望提陶。 院中可真熱鬧烫沙,春花似錦、人聲如沸隙笆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽撑柔。三九已至瘸爽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間铅忿,已是汗流浹背剪决。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留檀训,地道東北人柑潦。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像肢扯,于是被迫代替她去往敵國和親妒茬。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容