FlinkSQL內(nèi)置了這么多函數(shù)你都使用過嗎益兄?

前言

???????? Flink Table 和 SQL 內(nèi)置了很多 SQL 中支持的函數(shù);如果有無法滿足的需要抖棘,則可以實現(xiàn)用戶自定義的函數(shù)(UDF)來解決茂腥。

一、系統(tǒng)內(nèi)置函數(shù)

???????? Flink Table API 和 SQL 為用戶提供了一組用于數(shù)據(jù)轉(zhuǎn)換的內(nèi)置函數(shù)切省。SQL 中支持的很多函數(shù)最岗,Table API 和 SQL 都已經(jīng)做了實現(xiàn),其它還在快速開發(fā)擴展中朝捆。

以下是一些典型函數(shù)的舉例般渡,全部的內(nèi)置函數(shù),可以參考官網(wǎng)介紹。

類型TableApiSQLAPI比較函數(shù)ANY1 === ANY2value1 = value2比較函數(shù)NY1 > ANY2value1 > value2邏輯函數(shù)BOOLEAN1 || BOOLEAN2boolean1 OR boolean2邏輯函數(shù)BOOLEAN.isFalseboolean IS FALSE邏輯函數(shù)!BOOLEANNOT boolean算術(shù)函數(shù)NUMERIC1 + NUMERIC2numeric1 + numeric2算術(shù)函數(shù)NUMERIC1.power(NUMERIC2)POWER(numeric1, numeric2)字符串函數(shù)STRING1 + STRING2string1 || string2字符串函數(shù)STRING.upperCase()UPPER(string)字符串函數(shù)STRING.charLength()CHAR_LENGTH(string)時間函數(shù)STRING.toDateDATE string時間函數(shù)STRING.toTimestampTIMESTAMP string時間函數(shù)currentTime()CURRENT_TIME時間函數(shù)NUMERIC.daysINTERVAL string range時間函數(shù)NUMERIC.minutes聚合函數(shù)FIELD.countCOUNT(*)聚合函數(shù)FIELD.sum0SUM([ ALL | DISTINCT ] expression)聚合函數(shù)RANK()聚合函數(shù)ROW_NUMBER()

二驯用、Flink UDF

???????? 用戶定義函數(shù)(User-defined Functions脸秽,UDF)是一個重要的特性,因為它們顯著地擴展了查詢(Query)的表達能力一些系統(tǒng)內(nèi)置函數(shù)無法解決的需求,我們可以用 UDF 來自定義實現(xiàn)固惯。

2.1 注冊用戶自定義函數(shù) UDF

???????? 在大多數(shù)情況下,用戶定義的函數(shù)必須先注冊片酝,然后才能在查詢中使用。不需要專門為Scala 的 Table API 注冊函數(shù)挖腰。????????

? ? ? ? ?函數(shù)通過調(diào)用 registerFunction()方法在 TableEnvironment 中注冊雕沿。當用戶定義的函數(shù)被注冊時,它被插入到 TableEnvironment 的函數(shù)目錄中曙聂,這樣 Table API 或 SQL 解析器就可以識別并正確地解釋它晦炊。

2.2 標量函數(shù)(Scalar Functions)

???????? 用戶定義的標量函數(shù),可以將 0宁脊、1 或多個標量值断国,映射到新的標量值。?????????

? ? ? ? ?為了定義標量函數(shù)榆苞,必須在 org.apache.flink.table.functions 中擴展基類 Scalar Function稳衬,并實現(xiàn)(一個或多個)求值(evaluation,eval)方法坐漏。標量函數(shù)的行為由求值方法決定薄疚,求值方法必須公開聲明并命名為 eval(直接 def 聲明,沒有 override)赊琳。求值方法的參數(shù)類型和返回類型街夭,確定了標量函數(shù)的參數(shù)和返回類型。

???????? 在下面的代碼中躏筏,我們定義自己的 HashCode 函數(shù)板丽,在 TableEnvironment 中注冊它,并在查詢中調(diào)用它趁尼。準備數(shù)據(jù)

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9

代碼如下

package?udf

import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.table.api.DataTypes
import?org.apache.flink.table.api.scala._
import?org.apache.flink.table.descriptors.{Csv,?FileSystem,?Schema}
import?org.apache.flink.table.functions.ScalarFunction
import?org.apache.flink.types.Row

