本小節(jié)我們來討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的凯旭, Kafka項目有一個生產(chǎn)者客戶端,我們可以通過這個客戶端的API來發(fā)送消息。生產(chǎn)者客戶端是用Java寫的罐呼,但Kafka寫消息的協(xié)議是支持多語言的鞠柄,其它語言的api可見這個wiki
概要
通過本文,你可以了解到以下內(nèi)容:
- kafka producer端的整體結(jié)構(gòu),相關(guān)參數(shù)配置,以及性能優(yōu)化;
- 分區(qū)器,攔截器的擴展;
- 消息序列化擴展;
- 分區(qū)器,攔截器,序列化的執(zhí)行順序;
開始
很多做業(yè)務(wù)的同學(xué)都知道,在我們系統(tǒng)中發(fā)送一條消息給kafka 集群,我們只需要簡單的調(diào)一下已經(jīng)封裝好的接口,下面是來于我實際項目中的接口方法:
kafkaProducer.produce(String topic,Object msg)
每次要發(fā)消息,我就是這么簡單的調(diào)用一下就能確保消息能被consumer端正常的消費,但是kafka producer做了哪些工作我卻渾然不知,今天我就跟大家說到底說道這個里面到底有哪些不為人知的操作;
- 引出第一個問題,kafka消息的發(fā)送是一個什么樣的過程? 這個過程中做了哪些操作?
借助于kafka官網(wǎng)上的API,首先給大家來一張producer端的消息流轉(zhuǎn)圖:
接下來,結(jié)合一段代碼,給大家簡單說下流程:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("compresstion.type","snappy");
props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < MAZ_RETRY_SIZE; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
流程如下:
- 首先需要指定
kafka producer
端的配置;
- zk的地址和端口;
- producer端
ack
應(yīng)答機制,本demo中ack
設(shè)置為all
,表示生產(chǎn)者會等待所有副本成功寫入該消息,這種方式是最安全的嫉柴,能夠保證消息不丟失厌杜,但是延遲也是最大的; -
retries
設(shè)置標(biāo)示消息發(fā)送失敗,生產(chǎn)者可以自動重試,但是此刻設(shè)置為0標(biāo)示不重試;這個參數(shù)需要結(jié)合retry.backoff.ms
(重試等待間隔)來使用,建議總的重試時間比集群重新選舉群首的時間長,這樣可以避免生產(chǎn)者過早結(jié)束重試導(dǎo)致失敗; -
batch.size
參數(shù)標(biāo)示生產(chǎn)者為每個分區(qū)維護了一個未發(fā)送記錄的緩沖區(qū),這個緩沖區(qū)的大小由batch.size配置指定,配置的很大可能會導(dǎo)致更多的批處理,也需要更多的內(nèi)存(但是對于每個活動分區(qū)计螺,我們通常都有一個這樣的緩沖區(qū)),默認是16384Bytes; -
linger.ms
指定生產(chǎn)者在發(fā)送批量消息前等待的時間,當(dāng)設(shè)置此參數(shù)后,即便沒有達到批量消息的指定大小,到達時間后生產(chǎn)者也會發(fā)送批量消息到broker.默認情況下,生產(chǎn)者的發(fā)送消息線程只要空閑了就會發(fā)送消息,即便只有一條消息.設(shè)置這個參數(shù)后,發(fā)送線程會等待一定的時間,這樣可以批量發(fā)送消息增加吞吐量,但同時也會增加延遲; -
buffer.memory
控制生產(chǎn)者可用于緩沖的內(nèi)存總量;消息的發(fā)送速度超過了它們可以傳輸?shù)椒?wù)器的速度,那么這個緩沖空間將被耗盡.
當(dāng)緩沖區(qū)空間耗盡時,額外的發(fā)送調(diào)用將阻塞.阻止時間的閾值由max.block.ms
確定夯尽,在此之后它將引發(fā)TimeoutException
.這個緩存是針對每個producerThread,不應(yīng)設(shè)置高以免影響內(nèi)存;
生產(chǎn)者如果每發(fā)送一條消息都直接通過網(wǎng)絡(luò)發(fā)送到服務(wù)端,勢必會造成過多 的網(wǎng)絡(luò)請求登馒。如果我們能夠?qū)⒍鄺l消息按照分區(qū)進行分組匙握,并采用批量的方式一次發(fā)送一個消息集,并且對消息集進行壓縮谊娇,就可以減少網(wǎng)絡(luò)傳輸?shù)膸挿喂拢M一步提高數(shù)據(jù)的傳輸效率。
-
key.serializer
和value.serializer
指定了如何將key和value序列化成二進制碼流的方式,也就是上圖中的序列化方式; -
compresstion.type
:默認情況下消息是不壓縮的济欢,這個參數(shù)可以指定使用消息壓縮,參數(shù)可以取值為snappy小渊、gzip或者lz4;
- 接下來,我們需要創(chuàng)建一個
ProducerRecord
法褥,這個對象需要包含消息的topic
和值value
,可以選擇性指定一個鍵值key
或者分區(qū)partition
酬屉。 - 發(fā)送消息時半等,生產(chǎn)者會根據(jù)配置的
key.serializer
和value.serializer
對鍵值和值序列化成字節(jié)數(shù)組,然后發(fā)送到分配器partitioner
呐萨。 - 如果我們指定了分區(qū)杀饵,那么分配器返回該分區(qū)即可;否則,分配器將會基于鍵值來選擇一個分區(qū)并返回。
- 選擇完分區(qū)后谬擦,生產(chǎn)者知道了消息所屬的主題和分區(qū)切距,它將這條記錄添加到相同主題和分區(qū)的批量消息中,另一個線程負責(zé)發(fā)送這些批量消息到對應(yīng)的
Kafka broker
惨远。 - 當(dāng)
broker
接收到消息后谜悟,如果成功寫入則返回一個包含消息的主題、分區(qū)及位移的RecordMetadata
對象北秽,否則返回異常. - 生產(chǎn)者接收到結(jié)果后葡幸,對于異常可能會進行重試,根據(jù)參數(shù)
reties
的配置決定.
kafka發(fā)送端文件存儲原理
我們普遍認為一旦涉及到磁盤的訪問贺氓,數(shù)據(jù)的讀寫就會變得很慢蔚叨,其實不然,操作系統(tǒng)已經(jīng)針對磁盤的訪問速率做了很大的優(yōu)化;比如,預(yù)讀會提前將一個比較大的磁盤讀入內(nèi)存蔑水,后寫會把很多小的邏輯寫操作合并起來組合成一個大的物理寫操作邢锯;并且,操作系統(tǒng)還會將主內(nèi)存剩余的所有空間都用作磁盤緩存肤粱,所有的磁盤讀寫都會經(jīng)過統(tǒng)一的磁盤緩存弹囚,綜上所述,如果針對磁盤的順序讀寫领曼,某些情況它可能比隨機的內(nèi)存訪問都要快鸥鹉。
文件寫入的邏輯無外乎一下這兩種,但kafka選擇了第一種庶骄,也就是a圖的邏輯:
b圖是首先在內(nèi)存中保存盡可能多的數(shù)據(jù)毁渗,并在需要時將這些數(shù)據(jù)刷新進磁盤;
a圖是所有數(shù)據(jù)立即寫入磁盤单刁,但不進行刷新數(shù)據(jù)的調(diào)用灸异,數(shù)據(jù)首先會被傳輸?shù)酱疟P緩存,操作系統(tǒng)隨后會將數(shù)據(jù)定期自動刷新到磁盤羔飞。
發(fā)送端優(yōu)化
新的API中肺樟,生產(chǎn)者要發(fā)送消息,并不是直接發(fā)送給服務(wù)器逻淌,而是在客戶端先把消息放入一個緩沖隊列中么伯,然后由一個消息發(fā)送線程從隊列中拉取消息,以批鹽的方式發(fā)送消息給服務(wù)端卡儒。 Kafka的記錄收集器RecordAccumulator 負責(zé)緩存生產(chǎn)者客戶端產(chǎn)生的消息田柔,發(fā)送線程( Sender)負責(zé)讀取記錄收集器的批量消息, 通過網(wǎng)絡(luò)發(fā)送給服務(wù)端骨望。
開篇我們便列出了kafka 發(fā)送端的流程圖硬爆,消息發(fā)送之初,首先會為消息指定一個分區(qū)(發(fā)送消息時未指定分區(qū)的情況下)擎鸠,對于沒有鍵的消息缀磕,通過計數(shù)器自增輪詢的方式依次將消息分配到不同的分區(qū)上;對于有鍵的消息,對鍵計算散列值糠亩,然后和主題的分區(qū)數(shù)進行取模得到分區(qū)編號虐骑,具體的客戶端代碼實現(xiàn):
public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
//獲取集群中所有的分區(qū)
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
//如果指定分區(qū)
if (record.partition() != null) {
// they have given us a partition, use it
if (record.partition() < 0 || record.partition() >= numPartitions)
throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
+ " is not in the range [0..."
+ numPartitions
+ "].");
return record.partition();
// 如果沒有key,則負載均衡的分布
} else if (record.key() == null) {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
if (availablePartitions.size() > 0) {
int part = Utils.abs(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.abs(nextValue) % numPartitions;
}
} else {
// 如果有key赎线,則對key 進行hash取模運算
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
}
}
}
在客戶端就為消息選擇分區(qū)的目的是什么? 只有為消息選擇分區(qū)廷没,我們才能知道應(yīng)該發(fā)送到哪個節(jié)點,如果隨便找一個服務(wù)端節(jié)點垂寥,再由那個節(jié)點去決定如何將消息轉(zhuǎn)發(fā)給其他正確的節(jié)點來保存颠黎。這種方式增加了服務(wù)端的負擔(dān)另锋,多了不必要的數(shù)據(jù)傳輸。
序列化
在上述代碼中,我們看到了kafka producer在發(fā)送消息的時候會將key和value進行序列化,上面的程序中使用的是Kafka客戶端自帶的org.apache.kafka.common.serialization.StringSerializer
狭归,除了用于String類型的序列化器之外還有:ByteArray
夭坪、ByteBuffer
、Bytes
过椎、Double
室梅、Integer
、Long
這幾種類型,這幾個序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Serializer
接口接下來,此接口有三種方法:
-
public void configure(Map<String, ?> configs, boolean isKey)
:用來配置當(dāng)前類疚宇。 -
public byte[] serialize(String topic, T data)
:用來執(zhí)行序列化亡鼠。 -
public void close()
:用來關(guān)閉當(dāng)前序列化器。一般情況下這個方法都是個空方法敷待,如果實現(xiàn)了此方法间涵,必須確保此方法的冪等性,因為這個方法很可能會被KafkaProducer調(diào)用多次榜揖。
業(yè)界用的多的序列化框架無外乎如Avro勾哩、JSON、Thrift举哟、ProtoBuf或者Protostuff等工具,這里我就不擴展開了,讀者如果感興趣可以搜索相關(guān)的資料,下面就以一個簡單的例子來介紹下如何自定義序列化方式.
假設(shè)我們有一個自定義的Company類:
@Data
public class Company {
private String name;
private String address;
}
接下來我們Company的name和address屬性進行序列化,實現(xiàn)下Serializer接口:
public class CompanySerializer implements Serializer<Customer> {
public void configure(Map<String, ?> configs, boolean isKey) {}
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}
byte[] name, address;
try {
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
if (data.getAddress() != null) {
address = data.getAddress().getBytes("UTF-8");
} else {
address = new byte[0];
}
ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);
return buffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
public void close() {}
}
使用自定義的序列化類的方式也簡單,在前面的代碼中替換下properties中的序列化類即可:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.xxx.kafka. CompanySerializer");
分區(qū)器
在上文的demo中,我們創(chuàng)建消息的時候,必須要提供主題和消息的內(nèi)容,而消息的key是可選的(也就是我們平時工作中,發(fā)送消息時只需要指定topic和message),當(dāng)不指定key時默認為null.消息的key有兩個重要的作用:
- 提供描述消息的額外信息;
- 用來決定消息寫入到哪個分區(qū),所有具有相同key的消息會分配到同一個分區(qū)中.
如果key為null,那么生產(chǎn)者會使用默認的分配器,該分配器使用輪詢round-robin)算法來將消息均衡到所有分區(qū).
如果key不為null且使用的是默認的分配器,那么生產(chǎn)者會對key進行哈希并根據(jù)結(jié)果將消息分配到特定的分區(qū).注意的是,在計算消息與分區(qū)的映射關(guān)系時,使用的是全部的分區(qū)數(shù)而不僅僅是可用的分區(qū)數(shù).這也意味著,如果某個分區(qū)不可用(雖然使用復(fù)制方案的話這極少發(fā)生),而消息剛好被分配到該分區(qū),那么將會寫入失敗.另外,如果需要增加額外的分區(qū),那么消息與分區(qū)的映射關(guān)系將會發(fā)生改變,因此盡量避免這種情況,具體的信息可以查看DefaultPartitioner
中的代碼實現(xiàn):
/**
* Compute the partition for the given record.
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//獲取指定topic的partitions
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//key=null
if (keyBytes == null) {
int nextValue = nextValue(topic);
//可用分區(qū)
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
//消息隨機分布到topic可用的partition中
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// 無分區(qū)可利用, 給定一個不可用的分區(qū)
return Utils.toPositive(nextValue) % numPartitions;
}
//如果 key 不為 null思劳,并且使用了默認的分區(qū)器,kafka 會使用自己的 hash 算法對 key 取 hash 值
} else {//通過hash獲取partition
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
現(xiàn)在來看下如何自定義一個分配器,下面將key為Test的消息單獨放在一個分區(qū),與其他的消息進行分區(qū)隔離:
public class TestPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Test"))
return numPartitions; // Banana will always go to last partition
// Other records will get hashed to the rest of the partitions
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}
}
使用自定義的分區(qū)器
使用很簡單,在配置文件中或者properties文件中指定分區(qū)器的類即可;
props.put("partitioner.class", "com.xxx.kafka.TestPartitioner");
攔截器
Producer攔截器是個相當(dāng)新的功能.對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等.同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈,Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
-
onSend(ProducerRecord)
:該方法封裝進KafkaProducer.send方法中妨猩,即它運行在用戶主線程中的芳室。Producer確保在消息被序列化以計算分區(qū)前調(diào)用該方法哟绊。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū)坐梯,否則會影響目標(biāo)分區(qū)的計算 -
onAcknowledgement(RecordMetadata, Exception e)
:該方法會在消息被應(yīng)答之前或消息發(fā)送失敗時調(diào)用震嫉,并且通常都是在producer回調(diào)邏輯觸發(fā)之前森瘪。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯票堵,否則會拖慢producer的消息發(fā)送效率 -
close
:關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作,一般不作實現(xiàn);
interceptor可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全.另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞.這在使用過程中要特別留意.
下面我們簡單演示一個雙interceptor組成的攔截鏈,第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù).
第一個,在send方法中,我們會創(chuàng)建一個新的message,把時間戳寫入消息體的最前部.
public class TimeStampPrependerInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord onSend(ProducerRecord msg) {
return new ProducerRecord(
msg(), msg(), record.timestamp(), msg(), System.currentTimeMillis() + "," + msg().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
定義第二個interceptor:CounterInterceptor
,該interceptor會在消息發(fā)送后更新"發(fā)送成功消息數(shù)"和"發(fā)送失敗消息數(shù)"兩個計數(shù)器,并在producer關(guān)閉時打印這兩個計數(shù)器;
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存結(jié)果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
定義好interceptor之后,我們需要在producer中這樣指定即可,代碼如下:
List<String> interceptors = new ArrayList<>();
interceptors.add("com.xxx.kafka.TimeStampPrependerInterceptor"); // interceptor 1
interceptors.add("com.xxx.kafka.CounterInterceptor"); // interceptor 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// 一定要關(guān)閉producer扼睬,這樣才會調(diào)用interceptors中的close方法
producer.close();
寫了這么多,基本上將一個簡單消息從kafka producer發(fā)送時可以做的事情弄清了,但是我還是有一個疑問,分區(qū)器,攔截器,序列化他們之間有順序?
這個疑問留給大家自己去解決!!!