Kafka源碼解析與實(shí)戰(zhàn)

Kafka的架構(gòu)

包括Kafka的基本組成朝抖,Kafka的拓?fù)浣Y(jié)構(gòu)以及Kafka的內(nèi)部通信協(xié)議糊闽。Kafka內(nèi)部的通信協(xié)議是建立在Kafka的拓?fù)浣Y(jié)構(gòu)之上梳玫,而Kafka的拓?fù)浣Y(jié)構(gòu)是由Kafka的基本模塊所組成的。

AK RELEASE 2.5.0

APRIL 15, 2020

Kafka的基本組成

Kafka集群中生產(chǎn)者將消息發(fā)送給以Topic命名的消息隊(duì)列Queue中右犹,消費(fèi)者訂閱發(fā)往以某個Topic命名的消息隊(duì)列Queue中的消息提澎。其中Kafka集群由若干個Broker組成,Topic由若干個Partition組成傀履,每個Partition里的消息通過Offset來獲取虱朵。

基本組成包括:

  • Broker:一臺Kafka服務(wù)器就是一個Broker,一個集群由多個Broker組成钓账,一個Broker可以容納多個Topic碴犬,Broker和Broker之間沒有Master和Standby的概念,他們之間地位是平等的

  • Topic:每條發(fā)送到Kafka集群的消息都屬于某個主題梆暮,這個主題就稱為Topic服协。物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存在一個或多個Broker上啦粹,但是用戶只需指定消息主題Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不需要關(guān)心數(shù)據(jù)存放在何處偿荷。

  • Partition:為了實(shí)現(xiàn)可擴(kuò)展性,一個非常大的Topic可以被分為多個Partition唠椭,從而分布到多臺Broker上跳纳。Partition中的每條消息都會被分配一個自增Id(Offset)。Kafka只保證按照一個Partition中的順序?qū)⑾l(fā)送給消費(fèi)者贪嫂,但是不保證單個Topic中多個Partition之間的順序寺庄。

  • Offset:消息在Topic的Partition中的位置,同一個Partition中的消息隨著消息的寫入其對應(yīng)的Offset也自增力崇。

  • Replica:副本斗塘,Topic的Partition有N個副本,N為副本因子亮靴。其中一個Replica為Leader馍盟,其他都為Follower,Leader處理Partition的所有讀寫請求茧吊,F(xiàn)ollower定期同步Leader上的數(shù)據(jù)贞岭。

  • Message:消息是通信的基本單位八毯。每個Producer可以向一個Topic發(fā)布消息

  • Producer:消息生產(chǎn)者,將消息發(fā)布到指定的Topic中曹步,也能夠決定消息所屬的Partition:比如基于Round-Robin或者Hash算法

  • Consumer:消息消費(fèi)者宪彩,向指定的Topic獲取消息,根據(jù)指定Topic的分區(qū)索引及其對應(yīng)分區(qū)上的消息偏移量來獲取消息

  • Consumer Group:消費(fèi)者組讲婚,每個消費(fèi)者都屬于一個組尿孔。當(dāng)消費(fèi)者具有相同組時,消息會在消費(fèi)者之間負(fù)載均衡筹麸。一個Partition的消息只會被相同消費(fèi)者組中的某個消費(fèi)者消費(fèi)活合。不同消費(fèi)者組是相互獨(dú)立的。

  • Zookeeper:存放Kafka集群相關(guān)元數(shù)據(jù)的組件物赶。Zookeeper集群中保存了Topic的狀態(tài)信息白指,例如分區(qū)個數(shù)、分區(qū)組成酵紫、分區(qū)的分布情況等告嘲;保存Broker的狀態(tài)信息;保存消費(fèi)者的消費(fèi)信息等奖地。通過這些信息橄唬,Kafka很好地將消息生產(chǎn)、消息存儲参歹、消息消費(fèi)的過程結(jié)合起來仰楚。

Kafka的拓?fù)浣Y(jié)構(gòu)

一個典型的Kafka集群中包含若干個Producer(可以是某個模塊下發(fā)的Command,或者是Web前端產(chǎn)生的PageView犬庇,或者是服務(wù)器日志僧界,系統(tǒng)CPU、Memory等)臭挽,若干個Broker(Kafka集群支持水平擴(kuò)展捂襟,一般Broker數(shù)量越多,整個Kafka集群的吞吐率也就越高)欢峰,若干個Consumer Group笆豁,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置赤赊。Producer使用Push模式將消息發(fā)布到Broker上,Consumer使用Pull模式從Broker上訂閱并消費(fèi)消息煞赢。

簡單的消息發(fā)送流程如下:

1. Producer根據(jù)指定的路由方法抛计,將消息Push到Topic的某個Partition里

2. Kafka集群接收到Producer發(fā)過來的消息并持久化到硬盤,并保留消息指定時長(可配置)照筑,不關(guān)注消息是否被消費(fèi)吹截。

3. Consumer從Kafka集群Pull數(shù)據(jù)瘦陈,并控制獲取消息的Offset

Kafka內(nèi)部的通信協(xié)議

Kafka內(nèi)部各個Broker之間的角色并不是完全相等的,Broker內(nèi)部負(fù)責(zé)管理分區(qū)和副本狀態(tài)以及異常情況下分區(qū)的重新分片等這些功能的模塊稱為KafkaController波俄。每個Kafka集群中有且只有一個Leader狀態(tài)的KafkaController晨逝,當(dāng)其出現(xiàn)異常時,其余Standby狀態(tài)的KafkaController會通過Zookeeper選舉出有一個Leader狀態(tài)的KafkaController懦铺。

維度一:通信協(xié)議詳情

  • ProducerRequest:生產(chǎn)者發(fā)送消息的請求捉貌,生產(chǎn)者將消息發(fā)送至Kafka集群中的某個Broker,Broker接收到此請求后持久化此消息并更新相關(guān)元數(shù)據(jù)信息冬念。

