flowable 基于 ThreadPoolExecutor 進行任務(wù)批處理乙埃,統(tǒng)一事務(wù)處理

場景:在普通的開發(fā)的時候,任務(wù)是單線程處理的锯岖,這這時候性能可能有點慢介袜。基于 juc 包下的ThreadPoolExecutor 進行開發(fā)出吹,可以轉(zhuǎn)換成為批處理的遇伞,使性能成倍提高

出現(xiàn)主要的問題:將任務(wù)切割成為子任務(wù)的時候,事務(wù)統(tǒng)一性被破壞捶牢。

環(huán)境
springboot:2.2.0.RELEASE
flowable:6.4.2

git地址:https://github.com/oldguys/flowable-modeler-demo/tree/feature_threadpoolexecutor_no_spring_proxy_transaction

分析步驟:

Step1. ThreadPoolExecutor 的基本用法鸠珠,編寫通用工具類
Step2. 基于面向接口開發(fā)巍耗,進行通用抽象
Step3. 分析spring事務(wù),將基于注解的聲明式事務(wù)渐排,改為編程式事務(wù)
Step4. 使用 變量表示來決定是否使用統(tǒng)一事務(wù)

Step1: ThreadPoolExecutor 簡單用法

基本處理代碼


ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("flow-pool-%d")
                .build();

int corePoolSize = 10;
int maximumPoolSize = 10;
long keepAliveTime = 3;

TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

ExecutorService executorService =  new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                namedThreadFactory) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                // 線程處理前置方法
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
               // 線程處理后置方法
            }
};

List<Future<?>> futures = new ArrayList<>();

for (int i = 0; i < group; i++) {

    int startIndex = i * groupSize;
    int endIndex = (i + 1) * groupSize;
    if (endIndex > toDoList.size()) {
         endIndex = toDoList.size();
     }
     List<?> items = toDoList.subList(startIndex, endIndex);
    futures.add(executorService.submit(new SingleTransactionPoolTask(execution, items, flag)));
}

try {
  for (Future<?> future : futures) {
       future.get();
  }
} catch (Exception e) {
    e.printStackTrace();
    // 業(yè)務(wù)操作
} finally {
    executorService.shutdown();
}

  1. 構(gòu)造方法
名稱 類型 含義
corePoolSize int 核心線程池大小
maximumPoolSize int 最大線程池大小
keepAliveTime long 線程最大空閑時間
unit TimeUnit 時間單位
workQueue BlockingQueue<Runnable> 線程等待隊列
threadFactory ThreadFactory 線程創(chuàng)建工廠
  1. ThreadPoolExecutor 重寫方法
方法名 作用
protected void beforeExecute(Thread t, Runnable r) { } 線程處理前置調(diào)用
protected void afterExecute(Runnable r, Throwable t) { } 線程處理后置調(diào)用
protected void terminated() { } 線程處理結(jié)束之后調(diào)用

在進行主線程拆分成多子線程并發(fā)處理的時候炬太,經(jīng)常會遇到部分主線程的數(shù)據(jù)無法在子線程獲取到,此時就可以通過重寫線程池 beforeExecute() 方法驯耻,將主線程數(shù)據(jù)同步到子線程中亲族。如:工作流的Authentication.setAuthenticatedUserId(currentUserId);
基于ThreadLocal 的全局變量設(shè)置

  1. 線程池調(diào)用任務(wù)
    此處為線程池實際處理方法,

