Hadoop MapReduce 學(xué)習(xí)筆記

前言

本文是個人之前紀(jì)錄的MapReduce學(xué)習(xí)筆記踪区,主要涉及到MapReduce基本概念、Hadoop 經(jīng)典示例WordCount的使用解析陈莽、hdfs與hbase的簡單了解使用〕诱疲現(xiàn)在整理了一下分享出來犁享,希望對別人有所幫助坪蚁。

學(xué)習(xí)MapReduce一定要理解這種Map半夷、Reduce的編程模型以及Mapper、Reducer數(shù)據(jù)處理的原理迅细,否則只是一味的復(fù)制粘貼可能比較難上手。

同時學(xué)習(xí)大數(shù)據(jù)的知識淘邻,一定要將自己對分布式的理解研究透徹茵典。

一、概念理解

  • MapReduce 是一種線性的可伸縮的編程模型宾舅,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運(yùn)算
  • 在MapReduce里统阿,Map處理的是原始數(shù)據(jù),每條數(shù)據(jù)之間互相沒有關(guān)系(這一點一定要注意)筹我。Reduce階段扶平,以key為標(biāo)識,對同一個key下的value進(jìn)行統(tǒng)計蔬蕊,類似{key,[value1,value2……]}
  • 可以把MapReduce理解為结澄,把一堆雜亂無章的數(shù)據(jù)按照某種特征歸納起來,然后處理并得到最后的結(jié)果。
    Map面對的是雜亂無章的互不相關(guān)的數(shù)據(jù)麻献,它解析每個數(shù)據(jù)们妥,從中提取出key和value,也就是提取了數(shù)據(jù)的特征勉吻。
    經(jīng)過MapReduce的Shuffle階段之后监婶,在Reduce階段看到的都是已經(jīng)歸納好的數(shù)據(jù)了,在此基礎(chǔ)上我們可以做進(jìn)一步的處理以便得到結(jié)果齿桃。
  • 缺點:不適用于實時計算惑惶,實時計算一般最低都是要求秒級響應(yīng)的,MR很難滿足這個要求短纵,實時計算一般采用storm等流式計算系統(tǒng)
  • MapReduce計算流程


    MapReduce計算流程--來源網(wǎng)絡(luò)

二带污、編程模型

  • 每個應(yīng)用程序稱為一個作業(yè)(Job),每個Job是由一系列的Mapper和Reducer來完成
  • 任務(wù)過程分為兩個階段踩娘,map和reduce階段刮刑,兩個階段都是使用鍵值對(key/value)作為輸入輸出的
  • 每個Mapper處理一個Split,每個split對應(yīng)一個map線程养渴。Split中的數(shù)據(jù)作為map的輸入雷绢,map的輸出一定在map端
  • Map方法:Map(k1,v1) -> list(k2,v2) ,并行應(yīng)用于每一個輸入的數(shù)據(jù)集理卑,每一次調(diào)用都會產(chǎn)生一個(k2,v2)的隊列 翘紊。
  • Reduce方法:Reduce(k2,list(v2)) -> list(k3,v3)。收集map端輸出隊列l(wèi)ist(k2,v2)中有相同key的數(shù)據(jù)對藐唠,把它們聚集在一起帆疟,輸出時形成目的數(shù)據(jù) list(k3,v3)。
  • 新舊版本API的區(qū)別:
    • 新的api放在:org.apache.hadoop.mapreduce,舊版api放在:org.apache.hadoop.mapred
    • 新API使用虛類宇立,舊版使用的是接口踪宠,虛類更加利于擴(kuò)展

