MongoDB實(shí)時(shí)同步Impala

背景

由于越來(lái)越多的Mysql數(shù)據(jù)以及Mongodb的數(shù)據(jù)需要做分析.但是大量的數(shù)據(jù)分析并不適合在Mysql以及Mongodb中進(jìn)行,所以需要將數(shù)據(jù)同步指CDH中, 由于2方面原因所以才有了實(shí)時(shí)同步的需求.

1.數(shù)據(jù)延后性
由于Sqoop/Mongoexport 數(shù)據(jù)的延后性,導(dǎo)致CDH只能作為分析平臺(tái)為分析人員提供數(shù)據(jù)支持,針對(duì)于分析視角而言是沒(méi)問(wèn)題的.應(yīng)用系統(tǒng)基本會(huì)從性能的角度考慮,從而限制用戶導(dǎo)出數(shù)據(jù)的條數(shù),如果從基層業(yè)務(wù)人員需要從應(yīng)用系統(tǒng)導(dǎo)出大量數(shù)據(jù)時(shí)問(wèn)題來(lái)了.為什么你們提供給我的數(shù)據(jù)和應(yīng)用系統(tǒng)的數(shù)據(jù)不一致呢?為什么不能導(dǎo)出最新的數(shù)據(jù)給我呢?我需要用最新的數(shù)據(jù)處理XX業(yè)務(wù)問(wèn)題,但是應(yīng)用系統(tǒng)只能每次導(dǎo)出500條,我有10000條需要導(dǎo)出,難道要導(dǎo)出20次嗎?

2.集群性能壓力
如果公司有100個(gè)數(shù)據(jù)庫(kù),而大家都知道Sqoop/Mongoexport操作 都是會(huì)影響集群性能負(fù)載,最初有考慮將所有Sqoop/Mongoexport分散在不同的服務(wù)器以解決此問(wèn)題.但是Sqoop同步的時(shí)候由于是Yarn管理資源.所以會(huì)有一定限制.如果這么多數(shù)據(jù)庫(kù)一直不停的同步.可能需要10個(gè)小時(shí)(數(shù)據(jù)庫(kù)數(shù)據(jù)大小決定)甚至10小時(shí)以上,大量的ETL腳本一般都是需要在上班之前呈現(xiàn)給業(yè)務(wù)方.那么數(shù)據(jù)庫(kù)如果是在晚上同步的話,那么早上上班前可能只有1~2個(gè)小時(shí)的時(shí)間去做數(shù)據(jù)的ETL工作.導(dǎo)致了大量的數(shù)據(jù)腳本集中在這1~2個(gè)小時(shí).大量的讀寫(xiě)以及查詢導(dǎo)致集群負(fù)載飆升.寫(xiě)本文的這個(gè)早上我們的集群機(jī)器負(fù)載達(dá)到了1000,這個(gè)非撑嘲恐怖甚至不敢想象今后越來(lái)越多的數(shù)據(jù)處理工作如何進(jìn)行,所以才有了實(shí)時(shí)同步的需求.

3.實(shí)時(shí)數(shù)倉(cāng)建設(shè)
目前正在建設(shè)做大數(shù)據(jù)數(shù)據(jù)倉(cāng)庫(kù)的建設(shè)工作,每天全表刷新數(shù)倉(cāng)所有數(shù)據(jù)非常耗時(shí),且浪費(fèi)資源.實(shí)時(shí)同步也為實(shí)時(shí)數(shù)倉(cāng)建設(shè)打開(kāi)了大門.之后對(duì)接 Flink 等實(shí)時(shí)處理工作也有極大的支持.

同步工具

在2018年中的時(shí)候就在關(guān)注實(shí)時(shí)同步,但是國(guó)內(nèi)很多都是使用阿里DataX,不可否認(rèn)是個(gè)好工具.但是我更希望能找到可以集成在Cloudera生態(tài)之內(nèi)的同步工具,StreamSets讓我眼前一亮,這不就是我需要的工具嗎?由于一些原因.直到今年年末才有時(shí)間繼續(xù)研究這一塊.

話不多說(shuō),進(jìn)入正題

1.Impala中創(chuàng)建KUDU表.

CREATE TABLE test.mgtest
(`_id` STRING NOT NULL, 
userid STRING, name string, 
age string, 
PRIMARY KEY (`_id`)) 
PARTITION BY HASH (`_id`) 
PARTITIONS 16 STORED AS KUDU 
TBLPROPERTIES ('kudu.master_addresses'='cdh2:7051', 'kudu.table_name'='test.mgtest')
image.png

2.配置流


image.png

2.1.Mongo數(shù)據(jù)源配置.


image.png

2.2.數(shù)據(jù)庫(kù)選擇.


image.png

2.3.數(shù)據(jù)處理.


image.png

代碼如下:

for record in records:
  newRecord = sdcFunctions.createRecord(record.sourceId + ':newRecordId')
  try:      
    if record.value['op'] == 'i':
      newRecord.attributes['sdc.operation.type']='1'
      newRecord.value = record.value['o']
    if record.value['op'] == 'd':
      newRecord.attributes['sdc.operation.type']='2'
      newRecord.value = record.value['o']
    if record.value['op'] == 'u':
      newRecord.attributes['sdc.operation.type']='3'
      newRecord.value = record.value['o']['$set'] 
      newRecord.value['_id'] = record.value['o2']['_id']
    # Write record to processor output
    #newRecord.value['Type'] = record.value['Type']
    newRecord.value['ns'] = record.value['ns']
    newRecord.value['op'] = record.value['op']
    output.write(newRecord)
  except Exception as e:
    # Send record to error
    error.write(newRecord, str(e))

2.4.子文檔處理


image.png

代碼如下:

import array
import json

def convert_json(item):
  return json.dumps(item,ensure_ascii=False,encoding="utf-8")
def convert_array(item):
  return json.dumps(item,ensure_ascii=False,encoding="utf-8")
for record in records:
  try:
    for colName,value in record.value.items():
      temp = record.value[colName]
      record.value[colName] = None
      if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_MAP:
        temp = convert_json(temp)
      elif sdcFunctions.getFieldNull(record,'/'+colName) is NULL_LIST:
        temp = convert_array(temp)
      record.value[colName] = temp
    output.write(record)
  except Exception as e:
    error.write(record, str(e))

2.5.CRUD選擇器.


image.png

2.6.插入數(shù)據(jù)


image.png

更新和刪除數(shù)據(jù)更改Default Operation 的值即可.


image.png

2.7 測(cè)試MongoDB 插入數(shù)據(jù)
2.7.1 Mongo插入數(shù)據(jù)


image.png

2.7.2 Impala查詢


image.png

2.7.3 Streamsets流
image.png

2.8 Mongo更新數(shù)據(jù)
2.8.1 Mongo更新數(shù)據(jù)


image.png

2.8.2 Impala查詢


image.png

2.8.3 StreamSets流
image.png

2.9 MongoDB刪除數(shù)據(jù)
2.9.1 MongoDB刪除數(shù)據(jù)


image.png

2.9.2 Impala查詢


image.png

最后在impala使用的過(guò)程中,由于mongoDB子文檔問(wèn)題.需要升級(jí)到Impala版本3.1以上以支持get_json_object 這個(gè)function, 后續(xù)需要考慮的問(wèn)題是Hive如何使用.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末航瞭,一起剝皮案震驚了整個(gè)濱河市并级,隨后出現(xiàn)的幾起案子卿拴,更是在濱河造成了極大的恐慌村生,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評(píng)論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件讶踪,死亡現(xiàn)場(chǎng)離奇詭異蕴侣,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)数苫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門聪舒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人虐急,你說(shuō)我怎么就攤上這事箱残。” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,643評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵被辑,是天一觀的道長(zhǎng)燎悍。 經(jīng)常有香客問(wèn)我,道長(zhǎng)盼理,這世上最難降的妖魔是什么谈山? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,495評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮宏怔,結(jié)果婚禮上奏路,老公的妹妹穿的比我還像新娘。我一直安慰自己臊诊,他們只是感情好鸽粉,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著妨猩,像睡著了一般潜叛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上壶硅,一...
    開(kāi)封第一講書(shū)人閱讀 52,156評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音销斟,去河邊找鬼庐椒。 笑死,一個(gè)胖子當(dāng)著我的面吹牛蚂踊,可吹牛的內(nèi)容都是我干的约谈。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼犁钟,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼棱诱!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起涝动,我...
    開(kāi)封第一講書(shū)人閱讀 39,659評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤迈勋,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后醋粟,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體靡菇,經(jīng)...
    沈念sama閱讀 46,200評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評(píng)論 3 340
  • 正文 我和宋清朗相戀三年米愿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了厦凤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,424評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡育苟,死狀恐怖较鼓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情违柏,我是刑警寧澤博烂,帶...
    沈念sama閱讀 36,107評(píng)論 5 349
  • 正文 年R本政府宣布香椎,位于F島的核電站,受9級(jí)特大地震影響脖母,放射性物質(zhì)發(fā)生泄漏士鸥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評(píng)論 3 333
  • 文/蒙蒙 一谆级、第九天 我趴在偏房一處隱蔽的房頂上張望烤礁。 院中可真熱鬧,春花似錦肥照、人聲如沸脚仔。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,264評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)鲤脏。三九已至,卻和暖如春吕朵,著一層夾襖步出監(jiān)牢的瞬間猎醇,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,390評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工努溃, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留硫嘶,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,798評(píng)論 3 376
  • 正文 我出身青樓梧税,卻偏偏與公主長(zhǎng)得像沦疾,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子第队,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評(píng)論 2 359