某天寫完nodejs枣申,忽然福至心靈想去看看async的源碼長什么樣子,打開一看原來就1000來行看杭,非常簡潔忠藤。于是一邊看一邊寫下這篇文章,作為學(xué)習(xí)筆記記錄下來楼雹。
關(guān)于waterfall
Runs the tasks array of functions in series, each passing their results to the next in the array. However, if any of the tasks pass an error to their own callback, the next function is not executed, and the maincallback is immediately called with the error.
按順序依次執(zhí)行一組函數(shù)模孩。每個(gè)函數(shù)產(chǎn)生的結(jié)果依次傳給數(shù)組中的下一個(gè)函數(shù)。如果有一個(gè)有任何一個(gè)函數(shù)的產(chǎn)生了錯(cuò)誤贮缅,那么它的下一個(gè)函數(shù)就不會(huì)執(zhí)行榨咐,而是執(zhí)行最后的回調(diào)函數(shù)(maincallback,就是第二個(gè)參數(shù))來處理這個(gè)錯(cuò)誤谴供。
waterfall的參數(shù)如下:
Name | Type | Description |
---|---|---|
tasks | Array | An array of functions to run, each function is passed a callback(err, result1, result2, ...) it must call on completion. The first argument is an error (which can be null) and any further arguments will be passed as arguments in order to the next task. |
callback | function <optional> | An optional callback to run once all the functions have completed. This will be passed the results of the last task's callback. Invoked with (err, [results]). |
waterfall源碼
由于是直接看的手上項(xiàng)目里的源碼块茁,所以版本比較老,最新版貌似已經(jīng)把每個(gè)方法都拆成一個(gè)個(gè)小模塊了桂肌。先將就看著,等有機(jī)會(huì)在把最新的補(bǔ)上轴或,比較下看看有什么不同:)
async.waterfall = function (tasks, callback) {
// 一些邊界情況的處理
// 參數(shù)callback未定義時(shí)昌跌,默認(rèn)執(zhí)行一個(gè)空的函數(shù)
callback = callback || function () {};
// 傳入?yún)?shù)tasks不是數(shù)組時(shí)報(bào)錯(cuò)
if (tasks.constructor !== Array) {
var err = new Error('First argument to waterfall must be an array of functions');
return callback(err);
}
// 空數(shù)組直接調(diào)用callback并返回
if (!tasks.length) {
return callback();
}
// 定義了wrapIterator函數(shù)
var wrapIterator = function (iterator) {
// 此處省略若干行
};
// 執(zhí)行wranpIterator返回的函數(shù)
wrapIterator(async.iterator(tasks))();
};
這里大約可以分為3部分,第一段是對(duì)一些邊界條件的處理照雁,第二段定義了一個(gè)函數(shù)wrapIterator對(duì)iterator進(jìn)行了一層包裝,第三段執(zhí)行包裝后的wrapIterator。
為了能夠更好的理解waterfall的工作流程饺蚊,我們先來看看async.iterator()這個(gè)函數(shù)
async.iterator = function (tasks) {
// makeCallback相當(dāng)于對(duì)每個(gè)tasks里的函數(shù)進(jìn)行一層包裝
// 包裝后的函數(shù)多了一個(gè)next屬性
var makeCallback = function (index) {
// 定義一個(gè)函數(shù)對(duì)象fn
// 對(duì)象結(jié)構(gòu)為{ [Function: fn] next: [Function] }
var fn = function () {
// fn執(zhí)行時(shí)會(huì)執(zhí)行task[index],并將fn的參數(shù)傳給tasks[index]
if (tasks.length) {
tasks[index].apply(null, arguments);
}
// fn執(zhí)行完以后會(huì)返回tasks中的下個(gè)函數(shù)
return fn.next();
};
// next指向makeCallback包裝后的task[index + 1]
fn.next = function () {
return (index < tasks.length - 1) ? makeCallback(index + 1): null;
};
// fn就是包裝后的tasks[index]
return fn;
};
// 返回第一個(gè)fn萍诱,于是我們得到了一個(gè)迭代器
return makeCallback(0);
};
每調(diào)用一次makeCallback就會(huì)包裝一個(gè)task為iterator,而在fn.next指向下個(gè)iterator后又會(huì)遞歸的調(diào)用makeCallback污呼,因此makeCallback(0)相當(dāng)于完成了兩件事裕坊,遞歸的把每個(gè)task包裝為iterator,并返回了第一個(gè)iterator的引用燕酷。
抽象點(diǎn)說籍凝,iterator函數(shù)接受一個(gè)函數(shù)數(shù)組,并把它串起來以后變成了一個(gè)迭代器
用起來大致就是這樣的:
var tasks = [
function () {
console.log(1);
},
function () {
console.log(2);
}
];
var iterator = async.iterator(tasks);
iterator = iterator();
iterator = iterator();
輸出:
1
2
現(xiàn)在我們可以看下wrapIterator函數(shù)了苗缩,源碼如下
var wrapIterator = function (iterator) {
// 如果是err饵蒂,則跳轉(zhuǎn)到最后的callback
return function (err) {
if (err) {
callback.apply(null, arguments);
callback = function () {};
}
else {
// args是傳到下個(gè)function的參數(shù),第一個(gè)err是不需要的
// 要注意的是arguments并不是一個(gè)array
// 所以對(duì)arguments做切片要用如下方式
var args = Array.prototype.slice.call(arguments, 1);
// 下一個(gè)迭代器
var next = iterator.next();
// 把下一個(gè)包裝后的wrapIterator作為最后個(gè)參數(shù)傳入
if (next) {
args.push(wrapIterator(next));
}
else {
args.push(callback);
}
// 執(zhí)行iterator
// 這個(gè)iterator會(huì)執(zhí)行一遍task酱讶,并在回調(diào)處調(diào)用下一個(gè)wrapIterator
async.setImmediate(function () {
iterator.apply(null, args);
});
}
};
};
這個(gè)wrapIterator聲明了一個(gè)匿名函數(shù)退盯,這個(gè)function實(shí)際上就是我們task中間用來處理err的那個(gè)“中轉(zhuǎn)”了,類似一個(gè)調(diào)度器泻肯,對(duì)傳入的第一個(gè)err進(jìn)行辨別渊迁,如果有err則直接調(diào)最后的callback而不是下一個(gè)task。傳入的iterator灶挟,就是我們要執(zhí)行的下個(gè)tasks琉朽。
總結(jié)
waterfall的實(shí)現(xiàn)并不困難,有興趣的話可以自己嘗試下稚铣。源碼實(shí)現(xiàn)的非常簡潔漓骚,借用了iterator這個(gè)函數(shù)實(shí)現(xiàn)了一次包裝,將一個(gè)數(shù)組變成了迭代器榛泛,再用一個(gè)調(diào)度器去執(zhí)行迭代器蝌蹂。
補(bǔ)充最新版本代碼
版本是v2.1.4,現(xiàn)在async做了模塊化曹锨,每一個(gè)函數(shù)都封裝到了一個(gè)獨(dú)立的文件夾孤个。源代碼來源于官方文檔:
官方文檔
源代碼
import isArray from 'lodash/isArray';
import noop from 'lodash/noop';
import once from './internal/once';
import rest from './internal/rest';
import onlyOnce from './internal/onlyOnce';
/**
* Runs the `tasks` array of functions in series, each passing their results to
* the next in the array. However, if any of the `tasks` pass an error to their
* own callback, the next function is not executed, and the main `callback` is
* immediately called with the error.
*
* @name waterfall
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array} tasks - An array of functions to run, each function is passed
* a `callback(err, result1, result2, ...)` it must call on completion. The
* first argument is an error (which can be `null`) and any further arguments
* will be passed as arguments in order to the next task.
* @param {Function} [callback] - An optional callback to run once all the
* functions have completed. This will be passed the results of the last task's
* callback. Invoked with (err, [results]).
* @returns undefined
* @example
*
* async.waterfall([
* function(callback) {
* callback(null, 'one', 'two');
* },
* function(arg1, arg2, callback) {
* // arg1 now equals 'one' and arg2 now equals 'two'
* callback(null, 'three');
* },
* function(arg1, callback) {
* // arg1 now equals 'three'
* callback(null, 'done');
* }
* ], function (err, result) {
* // result now equals 'done'
* });
*
* // Or, with named functions:
* async.waterfall([
* myFirstFunction,
* mySecondFunction,
* myLastFunction,
* ], function (err, result) {
* // result now equals 'done'
* });
* function myFirstFunction(callback) {
* callback(null, 'one', 'two');
* }
* function mySecondFunction(arg1, arg2, callback) {
* // arg1 now equals 'one' and arg2 now equals 'two'
* callback(null, 'three');
* }
* function myLastFunction(arg1, callback) {
* // arg1 now equals 'three'
* callback(null, 'done');
* }
*/
export default function(tasks, callback) {
callback = once(callback || noop);
// 一些邊界處理
if (!isArray(tasks)) return callback(new Error('First argument to waterfall must be an array of functions'));
if (!tasks.length) return callback();
var taskIndex = 0;
// 調(diào)用這個(gè)函數(shù),則把a(bǔ)rgs傳給下一個(gè)task沛简,并遞歸調(diào)用自身齐鲤,直到最后個(gè)task
// 這個(gè)函數(shù)就是task和task之間的中轉(zhuǎn),負(fù)責(zé)處理err和args的傳遞
function nextTask(args) {
// 如果執(zhí)行完了最后一個(gè)task椒楣,則調(diào)用callback
if (taskIndex === tasks.length) {
return callback.apply(null, [null].concat(args));
}
// 給下一個(gè)task包裝了一層處理
var taskCallback = onlyOnce(rest(function(err, args) {
if (err) {
return callback.apply(null, [err].concat(args));
}
nextTask(args);
}));
// 把包裝后的下一個(gè)task作為最后個(gè)參數(shù)壓入當(dāng)前task的參數(shù)棧
args.push(taskCallback);
// 取出準(zhǔn)備調(diào)用的task给郊,taskIndex自增
var task = tasks[taskIndex++];
// 調(diào)用task
task.apply(null, args);
}
// 調(diào)用第一個(gè)task
nextTask([]);
}
小結(jié)
比起原來的代碼,新代碼更簡潔