本文為學(xué)習(xí)筆記污尉,會隨著學(xué)習(xí)深入持續(xù)更新姜盈,僅供參考
一硕蛹、datax干嘛的和之前的seatunnel使用上有什么區(qū)別
DataX 是阿里巴巴開發(fā)的一個開源數(shù)據(jù)同步工具浪漠。
DataX :更加專注于批量數(shù)據(jù)的同步和遷移秕豫,適用于數(shù)據(jù)倉庫建設(shè)朴艰、數(shù)據(jù)備份和數(shù)據(jù)遷移等場景。主要支持批處理模式混移,通過配置 JSON 文件來定義數(shù)據(jù)同步任務(wù)呵晚。不依賴于其他大數(shù)據(jù)計算框架。
SeaTunnel :側(cè)重于實時數(shù)據(jù)處理和批處理任務(wù)沫屡,適用于實時 ETL饵隙、數(shù)據(jù)流處理和數(shù)據(jù)分析等場景。支持流處理和批處理兩種模式沮脖,更適合實時數(shù)據(jù)處理任務(wù)金矛。基于 Apache Spark 和 Apache Flink 構(gòu)建勺届,具有分布式計算的優(yōu)勢和擴展性驶俊。
二、dataX實現(xiàn)數(shù)據(jù)全量同步舉例
1. 安裝 DataX
首先免姿,需要安裝 DataX饼酿。可以從 GitHub 上下載 DataX 源碼或預(yù)編譯的二進制包:
git clone https://github.com/alibaba/DataX.git
進入 DataX 目錄并編譯:
cd DataX
mvn clean package assembly:assembly -Dmaven.test.skip=true
2. 配置 JSON 文件
DataX 使用 JSON 文件來配置數(shù)據(jù)同步任務(wù)胚膊。以 MySQL 到 Hadoop 的數(shù)據(jù)同步為例故俐,首先需要創(chuàng)建一個 JSON 配置文件,例如 mysql_to_hadoop.json
紊婉。
以下是一個簡單的配置示例:
{
"job": {
"setting": {
"speed": {
"channel": 3 // 并行度药版,可以根據(jù)需要調(diào)整
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "your_mysql_username",
"password": "your_mysql_password",
"connection": [
{
"table": ["your_table_name"],
"jdbcUrl": ["jdbc:mysql://your_mysql_host:3306/your_database"]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://your_hadoop_cluster",
"fileType": "text",
"path": "/user/hadoop/your_hdfs_path/",
"fileName": "your_file_name",
"column": [
{"name": "column1", "type": "string"},
{"name": "column2", "type": "int"},
{"name": "column3", "type": "date"}
],
"writeMode":"truncate",
"fieldDelimiter": "\t"
}
}
}
]
}
}
3. 配置項說明
- reader 部分:配置 MySQL 數(shù)據(jù)源的連接信息,包括用戶名喻犁、密碼槽片、表名和 JDBC URL何缓。
-
writer 部分:配置 Hadoop HDFS 的目標(biāo)路徑、文件類型(如 text还栓、orc碌廓、parquet 等)、文件名剩盒、字段信息及寫入模式氓皱。
setting:任務(wù)的全局設(shè)置。
在 DataX 的 hdfswriter 配置中勃刨,writeMode 參數(shù)用于指定寫入數(shù)據(jù)到目標(biāo) HDFS 時的模式
append:追加模式。如果目標(biāo)路徑下已經(jīng)存在文件股淡,將在文件末尾追加新數(shù)據(jù)身隐,而不會覆蓋原有數(shù)據(jù)。
nonConflict:非沖突模式唯灵。如果目標(biāo)路徑下已經(jīng)存在文件贾铝,任務(wù)會報錯并終止,以避免覆蓋現(xiàn)有文件埠帕。
truncate:截斷模式垢揩。如果目標(biāo)路徑下已經(jīng)存在文件,將刪除現(xiàn)有文件敛瓷,然后寫入新數(shù)據(jù)
hadoopConfig:傳遞 Hadoop 相關(guān)配置項叁巨,如果配置了 dfs.replication 為 1,表示每個文件塊只保留一個副本呐籽。生產(chǎn)環(huán)境可以相應(yīng)提高锋勺。
參數(shù)詳細說明
4. 執(zhí)行同步任務(wù)
在命令行中執(zhí)行以下命令運行 DataX 任務(wù):
python {DATAX_HOME}/bin/datax.py /path/to/your/mysql_to_hadoop.json
5. 檢查同步結(jié)果
同步完成后,可以通過 HDFS 命令行或其他工具檢查數(shù)據(jù)是否成功寫入到指定的 HDFS 路徑狡蝶。
hdfs dfs -ls /user/hadoop/your_hdfs_path/
hdfs dfs -cat /user/hadoop/your_hdfs_path/your_file_name
三庶橱、DataX實現(xiàn)數(shù)據(jù)增量同步舉例
a. 時間戳字段或自增字段:通過數(shù)據(jù)表中的時間戳字段(如 last_update_time
)或自增字段(如 id
),在每次同步時記錄最后同步的位置贪惹,下次同步時只同步新增加或更新的數(shù)據(jù)苏章。
b. 支持的插件:一些 DataX 插件支持增量同步,如 MySQL Reader 和 Writer 插件奏瞬,通過配置 SQL 查詢條件來實現(xiàn)增量同步枫绅。
示例配置
還是以mysql到hdfs為例。
1. 配置文件
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "${username}",
"password": "${password}",
"column": [
"id", "name", "update_time"
],
"splitPk": "id",
"connection": [
{
"table": [
"${table}"
],
"jdbcUrl": [
"jdbc:mysql://${host}:${port}/${database}"
]
}
],
"where": "update_time > '${last_update_time}'"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://${hdfs_host}:${hdfs_port}",
"fileType": "text",
"path": "/path/to/hdfs/${table}",
"fileName": "${table}",
"column": [
{"name": "id", "type": "bigint"},
{"name": "name", "type": "string"},
{"name": "update_time", "type": "timestamp"}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"compress": "gzip",
"hadoopConfig": {
"dfs.replication": "1"
}
}
}
}
]
}
}
2. 使用腳本自動更新時間戳
為了實現(xiàn)真正的增量同步硼端,需要在每次同步完成后更新 last_update_time
撑瞧。可以通過腳本來實現(xiàn):
#!/bin/bash
# MySQL 配置信息
MYSQL_HOST="your_mysql_host"
MYSQL_PORT="your_mysql_port"
MYSQL_DATABASE="your_database"
MYSQL_USERNAME="your_username"
MYSQL_PASSWORD="your_password"
# HDFS 配置信息
HDFS_HOST="your_hdfs_host"
HDFS_PORT="your_hdfs_port"
# 定義變量
LAST_UPDATE_FILE="last_update_time.txt"
JOB_TEMPLATE="job_template.json"
TABLES=("table1" "table2" "table3") # 需要同步的表名列表
# 讀取最后一次同步的時間戳
if [ ! -f "$LAST_UPDATE_FILE" ]; then
echo "1970-01-01 00:00:00" > $LAST_UPDATE_FILE
fi
LAST_UPDATE_TIME=$(cat $LAST_UPDATE_FILE)
# 遍歷所有表并生成配置文件
for TABLE in "${TABLES[@]}"; do
JOB_FILE="job_${TABLE}.json"
sed -e "s/\${username}/${MYSQL_USERNAME}/g" \
-e "s/\${password}/${MYSQL_PASSWORD}/g" \
-e "s/\${host}/${MYSQL_HOST}/g" \
-e "s/\${port}/${MYSQL_PORT}/g" \
-e "s/\${database}/${MYSQL_DATABASE}/g" \
-e "s/\${table}/${TABLE}/g" \
-e "s/\${last_update_time}/${LAST_UPDATE_TIME}/g" \
-e "s/\${hdfs_host}/${HDFS_HOST}/g" \
-e "s/\${hdfs_port}/${HDFS_PORT}/g" \
$JOB_TEMPLATE > $JOB_FILE
# 執(zhí)行 DataX 同步任務(wù)
python /path/to/datax/bin/datax.py $JOB_FILE
# 檢查是否成功
if [ $? -ne 0 ]; then
echo "DataX job for table ${TABLE} failed!"
exit 1
fi
done
# 更新 last_update_time
NEW_LAST_UPDATE_TIME=$(date '+%Y-%m-%d %H:%M:%S')
echo $NEW_LAST_UPDATE_TIME > $LAST_UPDATE_FILE
echo "DataX incremental sync completed successfully."
四显蝌、其他問題
1预伺、如果想設(shè)置mysql和hdfs中的字段不一致怎么設(shè)置订咸?
-
reader:
- column:定義從 MySQL 讀取的列名。這些列名需要與數(shù)據(jù)庫中的實際列名一致酬诀。
-
writer:
-
column:定義寫入 HDFS 的列名和數(shù)據(jù)類型脏嚷。這里可以與
reader
中的列名不同,實現(xiàn)字段名的映射和數(shù)據(jù)類型的轉(zhuǎn)換瞒御。
-
column:定義寫入 HDFS 的列名和數(shù)據(jù)類型脏嚷。這里可以與
注意事項
-
字段順序:確保
reader
和writer
中的列順序一致父叙,以保證數(shù)據(jù)正確映射。例如肴裙,reader
讀取的第一個列id
將映射到writer
的第一個列user_id
趾唱。 -
數(shù)據(jù)類型轉(zhuǎn)換:DataX 在處理數(shù)據(jù)時會嘗試進行類型轉(zhuǎn)換,但為了避免數(shù)據(jù)丟失或格式錯誤蜻懦,盡量確保源數(shù)據(jù)類型和目標(biāo)數(shù)據(jù)類型兼容甜癞。例如,將 MySQL 的
DECIMAL
轉(zhuǎn)換為 HDFS 的DOUBLE
宛乃。 -
數(shù)據(jù)驗證:在數(shù)據(jù)同步過程中悠咱,可以通過設(shè)置
errorLimit
來控制允許的錯誤數(shù)量和比例,以便及時發(fā)現(xiàn)和處理數(shù)據(jù)轉(zhuǎn)換中的問題征炼。
參考鏈接
1析既、官方文檔