Kafka 工作原理

Kafka 架構:
以下是一般 Kafka 的架構

Kafka 架構

多節(jié)點多Broker集群


多節(jié)點多Broker集群

術語

  • Broker
    Kafka集群包含一個或多個服務器特愿,這種服務器被稱為broker,可以水平擴展光酣,一般broker數(shù)量越多疏遏,集群吞吐率越高,而且kafka 每個節(jié)點可以有多個 broker
  • Producer
    負責發(fā)布消息到Kafka broker救军,可以是web前端產(chǎn)生的page view财异,或者是服務器日志,系統(tǒng)CPU唱遭、memory等
  • Consumer
    消費消息戳寸。每個consumer屬于一個特定的consumer group(可為每個consumer指定group name,若不指定group name則屬于默認的group)拷泽。使用consumer high level API時疫鹊,同一topic的一條消息只能被同一個consumer group內(nèi)的一個consumer消費袖瞻,但多個consumer group可同時消費這一消息。
  • Zookeeper
    通過Zookeeper管理集群配置拆吆,選舉leader虏辫,以及在consumer group發(fā)生變化時進行rebalance
  • Topic
    每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為topic锈拨。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的topic即可生產(chǎn)或消費數(shù)據(jù)而不必關心數(shù)據(jù)存于何處)
  • Partition
    parition是物理上的概念羹唠,每個topic包含一個或多個partition奕枢,創(chuàng)建topic時可指定parition數(shù)量。每個partition對應于一個文件夾佩微,該文件夾下存儲該partition的數(shù)據(jù)和索引文件
  • Segment
    partition物理上由多個segment組成缝彬,每一個segment 數(shù)據(jù)文件都有一個索引文件對應
  • Offset
    每個partition都由一系列有序的、不可變的消息組成哺眯,這些消息被連續(xù)的追加到partition中谷浅。partition中的每個消息都有一個連續(xù)的序列號叫做offset,用于partition唯一標識一條消息.

Push vs. Pull

push模式很難適應消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的奶卓。push模式的目標是盡可能以最快速度傳遞消息一疯,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務以及網(wǎng)絡擁塞夺姑。而pull模式則可以根據(jù)consumer的消費能力以適當?shù)乃俾氏M消息墩邀。
所以我們一般在 Kafka 前面再加一個 Log Server,可以用 LevelDB 緩存盏浙,作為一個緩沖眉睹,提高峰值處理能力

Topic & Partition

每條消費都必須指定它的topic,為了使得Kafka的吞吐率可以水平擴展废膘,物理上把topic分成一個或多個partition竹海,每個partition在物理上對應一個文件夾,該文件夾下存儲這個partition的所有消息和索引文件丐黄。

topic中partition存儲分布

假設實驗環(huán)境中Kafka集群只有一個broker,xxx/message-folder為數(shù)據(jù)文件存儲根目錄孵稽,在Kafka broker中server.properties文件配置(參數(shù)log.dirs=xxx/message-folder)许起,例如創(chuàng)建2個topic名稱分別為report_push、launch_info, partitions數(shù)量都為partitions=4
存儲路徑和目錄規(guī)則為:
xxx/message-folder

              |--report_push-0
              |--report_push-1
              |--report_push-2
              |--report_push-3
              |--launch_info-0
              |--launch_info-1
              |--launch_info-2
              |--launch_info-3

在Kafka文件存儲中菩鲜,同一個topic下有多個不同partition园细,每個partition為一個目錄,partiton命名規(guī)則為topic名稱+有序序號接校,第一個partiton序號從0開始猛频,序號最大值為partitions數(shù)量減1狮崩。如果是多broker分布情況,請參考kafka集群partition分布原理分析

partiton中文件存儲方式

下面示意圖形象說明了partition中文件存儲方式:


partition 中的 segments存儲方式

每個partion(目錄)相當于一個巨型文件被平均分配到多個大小相等segment(段)數(shù)據(jù)文件中鹿寻。但每個段segment file消息數(shù)量不一定相等睦柴,這種特性方便old segment file快速被刪除。
每個partiton只需要支持順序讀寫就行了毡熏,segment文件生命周期由服務端配置參數(shù)決定坦敌。
這樣做的好處就是能快速刪除無用文件,有效提高磁盤利用率痢法。

segment文件存儲結構

