kafka學(xué)習(xí)筆記

版本選擇

  • 0.7版本:
    只有基礎(chǔ)消息隊列功能凭疮,無副本饭耳;打死也不使用
  • 0.8版本:
    增加了副本機制,新的producer API执解;建議使用0.8.2.2版本蹦锋;不建議使用0.8.2.0之后的producer API
  • 0.9版本:
    增加權(quán)限和認證渔彰,新的consumer API霍狰,Kafka Connect功能勘天;不建議使用consumer API;
  • 0.10版本:
    引入Kafka Streams功能右蕊,bug修復(fù)琼稻;建議版本0.10.2.2;建議使用新版consumer API
  • 0.11版本:
    producer API冪等尤泽,事物API欣簇,消息格式重構(gòu)规脸;建議版本0.11.0.3;謹慎對待消息格式變化
  • 1.0和2.0版本:
    Kafka Streams改進熊咽;建議版本2.0莫鸭;

部署需要注意

image.png

重要參數(shù)

  • Broker 端參數(shù)
  1. log.dirs:這是非常重要的參數(shù),指定了 Broker 需要使用的若干個文件目錄路徑横殴。要知道這個參數(shù)是沒有默認值的被因,這說明什么?這說明它必須由你親自指定衫仑。log.dir:注意這是 dir梨与,結(jié)尾沒有 s,說明它只能表示單個路徑文狱,它是補充上一個參數(shù)用的粥鞋。

提升讀寫性能:比起單塊磁盤,多塊物理磁盤同時讀寫數(shù)據(jù)有更高的吞吐量

  • Zookeeper
    zookeeper.connect:zk1:2181,zk2:2181,zk3:2181/kafka1

  • listeners:學(xué)名叫監(jiān)聽器
    其實就是告訴外部連接者要通過什么協(xié)議訪問指定主機名和端口開放的 Kafka 服務(wù)瞄崇。advertised.listeners:和 listeners 相比多了個 advertised呻粹。Advertised 的含義表示宣稱的、公布的苏研,就是說這組監(jiān)聽器是 Broker 用于對外發(fā)布的等浊。host.name/port:列出這兩個參數(shù)就是想說你把它們忘掉吧,壓根不要為它們指定值摹蘑,畢竟都是過期的參數(shù)了筹燕。

最好全部使用主機名,即 Broker 端和 Client 端應(yīng)用配置中全部填寫主機名

  • topic:
  1. auto.create.topics.enable:是否允許自動創(chuàng)建 Topic衅鹿。推薦設(shè)置為false
    規(guī)避線上自動創(chuàng)建topic問題
  2. unclean.leader.election.enable:是否允許 Unclean Leader 選舉撒踪。推薦為false
    規(guī)避落后的副本自動選為leader,導(dǎo)致數(shù)據(jù)丟失.
  3. auto.leader.rebalance.enable:是否允許定期進行 Leader 選舉。推薦false
    規(guī)避自動切換leader造成不必要的性能開銷
  • 消息保存三兄弟
  1. log.retention.{hours|minutes|ms}:這是個“三兄弟”塘安,都是控制一條消息數(shù)據(jù)被保存多長時間糠涛。從優(yōu)先級上來說 ms 設(shè)置最高、minutes 次之兼犯、hours 最低
  2. log.retention.bytes:這是指定 Broker 為消息保存的總磁盤容量大小。
  3. message.max.bytes:控制 Broker 能夠接收的最大消息大小集漾。
  • 創(chuàng)建topic時帶參數(shù)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

  • 修改topic參數(shù)
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

Kafka jvm設(shè)置推薦參數(shù)

  • KAFKA_HEAP_OPTS:指定堆大小切黔。
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 參數(shù)。

kafka 零拷貝

  • 數(shù)據(jù)在磁盤和網(wǎng)絡(luò)進行傳輸時避免昂貴的內(nèi)核態(tài)數(shù)據(jù)拷貝具篇,從而實現(xiàn)快速的數(shù)據(jù)傳輸

