「kafka」kafka-clients凡怎,java編寫消費者客戶端及原理剖析

與生產(chǎn)者對應的是消費者悉稠,應用程序通過KafkaConsumer來訂閱主題宫蛆,并從訂閱的主題中拉取消息。不過我們需要先了解消費者和消費組的概念的猛,否則無法理解如何使用KafkaConsumer耀盗。

消費者與消費組

每個消費者對應一個消費組,當消息發(fā)布到主題后卦尊,只會被投遞給訂閱它的每個消費組中的一個消費者叛拷。如下圖所示:



主題中,共有四個分區(qū)岂却,P0忿薇、P1、P2淌友、P3煌恢,有兩個消費組A和B都訂閱了這個主題骇陈,消費組A中有4個消費者(C0震庭、C1、C2你雌、C3)器联,消費者B有兩個消費者(C4二汛、C5)。按照Kafka默認的規(guī)則拨拓,消費組A中的每一個消費者分配到一個分區(qū)肴颊,消費組B中每一個消費者分配到兩個分區(qū),兩個消費組之間互不影響渣磷。
每個消費者只能消費被分配到的分區(qū)中的消息婿着。換言之,每個分區(qū)只能被一個消費組中的一個消費者所消費醋界。
再來看一下消費組內(nèi)消費者的個數(shù)變化時所對應分區(qū)分配的演變竟宋。假設目前某消費組內(nèi)只有一個消費者C0,訂閱了一個主題形纺,這個主題包含7個分區(qū)丘侠,P0/P1/P2/P3/P4/P5/P6。也就是說逐样,這個消費者訂閱了7個分區(qū):



此時消費組內(nèi)又增加了新的消費者C1蜗字,按照既定的邏輯,需要將原來消費者C0的部分分區(qū)分配給消費者C1消費:

C0和C1各自消費所分配的分區(qū)脂新,彼此間并無邏輯上的干擾挪捕。緊接著又增加消費者C2:



消費者與消費組這種模型可以讓整體的消費能力具備橫向伸縮,我們可以增加(減少)消費者的個數(shù)來提高(或降低)整體的消費能力争便。對于分區(qū)數(shù)固定的情況担神,一味地增加消費者并不能讓消費能力一直得到增強,如果消費者過多始花,出現(xiàn)了消費者個數(shù)大于分區(qū)個數(shù)的情況妄讯,就會有消費者分配不到任何分區(qū)。如下圖所示:

以上分配策略都是基于默認的分區(qū)分配策略進行分析的酷宵,可以通過消費者客戶端參數(shù)partition.assignment.strategy來設置消費者與訂閱主題之間的分區(qū)分配策略亥贸。

對于消息中間件而言,一般有兩種消息投遞模式:點對點模式和發(fā)布/訂閱模式浇垦。點對點模式是基于隊列的炕置,消息生產(chǎn)者發(fā)送消息到隊列,消息消費者從隊列中接收消息男韧。發(fā)布訂閱模式以主題為內(nèi)容節(jié)點朴摊,主題可以認為是消息傳遞的中介,使得消息訂閱者和發(fā)布者保持獨立此虑,不需要進行接觸即可保持消息的傳遞甚纲,在消息的一對多廣播時采用。

  • 如果消費者都屬于同一消費組朦前,那么所有的消息都會被均衡的投遞給每一個消費者介杆,即每條消息都只會被一個消費者處理鹃操,這就相當于點對點模式的應用。
  • 如果所有消費者都隸屬于不同的消費組春哨,那么所有的消息都會被廣播給所有的消費者荆隘,即每條消息都會被所有的消費者處理,這就相當于訂閱/發(fā)布應用赴背。

可以通過消費者客戶端參數(shù)group.id來配置椰拒,默認值為空字符串。消費組是邏輯上的概念凰荚,它將消費者進行歸類耸三,消費者并非邏輯上的概念,它是實際上的應用實例浇揩,它可以是一個線程仪壮,也可以是一個進程,同一個消費組內(nèi)的消費者可以部署在同一臺機器上胳徽,也可以部署在不同的機器上积锅。

客戶端開發(fā)

采用目前流行的新消費者(java語言編寫)客戶端。
一個正產(chǎn)的消費邏輯需要以下幾個步驟

  1. 配置消費者客戶端參數(shù)及創(chuàng)建響應的客戶端實例养盗。
  2. 訂閱主題缚陷。
  3. 拉取消息并消費。
  4. 提交消費位移往核。
  5. 關(guān)閉消費者實例箫爷。
    一個基本的消費者案例如下:
public class Consumer {
    public static final String brokerList = "192.168.0.138:9092";
    public static final String topic = "topic-demo";
    public static final String group = "group-id";
    public static final String client = "client-id";

