kafka的實現(xiàn)原理

什么是Kafka

Kafka是一款分布式消息發(fā)布和訂閱系統(tǒng)褒繁,它的特點是高性能、高吞吐量。

最早設(shè)計的目的是作為LinkedIn的活動流和運(yùn)營數(shù)據(jù)的處理管道。這些數(shù)據(jù)主要是用來對用戶做用戶畫像分析以及服務(wù)器性能數(shù)據(jù)的一些監(jiān)控啰劲。

所以kafka一開始設(shè)計的目標(biāo)就是作為一個分布式、高吞吐量的消息系統(tǒng)板丽,所以適合運(yùn)用在大數(shù)據(jù)傳輸場景翼雀。

Kafka的應(yīng)用場景

由于kafka具有更好的吞吐量兑凿、內(nèi)置分區(qū)、冗余及容錯性的優(yōu)點(kafka每秒可以處理幾十萬消息)荔仁,讓kafka成為了一個很好的大規(guī)模消息處理應(yīng)用的解決方案酥泞。所以在企業(yè)級應(yīng)用長砚殿,主要會應(yīng)用于如下幾個方面

  • 行為跟蹤:kafka可以用于跟蹤用戶瀏覽頁面、搜索及其他行為芝囤。通過發(fā)布-訂閱模式實時記錄到對應(yīng)的topic中似炎,通過后端大數(shù)據(jù)平臺接入處理分析辛萍,并做更進(jìn)一步的實時處理和監(jiān)控

  • 日志收集:日志收集方面,有很多比較優(yōu)秀的產(chǎn)品羡藐,比如Apache Flume贩毕,很多公司使用kafka代理日志聚合。日志聚合表示從服務(wù)器上收集日志文件仆嗦,然后放到一個集中的平臺(文件服務(wù)器)進(jìn)行處理辉阶。在實際應(yīng)用開發(fā)中,我們應(yīng)用程序的log都會輸出到本地的磁盤上瘩扼,排查問題的話通過linux命令來搞定谆甜,如果應(yīng)用程序組成了負(fù)載均衡集群,并且集群的機(jī)器有幾十臺以上集绰,那么想通過日志快速定位到問題规辱,就是很麻煩的事情了。所以一般都會做一個日志統(tǒng)一收集平臺管理log日志用來快速查詢重要應(yīng)用的問題栽燕。所以很多公司的套路都是把應(yīng)用日志集中到kafka上罕袋,然后分別導(dǎo)入到es和hdfs上,用來做實時檢索分析和離線統(tǒng)計數(shù)據(jù)備份等碍岔。而另一方面浴讯,kafka本身又提供了很好的api來集成日志并且做日志收集。


    image.png

Kafka的架構(gòu)

一個典型的kafka集群包含若干Producer(可以是應(yīng)用節(jié)點產(chǎn)生的消息付秕,也可以是通過Flume收集日志產(chǎn)生的事件)兰珍,若干個Broker(kafka支持水平擴(kuò)展)、若干個Consumer Group询吴,以及一個zookeeper集群掠河。kafka通過zookeeper管理集群配置及服務(wù)協(xié)同。Producer使用push模式將消息發(fā)布到broker猛计,consumer通過監(jiān)聽使用pull模式從broker訂閱并消費消息唠摹。

多個broker協(xié)同工作,producer和consumer部署在各個業(yè)務(wù)邏輯中奉瘤。三者通過zookeeper管理協(xié)調(diào)請求和轉(zhuǎn)發(fā)勾拉。這樣就組成了一個高性能的分布式消息發(fā)布和訂閱系統(tǒng)。

圖上有一個細(xì)節(jié)是和其他mq中間件不同的點盗温,producer 發(fā)送消息到broker的過程是push藕赞,而consumer從broker消費消息的過程是pull,主動去拉數(shù)據(jù)卖局。而不是broker把數(shù)據(jù)主動發(fā)送給consumer斧蜕。


image.png

名詞解釋

1)Broker
Kafka集群包含一個或多個服務(wù)器,這種服務(wù)器被稱為broker砚偶。broker端不維護(hù)數(shù)據(jù)的消費狀態(tài)批销,提升了性能洒闸。直接使用磁盤進(jìn)行存儲,線性讀寫均芽,速度快:避免了數(shù)據(jù)在JVM內(nèi)存和系統(tǒng)內(nèi)存之間的復(fù)制丘逸,減少耗性能的創(chuàng)建對象和垃圾回收。
2)Producer
負(fù)責(zé)發(fā)布消息到Kafka broker
3)Consumer
消息消費者掀宋,向Kafka broker讀取消息的客戶端深纲,consumer從broker拉取(pull)數(shù)據(jù)并進(jìn)行處理。
4)Topic
每條發(fā)布到Kafka集群的消息都有一個類別布朦,這個類別被稱為Topic囤萤。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
5)Partition
Parition是物理上的概念是趴,每個Topic包含一個或多個Partition.
6)Consumer Group
每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name涛舍,若不指定group name則屬于默認(rèn)的group)
7)Topic & Partition
Topic在邏輯上可以被認(rèn)為是一個queue,每條消費都必須指定它的Topic唆途,可以簡單理解為必須指明把這條消息放進(jìn)哪個queue里富雅。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition肛搬,每個Partition在物理上對應(yīng)一個文件夾没佑,該文件夾下存儲這個Partition的所有消息和索引文件。若創(chuàng)建topic1和topic2兩個topic温赔,且分別有13個和19個分區(qū)蛤奢,則整個集群上會相應(yīng)會生成共32個文件夾(本文所用集群共8個節(jié)點,此處topic1和topic2 replication-factor均為1)陶贼。

Java中使用kafka進(jìn)行通信

依賴

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.0.0</version>
</dependency>

發(fā)送端代碼

public class Producer extends Thread{
      private final KafkaProducer<Integer,String> producer;
      private final String topic;
      public Producer(String topic) {
            Properties properties=new Properties();
 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
            properties.put(ProducerConfig.CLIENT_ID_CONFIG,"practice-producer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        IntegerSerializer.class.getName());
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class.getName());
            producer=new KafkaProducer<Integer, String>(properties);
            this.topic = topic;
         
    }
    @Override
      public void run() {
            int num=0;
            while(num<50){
                  String msg="pratice test message:"+num;
                  try {
                        producer.send(new ProducerRecord<Integer, String>
                (topic,msg)).get();
                        TimeUnit.SECONDS.sleep(2);
                        num++;
                     
            }
            catch (InterruptedException e) {
                        e.printStackTrace();
                     
            }
            catch (ExecutionException e) {
                        e.printStackTrace();
                     
            }              
        }        
    }
      public static void main(String[] args) {
            new Producer("test").start();        
    }
}

消費端代碼

public class Consumer extends Thread{
      private final KafkaConsumer<Integer,String> consumer;
      private final String topic;
      public Consumer(String topic){
            Properties properties=new Properties();
          
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192
.168.13.103:9092,192.168.13.104:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "practice-consumer");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //設(shè)置offset自動提交
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //自動提交間隔時間
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        //對于當(dāng)前groupid來說啤贩,消息的offset從最早的消息開始消費
            consumer= new KafkaConsumer<>(properties);
            this.topic=topic;
         
    }
    @Override
      public void run() {
            while(true) {
                  consumer.subscribe(Collections.singleton(this.topic));
                  ConsumerRecords<Integer, String> records =
            consumer.poll(Duration.ofSeconds(1));
                  records.forEach(record -> {
                        System.out.println(record.key() + " " + record.value() + " ->
offset:" + record.offset());
                     
            }
            );
               
        }
         
    }
      public static void main(String[] args) {
            new Consumer("test").start();
         
    }
}

異步發(fā)送

kafka對于消息的發(fā)送,可以支持同步和異步拜秧,前面演示的案例中痹屹,我們是基于同步發(fā)送消息。同步會需要阻塞枉氮,而異步不需要等待阻塞的過程志衍。
從本質(zhì)上來說,kafka都是采用異步的方式來發(fā)送消息到broker聊替,但是kafka并不是每次發(fā)送消息都會直接發(fā)送到broker上楼肪,而是把消息放到了一個發(fā)送隊列中,然后通過一個后臺線程不斷從隊列取出消息進(jìn)行發(fā)送惹悄,發(fā)送成功后會觸發(fā)callback淹辞。kafka客戶端會積累一定量的消息統(tǒng)一組裝成一個批量消息發(fā)送出去,觸發(fā)條件是前面提到的batch.size和linger.ms俘侠。
而同步發(fā)送的方法象缀,無非就是通過future.get()來等待消息的發(fā)送返回結(jié)果,但是這種方法會嚴(yán)重影響消息發(fā)送的性能爷速。

