autocannon是純node實現(xiàn)的接口壓力測試工具,市面上類似的產(chǎn)品很多,老牌的AB,帶有圖形界面的soap ui等.不過autocannon可以方便的進行命令行調(diào)用,甚至在代碼內(nèi)調(diào)用,這對于nodejs項目的單元測試來說是相當方便的.
下面就來簡單分析一下他的源碼.
綜述
上一篇我對winston框架做了源碼分析,其核心關鍵詞是流,通過流將模塊鏈接起來.
而對于autocannon,關鍵詞是事件,無數(shù)的事件傳遞是模塊之間產(chǎn)生關系的橋梁.
代碼架構
用madge生成的結構圖.
autocannon.js
上面說到,autocannon可以方便的進行命令行調(diào)用.全局安裝時,它就是一個命令行工具,因此需要接收運行參數(shù).
引用了minimist這個庫使得它能像其他命令行工具一樣,既能接收完整參數(shù),也可以接收別名參數(shù).
nodejs同python一樣,文件既可以是執(zhí)行的主體,也可以是供其他應用調(diào)用的模塊.這里使用了一個官方文檔的小技巧,區(qū)分autocannon到底是被當做命令行工具還是被當做模塊來調(diào)用.
When a file is run directly from Node, require.main is set to its module.
if (require.main === module) {
const argv = crossArgv(process.argv.slice(2))
start(parseArguments(argv))
}
作為程序的入口,autocannon.js一半的篇幅用于處理運行參數(shù)的初始化與規(guī)范化.
保證輸入的參數(shù)的規(guī)范后,便開始按參數(shù)運行整個程序.
運行程序的方法是很簡單的:
const run = require('./lib/run')
const track = require('./lib/progressTracker')
...
function runTracker (argv, ondone) {
//按參數(shù)執(zhí)行壓測
const tracker = run(argv)
tracker.on('done', (result) => {
if (ondone) ondone()
if (argv.json) {
console.log(JSON.stringify(result))
}
})
tracker.on('error', (err) => {
if (err) {
throw err
}
})
//綁定到實時進度顯示
if (!argv.json || !process.stdout.isTTY) track(tracker, argv)
process.once('SIGINT', () => {
tracker.stop()
})
}
不過,在運行前,autocannon還對一種特殊參數(shù)做了處理.當參數(shù)里包含'--on-port'項時,可以追加一個參數(shù)'--'.
'--'后面跟隨的命令,會在監(jiān)聽端口時執(zhí)行.這個方法一般用于單元測試.
if (argv.onPort) {
//兼容性判斷,需要node 8+支持的異步鉤子
if (!hasAsyncHooks()) {
console.error('The --on-port flag requires the async_hooks builtin module')
process.exit(1)
}
//利用進程間通信創(chuàng)建一個自定義服務啟動檢測器
const { socketPath, server } = createChannel((port) => {
//自定義的服務會傳回自己的端口
const url = new URL(argv.url, `http://localhost:${port}`).href
const opts = Object.assign({}, argv, {
onPort: false,
url: url
})
//按照參數(shù)執(zhí)行壓測
runTracker(opts, () => {
proc.kill('SIGINT')
server.close()
})
})
//將預加載項加入環(huán)境變量的PATH中去
const alterPath = managePath({ PATH: process.env.NODE_PATH })
alterPath.unshift(path.join(__dirname, 'lib/preload'))
//argv.spawn[]此時是命令'--'后面跟隨的命令,用于執(zhí)行指定的額外自定義服務
//這個服務需要實現(xiàn)通過AUTOCANNON_SOCKET,將自己的端口告訴給autocannon的功能
//啟動這個自定義服務前在服務里加載這個預加載模塊autocannonDetectPort
const proc = spawn(argv.spawn[0], argv.spawn.slice(1), {
stdio: ['ignore', 'inherit', 'inherit'],
env: Object.assign({}, process.env, {
NODE_OPTIONS: ['-r', 'autocannonDetectPort'].join(' ') +
(process.env.NODE_OPTIONS ? ` ${process.env.NODE_OPTIONS}` : ''),
NODE_PATH: alterPath.get(),
AUTOCANNON_SOCKET: socketPath
})
})
}
function createChannel (onport) {
const pipeName = `${process.pid}.autocannon`
//windows和*nix對于localsocket的不同調(diào)用位置
const socketPath = process.platform === 'win32'
? `\\\\?\\pipe\\${pipeName}`
: path.join(os.tmpdir(), pipeName)
const server = net.createServer((socket) => {
//socket一旦連接并返回所啟動的服務的端口,就開始壓測
socket.once('data', (chunk) => {
const port = chunk.toString()
onport(port)
})
})
//監(jiān)聽localsocket
server.listen(socketPath)
server.on('close', () => {
try {
fs.unlinkSync(socketPath)
} catch (err) {}
})
return { socketPath, server }
}
要看明白autocannon調(diào)用預加載模塊的目的,就不得不看一下這個預加載模塊autocannonDetectPort到底寫了什么:
//當app監(jiān)聽了某個端口,給予提醒
const onListen = require('on-net-listen')
const net = require('net')
//獲取IPC的unix socket
const socket = net.connect(process.env.AUTOCANNON_SOCKET)
//將端口信息通過IPC傳遞給autocannon
onListen(function (addr) {
this.destroy()
const port = Buffer.from(addr.port + '')
socket.write(port)
})
//關閉socket
socket.unref()
至此,一切都明朗了:
對于某些需要一定初始化時間的服務,不能在服務剛一啟動就進行壓測,需要等待其初始化完成,開始監(jiān)聽端口時再進行.對于這種情況,就需要--on-port這個運行標志了.
因此'--on-port'標志需要和'--'參數(shù)一起使用,'--'參數(shù)后面跟著需要壓測的服務,將'--on-port'標志設為true后,就可以實現(xiàn)等待初始化完成.
autocannon處理是這樣的:
當'--on-port'標志打開時,autocannon運行一個小的IPC服務,用于接收待測服務初始化完成后傳回來的端口號,
將'autocannonDetectPort'這個檢測端口的預加載模塊(也是一個IPC客戶端)的路徑寫入環(huán)境變量,用于后期調(diào)用.
正式運行待測服務時,通過 -r 參數(shù)進行預加載,將'autocannonDetectPort'嵌入待測服務中.
等待待測服務初始化完畢,開始監(jiān)聽端口,'autocannonDetectPort'將監(jiān)聽的端口發(fā)送回IPC服務端autocannon主程序.
autocannon接收到端口后視為初始化完畢,開始進行壓測.當然,如果已經(jīng)給autocannon傳入了端口,則此次IPC傳回端口只作為一個服務初始化完成的標志,不使用該傳回的端口.
這里的巧妙點就是這個預加載命令了,不用待測服務修改源文件,沒有侵入性.
progressTracker.js
progressTracker是以命令行的方式展示進度的模塊,因此加載了許多的處理顯示效果的庫.
本質(zhì)上是對autocannon的各種事件的監(jiān)聽,并展示相應結果.
lib/run.js
run模塊是整個autocannon的核心,催動著整個程序的運行.
run模塊主要處理了兩件事:
- 初始化所有的客戶端.
- 定時統(tǒng)計/監(jiān)視實時進度.
先說說初始化部分:
run模塊引用了直方圖庫hdr-histogram-js用于記錄處理統(tǒng)計結果.
在autocannon中,發(fā)起請求的根源是client,而壓力的測試也更多來自于client的多少以及持續(xù)的時間/總量.
client本質(zhì)上是eventemitter的繼承,因此,run模塊監(jiān)聽了client上諸如'respone','timeout','error'等事件,對不同事件作出相應.
大多數(shù)響應方法是用于統(tǒng)計計數(shù)的.同時,run模塊本身也繼承了eventemitter,在接受到各種事件時也會轉發(fā)事件.
運行時有一個重要的標志:stop標志.
autocannon有幾種情況下會stop:
正常情況下:
- 請求達到了預定的請求數(shù),stop標志變?yōu)閠rue.
- 求情發(fā)送持續(xù)時長到達預定時長,stop標志變?yōu)閠rue.
非正常情況下: - 錯誤數(shù)+超時數(shù)超過了閾值(如果有的話),stop標志變?yōu)閠rue.
為了不占用資源,并沒有添加一個對stop的監(jiān)聽,也就是是說,stop變?yōu)閠rue時并不會立即停止.
處理stop的位置是在run模塊的另一個部分,定時統(tǒng)計/監(jiān)視進度的函數(shù):
當run模塊初始化時,會啟動一個定時器,每一秒收集一次壓測的各項統(tǒng)計計數(shù).這樣是合理的設計,因為既然是壓測,其短時間內(nèi)反饋相應變化非常大,沒有必要每次有變化就立即相應,即將收集數(shù)據(jù)的方法綁定到各個client的相應事件上.基于性能的要求也不應如此.
在每秒收集數(shù)據(jù)時也檢查一下stop標志是否為true.若為true,則將client盡數(shù)銷毀,各計時器也一并清理,同時激發(fā)'完成'事件,將統(tǒng)計結果通過事件傳遞出去.
lib/httpClient.js
httpClient是發(fā)出請求的主體,若干httpClient給待測服務施加了壓力.
我覺得httpClient是整個程序的精華所在,httpClient并沒有用request或http.request這種封裝好的模塊,它通過一些底層的編寫socket構建請求讓我們了解到一些因為頻繁使用koa,express等現(xiàn)有框架而忽視的細節(jié):
Client.prototype._connect = function () {
//是否是安全傳輸
if (this.secure) {
//IPC模式下使用指定的unix socket來連接
if (this.ipc) {
this.conn = tls.connect(this.opts.socketPath, { rejectUnauthorized: false })
} else {
//建立tlssocket
this.conn = tls.connect(this.opts.port, this.opts.hostname, { rejectUnauthorized: false, servername: this.opts.servername })
}
} else {
if (this.ipc) {
this.conn = net.connect(this.opts.socketPath)
} else {
//建立普通socket
this.conn = net.connect(this.opts.port, this.opts.hostname)
}
}
//遇到錯誤重新建立socket
this.conn.on('error', (error) => {
this.emit('connError', error)
if (!this.destroyed) this._connect()
})
//接受到信息時進行的處理i
this.conn.on('data', (chunk) => {
this.resData[0].bytes += chunk.length
this.parser.execute(chunk)
})
//一次請求結束,接著重復執(zhí)行請求
this.conn.on('end', () => {
if (!this.destroyed) this._connect()
})
this._doRequest(0)
}
同時定義了請求的發(fā)送與socket的銷毀:
Client.prototype._doRequest = function (rpi) {
//是否超過頻率限制
if (!this.rate || (this.rate && this.reqsMadeThisSecond++ < this.rate)) {
if (!this.destroyed && this.responseMax && this.reqsMade >= this.responseMax) {
return this.destroy()
}
this.emit('request')
//記錄高精度的時間,,用來獲取納秒級別的時間差
this.resData[rpi].startTime = process.hrtime()
//發(fā)送請求
this.conn.write(this.requestIterator.move())
//重置超時記錄器
this.timeoutTicker.reschedule(this.timeout)
} else {
this.paused = true
}
}
Client.prototype._destroyConnection = function () {
//清除所有監(jiān)聽
this.conn.removeAllListeners('error')
this.conn.removeAllListeners('end')
this.conn.on('error', () => {})
//銷毀socket
this.conn.destroy()
}
Client.prototype.destroy = function () {
if (!this.destroyed) {
this.destroyed = true
this.timeoutTicker.clear()
if (this.rate) clearInterval(this.rateInterval)
this.emit('done')
this._destroyConnection()
}
}
這樣做的好處自然是更靈活的定制細節(jié).
其中有一個對請求返回處理的細節(jié),在連接時,有這樣一段代碼:
this.conn.on('data', (chunk) => {
this.resData[0].bytes += chunk.length
this.parser.execute(chunk)
})
這里的parser是HTTPParser,是一個內(nèi)置的c/c++模塊,用于解析出respones的請求體,本身是基于事件綁定對不同階段進行不同處理.
在解析過程中,經(jīng)歷了以下事件:
- kOnHeaders:不斷解析獲取的請求頭
- kOnHeadersComplete:請求頭解析完畢
- kOnBody:不斷解析獲取的請求體
- kOnMessageComplete:請求體解析完畢
- kOnExecute:一次解析完畢 ( 無法一次性接收 HTTP 報文的情況 )
在httpClient中,對這些階段進行了綁定:
this.parser = new HTTPParser(HTTPParser.RESPONSE)
this.parser[HTTPParser.kOnHeaders] = () => {}
this.parser[HTTPParser.kOnHeadersComplete] = (opts) => {
this.emit('headers', opts)
this.resData[0].headers = opts
}
//監(jiān)聽了'body'事件就可以獲取到body的數(shù)據(jù)
this.parser[HTTPParser.kOnBody] = (body) => {
this.emit('body', body)
}
this.parser[HTTPParser.kOnMessageComplete] = () => {
//獲取高精度的時間,精確計算一個請求的花費
let end = process.hrtime(this.resData[0].startTime)
let responseTime = end[0] * 1e3 + end[1] / 1e6
//返回此次請求結果
this.emit('response', this.resData[0].headers.statusCode, this.resData[0].bytes, responseTime)
this.resData[0].bytes = 0
if (!this.destroyed && this.reconnectRate && this.reqsMade % this.reconnectRate === 0) {
return this._resetConnection()
}
//一次請求結束后立即再開始一次請求
this._doRequest(0)
}
如果我們用request等現(xiàn)有庫發(fā)送大量請求,是會經(jīng)常出現(xiàn)各種原因導致nodejs崩潰的.理論來說一臺主機一般可以維持65535個連接,而nodejs維持個1萬+的連接也沒問題.但是使用request等框架,很可能發(fā)送上百上千的請求就掛掉了.其中的原因可能是因為各種資源的泄露或占用沒有被釋放.也可能是socket資源沒有充分利用起來,每次都建立新對象,舊的又沒有及時被回收,造成socket資源緊張.
面對這種情況,一般是需要限制發(fā)送頻,保證處理速度,但這對于一個壓測軟件來說,是致命的.
從底層編寫的請求發(fā)送體,不僅可以在精確的節(jié)點獲取精確的信息,還可以控制請求發(fā)送方方面面的細節(jié),盡可能避免資源泄露,能重復使用的就重復使用,保證了穩(wěn)定性.
至此,lib/httpClient模塊發(fā)送請求的流程基本上是這樣的:
lib/requestIterator.js和lib/httpRequestBuilder
請求迭代器存在的意義在于請求體可以隨著請求而變化,這對無緩存壓測很有意義.
requestIterator本身是一個有限循環(huán)數(shù)組的迭代,在給定的請求體內(nèi)數(shù)組不斷的循環(huán),供httpclient發(fā)送請求.
RequestIterator.prototype.move = function () {
//返回當前的請求體
let ret = this.currentRequest.requestBuffer
//將當前請求體的指針指向請求體數(shù)組內(nèi)下一個請求體
this.nextRequest()
//他還有隨機生成id的功能
return this.reqDefaults.idReplacement
? Buffer.from(ret.toString().replace(/\[<id>\]/g, hyperid()))
: ret
}
因為是socket直接發(fā)送請求,所以發(fā)送的請求體需要經(jīng)過httpRequestBuilder,將用戶定義的請求參數(shù)最終處理成Buffer形式.
總結
要說autocannon最有價值學習的地方,我覺得應該是其從底層入手構建的請求模塊.對細節(jié)的把控支持其能短時間發(fā)送大量請求.
autocannon可以控制同時worker的數(shù)量,當前常見的限流/限頻的流程控制方法也有控制worker數(shù)量的功能,其中有何不同呢?
以樸靈大牛的bagpipe為例,bagpipe是比較有名的并行控制模塊,他與autocannon的理念正好相反,是為了限制發(fā)送請求的速度,保證若干個下載結束后再進行下若干個請求.
因此,這兩者的差異就很明確了:他們的目的和要處理的任務不同.
- autocannon注重過程,bagpipe注重結果.如果用bagpipe寫autocannon的流程的話,autocannon就會變成一個資源下載器.
- 從根本上來說,autocannon是若干個worker去干一個任務,而bagpipe則是一個worker干若干個任務.對于有序的,有限的,復雜的任務,多worker形式相比單worker形式并不適宜.而autocannon所面臨的任務,正好是無序,簡單,無限的.