Spark Streaming+Kafka

目錄

前言

在WeTest輿情項(xiàng)目中,需要對(duì)每天千萬(wàn)級(jí)的游戲評(píng)論信息進(jìn)行詞頻統(tǒng)計(jì),在生產(chǎn)者一端巍实,我們將數(shù)據(jù)按照每天的拉取時(shí)間存入了Kafka當(dāng)中,而在消費(fèi)者一端丸边,我們利用了spark streaming從kafka中不斷拉取數(shù)據(jù)進(jìn)行詞頻統(tǒng)計(jì)纬朝。本文首先對(duì)spark streaming嵌入kafka的方式進(jìn)行歸納總結(jié),之后簡(jiǎn)單闡述Spark streaming+kafka在輿情項(xiàng)目中的應(yīng)用俄讹,最后將自己在Spark Streaming+kafka的實(shí)際優(yōu)化中的一些經(jīng)驗(yàn)進(jìn)行歸納總結(jié)。(如有任何紕漏歡迎補(bǔ)充來(lái)踩踪蹬,我會(huì)第一時(shí)間改正v

Spark streaming接收Kafka數(shù)據(jù)

用spark streaming流式處理kafka中的數(shù)據(jù),第一步當(dāng)然是先把數(shù)據(jù)接收過(guò)來(lái)疚漆,轉(zhuǎn)換為spark streaming中的數(shù)據(jù)結(jié)構(gòu)Dstream。接收數(shù)據(jù)的方式有兩種:1.利用Receiver接收數(shù)據(jù),2.直接從kafka讀取數(shù)據(jù)狡耻。

基于Receiver的方式

這種方式利用接收器(Receiver)來(lái)接收kafka中的數(shù)據(jù),其最基本是使用Kafka高階用戶API接口沼头。對(duì)于所有的接收器,從kafka接收來(lái)的數(shù)據(jù)會(huì)存儲(chǔ)在spark的executor中,之后spark streaming提交的job會(huì)處理這些數(shù)據(jù)毡庆。如下圖:


Receiver圖形解釋

在使用時(shí)亚铁,我們需要添加相應(yīng)的依賴包:

<dependency><!-- Spark Streaming Kafka -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

而對(duì)于Scala的基本使用方式如下:

import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext, 
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

還有幾個(gè)需要注意的點(diǎn):

  • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關(guān)的站粟,所以如果我們加大每個(gè)topic的partition數(shù)量,僅僅是增加線程來(lái)處理由單一Receiver消費(fèi)的主題。但是這并沒(méi)有增加Spark在處理數(shù)據(jù)上的并行度。
  • 對(duì)于不同的Group和topic我們可以使用多個(gè)Receiver創(chuàng)建不同的Dstream來(lái)并行接收數(shù)據(jù)翘单,之后可以利用union來(lái)統(tǒng)一成一個(gè)Dstream柬唯。
  • 如果我們啟用了Write Ahead Logs復(fù)制到文件系統(tǒng)如HDFS失晴,那么storage level需要設(shè)置成 StorageLevel.MEMORY_AND_DISK_SER书在,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

直接讀取方式

在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式?jīng)]有receiver這一層制轰,其會(huì)周期性的獲取Kafka中每個(gè)topic的每個(gè)partition中的最新offsets,之后根據(jù)設(shè)定的maxRatePerPartition來(lái)處理每個(gè)batch。其形式如下圖:


image

這種方法相較于Receiver方式的優(yōu)勢(shì)在于:

  • 簡(jiǎn)化的并行:在Receiver的方式中我們提到創(chuàng)建多個(gè)Receiver之后利用union來(lái)合并成一個(gè)Dstream的方式提高數(shù)據(jù)傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對(duì)應(yīng)的并行讀取Kafka數(shù)據(jù)业岁,這種映射關(guān)系也更利于理解和優(yōu)化棍好。
  • 高效:在Receiver的方式中,為了達(dá)到0數(shù)據(jù)丟失需要將數(shù)據(jù)存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數(shù)據(jù),浪費(fèi)!而第二種方式不存在這個(gè)問(wèn)題,只要我們Kafka的數(shù)據(jù)保留時(shí)間足夠長(zhǎng),我們都能夠從Kafka進(jìn)行數(shù)據(jù)恢復(fù)。
  • 精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式汛聚,但由于Spark Streaming消費(fèi)的數(shù)據(jù)和Zookeeper中記錄的offset不同步瞄桨,這種方式偶爾會(huì)造成數(shù)據(jù)重復(fù)消費(fèi)芯侥。而第二種方式唉工,直接使用了簡(jiǎn)單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進(jìn)行記錄继谚,消除了這種不一致性济瓢。

