Kafka 概念及原理深度分析

中間件:Kafka
關(guān)鍵字:Kafka文件機(jī)制,Kafka分區(qū),Kafka數(shù)據(jù)可靠性劳坑,Kafka Ack等
注:本文是作者學(xué)習(xí)Kafka時(shí)得筆記和經(jīng)驗(yàn)總結(jié)

Kafka特性總覽:

  • 消息隊(duì)列的性能好壞言津,其文件存儲(chǔ)機(jī)制設(shè)計(jì)是衡量一個(gè)消息隊(duì)列服務(wù)技術(shù)水平和最關(guān)鍵指標(biāo)之一
  • Kafka的特性
    1. 高吞吐量进副、低延遲 :kafka每秒可以處理幾十萬條消息这揣,它的延遲最低只有幾毫秒悔常,每個(gè)topic可以分多個(gè)partition, consumer group 對(duì)partition進(jìn)行consume操作。
    2. 可擴(kuò)展性:kafka 分區(qū)擴(kuò)展给赞,consumer 擴(kuò)展机打。
    3. 持久性、可靠性:消息被持久化到本地磁盤片迅,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失残邀。
    4. 容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失敻躺摺)芥挣。
    5. 高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫。
  • Kafka得使用場(chǎng)景
    1. 日志收集:一個(gè)公司用Kafka可以收集各種服務(wù)的log耻台,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer空免,例如hadoop、Solr,ES等盆耽。
    2. 消息系統(tǒng):解耦生產(chǎn)者和消費(fèi)者蹋砚,降低系統(tǒng)壓力、緩存消息等摄杂。
    3. 流式處理:比如Spark streaming和Storm
  • Kafka得設(shè)計(jì)思想
    1. Kafka的集群拓?fù)浣Y(jié)構(gòu) :
      圖片摘自其他網(wǎng)站坝咐,若涉及到侵權(quán),請(qǐng)聯(lián)系作者及時(shí)刪除
    2. Kakfa Broker Leader的選舉
      ① ??Kakfa Broker集群受Zookeeper管理析恢。所有的Kafka Broker節(jié)點(diǎn)一起去Zookeeper上注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn)墨坚,因?yàn)橹挥幸粋€(gè)Kafka Broker會(huì)注冊(cè)成功,其他的都會(huì)失敗映挂,所以這個(gè)成功在Zookeeper上注冊(cè)臨時(shí)節(jié)點(diǎn)的這個(gè)Kafka Broker會(huì)成為Kafka Broker Controller框杜,其他的Kafka broker叫Kafka Broker follower。(這個(gè)過程叫Controller在ZooKeeper注冊(cè)Watch)
      ② ??這個(gè)Controller會(huì)監(jiān)聽其他的Kafka Broker的所有信息袖肥,如果這個(gè)kafka broker controller宕機(jī)了咪辱,在zookeeper上面的那個(gè)臨時(shí)節(jié)點(diǎn)就會(huì)消失,所有的broker又會(huì)重復(fù)去注冊(cè)臨時(shí)節(jié)點(diǎn)。
      ③ ??leader會(huì)追蹤和維護(hù)ISR中所有follower的滯后狀態(tài)椎组。如果滯后太多(數(shù)量滯后和時(shí)間滯后兩個(gè)維度油狂,replica.lag.time.max.ms和replica.lag.max.message可配置),leader會(huì)把該replica從ISR中移除 ( follower 在 replica.lag.time.max.ms 時(shí)間內(nèi)一直落后 leader replica.lag.max.messages 條消息的時(shí)候才會(huì)被踢出)寸癌。被移除ISR的replica一直在追趕leader专筷。leader寫入數(shù)據(jù)后并不會(huì)commit,只有ISR列表中的所有folower同步之后才會(huì)commit蒸苇,把滯后的follower移除ISR主要是避免寫消息延遲磷蛹。
      ④ ??設(shè)置ISR主要是為了broker宕掉之后,重新選舉partition的leader從ISR列表中選擇溪烤。
      圖片摘自CSDN味咳,若涉及到侵權(quán)庇勃,請(qǐng)聯(lián)系作者及時(shí)刪除

      ⑤ ???? 所以,如果一個(gè)broker掛了槽驶,controller會(huì)讀取該broker上所有得分區(qū)在zookeeper上狀態(tài)责嚷,并選取ISR列表中得一個(gè)replica作為leader(如果ISR列表中的replica全掛,選一個(gè)不在ISR列表中的replica作為leader; 如果該partition的所有的replica都宕機(jī)了掂铐,則將新的leader設(shè)置為-1罕拂,等待恢復(fù),等待ISR中的任一個(gè)Replica“活”過來全陨,并且選它作為L(zhǎng)eader爆班;或選擇第一個(gè)“活”過來的Replica(不一定是ISR中)作為L(zhǎng)eader)。另外辱姨,controller還會(huì)通知 zookeeper 這個(gè)broker 宕機(jī)了柿菩,zookeeper會(huì)通知其他得broker。
    3. ??Consumer Rebalance的觸發(fā)條件 :
      Consumer增加或刪除會(huì)觸發(fā) Consumer Group的Rebalance
      Broker的增加或者減少會(huì)觸發(fā) Consumer Rebalance(分區(qū)副本發(fā)生改變)
    4. ??Kafka工作流程-高級(jí)消費(fèi)者和低級(jí)消費(fèi)者
      ① 高級(jí)消費(fèi):
      自動(dòng)負(fù)載均衡炮叶,高階消費(fèi)者為了簡(jiǎn)化編程碗旅,封裝了一系列 API渡处,這套 API 會(huì)均勻地將分區(qū)分配給消費(fèi)者 線程镜悉,消費(fèi)者消費(fèi)哪個(gè)分區(qū)不由消費(fèi)者決定,而是由高階 API 決定医瘫,如果有消費(fèi)者線程掛 掉了侣肄,高階 API 會(huì)檢測(cè)到,進(jìn)而進(jìn)行重新分配醇份。高階消費(fèi)者 API 將大部分功能已經(jīng)實(shí)現(xiàn)稼锅, 因此,編程者編寫高階消費(fèi)者的難度也隨之降低僚纷,不需要關(guān)注分區(qū)的分配矩距,只需要關(guān)注業(yè)務(wù)邏輯就行
      自動(dòng)提交offset:自動(dòng)提交時(shí),假設(shè) 1s 提交一次 offset 的更新怖竭,設(shè)當(dāng)前 offset=10锥债,當(dāng)消費(fèi)者消費(fèi)了 0.5s 的數(shù)據(jù),offset 移動(dòng)了 15痊臭,由于提交間隔為 1s哮肚,因此這一 offset 的更新并不會(huì)被提交,這時(shí)候我們寫的消費(fèi)者掛掉广匙,重啟后允趟,消費(fèi)者會(huì)去 ZooKeeper 上獲取讀取位置,獲取到的 offset 仍為 10鸦致,它就會(huì)重復(fù)消費(fèi)潮剪,這就是一個(gè)典型的重復(fù)消費(fèi)問題涣楷。
      低級(jí)消費(fèi):對(duì)于低階消費(fèi)者就不再有分區(qū)到消費(fèi)者之間的 API 中間層了,由消費(fèi)者直接找到分區(qū)進(jìn)行消費(fèi)鲁纠,即消費(fèi)者通過 ZooKeeper 找到指定分區(qū)的 Leader 在哪個(gè) broker 上总棵。首先,在 ZooKeeper 中能夠找到 Kafka 所有 topic 的分區(qū)列表改含,并且可以找到指定分區(qū)的 Leader 在哪個(gè) broker 上情龄。
    5. ??Topic & Partition
      ① Topic相當(dāng)于傳統(tǒng)消息系統(tǒng)MQ中的一個(gè)隊(duì)列queue,producer端發(fā)送的message必須指定是發(fā)送到哪個(gè)topic捍壤,但是不需要指定topic下的哪個(gè)partition骤视,因?yàn)閗afka會(huì)把收到的message進(jìn)行l(wèi)oad balance,均勻的分布在這個(gè)topic下的不同的partition上( hash(message) % [broker數(shù)量] )鹃觉。
      物理存儲(chǔ)上专酗,這個(gè)topic會(huì)分成一個(gè)或多個(gè)partition,每個(gè)partiton相當(dāng)于是一個(gè)子queue盗扇。在物理結(jié)構(gòu)上祷肯,每個(gè)partition對(duì)應(yīng)一個(gè)物理的目錄(文件夾),文件夾命名是[topicname][partition][序號(hào)]疗隶,一個(gè)topic可以有無數(shù)多的partition佑笋,根據(jù)業(yè)務(wù)需求和數(shù)據(jù)量來設(shè)置
      ③ 在kafka配置文件中可隨時(shí)更改num.partitions參數(shù)來配置更改topic的partition數(shù)量斑鼻,也可以通過命令修改分區(qū)數(shù)蒋纬。
      當(dāng)add a new partition的時(shí)候,partition里面的message不會(huì)重新進(jìn)行分配坚弱,原來的partition里面的message數(shù)據(jù)不會(huì)變,新加的這個(gè)partition剛開始是空的蜀备,隨后進(jìn)入這個(gè)topic的message就會(huì)重新參與所有partition的load balance。
    6. ??Partition Replica :
      每個(gè)partition可以在其他的kafka broker節(jié)點(diǎn)上存副本荒叶,以便某個(gè)kafka broker節(jié)點(diǎn)宕機(jī)不會(huì)影響這個(gè)kafka集群碾阁。存replica副本的方式是按照kafka broker的順序存
      replica副本數(shù)目不能大于kafka broker節(jié)點(diǎn)的數(shù)目些楣,否則報(bào)錯(cuò)脂凶。這里的replica數(shù)其實(shí)就是partition的副本總數(shù),其中包括一個(gè)leader戈毒,其他的就是copy副本
    7. ??Topic分配partition和partition replica的算法 :
      將Broker(size=n)和待分配的Partition排序艰猬。
      將第 i 個(gè)Partition分配到第(i%n)個(gè)Broker上
      將第 i 個(gè)Partition的第 j 個(gè)Replica分配到第((i + j) % n)個(gè)Broker上埋市。
  • ??消息投遞可靠性
    1. Partition ack :producer 生產(chǎn)消息被認(rèn)為完成時(shí)得確認(rèn)值:
      ① 0:不等待broker得確認(rèn)信息冠桃,延遲最小(消費(fèi)可靠性不能保證)
      ② 1:leader收到了確認(rèn)信息
      ③ -1:ISR列表中得所有replica都返回確認(rèn)消息
  • Kafka高吞吐量 :
    1. Kafka的高吞吐量體現(xiàn)在讀寫上道宅,分布式并發(fā)的讀和寫都非呈程快胸蛛,寫的性能體現(xiàn)在以O(shè)(1)的時(shí)間復(fù)雜度進(jìn)行順序?qū)懭?/strong>。讀的性能體現(xiàn)在以O(shè)(1)的時(shí)間復(fù)雜度進(jìn)行順序讀取樱报,對(duì)topic進(jìn)行partition分區(qū)葬项,consume group中的consume線程可以以很高能性能進(jìn)行順序讀。
  • 批量發(fā)送
    1. Kafka支持以消息集合為單位進(jìn)行批量發(fā)送迹蛤,以提高push效率民珍。
    2. push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息盗飒,consumer只管從broker pull消息嚷量,兩者對(duì)消息的生產(chǎn)和消費(fèi)是異步的。
    3. Kafka集群中broker之間的關(guān)系 : 不是主從關(guān)系逆趣,各個(gè)broker在集群中地位一樣蝶溶,我們可以隨意的增加或刪除任何一個(gè)broker節(jié)點(diǎn)。
    4. Producer采用異步push方式宣渗,極大提高Kafka系統(tǒng)的吞吐率(可以通過參數(shù)控制是采用同步還是異步方式)蕉朵。
  • 順序保證性 :
    1. Kafka只能保證同一分區(qū)得消息順序性捣郊,并且需要消費(fèi)端單線程消費(fèi)究反。正常情況下舟奠,消費(fèi)端會(huì)多線程處理業(yè)務(wù)邏輯來提供系統(tǒng)性能,若是多線程消費(fèi)還想保證消息得順序性咐蝇,可以使用spring-kafka來實(shí)現(xiàn)涯鲁,讀者具體可自行查閱 spring-kafka得用法巷查。