ProducerRequest.requiredAcks的取值為0時趁窃,生產(chǎn)者不關(guān)心Broker Server端持久化執(zhí)行結(jié)果,但是高級消費(fèi)者發(fā)送的提交偏移量的請求還是需要返回具體執(zhí)行結(jié)果急前。為1則生產(chǎn)者消費(fèi)者都需要將Broker Server端持久化的執(zhí)行結(jié)果返回客戶端醒陆。為-1時,不會立刻返回Broker Server端消息持久化的結(jié)果裆针,而是需要等待Partition的ISR列表中的Replica完成數(shù)據(jù)同步刨摩,并且ISR列表的個數(shù)大于min.insync.replicas時才會將響應(yīng)返回給對應(yīng)客戶端。這里采用的是稱為Purgatory的策略世吨。Broker Server上對應(yīng)的Partition的HighWatermark發(fā)生改變才觸發(fā)檢查澡刹。

  • TopicMetadataRequest:獲取Topic元數(shù)據(jù)信息的請求,無論是生產(chǎn)者還是消費(fèi)者都需要通過此請求來獲取感興趣的Topic的元數(shù)據(jù)另假。

  • FetchRequest:消費(fèi)者獲取Topic的某個分區(qū)的消息的請求像屋。分區(qū)狀態(tài)為Follower的副本也需要利用此請求去同步分區(qū)狀態(tài)為Leader的對應(yīng)副本數(shù)據(jù)。

  • OffsetRequest:消費(fèi)者發(fā)送至Kafka集群來獲取感興趣Topic的分區(qū)偏移量的請求边篮,通過此請求可以獲知當(dāng)前Topic所有分區(qū)在不同時間段的偏移量詳情己莺。

  • OffsetCommitRequest:消費(fèi)者提交Topic被消費(fèi)的分區(qū)偏移量信息至Broker,Broker接收到此請求后持久化相關(guān)偏移量信息戈轿。

  • OffsetFetchRequest:消費(fèi)者發(fā)送獲取提交至Kafka集群的相關(guān)Topic被消費(fèi)詳細(xì)信息凌受,和OffsetCommitRequest相互對應(yīng)。

  • LeaderAndIsrRequest:當(dāng)Topic的某個分區(qū)狀態(tài)發(fā)送變化時思杯,處于Leader狀態(tài)的KafkaController發(fā)送至相關(guān)Broker通知其做出相應(yīng)處理胜蛉。

當(dāng)某個Replica稱為Leader:暫停Fetch→添加進(jìn)Assigned Replica列表→添加進(jìn)In-Sync Replica列表→刪除已經(jīng)不存在的Assigned Replica→初始化Leader Replica的HighWatermark

當(dāng)某個Replica成為Follower:暫停舊的Fetch線程→截?cái)鄶?shù)據(jù)至HighWatermark以下→開啟新的Fetch線程→添加進(jìn)Assigned Replica列表→刪除已經(jīng)不存在的Assigned Replica

  • StopReplicaRequest:當(dāng)Topic的某個分區(qū)被刪除或者下線的時候,處于Leader狀態(tài)KafkaController發(fā)送至相關(guān)Broker通知其做出相應(yīng)處理色乾。

  • UpdateMetadataRequest:當(dāng)Topic的元數(shù)據(jù)信息發(fā)生變化時誊册,處于Leader狀態(tài)的KafkaController發(fā)送至相關(guān)Broker通知其做出相應(yīng)處理。

  • BrokerControllerShutdownRequest:當(dāng)Broker正常下線時暖璧,發(fā)送此請求到處于Leader狀態(tài)的KafkaController案怯。

  • ConsumerMetadataRequest:獲取保存特定Consumer Group消費(fèi)詳情的分區(qū)信息

維度二:通信協(xié)議交互

  • Producer和Kafka集群:Producer需要利用ProducerRequest和TopicMetadataRequest來完成Topic元數(shù)據(jù)的查詢、消息的發(fā)送澎办。

  • Consumer和Kafka集群:Consumer需要利用TopicMetadataRequest請求嘲碱、FetchRequest請求金砍、OffsetRequest、OffsetCommitRequest麦锯、OffsetFetchRequest恕稠、ConsumerMetadataRequest來完成Topic元數(shù)據(jù)的查詢、消息的訂閱扶欣、歷史偏移量的查詢鹅巍、偏移量的提交、當(dāng)前偏移量的查詢宵蛀。

  • KafkaController狀態(tài)為Leader的Broker和KafkaController狀態(tài)為Standby的Broker:Leader需要用LeaderAndIsrRequest昆著、StopReplicaRequest、UpdateMetadataRequest來完成對Topic的管理术陶。Standby需要利用BrokerControllerShutdownRequest來通知Leader自己的下線動作凑懂。

  • Broker和Broker之間:Broker相互之間需要利用FetchRequest請求來同步Topic分區(qū)的副本數(shù)據(jù),這樣才能使Topic分區(qū)各副本數(shù)據(jù)保持一致梧宫。

Broker概述

Broker內(nèi)部存在的功能模塊包括SocketServer接谨、KafkaRequestHandlerPool、LogManager塘匣、ReplicaManager脓豪、OffsetManager、KafkaScheduler忌卤、KafkaApis扫夜、KafkaHealthcheck和TopicConfigManager九大基本模塊以及KafkaController集群控制管理模塊。

  • SocketServer:首先開啟一個Acceptor線程驰徊,新的Socket連接成功建立時會將對應(yīng)的SocketChannel以輪詢方式轉(zhuǎn)發(fā)給N個Processor線程中的某一個笤闯,由其處理接下來SocketChannel的請求,將請求放置在RequestChannel中的請求隊(duì)列棍厂;當(dāng)Processor線程監(jiān)聽到SocketChannel請求的響應(yīng)時颗味,會將響應(yīng)從RequestChannel中的響應(yīng)隊(duì)列中取出來并發(fā)給客戶端

  • KafkaRequestHandlerPool:真正處理Socket請求的線程池,其個數(shù)默認(rèn)為8個牺弹,由參數(shù)num.io.threads決定浦马。該線程池里面的線程KafkaRequestHandler從RequestChannel的請求隊(duì)列中獲取Socket請求,然后調(diào)用KafkaApis完成真正業(yè)務(wù)邏輯最后將響應(yīng)寫回至RequestChannel中的響應(yīng)隊(duì)列张漂,并交給SocketServer中對應(yīng)的Processer線程發(fā)給客戶端晶默。

  • LogManager:Kafka的日志管理模塊,主要提供針刪除任何過期數(shù)據(jù)和冗余數(shù)據(jù)航攒,刷新臟數(shù)據(jù)磺陡,對日志文件進(jìn)行Checkpoint以及日志合并的功能。

