第04講:Flink 常用的 DataSet 和 DataStream API

Flink系列文章

  1. 第01講:Flink 的應(yīng)用場景和架構(gòu)模型
  2. 第02講:Flink 入門程序 WordCount 和 SQL 實現(xiàn)
  3. 第03講:Flink 的編程模型與其他框架比較
  4. 第04講:Flink 常用的 DataSet 和 DataStream API
  5. 第05講:Flink SQL & Table 編程和案例
  6. 第06講:Flink 集群安裝部署和 HA 配置
  7. 第07講:Flink 常見核心概念分析
  8. 第08講:Flink 窗口狰闪、時間和水印
  9. 第09講:Flink 狀態(tài)與容錯
  10. 第10講:Flink Side OutPut 分流
  11. 第11講:Flink CEP 復(fù)雜事件處理
  12. 第12講:Flink 常用的 Source 和 Connector
  13. 第13講:如何實現(xiàn)生產(chǎn)環(huán)境中的 Flink 高可用配置
  14. 第14講:Flink Exactly-once 實現(xiàn)原理解析
  15. 第15講:如何排查生產(chǎn)環(huán)境中的反壓問題
  16. 第16講:如何處理Flink生產(chǎn)環(huán)境中的數(shù)據(jù)傾斜問題
  17. 第17講:生產(chǎn)環(huán)境中的并行度和資源設(shè)置

關(guān)注公眾號:大數(shù)據(jù)技術(shù)派焙蹭,回復(fù)資料,領(lǐng)取1024G資料状勤。

本課時我們主要介紹 Flink 的 DataSet 和 DataStream 的 API丘喻,并模擬了實時計算的場景解恰,詳細(xì)講解了 DataStream 常用的 API 的使用鲫剿。

說好的流批一體呢

現(xiàn)狀

在前面的課程中掐暮,曾經(jīng)提到過枕面,F(xiàn)link 很重要的一個特點是“流批一體”愿卒,然而事實上 Flink 并沒有完全做到所謂的“流批一體”,即編寫一套代碼潮秘,可以同時支持流式計算場景和批量計算的場景琼开。目前截止 1.10 版本依然采用了 DataSet 和 DataStream 兩套 API 來適配不同的應(yīng)用場景。

DateSet 和 DataStream 的區(qū)別和聯(lián)系

在官網(wǎng)或者其他網(wǎng)站上枕荞,都可以找到目前 Flink 支持兩套 API 和一些應(yīng)用場景柜候,但大都缺少了“為什么”這樣的思考。

Apache Flink 在誕生之初的設(shè)計哲學(xué)是:用同一個引擎支持多種形式的計算躏精,包括批處理渣刷、流處理和機器學(xué)習(xí)等。尤其是在流式計算方面矗烛,Flink 實現(xiàn)了計算引擎級別的流批一體辅柴。那么對于普通開發(fā)者而言,如果使用原生的 Flink 瞭吃,直接的感受還是要編寫兩套代碼碌识。

整體架構(gòu)如下圖所示:

[圖片上傳失敗...(image-51c5ef-1644856901885)]
在 Flink 的源代碼中,我們可以在 flink-java 這個模塊中找到所有關(guān)于 DataSet 的核心類虱而,DataStream 的核心實現(xiàn)類則在 flink-streaming-java 這個模塊。

[圖片上傳失敗...(image-37e490-1644856901885)]

[圖片上傳失敗...(image-a2cced-1644856901885)]

在上述兩張圖中开泽,我們分別打開 DataSet 和 DataStream 這兩個類牡拇,可以發(fā)現(xiàn),二者支持的 API 都非常豐富且十分類似,比如常用的 map惠呼、filter导俘、join 等常見的 transformation 函數(shù)。

我們在前面的課時中講過 Flink 的編程模型剔蹋,對于 DataSet 而言旅薄,Source 部分來源于文件、表或者 Java 集合泣崩;而 DataStream 的 Source 部分則一般是消息中間件比如 Kafka 等少梁。

由于 Flink DataSet 和 DataStream API 的高度相似,并且 Flink 在實時計算領(lǐng)域中應(yīng)用的更為廣泛矫付。所以下面我們詳細(xì)講解 DataStream API 的使用凯沪。