kafka文件機(jī)制:

  • Partition Owner registry :
    1. 用來標(biāo)記partition正在被哪個(gè)consumer消費(fèi).臨時(shí)znode有序。此節(jié)點(diǎn)表達(dá)了"一個(gè)partition"只能被group下一個(gè)consumer消費(fèi),同時(shí)當(dāng)group下某個(gè)consumer失效,那么將會(huì)觸發(fā)負(fù)載均衡(即:讓partitions在多個(gè)consumer間均衡消費(fèi),接管那些"游離"的partitions)
  • ??持久化
    1. kafka使用文件存儲(chǔ)消息(append only log),這就直接決定kafka在性能上嚴(yán)重依賴文件系統(tǒng)的本身特性.且無論任何OS下,對(duì)文件系統(tǒng)本身的優(yōu)化是非常艱難的.文件緩存/直接內(nèi)存映射等是常用的手段.
    2. 因?yàn)閗afka是對(duì)日志文件進(jìn)行append操作,因此磁盤檢索的開支是較小的;同時(shí)為了減少磁盤寫入的次數(shù),broker會(huì)將消息暫時(shí)buffer起來,當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤,這樣減少了磁盤IO調(diào)用的次數(shù).
    3. 對(duì)于kafka而言,較高性能的磁盤,將會(huì)帶來更加直接的性能提升岛请。
  • 性能
    1. 除磁盤IO之外,我們還需要考慮網(wǎng)絡(luò)IO,這直接關(guān)系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對(duì)于producer端,可以將消息buffer起來,當(dāng)消息的條數(shù)達(dá)到一定閥值時(shí),批量發(fā)送給broker;對(duì)于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定. 對(duì)于kafka broker端,sendfile系統(tǒng)調(diào)用可以潛在的提升網(wǎng)絡(luò)IO的性能 : 將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應(yīng)的內(nèi)存區(qū)域即可,而無需進(jìn)程再次copy和交換( 這里涉及到"磁盤IO數(shù)據(jù)"/"內(nèi)核內(nèi)存"/"進(jìn)程內(nèi)存"/"網(wǎng)絡(luò)緩沖區(qū)",多者之間的數(shù)據(jù)copy)旭寿。
    2. 其實(shí)對(duì)于producer/consumer/broker三者而言,CPU的開支應(yīng)該都不大, 因此啟用消息壓縮機(jī)制是一個(gè)良好的策略;壓縮需要消耗少量的CPU資源,不過對(duì)于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮.可以將任何在網(wǎng)絡(luò)上傳輸?shù)南⒍冀?jīng)過壓縮. kafka支持gzip/snappy 等多種壓縮方式.
  • ??負(fù)載均衡 :
    1. kafka集群中的任何一個(gè)broker,都可以向producer提供metadata信息, 這些metadata中包含"集群中存活的servers列表"、"partitions leader列表" 等信息(請(qǐng)參看zookeeper中的節(jié)點(diǎn)信息)崇败。
    2. 當(dāng)producer獲取到metadata信息之后, producer將會(huì)和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發(fā)送到broker,中間不會(huì)經(jīng)過任何"路由層"盅称。
    3. 異步發(fā)送,將多條消息暫且在客戶端buffer起來,并將他們批量發(fā)送到broker;小數(shù)據(jù)IO太多,會(huì)拖慢整體的網(wǎng)絡(luò)延遲,批量延遲發(fā)送事實(shí)上提升了網(wǎng)絡(luò)效率;不過這也有一定的隱患,比如當(dāng)producer失效時(shí),那些尚未發(fā)送的消息將會(huì)丟失后室。
  • Topic模型
    1. 在kafka中,partition中的消息只有一個(gè)consumer在消費(fèi),且不存在消息狀態(tài)的控制,也沒有復(fù)雜的消息確認(rèn)機(jī)制,可見kafka broker端是相當(dāng)輕量級(jí)的.當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper提交offset.由此可見,consumer客戶端也很輕量級(jí)缩膝。松弛得控制就會(huì)引起重復(fù)消費(fèi)與消息丟失等問題。
    2. kafka中consumer負(fù)責(zé)維護(hù)消息的消費(fèi)記錄,而broker則不關(guān)心這些,這種設(shè)計(jì)不僅提高了consumer端的靈活性,也適度的減輕了broker端設(shè)計(jì)的復(fù)雜度;這是和眾多JMS prodiver的區(qū)別.
  • ??log
    1. 每個(gè)log entry格式為"4個(gè)字節(jié)的數(shù)字N表示消息的長(zhǎng)度" + "N個(gè)字節(jié)的消息內(nèi)容"; 每個(gè)日志都有一個(gè)offset來唯一的標(biāo)記一條消息, offset的值為8個(gè)字節(jié)的數(shù)字,表示此消息在此partition中所處的起始位置.
    2. 每個(gè)partition在物理存儲(chǔ)層面由多個(gè)log file組成(稱為segment) .segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
    3. 獲取消息時(shí),需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長(zhǎng)度(間接的表示消息的條數(shù)). 根據(jù)offset,可以找到此消息所在segment文件,然后根據(jù)segment的最小offset取差值,得到它在file中的相對(duì)位置,直接讀取輸出即可.
  • Partition Owner registry :
    1. 用來標(biāo)記partition正在被哪個(gè)consumer消費(fèi).臨時(shí)znode岸霹。此節(jié)點(diǎn)表達(dá)了"一個(gè)partition"只能被group下一個(gè)consumer消費(fèi),同時(shí)當(dāng)group下某個(gè)consumer失效,那么將會(huì)觸發(fā)負(fù)載均衡(即:讓partitions在多個(gè)consumer間均衡消費(fèi),接管那些"游離"的partitions)疾层。例如:當(dāng)consumer啟動(dòng)時(shí),所觸發(fā)的操作 :

