Kafka高級特性解析(一)

Kafka高級特性解析(一)

生產(chǎn)者

消息發(fā)送

數(shù)據(jù)生產(chǎn)流程解析
數(shù)據(jù)生產(chǎn)流程解析.png
  1. Producer創(chuàng)建時(shí)闹瞧,會創(chuàng)建一個Sender線程并設(shè)置為守護(hù)線程。
  2. 生產(chǎn)消息時(shí),內(nèi)部其實(shí)是異步流程;生產(chǎn)的消息先經(jīng)過攔截器->序列化器->分區(qū)器铅鲤,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時(shí)創(chuàng)建)划提。
  3. 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達(dá)到batch.size或者linger.ms達(dá)到上限,哪個先達(dá)到就算哪個邢享。
  4. 批次發(fā)送后鹏往,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了retrires參數(shù)大于0并且失敗原因允許重試骇塘,那么客戶端內(nèi)部會對該消息進(jìn)行重試掸犬。
  5. 落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者绪爸。
  6. 元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回湾碎,另一種是通過回調(diào)返回。
必要參數(shù)配置
  • broker配置

    • 配置條目的使用方法

        Map<String, Object> configs = new HashMap<>();
        //初始鏈接
        configs.put("bootstrap.servers", "59.110.241.53:9092");
        //key的序列化類
        configs.put("key.serializer", IntegerSerializer.class);
        //value的序列化類
        configs.put("value.serializer", StringSerializer.class);
      
    • 配置參數(shù)

屬性 說明 重要性
bootstrap.servers 生產(chǎn)者客戶端與broker集群建立初始連接需要的broker地址列表奠货, 由該初始連接發(fā)現(xiàn)Kafka集群中其他的所有broker介褥。該地址列表不需 要寫全部的Kafka集群中broker的地址,但也不要寫一個递惋,以防該節(jié) 點(diǎn)宕機(jī)的時(shí)候不可用柔滔。形式為: host1:port1,host2:port2,... . high
key.serializer 實(shí)現(xiàn)了接口 org.apache.kafka.common.serialization.Serializer 的key序列化類。 high
value.serializer 實(shí)現(xiàn)了接口 org.apache.kafka.common.serialization.Serializer 的value序列化類萍虽。 high
acks 該選項(xiàng)控制著已發(fā)送消息的持久性睛廊。 acks=0 :生產(chǎn)者不等待broker的任何消息確認(rèn)。只要將消息放到了socket的緩沖區(qū)杉编,就認(rèn)為消息已發(fā)送超全。不能保證服務(wù)器是否收到該 消息, retries 設(shè)置也不起作用邓馒,因?yàn)榭蛻舳瞬魂P(guān)心消息是否發(fā)送 失敗嘶朱。客戶端收到的消息偏移量永遠(yuǎn)是-1光酣。<br />acks=1 :leader將記錄寫到它本地日志疏遏,就響應(yīng)客戶端確認(rèn)消息, 而不等待follower副本的確認(rèn)救军。如果leader確認(rèn)了消息就宕機(jī)财异,則可 能會丟失消息,因?yàn)閒ollower副本可能還沒來得及同步該消息唱遭。<br />acks=all :leader等待所有同步的副本確認(rèn)該消息戳寸。保證了只要有 一個同步副本存在,消息就不會丟失胆萧。這是最強(qiáng)的可用性保證再榄。等 價(jià)于acks=-1。默認(rèn)值為1鲸阔,字符串菲嘴。可選值:[all, -1, 0, 1] high
compression.type 生產(chǎn)者生成數(shù)據(jù)的壓縮格式蚌吸。默認(rèn)是none(沒有壓縮)锈拨。允許的值:none,gzip ,snappy 和 lz4 。壓縮是對整個消息批次來講 的羹唠。消息批的效率也影響壓縮的比例奕枢。消息批越大,壓縮效率越 好佩微。字符串類型的值缝彬。默認(rèn)是none。 high
retries 設(shè)置該屬性為一個大于1的值哺眯,將在消息發(fā)送失敗的時(shí)候重新發(fā)送消 息谷浅。該重試與客戶端收到異常重新發(fā)送并無二至。允許重試但是不 設(shè)置 max.in.flight.requests.per.connection 為1奶卓,存在消息 亂序的可能一疯,因?yàn)槿绻麅蓚€批次發(fā)送到同一個分區(qū),第一個失敗了 重試夺姑,第二個成功了墩邀,則第一個消息批在第二個消息批后。int類型 的值盏浙,默認(rèn):0眉睹,可選值:[0,...,2147483647] high
序列化器
序列化器.png

由于Kafka中的數(shù)據(jù)都是字節(jié)數(shù)組,在將消息發(fā)送到Kafka之前需要先將數(shù)據(jù)序列化為字節(jié)數(shù)組废膘。 序列化器的作用就是用于序列化要發(fā)送的消息的辣往。Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定義序列化器,將 泛型指定類型的數(shù)據(jù)轉(zhuǎn)換為字節(jié)數(shù)組殖卑。

package org.apache.kafka.common.serialization;

import java.io.Closeable;
import java.util.Map;

/**
* 將對象轉(zhuǎn)換為byte數(shù)組的接口
*
* 該接口的實(shí)現(xiàn)類需要提供無參構(gòu)造器 * @param <T> 從哪個類型轉(zhuǎn)換
*/
public interface Serializer<T> extends Closeable {
  
    /**
    * 類的配置信息
    * @param configs key/value pairs
    * @param isKey key的序列化還是value的序列化 
    */
    void configure(Map<String, ?> var1, boolean var2);
  
    /**
    *
    * 將對象轉(zhuǎn)換為字節(jié)數(shù)組 
    * @param topic 主題名稱
    * @param data 需要轉(zhuǎn)換的對象
    * @return 序列化的字節(jié)數(shù)組 
    */
    byte[] serialize(String var1, T var2);

    /**
    * 關(guān)閉序列化器
    * 該方法需要提供冪等性站削,因?yàn)榭赡苷{(diào)用多次。 
    */
    void close();
}

系統(tǒng)提供了該接口的子接口以及實(shí)現(xiàn)類

org.apache.kafka.common.serialization.ByteArraySerializer

package org.apache.kafka.common.serialization;

import java.util.Map;

public class ByteArraySerializer implements Serializer<byte[]> {
  
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
  
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
    public void close() {
    }
}

org.apache.kafka.common.serialization.ByteBufferSerializer

public class ByteBufferSerializer implements Serializer<ByteBuffer> {
    public ByteBufferSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
    }
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        } else {
            data.rewind();
            byte[] arr;
            if (data.hasArray()) {
                arr = data.array();
                if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                    return arr;
                }
            }

            arr = new byte[data.remaining()];
            data.get(arr, 0, arr.length);
            data.rewind();
            return arr;
        }
    }
    public void close() {
    }
}

org.apache.kafka.common.serialization.BytesSerializer:

public class BytesSerializer implements Serializer<Bytes> {
    public BytesSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    public byte[] serialize(String topic, Bytes data) {
        return data == null ? null : data.get();
    }

    public void close() {
    }
}

org.apache.kafka.common.serialization.DoubleSerializer

public class DoubleSerializer implements Serializer<Double> {
    public DoubleSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    public byte[] serialize(String topic, Double data) {
        if (data == null) {
            return null;
        } else {
            long bits = Double.doubleToLongBits(data);
            return new byte[]{(byte)((int)(bits >>> 56)), (byte)((int)(bits >>> 48)), (byte)((int)(bits >>> 40)), (byte)((int)(bits >>> 32)), (byte)((int)(bits >>> 24)), (byte)((int)(bits >>> 16)), (byte)((int)(bits >>> 8)), (byte)((int)bits)};
        }
    }
    public void close() {
    }
}

org.apache.kafka.common.serialization.FloatSerializer

package org.apache.kafka.common.serialization;

import java.util.Map;

public class FloatSerializer implements Serializer<Float> {
    public FloatSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    public byte[] serialize(String topic, Float data) {
        if (data == null) {
            return null;
        } else {
            long bits = (long)Float.floatToRawIntBits(data);
            return new byte[]{(byte)((int)(bits >>> 24)), (byte)((int)(bits >>> 16)), (byte)((int)(bits >>> 8)), (byte)((int)bits)};
        }
    }

    public void close() {
    }
}

org.apache.kafka.common.serialization.IntegerSerializer

public class IntegerSerializer implements Serializer<Integer> {
    public IntegerSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    public byte[] serialize(String topic, Integer data) {
        return data == null ? null : new byte[]{(byte)(data >>> 24), (byte)(data >>> 16), (byte)(data >>> 8), data.byteValue()};
    }

    public void close() {
    }
}

org.apache.kafka.common.serialization.StringSerializer

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    public StringSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null) {
            encodingValue = configs.get("serializer.encoding");
        }

        if (encodingValue != null && encodingValue instanceof String) {
            this.encoding = (String)encodingValue;
        }

    }

    public byte[] serialize(String topic, String data) {
        try {
            return data == null ? null : data.getBytes(this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
        }
    }

    public void close() {
    }
}

org.apache.kafka.common.serialization.LongSerializer

public class LongSerializer implements Serializer<Long> {
    public LongSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    public byte[] serialize(String topic, Long data) {
        return data == null ? null : new byte[]{(byte)((int)(data >>> 56)), (byte)((int)(data >>> 48)), (byte)((int)(data >>> 40)), (byte)((int)(data >>> 32)), (byte)((int)(data >>> 24)), (byte)((int)(data >>> 16)), (byte)((int)(data >>> 8)), data.byteValue()};
    }

    public void close() {
    }
}

org.apache.kafka.common.serialization.ShortSerializer

public class ShortSerializer implements Serializer<Short> {
    public ShortSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    public byte[] serialize(String topic, Short data) {
        return data == null ? null : new byte[]{(byte)(data >>> 8), data.byteValue()};
    }

    public void close() {
    }
}
自定義序列化器

數(shù)據(jù)的序列化一般生產(chǎn)中使用avro孵稽。自定義序列化器需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer<T>接口许起,并實(shí)現(xiàn)其中的 serialize 方法。

實(shí)體:

package com.hhb.kafka.demo2;

/**
 * @description: 用戶自定義封裝消息的實(shí)體類
 * @author: 
 * @date: 2020-08-13 20:01
 **/
public class User {

    private Integer userId;

    private String userName;

    public Integer getUserId() {
        return userId;
    }

    public User setUserId(Integer userId) {
        this.userId = userId;
        return this;
    }

    public String getUserName() {
        return userName;
    }

    public User setUserName(String userName) {
        this.userName = userName;
        return this;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", userName='" + userName + '\'' +
                '}';
    }
}

自定義序列化器

package com.hhb.kafka.demo2;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-13 20:05
 **/
public class UserSerializer implements Serializer<User> {

    /**
     * 用戶接收對序列化器的配置參數(shù)菩鲜,并對當(dāng)前序列化器進(jìn)行配置和初始化的
     *
     * @param map
     * @param b
     */
    @Override
    public void configure(Map<String, ?> map, boolean b) {
        //do nothing
    }

    /**
     * 將User的數(shù)據(jù)轉(zhuǎn)化成字節(jié)數(shù)組
     *
     * @param s
     * @param user
     * @return
     */
    @Override
    public byte[] serialize(String s, User user) {


        if (user == null) {
            return null;
        }

        Integer userId = user.getUserId();
        String userName = user.getUserName();
        try {
            if (userId != null && userName != null) {
                byte[] bytes = userName.getBytes("UTF-8");
                int length = bytes.length;
                //申請一塊內(nèi)存园细,存放數(shù)據(jù)
                //第一個4個字節(jié),用于存儲userId的值
                //第二個4個字節(jié)接校,用于存放userName字節(jié)數(shù)組的長度int值
                //第三個長度猛频,用于存放userName序列化之后的字節(jié)數(shù)組
                ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + length);
                //設(shè)置userId
                byteBuffer.putInt(userId);
                //設(shè)置長度
                byteBuffer.putInt(length);
                //設(shè)置序列化后的userName
                byteBuffer.put(bytes);
                //返回
                return byteBuffer.array();
            }
        } catch (Exception e) {
            throw new SerializationException("序列化對象:User 異常");
        }
        return null;
    }

    /**
     * 用戶關(guān)閉資源等操作狮崩,需要冪等
     */
    @Override
    public void close() {
        //do nothing
    }
}