分區(qū)策略

  • 輪詢策略
    生產(chǎn)者程序會按照輪詢的方式在主題的所有分區(qū)間均勻地“碼放”消息纬霞。
    輪詢策略有非常優(yōu)秀的負載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上驱显,故默認情況下它是最合理的分區(qū)策略诗芜,也是我們最常用的分區(qū)策略之一
  • 隨機策略
    我們隨意地將消息放置到任意一個分區(qū)上
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
  • 按消息鍵保序策略[官方?jīng)]有該策略]
    Kafka 允許為每條消息定義消息鍵瞳抓,簡稱為 Key。這個 Key 的作用非常大伏恐,它可以是一個有著明確業(yè)務(wù)含義的字符串孩哑,比如客戶代碼、部門編號或是業(yè)務(wù) ID 等翠桦;也可以用來表征消息元數(shù)據(jù)横蜒。特別是在 Kafka 不支持時間戳的年代,在一些場景中销凑,工程師們都是直接將消息創(chuàng)建時間封裝進 Key 里面的丛晌。一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面斗幼,由于每個分區(qū)下的消息處理都是有順序的澎蛛,故這個策略被稱為按消息鍵保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

kafka生產(chǎn)者優(yōu)化

  • 開啟壓縮

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 // 開啟GZIP壓縮
 props.put("compression.type", "gzip");
  Producer<String, String> producer = new KafkaProducer<>(props);
  • 壓縮參數(shù)


    image.png

結(jié)論:

  1. 吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面蜕窿,zstd > LZ4 > GZIP > Snappy谋逻。
  2. 壓縮比方面: zstd > LZ4 > GZIP > snappy

kafka消息不丟失保證策略

  1. 生產(chǎn)者 選擇合適的api
    Producer 永遠要使用帶有回調(diào)通知的發(fā)送 API,也就是說不要使用 producer.send(msg)渠羞,而要使用 producer.send(msg, callback)
  2. 消費者維護好偏移量
    維持先消費消息(閱讀)斤贰,再更新位移(書簽)的順序
    如果是多線程異步處理消費消息,Consumer 程序不要開啟自動提交位移次询,而是要應(yīng)用程序手動提交位移
    3.最佳實踐
  • 不要使用 producer.send(msg)荧恍,而要使用 producer.send(msg, callback)。記住屯吊,一定要使用帶有回調(diào)通知的 send 方法送巡。
  • 設(shè)置 acks = all。acks 是 Producer 的一個參數(shù)盒卸,代表了你對“已提交”消息的定義骗爆。如果設(shè)置成 all,則表明所有副本 Broker 都要接收到消息蔽介,該消息才算是“已提交”摘投。這是最高等級的“已提交”定義。
  • 設(shè)置 retries 為一個較大的值虹蓄。這里的 retries 同樣是 Producer 的參數(shù)犀呼,對應(yīng)前面提到的 Producer 自動重試。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時抖動時薇组,消息發(fā)送可能會失敗外臂,此時配置了 retries > 0 的 Producer 能夠自動重試消息發(fā)送,避免消息丟失律胀。
  • 設(shè)置 unclean.leader.election.enable = false宋光。這是 Broker 端的參數(shù)貌矿,它控制的是哪些 Broker 有資格競選分區(qū)的 Leader。如果一個 Broker 落后原先的 Leader 太多罪佳,那么它一旦成為新的 Leader逛漫,必然會造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false菇民,即不允許這種情況的發(fā)生尽楔。
  • 設(shè)置 replication.factor >= 3。這也是 Broker 端的參數(shù)第练。其實這里想表述的是阔馋,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余娇掏。
  • 設(shè)置 min.insync.replicas > 1呕寝。這依然是 Broker 端參數(shù),控制的是消息至少要被寫入到多少個副本才算是“已提交”婴梧。設(shè)置成大于 1 可以提升消息持久性下梢。在實際環(huán)境中千萬不要使用默認值 1。
  • 確保 replication.factor > min.insync.replicas塞蹭。如果兩者相等孽江,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了番电。我們不僅要改善消息的持久性岗屏,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成漱办。
  • 推薦設(shè)置成 replication.factor = min.insync.replicas + 1这刷。確保消息消費完成再提交。
  • Consumer 端有個參數(shù) enable.auto.commit娩井,最好把它設(shè)置成 false暇屋,并采用手動提交位移的方式。就像前面說的洞辣,這對于單 Consumer 多線程處理的場景而言是至關(guān)重要的咐刨。
