場景:在普通的開發(fā)的時候,任務(wù)是單線程處理的锯岖,這這時候性能可能有點慢介袜。基于 juc 包下的ThreadPoolExecutor 進行開發(fā)出吹,可以轉(zhuǎn)換成為批處理的遇伞,使性能成倍提高
出現(xiàn)主要的問題:將任務(wù)切割成為子任務(wù)的時候,事務(wù)統(tǒng)一性被破壞捶牢。
環(huán)境:
springboot:2.2.0.RELEASE
flowable:6.4.2
分析步驟:
Step1. ThreadPoolExecutor 的基本用法鸠珠,編寫通用工具類
Step2. 基于面向接口開發(fā)巍耗,進行通用抽象
Step3. 分析spring事務(wù),將基于注解的聲明式事務(wù)渐排,改為編程式事務(wù)
Step4. 使用 變量表示來決定是否使用統(tǒng)一事務(wù)
Step1: ThreadPoolExecutor 簡單用法
基本處理代碼
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("flow-pool-%d")
.build();
int corePoolSize = 10;
int maximumPoolSize = 10;
long keepAliveTime = 3;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
namedThreadFactory) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
// 線程處理前置方法
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 線程處理后置方法
}
};
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < group; i++) {
int startIndex = i * groupSize;
int endIndex = (i + 1) * groupSize;
if (endIndex > toDoList.size()) {
endIndex = toDoList.size();
}
List<?> items = toDoList.subList(startIndex, endIndex);
futures.add(executorService.submit(new SingleTransactionPoolTask(execution, items, flag)));
}
try {
for (Future<?> future : futures) {
future.get();
}
} catch (Exception e) {
e.printStackTrace();
// 業(yè)務(wù)操作
} finally {
executorService.shutdown();
}
- 構(gòu)造方法
名稱 | 類型 | 含義 |
---|---|---|
corePoolSize | int | 核心線程池大小 |
maximumPoolSize | int | 最大線程池大小 |
keepAliveTime | long | 線程最大空閑時間 |
unit | TimeUnit | 時間單位 |
workQueue | BlockingQueue<Runnable> | 線程等待隊列 |
threadFactory | ThreadFactory | 線程創(chuàng)建工廠 |
- ThreadPoolExecutor 重寫方法
方法名 | 作用 |
---|---|
protected void beforeExecute(Thread t, Runnable r) { } | 線程處理前置調(diào)用 |
protected void afterExecute(Runnable r, Throwable t) { } | 線程處理后置調(diào)用 |
protected void terminated() { } | 線程處理結(jié)束之后調(diào)用 |
在進行主線程拆分成多子線程并發(fā)處理的時候炬太,經(jīng)常會遇到部分主線程的數(shù)據(jù)無法在子線程獲取到,此時就可以通過重寫線程池 beforeExecute() 方法驯耻,將主線程數(shù)據(jù)同步到子線程中亲族。如:工作流的Authentication.setAuthenticatedUserId(currentUserId);
基于ThreadLocal 的全局變量設(shè)置
- 線程池調(diào)用任務(wù)
此處為線程池實際處理方法,
ExecutionService.submit(Runnable task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
SingleTransactionPoolTask 實現(xiàn) runnable 接口
public class SingleTransactionPoolTask implements Runnable {
private final ThreadExecution threadExecution;
private final List<?> list;
private final BatchTransactionFlag flag;
public SingleTransactionPoolTask(ThreadExecution threadExecution, List<?> list, BatchTransactionFlag flag) {
this.threadExecution = threadExecution;
this.list = list;
this.flag = flag;
}
@Override
public void run() {
try {
threadExecution.threadExecute(list);
} finally {
flag.getCompleteThreads().incrementAndGet();
}
}
}
- 返回線程調(diào)用的 處理方法
主要進行子線程中是否有異常可缚,如果具有異常則應(yīng)該進行的對應(yīng)業(yè)務(wù)處理
try {
for (Future<?> future : futures) {
future.get();
}
} catch (Exception e) {
e.printStackTrace();
// 業(yè)務(wù)操作
} finally {
executorService.shutdown();
}
Step2: 基于面向接口開發(fā)霎迫,將業(yè)務(wù)操作進行多態(tài)
ThreadExecution 抽象子任務(wù)接口,具體不同業(yè)務(wù)編寫指定的實現(xiàn)類帘靡,形成多態(tài)知给。通用工具類統(tǒng)一調(diào)用接口
public interface ThreadExecution {
/**
* 處理線程任務(wù)
* @param list
*/
void threadExecute(List<?> list);
}
SingleTransactionPoolTask 通用任務(wù)實現(xiàn)類,基于 依賴倒置原則 調(diào)用 ThreadExecution
public class SingleTransactionPoolTask implements Runnable {
private final ThreadExecution threadExecution;
private final List<?> list;
private final BatchTransactionFlag flag;
public SingleTransactionPoolTask(ThreadExecution threadExecution, List<?> list, BatchTransactionFlag flag) {
this.threadExecution = threadExecution;
this.list = list;
this.flag = flag;
}
@Override
public void run() {
try {
threadExecution.threadExecute(list);
} finally {
flag.getCompleteThreads().incrementAndGet();
}
}
}
實現(xiàn) ThreadExecution 接口描姚,進行業(yè)務(wù)多態(tài)
BatchStartProcessThreadExecutionImpl
@Slf4j
public class BatchStartProcessThreadExecutionImpl implements ThreadExecution {
private RuntimeService runtimeService;
private List<BatchStartProcessInstanceRsp.ProcessInstanceItem> records;
public BatchStartProcessThreadExecutionImpl(List<BatchStartProcessInstanceRsp.ProcessInstanceItem> records) {
this.records = records;
this.runtimeService = SpringContextUtils.getBean(RuntimeService.class);
}
@Override
public void threadExecute(List list) {
// 省略業(yè)務(wù)代碼
}
}
BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl
@Slf4j
public class BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl implements ThreadExecution {
private List<BatchCompleteTaskRsp.CompleteTaskItem> result;
private FlowTaskService flowTaskService;
public BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl(List<BatchCompleteTaskRsp.CompleteTaskItem> result) {
this.result = result;
this.flowTaskService = SpringContextUtils.getBean(FlowTaskService.class);
}
@Override
public void threadExecute(List list) {
// 省略業(yè)務(wù)代碼
}
}
Step3. 分析spring事務(wù)涩赢,將基于注解的聲明式事務(wù),改為編程式事務(wù)
在進行spring開發(fā)的時候轰胁,基本都是基于spring的聲明式事務(wù)(@Transactional)進行開發(fā)谒主,可以做到非常高效。但是基于多線程開發(fā)的時候赃阀,通過debug霎肯,可以發(fā)現(xiàn),主線程還沒有進行異常處理環(huán)節(jié)榛斯,子線程事務(wù)已經(jīng)提交观游,并且在數(shù)據(jù)庫已經(jīng)可以查詢到。 這個并不滿足于業(yè)務(wù)需求驮俗。(如圖)
基于對Spring事務(wù)bean之間關(guān)系的了解,事務(wù)都是圍繞著 TransactionManager王凑,實現(xiàn)類為:org.springframework.jdbc.datasource.DataSourceTransactionManager搪柑,可以找到接口org.springframework.transaction.PlatformTransactionManager,并且該接口具有如下的方法
PlatformTransactionManager接口的方法
方法名 | 功能 |
---|---|
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) | 獲取當前的事務(wù) |
void commit(TransactionStatus status) | 提交事務(wù) |
void rollback(TransactionStatus status) | 回滾事務(wù) |
所以獲取事務(wù)的代碼則為
// 獲取事務(wù)
TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());
// 提交事務(wù)
transactionManager.commit(transactionStatus);
// 回滾事務(wù)
transactionManager.rollback(transactionStatus);
所以原本計劃是:根據(jù)傳入?yún)?shù)索烹,把事務(wù)從子線程中獲取工碾,再回到主線程中提交。不過事與愿違的是百姓,提交事務(wù)并沒有想象中那么直接渊额。會拋出異常
DefaultCommonThreadExecutionServiceBean
@Service
public class DefaultCommonThreadExecutionServiceBean implements CommonThreadExecutionService {
@Resource
private DataSourceTransactionManager transactionManager;
@Override
// @Transactional(rollbackFor = Exception.class)
public int executeBatch(ThreadExecution threadExecution, List<?> sequence, List<TransactionStatus> transactionStatusList) {
TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());
transactionStatusList.add(transactionStatus);
threadExecution.threadExecute(sequence);
return 0;
}
}
FlowThreadPoolExecutor 代碼段
DataSourceTransactionManager transactionManager = SpringContextUtils.getBean(DataSourceTransactionManager.class);
try {
for (Future future : futures) {
future.get();
}
transactionStatusList.forEach(obj -> {
transactionManager.commit(obj);
});
} catch (Exception e) {
e.printStackTrace();
transactionStatusList.forEach(obj -> {
transactionManager.rollback(obj);
});
} finally {
executorService.shutdown();
}
根據(jù)spring事務(wù)源碼分析可知,spring的事務(wù)也是基于ThreadLocal的旬迹,所以出現(xiàn)了跨越線程的時候火惊,就會出現(xiàn)無法執(zhí)行完成。并且由navicat無法操作數(shù)據(jù)庫可以看出奔垦,數(shù)據(jù)庫事務(wù)并未提交屹耐,出現(xiàn)了行鎖。
org.springframework.transaction.support.TransactionSynchronizationManager#unbindResource
/**
* Unbind a resource for the given key from the current thread.
* @param key the key to unbind (usually the resource factory)
* @return the previously bound value (usually the active resource object)
* @throws IllegalStateException if there is no value bound to the thread
* @see ResourceTransactionManager#getResourceFactory()
*/
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}
org.springframework.transaction.support.TransactionSynchronizationManager
雖然源碼中可以獲取看到事務(wù)提交代碼是 數(shù)據(jù)庫連接的提交宴倍,但是其中還是必須執(zhí)行 清除當前線程綁定的事務(wù)张症,才能徹底釋放數(shù)據(jù)庫連接。
提交事務(wù):org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
拋出異常:org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion
/**
* Clean up after completion, clearing synchronization if necessary,
* and invoking doCleanupAfterCompletion.
* @param status object representing the transaction
* @see #doCleanupAfterCompletion
*/
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
所以最終解決還是需要在子線程進行提交鸵贬,此時俗他,又可以使用線程池的重寫 java.util.concurrent.ThreadPoolExecutor#afterExecute
并且通過變量來確定子線程是否已經(jīng)執(zhí)行完成,如果執(zhí)行完成阔逼,才進行事務(wù)的提交
BatchTransactionFlag
@Getter
public class BatchTransactionFlag {
private final AtomicInteger completeThreads = new AtomicInteger();
private final AtomicInteger successThreads = new AtomicInteger();
private final int groupSize;
private boolean batchTransaction;
private Map<Long, TransactionStatus> longTransactionStatusMap;
private final List<?> toDoList;
public BatchTransactionFlag(int groupSize, boolean batchTransaction, List<?> toDoList) {
this.groupSize = groupSize;
this.batchTransaction = batchTransaction;
this.toDoList = toDoList;
if (batchTransaction) {
longTransactionStatusMap = new ConcurrentHashMap<>();
}
}
}
CommonThreadExecutionService實現(xiàn)
@Slf4j
@Service
public class DefaultCommonThreadExecutionServiceBean implements CommonThreadExecutionService {
@Resource
private DataSourceTransactionManager transactionManager;
@Override
public int executeBatch(ThreadExecution threadExecution, List sequence, Map<Long, TransactionStatus> longTransactionStatusMap, BatchTransactionFlag flag) {
synchronized (flag) {
TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());
longTransactionStatusMap.put(Thread.currentThread().getId(), transactionStatus);
try {
threadExecution.threadExecute(sequence);
flag.getSuccessThreads().incrementAndGet();
} finally {
flag.getCompleteThreads().incrementAndGet();
log.info("完成任務(wù):" + Thread.currentThread().getName());
}
}
return 0;
}
}
經(jīng)過測試發(fā)現(xiàn)兆衅,需要調(diào)用數(shù)據(jù)庫修改的步驟,還是需要同步塊的嗜浮,不使用會導(dǎo)致數(shù)據(jù)庫死鎖羡亩,導(dǎo)致處理超時
Step4. 使用 變量表示來決定是否使用統(tǒng)一事務(wù)
從上面可以看到由于面向接口進行處理,所以根據(jù)需要 統(tǒng)一事務(wù) 跟 不需要統(tǒng)一事務(wù) 又可以使用不同實現(xiàn)類來進行控制危融,并且在編寫線程池的時候也配合做判斷畏铆。
線程池執(zhí)行的代碼
for (int i = 0; i < group; i++) {
int startIndex = i * groupSize;
int endIndex = (i + 1) * groupSize;
if (endIndex > toDoList.size()) {
endIndex = toDoList.size();
}
List<?> items = toDoList.subList(startIndex, endIndex);
if (batchTransaction) {
futures.add(executorService.submit(new BatchTransactionPoolTask(execution, items, flag.getLongTransactionStatusMap(), flag)));
} else {
futures.add(executorService.submit(new SingleTransactionPoolTask(execution, items, flag)));
}
}
線程池的構(gòu)建
private static ThreadPoolExecutor createThreadPoolExecutorInstance(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
BatchTransactionFlag flag
) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("flow-pool-%d")
.build();
String currentUserId = SecurityUtils.getCurrentUserId();
DataSourceTransactionManager transactionManager = SpringContextUtils.getBean(DataSourceTransactionManager.class);
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
namedThreadFactory) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
Authentication.setAuthenticatedUserId(currentUserId);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (flag.isBatchTransaction()) {
try {
while (flag.getCompleteThreads().get() != flag.getGroupSize()) {
log.info(Thread.currentThread().getName() + " 等待主線程:getGroupSize:" + flag.getGroupSize() + "\tgetCompleteThreads:" + flag.getCompleteThreads().get());
log.info("開啟事務(wù)個數(shù):" + flag.getLongTransactionStatusMap().size());
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
TransactionStatus status = flag.getLongTransactionStatusMap().get(Thread.currentThread().getId());
if (flag.getSuccessThreads().get() == flag.getCompleteThreads().get()) {
log.info(Thread.currentThread().getName() + ":全部執(zhí)行成功,提交事務(wù)");
transactionManager.commit(status);
} else {
log.info(Thread.currentThread().getName() + ":具有線程執(zhí)行失敗,回滾事務(wù)");
transactionManager.rollback(status);
}
}
}
};
}
這樣就可以做到 動態(tài)判斷是否需要統(tǒng)一事務(wù)。
詳細demo可以查看git代碼
git地址:https://github.com/oldguys/flowable-modeler-demo/tree/feature_threadpoolexecutor_no_spring_proxy_transaction