深入解析Kafka消費(fèi)者——提交和偏移量

KafkaConsumer(消費(fèi)者)每次調(diào)用 poll()方法涨醋,它總是返回由生產(chǎn)者寫入 Kafka但還沒(méi)有被消費(fèi)者讀取過(guò)的記錄盆繁, 我們因 此可以追蹤到哪些記錄是被群組里的哪個(gè)消費(fèi)者讀取的细诸。之前已經(jīng)討論過(guò)倒庵, Kafka 不會(huì)像其他 JMS 隊(duì)列那樣需要得到消費(fèi)者的確認(rèn)绿贞,這是 Kafka 的一個(gè)獨(dú)特之處因块。相反,消 費(fèi)者可以使用 Kafka來(lái)追蹤消息在分區(qū)里的位置(偏移量)籍铁。

我們把更新分區(qū)當(dāng)前位置的操作叫作提交涡上。

那么消費(fèi)者是如何提交偏移量的呢?消費(fèi)者往一個(gè) 叫作 _consumer_offset 的特殊主題發(fā)送 消息趾断,消息里包含每個(gè)分區(qū)的偏移量。 如果消費(fèi)者一直處于運(yùn)行狀態(tài)吩愧,那么偏移量就沒(méi)有 什么用處芋酌。不過(guò),如果悄費(fèi)者發(fā)生崩潰或者有新 的消費(fèi)者加入群組耻警,就會(huì)觸發(fā)再均衡隔嫡,完 成再均衡之后,每個(gè)消費(fèi)者可能分配到新 的分區(qū)甘穿,而不是之前處理的那個(gè)腮恩。為了能夠繼續(xù) 之前的工作,消費(fèi)者需要讀取每個(gè)分區(qū)最后一次提交 的偏移量温兼,然后從偏移量指定的地方 繼續(xù)處理秸滴。

如果提交的偏移量小于客戶端處理 的最后一個(gè)消息的偏移量 ,那么處于兩個(gè)偏移量之間的 消息就會(huì)被重復(fù)處理募判,如圖 4-6所示荡含。

如果提交的偏移量大于客戶端處理的最后一個(gè)消息的偏移量,那么處于兩個(gè)偏移量之間的 消息將會(huì)丟失届垫,如圖 4-7 所示释液。

所以,處理偏移量的方式對(duì)客戶端會(huì)有很大的影響装处。 KafkaConsumer API提供了很多種方式來(lái)提交偏移量误债。

自動(dòng)提交

最簡(jiǎn)單的提交方式是讓悄費(fèi)者自動(dòng)提交偏移量。如果enable.auto.commit被設(shè)為 true妄迁,那么每過(guò)5s寝蹈,消費(fèi)者會(huì)自動(dòng)把從 poll() 方法接收到的最大偏移量提交上去。提交時(shí)間間隔由 auto.commit.interval.ms 控制登淘,默認(rèn)值是 5s箫老。與消費(fèi)者里的其他東西 一樣,自動(dòng)提交也是在輪詢(poll() )里進(jìn)行的黔州。消費(fèi)者每次在進(jìn)行輪詢時(shí)會(huì)檢查是否該提交偏移量了耍鬓,如果是,那 么就會(huì)提交從上一次輪詢返回的偏移量辩撑。

不過(guò)界斜,在使用這種簡(jiǎn)便的方式之前,需要知道它將會(huì)帶來(lái)怎樣的結(jié)果合冀。
假設(shè)我們?nèi)匀皇褂媚J(rèn)的 5s提交時(shí)間間隔,在最近一次提交之后的 3s發(fā)生了再均衡项贺,再 均衡之后君躺,消費(fèi)者從最后一次提交的偏移量位置開(kāi)始讀取消息峭判。這個(gè)時(shí)候偏移量已經(jīng)落后 了 3s,所以在這 3s 內(nèi)到達(dá)的消息會(huì)被重復(fù)處理棕叫×煮Γ可以通過(guò)修改提交時(shí)間間隔來(lái)更頻繁地提交偏移量,減小可能出現(xiàn)重復(fù)消息的時(shí)間窗俺泣,不過(guò)這種情況是無(wú)也完全避免的 疗认。