生產(chǎn)者

package com.hhb.kafka.demo2;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-13 20:21
 **/
public class MyProducer {


    public static void main(String[] args) {

        Map<String, Object> map = new HashMap<>();
        map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //設(shè)置自定義的序列化器
        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
        KafkaProducer<String, User> producer = new KafkaProducer<String, User>(map);
        User user = new User();
//        user.setUserId(113).setUserName("李四");
        user.setUserId(113).setUserName("張三");
        ProducerRecord<String, User> record = new ProducerRecord<String, User>(
                "topic_user_1", //主題
                user.getUserName(), //key
                user // value

        );
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    System.err.println("發(fā)送失敗");
                }
                System.err.println("輸出分區(qū)信息:" + recordMetadata.partition());
                System.err.println("輸出主題信息:" + recordMetadata.topic());
                System.err.println("輸出偏移量信息:" + recordMetadata.offset());
            }
        });

        producer.close();

    }
}
分區(qū)器
分區(qū)器.png

默認(rèn)(DefaultPartitioner)分區(qū)計(jì)算:

  1. 如果record提供了分區(qū)號,則使用record提供的分區(qū)號
  2. 如果record沒有提供分區(qū)號鹿寻,則使用key的序列化后的值的hash值對分區(qū)數(shù)量取模
  3. 如果record沒有提供分區(qū)號睦柴,也沒有提供key,則使用輪詢的方式分配分區(qū)號毡熏。
    • 會首先在可用的分區(qū)中分配分區(qū)號
    • 如果沒有可用的分區(qū)坦敌,則在該主題所有分區(qū)中分配分區(qū)號。

默認(rèn)的分區(qū)器實(shí)現(xiàn)Partitioner接口

public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public DefaultPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }
    
    /**
    * 為指定的消息記錄計(jì)算分區(qū)值 *
    * @param topic 主題名稱
    * @param key 根據(jù)該key的值進(jìn)行分區(qū)計(jì)算痢法,如果沒有則為null狱窘。
    * @param keyBytes key的序列化字節(jié)數(shù)組,根據(jù)該數(shù)組進(jìn)行分區(qū)計(jì)算财搁。如果沒有key蘸炸,則為
    null
    * @param value 根據(jù)value值進(jìn)行分區(qū)計(jì)算,如果沒有尖奔,則為null
    * @param valueBytes value的序列化字節(jié)數(shù)組搭儒,根據(jù)此值進(jìn)行分區(qū)計(jì)算。如果沒有越锈,則為
    null
    * @param cluster 當(dāng)前集群的元數(shù)據(jù) 
    */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //獲取該topic的所有的分區(qū)
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //當(dāng)前分區(qū)大小
        int numPartitions = partitions.size();
        //如果不存在key
        if (keyBytes == null) {
            //獲取下次的值
            int nextValue = this.nextValue(topic);
            //查看主題可用的分區(qū)數(shù)
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            //如果可用分區(qū)數(shù)大于0仗嗦,
            if (availablePartitions.size() > 0) {
                //使用獲取的值counter對可用分區(qū)數(shù)取余,返回的是索引
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                //根據(jù)索引甘凭,獲取真正要放對的分區(qū)對應(yīng)的分區(qū)編號
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                //如果沒有可用分區(qū)稀拐,直接用總的分區(qū)進(jìn)行返回
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
          //如果存在key,對key進(jìn)行hash計(jì)算后對分區(qū)數(shù)進(jìn)行取余丹弱。就是該消息要進(jìn)入的分區(qū)
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        //根據(jù)topic獲取一個counter的計(jì)數(shù)
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {//如果counter不存在
            //隨機(jī)生成一個
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
          //生成后的放到再根據(jù)topic放到定義好的topicCounterMap中
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
          //并將該值賦值給counter
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        //放回counter的值德撬,并對counter+1,方便下次獲取躲胳。
        return counter.getAndIncrement();
    }

    public void close() {
    }
}

如果有提供分區(qū)號蜓洪,在KafkaProducer類中看doSend方法的

int partition = this.partition(record, serializedKey, serializedValue, cluster);

partition:

 private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        //如果有分區(qū)號,直接返回分區(qū)號坯苹,如果沒有隆檀,調(diào)用的上面的partition方法
        return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

如果要自定義分區(qū)器,則需要

  1. 首先開發(fā)Partitioner接口的實(shí)現(xiàn)類
  2. 在KafkaProducer中進(jìn)行設(shè)置:configs.put("partitioner.class", "xxx.xx.Xxx.class")

位于 org.apache.kafka.clients.producer 中的分區(qū)器接口:

自定義分區(qū)器

package com.hhb.kafka.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @description: 自定義分區(qū)器
 * @author: 
 * @date: 2020-08-13 21:06
 **/
public class MyPartitioner implements Partitioner {

    /**
     * 為指定的消息記錄計(jì)算分區(qū)值 *
     *
     * @param topic      主題名稱
     * @param key        根據(jù)該key的值進(jìn)行分區(qū)計(jì)算粹湃,如果沒有則為null恐仑。
     * @param keyBytes   key的序列化字節(jié)數(shù)組,根據(jù)該數(shù)組進(jìn)行分區(qū)計(jì)算为鳄。如果沒有key裳仆,則為
     *                   null
     * @param value      根據(jù)value值進(jìn)行分區(qū)計(jì)算,如果沒有孤钦,則為null
     * @param valueBytes value的序列化字節(jié)數(shù)組歧斟,根據(jù)此值進(jìn)行分區(qū)計(jì)算纯丸。如果沒有,則為
     *                   null
     * @param cluster    當(dāng)前集群的元數(shù)據(jù)
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //此處可以計(jì)算分區(qū)的數(shù)字
        //在這我們直接指定分區(qū)2
        return 2;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

生產(chǎn)者

package com.hhb.kafka.partition;

import com.hhb.kafka.serialize.UserSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-13 20:44
 **/
public class MyProducer {
    public static void main(String[] args) {
        Map<String, Object> map = new HashMap<>();
        map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //指定自定義分區(qū)器
        map.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(map);
        //不要設(shè)置指定的分區(qū)
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                "topic_partition_01",
                "myKey",
                "myValue"
        );
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.err.println("輸出分區(qū)信息:" + recordMetadata.partition());
                System.err.println("輸出主題信息:" + recordMetadata.topic());
                System.err.println("輸出偏移量信息:" + recordMetadata.offset());
            }
        });
        producer.close();
    }
}
攔截器
攔截器.png

Producer攔截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的静袖,主要用于實(shí)現(xiàn)Client端的定制化控制邏輯觉鼻。

對于Producer而言,Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機(jī)會對消息做一些定制化需求勾徽,比如修改消息等滑凉。同時(shí)统扳,Producer允許用戶指定多個Interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)喘帚。Intercetpor的實(shí)現(xiàn)接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

  • onSend(ProducerRecord):該方法封裝進(jìn)KafkaProducer.send方法中咒钟,即運(yùn)行在用戶主線 程中吹由。Producer確保在消息被序列化以計(jì)算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息 做任何操作朱嘴,但最好保證不要修改消息所屬的topic和分區(qū)倾鲫,否則會影響目標(biāo)分區(qū)的計(jì)算。
  • onAcknowledgement(RecordMetadata, Exception):該方法會在消息被應(yīng)答之前或消息發(fā) 送失敗時(shí)調(diào)用萍嬉,并且通常都是在Producer回調(diào)邏輯觸發(fā)之前乌昔。onAcknowledgement運(yùn)行在Producer的IO線程中,因此不要在該方法中放入很重的邏輯壤追,否則會拖慢Producer的消息發(fā) 送效率磕道。
  • close:關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作行冰。

如前所述溺蕉,Interceptor可能被運(yùn)行在多個線程中,因此在具體實(shí)現(xiàn)時(shí)用戶需要自行確保線程安全悼做。 另外倘若指定了多個Interceptor疯特,則Producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個 Interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞肛走。這在使用過程中要特別留意漓雅。

  1. 實(shí)現(xiàn)ProducerInterceptor接口
  2. 在KafkaProducer的設(shè)置中設(shè)置自定義的攔截器

攔截器1

package com.hhb.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-13 21:32
 **/
public class MyInterceptor1 implements ProducerInterceptor<Integer, String> {


    private final static Logger logger = LoggerFactory.getLogger(MyInterceptor1.class);

    /**
     * 消息發(fā)送的時(shí)候,經(jīng)過攔截器朽色,調(diào)用該方法
     */
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.err.println("攔截器1 --- go");
        //要發(fā)送的消息
        String topic = record.topic();
        Integer key = record.key();
        String value = record.value();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        Headers headers = record.headers();
        //攔截器攔下來之后的根據(jù)原來消息創(chuàng)建新的消息邻吞,此處沒有做任何改動
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
                topic, partition, timestamp, key, value, headers
        );
        //傳遞新的消息
        return newRecord;
    }

    /**
     * 消息確認(rèn)或異常的時(shí)候,調(diào)用該方法纵搁,該方法不應(yīng)該實(shí)現(xiàn)多大的任務(wù)吃衅,會影響生產(chǎn)者性能
     *
     * @param recordMetadata
     * @param e
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.err.println("攔截器1 --- back");
    }

    @Override
    public void close() {

    }

    /**
     * 可以獲取到生產(chǎn)的配置信息
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {

        Object testConfig = map.get("testConfig");
        System.err.println("獲取到的testConfig值為:" + testConfig);
    }
}

攔截器2

package com.hhb.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-13 21:32
 **/
public class MyInterceptor2 implements ProducerInterceptor<Integer, String> {


    private final static Logger logger = LoggerFactory.getLogger(MyInterceptor2.class);

    /**
     * 消息發(fā)送的時(shí)候,經(jīng)過攔截器腾誉,調(diào)用該方法
     */
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.err.println("攔截器2 --- go");
        //要發(fā)送的消息
        String topic = record.topic();
        Integer key = record.key();
        String value = record.value();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        Headers headers = record.headers();
        //攔截器攔下來之后的根據(jù)原來消息創(chuàng)建新的消息徘层,此處沒有做任何改動
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
                topic, partition, timestamp, key, value, headers
        );
        //傳遞新的消息
        return newRecord;
    }

    /**
     * 消息確認(rèn)或異常的時(shí)候峻呕,調(diào)用該方法,該方法不應(yīng)該實(shí)現(xiàn)多大的任務(wù)趣效,會影響生產(chǎn)者性能
     *
     * @param recordMetadata
     * @param e
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.err.println("攔截器2 --- back");
    }

    @Override
    public void close() {

    }

    /**
     * 可以獲取到生產(chǎn)的配置信息
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {

        Object testConfig = map.get("testConfig");
        System.err.println("獲取到的testConfig值為:" + testConfig);
    }
}

攔截器3

package com.hhb.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-13 21:32
 **/
public class MyInterceptor3 implements ProducerInterceptor<Integer, String> {


    private final static Logger logger = LoggerFactory.getLogger(MyInterceptor3.class);

