參考原文: 轉(zhuǎn)存服務(wù)器
Buffer鹊碍、Stream磷籍、Promise、async await蝠引、request单起、分片上傳
什么是轉(zhuǎn)存服務(wù)器抱怔?
即向服務(wù)器發(fā)送一個(gè)圖片的url地址,服務(wù)負(fù)責(zé)去將圖片下載到服務(wù)器上嘀倒,之后再將這個(gè)圖片上傳到存儲(chǔ)服務(wù)屈留,得到一個(gè)可以訪問(通常情況都是CDN服務(wù))的圖片地址。
當(dāng)服務(wù)器下在一個(gè)大型文件時(shí)测蘑,需要完全下載完灌危,然后緩存到本地硬盤的緩存文件中,而且 一次性上傳大文件碳胳,過程中由于耗時(shí)較長勇蝙,因此存在較高的失敗率,通常采用分片法
來解決挨约,如果分片失敗則只需重新上傳該分片即可味混。
在下載時(shí),如果下載量滿足一個(gè)分片大小則上傳诫惭。所以第一步就是監(jiān)聽下載內(nèi)容翁锡。ReadStream在接收數(shù)據(jù)時(shí)會(huì)不斷的觸發(fā)data
事件,因此只需監(jiān)聽data
事件就可以準(zhǔn)確捕獲到每一次數(shù)據(jù)傳輸過程夕土,ReadStream分為兩種模式流動(dòng)模式
和暫停模式
盗誊,流動(dòng)模式下數(shù)據(jù)會(huì)源源不斷的流出供需要者使用,而暫停模式
只有調(diào)用read()
方法才會(huì)有數(shù)據(jù)流出隘弊。這里我們通過pipe
把ReadStream與WriteStream相連哈踱,讓數(shù)據(jù)流動(dòng)起來。
const request = require('request');
const fs = require('fs');
const path = require('path');
const url = 'https://baobao-3d.bj.bcebos.com/16-0-205.shuimian.mp4';
const httpReadStream = request({method: 'GET', url: url});
const fileWriteStream = fs.createWriteStream(path.join(__dirname, 'download', 'lyf.mp4'));
httpReadStream.pipe(fileWriteStream);
let totalLength = 0;
httpReadStream
.on('response', res=> {
console.log('response.headers', res.statusCode);
})
.on('data', chunk=> {
totalLength += chunk.length;
})
.on('error', err=> {
console.log('err', err);
});
fileWriteStream.on('close', ()=> {
console.log(`已下載文件大小: ${(totalLength / 1000 / 1024).toFixed(2)} MB`)
});```
每次data事件獲取的chunk大小因網(wǎng)絡(luò)而變化梨熙,假設(shè)每次上傳分片大小為2MB开镣,每一次chunk有可能大于2MB也可能小于2MB,所以可在中間設(shè)置一緩沖區(qū)咽扇,當(dāng)緩沖區(qū)滿足2MB時(shí)就取出2MB作為一個(gè)分片進(jìn)行上傳邪财。
于是我們使用Buffer實(shí)現(xiàn)一個(gè)緩沖區(qū),主要用于分片质欲。
```javascript
class BufferCache {
constructor(cutSize = 2 * 1024 * 1000) {
this._cache = Buffer.alloc(0);
this._cutSzie = cutSize;
this._readyCache = [];
}
push(buf) {
let cacheLength = this._cache.length;
let bufLength = buf.length;
this._cache = Buffer.concat([this._cache, buf], bufLength + cacheLength)
this.cut();
}
pull() {
return this._readyCache.shift();
}
cut() {
if (this._cache.length >= this._cutSzie) {
const totalCacheLength = this._cache.length;
let cutCount = Math.floor(totalCacheLength / this._cutSzie);
for (let i = 0; i < cutCount; i++) {
let newBuffer = Buffer.alloc(this._cutSzie);
this._cache.copy(newBuffer, 0, i * this._cutSzie, (i + 1) * this._cutSzie);
this._readyCache.push(newBuffer);
}
this._cache = this._cache.slice(cutCount * this._cutSzie);
}
}
getReadChunks() {
return this._readyCache;
}
getRemainChunks() {
if (this._cache.length < this._cutSzie)
return this._cache;
else {
this.cut();
return this.getRemainChunks();
}
}
}
exports = module.exports = BufferCache;
為了便于后面的編碼树埠,提高可擴(kuò)展性和可讀性,我們將下載過程封裝如下嘶伟。通過四個(gè)回調(diào)函數(shù)輕易掌控下載開始怎憋、中途、結(jié)束、異常四種狀態(tài)绊袋。
const request = require('request');
const fs = require('fs');
const path = require('path');
const BufferCache = require('./bufferCache');
const url = 'https://baobao-3d.bj.bcebos.com/16-0-205.shuimian.mp4';
const _cutSize = 1024 * 1000 * 2;
const bufferCache = new BufferCache(_cutSize);
let isFinished = false;
function getChunks(options, onStart, onData, onFinish, onError) {
const httpReadStream = request({method: options.method, url: options.url});
const fileWriteStream = fs.createWriteStream(path.join(__dirname, 'download', 'lyf.mp4'));
httpReadStream.pipe(fileWriteStream);
let downloadedLength = 0;
httpReadStream
.on('response', res=>onStart(res))
.on('data', chunk=> {
downloadedLength += chunk.length;
onData(chunk, downloadedLength)
})
.on('error', err=>onError(err));
fileWriteStream.on('close', ()=> {
onFinish(downloadedLength)
});
}
function onStart(res) {
console.log('start downloading, statusCode is :', res.statusCode);
}
function onData(chunk, downloadedLength) {
bufferCache.push(chunk);
}
function onFinished(totalLength) {
let chunkCount = Math.ceil(totalLength / _cutSize);
console.log('total chunk count is:' + chunkCount);
}
function onError(err) {
console.log('err', err)
}
getChunks({method: 'GET', url: url}, onStart, onData, onFinished, onError);
截止目前毕匀,我們已經(jīng)完成下載、分片接下來需要考慮如下:
- 如何連續(xù)獲取準(zhǔn)備好的分片?
- 如何上傳分片?
- 上傳分片失敗的重傳問題?
- 上傳完所有分片之后的統(tǒng)一處理接口?
- 分片的并發(fā)上傳?以及并發(fā)數(shù)的控制
- 如何連續(xù)獲取準(zhǔn)備好的分片?
在onStart執(zhí)行之后即數(shù)據(jù)開始傳輸時(shí)癌别,我們可以使用Node自帶的間隔計(jì)時(shí)器setInterval皂岔,每隔200ms獲取一次分片。一個(gè)文件在經(jīng)過多次相同大小的切割之后展姐,總會(huì)遺留下小的一塊分片躁垛,因此我們還需要對最后一個(gè)分片進(jìn)行特殊處理。當(dāng) readyCache 的長度為0的時(shí)候圾笨,而且下載已經(jīng)完成教馆,不會(huì)再調(diào)用 pushBuf 函數(shù),就是獲取最后一段分片的時(shí)機(jī)墅拭。于是重寫onStart函數(shù)完成以上業(yè)務(wù)
function onStart(res) {
console.log('start downloading, statusCode is :', res.statusCode);
let interval = setInterval(function () {
if (bufferCache.getReadChunks().length > 0) {
let readCache = bufferCache.pull();
console.log('recives', readCache.length)
}
if (isFinished) {
clearInterval(interval);
let lastChunk = bufferCache.getRemainChunks();
console.log('the last chunk', lastChunk.length);
}
}, 200)
}
- 如何上傳分片?
使用HTTP進(jìn)行文件上傳涣狗,文件在傳輸過程中為一個(gè)byte序列谍婉,其 content-type 為 multipart/form-data,我們先通過Promise封裝一個(gè)上傳函數(shù)
function upload(url, data) {
return new Promise((resolve, reject) => {
request.post({
url: url,
formData: data
}, function (err, response, body) {
if (!err && response.statusCode === 200) {
resolve(body);
}
else {
reject(err);
}
});
});
}
我們現(xiàn)在需要從緩存中拿分片镀钓,如國還有剩余著繼續(xù)穗熬,沒有則通知發(fā)送完成,對于這樣的邏輯可以使用遞歸
丁溅。
假設(shè)當(dāng)前網(wǎng)絡(luò)環(huán)境擁堵唤蔗,會(huì)導(dǎo)致上傳一個(gè)分片的時(shí)間 > 200ms, 200ms之后下一次輪詢開始運(yùn)行時(shí)窟赏,原先的分片還沒上傳完畢妓柜,由于沒有一個(gè)狀態(tài)值進(jìn)行判斷,依然會(huì)調(diào)用上傳函數(shù)涯穷,又再一次進(jìn)行分片上傳棍掐,就會(huì)更加劇的網(wǎng)絡(luò)擁堵環(huán)境,導(dǎo)致分片上傳時(shí)間更短拷况。如此反復(fù)作煌,時(shí)間一長就會(huì)導(dǎo)致崩潰,造成分片上傳全部大面積失敗赚瘦。為了避免這樣的情況粟誓,我們就需要一個(gè)變量來表示當(dāng)前這個(gè)上傳流程的狀態(tài),目前我們只關(guān)心單個(gè)流程進(jìn)行上傳起意,可以只需要保證最大同時(shí)上傳的值為1即可鹰服。
function sendChunks() {
let chunkId = 0; // 給每個(gè)分片劃分ID
let sending = 0; // 當(dāng)前并行上傳的數(shù)量
let MAX_SENDING = 1; // 最大并行上傳數(shù)
function send(readCaches) {
if (readCaches.length <= 0)
return;
console.log(`發(fā)送第 ${chunkId} 塊分片`)
const chunk = readCaches.shift();
const sendPromise = upload('http://localhost:3000/upload', {
chunk: {
value: chunk,
options: {
// 在文件名稱上添加chunkId,可以方便后端服務(wù)進(jìn)行分片整理
filename: 'example.mp4_IDSPLIT_' + chunkId
}
}
});
sending++;
sendPromise.then(resBody=> {
sending--;
if (resBody.uploadStatus === 0 && readCaches.length > 0)
send(readCaches);
});
chunkId++;
}
return new Promise((resolve, reject)=> {
let readyCaches = bufferCache.getReadChunks();
let interval = setInterval(function () {
if (readyCaches.length >= 0 && sending <= MAX_SENDING) {
send(readyCaches);
}
if (isFinished && readyCaches.length === 0) {
clearInterval(interval);
const lastChunk = bufferCache.getRemainChunks();
readyCaches.push(lastChunk);
send(readyCaches)
}
}, 200)
})
}
截止此我們已經(jīng)完成下載-分片-連續(xù)上傳分片的簡單實(shí)現(xiàn)揽咕,但如果某一分片上傳失敗又該怎么辦呢获诈?send()函數(shù)可以看作一個(gè)發(fā)送單個(gè)分片(不考慮遞歸)的控制器仍源,只需在其內(nèi)部捕獲上傳錯(cuò)誤的分片,保存下來重傳即可舔涎。于是我們修改sendChunks函數(shù)如下:在send().cathc(fn)內(nèi)進(jìn)行重傳控制笼踩,在可嘗試次數(shù)之內(nèi)進(jìn)行重傳,如果失敗則拋出異常亡嫌。
function sendChunks() {
let chunkId = 0;
let sending = 0; // 當(dāng)前并行上傳的數(shù)量
let MAX_SENDING = 1; // 最大并行上傳數(shù)
let stopSend = false;
function send(options) {
let readyCache = options.readyCache;
let fresh = options.fresh;
let retryCount = options.retry;
let chunkIndex;
let chunk = null;
// 新的數(shù)據(jù)
if (fresh) {
if (readyCache.length === 0) {
return;
}
chunk = readyCache.shift();
chunkIndex = chunkId;
chunkId++;
}
// 失敗重試的數(shù)據(jù)
else {
chunk = options.data;
chunkIndex = options.index;
}
sending++;
let sendP = upload('http://localhost:3000', {
chunk: {
value: chunk,
options: {
filename: 'example.mp4_IDSPLIT_' + chunkIndex
}
}
}).then((response) => {
sending--;
let json = JSON.parse(response);
if (json.errno === 0 && readyCache.length > 0) {
return send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
});
}
// 這里一直返回成功
return Promise.resolve(json);
}).catch(err => {
if (retryCount > 0) {
// 這里遞歸下去嚎于,如果成功的話,就等同于錯(cuò)誤已經(jīng)處理
return send({
retry: retryCount - 1,
index: chunkIndex,
fresh: false,
data: chunk,
readyCache: readyCache
});
}
else {
console.log(`upload failed of chunkIndex: ${chunkIndex}`);
// 停止上傳標(biāo)識(shí)挟冠,會(huì)直接停止上傳
stopSend = true;
// 返回reject于购,異常拋出
return Promise.reject(err);
}
});
}
return new Promise((resolve, reject) => {
let readyCache = bufferCache.getChunks();
let sendTimer = setInterval(() => {
if (sending < MAX_SENDING && readyCache.length > 0) {
// 改用傳入對象
send({
retry: 3, // 最大重試3次
fresh: true, // 用這個(gè)字段來區(qū)分是新的分片,還是由于失敗重試的
readyCache: readyCache
}).catch(err => {
console.log('upload failed, errmsg: ', err);
});
}
else if (isFinished && readyCache.length === 0 || stopSend) {
clearTimeout(sendTimer);
// 已經(jīng)成功走到最后一個(gè)分片了知染。
if (!stopSend) {
let lastChunk = bufferCache.getRemainChunks();
readyCache.push(lastChunk);
send({
retry: 3,
fresh: true,
readyCache: readyCache
}).catch(err => {
console.log('upload failed, errmsg: ', err);
});
}
}
// 到這里是為分片正在下載肋僧,同時(shí)又正在上傳
// 或者上傳比下載快,已經(jīng)下載好的分片都傳完了控淡,等待下載完成
}, 200);
});
}
- 上傳完所有分片之后的統(tǒng)一處理接口?
由于上傳send()在成功上傳一個(gè)分片后會(huì)返回一個(gè)Promise對象嫌吠,上傳失敗時(shí)會(huì)拋出異常,所以只需使用Promsie.all()方法捕獲即可掺炭。
let readyCache = bufferCache.getChunks();
let sendPromise = [];
let sendTimer = setInterval(() => {
if (sending < MAX_SENDING && readyCache.length > 0) {
// 把Promise塞進(jìn)數(shù)組
sendPromise.push(send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
}));
}
else if ((isFinished && readyCache.length === 0) || stopSend) {
clearTimeout(sendTimer);
if (!stopSend) {
console.log('got last chunk');
let lastChunk = bufferCache.getRemainChunks();
readyCache.push(lastChunk);
// 把Promise塞進(jìn)數(shù)組
sendPromise.push(send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
}));
}
// 當(dāng)所有的分片都發(fā)送之后觸發(fā)辫诅,
Promise.all(sendPromise).then(() => {
console.log('send success');
}).catch(err => {
console.log('send failed');
});
}
// not ready, wait for next interval
}, 200);
- 分片的并發(fā)上傳?以及并發(fā)數(shù)的控制?現(xiàn)在還剩最后一個(gè)問題,Node本身就是非阻塞IO涧狮、事件驅(qū)動(dòng)的炕矮,我們只需使用send()去同步的獲得執(zhí)行,而真正的上傳邏輯upload卻是異步者冤,所以不需要考慮資源競爭肤视、死鎖等問題,只需同步擴(kuò)展send方法即可涉枫。
let readyCache = bufferCache.getChunks();
let threadPool = [];
let sendTimer = setInterval(() => {
if (sending < MAX_SENDING && readyCache.length > 0) {
// 這個(gè)例子同時(shí)開啟4個(gè)分片上傳
for (let i = 0; i < MAX_SENDING; i++) {
let thread = send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
});
threadPool.push(thread);
}
}
else if ((isFinished && readyCache.length === 0) || stopSend) {
clearTimeout(sendTimer);
if (!stopSend) {
console.log('got last chunk');
let lastChunk = bufferCache.getRemainChunks();
readyCache.push(lastChunk);
threadPool.push(send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
}));
}
Promise.all(threadPool).then(() => {
console.log('send success');
}).catch(err => {
console.log('send failed');
});
}
}, 200);
這里我們通過文件的md5值去判斷是否屬于同一文件钢颂。
function toMd5(buffer) {
let md5 = crypto.createHash('md5');
md5.update(buffer);
return md5.digest('hex');
}
存儲(chǔ)服務(wù)器上由于是分片后的文件,所以我們先把目錄中的文件以Buffer的形式讀入內(nèi)存,在求文件的md5值即可拜银。
function filesInDirToBuffer(dirPath) {
let totalBuffer = Buffer.allocUnsafe(0);
const dirsInfo = fs.readdirSync(dirPath);
dirsInfo.forEach(file=> {
if (file != '.DS_Store') {
const currentFileBuffer = fs.readFileSync(path.join(dirPath, file));
totalBuffer = Buffer.concat([totalBuffer, currentFileBuffer], totalBuffer.length + currentFileBuffer.length);
}
});
return totalBuffer;
}