【Kafka】《Kafka權(quán)威指南》——寫數(shù)據(jù)

不管是把 Kafka 作為消息隊(duì)列、消息担忧、總線還是數(shù)據(jù)存儲(chǔ)平臺(tái)來(lái)使用 偷拔,總是需要有一個(gè)可以往 Kafka 寫入數(shù)據(jù)的生產(chǎn)者和一個(gè)可以從 Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角 色的應(yīng)用程序厢绝。

例如,在一個(gè)信用卡事務(wù)處理系統(tǒng)里剿骨,有一個(gè)客戶端應(yīng)用程序代芜,它可能是一個(gè)在線商店, 每當(dāng)有支付行為發(fā)生時(shí)浓利,它負(fù)責(zé)把事務(wù)發(fā)送到 Kafka上挤庇。另一個(gè)應(yīng)用程序根據(jù)規(guī)則引擎檢 查這個(gè)事務(wù),決定是批準(zhǔn)還是拒絕贷掖。 批準(zhǔn)或拒絕的響應(yīng)消息被寫回 Kafka嫡秕,然后發(fā)送給發(fā)起事務(wù)的在線商店。第三個(gè)應(yīng)用程序從 Kafka上讀取事務(wù)和審核狀態(tài)苹威,把它們保存到數(shù)據(jù) 庫(kù)昆咽, 隨后分析師可以對(duì)這些結(jié)果進(jìn)行分析,或許還能借此改進(jìn)規(guī)則引擎 牙甫。

開(kāi)發(fā)者們可以使用 Kafka 內(nèi)置的客戶端 API開(kāi)發(fā) Kafka應(yīng)用程序掷酗。

在這一章,我們將從 Kafra生產(chǎn)者的設(shè)計(jì)和組件講起窟哺,學(xué)習(xí)如何使用 Kafka生產(chǎn)者泻轰。我們將展示如何創(chuàng)建 KafkaProducer和 ProducerRecords對(duì)象、如何將記錄發(fā)送給 Kafka且轨,以及如何處理從 Kafka 返回的錯(cuò)誤浮声,然后介紹用干控制生產(chǎn)者行為的重要配置選項(xiàng),最后深入 探討如何使用不同的分區(qū)方法和序列化器旋奢,以及如何自定義序列化器和分區(qū)器 泳挥。

在下一章,我們將會(huì)介紹 Kafra的悄費(fèi)者客戶端至朗,以及如何從 Kafka讀取消息屉符。

生產(chǎn)者概覽

一個(gè)應(yīng)用程序在很多情況下需要往 Kafka 寫入消息 : 記錄用戶的活動(dòng)(用于審計(jì)和分析 )、 記錄度量指標(biāo)、保存日志筑煮、消息辛蚊、記錄智能家電的信息、與其他應(yīng)用程序進(jìn)行異步通信真仲、 緩沖即將寫入到數(shù)據(jù)庫(kù)的數(shù)據(jù)袋马,等等。

多樣的使用場(chǎng)景意味著多樣的需求:是否每個(gè)消息都很重要?是否允許丟失 一 小部分消息?偶爾出現(xiàn)重復(fù)消息是否可以接受?是否有嚴(yán)格的延遲和吞吐量要求?

在之前提到的信用卡事務(wù)處理系統(tǒng)里秸应,消息丟失或消息重復(fù)是不允許的虑凛,可以接受的延遲最大為 500ms,對(duì)吞吐量要求較高软啼,我們希望每秒鐘可以處理一百萬(wàn)個(gè)消息桑谍。

保存網(wǎng)站的點(diǎn)擊信息是另 一種使用場(chǎng)景。在這個(gè)場(chǎng)景里祸挪,允許丟失少量的消息或出現(xiàn)少量 的消息重復(fù)锣披,延遲可以高一些,只要不影響用戶體驗(yàn)就行贿条。換句話說(shuō)雹仿,只要用戶點(diǎn)擊鏈接 后可以馬上加載頁(yè)面,那么我們并不介意消息要在幾秒鐘之后才能到達(dá) Kafka 服務(wù)器整以。 吞 吐量則取決于網(wǎng)站用戶使用網(wǎng)站的頻度胧辽。