以上主要是對(duì)官方文檔[1]的一個(gè)簡(jiǎn)單翻譯,詳細(xì)內(nèi)容大家可以直接看下官方文檔這里不再贅述纽帖。

不同于Receiver的方式宠漩,是從Zookeeper中讀取offset值,那么自然zookeeper就保存了當(dāng)前消費(fèi)的offset值懊直,那么如果重新啟動(dòng)開(kāi)始消費(fèi)就會(huì)接著上一次offset值繼續(xù)消費(fèi)扒吁。而在Direct的方式中,我們是直接從kafka來(lái)讀數(shù)據(jù)室囊,那么offset需要自己記錄雕崩,可以利用checkpoint、數(shù)據(jù)庫(kù)或文件記錄或者回寫到zookeeper中進(jìn)行記錄融撞。這里我們給出利用Kafka底層API接口盼铁,將offset及時(shí)同步到zookeeper中的通用類,我將其放在了github上:
Spark streaming+Kafka demo
示例中KafkaManager是一個(gè)通用類尝偎,而KafkaCluster是kafka源碼中的一個(gè)類饶火,由于包名權(quán)限的原因我把它單獨(dú)提出來(lái)鹏控,ComsumerMain簡(jiǎn)單展示了通用類的使用方法,在每次創(chuàng)建KafkaStream時(shí)肤寝,都會(huì)先從zooker中查看上次的消費(fèi)記錄offsets当辐,而每個(gè)batch處理完成后,會(huì)同步offsets到zookeeper中鲤看。

Spark向kafka中寫入數(shù)據(jù)

上文闡述了Spark如何從Kafka中流式的讀取數(shù)據(jù)缘揪,下面我整理向Kafka中寫數(shù)據(jù)。與讀數(shù)據(jù)不同义桂,Spark并沒(méi)有提供統(tǒng)一的接口用于寫入Kafka找筝,所以我們需要使用底層Kafka接口進(jìn)行包裝。
最直接的做法我們可以想到如下這種方式:

input.foreachRDD(rdd =>
  // 不能在這里創(chuàng)建KafkaProducer
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String]("output",null,x)
        producer.send(message)
      }
    }
  )
) 

但是這種方式缺點(diǎn)很明顯慷吊,對(duì)于每個(gè)partition的每條記錄袖裕,我們都需要?jiǎng)?chuàng)建KafkaProducer,然后利用producer進(jìn)行輸出操作罢浇,注意這里我們并不能將KafkaProducer的新建任務(wù)放在foreachPartition外邊陆赋,因?yàn)镵afkaProducer是不可序列化的(not serializable)沐祷。顯然這種做法是不靈活且低效的嚷闭,因?yàn)槊織l記錄都需要建立一次連接。如何解決呢赖临?

  1. 首先胞锰,我們需要將KafkaProducer利用lazy val的方式進(jìn)行包裝如下:
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()
  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  import scala.collection.JavaConversions._
  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
  1. 之后我們利用廣播變量的形式,將KafkaProducer廣播到每一個(gè)executor兢榨,如下:
// 廣播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", Conf.brokers)
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}

這樣我們就能在每個(gè)executor中愉快的將數(shù)據(jù)輸入到kafka當(dāng)中:

//輸出到kafka
segmentedStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
      // do something else
    })
  }
})

Spark streaming+Kafka應(yīng)用

WeTest輿情監(jiān)控對(duì)于每天爬取的千萬(wàn)級(jí)游戲玩家評(píng)論信息都要實(shí)時(shí)的進(jìn)行詞頻統(tǒng)計(jì)嗅榕,對(duì)于爬取到的游戲玩家評(píng)論數(shù)據(jù),我們會(huì)生產(chǎn)到Kafka中吵聪,而另一端的消費(fèi)者我們采用了Spark Streaming來(lái)進(jìn)行流式處理凌那,首先利用上文我們闡述的Direct方式從Kafka拉取batch,之后經(jīng)過(guò)分詞吟逝、統(tǒng)計(jì)等相關(guān)處理帽蝶,回寫到DB上(至于Spark中DB的回寫方式可參考我之前總結(jié)的博文:Spark踩坑記——數(shù)據(jù)庫(kù)(Hbase+Mysql)),由此高效實(shí)時(shí)的完成每天大量數(shù)據(jù)的詞頻統(tǒng)計(jì)任務(wù)块攒。

