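Introduction
This article explains the Executor objects that the RM uses to perform rollbacks, namely the undoExecutor classes. The rollback (undo) log is executed by an undoExecutor.
undoExecutor source code analysis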
public class UndoExecutorFactory {
    public static AbstractUndoExecutor getUndoExecutor(String dbType, SQLUndoLog sqlUndoLog) {
        if (!dbType.equals(JdbcConstants.MYSQL)) {
            throw new NotSupportYetException(dbType);
        }
        switch (sqlUndoLog.getSqlType()) {
            case INSERT:
                return new MySQLUndoInsertExecutor(sqlUndoLog);
            case UPDATE:
                return new MySQLUndoUpdateExecutor(sqlUndoLog);
            case DELETE:
                return new MySQLUndoDeleteExecutor(sqlUndoLog);
            default:
                throw new ShouldNeverHappenException();
        }
    }
}
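Notes:
- UndoExecutorFactory returns the undoExecutor that matches the SQL type recorded in the undo log. Only MySQL is supported; any other database type raises NotSupportYetException.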
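
The dispatch rule in the factory can be illustrated without any Seata classes. The following is a minimal sketch, not the library's code: SqlType and undoStatementFor are hypothetical names, and the "mysql" literal is an assumption about the value of JdbcConstants.MYSQL.

```java
public class UndoDispatchSketch {
    // Hypothetical stand-in for the SQL type stored in an undo log entry.
    enum SqlType { INSERT, UPDATE, DELETE, SELECT }

    // Returns the kind of statement that undoes the given statement type.
    // The "mysql" literal is an assumption standing in for JdbcConstants.MYSQL.
    static String undoStatementFor(String dbType, SqlType sqlType) {
        if (!"mysql".equals(dbType)) {
            throw new UnsupportedOperationException("Unsupported database type: " + dbType);
        }
        switch (sqlType) {
            case INSERT:
                return "DELETE"; // an insert is undone by deleting the inserted row
            case UPDATE:
                return "UPDATE"; // an update is undone by writing back the old values
            case DELETE:
                return "INSERT"; // a delete is undone by re-inserting the old row
            default:
                throw new IllegalStateException("No undo executor for " + sqlType);
        }
    }

    public static void main(String[] args) {
        for (SqlType type : new SqlType[] {SqlType.INSERT, SqlType.UPDATE, SqlType.DELETE}) {
            System.out.println(type + " is undone by " + undoStatementFor("mysql", type));
        }
    }
}
```

Statement types that produce no undo log (SELECT in this sketch) fall through to the default branch, mirroring the ShouldNeverHappenException in the factory.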
UndoExecutor class dependency diagram
Notes:
- AbstractUndoExecutor is the abstract base class of the rollback classes.
- MySQLUndoDeleteExecutor rolls back delete operations.
- MySQLUndoInsertExecutor rolls back insert operations.
- MySQLUndoUpdateExecutor rolls back update operations.
AbstractUndoExecutor
public abstract class AbstractUndoExecutor {
    protected SQLUndoLog sqlUndoLog;

    protected abstract String buildUndoSQL();

    public AbstractUndoExecutor(SQLUndoLog sqlUndoLog) {
        this.sqlUndoLog = sqlUndoLog;
    }

    public void executeOn(Connection conn) throws SQLException {
        dataValidation(conn);
        try {
            // Build the undo SQL template
            String undoSQL = buildUndoSQL();
            // Obtain the PreparedStatement
            PreparedStatement undoPST = conn.prepareStatement(undoSQL);
            // Fetch the records to roll back
            TableRecords undoRows = getUndoRows();
            // Iterate over the records to roll back and collect each one's fields
            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                Field pkValue = null;
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() == KeyType.PrimaryKey) {
                        pkValue = field;
                    } else {
                        undoValues.add(field);
                    }
                }
                // Bind the parameters for this rollback record
                undoPrepare(undoPST, undoValues, pkValue);
                // Execute the rollback
                undoPST.executeUpdate();
            }
        } catch (Exception ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
    }

    protected void undoPrepare(PreparedStatement undoPST,
                               ArrayList<Field> undoValues,
                               Field pkValue) throws SQLException {
        int undoIndex = 0;
        for (Field undoValue : undoValues) {
            undoIndex++;
            undoPST.setObject(undoIndex, undoValue.getValue(), undoValue.getType());
        }
        // PK is at last one.
        // INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?)
        // UPDATE a SET x=?, y=?, z=? WHERE pk = ?
        // DELETE FROM a WHERE pk = ?
        undoIndex++;
        undoPST.setObject(undoIndex, pkValue.getValue(), pkValue.getType());
    }

    protected abstract TableRecords getUndoRows();

    protected void dataValidation(Connection conn) throws SQLException {
        // Validate if data is dirty.
    }
}
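Notes:
- AbstractUndoExecutor defines the template flow for the whole rollback operation.
- Build the undo SQL template with buildUndoSQL().
- Obtain the PreparedStatement with conn.prepareStatement(undoSQL).
- Iterate over all records to roll back and collect each record's fields, keeping the primary key separate from the other columns.
- Bind the parameters for each rollback record with undoPrepare(undoPST, undoValues, pkValue); the primary key is always bound last.
- Execute the rollback with undoPST.executeUpdate().
- buildUndoSQL() and getUndoRows() are implemented by the subclasses.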
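
The template flow described above can be sketched without a database. This is an illustrative sketch only: Col, UndoTemplate, and demo are hypothetical names, and instead of calling JDBC the template method records the SQL and the parameter order it would bind for each row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UndoTemplateSketch {
    // Hypothetical stand-in for Field: column name, value, and a primary-key flag.
    static final class Col {
        final String name;
        final Object value;
        final boolean pk;

        Col(String name, Object value, boolean pk) {
            this.name = name;
            this.value = value;
            this.pk = pk;
        }
    }

    abstract static class UndoTemplate {
        protected abstract String buildUndoSql();

        protected abstract List<List<Col>> undoRows();

        // Template method: one SQL template, executed once per row.
        // Returns "sql [params]" per row instead of touching a database.
        final List<String> execute() {
            String sql = buildUndoSql();
            List<String> executed = new ArrayList<>();
            for (List<Col> row : undoRows()) {
                List<Object> params = new ArrayList<>();
                Object pkValue = null;
                for (Col col : row) {
                    if (col.pk) {
                        pkValue = col.value;
                    } else {
                        params.add(col.value);
                    }
                }
                params.add(pkValue); // the primary key is bound last
                executed.add(sql + " " + params);
            }
            return executed;
        }
    }

    static List<String> demo() {
        UndoTemplate template = new UndoTemplate() {
            @Override
            protected String buildUndoSql() {
                return "UPDATE t SET name = ? WHERE id = ?";
            }

            @Override
            protected List<List<Col>> undoRows() {
                return Arrays.asList(
                    Arrays.asList(new Col("id", 1, true), new Col("name", "a", false)),
                    Arrays.asList(new Col("id", 2, true), new Col("name", "b", false)));
            }
        };
        return template.execute();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Although the primary key is the first column of each row, it ends up as the last parameter, which is what lets one binding routine serve INSERT, UPDATE, and DELETE statements alike.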
MySQLUndoInsertExecutor
public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {
    @Override
    protected String buildUndoSQL() {
        TableRecords afterImage = sqlUndoLog.getAfterImage();
        List<Row> afterImageRows = afterImage.getRows();
        if (afterImageRows == null || afterImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = afterImageRows.get(0);
        StringBuffer mainSQL = new StringBuffer(
            "DELETE FROM " + sqlUndoLog.getTableName());
        StringBuffer where = new StringBuffer(" WHERE ");
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                where.append(field.getName() + " = ? ");
            }
        }
        return mainSQL.append(where).toString();
    }

    @Override
    protected void undoPrepare(PreparedStatement undoPST,
                               ArrayList<Field> undoValues, Field pkValue)
        throws SQLException {
        undoPST.setObject(1, pkValue.getValue(), pkValue.getType());
    }

    public MySQLUndoInsertExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getAfterImage();
    }
}
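Notes:
- Rolling back an insert means performing the inverse delete; MySQLUndoInsertExecutor builds the DELETE statement.
- The WHERE clause of the DELETE matches on the primary key of the inserted row, taken from the after image.
- The overall rollback flow is defined in the parent class AbstractUndoExecutor.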
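
As a rough illustration of the statement this executor produces, the sketch below builds the DELETE from a list of columns. It is a simplified stand-in, not Seata code: the column map (name to primary-key flag) and the table name "account" are hypothetical, and only a single-column primary key is handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class InsertUndoSketch {
    // columns maps column name -> "is primary key", in after-image order.
    // This map is a hypothetical stand-in for Row and Field.
    static String buildUndoSql(String table, Map<String, Boolean> columns) {
        for (Map.Entry<String, Boolean> entry : columns.entrySet()) {
            if (entry.getValue()) {
                // Only the primary key is needed to identify the inserted row.
                return "DELETE FROM " + table + " WHERE " + entry.getKey() + " = ?";
            }
        }
        throw new IllegalStateException("after image has no primary key column");
    }

    public static void main(String[] args) {
        Map<String, Boolean> columns = new LinkedHashMap<>();
        columns.put("id", true);
        columns.put("name", false);
        System.out.println(buildUndoSql("account", columns));
    }
}
```

Because the statement has a single placeholder, the executor overrides undoPrepare to bind only the primary key value at index 1.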
MySQLUndoDeleteExecutor
public class MySQLUndoDeleteExecutor extends AbstractUndoExecutor {
    public MySQLUndoDeleteExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected String buildUndoSQL() {
        TableRecords beforeImage = sqlUndoLog.getBeforeImage();
        List<Row> beforeImageRows = beforeImage.getRows();
        if (beforeImageRows == null || beforeImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = beforeImageRows.get(0);
        StringBuffer insertColumns = new StringBuffer();
        StringBuffer insertValues = new StringBuffer();
        Field pkField = null;
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                pkField = field;
                continue;
            } else {
                if (first) {
                    first = false;
                } else {
                    insertColumns.append(", ");
                    insertValues.append(", ");
                }
                insertColumns.append(field.getName());
                insertValues.append("?");
            }
        }
        if (first) {
            first = false;
        } else {
            insertColumns.append(", ");
            insertValues.append(", ");
        }
        insertColumns.append(pkField.getName());
        insertValues.append("?");
        return "INSERT INTO " + sqlUndoLog.getTableName()
            + "(" + insertColumns.toString() + ") VALUES (" + insertValues.toString() + ")";
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getBeforeImage();
    }
}
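Notes:
- Rolling back a delete means performing the inverse insert; MySQLUndoDeleteExecutor builds the INSERT statement from the before image.
- The generated statement has the form INSERT INTO tableName (column1, column2, pk) VALUES (?, ?, ?), with the primary key column placed last.
- The overall rollback flow is defined in the parent class AbstractUndoExecutor.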
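
The column ordering matters here, so a small sketch may help. The code below is a simplified stand-in rather than the Seata implementation: the column map and the table name "account" are hypothetical, and a single-column primary key is assumed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeleteUndoSketch {
    // columns maps column name -> "is primary key", in before-image order.
    // This map is a hypothetical stand-in for Row and Field.
    static String buildUndoSql(String table, Map<String, Boolean> columns) {
        List<String> names = new ArrayList<>();
        String pk = null;
        for (Map.Entry<String, Boolean> entry : columns.entrySet()) {
            if (entry.getValue()) {
                pk = entry.getKey();
            } else {
                names.add(entry.getKey());
            }
        }
        if (pk == null) {
            throw new IllegalStateException("before image has no primary key column");
        }
        names.add(pk); // primary key last, matching the parameter binding order
        String placeholders = String.join(", ", Collections.nCopies(names.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(", ", names)
            + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        Map<String, Boolean> columns = new LinkedHashMap<>();
        columns.put("id", true);
        columns.put("name", false);
        columns.put("balance", false);
        System.out.println(buildUndoSql("account", columns));
    }
}
```

Moving the primary key to the end of the column list keeps the statement aligned with the base class's undoPrepare, which binds non-key values first and the key last.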
MySQLUndoUpdateExecutor
public class MySQLUndoUpdateExecutor extends AbstractUndoExecutor {
    @Override
    protected String buildUndoSQL() {
        TableRecords beforeImage = sqlUndoLog.getBeforeImage();
        List<Row> beforeImageRows = beforeImage.getRows();
        if (beforeImageRows == null || beforeImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG"); // TODO
        }
        Row row = beforeImageRows.get(0);
        StringBuffer mainSQL = new StringBuffer(
            "UPDATE " + sqlUndoLog.getTableName() + " SET ");
        StringBuffer where = new StringBuffer(" WHERE ");
        boolean first = true;
        for (Field field : row.getFields()) {
            if (field.getKeyType() == KeyType.PrimaryKey) {
                where.append(field.getName() + " = ?");
            } else {
                if (first) {
                    first = false;
                } else {
                    mainSQL.append(", ");
                }
                mainSQL.append(field.getName() + " = ?");
            }
        }
        return mainSQL.append(where).toString();
    }

    public MySQLUndoUpdateExecutor(SQLUndoLog sqlUndoLog) {
        super(sqlUndoLog);
    }

    @Override
    protected TableRecords getUndoRows() {
        return sqlUndoLog.getBeforeImage();
    }
}
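Notes:
- Rolling back an update means performing an inverse update that writes the before-image values back; MySQLUndoUpdateExecutor builds the UPDATE statement.
- The generated statement has the form UPDATE tableName SET column1 = ? WHERE pk = ?.
- The overall rollback flow is defined in the parent class AbstractUndoExecutor.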
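
A compact sketch of the same construction follows. It is illustrative only: the column map and the table name "account" are hypothetical, and a single-column primary key is assumed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpdateUndoSketch {
    // columns maps column name -> "is primary key", in before-image order.
    // This map is a hypothetical stand-in for Row and Field.
    static String buildUndoSql(String table, Map<String, Boolean> columns) {
        List<String> assignments = new ArrayList<>();
        String pk = null;
        for (Map.Entry<String, Boolean> entry : columns.entrySet()) {
            if (entry.getValue()) {
                pk = entry.getKey();
            } else {
                assignments.add(entry.getKey() + " = ?");
            }
        }
        if (pk == null) {
            throw new IllegalStateException("before image has no primary key column");
        }
        // Non-key columns go into SET; the primary key goes into WHERE and is bound last.
        return "UPDATE " + table + " SET " + String.join(", ", assignments)
            + " WHERE " + pk + " = ?";
    }

    public static void main(String[] args) {
        Map<String, Boolean> columns = new LinkedHashMap<>();
        columns.put("id", true);
        columns.put("name", false);
        columns.put("balance", false);
        System.out.println(buildUndoSql("account", columns));
    }
}
```

The values bound to the SET placeholders come from the before image, so executing the statement restores each row to its state before the original update.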