為什么會(huì)有數(shù)據(jù)變更
在以平臺(tái)為級(jí)別的軟件集合中,為了保證軟件質(zhì)量的可控荐绝,錯(cuò)誤的可追溯一汽,不可避免的會(huì)通過(guò)一些流程來(lái)約束、框定公司的各種條條框框低滩,比如發(fā)布需要有流程才能發(fā)布角虫,業(yè)界有JIRA
、ITSM
這種優(yōu)秀的集成式軟件來(lái)制定流程委造。
而其中數(shù)據(jù)變更也需要一個(gè)流程來(lái)控制戳鹅,當(dāng)線上需要發(fā)生數(shù)據(jù)變更的時(shí)候,團(tuán)隊(duì)需要先編寫(xiě)數(shù)據(jù)庫(kù)腳本昏兆,編寫(xiě)完后提一個(gè)流程到DBA
這邊枫虏,DBA
執(zhí)行以后會(huì)告訴提變更的人具體變更的行數(shù)有多少,確認(rèn)后DBA
再做COMMIT
爬虱,執(zhí)行完后確認(rèn)變更結(jié)果隶债,至此一整個(gè)數(shù)據(jù)變更的業(yè)務(wù)流就完成了。
而這一切的一切都是需要人來(lái)參與的跑筝,人就代表著會(huì)出錯(cuò)死讹,會(huì)有溝通成本,這顯然不符合如今敏捷開(kāi)發(fā)的思想曲梗,更何況你的生產(chǎn)網(wǎng)往往與辦公網(wǎng)段處于物理隔離的時(shí)候赞警,這種變更流程所帶來(lái)的時(shí)間和空間成本變得比較大,很多很急的數(shù)據(jù)變更不能及時(shí)的相應(yīng)虏两,最后甚至?xí)衅脚_(tái)內(nèi)部的一些子團(tuán)隊(duì)會(huì)分配一個(gè)人力去專門(mén)提流程愧旦、發(fā)布。如果能把這個(gè)現(xiàn)有的流程做提取定罢,自動(dòng)化之后笤虫,團(tuán)隊(duì)的成本就能進(jìn)一步的降低,解放出來(lái)的DBA
也能夠做更有意義的事情。
有了問(wèn)題琼蚯,開(kāi)發(fā)人員和架構(gòu)師就會(huì)對(duì)應(yīng)參與進(jìn)來(lái)酬凳,企圖通過(guò)軟件的手段來(lái)解決這個(gè)問(wèn)題,在與DBA充分溝通遭庶、整理了大致的業(yè)務(wù)流之后宁仔,發(fā)現(xiàn)其實(shí)核心的業(yè)務(wù)執(zhí)行流程很簡(jiǎn)單,下面就簡(jiǎn)單的列出如下罚拟。
初步方案
本著又不是不能用的心境台诗,先寫(xiě)個(gè)最簡(jiǎn)單的咯完箩,其實(shí)就是簡(jiǎn)單的JDBC
的執(zhí)行
// 錄入端傳入decSqlListStr
// delete from tableName1; delete from tableName2;
// 簡(jiǎn)易實(shí)現(xiàn)
String decSqlListStr = decSqlListStr.trim();
String[] sqlArr = decSqlListStr.split("\\n*\\s*;\\n*\\s*");
int[] count = jdbcTemplate.batchUpdate(); // 返回批量?jī)?nèi)容
這個(gè)例子直接返回了允許結(jié)果集赐俗,用戶可以直接查看對(duì)應(yīng)的影響行數(shù)。
改進(jìn)方案
上述的實(shí)現(xiàn)是最初的簡(jiǎn)單想法弊知,很顯然和上方的期望業(yè)務(wù)流比只做到了一部分阻逮,并且不提供回滾策略,但至少做到了數(shù)據(jù)變更自動(dòng)化秩彤,可以不需要人參與自動(dòng)執(zhí)行了叔扼,但現(xiàn)今的這個(gè)操作是一個(gè)很危險(xiǎn)的操作,對(duì)于提數(shù)據(jù)變更的人可能會(huì)出現(xiàn)不敢做變更的情況漫雷,典型的執(zhí)行出去的SQL
就像潑出去的水瓜富,再也回不來(lái)了!
其實(shí)只要對(duì)他的數(shù)據(jù)變更做一定的控制即可降盹?這就誕生了第二個(gè)思路:錄入數(shù)據(jù)影響范圍与柑,如果最終執(zhí)行的數(shù)據(jù)變更不在提交的范圍內(nèi)部的話,程序會(huì)拋出異常蓄坏,Spring
框架的特性導(dǎo)致了拋出異常之后可以自動(dòng)回滾价捧。增量的代碼如下所示。
// 在執(zhí)行executeBatch下執(zhí)行代碼塊
Long allCount = 0L;
for (int i = 0; i < resCount.length; i++) {
allCount += resCount[i];
}
if (allCount < min && allCount > max) { // min涡戳、max為傳入的錄入?yún)?shù)
throw new EasiSqlException("您的語(yǔ)句總影響行數(shù)超過(guò)配置行數(shù)结蟋,無(wú)法操作,系統(tǒng)自動(dòng)回滾");
}
這么做是一種折中的解決方案渔彰,來(lái)做到應(yīng)用層的相對(duì)可控嵌屎、可回滾。
但這么做也有一個(gè)問(wèn)題恍涂,既然如此编整,執(zhí)行數(shù)據(jù)變更的用戶都盡量大了填,即使要做到數(shù)據(jù)變更范圍不瞎填乳丰,還要在變更前count
一下掌测,那就失去了做自動(dòng)化變更的意義了,那有沒(méi)有一種完美的方案,來(lái)充分的做到上方畫(huà)的業(yè)務(wù)流圖吶汞斧。像DBA
最常用的Toad
夜郁、Navicat
都是怎么做分段式提交的吶?因?yàn)镴ava都是同步的粘勒,無(wú)法做異步獲取提交竞端。那其實(shí)這個(gè)業(yè)務(wù)流最核心的問(wèn)題就是如何做到分段式的執(zhí)行和提交。
分段式提交
數(shù)據(jù)庫(kù)客戶端在執(zhí)行成功后會(huì)返回影響行數(shù)庙睡,然后再等待
DBA
的commit
或者rollback
事富,這種分段式提交可以讓用戶做二段式確認(rèn),可以讓執(zhí)行有回旋的余地乘陪。也是數(shù)據(jù)變更最常用的一種方案统台。
最終的方案(實(shí)現(xiàn))
分段就代表了異步,說(shuō)到Java的異步啡邑,就想到了異步的線程執(zhí)行數(shù)據(jù)變更贱勃,當(dāng)執(zhí)行指令發(fā)出去后,建立一個(gè)數(shù)據(jù)庫(kù)的提交線程谤逼,這個(gè)線程包含數(shù)據(jù)庫(kù)連接贵扰、執(zhí)行的語(yǔ)句、影響行數(shù)流部、線程ID號(hào)戚绕、開(kāi)始時(shí)間、是否終止以及提交狀態(tài)
數(shù)據(jù)庫(kù)連接
connection
數(shù)據(jù)庫(kù)連接的緩存可以做到讓數(shù)據(jù)庫(kù)進(jìn)行分段式的數(shù)據(jù)提交
影響行數(shù)
excuteCount
執(zhí)行完發(fā)送notify事件的時(shí)候可以發(fā)送影響的行數(shù)枝冀,便于操作人員判斷
線程ID號(hào)
threadId
標(biāo)識(shí)了內(nèi)容舞丛,保證notify事件的唯一性
開(kāi)始時(shí)間
statTimestemp
記錄開(kāi)始事件,來(lái)釋放因?yàn)闃I(yè)務(wù)原因?qū)е聸](méi)觸發(fā)提交宾茂、回滾的資源
是否終止
end
由于機(jī)器的資源有限瓷马,因此需要定期清理執(zhí)行完的線程,終止標(biāo)識(shí)可以讓清理程序抓去到后釋放線程池的資源跨晴。
提交狀態(tài)
commitLevel
此為異步的核心欧聘,決定了一個(gè)數(shù)據(jù)變更事件是否提交的標(biāo)識(shí),當(dāng)數(shù)據(jù)變更在數(shù)據(jù)庫(kù)層執(zhí)行完成后端盆,線程定期檢查狀態(tài)標(biāo)識(shí)怀骤,不同的標(biāo)識(shí)對(duì)應(yīng)不同的策略,分為三種狀態(tài):
hold
的線程等待狀態(tài)焕妙,此時(shí)等待著指令觸發(fā)提交或者回滾蒋伦,此時(shí)會(huì)一直等待指令的觸發(fā)commit
的提交狀態(tài),此時(shí)觸發(fā)數(shù)據(jù)提交rollback
的回滾狀態(tài)焚鹊,此時(shí)觸發(fā)數(shù)據(jù)執(zhí)行
/***
* 數(shù)據(jù)庫(kù)層面的commit等待線程
*/
public class DBCommitThread extends Thread {
private final Log LOG = LogFactory.getLog(ClearDBCommintTaskConfig.class);
private final static int MAX_RETRY_TIME = 5;
// 半小時(shí)不釋放痕届,自動(dòng)回滾(毫秒)
private final static Integer EXCUTE_MAX_MINUTE = 1800000;
// JDBC執(zhí)行
private Connection connection;
// 執(zhí)行的SQL語(yǔ)句
private String[] sqlList;
private String threadId;
private Long statTimestemp;
private int[] excuteCount;
private int retryTime = 0;
public boolean end = false;
// 0: hold, 1: commit, 2: rollback
private String commitLevel = "0";
public DBCommitThread (Connection connection, String[] sqlList, String threadId) {
this.connection = connection;
this.sqlList = sqlList;
this.threadId = threadId;
}
@Override
public void run () {
Statement stmt = null;
try {
connection.setAutoCommit(false);
statTimestemp = System.currentTimeMillis();
stmt = connection.createStatement();
for (int i = 0; i < sqlList.length; i++) {
stmt.addBatch(sqlList[i]);
}
if (stmt != null) {
// 執(zhí)行
int[] count = stmt.executeBatch();
excuteCount = count;
notifyToServer("1", null);
while ("0".equals(this.commitLevel)) {
sleep(2000);
Long nowExcuteTime = System.currentTimeMillis() - statTimestemp;
if (DBCommitThread.EXCUTE_MAX_MINUTE < nowExcuteTime) {
// holding too long, rollback
this.commitLevel = "2";
}
}
// commit or rollback
if ("1".equals(this.commitLevel)) {
LOG.info("------BATCH DB COMMIT------");
connection.commit();
} else {
LOG.info("------BATCH DB ROLLBACK------");
connection.rollback();
}
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
notifyToServer("0", e.getMessage());
rollback();
} finally {
close(stmt, connection);
}
// End Thread
if (!isInterrupted()) {
isInterrupted();
this.end = true;
}
}
/***
* outer call
* @param commitLevel commit level
*/
public void setCommitLevel (String commitLevel) {
this.commitLevel = commitLevel;
}
public String getExcuteCount () {
return JsonUtils.beanToJson(excuteCount);
}
private void rollback () {
try {
connection.rollback();
} catch (SQLException se) {
LOG.error(se.getMessage(), se);
}
}
private void close (Statement stmt, Connection connection) {
try {
if (null != stmt) {
stmt.close();
}
if (null != connection && !connection.isClosed()) {
connection.setAutoCommit(true);
connection.close();
}
} catch (SQLException se) {
LOG.error(se.getMessage(), se);
}
}
private void notifyToServer (String successStatus, String errMsg) {
// 通知主程序更新數(shù)據(jù)變更狀態(tài),為第一張業(yè)務(wù)流程圖里的第二級(jí),此處實(shí)現(xiàn)不做展示
}
}
線程池方法實(shí)現(xiàn)如下研叫,清理Task為直接調(diào)用
clearEndThread
不做展示
public class ThreadFactory {
public static Map<String, DBCommitThread> dbCommitThreadMap;
static {
ThreadFactory.dbCommitThreadMap = new ConcurrentHashMap<>();
}
// 創(chuàng)建線程锤窑,數(shù)據(jù)變更開(kāi)始建立
public static String createDBCommitThread (Connection connection, String[] sqlList) {
// 超出內(nèi)容
if (dbCommitThreadMap.size() >= 20) {
throw new EasiSqlException("Too mush executeBatch Thread");
}
String uuid = UUID.randomUUID().toString();
DBCommitThread newThread = new DBCommitThread(connection, sqlList, uuid);
newThread.start();
dbCommitThreadMap.put(uuid, newThread);
return uuid;
}
// 觸發(fā)設(shè)置指令級(jí)別,二段式提交的提交指令
public static void setCurrentThreadCommitLevel (String threadId, String commitLevel) {
if (ThreadFactory.dbCommitThreadMap.containsKey(threadId)) {
DBCommitThread dbCommitThread = ThreadFactory.dbCommitThreadMap.get(threadId);
dbCommitThread.setCommitLevel(commitLevel);
}
}
// 清理程序定期清理
public static void clearEndThread () {
Set<String> mapSet = ThreadFactory.dbCommitThreadMap.keySet();
Iterator it = mapSet.iterator();
while(it.hasNext()) {
DBCommitThread dbCommitThread = (DBCommitThread) it.next();
if (dbCommitThread.end) {
it.remove();
}
}
}
}
具體的業(yè)務(wù)流程對(duì)應(yīng)上述的方法如下
createDBCommitThread
開(kāi)始創(chuàng)建notifyToServer
通知中央管理層更新變更狀態(tài)和數(shù)據(jù)- 等待指令觸發(fā)
setCurrentThreadCommitLevel
嚷炉,分別對(duì)應(yīng)提交成功和提交回滾clearEndThread
清理程序運(yùn)行- 中央控制程序關(guān)閉數(shù)據(jù)變更
結(jié)語(yǔ)
當(dāng)然渊啰,整體的軟件架構(gòu)遠(yuǎn)遠(yuǎn)沒(méi)有描述的這么簡(jiǎn)單,平臺(tái)級(jí)別的軟件體系涉及到了非常龐大的數(shù)據(jù)庫(kù)個(gè)數(shù)申屹,其中對(duì)數(shù)據(jù)執(zhí)行層绘证、管理層、用戶驗(yàn)證層做了分布式的拆分哗讥,數(shù)據(jù)執(zhí)行層參照微服務(wù)里面的sidecar
設(shè)計(jì)嚷那,它是數(shù)據(jù)庫(kù)的一個(gè)配套服務(wù),執(zhí)行層配合管理層做了服務(wù)發(fā)現(xiàn)忌栅,進(jìn)行機(jī)器的統(tǒng)一管理车酣,多活曲稼,對(duì)外公布API服務(wù)索绪,死機(jī)器踢出,而數(shù)據(jù)變更自動(dòng)化僅僅是數(shù)據(jù)中臺(tái)設(shè)計(jì)中的一個(gè)子模塊設(shè)計(jì)贫悄,究其本質(zhì)瑞驱,都是為了讓?xiě)?yīng)用的開(kāi)發(fā)維護(hù)能更方便,可以給用戶提供更優(yōu)質(zhì)窄坦、快速的數(shù)據(jù)服務(wù)唤反。