通過MapReduce的本地運(yùn)行方式來分析MapReduce的執(zhí)行過程

一秩彤、首先拋出結(jié)論(Map階段)

    1. Job.afterCompletion():
        檢查是否是running狀態(tài)我注,如果是running避免重復(fù)提交句惯!
        如果狀態(tài)是define莺禁,提交幕垦!執(zhí)行commit()
                    
    2. commit() :  
        創(chuàng)建Cluster對象扔茅,是Job運(yùn)行的集群的抽象表達(dá)读串,包含JobRunner聊记,及文件系統(tǒng)!
        根據(jù)Cluster獲取Jobcommitter恢暖,提交Job
                    
    3. 提交前:
        ①確定當(dāng)前Job的作業(yè)目錄
        ②切片排监,將split.info / split.infometa在作業(yè)目錄生成
        ③根據(jù)切片數(shù),設(shè)置MapTask啟動(dòng)個(gè)數(shù)
        ④生成Job.xml文件(包含了所有的配置信息參數(shù))
                    
    4. 提交: 
        在LocalJobRunner上重構(gòu)Job對象杰捂!
        執(zhí)行start()社露,啟動(dòng)一個(gè)線程!
                    
    5. Job的run()
                
        ①根據(jù)切片信息琼娘,獲取包含切面及屬性信息的數(shù)組峭弟,根據(jù)這個(gè)數(shù)組附鸽,確定List<RunableandThrowable>  mapTaskRunables
                            
        ②根據(jù)設(shè)置的numReduceTasks(默認(rèn)為1),確定List<RunableandThrowable>  reduceeTaskRunables
                    
        ③創(chuàng)建線程池瞒瘸,開啟多個(gè)線程坷备,運(yùn)行MapTask和ReduceTask
                    
    6. 每個(gè)MapTaskRunable對象,都會(huì)創(chuàng)建一個(gè)MapTask
        MapTask執(zhí)行runNewMapper()執(zhí)行Map階段的核心邏輯情臭!
                    
    7. 每個(gè)ReduceTaskRunable對象省撑,都會(huì)創(chuàng)建一個(gè)ReduceTask
        ReduceTask執(zhí)行runNewReducer()執(zhí)行Rudece階段的核心邏輯!

二俯在、各階段源碼

  1. Job.waitForCompletion
public boolean waitForCompletion(boolean verbose) throws IOException, 
                        InterruptedException, ClassNotFoundException {
   // 判斷job是定義狀態(tài)竟秫,避免job提交后正在運(yùn)行造成的重復(fù)提交
   if (state == JobState.DEFINE) {
       //此方法的核心在于submit()
       submit();
   }
   if (verbose) {        // 根據(jù)傳入?yún)?shù)決定是否將job運(yùn)行的過程打印顯示
       monitorAndPrintJob();
   } else {
      // get the completion poll interval from the client.
       int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
       while (!isComplete()) {
           try {
               Thread.sleep(completionPollIntervalMillis);
           } catch (InterruptedException ie) {
           }
       }
    }
    return isSuccessful();
}

  1. submit()
    2.1 submit方法
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    //確認(rèn)運(yùn)行狀態(tài)
    ensureState(JobState.DEFINE);
    //確定是否是使用的新API
    setUseNewAPI();
    // 創(chuàng)建Cluster對象,Cluster是集群的抽象表達(dá)跷乐,包含JobRunner,和文件系統(tǒng)
    connect();

    // 根據(jù)Cluster創(chuàng)建Job提交器
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),cluster.getClient());
    // 使用Job提交器肥败,提交Job,獲取Job運(yùn)行狀態(tài)
    //ugi是UserGroupInformation類的實(shí)例愕提,表示Hadoop中的用戶和組信息馒稍,這個(gè)類包裝了一個(gè)JAAS Subject以及提供了
    //方法來確定用戶的名字和組,它同時(shí)支持Windows浅侨、Unix和Kerberos登錄模塊
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });

    // 將狀態(tài)改為正在運(yùn)行
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}

2.2. setUseNewApi()

