Kafka學習——生產(chǎn)者和消費者客戶端

kafka客戶端

Kafka除了提供內(nèi)置Java客戶端外豪嚎,還提供了二進制連接協(xié)議仁烹,即向Kafka網(wǎng)絡(luò)端口發(fā)送適當?shù)淖止?jié)序列候齿,就可以實現(xiàn)從Kafka讀取消息或?qū)懭胂⑽们隆TS多語言實現(xiàn)了Kafka連接協(xié)議瞳筏,從而提供了許多非Java客戶端,比如Python牡昆、Go姚炕、C++等。
https://cwiki.apache.org/confluence/display/KAFKA/Clients

Kafka生產(chǎn)者

不同的使用場景對生產(chǎn)者API的使用和配置會有直接影響迁杨。比如信用卡處理系統(tǒng)钻心,不允許消息丟失、重復铅协,可以接受的寫入延遲最大為500ms捷沸,吞吐量需要每秒鐘能夠處理一百萬個消息。在比如用戶點擊事件存儲場景狐史,允許丟失少量消息或出現(xiàn)少量重復消息痒给,寫入延遲可以高一些说墨,吞吐量則取決于網(wǎng)站的使用頻度。
Kafka發(fā)送消息主要步驟:


發(fā)送消息

首先我們會創(chuàng)建一個ProducerRecord對象苍柏,ProducerRecord中包含了目標topic和消息內(nèi)容尼斧,我們同時還可以指定鍵和分區(qū)。在發(fā)送ProducerRecord對象前试吁,生產(chǎn)者會把鍵和值數(shù)據(jù)進行序列化棺棵,以便在網(wǎng)絡(luò)中進行傳輸。
接下來熄捍,數(shù)據(jù)會發(fā)發(fā)送給分區(qū)器烛恤,如果我們制定了寫入分區(qū),則分區(qū)器不會做任何事情余耽。如果沒有指定分區(qū)缚柏,分區(qū)器會根據(jù)ProducerRecord對象的鍵選擇一個分區(qū)。選擇好分區(qū)后碟贾,生產(chǎn)者就知道該往哪個主題的哪個分區(qū)下發(fā)送這條記錄了币喧。緊接著,這條記錄會被添加到一個記錄批次里面袱耽,這個批次里面的消息會被發(fā)送到相同主題和分區(qū)上杀餐。生產(chǎn)者中會有一個獨立的線程,將批次記錄發(fā)送到相應的broker上扛邑。
broker在接收到這些消息后會返回一個響應信息怜浅,如果寫入成功,則會返回一個RecordMetaData對象蔬崩,它包含了主題和分區(qū)信息恶座,以及在分區(qū)的偏移量信息。如果寫入失敗沥阳,則會返回一個錯誤信息跨琳。生產(chǎn)者在接收到錯誤信息后,會進行重新發(fā)送桐罕,如果重新發(fā)送幾次之后仍然失敗脉让,則生產(chǎn)者返回錯誤信息。

創(chuàng)建Kafka生產(chǎn)者

在創(chuàng)建Kafka生產(chǎn)者對象時功炮,我們可以設(shè)置一些屬性溅潜。Kafka生產(chǎn)者有3個必選的屬性。

  • bootstrap.servers:指定broker的地址清單薪伏,地址格式為:host1:port1,host2:port2滚澜。清單里不需要包含所有地址,因為生產(chǎn)者會從給定的broker中找到其它broker信息(仍然建議設(shè)置兩個以上嫁怀,防止其中一臺broker宕機)设捐。
  • key.serializer:指定消息鍵采用的序列化類型借浊,key.serializer必須被設(shè)置為一個實現(xiàn)了org.apache.kafka.common.serialization.Serializer接口的類,生產(chǎn)者會使用這個類把鍵對象序列化成字節(jié)數(shù)組萝招。Kafka客戶端默認提供了ByteArraySerializer蚂斤、StringSerializer和IntegerSerializer。需要注意槐沼,即便我們只發(fā)送value曙蒸,也需要設(shè)置key.serializer。
  • value.serializer:同key.serializer岗钩,value.serializer指定的類會將值序列化逸爵。

創(chuàng)建生產(chǎn)者對象:

Properties kafkaProps = new Properties();
kafkaProps.put(“bootstrap.servers”,”broker1:9092,broker2:9092”);
kafkaProps.put(“key.serializer”,”org.apache.kafka.common.serialization.StringSerializer”);
kafkaProps.put(“value.serializer”,”org.apache.kafka.common.serialization.StringSerializer”);
Producer<String,String> producer = new KafkaProducer<>(kafkaProps);

發(fā)送消息:

ProducerRecord<String,String> record = new ProducerRecord<>("my-demo-topic”,”message-key”,”message-value")
producer.send(record);

消息發(fā)送方式

Kafka生產(chǎn)者發(fā)送消息有兩種方式:同步發(fā)送和異步發(fā)送。

同步發(fā)送

producer.send()方法會返回一個Future對象凹嘲,如果我們調(diào)用Future對象的get()方法,則生產(chǎn)者會等待Kafka的相應构韵。如果服務器返回錯誤周蹭,get()方法會拋出異常。如果沒有發(fā)生錯誤疲恢,則會得到一個RecordMetaData對象凶朗。