不同的使用場(chǎng)景對(duì)生產(chǎn)者 API 的使用和配置會(huì)有直接的影響。

盡管生產(chǎn)者 API 使用起來(lái)很簡(jiǎn)單 公黑,但消息的發(fā)送過(guò)程還是有點(diǎn)復(fù)雜的邑商。下圖展示 了向Kafka 發(fā)送消息的主要步驟。

image

Kafka 生產(chǎn)者組件圖

我們從創(chuàng)建 一個(gè) ProducerRecord 對(duì)象開(kāi)始凡蚜, ProducerRecord 對(duì)象需要包含目標(biāo)主題和要發(fā)送的內(nèi)容人断。我們還可以指定鍵或分區(qū)。在發(fā)送 ProducerRecord對(duì)象時(shí)朝蜘,生產(chǎn)者要先把鍵和 值對(duì)象序列化成字節(jié)數(shù)組含鳞,這樣它們才能夠在網(wǎng)絡(luò)上傳輸 。

接下來(lái)芹务,數(shù)據(jù)被傳給分區(qū)器。如果之前在 ProducerRecord對(duì)象里指定了分區(qū)鸭廷,那么分區(qū)器就不會(huì)再做任何事情枣抱,直接把指定的分區(qū)返回。如果沒(méi)有指定分區(qū) 辆床,那么分區(qū)器會(huì)根據(jù) ProducerRecord對(duì)象的鍵來(lái)選擇一個(gè)分區(qū) 佳晶。選好分區(qū)以后 ,生產(chǎn)者就知道該往哪個(gè)主題和分區(qū)發(fā)送這條記錄了讼载。緊接著轿秧,這條記錄被添加到一個(gè)記錄批次里中跌,這個(gè)批次里的所有消息會(huì)被發(fā)送到相同的主題和分區(qū)上。有一個(gè)獨(dú)立的線程負(fù)責(zé)把這些記錄批次發(fā)送到相應(yīng)的 broker 上菇篡。

服務(wù)器在收到這些消息時(shí)會(huì)返回一個(gè)響應(yīng)漩符。如果消息成功寫入 Kafka,就返回 一 個(gè) RecordMetaData 對(duì)象驱还,它包含了主題和分區(qū)信息嗜暴,以及記錄在分區(qū)里的偏移量。如果寫入 失敗议蟆, 就會(huì)返回 一個(gè)錯(cuò)誤 闷沥。生產(chǎn)者在收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息,幾次之后如果還是失敗咐容,就返回錯(cuò)誤信息舆逃。

創(chuàng)建Kafka生產(chǎn)者

要往 Kafka寫入消息,首先要?jiǎng)?chuàng)建一個(gè)生產(chǎn)者對(duì)象戳粒,井設(shè)置一些屬性路狮。

下面的代碼片段展示了如何創(chuàng)建一個(gè)新的生產(chǎn)者,這里只指定了必要的屬性享郊,其他使用默認(rèn)設(shè)置览祖。

private Properties kafkaProps = new Properties(); 

kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
 
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
 
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
 
producer = new KafkaProducer<String, String>(kafkaProps);

Kafka生產(chǎn)者有 3個(gè)必選的屬性

bootstrap.servers

該屬性指定 broker 的地址清單,地址的格式為 host:port炊琉。清單里不需要包含所有的broker地址展蒂,生產(chǎn)者會(huì)從給定的 broker里查找到其他 broker的信息。不過(guò)建議至少要提供兩個(gè) broker的信息苔咪, 一旦其中一個(gè)宕機(jī)锰悼,生產(chǎn)者仍然能夠連接到集群上。

key.serializer