segment file由2大部分組成狱窘,分別為index file和data file,此兩個文件一一對應财搁,成對出現(xiàn)蘸炸,后綴".index"和“.log”分別表示為segment索引文件數(shù)據(jù)文件.
segment文件命名規(guī)則:

partion全局的第一個segment從0開始尖奔,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值搭儒。數(shù)值最大為64位long大小,19位數(shù)字字符長度提茁,沒有數(shù)字用0填充淹禾。

下面文件列表是筆者在Kafka broker上做的一個實驗,創(chuàng)建一個topicXXX包含1 partition(方便觀察大小變化)茴扁,設置每個segment大小為500MB,并啟動producer向Kafka broker寫入大量數(shù)據(jù),如下圖所示segment文件列表形象說明了上述2個規(guī)則:

segment 文件命名規(guī)則

以上述圖中一對segment file文件為例稀拐,說明segment中
index<—->data file 對應關系物理結構如下:
segment 的索引文件和數(shù)據(jù)文件結構及對應關系

上述圖中索引文件存儲大量元數(shù)據(jù),數(shù)據(jù)文件存儲大量消息丹弱,索引文件中元數(shù)據(jù)指向?qū)獢?shù)據(jù)文件中message的物理偏移地址德撬。其中以索引文件中元數(shù)據(jù)3,497為例,依次在數(shù)據(jù)文件中表示第3個message(在全局partiton表示第368772個message)躲胳、以及該消息的物理偏移地址為497蜓洪。
從上述圖了解到segment data file由許多message組成,下面詳細說明message物理結構如下:

segment 數(shù)據(jù)文件中每條消息的結構

在partition中如何通過offset查找message

例如讀取offset=368776的message坯苹,需要通過下面2個步驟查找:

  • 查找segment file
    上述圖2為例隆檀,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣粹湃,第三個文件00000000000000737337.index的起始偏移量為737338=737337 + 1恐仑,其他后續(xù)文件依次類推,以起始偏移量命名并排序這些文件为鳄,只要根據(jù)offset 二分查找文件列表裳仆,就可以快速定位具體文件到
    00000000000000368769.index|log
  • 通過segment file查找message
    通過第一步定位到segment file,當offset=368776時孤钦,依次定位到00000000000000368769.index的元數(shù)據(jù)物理位置(368776-368769=7)歧斟,實際上找到了 第6條消息 進而得到
    00000000000000368769.log的物理偏移地址纯丸,然后再通過 .log 文件的物理偏移地址,去 .log 文件順序讀取對應message

從上述圖可知這樣做的優(yōu)點静袖,segment index file采取稀疏索引存儲方式觉鼻,它減少索引文件大小,通過 mmap 可以直接內(nèi)存操作队橙,稀疏索引為數(shù)據(jù)文件的每個對應message設置一個元數(shù)據(jù)指針,它比稠密索引節(jié)省了更多的存儲空間坠陈,但查找起來需要消耗更多的時間

Kafka運行時很少有大量讀磁盤的操作,主要是定期批量寫磁盤操作捐康,因此操作磁盤很高效畅姊。這跟Kafka文件存儲中讀寫message的設計是息息相關的。Kafka中讀寫message有如下特點:
寫message

  • 消息從java堆轉入page cache(即物理內(nèi)存)吹由。
  • 由異步線程刷盤,消息從page cache刷入磁盤。
    讀message
  • 消息直接從page cache轉入socket發(fā)送出去朱嘴。
  • 當從page cache沒有找到相應數(shù)據(jù)時倾鲫,此時會產(chǎn)生磁盤IO,從磁盤Load消息到page cache,然后直接從socket發(fā)出去

message 被分配到 partition 的過程

每一條消息被發(fā)送到broker時,會根據(jù)paritition規(guī)則(有兩種基本的策略萍嬉,一是采用Key Hash算法乌昔,一是采用Round Robin算法)選擇被存儲到哪一個partition。如果partition規(guī)則設置的合理壤追,所有消息可以均勻分布到不同的partition里磕道,這樣就實現(xiàn)了水平擴展。(如果一個topic對應一個文件行冰,那這個文件所在的機器I/O將會成為這個topic的性能瓶頸溺蕉,而partition解決了這個問題)。在創(chuàng)建topic時可以在$KAFKA_HOME/config/server.properties中指定這個partition的數(shù)量(如下所示)悼做,當然也可以在topic創(chuàng)建之后去修改parition數(shù)量疯特。

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

