Kafka消費(fèi)者:讀消息從Kafka

前言

讀完本文,你將了解到如下知識(shí)點(diǎn):

  1. kafka 的消費(fèi)者 和 消費(fèi)者組
  2. 如何正確使用kafka consumer
  3. 常用的 kafka consumer 配置

消費(fèi)者 和 消費(fèi)者組

  1. 什么是消費(fèi)者扰她?
    顧名思義九杂,消費(fèi)者就是從kafka集群消費(fèi)數(shù)據(jù)的客戶端菇晃,如下圖,展示了一個(gè)消費(fèi)者從一個(gè)topic中消費(fèi)數(shù)據(jù)的模型

    image
  2. 單個(gè)消費(fèi)者模型存在的問(wèn)題健爬?
    如果這個(gè)時(shí)候 kafka 上游生產(chǎn)的數(shù)據(jù)很快佩耳,超過(guò)了這個(gè)消費(fèi)者1 的消費(fèi)速度砂竖,那么就會(huì)導(dǎo)致數(shù)據(jù)堆積真椿,產(chǎn)生一些大家都知道的蛋疼事情了,那么我們只能加強(qiáng) 消費(fèi)者 的消費(fèi)能力乎澄,所以也就有了我們下面來(lái)說(shuō)的 消費(fèi)者組

  3. 什么是消費(fèi)者組突硝?
    所謂 消費(fèi)者組,其實(shí)就是一組 消費(fèi)者 的集合置济,當(dāng)我們看到下面這張圖是不是就特別舒服了解恰,我們采用了一個(gè) 消費(fèi)組 來(lái)消費(fèi)這個(gè) topic,眾人拾柴火焰高浙于,其消費(fèi)能力那是按倍數(shù)遞增的护盈,所以這里我們一般來(lái)說(shuō)都是采用 消費(fèi)者組 來(lái)消費(fèi)數(shù)據(jù),而不會(huì)是 單消費(fèi)者 來(lái)消費(fèi)數(shù)據(jù)的羞酗。這里值得我們注意的是:

    • 一個(gè)topic 可以被 多個(gè) 消費(fèi)者組 消費(fèi)腐宋,但是每個(gè) 消費(fèi)者組 消費(fèi)的數(shù)據(jù)是 互不干擾的,也就是說(shuō)檀轨,每個(gè) 消費(fèi)組 消費(fèi)的都是 完整的數(shù)據(jù) 胸竞。
    • 一個(gè)分區(qū)只能被 同一個(gè)消費(fèi)組內(nèi) 的一個(gè) 消費(fèi)者 消費(fèi),而 不能拆給多個(gè)消費(fèi)者 消費(fèi)
image
  1. 是不是一個(gè) 消費(fèi)組 的 消費(fèi)者 越多其消費(fèi)能力就越強(qiáng)呢裤园?
    圖3 我們就很好的可以回答這個(gè)問(wèn)題了撤师,我們可以看到 消費(fèi)者4 是完全沒(méi)有消費(fèi)任何的數(shù)據(jù)的剂府,所以如果你想要加強(qiáng) 消費(fèi)者組 的能力拧揽,除了添加消費(fèi)者,分區(qū)的數(shù)量也是需要跟著增加的,只有這樣他們的并行度才能上的去淤袜,消費(fèi)能力才會(huì)強(qiáng)痒谴。
image
  1. 為了提高 消費(fèi)組 的 消費(fèi)能力,我是不是可以隨便添加 分區(qū) 和 消費(fèi)者 呢铡羡?
    答案當(dāng)然是否定的啦积蔚。。烦周。嘿嘿
    我們看到圖2尽爆,一般來(lái)說(shuō)我們建議 消費(fèi)者 數(shù)量 和 分區(qū) 數(shù)量是一致的,當(dāng)我們的消費(fèi)能力不夠時(shí)读慎,就必須通過(guò)調(diào)整分區(qū)的數(shù)量來(lái)提高并行度漱贱,但是,我們應(yīng)該盡量來(lái)避免這種情況發(fā)生夭委,比如:
    現(xiàn)在我們需要在圖2的基礎(chǔ)上增加一個(gè) 分區(qū)4幅狮,那么這個(gè) 分區(qū)4 該由誰(shuí)來(lái)消費(fèi)呢?這個(gè)時(shí)候kafka會(huì)進(jìn)行 分區(qū)再均衡株灸,來(lái)為這個(gè)分區(qū)分配消費(fèi)者崇摄,分區(qū)再均衡 期間該 消費(fèi)組是不可用的,并且作為一個(gè) 被消費(fèi)者慌烧,分區(qū)數(shù)的改動(dòng)將影響到每一個(gè)消費(fèi)者組 逐抑,所以在創(chuàng)建 topic 的時(shí)候,我們就應(yīng)該考慮好分區(qū)數(shù)屹蚊,來(lái)盡量避免這種情況發(fā)生

  2. 分區(qū)分配過(guò)程
    上面我們提到了為 分區(qū)分配消費(fèi)者泵肄,那么我們現(xiàn)在就來(lái)看看分配過(guò)程是怎么樣的。

    1. 確定 群組協(xié)調(diào)器
      每當(dāng)我們創(chuàng)建一個(gè)消費(fèi)組淑翼,kafka 會(huì)為我們分配一個(gè) broker 作為該消費(fèi)組的 coordinator(協(xié)調(diào)器)
    2. 注冊(cè)消費(fèi)者 并選出 leader consumer
      當(dāng)我們的有了 coordinator 之后腐巢,消費(fèi)者將會(huì)開始往該 coordinator上進(jìn)行注冊(cè),第一個(gè)注冊(cè)的 消費(fèi)者將成為該消費(fèi)組的 leader玄括,后續(xù)的 作為 follower冯丙,
    3. 當(dāng) leader 選出來(lái)后,他會(huì)從coordinator那里實(shí)時(shí)獲取分區(qū) 和 consumer 信息遭京,并根據(jù)分區(qū)策略給每個(gè)consumer 分配 分區(qū)胃惜,并將分配結(jié)果告訴 coordinator。
    4. follower 消費(fèi)者將從 coordinator 那里獲取到自己相關(guān)的分區(qū)信息進(jìn)行消費(fèi)哪雕,對(duì)于所有的 follower 消費(fèi)者而言船殉,他們只知道自己消費(fèi)的分區(qū),并不知道其他消費(fèi)者的存在斯嚎。
    5. 至此利虫,消費(fèi)者都知道自己的消費(fèi)的分區(qū)挨厚,分區(qū)過(guò)程結(jié)束,當(dāng)發(fā)送分區(qū)再均衡的時(shí)候糠惫,leader 將會(huì)重復(fù)分配過(guò)程