三、運(yùn)行機(jī)制

  1. 輸入分片(input split)

    map計算之前妈嘹,MapReduce會根據(jù)輸入文件計算輸入分片(input -> spliting),每個input split針對一個map任務(wù)柳琢。split存儲的并不是數(shù)據(jù),而是一個分片長度和一個記錄數(shù)據(jù)的位置的數(shù)組

  2. map階段

    map階段的操作一般都是在數(shù)據(jù)存儲節(jié)點上操作润脸,所以有時候為了能夠減輕數(shù)據(jù)傳輸?shù)木W(wǎng)絡(luò)壓力柬脸,可以先combiner階段處理一下數(shù)據(jù),在進(jìn)行reduce

  3. combiner階段

    此階段是可選的毙驯,不是必須經(jīng)過的一個階段倒堕,combiner其實也是一種reduce操作徽千,可以說combiner是一種本地化的reduce操作顶霞,是map運(yùn)算的后續(xù)操作,可以減輕網(wǎng)絡(luò)傳輸?shù)膲毫σ乇搿5莄ombiner的使用需要注意不要影響到reduce的最終結(jié)果,比如計算平均值的時候如果使用combiner就會影響最終的結(jié)果魂那,但是計算總數(shù)的話則對最終結(jié)果沒影響

  4. shuffle階段

    將map的輸出作為reduce的輸入蛾号,這個過程就是shuffle,是MapReduce優(yōu)化的重要階段涯雅。

  5. reduce階段

    reducer階段鲜结,輸入是shuffle階段的輸出,對每個不同的鍵和該鍵對應(yīng)的值的數(shù)據(jù)流進(jìn)行獨(dú)立活逆、并行的處理精刷。

四、WordCount--官方提供的example

代碼

package com.smile.test;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    private static final String INPUT_PATH = "/user/cdh/yjq/input/words.txt";
    //hdfs輸出路徑
    private static final String OUTPUT_PATH = "/user/cdh/yjq/output/";
    
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        // Text 實現(xiàn)了BinaryComparable類可以作為key值
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            // 解析鍵值對
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            
            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write(key, result);
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {

        String[] paths = {INPUT_PATH,OUTPUT_PATH};
        //獲得Configuration配置 Configuration: core-default.xml, core-site.xml 
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, paths).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        // 設(shè)置Mapper類
        job.setMapperClass(TokenizerMapper.class);
        // 設(shè)置Combiner類 
        job.setCombinerClass(IntSumReducer.class); 
        // 設(shè)置Reduce類
        job.setReducerClass(IntSumReducer.class); 
         // 設(shè)置輸出key的類型,注意跟reduce的輸出類型保持一致
        job.setOutputKeyClass(Text.class);
        // 設(shè)置輸出value的類型蔗候,注意跟reduce的輸出類型保持一致
        job.setOutputValueClass(IntWritable.class);
        // 設(shè)置輸入路徑
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
      // 設(shè)置輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

