編寫一個抽象類
import com.alibaba.druid.util.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.iflytek.epdcloud.mhk.mark.common.entity.MachineEvaluate;
import com.iflytek.epdcloud.mhk.mark.common.enumeration.ResponseEnumeration;
import com.iflytek.epdcloud.mhk.mark.web.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.multipart.MultipartFile;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Slf4j
public abstract class? FileUploadService {
public T t;
? ? //默認一批是1000個
? ? public static final IntegerbatchSizeIndex =1000;
? ? public static final IntegerThreadNumIndex =15;
? ? /**
? ? * 上傳文件
? ? * @param file
? ? * @param fileRunnable
? ? * @param batchSize
? ? * @param threadNum
? ? * @param list
? ? * @param args
? ? */
? public void uploadFile(MultipartFile file,FileRunnable fileRunnable,Integer batchSize,Integer threadNum,ArrayList list,Object ... args){
analyse(file,fileRunnable,batchSize,threadNum,list,args);
? }
/**
? ? * 解析文件
? ? * @param file
? ? * @param fileRunnable
? ? * @param batchSize
? ? * @param threadNum
? ? * @param list
? ? * @param args
? ? */
? ? ? public? void analyse(MultipartFile file,FileRunnable fileRunnable,Integer batchSize,Integer threadNum,ArrayList list,Object... args){
AtomicInteger atomicInteger =new AtomicInteger(0);
? ? ? ? HashMap map =new HashMap<>();
? ? ? ? BufferedReader bufferedReader =null;
? ? ? ? InputStreamReader inputStreamReader =null;
? ? ? ? InputStream is=null;
? ? ? ? try {
is = file.getInputStream();
? ? ? ? ? ? inputStreamReader =new InputStreamReader(is);
? ? ? ? ? ? bufferedReader =new BufferedReader(inputStreamReader);
? ? ? ? ? ? String line =null;
? ? ? ? ? ? while((line= bufferedReader.readLine())!=null) {
String[] array = line.split("\t");
? ? ? ? ? ? ? ? List collect = Arrays.stream(array).filter(s12 ->
!StringUtils.isEmpty(s12)
).collect(Collectors.toList());
? ? ? ? ? ? ? ? if((!CollectionUtils.isEmpty(collect))){
atomicInteger.incrementAndGet();
? ? ? ? ? ? ? ? ? ? modify(collect,list,args);
? ? ? ? ? ? ? ? }else {
log.error("第{}行的數(shù)據(jù)格式有誤",atomicInteger.toString());
? ? ? ? ? ? ? ? }
}
bufferedReader.close();
? ? ? ? }catch (Exception e){
log.error("文件解析失敗:{}",e.getMessage());
? ? ? ? ? ? throw? new BusinessException(ResponseEnumeration.MACHINE_FILE_ANALYSE_ERROR);
? ? ? ? }finally {
try {
if (bufferedReader !=null) {
bufferedReader.close();
? ? ? ? ? ? ? ? }
if (inputStreamReader !=null) {
inputStreamReader.close();
? ? ? ? ? ? ? ? }
}catch (IOException e) {
throw? new BusinessException(ResponseEnumeration.MACHINE_FILE_ANALYSE_ERROR);
? ? ? ? ? ? }
}
batchSave(list, batchSize, fileRunnable,threadNum);
? ? }
/**
? ? * 批量添加數(shù)據(jù)到數(shù)據(jù)庫
? ? * @param modifyList
? ? * @param batchSize
? ? * @param runnable
? ? * @param threadNum
? ? */
? ? public void batchSave(List modifyList, Integer batchSize, FileRunnable runnable, Integer threadNum) {
if(CollectionUtils.isEmpty(modifyList)){
log.info("沒有數(shù)據(jù)要添加數(shù)據(jù)庫");
return;
? ? ? ? }
if(ObjectUtils.isEmpty(batchSize))
batchSize =batchSizeIndex;
? ? ? ? if(ObjectUtils.isEmpty(threadNum))
threadNum =ThreadNumIndex;
? ? ? ? int batchNum = modifyList.size() / batchSize;
? ? ? ? CountDownLatch countDownLatch =new CountDownLatch(batchNum+1);
? ? ? ? LinkedBlockingQueue runnables =new LinkedBlockingQueue<>(Integer.MAX_VALUE);
? ? ? ? ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(threadNum,
? ? ? ? ? ? ? ? threadNum,
? ? ? ? ? ? ? ? 20,
? ? ? ? ? ? ? ? TimeUnit.SECONDS,
? ? ? ? ? ? ? ? runnables,new BatchSaveThreadFactory());
? ? ? ? List newList =null;
? ? ? ? for (int i=1;i<=batchNum+1;i++
) {
if(i!=batchNum+1){
newList = modifyList.subList((i-1)*batchSize,i*batchSize);
? ? ? ? ? ? ? ? FileRunnable fileRunnable = runnable.newInstance();
? ? ? ? ? ? ? ? fileRunnable.setCountDownLatch(countDownLatch);
? ? ? ? ? ? ? ? fileRunnable.setList(newList);
? ? ? ? ? ? ? ? threadPoolExecutor.execute(fileRunnable);
? ? ? ? ? ? }else {
newList = modifyList.subList((i-1)*batchSize,modifyList.size());
? ? ? ? ? ? ? ? FileRunnable fileRunnable = runnable.newInstance();
? ? ? ? ? ? ? ? fileRunnable.setCountDownLatch(countDownLatch);
? ? ? ? ? ? ? ? fileRunnable.setList(newList);
? ? ? ? ? ? ? ? threadPoolExecutor.execute(fileRunnable);
? ? ? ? ? ? }
}
try {
countDownLatch.await();
? ? ? ? }catch (InterruptedException e) {
log.error("文件落盤失敗:{}",e.getMessage());
? ? ? ? ? ? throw new BusinessException(ResponseEnumeration.MACHINE_FILE_SAVE_ERROR);
? ? ? ? }
threadPoolExecutor.shutdown();
? ? }
/**
? ? * 解析之后每行數(shù)據(jù)的處理
? ? * @param collect
? ? * @param list
? ? * @param args
? ? */
? ? public abstract? void modify(List collect,List list,Object... args);
}
具體每個線程執行的業務邏輯
在落盤時要根據業務場景選擇是否去重
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
/**
 * Unit of work that persists one batch of records and then signals a
 * {@link CountDownLatch} so a coordinator can tell when the batch is done.
 *
 * <p>Subclasses supply the persistence logic in {@link #doSave(List)} and a factory
 * method, {@link #newInstance()}, used to obtain a separate worker per batch.
 *
 * @param <T> type of the records in a batch
 */
public abstract class FileRunnable<T> implements Runnable {

    private CountDownLatch countDownLatch;
    private List<T> list;

    /**
     * Creates the worker to use for one batch.
     *
     * @return a worker that can be given its own latch and batch list
     */
    public abstract FileRunnable<T> newInstance();

    /**
     * Persists the configured batch, counts the latch down, and prints the elapsed
     * time in milliseconds to standard output.
     *
     * <p>The latch is counted down even when {@code doSave} throws, so a thread
     * blocked in {@code await()} is never left waiting on a failed batch; the
     * exception itself still propagates to the caller.
     */
    @Override
    public void run() {
        long startedAt = System.currentTimeMillis();
        try {
            doSave(list);
        } finally {
            // Tolerate a worker that was never given a latch instead of failing with an NPE.
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
        System.out.println("耗時" + (System.currentTimeMillis() - startedAt));
    }

    /**
     * Persists one batch of records.
     *
     * @param list the batch assigned through {@link #setList(List)}
     */
    public abstract void doSave(List<T> list);

    /**
     * @param countDownLatch latch to count down once this worker's batch has been processed
     */
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * @param list batch of records this worker should persist
     */
    public void setList(List<T> list) {
        this.list = list;
    }
}
繼承抽象類進行操作
import com.alibaba.druid.util.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.iflytek.epdcloud.mhk.mark.common.entity.MachineEvaluate;
import com.iflytek.epdcloud.mhk.mark.common.enumeration.MachineEvaluateStatusEnumeration;
import com.iflytek.epdcloud.mhk.mark.common.enumeration.ResponseEnumeration;
import com.iflytek.epdcloud.mhk.mark.common.service.IMachineEvaluateService;
import com.iflytek.epdcloud.mhk.mark.common.utils.SnowflakeIdWorker;
import com.iflytek.epdcloud.mhk.mark.web.buiness.MachineEvaluateBusiness;
import com.iflytek.epdcloud.mhk.mark.web.exception.BusinessException;
import com.iflytek.epdcloud.mhk.mark.web.executor.BatchSaveThreadFactory;
import com.iflytek.epdcloud.mhk.mark.web.executor.FileUploadService;
import com.iflytek.epdcloud.mhk.mark.web.executor.MachineEvalutteTask;
import com.iflytek.epdcloud.mhk.mark.web.model.param.MachineEvaluateDto;
import com.iflytek.epdcloud.mhk.mark.web.model.vo.MachineEvaluateVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import java.util.*;
/**
*?
*/
@Slf4j
@Service
public class MachineEvaluateBusinessImplextends FileUploadServiceimplements MachineEvaluateBusiness? {
@Autowired
? ? private IMachineEvaluateServiceiMachineEvaluateService;
? ? @Transactional
@Override
? ? public void upload(MachineEvaluateDto dto)? {
HashMap map =new HashMap<>();
? ? ? ? ArrayList machineEvaluateArrayList =new ArrayList<>();
? ? ? ? MachineEvalutteTask machineEvalutteTask =new MachineEvalutteTask(iMachineEvaluateService, machineEvaluateArrayList);
? ? ? ? uploadFile(dto.getFile(),machineEvalutteTask,null ,null,machineEvaluateArrayList,dto.getQuestionTypeId(),map);
? ? }
@Override
? ? public MachineEvaluateVogetUploadResult(Long questionTypeId) {
MachineEvaluateVo machineEvaluateVo =new MachineEvaluateVo();
? ? ? ? Long total =iMachineEvaluateService.getTotal(questionTypeId,null);
? ? ? ? Long validTotal =iMachineEvaluateService.getTotal(questionTypeId,MachineEvaluateStatusEnumeration.EVALUATE_ENTERED.getCode());
? ? ? ? machineEvaluateVo.setUploadTotal(total);
? ? ? ? machineEvaluateVo.setValidTotal(validTotal);
? ? return machineEvaluateVo;
? ? }
@Override
? ? public void modify(List collect,List list, Object... args) {
{
while((collect.size()>=2)&&(collect.size()%2==0))
{
List strings = collect.subList(0, 2);
? ? ? ? ? ? ? ? String s1 = strings.get(0);
? ? ? ? ? ? ? ? String s2 = strings.get(1);
? ? ? ? ? ? ? ? Long questionTypeId = (Long) args[0];
? ? ? ? ? ? ? ? Map map = (Map) args[1];
? ? ? ? ? ? ? ? strings.clear();
? ? ? ? ? ? ? ? String s3 = map.get(s1+questionTypeId);
? ? ? ? ? ? ? ? if(!StringUtils.isEmpty(s3))
break;
? ? ? ? ? ? ? ? map.put(s1+questionTypeId,s2);
? ? ? ? ? ? ? ? MachineEvaluate machineEvaluate =new MachineEvaluate();
? ? ? ? ? ? ? ? machineEvaluate.setExamId(s1);
? ? ? ? ? ? ? ? machineEvaluate.setQuestionTypeId(questionTypeId);
? ? ? ? ? ? ? ? machineEvaluate.setScore(s2);
? ? ? ? ? ? ? ? machineEvaluate.setStatus(MachineEvaluateStatusEnumeration.EVALUATE_NOT_ENTER.getCode());
? ? ? ? ? ? ? ? machineEvaluate.setId(SnowflakeIdWorker.generateId());
? ? ? ? ? ? ? ? list.add(machineEvaluate);
? ? ? ? ? ? }
}
}
}