    /**
     * 消息發(fā)送的時(shí)候瘦癌,經(jīng)過攔截器,調(diào)用該方法
     */
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.err.println("攔截器3 --- go");
        //要發(fā)送的消息
        String topic = record.topic();
        Integer key = record.key();
        String value = record.value();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        Headers headers = record.headers();
        //攔截器攔下來之后的根據(jù)原來消息創(chuàng)建新的消息跷敬,此處沒有做任何改動
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
                topic, partition, timestamp, key, value, headers
        );
        //傳遞新的消息
        return newRecord;
    }

    /**
     * 消息確認(rèn)或異常的時(shí)候讯私,調(diào)用該方法,該方法不應(yīng)該實(shí)現(xiàn)多大的任務(wù)西傀,會影響生產(chǎn)者性能
     *
     * @param recordMetadata
     * @param e
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.err.println("攔截器3 --- back");
    }

    @Override
    public void close() {

    }

    /**
     * 可以獲取到生產(chǎn)的配置信息
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {

        Object testConfig = map.get("testConfig");
        System.err.println("獲取到的testConfig值為:" + testConfig);
    }
}

生產(chǎn)者

package com.hhb.kafka.interceptor;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-13 20:44
 **/
public class MyProducer {
    public static void main(String[] args) {
        Map<String, Object> map = new HashMap<>();
        map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //設(shè)置攔截器斤寇,如果設(shè)置多個攔截器,則填寫多個攔截器的全限定類名拥褂,中間用逗號隔開
        map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.hhb.kafka.interceptor.MyInterceptor1,com.hhb.kafka.interceptor.MyInterceptor2,com.hhb.kafka.interceptor.MyInterceptor3");
        //測試使用的配置
        map.put("testConfig", "this is test config");
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(map);
        //不要設(shè)置指定的分區(qū)
        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "topic_interception_01",
                0,
                123,
                "myValue"
        );
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.err.println("輸出分區(qū)信息:" + recordMetadata.partition());
                System.err.println("輸出主題信息:" + recordMetadata.topic());
                System.err.println("輸出偏移量信息:" + recordMetadata.offset());
            }
        });
        producer.close();
    }
}

原理剖析

生產(chǎn)者原理剖析.png

由上圖可以看出:KafkaProducer有兩個基本線程:

  • 主線程:負(fù)責(zé)消息創(chuàng)建攔截器娘锁、序列化器、分區(qū)器等操作饺鹃,并將消息追加到消息收集器RecoderAccumulator中

    • 消息收集器RecoderAccumulator為每個分區(qū)都維護(hù)了一個 Deque< ProducerBatch > 類型的雙端隊(duì)列莫秆。

    • ProducerBatch 可以理解為是 ProducerRecord 的集合,批量發(fā)送有利于提升吞吐 量悔详,降低網(wǎng)絡(luò)影響;

    • 由于生產(chǎn)者客戶端使用 java.io.ByteBuffer 在發(fā)送消息之前進(jìn)行消息保存镊屎,并維護(hù) 了一個 BufferPool 實(shí)現(xiàn) ByteBuffer 的復(fù)用;該緩存池只針對特定大小( batch.size 指定)的 ByteBuffer進(jìn)行管理,對于消息過大的緩存茄螃,不能做到重復(fù)利 用缝驳。

    • 每次追加一條ProducerRecord消息,會尋找/新建對應(yīng)的雙端隊(duì)列责蝠,從其尾部獲取一個ProducerBatch党巾,判斷當(dāng)前消息的大小是否可以寫入該批次中。若可以寫入則寫入;若不可以寫入霜医,則新建一個ProducerBatch齿拂,判斷該消息大小是否超過客戶端參數(shù)配置 batch.size 的值,不超過肴敛,則以 batch.size建立新的ProducerBatch署海, 這樣方便進(jìn)行緩存重復(fù)利用;若超過,則以計(jì)算的消息大小建立對應(yīng)的 ProducerBatch 医男,缺點(diǎn)就是該內(nèi)存不能被復(fù)用了砸狞。

  • Sender線程:

    • 該線程從消息收集器獲取緩存的消息,將其處理為 <Node, List< ProducerBatch> 的形式镀梭, Node 表示集群的broker節(jié)點(diǎn)刀森。
    • 進(jìn)一步將<Node, List< ProducerBatch>轉(zhuǎn)化為<Node, Request>形式,此時(shí)才可以 向服務(wù)端發(fā)送數(shù)據(jù)报账。
    • 在發(fā)送之前研底,Sender線程將消息以 Map<NodeId, Deque< Request>> 的形式保存 到 InFlightRequests 中進(jìn)行緩存埠偿,可以通過其獲取 leastLoadedNode ,即當(dāng)前Node中負(fù)載壓力最小的一個,以實(shí)現(xiàn)消息的盡快發(fā)出榜晦。

生產(chǎn)者參數(shù)配置補(bǔ)充

  1. 參數(shù)設(shè)置方式:

     Map<String, Object> map = new HashMap<>();
            map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
            map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
            //設(shè)置攔截器冠蒋,如果設(shè)置多個攔截器,則填寫多個攔截器的全限定類名乾胶,中間用逗號隔開
            map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.hhb.kafka.interceptor.MyInterceptor1,com.hhb.kafka.interceptor.MyInterceptor2,com.hhb.kafka.interceptor.MyInterceptor3");
    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(map);
    
  2. 補(bǔ)充參數(shù):

參數(shù)名稱 描述
retry.backoff.ms 在向一個指定的主題分區(qū)重發(fā)消息的時(shí)候抖剿,重試之間的等待時(shí)間。 比如3次重試识窿,每次重試之后等待該時(shí)間長度斩郎,再接著重試。在一些失敗的場景腕扶,避免了密集循環(huán)的重新發(fā)送請求孽拷。 long型值吨掌,默認(rèn)100半抱。可選值:[0,...]
retries retries重試次數(shù)膜宋,當(dāng)消息發(fā)送出現(xiàn)錯誤的時(shí)候窿侈,系統(tǒng)會重發(fā)消息。 跟客戶端收到錯誤時(shí)重發(fā)一樣秋茫。 如果設(shè)置了重試史简,還想保證消息的有序性,需要設(shè)置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1肛著,否則在重試此失敗消息的時(shí)候圆兵,其他的消息可能發(fā)送成功了
request.timeout.ms 客戶端等待請求響應(yīng)的最大時(shí)長。如果服務(wù)端響應(yīng)超時(shí)枢贿,則會重發(fā)請求殉农,除非達(dá)到重試次數(shù)。該設(shè)置應(yīng)該比replica.lag.time.max.ms (a broker configuration)要大局荚,以免在服務(wù)器延遲時(shí)間內(nèi)重發(fā)消息超凳。int類型值,默 認(rèn):30000耀态,可選值:[0,...]
interceptor.classes 在生產(chǎn)者接收到該消息轮傍,向Kafka集群傳輸之前,由序列化器處理之前首装,可以通過攔截器對消息進(jìn)行處理创夜。要求攔截器類必須實(shí)現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor 接口。 默認(rèn)沒有攔截器仙逻。 Map<String, Object> configs中通過List集合配置多個攔截器類名驰吓。
acks 當(dāng)生產(chǎn)者發(fā)送消息之后揍魂,如何確認(rèn)消息已經(jīng)發(fā)送成功了。 支持的值:<br />acks=0: 如果設(shè)置為0棚瘟,表示生產(chǎn)者不會等待broker對消息的確認(rèn)现斋,只要將消息放到 緩沖區(qū),就認(rèn)為消息已經(jīng)發(fā)送完成偎蘸。 該情形不能保證broker是否真的收到了消息庄蹋,retries配置也不會生效,因?yàn)?客戶端不需要知道消息是否發(fā)送成功迷雪。 發(fā)送的消息的返回的消息偏移量永遠(yuǎn)是-1限书。<br />acks=1表示消息只需要寫到主分區(qū)即可,然后就響應(yīng)客戶端章咧,而不等待副本分區(qū)的 確認(rèn)倦西。 在該情形下,如果主分區(qū)收到消息確認(rèn)之后就宕機(jī)了赁严,而副本分區(qū)還沒來得 及同步該消息扰柠,則該消息丟失。<br />acks=all 首領(lǐng)分區(qū)會等待所有的ISR副本分區(qū)確認(rèn)記錄疼约。 該處理保證了只要有一個ISR副本分區(qū)存貨卤档,消息就不會丟失。 這是Kafka最強(qiáng)的可靠性保證程剥,等效于 acks=-1 劝枣。
batch.size 當(dāng)多個消息發(fā)送到同一個分區(qū)的時(shí)候,生產(chǎn)者嘗試將多個記錄作為一個批來處理织鲸。批處理提高了客戶端和服務(wù)器的處理效率舔腾。 該配置項(xiàng)以字節(jié)為單位控制默認(rèn)批的大小。
所有的批小于等于該值搂擦。 發(fā)送給broker的請求將包含多個批次稳诚,每個分區(qū)一個,并包含可發(fā)送的數(shù) 據(jù)盾饮。 如果該值設(shè)置的比較小采桃,會限制吞吐量(設(shè)置為0會完全禁用批處理)。如果設(shè)置的很大丘损,又有一點(diǎn)浪費(fèi)內(nèi)存普办,因?yàn)镵afka會永遠(yuǎn)分配這么大的內(nèi)存來 參與到消息的批整合中。
client.id 生產(chǎn)者發(fā)送請求的時(shí)候傳遞給broker的id字符串徘钥。 用于在broker的請求日志中追蹤什么應(yīng)用發(fā)送了什么消息沐兰。 一般該id是跟業(yè)務(wù)有關(guān)的字符串斩祭。
compression.type 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式攒庵。默認(rèn)是none薇正,也就是不壓縮吏口。 支持的值:none、gzip、snappy和lz4。 壓縮是對于整個批來講的拘荡,所以批處理的效率也會影響到壓縮的比例。
send.buffer.bytes TCP發(fā)送數(shù)據(jù)的時(shí)候使用的緩沖區(qū)(SO_SNDBUF)大小撬陵。如果設(shè)置為0珊皿,則使用操作系統(tǒng)默認(rèn)的。
buffer.memory 生產(chǎn)者可以用來緩存等待發(fā)送到服務(wù)器的記錄的總內(nèi)存字節(jié)巨税。如果記錄的發(fā) 送速度超過了將記錄發(fā)送到服務(wù)器的速度蟋定,則生產(chǎn)者將阻塞 max.block.ms 的時(shí)間,此后它將引發(fā)異常草添。此設(shè)置應(yīng)大致對應(yīng)于生產(chǎn)者將使用的總內(nèi)存驶兜, 但并非生產(chǎn)者使用的所有內(nèi)存都用于緩沖。一些額外的內(nèi)存將用于壓縮(如 果啟用了壓縮)以及維護(hù)運(yùn)行中的請求远寸。long型數(shù)據(jù)抄淑。默認(rèn)值: 33554432,可選值:[0,...]
connections.max.idle.ms 當(dāng)連接空閑時(shí)間達(dá)到這個值而晒,就關(guān)閉連接蝇狼。long型數(shù)據(jù),默認(rèn):540000
linger.ms 生產(chǎn)者在發(fā)送請求傳輸間隔會對需要發(fā)送的消息進(jìn)行累積倡怎,然后作為一個批 次發(fā)送。一般情況是消息的發(fā)送的速度比消息累積的速度慢贱枣。有時(shí)客戶端需 要減少請求的次數(shù)监署,即使是在發(fā)送負(fù)載不大的情況下。該配置設(shè)置了一個延 遲纽哥,生產(chǎn)者不會立即將消息發(fā)送到broker钠乏,而是等待這么一段時(shí)間以累積消 息,然后將這段時(shí)間之內(nèi)的消息作為一個批次發(fā)送春塌。該設(shè)置是批處理的另一 個上限:一旦批消息達(dá)到了 batch.size 指定的值晓避,消息批會立即發(fā)送,如 果積累的消息字節(jié)數(shù)達(dá)不到 batch.size 的值只壳,可以設(shè)置該毫秒值俏拱,等待這 么長時(shí)間之后,也會發(fā)送消息批吼句。該屬性默認(rèn)值是0(沒有延遲)锅必。如果設(shè) 置 linger.ms=5 ,則在一個請求發(fā)送之前先等待5ms惕艳。long型值搞隐,默認(rèn): 0驹愚,可選值:[0,...]
max.block.ms 控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的時(shí)長。當(dāng)緩存滿了或元數(shù)據(jù)不可用的時(shí)候劣纲,這些方法阻塞逢捺。在用戶提供的 序列化器和分區(qū)器的阻塞時(shí)間不計(jì)入。long型值癞季,默認(rèn):60000蒸甜,可選值: [0,...]
max.request.size 單個請求的最大字節(jié)數(shù)。該設(shè)置會限制單個請求中消息批的消息個數(shù)余佛,以免 單個請求發(fā)送太多的數(shù)據(jù)柠新。服務(wù)器有自己的限制批大小的設(shè)置,與該配置可 能不一樣辉巡。int類型值恨憎,默認(rèn)1048576,可選值:[0,...]
partitioner.class 實(shí)現(xiàn)了接口org.apache.kafka.clients.producer.Partitioner 的分區(qū) 器實(shí)現(xiàn)類郊楣。默認(rèn)值為:org.apache.kafka. clients.producer.internals.DefaultPartitioner
receive.buffer.bytes TCP接收緩存(SO_RCVBUF)憔恳,如果設(shè)置為-1,則使用操作系統(tǒng)默認(rèn)的值净蚤。 int類型值钥组,默認(rèn)32768,可選值:[-1,...]
security.protocol 跟broker通信的協(xié)議:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. string類型值今瀑,默認(rèn):PLAINTEXT
max.in.flight.requests.per .connection 單個連接上未確認(rèn)請求的最大數(shù)量程梦。達(dá)到這個數(shù)量,客戶端阻塞橘荠。如果該值 大于1屿附,且存在失敗的請求,在重試的時(shí)候消息順序不能保證哥童。 int類型值挺份,默認(rèn)5≈福可選值:[1,...]
reconnect.backoff.max.ms 對于每個連續(xù)的連接失敗匀泊,每臺主機(jī)的退避將成倍增加,直至達(dá)到此最大 值朵你。在計(jì)算退避增量之后各聘,添加20%的隨機(jī)抖動以避免連接風(fēng)暴。 long型值撬呢,默認(rèn)1000伦吠,可選值:[0,...]
reconnect.backoff.ms 嘗試重連指定主機(jī)的基礎(chǔ)等待時(shí)間。避免了到該主機(jī)的密集重連。該退避時(shí) 間應(yīng)用于該客戶端到broker的所有連接毛仪。 long型值搁嗓,默認(rèn)50∠溲ィ可選值:[0,...]