broker希望接收到的消息的鍵和值都是字節(jié)數(shù)組团赏。生產(chǎn)者接口允許使用參數(shù)化類型箕般,因此可以把 Java對(duì)象作為鍵和值發(fā)送給 broker。這樣的代碼具有良好的可讀性舔清,不過(guò)生產(chǎn)者需要知道如何把這些 Java對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組丝里。 key.serializer必須被設(shè)置為一個(gè)實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer接口的類,生產(chǎn)者會(huì)使用這個(gè)類把鍵對(duì)象序列化成字節(jié)數(shù)組体谒。 Kafka 客戶端默認(rèn)提供了ByteArraySerializer(這個(gè)只做很少的事情)杯聚、 StringSerializer和 IntegerSerializer,因此抒痒,如果你只使用常見(jiàn)的幾種 Java對(duì)象類型幌绍,那么就沒(méi)必要實(shí)現(xiàn)自己的序列化器 。要注意, key.serializer是必須設(shè)置的傀广,就算你打算只發(fā)送值內(nèi)容颁独。

value.serializer

與 key.serializer一樣, value.serializer指定的類會(huì)將值序列化伪冰。如果鍵和值都是字符串誓酒,可以使用與 key.serializer 一樣的序列化器。如果鍵是整數(shù)類型而值是字符扇 糜值, 那么需要使用不同的序列化器丰捷。

發(fā)送消息主要有3種方式:

1、發(fā)送并忘記( fire-and-forget):我們把消息發(fā)送給服務(wù)器寂汇,但井不關(guān)心它是否正常到達(dá)病往。大多數(shù)情況下,消息會(huì)正常到達(dá)骄瓣,因?yàn)?Kafka是高可用的停巷,而且生產(chǎn)者會(huì)自動(dòng)嘗試重發(fā)。不過(guò)榕栏,使用這種方式有時(shí)候也會(huì)丟失一些消息畔勤。

2、同步發(fā)送:我們使用send()方怯發(fā)送消息扒磁, 它會(huì)返回一個(gè)Future對(duì)象庆揪,調(diào)用get()方法進(jìn)行等待, 就可以知道悄息是否發(fā)送成功妨托。

3缸榛、異步發(fā)送:我們調(diào)用 send() 方怯,并指定一個(gè)回調(diào)函數(shù)兰伤, 服務(wù)器在返回響應(yīng)時(shí)調(diào)用該函數(shù)内颗。

在下面的幾個(gè)例子中 , 我們會(huì)介紹如何使用上述幾種方式來(lái)發(fā)送消息敦腔,以及如何處理可能 發(fā)生的異常情況均澳。

本章的所有例子都使用單線程,但其實(shí)生產(chǎn)者是可以使用多線程來(lái)發(fā)送消息的符衔。剛開(kāi)始的 時(shí)候可以使用單個(gè)消費(fèi)者和單個(gè)線程找前。如果需要更高的吞吐量,可以在生產(chǎn)者數(shù)量不變的 前提下增加線程數(shù)量判族。如果這樣做還不夠 躺盛, 可以增加生產(chǎn)者數(shù)量。

發(fā)送消息到Kafka

最簡(jiǎn)單的同步發(fā)送消息方式如下所示 :

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
  producer.send(record);
} catch(Exception e) {
  e.printStack();
}

生產(chǎn)者的 send() 方住將 ProducerRecord對(duì)象作為參數(shù)五嫂,它需要目標(biāo)主題的名字和要發(fā)送的鍵和值對(duì)象,它們都是字符串。鍵和值對(duì)象的類型必須與序列化器和生產(chǎn)者對(duì)象相匹配沃缘。

我們使用生產(chǎn)者的 send() 方越發(fā)送 ProducerRecord對(duì)象躯枢。從生產(chǎn)者的架構(gòu)圖里可以看到,消息先是被放進(jìn)緩沖區(qū)槐臀,然后使用單獨(dú)的線程發(fā)送到服務(wù)器端锄蹂。 send() 方法會(huì)返回一個(gè)包含 RecordMetadata 的 Future對(duì)象,不過(guò)因?yàn)槲覀儠?huì)忽略返回值水慨,所以無(wú)法知道消息是否發(fā)送成功得糜。如果不關(guān)心發(fā)送結(jié)果,那么可以使用這種發(fā)送方式晰洒。比如朝抖,記錄 Twitter 消息日志,或記錄不太重要的應(yīng)用程序日志谍珊。