消費(fèi)組與分區(qū)重平衡

可以看到疫剃,當(dāng)新的消費(fèi)者加入消費(fèi)組,它會(huì)消費(fèi)一個(gè)或多個(gè)分區(qū)硼讽,而這些分區(qū)之前是由其他消費(fèi)者負(fù)責(zé)的巢价;另外,當(dāng)消費(fèi)者離開消費(fèi)組(比如重啟固阁、宕機(jī)等)時(shí)壤躲,它所消費(fèi)的分區(qū)會(huì)分配給其他分區(qū)。這種現(xiàn)象稱為重平衡(rebalance)备燃。重平衡是Kafka一個(gè)很重要的性質(zhì)柒爵,這個(gè)性質(zhì)保證了高可用和水平擴(kuò)展。不過(guò)也需要注意到赚爵,在重平衡期間棉胀,所有消費(fèi)者都不能消費(fèi)消息,因此會(huì)造成整個(gè)消費(fèi)組短暫的不可用冀膝。而且唁奢,將分區(qū)進(jìn)行重平衡也會(huì)導(dǎo)致原來(lái)的消費(fèi)者狀態(tài)過(guò)期,從而導(dǎo)致消費(fèi)者需要重新更新狀態(tài)窝剖,這段期間也會(huì)降低消費(fèi)性能麻掸。后面我們會(huì)討論如何安全的進(jìn)行重平衡以及如何盡可能避免。

消費(fèi)者通過(guò)定期發(fā)送心跳(hearbeat)到一個(gè)作為組協(xié)調(diào)者(group coordinator)的broker來(lái)保持在消費(fèi)組內(nèi)存活赐纱。這個(gè)broker不是固定的脊奋,每個(gè)消費(fèi)組都可能不同。當(dāng)消費(fèi)者拉取消息或者提交時(shí)疙描,便會(huì)發(fā)送心跳诚隙。

如果消費(fèi)者超過(guò)一定時(shí)間沒(méi)有發(fā)送心跳,那么它的會(huì)話(session)就會(huì)過(guò)期起胰,組協(xié)調(diào)者會(huì)認(rèn)為該消費(fèi)者已經(jīng)宕機(jī)久又,然后觸發(fā)重平衡⌒澹可以看到地消,從消費(fèi)者宕機(jī)到會(huì)話過(guò)期是有一定時(shí)間的,這段時(shí)間內(nèi)該消費(fèi)者的分區(qū)都不能進(jìn)行消息消費(fèi)畏妖;通常情況下脉执,我們可以進(jìn)行優(yōu)雅關(guān)閉,這樣消費(fèi)者會(huì)發(fā)送離開的消息到組協(xié)調(diào)者戒劫,這樣組協(xié)調(diào)者可以立即進(jìn)行重平衡而不需要等待會(huì)話過(guò)期半夷。

在0.10.1版本婆廊,Kafka對(duì)心跳機(jī)制進(jìn)行了修改,將發(fā)送心跳與拉取消息進(jìn)行分離玻熙,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響否彩。另外更高版本的Kafka支持配置一個(gè)消費(fèi)者多長(zhǎng)時(shí)間不拉取消息但仍然保持存活疯攒,這個(gè)配置可以避免活鎖(livelock)嗦随。活鎖敬尺,是指應(yīng)用沒(méi)有故障但是由于某些原因不能進(jìn)一步消費(fèi)枚尼。

創(chuàng)建Kafka消費(fèi)者
讀取Kafka消息只需要?jiǎng)?chuàng)建一個(gè)kafkaConsumer,創(chuàng)建過(guò)程與KafkaProducer非常相像砂吞。我們需要使用四個(gè)基本屬性署恍,bootstrap.servers、key.deserializer蜻直、value.deserializer和group.id盯质。其中,bootstrap.servers與創(chuàng)建KafkaProducer的含義一樣概而;key.deserializer和value.deserializer是用來(lái)做反序列化的呼巷,也就是將字節(jié)數(shù)組轉(zhuǎn)換成對(duì)象;group.id不是嚴(yán)格必須的赎瑰,但通常都會(huì)指定王悍,這個(gè)參數(shù)是消費(fèi)者的消費(fèi)組。

下面是一個(gè)代碼樣例:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);

訂閱主題
創(chuàng)建完消費(fèi)者后我們便可以訂閱主題了餐曼,只需要通過(guò)調(diào)用subscribe()方法即可压储,這個(gè)方法接收一個(gè)主題列表,非常簡(jiǎn)單:

consumer.subscribe(Collections.singletonList("customerCountries"));

這個(gè)例子中只訂閱了一個(gè)customerCountries主題源譬。另外集惋,我們也可以使用正則表達(dá)式來(lái)匹配多個(gè)主題,而且訂閱之后如果又有匹配的新主題踩娘,那么這個(gè)消費(fèi)組會(huì)立即對(duì)其進(jìn)行消費(fèi)芋膘。正則表達(dá)式在連接Kafka與其他系統(tǒng)時(shí)非常有用。比如訂閱所有的測(cè)試主題:

consumer.subscribe("test.*");

拉取循環(huán)
消費(fèi)數(shù)據(jù)的API和處理方式很簡(jiǎn)單霸饲,我們只需要循環(huán)不斷拉取消息即可为朋。Kafka對(duì)外暴露了一個(gè)非常簡(jiǎn)潔的poll方法,其內(nèi)部實(shí)現(xiàn)了協(xié)作厚脉、分區(qū)重平衡习寸、心跳、數(shù)據(jù)拉取等功能傻工,但使用時(shí)這些細(xì)節(jié)都被隱藏了霞溪,我們也不需要關(guān)注這些孵滞。下面是一個(gè)代碼樣例:

try {
   while (true) {  //1)
       ConsumerRecords<String, String> records = consumer.poll(100);  //2)
       for (ConsumerRecord<String, String> record : records)  //3)
       {
           log.debug("topic = %s, partition = %s, offset = %d,
              customer = %s, country = %s\n",
              record.topic(), record.partition(), record.offset(),
              record.key(), record.value());
           int updatedCount = 1;
           if (custCountryMap.countainsValue(record.value())) {
               updatedCount = custCountryMap.get(record.value()) + 1;
           }
           custCountryMap.put(record.value(), updatedCount)
           JSONObject json = new JSONObject(custCountryMap);
           System.out.println(json.toString(4))
       }
   }
} finally {
      consumer.close(); //4
}

