如何做到數(shù)據(jù)變更的自動(dòng)化暑椰?

為什么會(huì)有數(shù)據(jù)變更

在以平臺(tái)為級(jí)別的軟件集合中,為了保證軟件質(zhì)量的可控荐绝,錯(cuò)誤的可追溯一汽,不可避免的會(huì)通過(guò)一些流程來(lái)約束、框定公司的各種條條框框低滩,比如發(fā)布需要有流程才能發(fā)布角虫,業(yè)界有JIRAITSM這種優(yōu)秀的集成式軟件來(lái)制定流程委造。

而其中數(shù)據(jù)變更也需要一個(gè)流程來(lái)控制戳鹅,當(dāng)線上需要發(fā)生數(shù)據(jù)變更的時(shí)候,團(tuán)隊(duì)需要先編寫(xiě)數(shù)據(jù)庫(kù)腳本昏兆,編寫(xiě)完后提一個(gè)流程到DBA這邊枫虏,DBA執(zhí)行以后會(huì)告訴提變更的人具體變更的行數(shù)有多少,確認(rèn)后DBA再做COMMIT爬虱,執(zhí)行完后確認(rèn)變更結(jié)果隶债,至此一整個(gè)數(shù)據(jù)變更的業(yè)務(wù)流就完成了。

而這一切的一切都是需要人來(lái)參與的跑筝,人就代表著會(huì)出錯(cuò)死讹,會(huì)有溝通成本,這顯然不符合如今敏捷開(kāi)發(fā)的思想曲梗,更何況你的生產(chǎn)網(wǎng)往往與辦公網(wǎng)段處于物理隔離的時(shí)候赞警,這種變更流程所帶來(lái)的時(shí)間和空間成本變得比較大,很多很急的數(shù)據(jù)變更不能及時(shí)的相應(yīng)虏两,最后甚至?xí)衅脚_(tái)內(nèi)部的一些子團(tuán)隊(duì)會(huì)分配一個(gè)人力去專門(mén)提流程愧旦、發(fā)布。如果能把這個(gè)現(xiàn)有的流程做提取定罢,自動(dòng)化之后笤虫,團(tuán)隊(duì)的成本就能進(jìn)一步的降低,解放出來(lái)的DBA也能夠做更有意義的事情。

有了問(wèn)題琼蚯,開(kāi)發(fā)人員和架構(gòu)師就會(huì)對(duì)應(yīng)參與進(jìn)來(lái)酬凳,企圖通過(guò)軟件的手段來(lái)解決這個(gè)問(wèn)題,在與DBA充分溝通遭庶、整理了大致的業(yè)務(wù)流之后宁仔,發(fā)現(xiàn)其實(shí)核心的業(yè)務(wù)執(zhí)行流程很簡(jiǎn)單,下面就簡(jiǎn)單的列出如下罚拟。

數(shù)據(jù)變更的核心業(yè)務(wù)流

初步方案

本著又不是不能用的心境台诗,先寫(xiě)個(gè)最簡(jiǎn)單的咯完箩,其實(shí)就是簡(jiǎn)單的JDBC的執(zhí)行

// 錄入端傳入decSqlListStr
// delete from tableName1; delete from tableName2;
// 簡(jiǎn)易實(shí)現(xiàn)
String decSqlListStr = decSqlListStr.trim();
String[] sqlArr = decSqlListStr.split("\\n*\\s*;\\n*\\s*");
int[] count = jdbcTemplate.batchUpdate(); // 返回批量?jī)?nèi)容

這個(gè)例子直接返回了允許結(jié)果集赐俗,用戶可以直接查看對(duì)應(yīng)的影響行數(shù)。

改進(jìn)方案

上述的實(shí)現(xiàn)是最初的簡(jiǎn)單想法弊知,很顯然和上方的期望業(yè)務(wù)流比只做到了一部分阻逮,并且不提供回滾策略,但至少做到了數(shù)據(jù)變更自動(dòng)化秩彤,可以不需要人參與自動(dòng)執(zhí)行了叔扼,但現(xiàn)今的這個(gè)操作是一個(gè)很危險(xiǎn)的操作,對(duì)于提數(shù)據(jù)變更的人可能會(huì)出現(xiàn)不敢做變更的情況漫雷,典型的執(zhí)行出去的SQL就像潑出去的水瓜富,再也回不來(lái)了!