消費(fèi)者

概念入門

消費(fèi)者腺逛、消費(fèi)組

消費(fèi)者從訂閱的主題消費(fèi)消息,消費(fèi)消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主題中衡怀。 消費(fèi)者還可以將自己的偏移量存儲到Zookeeper棍矛,需要設(shè)置offset.storage=zookeeper。推薦使用Kafka存儲消費(fèi)者的偏移量抛杨。因?yàn)閆ookeeper不適合高并發(fā)够委。多個從同一個主題消費(fèi)的消費(fèi)者可以加入到一個消費(fèi)組中。 消費(fèi)組中的消費(fèi)者共享group_id怖现。

configs.put("group.id", "xxx");

group_id一般設(shè)置為應(yīng)用的邏輯名稱茁帽。比如多個訂單處理程序組成一個消費(fèi)組,可以設(shè)置group_id 為"order_process"潘拨。group_id通過消費(fèi)者的配置指定: group.id=xxxxx 消費(fèi)組均衡地給消費(fèi)者分配分區(qū)茫船,每個分區(qū)只由消費(fèi)組中一個消費(fèi)者消費(fèi)。

消費(fèi)者罪治、消費(fèi)組.png

一個擁有四個分區(qū)的主題晒骇,包含一個消費(fèi)者的消費(fèi)組。 此時(shí)剥啤,消費(fèi)組中的消費(fèi)者消費(fèi)主題中的所有分區(qū)。并且沒有重復(fù)的可能。如果在消費(fèi)組中添加一個消費(fèi)者2,則每個消費(fèi)者分別從兩個分區(qū)接收消息。


消費(fèi)者野舶、消費(fèi)組1.png

如果消費(fèi)組有四個消費(fèi)者一屋,則每個消費(fèi)者可以分配到一個分區(qū)。


消費(fèi)者弟翘、消費(fèi)組2.png

如果向消費(fèi)組中添加更多的消費(fèi)者滚躯,超過主題分區(qū)數(shù)量丧凤,則有一部分消費(fèi)者就會閑置仍侥,不會接收任 何消息。


消費(fèi)者平挑、消費(fèi)組3.png

向消費(fèi)組添加消費(fèi)者是橫向擴(kuò)展消費(fèi)能力的主要方式。必要時(shí)助泽,需要為主題創(chuàng)建大量分區(qū),在負(fù)載增長時(shí)可以加入更多的消費(fèi)者摄凡。但是不要讓消費(fèi)者的數(shù) 量超過主題分區(qū)的數(shù)量床绪。

消費(fèi)者痹雅、消費(fèi)組4.png

除了通過增加消費(fèi)者來橫向擴(kuò)展單個應(yīng)用的消費(fèi)能力之外仰担,經(jīng)常出現(xiàn)多個應(yīng)用程序從同一個主題消費(fèi)的情況。此時(shí)练慕,每個應(yīng)用都可以獲取到所有的消息项鬼。只要保證每個應(yīng)用都有自己的消費(fèi)組路操,就可以讓它們獲取到主題所有的消息垫桂。橫向擴(kuò)展消費(fèi)者和消費(fèi)組不會對性能造成負(fù)面影響。為每個需要獲取一個或多個主題全部消息的應(yīng)用創(chuàng)建一個消費(fèi)組,然后向消費(fèi)組添加消費(fèi)者來橫向 擴(kuò)展消費(fèi)能力和應(yīng)用的處理能力,則每個消費(fèi)者只處理一部分消息辫秧。

心跳機(jī)制
心跳機(jī)制1.png

消費(fèi)者宕機(jī)险毁,退出消費(fèi)組宽档,觸發(fā)再平衡,重新給消費(fèi)組中的消費(fèi)者分配分區(qū)璧诵。


心跳機(jī)制2.png

由于broker宕機(jī),主題X的分區(qū)3宕機(jī),此時(shí)分區(qū)3沒有Leader副本闸迷,觸發(fā)再平衡腥沽,消費(fèi)者4沒有對 應(yīng)的主題分區(qū)逮走,則消費(fèi)者4閑置。


心跳機(jī)制3.png

Kafka 的心跳是 Kafka Consumer 和 Broker 之間的健康檢查巡球,只有當(dāng) Broker Coordinator 正常 時(shí)言沐,Consumer 才會發(fā)送心跳。

Consumer 和 Rebalance 相關(guān)的 2 個配置參數(shù):

參數(shù) 字段
session.timeout.ms MemberMetadata.sessionTimeoutMs
max.poll.interval.ms MemberMetadata.rebalanceTimeoutMs

broker 端酣栈,sessionTimeoutMs 參數(shù)

broker 處理心跳的邏輯在 GroupCoordinator 類中:如果心跳超期, broker coordinator 會把消 費(fèi)者從 group 中移除汹押,并觸發(fā) rebalance矿筝。

  /**
   * Complete existing DelayedHeartbeats for the given member and schedule the next one
   */
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
  // complete current heartbeat expectation
  member.latestHeartbeat = time.milliseconds()
    val memberKey = MemberKey(member.groupId, member.memberId)
    heartbeatPurgatory.checkAndComplete(memberKey)

    // reschedule the next heartbeat expiration deadline
    // 計(jì)算心跳截止時(shí)刻
    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
// 心跳過期
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
  group.inLock {
    if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
      info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
        removeMemberAndUpdateGroup(group, member)
    }
  }
}

def onCompleteHeartbeat() {
  // TODO: add metrics for complete heartbeats
}

def partitionFor(group: String): Int = groupManager.partitionFor(group)

  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
  member.awaitingJoinCallback != null ||
  member.awaitingSyncCallback != null ||
  member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline

consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 參數(shù)

如果客戶端發(fā)現(xiàn)心跳超期棚贾,客戶端會標(biāo)記 coordinator 為不可用窖维,并阻塞心跳線程;如果超過了 poll 消息的間隔超過了 rebalanceTimeoutMs,則 consumer 告知 broker 主動離開消費(fèi)組妙痹,也會觸發(fā) rebalance.

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread

if (coordinatorUnknown()) {
  if (findCoordinatorFuture != null || lookupCoordinator().failed())
    // the immediate future check ensures that we backoff properly in the case that no
    // brokers are available to connect to.
    AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
  // the session timeout has expired without seeing a successful heartbeat, so we should
  // probably make sure the coordinator is still healthy.
  markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
  // the poll timeout has expired, which means that the foreground thread has stalled
  // in between calls to poll(), so we explicitly leave the group.
  maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
  // poll again after waiting for the retry backoff in case the heartbeat failed or the
  // coordinator disconnected
  AbstractCoordinator.this.wait(retryBackoffMs);
} else {
  heartbeat.sentHeartbeat(now);

  sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
    @Override
    public void onSuccess(Void value) {
      synchronized (AbstractCoordinator.this) {
        heartbeat.receiveHeartbeat(time.milliseconds());
      }
    }

    @Override
    public void onFailure(RuntimeException e) {
      synchronized (AbstractCoordinator.this) {
        if (e instanceof RebalanceInProgressException) {
          // it is valid to continue heartbeating while the group is rebalancing. This
          // ensures that the coordinator keeps the member in the group for as long
          // as the duration of the rebalance timeout. If we stop sending heartbeats,
          // however, then the session timeout may expire before we can rejoin.
          heartbeat.receiveHeartbeat(time.milliseconds());
        } else {
          heartbeat.failHeartbeat();

          // wake up the thread if it's sleeping to reschedule the heartbeat
          AbstractCoordinator.this.notify();
        }
      }
    }
  });
}

消息接收

必要參數(shù)配置
參數(shù) 說明
bootstrap.servers 向Kafka集群建立初始連接用到的host/port列表铸史。 客戶端會使用這里列出的所有服務(wù)器進(jìn)行集群其他服務(wù)器的發(fā)現(xiàn),而不管是否指定了哪個服務(wù)器用作引導(dǎo)怯伊。 這個列表僅影響用來發(fā)現(xiàn)集群所有服務(wù)器的初始主機(jī)琳轿。 字符串形式:host1:port1,host2:port2,... 由于這組服務(wù)器僅用于建立初始鏈接,然后發(fā)現(xiàn)集群中的所有服務(wù)器耿芹, 因此沒有必要將集群中的所有地址寫在這里崭篡。 一般最好兩臺,以防其中一臺宕掉吧秕。
key.deserializer key的反序列化類琉闪,該類需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Deserializer 接口。
value.deserializer 實(shí)現(xiàn)了 org.apache.kafka.common .serialization.Deserializer 接口的反序列化器砸彬,用于對消息的value進(jìn)行反序列化颠毙。
client.id 當(dāng)從服務(wù)器消費(fèi)消息的時(shí)候向服務(wù)器發(fā)送的id字符串。在ip/port基礎(chǔ)上提供應(yīng)用的邏輯名稱砂碉,記錄在服務(wù)端的請求日志中蛀蜜,用于追蹤請求的源。
group.id 用于唯一標(biāo)志當(dāng)前消費(fèi)者所屬的消費(fèi)組的字符串绽淘。 如果消費(fèi)者使用組管理功能如subscribe(topic)或使用基于Kafka的偏移量管理策略涵防,該項(xiàng)必須設(shè)置。
auto.offset.reset 當(dāng)Kafka中沒有初始偏移量或當(dāng)前偏移量在服務(wù)器中不存在(如,數(shù)據(jù)被 刪除了)壮池,該如何處理? <br />earliest:自動重置偏移量到最早的偏移量 <br />latest:自動重置偏移量為最新的偏移量 <br />none:如果消費(fèi)組原來的(previous)偏移量不存在偏瓤,則向消費(fèi)者拋異常<br />anything:向消費(fèi)者拋異常
enable.auto.commit 如果設(shè)置為true,消費(fèi)者會自動周期性地向服務(wù)器提交偏移量椰憋。
訂閱

