MapReduce Job submit源碼追蹤

本文首發(fā)于我的個(gè)人博客:HTMADAO的Blog 轉(zhuǎn)載請(qǐng)注明出處

Job提交

在mapreduce程序的job類中歇拆,我們通過set Configuration對(duì)象矩动,得到相應(yīng)的job對(duì)象,在job對(duì)象中指定Mapper類腹纳、Reducer類,Job類等屬性后,通過waitForCompletion(true)方法提交并等待job執(zhí)行九杂。傳入的boolean類型參數(shù)決定是否監(jiān)控并打印job的執(zhí)行情況。


public class CheckJob {

    public static void main(String[] args) throws Exception {

        if (args.length!=2){

            System.out.println("你傳夠/多參數(shù)了嗎宣蠕?");

            System.exit(1);

        }else {

            //本程序要處理的數(shù)據(jù)在HDFS上的目錄

            String inputPath = args[0];

            //本程序處理結(jié)果存在HDFS上的目錄例隆,注意:這個(gè)目錄不能存在

            String outputPath = args[1];

            if (inputPath == null || inputPath == "" || outputPath == null || outputPath == ""){

                System.out.println("參數(shù)不對(duì),滾回去重配");

                System.exit(1);

            }else {

                //構(gòu)建Job類的對(duì)象

                Job checkRandomJob = Job.getInstance(HadoopUtil.getRemoteHadoopConf());

                //給當(dāng)前job類的對(duì)象設(shè)置job名稱

                checkRandomJob.setJobName("check Random app");

                //設(shè)置運(yùn)行主類

                checkRandomJob.setJarByClass(CheckJob.class);

                //設(shè)置job的Mapper及其輸出K,V的類型

                checkRandomJob.setMapperClass(CheckMapper.class);

                checkRandomJob.setMapOutputKeyClass(Text.class);

                checkRandomJob.setMapOutputValueClass(LongWritable.class);

                //設(shè)置job的輸出K,V的類型抢蚀,也可以說是Reducer輸出的K,V的類型

                checkRandomJob.setReducerClass(CheckReducer.class);

                checkRandomJob.setOutputKeyClass(Text.class);

                checkRandomJob.setOutputValueClass(LongWritable.class);

                //設(shè)置要處理的HDFS上的文件的路徑

                FileInputFormat.addInputPath(checkRandomJob,new Path(inputPath));

                //設(shè)置最終輸出結(jié)果的路徑

                FileOutputFormat.setOutputPath(checkRandomJob,new Path(outputPath));

                //等待程序完成后自動(dòng)結(jié)束程序

                System.exit(checkRandomJob.waitForCompletion(true)?0:1);

            }

        }

    }

}

job.waitForCompletion()方法

進(jìn)入Job類中的waitForCompletion()方法查看镀层,該方法傳入一個(gè)布爾值參數(shù)。方法首先檢查Job狀態(tài)皿曲,若處于DEFINE狀態(tài)則通過submit()方法提交job唱逢。而后根據(jù)傳入的參數(shù)決定是否監(jiān)控并打印job的運(yùn)行狀況吴侦。


  public boolean waitForCompletion(boolean verbose

                                  ) throws IOException, InterruptedException,

                                            ClassNotFoundException {

    //首先檢查Job狀態(tài),若處于DEFINE狀態(tài)則通過submit()方法向集群提交job

    if (state == JobState.DEFINE) {

      submit();

    }

    //若傳入?yún)?shù)為true坞古,則監(jiān)控并打印job運(yùn)行情況

    if (verbose) {

      monitorAndPrintJob();

    } else {

      // get the completion poll interval from the client.

      int completionPollIntervalMillis =

        Job.getCompletionPollInterval(cluster.getConf());

      while (!isComplete()) {

        try {

          Thread.sleep(completionPollIntervalMillis);

        } catch (InterruptedException ie) {

        }

      }

    }

    return isSuccessful();

  }

submit()方法

該方法負(fù)責(zé)向集群提交job备韧,方法首先再次檢查job的狀態(tài),如果不是DEFINE則不能提交作業(yè)痪枫,setUseNewAPI()方法作用是指定job使用的是新版mapreduce的API织堂,即org.apache.hadoop.mapreduce包下的Mapper和Reducer,而不是老版的mapred包下的類奶陈。

submit()中執(zhí)行了兩個(gè)比較重要的方法:其一的connect()方法會(huì)對(duì)Job類中的Cluster類型的成員進(jìn)行初始化易阳,該成員對(duì)象中封裝了通過Configuration設(shè)置的集群的信息,其內(nèi)部創(chuàng)建了真正的通信協(xié)議對(duì)象尿瞭,它將用于最終的job提交闽烙。

