2018-08-26

Kafka的基本概念

  • Broker

    Kafka集群中包含多個(gè)服務(wù)器絮记,其中每個(gè)服務(wù)器稱為一個(gè)broker振乏。有一點(diǎn)需要注意一下恰聘,添加一個(gè)新的broker到cluster中的時(shí)候戳气,并不會(huì)分配任何數(shù)據(jù)partiton到新的broker链患,除非有新的topic被創(chuàng)建,為了不創(chuàng)建新的topic瓶您,可以考慮使用partition re-assignment tool將已有的parititon分配到新的broker中

  • Producer

    消息生產(chǎn)者麻捻,向kafka broker發(fā)送消息的客戶端,producer基于record的key決定將record發(fā)送到哪個(gè)partition览闰,默認(rèn)使用key的hash芯肤,如果沒有key巷折,則使用輪詢的策略

    利用api創(chuàng)建kafka生產(chǎn)者有三個(gè)基本屬性:

    1. bootstrap.servers:屬性值是一個(gè)host:port的broker列表压鉴,指定了簡(jiǎn)歷初始連接的broker列表,這個(gè)列表不需要包含所有的broker锻拘,因?yàn)榻⒌某跏歼B接會(huì)從相應(yīng)的broker獲取到集群的信息油吭。但是建議至少包含連個(gè)broker,保證高可用署拟。
    2. key.serializer:屬性值是類的名稱婉宰。這個(gè)屬性指定了用來(lái)序列化鍵值(key)的類。Kafka broker只接受字節(jié)數(shù)組推穷,但生產(chǎn)者的發(fā)送消息接口允許發(fā)送任何的Java對(duì)象心包,因此需要將這些對(duì)象序列化成字節(jié)數(shù)組。key.serializer指定的類需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer接口馒铃,Kafka客戶端包中包含了幾個(gè)默認(rèn)實(shí)現(xiàn)蟹腾,例如ByteArraySerializer痕惋、StringSerializer和IntegerSerializer。
    3. value.serializer:屬性值是類的名稱娃殖。這個(gè)屬性指定了用來(lái)序列化消息記錄的類
    4. acks: acks控制多少個(gè)副本必須寫入消息后生產(chǎn)者才能認(rèn)為寫入成功值戳,這個(gè)參數(shù)對(duì)消息丟失可能性有很大影響。這個(gè)參數(shù)有三種取值:
      • acks=0:生產(chǎn)者把消息發(fā)送到broker即認(rèn)為成功炉爆,不等待broker的處理結(jié)果堕虹。這種方式的吞吐最高,但也是最容易丟失消息的芬首。
      • acks=1:生產(chǎn)者會(huì)在該分區(qū)的群首(leader)寫入消息并返回成功后赴捞,認(rèn)為消息發(fā)送成功。如果群首寫入消息失敗郁稍,生產(chǎn)者會(huì)收到錯(cuò)誤響應(yīng)并進(jìn)行重試螟炫。這種方式能夠一定程度避免消息丟失,但如果群首宕機(jī)時(shí)該消息沒有復(fù)制到其他副本艺晴,那么該消息還是會(huì)丟失昼钻。另外,如果我們使用同步方式來(lái)發(fā)送封寞,延遲會(huì)比前一種方式大大增加(至少增加一個(gè)網(wǎng)絡(luò)往返時(shí)間)然评;如果使用異步方式,應(yīng)用感知不到延遲狈究,吞吐量則會(huì)受異步正在發(fā)送中的數(shù)量限制碗淌。
      • acks=all:生產(chǎn)者會(huì)等待所有副本成功寫入該消息,這種方式是最安全的抖锥,能夠保證消息不丟失亿眠,但是延遲也是最大的。
    5. 當(dāng)生產(chǎn)者發(fā)送消息收到一個(gè)可恢復(fù)異常時(shí)磅废,會(huì)進(jìn)行重試纳像,這個(gè)參數(shù)指定了重試的次數(shù)。在實(shí)際情況中拯勉,這個(gè)參數(shù)需要結(jié)合retry.backoff.ms(重試等待間隔)來(lái)使用竟趾,建議總的重試時(shí)間比集群重新選舉群首的時(shí)間長(zhǎng),這樣可以避免生產(chǎn)者過早結(jié)束重試導(dǎo)致失敗宫峦。
    private Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    producer = new KafkaProducer<String, String>(kafkaProps);
    

    producer創(chuàng)建完成后岔帽,有三種發(fā)送消息的方式:

    • Fire-and-forget(即發(fā)即棄): 發(fā)送消息給服務(wù)器, 然而并不關(guān)心消息是否成功達(dá)到.大部分情況下, 它將成功達(dá)到, 因?yàn)?Kafka 是高可用的, 并且生產(chǎn)者會(huì)自動(dòng)重試發(fā)送消息.不管怎樣,使用這種方式有些消息可能會(huì)丟失.

      ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); 
      try {
          // 此方法返回 RecordMetadata, 但是這里忽略了返回值, 無(wú)法知道消息是否發(fā)送成功
          // 生產(chǎn)環(huán)境一般不適用此種方式
          producer.send(record); 
      } catch (Exception e) {
          // SerializationException: 如果序列化失敗
          // BufferExhaustedException: buffer 滿了
          // TimeoutException
          // InterruptException: 發(fā)送線程被中斷
          e.printStackTrace(); 
      }
      
    • Synchronous send(同步發(fā)送): 發(fā)送消息后, send() 方法返回一個(gè) Future 對(duì)象, 使用 get() 方法在 future 上等待, 以此來(lái)判斷 send() 是否成功.獲取寫入的記錄的metadata,如topic导绷、partitionoffset

      ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
      try {
          // 發(fā)送成功, 可以獲得一個(gè) RecordMetadata 對(duì)象
          producer.send(record)
                  // 等待應(yīng)答
                  .get(); 
      } catch (Exception e) {
          // 發(fā)送失敗
          e.printStackTrace(); 
      }
      

      大部分情況下,我們不需要回復(fù)–Kafka 返回寫入的記錄的 topic犀勒、partitionoffset,通常發(fā)送端是不需要這些的.另外,我們可能需要知道什么時(shí)候發(fā)送消息失敗,所以我們可以拋出一個(gè)異常,記錄錯(cuò)誤信息或者寫入錯(cuò)誤文件用于后面的分析.不能通過重試被解決.比如, message size too large(消息大小太大),在這些情況中, KakfaProducer 將不會(huì)嘗試重試, 并立即返回異常.

    • Asynchronous send(異步發(fā)送): 使用一個(gè) callback function(回調(diào)方法)調(diào)用 send() 方法, 當(dāng)從 Kafka broker 接收到相應(yīng)的時(shí)候會(huì)觸此回調(diào)方法.

      private class DemoProducerCallback implements Callback { 
          @Override
          public void onCompletion(RecordMetadata recordMetadata, Exception e) {
           if (e != null) {//當(dāng)Kafka返回異常時(shí),異常值不為null
               e.printStackTrace(); 
              }
          }
      }
      ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); 
      producer.send(record, new DemoProducerCallback()); 
      

    ?

  • Consumer(每個(gè)consumer group是一個(gè)訂閱者,為每個(gè)topic partiton維護(hù)一個(gè)offset贾费,每個(gè)consumer自己也會(huì)維護(hù)一個(gè)offset)

    消息消費(fèi)者枚碗,每個(gè)consumer屬于一個(gè)特定的consumer group(可為每個(gè)consumer指定group name,若不指定group name則屬于默認(rèn)的group)铸本。同一topic的一條消息只能被同一個(gè)consumer group內(nèi)的一個(gè)consumer消費(fèi)肮雨,但多個(gè)consumer group可同時(shí)消費(fèi)這一消息。Consumer Group中的每個(gè)Consumer讀取Topic的一個(gè)或多個(gè)Partitions箱玷,并且是唯一的Consumer怨规;如果Consumer group中所有consumer總線程大于partitions數(shù)量,則會(huì)出現(xiàn)空閑情況锡足。這樣可以做到負(fù)載均衡波丰,也可以實(shí)現(xiàn)順序消費(fèi)(group中只有一個(gè)consumer)。每個(gè)consumer group維護(hù)了每個(gè)topic partition的offset舶得。

    1. 為了保證順序消費(fèi)掰烟,每個(gè)message只能發(fā)送到一個(gè)consumer中。否則效率很低沐批,需要等到所有消費(fèi)者消費(fèi)完才能發(fā)送下一個(gè)message纫骑,顯然是不合理的。同時(shí)對(duì)于topic中parition的消費(fèi)如果是異步的就很難保證順序性九孩。目前許多消息系統(tǒng)經(jīng)常使用‘獨(dú)占消費(fèi)’的方式消費(fèi)先馆。例如topic中的parition只能由特定一個(gè)消費(fèi)者消費(fèi),官網(wǎng)明確kafka智能保證一個(gè)parition中的消息的有序性躺彬,不能保證topic中不同parition的有序性
    2. 如果所有的consumer都在一個(gè)consumer group中煤墙,就像傳統(tǒng)的隊(duì)列一樣。如果所有的consumer都在不同的consumer group中就像發(fā)布訂閱模式一樣宪拥,所有的message都會(huì)廣播倒所有consumers中仿野。因此如果有很多的訂閱者,kafka的性能就會(huì)降低她君,因?yàn)閗afka需要拷貝message到所有的group中以保證順序性
    3. kafka consumer負(fù)載均衡:每個(gè)consumer是一個(gè)parititon的專有消費(fèi)者脚作,如果有新的consumer加入到了group中,它將獲得一個(gè)共享的parititon犁河,如果一個(gè)consumer掛了鳖枕,它的partition將會(huì)被分配到其他剩余的xonsumer中
    4. kafka災(zāi)備:consumer會(huì)將offset反饋給kafka broker當(dāng)一條記錄杯成功處理后魄梯。如果在發(fā)送commit offset前桨螺,consumer處理失敗,其他的consumer將會(huì)繼續(xù)從上次的commit offset開始處理酿秸。如果在處理完后這一條記錄但還未發(fā)送commit offset時(shí)consumer發(fā)生錯(cuò)誤灭翔,kafka記錄將被重復(fù)消費(fèi)。在這個(gè)情景下,kafka 實(shí)現(xiàn)了至少一次的消費(fèi)肝箱,應(yīng)該保證消息被處理時(shí)冪等的
    5. offset 管理:kafka將offset 數(shù)據(jù)保存到一個(gè)"__consumer_offset" topic中哄褒,這個(gè)topic使用日志壓縮,kafka災(zāi)備的的offset就是修改或讀取此topic中的值煌张。
    6. consumer可以消費(fèi)哪些記錄呐赡? 一條最新的記錄進(jìn)入之后,offset寫入到log parition中骏融,然后將該記錄復(fù)制到所有的partition的followers中链嘀,最后標(biāo)記"High watermark"(成功復(fù)制的最新紀(jì)錄offset)。consumer 消費(fèi)的是"High watermark"中的offset档玻,未被復(fù)制的不可以被消費(fèi)怀泊。
    7. consumer和parititon的關(guān)系:對(duì)于一個(gè)group,一個(gè)consumer只能消費(fèi)一個(gè)parition误趴,如果consumer數(shù)量大于paritition的數(shù)量霹琼,有的consumer就會(huì)空閑,可以作為災(zāi)備凉当,如果小于partiion數(shù)量枣申,每個(gè)consumer就會(huì)消費(fèi)多個(gè)parition
    8. 多線程kafka consumer :一個(gè)consumer有多個(gè)線程,很難保證記錄消費(fèi)的有序性看杭,只有在消費(fèi)單條記錄時(shí)間很長(zhǎng)的時(shí)候使用糯而,一般不建議使用。在一個(gè)進(jìn)程中跑多個(gè)線程泊窘,每個(gè)線程是一個(gè)consumer熄驼,每個(gè)線程管理自己的offset。
    三種消費(fèi)方式

    首先了解一下建立consumer的參數(shù):

    "bootstrap.servers", 指定kafka的broker

    "group.id", 指定consumer group

    "enable.auto.commit", 指定offset可以自動(dòng)被commit 到kafka烘豹,不需要程序中顯示的寫

    "auto.commit.interval.ms", 指定了commit offset的時(shí)間間隔

    "key.serializer" and "value.serializer", are classes to be used to decode the message into bytes.

    1. 自動(dòng)commit offset: parition中的一個(gè)記錄被消費(fèi)后自動(dòng)commit offset到kafka

      package com.til.kafka.consumer;  
        
      import java.util.List;  
      import java.util.Properties;  
        
      import org.apache.kafka.clients.consumer.ConsumerRecord;  
      import org.apache.kafka.clients.consumer.ConsumerRecords;  
      import org.apache.kafka.clients.consumer.KafkaConsumer;  
        
      import com.tb.constants.KafkaConstants;  
        
      //Automatic Offset Committing  
      public class AOCKafkaConsumer {  
          Properties props;  
          KafkaConsumer<String, String> consumer;  
        
          public AOCKafkaConsumer(String brokerString) {  
              props = new Properties();  
              props.put("bootstrap.servers", brokerString);  
              props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP);  
              props.put("enable.auto.commit", "true");  
              props.put("auto.commit.interval.ms", "1000");  
              props.put("key.deserializer", KafkaConstants.KAFKA_KEY_SERIALIZER);  
              props.put("value.deserializer", KafkaConstants.KAFKA_VALUE_SERIALIZER);  
              consumer = new KafkaConsumer<>(props);  
          }  
        
          public void subscribe(List<String> topics) {  
              consumer.subscribe(topics);  
              while (true) {  
                  ConsumerRecords<String, String> records = consumer.poll(100);  
                  for (ConsumerRecord<String, String> record : records)  
                      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  
              }  
          }  
      }  
      
      public class KafkaConstants {  
          public static String KAFKA_BROKER_STRING =   
                  "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";  
          public static String KAFKA_KEY_SERIALIZER =   
                  "org.apache.kafka.common.serialization.StringSerializer";  
          public static String KAFKA_VALUE_SERIALIZER =   
                  "org.apache.kafka.common.serialization.StringSerializer";  
          public static String KAFKA_TOPIC = "TEST-1";  
          public static String KAFKA_CONSUMER_GROUP = "TEST";  
      }  
      
    2. 手動(dòng)commit offset 到kafka:手動(dòng)控制commit offset到kafka

      import java.util.List;  
      import java.util.Properties;  
        
      import org.apache.kafka.clients.consumer.ConsumerRecord;  
      import org.apache.kafka.clients.consumer.ConsumerRecords;  
      import org.apache.kafka.clients.consumer.KafkaConsumer;  
        
      import com.tb.constants.KafkaConstants;  
        
      //Manual Offset Control  
      public class MOCKafkaConsumer {  
          Properties props;  
          KafkaConsumer<String, String> consumer;  
        
          public MOCKafkaConsumer(String brokerString) {  
              props = new Properties();  
              props.put("bootstrap.servers", brokerString);  
              props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP);  
              props.put("enable.auto.commit", "false");  
              props.put("key.deserializer", KafkaConstants.KAFKA_KEY_SERIALIZER);  
              props.put("value.deserializer", KafkaConstants.KAFKA_VALUE_SERIALIZER);  
              consumer = new KafkaConsumer<>(props);  
          }  
        
          public void subscribe(List<String> topics) {  
              consumer.subscribe(topics);  
              while (true) {  
                  ConsumerRecords<String, String> records = consumer.poll(100);  
                  for (ConsumerRecord<String, String> record : records)  
                      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  
                  // This line of code manually commits offset to kafka  
                  consumer.commitSync();  
              }  
          }  
      }  
      
    3. 手動(dòng)分配一個(gè)consumer給一個(gè)partition:可以手工分配給特定的分區(qū)瓜贾,在這種類型的使用者中,我們可以繞過使用者組的概念携悯,并將使用者分配給特定的分區(qū)祭芦。

      import java.util.Arrays;  
      import java.util.List;  
      import java.util.Map;  
      import java.util.Map.Entry;  
      import java.util.Properties;  
      import java.util.Set;  
      import java.util.stream.Collectors;  
        
      import org.apache.kafka.clients.consumer.KafkaConsumer;  
      import org.apache.kafka.common.TopicPartition;  
        
      import com.tb.constants.KafkaConstants;  
        
      //Manual Partition Assignment  
      public class MPAKafkaConsumer {  
          private Properties props;  
          private KafkaConsumer<String, String> consumer;  
        
          public MPAKafkaConsumer(String brokerString) {  
        
              props = new Properties();  
              props.put("bootstrap.servers", brokerString);  
              // props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP);  
              props.put("enable.auto.commit", "false");  
              props.put("key.deserializer", KafkaConstants.KAFKA_KEY_SERIALIZER);  
              props.put("value.deserializer", KafkaConstants.KAFKA_VALUE_SERIALIZER);  
              consumer = new KafkaConsumer<>(props);  
        
          }  
        
          public void subscribe(List<TopicPartition> topicsPartions) {  
              consumer.assign(topicsPartions);  
        
          }  
        
      }  
      
      // consumer的構(gòu)建和測(cè)試
      import java.util.Arrays;  
        
      import org.apache.kafka.common.TopicPartition;  
        
      import com.tb.constants.KafkaConstants;  
      import com.til.kafka.consumer.MPAKafkaConsumer;  
        
      public class App {  
          public static void main(String[] args) {  
        
              // Partitions to which a consumer has to assign  
              TopicPartition partition = new   
                      TopicPartition(KafkaConstants.KAFKA_TOPIC, 0);  
        
              // This will start a consumer in new thread  
              new Thread(new Runnable() {  
                  @Override  
                  public void run() {  
                      MPAKafkaConsumer mpaKafkaConsumer =   
                              new MPAKafkaConsumer(KafkaConstants.KAFKA_BROKER_STRING);  
                        
                      mpaKafkaConsumer.subscribe(Arrays.asList(partition));  
        
                  }  
              }).start();  
          }  
      }  
      
  • Topic(復(fù)制/災(zāi)備/并行化)

    可以理解為一個(gè)MQ消息隊(duì)列的名字。每條發(fā)布到Kafka集群的消息都有一個(gè)類別憔鬼,這個(gè)類別被稱為topic龟劲。(物理上不同topic的消息分開存儲(chǔ),邏輯上一個(gè)topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)轴或。

    每個(gè)topic都有一個(gè)Log(topic在硬盤上的存儲(chǔ))昌跌,每個(gè)Log被分為多個(gè)pritions和segments。在硬盤上表現(xiàn)為多個(gè)文件照雁。

  • Partition:

    parition是物理上的概念蚕愤,每個(gè)topic包含一個(gè)或多個(gè)partition,創(chuàng)建topic時(shí)可指定parition數(shù)量。每個(gè)partition對(duì)應(yīng)于一個(gè)文件夾萍诱,該文件夾下存儲(chǔ)該partition的數(shù)據(jù)和索引文件悬嗓。為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的topic可以分布到多個(gè) broker(即服務(wù)器)上裕坊,一個(gè)topic可以分為多個(gè)partition包竹,每個(gè)partition是一個(gè)有序的隊(duì)列。partition中的每條消息 都會(huì)被分配一個(gè)有序的id(offset)籍凝。kafka只保證按一個(gè)partition中的順序?qū)⑾l(fā)給consumer映企,不保證一個(gè)topic的整體 (多個(gè)partition間)的順序。也就是說静浴,一個(gè)topic在集群中可以有多個(gè)partition堰氓,那么分區(qū)的策略是什么?(消息發(fā)送到哪個(gè)分區(qū)上苹享,有兩種基本的策略双絮,一是采用Key Hash算法,一是采用Round Robin算法)

    patition的備份數(shù): 可以配置parititon的備份數(shù)量得问,每個(gè)parition都有一個(gè)leader server和0到多個(gè)follower servers囤攀,其中l(wèi)eader server處理一個(gè)parition中的所有的讀和寫(和想的不太一樣)。follower 復(fù)制leader宫纬,在leader掛掉后進(jìn)行替換焚挠,Kafka還使用分區(qū)在組內(nèi)進(jìn)行并行消費(fèi)者處理。Kafka在Kafka集群中的服務(wù)器上分發(fā)主題日志分區(qū)漓骚。每個(gè)服務(wù)器通過共享分區(qū)leader來(lái)處理其數(shù)據(jù)和請(qǐng)求的共享(不太懂)蝌衔。

  • zookeeper

    用來(lái)管理集群,協(xié)調(diào)broker/cluster的拓?fù)浣Y(jié)構(gòu)蝌蹂,管理集群中哪些broker是新增的噩斟,哪些已經(jīng)掛掉了,新增了一個(gè)topic還是移除了一個(gè)topic孤个,同時(shí)用來(lái)Broker topic partition中l(wèi)eader的選擇剃允。