主題和分區(qū)

  • Topic厅克,Kafka用于分類管理消息的邏輯單元,類似與MySQL的數(shù)據(jù)庫橙依。

  • Partition证舟,是Kafka下數(shù)據(jù)存儲的基本單元,這個是物理上的概念窗骑。同一個topic的數(shù)據(jù)女责,會被分散的存儲到多個partition中,這些partition可以在同一臺機(jī)器上创译,也可以是在多臺機(jī)器 上抵知。優(yōu)勢在于:有利于水平擴(kuò)展,避免單臺機(jī)器在磁盤空間和性能上的限制软族,同時(shí)可以通過復(fù) 制來增加數(shù)據(jù)冗余性刷喜,提高容災(zāi)能力。為了做到均勻分布立砸,通常partition的數(shù)量通常是 Broker Server數(shù)量的整數(shù)倍掖疮。

  • Consumer Group,同樣是邏輯上的概念颗祝,是Kafka實(shí)現(xiàn)單播和廣播兩種消息模型的手段浊闪。 保證一個消費(fèi)組獲取到特定主題的全部的消息。在消費(fèi)組內(nèi)部吐葵,若干個消費(fèi)者消費(fèi)主題分區(qū)的 消息规揪,消費(fèi)組可以保證一個主題的每個分區(qū)只被消費(fèi)組中的一個消費(fèi)者消費(fèi)。

    主題和分區(qū)1.png

consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù)温峭。采用 pull 模式猛铅,consumer 可自主控制消費(fèi)消息的速率, 可以自己控制消費(fèi)方式(批量消費(fèi)/逐條消費(fèi))凤藏,還可以選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義奸忽。 consumer.subscribe("tp_demo_01,tp_demo_02")

反序列化

Kafka的broker中所有的消息都是字節(jié)數(shù)組,消費(fèi)者獲取到消息之后揖庄,需要先對消息進(jìn)行反序列化處理栗菜,然后才能交給用戶程序消費(fèi)處理。 消費(fèi)者的反序列化器包括key的和value的反序列化器蹄梢。

  • key.deserializer: 對
  • value.deserializer

需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Deserializer<T> 接口疙筹。消費(fèi)者從訂閱的主題拉取消息:
consumer.poll(3000); 在Fetcher類中,對拉取到的消息首先進(jìn)行反序列化處理。源碼如下:

private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        // 反序列化key
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        // 反序列化value
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord(partition.topic(), partition.partition(), offset, timestamp, timestampType, record.checksumOrNull(), keyByteArray == null ? -1 : keyByteArray.length, valueByteArray == null ? -1 : valueByteArray.length, key, value, headers);
    } catch (RuntimeException var16) {
        throw new SerializationException("Error deserializing key/value for partition " + partition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", var16);
    }
}

默認(rèn)提供幾個發(fā)序列化的實(shí)現(xiàn):

  • org.apache.kafka.common.serialization.ByteArrayDeserializer
  • org.apache.kafka.common.serialization.ByteBufferDeserializer
  • org.apache.kafka.common.serialization.BytesDeserializer
  • org.apache.kafka.common.serialization.DoubleDeserializer
  • org.apache.kafka.common.serialization.IntegerDeserializer
  • org.apache.kafka.common.serialization.ShortDeserializer
  • org.apache.kafka.common.serialization.StringDeserializer
  • ……

等等一些系統(tǒng)自帶的反序列化類而咆,可以看org.apache.kafka.common.serialization.Deserializer接口的實(shí)現(xiàn)類

自定義反序列化

自定義反序列化類霍比,需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Deserializer<T> 接口。

User:

package com.hhb.kafka.deserialize;

/**
 * @description: 用戶自定義封裝消息的實(shí)體類
 * @author: 
 * @date: 2020-08-13 20:01
 **/
public class User {

    private Integer userId;

    private String userName;

    public User() {
    }

    public User(Integer userId, String userName) {
        this.userId = userId;
        this.userName = userName;
    }

    public Integer getUserId() {
        return userId;
    }

    public User setUserId(Integer userId) {
        this.userId = userId;
        return this;
    }

    public String getUserName() {
        return userName;
    }

    public User setUserName(String userName) {
        this.userName = userName;
        return this;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", userName='" + userName + '\'' +
                '}';
    }
}

UserDeserializer:

package com.hhb.kafka.deserialize;

import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.util.Map;

/**
 * @description:
 * @author:
 * @date: 2020-08-17 20:00
 **/
public class UserDeserializer implements Deserializer<User> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public User deserialize(String s, byte[] bytes) {
        //分配空間
        ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
        //把byte數(shù)據(jù)寫入到byteBuffer中暴备,只是游標(biāo)指向最后悠瞬,級bytes長度的位置
        byteBuffer.put(bytes);
        //將游標(biāo)指向最開始
        byteBuffer.flip();
        //獲取第一個int
        int userId = byteBuffer.getInt();
        // 獲取第二個int,即userName的長度
        int length = byteBuffer.getInt();
        // 生成userName
        String userName = new String(bytes, 8, length);

        return new User(userId, userName);
    }
    @Override
    public void close() {
    }
}

UserConsumer:

package com.hhb.kafka.deserialize;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * @description:
 * @author:
 * @date: 2020-08-17 19:57
 **/
public class UserConsumer {


    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //設(shè)置自定義的反序列化器
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);

        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Collections.singleton("topic_user_1"));
        ConsumerRecords<Integer, User> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> System.err.println(record.value() + "    " + record.key()));
        consumer.close();
    }
}

位移提交
  1. Consumer需要向Kafka記錄自己的位移數(shù)據(jù),這個匯報(bào)過程稱為提交位移(Committing offset)

  2. Consumer 需要為分配給它的每個分區(qū)提交各自的位移數(shù)據(jù)

  3. 位移提交的由Consumer端負(fù)責(zé)的涯捻,Kafka只負(fù)責(zé)保管浅妆。__consumer_offsets

  4. 位移提交分為自動提交和手動提交

  5. 手動位移提交分為同步提交和異步提交

自動提交

Kafka Consumer 后臺提交

  • 開啟自動提交:enable.auto.commit=true
  • 配置自動提交間隔:Consumer端:auto.commit.interval.ms 默認(rèn)5s
 public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //設(shè)置自定義的反序列化器
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);

        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 設(shè)置偏移量為自動提交,默認(rèn)值
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 偏移量自動提交的時(shí)間間隔。模式值是5秒障癌,這里手動設(shè)置3秒
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000");

        KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Collections.singleton("topic_user_1"));
        ConsumerRecords<Integer, User> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> System.err.println(record.value() + "    " + record.key()));
        consumer.close();
    }

  • 自動提交位移的順序
    • 配置 enable.auto.commit = true
    • Kafka會保證在開始調(diào)用poll方法時(shí)凌外,提交上次poll返回的所有消息
    • 因此自動提交不會出現(xiàn)消息丟失,但會重復(fù)消費(fèi)
  • 重復(fù)消費(fèi)舉例
    • Consumer 每 5s 提交 offset
    • 假設(shè)提交 offset 后的 3s 發(fā)生了 Rebalance
    • Rebalance 之后的所有 Consumer 從上一次提交的 offset 處繼續(xù)消費(fèi)
    • 因此 Rebalance 發(fā)生前 3s 的消息會被重復(fù)消費(fèi)

手動提交-同步提交

  • 使用 KafkaConsumer#commitSync():會提交 KafkaConsumer#poll() 返回的最新 offset

  • 該方法為同步操作涛浙,等待直到 offset 被成功提交才返回

while (true) {
    ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息
  try {
  consumer.commitSync();
  } catch (CommitFailedException e) {
    handle(e); // 處理提交失敗異常 
  }
}
  • commitSync 在處理完所有消息之后
  • 手動同步提交可以控制offset提交的時(shí)機(jī)和頻率
  • 手動同步提交會:
    • 調(diào)用 commitSync 時(shí)趴乡,Consumer 處于阻塞狀態(tài),直到 Broker 返回結(jié)果

    • 會影響 TPS

    • 可以選擇拉長提交間隔蝗拿,但有以下問題:

      • 會導(dǎo)致 Consumer 的提交頻率下降
      • Consumer 重啟后,會有更多的消息被消費(fèi)

手動提交-異步提交

  • KafkaConsumer#commitAsync()
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(3_000); 
    // 處理消息
    process(records); 
    consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                handle(exception);
    } });
}
  • commitAsync(異步提交)出現(xiàn)問題不會自動重試蒿涎,不會自動再次提交哀托。
  • 處理方式:
  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      // 處理消息
      process(records);
      // 使用異步提交規(guī)避阻塞
      commitAysnc();
    }
  } catch (Exception e) {
    handle(e); // 處理異常
  } finally {
    try {
      consumer.commitSync(); // 最后一次提交使用同步阻塞式提
    } finally {
      consumer.close();
    }
  }
消費(fèi)者位移管理

Kafka中,消費(fèi)者根據(jù)消息的位移順序消費(fèi)消息劳秋。消費(fèi)者的位移由消費(fèi)者管理仓手,可以存儲于zookeeper中,也可以存儲于Kafka主題 __consumer_offsets中玻淑。Kafka提供了消費(fèi)者API嗽冒,讓消費(fèi)者可以管理自己的位移。

API如下:KafkaConsumer<K, V>

細(xì)節(jié) 說明
public void assign(Collection<TopicPartition> partitions) 給當(dāng)前消費(fèi)者手動分配一系列主題分區(qū)补履。 手動分配分區(qū)不支持增量分配添坊,如果先前有分配分區(qū),則該操作會覆蓋之前的分配箫锤。 如果給出的主題分區(qū)是空的贬蛙,則等價(jià)于調(diào)用unsubscribe方法。 手動分配主題分區(qū)的方法不使用消費(fèi)組管理功能谚攒。當(dāng)消費(fèi)組成員變了阳准,或者集群或主題的 元數(shù)據(jù)改變了,不會觸發(fā)分區(qū)分配的再平衡馏臭。 手動分區(qū)分配assign(Collection)不能和自動分區(qū)分配subscribe(Collection, ConsumerRebalanceListener)一起使用野蝇。 如果啟用了自動提交偏移量,則在新的分區(qū)分配替換舊的分區(qū)分配之前,會對舊的分區(qū)分 配中的消費(fèi)偏移量進(jìn)行異步提交绕沈。
public Set<TopicPartition> assignment() 獲取給當(dāng)前消費(fèi)者分配的分區(qū)集合锐想。如果訂閱是通過調(diào)用assign方法直接分配主題分區(qū), 則返回相同的集合七冲。如果使用了主題訂閱痛倚,該方法返回當(dāng)前分配給該消費(fèi)者的主題分區(qū)集 合。如果分區(qū)訂閱還沒開始進(jìn)行分區(qū)分配澜躺,或者正在重新分配分區(qū)蝉稳,則會返回none。
public Map<String, List<PartitionInfo>> listTopics() 獲取對用戶授權(quán)的所有主題分區(qū)元數(shù)據(jù)掘鄙。該方法會對服務(wù)器發(fā)起遠(yuǎn)程調(diào)用耘戚。
public List<PartitionInfo> partitionsFor(String topic) 獲取指定主題的分區(qū)元數(shù)據(jù)。如果當(dāng)前消費(fèi)者沒有關(guān)于該主題的元數(shù)據(jù)操漠,就會對服務(wù)器發(fā) 起遠(yuǎn)程調(diào)用收津。
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) 對于給定的主題分區(qū),列出它們第一個消息的偏移量浊伙。 注意撞秋,如果指定的分區(qū)不存在涝滴,該方法可能會永遠(yuǎn)阻塞鬓长。 該方法不改變分區(qū)的當(dāng)前消費(fèi)者偏移量。
public void seekToEnd(Collection<TopicPartition> partitions) 將偏移量移動到每個給定分區(qū)的最后一個呻右。 該方法延遲執(zhí)行哑子,只有當(dāng)調(diào)用過poll方法或position方法之后才可以使用舅列。 如果沒有指定分區(qū),則將當(dāng)前消費(fèi)者分配的所有分區(qū)的消費(fèi)者偏移量移動到最后卧蜓。 如果設(shè)置了隔離級別為:isolation.level=read_committed帐要,則會將分區(qū)的消費(fèi)偏移量移 動到最后一個穩(wěn)定的偏移量,即下一個要消費(fèi)的消息現(xiàn)在還是未提交狀態(tài)的事務(wù)消息弥奸。
public void seek(TopicPartition partition, long offset) 將給定主題分區(qū)的消費(fèi)偏移量移動到指定的偏移量榨惠,即當(dāng)前消費(fèi)者下一條要消費(fèi)的消息偏 移量。
若該方法多次調(diào)用其爵,則最后一次的覆蓋前面的冒冬。 如果在消費(fèi)中間隨意使用,可能會丟失數(shù)據(jù)摩渺。
public long position(TopicPartition partition) 檢查指定主題分區(qū)的消費(fèi)偏移量
public void seekToBeginning(Collection<TopicPartition> partitions) 將給定每個分區(qū)的消費(fèi)者偏移量移動到它們的起始偏移量简烤。該方法懶執(zhí)行,只有當(dāng)調(diào)用過 poll方法或position方法之后才會執(zhí)行摇幻。如果沒有提供分區(qū)横侦,則將所有分配給當(dāng)前消費(fèi)者的 分區(qū)消費(fèi)偏移量移動到起始偏移量挥萌。