Future<RecordMetaData> future = producer.send(record);//返回future對象
try {
    RecordMetadata recordMetadata = future.get();//同步阻塞等待返回結(jié)果
    System.out.println("topic:" + recordMetadata.topic() + ",partition:" + recordMetadata.partition() + "offset:" + recordMetadata.offset());//獲取寫入結(jié)果元數(shù)據(jù)信息
}catch (Exception e) {
    e.printStackTrace();
}

異步發(fā)送

同步發(fā)送結(jié)果寫入效率非常低,并且很多時候我們不需要等待RecordMetaData显拳,這時候我們就可以采用異步發(fā)送了棚愤。這時候我們只需要不調(diào)用get()方法即可實現(xiàn)異步發(fā)送消息。對于異步發(fā)送杂数,我們有可能需要知道當消息發(fā)送失敗時宛畦,我們能夠感知到。為了在異步發(fā)送消息的同時能夠?qū)Ξ惓O⑦M行處理揍移,生產(chǎn)者提供了回調(diào)支持次和。

class ProducerCallBack implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null) {
            //出現(xiàn)異常踏施,進行處理
            e.printStackTrace();
        }
    }
}
producer.send(record日熬,new ProducerCallBack());

我們的回調(diào)類需要實現(xiàn)org.apache.kafka.clients.producer.CallBack接口碍遍,這個接口只有一個onCompletion方法怕敬。當Kafka返回一個錯誤信息時虽填,omComplete方法中的異常對象會為一個非空值。

錯誤類型

KakfaProducer一般會發(fā)生兩類錯誤恶守。其中一類是可重試錯誤飒赃,這類錯誤可以通過重發(fā)消息來解決,比如連接錯誤肋联、無主(no leader)錯誤等。KafkaProducer可以配置成自動重試娩贷,如果多次重試之后的結(jié)果仍然無法解決突倍,則應用程序會收到一個重試異常。另一類是無法通過重試能夠解決的異常盆昙,比如消息太大赘方,對于這類異常Kafka不會進行重試,而是直接拋出異常弱左。
需要注意的是,我們最好在producer.send()方法周圍使用try-catch來捕捉一下異常炕淮。因為在消息發(fā)送之前拆火,生產(chǎn)者還有可能發(fā)生其它異常,比如SerializationException(序列化失敗)涂圆、BufferExhaustedException或TimeoutException(緩沖區(qū)已滿)们镜、又或者InterruptException(發(fā)送線程被終端)等。

序列化器

Kafka自帶了字符串润歉、整數(shù)和字節(jié)數(shù)組序列化器模狭,但是還不能滿足大部分場景。如果發(fā)送到Kafka的對象不是簡單的字符串或整數(shù)踩衩,那么可以使用序列化框架來創(chuàng)建消息記錄嚼鹉,比如Avro、Thrift驱富、Protobuf锚赤,或者自定義序列器。

自定義序列化器

為了對Kafka序列化器有一個深入的了解褐鸥,我們可以自定義一個序列化器线脚,但是生成環(huán)境中,不建議使用自定義序列化器,除非場景非常特殊浑侥。
首先創(chuàng)建一個數(shù)據(jù)bean:

public class Customer {
    private int customerId;
    private String customerName;
  
    public int getCustomerId() {
        return customerId;
    }
    public String getCustomerName() {
        return customerName;
    }
    public void setCustomerId(int customerId) {
        this.customerId = customerId;
    }
    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }
}

創(chuàng)建序列化器姊舵,實現(xiàn)org.apache.kafka.common.serialization.Serializer接口:

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String s, Customer customer) {
        byte[] serializedName;
        if(customer == null) {
            return null;
        }else {
            try {
                if(customer.getCustomerName() != null) {
                    serializedName = customer.getCustomerName().getBytes("UTF-8");
                } else {
                    serializedName = new byte[0];
                }
                //前四個字節(jié)存儲id,后四個字節(jié)存儲name長度寓落,最后存儲name
                ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + serializedName.length);
                buffer.putInt(customer.getCustomerId());
                buffer.putInt(serializedName.length);
                buffer.put(serializedName);
                return buffer.array();
            }catch (IOException ioe) {
                throw new SerializationException("serializer error!");
            }
        }
    }
    @Override
    public void close() {

    }
}

接下來就可以直接使用這個序列化器了括丁,首先將value.serializer設(shè)置為該類,然后就能使用ProducerRecord<String,Customer>了零如。
自定義序列化器有許多問題躏将,比如版本變遷,需要考慮版本兼容考蕾,多團隊寫kafka需要使用相同的序列化器等等祸憋。所以還是推薦已經(jīng)程序的序列化框架,比如Protobuf肖卧、Avro蚯窥、JSON或Thrift等。
之后可以學習整理一下在Kafka中使用第三方序列化框架塞帐。

分區(qū)器