    public  static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig(){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
        return  properties;
    }
    public static void main(String[] args) {

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initConfig());
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record->{
                    System.out.println("topic="+record.topic()+",  partition="+record.partition()+",  offset="+record.offset());
                    System.out.println("key="+record.key()+", value="+record.value());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
  • bootstrap.servers:指定kafka集群地址,可以設置一個或多個聂儒,用逗號隔開虎锚。注意這里不需要設置集群中全部的broker地址,消費者會從現(xiàn)有的配置中查找全部的集群成員衩婚。如果只設置一個地址窜护,啟動時為避免該地址機器宕機連接不到集群,最好設置兩個或兩個以上地址非春。

  • key.deserializer和value.deserializer:與生產(chǎn)者客戶端參數(shù)相對應柱徙。消費者從kafka獲取到的消息格式都是字節(jié)數(shù)組(byte[]),所以需要執(zhí)行相應的反序列化操作才能還原成原有的對象格式奇昙。

  • client.id:客戶端id护侮,如果不設置,會自動生成一個非空字符串储耐,內(nèi)容形式為consumer-1羊初,consumer-2這種格式。

消費者客戶端參數(shù)眾多弧岳,在這里羅列講解沒有意義凳忙,之后會一一詳解业踏。

訂閱主題與分區(qū)

一個消費者可以訂閱一個或多個主題禽炬。如上代碼示例涧卵,通過consumer.subscribe方式訂閱主題,對于這個方法而言腹尖,既可以以集合的方式訂閱多個主題柳恐,也可以以正則表達式的形式訂閱特定模式的主題。subscribe的幾個重載的方法如下:

void subscribe(Collection<String> var1);
void subscribe(Collection<String> var1, ConsumerRebalanceListener var2);
void assign(Collection<TopicPartition> var1);
void subscribe(Pattern var1, ConsumerRebalanceListener var2);
void subscribe(Pattern var1);

如果消費者采用的是正則表達式的方式訂閱热幔,在之后的創(chuàng)建過程中乐设,如果有人又創(chuàng)建了新的主題,并且主題的名字與正則表達式相匹配绎巨,那么這個消費者就可以消費到新添加的主題中的消息近尚。如果應用程序需要消費多個主題,并且可以處理不同的類型场勤,那么這種訂閱方式就很有效戈锻。在kafka和其他系統(tǒng)之間進行數(shù)據(jù)賦值時,這種正則表達式的方式顯得很常見和媳。

consumer.subscribe(Pattern.compile("topic-.*"));

重載方法中有一個ConsumerRebalanceListener 格遭,這個是用來設置相應的再均衡監(jiān)聽器的,之后會講留瞳。

消費者不但可以訂閱主題拒迅,還可以直接訂閱主題的特定分區(qū),通過assign方法來實現(xiàn)這一功能她倘。

void assign(Collection<TopicPartition> partitions);

這個方法只接受一個參數(shù)partitions璧微,用來指定分區(qū)集合。關(guān)于TopicPartition類硬梁,用來表示分區(qū)往毡,這個類的內(nèi)部結(jié)果如下所示:

public final class TopicPartition implements Serializable {
    private final int partition;
    private final String topic;
其他省略

有兩個屬性,partition和topic靶溜,分別代表自身的分區(qū)編號和主題名稱开瞭,這個類和我們所說的主題-分區(qū)概念對應起來。在案例代碼清單中罩息,我們使用assign方法替代subscribe方法嗤详,訂閱主題topic-demo的分區(qū)0。

consumer.assign(Arrays.asList(new TopicPartition("topic-demo",0)));//訂閱主題topic-demo的分區(qū)0

如果瓷炮,我們事先不知道主題中有多少個分區(qū)怎么辦葱色?partitionsFor方法可以用來查詢指定主題的元數(shù)據(jù)信息,定義如下:

 List<PartitionInfo> partitionsFor(String topic);

其中娘香,PartitionInfo類型即為主題的分區(qū)元數(shù)據(jù)信息:

public class PartitionInfo {
    private final String topic;//主題名稱
    private final int partition;//分區(qū)編號
    private final Node leader;//leader副本所在的位置
    private final Node[] replicas;//分區(qū)的AR集合
    private final Node[] inSyncReplicas;//分區(qū)的ISR集合
    private final Node[] offlineReplicas;//分區(qū)的OSR集合

通過partitionsFor方法的協(xié)助苍狰,我們可以通過assign方法來實現(xiàn)訂閱主題全部分區(qū)的功能:

List<TopicPartition> partitions = new ArrayList<>();
 //獲取主題的全部分區(qū)
consumer.partitionsFor("topic-demo").forEach(partitionInfo -> {
      System.out.println("分區(qū):"+partitionInfo.partition());
      partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
    });
    //通過assign方法來實現(xiàn)訂閱主題全部分區(qū)
   consumer.assign(partitions);

既然有訂閱办龄,那就有取消訂閱,可以使用unsubscribe方法取消訂閱淋昭。

//取消訂閱
consumer.unsubscribe();

集合訂閱的方式俐填、正則表達式的訂閱方式和指定分區(qū)的訂閱方式,分別代表了3種不同的訂閱狀態(tài):AUTO_TOPICS翔忽、AUTO_PATTERN英融、USER_ASSIGNED,如果沒有訂閱那么狀態(tài)為NONE歇式。這三種狀態(tài)是互斥的驶悟,在一個消費者中,只能使用其中的一種材失。通過sbscribe方法訂閱的主題具有消費者自動再均衡的功能痕鳍,在多個消費者的情況下根據(jù)分區(qū)策略來自動分配各個消費者與分區(qū)的關(guān)系。當消費組內(nèi)的消費者增加或減少時龙巨,分區(qū)分配關(guān)系會自動調(diào)整笼呆,以實現(xiàn)消費負載均衡及故障自動轉(zhuǎn)移。而通過assign方法訂閱分區(qū)時恭应,是不具備消費者自動均衡的功能抄邀。

反序列化

「kafka」kafka-clients,java編寫生產(chǎn)者客戶端及原理剖析我們講過了生產(chǎn)者的序列化與消費者的反序列化程序demo昼榛。Kafka提供的反序列器有ByteBufferDeserializer境肾、ByteArrayDeserializer、BytesDeserializer胆屿、DoubleDeserializer奥喻、FloatDeserializer、IntegerDeserializer非迹、LongDeserializer环鲤、ShortDeserializer、StringDeserializer憎兽,這些反序列化器都實現(xiàn)了Deserializer接口冷离,該接口有三個方法:

 void configure(Map<String, ?> var1, boolean var2);//用來配置當前類
 T deserialize(String var1, byte[] var2);//用來執(zhí)行反序列化
 void close();//關(guān)閉當前序列化器

我們來看一下StringDeserilizer的源碼:

public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    public StringDeserializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null) {
            encodingValue = configs.get("deserializer.encoding");
        }

        if (encodingValue instanceof String) {
            this.encoding = (String)encodingValue;
        }

    }
    public String deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : new String(data, this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + this.encoding);
        }
    }
    public void close() {
    }
}

