事件發(fā)布/訂閱模式
事件監(jiān)聽器模式是一種廣泛用于異步編程的模式峡竣,是回調(diào)函數(shù)的事件化,又稱發(fā)布/訂閱模式。
// 訂閱
emitter.on("event1", function (message) {
console.log(message);
});
// 發(fā)布
emitter.emit('event1', "I am message!");
事件發(fā)布/訂閱模式可以實(shí)現(xiàn)一個(gè)事件與多個(gè)回調(diào)函數(shù)的關(guān)聯(lián),這些回調(diào)函數(shù)又稱為事件偵聽器饥瓷。通過emit()發(fā)布事件后,消息會(huì)立即傳遞給當(dāng)前事件的所有偵聽器執(zhí)行痹籍,偵聽器可以很靈活地添加和刪除呢铆,使得事件和具體處理邏輯之間可以很輕松地關(guān)聯(lián)和解耦。因?yàn)槭录l(fā)布者無須關(guān)注訂閱的偵聽器如何實(shí)現(xiàn)業(yè)務(wù)邏輯蹲缠,甚至不用關(guān)注有多少個(gè)偵聽器存在棺克,數(shù)據(jù)通過消息的方式可以很靈活的傳遞。
事件發(fā)布/訂閱模式自身并無同步和異步調(diào)用的問題线定,但在Node中娜谊,emit()調(diào)用多半是伴隨事件循環(huán)而異步觸發(fā)的,所以我們說事件發(fā)布/訂閱廣泛應(yīng)用于異步編程斤讥。
事件偵聽器模式也是一種鉤子(hook)機(jī)制纱皆,利用鉤子導(dǎo)出內(nèi)部數(shù)據(jù)或狀態(tài)給外部的調(diào)用者,Node中的很多對(duì)象具有黑盒的特點(diǎn),功能點(diǎn)較少抹剩,如果不通過事件鉤子的形式撑帖,我們就無法獲取對(duì)象在運(yùn)行期間的中間值或內(nèi)部狀態(tài)。這樣澳眷,可以使編程者不用關(guān)注組件是如何啟動(dòng)和執(zhí)行的,只需關(guān)注在需要的事件點(diǎn)上即可蛉艾。
例如HTTP請(qǐng)求的代碼中钳踊,程序員只需要將視線放在error、data勿侯、end這些業(yè)務(wù)事件點(diǎn)上即可拓瞪,至于內(nèi)部的流程如何,無需過于關(guān)注助琐。
值得一提的是祭埂,Node對(duì)事件發(fā)布/訂閱的機(jī)制做了一些額外的處理,這大多是基于健壯性而考慮的兵钮,下面為兩個(gè)具體的細(xì)節(jié)點(diǎn)蛆橡。
1、如果對(duì)一個(gè)事件添加了超過10個(gè)偵聽器掘譬,將會(huì)得到一條警告泰演,因?yàn)樵O(shè)計(jì)者認(rèn)為偵聽器太多可能導(dǎo)致內(nèi)存泄露,也可能存在過多占用CPU的場(chǎng)景葱轩。調(diào)用emitter.setMaxListeners(0);可以將這個(gè)限制去掉睦焕。
2、為了處理異常靴拱,EventEmitter對(duì)象對(duì)error事件進(jìn)行了特殊對(duì)待垃喊。如果運(yùn)行期間的錯(cuò)誤觸發(fā)了error事件,EventEmitter會(huì)檢查是否有對(duì)error事件添加過偵聽器袜炕,如果添加了本谜,這個(gè)錯(cuò)誤將會(huì)交由改偵聽器處理,否則這個(gè)錯(cuò)誤將作為異常拋出妇蛀,如果外部沒有捕獲這個(gè)異常耕突,將會(huì)引起線程退出。
繼承events模塊
在Node中评架,開發(fā)者可以輕松的繼承EventEmitter類眷茁,利用事件機(jī)制來解決業(yè)務(wù)問題。
var events = require('events');
function Stream() {
events.EventEmitter.call(this);
}
util.inherits(Stream, events.EventEmitter);
// util.inherits封裝了繼承的方法
利用事件隊(duì)列解決雪崩問題
在計(jì)算機(jī)中纵诞,緩存由于存放在內(nèi)存中上祈,訪問速度十分快,常常用于加速數(shù)據(jù)訪問,讓絕大多數(shù)的請(qǐng)求不必重復(fù)去做一些低效的數(shù)據(jù)讀取登刺,所謂雪崩問題籽腕,就是在高訪問量、大并發(fā)量的情況下緩存失效的情景纸俭,此時(shí)大量的請(qǐng)求同時(shí)涌入數(shù)據(jù)庫(kù)中皇耗,數(shù)據(jù)庫(kù)無法同時(shí)承受如此大的查詢請(qǐng)求,導(dǎo)致崩潰揍很。
雪崩的過程:
1郎楼、redis集群徹底崩潰
2、緩存服務(wù)大量對(duì)redis的請(qǐng)求hang住窒悔,占用資源
3呜袁、緩存服務(wù)大量的請(qǐng)求打到源頭服務(wù)去查詢mysql,直接打死m(xù)ysql
4简珠、源頭服務(wù)因?yàn)閙ysql被打死也崩潰阶界,對(duì)源服務(wù)的請(qǐng)求也hang住,占用資源
5聋庵、緩存服務(wù)大量的資源全部耗費(fèi)在訪問redis和源服務(wù)無果膘融,最后自己被拖死,無法提供服務(wù)
6珍策、nginx無法訪問緩存服務(wù)托启,redis和源服務(wù),只能基于本地緩存提供服務(wù)攘宙,但是緩存過期后屯耸,沒有數(shù)據(jù)提供
7、網(wǎng)站崩潰
var events = require('events');
var proxy = new events.EventEmitter();
var status = "ready";
var select = function (callback) {
proxy.once("selected", callback);
if (status === "ready") {
status = "pending";
db.select("SQL", function (results) {
proxy.emit("selected", results);
status = "ready";
});
}
};
當(dāng)進(jìn)行多次同一SQL操作的時(shí)候蹭劈,所有的回調(diào)在一個(gè)查詢周期中(ready - pending - ready)都會(huì)被壓入事件隊(duì)列中疗绣,等待執(zhí)行,并且利用了once()確保了所有回調(diào)都只執(zhí)行一次后被移除铺韧,(因?yàn)楸O(jiān)聽的都是selected事件多矮,在多次emit的時(shí)候會(huì)多次觸發(fā)回調(diào))。當(dāng)查詢結(jié)束后哈打,調(diào)用emit觸發(fā)selected事件塔逃,執(zhí)行事件隊(duì)列中所有相關(guān)回調(diào)。這種方式節(jié)省了重復(fù)的數(shù)據(jù)庫(kù)調(diào)用產(chǎn)生的開銷料仗。
多異步之間的協(xié)作方案
一般而言湾盗,事件與偵聽器的關(guān)系是一對(duì)多,但在異步編程中立轧,也會(huì)出現(xiàn)事件與偵聽器的關(guān)系是多對(duì)一的格粪,也就是說一個(gè)業(yè)務(wù)邏輯可能依賴多個(gè)事件回調(diào)的結(jié)果躏吊。
場(chǎng)景:渲染頁面需要模板讀取、數(shù)據(jù)讀取帐萎、本地化資源讀取這三步比伏,得到三種數(shù)據(jù)進(jìn)行最終渲染,且這三種操作互不依賴疆导。
var count = 0;
var results = {};
var done = function(key, value) {
results[key] = value;
count++;
if(count === 3) {
// 渲染頁面
render(results)
}
}
fs.readFile(template_path, "utf8", function(err, template){
done("template", template)
})
db.query(sql, function(err, data){
done("data", data)
})
l10n.get(function(err, data){
done("resources", resources)
})
因?yàn)槿齻€(gè)操作互不依賴赁项,當(dāng)count = 3的時(shí)候,說明三個(gè)操作都成功完成澈段,得到想要的數(shù)據(jù)開始渲染頁面肤舞,一般會(huì)把這個(gè)用于檢測(cè)次數(shù)的變量叫做哨兵變量。
在多對(duì)多的場(chǎng)景中均蜜,可以使用發(fā)布/訂閱的方式來完成一對(duì)多的發(fā)散。
// 使用偏函數(shù)完成 多對(duì)一的收斂
var after = function(times, callback) {
var count = 0;
var results = {};
return function(key, value){
results[key] = value;
count++;
if(count === times) {
// 渲染頁面
callback(results)
}
}
}
// 使用發(fā)布/訂閱的方式來完成一對(duì)多的發(fā)散
var emitter = new events.EventEmitter();
var done = after(3, render);
emitter.on("done", done); // 用于渲染
emitter.on("done", other); // 統(tǒng)一獲取相同數(shù)據(jù)的操作芒率,用于別的用途
fs.readFile(template_path, "utf8", function (err, template) {
emitter.emit("done", "template", template);
});
db.query(sql, function (err, data) {
emitter.emit("done", "data", data);
});
l10n.get(function (err, resources) {
emitter.emit("done", "resources", resources);
});
Promise/ Deferred 模式
在異步調(diào)用中囤耳,回調(diào)總是需要被預(yù)先設(shè)定,所以出現(xiàn)了Promise/ Deferred 模式來實(shí)現(xiàn)先執(zhí)行異步調(diào)用偶芍,延遲傳遞回調(diào)充择。
1、可以對(duì)一個(gè)事件傳入多個(gè)回調(diào)
2匪蟀、寫法優(yōu)雅椎麦,一定程度的緩解了嵌套過深的問題。
$.get('/api', {
success: onSuccess,
error: onError
})
// 變遷為
$.get('/api')
.success(onSuccess)
.success(onSuccess2)
.error(onError)
使用events模塊的簡(jiǎn)單實(shí)現(xiàn)
const EventEmitter = require('events').EventEmitter;
util.inherits(Promise, EventEmitter);
Promise.prototype.then = function (fulfilledHandler, errorHandler, progressHandler) {
if (typeof fulfilledHandler === 'function') {
this.once('success', fulfilledHandler);
}
if (typeof errorHandler === 'function') {
this.once('error', errorHandler);
}
if (typeof progressHandler === 'function') {
this.on('progress', progressHandler);
}
return this;
};
var Deferred = function () {
this.state = 'unfulfilled';
this.promise = new Promise();
};
Deferred.prototype.resolve = function (obj) {
this.state = 'fulfilled';
this.promise.emit('success', obj);
};
Deferred.prototype.reject = function (err) {
this.state = 'failed';
this.promise.emit('error', err);
};
Deferred.prototype.progress = function (data) {
this.promise.emit('progress', data);
};
var promisify = function (res) {
var deferred = new Deferred();
var result = '';
res.on('data', function (chunk) {
result += chunk;
deferred.progress(chunk);
});
res.on('end', function () {
deferred.resolve(result);
});
res.on('error', function (err) {
deferred.reject(err);
});
return deferred.promise;
};
// 調(diào)用
promisify(res).then(function () {
// Done
}, function (err) {
// Error
}, function (chunk) {
// progress
console.log('BODY: ' + chunk);
});
Deferred主要是用于內(nèi)部材彪, 用于維護(hù)異步模型的狀態(tài)观挎,Promise則作用于外部,通過then()方法暴露給外部以添加自定義邏輯段化。
多異步協(xié)作
在ES6中Promise的實(shí)現(xiàn)中嘁捷,是使用Promise.all()這個(gè)方法實(shí)現(xiàn),它接受一個(gè)promise實(shí)例組成的數(shù)組作為參數(shù)显熏,使用一個(gè)新的Promise包裹promise的循環(huán)調(diào)用操作雄嚣,當(dāng)所有promise實(shí)例調(diào)用完成時(shí),resolve這個(gè)新的Promise喘蟆,期間如果發(fā)生錯(cuò)誤就reject這個(gè)新的Promise缓升。
鏈?zhǔn)秸{(diào)用
1、將所有的回調(diào)都存到隊(duì)列中蕴轨。
2港谊、Promise完成時(shí),逐個(gè)執(zhí)行回調(diào)尺棋,一旦檢測(cè)到執(zhí)行回調(diào)返回了新的Promise時(shí)封锉,停止執(zhí)行绵跷,調(diào)用其的then方法并將隊(duì)列中余下的回調(diào)轉(zhuǎn)交給它。
相關(guān)邏輯都在then方法中實(shí)現(xiàn)
{{% notice info %}}
關(guān)于Promise的具體實(shí)現(xiàn)參考:一個(gè)Promise實(shí)現(xiàn)
成福、
ES6-Promise源碼
{{% /notice %}}
流程控制
除了事件和Promise外碾局, 還有一類方法是需要手工調(diào)用才能持續(xù)執(zhí)行后續(xù)調(diào)用的,我們將此類方法叫做尾觸發(fā)奴艾,常見的關(guān)鍵詞是next净当。
ES6中Generator函數(shù)就是采用這類方法來控制流程,同時(shí)最新的async await 相關(guān)API更是將寫法變得更加方便蕴潦。
{{% notice info %}}
參考:
Generator 函數(shù)的異步應(yīng)用像啼、
async 函數(shù)、
異步流程控制
{{% /notice %}}
事件發(fā)布/訂閱模式相對(duì)算是一種較為原始的方式潭苞,Promise/Deferred模式貢獻(xiàn)了一個(gè)非常不錯(cuò)的異步任務(wù)模型的抽象忽冻,而異步流程控制方案與Promise/Deferred模式的思路不同,Promise/Deferred的重頭在于封裝異步的調(diào)用部分此疹,流程控制則顯得沒有模式僧诚,將處理的重點(diǎn)放置在回調(diào)函數(shù)的注入上,從自由度來講蝗碎,流程控制相對(duì)靈活得多湖笨。