一兢孝、概述
Apache Kafka 發(fā)展至今疾渴,已經(jīng)是一個(gè)很成熟的消息隊(duì)列組件了集惋,也是大數(shù)據(jù)生態(tài)圈中不可或缺的一員壹置。Apache Kafka 社區(qū)非常的活躍,通過(guò)社區(qū)成員不斷的貢獻(xiàn)代碼和迭代項(xiàng)目表谊,使得 Apache Kafka 功能越發(fā)豐富钞护、性能越發(fā)穩(wěn)定,成為企業(yè)大數(shù)據(jù)技術(shù)架構(gòu)解決方案中重要的一環(huán)爆办。
Apache Kafka 作為一個(gè)熱門(mén)消息隊(duì)列中間件难咕,具備高效可靠的消息處理能力,且擁有非常廣泛的應(yīng)用領(lǐng)域押逼。那么步藕,今天就來(lái)聊一聊基于 Kafka 的實(shí)時(shí)數(shù)倉(cāng)在搜索的實(shí)踐應(yīng)用。
二挑格、為什么需要 Kafka
在設(shè)計(jì)大數(shù)據(jù)技術(shù)架構(gòu)之前,通常會(huì)做一些技術(shù)調(diào)研沾歪。我們會(huì)去思考一下為什么需要 Kafka漂彤?怎么判斷選擇的 Kafka 技術(shù)能否滿足當(dāng)前的技術(shù)要求?
2.1 早期的數(shù)據(jù)架構(gòu)
早期的數(shù)據(jù)類型比較簡(jiǎn)單灾搏,業(yè)務(wù)架構(gòu)也比較簡(jiǎn)單挫望,就是將需要的數(shù)據(jù)存儲(chǔ)下來(lái)。比如將游戲類的數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)庫(kù)(MySQL狂窑、Oracle)媳板。但是,隨著業(yè)務(wù)的增量泉哈,存儲(chǔ)的數(shù)據(jù)類型也隨之增加了蛉幸,然后我們需要使用的大數(shù)據(jù)集群,利用數(shù)據(jù)倉(cāng)庫(kù)來(lái)將這些數(shù)據(jù)進(jìn)行分類存儲(chǔ)丛晦,如下圖所示:
但是奕纫,數(shù)據(jù)倉(cāng)庫(kù)存儲(chǔ)數(shù)據(jù)是有時(shí)延的,通常時(shí)延為T(mén)+1烫沙。而現(xiàn)在的數(shù)據(jù)服務(wù)對(duì)象對(duì)時(shí)延要求均有很高的要求匹层,例如物聯(lián)網(wǎng)、微服務(wù)锌蓄、移動(dòng)端APP等等升筏,皆需要實(shí)時(shí)處理這些數(shù)據(jù)。
2.2 Kafka 的出現(xiàn)
Kafka 的出現(xiàn)瘸爽,給日益增長(zhǎng)的復(fù)雜業(yè)務(wù)您访,提供了新的存儲(chǔ)方案。將各種復(fù)雜的業(yè)務(wù)數(shù)據(jù)統(tǒng)一存儲(chǔ)到 Kafka 里面蝶糯,然后在通過(guò) Kafka 做數(shù)據(jù)分流洋只。如下圖所示:
這里辆沦,可以將視頻、游戲识虚、音樂(lè)等不同類型的數(shù)據(jù)統(tǒng)一存儲(chǔ)到 Kafka 里面肢扯,然后在通過(guò)流處理對(duì) Kafka 里面的數(shù)據(jù)做分流操作。例如担锤,將數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)倉(cāng)庫(kù)蔚晨、將計(jì)算的結(jié)果存儲(chǔ)到KV做實(shí)時(shí)分析等。
通常消息系統(tǒng)常見(jiàn)的有兩種肛循,它們分別是:
消息隊(duì)列:隊(duì)列消費(fèi)者充當(dāng)了工作組的角色铭腕,每條消息記錄只能傳遞給一個(gè)工作進(jìn)程,從而有效的劃分工作流程多糠;
生產(chǎn)&消費(fèi):消費(fèi)者通常是互相獨(dú)立的累舷,每個(gè)消費(fèi)者都可以獲得每條消息的副本。
這兩種方式都是有效和實(shí)用的夹孔,通過(guò)消息隊(duì)列將工作內(nèi)容分開(kāi)被盈,用于容錯(cuò)和擴(kuò)展;生產(chǎn)和消費(fèi)能夠允許多租戶搭伤,來(lái)使得系統(tǒng)解耦只怎。而 Apache Kafka 的優(yōu)點(diǎn)之一在于它將消息隊(duì)列、生產(chǎn)和消費(fèi)結(jié)合到了一個(gè)強(qiáng)大的消息系統(tǒng)當(dāng)中怜俐。
同時(shí)身堡,Kafka 擁有正確的消息處理特性,主要體現(xiàn)在以下幾個(gè)方面:
可擴(kuò)展性:當(dāng) Kafka 的性能(如存儲(chǔ)拍鲤、吞吐等)達(dá)到瓶頸時(shí)贴谎,可以通過(guò)水平擴(kuò)展來(lái)提升性能;
真實(shí)存儲(chǔ):Kafka 的數(shù)據(jù)是實(shí)時(shí)落地在磁盤(pán)上的殿漠,不會(huì)因?yàn)榧褐貑⒒蚬收隙鴣G失數(shù)據(jù)赴精;
實(shí)時(shí)處理:能夠集成主流的計(jì)算引擎(如Flink、Spark等)绞幌,對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)處理蕾哟;
順序?qū)懭?/strong>:磁盤(pán)順序 I/O 讀寫(xiě),跳過(guò)磁頭“尋址”時(shí)間莲蜘,提高讀寫(xiě)速度谭确;
內(nèi)存映射:操作系統(tǒng)分頁(yè)存儲(chǔ)利用內(nèi)存提升 I/O 性能,實(shí)現(xiàn)文件到內(nèi)存的映射票渠,通過(guò)同步或者異步來(lái)控制 Flush逐哈;
零拷貝:將磁盤(pán)文件的數(shù)據(jù)復(fù)制到“頁(yè)面緩存”一次,然后將數(shù)據(jù)從“頁(yè)面緩存”直接發(fā)送到網(wǎng)絡(luò)问顷;
高效存儲(chǔ):Topic 和 Partition 拆為多個(gè)文件片段(Segment)昂秃,定期清理無(wú)效文件禀梳。采用稀疏存儲(chǔ),間隔若干字節(jié)建立一條索引肠骆,防止索引文件過(guò)大算途。
2.3 簡(jiǎn)單的應(yīng)用場(chǎng)景
這里,我們可以通過(guò)一個(gè)簡(jiǎn)單直觀的應(yīng)用場(chǎng)景蚀腿,來(lái)了解 Kafka 的用途嘴瓤。
場(chǎng)景:假如用戶A正在玩一款游戲,某一天用戶A喜歡上了游戲里面的一款道具莉钙,打算購(gòu)買(mǎi)廓脆,于是在當(dāng)天 14:00 時(shí)充值了 10 元,在逛游戲商店時(shí)又喜歡上了另一款道具磁玉,于是在 14:30 時(shí)又充值了 30 元停忿,接著在 15:00 時(shí)開(kāi)始下單購(gòu)買(mǎi),花費(fèi)了 20 元蜀涨,剩余金額為 20 元瞎嬉。那么,整個(gè)事件流厚柳,對(duì)應(yīng)到庫(kù)表里面的數(shù)據(jù)明細(xì)應(yīng)該是如下圖所示:
三、Kafka解決了什么問(wèn)題
早期為響應(yīng)項(xiàng)目快速上線沐兵,在服務(wù)器或者云服務(wù)器上部署一個(gè) WebServer别垮,為個(gè)人電腦或者移動(dòng)用戶提供訪問(wèn)體驗(yàn),然后后臺(tái)在對(duì)接一個(gè)數(shù)據(jù)庫(kù)扎谎,為 Web 應(yīng)用提供數(shù)據(jù)持久化以及數(shù)據(jù)查詢碳想,流程如下圖所示:
但是,隨著用戶的迅速增長(zhǎng)毁靶,用戶所有的訪問(wèn)都直接通過(guò) SQL 數(shù)據(jù)庫(kù)使得它不堪重負(fù)胧奔,數(shù)據(jù)庫(kù)的壓力也越來(lái)越大,不得不加上緩存服務(wù)以降低 SQL 數(shù)據(jù)庫(kù)的荷載预吆。
同時(shí)龙填,為了理解用戶行為,又開(kāi)始收集日志并保存到 Hadoop 這樣的大數(shù)據(jù)集群上做離線處理拐叉,并且把日志放在全文檢索系統(tǒng)(比如 ElasticSearch)中以便快速定位問(wèn)題岩遗。由于需要給投資方看業(yè)務(wù)狀況,也需要把數(shù)據(jù)匯總到數(shù)據(jù)倉(cāng)庫(kù)(比如 Hive)中以便提供交互式報(bào)表凤瘦。此時(shí)的系統(tǒng)架構(gòu)已經(jīng)具有一定的復(fù)雜性了宿礁,將來(lái)可能還會(huì)加入實(shí)時(shí)模塊以及外部數(shù)據(jù)交互。
本質(zhì)上蔬芥,這是一個(gè)數(shù)據(jù)集成問(wèn)題梆靖。沒(méi)有任何一個(gè)系統(tǒng)能夠解決所有的事情控汉,所以業(yè)務(wù)數(shù)據(jù)根據(jù)不同用途,存放在不同的系統(tǒng)返吻,比如歸檔姑子、分析、搜索思喊、緩存等壁酬。數(shù)據(jù)冗余本身沒(méi)有任何問(wèn)題,但是不同系統(tǒng)之間太過(guò)復(fù)雜的數(shù)據(jù)同步卻是一種挑戰(zhàn)恨课。如下圖所示:
而 Kafka 可以讓合適的數(shù)據(jù)以合適的形式出現(xiàn)在合適的地方舆乔。Kafka 的做法是提供消息隊(duì)列,讓生產(chǎn)者向隊(duì)列的末尾添加數(shù)據(jù)剂公,讓多個(gè)消費(fèi)者從隊(duì)列里面依次讀取數(shù)據(jù)然后自行處理希俩。如果說(shuō)之前連接的復(fù)雜度是 O(N^2),那么現(xiàn)在復(fù)雜度降低到了 O(N)纲辽,擴(kuò)展起來(lái)也方便多了颜武,流程如下圖所示:
四、Kafka的實(shí)踐應(yīng)用
4.1 為什么需要建設(shè)實(shí)時(shí)數(shù)倉(cāng)
4.1.1 目的
通常情況下拖吼,在大數(shù)據(jù)場(chǎng)景中鳞上,存儲(chǔ)海量數(shù)據(jù)建設(shè)數(shù)據(jù)倉(cāng)庫(kù)一般都是離線數(shù)倉(cāng)(時(shí)延T+1),通過(guò)定時(shí)任務(wù)每天拉取增量數(shù)據(jù)吊档,然后創(chuàng)建各個(gè)業(yè)務(wù)不同維度的數(shù)據(jù)篙议,對(duì)外提供 T+1 的數(shù)據(jù)服務(wù)。計(jì)算和數(shù)據(jù)的實(shí)時(shí)性均比較差怠硼,業(yè)務(wù)人員無(wú)法根據(jù)自己的即時(shí)性需求獲取幾分鐘之前的實(shí)時(shí)數(shù)據(jù)鬼贱。數(shù)據(jù)本身的價(jià)值隨著時(shí)間的流逝會(huì)逐步減弱,因此數(shù)據(jù)產(chǎn)生后必須盡快的到達(dá)用戶的手中香璃,實(shí)時(shí)數(shù)倉(cāng)的建設(shè)需求由此而來(lái)这难。
4.1.2 目標(biāo)
為了適應(yīng)業(yè)務(wù)高速迭代的特點(diǎn),分析用戶行為葡秒,挖掘用戶價(jià)值姻乓,提高用戶留存,在實(shí)時(shí)數(shù)據(jù)可用性同云、可擴(kuò)展性糖权、易用性、以及準(zhǔn)確性等方面提供更好的支持炸站,因此需要建設(shè)實(shí)時(shí)數(shù)倉(cāng)星澳。主要目標(biāo)包含如下所示:
統(tǒng)一收斂數(shù)據(jù)出口:統(tǒng)一數(shù)據(jù)口徑,減少數(shù)據(jù)重復(fù)性建設(shè)旱易;
降低數(shù)據(jù)維護(hù)成本:提升數(shù)據(jù)準(zhǔn)確性禁偎、及時(shí)性腿堤,優(yōu)化數(shù)據(jù)使用體驗(yàn)和成本;
減少數(shù)據(jù)使用成本:提高數(shù)據(jù)復(fù)用率如暖,避免實(shí)時(shí)數(shù)據(jù)重復(fù)消費(fèi)笆檀。
4.2 如何構(gòu)建實(shí)時(shí)數(shù)倉(cāng)為搜索提供數(shù)據(jù)
當(dāng)前實(shí)時(shí)數(shù)倉(cāng)比較主流的架構(gòu)一般來(lái)說(shuō)包含三個(gè)大的模塊,它們分別是消息隊(duì)列盒至、計(jì)算引擎酗洒、以及存儲(chǔ)。結(jié)合上述對(duì) Kafka 的綜合分析枷遂,結(jié)合搜索的業(yè)務(wù)場(chǎng)景樱衷,引入 Kafka 作為消息隊(duì)列,復(fù)用大數(shù)據(jù)平臺(tái)(BDSP)的能力作為計(jì)算引擎和存儲(chǔ)酒唉,具體架構(gòu)如下圖所示:
4.3 流處理引擎選擇
目前業(yè)界比較通用的流處理引擎主要有兩種矩桂,它們分別是Flink和Spark,那么如何選擇流處理引擎呢痪伦?我們可以對(duì)比以下特征來(lái)決定選擇哪一種流處理引擎侄榴?
Flink作為一款開(kāi)源的大數(shù)據(jù)流式計(jì)算引擎,它同時(shí)支持流批一體网沾,引入Flink作為實(shí)時(shí)數(shù)倉(cāng)建設(shè)的流引擎的主要原因如下:
高吞吐癞蚕、低延時(shí);
靈活的流窗口辉哥;
輕量級(jí)容錯(cuò)機(jī)制涣达;
流批一體
4.4 建設(shè)實(shí)時(shí)數(shù)倉(cāng)遇到的問(wèn)題
在建設(shè)初期,用于實(shí)時(shí)處理的 Kafka 集群規(guī)模較小证薇,單個(gè) Topic 的數(shù)據(jù)容量非常大,不同的實(shí)時(shí)任務(wù)都會(huì)消費(fèi)同一個(gè)大數(shù)據(jù)量的 Topic匆篓,這樣會(huì)導(dǎo)致 Kafka 集群的 I/O 壓力非常的大浑度。
因此,在使用的過(guò)程中會(huì)發(fā)現(xiàn) Kafka 的壓力非常大鸦概,經(jīng)常出現(xiàn)延時(shí)箩张、I/O能性能告警。因此窗市,我們采取了將大數(shù)據(jù)量的單 Topic 進(jìn)行實(shí)時(shí)分發(fā)來(lái)解決這種問(wèn)題先慷,基于 Flink 設(shè)計(jì)了如下圖所示的數(shù)據(jù)分發(fā)流程。
上述流程咨察,隨著業(yè)務(wù)類型和數(shù)據(jù)量的增加论熙,又會(huì)面臨新的問(wèn)題:
數(shù)據(jù)量增加,隨著消費(fèi)任務(wù)的增加摄狱,Kafka 集群 I/O 負(fù)載大時(shí)會(huì)影響消費(fèi)脓诡;
不用業(yè)務(wù)之間 Topic 的消費(fèi)沒(méi)有落地存儲(chǔ)(比如HDFS无午、HBase存儲(chǔ)等),會(huì)產(chǎn)生重復(fù)消費(fèi)的情況祝谚;
數(shù)據(jù)耦合度過(guò)高宪迟,遷移數(shù)據(jù)和任務(wù)難度大。
4.5 實(shí)時(shí)數(shù)倉(cāng)方案進(jìn)階
目前交惯,主流的實(shí)時(shí)數(shù)倉(cāng)架構(gòu)通常有2種次泽,它們分別是Lambda、Kappa席爽。
4.5.1 Lambda
隨著實(shí)時(shí)性需求的提出意荤,為了快速計(jì)算一些實(shí)時(shí)指標(biāo)(比如,實(shí)時(shí)點(diǎn)擊拳昌、曝光等)袭异,會(huì)在離線數(shù)倉(cāng)大數(shù)據(jù)架構(gòu)的基礎(chǔ)上增加一個(gè)實(shí)時(shí)計(jì)算的鏈路,并對(duì)消息隊(duì)列實(shí)現(xiàn)數(shù)據(jù)來(lái)源的流失處理炬藤,通過(guò)消費(fèi)消息隊(duì)列中的數(shù)據(jù) 御铃,用流計(jì)算引擎來(lái)實(shí)現(xiàn)指標(biāo)的增量計(jì)算,并推送到下游的數(shù)據(jù)服務(wù)中去沈矿,由下游數(shù)據(jù)服務(wù)層完成離線和實(shí)時(shí)結(jié)果的匯總上真。具體流程如下:
4.5.2 Kappa
Kappa架構(gòu)只關(guān)心流式計(jì)算,數(shù)據(jù)以流的方式寫(xiě)入到 Kafka 羹膳,然后通過(guò) Flink 這類實(shí)時(shí)計(jì)算引擎將計(jì)算結(jié)果存放到數(shù)據(jù)服務(wù)層以供查詢睡互。可以看作是在Lambda架構(gòu)的基礎(chǔ)上簡(jiǎn)化了離線數(shù)倉(cāng)的部分陵像。具體流程如下:
在實(shí)際建設(shè)實(shí)時(shí)數(shù)倉(cāng)的過(guò)程中就珠,我們結(jié)合這2種架構(gòu)的思想來(lái)使用。實(shí)時(shí)數(shù)倉(cāng)引入了類似于離線數(shù)倉(cāng)的分層理念醒颖,主要是為了提供模型的復(fù)用率妻怎,同時(shí)也要考慮易用性、一致性泞歉、以及計(jì)算的成本逼侦。
4.5.3 實(shí)時(shí)數(shù)倉(cāng)分層
在進(jìn)階建設(shè)實(shí)時(shí)數(shù)倉(cāng)時(shí),分層架構(gòu)的設(shè)計(jì)并不會(huì)像離線數(shù)倉(cāng)那邊復(fù)雜腰耙,這是為了避免數(shù)據(jù)計(jì)算鏈路過(guò)長(zhǎng)造成不必要的延時(shí)情況榛丢。具體流程圖如下所示:
ODS層:以Kafka 作為消息隊(duì)列,將所有需要實(shí)時(shí)計(jì)算處理的數(shù)據(jù)放到對(duì)應(yīng)的 Topic 進(jìn)行處理挺庞;
DW層:通過(guò)Flink實(shí)時(shí)消費(fèi)Topic中的數(shù)據(jù)晰赞,然后通過(guò)數(shù)據(jù)清理、多維度關(guān)聯(lián)(JOIN)等,將一些相同維度的業(yè)務(wù)系統(tǒng)宾肺、維表中的特征屬性進(jìn)行關(guān)聯(lián)溯饵,提供數(shù)據(jù)易用性和復(fù)用性能力,最終得到實(shí)時(shí)明細(xì)數(shù)據(jù)锨用;
DIM層:用來(lái)存儲(chǔ)關(guān)聯(lián)的查詢的維度信息丰刊,存儲(chǔ)介質(zhì)可以按需選擇,比如HBase增拥、Redis啄巧、MySQL等;
DA層:針對(duì)實(shí)時(shí)數(shù)據(jù)場(chǎng)景需求掌栅,進(jìn)行高度聚合匯總秩仆,服務(wù)于KV、BI等場(chǎng)景。OLAP分析可以使用ClickHouse,KV可以選擇HBase(若數(shù)據(jù)量較小样漆,可以采用Redis)。
通過(guò)上面的流程齐莲,建設(shè)實(shí)時(shí)數(shù)倉(cāng)分層時(shí),確保了對(duì)實(shí)時(shí)計(jì)算要求比較高的任務(wù)不會(huì)影響到BI報(bào)表磷箕、或者KV查詢选酗。但是,會(huì)有新的問(wèn)題需要解決:
Kafka 實(shí)時(shí)數(shù)據(jù)如何點(diǎn)查岳枷?
消費(fèi)任務(wù)異常時(shí)如何分析芒填?
4.5.4 Kafka監(jiān)控
針對(duì)這些問(wèn)題,我們調(diào)研和引入了Kafka 監(jiān)控系統(tǒng)——Kafka Eagle(目前改名為EFAK)空繁。復(fù)用該監(jiān)控系統(tǒng)中比較重要的維度監(jiān)控功能殿衰。
Kafka Eagle處理能夠滿足上訴兩個(gè)維度的監(jiān)控需求之外,還提供了一些日常比較實(shí)用的功能盛泡,比如Topic記錄查看播玖、Topic容量查看、消費(fèi)和生產(chǎn)任務(wù)的速率饭于、消費(fèi)積壓等。我們采用了 Kafka-Eagle 來(lái)作為對(duì)實(shí)時(shí)數(shù)倉(cāng)的任務(wù)監(jiān)控维蒙。Kafka-Eagle 系統(tǒng)設(shè)計(jì)架構(gòu)如下圖所示:
Kafka-Eagle 是一款完全開(kāi)源的對(duì) Kafka 集群及應(yīng)用做全面監(jiān)控的系統(tǒng)掰吕,其核心由以下幾個(gè)部分組成:
數(shù)據(jù)采集:核心數(shù)據(jù)來(lái)源 JMX 和 API 獲取颅痊;
數(shù)據(jù)存儲(chǔ):支持 MySQL 和 Sqlite 存儲(chǔ)殖熟;
數(shù)據(jù)展示:消費(fèi)者應(yīng)用、圖表趨勢(shì)監(jiān)控(包括集群狀態(tài)斑响、消費(fèi)生產(chǎn)速率菱属、消費(fèi)積壓等)钳榨、開(kāi)發(fā)的分布式 KSQL 查詢引擎,通過(guò) KSQL 消息查詢纽门;
數(shù)據(jù)告警:支持常用的 IM 告警(微信薛耻,釘釘,WebHook等)赏陵,同時(shí)郵件饼齿、短信、電話告警也一并支持蝙搔。
部分預(yù)覽截圖如下:
1)Topic最近7天寫(xiě)入量分布
默認(rèn)展示所有Topic的每天寫(xiě)入總量分布缕溉,可選擇時(shí)間維度、Topic聚合維度吃型,來(lái)查看寫(xiě)入量的分布情況证鸥,預(yù)覽截圖如下所示:
2)KSQL查詢Topic消息記錄
可以通過(guò)編寫(xiě)SQL語(yǔ)句,來(lái)查詢(支持過(guò)濾條件)Topic中的消息記錄勤晚,預(yù)覽截圖如下所示:
3)消費(fèi)Topic積壓詳情
可以監(jiān)控所有被消費(fèi)的Topic的消費(fèi)速率枉层、消費(fèi)積壓等詳情,預(yù)覽截圖如下所示: