前言
在沒有深度使用函數回調的經驗的時候，去看這些內容還是有一點吃力的。由于Node.js獨特的異步特性，才出現了“回調地獄”的問題，這篇文章中，我比較詳細的記錄了如何解決異步流問題。
文章會很長，而且這篇是對異步流模式的解釋。文中會使用一個簡單的網絡蜘蛛的例子，它的作用是抓取指定URL的網頁內容并保存在項目中，在文章的最后，可以找到整篇文章中的源碼demo。
1.原生JavaScript模式
本篇不針對初學者，因此會省略掉大部分的基礎內容的講解：
(spider_v1.js)
const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
/**
 * Saves the page at `url` to a local file unless that file already exists.
 *
 * The deep nesting is kept on purpose: this listing is the "callback pyramid"
 * starting point that the following versions flatten step by step.
 *
 * @param {string} url - Address of the page to fetch.
 * @param {Function} callback - Called with `(err)` on failure, or with
 *   `(null, filename, downloaded)` where `downloaded` is `true` when the page
 *   was fetched now and `false` when the file was already present.
 */
function spider(url, callback) {
    const filename = utilities.urlToFilename(url);
    console.log(`filename: ${filename}`);
    fs.exists(filename, alreadySaved => {
        if (alreadySaved) {
            callback(null, filename, false);
        } else {
            console.log(`Downloading ${url}`);
            request(url, (requestErr, response, body) => {
                if (requestErr) {
                    callback(requestErr);
                } else {
                    // The target directory must exist before the file can be written.
                    mkdirp(path.dirname(filename), mkdirErr => {
                        if (mkdirErr) {
                            callback(mkdirErr);
                        } else {
                            fs.writeFile(filename, body, writeErr => {
                                if (writeErr) {
                                    callback(writeErr);
                                } else {
                                    callback(null, filename, true);
                                }
                            });
                        }
                    });
                }
            });
        }
    });
}
// Entry point: fetch the URL given as the first CLI argument and report the outcome.
spider(process.argv[2], (err, filename, downloaded) => {
    if (err) {
        console.log(err);
        return;
    }
    const message = downloaded
        ? `Completed the download of ${filename}`
        : `${filename} was already downloaded`;
    console.log(message);
});
上邊的代碼的流程大概是這樣的:
- 把url轉換成filename
- 判斷該文件名是否存在，若存在直接返回，否則進入下一步
- 發請求，獲取body
- 把body寫入到文件中
這是一個非常簡單版本的蜘蛛，它只能抓取一個url的內容，看到上邊的回調多么令人頭疼。那么我們開始進行優化。
首先，if else 這種方式可以進行優化，這個很簡單，不用多說，放一個對比效果：
/// before
if (err) {
callback(err);
} else {
callback(null, filename, true);
}
/// after
if (err) {
return callback(err);
}
callback(null, filename, true);
代碼這么寫，嵌套就會少一層，但經驗豐富的程序員會認為，這樣寫過重強調了error，我們編程的重點應該放在處理正確的數據上，在可讀性上也存在這樣的要求。
另一個優化是函數拆分，上邊代碼中的spider函數中，可以把下載文件和保存文件拆分出去。
(spider_v2.js)
const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
/**
 * Writes `contents` to `filename`, creating the parent directory first.
 *
 * @param {string} filename - Destination path.
 * @param {*} contents - Data handed to `fs.writeFile` unchanged.
 * @param {Function} callback - Node-style callback; receives the mkdirp error
 *   or whatever `fs.writeFile` reports.
 */