ExecutionService.submit(Runnable task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return {@code null} upon <em>successful</em> completion.
     *
     * @param task the task to submit
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    Future<?> submit(Runnable task);

SingleTransactionPoolTask 實現(xiàn) runnable 接口

public class SingleTransactionPoolTask implements Runnable {

    private final ThreadExecution threadExecution;

    private final List<?> list;

    private final BatchTransactionFlag flag;

    public SingleTransactionPoolTask(ThreadExecution threadExecution, List<?> list, BatchTransactionFlag flag) {
        this.threadExecution = threadExecution;
        this.list = list;
        this.flag = flag;
    }

    @Override
    public void run() {
        try {
            threadExecution.threadExecute(list);
        } finally {
            flag.getCompleteThreads().incrementAndGet();
        }
    }
}
  1. 返回線程調(diào)用的 處理方法
    主要進行子線程中是否有異常可缚,如果具有異常則應(yīng)該進行的對應(yīng)業(yè)務(wù)處理
try {
  for (Future<?> future : futures) {
       future.get();
  }
} catch (Exception e) {
    e.printStackTrace();
    // 業(yè)務(wù)操作
} finally {
    executorService.shutdown();
}

Step2: 基于面向接口開發(fā)霎迫,將業(yè)務(wù)操作進行多態(tài)

基本的類關(guān)系圖

ThreadExecution 抽象子任務(wù)接口,具體不同業(yè)務(wù)編寫指定的實現(xiàn)類帘靡,形成多態(tài)知给。通用工具類統(tǒng)一調(diào)用接口

public interface ThreadExecution {

    /**
     *  處理線程任務(wù)
     * @param list
     */
    void threadExecute(List<?> list);
}

SingleTransactionPoolTask 通用任務(wù)實現(xiàn)類,基于 依賴倒置原則 調(diào)用 ThreadExecution

public class SingleTransactionPoolTask implements Runnable {

    private final ThreadExecution threadExecution;

    private final List<?> list;

    private final BatchTransactionFlag flag;

    public SingleTransactionPoolTask(ThreadExecution threadExecution, List<?> list, BatchTransactionFlag flag) {
        this.threadExecution = threadExecution;
        this.list = list;
        this.flag = flag;
    }

    @Override
    public void run() {
        try {
            threadExecution.threadExecute(list);
        } finally {
            flag.getCompleteThreads().incrementAndGet();
        }
    }
}

實現(xiàn) ThreadExecution 接口描姚,進行業(yè)務(wù)多態(tài)
BatchStartProcessThreadExecutionImpl

@Slf4j
public class BatchStartProcessThreadExecutionImpl implements ThreadExecution {

    private RuntimeService runtimeService;

    private List<BatchStartProcessInstanceRsp.ProcessInstanceItem> records;

    public BatchStartProcessThreadExecutionImpl(List<BatchStartProcessInstanceRsp.ProcessInstanceItem> records) {
        this.records = records;
        this.runtimeService = SpringContextUtils.getBean(RuntimeService.class);
    }

    @Override
    public void threadExecute(List list) {
        // 省略業(yè)務(wù)代碼
    }
}

BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl

@Slf4j
public class BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl implements ThreadExecution {

    private List<BatchCompleteTaskRsp.CompleteTaskItem> result;

    private FlowTaskService flowTaskService;

    public BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl(List<BatchCompleteTaskRsp.CompleteTaskItem> result) {
        this.result = result;
        this.flowTaskService = SpringContextUtils.getBean(FlowTaskService.class);
    }

    @Override
    public void threadExecute(List list) {
        // 省略業(yè)務(wù)代碼
    }
}

Step3. 分析spring事務(wù)涩赢,將基于注解的聲明式事務(wù),改為編程式事務(wù)

在進行spring開發(fā)的時候轰胁,基本都是基于spring的聲明式事務(wù)(@Transactional)進行開發(fā)谒主,可以做到非常高效。但是基于多線程開發(fā)的時候赃阀,通過debug霎肯,可以發(fā)現(xiàn),主線程還沒有進行異常處理環(huán)節(jié)榛斯,子線程事務(wù)已經(jīng)提交观游,并且在數(shù)據(jù)庫已經(jīng)可以查詢到。 這個并不滿足于業(yè)務(wù)需求驮俗。(如圖)

dubug看出還未到執(zhí)行回退業(yè)務(wù)操作
子線程事務(wù)已經(jīng)提交懂缕,數(shù)據(jù)庫可以查詢到結(jié)果

基于對Spring事務(wù)bean之間關(guān)系的了解,事務(wù)都是圍繞著 TransactionManager王凑,實現(xiàn)類為:org.springframework.jdbc.datasource.DataSourceTransactionManager搪柑,可以找到接口org.springframework.transaction.PlatformTransactionManager,并且該接口具有如下的方法

PlatformTransactionManager接口的方法

方法名 功能
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) 獲取當前的事務(wù)
void commit(TransactionStatus status) 提交事務(wù)
void rollback(TransactionStatus status) 回滾事務(wù)

所以獲取事務(wù)的代碼則為

// 獲取事務(wù)
TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());

// 提交事務(wù)
transactionManager.commit(transactionStatus);

// 回滾事務(wù)
transactionManager.rollback(transactionStatus);

所以原本計劃是:根據(jù)傳入?yún)?shù)索烹,把事務(wù)從子線程中獲取工碾,再回到主線程中提交。不過事與愿違的是百姓,提交事務(wù)并沒有想象中那么直接渊额。會拋出異常

DefaultCommonThreadExecutionServiceBean

@Service
public class DefaultCommonThreadExecutionServiceBean implements CommonThreadExecutionService {

    @Resource
    private DataSourceTransactionManager transactionManager;

    @Override
//    @Transactional(rollbackFor = Exception.class)
    public int executeBatch(ThreadExecution threadExecution, List<?> sequence, List<TransactionStatus> transactionStatusList) {

        TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());
        transactionStatusList.add(transactionStatus);

        threadExecution.threadExecute(sequence);