DataStream

我們先來回顧一下 Flink 的編程模型,在之前的課時中提到過买优,F(xiàn)link 程序的基礎(chǔ)構(gòu)建模塊是(Streams)和轉(zhuǎn)換(Transformations)妨马,每一個數(shù)據(jù)流起始于一個或多個 Source,并終止于一個或多個 Sink杀赢。數(shù)據(jù)流類似于有向無環(huán)圖(DAG)烘跺。

[圖片上傳失敗...(image-573231-1644856901885)]

在第 02 課時中模仿了一個流式計算環(huán)境,我們選擇監(jiān)聽一個本地的 Socket 端口脂崔,并且使用 Flink 中的滾動窗口滤淳,每 5 秒打印一次計算結(jié)果。

自定義實時數(shù)據(jù)源

在本課時中脱篙,我們利用 Flink 提供的自定義 Source 功能來實現(xiàn)一個自定義的實時數(shù)據(jù)源娇钱,具體實現(xiàn)如下:

public class MyStreamingSource implements SourceFunction<MyStreamingSource.Item> {
    private boolean isRunning = true;
    /**
     * 重寫run方法產(chǎn)生一個源源不斷的數(shù)據(jù)發(fā)送源
     * @param ctx
     * @throws Exception
     */

    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while(isRunning){
            Item item = generateItem();
            ctx.collect(item);
            //每秒產(chǎn)生一條數(shù)據(jù)
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    //隨機產(chǎn)生一條商品數(shù)據(jù)

    private Item generateItem(){
        int i = new Random().nextInt(100);
        Item item = new Item();
        item.setName("name" + i);
        item.setId(i);
        return item;
    }
    class Item{
        private String name;
        private Integer id;
        Item() {
        }

        public String getName() {

            return name;

        }



        void setName(String name) {

            this.name = name;

        }



        private Integer getId() {

            return id;

        }



        void setId(Integer id) {

            this.id = id;

        }



        @Override

        public String toString() {

            return "Item{" +

                    "name='" + name + '\'' +

                    ", id=" + id +

                    '}';

        }

    }

}





class StreamingDemo {

    public static void main(String[] args) throws Exception {



        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //獲取數(shù)據(jù)源

        DataStreamSource<MyStreamingSource.Item> text = 

        //注意:并行度設(shè)置為1,我們會在后面的課程中詳細(xì)講解并行度

        env.addSource(new MyStreamingSource()).setParallelism(1); 

        DataStream<MyStreamingSource.Item> item = text.map(

                (MapFunction<MyStreamingSource.Item, MyStreamingSource.Item>) value -> value);



        //打印結(jié)果

        item.print().setParallelism(1);

        String jobName = "user defined streaming source";

        env.execute(jobName);

    }



}

在自定義的數(shù)據(jù)源中,實現(xiàn)了 Flink 中的 SourceFunction 接口绊困,同時實現(xiàn)了其中的 run 方法文搂,在 run 方法中每隔一秒鐘隨機發(fā)送一個自定義的 Item。

可以直接運行 main 方法來進行測試:

[圖片上傳失敗...(image-b9cc3e-1644856901885)]

可以在控制臺中看到秤朗,已經(jīng)有源源不斷地數(shù)據(jù)開始輸出煤蹭。下面我們就用自定義的實時數(shù)據(jù)源來演示 DataStream API 的使用。

Map

Map 接受一個元素作為輸入取视,并且根據(jù)開發(fā)者自定義的邏輯處理后輸出硝皂。

[圖片上傳失敗...(image-918eb2-1644856901885)]

復(fù)制代碼

class StreamingDemo {

    public static void main(String[] args) throws Exception {



        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //獲取數(shù)據(jù)源

        DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1); 

        //Map

        SingleOutputStreamOperator<Object> mapItems = items.map(new MapFunction<MyStreamingSource.Item, Object>() {

            @Override

            public Object map(MyStreamingSource.Item item) throws Exception {

                return item.getName();

            }

        });

        //打印結(jié)果

        mapItems.print().setParallelism(1);

        String jobName = "user defined streaming source";

        env.execute(jobName);

    }

}

我們只取出每個 Item 的 name 字段進行打印。

[圖片上傳失敗...(image-256b04-1644856901885)]