在使用自動(dòng)提交時(shí) ,每次調(diào)用輪詢方怯都會(huì)把上一次調(diào)用返 回的偏移量提交上去伏钠,它并不 知道具體哪些消息已經(jīng)被處理了横漏,所以在再次調(diào)用之前最好確保所有當(dāng)前調(diào)用返回 的消息 都已經(jīng)處理完畢(在調(diào)用 close() 方法之前也會(huì)進(jìn)行自動(dòng)提交)。 一般情況下不會(huì)有什么問(wèn) 題熟掂,不過(guò)在處理異扯薪剑或提前退出輪詢時(shí)要格外小心 。

自動(dòng)提交雖然方便 赴肚, 不過(guò)并沒(méi)有為開(kāi)發(fā)者留有余地來(lái)避免重復(fù)處理消息素跺。


提交當(dāng)前偏移量

大部分開(kāi)發(fā)者通過(guò)控制偏移量提交時(shí)間來(lái)消除丟失消息的可能性,井在發(fā)生再均衡時(shí)減少 重復(fù)消息的數(shù)量誉券。消費(fèi)者 API提供了另一種提交偏移量的方式 指厌, 開(kāi)發(fā)者可以在必要的時(shí)候 提交當(dāng)前偏移盤,而不是基于時(shí)間間隔踊跟。

取消自動(dòng)提交踩验,把 auto.commit.offset 設(shè)為 false,讓應(yīng)用程序決定何時(shí)提交 偏 移量琴锭。使用 commitSync() 提交偏移量最簡(jiǎn)單也最可靠晰甚。這個(gè) API會(huì)提交由 poll() 方法返回 的最新偏移量,提交成 功后馬上返回决帖,如果提交失敗就拋出異常厕九。

要記住, commitSync() 將會(huì)提交由 poll() 返回的最新偏移量 地回, 所以在處理完所有記錄后要 確保調(diào)用了 commitSync()扁远,否則還是會(huì)有丟失消息的風(fēng)險(xiǎn)。如果發(fā)生了再均衡刻像,從最近一 批消息到發(fā)生再均衡之間的所有消息都將被重復(fù)處理畅买。

下面是我們?cè)谔幚硗曜罱慌⒑笫褂?commitSync() 方法提交偏移量的例子。


異步提交

同步提交有一個(gè)不足之處细睡,在 broker對(duì)提交請(qǐng)求作出回應(yīng)之前谷羞,應(yīng)用程序會(huì)一直阻塞,這樣會(huì)限制應(yīng)用程序的吞吐量。我們可以通過(guò)降低提交頻率來(lái)提升吞吐量湃缎,但如果發(fā)生了再均衡犀填, 會(huì)增加重復(fù)消息的數(shù)量。

這個(gè)時(shí)候可以使用異步提交 API嗓违。我們只管發(fā)送提交請(qǐng)求九巡,無(wú)需等待 broker的響應(yīng)。

在成功提交或碰到無(wú)怯恢復(fù)的錯(cuò)誤之前蹂季, commitSync() 會(huì)一直重試(應(yīng)用程序也一直阻塞)冕广,但是 commitAsync() 不會(huì),這也是 commitAsync() 不好的 一個(gè)地方偿洁。它之所以不進(jìn)行重試撒汉,是因?yàn)樵谒盏?服務(wù)器響應(yīng)的時(shí)候,可能有一個(gè)更大的偏移量已經(jīng)提交成功父能。假設(shè)我們發(fā)出一個(gè)請(qǐng)求用于提交偏移量 2000神凑,這個(gè)時(shí)候發(fā)生了短暫的通信問(wèn)題 ,服務(wù)器收不到請(qǐng)求何吝,自然也不會(huì) 作出任何響應(yīng)溉委。與此同時(shí),我們處理了另外一批消息爱榕,并成功提交了偏移量 3000瓣喊。如果 commitAsync() 重新嘗試提交偏移量 2000,它有可能在偏移量 3000之后提交成功黔酥。這個(gè)時(shí) 候如果發(fā)生再均衡藻三,就會(huì)出現(xiàn)重復(fù)消息。

我們之所以提到這個(gè)問(wèn)題的復(fù)雜性和提交順序的重要性跪者,是因?yàn)?commitAsync()也支持回 調(diào)棵帽,在 broker 作出響應(yīng)時(shí)會(huì)執(zhí)行回調(diào)≡幔回調(diào)經(jīng)常被用于記錄提交錯(cuò)誤或生成度量指標(biāo)逗概, 不 過(guò)如果你要用它來(lái)進(jìn)行重試, 一定要注意提交的順序忘衍。