public void run() {
        int num=0;
        while(num<50){
              String msg="pratice test message:"+num;
              try {
                    producer.send(new ProducerRecord<>(topic, msg), new Callback() {
                          @Override
                          public void onCompletion(RecordMetadata recordMetadata,
                Exception e) {
                                System.out.println("callback:
"+recordMetadata.offset()+"->"+recordMetadata.partition());
                             
                }
                       
            }
            );
                    TimeUnit.SECONDS.sleep(2);
                    num++;
                 
        }
        catch (InterruptedException e) {
                    e.printStackTrace();
                 
        }
           
    }
     
}

batch.size

生產(chǎn)者發(fā)送多個消息到broker上的同一個分區(qū)時央星,為了減少網(wǎng)絡(luò)請求帶來的性能開銷,通過批量的方式來提交消息惫东,可以通過這個參數(shù)來控制批量提交的字節(jié)數(shù)大小莉给,默認(rèn)大小是16384byte,也就是16kb,意味著當(dāng)一批消息大小達(dá)到指定的batch.size的時候會統(tǒng)一發(fā)送

linger.ms

Producer默認(rèn)會把兩次發(fā)送時間間隔內(nèi)收集到的所有Requests進(jìn)行一次聚合然后再發(fā)送廉沮,以此提高吞吐量颓遏,而linger.ms就是為每次發(fā)送到broker的請求增加一些delay,以此來聚合更多的Message請求滞时。這個有點想TCP里面的Nagle算法叁幢,在TCP協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送坪稽,采用了Nagle算法曼玩,也就是基于小包的等-停協(xié)議。
batch.size和linger.ms這兩個參數(shù)是kafka性能優(yōu)化的關(guān)鍵參數(shù)窒百,batch.size和linger.ms這兩者的作用是一樣的黍判,如果兩個都配置了,那么怎么工作的呢篙梢?實際上顷帖,當(dāng)二者都配置的時候,只要滿足其中一個要求渤滞,就會發(fā)送請求到broker上

一些基礎(chǔ)配置分析

group.id

consumer group是kafka提供的可擴(kuò)展且具有容錯性的消費者機(jī)制贬墩。既然是一個組,那么組內(nèi)必然可以有多個消費者或消費者實例(consumer instance)蔼水,它們共享一個公共的ID震糖,即group ID。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題(subscribed topics)的所有分區(qū)(partition)趴腋。當(dāng)然吊说,每個分區(qū)只能由同一個消費組內(nèi)的一個consumer來消費.如下圖所示,分別有三個消費者优炬,屬于兩個不同的group颁井,那么對于firstTopic這個topic來說,這兩個組的消費者都能同時消費這個topic中的消息蠢护,對于此時的架構(gòu)來說雅宾,這個firstTopic就類似于ActiveMQ中的topic概念。如右圖所示葵硕,如果3個消費者都屬于同一個group眉抬,那么此時firstTopic就是一個Queue的概念


image.png
image.png

enable.auto.commit

消費者消費消息以后自動提交贯吓,只有當(dāng)消息提交以后,該消息才不會被再次接收到蜀变,還可以配合auto.commit.interval.ms控制自動提交的頻率悄谐。
當(dāng)然,我們也可以通過consumer.commitSync()的方式實現(xiàn)手動提交

auto.offset.reset

這個參數(shù)是針對新的groupid中的消費者而言的库北,當(dāng)有新groupid的消費者來消費指定的topic時爬舰,對于該參數(shù)的配置,會有不同的語義寒瓦。
auto.offset.reset=latest情況下情屹,新的消費者將會從其他消費者最后消費的offset處開始消費Topic下的消息。
auto.offset.reset= earliest情況下杂腰,新的消費者會從該topic最早的消息開始消費垃你。
auto.offset.reset=none情況下,新的消費者加入以后颈墅,由于之前不存在offset蜡镶,則會直接拋出異常。

max.poll.records

此設(shè)置限制每次調(diào)用poll返回的消息數(shù)恤筛,這樣可以更容易的預(yù)測每次poll間隔要處理的最大值官还。通過調(diào)整此值,可以減少poll間隔

原理分析

從前面的整個演示過程來看毒坛,只要不是超大規(guī)模的使用kafka望伦,那么基本上沒什么大問題,否則煎殷,對于kafka本身的運(yùn)維的挑戰(zhàn)會很大屯伞,同時,針對每一個參數(shù)的調(diào)優(yōu)也顯得很重要豪直。
據(jù)我了解劣摇,快手在使用kafka集群規(guī)模是挺大的,他們在19年的開發(fā)者大會上有提到

總機(jī)器數(shù)大概2000 臺弓乙;30 多個集群末融;topic 12000 個;一共大概 20 萬 TP(topic partition)暇韧;每天總處理的消息數(shù)超過 4 萬億條勾习;峰值超過 1 億條

文章出處

關(guān)于Topic和Partition

Topic

在kafka中,topic是一個存儲消息的邏輯概念懈玻,可以認(rèn)為是一個消息集合巧婶。每條消息發(fā)送到kafka集群的消息都有一個類別。物理上來說,不同的topic的消息是分開存儲的艺栈,
每個topic可以有多個生產(chǎn)者向它發(fā)送消息英岭,也可以有多個消費者去消費其中的消息。


image.png

Partition

每個topic可以劃分多個分區(qū)(每個Topic至少有一個分區(qū))眼滤,同一topic下的不同分區(qū)包含的消息是不同的巴席。每個消息在被添加到分區(qū)時,都會被分配一個offset(稱之為偏移量)诅需,它是消息在此分區(qū)中的唯一編號,kafka通過offset保證消息在分區(qū)內(nèi)的順序荧库,offset的順序不跨分區(qū)堰塌,即kafka只保證在同一個分區(qū)內(nèi)的消息是有序的。

下圖中分衫,對于名字為test的topic场刑,做了3個分區(qū),分別是p0蚪战、p1牵现、p2.
每一條消息發(fā)送到broker時,會根據(jù)partition的規(guī)則選擇存儲到哪一個partition邀桑。如果partition規(guī)則設(shè)置合理瞎疼,那么所有的消息會均勻的分布在不同的partition中,這樣就有點類似數(shù)據(jù)庫的分庫分表的概念壁畸,把數(shù)據(jù)做了分片處理贼急。


image.png

Topic&Partition的存儲

Partition是以文件的形式存儲在文件系統(tǒng)中,比如創(chuàng)建一個名為firstTopic的topic捏萍,其中有3個partition太抓,那么在kafka的數(shù)據(jù)目錄(/tmp/kafka-log)中就有3個目錄,firstTopic-0~3令杈, 命名規(guī)則是<topic_name>-<partition_id>

sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partitions 3 --topic firstTopic

關(guān)于消息分發(fā)

kafka消息分發(fā)策略

消息是kafka中最基本的數(shù)據(jù)單元走敌,在kafka中,一條消息由key逗噩、value兩部分構(gòu)成掉丽,在發(fā)送一條消息時,我們可以指定這個key给赞,那么producer會根據(jù)key和partition機(jī)制來判斷當(dāng)前這條消息應(yīng)該發(fā)送并存儲到哪個partition中机打。我們可以根據(jù)需要進(jìn)行擴(kuò)展producer的partition機(jī)制。

自定義Partitioner

public class MyPartitioner implements Partitioner {
      private Random random = new Random();
      @Override
      public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
            //獲取集群中指定topic的所有分區(qū)信息
            List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(s);
            int numOfPartition=partitionInfos.size();
            int partitionNum=0;
            if(o==null){
                   //key沒有設(shè)置
                  partitionNum=random.nextint(numOfPartition);
                //隨機(jī)指定分區(qū)               
            } else{
                  partitionNum=Math.abs((o1.hashCode()))%numOfPartition;               
            }
            System.out.println("key->"+o+",value->"+o1+"->send to partition:"+partitionNum);
            return partitionNum;         
    }
}

發(fā)送端代碼添加自定義分區(qū)

public KafkaProducerDemo(String topic,Boolean isAysnc){
      Properties properties=new Properties();
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
          "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
      properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
      properties.put(ProducerConfig.ACKS_CONFIG,"-1");
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.IntegerSerializer");
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
     properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.wei.kafka.MyPartitioner");
      producer=new KafkaProducer<Integer, String>(properties);
      this.topic=topic;
      this.isAysnc=isAysnc;
}

消息默認(rèn)的分發(fā)機(jī)制

