Spark Structured Streaming 與Kafka的整合

Structured Streaming 與0.10及以上版本的Kafka整合來對(duì)Kafka中的讀書進(jìn)行讀取和寫入操作誊涯。

Linking

對(duì)于使用SBT/Maven定義的Scala/Java應(yīng)用程序,請(qǐng)將你的應(yīng)用程序與如下的artifact相連接:

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.2.0

對(duì)于Python引用程序,你需要在發(fā)布應(yīng)用程序時(shí)添加上述的庫及其依賴,詳情請(qǐng)參考下面的發(fā)布模塊介紹扮授。

從Kafka中讀取數(shù)據(jù)

為流式查詢創(chuàng)建一個(gè)Kafka Source

Scala 代碼:

// 訂閱一個(gè)topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 訂閱多個(gè)topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 訂閱滿足一定正則式的topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Java代碼:

// 訂閱一個(gè)topic
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// 訂閱多個(gè)topic
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// 訂閱滿足一定正則式的topic
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Python代碼:

# 訂閱一個(gè)topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# 訂閱多個(gè)topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# 訂閱滿足一定正則式的topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

為批量查詢定義一個(gè)Kafak Source

如果你的用例更適用于批處理的話闸天,你可以根據(jù)既定的offset范圍來創(chuàng)建一個(gè)DataSet/DataFrame
Scala代碼:

// 訂閱一個(gè)topic,默認(rèn)從topic最早的offset到最近的offset
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 訂閱多個(gè)topic,并指定每個(gè)topic的訂閱范圍
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 訂閱滿足一定正則式的topic,默認(rèn)從topic最早的offset到最近的offset
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Java代碼:

// 訂閱一個(gè)topic,默認(rèn)從topic最早的offset到最近的offset
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// 訂閱多個(gè)topic,并指定每個(gè)topic的訂閱范圍
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// 訂閱滿足一定正則式的topic,默認(rèn)從topic最早的offset到最近的offset
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

Python代碼:

# 訂閱一個(gè)topic,默認(rèn)從topic最早的offset到最近的offset
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# 訂閱多個(gè)topic,并指定每個(gè)topic的訂閱范圍
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# 訂閱滿足一定正則式的topic,默認(rèn)從topic最早的offset到最近的offset
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Source中的每一行都遵循下列模式:

Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

不論是批處理還是流式處理都必須為Kafka Source設(shè)置如下選項(xiàng):

選項(xiàng) 意義
assign json值{"topicA":[0,1],"topicB":[2,4]} 指定消費(fèi)的TopicPartition得滤,Kafka Source只能指定"assign","subscribe","subscribePattern"選項(xiàng)中的一個(gè)
subscribe 一個(gè)以逗號(hào)隔開的topic列表 訂閱的topic列表奶稠,Kafka Source只能指定"assign","subscribe","subscribePattern"選項(xiàng)中的一個(gè)
subscribePattern Java正則表達(dá)式 訂閱的topic列表的正則式俯艰,Kafka Source只能指定"assign","subscribe","subscribePattern"選項(xiàng)中的一個(gè)
kafka.bootstrap.servers 以逗號(hào)隔開的host:port列表 Kafka的"bootstrap.servers"配置

下面的配置是可選的:

選項(xiàng) 默認(rèn)值 支持的查詢類型 意義
startingOffsets "earliest","lates"(僅streaming支持);或者json 字符"""{"topicA":{"0":23,"1":-1},"TopicB":{"0":-2}}""" 對(duì)于流式處理來說是"latest",對(duì)于批處理來說是"earliest" streaming和batch 查詢開始的位置可以是"earliest"(從最早的位置開始),"latest"(從最新的位置開始),或者通過一個(gè)json為每個(gè)TopicPartition指定開始的offset锌订。通過Json指定的話,json中-2可以用于表示earliest画株,-1可以用于表示latest辆飘。注意:對(duì)于批處理而言,latest值不允許使用的谓传。
endingOffsets latest or json string{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} latest batch 一個(gè)批查詢的結(jié)束位置蜈项,可以是"latest",即最近的offset续挟,或者通過json來為每個(gè)TopicPartition指定一個(gè)結(jié)束位置紧卒,在json中,-1表示latest诗祸,而-2是不允許使用的
failOnDataLoss true or false true streaming query 數(shù)據(jù)丟失之后(topic被刪除跑芳,或者offset不在可用范圍內(nèi)時(shí))查詢是否失敗,這可能會(huì)引起一個(gè)告警直颅,如果你覺得不適用于你的應(yīng)用程序時(shí)博个,你可以禁用掉。批查詢?nèi)绻谧x出數(shù)據(jù)的時(shí)候發(fā)現(xiàn)數(shù)據(jù)丟失了總會(huì)失敗功偿。
kafkaConsumer.pollTimeoutMs long 512 streaming 和batch executor輪詢kafka中的數(shù)據(jù)的超時(shí)時(shí)間
fetchOffset.numRetries int 3 streaming 和batch 獲取Kafka offset的重試次數(shù)
fetchOffset.retryIntervalMs long 10 獲取Kafka offset的時(shí)間間隔
maxOffsetPerTrigger long none streaming 和batch 不想翻譯了盆佣。。械荷。

寫數(shù)據(jù)到Kafka

這里我們討論對(duì)鞋流式查詢或者批查詢數(shù)據(jù)到Apache Kafka的支持共耍,請(qǐng)注意Apache Kafka只支持至少一次語義,所以當(dāng)寫流式查詢數(shù)據(jù)或者批查詢數(shù)據(jù)到Kafka時(shí)吨瞎,有些記錄可能會(huì)重復(fù)痹兜,這是有可能發(fā)生的,例如关拒,kafka需要重新獲取還未被一個(gè)Broker識(shí)別的消息記錄佃蚜,即使這條消息已經(jīng)被Broker接收并將消息寫入記錄中了庸娱。由于Kafka本身的這些寫語義,Structured Streaming無法避免寫入的重復(fù)記錄的發(fā)生谐算。如果一個(gè)查詢的寫入成功熟尉,你就可以認(rèn)為是至少寫入了一次,一個(gè)刪除寫入重復(fù)記錄的解決方案就是引入一個(gè)唯一主鍵洲脂,這就可以在讀取時(shí)執(zhí)行去重操作了斤儿。

Column Type
key(可選) string或者binary
value(必選) string或者binary
topic(*可選) string

寫入Kafka中的DataFrame必須遵循如下格式:

Column Type
key(可選) string或者binary
value(必選) string或者binary
topic(*可選) string

注意:如果topic配置項(xiàng)沒有指定的話,topic列是需要指定的
value列是唯一必選的選項(xiàng)恐锦,如果key列沒有指定的話往果,系統(tǒng)會(huì)指定為null。如果topic列指定的話一铅,會(huì)將給定的數(shù)據(jù)寫入到Kafak中對(duì)應(yīng)的topic中陕贮,除非"topic"配置項(xiàng)已經(jīng)指定,如果配置項(xiàng)已經(jīng)指定的話潘飘,配置項(xiàng)中的配置會(huì)覆蓋掉topic列中的配置肮之。

選項(xiàng) 意義
kafka.bootstrap.server 逗號(hào)分隔的host:port列表 Kafka的"bootstrap.servers"配置

無論是批查詢還是流式查詢,下面的選項(xiàng)必須得為Kafka sink指定卜录;

選項(xiàng) 意義
kafka.bootstrap.server 逗號(hào)分隔的host:port列表 Kafka的"bootstrap.servers"配置
選項(xiàng) 默認(rèn)值 支持的查詢類型 意義
topic string none streaming和batch 設(shè)置允許寫入所有行到Kafka中的topic列表戈擒,這個(gè)配置會(huì)覆蓋數(shù)據(jù)中存在的topic列

下面的配置是可選的:

選項(xiàng) 默認(rèn)值 支持的查詢類型 意義
topic string none streaming和batch 設(shè)置允許寫入所有行到Kafka中的topic列表,這個(gè)配置會(huì)覆蓋數(shù)據(jù)中存在的topic列

為Streaming查詢創(chuàng)建一個(gè)Kafka Sink

Scala代碼:

// 寫DataFrame中的key-value數(shù)據(jù)到option指定的kafka topic中
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 寫DataFrame中的key-value數(shù)據(jù)到數(shù)據(jù)中指定的Kafka topic中
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

Java代碼:

// 寫DataFrame中的key-value數(shù)據(jù)到option指定的kafka topic中
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 寫DataFrame中的key-value數(shù)據(jù)到數(shù)據(jù)中指定的Kafka topic中
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

Python代碼:

# 寫DataFrame中的key-value數(shù)據(jù)到option指定的kafka topic中
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# 寫DataFrame中的key-value數(shù)據(jù)到數(shù)據(jù)中指定的Kafka topic中
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()

寫B(tài)atch查詢的結(jié)果到Kafka中

Scala代碼:

//寫DataFrame中的key-value數(shù)據(jù)到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// 寫DataFrame中的key-value數(shù)據(jù)到數(shù)據(jù)中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

Java代碼:

// 寫DataFrame中的key-value數(shù)據(jù)到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// 寫DataFrame中的key-value數(shù)據(jù)到數(shù)據(jù)中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

Python代碼:

# 寫DataFrame中的key-value數(shù)據(jù)到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# 寫DataFrame中的key-value數(shù)據(jù)到數(shù)據(jù)中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()

Kafka特殊配置

Kafka自身的配置可以通過DataStreamReader.option中以kafka.為前綴來指定艰毒,如:stream.option("kafka.bootstrap.servers","host:port"),有關(guān)Kafka的可配參數(shù)筐高,請(qǐng)參閱Kafka Consumer COnfig文檔中關(guān)于讀數(shù)據(jù)的參數(shù)以及Kafka Producer COnfig文檔中關(guān)于寫數(shù)據(jù)的參數(shù)。
注意:下面的Kafka參數(shù)是不能設(shè)置的丑瞧,如果設(shè)置的話Kafka的Source或者Sink會(huì)拋出異常:
group.id: Kafka source will create a unique group id for each query automatically.
auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
key.serializer: Keys are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
value.serializer: values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame oeprations to explicitly serialize the values into either strings or byte arrays.
enable.auto.commit: Kafka source doesn’t commit any offset.
interceptor.classes: Kafka source always read keys and values as byte arrays. It’s not safe to use ConsumerInterceptor as it may break the query.

發(fā)布

作為Spark應(yīng)用程序柑土,通過spark-submit來啟動(dòng)你的應(yīng)用程序。spark-sql-kafka-0-10_2.11以及它的依賴可以使用--packages添加到'spark-submit'中嗦篱,如:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ...
請(qǐng)參考應(yīng)用程序提交指南來獲取更多關(guān)于提交帶有外部依賴的應(yīng)用程序的信息冰单。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市灸促,隨后出現(xiàn)的幾起案子诫欠,更是在濱河造成了極大的恐慌,老刑警劉巖浴栽,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件荒叼,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡典鸡,警方通過查閱死者的電腦和手機(jī)被廓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來萝玷,“玉大人嫁乘,你說我怎么就攤上這事昆婿。” “怎么了蜓斧?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵仓蛆,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我挎春,道長(zhǎng)看疙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任直奋,我火速辦了婚禮能庆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘脚线。我一直安慰自己搁胆,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布殉挽。 她就那樣靜靜地躺著丰涉,像睡著了一般。 火紅的嫁衣襯著肌膚如雪斯碌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天肛度,我揣著相機(jī)與錄音傻唾,去河邊找鬼。 笑死承耿,一個(gè)胖子當(dāng)著我的面吹牛冠骄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播加袋,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼凛辣,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了职烧?” 一聲冷哼從身側(cè)響起扁誓,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蚀之,沒想到半個(gè)月后蝗敢,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡足删,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年寿谴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片失受。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡讶泰,死狀恐怖咏瑟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情痪署,我是刑警寧澤码泞,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站惠桃,受9級(jí)特大地震影響浦夷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜辜王,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一劈狐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧呐馆,春花似錦肥缔、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至收班,卻和暖如春坟岔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背摔桦。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工社付, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人邻耕。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓鸥咖,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親兄世。 傳聞我的和親對(duì)象是個(gè)殘疾皇子啼辣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容