轉(zhuǎn)存服務(wù)器

參考原文: 轉(zhuǎn)存服務(wù)器


Buffer鹊碍、Stream磷籍、Promise、async await蝠引、request单起、分片上傳

什么是轉(zhuǎn)存服務(wù)器抱怔?

即向服務(wù)器發(fā)送一個(gè)圖片的url地址,服務(wù)負(fù)責(zé)去將圖片下載到服務(wù)器上嘀倒,之后再將這個(gè)圖片上傳到存儲(chǔ)服務(wù)屈留,得到一個(gè)可以訪問(通常情況都是CDN服務(wù))的圖片地址。

當(dāng)服務(wù)器下在一個(gè)大型文件時(shí)测蘑,需要完全下載完灌危,然后緩存到本地硬盤的緩存文件中,而且 一次性上傳大文件碳胳,過程中由于耗時(shí)較長勇蝙,因此存在較高的失敗率,通常采用分片法來解決挨约,如果分片失敗則只需重新上傳該分片即可味混。

在下載時(shí),如果下載量滿足一個(gè)分片大小則上傳诫惭。所以第一步就是監(jiān)聽下載內(nèi)容翁锡。ReadStream在接收數(shù)據(jù)時(shí)會(huì)不斷的觸發(fā)data事件,因此只需監(jiān)聽data事件就可以準(zhǔn)確捕獲到每一次數(shù)據(jù)傳輸過程夕土,ReadStream分為兩種模式流動(dòng)模式暫停模式盗誊,流動(dòng)模式下數(shù)據(jù)會(huì)源源不斷的流出供需要者使用,而暫停模式只有調(diào)用read()方法才會(huì)有數(shù)據(jù)流出隘弊。這里我們通過pipe把ReadStream與WriteStream相連哈踱,讓數(shù)據(jù)流動(dòng)起來。

const request = require('request');
const fs      = require('fs');
const path    = require('path');
const url     = 'https://baobao-3d.bj.bcebos.com/16-0-205.shuimian.mp4';


const httpReadStream  = request({method: 'GET', url: url});
const fileWriteStream = fs.createWriteStream(path.join(__dirname, 'download', 'lyf.mp4'));

httpReadStream.pipe(fileWriteStream);

let totalLength = 0;
httpReadStream
    .on('response', res=> {
        console.log('response.headers', res.statusCode);
    })
    .on('data', chunk=> {
        totalLength += chunk.length;
    })
    .on('error', err=> {
        console.log('err', err);
    });

fileWriteStream.on('close', ()=> {
    console.log(`已下載文件大小: ${(totalLength / 1000 / 1024).toFixed(2)} MB`)
});```

每次data事件獲取的chunk大小因網(wǎng)絡(luò)而變化梨熙,假設(shè)每次上傳分片大小為2MB开镣,每一次chunk有可能大于2MB也可能小于2MB,所以可在中間設(shè)置一緩沖區(qū)咽扇,當(dāng)緩沖區(qū)滿足2MB時(shí)就取出2MB作為一個(gè)分片進(jìn)行上傳邪财。

于是我們使用Buffer實(shí)現(xiàn)一個(gè)緩沖區(qū),主要用于分片质欲。
```javascript
class BufferCache {
    constructor(cutSize = 2 * 1024 * 1000) {
        this._cache      = Buffer.alloc(0);
        this._cutSzie    = cutSize;
        this._readyCache = [];
    }

    push(buf) {
        let cacheLength = this._cache.length;
        let bufLength   = buf.length;
        this._cache     = Buffer.concat([this._cache, buf], bufLength + cacheLength)
        this.cut();
    }

    pull() {
        return this._readyCache.shift();
    }


    cut() {
        if (this._cache.length >= this._cutSzie) {
            const totalCacheLength = this._cache.length;
            let cutCount           = Math.floor(totalCacheLength / this._cutSzie);

            for (let i = 0; i < cutCount; i++) {
                let newBuffer = Buffer.alloc(this._cutSzie);
                this._cache.copy(newBuffer, 0, i * this._cutSzie, (i + 1) * this._cutSzie);
                this._readyCache.push(newBuffer);
            }
            this._cache = this._cache.slice(cutCount * this._cutSzie);
        }
    }

    getReadChunks() {
        return this._readyCache;
    }

