一龙优、前言
Kafka不僅提供了生產(chǎn)者客戶端愕贡,同時(shí)也提供了消費(fèi)者客戶端(Cosumer API)济舆。應(yīng)用程序通過消費(fèi)者客戶端來訂閱主題抵怎,然后向broker發(fā)送拉取請(qǐng)求浓恶,獲取想要消費(fèi)的主題分區(qū)消息進(jìn)行消費(fèi)玫坛。本文接下來將對(duì)Kafka Java版消費(fèi)者客戶端相關(guān)知識(shí)進(jìn)行講解。
二包晰、重要概念
2.1 消費(fèi)模式
- 消費(fèi)模式分為兩種:推送(push)模式和拉仁啤(pull)模式。
- 推送模式是由broker將數(shù)據(jù)推送到消費(fèi)者伐憾,實(shí)時(shí)性好勉痴,可以讓消費(fèi)者能夠以可能的最大速率消費(fèi);不過树肃,由于broker控制著數(shù)據(jù)傳輸速率蒸矛,不同的消費(fèi)者消費(fèi)速率可能相差很大,當(dāng)消費(fèi)速率低于生產(chǎn)速率時(shí)胸嘴,消費(fèi)者系統(tǒng)消費(fèi)不及時(shí)雏掠,會(huì)出現(xiàn)服務(wù)拒絕, 所以推送模式系統(tǒng)很難處理不同消費(fèi)速率的消費(fèi)者劣像。一般采用推送模式的系統(tǒng)乡话,需要設(shè)置一定的流控規(guī)則來避免大流量壓垮消費(fèi)者。
- 拉取模式是消費(fèi)者主動(dòng)從broker那里拉取數(shù)據(jù)驾讲,消費(fèi)速率由消費(fèi)者控制蚊伞,可以避免出現(xiàn)推送模式那種超載壓垮消費(fèi)者的情況。不足的地方是吮铭,如果broker中沒有數(shù)據(jù)时迫,消費(fèi)者可能會(huì)在一個(gè)緊密的循環(huán)中結(jié)束輪詢,然后處于busy-waiting狀態(tài)谓晌,直到數(shù)據(jù)到來掠拳。
- Kafka采用pull模式。消費(fèi)者主動(dòng)從broker那里拉取數(shù)據(jù)進(jìn)行消費(fèi)纸肉。為了避免busy-waiting溺欧,Kafka在pull請(qǐng)求中加入?yún)?shù)喊熟,使得消費(fèi)者在一個(gè)“l(fā)ong pull”中阻塞等待,直到數(shù)據(jù)到來姐刁。
- Kafka 消費(fèi)者可以通過pull模式在需要的時(shí)候通過回退位置再次消費(fèi)對(duì)應(yīng)的數(shù)據(jù)芥牌。
2.2 消費(fèi)組
- Kafka每個(gè)消費(fèi)者都有一個(gè)對(duì)應(yīng)的消費(fèi)組聂使。消費(fèi)組是一個(gè)邏輯概念壁拉,用來對(duì)消費(fèi)者進(jìn)行歸類。
- 如果多個(gè)消費(fèi)者屬于同一個(gè)消費(fèi)組柏靶,那么每條消息只會(huì)被其中一個(gè)消費(fèi)者處理弃理,相當(dāng)于點(diǎn)對(duì)點(diǎn)模式應(yīng)用。
- 如果多個(gè)消費(fèi)者不屬于同一個(gè)消費(fèi)組屎蜓,那么每條消息會(huì)被其中每個(gè)消費(fèi)者都處理痘昌,相當(dāng)于訂閱/發(fā)布模式應(yīng)用。
- 同一個(gè)消費(fèi)組內(nèi)的消費(fèi)者與分區(qū)是對(duì)應(yīng)的炬转,每一個(gè)分區(qū)只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者所消費(fèi)辆苔。如果同一個(gè)消費(fèi)組的消費(fèi)者個(gè)數(shù)大于主題分區(qū)數(shù),則會(huì)存在有消費(fèi)者不能拉取處理這個(gè)主題消息的情況返吻,如圖消費(fèi)者C4無法消費(fèi)主題Topic1的任何消息姑子。
- 舉個(gè)例子乎婿,假如主題Topic1有消息P0M1测僵、P1M1、P2M2谢翎,消費(fèi)組A的消費(fèi)者C0會(huì)處理P0M1捍靠,消費(fèi)者C1會(huì)處理P1M1,消費(fèi)者C2會(huì)處理P2M2森逮,消費(fèi)組A其他消費(fèi)者不能處理這幾條消息榨婆,但消費(fèi)組B內(nèi)的消費(fèi)者可以消費(fèi)處理這三條消息。
- 消費(fèi)組消費(fèi)者模型可以讓整體的消費(fèi)能力具有橫向伸縮性褒侧,大大提高了系統(tǒng)消費(fèi)的靈活性良风。
2.3 分區(qū)自動(dòng)再均衡
- 在多個(gè)消費(fèi)者的情況下可以根據(jù)分區(qū)分配策略來自動(dòng)分配各個(gè)消費(fèi)者與分區(qū)的關(guān)系。當(dāng)消費(fèi)組內(nèi)的消費(fèi)者增加或減少時(shí)闷供,分區(qū)分配關(guān)系會(huì)自動(dòng)調(diào)整烟央,以實(shí)現(xiàn)消費(fèi)負(fù)載均衡及故障自動(dòng)轉(zhuǎn)移。
三歪脏、消費(fèi)消息過程分析
3.1 核心類KafkaConsumer
- KafkaConsumer類是整個(gè)消費(fèi)者API的核心類疑俭。應(yīng)用程序消費(fèi)消息的主要操作都需要通過它來完成。它是一個(gè)非線程安全的類婿失。
3.2 消費(fèi)消息主要步驟
- 消費(fèi)者消費(fèi)消息主要步驟如上圖所示钞艇,包括設(shè)置消費(fèi)端配置參數(shù)啄寡、創(chuàng)建KafkaConsumer實(shí)例、訂閱主題哩照、拉取主題分區(qū)消息數(shù)據(jù)挺物、提交位移給broker、關(guān)閉KafkaConsumer資源等飘弧。代碼邏輯演示如下:
public static void main(String[] args) {
//1 設(shè)置配置參數(shù)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "messageGroup");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2 創(chuàng)建KafkaConsumer實(shí)例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//3 訂閱主題
consumer.subscribe(Arrays.asList("topic_test_1"));
try {
while(true) {
//4 拉取主題分區(qū)消息數(shù)據(jù)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
// 業(yè)務(wù)處理姻乓。。眯牧。
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//5 提交消費(fèi)位移(最后一個(gè)offset+1)
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
//6 關(guān)閉KafkaConsumer實(shí)例
consumer.close();
}
}
3.2 必要的配置參數(shù)
- bootstrap.servers:與生產(chǎn)者客戶端此參數(shù)作用類似蹋岩。用于建立初始連接到Kafka集群(主機(jī)/端口對(duì))的配置列表。這個(gè)配置不需要包含整個(gè)集群的服務(wù)器学少。不論這個(gè)參數(shù)配置了哪些服務(wù)器來初始化連接剪个,客戶端都是會(huì)均衡地與集群中的所有服務(wù)器建立連接。如果集群變化版确,元數(shù)據(jù)會(huì)動(dòng)態(tài)更新扣囊。為了避免單節(jié)點(diǎn)風(fēng)險(xiǎn),最好配置多臺(tái)主機(jī)绒疗。
- group.id:消費(fèi)組id侵歇,用來標(biāo)識(shí)消費(fèi)者所屬的消費(fèi)者組的唯一字符串。
- enable.auto.commit:是否自動(dòng)提交位移吓蘑,默認(rèn)為true惕虑,如果為true,表示消費(fèi)者的位移將定期在后臺(tái)提交磨镶±D瑁可設(shè)置為手動(dòng)提交消費(fèi)位移。
- key.deserializer:key的反序列化器類琳猫,需要與生產(chǎn)端的key序列化器類對(duì)應(yīng)伟叛。從broker拉取的消息格式是字節(jié)數(shù)組類型的,所以需要相應(yīng)的反序列操作將消息還原成原有的數(shù)據(jù)類型脐嫂。
- value.deserializer:value的反序列化器類统刮,需要與生產(chǎn)端的value序列化器類對(duì)應(yīng)。
3.3 訂閱主題
- 消費(fèi)者可以一次訂閱若干個(gè)主題账千。既可以集合形式訂閱多個(gè)主題侥蒙,也可以以正則表達(dá)式形式訂閱特定模式的主題,還可以訂閱主題的特定分區(qū)蕊爵。KafkaConsumer提供的訂閱主題方法如下:
// 集合形式訂閱主題
void subscribe(Collection<String> topics);
// 集合形式訂閱主題辉哥,增加再均衡監(jiān)聽
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
// 正則表達(dá)式形式訂閱主題
void subscribe(Pattern pattern);
// 正則表達(dá)式形式訂閱主題,增加再均衡監(jiān)聽
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
// 手工分配特定分區(qū),沒有自動(dòng)再均衡功能
void assign(Collection<TopicPartition> partitions);
- 訂閱主題后醋旦,也可取消主題訂閱恒水。KafkaConsumer提供的方法如下:
void unsubscribe();
3.4 拉取數(shù)據(jù)
- 訂閱閱主題分區(qū)后,才可以拉取數(shù)據(jù)饲齐,Kafka采用拉榷ち琛(pull)模式,主動(dòng)從broker那里拉取數(shù)據(jù)進(jìn)行消費(fèi)捂人。應(yīng)用程序需要不斷輪詢調(diào)用poll()方法御雕,在每次輪詢中,消費(fèi)者將嘗試使用最后消費(fèi)位移+1作為開始偏移量滥搭,并按順序獲取酸纲。最后消費(fèi)位移可以調(diào)用seek()方法手工覆蓋。
- poll()方法有個(gè)timeout參數(shù)瑟匆,用來表示超時(shí)阻塞的最大時(shí)間闽坡。KafkaConsumer提供的方法如下:
// 從broker拉取數(shù)據(jù)
ConsumerRecords<K, V> poll(Duration timeout);
// 覆蓋消費(fèi)者將在下一次輪詢中使用的最后消費(fèi)位移
void seek(TopicPartition partition, long offset);
3.5 提交位移
- 拉取數(shù)據(jù)進(jìn)行消費(fèi)處理后,需要將所有主題分區(qū)列表的消費(fèi)位移提交給broker愁溜,然后將消費(fèi)位移存儲(chǔ)在Kafka內(nèi)部的主題__consumer_offsets中疾嗅。
- 提交的位移應(yīng)該是應(yīng)用程序?qū)⒁褂玫南乱粭l消息,即最后消費(fèi)位移 + 1冕象;
- Kafka提交位移的方式分為自動(dòng)提交和手工提交代承。默認(rèn)為自動(dòng)提交方式。在默認(rèn)的方式下渐扮,消費(fèi)者每隔5秒會(huì)將拉取到的每個(gè)分區(qū)中最大的消息位移進(jìn)行提交论悴。自動(dòng)提交間隔時(shí)間可通過參數(shù)(auto.commit.interval.ms)可修改。
- 自動(dòng)提交位移在一些非正常情況下會(huì)出現(xiàn)重復(fù)消費(fèi)和丟失消息的現(xiàn)象席爽;假設(shè)在拉取一批消息消費(fèi)后意荤,還未自動(dòng)提交位移之前啊片,消費(fèi)者奔潰了只锻,那么又會(huì)從上一次消費(fèi)位移處拉取數(shù)據(jù),這樣便出現(xiàn)了重復(fù)消費(fèi)的現(xiàn)象紫谷。
雖然不能完全避免重復(fù)消費(fèi)現(xiàn)象齐饮,但是我們可以通過縮小位移提交時(shí)間間隔來減少重復(fù)消費(fèi)的消息區(qū)間。 - 手動(dòng)提交分為同步提交和異步提交笤昨。KafkaConsumer提供的方法如下:
void commitSync();
void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
3.6 優(yōu)雅結(jié)束消費(fèi)
- 在消費(fèi)完成后祖驱,如何優(yōu)雅的跳出輪詢,結(jié)束消費(fèi)呢瞒窒,KafkaConsumer提供了wakeup()方法捺僻,這個(gè)方法是線程安全的。調(diào)用wakeup()方法的線程將拋出WakeupException異常,終止長(zhǎng)輪詢匕坯。跳出輪詢后束昵,需要顯式調(diào)用close()方法關(guān)閉動(dòng)作來釋放系統(tǒng)占用資源,包括內(nèi)存資源葛峻,socket網(wǎng)絡(luò)連接資源锹雏。
3.7 指定位移消費(fèi)
- 在消費(fèi)者遇到關(guān)閉、崩潰以及再均衡時(shí)术奖,可以讓其他接替的消費(fèi)者通過seek()方法設(shè)置指定位移來保證消費(fèi)的連續(xù)礁遵。
- 當(dāng)Kafka中沒有初始消費(fèi)位移或者服務(wù)器上的當(dāng)前消費(fèi)位移不再存在時(shí)(例如數(shù)據(jù)已經(jīng)被刪除),默認(rèn)會(huì)采用自動(dòng)重置位移為最后的位移進(jìn)行消費(fèi)采记∮赌停可以通過參數(shù)auto.offset.reset修改重置方式。
四唧龄、重要配置參數(shù)
消費(fèi)者客戶端有一些重要的參數(shù)晰赞,掌握這些參數(shù)有助于我們?cè)趯?shí)際應(yīng)用中的故障排查和性能調(diào)優(yōu)。
fetch.min.bytes
- 描述:服務(wù)器應(yīng)該為消費(fèi)者拉取請(qǐng)求返回的最小數(shù)據(jù)量选侨。如果可用數(shù)據(jù)不足掖鱼,請(qǐng)求將等待大量數(shù)據(jù)累積,然后再響應(yīng)請(qǐng)求援制。默認(rèn)1字節(jié)戏挡,意味著拉取請(qǐng)求很快會(huì)被響應(yīng)。將這個(gè)參數(shù)適當(dāng)調(diào)大可以提高服務(wù)器的吞吐量晨仑,但會(huì)導(dǎo)致服務(wù)器等待更大數(shù)量的數(shù)據(jù)積累褐墅,增加一些響應(yīng)延遲。
- 類型:int洪己。
- 默認(rèn)值:1妥凳。
fetch.max.bytes
- 描述:服務(wù)器應(yīng)該為消費(fèi)者拉取請(qǐng)求返回的最大數(shù)據(jù)量。消息記錄是由消費(fèi)者分批拉取的答捕,如果拉取的第一個(gè)非空分區(qū)中的第一個(gè)消息記錄Batch字節(jié)數(shù)大于此值逝钥,那么仍將返回該消息記錄Batch,以確保消費(fèi)者能夠繼續(xù)執(zhí)行拱镐。因此艘款,這不是一個(gè)絕對(duì)的最大值。
- 類型:int沃琅。
- 默認(rèn)值:52428800哗咆。
max.partition.fetch.bytes
- 描述:服務(wù)器將返回的每個(gè)分區(qū)的最大數(shù)據(jù)量。記錄由消費(fèi)者分批拉取益眉。如果拉取的第一個(gè)非空分區(qū)中的第一個(gè)消息記錄Batch字節(jié)數(shù)大于此值晌柬,那么仍將返回該消息記錄Batch姥份,以確保消費(fèi)者能夠繼續(xù)執(zhí)行。注意年碘,該配置需結(jié)合 fetch.max.bytes以及兩個(gè)主題配置message.max.bytes 和 max.message.bytes使用殿衰。
- 類型:int。
- 默認(rèn)值:1048576盛泡。
auto.offset.reset
- 描述:位移重置方式闷祥。當(dāng)Kafka中沒有初始消費(fèi)位移或者服務(wù)器上的當(dāng)前消費(fèi)位移不再存在時(shí)(例如數(shù)據(jù)已經(jīng)被刪除)場(chǎng)景下使用,有下列幾種方式:
(1)earliest: 自動(dòng)重置位移到最早位移位置
(2)latest: 自動(dòng)重置位移為最后位移位置
(3)none: 如果沒有找到消費(fèi)者之前位移傲诵,那么向消費(fèi)者直接拋出異常
(4)其他: 向消費(fèi)者拋出異常凯砍。 - 類型:string。
- 默認(rèn)值:latest拴竹。
isolation.level
- 描述:控制如何讀取以事務(wù)方式編寫的消息悟衩。如果設(shè)置為read_committed, consumer.poll()將只返回已提交的事務(wù)性消息栓拜。如果設(shè)置為read_uncommitted(默認(rèn)值)座泳,consumer.poll()將返回所有消息,甚至是已經(jīng)中止的事務(wù)性消息幕与。如果是非事務(wù)性消息挑势,在這兩種模式都會(huì)無條件返回。
- 類型:string啦鸣。
- 默認(rèn)值:read_uncommitted潮饱。
session.timeout.ms
- 描述:在使用Kafka的組管理工具時(shí),用于檢測(cè)用戶故障的超時(shí)時(shí)間诫给。消費(fèi)者定期向broker發(fā)送心跳香拉,以表明其活動(dòng)狀態(tài)。如果在此會(huì)話超時(shí)過期之前broker沒有接收到心跳中狂,則broker將從消費(fèi)組中刪除此消費(fèi)者并啟動(dòng)重新平衡凫碌。注意,該值必須在broker配置中按group.min.session.timeout和group.max.session.timeout.ms配置的允許范圍內(nèi)胃榕。
- 類型:int盛险。
- 默認(rèn)值:10000。
heartbeat.interval.ms
- 描述:在使用Kafka的組管理工具時(shí)勤晚,心跳到消費(fèi)者協(xié)調(diào)器之間的間隔時(shí)間枉层。心跳用于確保消費(fèi)者的會(huì)話保持活動(dòng)狀態(tài),并在新消費(fèi)者加入或離開消費(fèi)組時(shí)幫助重新平衡赐写。該值必須設(shè)置低于session.timeout.ms,通常設(shè)置不超過1/3膜赃。它可以調(diào)整更低挺邀,用來控制正常重新平衡的間隔時(shí)間。
- 類型:int。
- 默認(rèn)值:3000端铛。