重試異步提交

我們可以使用一個(gè)單調(diào)遞增的序列號(hào)來(lái)維護(hù)異步提交的順序逾苫。在每次提交偏 移量之后或在回調(diào)里提交偏移量時(shí)遞增序列號(hào)。在進(jìn)行重試前枚钓,先檢查回調(diào) 的序列號(hào)和即將提交的偏移量是否相等铅搓,如果相等,說(shuō)明沒(méi)有新的提交搀捷,那么可以安全地進(jìn)行重試星掰。如果序列號(hào)比較大,說(shuō)明有一個(gè)新的提交已經(jīng)發(fā)送出去了,應(yīng)該停止重試蹋偏。


同步和異步組合提交

一般情況下便斥,針對(duì)偶爾出現(xiàn)的提交失敗至壤,不進(jìn)行重試不會(huì)有太大問(wèn)題威始,因?yàn)槿绻峤皇?是 因?yàn)榕R時(shí)問(wèn)題導(dǎo)致的,那么后續(xù)的提交總會(huì)有成功的像街。但如果這是發(fā)生在關(guān)閉消費(fèi)者或 再均衡前的最后一次提交黎棠,就要確保能夠提交成功。

因此镰绎,在消費(fèi)者關(guān)閉前一般會(huì)組合使用 commitAsync()和 commitSync()脓斩。它們的工作原理如下(后面講到再均衡監(jiān)聽(tīng)器時(shí),我們會(huì)討論如何在發(fā)生再均衡前提交偏移量):


提交特定的偏移量

提交偏移量的頻率與處理消息批次的頻率是一樣的畴栖。但如果想要更頻繁地提交出怎么辦随静?如果 poll() 方法返回一大批數(shù)據(jù),為了避免因再均衡引起的重復(fù)處理整批消息吗讶,想要在批次中間提交偏移量該怎么辦燎猛?這種情況無(wú)法通過(guò)調(diào)用 commitSync()或 commitAsync() 來(lái)實(shí)現(xiàn),因?yàn)樗鼈冎粫?huì)提交最后一個(gè)偏移量照皆,而此時(shí)該批次里的消息還沒(méi)有處理完重绷。

幸運(yùn)的是,消費(fèi)者 API 允許在調(diào)用 commitSync()和 commitAsync()方法時(shí)傳進(jìn)去希望提交 的分區(qū)和偏移量的 map膜毁。假設(shè)你處理了半個(gè)批次的消息昭卓, 最后一個(gè)來(lái)自主題“customers” 分區(qū) 3 的消息的偏移量是 5000, 你可以調(diào)用 commitSync() 方法來(lái)提交它瘟滨。不過(guò)候醒,因?yàn)橄M(fèi)者可能不只讀取一個(gè)分區(qū), 你需要跟蹤所有分區(qū)的偏移量杂瘸,所以在這個(gè)層面上控制偏移 量 的提交會(huì)讓代碼變復(fù)雜倒淫。

下面是提交特定偏移量的例子 :


再均衡監(jiān)聽(tīng)器

在提交偏移量一節(jié)中提到過(guò),消費(fèi)者在退出和進(jìn)行分區(qū)再均衡之前胧沫,會(huì)做一些清理工作昌简。

你會(huì)在消費(fèi)者失去對(duì)一個(gè)分區(qū)的所有權(quán)之前提交最后一個(gè)已處理記錄的偏移量。如果消費(fèi) 者準(zhǔn)備了 一 個(gè)緩沖區(qū)用于處理偶發(fā)的事件绒怨,那么在失去分區(qū)所有權(quán)之前纯赎, 需要處理在緩沖 區(qū)累積下來(lái)的記錄。你可能還需要關(guān)閉文件句柄南蹂、數(shù)據(jù)庫(kù)連接等犬金。