function saveFile(filename, contents, callback) {
    const directory = path.dirname(filename);
    mkdirp(directory, mkdirErr => {
        if (mkdirErr) {
            callback(mkdirErr);
            return;
        }
        fs.writeFile(filename, contents, callback);
    });
}
/**
 * Fetches `url` and stores the response body in `filename`.
 *
 * @param {string} url - Address to request.
 * @param {string} filename - Path the body is saved to via `saveFile`.
 * @param {Function} callback - Called with `(err)` on a request or save
 *   failure, otherwise with `(null, body)`.
 */
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
    request(url, (requestErr, response, body) => {
        if (requestErr) {
            callback(requestErr);
            return;
        }
        saveFile(filename, body, saveErr => {
            if (saveErr) {
                callback(saveErr);
                return;
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    });
}
/**
 * Downloads `url` unless its target file already exists.
 *
 * @param {string} url - Address of the page to fetch.
 * @param {Function} callback - Called with `(err)` on failure, or with
 *   `(null, filename, downloaded)`; `downloaded` is `false` when the file was
 *   already there and `true` after a fresh download.
 */
function spider(url, callback) {
    const filename = utilities.urlToFilename(url);
    console.log(`filename: ${filename}`);
    fs.exists(filename, alreadySaved => {
        if (alreadySaved) {
            callback(null, filename, false);
            return;
        }
        download(url, filename, downloadErr => {
            if (downloadErr) {
                callback(downloadErr);
                return;
            }
            callback(null, filename, true);
        });
    });
}
// Entry point: fetch the URL given as the first CLI argument and report the outcome.
spider(process.argv[2], (err, filename, downloaded) => {
    if (err) {
        console.log(err);
        return;
    }
    if (downloaded) {
        console.log(`Completed the download of ${filename}`);
        return;
    }
    console.log(`${filename} was already downloaded`);
});
上邊的代碼基本上是采用原生優化后的結果，但這個蜘蛛的功能太過簡單，我們現在需要抓取某個網頁中的所有url，這樣才會引申出串行和并行的問題。
(spider_v3.js)
const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
/**
 * Writes `contents` to `filename`, creating the parent directory first.
 *
 * @param {string} filename - Destination path.
 * @param {*} contents - Data handed to `fs.writeFile` unchanged.
 * @param {Function} callback - Node-style callback; receives the mkdirp error
 *   or whatever `fs.writeFile` reports.
 */
function saveFile(filename, contents, callback) {
    const directory = path.dirname(filename);
    mkdirp(directory, mkdirErr => {
        if (mkdirErr) {
            callback(mkdirErr);
            return;
        }
        fs.writeFile(filename, contents, callback);
    });
}
/**
 * Fetches `url` and stores the response body in `filename`.
 *
 * @param {string} url - Address to request.
 * @param {string} filename - Path the body is saved to via `saveFile`.
 * @param {Function} callback - Called with `(err)` on a request or save
 *   failure, otherwise with `(null, body)`.
 */
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
    request(url, (requestErr, response, body) => {
        if (requestErr) {
            callback(requestErr);
            return;
        }
        saveFile(filename, body, saveErr => {
            if (saveErr) {
                callback(saveErr);
                return;
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    });
}
/**
 * Crawls every link found in `body`, strictly one after another.
 *
 * Key takeaway of this listing: how to walk an array when each step is
 * asynchronous. The next link is only started from the completion callback of
 * the previous one.
 *
 * @param {string} currentUrl - URL of the page `body` came from.
 * @param {string} body - Page content the links are extracted from.
 * @param {number} nesting - Remaining depth; `0` stops the crawl at this page.
 * @param {Function} callback - Called with the first error, or with no
 *   arguments once all links are done.
 */
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        // Defer so the callback is never invoked synchronously.
        return process.nextTick(callback);
    }
    const links = utilities.getPageLinks(currentUrl, body);
    const visit = position => {
        if (position === links.length) {
            return callback();
        }
        spider(links[position], nesting - 1, err => {
            if (err) {
                return callback(err);
            }
            visit(position + 1);
        });
    };
    visit(0);
}
/**
 * Crawls `url`: reuses the locally saved copy when one exists, otherwise
 * downloads the page, then follows its links.
 *
 * @param {string} url - Page to crawl.
 * @param {number} nesting - Remaining link depth, passed on to `spiderLinks`.
 * @param {Function} callback - Receives an error, or is called by
 *   `spiderLinks` when the crawl below this page has finished.
 */
function spider(url, nesting, callback) {
    const filename = utilities.urlToFilename(url);
    const crawl = body => spiderLinks(url, body, nesting, callback);
    fs.readFile(filename, "utf8", (readErr, cached) => {
        if (!readErr) {
            return crawl(cached);
        }
        // Only "file not found" means the page still has to be downloaded.
        if (readErr.code !== 'ENOENT') {
            return callback(readErr);
        }
        download(url, filename, (downloadErr, fresh) => {
            if (downloadErr) {
                return callback(downloadErr);
            }
            crawl(fresh);
        });
    });
}
// Entry point: crawl the URL given as the first CLI argument, two levels deep.
// This version of spider() only ever reports an error (or nothing) to its
// callback, so no filename/downloaded values are available to print.
spider(process.argv[2], 2, err => {
    if (err) {
        console.log(err);
        return;
    }
    console.log('Download complete');
});
上邊的代碼相比之前的代碼多了兩個核心功能,首先是通過輔助類獲取到了某個body中的links:
const links = utilities.getPageLinks(currentUrl, body);
內部實現就不解釋了，另一個核心代碼就是：
/**
 * Crawls every link found in `body`, strictly one after another.
 *
 * Key takeaway of this listing: how to walk an array when each step is
 * asynchronous. The next link is only started from the completion callback of
 * the previous one.
 *
 * @param {string} currentUrl - URL of the page `body` came from.
 * @param {string} body - Page content the links are extracted from.
 * @param {number} nesting - Remaining depth; `0` stops the crawl at this page.
 * @param {Function} callback - Called with the first error, or with no
 *   arguments once all links are done.
 */
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        // Defer so the callback is never invoked synchronously.
        return process.nextTick(callback);
    }
    const links = utilities.getPageLinks(currentUrl, body);
    const visit = position => {
        if (position === links.length) {
            return callback();
        }
        spider(links[position], nesting - 1, err => {
            if (err) {
                return callback(err);
            }
            visit(position + 1);
        });
    };
    visit(0);
}
可以說上邊這一小段代碼，就是采用原生實現異步串行的pattern了。除了這些之外，還引入了nesting的概念，通過設置這個屬性，可以控制抓取層次。
到這里我們就完整的實現了串行的功能，考慮到性能，我們要開發并行抓取的功能。
(spider_v4.js)
const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
/**
 * Writes `contents` to `filename`, creating the parent directory first.
 *
 * @param {string} filename - Destination path.
 * @param {*} contents - Data handed to `fs.writeFile` unchanged.
 * @param {Function} callback - Node-style callback; receives the mkdirp error
 *   or whatever `fs.writeFile` reports.
 */
