第一章 初始kafka
參考書籍: 朱小廝--深入理解Kafka 核心設(shè)計(jì)與實(shí)踐原理
Kafka體系結(jié)構(gòu)
-
Kafka體系架構(gòu)包含若干Producer, 若干Broker , 若干Consumer,以及一個(gè)Zookeeper集群鞭盟。
- Zookeeper是Kafka用來負(fù)責(zé)集群元數(shù)據(jù)的管理齿诉、控制器的選舉等操作粤剧。
- Producer:生產(chǎn)者抵恋,即發(fā)送消息的一方弧关。生產(chǎn)者負(fù)責(zé)創(chuàng)建消息世囊,然后將其投遞到Kafka中
- Broker:一個(gè)獨(dú)立的Kafka服務(wù)節(jié)點(diǎn)株憾。 一個(gè)或多個(gè)Broker組成了一個(gè)Kafka集群
- Consumer: 消費(fèi)者墙歪,也就是接收消息的一方虹菲。消費(fèi)者連接到Kafka上并拉取消息届惋,進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理
kafka體系架構(gòu)
主題和分區(qū)
Kafka的每條消息都屬于一個(gè)主題郑藏,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到特定的主題,而消費(fèi)者負(fù)責(zé)訂閱主題并消費(fèi)
-
一個(gè)主題可以細(xì)分為多個(gè)分區(qū)拌牲,一個(gè)分區(qū)屬于單個(gè)主題。 分區(qū)可以看成是一個(gè)可追加的日志文件失驶, 消息在被追加到分區(qū)日志文件時(shí)會(huì)分配一個(gè)
偏移量
土居, 偏移量是消息在分區(qū)中的唯一標(biāo)識擦耀,Kafka保證了 偏移量在分區(qū)中是有序的眷蜓。消息寫入 每一條消息發(fā)送時(shí)會(huì)根據(jù)分區(qū)規(guī)則選擇存儲(chǔ)到哪一個(gè)分區(qū)吁系,在主題創(chuàng)建之后可以通過修改分區(qū)的數(shù)量實(shí)現(xiàn)水平擴(kuò)展。
多副本(Replica機(jī)制)
- 多副本機(jī)制是通過增加副本數(shù)量進(jìn)行數(shù)據(jù)冗余碧聪,從而提高容災(zāi)能力逞姿。 副本之間是“一主多從”的關(guān)系。其中leader副本負(fù)責(zé)處理讀寫請求栋烤,follower副本只負(fù)責(zé)與leader副本的消息同步 (區(qū)別于讀寫分離) 明郭, 當(dāng)leader副本宕機(jī)時(shí)通過leader選舉和失效轉(zhuǎn)移,保證了Kafka的高可用性话侄。
- folower副本的消息相對于leader副本具有一定的滯后性
幾個(gè)重要名詞概念
AR (Assigned Replicas): 分區(qū)中的所有副本
ISR (In-Sync-Replicas): 與leader副本保持一定程度同步的副本
-
OSR(Out-of-Sync-Replicas): 與leader副本滯后過多的副本
即 AR = ISR + OR年堆;
HW(High WaterMark): 高水位, 用來標(biāo)記一個(gè)特定的消息偏移量痒蓬,消費(fèi)者只能拉取到這個(gè)offset之前的消息(可見性)
LEO( Log End Offset) : 標(biāo)志著當(dāng)前日志文件中下一條待寫入消息的offset 谊却。 分區(qū)ISR集合中的每個(gè)副本都維護(hù)自身的LEO炎辨,而ISR集合中的最小LEO為分區(qū)的HW碴萧,對消費(fèi)者而言只能消費(fèi)HW之前的消息破喻。
第二章 生產(chǎn)者
KafkaProducer是線程安全的
public class KafkaProducerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
producer.send(record);
}
}
- 生產(chǎn)邏輯的幾個(gè)步驟
- 配置生產(chǎn)者客戶端參數(shù)并創(chuàng)建生產(chǎn)者實(shí)例
- 構(gòu)建待發(fā)送消息
- 發(fā)送消息
- 關(guān)閉生產(chǎn)者實(shí)例
發(fā)送消息的三種模式
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
- 發(fā)后即忘
- 只管發(fā)送消息,不關(guān)心信息是否正確到達(dá)几莽。
- 優(yōu)點(diǎn):性能最高,吞吐量大 缺點(diǎn):會(huì)造成數(shù)據(jù)丟失姨夹,可靠性低
- 同步
- 發(fā)送消息后返回Future對象峭沦,調(diào)用get()方法時(shí)阻塞等待熙侍,直到發(fā)送成功或出現(xiàn)異常
- 優(yōu)點(diǎn):可靠性高,如有異程曜纾可處理或進(jìn)行消息重發(fā) 缺點(diǎn):性能低矛辕,造成阻塞
- 異步
- 發(fā)送消息時(shí)指定回調(diào)函數(shù)聊品,Kafka在返回響應(yīng)時(shí)會(huì)調(diào)用該函數(shù)實(shí)現(xiàn)異步的發(fā)送確認(rèn)翻屈。
- 在同一個(gè)分區(qū)中,如果消息record1比record2先發(fā)送厘贼,那么它會(huì)保證callback1在callback2之前調(diào)用嘴秸。
序列化器
- 生產(chǎn)者使用序列化器將對象轉(zhuǎn)換為字節(jié)數(shù)組凭疮,才能通過網(wǎng)絡(luò)發(fā)送給Kafka
- 消費(fèi)者使用反序列化其把Kafka中收到的字節(jié)數(shù)組轉(zhuǎn)換為相應(yīng)的對象哭尝。
- 因此生產(chǎn)者的序列化器和消費(fèi)者使用的反序列化器要一一對應(yīng)。
分區(qū)器
分區(qū)器 是根據(jù)key這個(gè)字段來計(jì)算partition值桶唐。它的作用是為消息分配分區(qū)
生產(chǎn)者攔截器
生產(chǎn)者攔截器既可用來在消息發(fā)送前做一些準(zhǔn)備工作如 按照某個(gè)規(guī)則過濾掉不符合要求的消息尤泽,修改消息內(nèi)容等坯约。也可以用來在發(fā)送回調(diào)邏輯前做一些定制化需求,如統(tǒng)計(jì)工作卿拴。 還可以指定多個(gè)攔截器形成攔截器鏈
生產(chǎn)者整體架構(gòu)
- 整個(gè)生產(chǎn)者客戶端由 主線程和Sender線程構(gòu)成
- 在④中,是用于緩存消息梨与,以便Sender線程進(jìn)行批量發(fā)送堕花,進(jìn)而減少網(wǎng)絡(luò)傳輸
- 在⑤中,是將 <分區(qū)粥鞋,消息集合> 轉(zhuǎn)化為 <brokerId, 消息集合>缘挽。 即邏輯地址到物理地址的轉(zhuǎn)化
- 在⑦中,用于緩存尚未收到回應(yīng)的消息呻粹,以便異常時(shí)可進(jìn)行重發(fā)
- 重要參數(shù) max.in.flight.requests.per 默認(rèn)值為5到踏,即每個(gè)連接最多只能緩存5個(gè)未響應(yīng)的請求窝稿。 可類比于TCP連接中的滑動(dòng)窗口大小
元數(shù)據(jù)的更新
元數(shù)據(jù)是指Kafka集群中的元數(shù)據(jù)踪少,這些元數(shù)據(jù)記錄了集群中有哪些主題,這些主題有哪些分區(qū),每個(gè)分區(qū)的leader副本分配在哪個(gè)節(jié)點(diǎn)上,follwer副本分配在哪些節(jié)點(diǎn)上,哪些副本在AR,ISR集合中,集群有哪些節(jié)點(diǎn),控制節(jié)點(diǎn)又是哪一個(gè)等信息愁铺。
元數(shù)據(jù)更新會(huì)挑選 InFlightRequests中當(dāng)前負(fù)載最小的節(jié)點(diǎn)發(fā)送更新元數(shù)據(jù)請求瓶竭。 由于Sender線程需要更新荧恍,而主線程需要讀取。因此數(shù)據(jù)同步問題也要考慮蔽介。使用synchronized和final保證武花。
幾個(gè)重要的參數(shù)
- acks : 用來指定分區(qū)中必須要有多少個(gè)副本收到這條消息累铅,這樣生產(chǎn)者才認(rèn)為消息寫入成功
- 取值為1 : 只要leader副本成功寫入消息投储,就會(huì)收到kafka的成功響應(yīng)
- 取值為0: 不需要等待任何服務(wù)器響應(yīng)勋眯,寫入就認(rèn)為成功
- 取值為-1或all:需要等待ISR中的所有副本都成功寫入消息讶坯,才會(huì)收到kafka的成功響應(yīng)
- max.request.size
- 限制生產(chǎn)者客戶端能發(fā)送消息最大值
- retires 洼冻、retry.backoff.ms
- 配置生產(chǎn)者重試次數(shù) 屋彪、 兩次重試的時(shí)間間隔
- max.in.flight.requests.per.connection
- 默認(rèn)值為5蟹但,即每個(gè)連接最多只能緩存5個(gè)未響應(yīng)的請求。
- 當(dāng)此參數(shù) > 1 沙郭,則會(huì)因?yàn)橹匕l(fā)而出現(xiàn)錯(cuò)序的問題
第三章 消費(fèi)者
kafkaConsumer是線程不安全的
消費(fèi)者和消費(fèi)者組
每個(gè)消費(fèi)者只能消費(fèi)所分配到的分區(qū)中的消息再扭,即每一個(gè)分區(qū)只能被一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者所消費(fèi)
-
當(dāng)消費(fèi)組內(nèi)的消費(fèi)者個(gè)數(shù)變化時(shí)對應(yīng)的分區(qū)分配演變?nèi)缦拢海J(rèn)的RangeAssinor為例)
消費(fèi)者與消費(fèi)者組 消費(fèi)者與消費(fèi)者組的模型讓整體消費(fèi)能力具有了伸縮性÷缭洌可以增加消費(fèi)者個(gè)數(shù)來提高(或降低)整體消費(fèi)能力
-
當(dāng)消費(fèi)者過多怨愤,出現(xiàn)消費(fèi)者分配不到任何分區(qū)時(shí)柿汛,那么這些消費(fèi)者將無法消費(fèi)消息貌笨,造成浪費(fèi)
消費(fèi)組內(nèi)有過多的消費(fèi)者 -
Kafka基于消費(fèi)者和消費(fèi)者組模型 支持了 點(diǎn)對點(diǎn)和發(fā)布/訂閱兩種模式
- 如果所有消費(fèi)者都隸屬于同一個(gè)消費(fèi)組,那么所有的消息都會(huì)被均衡地投遞給每一個(gè)消費(fèi)者祠汇,即每條消息只會(huì)被一個(gè)消費(fèi)者處理屿良,相當(dāng)于點(diǎn)對點(diǎn)模式
- 如果所有的消費(fèi)者都隸屬于不同的消費(fèi)組喷橙,那么所有的消息都會(huì)被廣播給所有的消費(fèi)者,即每條消息會(huì)被所有的消費(fèi)者處理登舞,相當(dāng)于發(fā)布/訂閱模式的應(yīng)用
每一個(gè)消費(fèi)者只隸屬于一個(gè)消費(fèi)組。消息發(fā)送時(shí)可指定消費(fèi)者組管挟, 消費(fèi)者客戶端通過group.id配置消費(fèi)者組名稱守谓,默認(rèn)為空字符串。
消費(fèi)者客戶端開發(fā)
KafkaConsumer是非線程安全的
public class KafkaConsumerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("client.id", "consumer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic()
+ ", partition = " + record.partition()
+ ", offset = " + record.offset());
System.out.println("key = " + record.key()
+ ", value = " + record.value());
//do something to process record.
}
}
} catch (Exception e) {
log.error("occur exception ", e);
} finally {
consumer.close();
}
}
}
-
Kafka的消費(fèi)邏輯
- 配置消費(fèi)者客戶端參數(shù)及創(chuàng)建相應(yīng)消費(fèi)者實(shí)例
- 訂閱主題
- 拉取消息并消費(fèi)
- 提交消費(fèi)位移(后面會(huì)講)
- 關(guān)閉消費(fèi)者實(shí)例
-
訂閱主題和分區(qū)的細(xì)節(jié)
-
有三種訂閱的方式。 集合訂閱的方式subscribe(Collection)闽铐、正則表達(dá)式訂閱方式subscribe(Pattern)、
指定分區(qū)的訂閱方式assign(Collection)
subscribe訂閱主題時(shí)具有再平衡(后面會(huì)講)的功能奶浦,而assign沒有兄墅。
-
反序列化器
將字節(jié)數(shù)組轉(zhuǎn)化為對象,與生產(chǎn)者的序列化器要一一對應(yīng)
消息消費(fèi)
幾個(gè)常用API
#KafkaConsumer
public ConsumerRecords<K, V> poll(Duration timeout);
#ConsumerRecords
public List<ConsumerRecord<K, V>> records(TopicPartition partition);
public Iterable<ConsumerRecord<K, V>> records(String topic);
- timeout參數(shù)用于控制阻塞時(shí)間澳叉,再消費(fèi)者緩沖區(qū)里沒有數(shù)據(jù)時(shí)會(huì)發(fā)生阻塞
- 按照分區(qū)的維度隙咸,獲取拉取消息中 某個(gè)分區(qū)的所有記錄
- 按照主題的維度,獲取拉取消息中 某個(gè)主題的所有記錄
位移提交
-
Consumer會(huì)記錄上一次的消費(fèi)位移成洗,并進(jìn)行持久化保存五督。 存儲(chǔ)到Kafka的內(nèi)部主題__consumer_offset中
消費(fèi)位移 -
位移的提交時(shí)機(jī)也有講究,可能會(huì)造成重復(fù)消費(fèi)和消息丟失的現(xiàn)象
拉取到消息之后就進(jìn)行位移提交瓶殃, 若消費(fèi)到一半時(shí)宕機(jī)充包,則造成消失丟失現(xiàn)象
-
消費(fèi)完所有消息后在進(jìn)行位移提交, 若消費(fèi)到一半時(shí)宕機(jī),則造成重復(fù)消費(fèi)現(xiàn)象
消費(fèi)位移的提交位置
Kafka默認(rèn)的消費(fèi)位移提交方式是自動(dòng)提交(定期)基矮。
enable.auto.commit
默認(rèn)為true;auto.commit.interval.ms
配置提交的周期淆储。自動(dòng)提交的動(dòng)作是在poll()方法的邏輯中完成的,會(huì)在每次拉取請求之間檢查是否可以進(jìn)行位移提交家浇。-
自動(dòng)提交會(huì)造成重復(fù)消費(fèi)和消息丟失的現(xiàn)象
重復(fù)消費(fèi): 消費(fèi)到一半時(shí)宕機(jī)本砰,而尚未提交,則造成重復(fù)消費(fèi)
-
消息丟失:如圖線程A進(jìn)行拉取消息到緩存钢悲,線程B從緩存中處理邏輯点额。 若線程B處理到一半時(shí)宕機(jī),那么下次恢復(fù)時(shí)又從【X+7】開始拉取莺琳,造成了【x+4】-【X+7】消息的丟失
自動(dòng)位移提交中消息丟失的情況
可以看出自動(dòng)提交編碼簡單但會(huì)出現(xiàn)消息丟失和重復(fù)消費(fèi)現(xiàn)象还棱,并且無法做到精確的位移管理,因此Kafka還提供了 手動(dòng)提交的方式惭等。通常不是拉取到消息就算消費(fèi)完成了诱贿,而是當(dāng)我們通過這條消息完成一系列業(yè)務(wù)處理后,才認(rèn)為消息被成功消費(fèi)咕缎。開啟手動(dòng)提交需要
enable.auto.commit
設(shè)置為false-
手動(dòng)提交可分為 同步提交和異步提交珠十。 即commitSync()和commitAsync()兩種方式
以下是同步提交和異步提交的一些案例
#拉取所有消息并處理后進(jìn)行同步提交 while (running.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { //do some logical processing. } consumer.commitSync(); } #批量處理+批量提交 int minBatchSize = 200; while (running.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { //do some logical processing with buffer. consumer.commitSync(); buffer.clear(); } } #帶參數(shù)的同步位移提交,可控制提交的offset凭豪,該案例為每消費(fèi)一條就提交一次 while (running.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { //do some logical processing. long offset = record.offset(); TopicPartition partition = new TopicPartition(record.topic(), record.partition()); consumer.commitSync(Collections .singletonMap(partition, new OffsetAndMetadata(offset + 1))); } } #按分區(qū)粒度同步提交消費(fèi)位移焙蹭,每處理完一個(gè)分區(qū)就提交一次 while (running.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { //do some logical processing. } long lastConsumedOffset = partitionRecords .get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumedOffset + 1))); } } #異步提交,可指定提交完成后的回調(diào)函數(shù) public void commitAsync()嫂伞; public void commitAsync(OffsetCommitCallback callback)孔厉; public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
異步提交也存在重復(fù)消費(fèi)的問題。如果先提交了【X+2】帖努,再提交【X+8】撰豺。如果后者提交成功而前者提交失敗。 如果此時(shí)前者進(jìn)行重試提交拼余,那么成功后會(huì)造成數(shù)據(jù)的重復(fù)消費(fèi)污桦。
對于異步提交可以設(shè)置一個(gè)遞增的序號維護(hù)異步提交的順序,如當(dāng)位移提交失敗需要重試提交時(shí)匙监,對比所提交的位移和維護(hù)的序號大小凡橱,如果前者小于后者,就不需要再重復(fù)提交了亭姥。
控制或關(guān)閉消費(fèi)
- KafkaConsumer提供了暫停pause()和恢復(fù)resume()某些分區(qū)的消費(fèi)稼钩;以及關(guān)閉close()的方法'
指定位移消費(fèi)
- 如用一個(gè)新的消費(fèi)者組來消費(fèi)主題時(shí),由于沒有可查找的消費(fèi)偏移达罗,因此會(huì)按照
auto.offset.reset
配置來決定從何處開始消費(fèi)消息坝撑。 - seek()方法為我們提供了從特定位置讀取消息的能力,通過這個(gè)方法來向前跳過若干消息,也可以通過這個(gè)方法來向后回溯若干消息
再均衡
- 再均衡是指分區(qū)所屬權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者的行為巡李。它為消費(fèi)者組具備高可用性和伸縮性提供了保障抚笔。
- 在再均衡期間,消費(fèi)組內(nèi)的消費(fèi)者是無法讀取消息的。即在再均衡發(fā)生期間的一段時(shí)間內(nèi)击儡,消費(fèi)者會(huì)變得不可用
- 當(dāng)一個(gè)分區(qū)被重新分配給另一個(gè)消費(fèi)者時(shí),消費(fèi)者當(dāng)前的狀態(tài)也會(huì)丟失蝠引。比如消費(fèi)者消費(fèi)完某個(gè)分區(qū)的一部分信息但還沒來得及提交消費(fèi)位移就發(fā)生了再均衡操作
- 調(diào)用subscribe()方法時(shí)可以提供再均衡監(jiān)聽器來設(shè)置 消費(fèi)者停止讀取消息 和 開始讀取消費(fèi) 時(shí)的回調(diào)函數(shù)
消費(fèi)者攔截器
- 在消費(fèi)者poll()方法返回之前 阳谍、 提交完消息位移之后 、關(guān)閉消費(fèi)者之前 調(diào)用攔截器中的方法
- 多個(gè)消費(fèi)者攔截器也能組成攔截器鏈
多線程的實(shí)現(xiàn)
-
一個(gè)消費(fèi)線程消費(fèi)一個(gè)或多個(gè)分區(qū)
一個(gè)消費(fèi)線程消費(fèi)一個(gè)或多個(gè)分區(qū) 多個(gè)消費(fèi)線程同時(shí)消費(fèi)同一個(gè)分區(qū) (會(huì)使得提交偏移量異常復(fù)雜)
-
類似IO多路復(fù)用螃概,一個(gè)線程拉取消息矫夯,拉取消息后提交到線程池中。 (因?yàn)槔∠?huì)比處理業(yè)務(wù)邏輯快)
- 會(huì)出現(xiàn)消息丟失的情況吊洼,線程A消費(fèi)0-99 训貌,線程B消費(fèi)100-199 后提交。 線程A未提交而掛掉后冒窍,那么0-99這一段數(shù)據(jù)就丟失了
第三種方式