image.png

kafka中的攔截器

  • 參數(shù)設(shè)置
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

  • 生產(chǎn)者
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
    private Jedis jedis; // 省略Jedis初始化
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalSentMessage");
        return record;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }
  • 消費者
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    private Jedis jedis; //省略Jedis初始化


    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {

java生產(chǎn)者是如何管理tcp連接的?

Apache Kafka的所有通信都是基于TCP的,而不是于HTTP或其他協(xié)議的
1 為什采用TCP?
(1)TCP擁有一些高級功能扬霜,如多路復(fù)用請求和同時輪詢多個連接的能力所宰。
(2)很多編程語言的HTTP庫功能相對的比較簡陋。
名詞解釋:
多路復(fù)用請求:multiplexing request畜挥,是將兩個或多個數(shù)據(jù)合并到底層—物理連接中的過程。TCP的多路復(fù)用請求會在一條物理連接上創(chuàng)建若干個虛擬連接婴谱,每個虛擬連接負責(zé)流轉(zhuǎn)各自對應(yīng)的數(shù)據(jù)流蟹但。嚴格講:TCP并不能多路復(fù)用躯泰,只是提供可靠的消息交付語義保證,如自動重傳丟失的報文华糖。

2 何時創(chuàng)建TCP連接麦向?
(1)在創(chuàng)建KafkaProducer實例時,
A:生產(chǎn)者應(yīng)用會在后臺創(chuàng)建并啟動一個名為Sender的線程客叉,該Sender線程開始運行時诵竭,首先會創(chuàng)建與Broker的連接。
B:此時不知道要連接哪個Broker兼搏,kafka會通過METADATA請求獲取集群的元數(shù)據(jù)卵慰,連接所有的Broker。
(2)還可能在更新元數(shù)據(jù)后佛呻,或在消息發(fā)送時
3 何時關(guān)閉TCP連接
(1)Producer端關(guān)閉TCP連接的方式有兩種:用戶主動關(guān)閉裳朋,或kafka自動關(guān)閉。
A:用戶主動關(guān)閉吓著,通過調(diào)用producer.close()方關(guān)閉鲤嫡,也包括kill -9暴力關(guān)閉。
B:Kafka自動關(guān)閉绑莺,這與Producer端參數(shù)connection.max.idles.ms的值有關(guān)暖眼,默認為9分鐘,9分鐘內(nèi)沒有任何請求流過纺裁,就會被自動關(guān)閉诫肠。這個參數(shù)可以調(diào)整。
C:第二種方式中对扶,TCP連接是在Broker端被關(guān)閉的区赵,但這個連接請求是客戶端發(fā)起的,對TCP而言這是被動的關(guān)閉浪南,被動關(guān)閉會產(chǎn)生大量的CLOSE_WAIT連接笼才。

代碼實現(xiàn)冪等性

props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG络凿, true)骡送。

代碼開啟生產(chǎn)者事務(wù)

開啟 enable.idempotence = true。
設(shè)置 Producer 端參數(shù) transactional. id絮记。最好為其設(shè)置一個有意義的名字

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

消費組平衡問題

  1. 組成員數(shù)量發(fā)生變化摔踱。
  2. 訂閱主題數(shù)量發(fā)生變化。
  3. 訂閱主題的分區(qū)數(shù)發(fā)生變化怨愤。
image.png

kafka位移

