前言
利用線程池和CountDownLatch,多線程并發(fā)處理批量數(shù)據(jù)捧弃,實(shí)現(xiàn)多線程事務(wù)回滾拿诸,事務(wù)補(bǔ)償跟压。
//定義兩計(jì)數(shù)器
private CountDownLatch begin,end;
begin設(shè)置為1胰蝠,用于發(fā)布開始命令,如果需要開始震蒋,則begin.countdown
end用于記錄任務(wù)的執(zhí)行情況茸塞。begin.countdown后,需end.await查剖,等待任務(wù)都執(zhí)行完钾虐。
當(dāng)begin.countdown開始執(zhí)行任務(wù)后,在最后需end.countdown
當(dāng)end.countdown減到為0后笋庄,則切換到主線程效扫,繼續(xù)開始往下執(zhí)行
基于回調(diào)函數(shù)
實(shí)現(xiàn)更靈活的去配置各業(yè)務(wù)數(shù)據(jù)操作場(chǎng)景,即:暴露excute方法執(zhí)行線程任務(wù),執(zhí)行的具體執(zhí)行任務(wù)交給回調(diào)函數(shù)實(shí)現(xiàn)直砂。
基于spring上下文中獲取事務(wù)管理器
封裝獲取spring上下文工具類
ApplicationContextProvider
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* @Author by mocar小師兄
* @DESC: 從已有的spring上下文取得已實(shí)例化的bean
*/
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
private static final Logger log = LoggerFactory.getLogger(ApplicationContextProvider.class);
private static ApplicationContext applicationContext;
/**
* 設(shè)置spring上下文
* @param applicationContext spring上下文
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
log.info("spring上下文applicationContext正在初始化,application:{}" ,applicationContext);
this.applicationContext = applicationContext;
log.info("spring上下文applicationContext初始化完成!");
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name){
if(applicationContext==null){
log.warn("applicationContext是空的");
return null;
}
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> clazz){
return getApplicationContext().getBean(clazz);
}
}
封裝的工具類
package com.example.javademo.transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
public class TransactionMultipartExecutor<T> {
private static final Logger log = LoggerFactory.getLogger(TransactionMultipartExecutor.class);
/**
* 單個(gè)線程處理的數(shù)據(jù)量
*/
private int singleCount;
/**
* 處理的總數(shù)據(jù)量
*/
private int listSize;
/**
* 開啟的線程數(shù)
*/
private int runSize;
/**
* 操作的數(shù)據(jù)集
*/
private List<T> list;
/**
* 計(jì)數(shù)器(攔截器)
*/
private CountDownLatch begin, end;
/**
* 線程池
*/
private ExecutorService executorService;
/**
* 是否存在異常
*/
private AtomicReference<Boolean> isError = new AtomicReference<>(false);
/**
* 回調(diào)函數(shù)
*/
private CallBack callBack;
/**
* 概率模擬報(bào)錯(cuò)
*/
private Random random = new Random();
/**
* 事務(wù)管理器
*/
private PlatformTransactionManager transactionManager;
public void setCallBack(CallBack callBack) {
this.callBack = callBack;
}
public TransactionMultipartExecutor(int singleCount, List<T> list) {
if (singleCount <= 0 || CollectionUtils.isEmpty(list)){
throw new RuntimeException("Illegal parameter");
}
//transactionManager = ContextLoader.getCurrentWebApplicationContext().getBean(PlatformTransactionManager.class);
transactionManager = ApplicationContextProvider.getBean(PlatformTransactionManager.class);
this.singleCount = singleCount;
this.list = list;
this.listSize = list.size();
this.runSize = (this.listSize%this.singleCount)==0 ? this.listSize/this.singleCount : this.listSize/this.singleCount + 1;
}
public void excute() throws InterruptedException {
// 創(chuàng)建固定線程數(shù)量的線程池
executorService = Executors.newFixedThreadPool(runSize);
begin = new CountDownLatch(1);
end = new CountDownLatch(runSize);
//創(chuàng)建線程
int startIndex = 0;
int endIndex = 0;
List<T> newList = null;
for (int i = 0; i < runSize; i++) {
//計(jì)算每個(gè)線程對(duì)應(yīng)的數(shù)據(jù)
if (i < (runSize - 1)) {
startIndex = i * singleCount;
endIndex = (i + 1) * singleCount;
newList = list.subList(startIndex, endIndex);
} else {
startIndex = i * singleCount;
endIndex = listSize;
newList = list.subList(startIndex, endIndex);
}
//創(chuàng)建線程類處理數(shù)據(jù)
MyThread<T> myThread = new MyThread(newList, begin, end) {
@Override
public void method(List list) {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus status = transactionManager.getTransaction(def);
//具體執(zhí)行邏輯交給回調(diào)函數(shù)
try {
callBack.method(list);
/*if (random.nextInt(2) == 1) {
throw new RuntimeException("模擬異常拋出錯(cuò)誤回滾");
}*/
log.warn("多線程事務(wù)批量操作執(zhí)行成功,線程名:{},操作成功數(shù)量:{}",Thread.currentThread().getName(), list.size());
} catch (Exception e) {
// 接收異常,處理異常
isError.set(true);
//e.printStackTrace();
log.error("多線程事務(wù)批量操作拋錯(cuò),線程名:{},操作失敗數(shù)量:{},報(bào)錯(cuò)信息:{},{}",Thread.currentThread().getName(),list.size(),e.toString(), e);
}
//計(jì)數(shù)器減一
end.countDown();
try {
//等待所有線程任務(wù)完成荡短,監(jiān)控是否有異常,有則統(tǒng)一回滾
//log.warn("等待所有任務(wù)執(zhí)行完成,當(dāng)前時(shí)間:{},當(dāng)前end計(jì)數(shù):{}", LocalDateTime.now(), end.getCount());
end.await();
//log.warn("完成所有任務(wù),當(dāng)前時(shí)間:{},當(dāng)前end計(jì)數(shù):{}", LocalDateTime.now(), end.getCount());
if (isError.get()) {
// 事務(wù)回滾
transactionManager.rollback(status);
} else {
//事務(wù)提交
transactionManager.commit(status);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//執(zhí)行線程
executorService.execute(myThread);
}
//計(jì)數(shù)器減一哆键,開始執(zhí)行任務(wù) begin此時(shí)為0
begin.countDown();//
//等待任務(wù)全部執(zhí)行完畢掘托,變?yōu)?則任務(wù)全部完成
end.await();
//關(guān)閉線程池
executorService.shutdown();
//不拋錯(cuò)也是可以回滾的
/*if (isError.get()) {
// 主線程拋出自定義的異常
throw new RuntimeException("主線程拋出模擬異常");
}*/
}
//抽象線程類
public abstract class MyThread<T> implements Runnable {
//list:總數(shù)據(jù)分割后某線程負(fù)責(zé)執(zhí)行的數(shù)據(jù)
private List<T> list;
private CountDownLatch begin, end;
public MyThread(List<T> list, CountDownLatch begin, CountDownLatch end) {
this.list = list;
this.begin = begin;
this.end = end;
}
@Override
public void run() {
try {
begin.await();
//執(zhí)行程序
method(list);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//計(jì)數(shù)器減一
//end.countDown();
}
}
public abstract void method(List<T> list);
}
//回調(diào)接口定義
public interface CallBack<T> {
public void method(List<T> list);
}
public static void main(String[] agrs) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add("hello" + i);
}
TransactionMultipartExecutor<String> tool = new TransactionMultipartExecutor(3, list);
tool.setCallBack(new CallBack<String>() {
@Override
public void method(List<String> list) {
//總數(shù)據(jù)分割后某線程負(fù)責(zé)執(zhí)行的數(shù)據(jù)
for (int i = 0; i < list.size(); i++) {
System.out.print(Thread.currentThread().getId() + ":" + list.get(i) + " ");
}
System.out.println();
}
});
try {
tool.excute();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}