Kafka實踐——理解生產(chǎn)者和消費者的主要參數(shù)

用好Kafka的前提是理解Kafka基本運行方式,本文希望說明通過一些Kafka的基本概念躺枕,為建立一個Kafka使用模型進(jìn)行準(zhǔn)備。

基本過程

Kafka不僅僅是一收一發(fā)這樣簡單宰译,它為高效进苍、可靠地傳遞消息提供了大量特性。下圖是Kafka的基本消息處理過程熊楼。

Kafka消息處理過程
  • 生產(chǎn)階段:生產(chǎn)者(Producer)處理消息粟焊,和之前的消息打包,等待批量發(fā)布孙蒙;
  • 發(fā)布階段:生產(chǎn)者將批量消息發(fā)送給Broker,主Broker將消息記錄在自己的日志文件中悲雳;
  • 提交階段:將消息復(fù)制到追隨者(Follower Broker)的日志中挎峦;
  • 追趕階段:Broker處理從消費者上一次處理位置(Offset)到新提交消息之間的消息;
  • 獲取節(jié)點:消費者(Comsuer)批量從Broker獲取消息合瓢。

在上述過程中要特別注意三點:1坦胶、Kafka的生產(chǎn)者和消費者都是按批處理消息;2、消費者端通過消費組(Consumer Group)構(gòu)成消費集群顿苇;3峭咒、消費位置。

理解批處理

Kafka生產(chǎn)端和消費端都是按批傳遞消息纪岁,這樣可以減少Kafka消息遞送邏輯執(zhí)行的次數(shù)凑队,例如:網(wǎng)絡(luò)傳輸,資源調(diào)度等幔翰,降低消息處理的平均時延漩氨,提高吞吐量。下面分別從生產(chǎn)端和消費端了解批量控制的相關(guān)參數(shù)遗增。

生產(chǎn)端

生產(chǎn)端和發(fā)送方法(KafkaProducer.send())相關(guān)的參數(shù):

參數(shù) 定義 默認(rèn)值 解釋
batch.size The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. 16384byte 等湊夠了發(fā)叫惊。
linger.ms The producer groups together any records that arrive in between request transmissions into a single batched request. 0 等多長時間發(fā)。
compression.type Specify the final compression type for a given topic. 壓縮后傳輸
acks The number of acknowledgments the producer requires the leader to have received before considering a request complete. 1

batch.sizelinger.ms兩個參數(shù)可以控制批次的大小做修。批次越大系統(tǒng)整體的吞吐量就越大霍狰,但是人為引入的延時也越長,因此饰及,這兩個參數(shù)應(yīng)該根據(jù)業(yè)務(wù)的實際情況進(jìn)行調(diào)優(yōu)蔗坯。linger.ms的默認(rèn)值是0ms,但是這不代表消息是單條發(fā)送的旋炒,Kafka會將同時到達(dá)消息打包發(fā)送步悠。一般建議將linger.ms的值設(shè)置為5ms

另外瘫镇,需要注意參數(shù)acks=allmax.in.flight.requests.per.connection=5鼎兽。發(fā)送請求需要被Broker確認(rèn),參數(shù)max.in.flight.requests.per.connection指定允許同時執(zhí)行的沒有被確認(rèn)的發(fā)送請求數(shù)铣除,超過了無法發(fā)送谚咬。acks指定了確認(rèn)條件,包括:

  • acks=0尚粘,Broker收到請求后就回復(fù)择卦;
  • acks=1,消息已經(jīng)寫到Leader的日志中郎嫁;
  • acks=all秉继,消息已經(jīng)寫到所有的Broker日志中。

消費端

在消費端和拉仍箢酢(KafkaConsumer.poll())批次相關(guān)的參數(shù)有如下幾個:

參數(shù) 定義 默認(rèn)值 解釋
fetch.min.bytes The minimum amount of data the server should return for a fetch request. 1byte broker中數(shù)據(jù)小于這個值時尚辑,fetch操作被阻塞等待數(shù)據(jù)。
fetch.max.wait.ms The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. 500ms broker數(shù)據(jù)不夠時最多等多長時間響應(yīng)fetch操作盔腔。
max.partition.fetch.bytes The maximum amount of data per-partition the server will return. 1M 允許每次fetch每個分區(qū)返回的最大數(shù)據(jù)量杠茬。