configure方法用來定義編碼格式,默認就UTF-8就好了纯命,不用管這個西剥。我們看一下自定義反序列化器,只要實現(xiàn)了Deserializer接口即可:

public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public User deserialize(String s, byte[] bytes) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
        int nameLength = byteBuffer.getInt();
        byte name[] = new byte[nameLength];
        byteBuffer.get(name,0,nameLength);
        int age = byteBuffer.getInt();

        return new User().setAge(age).setName(new String(name));
    }

    @Override
    public void close() {

    }
}


public class User {
    private String name;
    private int age = -1;

    public String getName() {
        return name;
    }

    public User setName(String name) {
        this.name = name;
        return this;
    }

    public int getAge() {
        return age;
    }

    public User setAge(int age) {
        this.age = age;return this;
    }
}

總之亿汞,就是將kafka返回的字節(jié)序列轉(zhuǎn)化成你的業(yè)務對象瞭空。關(guān)于序列化,我會在之后寫一篇當下流行的序列化方法匯總的博文,比如Avro咆畏、JSON南捂、Thrif、ProtoBuf或Protostuff等旧找,歡迎關(guān)注溺健。
這里簡單舉一例,用Protostuff來實現(xiàn)序列化與反序列化:

//依賴
    <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.7.2</version>
        </dependency>

序列化:

public class ProtostuffUserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String s, User user) {
        if (user == null){
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(user.getClass());
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        byte[] protostuff = null;
        protostuff = ProtostuffIOUtil.toByteArray(user,schema,buffer);
        buffer.clear();
        return protostuff;
    }

    @Override
    public void close() {

    }
}

反序列化

public class ProtostuffUserDesirializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public User deserialize(String s, byte[] bytes) {

        if (bytes == null) {
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(User.getClass());
        User user = new User();
        ProtostuffIOUtil.mergeFrom(bytes, user, schema);
        return user;
    }

    @Override
    public void close() {

    }
}

消息消費

Kafka的消費是基于拉模式的钦讳。消息的消費一般有兩種模式:推模式和拉模式矿瘦。推模式是服務器主動將消息推送給消費者枕面,拉模式是消費者向服務端發(fā)送請求拉取消息愿卒。

從代碼示例中可以看出,消費是一個不斷輪詢的過程潮秘,消費者重復調(diào)用poll方法琼开,返回的是所訂閱主題(分區(qū))上的一組消息。

返回的消息類型為ConsumerRecord枕荞,源碼如下所示:


public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = -1L;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    private final String topic;//主題
    private final int partition;//分區(qū)
    private final long offset;//所屬分區(qū)偏移量
    private final long timestamp;//時間戳
    //兩種類型柜候,CreateTime  和 LogAppendTime
   //分別代表消息創(chuàng)建的時間,追加到日志的時間
    private final TimestampType timestampType;
    private final int serializedKeySize;//key經(jīng)過序列化后的大小躏精,如果key為空渣刷,該值為-1
    private final int serializedValueSize;//value經(jīng)過序列化后的大小,如果value為空矗烛,該值為-1
    private final Headers headers;//消息的頭部內(nèi)容
    private final K key;//消息的鍵
    private final V value;//消息的值
    private final Optional<Integer> leaderEpoch;
    private volatile Long checksum;//CRC32的校驗值
部分省略

實例代碼中辅柴,我們通過遍歷消息集合處理每一條消息,除此之外瞭吃,我們還可以按照分區(qū)維度來進行消費碌嘀,這一點很有用谍夭,在手動提交位移時尤為明顯整慎,ConsumerRecords提供了一個records(TopicPartition)方法來獲取消息中指定分區(qū)的消息哀托,此方法的定義如下:

    public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
        List<ConsumerRecord<K, V>> recs = (List)this.records.get(partition);
        return recs == null ? Collections.emptyList() : Collections.unmodifiableList(recs);
    }

修改實例代碼卫键,將所有消息按分區(qū)處理:

 //按分區(qū)處理消息
   for (TopicPartition tp : records.partitions()){//獲取所有分區(qū)
        for (ConsumerRecord<String, String> record : records.records(tp)){//獲取指定分區(qū)的消息
            System.out.println("partition:"+record.partition()+"----value:"+record.value());
        }
    }

此外荞彼,ConsumerRecords類中還提供了幾個方法來方便開發(fā)人員對消息集進行處理:

  • count方法迁央,獲取消息個數(shù)
  • isEmpty方法币励,判斷返回的消息是否為空
  • empty方法阳仔,獲取一個空的消息集

到目前為止攒霹,可以建單人位怯疤,poll方法只是拉取一下消息而已,但就其內(nèi)部邏輯而言并不簡單剔蹋,它涉及消費位移旅薄、消費者協(xié)調(diào)器、組協(xié)調(diào)器、消費者選舉少梁、分區(qū)分配的分發(fā)洛口、再均衡的邏輯、心跳等內(nèi)容凯沪。后續(xù)會詳細介紹第焰。

位移提交

對于Kafka的分區(qū)而言,它的每條消息都有唯一的offset妨马,用來表示消息在分區(qū)中對應的位置挺举。消費者使用offset來表示消費到分區(qū)中某個消息所在的位置。offset烘跺,顧名思義湘纵,偏移量,也可翻譯為位移滤淳。在每次調(diào)用poll()方法時梧喷,它返回的是還沒有消費過的消息集,要做到這一點脖咐,就需要記錄上一次消費過的位移铺敌。并且這個位移必須做持久化保存,而不是單單保存在內(nèi)存中屁擅,否則消費者重啟之后就無法知道之前的消費位移了偿凭。

當加入新的消費者的時,必然會有再均衡的動作派歌,對于同一分區(qū)而言弯囊,它可能在再均衡動作之后分配給新的消費者,如果不持久化保存消費位移硝皂,那么這個新的消費者也無法知道之前的消費位移常挚。消費者位移存儲在Kafka內(nèi)部的主題_consumer_offsets中。