    getRemainChunks() {
        if (this._cache.length < this._cutSzie)
            return this._cache;
        else {
            this.cut();
            return this.getRemainChunks();
        }
    }
}

exports = module.exports = BufferCache;

為了便于后面的編碼树埠,提高可擴(kuò)展性和可讀性,我們將下載過程封裝如下嘶伟。通過四個(gè)回調(diào)函數(shù)輕易掌控下載開始怎憋、中途、結(jié)束、異常四種狀態(tài)绊袋。

const request     = require('request');
const fs          = require('fs');
const path        = require('path');
const BufferCache = require('./bufferCache');
const url         = 'https://baobao-3d.bj.bcebos.com/16-0-205.shuimian.mp4';
const _cutSize    = 1024 * 1000 * 2;
const bufferCache = new BufferCache(_cutSize);
let isFinished    = false;

function getChunks(options, onStart, onData, onFinish, onError) {
    const httpReadStream  = request({method: options.method, url: options.url});
    const fileWriteStream = fs.createWriteStream(path.join(__dirname, 'download', 'lyf.mp4'));

    httpReadStream.pipe(fileWriteStream);

    let downloadedLength = 0;
    httpReadStream
        .on('response', res=>onStart(res))
        .on('data', chunk=> {
            downloadedLength += chunk.length;
            onData(chunk, downloadedLength)
        })
        .on('error', err=>onError(err));
    
    fileWriteStream.on('close', ()=> {
        onFinish(downloadedLength)
    });
}

function onStart(res) {
    console.log('start downloading, statusCode is :', res.statusCode);
}

function onData(chunk, downloadedLength) {
    bufferCache.push(chunk);
}

function onFinished(totalLength) {
    let chunkCount = Math.ceil(totalLength / _cutSize);
    console.log('total chunk count is:' + chunkCount);
}

function onError(err) {
    console.log('err', err)
}

getChunks({method: 'GET', url: url}, onStart, onData, onFinished, onError);

截止目前毕匀,我們已經(jīng)完成下載、分片接下來需要考慮如下:

  • 如何連續(xù)獲取準(zhǔn)備好的分片?
  • 如何上傳分片?
  • 上傳分片失敗的重傳問題?
  • 上傳完所有分片之后的統(tǒng)一處理接口?
  • 分片的并發(fā)上傳?以及并發(fā)數(shù)的控制
  • 如何連續(xù)獲取準(zhǔn)備好的分片?
    在onStart執(zhí)行之后即數(shù)據(jù)開始傳輸時(shí)癌别,我們可以使用Node自帶的間隔計(jì)時(shí)器setInterval皂岔,每隔200ms獲取一次分片。一個(gè)文件在經(jīng)過多次相同大小的切割之后展姐,總會(huì)遺留下小的一塊分片躁垛,因此我們還需要對最后一個(gè)分片進(jìn)行特殊處理。當(dāng) readyCache 的長度為0的時(shí)候圾笨,而且下載已經(jīng)完成教馆,不會(huì)再調(diào)用 pushBuf 函數(shù),就是獲取最后一段分片的時(shí)機(jī)墅拭。于是重寫onStart函數(shù)完成以上業(yè)務(wù)
function onStart(res) {
    console.log('start downloading, statusCode is :', res.statusCode);
    let interval = setInterval(function () {
        if (bufferCache.getReadChunks().length > 0) {
            let readCache = bufferCache.pull();
            console.log('recives', readCache.length)
        }
        if (isFinished) {
            clearInterval(interval);
            let lastChunk = bufferCache.getRemainChunks();
            console.log('the last chunk', lastChunk.length);
        }
    }, 200)
}
  • 如何上傳分片?
    使用HTTP進(jìn)行文件上傳涣狗,文件在傳輸過程中為一個(gè)byte序列谍婉,其 content-type 為 multipart/form-data,我們先通過Promise封裝一個(gè)上傳函數(shù)
function upload(url, data) {
    return new Promise((resolve, reject) => {
        request.post({
            url: url,
            formData: data
        }, function (err, response, body) {
            if (!err && response.statusCode === 200) {
                resolve(body);
            }
            else {
                reject(err);
            }
        });
    });
}