A) 首先進(jìn)行"Consumer id Registry";
B) 然后在"Consumer id Registry"節(jié)點(diǎn)下注冊(cè)一個(gè)watch用來監(jiān)聽當(dāng)前group中其他consumer的"leave"和"join";只要此znode path下節(jié)點(diǎn)列表變更,都會(huì)觸發(fā)此group下consumer的負(fù)載均衡.(比如一個(gè)consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"節(jié)點(diǎn)下,注冊(cè)一個(gè)watch用來監(jiān)聽broker的存活情況;如果broker列表變更,將會(huì)觸發(fā)所有的groups下的consumer重新balance.

??總結(jié)?? :
Producer端使用zookeeper用來"發(fā)現(xiàn)"broker列表,以及和Topic下每個(gè)partition leader建立socket連接并發(fā)送消息.
Broker端使用zookeeper用來注冊(cè)broker信息,以及監(jiān)測(cè)partition leader存活性.
Consumer端使用zookeeper用來注冊(cè)consumer信息,其中包括consumer消費(fèi)的partition列表等,同時(shí)也用來發(fā)現(xiàn)broker列表,并和partition leader建立socket連接,并獲取消息。

  • Leader的選擇 :
    1. 必須保證贡避,一旦一個(gè)消息被提交了痛黎,但是leader down掉了予弧,新選出的leader必須可以提供這條消息。大部分的分布式系統(tǒng)采用了多數(shù)投票法則選擇新的leader,對(duì)于多數(shù)投票法則湖饱,就是根據(jù)所有副本節(jié)點(diǎn)的狀況動(dòng)態(tài)的選擇最適合的作為leader.Kafka并不是使用這種方法掖蛤。
    2. ??Kafka動(dòng)態(tài)維護(hù)了一個(gè)同步狀態(tài)的副本的集合(a set of in-sync replicas),簡(jiǎn)稱ISR井厌,在這個(gè)集合中的節(jié)點(diǎn)都是和leader保持高度一致的蚓庭,任何一條消息必須被這個(gè)集合中的每個(gè)節(jié)點(diǎn)讀取并追加到日志中了,才會(huì)通知外部這個(gè)消息已經(jīng)被提交了仅仆。因此這個(gè)集合中的任何一個(gè)節(jié)點(diǎn)隨時(shí)都可以被選為leader.ISR在ZooKeeper中維護(hù)彪置。ISR中有f+1個(gè)節(jié)點(diǎn),就可以允許在f個(gè)節(jié)點(diǎn)down掉的情況下不會(huì)丟失消息并正常提供服蝇恶。ISR的成員是動(dòng)態(tài)的拳魁,如果一個(gè)節(jié)點(diǎn)被淘汰了,當(dāng)它重新達(dá)到“同步中”的狀態(tài)時(shí)撮弧,他可以重新加入ISR.這種leader的選擇方式是非撑税茫快速的,適合kafka的應(yīng)用場(chǎng)景贿衍。
  • 副本管理
    1. ??Kafka盡量的使所有分區(qū)均勻的分布到集群所有的節(jié)點(diǎn)上而不是集中在某些節(jié)點(diǎn)上授舟,另外主從關(guān)系也盡量均衡,這樣每個(gè)節(jié)點(diǎn)都會(huì)擔(dān)任一定比例的分區(qū)的leader.
    2. 優(yōu)化leader的選擇過程也是很重要的贸辈,它決定了系統(tǒng)發(fā)生故障時(shí)的空窗期有多久释树。Kafka選擇一個(gè)節(jié)點(diǎn)作為“controller”,當(dāng)發(fā)現(xiàn)有節(jié)點(diǎn)down掉的時(shí)候它負(fù)責(zé)在有用分區(qū)的所有節(jié)點(diǎn)中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區(qū)節(jié)點(diǎn)的主從關(guān)系。如果controller down掉了擎淤,活著的節(jié)點(diǎn)中的一個(gè)會(huì)備切換為新的controller.
  • Leader與副本同步
    1. ??對(duì)于某個(gè)分區(qū)來說奢啥,保存正分區(qū)的"broker"為該分區(qū)的"leader",保存?zhèn)浞莘謪^(qū)的"broker"為該分區(qū)的"follower"嘴拢。備份分區(qū)會(huì)完全復(fù)制正分區(qū)的消息桩盲,包括消息的編號(hào)等附加屬性值。為了保持正分區(qū)和備份分區(qū)的內(nèi)容一致席吴,Kafka采取的方案是在保存?zhèn)浞莘謪^(qū)的"broker"上開啟一個(gè)消費(fèi)者進(jìn)程進(jìn)行消費(fèi)赌结,從而使得正分區(qū)的內(nèi)容與備份分區(qū)的內(nèi)容保持一致。一般情況下孝冒,一個(gè)分區(qū)有一個(gè)“正分區(qū)”和零到多個(gè)“備份分區(qū)”柬姚。可以配置“正分區(qū)+備份分區(qū)”的總數(shù)量庄涡,關(guān)于這個(gè)配置量承,不同主題可以有不同的配置值。注意,生產(chǎn)者宴合,消費(fèi)者只與保存正分區(qū)的"leader"進(jìn)行通信焕梅。
    2. Kafka允許topic的分區(qū)擁有若干副本,這個(gè)數(shù)量是可以配置的卦洽,你可以為每個(gè)topic配置副本的數(shù)量贞言。Kafka會(huì)自動(dòng)在每個(gè)副本上備份數(shù)據(jù),所以當(dāng)一個(gè)節(jié)點(diǎn)down掉時(shí)數(shù)據(jù)依然是可用的阀蒂。
    3. ??創(chuàng)建副本的單位是topic的分區(qū)该窗,每個(gè)分區(qū)都有一個(gè)leader和零或多個(gè)followers.所有的讀寫操作都由leader處理,一般分區(qū)的數(shù)量都比broker的數(shù)量多的多蚤霞,各分區(qū)的leader均勻的分布在brokers中酗失。所有的followers都復(fù)制leader的日志,日志中的消息和順序都和leader中的一致昧绣。followers向普通的consumer那樣從leader那里拉取消息并保存在自己的日志文件中规肴。
    4. ??Kafka判斷一個(gè)節(jié)點(diǎn)是否活著有兩個(gè)條件 :
      ① 節(jié)點(diǎn)必須可以維護(hù)和ZooKeeper的連接,Zookeeper通過心跳機(jī)制檢查每個(gè)節(jié)點(diǎn)的連接夜畴。
      ② 如果節(jié)點(diǎn)是個(gè)follower,他必須能及時(shí)的同步leader的寫操作拖刃,延時(shí)不能太久。
    5. ??只有當(dāng)消息被所有的副本加入到日志中時(shí)贪绘,才算是“committed”兑牡,只有committed的消息才會(huì)發(fā)送給consumer,這樣就不用擔(dān)心一旦leader down掉了消息會(huì)丟失税灌。Producer也可以選擇是否等待消息被提交的通知均函,這個(gè)是由參數(shù)acks決定的
  • ????partiton中文件存儲(chǔ)方式 :
    1. 每個(gè)partion(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等segment(段)數(shù)據(jù)文件中菱涤。但每個(gè)段segment file消息數(shù)量不一定相等苞也,這種特性方便old segment file快速被刪除
    2. 每個(gè)partiton只需要支持順序讀寫就行了狸窘,segment文件生命周期由服務(wù)端配置參數(shù)決定墩朦。
    3. 這樣做的好處就是能快速刪除無用文件坯认,有效提高磁盤利用率翻擒。
  • ????partiton中segment文件存儲(chǔ)結(jié)構(gòu) :
    1. producer發(fā)message到某個(gè)topic,message會(huì)被均勻的分布到多個(gè)partition上(隨機(jī)或根據(jù)用戶指定的回調(diào)函數(shù)進(jìn)行分布)牛哺,kafka broker收到message往對(duì)應(yīng)partition leader的最后一個(gè)segment上添加該消息陋气,當(dāng)某個(gè)segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過閾值時(shí),segment上的消息會(huì)被flush到磁盤引润,只有flush到磁盤上的消息consumer才能消費(fèi)巩趁,segment達(dá)到一定的大小后將不會(huì)再往該segment寫數(shù)據(jù),broker會(huì)創(chuàng)建新的segment
    2. 每個(gè)part在內(nèi)存中對(duì)應(yīng)一個(gè)index议慰,記錄每個(gè)segment中的第一條消息偏移蠢古。
    3. segment file組成由2大部分組成,分別為index file和data file别凹,此2個(gè)文件一 一對(duì)應(yīng)草讶,成對(duì)出現(xiàn),后綴".index"和“.log”分別表示為segment索引文件炉菲、數(shù)據(jù)文件.
    4. segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開始堕战,后續(xù)每個(gè)segment文件名為上一個(gè)全局partion的最大offset(偏移message數(shù))。數(shù)值最大為64位long大小拍霜,19位數(shù)字字符長(zhǎng)度嘱丢,沒有數(shù)字用0填充
    5. 每個(gè)segment中存儲(chǔ)很多條消息祠饺,消息id由其邏輯位置決定越驻,即從消息id可直接定位到消息的存儲(chǔ)位置,避免id到位置的額外映射道偷。
    6. 下圖所示segment文件列表形象說明了上述2個(gè)規(guī)則
      Kafka文件格式
      索引文件存儲(chǔ)大量元數(shù)據(jù)伐谈,數(shù)據(jù)文件存儲(chǔ)大量消息,索引文件中元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址试疙。其中以索引文件中 元數(shù)據(jù)3,497為例诵棵,依次在數(shù)據(jù)文件中表示第3個(gè)message(在全局partiton表示第368772個(gè)message)、以及該消息的物理偏移地址為497祝旷。
      message

      關(guān)鍵字解釋說明
      8 byte offset在parition(分區(qū))內(nèi)的每條消息都有一個(gè)有序的id號(hào)履澳,這個(gè)id號(hào)被稱為偏移(offset),它可以唯一確定每條消息在parition(分區(qū))內(nèi)的位置。即offset表示partiion的第多少message
      ② 4 byte message sizemessage大小
      ③ 4 byte CRC32用crc32校驗(yàn)message
      ④ 1 byte “magic"表示本次發(fā)布Kafka服務(wù)程序協(xié)議版本號(hào)
      ⑤ 1 byte “attributes"表示為獨(dú)立版本怀跛、或標(biāo)識(shí)壓縮類型距贷、或編碼類型。
      ⑥ 4 byte key length表示key的長(zhǎng)度,當(dāng)key為-1時(shí)吻谋,K byte key字段不填
      ⑦ K byte key可選
      ⑧ value bytes payload表示實(shí)際消息數(shù)據(jù)忠蝗。
    7. ??segment index file采取稀疏索引存儲(chǔ)方式,它減少索引文件大小漓拾,通過mmap可以直接內(nèi)存操作阁最,稀疏索引為數(shù)據(jù)文件的每個(gè)對(duì)應(yīng)message設(shè)置一個(gè)元數(shù)據(jù)指針,它 比稠密索引節(jié)省了更多的存儲(chǔ)空間,但查找起來需要消耗更多的時(shí)間骇两。
      8.??kafka會(huì)記錄offset到zk中速种。但是,zk client api對(duì)zk的頻繁寫入是一個(gè)低效的操作低千。0.8.2 kafka引入了native offset storage配阵,將offset管理從zk移出,并且可以做到水平擴(kuò)展其原理就是利用了kafka的compacted topic棋傍,offset以consumer group,topic與partion的組合作為key直接提交到compacted topic中救拉。同時(shí)Kafka又在內(nèi)存中維護(hù)了三元組來保存最新的offset信息,consumer來取最新offset信息的時(shí)候直接內(nèi)存里拿即可瘫拣。當(dāng)然近上,kafka允許你快速的checkpoint最新的offset信息到磁盤上
  • Partition Replication原則 :
    1. Kafka高效文件存儲(chǔ)設(shè)計(jì)特點(diǎn) :
      ① Kafka把topic中一個(gè)parition大文件分成多個(gè)小文件段拂铡,通過多個(gè)小文件段壹无,就容易定期清除或刪除已經(jīng)消費(fèi)完文件,減少磁盤占用感帅。
      通過索引信息可以快速定位message和確定response的最大大小斗锭。
      通過index元數(shù)據(jù)全部映射到memory,可以避免segment file的IO磁盤操作失球。
      通過索引文件稀疏存儲(chǔ)岖是,可以大幅降低index文件元數(shù)據(jù)占用空間大小
  • ??高效的數(shù)據(jù)傳輸 :
    1. 發(fā)布者每次可發(fā)布多條消息(將消息加到一個(gè)消息集合中發(fā)布)实苞,consumer每次迭代消費(fèi)一條消息豺撑。
    2. 不創(chuàng)建單獨(dú)的cache,使用系統(tǒng)的page cache黔牵。發(fā)布者順序發(fā)布聪轿,訂閱者通常比發(fā)布者滯后一點(diǎn)點(diǎn),直接使用Linux的page cache效果比較好猾浦,同時(shí)減少了cache管理及垃圾收集的開銷陆错。
    3. page cache中的每個(gè)文件都是一棵基數(shù)樹(radix tree,本質(zhì)上是多叉搜索樹)金赦,樹的每個(gè)節(jié)點(diǎn)都是一個(gè)頁(yè)音瓷。根據(jù)文件內(nèi)的偏移量就可以快速定位到所在的頁(yè)
    4. Kafka為什么不自己管理緩存夹抗,而非要用page cache绳慎?原因有如下三點(diǎn)
      JVM中一切皆對(duì)象,數(shù)據(jù)的對(duì)象存儲(chǔ)會(huì)帶來所謂object overhead漠烧,浪費(fèi)空間杏愤;
      如果由JVM來管理緩存,會(huì)受到GC的影響沽甥,并且過大的堆也會(huì)拖累GC的效率声邦,降低吞吐量;
      一旦程序崩潰摆舟,自己管理的緩存數(shù)據(jù)會(huì)全部丟失
      page cache
    5. producer生產(chǎn)消息時(shí),會(huì)使用pwrite()系統(tǒng)調(diào)用對(duì)應(yīng)到Java NIO中是FileChannel.write() API按偏移量寫入數(shù)據(jù)恨诱,并且都會(huì)先寫入page cache里媳瞪。
    6. consumer消費(fèi)消息時(shí),會(huì)使用sendfile()系統(tǒng)調(diào)用【對(duì)應(yīng)FileChannel.transferTo() API】照宝,零拷貝地將數(shù)據(jù)從page cache傳輸?shù)絙roker的Socket buffer蛇受,再通過網(wǎng)絡(luò)傳輸
    7. 同時(shí)厕鹃,page cache中的數(shù)據(jù)會(huì)隨著內(nèi)核中flusher線程的調(diào)度以及對(duì)sync()/fsync()的調(diào)用寫回到磁盤兢仰,就算進(jìn)程崩潰,也不用擔(dān)心數(shù)據(jù)丟失剂碴。