private void setUseNewAPI() throws IOException {
    // reduceTask的個(gè)數(shù)纽谒,通過mapreduce.job.reduces設(shè)置,默認(rèn)是1
    int numReduces = conf.getNumReduceTasks();
    ......//后面是一系列的檢查語句,此處不作具體解釋
}
  1. Jobsubmitter提交Job
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    // 獲取輸出格式如输,調(diào)用方法鼓黔,檢查輸入目錄是否設(shè)置且不存在!
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    //將conf加入分布式緩存中
    addMRFrameworkToDistributedCache(conf);
    // 生成Job運(yùn)行期間臨時(shí)作業(yè)目錄
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
        //得到本機(jī)的提交地址
        submitHostAddress = ip.getHostAddress();
        //得到本機(jī)的主機(jī)名字
        submitHostName = ip.getHostName();
        //獲取之后不见,在配置文件中進(jìn)行submitHostName和submitHostAddress的設(shè)置
        conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // 生成Job的id
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // 根據(jù)Jobid澳化,在Job作業(yè)目錄,創(chuàng)建一個(gè)子目錄
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try{
        ......//此段內(nèi)容為權(quán)限檢查

        // 生成當(dāng)前Job的作業(yè)目錄
        copyAndConfigureFiles(job, submitJobDir);
        // 獲取當(dāng)前Job總的配置文件Job.xml(包含8個(gè)配置文件中所有的信息)的路徑
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          
        // Create the splits for the job
        LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

        // 生成當(dāng)前Job的切片信息: Job.split(切片信息)脖祈,job.splitinfo(切片的屬性)
        // Job.split保存了有多少個(gè)切片對象肆捕,以及每個(gè)切片是從哪個(gè)文件切的哪部分
        // job.splitinfo記錄的是每一個(gè)切片刷晋,應(yīng)該去哪個(gè)主機(jī)來讀取盖高,塊信息的DN主機(jī)
        int maps = writeSplits(job, submitJobDir);

        // 根據(jù)切片數(shù),設(shè)置應(yīng)該啟動(dòng)幾個(gè)MapTask
        conf.setInt(MRJobConfig.NUM_MAPS, maps);
        LOG.info("number of splits:" + maps);

        ......//此段內(nèi)容為權(quán)限檢查
        // Write job file to submit dir
        // 將當(dāng)前Job的配置信息生成到作業(yè)目錄的job.xml中
        writeConf(conf, submitJobFile);
        
        // Now, actually submit the job (using the submit name)
        printTokens(jobId, job.getCredentials());

        // 正式提交
        status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

        if (status != null) {
            return status;
        } else {
            throw new IOException("Could not launch job");
        }
    }finally {
        if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
                jtFs.delete(submitJobDir, true);
        }
    }
}

4.1 提交Job
submitJob()方法是接口 ClientProtocol(RPC 協(xié)議)中的一個(gè)抽象方法眼虱。根據(jù) RPC 原理喻奥,在【客戶端代理對象submitClient】調(diào)用RPC協(xié)議中的submitJob()方法,此方法一定在服務(wù)端執(zhí)行捏悬。該方法也有兩種實(shí)現(xiàn): LocalJobRunner(本地模式)和 YARNRunner(YARN模式)

//本地模式的Job提交方式撞蚕,
public org.apache.hadoop.mapreduce.JobStatus submitJob(
      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
      Credentials credentials) throws IOException {
    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);   //跳至第4.2階段
    job.job.setCredentials(credentials);
    return job.status;
}

4.2 New Job

//重構(gòu)Job對象!
public Job(JobID jobid, String jobSubmitDir) throws IOException {
     …… //各種設(shè)置过牙,重構(gòu)在LocalJobRunner上運(yùn)行的Job
     // 開啟一個(gè)線程
     this.start();
}
  1. Job.run()