準(zhǔn)備數(shù)據(jù):

# 生成文件
for i in `seq 99`; do echo "hello world $i" >> nm.txt;done
## 查看所有的topic
kafka-topics.sh --zookeeper localhost:2181/myKafka --list
# 創(chuàng)建topic
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1
# 灌入數(shù)據(jù)
kafka-console-producer.sh  --broker-list localhost:9092 --topic tp_demo_01 < nm.txt
# 驗(yàn)證
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tp_demo_01 --from-beginning

API:

package com.hhb.kafka.offset;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.function.BiConsumer;

/**
 * @description:
 * @author: 
 * @date: 2020-08-17 21:38
 **/
public class MyOffsetManager {


    public static void main(String[] args) {


        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        map.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(map);
        //給消費(fèi)者組里面的消費(fèi)者分配分區(qū),懶加載枉侧,只有調(diào)用poll方法的時(shí)候引瀑,才會真正的把分區(qū)分配給消費(fèi)者
//        consumer.subscribe(Collections.singleton("tp_demo_01"));
        //如何手動給消費(fèi)者分配分區(qū)

        //1. 需要知道哪些主題可以訪問、消費(fèi)
        //返回Map<主題榨馁,List<主題的分區(qū)>>
        Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
        stringListMap.forEach((topicName, partitionInfos) -> {
            System.err.println("當(dāng)前主題:" + topicName);
            for (PartitionInfo partitionInfo : partitionInfos) {
                System.err.println("分區(qū)信息: " + partitionInfo.toString());
            }
            System.err.println("============");
        });
        System.err.println("============");

        //獲取給當(dāng)前消費(fèi)者分配的主題分區(qū)信息
        Set<TopicPartition> assignment = consumer.assignment();
        System.err.println("打印分配之前的信息:" + assignment);
        System.err.println("============");
        //給當(dāng)前消費(fèi)者分配主題和分區(qū)
        consumer.assign(Arrays.asList(
                new TopicPartition("tp_demo_01", 0),
                new TopicPartition("tp_demo_01", 1),
                new TopicPartition("tp_demo_01", 2)
        ));

        //獲取給當(dāng)前消費(fèi)者分配的主題分區(qū)信息
        Set<TopicPartition> assignment1 = consumer.assignment();
        System.err.println("打印分配之后的信息:" + assignment1);
        System.err.println("============");

        //查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的偏移量
        long offset = consumer.position(new TopicPartition("tp_demo_01", 0));
        System.err.println("查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的位移:" + offset);
        System.err.println("============");

        //移動偏移量到開始的位置
        consumer.seekToBeginning(Arrays.asList(
                new TopicPartition("tp_demo_01", 0),
                new TopicPartition("tp_demo_01", 2)
        ));
        System.err.println("查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 0)));
        System.err.println("查看消費(fèi)者在tp_demo_01 主題 2號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 2)));
        System.err.println("============");
        //移動偏移量到末尾的位置
        consumer.seekToEnd(Arrays.asList(
                new TopicPartition("tp_demo_01", 0),
                new TopicPartition("tp_demo_01", 2)
        ));
        System.err.println("查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 0)));
        System.err.println("查看消費(fèi)者在tp_demo_01 主題 2號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 2)));

        System.err.println("============");
        //移動偏移量到具體位置:
        consumer.seek(new TopicPartition("tp_demo_01", 0), 15);
        System.err.println("查看消費(fèi)者在tp_demo_01 主題 0號分區(qū)的位移:" + consumer.position(new TopicPartition("tp_demo_01", 0)));

        consumer.close();
    }
}
再均衡

重平衡可以說是kafka為人詬病最多的一個點(diǎn)了憨栽。重平衡其實(shí)就是一個協(xié)議,它規(guī)定了如何讓消費(fèi)者組下的所有消費(fèi)者來分配topic中的每一個分區(qū)翼虫。 比如一個topic有100個分區(qū)屑柔,一個消費(fèi)者組內(nèi)有20個消費(fèi)者,在協(xié)調(diào)者的控制下讓組內(nèi)每一個消費(fèi)者分配到5個分區(qū)珍剑,這個分配的過程就是重平衡掸宛。重平衡的觸發(fā)條件主要有三個:

  1. 消費(fèi)者組內(nèi)成員發(fā)生變更,這個變更包括了增加和減少消費(fèi)者招拙,比如消費(fèi)者宕機(jī)退出消費(fèi)組唧瘾。
  2. 主題的分區(qū)數(shù)發(fā)生變更,kafka目前只支持增加分區(qū)别凤,當(dāng)增加的時(shí)候就會觸發(fā)重平衡
  3. 訂閱的主題發(fā)生變化饰序,當(dāng)消費(fèi)者組使用正則表達(dá)式訂閱主題,而恰好又新建了對應(yīng)的主題规哪,就會觸發(fā)重平衡


    再均衡.png

消費(fèi)者宕機(jī)菌羽,退出消費(fèi)組,觸發(fā)再平衡由缆,重新給消費(fèi)組中的消費(fèi)者分配分區(qū)。


再均衡1.png

由于broker宕機(jī)猾蒂,主題X的分區(qū)3宕機(jī)均唉,此時(shí)分區(qū)3沒有Leader副本,觸發(fā)再平衡肚菠,消費(fèi)者4沒有對 應(yīng)的主題分區(qū)舔箭,則消費(fèi)者4閑置。


再均衡2.png

主題增加分區(qū)蚊逢,需要主題分區(qū)和消費(fèi)組進(jìn)行再均衡层扶。


再均衡3.png

由于使用正則表達(dá)式訂閱主題,當(dāng)增加的主題匹配正則表達(dá)式的時(shí)候烙荷,也要進(jìn)行再均衡镜会。


再均衡4.png

為什么說重平衡為人詬病呢?因?yàn)橹仄胶膺^程中,消費(fèi)者無法從kafka消費(fèi)消息终抽,這對kafka的 TPS影響極大戳表,而如果kafka集內(nèi)節(jié)點(diǎn)較多桶至,比如數(shù)百個,那重平衡可能會耗時(shí)極多匾旭。數(shù)分鐘到數(shù)小時(shí) 都有可能镣屹,而這段時(shí)間kafka基本處于不可用狀態(tài)。所以在實(shí)際環(huán)境中价涝,應(yīng)該盡量避免重平衡發(fā)生女蜈。

避免重平衡

要說完全避免重平衡,是不可能色瘩,因?yàn)槟銦o法完全保證消費(fèi)者不會故障伪窖。而消費(fèi)者故障其實(shí)也是最 常見的引發(fā)重平衡的地方,所以我們需要保證盡力避免消費(fèi)者故障泞遗。而其他幾種觸發(fā)重平衡的方式惰许,增加分區(qū),或是增加訂閱的主題史辙,抑或是增加消費(fèi)者汹买,更多的是主動控制。如果消費(fèi)者真正掛掉了聊倔,就沒辦法了晦毙,但實(shí)際中,會有一些情況耙蔑,kafka錯誤地認(rèn)為一個正常的消費(fèi)者已經(jīng)掛掉了见妒,我們要的就是避免這樣的情況出現(xiàn)。首先要知道哪些情況會出現(xiàn)錯誤判斷掛掉的情況甸陌。 在分布式系統(tǒng)中须揣,通常是通過心跳來維持分布式系統(tǒng)的,kafka也不例外钱豁。

在分布式系統(tǒng)中耻卡,由于網(wǎng)絡(luò)問題你不清楚沒接收到心跳,是因?yàn)閷Ψ秸嬲龗炝诉€是只是因?yàn)樨?fù)載過重沒來得及發(fā)生心跳或是網(wǎng)絡(luò)堵塞牲尺。所以一般會約定一個時(shí)間卵酪,超時(shí)即判定對方掛了。而在kafka消費(fèi) 者場景中谤碳。session.timout.ms參數(shù)就是規(guī)定這個超時(shí)時(shí)間是多少溃卡。
還有一個參數(shù),heartbeat.interval.ms蜒简,這個參數(shù)控制發(fā)送心跳的頻率瘸羡,頻率越高越不容易被誤 判,但也會消耗更多資源搓茬。此外最铁,還有最后一個參數(shù)讯赏,max.poll.interval.ms,消費(fèi)者poll數(shù)據(jù)后冷尉,需要一些處理漱挎,再進(jìn)行拉 取。如果兩次拉取時(shí)間間隔超過這個參數(shù)設(shè)置的值雀哨,那么消費(fèi)者就會被踢出消費(fèi)者組磕谅。也就是說,拉 取雾棺,然后處理膊夹,這個處理的時(shí)間不能超過 max.poll.interval.ms 這個參數(shù)的值。這個參數(shù)的默認(rèn)值是 5分鐘捌浩,而如果消費(fèi)者接收到數(shù)據(jù)后會執(zhí)行耗時(shí)的操作放刨,則應(yīng)該將其設(shè)置得大一些。

三個參數(shù):

  • session.timout.ms 控制心跳超時(shí)時(shí)間尸饺,

  • heartbeat.interval.ms 控制心跳發(fā)送頻率进统,

  • max.poll.interval.ms 控制poll的間隔。

這里給出一個相對較為合理的配置浪听,如下:

  • session.timout.ms: 設(shè)置為6s
  • heartbeat.interval.ms: 設(shè)置2s
  • max.poll.interval.ms: 推薦為消費(fèi)者處理消息最長耗時(shí)再加1分鐘
消費(fèi)者攔截器

消費(fèi)者在拉取了分區(qū)消息之后螟碎,要首先經(jīng)過反序列化器對key和value進(jìn)行反序列化處理。處理完之后迹栓,如果消費(fèi)端設(shè)置了攔截器掉分,則需要經(jīng)過攔截器的處理之后,才能返回給消費(fèi)者應(yīng)用程序進(jìn)行處理克伊。


消費(fèi)者攔截器源碼.png