kafka分區(qū)的原因:

  • 指明partition的情況下把将,直接將指明的值直接作為partition的值
  • 沒有指明partition值但有key的值忆矛,將key的hash值與toptic的partition數(shù)進(jìn)行取余得到partition值察蹲。
  • 既沒有partition值又沒有key值的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增)催训,將這個(gè)值與 topic 可用的 partition 總數(shù)取余得到 partition 值洽议,也就是常說的 round-robin 算法

kafka高吞吐量的原因:

  • kafka的消息是不斷追加到文件中的漫拭,這個(gè)特性使kafka可以充分利用磁盤的順序讀寫性能亚兄,順序讀寫不需要硬盤磁頭的尋道時(shí)間,只需很少的扇區(qū)旋轉(zhuǎn)時(shí)間采驻,所以速度遠(yuǎn)快于隨機(jī)讀寫儿捧。生產(chǎn)者負(fù)責(zé)寫入數(shù)據(jù),Kafka會(huì)將消息持久化到磁盤挑宠,保證不會(huì)丟失數(shù)據(jù)菲盾,Kafka采用了兩個(gè)技術(shù)提高寫入的速度
    1. 順序?qū)懭?/strong>:硬盤是機(jī)械結(jié)構(gòu),需要指針尋址找到存儲(chǔ)數(shù)據(jù)的位置各淀,所以懒鉴,如果是隨機(jī)IO,磁盤會(huì)進(jìn)行頻繁的尋址碎浇,導(dǎo)致寫入速度下降临谱。Kafka使用了順序IO提高了磁盤的寫入速度,Kafka會(huì)將數(shù)據(jù)順序插入到文件末尾奴璃,消費(fèi)者端通過控制偏移量來讀取消息悉默,這樣做會(huì)導(dǎo)致數(shù)據(jù)無法刪除,時(shí)間一長(zhǎng)苟穆,磁盤空間會(huì)滿抄课,kafka提供了2種策略來刪除數(shù)據(jù):基于時(shí)間刪除和基于partition文件的大小刪除唱星。
    2. Memory Mapped Files:這個(gè)和Java NIO中的內(nèi)存映射基本相同,跟磨,mmf直接利用操作系統(tǒng)的Page來實(shí)現(xiàn)文件到物理內(nèi)存的映射间聊,完成之后對(duì)物理內(nèi)存的操作會(huì)直接同步到硬盤。mmf通過內(nèi)存映射的方式大大提高了IO速率抵拘,省去了用戶空間到內(nèi)核空間的復(fù)制哎榴。它的缺點(diǎn)顯而易見--不可靠,當(dāng)發(fā)生宕機(jī)而數(shù)據(jù)未同步到硬盤時(shí)僵蛛,數(shù)據(jù)會(huì)丟失尚蝌,Kafka提供了produce.type參數(shù)來控制是否主動(dòng)的進(jìn)行刷新,如果kafka寫入到mmp后立即flush再返回給生產(chǎn)者則為同步模式充尉,反之為異步模式飘言。
  • 零拷貝平時(shí)從服務(wù)器讀取靜態(tài)文件時(shí),服務(wù)器先將文件從磁盤復(fù)制到內(nèi)核空間喉酌,再?gòu)?fù)制到用戶空間热凹,最后再?gòu)?fù)制到內(nèi)核空間并通過網(wǎng)卡發(fā)送出去,而零拷貝則是直接從內(nèi)核到內(nèi)核再到網(wǎng)卡泪电,省去了用戶空間的復(fù)制般妙。
    1. Kafka把所有的消息存放到一個(gè)文件中,當(dāng)消費(fèi)者需要數(shù)據(jù)的時(shí)候直接將文件發(fā)送給消費(fèi)者相速,比如10W的消息共10M碟渺,全部發(fā)送給消費(fèi)者,10M的消息在內(nèi)網(wǎng)中傳輸是非惩晃埽快的苫拍,假如需要1s,那么kafka的tps就是10w旺隙。Zero copy對(duì)應(yīng)的是Linux中sendfile函數(shù)绒极,這個(gè)函數(shù)會(huì)接受一個(gè)offsize來確定從哪里開始讀取現(xiàn)實(shí)中蔬捷,不可能將整個(gè)文件全部發(fā)給消費(fèi)者垄提,他通過消費(fèi)者傳遞過來的偏移量來使用零拷貝讀取指定內(nèi)容的數(shù)據(jù)返回給消費(fèi)者

