上傳文件多線程批量落盤

編寫一個抽象類

import com.alibaba.druid.util.StringUtils;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;

import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;

import com.iflytek.epdcloud.mhk.mark.common.entity.MachineEvaluate;

import com.iflytek.epdcloud.mhk.mark.common.enumeration.ResponseEnumeration;

import com.iflytek.epdcloud.mhk.mark.web.exception.BusinessException;

import lombok.extern.slf4j.Slf4j;

import org.springframework.web.multipart.MultipartFile;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.HashMap;

import java.util.List;

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.stream.Collectors;

@Slf4j

public abstract class? FileUploadService {

public T t;

? ? //默認一批是1000個

? ? public static final IntegerbatchSizeIndex =1000;

? ? public static final IntegerThreadNumIndex =15;

? ? /**

? ? * 上傳文件

? ? * @param file

? ? * @param fileRunnable

? ? * @param batchSize

? ? * @param threadNum

? ? * @param list

? ? * @param args

? ? */

? public void uploadFile(MultipartFile file,FileRunnable fileRunnable,Integer batchSize,Integer threadNum,ArrayList list,Object ... args){

analyse(file,fileRunnable,batchSize,threadNum,list,args);

? }

/**

? ? * 解析文件

? ? * @param file

? ? * @param fileRunnable

? ? * @param batchSize

? ? * @param threadNum

? ? * @param list

? ? * @param args

? ? */

? ? ? public? void analyse(MultipartFile file,FileRunnable fileRunnable,Integer batchSize,Integer threadNum,ArrayList list,Object... args){

AtomicInteger atomicInteger =new AtomicInteger(0);

? ? ? ? HashMap map =new HashMap<>();

? ? ? ? BufferedReader bufferedReader =null;

? ? ? ? InputStreamReader inputStreamReader =null;

? ? ? ? InputStream is=null;

? ? ? ? try {

is = file.getInputStream();

? ? ? ? ? ? inputStreamReader =new InputStreamReader(is);

? ? ? ? ? ? bufferedReader =new BufferedReader(inputStreamReader);

? ? ? ? ? ? String line =null;

? ? ? ? ? ? while((line= bufferedReader.readLine())!=null) {

String[] array = line.split("\t");

? ? ? ? ? ? ? ? List collect = Arrays.stream(array).filter(s12 ->

!StringUtils.isEmpty(s12)

).collect(Collectors.toList());

? ? ? ? ? ? ? ? if((!CollectionUtils.isEmpty(collect))){

atomicInteger.incrementAndGet();

? ? ? ? ? ? ? ? ? ? modify(collect,list,args);

? ? ? ? ? ? ? ? }else {

log.error("第{}行的數(shù)據(jù)格式有誤",atomicInteger.toString());

? ? ? ? ? ? ? ? }

}

bufferedReader.close();

? ? ? ? }catch (Exception e){

log.error("文件解析失敗:{}",e.getMessage());

? ? ? ? ? ? throw? new BusinessException(ResponseEnumeration.MACHINE_FILE_ANALYSE_ERROR);

? ? ? ? }finally {

try {

if (bufferedReader !=null) {

bufferedReader.close();

? ? ? ? ? ? ? ? }

if (inputStreamReader !=null) {

inputStreamReader.close();

? ? ? ? ? ? ? ? }

}catch (IOException e) {

throw? new BusinessException(ResponseEnumeration.MACHINE_FILE_ANALYSE_ERROR);

? ? ? ? ? ? }

}

batchSave(list, batchSize, fileRunnable,threadNum);

? ? }

/**

? ? * 批量添加數(shù)據(jù)到數(shù)據(jù)庫

? ? * @param modifyList

? ? * @param batchSize

? ? * @param runnable

? ? * @param threadNum

? ? */

? ? public void batchSave(List modifyList, Integer batchSize, FileRunnable runnable, Integer threadNum) {

if(CollectionUtils.isEmpty(modifyList)){

log.info("沒有數(shù)據(jù)要添加數(shù)據(jù)庫");

return;

? ? ? ? }

if(ObjectUtils.isEmpty(batchSize))

batchSize =batchSizeIndex;

? ? ? ? if(ObjectUtils.isEmpty(threadNum))

threadNum =ThreadNumIndex;

? ? ? ? int batchNum = modifyList.size() / batchSize;

? ? ? ? CountDownLatch countDownLatch =new CountDownLatch(batchNum+1);

? ? ? ? LinkedBlockingQueue runnables =new LinkedBlockingQueue<>(Integer.MAX_VALUE);

? ? ? ? ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(threadNum,

? ? ? ? ? ? ? ? threadNum,

? ? ? ? ? ? ? ? 20,

? ? ? ? ? ? ? ? TimeUnit.SECONDS,

? ? ? ? ? ? ? ? runnables,new BatchSaveThreadFactory());

? ? ? ? List newList =null;

? ? ? ? for (int i=1;i<=batchNum+1;i++

) {

if(i!=batchNum+1){

newList = modifyList.subList((i-1)*batchSize,i*batchSize);

? ? ? ? ? ? ? ? FileRunnable fileRunnable = runnable.newInstance();

? ? ? ? ? ? ? ? fileRunnable.setCountDownLatch(countDownLatch);

? ? ? ? ? ? ? ? fileRunnable.setList(newList);

? ? ? ? ? ? ? ? threadPoolExecutor.execute(fileRunnable);

? ? ? ? ? ? }else {

newList = modifyList.subList((i-1)*batchSize,modifyList.size());

? ? ? ? ? ? ? ? FileRunnable fileRunnable = runnable.newInstance();

? ? ? ? ? ? ? ? fileRunnable.setCountDownLatch(countDownLatch);

? ? ? ? ? ? ? ? fileRunnable.setList(newList);

? ? ? ? ? ? ? ? threadPoolExecutor.execute(fileRunnable);

? ? ? ? ? ? }

}

try {

countDownLatch.await();

? ? ? ? }catch (InterruptedException e) {

log.error("文件落盤失敗:{}",e.getMessage());

? ? ? ? ? ? throw new BusinessException(ResponseEnumeration.MACHINE_FILE_SAVE_ERROR);

? ? ? ? }

threadPoolExecutor.shutdown();

? ? }

/**

? ? * 解析之后每行數(shù)據(jù)的處理

? ? * @param collect

? ? * @param list

? ? * @param args

? ? */

? ? public abstract? void modify(List collect,List list,Object... args);

}