Kafka消息是一個個鍵值對拦赠,但是鍵可以為null。鍵的用途主要有兩個:作為消息的附加信息和用來分區(qū)葵姥。
如果消息鍵為null荷鼠,即new ProducerRedcord<>(“topic”,”value")不設(shè)置key值,并且使用默認分區(qū)榔幸,那么所有消息會被隨機發(fā)送到主題的各個可用分區(qū)上允乐。分區(qū)器使用輪詢算法將消息均衡地分布到各個可用分區(qū)上。
如果消息鍵不為null削咆,并且使用了默認分區(qū)器牍疏,那么Kafka會使用自己的散列算法對鍵進行散列,然后根據(jù)散列值將消息映射到特定的分區(qū)上(所有相同的鍵都會映射到同一個分區(qū)上)拨齐。這里需要注意鳞陨,這時候散列映射是對topic的所有分區(qū),而不僅僅是可用的分區(qū)瞻惋,如果這時候有分區(qū)不可用厦滤,就可能會發(fā)生錯誤。還需要知道歼狼,如果主題的分區(qū)數(shù)發(fā)生了改變馁害,根據(jù)鍵的散列映射就可能發(fā)生變化。

自定義分區(qū)器

如果我們不想使用默認的分區(qū)器蹂匹,也可以實現(xiàn)自己的分區(qū)器碘菜。

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        
        List<PartitionInfo> partitions = cluster.partitionsForTopic("testTopic");
        int numPartitions = partitions.size();
        //通過key的長度對分區(qū)數(shù)取模
        if(keyBytes == null || !(key instanceof String))
            throw new InvalidRecordException("key is null or not is string!");
        String keyStr = String.valueOf(key);
        return keyStr.length() % (numPartitions - 1);
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> map) {
    }
}
//通過producer參數(shù)設(shè)置分區(qū)類
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.learn.tc.kafka.demo.CustomPartitioner");

生產(chǎn)者配置

生產(chǎn)者除了上面三個必要配置參數(shù),還有許多可配置的參數(shù),它們大部分都有合理的默認值忍啸,所以不需要修改它們仰坦。但是有一些參數(shù)在內(nèi)存使用、性能和可靠性方面對生產(chǎn)者影響比較大计雌。下面來看一下:

配置項 類型 默認值 取值范圍 描述
acks string 1 [all,-1,0,1] 需要多少個分區(qū)副本接收到消息悄晃,才認為寫入成功。acks=0:生產(chǎn)者不會等待服務器響應凿滤,如果消息丟失妈橄,生產(chǎn)者也無感知。因為無需等待相應翁脆,所以能夠以網(wǎng)絡(luò)支持的最大速度發(fā)送消息眷蚓,從而達到很高的吞吐量。acks=1:集群leader接受到消息后反番,生產(chǎn)者就會接受到來自服務器的成功相應沙热。如果leader接受到消息后掛掉了,還沒有來得及向其它節(jié)點發(fā)送消息罢缸,則可能出現(xiàn)消息丟失的問題篙贸。acks=all:所有參與服務的節(jié)點都接受到消息時,生產(chǎn)者才會收到寫入成功的相應枫疆。這是最安全的模式爵川,不過它的延遲也相對高一些。該設(shè)置同acks=-1息楔。
buffer.memory long 33554432(32M) [0,…] 生產(chǎn)者內(nèi)存緩沖區(qū)大小寝贡,生產(chǎn)者使用它緩存要發(fā)送到服務器的消息。如果應用程序發(fā)送消息的速度超過發(fā)送到服務器的速度钞螟,會導致生產(chǎn)者空間不足。send()方法會阻塞到max.block.ms參數(shù)所配置時間谎碍,如果超過這個時間鳞滨,則會拋出異常。
max.block.ms long 60000 [0,...] 阻塞等待最長時間蟆淀,比如緩沖區(qū)已滿拯啦,或者沒有可用元數(shù)據(jù)時,這些方法就會阻塞。在阻塞達到max.block.ms之后就會拋出超時異常。
compression.type string none [none,gzip,snappy,lz4] 生產(chǎn)者發(fā)送到服務器的消息是否進行壓縮究珊,默認是不壓縮的楚里。可以選擇gzip(客觀的壓縮比)称近、snappy(節(jié)省CPU)或lz4撇贺√酥可以降低網(wǎng)絡(luò)傳輸開銷和存儲開銷兵迅。
retries int 0 [0,…,2147483647] 當生產(chǎn)者接收到錯誤是臨時性錯誤時抢韭,可以通過retries參數(shù)設(shè)置重發(fā)消息的次數(shù)。重試時間間隔由retry.backoff.ms參數(shù)指定恍箭,默認為100ms(這個時間最好測試一下節(jié)點恢復時間刻恭,總嘗試次數(shù)時間大約節(jié)點恢復時間)。
batch.size int 16384(16k) [0,...] 當多個消息發(fā)送到同一個分區(qū)時扯夭,生產(chǎn)者會把他們放到同一個批次里鳍贾。該參數(shù)指定了一個批次可以使用的內(nèi)存大小。當被填滿后交洗,這個批次消息就會被發(fā)送出去(不一定等填滿才發(fā)送骑科,達到了linger.ms上限)。如果設(shè)置為0藕筋,則每條消息都會立即發(fā)送纵散。它的大小需要你在內(nèi)存、吞吐量和延遲上做出一個平衡隐圾。
linger.ms long 0 [0,...] 指定生產(chǎn)者在發(fā)送批次之前等待多長時間伍掀,KakfaProducer會在批次填滿或linger.ms達到上限時把批次消息發(fā)送出去。默認為0暇藏,即只要有線程可用蜜笤,就會將批次消息發(fā)送出去。
client.id string "" 任意字符串盐碱,服務器會用它標識消息來源把兔,還可用在日志和配額指標里。
max.in.flight.requests.per.connection int 5 [1,...] 指定生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息瓮顽。值越高县好,占用內(nèi)存越高,并且可能導致重排序風險(失敗重試導致)暖混,但是可以得到很高的吞吐量缕贡。
timeout.ms int 30000 [0,...] 指定了broker等待副本同步返回確認消息的時間,與acks的配置相匹配(設(shè)置為大于1)拣播。
request.timeout.ms int 30000 [0,...] 生產(chǎn)者發(fā)送數(shù)據(jù)時等待服務器返回相應時間晾咪。
max.request.size int 1048576(1M) [0,...] 控制生產(chǎn)者發(fā)送數(shù)據(jù)的大小,但這個是指生產(chǎn)者向服務器發(fā)送的批次的大小贮配,即該批次下的消息總大小谍倦。(感覺和batch.size在使用上有點相似)
receive.buffer.bytes int 32768(32k) [-1,...] 指定TCP socket接受數(shù)據(jù)包的緩沖區(qū)大小,如果被設(shè)置成-1泪勒,就是用操作系統(tǒng)的默認值昼蛀。當生產(chǎn)者或消費者不在同一個數(shù)據(jù)中心的時候宴猾,可以適當調(diào)大,因為跨數(shù)據(jù)中心曹洽,網(wǎng)絡(luò)傳輸有較高延遲和比較高低的延遲帶寬鳍置。