Committing Offsets:Consumer 需要向 Kafka 匯報自己的位移數(shù)據(jù)派敷,這個匯報過程被稱為提交位移

  • 自動提交位移
Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
    // 開啟自動提交位移 
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "2000");
     
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
  • 手動提交

   try {
           while(true) {
                ConsumerRecords<String, String> records =     consumer.poll(Duration.ofSeconds(1));                    
                process(records); // 處理消息
                commitAysnc(); // 使用異步提交規(guī)避阻塞
            }
} catch(Exception e) {
            handle(e); // 處理異常
} finally {
            try {
               consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
  } finally {
       consumer.close();
}
}
  • 精細提交位移
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
            ConsumerRecords<String, String> records = 
  consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record: records) {
                        process(record);  // 處理消息
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                   new OffsetAndMetadata(record.offset() + 1);
                       if(count % 100 == 0)
                                    consumer.commitAsync(offsets, null); // 回調(diào)處理邏輯是null
                        count++;
  }
}

對于一次要處理很多消息的 Consumer 而言,它會關(guān)心社區(qū)有沒有方法允許它在消費的中間進行位移提交篮愉。比如前面這個 5000 條消息的例子腐芍,你可能希望每處理完 100 條消息就提交一次位移,這樣能夠避免大批量的消息重新消費试躏。

image.png

CommitFailedException異常怎么處理猪勇?

  • 場景一
    當(dāng)消息處理的總時間超過預(yù)設(shè)的 max.poll.interval.ms 參數(shù)值時,Kafka Consumer 端會拋出 CommitFailedException 異常
    四種處理方式
  1. 縮短單條消息處理的時間
  2. 增加 Consumer 端允許下游系統(tǒng)消費一批消息的最大時長
  3. 減少下游系統(tǒng)一次性消費的消息總數(shù)
  4. 下游系統(tǒng)使用多線程來加速消費


    image.png
  • 場景二
    Kafka Java Consumer 端還提供了一個名為 Standalone Consumer 的獨立消費者颠蕴。它沒有消費者組的概念泣刹,每個消費者實例都是獨立工作的,彼此之間毫無聯(lián)系犀被。不過椅您,你需要注意的是,獨立消費者的位移提交機制和消費者組是一樣的弱判,因此獨立消費者的位移提交也必須遵守之前說的那些規(guī)定襟沮,比如獨立消費者也要指定 group.id 參數(shù)才能提交位移.
image.png

多線程開發(fā)消費者實例

    1. 消費者程序啟動多個線程,每個線程維護專屬的 KafkaConsumer 實例昌腰,負責(zé)完整的消息獲取开伏、消息處理流程
public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
      ConsumerRecords records = 
        consumer.poll(Duration.ofMillis(10000));
                 //  執(zhí)行消息處理邏輯
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }
     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
  • 2.消費者程序使用單或多線程獲取消息,同時創(chuàng)建多個消費線程執(zhí)行消息處理邏

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...


private int workerNum = ...;
executors = new ThreadPoolExecutor(
  workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue<>(1000), 
  new ThreadPoolExecutor.CallerRunsPolicy());


...
while (true)  {
  ConsumerRecords<String, String> records = 
    consumer.poll(Duration.ofSeconds(1));
  for (final ConsumerRecord record : records) {
    executors.submit(new Worker(record));
  }
}
..
image.png

kafka副本機制

image.png

kafka如何控制請求?

https://www.processon.com/view/link/5d481e6be4b07c4cf3031755

消費組重平衡

  • heartbeat.interval.ms
    這個參數(shù)的真正作用是控制重平衡通知的頻率遭商。如果你想要消費者實例更迅速地得到通知固灵,那么就可以給這個參數(shù)設(shè)置一個非常小的值,這樣消費者就能更快地感知到重平衡已經(jīng)開啟了

費者組狀態(tài)機

image.png

image.png

Broker 端重平衡場景剖析

場景一:新成員入組劫流。


image.png

場景二:組成員主動離組


image.png