function saveFile(filename, contents, callback) {
    const directory = path.dirname(filename);
    mkdirp(directory, mkdirErr => {
        if (mkdirErr) {
            callback(mkdirErr);
            return;
        }
        fs.writeFile(filename, contents, callback);
    });
}
/**
 * Fetches `url` and stores the response body in `filename`.
 *
 * @param {string} url - Address to request.
 * @param {string} filename - Path the body is saved to via `saveFile`.
 * @param {Function} callback - Called with `(err)` on a request or save
 *   failure, otherwise with `(null, body)`.
 */
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
    request(url, (requestErr, response, body) => {
        if (requestErr) {
            callback(requestErr);
            return;
        }
        saveFile(filename, body, saveErr => {
            if (saveErr) {
                callback(saveErr);
                return;
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    });
}
/**
 * Crawls every link found in `body` in parallel.
 *
 * All `spider` calls are started at once; a shared counter detects when the
 * last one has finished. `callback` is invoked exactly once: with the first
 * error that occurs, or with no arguments after every link succeeded.
 *
 * @param {string} currentUrl - URL of the page `body` came from.
 * @param {string} body - Page content the links are extracted from.
 * @param {number} nesting - Remaining depth; `0` stops the crawl at this page.
 * @param {Function} callback - Completion callback, see above.
 */
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }
    let completed = 0;
    let hasErrors = false;
    function done(err) {
        if (hasErrors) {
            // A failure was already reported; later results (including further
            // errors) must not trigger the callback a second time.
            return;
        }
        if (err) {
            hasErrors = true;
            return callback(err);
        }
        if (++completed === links.length) {
            return callback();
        }
    }
    links.forEach(link => {
        spider(link, nesting - 1, done);
    });
}
// URLs whose crawl has already been started. With parallel crawling the
// file-on-disk check alone cannot prevent the same URL from being fetched twice.
const spidering = new Map();
/**
 * Crawls `url` once: reuses the locally saved copy when one exists, otherwise
 * downloads the page, then follows its links.
 *
 * @param {string} url - Page to crawl.
 * @param {number} nesting - Remaining link depth, passed on to `spiderLinks`.
 * @param {Function} callback - Receives an error, or is called without
 *   arguments when the URL was already started or its crawl has finished.
 */
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
    spidering.set(url, true);
    const filename = utilities.urlToFilename(url);
    const crawl = body => spiderLinks(url, body, nesting, callback);
    fs.readFile(filename, "utf8", (readErr, cached) => {
        if (!readErr) {
            return crawl(cached);
        }
        // Only "file not found" means the page still has to be downloaded.
        if (readErr.code !== 'ENOENT') {
            return callback(readErr);
        }
        download(url, filename, (downloadErr, fresh) => {
            if (downloadErr) {
                return callback(downloadErr);
            }
            crawl(fresh);
        });
    });
}
// Entry point: crawl the URL given as the first CLI argument, two levels deep.
// This version of spider() only ever reports an error (or nothing) to its
// callback, so no filename/downloaded values are available to print.
spider(process.argv[2], 2, err => {
    if (err) {
        console.log(err);
        return;
    }
    console.log('Download complete');
});
這段代碼同樣很簡單，也有兩個核心內容。一個是如何實現并發：
/**
 * Crawls every link found in `body` in parallel.
 *
 * All `spider` calls are started at once; a shared counter detects when the
 * last one has finished. `callback` is invoked exactly once: with the first
 * error that occurs, or with no arguments after every link succeeded.
 *
 * @param {string} currentUrl - URL of the page `body` came from.
 * @param {string} body - Page content the links are extracted from.
 * @param {number} nesting - Remaining depth; `0` stops the crawl at this page.
 * @param {Function} callback - Completion callback, see above.
 */
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }
    let completed = 0;
    let hasErrors = false;
    function done(err) {
        if (hasErrors) {
            // A failure was already reported; later results (including further
            // errors) must not trigger the callback a second time.
            return;
        }
        if (err) {
            hasErrors = true;
            return callback(err);
        }
        if (++completed === links.length) {
            return callback();
        }
    }
    links.forEach(link => {
        spider(link, nesting - 1, done);
    });
}
上邊的代碼可以說是實現并發的一個pattern。利用循環遍歷來實現。另一個核心是，既然是并發的，那么利用fs.exists就會存在問題，可能會重復下載同一文件，這里的解決方案是：
- 使用Map緩存某一url，url應該作為key
現在我們又有了新的需求，要求限制同時并發的最大數，那么在這里就引進了一個我認為最重要的概念：隊列。
(task-Queue.js)
/**
 * FIFO queue that runs at most `concurrency` tasks at the same time.
 *
 * A task is a function receiving a single `done` callback, which it must call
 * when its work has finished so that the next queued task can start.
 */
