背景
? ? ? 在項目中使用多線程抓取第三方數(shù)據(jù)執(zhí)行數(shù)據(jù)入庫時,如果某個子線程執(zhí)行異常晾捏,其他子線事務(wù)全部回滾官辽,spring對多線程無法進行事務(wù)控制,是因為多線程底層連接數(shù)據(jù)庫的時候粟瞬,是使用的線程變量(TheadLocal),線程之間事務(wù)隔離同仆,每個線程有自己的連接,事務(wù)肯定不是同一個了裙品。
解決辦法
? ? ?思想就是使用兩個CountDownLatch實現(xiàn)子線程的二段提交
? ? 步驟:
??1俗批、主線程將任務(wù)分發(fā)給子線程俗或,然后使用childMonitor.await();阻塞主線程,等待所有子線程處理向數(shù)據(jù)庫中插入的業(yè)務(wù)岁忘,并使用BlockingDeque存儲線程的返回結(jié)果辛慰。
? ? ?2、使用childMonitor.countDown()釋放子線程鎖定干像,同時使用mainMonitor.await();阻塞子線程帅腌,將程序的控制權(quán)交還給主線程。
? ? ?3麻汰、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果速客,若有失敗結(jié)果出現(xiàn),主線程標記狀態(tài)告知子線程回滾五鲫,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程溺职,子線程檢測回滾標志,判斷是否回滾位喂。
代碼實現(xiàn)
線程池工具類
publicclassThreadPoolTool {
? ? /**? ? * 多線程任務(wù)
? ? * @param transactionManager
? ? * @param data
? ? * @param threadCount
? ? * @param params
? ? * @param clazz
? ? */publicvoidexcuteTask(DataSourceTransactionManager transactionManager, List data,intthreadCount, Map params, Class clazz) {
? ? ? ? if(data ==null|| data.size() == 0) {
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? intbatch = 0;
? ? ? ? ExecutorService executor = Executors.newFixedThreadPool(threadCount);
? ? ? ? //監(jiān)控子線程的任務(wù)執(zhí)行CountDownLatch childMonitor =new CountDownLatch(threadCount);
? ? ? ? //監(jiān)控主線程浪耘,是否需要回滾CountDownLatch mainMonitor =newCountDownLatch(1);
? ? ? ? //存儲任務(wù)的返回結(jié)果,返回true表示不需要回滾塑崖,反之七冲,則回滾BlockingDeque results =newLinkedBlockingDeque(threadCount);
? ? ? ? RollBack rollback =newRollBack(false);
? ? ? ? try {
? ? ? ? ? ? LinkedBlockingQueue queue = splitQueue(data, threadCount);
? ? ? ? ? ? while(true) {
? ? ? ? ? ? ? ? List list = queue.poll();
? ? ? ? ? ? ? ? if(list ==null) {
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? batch++;
? ? ? ? ? ? ? ? params.put("batch", batch);
? ? ? ? ? ? ? ? Constructor constructor = clazz.getConstructor(newClass[]{CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, Object.class, Map.class});
? ? ? ? ? ? ? ? ThreadTask task = (ThreadTask) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, list, params);
? ? ? ? ? ? ? ? executor.execute(task);
? ? ? ? ? ? }
? ? ? ? ? ? //? 1、主線程將任務(wù)分發(fā)給子線程规婆,然后使用childMonitor.await();阻塞主線程癞埠,等待所有子線程處理向數(shù)據(jù)庫中插入的業(yè)務(wù)。? ? ? ? ? ? childMonitor.await();
? ? ? ? ? ? System.out.println("主線程開始執(zhí)行任務(wù)");
? ? ? ? ? ? //根據(jù)返回結(jié)果來確定是否回滾for(inti = 0; i < threadCount; i++) {
? ? ? ? ? ? ? ? Boolean result = results.take();
? ? ? ? ? ? ? ? if(!result) {
? ? ? ? ? ? ? ? ? ? //有線程執(zhí)行異常聋呢,需要回滾子線程rollback.setNeedRoolBack(true);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //? 3苗踪、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果,若有失敗結(jié)果出現(xiàn)削锰,主線程標記狀態(tài)告知子線程回滾通铲,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程,子線程檢測回滾標志器贩,判斷是否回滾颅夺。? ? ? ? ? ? mainMonitor.countDown();
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? } finally {
? ? ? ? ? ? //關(guān)閉線程池,釋放資源? ? ? ? ? ? executor.shutdown();
? ? ? ? }
? ? }
? ? /**? ? * 隊列拆分
? ? *
? ? * @param data 需要執(zhí)行的數(shù)據(jù)集合
? ? * @param threadCount 核心線程數(shù)
? ? * @return*/privateLinkedBlockingQueue> splitQueue(List data,int threadCount) {
? ? ? ? LinkedBlockingQueue> queueBatch =new LinkedBlockingQueue();
? ? ? ? inttotal = data.size();
? ? ? ? intoneSize = total / threadCount;
? ? ? ? int start;
? ? ? ? int end;
? ? ? ? for(inti = 0; i < threadCount; i++) {
? ? ? ? ? ? start = i * oneSize;
? ? ? ? ? ? end = (i + 1) * oneSize;
? ? ? ? ? ? if(i < threadCount - 1) {
? ? ? ? ? ? ? ? queueBatch.add(data.subList(start, end));
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? queueBatch.add(data.subList(start, data.size()));
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return queueBatch;
? ? }
}
子線程任務(wù)執(zhí)行類
publicabstractclassThreadTaskimplements Runnable {
? ? /**? ? * 監(jiān)控子任務(wù)的執(zhí)行
? ? */private CountDownLatch childMonitor;
? ? /**? ? * 監(jiān)控主線程
? ? */private CountDownLatch mainMonitor;
? ? /**? ? * 存儲線程的返回結(jié)果
? ? */privateBlockingDeque resultList;
? ? /**? ? * 回滾類
? ? */private RollBack rollback;
? ? privateMap params;
? ? protected Object obj;
? ? protected DataSourceTransactionManager transactionManager;
? ? protected TransactionStatus status;
? ? publicThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj,Map params) {
? ? ? ? this.childMonitor = childCountDown;
? ? ? ? this.mainMonitor = mainCountDown;
? ? ? ? this.resultList = result;
? ? ? ? this.rollback = rollback;
? ? ? ? this.transactionManager = transactionManager;
? ? ? ? this.obj = obj;
? ? ? ? this.params = params;
? ? ? ? initParam();
? ? }
? ? /**? ? * 事務(wù)回滾
? ? */privatevoid rollBack() {
? ? ? ? System.out.println(Thread.currentThread().getName()+"開始回滾");
? ? ? ? transactionManager.rollback(status);
? ? }
? ? /**? ? * 事務(wù)提交
? ? */privatevoid submit() {
? ? ? ? System.out.println(Thread.currentThread().getName()+"提交事務(wù)");
? ? ? ? transactionManager.commit(status);
? ? }
? ? protected Object getParam(String key){
? ? ? ? return params.get(key);
? ? }
? ? publicabstractvoid initParam();
? ? /**? ? * 執(zhí)行任務(wù),返回false表示任務(wù)執(zhí)行錯誤蛹稍,需要回滾
? ? * @return*/publicabstractboolean processTask();
? ? @Override
? ? publicvoid run() {
? ? ? ? System.out.println(Thread.currentThread().getName()+"子線程開始執(zhí)行任務(wù)");
? ? ? ? DefaultTransactionDefinition def =new DefaultTransactionDefinition();
? ? ? ? def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
? ? ? ? status = transactionManager.getTransaction(def);
? ? ? ? Boolean result = processTask();
? ? ? ? //向隊列中添加處理結(jié)果? ? ? ? resultList.add(result);
? ? ? ? //2吧黄、使用childMonitor.countDown()釋放子線程鎖定,同時使用mainMonitor.await();阻塞子線程唆姐,將程序的控制權(quán)交還給主線程拗慨。? ? ? ? childMonitor.countDown();
? ? ? ? try {
? ? ? ? ? ? //等待主線程的判斷邏輯執(zhí)行完,執(zhí)行下面的是否回滾邏輯? ? ? ? ? ? mainMonitor.await();
? ? ? ? } catch (Exception e) {
? ? ? ? ? log.error(e.getMessage());
? ? ? ? }
? ? ? ? System.out.println(Thread.currentThread().getName()+"子線程執(zhí)行剩下的任務(wù)");
? ? ? ? //3、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果赵抢,若有失敗結(jié)果出現(xiàn)剧蹂,主線程標記狀態(tài)告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程烦却,子線程檢測回滾標志宠叼,判斷是否回滾。if (rollback.isNeedRoolBack()) {
? ? ? ? ? ? rollBack();
? ? ? ? }else{
? ? ? ? ? ? //事務(wù)提交? ? ? ? ? ? submit();
? ? ? ? }
? ? }
回滾標記類
@Datapublicclass RollBack {
? ? publicRollBack(boolean needRoolBack) {
? ? ? ? this.needRoolBack = needRoolBack;
? ? }
? ? privateboolean needRoolBack;
}
?使用線程池工具:
? 1其爵,首先建立自己的任務(wù)執(zhí)行類 并且?extends ThreadTask 冒冬,實現(xiàn)initParam()和processTask()方法
/** * 多線程處理任務(wù)類
*/publicclassTestTaskextends ThreadTask {
? ? /**? ? ? 分批處理的數(shù)據(jù)
? ? */privateList objectList;
? ? /**? ? * 可能需要注入的某些服務(wù)
? ? */private TestService testService;
? ? publicTestTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {
? ? ? ? super(childCountDown, mainCountDown, result, rollback, transactionManager, obj, params);
? ? }
? ? @Override
? ? publicvoid initParam() {
? ? ? ? this.objectList = (List) getParam("objectList");
? ? ? ? this.testService = (TestService) getParam("testService");
? ? }
? ? /**? ? * 執(zhí)行任務(wù),返回false表示任務(wù)執(zhí)行錯誤,需要回滾
? ? * @return*/? ? @Override
? ? publicboolean processTask() {
? ? ? ? try {
? ? ? ? ? ? for (Object o : objectList) {
? ? ? ? ? ? ? ? testService.list();
? ? ? ? ? ? ? ? System.out.println(o.toString()+"執(zhí)行自己的多線程任務(wù)邏輯");
? ? ? ? ? ? }
? ? ? ? ? ? returntrue;
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? returnfalse;
? ? ? ? }
? ? }
}
2摩渺,編寫主任務(wù)執(zhí)行方法
/**? ? * 執(zhí)行多線程任務(wù)方法
? ? */publicvoid testThreadTask() {
? ? ? ? try {
? ? ? ? ? ? ? ? intthreadCount = 5;
? ? ? ? ? ? ? ? //需要分批處理的數(shù)據(jù)List objectList =newArrayList<>();
? ? ? ? ? ? ? ? Map params =newHashMap<>();
? ? ? ? ? ? ? ? params.put("objectList",objectList);
? ? ? ? ? ? ? ? params.put("testService",testService);
? ? ? ? ? ? ? ? //調(diào)用多線程工具方法threadPoolTool.excuteTask(transactionManager,objectList,threadCount,params, TestTask.class);
? ? ? ? }catch (Exception e){
? ? ? ? ? ? thrownew RuntimeException(e.getMessage());
? ? ? ? }
? ? }