Flink Eventtime 和 Watermark

Flink 中提供了3種時(shí)間模型:EventTime、ProcessingTime、IngestionTime。
底層實(shí)現(xiàn)上分為2種:Processing Time 與 Event Time,Ingestion Time 本質(zhì)上也是一種 Processing Time悔政,官方文檔 上對(duì)于3者的描述(參考下圖):

  • EventTime 是事件創(chuàng)建的時(shí)間,即數(shù)據(jù)產(chǎn)生時(shí)自帶時(shí)間戳延旧。
  • IngestionTime 是事件進(jìn)入 Flink 的時(shí)間谋国,即進(jìn)入 source operator 是給定的時(shí)間戳。
  • ProcessingTime 是每一個(gè)執(zhí)行 window 操作的本地時(shí)間迁沫。
FlinkTimeModel

可以參考以下兩篇 Blog 和 Paper 幫助對(duì)時(shí)間域的理解芦瘾,也是官方推薦的
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/43864.pdf
附筆者 翻譯1 翻譯2 翻譯3

Flink 如何設(shè)置時(shí)間域纪岁?

調(diào)用 setStreamTimeCharacteristic 設(shè)置時(shí)間域喂饥,枚舉類 TimeCharacteristic 預(yù)設(shè)了三種時(shí)間域雏节,不顯式設(shè)置的情況下扰柠,默認(rèn)使用 TimeCharacteristic.ProcessTime。這也是 Flink 程序一般最開(kāi)始的工作藐吮。

# Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// 可選的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

EventTime 與 WaterMarks

為什么必須處理事件時(shí)間溺拱?

在大多數(shù)情況下,消息進(jìn)入系統(tǒng)中是無(wú)序的(網(wǎng)絡(luò)谣辞、硬件迫摔、分布式邏輯都可能影響),并且會(huì)有消息延遲到達(dá)(例如移動(dòng)場(chǎng)景中泥从,由于手機(jī)無(wú)信號(hào)句占,導(dǎo)致一系列的操作消息在手機(jī)重新連接信號(hào)后發(fā)送),如果按照消息進(jìn)入系統(tǒng)的時(shí)間計(jì)算躯嫉,結(jié)果會(huì)與實(shí)時(shí)嚴(yán)重不符合纱烘。理想情況是 event time 和 processing time 是一致的(發(fā)生時(shí)間即處理時(shí)間),但是現(xiàn)實(shí)情況是不一致的祈餐,兩者存在歪斜(skew)擂啥。

因此,支持事件時(shí)間的流式處理程序需要一種方法來(lái)測(cè)量事件時(shí)間的進(jìn)度帆阳。例如哺壶,有一個(gè)按小時(shí)構(gòu)建的窗口,當(dāng)事件時(shí)間超過(guò)了一小時(shí)的時(shí)間范圍蜒谤,需要通知該窗口山宾,以便關(guān)閉正在進(jìn)行的窗口。

什么是水喻⒒铡(watermarks)

Flink 中檢測(cè)事件時(shí)間處理進(jìn)度的機(jī)制就是水印资锰,Watermark 作為數(shù)據(jù)處理流中的一部分進(jìn)行傳輸,并且攜帶一個(gè)時(shí)間戳t旬盯。一個(gè) Watermark(t) 表示流中應(yīng)該不再有事件時(shí)間比t小的元素(某個(gè)事件的時(shí)間戳比 Watermark 時(shí)間大)台妆。


Watermark 有助于解決亂序問(wèn)題

下圖表示一個(gè)順序的事件流中的 Watermark, Watermark 只代表一個(gè)簡(jiǎn)單的標(biāo)記翎猛,


stream_watermark_in_order

下圖表示一個(gè)亂序的事件流中的 Watermark胖翰,表示所有事件時(shí)間戳小于 Watermark 時(shí)間戳的數(shù)據(jù)都已經(jīng)處理完了,任何事件大于 Watermark 的元素都不應(yīng)該再出現(xiàn)切厘,當(dāng)然這只是一種推測(cè)性的結(jié)果(基于多種信息的推測(cè))萨咳,


stream_watermark_out_of_order

并行流中的水印

水印是在 Source function(源函數(shù))處或之后生成的。源函數(shù)的每個(gè)并行子任務(wù)通常獨(dú)立地生成水印疫稿。這些水印定義了該特定并行源的事件時(shí)間培他。