消費(fèi)端定義消息攔截器酥郭,需要實(shí)現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接口。

  1. 一個可插拔接口愿吹,允許攔截甚至更改消費(fèi)者接收到的消息褥民。首要的用例在于將第三方組件引入 消費(fèi)者應(yīng)用程序,用于定制的監(jiān)控洗搂、日志處理等。
  2. 該接口的實(shí)現(xiàn)類通過configre方法獲取消費(fèi)者配置的屬性载弄,如果消費(fèi)者配置中沒有指定 clientID耘拇,還可以獲取KafkaConsumer生成的clientId。獲取的這個配置是跟其他攔截器共享 的宇攻,需要保證不會在各個攔截器之間產(chǎn)生沖突惫叛。
  3. ConsumerInterceptor方法拋出的異常會被捕獲、記錄逞刷,但是不會向下傳播嘉涌。如果用戶配置 了錯誤的key或value類型參數(shù)妻熊,消費(fèi)者不會拋出異常,而僅僅是記錄下來仑最。
  4. ConsumerInterceptor回調(diào)發(fā)生在 org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)方法同一個線程扔役。

該接口中有如下方法:

package org.apache.kafka.clients.consumer;

import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

public interface ConsumerInterceptor<K, V> extends Configurable {
    // poll方法返回結(jié)果之前,最后要調(diào)用的方法
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
  
    // 消費(fèi)者提交偏移量的時(shí)候警医,經(jīng)過該方法
    void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
    
    // 用戶關(guān)閉該攔截器用到的資源
    void close();
}

MyConsumer:

package com.hhb.kafka.interceptor.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-18 19:25
 **/
public class MyConsumer {

    public static void main(String[] args) {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "com.hhb.kafka.interceptor.consumer.MyInterceptor1,com.hhb.kafka.interceptor.consumer.MyInterceptor2,com.hhb.kafka.interceptor.consumer.MyInterceptor3");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Collections.singleton("tp_demo_01"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(5000);
            records.forEach(record -> {
                System.err.println("消費(fèi)者:分區(qū):" + record.partition() +
                        "亿胸,主題:" + record.topic() +
                        ",提交偏移量:" + record.offset() +
                        ",key :  " + record.key() +
                        ",value: " + record.value());
            });
        }
    }
}

MyInterceptor1:

package com.hhb.kafka.interceptor.consumer;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-18 19:33
 **/
public class MyInterceptor1 implements ConsumerInterceptor<String, String> {
    /**
     * poll方法返回結(jié)果之前,最后要調(diào)用的方法
     *
     * @param consumerRecords
     * @return
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {

        System.err.println("1   --------     開始");

        //在這里消息不做處理预皇,直接返回
        return consumerRecords;
    }

    /**
     * 消費(fèi)者提交偏移量的時(shí)候侈玄,經(jīng)過該方法
     *
     * @param map
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        System.err.println("1  ----------  結(jié)束");

    }

    /**
     * 用戶關(guān)閉該攔截器用到的資源
     */
    @Override
    public void close() {

    }

    /**
     * 獲取消費(fèi)者的配置
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {

    }
}

MyInterceptor2:

package com.hhb.kafka.interceptor.consumer;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

/**
 * @description:
 * @author: 
 * @date: 2020-08-18 19:33
 **/
public class MyInterceptor2 implements ConsumerInterceptor<String, String> {
    /**
     * poll方法返回結(jié)果之前,最后要調(diào)用的方法
     *
     * @param consumerRecords
     * @return
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {

        System.err.println("2   --------     開始");

        //在這里消息不做處理吟温,直接返回
        return consumerRecords;
    }

    /**
     * 消費(fèi)者提交偏移量的時(shí)候序仙,經(jīng)過該方法
     *
     * @param map
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        System.err.println("2  ----------  結(jié)束");

    }

    /**
     * 用戶關(guān)閉該攔截器用到的資源
     */
    @Override
    public void close() {

    }

