mysql同步數(shù)據(jù)到hive---binlog方式

概述

mysql同步數(shù)據(jù)到hive大部分公司目前都是走的jdbc的方式。

這種方式有兩個好處:

  1. 開發(fā)簡單候齿。只需要從mysql讀取相關(guān)的數(shù)據(jù)熙暴,插入到hive表當(dāng)中就行了。Sqoop 或者 waterdrop 都是走的這個邏輯慌盯。
  2. 穩(wěn)定周霉。因為是直接讀取mysql的jdbc的數(shù)據(jù),基本不會有數(shù)據(jù)不一致的問題亚皂。

也有不好的地方:

  1. 對數(shù)據(jù)庫的壓力比較大俱箱。特別是每小時全量同步一些大的表的任務(wù),都走jdbc的話灭必,mysql的壓力會比較大狞谱。

binlog同步mysql數(shù)據(jù)技術(shù)方案

總體架構(gòu)設(shè)計

  1. gorail將mysql的binlog數(shù)據(jù)同步到kafka
  2. flink消費kafka的數(shù)據(jù)寫入到hdfs
  3. spark批處理合并binlog增量數(shù)據(jù)和歷史數(shù)據(jù),生成最新的數(shù)據(jù)快照(這一步比較復(fù)雜禁漓,后面詳細說明)

具體細節(jié)

gorail將mysql的binlog數(shù)據(jù)同步到kafka

這一步最主要的細節(jié)是將mysql庫的所有binlog數(shù)據(jù)全部打入一個kafka topic跟衅,格式使用json。格式如下:

{
    "action":"insert",//update播歼、insert伶跷、delete
    "gorail_nano":"1578131519599439478",//gorail處理時間
    "primary_keys":"[[610927]]",//主鍵
    "raw_rows":"[]",//更新前數(shù)據(jù)
    "rows":"[map[action:1 content_id:1600040219 create_time:2020-01-04 17:51:59 id:610927 op_id:472 op_name:張玲玲]]",//更新后數(shù)據(jù)
    "schema":"qukan",//庫名
    "table":"content_op_log",//表名
    "timestamp":"1578131519"http://binlog產(chǎn)生時間
}
  1. 為什么選擇json?
    因為json結(jié)構(gòu)比較靈活秘狞,每個mysql的表的數(shù)據(jù)結(jié)構(gòu)都不一樣叭莫,json會更好記錄這些變化的數(shù)據(jù)。
  2. 為什么打入到一個kafka topic烁试?
    這個主要是因為方便雇初,后面消費的時候也只需要起一個flink任務(wù),便于統(tǒng)一管理减响。

flink消費kafka的數(shù)據(jù)寫入到hdfs

這一步的主要的細節(jié)在于寫入到hdfs的結(jié)構(gòu)靖诗,以及為什么不直接寫入hive。
不寫入到hive表的原因在于辩蛋,binlog的數(shù)據(jù)結(jié)構(gòu)是不固定的呻畸,而hive的結(jié)構(gòu)相對是比較固定的。如果要寫入到hive的話悼院,就需要將不同的表的binlog寫入到不同的hive表中伤为,這個維護成本太高了。而且spark其實可以直接讀取hdfs的json文件,因此直接放hdfs就好了绞愚。

寫入到hdfs的話叙甸,考慮到后續(xù)讀這個數(shù)據(jù)是要按照表去讀增量數(shù)據(jù),所以寫入的目錄一定是要帶日期和表名稱的位衩。我這邊用的目錄結(jié)構(gòu)是這樣的:

${binlog_file_path}/schema=${mysql_db_name}/table=${mysql_table_name}/dt=${date}/${json_file}

也就是說要在flink根據(jù)數(shù)據(jù)所屬的db裆蒸、table_name、和日期將數(shù)據(jù)寫入到不同的目錄里糖驴。

在這一步的處理的過程中遇到了一些比較重要的參數(shù)問題僚祷。

  1. 首先是選擇Rolling Policy。flink提供了兩種rolling policy贮缕,分別是DefaultRollingPolicy和OnCheckpointRollingPolicy辙谜。這兩者的區(qū)別是前者是可以配置多個觸發(fā)生成新文件的條件,比如文件大小達到xxx感昼,比如時間間隔超過xxx装哆。而后者比較簡單,就是在每次checkpoint的時候?qū)懭胄碌奈募ㄉぃ鴆heckpoint是按照時間間隔(每多長時間觸發(fā)一次)觸發(fā)的蜕琴,所以后者就是在每xxx時間生成一個新文件。另外還有個比較坑的地方宵溅,我把官方文檔的截圖放下面了凌简。如果你的hadoop版本小于2.7 就盡量使用OnCheckpointRollingPolicy。否則你的任務(wù)就會過一段時間報個錯誤层玲,說是找不到 truncate方法号醉。
image.png

2.如上所述checkpoint的時間間隔。不僅僅會影響checkpoint的頻率辛块,而且會影響hdfs文件的大小,而hdfs文件的大小可能會對hdfs的性能有很大影響铅碍。這個值如果太大润绵,就會造成數(shù)據(jù)延遲太高,如果太小就會造成小文件過多胞谈。我這邊設(shè)置的是5分鐘尘盼。
細心的看官,這個時候會問了烦绳,既然你的目錄是分table的卿捎,那么每個table每5分鐘的binlog數(shù)據(jù)量是不一樣的。對于某些大的mysql表径密,我們可能每5分鐘生成一個文件還能接受午阵。對于一些比較小的表,每五分鐘生成一個文件那么文件就會非常小。所以我這邊又做了一層的篩選底桂,我把mysql的大的表篩選出來植袍,只同步大的表到hdfs,用以binlog的數(shù)據(jù)同步籽懦。因為本身binlog的方式同步mysql數(shù)據(jù)為的就是節(jié)約mysql的讀取壓力于个,而小的表對于不會有太大壓力,這些表可以直接通過jdbc的方式去同步暮顺。

