數(shù)據(jù)采集介紹
ETL基本上就是數(shù)據(jù)采集的代表忍啤,包括數(shù)據(jù)的提榷杉帧(Extract)、轉(zhuǎn)換(Transform)和加載(Load)庆尘。數(shù)據(jù)源是整個大數(shù)據(jù)平臺的上游剃诅,數(shù)據(jù)采集是數(shù)據(jù)源與數(shù)倉之間的管道。在采集過程中針對業(yè)務(wù)場景對數(shù)據(jù)進行治理驶忌,完成數(shù)據(jù)清洗工作矛辕。
在大數(shù)據(jù)場景下,數(shù)據(jù)源復雜付魔、多樣聊品,包括業(yè)務(wù)數(shù)據(jù)庫、日志數(shù)據(jù)几苍、圖片翻屈、視頻等多媒體數(shù)據(jù)等。數(shù)據(jù)采集形式也需要更加復雜妻坝,多樣伸眶,包括定時、實時惠勒、增量、全量等爬坑。常見的數(shù)據(jù)采集工具也多種多樣纠屋,可以滿足多種業(yè)務(wù)需求。
一個典型的數(shù)據(jù)加載架構(gòu):
常見的三個數(shù)據(jù)采集場景:
- 場景1:從支持FTP盾计、SFTP售担、 HTTP等 協(xié)議的數(shù)據(jù)源獲取數(shù)據(jù)
- 場景2:從業(yè)務(wù)數(shù)據(jù)庫獲取數(shù)據(jù)赁遗,數(shù)據(jù)采集錄入后需支撐業(yè)務(wù)系統(tǒng)
- 場景3:數(shù)據(jù)源通過Kafka等消息隊列,需要實時采集數(shù)據(jù)
數(shù)據(jù)采集系統(tǒng)需求:
- 數(shù)據(jù)源管理與狀態(tài)監(jiān)控
- 定時族铆、實時岩四、全量、增量等多模式的數(shù)據(jù)采集及任務(wù)監(jiān)控
- 元數(shù)據(jù)管理哥攘、數(shù)據(jù)補采及數(shù)據(jù)歸檔
常用數(shù)據(jù)采集工具
Sqoop
Sqoop是常用的關(guān)系數(shù)據(jù)庫與HDFS之間的數(shù)據(jù)導入導出工具剖煌,將導入或?qū)С雒罘g成MapReduce程序來實現(xiàn)。所以常用于在Hadoop和傳統(tǒng)的數(shù)據(jù)庫(Mysq|逝淹、Postgresq|等)進行數(shù)據(jù)的傳遞耕姊。
可以通過Hadoop的MapReduce把數(shù)據(jù)從關(guān)系型數(shù)據(jù)庫中導入到Hadoop集群。使用Sqoop傳輸大量結(jié)構(gòu)化或半結(jié)構(gòu)化數(shù)據(jù)的過程是完全自動化的栅葡。
Sqoop數(shù)據(jù)傳輸示意圖:
Sqoop Import流程:
- 獲取源數(shù)據(jù)表的MetaData信息
- 根據(jù)參數(shù)提交MapReduce任務(wù)
- 表內(nèi)每行作為一條記錄茉兰,按計劃進行數(shù)據(jù)導入
Sqoop Export流程:*
- 獲取目標數(shù)據(jù)表的MetaData信息
- 根據(jù)參數(shù)提交MapReduce任務(wù)
- 對HDFS文件內(nèi)每行數(shù)據(jù)按指定字符分割,導出到數(shù)據(jù)庫
Apache Flume
Apache Flume本質(zhì)上是一個分布式欣簇、可靠的规脸、高可用的日志收集系統(tǒng),支持多種數(shù)據(jù)來源熊咽,配置靈活莫鸭。Flume可以對海量日志進行采集,聚合和傳輸网棍。
Flume系統(tǒng)分為三個組件黔龟,分別是Source(負責數(shù)據(jù)源的讀取)滥玷,Sink(負責數(shù)據(jù)的輸出)氏身,Channel(作為數(shù)據(jù)的暫存通道),這三個組件將構(gòu)成一個Agent惑畴。Flume允許用戶構(gòu)建一個復雜的數(shù)據(jù)流蛋欣,比如數(shù)據(jù)流經(jīng)多個Agent最終落地。
Flume數(shù)據(jù)傳輸示意圖:
Flume多數(shù)據(jù)源多Agent下的數(shù)據(jù)傳輸示意圖:
Flume多Sink多Agent下的數(shù)據(jù)傳輸示意圖:
關(guān)于Flume的實操內(nèi)容可以參考:
DataX
官方文檔:
DataX是阿里開源的異構(gòu)數(shù)據(jù)源離線同步工具如贷,致力于實現(xiàn)關(guān)系數(shù)據(jù)庫(MySQL陷虎、Oracle等)、HDFS杠袱、Hive尚猿、ODPS、 HBase楣富、 FTP等各種異構(gòu)數(shù)據(jù)源之間高效穩(wěn)定的數(shù)據(jù)同步功能凿掂。DataX將復雜的網(wǎng)狀的同步鏈路變成了星型數(shù)據(jù)同步鏈路,具有良好的擴展性。
網(wǎng)狀同步鏈路和DataX星型數(shù)據(jù)同步鏈路的對比圖:
DataX的架構(gòu)示意圖:
Datax數(shù)據(jù)采集實戰(zhàn)
官方文檔:
到GitHub上的下載地址下載DataX庄萎,或者拉取源碼進行編譯:
將下載好的安裝包踪少,上傳到服務(wù)器:
[root@hadoop ~]# cd /usr/local/src
[root@hadoop /usr/local/src]# ls |grep datax.tar.gz
datax.tar.gz
[root@hadoop /usr/local/src]#
將安裝包解壓到合適的目錄下:
[root@hadoop /usr/local/src]# tar -zxvf datax.tar.gz -C /usr/local
[root@hadoop /usr/local/src]# cd ../datax/
[root@hadoop /usr/local/datax]# ls
bin conf job lib plugin script tmp
[root@hadoop /usr/local/datax]#
執(zhí)行DataX的自檢腳本:
[root@hadoop /usr/local/datax]# python bin/datax.py job/job.json
...
任務(wù)啟動時刻 : 2020-11-13 11:21:01
任務(wù)結(jié)束時刻 : 2020-11-13 11:21:11
任務(wù)總計耗時 : 10s
任務(wù)平均流量 : 253.91KB/s
記錄寫入速度 : 10000rec/s
讀出記錄總數(shù) : 100000
讀寫失敗總數(shù) : 0
CSV文件數(shù)據(jù)導入Hive
檢測沒問題后,接下來簡單演示一下將CSV文件中的數(shù)據(jù)導入到Hive中糠涛。我們需要用到hdfswriter援奢,以及txtfilereader。官方文檔:
- https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
- https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md
首先忍捡,到Hive中創(chuàng)建一個數(shù)據(jù)庫:
0: jdbc:hive2://localhost:10000> create database db01;
No rows affected (0.315 seconds)
0: jdbc:hive2://localhost:10000> use db01;
然后創(chuàng)建一張表:
create table log_dev2(
id int,
name string,
create_time int,
creator string,
info string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as orcfile;
當庫集漾、表創(chuàng)建完成后,在HDFS中會有對應(yīng)的目錄文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db
Found 1 items
drwxr-xr-x - root supergroup 0 2020-11-13 11:30 /user/hive/warehouse/db01.db/log_dev2
[root@hadoop ~]#
準備測試數(shù)據(jù):
[root@hadoop ~]# cat datax/db.csv
1,創(chuàng)建用戶,1554099545,hdfs,創(chuàng)建用戶 test
2,更新用戶,1554099546,yarn,更新用戶 test1
3,刪除用戶,1554099547,hdfs,刪除用戶 test2
4,更新用戶,1554189515,yarn,更新用戶 test3
5,刪除用戶,1554199525,hdfs,刪除用戶 test4
6,創(chuàng)建用戶,1554299345,yarn,創(chuàng)建用戶 test5
DataX通過json格式的配置文件來定義ETL任務(wù)锉罐,創(chuàng)建一個json文件:vim csv2hive.json
帆竹,我們要定義的ETL任務(wù)內(nèi)容如下:
{
"setting":{
},
"job":{
"setting":{
"speed":{
"channel":2
}
},
"content":[
{
"reader":{
"name":"txtfilereader",
"parameter":{
"path":[
"/root/datax/db.csv"
],
"encoding":"UTF-8",
"column":[
{
"index":0,
"type":"long"
},
{
"index":1,
"type":"string"
},
{
"index":2,
"type":"long"
},
{
"index":3,
"type":"string"
},
{
"index":4,
"type":"string"
}
],
"fieldDelimiter":","
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"defaultFS":"hdfs://192.168.243.161:8020",
"fileType":"orc",
"path":"/user/hive/warehouse/db01.db/log_dev2",
"fileName":"log_dev2.csv",
"column":[
{
"name":"id",
"type":"int"
},
{
"name":"name",
"type":"string"
},
{
"name":"create_time",
"type":"INT"
},
{
"name":"creator",
"type":"string"
},
{
"name":"info",
"type":"string"
}
],
"writeMode":"append",
"fieldDelimiter":",",
"compress":"NONE"
}
}
}
]
}
}
- datax使用json作為配置文件,文件可以是本地的也可以是遠程http服務(wù)器上面
- json配置文件最外層是一個
job
脓规,job
包含setting
和content
兩部分栽连,其中setting
用于對整個job
進行配置,content
是數(shù)據(jù)的源和目的 -
setting
:用于設(shè)置全局channe|配置侨舆,臟數(shù)據(jù)配置秒紧,限速配置等,本例中只配置了channel個數(shù)1挨下,也就是使用單線程執(zhí)行數(shù)據(jù)傳輸 -
content
:
-
reader:配置從哪里讀數(shù)據(jù)
-name
:插件名稱熔恢,需要和工程中的插件名保持-致
-parameter
:插件對應(yīng)的輸入?yún)?shù)
-path
:源數(shù)據(jù)文件的路徑
-encoding
:數(shù)據(jù)編碼
-fieldDelimiter
:數(shù)據(jù)分隔符
-column
:源數(shù)據(jù)按照分隔符分割之后的位置和數(shù)據(jù)類型 -
writer:配置將數(shù)據(jù)寫到哪里去
-name
:插件名稱,需要和工程中的插件名保持一致
-parameter
:插件對應(yīng)的輸入?yún)?shù)
-path
:目標路徑
-fileName
:目標文件名前綴
-writeMode
:寫入目標目錄的方式
通過DataX的Python腳本執(zhí)行我們定義的ETL任務(wù):
[root@hadoop ~]# python /usr/local/datax/bin/datax.py datax/csv2hive.json
...
任務(wù)啟動時刻 : 2020-11-15 11:10:20
任務(wù)結(jié)束時刻 : 2020-11-15 11:10:32
任務(wù)總計耗時 : 12s
任務(wù)平均流量 : 17B/s
記錄寫入速度 : 0rec/s
讀出記錄總數(shù) : 6
讀寫失敗總數(shù) : 0
查看HDFS中是否已存在相應(yīng)的數(shù)據(jù)文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db/log_dev2
Found 1 items
-rw-r--r-- 3 root supergroup 825 2020-11-15 11:10 /user/hive/warehouse/db01.db/log_dev2/log_dev2.csv__f19a135d_6c22_4988_ae69_df39354acb1e
[root@hadoop ~]#
到Hive中驗證導入的數(shù)據(jù)是否符合預期:
0: jdbc:hive2://localhost:10000> use db01;
No rows affected (0.706 seconds)
0: jdbc:hive2://localhost:10000> show tables;
+-----------+
| tab_name |
+-----------+
| log_dev2 |
+-----------+
1 row selected (0.205 seconds)
0: jdbc:hive2://localhost:10000> select * from log_dev2;
+--------------+----------------+-----------------------+-------------------+----------------+
| log_dev2.id | log_dev2.name | log_dev2.create_time | log_dev2.creator | log_dev2.info |
+--------------+----------------+-----------------------+-------------------+----------------+
| 1 | 創(chuàng)建用戶 | 1554099545 | hdfs | 創(chuàng)建用戶 test |
| 2 | 更新用戶 | 1554099546 | yarn | 更新用戶 test1 |
| 3 | 刪除用戶 | 1554099547 | hdfs | 刪除用戶 test2 |
| 4 | 更新用戶 | 1554189515 | yarn | 更新用戶 test3 |
| 5 | 刪除用戶 | 1554199525 | hdfs | 刪除用戶 test4 |
| 6 | 創(chuàng)建用戶 | 1554299345 | yarn | 創(chuàng)建用戶 test5 |
+--------------+----------------+-----------------------+-------------------+----------------+
6 rows selected (1.016 seconds)
0: jdbc:hive2://localhost:10000>
MySQL數(shù)據(jù)導入Hive
接下來演示一下將MySQL數(shù)據(jù)導入Hive中臭笆。為了實現(xiàn)該功能叙淌,我們需要使用到mysqlreader來從MySQL中讀取數(shù)據(jù),其官方文檔如下:
首先愁铺,執(zhí)行如下SQL構(gòu)造一些測試數(shù)據(jù):
CREATE DATABASE datax_test;
USE `datax_test`;
CREATE TABLE `dev_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
`create_time` int(11) DEFAULT NULL,
`creator` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
`info` varchar(2000) COLLATE utf8_unicode_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1069 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
insert into `dev_log`(`id`,`name`,`create_time`,`creator`,`info`) values
(1,'創(chuàng)建用戶',1554099545,'hdfs','創(chuàng)建用戶 test'),
(2,'更新用戶',1554099546,'yarn','更新用戶 test1'),
(3,'刪除用戶',1554099547,'hdfs','刪除用戶 test2'),
(4,'更新用戶',1554189515,'yarn','更新用戶 test3'),
(5,'刪除用戶',1554199525,'hdfs','刪除用戶 test4'),
(6,'創(chuàng)建用戶',1554299345,'yarn','創(chuàng)建用戶 test5');
然后到Hive的db01
數(shù)據(jù)庫中再創(chuàng)建一張表:
create table log_dev(
id int,
name string,
create_time int,
creator string,
info string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as textfile;
創(chuàng)建ETL任務(wù)的配置文件:
[root@hadoop ~]# vim datax/mysql2hive.json
文件內(nèi)容如下:
{
"job":{
"setting":{
"speed":{
"channel":3
},
"errorLimit":{
"record":0,
"percentage":0.02
}
},
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"username":"root",
"password":"123456a.",
"column":[
"id",
"name",
"create_time",
"creator",
"info"
],
"where":"creator='${creator}' and create_time>${create_time}",
"connection":[
{
"table":[
"dev_log"
],
"jdbcUrl":[
"jdbc:mysql://192.168.1.11:3306/datax_test?serverTimezone=Asia/Shanghai"
]
}
]
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"defaultFS":"hdfs://192.168.243.161:8020",
"fileType":"text",
"path":"/user/hive/warehouse/db01.db/log_dev",
"fileName":"log_dev3.csv",
"column":[
{
"name":"id",
"type":"int"
},
{
"name":"name",
"type":"string"
},
{
"name":"create_time",
"type":"INT"
},
{
"name":"creator",
"type":"string"
},
{
"name":"info",
"type":"string"
}
],
"writeMode":"append",
"fieldDelimiter":",",
"compress":"GZIP"
}
}
}
]
}
}
- mysqlreader支持傳入
where
條件來過濾需要讀取的數(shù)據(jù)鹰霍,具體參數(shù)可以在執(zhí)行datax腳本時傳入,我們可以通過這種變量替換的方式實現(xiàn)增量同步的支持
mysqlreader默認的驅(qū)動包是5.x的茵乱,由于我這里的MySQL版本是8.x茂洒,所以需要替換一下mysqlreader中的驅(qū)動包:
[root@hadoop ~]# cp /usr/local/src/mysql-connector-java-8.0.21.jar /usr/local/datax/plugin/reader/mysqlreader/libs/
[root@hadoop ~]# rm -rf /usr/local/datax/plugin/reader/mysqlreader/libs/mysql-connector-java-5.1.34.jar
然后執(zhí)行該ETL任務(wù):
[root@hadoop ~]# python /usr/local/datax/bin/datax.py datax/mysql2hive.json -p "-Dcreator=yarn -Dcreate_time=1554099547"
...
任務(wù)啟動時刻 : 2020-11-15 11:38:14
任務(wù)結(jié)束時刻 : 2020-11-15 11:38:25
任務(wù)總計耗時 : 11s
任務(wù)平均流量 : 5B/s
記錄寫入速度 : 0rec/s
讀出記錄總數(shù) : 2
讀寫失敗總數(shù) : 0
查看HDFS中是否已存在相應(yīng)的數(shù)據(jù)文件:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/db01.db/log_dev
Found 1 items
-rw-r--r-- 3 root supergroup 84 2020-11-15 11:38 /user/hive/warehouse/db01.db/log_dev/log_dev3.csv__d142f3ee_126e_4056_af49_b56e45dec1ef.gz
[root@hadoop ~]#
到Hive中驗證導入的數(shù)據(jù)是否符合預期:
0: jdbc:hive2://localhost:10000> select * from log_dev;
+-------------+---------------+----------------------+------------------+---------------+
| log_dev.id | log_dev.name | log_dev.create_time | log_dev.creator | log_dev.info |
+-------------+---------------+----------------------+------------------+---------------+
| 4 | 更新用戶 | 1554189515 | yarn | 更新用戶 test3 |
| 6 | 創(chuàng)建用戶 | 1554299345 | yarn | 創(chuàng)建用戶 test5 |
+-------------+---------------+----------------------+------------------+---------------+
2 rows selected (0.131 seconds)
0: jdbc:hive2://localhost:10000>
數(shù)據(jù)治理簡介
將數(shù)據(jù)采集到數(shù)倉后所面臨的問題:
- 相比傳統(tǒng)數(shù)倉大數(shù)據(jù)時代數(shù)據(jù)更加多樣、更加復雜瓶竭、數(shù)據(jù)量更大
- 隨處可見的數(shù)據(jù)不統(tǒng)一督勺、難以提升的數(shù)據(jù)質(zhì)量、難以完成的數(shù)據(jù)模型梳理
- 多種采集工具斤贰、多種存儲方式使數(shù)據(jù)倉庫or數(shù)據(jù)湖逐漸變成數(shù)據(jù)沼澤
數(shù)據(jù)治理需要解決的問題:
- 數(shù)據(jù)不可知:用戶不知道有哪些數(shù)據(jù)智哀、不知道數(shù)據(jù)和業(yè)務(wù)的關(guān)系
- 數(shù)據(jù)不可控:沒有統(tǒng)一的數(shù)據(jù)標準,數(shù)據(jù)無法集成和統(tǒng)一
- 數(shù)據(jù)不可扔小:用戶不能便捷的取到數(shù)據(jù)瓷叫,或者取到的數(shù)據(jù)不可用
- 數(shù)據(jù)不可聯(lián):數(shù)據(jù)之間的關(guān)系沒有體現(xiàn)出來,數(shù)據(jù)深層價值無法體現(xiàn)
數(shù)據(jù)治理的目標:
- 建立統(tǒng)一數(shù)據(jù)標準與數(shù)據(jù)規(guī)范,保障數(shù)據(jù)質(zhì)量
- 制定數(shù)據(jù)管理流程赞辩,把控數(shù)據(jù)整個生命周期
- 形成平臺化工具,提供給用戶使用
數(shù)據(jù)治理:
- 數(shù)據(jù)治理包括元數(shù)據(jù)管理授艰、數(shù)據(jù)質(zhì)量管理辨嗽、數(shù)據(jù)血緣管理等
- 數(shù)據(jù)治理在數(shù)據(jù)采集、數(shù)據(jù)清洗淮腾、數(shù)據(jù)計算等各個環(huán)節(jié)
- 數(shù)據(jù)治理難得不是技術(shù)糟需,而是流程、協(xié)同和管理
元數(shù)據(jù)管理:
- 管理數(shù)據(jù)的庫表結(jié)構(gòu)等schema信息
- 數(shù)據(jù)存儲空間谷朝、讀寫記錄洲押、權(quán)限歸屬及其他各類統(tǒng)計信息
數(shù)據(jù)血緣管理:
- 數(shù)據(jù)之間的血緣關(guān)系及生命周期
- B表的數(shù)據(jù)從A表匯總而來,那么B和A表就具有血緣關(guān)系
- 數(shù)據(jù)的業(yè)務(wù)屬性信息和業(yè)務(wù)數(shù)據(jù)模型
數(shù)據(jù)治理步驟簡述:
- 統(tǒng)一數(shù)據(jù)規(guī)范和數(shù)據(jù)定義圆凰,打通業(yè)務(wù)模型和技術(shù)模型
- 提升數(shù)據(jù)質(zhì)量杈帐,實現(xiàn)數(shù)據(jù)全生命周期管理
- 挖掘數(shù)據(jù)價值,幫助業(yè)務(wù)人員便捷靈活的使用數(shù)據(jù)
數(shù)據(jù)治理與周邊系統(tǒng):
- ODS专钉、DWD挑童、DM等各層次元數(shù)據(jù)納入數(shù)據(jù)治理平臺集中管理
- 數(shù)據(jù)采集及處理流程中產(chǎn)生的元數(shù)據(jù)納入數(shù)據(jù)治理平臺,并建立血緣關(guān)系
- 提供數(shù)據(jù)管理的服務(wù)接口跃须,數(shù)據(jù)模型變更及時通知上下游
Apache Atlas數(shù)據(jù)治理
常見的數(shù)據(jù)治理工具:
- Apache Atlas:Hortonworks主推的數(shù)據(jù)治理開源項目
- Metacat:Netflix開源的元數(shù)據(jù)管理站叼、數(shù)據(jù)發(fā)現(xiàn)組件
- Navigator:Cloudera提供的數(shù)據(jù)管理的解決方案
- WhereHows:LinkedIn內(nèi)部使用并開源的數(shù)據(jù)管理解決方案
Apache Altas:
- 數(shù)據(jù)分類:自動捕獲、定義和注釋元數(shù)據(jù)菇民,對數(shù)據(jù)進行業(yè)務(wù)導向分類
- 集中審計:捕獲所有步驟尽楔、應(yīng)用及數(shù)據(jù)交互的訪問信息
- 搜索與血緣:基于分類和審計關(guān)聯(lián)數(shù)據(jù)與數(shù)據(jù)的關(guān)系,并通過可視化的方式展現(xiàn)
Apache Altas架構(gòu)圖:
- Type System:對需要管理的元數(shù)據(jù)對象抽象的實體第练,由類型構(gòu)成
- Ingest\Export:元數(shù)據(jù)的自動采集和導出工具阔馋,Export可以作 為事件進行觸發(fā),使用戶可以及時響應(yīng)
- Graph Engine:通過圖數(shù)據(jù)庫和圖計算弓|擎展現(xiàn)數(shù)據(jù)之間的關(guān)系
元數(shù)據(jù)捕獲:
- Hook:來自各個組件的Hook自動捕獲數(shù)據(jù)進行存儲
- Entity:集成的各個系統(tǒng)在操作時觸發(fā)事件進行寫入
- 獲取元數(shù)據(jù)的同時复旬,獲取數(shù)據(jù)之間的關(guān)聯(lián)關(guān)系垦缅,構(gòu)建血緣