其中,代碼中標(biāo)注了幾點(diǎn)鸯匹,說(shuō)明如下:

1)這個(gè)例子使用無(wú)限循環(huán)消費(fèi)并處理數(shù)據(jù)坊饶,這也是使用Kafka最多的一個(gè)場(chǎng)景,后面我們會(huì)討論如何更好的退出循環(huán)并關(guān)閉殴蓬。
2)這是上面代碼中最核心的一行代碼匿级。我們不斷調(diào)用poll拉取數(shù)據(jù),如果停止拉取染厅,那么Kafka會(huì)認(rèn)為此消費(fèi)者已經(jīng)死亡并進(jìn)行重平衡痘绎。參數(shù)值是一個(gè)超時(shí)時(shí)間,指明線程如果沒(méi)有數(shù)據(jù)時(shí)等待多長(zhǎng)時(shí)間肖粮,0表示不等待立即返回孤页。
3)poll()方法返回記錄的列表,每條記錄包含key/value以及主題涩馆、分區(qū)行施、位移信息。
4)主動(dòng)關(guān)閉可以使得Kafka立即進(jìn)行重平衡而不需要等待會(huì)話過(guò)期魂那。
另外需要提醒的是蛾号,消費(fèi)者對(duì)象不是線程安全的,也就是不能夠多個(gè)線程同時(shí)使用一個(gè)消費(fèi)者對(duì)象冰寻;而且也不能夠一個(gè)線程有多個(gè)消費(fèi)者對(duì)象须教。簡(jiǎn)而言之,一個(gè)線程一個(gè)消費(fèi)者斩芭,如果需要多個(gè)消費(fèi)者那么請(qǐng)使用多線程來(lái)進(jìn)行一一對(duì)應(yīng)轻腺。

單線程版

package com.neuedu;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Consumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "hadoop03:9092,hadoop05:9092,hadoop06:9092");// 該地址是集群的子集,用來(lái)探測(cè)集群划乖。
        props.put("group.id", "payment");// cousumer的分組id
        props.put("enable.auto.commit", "true");// 自動(dòng)提交offsets
        props.put("auto.commit.interval.ms", "1000");// 每隔1s贬养,自動(dòng)提交offsets
        props.put("session.timeout.ms", "30000");// Consumer向集群發(fā)送自己的心跳,超時(shí)則認(rèn)為Consumer已經(jīng)死了琴庵,kafka會(huì)把它的分區(qū)分配給其他進(jìn)程
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("payment"));// 訂閱的topic,可以多個(gè)
//        String topic = "payment";
//        TopicPartition partition0 = new TopicPartition(topic, 0);
//        TopicPartition partition1 = new TopicPartition(topic, 1);
//        consumer.assign(Arrays.asList(partition0));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("賭東道賭東道賭東道賭東道賭東道賭東道 offset = %d, key = %s, value = %s, partition = %s",
                        record.offset(), record.key(), record.value(),record.partition());
                System.out.println();

            }
        }
    }
}

多線程版

package com.neuedu;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerRunnable implements Runnable {

    // 每個(gè)線程維護(hù)私有的KafkaConsumer實(shí)例
    private final KafkaConsumer<String, String> consumer;

    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");        //本例使用自動(dòng)提交位移
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
//分區(qū)分配策略
  props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));   // 本例使用分區(qū)副本自動(dòng)分配策略
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);   // 本例使用200ms作為獲取超時(shí)時(shí)間
            for (ConsumerRecord<String, String> record : records) {
                // 這里面寫處理消息的邏輯误算,本例中只是簡(jiǎn)單地打印消息
                System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
                        "th message with offset: " + record.offset());
            }
        }
    }
}

package com.neuedu;

import java.util.ArrayList;
import java.util.List;

public class ConsumerGroup {

    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }
}

package com.neuedu;

public class ConsumerMain {

    public static void main(String[] args) {
        String brokerList = "hadoop03:9092";
        String groupId = "testGroup1";
        String topic = "payment";
        int consumerNum = 2;

        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}

使用起來(lái)還是很簡(jiǎn)單的,不過(guò)如果想要用好 consumer迷殿,可能你還需要了解以下這些東西:

  1. 分區(qū)控制策略
  2. consumer 的一些常用配置
  3. offset 的控制

ok,那么我們接下來(lái)一個(gè)個(gè)來(lái)看吧庆寺。壤圃。乍桂。

分區(qū)控制策略

  1. 手動(dòng)控制分區(qū)
    咱們先來(lái)說(shuō)下最簡(jiǎn)單的手動(dòng)分區(qū)控制扁凛,代碼如下:
     String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

看起來(lái)只是把普通的訂閱方式修改成了訂閱知道 topic 的分區(qū),其余的還是照常使用,不過(guò)這里也需要注意一下的是:

  • 一般只作為獨(dú)立消費(fèi)者,也就是不能加入消費(fèi)組,或者說(shuō)他本身就是作為一個(gè)消費(fèi)組存在,要保證這一點(diǎn),我們只需要保證其group id 是唯一的就可以了轴猎。
  • 對(duì)于topic的分區(qū)變動(dòng)不敏感垮斯,也就是說(shuō)當(dāng) topic新增了分區(qū)熊杨,分區(qū)的數(shù)據(jù)將會(huì)發(fā)生改變钻趋,但該消費(fèi)組對(duì)此確是不感知的,依然照常運(yùn)行,所以很多時(shí)候需要你手動(dòng)consumer.partitionsFor()去查看topic的分區(qū)情況
  • 不要和 subscription混合使用
  1. 使用partition.assignment.strategy進(jìn)行分區(qū)策略配置
  props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
  props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

這里的話 kafka 是自帶兩種分區(qū)策略的拂封,為了方便理解镣衡,我們以如下場(chǎng)景為例來(lái)進(jìn)行解釋:
已知:
TopicA 有 3 個(gè) partition(分區(qū)):A-1,A-2,A-3;
TopicB 有 3 個(gè) partition(分區(qū)):B-1,B-2在孝,B-3顾彰;
ConsumerA 和 ConsumerB 作為一個(gè)消費(fèi)組 ConsumerGroup 同時(shí)消費(fèi) TopicA 和 TopicB

  • Range
    該方式最大的特點(diǎn)就是會(huì)將連續(xù)的分區(qū)分配給一個(gè)消費(fèi)者厕隧,根據(jù)示例建丧,我們可以得出如下結(jié)論:

    ConsumerGroup 消費(fèi) TopicA 的時(shí)候:
    ConsumerA 會(huì)分配到 A-1拴曲,A-2
    ConsumerB 會(huì)分配到 A-3

    ConsumerGroup 消費(fèi) TopicB 的時(shí)候:
    ConsumerA 會(huì)分配到 B-1叁熔,B-2
    ConsumerB 會(huì)分配到 B-3

    所以:
    ConsumerA 分配到了4個(gè)分區(qū): A-1,A-2,B-1胖秒,B-2
    ConsumerB 分配到了2個(gè)分區(qū):A-3风题,B-3

  • RoundRobin
    該方式最大的特點(diǎn)就是會(huì)以輪詢的方式將分區(qū)分配給一個(gè)個(gè)消費(fèi)者,根據(jù)示例围小,我們可以得出如下結(jié)論:

    ConsumerGroup 消費(fèi) TopicA 的時(shí)候:
    ConsumerA 分配到 A-1
    ConsumerB 分配到 A-2
    ConsumerA 分配到 A-3

    ConsumerGroup 消費(fèi) TopicB 的時(shí)候,因?yàn)樯洗畏峙涞搅?ConsumerA昵骤,那么這次輪到 ConsumerB了 所以:
    ConsumerB 分配到 B-1
    ConsumerA 分配到 B-2
    ConsumerB 分配到 B-3

    所以:
    ConsumerA 分配到了4個(gè)分區(qū): A-1,A-3吩抓,B-2
    ConsumerB 分配到了2個(gè)分區(qū):A-2涉茧,B-1,B-3

從上面我們也是可以看出這兩種策略的異同疹娶,RoundRobin 相比較 Range 會(huì)使得分區(qū)分配的更加的均衡伴栓,但如果是消費(fèi)單個(gè) topic ,那么其均衡是差不多的,而 Range 會(huì)比 RoundRobin 更具優(yōu)勢(shì)一點(diǎn)钳垮,至于這個(gè)優(yōu)勢(shì)惑淳,還得看你的具體業(yè)務(wù)了。

  • 自定義的分區(qū)策略
    上面兩種分區(qū)策略是 kafka 默認(rèn)自帶的策略饺窿,雖然大多數(shù)情況下夠用了歧焦,但是可能針對(duì)一些特殊需求,我們也可以定義自己的分區(qū)策略

    1. Range分區(qū)策略源碼
      如何自定義呢肚医?最好的方式莫過(guò)于看源碼是怎么實(shí)現(xiàn)的绢馍,然后自己依葫蘆畫瓢來(lái)一個(gè),所以我們先來(lái)看看 Range分區(qū)策略源碼肠套,如下舰涌,我只做了簡(jiǎn)單的注釋,因?yàn)樗旧硪埠芎?jiǎn)單你稚,重點(diǎn)看下 assign 的參數(shù)以及返回注釋就 ok了
    public class RangeAssignor extends AbstractPartitionAssignor{
      //省略部分代碼瓷耙。。刁赖。搁痛。
     /**
       * 根據(jù)訂閱者 和 分區(qū)數(shù)量來(lái)進(jìn)行分區(qū)
       * @param partitionsPerTopic: topic->分區(qū)數(shù)量
       * @param subscriptions: memberId 消費(fèi)者id -> subscription 消費(fèi)者信息
       * @return: memberId ->list<topic名稱 和 分區(qū)序號(hào)(id)>
       */
      @Override
      public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
          //topic -> list<消費(fèi)者>
          Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    
          //初始化 返回結(jié)果
          Map<String, List<TopicPartition>> assignment = new HashMap<>();
          for (String memberId : subscriptions.keySet())
              assignment.put(memberId, new ArrayList<TopicPartition>());
    
          for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
              //topic
              String topic = topicEntry.getKey();
              // 消費(fèi)該topic的 consumer-id
              List<String> consumersForTopic = topicEntry.getValue();
    
              //topic 的分區(qū)數(shù)量
              Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
              if (numPartitionsForTopic == null)
                  continue;
    
              Collections.sort(consumersForTopic);
    
              //平均每個(gè)消費(fèi)者分配的 分區(qū)數(shù)量
              int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
              //平均之后剩下的 分區(qū)數(shù)
              int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
    
              //這里就是將連續(xù)分區(qū)切開然后分配給每個(gè)消費(fèi)者
              List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
              for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                  int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                  int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                  assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
              }
          }
          return assignment;
        }
      }
    
    
    1. 自定義一個(gè) 分區(qū)策略
      這里先緩緩把,太簡(jiǎn)單把宇弛,沒(méi)什么用鸡典,太復(fù)雜把,一時(shí)也想不出好的場(chǎng)景涯肩,如果你有需求轿钠,歡迎留言,我們一起來(lái)實(shí)現(xiàn)

Consumer 常用配置