/**
*?@Package?udf
*?@File :FlinkSqlUdfHashCode.java
*?@author?大數(shù)據(jù)老哥
*?@date?2020/12/29?21:58
*?@version?V1.0*/
object?FlinkSqlUdfHashCode?{
?def?main(args:?Array[String]):?Unit?=?{
???//1.構(gòu)建運行環(huán)境
???val?env?=?StreamExecutionEnvironment.getExecutionEnvironment
???env.setParallelism(1)?//?設置并行度為1
???//2.構(gòu)建TableEnv
???val?tableEnv?=?StreamTableEnvironment.create(env)
???//3.構(gòu)建數(shù)據(jù)源
???tableEnv.connect(new?FileSystem().path("./data/sensor.txt"))
?????.withFormat(new?Csv())
?????.withSchema(new?Schema()
???????.field("id",?DataTypes.STRING())
???????.field("timestamp",?DataTypes.INT())
???????.field("temperature",?DataTypes.DOUBLE())
?????).createTemporaryTable("sensor")
???//?轉(zhuǎn)為表
???val?tableSensor?=?tableEnv.from("sensor")
???//?床架轉(zhuǎn)換對象
???val?code?=?new?HashCode()
???//使用tableAPI?進行測試
???val?tableRes?=?tableSensor.select('id,?code('id))
???tableEnv.registerFunction("code",code)?//?注冊udf
???val?tableSql?=?tableEnv.sqlQuery(
?????"""
???????|select
???????|id,
???????|code(id)
???????|from
???????|sensor???????|""".stripMargin)
???//?輸出
???tableRes.toAppendStream[Row].print("tableAPI")
???tableSql.toAppendStream[Row].print("tableSql")

???env.execute("FlinkSqlUdfHashCode")
?}

?class?HashCode()?extends?ScalarFunction?{
???def?eval(s:?String):?String?=?{
?????s.hashCode.toString
???}
?}
}

2.3 表函數(shù)(Table Functions)

???????? 與用戶定義的標量函數(shù)類似埃碱,用戶定義的表函數(shù),可以將 0酥泞、1 或多個標量值作為輸入?yún)?shù)砚殿;?????????

? ? ? ? ?與標量函數(shù)不同的是,它可以返回任意數(shù)量的行作為輸出芝囤,而不是單個值似炎。為了定義一個表函數(shù)辛萍,必須擴展 org.apache.flink.table.functions 中的基類 TableFunction并實現(xiàn)(一個或多個)求值方法。表函數(shù)的行為由其求值方法決定羡藐,求值方法必須是 public的叹阔,并命名為 eval。求值方法的參數(shù)類型传睹,決定表函數(shù)的所有有效參數(shù)。?????????

? ? ? ? ?返回表的類型由 TableFunction 的泛型類型確定岸晦。求值方法使用 protected collect(T)方法發(fā)出輸出行欧啤。?????????

? ? ? ? ? 在 Table API 中,Table 函數(shù)需要與.joinLateral 或.leftOuterJoinLateral 一起使用启上。

? ? ? ? ? ? ?joinLateral 算子邢隧,會將外部表中的每一行,與表函數(shù)(TableFunction冈在,算子的參數(shù)是它的表達式)計算得到的所有行連接起來倒慧。? ? ? ? ?

? ? ? ? 而 leftOuterJoinLateral 算子,則是左外連接包券,它同樣會將外部表中的每一行與表函數(shù)計算生成的所有行連接起來纫谅;并且,對于表函數(shù)返回的是空表的外部行溅固,也要保留下來付秕。?????????

? ? ? ? ?在 SQL 中,則需要使用 Lateral Table()侍郭,或者帶有 ON TRUE 條件的左連接询吴。

???????? 下面的代碼中,我們將定義一個表函數(shù)亮元,在表環(huán)境中注冊它猛计,并在查詢中調(diào)用它。

數(shù)據(jù)準備

hello|word,hello|spark
hello|Flink,hello|java,hello|大數(shù)據(jù)老哥

編寫代碼

package?udf

import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.table.api.scala._
import?org.apache.flink.table.functions.TableFunction
import?org.apache.flink.types.Row

