The Metrics System

Spark's metrics system is made up of the following parts (see also the comments in the MetricsSystem class):

  • Instance: the entity being measured. Spark's instances include Master, Worker, ApplicationInfo, StreamingContext, etc.; they mainly supply the data behind Sources and start/stop the MetricsSystem
  • Source: a source of metric data. The values a Source collects come from the fields of its Instance
  • Sink: a destination for metric data. Spark uses MetricsServlet as the default Sink
  • MetricsConfig: the configuration the metrics system needs; its initialize() method loads the properties
  • MetricsSystem: the per-instance control center for Sources and Sinks

Source

Spark abstracts metric data sources as the Source trait, with implementations such as ApplicationSource, MasterSource, WorkerSource, DAGSchedulerSource, StreamingSource, and JvmSource:

private[spark] trait Source {
  def sourceName: String
  def metricRegistry: MetricRegistry
}
  • sourceName: the name of the metric source
  • metricRegistry: the MetricRegistry the source registers its metrics with

Let's look at the MasterSource, WorkerSource, and JvmSource implementations in detail:

private[spark] class MasterSource(val master: Master) extends Source {
  override val metricRegistry = new MetricRegistry()
  override val sourceName = "master"

  // Gauge for worker numbers in cluster
  metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
    override def getValue: Int = master.workers.size
  })

  // Gauge for alive worker numbers in cluster
  metricRegistry.register(MetricRegistry.name("aliveWorkers"), new Gauge[Int]{
    override def getValue: Int = master.workers.count(_.state == WorkerState.ALIVE)
  })

  // Gauge for application numbers in cluster
  metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] {
    override def getValue: Int = master.apps.size
  })

  // Gauge for waiting application numbers in cluster
  metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
    override def getValue: Int = master.apps.count(_.state == ApplicationState.WAITING)
  })
}

private[worker] class WorkerSource(val worker: Worker) extends Source {
  override val sourceName = "worker"
  override val metricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
    override def getValue: Int = worker.executors.size
  })

  // Gauge for cores used of this worker
  metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
    override def getValue: Int = worker.coresUsed
  })

  // Gauge for memory used of this worker
  metricRegistry.register(MetricRegistry.name("memUsed_MB"), new Gauge[Int] {
    override def getValue: Int = worker.memoryUsed
  })

  // Gauge for cores free of this worker
  metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] {
    override def getValue: Int = worker.coresFree
  })

  // Gauge for memory free of this worker
  metricRegistry.register(MetricRegistry.name("memFree_MB"), new Gauge[Int] {
    override def getValue: Int = worker.memoryFree
  })
}

The Gauges registered with the MetricRegistry read their values from fields of the Master and Worker objects.

JvmSource's MetricSets come from the metrics-jvm module of the Dropwizard Metrics library:

private[spark] class JvmSource extends Source {
  override val sourceName = "jvm"
  override val metricRegistry = new MetricRegistry()

  metricRegistry.registerAll(new GarbageCollectorMetricSet)
  metricRegistry.registerAll(new MemoryUsageGaugeSet)
  metricRegistry.registerAll(
    new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
}

A Source creates a MetricRegistry and registers the metrics to be collected; the metric values come from fields of the Instance object.
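
As a quick illustration, here is a minimal sketch of what such a source could look like. MyQueueSource and its queue parameter are hypothetical and only show the pattern; the Source trait is private[spark], so in practice a class like this lives inside Spark's own packages, just like the sources above:

import java.util.concurrent.ConcurrentLinkedQueue

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.metrics.source.Source

// Hypothetical source exposing the size of an in-memory queue
class MyQueueSource(queue: ConcurrentLinkedQueue[String]) extends Source {
  override val sourceName = "myQueue"
  override val metricRegistry = new MetricRegistry()

  // The Gauge is evaluated lazily: the value is read each time a sink reports
  metricRegistry.register(MetricRegistry.name("pendingItems"), new Gauge[Int] {
    override def getValue: Int = queue.size()
  })
}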

Sink

Spark abstracts metric output as the Sink trait, with implementations such as ConsoleSink, CsvSink, MetricsServlet, GraphiteSink, JmxSink, and Slf4jSink:

private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
  • MetricsServlet: creates a ServletContextHandler in the Spark UI's Jetty server, exposing the metrics in the browser

Let's look at the Slf4jSink implementation in detail:

private[spark] class Slf4jSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {
  val SLF4J_DEFAULT_PERIOD = 10
  val SLF4J_DEFAULT_UNIT = "SECONDS"

  val SLF4J_KEY_PERIOD = "period"
  val SLF4J_KEY_UNIT = "unit"

  val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => SLF4J_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
  }
  // Ensure the scheduleAtFixedRate polling period is at least 1 second
  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop() {
    reporter.stop()
  }

  override def report() {
    reporter.report()
  }
}

The key method is start(): it needs a reporter object plus the reporting interval (pollPeriod, pollUnit). start() then delegates to ScheduledReporter.start():

public void start(long period, TimeUnit unit) {
    // executor is created by Executors.newSingleThreadScheduledExecutor
    executor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                report();
            } catch (Exception ex) {
                LOG.error("Exception thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
            }
        }
    }, period, period, unit);
}

A Sink creates a reporter together with a polling interval (pollPeriod, pollUnit); it then periodically pulls data from the registered Sources and reports it.
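
A minimal sketch of a custom sink, assuming the (Properties, MetricRegistry, SecurityManager) constructor that registerSinks() below invokes reflectively, and reusing Dropwizard's ConsoleReporter. The class name MyConsoleSink is hypothetical; it essentially mirrors Spark's own ConsoleSink:

import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.sink.Sink

// Hypothetical sink that prints all registered metrics to stdout every pollPeriod
private[spark] class MyConsoleSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {
  val pollPeriod = Option(property.getProperty("period")).map(_.toInt).getOrElse(10)
  val pollUnit = Option(property.getProperty("unit"))
    .map(s => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)))
    .getOrElse(TimeUnit.SECONDS)
  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def start() { reporter.start(pollPeriod, pollUnit) }
  override def stop() { reporter.stop() }
  override def report() { reporter.report() }
}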

MetricsConfig

MetricsConfig reads the metrics-related configuration:

private[spark] class MetricsConfig(conf: SparkConf) extends Logging {

  private val DEFAULT_PREFIX = "*"
  private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
  private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"

  private[metrics] val properties = new Properties()
  private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null
  
  // Set the default properties
  private def setDefaultProperties(prop: Properties) {
    prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
    prop.setProperty("*.sink.servlet.path", "/metrics/json")
    prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
    prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
  }

  /**
   * Load properties from various places, based on precedence
   * If the same property is set again latter on in the method, it overwrites the previous value
   */
  // Entry point: load the configuration
  def initialize() {
    // Add default properties in case there's no properties file
    setDefaultProperties(properties)

    loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))

    // Also look for the properties in provided Spark configuration
    val prefix = "spark.metrics.conf."
    conf.getAll.foreach {
      case (k, v) if k.startsWith(prefix) =>
        properties.setProperty(k.substring(prefix.length()), v)
      case _ =>
    }

    // Now, let's populate a list of sub-properties per instance, instance being the prefix that
    // appears before the first dot in the property name.
    // Add to the sub-properties per instance, the default properties (those with prefix "*"), if
    // they don't have that exact same sub-property already defined.
    //
    // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path",
    // "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be
    // ("driver"->Map("path"->"driver_path", "class"->"default_class")
    // Note how class got added to based on the default property, but path remained the same
    // since "driver.path" already existed and took precedence over "*.path"
    perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
    if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
      val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
      for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
           (k, v) <- defaultSubProperties if (prop.get(k) == null)) {
        prop.put(k, v)
      }
    }
  }

  /**
   * Take a simple set of properties and a regex that the instance names (part before the first dot)
   * have to conform to. And, return a map of the first order prefix (before the first dot) to the
   * sub-properties under that prefix.
   *
   * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1",
   * "*.sink.servlet.path"->"path1"), the returned map would be
   * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1"))
   * Note in the subProperties (value of the returned Map), only the suffixes are used as property
   * keys.
   * If, in the passed properties, there is only one property with a given prefix, it is still
   * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1"
   * the returned Map would contain one key-value pair
   * Map("*" -> Properties("sink.servlet.class" -> "class1"))
   * Any passed in properties, not complying with the regex are ignored.
   *
   * @param prop the flat list of properties to "unflatten" based on prefixes
   * @param regex the regex that the prefix has to comply with
   * @return an unflatted map, mapping prefix with sub-properties under that prefix
   */
  // See the subProperties example below this class
  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
    val subProperties = new mutable.HashMap[String, Properties]
    prop.asScala.foreach { kv =>
      if (regex.findPrefixOf(kv._1.toString).isDefined) {
        val regex(prefix, suffix) = kv._1.toString
        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
      }
    }
    subProperties
  }

  // When the instance key is missing, fall back to the properties under "*"
  def getInstance(inst: String): Properties = {
    perInstanceSubProperties.get(inst) match {
      case Some(s) => s
      case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties)
    }
  }

  /**
   * Loads configuration from a config file. If no config file is provided, try to get file
   * in class path.
   */
  private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
    var is: InputStream = null
    try {
      is = path match {
        // If a path is given, read it with a FileInputStream; otherwise load the bundled
        // metrics.properties from the classpath via Utils.getSparkClassLoader.getResourceAsStream
        case Some(f) => new FileInputStream(f)
        case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
      }

      if (is != null) {
        // Load the stream into properties
        properties.load(is)
      }
    } catch {
      case e: Exception =>
        val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
        logError(s"Error loading configuration file $file", e)
    } finally {
      if (is != null) {
        // The stream must always be closed
        is.close()
      }
    }
  }
}
[Figure subprop.png: how subProperties splits flat properties into per-prefix Properties]
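
To make subProperties concrete, here is a sketch using the default properties set in setDefaultProperties(); the comments show the expected grouping:

val prop = new java.util.Properties()
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")

// subProperties(prop, INSTANCE_REGEX) groups the keys by the prefix before the first dot:
//   "*"      -> { sink.servlet.class = ...MetricsServlet, sink.servlet.path = /metrics/json }
//   "master" -> { sink.servlet.path  = /metrics/master/json }
// After initialize() merges the "*" defaults into each instance, getInstance("master") contains
//   sink.servlet.class = ...MetricsServlet, sink.servlet.path = /metrics/master/json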

MetricsSystem

MetricsSystem is responsible for registering Sources and Sinks and starting the Sinks. It is not a system-wide control center; instead, each instance has its own MetricsSystem object, which manages metrics at the instance level.

The three core steps in the MetricsSystem class are registerSources(), registerSinks(), and sinks.foreach(_.start):

private[spark] class MetricsSystem private (
    val instance: String,
    conf: SparkConf,
    securityMgr: SecurityManager)
  extends Logging {
  // Build the MetricsConfig object used to read the configuration
  private[this] val metricsConfig = new MetricsConfig(conf)

  private val sinks = new mutable.ArrayBuffer[Sink]
  private val sources = new mutable.ArrayBuffer[Source]
  private val registry = new MetricRegistry()

  private var running: Boolean = false

  // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
  private var metricsServlet: Option[MetricsServlet] = None

  /**
   * Get any UI handlers used by this metrics system; can only be called after start().
   */
  def getServletHandlers: Array[ServletContextHandler] = {
    require(running, "Can only call getServletHandlers on a running MetricsSystem")
    metricsServlet.map(_.getHandlers(conf)).getOrElse(Array())
  }
  // Initialize the MetricsConfig
  metricsConfig.initialize()

  def start() {
    require(!running, "Attempting to start a MetricsSystem that is already running")
    running = true
    // Register the static sources, i.e. CodegenMetrics and HiveCatalogMetrics
    StaticSources.allSources.foreach(registerSource)
    // Register the sources configured for this instance
    registerSources()
    // Register the sinks configured for this instance
    registerSinks()
    // Start the sinks
    sinks.foreach(_.start)
  }

  def stop() {
    if (running) {
      // Call stop on every sink
      sinks.foreach(_.stop)
    } else {
      logWarning("Stopping a MetricsSystem that is not running")
    }
    running = false
  }

  def report() {
    // Call report on every sink
    sinks.foreach(_.report())
  }

  /**
   * Build a name that uniquely identifies each metric source.
   * The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
   * If either ID is not available, this defaults to just using <source name>.
   *
   * @param source Metric source to be named by this method.
   * @return An unique metric name for each combination of
   *         application, executor/driver and metric source.
   */
  // Build the registry name for a source
  private[spark] def buildRegistryName(source: Source): String = {
    val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))

    val executorId = conf.getOption("spark.executor.id")
    val defaultName = MetricRegistry.name(source.sourceName)

    if (instance == "driver" || instance == "executor") {
      if (metricsNamespace.isDefined && executorId.isDefined) {
        // When the instance is "driver" or "executor", the name is composed as
        // <spark.app.id>.<spark.executor.id>.<source.sourceName>
        MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
      } else {
        // Only Driver and Executor set spark.app.id and spark.executor.id.
        // Other instance types, e.g. Master and Worker, are not related to a specific application.
        if (metricsNamespace.isEmpty) {
          logWarning(s"Using default name $defaultName for source because neither " +
            s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
        }
        if (executorId.isEmpty) {
          logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
            s"not set.")
        }
        defaultName
      }
    } else { defaultName }
  }

  def getSourcesByName(sourceName: String): Seq[Source] =
    sources.filter(_.sourceName == sourceName)

  // Register a single source
  def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      registry.register(regName, source.metricRegistry)
    } catch {
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }
  
  // Remove a source
  def removeSource(source: Source) {
    sources -= source
    val regName = buildRegistryName(source)
    registry.removeMatching(new MetricFilter {
      def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
    })
  }

  // Register all sources configured under keys starting with "source."
  private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    // MetricsSystem.SOURCE_REGEX: "^source\\.(.+)\\.(.+)".r
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        // Instantiate via reflection. Only Sources with a no-arg constructor (e.g. JvmSource)
        // can be created this way
        val source = Utils.classForName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

  // Register all sinks configured under keys starting with "sink."
  private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    // Properties whose keys start with "sink.": "^sink\\.(.+)\\.(.+)".r
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      if (null != classPath) {
        try {
          // Create the sink, passing the constructor arguments kv._2, registry, securityMgr
          val sink = Utils.classForName(classPath)
            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
            .newInstance(kv._2, registry, securityMgr)
          if (kv._1 == "servlet") {
            // When the key is "servlet", treat it as the MetricsServlet
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
          } else {
            // Otherwise add it to the sinks list
            sinks += sink.asInstanceOf[Sink]
          }
        } catch {
          case e: Exception =>
            logError("Sink class " + classPath + " cannot be instantiated")
            throw e
        }
      }
    }
  }
}
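
For context, this is roughly how an instance such as the Master wires everything together (a simplified sketch based on Spark 2.x's Master; the surrounding code is elided):

// Each instance creates its own MetricsSystem via the companion object,
// registers the Source built from itself, and starts the sinks.
private val masterMetricsSystem =
  MetricsSystem.createMetricsSystem("master", conf, securityMgr)
private val masterSource = new MasterSource(this)

override def onStart(): Unit = {
  // ...
  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  // Attach the MetricsServlet handlers to the web UI after start()
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}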

Summary

First, take a look at the metrics.properties.template file:

*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink

*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
*.sink.statsd.prefix=spark

*.sink.console.period=10
*.sink.console.unit=seconds

master.sink.console.period=15
master.sink.console.unit=seconds

*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink

*.sink.csv.period=1
*.sink.csv.unit=minutes

*.sink.csv.directory=/tmp/

worker.sink.csv.period=10
worker.sink.csv.unit=minutes

*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink

*.sink.slf4j.period=1
*.sink.slf4j.unit=minutes

master.source.jvm.class=org.apache.spark.metrics.source.JvmSource

worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource

executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
  1. First load the properties configuration
  2. For the given instance name, take the properties prefixed with that name; when absent, fall back to those prefixed with "*", producing the instConfig object
  3. From instConfig, extract the sub-properties prefixed with "source" or "sink" (sourceConfigs / sinkConfigs)
  4. From those configs, read the source class / sink class and instantiate them via reflection. Sources are created with the no-arg constructor, so only sources like JvmSource can be configured this way; MasterSource has to be constructed inside the Master class. Sinks are created with constructor arguments, which are also read from the properties
  5. Register the sources and sinks, then start the sinks
  6. MetricsSystem starts and stops the sources and sinks, and each instance starts and stops its own metrics
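
As a usage example, the same keys from the template can also be supplied through SparkConf with the spark.metrics.conf. prefix, which initialize() reads after the properties file (the values below are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Enable a console sink for every instance, reporting every 20 seconds
  .set("spark.metrics.conf.*.sink.console.class",
    "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.*.sink.console.period", "20")
  .set("spark.metrics.conf.*.sink.console.unit", "seconds")
  // Register the JVM source for executors only
  .set("spark.metrics.conf.executor.source.jvm.class",
    "org.apache.spark.metrics.source.JvmSource")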