這種把消費位移存儲起來(持久化)的動作稱為“提交”稽物,消費者再消費完消息之后需要執(zhí)行消費位移的提交奄毡。
如下圖,假設當前消費者已經(jīng)消費了x位置的消息贝或,那么我們就可以說消費者的消費位移為x吼过。


不過,需要明確的是咪奖,當前消費者需要提交的消費位移并不是x盗忱,而是x+1,對應上圖的position羊赵,他表示下一條需要拉取的消息的位置趟佃。在消費者中還有一個commited offset的概念扇谣,它表示已經(jīng)提交過的消費位移。

KafkaConsumer類提供了position(TopicPartition)和commited(TopicPartition)兩個方法來分別獲取上面所說的position和commiited offset的值闲昭。
為了論證lastConsumedOffset罐寨、commited offset 和position之間的關(guān)系,我們使用上面兩個方法來做相關(guān)演示序矩。我們向主題中分區(qū)編號為0的分區(qū)發(fā)送若干消息鸯绿,之后再創(chuàng)建一個消費者去消費其中的消息,等待消費完這些消息之后簸淀,同步提交消費位移瓶蝴。最后觀察上面三者的值。

//定義主題topic-demo,分區(qū)編號為0
 TopicPartition topicPartition = new TopicPartition("topic-demo",0);
 consumer.assign(Arrays.asList(topicPartition));//訂閱主題topic-demo的分區(qū)0
 long lastConsumedOffset = -1;//當前消費到的位移
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
 //獲取主題topic-demo的分區(qū)0的消息
 List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
//獲取最后一條消息的偏移量租幕,該消息是已經(jīng)消費的最后一條消息
 lastConsumedOffset = partitionRecords.get(partitionRecords.size()-1).offset();
 consumer.commitSync();//同步提交消費位移
 System.out.println("consumed Offset is  "+lastConsumedOffset);
 OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
 System.out.println("commited Offset is "+offsetAndMetadata.offset());
 long position = consumer.position(topicPartition);
 System.out.println("the offset of the netxt record is  "+position);

打印結(jié)果

consumed Offset is  182
commited Offset is 183
the offset of the netxt record is  183

可以看出舷手,消費者消費到此分區(qū)的最大偏移量為182,對應的消費位移lastConsumedOffset 也就是182令蛉。在消費完之后執(zhí)行同步提交聚霜,但是最終結(jié)果顯示所提交的位移commited offset 為183狡恬,并且下一次所要拉取的消息的起始偏移量position為183珠叔,結(jié)論
position = commited offset = lastConsumedOffset + 1
當然,position和commited offset的值不會一直相同弟劲,這一點會在下面的示例中有所體現(xiàn)祷安。

對于位移提交具體時機的把握也很有講究,有可能造成重復消費和消息丟失的現(xiàn)象兔乞。參考下圖所示汇鞭,x代表上一次提交的消費位移,說明已經(jīng)完成了x-1之前的所有消息的消費庸追。x+3表示當前正在處理的位置霍骄。如果poll拉取到消息之后就進行了位移提交,即提交了x+7淡溯,那么當前消費x+3的時候遇到了異常读整,在故障恢復之后, 我們重新拉取到的消息是從x+7開始的咱娶。也就是說米间,x+3到x+6之間的消息并未消費,如此便發(fā)生了消息丟失的現(xiàn)象膘侮。


再考慮另一種情形屈糊,位移提交的動作是在消費完所有拉取到的消息之后才執(zhí)行的,那么當消費x+3的時候遇到了異常琼了,在故障恢復之后逻锐,我們重新拉取的消息是從x開始的。也就是說 x到x+2之間的消息又重新消費了一遍,故而發(fā)生了重復消費的現(xiàn)象昧诱。

而實際情況可能更加復雜慷丽。在kafka中默認的消費位移的提交方式是自動提交,這個由消費客戶端參數(shù)enable.auto.commit配置鳄哭,默認為true要糊。當然這個默認的自動提交不是每消費一條消息就提交一次,而是定期提交妆丘,這個定期的周期時間由客戶端參數(shù)auto.commit.interval.ms配置锄俄,默認值為5秒,此參數(shù)生效的前提是enable.auto.commit為true勺拣。

在默認情況下奶赠,消費者客戶端每隔5秒會將拉取到的每個分區(qū)中的最大的消息位移進行提交。自動位移提交的動作實在poll方法的邏輯里完成的药有,在每次真正向服務器發(fā)起拉取請求之前會檢查是否可以進行位移提交毅戈,如果可以,那么就會提交上一次輪詢的位移愤惰。

在kafka消費的編程邏輯中位移是一大難點苇经,自動提交消費位移的方式非常簡便,它免去了復雜的位移提交邏輯宦言,讓代碼更簡潔扇单。但隨之而來的是重復消費和消費丟失的問題。假設剛提交完一次消費位移奠旺,然后拉取一批消息進行消費蜘澜,在下一次自動提交消費位移之前,消費者崩潰了响疚,那么又得從上一次位移提交的地方重新開始消費鄙信。我們可以通過減少位移提交的時間間隔來減少重復消息的窗口大小,但這樣并不能避免重復消費的發(fā)送忿晕,而且也會使位移提交更加頻繁装诡。

自動位移提交的方式在正常情況下不會發(fā)生消息丟失和重復消費的現(xiàn)象,但是在編程的世界里異常不可避免杏糙。自動提交無法做到精確的位移管理慎王。Kafka提供了手動管理位移提交的操作,這樣可以使開發(fā)人員對消費位移的管理控制更加靈活宏侍。很多時候并不是說poll拉取到消息就算消費完成赖淤,而是需要將消息寫入到數(shù)據(jù)庫、寫入本地緩存谅河,或者是更加復雜的業(yè)務處理咱旱。在這些場景下确丢,所有的業(yè)務處理完成才能認為消息被成功消費,手動的提交方式讓開發(fā)人員根據(jù)程序的邏輯在合適的地方進行位移提交吐限。手動提交功能的前提是enable.auto.commit配置為false鲜侥,手動提交分為同步提交和異步提交,對應于KafkaConsumer中的commitSync和commitAsync兩個方法诸典。

  • commitSync
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
//                按分區(qū)處理消息
  for (TopicPartition tp : records.partitions()){
          for (ConsumerRecord<String, String> record : records.records(tp)){
                System.out.println("partition:"+record.partition()+"----value:"+record.value());
          }
  }
  consumer.commitSync();

