在操作系統(tǒng)的底層異步通過信號(hào)量、信息等方式得到廣泛應(yīng)用。但在主流高級(jí)編程語言中,基于程序員的思維方式異步并不多見库北。
異步I/O的優(yōu)勢(shì)
- 良好的用戶體驗(yàn):在瀏覽器中,JavaScript 的執(zhí)行與UI渲染共用一個(gè)線程们陆,所以異步可消除UI阻塞的問題寒瓦;第二當(dāng)頁(yè)面所需的資源路勁較多時(shí),可通過異步并發(fā)提升資源獲取總時(shí)長(zhǎng)坪仇。
- 資源分配:并發(fā)操作之前主流的做法是多線程(或進(jìn)程)并行杂腰,但多線程會(huì)存在死鎖及狀態(tài)同步等問題,多進(jìn)程的資源開銷也很大椅文。Node 通過類似工作子進(jìn)程(線程)的概念來實(shí)現(xiàn)異步I/O喂很。
異步I/O的實(shí)現(xiàn)
實(shí)現(xiàn)原理
- 基于輪詢:read / select / poll / epoll (輪詢-休眠-通過事件喚醒) / kqueue (FreeBSD 下的 epoll)
- 基于事件:程序發(fā)起非阻塞調(diào)用后惜颇,繼續(xù)向下執(zhí)行,當(dāng)異步操作完成后通過信號(hào)或回調(diào)函數(shù)將數(shù)據(jù)回傳給程序少辣。實(shí)現(xiàn):Linux 下的 AIO凌摄,但僅支持內(nèi)核I/O。
Node中異步I/O的實(shí)現(xiàn)
在 libuv 0.1中:Node 在 *unix 平臺(tái)下自行設(shè)計(jì)線程池來模擬異步I/O漓帅; 在 Window 下通過操作系統(tǒng)層的 IOCP (內(nèi)部也是線程池原理) 進(jìn)行異步操作锨亏。通過 libuv 實(shí)現(xiàn)抽象兼容。在libuv 1.0 中: 兩個(gè)平臺(tái)都是用統(tǒng)一的線程池來處理異步任務(wù)忙干。所以 Node 的單線程只是 JavaScript 執(zhí)行在單線程中器予,內(nèi)部完成異步任務(wù)都需要線程池的支持。
Node 的異步I/O
事件循環(huán)
Node 進(jìn)程啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)循環(huán)捐迫,每執(zhí)行一次循環(huán)體的過程稱為 Tick乾翔,每個(gè) Tick 的過程就是查看是否有事件或關(guān)聯(lián)的回調(diào)函數(shù)待處理,有執(zhí)行施戴,無進(jìn)入下個(gè)循環(huán)末融。如果判斷不再有任何事件需要處理則退出進(jìn)程。
觀察者
Tick 通過 觀察者
來獲知是否有事件需要處理暇韧。每個(gè)事件循環(huán)中可存在多個(gè)觀察者。
請(qǐng)求對(duì)象
在調(diào)用 Node 異步接口的時(shí)候浓瞪,會(huì)創(chuàng)建一個(gè)請(qǐng)求對(duì)象用于保存所有異步調(diào)用的狀態(tài)懈玻,包括等待線程池分配執(zhí)行,操作完成后的回調(diào)函數(shù)乾颁。
這里以 fs.open
為例
內(nèi)部調(diào)用代碼:
fs.open = function(path, flags, mode, callback_) {
var callback = makeCallback(arguments[arguments.length - 1]);
mode = modeNum(mode, 0o666);
if (!nullCheck(path, callback)) return;
var req = new FSReqWrap();
req.oncomplete = callback;
binding.open(pathModule._makeLong(path),
stringToFlags(flags),
mode,
req);
};
其中 FSReqWrap
即為請(qǐng)求對(duì)象涂乌。其繼承自 ReqWrap
class FSReqWrap: public ReqWrap<uv_fs_t> {
public:
enum Ownership { COPY, MOVE };
inline static FSReqWrap* New(Environment* env,
Local<Object> req,
const char* syscall,
const char* data = nullptr,
enum encoding encoding = UTF8,
Ownership ownership = COPY);
inline void Dispose();
};
uv_fs_t
結(jié)構(gòu)體:
/* uv_fs_t is a subclass of uv_req_t. */
struct uv_fs_s {
UV_REQ_FIELDS
uv_fs_type fs_type;
uv_loop_t* loop;
uv_fs_cb cb;
ssize_t result;
void* ptr;
const char* path;
uv_stat_t statbuf; /* Stores the result of uv_fs_stat() and uv_fs_fstat(). */
UV_FS_PRIVATE_FIELDS
};
#define UV_REQ_FIELDS \
/* public */ \
void* data; \
/* read-only */ \
uv_req_type type; \
/* private */ \
void* active_queue[2]; \
void* reserved[4]; \
UV_REQ_PRIVATE_FIELDS \
oncomplete
用于保存回調(diào)函數(shù)。
執(zhí)行回調(diào)函數(shù)
當(dāng)線程池執(zhí)行完響應(yīng)的操作后英岭,將處理結(jié)果存儲(chǔ)在 req->result
中湾盒,發(fā)起回調(diào),再調(diào)用存儲(chǔ)在 reqWrap 中的回調(diào)函數(shù)
req_wrap->MakeCallback(env->oncomplete_string(), argc, argv);
非 I/O 的異步 API
定時(shí)器 setTimeout() / setInterval()
setTimeout() 和 setInterval() 被調(diào)用時(shí)會(huì)創(chuàng)建一個(gè)uv_timer插入到定時(shí)器觀察者
中诅妹。每次 Tick 執(zhí)行時(shí)罚勾,會(huì)從觀察者中迭代取出uv_timer,檢測(cè)其是否超過定時(shí)時(shí)間吭狡,如果超過尖殃,就形成事件,執(zhí)行回調(diào)函數(shù)划煮。正是基于該原理所以 JavaScript 中的定時(shí)器是不精確的送丰。
通過 lib/internel/bootstrap_node.js
下 setupGlobalTimeouts
函數(shù)引入,具體處理邏輯在 lib/timers.js
中弛秋。
function setupGlobalTimeouts() {
const timers = NativeModule.require('timers');
global.clearImmediate = timers.clearImmediate;
global.clearInterval = timers.clearInterval;
global.clearTimeout = timers.clearTimeout;
global.setImmediate = timers.setImmediate;
global.setInterval = timers.setInterval;
global.setTimeout = timers.setTimeout;
}
所有特定 timeout 的 timer 存放在一個(gè) timeoutList 中器躏,每個(gè) timeoutList 都綁定一個(gè) c++ 下的 uv_timer
俐载,之后 uv_timer 被加入到 event_loop 的 handle_queue 中,事件循環(huán)會(huì)檢測(cè)該 uv_timer 的 timeout 時(shí)間是否到了登失,檢測(cè)到 timeout 時(shí) uv_timer 會(huì)調(diào)起 listOnTimeout
函數(shù)遏佣,去檢測(cè)對(duì)應(yīng)的 timeoutList 中是否有 timer
定時(shí)任務(wù)到期需要執(zhí)行的,有則將其從該定時(shí)列表中剔除掉壁畸,再執(zhí)行該 timer 的回調(diào)函數(shù)贼急。
function listOnTimeout() {
var list = this._list;
var msecs = list.msecs;
var now = TimerWrap.now();
var diff, timer;
while (timer = L.peek(list)) {
diff = now - timer._idleStart;
// Check if this loop iteration is too early for the next timer.
// This happens if there are more timers scheduled for later in the list.
if (diff < msecs) {
// 由于該定時(shí)列表中較早加入的timer到期,`內(nèi)部timer` 調(diào)起該list的回調(diào)函數(shù)捏萍,其他后加入的timer就被提早通知太抓,
// 發(fā)起 `內(nèi)部核心timer` 新的 start, 等待事件下次調(diào)起
}
// The actual logic for when a timeout happens.
L.remove(timer);
assert(timer !== L.peek(list));
if (!timer._onTimeout) continue;
// 調(diào)用實(shí)際的回調(diào)函數(shù)
// 當(dāng)回調(diào)函數(shù)執(zhí)行出錯(cuò)時(shí),在下一個(gè) tick 重新執(zhí)行該 timeoutList 的 listOnTimeout 函數(shù)
tryOnTimeout(timer, list);
setInterval() 與 setTimeout() 使用相同的邏輯處理令杈,只是當(dāng) timeout 執(zhí)行完回調(diào)函數(shù)后走敌,檢測(cè)到 timer 的 _repeat
屬性不為空,則再將其加入 timeoutList 中逗噩。
未搞懂的地方:內(nèi)部 uv_timer 與 事件輪詢的調(diào)用邏輯
setImmediate()
所有 immediate 被存放到一個(gè) immediateQueue
的 linked list 中, 事件循環(huán)執(zhí)行到 uv_check
掉丽,回調(diào)到 processImmediate
函數(shù)
unction createImmediate(args, callback) {
// declaring it `const immediate` causes v6.0.0 to deoptimize this function
var immediate = new Immediate();
immediate._callback = callback;
immediate._argv = args;
immediate._onImmediate = callback;
if (!process._needImmediateCallback) {
process._needImmediateCallback = true;
process._immediateCallback = processImmediate;
}
immediateQueue.append(immediate);
return immediate;
}
processImmediate 函數(shù)中檢測(cè) immediateQueue 中待執(zhí)行的任務(wù)并執(zhí)行。
function processImmediate() {
var immediate = immediateQueue.head;
var tail = immediateQueue.tail;
var domain;
// Clear the linked list early in case new `setImmediate()` calls occur while
// immediate callbacks are executed
immediateQueue.head = immediateQueue.tail = null;
while (immediate) {
domain = immediate.domain;
if (!immediate._onImmediate) {
immediate = immediate._idleNext;
continue;
}
if (domain)
domain.enter();
immediate._callback = immediate._onImmediate;
// Save next in case `clearImmediate(immediate)` is called from callback
var next = immediate._idleNext;
tryOnImmediate(immediate, tail);
if (domain)
domain.exit();
// If `clearImmediate(immediate)` wasn't called from the callback, use the
// `immediate`'s next item
if (immediate._idleNext)
immediate = immediate._idleNext;
else
immediate = next;
}
// Only round-trip to C++ land if we have to. Calling clearImmediate() on an
// immediate that's in |queue| is okay. Worst case is we make a superfluous
// call to NeedImmediateCallbackSetter().
if (!immediateQueue.head) {
process._needImmediateCallback = false;
}
}
注意:《深入淺出Node.js》書中說:setImmediate() 在每輪循環(huán)中執(zhí)行鏈表中的一個(gè)回調(diào)函數(shù)异雁,這里看到 while (immediate)
, 所以 node v6.x (事實(shí)上從 v0.12.x 開始)中已經(jīng)不是這樣的了捶障。以下代碼可用來校驗(yàn)。
// 用于校驗(yàn)的代碼
process.nextTick(function () {
console.log('nextTick -- 1')
})
setImmediate(function () {
console.log('setImmediate -- 1')
process.nextTick(function () {
console.log('nextTick -- run')
})
})
setImmediate(function () {
console.log('setImmediate -- 2')
})
process.nextTick(function () {
console.log('nextTick -- 2')
})
console.log('start')
/* log
start
nextTick -- 1
nextTick -- 2
setImmediate -- 1
setImmediate -- 2
nextTick -- run
*/
process.nextTick()
調(diào)用 process.nextTick() 時(shí)纲刀,會(huì)將相關(guān)參數(shù) push 到 nextTickQueue
中项炼,然后當(dāng)事件循環(huán)進(jìn)入下一個(gè) tick 時(shí)調(diào)用 _tickCallback
函數(shù)
創(chuàng)建一個(gè) nextTick 回調(diào)任務(wù)
function nextTick(callback) {
if (typeof callback !== 'function')
throw new TypeError('callback is not a function');
// on the way out, don't bother. it won't get fired anyway.
if (process._exiting)
return;
var args;
if (arguments.length > 1) {
args = new Array(arguments.length - 1);
for (var i = 1; i < arguments.length; i++)
args[i - 1] = arguments[i];
}
nextTickQueue.push({
callback,
domain: process.domain || null,
args
});
tickInfo[kLength]++;
}
處理 nextTickQueue 中的回調(diào)任務(wù),這里 while (tickInfo[kIndex] < tickInfo[kLength])
可以看出在一個(gè) tick 中是一次性處理掉所有的當(dāng)前 nextTickQueue 中的任務(wù)示绊。
function _tickCallback() {
var callback, args, tock;
do {
while (tickInfo[kIndex] < tickInfo[kLength]) {
tock = nextTickQueue[tickInfo[kIndex]++];
callback = tock.callback;
args = tock.args;
// Using separate callback execution functions allows direct
// callback invocation with small numbers of arguments to avoid the
// performance hit associated with using `fn.apply()`
_combinedTickCallback(args, callback); // 執(zhí)行 nextTcik 的回調(diào)
if (1e4 < tickInfo[kIndex])
tickDone();
}
tickDone();
_runMicrotasks(); // 重新啟動(dòng) v8 的微任務(wù)
emitPendingUnhandledRejections(); // 處理當(dāng)前 tick 中的 promise 任務(wù)
} while (tickInfo[kLength] !== 0);
}
Promise
Promise 任務(wù)同 nextTick 一樣都是通過 v8 MicroTasks 來執(zhí)行