負(fù)責(zé)提供Broker Server上Topic的分區(qū)數(shù)據(jù)讀取和寫入功能,負(fù)責(zé)讀取和寫入位于Broker Server上的所有分區(qū)副本數(shù)據(jù)仅政。如果Partition有多個Replica,則每個Broker Server不會存在相同Partition的Replica盆驹,如果存在一旦遇到Broker Server下線會丟失Partition的多份副本可用性降低圆丹。

LogManager中包含多個TopicAndPartition,每個TopicAndPartition對應(yīng)一個Log躯喇,每個Log中包含多個LogSegment(每個LogSegment文件包括一個日志數(shù)據(jù)文件【.log】和兩個索引文件(偏移量索引文件【.index】和消息時間戳索引文件【.timeindex】))辫封。LogSegment的結(jié)構(gòu)中l(wèi)og代表消息集合,每條消息都有一個Offset廉丽,這是針對Partition中的偏移量倦微;index代表的是消息的索引信息,以KV對的形式記錄正压,其中K為消息在log中的相對偏移量欣福,V為消息在log中的絕對位置;baseOffset代表的是該LogSegment日志段的起始偏移量焦履;indexIntervalByte代表的索引粒度拓劝,即寫入多少字節(jié)之后生成一條索引。OffsetIndex不會保存每條消息的索引嘉裤,因此其索引文件是一個稀疏索引文件(稀疏索引:索引項(xiàng)中只對應(yīng)主文件中的部分記錄郑临,即不會給每條記錄建立索引)。

image.png

后臺還會維護(hù)一個日志合并線程屑宠,Kafka發(fā)送消息的時候需要攜帶3個參數(shù)(Topic厢洞,Key,Message)典奉,針對相同的Key值不同的Message只保留最后一個Key值對應(yīng)的消息內(nèi)容躺翻。

  • ReplicaManager:Kafka副本管理模塊。主要提供針對Topic分區(qū)副本數(shù)據(jù)的管理功能秋柄,包括有關(guān)副本的Leader和ISR的狀態(tài)變化获枝、副本的刪除、副本的監(jiān)測等骇笔。ISR全稱是In-Syn Replicas省店,處于同步狀態(tài)的副本。AR全稱是Assign Replicas的縮寫笨触,代表分配給Partition的副本懦傍。

主要利用ReplicaFetcherThread(副本數(shù)據(jù)拉取線程)和High Watermark Mechanism(高水位線機(jī)制)來實(shí)現(xiàn)數(shù)據(jù)的同步管理。單個ReplicaFetcherThread線程負(fù)責(zé)某個Broker Server上部分TopicAndPartition的Replica數(shù)據(jù)同步芦劣。將拉取的消息寫入log粗俱,更新當(dāng)前Replica的HighWatermark,代表的是ISR中所有replicas的last commited message的最小起始偏移量虚吟。當(dāng)某個Broker Server上被分配到Replica的時候會進(jìn)入becomeLeaderOrFollower處理流程寸认;當(dāng)Replica被刪除為進(jìn)入stopReplicas處理流程签财;當(dāng)Follower狀態(tài)的Replica長時間沒有同步Leader狀態(tài)的Replica的時候會進(jìn)入maybeShrinkIsr處理流程。

  • OffsetManager:Kafka的偏移量管理模塊偏塞,主要提供針對偏移量的保存讀取的功能唱蒸,兩種方式保存:一種是把偏移量保存到Zookeeper上,另一種是Kafka灸叼,把偏移量提交至Kafka內(nèi)部Topic為"__consumer_offsets"的日志里面神汹,主要由offsets.storage參數(shù)決定。默認(rèn)為Zookeeper古今。

  • KafkaScheduler:Kafka的后臺任務(wù)調(diào)度資源池屁魏。提供后臺定時任務(wù)的調(diào)度,主要為LogManager捉腥、ReplicaManager氓拼、OffsetManager提供調(diào)度服務(wù)。

  • KafkaApis:Kafka的業(yè)務(wù)邏輯實(shí)現(xiàn)層但狭,根據(jù)不同的Request執(zhí)行不同的操作披诗,其中利用LogManager、ReplicaManager立磁、OffsetManager來完成內(nèi)部處理呈队。

  • KafkaHealthcheck:Broker Server在./brokers/ids上注冊自己的ID,當(dāng)Broker在線的時候唱歧,則對應(yīng)的ID存在宪摧;當(dāng)離線時對應(yīng)ID不存在,從而達(dá)到集群狀態(tài)監(jiān)測的目的颅崩。

  • TopicConfigManager:在/config/changes上注冊自己的回調(diào)函數(shù)來監(jiān)測Topic配置信息的變化

  • KafkaController:Kafka集群控制管理模塊几于。由于Zookeeper上保存了Kafka機(jī)器的元數(shù)據(jù)信息,因?yàn)镵afkaController通過在不同目錄注冊不同的回調(diào)函數(shù)來達(dá)到監(jiān)測集群狀態(tài)的目的沿后,以及響應(yīng)集群狀態(tài)的變化:

    1. /controller目錄保存了Kafka集群中狀態(tài)為Leader的KafkaController標(biāo)識沿彭,通過監(jiān)測這個目錄的變化可以即使響應(yīng)KafkaController狀態(tài)的切換

    2. /admin/reassign_partitions目錄保存了Topic重分區(qū)的信息,通過監(jiān)測這個目錄的變化可以及時響應(yīng)Topic分區(qū)變化的請求

    3. /admin/preferred_replica_election目錄保存了Topic分區(qū)副本的信息尖滚,通過監(jiān)測這個目錄的變化可以及時響應(yīng)Topic分區(qū)副本變化的請求喉刘。

    4. /brokers/topics目錄保存了Topic的信息,通過監(jiān)測這個目錄的變化可以及時響應(yīng)Topic創(chuàng)建和刪除的請求漆弄。

    5. /brokers/ids目錄保存了Broker的狀態(tài)睦裳,通過監(jiān)測這個目錄的變化可以及時響應(yīng)Broker的上下線情況。