當(dāng)水印經(jīng)過(guò)流處理程序時(shí)鹃两,會(huì)將該算子的事件時(shí)間向前推進(jìn)。當(dāng)算子提前其事件時(shí)間時(shí)舀凛,會(huì)為后續(xù)的算子生成新水印俊扳。

一些算子使用多個(gè)輸入流,例如猛遍,使用 union 或者 keyBy/partition 函數(shù)的算子馋记。此類算子的當(dāng)前事件時(shí)間是其輸入流事件時(shí)間的最小值。
當(dāng)它的輸入流更新它們的事件時(shí)間時(shí)懊烤,算子也會(huì)更新梯醒。

下圖顯示了事件和水印經(jīng)過(guò)并行流的示例,以及跟蹤事件時(shí)間的運(yùn)算符腌紧。

parallels_stream_watermark

延遲記錄(Late Elements)

某些記錄可能會(huì)違反水印的條件茸习,事件時(shí)間小于t但是晚于水印t到達(dá)。實(shí)際運(yùn)行過(guò)程中壁肋,事件可能被延遲任意的時(shí)間号胚,所以不可能指定一個(gè)時(shí)間,保證該時(shí)間之前的所有事件都被處理了浸遗。而且涕刚,即使延時(shí)時(shí)間是有界限的,過(guò)多的延遲水印的時(shí)間也是不理想的乙帮,會(huì)造成時(shí)間窗口處理的太多延時(shí)杜漠。

生成時(shí)間戳和水印

  1. 首先設(shè)置時(shí)間域
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  1. 分配時(shí)間戳
    處理事件事件需要知道事件發(fā)生時(shí)間的時(shí)間戳,通常從流中數(shù)據(jù)元的某個(gè)字段提取時(shí)間戳察净。時(shí)間戳分配與生成水印密切相關(guān)驾茴,水印告訴系統(tǒng)事件時(shí)間的進(jìn)展。

有兩種方法可以分配時(shí)間戳并生成水忧饪ā:

  • 直接在數(shù)據(jù)流 source 中
  • 通過(guò)時(shí)間戳分配器/水印生成器:在 Flink 中锈至,時(shí)間戳分配器也會(huì)定義要發(fā)出的水印

帶時(shí)間戳和水印的 Source Functions

Stream source 可以直接為生成的數(shù)據(jù)元分配時(shí)間戳,也可以發(fā)出水印译秦。完成此 算子操作后峡捡,不需要時(shí)間戳分配器。如果使用了時(shí)間戳分配器筑悴,則 source 函數(shù)提供的任何時(shí)間戳和水印都將被覆蓋们拙。

要直接為源中的數(shù)據(jù)元分配時(shí)間戳,源必須使用 collectWithTimestamp(...) 方法作用域 SourceContext阁吝。要生成水印砚婆,源必須調(diào)用 emitWatermark(Watermark) 函數(shù)。

下面是一個(gè)分配時(shí)間戳并生成水印的簡(jiǎn)單示例:

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

時(shí)間戳分配器/水印生成器

時(shí)間戳分配器(Timestamp assigners)獲取流并生成帶有帶時(shí)間戳數(shù)據(jù)元和水印的新流突勇。如果原始流已經(jīng)有時(shí)間戳或水印装盯,時(shí)間戳分配器會(huì)覆蓋它們坷虑。

時(shí)間戳分配器通常在數(shù)據(jù)源生成之后立即指定,但并非被嚴(yán)格要求這樣做埂奈。常見(jiàn)的模式是在時(shí)間戳分配器之前執(zhí)行解析(MapFunction)和過(guò)濾(FilterFunction)迄损。在任何情況下,需要在第一個(gè)操作事件時(shí)間的算子執(zhí)行之前指定時(shí)間戳分配器(例如第一個(gè)窗口算子操作)账磺。
作為一種特殊情況海蔽,當(dāng)使用 Kafka 作為流式作業(yè)的數(shù)據(jù)源時(shí) ,F(xiàn)link 允許在源(或消費(fèi)者)本身內(nèi)部指定時(shí)間戳分配器/水印發(fā)射器绑谣。更多信息相關(guān)信息請(qǐng)參考 Kafka Connector 文檔党窜。

下面是一個(gè)時(shí)間戳分配器/水印生成器的簡(jiǎn)單示例(只介紹了必須實(shí)現(xiàn)的主要接口):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// create stream source
val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