默認(rèn)情況下片迅,kafka采用的是hash取模的分區(qū)算法残邀。如果Key為null,則會隨機(jī)分配一個分區(qū)。這個隨機(jī)是在這個參數(shù)”metadata.max.age.ms”的時間范圍內(nèi)隨機(jī)選擇一個芥挣。對于這個時間段內(nèi)驱闷,如果key為null,則只會發(fā)送到唯一的分區(qū)空免。這個值值哦默認(rèn)情況下是10分鐘更新一次空另。

關(guān)于Metadata,這個之前沒講過蹋砚,簡單理解就是Topic/Partition和broker的映射關(guān)系扼菠,每一個topic的每一個partition,需要知道對應(yīng)的broker列表是什么坝咐,leader是誰循榆、follower是誰。這些信息都是存儲在Metadata這個類里面墨坚。

消費端如何消費指定的分區(qū)

通過下面的代碼秧饮,就可以消費指定該topic下的0號分區(qū)。其他分區(qū)的數(shù)據(jù)就無法接收

//消費指定分區(qū)的時候泽篮,不需要再訂閱
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費指定的分區(qū)
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));

消息的消費原理

在實際生產(chǎn)過程中盗尸,每個topic都會有多個partitions,多個partitions的好處在于帽撑,一方面能夠?qū)roker上的數(shù)據(jù)進(jìn)行分片有效減少了消息的容量從而提升io性能泼各。另外一方面,為了提高消費端的消費能力油狂,一般會通過多個consumer去消費同一個topic 历恐,也就是消費端的負(fù)載均衡機(jī)制,也就是我們接下來要了解的专筷,在多個partition以及多個consumer的情況下弱贼,消費者是如何消費消息的。
kafka存在consumer group的概念磷蛹,也就是group.id一樣的consumer吮旅,這些consumer屬于一個consumer group,組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題的所有分區(qū)味咳。當(dāng)然每一個分區(qū)只能由同一個消費組內(nèi)的consumer來消費庇勃,那么同一個consumergroup里面的consumer是怎么去分配該消費哪個分區(qū)里的數(shù)據(jù)的呢?如下圖所示槽驶,3個分區(qū)责嚷,3個消費者,那么哪個消費者消分哪個分區(qū)掂铐?


image.png

對于上面這個圖來說罕拂,這3個消費者會分別消費test這個topic 的3個分區(qū)揍异,也就是每個consumer消費一個partition。

  • 演示1(3個partiton對應(yīng)3個consumer)
    ? 創(chuàng)建一個帶3個分區(qū)的topic
    ? 啟動3個消費者消費同一個topic爆班,并且這3個consumer屬于同一個組
    ? 啟動發(fā)送者進(jìn)行消息發(fā)送

演示結(jié)果:consumer1會消費partition0分區(qū)衷掷、consumer2會消費partition1分區(qū)、consumer3會消費partition2分區(qū)
如果是2個consumer消費3個partition呢柿菩?會是怎么樣的結(jié)果戚嗅?

  • 演示2(3個partiton對應(yīng)2個consumer)
    ? 基于上面演示的案例的topic不變
    ? 啟動2個消費這消費該topic
    ? 啟動發(fā)送者進(jìn)行消息發(fā)送
    演示結(jié)果:consumer1會消費partition0/partition1分區(qū)、consumer2會消費partition2分區(qū)

  • 演示3(3個partition對應(yīng)4個或以上consumer)
    演示結(jié)果:仍然只有3個consumer對應(yīng)3個partition枢舶,其他的consumer無法消費消息
    通過這個演示的過程懦胞,引出接下來需要了解的kafka的分區(qū)分配策略(Partition Assignment Strategy)

consumer和partition的數(shù)量建議

  1. 如果consumer比partition多,是浪費祟辟,因為kafka的設(shè)計是在一個partition上是不允許并發(fā)的医瘫,所以consumer數(shù)不要大于partition數(shù)
  2. 如果consumer比partition少,一個consumer會對應(yīng)于多個partitions旧困,這里主要合理分配consumer數(shù)和partition數(shù),否則會導(dǎo)致partition里面的數(shù)據(jù)被取的不均勻稼锅。最好partiton數(shù)目是consumer數(shù)目的整數(shù)倍吼具,所以partition數(shù)目很重要,比如取24矩距,就很容易設(shè)定consumer數(shù)目
  3. 如果consumer從多個partition讀到數(shù)據(jù)拗盒,不保證數(shù)據(jù)間的順序性,kafka只保證在一個partition上數(shù)據(jù)是有序的锥债,但多個partition陡蝇,根據(jù)你讀的順序會有不同
  4. 增減consumer,broker哮肚,partition會導(dǎo)致rebalance登夫,所以rebalance后consumer對應(yīng)的partition會發(fā)生變化

什么是分區(qū)分配策略

通過前面的案例演示,我們應(yīng)該能猜到允趟,同一個group中的消費者對于一個topic中的多個partition恼策,存在一定的分區(qū)分配策略。
在kafka中潮剪,存在三種分區(qū)分配策略涣楷,一種是Range(默認(rèn))、 另一種是RoundRobin(輪詢)抗碰、StickyAssignor(粘性)狮斗。 在消費端中的ConsumerConfig中,通過這個屬性來指定分區(qū)分配策略

public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";

RangeAssignor(范圍分區(qū))

Range策略是對每個主題而言的弧蝇,首先對同一個主題里面的分區(qū)按照序號進(jìn)行排序碳褒,并對消費者按照字母順序進(jìn)行排序折砸。

假設(shè)n = 分區(qū)數(shù)/消費者數(shù)量
m= 分區(qū)數(shù)%消費者數(shù)量
那么前m個消費者每個分配n+l個分區(qū),后面的(消費者數(shù)量-m)個消費者每個分配n個分區(qū)

假設(shè)我們有10個分區(qū)骤视,3個消費者鞍爱,排完序的分區(qū)將會是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序?qū)荂1-0, C2-0, C3-0专酗。然后將partitions的個數(shù)除于消費者線程的總數(shù)來決定每個消費者線程消費幾個分區(qū)睹逃。如果除不盡,那么前面幾個消費者線程將會多消費一個分區(qū)祷肯。在我們的例子里面沉填,我們有10個分區(qū),3個消費者線程佑笋, 10 / 3 = 3翼闹,而且除不盡,那么消費者線程 C1-0 將會多消費一個分區(qū).
結(jié)果看起來是這樣的:
C1-0 將消費 0, 1, 2, 3 分區(qū)
C2-0 將消費 4, 5, 6 分區(qū)
C3-0 將消費 7, 8, 9 分區(qū)

假如我們有11個分區(qū)蒋纬,那么最后分區(qū)分配的結(jié)果看起來是這樣的:
C1-0 將消費 0, 1, 2, 3 分區(qū)
C2-0 將消費 4, 5, 6, 7 分區(qū)
C3-0 將消費 8, 9, 10 分區(qū)

假如我們有2個主題(T1和T2)猎荠,分別有10個分區(qū),那么最后分區(qū)分配的結(jié)果看起來是這樣的:
C1-0 將消費 T1主題的 0, 1, 2, 3 分區(qū)以及 T2主題的 0, 1, 2, 3分區(qū)
C2-0 將消費 T1主題的 4, 5, 6 分區(qū)以及 T2主題的 4, 5, 6分區(qū)
C3-0 將消費 T1主題的 7, 8, 9 分區(qū)以及 T2主題的 7, 8, 9分區(qū)

可以看出蜀备,C1-0 消費者線程比其他消費者線程多消費了2個分區(qū)关摇,這就是Range strategy的一個很明顯的弊端

RoundRobinAssignor(輪詢分區(qū))

輪詢分區(qū)策略是把所有partition和所有consumer線程都列出來,然后按照hashcode進(jìn)行排序碾阁。最后通過輪詢算法分配partition給消費線程输虱。如果所有consumer實例的訂閱是相同的,那么partition會均勻分布脂凶。

在我們的例子里面宪睹,假如按照 hashCode 排序完的topic-partitions組依次為T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我們的消費者線程排序為C1-0, C1-1, C2-0, C2-1蚕钦,最后分區(qū)分配的結(jié)果為:
C1-0 將消費 T1-5, T1-2, T1-6 分區(qū)亭病;
C1-1 將消費 T1-3, T1-1, T1-9 分區(qū);
C2-0 將消費 T1-0, T1-4 分區(qū)冠桃;
C2-1 將消費 T1-8, T1-7 分區(qū)命贴;

使用輪詢分區(qū)策略必須滿足兩個條件:

  1. 每個主題的消費者實例具有相同數(shù)量的流
  2. 每個消費者訂閱的主題必須是相同的

StrickyAssignor 分配策略

kafka在0.11.x版本支持了StrickyAssignor, 翻譯過來叫粘滯策略,它主要有兩個目的:

  • 分區(qū)的分配盡可能的均勻
  • 分區(qū)的分配盡可能和上次分配保持相同