我們可以忽略發(fā)送消息時(shí)可能發(fā)生的錯(cuò)誤或在服務(wù)器端可能發(fā)生的錯(cuò)誤治宣,但在發(fā)送消息之前,生產(chǎn)者還是有可能發(fā)生其他的異常砌滞。這些異常有可能是 SerializationException (說(shuō)明序列化消息失敗)侮邀、 BufferExhaustedException 或 TimeoutException (說(shuō)明緩沖區(qū)已滿),又或者是 InterruptException (說(shuō)明發(fā)送線程被中斷)贝润。

同步發(fā)送消息

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
    producer.send(record).get();
} catch(Exception e) {
    e.printStack();
}

在這里绊茧, producer.send() 方住先返回一個(gè) Future對(duì)象,然后調(diào)用 Future對(duì)象的 get() 方法等待 Kafka 響應(yīng)打掘。如果服務(wù)器返回錯(cuò)誤华畏, get()方怯會(huì)拋出異常。如果沒(méi)有發(fā)生錯(cuò)誤胧卤,我們會(huì)得到一個(gè) RecordMetadata對(duì)象唯绍,可以用它獲取消息的偏移量。如果在發(fā)送數(shù)據(jù)之前或者在發(fā)送過(guò)程中發(fā)生了任何錯(cuò)誤 枝誊,比如 broker返回 了一個(gè)不允許重發(fā)消息的異晨雒ⅲ或者已經(jīng)超過(guò)了重發(fā)的次數(shù) ,那么就會(huì)拋出異常叶撒。我們只是簡(jiǎn)單地把異常信息打印出來(lái)绝骚。

如何處理從Kafka生產(chǎn)者返回的錯(cuò)誤

KafkaProducer一般會(huì)發(fā)生兩類錯(cuò)誤。其中一類是可重試錯(cuò)誤 祠够,這類錯(cuò)誤可以通過(guò)重發(fā)消息來(lái)解決压汪。比如對(duì)于連接錯(cuò)誤,可以通過(guò)再次建立連接來(lái)解決古瓤,“無(wú)主(noleader)” 錯(cuò)誤則可 以通過(guò)重新為分區(qū)選舉首領(lǐng)來(lái)解決止剖。 KafkaProducer可以被配置成自動(dòng)重試腺阳,如果在多次重試后仍無(wú)能解決問(wèn)題,應(yīng)用程序會(huì)收到一個(gè)重試異常穿香。另一類錯(cuò)誤無(wú)出通過(guò)重試解決 亭引,比如“消息太大”異常。對(duì)于這類錯(cuò)誤皮获, KafkaProducer不會(huì)進(jìn)行任何重試焙蚓,直接拋出異常。

異步發(fā)送消息

假設(shè)消息在應(yīng)用程序和 Kafka集群之間一個(gè)來(lái)回需要 10ms洒宝。如果在發(fā)送完每個(gè)消息后都等待回應(yīng)购公,那么發(fā)送 100個(gè)消息需要 1秒。但如果只發(fā)送消息而不等待響應(yīng)雁歌,那么發(fā)送100個(gè)消息所需要的時(shí)間會(huì)少很多宏浩。大多數(shù)時(shí)候,我們并不需要等待響應(yīng)——盡管 Kafka 會(huì)把目標(biāo)主題将宪、分區(qū)信息和消息的偏移量發(fā)送回來(lái)绘闷,但對(duì)于發(fā)送端的應(yīng)用程序來(lái)說(shuō)不是必需的。不過(guò)在遇到消息發(fā)送失敗時(shí)较坛,我們需要拋出異常印蔗、記錄錯(cuò)誤日志,或者把消息寫入 “錯(cuò)誤消息”文件以便日后分析丑勤。

為了在異步發(fā)送消息的同時(shí)能夠?qū)Ξ惓G闆r進(jìn)行處理华嘹,生產(chǎn)者提供了回調(diào)支持 。下面是使用異步發(fā)送消息法竞、回調(diào)的一個(gè)例子耙厚。