在為消費(fèi)者分配新分區(qū)或移除舊分區(qū)時(shí),可以通過(guò)消費(fèi)者 API執(zhí)行 一 些應(yīng)用程序代碼,在調(diào)用 subscribe()方法時(shí)傳進(jìn)去一個(gè)ConsumerRebalancelistener實(shí)例就可以了晚顷。 ConsumerRebalancelistener有兩個(gè)需要實(shí)現(xiàn)的方法峰伙。

  • (1) public void onPartitionsRevoked(Collection<TopicPartition> partitions)方法會(huì)在 再均衡開(kāi)始之前和消費(fèi)者停止讀取消息之后被調(diào)用。如果在這里提交偏移量该默,下一個(gè)接 管分區(qū) 的消費(fèi)者就知道該從哪里開(kāi)始讀取了瞳氓。

  • (2) public void onPartitionsAssigned(Collection<TopicPartition> partitions)方法會(huì)在 重新分配分區(qū)之后和消費(fèi)者開(kāi)始讀取消息之前被調(diào)用。

下面的例子將演示如何在失去分區(qū)所有權(quán)之前通過(guò) onPartitionsRevoked()方法來(lái)提交偏移量栓袖。在下一節(jié)匣摘,我們會(huì)演示另一個(gè)同時(shí)使用了 onPartitionsAssigned()方法的例子。


從特定偏移量處開(kāi)始處理記錄

到目前為止裹刮,我們知道了如何使用 poll() 方法從各個(gè)分區(qū)的最新偏移量處開(kāi)始處理消息音榜。 不過(guò),有時(shí)候我們也需要從特定的偏移量處開(kāi)始讀取消息捧弃。

如果你想從分區(qū)的起始位置開(kāi)始讀取消息赠叼,或者直接跳到分區(qū)的末尾開(kāi)始讀取消息封恰, 可以使 用 seekToBeginning(Collection<TopicPartition> tp) 和 seekToEnd(Collection<TopicPartition> tp) 這兩個(gè)方法彪杉。

不過(guò), Kafka也為我們提供了用 于查找特定偏移量的 API蹦掐。 它有很多用途葛家,比如向 后回退 幾個(gè)消息或者向前跳過(guò)幾個(gè)消息(對(duì)時(shí)間比較敏感的應(yīng)用程序在處理滯后的情況下希望能 夠向前跳過(guò)若干個(gè)消息)户辞。在使用 Kafka 以外的系統(tǒng)來(lái)存儲(chǔ)偏移量時(shí),它將給我們 帶來(lái)更 大的驚喜癞谒。

試想一下這樣的場(chǎng)景:應(yīng)用程序從 Kafka讀取事件(可能是網(wǎng)站的用戶點(diǎn)擊事件流 )底燎,對(duì) 它們進(jìn)行處理(可能是使用自動(dòng)程序清理點(diǎn)擊操作井添加會(huì)話信息),然后把結(jié)果保 存到 數(shù)據(jù)庫(kù)弹砚、 NoSQL 存儲(chǔ)引擎或 Hadoop双仍。假設(shè)我們真的不想丟失任何數(shù)據(jù),也不想在數(shù)據(jù)庫(kù) 里多次保存相同的結(jié)果桌吃。

這種情況下朱沃,消費(fèi)者的代碼可能是這樣的 :

在這個(gè)例子里,每處理一條記錄就提交一次偏移量茅诱。盡管如此逗物, 在記錄被保存到數(shù)據(jù)庫(kù)之后以及偏移量被提交之前 ,應(yīng)用程序仍然有可能發(fā)生崩潰瑟俭,導(dǎo)致重復(fù)處理數(shù)據(jù)翎卓,數(shù)據(jù)庫(kù)里就會(huì)出現(xiàn)重復(fù)記錄。

如果保存記錄和偏移量可以在一個(gè)原子操作里完成摆寄,就可以避免出現(xiàn)上述情況失暴。記錄和偏 移量要么 都被成功提交坯门,要么都不提交。如果記錄是保存在數(shù)據(jù)庫(kù)里而偏移量是提交到 Kafka 上逗扒,那么就無(wú)法實(shí)現(xiàn)原子操作古戴。

不過(guò) ,如果在同一個(gè)事務(wù)里把記錄和偏移量都寫到數(shù)據(jù)庫(kù)里會(huì)怎樣呢矩肩?那么我們就會(huì)知道 記錄和偏移量要么都成功提交现恼,要么都沒(méi)有,然后重新處理記錄蛮拔。

現(xiàn)在的問(wèn)題是:如果偏移量是保存在數(shù)據(jù)庫(kù)里而不是 Kafka里述暂,那么消費(fèi)者在得到新分區(qū) 時(shí)怎么知道該從哪里開(kāi)始讀取?這個(gè)時(shí)候可以使用 seek() 方法。在消費(fèi)者啟動(dòng)或分配到新 分區(qū)時(shí) 建炫,可以使用 seek()方法查找保存在數(shù)據(jù)庫(kù)里的偏移量。