消費者執(zhí)行poll方法時并不是直接訪問Broker的數(shù)據(jù)月褥,而是通過fetch循環(huán)。取數(shù)據(jù)時瓢喉,如果獲得的數(shù)據(jù)小于fetch.min.bytes宁赤,那么Broker會阻塞poll直到獲得了足夠的數(shù)據(jù)后再返回給消費者。如果一直沒有足夠的數(shù)據(jù)怎么辦栓票?參數(shù)fetch.max.wait.ms指定了Broker最長的阻塞時間决左,如果數(shù)據(jù)不夠,但是達(dá)到了等待時間逗载,那么也會返回數(shù)據(jù)哆窿。參數(shù)max.partition.fetch.bytes控制了每個分區(qū)(Partition)一次可獲取的最大數(shù)據(jù),這個主要和內(nèi)存控制有關(guān)厉斟,如果分區(qū)很多挚躯,那個又都包含很多數(shù)據(jù),就需要配置相應(yīng)的內(nèi)存擦秽。

消費端提交消費位置(Offset)

Kafka中码荔,一個分區(qū)只能被同一個消費組中一個消費者消費,消費按順序進(jìn)行(也可以指定)感挥,每條消息都有自己的offset代表其在分區(qū)中的位置缩搅。poll方法從上一次已提交的offset之后拉取數(shù)據(jù),因此触幼,消費完數(shù)據(jù)必須執(zhí)行commit硼瓣,才能保證消費向后進(jìn)行。

參數(shù) 定義 默認(rèn)值
enable.auto.commit If true the consumer's offset will be periodically committed in the background. true
auto.commit.interval.ms The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. 5000

上面兩個參數(shù)的含義很好理解置谦。需要注意的是堂鲤,多次執(zhí)行poll方法不會觸發(fā)commit,如果auto.commit.interval.ms較長媒峡,就會執(zhí)行多個poll之后再提交瘟栖。KafkaConsumer.close()方法會執(zhí)行提交,所以谅阿,只要保證執(zhí)行了close半哟,消費者程序退出時也能進(jìn)行提交。

不自動提交签餐,就需要進(jìn)行手動提交寓涨。如果不特別指定,提交的是最后一次poll的最后一個offset氯檐。這樣帶來一個問題缅茉,應(yīng)該在何時執(zhí)行提交操作,獲得消息后還是消息處理后男摧?先提交可能會消息丟失蔬墩,后提交可能會重復(fù)消費。

理解消費者集群(Consumer Group)

消費者組應(yīng)該是Kafka中最重要的概念耗拓,它提供了管理消費者集群的機制拇颅。

和消費組相關(guān)的主要參數(shù)如下:

參數(shù) 定義 默認(rèn)值
group.id A unique string that identifies the consumer group this consumer belongs to.
group.instance.id A unique identifier of the consumer instance provided by the end user.
partition.assignment.strategy A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. RangeAssignor
session.timeout.ms The timeout used to detect client failures when using Kafka's group management facility. 檢查多長時間沒有收到心跳。 10秒
heartbeat.interval.ms Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. 建議不要大于session.timeout.out值的1/3乔询。 3秒

Kafka解決橫向擴展的方法是添加分區(qū)(Partition)和消費者(Consumer)樟插。生產(chǎn)者生產(chǎn)的消息可以分發(fā)到多個分區(qū)中,每個分區(qū)都由消費組(可多個消費組)內(nèi)的唯一的消費者消費竿刁,一個消費者可以消費多個分區(qū)黄锤。設(shè)置相同group.id的消費者構(gòu)成一個消費組。消費者集群管理機制就是處理分區(qū)和消費者的分配關(guān)系食拜。

Kafka通過subscribe方法實現(xiàn)自動分配鸵熟,通過assign方法實現(xiàn)手工分配。通常都會采用自動分配的方式负甸,這樣才能充分發(fā)揮Kafka的特性流强。Kafka內(nèi)置幾種分配策略可以通過參數(shù)partition.assignment.strategy指定。

如果采用assgin方式手工指定呻待,和組管理相關(guān)的機制不再生效打月。

消費者集群管理機制在kafka中叫Rebalance,它解決的是當(dāng)分區(qū)和消費者分配關(guān)系發(fā)生變化時的重新分配蚕捉。觸發(fā)Rebalance有兩種情況:1奏篙、消費組內(nèi)的成員數(shù)量發(fā)生變化,消費者加入或離開迫淹;2秘通、訂閱信息發(fā)生變化,分區(qū)數(shù)量變化或訂閱主題發(fā)生變化千绪。

