不管是把 Kafka 作為消息隊(duì)列、消息担忧、總線還是數(shù)據(jù)存儲(chǔ)平臺(tái)來(lái)使用 偷拔,總是需要有一個(gè)可以往 Kafka 寫入數(shù)據(jù)的生產(chǎn)者和一個(gè)可以從 Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角 色的應(yīng)用程序厢绝。
例如,在一個(gè)信用卡事務(wù)處理系統(tǒng)里剿骨,有一個(gè)客戶端應(yīng)用程序代芜,它可能是一個(gè)在線商店, 每當(dāng)有支付行為發(fā)生時(shí)浓利,它負(fù)責(zé)把事務(wù)發(fā)送到 Kafka上挤庇。另一個(gè)應(yīng)用程序根據(jù)規(guī)則引擎檢 查這個(gè)事務(wù),決定是批準(zhǔn)還是拒絕贷掖。 批準(zhǔn)或拒絕的響應(yīng)消息被寫回 Kafka嫡秕,然后發(fā)送給發(fā)起事務(wù)的在線商店。第三個(gè)應(yīng)用程序從 Kafka上讀取事務(wù)和審核狀態(tài)苹威,把它們保存到數(shù)據(jù) 庫(kù)昆咽, 隨后分析師可以對(duì)這些結(jié)果進(jìn)行分析,或許還能借此改進(jìn)規(guī)則引擎 牙甫。
開(kāi)發(fā)者們可以使用 Kafka 內(nèi)置的客戶端 API開(kāi)發(fā) Kafka應(yīng)用程序掷酗。
在這一章,我們將從 Kafra生產(chǎn)者的設(shè)計(jì)和組件講起窟哺,學(xué)習(xí)如何使用 Kafka生產(chǎn)者泻轰。我們將展示如何創(chuàng)建 KafkaProducer和 ProducerRecords對(duì)象、如何將記錄發(fā)送給 Kafka且轨,以及如何處理從 Kafka 返回的錯(cuò)誤浮声,然后介紹用干控制生產(chǎn)者行為的重要配置選項(xiàng),最后深入 探討如何使用不同的分區(qū)方法和序列化器旋奢,以及如何自定義序列化器和分區(qū)器 泳挥。
在下一章,我們將會(huì)介紹 Kafra的悄費(fèi)者客戶端至朗,以及如何從 Kafka讀取消息屉符。
生產(chǎn)者概覽
一個(gè)應(yīng)用程序在很多情況下需要往 Kafka 寫入消息 : 記錄用戶的活動(dòng)(用于審計(jì)和分析 )、 記錄度量指標(biāo)、保存日志筑煮、消息辛蚊、記錄智能家電的信息、與其他應(yīng)用程序進(jìn)行異步通信真仲、 緩沖即將寫入到數(shù)據(jù)庫(kù)的數(shù)據(jù)袋马,等等。
多樣的使用場(chǎng)景意味著多樣的需求:是否每個(gè)消息都很重要?是否允許丟失 一 小部分消息?偶爾出現(xiàn)重復(fù)消息是否可以接受?是否有嚴(yán)格的延遲和吞吐量要求?
在之前提到的信用卡事務(wù)處理系統(tǒng)里秸应,消息丟失或消息重復(fù)是不允許的虑凛,可以接受的延遲最大為 500ms,對(duì)吞吐量要求較高软啼,我們希望每秒鐘可以處理一百萬(wàn)個(gè)消息桑谍。
保存網(wǎng)站的點(diǎn)擊信息是另 一種使用場(chǎng)景。在這個(gè)場(chǎng)景里祸挪,允許丟失少量的消息或出現(xiàn)少量 的消息重復(fù)锣披,延遲可以高一些,只要不影響用戶體驗(yàn)就行贿条。換句話說(shuō)雹仿,只要用戶點(diǎn)擊鏈接 后可以馬上加載頁(yè)面,那么我們并不介意消息要在幾秒鐘之后才能到達(dá) Kafka 服務(wù)器整以。 吞 吐量則取決于網(wǎng)站用戶使用網(wǎng)站的頻度胧辽。
不同的使用場(chǎng)景對(duì)生產(chǎn)者 API 的使用和配置會(huì)有直接的影響。
盡管生產(chǎn)者 API 使用起來(lái)很簡(jiǎn)單 公黑,但消息的發(fā)送過(guò)程還是有點(diǎn)復(fù)雜的邑商。下圖展示 了向Kafka 發(fā)送消息的主要步驟。
Kafka 生產(chǎn)者組件圖
我們從創(chuàng)建 一個(gè) ProducerRecord 對(duì)象開(kāi)始凡蚜, ProducerRecord 對(duì)象需要包含目標(biāo)主題和要發(fā)送的內(nèi)容人断。我們還可以指定鍵或分區(qū)。在發(fā)送 ProducerRecord對(duì)象時(shí)朝蜘,生產(chǎn)者要先把鍵和 值對(duì)象序列化成字節(jié)數(shù)組含鳞,這樣它們才能夠在網(wǎng)絡(luò)上傳輸 。
接下來(lái)芹务,數(shù)據(jù)被傳給分區(qū)器。如果之前在 ProducerRecord對(duì)象里指定了分區(qū)鸭廷,那么分區(qū)器就不會(huì)再做任何事情枣抱,直接把指定的分區(qū)返回。如果沒(méi)有指定分區(qū) 辆床,那么分區(qū)器會(huì)根據(jù) ProducerRecord對(duì)象的鍵來(lái)選擇一個(gè)分區(qū) 佳晶。選好分區(qū)以后 ,生產(chǎn)者就知道該往哪個(gè)主題和分區(qū)發(fā)送這條記錄了讼载。緊接著轿秧,這條記錄被添加到一個(gè)記錄批次里中跌,這個(gè)批次里的所有消息會(huì)被發(fā)送到相同的主題和分區(qū)上。有一個(gè)獨(dú)立的線程負(fù)責(zé)把這些記錄批次發(fā)送到相應(yīng)的 broker 上菇篡。
服務(wù)器在收到這些消息時(shí)會(huì)返回一個(gè)響應(yīng)漩符。如果消息成功寫入 Kafka,就返回 一 個(gè) RecordMetaData 對(duì)象驱还,它包含了主題和分區(qū)信息嗜暴,以及記錄在分區(qū)里的偏移量。如果寫入 失敗议蟆, 就會(huì)返回 一個(gè)錯(cuò)誤 闷沥。生產(chǎn)者在收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息,幾次之后如果還是失敗咐容,就返回錯(cuò)誤信息舆逃。
創(chuàng)建Kafka生產(chǎn)者
要往 Kafka寫入消息,首先要?jiǎng)?chuàng)建一個(gè)生產(chǎn)者對(duì)象戳粒,井設(shè)置一些屬性路狮。
下面的代碼片段展示了如何創(chuàng)建一個(gè)新的生產(chǎn)者,這里只指定了必要的屬性享郊,其他使用默認(rèn)設(shè)置览祖。
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
Kafka生產(chǎn)者有 3個(gè)必選的屬性
bootstrap.servers
該屬性指定 broker 的地址清單,地址的格式為 host:port炊琉。清單里不需要包含所有的broker地址展蒂,生產(chǎn)者會(huì)從給定的 broker里查找到其他 broker的信息。不過(guò)建議至少要提供兩個(gè) broker的信息苔咪, 一旦其中一個(gè)宕機(jī)锰悼,生產(chǎn)者仍然能夠連接到集群上。
key.serializer
broker希望接收到的消息的鍵和值都是字節(jié)數(shù)組团赏。生產(chǎn)者接口允許使用參數(shù)化類型箕般,因此可以把 Java對(duì)象作為鍵和值發(fā)送給 broker。這樣的代碼具有良好的可讀性舔清,不過(guò)生產(chǎn)者需要知道如何把這些 Java對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組丝里。 key.serializer必須被設(shè)置為一個(gè)實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer接口的類,生產(chǎn)者會(huì)使用這個(gè)類把鍵對(duì)象序列化成字節(jié)數(shù)組体谒。 Kafka 客戶端默認(rèn)提供了ByteArraySerializer(這個(gè)只做很少的事情)杯聚、 StringSerializer和 IntegerSerializer,因此抒痒,如果你只使用常見(jiàn)的幾種 Java對(duì)象類型幌绍,那么就沒(méi)必要實(shí)現(xiàn)自己的序列化器 。要注意, key.serializer是必須設(shè)置的傀广,就算你打算只發(fā)送值內(nèi)容颁独。
value.serializer
與 key.serializer一樣, value.serializer指定的類會(huì)將值序列化伪冰。如果鍵和值都是字符串誓酒,可以使用與 key.serializer 一樣的序列化器。如果鍵是整數(shù)類型而值是字符扇 糜值, 那么需要使用不同的序列化器丰捷。
發(fā)送消息主要有3種方式:
1、發(fā)送并忘記( fire-and-forget):我們把消息發(fā)送給服務(wù)器寂汇,但井不關(guān)心它是否正常到達(dá)病往。大多數(shù)情況下,消息會(huì)正常到達(dá)骄瓣,因?yàn)?Kafka是高可用的停巷,而且生產(chǎn)者會(huì)自動(dòng)嘗試重發(fā)。不過(guò)榕栏,使用這種方式有時(shí)候也會(huì)丟失一些消息畔勤。
2、同步發(fā)送:我們使用send()方怯發(fā)送消息扒磁, 它會(huì)返回一個(gè)Future對(duì)象庆揪,調(diào)用get()方法進(jìn)行等待, 就可以知道悄息是否發(fā)送成功妨托。
3缸榛、異步發(fā)送:我們調(diào)用 send() 方怯,并指定一個(gè)回調(diào)函數(shù)兰伤, 服務(wù)器在返回響應(yīng)時(shí)調(diào)用該函數(shù)内颗。
在下面的幾個(gè)例子中 , 我們會(huì)介紹如何使用上述幾種方式來(lái)發(fā)送消息敦腔,以及如何處理可能 發(fā)生的異常情況均澳。
本章的所有例子都使用單線程,但其實(shí)生產(chǎn)者是可以使用多線程來(lái)發(fā)送消息的符衔。剛開(kāi)始的 時(shí)候可以使用單個(gè)消費(fèi)者和單個(gè)線程找前。如果需要更高的吞吐量,可以在生產(chǎn)者數(shù)量不變的 前提下增加線程數(shù)量判族。如果這樣做還不夠 躺盛, 可以增加生產(chǎn)者數(shù)量。
發(fā)送消息到Kafka
最簡(jiǎn)單的同步發(fā)送消息方式如下所示 :
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record);
} catch(Exception e) {
e.printStack();
}
生產(chǎn)者的 send() 方住將 ProducerRecord對(duì)象作為參數(shù)五嫂,它需要目標(biāo)主題的名字和要發(fā)送的鍵和值對(duì)象,它們都是字符串。鍵和值對(duì)象的類型必須與序列化器和生產(chǎn)者對(duì)象相匹配沃缘。
我們使用生產(chǎn)者的 send() 方越發(fā)送 ProducerRecord對(duì)象躯枢。從生產(chǎn)者的架構(gòu)圖里可以看到,消息先是被放進(jìn)緩沖區(qū)槐臀,然后使用單獨(dú)的線程發(fā)送到服務(wù)器端锄蹂。 send() 方法會(huì)返回一個(gè)包含 RecordMetadata 的 Future對(duì)象,不過(guò)因?yàn)槲覀儠?huì)忽略返回值水慨,所以無(wú)法知道消息是否發(fā)送成功得糜。如果不關(guān)心發(fā)送結(jié)果,那么可以使用這種發(fā)送方式晰洒。比如朝抖,記錄 Twitter 消息日志,或記錄不太重要的應(yīng)用程序日志谍珊。
我們可以忽略發(fā)送消息時(shí)可能發(fā)生的錯(cuò)誤或在服務(wù)器端可能發(fā)生的錯(cuò)誤治宣,但在發(fā)送消息之前,生產(chǎn)者還是有可能發(fā)生其他的異常砌滞。這些異常有可能是 SerializationException (說(shuō)明序列化消息失敗)侮邀、 BufferExhaustedException 或 TimeoutException (說(shuō)明緩沖區(qū)已滿),又或者是 InterruptException (說(shuō)明發(fā)送線程被中斷)贝润。
同步發(fā)送消息
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record).get();
} catch(Exception e) {
e.printStack();
}
在這里绊茧, producer.send() 方住先返回一個(gè) Future對(duì)象,然后調(diào)用 Future對(duì)象的 get() 方法等待 Kafka 響應(yīng)打掘。如果服務(wù)器返回錯(cuò)誤华畏, get()方怯會(huì)拋出異常。如果沒(méi)有發(fā)生錯(cuò)誤胧卤,我們會(huì)得到一個(gè) RecordMetadata對(duì)象唯绍,可以用它獲取消息的偏移量。如果在發(fā)送數(shù)據(jù)之前或者在發(fā)送過(guò)程中發(fā)生了任何錯(cuò)誤 枝誊,比如 broker返回 了一個(gè)不允許重發(fā)消息的異晨雒ⅲ或者已經(jīng)超過(guò)了重發(fā)的次數(shù) ,那么就會(huì)拋出異常叶撒。我們只是簡(jiǎn)單地把異常信息打印出來(lái)绝骚。
如何處理從Kafka生產(chǎn)者返回的錯(cuò)誤
KafkaProducer一般會(huì)發(fā)生兩類錯(cuò)誤。其中一類是可重試錯(cuò)誤 祠够,這類錯(cuò)誤可以通過(guò)重發(fā)消息來(lái)解決压汪。比如對(duì)于連接錯(cuò)誤,可以通過(guò)再次建立連接來(lái)解決古瓤,“無(wú)主(noleader)” 錯(cuò)誤則可 以通過(guò)重新為分區(qū)選舉首領(lǐng)來(lái)解決止剖。 KafkaProducer可以被配置成自動(dòng)重試腺阳,如果在多次重試后仍無(wú)能解決問(wèn)題,應(yīng)用程序會(huì)收到一個(gè)重試異常穿香。另一類錯(cuò)誤無(wú)出通過(guò)重試解決 亭引,比如“消息太大”異常。對(duì)于這類錯(cuò)誤皮获, KafkaProducer不會(huì)進(jìn)行任何重試焙蚓,直接拋出異常。
異步發(fā)送消息
假設(shè)消息在應(yīng)用程序和 Kafka集群之間一個(gè)來(lái)回需要 10ms洒宝。如果在發(fā)送完每個(gè)消息后都等待回應(yīng)购公,那么發(fā)送 100個(gè)消息需要 1秒。但如果只發(fā)送消息而不等待響應(yīng)雁歌,那么發(fā)送100個(gè)消息所需要的時(shí)間會(huì)少很多宏浩。大多數(shù)時(shí)候,我們并不需要等待響應(yīng)——盡管 Kafka 會(huì)把目標(biāo)主題将宪、分區(qū)信息和消息的偏移量發(fā)送回來(lái)绘闷,但對(duì)于發(fā)送端的應(yīng)用程序來(lái)說(shuō)不是必需的。不過(guò)在遇到消息發(fā)送失敗時(shí)较坛,我們需要拋出異常印蔗、記錄錯(cuò)誤日志,或者把消息寫入 “錯(cuò)誤消息”文件以便日后分析丑勤。
為了在異步發(fā)送消息的同時(shí)能夠?qū)Ξ惓G闆r進(jìn)行處理华嘹,生產(chǎn)者提供了回調(diào)支持 。下面是使用異步發(fā)送消息法竞、回調(diào)的一個(gè)例子耙厚。
生產(chǎn)者的配置
到目前為止 , 我們只介紹了生產(chǎn)者的幾個(gè)必要配置參數(shù)——bootstrap.servers API 以及序列化器岔霸。
生產(chǎn)者還有很多可配置的參數(shù)薛躬,在 Kafka文檔里都有說(shuō)明,它們大部分都有合理的默認(rèn)值 呆细, 所以沒(méi)有必要去修改它們 型宝。不過(guò)有幾個(gè)參數(shù)在內(nèi)存使用、性能和可靠性方面對(duì)生產(chǎn)者影響比較大絮爷,接下來(lái)我們會(huì)一一說(shuō)明趴酣。
1. acks
acks 參數(shù)指定了必須要有多少個(gè)分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫入是成功的坑夯。
這個(gè)參數(shù)對(duì)消息丟失的可能性有重要影響岖寞。 該參數(shù)有如下選項(xiàng)。
? 如果 acks=0柜蜈, 生產(chǎn)者在成功寫入悄息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)仗谆。也就是說(shuō)指巡, 如果當(dāng)中出現(xiàn)了問(wèn)題 , 導(dǎo)致服務(wù)器沒(méi)有收到消息隶垮,那么生產(chǎn)者就無(wú)從得知厌处,消息也就丟 失了。不過(guò)岁疼,因?yàn)樯a(chǎn)者不需要等待服務(wù)器的響應(yīng),所以它可以以網(wǎng)絡(luò)能夠支持的最大 速度發(fā)送消息缆娃,從而達(dá)到很高的吞吐量捷绒。
? 如果 acks=1,只要集群的首領(lǐng)節(jié)點(diǎn)收到消息贯要,生產(chǎn)者就會(huì)收到 一個(gè)來(lái)自服務(wù)器的成功 響應(yīng)暖侨。如果消息無(wú)撞到達(dá)首領(lǐng)節(jié)點(diǎn)(比如首領(lǐng)節(jié)點(diǎn)崩憤,新的首領(lǐng)還沒(méi)有被選舉出來(lái))崇渗, 生產(chǎn)者會(huì)收到一個(gè)錯(cuò)誤響應(yīng)字逗,為了避免數(shù)據(jù)丟失,生產(chǎn)者會(huì)重發(fā)消息宅广。不過(guò)葫掉,如果一個(gè) 沒(méi)有收到消息的節(jié)點(diǎn)成為新首領(lǐng),消息還是會(huì)丟失跟狱。這個(gè)時(shí)候的吞吐量取決于使用的是 同步發(fā)送還是異步發(fā)送俭厚。如果讓發(fā)送客戶端等待服務(wù)器的響應(yīng)(通過(guò)調(diào)用 Future對(duì)象 的 get()方法),顯然會(huì)增加延遲(在網(wǎng)絡(luò)上傳輸一個(gè)來(lái)回的延遲)驶臊。如果客戶端使用異步回調(diào)挪挤,延遲問(wèn)題就可以得到緩解,不過(guò)吞吐量還是會(huì)受發(fā)送中消息數(shù)量的限制(比如关翎,生 產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息)扛门。
? 如果 acks=all,只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí)纵寝,生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)论寨。這種模式是最安全的,它可以保證不止一個(gè)服務(wù)器收到消息店雅,就算有服務(wù)器發(fā)生崩潰政基,整個(gè)集群仍然可以運(yùn)行(第 5 章將討論更多的細(xì)節(jié))。不過(guò)闹啦,它的延遲比 acks=1時(shí)更高沮明,因?yàn)槲覀円却恢灰粋€(gè)服務(wù)器節(jié)點(diǎn)接收消息。
2. buffer.memory
該參數(shù)用來(lái)設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小窍奋,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息荐健。如果 應(yīng)用程序發(fā)送消息的速度超過(guò)發(fā)送到服務(wù)器的速度酱畅,會(huì)導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候江场, send()方法調(diào)用要么被阻塞纺酸,要么拋出異常,取決于如何設(shè)置 block.on.buffe.full 參數(shù) (在0.9.0.0版本里被替換成了max.block.ms址否,表示在拋出異常之前可以阻塞一段時(shí)間)餐蔬。
3. compression.type
默認(rèn)情況下捐友,消息發(fā)送時(shí)不會(huì)被壓縮喘蟆。該參數(shù)可以設(shè)置為 snappy、 gzip 或 lz4鹅巍,它指定了消息被發(fā)送給 broker之前使用哪一種壓縮算法進(jìn)行壓縮音同。 snappy 壓縮算怯由 Google巳發(fā)明词爬, 它占用較少 的 CPU,卻能提供較好的性能和相當(dāng)可觀的壓縮比权均,如果比較關(guān)注性能和網(wǎng)絡(luò)帶寬顿膨,可以使用這種算法。 gzip壓縮算法一般會(huì)占用較多的 CPU叽赊,但會(huì)提供更高的壓縮比恋沃,所以如果網(wǎng)絡(luò)帶寬比較有限,可以使用這種算法必指。使用壓縮可以降低網(wǎng)絡(luò)傳輸開(kāi)銷和存儲(chǔ)開(kāi)銷芽唇,而這往往是向 Kafka發(fā)送消息的瓶頸所在。
4. retries
生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng))取劫。在這種情況下匆笤, retries參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達(dá)到這個(gè)次數(shù)谱邪,生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤炮捧。默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待 1OOms惦银,不過(guò)可以通過(guò) retries.backoff.ms 參數(shù)來(lái)改變這個(gè)時(shí)間間隔咆课。建議在設(shè)置重試次數(shù)和重試時(shí)間間隔之前, 先測(cè)試一下恢復(fù)一個(gè)崩潰節(jié)點(diǎn)需要多少時(shí)間(比如所有分區(qū)選舉出首領(lǐng)需要多長(zhǎng)時(shí)間)扯俱, 讓總的重試時(shí)間比 Kafka集群從崩潰中恢復(fù)的時(shí)間長(zhǎng)书蚪,否則生產(chǎn)者會(huì)過(guò)早地放棄重試。不過(guò)有些錯(cuò)誤不是臨時(shí)性錯(cuò)誤迅栅,沒(méi)辦怯通過(guò)重試來(lái)解決(比如“悄息太大”錯(cuò)誤)殊校。一般情 況下,因?yàn)樯a(chǎn)者會(huì)自動(dòng)進(jìn)行重試读存,所以就沒(méi)必要在代碼邏輯里處理那些可重試的錯(cuò)誤为流。 你只需要處理那些不可重試的錯(cuò)誤或重試次數(shù)超出上限的情況呕屎。
5. batch.size
當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在放一個(gè)批次里敬察。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小秀睛,按照字節(jié)數(shù)計(jì)算(而不是消息個(gè)數(shù))。當(dāng)批次被填滿莲祸,批次里的所有消息會(huì)被發(fā)送出去蹂安。不過(guò)生產(chǎn)者井不一定都會(huì)等到批次被填滿才發(fā)送,半捕 的批次锐帜,甚至只包含一個(gè)消息的批次也有可能被發(fā)送藤抡。所以就算把批次大小設(shè)置得很大, 也不會(huì)造成延遲抹估,只是會(huì)占用更多的內(nèi)存而已。但如果設(shè)置得太小弄兜,因?yàn)樯a(chǎn)者需要更頻繁地發(fā)送消息药蜻,會(huì)增加一些額外的開(kāi)銷。
6. linger.ms
該參數(shù)指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時(shí)間替饿。 KafkaProducer 會(huì)在批次填滿或 linger.ms達(dá)到上限時(shí)把批次發(fā)送出去语泽。默認(rèn)情況下,只要有可用的線程视卢, 生產(chǎn)者就會(huì)把消息發(fā)送出去踱卵,就算批次里只有一個(gè)消息。把 linger.ms設(shè)置成比0大的數(shù)据过, 讓生產(chǎn)者在發(fā)送批次之前等待一會(huì)兒惋砂,使更多的消息加入到這個(gè)批次 。雖然這樣會(huì)增加延遲绳锅,但也會(huì)提升吞吐量(因?yàn)橐淮涡园l(fā)送更多的消息西饵,每個(gè)消息的開(kāi)銷就變小了)。
7. client.id
該參數(shù)可以是任意的字符串鳞芙,服務(wù)器會(huì)用它來(lái)識(shí)別消息的來(lái)源眷柔,還可以用在日志和配額指標(biāo)里。
8. max.in.flight.requests.per.connection
該參數(shù)指定了生產(chǎn)者在收到服務(wù)器晌應(yīng)之前可以發(fā)送多少個(gè)消息原朝。它的值越高驯嘱,就會(huì)占用越多的內(nèi)存,不過(guò)也會(huì)提升吞吐量喳坠。 把它設(shè)為 1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的鞠评,即使發(fā)生了重試。
9. timeout.ms壕鹉、 request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間谢澈,metadata.fetch.timeout.ms指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標(biāo)分區(qū)的首領(lǐng)是誰(shuí))時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間煌贴。如果等待響應(yīng)超時(shí),那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù)锥忿,要么返回 一個(gè)錯(cuò)誤 (拋出異撑V#或執(zhí)行回調(diào))。timeout.ms 指定了 broker 等待同步副本返回消息確認(rèn)的時(shí)間敬鬓,與 asks 的配置相匹配一一如果在指定時(shí)間內(nèi)沒(méi)有收到同步副本的確認(rèn)淹朋,那么 broker就會(huì)返回 一個(gè)錯(cuò)誤 。
10. max.block.ms
該參數(shù)指定了在調(diào)用 send() 方法或使用 parttitionFor() 方能獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞 時(shí)間钉答。當(dāng)生產(chǎn)者的發(fā)送緩沖區(qū)已捕础芍,或者沒(méi)有可用的元數(shù)據(jù)時(shí),這些方屈就會(huì)阻塞数尿。在阻塞時(shí)間達(dá)到 max.block.ms時(shí)仑性,生產(chǎn)者會(huì)拋出超時(shí)異常。
11 . max.request.size
該參數(shù)用于控制生產(chǎn)者發(fā)送的請(qǐng)求大小右蹦。它可以指能發(fā)送的單個(gè)消息的最大值诊杆,也可以指單個(gè)請(qǐng)求里所有消息總的大小。例如何陆,假設(shè)這個(gè)值為 1MB晨汹,那么可以發(fā)送的單個(gè)最大消息為 1MB,或者生產(chǎn)者可以在單個(gè)請(qǐng)求里發(fā)送一個(gè)批次贷盲,該批次包含了 1000個(gè)消息淘这,每個(gè)消息大小為 1KB 。另外巩剖, broker對(duì)可接收的消息最大值也有自己的限制( message.max.bytes)铝穷,所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被 broker拒絕 佳魔。
12. receive.buffer.bytes 和 send.buffer.bytes
這兩個(gè)參數(shù)分別指定了 TCP socket接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小 氧骤。 如果它們被設(shè)為 -1 , 就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker處于不同的數(shù)據(jù)中心吃引,那么可以適當(dāng)增大這些值筹陵,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。
順序保證
Kafka可以保證同一個(gè)分區(qū)里的消息是有序的镊尺。也就是說(shuō)朦佩,如果生產(chǎn)者按照一定的順序發(fā)送消息, broker就會(huì)按照這個(gè)順序把它們寫入分區(qū)庐氮,消費(fèi)者也會(huì)按照同樣的順序讀取它們语稠。在某些情況下 , 順序是非常重要的。如果把retries 設(shè)為非零整數(shù)仙畦,同時(shí)把 max.in.flight.requests.per.connection 設(shè)為比 1大的數(shù)输涕,那么,如果第一個(gè)批次消息寫入失敗慨畸,而第二個(gè)批次寫入成功莱坎, broker會(huì)重試寫入第一個(gè)批次。如果此時(shí)第一個(gè)批次也寫入成功寸士,那 么兩個(gè)批次的順序就反過(guò)來(lái)了檐什。
一般來(lái)說(shuō),如果某些場(chǎng)景要求消息是有序的弱卡,那么消息是否寫入成功也是 很關(guān)鍵的乃正,所以不建議把順序是非常重要的。如果把retries 設(shè)為 0婶博∥途撸可以把 max.in.flight.requests.per.connection設(shè)為 1,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時(shí)凡人,就不會(huì)有其他的消息發(fā)送給 broker名党。不過(guò)這樣會(huì)嚴(yán)重影響生產(chǎn)者的吞吐量 ,所以只有在 對(duì)消息的順序有嚴(yán)格要求的情況下才能這么做划栓。