kafka消息丟失的原因:

  • 由于Producer端設(shè)置消息發(fā)送ack=0周拐,表示發(fā)送完成即可铡俐,不會(huì)關(guān)心broker的確認(rèn)回復(fù)。
  • 高階消費(fèi)的時(shí)候妥粟,由于消息消費(fèi)是自動(dòng)提交的方法审丘,由于自動(dòng)提交時(shí)間默認(rèn)是1秒,消費(fèi)端拉取的消息還沒有處理完勾给,但是消費(fèi)進(jìn)度offset已經(jīng)更新了滩报,此時(shí)如果消費(fèi)端掛了锅知,可能存在1秒內(nèi)的數(shù)據(jù)丟失
  • ack=all的時(shí)候并不能保證數(shù)據(jù)不會(huì)丟失露泊,acks=all必須跟ISR列表里至少有2個(gè)以上的副本配合使用喉镰,起碼是有一個(gè)Leader和一個(gè)Follower才可以旅择。

kafka消息重復(fù)消費(fèi)的原因:

  • 由于使用高階消費(fèi)惭笑,自動(dòng)提交過程,拉取的消息1秒內(nèi)已經(jīng)處理完生真,但是還沒有觸發(fā)提交的時(shí)間沉噩,此時(shí)消費(fèi)進(jìn)度offset沒有更新,下次拉取時(shí)就會(huì)存在重復(fù)消費(fèi)的請(qǐng)客柱蟀。

