OSS文件分片上傳
依賴
<!-- https://mvnrepository.com/artifact/com.aliyun.oss/aliyun-sdk-oss -->
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>3.8.1</version>
</dependency>
基礎(chǔ)參數(shù)dto
/**
* @author WJL
*/
@Data
@Builder
public class OssParamDTO {
private String endpoint;
private String accessKeyId;
private String accessKeySecret;
private String bucketName;
private String folder;
/**
* objectName = folder + fileName
*/
private String objectName;
/**
* 上傳線程
*/
private Integer task;
/**
* 每個(gè)線程處理大小 分片大小
*/
private Integer number;
}
具體上傳方法
小文件上傳
public static PutObjectResult uploadFile(OssParamDTO ossParamDTO, InputStream inputStream){
// 創(chuàng)建OSSClient實(shí)例绎速。
OSS ossClient = new OSSClientBuilder().build(ossParamDTO.getEndpoint(), ossParamDTO.getAccessKeyId(), ossParamDTO.getAccessKeySecret());
PutObjectResult putObjectResult = null;
// 上傳文件流奖蔓。
try {
putObjectResult = ossClient.putObject(ossParamDTO.getBucketName(), ossParamDTO.getObjectName(), inputStream);
//權(quán)限設(shè)置
ossClient.setBucketAcl(ossParamDTO.getBucketName(), CannedAccessControlList.PublicRead);
} catch (Exception e) {
e.printStackTrace();
}finally {
// 關(guān)閉OSSClient儒老。
ossClient.shutdown();
}
return putObjectResult;
}
大文件上傳尺碰,分片oss自己處理
處理邏輯:前段輪訓(xùn)查詢數(shù)據(jù)庫某個(gè)字段德澈,當(dāng)該字段被回調(diào)接口更新時(shí)結(jié)束輪訓(xùn)歇攻,上傳完成
public static void uploadBigFile(OssParamDTO ossParamDTO,String path, File file,Long fileId) throws Throwable {
System.out.println("上傳時(shí)間:"+System.currentTimeMillis());
// Endpoint以杭州為例,其它Region請(qǐng)按實(shí)際情況填寫梆造。
String endpoint = ossParamDTO.getEndpoint();
// 阿里云主賬號(hào)AccessKey擁有所有API的訪問權(quán)限缴守,風(fēng)險(xiǎn)很高。強(qiáng)烈建議您創(chuàng)建并使用RAM賬號(hào)進(jìn)行API訪問或日常運(yùn)維镇辉,請(qǐng)登錄 https://ram.console.aliyun.com 創(chuàng)建RAM賬號(hào)屡穗。
String accessKeyId = ossParamDTO.getAccessKeyId();
String accessKeySecret = ossParamDTO.getAccessKeySecret();
// 創(chuàng)建OSSClient實(shí)例。
OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
ObjectMetadata meta = new ObjectMetadata();
// 指定上傳的內(nèi)容類型忽肛。
meta.setContentType("text/plain");
// 通過UploadFileRequest設(shè)置多個(gè)參數(shù)村砂。
UploadFileRequest uploadFileRequest = new UploadFileRequest(ossParamDTO.getBucketName(),ossParamDTO.getObjectName());
// 指定上傳的本地文件。
uploadFileRequest.setUploadFile(path);
// 指定上傳并發(fā)線程數(shù)屹逛,默認(rèn)為1础废。
uploadFileRequest.setTaskNum(ossParamDTO.getTask());
// 指定上傳的分片大小汛骂,范圍為100KB~5GB,默認(rèn)為文件大小/10000评腺。
uploadFileRequest.setPartSize(ossParamDTO.getNumber() * 1024 * 1024);
uploadFileRequest.setObjectMetadata(meta);
// 設(shè)置上傳成功回調(diào)帘瞭,參數(shù)為Callback類型。
Callback callback = new Callback();
callback.setCalbackBodyType(Callback.CalbackBodyType.URL);
//回調(diào)參數(shù) --- 同同步到數(shù)據(jù)庫
callback.setCallbackBody("fileId="+fileId+"&fileName=${object}&uploadStatus=1");
//回調(diào)接口(自己服務(wù)器接口蒿讥,可供外網(wǎng)訪問)
callback.setCallbackUrl("http://3m8wv2.natappfree.cc/web/common/callBack");
uploadFileRequest.setCallback(callback);
// 斷點(diǎn)續(xù)傳上傳蝶念。
ossClient.uploadFile(uploadFileRequest);
//權(quán)限設(shè)置
ossClient.setBucketAcl(ossParamDTO.getBucketName(), CannedAccessControlList.PublicRead);
// 關(guān)閉OSSClient。
ossClient.shutdown();
}
大文件本地分片芋绸,多線程執(zhí)行分片上傳媒殉,再合并碎片
分片上傳代碼
PartETag getUploadPartETag(String objectName, String bucketName, String uploadId,
InputStream instream, Long curPartSize,Integer partNum,
OSS ossClient, CountDownLatch countDownLatch){
long before = System.currentTimeMillis();
UploadPartRequest uploadPartRequest = null;
try {
log.debug("分片文件上傳線程: {}",Thread.currentThread().getName());
uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(bucketName);
uploadPartRequest.setKey(objectName);
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(instream);
// 設(shè)置分片大小。除了最后一個(gè)分片沒有大小限制侥钳,其他的分片最小為100KB适袜。
uploadPartRequest.setPartSize(curPartSize);
// 設(shè)置分片號(hào)。每一個(gè)上傳的分片都有一個(gè)分片號(hào)舷夺,取值范圍是1~10000苦酱,如果超出這個(gè)范圍,OSS將返回InvalidArgument的錯(cuò)誤碼给猾。
uploadPartRequest.setPartNumber(partNum);
// 每個(gè)分片不需要按順序上傳疫萤,甚至可以在不同客戶端上傳,OSS會(huì)按照分片號(hào)排序組成完整的文件敢伸。
UploadPartResult uploadPartResult = ossClient.uploadPart(uploadPartRequest);
// 每次上傳分片之后扯饶,OSS的返回結(jié)果會(huì)包含一個(gè)PartETag。PartETag將被保存到partETags中池颈。
log.debug("getPartETag ::{}" ,uploadPartResult.getPartETag().getETag());
return uploadPartResult.getPartETag();
}finally {
countDownLatch.countDown();
log.debug("線程: {} 執(zhí)行完畢, 等待線程數(shù) :{}, 消耗時(shí)間: {}",
Thread.currentThread().getName(),countDownLatch.getCount(),
((System.currentTimeMillis()-before)/1000)+"s");
}
}
外部分片代碼
@Qualifier("taskExecutor")
@Autowired
ThreadPoolTaskExecutor taskExecutor;
/**
* 上傳
* @param ossParamDTO
* @param multipartFile
* @return
*/
public CompleteMultipartUploadResult uploadBigFileForProd(OssParamDTO ossParamDTO, MultipartFile multipartFile){
Long before = System.currentTimeMillis();
// Endpoint以杭州為例尾序,其它Region請(qǐng)按實(shí)際情況填寫。
String endpoint = ossParamDTO.getEndpoint();
// 阿里云主賬號(hào)AccessKey擁有所有API的訪問權(quán)限躯砰,風(fēng)險(xiǎn)很高每币。強(qiáng)烈建議您創(chuàng)建并使用RAM賬號(hào)進(jìn)行API訪問或日常運(yùn)維,請(qǐng)登錄 https://ram.console.aliyun.com 創(chuàng)建RAM賬號(hào)琢歇。
String accessKeyId = ossParamDTO.getAccessKeyId();
String accessKeySecret = ossParamDTO.getAccessKeySecret();
String bucketName = ossParamDTO.getBucketName();
// <yourObjectName>表示上傳文件到OSS時(shí)需要指定包含文件后綴在內(nèi)的完整路徑兰怠,例如abc/efg/123.jpg。
String objectName = ossParamDTO.getObjectName();
// 創(chuàng)建OSSClient實(shí)例李茫。
OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
// 創(chuàng)建InitiateMultipartUploadRequest對(duì)象揭保。
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, objectName);
// 初始化分片。
InitiateMultipartUploadResult upresult = ossClient.initiateMultipartUpload(request);
// 返回uploadId魄宏,它是分片上傳事件的唯一標(biāo)識(shí)秸侣,您可以根據(jù)這個(gè)ID來發(fā)起相關(guān)的操作,如取消分片上傳、查詢分片上傳等味榛。
String uploadId = upresult.getUploadId();
// partETags是PartETag的集合方篮。PartETag由分片的ETag和分片號(hào)組成。
List<PartETag> partETags = new ArrayList<>();
// 計(jì)算文件有多少個(gè)分片 15MB
final long partSize = 2 * 1024 * 1024L;
long fileLength = multipartFile.getSize();
int partCount = (int) (fileLength / partSize);
if (fileLength % partSize != 0) {
partCount++;
}
// 遍歷分片上傳励负。
log.info("分片數(shù)量 {}",partCount);
List<Future<PartETag>> futureList = Collections.synchronizedList(new ArrayList());
CountDownLatch countDownLatch = new CountDownLatch(partCount);
for (int i = 0; i < partCount; i++) {
long startPos = i * partSize;
long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
InputStream instream = null;
try {
instream = multipartFile.getInputStream();
} catch (IOException e) {
e.printStackTrace();
}
// 跳過已經(jīng)上傳的分片。
try {
instream.skip(startPos);
} catch (IOException e) {
e.printStackTrace();
}
int finalI = i;
InputStream finalInstream = instream;
Future<PartETag> partETagFuture = taskExecutor.submit(() ->
fileServiceExtAsync.getUploadPartETag(objectName, bucketName, uploadId, finalInstream, curPartSize, finalI + 1, ossClient, countDownLatch));
futureList.add(partETagFuture);
}
try {
countDownLatch.await();
for (Future<PartETag> tagFuture : futureList) {
partETags.add(tagFuture.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 創(chuàng)建CompleteMultipartUploadRequest對(duì)象匕得。
List<PartETag> collect = partETags.stream().sorted(Comparator.comparing(PartETag::getPartNumber)).collect(Collectors.toList());
// 在執(zhí)行完成分片上傳操作時(shí)继榆,需要提供所有有效的partETags。OSS收到提交的partETags后汁掠,會(huì)逐一驗(yàn)證每個(gè)分片的有效性略吨。當(dāng)所有的數(shù)據(jù)分片驗(yàn)證通過后,OSS將把這些分片組合成一個(gè)完整的文件考阱。
log.debug("文件開始合并");
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, objectName, uploadId, collect);
// 如果需要在完成文件上傳的同時(shí)設(shè)置文件訪問權(quán)限翠忠,請(qǐng)參考以下示例代碼。
completeMultipartUploadRequest.setObjectACL(CannedAccessControlList.PublicRead);
// 完成上傳乞榨。
CompleteMultipartUploadResult completeMultipartUploadResult = ossClient.completeMultipartUpload(completeMultipartUploadRequest);
// 關(guān)閉OSSClient秽之。
ossClient.shutdown();
log.debug("消耗總時(shí)間: {}",((System.currentTimeMillis()-before)/1000)+"s");
return completeMultipartUploadResult;
}