Rebalance的默認(rèn)機制是組內(nèi)消費者全員業(yè)務(wù)中斷充易。顯然這在很多場景下都是不合理的,也很難接受荸型,所以如何避免無效的Rebalance始終是個熱點問題盹靴。針對這個問題Kafka已經(jīng)提供了優(yōu)化方法。

方法一瑞妇,修改分配策略稿静。將參數(shù)partition.assignment.strategy設(shè)置為CooperativeStickyAssignor可以優(yōu)化業(yè)務(wù)中斷的問題,它會盡量保證業(yè)務(wù)進(jìn)行辕狰,避免不必要的重新分配改备。

方法二,靜態(tài)成員關(guān)系(Static Membership)蔓倍。通過參數(shù)group.instance.id給消費組中的每個消費者分配一個固定的id悬钳,當(dāng)這個消費者下線再上線時(在session.timeout.ms范圍內(nèi))不會觸發(fā)Rebalance盐捷,而是將之前已有的分配關(guān)系直接給這個消費者。

注意:session.timeout.ms參數(shù)受限于Broker參數(shù)group.min.session.timeout.ms(6 sec)group.max.session.timeout.ms(30 min)默勾,即大小不能超過這個上下限碉渡。

消費應(yīng)用的思考

無論Kafka提供了多少高級特性,如果消費應(yīng)用本身存在問題母剥,仍然不可能有效支撐業(yè)務(wù)處理的需求滞诺,因此,應(yīng)該關(guān)注一下實現(xiàn)消費應(yīng)用時面臨的基本問題环疼。

參數(shù) 定義 默認(rèn)值
max.poll.interval.ms The maximum delay between invocations of poll() when using consumer group management. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms. 5分鐘
max.poll.records The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll. 500
client.id An id string to pass to the server when making requests.

max.poll.interval.ms參數(shù)非常重要习霹,如果消費者沒有指定group.instance.idpoll間隔超時后會觸發(fā)Rebalance炫隶,如果指定了group.instance.id 淋叶,等待session.timeout.ms超時再觸發(fā)Rebalancemax.poll.records參數(shù)的目的是控制消息批次的總體時間等限,避免發(fā)生消費超時爸吮。

PS:max.poll.records參數(shù)和前面的fetch.xxx參數(shù)是什么關(guān)系?如果max.poll.records很小望门,fetch.min.bytes很大形娇,那么fetch的結(jié)果緩存起來?這個問題意義不大筹误,這樣設(shè)置并不合理桐早。

從這個兩個參數(shù)可以看出,Kafka認(rèn)為消費應(yīng)用必須考慮消息處理時長的問題厨剪,如果處理消息的業(yè)務(wù)邏輯耗時與參數(shù)設(shè)置不匹配哄酝,有可能發(fā)生意料之外的結(jié)果。例如我們的業(yè)務(wù)邏輯是自動提交祷膳,但是因為poll間隔超時觸發(fā)Rebalance陶衅,如果提交方法已經(jīng)執(zhí)行,那么會導(dǎo)致提交之后到超時發(fā)生之間的消息產(chǎn)生重復(fù)消費直晨。如果是所有消息處理完成后手動提交搀军,那么也會導(dǎo)致未進(jìn)行提交操作產(chǎn)生重復(fù)消費。

控制消費應(yīng)用執(zhí)行時間是一個必須認(rèn)真對待的問題勇皇,雖然可以通過減少max.poll.records參數(shù)縮小一次poll的執(zhí)行時間罩句,但是,如果處理邏輯中包含對外部服務(wù)的調(diào)用敛摘,那么就有可能因為外部服務(wù)的延時阻塞整個消費應(yīng)用门烂,即使減少消息數(shù)量也沒有用。更可控的方式應(yīng)該是給單個消息的處理設(shè)置超時時間,保證每條消息的處理都在指定時間范圍內(nèi)屯远,從而保證整體不超時蔓姚。還可以將poll方法和消息處理獨立開,用不同的線程執(zhí)行慨丐,如果消息沒有處理完赂乐,poll方法線程用KafkaConsumer.pause()方法暫停獲取數(shù)據(jù),這樣poll方法繼續(xù)按時間間隔執(zhí)行咖气,但是不獲取新數(shù)據(jù)。當(dāng)調(diào)用外部服務(wù)時挖滤,還需要考慮外部服務(wù)是否支持并發(fā)調(diào)用崩溪,如果需要應(yīng)該引入多線程或NIO的機制,提高整個業(yè)務(wù)的吞吐能力斩松。并發(fā)操作又會使超時控制變得更復(fù)雜伶唯。