其實(shí)只要對(duì)他的數(shù)據(jù)變更做一定的控制即可降盹?這就誕生了第二個(gè)思路:錄入數(shù)據(jù)影響范圍与柑,如果最終執(zhí)行的數(shù)據(jù)變更不在提交的范圍內(nèi)部的話,程序會(huì)拋出異常蓄坏,Spring框架的特性導(dǎo)致了拋出異常之后可以自動(dòng)回滾价捧。增量的代碼如下所示。

// 在執(zhí)行executeBatch下執(zhí)行代碼塊
Long allCount = 0L;
for (int i = 0; i < resCount.length; i++) {
  allCount += resCount[i];
}
if (allCount < min && allCount > max) {  // min涡戳、max為傳入的錄入?yún)?shù)
  throw new EasiSqlException("您的語(yǔ)句總影響行數(shù)超過(guò)配置行數(shù)结蟋,無(wú)法操作,系統(tǒng)自動(dòng)回滾");
}

這么做是一種折中的解決方案渔彰,來(lái)做到應(yīng)用層的相對(duì)可控嵌屎、可回滾。

但這么做也有一個(gè)問(wèn)題恍涂,既然如此编整,執(zhí)行數(shù)據(jù)變更的用戶都盡量大了填,即使要做到數(shù)據(jù)變更范圍不瞎填乳丰,還要在變更前count一下掌测,那就失去了做自動(dòng)化變更的意義了,那有沒(méi)有一種完美的方案,來(lái)充分的做到上方畫(huà)的業(yè)務(wù)流圖吶汞斧。像DBA最常用的Toad夜郁、Navicat都是怎么做分段式提交的吶?因?yàn)镴ava都是同步的粘勒,無(wú)法做異步獲取提交竞端。那其實(shí)這個(gè)業(yè)務(wù)流最核心的問(wèn)題就是如何做到分段式的執(zhí)行和提交。

分段式提交

數(shù)據(jù)庫(kù)客戶端在執(zhí)行成功后會(huì)返回影響行數(shù)庙睡,然后再等待DBAcommit或者rollback事富,這種分段式提交可以讓用戶做二段式確認(rèn),可以讓執(zhí)行有回旋的余地乘陪。也是數(shù)據(jù)變更最常用的一種方案统台。

最終的方案(實(shí)現(xiàn))

分段就代表了異步,說(shuō)到Java的異步啡邑,就想到了異步的線程執(zhí)行數(shù)據(jù)變更贱勃,當(dāng)執(zhí)行指令發(fā)出去后,建立一個(gè)數(shù)據(jù)庫(kù)的提交線程谤逼,這個(gè)線程包含數(shù)據(jù)庫(kù)連接贵扰、執(zhí)行的語(yǔ)句、影響行數(shù)流部、線程ID號(hào)戚绕、開(kāi)始時(shí)間、是否終止以及提交狀態(tài)

數(shù)據(jù)庫(kù)連接connection

數(shù)據(jù)庫(kù)連接的緩存可以做到讓數(shù)據(jù)庫(kù)進(jìn)行分段式的數(shù)據(jù)提交

影響行數(shù)excuteCount

執(zhí)行完發(fā)送notify事件的時(shí)候可以發(fā)送影響的行數(shù)枝冀,便于操作人員判斷

線程ID號(hào)threadId

標(biāo)識(shí)了內(nèi)容舞丛,保證notify事件的唯一性

開(kāi)始時(shí)間statTimestemp

記錄開(kāi)始事件,來(lái)釋放因?yàn)闃I(yè)務(wù)原因?qū)е聸](méi)觸發(fā)提交宾茂、回滾的資源

是否終止end

由于機(jī)器的資源有限瓷马,因此需要定期清理執(zhí)行完的線程,終止標(biāo)識(shí)可以讓清理程序抓去到后釋放線程池的資源跨晴。

提交狀態(tài)commitLevel

此為異步的核心欧聘,決定了一個(gè)數(shù)據(jù)變更事件是否提交的標(biāo)識(shí),當(dāng)數(shù)據(jù)變更在數(shù)據(jù)庫(kù)層執(zhí)行完成后端盆,線程定期檢查狀態(tài)標(biāo)識(shí)怀骤,不同的標(biāo)識(shí)對(duì)應(yīng)不同的策略,分為三種狀態(tài):

  1. hold的線程等待狀態(tài)焕妙,此時(shí)等待著指令觸發(fā)提交或者回滾蒋伦,此時(shí)會(huì)一直等待指令的觸發(fā)
  2. commit的提交狀態(tài),此時(shí)觸發(fā)數(shù)據(jù)提交
  3. rollback的回滾狀態(tài)焚鹊,此時(shí)觸發(fā)數(shù)據(jù)執(zhí)行