getJobSubmitter()方法通過cluster中封裝的集群信息(這里是文件系統(tǒng)和客戶端)獲取JobSubmitter對(duì)象,該對(duì)象負(fù)責(zé)最終向集群提交job并返回job的運(yùn)行進(jìn)度声搁。最后job提交器對(duì)象submitter.submitJobInternal(Job.this, cluster)將當(dāng)前job對(duì)象提交到cluster中黑竞,并返回job運(yùn)行狀態(tài)給status成員,該方法是JobSubmitter中最核心的功能代碼疏旨。提交成功后很魂,JobState被設(shè)置為RUNNING,表示當(dāng)前job進(jìn)入運(yùn)行階段檐涝,最后控制臺(tái)中打印跟蹤job運(yùn)行狀況的URL遏匆。


  public void submit()

        throws IOException, InterruptedException, ClassNotFoundException {

    ensureState(JobState.DEFINE);

    setUseNewAPI();

    connect();

    //通過cluster中封裝的集群信息(這里是文件系統(tǒng)和客戶端)獲取JobSubmitter對(duì)象,該對(duì)象負(fù)責(zé)最終向集群提交job并返回job的運(yùn)行進(jìn)度

    final JobSubmitter submitter =

        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {

      public JobStatus run() throws IOException, InterruptedException,

      ClassNotFoundException {

        return submitter.submitJobInternal(Job.this, cluster);

      }

    });

    state = JobState.RUNNING;

    LOG.info("The url to track the job: " + getTrackingURL());

  }

submitJobInternal()方法

任務(wù)提交器(JobSubmitter)最終提交任務(wù)到集群的方法谁榜。

首先checkSpecs(job)方法檢查作業(yè)輸出路徑是否配置并且是否存在幅聘。正確情況是已經(jīng)配置且不存在,輸出路徑的配置參數(shù)為mapreduce.output.fileoutputformat.outputdir

而后獲取job中封裝的Configuration對(duì)象窃植,添加MAPREDUCE_APPLICATION_FRAMEWORK_PATH(應(yīng)用框架路徑)到分布式緩存中帝蒿。

通過JobSubmissionFiles中的靜態(tài)方法getStagingDir()獲取作業(yè)執(zhí)行時(shí)相關(guān)資源的存放路徑。默認(rèn)路徑是: /tmp/hadoop-yarn/staging/root/.staging

關(guān)于ip地址的方法則是用于獲取提交任務(wù)的當(dāng)前主機(jī)的IP巷怜,并將ip葛超、主機(jī)名等相關(guān)信息封裝進(jìn)Configuration對(duì)象中。

生成jobID并將其設(shè)置進(jìn)job對(duì)象中延塑,構(gòu)造提交job的路徑绣张。然后是對(duì)該路徑設(shè)置一系列權(quán)限的操作,此處略過不表

writeConf()方法关带,將Job文件(jar包)上傳到任務(wù)提交文件夾(HDFS)

(重要)writeSplits()方法侥涵,寫分片數(shù)據(jù)文件job.splits和分片元數(shù)據(jù)文件job.splitmetainfo到任務(wù)提交文件夾,計(jì)算maptask數(shù)量。

