前言
Kafka 作為一個消息系統(tǒng)筑煮,其中很大的一個用途就是作為業(yè)務上的解耦都毒,而它實現(xiàn)的模式就是經(jīng)典的生產(chǎn)者消費者模式。毫無疑問伴澄,就出現(xiàn)了producer赋除、consumer。然后消息總得有地方存放啊非凌,然后就有了具體的broker举农,那在broker上是如何進行組織和存放的,就出現(xiàn)了partition清焕。對應的為保證消息不丟失并蝗,也就出現(xiàn)了消息備份組這樣一個概念(ISR,in-sync replica)再加上消息的topic也就形成了秸妥,kafka的 topic-partition-message 的三級負載結(jié)構(gòu)滚停。到這里Kafka中比較核心的幾個概念就都有了,下面開始詳細介紹粥惧。
producer
producer也就是生產(chǎn)者键畴,是kafka中消息的產(chǎn)生方,產(chǎn)生消息并提交給kafka集群完成消息的持久化突雪,這個過程中主要涉及ProducerRecord對象的構(gòu)建起惕、分區(qū)選擇、元數(shù)據(jù)的填充咏删、ProducerRecord對象的序列化惹想、進入消息緩沖池、完成消息的發(fā)送督函、接受broker的響應嘀粱。
具體的流程是這樣的:
1激挪、確定topic信息
2、確定value信息
3锋叨、然后進行消息的序列化處理
4垄分、由分區(qū)選擇器確定對應的分區(qū)信息
5、將消息寫入消息緩沖區(qū)
6娃磺、完成消息請求的發(fā)送
7薄湿、完成消息響應的處理
ProducerRecord:
ProducerRecord 對象比較核心的信息有:topic、partition(這個信息是根據(jù)分區(qū)選擇器來確定的)偷卧、key豺瘤、value、timestamp
PS:時間戳信息是默認當前時間的涯冠,但是用戶可以指定時間戳信息炉奴,但是不推薦這么做,broker中大體有這么幾種log也就是消息存放文件普通日志文件蛇更,時間索引文件瞻赶,普通索引文件。如果強行指定時間戳很有可能導致時間索引失效派任。
元數(shù)據(jù):
元數(shù)據(jù)信息主要包括offset消息在分區(qū)日志中的位移信息砸逊、timestamp、topic/partition topic及對應的分區(qū)信息掌逛、checksum 消息對應的CRC32碼师逸、serializedKeySize 序列化后的key的字節(jié)數(shù)、serializedValueSize 序列化后的Value的字節(jié)數(shù)
Partition:
分區(qū)選擇器豆混,默認是murmur2 對于key進行hash計算然后對于總分區(qū)數(shù)求模以此得到被發(fā)送的分區(qū)號篓像,當然我們實現(xiàn)producer時可以自定義partition,或者指定特定分區(qū)皿伺。
serializer:
serializer是kafka實現(xiàn)的自己的序列化工具用于將消息對象序列化成字節(jié)序列员辩,Kafka中提供了ByteArraySerializer、ByteBufferSerializer鸵鸥、BytesSerializer奠滑、Long(Double Integer String)Serializer等幾種序列化方法,用戶也可以使用自定義的或者第三方的序列化工具妒穴。只需要使用指定對應參數(shù)即可(切記Kafka中指定對應的工具類時都是使用權(quán)限定名稱來做的)
序列化相關的參數(shù)有如下:
key.serializer
針對各個部分做序列化方式
key.deserializer key.serializer
對應解序列化方式
value.serializer
對value部分指定的序列化方式
value.deserializer value.serializer
對應解序列化方式
可以簡單的理解為key要比value的應用范圍廣宋税。
batch:
buffer.memory 指定producer待發(fā)送消息緩沖區(qū)的內(nèi)存大小,默認32m讼油,如果需要更改就使用這個參數(shù)進行修改杰赛。這里需要注意的是當producer端寫消息的速度超過了專屬IO線程發(fā)送消息的速度,并且緩沖區(qū)的消息數(shù)量超過buffer.memory指定的大小時矮台,producer會拋出異常通知用戶介入處理乏屯,這個緩沖區(qū)的大小需要根據(jù)實際場景來確定阔墩。
batch.size 指一個batch的大小,它直接決定了一個batch中存在的消息數(shù)量瓶珊,這個直接與producer的吞吐量及延時等直接相關,因為所謂的micr-batch 是指原本應該串行一條條發(fā)送的消息更改為緩存一部分消息耸彪,等達到對應的消息規(guī)模時一次性發(fā)送伞芹,也不會像批處理規(guī)模那么大(主要為了平衡延時與性能,這個會有專門的篇章來介紹micr-batch)
linger.size
producer端會專門劃出一部分內(nèi)存用于待發(fā)送消息的緩存蝉娜,batch.size決定了發(fā)送消息數(shù)量唱较,同時間接決定了消息緩存時存在的延時。linger.size 就是針對這一點設計出來的召川,它決定了消息被投放進緩沖區(qū)時是否立馬被發(fā)送南缓,默認參數(shù)是0(立即發(fā)送),這個大多數(shù)情況下是合理的荧呐,但是會很大程度上拉低kafka的吞吐量汉形。具體要根據(jù)實際的使用場景來確定了。
通信協(xié)議:
kafka 并沒有使用現(xiàn)有的http協(xié)議等倍阐,而是在TCP 協(xié)議之上實現(xiàn)了自己的通信協(xié)議概疆。單個client會創(chuàng)建多個socket鏈接與多個broker進行交互,Kafka 原生Java client使用類似于epoll的方式在單個連接上不停的輪訓傳數(shù)據(jù),但是每個broker上只需要維護一個Scoket鏈接,保證了消息的請求的順序處理峰搪,所以很清晰的可以看到在client端就需要我們自己去維護這個順序了岔冀。
整體來說Kafka 中大約有三類連接:client與broker之間消息傳輸、controller 與所有broker之間的交互概耻、client 獲取元數(shù)據(jù)&rebalance的通信過程使套。
同其他協(xié)議類似,Kafka的通信協(xié)議的請求和響應也都是格式化的鞠柄。由 固定長度初始類型(int8侦高、int16、int32春锋、int64)矫膨、可變長度類型(bytes、string)期奔、數(shù)組侧馅。請求頭由 api_key(int16,請求類型)呐萌、api_version(int16馁痴,請求版本號)、correlation_id(int32肺孤,與請求響應的關聯(lián)號罗晕,這個字段就是給響應用的)济欢、client_id(client id)
經(jīng)常接觸到的Kafka請求類型有:PRODUCE請求(生產(chǎn)消息請求)、FETCH請求(服務于消費消息小渊,并不一定是clients向broker拉消息法褥,也可能是follower副本向leader副本索要消息)、METADATA請求(獲取指定topic的元數(shù)據(jù)信息:[topics]+allow_auto_topic_creation)
PS:這里有一點需要說明酬屉,clients與broker是單向兼容的半等,這個在生產(chǎn)環(huán)境中如果不注意是格外容易發(fā)生問題的。這個兼容性是指呐萨,高版本broker可以兼容低版本clients杀饵,但是低版本broker無法兼容高版本clients,所以說升級clients版本谬擦,尤其是對接新的consumer時一定要格外注意切距。這個問題主要針對非Java client的,對于Java client來說惨远,會自動判斷連接的broker端所支持的client請求的最高版本谜悟。
producer interceptor
攔截器是新版本才出現(xiàn)的一個特性,并且是非必須的锨络,interceptor 核心的函數(shù)有onSend(在消息序列化計算分區(qū)之前就被調(diào)用)赌躺、onAcknowleagement(被應答前或者說在發(fā)送失敗時,這個方法是運行在producer的I/O線程中的羡儿,所以說如果存在很多重邏輯的話會導致嚴重影響處理消息的速率)礼患、close。通常是通過為clients定制一部分通用且簡單的邏輯時才會使用的掠归。
壓縮算法
Kafka支持的壓縮算法還是很可觀的:GZIP缅叠、Snappy、LZ4虏冻,默認情況下不進行消息壓縮肤粱,畢竟會消耗很大一部分cpu時間,導致send方法處理時間變慢厨相。啟動LZ4 進行消息壓縮的producer的吞吐量是最高的领曼。