本文中的writeModel主要是Mysql、Oracle等傳統(tǒng)關(guān)系數(shù)據(jù)庫中的writeMode俄周。dataX導入到hive是直接寫文件吁讨,不會支持這些writeModel。
預(yù)備知識
Mysql中的ON DUPLICATE KEY UPDATE
使用 ON DUPLICATE KEY UPDATE
語句的時候峦朗,如果你插入的記錄導致主鍵或唯一索引重復(fù)建丧,那么Mysql就會認為該條記錄存在,則執(zhí)行update語句而不是insert語句波势;反之則執(zhí)行insert語句而不是更新語句翎朱。
新建表user橄维,id作為user的主鍵,并插入一條id = 1的數(shù)據(jù)
CREATE TABLE `user` (
`id` int(12) NOT NULL ,
`username` varchar(32) DEFAULT NULL,
`password` varchar(32) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
INSERT INTO user VALUES (1,'GJMZ1','123456');
執(zhí)行以下語句之后user表中然后只有一條語句拴曲,只是將password改為'1234567'争舞,這就是ON DUPLICATE KEY UPDATE
的功能。
INSERT INTO user VALUES (1,'GJMZ1','1234567')ON DUPLICATE KEY UPDATE username = 'GJMZ1',password='1234567';
如果去掉表中的主鍵澈灼,執(zhí)行上面兩條sql之后竞川,表中會有兩行數(shù)據(jù),因為執(zhí)行ON DUPLICATE KEY UPDATE
語句時蕉汪,mysql是通過主鍵或者唯一索引來判斷兩條數(shù)據(jù)是否重復(fù)流译。
writeModel的用法
writeModel控制寫入數(shù)據(jù)到目標表采用 insert into
或者 replace into
或者 ON DUPLICATE KEY UPDATE
語句逞怨。
- insert:將數(shù)據(jù)源表的數(shù)據(jù)直接寫的到目的表者疤,主要用于全量的導入。實現(xiàn)原理是直接采用
insert into
; - replace和update:如果目標表中包含待寫入的數(shù)據(jù)則更新該行數(shù)據(jù)叠赦,主要用于增量導入驹马。實現(xiàn)原理:在mysql中用
ON DUPLICATE KEY UPDATE
語句,其他數(shù)據(jù)庫中用replace into
.
相關(guān)源碼
以MysqlWriter為例進行說明除秀。
流程很簡單糯累,設(shè)置的時候用戶配置的每一個字段都要更新,以"?"的方式寫進sql中册踩,最后利用PrepareStatement#setXXX進行設(shè)置泳姐。根據(jù)三種模式拼接相應(yīng)的sql即可,最后將sql保存在Cofiguration#insertOrReplaceTemplate中暂吉。
// OriginalConfPretreatmentUtil#dealWriteMode
public static void dealWriteMode(Configuration originalConfig, DataBaseType dataBaseType) {
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
Constant.CONN_MARK, Key.JDBC_URL, String.class));
// 默認為:insert 方式
String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
List<String> valueHolders = new ArrayList<String>(columns.size());
for (int i = 0; i < columns.size(); i++) {
valueHolders.add("?");
}
boolean forceUseUpdate = false;
//ob10的處理
if (dataBaseType == DataBaseType.MySql && isOB10(jdbcUrl)) {
forceUseUpdate = true;
}
String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate);
LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
// WriterUtil#getWriteTemplate
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {
boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")
|| writeMode.trim().toLowerCase().startsWith("replace")
|| writeMode.trim().toLowerCase().startsWith("update");
if (!isWriteModeLegal) {
throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
String.format("您所配置的 writeMode:%s 錯誤. 因為DataX 目前僅支持replace,update 或 insert 方式. 請檢查您的配置并作出修改.", writeMode));
}
// && writeMode.trim().toLowerCase().startsWith("replace")
String writeDataSqlTemplate;
if (forceUseUpdate ||
((dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) && writeMode.trim().toLowerCase().startsWith("update"))
) {
//update只在mysql下使用
writeDataSqlTemplate = new StringBuilder()
.append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ","))
.append(") VALUES(").append(StringUtils.join(valueHolders, ","))
.append(")")
.append(onDuplicateKeyUpdateString(columnHolders))
.toString();
} else {
//這里是保護,如果其他錯誤的使用了update,需要更換為replace
if (writeMode.trim().toLowerCase().startsWith("update")) {
writeMode = "replace";
}
writeDataSqlTemplate = new StringBuilder().append(writeMode)
.append(" INTO %s (").append(StringUtils.join(columnHolders, ","))
.append(") VALUES(").append(StringUtils.join(valueHolders, ","))
.append(")").toString();
}
return writeDataSqlTemplate;
}