Kafka消費者

消費者和消費者群組

消費Kafka中的數(shù)據(jù),Kafka提供了消費者和消費者群組的概念送淆。Kafka消費者從屬于消費者群組税产,一個群組里的消費者訂閱的是相同主題,也就是主題訂閱是消費者群組級別的訂閱偷崩,而不是單個消費者訂閱辟拷。消費者群組中的每個消費者都會接受訂閱主題的部分分區(qū)的數(shù)據(jù),也就是說同一個分區(qū)中的消息只會被消費者群組中的一個消費者所消費阐斜。

為什么要使用消費者群組的概念

消費者群組是為了橫向伸縮消費者處理能力主要方式衫冻,當一個消費者的處理能力跟不上生產(chǎn)者的生產(chǎn)速度時,這時候就需要增加消費者谒出,從同一個主題中讀取消息隅俘,對消息進行分流,從而擴展了數(shù)據(jù)處理能力笤喳。

消費者如何對主題消息進行分流

下面是一組主題T1和對應消費者群組的映射圖:
當消費者群組中只有一個消費者時候为居,這個消費者會接受主題內(nèi)所有分區(qū)的數(shù)據(jù):


一個消費者

當消費者群組有兩個消費者時候,這個兩個消費者會平分主題內(nèi)的所有分區(qū):


兩個消費者均分

當消費者個數(shù)與分區(qū)數(shù)相同時杀狡,每個消費者都會消費特定分區(qū)消息蒙畴,即分區(qū)與消費者進行了一一映射:


一一映射

當消費者個數(shù)超過了分區(qū)數(shù)時,這時候多于的消費者就會閑置:


多于消費者閑置

從中可以看到呜象,消費者群組中有效的消費者個數(shù)膳凝,是由消費主題決定的。往消費者群組里面增加消費者是橫向伸縮消費能力的主要方式恭陡,由于Kafka消費者經(jīng)常做一些高延遲的操作蹬音,這種情況下單個消費者無法跟上數(shù)據(jù)的生產(chǎn)速度,所以需要添加更多的消費者休玩。這時候也就需要為主題創(chuàng)建大量的分區(qū)著淆,在負載增長時可以加入更多的消費者。
Kafka還支持多個應用程序消費相同的主題哥捕,在這種情況下牧抽,每個應用程序都會獲取到主題的所有消息嘉熊,而不是部分消息遥赚。這時候只要保證每個應用程序有自己的消費者群組即可。

分區(qū)再均衡

當群組中新增消費者或移除消費者時阐肤,會觸發(fā)分區(qū)重新分配凫佛,新增的消費者消費的分區(qū)原本是由其它分區(qū)消費的讲坎,而移除的消費者,原本由它處理分區(qū)會分配給其它消費者愧薛。除了新增或移除消費者外晨炕,新增分區(qū)也會導致再分區(qū)。
分區(qū)的所有權(quán)發(fā)生變更毫炉,這種行為稱為再均衡瓮栗。再均衡非常重要,因為它為消費者群組帶來了高可用和伸縮型瞄勾。但是再均衡會導致消費者群組一小段的時間不可用费奸,并且當一個分區(qū)重新分配給一個消費者時,消費者的當前狀態(tài)會丟失进陡,它有可能還需要去刷新緩存愿阐,從而拖慢整個應用程序。所以我們應該盡量避免再均衡的問題趾疚。

