本章我們將會討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的垒棋。Kafka項目有一個生產(chǎn)者客戶端,我們可以通過這個客戶端的API來發(fā)送消息薯演。
概要
當我們發(fā)送消息之前贸诚,先問幾個問題:每條消息都是很關(guān)鍵且不能容忍丟失么?偶爾重復消息可以么销钝?我們關(guān)注的是消息延遲還是寫入消息的吞吐量有咨?
舉個例子,有一個信用卡交易處理系統(tǒng)蒸健,當交易發(fā)生時會發(fā)送一條消息到Kafka座享,另一個服務來讀取消息并根據(jù)規(guī)則引擎來檢查交易是否通過,將結(jié)果通過Kafka返回似忧。對于這樣的業(yè)務渣叛,消息既不能丟失也不能重復,由于交易量大因此吞吐量需要盡可能大盯捌,延遲可以稍微高一點淳衙。
再舉個例子,假如我們需要收集用戶在網(wǎng)頁上的點擊數(shù)據(jù)饺著,對于這樣的場景箫攀,少量消息丟失或者重復是可以容忍的,延遲多大都不重要只要不影響用戶體驗幼衰,吞吐則根據(jù)實時用戶數(shù)來決定靴跛。
不同的業(yè)務需要使用不同的寫入方式和配置。后面我們將會討論這些API渡嚣,現(xiàn)在先看下生產(chǎn)者寫消息的基本流程:
流程如下:
- 首先汤求,我們需要創(chuàng)建一個ProducerRecord俏险,這個對象需要包含消息的主題(topic)和值(value),可以選擇性指定一個鍵值(key)或者分區(qū)(partition)扬绪。
- 發(fā)送消息時竖独,生產(chǎn)者會對鍵值和值序列化成字節(jié)數(shù)組,然后發(fā)送到分配器(partitioner)挤牛。
- 如果我們指定了分區(qū)莹痢,那么分配器返回該分區(qū)即可;否則墓赴,分配器將會基于鍵值來選擇一個分區(qū)并返回竞膳。
- 選擇完分區(qū)后,生產(chǎn)者知道了消息所屬的主題和分區(qū)诫硕,它將這條記錄添加到相同主題和分區(qū)的批量消息中坦辟,另一個線程負責發(fā)送這些批量消息到對應的Kafka broker。
- 當broker接收到消息后章办,如果成功寫入則返回一個包含消息的主題锉走、分區(qū)及位移的RecordMetadata對象,否則返回異常藕届。
- 生產(chǎn)者接收到結(jié)果后挪蹭,對于異常可能會進行重試休偶。
創(chuàng)建Kafka生產(chǎn)者
創(chuàng)建Kafka生產(chǎn)者有三個基本屬性:
- bootstrap.servers:屬性值是一個host:port的broker列表梁厉。這個屬性指定了生產(chǎn)者建立初始連接的broker列表,這個列表不需要包含所有的broker踏兜,因為生產(chǎn)者建立初始連接后會從相應的broker獲取到集群信息词顾。但建議指定至少包含兩個broker,這樣一個broker宕機后生產(chǎn)者可以連接到另一個broker碱妆。
- key.serializer:屬性值是類的名稱计技。這個屬性指定了用來序列化鍵值(key)的類。Kafka broker只接受字節(jié)數(shù)組山橄,但生產(chǎn)者的發(fā)送消息接口允許發(fā)送任何的Java對象垮媒,因此需要將這些對象序列化成字節(jié)數(shù)組。key.serializer指定的類需要實現(xiàn)org.apache.kafka.common.serialization.Serializer接口航棱,Kafka客戶端包中包含了幾個默認實現(xiàn)睡雇,例如ByteArraySerializer、StringSerializer和IntegerSerializer饮醇。
- value.serializer:屬性值是類的名稱它抱。這個屬性指定了用來序列化消息記錄的類,與key.serializer差不多朴艰。
Maven依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
下面是一個樣例代碼:
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
創(chuàng)建完生產(chǎn)者后观蓄,我們可以發(fā)送消息混移。Kafka中有三種發(fā)送消息的方式:
- 只發(fā)不管結(jié)果(fire-and-forget):只調(diào)用接口發(fā)送消息到Kafka服務器,但不管成功寫入與否侮穿。由于Kafka是高可用的歌径,因此大部分情況下消息都會寫入,但在異常情況下會丟消息亲茅。
- 同步發(fā)送(Synchronous send):調(diào)用send()方法返回一個Future對象回铛,我們可以使用它的get()方法來判斷消息發(fā)送成功與否。
- 異步發(fā)送(Asynchronous send):調(diào)用send()時提供一個回調(diào)方法克锣,當接收到broker結(jié)果后回調(diào)此方法茵肃。
本章的例子都是單線程發(fā)送的,但生產(chǎn)者對象是線程安全的袭祟,它支持多線程發(fā)送消息來提高吞吐验残。需要的話,我們可以使用多個生產(chǎn)者對象來進一步提高吞吐巾乳。
發(fā)送消息到Kafka
最簡單的發(fā)送消息方式如下:
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
這里做了如下幾件事:
- 我們創(chuàng)建了一個ProducerRecord您没,并且指定了主題以及消息的key/value。主題總是字符串類型的想鹰,但key/value則可以是任意類型紊婉,在本例中也是字符串药版。需要注意的是辑舷,這里的key/value的類型需要與serializer和生產(chǎn)者的類型匹配。
- 使用send()方法來發(fā)送消息槽片,該方法會返回一個RecordMetadata的Future對象何缓,但由于我們沒有跟蹤Future對象,因此并不知道發(fā)送結(jié)果还栓。如前所述碌廓,這種方式可能會丟失消息。
- 雖然我們忽略了發(fā)送消息到broker的異常剩盒,但是我們調(diào)用send()方法時仍然可能會遇到一些異常谷婆,例如序列化異常、發(fā)送緩沖區(qū)溢出異常等等辽聊。
同步發(fā)送消息
同步發(fā)送方式可以簡單修改如下:
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
注意到纪挎,這里使用了Future.get()來獲取發(fā)送結(jié)果,如果發(fā)送消息失敗則會拋出異常跟匆,否則返回一個RecordMetadata對象异袄。發(fā)送失敗異常包含:1)broker返回不可恢復異常,生產(chǎn)者直接拋出該異常玛臂;2)對于broker其他異常烤蜕,生產(chǎn)者會進行重試封孙,如果重試超過一定次數(shù)仍不成功則拋出異常。
可恢復異常指的是讽营,如果生產(chǎn)者進行重試可能會成功虎忌,例如連接異常;不可恢復異常則是進行重試也不會成功的異常斑匪,例如消息內(nèi)容過大呐籽。
異步發(fā)送消息
首先了解下什么場景下需要異步發(fā)送消息。假如生產(chǎn)者與broker之間的網(wǎng)絡延時為10ms蚀瘸,我們發(fā)送100條消息狡蝶,發(fā)送每條消息都等待結(jié)果,那么需要1秒的時間贮勃。而如果我們采用異步的方式贪惹,幾乎沒有任何耗時,而且我們還可以通過回調(diào)知道消息的發(fā)送結(jié)果寂嘉。
異步發(fā)送消息的樣例如下:
public class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
producer.send(record, new DemoProducerCallback());
異步回調(diào)的類需要實現(xiàn)org.apache.kafka.clients.producer.Callback接口奏瞬,這個接口只有一個onCompletion方法。當Kafka返回異常時泉孩,異常值不為null硼端,代碼中只是簡單的打印,但我們可以采取其他處理方式寓搬。
kafka生產(chǎn)者 配置
- acks 和 timeout.ms
timeout.ms(0.9.0.0版本中就被棄用)
指定了 broker 等待同步副本返回消息確認的時間珍昨,與 asks 的配置相匹配——如果在指定時間內(nèi)沒有收到同步副本的確認,那么 broker 就會返回一個錯誤句喷。-
acks = 1
指定了必須要有多少個分區(qū)副本收到消息镣典,生產(chǎn)者才會認為消息寫入是成功的。這個參數(shù)對消
息丟失的可能性有重要影響唾琼。該參數(shù)有如下選項:acks=0兄春,生產(chǎn)者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說锡溯,如果當中
出現(xiàn)了問題赶舆,導致服務器沒有收到消息,那么生產(chǎn)者就無從得知祭饭,消息也就丟失了芜茵。不過,因為
生產(chǎn)者不需要等待服務器的響應甜癞,所以它可以以網(wǎng)絡能夠支持的最大速度發(fā)送消息夕晓,從而達到很
高的吞吐量。acks=1悠咱,只要集群的 Leader 節(jié)點收到消息蒸辆,生產(chǎn)者就會收到一個來自服務器的成功響應征炼。如果消息無法到達 Leader 節(jié)點(比如首領(lǐng)節(jié)點崩潰,新的 Leader 還沒有被選舉出來)躬贡,生產(chǎn)者會收到一個錯誤響應谆奥,為了避免數(shù)據(jù)丟失,生產(chǎn)者會重發(fā)消息拂玻。不過酸些,如果一個沒有收到消息的節(jié)點成為新Leader,消息還是會丟失檐蚜。這個時候的吞吐量取決于使用的是同步發(fā)送還是異步發(fā)送魄懂。如果讓發(fā)送客戶端等待服務器的響應(通過調(diào)用 Future 對象的 get() 方法),顯然會增加延遲(在網(wǎng)絡上傳輸一個來回的延遲)闯第。如果客戶端使用回調(diào)市栗,延遲問題就可以得到緩解,不過吞吐量還是會受發(fā)送中消息數(shù)量的限制(比如咳短,生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息)填帽。
如果 acks=all,只有當所有參與復制的節(jié)點全部收到消息時咙好,生產(chǎn)者才會收到一個來自服務器的成功響應篡腌。這種模式是最安全的,它可以保證不止一個服務器收到消息勾效,就算有服務器發(fā)生崩潰嘹悼,整個集群仍然可以運行。不過葵第,它的延遲比 acks=1 時更高绘迁,因為我們要等待不只一個服務器節(jié)點接收消息合溺。
buffer.memory=33554432
該參數(shù)用來設置生產(chǎn)者內(nèi)存緩沖區(qū)的大小卒密,生產(chǎn)者用它緩沖要發(fā)送到服務器的消息。如果生產(chǎn)消息的速度超過發(fā)送的速度棠赛,會導致生產(chǎn)者空間不足哮奇。這個時候,send()
方法調(diào)用要么被阻塞睛约,要么拋出異常鼎俘,取決于如何設置block.on.buffer.full
參數(shù)(在 0.9.0.0 版本里被替換成了max.block.ms
,表示在拋出異常之前可以阻塞一段時間)compression.type=none
默認情況下辩涝,消息發(fā)送時不會被壓縮贸伐。該參數(shù)可以設置為snappy
、gzip
或lz4
怔揩,它指定了消息被發(fā)送給 broker 之前使用哪一種壓縮算法進行壓縮捉邢。
- snappy 壓縮算法由 Google 發(fā)明脯丝,占用較少的 CPU,卻能提供較好的性能和相當可觀的壓縮比伏伐,如果比較關(guān)注性能和網(wǎng)絡帶寬宠进,可以使用這種算法。
- gzip 壓縮算法一般會占用較多的 CPU藐翎,但會提供更高的壓縮比材蹬,所以如果網(wǎng)絡帶寬比較有限,可以使用這種算法吝镣。
使用壓縮可以降低網(wǎng)絡傳輸開銷和存儲開銷堤器,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。
- retries 和 retry.backoff.ms
retries=0
生產(chǎn)者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區(qū)找不到 Leader)末贾。在這種情況下吼旧,retries
參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達到這個次數(shù)未舟,生產(chǎn)者會放棄重試并返回錯誤圈暗。retry.backoff.ms=100
默認情況下,生產(chǎn)者會在每次重試之間等待 100ms裕膀,不過可以通過retry.backoff.ms
參數(shù)來改變這個時間間隔员串。建議在設置重試次數(shù)和重試時間間隔之前,先測試一下恢復一個崩潰節(jié)點需要多少時間(比如所有分區(qū)選舉出 Leader 需要多長時間)昼扛,讓總的重試時間比 Kafka 集群從崩潰中恢復的時間長寸齐,否則生產(chǎn)者會過早地放棄重試。不過有些錯誤不是臨時性錯誤抄谐,沒辦法通過重試來解決(比如“消息太大”錯誤)渺鹦。一般情況下,因為生產(chǎn)者會自動進行重試蛹含,所以就沒必要在代碼邏輯里處理那些可重試的錯誤毅厚。你只需要處理那些不可重試的錯誤和重試次數(shù)超出上限的情況。
- batch.size 和 linger.ms
-
batch.size:=16384
當有多個消息需要被發(fā)送到同一個分區(qū)時浦箱,生產(chǎn)者會把它們放在同一個批次里吸耿。該參數(shù)指定了一個批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計算(而不是消息個數(shù))酷窥。 -
linger.ms:=0
指定了生產(chǎn)者在每次發(fā)送消息的時間間隔
當批次被填滿 或者 等待時間達到
linger.ms
設置的間隔時間咽安,批次里的所有消息會被發(fā)送出去,哪怕此時該批次只有一條消息蓬推。
所以就算把批次大小設置得很大妆棒,也不會造成延遲,只是會占用更多的內(nèi)存而已。但如果設置得太小糕珊,因為生產(chǎn)者需要更頻繁地發(fā)送消息蛋铆,會增加一些額外的開銷。
client.id=''
該參數(shù)可以是任意的字符串放接,服務器會用它來識別消息的來源max.in.flight.requests.per.connection=5
該參數(shù)指定了生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息刺啦。它的值越高,就會占用越多的內(nèi)存纠脾,不過也會提升吞吐量玛瘸。把它設為 1 可以保證消息是按照發(fā)送的順序?qū)懭敕掌鞯模词拱l(fā)生了重試苟蹈。
如何保證順序性:如果把 retries 設為非零整數(shù)糊渊,同時把
max.in.flight.requests.per.connection
設為比 1 大的數(shù),那么慧脱,如果第一個批次消息寫入失敗渺绒,而第二個批次寫入成功,broker 會重試寫入第一個批次菱鸥。如果此時第一個批次也寫入成功宗兼,那么兩個批次的順序就反過來了。一般來說氮采,如果某些場景要求消息是有序的殷绍,那么消息是否寫入成功也是很關(guān)鍵的,所以不建議把
retries
設為 0鹊漠≈鞯剑可以把max.in.flight.requests.per.connection
設為 1,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時躯概,就不會有其他的消息發(fā)送給broker登钥。不過這樣會嚴重影響生產(chǎn)者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才能這么做娶靡。
- request.timeout.ms 和 metadata.fetch.timeout.ms
-
request.timeout.ms=305000
指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時等待服務器返回響應的時間 -
metadata.fetch.timeout.ms (0.9.0.0版本中就被棄用)
指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標分區(qū)的 Leader 是誰)時等待服務器返回響應的時間牧牢。如果等待響應超時,那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù)固蛾,要么返回一個錯誤(拋出異辰嶂矗或執(zhí)行回調(diào))度陆。
-
max.request.size=1048576
該參數(shù)用于控制生產(chǎn)者發(fā)送的請求大小艾凯。它可以指能發(fā)送的單個消息的最大值,也可以指單個請求里所有消息總的大小懂傀。例如趾诗,假設這個值為 1MB,那么可以發(fā)送的單個最大消息為 1MB,或者生產(chǎn)者可以在單個請求里發(fā)送一個批次恃泪,該批次包含了 1000 個消息郑兴,每個消息大小為 1KB。另外贝乎,broker 對可接收的消息最大值也有自己的限制(message.max.bytes
)情连,所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被 broker 拒絕览效。
注意區(qū)分
batch.size
只是針對一個 topic 的 partition却舀,而max.request.size
針對單次請求的。
-
receive.buffer.bytes=32768 和 send.buffer.bytes=131072
這兩個參數(shù)分別指定了 TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小锤灿。如果它們被設為 -1挽拔,就使用操作系統(tǒng)的默認值。如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心但校,那么可以適當增大這些值螃诅,因為跨數(shù)據(jù)中心的網(wǎng)絡一般都有比較高的延遲和比較低的帶寬。
關(guān)于更多的配置信息状囱,可以查看:http://kafka.apachecn.org/documentation.html#configuration
完整實例
package com.neuedu;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers",
"hadoop03:9092,hadoop05:9092,hadoop06:9092");//該地址是集群的子集术裸,用來探測集群。
props.put("acks", "all");// 記錄完整提交亭枷,最慢的但是最大可能的持久化
props.put("retries", 3);// 請求失敗重試的次數(shù)
props.put("batch.size", 16384);// batch的大小
props.put("linger.ms", 1);// 默認情況即使緩沖區(qū)有剩余的空間穗椅,也會立即發(fā)送請求,設置一段時間用來等待從而將緩沖區(qū)填的更多奶栖,單位為毫秒匹表,producer發(fā)送數(shù)據(jù)會延遲1ms,可以減少發(fā)送到kafka服務器的請求數(shù)據(jù)
props.put("buffer.memory", 33554432);// 提供給生產(chǎn)者緩沖內(nèi)存總量
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式宣鄙,
// ByteArraySerializer或者StringSerializer
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
{
producer.send(new ProducerRecord<String, String>("payment", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
通過上面的一些講解袍镀,應該已經(jīng)可以比較友好的使用 kafka生產(chǎn)者了,接下來我們還剩下最后一個部分冻晤,kafka的分區(qū)
分區(qū)
我們創(chuàng)建消息的時候苇羡,必須要提供主題和消息的內(nèi)容,而消息的key是可選的鼻弧,當不指定key時默認為null设江。消息的key有兩個重要的作用:1)提供描述消息的額外信息;2)用來決定消息寫入到哪個分區(qū)攘轩,所有具有相同key的消息會分配到同一個分區(qū)中叉存。
如果key為null,那么生產(chǎn)者會使用默認的分配器度帮,該分配器使用輪詢(round-robin)算法來將消息均衡到所有分區(qū)歼捏。
如果key不為null而且使用的是默認的分配器稿存,那么生產(chǎn)者會對key進行哈希并根據(jù)結(jié)果將消息分配到特定的分區(qū)。注意的是瞳秽,在計算消息與分區(qū)的映射關(guān)系時瓣履,使用的是全部的分區(qū)數(shù)而不僅僅是可用的分區(qū)數(shù)。這也意味著练俐,如果某個分區(qū)不可用(雖然使用復制方案的話這極少發(fā)生)袖迎,而消息剛好被分配到該分區(qū),那么將會寫入失敗腺晾。另外瓢棒,如果需要增加額外的分區(qū),那么消息與分區(qū)的映射關(guān)系將會發(fā)生改變丘喻,因此盡量避免這種情況脯宿。
自定義分配器
在kafka配置參數(shù)時設置分區(qū)器的類
//設置自定義分區(qū)
kafkaProps.put("partitioner.class", "com.chb.partitioner.MyPartitioner");
現(xiàn)在來看下如何自定義一個分配器,下面將key為Banana的消息單獨放在一個分區(qū)泉粉,與其他的消息進行分區(qū)隔離:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Banana"))
return numPartitions - 1; // Banana will always go to last partition
// Other records will get hashed to the rest of the partitions
return (Math.abs(Utils.murmur2(keyBytes)) % numPartitions)
}
public void close() {}
}