場景四:重平衡時協(xié)調(diào)者對組內(nèi)成員提交位移的處理巫玻。

image.png

控制器

1作用:
控制器組件(Controller),是Apache Kafka的核心組件祠汇。它的主要作用是Apache Zookeeper的幫助下管理和協(xié)調(diào)整個Kafka集群仍秤。
集群中任意一臺Broker都能充當(dāng)控制器的角色,但在運行過程中可很,只能有一個Broker成為控制器诗力。

2 特點:控制器是重度依賴Zookeeper。
3 產(chǎn)生:
控制器是被選出來的我抠,Broker在啟動時苇本,會嘗試去Zookeeper中創(chuàng)建/controller節(jié)點。Kafka當(dāng)前選舉控制器的規(guī)則是:第一個成功創(chuàng)建/controller節(jié)點的Broker會被指定為控制器菜拓。

4 功能:
A :主題管理(創(chuàng)建瓣窄,刪除,增加分區(qū))
當(dāng)執(zhí)行kafka-topics腳本時纳鼎,大部分的后臺工作都是控制器來完成的俺夕。
B :分區(qū)重分配
Kafka-reassign-partitions腳本提供的對已有主題分區(qū)進行細粒度的分配功能裳凸。
C :Preferred領(lǐng)導(dǎo)者選舉
Preferred領(lǐng)導(dǎo)者選舉主要是Kafka為了避免部分Broker負載過重而提供的一種換Leade的方案。
D :集群成員管理(新增Broker啥么,Broker主動關(guān)閉登舞,Broker宕機)
控制器組件會利用watch機制檢查Zookeeper的/brokers/ids節(jié)點下的子節(jié)點數(shù)量變更。當(dāng)有新Broker啟動后悬荣,它會在/brokers下創(chuàng)建專屬的znode節(jié)點。一旦創(chuàng)建完畢疙剑,Zookeeper會通過Watch機制將消息通知推送給控制器氯迂,這樣,控制器就能自動地感知到這個變化言缤。進而開啟后續(xù)新增Broker作業(yè)嚼蚀。
偵測Broker存活性則是依賴于剛剛提到的另一個機制:臨時節(jié)點。每個Broker啟動后管挟,會在/brokers/ids下創(chuàng)建一個臨時的znode轿曙。當(dāng)Broker宕機或主機關(guān)閉后,該Broker與Zookeeper的會話結(jié)束僻孝,這個znode會被自動刪除导帝。同理,Zookeeper的Watch機制將這一變更推送給控制器穿铆,這樣控制器就能知道有Broker關(guān)閉或宕機了您单,從而進行善后。

E :數(shù)據(jù)服務(wù)
控制器上保存了最全的集群元數(shù)據(jù)信息荞雏,其他所有Broker會定期接收控制器發(fā)來的元數(shù)據(jù)更新請求虐秦,從而更新其內(nèi)存中的緩存數(shù)據(jù)。

5 控制器保存的數(shù)據(jù)

控制器中保存的這些數(shù)據(jù)在Zookeeper中也保存了一份凤优。每當(dāng)控制器初始化時悦陋,它都會從Zookeeper上讀取對應(yīng)的元數(shù)據(jù)并填充到自己的緩存中。

6 控制器故障轉(zhuǎn)移(Failover)
故障轉(zhuǎn)移是指:當(dāng)運行中的控制器突然宕機或意外終止時筑辨,Kafka能夠快速地感知到俺驶,并立即啟用備用控制器來替代之前失敗的控制器。

7 內(nèi)部設(shè)計原理
A :控制器的內(nèi)部設(shè)計相當(dāng)復(fù)雜
控制器是多線程的設(shè)計挖垛,會在內(nèi)部創(chuàng)建很多線程痒钝。如:
(1)為每個Broker創(chuàng)建一個對應(yīng)的Socket連接,然后在創(chuàng)建一個專屬的線程痢毒,用于向這些Broker發(fā)送特定的請求送矩。
(2)控制連接zookeeper,也會創(chuàng)建單獨的線程來處理Watch機制通知回調(diào)。
(3)控制器還會為主題刪除創(chuàng)建額外的I/O線程哪替。
這些線程還會訪問共享的控制器緩存數(shù)據(jù)栋荸,為了維護數(shù)據(jù)安全性,控制在代碼中大量使用ReetrantLock同步機制,進一步拖慢了整個控制器的處理速度晌块。

