DataX簡介
DataX 是阿里云 DataWorks數(shù)據(jù)集成 的開源版本诡蜓,在阿里巴巴集團內(nèi)被廣泛使用的離線數(shù)據(jù)同步工具/平臺。DataX 實現(xiàn)了包括 MySQL粟耻、Oracle垢乙、OceanBase锨咙、SqlServer、Postgre追逮、HDFS酪刀、Hive、ADS钮孵、HBase骂倘、TableStore(OTS)、MaxCompute(ODPS)巴席、Hologres历涝、DRDS 等各種異構(gòu)數(shù)據(jù)源之間高效的數(shù)據(jù)同步功能。
DataX 商業(yè)版本
阿里云DataWorks數(shù)據(jù)集成是DataX團隊在阿里云上的商業(yè)化產(chǎn)品漾唉,致力于提供復(fù)雜網(wǎng)絡(luò)環(huán)境下荧库、豐富的異構(gòu)數(shù)據(jù)源之間高速穩(wěn)定的數(shù)據(jù)移動能力,以及繁雜業(yè)務(wù)背景下的數(shù)據(jù)同步解決方案赵刑。目前已經(jīng)支持云上近3000家客戶分衫,單日同步數(shù)據(jù)超過3萬億條。DataWorks數(shù)據(jù)集成目前支持離線50+種數(shù)據(jù)源般此,可以進行整庫遷移蚪战、批量上云、增量同步铐懊、分庫分表等各類同步解決方案邀桑。2020年更新實時同步能力,2020年更新實時同步能力科乎,支持10+種數(shù)據(jù)源的讀寫任意組合概漱。提供MySQL,Oracle等多種數(shù)據(jù)源到阿里云MaxCompute喜喂,Hologres等大數(shù)據(jù)引擎的一鍵全增量同步解決方案。
商業(yè)版本參見: https://www.aliyun.com/product/bigdata/ide
DataX的特點
DataX本身作為數(shù)據(jù)同步框架竿裂,將不同數(shù)據(jù)源的同步抽象為從源頭數(shù)據(jù)源讀取數(shù)據(jù)的Reader插件玉吁,以及向目標(biāo)端寫入數(shù)據(jù)的Writer插件,理論上DataX框架可以支持任意數(shù)據(jù)源類型的數(shù)據(jù)同步工作腻异。同時DataX插件體系作為一套生態(tài)系統(tǒng), 每接入一套新數(shù)據(jù)源該新加入的數(shù)據(jù)源即可實現(xiàn)和現(xiàn)有的數(shù)據(jù)源互通进副。
DataX同步Hive數(shù)據(jù)丟失
使用Datax進行兩個集群間的數(shù)據(jù)同步,在讀取HDFS大文件數(shù)據(jù)時,存在出現(xiàn)數(shù)據(jù)丟失問題影斑。從上文我們知道DataX的數(shù)據(jù)同步原理给赞,就是將不同數(shù)據(jù)源的同步抽象為從源頭數(shù)據(jù)源讀取數(shù)據(jù)的Reader插件,以及向目標(biāo)端寫入數(shù)據(jù)的Writer插件矫户。為了適配各種異構(gòu)的數(shù)據(jù)存儲介質(zhì)片迅,DataX源碼在設(shè)計的時候針對不同的數(shù)據(jù)源編寫了相應(yīng)的Reader插件和Writer插件。既然問題是在數(shù)據(jù)源讀取就存在數(shù)據(jù)丟失的問題皆辽,我們不妨看看DataX得源碼實現(xiàn)柑蛇。
DataX的Hive數(shù)據(jù)源HdfsReader插件
HdfsReader實現(xiàn)了從Hadoop分布式文件系統(tǒng)Hdfs中讀取文件數(shù)據(jù)并轉(zhuǎn)為DataX協(xié)議的功能。textfile是Hive建表時默認使用的存儲格式驱闷,數(shù)據(jù)不做壓縮耻台,本質(zhì)上textfile就是以文本的形式將數(shù)據(jù)存放在hdfs中,對于DataX而言空另,HdfsReader實現(xiàn)上類比TxtFileReader盆耽,有諸多相似之處。orcfile扼菠,它的全名是Optimized Row Columnar file摄杂,是對RCFile做了優(yōu)化。據(jù)官方文檔介紹娇豫,這種文件格式可以提供一種高效的方法來存儲Hive數(shù)據(jù)匙姜。HdfsReader利用Hive提供的OrcSerde類,讀取解析orcfile文件的數(shù)據(jù)冯痢。目前HdfsReader支持的功能如下:
- 支持textfile氮昧、orcfile、rcfile浦楣、sequence file和csv格式的文件袖肥,且要求文件內(nèi)容存放的是一張邏輯意義上的二維表。
- 支持多種類型數(shù)據(jù)讀取(使用String表示)振劳,支持列裁剪椎组,支持列常量
- 支持遞歸讀取、支持正則表達式("*"和"?")历恐。
- 支持orcfile數(shù)據(jù)壓縮寸癌,目前支持SNAPPY,ZLIB兩種壓縮方式弱贼。
- 多個File可以支持并發(fā)讀取蒸苇。
- 支持sequence file數(shù)據(jù)壓縮,目前支持lzo壓縮方式吮旅。
- csv類型支持壓縮格式有:gzip溪烤、bz2、zip、lzo檬嘀、lzo_deflate然低、snappy帅韧。
- 目前插件中Hive版本為1.1.1痕支,Hadoop版本為2.7.1(Apache[為適配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0測試環(huán)境中寫入正常品擎;其它版本需后期進一步測試;
- 支持kerberos認證(注意:如果用戶需要進行kerberos認證贸铜,那么用戶使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致堡纬,如果高于hdfsreader的Hadoop版本,不保證kerberos認證有效)
源碼暫時未實現(xiàn)的點:
- 單個File支持多線程并發(fā)讀取蒿秦,這里涉及到單個File內(nèi)部切分算法烤镐。二期考慮支持。
- 目前還不支持hdfs HA;
HdfsReader核心實現(xiàn)DFSUtil源碼讀取orc格式的文件方法 :
public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
List<ColumnEntry> column = UnstructuredStorageReaderUtil
.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
StringBuilder allColumns = new StringBuilder();
StringBuilder allColumnTypes = new StringBuilder();
boolean isReadAllColumns = false;
int columnIndexMax = -1;
// 判斷是否讀取所有列
if (null == column || column.size() == 0) {
int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
columnIndexMax = allColumnsCount - 1;
isReadAllColumns = true;
} else {
columnIndexMax = getMaxIndex(column);
}
for (int i = 0; i <= columnIndexMax; i++) {
allColumns.append("col");
allColumnTypes.append("string");
if (i != columnIndexMax) {
allColumns.append(",");
allColumnTypes.append(":");
}
}
if (columnIndexMax >= 0) {
JobConf conf = new JobConf(hadoopConf);
Path orcFilePath = new Path(sourceOrcFilePath);
Properties p = new Properties();
p.setProperty("columns", allColumns.toString());
p.setProperty("columns.types", allColumnTypes.toString());
try {
OrcSerde serde = new OrcSerde();
serde.initialize(conf, p);
StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
InputFormat<?, ?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, orcFilePath.toString());
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
//Each file as a split
//TODO multy threads
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 獲取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List<Object> recordFields;
while (reader.next(key, value)) {
recordFields = new ArrayList<Object>();
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));
recordFields.add(field);
}
transportOneRecord(column, recordFields, recordSender,
taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
} catch (Exception e) {
String message = String.format("從orcfile文件路徑[%s]中讀取數(shù)據(jù)發(fā)生異常棍鳖,請聯(lián)系系統(tǒng)管理員炮叶。"
, sourceOrcFilePath);
LOG.error(message);
throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
}
} else {
String message = String.format("請確認您所讀取的列配置正確!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
}
}
對于Hdfs大文件在讀取數(shù)據(jù)的時候會對大文件進行分片/區(qū)塊的讀取渡处,正如上述代碼片段:
//Each file as a split
//TODO multy threads
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
從代碼實現(xiàn)可以很容易發(fā)現(xiàn)在讀取文件的時候只取了分片后的第一個區(qū)塊的數(shù)據(jù)镜悉,也尚未開啟多線程消費多分片的數(shù)據(jù),這樣就會導(dǎo)致在大文件讀取時医瘫,存在多分片情況丟失數(shù)據(jù)的現(xiàn)象侣肄。
問題發(fā)現(xiàn)后對上述代碼進行完善,完善后的代碼如下:
public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
List<ColumnEntry> column = UnstructuredStorageReaderUtil
.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
StringBuilder allColumns = new StringBuilder();
StringBuilder allColumnTypes = new StringBuilder();
boolean isReadAllColumns = false;
int columnIndexMax = -1;
// 判斷是否讀取所有列
if (null == column || column.size() == 0) {
int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
columnIndexMax = allColumnsCount - 1;
isReadAllColumns = true;
} else {
columnIndexMax = getMaxIndex(column);
}
for (int i = 0; i <= columnIndexMax; i++) {
allColumns.append("col");
allColumnTypes.append("string");
if (i != columnIndexMax) {
allColumns.append(",");
allColumnTypes.append(":");
}
}
if (columnIndexMax >= 0) {
JobConf conf = new JobConf(hadoopConf);
Path orcFilePath = new Path(sourceOrcFilePath);
Properties p = new Properties();
p.setProperty("columns", allColumns.toString());
p.setProperty("columns.types", allColumnTypes.toString());
try {
OrcSerde serde = new OrcSerde();
serde.initialize(conf, p);
StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
InputFormat<?, ?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, orcFilePath.toString());
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
//Each file as a split
//TODO multy threads
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 獲取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List<Object> recordFields;
while (reader.next(key, value)) {
recordFields = new ArrayList<Object>();
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));
recordFields.add(field);
}
transportOneRecord(column, recordFields, recordSender,
taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
} catch (Exception e) {
String message = String.format("從orcfile文件路徑[%s]中讀取數(shù)據(jù)發(fā)生異常醇份,請聯(lián)系系統(tǒng)管理員稼锅。"
, sourceOrcFilePath);
LOG.error(message);
throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
}
} else {
String message = String.format("請確認您所讀取的列配置正確!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
}
}
在對原始DataX源碼進行調(diào)整后僚纷,重新對HdfsReader工程模塊進行打jar矩距,并覆蓋DataX部署的libs目錄下的HdfsReader的jar,重啟DataX應(yīng)用后問題得到解決怖竭。