//運(yùn)行Job
public void run() {
    JobID jobId = profile.getJobID();
    JobContext jContext = new JobContextImpl(job, jobId);
    
    org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
    try {
        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
    } catch (Exception e) {
        LOG.info("Failed to createOutputCommitter", e);
        return;
    }
    
    try {
        // 根據(jù)切片信息甥厦,創(chuàng)建所有的切片及切片屬性對象
        TaskSplitMetaInfo[] taskSplitMetaInfos = 
        SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

        int numReduceTasks = job.getNumReduceTasks();
        outputCommitter.setupJob(jContext);
        status.setSetupProgress(1.0f);
        // 使用一個(gè)Map記錄每個(gè)MapTask最終保存文件的信息
        Map<TaskAttemptID, MapOutputFile> mapOutputFiles = Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
        //創(chuàng)建MapTask運(yùn)行的線程集合纺铭,有幾片就啟動(dòng)幾個(gè)線程
        List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles);
              
        initCounters(mapRunnables.size(), numReduceTasks);

        // 創(chuàng)建線程池
        ExecutorService mapService = createMapExecutor();
        // 開啟Map階段線程的運(yùn)行
        //注意:mapreduce的運(yùn)行過程中,使用了線程池的技術(shù)(放到隊(duì)列當(dāng)中刀疙,在將來的某個(gè)時(shí)刻進(jìn)行執(zhí)行)
        runTasks(mapRunnables, mapService, "map");

        try {
            if (numReduceTasks > 0) {
              //計(jì)算reduce對應(yīng)的runnable個(gè)數(shù)
              List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
                  jobId, mapOutputFiles);
               //開啟線程池
              ExecutorService reduceService = createReduceExecutor();
              //開啟Reduce階段線程的運(yùn)行舶赔,此篇文章不講解此階段,后面新開文章講解此階段內(nèi)容
              runTasks(reduceRunnables, reduceService, "reduce");
            }
        } finally {
            for (MapOutputFile output : mapOutputFiles.values()) {
              output.removeAll();
            }
        }
        // delete the temporary directory in output directory
        outputCommitter.commitJob(jContext);
        status.setCleanupProgress(1.0f);

        if (killed) {
            this.status.setRunState(JobStatus.KILLED);
        } else {
            this.status.setRunState(JobStatus.SUCCEEDED);
        }
        JobEndNotifier.localRunnerNotification(job, status);
    } catch (Throwable t) {
        try {
            outputCommitter.abortJob(jContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
        } catch (IOException ioe) {
            LOG.info("Error cleaning up job:" + id);
        }
        status.setCleanupProgress(1.0f);
        if (killed) {
            this.status.setRunState(JobStatus.KILLED);
        } else {
            this.status.setRunState(JobStatus.FAILED);
        }
        LOG.warn(id, t);

        JobEndNotifier.localRunnerNotification(job, status);

    } finally {
        try {
            fs.delete(systemJobFile.getParent(), true);  // delete submit dir
            localFs.delete(localJobFile, true);              // delete local copy
            // Cleanup distributed cache
            localDistributedCacheManager.close();
        } catch (IOException e) {
            LOG.warn("Error cleaning up "+id+": "+e);
        }
    }
}

6.Job類中的runTasks()方法

private void runTasks(List<RunnableWithThrowable> runnables,
    ExecutorService service, String taskType) throws Exception {
    // Start populating the executor with work units.
    // They may begin running immediately (in other threads).
    for (Runnable r : runnables) {
        //進(jìn)行提交  是一個(gè)線程池谦秧,執(zhí)行map或者reduce
        service.submit(r);
    }
    ...
}
  1. MapTaskRunable的run()
public void run() {
    try {
        // 生成當(dāng)前線程的id
        TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, taskId), 0);
        LOG.info("Starting task: " + mapId);
        mapIds.add(mapId);

        // 創(chuàng)建MapTask對象竟纳,這個(gè)對象負(fù)責(zé)當(dāng)前線程(節(jié)點(diǎn))map階段邏輯的執(zhí)行!
        MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,info.getSplitIndex(), 1);
        map.setUser(UserGroupInformation.getCurrentUser().getShortUserName());

        //設(shè)置目錄
        //map為MapTask類型的一個(gè)值疚鲤,例如本次調(diào)試中所獲取的值為:attempt_localXXX_0001_m_000000_0
        //localConf()加載配置文件的信息
        setupChildMapredLocalDirs(map, localConf);
        
        //創(chuàng)建一個(gè)map輸出文件并設(shè)置配置信息
        // 當(dāng)前MapTask生成的數(shù)據(jù)保存的對象
        MapOutputFile mapOutput = new MROutputFiles();
        mapOutput.setConf(localConf);
        mapOutputFiles.put(mapId, mapOutput);

        //指的是一個(gè)job_localXXX_0001.xml文件
        map.setJobFile(localJobFile.toString());
        localConf.setUser(map.getUser());
        map.localizeConfiguration(localConf);
        map.setConf(localConf);
        try {
            map_tasks.getAndIncrement();
            //launchMap()方法锥累,進(jìn)行啟動(dòng)map
            myMetrics.launchMap(mapId);
            map.run(localConf, Job.this);     //此處將調(diào)用Maptask的run方法
            myMetrics.completeMap(mapId);
        } finally {
            map_tasks.getAndDecrement();
        }

        LOG.info("Finishing task: " + mapId);
    } catch (Throwable e) {
      this.storedException = e;
    }
}
  1. MapTask.run()
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map 
        // phase will govern the entire attempt's progress.
        if (conf.getNumReduceTasks() == 0) {
            mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
            // 如果需要對key-value進(jìn)行排序,那么必須有reduce階段集歇!
            // If there are reducers then the entire attempt's progress will be 
            // split between the map phase (67%) and the sort phase (33%).
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase  = getProgress().addPhase("sort", 0.333f);
        }
    }
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
    }

    if (useNewApi) {
        //此處進(jìn)入Map的核心處理階段的入口
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
}