// assign timestamp and watermark assigner after filter function
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

// window function and sink 
withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)
使用周期性(periodically)水印

AssignerWithPeriodicWatermarks 分配時(shí)間戳并定期生成水印(可能取決于流數(shù)據(jù)元借宵,或純粹基于處理時(shí)間)幌衣。

生成水印的間隔(每n毫秒)使用 ExecutionConfig.setAutoWatermarkInterval(...)。每次調(diào)用分配器的方法 getCurrentWatermark()壤玫,如果返回的水印非空并且大于先前的水印豁护,則將發(fā)出新的水印。

下面有兩個(gè)例子欲间,時(shí)間戳分配器使用周期性水映铩:

該例子假定元素到達(dá)時(shí)在一定程度上是無(wú)序的,某個(gè)時(shí)間戳t的最后達(dá)到元素相比時(shí)間戳t的最早到達(dá)元素猎贴,最大延遲n毫秒班缎。

/**
The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

該例子假設(shè)元素在有界延遲后到達(dá),生成器生成的水印比處理時(shí)間滯后固定時(shí)間長(zhǎng)度她渴。

class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

第二個(gè)例子比較容易理解达址,使用系統(tǒng)時(shí)間減去允許的延時(shí)時(shí)間作為 watermark 的時(shí)間。只跟當(dāng)前系統(tǒng)時(shí)間有關(guān)系趁耗,如果大批事件出現(xiàn)延時(shí)的情況沉唠,可能很多在 watermark 的時(shí)間之后出現(xiàn)了,會(huì)被被丟棄苛败。

第一個(gè)例子满葛,在當(dāng)前事件的事件時(shí)間和當(dāng)前最大時(shí)間(記錄最大的事件時(shí)間)中取最大值,得到最大的事件時(shí)間罢屈。用這個(gè)最大值減去一個(gè)允許的延時(shí)時(shí)間作為 watermark 時(shí)間嘀韧。同樣的如果大批事件發(fā)生延時(shí),那么對(duì)應(yīng)的 watermark 的時(shí)間就會(huì)向后推儡遮。

帶標(biāo)記(Punctuated)水印

使用 AssignerWithPunctuatedWatermarks 在某個(gè)事件指定生成新的水印的時(shí)候生成水印乳蛾。這種情況下,F(xiàn)link 首先會(huì)調(diào)用 extractTimestamp(...) 方法為數(shù)據(jù)分配時(shí)間戳鄙币,然后立即調(diào)用 checkAndGetNextWatermark(...)肃叶。

checkAndGetNextWatermark(...) 方法傳遞在 extractTimestamp(...) 生成的時(shí)間戳,并且界定是否要生成水印十嘿。每當(dāng) checkAndGetNextWatermark(...) 方法返回非空水印因惭,并且該水印大于先一個(gè)水印時(shí),將向后發(fā)出新水印绩衷。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

每個(gè)事件都可以生成水印蹦魔。但是,由于水印會(huì)導(dǎo)致一些后續(xù)的計(jì)算咳燕,因此過(guò)多的水印會(huì)降低性能勿决。

每個(gè) Kafka 分區(qū)一個(gè)時(shí)間戳

當(dāng)使用 Kafka 作為數(shù)據(jù)源的時(shí)候,每個(gè)分區(qū)可能有一個(gè)簡(jiǎn)單的事件時(shí)間模式(按時(shí)間戳升序或其他)招盲。當(dāng)消費(fèi)來(lái)自 Kafka 的流時(shí)低缩,多個(gè)分區(qū)一般會(huì)并行消費(fèi),分區(qū)中的事件交替消費(fèi)曹货,會(huì)破壞分區(qū)中的模式(Kafka 的消費(fèi)者客戶端工作方式)咆繁。

在這種情況下,可以使用 Flink 的 Kafka-partition-aware(分區(qū)感知)水印生成器顶籽。使用這個(gè)特性的時(shí)候玩般,水印會(huì)在 Kafka 消費(fèi)者內(nèi)部為每個(gè)分區(qū)生成,并且每個(gè)分區(qū)水印的合并方式與在流shuffle時(shí)合并水印的方式相同礼饱。

例如坏为,如果事件時(shí)間戳嚴(yán)格按每個(gè) Kafka 分區(qū)升序排列,那么使用升序時(shí)間戳水印生成器镊绪,為每分區(qū)生成水印 將產(chǎn)生完美的總體水印久脯。
下圖顯示了如何為每個(gè) Kafka 分區(qū)生成水印,以及在這種情況下水印如何通過(guò)流式數(shù)據(jù)流傳播镰吆。

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)

// kafka source set timestamp assigner
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)
Kafka consumer watermark
具有遞增時(shí)間戳的 Assigner

定期生成水印的最簡(jiǎn)單的特殊情況是帘撰,給定的源任務(wù)看到的時(shí)間戳按升序出現(xiàn)的情況。在這種情況下万皿,當(dāng)前時(shí)間戳始終可以充當(dāng)水印摧找。

時(shí)間戳只需要在每個(gè)并行數(shù)據(jù)源任務(wù)中是升序的。例如牢硅,如果在特定設(shè)置中蹬耘,每個(gè)并發(fā)的源實(shí)例讀取一個(gè) Kafka 分區(qū),則只需要在每個(gè) Kafka 分區(qū)中時(shí)間戳是遞增减余。水印合并機(jī)制將生成正確的水印综苔,當(dāng)并行流被shuffle,union,connect 或 merge 時(shí)如筛。

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
允許固定時(shí)間延遲的 Assigner

另一個(gè)定期水印的例子是堡牡,當(dāng)水印滯后于流中看到的最大時(shí)間戳(事件時(shí)間)一段固定的時(shí)間。包括杨刨,預(yù)先知道流中可能遇到的最大延遲的情況晤柄。Flink 提供了 BoundedOutOfOrdernessTimestampExtractor,使用參數(shù) maxOutOfOrderness妖胀,計(jì)算給定窗口的最終結(jié)果時(shí)芥颈,允許元素延遲的最長(zhǎng)時(shí)間,超過(guò)的會(huì)被忽略赚抡。延遲為 t - t_wt是數(shù)據(jù)的事件時(shí)間時(shí)間戳爬坑,t_w是前一個(gè)水印的時(shí)間戳),如果延遲大于0涂臣,數(shù)據(jù)被認(rèn)為是遲到的盾计,默認(rèn)會(huì)在計(jì)算窗口的作業(yè)結(jié)果時(shí)被忽略。

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
http://vishnuviswanath.com/flink_eventtime.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末肉康,一起剝皮案震驚了整個(gè)濱河市闯估,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌吼和,老刑警劉巖涨薪,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異炫乓,居然都是意外死亡刚夺,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門末捣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)侠姑,“玉大人,你說(shuō)我怎么就攤上這事箩做∶Ш欤” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵邦邦,是天一觀的道長(zhǎng)安吁。 經(jīng)常有香客問(wèn)我,道長(zhǎng)燃辖,這世上最難降的妖魔是什么鬼店? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮黔龟,結(jié)果婚禮上妇智,老公的妹妹穿的比我還像新娘滥玷。我一直安慰自己,他們只是感情好巍棱,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布惑畴。 她就那樣靜靜地躺著,像睡著了一般拉盾。 火紅的嫁衣襯著肌膚如雪桨菜。 梳的紋絲不亂的頭發(fā)上豁状,一...
    開(kāi)封第一講書(shū)人閱讀 49,929評(píng)論 1 290
  • 那天捉偏,我揣著相機(jī)與錄音,去河邊找鬼泻红。 笑死夭禽,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的谊路。 我是一名探鬼主播讹躯,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼缠劝!你這毒婦竟也來(lái)了潮梯?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤惨恭,失蹤者是張志新(化名)和其女友劉穎秉馏,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體脱羡,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡萝究,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了锉罐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帆竹。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖脓规,靈堂內(nèi)的尸體忽然破棺而出栽连,到底是詐尸還是另有隱情,我是刑警寧澤侨舆,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布秒紧,位于F島的核電站,受9級(jí)特大地震影響态罪,放射性物質(zhì)發(fā)生泄漏噩茄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一复颈、第九天 我趴在偏房一處隱蔽的房頂上張望绩聘。 院中可真熱鬧沥割,春花似錦、人聲如沸凿菩。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)衅谷。三九已至椒拗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間获黔,已是汗流浹背蚀苛。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留玷氏,地道東北人堵未。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像盏触,于是被迫代替她去往敵國(guó)和親渗蟹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容