偏移量提交帶來的再平衡問題
消費者提交偏移量的主要是消費者往一個名為_consumer_offset的特殊主題發(fā)送消息社牲,消息中包含每個分區(qū)的偏移量惜索。 如果消費者一直運行征绸,偏移量的提交并不會產(chǎn)生任何影響克锣。但是如果有消費者發(fā)生崩潰赖晶,或者有新的消費者加入消費者群組的時候律适,會觸發(fā) Kafka 的再均衡。這使得 Kafka 完成再均衡之后遏插,每個消費者可能被會分到新分區(qū)中捂贿。
為了能夠繼續(xù)之前的工作,消費者就需要讀取每一個分區(qū)的最后一次提交的偏移量胳嘲,然后從偏移量指定的地方繼續(xù)處理厂僧。
但是這樣可能會出現(xiàn)如下的問題。
提交偏移量小于客戶端處理的偏移量
如果提交的偏移量小于客戶端處理的最后一個消息的偏移量了牛,那么處于兩個偏移量之間的消息就會被重復(fù)處理颜屠。
提交偏移量大于客戶端處理的偏移量
如果提交的偏移量大于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失鹰祸。 因此甫窟,如果處理偏移量,會對客戶端處理數(shù)據(jù)產(chǎn)生影響蛙婴。
KafkaConsumer API 提供了很多種方式來提交偏移量粗井。
自動提交
自動提交是 Kafka 處理偏移量最簡單的方式。
當(dāng) enable.auto.commit 屬性被設(shè)為 true街图,那么每過 5s背传,消費者會自動把從 poll()方法接收到的最大偏移量提交上去。
這是因為提交時間間隔由 auto.commit.interval.ms 控制台夺,默認值是 5s径玖。與消費者里的其他東西一樣,自動提交也是在輪詢里進行的颤介。
消費者每次在進行輪詢時會檢查是否該提交偏移量了梳星,如果是赞赖,那么就會提交從上一次輪詢返回的偏移量。
但是使用這種方式冤灾,容易出現(xiàn)提交的偏移量小于客戶端處理的最后一個消息的偏移量這種情況的問題前域。
假設(shè)我們?nèi)匀皇褂媚J的 5s 提交時間間隔,在最近一次提交之后的 3s 發(fā)生了再均衡韵吨,再均衡之后匿垄,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經(jīng)落后了 3s(因為沒有達到5s的時限归粉,并沒有提交偏移量)椿疗,所以在這 3s 的數(shù)據(jù)將會被重復(fù)處理。
雖然可以通過修改提交時間間隔來更頻繁地提交偏移量糠悼,減小可能出現(xiàn)重復(fù)消息的時間窗的時間跨度届榄,不過這種情況是無法完全避免的。
在使用自動提交時倔喂,每次調(diào)用輪詢方法都會把上一次調(diào)用返回的偏移量提交上去铝条,它并不知道具體哪些消息已經(jīng)被處理了,所以在再次調(diào)用之前最好確保所有當(dāng)前調(diào)用返回的消息都已經(jīng)處理完畢(在調(diào)用 close() 方法之前也會進行自動提交)席噩。一般情況下不會有什么問題班缰,不過在處理異常或提前退出輪詢時要格外小心悼枢。
手動提交
大部分開發(fā)者通過控制偏移量提交時間來消除丟失消息的可能性埠忘,并在發(fā)生再均衡時減少重復(fù)消息的數(shù)量。消費者 API 提供了另一種提交偏移量的方式萧芙,開發(fā)者可以在必要的時候提交當(dāng)前偏移量给梅,而不是基于時間間隔假丧。 這是我們需要把把 auto.commit.offset 設(shè)為 false双揪,讓應(yīng)用程序決定何時提交偏移量。
同步提交
使用 commitSync() 提交偏移量最簡單也最可靠包帚。這個 API 會提交由 poll() 方法返回的最新偏移量渔期,提交成功后馬上返回,如果提交失敗就拋出異常渴邦。
代碼示例如下:
commitSync() 將會提交由 poll() 返回的最新偏移量疯趟,所以在處理完所有記錄后要確保調(diào)用了 commitSync(),否則還是會有丟失消息的風(fēng)險谋梭。
如果發(fā)生了再均衡信峻,從最近一批消息到發(fā)生再均衡之間的所有消息都將被重復(fù)處理。
同時在這個程序中瓮床,只要沒有發(fā)生不可恢復(fù)的錯誤盹舞,commitSync() 方法會一直嘗試直至提交成功产镐。如果提交失敗,我們也只能把異常記錄到錯誤日志里踢步。
異步提交
同步提交有一個不足之處癣亚,在 broker 對提交請求作出回應(yīng)之前,應(yīng)用程序會一直阻塞获印,這樣會限制應(yīng)用程序的吞吐量述雾。我們可以通過降低提交頻率來提升吞吐量,但如果發(fā)生了再均衡兼丰,會增加重復(fù)消息的數(shù)量玻孟。 這個時候可以使用異步提交 API。我們只管發(fā)送提交請求地粪,無需等待 broker 的響應(yīng)取募。
在成功提交或碰到無法恢復(fù)的錯誤之前,commitSync() 會一直重試蟆技,但是 commitAsync() 不會玩敏,這也是 commitAsync() 不好的一個地方。 它之所以不進行重試质礼,是因為在它收到服務(wù)器響應(yīng)的時候旺聚,可能有一個更大的偏移量已經(jīng)提交成功。假設(shè)我們發(fā)出一個請求用于提交偏移量 2000眶蕉,這個時候發(fā)生了短暫的通信問題砰粹,服務(wù)器收不到請求,自然也不會作出任何響應(yīng)造挽。與此同時碱璃,我們處理了另外一批消息,并成功提交了偏移量 3000饭入。如果 commitAsync() 重新嘗試提交偏移量 2000嵌器,它有可能在偏移量 3000 之后提交成功。這個時候如果發(fā)生再均衡谐丢,就會出現(xiàn)重復(fù)消息爽航。 commitAsync() 也支持回調(diào),在 broker 作出響應(yīng)時會執(zhí)行回調(diào)乾忱〖フ洌回調(diào)經(jīng)常被用于記錄提交錯誤或生成度量指標(biāo)。如果要用它來進行重試窄瘟,則一定要注意提交的順序衷佃。
同步和異步混合提交
一般情況下,針對偶爾出現(xiàn)的提交失敗蹄葱,不進行重試不會有太大問題氏义,因為如果提交失敗是因為臨時問題導(dǎo)致的衰腌,那么后續(xù)的提交總會有成功的。 但如果這是發(fā)生在關(guān)閉消費者或再均衡前的最后一次提交觅赊,就要確保能夠提交成功右蕊。因此在這種情況下,我們應(yīng)該考慮使用混合提交的方法:
在程序正常運行過程中吮螺,我們使用 commitAsync 方法來進行提交饶囚,這樣的運行速度更快,而且就算當(dāng)前提交失敗鸠补,下次提交成功也可以萝风。 如果直接關(guān)閉消費者,就沒有所謂的“下一次提交”了紫岩,因為不會再調(diào)用poll()方法规惰。使用 commitSync() 方法會一直重試,直到提交成功或發(fā)生無法恢復(fù)的錯誤泉蝌。
提交特定的偏移量
如果 poll() 方法返回一大批數(shù)據(jù)歇万,為了避免因再均衡引起的重復(fù)處理整批消息,想要在批次中間提交偏移量該怎么辦勋陪?這種情況無法通過調(diào)用 commitSync() 或 commitAsync() 來實現(xiàn)贪磺,因為它們只會提交最后一個偏移量,而此時該批次里的消息還沒有處理完诅愚。 這時候需要使用一下的兩個方法:
消費者 API 允許在調(diào)用 commitSync() 和 commitAsync() 方法時傳進去希望提交的分區(qū)和偏移量的 map寒锚。 假設(shè)處理了半個批次的消息,最后一個來自主題“customers”分區(qū) 3 的消息的偏移量是 5000违孝,你可以調(diào)用 commitSync() 方法來提交它刹前。不過,因為消費者可能不只讀取一個分區(qū)雌桑,你需要跟蹤所有分區(qū)的偏移量喇喉,所以在這個層面上控制偏移量的提交會讓代碼變復(fù)雜。 代碼如下:
這里調(diào)用的是 commitAsync()筹燕,不過調(diào)用commitSync()也是完全可以的轧飞。在提交特定偏移量時衅鹿,仍然要處理可能發(fā)生的錯誤撒踪。
監(jiān)聽再均衡
如果 Kafka 觸發(fā)了再均衡,我們需要在消費者失去對一個分區(qū)的所有權(quán)之前提交最后一個已處理記錄的偏移量大渤。如果消費者準(zhǔn)備了一個緩沖區(qū)用于處理偶發(fā)的事件制妄,那么在失去分區(qū)所有權(quán)之前,需要處理在緩沖區(qū)累積下來的記錄泵三「蹋可能還需要關(guān)閉文件句柄衔掸、數(shù)據(jù)庫連接等。 在為消費者分配新分區(qū)或移除舊分區(qū)時俺抽,可以通過消費者 API 執(zhí)行一些應(yīng)用程序代碼敞映,在調(diào)用 subscribe() 方法時傳進去一個 ConsumerRebalanceListener 實例就可以了。ConsumerRebalanceListener 有兩個需要實現(xiàn)的方法磷斧。 public void onPartitionsRevoked(Collection partitions) 方法會在再均衡開始之前和消費者停止讀取消息之后被調(diào)用振愿。如果在這里提交偏移量,下一個接管分區(qū)的消費者就知道該從哪里開始讀取了弛饭。 public void onPartitionsAssigned(Collection partitions) 方法會在重新分配分區(qū)之后和消費者開始讀取消息之前被調(diào)用冕末。 下面的例子將演示如何在失去分區(qū)所有權(quán)之前通過 onPartitionsRevoked() 方法來提交偏移量。
如果發(fā)生再均衡侣颂,我們要在即將失去分區(qū)所有權(quán)時提交偏移量档桃。要注意,提交的是最近處理過的偏移量憔晒,而不是批次中還在處理的最后一個偏移量藻肄。因為分區(qū)有可能在我們還在處理消息的時候被撤回。我們要提交所有分區(qū)的偏移量拒担,而不只是那些即將失去所有權(quán)的分區(qū)的偏移量——因為提交的偏移量是已經(jīng)處理過的仅炊,所以不會有什么問題。調(diào)用 commitSync() 方法澎蛛,確保在再均衡發(fā)生之前提交偏移量抚垄。