生產(chǎn)者的配置

到目前為止 , 我們只介紹了生產(chǎn)者的幾個(gè)必要配置參數(shù)——bootstrap.servers API 以及序列化器岔霸。

生產(chǎn)者還有很多可配置的參數(shù)薛躬,在 Kafka文檔里都有說(shuō)明,它們大部分都有合理的默認(rèn)值 呆细, 所以沒(méi)有必要去修改它們 型宝。不過(guò)有幾個(gè)參數(shù)在內(nèi)存使用、性能和可靠性方面對(duì)生產(chǎn)者影響比較大絮爷,接下來(lái)我們會(huì)一一說(shuō)明趴酣。

1. acks

acks 參數(shù)指定了必須要有多少個(gè)分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫入是成功的坑夯。

這個(gè)參數(shù)對(duì)消息丟失的可能性有重要影響岖寞。 該參數(shù)有如下選項(xiàng)。
? 如果 acks=0柜蜈, 生產(chǎn)者在成功寫入悄息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)仗谆。也就是說(shuō)指巡, 如果當(dāng)中出現(xiàn)了問(wèn)題 , 導(dǎo)致服務(wù)器沒(méi)有收到消息隶垮,那么生產(chǎn)者就無(wú)從得知厌处,消息也就丟 失了。不過(guò)岁疼,因?yàn)樯a(chǎn)者不需要等待服務(wù)器的響應(yīng),所以它可以以網(wǎng)絡(luò)能夠支持的最大 速度發(fā)送消息缆娃,從而達(dá)到很高的吞吐量捷绒。

? 如果 acks=1,只要集群的首領(lǐng)節(jié)點(diǎn)收到消息贯要,生產(chǎn)者就會(huì)收到 一個(gè)來(lái)自服務(wù)器的成功 響應(yīng)暖侨。如果消息無(wú)撞到達(dá)首領(lǐng)節(jié)點(diǎn)(比如首領(lǐng)節(jié)點(diǎn)崩憤,新的首領(lǐng)還沒(méi)有被選舉出來(lái))崇渗, 生產(chǎn)者會(huì)收到一個(gè)錯(cuò)誤響應(yīng)字逗,為了避免數(shù)據(jù)丟失,生產(chǎn)者會(huì)重發(fā)消息宅广。不過(guò)葫掉,如果一個(gè) 沒(méi)有收到消息的節(jié)點(diǎn)成為新首領(lǐng),消息還是會(huì)丟失跟狱。這個(gè)時(shí)候的吞吐量取決于使用的是 同步發(fā)送還是異步發(fā)送俭厚。如果讓發(fā)送客戶端等待服務(wù)器的響應(yīng)(通過(guò)調(diào)用 Future對(duì)象 的 get()方法),顯然會(huì)增加延遲(在網(wǎng)絡(luò)上傳輸一個(gè)來(lái)回的延遲)驶臊。如果客戶端使用異步回調(diào)挪挤,延遲問(wèn)題就可以得到緩解,不過(guò)吞吐量還是會(huì)受發(fā)送中消息數(shù)量的限制(比如关翎,生 產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息)扛门。

? 如果 acks=all,只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí)纵寝,生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)论寨。這種模式是最安全的,它可以保證不止一個(gè)服務(wù)器收到消息店雅,就算有服務(wù)器發(fā)生崩潰政基,整個(gè)集群仍然可以運(yùn)行(第 5 章將討論更多的細(xì)節(jié))。不過(guò)闹啦,它的延遲比 acks=1時(shí)更高沮明,因?yàn)槲覀円却恢灰粋€(gè)服務(wù)器節(jié)點(diǎn)接收消息。

2. buffer.memory

該參數(shù)用來(lái)設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小窍奋,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息荐健。如果 應(yīng)用程序發(fā)送消息的速度超過(guò)發(fā)送到服務(wù)器的速度酱畅,會(huì)導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候江场, send()方法調(diào)用要么被阻塞纺酸,要么拋出異常,取決于如何設(shè)置 block.on.buffe.full 參數(shù) (在0.9.0.0版本里被替換成了max.block.ms址否,表示在拋出異常之前可以阻塞一段時(shí)間)餐蔬。