我們現(xiàn)在需要從緩存中拿分片镀钓,如國還有剩余著繼續(xù)穗熬,沒有則通知發(fā)送完成,對于這樣的邏輯可以使用遞歸丁溅。
假設(shè)當(dāng)前網(wǎng)絡(luò)環(huán)境擁堵唤蔗,會(huì)導(dǎo)致上傳一個(gè)分片的時(shí)間 > 200ms, 200ms之后下一次輪詢開始運(yùn)行時(shí)窟赏,原先的分片還沒上傳完畢妓柜,由于沒有一個(gè)狀態(tài)值進(jìn)行判斷,依然會(huì)調(diào)用上傳函數(shù)涯穷,又再一次進(jìn)行分片上傳棍掐,就會(huì)更加劇的網(wǎng)絡(luò)擁堵環(huán)境,導(dǎo)致分片上傳時(shí)間更短拷况。如此反復(fù)作煌,時(shí)間一長就會(huì)導(dǎo)致崩潰,造成分片上傳全部大面積失敗赚瘦。為了避免這樣的情況粟誓,我們就需要一個(gè)變量來表示當(dāng)前這個(gè)上傳流程的狀態(tài),目前我們只關(guān)心單個(gè)流程進(jìn)行上傳起意,可以只需要保證最大同時(shí)上傳的值為1即可鹰服。

function sendChunks() {
    let chunkId     = 0; // 給每個(gè)分片劃分ID
    let sending     = 0; // 當(dāng)前并行上傳的數(shù)量
    let MAX_SENDING = 1; // 最大并行上傳數(shù)

    function send(readCaches) {
        if (readCaches.length <= 0)
            return;
        console.log(`發(fā)送第 ${chunkId} 塊分片`)
        const chunk       = readCaches.shift();
        const sendPromise = upload('http://localhost:3000/upload', {
            chunk: {
                value: chunk,
                options: {
                    // 在文件名稱上添加chunkId,可以方便后端服務(wù)進(jìn)行分片整理
                    filename: 'example.mp4_IDSPLIT_' + chunkId
                }
            }
        });
        sending++;
        sendPromise.then(resBody=> {
            sending--;
            if (resBody.uploadStatus === 0 && readCaches.length > 0)
                send(readCaches);
        });
        chunkId++;
    }

    return new Promise((resolve, reject)=> {
        let readyCaches = bufferCache.getReadChunks();
        let interval    = setInterval(function () {
            if (readyCaches.length >= 0 && sending <= MAX_SENDING) {
                send(readyCaches);
            }
            if (isFinished && readyCaches.length === 0) {
                clearInterval(interval);
                const lastChunk = bufferCache.getRemainChunks();
                readyCaches.push(lastChunk);
                send(readyCaches)
            }
        }, 200)
    })
}

截止此我們已經(jīng)完成下載-分片-連續(xù)上傳分片的簡單實(shí)現(xiàn)揽咕,但如果某一分片上傳失敗又該怎么辦呢获诈?send()函數(shù)可以看作一個(gè)發(fā)送單個(gè)分片(不考慮遞歸)的控制器仍源,只需在其內(nèi)部捕獲上傳錯(cuò)誤的分片,保存下來重傳即可舔涎。于是我們修改sendChunks函數(shù)如下:在send().cathc(fn)內(nèi)進(jìn)行重傳控制笼踩,在可嘗試次數(shù)之內(nèi)進(jìn)行重傳,如果失敗則拋出異常亡嫌。