首先病苗,我們都應(yīng)該知道,最全最全的文檔應(yīng)該是來(lái)自官網(wǎng)(雖然有時(shí)候可能官網(wǎng)找不到):
http://kafka.apachecn.org/documentation.html#newconsumerconfigs
嗯症汹,以下內(nèi)容來(lái)自 kafka權(quán)威指南 ,請(qǐng)?jiān)徫业男卸琛M匆小7峥:罄m(xù)有時(shí)間會(huì)把工作中的遇到的補(bǔ)充上

  1. fetch.min.bytes
    該屬性指定了消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費(fèi)者的數(shù)據(jù)請(qǐng)求時(shí)瞒斩,
    如果可用的數(shù)據(jù)量小于fetch.min.bytes 指定的大小破婆,那么它會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。這樣可以降低消費(fèi)者和 broker 的工作負(fù)載胸囱,因?yàn)樗鼈冊(cè)谥黝}不是很活躍的時(shí)候(或者一天里的低谷時(shí)段)就不需要來(lái)來(lái)回回地處理消息祷舀。如果沒(méi)有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率卻很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大裳扯。如果消費(fèi)者的數(shù)量比較多抛丽,把該屬性的值設(shè)置得大一點(diǎn)可以降低broker 的工作負(fù)載。

  2. fetch.max.wait.ms
    我們通過(guò) fetch.min.bytes 告訴 Kafka饰豺,等到有足夠的數(shù)據(jù)時(shí)才把它返回給消費(fèi)者亿鲜。而feth.max.wait.ms 則用于指定 broker 的等待時(shí)間,默認(rèn)是 500ms冤吨。如果沒(méi)有足夠的數(shù)據(jù)流入 Kafka蒿柳,消費(fèi)者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導(dǎo)致 500ms 的延遲漩蟆。如果要降低潛在的延遲(為了滿足 SLA)其馏,可以把該參數(shù)值設(shè)置得小一些。
    如果 fetch.max.wait.ms 被設(shè)為 100ms爆安,并且fetch.min.bytes 被設(shè)為 1MB叛复,那么 Kafka 在收到消費(fèi)者的請(qǐng)求后,要么返回 1MB 數(shù)據(jù)扔仓,要么在100ms 后返回所有可用的數(shù)據(jù)褐奥,就看哪個(gè)條件先得到滿足。

  3. max.partition.fetch.bytes
    該屬性指定了服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)翘簇。它的默認(rèn)值是 1MB撬码,也就是說(shuō),KafkaConsumer.poll() 方法從每個(gè)分區(qū)里返回的記錄最多不超過(guò) max.partition.fetch.bytes指定的字節(jié)版保。如果一個(gè)主題有 20 個(gè)分區(qū)和 5 個(gè)消費(fèi)者呜笑,那么每個(gè)消費(fèi)者需要至少 4MB 的可用內(nèi)存來(lái)接收記錄。在為消費(fèi)者分配內(nèi)存時(shí)彻犁,可以給它們多分配一些叫胁,因?yàn)槿绻航M里有消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū)汞幢。
    max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過(guò) max.message.size 屬性配置)大驼鹅,否則消費(fèi)者可能無(wú)法讀取這些消息,導(dǎo)致消費(fèi)者一直掛起重試森篷。在設(shè)置該屬性時(shí)输钩,另一個(gè)需要考慮的因素是消費(fèi)者處理數(shù)據(jù)的時(shí)間。消費(fèi)者需要頻繁調(diào)用poll() 方法來(lái)避免會(huì)話過(guò)期和發(fā)生分區(qū)再均衡仲智,如果單次調(diào)用 poll() 返回的數(shù)據(jù)太多买乃,消費(fèi)者需要更多的時(shí)間來(lái)處理,可能無(wú)法及時(shí)進(jìn)行下一個(gè)輪詢來(lái)避免會(huì)話過(guò)期钓辆。
    如果出現(xiàn)這種情況剪验,可以把max.partition.fetch.bytes 值改小肴焊,或者延長(zhǎng)會(huì)話過(guò)期時(shí)間。

  4. session.timeout.ms
    該屬性指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間碉咆,默認(rèn)是 3s抖韩。如果消費(fèi)者沒(méi)有在session.timeout.ms 指定的時(shí)間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就被認(rèn)為已經(jīng)死亡疫铜,協(xié)調(diào)器就會(huì)觸發(fā)再均衡茂浮,把它的分區(qū)分配給群組里的其他消費(fèi)者。
    該屬性與 heartbeat.interval.ms 緊密相關(guān)壳咕。heartbeat.interval.ms 指定了 poll() 方法向協(xié)調(diào)器發(fā)送心跳的頻率席揽,session.timeout.ms則指定了消費(fèi)者可以多久不發(fā)送心跳。
    所以谓厘,一般需要同時(shí)修改這兩個(gè)屬性幌羞,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是session.timeout.ms 的三分之一竟稳。如果 session.timeout.ms 是 3s属桦,那么 heartbeat.interval.ms 應(yīng)該是 1s。把session.timeout.ms 值設(shè)得比默認(rèn)值小他爸,可以更快地檢測(cè)和恢復(fù)崩潰的節(jié)點(diǎn)聂宾,不過(guò)長(zhǎng)時(shí)間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡。把該屬性的值設(shè)置得大一些诊笤,可以減少意外的再均衡系谐,不過(guò)檢測(cè)節(jié)點(diǎn)崩潰需要更長(zhǎng)的時(shí)間。

  5. auto.offset.reset
    該屬性指定了消費(fèi)者在讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下(因消費(fèi)者長(zhǎng)時(shí)間失效讨跟,包含偏移量的記錄已經(jīng)過(guò)時(shí)并被刪除)該作何處理纪他。
    它的默認(rèn)值是 latest,意思是說(shuō)晾匠,在偏移量無(wú)效的情況下茶袒,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)。另一個(gè)值是earliest混聊,意思是說(shuō)弹谁,在偏移量無(wú)效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄句喜。

  6. enable.auto.commit
    我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費(fèi)者是否自動(dòng)提交偏移量沟于,默認(rèn)值是true咳胃。為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)為 false旷太,由自己控制何時(shí)提交偏移量展懈。如果把它設(shè)為true销睁,還可以通過(guò)配置 auto.commit.interval.ms 屬性來(lái)控制提交的頻率。

  7. partition.assignment.strategy(這部分好像重復(fù)了 ~~~)
    我們知道存崖,分區(qū)會(huì)被分配給群組里的消費(fèi)者冻记。PartitionAssignor 根據(jù)給定的消費(fèi)者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個(gè)消費(fèi)者来惧。Kafka 有兩個(gè)默認(rèn)的分配策略冗栗。
    Range
      該策略會(huì)把主題的若干個(gè)連續(xù)的分區(qū)分配給消費(fèi)者。假設(shè)消費(fèi)者 C1 和消費(fèi)者 C2 同時(shí)訂閱了主題T1 和 主題 T2供搀,并且每個(gè)主題有 3 個(gè)分區(qū)隅居。那么消費(fèi)者 C1 有可能分配到這兩個(gè)主題的分區(qū) 0 和分區(qū) 1,而消費(fèi)者 C2 分配到這兩個(gè)主題的分區(qū) 2葛虐。因?yàn)槊總€(gè)主題擁有奇數(shù)個(gè)分區(qū)胎源,而分配是在主題內(nèi)獨(dú)立完成的,第一個(gè)消費(fèi)者最后分配到比第二個(gè)消費(fèi)者更多的分區(qū)屿脐。只要使用了 Range 策略涕蚤,而且分區(qū)數(shù)量無(wú)法被消費(fèi)者數(shù)量整除,就會(huì)出現(xiàn)這種情況的诵。

    RoundRobin
      該策略把主題的所有分區(qū)逐個(gè)分配給消費(fèi)者万栅。如果使用 RoundRobin 策略來(lái)給消費(fèi)者 C1 和消費(fèi)者C2 分配分區(qū),那么消費(fèi)者 C1 將分到主題 T1 的分區(qū) 0 和分區(qū) 2 以及主題 T2 的分區(qū) 1奢驯,消費(fèi)者 C2 將分配到主題 T1 的分區(qū) 1 以及主題 T2 的分區(qū) 0 和分區(qū) 2申钩。一般來(lái)說(shuō),如果所有消費(fèi)者都訂閱相同的主題(這種情況很常見(jiàn))瘪阁,RoundRobin 策略會(huì)給所有消費(fèi)者分配相同數(shù)量的分區(qū)(或最多就差一個(gè)分區(qū))撒遣。可以通過(guò)設(shè)置 partition.assignment.strategy 來(lái)選擇分區(qū)策略管跺。
    默認(rèn)使用的是org.apache.kafka.clients.consumer.RangeAssignor义黎,這個(gè)類實(shí)現(xiàn)了 Range 策略,不過(guò)也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor豁跑。我們還可以使用自定義策略廉涕,在這種情況下,partition.assignment.strategy 屬性的值就是自定義類的名字艇拍。

  8. client.id
    該屬性可以是任意字符串狐蜕,broker 用它來(lái)標(biāo)識(shí)從客戶端發(fā)送過(guò)來(lái)的消息,通常被用在日志卸夕、度量指標(biāo)和配額里层释。

  9. max.poll.records
    該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量快集。

  10. receive.buffer.bytes 和 send.buffer.bytes
    socket 在讀寫數(shù)據(jù)時(shí)用到的 TCP 緩沖區(qū)也可以設(shè)置大小贡羔。如果它們被設(shè)為 -1廉白,就使用操作系統(tǒng)的默認(rèn)值。
    如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心內(nèi)乖寒,可以適當(dāng)增大這些值猴蹂,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬

提交(commit)與位移(offset)

當(dāng)我們調(diào)用poll()時(shí),該方法會(huì)返回我們沒(méi)有消費(fèi)的消息楣嘁。當(dāng)消息從broker返回消費(fèi)者時(shí)磅轻,broker并不跟蹤這些消息是否被消費(fèi)者接收到;Kafka讓消費(fèi)者自身來(lái)管理消費(fèi)的位移马澈,并向消費(fèi)者提供更新位移的接口瓢省,這種更新位移方式稱為提交(commit)。

在正常情況下痊班,消費(fèi)者會(huì)發(fā)送分區(qū)的提交信息到Kafka勤婚,Kafka進(jìn)行記錄。當(dāng)消費(fèi)者宕機(jī)或者新消費(fèi)者加入時(shí)涤伐,Kafka會(huì)進(jìn)行重平衡馒胆,這會(huì)導(dǎo)致消費(fèi)者負(fù)責(zé)之前并不屬于它的分區(qū)。重平衡完成后凝果,消費(fèi)者會(huì)重新獲取分區(qū)的位移祝迂,下面來(lái)看下兩種有意思的情況。

假如一個(gè)消費(fèi)者在重平衡前后都負(fù)責(zé)某個(gè)分區(qū)器净,如果提交位移比之前實(shí)際處理的消息位移要小型雳,那么會(huì)導(dǎo)致消息重復(fù)消費(fèi),如下所示:

image

假如在重平衡前某個(gè)消費(fèi)者拉取分區(qū)消息山害,在進(jìn)行消息處理前提交了位移纠俭,但還沒(méi)完成處理宕機(jī)了,然后Kafka進(jìn)行重平衡浪慌,新的消費(fèi)者負(fù)責(zé)此分區(qū)并讀取提交位移冤荆,此時(shí)會(huì)“丟失”消息,如下所示:

image

因此权纤,提交位移的方式會(huì)對(duì)應(yīng)用有比較大的影響钓简,下面來(lái)看下不同的提交方式。

自動(dòng)提交

這種方式讓消費(fèi)者來(lái)管理位移汹想,應(yīng)用本身不需要顯式操作外邓。當(dāng)我們將enable.auto.commit設(shè)置為true,那么消費(fèi)者會(huì)在poll方法調(diào)用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移古掏。和很多其他操作一樣坐榆,自動(dòng)提交也是由poll()方法來(lái)驅(qū)動(dòng)的;在調(diào)用poll()時(shí)冗茸,消費(fèi)者判斷是否到達(dá)提交時(shí)間席镀,如果是則提交上一次poll返回的最大位移。

需要注意到夏漱,這種方式可能會(huì)導(dǎo)致消息重復(fù)消費(fèi)豪诲。假如,某個(gè)消費(fèi)者poll消息后挂绰,應(yīng)用正在處理消息屎篱,在3秒后Kafka進(jìn)行了重平衡,那么由于沒(méi)有更新位移導(dǎo)致重平衡后這部分消息重復(fù)消費(fèi)葵蒂。

提交當(dāng)前位移

為了減少消息重復(fù)消費(fèi)或者避免消息丟失交播,很多應(yīng)用選擇自己主動(dòng)提交位移。設(shè)置auto.commit.offset為false践付,那么應(yīng)用需要自己通過(guò)調(diào)用commitSync()來(lái)主動(dòng)提交位移秦士,該方法會(huì)提交poll返回的最后位移。

為了避免消息丟失永高,我們應(yīng)當(dāng)在完成業(yè)務(wù)邏輯后才提交位移隧土。而如果在處理消息時(shí)發(fā)生了重平衡,那么只有當(dāng)前poll的消息會(huì)重復(fù)消費(fèi)命爬。下面是一個(gè)自動(dòng)提交的代碼樣例:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }

    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e)
    }
}

上面代碼poll消息曹傀,并進(jìn)行簡(jiǎn)單的打印(在實(shí)際中有更多的處理)饲宛,最后完成處理后進(jìn)行了位移提交皆愉。

異步提交

手動(dòng)提交有一個(gè)缺點(diǎn),那就是當(dāng)發(fā)起提交調(diào)用時(shí)應(yīng)用會(huì)阻塞艇抠。當(dāng)然我們可以減少手動(dòng)提交的頻率幕庐,但這個(gè)會(huì)增加消息重復(fù)的概率(和自動(dòng)提交一樣)。另外一個(gè)解決辦法是练链,使用異步提交的API翔脱。以下為使用異步提交的方式,應(yīng)用發(fā)了一個(gè)提交請(qǐng)求然后立即返回:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }

    consumer.commitAsync();
}