在發(fā)送一條消息時,可以指定這條消息的key肛走,producer根據(jù)這個key和partition機制來判斷將這條消息發(fā)送到哪個parition漓雅。paritition機制可以通過指定producer的paritition. class這一參數(shù)來指定,該class必須實現(xiàn)kafka.producer.Partitioner
接口朽色。本例中如果key可以被解析為整數(shù)則將對應的整數(shù)與partition總數(shù)取余邻吞,該消息會被發(fā)送到該數(shù)對應的partition。(每個parition都會有個序號)

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class JasonPartitioner<T> implements Partitioner {

public JasonPartitioner(VerifiableProperties verifiableProperties) {}

@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}

如果將上例中的class作為partitioner.class葫男,并通過如下代碼發(fā)送20條消息(key分別為0抱冷,1,2梢褐,3)至topic2(包含4個partition)徘层。

public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
   List messageList = new ArrayList<KeyedMessage<String, String>>();
   for(int j = 0; j < 4; j++){
   messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
   }
   producer.send(messageList);
}
  producer.close();
}

則key相同的消息會被發(fā)送并存儲到同一個partition里峻呕,而且key的序號正好和partition序號相同。(partition序號從0開始趣效,本例中的key也正好從0開始)瘦癌。如下圖所示□尉矗  

message 刪除策略

Kafka集群會保留所有的消息讯私,無論其被消費與否。當然西傀,因為磁盤限制斤寇,不可能永久保留所有數(shù)據(jù)(實際上也沒必要),因此Kafka提供兩種策略去刪除舊數(shù)據(jù)拥褂。一是基于時間娘锁,二是基于partition文件大小。例如可以通過配置$KAFKA_HOME/config/server.properties饺鹃,讓Kafka刪除一周前的數(shù)據(jù)莫秆,也可通過配置讓Kafka在partition文件超過1GB時刪除舊數(shù)據(jù)
這里要注意,因為Kafka讀取特定消息的時間復雜度為O(1)悔详,即與文件大小無關镊屎,所以這里刪除文件與Kafka性能無關,選擇怎樣的刪除策略只與磁盤以及具體的需求有關茄螃。另外缝驳,Kafka會為每一個consumer group保留一些metadata信息–當前消費的消息的position,也即offset归苍。這個offset由consumer控制喷户。正常情況下consumer會在消費完一條消息后線性增加這個offset滤钱。當然抬虽,consumer也可將offset設成一個較小的值或南,重新消費一些消息。因為offet由consumer控制肴敛,所以Kafka broker是無狀態(tài)的署海,它不需要標記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group只有一個consumer能消費某一條消息医男,因此也就不需要鎖機制砸狞,這也為Kafka的高吞吐率提供了有力保障。

Replication

Replication與leader election配合提供了自動的failover機制镀梭。replication對Kafka的吞吐率是有一定影響的刀森,但極大的增強了可用性。默認情況下报账,Kafka的replication數(shù)量為1研底〔撼ィ  
每個partition都有一個唯一的leader,所有的讀寫操作都在leader上完成榜晦,follower批量從leader上pull數(shù)據(jù)冠蒋。一般情況下partition的數(shù)量大于等于broker的數(shù)量,并且所有partition的leader均勻分布在broker上乾胶。follower上的日志和其leader上的完全一樣抖剿。
  和大部分分布式系統(tǒng)一樣,Kakfa處理失敗需要明確定義一個broker是否alive识窿。對于Kafka而言斩郎,Kafka存活包含兩個條件,一是它必須維護與Zookeeper的session(這個通過Zookeeper的heartbeat機制來實現(xiàn))喻频。二是follower必須能夠及時將leader的writing復制過來缩宜,不能“落后太多”
  leader會track“in sync”的node list甥温。如果一個follower宕機锻煌,或者落后太多,leader將把它從”in sync” list中移除窿侈。這里所描述的“落后太多”指follower復制的消息落后于leader后的條數(shù)超過預定值,該值可在$KAFKA_HOME/config/server.properties中配置

#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead
replica.lag.max.messages=4000
#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead
replica.lag.time.max.ms=10000

從 producer 的角度, 發(fā)的數(shù)據(jù)是否會丟秋茫?