Spark streaming+Kafka調(diào)優(yōu)

Spark streaming+Kafka的使用中励稳,當(dāng)數(shù)據(jù)量較小,很多時(shí)候默認(rèn)配置和使用便能夠滿足情況囱井,但是當(dāng)數(shù)據(jù)量大的時(shí)候驹尼,就需要進(jìn)行一定的調(diào)整和優(yōu)化,而這種調(diào)整和優(yōu)化本身也是不同的場(chǎng)景需要不同的配置庞呕。

合理的批處理時(shí)間(batchDuration)

幾乎所有的Spark Streaming調(diào)優(yōu)文檔都會(huì)提及批處理時(shí)間的調(diào)整新翎,在StreamingContext初始化的時(shí)候,有一個(gè)參數(shù)便是批處理時(shí)間的設(shè)定。如果這個(gè)值設(shè)置的過(guò)短料祠,即個(gè)batchDuration所產(chǎn)生的Job并不能在這期間完成處理骆捧,那么就會(huì)造成數(shù)據(jù)不斷堆積,最終導(dǎo)致Spark Streaming發(fā)生阻塞髓绽。而且敛苇,一般對(duì)于batchDuration的設(shè)置不會(huì)小于500ms,因?yàn)檫^(guò)小會(huì)導(dǎo)致SparkStreaming頻繁的提交作業(yè)顺呕,對(duì)整個(gè)streaming造成額外的負(fù)擔(dān)枫攀。在平時(shí)的應(yīng)用中,根據(jù)不同的應(yīng)用場(chǎng)景和硬件配置株茶,我設(shè)在1~10s之間来涨,我們可以根據(jù)SparkStreaming的可視化監(jiān)控界面,觀察Total Delay來(lái)進(jìn)行batchDuration的調(diào)整启盛,如下圖:


image

合理的Kafka拉取量(maxRatePerPartition重要)

對(duì)于Spark Streaming消費(fèi)kafka中數(shù)據(jù)的應(yīng)用場(chǎng)景蹦掐,這個(gè)配置是非常關(guān)鍵的,配置參數(shù)為:spark.streaming.kafka.maxRatePerPartition僵闯。這個(gè)參數(shù)默認(rèn)是沒(méi)有上線的卧抗,即kafka當(dāng)中有多少數(shù)據(jù)它就會(huì)直接全部拉出。而根據(jù)生產(chǎn)者寫入Kafka的速率以及消費(fèi)者本身處理數(shù)據(jù)的速度鳖粟,同時(shí)這個(gè)參數(shù)需要結(jié)合上面的batchDuration社裆,使得每個(gè)partition拉取在每個(gè)batchDuration期間拉取的數(shù)據(jù)能夠順利的處理完畢,做到盡可能高的吞吐量向图,而這個(gè)參數(shù)的調(diào)整可以參考可視化監(jiān)控界面中的Input Rate和Processing Time泳秀,如下圖:


image
image

緩存反復(fù)使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,如果被反復(fù)的使用榄攀,最好利用cache()嗜傅,將該數(shù)據(jù)流緩存起來(lái),防止過(guò)度的調(diào)度資源造成的網(wǎng)絡(luò)開(kāi)銷檩赢÷类郑可以參考觀察Scheduling Delay參數(shù),如下圖:


image

設(shè)置合理的GC

長(zhǎng)期使用Java的小伙伴都知道漠畜,JVM中的垃圾回收機(jī)制币他,可以讓我們不過(guò)多的關(guān)注與內(nèi)存的分配回收,更加專注于業(yè)務(wù)邏輯憔狞,JVM都會(huì)為我們搞定蝴悉。對(duì)JVM有些了解的小伙伴應(yīng)該知道,在Java虛擬機(jī)中瘾敢,將內(nèi)存分為了初生代(eden generation)拍冠、年輕代(young generation)尿这、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費(fèi)一定時(shí)間的庆杜,尤其是老年代的GC回收射众,需要對(duì)內(nèi)存碎片進(jìn)行整理,通常采用標(biāo)記-清楚的做法晃财。同樣的在Spark程序中叨橱,JVM GC的頻率和時(shí)間也是影響整個(gè)Spark效率的關(guān)鍵因素。在通常的使用中建議:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

