DataX同步Hive數(shù)據(jù)丟失开仰,源碼修復(fù)

DataX簡介

DataX 是阿里云 DataWorks數(shù)據(jù)集成 的開源版本诡蜓,在阿里巴巴集團內(nèi)被廣泛使用的離線數(shù)據(jù)同步工具/平臺。DataX 實現(xiàn)了包括 MySQL粟耻、Oracle垢乙、OceanBase锨咙、SqlServer、Postgre追逮、HDFS酪刀、Hive、ADS钮孵、HBase骂倘、TableStore(OTS)、MaxCompute(ODPS)巴席、Hologres历涝、DRDS 等各種異構(gòu)數(shù)據(jù)源之間高效的數(shù)據(jù)同步功能。

DataX 商業(yè)版本

阿里云DataWorks數(shù)據(jù)集成是DataX團隊在阿里云上的商業(yè)化產(chǎn)品漾唉,致力于提供復(fù)雜網(wǎng)絡(luò)環(huán)境下荧库、豐富的異構(gòu)數(shù)據(jù)源之間高速穩(wěn)定的數(shù)據(jù)移動能力,以及繁雜業(yè)務(wù)背景下的數(shù)據(jù)同步解決方案赵刑。目前已經(jīng)支持云上近3000家客戶分衫,單日同步數(shù)據(jù)超過3萬億條。DataWorks數(shù)據(jù)集成目前支持離線50+種數(shù)據(jù)源般此,可以進行整庫遷移蚪战、批量上云、增量同步铐懊、分庫分表等各類同步解決方案邀桑。2020年更新實時同步能力,2020年更新實時同步能力科乎,支持10+種數(shù)據(jù)源的讀寫任意組合概漱。提供MySQL,Oracle等多種數(shù)據(jù)源到阿里云MaxCompute喜喂,Hologres等大數(shù)據(jù)引擎的一鍵全增量同步解決方案。

商業(yè)版本參見: https://www.aliyun.com/product/bigdata/ide

DataX的特點

DataX本身作為數(shù)據(jù)同步框架竿裂,將不同數(shù)據(jù)源的同步抽象為從源頭數(shù)據(jù)源讀取數(shù)據(jù)的Reader插件玉吁,以及向目標(biāo)端寫入數(shù)據(jù)的Writer插件,理論上DataX框架可以支持任意數(shù)據(jù)源類型的數(shù)據(jù)同步工作腻异。同時DataX插件體系作為一套生態(tài)系統(tǒng), 每接入一套新數(shù)據(jù)源該新加入的數(shù)據(jù)源即可實現(xiàn)和現(xiàn)有的數(shù)據(jù)源互通进副。

DataX同步Hive數(shù)據(jù)丟失

使用Datax進行兩個集群間的數(shù)據(jù)同步,在讀取HDFS大文件數(shù)據(jù)時,存在出現(xiàn)數(shù)據(jù)丟失問題影斑。從上文我們知道DataX的數(shù)據(jù)同步原理给赞,就是將不同數(shù)據(jù)源的同步抽象為從源頭數(shù)據(jù)源讀取數(shù)據(jù)的Reader插件,以及向目標(biāo)端寫入數(shù)據(jù)的Writer插件矫户。為了適配各種異構(gòu)的數(shù)據(jù)存儲介質(zhì)片迅,DataX源碼在設(shè)計的時候針對不同的數(shù)據(jù)源編寫了相應(yīng)的Reader插件和Writer插件。既然問題是在數(shù)據(jù)源讀取就存在數(shù)據(jù)丟失的問題皆辽,我們不妨看看DataX得源碼實現(xiàn)柑蛇。

DataX的Hive數(shù)據(jù)源HdfsReader插件

