原文轉(zhuǎn)載自「劉悅的技術(shù)博客」https://v3u.cn/a_id_175
分片上傳并不是什么新概念苦囱,尤其是大文件傳輸?shù)奶幚碇薪?jīng)常會(huì)被使用撕彤,在之前的一篇文章里:python花式讀取大文件(10g/50g/1t)遇到的性能問題(面試向)我們討論了如何讀寫超大型文件,本次再來探討一下如何上傳超大型文件蚀狰,其實(shí)原理都是大同小異职员,原則就是化整為零,將大文件進(jìn)行分片處理哥蔚,切割成若干小文件蛛蒙,隨后為每個(gè)分片創(chuàng)建一個(gè)新的臨時(shí)文件來保存其內(nèi)容,待全部分片上傳完畢后深夯,后端再按順序讀取所有臨時(shí)文件的內(nèi)容诺苹,將數(shù)據(jù)寫入新文件中,最后將臨時(shí)文件再刪掉掌呜。大體流程請(qǐng)見下圖:
其實(shí)現(xiàn)在市面上有很多前端的三方庫(kù)都集成了分片上傳的功能质蕉,比如百度的WebUploader翩肌,遺憾的是它已經(jīng)淡出歷史舞臺(tái),無人維護(hù)了《矣睿現(xiàn)在比較推薦主流的庫(kù)是vue-simple-uploader粱坤,不過餓了么公司開源的elementUI市場(chǎng)占有率還是非常高的瓷产,但其實(shí)大家所不知道的是濒旦,這個(gè)非常著名的前端UI庫(kù)也已經(jīng)許久沒人維護(hù)了测秸,Vue3.0版本出來這么久了灾常,也沒有做適配钞瀑,由此可見大公司的開源產(chǎn)品還是需要給業(yè)務(wù)讓步。本次我們利用elementUI的自定義上傳結(jié)合后端的網(wǎng)紅框架FastAPI來實(shí)現(xiàn)分片上傳缠俺。
首先前端需要安裝需要的庫(kù):
npm install element-ui --save
npm install spark-md5 --save
npm install axios --save
隨后在入口文件main.js中進(jìn)行配置:
import ElementUI from 'element-ui'
import 'element-ui/lib/theme-chalk/index.css'
Vue.use(ElementUI)
import Axios from 'axios'
Vue.prototype.axios = Axios;
import QS from 'qs'
Vue.prototype.qs = QS;
配置好之后贷岸,設(shè)計(jì)方案,前端通過elementUI上傳時(shí)躏救,通過分片大小的閾值對(duì)文件進(jìn)行切割螟蒸,并且記錄每一片文件的切割順序(chunk)七嫌,在這個(gè)過程中,通過SparkMD5來計(jì)算文件的唯一標(biāo)識(shí)(防止多個(gè)文件同時(shí)上傳的覆蓋問題identifier)英妓,在每一次分片文件的上傳中绍赛,會(huì)將分片文件實(shí)體,切割順序(chunk)以及唯一標(biāo)識(shí)(identifier)異步發(fā)送到后端接口(fastapi)贺纲,后端將chunk和identifier結(jié)合在一起作為臨時(shí)文件寫入服務(wù)器磁盤中褪测,當(dāng)前端將所有的分片文件都發(fā)送完畢后潦刃,最后請(qǐng)求一次后端另外一個(gè)接口乖杠,后端將所有文件合并澄成。
根據(jù)方案,前端建立chunkupload.js文件:
import SparkMD5 from 'spark-md5'
//錯(cuò)誤信息
function getError(action, option, xhr) {
let msg
if (xhr.response) {
msg = `${xhr.response.error || xhr.response}`
} else if (xhr.responseText) {
msg = `${xhr.responseText}`
} else {
msg = `fail to post ${action} ${xhr.status}`
}
const err = new Error(msg)
err.status = xhr.status
err.method = 'post'
err.url = action
return err
}
// 上傳成功完成合并之后卫漫,獲取服務(wù)器返回的json
function getBody(xhr) {
const text = xhr.responseText || xhr.response
if (!text) {
return text
}
try {
return JSON.parse(text)
} catch (e) {
return text
}
}
// 分片上傳的自定義請(qǐng)求列赎,以下請(qǐng)求會(huì)覆蓋element的默認(rèn)上傳行為
export default function upload(option) {
if (typeof XMLHttpRequest === 'undefined') {
return
}
const spark = new SparkMD5.ArrayBuffer()// md5的ArrayBuffer加密類
const fileReader = new FileReader()// 文件讀取類
const action = option.action // 文件上傳上傳路徑
const chunkSize = 1024 * 1024 * 1 // 單個(gè)分片大小镐确,這里測(cè)試用1m
let md5 = ''// 文件的唯一標(biāo)識(shí)
const optionFile = option.file // 需要分片的文件
let fileChunkedList = [] // 文件分片完成之后的數(shù)組
const percentage = [] // 文件上傳進(jìn)度的數(shù)組源葫,單項(xiàng)就是一個(gè)分片的進(jìn)度
// 文件開始分片,push到fileChunkedList數(shù)組中掺喻, 并用第一個(gè)分片去計(jì)算文件的md5
for (let i = 0; i < optionFile.size; i = i + chunkSize) {
const tmp = optionFile.slice(i, Math.min((i + chunkSize), optionFile.size))
if (i === 0) {
fileReader.readAsArrayBuffer(tmp)
}
fileChunkedList.push(tmp)
}
// 在文件讀取完畢之后储矩,開始計(jì)算文件md5,作為文件唯一標(biāo)識(shí)
fileReader.onload = async (e) => {
spark.append(e.target.result)
md5 = spark.end() + new Date().getTime()
console.log('文件唯一標(biāo)識(shí)--------', md5)
// 將fileChunkedList轉(zhuǎn)成FormData對(duì)象即硼,并加入上傳時(shí)需要的數(shù)據(jù)
fileChunkedList = fileChunkedList.map((item, index) => {
const formData = new FormData()
if (option.data) {
// 額外加入外面?zhèn)魅氲膁ata數(shù)據(jù)
Object.keys(option.data).forEach(key => {
formData.append(key, option.data[key])
})
// 這些字段看后端需要哪些只酥,就傳哪些呀狼,也可以自己追加額外參數(shù)
formData.append(option.filename, item, option.file.name)// 文件
formData.append('chunkNumber', index + 1)// 當(dāng)前文件塊
formData.append('chunkSize', chunkSize)// 單個(gè)分塊大小
formData.append('currentChunkSize', item.size)// 當(dāng)前分塊大小
formData.append('totalSize', optionFile.size)// 文件總大小
formData.append('identifier', md5)// 文件標(biāo)識(shí)
formData.append('filename', option.file.name)// 文件名
formData.append('totalChunks', fileChunkedList.length)// 總塊數(shù)
}
return { formData: formData, index: index }
})
// 更新上傳進(jìn)度條百分比的方法
const updataPercentage = (e) => {
let loaded = 0// 當(dāng)前已經(jīng)上傳文件的總大小
percentage.forEach(item => {
loaded += item
})
e.percent = loaded / optionFile.size * 100
option.onProgress(e)
}
// 創(chuàng)建隊(duì)列上傳任務(wù)哥艇,limit是上傳并發(fā)數(shù),默認(rèn)會(huì)用兩個(gè)并發(fā)
function sendRequest(chunks, limit = 2) {
return new Promise((resolve, reject) => {
const len = chunks.length
let counter = 0
let isStop = false
const start = async () => {
if (isStop) {
return
}
const item = chunks.shift()
console.log()
if (item) {
const xhr = new XMLHttpRequest()
const index = item.index
// 分片上傳失敗回調(diào)
xhr.onerror = function error(e) {
isStop = true
reject(e)
}
// 分片上傳成功回調(diào)
xhr.onload = function onload() {
if (xhr.status < 200 || xhr.status >= 300) {
isStop = true
reject(getError(action, option, xhr))
}
if (counter === len - 1) {
// 最后一個(gè)上傳完成
resolve()
} else {
counter++
start()
}
}
// 分片上傳中回調(diào)
if (xhr.upload) {
xhr.upload.onprogress = function progress(e) {
if (e.total > 0) {
e.percent = e.loaded / e.total * 100
}
percentage[index] = e.loaded
console.log(index)
updataPercentage(e)
}
}
xhr.open('post', action, true)
if (option.withCredentials && 'withCredentials' in xhr) {
xhr.withCredentials = true
}
const headers = option.headers || {}
for (const item in headers) {
if (headers.hasOwnProperty(item) && headers[item] !== null) {
xhr.setRequestHeader(item, headers[item])
}
}
// 文件開始上傳
xhr.send(item.formData);
}
}
while (limit > 0) {
setTimeout(() => {
start()
}, Math.random() * 1000)
limit -= 1
}
})
}
try {
// 調(diào)用上傳隊(duì)列方法 等待所有文件上傳完成
await sendRequest(fileChunkedList,2)
// 這里的參數(shù)根據(jù)自己實(shí)際情況寫
const data = {
identifier: md5,
filename: option.file.name,
totalSize: optionFile.size
}
// 給后端發(fā)送文件合并請(qǐng)求
const fileInfo = await this.axios({
method: 'post',
url: 'http://localhost:8000/mergefile/',
data: this.qs.stringify(data)
}, {
headers: {
"Content-Type": "multipart/form-data"
}
}).catch(error => {
console.log("ERRRR:: ", error.response.data);
});
console.log(fileInfo);
if (fileInfo.data.code === 200) {
const success = getBody(fileInfo.request)
option.onSuccess(success)
return
}
} catch (error) {
option.onError(error)
}
}
}
之后建立upload.vue模板文件逗堵,并且引入自定義上傳控件:
<template>
<div>
<el-upload
:http-request="chunkUpload"
:ref="chunkUpload"
:action="uploadUrl"
:data="uploadData"
:on-error="onError"
:before-remove="beforeRemove"
name="file" >
<el-button size="small" type="primary">點(diǎn)擊上傳</el-button>
</el-upload>
</div>
</template>
<script>
//js部分
import chunkUpload from './chunkUpload'
export default {
data() {
return {
uploadData: {
//這里面放額外攜帶的參數(shù)
},
//文件上傳的路徑
uploadUrl: 'http://localhost:8000/uploadfile/', //文件上傳的路徑
chunkUpload: chunkUpload // 分片上傳自定義方法,在頭部引入了
}
},
methods: {
onError(err, file, fileList) {
this.$store.getters.chunkUploadXhr.forEach(item => {
item.abort()
})
this.$alert('文件上傳失敗汁咏,請(qǐng)重試', '錯(cuò)誤', {
confirmButtonText: '確定'
})
},
beforeRemove(file) {
// 如果正在分片上傳作媚,則取消分片上傳
if (file.percentage !== 100) {
this.$store.getters.chunkUploadXhr.forEach(item => {
item.abort()
})
}
}
}
}
</script>
<style>
</style>
這里定義的后端上傳接口是:http://localhsot:8000/uploadfile/ 合并文件接口是:http://localhsot:8000/mergefile/
此時(shí)啟動(dòng)前端的vue.js服務(wù):
npm run dev
頁(yè)面效果見下圖:
前端搞定了掂骏,下面我們來編寫接口厚掷,后端的任務(wù)相對(duì)簡(jiǎn)單冒黑,利用FastAPI接收分片文件、分片順序以及唯一標(biāo)識(shí)抡爹,并且將文件臨時(shí)寫入到服務(wù)器中冬竟,當(dāng)最后一個(gè)分片文件完成上傳后,第二個(gè)接口負(fù)責(zé)按照分片順序合并所有文件泵殴,合并成功后再刪除臨時(shí)文件笑诅,用來節(jié)約空間,先安裝依賴的三方庫(kù)
pip3 install python-multipart
當(dāng)然了吆你,由于是前后端分離項(xiàng)目妇多,別忘了設(shè)置一下跨域,編寫main.py:
from uploadfile import router
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from model import database
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
origins = [
"*"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
app.include_router(router)
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/")
def read_root():
return {"Hello": "World"}
然后編寫uploadfile.py:
@router.post("/uploadfile/")
async def uploadfile(file: UploadFile = File(...), chunkNumber: str = Form(...), identifier: str = Form(...)):
task = identifier # 獲取文件唯一標(biāo)識(shí)符
chunk = chunkNumber # 獲取該分片在所有分片中的序號(hào)
filename = '%s%s' % (task,chunk) # 構(gòu)成該分片唯一標(biāo)識(shí)符
contents = await file.read() #異步讀取文件
with open('./static/upload/%s' % filename, "wb") as f:
f.write(contents)
print(file.filename)
return {"filename": file.filename}
@router.post("/mergefile/")
async def uploadfile(identifier: str = Form(...), filename: str = Form(...)):
target_filename = filename # 獲取上傳文件的文件名
task = identifier # 獲取文件的唯一標(biāo)識(shí)符
chunk = 1 # 分片序號(hào)
with open('./static/upload/%s' % target_filename, 'wb') as target_file: # 創(chuàng)建新文件
while True:
try:
filename = './static/upload/%s%d' % (task,chunk)
# 按序打開每個(gè)分片
source_file = open(filename, 'rb')
# 讀取分片內(nèi)容寫入新文件
target_file.write(source_file.read())
source_file.close()
except IOError:
break
chunk += 1
os.remove(filename)
return {"code":200}
值得一提的是這里我們使用UploadFile來定義文件參數(shù)贬循,它的優(yōu)勢(shì)在于在接收存儲(chǔ)文件過程中如果文件過大超過了內(nèi)存限制就會(huì)存儲(chǔ)在硬盤中桃序,相當(dāng)靈活,同時(shí)配合await關(guān)鍵字異步讀取文件內(nèi)容奇适,提高了性能和效率芦鳍。
啟動(dòng)后端服務(wù)測(cè)試一下效果:
uvicorn main:app --reload
可以看到柠衅,當(dāng)我們上傳一張2.9m的圖片時(shí),前端會(huì)根據(jù)設(shè)置好的的分片閾值將該圖片切割為四份贷祈,傳遞給后端接口uploadfile后喝峦,后端在根據(jù)參數(shù)用接口mergefile將其合并,整個(gè)過程一氣呵成粟耻、行云流水眉踱、勢(shì)如破竹,讓人用了之后禁不住心曠神怡册烈、把酒臨風(fēng)叁执。最后奉上項(xiàng)目地址:https://gitee.com/QiHanXiBei/fastapi_blog
原文轉(zhuǎn)載自「劉悅的技術(shù)博客」 https://v3u.cn/a_id_175