Kafka(4)消費(fèi)者客戶端

一龙优、前言

Kafka不僅提供了生產(chǎn)者客戶端愕贡,同時(shí)也提供了消費(fèi)者客戶端(Cosumer API)济舆。應(yīng)用程序通過消費(fèi)者客戶端來訂閱主題抵怎,然后向broker發(fā)送拉取請(qǐng)求浓恶,獲取想要消費(fèi)的主題分區(qū)消息進(jìn)行消費(fèi)玫坛。本文接下來將對(duì)Kafka Java版消費(fèi)者客戶端相關(guān)知識(shí)進(jìn)行講解。

二包晰、重要概念

2.1 消費(fèi)模式

消費(fèi)模式
  • 消費(fèi)模式分為兩種:推送(push)模式和拉仁啤(pull)模式。
  • 推送模式是由broker將數(shù)據(jù)推送到消費(fèi)者伐憾,實(shí)時(shí)性好勉痴,可以讓消費(fèi)者能夠以可能的最大速率消費(fèi);不過树肃,由于broker控制著數(shù)據(jù)傳輸速率蒸矛,不同的消費(fèi)者消費(fèi)速率可能相差很大,當(dāng)消費(fèi)速率低于生產(chǎn)速率時(shí)胸嘴,消費(fèi)者系統(tǒng)消費(fèi)不及時(shí)雏掠,會(huì)出現(xiàn)服務(wù)拒絕, 所以推送模式系統(tǒng)很難處理不同消費(fèi)速率的消費(fèi)者劣像。一般采用推送模式的系統(tǒng)乡话,需要設(shè)置一定的流控規(guī)則來避免大流量壓垮消費(fèi)者。
  • 拉取模式是消費(fèi)者主動(dòng)從broker那里拉取數(shù)據(jù)驾讲,消費(fèi)速率由消費(fèi)者控制蚊伞,可以避免出現(xiàn)推送模式那種超載壓垮消費(fèi)者的情況。不足的地方是吮铭,如果broker中沒有數(shù)據(jù)时迫,消費(fèi)者可能會(huì)在一個(gè)緊密的循環(huán)中結(jié)束輪詢,然后處于busy-waiting狀態(tài)谓晌,直到數(shù)據(jù)到來掠拳。
  • Kafka采用pull模式。消費(fèi)者主動(dòng)從broker那里拉取數(shù)據(jù)進(jìn)行消費(fèi)纸肉。為了避免busy-waiting溺欧,Kafka在pull請(qǐng)求中加入?yún)?shù)喊熟,使得消費(fèi)者在一個(gè)“l(fā)ong pull”中阻塞等待,直到數(shù)據(jù)到來姐刁。
  • Kafka 消費(fèi)者可以通過pull模式在需要的時(shí)候通過回退位置再次消費(fèi)對(duì)應(yīng)的數(shù)據(jù)芥牌。

2.2 消費(fèi)組

消費(fèi)組與消費(fèi)者、分區(qū)關(guān)系區(qū)
  • Kafka每個(gè)消費(fèi)者都有一個(gè)對(duì)應(yīng)的消費(fèi)組聂使。消費(fèi)組是一個(gè)邏輯概念壁拉,用來對(duì)消費(fèi)者進(jìn)行歸類。
  • 如果多個(gè)消費(fèi)者屬于同一個(gè)消費(fèi)組柏靶,那么每條消息只會(huì)被其中一個(gè)消費(fèi)者處理弃理,相當(dāng)于點(diǎn)對(duì)點(diǎn)模式應(yīng)用。
  • 如果多個(gè)消費(fèi)者不屬于同一個(gè)消費(fèi)組屎蜓,那么每條消息會(huì)被其中每個(gè)消費(fèi)者都處理痘昌,相當(dāng)于訂閱/發(fā)布模式應(yīng)用。
  • 同一個(gè)消費(fèi)組內(nèi)的消費(fèi)者與分區(qū)是對(duì)應(yīng)的炬转,每一個(gè)分區(qū)只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者所消費(fèi)辆苔。如果同一個(gè)消費(fèi)組的消費(fèi)者個(gè)數(shù)大于主題分區(qū)數(shù),則會(huì)存在有消費(fèi)者不能拉取處理這個(gè)主題消息的情況返吻,如圖消費(fèi)者C4無法消費(fèi)主題Topic1的任何消息姑子。
  • 舉個(gè)例子乎婿,假如主題Topic1有消息P0M1测僵、P1M1、P2M2谢翎,消費(fèi)組A的消費(fèi)者C0會(huì)處理P0M1捍靠,消費(fèi)者C1會(huì)處理P1M1,消費(fèi)者C2會(huì)處理P2M2森逮,消費(fèi)組A其他消費(fèi)者不能處理這幾條消息榨婆,但消費(fèi)組B內(nèi)的消費(fèi)者可以消費(fèi)處理這三條消息。
  • 消費(fèi)組消費(fèi)者模型可以讓整體的消費(fèi)能力具有橫向伸縮性褒侧,大大提高了系統(tǒng)消費(fèi)的靈活性良风。