class TaskQueue {
    /**
     * @param {number} concurrency - Maximum number of tasks running at once.
     */
    constructor(concurrency) {
        this.concurrency = concurrency;
        this.running = 0;
        this.queue = [];
    }

    /**
     * Adds a task and immediately tries to start queued work.
     *
     * @param {Function} task - Function of the form `done => { ... }`.
     */
    pushTask(task) {
        this.queue.push(task);
        this.next();
    }

    /**
     * Starts queued tasks until the concurrency limit is reached or the queue
     * is empty.
     */
    next() {
        while (this.running < this.concurrency && this.queue.length) {
            const task = this.queue.shift();
            let finished = false;
            // Claim the slot BEFORE invoking the task: a task that calls done()
            // synchronously would otherwise decrement first, and the re-entrant
            // next() could start more tasks than `concurrency` allows.
            this.running++;
            task(() => {
                if (finished) {
                    // Ignore repeated done() calls so the counter stays accurate.
                    return;
                }
                finished = true;
                this.running--;
                this.next();
            });
        }
    }
}
module.exports = TaskQueue;
上邊的代碼就是隊列的實現代碼，核心是next()方法，可以看出，當task加入隊列中后，會立刻執行，這不是說這個任務一定馬上執行，而是指的是next會立刻調用。
(spider_v5.js)
const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const TaskQueue = require("./task-Queue");
const downloadQueue = new TaskQueue(2);
/**
 * Writes `contents` to `filename`, creating the parent directory first.
 *
 * @param {string} filename - Destination path.
 * @param {*} contents - Data handed to `fs.writeFile` unchanged.
 * @param {Function} callback - Node-style callback; receives the mkdirp error
 *   or whatever `fs.writeFile` reports.
 */