submitClient.submitJob()方法独令,真正的提交作業(yè)到集群端朵,并返回作業(yè)狀態(tài)到status成員。submitClient是前面初始化Cluster對(duì)象時(shí)構(gòu)建的燃箭。


  JobStatus submitJobInternal(Job job, Cluster cluster)

  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 檢查作業(yè)輸出路徑是否配置并且是否存在

    checkSpecs(job);

    Configuration conf = job.getConfiguration();

    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    //configure the command line options correctly on the submitting dfs

    //獲取提交任務(wù)的當(dāng)前主機(jī)的IP冲呢,并將ip、主機(jī)名等相關(guān)信息封裝僅Configuration對(duì)象中

    InetAddress ip = InetAddress.getLocalHost();

    if (ip != null) {

      submitHostAddress = ip.getHostAddress();

      submitHostName = ip.getHostName();

      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);

      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);

    }

    //生成作業(yè)ID,即jobID

    JobID jobId = submitClient.getNewJobID();

    //將jobID設(shè)置入job

    job.setJobID(jobId);

    //構(gòu)造提交作業(yè)路徑,jobStagingArea后接/jobID

    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

    JobStatus status = null;

    try {

      conf.set(MRJobConfig.USER_NAME,

          UserGroupInformation.getCurrentUser().getShortUserName());

      conf.set("hadoop.http.filter.initializers",

          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");

      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());

      LOG.debug("Configuring job " + jobId + " with " + submitJobDir

          + " as the submit dir");

      // get delegation token for the dir 一系列關(guān)于作業(yè)提交路徑權(quán)限的設(shè)置

      TokenCache.obtainTokensForNamenodes(job.getCredentials(),

          new Path[] { submitJobDir }, conf);



      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers

      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {

        KeyGenerator keyGen;

        try {

          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);

          keyGen.init(SHUFFLE_KEY_LENGTH);

        } catch (NoSuchAlgorithmException e) {

          throw new IOException("Error generating shuffle secret key", e);

        }

        SecretKey shuffleKey = keyGen.generateKey();

        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),

            job.getCredentials());

      }

            if (CryptoUtils.isEncryptedSpillEnabled(conf)) {

        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);

        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +

                "data spill is enabled");

      }

  //復(fù)制并配置相關(guān)文件

      copyAndConfigureFiles(job, submitJobDir);

  //獲取配置文件路徑

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);



      // Create the splits for the job

      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

      //writeSplits()方法招狸,寫分片數(shù)據(jù)文件job.splits和分片元數(shù)據(jù)文件job.splitmetainfo,計(jì)算map任務(wù)數(shù)

      int maps = writeSplits(job, submitJobDir);

      conf.setInt(MRJobConfig.NUM_MAPS, maps);

      LOG.info("number of splits:" + maps);

      int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,

          MRJobConfig.DEFAULT_JOB_MAX_MAP);

      if (maxMaps >= 0 && maxMaps < maps) {

        throw new IllegalArgumentException("The number of map tasks " + maps +

            " exceeded limit " + maxMaps);

      }

      // write "queue admins of the queue to which job is being submitted"

      // to job file.

      String queue = conf.get(MRJobConfig.QUEUE_NAME,

          JobConf.DEFAULT_QUEUE_NAME);

      AccessControlList acl = submitClient.getQueueAdmins(queue);

      conf.set(toFullPropertyName(queue,

          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS

      // as the tasks don't need this setting, actually they may break

      // because of it if present as the referral will point to a

      // different job.

      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(

          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,

          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {

        // Add HDFS tracking ids

        ArrayList<String> trackingIds = new ArrayList<String>();

        for (Token<? extends TokenIdentifier> t :

            job.getCredentials().getAllTokens()) {

          trackingIds.add(t.decodeIdentifier().getTrackingId());

        }

        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,

            trackingIds.toArray(new String[trackingIds.size()]));

      }

      // Set reservation info if it exists

      ReservationId reservationId = job.getReservationId();

      if (reservationId != null) {

        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());

      }

      // Write job file to submit dir將Job文件(jar包)上傳到任務(wù)提交文件夾(HDFS)

      writeConf(conf, submitJobFile);



      //

      // Now, actually submit the job (using the submit name)

      //

      printTokens(jobId, job.getCredentials());

      status = submitClient.submitJob(

          jobId, submitJobDir.toString(), job.getCredentials());

      if (status != null) {

        return status;

      } else {

        throw new IOException("Could not launch job");

      }

    } finally {

      if (status == null) {

        LOG.info("Cleaning up the staging area " + submitJobDir);

        if (jtFs != null && submitJobDir != null)

          jtFs.delete(submitJobDir, true);

      }

    }

  }

writeSplits()方法

使用newAPI將會(huì)調(diào)用writeNewSplits()方法


  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,

      Path jobSubmitDir) throws IOException,

      InterruptedException, ClassNotFoundException {

    JobConf jConf = (JobConf)job.getConfiguration();

    int maps;

    //如果使用newAPI則調(diào)用writeNewSplits

    if (jConf.getUseNewMapper()) {

      maps = writeNewSplits(job, jobSubmitDir);

    } else {

      maps = writeOldSplits(jConf, jobSubmitDir);

    }

    return maps;

  }

writeNewSplits()方法

writeNewSplits()方法將會(huì)根據(jù)我們?cè)O(shè)置的inputFormat.class通過反射獲得inputFormat對(duì)象input敬拓,然后調(diào)用inputFormat對(duì)象的getSplits方法,當(dāng)獲得分片信息之后調(diào)用JobSplitWriter.createSplitFiles方法將分片的信息寫入到submitJobDir/job.split文件中裙戏。


  private <T extends InputSplit>

  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,

      InterruptedException, ClassNotFoundException {

    Configuration conf = job.getConfiguration();

    InputFormat<?, ?> input =

      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);

    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest

    // go first

    Arrays.sort(array, new SplitComparator());

    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,

        jobSubmitDir.getFileSystem(conf), array);

    return array.length;

  }

FileInputFormat類中的getSplits()方法

這里我們需要注意這一代碼:


