用好Kafka的前提是理解Kafka基本運行方式,本文希望說明通過一些Kafka的基本概念躺枕,為建立一個Kafka使用模型進(jìn)行準(zhǔn)備。
基本過程
Kafka不僅僅是一收一發(fā)這樣簡單宰译,它為高效进苍、可靠地傳遞消息提供了大量特性。下圖是Kafka的基本消息處理過程熊楼。
- 生產(chǎn)階段:生產(chǎn)者(Producer)處理消息粟焊,和之前的消息打包,等待批量發(fā)布孙蒙;
- 發(fā)布階段:生產(chǎn)者將批量消息發(fā)送給Broker,主Broker將消息記錄在自己的日志文件中悲雳;
- 提交階段:將消息復(fù)制到追隨者(Follower Broker)的日志中挎峦;
- 追趕階段:Broker處理從消費者上一次處理位置(Offset)到新提交消息之間的消息;
- 獲取節(jié)點:消費者(Comsuer)批量從Broker獲取消息合瓢。
在上述過程中要特別注意三點:1坦胶、Kafka的生產(chǎn)者和消費者都是按批處理消息;2、消費者端通過消費組(Consumer Group)構(gòu)成消費集群顿苇;3峭咒、消費位置。
理解批處理
Kafka生產(chǎn)端和消費端都是按批傳遞消息纪岁,這樣可以減少Kafka消息遞送邏輯執(zhí)行的次數(shù)凑队,例如:網(wǎng)絡(luò)傳輸,資源調(diào)度等幔翰,降低消息處理的平均時延漩氨,提高吞吐量。下面分別從生產(chǎn)端和消費端了解批量控制的相關(guān)參數(shù)遗增。
生產(chǎn)端
生產(chǎn)端和發(fā)送方法(KafkaProducer.send()
)相關(guān)的參數(shù):
參數(shù) | 定義 | 默認(rèn)值 | 解釋 |
---|---|---|---|
batch.size | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. | 16384byte | 等湊夠了發(fā)叫惊。 |
linger.ms | The producer groups together any records that arrive in between request transmissions into a single batched request. | 0 | 等多長時間發(fā)。 |
compression.type | Specify the final compression type for a given topic. | 壓縮后傳輸 | |
acks | The number of acknowledgments the producer requires the leader to have received before considering a request complete. | 1 |
batch.size
和linger.ms
兩個參數(shù)可以控制批次的大小做修。批次越大系統(tǒng)整體的吞吐量就越大霍狰,但是人為引入的延時也越長,因此饰及,這兩個參數(shù)應(yīng)該根據(jù)業(yè)務(wù)的實際情況進(jìn)行調(diào)優(yōu)蔗坯。linger.ms
的默認(rèn)值是0ms
,但是這不代表消息是單條發(fā)送的旋炒,Kafka會將同時到達(dá)消息打包發(fā)送步悠。一般建議將linger.ms
的值設(shè)置為5ms
。
另外瘫镇,需要注意參數(shù)acks=all
和max.in.flight.requests.per.connection=5
鼎兽。發(fā)送請求需要被Broker確認(rèn),參數(shù)max.in.flight.requests.per.connection
指定允許同時執(zhí)行的沒有被確認(rèn)的發(fā)送請求數(shù)铣除,超過了無法發(fā)送谚咬。acks
指定了確認(rèn)條件,包括:
-
acks=0
尚粘,Broker收到請求后就回復(fù)择卦; -
acks=1
,消息已經(jīng)寫到Leader的日志中郎嫁; -
acks=all
秉继,消息已經(jīng)寫到所有的Broker日志中。
消費端
在消費端和拉仍箢酢(KafkaConsumer.poll()
)批次相關(guān)的參數(shù)有如下幾個:
參數(shù) | 定義 | 默認(rèn)值 | 解釋 |
---|---|---|---|
fetch.min.bytes | The minimum amount of data the server should return for a fetch request. | 1byte |
broker 中數(shù)據(jù)小于這個值時尚辑,fetch 操作被阻塞等待數(shù)據(jù)。 |
fetch.max.wait.ms | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. | 500ms |
broker 數(shù)據(jù)不夠時最多等多長時間響應(yīng)fetch 操作盔腔。 |
max.partition.fetch.bytes | The maximum amount of data per-partition the server will return. | 1M | 允許每次fetch 每個分區(qū)返回的最大數(shù)據(jù)量杠茬。 |
消費者執(zhí)行poll
方法時并不是直接訪問Broker的數(shù)據(jù)月褥,而是通過fetch
循環(huán)。取數(shù)據(jù)時瓢喉,如果獲得的數(shù)據(jù)小于fetch.min.bytes
宁赤,那么Broker會阻塞poll
直到獲得了足夠的數(shù)據(jù)后再返回給消費者。如果一直沒有足夠的數(shù)據(jù)怎么辦栓票?參數(shù)fetch.max.wait.ms
指定了Broker最長的阻塞時間决左,如果數(shù)據(jù)不夠,但是達(dá)到了等待時間逗载,那么也會返回數(shù)據(jù)哆窿。參數(shù)max.partition.fetch.bytes
控制了每個分區(qū)(Partition)一次可獲取的最大數(shù)據(jù),這個主要和內(nèi)存控制有關(guān)厉斟,如果分區(qū)很多挚躯,那個又都包含很多數(shù)據(jù),就需要配置相應(yīng)的內(nèi)存擦秽。
消費端提交消費位置(Offset)
Kafka中码荔,一個分區(qū)只能被同一個消費組中一個消費者消費,消費按順序進(jìn)行(也可以指定)感挥,每條消息都有自己的offset
代表其在分區(qū)中的位置缩搅。poll
方法從上一次已提交的offset
之后拉取數(shù)據(jù),因此触幼,消費完數(shù)據(jù)必須執(zhí)行commit
硼瓣,才能保證消費向后進(jìn)行。
參數(shù) | 定義 | 默認(rèn)值 |
---|---|---|
enable.auto.commit | If true the consumer's offset will be periodically committed in the background. | true |
auto.commit.interval.ms | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. | 5000 |
上面兩個參數(shù)的含義很好理解置谦。需要注意的是堂鲤,多次執(zhí)行poll
方法不會觸發(fā)commit
,如果auto.commit.interval.ms
較長媒峡,就會執(zhí)行多個poll
之后再提交瘟栖。KafkaConsumer.close()
方法會執(zhí)行提交,所以谅阿,只要保證執(zhí)行了close
半哟,消費者程序退出時也能進(jìn)行提交。
不自動提交签餐,就需要進(jìn)行手動提交寓涨。如果不特別指定,提交的是最后一次poll
的最后一個offset
氯檐。這樣帶來一個問題缅茉,應(yīng)該在何時執(zhí)行提交操作,獲得消息后還是消息處理后男摧?先提交可能會消息丟失蔬墩,后提交可能會重復(fù)消費。
理解消費者集群(Consumer Group)
消費者組應(yīng)該是Kafka中最重要的概念耗拓,它提供了管理消費者集群的機制拇颅。
和消費組相關(guān)的主要參數(shù)如下:
參數(shù) | 定義 | 默認(rèn)值 |
---|---|---|
group.id | A unique string that identifies the consumer group this consumer belongs to. | |
group.instance.id | A unique identifier of the consumer instance provided by the end user. | |
partition.assignment.strategy | A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. | RangeAssignor |
session.timeout.ms | The timeout used to detect client failures when using Kafka's group management facility. 檢查多長時間沒有收到心跳。 | 10秒 |
heartbeat.interval.ms | Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. 建議不要大于session.timeout.out值的1/3乔询。 | 3秒 |
Kafka解決橫向擴展的方法是添加分區(qū)(Partition)和消費者(Consumer)樟插。生產(chǎn)者生產(chǎn)的消息可以分發(fā)到多個分區(qū)中,每個分區(qū)都由消費組(可多個消費組)內(nèi)的唯一的消費者消費竿刁,一個消費者可以消費多個分區(qū)黄锤。設(shè)置相同group.id
的消費者構(gòu)成一個消費組。消費者集群管理機制就是處理分區(qū)和消費者的分配關(guān)系食拜。
Kafka通過subscribe
方法實現(xiàn)自動分配鸵熟,通過assign
方法實現(xiàn)手工分配。通常都會采用自動分配的方式负甸,這樣才能充分發(fā)揮Kafka的特性流强。Kafka內(nèi)置幾種分配策略可以通過參數(shù)partition.assignment.strategy
指定。
如果采用
assgin
方式手工指定呻待,和組管理相關(guān)的機制不再生效打月。
消費者集群管理機制在kafka中叫Rebalance
,它解決的是當(dāng)分區(qū)和消費者分配關(guān)系發(fā)生變化時的重新分配蚕捉。觸發(fā)Rebalance
有兩種情況:1奏篙、消費組內(nèi)的成員數(shù)量發(fā)生變化,消費者加入或離開迫淹;2秘通、訂閱信息發(fā)生變化,分區(qū)數(shù)量變化或訂閱主題發(fā)生變化千绪。
Rebalance
的默認(rèn)機制是組內(nèi)消費者全員業(yè)務(wù)中斷充易。顯然這在很多場景下都是不合理的,也很難接受荸型,所以如何避免無效的Rebalance
始終是個熱點問題盹靴。針對這個問題Kafka已經(jīng)提供了優(yōu)化方法。
方法一瑞妇,修改分配策略稿静。將參數(shù)partition.assignment.strategy
設(shè)置為CooperativeStickyAssignor
可以優(yōu)化業(yè)務(wù)中斷的問題,它會盡量保證業(yè)務(wù)進(jìn)行辕狰,避免不必要的重新分配改备。
方法二,靜態(tài)成員關(guān)系(Static Membership)蔓倍。通過參數(shù)group.instance.id
給消費組中的每個消費者分配一個固定的id悬钳,當(dāng)這個消費者下線再上線時(在session.timeout.ms
范圍內(nèi))不會觸發(fā)Rebalance
盐捷,而是將之前已有的分配關(guān)系直接給這個消費者。
注意:session.timeout.ms
參數(shù)受限于Broker參數(shù)group.min.session.timeout.ms(6 sec)
和group.max.session.timeout.ms(30 min)
默勾,即大小不能超過這個上下限碉渡。
消費應(yīng)用的思考
無論Kafka提供了多少高級特性,如果消費應(yīng)用本身存在問題母剥,仍然不可能有效支撐業(yè)務(wù)處理的需求滞诺,因此,應(yīng)該關(guān)注一下實現(xiàn)消費應(yīng)用時面臨的基本問題环疼。
參數(shù) | 定義 | 默認(rèn)值 |
---|---|---|
max.poll.interval.ms | The maximum delay between invocations of poll() when using consumer group management. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms . |
5分鐘 |
max.poll.records | The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll. | 500 |
client.id | An id string to pass to the server when making requests. |
max.poll.interval.ms
參數(shù)非常重要习霹,如果消費者沒有指定group.instance.id
,poll
間隔超時后會觸發(fā)Rebalance
炫隶,如果指定了group.instance.id
淋叶,等待session.timeout.ms
超時再觸發(fā)Rebalance
。max.poll.records
參數(shù)的目的是控制消息批次的總體時間等限,避免發(fā)生消費超時爸吮。
PS:
max.poll.records
參數(shù)和前面的fetch.xxx
參數(shù)是什么關(guān)系?如果max.poll.records
很小望门,fetch.min.bytes
很大形娇,那么fetch
的結(jié)果緩存起來?這個問題意義不大筹误,這樣設(shè)置并不合理桐早。
從這個兩個參數(shù)可以看出,Kafka認(rèn)為消費應(yīng)用必須考慮消息處理時長的問題厨剪,如果處理消息的業(yè)務(wù)邏輯耗時與參數(shù)設(shè)置不匹配哄酝,有可能發(fā)生意料之外的結(jié)果。例如我們的業(yè)務(wù)邏輯是自動提交祷膳,但是因為poll
間隔超時觸發(fā)Rebalance
陶衅,如果提交方法已經(jīng)執(zhí)行,那么會導(dǎo)致提交之后到超時發(fā)生之間的消息產(chǎn)生重復(fù)消費直晨。如果是所有消息處理完成后手動提交搀军,那么也會導(dǎo)致未進(jìn)行提交操作產(chǎn)生重復(fù)消費。
控制消費應(yīng)用執(zhí)行時間是一個必須認(rèn)真對待的問題勇皇,雖然可以通過減少max.poll.records
參數(shù)縮小一次poll
的執(zhí)行時間罩句,但是,如果處理邏輯中包含對外部服務(wù)的調(diào)用敛摘,那么就有可能因為外部服務(wù)的延時阻塞整個消費應(yīng)用门烂,即使減少消息數(shù)量也沒有用。更可控的方式應(yīng)該是給單個消息的處理設(shè)置超時時間,保證每條消息的處理都在指定時間范圍內(nèi)屯远,從而保證整體不超時蔓姚。還可以將poll
方法和消息處理獨立開,用不同的線程執(zhí)行慨丐,如果消息沒有處理完赂乐,poll
方法線程用KafkaConsumer.pause()
方法暫停獲取數(shù)據(jù),這樣poll
方法繼續(xù)按時間間隔執(zhí)行咖气,但是不獲取新數(shù)據(jù)。當(dāng)調(diào)用外部服務(wù)時挖滤,還需要考慮外部服務(wù)是否支持并發(fā)調(diào)用崩溪,如果需要應(yīng)該引入多線程或NIO的機制,提高整個業(yè)務(wù)的吞吐能力斩松。并發(fā)操作又會使超時控制變得更復(fù)雜伶唯。
消費應(yīng)用不可避免會發(fā)生異常情況,由于Kafka中數(shù)據(jù)是按批處理的惧盹,提交的也是最后消費位置乳幸,那么就一定會發(fā)生分區(qū)中消息狀態(tài)和消費應(yīng)用處理狀態(tài)不一致的情況。根據(jù)前面已經(jīng)提到的提交策略钧椰,要么接受丟失消息粹断,要么接受重復(fù)消費。通常我們不能接受消息丟失嫡霞,因此必須支持處理重復(fù)消息瓶埋,也就是消費應(yīng)用必須具備冪等性。
消費應(yīng)用避免不了進(jìn)行代碼升級诊沪、修改配置等維護操作养筒,因此,還必須考慮優(yōu)雅關(guān)機問題端姚,保證結(jié)束應(yīng)用前將消息和狀態(tài)提交處理完畢晕粪。(前面的重復(fù)消費和冪等性一定程度上可以解決這個問題,但還是應(yīng)該進(jìn)行主動控制渐裸。)
最后要說明的是巫湘,Broker,Java版的生產(chǎn)客戶端和消費客戶端都支持通過JMX獲取運行指標(biāo)橄仆,也可以在消費應(yīng)用中通過metrics
方法獲取剩膘。這些指標(biāo)是進(jìn)行精細(xì)調(diào)優(yōu)的基礎(chǔ)。沒了便于監(jiān)控盆顾,應(yīng)該給消費者設(shè)置client.id
參數(shù)(生產(chǎn)者也支持)怠褐,這樣提取運行指標(biāo)時就可以直接指定是哪個客戶端。
最后
Kafka的功能真的是太多了您宪,本文只是介紹了最常用的一些參數(shù)奈懒。還有很多特性應(yīng)該深入研究奠涌,例如:事物,日志管理(保存周期等)磷杏,保持消費順序溜畅,消費端異步提交,安全极祸,自定義分區(qū)分配策略等慈格。
建議閱讀
99th Percentile Latency at Scale with Apache Kafka
kafka 重平衡解決方案: cooperative協(xié)議和static membership功能詳解
From Eager to Smarter in Apache Kafka Consumer Rebalances
Incremental Cooperative Rebalancing in Apache Kafka: Why Stop the World When You Can Change It?
Apache Kafka Rebalance Protocol for the Cloud: Static Membership