2.3 分區(qū)自動(dòng)再均衡

  • 在多個(gè)消費(fèi)者的情況下可以根據(jù)分區(qū)分配策略來自動(dòng)分配各個(gè)消費(fèi)者與分區(qū)的關(guān)系。當(dāng)消費(fèi)組內(nèi)的消費(fèi)者增加或減少時(shí)闷供,分區(qū)分配關(guān)系會(huì)自動(dòng)調(diào)整烟央,以實(shí)現(xiàn)消費(fèi)負(fù)載均衡及故障自動(dòng)轉(zhuǎn)移。

三歪脏、消費(fèi)消息過程分析

3.1 核心類KafkaConsumer

  • KafkaConsumer類是整個(gè)消費(fèi)者API的核心類疑俭。應(yīng)用程序消費(fèi)消息的主要操作都需要通過它來完成。它是一個(gè)非線程安全的類婿失。

3.2 消費(fèi)消息主要步驟

消費(fèi)者消費(fèi)消息主要步驟
  • 消費(fèi)者消費(fèi)消息主要步驟如上圖所示钞艇,包括設(shè)置消費(fèi)端配置參數(shù)啄寡、創(chuàng)建KafkaConsumer實(shí)例、訂閱主題哩照、拉取主題分區(qū)消息數(shù)據(jù)挺物、提交位移給broker、關(guān)閉KafkaConsumer資源等飘弧。代碼邏輯演示如下:
public static void main(String[] args) {
        //1 設(shè)置配置參數(shù)
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "messageGroup");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //2 創(chuàng)建KafkaConsumer實(shí)例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //3 訂閱主題
        consumer.subscribe(Arrays.asList("topic_test_1"));
        try {
            while(true) {
                //4 拉取主題分區(qū)消息數(shù)據(jù)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        // 業(yè)務(wù)處理姻乓。。眯牧。
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    //5 提交消費(fèi)位移(最后一個(gè)offset+1)
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            //6 關(guān)閉KafkaConsumer實(shí)例
            consumer.close();
        }
    }

3.2 必要的配置參數(shù)

  • bootstrap.servers:與生產(chǎn)者客戶端此參數(shù)作用類似蹋岩。用于建立初始連接到Kafka集群(主機(jī)/端口對(duì))的配置列表。這個(gè)配置不需要包含整個(gè)集群的服務(wù)器学少。不論這個(gè)參數(shù)配置了哪些服務(wù)器來初始化連接剪个,客戶端都是會(huì)均衡地與集群中的所有服務(wù)器建立連接。如果集群變化版确,元數(shù)據(jù)會(huì)動(dòng)態(tài)更新扣囊。為了避免單節(jié)點(diǎn)風(fēng)險(xiǎn),最好配置多臺(tái)主機(jī)绒疗。
  • group.id:消費(fèi)組id侵歇,用來標(biāo)識(shí)消費(fèi)者所屬的消費(fèi)者組的唯一字符串。
  • enable.auto.commit:是否自動(dòng)提交位移吓蘑,默認(rèn)為true惕虑,如果為true,表示消費(fèi)者的位移將定期在后臺(tái)提交磨镶±D瑁可設(shè)置為手動(dòng)提交消費(fèi)位移。
  • key.deserializer:key的反序列化器類琳猫,需要與生產(chǎn)端的key序列化器類對(duì)應(yīng)伟叛。從broker拉取的消息格式是字節(jié)數(shù)組類型的,所以需要相應(yīng)的反序列操作將消息還原成原有的數(shù)據(jù)類型脐嫂。
  • value.deserializer:value的反序列化器類统刮,需要與生產(chǎn)端的value序列化器類對(duì)應(yīng)。

