一昭卓、為什么要做高可用
由于依賴Flink應(yīng)用A下游服務(wù)非常重要,對(duì)應(yīng)用A所提供數(shù)據(jù)的實(shí)時(shí)性蛙讥、可靠性要求比較高锯蛀。所以對(duì)應(yīng)用A進(jìn)行了高可用改造灭衷,運(yùn)行在兩個(gè)獨(dú)立的flink上,保障不會(huì)由于某個(gè)flink集群故障而影響下游服務(wù)旁涤。在探索過程中落地經(jīng)歷了從雙活到熱備的轉(zhuǎn)變翔曲,下面將對(duì)這一歷程進(jìn)行回顧。
二劈愚、單集群運(yùn)行
Kafka Source Connector消費(fèi)數(shù)據(jù)瞳遍,經(jīng)過計(jì)算后將計(jì)算后的數(shù)據(jù)寫入kafka另外一個(gè)topic。寫入topic有兩個(gè)地方菌羽。其中Sink直接將數(shù)據(jù)流寫入kafka掠械。ProcessFunction注冊了定時(shí)器,當(dāng)一條數(shù)據(jù)過來后會(huì)注冊一個(gè)定時(shí)器注祖,當(dāng)一個(gè)設(shè)備的數(shù)據(jù)一段時(shí)間沒有流入將會(huì)生成一條離線數(shù)據(jù)寫入到kafka猾蒂。高可用要保證的就是Sink和ProcessFunction數(shù)據(jù)能夠可靠得寫入到kafka。
三是晨、高可用階段一:雙活
程序分別跑在兩個(gè)集群肚菠。集群1和集群2在寫入kafka前,先通過REDIS的SETNX來設(shè)置值罩缴,當(dāng)設(shè)置成功時(shí)才發(fā)送數(shù)據(jù)到kafka蚊逢。這種方式經(jīng)過測試發(fā)現(xiàn)存在亂序的情況。由于無法保證先搶到發(fā)送權(quán)的箫章,一定先發(fā)送kafka烙荷。
如果在前一條記錄發(fā)送kafka成功之后,記錄一個(gè)發(fā)送成功狀態(tài)檬寂。讀取到這個(gè)狀態(tài)后才允許進(jìn)行下一條數(shù)據(jù)的發(fā)送可以強(qiáng)行保證有序终抽。但這樣做會(huì)導(dǎo)致數(shù)據(jù)時(shí)效性低下、程序復(fù)雜度升高、落地難拿诸,總之此路不通扒袖。
四、高可用階段二:熱備
兩個(gè)集群同時(shí)發(fā)送kafka數(shù)據(jù)不行亩码,那么就只讓一個(gè)集群發(fā)送數(shù)據(jù)季率。當(dāng)單個(gè)集群出現(xiàn)故障無法恢復(fù),快速用另一個(gè)集群替代進(jìn)行業(yè)務(wù)數(shù)據(jù)處理描沟。
我們將當(dāng)前負(fù)責(zé)發(fā)送數(shù)據(jù)到下游的集群稱為主集群飒泻,將不發(fā)送數(shù)據(jù)處于待命狀態(tài)的集群稱為備集群。上圖中集群1為主集群吏廉,集群2為備集群泞遗。為了保證主集群出現(xiàn)故障,切換到備集群的時(shí)候不丟數(shù)據(jù)席覆,備集群的消費(fèi)進(jìn)度一定要晚于主集群史辙。
有幾個(gè)需要回答的問題擺在面前:
- 如何實(shí)現(xiàn)備集群跟隨主集群處理進(jìn)度?
- 備集群與主集群消費(fèi)差距應(yīng)該是多少佩伤?
- 切換后的的重復(fù)數(shù)據(jù)發(fā)不發(fā)聊倔?
1.如何實(shí)現(xiàn)備集群跟隨主集群處理進(jìn)度
主集群負(fù)責(zé)記錄處理進(jìn)度,按照topic-partition粒度記錄處理的offset生巡。備集群負(fù)責(zé)讀取前者記錄的進(jìn)度進(jìn)行降速耙蔑。為保證代碼的簡潔和小的復(fù)雜度,將記錄進(jìn)度和根據(jù)進(jìn)度降速放在同一個(gè)環(huán)節(jié)處理孤荣。
在消費(fèi)環(huán)節(jié)還是在其他環(huán)節(jié)降速面臨不同的問題甸陌。消費(fèi)環(huán)節(jié)控制會(huì)導(dǎo)致備集群切換為主集群時(shí),前文中ProcessFunction少量注冊的離線定時(shí)器誤報(bào)盐股。在其他環(huán)節(jié)控制會(huì)導(dǎo)致背壓钱豁,checkpoint失敗,且不容易對(duì)數(shù)據(jù)進(jìn)度進(jìn)行控制遂庄。經(jīng)過權(quán)衡寥院,選擇在消費(fèi)環(huán)節(jié)控制。
2.備集群與主集群消費(fèi)差距應(yīng)該是多少
差距當(dāng)然是在切換后不漏數(shù)據(jù)的前提下越接近越好涛目。
下面討論最近可以多近秸谢?
給個(gè)反例,集群2是備集群霹肝,集群1是主集群估蹄,集群2緊隨集群1的進(jìn)度之后。集群1從kafka讀取到數(shù)據(jù)時(shí)記錄offset沫换,目前記錄為9臭蚁。集群1在處理完數(shù)據(jù)6時(shí)完成了一次checkpoint,目前消費(fèi)到了數(shù)據(jù)9(圖4),此時(shí)7垮兑、8雖然消費(fèi)了冷尉,但是沒有到達(dá)Sink(圖5)。集群2此時(shí)讀取到數(shù)據(jù)8系枪。
假如集群1這個(gè)時(shí)候出現(xiàn)故障宕機(jī)雀哨,切換到集群2,數(shù)據(jù)7將會(huì)丟失私爷。
所以要保證備集群消費(fèi)的進(jìn)度在已發(fā)送下游數(shù)據(jù)之后雾棺。主集群在一次完整的checkpoint做完后寫入offset可以保證備集群不漏數(shù)據(jù)。
3.切換后的的重復(fù)數(shù)據(jù)發(fā)不發(fā)
在當(dāng)前實(shí)現(xiàn)中依賴下游系統(tǒng)冪等處理來做到整體的EXACTLY ONCE衬浑。
4.整個(gè)主備集群協(xié)同步驟的描述
1.主集群消費(fèi)kafka數(shù)據(jù)捌浩,checkpoint成功后,在提交kafka offset的同時(shí)工秩,將kafka offset記錄到redis中尸饺;
2.備集群讀取redis中得到當(dāng)前主集群offset消費(fèi)進(jìn)度。消費(fèi)kafka數(shù)據(jù)拓诸,當(dāng)數(shù)據(jù)中對(duì)應(yīng)topic的partition的offset大于或者等于redis中的offset-1則丟棄這條數(shù)據(jù)(由于Flink中做checkpoint時(shí)提交的kafka消費(fèi)offset是由source emit到下游的kafka數(shù)據(jù)驅(qū)動(dòng)的)侵佃,并且調(diào)用consumer的pause停止消費(fèi)指定partition麻昼;
3.指定時(shí)間(目前是10秒)后會(huì)調(diào)用consumer的resume方法喚醒備集群的指定partition奠支,當(dāng)消費(fèi)的數(shù)據(jù)仍然滿足暫停條件則繼續(xù)步驟2;
4.主備集群根據(jù)配置在apollo中的當(dāng)前主集群id來知道自己是否是主集群抚芦。當(dāng)主集群切換為備集群時(shí)則根據(jù)redis中的值呈現(xiàn)跟隨狀態(tài)倍谜,備集群切換為主集群則呈現(xiàn)記錄redis狀態(tài)。主集群推送數(shù)據(jù)到下游叉抡,備集群不推數(shù)據(jù)尔崔。
5.一些細(xì)節(jié)
最終落地基于
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09,
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher,
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
進(jìn)行二次開發(fā),加入熱備模塊褥民。其中進(jìn)度offset提交操作在:
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread 的 setOffsetsToCommit方法季春,此方法由上層實(shí)現(xiàn)了以下接口的類調(diào)用
/**
* This interface must be implemented by functions/operations that want to receive
* a commit notification once a checkpoint has been completely acknowledged by all
* participants.
*/
@PublicEvolving
public interface CheckpointListener {
/**
* This method is called as a notification once a distributed checkpoint has been completed.
*
* Note that any exception during this method will not cause the checkpoint to
* fail any more.
*
* @param checkpointId The ID of the checkpoint that has been completed.
* @throws Exception
*/
void notifyCheckpointComplete(long checkpointId) throws Exception;
}
為提高可靠性對(duì)redis的訪問增加熔斷降級(jí)機(jī)制。當(dāng)一段時(shí)間redis出現(xiàn)訪問異常消返,會(huì)暫時(shí)不訪問载弄,降級(jí)窗口結(jié)束后會(huì)繼續(xù)訪問。
五撵颊、結(jié)語
最終應(yīng)用A高可用方案以熱備落地宇攻,主要是解決單個(gè)集群長時(shí)間無法恢復(fù)的問題。目前不同集群使用同一份checkpoint技術(shù)層面已經(jīng)落地倡勇,后面可以有冷備方案逞刷,當(dāng)主機(jī)群宕機(jī)后,備集群讀取主機(jī)群的checkpoint文件數(shù)據(jù)啟動(dòng)繼續(xù)服務(wù)。