注意作谭,Map 算子是最常用的算子之一稽物,官網(wǎng)中的表述是對一個 DataStream 進行映射,每次進行轉(zhuǎn)換都會調(diào)用 MapFunction 函數(shù)折欠。從源 DataStream 到目標(biāo) DataStream 的轉(zhuǎn)換過程中贝或,返回的是 SingleOutputStreamOperator吼过。當(dāng)然了,我們也可以在重寫的 map 函數(shù)中使用 lambda 表達(dá)式咪奖。

復(fù)制代碼

SingleOutputStreamOperator<Object> mapItems = items.map(

      item -> item.getName()

);

甚至盗忱,還可以自定義自己的 Map 函數(shù)兵志。通過重寫 MapFunction 或 RichMapFunction 來自定義自己的 map 函數(shù)羹呵。

復(fù)制代碼

class StreamingDemo {

    public static void main(String[] args) throws Exception {



        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //獲取數(shù)據(jù)源

        DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1);

        SingleOutputStreamOperator<String> mapItems = items.map(new MyMapFunction());

        //打印結(jié)果

        mapItems.print().setParallelism(1);

        String jobName = "user defined streaming source";

        env.execute(jobName);

    }



    static class MyMapFunction extends RichMapFunction<MyStreamingSource.Item,String> {



        @Override

        public String map(MyStreamingSource.Item item) throws Exception {

            return item.getName();

        }

    }

}

此外宠叼,在 RichMapFunction 中還提供了 open宏邮、close 等函數(shù)方法癞志,重寫這些方法還能實現(xiàn)更為復(fù)雜的功能撕贞,比如獲取累加器决采、計數(shù)器等夷陋。

FlatMap

FlatMap 接受一個元素料身,返回零到多個元素汤纸。FlatMap 和 Map 有些類似,但是當(dāng)返回值是列表的時候芹血,F(xiàn)latMap 會將列表“平鋪”贮泞,也就是以單個元素的形式進行輸出。

復(fù)制代碼

SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() {

    @Override

    public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {

        String name = item.getName();

        collector.collect(name);

    }

});

上面的程序會把名字逐個輸出幔烛。我們也可以在 FlatMap 中實現(xiàn)更為復(fù)雜的邏輯啃擦,比如過濾掉一些我們不需要的數(shù)據(jù)等。

Filter

顧名思義饿悬,F(xiàn)liter 的意思就是過濾掉不需要的數(shù)據(jù)令蛉,每個元素都會被 filter 函數(shù)處理,如果 filter 函數(shù)返回 true 則保留狡恬,否則丟棄珠叔。

[圖片上傳失敗...(image-3c087d-1644856901885)]

例如,我們只保留 id 為偶數(shù)的那些 item弟劲。

復(fù)制代碼

SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() {

    @Override

    public boolean filter(MyStreamingSource.Item item) throws Exception {



        return item.getId() % 2 == 0;

    }

});

[圖片上傳失敗...(image-4f63da-1644856901885)]

當(dāng)然祷安,我們也可以在 filter 中使用 lambda 表達(dá)式:

復(fù)制代碼

SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter( 

    item -> item.getId() % 2 == 0

);

KeyBy

在介紹 KeyBy 函數(shù)之前,需要你理解一個概念:KeyedStream兔乞。 在實際業(yè)務(wù)中汇鞭,我們經(jīng)常會需要根據(jù)數(shù)據(jù)的某種屬性或者單純某個字段進行分組,然后對不同的組進行不同的處理庸追。舉個例子霍骄,當(dāng)我們需要描述一個用戶畫像時,則需要根據(jù)用戶的不同行為事件進行加權(quán)淡溯;再比如读整,我們在監(jiān)控雙十一的交易大盤時,則需要按照商品的品類進行分組咱娶,分別計算銷售額绘沉。

[圖片上傳失敗...(image-e607dd-1644856901885)]

我們在使用 KeyBy 函數(shù)時會把 DataStream 轉(zhuǎn)換成為 KeyedStream煎楣,事實上 KeyedStream 繼承了 DataStream,KeyedStream 中的元素會根據(jù)用戶傳入的參數(shù)進行分組车伞。