3.3 訂閱主題

  • 消費(fèi)者可以一次訂閱若干個(gè)主題账千。既可以集合形式訂閱多個(gè)主題侥蒙,也可以以正則表達(dá)式形式訂閱特定模式的主題,還可以訂閱主題的特定分區(qū)蕊爵。KafkaConsumer提供的訂閱主題方法如下:
// 集合形式訂閱主題
void subscribe(Collection<String> topics);
// 集合形式訂閱主題辉哥,增加再均衡監(jiān)聽
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
// 正則表達(dá)式形式訂閱主題
void subscribe(Pattern pattern);
// 正則表達(dá)式形式訂閱主題,增加再均衡監(jiān)聽
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
// 手工分配特定分區(qū),沒有自動(dòng)再均衡功能
void assign(Collection<TopicPartition> partitions);
  • 訂閱主題后醋旦,也可取消主題訂閱恒水。KafkaConsumer提供的方法如下:
void unsubscribe();

3.4 拉取數(shù)據(jù)

  • 訂閱閱主題分區(qū)后,才可以拉取數(shù)據(jù)饲齐,Kafka采用拉榷ち琛(pull)模式,主動(dòng)從broker那里拉取數(shù)據(jù)進(jìn)行消費(fèi)捂人。應(yīng)用程序需要不斷輪詢調(diào)用poll()方法御雕,在每次輪詢中,消費(fèi)者將嘗試使用最后消費(fèi)位移+1作為開始偏移量滥搭,并按順序獲取酸纲。最后消費(fèi)位移可以調(diào)用seek()方法手工覆蓋。
  • poll()方法有個(gè)timeout參數(shù)瑟匆,用來表示超時(shí)阻塞的最大時(shí)間闽坡。KafkaConsumer提供的方法如下:
// 從broker拉取數(shù)據(jù)
ConsumerRecords<K, V> poll(Duration timeout);
// 覆蓋消費(fèi)者將在下一次輪詢中使用的最后消費(fèi)位移
void seek(TopicPartition partition, long offset);

3.5 提交位移

  • 拉取數(shù)據(jù)進(jìn)行消費(fèi)處理后,需要將所有主題分區(qū)列表的消費(fèi)位移提交給broker愁溜,然后將消費(fèi)位移存儲(chǔ)在Kafka內(nèi)部的主題__consumer_offsets中疾嗅。
  • 提交的位移應(yīng)該是應(yīng)用程序?qū)⒁褂玫南乱粭l消息,即最后消費(fèi)位移 + 1冕象;
  • Kafka提交位移的方式分為自動(dòng)提交和手工提交代承。默認(rèn)為自動(dòng)提交方式。在默認(rèn)的方式下渐扮,消費(fèi)者每隔5秒會(huì)將拉取到的每個(gè)分區(qū)中最大的消息位移進(jìn)行提交论悴。自動(dòng)提交間隔時(shí)間可通過參數(shù)(auto.commit.interval.ms)可修改。
  • 自動(dòng)提交位移在一些非正常情況下會(huì)出現(xiàn)重復(fù)消費(fèi)和丟失消息的現(xiàn)象席爽;假設(shè)在拉取一批消息消費(fèi)后意荤,還未自動(dòng)提交位移之前啊片,消費(fèi)者奔潰了只锻,那么又會(huì)從上一次消費(fèi)位移處拉取數(shù)據(jù),這樣便出現(xiàn)了重復(fù)消費(fèi)的現(xiàn)象紫谷。
    雖然不能完全避免重復(fù)消費(fèi)現(xiàn)象齐饮,但是我們可以通過縮小位移提交時(shí)間間隔來減少重復(fù)消費(fèi)的消息區(qū)間。
  • 手動(dòng)提交分為同步提交和異步提交笤昨。KafkaConsumer提供的方法如下:
void commitSync();
void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

3.6 優(yōu)雅結(jié)束消費(fèi)

  • 在消費(fèi)完成后祖驱,如何優(yōu)雅的跳出輪詢,結(jié)束消費(fèi)呢瞒窒,KafkaConsumer提供了wakeup()方法捺僻,這個(gè)方法是線程安全的。調(diào)用wakeup()方法的線程將拋出WakeupException異常,終止長(zhǎng)輪詢匕坯。跳出輪詢后束昵,需要顯式調(diào)用close()方法關(guān)閉動(dòng)作來釋放系統(tǒng)占用資源,包括內(nèi)存資源葛峻,socket網(wǎng)絡(luò)連接資源锹雏。

3.7 指定位移消費(fèi)

  • 在消費(fèi)者遇到關(guān)閉、崩潰以及再均衡時(shí)术奖,可以讓其他接替的消費(fèi)者通過seek()方法設(shè)置指定位移來保證消費(fèi)的連續(xù)礁遵。
  • 當(dāng)Kafka中沒有初始消費(fèi)位移或者服務(wù)器上的當(dāng)前消費(fèi)位移不再存在時(shí)(例如數(shù)據(jù)已經(jīng)被刪除),默認(rèn)會(huì)采用自動(dòng)重置位移為最后的位移進(jìn)行消費(fèi)采记∮赌停可以通過參數(shù)auto.offset.reset修改重置方式。

四唧龄、重要配置參數(shù)

消費(fèi)者客戶端有一些重要的參數(shù)晰赞,掌握這些參數(shù)有助于我們?cè)趯?shí)際應(yīng)用中的故障排查和性能調(diào)優(yōu)。

fetch.min.bytes
  • 描述:服務(wù)器應(yīng)該為消費(fèi)者拉取請(qǐng)求返回的最小數(shù)據(jù)量选侨。如果可用數(shù)據(jù)不足掖鱼,請(qǐng)求將等待大量數(shù)據(jù)累積,然后再響應(yīng)請(qǐng)求援制。默認(rèn)1字節(jié)戏挡,意味著拉取請(qǐng)求很快會(huì)被響應(yīng)。將這個(gè)參數(shù)適當(dāng)調(diào)大可以提高服務(wù)器的吞吐量晨仑,但會(huì)導(dǎo)致服務(wù)器等待更大數(shù)量的數(shù)據(jù)積累褐墅,增加一些響應(yīng)延遲。
  • 類型:int洪己。
  • 默認(rèn)值:1妥凳。
fetch.max.bytes
  • 描述:服務(wù)器應(yīng)該為消費(fèi)者拉取請(qǐng)求返回的最大數(shù)據(jù)量。消息記錄是由消費(fèi)者分批拉取的答捕,如果拉取的第一個(gè)非空分區(qū)中的第一個(gè)消息記錄Batch字節(jié)數(shù)大于此值逝钥,那么仍將返回該消息記錄Batch,以確保消費(fèi)者能夠繼續(xù)執(zhí)行拱镐。因此艘款,這不是一個(gè)絕對(duì)的最大值。
  • 類型:int沃琅。
  • 默認(rèn)值:52428800哗咆。
max.partition.fetch.bytes
  • 描述:服務(wù)器將返回的每個(gè)分區(qū)的最大數(shù)據(jù)量。記錄由消費(fèi)者分批拉取益眉。如果拉取的第一個(gè)非空分區(qū)中的第一個(gè)消息記錄Batch字節(jié)數(shù)大于此值晌柬,那么仍將返回該消息記錄Batch姥份,以確保消費(fèi)者能夠繼續(xù)執(zhí)行。注意年碘,該配置需結(jié)合 fetch.max.bytes以及兩個(gè)主題配置message.max.bytes 和 max.message.bytes使用殿衰。
  • 類型:int。
  • 默認(rèn)值:1048576盛泡。
