版本選擇
- 0.7版本:
只有基礎(chǔ)消息隊列功能凭疮,無副本饭耳;打死也不使用 - 0.8版本:
增加了副本機制,新的producer API执解;建議使用0.8.2.2版本蹦锋;不建議使用0.8.2.0之后的producer API - 0.9版本:
增加權(quán)限和認證渔彰,新的consumer API霍狰,Kafka Connect功能勘天;不建議使用consumer API; - 0.10版本:
引入Kafka Streams功能右蕊,bug修復(fù)琼稻;建議版本0.10.2.2;建議使用新版consumer API - 0.11版本:
producer API冪等尤泽,事物API欣簇,消息格式重構(gòu)规脸;建議版本0.11.0.3;謹慎對待消息格式變化 - 1.0和2.0版本:
Kafka Streams改進熊咽;建議版本2.0莫鸭;
部署需要注意
重要參數(shù)
- Broker 端參數(shù)
- log.dirs:這是非常重要的參數(shù),指定了 Broker 需要使用的若干個文件目錄路徑横殴。要知道這個參數(shù)是沒有默認值的被因,這說明什么?這說明它必須由你親自指定衫仑。log.dir:注意這是 dir梨与,結(jié)尾沒有 s,說明它只能表示單個路徑文狱,它是補充上一個參數(shù)用的粥鞋。
提升讀寫性能:比起單塊磁盤,多塊物理磁盤同時讀寫數(shù)據(jù)有更高的吞吐量
Zookeeper
zookeeper.connect:zk1:2181,zk2:2181,zk3:2181/kafka1listeners:學(xué)名叫監(jiān)聽器
其實就是告訴外部連接者要通過什么協(xié)議訪問指定主機名和端口開放的 Kafka 服務(wù)瞄崇。advertised.listeners:和 listeners 相比多了個 advertised呻粹。Advertised 的含義表示宣稱的、公布的苏研,就是說這組監(jiān)聽器是 Broker 用于對外發(fā)布的等浊。host.name/port:列出這兩個參數(shù)就是想說你把它們忘掉吧,壓根不要為它們指定值摹蘑,畢竟都是過期的參數(shù)了筹燕。
最好全部使用主機名,即 Broker 端和 Client 端應(yīng)用配置中全部填寫主機名
- topic:
- auto.create.topics.enable:是否允許自動創(chuàng)建 Topic衅鹿。推薦設(shè)置為false
規(guī)避線上自動創(chuàng)建topic問題 - unclean.leader.election.enable:是否允許 Unclean Leader 選舉撒踪。推薦為false
規(guī)避落后的副本自動選為leader,導(dǎo)致數(shù)據(jù)丟失. - auto.leader.rebalance.enable:是否允許定期進行 Leader 選舉。推薦false
規(guī)避自動切換leader造成不必要的性能開銷
- 消息保存三兄弟
- log.retention.{hours|minutes|ms}:這是個“三兄弟”塘安,都是控制一條消息數(shù)據(jù)被保存多長時間糠涛。從優(yōu)先級上來說 ms 設(shè)置最高、minutes 次之兼犯、hours 最低
- log.retention.bytes:這是指定 Broker 為消息保存的總磁盤容量大小。
- message.max.bytes:控制 Broker 能夠接收的最大消息大小集漾。
- 創(chuàng)建topic時帶參數(shù)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
- 修改topic參數(shù)
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
Kafka jvm設(shè)置推薦參數(shù)
- KAFKA_HEAP_OPTS:指定堆大小切黔。
- KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 參數(shù)。
kafka 零拷貝
- 數(shù)據(jù)在磁盤和網(wǎng)絡(luò)進行傳輸時避免昂貴的內(nèi)核態(tài)數(shù)據(jù)拷貝具篇,從而實現(xiàn)快速的數(shù)據(jù)傳輸
分區(qū)策略
- 輪詢策略
生產(chǎn)者程序會按照輪詢的方式在主題的所有分區(qū)間均勻地“碼放”消息纬霞。
輪詢策略有非常優(yōu)秀的負載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上驱显,故默認情況下它是最合理的分區(qū)策略诗芜,也是我們最常用的分區(qū)策略之一 - 隨機策略
我們隨意地將消息放置到任意一個分區(qū)上
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
- 按消息鍵保序策略[官方?jīng)]有該策略]
Kafka 允許為每條消息定義消息鍵瞳抓,簡稱為 Key。這個 Key 的作用非常大伏恐,它可以是一個有著明確業(yè)務(wù)含義的字符串孩哑,比如客戶代碼、部門編號或是業(yè)務(wù) ID 等翠桦;也可以用來表征消息元數(shù)據(jù)横蜒。特別是在 Kafka 不支持時間戳的年代,在一些場景中销凑,工程師們都是直接將消息創(chuàng)建時間封裝進 Key 里面的丛晌。一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面斗幼,由于每個分區(qū)下的消息處理都是有順序的澎蛛,故這個策略被稱為按消息鍵保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
kafka生產(chǎn)者優(yōu)化
- 開啟壓縮
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 開啟GZIP壓縮
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
-
壓縮參數(shù)
結(jié)論:
- 吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面蜕窿,zstd > LZ4 > GZIP > Snappy谋逻。
- 壓縮比方面: zstd > LZ4 > GZIP > snappy
kafka消息不丟失保證策略
- 生產(chǎn)者 選擇合適的api
Producer 永遠要使用帶有回調(diào)通知的發(fā)送 API,也就是說不要使用 producer.send(msg)渠羞,而要使用 producer.send(msg, callback) - 消費者維護好偏移量
維持先消費消息(閱讀)斤贰,再更新位移(書簽)的順序
如果是多線程異步處理消費消息,Consumer 程序不要開啟自動提交位移次询,而是要應(yīng)用程序手動提交位移
3.最佳實踐
- 不要使用 producer.send(msg)荧恍,而要使用 producer.send(msg, callback)。記住屯吊,一定要使用帶有回調(diào)通知的 send 方法送巡。
- 設(shè)置 acks = all。acks 是 Producer 的一個參數(shù)盒卸,代表了你對“已提交”消息的定義骗爆。如果設(shè)置成 all,則表明所有副本 Broker 都要接收到消息蔽介,該消息才算是“已提交”摘投。這是最高等級的“已提交”定義。
- 設(shè)置 retries 為一個較大的值虹蓄。這里的 retries 同樣是 Producer 的參數(shù)犀呼,對應(yīng)前面提到的 Producer 自動重試。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時抖動時薇组,消息發(fā)送可能會失敗外臂,此時配置了 retries > 0 的 Producer 能夠自動重試消息發(fā)送,避免消息丟失律胀。
- 設(shè)置 unclean.leader.election.enable = false宋光。這是 Broker 端的參數(shù)貌矿,它控制的是哪些 Broker 有資格競選分區(qū)的 Leader。如果一個 Broker 落后原先的 Leader 太多罪佳,那么它一旦成為新的 Leader逛漫,必然會造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false菇民,即不允許這種情況的發(fā)生尽楔。
- 設(shè)置 replication.factor >= 3。這也是 Broker 端的參數(shù)第练。其實這里想表述的是阔馋,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余娇掏。
- 設(shè)置 min.insync.replicas > 1呕寝。這依然是 Broker 端參數(shù),控制的是消息至少要被寫入到多少個副本才算是“已提交”婴梧。設(shè)置成大于 1 可以提升消息持久性下梢。在實際環(huán)境中千萬不要使用默認值 1。
- 確保 replication.factor > min.insync.replicas塞蹭。如果兩者相等孽江,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了番电。我們不僅要改善消息的持久性岗屏,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成漱办。
- 推薦設(shè)置成 replication.factor = min.insync.replicas + 1这刷。確保消息消費完成再提交。
- Consumer 端有個參數(shù) enable.auto.commit娩井,最好把它設(shè)置成 false暇屋,并采用手動提交位移的方式。就像前面說的洞辣,這對于單 Consumer 多線程處理的場景而言是至關(guān)重要的咐刨。
kafka中的攔截器
- 參數(shù)設(shè)置
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
- 生產(chǎn)者
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略Jedis初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
- 消費者
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; //省略Jedis初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
java生產(chǎn)者是如何管理tcp連接的?
Apache Kafka的所有通信都是基于TCP的,而不是于HTTP或其他協(xié)議的
1 為什采用TCP?
(1)TCP擁有一些高級功能扬霜,如多路復(fù)用請求和同時輪詢多個連接的能力所宰。
(2)很多編程語言的HTTP庫功能相對的比較簡陋。
名詞解釋:
多路復(fù)用請求:multiplexing request畜挥,是將兩個或多個數(shù)據(jù)合并到底層—物理連接中的過程。TCP的多路復(fù)用請求會在一條物理連接上創(chuàng)建若干個虛擬連接婴谱,每個虛擬連接負責(zé)流轉(zhuǎn)各自對應(yīng)的數(shù)據(jù)流蟹但。嚴格講:TCP并不能多路復(fù)用躯泰,只是提供可靠的消息交付語義保證,如自動重傳丟失的報文华糖。
2 何時創(chuàng)建TCP連接麦向?
(1)在創(chuàng)建KafkaProducer實例時,
A:生產(chǎn)者應(yīng)用會在后臺創(chuàng)建并啟動一個名為Sender的線程客叉,該Sender線程開始運行時诵竭,首先會創(chuàng)建與Broker的連接。
B:此時不知道要連接哪個Broker兼搏,kafka會通過METADATA請求獲取集群的元數(shù)據(jù)卵慰,連接所有的Broker。
(2)還可能在更新元數(shù)據(jù)后佛呻,或在消息發(fā)送時
3 何時關(guān)閉TCP連接
(1)Producer端關(guān)閉TCP連接的方式有兩種:用戶主動關(guān)閉裳朋,或kafka自動關(guān)閉。
A:用戶主動關(guān)閉吓著,通過調(diào)用producer.close()方關(guān)閉鲤嫡,也包括kill -9暴力關(guān)閉。
B:Kafka自動關(guān)閉绑莺,這與Producer端參數(shù)connection.max.idles.ms的值有關(guān)暖眼,默認為9分鐘,9分鐘內(nèi)沒有任何請求流過纺裁,就會被自動關(guān)閉诫肠。這個參數(shù)可以調(diào)整。
C:第二種方式中对扶,TCP連接是在Broker端被關(guān)閉的区赵,但這個連接請求是客戶端發(fā)起的,對TCP而言這是被動的關(guān)閉浪南,被動關(guān)閉會產(chǎn)生大量的CLOSE_WAIT連接笼才。
代碼實現(xiàn)冪等性
props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG络凿, true)骡送。
代碼開啟生產(chǎn)者事務(wù)
開啟 enable.idempotence = true。
設(shè)置 Producer 端參數(shù) transactional. id絮记。最好為其設(shè)置一個有意義的名字
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
消費組平衡問題
- 組成員數(shù)量發(fā)生變化摔踱。
- 訂閱主題數(shù)量發(fā)生變化。
- 訂閱主題的分區(qū)數(shù)發(fā)生變化怨愤。
kafka位移
Committing Offsets:Consumer 需要向 Kafka 匯報自己的位移數(shù)據(jù)派敷,這個匯報過程被稱為提交位移
- 自動提交位移
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 開啟自動提交位移
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
- 手動提交
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
commitAysnc(); // 使用異步提交規(guī)避阻塞
}
} catch(Exception e) {
handle(e); // 處理異常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
- 精細提交位移
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 處理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回調(diào)處理邏輯是null
count++;
}
}
對于一次要處理很多消息的 Consumer 而言,它會關(guān)心社區(qū)有沒有方法允許它在消費的中間進行位移提交篮愉。比如前面這個 5000 條消息的例子腐芍,你可能希望每處理完 100 條消息就提交一次位移,這樣能夠避免大批量的消息重新消費试躏。
CommitFailedException異常怎么處理猪勇?
- 場景一
當(dāng)消息處理的總時間超過預(yù)設(shè)的 max.poll.interval.ms 參數(shù)值時,Kafka Consumer 端會拋出 CommitFailedException 異常
四種處理方式
- 縮短單條消息處理的時間
- 增加 Consumer 端允許下游系統(tǒng)消費一批消息的最大時長
- 減少下游系統(tǒng)一次性消費的消息總數(shù)
-
下游系統(tǒng)使用多線程來加速消費
- 場景二
Kafka Java Consumer 端還提供了一個名為 Standalone Consumer 的獨立消費者颠蕴。它沒有消費者組的概念泣刹,每個消費者實例都是獨立工作的,彼此之間毫無聯(lián)系犀被。不過椅您,你需要注意的是,獨立消費者的位移提交機制和消費者組是一樣的弱判,因此獨立消費者的位移提交也必須遵守之前說的那些規(guī)定襟沮,比如獨立消費者也要指定 group.id 參數(shù)才能提交位移.
多線程開發(fā)消費者實例
- 消費者程序啟動多個線程,每個線程維護專屬的 KafkaConsumer 實例昌腰,負責(zé)完整的消息獲取开伏、消息處理流程
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(10000));
// 執(zhí)行消息處理邏輯
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
- 2.消費者程序使用單或多線程獲取消息,同時創(chuàng)建多個消費線程執(zhí)行消息處理邏
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {
executors.submit(new Worker(record));
}
}
..
kafka副本機制
kafka如何控制請求?
https://www.processon.com/view/link/5d481e6be4b07c4cf3031755
消費組重平衡
- heartbeat.interval.ms
這個參數(shù)的真正作用是控制重平衡通知的頻率遭商。如果你想要消費者實例更迅速地得到通知固灵,那么就可以給這個參數(shù)設(shè)置一個非常小的值,這樣消費者就能更快地感知到重平衡已經(jīng)開啟了
費者組狀態(tài)機
Broker 端重平衡場景剖析
場景一:新成員入組劫流。
場景二:組成員主動離組
場景四:重平衡時協(xié)調(diào)者對組內(nèi)成員提交位移的處理巫玻。
控制器
1作用:
控制器組件(Controller),是Apache Kafka的核心組件祠汇。它的主要作用是Apache Zookeeper的幫助下管理和協(xié)調(diào)整個Kafka集群仍秤。
集群中任意一臺Broker都能充當(dāng)控制器的角色,但在運行過程中可很,只能有一個Broker成為控制器诗力。
2 特點:控制器是重度依賴Zookeeper。
3 產(chǎn)生:
控制器是被選出來的我抠,Broker在啟動時苇本,會嘗試去Zookeeper中創(chuàng)建/controller節(jié)點。Kafka當(dāng)前選舉控制器的規(guī)則是:第一個成功創(chuàng)建/controller節(jié)點的Broker會被指定為控制器菜拓。
4 功能:
A :主題管理(創(chuàng)建瓣窄,刪除,增加分區(qū))
當(dāng)執(zhí)行kafka-topics腳本時纳鼎,大部分的后臺工作都是控制器來完成的俺夕。
B :分區(qū)重分配
Kafka-reassign-partitions腳本提供的對已有主題分區(qū)進行細粒度的分配功能裳凸。
C :Preferred領(lǐng)導(dǎo)者選舉
Preferred領(lǐng)導(dǎo)者選舉主要是Kafka為了避免部分Broker負載過重而提供的一種換Leade的方案。
D :集群成員管理(新增Broker啥么,Broker主動關(guān)閉登舞,Broker宕機)
控制器組件會利用watch機制檢查Zookeeper的/brokers/ids節(jié)點下的子節(jié)點數(shù)量變更。當(dāng)有新Broker啟動后悬荣,它會在/brokers下創(chuàng)建專屬的znode節(jié)點。一旦創(chuàng)建完畢疙剑,Zookeeper會通過Watch機制將消息通知推送給控制器氯迂,這樣,控制器就能自動地感知到這個變化言缤。進而開啟后續(xù)新增Broker作業(yè)嚼蚀。
偵測Broker存活性則是依賴于剛剛提到的另一個機制:臨時節(jié)點。每個Broker啟動后管挟,會在/brokers/ids下創(chuàng)建一個臨時的znode轿曙。當(dāng)Broker宕機或主機關(guān)閉后,該Broker與Zookeeper的會話結(jié)束僻孝,這個znode會被自動刪除导帝。同理,Zookeeper的Watch機制將這一變更推送給控制器穿铆,這樣控制器就能知道有Broker關(guān)閉或宕機了您单,從而進行善后。
E :數(shù)據(jù)服務(wù)
控制器上保存了最全的集群元數(shù)據(jù)信息荞雏,其他所有Broker會定期接收控制器發(fā)來的元數(shù)據(jù)更新請求虐秦,從而更新其內(nèi)存中的緩存數(shù)據(jù)。
5 控制器保存的數(shù)據(jù)
控制器中保存的這些數(shù)據(jù)在Zookeeper中也保存了一份凤优。每當(dāng)控制器初始化時悦陋,它都會從Zookeeper上讀取對應(yīng)的元數(shù)據(jù)并填充到自己的緩存中。
6 控制器故障轉(zhuǎn)移(Failover)
故障轉(zhuǎn)移是指:當(dāng)運行中的控制器突然宕機或意外終止時筑辨,Kafka能夠快速地感知到俺驶,并立即啟用備用控制器來替代之前失敗的控制器。
7 內(nèi)部設(shè)計原理
A :控制器的內(nèi)部設(shè)計相當(dāng)復(fù)雜
控制器是多線程的設(shè)計挖垛,會在內(nèi)部創(chuàng)建很多線程痒钝。如:
(1)為每個Broker創(chuàng)建一個對應(yīng)的Socket連接,然后在創(chuàng)建一個專屬的線程痢毒,用于向這些Broker發(fā)送特定的請求送矩。
(2)控制連接zookeeper,也會創(chuàng)建單獨的線程來處理Watch機制通知回調(diào)。
(3)控制器還會為主題刪除創(chuàng)建額外的I/O線程哪替。
這些線程還會訪問共享的控制器緩存數(shù)據(jù)栋荸,為了維護數(shù)據(jù)安全性,控制在代碼中大量使用ReetrantLock同步機制,進一步拖慢了整個控制器的處理速度晌块。
B :在0.11版對控制器的低沉設(shè)計進了重構(gòu)爱沟。
(1)最大的改進是:把多線程的方案改成了單線程加事件對列的方案。
a. 單線程+隊列的實現(xiàn)方式:社區(qū)引入了一個事件處理線程匆背,統(tǒng)一處理各種控制器事件呼伸,然后控制器將原來執(zhí)行的操作全部建模成一個個獨立的事件,發(fā)送到專屬的事件隊列中钝尸,供此線程消費括享。
b. 單線程不代表之前提到的所有線程都被干掉了,控制器只是把緩存狀態(tài)變更方面的工作委托給了這個線程而已珍促。
(2)第二個改進:將之前同步操作Zookeeper全部改為異步操作铃辖。
a. Zookeeper本身的API提供了同步寫和異步寫兩種方式。同步操作zk猪叙,在有大量主題分區(qū)發(fā)生變更時娇斩,Zookeeper容易成為系統(tǒng)的瓶頸。
高水位的討論
Leader 副本保持同步條件
- 該遠程 Follower 副本在 ISR 中穴翩。
- 該遠程 Follower 副本 LEO 值落后于 Leader 副本 LEO 值的時間犬第,不超過 Broker 端參數(shù) replica.lag.time.max.ms 的值。如果使用默認值的話藏否,就是不超過 10 秒瓶殃。
kafka調(diào)優(yōu)
1.操作系統(tǒng)調(diào)優(yōu)
系統(tǒng)時禁掉 atime 更新
至少選擇 ext4 或 XFS
ulimit -n 和 vm.max_map_count
操作系統(tǒng)頁緩存
2.JVM 層調(diào)優(yōu)
設(shè)置堆大小 6-8G
GC 收集器 G1
- Broker 端調(diào)優(yōu)
Producer -> Broker -> Consumer三端kafka版本要保持一致
4.應(yīng)用層調(diào)優(yōu)
不要頻繁地創(chuàng)建 Producer 和 Consumer 對象實例
用完及時關(guān)閉
合理利用多線程來改善性能