Broker的控制管理模塊

每個Broker內(nèi)部都會存在一個KafkaController模塊撼唾,但是有且只有一個Broker內(nèi)部的KafkaController模塊對外提供控制管理Kafka集群的功能廉邑,例如負(fù)責(zé)Topic的創(chuàng)建熬甫、分區(qū)的重分配以及分區(qū)副本Leader的重新選舉等颂斜。

KafkaController的選舉策略

Leader和Follower的選舉是基于Zookeeper實(shí)現(xiàn)的,嘗試在Zookeeper的相同路徑上創(chuàng)建瞬時節(jié)點(diǎn)(Ephemeral Node)筏勒,只有一個KafkaController會創(chuàng)建成功盖矫。其中負(fù)責(zé)狀態(tài)管理的類為ZookeeperLeaderElector缰揪,字面意思上就可以看出是基于Zookeeper的Leader選舉權(quán)误窖。其中包含了controllerContext當(dāng)前Topic的元數(shù)據(jù)信息以及集群的元數(shù)據(jù)信息等碉京;electionPath為多個KafkaController競爭寫的路徑,其值為/controller课舍;onBecomingLeader為狀態(tài)轉(zhuǎn)化成Leader時候的回調(diào)函數(shù);onResigningAsLeader為狀態(tài)轉(zhuǎn)化位Follower時候的回調(diào)函數(shù)他挎;brokerId為當(dāng)前Broker Server的Id筝尾。ZookeeperLeaderElector啟動后負(fù)責(zé)觀察數(shù)據(jù)節(jié)點(diǎn)狀態(tài),瞬時節(jié)點(diǎn)消失觸發(fā)再次選舉办桨,嘗試寫入的節(jié)點(diǎn)內(nèi)容就是brokerId筹淫。

KafkaController的初始化

當(dāng)選舉為Leader時分為下面幾步:

1. 初始化Kafka集群內(nèi)部的時鐘,存放在Zookeeper的/controller_epoch呢撞,Broker Server用這個值區(qū)分請求的時效性

2. 注冊各種監(jiān)聽函數(shù)损姜,針對Zookeeper不同目錄下Kafka存儲的不同元數(shù)據(jù)進(jìn)行監(jiān)聽。

3. 通過initializeControllerContext()殊霞、replicaStateMachine.startup()和partitionStateMachine.startup()初始化Kafka集群內(nèi)部元數(shù)據(jù)信息摧阅。建立和集群內(nèi)其他狀態(tài)為Follower的KafkaController的通信鏈路,處理集群啟動前沒有及時處理的用戶請求绷蹲,此時可能會變更Kafka集群內(nèi)部的元數(shù)據(jù)信息棒卷,最后通過sendUpdateMetadataRequest()將Kafka集群內(nèi)部的元數(shù)據(jù)信息同步給其他狀態(tài)為Follower的KafkaController。

4. 根據(jù)auto.leader.rebalance.enable配置項(xiàng)按需啟動Kafka集群內(nèi)部的負(fù)載均衡線程祝钢,默認(rèn)開啟

5. 根據(jù)delete.topic.enable配置項(xiàng)按需啟動Kafka集群內(nèi)部的Topic刪除線程比规,默認(rèn)關(guān)閉

選舉為Follower的時候分為下面幾步,正好與Controller相反:

1. 取消Zookeeper路徑上的監(jiān)聽函數(shù)

  1. 根據(jù)delete.topic.enable配置項(xiàng)按需啟動Kafka集群內(nèi)部的Topic刪除線程拦英,默認(rèn)關(guān)閉

3. 關(guān)閉Kafka集群內(nèi)部的負(fù)載均衡線程

4. 斷開和集群內(nèi)其他狀態(tài)為Follower的KafkaController的通信線路

5. 重置集群內(nèi)部時鐘

Topic的分區(qū)狀態(tài)轉(zhuǎn)化機(jī)制

Topic的分區(qū)狀態(tài)維護(hù)是由PartitionStateMachine模塊負(fù)責(zé)的蜒什,通過在/brokers/topics 和 /admin/delete_topics目錄上注冊不同的監(jiān)聽函數(shù),監(jiān)聽Topic的創(chuàng)建和刪除事件觸發(fā)Topic分區(qū)狀態(tài)的轉(zhuǎn)換疤估。

PartitionStateMachine中分區(qū)狀態(tài)由PartitionState用一個字節(jié)表示不同狀態(tài)灾常,分為四種:

  • NonExistentPartition:代表分區(qū)從來沒有被創(chuàng)建或者被創(chuàng)建之后又被刪除呃狀態(tài)

  • NewPartition:分區(qū)剛創(chuàng)建包含AR,但是此時Leader或ISR還沒有創(chuàng)建做裙,處于非活動狀態(tài)無法接收數(shù)據(jù)

  • OnlinePartition:分區(qū)Leader已經(jīng)被選舉岗憋,產(chǎn)生來對應(yīng)的ISR,處于活動狀態(tài)可以接收數(shù)據(jù)

  • OfflinePartition:代表分區(qū)Leader由于某種原因下線時導(dǎo)致分區(qū)暫時不可用

每個狀態(tài)都是由一個合理的前置狀態(tài)轉(zhuǎn)換而來锚贱。

Topic分區(qū)的領(lǐng)導(dǎo)者副本選舉策略

