背景
由于越來(lái)越多的Mysql數(shù)據(jù)以及Mongodb的數(shù)據(jù)需要做分析.但是大量的數(shù)據(jù)分析并不適合在Mysql以及Mongodb中進(jìn)行,所以需要將數(shù)據(jù)同步指CDH中, 由于2方面原因所以才有了實(shí)時(shí)同步的需求.
1.數(shù)據(jù)延后性
由于Sqoop/Mongoexport 數(shù)據(jù)的延后性,導(dǎo)致CDH只能作為分析平臺(tái)為分析人員提供數(shù)據(jù)支持,針對(duì)于分析視角而言是沒(méi)問(wèn)題的.應(yīng)用系統(tǒng)基本會(huì)從性能的角度考慮,從而限制用戶導(dǎo)出數(shù)據(jù)的條數(shù),如果從基層業(yè)務(wù)人員需要從應(yīng)用系統(tǒng)導(dǎo)出大量數(shù)據(jù)時(shí)問(wèn)題來(lái)了.為什么你們提供給我的數(shù)據(jù)和應(yīng)用系統(tǒng)的數(shù)據(jù)不一致呢?為什么不能導(dǎo)出最新的數(shù)據(jù)給我呢?我需要用最新的數(shù)據(jù)處理XX業(yè)務(wù)問(wèn)題,但是應(yīng)用系統(tǒng)只能每次導(dǎo)出500條,我有10000條需要導(dǎo)出,難道要導(dǎo)出20次嗎?
2.集群性能壓力
如果公司有100個(gè)數(shù)據(jù)庫(kù),而大家都知道Sqoop/Mongoexport操作 都是會(huì)影響集群性能負(fù)載,最初有考慮將所有Sqoop/Mongoexport分散在不同的服務(wù)器以解決此問(wèn)題.但是Sqoop同步的時(shí)候由于是Yarn管理資源.所以會(huì)有一定限制.如果這么多數(shù)據(jù)庫(kù)一直不停的同步.可能需要10個(gè)小時(shí)(數(shù)據(jù)庫(kù)數(shù)據(jù)大小決定)甚至10小時(shí)以上,大量的ETL腳本一般都是需要在上班之前呈現(xiàn)給業(yè)務(wù)方.那么數(shù)據(jù)庫(kù)如果是在晚上同步的話,那么早上上班前可能只有1~
2個(gè)小時(shí)的時(shí)間去做數(shù)據(jù)的ETL工作.導(dǎo)致了大量的數(shù)據(jù)腳本集中在這1~
2個(gè)小時(shí).大量的讀寫(xiě)以及查詢導(dǎo)致集群負(fù)載飆升.寫(xiě)本文的這個(gè)早上我們的集群機(jī)器負(fù)載達(dá)到了1000,這個(gè)非撑嘲恐怖甚至不敢想象今后越來(lái)越多的數(shù)據(jù)處理工作如何進(jìn)行,所以才有了實(shí)時(shí)同步的需求.
3.實(shí)時(shí)數(shù)倉(cāng)建設(shè)
目前正在建設(shè)做大數(shù)據(jù)數(shù)據(jù)倉(cāng)庫(kù)的建設(shè)工作,每天全表刷新數(shù)倉(cāng)所有數(shù)據(jù)非常耗時(shí),且浪費(fèi)資源.實(shí)時(shí)同步也為實(shí)時(shí)數(shù)倉(cāng)建設(shè)打開(kāi)了大門.之后對(duì)接 Flink 等實(shí)時(shí)處理工作也有極大的支持.
同步工具
在2018年中的時(shí)候就在關(guān)注實(shí)時(shí)同步,但是國(guó)內(nèi)很多都是使用阿里DataX,不可否認(rèn)是個(gè)好工具.但是我更希望能找到可以集成在Cloudera生態(tài)之內(nèi)的同步工具,StreamSets讓我眼前一亮,這不就是我需要的工具嗎?由于一些原因.直到今年年末才有時(shí)間繼續(xù)研究這一塊.
話不多說(shuō),進(jìn)入正題
1.Impala中創(chuàng)建KUDU表.
CREATE TABLE test.mgtest
(`_id` STRING NOT NULL,
userid STRING, name string,
age string,
PRIMARY KEY (`_id`))
PARTITION BY HASH (`_id`)
PARTITIONS 16 STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='cdh2:7051', 'kudu.table_name'='test.mgtest')
2.配置流
2.1.Mongo數(shù)據(jù)源配置.
2.2.數(shù)據(jù)庫(kù)選擇.
2.3.數(shù)據(jù)處理.
代碼如下:
for record in records:
newRecord = sdcFunctions.createRecord(record.sourceId + ':newRecordId')
try:
if record.value['op'] == 'i':
newRecord.attributes['sdc.operation.type']='1'
newRecord.value = record.value['o']
if record.value['op'] == 'd':
newRecord.attributes['sdc.operation.type']='2'
newRecord.value = record.value['o']
if record.value['op'] == 'u':
newRecord.attributes['sdc.operation.type']='3'
newRecord.value = record.value['o']['$set']
newRecord.value['_id'] = record.value['o2']['_id']
# Write record to processor output
#newRecord.value['Type'] = record.value['Type']
newRecord.value['ns'] = record.value['ns']
newRecord.value['op'] = record.value['op']
output.write(newRecord)
except Exception as e:
# Send record to error
error.write(newRecord, str(e))
2.4.子文檔處理
代碼如下:
import array
import json
def convert_json(item):
return json.dumps(item,ensure_ascii=False,encoding="utf-8")
def convert_array(item):
return json.dumps(item,ensure_ascii=False,encoding="utf-8")
for record in records:
try:
for colName,value in record.value.items():
temp = record.value[colName]
record.value[colName] = None
if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_MAP:
temp = convert_json(temp)
elif sdcFunctions.getFieldNull(record,'/'+colName) is NULL_LIST:
temp = convert_array(temp)
record.value[colName] = temp
output.write(record)
except Exception as e:
error.write(record, str(e))
2.5.CRUD選擇器.
2.6.插入數(shù)據(jù)
更新和刪除數(shù)據(jù)更改Default Operation 的值即可.
2.7 測(cè)試MongoDB 插入數(shù)據(jù)
2.7.1 Mongo插入數(shù)據(jù)
2.7.2 Impala查詢
2.7.3 Streamsets流
2.8 Mongo更新數(shù)據(jù)
2.8.1 Mongo更新數(shù)據(jù)
2.8.2 Impala查詢
2.8.3 StreamSets流
2.9 MongoDB刪除數(shù)據(jù)
2.9.1 MongoDB刪除數(shù)據(jù)
2.9.2 Impala查詢
最后在impala使用的過(guò)程中,由于mongoDB子文檔問(wèn)題.需要升級(jí)到Impala版本3.1以上以支持get_json_object 這個(gè)function, 后續(xù)需要考慮的問(wèn)題是Hive如何使用.