??kafka故障處理機(jī)制:

  • Replica 消息圖示:
  • 圖片摘自CSDN川蒙,若涉及侵權(quán),請(qǐng)聯(lián)系作者及時(shí)刪除
  • follower故障 : follower發(fā)生故障后會(huì)被臨時(shí)踢出ISR长已,待該follower恢復(fù)后畜眨,follower會(huì)讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉术瓮,從HW開始向leader進(jìn)行同步康聂。等該follower的LEO大于等于該P(yáng)artition的HW,即follower追上leader之后胞四,就可以重新加入ISR了恬汁。
  • leader故障 : leader發(fā)生故障之后,會(huì)從ISR中選出一個(gè)新的leader辜伟,之后氓侧,為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的follower會(huì)先將各自的log文件高于HW的部分截掉导狡,然后從新的leader同步數(shù)據(jù)约巷。
  • 注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)旱捧。

kafka broker 宕機(jī) :

  • 只需要新啟動(dòng)一個(gè)broker, 把broker.id設(shè)置為 損壞的那個(gè)broker的id, 就會(huì)自動(dòng)復(fù)制過去丟失的數(shù)據(jù)独郎。

kafka常用命令:

查詢 :

## 查詢集群描述
bin/kafka-topics.sh --describe --zookeeper 

