Kafka高級特性解析(一)
生產(chǎn)者
消息發(fā)送
數(shù)據(jù)生產(chǎn)流程解析
- Producer創(chuàng)建時(shí)闹瞧,會創(chuàng)建一個Sender線程并設(shè)置為守護(hù)線程。
- 生產(chǎn)消息時(shí),內(nèi)部其實(shí)是異步流程;生產(chǎn)的消息先經(jīng)過攔截器->序列化器->分區(qū)器铅鲤,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時(shí)創(chuàng)建)划提。
- 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達(dá)到batch.size或者linger.ms達(dá)到上限,哪個先達(dá)到就算哪個邢享。
- 批次發(fā)送后鹏往,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了retrires參數(shù)大于0并且失敗原因允許重試骇塘,那么客戶端內(nèi)部會對該消息進(jìn)行重試掸犬。
- 落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者绪爸。
- 元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回湾碎,另一種是通過回調(diào)返回。
必要參數(shù)配置
-
broker配置
-
配置條目的使用方法
Map<String, Object> configs = new HashMap<>(); //初始鏈接 configs.put("bootstrap.servers", "59.110.241.53:9092"); //key的序列化類 configs.put("key.serializer", IntegerSerializer.class); //value的序列化類 configs.put("value.serializer", StringSerializer.class);
配置參數(shù)
-
屬性 | 說明 | 重要性 |
---|---|---|
bootstrap.servers | 生產(chǎn)者客戶端與broker集群建立初始連接需要的broker地址列表奠货, 由該初始連接發(fā)現(xiàn)Kafka集群中其他的所有broker介褥。該地址列表不需 要寫全部的Kafka集群中broker的地址,但也不要寫一個递惋,以防該節(jié) 點(diǎn)宕機(jī)的時(shí)候不可用柔滔。形式為: host1:port1,host2:port2,... . | high |
key.serializer | 實(shí)現(xiàn)了接口 org.apache.kafka.common.serialization.Serializer 的key序列化類。 | high |
value.serializer | 實(shí)現(xiàn)了接口 org.apache.kafka.common.serialization.Serializer 的value序列化類萍虽。 | high |
acks | 該選項(xiàng)控制著已發(fā)送消息的持久性睛廊。 acks=0 :生產(chǎn)者不等待broker的任何消息確認(rèn)。只要將消息放到了socket的緩沖區(qū)杉编,就認(rèn)為消息已發(fā)送超全。不能保證服務(wù)器是否收到該 消息, retries 設(shè)置也不起作用邓馒,因?yàn)榭蛻舳瞬魂P(guān)心消息是否發(fā)送 失敗嘶朱。客戶端收到的消息偏移量永遠(yuǎn)是-1光酣。<br />acks=1 :leader將記錄寫到它本地日志疏遏,就響應(yīng)客戶端確認(rèn)消息, 而不等待follower副本的確認(rèn)救军。如果leader確認(rèn)了消息就宕機(jī)财异,則可 能會丟失消息,因?yàn)閒ollower副本可能還沒來得及同步該消息唱遭。<br />acks=all :leader等待所有同步的副本確認(rèn)該消息戳寸。保證了只要有 一個同步副本存在,消息就不會丟失胆萧。這是最強(qiáng)的可用性保證再榄。等 價(jià)于acks=-1。默認(rèn)值為1鲸阔,字符串菲嘴。可選值:[all, -1, 0, 1] | high |
compression.type | 生產(chǎn)者生成數(shù)據(jù)的壓縮格式蚌吸。默認(rèn)是none(沒有壓縮)锈拨。允許的值:none,gzip ,snappy 和 lz4 。壓縮是對整個消息批次來講 的羹唠。消息批的效率也影響壓縮的比例奕枢。消息批越大,壓縮效率越 好佩微。字符串類型的值缝彬。默認(rèn)是none。 | high |
retries | 設(shè)置該屬性為一個大于1的值哺眯,將在消息發(fā)送失敗的時(shí)候重新發(fā)送消 息谷浅。該重試與客戶端收到異常重新發(fā)送并無二至。允許重試但是不 設(shè)置 max.in.flight.requests.per.connection 為1奶卓,存在消息 亂序的可能一疯,因?yàn)槿绻麅蓚€批次發(fā)送到同一個分區(qū),第一個失敗了 重試夺姑,第二個成功了墩邀,則第一個消息批在第二個消息批后。int類型 的值盏浙,默認(rèn):0眉睹,可選值:[0,...,2147483647] | high |
序列化器
由于Kafka中的數(shù)據(jù)都是字節(jié)數(shù)組,在將消息發(fā)送到Kafka之前需要先將數(shù)據(jù)序列化為字節(jié)數(shù)組废膘。 序列化器的作用就是用于序列化要發(fā)送的消息的辣往。Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定義序列化器,將 泛型指定類型的數(shù)據(jù)轉(zhuǎn)換為字節(jié)數(shù)組殖卑。
package org.apache.kafka.common.serialization;
import java.io.Closeable;
import java.util.Map;
/**
* 將對象轉(zhuǎn)換為byte數(shù)組的接口
*
* 該接口的實(shí)現(xiàn)類需要提供無參構(gòu)造器 * @param <T> 從哪個類型轉(zhuǎn)換
*/
public interface Serializer<T> extends Closeable {
/**
* 類的配置信息
* @param configs key/value pairs
* @param isKey key的序列化還是value的序列化
*/
void configure(Map<String, ?> var1, boolean var2);
/**
*
* 將對象轉(zhuǎn)換為字節(jié)數(shù)組
* @param topic 主題名稱
* @param data 需要轉(zhuǎn)換的對象
* @return 序列化的字節(jié)數(shù)組
*/
byte[] serialize(String var1, T var2);
/**
* 關(guān)閉序列化器
* 該方法需要提供冪等性站削,因?yàn)榭赡苷{(diào)用多次。
*/
void close();
}
系統(tǒng)提供了該接口的子接口以及實(shí)現(xiàn)類
org.apache.kafka.common.serialization.ByteArraySerializer
package org.apache.kafka.common.serialization;
import java.util.Map;
public class ByteArraySerializer implements Serializer<byte[]> {
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, byte[] data) {
return data;
}
public void close() {
}
}
org.apache.kafka.common.serialization.ByteBufferSerializer
public class ByteBufferSerializer implements Serializer<ByteBuffer> {
public ByteBufferSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, ByteBuffer data) {
if (data == null) {
return null;
} else {
data.rewind();
byte[] arr;
if (data.hasArray()) {
arr = data.array();
if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
return arr;
}
}
arr = new byte[data.remaining()];
data.get(arr, 0, arr.length);
data.rewind();
return arr;
}
}
public void close() {
}
}
org.apache.kafka.common.serialization.BytesSerializer:
public class BytesSerializer implements Serializer<Bytes> {
public BytesSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, Bytes data) {
return data == null ? null : data.get();
}
public void close() {
}
}
org.apache.kafka.common.serialization.DoubleSerializer
public class DoubleSerializer implements Serializer<Double> {
public DoubleSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, Double data) {
if (data == null) {
return null;
} else {
long bits = Double.doubleToLongBits(data);
return new byte[]{(byte)((int)(bits >>> 56)), (byte)((int)(bits >>> 48)), (byte)((int)(bits >>> 40)), (byte)((int)(bits >>> 32)), (byte)((int)(bits >>> 24)), (byte)((int)(bits >>> 16)), (byte)((int)(bits >>> 8)), (byte)((int)bits)};
}
}
public void close() {
}
}
org.apache.kafka.common.serialization.FloatSerializer
package org.apache.kafka.common.serialization;
import java.util.Map;
public class FloatSerializer implements Serializer<Float> {
public FloatSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, Float data) {
if (data == null) {
return null;
} else {
long bits = (long)Float.floatToRawIntBits(data);
return new byte[]{(byte)((int)(bits >>> 24)), (byte)((int)(bits >>> 16)), (byte)((int)(bits >>> 8)), (byte)((int)bits)};
}
}
public void close() {
}
}
org.apache.kafka.common.serialization.IntegerSerializer
public class IntegerSerializer implements Serializer<Integer> {
public IntegerSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, Integer data) {
return data == null ? null : new byte[]{(byte)(data >>> 24), (byte)(data >>> 16), (byte)(data >>> 8), data.byteValue()};
}
public void close() {
}
}
org.apache.kafka.common.serialization.StringSerializer
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
public StringSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("serializer.encoding");
}
if (encodingValue != null && encodingValue instanceof String) {
this.encoding = (String)encodingValue;
}
}
public byte[] serialize(String topic, String data) {
try {
return data == null ? null : data.getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
}
}
public void close() {
}
}
org.apache.kafka.common.serialization.LongSerializer
public class LongSerializer implements Serializer<Long> {
public LongSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, Long data) {
return data == null ? null : new byte[]{(byte)((int)(data >>> 56)), (byte)((int)(data >>> 48)), (byte)((int)(data >>> 40)), (byte)((int)(data >>> 32)), (byte)((int)(data >>> 24)), (byte)((int)(data >>> 16)), (byte)((int)(data >>> 8)), data.byteValue()};
}
public void close() {
}
}
org.apache.kafka.common.serialization.ShortSerializer
public class ShortSerializer implements Serializer<Short> {
public ShortSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
}
public byte[] serialize(String topic, Short data) {
return data == null ? null : new byte[]{(byte)(data >>> 8), data.byteValue()};
}
public void close() {
}
}
自定義序列化器
數(shù)據(jù)的序列化一般生產(chǎn)中使用avro孵稽。自定義序列化器需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer<T>接口许起,并實(shí)現(xiàn)其中的 serialize 方法。
實(shí)體:
package com.hhb.kafka.demo2;
/**
* @description: 用戶自定義封裝消息的實(shí)體類
* @author:
* @date: 2020-08-13 20:01
**/
public class User {
private Integer userId;
private String userName;
public Integer getUserId() {
return userId;
}
public User setUserId(Integer userId) {
this.userId = userId;
return this;
}
public String getUserName() {
return userName;
}
public User setUserName(String userName) {
this.userName = userName;
return this;
}
@Override
public String toString() {
return "User{" +
"userId=" + userId +
", userName='" + userName + '\'' +
'}';
}
}
自定義序列化器
package com.hhb.kafka.demo2;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-13 20:05
**/
public class UserSerializer implements Serializer<User> {
/**
* 用戶接收對序列化器的配置參數(shù)菩鲜,并對當(dāng)前序列化器進(jìn)行配置和初始化的
*
* @param map
* @param b
*/
@Override
public void configure(Map<String, ?> map, boolean b) {
//do nothing
}
/**
* 將User的數(shù)據(jù)轉(zhuǎn)化成字節(jié)數(shù)組
*
* @param s
* @param user
* @return
*/
@Override
public byte[] serialize(String s, User user) {
if (user == null) {
return null;
}
Integer userId = user.getUserId();
String userName = user.getUserName();
try {
if (userId != null && userName != null) {
byte[] bytes = userName.getBytes("UTF-8");
int length = bytes.length;
//申請一塊內(nèi)存园细,存放數(shù)據(jù)
//第一個4個字節(jié),用于存儲userId的值
//第二個4個字節(jié)接校,用于存放userName字節(jié)數(shù)組的長度int值
//第三個長度猛频,用于存放userName序列化之后的字節(jié)數(shù)組
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + length);
//設(shè)置userId
byteBuffer.putInt(userId);
//設(shè)置長度
byteBuffer.putInt(length);
//設(shè)置序列化后的userName
byteBuffer.put(bytes);
//返回
return byteBuffer.array();
}
} catch (Exception e) {
throw new SerializationException("序列化對象:User 異常");
}
return null;
}
/**
* 用戶關(guān)閉資源等操作狮崩,需要冪等
*/
@Override
public void close() {
//do nothing
}
}
生產(chǎn)者
package com.hhb.kafka.demo2;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-13 20:21
**/
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> map = new HashMap<>();
map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//設(shè)置自定義的序列化器
map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
KafkaProducer<String, User> producer = new KafkaProducer<String, User>(map);
User user = new User();
// user.setUserId(113).setUserName("李四");
user.setUserId(113).setUserName("張三");
ProducerRecord<String, User> record = new ProducerRecord<String, User>(
"topic_user_1", //主題
user.getUserName(), //key
user // value
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.err.println("發(fā)送失敗");
}
System.err.println("輸出分區(qū)信息:" + recordMetadata.partition());
System.err.println("輸出主題信息:" + recordMetadata.topic());
System.err.println("輸出偏移量信息:" + recordMetadata.offset());
}
});
producer.close();
}
}
分區(qū)器
默認(rèn)(DefaultPartitioner)分區(qū)計(jì)算:
- 如果record提供了分區(qū)號,則使用record提供的分區(qū)號
- 如果record沒有提供分區(qū)號鹿寻,則使用key的序列化后的值的hash值對分區(qū)數(shù)量取模
- 如果record沒有提供分區(qū)號睦柴,也沒有提供key,則使用輪詢的方式分配分區(qū)號毡熏。
- 會首先在可用的分區(qū)中分配分區(qū)號
- 如果沒有可用的分區(qū)坦敌,則在該主題所有分區(qū)中分配分區(qū)號。
默認(rèn)的分區(qū)器實(shí)現(xiàn)Partitioner接口
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
public DefaultPartitioner() {
}
public void configure(Map<String, ?> configs) {
}
/**
* 為指定的消息記錄計(jì)算分區(qū)值 *
* @param topic 主題名稱
* @param key 根據(jù)該key的值進(jìn)行分區(qū)計(jì)算痢法,如果沒有則為null狱窘。
* @param keyBytes key的序列化字節(jié)數(shù)組,根據(jù)該數(shù)組進(jìn)行分區(qū)計(jì)算财搁。如果沒有key蘸炸,則為
null
* @param value 根據(jù)value值進(jìn)行分區(qū)計(jì)算,如果沒有尖奔,則為null
* @param valueBytes value的序列化字節(jié)數(shù)組搭儒,根據(jù)此值進(jìn)行分區(qū)計(jì)算。如果沒有越锈,則為
null
* @param cluster 當(dāng)前集群的元數(shù)據(jù)
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//獲取該topic的所有的分區(qū)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//當(dāng)前分區(qū)大小
int numPartitions = partitions.size();
//如果不存在key
if (keyBytes == null) {
//獲取下次的值
int nextValue = this.nextValue(topic);
//查看主題可用的分區(qū)數(shù)
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
//如果可用分區(qū)數(shù)大于0仗嗦,
if (availablePartitions.size() > 0) {
//使用獲取的值counter對可用分區(qū)數(shù)取余,返回的是索引
int part = Utils.toPositive(nextValue) % availablePartitions.size();
//根據(jù)索引甘凭,獲取真正要放對的分區(qū)對應(yīng)的分區(qū)編號
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
//如果沒有可用分區(qū)稀拐,直接用總的分區(qū)進(jìn)行返回
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
//如果存在key,對key進(jìn)行hash計(jì)算后對分區(qū)數(shù)進(jìn)行取余丹弱。就是該消息要進(jìn)入的分區(qū)
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
//根據(jù)topic獲取一個counter的計(jì)數(shù)
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {//如果counter不存在
//隨機(jī)生成一個
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
//生成后的放到再根據(jù)topic放到定義好的topicCounterMap中
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
//并將該值賦值給counter
if (currentCounter != null) {
counter = currentCounter;
}
}
//放回counter的值德撬,并對counter+1,方便下次獲取躲胳。
return counter.getAndIncrement();
}
public void close() {
}
}
如果有提供分區(qū)號蜓洪,在KafkaProducer類中看doSend方法的
int partition = this.partition(record, serializedKey, serializedValue, cluster);
partition:
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
//如果有分區(qū)號,直接返回分區(qū)號坯苹,如果沒有隆檀,調(diào)用的上面的partition方法
return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
如果要自定義分區(qū)器,則需要
- 首先開發(fā)Partitioner接口的實(shí)現(xiàn)類
- 在KafkaProducer中進(jìn)行設(shè)置:configs.put("partitioner.class", "xxx.xx.Xxx.class")
位于 org.apache.kafka.clients.producer 中的分區(qū)器接口:
自定義分區(qū)器
package com.hhb.kafka.partition;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @description: 自定義分區(qū)器
* @author:
* @date: 2020-08-13 21:06
**/
public class MyPartitioner implements Partitioner {
/**
* 為指定的消息記錄計(jì)算分區(qū)值 *
*
* @param topic 主題名稱
* @param key 根據(jù)該key的值進(jìn)行分區(qū)計(jì)算粹湃,如果沒有則為null恐仑。
* @param keyBytes key的序列化字節(jié)數(shù)組,根據(jù)該數(shù)組進(jìn)行分區(qū)計(jì)算为鳄。如果沒有key裳仆,則為
* null
* @param value 根據(jù)value值進(jìn)行分區(qū)計(jì)算,如果沒有孤钦,則為null
* @param valueBytes value的序列化字節(jié)數(shù)組歧斟,根據(jù)此值進(jìn)行分區(qū)計(jì)算纯丸。如果沒有,則為
* null
* @param cluster 當(dāng)前集群的元數(shù)據(jù)
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//此處可以計(jì)算分區(qū)的數(shù)字
//在這我們直接指定分區(qū)2
return 2;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
生產(chǎn)者
package com.hhb.kafka.partition;
import com.hhb.kafka.serialize.UserSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-13 20:44
**/
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> map = new HashMap<>();
map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//指定自定義分區(qū)器
map.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(map);
//不要設(shè)置指定的分區(qū)
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"topic_partition_01",
"myKey",
"myValue"
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.err.println("輸出分區(qū)信息:" + recordMetadata.partition());
System.err.println("輸出主題信息:" + recordMetadata.topic());
System.err.println("輸出偏移量信息:" + recordMetadata.offset());
}
});
producer.close();
}
}
攔截器
Producer攔截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的静袖,主要用于實(shí)現(xiàn)Client端的定制化控制邏輯觉鼻。
對于Producer而言,Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機(jī)會對消息做一些定制化需求勾徽,比如修改消息等滑凉。同時(shí)统扳,Producer允許用戶指定多個Interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)喘帚。Intercetpor的實(shí)現(xiàn)接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
- onSend(ProducerRecord):該方法封裝進(jìn)KafkaProducer.send方法中咒钟,即運(yùn)行在用戶主線 程中吹由。Producer確保在消息被序列化以計(jì)算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息 做任何操作朱嘴,但最好保證不要修改消息所屬的topic和分區(qū)倾鲫,否則會影響目標(biāo)分區(qū)的計(jì)算。
- onAcknowledgement(RecordMetadata, Exception):該方法會在消息被應(yīng)答之前或消息發(fā) 送失敗時(shí)調(diào)用萍嬉,并且通常都是在Producer回調(diào)邏輯觸發(fā)之前乌昔。onAcknowledgement運(yùn)行在Producer的IO線程中,因此不要在該方法中放入很重的邏輯壤追,否則會拖慢Producer的消息發(fā) 送效率磕道。
- close:關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作行冰。
如前所述溺蕉,Interceptor可能被運(yùn)行在多個線程中,因此在具體實(shí)現(xiàn)時(shí)用戶需要自行確保線程安全悼做。 另外倘若指定了多個Interceptor疯特,則Producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個 Interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞肛走。這在使用過程中要特別留意漓雅。
- 實(shí)現(xiàn)ProducerInterceptor接口
- 在KafkaProducer的設(shè)置中設(shè)置自定義的攔截器
攔截器1
package com.hhb.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-13 21:32
**/
public class MyInterceptor1 implements ProducerInterceptor<Integer, String> {
private final static Logger logger = LoggerFactory.getLogger(MyInterceptor1.class);
/**
* 消息發(fā)送的時(shí)候,經(jīng)過攔截器朽色,調(diào)用該方法
*/
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.err.println("攔截器1 --- go");
//要發(fā)送的消息
String topic = record.topic();
Integer key = record.key();
String value = record.value();
Integer partition = record.partition();
Long timestamp = record.timestamp();
Headers headers = record.headers();
//攔截器攔下來之后的根據(jù)原來消息創(chuàng)建新的消息邻吞,此處沒有做任何改動
ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
topic, partition, timestamp, key, value, headers
);
//傳遞新的消息
return newRecord;
}
/**
* 消息確認(rèn)或異常的時(shí)候,調(diào)用該方法纵搁,該方法不應(yīng)該實(shí)現(xiàn)多大的任務(wù)吃衅,會影響生產(chǎn)者性能
*
* @param recordMetadata
* @param e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
System.err.println("攔截器1 --- back");
}
@Override
public void close() {
}
/**
* 可以獲取到生產(chǎn)的配置信息
*
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
Object testConfig = map.get("testConfig");
System.err.println("獲取到的testConfig值為:" + testConfig);
}
}
攔截器2
package com.hhb.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-13 21:32
**/
public class MyInterceptor2 implements ProducerInterceptor<Integer, String> {
private final static Logger logger = LoggerFactory.getLogger(MyInterceptor2.class);
/**
* 消息發(fā)送的時(shí)候,經(jīng)過攔截器腾誉,調(diào)用該方法
*/
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.err.println("攔截器2 --- go");
//要發(fā)送的消息
String topic = record.topic();
Integer key = record.key();
String value = record.value();
Integer partition = record.partition();
Long timestamp = record.timestamp();
Headers headers = record.headers();
//攔截器攔下來之后的根據(jù)原來消息創(chuàng)建新的消息徘层,此處沒有做任何改動
ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
topic, partition, timestamp, key, value, headers
);
//傳遞新的消息
return newRecord;
}
/**
* 消息確認(rèn)或異常的時(shí)候峻呕,調(diào)用該方法,該方法不應(yīng)該實(shí)現(xiàn)多大的任務(wù)趣效,會影響生產(chǎn)者性能
*
* @param recordMetadata
* @param e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
System.err.println("攔截器2 --- back");
}
@Override
public void close() {
}
/**
* 可以獲取到生產(chǎn)的配置信息
*
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
Object testConfig = map.get("testConfig");
System.err.println("獲取到的testConfig值為:" + testConfig);
}
}
攔截器3
package com.hhb.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-13 21:32
**/
public class MyInterceptor3 implements ProducerInterceptor<Integer, String> {
private final static Logger logger = LoggerFactory.getLogger(MyInterceptor3.class);
/**
* 消息發(fā)送的時(shí)候瘦癌,經(jīng)過攔截器,調(diào)用該方法
*/
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.err.println("攔截器3 --- go");
//要發(fā)送的消息
String topic = record.topic();
Integer key = record.key();
String value = record.value();
Integer partition = record.partition();
Long timestamp = record.timestamp();
Headers headers = record.headers();
//攔截器攔下來之后的根據(jù)原來消息創(chuàng)建新的消息跷敬,此處沒有做任何改動
ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
topic, partition, timestamp, key, value, headers
);
//傳遞新的消息
return newRecord;
}
/**
* 消息確認(rèn)或異常的時(shí)候讯私,調(diào)用該方法,該方法不應(yīng)該實(shí)現(xiàn)多大的任務(wù)西傀,會影響生產(chǎn)者性能
*
* @param recordMetadata
* @param e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
System.err.println("攔截器3 --- back");
}
@Override
public void close() {
}
/**
* 可以獲取到生產(chǎn)的配置信息
*
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
Object testConfig = map.get("testConfig");
System.err.println("獲取到的testConfig值為:" + testConfig);
}
}
生產(chǎn)者
package com.hhb.kafka.interceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-13 20:44
**/
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> map = new HashMap<>();
map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//設(shè)置攔截器斤寇,如果設(shè)置多個攔截器,則填寫多個攔截器的全限定類名拥褂,中間用逗號隔開
map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.hhb.kafka.interceptor.MyInterceptor1,com.hhb.kafka.interceptor.MyInterceptor2,com.hhb.kafka.interceptor.MyInterceptor3");
//測試使用的配置
map.put("testConfig", "this is test config");
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(map);
//不要設(shè)置指定的分區(qū)
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
"topic_interception_01",
0,
123,
"myValue"
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.err.println("輸出分區(qū)信息:" + recordMetadata.partition());
System.err.println("輸出主題信息:" + recordMetadata.topic());
System.err.println("輸出偏移量信息:" + recordMetadata.offset());
}
});
producer.close();
}
}
原理剖析
由上圖可以看出:KafkaProducer有兩個基本線程:
-
主線程:負(fù)責(zé)消息創(chuàng)建攔截器娘锁、序列化器、分區(qū)器等操作饺鹃,并將消息追加到消息收集器RecoderAccumulator中
消息收集器RecoderAccumulator為每個分區(qū)都維護(hù)了一個 Deque< ProducerBatch > 類型的雙端隊(duì)列莫秆。
ProducerBatch 可以理解為是 ProducerRecord 的集合,批量發(fā)送有利于提升吞吐 量悔详,降低網(wǎng)絡(luò)影響;
由于生產(chǎn)者客戶端使用 java.io.ByteBuffer 在發(fā)送消息之前進(jìn)行消息保存镊屎,并維護(hù) 了一個 BufferPool 實(shí)現(xiàn) ByteBuffer 的復(fù)用;該緩存池只針對特定大小( batch.size 指定)的 ByteBuffer進(jìn)行管理,對于消息過大的緩存茄螃,不能做到重復(fù)利 用缝驳。
每次追加一條ProducerRecord消息,會尋找/新建對應(yīng)的雙端隊(duì)列责蝠,從其尾部獲取一個ProducerBatch党巾,判斷當(dāng)前消息的大小是否可以寫入該批次中。若可以寫入則寫入;若不可以寫入霜医,則新建一個ProducerBatch齿拂,判斷該消息大小是否超過客戶端參數(shù)配置 batch.size 的值,不超過肴敛,則以 batch.size建立新的ProducerBatch署海, 這樣方便進(jìn)行緩存重復(fù)利用;若超過,則以計(jì)算的消息大小建立對應(yīng)的 ProducerBatch 医男,缺點(diǎn)就是該內(nèi)存不能被復(fù)用了砸狞。
-
Sender線程:
- 該線程從消息收集器獲取緩存的消息,將其處理為 <Node, List< ProducerBatch> 的形式镀梭, Node 表示集群的broker節(jié)點(diǎn)刀森。
- 進(jìn)一步將<Node, List< ProducerBatch>轉(zhuǎn)化為<Node, Request>形式,此時(shí)才可以 向服務(wù)端發(fā)送數(shù)據(jù)报账。
- 在發(fā)送之前研底,Sender線程將消息以 Map<NodeId, Deque< Request>> 的形式保存 到 InFlightRequests 中進(jìn)行緩存埠偿,可以通過其獲取 leastLoadedNode ,即當(dāng)前Node中負(fù)載壓力最小的一個,以實(shí)現(xiàn)消息的盡快發(fā)出榜晦。
生產(chǎn)者參數(shù)配置補(bǔ)充
-
參數(shù)設(shè)置方式:
Map<String, Object> map = new HashMap<>(); map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092"); map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //設(shè)置攔截器冠蒋,如果設(shè)置多個攔截器,則填寫多個攔截器的全限定類名乾胶,中間用逗號隔開 map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.hhb.kafka.interceptor.MyInterceptor1,com.hhb.kafka.interceptor.MyInterceptor2,com.hhb.kafka.interceptor.MyInterceptor3"); KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(map);
補(bǔ)充參數(shù):
參數(shù)名稱 | 描述 |
---|---|
retry.backoff.ms | 在向一個指定的主題分區(qū)重發(fā)消息的時(shí)候抖剿,重試之間的等待時(shí)間。 比如3次重試识窿,每次重試之后等待該時(shí)間長度斩郎,再接著重試。在一些失敗的場景腕扶,避免了密集循環(huán)的重新發(fā)送請求孽拷。 long型值吨掌,默認(rèn)100半抱。可選值:[0,...] |
retries | retries重試次數(shù)膜宋,當(dāng)消息發(fā)送出現(xiàn)錯誤的時(shí)候窿侈,系統(tǒng)會重發(fā)消息。 跟客戶端收到錯誤時(shí)重發(fā)一樣秋茫。 如果設(shè)置了重試史简,還想保證消息的有序性,需要設(shè)置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1肛著,否則在重試此失敗消息的時(shí)候圆兵,其他的消息可能發(fā)送成功了 |
request.timeout.ms | 客戶端等待請求響應(yīng)的最大時(shí)長。如果服務(wù)端響應(yīng)超時(shí)枢贿,則會重發(fā)請求殉农,除非達(dá)到重試次數(shù)。該設(shè)置應(yīng)該比replica.lag.time.max.ms (a broker configuration)要大局荚,以免在服務(wù)器延遲時(shí)間內(nèi)重發(fā)消息超凳。int類型值,默 認(rèn):30000耀态,可選值:[0,...] |
interceptor.classes | 在生產(chǎn)者接收到該消息轮傍,向Kafka集群傳輸之前,由序列化器處理之前首装,可以通過攔截器對消息進(jìn)行處理创夜。要求攔截器類必須實(shí)現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor 接口。 默認(rèn)沒有攔截器仙逻。 Map<String, Object> configs中通過List集合配置多個攔截器類名驰吓。 |
acks | 當(dāng)生產(chǎn)者發(fā)送消息之后揍魂,如何確認(rèn)消息已經(jīng)發(fā)送成功了。 支持的值:<br />acks=0: 如果設(shè)置為0棚瘟,表示生產(chǎn)者不會等待broker對消息的確認(rèn)现斋,只要將消息放到 緩沖區(qū),就認(rèn)為消息已經(jīng)發(fā)送完成偎蘸。 該情形不能保證broker是否真的收到了消息庄蹋,retries配置也不會生效,因?yàn)?客戶端不需要知道消息是否發(fā)送成功迷雪。 發(fā)送的消息的返回的消息偏移量永遠(yuǎn)是-1限书。<br />acks=1表示消息只需要寫到主分區(qū)即可,然后就響應(yīng)客戶端章咧,而不等待副本分區(qū)的 確認(rèn)倦西。 在該情形下,如果主分區(qū)收到消息確認(rèn)之后就宕機(jī)了赁严,而副本分區(qū)還沒來得 及同步該消息扰柠,則該消息丟失。<br />acks=all 首領(lǐng)分區(qū)會等待所有的ISR副本分區(qū)確認(rèn)記錄疼约。 該處理保證了只要有一個ISR副本分區(qū)存貨卤档,消息就不會丟失。 這是Kafka最強(qiáng)的可靠性保證程剥,等效于 acks=-1 劝枣。 |
batch.size | 當(dāng)多個消息發(fā)送到同一個分區(qū)的時(shí)候,生產(chǎn)者嘗試將多個記錄作為一個批來處理织鲸。批處理提高了客戶端和服務(wù)器的處理效率舔腾。 該配置項(xiàng)以字節(jié)為單位控制默認(rèn)批的大小。 所有的批小于等于該值搂擦。 發(fā)送給broker的請求將包含多個批次稳诚,每個分區(qū)一個,并包含可發(fā)送的數(shù) 據(jù)盾饮。 如果該值設(shè)置的比較小采桃,會限制吞吐量(設(shè)置為0會完全禁用批處理)。如果設(shè)置的很大丘损,又有一點(diǎn)浪費(fèi)內(nèi)存普办,因?yàn)镵afka會永遠(yuǎn)分配這么大的內(nèi)存來 參與到消息的批整合中。 |
client.id | 生產(chǎn)者發(fā)送請求的時(shí)候傳遞給broker的id字符串徘钥。 用于在broker的請求日志中追蹤什么應(yīng)用發(fā)送了什么消息沐兰。 一般該id是跟業(yè)務(wù)有關(guān)的字符串斩祭。 |
compression.type | 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式攒庵。默認(rèn)是none薇正,也就是不壓縮吏口。 支持的值:none、gzip、snappy和lz4。 壓縮是對于整個批來講的拘荡,所以批處理的效率也會影響到壓縮的比例。 |
send.buffer.bytes | TCP發(fā)送數(shù)據(jù)的時(shí)候使用的緩沖區(qū)(SO_SNDBUF)大小撬陵。如果設(shè)置為0珊皿,則使用操作系統(tǒng)默認(rèn)的。 |
buffer.memory | 生產(chǎn)者可以用來緩存等待發(fā)送到服務(wù)器的記錄的總內(nèi)存字節(jié)巨税。如果記錄的發(fā) 送速度超過了將記錄發(fā)送到服務(wù)器的速度蟋定,則生產(chǎn)者將阻塞 max.block.ms 的時(shí)間,此后它將引發(fā)異常草添。此設(shè)置應(yīng)大致對應(yīng)于生產(chǎn)者將使用的總內(nèi)存驶兜, 但并非生產(chǎn)者使用的所有內(nèi)存都用于緩沖。一些額外的內(nèi)存將用于壓縮(如 果啟用了壓縮)以及維護(hù)運(yùn)行中的請求远寸。long型數(shù)據(jù)抄淑。默認(rèn)值: 33554432,可選值:[0,...] |
connections.max.idle.ms | 當(dāng)連接空閑時(shí)間達(dá)到這個值而晒,就關(guān)閉連接蝇狼。long型數(shù)據(jù),默認(rèn):540000 |
linger.ms | 生產(chǎn)者在發(fā)送請求傳輸間隔會對需要發(fā)送的消息進(jìn)行累積倡怎,然后作為一個批 次發(fā)送。一般情況是消息的發(fā)送的速度比消息累積的速度慢贱枣。有時(shí)客戶端需 要減少請求的次數(shù)监署,即使是在發(fā)送負(fù)載不大的情況下。該配置設(shè)置了一個延 遲纽哥,生產(chǎn)者不會立即將消息發(fā)送到broker钠乏,而是等待這么一段時(shí)間以累積消 息,然后將這段時(shí)間之內(nèi)的消息作為一個批次發(fā)送春塌。該設(shè)置是批處理的另一 個上限:一旦批消息達(dá)到了 batch.size 指定的值晓避,消息批會立即發(fā)送,如 果積累的消息字節(jié)數(shù)達(dá)不到 batch.size 的值只壳,可以設(shè)置該毫秒值俏拱,等待這 么長時(shí)間之后,也會發(fā)送消息批吼句。該屬性默認(rèn)值是0(沒有延遲)锅必。如果設(shè) 置 linger.ms=5 ,則在一個請求發(fā)送之前先等待5ms惕艳。long型值搞隐,默認(rèn): 0驹愚,可選值:[0,...] |
max.block.ms | 控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的時(shí)長。當(dāng)緩存滿了或元數(shù)據(jù)不可用的時(shí)候劣纲,這些方法阻塞逢捺。在用戶提供的 序列化器和分區(qū)器的阻塞時(shí)間不計(jì)入。long型值癞季,默認(rèn):60000蒸甜,可選值: [0,...] |
max.request.size | 單個請求的最大字節(jié)數(shù)。該設(shè)置會限制單個請求中消息批的消息個數(shù)余佛,以免 單個請求發(fā)送太多的數(shù)據(jù)柠新。服務(wù)器有自己的限制批大小的設(shè)置,與該配置可 能不一樣辉巡。int類型值恨憎,默認(rèn)1048576,可選值:[0,...] |
partitioner.class | 實(shí)現(xiàn)了接口org.apache.kafka.clients.producer.Partitioner 的分區(qū) 器實(shí)現(xiàn)類郊楣。默認(rèn)值為:org.apache.kafka. clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes | TCP接收緩存(SO_RCVBUF)憔恳,如果設(shè)置為-1,則使用操作系統(tǒng)默認(rèn)的值净蚤。 int類型值钥组,默認(rèn)32768,可選值:[-1,...] |
security.protocol | 跟broker通信的協(xié)議:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. string類型值今瀑,默認(rèn):PLAINTEXT |
max.in.flight.requests.per .connection | 單個連接上未確認(rèn)請求的最大數(shù)量程梦。達(dá)到這個數(shù)量,客戶端阻塞橘荠。如果該值 大于1屿附,且存在失敗的請求,在重試的時(shí)候消息順序不能保證哥童。 int類型值挺份,默認(rèn)5≈福可選值:[1,...] |
reconnect.backoff.max.ms | 對于每個連續(xù)的連接失敗匀泊,每臺主機(jī)的退避將成倍增加,直至達(dá)到此最大 值朵你。在計(jì)算退避增量之后各聘,添加20%的隨機(jī)抖動以避免連接風(fēng)暴。 long型值撬呢,默認(rèn)1000伦吠,可選值:[0,...] |
reconnect.backoff.ms | 嘗試重連指定主機(jī)的基礎(chǔ)等待時(shí)間。避免了到該主機(jī)的密集重連。該退避時(shí) 間應(yīng)用于該客戶端到broker的所有連接毛仪。 long型值搁嗓,默認(rèn)50∠溲ィ可選值:[0,...] |
消費(fèi)者
概念入門
消費(fèi)者腺逛、消費(fèi)組
消費(fèi)者從訂閱的主題消費(fèi)消息,消費(fèi)消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主題中衡怀。 消費(fèi)者還可以將自己的偏移量存儲到Zookeeper棍矛,需要設(shè)置offset.storage=zookeeper。推薦使用Kafka存儲消費(fèi)者的偏移量抛杨。因?yàn)閆ookeeper不適合高并發(fā)够委。多個從同一個主題消費(fèi)的消費(fèi)者可以加入到一個消費(fèi)組中。 消費(fèi)組中的消費(fèi)者共享group_id怖现。
configs.put("group.id", "xxx");
group_id一般設(shè)置為應(yīng)用的邏輯名稱茁帽。比如多個訂單處理程序組成一個消費(fèi)組,可以設(shè)置group_id 為"order_process"潘拨。group_id通過消費(fèi)者的配置指定: group.id=xxxxx 消費(fèi)組均衡地給消費(fèi)者分配分區(qū)茫船,每個分區(qū)只由消費(fèi)組中一個消費(fèi)者消費(fèi)。
一個擁有四個分區(qū)的主題晒骇,包含一個消費(fèi)者的消費(fèi)組。 此時(shí)剥啤,消費(fèi)組中的消費(fèi)者消費(fèi)主題中的所有分區(qū)。并且沒有重復(fù)的可能。如果在消費(fèi)組中添加一個消費(fèi)者2,則每個消費(fèi)者分別從兩個分區(qū)接收消息。
如果消費(fèi)組有四個消費(fèi)者一屋,則每個消費(fèi)者可以分配到一個分區(qū)。
如果向消費(fèi)組中添加更多的消費(fèi)者滚躯,超過主題分區(qū)數(shù)量丧凤,則有一部分消費(fèi)者就會閑置仍侥,不會接收任 何消息。
向消費(fèi)組添加消費(fèi)者是橫向擴(kuò)展消費(fèi)能力的主要方式。必要時(shí)助泽,需要為主題創(chuàng)建大量分區(qū),在負(fù)載增長時(shí)可以加入更多的消費(fèi)者摄凡。但是不要讓消費(fèi)者的數(shù) 量超過主題分區(qū)的數(shù)量床绪。
除了通過增加消費(fèi)者來橫向擴(kuò)展單個應(yīng)用的消費(fèi)能力之外仰担,經(jīng)常出現(xiàn)多個應(yīng)用程序從同一個主題消費(fèi)的情況。此時(shí)练慕,每個應(yīng)用都可以獲取到所有的消息项鬼。只要保證每個應(yīng)用都有自己的消費(fèi)組路操,就可以讓它們獲取到主題所有的消息垫桂。橫向擴(kuò)展消費(fèi)者和消費(fèi)組不會對性能造成負(fù)面影響。為每個需要獲取一個或多個主題全部消息的應(yīng)用創(chuàng)建一個消費(fèi)組,然后向消費(fèi)組添加消費(fèi)者來橫向 擴(kuò)展消費(fèi)能力和應(yīng)用的處理能力,則每個消費(fèi)者只處理一部分消息辫秧。
心跳機(jī)制
消費(fèi)者宕機(jī)险毁,退出消費(fèi)組宽档,觸發(fā)再平衡,重新給消費(fèi)組中的消費(fèi)者分配分區(qū)璧诵。
由于broker宕機(jī),主題X的分區(qū)3宕機(jī),此時(shí)分區(qū)3沒有Leader副本闸迷,觸發(fā)再平衡腥沽,消費(fèi)者4沒有對 應(yīng)的主題分區(qū)逮走,則消費(fèi)者4閑置。
Kafka 的心跳是 Kafka Consumer 和 Broker 之間的健康檢查巡球,只有當(dāng) Broker Coordinator 正常 時(shí)言沐,Consumer 才會發(fā)送心跳。
Consumer 和 Rebalance 相關(guān)的 2 個配置參數(shù):
參數(shù) | 字段 |
---|---|
session.timeout.ms | MemberMetadata.sessionTimeoutMs |
max.poll.interval.ms | MemberMetadata.rebalanceTimeoutMs |
broker 端酣栈,sessionTimeoutMs 參數(shù)
broker 處理心跳的邏輯在 GroupCoordinator 類中:如果心跳超期, broker coordinator 會把消 費(fèi)者從 group 中移除汹押,并觸發(fā) rebalance矿筝。
/**
* Complete existing DelayedHeartbeats for the given member and schedule the next one
*/
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
// complete current heartbeat expectation
member.latestHeartbeat = time.milliseconds()
val memberKey = MemberKey(member.groupId, member.memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
// reschedule the next heartbeat expiration deadline
// 計(jì)算心跳截止時(shí)刻
val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
// 心跳過期
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
group.inLock {
if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
removeMemberAndUpdateGroup(group, member)
}
}
}
def onCompleteHeartbeat() {
// TODO: add metrics for complete heartbeats
}
def partitionFor(group: String): Int = groupManager.partitionFor(group)
private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
member.awaitingJoinCallback != null ||
member.awaitingSyncCallback != null ||
member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 參數(shù)
如果客戶端發(fā)現(xiàn)心跳超期棚贾,客戶端會標(biāo)記 coordinator 為不可用窖维,并阻塞心跳線程;如果超過了 poll 消息的間隔超過了 rebalanceTimeoutMs,則 consumer 告知 broker 主動離開消費(fèi)組妙痹,也會觸發(fā) rebalance.
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread
if (coordinatorUnknown()) {
if (findCoordinatorFuture != null || lookupCoordinator().failed())
// the immediate future check ensures that we backoff properly in the case that no
// brokers are available to connect to.
AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
heartbeat.receiveHeartbeat(time.milliseconds());
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {
// it is valid to continue heartbeating while the group is rebalancing. This
// ensures that the coordinator keeps the member in the group for as long
// as the duration of the rebalance timeout. If we stop sending heartbeats,
// however, then the session timeout may expire before we can rejoin.
heartbeat.receiveHeartbeat(time.milliseconds());
} else {
heartbeat.failHeartbeat();
// wake up the thread if it's sleeping to reschedule the heartbeat
AbstractCoordinator.this.notify();
}
}
}
});
}
消息接收
必要參數(shù)配置
參數(shù) | 說明 |
---|---|
bootstrap.servers | 向Kafka集群建立初始連接用到的host/port列表铸史。 客戶端會使用這里列出的所有服務(wù)器進(jìn)行集群其他服務(wù)器的發(fā)現(xiàn),而不管是否指定了哪個服務(wù)器用作引導(dǎo)怯伊。 這個列表僅影響用來發(fā)現(xiàn)集群所有服務(wù)器的初始主機(jī)琳轿。 字符串形式:host1:port1,host2:port2,... 由于這組服務(wù)器僅用于建立初始鏈接,然后發(fā)現(xiàn)集群中的所有服務(wù)器耿芹, 因此沒有必要將集群中的所有地址寫在這里崭篡。 一般最好兩臺,以防其中一臺宕掉吧秕。 |
key.deserializer | key的反序列化類琉闪,該類需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Deserializer 接口。 |
value.deserializer | 實(shí)現(xiàn)了 org.apache.kafka.common .serialization.Deserializer 接口的反序列化器砸彬,用于對消息的value進(jìn)行反序列化颠毙。 |
client.id | 當(dāng)從服務(wù)器消費(fèi)消息的時(shí)候向服務(wù)器發(fā)送的id字符串。在ip/port基礎(chǔ)上提供應(yīng)用的邏輯名稱砂碉,記錄在服務(wù)端的請求日志中蛀蜜,用于追蹤請求的源。 |
group.id | 用于唯一標(biāo)志當(dāng)前消費(fèi)者所屬的消費(fèi)組的字符串绽淘。 如果消費(fèi)者使用組管理功能如subscribe(topic)或使用基于Kafka的偏移量管理策略涵防,該項(xiàng)必須設(shè)置。 |
auto.offset.reset | 當(dāng)Kafka中沒有初始偏移量或當(dāng)前偏移量在服務(wù)器中不存在(如,數(shù)據(jù)被 刪除了)壮池,該如何處理? <br />earliest:自動重置偏移量到最早的偏移量 <br />latest:自動重置偏移量為最新的偏移量 <br />none:如果消費(fèi)組原來的(previous)偏移量不存在偏瓤,則向消費(fèi)者拋異常<br />anything:向消費(fèi)者拋異常 |
enable.auto.commit | 如果設(shè)置為true,消費(fèi)者會自動周期性地向服務(wù)器提交偏移量椰憋。 |
訂閱
主題和分區(qū)
Topic厅克,Kafka用于分類管理消息的邏輯單元,類似與MySQL的數(shù)據(jù)庫橙依。
Partition证舟,是Kafka下數(shù)據(jù)存儲的基本單元,這個是物理上的概念窗骑。同一個topic的數(shù)據(jù)女责,會被分散的存儲到多個partition中,這些partition可以在同一臺機(jī)器上创译,也可以是在多臺機(jī)器 上抵知。優(yōu)勢在于:有利于水平擴(kuò)展,避免單臺機(jī)器在磁盤空間和性能上的限制软族,同時(shí)可以通過復(fù) 制來增加數(shù)據(jù)冗余性刷喜,提高容災(zāi)能力。為了做到均勻分布立砸,通常partition的數(shù)量通常是 Broker Server數(shù)量的整數(shù)倍掖疮。
-
Consumer Group,同樣是邏輯上的概念颗祝,是Kafka實(shí)現(xiàn)單播和廣播兩種消息模型的手段浊闪。 保證一個消費(fèi)組獲取到特定主題的全部的消息。在消費(fèi)組內(nèi)部吐葵,若干個消費(fèi)者消費(fèi)主題分區(qū)的 消息规揪,消費(fèi)組可以保證一個主題的每個分區(qū)只被消費(fèi)組中的一個消費(fèi)者消費(fèi)。
consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù)温峭。采用 pull 模式猛铅,consumer 可自主控制消費(fèi)消息的速率, 可以自己控制消費(fèi)方式(批量消費(fèi)/逐條消費(fèi))凤藏,還可以選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義奸忽。 consumer.subscribe("tp_demo_01,tp_demo_02")
反序列化
Kafka的broker中所有的消息都是字節(jié)數(shù)組,消費(fèi)者獲取到消息之后揖庄,需要先對消息進(jìn)行反序列化處理栗菜,然后才能交給用戶程序消費(fèi)處理。 消費(fèi)者的反序列化器包括key的和value的反序列化器蹄梢。
- key.deserializer: 對
- value.deserializer
需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Deserializer<T> 接口疙筹。消費(fèi)者從訂閱的主題拉取消息:
consumer.poll(3000); 在Fetcher類中,對拉取到的消息首先進(jìn)行反序列化處理。源碼如下:
private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
try {
long offset = record.offset();
long timestamp = record.timestamp();
TimestampType timestampType = batch.timestampType();
Headers headers = new RecordHeaders(record.headers());
ByteBuffer keyBytes = record.key();
byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
// 反序列化key
K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
ByteBuffer valueBytes = record.value();
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
// 反序列化value
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
return new ConsumerRecord(partition.topic(), partition.partition(), offset, timestamp, timestampType, record.checksumOrNull(), keyByteArray == null ? -1 : keyByteArray.length, valueByteArray == null ? -1 : valueByteArray.length, key, value, headers);
} catch (RuntimeException var16) {
throw new SerializationException("Error deserializing key/value for partition " + partition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", var16);
}
}
默認(rèn)提供幾個發(fā)序列化的實(shí)現(xiàn):
- org.apache.kafka.common.serialization.ByteArrayDeserializer
- org.apache.kafka.common.serialization.ByteBufferDeserializer
- org.apache.kafka.common.serialization.BytesDeserializer
- org.apache.kafka.common.serialization.DoubleDeserializer
- org.apache.kafka.common.serialization.IntegerDeserializer
- org.apache.kafka.common.serialization.ShortDeserializer
- org.apache.kafka.common.serialization.StringDeserializer
- ……
等等一些系統(tǒng)自帶的反序列化類而咆,可以看org.apache.kafka.common.serialization.Deserializer接口的實(shí)現(xiàn)類
自定義反序列化
自定義反序列化類霍比,需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Deserializer<T> 接口。
User:
package com.hhb.kafka.deserialize;
/**
* @description: 用戶自定義封裝消息的實(shí)體類
* @author:
* @date: 2020-08-13 20:01
**/
public class User {
private Integer userId;
private String userName;
public User() {
}
public User(Integer userId, String userName) {
this.userId = userId;
this.userName = userName;
}
public Integer getUserId() {
return userId;
}
public User setUserId(Integer userId) {
this.userId = userId;
return this;
}
public String getUserName() {
return userName;
}
public User setUserName(String userName) {
this.userName = userName;
return this;
}
@Override
public String toString() {
return "User{" +
"userId=" + userId +
", userName='" + userName + '\'' +
'}';
}
}
UserDeserializer:
package com.hhb.kafka.deserialize;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-17 20:00
**/
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public User deserialize(String s, byte[] bytes) {
//分配空間
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
//把byte數(shù)據(jù)寫入到byteBuffer中暴备,只是游標(biāo)指向最后悠瞬,級bytes長度的位置
byteBuffer.put(bytes);
//將游標(biāo)指向最開始
byteBuffer.flip();
//獲取第一個int
int userId = byteBuffer.getInt();
// 獲取第二個int,即userName的長度
int length = byteBuffer.getInt();
// 生成userName
String userName = new String(bytes, 8, length);
return new User(userId, userName);
}
@Override
public void close() {
}
}
UserConsumer:
package com.hhb.kafka.deserialize;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-17 19:57
**/
public class UserConsumer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//設(shè)置自定義的反序列化器
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer");
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Collections.singleton("topic_user_1"));
ConsumerRecords<Integer, User> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> System.err.println(record.value() + " " + record.key()));
consumer.close();
}
}
位移提交
Consumer需要向Kafka記錄自己的位移數(shù)據(jù),這個匯報(bào)過程稱為提交位移(Committing offset)
Consumer 需要為分配給它的每個分區(qū)提交各自的位移數(shù)據(jù)
位移提交的由Consumer端負(fù)責(zé)的涯捻,Kafka只負(fù)責(zé)保管浅妆。__consumer_offsets
位移提交分為自動提交和手動提交
手動位移提交分為同步提交和異步提交
自動提交
Kafka Consumer 后臺提交
- 開啟自動提交:enable.auto.commit=true
- 配置自動提交間隔:Consumer端:auto.commit.interval.ms 默認(rèn)5s
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//設(shè)置自定義的反序列化器
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer");
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 設(shè)置偏移量為自動提交,默認(rèn)值
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 偏移量自動提交的時(shí)間間隔。模式值是5秒障癌,這里手動設(shè)置3秒
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000");
KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Collections.singleton("topic_user_1"));
ConsumerRecords<Integer, User> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> System.err.println(record.value() + " " + record.key()));
consumer.close();
}
- 自動提交位移的順序
- 配置 enable.auto.commit = true
- Kafka會保證在開始調(diào)用poll方法時(shí)凌外,提交上次poll返回的所有消息
- 因此自動提交不會出現(xiàn)消息丟失,但會重復(fù)消費(fèi)
- 重復(fù)消費(fèi)舉例
- Consumer 每 5s 提交 offset
- 假設(shè)提交 offset 后的 3s 發(fā)生了 Rebalance
- Rebalance 之后的所有 Consumer 從上一次提交的 offset 處繼續(xù)消費(fèi)
- 因此 Rebalance 發(fā)生前 3s 的消息會被重復(fù)消費(fèi)
手動提交-同步提交
使用 KafkaConsumer#commitSync():會提交 KafkaConsumer#poll() 返回的最新 offset
該方法為同步操作涛浙,等待直到 offset 被成功提交才返回
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 處理提交失敗異常
}
}
- commitSync 在處理完所有消息之后
- 手動同步提交可以控制offset提交的時(shí)機(jī)和頻率
- 手動同步提交會:
調(diào)用 commitSync 時(shí)趴乡,Consumer 處于阻塞狀態(tài),直到 Broker 返回結(jié)果
會影響 TPS
-
可以選擇拉長提交間隔蝗拿,但有以下問題:
- 會導(dǎo)致 Consumer 的提交頻率下降
- Consumer 重啟后,會有更多的消息被消費(fèi)
手動提交-異步提交
- KafkaConsumer#commitAsync()
while (true) {
ConsumerRecords<String, String> records = consumer.poll(3_000);
// 處理消息
process(records);
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
handle(exception);
} });
}
- commitAsync(異步提交)出現(xiàn)問題不會自動重試蒿涎,不會自動再次提交哀托。
- 處理方式:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// 處理消息
process(records);
// 使用異步提交規(guī)避阻塞
commitAysnc();
}
} catch (Exception e) {
handle(e); // 處理異常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提
} finally {
consumer.close();
}
}
消費(fèi)者位移管理
Kafka中,消費(fèi)者根據(jù)消息的位移順序消費(fèi)消息劳秋。消費(fèi)者的位移由消費(fèi)者管理仓手,可以存儲于zookeeper中,也可以存儲于Kafka主題 __consumer_offsets中玻淑。Kafka提供了消費(fèi)者API嗽冒,讓消費(fèi)者可以管理自己的位移。
API如下:KafkaConsumer<K, V>
細(xì)節(jié) | 說明 |
---|---|
public void assign(Collection<TopicPartition> partitions) | 給當(dāng)前消費(fèi)者手動分配一系列主題分區(qū)补履。 手動分配分區(qū)不支持增量分配添坊,如果先前有分配分區(qū),則該操作會覆蓋之前的分配箫锤。 如果給出的主題分區(qū)是空的贬蛙,則等價(jià)于調(diào)用unsubscribe方法。 手動分配主題分區(qū)的方法不使用消費(fèi)組管理功能谚攒。當(dāng)消費(fèi)組成員變了阳准,或者集群或主題的 元數(shù)據(jù)改變了,不會觸發(fā)分區(qū)分配的再平衡馏臭。 手動分區(qū)分配assign(Collection)不能和自動分區(qū)分配subscribe(Collection, ConsumerRebalanceListener)一起使用野蝇。 如果啟用了自動提交偏移量,則在新的分區(qū)分配替換舊的分區(qū)分配之前,會對舊的分區(qū)分 配中的消費(fèi)偏移量進(jìn)行異步提交绕沈。 |
public Set<TopicPartition> assignment() | 獲取給當(dāng)前消費(fèi)者分配的分區(qū)集合锐想。如果訂閱是通過調(diào)用assign方法直接分配主題分區(qū), 則返回相同的集合七冲。如果使用了主題訂閱痛倚,該方法返回當(dāng)前分配給該消費(fèi)者的主題分區(qū)集 合。如果分區(qū)訂閱還沒開始進(jìn)行分區(qū)分配澜躺,或者正在重新分配分區(qū)蝉稳,則會返回none。 |
public Map<String, List<PartitionInfo>> listTopics() | 獲取對用戶授權(quán)的所有主題分區(qū)元數(shù)據(jù)掘鄙。該方法會對服務(wù)器發(fā)起遠(yuǎn)程調(diào)用耘戚。 |
public List<PartitionInfo> partitionsFor(String topic) | 獲取指定主題的分區(qū)元數(shù)據(jù)。如果當(dāng)前消費(fèi)者沒有關(guān)于該主題的元數(shù)據(jù)操漠,就會對服務(wù)器發(fā) 起遠(yuǎn)程調(diào)用收津。 |
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) | 對于給定的主題分區(qū),列出它們第一個消息的偏移量浊伙。 注意撞秋,如果指定的分區(qū)不存在涝滴,該方法可能會永遠(yuǎn)阻塞鬓长。 該方法不改變分區(qū)的當(dāng)前消費(fèi)者偏移量。 |
public void seekToEnd(Collection<TopicPartition> partitions) | 將偏移量移動到每個給定分區(qū)的最后一個呻右。 該方法延遲執(zhí)行哑子,只有當(dāng)調(diào)用過poll方法或position方法之后才可以使用舅列。 如果沒有指定分區(qū),則將當(dāng)前消費(fèi)者分配的所有分區(qū)的消費(fèi)者偏移量移動到最后卧蜓。 如果設(shè)置了隔離級別為:isolation.level=read_committed帐要,則會將分區(qū)的消費(fèi)偏移量移 動到最后一個穩(wěn)定的偏移量,即下一個要消費(fèi)的消息現(xiàn)在還是未提交狀態(tài)的事務(wù)消息弥奸。 |
public void seek(TopicPartition partition, long offset) | 將給定主題分區(qū)的消費(fèi)偏移量移動到指定的偏移量榨惠,即當(dāng)前消費(fèi)者下一條要消費(fèi)的消息偏 移量。 若該方法多次調(diào)用其爵,則最后一次的覆蓋前面的冒冬。 如果在消費(fèi)中間隨意使用,可能會丟失數(shù)據(jù)摩渺。 |
public long position(TopicPartition partition) | 檢查指定主題分區(qū)的消費(fèi)偏移量 |
public void seekToBeginning(Collection<TopicPartition> partitions) | 將給定每個分區(qū)的消費(fèi)者偏移量移動到它們的起始偏移量简烤。該方法懶執(zhí)行,只有當(dāng)調(diào)用過 poll方法或position方法之后才會執(zhí)行摇幻。如果沒有提供分區(qū)横侦,則將所有分配給當(dāng)前消費(fèi)者的 分區(qū)消費(fèi)偏移量移動到起始偏移量挥萌。 |
準(zhǔn)備數(shù)據(jù):
# 生成文件
for i in `seq 99`; do echo "hello world $i" >> nm.txt;done
## 查看所有的topic
kafka-topics.sh --zookeeper localhost:2181/myKafka --list
# 創(chuàng)建topic
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1
# 灌入數(shù)據(jù)
kafka-console-producer.sh --broker-list localhost:9092 --topic tp_demo_01 < nm.txt
# 驗(yàn)證
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tp_demo_01 --from-beginning
API:
package com.hhb.kafka.offset;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.function.BiConsumer;
/**
* @description:
* @author:
* @date: 2020-08-17 21:38
**/
public class MyOffsetManager {
public static void main(String[] args) {
Map<String, Object> map = new HashMap<>();
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(map);
//給消費(fèi)者組里面的消費(fèi)者分配分區(qū),懶加載枉侧,只有調(diào)用poll方法的時(shí)候引瀑,才會真正的把分區(qū)分配給消費(fèi)者
// consumer.subscribe(Collections.singleton("tp_demo_01"));
//如何手動給消費(fèi)者分配分區(qū)
//1. 需要知道哪些主題可以訪問、消費(fèi)
//返回Map<主題榨馁,List<主題的分區(qū)>>
Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
stringListMap.forEach((topicName, partitionInfos) -> {
System.err.println("當(dāng)前主題:" + topicName);
for (PartitionInfo partitionInfo : partitionInfos) {
System.err.println("分區(qū)信息: " + partitionInfo.toString());
}
System.err.println("============");
});
System.err.println("============");
//獲取給當(dāng)前消費(fèi)者分配的主題分區(qū)信息
Set<TopicPartition> assignment = consumer.assignment();
System.err.println("打印分配之前的信息:" + assignment);
System.err.println("============");
//給當(dāng)前消費(fèi)者分配主題和分區(qū)
consumer.assign(Arrays.asList(
new TopicPartition("tp_demo_01", 0),
new TopicPartition("tp_demo_01", 1),
new TopicPartition("tp_demo_01", 2)
));
//獲取給當(dāng)前消費(fèi)者分配的主題分區(qū)信息
Set<TopicPartition> assignment1 = consumer.assignment();
System.err.println("打印分配之后的信息:" + assignment1);
System.err.println("============");
//查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的偏移量
long offset = consumer.position(new TopicPartition("tp_demo_01", 0));
System.err.println("查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的位移:" + offset);
System.err.println("============");
//移動偏移量到開始的位置
consumer.seekToBeginning(Arrays.asList(
new TopicPartition("tp_demo_01", 0),
new TopicPartition("tp_demo_01", 2)
));
System.err.println("查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 0)));
System.err.println("查看消費(fèi)者在tp_demo_01 主題 2號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 2)));
System.err.println("============");
//移動偏移量到末尾的位置
consumer.seekToEnd(Arrays.asList(
new TopicPartition("tp_demo_01", 0),
new TopicPartition("tp_demo_01", 2)
));
System.err.println("查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 0)));
System.err.println("查看消費(fèi)者在tp_demo_01 主題 2號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 2)));
System.err.println("============");
//移動偏移量到具體位置:
consumer.seek(new TopicPartition("tp_demo_01", 0), 15);
System.err.println("查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 0)));
consumer.close();
}
}
再均衡
重平衡可以說是kafka為人詬病最多的一個點(diǎn)了憨栽。重平衡其實(shí)就是一個協(xié)議,它規(guī)定了如何讓消費(fèi)者組下的所有消費(fèi)者來分配topic中的每一個分區(qū)翼虫。 比如一個topic有100個分區(qū)屑柔,一個消費(fèi)者組內(nèi)有20個消費(fèi)者,在協(xié)調(diào)者的控制下讓組內(nèi)每一個消費(fèi)者分配到5個分區(qū)珍剑,這個分配的過程就是重平衡掸宛。重平衡的觸發(fā)條件主要有三個:
- 消費(fèi)者組內(nèi)成員發(fā)生變更,這個變更包括了增加和減少消費(fèi)者招拙,比如消費(fèi)者宕機(jī)退出消費(fèi)組唧瘾。
- 主題的分區(qū)數(shù)發(fā)生變更,kafka目前只支持增加分區(qū)别凤,當(dāng)增加的時(shí)候就會觸發(fā)重平衡
-
訂閱的主題發(fā)生變化饰序,當(dāng)消費(fèi)者組使用正則表達(dá)式訂閱主題,而恰好又新建了對應(yīng)的主題规哪,就會觸發(fā)重平衡
消費(fèi)者宕機(jī)菌羽,退出消費(fèi)組,觸發(fā)再平衡由缆,重新給消費(fèi)組中的消費(fèi)者分配分區(qū)。
由于broker宕機(jī)猾蒂,主題X的分區(qū)3宕機(jī)均唉,此時(shí)分區(qū)3沒有Leader副本,觸發(fā)再平衡肚菠,消費(fèi)者4沒有對 應(yīng)的主題分區(qū)舔箭,則消費(fèi)者4閑置。
主題增加分區(qū)蚊逢,需要主題分區(qū)和消費(fèi)組進(jìn)行再均衡层扶。
由于使用正則表達(dá)式訂閱主題,當(dāng)增加的主題匹配正則表達(dá)式的時(shí)候烙荷,也要進(jìn)行再均衡镜会。
為什么說重平衡為人詬病呢?因?yàn)橹仄胶膺^程中,消費(fèi)者無法從kafka消費(fèi)消息终抽,這對kafka的 TPS影響極大戳表,而如果kafka集內(nèi)節(jié)點(diǎn)較多桶至,比如數(shù)百個,那重平衡可能會耗時(shí)極多匾旭。數(shù)分鐘到數(shù)小時(shí) 都有可能镣屹,而這段時(shí)間kafka基本處于不可用狀態(tài)。所以在實(shí)際環(huán)境中价涝,應(yīng)該盡量避免重平衡發(fā)生女蜈。
避免重平衡
要說完全避免重平衡,是不可能色瘩,因?yàn)槟銦o法完全保證消費(fèi)者不會故障伪窖。而消費(fèi)者故障其實(shí)也是最 常見的引發(fā)重平衡的地方,所以我們需要保證盡力避免消費(fèi)者故障泞遗。而其他幾種觸發(fā)重平衡的方式惰许,增加分區(qū),或是增加訂閱的主題史辙,抑或是增加消費(fèi)者汹买,更多的是主動控制。如果消費(fèi)者真正掛掉了聊倔,就沒辦法了晦毙,但實(shí)際中,會有一些情況耙蔑,kafka錯誤地認(rèn)為一個正常的消費(fèi)者已經(jīng)掛掉了见妒,我們要的就是避免這樣的情況出現(xiàn)。首先要知道哪些情況會出現(xiàn)錯誤判斷掛掉的情況甸陌。 在分布式系統(tǒng)中须揣,通常是通過心跳來維持分布式系統(tǒng)的,kafka也不例外钱豁。
在分布式系統(tǒng)中耻卡,由于網(wǎng)絡(luò)問題你不清楚沒接收到心跳,是因?yàn)閷Ψ秸嬲龗炝诉€是只是因?yàn)樨?fù)載過重沒來得及發(fā)生心跳或是網(wǎng)絡(luò)堵塞牲尺。所以一般會約定一個時(shí)間卵酪,超時(shí)即判定對方掛了。而在kafka消費(fèi) 者場景中谤碳。session.timout.ms參數(shù)就是規(guī)定這個超時(shí)時(shí)間是多少溃卡。
還有一個參數(shù),heartbeat.interval.ms蜒简,這個參數(shù)控制發(fā)送心跳的頻率瘸羡,頻率越高越不容易被誤 判,但也會消耗更多資源搓茬。此外最铁,還有最后一個參數(shù)讯赏,max.poll.interval.ms,消費(fèi)者poll數(shù)據(jù)后冷尉,需要一些處理漱挎,再進(jìn)行拉 取。如果兩次拉取時(shí)間間隔超過這個參數(shù)設(shè)置的值雀哨,那么消費(fèi)者就會被踢出消費(fèi)者組磕谅。也就是說,拉 取雾棺,然后處理膊夹,這個處理的時(shí)間不能超過 max.poll.interval.ms 這個參數(shù)的值。這個參數(shù)的默認(rèn)值是 5分鐘捌浩,而如果消費(fèi)者接收到數(shù)據(jù)后會執(zhí)行耗時(shí)的操作放刨,則應(yīng)該將其設(shè)置得大一些。
三個參數(shù):
session.timout.ms 控制心跳超時(shí)時(shí)間尸饺,
heartbeat.interval.ms 控制心跳發(fā)送頻率进统,
max.poll.interval.ms 控制poll的間隔。
這里給出一個相對較為合理的配置浪听,如下:
- session.timout.ms: 設(shè)置為6s
- heartbeat.interval.ms: 設(shè)置2s
- max.poll.interval.ms: 推薦為消費(fèi)者處理消息最長耗時(shí)再加1分鐘
消費(fèi)者攔截器
消費(fèi)者在拉取了分區(qū)消息之后螟碎,要首先經(jīng)過反序列化器對key和value進(jìn)行反序列化處理。處理完之后迹栓,如果消費(fèi)端設(shè)置了攔截器掉分,則需要經(jīng)過攔截器的處理之后,才能返回給消費(fèi)者應(yīng)用程序進(jìn)行處理克伊。
消費(fèi)端定義消息攔截器酥郭,需要實(shí)現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接口。
- 一個可插拔接口愿吹,允許攔截甚至更改消費(fèi)者接收到的消息褥民。首要的用例在于將第三方組件引入 消費(fèi)者應(yīng)用程序,用于定制的監(jiān)控洗搂、日志處理等。
- 該接口的實(shí)現(xiàn)類通過configre方法獲取消費(fèi)者配置的屬性载弄,如果消費(fèi)者配置中沒有指定 clientID耘拇,還可以獲取KafkaConsumer生成的clientId。獲取的這個配置是跟其他攔截器共享 的宇攻,需要保證不會在各個攔截器之間產(chǎn)生沖突惫叛。
- ConsumerInterceptor方法拋出的異常會被捕獲、記錄逞刷,但是不會向下傳播嘉涌。如果用戶配置 了錯誤的key或value類型參數(shù)妻熊,消費(fèi)者不會拋出異常,而僅僅是記錄下來仑最。
- ConsumerInterceptor回調(diào)發(fā)生在 org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)方法同一個線程扔役。
該接口中有如下方法:
package org.apache.kafka.clients.consumer;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
public interface ConsumerInterceptor<K, V> extends Configurable {
// poll方法返回結(jié)果之前,最后要調(diào)用的方法
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
// 消費(fèi)者提交偏移量的時(shí)候警医,經(jīng)過該方法
void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
// 用戶關(guān)閉該攔截器用到的資源
void close();
}
MyConsumer:
package com.hhb.kafka.interceptor.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-18 19:25
**/
public class MyConsumer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.hhb.kafka.interceptor.consumer.MyInterceptor1,com.hhb.kafka.interceptor.consumer.MyInterceptor2,com.hhb.kafka.interceptor.consumer.MyInterceptor3");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Collections.singleton("tp_demo_01"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(5000);
records.forEach(record -> {
System.err.println("消費(fèi)者:分區(qū):" + record.partition() +
"亿胸,主題:" + record.topic() +
",提交偏移量:" + record.offset() +
",key : " + record.key() +
",value: " + record.value());
});
}
}
}
MyInterceptor1:
package com.hhb.kafka.interceptor.consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-18 19:33
**/
public class MyInterceptor1 implements ConsumerInterceptor<String, String> {
/**
* poll方法返回結(jié)果之前,最后要調(diào)用的方法
*
* @param consumerRecords
* @return
*/
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
System.err.println("1 -------- 開始");
//在這里消息不做處理预皇,直接返回
return consumerRecords;
}
/**
* 消費(fèi)者提交偏移量的時(shí)候侈玄,經(jīng)過該方法
*
* @param map
*/
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
System.err.println("1 ---------- 結(jié)束");
}
/**
* 用戶關(guān)閉該攔截器用到的資源
*/
@Override
public void close() {
}
/**
* 獲取消費(fèi)者的配置
*
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
}
}
MyInterceptor2:
package com.hhb.kafka.interceptor.consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
/**
* @description:
* @author:
* @date: 2020-08-18 19:33
**/
public class MyInterceptor2 implements ConsumerInterceptor<String, String> {
/**
* poll方法返回結(jié)果之前,最后要調(diào)用的方法
*
* @param consumerRecords
* @return
*/
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
System.err.println("2 -------- 開始");
//在這里消息不做處理吟温,直接返回
return consumerRecords;
}
/**
* 消費(fèi)者提交偏移量的時(shí)候序仙,經(jīng)過該方法
*
* @param map
*/
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
System.err.println("2 ---------- 結(jié)束");
}
/**
* 用戶關(guān)閉該攔截器用到的資源
*/
@Override
public void close() {
}
/**
* 獲取消費(fèi)者的配置
*
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
}
}
消費(fèi)者參數(shù)配置補(bǔ)充
配置型 | 說明 |
---|---|
bootstrap.servers | 建立到Kafka集群的初始連接用到的host/port列表。 客戶端會使用這里指定的所有的host/port來建立初始連接鲁豪。 這個配置僅會影響發(fā)現(xiàn)集群所有節(jié)點(diǎn)的初始連接潘悼。 形式:host1:port1,host2:port2... 這個配置中不需要包含集群中所有的節(jié)點(diǎn)信息。 最好不要配置一個呈昔,以免配置的這個節(jié)點(diǎn)宕機(jī)的時(shí)候連不上挥等。 |
group.id | 用于定義當(dāng)前消費(fèi)者所屬的消費(fèi)組的唯一字符串。 如果使用了消費(fèi)組的功能 subscribe(topic) 堤尾, 或使用了基于Kafka的偏移量管理機(jī)制肝劲,則應(yīng)該配置group.id。 |
auto.commit.interval.ms | 如果設(shè)置了 enable.auto.commit 的值為true郭宝, 則該值定義了消費(fèi)者偏移量向Kafka提交的頻率辞槐。 |
auto.offset.reset | 如果Kafka中沒有初始偏移量或當(dāng)前偏移量在服務(wù)器中不存在 (比如數(shù)據(jù)被刪掉了):<br />earliest:自動重置偏移量到最早的偏移量。<br />latest:自動重置偏移量到最后一個 <br />none:如果沒有找到該消費(fèi)組以前的偏移量沒有找到粘室,就拋異常<br />其他值:向消費(fèi)者拋異常榄檬。 |
fetch.min.bytes | 服務(wù)器對每個拉取消息的請求返回的數(shù)據(jù)量最小值。 如果數(shù)據(jù)量達(dá)不到這個值衔统,請求等待鹿榜,以讓更多的數(shù)據(jù)累積, 達(dá)到這個值之后響應(yīng)請求锦爵。 默認(rèn)設(shè)置是1個字節(jié)舱殿,表示只要有一個字節(jié)的數(shù)據(jù), 就立即響應(yīng)請求险掀,或者在沒有數(shù)據(jù)的時(shí)候請求超時(shí)沪袭。 將該值設(shè)置為大一點(diǎn)兒的數(shù)字,會讓服務(wù)器等待稍微長一點(diǎn)兒的時(shí)間以累積數(shù)據(jù)樟氢。 如此則可以提高服務(wù)器的吞吐量冈绊,代價(jià)是額外的延遲時(shí)間侠鳄。 |
fetch.max.wait.ms | 如果服務(wù)器端的數(shù)據(jù)量達(dá)不到 fetch.min.bytes 的話, 服務(wù)器端不能立即響應(yīng)請求死宣。 該時(shí)間用于配置服務(wù)器端阻塞請求的最大時(shí)長伟恶。 |
fetch.max.bytes | 服務(wù)器給單個拉取請求返回的最大數(shù)據(jù)量。 消費(fèi)者批量拉取消息十电,如果第一個非空消息批次的值比該值大知押, 消息批也會返回,以讓消費(fèi)者可以接著進(jìn)行鹃骂。 即該配置并不是絕對的最大值台盯。 broker可以接收的消息批最大值通過message.max.bytes (broker配置) 或 max.message.bytes (主題配置)來指定。 需要注意的是畏线,消費(fèi)者一般會并發(fā)拉取請求静盅。 |
enable.auto.commit | 如果設(shè)置為true,則消費(fèi)者的偏移量會周期性地在后臺提交寝殴。 |
connections.max.idle.ms | 在這個時(shí)間之后關(guān)閉空閑的連接蒿叠。 |
check.crcs | 自動計(jì)算被消費(fèi)的消息的CRC32校驗(yàn)值。 可以確保在傳輸過程中或磁盤存儲過程中消息沒有被破壞蚣常。 它會增加額外的負(fù)載市咽,在追求極致性能的場合禁用。 |
exclude.internal.topics | 是否內(nèi)部主題應(yīng)該暴露給消費(fèi)者抵蚊。如果該條目設(shè)置為true施绎, 則只能先訂閱再拉取。 |
isolation.level | 控制如何讀取事務(wù)消息贞绳。 如果設(shè)置了 read_committed 谷醉,消費(fèi)者的poll()方法只會返回已經(jīng)提交的事務(wù)消息。 如果設(shè)置了 read_uncommitted (默認(rèn)值)冈闭, 消費(fèi)者的poll方法返回所有的消息俱尼,即使是已經(jīng)取消的事務(wù)消息。 非事務(wù)消息以上兩種情況都返回萎攒。 消息總是以偏移量的順序返回遇八。read_committed 只能返回到達(dá)LSO的消息。 在LSO之后出現(xiàn)的消息只能等待相關(guān)的事務(wù)提交之后才能看到耍休。 結(jié)果刃永, read_committed 模式,如果有未提交的事務(wù)羹应, 消費(fèi)者不能讀取到直到HW的消息。read_committed 的seekToEnd方法返回LSO次屠。 |
heartbeat.interval.ms | 當(dāng)使用消費(fèi)組的時(shí)候园匹,該條目指定消費(fèi)者向消費(fèi)者協(xié)調(diào)器發(fā)送心跳的時(shí)間間隔雳刺。心跳是為了確保消費(fèi)者會話的活躍狀態(tài), 同時(shí)在消費(fèi)者加入或離開消費(fèi)組的時(shí)候方便進(jìn)行再平衡裸违。該條目的值必須小于 session.timeout.ms 掖桦,也不應(yīng)該高于 session.timeout.ms 的1/3。 |
session.timeout.ms | 當(dāng)使用Kafka的消費(fèi)組的時(shí)候供汛,消費(fèi)者周期性地向broker發(fā)送心 表明自己的存在枪汪。 如果經(jīng)過該超時(shí)時(shí)間還沒有收到消費(fèi)者的心跳, 則broker將消費(fèi)者從消費(fèi)組移除怔昨,并啟動再平衡雀久。 該值必須在broker配置 group.min.session.timeout.ms 和roup.max.session.timeout.ms 之間。 |
max.poll.records | 一次調(diào)用poll()方法返回的記錄最大數(shù)量趁舀。 |
max.poll.interval.ms | 使用消費(fèi)組的時(shí)候調(diào)用poll()方法的時(shí)間間隔赖捌。 該條目指定了消費(fèi)者調(diào)用poll()方法的最大時(shí)間間隔。 如果在此時(shí)間內(nèi)消費(fèi)者沒有調(diào)用poll()方法矮烹, 則broker認(rèn)為消費(fèi)者失敗越庇,觸發(fā)再平衡, 將分區(qū)分配給消費(fèi)組中其他消費(fèi)者奉狈。 |
max.partition.fetch.bytes | 對每個分區(qū)卤唉,服務(wù)器返回的最大數(shù)量。消費(fèi)者按批次拉取數(shù)據(jù)仁期。 如果非空分區(qū)的第一個記錄大于這個值桑驱,批處理依然可以返回, 以保證消費(fèi)者可以進(jìn)行下去蟀拷。 broker接收批的大小由 message.max.bytes (broker參數(shù))或max.message.bytes (主題參數(shù))指定碰纬。 fetch.max.bytes 用于限制消費(fèi)者單次請求的數(shù)據(jù)量。 |
send.buffer.bytes | 用于TCP發(fā)送數(shù)據(jù)時(shí)使用的緩沖大小(SO_SNDBUF)问芬, -1表示使用OS默認(rèn)的緩沖區(qū)大小悦析。 |
retry.backoff.ms | 在發(fā)生失敗的時(shí)候如果需要重試,則該配置表示客戶端等待多長時(shí)間再發(fā)起重試此衅。 該時(shí)間的存在避免了密集循環(huán)强戴。 |
request.timeout.ms | 客戶端等待服務(wù)端響應(yīng)的最大時(shí)間。如果該時(shí)間超時(shí)挡鞍, 則客戶端要么重新發(fā)起請求骑歹,要么如果重試耗盡,請求失敗墨微。 |
reconnect.backoff.ms | 重新連接主機(jī)的等待時(shí)間道媚。避免了重連的密集循環(huán)。 該等待時(shí)間應(yīng)用于該客戶端到broker的所有連接。 |
reconnect.backoff.max.ms | 重新連接到反復(fù)連接失敗的broker時(shí)要等待的最長時(shí)間 (以毫秒為單位)最域。 如果提供此選項(xiàng)谴分,則對于每個連續(xù)的連接失敗, 每臺主機(jī)的退避將成倍增加镀脂,直至達(dá)到此最大值牺蹄。 在計(jì)算退避增量之后,添加20%的隨機(jī)抖動以避免連接風(fēng)暴薄翅。 |
receive.buffer.bytes | TCP連接接收數(shù)據(jù)的緩存(SO_RCVBUF)沙兰。 -1表示使用操作系統(tǒng)的默認(rèn)值。 |
partition.assignment.strategy | 當(dāng)使用消費(fèi)組的時(shí)候翘魄,分區(qū)分配策略的類名鼎天。 |
metrics.sample.window.ms | 計(jì)算指標(biāo)樣本的時(shí)間窗口。 |
metrics.recording.level | 指標(biāo)的最高記錄級別熟丸。 |
metrics.num.samples | 用于計(jì)算指標(biāo)而維護(hù)的樣本數(shù)量 |
interceptor.classes | 攔截器類的列表训措。默認(rèn)沒有攔截器 攔截器是消費(fèi)者的攔截器,該攔截器需要實(shí)現(xiàn)org.apache.kafka.clients.consumer .ConsumerInterceptor接口光羞。攔截器可用于對消費(fèi)者接收到的消息進(jìn)行攔截處理绩鸣。 |
消費(fèi)組管理
什么是消費(fèi)組
consumer group是kafka提供的可擴(kuò)展且具有容錯性的消費(fèi)者機(jī)制。
三個特性:
- 消費(fèi)組有一個或多個消費(fèi)者纱兑,消費(fèi)者可以是一個進(jìn)程呀闻,也可以是一個線程
- group.id是一個字符串,唯一標(biāo)識一個消費(fèi)組
- 消費(fèi)組訂閱的主題每個分區(qū)只能分配給消費(fèi)組一個消費(fèi)者潜慎。
消費(fèi)者位移(consumer position)
消費(fèi)者在消費(fèi)的過程中記錄已消費(fèi)的數(shù)據(jù)捡多,即消費(fèi)位移(offset)信息。每個消費(fèi)組保存自己的位移信息铐炫,那么只需要簡單的一個整數(shù)表示位置就夠了;同時(shí)可以引入 checkpoint機(jī)制定期持久化垒手。
位移管理
自動VS手動
Kafka默認(rèn)定期自動提交位移( enable.auto.commit = true ),也手動提交位移倒信。另外kafka會定 期把group消費(fèi)情況保存起來科贬,做成一個offset map,如下圖所示:
位移提交
位移是提交到Kafka中的 _consumer_offsets 主題鳖悠。 _consumer_offsets 中的消息保存了每個消費(fèi)組某一時(shí)刻提交的offset信息榜掌。
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" -- consumer.config /mnt/module/kafka_2.12-1.0.2/config/consumer.properties --from- beginning | head
__consumers_offsets 主題配置了compact策略,使得它總是能夠保存最新的位移信息乘综,既控制 了該topic總體的日志容量憎账,也能實(shí)現(xiàn)保存最新offset的目的。
什么是再均衡
再均衡(Rebalance)本質(zhì)上是一種協(xié)議卡辰,規(guī)定了一個消費(fèi)組中所有消費(fèi)者如何達(dá)成一致來分配訂 閱主題的每個分區(qū)胞皱。比如某個消費(fèi)組有20個消費(fèi)組邪意,訂閱了一個具有100個分區(qū)的主題。正常情況下反砌,Kafka平均會為每個消費(fèi)者分配5個分區(qū)抄罕。這個分配的過程就叫再均衡。
什么時(shí)候再均衡
再均衡的觸發(fā)條件:
- 組成員發(fā)生變更(新消費(fèi)者加入消費(fèi)組組于颖、已有消費(fèi)者主動離開或崩潰了)
- 訂閱主題數(shù)發(fā)生變更。如果正則表達(dá)式進(jìn)行訂閱嚷兔,則新建匹配正則表達(dá)式的主題觸發(fā)再均衡森渐。
- 訂閱主題的分區(qū)數(shù)發(fā)生變更
如何進(jìn)行組內(nèi)分區(qū)分配
三種分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。后面講冒晰。
誰來執(zhí)行再均衡和消費(fèi)組管理
Kafka提供了一個角色:Group Coordinator來執(zhí)行對于消費(fèi)組的管理同衣。Group Coordinator:每個消費(fèi)組分配一個消費(fèi)組協(xié)調(diào)器用于組管理和位移管理。當(dāng)消費(fèi)組的第一個消費(fèi)者啟動的時(shí)候壶运,它會去和Kafka Broker確定誰是它們組的組協(xié)調(diào)器耐齐。之后該消費(fèi)組內(nèi)所有消費(fèi)者和該組協(xié)調(diào)器協(xié)調(diào)通信。
如何確定Coordinator
兩步:
- 確定消費(fèi)組位移信息寫入 __consumers_offsets 的哪個分區(qū)蒋情。具體計(jì)算公式:
- _consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定埠况,默認(rèn)是50個分區(qū)。
- 該分區(qū)leader所在的broker就是組協(xié)調(diào)器棵癣。
Rebalance Generation
它表示Rebalance之后主題分區(qū)到消費(fèi)組中消費(fèi)者映射關(guān)系的一個版本辕翰,主要是用于保護(hù)消費(fèi)組, 隔離無效偏移量提交的狈谊。如上一個版本的消費(fèi)者無法提交位移到新版本的消費(fèi)組中喜命,因?yàn)橛成潢P(guān)系變 了敦迄,你消費(fèi)的或許已經(jīng)不是原來的那個分區(qū)了氛驮。每次group進(jìn)行Rebalance之后法焰,Generation號都會加 1靶壮,表示消費(fèi)組和分區(qū)的映射關(guān)系到了一個新版本羹铅,如下圖所示: Generation 1時(shí)group有3個成員榜揖,隨 后成員2退出組郎楼,消費(fèi)組協(xié)調(diào)器觸發(fā)Rebalance死嗦,消費(fèi)組進(jìn)入Generation 2煎娇,之后成員4加入二庵,再次觸發(fā) Rebalance,消費(fèi)組進(jìn)入Generation 3.
協(xié)議(protocol)
kafka提供了5個協(xié)議來處理與消費(fèi)組協(xié)調(diào)相關(guān)的問題:
- Heartbeat請求:consumer需要定期給組協(xié)調(diào)器發(fā)送心跳來表明自己還活著
- LeaveGroup請求:主動告訴組協(xié)調(diào)器我要離開消費(fèi)組
- SyncGroup請求:消費(fèi)組Leader把分配方案告訴組內(nèi)所有成員
- JoinGroup請求:成員請求加入組
- DescribeGroup請求:顯示組的所有信息缓呛,包括成員信息催享,協(xié)議名稱,分配方案哟绊,訂閱信息 等因妙。通常該請求是給管理員使用
組協(xié)調(diào)器在再均衡的時(shí)候主要用到了前面4種請求。
liveness
消費(fèi)者如何向消費(fèi)組協(xié)調(diào)器證明自己還活著? 通過定時(shí)向消費(fèi)組協(xié)調(diào)器發(fā)送Heartbeat請求。如果 超過了設(shè)定的超時(shí)時(shí)間攀涵,那么協(xié)調(diào)器認(rèn)為該消費(fèi)者已經(jīng)掛了铣耘。一旦協(xié)調(diào)器認(rèn)為某個消費(fèi)者掛了,那么它 就會開啟新一輪再均衡以故,并且在當(dāng)前其他消費(fèi)者的心跳響應(yīng)中添加“REBALANCE_IN_PROGRESS”蜗细,告訴 其他消費(fèi)者:重新分配分區(qū)。
再均衡過程
再均衡分為2步:Join和Sync
- Join怒详, 加入組炉媒。所有成員都向消費(fèi)組協(xié)調(diào)器發(fā)送JoinGroup請求,請求加入消費(fèi)組昆烁。一旦所有 成員都發(fā)送了JoinGroup請求吊骤,協(xié)調(diào)i器從中選擇一個消費(fèi)者擔(dān)任Leader的角色,并把組成員 信息以及訂閱信息發(fā)給Leader静尼。
-
Sync白粉,Leader開始分配消費(fèi)方案,即哪個消費(fèi)者負(fù)責(zé)消費(fèi)哪些主題的哪些分區(qū)鼠渺。一旦完成分配鸭巴,Leader會將這個方案封裝進(jìn)SyncGroup請求中發(fā)給消費(fèi)組協(xié)調(diào)器,非Leader也會發(fā) SyncGroup請求拦盹,只是內(nèi)容為空奕扣。消費(fèi)組協(xié)調(diào)器接收到分配方案之后會把方案塞進(jìn) SyncGroup的response中發(fā)給各個消費(fèi)者。
注意:在協(xié)調(diào)器收集到所有成員請求前掌敬,它會把已收到請求放入一個叫purgatory(煉獄)的地方惯豆。然 后是分發(fā)分配方案的過程,即SyncGroup請求:
注意:消費(fèi)組的分區(qū)分配方案在客戶端執(zhí)行奔害。Kafka交給客戶端可以有更好的靈活性楷兽。Kafka默認(rèn)提供三種分配策略:range和round-robin和sticky』伲可以通過消費(fèi)者的參數(shù):partition.assignment.strategy 來實(shí)現(xiàn)自己分配策略芯杀。
消費(fèi)組狀態(tài)機(jī)
消費(fèi)組組協(xié)調(diào)器根據(jù)狀態(tài)機(jī)對消費(fèi)組做不同的處理:
說明:
- Dead:組內(nèi)已經(jīng)沒有任何成員的最終狀態(tài),組的元數(shù)據(jù)也已經(jīng)被組協(xié)調(diào)器移除了雅潭。這種狀態(tài) 響應(yīng)各種請求都是一個response: UNKNOWN_MEMBER_ID
- Empty:組內(nèi)無成員揭厚,但是位移信息還沒有過期。這種狀態(tài)只能響應(yīng)JoinGroup請求
- PreparingRebalance:組準(zhǔn)備開啟新的rebalance扶供,等待成員加入
- AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
- Stable:再均衡完成筛圆,可以開始消費(fèi)。