3. compression.type

默認(rèn)情況下捐友,消息發(fā)送時(shí)不會(huì)被壓縮喘蟆。該參數(shù)可以設(shè)置為 snappy、 gzip 或 lz4鹅巍,它指定了消息被發(fā)送給 broker之前使用哪一種壓縮算法進(jìn)行壓縮音同。 snappy 壓縮算怯由 Google巳發(fā)明词爬, 它占用較少 的 CPU,卻能提供較好的性能和相當(dāng)可觀的壓縮比权均,如果比較關(guān)注性能和網(wǎng)絡(luò)帶寬顿膨,可以使用這種算法。 gzip壓縮算法一般會(huì)占用較多的 CPU叽赊,但會(huì)提供更高的壓縮比恋沃,所以如果網(wǎng)絡(luò)帶寬比較有限,可以使用這種算法必指。使用壓縮可以降低網(wǎng)絡(luò)傳輸開(kāi)銷和存儲(chǔ)開(kāi)銷芽唇,而這往往是向 Kafka發(fā)送消息的瓶頸所在。

4. retries

生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng))取劫。在這種情況下匆笤, retries參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達(dá)到這個(gè)次數(shù)谱邪,生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤炮捧。默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待 1OOms惦银,不過(guò)可以通過(guò) retries.backoff.ms 參數(shù)來(lái)改變這個(gè)時(shí)間間隔咆课。建議在設(shè)置重試次數(shù)和重試時(shí)間間隔之前, 先測(cè)試一下恢復(fù)一個(gè)崩潰節(jié)點(diǎn)需要多少時(shí)間(比如所有分區(qū)選舉出首領(lǐng)需要多長(zhǎng)時(shí)間)扯俱, 讓總的重試時(shí)間比 Kafka集群從崩潰中恢復(fù)的時(shí)間長(zhǎng)书蚪,否則生產(chǎn)者會(huì)過(guò)早地放棄重試。不過(guò)有些錯(cuò)誤不是臨時(shí)性錯(cuò)誤迅栅,沒(méi)辦怯通過(guò)重試來(lái)解決(比如“悄息太大”錯(cuò)誤)殊校。一般情 況下,因?yàn)樯a(chǎn)者會(huì)自動(dòng)進(jìn)行重試读存,所以就沒(méi)必要在代碼邏輯里處理那些可重試的錯(cuò)誤为流。 你只需要處理那些不可重試的錯(cuò)誤或重試次數(shù)超出上限的情況呕屎。

5. batch.size

當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在放一個(gè)批次里敬察。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小秀睛,按照字節(jié)數(shù)計(jì)算(而不是消息個(gè)數(shù))。當(dāng)批次被填滿莲祸,批次里的所有消息會(huì)被發(fā)送出去蹂安。不過(guò)生產(chǎn)者井不一定都會(huì)等到批次被填滿才發(fā)送,半捕 的批次锐帜,甚至只包含一個(gè)消息的批次也有可能被發(fā)送藤抡。所以就算把批次大小設(shè)置得很大, 也不會(huì)造成延遲抹估,只是會(huì)占用更多的內(nèi)存而已。但如果設(shè)置得太小弄兜,因?yàn)樯a(chǎn)者需要更頻繁地發(fā)送消息药蜻,會(huì)增加一些額外的開(kāi)銷。

6. linger.ms

該參數(shù)指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時(shí)間替饿。 KafkaProducer 會(huì)在批次填滿或 linger.ms達(dá)到上限時(shí)把批次發(fā)送出去语泽。默認(rèn)情況下,只要有可用的線程视卢, 生產(chǎn)者就會(huì)把消息發(fā)送出去踱卵,就算批次里只有一個(gè)消息。把 linger.ms設(shè)置成比0大的數(shù)据过, 讓生產(chǎn)者在發(fā)送批次之前等待一會(huì)兒惋砂,使更多的消息加入到這個(gè)批次 。雖然這樣會(huì)增加延遲绳锅,但也會(huì)提升吞吐量(因?yàn)橐淮涡园l(fā)送更多的消息西饵,每個(gè)消息的開(kāi)銷就變小了)。