我們在第 02 課時中講解的 WordCount 程序,曾經(jīng)使用過 KeyBy:

復(fù)制代碼

    // 將接收的數(shù)據(jù)進行拆分喻喳,分組另玖,窗口計算并且進行聚合輸出

        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {

                    @Override

                    public void flatMap(String value, Collector<WordWithCount> out) {

                        for (String word : value.split("\\s")) {

                            out.collect(new WordWithCount(word, 1L));

                        }

                    }

                })

                .keyBy("word")

                .timeWindow(Time.seconds(5), Time.seconds

                ....

在生產(chǎn)環(huán)境中使用 KeyBy 函數(shù)時要十分注意!該函數(shù)會把數(shù)據(jù)按照用戶指定的 key 進行分組表伦,那么相同分組的數(shù)據(jù)會被分發(fā)到一個 subtask 上進行處理谦去,在大數(shù)據(jù)量和 key 分布不均勻的時非常容易出現(xiàn)數(shù)據(jù)傾斜和反壓,導(dǎo)致任務(wù)失敗蹦哼。

[圖片上傳失敗...(image-475647-1644856901885)]

常見的解決方式是把所有數(shù)據(jù)加上隨機前后綴鳄哭,這些我們會在后面的課時中進行深入講解。

Aggregations

Aggregations 為聚合函數(shù)的總稱纲熏,常見的聚合函數(shù)包括但不限于 sum妆丘、max、min 等局劲。Aggregations 也需要指定一個 key 進行聚合勺拣,官網(wǎng)給出了幾個常見的例子:

復(fù)制代碼

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

在上面的這幾個函數(shù)中,max鱼填、min药有、sum 會分別返回最大值、最小值和匯總值苹丸;而 minBy 和 maxBy 則會把最小或者最大的元素全部返回愤惰。我們拿 max 和 maxBy 舉例說明:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//獲取數(shù)據(jù)源

List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();

data.add(new Tuple3<>(0,1,0));

data.add(new Tuple3<>(0,1,1));

data.add(new Tuple3<>(0,2,2));

data.add(new Tuple3<>(0,1,3));

data.add(new Tuple3<>(1,2,5));

data.add(new Tuple3<>(1,2,9));

data.add(new Tuple3<>(1,2,11));

data.add(new Tuple3<>(1,2,13));



DataStreamSource<MyStreamingSource.Item> items = env.fromCollection(data);

items.keyBy(0).max(2).printToErr();



//打印結(jié)果

String jobName = "user defined streaming source";

env.execute(jobName);

我們直接運行程序,會發(fā)現(xiàn)奇怪的一幕:

[圖片上傳失敗...(image-fad2c6-1644856901885)]

從上圖中可以看到赘理,我們希望按照 Tuple3 的第一個元素進行聚合宦言,并且按照第三個元素取最大值。結(jié)果如我們所料感憾,的確是按照第三個元素大小依次進行的打印蜡励,但是結(jié)果卻出現(xiàn)了一個這樣的元素 (0,1,2),這在我們的源數(shù)據(jù)中并不存在阻桅。

我們在 Flink 官網(wǎng)中的文檔可以發(fā)現(xiàn):

The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

文檔中說:min 和 minBy 的區(qū)別在于凉倚,min 會返回我們制定字段的最大值,minBy 會返回對應(yīng)的元素(max 和 maxBy 同理)嫂沉。

網(wǎng)上很多資料也這么寫:min 和 minBy 的區(qū)別在于 min 返回最小的值稽寒,而 minBy 返回最小值的key,嚴(yán)格來說這是不正確的趟章。

min 和 minBy 都會返回整個元素杏糙,只是 min 會根據(jù)用戶指定的字段取最小值慎王,并且把這個值保存在對應(yīng)的位置,而對于其他的字段宏侍,并不能保證其數(shù)值正確赖淤。max 和 maxBy 同理。

事實上谅河,對于 Aggregations 函數(shù)咱旱,F(xiàn)link 幫助我們封裝了狀態(tài)數(shù)據(jù),這些狀態(tài)數(shù)據(jù)不會被清理绷耍,所以在實際生產(chǎn)環(huán)境中應(yīng)該盡量避免在一個無限流上使用 Aggregations吐限。而且,對于同一個 keyedStream 褂始,只能調(diào)用一次 Aggregation 函數(shù)诸典。

不建議的是那些狀態(tài)無限增長的聚合,實際應(yīng)用中一般會配合窗口使用崎苗。使得狀態(tài)不會無限制擴張狐粱。

Reduce

Reduce 函數(shù)的原理是,會在每一個分組的 keyedStream 上生效益缠,它會按照用戶自定義的聚合邏輯進行分組聚合脑奠。
[圖片上傳失敗...(image-feceba-1644856901885)]

例如:

復(fù)制代碼

List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();

data.add(new Tuple3<>(0,1,0));

data.add(new Tuple3<>(0,1,1));

data.add(new Tuple3<>(0,2,2));

data.add(new Tuple3<>(0,1,3));

data.add(new Tuple3<>(1,2,5));

data.add(new Tuple3<>(1,2,9));

data.add(new Tuple3<>(1,2,11));

data.add(new Tuple3<>(1,2,13));



DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);