解析

  • MapReduce的輸出路徑一定要保證文件夾不存在怒允,最好的解決方法時在代碼中添加判斷,執(zhí)行之前刪除output文件夾(具體方法見下面的hdfs操作

  • MapReduce可以沒有輸出锈遥,但必須設(shè)置輸出路徑

  • MapReduce的輸入路徑可以直接寫hdfs的目錄路徑纫事,然后放在集群下執(zhí)行,

      hadoop jar **.jar java類名 參數(shù)1 參數(shù)2 ...
    
  • Mapper

      //map
      public void map(Object key, Text value, Context context)
    

    前面兩個參數(shù)分別是輸入的key所灸,value丽惶,Context context可以記錄輸入的key和value,context也可以記錄map運(yùn)算的狀態(tài)
    map中的context記錄了map執(zhí)行的上下文爬立,在mapper類中钾唬,context可以存儲一些job conf的信息,也就是說context是作為參數(shù)傳遞的載體侠驯。比如runner中configuration的set信息[conf.set(Str, strValue)]抡秆,map中可以get到[context.getConfiguration().get(Str)]

      //setup
      protected void setup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
      //cleanup
      protected void cleanup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
    

    MapReduce框架內(nèi)的setup和cleanup方法只會執(zhí)行一次,所以一些相關(guān)變量或者是資源的初始化和釋放最好是在setup中執(zhí)行吟策,如果放在map中執(zhí)行儒士,則在解析每一行數(shù)據(jù)的時候都會執(zhí)行一次,嚴(yán)重影響程序運(yùn)行效率檩坚。

  • Reducer

      public void reduce(Text key, Iterable<IntWritable> values, Context context)
    

    reduce的輸入也是key/value形式着撩,不過是values,也就是一個key對應(yīng)的一組value效床,例如key,value1;key,value2...
    reducer不是必須的,如果用不到reducer階段可以不寫

    reduce會接收到不同map傳遞過來的數(shù)據(jù) 权谁,并且每個map傳遞過來的數(shù)據(jù)都是有序的剩檀。如果reduce端接收到的數(shù)據(jù)量比較小,那么會存儲在內(nèi)存中旺芽,如果超出緩沖區(qū)大小一定比例沪猴,則會合并后寫到磁盤上

  • 調(diào)用 runner

      Configuration conf = new Configuration();
      //連接hbase辐啄,操作hbase
      Configuration conf = HBaseConfiguration.create();
    

    MapReduce運(yùn)行之前都要初始化Configuration,主要是讀取MapReduce系統(tǒng)配置运嗜,如core-site.xml壶辜、hdfs-site.xml、mapred-site.xml担租、hbase-site.xml

      scan.setCaching(500); 
    

    增加緩存讀取條數(shù)(一次RPC調(diào)用返回多行紀(jì)錄砸民,也就是每次從服務(wù)器端讀取的行數(shù)),加快scanner讀取速度奋救,但耗費(fèi)內(nèi)存增加岭参,設(shè)太大會響應(yīng)慢、超時或者OOM尝艘。

      setBatch(int batch)
    

    設(shè)置獲取紀(jì)錄的列個數(shù)演侯,默認(rèn)無限制,也就是返回所有的列背亥。實際上就是控制一次next()傳輸多少個columns秒际,如batch為5表示每個result實例返回5個columns
    setBatch使用場景為,用客戶端的scanner緩存進(jìn)行批量交互從而提高性能時狡汉,非常大的行可能無法放入客戶端的內(nèi)存娄徊,這時需要用HBase客戶端API中進(jìn)行batching處理。

    scan.setCacheBlocks(false); 

默認(rèn)是true轴猎,分內(nèi)存嵌莉,緩存和磁盤,三種方式捻脖,一般數(shù)據(jù)的讀取為內(nèi)存->緩存->磁盤锐峭;

setCacheBlocks不適合MapReduce工作:
MR程序為非熱點數(shù)據(jù),不需要緩存可婶,因為Blockcache is LRU沿癞,也就是最近最少訪問算法(扔掉最少訪問的),那么矛渴,前一個請求(比如map讀茸笛铩)讀入Blockcache的所有記錄在后一個請求(新的map讀取)中都沒有用具温,就必須全部被swap蚕涤,那么RegionServer要不斷的進(jìn)行無意義的swapping data,也就是無意義的輸入和輸出BlockCache铣猩,增加了無必要的IO揖铜。而普通讀取時局部查找,或者查找最熱數(shù)據(jù)時达皿,會有提升性能的幫助天吓。

runner方法中可以寫定義多個job贿肩,job會順序執(zhí)行。

五龄寞、常用hadoop fs命令 (類似Linux的文件操作命令汰规,可類比學(xué)習(xí)使用)

-help
功能:輸出這個命令參數(shù)手冊

-ls
功能:顯示目錄信息
示例: hadoop fs -ls /yjq

-mkdir 
功能:在hdfs上創(chuàng)建目錄
示例:hadoop fs -mkdir -p /yjq/test

-moveFromLocal
功能:從本地剪切粘貼到hdfs
示例:hadoop fs -moveFromLocal /home/cdh/a.txt /yjq/test

-moveToLocal
功能:從hdfs剪切粘貼到本地
示例:hadoop fs -moveToLocal /yjq/test/a.txt /home/cdh/ 

-copyFromLocal
功能:從本地文件系統(tǒng)中拷貝文件到hdfs路徑去
示例:hadoop fs -copyFromLocal /home/cdh/a.txt /yjq/test

-copyToLocal
功能:從hdfs拷貝到本地
示例:hadoop fs -copyToLocal /yjq/test/a.txt /home/cdh/ 

-get
功能:等同于copyToLocal,從hdfs下載文件到本地路徑(.表示當(dāng)前路徑)
示例:hadoop fs -get /yjq/test/a.txt .

-getmerge
功能:合并下載多個文件
示例:將目錄下所有的TXT文件下載到本地物邑,并合并成一個文件
hadoop fs -getmerge /yjq/test/*.txt /home/cdh/test.txt

-put
功能:等同于copyFromLocal
示例:hadoop fs -put /home/cdh/a.txt /yjq/test

-cp
功能:從hdfs的一個路徑拷貝hdfs的另一個路徑
示例: hadoop fs -cp /yjq/test1/a.txt /yjq/test2/

-mv
功能:在hdfs目錄中移動文件
示例: hadoop fs -mv /yjq/test1/a.txt /yjq/test2/

-appendToFile
功能:追加一個文件到已經(jīng)存在的文件末尾(本地文件追加到hdfs)
示例:Hadoop fs -appendToFile /home/cdh/a.txt /yjq/test1/a.txt

-cat
功能:顯示文件內(nèi)容
示例:hadoop fs -cat /yjq/test1/a.txt

-tail
功能:顯示一個文件的末尾
示例:hadoop fs -tail /yjq/test1/a.txt

-text
功能:以字符形式打印一個文件的內(nèi)容
示例:hadoop fs -text /yjq/test1/a.txt

-chgrp溜哮、-chmod、-chown
功能:修改文件所屬權(quán)限 
示例:
hadoop fs -chmod 666 /yjq/test1/a.txt
# cdh為用戶名拂封,hadoop為用戶組
hadoop fs -chown cdh:group /yjq/test1/a.txt

-rm
功能:刪除文件或文件夾
示例:hadoop fs -rm -r /yjq/test/a.txt

-df
功能:統(tǒng)計文件系統(tǒng)的可用空間信息
示例:hadoop fs -df -h /

-du
功能:統(tǒng)計文件夾的大小信息
示例:
hadoop fs -du -s -h /yjq/*

-count
功能:統(tǒng)計一個指定目錄下的文件節(jié)點數(shù)量
示例:hadoop fs -count /yjq/

六茬射、HBase 相關(guān)操作

  1. 簡介
    • HBase是一個分布式的、面向列的開源數(shù)據(jù)庫
    • 表由行和列組成冒签,列劃分為多個列族/列簇(column family)
    • RowKey:是Byte array在抛,是表中每條記錄的“主鍵”,方便快速查找萧恕,Rowkey的設(shè)計非常重要刚梭。
    • Column Family:列族,擁有一個名稱(string)票唆,包含一個或者多個相關(guān)列
    • Column:屬于某一個columnfamily朴读,familyName:columnName,每條記錄可動態(tài)添加
    • Hbase--圖片來源網(wǎng)絡(luò)
  1. 編碼

     Configuration conf = HBaseConfiguration.create();
    

    會自動讀取hbase-site.xml配置文件

     Scan scan = new Scan();
     scan.setCaching(1000);
     scan.setStartRow(getBytes(startDate));
     scan.setStopRow(getBytes(endDate));
    
     TableMapReduceUtil.initTableMapperJob(HB_TABLE_NAME, scan, NewsStreamUrlMapper.class, Text.class, Text.class, job);
    

    參數(shù):hbase table name走趋,scan衅金,mapper class,outputKeyClass簿煌,outputValueClass氮唯,job

七、hdfs操作

  1. 運(yùn)算之前清除hdfs上的文件夾

     FileSystem fs = FileSystem.get(new Configuration());
     Path outputDir = new Path(OUTPUT_PATH);
     //運(yùn)算之前如果文件夾存在則清除文件夾
     if(fs.exists(outputDir))
         fs.delete(outputDir, true);
    
  2. HDFS讀流程

    • 客戶端向NameNode發(fā)起讀數(shù)據(jù)請求
    • NameNode找出距離最近的DataNode節(jié)點信息
    • 客戶端從DataNode分塊下載文件
  3. HDFS寫流程

    • 客戶端向NameNode發(fā)起寫數(shù)據(jù)請求
    • 分塊寫入DataNode節(jié)點姨伟,DataNode自動完成副本備份
    • DataNode向NameNode匯報存儲完成惩琉,NameNode通知客戶端

八、多表操作

MultiTableInputFormat 支持多個mapper的輸出混合到一個shuffle夺荒,一個reducer瞒渠,其中每個mapper擁有不同的inputFormat和mapper處理類。
所有的mapper需要輸出相同的數(shù)據(jù)類型技扼,對于輸出value伍玖,需要標(biāo)記該value來源,以便reducer識別

List<Scan> scans = new ArrayList<Scan>();  

Scan scan1 = new Scan();  
scan1.setCaching(100);  
scan1.setCacheBlocks(false);  
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inTable.getBytes());  
scans.add(scan1);  
  
Scan scan2 = new Scan();  
scan2.setCaching(100);  
scan2.setCacheBlocks(false);  
scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inPhoneImsiTable.getBytes());  
scans.add(scan2);   

TableMapReduceUtil.initTableMapperJob(scans, ReadHbaseMapper.class, Text.class,Result.class, job);

九剿吻、錯誤處理

  1. ScannerTimeoutException:org.apache.hadoop.hbase.client.ScannerTimeoutException

    這是當(dāng)從服務(wù)器傳輸數(shù)據(jù)到客戶端的時間窍箍,或者客戶端處理數(shù)據(jù)的時間大于了scanner設(shè)置的超時時間,scanner超時報錯,可在客戶端代碼中設(shè)置超時時間

     Configuration conf = HBaseConfiguration.create()              
     conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,120000) 
    

    如果Mapper階段對每條數(shù)據(jù)的處理時間過長仔燕,可以將scan.setCaching(1000)的值設(shè)置小一點,如果值設(shè)置太大魔招,則處理時間會很長就會出現(xiàn)超時錯誤晰搀。

寫在最后

很久之前寫的學(xué)習(xí)筆記了,資料來源網(wǎng)絡(luò)及項目組內(nèi)的討論办斑,參考文獻(xiàn)就不一一標(biāo)注了外恕,侵刪~

如果您覺得本文對您有幫助,點個贊吧~~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乡翅,一起剝皮案震驚了整個濱河市鳞疲,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蠕蚜,老刑警劉巖尚洽,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異靶累,居然都是意外死亡腺毫,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進(jìn)店門挣柬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來潮酒,“玉大人,你說我怎么就攤上這事邪蛔〖崩瑁” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵侧到,是天一觀的道長勃教。 經(jīng)常有香客問我,道長床牧,這世上最難降的妖魔是什么荣回? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮戈咳,結(jié)果婚禮上心软,老公的妹妹穿的比我還像新娘。我一直安慰自己著蛙,他們只是感情好删铃,可當(dāng)我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著踏堡,像睡著了一般猎唁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上顷蟆,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天诫隅,我揣著相機(jī)與錄音腐魂,去河邊找鬼。 笑死逐纬,一個胖子當(dāng)著我的面吹牛蛔屹,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播豁生,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼兔毒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了甸箱?” 一聲冷哼從身側(cè)響起育叁,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎芍殖,沒想到半個月后豪嗽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡豌骏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年昵骤,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肯适。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡变秦,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出框舔,到底是詐尸還是另有隱情蹦玫,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布刘绣,位于F島的核電站樱溉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏纬凤。R本人自食惡果不足惜福贞,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望停士。 院中可真熱鬧挖帘,春花似錦、人聲如沸恋技。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蜻底。三九已至骄崩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背要拂。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工抠璃, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人脱惰。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓鸡典,卻偏偏與公主長得像,于是被迫代替她去往敵國和親枪芒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容