具體每個線程執(zhí)行的業(yè)務(wù)邏輯

在落盤時要根據(jù)業(yè)務(wù)場景選擇是否去重

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.CopyOnWriteArrayList;

import java.util.concurrent.CountDownLatch;

public? abstract class FileRunnableimplements Runnable{

private CountDownLatchcountDownLatch;

? ? private Listlist;

? ? public abstract FileRunnablenewInstance();

? ? @Override

? ? public void run() {

long l = System.currentTimeMillis();

? ? ? ? doSave(list);

? ? ? ? countDownLatch.countDown();

? ? ? ? System.out.println("耗時"+(System.currentTimeMillis()-l));

? ? }

public abstract void doSave(List list);

? ? public void setCountDownLatch(CountDownLatch countDownLatch) {

this.countDownLatch = countDownLatch;

? ? }

public void setList(List list) {

this.list = list;

? ? }

}


繼承抽象類進行操作

import com.alibaba.druid.util.StringUtils;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;

import com.iflytek.epdcloud.mhk.mark.common.entity.MachineEvaluate;

import com.iflytek.epdcloud.mhk.mark.common.enumeration.MachineEvaluateStatusEnumeration;

import com.iflytek.epdcloud.mhk.mark.common.enumeration.ResponseEnumeration;

import com.iflytek.epdcloud.mhk.mark.common.service.IMachineEvaluateService;

import com.iflytek.epdcloud.mhk.mark.common.utils.SnowflakeIdWorker;

import com.iflytek.epdcloud.mhk.mark.web.buiness.MachineEvaluateBusiness;

import com.iflytek.epdcloud.mhk.mark.web.exception.BusinessException;

import com.iflytek.epdcloud.mhk.mark.web.executor.BatchSaveThreadFactory;

import com.iflytek.epdcloud.mhk.mark.web.executor.FileUploadService;

import com.iflytek.epdcloud.mhk.mark.web.executor.MachineEvalutteTask;

import com.iflytek.epdcloud.mhk.mark.web.model.param.MachineEvaluateDto;

import com.iflytek.epdcloud.mhk.mark.web.model.vo.MachineEvaluateVo;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import org.springframework.transaction.annotation.Transactional;

import org.springframework.web.multipart.MultipartFile;

import java.util.*;

/**

*?

*/

@Slf4j

@Service

