kafka客戶端
Kafka除了提供內(nèi)置Java客戶端外豪嚎,還提供了二進制連接協(xié)議仁烹,即向Kafka網(wǎng)絡(luò)端口發(fā)送適當?shù)淖止?jié)序列候齿,就可以實現(xiàn)從Kafka讀取消息或?qū)懭胂⑽们隆TS多語言實現(xiàn)了Kafka連接協(xié)議瞳筏,從而提供了許多非Java客戶端,比如Python牡昆、Go姚炕、C++等。
https://cwiki.apache.org/confluence/display/KAFKA/Clients
Kafka生產(chǎn)者
不同的使用場景對生產(chǎn)者API的使用和配置會有直接影響迁杨。比如信用卡處理系統(tǒng)钻心,不允許消息丟失、重復铅协,可以接受的寫入延遲最大為500ms捷沸,吞吐量需要每秒鐘能夠處理一百萬個消息。在比如用戶點擊事件存儲場景狐史,允許丟失少量消息或出現(xiàn)少量重復消息痒给,寫入延遲可以高一些说墨,吞吐量則取決于網(wǎng)站的使用頻度。
Kafka發(fā)送消息主要步驟:
首先我們會創(chuàng)建一個ProducerRecord對象苍柏,ProducerRecord中包含了目標topic和消息內(nèi)容尼斧,我們同時還可以指定鍵和分區(qū)。在發(fā)送ProducerRecord對象前试吁,生產(chǎn)者會把鍵和值數(shù)據(jù)進行序列化棺棵,以便在網(wǎng)絡(luò)中進行傳輸。
接下來熄捍,數(shù)據(jù)會發(fā)發(fā)送給分區(qū)器烛恤,如果我們制定了寫入分區(qū),則分區(qū)器不會做任何事情余耽。如果沒有指定分區(qū)缚柏,分區(qū)器會根據(jù)ProducerRecord對象的鍵選擇一個分區(qū)。選擇好分區(qū)后碟贾,生產(chǎn)者就知道該往哪個主題的哪個分區(qū)下發(fā)送這條記錄了币喧。緊接著,這條記錄會被添加到一個記錄批次里面袱耽,這個批次里面的消息會被發(fā)送到相同主題和分區(qū)上杀餐。生產(chǎn)者中會有一個獨立的線程,將批次記錄發(fā)送到相應的broker上扛邑。
broker在接收到這些消息后會返回一個響應信息怜浅,如果寫入成功,則會返回一個RecordMetaData對象蔬崩,它包含了主題和分區(qū)信息恶座,以及在分區(qū)的偏移量信息。如果寫入失敗沥阳,則會返回一個錯誤信息跨琳。生產(chǎn)者在接收到錯誤信息后,會進行重新發(fā)送桐罕,如果重新發(fā)送幾次之后仍然失敗脉让,則生產(chǎn)者返回錯誤信息。
創(chuàng)建Kafka生產(chǎn)者
在創(chuàng)建Kafka生產(chǎn)者對象時功炮,我們可以設(shè)置一些屬性溅潜。Kafka生產(chǎn)者有3個必選的屬性。
- bootstrap.servers:指定broker的地址清單薪伏,地址格式為:host1:port1,host2:port2滚澜。清單里不需要包含所有地址,因為生產(chǎn)者會從給定的broker中找到其它broker信息(仍然建議設(shè)置兩個以上嫁怀,防止其中一臺broker宕機)设捐。
- key.serializer:指定消息鍵采用的序列化類型借浊,key.serializer必須被設(shè)置為一個實現(xiàn)了org.apache.kafka.common.serialization.Serializer接口的類,生產(chǎn)者會使用這個類把鍵對象序列化成字節(jié)數(shù)組萝招。Kafka客戶端默認提供了ByteArraySerializer蚂斤、StringSerializer和IntegerSerializer。需要注意槐沼,即便我們只發(fā)送value曙蒸,也需要設(shè)置key.serializer。
- value.serializer:同key.serializer岗钩,value.serializer指定的類會將值序列化逸爵。
創(chuàng)建生產(chǎn)者對象:
Properties kafkaProps = new Properties();
kafkaProps.put(“bootstrap.servers”,”broker1:9092,broker2:9092”);
kafkaProps.put(“key.serializer”,”org.apache.kafka.common.serialization.StringSerializer”);
kafkaProps.put(“value.serializer”,”org.apache.kafka.common.serialization.StringSerializer”);
Producer<String,String> producer = new KafkaProducer<>(kafkaProps);
發(fā)送消息:
ProducerRecord<String,String> record = new ProducerRecord<>("my-demo-topic”,”message-key”,”message-value")
producer.send(record);
消息發(fā)送方式
Kafka生產(chǎn)者發(fā)送消息有兩種方式:同步發(fā)送和異步發(fā)送。
同步發(fā)送
producer.send()方法會返回一個Future對象凹嘲,如果我們調(diào)用Future對象的get()方法,則生產(chǎn)者會等待Kafka的相應构韵。如果服務器返回錯誤周蹭,get()方法會拋出異常。如果沒有發(fā)生錯誤疲恢,則會得到一個RecordMetaData對象凶朗。
Future<RecordMetaData> future = producer.send(record);//返回future對象
try {
RecordMetadata recordMetadata = future.get();//同步阻塞等待返回結(jié)果
System.out.println("topic:" + recordMetadata.topic() + ",partition:" + recordMetadata.partition() + "offset:" + recordMetadata.offset());//獲取寫入結(jié)果元數(shù)據(jù)信息
}catch (Exception e) {
e.printStackTrace();
}
異步發(fā)送
同步發(fā)送結(jié)果寫入效率非常低,并且很多時候我們不需要等待RecordMetaData显拳,這時候我們就可以采用異步發(fā)送了棚愤。這時候我們只需要不調(diào)用get()方法即可實現(xiàn)異步發(fā)送消息。對于異步發(fā)送杂数,我們有可能需要知道當消息發(fā)送失敗時宛畦,我們能夠感知到。為了在異步發(fā)送消息的同時能夠?qū)Ξ惓O⑦M行處理揍移,生產(chǎn)者提供了回調(diào)支持次和。
class ProducerCallBack implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null) {
//出現(xiàn)異常踏施,進行處理
e.printStackTrace();
}
}
}
producer.send(record日熬,new ProducerCallBack());
我們的回調(diào)類需要實現(xiàn)org.apache.kafka.clients.producer.CallBack
接口碍遍,這個接口只有一個onCompletion方法怕敬。當Kafka返回一個錯誤信息時虽填,omComplete方法中的異常對象會為一個非空值。
錯誤類型
KakfaProducer一般會發(fā)生兩類錯誤恶守。其中一類是可重試錯誤飒赃,這類錯誤可以通過重發(fā)消息來解決,比如連接錯誤肋联、無主(no leader)錯誤等。KafkaProducer可以配置成自動重試娩贷,如果多次重試之后的結(jié)果仍然無法解決突倍,則應用程序會收到一個重試異常。另一類是無法通過重試能夠解決的異常盆昙,比如消息太大赘方,對于這類異常Kafka不會進行重試,而是直接拋出異常弱左。
需要注意的是,我們最好在producer.send()方法周圍使用try-catch來捕捉一下異常炕淮。因為在消息發(fā)送之前拆火,生產(chǎn)者還有可能發(fā)生其它異常,比如SerializationException(序列化失敗)涂圆、BufferExhaustedException或TimeoutException(緩沖區(qū)已滿)们镜、又或者InterruptException(發(fā)送線程被終端)等。
序列化器
Kafka自帶了字符串润歉、整數(shù)和字節(jié)數(shù)組序列化器模狭,但是還不能滿足大部分場景。如果發(fā)送到Kafka的對象不是簡單的字符串或整數(shù)踩衩,那么可以使用序列化框架來創(chuàng)建消息記錄嚼鹉,比如Avro、Thrift驱富、Protobuf锚赤,或者自定義序列器。
自定義序列化器
為了對Kafka序列化器有一個深入的了解褐鸥,我們可以自定義一個序列化器线脚,但是生成環(huán)境中,不建議使用自定義序列化器,除非場景非常特殊浑侥。
首先創(chuàng)建一個數(shù)據(jù)bean:
public class Customer {
private int customerId;
private String customerName;
public int getCustomerId() {
return customerId;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerId(int customerId) {
this.customerId = customerId;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
}
創(chuàng)建序列化器姊舵,實現(xiàn)org.apache.kafka.common.serialization.Serializer
接口:
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, Customer customer) {
byte[] serializedName;
if(customer == null) {
return null;
}else {
try {
if(customer.getCustomerName() != null) {
serializedName = customer.getCustomerName().getBytes("UTF-8");
} else {
serializedName = new byte[0];
}
//前四個字節(jié)存儲id,后四個字節(jié)存儲name長度寓落,最后存儲name
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + serializedName.length);
buffer.putInt(customer.getCustomerId());
buffer.putInt(serializedName.length);
buffer.put(serializedName);
return buffer.array();
}catch (IOException ioe) {
throw new SerializationException("serializer error!");
}
}
}
@Override
public void close() {
}
}
接下來就可以直接使用這個序列化器了括丁,首先將value.serializer
設(shè)置為該類,然后就能使用ProducerRecord<String,Customer>
了零如。
自定義序列化器有許多問題躏将,比如版本變遷,需要考慮版本兼容考蕾,多團隊寫kafka需要使用相同的序列化器等等祸憋。所以還是推薦已經(jīng)程序的序列化框架,比如Protobuf肖卧、Avro蚯窥、JSON或Thrift等。
之后可以學習整理一下在Kafka中使用第三方序列化框架塞帐。
分區(qū)器
Kafka消息是一個個鍵值對拦赠,但是鍵可以為null。鍵的用途主要有兩個:作為消息的附加信息和用來分區(qū)葵姥。
如果消息鍵為null荷鼠,即new ProducerRedcord<>(“topic”,”value")
不設(shè)置key值,并且使用默認分區(qū)榔幸,那么所有消息會被隨機發(fā)送到主題的各個可用分區(qū)上允乐。分區(qū)器使用輪詢算法將消息均衡地分布到各個可用分區(qū)上。
如果消息鍵不為null削咆,并且使用了默認分區(qū)器牍疏,那么Kafka會使用自己的散列算法對鍵進行散列,然后根據(jù)散列值將消息映射到特定的分區(qū)上(所有相同的鍵都會映射到同一個分區(qū)上)拨齐。這里需要注意鳞陨,這時候散列映射是對topic的所有分區(qū),而不僅僅是可用的分區(qū)瞻惋,如果這時候有分區(qū)不可用厦滤,就可能會發(fā)生錯誤。還需要知道歼狼,如果主題的分區(qū)數(shù)發(fā)生了改變馁害,根據(jù)鍵的散列映射就可能發(fā)生變化。
自定義分區(qū)器
如果我們不想使用默認的分區(qū)器蹂匹,也可以實現(xiàn)自己的分區(qū)器碘菜。
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic("testTopic");
int numPartitions = partitions.size();
//通過key的長度對分區(qū)數(shù)取模
if(keyBytes == null || !(key instanceof String))
throw new InvalidRecordException("key is null or not is string!");
String keyStr = String.valueOf(key);
return keyStr.length() % (numPartitions - 1);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
//通過producer參數(shù)設(shè)置分區(qū)類
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.learn.tc.kafka.demo.CustomPartitioner");
生產(chǎn)者配置
生產(chǎn)者除了上面三個必要配置參數(shù),還有許多可配置的參數(shù),它們大部分都有合理的默認值忍啸,所以不需要修改它們仰坦。但是有一些參數(shù)在內(nèi)存使用、性能和可靠性方面對生產(chǎn)者影響比較大计雌。下面來看一下:
配置項 | 類型 | 默認值 | 取值范圍 | 描述 |
---|---|---|---|---|
acks | string | 1 | [all,-1,0,1] | 需要多少個分區(qū)副本接收到消息悄晃,才認為寫入成功。acks=0:生產(chǎn)者不會等待服務器響應凿滤,如果消息丟失妈橄,生產(chǎn)者也無感知。因為無需等待相應翁脆,所以能夠以網(wǎng)絡(luò)支持的最大速度發(fā)送消息眷蚓,從而達到很高的吞吐量。acks=1:集群leader接受到消息后反番,生產(chǎn)者就會接受到來自服務器的成功相應沙热。如果leader接受到消息后掛掉了,還沒有來得及向其它節(jié)點發(fā)送消息罢缸,則可能出現(xiàn)消息丟失的問題篙贸。acks=all:所有參與服務的節(jié)點都接受到消息時,生產(chǎn)者才會收到寫入成功的相應枫疆。這是最安全的模式爵川,不過它的延遲也相對高一些。該設(shè)置同acks=-1息楔。 |
buffer.memory | long | 33554432(32M) | [0,…] | 生產(chǎn)者內(nèi)存緩沖區(qū)大小寝贡,生產(chǎn)者使用它緩存要發(fā)送到服務器的消息。如果應用程序發(fā)送消息的速度超過發(fā)送到服務器的速度钞螟,會導致生產(chǎn)者空間不足。send()方法會阻塞到max.block.ms參數(shù)所配置時間谎碍,如果超過這個時間鳞滨,則會拋出異常。 |
max.block.ms | long | 60000 | [0,...] | 阻塞等待最長時間蟆淀,比如緩沖區(qū)已滿拯啦,或者沒有可用元數(shù)據(jù)時,這些方法就會阻塞。在阻塞達到max.block.ms之后就會拋出超時異常。 |
compression.type | string | none | [none,gzip,snappy,lz4] | 生產(chǎn)者發(fā)送到服務器的消息是否進行壓縮究珊,默認是不壓縮的楚里。可以選擇gzip(客觀的壓縮比)称近、snappy(節(jié)省CPU)或lz4撇贺√酥可以降低網(wǎng)絡(luò)傳輸開銷和存儲開銷兵迅。 |
retries | int | 0 | [0,…,2147483647] | 當生產(chǎn)者接收到錯誤是臨時性錯誤時抢韭,可以通過retries參數(shù)設(shè)置重發(fā)消息的次數(shù)。重試時間間隔由retry.backoff.ms參數(shù)指定恍箭,默認為100ms(這個時間最好測試一下節(jié)點恢復時間刻恭,總嘗試次數(shù)時間大約節(jié)點恢復時間)。 |
batch.size | int | 16384(16k) | [0,...] | 當多個消息發(fā)送到同一個分區(qū)時扯夭,生產(chǎn)者會把他們放到同一個批次里鳍贾。該參數(shù)指定了一個批次可以使用的內(nèi)存大小。當被填滿后交洗,這個批次消息就會被發(fā)送出去(不一定等填滿才發(fā)送骑科,達到了linger.ms上限)。如果設(shè)置為0藕筋,則每條消息都會立即發(fā)送纵散。它的大小需要你在內(nèi)存、吞吐量和延遲上做出一個平衡隐圾。 |
linger.ms | long | 0 | [0,...] | 指定生產(chǎn)者在發(fā)送批次之前等待多長時間伍掀,KakfaProducer會在批次填滿或linger.ms達到上限時把批次消息發(fā)送出去。默認為0暇藏,即只要有線程可用蜜笤,就會將批次消息發(fā)送出去。 |
client.id | string | "" | 任意字符串盐碱,服務器會用它標識消息來源把兔,還可用在日志和配額指標里。 | |
max.in.flight.requests.per.connection | int | 5 | [1,...] | 指定生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息瓮顽。值越高县好,占用內(nèi)存越高,并且可能導致重排序風險(失敗重試導致)暖混,但是可以得到很高的吞吐量缕贡。 |
timeout.ms | int | 30000 | [0,...] | 指定了broker等待副本同步返回確認消息的時間,與acks的配置相匹配(設(shè)置為大于1)拣播。 |
request.timeout.ms | int | 30000 | [0,...] | 生產(chǎn)者發(fā)送數(shù)據(jù)時等待服務器返回相應時間晾咪。 |
max.request.size | int | 1048576(1M) | [0,...] | 控制生產(chǎn)者發(fā)送數(shù)據(jù)的大小,但這個是指生產(chǎn)者向服務器發(fā)送的批次的大小贮配,即該批次下的消息總大小谍倦。(感覺和batch.size在使用上有點相似) |
receive.buffer.bytes | int | 32768(32k) | [-1,...] | 指定TCP socket接受數(shù)據(jù)包的緩沖區(qū)大小,如果被設(shè)置成-1泪勒,就是用操作系統(tǒng)的默認值昼蛀。當生產(chǎn)者或消費者不在同一個數(shù)據(jù)中心的時候宴猾,可以適當調(diào)大,因為跨數(shù)據(jù)中心曹洽,網(wǎng)絡(luò)傳輸有較高延遲和比較高低的延遲帶寬鳍置。 |
Kafka消費者
消費者和消費者群組
消費Kafka中的數(shù)據(jù),Kafka提供了消費者和消費者群組的概念送淆。Kafka消費者從屬于消費者群組税产,一個群組里的消費者訂閱的是相同主題,也就是主題訂閱是消費者群組級別的訂閱偷崩,而不是單個消費者訂閱辟拷。消費者群組中的每個消費者都會接受訂閱主題的部分分區(qū)的數(shù)據(jù),也就是說同一個分區(qū)中的消息只會被消費者群組中的一個消費者所消費阐斜。
為什么要使用消費者群組的概念
消費者群組是為了橫向伸縮消費者處理能力主要方式衫冻,當一個消費者的處理能力跟不上生產(chǎn)者的生產(chǎn)速度時,這時候就需要增加消費者谒出,從同一個主題中讀取消息隅俘,對消息進行分流,從而擴展了數(shù)據(jù)處理能力笤喳。
消費者如何對主題消息進行分流
下面是一組主題T1和對應消費者群組的映射圖:
當消費者群組中只有一個消費者時候为居,這個消費者會接受主題內(nèi)所有分區(qū)的數(shù)據(jù):
當消費者群組有兩個消費者時候,這個兩個消費者會平分主題內(nèi)的所有分區(qū):
當消費者個數(shù)與分區(qū)數(shù)相同時杀狡,每個消費者都會消費特定分區(qū)消息蒙畴,即分區(qū)與消費者進行了一一映射:
當消費者個數(shù)超過了分區(qū)數(shù)時,這時候多于的消費者就會閑置:
從中可以看到呜象,消費者群組中有效的消費者個數(shù)膳凝,是由消費主題決定的。往消費者群組里面增加消費者是橫向伸縮消費能力的主要方式恭陡,由于Kafka消費者經(jīng)常做一些高延遲的操作蹬音,這種情況下單個消費者無法跟上數(shù)據(jù)的生產(chǎn)速度,所以需要添加更多的消費者休玩。這時候也就需要為主題創(chuàng)建大量的分區(qū)著淆,在負載增長時可以加入更多的消費者。
Kafka還支持多個應用程序消費相同的主題哥捕,在這種情況下牧抽,每個應用程序都會獲取到主題的所有消息嘉熊,而不是部分消息遥赚。這時候只要保證每個應用程序有自己的消費者群組即可。
分區(qū)再均衡
當群組中新增消費者或移除消費者時阐肤,會觸發(fā)分區(qū)重新分配凫佛,新增的消費者消費的分區(qū)原本是由其它分區(qū)消費的讲坎,而移除的消費者,原本由它處理分區(qū)會分配給其它消費者愧薛。除了新增或移除消費者外晨炕,新增分區(qū)也會導致再分區(qū)。
分區(qū)的所有權(quán)發(fā)生變更毫炉,這種行為稱為再均衡瓮栗。再均衡非常重要,因為它為消費者群組帶來了高可用和伸縮型瞄勾。但是再均衡會導致消費者群組一小段的時間不可用费奸,并且當一個分區(qū)重新分配給一個消費者時,消費者的當前狀態(tài)會丟失进陡,它有可能還需要去刷新緩存愿阐,從而拖慢整個應用程序。所以我們應該盡量避免再均衡的問題趾疚。
Kafka集群如何感知消費者不可用
消費者是通過向群組協(xié)調(diào)器(Kafka為每個消費者群組都指定了一個broker)來維持它和群組的關(guān)系以及它對分區(qū)的所有權(quán)關(guān)系缨历,只要消費者在規(guī)定時間內(nèi)向發(fā)送心跳,就會被認為活躍的糙麦,說明它還在讀取分區(qū)中的消息辛孵。消費者是利用輪詢消息或提交偏移量時發(fā)送心跳的,如果在規(guī)定時間間隔內(nèi)沒有發(fā)送心跳喳资,群組協(xié)調(diào)器就會認為該消費者已經(jīng)死亡觉吭,就會觸發(fā)一次再均衡。
需要注意仆邓,在0.10.1之后的版本里鲜滩,Kafka引入了一個獨立的心跳線程,可以在輪詢消息的空擋時間發(fā)送心跳节值。這樣發(fā)送心跳的頻率與消息輪詢的頻率就進行了解耦徙硅。
創(chuàng)建Kafka消費者
創(chuàng)建Kakfa消費者需要創(chuàng)建KafkaConsumer對象,KafkaConsumer和KakfaProducer一樣搞疗,也有一些必要屬性嗓蘑。
- bootstrap.server:指定了Kakfa集群的連接字符串,用途和KakfaProducer一樣匿乃。需要注意桩皿,在0.8.2版本中這個參數(shù)是zookeeper.connect,即Kafka集群所使用的zk集群地址幢炸。
- key.deserializer:與KafkaProducer的key.serializer一樣泄隔,只不過是負責將key由字節(jié)轉(zhuǎn)為java對象。
- value:deserializer:與KakfaProducer的value.serializer一樣宛徊,只不過是負責將value由字節(jié)轉(zhuǎn)為java對象佛嬉。
- group.id:不是必須的字段逻澳,但是一般都會設(shè)置,這個是該消費者所屬的消費者群組暖呕。
Properties props = new Properties();
props.put("bootstrap.servers","192.168.0.1:9092");
props.put("group.id","test");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
創(chuàng)建完KafkaConsumer之后斜做,接下來就需要訂閱消費的主題了,它支持訂閱一個主題列表湾揽,同時也支持正則表表達式瓤逼,當有滿足正則表達式的主題被創(chuàng)建后,消費者群組就會觸發(fā)一次再均衡库物,來消費新的主題抛姑。
consumer.subscribe(Arrays.asList(“my-demo-topic”));
接下來就是輪詢消息了,因為消費者是一個常駐任務艳狐,所以將其放在一個無限循環(huán)中定硝。
try{
while (true) {
ConsumerRecords<String,String> records = consumer.poll(100);
for(ConsumerRecord<String,String> record : records) {
System.out.printf("offset = %d ,key = %s, value = %s%n",record.offset(),record.key(),record.value());
}
}
}finally{
consumer.close();
}
poll()方法就是對Kafka進行輪詢(心跳發(fā)送也在這時候),poll()方法中是一個超時參數(shù)毫目,當消費者緩沖區(qū)里面沒有可用數(shù)據(jù)時蔬啡,會發(fā)生阻塞,當?shù)竭_超時時間后镀虐,無論緩沖區(qū)有沒有數(shù)據(jù)都會返回箱蟆。如果該參數(shù)設(shè)為0,pll()方法會立即返回刮便,否則它會在超時時間內(nèi)一直等待broker返回數(shù)據(jù)空猜。
poll()方法會返回一個ConsumerRecords列表,每個ConsumerRecords代表一條記錄恨旱,它記錄了主題信息辈毯、分區(qū)、偏移量搜贤,以及對應的鍵值對谆沃。
最后在退出應用程序之前,調(diào)用close方法關(guān)閉消費者仪芒。網(wǎng)絡(luò)連接和socket也會隨之關(guān)閉唁影,并且立觸發(fā)發(fā)一次再均衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)死亡掂名,從而節(jié)省了再分配的時間据沈。
消費者配置
配置項 | 類型 | 默認值 | 取值范圍 | 描述 |
---|---|---|---|---|
fetch.min.bytes | int | 1 | [0,...] | 指定了消費者從服務器獲取記錄的最小字節(jié)數(shù),如果broker可用數(shù)據(jù)小于該值饺蔑,那么broker會等到有足夠數(shù)據(jù)時才會返回锌介。能夠降低消費者和broker的負載。 |
fetch.max.wait.ms | int | 500 | [0,…] | 指定了有足夠數(shù)據(jù)時在返回消費者膀钠,而fetch.max.wait.ms則指定了等待時間掏湾。當kafka接收到請求后,或者達到feth.min.bytes指定大小返回肿嘲,或者達到fetch.max.wait.ms指定的時間返回融击,看哪個先滿足。 |
max.partition.fetch.bytes | int | 1048976(1M) | [0,...] | 指定了服務器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)雳窟,這個配置應該比broker能夠接受的最大消息大尊浪,否則消費者可能無法讀取這些大消息,導致消費之一直掛起重試封救。 |
session.timeout.ms | int | 10000 | 指定了消費者被認為死亡之前拇涤,能夠與服務器斷開連接的時間。即在超過該參數(shù)指定的時間外誉结,沒有發(fā)送心跳鹅士,就會被認為死亡,觸發(fā)再均衡惩坑。該參數(shù)與heartbeat.interval.ms相關(guān)掉盅,因為它指定了發(fā)送心跳的時間間隔。 | |
heartbeat.interval.ms | int | 3000 | 用途如上所說以舒,控制發(fā)送心跳的時間間隔趾痘。一般為session.timeout.ms的1/3÷樱可以將它們設(shè)置的低一些永票,以便盡快觸發(fā)再均衡。 | |
auto.offset.rest | string | latest | [latest,earliest,none] | 指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效(消費者長時間失效滥沫,偏移量被刪除)時侣集,該從哪里讀取。默認為latest兰绣,即最新記錄開始讀取肚吏。earliest為從起始位置讀取。none為當找不到偏移量時拋出異常狭魂。 |
enable.auto.commit | boolean | true | 指定了消費者是否自動提交偏移量罚攀。為了避免數(shù)據(jù)丟失或出現(xiàn)重復讀取數(shù)據(jù),可以改為false雌澄,由自己控制提交偏移量斋泄。 | |
max.poll.records | int | 500 | 指定了單次調(diào)用poll()方法所能夠返回最大數(shù)據(jù)量。 |
提交偏移量
Kafka不同其它的消息系統(tǒng)需要等待消費者確認才認為該消息被處理了镐牺,Kafka是采用偏移量來幫助消費者追蹤分區(qū)內(nèi)消息被處理的位置炫掐,消費者更新當前位置的操作在kafka中稱為提交。
消費者會向一個叫做_consumer_offset的特殊topic中發(fā)送消息睬涧,消息包含了每個分區(qū)的偏移量募胃。如果消費者處于一直運行狀態(tài)旗唁,這個偏移量是沒有用的,但是當消費者崩潰或者有新的消費者加入消費者群組是痹束,就會觸發(fā)再均衡检疫。這時候每個消費者可能分配到新的分區(qū),為了能夠繼續(xù)之前的工作祷嘶,消費者需要讀取每個分區(qū)的偏移量屎媳,然后從最后提交偏移量的位置繼續(xù)工作。
當發(fā)生再均衡的時候论巍,提交偏移量方式會對客戶端影響很大烛谊,會導致重復消費消息或?qū)е孪G失。
如果最后提交偏移量的位置小于客戶端最后處理消息的位置嘉汰,那么會導致兩個偏移量之間消息被重復處理丹禀。
如果最后提交偏移量的位置大于客戶端最后處理消息的位置,那么會導致兩個偏移量之間消息丟失鞋怀。
針對客戶端不同的需求湃崩,Kafka的Consumer提供了多種提交偏移量的API。
自動提交
Kafka提供了自動提交偏移量的方式接箫,這也是最簡單的方式攒读。通過將enable.auto.commit(默認為true,老版本叫auto.commit.enable)設(shè)置為true辛友,然后每隔auto.commit.interval.ms(默認為5s)會自動將從poll()方法接收到的最大偏移量提交上去薄扁。這種提交方式,也和消費者的其它東西一樣废累,自動提交也是在輪詢過程中完成的邓梅。每次輪詢自動提交上去的偏移量,都是上一次poll方法返回的最大偏移量邑滨,它并不知道哪些消息真正被處理了日缨,所以在每次調(diào)用之前,最好確保當前poll()方法返回的消息都被處理了掖看,消費者也會在調(diào)用close()方法之前進行自動提交匣距。
需要注意使用自動提交這種方式,一般不會有什么問題哎壳,不過在產(chǎn)生異骋愦或者提前退出輪詢時,就會產(chǎn)生我們上面說的兩種情況归榕,可能導致消息重復消費或消息丟失尸红。如果是對消息處理嚴格的業(yè)務,建議不要使用這種方式。
我們可以通過縮小提交偏移量的時間來消除消息丟失的可能性外里,并在再均衡時減少重復消息的數(shù)量怎爵。
手動提交
消費者還提供了另外一種提交偏移量的方式,就是把enable.auto.commit設(shè)置為false盅蝗,然后通過commitSync()方法提交偏移量鳖链。commitSync()方法是最可靠也是最簡單的方式,它會把poll()方法返回的最新偏移量提交上去风科,如果提交成功立馬返回,如果提交失敗則會拋出異常乞旦。
需要注意的是commitSync()方法一定要在poll()方法返回的消息處理完成后在提交贼穆,否則還會有消息丟失的風險的。commitSync()能夠有效避免消息丟失情況兰粉,但是還會存在消息重復處理的問題的故痊,當發(fā)生再均衡時,從最近一批消息到發(fā)生再均衡之前的消息會被重復處理玖姑。
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//處理邏輯
}
try {
consumer.commitSync(); //處理完本批次消息在提交偏移量
}catch (Exception e) {
LOG.error("commit offset failed",e);
}
}
當使用commitSync()提交偏移量發(fā)生異常時愕秫,它會進行重試提交,直到提交成功焰络。
通過commitSync()的方法名稱就能直到戴甩,這是一個同步提交偏移量的方法,在結(jié)果返回之前闪彼,它會一直阻塞甜孤,這樣會限制應用程序的吞吐量。雖然我們可以通過降低提交頻率來提升吞吐量畏腕,但是當發(fā)生再均衡時缴川,會導致增大重復處理消息數(shù)量。
異步提交
Kafka提供了commitAsync()方法來提交偏移量描馅,commitAsync()在提交完偏移量后立馬返回把夸,并且比如偏移量提交失敗,它也不會進行重復提交铭污,因為下一批次更大的偏移量可能已經(jīng)提交了恋日,重復提交會導致覆蓋更大的偏移量。不過commitAsync()方法提供回調(diào)嘹狞,當broker做出相應時執(zhí)行回調(diào)谚鄙。回調(diào)一般用來記錄異常和度量指標刁绒,如果我們重新提交偏移量闷营,一定要注意提交順序,因為有可能更大偏移量已經(jīng)提交了,一般可以通過在回調(diào)加一個遞增的序列號來維護異步提交的順序傻盟。
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//處理邏輯
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null) {
LOG.error("Commit offset fail.",e);
}
}
});
}
同步和異步組合提交
一般偶爾出現(xiàn)提交失敗速蕊,不進行重試影響不大,因為提交失敗時臨時問題導致的娘赴,那么后續(xù)總有提交成功的時候规哲。但是如果這時關(guān)閉消費者或者再均衡最后一次的提交,就需要確保提交成功诽表。
try {
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//處理邏輯
}
consumer.commitAsync();
}
}catch (Exception e) {
e.printStackTrace();
}finally {
try {
consumer.commitSync();
}catch (Exception e) {
e.printStackTrace();
}
}
一般時候使用異步提交唉锌,來保證系統(tǒng)的吞吐量。如果直接關(guān)閉消費者竿奏,就需要使用commitSync()方法確保一定提交成功袄简。
提交指定位置的偏移量
commitSync()和commitAsync()方法每次只會提交poll()下來消息的最大偏移量,有時候我們poll()下來大批量數(shù)據(jù)泛啸,為了避免再均衡引起的大量重復處理消息绿语,可能想在中途過程中就提交偏移量。Kafka針對這種情況候址,允許我們在調(diào)用commitAsync()和commitSync()時傳入希望提交的分區(qū)和對應的偏移量吕粹。
Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//處理邏輯
currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset() + 1));
if(count % 100000 == 0) {
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null) {
LOG.error("commit offset fail!");
}
}
});
}
}
}
之所以使用Map存儲分區(qū)和offset,是因為一個消費者可能訂閱了多個topic岗仑。
再均衡監(jiān)聽器
當發(fā)生再均衡時匹耕,我們可能有一些事情需要做,比如提交最后的偏移量荠雕。Kafka為我們提供了再均衡監(jiān)聽器泌神,當發(fā)生再均衡時會調(diào)用監(jiān)聽器中的相應方法。再均衡監(jiān)聽器是在consumer訂閱主題時傳遞進去的舞虱,你需要實現(xiàn)ConsumerRebalanceListener接口欢际。
class RebalanceHandler implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//再均衡開始之前和消費者停止讀取消息之后調(diào)用
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//再均衡分區(qū)重新分配之后和消費者開始消費之前調(diào)用
}
}
使用再均衡監(jiān)聽器來提交偏移量。
class RebalanceHandler implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//再均衡開始之前和消費者停止讀取消息之后調(diào)用
consumer.commitSync(currentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//再均衡分區(qū)重新分配之后和消費者開始消費之前調(diào)用
}
}
consumer.subscribe(Arrays.asList("demo-topic"),new RebalanceHandler());
int count = 0;
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//處理邏輯
currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset() + 1));
if(count % 100000 == 0) {
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null) {
LOG.error("commit offset fail!");
}
}
});
}
}
}
}
從特定偏移量處理記錄
Kafka提供從指定位置讀取消息的API以便我們使用矾兜,如果想要從開始分區(qū)的起始位置開始讀取损趋,或者直接跳到分區(qū)的末尾開始讀取消息,可以使用seekToBeginning(Collection<TopicPartition> tp)
方法和seekToEnd(Collection<TopicPartition> tp)方法椅寺。
consumer.seekToBeginning(Arrays.asList(new TopicPartition("demo-topic",0))); //指定topic和分區(qū)
consumer.seekToEnd(Arrays.asList(new TopicPartition("demo-topic",1)));
Kafka也為我們提供了查找特定偏移量的API浑槽,它有許多用途,比如向前或向后偏移幾個記錄等等返帕。seek()方法能夠用來指定特定的偏移量桐玻,我們只需要指定topic、分區(qū)以及具體的偏移量即可荆萤。
consumer.seek(new TopicPartition(“demo-topic”,3),1500); //主題demo-topic的分區(qū)3镊靴,從偏移量1500開始讀取
使用seek()方法指定偏移量讀取铣卡,一般是將偏移量存儲到Kafka之外了。因為Kafka的提交偏移量無論怎么優(yōu)化都有可能導致重讀消息的風險(再均衡監(jiān)聽器只能對正常服務的消費者起作用偏竟,如果消費者掛掉后煮落,是觸發(fā)不到的),因為消費數(shù)據(jù)和提交偏移量本身不是一個原子操作踊谋。這時候我們可以能借助外部存儲系統(tǒng)蝉仇,比如我們將從Kafka處理之后的數(shù)據(jù)寫到Mysql,然后也將偏移量寫到Mysql中殖蚕,然后通過事務將二者關(guān)聯(lián)起來轿衔,這樣要么都處理成功,要么都處理失敗睦疫。
class RebalanceHandler implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//即將失去分區(qū)前提交事務,確保記錄成功保存
//commitDBTransaction();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//從數(shù)據(jù)庫獲取偏移量害驹,再分配到新分區(qū)后,使用seek()定位
for(TopicPartition partition : partitions){
//consumer.seek(partition,getOffsetFromDB(partition));
}
}
}
consumer.subscribe(Arrays.asList("demo-topic"),new RebalanceHandler());
//調(diào)用一次pool()方法笼痛,目的是讓消費者加入到消費者群組裙秋,并獲取到分區(qū)琅拌,然后調(diào)用seek()方法定位到分區(qū)偏移量
consumer.poll(Duration.ofMillis(0));
for (TopicPartition partition : consumer.assignment()) {
//consumer.seek(topic,getOffsetFromDB(partition)); //獲取每個主題分區(qū)的維值
}
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//processRecord(record);//處理邏輯
//storeRecord(record);//保存處理記錄
//storeOffsetInDB(record.topic(),record.partition(),record.offset());//保存偏移量
}
//commitDBTransaction();//提交事務
}
}
消費者優(yōu)雅退出
Kafka提供了consumer.weakup()方法通過拋出異常的方式來停止消費者缨伊,weakup()是kafka提供的唯一一個可以從其它線程里安全調(diào)用的方法。調(diào)用consumer.weakup()可以退出poll()进宝,并且拋出WeakupException異常刻坊,我們不需要處理這種異常,因為它只是跳出循環(huán)的一種方式党晋。還需注意谭胚,在退出線程之前,還需要調(diào)用consumer.close()方法未玻,它會提交任何還沒有提交的任務灾而,并向群組協(xié)調(diào)器發(fā)送消息,告知離開扳剿,立即觸發(fā)再均衡旁趟,而無需等待會話超時。
//main shuthook()
public void shuthook(Consumer consumer,Thread mainThread) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup();
try {
mainThread.join(); //主線程等待
}catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
//
try{
while(true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record : records) {
//處理邏輯
}
consumer.commitAsync();
}
}catch(WakeupException e) {
//無需處理
}finally {
consumer.close()
}
自定義反序列化器
生產(chǎn)者中我們說了如何定義序列化器庇绽,消費者中的反序列化器定義和序列化器定義一樣锡搜,只不過是實現(xiàn)Deserializer接口,其它使用方式都和生產(chǎn)者一樣瞧掺。