9*. MapTask核心邏輯 RunNewMapper()

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
    // make a task context so we can get the classes
    // 創(chuàng)建配置的上下文
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper  實(shí)例化Mapper對象
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format  實(shí)例化InputFormat
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split  //重建切片 切片包含了當(dāng)前片所屬的文件及哪個(gè)部分
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
        // 負(fù)責(zé)直接將MapTask輸出的結(jié)果輸出到最終的輸出目錄
        output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        // 負(fù)責(zé)將MapTask產(chǎn)生的結(jié)果進(jìn)行收集桶略,交給ReduceTask
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
        input.initialize(split, mapperContext);
        //進(jìn)入之后運(yùn)行用戶自定義的mapper示例
        mapper.run(mapperContext);
        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
    } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
    }
}

10.Mapper.run()

//運(yùn)行自定義的Mapper!
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            //在此處開始運(yùn)行自己寫的mapper程序
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}

參考文獻(xiàn):
MapReduce Job本地提交過程源碼跟蹤及分析https://blog.csdn.net/lemonZhaoTao/article/details/72943618

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鬼悠,一起剝皮案震驚了整個(gè)濱河市删性,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌焕窝,老刑警劉巖蹬挺,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異它掂,居然都是意外死亡巴帮,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門虐秋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來榕茧,“玉大人,你說我怎么就攤上這事客给∮醚海” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵靶剑,是天一觀的道長蜻拨。 經(jīng)常有香客問我,道長桩引,這世上最難降的妖魔是什么缎讼? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮坑匠,結(jié)果婚禮上血崭,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好夹纫,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布咽瓷。 她就那樣靜靜地躺著,像睡著了一般舰讹。 火紅的嫁衣襯著肌膚如雪忱详。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天跺涤,我揣著相機(jī)與錄音匈睁,去河邊找鬼。 笑死桶错,一個(gè)胖子當(dāng)著我的面吹牛航唆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播院刁,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼糯钙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了退腥?” 一聲冷哼從身側(cè)響起任岸,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎狡刘,沒想到半個(gè)月后享潜,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡嗅蔬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年剑按,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片澜术。...
    茶點(diǎn)故事閱讀 40,102評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡艺蝴,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出鸟废,到底是詐尸還是另有隱情猜敢,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布盒延,位于F島的核電站缩擂,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏兰英。R本人自食惡果不足惜撇叁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一供鸠、第九天 我趴在偏房一處隱蔽的房頂上張望畦贸。 院中可真熱鬧,春花似錦、人聲如沸黍翎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至君账,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間沈善,已是汗流浹背乡数。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留闻牡,地道東北人净赴。 一個(gè)月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像罩润,于是被迫代替她去往敵國和親玖翅。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 勒皮他人的外貌十分奇特割以,衣著也富有特色金度。仆人們都拿著一根系著一個(gè)充氣的氣囊的短棒,里面裝有干碗豆和小石頭以提醒沉浸...
    陸辰熠閱讀 355評論 0 0
  • 醒來严沥,已是早晨8點(diǎn)猜极,在年末的最后一天睡了個(gè)懶覺,查看手機(jī)今日天氣晴朗消玄,空氣良好魔吐。寫下今日三件事,準(zhǔn)備出發(fā)了莱找。 上午...
    senny1978閱讀 256評論 2 2
  • 時(shí)至午夜難入眠 不知誰家鼾聲傳 羨慕人家心態(tài)安 笑我人生看不穿 (可嘆酬姆,可嘆)
    秋心姐有故事閱讀 729評論 8 39