function sendChunks() {
    let chunkId = 0;
    let sending = 0; // 當(dāng)前并行上傳的數(shù)量
    let MAX_SENDING = 1; // 最大并行上傳數(shù)
    let stopSend = false;

    function send(options) {
        let readyCache = options.readyCache;
        let fresh = options.fresh;
        let retryCount = options.retry;
        let chunkIndex;

        let chunk = null;

        // 新的數(shù)據(jù)
        if (fresh) {
            if (readyCache.length === 0) {
                return;
            }

            chunk = readyCache.shift();
            chunkIndex = chunkId;
            chunkId++;
        }
        // 失敗重試的數(shù)據(jù)
        else {
            chunk = options.data;
            chunkIndex = options.index;
        }

        sending++;
        let sendP = upload('http://localhost:3000', {
            chunk: {
                value: chunk,
                options: {
                    filename: 'example.mp4_IDSPLIT_' + chunkIndex
                }
            }
        }).then((response) => {
            sending--;
            let json = JSON.parse(response);

            if (json.errno === 0 && readyCache.length > 0) {
                return send({
                    retry: RETRY_COUNT,
                    fresh: true,
                    readyCache: readyCache
                });
            }

            // 這里一直返回成功
            return Promise.resolve(json);
        }).catch(err => {
            if (retryCount > 0) {
                // 這里遞歸下去嚎于,如果成功的話,就等同于錯(cuò)誤已經(jīng)處理
                return send({
                    retry: retryCount - 1,
                    index: chunkIndex,
                    fresh: false,
                    data: chunk,
                    readyCache: readyCache
                });
            }
            else {
                console.log(`upload failed of chunkIndex: ${chunkIndex}`);
                // 停止上傳標(biāo)識(shí)挟冠,會(huì)直接停止上傳
                stopSend = true;
                // 返回reject于购,異常拋出
                return Promise.reject(err);
            }
        });
    }

    return new Promise((resolve, reject) => {
        let readyCache = bufferCache.getChunks();

        let sendTimer = setInterval(() => {
            if (sending < MAX_SENDING && readyCache.length > 0) {
                // 改用傳入對象
                send({
                    retry: 3, // 最大重試3次
                    fresh: true, // 用這個(gè)字段來區(qū)分是新的分片,還是由于失敗重試的
                    readyCache: readyCache
                }).catch(err => {
                    console.log('upload failed, errmsg: ', err);
                });
            }
            else if (isFinished && readyCache.length === 0 || stopSend) {
                clearTimeout(sendTimer);

                // 已經(jīng)成功走到最后一個(gè)分片了知染。
                if (!stopSend) {
                    let lastChunk = bufferCache.getRemainChunks();
                    readyCache.push(lastChunk);

                    send({
                        retry: 3,
                        fresh: true,
                        readyCache: readyCache
                    }).catch(err => {
                        console.log('upload failed, errmsg: ', err);
                    });
                }
            }

            // 到這里是為分片正在下載肋僧,同時(shí)又正在上傳
            // 或者上傳比下載快,已經(jīng)下載好的分片都傳完了控淡,等待下載完成
        }, 200);
    });
}
  • 上傳完所有分片之后的統(tǒng)一處理接口?
    由于上傳send()在成功上傳一個(gè)分片后會(huì)返回一個(gè)Promise對象嫌吠,上傳失敗時(shí)會(huì)拋出異常,所以只需使用Promsie.all()方法捕獲即可掺炭。
let readyCache = bufferCache.getChunks();
let sendPromise = [];

let sendTimer = setInterval(() => {
    if (sending < MAX_SENDING && readyCache.length > 0) {
        // 把Promise塞進(jìn)數(shù)組
        sendPromise.push(send({
            retry: RETRY_COUNT,
            fresh: true,
            readyCache: readyCache
        }));
    }
    else if ((isFinished && readyCache.length === 0) || stopSend) {
        clearTimeout(sendTimer);

        if (!stopSend) {
            console.log('got last chunk');
            let lastChunk = bufferCache.getRemainChunks();
            readyCache.push(lastChunk);
            // 把Promise塞進(jìn)數(shù)組
            sendPromise.push(send({
                retry: RETRY_COUNT,
                fresh: true,
                readyCache: readyCache
            }));
        }

        // 當(dāng)所有的分片都發(fā)送之后觸發(fā)辫诅,
        Promise.all(sendPromise).then(() => {
            console.log('send success');
        }).catch(err => {
            console.log('send failed');
        });
    }
    // not ready, wait for next interval
}, 200);

  • 分片的并發(fā)上傳?以及并發(fā)數(shù)的控制?現(xiàn)在還剩最后一個(gè)問題,Node本身就是非阻塞IO涧狮、事件驅(qū)動(dòng)的炕矮,我們只需使用send()去同步的獲得執(zhí)行,而真正的上傳邏輯upload卻是異步者冤,所以不需要考慮資源競爭肤视、死鎖等問題,只需同步擴(kuò)展send方法即可涉枫。

let readyCache = bufferCache.getChunks();
let threadPool = [];

