Preface
My company recently kicked off a project planned to run for two years: building a data platform. It covers data synchronization (real-time/offline), mapping (real-time/offline), a data warehouse (real-time/offline), metadata management, data lineage, scheduling, BI, and more. The architecture is layered: business applications at the top, middle-platform services in the middle, and basic capabilities at the bottom.
The plan is ambitious, but every tall building rises from the ground, so the only place to start is the source, and the source is of course data ingestion. Data synchronization splits into real-time and batch synchronization; batch synchronization further splits into full loads, incremental loads, and incremental updates. The official DataX already supports full and incremental synchronization, but it does not support incremental updates, so I plan to extend DataX with a hudiwriter plugin so that DataX can also cover update scenarios.
DataX Architecture
DataX is built on a Framework + plugin architecture. The Framework takes care of technical concerns such as buffering, rate limiting, concurrency, and context loading, while reading from and writing to data sources is abstracted behind Reader/Writer interfaces. If the built-in plugins do not cover a scenario, developers can write their own plugin. A minimal sketch of the Writer contract is shown after the list below.
By role, plugins are divided into readers and writers:
- A reader is the data collection module: it reads data from the source and hands it to the Framework.
- A writer is the data writing module: it continuously pulls data from the Framework and writes it to the destination.
In terms of execution, a plugin runs as a job and tasks:
- A Job describes a synchronization job from one source to one destination and is the smallest business unit of DataX data synchronization, e.g. syncing a MySQL table into a specific partition of an ODPS table.
- A Task is the smallest execution unit, obtained by splitting a Job to maximize parallelism, e.g. a Job that reads a sharded MySQL table with 1024 physical tables is split into 1024 read Tasks executed with a configurable number of concurrent channels.
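To make that contract concrete, here is a minimal sketch of a custom Writer plugin as the Framework sees it: the Job splits its configuration into one copy per channel, and each Task drains records from the RecordReceiver handed to startWrite. This is only a skeleton against the DataX plugin SPI used throughout this post (Writer.Job / Writer.Task / RecordReceiver), not the finished HudiWriter; the class name MyWriter and the omitted write logic are placeholders.
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import java.util.ArrayList;
import java.util.List;
// Minimal Writer plugin skeleton; MyWriter is a placeholder name.
public class MyWriter extends Writer {
    public static class Job extends Writer.Job {
        private Configuration conf;
        @Override
        public void init() {
            // Job-level configuration, i.e. the "parameter" object from the job json.
            this.conf = super.getPluginJobConf();
        }
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            // The framework decides the degree of parallelism; hand back one config per task.
            List<Configuration> configs = new ArrayList<>();
            for (int i = 0; i < mandatoryNumber; i++) {
                configs.add(conf.clone());
            }
            return configs;
        }
        @Override
        public void destroy() {
        }
    }
    public static class Task extends Writer.Task {
        @Override
        public void init() {
            // Each task reads its own slice of the configuration via getPluginJobConf().
        }
        @Override
        public void startWrite(RecordReceiver receiver) {
            Record record;
            while ((record = receiver.getFromReader()) != null) {
                // Convert and write the record to the target storage here.
            }
        }
        @Override
        public void destroy() {
        }
    }
}
The full HudiWriter below fills in this skeleton with the Hudi-specific details.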
Project Structure
- HudiWriter: the main flow
- Key, HudiWriterErrorCode: classes needed by the business logic; not strictly required
- package.xml: the global assembly descriptor; the plugin's packaging entries are added here (see the snippet after this list)
- plugin.json: describes the plugin itself; the key fields are name and class, where name is the plugin name and class is the plugin's entry class, and both must be exactly right
- plugin_job_template.json: a sample job configuration for the plugin.
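As a reference for the package.xml entry mentioned above, a new plugin is usually registered in the root assembly descriptor with a fileSet that copies the plugin's build output into the distribution. The fragment below is a sketch following that convention; the exact directory layout may differ in your checkout, so treat the paths as assumptions rather than canonical values.
<fileSet>
    <!-- Copy the hudiwriter build output into the datax distribution (paths are assumptions). -->
    <directory>hudiwriter/target/datax/</directory>
    <includes>
        <include>**/*.*</include>
    </includes>
    <outputDirectory>datax</outputDirectory>
</fileSet>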
Code
GitHub: https://github.com/dongpengfei2/DataX/tree/evyd-1.0.0
HudiWriter.java
package com.alibaba.datax.plugin.writer.hudiwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static com.alibaba.datax.plugin.writer.hudiwriter.HudiWriterErrorCode.HUDI_ERROR_TABLE;
import static com.alibaba.datax.plugin.writer.hudiwriter.HudiWriterErrorCode.HUDI_PARAM_LOST;
import static com.alibaba.datax.plugin.writer.hudiwriter.Key.*;
/**
* Created by david.dong on 22-8-21.
*/
public class HudiWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
}
@Override
public void prepare() {
}
@Override
public void post() {
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> list = new ArrayList<>();
for (int i = 0; i < mandatoryNumber; i++) {
list.add(originalConfig.clone());
}
return list;
}
}
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private String primaryKey;
private String partitionFields;
private String writeOption;
private int batchSize;
private Configuration sliceConfig;
private List<Configuration> columnsList;
private List<String> partitionList;
Schema avroSchema;
private HoodieJavaWriteClient<HoodieAvroPayload> client;
@Override
public void init() {
// Get the configuration for this task
this.sliceConfig = super.getPluginJobConf();
String tableName = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_NAME, HUDI_ERROR_TABLE);
String tablePath = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_PATH, HUDI_PARAM_LOST);
String tableType = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_TYPE, HUDI_PARAM_LOST);
primaryKey = sliceConfig.getNecessaryValue(Key.HUDI_PRIMARY_KEY, HUDI_PARAM_LOST);
partitionFields = sliceConfig.getString(Key.HUDI_PARTITION_FIELDS);
writeOption = sliceConfig.getNecessaryValue(Key.HUDI_WRITE_OPTION, HUDI_PARAM_LOST);
columnsList = sliceConfig.getListConfiguration(Key.HUDI_COLUMN);
batchSize = sliceConfig.getInt(HUDI_BATCH_SIZE);
partitionList = StringUtils.isEmpty(partitionFields) ? new ArrayList<>() : Arrays.asList(partitionFields.split(","));
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
try {
// Whether Kerberos authentication is enabled
Boolean haveKerberos = sliceConfig.getBool(HAVE_KERBEROS, false);
if(haveKerberos){
String kerberosKeytabFilePath = sliceConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
String kerberosPrincipal = sliceConfig.getString(Key.KERBEROS_PRINCIPAL);
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
this.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
}
// Initialize HDFS and create the Hudi table if it does not exist yet
Path path = new Path(tablePath);
FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
if (!fs.exists(path)) {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HUDI_WRITE_TYPE_MOR.equals(tableType) ? HoodieTableType.MERGE_ON_READ : HoodieTableType.COPY_ON_WRITE)
.setTableName(tableName)
.setPayloadClassName(HoodieAvroPayload.class.getName())
.initTable(hadoopConf, tablePath);
}
} catch (IOException e) {
LOG.error(ExceptionUtils.getStackTrace(e));
}
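// Build an Avro schema from the configured columns; date/datetime columns are declared as string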
JSONArray fields = new JSONArray();
for (Configuration columnConfig : columnsList) {
JSONObject confObject = new JSONObject();
confObject.put("name", columnConfig.getString("name"));
String configType = columnConfig.getString("type");
confObject.put("type", "date".equals(configType) || "datetime".equals(configType) ? "string" : configType);
fields.add(confObject);
}
JSONObject schemaObject = new JSONObject();
schemaObject.put("type", "record");
schemaObject.put("name", "triprec");
schemaObject.put("fields", fields);
String schemaStr = schemaObject.toJSONString();
avroSchema = new Schema.Parser().parse(schemaStr);
// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(schemaStr).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
client =
new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
}
@Override
public void prepare() {
}
@Override
public void startWrite(RecordReceiver recordReceiver) {
Record record;
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
DateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
AtomicLong counter = new AtomicLong(0);
List<HoodieRecord<HoodieAvroPayload>> writeRecords = new ArrayList<>();
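// Convert each incoming DataX record into an Avro GenericRecord, buffer it, and flush once batchSize records have accumulated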
while ((record = recordReceiver.getFromReader()) != null) {
GenericRecord row = new GenericData.Record(avroSchema);
for (int i=0; i<columnsList.size(); i++) {
Configuration configuration = columnsList.get(i);
String columnName = configuration.getString("name");
String columnType = configuration.getString("type");
Column column = record.getColumn(i);
Object rawData = column.getRawData();
if (rawData == null) {
row.put(columnName, null);
continue;
}
switch (columnType) {
case "int":
row.put(columnName, Integer.parseInt(rawData.toString()));
break;
case "float":
row.put(columnName, Float.parseFloat(rawData.toString()));
break;
case "double":
row.put(columnName, Double.parseDouble(rawData.toString()));
break;
case "date":
row.put(columnName, dateFormat.format(rawData));
break;
case "datetime":
row.put(columnName, dateTimeFormat.format(rawData));
break;
case "string":
default:
row.put(columnName, rawData.toString());
}
}
String partitionPath = "";
if (!partitionList.isEmpty()) {
List<Object> values = partitionList.stream().map(row::get).collect(Collectors.toList());
partitionPath = StringUtils.join(values, "/");
}
HoodieKey key = new HoodieKey(row.get(primaryKey).toString(), partitionPath);
HoodieRecord<HoodieAvroPayload> hoodieAvroPayload = new HoodieRecord<>(key, new HoodieAvroPayload(Option.of(row)));
writeRecords.add(hoodieAvroPayload);
long num = counter.incrementAndGet();
if (num >= batchSize) {
flushCache(writeRecords);
writeRecords.clear();
counter.set(0L);
}
}
if (!writeRecords.isEmpty()) {
flushCache(writeRecords);
}
}
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf){
if(StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)){
UserGroupInformation.setConfiguration(hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
} catch (Exception e) {
String message = String.format("Kerberos authentication failed, please make sure kerberosKeytabFilePath[%s] and kerberosPrincipal[%s] are configured correctly",
kerberosKeytabFilePath, kerberosPrincipal);
LOG.error(message);
throw DataXException.asDataXException(HudiWriterErrorCode.KERBEROS_LOGIN_ERROR, e);
}
}
}
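// Start a commit and write the buffered records with the configured write option (insert / bulk_insert / upsert)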
private void flushCache(List<HoodieRecord<HoodieAvroPayload>> writeRecords) {
String commitTime = client.startCommit();
LOG.info("Starting commit " + commitTime);
switch (writeOption) {
case HUDI_WRITE_OPTION_INSERT:
client.insert(writeRecords, commitTime);
break;
case HUDI_WRITE_OPTION_BULK_INSERT:
client.bulkInsert(writeRecords, commitTime);
break;
case HUDI_WRITE_OPTION_UPSERT:
client.upsert(writeRecords, commitTime);
break;
}
}
@Override
public void post() {
}
@Override
public void destroy() {
if (client!=null) {
client.close();
}
}
}
}
Key.java
package com.alibaba.datax.plugin.writer.hudiwriter;
public class Key {
public static final String HUDI_TABLE_NAME = "tableName";
public static final String HUDI_TABLE_PATH = "tablePath";
public static final String HUDI_PRIMARY_KEY = "primaryKey";
public static final String HUDI_PARTITION_FIELDS = "partitionFields";
public static final String HUDI_TABLE_TYPE = "tableType";
public static final String HUDI_BATCH_SIZE = "batchSize";
public static final String HUDI_WRITE_OPTION = "writeOption";
public static final String HUDI_COLUMN = "column";
public static final String HUDI_WRITE_OPTION_INSERT = "insert";
public static final String HUDI_WRITE_OPTION_BULK_INSERT = "bulk_insert";
public static final String HUDI_WRITE_OPTION_UPSERT = "upsert";
public static final String HUDI_WRITE_TYPE_COW = "cow";
public static final String HUDI_WRITE_TYPE_MOR = "mor";
// Kerberos
public static final String HAVE_KERBEROS = "haveKerberos";
public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
}
HudiWriterErrorCode.java
package com.alibaba.datax.plugin.writer.hudiwriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum HudiWriterErrorCode implements ErrorCode {
HUDI_ERROR_TABLE("Hudi Error Table", "您的參數(shù)配置錯(cuò)誤."),
HUDI_PARAM_LOST("Hudi Param Lost", "您缺失了必須填寫的參數(shù)值."),
HDFS_CONNECT_ERROR("Hdfs Connect Error", "與HDFS建立連接時(shí)出現(xiàn)IO異常."),
KERBEROS_LOGIN_ERROR("Hdfs Login Error", "KERBEROS認(rèn)證失敗");
private final String code;
private final String description;
HudiWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code,
this.description);
}
}
plugin.json
{
"name": "hudiwriter",
"class": "com.alibaba.datax.plugin.writer.hudiwriter.HudiWriter",
"description": "useScene: test. mechanism: use datax framework to transport data to hudi. warn: The more you know about the data, the less problems you encounter.",
"developer": "alibaba"
}
plugin_job_template.json
{
"name": "hudiwriter",
"parameter": {
"tableName": "",
"tablePath": "",
"tableType": "",
"writeOption": "",
"primaryKey": "",
"partitionFields": "",
"batchSize": "",
"column": []
}
}
Testing
wlapp_user_mysql_to_hudi.json
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"age",
"dt",
"score",
"create_at",
"update_at"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/wlapp?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
],
"table": [
"user"
]
}
],
"password": "123456",
"username": "root",
"where": ""
}
},
"writer": {
"name": "hudiwriter",
"parameter": {
"tableName": "user",
"tablePath": "hdfs://localhost:9000/user/hive/warehouse/wlapp.db/user",
"tableType": "mor",
"writeOption": "upsert",
"primaryKey": "id",
"partitionFields": "dt",
"batchSize": 100,
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "dt",
"type": "date"
},
{
"name": "score",
"type": "double"
},
{
"name": "create_at",
"type": "datetime"
},
{
"name": "update_at",
"type": "datetime"
}
]
}
}
}
]
}
}
Launch command
python bin/datax.py job/wlapp_user_mysql_to_hudi.json;
Launch command with parameters
# Replaces the ${date} variable in wlapp_user_mysql_to_hudi.json
python bin/datax.py -p"-Ddate='2022-08-25 00:00:00'" job/wlapp_user_mysql_to_hudi.json;
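The sample job above leaves the reader's where clause empty and does not reference ${date} yet. To use the substitution for incremental runs, the variable would typically appear inside the mysqlreader where condition; the line below is a hypothetical fragment (assuming update_at is the change-tracking column) that would replace the empty "where" in the reader's parameter block.
"where": "update_at >= '${date}'"
With that in place, each run pulls only the rows changed since the supplied timestamp, and the upsert writeOption merges them into the Hudi table.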
Summary
The plugin has been running fine in testing so far; I will keep updating it as issues come up in production use.