前言
讀完本文,你將了解到如下知識(shí)點(diǎn):
- kafka 的消費(fèi)者 和 消費(fèi)者組
- 如何正確使用kafka consumer
- 常用的 kafka consumer 配置
消費(fèi)者 和 消費(fèi)者組
-
什么是消費(fèi)者扰她?
顧名思義九杂,消費(fèi)者就是從kafka集群消費(fèi)數(shù)據(jù)的客戶端菇晃,如下圖,展示了一個(gè)消費(fèi)者從一個(gè)topic中消費(fèi)數(shù)據(jù)的模型image 單個(gè)消費(fèi)者模型存在的問(wèn)題健爬?
如果這個(gè)時(shí)候 kafka 上游生產(chǎn)的數(shù)據(jù)很快佩耳,超過(guò)了這個(gè)消費(fèi)者1
的消費(fèi)速度砂竖,那么就會(huì)導(dǎo)致數(shù)據(jù)堆積真椿,產(chǎn)生一些大家都知道的蛋疼事情了,那么我們只能加強(qiáng)消費(fèi)者
的消費(fèi)能力乎澄,所以也就有了我們下面來(lái)說(shuō)的消費(fèi)者組
-
什么是消費(fèi)者組突硝?
所謂消費(fèi)者組
,其實(shí)就是一組消費(fèi)者
的集合置济,當(dāng)我們看到下面這張圖是不是就特別舒服了解恰,我們采用了一個(gè)消費(fèi)組
來(lái)消費(fèi)這個(gè)topic
,眾人拾柴火焰高浙于,其消費(fèi)能力那是按倍數(shù)遞增的护盈,所以這里我們一般來(lái)說(shuō)都是采用消費(fèi)者組
來(lái)消費(fèi)數(shù)據(jù),而不會(huì)是單消費(fèi)者
來(lái)消費(fèi)數(shù)據(jù)的羞酗。這里值得我們注意的是:- 一個(gè)
topic
可以被 多個(gè)消費(fèi)者組
消費(fèi)腐宋,但是每個(gè)消費(fèi)者組
消費(fèi)的數(shù)據(jù)是 互不干擾的,也就是說(shuō)檀轨,每個(gè)消費(fèi)組
消費(fèi)的都是 完整的數(shù)據(jù) 胸竞。 - 一個(gè)分區(qū)只能被 同一個(gè)消費(fèi)組內(nèi) 的一個(gè)
消費(fèi)者
消費(fèi),而 不能拆給多個(gè)消費(fèi)者 消費(fèi)
- 一個(gè)
-
是不是一個(gè) 消費(fèi)組 的 消費(fèi)者 越多其消費(fèi)能力就越強(qiáng)呢裤园?
從圖3
我們就很好的可以回答這個(gè)問(wèn)題了撤师,我們可以看到消費(fèi)者4
是完全沒(méi)有消費(fèi)任何的數(shù)據(jù)的剂府,所以如果你想要加強(qiáng)消費(fèi)者組
的能力拧揽,除了添加消費(fèi)者,分區(qū)的數(shù)量也是需要跟著增加的,只有這樣他們的并行度才能上的去淤袜,消費(fèi)能力才會(huì)強(qiáng)痒谴。
為了提高 消費(fèi)組 的 消費(fèi)能力,我是不是可以隨便添加 分區(qū) 和 消費(fèi)者 呢铡羡?
答案當(dāng)然是否定的啦积蔚。。烦周。嘿嘿
我們看到圖2
尽爆,一般來(lái)說(shuō)我們建議消費(fèi)者
數(shù)量 和分區(qū)
數(shù)量是一致的,當(dāng)我們的消費(fèi)能力不夠時(shí)读慎,就必須通過(guò)調(diào)整分區(qū)的數(shù)量來(lái)提高并行度漱贱,但是,我們應(yīng)該盡量來(lái)避免這種情況發(fā)生夭委,比如:
現(xiàn)在我們需要在圖2
的基礎(chǔ)上增加一個(gè)分區(qū)4
幅狮,那么這個(gè)分區(qū)4
該由誰(shuí)來(lái)消費(fèi)呢?這個(gè)時(shí)候kafka會(huì)進(jìn)行分區(qū)再均衡
株灸,來(lái)為這個(gè)分區(qū)分配消費(fèi)者崇摄,分區(qū)再均衡
期間該消費(fèi)組
是不可用的,并且作為一個(gè)被消費(fèi)者
慌烧,分區(qū)數(shù)的改動(dòng)將影響到每一個(gè)消費(fèi)者組
逐抑,所以在創(chuàng)建topic
的時(shí)候,我們就應(yīng)該考慮好分區(qū)數(shù)屹蚊,來(lái)盡量避免這種情況發(fā)生-
分區(qū)分配過(guò)程
上面我們提到了為 分區(qū)分配消費(fèi)者泵肄,那么我們現(xiàn)在就來(lái)看看分配過(guò)程是怎么樣的。- 確定 群組協(xié)調(diào)器
每當(dāng)我們創(chuàng)建一個(gè)消費(fèi)組淑翼,kafka 會(huì)為我們分配一個(gè) broker 作為該消費(fèi)組的 coordinator(協(xié)調(diào)器) - 注冊(cè)消費(fèi)者 并選出 leader consumer
當(dāng)我們的有了 coordinator 之后腐巢,消費(fèi)者將會(huì)開始往該 coordinator上進(jìn)行注冊(cè),第一個(gè)注冊(cè)的 消費(fèi)者將成為該消費(fèi)組的 leader玄括,后續(xù)的 作為 follower冯丙, - 當(dāng) leader 選出來(lái)后,他會(huì)從coordinator那里實(shí)時(shí)獲取分區(qū) 和 consumer 信息遭京,并根據(jù)分區(qū)策略給每個(gè)consumer 分配 分區(qū)胃惜,并將分配結(jié)果告訴 coordinator。
- follower 消費(fèi)者將從 coordinator 那里獲取到自己相關(guān)的分區(qū)信息進(jìn)行消費(fèi)哪雕,對(duì)于所有的 follower 消費(fèi)者而言船殉,他們只知道自己消費(fèi)的分區(qū),并不知道其他消費(fèi)者的存在斯嚎。
- 至此利虫,消費(fèi)者都知道自己的消費(fèi)的分區(qū)挨厚,分區(qū)過(guò)程結(jié)束,當(dāng)發(fā)送分區(qū)再均衡的時(shí)候糠惫,leader 將會(huì)重復(fù)分配過(guò)程
- 確定 群組協(xié)調(diào)器
消費(fèi)組與分區(qū)重平衡
可以看到疫剃,當(dāng)新的消費(fèi)者加入消費(fèi)組,它會(huì)消費(fèi)一個(gè)或多個(gè)分區(qū)硼讽,而這些分區(qū)之前是由其他消費(fèi)者負(fù)責(zé)的巢价;另外,當(dāng)消費(fèi)者離開消費(fèi)組(比如重啟固阁、宕機(jī)等)時(shí)壤躲,它所消費(fèi)的分區(qū)會(huì)分配給其他分區(qū)。這種現(xiàn)象稱為重平衡(rebalance)备燃。重平衡是Kafka一個(gè)很重要的性質(zhì)柒爵,這個(gè)性質(zhì)保證了高可用和水平擴(kuò)展。不過(guò)也需要注意到赚爵,在重平衡期間棉胀,所有消費(fèi)者都不能消費(fèi)消息,因此會(huì)造成整個(gè)消費(fèi)組短暫的不可用冀膝。而且唁奢,將分區(qū)進(jìn)行重平衡也會(huì)導(dǎo)致原來(lái)的消費(fèi)者狀態(tài)過(guò)期,從而導(dǎo)致消費(fèi)者需要重新更新狀態(tài)窝剖,這段期間也會(huì)降低消費(fèi)性能麻掸。后面我們會(huì)討論如何安全的進(jìn)行重平衡以及如何盡可能避免。
消費(fèi)者通過(guò)定期發(fā)送心跳(hearbeat)到一個(gè)作為組協(xié)調(diào)者(group coordinator)的broker來(lái)保持在消費(fèi)組內(nèi)存活赐纱。這個(gè)broker不是固定的脊奋,每個(gè)消費(fèi)組都可能不同。當(dāng)消費(fèi)者拉取消息或者提交時(shí)疙描,便會(huì)發(fā)送心跳诚隙。
如果消費(fèi)者超過(guò)一定時(shí)間沒(méi)有發(fā)送心跳,那么它的會(huì)話(session)就會(huì)過(guò)期起胰,組協(xié)調(diào)者會(huì)認(rèn)為該消費(fèi)者已經(jīng)宕機(jī)久又,然后觸發(fā)重平衡⌒澹可以看到地消,從消費(fèi)者宕機(jī)到會(huì)話過(guò)期是有一定時(shí)間的,這段時(shí)間內(nèi)該消費(fèi)者的分區(qū)都不能進(jìn)行消息消費(fèi)畏妖;通常情況下脉执,我們可以進(jìn)行優(yōu)雅關(guān)閉,這樣消費(fèi)者會(huì)發(fā)送離開的消息到組協(xié)調(diào)者戒劫,這樣組協(xié)調(diào)者可以立即進(jìn)行重平衡而不需要等待會(huì)話過(guò)期半夷。
在0.10.1版本婆廊,Kafka對(duì)心跳機(jī)制進(jìn)行了修改,將發(fā)送心跳與拉取消息進(jìn)行分離玻熙,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響否彩。另外更高版本的Kafka支持配置一個(gè)消費(fèi)者多長(zhǎng)時(shí)間不拉取消息但仍然保持存活疯攒,這個(gè)配置可以避免活鎖(livelock)嗦随。活鎖敬尺,是指應(yīng)用沒(méi)有故障但是由于某些原因不能進(jìn)一步消費(fèi)枚尼。
創(chuàng)建Kafka消費(fèi)者
讀取Kafka消息只需要?jiǎng)?chuàng)建一個(gè)kafkaConsumer,創(chuàng)建過(guò)程與KafkaProducer非常相像砂吞。我們需要使用四個(gè)基本屬性署恍,bootstrap.servers、key.deserializer蜻直、value.deserializer和group.id盯质。其中,bootstrap.servers與創(chuàng)建KafkaProducer的含義一樣概而;key.deserializer和value.deserializer是用來(lái)做反序列化的呼巷,也就是將字節(jié)數(shù)組轉(zhuǎn)換成對(duì)象;group.id不是嚴(yán)格必須的赎瑰,但通常都會(huì)指定王悍,這個(gè)參數(shù)是消費(fèi)者的消費(fèi)組。
下面是一個(gè)代碼樣例:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
訂閱主題
創(chuàng)建完消費(fèi)者后我們便可以訂閱主題了餐曼,只需要通過(guò)調(diào)用subscribe()方法即可压储,這個(gè)方法接收一個(gè)主題列表,非常簡(jiǎn)單:
consumer.subscribe(Collections.singletonList("customerCountries"));
這個(gè)例子中只訂閱了一個(gè)customerCountries主題源譬。另外集惋,我們也可以使用正則表達(dá)式來(lái)匹配多個(gè)主題,而且訂閱之后如果又有匹配的新主題踩娘,那么這個(gè)消費(fèi)組會(huì)立即對(duì)其進(jìn)行消費(fèi)芋膘。正則表達(dá)式在連接Kafka與其他系統(tǒng)時(shí)非常有用。比如訂閱所有的測(cè)試主題:
consumer.subscribe("test.*");
拉取循環(huán)
消費(fèi)數(shù)據(jù)的API和處理方式很簡(jiǎn)單霸饲,我們只需要循環(huán)不斷拉取消息即可为朋。Kafka對(duì)外暴露了一個(gè)非常簡(jiǎn)潔的poll方法,其內(nèi)部實(shí)現(xiàn)了協(xié)作厚脉、分區(qū)重平衡习寸、心跳、數(shù)據(jù)拉取等功能傻工,但使用時(shí)這些細(xì)節(jié)都被隱藏了霞溪,我們也不需要關(guān)注這些孵滞。下面是一個(gè)代碼樣例:
try {
while (true) { //1)
ConsumerRecords<String, String> records = consumer.poll(100); //2)
for (ConsumerRecord<String, String> record : records) //3)
{
log.debug("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4))
}
}
} finally {
consumer.close(); //4
}
其中,代碼中標(biāo)注了幾點(diǎn)鸯匹,說(shuō)明如下:
1)這個(gè)例子使用無(wú)限循環(huán)消費(fèi)并處理數(shù)據(jù)坊饶,這也是使用Kafka最多的一個(gè)場(chǎng)景,后面我們會(huì)討論如何更好的退出循環(huán)并關(guān)閉殴蓬。
2)這是上面代碼中最核心的一行代碼匿级。我們不斷調(diào)用poll拉取數(shù)據(jù),如果停止拉取染厅,那么Kafka會(huì)認(rèn)為此消費(fèi)者已經(jīng)死亡并進(jìn)行重平衡痘绎。參數(shù)值是一個(gè)超時(shí)時(shí)間,指明線程如果沒(méi)有數(shù)據(jù)時(shí)等待多長(zhǎng)時(shí)間肖粮,0表示不等待立即返回孤页。
3)poll()方法返回記錄的列表,每條記錄包含key/value以及主題涩馆、分區(qū)行施、位移信息。
4)主動(dòng)關(guān)閉可以使得Kafka立即進(jìn)行重平衡而不需要等待會(huì)話過(guò)期魂那。
另外需要提醒的是蛾号,消費(fèi)者對(duì)象不是線程安全的,也就是不能夠多個(gè)線程同時(shí)使用一個(gè)消費(fèi)者對(duì)象冰寻;而且也不能夠一個(gè)線程有多個(gè)消費(fèi)者對(duì)象须教。簡(jiǎn)而言之,一個(gè)線程一個(gè)消費(fèi)者斩芭,如果需要多個(gè)消費(fèi)者那么請(qǐng)使用多線程來(lái)進(jìn)行一一對(duì)應(yīng)轻腺。
單線程版
package com.neuedu;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class Consumer {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers",
"hadoop03:9092,hadoop05:9092,hadoop06:9092");// 該地址是集群的子集,用來(lái)探測(cè)集群划乖。
props.put("group.id", "payment");// cousumer的分組id
props.put("enable.auto.commit", "true");// 自動(dòng)提交offsets
props.put("auto.commit.interval.ms", "1000");// 每隔1s贬养,自動(dòng)提交offsets
props.put("session.timeout.ms", "30000");// Consumer向集群發(fā)送自己的心跳,超時(shí)則認(rèn)為Consumer已經(jīng)死了琴庵,kafka會(huì)把它的分區(qū)分配給其他進(jìn)程
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("payment"));// 訂閱的topic,可以多個(gè)
// String topic = "payment";
// TopicPartition partition0 = new TopicPartition(topic, 0);
// TopicPartition partition1 = new TopicPartition(topic, 1);
// consumer.assign(Arrays.asList(partition0));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("賭東道賭東道賭東道賭東道賭東道賭東道 offset = %d, key = %s, value = %s, partition = %s",
record.offset(), record.key(), record.value(),record.partition());
System.out.println();
}
}
}
}
多線程版
package com.neuedu;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerRunnable implements Runnable {
// 每個(gè)線程維護(hù)私有的KafkaConsumer實(shí)例
private final KafkaConsumer<String, String> consumer;
public ConsumerRunnable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true"); //本例使用自動(dòng)提交位移
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
//分區(qū)分配策略
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic)); // 本例使用分區(qū)副本自動(dòng)分配策略
}
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200); // 本例使用200ms作為獲取超時(shí)時(shí)間
for (ConsumerRecord<String, String> record : records) {
// 這里面寫處理消息的邏輯误算,本例中只是簡(jiǎn)單地打印消息
System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
"th message with offset: " + record.offset());
}
}
}
}
package com.neuedu;
import java.util.ArrayList;
import java.util.List;
public class ConsumerGroup {
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
consumers = new ArrayList<>(consumerNum);
for (int i = 0; i < consumerNum; ++i) {
ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
consumers.add(consumerThread);
}
}
public void execute() {
for (ConsumerRunnable task : consumers) {
new Thread(task).start();
}
}
}
package com.neuedu;
public class ConsumerMain {
public static void main(String[] args) {
String brokerList = "hadoop03:9092";
String groupId = "testGroup1";
String topic = "payment";
int consumerNum = 2;
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}
使用起來(lái)還是很簡(jiǎn)單的,不過(guò)如果想要用好 consumer迷殿,可能你還需要了解以下這些東西:
- 分區(qū)控制策略
- consumer 的一些常用配置
- offset 的控制
ok,那么我們接下來(lái)一個(gè)個(gè)來(lái)看吧庆寺。壤圃。乍桂。
分區(qū)控制策略
- 手動(dòng)控制分區(qū)
咱們先來(lái)說(shuō)下最簡(jiǎn)單的手動(dòng)分區(qū)控制扁凛,代碼如下:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
看起來(lái)只是把普通的訂閱方式修改成了訂閱知道 topic
的分區(qū),其余的還是照常使用,不過(guò)這里也需要注意一下的是:
- 一般只作為獨(dú)立消費(fèi)者,也就是不能加入消費(fèi)組,或者說(shuō)他本身就是作為一個(gè)消費(fèi)組存在,要保證這一點(diǎn),我們只需要保證其
group id
是唯一的就可以了轴猎。 - 對(duì)于
topic
的分區(qū)變動(dòng)不敏感垮斯,也就是說(shuō)當(dāng)topic
新增了分區(qū)熊杨,分區(qū)的數(shù)據(jù)將會(huì)發(fā)生改變钻趋,但該消費(fèi)組對(duì)此確是不感知的,依然照常運(yùn)行,所以很多時(shí)候需要你手動(dòng)consumer.partitionsFor()
去查看topic
的分區(qū)情況 - 不要和
subscription
混合使用
- 使用
partition.assignment.strategy
進(jìn)行分區(qū)策略配置
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
這里的話 kafka 是自帶兩種分區(qū)策略的拂封,為了方便理解镣衡,我們以如下場(chǎng)景為例來(lái)進(jìn)行解釋:
已知:
TopicA 有 3 個(gè) partition(分區(qū)):A-1,A-2,A-3;
TopicB 有 3 個(gè) partition(分區(qū)):B-1,B-2在孝,B-3顾彰;
ConsumerA 和 ConsumerB 作為一個(gè)消費(fèi)組 ConsumerGroup 同時(shí)消費(fèi) TopicA 和 TopicB
-
Range
該方式最大的特點(diǎn)就是會(huì)將連續(xù)的分區(qū)分配給一個(gè)消費(fèi)者厕隧,根據(jù)示例建丧,我們可以得出如下結(jié)論:ConsumerGroup 消費(fèi) TopicA 的時(shí)候:
ConsumerA 會(huì)分配到 A-1拴曲,A-2
ConsumerB 會(huì)分配到 A-3ConsumerGroup 消費(fèi) TopicB 的時(shí)候:
ConsumerA 會(huì)分配到 B-1叁熔,B-2
ConsumerB 會(huì)分配到 B-3所以:
ConsumerA 分配到了4個(gè)分區(qū): A-1,A-2,B-1胖秒,B-2
ConsumerB 分配到了2個(gè)分區(qū):A-3风题,B-3 -
RoundRobin
該方式最大的特點(diǎn)就是會(huì)以輪詢的方式將分區(qū)分配給一個(gè)個(gè)消費(fèi)者,根據(jù)示例围小,我們可以得出如下結(jié)論:ConsumerGroup 消費(fèi) TopicA 的時(shí)候:
ConsumerA 分配到 A-1
ConsumerB 分配到 A-2
ConsumerA 分配到 A-3ConsumerGroup 消費(fèi) TopicB 的時(shí)候,因?yàn)樯洗畏峙涞搅?ConsumerA昵骤,那么這次輪到 ConsumerB了 所以:
ConsumerB 分配到 B-1
ConsumerA 分配到 B-2
ConsumerB 分配到 B-3所以:
ConsumerA 分配到了4個(gè)分區(qū): A-1,A-3吩抓,B-2
ConsumerB 分配到了2個(gè)分區(qū):A-2涉茧,B-1,B-3
從上面我們也是可以看出這兩種策略的異同疹娶,RoundRobin 相比較 Range 會(huì)使得分區(qū)分配的更加的均衡伴栓,但如果是消費(fèi)單個(gè) topic ,那么其均衡是差不多的,而 Range 會(huì)比 RoundRobin 更具優(yōu)勢(shì)一點(diǎn)钳垮,至于這個(gè)優(yōu)勢(shì)惑淳,還得看你的具體業(yè)務(wù)了。
-
自定義的分區(qū)策略
上面兩種分區(qū)策略是 kafka 默認(rèn)自帶的策略饺窿,雖然大多數(shù)情況下夠用了歧焦,但是可能針對(duì)一些特殊需求,我們也可以定義自己的分區(qū)策略- Range分區(qū)策略源碼
如何自定義呢肚医?最好的方式莫過(guò)于看源碼是怎么實(shí)現(xiàn)的绢馍,然后自己依葫蘆畫瓢來(lái)一個(gè),所以我們先來(lái)看看 Range分區(qū)策略源碼肠套,如下舰涌,我只做了簡(jiǎn)單的注釋,因?yàn)樗旧硪埠芎?jiǎn)單你稚,重點(diǎn)看下assign
的參數(shù)以及返回注釋就 ok了
public class RangeAssignor extends AbstractPartitionAssignor{ //省略部分代碼瓷耙。。刁赖。搁痛。 /** * 根據(jù)訂閱者 和 分區(qū)數(shù)量來(lái)進(jìn)行分區(qū) * @param partitionsPerTopic: topic->分區(qū)數(shù)量 * @param subscriptions: memberId 消費(fèi)者id -> subscription 消費(fèi)者信息 * @return: memberId ->list<topic名稱 和 分區(qū)序號(hào)(id)> */ @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { //topic -> list<消費(fèi)者> Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); //初始化 返回結(jié)果 Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { //topic String topic = topicEntry.getKey(); // 消費(fèi)該topic的 consumer-id List<String> consumersForTopic = topicEntry.getValue(); //topic 的分區(qū)數(shù)量 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; Collections.sort(consumersForTopic); //平均每個(gè)消費(fèi)者分配的 分區(qū)數(shù)量 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //平均之后剩下的 分區(qū)數(shù) int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); //這里就是將連續(xù)分區(qū)切開然后分配給每個(gè)消費(fèi)者 List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; } }
- 自定義一個(gè) 分區(qū)策略
這里先緩緩把,太簡(jiǎn)單把宇弛,沒(méi)什么用鸡典,太復(fù)雜把,一時(shí)也想不出好的場(chǎng)景涯肩,如果你有需求轿钠,歡迎留言,我們一起來(lái)實(shí)現(xiàn)
- Range分區(qū)策略源碼
Consumer 常用配置
首先病苗,我們都應(yīng)該知道,最全最全的文檔應(yīng)該是來(lái)自官網(wǎng)(雖然有時(shí)候可能官網(wǎng)找不到):
http://kafka.apachecn.org/documentation.html#newconsumerconfigs
嗯症汹,以下內(nèi)容來(lái)自 kafka權(quán)威指南 ,請(qǐng)?jiān)徫业男卸琛M匆小7峥:罄m(xù)有時(shí)間會(huì)把工作中的遇到的補(bǔ)充上
fetch.min.bytes
該屬性指定了消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費(fèi)者的數(shù)據(jù)請(qǐng)求時(shí)瞒斩,
如果可用的數(shù)據(jù)量小于fetch.min.bytes 指定的大小破婆,那么它會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。這樣可以降低消費(fèi)者和 broker 的工作負(fù)載胸囱,因?yàn)樗鼈冊(cè)谥黝}不是很活躍的時(shí)候(或者一天里的低谷時(shí)段)就不需要來(lái)來(lái)回回地處理消息祷舀。如果沒(méi)有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率卻很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大裳扯。如果消費(fèi)者的數(shù)量比較多抛丽,把該屬性的值設(shè)置得大一點(diǎn)可以降低broker 的工作負(fù)載。fetch.max.wait.ms
我們通過(guò) fetch.min.bytes 告訴 Kafka饰豺,等到有足夠的數(shù)據(jù)時(shí)才把它返回給消費(fèi)者亿鲜。而feth.max.wait.ms 則用于指定 broker 的等待時(shí)間,默認(rèn)是 500ms冤吨。如果沒(méi)有足夠的數(shù)據(jù)流入 Kafka蒿柳,消費(fèi)者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導(dǎo)致 500ms 的延遲漩蟆。如果要降低潛在的延遲(為了滿足 SLA)其馏,可以把該參數(shù)值設(shè)置得小一些。
如果 fetch.max.wait.ms 被設(shè)為 100ms爆安,并且fetch.min.bytes 被設(shè)為 1MB叛复,那么 Kafka 在收到消費(fèi)者的請(qǐng)求后,要么返回 1MB 數(shù)據(jù)扔仓,要么在100ms 后返回所有可用的數(shù)據(jù)褐奥,就看哪個(gè)條件先得到滿足。max.partition.fetch.bytes
該屬性指定了服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)翘簇。它的默認(rèn)值是 1MB撬码,也就是說(shuō),KafkaConsumer.poll() 方法從每個(gè)分區(qū)里返回的記錄最多不超過(guò) max.partition.fetch.bytes指定的字節(jié)版保。如果一個(gè)主題有 20 個(gè)分區(qū)和 5 個(gè)消費(fèi)者呜笑,那么每個(gè)消費(fèi)者需要至少 4MB 的可用內(nèi)存來(lái)接收記錄。在為消費(fèi)者分配內(nèi)存時(shí)彻犁,可以給它們多分配一些叫胁,因?yàn)槿绻航M里有消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū)汞幢。
max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過(guò) max.message.size 屬性配置)大驼鹅,否則消費(fèi)者可能無(wú)法讀取這些消息,導(dǎo)致消費(fèi)者一直掛起重試森篷。在設(shè)置該屬性時(shí)输钩,另一個(gè)需要考慮的因素是消費(fèi)者處理數(shù)據(jù)的時(shí)間。消費(fèi)者需要頻繁調(diào)用poll() 方法來(lái)避免會(huì)話過(guò)期和發(fā)生分區(qū)再均衡仲智,如果單次調(diào)用 poll() 返回的數(shù)據(jù)太多买乃,消費(fèi)者需要更多的時(shí)間來(lái)處理,可能無(wú)法及時(shí)進(jìn)行下一個(gè)輪詢來(lái)避免會(huì)話過(guò)期钓辆。
如果出現(xiàn)這種情況剪验,可以把max.partition.fetch.bytes 值改小肴焊,或者延長(zhǎng)會(huì)話過(guò)期時(shí)間。session.timeout.ms
該屬性指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間碉咆,默認(rèn)是 3s抖韩。如果消費(fèi)者沒(méi)有在session.timeout.ms 指定的時(shí)間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就被認(rèn)為已經(jīng)死亡疫铜,協(xié)調(diào)器就會(huì)觸發(fā)再均衡茂浮,把它的分區(qū)分配給群組里的其他消費(fèi)者。
該屬性與 heartbeat.interval.ms 緊密相關(guān)壳咕。heartbeat.interval.ms 指定了 poll() 方法向協(xié)調(diào)器發(fā)送心跳的頻率席揽,session.timeout.ms則指定了消費(fèi)者可以多久不發(fā)送心跳。
所以谓厘,一般需要同時(shí)修改這兩個(gè)屬性幌羞,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是session.timeout.ms 的三分之一竟稳。如果 session.timeout.ms 是 3s属桦,那么 heartbeat.interval.ms 應(yīng)該是 1s。把session.timeout.ms 值設(shè)得比默認(rèn)值小他爸,可以更快地檢測(cè)和恢復(fù)崩潰的節(jié)點(diǎn)聂宾,不過(guò)長(zhǎng)時(shí)間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡。把該屬性的值設(shè)置得大一些诊笤,可以減少意外的再均衡系谐,不過(guò)檢測(cè)節(jié)點(diǎn)崩潰需要更長(zhǎng)的時(shí)間。auto.offset.reset
該屬性指定了消費(fèi)者在讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下(因消費(fèi)者長(zhǎng)時(shí)間失效讨跟,包含偏移量的記錄已經(jīng)過(guò)時(shí)并被刪除)該作何處理纪他。
它的默認(rèn)值是 latest,意思是說(shuō)晾匠,在偏移量無(wú)效的情況下茶袒,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)。另一個(gè)值是earliest混聊,意思是說(shuō)弹谁,在偏移量無(wú)效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄句喜。enable.auto.commit
我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費(fèi)者是否自動(dòng)提交偏移量沟于,默認(rèn)值是true咳胃。為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)為 false旷太,由自己控制何時(shí)提交偏移量展懈。如果把它設(shè)為true销睁,還可以通過(guò)配置 auto.commit.interval.ms 屬性來(lái)控制提交的頻率。-
partition.assignment.strategy
(這部分好像重復(fù)了 ~~~)
我們知道存崖,分區(qū)會(huì)被分配給群組里的消費(fèi)者冻记。PartitionAssignor 根據(jù)給定的消費(fèi)者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個(gè)消費(fèi)者来惧。Kafka 有兩個(gè)默認(rèn)的分配策略冗栗。
Range
該策略會(huì)把主題的若干個(gè)連續(xù)的分區(qū)分配給消費(fèi)者。假設(shè)消費(fèi)者 C1 和消費(fèi)者 C2 同時(shí)訂閱了主題T1 和 主題 T2供搀,并且每個(gè)主題有 3 個(gè)分區(qū)隅居。那么消費(fèi)者 C1 有可能分配到這兩個(gè)主題的分區(qū) 0 和分區(qū) 1,而消費(fèi)者 C2 分配到這兩個(gè)主題的分區(qū) 2葛虐。因?yàn)槊總€(gè)主題擁有奇數(shù)個(gè)分區(qū)胎源,而分配是在主題內(nèi)獨(dú)立完成的,第一個(gè)消費(fèi)者最后分配到比第二個(gè)消費(fèi)者更多的分區(qū)屿脐。只要使用了 Range 策略涕蚤,而且分區(qū)數(shù)量無(wú)法被消費(fèi)者數(shù)量整除,就會(huì)出現(xiàn)這種情況的诵。RoundRobin
該策略把主題的所有分區(qū)逐個(gè)分配給消費(fèi)者万栅。如果使用 RoundRobin 策略來(lái)給消費(fèi)者 C1 和消費(fèi)者C2 分配分區(qū),那么消費(fèi)者 C1 將分到主題 T1 的分區(qū) 0 和分區(qū) 2 以及主題 T2 的分區(qū) 1奢驯,消費(fèi)者 C2 將分配到主題 T1 的分區(qū) 1 以及主題 T2 的分區(qū) 0 和分區(qū) 2申钩。一般來(lái)說(shuō),如果所有消費(fèi)者都訂閱相同的主題(這種情況很常見(jiàn))瘪阁,RoundRobin 策略會(huì)給所有消費(fèi)者分配相同數(shù)量的分區(qū)(或最多就差一個(gè)分區(qū))撒遣。可以通過(guò)設(shè)置 partition.assignment.strategy 來(lái)選擇分區(qū)策略管跺。
默認(rèn)使用的是org.apache.kafka.clients.consumer.RangeAssignor义黎,這個(gè)類實(shí)現(xiàn)了 Range 策略,不過(guò)也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor豁跑。我們還可以使用自定義策略廉涕,在這種情況下,partition.assignment.strategy 屬性的值就是自定義類的名字艇拍。 client.id
該屬性可以是任意字符串狐蜕,broker 用它來(lái)標(biāo)識(shí)從客戶端發(fā)送過(guò)來(lái)的消息,通常被用在日志卸夕、度量指標(biāo)和配額里层释。max.poll.records
該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量快集。receive.buffer.bytes 和 send.buffer.bytes
socket 在讀寫數(shù)據(jù)時(shí)用到的 TCP 緩沖區(qū)也可以設(shè)置大小贡羔。如果它們被設(shè)為 -1廉白,就使用操作系統(tǒng)的默認(rèn)值。
如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心內(nèi)乖寒,可以適當(dāng)增大這些值猴蹂,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬
提交(commit)與位移(offset)
當(dāng)我們調(diào)用poll()時(shí),該方法會(huì)返回我們沒(méi)有消費(fèi)的消息楣嘁。當(dāng)消息從broker返回消費(fèi)者時(shí)磅轻,broker并不跟蹤這些消息是否被消費(fèi)者接收到;Kafka讓消費(fèi)者自身來(lái)管理消費(fèi)的位移马澈,并向消費(fèi)者提供更新位移的接口瓢省,這種更新位移方式稱為提交(commit)。
在正常情況下痊班,消費(fèi)者會(huì)發(fā)送分區(qū)的提交信息到Kafka勤婚,Kafka進(jìn)行記錄。當(dāng)消費(fèi)者宕機(jī)或者新消費(fèi)者加入時(shí)涤伐,Kafka會(huì)進(jìn)行重平衡馒胆,這會(huì)導(dǎo)致消費(fèi)者負(fù)責(zé)之前并不屬于它的分區(qū)。重平衡完成后凝果,消費(fèi)者會(huì)重新獲取分區(qū)的位移祝迂,下面來(lái)看下兩種有意思的情況。
假如一個(gè)消費(fèi)者在重平衡前后都負(fù)責(zé)某個(gè)分區(qū)器净,如果提交位移比之前實(shí)際處理的消息位移要小型雳,那么會(huì)導(dǎo)致消息重復(fù)消費(fèi),如下所示:
假如在重平衡前某個(gè)消費(fèi)者拉取分區(qū)消息山害,在進(jìn)行消息處理前提交了位移纠俭,但還沒(méi)完成處理宕機(jī)了,然后Kafka進(jìn)行重平衡浪慌,新的消費(fèi)者負(fù)責(zé)此分區(qū)并讀取提交位移冤荆,此時(shí)會(huì)“丟失”消息,如下所示:
因此权纤,提交位移的方式會(huì)對(duì)應(yīng)用有比較大的影響钓简,下面來(lái)看下不同的提交方式。
自動(dòng)提交
這種方式讓消費(fèi)者來(lái)管理位移汹想,應(yīng)用本身不需要顯式操作外邓。當(dāng)我們將enable.auto.commit設(shè)置為true,那么消費(fèi)者會(huì)在poll方法調(diào)用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移古掏。和很多其他操作一樣坐榆,自動(dòng)提交也是由poll()方法來(lái)驅(qū)動(dòng)的;在調(diào)用poll()時(shí)冗茸,消費(fèi)者判斷是否到達(dá)提交時(shí)間席镀,如果是則提交上一次poll返回的最大位移。
需要注意到夏漱,這種方式可能會(huì)導(dǎo)致消息重復(fù)消費(fèi)豪诲。假如,某個(gè)消費(fèi)者poll消息后挂绰,應(yīng)用正在處理消息屎篱,在3秒后Kafka進(jìn)行了重平衡,那么由于沒(méi)有更新位移導(dǎo)致重平衡后這部分消息重復(fù)消費(fèi)葵蒂。
提交當(dāng)前位移
為了減少消息重復(fù)消費(fèi)或者避免消息丟失交播,很多應(yīng)用選擇自己主動(dòng)提交位移。設(shè)置auto.commit.offset為false践付,那么應(yīng)用需要自己通過(guò)調(diào)用commitSync()來(lái)主動(dòng)提交位移秦士,該方法會(huì)提交poll返回的最后位移。
為了避免消息丟失永高,我們應(yīng)當(dāng)在完成業(yè)務(wù)邏輯后才提交位移隧土。而如果在處理消息時(shí)發(fā)生了重平衡,那么只有當(dāng)前poll的消息會(huì)重復(fù)消費(fèi)命爬。下面是一個(gè)自動(dòng)提交的代碼樣例:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}
上面代碼poll消息曹傀,并進(jìn)行簡(jiǎn)單的打印(在實(shí)際中有更多的處理)饲宛,最后完成處理后進(jìn)行了位移提交皆愉。
異步提交
手動(dòng)提交有一個(gè)缺點(diǎn),那就是當(dāng)發(fā)起提交調(diào)用時(shí)應(yīng)用會(huì)阻塞艇抠。當(dāng)然我們可以減少手動(dòng)提交的頻率幕庐,但這個(gè)會(huì)增加消息重復(fù)的概率(和自動(dòng)提交一樣)。另外一個(gè)解決辦法是练链,使用異步提交的API翔脱。以下為使用異步提交的方式,應(yīng)用發(fā)了一個(gè)提交請(qǐng)求然后立即返回:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
}
但是異步提交也有個(gè)缺點(diǎn)媒鼓,那就是如果服務(wù)器返回提交失敗届吁,異步提交不會(huì)進(jìn)行重試。相比較起來(lái)绿鸣,同步提交會(huì)進(jìn)行重試直到成功或者最后拋出異常給應(yīng)用疚沐。異步提交沒(méi)有實(shí)現(xiàn)重試是因?yàn)椋绻瑫r(shí)存在多個(gè)異步提交潮模,進(jìn)行重試可能會(huì)導(dǎo)致位移覆蓋亮蛔。舉個(gè)例子,假如我們發(fā)起了一個(gè)異步提交commitA擎厢,此時(shí)的提交位移為2000究流,隨后又發(fā)起了一個(gè)異步提交commitB且位移為3000辣吃;commitA提交失敗但commitB提交成功,此時(shí)commitA進(jìn)行重試并成功的話芬探,會(huì)將實(shí)際上將已經(jīng)提交的位移從3000回滾到2000神得,導(dǎo)致消息重復(fù)消費(fèi)。
因此偷仿,基于這種性質(zhì)哩簿,一般情況下對(duì)于異步提交,我們可能會(huì)通過(guò)回調(diào)的方式記錄提交結(jié)果:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}
而如果想進(jìn)行重試同時(shí)又保證提交順序的話酝静,一種簡(jiǎn)單的辦法是使用單調(diào)遞增的序號(hào)节榜。每次發(fā)起異步提交時(shí)增加此序號(hào),并且將此時(shí)的序號(hào)作為參數(shù)傳給回調(diào)方法别智;當(dāng)消息提交失敗回調(diào)時(shí)宗苍,檢查參數(shù)中的序號(hào)值與全局的序號(hào)值,如果相等那么可以進(jìn)行重試提交亿遂,否則放棄(因?yàn)橐呀?jīng)有更新的位移提交了)浓若。
混合同步提交與異步提交
正常情況下,偶然的提交失敗并不是什么大問(wèn)題蛇数,因?yàn)楹罄m(xù)的提交成功就可以了挪钓。但是在某些情況下(例如程序退出、重平衡)耳舅,我們希望最后的提交成功碌上,因此一種非常普遍的方式是混合異步提交和同步提交,如下所示:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
在正常處理流程中浦徊,我們使用異步提交來(lái)提高性能馏予,但最后使用同步提交來(lái)保證位移提交成功。
提交特定位移
commitSync()和commitAsync()會(huì)提交上一次poll()的最大位移盔性,但如果poll()返回了批量消息霞丧,而且消息數(shù)量非常多,我們可能會(huì)希望在處理這些批量消息過(guò)程中提交位移冕香,以免重平衡導(dǎo)致從頭開始消費(fèi)和處理蛹尝。幸運(yùn)的是,commitSync()和commitAsync()允許我們指定特定的位移參數(shù)悉尾,參數(shù)為一個(gè)分區(qū)與位移的map突那。由于一個(gè)消費(fèi)者可能會(huì)消費(fèi)多個(gè)分區(qū),所以這種方式會(huì)增加一定的代碼復(fù)雜度构眯,如下所示:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
} }
代碼中在處理poll()消息的過(guò)程中愕难,不斷保存分區(qū)與位移的關(guān)系,每處理1000條消息就會(huì)異步提交(也可以使用同步提交)。
重平衡監(jiān)聽器(Rebalance Listener)
變更分區(qū)命令
kafka-topics.sh --zookeeper hadoop03:2181 --alter --topic test --partitions 6
在分區(qū)重平衡前猫缭,如果消費(fèi)者知道它即將不再負(fù)責(zé)某個(gè)分區(qū)葱弟,那么它可能需要將已經(jīng)處理過(guò)的消息位移進(jìn)行提交。Kafka的API允許我們?cè)谙M(fèi)者新增分區(qū)或者失去分區(qū)時(shí)進(jìn)行處理饵骨,我們只需要在調(diào)用subscribe()方法時(shí)傳入ConsumerRebalanceListener對(duì)象翘悉,該對(duì)象有兩個(gè)方法:
- public void onPartitionRevoked(Collection<topicpartition> partitions):此方法會(huì)在消費(fèi)者停止消費(fèi)消費(fèi)后,在重平衡開始前調(diào)用居触。</topicpartition>
- public void onPartitionAssigned(Collection<topicpartition> partitions):此方法在分區(qū)分配給消費(fèi)者后,在消費(fèi)者開始讀取消息前調(diào)用老赤。</topicpartition>
下面來(lái)看一個(gè)onPartitionRevoked9)的例子轮洋,該例子在消費(fèi)者失去某個(gè)分區(qū)時(shí)提交位移(以便其他消費(fèi)者可以接著消費(fèi)消息并處理):
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.
Committing current
offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
try {
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
代碼中實(shí)現(xiàn)了onPartitionsRevoked()方法,當(dāng)消費(fèi)者失去某個(gè)分區(qū)時(shí)抬旺,會(huì)提交已經(jīng)處理的消息位移(而不是poll()的最大位移)弊予。上面代碼會(huì)提交所有的分區(qū)位移,而不僅僅是失去分區(qū)的位移开财,但這種做法沒(méi)什么壞處汉柒。
從指定位移開始消費(fèi)
在此之前,我們使用poll()來(lái)從最后的提交位移開始消費(fèi)责鳍,但我們也可以從一個(gè)指定的位移開始消費(fèi)碾褂。
如果想從分區(qū)開始端重新開始消費(fèi),那么可以使用seekToBeginning(TopicPartition tp)历葛;如果想從分區(qū)的最末端消費(fèi)最新的消息正塌,那么可以使用seekToEnd(TopicPartition tp)。而且恤溶,Kafka還支持我們從指定位移開始消費(fèi)乓诽。從指定位移開始消費(fèi)的應(yīng)用場(chǎng)景有很多,其中最典型的一個(gè)是:位移存在其他系統(tǒng)(例如數(shù)據(jù)庫(kù))中咒程,并且以其他系統(tǒng)的位移為準(zhǔn)鸠天。
考慮這么個(gè)場(chǎng)景:我們從Kafka中讀取消費(fèi),然后進(jìn)行處理帐姻,最后把結(jié)果寫入數(shù)據(jù)庫(kù)稠集;我們既不想丟失消息,也不想數(shù)據(jù)庫(kù)中存在重復(fù)的消息數(shù)據(jù)卖宠。對(duì)于這樣的場(chǎng)景巍杈,我們可能會(huì)按如下邏輯處理:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
processRecord(record);
storeRecordInDB(record);
consumer.commitAsync(currentOffsets);
}
}
這個(gè)邏輯似乎沒(méi)什么問(wèn)題,但是要注意到這么個(gè)事實(shí)扛伍,在持久化到數(shù)據(jù)庫(kù)成功后筷畦,提交位移到Kafka可能會(huì)失敗,那么這可能會(huì)導(dǎo)致消息會(huì)重復(fù)處理。對(duì)于這種情況鳖宾,我們可以優(yōu)化方案吼砂,將持久化到數(shù)據(jù)庫(kù)與提交位移實(shí)現(xiàn)為原子性操作,也就是要么同時(shí)成功鼎文,要么同時(shí)失敗渔肩。但這個(gè)是不可能的,因此我們可以在保存記錄到數(shù)據(jù)庫(kù)的同時(shí)拇惋,也保存位移周偎,然后在消費(fèi)者開始消費(fèi)時(shí)使用數(shù)據(jù)庫(kù)的位移開始消費(fèi)。這個(gè)方案是可行的撑帖,我們只需要通過(guò)seek()來(lái)指定分區(qū)位移開始消費(fèi)即可蓉坎。下面是一個(gè)改進(jìn)的樣例代碼:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//在消費(fèi)者負(fù)責(zé)的分區(qū)被回收前提交數(shù)據(jù)庫(kù)事務(wù),保存消費(fèi)的記錄和位移
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//在開始消費(fèi)前胡嘿,從數(shù)據(jù)庫(kù)中獲取分區(qū)的位移蛉艾,并使用seek()來(lái)指定開始消費(fèi)的位移
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
//在subscribe()之后poll一次,并從數(shù)據(jù)庫(kù)中獲取分區(qū)的位移衷敌,使用seek()來(lái)指定開始消費(fèi)的位移
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
//保存記錄結(jié)果
storeRecordInDB(record);
//保存位移
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
//提交數(shù)據(jù)庫(kù)事務(wù)勿侯,保存消費(fèi)的記錄以及位移
commitDBTransaction();
}
具體邏輯見(jiàn)代碼注釋,此處不再贅述缴罗。另外注意的是助琐,seek()只是指定了poll()拉取的開始位移,這并不影響在Kafka中保存的提交位移(當(dāng)然我們可以在seek和poll之后提交位移覆蓋)瞒爬。
package com.neuedu;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
public class Consumer {
private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
static int count = 0;
static KafkaConsumer<String, String> consumer;
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers",
"hadoop03:9092,hadoop05:9092,hadoop06:9092");// 該地址是集群的子集弓柱,用來(lái)探測(cè)集群。
props.put("group.id", "payment");// cousumer的分組id
props.put("enable.auto.commit", "false");// 自動(dòng)提交offsets
props.put("auto.commit.interval.ms", "1000");// 每隔1s侧但,自動(dòng)提交offsets
props.put("session.timeout.ms", "30000");// Consumer向集群發(fā)送自己的心跳矢空,超時(shí)則認(rèn)為Consumer已經(jīng)死了,kafka會(huì)把它的分區(qū)分配給其他進(jìn)程
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
// consumer.subscribe(Arrays.asList("payment"));// 訂閱的topic,可以多個(gè)
String topic = "payment";
TopicPartition partition0 = new TopicPartition(topic, 0);
// TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0));
Collection<TopicPartition> partitions = Arrays.asList(partition0);
consumer.seekToBeginning(partitions);
// consumer.seek(partition0,495);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("賭東道賭東道賭東道賭東道賭東道賭東道 offset = %d, key = %s, value = %s, partition = %s",
record.offset(), record.key(), record.value(),record.partition());
System.out.println();
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
}
}
}
}