Node異步編程
目前的異步編程主要解決方案有:
- 事件發(fā)布/訂閱模式
- Promise/Deferred模式
- 流程控制庫(kù)
事件發(fā)布/訂閱模式
Node自身提供了events模塊崇败,可以輕松實(shí)現(xiàn)事件的發(fā)布/訂閱
//訂閱
emmiter.on("event1",function(message){
console.log(message);
})
//發(fā)布
emmiter.emit("event1","I am mesaage!")羡疗;
偵聽(tīng)器可以很靈活地添加和刪除间雀,使得事件和具體處理邏輯之間可以很輕松的關(guān)聯(lián)和解耦
事件發(fā)布/訂閱模式常常用來(lái)解耦業(yè)務(wù)邏輯,事件發(fā)布者無(wú)需關(guān)注訂閱的偵聽(tīng)器如何實(shí)現(xiàn)業(yè)務(wù)邏輯镜硕,甚至不用關(guān)注有多少個(gè)偵聽(tīng)器存在,數(shù)據(jù)通過(guò)消息的方式可以很靈活的進(jìn)行傳遞辙浑。
下面的HTTP就是典型的應(yīng)用場(chǎng)景
var req = http.request(options,function(res){
res.on('data',function(chunk){
console.log('Body:'+ chunk);
})
res.on('end',function(){
//TODO
})
})
如果一個(gè)事件添加了超過(guò)10個(gè)偵聽(tīng)器,將會(huì)得到一條警告衷蜓,可以通過(guò)調(diào)用emmite.setMaxListeners(0)將這個(gè)限制去掉
繼承events模塊
var events = require('events');
function Stream(){
events.EventEmiiter.call(this);
}
util.inherits(Stream,events.EventEmitter);
利用事件隊(duì)列解決雪崩問(wèn)題
所謂雪崩問(wèn)題累提,就是在高訪問(wèn)量,大并發(fā)量的情況下緩存失效的情況磁浇,此時(shí)大量的請(qǐng)求同時(shí)融入數(shù)據(jù)庫(kù)中斋陪,數(shù)據(jù)庫(kù)無(wú)法同時(shí)承受如此大的查詢請(qǐng)求,進(jìn)而往前影響到網(wǎng)站整體的響應(yīng)速度
解決方案:
var proxy = new events.EventEmitter();
var status = "ready";
var seletc = function(callback){
proxy.once("selected",callback);//為每次請(qǐng)求訂閱這個(gè)查詢時(shí)間置吓,推入事件回調(diào)函數(shù)隊(duì)列
if(status === 'ready'){
status = 'pending';//設(shè)置狀態(tài)為進(jìn)行中以防止引起多次查詢操作
db.select("SQL",function(results){
proxy.emit("selected",results); //查詢操作完成后發(fā)布時(shí)間
status = 'ready';//重新定義為已準(zhǔn)備狀態(tài)
})
}
}
多異步之間的協(xié)作方案
以上情況事件與偵聽(tīng)器的關(guān)系都是一對(duì)多的鳍贾,但在異步編程中,也會(huì)出現(xiàn)事件與偵聽(tīng)器多對(duì)一的情況交洗。
這里以渲染頁(yè)面所需要的模板讀取、數(shù)據(jù)讀取和本地化資源讀取為例簡(jiǎn)要介紹一下
var count = 0 ;
var results = {};
var done = function(key,value){
result[key] = value;
count++;
if(count === 3){
render(results);
}
}
fs.readFile(template_path,"utf8",function(err,template){
done('template',template)
})
db.query(sql,function(err,data){
done('data',data);
})
l10n.get(function(err,resources){
done('resources',resources)
})
偏函數(shù)方案
var after = function(times,callback){
var count = 0, result = {};
return function(key,value){
results[key] = value;
count++;
if(count === times){
callback(results);
}
}
}
var done = after(times,render);
var emitter = new events.Emitter();
emitter.on('done',done); //一個(gè)偵聽(tīng)器
emitter.on('done',other); //如果業(yè)務(wù)增長(zhǎng)橡淑,可以完成多對(duì)多的方案
fs.readFile(template_path,"utf8",function(err,template){
emitter.emit('done','template',template);
})
db.query(sql,function(err,data){
emitter.emit('done','data',data);
})
l10n.get(function(err,resources){
emitter.emit('done','resources',resources)
})
引入EventProxy模塊方案
var proxy = new EventProxy();
proxy.all('template','data','resources',function(template,data,resources){
//TODO
})
fs.readFile(template_path,'utf8',function(err,template){
proxy.emit('template',template);
})
db.query(sql,function(err,data){
proxy.emit('data',data);
})
l10n.get(function(err,resources){
proxy.emit('resources',resources);
})
Promise/Deferred模式
以上使用事件的方式時(shí)构拳,執(zhí)行流程都需要被預(yù)先設(shè)定,這是發(fā)布/訂閱模式的運(yùn)行機(jī)制所決定的梁棠。
$.get('/api',{
success:onSuccess,
err:onError,
complete:onComplete
})
//需要嚴(yán)謹(jǐn)設(shè)置目標(biāo)
那么是否有一種先執(zhí)行異步調(diào)用置森,延遲傳遞處理的方式的?接下來(lái)要說(shuō)的就是針對(duì)這種情況的方式:Promise/Deferred模式
Promise/A
Promise/A提議對(duì)單個(gè)異步操作做出了這樣的抽象定義:
- Promise操作只會(huì)處在三種狀態(tài)的一種:未完成態(tài)符糊,完成態(tài)和失敗態(tài)凫海。
- Promise的狀態(tài)只會(huì)出現(xiàn)從未完成態(tài)向完成態(tài)或失敗態(tài)轉(zhuǎn)化,不能逆反男娄,完成態(tài)和失敗態(tài)不能相互轉(zhuǎn)化
- Promise的狀態(tài)一旦轉(zhuǎn)化行贪,就不能被更改。
一個(gè)Promise對(duì)象只要具備then()即可
- 接受完成態(tài)模闲、錯(cuò)誤態(tài)的回調(diào)方法
- 可選地支持progress事件回調(diào)作為第三個(gè)方法
- then()方法只接受function對(duì)象建瘫,其余對(duì)象將被忽略
- then()方法繼續(xù)返回Promise對(duì)象,以實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用
通過(guò)Node的events模塊來(lái)模擬一個(gè)Promise的實(shí)現(xiàn)
var Promise = function(){
EventEmitter.call(this)
}
util.inherits(Promise,EventEmitter);
Promise.prototype.then = function(fulfilledHandler,errHandler,progeressHandler){
if(typeof fulfilledHandler === 'function'){
this.once('success',fulfilledHandler); //實(shí)現(xiàn)監(jiān)聽(tīng)對(duì)應(yīng)事件
}
if(typeof errorHandler === 'function'){
this.once('error',errorHandler)
}
if(typeof progressHandler === 'function'){
this.on('progress',progressHandler);
}
return this;
}
以上通過(guò)then()將回調(diào)函數(shù)存放起來(lái)尸折,接下來(lái)就是等待success啰脚、error、progress事件被觸發(fā)实夹,實(shí)現(xiàn)這個(gè)功能的對(duì)象稱為Deferred對(duì)象橄浓,即延遲對(duì)象。
var Deferred = function(){
this.state = 'unfulfilled';
this.promise = new Promise();
}
Deferred.prototype.resolve = function(obj){ //當(dāng)異步完成后可將resolve作為回調(diào)函數(shù)亮航,觸發(fā)相關(guān)事件
this.state = 'fulfilled';
this.promise.emit('success',obj);
}
Deferred.prototype.reject = function(err){
this.state = 'failed';
this.promise.emit('error',err);
}
Deferred.prototype.progress = function(data){
this.promise.emit('progress',data)
}
因此荸实,可以對(duì)一個(gè)典型的響應(yīng)對(duì)象進(jìn)行封裝
res.setEncoding('utf8');
res.on('data',function(chunk){
console.log("Body:" + chunk);
})
res.on('end',function(){
//done
})
res.on('error',function(err){
//error
}
轉(zhuǎn)換成
res.then(function(){
//done
},function(err){
//error
},function(chunk){
console.log('Body:' + chunk);
})
要完成上面的轉(zhuǎn)換塞赂,首先需要對(duì)res對(duì)象進(jìn)行封裝泪勒,對(duì)data,end,error等事件進(jìn)行promisify
var promisify = function(res){
var deferred = new Deferred(); //創(chuàng)建一個(gè)延遲對(duì)象來(lái)在res的異步完成回調(diào)中發(fā)布相關(guān)事件
var result = ''; //用來(lái)在progress中持續(xù)接收數(shù)據(jù)
res.on('data',function(chunk){ //res的異步操作昼蛀,回調(diào)中發(fā)布事件
result += chunk;
deferred.progress(chunk);
})
res.on('end',function(){
deferred.resolve(result);
})
res.on('error',function(err){
deferred.reject(err);
});
return deferred.promise //返回deferred.promise,讓外界不能改變deferred的狀態(tài)圆存,只能讓promise的then()方法去接收外界來(lái)偵聽(tīng)相關(guān)事件叼旋。
}
promisify(res).then(function(){
//done
},function(err){
//error
},function(chunk){
console.log('Body:' + chunk);
})
以上,它將業(yè)務(wù)中不可變的部分封裝在了Deferred中沦辙,將可變的部分交給了Promise
Promise中的多異步協(xié)作
Deferred.prototype.all = function(promises){
var count = promises.length; //記錄傳進(jìn)的promise的個(gè)數(shù)
var that = this; //保存調(diào)用all的對(duì)象
var results = [];//存放所有promise完成的結(jié)果
promises.forEach(function(promise,i){//對(duì)promises逐個(gè)進(jìn)行調(diào)用
promise.then(function(data){//每個(gè)promise成功之后夫植,存放結(jié)果到result中,count--油讯,直到所有promise被處理完了详民,才出發(fā)deferred的resolve方法,發(fā)布事件陌兑,傳遞結(jié)果出去
count--;
result[i] = data;
if(count === 0){
that.resolve(results);
}
},function(err){
that.reject(err);
});
});
return this.promise; //返回promise來(lái)讓外界偵聽(tīng)這個(gè)deferred發(fā)布的事件沈跨。
}
var promise1 = readFile('foo.txt','utf-8');//這里的文件讀取已經(jīng)經(jīng)過(guò)promise化
var promise2 = readFile('bar.txt','utf-8');
var deferred = new Deferred();
deferred.all([promise1,promise2]).thne(function(results){//promise1和promise2的then方法在deferred內(nèi)部的all方法所調(diào)用,用于同步所有的promise
//TODO
},function(err){
//TODO
})
支持序列執(zhí)行的Promise
嘗試改造一下代碼以實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用
var Deferred = function(){
this.promise = new Promise()
}
//完成態(tài)
Deferred.prototype.resolve = function(obj){
var promise = this.promise;
var handler;
while((handler = promise.queue.shift())){
if(handler && handler.fulfilled){
var ret = handler.fulfilled(obj);
if(ret && ret.isPromise){
ret.queue = promise.queue;
this.promise = ret;
return;
}
}
}
}
//失敗態(tài)
Deferred.prototype.reject = function(err){
var promise = this.promise;
var handler;
while((handler = promise.queue.shift())){
if(handler && handler.error){
var ret = handler.error(err);
if(ret && ret.isPromise){
ret.queue = promise.queue;
this.promise = ret;
return
}
}
}
}
//生成回調(diào)函數(shù)
Deferred.prototype.callback = function(){
var that = this;
return function(err,file){
if(err){
return that.reject(err);
}
that.resolve(file)
}
}
var Promise = function(){
this.queue = []; //隊(duì)列用于存儲(chǔ)待執(zhí)行的回到函數(shù)
this.isPromise = true;
};
Promise.prototype.then = function(fulfilledHandler,errorHandler,progressHandler){
var handler = {};
if(typeof fulfilledHandler === 'function'){
handler.fulfilled = fulfilledHandler;
}
if(typeof errorHandler === 'function'){
handler.error = errorHandler;
}
this.queue.push(handler);
return this;
}
var readFile1 = function(file,encoding){
var deferred = new Deferred();
fs.readFile(file,encoding,deferred.callback());
return deferred.promise;
}
var readFile2 = function(file,encoding){
var deferred = new Deferred();
fs.readFile(file,encoding,deferred.callback());
return deferred.promise;
}
readFile1('file1.txt','utf8').then(function(file1){
return readFile2(file1.trim(),'utf8')
}).then(function(file2){
console.log(file2)
})
流程控制庫(kù)另外進(jìn)行總結(jié)
以上參考《深入淺出node.js》一書(shū)