function saveFile(filename, contents, callback) {
    const directory = path.dirname(filename);
    mkdirp(directory, mkdirErr => {
        if (mkdirErr) {
            callback(mkdirErr);
            return;
        }
        fs.writeFile(filename, contents, callback);
    });
}
/**
 * Fetches `url` and stores the response body in `filename`.
 *
 * @param {string} url - Address to request.
 * @param {string} filename - Path the body is saved to via `saveFile`.
 * @param {Function} callback - Called with `(err)` on a request or save
 *   failure, otherwise with `(null, body)`.
 */
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
    request(url, (requestErr, response, body) => {
        if (requestErr) {
            callback(requestErr);
            return;
        }
        saveFile(filename, body, saveErr => {
            if (saveErr) {
                callback(saveErr);
                return;
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    });
}
/**
 * Crawls every link found in `body`, with the number of simultaneously
 * running crawls limited by the shared `downloadQueue`.
 *
 * `callback` is invoked exactly once: with the first error that occurs, or
 * with no arguments after every link succeeded.
 *
 * NOTE(review): a queued task keeps its slot until spider() has finished the
 * whole sub-crawl of its link, and that sub-crawl pushes its own tasks onto
 * the same queue. With deep nesting and a small concurrency this looks like it
 * can stall; confirm before using a nesting greater than 1.
 *
 * @param {string} currentUrl - URL of the page `body` came from.
 * @param {string} body - Page content the links are extracted from.
 * @param {number} nesting - Remaining depth; `0` stops the crawl at this page.
 * @param {Function} callback - Completion callback, see above.
 */
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }
    let completed = 0;
    let hasErrors = false;
    links.forEach(link => {
        // A queue task is a function taking one argument (`done`); the queue
        // calls the task when a slot is free and the task calls `done` to tell
        // the queue that it has finished.
        downloadQueue.pushTask(done => {
            spider(link, nesting - 1, err => {
                if (!hasErrors) {
                    if (err) {
                        // Report only the first failure to the caller.
                        hasErrors = true;
                        callback(err);
                    } else if (++completed === links.length) {
                        callback();
                    }
                }
                // Always hand the slot back, also after an error; otherwise
                // every failed link would permanently reduce the shared
                // queue's capacity.
                done();
            });
        });
    });
}
// URLs whose crawl has already been started. With concurrent crawling the
// file-on-disk check alone cannot prevent the same URL from being fetched twice.
const spidering = new Map();
/**
 * Crawls `url` once: reuses the locally saved copy when one exists, otherwise
 * downloads the page, then follows its links.
 *
 * @param {string} url - Page to crawl.
 * @param {number} nesting - Remaining link depth, passed on to `spiderLinks`.
 * @param {Function} callback - Receives an error, or is called without
 *   arguments when the URL was already started or its crawl has finished.
 */
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
    spidering.set(url, true);
    const filename = utilities.urlToFilename(url);
    const crawl = body => spiderLinks(url, body, nesting, callback);
    fs.readFile(filename, "utf8", (readErr, cached) => {
        if (!readErr) {
            return crawl(cached);
        }
        // Only "file not found" means the page still has to be downloaded.
        if (readErr.code !== 'ENOENT') {
            return callback(readErr);
        }
        download(url, filename, (downloadErr, fresh) => {
            if (downloadErr) {
                return callback(downloadErr);
            }
            crawl(fresh);
        });
    });
}
// Entry point: crawl the URL given as the first CLI argument, two levels deep.
// This version of spider() only ever reports an error (or nothing) to its
// callback, so no filename/downloaded values are available to print.
spider(process.argv[2], 2, err => {
    if (err) {
        console.log(`error: ${err}`);
        return;
    }
    console.log('Download complete');
});
因此，為了限制并發的個數，只需在spiderLinks方法中，把task遍歷放入隊列就可以了。這相對來說很簡單。
到這里為止，我們使用原生JavaScript實現了一個有相對完整功能的網絡蜘蛛，既能串行，也能并發，還可以控制并發個數。
2.使用async庫
把不同的功能放到不同的函數中，會給我們帶來巨大的好處，async庫十分流行，它的性能也不錯，它內部基于callback。
(spider_v6.js)
const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const series = require("async/series");
const eachSeries = require("async/eachSeries");
/**
 * Fetches `url` and stores the response body in `filename`, expressed as three
 * steps run one after another with async's `series`.
 *
 * @param {string} url - Address to request.
 * @param {string} filename - Path the body is written to.
 * @param {Function} callback - Called with `(err)` when any step fails,
 *   otherwise with `(null, body)`.
 */
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);
    // Shared between the steps: filled by the request, read by the write.
    let body;
    const fetchPage = next => {
        request(url, (err, response, resBody) => {
            if (err) {
                return next(err);
            }
            body = resBody;
            next();
        });
    };
    const ensureDirectory = mkdirp.bind(null, path.dirname(filename));
    const writePage = next => {
        fs.writeFile(filename, body, next);
    };
    series([fetchPage, ensureDirectory, writePage], err => {
        if (err) {
            return callback(err);
        }
        console.log(`Downloaded and saved: ${url}`);
        callback(null, body);
    });
}
/**
 * Crawls every link found in `body` one after another, delegating the
 * asynchronous iteration to async's `eachSeries`.
 *
 * @param {string} currentUrl - URL of the page `body` came from.
 * @param {string} body - Page content the links are extracted from.
 * @param {number} nesting - Remaining depth; `0` stops the crawl at this page.
 * @param {Function} callback - Passed to `eachSeries` as its final callback,
 *   or deferred with `process.nextTick` when there is nothing to crawl.
 */
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }
    const crawlLink = (link, next) => {
        spider(link, nesting - 1, next);
    };
    eachSeries(links, crawlLink, callback);
}
// URLs whose crawl has already been started; each URL is crawled at most once.
const spidering = new Map();
/**
 * Crawls `url` once: reuses the locally saved copy when one exists, otherwise
 * downloads the page, then follows its links.
 *
 * @param {string} url - Page to crawl.
 * @param {number} nesting - Remaining link depth, passed on to `spiderLinks`.
 * @param {Function} callback - Receives an error, or is called without
 *   arguments when the URL was already started or its crawl has finished.
 */
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
    spidering.set(url, true);
    const filename = utilities.urlToFilename(url);
    const crawl = body => spiderLinks(url, body, nesting, callback);
    fs.readFile(filename, "utf8", (readErr, cached) => {
        if (!readErr) {
            return crawl(cached);
        }
        // Only "file not found" means the page still has to be downloaded.
        if (readErr.code !== 'ENOENT') {
            return callback(readErr);
        }
        download(url, filename, (downloadErr, fresh) => {
            if (downloadErr) {
                return callback(downloadErr);
            }
            crawl(fresh);
        });
    });
}
// Entry point: crawl the URL given as the first CLI argument, one level deep.
// This version of spider() only ever reports an error (or nothing) to its
// callback, so no filename/downloaded values are available to print.
spider(process.argv[2], 1, err => {
    if (err) {
        console.log(err);
        return;
    }
    console.log('Download complete');
});
在上邊的代碼中，我們用到了async的series和eachSeries，再加上(spider_v7.js)中用到的queue，一共是async的三個功能：
const series = require("async/series"); // 串行執行一組任務
const eachSeries = require("async/eachSeries"); // 串行遍歷集合（逐個處理，不是并行）
const queue = require("async/queue"); // 隊列（限制并發數）
由于比較簡單，就不做解釋了。async中的隊列的代碼在(spider_v7.js)中，和上邊我們自定義的隊列很相似，也不做更多解釋了。
3.Promise
Promise是一個協議，有很多庫實現了這個協議，我們用的是ES6的實現。簡單來說promise就是一個約定，如果完成了，就調用它的resolve方法，失敗了就調用它的reject方法。它內部實現了then方法，then會返回一個新的promise，這樣就形成了調用鏈。
其實Promise的內容有很多，在實際應用中的重點是如何把普通的函數promise化。這方面的內容在這里也不講了，我自己也不夠格。
(spider_v8.js)
const utilities = require("./utilities");
const request = utilities.promisify(require("request"));
const fs = require("fs");
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
const mkdirp = utilities.promisify(require("mkdirp"));
const path = require("path");
/**
 * Writes `contents` to `filename`, creating the parent directory first.
 *
 * In this listing `mkdirp` and `writeFile` are the wrappers produced by
 * `utilities.promisify` (presumably promise-returning functions that take no
 * trailing callback; confirm against utilities.js), so the steps are chained
 * as promises instead of handing `mkdirp` a Node-style callback.
 *
 * @param {string} filename - Destination path.
 * @param {*} contents - Data handed to `writeFile` unchanged.
 * @param {Function} [callback] - Optional Node-style callback, called with
 *   `null` on success or with the error on failure.
 * @returns {Promise|undefined} The promise of the save when no callback is
 *   given; `undefined` when a callback is used.
 */