Kafka集群如何感知消費者不可用

消費者是通過向群組協(xié)調(diào)器(Kafka為每個消費者群組都指定了一個broker)來維持它和群組的關(guān)系以及它對分區(qū)的所有權(quán)關(guān)系缨历,只要消費者在規(guī)定時間內(nèi)向發(fā)送心跳,就會被認為活躍的糙麦,說明它還在讀取分區(qū)中的消息辛孵。消費者是利用輪詢消息或提交偏移量時發(fā)送心跳的,如果在規(guī)定時間間隔內(nèi)沒有發(fā)送心跳喳资,群組協(xié)調(diào)器就會認為該消費者已經(jīng)死亡觉吭,就會觸發(fā)一次再均衡。
需要注意仆邓,在0.10.1之后的版本里鲜滩,Kafka引入了一個獨立的心跳線程,可以在輪詢消息的空擋時間發(fā)送心跳节值。這樣發(fā)送心跳的頻率與消息輪詢的頻率就進行了解耦徙硅。

創(chuàng)建Kafka消費者

創(chuàng)建Kakfa消費者需要創(chuàng)建KafkaConsumer對象,KafkaConsumer和KakfaProducer一樣搞疗,也有一些必要屬性嗓蘑。

  • bootstrap.server:指定了Kakfa集群的連接字符串,用途和KakfaProducer一樣匿乃。需要注意桩皿,在0.8.2版本中這個參數(shù)是zookeeper.connect,即Kafka集群所使用的zk集群地址幢炸。
  • key.deserializer:與KafkaProducer的key.serializer一樣泄隔,只不過是負責將key由字節(jié)轉(zhuǎn)為java對象。
  • value:deserializer:與KakfaProducer的value.serializer一樣宛徊,只不過是負責將value由字節(jié)轉(zhuǎn)為java對象佛嬉。
  • group.id:不是必須的字段逻澳,但是一般都會設(shè)置,這個是該消費者所屬的消費者群組暖呕。
Properties props = new Properties();
props.put("bootstrap.servers","192.168.0.1:9092");
props.put("group.id","test");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);

創(chuàng)建完KafkaConsumer之后斜做,接下來就需要訂閱消費的主題了,它支持訂閱一個主題列表湾揽,同時也支持正則表表達式瓤逼,當有滿足正則表達式的主題被創(chuàng)建后,消費者群組就會觸發(fā)一次再均衡库物,來消費新的主題抛姑。

consumer.subscribe(Arrays.asList(“my-demo-topic”));

接下來就是輪詢消息了,因為消費者是一個常駐任務艳狐,所以將其放在一個無限循環(huán)中定硝。

try{
  while (true) {
      ConsumerRecords<String,String> records = consumer.poll(100);
      for(ConsumerRecord<String,String> record : records) {
          System.out.printf("offset = %d ,key = %s, value = %s%n",record.offset(),record.key(),record.value());
      }
  }
}finally{
    consumer.close();
}

poll()方法就是對Kafka進行輪詢(心跳發(fā)送也在這時候),poll()方法中是一個超時參數(shù)毫目,當消費者緩沖區(qū)里面沒有可用數(shù)據(jù)時蔬啡,會發(fā)生阻塞,當?shù)竭_超時時間后镀虐,無論緩沖區(qū)有沒有數(shù)據(jù)都會返回箱蟆。如果該參數(shù)設(shè)為0,pll()方法會立即返回刮便,否則它會在超時時間內(nèi)一直等待broker返回數(shù)據(jù)空猜。
poll()方法會返回一個ConsumerRecords列表,每個ConsumerRecords代表一條記錄恨旱,它記錄了主題信息辈毯、分區(qū)、偏移量搜贤,以及對應的鍵值對谆沃。
最后在退出應用程序之前,調(diào)用close方法關(guān)閉消費者仪芒。網(wǎng)絡(luò)連接和socket也會隨之關(guān)閉唁影,并且立觸發(fā)發(fā)一次再均衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)死亡掂名,從而節(jié)省了再分配的時間据沈。

消費者配置