B :在0.11版對控制器的低沉設(shè)計進了重構(gòu)爱沟。

(1)最大的改進是:把多線程的方案改成了單線程加事件對列的方案。

a. 單線程+隊列的實現(xiàn)方式:社區(qū)引入了一個事件處理線程匆背,統(tǒng)一處理各種控制器事件呼伸,然后控制器將原來執(zhí)行的操作全部建模成一個個獨立的事件,發(fā)送到專屬的事件隊列中钝尸,供此線程消費括享。
b. 單線程不代表之前提到的所有線程都被干掉了,控制器只是把緩存狀態(tài)變更方面的工作委托給了這個線程而已珍促。
(2)第二個改進:將之前同步操作Zookeeper全部改為異步操作铃辖。
a. Zookeeper本身的API提供了同步寫和異步寫兩種方式。同步操作zk猪叙,在有大量主題分區(qū)發(fā)生變更時娇斩,Zookeeper容易成為系統(tǒng)的瓶頸。

高水位的討論

Leader 副本保持同步條件

  1. 該遠程 Follower 副本在 ISR 中穴翩。
  2. 該遠程 Follower 副本 LEO 值落后于 Leader 副本 LEO 值的時間犬第,不超過 Broker 端參數(shù) replica.lag.time.max.ms 的值。如果使用默認值的話藏否,就是不超過 10 秒瓶殃。
image.png

kafka調(diào)優(yōu)

1.操作系統(tǒng)調(diào)優(yōu)
系統(tǒng)時禁掉 atime 更新
至少選擇 ext4 或 XFS
ulimit -n 和 vm.max_map_count
操作系統(tǒng)頁緩存

2.JVM 層調(diào)優(yōu)
設(shè)置堆大小 6-8G
GC 收集器 G1

  1. Broker 端調(diào)優(yōu)
    Producer -> Broker -> Consumer三端kafka版本要保持一致

4.應(yīng)用層調(diào)優(yōu)
不要頻繁地創(chuàng)建 Producer 和 Consumer 對象實例
用完及時關(guān)閉
合理利用多線程來改善性能

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市副签,隨后出現(xiàn)的幾起案子遥椿,更是在濱河造成了極大的恐慌,老刑警劉巖淆储,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件冠场,死亡現(xiàn)場離奇詭異,居然都是意外死亡本砰,警方通過查閱死者的電腦和手機碴裙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來点额,“玉大人舔株,你說我怎么就攤上這事』估猓” “怎么了载慈?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長珍手。 經(jīng)常有香客問我办铡,道長辞做,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任寡具,我火速辦了婚禮秤茅,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘童叠。我一直安慰自己框喳,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布拯钻。 她就那樣靜靜地躺著帖努,像睡著了一般。 火紅的嫁衣襯著肌膚如雪粪般。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天污桦,我揣著相機與錄音亩歹,去河邊找鬼。 笑死凡橱,一個胖子當(dāng)著我的面吹牛小作,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播稼钩,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼顾稀,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了坝撑?” 一聲冷哼從身側(cè)響起静秆,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎巡李,沒想到半個月后抚笔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡侨拦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年殊橙,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片狱从。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡膨蛮,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出季研,到底是詐尸還是另有隱情敞葛,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布训貌,位于F島的核電站制肮,受9級特大地震影響冒窍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜豺鼻,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一综液、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧儒飒,春花似錦谬莹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至井誉,卻和暖如春蕉扮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背颗圣。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工喳钟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人在岂。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓奔则,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蔽午。 傳聞我的和親對象是個殘疾皇子易茬,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容