function saveFile(filename, contents, callback) {
    const saved = mkdirp(path.dirname(filename))
        .then(() => writeFile(filename, contents));
    if (typeof callback !== "function") {
        return saved;
    }
    saved.then(() => callback(null), err => callback(err));
}
/**
 * Fetches `url` and stores the response body in `filename`.
 *
 * @param {string} url - Address to request.
 * @param {string} filename - Path the body is written to.
 * @returns {Promise} Resolves with the response body once the file has been
 *   written; rejects with the error of whichever step failed.
 */
function download(url, filename) {
    console.log(`Downloading ${url}`);
    // Captured in the first step so that the later steps can still reach it.
    let body;
    return request(url)
        .then(response => {
            body = response.body;
            return mkdirp(path.dirname(filename));
        })
        .then(() => writeFile(filename, body))
        .then(() => {
            console.log(`Downloaded and saved: ${url}`);
            return body;
        });
}
/**
 * Crawls every link found in `body` one after another.
 *
 * Promises remove the need to thread callbacks through every function: the
 * sequential iteration is built by extending one promise chain with a
 * `then` per link.
 *
 * @param {string} currentUrl - URL of the page `body` came from.
 * @param {string} body - Page content the links are extracted from.
 * @param {number} nesting - Remaining depth; `0` stops the crawl at this page.
 * @returns {Promise} Settles after the last link has been crawled, or rejects
 *   with the first failure.
 */
