小米業(yè)務線眾多称近,從信息流第队,電商,廣告到金融等覆蓋了眾多領域刨秆,小米流式平臺為小米集團各業(yè)務提供一體化的流式數(shù)據(jù)解決方案凳谦,主要包括數(shù)據(jù)采集,數(shù)據(jù)集成和流式計算三個模塊坛善。目前每天數(shù)據(jù)量達到 1.2 萬億條晾蜘,實時同步任務 1.5 萬,實時計算的數(shù)據(jù) 1 萬億條眠屎。
伴隨著小米業(yè)務的發(fā)展剔交,流式平臺也經(jīng)歷三次大升級改造,滿足了眾多業(yè)務的各種需求改衩。最新的一次迭代基于 Apache Flink岖常,對于流式平臺內(nèi)部模塊進行了徹底的重構,同時小米各業(yè)務也在由 Spark Streaming 逐步切換到 Flink葫督。
背景介紹
小米流式平臺的愿景是為小米所有的業(yè)務線提供流式數(shù)據(jù)的一體化竭鞍、平臺化解決方案。具體來講包括以下三個方面:
流式數(shù)據(jù)存儲:流式數(shù)據(jù)存儲指的是消息隊列橄镜,小米開發(fā)了一套自己的消息隊列偎快,其類似于 Apache kafka,但它有自己的特點洽胶,小米流式平臺提供消息隊列的存儲功能晒夹;
流式數(shù)據(jù)接入和轉儲:有了消息隊列來做流式數(shù)據(jù)的緩存區(qū)之后,繼而需要提供流式數(shù)據(jù)接入和轉儲的功能姊氓;
流式數(shù)據(jù)處理:指的是平臺基于 Flink丐怯、Spark Streaming 和 Storm 等計算引擎對流式數(shù)據(jù)進行處理的過程。
下圖展示了流式平臺的整體架構翔横。從左到右第一列橙色部分是數(shù)據(jù)源读跷,包含兩部分,即 User 和 Database禾唁。
User 指的是用戶各種各樣的埋點數(shù)據(jù)效览,如用戶 APP 和 WebServer 的日志,其次是 Database 數(shù)據(jù)荡短,如 MySQL丐枉、HBase 和其他的 RDS 數(shù)據(jù)。
中間藍色部分是流式平臺的具體內(nèi)容肢预,其中 Talos 是小米實現(xiàn)的消息隊列矛洞,其上層包含 Consumer SDK 和 Producer SDK洼哎。
此外小米還實現(xiàn)了一套完整的 Talos Source烫映,主要用于收集剛才提到的用戶和數(shù)據(jù)庫的全場景的數(shù)據(jù)沼本。
Talos Sink 和 Source 共同組合成一個數(shù)據(jù)流服務,主要負責將 Talos 的數(shù)據(jù)以極低的延遲轉儲到其他系統(tǒng)中锭沟;Sink 是一套標準化的服務抽兆,但其不夠定制化,后續(xù)會基于 Flink SQL 重構 Talos Sink 模塊族淮。
下圖展示了小米的業(yè)務規(guī)模辫红。在存儲層面小米每天大概有 1.2 萬億條消息,峰值流量可以達到 4300 萬條每秒祝辣。轉儲模塊僅 Talos Sink 每天轉儲的數(shù)據(jù)量就高達 1.6 PB贴妻,轉儲作業(yè)目前將近有 1.5 萬個。每天的流式計算作業(yè)超過 800 個蝙斜,F(xiàn)link 作業(yè)超過 200 個名惩,F(xiàn)link 每天處理的消息量可以達到 7000 億條,數(shù)據(jù)量在 1 PB 以上孕荠。
小米流式平臺發(fā)展歷史
小米流式平臺發(fā)展歷史分為如下三個階段:
Streaming Platform 1.0:小米流式平臺的 1.0 版本構建于 2010 年娩鹉,其最初使用的是 Scribe、Kafka 和 Storm稚伍,其中 Scribe 是一套解決數(shù)據(jù)收集和數(shù)據(jù)轉儲的服務弯予。
Streaming Platform 2.0:由于 1.0 版本存在的種種問題,我們自研了小米自己的消息隊列 Talos个曙,還包括 Talos Source锈嫩、Talos Sink,并接入了 Spark Streaming困檩。
Streaming Platform 3.0:該版本在上一個版本的基礎上增加了 Schema 的支持祠挫,還引入了 Flink 和 Stream SQL。
Streaming Platform 1.0 整體是一個級聯(lián)的服務悼沿,前面包括 Scribe Agent 和 Scribe Server 的多級級聯(lián)等舔,主要用于收集數(shù)據(jù),然后滿足離線計算和實時計算的場景糟趾。離線計算使用的是 HDFS 和 Hive慌植,實時計算使用的是 Kafka 和 Storm。雖然這種離線加實時的方式可以基本滿足小米當時的業(yè)務需求义郑,但也存在一系列的問題蝶柿。
首先是 Scribe Agent 過多,而配置和包管理機制缺乏非驮,導致維護成本非常高交汤;
Scribe 采用的 Push 架構,異常情況下無法有效緩存數(shù)據(jù),同時 HDFS / Kafka 數(shù)據(jù)相互影響芙扎;
最后數(shù)據(jù)鏈級聯(lián)比較長的時候星岗,整個全鏈路數(shù)據(jù)黑盒,缺乏監(jiān)控和數(shù)據(jù)檢驗機制戒洼。
為了解決 Streaming Platform 1.0 的問題俏橘,小米推出了 Streaming Platform 2.0 版本。該版本引入了 Talos圈浇,將其作為數(shù)據(jù)緩存區(qū)來進行流式數(shù)據(jù)的存儲寥掐,左側是多種多樣的數(shù)據(jù)源,右側是多種多樣的 Sink磷蜀,即將原本的級聯(lián)架構轉換成星型架構召耘,優(yōu)點是方便地擴展。
由于 Agent 自身數(shù)量及管理的流較多(具體數(shù)據(jù)均在萬級別)褐隆,為此該版本實現(xiàn)了一套配置管理和包管理系統(tǒng)怎茫,可以支持 Agent 一次配置之后的自動更新和重啟等。
此外妓灌,小米還實現(xiàn)了去中心化的配置服務轨蛤,配置文件設定好后可以自動地分發(fā)到分布式結點上去。
最后虫埂,該版本還實現(xiàn)了數(shù)據(jù)的端到端監(jiān)控祥山,通過埋點來監(jiān)控數(shù)據(jù)在整個鏈路上的數(shù)據(jù)丟失情況和數(shù)據(jù)傳輸延遲情況等。
Streaming Platform 2.0 的優(yōu)勢主要有:
引入了 Multi Source & Multi Sink掉伏,之前兩個系統(tǒng)之間導數(shù)據(jù)需要直接連接缝呕,現(xiàn)在的架構將系統(tǒng)集成復雜度由原來的 O(M*N) 降低為 O(M+N);
引入配置管理和包管理機制斧散,徹底解決系統(tǒng)升級供常、修改和上線等一系列問題,降低運維的壓力鸡捐;
引入端到端數(shù)據(jù)監(jiān)控機制栈暇,實現(xiàn)全鏈路數(shù)據(jù)監(jiān)控,量化全鏈路數(shù)據(jù)質(zhì)量箍镜;
產(chǎn)品化解決方案源祈,避免重復建設,解決業(yè)務運維問題色迂。
下圖詳細介紹一下 MySQL 同步的案例香缺,場景是將 MySQL 的一個表通過上述的機制同步到消息隊列 Talos。具體流程是 Binlog 服務偽裝成 MySQL 的 Slave歇僧,向 MySQL 發(fā)送 Dump binlog 請求图张;MySQL 收到 Dump 請求后,開始推動 Binlog 給 Binlog 服務;Binlog 服務將 binlog 以嚴格有序的形式轉儲到 Talos祸轮。之后會接入 Spark Streaming 作業(yè)姑隅,對 binlog 進行解析,解析結果寫入到 Kudu 表中倔撞。目前平臺支持寫入到 Kudu 中的表的數(shù)量級超過 3000 個。
Agent Source 的功能模塊如下圖所示慕趴。其支持 RPC痪蝇、Http 協(xié)議,并可以通過 File 來監(jiān)聽本地文件冕房,實現(xiàn)內(nèi)存和文件雙緩存躏啰,保證數(shù)據(jù)的高可靠。平臺基于 RPC 協(xié)議實現(xiàn)了 Logger Appender 和 RPC 協(xié)議的 SDK耙册;對于 Http 協(xié)議實現(xiàn)了 HttpClient给僵;對于文件實現(xiàn)了 File Watcher 來對本地文件進行自動地發(fā)現(xiàn)和掃描,Offset Manager 自動記錄 offset详拙;Agent 機制與 K8S 環(huán)境深度整合帝际,可以很容易地和后端的流式計算等相結合。
下圖是 Talos Sink 的邏輯流程圖饶辙,其基于 Spark Streaming 來實現(xiàn)一系列流程蹲诀。最左側是一系列 Talos Topic 的 Partition 分片,基于每個 batch 抽象公共邏輯弃揽,如 startProcessBatch() 和 stopProcessBatch()脯爪,不同 Sink 只需要實現(xiàn) Write 邏輯;不同的 Sink 獨立為不同的作業(yè)矿微,避免相互影響痕慢;Sink 在 Spark Streaming 基礎上進行了優(yōu)化,實現(xiàn)了根據(jù) Topic 流量進行動態(tài)資源調(diào)度涌矢,保證系統(tǒng)延遲的前提下最大限度節(jié)省資源掖举。
下圖是平臺實現(xiàn)的端到端數(shù)據(jù)監(jiān)控機制。具體實現(xiàn)是為每個消息都有一個時間戳 EventTime娜庇,表示這個消息真正生成的時間拇泛,根據(jù) EventTime 來劃分時間窗口,窗口大小為一分鐘思灌,數(shù)據(jù)傳輸?shù)拿恳惶y(tǒng)計當前時間窗口內(nèi)接受到的消息數(shù)量俺叭,最后統(tǒng)計出消息的完整度。延遲是計算某一跳 ProcessTime 和 EventTime 之間的差值泰偿。
Streaming Platform 2.0 目前的問題主要有三點:
Talos 數(shù)據(jù)缺乏 Schema 管理熄守,Talos 對于傳入的數(shù)據(jù)是不理解的,這種情況下無法使用 SQL 來消費 Talos 的數(shù)據(jù);
Talos Sink 模塊不支持定制化需求裕照,例如從 Talos 將數(shù)據(jù)傳輸?shù)?Kudu 中攒发,Talos 中有十個字段,但 Kudu 中只需要 5 個字段晋南,該功能目前無法很好地支持惠猿;
Spark Streaming 自身問題,不支持 Event Time负间,端到端 Exactly Once 語義偶妖。
基于 Flink 的實時數(shù)倉
為了解決 Streaming Platform 2.0 的上述問題,小米進行了大量調(diào)研政溃,也和阿里的實時計算團隊做了一系列溝通和交流趾访,最終決定將使用 Flink 來改造平臺當前的流程,下面具體介紹小米流式計算平臺基于Flink的實踐董虱。
使用 Flink 對平臺進行改造的設計理念如下:
全鏈路 Schema 支持扼鞋,這里的全鏈路不僅包含 Talos 到 Flink 的階段,而是從最開始的數(shù)據(jù)收集階段一直到后端的計算處理愤诱。需要實現(xiàn)數(shù)據(jù)校驗機制云头,避免數(shù)據(jù)污染;字段變更和兼容性檢查機制淫半,在大數(shù)據(jù)場景下盘寡,Schema 變更頻繁,兼容性檢查很有必要撮慨,借鑒 Kafka 的經(jīng)驗竿痰,在 Schema 引入向前、向后或全兼容檢查機制砌溺。
借助 Flink 社區(qū)的力量全面推進 Flink 在小米的落地影涉,一方面 Streaming 實時計算的作業(yè)逐漸從 Spark、Storm 遷移到 Flink规伐,保證原本的延遲和資源節(jié)省蟹倾,目前小米已經(jīng)運行了超過 200 個 Flink 作業(yè);另一方面期望用 Flink 改造 Sink 的流程猖闪,提升運行效率的同時鲜棠,支持 ETL,在此基礎上大力推進 Streaming SQL培慌;
實現(xiàn) Streaming 產(chǎn)品化豁陆,引入 Streaming Job 和 Streaming SQL 的平臺化管理;
基于 Flink SQL 改造 Talos Sink吵护,支持業(yè)務邏輯定制化
下圖是 Streaming Platform 3.0 版本的架構圖盒音,與 2.0 版本的架構設計類似表鳍,只是表達的角度不同。具體包含以下幾個模塊:
抽象 Table:該版本中各種存儲系統(tǒng)如 MySQL 和 Hive 等都會抽象成 Table祥诽,為 SQL 化做準備譬圣。
Job 管理:提供 Streaming 作業(yè)的管理支持,包括多版本支持雄坪、配置與Jar分離厘熟、編譯部署和作業(yè)狀態(tài)管理等常見的功能。
SQL 管理:SQL 最終要轉換為一個 Data Stream 作業(yè)维哈,該部分功能主要有 Web IDE 支持绳姨、Schema 探查、UDF/維表 Join笨农、SQL 編譯、自動構建 DDL 和 SQL 存儲等帖渠。
Talos Sink:該模塊基于 SQL 管理對 2.0 版本的 Sink 重構谒亦,包含的功能主要有一鍵建表、Sink 格式自動更新空郊、字段映射份招、作業(yè)合并、簡單 SQL 和配置管理等狞甚。前面提到的場景中锁摔,基于 Spark Streaming 將 Message 從 Talos 讀取出來,并原封不動地轉到 HDFS 中做離線數(shù)倉的分析哼审,此時可以直接用 SQL 表達很方便地實現(xiàn)谐腰。未來希望實現(xiàn)該模塊與小米內(nèi)部的其他系統(tǒng)如 ElasticSearch 和 Kudu 等進行深度整合,具體的場景是假設已有 Talos Schema涩盾,基于 Talos Topic Schema 自動幫助用戶創(chuàng)建 Kudu 表十气。
平臺化:為用戶提供一體化、平臺化的解決方案春霍,包括調(diào)試開發(fā)砸西、監(jiān)控報警和運維等。
Job 管理
Job 管理提供 Job 全生命周期管理址儒、Job 權限管理和 Job 標簽管理等功能芹枷;支持Job 運行歷史展示,方便用戶追溯莲趣;支持 Job 狀態(tài)與延遲監(jiān)控鸳慈,可以實現(xiàn)失敗作業(yè)自動拉起。
SQL 管理
主要包括以下四個環(huán)節(jié):
將外部表轉換為 SQL DDL喧伞,對應 Flink 1.9 中標準的 DDL 語句蝶涩,主要包含 Table Schema理朋、Table Format 和 Connector Properities。
基于完整定義的外部 SQL 表绿聘,增加 SQL 語句嗽上,既可以得到完成的表達用戶的需求。即 SQL Config 表示完整的用戶預計表達熄攘,由 Source Table DDL、Sink Table DDL 和 SQL DML語句組成挪圾。
將 SQL Config 轉換成 Job Config浅萧,即轉換為 Stream Job 的表現(xiàn)形式。
將 Job Config 轉換為 JobGraph哲思,用于提交 Flink Job洼畅。
外部表轉換成 SQL DDL 的流程如下圖所示。
首先根據(jù)外部表獲取 Table Schema 和 Table Format 信息棚赔,后者用于反解數(shù)據(jù)帝簇,如對于 Hive 數(shù)據(jù)反序列化;
然后再后端生成默認的 Connector 配置靠益,該配置主要分為三部分丧肴,即不可修改的、帶默認值的用戶可修改的胧后、不帶默認值的用戶必須配置的芋浮。
不可修改的配置情況是假設消費的是 Talos 組件,那么 connector.type 一定是 talos壳快,則該配置不需要改浪册;而默認值是從 Topic 頭部開始消費畏腕,但用戶可以設置從尾部開始消費立莉,這種情況屬于帶默認值但是用戶可修改的配置但壮;而一些權限信息是用戶必須配置的。
之所以做三層配置管理凛驮,是為了盡可能減少用戶配置的復雜度裆站。Table Schema、Table Format 和 Connector 1 其他配置信息黔夭,組成了SQL DDL宏胯。將 SQL Config 返回給用戶之后,對于可修改的需要用戶填寫本姥,這樣便可以完成從外部表到 SQL DDL 的轉換肩袍,紅色字體表示的是用戶修改的信息。
SQL 管理引入了一個 External Table 的特性婚惫。假設用戶在平臺上選擇消費某個 Topic 的時候氛赐,該特性會自動地獲取上面提到的 Table 的 Schema 和 Format 信息魂爪,并且顯示去掉了注冊 Flink Table 的邏輯;獲取 Schema 時艰管,該特性會將外部表字段類型自動轉換為 Flink Table 字段類型滓侍,并自動注冊為 Flink Tab 了。同時將 Connector Properties 分成三類牲芋,參數(shù)帶默認值撩笆,只有必須項要求用戶填寫;所有參數(shù)均采用 Map 的形式表達缸浦,非常便于后續(xù)轉化為 Flink 內(nèi)部的 TableDescriptor夕冲。
上面介紹了 SQL DDL 的創(chuàng)建過程,在已經(jīng)創(chuàng)建的 SQL DDL 的基礎上裂逐,如 Source SQL DDL 和 Sink SQL DDL歹鱼,要求用戶填寫 SQL query 并返回給后端,后端會對 SQL 進行驗證卜高,然后會生成一個 SQL Config弥姻,即一個 SQL 語句的完整表達。
SQL Config 轉換為 Job Config 的流程如下圖所示篙悯。
首先在 SQL Config 的基礎上增加作業(yè)所需要的資源蚁阳、Job 的相關配置(Flink 的 state 參數(shù)等)铃绒;
然后將 SQLConfig 編譯成一個 Job Descriptor鸽照,即 Job Config 的描述,如 Job 的 Jar 包地址颠悬、MainClass 和 MainArgs 等矮燎。
下圖展示了 Job Config 轉換為 Job Graph 的過程。對于 DDL 中的 Schema赔癌、Format 和 Property 是和 Flink 中的 Table Descriptor 是一一對應的诞外,這種情況下只需要調(diào)用 Flink 的相關內(nèi)置接口就可以很方便地將信息轉換為 Table Descriptor,如 CreateTableSource()灾票、RegistorTableSource() 等峡谊。通過上述過程,DDL 便可以注冊到 Flink 系統(tǒng)中直接使用刊苍。對于 SQL 語句既们,可以直接使用 TableEnv 的 sqlUpdate() 可以完成轉換。
SQL Config 轉換為一個 Template Job 的流程如下所示正什。前面填寫的 Jar 包地址即該 Template 的 Jar 地址啥纸,MainClass 是該 Template Job。假設已經(jīng)有了 SQL DDL婴氮,可以直接轉換成 Table Descriptor斯棒,然后通過 TableFactorUtil 的 findAndCreateTableSource() 方法得到一個 Table Source盾致,Table Sink 的轉換過程類似。完成前兩步操作后荣暮,最后進行 sqlUpdate() 操作庭惜。這樣便可以將一個 SQL Job 轉換為最后可執(zhí)行的 Job Graph 提交到集群上運行。
Talos Sink 采用了下圖所示的三種模式:
Row:Talos 的數(shù)據(jù)原封不動地灌到目標系統(tǒng)中渠驼,這種模式的好處是數(shù)據(jù)讀取和寫入的時候無需進行序列化和反序列化蜈块,效率較高;
ID mapping:即左右兩邊字段進行 mapping迷扇,name 對應 field_name百揭,timestamp 對應 timestamp,其中 Region 的字段丟掉蜓席;
SQL:通過 SQL 表達來表示邏輯上的處理器一。
未來規(guī)劃
小米流式平臺未來的計劃主要有以下幾點:
在 Flink 落地的時候持續(xù)推進 Streaming Job 和平臺化建設;
使用 Flink SQL 統(tǒng)一離線數(shù)倉和實時數(shù)倉厨内;
在 Schema 的基礎上數(shù)據(jù)血緣分析和展示祈秕,包括數(shù)據(jù)治理方面的內(nèi)容;
持續(xù)參與 Flink 社區(qū)的建設雏胃。
點擊查看??流式平臺架構設計模式及項目源碼