Topic分區(qū)的Leader Replica在不同場景下的選舉策略是不一樣的仔戈,不同選舉策略都基礎(chǔ)PartitionLeaderSelector。其根據(jù)Topic、Partition监徘、當(dāng)前Leader晋修、當(dāng)前的ISR選舉出新的Leader,新的ISR和新的AR(在線狀態(tài))凰盔,共有5種不同的策略:

  • NoOpLeaderSelector:默認(rèn)的選舉策略

  • ReassignedPartitionLeaderSelector:當(dāng)分區(qū)AR重新分配時使用的策略

  • PreferredReplicaPartitionLeaderSelector:集群內(nèi)部自動平衡負(fù)載或者用戶觸發(fā)手動平衡負(fù)載時使用的策略

隨著Topic的新建刪除以及Broker Server的上下線墓卦,原本Topic分區(qū)的Leader Replica在集群中的分布越來越不均勻。 auto.leader.rebalance.enable為true户敬,則會自動觸發(fā)分區(qū)的Leader Replica選舉落剪,或者管理員下發(fā)分區(qū)Leader Replica選舉指令。這會在Zookeeper的 /admin/preferred_replica_election指定具體的Topic和分區(qū)尿庐,此時Leader狀態(tài)的KafkaController監(jiān)測到這個路徑的數(shù)據(jù)變化就會觸發(fā)相應(yīng)的回調(diào)函數(shù)忠怖,促使對應(yīng)的Topic分區(qū)發(fā)生Leader Replica的選舉。

  • OfflinePartitionLeaderSelector:分區(qū)狀態(tài)從OfflinePartition或者NewPartition切換為OnlinePartition時使用的策略

1. 篩選出在線的ISR和AR

2. 優(yōu)先從在線的ISR中選擇抄瑟,如果列表不為空則選擇列表中的第一個凡泣,選舉結(jié)束

3. 在線ISR為空,根據(jù) unclean.leader.election.enable 決定是否從在線的AR中選舉Leader皮假,如果允許鞋拟,則選擇AR列表中的第一個,結(jié)束選舉惹资,如果AR列表為空選舉失敗贺纲。

  • ControllerShutdownLeaderSelector:Leader狀態(tài)的KafkaController處理其他Broker Server下線導(dǎo)致分區(qū)的Leader Replica發(fā)生切換時使用的策略。

1. 篩選出在線的ISR

2. 剔除離線的ISR形成新的ISR列表

3. 如果新的ISR列表不為空褪测,則選舉第一個Replica作為新的Leader哮笆,否則選舉失敗

Topic分區(qū)的副本狀態(tài)轉(zhuǎn)換機(jī)制

Topic分區(qū)的副本狀態(tài)維護(hù)是由ReplicaStateMachine模塊負(fù)責(zé)的,Topic分區(qū)的副本狀態(tài)伴隨著Topic分區(qū)狀態(tài)的變化而變化

分區(qū)副本狀態(tài)只要有7種:

  • NewReplica:分區(qū)剛被分配但是沒有開始工作的狀態(tài)

  • OnlineReplica:分區(qū)副本開始工作時的狀態(tài)汰扭,此時該副本時該分區(qū)的Leader或者Follower

  • OfflineReplica:分區(qū)副本所在的Broker Server宕機(jī)時所導(dǎo)致的副本狀態(tài)

  • ReplicaDeletionStarted:分區(qū)副本下線之后準(zhǔn)備開始刪除的狀態(tài)

  • ReplicaDeletionSuccessful:相關(guān)Broker Server正確響應(yīng)分區(qū)副本被刪除請求之后的狀態(tài)

  • ReplicaDeletionIneligible:相關(guān)Broker Server錯誤響應(yīng)分區(qū)副本被刪除請求之后的狀態(tài)

  • NonExistentReplica:代表分區(qū)副本被徹底刪除之后的狀態(tài)

目標(biāo)狀態(tài)也是由合理的前置狀態(tài)轉(zhuǎn)換而來的稠肘。

KafkaController內(nèi)部的監(jiān)聽器

KafkaController內(nèi)部通過監(jiān)聽函數(shù)來維護(hù)集群的元數(shù)據(jù)。

  • TopicChangeListener:注冊在 /broker/topics 路徑萝毛,監(jiān)聽Topic的創(chuàng)建

  • AddPartitionListener:在Topic創(chuàng)建過程中會在 /broker/topics/[topic]目錄下注冊AddPartitionListener用于監(jiān)聽Topic分區(qū)的變化

  • PartitionReassignedListener:KafkaController轉(zhuǎn)換為Leader的過程中在路徑 /admin/reassign_partitions注冊了PartitionReassignedListener用于監(jiān)聽Topic分區(qū)的重分配项阴。在正式啟動重分配之前會判斷是否需要進(jìn)行重分配,重分配之后的AR列表和當(dāng)前的AR列表不相同并且重分配之后的AR列表所在的Broker Server都在線笆包,滿足上面兩個條件才會觸發(fā)分區(qū)的重分配环揽。

  • ReassignedPartitionsIsrChangeListener:當(dāng)Leader Replica所在的Broker Server接收到來自Follower Replica的FetchRequest請求時,KafkaApis的handleFetchRequest會統(tǒng)計(jì)每個Replica的狀態(tài)庵佣,一旦發(fā)現(xiàn)改Replica同步上Leader Replica之后歉胶,此時會調(diào)用Partition的updateLeaderHWAndMaybeExpandIsr及時更新 /brokers/topics/[topic]/partitions/partitionId/state/ 目錄上的Partition狀態(tài),包括Leader巴粪、ISR等信息通今,監(jiān)聽到分區(qū)狀態(tài)發(fā)生變化會觸發(fā)ReassignedPartitionsIsrChangeListener

  • PreferredReplicaElectionListener:每個Partition可以有多個Replica粥谬,即AR列表。在這個列表中的第一個Replica稱為“Preferred Replica”辫塌,當(dāng)創(chuàng)建Topic時漏策,Kafka要確保所有的Topic的“Preferred Replica”均勻地分布在Kafka集群中。Topic的Partition需要重新均衡Leader Replica至Preferred Replica臼氨,此時會觸發(fā)PreferredReplicaElectionListener

  • BrokerChangeListener:Broker Server的上下線影響著其中所有Replica的狀態(tài)掺喻,因此ReplicaStateMachine在路徑為/broker/topic的路徑上注冊了BrokerChangeListener,用于監(jiān)聽Broker Server的上下線储矩。

