前言
讀完本文,你將了解到如下知識點:
- kafka 的消費者 和 消費者組
- 如何正確使用kafka consumer
- 常用的 kafka consumer 配置
消費者 和 消費者組
-
什么是消費者?
顧名思義攀细,消費者就是從kafka集群消費數(shù)據(jù)的客戶端浊闪,如下圖刽沾,展示了一個消費者從一個topic中消費數(shù)據(jù)的模型image 單個消費者模型存在的問題读处?
如果這個時候 kafka 上游生產(chǎn)的數(shù)據(jù)很快尘分,超過了這個消費者1
的消費速度猜惋,那么就會導(dǎo)致數(shù)據(jù)堆積,產(chǎn)生一些大家都知道的蛋疼事情了培愁,那么我們只能加強消費者
的消費能力著摔,所以也就有了我們下面來說的消費者組
-
什么是消費者組?
所謂消費者組
定续,其實就是一組消費者
的集合谍咆,當我們看到下面這張圖是不是就特別舒服了,我們采用了一個消費組
來消費這個topic
私股,眾人拾柴火焰高摹察,其消費能力那是按倍數(shù)遞增的,所以這里我們一般來說都是采用消費者組
來消費數(shù)據(jù)倡鲸,而不會是單消費者
來消費數(shù)據(jù)的供嚎。這里值得我們注意的是:- 一個
topic
可以被 多個消費者組
消費,但是每個消費者組
消費的數(shù)據(jù)是 互不干擾 的峭状,也就是說克滴,每個消費組
消費的都是 完整的數(shù)據(jù) 。 - 一個分區(qū)只能被 同一個消費組內(nèi) 的一個
消費者
消費优床,而 不能拆給多個消費者 消費
- 一個
-
是不是一個 消費組 的 消費者 越多其消費能力就越強呢劝赔?
從圖3
我們就很好的可以回答這個問題了,我們可以看到消費者4
是完全沒有消費任何的數(shù)據(jù)的胆敞,所以如果你想要加強消費者組
的能力望忆,除了添加消費者罩阵,分區(qū)的數(shù)量也是需要跟著增加的,只有這樣他們的并行度才能上的去启摄,消費能力才會強。
為了提高 消費組 的 消費能力幽钢,我是不是可以隨便添加 分區(qū) 和 消費者 呢歉备?
答案當然是否定的啦。匪燕。蕾羊。嘿嘿
我們看到圖2
,一般來說我們建議消費者
數(shù)量 和分區(qū)
數(shù)量是一致的帽驯,當我們的消費能力不夠時龟再,就必須通過調(diào)整分區(qū)的數(shù)量來提高并行度,但是尼变,我們應(yīng)該盡量來避免這種情況發(fā)生利凑,比如:
現(xiàn)在我們需要在圖2
的基礎(chǔ)上增加一個分區(qū)4
,那么這個分區(qū)4
該由誰來消費呢嫌术?這個時候kafka會進行分區(qū)再均衡
哀澈,來為這個分區(qū)分配消費者,分區(qū)再均衡
期間該消費組
是不可用的度气,并且作為一個被消費者
割按,分區(qū)數(shù)的改動將影響到每一個消費者組
,所以在創(chuàng)建topic
的時候磷籍,我們就應(yīng)該考慮好分區(qū)數(shù)适荣,來盡量避免這種情況發(fā)生-
分區(qū)分配過程
上面我們提到了為 分區(qū)分配消費者,那么我們現(xiàn)在就來看看分配過程是怎么樣的院领。- 確定 群組協(xié)調(diào)器
每當我們創(chuàng)建一個消費組弛矛,kafka 會為我們分配一個 broker 作為該消費組的 coordinator(協(xié)調(diào)器) - 注冊消費者 并選出 leader consumer
當我們的有了 coordinator 之后,消費者將會開始往該 coordinator上進行注冊栅盲,第一個注冊的 消費者將成為該消費組的 leader汪诉,后續(xù)的 作為 follower, - 當 leader 選出來后,他會從coordinator那里實時獲取分區(qū) 和 consumer 信息,并根據(jù)分區(qū)策略給每個consumer 分配 分區(qū)搁凸,并將分配結(jié)果告訴 coordinator藕届。
- follower 消費者將從 coordinator 那里獲取到自己相關(guān)的分區(qū)信息進行消費,對于所有的 follower 消費者而言赠摇,他們只知道自己消費的分區(qū),并不知道其他消費者的存在。
- 至此课竣,消費者都知道自己的消費的分區(qū)嘉赎,分區(qū)過程結(jié)束,當發(fā)送分區(qū)再均衡的時候于樟,leader 將會重復(fù)分配過程
- 確定 群組協(xié)調(diào)器
消費組與分區(qū)重平衡
可以看到公条,當新的消費者加入消費組,它會消費一個或多個分區(qū)迂曲,而這些分區(qū)之前是由其他消費者負責的靶橱;另外,當消費者離開消費組(比如重啟路捧、宕機等)時关霸,它所消費的分區(qū)會分配給其他分區(qū)。這種現(xiàn)象稱為重平衡(rebalance)杰扫。重平衡是Kafka一個很重要的性質(zhì)队寇,這個性質(zhì)保證了高可用和水平擴展。不過也需要注意到章姓,在重平衡期間佳遣,所有消費者都不能消費消息,因此會造成整個消費組短暫的不可用啤覆。而且苍日,將分區(qū)進行重平衡也會導(dǎo)致原來的消費者狀態(tài)過期,從而導(dǎo)致消費者需要重新更新狀態(tài)窗声,這段期間也會降低消費性能相恃。后面我們會討論如何安全的進行重平衡以及如何盡可能避免。
消費者通過定期發(fā)送心跳(hearbeat)到一個作為組協(xié)調(diào)者(group coordinator)的broker來保持在消費組內(nèi)存活笨觅。這個broker不是固定的拦耐,每個消費組都可能不同。當消費者拉取消息或者提交時见剩,便會發(fā)送心跳杀糯。
如果消費者超過一定時間沒有發(fā)送心跳,那么它的會話(session)就會過期苍苞,組協(xié)調(diào)者會認為該消費者已經(jīng)宕機固翰,然后觸發(fā)重平衡「牵可以看到骂际,從消費者宕機到會話過期是有一定時間的,這段時間內(nèi)該消費者的分區(qū)都不能進行消息消費冈欢;通常情況下歉铝,我們可以進行優(yōu)雅關(guān)閉,這樣消費者會發(fā)送離開的消息到組協(xié)調(diào)者凑耻,這樣組協(xié)調(diào)者可以立即進行重平衡而不需要等待會話過期太示。
在0.10.1版本柠贤,Kafka對心跳機制進行了修改,將發(fā)送心跳與拉取消息進行分離类缤,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響臼勉。另外更高版本的Kafka支持配置一個消費者多長時間不拉取消息但仍然保持存活,這個配置可以避免活鎖(livelock)呀非〖崴祝活鎖,是指應(yīng)用沒有故障但是由于某些原因不能進一步消費岸裙。
創(chuàng)建Kafka消費者
讀取Kafka消息只需要創(chuàng)建一個kafkaConsumer,創(chuàng)建過程與KafkaProducer非常相像速缆。我們需要使用四個基本屬性降允,bootstrap.servers、key.deserializer艺糜、value.deserializer和group.id剧董。其中,bootstrap.servers與創(chuàng)建KafkaProducer的含義一樣破停;key.deserializer和value.deserializer是用來做反序列化的翅楼,也就是將字節(jié)數(shù)組轉(zhuǎn)換成對象;group.id不是嚴格必須的真慢,但通常都會指定毅臊,這個參數(shù)是消費者的消費組。
下面是一個代碼樣例:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
訂閱主題
創(chuàng)建完消費者后我們便可以訂閱主題了黑界,只需要通過調(diào)用subscribe()方法即可管嬉,這個方法接收一個主題列表,非常簡單:
consumer.subscribe(Collections.singletonList("customerCountries"));
這個例子中只訂閱了一個customerCountries主題朗鸠。另外蚯撩,我們也可以使用正則表達式來匹配多個主題,而且訂閱之后如果又有匹配的新主題烛占,那么這個消費組會立即對其進行消費胎挎。正則表達式在連接Kafka與其他系統(tǒng)時非常有用。比如訂閱所有的測試主題:
consumer.subscribe("test.*");
拉取循環(huán)
消費數(shù)據(jù)的API和處理方式很簡單忆家,我們只需要循環(huán)不斷拉取消息即可犹菇。Kafka對外暴露了一個非常簡潔的poll方法,其內(nèi)部實現(xiàn)了協(xié)作弦赖、分區(qū)重平衡项栏、心跳、數(shù)據(jù)拉取等功能蹬竖,但使用時這些細節(jié)都被隱藏了沼沈,我們也不需要關(guān)注這些流酬。下面是一個代碼樣例:
try {
while (true) { //1)
ConsumerRecords<String, String> records = consumer.poll(100); //2)
for (ConsumerRecord<String, String> record : records) //3)
{
log.debug("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4))
}
}
} finally {
consumer.close(); //4
}
其中,代碼中標注了幾點列另,說明如下:
1)這個例子使用無限循環(huán)消費并處理數(shù)據(jù)芽腾,這也是使用Kafka最多的一個場景,后面我們會討論如何更好的退出循環(huán)并關(guān)閉页衙。
2)這是上面代碼中最核心的一行代碼摊滔。我們不斷調(diào)用poll拉取數(shù)據(jù),如果停止拉取店乐,那么Kafka會認為此消費者已經(jīng)死亡并進行重平衡艰躺。參數(shù)值是一個超時時間,指明線程如果沒有數(shù)據(jù)時等待多長時間眨八,0表示不等待立即返回腺兴。
3)poll()方法返回記錄的列表,每條記錄包含key/value以及主題廉侧、分區(qū)页响、位移信息。
4)主動關(guān)閉可以使得Kafka立即進行重平衡而不需要等待會話過期段誊。
另外需要提醒的是闰蚕,消費者對象不是線程安全的,也就是不能夠多個線程同時使用一個消費者對象连舍;而且也不能夠一個線程有多個消費者對象没陡。簡而言之,一個線程一個消費者烟瞧,如果需要多個消費者那么請使用多線程來進行一一對應(yīng)诗鸭。
單線程版
package com.neuedu;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class Consumer {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers",
"hadoop03:9092,hadoop05:9092,hadoop06:9092");// 該地址是集群的子集,用來探測集群参滴。
props.put("group.id", "payment");// cousumer的分組id
props.put("enable.auto.commit", "true");// 自動提交offsets
props.put("auto.commit.interval.ms", "1000");// 每隔1s强岸,自動提交offsets
props.put("session.timeout.ms", "30000");// Consumer向集群發(fā)送自己的心跳,超時則認為Consumer已經(jīng)死了砾赔,kafka會把它的分區(qū)分配給其他進程
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("payment"));// 訂閱的topic,可以多個
// String topic = "payment";
// TopicPartition partition0 = new TopicPartition(topic, 0);
// TopicPartition partition1 = new TopicPartition(topic, 1);
// consumer.assign(Arrays.asList(partition0));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("賭東道賭東道賭東道賭東道賭東道賭東道 offset = %d, key = %s, value = %s, partition = %s",
record.offset(), record.key(), record.value(),record.partition());
System.out.println();
}
}
}
}
多線程版
package com.neuedu;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerRunnable implements Runnable {
// 每個線程維護私有的KafkaConsumer實例
private final KafkaConsumer<String, String> consumer;
public ConsumerRunnable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true"); //本例使用自動提交位移
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
//分區(qū)分配策略
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic)); // 本例使用分區(qū)副本自動分配策略
}
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200); // 本例使用200ms作為獲取超時時間
for (ConsumerRecord<String, String> record : records) {
// 這里面寫處理消息的邏輯蝌箍,本例中只是簡單地打印消息
System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
"th message with offset: " + record.offset());
}
}
}
}
package com.neuedu;
import java.util.ArrayList;
import java.util.List;
public class ConsumerGroup {
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
consumers = new ArrayList<>(consumerNum);
for (int i = 0; i < consumerNum; ++i) {
ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
consumers.add(consumerThread);
}
}
public void execute() {
for (ConsumerRunnable task : consumers) {
new Thread(task).start();
}
}
}
package com.neuedu;
public class ConsumerMain {
public static void main(String[] args) {
String brokerList = "hadoop03:9092";
String groupId = "testGroup1";
String topic = "payment";
int consumerNum = 2;
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}
使用起來還是很簡單的,不過如果想要用好 consumer暴心,可能你還需要了解以下這些東西:
- 分區(qū)控制策略
- consumer 的一些常用配置
- offset 的控制
ok,那么我們接下來一個個來看吧妓盲。。专普。
分區(qū)控制策略
- 手動控制分區(qū)
咱們先來說下最簡單的手動分區(qū)控制悯衬,代碼如下:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
看起來只是把普通的訂閱方式修改成了訂閱知道 topic
的分區(qū),其余的還是照常使用檀夹,不過這里也需要注意一下的是:
- 一般只作為獨立消費者筋粗,也就是不能加入消費組策橘,或者說他本身就是作為一個消費組存在,要保證這一點娜亿,我們只需要保證其
group id
是唯一的就可以了丽已。 - 對于
topic
的分區(qū)變動不敏感,也就是說當topic
新增了分區(qū)买决,分區(qū)的數(shù)據(jù)將會發(fā)生改變沛婴,但該消費組對此確是不感知的,依然照常運行督赤,所以很多時候需要你手動consumer.partitionsFor()
去查看topic
的分區(qū)情況 - 不要和
subscription
混合使用
- 使用
partition.assignment.strategy
進行分區(qū)策略配置
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
這里的話 kafka 是自帶兩種分區(qū)策略的嘁灯,為了方便理解,我們以如下場景為例來進行解釋:
已知:
TopicA 有 3 個 partition(分區(qū)):A-1躲舌,A-2旁仿,A-3;
TopicB 有 3 個 partition(分區(qū)):B-1孽糖,B-2,B-3混弥;
ConsumerA 和 ConsumerB 作為一個消費組 ConsumerGroup 同時消費 TopicA 和 TopicB
-
Range
該方式最大的特點就是會將連續(xù)的分區(qū)分配給一個消費者实辑,根據(jù)示例铛碑,我們可以得出如下結(jié)論:ConsumerGroup 消費 TopicA 的時候:
ConsumerA 會分配到 A-1,A-2
ConsumerB 會分配到 A-3ConsumerGroup 消費 TopicB 的時候:
ConsumerA 會分配到 B-1病蛉,B-2
ConsumerB 會分配到 B-3所以:
ConsumerA 分配到了4個分區(qū): A-1,A-2瑰煎,B-1铺然,B-2
ConsumerB 分配到了2個分區(qū):A-3,B-3 -
RoundRobin
該方式最大的特點就是會以輪詢的方式將分區(qū)分配給一個個消費者酒甸,根據(jù)示例魄健,我們可以得出如下結(jié)論:ConsumerGroup 消費 TopicA 的時候:
ConsumerA 分配到 A-1
ConsumerB 分配到 A-2
ConsumerA 分配到 A-3ConsumerGroup 消費 TopicB 的時候,因為上次分配到了 ConsumerA,那么這次輪到 ConsumerB了 所以:
ConsumerB 分配到 B-1
ConsumerA 分配到 B-2
ConsumerB 分配到 B-3所以:
ConsumerA 分配到了4個分區(qū): A-1插勤,A-3沽瘦,B-2
ConsumerB 分配到了2個分區(qū):A-2,B-1农尖,B-3
從上面我們也是可以看出這兩種策略的異同析恋,RoundRobin 相比較 Range 會使得分區(qū)分配的更加的均衡,但如果是消費單個 topic 盛卡,那么其均衡是差不多的助隧,而 Range 會比 RoundRobin 更具優(yōu)勢一點,至于這個優(yōu)勢滑沧,還得看你的具體業(yè)務(wù)了并村。
-
自定義的分區(qū)策略
上面兩種分區(qū)策略是 kafka 默認自帶的策略巍实,雖然大多數(shù)情況下夠用了,但是可能針對一些特殊需求橘霎,我們也可以定義自己的分區(qū)策略- Range分區(qū)策略源碼
如何自定義呢蔫浆?最好的方式莫過于看源碼是怎么實現(xiàn)的,然后自己依葫蘆畫瓢來一個姐叁,所以我們先來看看 Range分區(qū)策略源碼瓦盛,如下,我只做了簡單的注釋外潜,因為它本身也很簡單原环,重點看下assign
的參數(shù)以及返回注釋就 ok了
public class RangeAssignor extends AbstractPartitionAssignor{ //省略部分代碼。处窥。嘱吗。。 /** * 根據(jù)訂閱者 和 分區(qū)數(shù)量來進行分區(qū) * @param partitionsPerTopic: topic->分區(qū)數(shù)量 * @param subscriptions: memberId 消費者id -> subscription 消費者信息 * @return: memberId ->list<topic名稱 和 分區(qū)序號(id)> */ @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { //topic -> list<消費者> Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); //初始化 返回結(jié)果 Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { //topic String topic = topicEntry.getKey(); // 消費該topic的 consumer-id List<String> consumersForTopic = topicEntry.getValue(); //topic 的分區(qū)數(shù)量 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; Collections.sort(consumersForTopic); //平均每個消費者分配的 分區(qū)數(shù)量 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //平均之后剩下的 分區(qū)數(shù) int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); //這里就是將連續(xù)分區(qū)切開然后分配給每個消費者 List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; } }
- 自定義一個 分區(qū)策略
這里先緩緩把滔驾,太簡單把谒麦,沒什么用,太復(fù)雜把哆致,一時也想不出好的場景绕德,如果你有需求,歡迎留言摊阀,我們一起來實現(xiàn)
- Range分區(qū)策略源碼
Consumer 常用配置
首先耻蛇,我們都應(yīng)該知道,最全最全的文檔應(yīng)該是來自官網(wǎng)(雖然有時候可能官網(wǎng)找不到):
http://kafka.apachecn.org/documentation.html#newconsumerconfigs
嗯胞此,以下內(nèi)容來自 kafka權(quán)威指南 臣咖,請原諒我的小懶惰。漱牵。夺蛇。后續(xù)有時間會把工作中的遇到的補充上
fetch.min.bytes
該屬性指定了消費者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費者的數(shù)據(jù)請求時布疙,
如果可用的數(shù)據(jù)量小于fetch.min.bytes 指定的大小蚊惯,那么它會等到有足夠的可用數(shù)據(jù)時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載灵临,因為它們在主題不是很活躍的時候(或者一天里的低谷時段)就不需要來來回回地處理消息截型。如果沒有很多可用數(shù)據(jù),但消費者的 CPU 使用率卻很高儒溉,那么就需要把該屬性的值設(shè)得比默認值大宦焦。如果消費者的數(shù)量比較多,把該屬性的值設(shè)置得大一點可以降低broker 的工作負載。fetch.max.wait.ms
我們通過 fetch.min.bytes 告訴 Kafka波闹,等到有足夠的數(shù)據(jù)時才把它返回給消費者酝豪。而feth.max.wait.ms 則用于指定 broker 的等待時間,默認是 500ms精堕。如果沒有足夠的數(shù)據(jù)流入 Kafka孵淘,消費者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導(dǎo)致 500ms 的延遲歹篓。如果要降低潛在的延遲(為了滿足 SLA)瘫证,可以把該參數(shù)值設(shè)置得小一些。
如果 fetch.max.wait.ms 被設(shè)為 100ms庄撮,并且fetch.min.bytes 被設(shè)為 1MB背捌,那么 Kafka 在收到消費者的請求后,要么返回 1MB 數(shù)據(jù)洞斯,要么在100ms 后返回所有可用的數(shù)據(jù)毡庆,就看哪個條件先得到滿足。max.partition.fetch.bytes
該屬性指定了服務(wù)器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)烙如。它的默認值是 1MB么抗,也就是說,KafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes指定的字節(jié)亚铁。如果一個主題有 20 個分區(qū)和 5 個消費者乖坠,那么每個消費者需要至少 4MB 的可用內(nèi)存來接收記錄。在為消費者分配內(nèi)存時刀闷,可以給它們多分配一些,因為如果群組里有消費者發(fā)生崩潰仰迁,剩下的消費者需要處理更多的分區(qū)甸昏。
max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置)大,否則消費者可能無法讀取這些消息徐许,導(dǎo)致消費者一直掛起重試施蜜。在設(shè)置該屬性時,另一個需要考慮的因素是消費者處理數(shù)據(jù)的時間雌隅。消費者需要頻繁調(diào)用poll() 方法來避免會話過期和發(fā)生分區(qū)再均衡翻默,如果單次調(diào)用 poll() 返回的數(shù)據(jù)太多,消費者需要更多的時間來處理恰起,可能無法及時進行下一個輪詢來避免會話過期修械。
如果出現(xiàn)這種情況,可以把max.partition.fetch.bytes 值改小检盼,或者延長會話過期時間肯污。session.timeout.ms
該屬性指定了消費者在被認為死亡之前可以與服務(wù)器斷開連接的時間,默認是 3s。如果消費者沒有在session.timeout.ms 指定的時間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器蹦渣,就被認為已經(jīng)死亡哄芜,協(xié)調(diào)器就會觸發(fā)再均衡,把它的分區(qū)分配給群組里的其他消費者柬唯。
該屬性與 heartbeat.interval.ms 緊密相關(guān)认臊。heartbeat.interval.ms 指定了 poll() 方法向協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms則指定了消費者可以多久不發(fā)送心跳锄奢。
所以失晴,一般需要同時修改這兩個屬性,heartbeat.interval.ms 必須比 session.timeout.ms 小斟薇,一般是session.timeout.ms 的三分之一师坎。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 1s堪滨。把session.timeout.ms 值設(shè)得比默認值小胯陋,可以更快地檢測和恢復(fù)崩潰的節(jié)點,不過長時間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡袱箱。把該屬性的值設(shè)置得大一些遏乔,可以減少意外的再均衡,不過檢測節(jié)點崩潰需要更長的時間发笔。auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下(因消費者長時間失效盟萨,包含偏移量的記錄已經(jīng)過時并被刪除)該作何處理。
它的默認值是 latest了讨,意思是說捻激,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)前计。另一個值是earliest胞谭,意思是說,在偏移量無效的情況下男杈,消費者將從起始位置讀取分區(qū)的記錄丈屹。enable.auto.commit
我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量伶棒,默認值是true旺垒。為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)為 false肤无,由自己控制何時提交偏移量先蒋。如果把它設(shè)為true,還可以通過配置 auto.commit.interval.ms 屬性來控制提交的頻率宛渐。-
partition.assignment.strategy
(這部分好像重復(fù)了 ~~~)
我們知道鞭达,分區(qū)會被分配給群組里的消費者司忱。PartitionAssignor 根據(jù)給定的消費者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個消費者畴蹭。Kafka 有兩個默認的分配策略坦仍。
Range
該策略會把主題的若干個連續(xù)的分區(qū)分配給消費者。假設(shè)消費者 C1 和消費者 C2 同時訂閱了主題T1 和 主題 T2叨襟,并且每個主題有 3 個分區(qū)繁扎。那么消費者 C1 有可能分配到這兩個主題的分區(qū) 0 和分區(qū) 1,而消費者 C2 分配到這兩個主題的分區(qū) 2糊闽。因為每個主題擁有奇數(shù)個分區(qū)梳玫,而分配是在主題內(nèi)獨立完成的,第一個消費者最后分配到比第二個消費者更多的分區(qū)右犹。只要使用了 Range 策略提澎,而且分區(qū)數(shù)量無法被消費者數(shù)量整除,就會出現(xiàn)這種情況念链。RoundRobin
該策略把主題的所有分區(qū)逐個分配給消費者盼忌。如果使用 RoundRobin 策略來給消費者 C1 和消費者C2 分配分區(qū),那么消費者 C1 將分到主題 T1 的分區(qū) 0 和分區(qū) 2 以及主題 T2 的分區(qū) 1掂墓,消費者 C2 將分配到主題 T1 的分區(qū) 1 以及主題 T2 的分區(qū) 0 和分區(qū) 2谦纱。一般來說,如果所有消費者都訂閱相同的主題(這種情況很常見),RoundRobin 策略會給所有消費者分配相同數(shù)量的分區(qū)(或最多就差一個分區(qū))÷朔瘢可以通過設(shè)置 partition.assignment.strategy 來選擇分區(qū)策略。
默認使用的是org.apache.kafka.clients.consumer.RangeAssignor祠乃,這個類實現(xiàn)了 Range 策略,不過也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor兑燥。我們還可以使用自定義策略跳纳,在這種情況下,partition.assignment.strategy 屬性的值就是自定義類的名字贪嫂。 client.id
該屬性可以是任意字符串,broker 用它來標識從客戶端發(fā)送過來的消息艾蓝,通常被用在日志力崇、度量指標和配額里。max.poll.records
該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量赢织,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量亮靴。receive.buffer.bytes 和 send.buffer.bytes
socket 在讀寫數(shù)據(jù)時用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)為 -1于置,就使用操作系統(tǒng)的默認值茧吊。
如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬
提交(commit)與位移(offset)
當我們調(diào)用poll()時搓侄,該方法會返回我們沒有消費的消息瞄桨。當消息從broker返回消費者時,broker并不跟蹤這些消息是否被消費者接收到讶踪;Kafka讓消費者自身來管理消費的位移芯侥,并向消費者提供更新位移的接口,這種更新位移方式稱為提交(commit)乳讥。
在正常情況下柱查,消費者會發(fā)送分區(qū)的提交信息到Kafka,Kafka進行記錄云石。當消費者宕機或者新消費者加入時唉工,Kafka會進行重平衡,這會導(dǎo)致消費者負責之前并不屬于它的分區(qū)汹忠。重平衡完成后淋硝,消費者會重新獲取分區(qū)的位移,下面來看下兩種有意思的情況错维。
假如一個消費者在重平衡前后都負責某個分區(qū)奖地,如果提交位移比之前實際處理的消息位移要小,那么會導(dǎo)致消息重復(fù)消費赋焕,如下所示:
假如在重平衡前某個消費者拉取分區(qū)消息参歹,在進行消息處理前提交了位移,但還沒完成處理宕機了隆判,然后Kafka進行重平衡犬庇,新的消費者負責此分區(qū)并讀取提交位移,此時會“丟失”消息侨嘀,如下所示:
因此臭挽,提交位移的方式會對應(yīng)用有比較大的影響,下面來看下不同的提交方式咬腕。
自動提交
這種方式讓消費者來管理位移欢峰,應(yīng)用本身不需要顯式操作。當我們將enable.auto.commit設(shè)置為true涨共,那么消費者會在poll方法調(diào)用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移纽帖。和很多其他操作一樣,自動提交也是由poll()方法來驅(qū)動的举反;在調(diào)用poll()時懊直,消費者判斷是否到達提交時間,如果是則提交上一次poll返回的最大位移火鼻。
需要注意到室囊,這種方式可能會導(dǎo)致消息重復(fù)消費雕崩。假如,某個消費者poll消息后融撞,應(yīng)用正在處理消息盼铁,在3秒后Kafka進行了重平衡,那么由于沒有更新位移導(dǎo)致重平衡后這部分消息重復(fù)消費懦铺。
提交當前位移
為了減少消息重復(fù)消費或者避免消息丟失捉貌,很多應(yīng)用選擇自己主動提交位移。設(shè)置auto.commit.offset為false冬念,那么應(yīng)用需要自己通過調(diào)用commitSync()來主動提交位移趁窃,該方法會提交poll返回的最后位移。
為了避免消息丟失急前,我們應(yīng)當在完成業(yè)務(wù)邏輯后才提交位移醒陆。而如果在處理消息時發(fā)生了重平衡,那么只有當前poll的消息會重復(fù)消費裆针。下面是一個自動提交的代碼樣例:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}
上面代碼poll消息刨摩,并進行簡單的打印(在實際中有更多的處理)世吨,最后完成處理后進行了位移提交澡刹。
異步提交
手動提交有一個缺點,那就是當發(fā)起提交調(diào)用時應(yīng)用會阻塞耘婚。當然我們可以減少手動提交的頻率罢浇,但這個會增加消息重復(fù)的概率(和自動提交一樣)。另外一個解決辦法是沐祷,使用異步提交的API嚷闭。以下為使用異步提交的方式,應(yīng)用發(fā)了一個提交請求然后立即返回:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
}
但是異步提交也有個缺點赖临,那就是如果服務(wù)器返回提交失敗胞锰,異步提交不會進行重試。相比較起來兢榨,同步提交會進行重試直到成功或者最后拋出異常給應(yīng)用嗅榕。異步提交沒有實現(xiàn)重試是因為,如果同時存在多個異步提交吵聪,進行重試可能會導(dǎo)致位移覆蓋凌那。舉個例子,假如我們發(fā)起了一個異步提交commitA暖璧,此時的提交位移為2000,隨后又發(fā)起了一個異步提交commitB且位移為3000君旦;commitA提交失敗但commitB提交成功澎办,此時commitA進行重試并成功的話嘲碱,會將實際上將已經(jīng)提交的位移從3000回滾到2000,導(dǎo)致消息重復(fù)消費局蚀。
因此麦锯,基于這種性質(zhì),一般情況下對于異步提交琅绅,我們可能會通過回調(diào)的方式記錄提交結(jié)果:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}
而如果想進行重試同時又保證提交順序的話扶欣,一種簡單的辦法是使用單調(diào)遞增的序號。每次發(fā)起異步提交時增加此序號千扶,并且將此時的序號作為參數(shù)傳給回調(diào)方法料祠;當消息提交失敗回調(diào)時,檢查參數(shù)中的序號值與全局的序號值澎羞,如果相等那么可以進行重試提交髓绽,否則放棄(因為已經(jīng)有更新的位移提交了)。
混合同步提交與異步提交
正常情況下妆绞,偶然的提交失敗并不是什么大問題顺呕,因為后續(xù)的提交成功就可以了。但是在某些情況下(例如程序退出括饶、重平衡)株茶,我們希望最后的提交成功,因此一種非常普遍的方式是混合異步提交和同步提交图焰,如下所示:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
在正常處理流程中启盛,我們使用異步提交來提高性能,但最后使用同步提交來保證位移提交成功楞泼。
提交特定位移
commitSync()和commitAsync()會提交上一次poll()的最大位移驰徊,但如果poll()返回了批量消息,而且消息數(shù)量非常多堕阔,我們可能會希望在處理這些批量消息過程中提交位移棍厂,以免重平衡導(dǎo)致從頭開始消費和處理。幸運的是超陆,commitSync()和commitAsync()允許我們指定特定的位移參數(shù)牺弹,參數(shù)為一個分區(qū)與位移的map。由于一個消費者可能會消費多個分區(qū)时呀,所以這種方式會增加一定的代碼復(fù)雜度张漂,如下所示:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
} }
代碼中在處理poll()消息的過程中,不斷保存分區(qū)與位移的關(guān)系谨娜,每處理1000條消息就會異步提交(也可以使用同步提交)航攒。
重平衡監(jiān)聽器(Rebalance Listener)
變更分區(qū)命令
kafka-topics.sh --zookeeper hadoop03:2181 --alter --topic test --partitions 6
在分區(qū)重平衡前,如果消費者知道它即將不再負責某個分區(qū)趴梢,那么它可能需要將已經(jīng)處理過的消息位移進行提交漠畜。Kafka的API允許我們在消費者新增分區(qū)或者失去分區(qū)時進行處理币他,我們只需要在調(diào)用subscribe()方法時傳入ConsumerRebalanceListener對象,該對象有兩個方法:
- public void onPartitionRevoked(Collection<topicpartition> partitions):此方法會在消費者停止消費消費后憔狞,在重平衡開始前調(diào)用蝴悉。</topicpartition>
- public void onPartitionAssigned(Collection<topicpartition> partitions):此方法在分區(qū)分配給消費者后,在消費者開始讀取消息前調(diào)用瘾敢。</topicpartition>
下面來看一個onPartitionRevoked9)的例子拍冠,該例子在消費者失去某個分區(qū)時提交位移(以便其他消費者可以接著消費消息并處理):
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.
Committing current
offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
try {
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
代碼中實現(xiàn)了onPartitionsRevoked()方法,當消費者失去某個分區(qū)時簇抵,會提交已經(jīng)處理的消息位移(而不是poll()的最大位移)庆杜。上面代碼會提交所有的分區(qū)位移,而不僅僅是失去分區(qū)的位移正压,但這種做法沒什么壞處欣福。
從指定位移開始消費
在此之前,我們使用poll()來從最后的提交位移開始消費焦履,但我們也可以從一個指定的位移開始消費拓劝。
如果想從分區(qū)開始端重新開始消費,那么可以使用seekToBeginning(TopicPartition tp)嘉裤;如果想從分區(qū)的最末端消費最新的消息郑临,那么可以使用seekToEnd(TopicPartition tp)。而且屑宠,Kafka還支持我們從指定位移開始消費厢洞。從指定位移開始消費的應(yīng)用場景有很多,其中最典型的一個是:位移存在其他系統(tǒng)(例如數(shù)據(jù)庫)中典奉,并且以其他系統(tǒng)的位移為準躺翻。
考慮這么個場景:我們從Kafka中讀取消費,然后進行處理卫玖,最后把結(jié)果寫入數(shù)據(jù)庫公你;我們既不想丟失消息,也不想數(shù)據(jù)庫中存在重復(fù)的消息數(shù)據(jù)假瞬。對于這樣的場景陕靠,我們可能會按如下邏輯處理:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
processRecord(record);
storeRecordInDB(record);
consumer.commitAsync(currentOffsets);
}
}
這個邏輯似乎沒什么問題,但是要注意到這么個事實脱茉,在持久化到數(shù)據(jù)庫成功后剪芥,提交位移到Kafka可能會失敗,那么這可能會導(dǎo)致消息會重復(fù)處理琴许。對于這種情況税肪,我們可以優(yōu)化方案,將持久化到數(shù)據(jù)庫與提交位移實現(xiàn)為原子性操作,也就是要么同時成功益兄,要么同時失敗签财。但這個是不可能的,因此我們可以在保存記錄到數(shù)據(jù)庫的同時偏塞,也保存位移,然后在消費者開始消費時使用數(shù)據(jù)庫的位移開始消費邦鲫。這個方案是可行的灸叼,我們只需要通過seek()來指定分區(qū)位移開始消費即可。下面是一個改進的樣例代碼:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//在消費者負責的分區(qū)被回收前提交數(shù)據(jù)庫事務(wù)庆捺,保存消費的記錄和位移
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//在開始消費前古今,從數(shù)據(jù)庫中獲取分區(qū)的位移,并使用seek()來指定開始消費的位移
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
//在subscribe()之后poll一次滔以,并從數(shù)據(jù)庫中獲取分區(qū)的位移捉腥,使用seek()來指定開始消費的位移
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
//保存記錄結(jié)果
storeRecordInDB(record);
//保存位移
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
//提交數(shù)據(jù)庫事務(wù),保存消費的記錄以及位移
commitDBTransaction();
}
具體邏輯見代碼注釋你画,此處不再贅述抵碟。另外注意的是,seek()只是指定了poll()拉取的開始位移坏匪,這并不影響在Kafka中保存的提交位移(當然我們可以在seek和poll之后提交位移覆蓋)拟逮。
package com.neuedu;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
public class Consumer {
private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
static int count = 0;
static KafkaConsumer<String, String> consumer;
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers",
"hadoop03:9092,hadoop05:9092,hadoop06:9092");// 該地址是集群的子集,用來探測集群适滓。
props.put("group.id", "payment");// cousumer的分組id
props.put("enable.auto.commit", "false");// 自動提交offsets
props.put("auto.commit.interval.ms", "1000");// 每隔1s沟娱,自動提交offsets
props.put("session.timeout.ms", "30000");// Consumer向集群發(fā)送自己的心跳团搞,超時則認為Consumer已經(jīng)死了,kafka會把它的分區(qū)分配給其他進程
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
// consumer.subscribe(Arrays.asList("payment"));// 訂閱的topic,可以多個
String topic = "payment";
TopicPartition partition0 = new TopicPartition(topic, 0);
// TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0));
Collection<TopicPartition> partitions = Arrays.asList(partition0);
consumer.seekToBeginning(partitions);
// consumer.seek(partition0,495);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("賭東道賭東道賭東道賭東道賭東道賭東道 offset = %d, key = %s, value = %s, partition = %s",
record.offset(), record.key(), record.value(),record.partition());
System.out.println();
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
}
}
}
}