HdfsReader實現(xiàn)了從Hadoop分布式文件系統(tǒng)Hdfs中讀取文件數(shù)據(jù)并轉(zhuǎn)為DataX協(xié)議的功能。textfile是Hive建表時默認使用的存儲格式驱闷,數(shù)據(jù)不做壓縮耻台,本質(zhì)上textfile就是以文本的形式將數(shù)據(jù)存放在hdfs中,對于DataX而言空另,HdfsReader實現(xiàn)上類比TxtFileReader盆耽,有諸多相似之處。orcfile扼菠,它的全名是Optimized Row Columnar file摄杂,是對RCFile做了優(yōu)化。據(jù)官方文檔介紹娇豫,這種文件格式可以提供一種高效的方法來存儲Hive數(shù)據(jù)匙姜。HdfsReader利用Hive提供的OrcSerde類,讀取解析orcfile文件的數(shù)據(jù)冯痢。目前HdfsReader支持的功能如下:

  1. 支持textfile氮昧、orcfile、rcfile浦楣、sequence file和csv格式的文件袖肥,且要求文件內(nèi)容存放的是一張邏輯意義上的二維表。
  2. 支持多種類型數(shù)據(jù)讀取(使用String表示)振劳,支持列裁剪椎组,支持列常量
  3. 支持遞歸讀取、支持正則表達式("*"和"?")历恐。
  4. 支持orcfile數(shù)據(jù)壓縮寸癌,目前支持SNAPPY,ZLIB兩種壓縮方式弱贼。
  5. 多個File可以支持并發(fā)讀取蒸苇。
  6. 支持sequence file數(shù)據(jù)壓縮,目前支持lzo壓縮方式吮旅。
  7. csv類型支持壓縮格式有:gzip溪烤、bz2、zip、lzo檬嘀、lzo_deflate然低、snappy帅韧。
  8. 目前插件中Hive版本為1.1.1痕支,Hadoop版本為2.7.1(Apache[為適配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0測試環(huán)境中寫入正常品擎;其它版本需后期進一步測試;
  9. 支持kerberos認證(注意:如果用戶需要進行kerberos認證贸铜,那么用戶使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致堡纬,如果高于hdfsreader的Hadoop版本,不保證kerberos認證有效)

源碼暫時未實現(xiàn)的點:

  1. 單個File支持多線程并發(fā)讀取蒿秦,這里涉及到單個File內(nèi)部切分算法烤镐。二期考慮支持。
  2. 目前還不支持hdfs HA;

HdfsReader核心實現(xiàn)DFSUtil源碼讀取orc格式的文件方法 :

public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column = UnstructuredStorageReaderUtil
                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判斷是否讀取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
                //Each file as a split
                //TODO multy threads
                InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
                Object key = reader.createKey();
                Object value = reader.createValue();
                // 獲取列信息
                List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                List<Object> recordFields;
                while (reader.next(key, value)) {
                    recordFields = new ArrayList<Object>();

                    for (int i = 0; i <= columnIndexMax; i++) {
                        Object field = inspector.getStructFieldData(value, fields.get(i));
                        recordFields.add(field);
                    }
                    transportOneRecord(column, recordFields, recordSender,
                            taskPluginCollector, isReadAllColumns, nullFormat);
                }
                reader.close();
            } catch (Exception e) {
                String message = String.format("從orcfile文件路徑[%s]中讀取數(shù)據(jù)發(fā)生異常棍鳖,請聯(lián)系系統(tǒng)管理員炮叶。"
                        , sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message = String.format("請確認您所讀取的列配置正確!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

對于Hdfs大文件在讀取數(shù)據(jù)的時候會對大文件進行分片/區(qū)塊的讀取渡处,正如上述代碼片段:

                //Each file as a split
                //TODO multy threads
                InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);

從代碼實現(xiàn)可以很容易發(fā)現(xiàn)在讀取文件的時候只取了分片后的第一個區(qū)塊的數(shù)據(jù)镜悉,也尚未開啟多線程消費多分片的數(shù)據(jù),這樣就會導(dǎo)致在大文件讀取時医瘫,存在多分片情況丟失數(shù)據(jù)的現(xiàn)象侣肄。

問題發(fā)現(xiàn)后對上述代碼進行完善,完善后的代碼如下:

public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column = UnstructuredStorageReaderUtil
                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判斷是否讀取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector = (StructObjectInspector)                 serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
                //Each file as a split
                //TODO multy threads
                InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
                Object key = reader.createKey();
                Object value = reader.createValue();
                // 獲取列信息
                List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                List<Object> recordFields;
                while (reader.next(key, value)) {
                    recordFields = new ArrayList<Object>();

                    for (int i = 0; i <= columnIndexMax; i++) {
                        Object field = inspector.getStructFieldData(value, fields.get(i));
                        recordFields.add(field);
                    }
                    transportOneRecord(column, recordFields, recordSender,
                            taskPluginCollector, isReadAllColumns, nullFormat);
                }
                reader.close();
            } catch (Exception e) {
                String message = String.format("從orcfile文件路徑[%s]中讀取數(shù)據(jù)發(fā)生異常醇份,請聯(lián)系系統(tǒng)管理員稼锅。"
                        , sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message = String.format("請確認您所讀取的列配置正確!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

在對原始DataX源碼進行調(diào)整后僚纷,重新對HdfsReader工程模塊進行打jar矩距,并覆蓋DataX部署的libs目錄下的HdfsReader的jar,重啟DataX應(yīng)用后問題得到解決怖竭。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末锥债,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子痊臭,更是在濱河造成了極大的恐慌哮肚,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件广匙,死亡現(xiàn)場離奇詭異绽左,居然都是意外死亡,警方通過查閱死者的電腦和手機艇潭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蹋凝,你說我怎么就攤上這事鲁纠。” “怎么了鳍寂?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵改含,是天一觀的道長。 經(jīng)常有香客問我迄汛,道長捍壤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任鞍爱,我火速辦了婚禮鹃觉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘睹逃。我一直安慰自己盗扇,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布沉填。 她就那樣靜靜地躺著疗隶,像睡著了一般。 火紅的嫁衣襯著肌膚如雪翼闹。 梳的紋絲不亂的頭發(fā)上斑鼻,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機與錄音猎荠,去河邊找鬼坚弱。 笑死,一個胖子當(dāng)著我的面吹牛法牲,可吹牛的內(nèi)容都是我干的史汗。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼拒垃,長吁一口氣:“原來是場噩夢啊……” “哼停撞!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起悼瓮,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤戈毒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后横堡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體埋市,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年命贴,在試婚紗的時候發(fā)現(xiàn)自己被綠了道宅。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片食听。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖污茵,靈堂內(nèi)的尸體忽然破棺而出樱报,到底是詐尸還是另有隱情,我是刑警寧澤泞当,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布迹蛤,位于F島的核電站,受9級特大地震影響襟士,放射性物質(zhì)發(fā)生泄漏盗飒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一陋桂、第九天 我趴在偏房一處隱蔽的房頂上張望逆趣。 院中可真熱鬧,春花似錦章喉、人聲如沸汗贫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽落包。三九已至,卻和暖如春摊唇,著一層夾襖步出監(jiān)牢的瞬間咐蝇,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工巷查, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留有序,地道東北人。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓岛请,卻偏偏與公主長得像旭寿,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子崇败,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容