配置項 類型 默認值 取值范圍 描述
fetch.min.bytes int 1 [0,...] 指定了消費者從服務器獲取記錄的最小字節(jié)數(shù),如果broker可用數(shù)據(jù)小于該值饺蔑,那么broker會等到有足夠數(shù)據(jù)時才會返回锌介。能夠降低消費者和broker的負載。
fetch.max.wait.ms int 500 [0,…] 指定了有足夠數(shù)據(jù)時在返回消費者膀钠,而fetch.max.wait.ms則指定了等待時間掏湾。當kafka接收到請求后,或者達到feth.min.bytes指定大小返回肿嘲,或者達到fetch.max.wait.ms指定的時間返回融击,看哪個先滿足。
max.partition.fetch.bytes int 1048976(1M) [0,...] 指定了服務器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)雳窟,這個配置應該比broker能夠接受的最大消息大尊浪,否則消費者可能無法讀取這些大消息,導致消費之一直掛起重試封救。
session.timeout.ms int 10000 指定了消費者被認為死亡之前拇涤,能夠與服務器斷開連接的時間。即在超過該參數(shù)指定的時間外誉结,沒有發(fā)送心跳鹅士,就會被認為死亡,觸發(fā)再均衡惩坑。該參數(shù)與heartbeat.interval.ms相關(guān)掉盅,因為它指定了發(fā)送心跳的時間間隔。
heartbeat.interval.ms int 3000 用途如上所說以舒,控制發(fā)送心跳的時間間隔趾痘。一般為session.timeout.ms的1/3÷樱可以將它們設(shè)置的低一些永票,以便盡快觸發(fā)再均衡。
auto.offset.rest string latest [latest,earliest,none] 指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效(消費者長時間失效滥沫,偏移量被刪除)時侣集,該從哪里讀取。默認為latest兰绣,即最新記錄開始讀取肚吏。earliest為從起始位置讀取。none為當找不到偏移量時拋出異常狭魂。
enable.auto.commit boolean true 指定了消費者是否自動提交偏移量罚攀。為了避免數(shù)據(jù)丟失或出現(xiàn)重復讀取數(shù)據(jù),可以改為false雌澄,由自己控制提交偏移量斋泄。
max.poll.records int 500 指定了單次調(diào)用poll()方法所能夠返回最大數(shù)據(jù)量。

提交偏移量

Kafka不同其它的消息系統(tǒng)需要等待消費者確認才認為該消息被處理了镐牺,Kafka是采用偏移量來幫助消費者追蹤分區(qū)內(nèi)消息被處理的位置炫掐,消費者更新當前位置的操作在kafka中稱為提交。
消費者會向一個叫做_consumer_offset的特殊topic中發(fā)送消息睬涧,消息包含了每個分區(qū)的偏移量募胃。如果消費者處于一直運行狀態(tài)旗唁,這個偏移量是沒有用的,但是當消費者崩潰或者有新的消費者加入消費者群組是痹束,就會觸發(fā)再均衡检疫。這時候每個消費者可能分配到新的分區(qū),為了能夠繼續(xù)之前的工作祷嘶,消費者需要讀取每個分區(qū)的偏移量屎媳,然后從最后提交偏移量的位置繼續(xù)工作。
當發(fā)生再均衡的時候论巍,提交偏移量方式會對客戶端影響很大烛谊,會導致重復消費消息或?qū)е孪G失。
如果最后提交偏移量的位置小于客戶端最后處理消息的位置嘉汰,那么會導致兩個偏移量之間消息被重復處理丹禀。


重復消費

如果最后提交偏移量的位置大于客戶端最后處理消息的位置,那么會導致兩個偏移量之間消息丟失鞋怀。


消息丟失

針對客戶端不同的需求湃崩,Kafka的Consumer提供了多種提交偏移量的API。

自動提交

Kafka提供了自動提交偏移量的方式接箫,這也是最簡單的方式攒读。通過將enable.auto.commit(默認為true,老版本叫auto.commit.enable)設(shè)置為true辛友,然后每隔auto.commit.interval.ms(默認為5s)會自動將從poll()方法接收到的最大偏移量提交上去薄扁。這種提交方式,也和消費者的其它東西一樣废累,自動提交也是在輪詢過程中完成的邓梅。每次輪詢自動提交上去的偏移量,都是上一次poll方法返回的最大偏移量邑滨,它并不知道哪些消息真正被處理了日缨,所以在每次調(diào)用之前,最好確保當前poll()方法返回的消息都被處理了掖看,消費者也會在調(diào)用close()方法之前進行自動提交匣距。
需要注意使用自動提交這種方式,一般不會有什么問題哎壳,不過在產(chǎn)生異骋愦或者提前退出輪詢時,就會產(chǎn)生我們上面說的兩種情況归榕,可能導致消息重復消費或消息丟失尸红。如果是對消息處理嚴格的業(yè)務,建議不要使用這種方式。
我們可以通過縮小提交偏移量的時間來消除消息丟失的可能性外里,并在再均衡時減少重復消息的數(shù)量怎爵。

手動提交

消費者還提供了另外一種提交偏移量的方式,就是把enable.auto.commit設(shè)置為false盅蝗,然后通過commitSync()方法提交偏移量鳖链。commitSync()方法是最可靠也是最簡單的方式,它會把poll()方法返回的最新偏移量提交上去风科,如果提交成功立馬返回,如果提交失敗則會拋出異常乞旦。
需要注意的是commitSync()方法一定要在poll()方法返回的消息處理完成后在提交贼穆,否則還會有消息丟失的風險的。commitSync()能夠有效避免消息丟失情況兰粉,但是還會存在消息重復處理的問題的故痊,當發(fā)生再均衡時,從最近一批消息到發(fā)生再均衡之前的消息會被重復處理玖姑。

while (true) {
    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord<String,String> record : records) {
            //處理邏輯
        }
    try {
        consumer.commitSync();  //處理完本批次消息在提交偏移量
    }catch (Exception e) {
        LOG.error("commit offset failed",e);
   }    
}