先將拉取到的每一條消息進行處理描函,然后對整個消息集做同步提交。針對上面的示例還可以修改為批量處理+批量提交的方式狐粱。

final int minBatchSize = 200;
List<ConsumerRecord> buffer = new Arraylist<>();
whie(true){
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
  for (TopicPartition tp : records.partitions()){
          for (ConsumerRecord<String, String> record : records.records(tp)){
            buffer.add(record);          
          }
    }
if(buffer.size() >= minBatchSize){
    //do local processing with buffer
  consumer.commitSync();
  buffer.clear;
}
}

上面的示例中舀寓,將拉取到的消息存入緩存buffer,等到累積到足夠多的時候肌蜻,再做相應的批量處理互墓,之后再做批量提交。

這兩個示例都有重復消費的問題蒋搜,如果在業(yè)務邏輯處理完之后篡撵,并且在同步位移提交之前,程序出現(xiàn)了崩潰豆挽。那么恢復之后育谬,只能從上一次位移提交的地方拉取消息。

commitSync方法會根據(jù)poll拉取到的最新位移來進行提交祷杈,即position的位置斑司,只要沒有發(fā)生不可恢復的錯誤,它就會阻塞消費者線程直至位移提交完成但汞。對于不可恢復的錯誤,如CommitFailedException/WakeupException/InterruptException/AuthenticationException/AuthorizationException等互站,我們可以將其捕獲并做針對性的處理私蕾。

commitSync提交位移的頻率和拉取批次消息、處理批次消息的頻率是一致的胡桃,如果想尋求更細粒度踩叭、更準確的提交,那么就需要commitSync另一個含參的方法翠胰,

    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)

參數(shù)offsets用來提交指定分區(qū)的位移容贝。無參的commitSync方法只能提交當前批次對應的position值。如果需要提交一個中間值之景,比如業(yè)務每消費一條消息就提交一次位移斤富,那么就可以使用這種方式

  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
  records.forEach(record->{
       System.out.println("topic="+record.topic()+",  partition="+record.partition()+",  offset="+record.offset());
       long offset = record.offset();
       TopicPartition partition = new TopicPartition(record.topic(), record.partition());
       //每消費一條消息提交一次位移
       consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(offset+1)));
       System.out.println("_______________________________");
    });

在實際應用中,很少有這種每消費一條消息锻狗,就提交一次消費位移的場景满力。commitSync方法本身是同步進行的焕参,會消耗一定的性能。更多的時候油额,是按照分區(qū)的粒度劃分提交位移的界限叠纷,

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
// 按分區(qū)處理消息
for (TopicPartition tp : records.partitions()){
   //獲取當前分區(qū)的所有消息
  List<ConsumerRecord<String,String>> partitionRecords = records.records(tp);
  for (ConsumerRecord<String, String> record : partitionRecords){
       System.out.println("partition:"+record.partition()+"----value:"+record.value());
  }
  //當前分區(qū)最后一條消息的位移
  long lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();
  //按分區(qū)的粒度,進行位移提交
  consumer.commitSync(Collections.singletonMap(tp,new OffsetAndMetadata(lastConsumedOffset+1)));
 }

與commitSync相反潦嘶,異步提交的方式commitAsync在執(zhí)行的時候涩嚣,消費者線程不會阻塞,可能在提交消費位移的結(jié)果返回之前就開始了新一輪的拉取操作掂僵』貉蓿可以是消費者的性能增強。

void commitAsync();
void commitAsync(OffsetCommitCallback var1);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> var1, OffsetCommitCallback var2);

第一個無參的方法和第三個方法中的offsets都很好理解看峻,對照commitSync方法即可阶淘。關(guān)鍵是這里第二個方法和第三個方法中的OffsetCommitCallback參數(shù),它提供了一個異步提交的回調(diào)方法互妓,當位移提交完成后回調(diào)OffsetCommitCallback里的onComplete方法:

      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
//                按分區(qū)處理消息
                for (TopicPartition tp : records.partitions()){
                    //獲取當前分區(qū)的所有消息
                    List<ConsumerRecord<String,String>> partitionRecords = records.records(tp);
                    for (ConsumerRecord<String, String> record : partitionRecords){
                        System.out.println("partition:"+record.partition()+"----value:"+record.value());
                    }
                    //當前分區(qū)最后一條消息的位移
                    long lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();
                    //按分區(qū)的粒度溪窒,進行位移提交
                    consumer.commitAsync(Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumedOffset + 1)), new OffsetCommitCallback() {
                        @Override
                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                            if(e == null){
                                System.out.println(map);
                            }else{
                                System.out.println("提交失敗");
                            }
                        }
                    });
                }

控制或關(guān)閉消費

KafkaConsumer提供了對消費速度進行控制的方法,有些場景冯勉,需要我們暫停某些分區(qū)的消費而先消費其他分區(qū)澈蚌,當達到一定條件時再恢復這些分區(qū)的消費。pause()和resume()方法來分別實現(xiàn)暫停某些分區(qū)在拉取操作時返回數(shù)據(jù)給客戶端和恢復某些分區(qū)向客戶端返回數(shù)據(jù)的操作灼狰。

    void pause(Collection<TopicPartition> var1);
    void resume(Collection<TopicPartition> var1);

還有一個無參的paused方法返回被暫停的分區(qū)集合

Set<TopicPartition> paused();