function spiderLinks(currentUrl, body, nesting) {
    if (nesting === 0) {
        return Promise.resolve();
    }
    return utilities.getPageLinks(currentUrl, body).reduce(
        (chain, link) => chain.then(() => spider(link, nesting - 1)),
        Promise.resolve()
    );
}
/**
 * Crawls `url`: reuses the locally saved copy when one exists, otherwise
 * downloads the page, then follows its links.
 *
 * @param {string} url - Page to crawl.
 * @param {number} nesting - Remaining link depth, passed on to `spiderLinks`.
 * @returns {Promise} Settles with the result of `spiderLinks`; rejects with
 *   any read error other than ENOENT or with a download/crawl failure.
 */
function spider(url, nesting) {
    const filename = utilities.urlToFilename(url);
    const crawlLinks = body => spiderLinks(url, body, nesting);
    const fetchIfMissing = err => {
        if (err.code === 'ENOENT') {
            return download(url, filename).then(crawlLinks);
        }
        // Rethrow so the error travels down the chain to the final catch().
        throw err;
    };
    return readFile(filename, "utf8").then(crawlLinks, fetchIfMissing);
}
// Entry point: crawl the URL given as the first CLI argument, one level deep.
// Any error thrown anywhere in the chain ends up in the single catch().
spider(process.argv[2], 1)
    .then(() => console.log('Download complete'))
    .catch(failure => console.log(failure));
可以看到上邊的代碼中的函數都是沒有callback的，只需要在最后catch就可以了。
在設計api的時候，應該支持兩種方式，既支持callback，又支持promise：
/**
 * Divides `dividend` by `divisor` asynchronously and reports the result both
 * through the returned promise and, when given, through a Node-style callback.
 *
 * @param {number} dividend - Value to divide.
 * @param {number} divisor - Value to divide by.
 * @param {Function} [cb] - Optional callback, called with `(error)` or
 *   `(null, result)` on the next tick.
 * @returns {Promise<number>} Resolves with the quotient; rejects with
 *   `Error("Invalid operands")` when the quotient is NaN or infinite.
 */