public class MachineEvaluateBusinessImplextends FileUploadServiceimplements MachineEvaluateBusiness? {

@Autowired

? ? private IMachineEvaluateServiceiMachineEvaluateService;

? ? @Transactional

@Override

? ? public void upload(MachineEvaluateDto dto)? {

HashMap map =new HashMap<>();

? ? ? ? ArrayList machineEvaluateArrayList =new ArrayList<>();

? ? ? ? MachineEvalutteTask machineEvalutteTask =new MachineEvalutteTask(iMachineEvaluateService, machineEvaluateArrayList);

? ? ? ? uploadFile(dto.getFile(),machineEvalutteTask,null ,null,machineEvaluateArrayList,dto.getQuestionTypeId(),map);

? ? }

@Override

? ? public MachineEvaluateVogetUploadResult(Long questionTypeId) {

MachineEvaluateVo machineEvaluateVo =new MachineEvaluateVo();

? ? ? ? Long total =iMachineEvaluateService.getTotal(questionTypeId,null);

? ? ? ? Long validTotal =iMachineEvaluateService.getTotal(questionTypeId,MachineEvaluateStatusEnumeration.EVALUATE_ENTERED.getCode());

? ? ? ? machineEvaluateVo.setUploadTotal(total);

? ? ? ? machineEvaluateVo.setValidTotal(validTotal);

? ? return machineEvaluateVo;

? ? }

@Override

? ? public void modify(List collect,List list, Object... args) {

{

while((collect.size()>=2)&&(collect.size()%2==0))

{

List strings = collect.subList(0, 2);

? ? ? ? ? ? ? ? String s1 = strings.get(0);

? ? ? ? ? ? ? ? String s2 = strings.get(1);

? ? ? ? ? ? ? ? Long questionTypeId = (Long) args[0];

? ? ? ? ? ? ? ? Map map = (Map) args[1];

? ? ? ? ? ? ? ? strings.clear();

? ? ? ? ? ? ? ? String s3 = map.get(s1+questionTypeId);

? ? ? ? ? ? ? ? if(!StringUtils.isEmpty(s3))

break;

? ? ? ? ? ? ? ? map.put(s1+questionTypeId,s2);

? ? ? ? ? ? ? ? MachineEvaluate machineEvaluate =new MachineEvaluate();

? ? ? ? ? ? ? ? machineEvaluate.setExamId(s1);

? ? ? ? ? ? ? ? machineEvaluate.setQuestionTypeId(questionTypeId);

? ? ? ? ? ? ? ? machineEvaluate.setScore(s2);

? ? ? ? ? ? ? ? machineEvaluate.setStatus(MachineEvaluateStatusEnumeration.EVALUATE_NOT_ENTER.getCode());

? ? ? ? ? ? ? ? machineEvaluate.setId(SnowflakeIdWorker.generateId());

? ? ? ? ? ? ? ? list.add(machineEvaluate);

? ? ? ? ? ? }

}

}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子枣申,更是在濱河造成了極大的恐慌猖闪,老刑警劉巖趟妥,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件棍弄,死亡現(xiàn)場離奇詭異檬嘀,居然都是意外死亡脊阴,警方通過查閱死者的電腦和手機握侧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嘿期,“玉大人品擎,你說我怎么就攤上這事”感欤” “怎么了萄传?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我秀菱,道長振诬,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任衍菱,我火速辦了婚禮赶么,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘脊串。我一直安慰自己辫呻,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布琼锋。 她就那樣靜靜地躺著放闺,像睡著了一般。 火紅的嫁衣襯著肌膚如雪缕坎。 梳的紋絲不亂的頭發(fā)上怖侦,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機與錄音念赶,去河邊找鬼础钠。 笑死,一個胖子當(dāng)著我的面吹牛叉谜,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播踩萎,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼停局,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了香府?” 一聲冷哼從身側(cè)響起董栽,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎企孩,沒想到半個月后锭碳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡勿璃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年擒抛,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片补疑。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡歧沪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出莲组,到底是詐尸還是另有隱情诊胞,我是刑警寧澤,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布锹杈,位于F島的核電站撵孤,受9級特大地震影響迈着,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜邪码,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一裕菠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧霞扬,春花似錦糕韧、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至斧拍,卻和暖如春雀扶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背肆汹。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工愚墓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人昂勉。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓浪册,卻偏偏與公主長得像,于是被迫代替她去往敵國和親岗照。 傳聞我的和親對象是個殘疾皇子村象,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容