下面的例子大致說(shuō)明了如何使用這個(gè) API疼蛾。 使用 ConsumerRebalancelistener和 seek() 方 戰(zhàn)確保我們是從數(shù)據(jù)庫(kù)里保存的偏移量所指定的位置開(kāi)始處理消息的肛跌。

通過(guò)把偏移量和記錄保存到同 一個(gè)外部系統(tǒng)來(lái)實(shí)現(xiàn)單次語(yǔ)義可以有很多種方式,不過(guò)它們 都需要結(jié)合使用 ConsumerRebalancelistener和 seek() 方法來(lái)確保能夠及時(shí)保存偏移量察郁, 井保證消費(fèi)者總是能夠從正確的位置開(kāi)始讀取消息衍慎。


如何退出

在之前討論輪詢時(shí)就說(shuō)過(guò),不需要擔(dān)心消費(fèi)者會(huì)在一個(gè)無(wú)限循環(huán)里輪詢消息皮钠,我們會(huì)告訴消費(fèi)者如何優(yōu)雅地退出循環(huán)稳捆。

如果確定要退出循環(huán),需要通過(guò)另一個(gè)線程調(diào)用 consumer.wakeup()方法麦轰。如果循環(huán)運(yùn)行 在主線程里乔夯,可以在 ShutdownHook里調(diào)用該方法。要記住款侵, consumer.wakeup() 是消費(fèi)者 唯一一個(gè)可以從其他線程里安全調(diào)用的方法末荐。調(diào)用 consumer.wakeup()可以退出 poll(), 并拋出 WakeupException異常,或者如果調(diào)用 cconsumer.wakeup() 時(shí)線程沒(méi)有等待輪詢新锈, 那 么異常將在下一輪調(diào)用 poll()時(shí)拋出甲脏。我們不需要處理 WakeupException,因?yàn)樗皇怯糜谔鲅h(huán)的一種方式妹笆。不過(guò)块请, 在退出線程之前調(diào)用 consumer.close()是很有必要的, 它 會(huì)提交任何還沒(méi)有提交的東西 拳缠, 并向群組協(xié)調(diào)器(broker)發(fā)送消息墩新,告知自己要離開(kāi)群組,接下來(lái) 就會(huì)觸發(fā)再均衡 脊凰,而不需要等待會(huì)話超時(shí)抖棘。

下面是運(yùn)行在主線程上的消費(fèi)者退出線程的代碼 茂腥。 這些代碼經(jīng)過(guò)了簡(jiǎn)化,你可以在這里查 看完整的代碼: http://bit.ly/2u47e9A切省。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末最岗,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子朝捆,更是在濱河造成了極大的恐慌般渡,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件芙盘,死亡現(xiàn)場(chǎng)離奇詭異驯用,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)儒老,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門蝴乔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人驮樊,你說(shuō)我怎么就攤上這事薇正。” “怎么了囚衔?”我有些...
    開(kāi)封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵挖腰,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我练湿,道長(zhǎng)猴仑,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任肥哎,我火速辦了婚禮辽俗,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘贤姆。我一直安慰自己榆苞,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布霞捡。 她就那樣靜靜地躺著坐漏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪碧信。 梳的紋絲不亂的頭發(fā)上赊琳,一...
    開(kāi)封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音砰碴,去河邊找鬼躏筏。 笑死,一個(gè)胖子當(dāng)著我的面吹牛呈枉,可吹牛的內(nèi)容都是我干的趁尼。 我是一名探鬼主播埃碱,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼酥泞!你這毒婦竟也來(lái)了砚殿?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤芝囤,失蹤者是張志新(化名)和其女友劉穎似炎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體悯姊,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡羡藐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了悯许。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片仆嗦。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖岸晦,靈堂內(nèi)的尸體忽然破棺而出欧啤,到底是詐尸還是另有隱情,我是刑警寧澤启上,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站店印,受9級(jí)特大地震影響冈在,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜按摘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一包券、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧炫贤,春花似錦溅固、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至掠河,卻和暖如春亮元,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背唠摹。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工爆捞, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人勾拉。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓煮甥,卻偏偏與公主長(zhǎng)得像盗温,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子成肘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容