設(shè)置合理的CPU資源數(shù)

CPU的core數(shù)量断盛,每個(gè)executor可以占用一個(gè)或多個(gè)core罗洗,可以通過(guò)觀察CPU的使用率變化來(lái)了解計(jì)算資源的使用情況,例如钢猛,很常見(jiàn)的一種浪費(fèi)是一個(gè)executor占用了多個(gè)core伙菜,但是總的CPU使用率卻不高(因?yàn)橐粋€(gè)executor并不總能充分利用多核的能力),這個(gè)時(shí)候可以考慮讓么個(gè)executor占用更少的core命迈,同時(shí)worker下面增加更多的executor贩绕,或者一臺(tái)host上面增加更多的worker來(lái)增加并行執(zhí)行的executor的數(shù)量,從而增加CPU利用率壶愤。但是增加executor的時(shí)候需要考慮好內(nèi)存消耗淑倾,因?yàn)橐慌_(tái)機(jī)器的內(nèi)存分配給越多的executor,每個(gè)executor的內(nèi)存就越小公你,以致出現(xiàn)過(guò)多的數(shù)據(jù)spill over甚至out of memory的情況踊淳。

設(shè)置合理的parallelism

partition和parallelism假瞬,partition指的就是數(shù)據(jù)分片的數(shù)量陕靠,每一次task只能處理一個(gè)partition的數(shù)據(jù),這個(gè)值太小了會(huì)導(dǎo)致每片數(shù)據(jù)量太大脱茉,導(dǎo)致內(nèi)存壓力剪芥,或者諸多executor的計(jì)算能力無(wú)法利用充分;但是如果太大了則會(huì)導(dǎo)致分片太多琴许,執(zhí)行效率降低税肪。在執(zhí)行action類型操作的時(shí)候(比如各種reduce操作),partition的數(shù)量會(huì)選擇parent RDD中最大的那一個(gè)榜田。而parallelism則指的是在RDD進(jìn)行reduce類操作的時(shí)候益兄,默認(rèn)返回?cái)?shù)據(jù)的paritition數(shù)量(而在進(jìn)行map類操作的時(shí)候,partition數(shù)量通常取自parent RDD中較大的一個(gè)箭券,而且也不會(huì)涉及shuffle净捅,因此這個(gè)parallelism的參數(shù)沒(méi)有影響)。所以說(shuō)辩块,這兩個(gè)概念密切相關(guān)蛔六,都是涉及到數(shù)據(jù)分片的荆永,作用方式其實(shí)是統(tǒng)一的。通過(guò)spark.default.parallelism可以設(shè)置默認(rèn)的分片數(shù)量国章,而很多RDD的操作都可以指定一個(gè)partition參數(shù)來(lái)顯式控制具體的分片數(shù)量具钥。
在SparkStreaming+kafka的使用中,我們采用了Direct連接方式液兽,前文闡述過(guò)Spark中的partition和Kafka中的Partition是一一對(duì)應(yīng)的骂删,我們一般默認(rèn)設(shè)置為Kafka中Partition的數(shù)量。

使用高性能的算子

這里參考了美團(tuán)技術(shù)團(tuán)隊(duì)的博文四啰,并沒(méi)有做過(guò)具體的性能測(cè)試桃漾,其建議如下:

  • 使用reduceByKey/aggregateByKey替代groupByKey
  • 使用mapPartitions替代普通map
  • 使用foreachPartitions替代foreach
  • 使用filter之后進(jìn)行coalesce操作
  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操作

使用Kryo優(yōu)化序列化性能

這個(gè)優(yōu)化原則我本身也沒(méi)有經(jīng)過(guò)測(cè)試,但是好多優(yōu)化文檔有提到拟逮,這里也記錄下來(lái)撬统。
在Spark中,主要有三個(gè)地方涉及到了序列化:

  • 在算子函數(shù)中使用到外部變量時(shí)敦迄,該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸(見(jiàn)“原則七:廣播大變量”中的講解)恋追。
  • 將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD,Student是自定義類型)罚屋,所有自定義類型對(duì)象苦囱,都會(huì)進(jìn)行序列化。因此這種情況下脾猛,也要求自定義的類必須實(shí)現(xiàn)Serializable接口撕彤。
  • 使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER),Spark會(huì)將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組猛拴。