需要說明的是史简,Kafka只解決”fail/recover”,不處理“Byzantine”(“拜占庭”)問題肛著。
  一條消息只有被“in sync” list里的所有follower都從leader復制過去才會被認為已commit圆兵。這樣就避免了部分數(shù)據(jù)被寫進了leader,還沒來得及被任何follower復制就宕機了枢贿,而造成數(shù)據(jù)丟失(consumer無法消費這些數(shù)據(jù))殉农。而對于producer而言,它可以選擇是否等待消息commit局荚,這可以通過request.required.acks來設置超凳。這種機制確保了只要“in sync” list有一個或以上的flollower,一條被commit的消息就不會丟失:

  • acks = 0耀态,發(fā)就發(fā)了轮傍,不需要 ack,無論成功與否 首装;
  • acks = 1创夜,當寫 leader replica 成功后就返回,其他的 replica 都是通過fetcher去異步更新的仙逻,當然這樣會有數(shù)據(jù)丟失的風險驰吓,如果leader的數(shù)據(jù)沒有來得及同步涧尿,leader掛了,那么會丟失數(shù)據(jù)檬贰;
  • acks = –1, 要等待所有的replicas都成功后姑廉,才能返回;這種純同步寫的延遲會比較高偎蘸。

所以庄蹋,一般的情況下,thoughput 優(yōu)先迷雪,設成1限书,在極端情況下,是有可能丟失數(shù)據(jù)的章咧; 如果可以接受較長的寫延遲倦西,可以選擇將 acks 設為 –1。
  這里的復制機制即不是同步復制赁严,也不是單純的異步復制扰柠。事實上,同步復制要求“活著的”follower都復制完疼约,這條消息才會被認為commit卤档,這種復制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而異步復制方式下程剥,follower異步的從leader復制數(shù)據(jù)劝枣,數(shù)據(jù)只要被leader寫入log就被認為已經(jīng)commit,這種情況下如果follwer都落后于leader织鲸,而leader突然宕機舔腾,則會丟失數(shù)據(jù)。而Kafka的這種使用“in sync” list的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率搂擦。follower可以批量的從leader復制數(shù)據(jù)稳诚,這樣極大的提高復制性能(批量寫磁盤),極大減少了follower與leader的差距(前文有說到瀑踢,只要follower落后leader不太遠扳还,則被認為在“in sync” list里)。

Leader election