Kafka的數(shù)據(jù)存儲(chǔ)

主要接收topic中partition數(shù)據(jù)的存儲(chǔ),partition是以文件夾的形式存在具體的borker本機(jī)上(為了效率齐鲤,并不依賴hdfs斥废,自己維護(hù)多份數(shù)據(jù))

  1. segment文件的組成

    對(duì)于一個(gè)partition(在Broker中以文件夾的形式存在),里面又有很多大小相等的segment數(shù)據(jù)文件(這個(gè)文件具體大小可以在config/server.properties中進(jìn)行設(shè)置)给郊,這種特性可以方便old segment file的快速刪除牡肉。

    • segment file 組成:由2部分組成,分別為index file和data file丑罪,這兩個(gè)文件是一一對(duì)應(yīng)的荚板,后綴”.index”和”.log”分別表示索引文件和數(shù)據(jù)文件凤壁;其中index文件結(jié)構(gòu)很簡(jiǎn)單,每一行都是一個(gè)key,value對(duì)
      key 是消息的序號(hào)offset吩屹,value 是消息的物理位置偏移量.
    • segment file 命名規(guī)則:partition的第一個(gè)segment從0開始跪另,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset,ofsset的數(shù)值最大為64位(long類型),20位數(shù)字字符長(zhǎng)度煤搜,沒有數(shù)字用0填充免绿。如下圖所示:
  2. 查找:給定一個(gè)offset,查找message擦盾。過程如下:根據(jù)segment文件的命名嘲驾,進(jìn)行二分查找,找到對(duì)應(yīng)的index和log文件迹卢,然后進(jìn)入index 順序查找到小于或等于offset的key(為了保證快速查找使用稀疏索引)辽故,拿到該index文件中offset對(duì)應(yīng)的index,在log文件中順序查找到需要查找的offset的message腐碱。

