[TOC]
數(shù)據(jù)實(shí)時(shí)處理和實(shí)時(shí)的數(shù)據(jù)
實(shí)時(shí)分為處理的實(shí)時(shí)和數(shù)據(jù)的實(shí)時(shí)
即席分析是要求對(duì)數(shù)據(jù)實(shí)時(shí)的處理轻掩,馬上要得到對(duì)應(yīng)的結(jié)果
Flink、Spark Streaming是用來(lái)對(duì)實(shí)時(shí)數(shù)據(jù)的實(shí)時(shí)處理血当,數(shù)據(jù)要求實(shí)時(shí)锌杀,處理也要迅速
數(shù)據(jù)不實(shí)時(shí)圣猎,處理也不及時(shí)的場(chǎng)景則是我們的數(shù)倉(cāng)T+1數(shù)據(jù)
而本文探討的Apache Hudi,對(duì)應(yīng)的場(chǎng)景是數(shù)據(jù)的實(shí)時(shí)饿这,而非處理的實(shí)時(shí)浊伙。它旨在將Mysql中的時(shí)候以近實(shí)時(shí)的方式映射到大數(shù)據(jù)平臺(tái),比如Hive中长捧。
業(yè)務(wù)場(chǎng)景和技術(shù)選型
傳統(tǒng)的離線數(shù)倉(cāng)嚣鄙,通常數(shù)據(jù)是T+1的,不能滿(mǎn)足對(duì)當(dāng)日數(shù)據(jù)分析的需求
而流式計(jì)算一般是基于窗口串结,并且窗口邏輯相對(duì)比較固定哑子。
而筆者所在的公司有一類(lèi)特殊的需求,業(yè)務(wù)分析比較熟悉現(xiàn)有事務(wù)數(shù)據(jù)庫(kù)的數(shù)據(jù)結(jié)構(gòu)肌割,并且希望有很多即席分析卧蜓,這些分析包含當(dāng)日比較實(shí)時(shí)的數(shù)據(jù)。慣常他們是基于Mysql從庫(kù)把敞,直接通過(guò)Sql做相應(yīng)的分析計(jì)算弥奸。但很多時(shí)候會(huì)遇到如下障礙
- 數(shù)據(jù)量較大、分析邏輯較為復(fù)雜時(shí)奋早,Mysql從庫(kù)耗時(shí)較長(zhǎng)
- 一些跨庫(kù)的分析無(wú)法實(shí)現(xiàn)
因此盛霎,一些彌合在OLTP和OLAP之間的技術(shù)框架出現(xiàn),典型有TiDB耽装。它能同時(shí)支持OLTP和OLAP愤炸。而諸如Apache Hudi和Apache Kudu則相當(dāng)于現(xiàn)有OLTP和OLAP技術(shù)的橋梁。他們能夠以現(xiàn)有OLTP中的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)數(shù)據(jù)掉奄,支持CRUD规个,同時(shí)提供跟現(xiàn)有OLAP框架的整合(如Hive,Impala)姓建,以實(shí)現(xiàn)OLAP分析
Apache Kudu诞仓,需要單獨(dú)部署集群。而Apache Hudi則不需要速兔,它可以利用現(xiàn)有的大數(shù)據(jù)集群比如HDFS做數(shù)據(jù)文件存儲(chǔ)狂芋,然后通過(guò)Hive做數(shù)據(jù)分析,相對(duì)來(lái)說(shuō)更適合資源受限的環(huán)境
Apache hudi簡(jiǎn)介
使用Aapche Hudi整體思路
Hudi 提供了Hudi 表的概念憨栽,這些表支持CRUD操作。我們可以基于這個(gè)特點(diǎn),將Mysql Binlog的數(shù)據(jù)重放至Hudi表屑柔,然后基于Hive對(duì)Hudi表進(jìn)行查詢(xún)分析屡萤。數(shù)據(jù)流向架構(gòu)如下
Hudi表數(shù)據(jù)結(jié)構(gòu)
Hudi表的數(shù)據(jù)文件,可以使用操作系統(tǒng)的文件系統(tǒng)存儲(chǔ)掸宛,也可以使用HDFS這種分布式的文件系統(tǒng)存儲(chǔ)死陆。為了后續(xù)分析性能和數(shù)據(jù)的可靠性,一般使用HDFS進(jìn)行存儲(chǔ)唧瘾。以HDFS存儲(chǔ)來(lái)看措译,一個(gè)Hudi表的存儲(chǔ)文件分為兩類(lèi)。
- 包含
_partition_key
相關(guān)的路徑是實(shí)際的數(shù)據(jù)文件饰序,按分區(qū)存儲(chǔ)领虹,當(dāng)然分區(qū)的路徑key是可以指定的,我這里使用的是_partition_key - .hoodie 由于CRUD的零散性求豫,每一次的操作都會(huì)生成一個(gè)文件塌衰,這些小文件越來(lái)越多后,會(huì)嚴(yán)重影響HDFS的性能蝠嘉,Hudi設(shè)計(jì)了一套文件合并機(jī)制最疆。 .hoodie文件夾中存放了對(duì)應(yīng)的文件合并操作相關(guān)的日志文件。
數(shù)據(jù)文件
Hudi真實(shí)的數(shù)據(jù)文件使用Parquet文件格式存儲(chǔ)
.hoodie文件
Hudi把隨著時(shí)間流逝蚤告,對(duì)表的一系列CRUD操作叫做Timeline努酸。Timeline中某一次的操作,叫做Instant杜恰。Instant包含以下信息
- Instant Action 記錄本次操作是一次數(shù)據(jù)提交(COMMITS)获诈,還是文件合并(COMPACTION),或者是文件清理(CLEANS)
- Instant Time 本次操作發(fā)生的時(shí)間
- state 操作的狀態(tài)箫章,發(fā)起(REQUESTED)烙荷,進(jìn)行中(INFLIGHT),還是已完成(COMPLETED)
.hoodie文件夾中存放對(duì)應(yīng)操作的狀態(tài)記錄
Hudi記錄Id
hudi為了實(shí)現(xiàn)數(shù)據(jù)的CRUD檬寂,需要能夠唯一標(biāo)識(shí)一條記錄终抽。hudi將把數(shù)據(jù)集中的唯一字段(record key ) + 數(shù)據(jù)所在分區(qū) (partitionPath) 聯(lián)合起來(lái)當(dāng)做數(shù)據(jù)的唯一鍵
COW和MOR
基于上述基礎(chǔ)概念之上,Hudi提供了兩類(lèi)表格式COW和MOR桶至。他們會(huì)在數(shù)據(jù)的寫(xiě)入和查詢(xún)性能上有一些不同
Copy On Write Table
簡(jiǎn)稱(chēng)COW昼伴。顧名思義,他是在數(shù)據(jù)寫(xiě)入的時(shí)候镣屹,復(fù)制一份原來(lái)的拷貝圃郊,在其基礎(chǔ)上添加新數(shù)據(jù)。正在讀數(shù)據(jù)的請(qǐng)求女蜈,讀取的是是近的完整副本持舆,這類(lèi)似Mysql 的MVCC的思想色瘩。
上圖中,每一個(gè)顏色都包含了截至到其所在時(shí)間的所有數(shù)據(jù)逸寓。老的數(shù)據(jù)副本在超過(guò)一定的個(gè)數(shù)限制后居兆,將被刪除。這種類(lèi)型的表竹伸,沒(méi)有compact instant泥栖,因?yàn)閷?xiě)入時(shí)相當(dāng)于已經(jīng)compact了。
- 優(yōu)點(diǎn) 讀取時(shí)勋篓,只讀取對(duì)應(yīng)分區(qū)的一個(gè)數(shù)據(jù)文件即可吧享,較為高效
- 缺點(diǎn) 數(shù)據(jù)寫(xiě)入的時(shí)候,需要復(fù)制一個(gè)先前的副本再在其基礎(chǔ)上生成新的數(shù)據(jù)文件譬嚣,這個(gè)過(guò)程比較耗時(shí)钢颂。且由于耗時(shí),讀請(qǐng)求讀取到的數(shù)據(jù)相對(duì)就會(huì)滯后
Merge On Read Table
簡(jiǎn)稱(chēng)MOR孤荣。新插入的數(shù)據(jù)存儲(chǔ)在delta log 中甸陌。定期再將delta log合并進(jìn)行parquet數(shù)據(jù)文件。讀取數(shù)據(jù)時(shí)盐股,會(huì)將delta log跟老的數(shù)據(jù)文件做merge钱豁,得到完整的數(shù)據(jù)返回。當(dāng)然疯汁,MOR表也可以像COW表一樣牲尺,忽略delta log,只讀取最近的完整數(shù)據(jù)文件幌蚊。下圖演示了MOR的兩種數(shù)據(jù)讀寫(xiě)方式
- 優(yōu)點(diǎn) 由于寫(xiě)入數(shù)據(jù)先寫(xiě)delta log谤碳,且delta log較小,所以寫(xiě)入成本較低
- 缺點(diǎn) 需要定期合并整理compact溢豆,否則碎片文件較多蜒简。讀取性能較差,因?yàn)樾枰獙elta log 和 老數(shù)據(jù)文件合并
基于hudi的代碼實(shí)現(xiàn)
我在github上放置了基于Hudi的封裝實(shí)現(xiàn)漩仙,對(duì)應(yīng)的源碼地址為 https://github.com/wanqiufeng/hudi-learn搓茬。
binlog數(shù)據(jù)寫(xiě)入Hudi表
- binlog-consumer分支使用Spark streaming消費(fèi)kafka中的Binlog數(shù)據(jù),并寫(xiě)入Hudi表队他。Kafka中的binlog是通過(guò)阿里的Canal工具同步拉取的卷仑。程序入口是CanalKafkaImport2Hudi,它提供了一系列參數(shù)麸折,配置程序的執(zhí)行行為
參數(shù)名 | 含義 | 是否必填 | 默認(rèn)值 |
---|---|---|---|
--base-save-path |
hudi表存放在HDFS的基礎(chǔ)路徑锡凝,比如hdfs://192.168.16.181:8020/hudi_data/ | 是 | 無(wú) |
--mapping-mysql-db-name |
指定處理的Mysql庫(kù)名 | 是 | 無(wú) |
--mapping-mysql-table-name |
指定處理的Mysql表名 | 是 | 無(wú) |
--store-table-name |
指定Hudi的表名 | 否 | 默認(rèn)會(huì)根據(jù)--mapping-mysql-db-name和--mapping-mysql-table-name自動(dòng)生成。假設(shè)--mapping-mysql-db-name 為crm垢啼,--mapping-mysql-table-name為order窜锯。那么最終的hudi表名為crm__order |
--real-save-path |
指定hudi表最終存儲(chǔ)的hdfs路徑 | 否 | 默認(rèn)根據(jù)--base-save-path和--store-table-name自動(dòng)生成张肾,生成格式為'--base-save-path'+'/'+'--store-table-name' ,推薦默認(rèn) |
--primary-key |
指定同步的mysql表中能唯一標(biāo)識(shí)記錄的字段名 | 否 | 默認(rèn)id |
--partition-key |
指定mysql表中可以用于分區(qū)的時(shí)間字段衬浑,字段必須是timestamp 或dateime類(lèi)型 | 是 | 無(wú) |
--precombine-key |
最終用于配置hudi的hoodie.datasource.write.precombine.field
|
否 | 默認(rèn)id |
--kafka-server |
指定Kafka 集群地址 | 是 | 無(wú) |
--kafka-topic |
指定消費(fèi)kafka的隊(duì)列 | 是 | 無(wú) |
--kafka-group |
指定消費(fèi)kafka的group | 否 | 默認(rèn)在存儲(chǔ)表名前加'hudi'前綴捌浩,比如'hudi_crm__order' |
--duration-seconds |
由于本程序使用Spark streaming開(kāi)發(fā),這里指定Spark streaming微批的時(shí)長(zhǎng) | 否 | 默認(rèn)10秒 |
一個(gè)使用的demo如下
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
--name hudi__goods \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 1 \
--queue hudi \
--conf spark.executor.memoryOverhead=2048 \
--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.locality.wait=100 \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.receiver.maxRate=500 \
--conf spark.streaming.kafka.maxRatePerPartition=200 \
--conf spark.ui.retainedJobs=10 \
--conf spark.ui.retainedStages=10 \
--conf spark.ui.retainedTasks=10 \
--conf spark.worker.ui.retainedExecutors=10 \
--conf spark.worker.ui.retainedDrivers=10 \
--conf spark.sql.ui.retainedExecutions=10 \
--conf spark.yarn.submit.waitAppCompletion=false \
--conf spark.yarn.maxAppAttempts=4 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=20 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=8 \
/data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
歷史數(shù)據(jù)同步以及表元數(shù)據(jù)同步至hive
history_import_and_meta_sync
分支提供了將歷史數(shù)據(jù)同步至hudi表工秩,以及將hudi表數(shù)據(jù)結(jié)構(gòu)同步至hive meta的操作
同步歷史數(shù)據(jù)至hudi表
這里采用的思路是
- 將mysql全量數(shù)據(jù)通過(guò)注入sqoop等工具,導(dǎo)入到hive表进统。
- 然后采用分支代碼中的工具HiveImport2HudiConfig助币,將數(shù)據(jù)導(dǎo)入Hudi表
HiveImport2HudiConfig提供了如下一些參數(shù),用于配置程序執(zhí)行行為
參數(shù)名 | 含義 | 是否必填 | 默認(rèn)值 |
---|---|---|---|
--base-save-path |
hudi表存放在HDFS的基礎(chǔ)路徑螟碎,比如hdfs://192.168.16.181:8020/hudi_data/ | 是 | 無(wú) |
--mapping-mysql-db-name |
指定處理的Mysql庫(kù)名 | 是 | 無(wú) |
--mapping-mysql-table-name |
指定處理的Mysql表名 | 是 | 無(wú) |
--store-table-name |
指定Hudi的表名 | 否 | 默認(rèn)會(huì)根據(jù)--mapping-mysql-db-name和--mapping-mysql-table-name自動(dòng)生成眉菱。假設(shè)--mapping-mysql-db-name 為crm,--mapping-mysql-table-name為order掉分。那么最終的hudi表名為crm__order |
--real-save-path |
指定hudi表最終存儲(chǔ)的hdfs路徑 | 否 | 默認(rèn)根據(jù)--base-save-path和--store-table-name自動(dòng)生成俭缓,生成格式為'--base-save-path'+'/'+'--store-table-name' ,推薦默認(rèn) |
--primary-key |
指定同步的hive歷史表中能唯一標(biāo)識(shí)記錄的字段名 | 否 | 默認(rèn)id |
--partition-key |
指定hive歷史表中可以用于分區(qū)的時(shí)間字段酥郭,字段必須是timestamp 或dateime類(lèi)型 | 是 | 無(wú) |
--precombine-key |
最終用于配置hudi的hoodie.datasource.write.precombine.field
|
否 | 默認(rèn)id |
--sync-hive-db-name |
全量歷史數(shù)據(jù)所在hive的庫(kù)名 | 是 | 無(wú) |
--sync-hive-table-name |
全量歷史數(shù)據(jù)所在hive的表名 | 是 | 無(wú) |
--hive-base-path |
hive的所有數(shù)據(jù)文件存放地址华坦,需要參看具體的hive配置 | 否 | /user/hive/warehouse |
--hive-site-path |
hive-site.xml配置文件所在的地址 | 是 | 無(wú) |
--tmp-data-path |
程序執(zhí)行過(guò)程中臨時(shí)文件存放路徑。一般默認(rèn)路徑是/tmp不从。有可能出現(xiàn)/tmp所在磁盤(pán)太小惜姐,而導(dǎo)致歷史程序執(zhí)行失敗的情況。當(dāng)出現(xiàn)該情況時(shí)椿息,可以通過(guò)該參數(shù)自定義執(zhí)行路徑 | 否 | 默認(rèn)操作系統(tǒng)臨時(shí)目錄 |
一個(gè)程序執(zhí)行demo
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
同步hudi表結(jié)構(gòu)至hive meta
需要將hudi的數(shù)據(jù)結(jié)構(gòu)和分區(qū)歹袁,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi數(shù)據(jù)寝优,并通過(guò)sql進(jìn)行查詢(xún)分析条舔。Hudi本身在消費(fèi)Binlog進(jìn)行存儲(chǔ)時(shí),可以順帶將相關(guān)表元數(shù)據(jù)信息同步至hive乏矾。但考慮到每條寫(xiě)入Apache Hudi表的數(shù)據(jù)孟抗,都要讀寫(xiě)Hive Meta ,對(duì)Hive的性能可能影響很大妻熊。所以我單獨(dú)開(kāi)發(fā)了HiveMetaSyncConfig工具夸浅,用于同步hudi表元數(shù)據(jù)至Hive∪右郏考慮到目前程序只支持按天分區(qū)帆喇,所以同步工具可以一天執(zhí)行一次即可。參數(shù)配置如下
參數(shù)名 | 含義 | 是否必填 | 默認(rèn)值 | |
---|---|---|---|---|
--hive-db-name |
指定hudi表同步至哪個(gè)hive數(shù)據(jù)庫(kù) | 是 | 無(wú) | |
--hive-table-name |
指定hudi表同步至哪個(gè)hive表 | 是 | 無(wú) | 亿胸、 |
--hive-jdbc-url |
指定hive meta的jdbc鏈接地址坯钦,例如jdbc:hive2://192.168.16.181:10000 | 是 | 無(wú) | |
--hive-user-name |
指定hive meta的鏈接用戶(hù)名 | 否 | 默認(rèn)hive | |
--hive-pwd |
指定hive meta的鏈接密碼 | 否 | 默認(rèn)hive | |
--hudi-table-path |
指定hudi表所在hdfs的文件路徑 | 是 | 無(wú) | |
--hive-site-path |
指定hive的hive-site.xml路徑 | 是 | 無(wú) |
一個(gè)程序執(zhí)行demo
java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml
一些踩坑
hive相關(guān)配置
有些hive集群的hive.input.format配置预皇,默認(rèn)是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,這會(huì)導(dǎo)致掛載Hudi數(shù)據(jù)的Hive外表讀取到所有Hudi的Parquet數(shù)據(jù)婉刀,從而導(dǎo)致最終的讀取結(jié)果重復(fù)吟温。需要將hive的format改為org.apache.hadoop.hive.ql.io.HiveInputFormat
,為了避免在整個(gè)集群層面上更改對(duì)其余離線Hive Sql造成不必要的影響突颊,建議只對(duì)當(dāng)前hive session設(shè)置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
spark streaming的一些調(diào)優(yōu)
由于binlog寫(xiě)入Hudi表的是基于Spark streaming實(shí)現(xiàn)的鲁豪,這里給出了一些spark 和spark streaming層面的配置,它能使整個(gè)程序工作更穩(wěn)定
配置 | 含義 |
---|---|
spark.streaming.backpressure.enabled=true | 啟動(dòng)背壓律秃,該配置能使Spark Streaming消費(fèi)速率爬橡,基于上一次的消費(fèi)情況,進(jìn)行調(diào)整棒动,避免程序崩潰 |
spark.ui.retainedJobs=10 spark.ui.retainedStages=10 spark.ui.retainedTasks=10 spark.worker.ui.retainedExecutors=10 spark.worker.ui.retainedDrivers=10 spark.sql.ui.retainedExecutions=10 |
默認(rèn)情況下糙申,spark 會(huì)在driver中存儲(chǔ)一些spark 程序執(zhí)行過(guò)程中各stage和task的歷史信息,當(dāng)driver內(nèi)存過(guò)小時(shí)船惨,可能使driver崩潰柜裸,通過(guò)上述參數(shù),調(diào)節(jié)這些歷史數(shù)據(jù)存儲(chǔ)的條數(shù)粱锐,從而減小對(duì)內(nèi)層使用 |
spark.yarn.maxAppAttempts=4 | 配置當(dāng)driver崩潰后疙挺,嘗試重啟的次數(shù) |
spark.yarn.am.attemptFailuresValidityInterval=1h | 假若driver執(zhí)行一周才崩潰一次,那我們更希望每次都能重啟卜范,而上述配置在累計(jì)到重啟4次后衔统,driver就再也不會(huì)被重啟,該配置則用于重置maxAppAttempts的時(shí)間間隔 |
spark.yarn.max.executor.failures=20 | executor執(zhí)行也可能失敗海雪,失敗后集群會(huì)自動(dòng)分配新的executor, 該配置用于配置允許executor失敗的次數(shù)锦爵,超過(guò)次數(shù)后程序會(huì)報(bào)(reason: Max number of executor failures (400) reached),并退出 |
spark.yarn.executor.failuresValidityInterval=1h | 指定executor失敗重分配次數(shù)重置的時(shí)間間隔 |
spark.task.maxFailures=8 | 允許任務(wù)執(zhí)行失敗的次數(shù) |
未來(lái)改進(jìn)
- 支持無(wú)分區(qū)奥裸,或非日期分區(qū)表险掀。目前只支持日期分區(qū)表
- 多數(shù)據(jù)類(lèi)型支持,目前為了程序的穩(wěn)定性湾宙,會(huì)將Mysql中的字段全部以String類(lèi)型存儲(chǔ)至Hudi
參考資料
歡迎關(guān)注我的個(gè)人公眾號(hào)"西北偏北UP"樟氢,記錄代碼人生,行業(yè)思考侠鳄,科技評(píng)論