當使用commitSync()提交偏移量發(fā)生異常時愕秫,它會進行重試提交,直到提交成功焰络。
通過commitSync()的方法名稱就能直到戴甩,這是一個同步提交偏移量的方法,在結(jié)果返回之前闪彼,它會一直阻塞甜孤,這樣會限制應用程序的吞吐量。雖然我們可以通過降低提交頻率來提升吞吐量畏腕,但是當發(fā)生再均衡時缴川,會導致增大重復處理消息數(shù)量。

異步提交

Kafka提供了commitAsync()方法來提交偏移量描馅,commitAsync()在提交完偏移量后立馬返回把夸,并且比如偏移量提交失敗,它也不會進行重復提交铭污,因為下一批次更大的偏移量可能已經(jīng)提交了恋日,重復提交會導致覆蓋更大的偏移量。不過commitAsync()方法提供回調(diào)嘹狞,當broker做出相應時執(zhí)行回調(diào)谚鄙。回調(diào)一般用來記錄異常和度量指標刁绒,如果我們重新提交偏移量闷营,一定要注意提交順序,因為有可能更大偏移量已經(jīng)提交了,一般可以通過在回調(diào)加一個遞增的序列號來維護異步提交的順序傻盟。

while (true) {
    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
    for(ConsumerRecord<String,String> record : records) {
        //處理邏輯
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e != null) {
                LOG.error("Commit offset fail.",e);
            }
        }
    });
}

同步和異步組合提交

一般偶爾出現(xiàn)提交失敗速蕊,不進行重試影響不大,因為提交失敗時臨時問題導致的娘赴,那么后續(xù)總有提交成功的時候规哲。但是如果這時關(guān)閉消費者或者再均衡最后一次的提交,就需要確保提交成功诽表。

try {
    while (true) {
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord<String,String> record : records) {
            //處理邏輯
        }
        consumer.commitAsync();
    }
}catch (Exception e) {
    e.printStackTrace();
}finally {
    try {
        consumer.commitSync();
    }catch (Exception e) {
        e.printStackTrace();
    }
}

一般時候使用異步提交唉锌,來保證系統(tǒng)的吞吐量。如果直接關(guān)閉消費者竿奏,就需要使用commitSync()方法確保一定提交成功袄简。

提交指定位置的偏移量

commitSync()和commitAsync()方法每次只會提交poll()下來消息的最大偏移量,有時候我們poll()下來大批量數(shù)據(jù)泛啸,為了避免再均衡引起的大量重復處理消息绿语,可能想在中途過程中就提交偏移量。Kafka針對這種情況候址,允許我們在調(diào)用commitAsync()和commitSync()時傳入希望提交的分區(qū)和對應的偏移量吕粹。

Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {
    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
    for(ConsumerRecord<String,String> record : records) {
        //處理邏輯
        currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset() + 1));
        if(count % 100000 == 0) {
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if(e != null) {
                        LOG.error("commit offset fail!");
                    }
                }
            });
        }
    }
}

之所以使用Map存儲分區(qū)和offset,是因為一個消費者可能訂閱了多個topic岗仑。

再均衡監(jiān)聽器

當發(fā)生再均衡時匹耕,我們可能有一些事情需要做,比如提交最后的偏移量荠雕。Kafka為我們提供了再均衡監(jiān)聽器泌神,當發(fā)生再均衡時會調(diào)用監(jiān)聽器中的相應方法。再均衡監(jiān)聽器是在consumer訂閱主題時傳遞進去的舞虱,你需要實現(xiàn)ConsumerRebalanceListener接口欢际。

class RebalanceHandler implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        //再均衡開始之前和消費者停止讀取消息之后調(diào)用
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        //再均衡分區(qū)重新分配之后和消費者開始消費之前調(diào)用
    }
}

使用再均衡監(jiān)聽器來提交偏移量。

class RebalanceHandler implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            //再均衡開始之前和消費者停止讀取消息之后調(diào)用
            consumer.commitSync(currentOffsets);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            //再均衡分區(qū)重新分配之后和消費者開始消費之前調(diào)用
        }
    }
    
    consumer.subscribe(Arrays.asList("demo-topic"),new RebalanceHandler());
   
    int count = 0;
    while (true) {
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord<String,String> record : records) {
            //處理邏輯
            currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset() + 1));
            if(count % 100000 == 0) {
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                        if(e != null) {
                            LOG.error("commit offset fail!");
                        }
                    }
                });
            }
        }
    }

}

從特定偏移量處理記錄

Kafka提供從指定位置讀取消息的API以便我們使用矾兜,如果想要從開始分區(qū)的起始位置開始讀取损趋,或者直接跳到分區(qū)的末尾開始讀取消息,可以使用seekToBeginning(Collection<TopicPartition> tp)方法和seekToEnd(Collection<TopicPartition> tp)方法椅寺。

consumer.seekToBeginning(Arrays.asList(new TopicPartition("demo-topic",0))); //指定topic和分區(qū)
consumer.seekToEnd(Arrays.asList(new TopicPartition("demo-topic",1)));

Kafka也為我們提供了查找特定偏移量的API浑槽,它有許多用途,比如向前或向后偏移幾個記錄等等返帕。seek()方法能夠用來指定特定的偏移量桐玻,我們只需要指定topic、分區(qū)以及具體的偏移量即可荆萤。