之前的示例展示的都是使用一個while循環(huán)來包裹住poll方法及相應的消費邏輯宛瞄,如何優(yōu)雅的退出這個循環(huán)也很有考究。還有一種方式是調(diào)用KafkaConsumer的wakeup方法交胚,調(diào)用該方法可以退出poll的邏輯份汗,并拋出WakeupException異常,我們不需要處理這個異常蝴簇,它只是跳出循環(huán)的方式杯活。

跳出循環(huán)以后一定要顯示執(zhí)行關(guān)閉動作以釋放運行過程中占用的各種系統(tǒng)資源,包括內(nèi)存資源熬词,socket連接等等旁钧。KafkaConsumer提供了close方法實現(xiàn)關(guān)閉

指定位移消費

正是有了消費位移的持久化,才使消費者在關(guān)閉互拾、崩潰或者遇到再均衡的時候歪今,可以讓接替的消費者能夠根據(jù)存儲的消費位移繼續(xù)進行消費。

當一個新的消費組建立的時候颜矿,它根本沒有可以查找的消費位移寄猩。或者消費組內(nèi)的一個新消費者訂閱了一個新的主題或衡,它也沒有可以查找的消費位移焦影。

當消費者查找不到所記錄的消費位移的時候车遂,就會根據(jù)消費者客戶端參數(shù)auto.offset.reset的配置來決定從何處開始進行消費,這個參數(shù)的默認值為latest斯辰,表示從分區(qū)末尾開始消費舶担。



如圖,按照默認的配置彬呻,消費者會從8開始消費衣陶,更加確切的說是從8開始拉取消息。如果將auto.offset.reset設置成earliest闸氮,那么消費者會從起始處剪况,也就是0開始消費。

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

到目前為止蒲跨,我們知道消息的拉取是根據(jù)poll方法中的邏輯來處理的译断,這個邏輯對于普通開發(fā)人員來說是個黑盒子,無法精確的掌控其消費的起始位置或悲。有些場景我們需要更細粒度的掌控孙咪,可以讓我們從特定的位移處開始拉取消息,seek方法正好提供了這個功能巡语,讓我們得以追前消費或回溯消費翎蹈。

void seek(TopicPartition var1, long offset);

seek方法中的參數(shù)partition表示分區(qū),而offset參數(shù)用來指定從分區(qū)的哪個位置開始消費男公。seek方法只能重置消費者分配到的分區(qū)的消費位置荤堪,而分區(qū)的分配是在poll方法的調(diào)用過程中實現(xiàn)的。也就是說在執(zhí)行seek方法之前需要先執(zhí)行一次poll方法枢赔,等到分配到分區(qū)之后才可以重置消費位置澄阳。

 consumer.poll(Duration.ofMillis(10000));
 Set<TopicPartition> assignment = consumer.assignment();
 for(TopicPartition tp : assignment){
      consumer.seek(tp,2);
 }
 while (true){
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                .....
 }

consumer.seek(tp,2)設置每個分區(qū)消費的位置是2。

如果消費組內(nèi)的消費者在啟動的時候能夠找到消費位移糠爬,除非發(fā)生越界寇荧,否則auto.offset.reset參數(shù)并不會奏效,此時如果想指定從開頭或末尾開始消費执隧,就需要seek方法的幫助了,如下代碼所示:

  Set<TopicPartition> set = new HashSet<>();
        while (set.size() == 0){
            consumer.poll(Duration.ofMillis(10000));
            set = consumer.assignment();
        }
         Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(set);
        for (TopicPartition tp: set){
            consumer.seek(tp,topicPartitionLongMap.get(tp));
        }

endOffsets方法用來獲取指定分區(qū)的末尾的消息位置户侥,與endOffsets對應的是beginningOffsets镀琉,一個分區(qū)的起始為止起初是0,但并不代表每時每刻都是0蕊唐,因為日志清理的動作會清理舊的數(shù)據(jù)屋摔,所以分區(qū)的位置會自然而然的增加。

有時候我們并不知道特定的消費位置替梨,卻知道一個相關(guān)的時間點钓试,比如我們想要消費昨天8點之后的消息装黑,KafkaConsumer提供了一個offsetForTimes方法:

    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> var1, Duration timestampsToSearch);

timestampsToSearch是一個Map類型,key為待查詢的分區(qū)弓熏,而value為待查詢的時間戳恋谭,該方法會返回時間戳大于等于待查詢時間的第一條消息對應的位置和時間戳,對應于OffsetAndTimestamp中的offset和timestamp字段挽鞠。

 Set<TopicPartition> set = new HashSet<>();
        Map<TopicPartition,Long> map = new HashMap<>();
        while (set.size() == 0){
            consumer.poll(Duration.ofMillis(10000));
            set = consumer.assignment();
        }
        for (TopicPartition tp: set){
            map.put(tp,System.currentTimeMillis()-1*24*3600*1000);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);

        for (TopicPartition tp: set){
             OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
             if (offsetAndTimestamp!=null)
                 consumer.seek(tp,offsetAndTimestamp.offset());

        }

消費者攔截器

消費者攔截器主要在消費到消息或在提交消費位移的時候進行一些定制化的工作疚颊。
消費者攔截器需要實現(xiàn)ConsumerInterceptor接口,該接口有三個方法:

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
    void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
    void close();
}

KafkaConsumer會在poll方法返回之前調(diào)用攔截器的onConsume方法來對消息進行相應的定制化操作信认,比如修改返回的內(nèi)容材义、按照某種規(guī)則過濾消息。如果onConsume方法拋出異常嫁赏,那么會被捕獲并記錄到日志其掂,但是異常不會在向上傳遞。

KafkaConsumer會在提交完消費位移之后調(diào)用調(diào)用攔截器的onCommit方法潦蝇,可以使用這個方法來記錄跟蹤所提交的位移信息款熬,比如當消費者調(diào)用commitSync的無參方法時,我們不知道提交的具體細節(jié)护蝶,可以使用攔截器onCommit方法做到這一點华烟。

在某些場景中,會對消息設置一個有效期的屬性持灰,如果某條消息在既定的時間窗口內(nèi)無法到達盔夜,那么就被視為無效,它也不需要再被繼續(xù)處理了堤魁。下面使用消費者攔截器實現(xiàn)一個簡單的TTL功能