消費應(yīng)用不可避免會發(fā)生異常情況,由于Kafka中數(shù)據(jù)是按批處理的惧盹,提交的也是最后消費位置乳幸,那么就一定會發(fā)生分區(qū)中消息狀態(tài)和消費應(yīng)用處理狀態(tài)不一致的情況。根據(jù)前面已經(jīng)提到的提交策略钧椰,要么接受丟失消息粹断,要么接受重復(fù)消費。通常我們不能接受消息丟失嫡霞,因此必須支持處理重復(fù)消息瓶埋,也就是消費應(yīng)用必須具備冪等性

消費應(yīng)用避免不了進(jìn)行代碼升級诊沪、修改配置等維護操作养筒,因此,還必須考慮優(yōu)雅關(guān)機問題端姚,保證結(jié)束應(yīng)用前將消息和狀態(tài)提交處理完畢晕粪。(前面的重復(fù)消費和冪等性一定程度上可以解決這個問題,但還是應(yīng)該進(jìn)行主動控制渐裸。)

最后要說明的是巫湘,Broker,Java版的生產(chǎn)客戶端和消費客戶端都支持通過JMX獲取運行指標(biāo)橄仆,也可以在消費應(yīng)用中通過metrics方法獲取剩膘。這些指標(biāo)是進(jìn)行精細(xì)調(diào)優(yōu)的基礎(chǔ)。沒了便于監(jiān)控盆顾,應(yīng)該給消費者設(shè)置client.id參數(shù)(生產(chǎn)者也支持)怠褐,這樣提取運行指標(biāo)時就可以直接指定是哪個客戶端。

最后

Kafka的功能真的是太多了您宪,本文只是介紹了最常用的一些參數(shù)奈懒。還有很多特性應(yīng)該深入研究奠涌,例如:事物,日志管理(保存周期等)磷杏,保持消費順序溜畅,消費端異步提交,安全极祸,自定義分區(qū)分配策略等慈格。

建議閱讀

Optimizing Kafka producers

Optimizing Kafka consumers

99th Percentile Latency at Scale with Apache Kafka

kafka 重平衡解決方案: cooperative協(xié)議和static membership功能詳解

From Eager to Smarter in Apache Kafka Consumer Rebalances

Incremental Cooperative Rebalancing in Apache Kafka: Why Stop the World When You Can Change It?

Apache Kafka Rebalance Protocol for the Cloud: Static Membership

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市遥金,隨后出現(xiàn)的幾起案子浴捆,更是在濱河造成了極大的恐慌,老刑警劉巖稿械,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件选泻,死亡現(xiàn)場離奇詭異,居然都是意外死亡美莫,警方通過查閱死者的電腦和手機页眯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來厢呵,“玉大人窝撵,你說我怎么就攤上這事∈鑫” “怎么了忿族?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蝌矛。 經(jīng)常有香客問我道批,道長,這世上最難降的妖魔是什么入撒? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任隆豹,我火速辦了婚禮,結(jié)果婚禮上茅逮,老公的妹妹穿的比我還像新娘璃赡。我一直安慰自己,他們只是感情好献雅,可當(dāng)我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布碉考。 她就那樣靜靜地躺著,像睡著了一般挺身。 火紅的嫁衣襯著肌膚如雪侯谁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天,我揣著相機與錄音墙贱,去河邊找鬼热芹。 笑死,一個胖子當(dāng)著我的面吹牛惨撇,可吹牛的內(nèi)容都是我干的伊脓。 我是一名探鬼主播,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼魁衙,長吁一口氣:“原來是場噩夢啊……” “哼报腔!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起剖淀,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤榄笙,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后祷蝌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡帆卓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年巨朦,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片剑令。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡糊啡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出吁津,到底是詐尸還是另有隱情棚蓄,我是刑警寧澤,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布碍脏,位于F島的核電站梭依,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏典尾。R本人自食惡果不足惜役拴,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望钾埂。 院中可真熱鬧河闰,春花似錦、人聲如沸褥紫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽髓考。三九已至部念,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背印机。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工矢腻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人射赛。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓多柑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親楣责。 傳聞我的和親對象是個殘疾皇子竣灌,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容