多線程事務(wù)控制

背景

? ? ? 在項目中使用多線程抓取第三方數(shù)據(jù)執(zhí)行數(shù)據(jù)入庫時,如果某個子線程執(zhí)行異常晾捏,其他子線事務(wù)全部回滾官辽,spring對多線程無法進行事務(wù)控制,是因為多線程底層連接數(shù)據(jù)庫的時候粟瞬,是使用的線程變量(TheadLocal),線程之間事務(wù)隔離同仆,每個線程有自己的連接,事務(wù)肯定不是同一個了裙品。

解決辦法

? ? ?思想就是使用兩個CountDownLatch實現(xiàn)子線程的二段提交

? ? 步驟:

??1俗批、主線程將任務(wù)分發(fā)給子線程俗或,然后使用childMonitor.await();阻塞主線程,等待所有子線程處理向數(shù)據(jù)庫中插入的業(yè)務(wù)岁忘,并使用BlockingDeque存儲線程的返回結(jié)果辛慰。

? ? ?2、使用childMonitor.countDown()釋放子線程鎖定干像,同時使用mainMonitor.await();阻塞子線程帅腌,將程序的控制權(quán)交還給主線程。

? ? ?3麻汰、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果速客,若有失敗結(jié)果出現(xiàn),主線程標記狀態(tài)告知子線程回滾五鲫,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程溺职,子線程檢測回滾標志,判斷是否回滾位喂。

代碼實現(xiàn)

線程池工具類