function asyncDivision(dividend, divisor, cb) {
    const outcome = new Promise((resolve, reject) => {
        process.nextTick(() => {
            const result = dividend / divisor;
            // Number.isFinite is false for NaN as well as for +/-Infinity, so a
            // single check covers both 0/0 and x/0.
            if (!Number.isFinite(result)) {
                const error = new Error("Invalid operands");
                if (cb) {
                    cb(error);
                }
                return reject(error);
            }
            if (cb) {
                cb(null, result);
            }
            resolve(result);
        });
    });
    if (cb) {
        // A callback-style caller ignores the returned promise. Marking the
        // rejection as handled keeps Node from raising an unhandled rejection
        // for an error that was already delivered to the callback. Callers
        // that do chain on the returned promise still see the rejection.
        outcome.catch(() => {});
    }
    return outcome;
}
// Callback style.
asyncDivision(10, 2, (err, quotient) => {
    if (err) {
        console.log(err);
        return;
    }
    console.log(quotient);
});
// Promise style.
asyncDivision(22, 11)
    .then(quotient => {
        console.log(quotient);
    })
    .catch(failure => {
        console.log(failure);
    });
4.Generator
Generator很有意思，它可以暫停函數和恢復函數，利用thunkify和co這兩個庫，我們下邊的代碼實現起來非常酷。
(spider_v9.js)
const thunkify = require("thunkify");
const co = require("co");
const path = require("path");
const utilities = require("./utilities");
const request = thunkify(require("request"));
const fs = require("fs");
const mkdirp = thunkify(require("mkdirp"));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);
/**
 * Generator that fetches `url` and stores the response body in `filename`.
 * Every `yield` hands a thunk to the runner and resumes with its result.
 *
 * @param {string} url - Address to request.
 * @param {string} filename - Path the body is written to.
 * @returns {*} The response body (as the generator's return value).
 */
function* download(url, filename) {
    console.log(`Downloading ${url}`);
    // The request callback delivers several values; the runner hands them back
    // as one array in which index 1 is the body.
    const result = yield request(url);
    const body = result[1];
    yield mkdirp(path.dirname(filename));
    yield writeFile(filename, body);
    console.log(`Downloaded and saved ${url}`);
    return body;
}
/**
 * Generator that crawls `url`: reuses the locally saved copy when one exists,
 * otherwise downloads the page, then follows its links.
 *
 * @param {string} url - Page to crawl.
 * @param {number} nesting - Remaining link depth, passed on to `spiderLinks`.
 */
function* spider(url, nesting) {
    const filename = utilities.urlToFilename(url);
    let body;
    try {
        body = yield readFile(filename, "utf8");
    } catch (readErr) {
        if (readErr.code === 'ENOENT') {
            // No local copy yet: fetch the page instead.
            body = yield download(url, filename);
        } else {
            throw readErr;
        }
    }
    yield spiderLinks(url, body, nesting);
}
/**
 * Generator that crawls every link found in `body` one after another.
 *
 * @param {string} currentUrl - URL of the page `body` came from.
 * @param {string} body - Page content the links are extracted from.
 * @param {number} nesting - Remaining depth; `0` stops the crawl at this page.
 */
function* spiderLinks(currentUrl, body, nesting) {
    if (nesting === 0) {
        // Yield the thunk so the runner actually executes the deferred tick;
        // merely returning it would hand back a function nobody ever calls.
        yield nextTick();
        return;
    }
    const links = utilities.getPageLinks(currentUrl, body);
    for (const link of links) {
        yield spider(link, nesting - 1);
    }
}
// Entry point: crawl the URL given as the first CLI argument, one level deep.
// co drives the generator: it supplies the callbacks for the yielded thunks and
// resumes the generator with the callback's arguments (collected into an array
// when there are several), with the leading error argument stripped off.
co(function* crawl() {
    try {
        yield spider(process.argv[2], 1);
        console.log('Download complete');
    } catch (failure) {
        console.log(failure);
    }
});
總結
我并沒有寫promise和generator并發的代碼。以上這些內容來自于這本書 nodejs-design-patterns：https://github.com/agelessman/MyBooks 。