        return 0;
    }
}

FlowThreadPoolExecutor 代碼段

  DataSourceTransactionManager transactionManager = SpringContextUtils.getBean(DataSourceTransactionManager.class);
        try {
            for (Future future : futures) {
                future.get();
            }

            transactionStatusList.forEach(obj -> {
                transactionManager.commit(obj);
            });

        } catch (Exception e) {
            e.printStackTrace();

            transactionStatusList.forEach(obj -> {
                transactionManager.rollback(obj);
            });

        } finally {
            executorService.shutdown();
        }
系統(tǒng)拋出異常
navicat 出現(xiàn)事務(wù)數(shù)據(jù)庫被鎖,無法清除數(shù)據(jù)

根據(jù)spring事務(wù)源碼分析可知,spring的事務(wù)也是基于ThreadLocal的旬迹,所以出現(xiàn)了跨越線程的時候火惊,就會出現(xiàn)無法執(zhí)行完成。并且由navicat無法操作數(shù)據(jù)庫可以看出奔垦,數(shù)據(jù)庫事務(wù)并未提交屹耐,出現(xiàn)了行鎖。

org.springframework.transaction.support.TransactionSynchronizationManager#unbindResource

    /**
     * Unbind a resource for the given key from the current thread.
     * @param key the key to unbind (usually the resource factory)
     * @return the previously bound value (usually the active resource object)
     * @throws IllegalStateException if there is no value bound to the thread
     * @see ResourceTransactionManager#getResourceFactory()
     */
    public static Object unbindResource(Object key) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Object value = doUnbindResource(actualKey);
        if (value == null) {
            throw new IllegalStateException(
                    "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        }
        return value;
    }

org.springframework.transaction.support.TransactionSynchronizationManager

獲取線程資源

雖然源碼中可以獲取看到事務(wù)提交代碼是 數(shù)據(jù)庫連接的提交宴倍,但是其中還是必須執(zhí)行 清除當前線程綁定的事務(wù)张症,才能徹底釋放數(shù)據(jù)庫連接。

提交事務(wù):org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

拋出異常:org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion

    /**
     * Clean up after completion, clearing synchronization if necessary,
     * and invoking doCleanupAfterCompletion.
     * @param status object representing the transaction
     * @see #doCleanupAfterCompletion
     */
    private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.clear();
        }
        if (status.isNewTransaction()) {
            doCleanupAfterCompletion(status.getTransaction());
        }
        if (status.getSuspendedResources() != null) {
            if (status.isDebug()) {
                logger.debug("Resuming suspended transaction after completion of inner transaction");
            }
            Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
            resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
        }
    }

所以最終解決還是需要在子線程進行提交鸵贬,此時俗他,又可以使用線程池的重寫 java.util.concurrent.ThreadPoolExecutor#afterExecute
并且通過變量來確定子線程是否已經(jīng)執(zhí)行完成,如果執(zhí)行完成阔逼,才進行事務(wù)的提交

BatchTransactionFlag

@Getter
public class BatchTransactionFlag {

    private final AtomicInteger completeThreads = new AtomicInteger();

    private final AtomicInteger successThreads = new AtomicInteger();

    private final int groupSize;

    private boolean batchTransaction;

    private Map<Long, TransactionStatus> longTransactionStatusMap;

    private final List<?> toDoList;

    public BatchTransactionFlag(int groupSize, boolean batchTransaction, List<?> toDoList) {
        this.groupSize = groupSize;
        this.batchTransaction = batchTransaction;
        this.toDoList = toDoList;
        if (batchTransaction) {
            longTransactionStatusMap = new ConcurrentHashMap<>();
        }
    }
}

CommonThreadExecutionService實現(xiàn)

@Slf4j
@Service
public class DefaultCommonThreadExecutionServiceBean implements CommonThreadExecutionService {

    @Resource
    private DataSourceTransactionManager transactionManager;

    @Override
    public int executeBatch(ThreadExecution threadExecution, List sequence, Map<Long, TransactionStatus> longTransactionStatusMap, BatchTransactionFlag flag) {

        synchronized (flag) {
            TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());
            longTransactionStatusMap.put(Thread.currentThread().getId(), transactionStatus);
            try {
                threadExecution.threadExecute(sequence);
                flag.getSuccessThreads().incrementAndGet();
            } finally {
                flag.getCompleteThreads().incrementAndGet();
                log.info("完成任務(wù):" + Thread.currentThread().getName());
            }
        }
        return 0;
    }
}

經(jīng)過測試發(fā)現(xiàn)兆衅,需要調(diào)用數(shù)據(jù)庫修改的步驟,還是需要同步塊的嗜浮,不使用會導(dǎo)致數(shù)據(jù)庫死鎖羡亩,導(dǎo)致處理超時