## topic列表查詢
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181 

## topic列表查詢(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

## 新消費(fèi)者列表查詢(支持0.9版本+)
bin/kafka-consumer-groups.sh --list  --new-consumer --bootstrap-server localhost:9092 

## 新消費(fèi)者列表查詢(支持0.10版本+)
bin/kafka-consumer-groups.sh --list  --bootstrap-server localhost:9092 

## 顯示某個(gè)消費(fèi)組的消費(fèi)詳情(僅支持offset存儲(chǔ)在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

## 顯示某個(gè)消費(fèi)組的消費(fèi)詳情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

## 顯示某個(gè)消費(fèi)組的消費(fèi)詳情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

發(fā)送和消費(fèi)

## 生產(chǎn)者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

## 消費(fèi)者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

## 新生產(chǎn)者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

## 新消費(fèi)者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

## 高級(jí)點(diǎn)的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10

平衡Leader:

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

kafka自帶壓測(cè)工具:

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092

分區(qū)擴(kuò)容 :

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

? 本文部分內(nèi)容參見 美團(tuán)技術(shù)團(tuán)隊(duì)

  1. ? 文章要是勘誤或者知識(shí)點(diǎn)說的不正確,歡迎評(píng)論廊佩,畢竟這也是作者通過閱讀源碼獲得的知識(shí)囚聚,難免會(huì)有疏忽!
  2. ? 要是感覺文章對(duì)你有所幫助标锄,不妨點(diǎn)個(gè)關(guān)注顽铸,或者移駕看一下作者的其他文集,也都是干活多多哦料皇,文章也在全力更新中谓松。
  3. ? 著作權(quán)歸作者所有星压。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處鬼譬!
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末娜膘,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子优质,更是在濱河造成了極大的恐慌竣贪,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件巩螃,死亡現(xiàn)場(chǎng)離奇詭異演怎,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)避乏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門爷耀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人拍皮,你說我怎么就攤上這事歹叮。” “怎么了铆帽?”我有些...
    開封第一講書人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵咆耿,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我锄贼,道長(zhǎng)票灰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任宅荤,我火速辦了婚禮屑迂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘冯键。我一直安慰自己惹盼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開白布惫确。 她就那樣靜靜地躺著手报,像睡著了一般。 火紅的嫁衣襯著肌膚如雪改化。 梳的紋絲不亂的頭發(fā)上掩蛤,一...
    開封第一講書人閱讀 49,036評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音陈肛,去河邊找鬼揍鸟。 笑死,一個(gè)胖子當(dāng)著我的面吹牛句旱,可吹牛的內(nèi)容都是我干的阳藻。 我是一名探鬼主播晰奖,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼腥泥!你這毒婦竟也來了匾南?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤蛔外,失蹤者是張志新(化名)和其女友劉穎蛆楞,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冒萄,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡臊岸,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年橙数,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了尊流。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡灯帮,死狀恐怖崖技,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情钟哥,我是刑警寧澤迎献,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站腻贰,受9級(jí)特大地震影響吁恍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜播演,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一冀瓦、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧写烤,春花似錦翼闽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至暂衡,卻和暖如春询微,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背狂巢。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工撑毛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人隧膘。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓代态,卻偏偏與公主長(zhǎng)得像寺惫,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蹦疑,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345