public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {

    private  static  final  long EXPIRE_INTERVAL = 10 * 1000;
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        long now = System.currentTimeMillis();
        //構(gòu)建新的分區(qū)消息映射表
        Map<TopicPartition, List<ConsumerRecord<String,String>>> newRecords = new HashMap<>();
        //遍歷分區(qū)
        for (TopicPartition tp : consumerRecords.partitions()){
            //獲取分區(qū)內(nèi)的消息
            List<ConsumerRecord<String,String>> tpRecords = consumerRecords.records(tp);
            List<ConsumerRecord<String,String>> newTpRecords = new ArrayList<>();
            //遍歷消息喂链,做判斷
            for (ConsumerRecord<String, String> tpRecord : tpRecords) {
                //拿到10秒以內(nèi)的消息
                if (now - tpRecord.timestamp() < EXPIRE_INTERVAL){
                    newTpRecords.add(tpRecord);
                }
            }
            if (!newTpRecords.isEmpty()){
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach((tp,offset)->{
            System.out.println(tp+":"+offset.offset());
        });
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> map) {
    }
}

我們使用消息的timestamp字段來判定是否過期,如果消息的時間戳與當前的時間戳相差超過10秒則判定為過期妥泉,那么這條消息也就被過濾掉而不返回給消費者客戶端椭微。

自定義攔截器實現(xiàn)后,需要在KafkaConsumer中配置該攔截器盲链,通過參數(shù)interceptor.classes參數(shù)實現(xiàn):

properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class);

關(guān)于KafkaConsumer的多線程實現(xiàn)

KafkaProducer是線程安全的蝇率,然而KafkaConsumer是非線程安全的。KafkaConsumer當中定義了一個acquire方法刽沾,用來檢測當前是否只有一個線程在操作本慕,若有其他線程正在操作則會拋出異常。KafkaConsumer中的每個公用方法在執(zhí)行所要執(zhí)行的動作之前都會調(diào)用這個方法侧漓,只有wakeup方法是個例外锅尘。

   private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        } else {
            this.refcount.incrementAndGet();
        }
    }

KafkaConsumer非線程安全并不意味著我們在消費消息的時候只能以單線程的方式運行。如果生產(chǎn)者發(fā)送消息的速度大于消費者處理消息的速度布蔗,那么就會有越來越多的消息得不到及時的處理藤违,造成一定的時延浪腐。除此之外,kafka中存在消息保留機制顿乒,有些消息有可能在被消費之前就被清理了议街,從而造成消息的丟失。我們可以通過多線程的方式實現(xiàn)消息消費淆游,多線程的目的就是提高整體的消費能力傍睹。多線程的實現(xiàn)方式有多種,第一種也是最常見的方式:線程封閉犹菱,即為每個線程實例化一個KafkaConsumer對象拾稳。

一個線程對一個KafkaConsumer實例,我們可以稱為消費線程腊脱。一個消費線程可以消費一個或多個分區(qū)中的消息访得,所有的消費線程都隸屬于同一個消費組。這種方式實現(xiàn)的并發(fā)度受限于分區(qū)的實際個數(shù)陕凹,文章開頭講過悍抑,當消費者個數(shù)大于分區(qū)個數(shù)時,就會有部分消費線程一直處于空閑的狀態(tài)杜耙。

第二種方式是搜骡,多個消費線程同時消費同一個分區(qū),這個通過assign佑女、seek等方法實現(xiàn)记靡,這樣可以打破原有的消費線程的個數(shù)不能超過分區(qū)數(shù)的限制,不過這種方式對于位移提交和順序控制的處理就會變得非常復雜团驱,實際應用的很少摸吠。一般而言,分區(qū)時消費線程的最小劃分單位嚎花。我們通過實際編碼實現(xiàn)第一種:


public class MultiConsumerThreadDemo {
    public static final String brokerList = "192.168.3.8:9092";
    public static final String topic = "topic";
    public static final String group = "group-id42";
    public static final String client = "client-id2";
    public static Properties initConfig(){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //消費位移自動提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
        return  properties;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        for (int i=0;i<consumerThreadNum;i++){
            new KafkaConsumerThread(props,topic).start();
        }
    }
    public static class KafkaConsumerThread extends Thread{

        private  KafkaConsumer<String,String> kafkaConsumer;

        public  KafkaConsumerThread(Properties props,String topic){
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {

            try {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    records.forEach(record->{
                        System.out.println("topic="+record.topic()+",  partition="+record.partition()+",  offset="+record.offset());
                        //消息處理
                    });

                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                this.kafkaConsumer.close();
            }

        }
    }


}

內(nèi)部類KafkaConsumerThread代表消費線程寸痢,內(nèi)部包裹著一 個獨立的KafkaConsumer實例。通過main方法來啟動多個消費線程紊选,一般來講一個主題的分區(qū)數(shù)在開發(fā)時就是確定的啼止,可以將consumerThreadNum設置成不大于分區(qū)數(shù)的值,如果不知道主題的分區(qū)數(shù)兵罢,也可以通過之前講的partitionsFor方法來動態(tài)獲取族壳。

這種方式的優(yōu)點是每個線程可以按順序消費各個分區(qū)中的消息。缺點是趣些,每個消費線程都要維護一個獨立的TCP鏈接,如果分區(qū)數(shù)和consumerThreadNum都很大贰您,那么會造成不小的系統(tǒng)開銷坏平。

如果消費者對消息處理的速度很快拢操,那么poll拉取的頻次也會更高,進而整體消費性能也會提升舶替。相反令境,如果客戶端對消息的處理速度很慢,比如進行一個事務性操作顾瞪,或者等待一個RPC的同步響應舔庶,那么poll的拉取頻次也會下降,進而造成整體的性能下降陈醒。一般而言惕橙,poll拉取的速度是相當快的,而整體消費的瓶頸也正是消息處理這一塊钉跷,我們可以將處理消息部分改成多線程的實現(xiàn)方式弥鹦,如下圖所示



代碼如下:

public class MultiConsumerThreadDemo1 {
    public static final String brokerList = "192.168.3.8:9092";
    public static final String topic = "topic";
    public static final String group = "group-id42";
    public static final String client = "client-id2";
    public static Properties initConfig(){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //消費位移自動提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
        return  properties;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        for (int i=0;i<consumerThreadNum;i++){
            new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors()).start();
        }
    }
    public static class KafkaConsumerThread extends Thread{