//items.keyBy(0).max(2).printToErr();



SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> reduce = items.keyBy(0).reduce(new ReduceFunction<Tuple3<Integer, Integer, Integer>>() {

    @Override

    public Tuple3<Integer,Integer,Integer> reduce(Tuple3<Integer, Integer, Integer> t1, Tuple3<Integer, Integer, Integer> t2) throws Exception {

        Tuple3<Integer,Integer,Integer> newTuple = new Tuple3<>();



        newTuple.setFields(0,0,(Integer)t1.getField(2) + (Integer) t2.getField(2));

        return newTuple;

    }

});



reduce.printToErr().setParallelism(1);

我們對下面的元素按照第一個元素進行分組,第三個元素分別求和幅慌,并且把第一個和第二個元素都置為 0:

復(fù)制代碼

data.add(new Tuple3<>(0,1,0));

data.add(new Tuple3<>(0,1,1));

data.add(new Tuple3<>(0,2,2));

data.add(new Tuple3<>(0,1,3));

data.add(new Tuple3<>(1,2,5));

data.add(new Tuple3<>(1,2,9));

data.add(new Tuple3<>(1,2,11));

data.add(new Tuple3<>(1,2,13));

那么最終會得到:(0,0,6) 和 (0,0,38)宋欺。

總結(jié)

這一課時介紹了常用的 API 操作,事實上 DataStream 的 API 遠(yuǎn)遠(yuǎn)不止這些胰伍,我們在看官方文檔的時候要動手去操作驗證一下齿诞,更為高級的 API 將會在實戰(zhàn)課中用到的時候著重進行講解。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末骂租,一起剝皮案震驚了整個濱河市祷杈,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌渗饮,老刑警劉巖但汞,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異互站,居然都是意外死亡私蕾,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進店門胡桃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來踩叭,“玉大人,你說我怎么就攤上這事∪荼矗” “怎么了自脯?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長斤富。 經(jīng)常有香客問我膏潮,道長,這世上最難降的妖魔是什么满力? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任戏罢,我火速辦了婚禮,結(jié)果婚禮上脚囊,老公的妹妹穿的比我還像新娘。我一直安慰自己桐磁,他們只是感情好悔耘,可當(dāng)我...
    茶點故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著我擂,像睡著了一般衬以。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上校摩,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天看峻,我揣著相機與錄音,去河邊找鬼衙吩。 笑死互妓,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的坤塞。 我是一名探鬼主播冯勉,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼摹芙!你這毒婦竟也來了灼狰?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤浮禾,失蹤者是張志新(化名)和其女友劉穎交胚,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體盈电,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡蝴簇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了挣轨。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片军熏。...
    茶點故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖卷扮,靈堂內(nèi)的尸體忽然破棺而出荡澎,到底是詐尸還是另有隱情均践,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布摩幔,位于F島的核電站彤委,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏或衡。R本人自食惡果不足惜焦影,卻給世界環(huán)境...
    茶點故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望封断。 院中可真熱鬧斯辰,春花似錦、人聲如沸坡疼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽柄瑰。三九已至闸氮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間教沾,已是汗流浹背蒲跨。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留授翻,地道東北人或悲。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像藏姐,于是被迫代替她去往敵國和親隆箩。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內(nèi)容