kafka日志清理

有兩種策略:

  • 一種是上面的cleanupLogs根據(jù)時(shí)間或大小策略(粗粒度)
  • 還有一種是針對(duì)每個(gè)key的日志刪除策略(細(xì)粒度)即LogCleaner方式誊垢,清理不包括activeSegment(即使超時(shí)),如果消息沒有key症见,那只能采用第一種清理策略了喂走。

日志壓縮保證了:

  1. 任何消費(fèi)者如果能夠趕上Log的Head部分,它就會(huì)看到寫入的每條消息谋作,這些消息都是順序遞增(中間不會(huì)間斷)的offset
  2. 總是維持消息的有序性芋肠,壓縮并不會(huì)對(duì)消息進(jìn)行重新排序,而是移除一些消息
  3. 每條消息的offset永遠(yuǎn)不會(huì)被改變遵蚜,它是日志文件標(biāo)識(shí)位置的永久編號(hào)
  4. 讀取/消費(fèi)時(shí)如果從最開始的offset=0開始帖池,那么至少可以看到所有記錄按照它們寫入的順序得到的最終狀態(tài)(狀態(tài)指的是value,相同key不同value吭净,最終的狀態(tài)以最新的value為準(zhǔn)):因?yàn)檫@種場(chǎng)景下寫入順序和讀取順序是一致的碘裕,寫入時(shí)和讀取時(shí)offset都是不斷遞增。舉例寫入key1的value在offset=1和offst=5的值分別是v1和v2攒钳,那么讀取到offset=1時(shí)帮孔,最終的狀態(tài)(value值)是v1女坑,讀取到offset=5時(shí)顷锰,最終狀態(tài)是v2(不能指望說讀取到offset=1時(shí)就要求狀態(tài)是v2)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末凤价,一起剝皮案震驚了整個(gè)濱河市街图,隨后出現(xiàn)的幾起案子述召,更是在濱河造成了極大的恐慌岩灭,老刑警劉巖笆搓,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件膏蚓,死亡現(xiàn)場(chǎng)離奇詭異实愚,居然都是意外死亡兼呵,警方通過查閱死者的電腦和手機(jī)兔辅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)击喂,“玉大人维苔,你說我怎么就攤上這事《海” “怎么了介时?”我有些...
    開封第一講書人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)凌彬。 經(jīng)常有香客問我沸柔,道長(zhǎng),這世上最難降的妖魔是什么铲敛? 我笑而不...
    開封第一講書人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任褐澎,我火速辦了婚禮,結(jié)果婚禮上伐蒋,老公的妹妹穿的比我還像新娘工三。我一直安慰自己,他們只是感情好咽弦,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開白布徒蟆。 她就那樣靜靜地躺著,像睡著了一般型型。 火紅的嫁衣襯著肌膚如雪段审。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,036評(píng)論 1 285
  • 那天闹蒜,我揣著相機(jī)與錄音寺枉,去河邊找鬼。 笑死绷落,一個(gè)胖子當(dāng)著我的面吹牛姥闪,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播砌烁,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼筐喳,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了函喉?” 一聲冷哼從身側(cè)響起避归,我...
    開封第一講書人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎管呵,沒想到半個(gè)月后梳毙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡捐下,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年账锹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了萌业。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡奸柬,死狀恐怖生年,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鸟缕,我是刑警寧澤晶框,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布排抬,位于F島的核電站懂从,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蹲蒲。R本人自食惡果不足惜番甩,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望届搁。 院中可真熱鬧缘薛,春花似錦、人聲如沸卡睦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)表锻。三九已至恕齐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間瞬逊,已是汗流浹背显歧。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留确镊,地道東北人士骤。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蕾域,于是被迫代替她去往敵國(guó)和親拷肌。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,707評(píng)論 13 425
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語(yǔ)閱讀 10,812評(píng)論 4 54
  • Kafka簡(jiǎn)介 Kafka是一種分布式的旨巷,基于發(fā)布/訂閱的消息系統(tǒng)巨缘。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,074評(píng)論 0 43
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,431評(píng)論 0 34
  • 幸福是需要修出來(lái)的~每天進(jìn)步1%~幸福實(shí)修13班~23 蔣鶯華 20171220(23/60) 【幸福三朵玫瑰】 ...
    蔣鶯華閱讀 151評(píng)論 0 1