當(dāng)兩者發(fā)生沖突時食听, 第 一 個目標(biāo)優(yōu)先于第二個目標(biāo)胸蛛。 鑒于這兩個目標(biāo)狗准, StickyAssignor分配策略的具體實現(xiàn)要比RangeAssignor和RoundRobinAssi gn or這兩種分配策略要復(fù)雜得多蔼囊,假設(shè)我們有這樣一個場景:

假設(shè)消費組有3個消費者:C0,C1,C2放接,它們分別訂閱了4個Topic(t0,t1,t2,t3),并且每個主題有兩個分區(qū)(p0,p1),也就是說冈欢,整個消費組訂閱了8個分區(qū):tOpO 椿胯、 tOpl 迈喉、 tlpO 仑扑、 tlpl 放航、 t2p0 、t2pl 嚷量、t3p0 陋桂、 t3pl
那么最終的分配場景結(jié)果為
CO: tOpO、tlpl 蝶溶、 t3p0
Cl: tOpl嗜历、t2p0 、 t3pl
C2: tlpO抖所、t2pl
這種分配方式有點類似于輪詢策略梨州,但實際上并不是,因為假設(shè)這個時候田轧,C1這個消費者掛了暴匠,就勢必會造成重新分區(qū)(reblance),如果是輪詢傻粘,那么結(jié)果應(yīng)該是
CO: tOpO每窖、tlpO、t2p0弦悉、t3p0
C2: tOpl岛请、tlpl、t2pl警绩、t3pl
然后,strickyAssignor它是一種粘滯策略盅称,所以它會滿足`分區(qū)的分配盡可能和上次分配保持相同肩祥,所以分配結(jié)果應(yīng)該是
消費者CO: tOpO、tlpl 缩膝、 t3p0混狠、t2p0
消費者C2: tlpO、t2pl疾层、tOpl将饺、t3pl
也就是說,C0和C2保留了上一次是的分配結(jié)果痛黎,并且把原來C1的分區(qū)分配給了C0和C2予弧。 這種策略的好處是使得分區(qū)發(fā)生變化時,由于分區(qū)的“粘性湖饱,減少了不必要的分區(qū)移動

誰來執(zhí)行Rebalance以及管理consumer的group呢掖蛤?

Kafka提供了一個角色:coordinator來執(zhí)行對于consumer group的管理,Kafka提供了一個角色:coordinator來執(zhí)行對于consumer group的管理井厌,當(dāng)consumer group的第一個consumer啟動的時候蚓庭,它會去和kafka server確定誰是它們組的coordinator致讥。之后該group內(nèi)的所有成員都會和該coordinator進(jìn)行協(xié)調(diào)通信

如何確定coordinator

consumer group如何確定自己的coordinator是誰呢, 消費者向kafka集群中的任意一個broker發(fā)送一個GroupCoordinatorRequest請求,服務(wù)端會返回一個負(fù)載最小的broker節(jié)點的id器赞,并將該broker設(shè)置為coordinator

JoinGroup的過程

在rebalance之前垢袱,需要保證coordinator是已經(jīng)確定好了的,整個rebalance的過程分為兩個步驟港柜,Join和Sync

join: 表示加入到consumer group中请契,在這一步中,所有的成員都會向coordinator發(fā)送joinGroup的請求潘懊。一旦所有成員都發(fā)送了joinGroup請求姚糊,那么coordinator會選擇一個consumer擔(dān)任leader角色,并把組成員信息和訂閱信息發(fā)送消費者
leader選舉算法比較簡單授舟,如果消費組內(nèi)沒有l(wèi)eader救恨,那么第一個加入消費組的消費者就是消費者leader,如果這個時候leader消費者退出了消費組释树,那么重新選舉一個leader肠槽,這個選舉很隨意,類似于隨機(jī)算法


image.png

protocol_metadata: 序列化后的消費者的訂閱信息
leader_id: 消費組中的消費者奢啥,coordinator會選擇一個座位leader秸仙,對應(yīng)的就是member_id
member_metadata 對應(yīng)消費者的訂閱信息
members:consumer group中全部的消費者的訂閱信息
generation_id: 年代信息,類似于之前講解zookeeper的時候的epoch是一樣的桩盲,對于每一輪rebalance寂纪,generation_id都會遞增。主要用來保護(hù)consumer group赌结。隔離無效的offset提交捞蛋。也就是上一輪的consumer成員無法提交offset到新的consumer group中。

每個消費者都可以設(shè)置自己的分區(qū)分配策略柬姚,對于消費組而言拟杉,會從各個消費者上報過來的分區(qū)分配策略中選舉一個彼此都贊同的策略來實現(xiàn)整體的分區(qū)分配,這個"贊同"的規(guī)則是量承,消費組內(nèi)的各個消費者會通過投票來決定

  • 在joingroup階段搬设,每個consumer都會把自己支持的分區(qū)分配策略發(fā)送到coordinator
  • coordinator手機(jī)到所有消費者的分配策略,組成一個候選集
  • 每個消費者需要從候選集里找出一個自己支持的策略撕捍,并且為這個策略投票
  • 最終計算候選集中各個策略的選票數(shù)拿穴,票數(shù)最多的就是當(dāng)前消費組的分配策略

Synchronizing Group State階段

完成分區(qū)分配之后,就進(jìn)入了Synchronizing Group State階段卦洽,主要邏輯是向GroupCoordinator發(fā)送SyncGroupRequest請求贞言,并且處理SyncGroupResponse響應(yīng),簡單來說阀蒂,就是leader將消費者對應(yīng)的partition分配方案同步給consumer group 中的所有consumer


image.png

每個消費者都會向coordinator發(fā)送syncgroup請求该窗,不過只有l(wèi)eader節(jié)點會發(fā)送分配方案弟蚀,其他消費者只是打打醬油而已。當(dāng)leader把方案發(fā)給coordinator以后酗失,coordinator會把結(jié)果設(shè)置到SyncGroupResponse中义钉。這樣所有成員都知道自己應(yīng)該消費哪個分區(qū)。

consumer group的分區(qū)分配方案是在客戶端執(zhí)行的规肴!Kafka將這個權(quán)利下放給客戶端主要是因為這樣做可以有更好的靈活性

總結(jié)

我們再來總結(jié)一下consumer group rebalance的過程
? 對于每個consumer group子集捶闸,都會在服務(wù)端對應(yīng)一個GroupCoordinator進(jìn)行管理,GroupCoordinator會在zookeeper上添加watcher拖刃,當(dāng)消費者加入或者退出consumer group時删壮,會修改zookeeper上保存的數(shù)據(jù),從而觸發(fā)GroupCoordinator開始Rebalance操作
? 當(dāng)消費者準(zhǔn)備加入某個Consumer group或者GroupCoordinator發(fā)生故障轉(zhuǎn)移時兑牡,消費者并不知道GroupCoordinator的在網(wǎng)絡(luò)中的位置央碟,這個時候就需要確定GroupCoordinator,消費者會向集群中的任意一個Broker節(jié)點發(fā)送ConsumerMetadataRequest請求均函,收到請求的broker會返回一個response作為響應(yīng)亿虽,其中包含管理當(dāng)前ConsumerGroup的GroupCoordinator,
? 消費者會根據(jù)broker的返回信息苞也,連接到groupCoordinator洛勉,并且發(fā)送HeartbeatRequest,發(fā)送心跳的目的是要要奧噶蘇GroupCoordinator這個消費者是正常在線的如迟。當(dāng)消費者在指定時間內(nèi)沒有發(fā)送心跳請求收毫,則GroupCoordinator會觸發(fā)Rebalance操作。

? 發(fā)起join group請求殷勘,兩種情況

  • 如果GroupCoordinator返回的心跳包數(shù)據(jù)包含異常牛哺,說明GroupCoordinator因為前面說的幾種情況導(dǎo)致了Rebalance操作,那這個時候劳吠,consumer會發(fā)起join group請求
  • 新加入到consumer group的consumer確定好了GroupCoordinator以后消費者會向GroupCoordinator發(fā)起join group請求,GroupCoordinator會收集全部消費者信息之后巩趁,來確認(rèn)可用的消費者痒玩,并從中選取一個消費者成為group_leader。并把相應(yīng)的信息(分區(qū)分配策略议慰、leader_id蠢古、…)封裝成response返回給所有消費者,但是只有g(shù)roup leader會收到當(dāng)前consumer group中的所有消費者信息别凹。當(dāng)消費者確定自己是group leader以后草讶,會根據(jù)消費者的信息以及選定分區(qū)分配策略進(jìn)行分區(qū)分配
  • 接著進(jìn)入Synchronizing Group State階段,每個消費者會發(fā)送SyncGroupRequest請求到GroupCoordinator炉菲,但是只有Group Leader的請求會存在分區(qū)分配結(jié)果堕战,GroupCoordinator會根據(jù)Group Leader的分區(qū)分配結(jié)果形成SyncGroupResponse返回給所有的Consumer坤溃。
  • consumer根據(jù)分配結(jié)果,執(zhí)行相應(yīng)的操作

到這里為止嘱丢,我們已經(jīng)知道了消息的發(fā)送分區(qū)策略薪介,以及消費者的分區(qū)消費策略和rebalance。對于應(yīng)用層面來說越驻,還有一個最重要的東西沒有講解汁政,就是offset,他類似一個游標(biāo)缀旁,表示當(dāng)前消費的消息的位置记劈。

如何保存消費端的消費位置

什么是offset

前面在講解partition的時候,提到過offset并巍, 每個topic可以劃分多個分區(qū)(每個Topic至少有一個分區(qū))目木,同一topic下的不同分區(qū)包含的消息是不同的。每個消息在被添加到分區(qū)時履澳,都會被分配一個offset(稱之為偏移量)嘶窄,它是消息在此分區(qū)中的唯一編號,kafka通過offset保證消息在分區(qū)內(nèi)的順序距贷,offset的順序不跨分區(qū)柄冲,即kafka只保證在同一個分區(qū)內(nèi)的消息是有序的; 對于應(yīng)用層的消費來說忠蝗,每次消費一個消息并且提交以后现横,會保存當(dāng)前消費到的最近的一個offset。那么offset保存在哪里阁最?

image.png

offset在哪里維護(hù)戒祠?

在kafka中,提供了一個consumer_offsets_* 的一個topic速种,把offset信息寫入到這個topic中姜盈。
consumer_offsets——按保存了每個consumer group某一時刻提交的offset信息。
__consumer_offsets 默認(rèn)有50個分區(qū)配阵。
根據(jù)前面我們演示的案例馏颂,我們設(shè)置了一個KafkaConsumerDemo的groupid。首先我們需要找到這個consumer_group保存在哪個分區(qū)中
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
計算公式:
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由于默認(rèn)情況下groupMetadataTopicPartitionCount有50個分區(qū)棋傍,計算得到的結(jié)果為:35, 意味著當(dāng)前的consumer_group的位移信息保存在__consumer_offsets的第35個分區(qū)
執(zhí)行如下命令救拉,可以查看當(dāng)前consumer_goup中的offset位移提交的信息

kafka-console-consumer.sh --topic __consumer_offsets --partition 15 --bootstrap-server 192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092
--formatter
'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

從輸出結(jié)果中,我們就可以看到test這個topic的offset的位移日志

分區(qū)的副本機(jī)制

我們已經(jīng)知道Kafka的每個topic都可以分為多個Partition瘫拣,并且多個partition會均勻分布在集群的各個節(jié)點下亿絮。雖然這種方式能夠有效的對數(shù)據(jù)進(jìn)行分片,但是對于每個partition來說,都是單點的派昧,當(dāng)其中一個partition不可用的時候黔姜,那么這部分消息就沒辦法消費。所以kafka為了提高partition的可靠性而提供了副本的概念(Replica),通過副本機(jī)制來實現(xiàn)冗余備份斗锭。

每個分區(qū)可以有多個副本地淀,并且在副本集合中會存在一個leader的副本,所有的讀寫請求都是由leader副本來進(jìn)行處理岖是。剩余的其他副本都做為follower副本帮毁,follower副本會從leader副本同步消息日志。
這個有點類似zookeeper中l(wèi)eader和follower的概念豺撑,但是具體的時間方式還是有比較大的差異烈疚。所以我們可以認(rèn)為,副本集會存在一主多從的關(guān)系聪轿。

一般情況下爷肝,同一個分區(qū)的多個副本會被均勻分配到集群中的不同broker上,當(dāng)leader副本所在的broker出現(xiàn)故障后陆错,可以重新選舉新的leader副本繼續(xù)對外提供服務(wù)灯抛。通過這樣的副本機(jī)制來提高kafka集群的可用性。

創(chuàng)建一個帶副本機(jī)制的topic

通過下面的命令去創(chuàng)建帶2個副本的topic

sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic

然后我們可以在/tmp/kafka-log路徑下看到對應(yīng)topic的副本信息了音瓷。我們通過一個圖形的方式來表達(dá)对嚼。
針對secondTopic這個topic的3個分區(qū)對應(yīng)的3個副本


image.png

如何知道那個各個分區(qū)中對應(yīng)的leader是誰呢?

在zookeeper服務(wù)器上绳慎,通過如下命令去獲取對應(yīng)分區(qū)的信息, 比如下面這個是獲取secondTopic第1個
分區(qū)的狀態(tài)信息纵竖。

get /brokers/topics/secondTopic/partitions/1/state

{"controller_epoch":12,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1]}
或通過這個命令

sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test_partition

leader表示當(dāng)前分區(qū)的leader是那個broker-id。下圖中杏愤。綠色線條的表示該分區(qū)中的leader節(jié)點靡砌。其他節(jié)點就為follower


image.png

需要注意的是,kafka集群中的一個broker中最多只能有一個副本珊楼,leader副本所在的broker節(jié)點的分區(qū)叫l(wèi)eader節(jié)點通殃,follower副本所在的broker節(jié)點的分區(qū)叫follower節(jié)點

副本的leader選舉

Kafka提供了數(shù)據(jù)復(fù)制算法保證,如果leader副本所在的broker節(jié)點宕機(jī)或者出現(xiàn)故障厕宗,或者分區(qū)的leader節(jié)點發(fā)生故障邓了,這個時候怎么處理呢?
那么媳瞪,kafka必須要保證從follower副本中選擇一個新的leader副本。那么kafka是如何實現(xiàn)選舉的呢照宝?
要了解leader選舉蛇受,我們需要了解幾個概念
Kafka分區(qū)下有可能有很多個副本(replica)用于實現(xiàn)冗余,從而進(jìn)一步實現(xiàn)高可用厕鹃。副本根據(jù)角色的不同可分為3類:

  • leader副本:響應(yīng)clients端讀寫請求的副本
  • follower副本:被動地備份leader副本中的數(shù)據(jù)兢仰,不能響應(yīng)clients端讀寫請求乍丈。
  • ISR副本:包含了leader副本和所有與leader副本保持同步的follower副本——如何判定是否與leader同步后面會提到每個Kafka副本對象都有兩個重要的屬性:LEO和HW。注意是所有的副本把将,而不只是leader副本轻专。
  • LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值察蹲。注意是下一條消息请垛!也就是說,如果LEO=10洽议,那么表示該副本保存了10條消息宗收,位移值范圍是[0, 9]。另外亚兄,leader LEO和follower LEO的更新是有區(qū)別的混稽。我們后面會詳細(xì)說
  • HW:即上面提到的水位值。對于同一個副本對象而言审胚,其HW值不會大于LEO值匈勋。小于等于HW值的所有消息都被認(rèn)為是“已備份”的(replicated)。同理膳叨,leader副本和follower副本的HW更新是有區(qū)別的
    從生產(chǎn)者發(fā)出的 一 條消息首先會被寫入分區(qū)的leader 副本洽洁,不過還需要等待ISR集合中的所有follower副本都同步完之后才能被認(rèn)為已經(jīng)提交,之后才會更新分區(qū)的HW, 進(jìn)而消費者可以消費到這條消息懒鉴。

副本協(xié)同機(jī)制

剛剛提到了诡挂,消息的讀寫操作都只會由leader節(jié)點來接收和處理。follower副本只負(fù)責(zé)同步數(shù)據(jù)以及當(dāng)leader副本所在的broker掛了以后临谱,會從follower副本中選取新的leader璃俗。

寫請求首先由Leader副本處理,之后follower副本會從leader上拉取寫入的消息悉默,這個過程會有一定的延遲城豁,導(dǎo)致follower副本中保存的消息略少于leader副本,但是只要沒有超出閾值都可以容忍抄课。但是如果一個follower副本出現(xiàn)異常唱星,比如宕機(jī)、網(wǎng)絡(luò)斷開等原因長時間沒有同步到消息跟磨,那這個時候间聊,leader就會把它踢出去。kafka通過ISR集合來維護(hù)一個分區(qū)副本信息


image.png

一個新leader被選舉并被接受客戶端的消息成功寫入抵拘。Kafka確保從同步副本列表中選舉一個副本為leader哎榴;leader負(fù)責(zé)維護(hù)和跟蹤ISR(in-Sync replicas , 副本同步隊列)中所有follower滯后的狀態(tài)。當(dāng)producer發(fā)送一條消息到broker后尚蝌,leader寫入消息并復(fù)制到所有follower迎变。消息提交之后才被成功復(fù)制到所有的同步副本。

ISR

ISR表示目前“可用且消息量與leader相差不多的副本集合飘言,這是整個副本集合的一個子集”衣形。怎么去理解可用和相差不多這兩個詞呢?具體來說姿鸿,ISR集合中的副本必須滿足兩個條件:

  1. 副本所在節(jié)點必須維持著與zookeeper的連接
  2. 副本最后一條消息的offset與leader副本的最后一條消息的offset之間的差值不能超過指定的閾值
    (replica.lag.time.max.ms) replica.lag.time.max.ms:如果該follower在此時間間隔內(nèi)一直沒有追上過leader的所有消息谆吴,則該follower就會被剔除isr列表
  3. ISR數(shù)據(jù)保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state 節(jié)點中

follower副本把leader副本LEO之前的日志全部同步完成時,則認(rèn)為follower副本已經(jīng)追趕上了leader副本般妙,這個時候會更新這個副本的lastCaughtUpTimeMs標(biāo)識纪铺,kafk副本管理器會啟動一個副本過期檢查的定時任務(wù),這個任務(wù)會定期檢查當(dāng)前時間與副本的lastCaughtUpTimeMs的差值是否大于參數(shù)replica.lag.time.max.ms 的值碟渺,如果大于鲜锚,則會把這個副本踢出ISR集合

image.png

如何處理所有的Replica不工作的情況

在ISR中至少有一個follower時,Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失苫拍,但如果某個Partition的所有Replica都宕機(jī)了芜繁,就無法保證數(shù)據(jù)不丟失了

  1. 等待ISR中的任一個Replica“活”過來,并且選它作為Leader
  2. 選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader

這就需要在可用性和一致性當(dāng)中作出一個簡單的折衷绒极。
如果一定要等待ISR中的Replica“活”過來骏令,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了垄提,或者數(shù)據(jù)都丟失了榔袋,這個Partition將永遠(yuǎn)不可用。
選擇第一個“活”過來的Replica作為Leader铡俐,而這個Replica不是ISR中的Replica凰兑,那即使它并不保證已經(jīng)包含了所有已commit的消息,它也會成為Leader而作為consumer的數(shù)據(jù)源(所有讀寫都由Leader完成)审丘。在我們課堂講的版本中吏够,使用的是第一種策略。

副本數(shù)據(jù)同步原理

了解了副本的協(xié)同過程以后滩报,還有一個最重要的機(jī)制锅知,就是數(shù)據(jù)的同步過程。它需要解決

  1. 怎么傳播消息
  2. 在向消息發(fā)送端返回ack之前需要保證多少個Replica已經(jīng)接收到這個消息

下圖中脓钾,深紅色部分表示test_replica分區(qū)的leader副本售睹,另外兩個節(jié)點上淺色部分表示follower副本


image.png

Producer在發(fā)布消息到某個Partition時,

  • 先通過ZooKeeper找到該P(yáng)artition的Leader get /brokers/topics/<topic>/partitions/2/state 可训,然后無論該Topic的Replication Factor為多少(也即該P(yáng)artition有多少個Replica)昌妹,Producer只將該消息發(fā)送到該P(yáng)artition的Leader生真。
  • Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數(shù)據(jù)捺宗。這種方式上,F(xiàn)ollower存儲的數(shù)據(jù)順序與Leader保持一致川蒙。
  • Follower在收到該消息并寫入其Log后蚜厉,向Leader發(fā)送ACK。
  • 一旦Leader收到了ISR中的所有Replica的ACK畜眨,該消息就被認(rèn)為已經(jīng)commit了昼牛,Leader將增加HW(HighWatermark)并且向Producer發(fā)送ACK。

LEO:即日志末端位移(log end offset)康聂,記錄了該副本底層日志(log)中下一條消息的位移值贰健。注意是下一條消息!也就是說恬汁,如果LEO=10伶椿,那么表示該副本保存了10條消息,位移值范圍是[0, 9]氓侧。另外脊另,leader LEO和follower LEO的更新是有區(qū)別的。

HW:即上面提到的水位值(Hight Water)约巷。對于同一個副本對象而言偎痛,其HW值不會大于LEO值。小于等于HW值的所有消息都被認(rèn)為是“已備份”的(replicated)独郎。同理踩麦,leader副本和follower副本的HW更新是有區(qū)別的

通過下面這幅圖來表達(dá)LEO、HW的含義氓癌,隨著follower副本不斷和leader副本進(jìn)行數(shù)據(jù)同步谓谦,follower副本的LEO會主鍵后移并且追趕到leader副本,這個追趕上的判斷標(biāo)準(zhǔn)是當(dāng)前副本的LEO是否大于或者等于leader副本的HW顽铸,這個追趕上也會使得被踢出的follower副本重新加入到ISR集合中茁计。
另外, 假如說下圖中的最右側(cè)的follower副本被踢出ISR集合谓松,也會導(dǎo)致這個分區(qū)的HW發(fā)生變化星压,變成了3


image.png

數(shù)據(jù)丟失的問題

表達(dá)的含義是,至少需要多少個副本同步才能表示消息是提交的鬼譬, 所以娜膘,當(dāng) min.insync.replicas=1的時候,一旦消息被寫入leader端log即被認(rèn)為是“已提交”优质,而延遲一輪FETCH RPC更新HW值的設(shè)計使得follower HW值是異步延遲更新的竣贪,倘若在這個過程中l(wèi)eader發(fā)生變更军洼,那么成為新leader的follower的HW值就有可能是過期的,使得clients端認(rèn)為是成功提交的消息被刪除演怎。


image.png

acks配置表示producer發(fā)送消息到broker上以后的確認(rèn)值匕争。有三個可選項
0:表示producer不需要等待broker的消息確認(rèn)。這個選項時延最小但同時風(fēng)險最大(因為當(dāng)server宕機(jī)時爷耀,數(shù)據(jù)將會丟失)甘桑。
1:表示producer只需要獲得kafka集群中的leader節(jié)點確認(rèn)即可,這個選擇時延較小同時確保了leader節(jié)點確認(rèn)接收成功歹叮。
all(-1):需要ISR中所有的Replica給予接收確認(rèn)跑杭,速度最慢,安全性最高咆耿,但是由于ISR可能會縮小到僅包含一個Replica德谅,所以設(shè)置參數(shù)為all并不能一定避免數(shù)據(jù)丟失,

數(shù)據(jù)丟失的解決方案

在kafka0.11.0.0版本之后萨螺,引入了一個leader epoch來解決這個問題窄做,所謂的leader epoch實際上是一對值(epoch,offset)屑迂,epoch代表leader的版本號浸策,從0開始遞增,當(dāng)leader發(fā)生過變更惹盼,epoch就+1庸汗,而offset則是對應(yīng)這個epoch版本的leader寫入第一條消息的offset,比如
(0,0), (1,50) ,表示第一個leader從offset=0開始寫消息手报,一共寫了50條蚯舱。第二個leader版本號是1,從offset=50開始寫掩蛤,這個信息會持久化在對應(yīng)的分區(qū)的本地磁盤上枉昏,文件名是 /tmp/kafka-log/topic/leader-epoch-checkpoint 。
leader broker中會保存這樣一個緩存揍鸟,并且定期寫入到checkpoint文件中
當(dāng)leader寫log時它會嘗試更新整個緩存: 如果這個leader首次寫消息兄裂,則會在緩存中增加一個條目;否則就不做更新阳藻。而每次副本重新成為leader時會查詢這部分緩存晰奖,獲取出對應(yīng)leader版本的offset

我們基于同樣的情況來分析,follower宕機(jī)并且恢復(fù)之后腥泥,有兩種情況匾南,如果這個時候leader副本沒有掛,也就是意味著沒有發(fā)生leader選舉蛔外,那么follower恢復(fù)之后并不會去截斷自己的日志蛆楞,而是先發(fā)送一個OffsetsForLeaderEpochRequest請求給到leader副本溯乒,leader副本收到請求之后返回當(dāng)前的LEO。
如果follower副本的leaderEpoch和leader副本的epoch相同豹爹, leader的leo只可能大于或者等于follower副本的leo值裆悄,所以這個時候不會發(fā)生截斷
如果follower副本和leader副本的epoch值不同,那么leader副本會查找follower副本傳過來的epoch+1在本地文件中存儲的StartOffset返回給follower副本臂聋,也就是新leader副本的LEO灯帮。這樣也避免了數(shù)據(jù)丟失的問題
如果leader副本宕機(jī)了重新選舉新的leader,那么原本的follower副本就會變成leader,意味著epoch從0變成1,使得原本follower副本中LEO的值的到了保留唠叛。

Leader副本的選舉過程

  1. KafkaController會監(jiān)聽ZooKeeper的/brokers/ids節(jié)點路徑值朋,一旦發(fā)現(xiàn)有broker掛了,執(zhí)行下面的邏輯吁恍。這里暫時先不考慮KafkaController所在broker掛了的情況扒秸,KafkaController掛了,各個broker會重新leader選舉出新的KafkaController
  2. leader副本在該broker上的分區(qū)就要重新進(jìn)行l(wèi)eader選舉冀瓦,目前的選舉策略是
    a) 優(yōu)先從isr列表中選出第一個作為leader副本伴奥,這個叫優(yōu)先副本,理想情況下有限副本就是該分區(qū)的leader副本
    b) 如果isr列表為空翼闽,則查看該topic的unclean.leader.election.enable配置拾徙。
    unclean.leader.election.enable:為true則代表允許選用非isr列表的副本作為leader,那么此時就意味著數(shù)據(jù)可能丟失感局,為false的話尼啡,則表示不允許,直接拋出NoReplicaOnlineException異常询微,造成leader副本選舉失敗崖瞭。
    c) 如果上述配置為true,則從其他副本中選出一個作為leader副本撑毛,并且isr列表只包含該leader副本书聚。一旦選舉成功,則將選舉后的leader和isr和其他副本信息寫入到該分區(qū)的對應(yīng)的zk路徑上藻雌。

消息的存儲

消息發(fā)送端發(fā)送消息到broker上以后雌续,消息是如何持久化的呢?那么接下來去分析下消息的存儲首先我們需要了解的是蹦疑,kafka是使用日志文件的方式來保存生產(chǎn)者和發(fā)送者的消息西雀,每條消息都有一個offset值來表示它在分區(qū)中的偏移量。Kafka中存儲的一般都是海量的消息數(shù)據(jù)歉摧,為了避免日志文件過大艇肴,Log并不是直接對應(yīng)在一個磁盤上的日志文件腔呜,而是對應(yīng)磁盤上的一個目錄,這個目錄的命名規(guī)則是<topic_name>_<partition_id>

消息的文件存儲機(jī)制

一個topic的多個partition在物理磁盤上的保存路徑再悼,路徑保存在 /tmp/kafka-logs/topic_partition核畴,包含日志文件、索引文件和時間索引文件

image.png

kafka是通過分段的方式將Log分為多個LogSegment冲九,LogSegment是一個邏輯上的概念谤草,一個LogSegment對應(yīng)磁盤上的一個日志文件和一個索引文件,其中日志文件是用來記錄消息的莺奸。索引文件是用來保存消息的索引丑孩。那么這個LogSegment是什么呢?

LogSegment

假設(shè)kafka以partition為最小存儲單位灭贷,那么我們可以想象當(dāng)kafka producer不斷發(fā)送消息温学,必然會引起partition文件的無線擴(kuò)張,這樣對于消息文件的維護(hù)以及被消費的消息的清理帶來非常大的挑戰(zhàn)甚疟,所以kafka 以segment為單位又把partition進(jìn)行細(xì)分仗岖。每個partition相當(dāng)于一個巨型文件被平均分配到多個大小相等的segment數(shù)據(jù)文件中(每個segment文件中的消息不一定相等),這種特性方便已經(jīng)被消費的消息的清理览妖,提高磁盤的利用率轧拄。

  • log.segment.bytes=107370 (設(shè)置分段大小),默認(rèn)是1gb,我們把這個值調(diào)小以后讽膏,可以看到日志分段的效果
  • 抽取其中3個分段來進(jìn)行分析


    image.png

    segment file由2大部分組成檩电,分別為index file和data file,此2個文件一一對應(yīng)府树,成對出現(xiàn)是嗜,后綴".index"和“.log”分別表示為segment索引文件、數(shù)據(jù)文件.
    segment文件命名規(guī)則:partion全局的第一個segment從0開始挺尾,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值進(jìn)行遞增鹅搪。數(shù)值最大為64位long大小,20位數(shù)字字符長度遭铺,沒有數(shù)字用0填充

查看segment文件命名規(guī)則

通過下面這條命令可以看到kafka消息日志的內(nèi)容

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log

假如第一個log文件的最后一個offset為:5376,所以下一個segment的文件命名為:
00000000000000005376.log丽柿。對應(yīng)的index為00000000000000005376.index

segment中index和log的對應(yīng)關(guān)系

從所有分段中,找一個分段進(jìn)行分析
為了提高查找消息的性能魂挂,為每一個日志文件添加2個索引索引文件:OffsetIndex 和 TimeIndex甫题,分別對應(yīng).index以及.timeindex, TimeIndex索引文件格式:它是映射時間戳和相對offset
查看索引內(nèi)容:

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
image.png

如圖所示,index中存儲了索引以及物理偏移量涂召。 log存儲了消息的內(nèi)容坠非。索引文件的元數(shù)據(jù)執(zhí)行對應(yīng)數(shù)據(jù)文件中message的物理偏移地址。舉個簡單的案例來說果正,以[4053,80899]為例炎码,在log文件中盟迟,對應(yīng)的是第4053條記錄,物理偏移量(position)為80899. position是ByteBuffer的指針位置

在partition中如何通過offset查找message

查找的算法是

  1. 根據(jù)offset的值潦闲,查找segment段中的index索引文件攒菠。由于索引文件命名是以上一個文件的最后一個offset進(jìn)行命名的,所以歉闰,使用二分查找算法能夠根據(jù)offset快速定位到指定的索引文件辖众。
  2. 找到索引文件后,根據(jù)offset進(jìn)行定位和敬,找到索引文件中的符合范圍的索引凹炸。(kafka采用稀疏索引的方式來提高查找性能)
  3. 得到position以后,再到對應(yīng)的log文件中昼弟,從position出開始查找offset對應(yīng)的消息还惠,將每條消息的offset與目標(biāo)offset進(jìn)行比較,直到找到消息

比如說私杜,我們要查找offset=2490這條消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]這個索引救欧,再到log文件中衰粹,根據(jù)49111這個position開始查找,比較每條消息的offset是否大于等于2490笆怠。最后查找到對應(yīng)的消息以后返回

Log文件的消息內(nèi)容分析

前面我們通過kafka提供的命令铝耻,可以查看二進(jìn)制的日志文件信息,一條消息蹬刷,會包含很多的字段瓢捉。

offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize:
-1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1
sequence: -1 isTransactional: false headerKeys: [] payload: message_5371

offset和position這兩個前面已經(jīng)講過了、 createTime表示創(chuàng)建時間办成、keysize和valuesize表示key和value的大小泡态、 compresscodec表示壓縮編碼、payload:表示消息的具體內(nèi)容

日志的清除策略以及壓縮策略

日志清除策略

前面提到過迂卢,日志的分段存儲某弦,一方面能夠減少單個文件內(nèi)容的大小,另一方面而克,方便kafka進(jìn)行日志清理靶壮。日志的清理策略有兩個:

  1. 根據(jù)消息的保留時間,當(dāng)消息在kafka中保存的時間超過了指定的時間员萍,就會觸發(fā)清理過程
  2. 根據(jù)topic存儲的數(shù)據(jù)大小腾降,當(dāng)topic所占的日志文件大小大于一定的閥值,則可以開始刪除最舊的消息碎绎。kafka會啟動一個后臺線程螃壤,定期檢查是否存在可以刪除的消息

通過log.retention.bytes和log.retention.hours這兩個參數(shù)來設(shè)置抗果,當(dāng)其中任意一個達(dá)到要求,都會執(zhí)行刪除映穗。
默認(rèn)的保留時間是:7天

日志壓縮策略

Kafka還提供了“日志壓縮(Log Compaction)”功能窖张,通過這個功能可以有效的減少日志文件的大小,緩解磁盤緊張的情況蚁滋,在很多實際場景中宿接,消息的key和value的值之間的對應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改一樣辕录,消費者只關(guān)心key對應(yīng)的最新的value睦霎。因此,我們可以開啟kafka的日志壓縮功能走诞,服務(wù)端會在后臺啟動啟動Cleaner線程池副女,定期將相同的key進(jìn)行合并,只保留最新的value值蚣旱。日志的壓縮原理是


image.png

磁盤存儲的性能問題

磁盤存儲的性能優(yōu)化

我們現(xiàn)在大部分企業(yè)仍然用的是機(jī)械結(jié)構(gòu)的磁盤碑幅,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是尋址塞绿,也就是定位到數(shù)據(jù)所在的物理地址沟涨,在磁盤上就要找到對應(yīng)的柱面、磁頭以及對應(yīng)的扇區(qū)异吻;這個過程相對內(nèi)存來說會消耗大量時間裹赴,為了規(guī)避隨機(jī)讀寫帶來的時間消耗,kafka采用順序?qū)懙姆绞酱鎯?shù)據(jù)诀浪。即使是這樣棋返,但是頻繁的I/O操作仍然會造成磁盤的性能瓶頸

零拷貝

消息從發(fā)送到落地保存,broker維護(hù)的消息日志本身就是文件目錄雷猪,每個文件都是二進(jìn)制保存睛竣,生產(chǎn)者和消費者使用相同的格式來處理。在消費者獲取消息時求摇,服務(wù)器先從硬盤讀取數(shù)據(jù)到內(nèi)存酵颁,然后把內(nèi)存中的數(shù)據(jù)原封不動的通過socket發(fā)送給消費者。雖然這個操作描述起來很簡單月帝,但實際上經(jīng)歷了很多步驟躏惋。

操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存:
? 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
? 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
? 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出


image.png

通過“零拷貝”技術(shù)嚷辅,可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作簿姨,同時也會減少上下文切換次數(shù)。現(xiàn)代的unix操作系統(tǒng)提供一個優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)絪ocket扁位;在Linux中准潭,是通過sendfile系統(tǒng)調(diào)用來完成的。Java提供了訪問這個系統(tǒng)調(diào)用的方法:FileChannel.transferTo API
使用sendfile域仇,只需要一次拷貝就行刑然,允許操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡(luò)上。所以在這個優(yōu)化的路徑中暇务,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的


image.png

頁緩存

頁緩存是操作系統(tǒng)實現(xiàn)的一種主要的磁盤緩存泼掠,但凡設(shè)計到緩存的,基本都是為了提升i/o性能垦细,所以頁緩存是用來減少磁盤I/O操作的择镇。
磁盤高速緩存有兩個重要因素:
第一,訪問磁盤的速度要遠(yuǎn)低于訪問內(nèi)存的速度括改,若從處理器L1和L2高速緩存訪問則速度更快腻豌。
第二,數(shù)據(jù)一旦被訪問嘱能,就很有可能短時間內(nèi)再次訪問吝梅。正是由于基于訪問內(nèi)存比磁盤快的多,所以磁盤的內(nèi)存緩存將給系統(tǒng)存儲性能帶來質(zhì)的飛越惹骂。

當(dāng) 一 個進(jìn)程準(zhǔn)備讀取磁盤上的文件內(nèi)容時苏携, 操作系統(tǒng)會先查看待讀取的數(shù)據(jù)所在的頁(page)是否在頁緩存(pagecache)中,如果存在(命中)則直接返回數(shù)據(jù)析苫, 從而避免了對物理磁盤的I/0操作;如果沒有命中穿扳, 則操作系統(tǒng)會向磁盤發(fā)起讀取請求并將讀取的數(shù)據(jù)頁存入頁緩存衩侥, 之后再將數(shù)據(jù)返回給進(jìn)程。
同樣矛物,如果 一 個進(jìn)程需要將數(shù)據(jù)寫入磁盤茫死, 那么操作系統(tǒng)也會檢測數(shù)據(jù)對應(yīng)的頁是否在頁緩存中,如果不存在履羞, 則會先在頁緩存中添加相應(yīng)的頁峦萎, 最后將數(shù)據(jù)寫入對應(yīng)的頁。 被修改過后的頁也就變成了臟頁忆首, 操作系統(tǒng)會在合適的時間把臟頁中的數(shù)據(jù)寫入磁盤爱榔, 以保持?jǐn)?shù)據(jù)的 一 致性
Kafka中大量使用了頁緩存, 這是Kafka實現(xiàn)高吞吐的重要因素之 一 糙及。 雖然消息都是先被寫入頁緩存详幽,然后由操作系統(tǒng)負(fù)責(zé)具體的刷盤任務(wù)的, 但在Kafka中同樣提供了同步刷盤及間斷性強(qiáng)制刷盤(fsync),可以通過 log.flush.interval.messages 和 log.flush.interval.ms 參數(shù)來控制。
同步刷盤能夠保證消息的可靠性唇聘,避免因為宕機(jī)導(dǎo)致頁緩存數(shù)據(jù)還未完成同步時造成的數(shù)據(jù)丟失版姑。但是實際使用上,我們沒必要去考慮這樣的因素以及這種問題帶來的損失迟郎,消息可靠性可以由多副本來解決剥险,同步刷盤會帶來性能的影響。 刷盤的操作由操作系統(tǒng)去完成即可

Kafka消息的可靠性

沒有一個中間件能夠做到百分之百的完全可靠宪肖,可靠性更多的還是基于幾個9的衡量指標(biāo)表制,比如4個9、5個9. 軟件系統(tǒng)的可靠性只能夠無限去接近100%匈庭,但不可能達(dá)到100%夫凸。所以kafka如何是實現(xiàn)最大可能的可靠性呢?

  • 分區(qū)副本阱持, 你可以創(chuàng)建更多的分區(qū)來提升可靠性夭拌,但是分區(qū)數(shù)過多也會帶來性能上的開銷,一般來說衷咽,3個副本就能滿足對大部分場景的可靠性要求

  • acks鸽扁,生產(chǎn)者發(fā)送消息的可靠性,也就是我要保證我這個消息一定是到了broker并且完成了多副本的持久化镶骗,但這種要求也同樣會帶來性能上的開銷桶现。它有幾個可選項:
    1 ,生產(chǎn)者把消息發(fā)送到leader副本鼎姊,leader副本在成功寫入到本地日志之后就告訴生產(chǎn)者消息提交成功骡和,但是如果isr集合中的follower副本還沒來得及同步leader副本的消息,leader掛了相寇,就會造成消息丟失慰于。
    -1 ,消息不僅僅寫入到leader副本唤衫,并且被ISR集合中所有副本同步完成之后才告訴生產(chǎn)者已經(jīng)提交成功婆赠,這個時候即使leader副本掛了也不會造成數(shù)據(jù)丟失。
    0:表示producer不需要等待broker的消息確認(rèn)佳励。這個選項時延最小但同時風(fēng)險最大(因為當(dāng)server宕機(jī)時休里,數(shù)據(jù)將會丟失)。

  • 保障消息到了broker之后赃承,消費者也需要有一定的保證妙黍,因為消費者也可能出現(xiàn)某些問題導(dǎo)致消息沒有消費到。

  • enable.auto.commit默認(rèn)為true瞧剖,也就是自動提交offset废境,自動提交是批量執(zhí)行的,有一個時間窗口,這種方式會帶來重復(fù)提交或者消息丟失的問題噩凹,所以對于高可靠性要求的程序巴元,要使用手動提交。 對于高可靠要求的應(yīng)用來說驮宴,寧愿重復(fù)消費也不應(yīng)該因為消費異常而導(dǎo)致消息丟失

    ——學(xué)自咕泡學(xué)院

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末逮刨,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子堵泽,更是在濱河造成了極大的恐慌修己,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件迎罗,死亡現(xiàn)場離奇詭異睬愤,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)纹安,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進(jìn)店門尤辱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人厢岂,你說我怎么就攤上這事光督。” “怎么了塔粒?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵结借,是天一觀的道長。 經(jīng)常有香客問我卒茬,道長船老,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任圃酵,我火速辦了婚禮柳畔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘辜昵。我一直安慰自己荸镊,他們只是感情好咽斧,可當(dāng)我...
    茶點故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布堪置。 她就那樣靜靜地躺著,像睡著了一般张惹。 火紅的嫁衣襯著肌膚如雪舀锨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天宛逗,我揣著相機(jī)與錄音坎匿,去河邊找鬼。 笑死,一個胖子當(dāng)著我的面吹牛替蔬,可吹牛的內(nèi)容都是我干的告私。 我是一名探鬼主播,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼承桥,長吁一口氣:“原來是場噩夢啊……” “哼驻粟!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起凶异,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蜀撑,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后剩彬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酷麦,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年喉恋,在試婚紗的時候發(fā)現(xiàn)自己被綠了沃饶。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡瀑晒,死狀恐怖绍坝,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情苔悦,我是刑警寧澤轩褐,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站玖详,受9級特大地震影響把介,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蟋座,卻給世界環(huán)境...
    茶點故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一拗踢、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧向臀,春花似錦巢墅、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至芹彬,卻和暖如春蓄髓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背舒帮。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工会喝, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留陡叠,地道東北人。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓肢执,卻偏偏與公主長得像枉阵,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子预茄,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,060評論 2 355

推薦閱讀更多精彩內(nèi)容