consumer.seek(new TopicPartition(“demo-topic”,3),1500); //主題demo-topic的分區(qū)3镊靴,從偏移量1500開始讀取

使用seek()方法指定偏移量讀取铣卡,一般是將偏移量存儲到Kafka之外了。因為Kafka的提交偏移量無論怎么優(yōu)化都有可能導致重讀消息的風險(再均衡監(jiān)聽器只能對正常服務的消費者起作用偏竟,如果消費者掛掉后煮落,是觸發(fā)不到的),因為消費數(shù)據(jù)和提交偏移量本身不是一個原子操作踊谋。這時候我們可以能借助外部存儲系統(tǒng)蝉仇,比如我們將從Kafka處理之后的數(shù)據(jù)寫到Mysql,然后也將偏移量寫到Mysql中殖蚕,然后通過事務將二者關(guān)聯(lián)起來轿衔,這樣要么都處理成功,要么都處理失敗睦疫。


    class RebalanceHandler implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            //即將失去分區(qū)前提交事務,確保記錄成功保存
            //commitDBTransaction();
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            //從數(shù)據(jù)庫獲取偏移量害驹,再分配到新分區(qū)后,使用seek()定位
            for(TopicPartition partition : partitions){
                //consumer.seek(partition,getOffsetFromDB(partition));
            }
        }
    }

    consumer.subscribe(Arrays.asList("demo-topic"),new RebalanceHandler());
    //調(diào)用一次pool()方法笼痛,目的是讓消費者加入到消費者群組裙秋,并獲取到分區(qū)琅拌,然后調(diào)用seek()方法定位到分區(qū)偏移量
    consumer.poll(Duration.ofMillis(0)); 
    
    for (TopicPartition partition : consumer.assignment()) {
        //consumer.seek(topic,getOffsetFromDB(partition)); //獲取每個主題分區(qū)的維值
    }
    
    while (true) {
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord<String,String> record : records) {
            //processRecord(record);//處理邏輯
            //storeRecord(record);//保存處理記錄
            //storeOffsetInDB(record.topic(),record.partition(),record.offset());//保存偏移量
        }
        //commitDBTransaction();//提交事務
    }
}

消費者優(yōu)雅退出

Kafka提供了consumer.weakup()方法通過拋出異常的方式來停止消費者缨伊,weakup()是kafka提供的唯一一個可以從其它線程里安全調(diào)用的方法。調(diào)用consumer.weakup()可以退出poll()进宝,并且拋出WeakupException異常刻坊,我們不需要處理這種異常,因為它只是跳出循環(huán)的一種方式党晋。還需注意谭胚,在退出線程之前,還需要調(diào)用consumer.close()方法未玻,它會提交任何還沒有提交的任務灾而,并向群組協(xié)調(diào)器發(fā)送消息,告知離開扳剿,立即觸發(fā)再均衡旁趟,而無需等待會話超時。

//main shuthook()
public void  shuthook(Consumer consumer,Thread mainThread) {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        consumer.wakeup();
        try {
            mainThread.join(); //主線程等待
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }));
}
//
try{
    while(true) {
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord<String,String> record : records) {
         //處理邏輯
        }
        consumer.commitAsync();
    }
}catch(WakeupException e) {
//無需處理
}finally {
consumer.close()
}

自定義反序列化器

生產(chǎn)者中我們說了如何定義序列化器庇绽,消費者中的反序列化器定義和序列化器定義一樣锡搜,只不過是實現(xiàn)Deserializer接口,其它使用方式都和生產(chǎn)者一樣瞧掺。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末耕餐,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子辟狈,更是在濱河造成了極大的恐慌肠缔,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異桩砰,居然都是意外死亡拓春,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門亚隅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來硼莽,“玉大人,你說我怎么就攤上這事煮纵《遥” “怎么了?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵行疏,是天一觀的道長匆光。 經(jīng)常有香客問我,道長酿联,這世上最難降的妖魔是什么终息? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮贞让,結(jié)果婚禮上周崭,老公的妹妹穿的比我還像新娘。我一直安慰自己喳张,他們只是感情好续镇,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著销部,像睡著了一般摸航。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上舅桩,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天酱虎,我揣著相機與錄音,去河邊找鬼擂涛。 笑死读串,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的歼指。 我是一名探鬼主播爹土,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼踩身!你這毒婦竟也來了胀茵?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤挟阻,失蹤者是張志新(化名)和其女友劉穎琼娘,沒想到半個月后峭弟,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡脱拼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年瞒瘸,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片熄浓。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡情臭,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出赌蔑,到底是詐尸還是另有隱情俯在,我是刑警寧澤,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布跷乐,位于F島的核電站趾浅,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏皿哨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一往史、第九天 我趴在偏房一處隱蔽的房頂上張望佛舱。 院中可真熱鬧,春花似錦请祖、人聲如沸务热。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽眼虱。三九已至,卻和暖如春席纽,著一層夾襖步出監(jiān)牢的瞬間捏悬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工润梯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留过牙,地道東北人甥厦。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像寇钉,于是被迫代替她去往敵國和親刀疙。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容