Broker Server上線時步驟為:

1. Leader狀態(tài)的KafkaController同步Kafka集群所有的Topic信息給新上線的Broker Server感耙。

2. 將原本位于該Broker Server上的所有Replica狀態(tài)切換至OnlineReplica

3. 如果Partition的Replica有且只有一個,并且正好位于Broker Server上持隧,則切換Partition狀態(tài)至OnlinePartition抑月。

4. 如果分區(qū)重分配之前由于該Broker Server下線導(dǎo)致推出的話,則嘗試重新進(jìn)行分區(qū)重分配舆蝴。

5. 如果之前由于Broker Server下線導(dǎo)致對應(yīng)的Replica無法刪除的話,則恢復(fù)刪除流程题诵。

Broker Server下線步驟:

1. 更新ControllerContext內(nèi)部正在下線的Broker Server列表

2. 將Leader Replica位于該Broker Server上的分區(qū)狀態(tài)切換為OnlinePartition洁仗,緊接著觸發(fā)分區(qū)狀態(tài)切換為OnlinePartition,利用OfflinePartitionLeaderSelector副本選舉策略進(jìn)行Leader Replica的選舉性锭。

3. 將位于該Broker Server上的Replica狀態(tài)切換為OfflineReplica

4. 如果對應(yīng)Replica的Topic處于刪除隊(duì)列中的話赠潦,則標(biāo)記暫時無法刪除。

  • DeleteTopicsListener:監(jiān)聽Topic的刪除

Kafka集群的負(fù)載均衡流程

Partition的AR列表的第一個Replica稱為“Preferred Replica”草冈,并均勻分布在整個Kafka集群中她奥。由于每個Partition只有Leader Replica對外提供讀寫服務(wù),并且Partition創(chuàng)建的時候默認(rèn)的Leader Replica位于Preferred Replica之上怎棱,此時Kafka集群的負(fù)載是均衡的哩俭,如果Kafka集群長時間運(yùn)行,Broker Server中途由于異常而發(fā)生重啟拳恋,此時Partition的Leader Replica會發(fā)生遷移凡资,這樣會導(dǎo)致其Partition的Leader Replica在集群中不再均衡了。

Kafka集群的Topic刪除流程

Topic是由Partition組成的谬运,而Partition是由Replica組成的隙赁,因此只有Partition的Assigned Replica全部被刪除了該P(yáng)artition才可以被刪除;只有Topic的所有Partition都被刪除了梆暖,該Topic才可以最終真正的被刪除伞访。

KafkaController的通信模塊

ControllerChannelManager提供了Leader狀態(tài)的KafkaController和集群其他Broker Server通信的功能,內(nèi)部針對每一個在線的Broker Server會維護(hù)一個通信鏈路轰驳,并分別通過各自的RequestSendThread線程將請求發(fā)送給對應(yīng)的Broker Server厚掷。

Topic管理工具

kafka-topics.sh提供了Topic的創(chuàng)建弟灼、修改、列舉蝗肪、描述袜爪、刪除功能,在內(nèi)部時通過TopicCommand類來實(shí)現(xiàn)的薛闪。

kafka-reassign-partitions.sh提供來重新分配分區(qū)副本的能力辛馆。該工具可以促進(jìn)Kafka集群的負(fù)載均衡。因?yàn)镕ollower Replica需要從Leader Replica Fetch數(shù)據(jù)以保持與與Leader Replica同步豁延,僅保持Leader Replica分布的平衡對整個集群的負(fù)載均衡時不夠的昙篙。另外當(dāng)Kafka集群擴(kuò)容后,該工具可以將已有Topic的Partition遷移到新加入的Broker上诱咏。

分區(qū)重分片是一個異步的流程苔可,因此該腳本還提供了查看當(dāng)前分區(qū)重分配進(jìn)度的指令。

kafka-preferred-replica-election.sh用于在整個集群中恢復(fù)Leader Replica為Preferred Replica袋狞。

生產(chǎn)者

生產(chǎn)者是指消息的生成者焚辅。生產(chǎn)者可以通過特定的分區(qū)函數(shù)決定消息路由到Topic的某個分區(qū)。消息的生產(chǎn)者發(fā)送消息有兩種模式苟鸯,分別為同步模式和異步模式同蜻。

kafka.javaapi.producer.Producer#send方法發(fā)送

指定 metadata.broker.list 屬性,配置Broker地址

指定 partitioner.class 屬性早处,配置分區(qū)函數(shù)湾蔓,分區(qū)函數(shù)決定路由。分區(qū)函數(shù)必須實(shí)現(xiàn) kafka.producer.Partitioner的 partition接口砌梆,參數(shù)為消息key值默责,分區(qū)總數(shù),返回值為分區(qū)的索引咸包。

Producer內(nèi)部包括以下幾個主要模塊:

  • ProducerSendThread:當(dāng)producer.type配置為async桃序,則其主要用于緩存客戶端的KeyedMessage,然后累積到batch.num.messages配置數(shù)量或者間隔 queue.enqueue.timeout.ms配置的時間還沒有獲取到新的客戶端的KeyedMessage烂瘫,則調(diào)用DefaultEventHandler將KeyedMessage發(fā)送出去

  • ProducerPool:緩存客戶端和各個Broker Server的通信葡缰,DefaultEventHandler從ProducerPool中獲取和某個Broker Server的通信對象SyncProducer,然后通過SyncProducer將KeyedMessage發(fā)送給指定的Broker Server忱反。

  • DefaultEventHandler:將KeyedMessage集合按照分區(qū)規(guī)則計(jì)算不同Broker Server所應(yīng)該接收的部分KeyedMessage泛释,然后通過SyncProducer將KeyedMessage發(fā)送出去。在DefaultEventHandler模塊內(nèi)部提供來SyncProducer發(fā)送失敗的重試機(jī)制和平滑擴(kuò)容Broker Server的機(jī)制温算。

發(fā)送模式

生產(chǎn)者由兩種發(fā)送模式:同步和異步

當(dāng)producer.type配置為sync時怜校,同步發(fā)送消息。