但是異步提交也有個(gè)缺點(diǎn)媒鼓,那就是如果服務(wù)器返回提交失敗届吁,異步提交不會(huì)進(jìn)行重試。相比較起來(lái)绿鸣,同步提交會(huì)進(jìn)行重試直到成功或者最后拋出異常給應(yīng)用疚沐。異步提交沒(méi)有實(shí)現(xiàn)重試是因?yàn)椋绻瑫r(shí)存在多個(gè)異步提交潮模,進(jìn)行重試可能會(huì)導(dǎo)致位移覆蓋亮蛔。舉個(gè)例子,假如我們發(fā)起了一個(gè)異步提交commitA擎厢,此時(shí)的提交位移為2000究流,隨后又發(fā)起了一個(gè)異步提交commitB且位移為3000辣吃;commitA提交失敗但commitB提交成功,此時(shí)commitA進(jìn)行重試并成功的話芬探,會(huì)將實(shí)際上將已經(jīng)提交的位移從3000回滾到2000神得,導(dǎo)致消息重復(fù)消費(fèi)。

因此偷仿,基于這種性質(zhì)哩簿,一般情況下對(duì)于異步提交,我們可能會(huì)通過(guò)回調(diào)的方式記錄提交結(jié)果:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        } 
    });
}

而如果想進(jìn)行重試同時(shí)又保證提交順序的話酝静,一種簡(jiǎn)單的辦法是使用單調(diào)遞增的序號(hào)节榜。每次發(fā)起異步提交時(shí)增加此序號(hào),并且將此時(shí)的序號(hào)作為參數(shù)傳給回調(diào)方法别智;當(dāng)消息提交失敗回調(diào)時(shí)宗苍,檢查參數(shù)中的序號(hào)值與全局的序號(hào)值,如果相等那么可以進(jìn)行重試提交亿遂,否則放棄(因?yàn)橐呀?jīng)有更新的位移提交了)浓若。

混合同步提交與異步提交

正常情況下,偶然的提交失敗并不是什么大問(wèn)題蛇数,因?yàn)楹罄m(xù)的提交成功就可以了挪钓。但是在某些情況下(例如程序退出、重平衡)耳舅,我們希望最后的提交成功碌上,因此一種非常普遍的方式是混合異步提交和同步提交,如下所示:

try {
    while (true) {
       ConsumerRecords<String, String> records = consumer.poll(100);
       for (ConsumerRecord<String, String> record : records) {
           System.out.printf("topic = %s, partition = %s, offset = %d,
           customer = %s, country = %s\n",
           record.topic(), record.partition(),
           record.offset(), record.key(), record.value());
       }

       consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

在正常處理流程中浦徊,我們使用異步提交來(lái)提高性能馏予,但最后使用同步提交來(lái)保證位移提交成功。

提交特定位移

commitSync()和commitAsync()會(huì)提交上一次poll()的最大位移盔性,但如果poll()返回了批量消息霞丧,而且消息數(shù)量非常多,我們可能會(huì)希望在處理這些批量消息過(guò)程中提交位移冕香,以免重平衡導(dǎo)致從頭開始消費(fèi)和處理蛹尝。幸運(yùn)的是,commitSync()和commitAsync()允許我們指定特定的位移參數(shù)悉尾,參數(shù)為一個(gè)分區(qū)與位移的map突那。由于一個(gè)消費(fèi)者可能會(huì)消費(fèi)多個(gè)分區(qū),所以這種方式會(huì)增加一定的代碼復(fù)雜度构眯,如下所示:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

....

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
} }

代碼中在處理poll()消息的過(guò)程中愕难,不斷保存分區(qū)與位移的關(guān)系,每處理1000條消息就會(huì)異步提交(也可以使用同步提交)。

重平衡監(jiān)聽器(Rebalance Listener)

變更分區(qū)命令

kafka-topics.sh --zookeeper hadoop03:2181 --alter --topic test --partitions 6

在分區(qū)重平衡前猫缭,如果消費(fèi)者知道它即將不再負(fù)責(zé)某個(gè)分區(qū)葱弟,那么它可能需要將已經(jīng)處理過(guò)的消息位移進(jìn)行提交。Kafka的API允許我們?cè)谙M(fèi)者新增分區(qū)或者失去分區(qū)時(shí)進(jìn)行處理饵骨,我們只需要在調(diào)用subscribe()方法時(shí)傳入ConsumerRebalanceListener對(duì)象翘悉,該對(duì)象有兩個(gè)方法:

  • public void onPartitionRevoked(Collection<topicpartition> partitions):此方法會(huì)在消費(fèi)者停止消費(fèi)消費(fèi)后,在重平衡開始前調(diào)用居触。</topicpartition>
  • public void onPartitionAssigned(Collection<topicpartition> partitions):此方法在分區(qū)分配給消費(fèi)者后,在消費(fèi)者開始讀取消息前調(diào)用老赤。</topicpartition>

下面來(lái)看一個(gè)onPartitionRevoked9)的例子轮洋,該例子在消費(fèi)者失去某個(gè)分區(qū)時(shí)提交位移(以便其他消費(fèi)者可以接著消費(fèi)消息并處理):

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
             System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
   log.error("Unexpected error", e);
} finally {
   try {
       consumer.commitSync(currentOffsets);
   } finally {
       consumer.close();
       System.out.println("Closed consumer and we are done");
   }
}

代碼中實(shí)現(xiàn)了onPartitionsRevoked()方法,當(dāng)消費(fèi)者失去某個(gè)分區(qū)時(shí)抬旺,會(huì)提交已經(jīng)處理的消息位移(而不是poll()的最大位移)弊予。上面代碼會(huì)提交所有的分區(qū)位移,而不僅僅是失去分區(qū)的位移开财,但這種做法沒(méi)什么壞處汉柒。

從指定位移開始消費(fèi)

在此之前,我們使用poll()來(lái)從最后的提交位移開始消費(fèi)责鳍,但我們也可以從一個(gè)指定的位移開始消費(fèi)碾褂。

如果想從分區(qū)開始端重新開始消費(fèi),那么可以使用seekToBeginning(TopicPartition tp)历葛;如果想從分區(qū)的最末端消費(fèi)最新的消息正塌,那么可以使用seekToEnd(TopicPartition tp)。而且恤溶,Kafka還支持我們從指定位移開始消費(fèi)乓诽。從指定位移開始消費(fèi)的應(yīng)用場(chǎng)景有很多,其中最典型的一個(gè)是:位移存在其他系統(tǒng)(例如數(shù)據(jù)庫(kù))中咒程,并且以其他系統(tǒng)的位移為準(zhǔn)鸠天。