對(duì)于這三種出現(xiàn)序列化的地方羹铅,我們都可以通過(guò)使用Kryo序列化類庫(kù),來(lái)優(yōu)化序列化和反序列化的性能愉昆。Spark默認(rèn)使用的是Java的序列化機(jī)制职员,也就是ObjectOutputStream/ObjectInputStream API來(lái)進(jìn)行序列化和反序列化。但是Spark同時(shí)支持使用Kryo序列化庫(kù)跛溉,Kryo序列化類庫(kù)的性能比Java序列化類庫(kù)的性能要高很多焊切。官方介紹,Kryo序列化機(jī)制比Java序列化機(jī)制芳室,性能高10倍左右专肪。Spark之所以默認(rèn)沒(méi)有使用Kryo作為序列化類庫(kù),是因?yàn)镵ryo要求最好要注冊(cè)所有需要進(jìn)行序列化的自定義類型堪侯,因此對(duì)于開(kāi)發(fā)者來(lái)說(shuō)嚎尤,這種方式比較麻煩。

以下是使用Kryo的代碼示例抖格,我們只要設(shè)置序列化類诺苹,再注冊(cè)要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型咕晋、作為RDD泛型類型的自定義類型等):

// 創(chuàng)建SparkConf對(duì)象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設(shè)置序列化器為KryoSerializer收奔。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊(cè)要序列化的自定義類型掌呜。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

結(jié)果

經(jīng)過(guò)種種調(diào)試優(yōu)化,我們最終要達(dá)到的目的是坪哄,Spark Streaming能夠?qū)崟r(shí)的拉取Kafka當(dāng)中的數(shù)據(jù)质蕉,并且能夠保持穩(wěn)定,如下圖所示:


image

當(dāng)然不同的應(yīng)用場(chǎng)景會(huì)有不同的圖形翩肌,這是本文詞頻統(tǒng)計(jì)優(yōu)化穩(wěn)定后的監(jiān)控圖模暗,我們可以看到Processing Time這一柱形圖中有一Stable的虛線,而大多數(shù)Batch都能夠在這一虛線下處理完畢念祭,說(shuō)明整體Spark Streaming是運(yùn)行穩(wěn)定的兑宇。

參考文獻(xiàn)

  1. Spark Streaming + Kafka Integration Guide
  2. DirectStream、Stream的區(qū)別-SparkStreaming源碼分析02
  3. Spark Streaming性能調(diào)優(yōu)詳解
  4. Spark性能優(yōu)化指南——基礎(chǔ)篇
  5. Spark的性能調(diào)優(yōu)
  6. How to write to Kafka from Spark Streaming
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末粱坤,一起剝皮案震驚了整個(gè)濱河市隶糕,隨后出現(xiàn)的幾起案子奕扣,更是在濱河造成了極大的恐慌共耍,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件纫谅,死亡現(xiàn)場(chǎng)離奇詭異株旷,居然都是意外死亡再登,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門晾剖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)锉矢,“玉大人,你說(shuō)我怎么就攤上這事钞瀑∩蜃玻” “怎么了慷荔?”我有些...
    開(kāi)封第一講書人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵雕什,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我显晶,道長(zhǎng)贷岸,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任磷雇,我火速辦了婚禮偿警,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘唯笙。我一直安慰自己螟蒸,他們只是感情好盒使,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著七嫌,像睡著了一般少办。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上诵原,一...
    開(kāi)封第一講書人閱讀 50,084評(píng)論 1 291
  • 那天英妓,我揣著相機(jī)與錄音,去河邊找鬼绍赛。 笑死蔓纠,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的吗蚌。 我是一名探鬼主播腿倚,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼蚯妇!你這毒婦竟也來(lái)了猴誊?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤侮措,失蹤者是張志新(化名)和其女友劉穎懈叹,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體分扎,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡澄成,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了畏吓。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片墨状。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖菲饼,靈堂內(nèi)的尸體忽然破棺而出肾砂,到底是詐尸還是另有隱情,我是刑警寧澤宏悦,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布镐确,位于F島的核電站,受9級(jí)特大地震影響饼煞,放射性物質(zhì)發(fā)生泄漏源葫。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一砖瞧、第九天 我趴在偏房一處隱蔽的房頂上張望息堂。 院中可真熱鬧,春花似錦、人聲如沸荣堰。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)振坚。三九已至即硼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間屡拨,已是汗流浹背只酥。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留呀狼,地道東北人裂允。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像哥艇,于是被迫代替她去往敵國(guó)和親绝编。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容