Kafka 0.9版本正式使用Java版本的producer替換了原Scala版本的producer。
1坊饶、Kafka Producer工作流程
注:ProducerRecord允許用戶在創(chuàng)建消息對象的時候就直接指定要發(fā)送的分區(qū)泄伪,這樣producer后續(xù)發(fā)送該消息時可以直接發(fā)送到指定分區(qū),而不用先通過Partitioner計算目標(biāo)分區(qū)了匿级。另外蟋滴,我們還可以直接指定消息的時間戳——但一定要慎重使用這個功能,因為它有可能會令時間戳索引機制失效痘绎。
流程描述:
用戶首先構(gòu)建待發(fā)送的消息對象ProducerRecord津函,然后調(diào)用KafkaProducer#send方法進行發(fā)送。KafkaProducer接收到消息后首先對其進行序列化孤页,然后結(jié)合本地緩存的元數(shù)據(jù)信息一起發(fā)送給partitioner去確定目標(biāo)分區(qū)尔苦,最后追加寫入到內(nèi)存中的消息緩沖池(accumulator)。此時KafkaProducer#send方法成功返回。同時允坚,KafkaProducer中還有一個專門的Sender IO線程負(fù)責(zé)將緩沖池中的消息分批次發(fā)送給對應(yīng)的broker魂那,完成真正的消息發(fā)送邏輯。
新版本的producer從設(shè)計上來說具有以下幾個特點:
總共創(chuàng)建兩個線程:執(zhí)行KafkaPrducer#send邏輯的線程——我們稱之為“用戶主線程”稠项;執(zhí)行發(fā)送邏輯的IO線程——我們稱之為“Sender線程”涯雅。
不同于Scala老版本的producer,新版本producer完全異步發(fā)送消息展运,并提供了回調(diào)機制(callback)供用戶判斷消息是否成功發(fā)送活逆。
batching機制——“分批發(fā)送“機制。每個批次(batch)中包含了若干個PRODUCE請求乐疆,因此具有更高的吞吐量划乖。
更加合理的默認(rèn)分區(qū)策略:對于無key消息而言,Scala版本分區(qū)策略是一段時間內(nèi)(默認(rèn)是10分鐘)將消息發(fā)往固定的目標(biāo)分區(qū)挤土,這容易造成消息分布的不均勻琴庵,而新版本的producer采用輪詢的方式均勻地將消息分發(fā)到不同的分區(qū)。
底層統(tǒng)一使用基于Selector的網(wǎng)絡(luò)客戶端實現(xiàn)仰美,結(jié)合Java提供的Future實現(xiàn)完整地提供了更加健壯和優(yōu)雅的生命周期管理迷殿。
關(guān)鍵參數(shù)
batch.size 我把它列在了首位,因為該參數(shù)對于調(diào)優(yōu)producer至關(guān)重要咖杂。之前提到過新版producer采用分批發(fā)送機制庆寺,該參數(shù)即控制一個batch的大小。默認(rèn)是16KB
acks關(guān)乎到消息持久性(durability)的一個參數(shù)诉字。高吞吐量和高持久性很多時候是相矛盾的懦尝,需要先明確我們的目標(biāo)是什么? 高吞吐量壤圃?高持久性陵霉?亦或是中等?因此該參數(shù)也有對應(yīng)的三個取值:0伍绳, -1和1
linger.ms減少網(wǎng)絡(luò)IO踊挠,節(jié)省帶寬之用。原理就是把原本需要多次發(fā)送的小batch冲杀,通過引入延時的方式合并成大batch發(fā)送效床,減少了網(wǎng)絡(luò)傳輸?shù)膲毫Γ瑥亩嵘掏铝咳ㄋ.?dāng)然剩檀,也會引入延時
compression.type producer所使用的壓縮器,目前支持gzip, snappy和lz4旺芽。壓縮是在用戶主線程完成的谨朝,通常都需要花費大量的CPU時間卤妒,但對于減少網(wǎng)絡(luò)IO來說確實利器。生產(chǎn)環(huán)境中可以結(jié)合壓力測試進行適當(dāng)配置
max.in.flight.requests.per.connection 關(guān)乎消息亂序的一個配置參數(shù)字币。它指定了Sender線程在單個Socket連接上能夠發(fā)送未應(yīng)答PRODUCE請求的最大請求數(shù)则披。適當(dāng)增加此值通常會增大吞吐量,從而整體上提升producer的性能洗出。不過筆者始終覺得其效果不如調(diào)節(jié)batch.size來得明顯士复,所以請謹(jǐn)慎使用。另外如果開啟了重試機制翩活,配置該參數(shù)大于1可能造成消息發(fā)送的亂序(先發(fā)送A阱洪,然后發(fā)送B,但B卻先行被broker接收)
retries 重試機制菠镇,對于瞬時失敗的消息發(fā)送冗荸,開啟重試后KafkaProducer會嘗試再次發(fā)送消息。對于有強烈無消息丟失需求的用戶來說利耍,開啟重試機制是必選項蚌本。
2、內(nèi)部流程
當(dāng)用戶調(diào)用KafkaProducer.send(ProducerRecord, Callback)時Kafka內(nèi)部流程分析:
(1)隘梨、Step 1: 序列化+計算目標(biāo)分區(qū)
這是KafkaProducer#send邏輯的第一步程癌,即為待發(fā)送消息進行序列化并計算目標(biāo)分區(qū),如下圖所示:
如上圖所示轴猎,一條所屬topic是"test"嵌莉,消息體是"message"的消息被序列化之后結(jié)合KafkaProducer緩存的元數(shù)據(jù)(比如該topic分區(qū)數(shù)信息等)共同傳給后面的Partitioner實現(xiàn)類進行目標(biāo)分區(qū)的計算。
(2)捻脖、 Step 2: 追加寫入消息緩沖區(qū)(accumulator)
producer創(chuàng)建時會創(chuàng)建一個默認(rèn)32MB(由buffer.memory參數(shù)指定)的accumulator緩沖區(qū)锐峭,專門保存待發(fā)送的消息。除了之前在“關(guān)鍵參數(shù)”段落中提到的linger.ms和batch.size等參數(shù)之外可婶,該數(shù)據(jù)結(jié)構(gòu)中還包含了一個特別重要的集合信息:消息批次信息(batches)沿癞。該集合本質(zhì)上是一個HashMap,里面分別保存了每個topic分區(qū)下的batch隊列扰肌,即前面說的批次是按照topic分區(qū)進行分組的抛寝。這樣發(fā)往不同分區(qū)的消息保存在對應(yīng)分區(qū)下的batch隊列中熊杨。舉個簡單的例子曙旭,假設(shè)消息M1, M2被發(fā)送到test的0分區(qū)但屬于不同的batch,M3分送到test的1分區(qū)晶府,那么batches中包含的信息就是:{"test-0" -> [batch1, batch2], "test-1" -> [batch3]}桂躏。
單個topic分區(qū)下的batch隊列中保存的是若干個消息批次。每個batch中最重要的3個組件包括:
compressor: 負(fù)責(zé)執(zhí)行追加寫入操作
batch緩沖區(qū):由batch.size參數(shù)控制川陆,消息被真正追加寫入到的地方
thunks:保存消息回調(diào)邏輯的集合
這一步的目的就是將待發(fā)送的消息寫入消息緩沖池中剂习,具體流程如下圖所示:
這一步執(zhí)行完畢之后理論上講KafkaProducer.send方法就執(zhí)行完畢了,用戶主線程所做的事情就是等待Sender線程發(fā)送消息并執(zhí)行返回結(jié)果了。
(3)鳞绕、Step 3: Sender線程預(yù)處理及消息發(fā)送
此時失仁,該Sender線程登場了。嚴(yán)格來說们何,Sender線程自KafkaProducer創(chuàng)建后就一直都在運行著 萄焦。它的工作流程基本上是這樣的:
不斷輪詢緩沖區(qū)尋找已做好發(fā)送準(zhǔn)備的分區(qū) ;
將輪詢獲得的各個batch按照目標(biāo)分區(qū)所在的leader broker進行分組冤竹;
將分組后的batch通過底層創(chuàng)建的Socket連接發(fā)送給各個broker拂封;
等待服務(wù)器端發(fā)送response回來。
為了說明上的方便鹦蠕,我還是基于圖的方式來解釋Sender線程的工作原理:
(4)冒签、Step 4: Sender線程處理response
上圖中Sender線程會發(fā)送PRODUCE請求給對應(yīng)的broker,broker處理完畢之后發(fā)送對應(yīng)的PRODUCE response钟病。一旦Sender線程接收到response將依次(按照消息發(fā)送順序)調(diào)用batch中的回調(diào)方法萧恕,如下圖所示:
做完這一步,producer發(fā)送消息就可以算作是100%完成了档悠。通過這4步我們可以看到新版本producer發(fā)送事件完全是異步過程廊鸥。因此在調(diào)優(yōu)producer前我們就需要搞清楚性能瓶頸到底是在用戶主線程還是在Sender線程。
由于KafkaProducer是線程安全的辖所,因此在使用上有兩種基本的使用方法:
refer:
http://www.cnblogs.com/huxi2b/p/6364613.html
http://www.cnblogs.com/davidwang456/p/4182001.html