/**
?*?@Package?udf
?*?@File :FlinkSqlUDFTableFunction.java
?*?@author?大數(shù)據(jù)老哥
?*?@date?2020/12/29?23:10
?*?@version?V1.0?*/
object?FlinkSqlUDFTableFunction?{
??def?main(args:?Array[String]):?Unit?=?{
????//1.構(gòu)建運行環(huán)境
????val?env?=?StreamExecutionEnvironment.getExecutionEnvironment
????env.setParallelism(1)?//?設置并行度為1
????//2.構(gòu)建TableEnv
????val?tableEnv?=?StreamTableEnvironment.create(env)
????//3.構(gòu)建數(shù)據(jù)源
????val?data?=?env.readTextFile("./data/words.txt")
????//?解析數(shù)據(jù)
????val?wordData:?DataStream[String]?=?data.flatMap(_.split(","))
????//?類型轉(zhuǎn)換
????val?tableWord?=?tableEnv.fromDataStream(wordData,'id)
????//?調(diào)用TableFunction
????val?split?=?new?Split()
????//?Table?API?方式一
????val?resTable1?=?tableWord.
??????joinLateral(split('id)?as('word,'length))
??????.select('id,'word,'length?)
????//??Table?API??方式二
????val?resTable2?=?tableWord.
??????leftOuterJoinLateral(split('id)?as('word,'length))
??????.select('id,'word,'length?)
????//?將數(shù)據(jù)注冊成表
?????tableEnv.createTemporaryView("sensor",tableWord)
?????tableEnv.registerFunction("split",split)

????//?SQL?方式一
????val?tableSQL1?=?tableEnv.sqlQuery(
??????"""
????????|select
????????|id,
????????|word,
????????|length
????????|from
????????|sensor?,LATERAL?TABLE(split(id))?AS?newsensor(word,?length)????????|""".stripMargin)
????//??SQL?方式二
????val?TableSQL2?=?tableEnv.sqlQuery(
??????"""
????????|select
????????|id,
????????|word,
????????|length
????????|from
????????|sensor
????????|?LEFT?JOIN?LATERAL?TABLE(split(id))?AS?newsensor(word,?length)?ON?TRUE????????|""".stripMargin)
????//?調(diào)用數(shù)據(jù)
????resTable1.toAppendStream[Row].print("resTable1")
????resTable2.toAppendStream[Row].print("resTable2")
????tableSQL1.toAppendStream[Row].print("tableSQL1")
????TableSQL2.toAppendStream[Row].print("TableSQL2")


????env.execute("FlinkSqlUDFTableFunction")
??}

??class?Split()?extends?TableFunction[(String,Int)]?{
????def?eval(str:?String):?Unit?=?{
??????str.split("\\|").foreach(
????????word?=>?collect((word,?word.length))
??????)
????}
??}
}

2.4 聚合函數(shù)(Aggregate Functions)

? ? ? ?假設現(xiàn)在有一張表爆捞,包含了各種飲料的數(shù)據(jù)奉瘤。該表由三列(id、name 和 price)嵌削、五行組成數(shù)據(jù)∶茫現(xiàn)在我們需要找到表中所有飲料的最高價格,即執(zhí)行 max()聚合苛秕,結(jié)果將是一個數(shù)值肌访。AggregateFunction 的工作原理如下:

  • 首先,它需要一個累加器艇劫,用來保存聚合中間結(jié)果的數(shù)據(jù)結(jié)構(gòu)(狀態(tài))吼驶。可以通過調(diào)用 AggregateFunction 的 createAccumulator()方法創(chuàng)建空累加器。
  • 隨后蟹演,對每個輸入行調(diào)用函數(shù)的 accumulate() 方法來更新累加器风钻。
  • 處理完所有行后,將調(diào)用函數(shù)的 getValue() 方法來計算并返回最終結(jié)果酒请。AggregationFunction 要求必須實現(xiàn)的方法:

???????? 除了上述方法之外骡技,還有一些可選擇實現(xiàn)的方法。其中一些方法羞反,可以讓系統(tǒng)執(zhí)行查詢更有效率布朦,而另一些方法,對于某些場景是必需的昼窗。例如是趴,如果聚合函數(shù)應用在會話窗口(session group window)上下文中,則 merge()方法是必需的澄惊。

  • retract()
  • merge()
  • resetAccumulator()

???????? ?接下來我們寫一個自定義AggregateFunction,計算一個每個price的平均值唆途。

數(shù)據(jù)準備

1,Latte,6
2,Milk,3
3,Breve,5
4,Mocha,8
5,Tea,4

代碼如下

package?udf

import?org.apache.calcite.rel.`type`.{RelDataType,?RelDataTypeFactory}
import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.table.api.DataTypes
import?org.apache.flink.table.api.scala._
import?org.apache.flink.table.descriptors.{Csv,?FileSystem,?Schema}
import?org.apache.flink.table.functions.AggregateFunction
import?org.apache.flink.types.Row

import?java.util

/**
?*?@Package?udf
?*?@File :FlinkSQUDFAggregateFunction.java
?*?@author?大數(shù)據(jù)老哥
?*?@date?2020/12/30?22:06
?*?@version?V1.0?*/
object?FlinkSQUDFAggregateFunction?{
??def?main(args:?Array[String]):?Unit?=?{
????//1.構(gòu)建運行環(huán)境
????val?env?=?StreamExecutionEnvironment.getExecutionEnvironment
????env.setParallelism(1)?//?設置并行度為1
????//2.構(gòu)建TableEnv
????val?tableEnv?=?StreamTableEnvironment.create(env)
????//3.構(gòu)建數(shù)據(jù)源
????tableEnv.connect(new?FileSystem().path("./data/datas"))
??????.withFormat(new?Csv)
??????.withSchema(new?Schema()
????????.field("id",?DataTypes.STRING())
????????.field("name",?DataTypes.STRING())
????????.field("price",?DataTypes.DOUBLE())
??????).createTemporaryTable("datas")
????val?AvgTemp?=?new?AvgTemp()
????val?table?=?tableEnv.from("datas")

????val?resTableApi?=?table.groupBy('id)
??????.aggregate(AvgTemp('price)?as?'sumprice)
??????.select('id,?'sumprice)

????tableEnv.registerFunction("avgTemp",AvgTemp)
????val?tablesql?=?tableEnv.sqlQuery(
??????"""
????????|select
????????|id?,avgTemp(price)
????????|from?datas?group?by?id????????|""".stripMargin)
????resTableApi.toRetractStream[Row].print("resTableApi")
????tablesql.toRetractStream[Row].print("tablesql")
????env.execute("FlinkSQUDFAggregateFunction")
??}

??class?AvgTempAcc?{
????var?sum:?Double?=?0.0
????var?count:?Int?=?0
??}

??class?AvgTemp?extends?AggregateFunction[Double,?AvgTempAcc]?{
????override?def?getValue(acc:?AvgTempAcc):?Double?=?{
??????acc.sum?/?acc.count
????}

????override?def?createAccumulator():?AvgTempAcc?=?new?AvgTempAcc()
??}

??def?accumulate(accumulator:?AvgTempAcc,?price:?Double):?Unit?=?{
????accumulator.sum?+=?price

????accumulator.count?+=?1
??}

}

2.5表聚合函數(shù)(Table Aggregate Functions)

  • 為首先,它同樣需要一個累加器(Accumulator)掸驱,它是保存聚合中間結(jié)果的數(shù)據(jù)結(jié)構(gòu)肛搬。通過調(diào)用 TableAggregateFunctioncreateAccumulator()方法可以創(chuàng)建空累加器。
  • 為隨后亭敢,對每個輸入行調(diào)用函數(shù)的 accumulate()方法來更新累加器滚婉。
  • 為處理完所有行后,將調(diào)用函數(shù)的 emitValue()方法來計算并返回最終結(jié)果帅刀。除了上述方法之外让腹,還有一些可選擇實現(xiàn)的方法。
  • retract()
  • merge()
  • resetAccumulator()
  • emitValue()
  • emitUpdateWithRetract()

???????? 接下來我們寫一個自定義 TableAggregateFunction扣溺,用來提取每個 price 最高的兩個平均值骇窍。

數(shù)據(jù)準備

1,Latte,6
2,Milk,3
3,Breve,5
4,Mocha,8
5,Tea,4

代碼如下

package?udf

import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.table.api.DataTypes
import?org.apache.flink.table.api.scala._
import?org.apache.flink.table.descriptors.{Csv,?FileSystem,?Schema}
import?org.apache.flink.table.functions.TableAggregateFunction
import?org.apache.flink.types.Row
import?org.apache.flink.util.Collector
import?udf.FlinkSQUDFAggregateFunction.AvgTemp

/**
?*?@Package?udf
?*?@File :FlinkSqlUDFTableAggregateFunction.java
?*?@author?大數(shù)據(jù)老哥
?*?@date?2020/12/30?22:53
?*?@version?V1.0?*/
object?FlinkSqlUDFTableAggregateFunction?{
??def?main(args:?Array[String]):?Unit?=?{
????//1.構(gòu)建運行環(huán)境
????val?env?=?StreamExecutionEnvironment.getExecutionEnvironment
????env.setParallelism(1)?//?設置并行度為1
????//2.構(gòu)建TableEnv
????val?tableEnv?=?StreamTableEnvironment.create(env)
????//3.構(gòu)建數(shù)據(jù)源
????tableEnv.connect(new?FileSystem().path("./data/datas"))
??????.withFormat(new?Csv)
??????.withSchema(new?Schema()
????????.field("id",?DataTypes.STRING())
????????.field("name",?DataTypes.STRING())
????????.field("price",?DataTypes.DOUBLE())
??????).createTemporaryTable("datas")
????val?table?=?tableEnv.from("datas")
????val?temp?=?new?Top2Temp()
????val?tableApi?=?table.groupBy('id)
??????.flatAggregate(temp('price)?as('tmpprice,?'rank))
??????.select('id,?'tmpprice,?'rank)
????tableEnv.registerFunction("temp",temp)


????tableApi.toRetractStream[Row].print()

????env.execute("FlinkSqlUDFTableAggregateFunction")
??}

??class?Top2TempAcc?{
????var?highestPrice:?Double?=?Int.MinValue
????var?secodeHighestPrice:?Double?=?Int.MinValue
??}

??class?Top2Temp?extends?TableAggregateFunction[(Double,?Int),?Top2TempAcc]?{
????override?def?createAccumulator():?Top2TempAcc?=?new?Top2TempAcc

????def?accumulate(acc:?Top2TempAcc,?temp:?Double):?Unit?=?{
??????if?(temp?>?acc.highestPrice)?{
????????acc.secodeHighestPrice?=?acc.highestPrice
????????acc.highestPrice?=?temp
??????}?else?if?(temp?>?acc.secodeHighestPrice)?{
????????acc.highestPrice?=?temp
??????}
????}

????def?emitValue(acc:?Top2TempAcc,?out:?Collector[(Double,?Int)]):?Unit?=?{
??????out.collect(acc.highestPrice,?1)
??????out.collect(acc.secodeHighestPrice,?2)
????}
??}

}

總結(jié)

???????? ?好了今天的內(nèi)容就分享到這里了。上述主要講解了一個系統(tǒng)自己帶的函數(shù)锥余,但是往往企業(yè)中不光只需要這些函數(shù)腹纳,有好多需求是本身函數(shù)是無法完成的。這時候就要用到我們的自定義函數(shù)了驱犹。他可以根據(jù)我們自己的需要進行編寫代碼來實現(xiàn)我們想要的功能嘲恍。我是大數(shù)據(jù)老哥我們下期見~~~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市雄驹,隨后出現(xiàn)的幾起案子佃牛,更是在濱河造成了極大的恐慌,老刑警劉巖医舆,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件俘侠,死亡現(xiàn)場離奇詭異象缀,居然都是意外死亡,警方通過查閱死者的電腦和手機爷速,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門央星,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人惫东,你說我怎么就攤上這事莉给。” “怎么了廉沮?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵禁谦,是天一觀的道長。 經(jīng)常有香客問我废封,道長,這世上最難降的妖魔是什么丧蘸? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任漂洋,我火速辦了婚禮,結(jié)果婚禮上力喷,老公的妹妹穿的比我還像新娘刽漂。我一直安慰自己,他們只是感情好弟孟,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布贝咙。 她就那樣靜靜地躺著,像睡著了一般拂募。 火紅的嫁衣襯著肌膚如雪庭猩。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天陈症,我揣著相機與錄音蔼水,去河邊找鬼。 笑死录肯,一個胖子當著我的面吹牛趴腋,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播论咏,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼优炬,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了厅贪?” 一聲冷哼從身側(cè)響起蠢护,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎卦溢,沒想到半個月后糊余,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體秀又,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年贬芥,在試婚紗的時候發(fā)現(xiàn)自己被綠了吐辙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡蘸劈,死狀恐怖昏苏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情威沫,我是刑警寧澤贤惯,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站棒掠,受9級特大地震影響孵构,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜烟很,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一颈墅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧雾袱,春花似錦恤筛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至林说,卻和暖如春煎殷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背腿箩。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工蝌数, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人度秘。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓顶伞,卻偏偏與公主長得像,于是被迫代替她去往敵國和親剑梳。 傳聞我的和親對象是個殘疾皇子唆貌,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容