1. 背景
Hadoop體系雖然在目前應(yīng)用非常廣泛西傀,但架構(gòu)繁瑣、運維復(fù)雜度過高捎稚、版本升級困難乐横,且由于部門原因,數(shù)據(jù)中臺需求排期較長今野,我們急需探索敏捷性開發(fā)的數(shù)據(jù)平臺模式葡公。在目前云原生架構(gòu)的普及和湖倉一體化的大背景下,我們已經(jīng)確定了將Doris作為離線數(shù)據(jù)倉庫条霜,將TiDB(目前已經(jīng)應(yīng)用于生產(chǎn))作為實時數(shù)據(jù)平臺催什,同時因為Doris具有 on MySQL 的odbc能力,所以又可以對外部數(shù)據(jù)庫資源進行整合蛔外,統(tǒng)一對外輸出報表 [圖片上傳中...(image-1d98f3-1676355289636-9)]
2. 遇到的問題
在數(shù)據(jù)引擎上蛆楞,我們確定使用Spark和Flink
- 使用Spark on K8s client 客戶端模式做離線數(shù)據(jù)處理
- 使用Flink on K8s Native-Application/Session 模式做實時任務(wù)流管理
在這里,實際上有一些問題我們一直沒有徹底解決夹厌。用過Native-Application模式的朋友都知道,每提交一個任務(wù)裆悄,都需要打包新的鏡像矛纹,提交到私有倉庫,然后再調(diào)用Flink Run 指令溝通K8s,去拉取鏡像運行Pod光稼。任務(wù)提交之后或南,還需要去K8s查看log, 但是:
- 任務(wù)運行監(jiān)控怎么處理?
- 使用Cluster模式還是Nodeport暴露端口訪問Web UI艾君?
- 提交任務(wù)能否簡化打包鏡像的流程?
- 如何減少開發(fā)壓力采够?
3. 解決問題的過程
以上的這些其實都是需要解決的問題,如果單純的使用命令行去提交每個任務(wù)冰垄,是不現(xiàn)實的蹬癌,任務(wù)量大了,會變得不可維護虹茶。如何解決這些問題變成一個不得不面對的問題逝薪。
簡化鏡像構(gòu)建
首先,針對Flink原生鏡像需要二次build的問題:我們利用了MINIO作為外部存儲蝴罪,并使用s3-fuse通過DaemonSet的方式直接掛載在了每個宿主節(jié)點上董济,我們所需要提交的jar包都可以放到上面統(tǒng)一管理,這樣的話要门,即使擴縮容Flink節(jié)點虏肾,也能實現(xiàn)s3掛載自動伸縮廓啊。
Flink從1.13版本開始,就支持Pod Template封豪,我們可以在Pod Template中利用數(shù)據(jù)卷掛載的方式再將宿主機目錄掛載到每個pod中崖瞭。從而無需鏡像打包而直接在K8s上運行Flink程序。如上圖撑毛,我們將s3先通過s3-fuse Pod掛載在Node1书聚、Node2的/mnt/data-s3fs目錄下,然后再將/mnt/data-s3fs掛載到Pod A中藻雌。 但是雌续,因為對象存儲隨機寫入或追加文件需要重寫整個對象,導(dǎo)致這種方式僅適合于頻繁讀胯杭。而這剛好滿足我們現(xiàn)在的場景驯杜。
引入StreamPark
之前我們寫Flink Sql 基本上都是使用Java包裝Sql,打jar包做个,提交到s3平臺上鸽心,通過命令行方式提交代碼,但這種方式始終不友好居暖,流程繁瑣顽频,開發(fā)和運維成本太大。我們希望能夠進一步簡化流程太闺,將Flink TableEnvironment 抽象出來糯景,有平臺負(fù)責(zé)初始化、打包運行Flink任務(wù)省骂,實現(xiàn)Flink應(yīng)用程序的構(gòu)建蟀淮、測試和部署自動化。
這是個開源興起的時代钞澳,我們自然而然的將目光投向開源領(lǐng)域中怠惶,在一眾開源項目中,經(jīng)過對比各個項目綜合評估發(fā)現(xiàn) Zeppelin 和 StreamPark 這兩個項目對Flink的支持較為完善,都宣稱支持 Flink on K8s 轧粟,最終進入到我們的目標(biāo)選擇范圍中策治,以下是兩者在K8s相關(guān)支持的簡單比較(目前如果有更新,麻煩批評指正)逃延。
功能 | Zeppelin | StreamPark |
---|---|---|
任務(wù)狀態(tài)監(jiān)控 | 稍低 ,不能作為任務(wù)狀態(tài)監(jiān)控工具 | 較高 |
任務(wù)資源管理 | 無 | 有 览妖,但目前版本還不是很健全 |
本地化部署 | 稍低 ,on K8s模式只能將Zeppelin部署在K8s中,否則就需要打通pod和外部網(wǎng)絡(luò),但是這在生產(chǎn)環(huán)境中很少這樣做的 | 可以本地化部署 |
多語言支持 | 較高 ,支持Python/Scala/Java多語言 | 一般 ,目前K8s模式和YARN模式同時支持FlinkSql,并可以根據(jù)自身需求,使用Java/Scala開發(fā)DataStream |
Flink WebUI代理 | 目前還支持的不是很完整 ,主開發(fā)大佬目前是考慮整合ingress | 較好 ,目前支持ClusterIp/NodePort/LoadBalance模式 |
學(xué)習(xí)成本 | 成本較低 ,需要增加額外的參數(shù)學(xué)習(xí),這個和原生的FlinkSql在參數(shù)上有點區(qū)別 | 無成本 ,K8s模式下FlinkSql為原生支持的sql格式;同時支持Custome-Code(用戶編寫代碼開發(fā)Datastream/FlinkSql任務(wù)) |
Flink多版本支持 | 支持 | 支持 |
Flink原生鏡像侵入 | 有侵入 ,需要在Flink鏡像中提前部署jar包,會同jobmanage啟動在同一個pod中,和zeppelin-server通信 | 無侵入 ,但是會產(chǎn)生較多鏡像揽祥,需要定時清理 |
代碼多版本管理 | 支持 | 支持 |
<center style="box-sizing: border-box; font-family: Poppins, sans-serif, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji"; font-size: 16px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; color: gray;">(PS: 此處僅從調(diào)研用戶角度出發(fā)讽膏,我們對雙方開發(fā)都保持極大的尊重)</center>
調(diào)研過程中,我們與兩者的主開發(fā)人員都進行了多次溝通。經(jīng)過我們反復(fù)研究之后拄丰,還是決定將 StreamPark 作為我們目前的Flink開發(fā)工具來使用府树。
<center style="box-sizing: border-box; font-family: Poppins, sans-serif, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji"; font-size: 16px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; color: gray;">(StreamPark 官網(wǎng)的閃屏)</center>
經(jīng)過開發(fā)同學(xué)長時間開發(fā)測試俐末,StreamPark 目前已經(jīng)具備:
- 完善的Sql校驗功能
- 實現(xiàn)了自動build/push鏡像
- 使用自定義類加載器,通過Child-first 加載方式 解決了YARN和K8s兩種運行模式、支持了自由切換Flink多版本
- 與Flink-Kubernetes進行深度整合奄侠,提交任務(wù)后返回WebUI卓箫,通過remote rest api + remote K8s ,追蹤任務(wù)執(zhí)行狀態(tài)
- 同時支持了 Flink1.12垄潮、1.13烹卒、1.14 等版本
以上基本解決了我們目前開發(fā)和運維中存在的大部分問題。
在目前最新發(fā)布的1.2.0版本中弯洗,StreamPark較為完善的支持了K8s-Native-Application和K8s-session-Application模式旅急。
K8s Native Application 模式
在StreamPark中,我們只需要配置相應(yīng)的參數(shù)牡整,并在Maven pom中填寫相應(yīng)的依賴藐吮,或者上傳依賴jar包,點擊Apply逃贝,相應(yīng)的依賴就會生成谣辞。這就意味著我們也可以將所有使用的Udf打成jar包 and 各種 connector.jar,直接在sql中使用沐扳。如下圖:
[圖片上傳中...(image-9903e2-1676355289636-7)]
Sql校驗?zāi)芰?Zeppelin基本一致:
我們也可以指定資源泥从,指定Flink Run中的動態(tài)參數(shù)Dynamic Option,甚至參數(shù)可以整合pod template
程序保存后迫皱,點擊運行時歉闰,也可以指定savepoint。任務(wù)提交成功后卓起,StreamPark會根據(jù)FlinkPod網(wǎng)絡(luò)Exposed Type(loadBalancer/Nodeport/ClusterIp),返回相應(yīng)的WebURL,從而自然的實現(xiàn)WebUI跳轉(zhuǎn),但是目前因為線上私有K8s集群出于安全性考慮,尚未打通Pod與客戶端節(jié)點網(wǎng)絡(luò)(目前也沒有這個規(guī)劃)凹炸,所以我們只使用Nodeport戏阅。如果后續(xù)任務(wù)數(shù)過多,有使用ClusterIP的需求的話啤它,我們可能會將StreamPark 部署在K8s奕筐,或者同ingress做進一步整合。
注意:K8s master 如果使用vip做均衡代理的情況下变骡,F(xiàn)link 1.13版本會返回vip的ip地址离赫,1.14版本已經(jīng)修復(fù)。 下面是K8s Application模式下具體提交流程
K8s Native Session 模式
StreamPark還較好的支持了 K8s Native-Sesson模式 塌碌,這為我們后續(xù)做離線FlinkSql開發(fā)或部分資源隔離做了較好的技術(shù)支持渊胸。
Native-session模式需要事先使用Flink命令創(chuàng)建一個運行在K8s中的Flink集群,如下:
./kubernetes-session.sh \-Dkubernetes.cluster-id=flink-on-k8s-flinkSql-test \-Dkubernetes.context=XXX \-Dkubernetes.namespace=XXXX \-Dkubernetes.service-account=XXXX \-Dkubernetes.container.image=XXXX \-Dkubernetes.container.image.pull-policy=Always \-Dkubernetes.taskmanager.node-selector=XXXX \-Dkubernetes.rest-service.exposed.type=Nodeport
如上圖,使用該ClusterId作為StreamPark的任務(wù)參數(shù)Kubernetes ClusterId台妆。保存提交任務(wù)后翎猛,任務(wù)會很快處于Running狀態(tài)
我們順著application info的WebUI點擊跳轉(zhuǎn)
可以看到胖翰,其實StreamPark是將jar包通過Rest Api上傳到Flink集群上,并調(diào)度執(zhí)行任務(wù)的切厘。
Custom Code模式
另我們驚喜的是萨咳,StreamPark 還支持代碼編寫DataStream/FlinkSql任務(wù)。對于特殊需求疫稿,我們可以自己寫Java/Scala實現(xiàn)培他。可以根據(jù)StreamPark推薦的腳手架方式編寫任務(wù)遗座,也可以編寫一個標(biāo)準(zhǔn)普通的Flink任務(wù)舀凛,通過這種方式我們可以將代碼管理交由Git實現(xiàn),平臺可以用來自動化編譯打包與部署员萍。當(dāng)然腾降,如果能用Sql實現(xiàn)的功能,我們會盡量避免自定義DataStream碎绎,減少不必要的運維麻煩格郁。
4. 意見和規(guī)劃
改進意見
當(dāng)然StreamPark還有很多需要改進的地方汤功,就目前測試來看:
- 資源管理還有待加強 多文件系統(tǒng)jar包等資源管理功能尚未添加,任務(wù)版本功能有待加強。
- 前端buttern 功能還不夠豐富 比如任務(wù)添加后續(xù)可以增加復(fù)制等功能按鈕费封。
- 任務(wù)提交日志也需要可視化展示 任務(wù)提交伴隨著加載class文件,打jar包杭煎,build鏡像纽哥,提交鏡像,提交任務(wù)等過程代箭,每一個環(huán)節(jié)出錯墩划,都會導(dǎo)致任務(wù)的失敗,但是失敗日志往往不明確嗡综,或者因為某種原因?qū)е庐惓N凑伋鲆野铮瑳]有轉(zhuǎn)換任務(wù)狀態(tài),用戶會無從下手改進极景。
眾所周知察净,一個新事物的出現(xiàn)一開始總會不是那么完美。盡管有些許問題和需要改進的point盼樟,但是瑕不掩瑜氢卡,我們?nèi)匀贿x擇StreamPark作為我們的Flink DevOps,我們也將會和主開發(fā)人員一道共同完善StreamPark晨缴,也歡迎更多的人來使用译秦,為StreamPark帶來更多進步。
未來規(guī)劃
我們會繼續(xù)跟進doris,并將業(yè)務(wù)數(shù)據(jù) + 日志數(shù)據(jù)統(tǒng)一入doris诀浪,通過Flink實現(xiàn)湖倉一體棋返;
我們也會逐步將探索StreamPark同dolphinscheduler 2.x進行整合,完善dolphinscheduler離線任務(wù)雷猪,逐步用Flink 替換掉Spark睛竣,實現(xiàn)真正的流批一體;
基于我們自身在s3上的探索積累求摇,fat-jar包 build 完成之后不再構(gòu)建鏡像射沟,直接利用Pod Tempelet掛載pvc到Flink pod中的目錄,進一步優(yōu)化代碼提交流程与境;
將StreamPark持續(xù)應(yīng)用到我們生產(chǎn)中验夯,并匯同社區(qū)開發(fā)人員,共同努力摔刁,增強StreamPark在Flink流上的開發(fā)部署能力與運行監(jiān)控能力挥转,努力把StreamPark打造成一個功能完善的流數(shù)據(jù) DevOps。
附:
StreamPark Github: https://github.com/apache/incubator-streampark
Doris Github: https://github.com/apache/incubator-doris