    /**
     * 獲取消費(fèi)者的配置
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {

    }
}
消費(fèi)者參數(shù)配置補(bǔ)充
配置型 說明
bootstrap.servers 建立到Kafka集群的初始連接用到的host/port列表。 客戶端會使用這里指定的所有的host/port來建立初始連接鲁豪。 這個配置僅會影響發(fā)現(xiàn)集群所有節(jié)點(diǎn)的初始連接潘悼。 形式:host1:port1,host2:port2... 這個配置中不需要包含集群中所有的節(jié)點(diǎn)信息。 最好不要配置一個呈昔,以免配置的這個節(jié)點(diǎn)宕機(jī)的時(shí)候連不上挥等。
group.id 用于定義當(dāng)前消費(fèi)者所屬的消費(fèi)組的唯一字符串。 如果使用了消費(fèi)組的功能 subscribe(topic) 堤尾, 或使用了基于Kafka的偏移量管理機(jī)制肝劲,則應(yīng)該配置group.id。
auto.commit.interval.ms 如果設(shè)置了 enable.auto.commit 的值為true郭宝, 則該值定義了消費(fèi)者偏移量向Kafka提交的頻率辞槐。
auto.offset.reset 如果Kafka中沒有初始偏移量或當(dāng)前偏移量在服務(wù)器中不存在 (比如數(shù)據(jù)被刪掉了):<br />earliest:自動重置偏移量到最早的偏移量。<br />latest:自動重置偏移量到最后一個 <br />none:如果沒有找到該消費(fèi)組以前的偏移量沒有找到粘室,就拋異常<br />其他值:向消費(fèi)者拋異常榄檬。
fetch.min.bytes 服務(wù)器對每個拉取消息的請求返回的數(shù)據(jù)量最小值。 如果數(shù)據(jù)量達(dá)不到這個值衔统,請求等待鹿榜,以讓更多的數(shù)據(jù)累積, 達(dá)到這個值之后響應(yīng)請求锦爵。 默認(rèn)設(shè)置是1個字節(jié)舱殿,表示只要有一個字節(jié)的數(shù)據(jù), 就立即響應(yīng)請求险掀,或者在沒有數(shù)據(jù)的時(shí)候請求超時(shí)沪袭。 將該值設(shè)置為大一點(diǎn)兒的數(shù)字,會讓服務(wù)器等待稍微長一點(diǎn)兒的時(shí)間以累積數(shù)據(jù)樟氢。 如此則可以提高服務(wù)器的吞吐量冈绊,代價(jià)是額外的延遲時(shí)間侠鳄。
fetch.max.wait.ms 如果服務(wù)器端的數(shù)據(jù)量達(dá)不到 fetch.min.bytes 的話, 服務(wù)器端不能立即響應(yīng)請求死宣。 該時(shí)間用于配置服務(wù)器端阻塞請求的最大時(shí)長伟恶。
fetch.max.bytes 服務(wù)器給單個拉取請求返回的最大數(shù)據(jù)量。 消費(fèi)者批量拉取消息十电,如果第一個非空消息批次的值比該值大知押, 消息批也會返回,以讓消費(fèi)者可以接著進(jìn)行鹃骂。 即該配置并不是絕對的最大值台盯。 broker可以接收的消息批最大值通過message.max.bytes (broker配置) 或 max.message.bytes (主題配置)來指定。 需要注意的是畏线,消費(fèi)者一般會并發(fā)拉取請求静盅。
enable.auto.commit 如果設(shè)置為true,則消費(fèi)者的偏移量會周期性地在后臺提交寝殴。
connections.max.idle.ms 在這個時(shí)間之后關(guān)閉空閑的連接蒿叠。
check.crcs 自動計(jì)算被消費(fèi)的消息的CRC32校驗(yàn)值。 可以確保在傳輸過程中或磁盤存儲過程中消息沒有被破壞蚣常。 它會增加額外的負(fù)載市咽,在追求極致性能的場合禁用。
exclude.internal.topics 是否內(nèi)部主題應(yīng)該暴露給消費(fèi)者抵蚊。如果該條目設(shè)置為true施绎, 則只能先訂閱再拉取。
isolation.level 控制如何讀取事務(wù)消息贞绳。 如果設(shè)置了 read_committed 谷醉,消費(fèi)者的poll()方法只會返回已經(jīng)提交的事務(wù)消息。 如果設(shè)置了 read_uncommitted (默認(rèn)值)冈闭, 消費(fèi)者的poll方法返回所有的消息俱尼,即使是已經(jīng)取消的事務(wù)消息。
非事務(wù)消息以上兩種情況都返回萎攒。 消息總是以偏移量的順序返回遇八。read_committed 只能返回到達(dá)LSO的消息。 在LSO之后出現(xiàn)的消息只能等待相關(guān)的事務(wù)提交之后才能看到耍休。 結(jié)果刃永, read_committed 模式,如果有未提交的事務(wù)羹应, 消費(fèi)者不能讀取到直到HW的消息。read_committed 的seekToEnd方法返回LSO次屠。
heartbeat.interval.ms 當(dāng)使用消費(fèi)組的時(shí)候园匹,該條目指定消費(fèi)者向消費(fèi)者協(xié)調(diào)器發(fā)送心跳的時(shí)間間隔雳刺。心跳是為了確保消費(fèi)者會話的活躍狀態(tài), 同時(shí)在消費(fèi)者加入或離開消費(fèi)組的時(shí)候方便進(jìn)行再平衡裸违。該條目的值必須小于 session.timeout.ms 掖桦,也不應(yīng)該高于 session.timeout.ms 的1/3。
session.timeout.ms 當(dāng)使用Kafka的消費(fèi)組的時(shí)候供汛,消費(fèi)者周期性地向broker發(fā)送心 表明自己的存在枪汪。 如果經(jīng)過該超時(shí)時(shí)間還沒有收到消費(fèi)者的心跳, 則broker將消費(fèi)者從消費(fèi)組移除怔昨,并啟動再平衡雀久。 該值必須在broker配置 group.min.session.timeout.ms 和roup.max.session.timeout.ms 之間。
max.poll.records 一次調(diào)用poll()方法返回的記錄最大數(shù)量趁舀。
max.poll.interval.ms 使用消費(fèi)組的時(shí)候調(diào)用poll()方法的時(shí)間間隔赖捌。 該條目指定了消費(fèi)者調(diào)用poll()方法的最大時(shí)間間隔。 如果在此時(shí)間內(nèi)消費(fèi)者沒有調(diào)用poll()方法矮烹, 則broker認(rèn)為消費(fèi)者失敗越庇,觸發(fā)再平衡, 將分區(qū)分配給消費(fèi)組中其他消費(fèi)者奉狈。
max.partition.fetch.bytes 對每個分區(qū)卤唉,服務(wù)器返回的最大數(shù)量。消費(fèi)者按批次拉取數(shù)據(jù)仁期。 如果非空分區(qū)的第一個記錄大于這個值桑驱,批處理依然可以返回, 以保證消費(fèi)者可以進(jìn)行下去蟀拷。 broker接收批的大小由 message.max.bytes (broker參數(shù))或max.message.bytes (主題參數(shù))指定碰纬。 fetch.max.bytes 用于限制消費(fèi)者單次請求的數(shù)據(jù)量。
send.buffer.bytes 用于TCP發(fā)送數(shù)據(jù)時(shí)使用的緩沖大小(SO_SNDBUF)问芬, -1表示使用OS默認(rèn)的緩沖區(qū)大小悦析。
retry.backoff.ms 在發(fā)生失敗的時(shí)候如果需要重試,則該配置表示客戶端等待多長時(shí)間再發(fā)起重試此衅。 該時(shí)間的存在避免了密集循環(huán)强戴。
request.timeout.ms 客戶端等待服務(wù)端響應(yīng)的最大時(shí)間。如果該時(shí)間超時(shí)挡鞍, 則客戶端要么重新發(fā)起請求骑歹,要么如果重試耗盡,請求失敗墨微。
reconnect.backoff.ms 重新連接主機(jī)的等待時(shí)間道媚。避免了重連的密集循環(huán)。 該等待時(shí)間應(yīng)用于該客戶端到broker的所有連接。
reconnect.backoff.max.ms 重新連接到反復(fù)連接失敗的broker時(shí)要等待的最長時(shí)間 (以毫秒為單位)最域。 如果提供此選項(xiàng)谴分,則對于每個連續(xù)的連接失敗, 每臺主機(jī)的退避將成倍增加镀脂,直至達(dá)到此最大值牺蹄。 在計(jì)算退避增量之后,添加20%的隨機(jī)抖動以避免連接風(fēng)暴薄翅。
receive.buffer.bytes TCP連接接收數(shù)據(jù)的緩存(SO_RCVBUF)沙兰。 -1表示使用操作系統(tǒng)的默認(rèn)值。
partition.assignment.strategy 當(dāng)使用消費(fèi)組的時(shí)候翘魄,分區(qū)分配策略的類名鼎天。
metrics.sample.window.ms 計(jì)算指標(biāo)樣本的時(shí)間窗口。
metrics.recording.level 指標(biāo)的最高記錄級別熟丸。
metrics.num.samples 用于計(jì)算指標(biāo)而維護(hù)的樣本數(shù)量
interceptor.classes 攔截器類的列表训措。默認(rèn)沒有攔截器 攔截器是消費(fèi)者的攔截器,該攔截器需要實(shí)現(xiàn)org.apache.kafka.clients.consumer .ConsumerInterceptor接口光羞。攔截器可用于對消費(fèi)者接收到的消息進(jìn)行攔截處理绩鸣。

消費(fèi)組管理

什么是消費(fèi)組

consumer group是kafka提供的可擴(kuò)展且具有容錯性的消費(fèi)者機(jī)制。

三個特性:

  1. 消費(fèi)組有一個或多個消費(fèi)者纱兑,消費(fèi)者可以是一個進(jìn)程呀闻,也可以是一個線程
  2. group.id是一個字符串,唯一標(biāo)識一個消費(fèi)組
  3. 消費(fèi)組訂閱的主題每個分區(qū)只能分配給消費(fèi)組一個消費(fèi)者潜慎。
消費(fèi)者位移(consumer position)

消費(fèi)者在消費(fèi)的過程中記錄已消費(fèi)的數(shù)據(jù)捡多,即消費(fèi)位移(offset)信息。每個消費(fèi)組保存自己的位移信息铐炫,那么只需要簡單的一個整數(shù)表示位置就夠了;同時(shí)可以引入 checkpoint機(jī)制定期持久化垒手。

位移管理

自動VS手動

Kafka默認(rèn)定期自動提交位移( enable.auto.commit = true ),也手動提交位移倒信。另外kafka會定 期把group消費(fèi)情況保存起來科贬,做成一個offset map,如下圖所示:


位移管理.png

位移提交

位移是提交到Kafka中的 _consumer_offsets 主題鳖悠。 _consumer_offsets 中的消息保存了每個消費(fèi)組某一時(shí)刻提交的offset信息榜掌。

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" -- consumer.config /mnt/module/kafka_2.12-1.0.2/config/consumer.properties --from- beginning | head
位移提交.png

__consumers_offsets 主題配置了compact策略,使得它總是能夠保存最新的位移信息乘综,既控制 了該topic總體的日志容量憎账,也能實(shí)現(xiàn)保存最新offset的目的。

什么是再均衡

再均衡(Rebalance)本質(zhì)上是一種協(xié)議卡辰,規(guī)定了一個消費(fèi)組中所有消費(fèi)者如何達(dá)成一致來分配訂 閱主題的每個分區(qū)胞皱。比如某個消費(fèi)組有20個消費(fèi)組邪意,訂閱了一個具有100個分區(qū)的主題。正常情況下反砌,Kafka平均會為每個消費(fèi)者分配5個分區(qū)抄罕。這個分配的過程就叫再均衡。

什么時(shí)候再均衡

再均衡的觸發(fā)條件:

  1. 組成員發(fā)生變更(新消費(fèi)者加入消費(fèi)組組于颖、已有消費(fèi)者主動離開或崩潰了)
  2. 訂閱主題數(shù)發(fā)生變更。如果正則表達(dá)式進(jìn)行訂閱嚷兔,則新建匹配正則表達(dá)式的主題觸發(fā)再均衡森渐。
  3. 訂閱主題的分區(qū)數(shù)發(fā)生變更
如何進(jìn)行組內(nèi)分區(qū)分配

三種分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。后面講冒晰。

誰來執(zhí)行再均衡和消費(fèi)組管理

Kafka提供了一個角色:Group Coordinator來執(zhí)行對于消費(fèi)組的管理同衣。Group Coordinator:每個消費(fèi)組分配一個消費(fèi)組協(xié)調(diào)器用于組管理和位移管理。當(dāng)消費(fèi)組的第一個消費(fèi)者啟動的時(shí)候壶运,它會去和Kafka Broker確定誰是它們組的組協(xié)調(diào)器耐齐。之后該消費(fèi)組內(nèi)所有消費(fèi)者和該組協(xié)調(diào)器協(xié)調(diào)通信。

如何確定Coordinator

兩步:

  1. 確定消費(fèi)組位移信息寫入 __consumers_offsets 的哪個分區(qū)蒋情。具體計(jì)算公式:
  • _consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定埠况,默認(rèn)是50個分區(qū)。
  1. 該分區(qū)leader所在的broker就是組協(xié)調(diào)器棵癣。
Rebalance Generation

它表示Rebalance之后主題分區(qū)到消費(fèi)組中消費(fèi)者映射關(guān)系的一個版本辕翰,主要是用于保護(hù)消費(fèi)組, 隔離無效偏移量提交的狈谊。如上一個版本的消費(fèi)者無法提交位移到新版本的消費(fèi)組中喜命,因?yàn)橛成潢P(guān)系變 了敦迄,你消費(fèi)的或許已經(jīng)不是原來的那個分區(qū)了氛驮。每次group進(jìn)行Rebalance之后法焰,Generation號都會加 1靶壮,表示消費(fèi)組和分區(qū)的映射關(guān)系到了一個新版本羹铅,如下圖所示: Generation 1時(shí)group有3個成員榜揖,隨 后成員2退出組郎楼,消費(fèi)組協(xié)調(diào)器觸發(fā)Rebalance死嗦,消費(fèi)組進(jìn)入Generation 2煎娇,之后成員4加入二庵,再次觸發(fā) Rebalance,消費(fèi)組進(jìn)入Generation 3.


Rebalance Generation.png
協(xié)議(protocol)

kafka提供了5個協(xié)議來處理與消費(fèi)組協(xié)調(diào)相關(guān)的問題:

  • Heartbeat請求:consumer需要定期給組協(xié)調(diào)器發(fā)送心跳來表明自己還活著
  • LeaveGroup請求:主動告訴組協(xié)調(diào)器我要離開消費(fèi)組
  • SyncGroup請求:消費(fèi)組Leader把分配方案告訴組內(nèi)所有成員
  • JoinGroup請求:成員請求加入組
  • DescribeGroup請求:顯示組的所有信息缓呛,包括成員信息催享,協(xié)議名稱,分配方案哟绊,訂閱信息 等因妙。通常該請求是給管理員使用

組協(xié)調(diào)器在再均衡的時(shí)候主要用到了前面4種請求。

liveness

消費(fèi)者如何向消費(fèi)組協(xié)調(diào)器證明自己還活著? 通過定時(shí)向消費(fèi)組協(xié)調(diào)器發(fā)送Heartbeat請求。如果 超過了設(shè)定的超時(shí)時(shí)間攀涵,那么協(xié)調(diào)器認(rèn)為該消費(fèi)者已經(jīng)掛了铣耘。一旦協(xié)調(diào)器認(rèn)為某個消費(fèi)者掛了,那么它 就會開啟新一輪再均衡以故,并且在當(dāng)前其他消費(fèi)者的心跳響應(yīng)中添加“REBALANCE_IN_PROGRESS”蜗细,告訴 其他消費(fèi)者:重新分配分區(qū)。

再均衡過程

再均衡分為2步:Join和Sync

  1. Join怒详, 加入組炉媒。所有成員都向消費(fèi)組協(xié)調(diào)器發(fā)送JoinGroup請求,請求加入消費(fèi)組昆烁。一旦所有 成員都發(fā)送了JoinGroup請求吊骤,協(xié)調(diào)i器從中選擇一個消費(fèi)者擔(dān)任Leader的角色,并把組成員 信息以及訂閱信息發(fā)給Leader静尼。
  2. Sync白粉,Leader開始分配消費(fèi)方案,即哪個消費(fèi)者負(fù)責(zé)消費(fèi)哪些主題的哪些分區(qū)鼠渺。一旦完成分配鸭巴,Leader會將這個方案封裝進(jìn)SyncGroup請求中發(fā)給消費(fèi)組協(xié)調(diào)器,非Leader也會發(fā) SyncGroup請求拦盹,只是內(nèi)容為空奕扣。消費(fèi)組協(xié)調(diào)器接收到分配方案之后會把方案塞進(jìn) SyncGroup的response中發(fā)給各個消費(fèi)者。


    再均衡過程1.png

注意:在協(xié)調(diào)器收集到所有成員請求前掌敬,它會把已收到請求放入一個叫purgatory(煉獄)的地方惯豆。然 后是分發(fā)分配方案的過程,即SyncGroup請求:


再均衡過程2.png

注意:消費(fèi)組的分區(qū)分配方案在客戶端執(zhí)行奔害。Kafka交給客戶端可以有更好的靈活性楷兽。Kafka默認(rèn)提供三種分配策略:range和round-robin和sticky』伲可以通過消費(fèi)者的參數(shù):partition.assignment.strategy 來實(shí)現(xiàn)自己分配策略芯杀。

消費(fèi)組狀態(tài)機(jī)

消費(fèi)組組協(xié)調(diào)器根據(jù)狀態(tài)機(jī)對消費(fèi)組做不同的處理:


消費(fèi)組狀態(tài)機(jī).png

說明:

  1. Dead:組內(nèi)已經(jīng)沒有任何成員的最終狀態(tài),組的元數(shù)據(jù)也已經(jīng)被組協(xié)調(diào)器移除了雅潭。這種狀態(tài) 響應(yīng)各種請求都是一個response: UNKNOWN_MEMBER_ID
  2. Empty:組內(nèi)無成員揭厚,但是位移信息還沒有過期。這種狀態(tài)只能響應(yīng)JoinGroup請求
  3. PreparingRebalance:組準(zhǔn)備開啟新的rebalance扶供,等待成員加入
  4. AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
  5. Stable:再均衡完成筛圆,可以開始消費(fèi)。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末椿浓,一起剝皮案震驚了整個濱河市太援,隨后出現(xiàn)的幾起案子闽晦,更是在濱河造成了極大的恐慌,老刑警劉巖提岔,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仙蛉,死亡現(xiàn)場離奇詭異,居然都是意外死亡碱蒙,警方通過查閱死者的電腦和手機(jī)荠瘪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赛惩,“玉大人巧还,你說我怎么就攤上這事》唤眨” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵澎怒,是天一觀的道長褒搔。 經(jīng)常有香客問我,道長喷面,這世上最難降的妖魔是什么星瘾? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮惧辈,結(jié)果婚禮上琳状,老公的妹妹穿的比我還像新娘。我一直安慰自己盒齿,他們只是感情好念逞,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著边翁,像睡著了一般翎承。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上符匾,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天叨咖,我揣著相機(jī)與錄音,去河邊找鬼啊胶。 笑死甸各,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的焰坪。 我是一名探鬼主播趣倾,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼某饰!你這毒婦竟也來了誊酌?” 一聲冷哼從身側(cè)響起部凑,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎碧浊,沒想到半個月后涂邀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡箱锐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年比勉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片驹止。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡浩聋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出臊恋,到底是詐尸還是另有隱情衣洁,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布抖仅,位于F島的核電站坊夫,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏撤卢。R本人自食惡果不足惜环凿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望放吩。 院中可真熱鬧智听,春花似錦、人聲如沸渡紫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惕澎。三九已至环肘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間集灌,已是汗流浹背悔雹。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留欣喧,地道東北人腌零。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像唆阿,于是被迫代替她去往敵國和親益涧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容