當(dāng)producer.type配置為async時注竿,異步發(fā)送消息茄茁。

消費(fèi)者

Kafka提供了兩種不同的方式來獲取消息:簡單消費(fèi)者和高級消費(fèi)者魂贬。簡單消費(fèi)者獲取消息時,用戶需要知道待消費(fèi)的消息位于哪個Topic的哪個分區(qū)裙顽,并且該目的分區(qū)的Leader Replica位于哪個Broker Server上付燥;高級消費(fèi)者獲取消息時,只需要指定待消費(fèi)的消息屬于哪個Topic即可愈犹。

簡單消費(fèi)者

簡單消費(fèi)者提供的客戶端API稱為低級API键科,本質(zhì)上客戶端獲取消息最終時利用FetchRequest請求從目的端Broker Server拉取消息。

FetchRequest請求中可以指定Topic的名稱漩怎,Topic的分區(qū)勋颖,起始偏移量、最大字節(jié)數(shù)勋锤。

客戶端無論生產(chǎn)消息還是消費(fèi)消息饭玲,最終都是通過與目的地端Broker Server建立通信鏈路,并且以阻塞模式允許叁执,然后通過該條鏈路將不同的請求發(fā)送出去茄厘。

高級消費(fèi)者

高級消費(fèi)者以Consumer Group(消費(fèi)組)的形式來管理消息的消費(fèi),以Stream(流)的形式來提供具體消息的讀取谈宛。Stream是指來自若干個Broker Server上的若干個Partition的消息次哈。客戶端需要正確設(shè)置Stream的個數(shù)入挣,并且應(yīng)該針對每個Stream開啟一個線程進(jìn)行消息的消費(fèi)。一個Stream代表了多個Partition消息的聚合硝拧,但是每一個Partition只能映射到一個Stream径筏。

消息的最終獲取是通過遍歷KafkaStream的迭代器ConsumerIterator來逐條獲取的,其數(shù)據(jù)來源于分配給該KafkaStream的阻塞消息隊(duì)列BlockingQueue障陶,而BlockingQueue的數(shù)據(jù)來源針對每個Broker Server的FetchThread線程滋恬。FetchThread線程會將Broker Server上的部分Partition數(shù)據(jù)發(fā)送給對應(yīng)的阻塞消息隊(duì)列BlockingQueue,而KafkaStream正是從該阻塞消息隊(duì)列BlockingQueue中不斷的消費(fèi)消息抱究。

ConsumerThread本質(zhì)上是客戶端的消費(fèi)線程恢氯,消費(fèi)若干個Partition上的數(shù)據(jù),并且與BlockingQueueu相互映射鼓寺,只要確定了ConsumerThread和Partition之間的關(guān)系勋拟,也就確定了BlockingQueue和Partition之間的關(guān)系。Kafka提供了兩種ConsumerThread和Partition的分配算法Range(范圍分區(qū)分配)和RoundRobin(循環(huán)分區(qū)分配)

高級消費(fèi)者中妈候,每個具體消費(fèi)者實(shí)例啟動之后會在/consumers/[group]/ids/的Zookeeper目錄下注冊自己的id敢靡;Kafka集群內(nèi)部Topic會在/brokers/topics/[topic]/的Zookeeper目錄下注冊自己的Partition,因此消費(fèi)者實(shí)例一旦發(fā)現(xiàn)以上2個路徑的數(shù)據(jù)發(fā)生變化時苦银,則會觸發(fā)高級消費(fèi)者的負(fù)載均衡流程啸胧,除此之外赶站,消費(fèi)者實(shí)例一旦和Zookeeper的鏈接重新建立時也會觸發(fā)高級消費(fèi)者的負(fù)載均衡流程。

高級消費(fèi)者內(nèi)部針對Zookeeper的連接建立纺念、Topic的Partition變化贝椿、Consumer的新增會建立3個不同的Listener,分別是ZKSessionExpireListener陷谱、ZKTopicPartitionChangeListener和ZKRebalancerListener烙博。

高級消費(fèi)者消費(fèi)消息時提供了兩種持久化偏移量的機(jī)制,由參數(shù)auto.commit.enable叭首,默認(rèn)為true自動提交习勤。否則需要手動調(diào)用ZookeeperConsumerConnector的commitOffsets。Kafka根據(jù)參數(shù)offsets.storage焙格,默認(rèn)為zookeeper(保存路徑為/consumers/[group]/offset/[topic]/[partition])图毕,可以設(shè)置為kafka(保存再Topic為“__consumer_offsets”的日志中)。高級消費(fèi)者內(nèi)部會自動間隔一定時間(由參數(shù) auto.commit.interval.ms決定眷唉,默認(rèn)60*1000ms)

面試相關(guān)

kafka高吞吐量的原因:

一予颤、順序讀寫磁盤,充分利用了操作系統(tǒng)的預(yù)讀機(jī)制冬阳。

二蛤虐、linux中使用sendfile命令,減少一次數(shù)據(jù)拷貝肝陪,如下驳庭。

①把數(shù)據(jù)從硬盤讀取到內(nèi)核中的頁緩存。

②把數(shù)據(jù)從內(nèi)核中讀取到用戶空間氯窍。(sendfile命令將跳過此步驟)

③把用戶空間中的數(shù)據(jù)寫到socket緩沖區(qū)中饲常。

④操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)中復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出

三狼讨、生產(chǎn)者客戶端緩存消息批量發(fā)送贝淤,消費(fèi)者批量從broker獲取消息,減少網(wǎng)絡(luò)io次數(shù)政供,充分利用磁盤順序讀寫的性能播聪。

四、通常情況下kafka的瓶頸不是cpu或者磁盤布隔,而是網(wǎng)絡(luò)帶寬离陶,所以生產(chǎn)者可以對數(shù)據(jù)進(jìn)行壓縮。

kafka在高并發(fā)的情況下,如何避免消息丟失和消息重復(fù)?

消息丟失解決方案:

首先對kafka進(jìn)行限速衅檀, 其次啟用重試機(jī)制枕磁,重試間隔時間設(shè)置長一些,最后Kafka設(shè)置acks=all术吝,即需要相應(yīng)的所有處于ISR的分區(qū)都確認(rèn)收到該消息后计济,才算發(fā)送成功