spark批處理合并binlog增量數(shù)據(jù)和歷史數(shù)據(jù)

這個是整個環(huán)節(jié)里面最復(fù)雜的一部分厅篓,涉及的細節(jié)也比較多。
首先捶码,我們要明確一下總體的思路是什么羽氮。總體的思路就是要讀取hdfs上的老的歷史數(shù)據(jù)宙项,然后和新的binlog數(shù)據(jù)合并生成新的快照乏苦。

  1. 讀取老數(shù)據(jù)
    如果是首次導(dǎo)入數(shù)據(jù),那么沒有老數(shù)據(jù)尤筐,我們就需要通過jdbc從mysql全量同步一次汇荐。
    如果已經(jīng)有老數(shù)據(jù)了。那么我們需要從該表讀取老數(shù)據(jù)盆繁,然后與新的binlog的數(shù)據(jù)合并掀淘。
  2. 數(shù)據(jù)合并
    注意這里binlog的數(shù)據(jù)需要做一些處理。因為之前說過油昂,binlog的數(shù)據(jù)是一個特殊的json格式革娄。我們需要按照hive表的字段結(jié)構(gòu),將binlog的json結(jié)構(gòu)轉(zhuǎn)化成我們想要的固定的hive結(jié)構(gòu)的dataframe冕碟。然后將這個dataframe和hive表的老數(shù)據(jù)union all之后拦惋,按照partition by id order by time 取最后一條就行了。
  3. 數(shù)據(jù)驗證
    因為binlog的數(shù)據(jù)會自動清理安寺,也有可能出現(xiàn)不穩(wěn)定的情況厕妖,可能會導(dǎo)致后續(xù)數(shù)據(jù)一直是錯的。所以我們在生成了hive的最新的快照之后挑庶,會和mysql的數(shù)據(jù)做一次比對言秸。數(shù)據(jù)量差異超過1/1000那么就會全量通過jdbc的方式從mysql抽取一次數(shù)據(jù)。這一步非常關(guān)鍵迎捺,這個是對數(shù)據(jù)準(zhǔn)確性的一個極大的保障举畸。

其實這中間還涉及到一些其他的細節(jié),比如mysql表結(jié)構(gòu)變更凳枝,或者mysql和hive的數(shù)據(jù)結(jié)構(gòu)不一致的情況抄沮。
另外我們這邊還存在多個db的相同的表導(dǎo)入到hive的一張表中的其他問題,我就不贅述了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末合是,一起剝皮案震驚了整個濱河市了罪,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌聪全,老刑警劉巖泊藕,帶你破解...
    沈念sama閱讀 217,084評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異难礼,居然都是意外死亡娃圆,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評論 3 392
  • 文/潘曉璐 我一進店門蛾茉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來讼呢,“玉大人,你說我怎么就攤上這事谦炬≡闷粒” “怎么了?”我有些...
    開封第一講書人閱讀 163,450評論 0 353
  • 文/不壞的土叔 我叫張陵键思,是天一觀的道長础爬。 經(jīng)常有香客問我,道長吼鳞,這世上最難降的妖魔是什么看蚜? 我笑而不...
    開封第一講書人閱讀 58,322評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮赔桌,結(jié)果婚禮上供炎,老公的妹妹穿的比我還像新娘。我一直安慰自己疾党,他們只是感情好音诫,可當(dāng)我...
    茶點故事閱讀 67,370評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著雪位,像睡著了一般纽竣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上茧泪,一...
    開封第一講書人閱讀 51,274評論 1 300
  • 那天,我揣著相機與錄音聋袋,去河邊找鬼队伟。 笑死,一個胖子當(dāng)著我的面吹牛幽勒,可吹牛的內(nèi)容都是我干的嗜侮。 我是一名探鬼主播,決...
    沈念sama閱讀 40,126評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼锈颗!你這毒婦竟也來了顷霹?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,980評論 0 275
  • 序言:老撾萬榮一對情侶失蹤击吱,失蹤者是張志新(化名)和其女友劉穎淋淀,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體覆醇,經(jīng)...
    沈念sama閱讀 45,414評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡朵纷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,599評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了永脓。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袍辞。...
    茶點故事閱讀 39,773評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖常摧,靈堂內(nèi)的尸體忽然破棺而出搅吁,到底是詐尸還是另有隱情,我是刑警寧澤落午,帶...
    沈念sama閱讀 35,470評論 5 344
  • 正文 年R本政府宣布谎懦,位于F島的核電站,受9級特大地震影響板甘,放射性物質(zhì)發(fā)生泄漏党瓮。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,080評論 3 327
  • 文/蒙蒙 一盐类、第九天 我趴在偏房一處隱蔽的房頂上張望寞奸。 院中可真熱鬧,春花似錦在跳、人聲如沸枪萄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瓷翻。三九已至,卻和暖如春割坠,著一層夾襖步出監(jiān)牢的瞬間齐帚,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評論 1 269
  • 我被黑心中介騙來泰國打工彼哼, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留对妄,地道東北人。 一個月前我還...
    沈念sama閱讀 47,865評論 2 370
  • 正文 我出身青樓敢朱,卻偏偏與公主長得像剪菱,于是被迫代替她去往敵國和親摩瞎。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,689評論 2 354