let sendTimer = setInterval(() => {
    if (sending < MAX_SENDING && readyCache.length > 0) {
        // 這個(gè)例子同時(shí)開啟4個(gè)分片上傳
        for (let i = 0; i < MAX_SENDING; i++) {
            let thread = send({
                retry: RETRY_COUNT,
                fresh: true,
                readyCache: readyCache
            });

            threadPool.push(thread);
        }
    }
    else if ((isFinished && readyCache.length === 0) || stopSend) {
        clearTimeout(sendTimer);

        if (!stopSend) {
            console.log('got last chunk');
            let lastChunk = bufferCache.getRemainChunks();
            readyCache.push(lastChunk);
            threadPool.push(send({
                retry: RETRY_COUNT,
                fresh: true,
                readyCache: readyCache
            }));
        }

        Promise.all(threadPool).then(() => {
            console.log('send success');
        }).catch(err => {
            console.log('send failed');
        });
    }
}, 200);

這里我們通過文件的md5值去判斷是否屬于同一文件钢颂。


function toMd5(buffer) {
    let md5 = crypto.createHash('md5');
    md5.update(buffer);
    return md5.digest('hex');
}

存儲(chǔ)服務(wù)器上由于是分片后的文件,所以我們先把目錄中的文件以Buffer的形式讀入內(nèi)存,在求文件的md5值即可拜银。

function filesInDirToBuffer(dirPath) {
    let totalBuffer = Buffer.allocUnsafe(0);
    const dirsInfo  = fs.readdirSync(dirPath);
    dirsInfo.forEach(file=> {
        if (file != '.DS_Store') {
            const currentFileBuffer = fs.readFileSync(path.join(dirPath, file));
            totalBuffer             = Buffer.concat([totalBuffer, currentFileBuffer], totalBuffer.length + currentFileBuffer.length);
        }
    });
    return totalBuffer;
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末殊鞭,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子尼桶,更是在濱河造成了極大的恐慌操灿,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,843評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件泵督,死亡現(xiàn)場離奇詭異趾盐,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評論 3 392
  • 文/潘曉璐 我一進(jìn)店門救鲤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來久窟,“玉大人,你說我怎么就攤上這事本缠〕饪福” “怎么了?”我有些...
    開封第一講書人閱讀 163,187評論 0 353
  • 文/不壞的土叔 我叫張陵丹锹,是天一觀的道長稀颁。 經(jīng)常有香客問我,道長楣黍,這世上最難降的妖魔是什么匾灶? 我笑而不...
    開封第一講書人閱讀 58,264評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮租漂,結(jié)果婚禮上阶女,老公的妹妹穿的比我還像新娘。我一直安慰自己哩治,他們只是感情好秃踩,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,289評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著锚扎,像睡著了一般吞瞪。 火紅的嫁衣襯著肌膚如雪馁启。 梳的紋絲不亂的頭發(fā)上驾孔,一...
    開封第一講書人閱讀 51,231評論 1 299
  • 那天,我揣著相機(jī)與錄音惯疙,去河邊找鬼翠勉。 笑死,一個(gè)胖子當(dāng)著我的面吹牛霉颠,可吹牛的內(nèi)容都是我干的对碌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,116評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蒿偎,長吁一口氣:“原來是場噩夢啊……” “哼朽们!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起诉位,我...
    開封第一講書人閱讀 38,945評論 0 275
  • 序言:老撾萬榮一對情侶失蹤骑脱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后苍糠,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體叁丧,經(jīng)...
    沈念sama閱讀 45,367評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,581評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了拥娄。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蚊锹。...
    茶點(diǎn)故事閱讀 39,754評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖稚瘾,靈堂內(nèi)的尸體忽然破棺而出牡昆,到底是詐尸還是另有隱情,我是刑警寧澤孟抗,帶...
    沈念sama閱讀 35,458評論 5 344
  • 正文 年R本政府宣布迁杨,位于F島的核電站,受9級特大地震影響凄硼,放射性物質(zhì)發(fā)生泄漏铅协。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,068評論 3 327
  • 文/蒙蒙 一摊沉、第九天 我趴在偏房一處隱蔽的房頂上張望狐史。 院中可真熱鬧,春花似錦说墨、人聲如沸骏全。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽姜贡。三九已至,卻和暖如春棺棵,著一層夾襖步出監(jiān)牢的瞬間楼咳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評論 1 269
  • 我被黑心中介騙來泰國打工烛恤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留母怜,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,797評論 2 369
  • 正文 我出身青樓缚柏,卻偏偏與公主長得像苹熏,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子币喧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,654評論 2 354

推薦閱讀更多精彩內(nèi)容