消息重復(fù)解決方案:

消息可以使用唯一id標(biāo)識

生產(chǎn)者(ack=all 代表至少成功發(fā)送一次)

消費(fèi)者 (offset手動提交茸苇,業(yè)務(wù)邏輯成功處理后,提交offset)

落表(主鍵或者唯一索引的方式沦寂,避免重復(fù)數(shù)據(jù))

業(yè)務(wù)邏輯處理(選擇唯一主鍵存儲到Redis或者mongdb中学密,先查詢是否存在,若存在則不處理传藏;若不存在腻暮,先插入Redis或Mongdb,再進(jìn)行業(yè)務(wù)邏輯處理)

kafka怎么保證數(shù)據(jù)消費(fèi)一次且僅消費(fèi)一次

冪等producer:保證發(fā)送單個分區(qū)的消息只會發(fā)送一次,不會出現(xiàn)重復(fù)消息

事務(wù)(transaction):保證原子性地寫入到多個分區(qū)毯侦,即寫入到多個分區(qū)的消息要么全部成功哭靖,要么全部回滾流處理EOS:流處理本質(zhì)上可看成是“讀取-處理-寫入”的管道。此EOS保證整個過程的操作是原子性侈离。注意试幽,這只適用于Kafka Streams

kafka保證數(shù)據(jù)一致性和可靠性

數(shù)據(jù)一致性保證

一致性定義:若某條消息對client可見,那么即使Leader掛了卦碾,在新Leader上數(shù)據(jù)依然可以被讀到

HW-HighWaterMark: client可以從Leader讀到的最大msg offset铺坞,即對外可見的最大offset, HW=max(replica.offset)

對于Leader新收到的msg洲胖,client不能立刻消費(fèi)济榨,Leader會等待該消息被所有ISR中的replica同步后,更新HW绿映,此時該消息才能被client消費(fèi)擒滑,這樣就保證了如果Leader fail,該消息仍然可以從新選舉的Leader中獲取叉弦。

對于來自內(nèi)部Broker的讀取請求丐一,沒有HW的限制。同時卸奉,F(xiàn)ollower也會維護(hù)一份自己的HW钝诚,F(xiàn)olloer.HW = min(Leader.HW, Follower.offset)

數(shù)據(jù)可靠性保證

當(dāng)Producer向Leader發(fā)送數(shù)據(jù)時颖御,可以通過acks參數(shù)設(shè)置數(shù)據(jù)可靠性的級別

0: 不論寫入是否成功榄棵,server不需要給Producer發(fā)送Response,如果發(fā)生異常潘拱,server會終止連接疹鳄,觸發(fā)Producer更新meta數(shù)據(jù);

1: Leader寫入成功后即發(fā)送Response芦岂,此種情況如果Leader fail瘪弓,會丟失數(shù)據(jù)

-1: 等待所有ISR接收到消息后再給Producer發(fā)送Response,這是最強(qiáng)保證

kafka到spark streaming怎么保證數(shù)據(jù)完整性禽最,怎么保證數(shù)據(jù)不重復(fù)消費(fèi)腺怯?

保證數(shù)據(jù)不丟失(at-least)

spark RDD內(nèi)部機(jī)制可以保證數(shù)據(jù)at-least語義袱饭。

Receiver方式

開啟WAL(預(yù)寫日志),將從kafka中接受到的數(shù)據(jù)寫入到日志文件中呛占,所有數(shù)據(jù)從失敗中可恢復(fù)虑乖。

Direct方式

依靠checkpoint機(jī)制來保證。

保證數(shù)據(jù)不重復(fù)(exactly-once)

要保證數(shù)據(jù)不重復(fù)晾虑,即Exactly once語義疹味。

  • 冪等操作:重復(fù)執(zhí)行不會產(chǎn)生問題,不需要做額外的工作即可保證數(shù)據(jù)不重復(fù)帜篇。

  • 業(yè)務(wù)代碼添加事務(wù)操作

就是說針對每個partition的數(shù)據(jù)糙捺,產(chǎn)生一個uniqueId,只有這個partition的所有數(shù)據(jù)被完全消費(fèi)笙隙,則算成功洪灯,否則算失效,要回滾逃沿。下次重復(fù)執(zhí)行這個uniqueId時婴渡,如果已經(jīng)被執(zhí)行成功,則skip掉凯亮。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末边臼,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子假消,更是在濱河造成了極大的恐慌柠并,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件富拗,死亡現(xiàn)場離奇詭異臼予,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)啃沪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門粘拾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人创千,你說我怎么就攤上這事缰雇。” “怎么了追驴?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵械哟,是天一觀的道長。 經(jīng)常有香客問我殿雪,道長暇咆,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮爸业,結(jié)果婚禮上其骄,老公的妹妹穿的比我還像新娘。我一直安慰自己扯旷,他們只是感情好年栓,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著薄霜,像睡著了一般某抓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上惰瓜,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天否副,我揣著相機(jī)與錄音,去河邊找鬼崎坊。 笑死备禀,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的奈揍。 我是一名探鬼主播曲尸,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼男翰!你這毒婦竟也來了另患?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蛾绎,失蹤者是張志新(化名)和其女友劉穎昆箕,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體租冠,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鹏倘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了顽爹。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纤泵。...
    茶點(diǎn)故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖镜粤,靈堂內(nèi)的尸體忽然破棺而出捏题,到底是詐尸還是另有隱情,我是刑警寧澤繁仁,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布涉馅,位于F島的核電站归园,受9級特大地震影響黄虱,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜庸诱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一捻浦、第九天 我趴在偏房一處隱蔽的房頂上張望晤揣。 院中可真熱鬧,春花似錦朱灿、人聲如沸昧识。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽跪楞。三九已至,卻和暖如春侣灶,著一層夾襖步出監(jiān)牢的瞬間甸祭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工褥影, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留池户,地道東北人。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓凡怎,卻偏偏與公主長得像校焦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子统倒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評論 2 355