/***
 * 數(shù)據(jù)庫(kù)層面的commit等待線程
 */
public class DBCommitThread extends Thread {

    private final Log LOG = LogFactory.getLog(ClearDBCommintTaskConfig.class);

    private final static int MAX_RETRY_TIME = 5;

    // 半小時(shí)不釋放痕届,自動(dòng)回滾(毫秒)
    private final static Integer EXCUTE_MAX_MINUTE = 1800000;

    // JDBC執(zhí)行
    private Connection connection;

    // 執(zhí)行的SQL語(yǔ)句
    private String[] sqlList;

    private String threadId;

    private Long statTimestemp;

    private int[] excuteCount;

    private int retryTime = 0;

    public boolean end = false;

    // 0: hold, 1: commit, 2: rollback
    private String commitLevel = "0";

    public DBCommitThread (Connection connection, String[] sqlList, String threadId) {
        this.connection = connection;
        this.sqlList = sqlList;
        this.threadId = threadId;
    }

    @Override
    public void run () {
        Statement stmt = null;
        try {
            connection.setAutoCommit(false);
            statTimestemp = System.currentTimeMillis();
            stmt = connection.createStatement();
            for (int i = 0; i < sqlList.length; i++) {
                stmt.addBatch(sqlList[i]);
            }
            if (stmt != null) {
                // 執(zhí)行
                int[] count = stmt.executeBatch();
                excuteCount = count;
                notifyToServer("1", null);

                while ("0".equals(this.commitLevel)) {
                    sleep(2000);
                    Long nowExcuteTime = System.currentTimeMillis() - statTimestemp;
                    if (DBCommitThread.EXCUTE_MAX_MINUTE < nowExcuteTime) {
                        // holding too long, rollback
                        this.commitLevel = "2";
                    }
                }
                // commit or rollback
                if ("1".equals(this.commitLevel)) {
                    LOG.info("------BATCH DB COMMIT------");
                    connection.commit();
                } else {
                    LOG.info("------BATCH DB ROLLBACK------");
                    connection.rollback();
                }
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            notifyToServer("0", e.getMessage());
            rollback();
        } finally {
            close(stmt, connection);
        }

        // End Thread
        if (!isInterrupted()) {
            isInterrupted();
            this.end = true;
        }
    }

    /***
     * outer call
     * @param commitLevel commit level
     */
    public void setCommitLevel (String commitLevel) {
        this.commitLevel = commitLevel;
    }

    public String getExcuteCount () {
        return JsonUtils.beanToJson(excuteCount);
    }

    private void rollback () {
        try {
            connection.rollback();
        } catch (SQLException se) {
            LOG.error(se.getMessage(), se);
        }
    }

    private void close (Statement stmt, Connection connection) {
        try {
            if (null != stmt) {
                stmt.close();
            }
            if (null != connection && !connection.isClosed()) {
                connection.setAutoCommit(true);
                connection.close();
            }
        } catch (SQLException se) {
            LOG.error(se.getMessage(), se);
        }
    }

    private void notifyToServer (String successStatus, String errMsg) {
      // 通知主程序更新數(shù)據(jù)變更狀態(tài),為第一張業(yè)務(wù)流程圖里的第二級(jí),此處實(shí)現(xiàn)不做展示
    }

}

線程池方法實(shí)現(xiàn)如下研叫,清理Task為直接調(diào)用clearEndThread不做展示

public class ThreadFactory {

    public static Map<String, DBCommitThread> dbCommitThreadMap;

    static {
        ThreadFactory.dbCommitThreadMap = new ConcurrentHashMap<>();
    }

    // 創(chuàng)建線程锤窑,數(shù)據(jù)變更開(kāi)始建立
    public static String createDBCommitThread (Connection connection, String[] sqlList) {
        // 超出內(nèi)容
        if (dbCommitThreadMap.size() >= 20) {
            throw new EasiSqlException("Too mush executeBatch Thread");
        }
        String uuid = UUID.randomUUID().toString();
        DBCommitThread newThread = new DBCommitThread(connection, sqlList, uuid);
        newThread.start();
        dbCommitThreadMap.put(uuid, newThread);
        return uuid;
    }

    // 觸發(fā)設(shè)置指令級(jí)別,二段式提交的提交指令
    public static void setCurrentThreadCommitLevel (String threadId, String commitLevel) {
        if (ThreadFactory.dbCommitThreadMap.containsKey(threadId)) {
            DBCommitThread dbCommitThread = ThreadFactory.dbCommitThreadMap.get(threadId);
            dbCommitThread.setCommitLevel(commitLevel);
        }
    }

