今天掐松,公司內(nèi)大神問我 Promise 有沒有類似 Promise.all
多個并發(fā)執(zhí)行的并且保證數(shù)據(jù)的順序尤蛮,但是又可以完成一個異步操作就執(zhí)行異步的方法,比如:
這里有 5 個 http 分別耗時 [1000, 2000, 3000, 5000, 7000] ms
蜘拉,假如我使用 Promise.all
從完成請求到開始執(zhí)行回調(diào)一共需要時間是 7000ms
const arr = [1000, 200, 500, 700, 600];
const mockHttp = v => new Promise((resolve, reject) => {
setTimeout(() => resolve(v), v);
});
const tasks = [];
arr.forEach(v => tasks.push(mockHttp(v)));
const cb = v => console.log(v);
(async () => {
console.time();
const result = await Promise.all(tasks);
console.timeEnd();
// Get data, then do callback
result.forEach(v => cb(v));
})()
// Result
// default: 1000.254150390625ms
// 1000
// 200
// 500
// 700
// 600
這樣雖然能保證拿到的結(jié)果是按照順序來的遂唧,但是,在執(zhí)行回調(diào)的時候卻是在等全部異步結(jié)束后之拨,我期望的是這樣的
// 1000
// 200
// 500
// 700
// 600
// default: 1000.254150390625ms
- 保證異步結(jié)果是按順序返回的
- 并發(fā)
這個我自己感覺是 Promise.all
與 Promise.race
的結(jié)合茉继,無奈沒有發(fā)現(xiàn) Promise
規(guī)范中沒有這種接口,于是就嘗試自己實現(xiàn)蚀乔。
當(dāng)時正好在看 Rxjs
烁竭,所以感覺這個剛好就很適合這種場景,于是就開始寫
首先吉挣,需要同樣需要 mock 異步的過程派撕,這里直接使用上面的 setTimeout 來mock 異步的過程。
const mockHttp = v => new Promise((resolve, reject) => {
setTimeout(() => resolve(v), v);
})
然后睬魂,我們開始寫異步并發(fā)的代碼终吼,這里使用 Rxjs
的觀察者模式
const tasks = [3500, 300, 5000, 3000, 700, 2000, 5];
const a = Rx.Observable.create((obs) => {
tasks.forEach((v, idx) => {
// Do something async but return value must include { value, index }
mockHttp(v).then(data => obs.next({data, idx}))
});
});
為了能拿到異步結(jié)果并且執(zhí)行,需要添加觀察者氯哮。
// 假如直接訂閱就沒有意義了际跪,不能讓異步的結(jié)果按照順序返回
function scheduler (input) {
const queen = [];
let idxObj = {};
return Rx.Observable.create((obs) => {
input.subscribe((data) => {
if (data.idx === 0 || data.idx === idxObj.idx) {
obs.next(data);
idxObj.idx = data.idx + 1;
// For wait queen value
it(idxObj, queen, obs)
} else {
queen.push(data);
}
})
})
}
function it (idxObj, data, obs) {
const newArr = Array.from(data).sort((a, b) => a.idx - b.idx)
if (newArr.length !== 0 && newArr[0].idx === idxObj.idx) {
obs.next(newArr[0])
data.splice(data.findIndex(v => v.idx === newArr[0].idx), 1);
idxObj.idx = newArr[0].idx + 1;
it(idxObj, data, obs);
}
}
這段代碼是整個功能的核心,主要作用就是增加一個隊列用于緩存排在后面但異步所需時間較短的返回值,然后在每一次返回值時去遍歷這個隊列將順序的值給返回出來姆打。因為 Rxjs
的觀察者訂閱模式良姆,可以較為簡單的實現(xiàn)這個功能,也可以使用 nodejs
的 event
庫來達到同樣的目的幔戏。
最后玛追,加上訂閱函數(shù),也就是handle
闲延,就能達到預(yù)期的效果了痊剖。
const handle = v => console.log(v);
scheduler(a).subscribe(v => {
console.log(v);
handle(v.data);
})
這里為了能看出執(zhí)行順序,所以把順序也給附加在返回值中慨代。
最后附上全部代碼鏈接 JS Fiddle
假如需要更直觀的看到執(zhí)行順序使用下面代碼在 Rxjs 頁面執(zhí)行邢笙。
console.time();
const tasks = [3500, 300, 5000, 3000, 700, 2000, 5];
const mockHttp = v => new Promise((resolve, reject) => {
setTimeout(() => resolve(v), v);
})
const a = Rx.Observable.create((obs) => {
tasks.forEach((v, idx) => {
// Do something async but return value must include { value, index }
mockHttp(v).then(data => obs.next({data, idx}))
});
});
function scheduler (input) {
const queen = [];
let idxObj = {};
return Rx.Observable.create((obs) => {
input.subscribe((data) => {
if (data.idx === 0 || data.idx === idxObj.idx) {
obs.next(data);
idxObj.idx = data.idx + 1;
// For wait queen value
it(idxObj, queen, obs)
} else {
queen.push(data);
}
})
})
}
function it (idxObj, data, obs) {
const newArr = Array.from(data).sort((a, b) => a.idx - b.idx)
if (newArr.length !== 0 && newArr[0].idx === idxObj.idx) {
obs.next(newArr[0])
data.splice(data.findIndex(v => v.idx === newArr[0].idx), 1);
idxObj.idx = newArr[0].idx + 1;
it(idxObj, data, obs);
}
}
const handle = v => { console.timeEnd(); console.log(v); console.time() }
scheduler(a).subscribe(handle)
總結(jié): 寫了這么多,就為了兩個目標侍匙, 一個是并發(fā)異步氮惯, 另一個是只要按照順序執(zhí)行 callback 并且不是等全部并發(fā)都完成。如有疏漏想暗,請指正妇汗,謝謝!