auto.offset.reset
  • 描述:位移重置方式闷祥。當(dāng)Kafka中沒有初始消費(fèi)位移或者服務(wù)器上的當(dāng)前消費(fèi)位移不再存在時(shí)(例如數(shù)據(jù)已經(jīng)被刪除)場(chǎng)景下使用,有下列幾種方式:
    (1)earliest: 自動(dòng)重置位移到最早位移位置
    (2)latest: 自動(dòng)重置位移為最后位移位置
    (3)none: 如果沒有找到消費(fèi)者之前位移傲诵,那么向消費(fèi)者直接拋出異常
    (4)其他: 向消費(fèi)者拋出異常凯砍。
  • 類型:string。
  • 默認(rèn)值:latest拴竹。
isolation.level
  • 描述:控制如何讀取以事務(wù)方式編寫的消息悟衩。如果設(shè)置為read_committed, consumer.poll()將只返回已提交的事務(wù)性消息栓拜。如果設(shè)置為read_uncommitted(默認(rèn)值)座泳,consumer.poll()將返回所有消息,甚至是已經(jīng)中止的事務(wù)性消息幕与。如果是非事務(wù)性消息挑势,在這兩種模式都會(huì)無條件返回。
  • 類型:string啦鸣。
  • 默認(rèn)值:read_uncommitted潮饱。
session.timeout.ms
  • 描述:在使用Kafka的組管理工具時(shí),用于檢測(cè)用戶故障的超時(shí)時(shí)間诫给。消費(fèi)者定期向broker發(fā)送心跳香拉,以表明其活動(dòng)狀態(tài)。如果在此會(huì)話超時(shí)過期之前broker沒有接收到心跳中狂,則broker將從消費(fèi)組中刪除此消費(fèi)者并啟動(dòng)重新平衡凫碌。注意,該值必須在broker配置中按group.min.session.timeout和group.max.session.timeout.ms配置的允許范圍內(nèi)胃榕。
  • 類型:int盛险。
  • 默認(rèn)值:10000。
heartbeat.interval.ms
  • 描述:在使用Kafka的組管理工具時(shí)勤晚,心跳到消費(fèi)者協(xié)調(diào)器之間的間隔時(shí)間枉层。心跳用于確保消費(fèi)者的會(huì)話保持活動(dòng)狀態(tài),并在新消費(fèi)者加入或離開消費(fèi)組時(shí)幫助重新平衡赐写。該值必須設(shè)置低于session.timeout.ms,通常設(shè)置不超過1/3膜赃。它可以調(diào)整更低挺邀,用來控制正常重新平衡的間隔時(shí)間。
  • 類型:int。
  • 默認(rèn)值:3000端铛。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載泣矛,如需轉(zhuǎn)載請(qǐng)通過簡(jiǎn)信或評(píng)論聯(lián)系作者。
  • 序言:七十年代末禾蚕,一起剝皮案震驚了整個(gè)濱河市您朽,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌换淆,老刑警劉巖哗总,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異倍试,居然都是意外死亡讯屈,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門县习,熙熙樓的掌柜王于貴愁眉苦臉地迎上來涮母,“玉大人,你說我怎么就攤上這事躁愿∨驯荆” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵彤钟,是天一觀的道長(zhǎng)炮赦。 經(jīng)常有香客問我,道長(zhǎng)样勃,這世上最難降的妖魔是什么吠勘? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮峡眶,結(jié)果婚禮上剧防,老公的妹妹穿的比我還像新娘。我一直安慰自己辫樱,他們只是感情好峭拘,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著狮暑,像睡著了一般鸡挠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上搬男,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天拣展,我揣著相機(jī)與錄音,去河邊找鬼缔逛。 笑死备埃,一個(gè)胖子當(dāng)著我的面吹牛姓惑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播按脚,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼于毙,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了辅搬?” 一聲冷哼從身側(cè)響起唯沮,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎堪遂,沒想到半個(gè)月后介蛉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蚤氏,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年甘耿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片竿滨。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡佳恬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出于游,到底是詐尸還是另有隱情毁葱,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布贰剥,位于F島的核電站倾剿,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蚌成。R本人自食惡果不足惜前痘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望担忧。 院中可真熱鬧芹缔,春花似錦、人聲如沸瓶盛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惩猫。三九已至芝硬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間轧房,已是汗流浹背拌阴。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留锯厢,地道東北人皮官。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓脯倒,卻偏偏與公主長(zhǎng)得像实辑,于是被迫代替她去往敵國(guó)和親捺氢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容