    // 清理程序定期清理
    public static void clearEndThread () {
        Set<String> mapSet = ThreadFactory.dbCommitThreadMap.keySet();
        Iterator it = mapSet.iterator();
        while(it.hasNext()) {
            DBCommitThread dbCommitThread = (DBCommitThread) it.next();
            if (dbCommitThread.end) {
                it.remove();
            }
        }
    }
}

具體的業(yè)務(wù)流程對(duì)應(yīng)上述的方法如下

  1. createDBCommitThread開(kāi)始創(chuàng)建
  2. notifyToServer通知中央管理層更新變更狀態(tài)和數(shù)據(jù)
  3. 等待指令觸發(fā)setCurrentThreadCommitLevel嚷炉,分別對(duì)應(yīng)提交成功和提交回滾
  4. clearEndThread清理程序運(yùn)行
  5. 中央控制程序關(guān)閉數(shù)據(jù)變更

結(jié)語(yǔ)

當(dāng)然渊啰,整體的軟件架構(gòu)遠(yuǎn)遠(yuǎn)沒(méi)有描述的這么簡(jiǎn)單,平臺(tái)級(jí)別的軟件體系涉及到了非常龐大的數(shù)據(jù)庫(kù)個(gè)數(shù)申屹,其中對(duì)數(shù)據(jù)執(zhí)行層绘证、管理層、用戶驗(yàn)證層做了分布式的拆分哗讥,數(shù)據(jù)執(zhí)行層參照微服務(wù)里面的sidecar設(shè)計(jì)嚷那,它是數(shù)據(jù)庫(kù)的一個(gè)配套服務(wù),執(zhí)行層配合管理層做了服務(wù)發(fā)現(xiàn)忌栅,進(jìn)行機(jī)器的統(tǒng)一管理车酣,多活曲稼,對(duì)外公布API服務(wù)索绪,死機(jī)器踢出,而數(shù)據(jù)變更自動(dòng)化僅僅是數(shù)據(jù)中臺(tái)設(shè)計(jì)中的一個(gè)子模塊設(shè)計(jì)贫悄,究其本質(zhì)瑞驱,都是為了讓?xiě)?yīng)用的開(kāi)發(fā)維護(hù)能更方便,可以給用戶提供更優(yōu)質(zhì)窄坦、快速的數(shù)據(jù)服務(wù)唤反。

頂層的代碼設(shè)計(jì),Hunter為管理層鸭津,watch dog為執(zhí)行層彤侍,雙方均支持可擴(kuò)展
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市逆趋,隨后出現(xiàn)的幾起案子盏阶,更是在濱河造成了極大的恐慌,老刑警劉巖闻书,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件名斟,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡魄眉,警方通過(guò)查閱死者的電腦和手機(jī)砰盐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)坑律,“玉大人岩梳,你說(shuō)我怎么就攤上這事。” “怎么了冀值?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵淘捡,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我池摧,道長(zhǎng)焦除,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任作彤,我火速辦了婚禮膘魄,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘竭讳。我一直安慰自己创葡,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布绢慢。 她就那樣靜靜地躺著灿渴,像睡著了一般。 火紅的嫁衣襯著肌膚如雪胰舆。 梳的紋絲不亂的頭發(fā)上骚露,一...
    開(kāi)封第一講書(shū)人閱讀 51,146評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音缚窿,去河邊找鬼棘幸。 笑死,一個(gè)胖子當(dāng)著我的面吹牛倦零,可吹牛的內(nèi)容都是我干的误续。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼扫茅,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼蹋嵌!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起葫隙,我...
    開(kāi)封第一講書(shū)人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤栽烂,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后停蕉,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體愕鼓,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年慧起,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了菇晃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蚓挤,死狀恐怖磺送,靈堂內(nèi)的尸體忽然破棺而出驻子,到底是詐尸還是另有隱情,我是刑警寧澤估灿,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布崇呵,位于F島的核電站,受9級(jí)特大地震影響馅袁,放射性物質(zhì)發(fā)生泄漏域慷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一汗销、第九天 我趴在偏房一處隱蔽的房頂上張望犹褒。 院中可真熱鬧,春花似錦弛针、人聲如沸叠骑。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)宙枷。三九已至,卻和暖如春茧跋,著一層夾襖步出監(jiān)牢的瞬間慰丛,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工厌衔, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留璧帝,地道東北人捍岳。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓富寿,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親锣夹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子页徐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容