7. client.id

該參數(shù)可以是任意的字符串鳞芙,服務(wù)器會(huì)用它來(lái)識(shí)別消息的來(lái)源眷柔,還可以用在日志和配額指標(biāo)里。

8. max.in.flight.requests.per.connection

該參數(shù)指定了生產(chǎn)者在收到服務(wù)器晌應(yīng)之前可以發(fā)送多少個(gè)消息原朝。它的值越高驯嘱,就會(huì)占用越多的內(nèi)存,不過(guò)也會(huì)提升吞吐量喳坠。 把它設(shè)為 1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的鞠评,即使發(fā)生了重試。

9. timeout.ms壕鹉、 request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間谢澈,metadata.fetch.timeout.ms指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標(biāo)分區(qū)的首領(lǐng)是誰(shuí))時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間煌贴。如果等待響應(yīng)超時(shí),那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù)锥忿,要么返回 一個(gè)錯(cuò)誤 (拋出異撑V#或執(zhí)行回調(diào))。timeout.ms 指定了 broker 等待同步副本返回消息確認(rèn)的時(shí)間敬鬓,與 asks 的配置相匹配一一如果在指定時(shí)間內(nèi)沒(méi)有收到同步副本的確認(rèn)淹朋,那么 broker就會(huì)返回 一個(gè)錯(cuò)誤 。

10. max.block.ms

該參數(shù)指定了在調(diào)用 send() 方法或使用 parttitionFor() 方能獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞 時(shí)間钉答。當(dāng)生產(chǎn)者的發(fā)送緩沖區(qū)已捕础芍,或者沒(méi)有可用的元數(shù)據(jù)時(shí),這些方屈就會(huì)阻塞数尿。在阻塞時(shí)間達(dá)到 max.block.ms時(shí)仑性,生產(chǎn)者會(huì)拋出超時(shí)異常。

11 . max.request.size

該參數(shù)用于控制生產(chǎn)者發(fā)送的請(qǐng)求大小右蹦。它可以指能發(fā)送的單個(gè)消息的最大值诊杆,也可以指單個(gè)請(qǐng)求里所有消息總的大小。例如何陆,假設(shè)這個(gè)值為 1MB晨汹,那么可以發(fā)送的單個(gè)最大消息為 1MB,或者生產(chǎn)者可以在單個(gè)請(qǐng)求里發(fā)送一個(gè)批次贷盲,該批次包含了 1000個(gè)消息淘这,每個(gè)消息大小為 1KB 。另外巩剖, broker對(duì)可接收的消息最大值也有自己的限制( message.max.bytes)铝穷,所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被 broker拒絕 佳魔。

12. receive.buffer.bytes 和 send.buffer.bytes

這兩個(gè)參數(shù)分別指定了 TCP socket接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小 氧骤。 如果它們被設(shè)為 -1 , 就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker處于不同的數(shù)據(jù)中心吃引,那么可以適當(dāng)增大這些值筹陵,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。

順序保證

Kafka可以保證同一個(gè)分區(qū)里的消息是有序的镊尺。也就是說(shuō)朦佩,如果生產(chǎn)者按照一定的順序發(fā)送消息, broker就會(huì)按照這個(gè)順序把它們寫入分區(qū)庐氮,消費(fèi)者也會(huì)按照同樣的順序讀取它們语稠。在某些情況下 , 順序是非常重要的。如果把retries 設(shè)為非零整數(shù)仙畦,同時(shí)把 max.in.flight.requests.per.connection 設(shè)為比 1大的數(shù)输涕,那么,如果第一個(gè)批次消息寫入失敗慨畸,而第二個(gè)批次寫入成功莱坎, broker會(huì)重試寫入第一個(gè)批次。如果此時(shí)第一個(gè)批次也寫入成功寸士,那 么兩個(gè)批次的順序就反過(guò)來(lái)了檐什。

一般來(lái)說(shuō),如果某些場(chǎng)景要求消息是有序的弱卡,那么消息是否寫入成功也是 很關(guān)鍵的乃正,所以不建議把順序是非常重要的。如果把retries 設(shè)為 0婶博∥途撸可以把 max.in.flight.requests.per.connection設(shè)為 1,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時(shí)凡人,就不會(huì)有其他的消息發(fā)送給 broker名党。不過(guò)這樣會(huì)嚴(yán)重影響生產(chǎn)者的吞吐量 ,所以只有在 對(duì)消息的順序有嚴(yán)格要求的情況下才能這么做划栓。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市条获,隨后出現(xiàn)的幾起案子忠荞,更是在濱河造成了極大的恐慌,老刑警劉巖帅掘,帶你破解...
    沈念sama閱讀 218,386評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件委煤,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡修档,警方通過(guò)查閱死者的電腦和手機(jī)碧绞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)吱窝,“玉大人讥邻,你說(shuō)我怎么就攤上這事≡合浚” “怎么了兴使?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,704評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)照激。 經(jīng)常有香客問(wèn)我发魄,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,702評(píng)論 1 294
  • 正文 為了忘掉前任励幼,我火速辦了婚禮汰寓,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘苹粟。我一直安慰自己有滑,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布六水。 她就那樣靜靜地躺著俺孙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掷贾。 梳的紋絲不亂的頭發(fā)上睛榄,一...
    開(kāi)封第一講書(shū)人閱讀 51,573評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音想帅,去河邊找鬼场靴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛港准,可吹牛的內(nèi)容都是我干的旨剥。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼浅缸,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼轨帜!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起衩椒,我...
    開(kāi)封第一講書(shū)人閱讀 39,230評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蚌父,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后毛萌,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體苟弛,經(jīng)...
    沈念sama閱讀 45,680評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評(píng)論 3 336
  • 正文 我和宋清朗相戀三年阁将,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了膏秫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,991評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡做盅,死狀恐怖缤削,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吹榴,我是刑警寧澤僻他,帶...
    沈念sama閱讀 35,706評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站腊尚,受9級(jí)特大地震影響吨拗,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評(píng)論 3 330
  • 文/蒙蒙 一劝篷、第九天 我趴在偏房一處隱蔽的房頂上張望哨鸭。 院中可真熱鬧,春花似錦娇妓、人聲如沸像鸡。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,910評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)只估。三九已至,卻和暖如春着绷,著一層夾襖步出監(jiān)牢的瞬間蛔钙,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,038評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工荠医, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留吁脱,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,158評(píng)論 3 370
  • 正文 我出身青樓彬向,卻偏偏與公主長(zhǎng)得像兼贡,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子娃胆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 不管是把 Kafka 作為消息隊(duì)列遍希、消息、總線還是數(shù)據(jù)存儲(chǔ)平臺(tái)來(lái)使用 里烦,總是需要有一個(gè)可以往 Kafka 寫入數(shù)據(jù)...
    消失er閱讀 11,243評(píng)論 1 5
  • 本章我們將會(huì)討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的凿蒜。Kafka項(xiàng)目有一個(gè)生產(chǎn)者客戶端,我們可以通過(guò)這個(gè)客...
    __元昊__閱讀 664評(píng)論 0 0
  • 本章我們將會(huì)討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的招驴。Kafka項(xiàng)目有一個(gè)生產(chǎn)者客戶端篙程,我們可以通過(guò)這個(gè)客...
    zwb_jianshu閱讀 475評(píng)論 0 0
  • 本章我們將會(huì)討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的枷畏。Kafka項(xiàng)目有一個(gè)生產(chǎn)者客戶端别厘,我們可以通過(guò)這個(gè)客...
    printf200閱讀 7,963評(píng)論 0 3
  • 人有三必窮:為上則不能愛(ài)下渴肉,為下則好非其上冗懦,是人之一必窮也。鄉(xiāng)(向)則不若(順)仇祭,偝(背)則謾之披蕉,世人只二必窮也。...
    同聞悅讀閱讀 5,205評(píng)論 2 9