while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

    splits.add(makeSplit(path, length-bytesRemaining, splitSize,

                        blkLocations[blkIndex].getHosts(),

                        blkLocations[blkIndex].getCachedHosts()));

    bytesRemaining -= splitSize;

}

if (bytesRemaining != 0) {

    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,

                        blkLocations[blkIndex].getHosts(),

                        blkLocations[blkIndex].getCachedHosts()));

}

這里的while循環(huán)中的判定語(yǔ)句作用是判斷分塊中剩余的字節(jié)大小與預(yù)設(shè)分片大小的比例是否超過某個(gè)限定值SPLIT_SLOP乘凸,該值是一個(gè)常量,為1.1累榜,在FileInputFormat類中定義营勤。也就是說當(dāng)剩余字節(jié)大于預(yù)設(shè)分片大小的110%后,對(duì)剩余的文件繼續(xù)分片壹罚,否則不足110%葛作,直接將剩余文件生成一個(gè)分片。


private static final double SPLIT_SLOP = 1.1;

getSplits()方法全覽:


  public List<InputSplit> getSplits(JobContext job) throws IOException {

    StopWatch sw = new StopWatch().start();

    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

    long maxSize = getMaxSplitSize(job);

    // generate splits

    List<InputSplit> splits = new ArrayList<InputSplit>();

    List<FileStatus> files = listStatus(job);

    for (FileStatus file: files) {

      Path path = file.getPath();

      long length = file.getLen();

      if (length != 0) {

        BlockLocation[] blkLocations;

        if (file instanceof LocatedFileStatus) {

          blkLocations = ((LocatedFileStatus) file).getBlockLocations();

        } else {

          FileSystem fs = path.getFileSystem(job.getConfiguration());

          blkLocations = fs.getFileBlockLocations(file, 0, length);

        }

        if (isSplitable(job, path)) {

          long blockSize = file.getBlockSize();

          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;

          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

            splits.add(makeSplit(path, length-bytesRemaining, splitSize,

                        blkLocations[blkIndex].getHosts(),

                        blkLocations[blkIndex].getCachedHosts()));

            bytesRemaining -= splitSize;

          }

          if (bytesRemaining != 0) {

            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,

                      blkLocations[blkIndex].getHosts(),

                      blkLocations[blkIndex].getCachedHosts()));

          }

        } else { // not splitable

          if (LOG.isDebugEnabled()) {

            // Log only if the file is big enough to be splitted

            if (length > Math.min(file.getBlockSize(), minSize)) {

              LOG.debug("File is not splittable so no parallelization "

                  + "is possible: " + file.getPath());

            }

          }

          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),

                      blkLocations[0].getCachedHosts()));

        }

      } else {

        //Create empty hosts array for zero length files

        splits.add(makeSplit(path, 0, length, new String[0]));

      }

    }

    // Save the number of input files for metrics/loadgen

    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

    sw.stop();

    if (LOG.isDebugEnabled()) {

      LOG.debug("Total # of splits generated by getSplits: " + splits.size()

          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));

    }

    return splits;

  }

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末猖凛,一起剝皮案震驚了整個(gè)濱河市赂蠢,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌辨泳,老刑警劉巖虱岂,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異菠红,居然都是意外死亡第岖,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門试溯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)绍傲,“玉大人,你說我怎么就攤上這事耍共。” “怎么了猎塞?”我有些...
    開封第一講書人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵试读,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我荠耽,道長(zhǎng)钩骇,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮倘屹,結(jié)果婚禮上银亲,老公的妹妹穿的比我還像新娘。我一直安慰自己纽匙,他們只是感情好务蝠,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著烛缔,像睡著了一般馏段。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上践瓷,一...
    開封第一講書人閱讀 50,084評(píng)論 1 291
  • 那天院喜,我揣著相機(jī)與錄音,去河邊找鬼晕翠。 笑死喷舀,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的淋肾。 我是一名探鬼主播硫麻,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼巫员!你這毒婦竟也來(lái)了庶香?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤简识,失蹤者是張志新(化名)和其女友劉穎赶掖,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體七扰,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡奢赂,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了颈走。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片膳灶。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖立由,靈堂內(nèi)的尸體忽然破棺而出轧钓,到底是詐尸還是另有隱情,我是刑警寧澤锐膜,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布毕箍,位于F島的核電站,受9級(jí)特大地震影響道盏,放射性物質(zhì)發(fā)生泄漏而柑。R本人自食惡果不足惜文捶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望媒咳。 院中可真熱鬧粹排,春花似錦、人聲如沸涩澡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)筏养。三九已至斧抱,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間渐溶,已是汗流浹背辉浦。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留茎辐,地道東北人宪郊。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像拖陆,于是被迫代替她去往敵國(guó)和親弛槐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容