上文說明了Kafka是如何做replication的橱夭,另外一個很重要的問題是當leader宕機了普办,怎樣在follower中選舉出新的leader。因為follower可能落后許多或者crash了徘钥,所以必須確保選擇“最新”的follower作為新的leader衔蹲。一個基本的原則就是,如果leader不在了,新的leader必須擁有原來的leader commit的所有消息舆驶。這就需要作一個折衷橱健,如果leader在標明一條消息被commit前等待更多的follower確認,那在它die之后就有更多的follower可以作為新的leader沙廉,但這也會造成吞吐率的下降拘荡。  
一種非常常用的選舉leader的方式是“majority vote”(“少數(shù)服從多數(shù)”)撬陵,但Kafka并未采用這種方式珊皿。這種模式下,如果我們有2f+1個replica(包含leader和follower)巨税,那在commit之前必須保證有f+1個replica復制完消息蟋定,為了保證正確選出新的leader,fail的replica不能超過f個草添。因為在剩下的任意f+1個replica里驶兜,至少有一個replica包含有最新的所有消息。這種方式有個很大的優(yōu)勢远寸,系統(tǒng)的latency只取決于最快的幾臺server抄淑,也就是說,如果replication factor是3驰后,那latency就取決于最快的那個follower而非最慢那個肆资。majority vote也有一些劣勢,為了保證leader election的正常進行灶芝,它所能容忍的fail的follower個數(shù)比較少郑原。如果要容忍1個follower掛掉,必須要有3個以上的replica监署,如果要容忍2個follower掛掉颤专,必須要有5個以上的replica纽哥。也就是說钠乏,在生產(chǎn)環(huán)境下為了保證較高的容錯程度,必須要有大量的replica春塌,而大量的replica又會在大數(shù)據(jù)量下導致性能的急劇下降晓避。這就是這種算法更多用在Zookeeper這種共享集群配置的系統(tǒng)中而很少在需要存儲大量數(shù)據(jù)的系統(tǒng)中使用的原因。例如HDFS的HA feature是基于majority-vote-based journal只壳,但是它的數(shù)據(jù)存儲并沒有使用這種expensive的方式俏拱。實際上,leader election算法非常多吼句,比如Zookeper的Zab, RaftViewstamped Replication锅必。而Kafka所使用的leader election算法更像微軟的PacificA算法。  
Kafka在Zookeeper中動態(tài)維護了一個ISR(in-sync replicas) set搞隐,這個set里的所有replica都跟上了leader驹愚,只有ISR里的成員才有被選為leader的可能。在這種模式下劣纲,對于f+1個replica逢捺,一個Kafka topic能在保證不丟失已經(jīng)ommit的消息的前提下容忍f個replica的失敗。在大多數(shù)使用場景中癞季,這種模式是非常有利的劫瞳。事實上,為了容忍f個replica的失敗绷柒,majority vote和ISR在commit前需要等待的replica數(shù)量是一樣的志于,但是ISR需要的總的replica的個數(shù)幾乎是majority vote的一半』匝玻 
如果當前ISR中有至少一個Replica還幸存恨憎,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica(選舉算法的實現(xiàn)類似于微軟的PacificA)郊楣。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數(shù)據(jù)丟失)憔恳。如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1净蚤。
   上文說明了一個parition的replication過程钥组,然爾Kafka集群需要管理成百上千個partition,Kafka通過round-robin的方式來平衡partition從而避免大量partition集中在了少數(shù)幾個節(jié)點上今瀑。同時Kafka也需要平衡leader的分布程梦,盡可能的讓所有partition的leader均勻分布在不同broker上。另一方面橘荠,優(yōu)化leadership election的過程也是很重要的屿附,畢竟這段時間相應的partition處于不可用狀態(tài)。一種簡單的實現(xiàn)是暫停宕機的broker上的所有partition哥童,并為之選舉leader挺份。實際上,Kafka選舉一個broker作為controller贮懈,這個controller通過watch Zookeeper檢測所有的broker failure匀泊,并負責為所有受影響的parition選舉leader,再將相應的leader調(diào)整命令發(fā)送至受影響的broker朵你。
這樣做的好處是各聘,可以批量的通知leadership的變化,從而使得選舉過程成本更低抡医,尤其對大量的partition而言躲因。如果controller失敗了,幸存的所有broker都會嘗試在Zookeeper中創(chuàng)建/controller->{this broker id},如果創(chuàng)建成功(只可能有一個創(chuàng)建成功)大脉,則該broker會成為controller搁嗓,若創(chuàng)建不成功,則該broker會等待新controller的命令箱靴。

Consumer group

Ref:

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末腺逛,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子衡怀,更是在濱河造成了極大的恐慌棍矛,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抛杨,死亡現(xiàn)場離奇詭異够委,居然都是意外死亡,警方通過查閱死者的電腦和手機怖现,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門茁帽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人屈嗤,你說我怎么就攤上這事潘拨。” “怎么了饶号?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵铁追,是天一觀的道長。 經(jīng)常有香客問我茫船,道長琅束,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任算谈,我火速辦了婚禮涩禀,結果婚禮上,老公的妹妹穿的比我還像新娘然眼。我一直安慰自己艾船,他們只是感情好,可當我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布罪治。 她就那樣靜靜地躺著丽声,像睡著了一般礁蔗。 火紅的嫁衣襯著肌膚如雪觉义。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天浴井,我揣著相機與錄音晒骇,去河邊找鬼。 笑死,一個胖子當著我的面吹牛洪囤,可吹牛的內(nèi)容都是我干的徒坡。 我是一名探鬼主播,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼瘤缩,長吁一口氣:“原來是場噩夢啊……” “哼喇完!你這毒婦竟也來了?” 一聲冷哼從身側響起剥啤,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤锦溪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后府怯,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體刻诊,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年牺丙,在試婚紗的時候發(fā)現(xiàn)自己被綠了则涯。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡冲簿,死狀恐怖粟判,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情峦剔,我是刑警寧澤浮入,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站羊异,受9級特大地震影響事秀,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜野舶,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一易迹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧平道,春花似錦睹欲、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至冀墨,卻和暖如春闸衫,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背诽嘉。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工蔚出, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留弟翘,地道東北人。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓骄酗,卻偏偏與公主長得像稀余,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子趋翻,可洞房花燭夜當晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容