Step4. 使用 變量表示來決定是否使用統(tǒng)一事務(wù)

從上面可以看到由于面向接口進行處理,所以根據(jù)需要 統(tǒng)一事務(wù)不需要統(tǒng)一事務(wù) 又可以使用不同實現(xiàn)類來進行控制危融,并且在編寫線程池的時候也配合做判斷畏铆。

線程池執(zhí)行的代碼


        for (int i = 0; i < group; i++) {

            int startIndex = i * groupSize;
            int endIndex = (i + 1) * groupSize;
            if (endIndex > toDoList.size()) {
                endIndex = toDoList.size();
            }
            List<?> items = toDoList.subList(startIndex, endIndex);
            if (batchTransaction) {
                futures.add(executorService.submit(new BatchTransactionPoolTask(execution, items, flag.getLongTransactionStatusMap(), flag)));
            } else {
                futures.add(executorService.submit(new SingleTransactionPoolTask(execution, items, flag)));
            }
        }

線程池的構(gòu)建

private static ThreadPoolExecutor createThreadPoolExecutorInstance(int corePoolSize,
                                                                       int maximumPoolSize,
                                                                       long keepAliveTime,
                                                                       TimeUnit unit,
                                                                       BlockingQueue<Runnable> workQueue,
                                                                       BatchTransactionFlag flag
    ) {

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("flow-pool-%d")
                .build();

        String currentUserId = SecurityUtils.getCurrentUserId();
        DataSourceTransactionManager transactionManager = SpringContextUtils.getBean(DataSourceTransactionManager.class);


        return new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                namedThreadFactory) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                Authentication.setAuthenticatedUserId(currentUserId);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {

                if (flag.isBatchTransaction()) {

                    try {
                        while (flag.getCompleteThreads().get() != flag.getGroupSize()) {
                            log.info(Thread.currentThread().getName() + " 等待主線程:getGroupSize:" + flag.getGroupSize() + "\tgetCompleteThreads:" + flag.getCompleteThreads().get());
                            log.info("開啟事務(wù)個數(shù):" + flag.getLongTransactionStatusMap().size());
                            Thread.sleep(1000);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    TransactionStatus status = flag.getLongTransactionStatusMap().get(Thread.currentThread().getId());
                    if (flag.getSuccessThreads().get() == flag.getCompleteThreads().get()) {
                        log.info(Thread.currentThread().getName() + ":全部執(zhí)行成功,提交事務(wù)");
                        transactionManager.commit(status);
                    } else {
                        log.info(Thread.currentThread().getName() + ":具有線程執(zhí)行失敗,回滾事務(wù)");
                        transactionManager.rollback(status);
                    }
                }
            }
        };
    }

這樣就可以做到 動態(tài)判斷是否需要統(tǒng)一事務(wù)。
詳細demo可以查看git代碼
git地址:https://github.com/oldguys/flowable-modeler-demo/tree/feature_threadpoolexecutor_no_spring_proxy_transaction

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吉殃,一起剝皮案震驚了整個濱河市辞居,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蛋勺,老刑警劉巖瓦灶,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異抱完,居然都是意外死亡贼陶,警方通過查閱死者的電腦和手機巧娱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門碉怔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人禁添,你說我怎么就攤上這事眨层。” “怎么了上荡?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我酪捡,道長叁征,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任逛薇,我火速辦了婚禮捺疼,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘永罚。我一直安慰自己啤呼,他們只是感情好,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布呢袱。 她就那樣靜靜地躺著官扣,像睡著了一般。 火紅的嫁衣襯著肌膚如雪羞福。 梳的紋絲不亂的頭發(fā)上惕蹄,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天,我揣著相機與錄音治专,去河邊找鬼卖陵。 笑死,一個胖子當著我的面吹牛张峰,可吹牛的內(nèi)容都是我干的泪蔫。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼喘批,長吁一口氣:“原來是場噩夢啊……” “哼撩荣!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起谤祖,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤婿滓,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后粥喜,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凸主,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年额湘,在試婚紗的時候發(fā)現(xiàn)自己被綠了卿吐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡锋华,死狀恐怖嗡官,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情毯焕,我是刑警寧澤衍腥,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布磺樱,位于F島的核電站,受9級特大地震影響婆咸,放射性物質(zhì)發(fā)生泄漏竹捉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一尚骄、第九天 我趴在偏房一處隱蔽的房頂上張望块差。 院中可真熱鬧,春花似錦倔丈、人聲如沸憨闰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽鹉动。三九已至,卻和暖如春警儒,著一層夾襖步出監(jiān)牢的瞬間训裆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工蜀铲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留边琉,地道東北人。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓记劝,卻偏偏與公主長得像变姨,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子厌丑,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容