概述
mysql同步數(shù)據(jù)到hive大部分公司目前都是走的jdbc的方式。
這種方式有兩個好處:
- 開發(fā)簡單候齿。只需要從mysql讀取相關(guān)的數(shù)據(jù)熙暴,插入到hive表當(dāng)中就行了。Sqoop 或者 waterdrop 都是走的這個邏輯慌盯。
- 穩(wěn)定周霉。因為是直接讀取mysql的jdbc的數(shù)據(jù),基本不會有數(shù)據(jù)不一致的問題亚皂。
也有不好的地方:
- 對數(shù)據(jù)庫的壓力比較大俱箱。特別是每小時全量同步一些大的表的任務(wù),都走jdbc的話灭必,mysql的壓力會比較大狞谱。
binlog同步mysql數(shù)據(jù)技術(shù)方案
總體架構(gòu)設(shè)計
- gorail將mysql的binlog數(shù)據(jù)同步到kafka
- flink消費kafka的數(shù)據(jù)寫入到hdfs
- spark批處理合并binlog增量數(shù)據(jù)和歷史數(shù)據(jù),生成最新的數(shù)據(jù)快照(這一步比較復(fù)雜禁漓,后面詳細說明)
具體細節(jié)
gorail將mysql的binlog數(shù)據(jù)同步到kafka
這一步最主要的細節(jié)是將mysql庫的所有binlog數(shù)據(jù)全部打入一個kafka topic跟衅,格式使用json。格式如下:
{
"action":"insert",//update播歼、insert伶跷、delete
"gorail_nano":"1578131519599439478",//gorail處理時間
"primary_keys":"[[610927]]",//主鍵
"raw_rows":"[]",//更新前數(shù)據(jù)
"rows":"[map[action:1 content_id:1600040219 create_time:2020-01-04 17:51:59 id:610927 op_id:472 op_name:張玲玲]]",//更新后數(shù)據(jù)
"schema":"qukan",//庫名
"table":"content_op_log",//表名
"timestamp":"1578131519"http://binlog產(chǎn)生時間
}
- 為什么選擇json?
因為json結(jié)構(gòu)比較靈活秘狞,每個mysql的表的數(shù)據(jù)結(jié)構(gòu)都不一樣叭莫,json會更好記錄這些變化的數(shù)據(jù)。 - 為什么打入到一個kafka topic烁试?
這個主要是因為方便雇初,后面消費的時候也只需要起一個flink任務(wù),便于統(tǒng)一管理减响。
flink消費kafka的數(shù)據(jù)寫入到hdfs
這一步的主要的細節(jié)在于寫入到hdfs的結(jié)構(gòu)靖诗,以及為什么不直接寫入hive。
不寫入到hive表的原因在于辩蛋,binlog的數(shù)據(jù)結(jié)構(gòu)是不固定的呻畸,而hive的結(jié)構(gòu)相對是比較固定的。如果要寫入到hive的話悼院,就需要將不同的表的binlog寫入到不同的hive表中伤为,這個維護成本太高了。而且spark其實可以直接讀取hdfs的json文件,因此直接放hdfs就好了绞愚。
寫入到hdfs的話叙甸,考慮到后續(xù)讀這個數(shù)據(jù)是要按照表去讀增量數(shù)據(jù),所以寫入的目錄一定是要帶日期和表名稱的位衩。我這邊用的目錄結(jié)構(gòu)是這樣的:
${binlog_file_path}/schema=${mysql_db_name}/table=${mysql_table_name}/dt=${date}/${json_file}
也就是說要在flink根據(jù)數(shù)據(jù)所屬的db裆蒸、table_name、和日期將數(shù)據(jù)寫入到不同的目錄里糖驴。
在這一步的處理的過程中遇到了一些比較重要的參數(shù)問題僚祷。
- 首先是選擇Rolling Policy。flink提供了兩種rolling policy贮缕,分別是DefaultRollingPolicy和OnCheckpointRollingPolicy辙谜。這兩者的區(qū)別是前者是可以配置多個觸發(fā)生成新文件的條件,比如文件大小達到xxx感昼,比如時間間隔超過xxx装哆。而后者比較簡單,就是在每次checkpoint的時候?qū)懭胄碌奈募ㄉぃ鴆heckpoint是按照時間間隔(每多長時間觸發(fā)一次)觸發(fā)的蜕琴,所以后者就是在每xxx時間生成一個新文件。另外還有個比較坑的地方宵溅,我把官方文檔的截圖放下面了凌简。如果你的hadoop版本小于2.7 就盡量使用OnCheckpointRollingPolicy。否則你的任務(wù)就會過一段時間報個錯誤层玲,說是找不到 truncate方法号醉。
2.如上所述checkpoint的時間間隔。不僅僅會影響checkpoint的頻率辛块,而且會影響hdfs文件的大小,而hdfs文件的大小可能會對hdfs的性能有很大影響铅碍。這個值如果太大润绵,就會造成數(shù)據(jù)延遲太高,如果太小就會造成小文件過多胞谈。我這邊設(shè)置的是5分鐘尘盼。
細心的看官,這個時候會問了烦绳,既然你的目錄是分table的卿捎,那么每個table每5分鐘的binlog數(shù)據(jù)量是不一樣的。對于某些大的mysql表径密,我們可能每5分鐘生成一個文件還能接受午阵。對于一些比較小的表,每五分鐘生成一個文件那么文件就會非常小。所以我這邊又做了一層的篩選底桂,我把mysql的大的表篩選出來植袍,只同步大的表到hdfs,用以binlog的數(shù)據(jù)同步籽懦。因為本身binlog的方式同步mysql數(shù)據(jù)為的就是節(jié)約mysql的讀取壓力于个,而小的表對于不會有太大壓力,這些表可以直接通過jdbc的方式去同步暮顺。
spark批處理合并binlog增量數(shù)據(jù)和歷史數(shù)據(jù)
這個是整個環(huán)節(jié)里面最復(fù)雜的一部分厅篓,涉及的細節(jié)也比較多。
首先捶码,我們要明確一下總體的思路是什么羽氮。總體的思路就是要讀取hdfs上的老的歷史數(shù)據(jù)宙项,然后和新的binlog數(shù)據(jù)合并生成新的快照乏苦。
- 讀取老數(shù)據(jù)
如果是首次導(dǎo)入數(shù)據(jù),那么沒有老數(shù)據(jù)尤筐,我們就需要通過jdbc從mysql全量同步一次汇荐。
如果已經(jīng)有老數(shù)據(jù)了。那么我們需要從該表讀取老數(shù)據(jù)盆繁,然后與新的binlog的數(shù)據(jù)合并掀淘。 - 數(shù)據(jù)合并
注意這里binlog的數(shù)據(jù)需要做一些處理。因為之前說過油昂,binlog的數(shù)據(jù)是一個特殊的json格式革娄。我們需要按照hive表的字段結(jié)構(gòu),將binlog的json結(jié)構(gòu)轉(zhuǎn)化成我們想要的固定的hive結(jié)構(gòu)的dataframe冕碟。然后將這個dataframe和hive表的老數(shù)據(jù)union all之后拦惋,按照partition by id order by time 取最后一條就行了。 - 數(shù)據(jù)驗證
因為binlog的數(shù)據(jù)會自動清理安寺,也有可能出現(xiàn)不穩(wěn)定的情況厕妖,可能會導(dǎo)致后續(xù)數(shù)據(jù)一直是錯的。所以我們在生成了hive的最新的快照之后挑庶,會和mysql的數(shù)據(jù)做一次比對言秸。數(shù)據(jù)量差異超過1/1000那么就會全量通過jdbc的方式從mysql抽取一次數(shù)據(jù)。這一步非常關(guān)鍵迎捺,這個是對數(shù)據(jù)準(zhǔn)確性的一個極大的保障举畸。
其實這中間還涉及到一些其他的細節(jié),比如mysql表結(jié)構(gòu)變更凳枝,或者mysql和hive的數(shù)據(jù)結(jié)構(gòu)不一致的情況抄沮。
另外我們這邊還存在多個db的相同的表導(dǎo)入到hive的一張表中的其他問題,我就不贅述了。