publicclassThreadPoolTool {

? ? /**? ? * 多線程任務(wù)

? ? * @param transactionManager

? ? * @param data

? ? * @param threadCount

? ? * @param params

? ? * @param clazz

? ? */publicvoidexcuteTask(DataSourceTransactionManager transactionManager, List data,intthreadCount, Map params, Class clazz) {

? ? ? ? if(data ==null|| data.size() == 0) {

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? intbatch = 0;

? ? ? ? ExecutorService executor = Executors.newFixedThreadPool(threadCount);

? ? ? ? //監(jiān)控子線程的任務(wù)執(zhí)行CountDownLatch childMonitor =new CountDownLatch(threadCount);

? ? ? ? //監(jiān)控主線程浪耘,是否需要回滾CountDownLatch mainMonitor =newCountDownLatch(1);

? ? ? ? //存儲任務(wù)的返回結(jié)果,返回true表示不需要回滾塑崖,反之七冲,則回滾BlockingDeque results =newLinkedBlockingDeque(threadCount);

? ? ? ? RollBack rollback =newRollBack(false);

? ? ? ? try {

? ? ? ? ? ? LinkedBlockingQueue queue = splitQueue(data, threadCount);

? ? ? ? ? ? while(true) {

? ? ? ? ? ? ? ? List list = queue.poll();

? ? ? ? ? ? ? ? if(list ==null) {

? ? ? ? ? ? ? ? ? ? break;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? batch++;

? ? ? ? ? ? ? ? params.put("batch", batch);

? ? ? ? ? ? ? ? Constructor constructor = clazz.getConstructor(newClass[]{CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, Object.class, Map.class});

? ? ? ? ? ? ? ? ThreadTask task = (ThreadTask) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, list, params);

? ? ? ? ? ? ? ? executor.execute(task);

? ? ? ? ? ? }

? ? ? ? ? ? //? 1、主線程將任務(wù)分發(fā)給子線程规婆,然后使用childMonitor.await();阻塞主線程癞埠,等待所有子線程處理向數(shù)據(jù)庫中插入的業(yè)務(wù)。? ? ? ? ? ? childMonitor.await();

? ? ? ? ? ? System.out.println("主線程開始執(zhí)行任務(wù)");

? ? ? ? ? ? //根據(jù)返回結(jié)果來確定是否回滾for(inti = 0; i < threadCount; i++) {

? ? ? ? ? ? ? ? Boolean result = results.take();

? ? ? ? ? ? ? ? if(!result) {

? ? ? ? ? ? ? ? ? ? //有線程執(zhí)行異常聋呢,需要回滾子線程rollback.setNeedRoolBack(true);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? //? 3苗踪、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果,若有失敗結(jié)果出現(xiàn)削锰,主線程標記狀態(tài)告知子線程回滾通铲,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程,子線程檢測回滾標志器贩,判斷是否回滾颅夺。? ? ? ? ? ? mainMonitor.countDown();

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? log.error(e.getMessage());

? ? ? ? } finally {

? ? ? ? ? ? //關(guān)閉線程池,釋放資源? ? ? ? ? ? executor.shutdown();

? ? ? ? }

? ? }

? ? /**? ? * 隊列拆分

? ? *

? ? * @param data 需要執(zhí)行的數(shù)據(jù)集合

? ? * @param threadCount 核心線程數(shù)

? ? * @return*/privateLinkedBlockingQueue> splitQueue(List data,int threadCount) {

? ? ? ? LinkedBlockingQueue> queueBatch =new LinkedBlockingQueue();

? ? ? ? inttotal = data.size();

? ? ? ? intoneSize = total / threadCount;

? ? ? ? int start;

? ? ? ? int end;

? ? ? ? for(inti = 0; i < threadCount; i++) {

? ? ? ? ? ? start = i * oneSize;

? ? ? ? ? ? end = (i + 1) * oneSize;

? ? ? ? ? ? if(i < threadCount - 1) {

? ? ? ? ? ? ? ? queueBatch.add(data.subList(start, end));

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? queueBatch.add(data.subList(start, data.size()));

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? return queueBatch;

? ? }

}


子線程任務(wù)執(zhí)行類

publicabstractclassThreadTaskimplements Runnable {

? ? /**? ? * 監(jiān)控子任務(wù)的執(zhí)行

? ? */private CountDownLatch childMonitor;

? ? /**? ? * 監(jiān)控主線程

? ? */private CountDownLatch mainMonitor;

? ? /**? ? * 存儲線程的返回結(jié)果

? ? */privateBlockingDeque resultList;

? ? /**? ? * 回滾類

? ? */private RollBack rollback;

? ? privateMap params;

? ? protected Object obj;

? ? protected DataSourceTransactionManager transactionManager;

? ? protected TransactionStatus status;

? ? publicThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj,Map params) {

? ? ? ? this.childMonitor = childCountDown;

? ? ? ? this.mainMonitor = mainCountDown;

? ? ? ? this.resultList = result;

? ? ? ? this.rollback = rollback;

? ? ? ? this.transactionManager = transactionManager;

? ? ? ? this.obj = obj;

? ? ? ? this.params = params;

? ? ? ? initParam();

? ? }

? ? /**? ? * 事務(wù)回滾

? ? */privatevoid rollBack() {

? ? ? ? System.out.println(Thread.currentThread().getName()+"開始回滾");

? ? ? ? transactionManager.rollback(status);

? ? }

? ? /**? ? * 事務(wù)提交

? ? */privatevoid submit() {

? ? ? ? System.out.println(Thread.currentThread().getName()+"提交事務(wù)");

? ? ? ? transactionManager.commit(status);

? ? }

? ? protected Object getParam(String key){

? ? ? ? return params.get(key);

? ? }

? ? publicabstractvoid initParam();

? ? /**? ? * 執(zhí)行任務(wù),返回false表示任務(wù)執(zhí)行錯誤蛹稍,需要回滾

? ? * @return*/publicabstractboolean processTask();

? ? @Override

? ? publicvoid run() {

? ? ? ? System.out.println(Thread.currentThread().getName()+"子線程開始執(zhí)行任務(wù)");

? ? ? ? DefaultTransactionDefinition def =new DefaultTransactionDefinition();

? ? ? ? def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

? ? ? ? status = transactionManager.getTransaction(def);

? ? ? ? Boolean result = processTask();

? ? ? ? //向隊列中添加處理結(jié)果? ? ? ? resultList.add(result);

? ? ? ? //2吧黄、使用childMonitor.countDown()釋放子線程鎖定,同時使用mainMonitor.await();阻塞子線程唆姐,將程序的控制權(quán)交還給主線程拗慨。? ? ? ? childMonitor.countDown();

? ? ? ? try {

? ? ? ? ? ? //等待主線程的判斷邏輯執(zhí)行完,執(zhí)行下面的是否回滾邏輯? ? ? ? ? ? mainMonitor.await();

? ? ? ? } catch (Exception e) {

? ? ? ? ? log.error(e.getMessage());

? ? ? ? }

? ? ? ? System.out.println(Thread.currentThread().getName()+"子線程執(zhí)行剩下的任務(wù)");

? ? ? ? //3、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果赵抢,若有失敗結(jié)果出現(xiàn)剧蹂,主線程標記狀態(tài)告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程烦却,子線程檢測回滾標志宠叼,判斷是否回滾。if (rollback.isNeedRoolBack()) {

? ? ? ? ? ? rollBack();

? ? ? ? }else{

? ? ? ? ? ? //事務(wù)提交? ? ? ? ? ? submit();

? ? ? ? }

? ? }


回滾標記類


@Datapublicclass RollBack {

? ? publicRollBack(boolean needRoolBack) {

? ? ? ? this.needRoolBack = needRoolBack;

? ? }

? ? privateboolean needRoolBack;

}

?使用線程池工具:

? 1其爵,首先建立自己的任務(wù)執(zhí)行類 并且?extends ThreadTask 冒冬,實現(xiàn)initParam()和processTask()方法

/** * 多線程處理任務(wù)類

*/publicclassTestTaskextends ThreadTask {

? ? /**? ? ? 分批處理的數(shù)據(jù)

? ? */privateList objectList;

? ? /**? ? * 可能需要注入的某些服務(wù)

? ? */private TestService testService;

? ? publicTestTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {

? ? ? ? super(childCountDown, mainCountDown, result, rollback, transactionManager, obj, params);

? ? }

? ? @Override

? ? publicvoid initParam() {

? ? ? ? this.objectList = (List) getParam("objectList");

? ? ? ? this.testService = (TestService) getParam("testService");

? ? }

? ? /**? ? * 執(zhí)行任務(wù),返回false表示任務(wù)執(zhí)行錯誤,需要回滾

? ? * @return*/? ? @Override

? ? publicboolean processTask() {

? ? ? ? try {

? ? ? ? ? ? for (Object o : objectList) {

? ? ? ? ? ? ? ? testService.list();

? ? ? ? ? ? ? ? System.out.println(o.toString()+"執(zhí)行自己的多線程任務(wù)邏輯");

? ? ? ? ? ? }

? ? ? ? ? ? returntrue;

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? returnfalse;

? ? ? ? }

? ? }

}

2摩渺,編寫主任務(wù)執(zhí)行方法

/**? ? * 執(zhí)行多線程任務(wù)方法

? ? */publicvoid testThreadTask() {

? ? ? ? try {

? ? ? ? ? ? ? ? intthreadCount = 5;

? ? ? ? ? ? ? ? //需要分批處理的數(shù)據(jù)List objectList =newArrayList<>();

? ? ? ? ? ? ? ? Map params =newHashMap<>();

? ? ? ? ? ? ? ? params.put("objectList",objectList);

? ? ? ? ? ? ? ? params.put("testService",testService);

? ? ? ? ? ? ? ? //調(diào)用多線程工具方法threadPoolTool.excuteTask(transactionManager,objectList,threadCount,params, TestTask.class);

? ? ? ? }catch (Exception e){

? ? ? ? ? ? thrownew RuntimeException(e.getMessage());

? ? ? ? }

? ? }

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末简烤,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子证逻,更是在濱河造成了極大的恐慌乐埠,老刑警劉巖抗斤,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件囚企,死亡現(xiàn)場離奇詭異,居然都是意外死亡瑞眼,警方通過查閱死者的電腦和手機龙宏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來伤疙,“玉大人银酗,你說我怎么就攤上這事⊥较瘢” “怎么了黍特?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長锯蛀。 經(jīng)常有香客問我灭衷,道長,這世上最難降的妖魔是什么旁涤? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任翔曲,我火速辦了婚禮,結(jié)果婚禮上劈愚,老公的妹妹穿的比我還像新娘瞳遍。我一直安慰自己,他們只是感情好菌羽,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布掠械。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪份蝴。 梳的紋絲不亂的頭發(fā)上犁功,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天,我揣著相機與錄音婚夫,去河邊找鬼浸卦。 笑死,一個胖子當著我的面吹牛案糙,可吹牛的內(nèi)容都是我干的限嫌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼时捌,長吁一口氣:“原來是場噩夢啊……” “哼怒医!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起奢讨,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤稚叹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后拿诸,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扒袖,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年亩码,在試婚紗的時候發(fā)現(xiàn)自己被綠了季率。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡描沟,死狀恐怖飒泻,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吏廉,我是刑警寧澤泞遗,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站席覆,受9級特大地震影響史辙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜娜睛,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一髓霞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧畦戒,春花似錦方库、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽徐鹤。三九已至,卻和暖如春邀层,著一層夾襖步出監(jiān)牢的瞬間返敬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工寥院, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留劲赠,地道東北人。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓秸谢,卻偏偏與公主長得像凛澎,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子估蹄,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容