考慮這么個(gè)場(chǎng)景:我們從Kafka中讀取消費(fèi),然后進(jìn)行處理帐姻,最后把結(jié)果寫入數(shù)據(jù)庫(kù)稠集;我們既不想丟失消息,也不想數(shù)據(jù)庫(kù)中存在重復(fù)的消息數(shù)據(jù)卖宠。對(duì)于這樣的場(chǎng)景巍杈,我們可能會(huì)按如下邏輯處理:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        processRecord(record);
        storeRecordInDB(record);
        consumer.commitAsync(currentOffsets);
    }
}

這個(gè)邏輯似乎沒(méi)什么問(wèn)題,但是要注意到這么個(gè)事實(shí)扛伍,在持久化到數(shù)據(jù)庫(kù)成功后筷畦,提交位移到Kafka可能會(huì)失敗,那么這可能會(huì)導(dǎo)致消息會(huì)重復(fù)處理。對(duì)于這種情況鳖宾,我們可以優(yōu)化方案吼砂,將持久化到數(shù)據(jù)庫(kù)與提交位移實(shí)現(xiàn)為原子性操作,也就是要么同時(shí)成功鼎文,要么同時(shí)失敗渔肩。但這個(gè)是不可能的,因此我們可以在保存記錄到數(shù)據(jù)庫(kù)的同時(shí)拇惋,也保存位移周偎,然后在消費(fèi)者開始消費(fèi)時(shí)使用數(shù)據(jù)庫(kù)的位移開始消費(fèi)。這個(gè)方案是可行的撑帖,我們只需要通過(guò)seek()來(lái)指定分區(qū)位移開始消費(fèi)即可蓉坎。下面是一個(gè)改進(jìn)的樣例代碼:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //在消費(fèi)者負(fù)責(zé)的分區(qū)被回收前提交數(shù)據(jù)庫(kù)事務(wù),保存消費(fèi)的記錄和位移
        commitDBTransaction();
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //在開始消費(fèi)前胡嘿,從數(shù)據(jù)庫(kù)中獲取分區(qū)的位移蛉艾,并使用seek()來(lái)指定開始消費(fèi)的位移
        for(TopicPartition partition: partitions)
            consumer.seek(partition, getOffsetFromDB(partition));
    } 
}

    consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
    //在subscribe()之后poll一次,并從數(shù)據(jù)庫(kù)中獲取分區(qū)的位移衷敌,使用seek()來(lái)指定開始消費(fèi)的位移
    consumer.poll(0);
    for (TopicPartition partition: consumer.assignment())
        consumer.seek(partition, getOffsetFromDB(partition));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            processRecord(record);
            //保存記錄結(jié)果
            storeRecordInDB(record);
            //保存位移
            storeOffsetInDB(record.topic(), record.partition(), record.offset());
        }
        //提交數(shù)據(jù)庫(kù)事務(wù)勿侯,保存消費(fèi)的記錄以及位移
        commitDBTransaction();
    }

具體邏輯見(jiàn)代碼注釋,此處不再贅述缴罗。另外注意的是助琐,seek()只是指定了poll()拉取的開始位移,這并不影響在Kafka中保存的提交位移(當(dāng)然我們可以在seek和poll之后提交位移覆蓋)瞒爬。

package com.neuedu;

import java.util.*;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class Consumer {
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    static int count = 0;
    static KafkaConsumer<String, String> consumer;
    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
            consumer.commitSync(currentOffsets);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "hadoop03:9092,hadoop05:9092,hadoop06:9092");// 該地址是集群的子集弓柱,用來(lái)探測(cè)集群。
        props.put("group.id", "payment");// cousumer的分組id
        props.put("enable.auto.commit", "false");// 自動(dòng)提交offsets
        props.put("auto.commit.interval.ms", "1000");// 每隔1s侧但,自動(dòng)提交offsets
        props.put("session.timeout.ms", "30000");// Consumer向集群發(fā)送自己的心跳矢空,超時(shí)則認(rèn)為Consumer已經(jīng)死了,kafka會(huì)把它的分區(qū)分配給其他進(jìn)程
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);

//        consumer.subscribe(Arrays.asList("payment"));// 訂閱的topic,可以多個(gè)
        String topic = "payment";
        TopicPartition partition0 = new TopicPartition(topic, 0);
//        TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0));

        Collection<TopicPartition> partitions = Arrays.asList(partition0);

        consumer.seekToBeginning(partitions);
//        consumer.seek(partition0,495);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("賭東道賭東道賭東道賭東道賭東道賭東道 offset = %d, key = %s, value = %s, partition = %s",
                        record.offset(), record.key(), record.value(),record.partition());
                System.out.println();
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
                if (count % 1 == 0)
                    consumer.commitAsync(currentOffsets, null);
                count++;
            }

        }
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末禀横,一起剝皮案震驚了整個(gè)濱河市屁药,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌柏锄,老刑警劉巖酿箭,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異趾娃,居然都是意外死亡缭嫡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門抬闷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)妇蛀,“玉大人耕突,你說(shuō)我怎么就攤上這事∑兰埽” “怎么了眷茁?”我有些...
    開封第一講書人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)纵诞。 經(jīng)常有香客問(wèn)我上祈,道長(zhǎng),這世上最難降的妖魔是什么浙芙? 我笑而不...
    開封第一講書人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任登刺,我火速辦了婚禮,結(jié)果婚禮上茁裙,老公的妹妹穿的比我還像新娘塘砸。我一直安慰自己,他們只是感情好晤锥,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著廊宪,像睡著了一般矾瘾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上箭启,一...
    開封第一講書人閱讀 51,679評(píng)論 1 305
  • 那天壕翩,我揣著相機(jī)與錄音,去河邊找鬼傅寡。 笑死放妈,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的荐操。 我是一名探鬼主播芜抒,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼托启!你這毒婦竟也來(lái)了宅倒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤屯耸,失蹤者是張志新(化名)和其女友劉穎拐迁,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體疗绣,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡线召,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了多矮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片缓淹。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出割卖,到底是詐尸還是另有隱情前酿,我是刑警寧澤,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布鹏溯,位于F島的核電站罢维,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏丙挽。R本人自食惡果不足惜肺孵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望颜阐。 院中可真熱鬧平窘,春花似錦、人聲如沸凳怨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)肤舞。三九已至紫新,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間李剖,已是汗流浹背芒率。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留篙顺,地道東北人偶芍。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像德玫,于是被迫代替她去往敵國(guó)和親匪蟀。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容