        private  KafkaConsumer<String,String> kafkaConsumer;
        private  ExecutorService executorService;
        private int threadNum;

        public  KafkaConsumerThread(Properties props, String topic, int processorNum){
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
            this.threadNum = processorNum;
            executorService = new ThreadPoolExecutor(
                    threadNum,
                    threadNum,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {

            try {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                        //消息處理
                        executorService.submit(new RecordsHandler(records));
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                this.kafkaConsumer.close();
            }
        }
    }

    public static class RecordsHandler implements Runnable {
        private final  ConsumerRecords<String,String > records;
        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }
        @Override
        public void run() {
            //真正處理records的地方
        }
    }
}

RecordHandler類是用來處理消息的,而KafkaConsumerThread類對應的是一個消費線程爷辙,里面通過線程池的方式來調(diào)用RecordHandler處理一批批消息彬坏。注意KafkaConsumerThread類中ThreadPollExecutor里的最后一個參數(shù)設置的是CallerRunsPolicy,這樣可以防止線程池的總體消費能力跟不上poll拉取的能力從而導致異诚チ溃現(xiàn)象的發(fā)生栓始。但是這種方式對消息的順序處理能力就比較困難了。注意血当,代碼中的參數(shù)配置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);幻赚,旨在說明在具體實現(xiàn)的時候并沒有考慮位移提交的情況。對于第一種實現(xiàn)方式而言歹颓,如果要做具體的位移提交坯屿,直接在KafkaConsumerThread中的run方法里實現(xiàn)即可。我們引入一個共享變量offsets來參與提交


每一個處理消息的RecordHandler類在處理完消息之后都將對應的消費位移保存到共享變量offsets中巍扛,KafkaConsumerThread在每一次poll方法之后都讀取offsets中的內(nèi)容并對其進行位移提交领跛。注意在實現(xiàn)過程中需要對其進行加鎖操作,防止出現(xiàn)并發(fā)問題撤奸。并且在寫入offsets的時候需要注意位移覆蓋的問題

public class MultiConsumerThreadDemo1 {
    public static final String brokerList = "192.168.3.8:9092";
    public static final String topic = "topic";
    public static final String group = "group-id42";
    public static final String client = "client-id2";
    public static Properties initConfig(){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //消費位移自動提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
        return  properties;
    }

    static Map<TopicPartition, OffsetAndMetadata> offsets =new HashMap<>();

    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        for (int i=0;i<consumerThreadNum;i++){
            new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors()).start();
        }
    }
    public static class KafkaConsumerThread extends Thread{

        private  KafkaConsumer<String,String> kafkaConsumer;
        private  ExecutorService executorService;
        private int threadNum;

        public  KafkaConsumerThread(Properties props, String topic, int processorNum){
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
            this.threadNum = processorNum;
            executorService = new ThreadPoolExecutor(
                    threadNum,
                    threadNum,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {

            try {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                        //消息處理
                        executorService.submit(new RecordsHandler(records));
                        //位移提交工作
                        synchronized (offsets){
                            if(!offsets.isEmpty()){
                                kafkaConsumer.commitSync(offsets);
                                offsets.clear();
                            }
                        }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                this.kafkaConsumer.close();
            }
        }
    }

    public static class RecordsHandler implements Runnable {
        private final  ConsumerRecords<String,String > records;
        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }
        @Override
        public void run() {
            //真正處理records的地方

            //處理完后進行位移操作
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> tpRecords = this.records.records(partition);
                long lastConsumedOffset = tpRecords.get(tpRecords.size()-1).offset();
                synchronized (offsets){
                    if (!offsets.containsKey(partition)){
                        offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));
                    }else{
                        long position = offsets.get(partition).offset();
                        if (position<lastConsumedOffset+1){
                            offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));
                        }
                    }
                }
                
            }
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吠昭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子胧瓜,更是在濱河造成了極大的恐慌矢棚,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件府喳,死亡現(xiàn)場離奇詭異蒲肋,居然都是意外死亡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門兜粘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來申窘,“玉大人,你說我怎么就攤上這事孔轴√攴ǎ” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵路鹰,是天一觀的道長贷洲。 經(jīng)常有香客問我,道長晋柱,這世上最難降的妖魔是什么优构? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘没炒。我一直安慰自己,他們只是感情好玉凯,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著联贩,像睡著了一般漫仆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上泪幌,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天盲厌,我揣著相機與錄音,去河邊找鬼祸泪。 笑死吗浩,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的没隘。 我是一名探鬼主播懂扼,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼右蒲!你這毒婦竟也來了阀湿?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤瑰妄,失蹤者是張志新(化名)和其女友劉穎陷嘴,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體间坐,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡灾挨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年邑退,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涨醋。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡瓜饥,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出浴骂,到底是詐尸還是另有隱情,我是刑警寧澤宪潮,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布溯警,位于F島的核電站,受9級特大地震影響狡相,放射性物質(zhì)發(fā)生泄漏梯轻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一尽棕、第九天 我趴在偏房一處隱蔽的房頂上張望喳挑。 院中可真熱鬧,春花似錦滔悉、人聲如沸伊诵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽曹宴。三九已至,卻和暖如春歉提,著一層夾襖步出監(jiān)牢的瞬間笛坦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工苔巨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留版扩,地道東北人。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓侄泽,卻偏偏與公主長得像礁芦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蔬顾,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353