由于要給團(tuán)隊(duì)做一下關(guān)于 Flomesh 的分享超升,準(zhǔn)備下材料娶牌。
“分享是最好的學(xué)習(xí)方法记劈∽婺铮”
上一回初探可編程網(wǎng)關(guān) Pipy,領(lǐng)略了 Pipy 的“風(fēng)騷”。從 Pipy 的 GUI 交互深入了解了 Pipy 的配置加載流程。
今天看一下 Pipy 如何實(shí)現(xiàn) Metrics 的功能拘领,順便看下數(shù)據(jù)如何在多個(gè) Pipeline 中進(jìn)行流轉(zhuǎn)。
前置
首先樱调,需要對 Pipy 有一定的了解院究,如果不了解看一下上一篇文章洽瞬。
其次構(gòu)建好 Pipy 環(huán)境,關(guān)于構(gòu)建還是去看上一篇文章业汰。
Metrics 功能實(shí)現(xiàn)
至于 Pipy 實(shí)現(xiàn) Metrics 的方式,源碼中就有菩颖,位于 test/006-metrics/pipy.js
样漆。
- 代理監(jiān)聽
6080
端口,后端服務(wù)在8080
端口晦闰,Metrics 在9090
端口 - 共有 5 個(gè) Pipeline:3 個(gè) listen 類型放祟,2 個(gè) Pipeline 類型
- 7 種過濾器:
fork
、connect
呻右、decodeHttpRequest
跪妥、onMessageStart
、decodeHttpResponse
声滥、encodeHttpRespnse
眉撵、replaceMessage
貼一下源碼:
pipy({
_metrics: {
count: 0,
},
_statuses: {},
_latencies: [
1,2,5,7,10,15,20,25,30,40,50,60,70,80,90,100,
200,300,400,500,1000,2000,5000,10000,30000,60000,
Number.POSITIVE_INFINITY
],
_buckets: [],
_timestamp: 0,
})
.listen(6080)
.fork('in')
.connect('127.0.0.1:8080')
.fork('out')
// Extract request info
.pipeline('in')
.decodeHttpRequest()
.onMessageStart(
() => (
_timestamp = Date.now(),
_metrics.count++
)
)
// Extract response info
.pipeline('out')
.decodeHttpResponse()
.onMessageStart(
e => (
((status, latency, i) => (
status = e.head.status,
latency = Date.now() - _timestamp,
i = _latencies.findIndex(t => latency <= t),
_buckets[i]++,
_statuses[status] = (_statuses[status]|0) + 1
))()
)
)
// Expose as Prometheus metrics
.listen(9090)
.decodeHttpRequest()
.replaceMessage(
() => (
(sum => new Message(
[
`count ${_metrics.count}`,
...Object.entries(_statuses).map(
([k, v]) => `status{code="${k}"} ${v}`
),
..._buckets.map((n, i) => `bucket{le="${_latencies[i]}"} ${sum += n}`)
]
.join('\n')
))(0)
)
)
.encodeHttpResponse()
// Mock service on port 8080
.listen(8080)
.decodeHttpRequest()
.replaceMessage(
new Message('Hello!\n')
)
.encodeHttpResponse()
測試
使用 ab 做請求模擬 ab -n 2000 -c 10 http://localhost:6080/
,然后檢查下記錄的指標(biāo)落塑。
$ http :9090 --body
count 2000
status{code="200"} 2000
bucket{le="1"} 1762
bucket{le="2"} 1989
bucket{le="5"} 1994
bucket{le="7"} 1999
bucket{le="10"} 2000
分析
TL;DR:本次示例的核心是 fork
纽疟,從字面意思就很容易理解:新開一個(gè)處理分支(Pipeline),與主線并行執(zhí)行憾赁。
在 src/inbound.cpp:104 109
處污朽,Pipy 接收一個(gè)新的連接奈附。
創(chuàng)建 Context
和 Session
振峻,并在 L178 處注冊事件的處理器稠诲,然后在 L187 處開始接收數(shù)據(jù)巾兆。
在 #receive
方法中痒筒,定義了數(shù)據(jù)接收處理器:將讀到的數(shù)據(jù)寫入 buffer
中惨篱。這個(gè) buffer
存儲(chǔ)的是 Event
類型數(shù)據(jù)栓拜。(所以說 Pipy 是基于數(shù)據(jù)流事件挥萌,將一些封裝成了事件)
接著調(diào)用 Session#input
柬赐。
實(shí)際上調(diào)用的是 ReusableSession#input
亡问,調(diào)用 m_filters
的 #process
方法。m_filters
實(shí)際上是 Filter
類型肛宋。
為什么只有一個(gè) Filter
州藕?重點(diǎn)來了,看下 ReusableSession
的構(gòu)造過程就能明白了(這里用了個(gè)反向迭代器)酝陈。output
是當(dāng)前 Filter
處理完要執(zhí)行的床玻,實(shí)現(xiàn)類似鏈?zhǔn)降膱?zhí)行。
再回頭看上面的示例沉帮,可以想象 fork
就是 Session
的 m_filters
锈死。
src/filters/fork.cpp:85
贫堰,在 fork
過濾器中,在 1 處從 module
中獲取到目標(biāo) Pipeline
待牵,并在 3 和 4 處 創(chuàng)建了新的 Session
并保存原 Session
的數(shù)據(jù)其屏。
然后在 5 處將原 Event
輸入到新的 Session
中,觸發(fā)目標(biāo) Pipeline
的 Filter
鏈缨该。值得注意的是偎行,這里是基于事件的處理,并不是阻塞的贰拿。這就意味著蛤袒,fork
的目標(biāo) pipline
,與 fork
所在的 pipeline 是并行執(zhí)行的膨更。 在示例中妙真,就是 Pipeline
‘in’ 與 主 Pipeline
的 connect
是并行執(zhí)行的。
最終在 6 處荚守,繼續(xù)使用原 Session
的 Filter
鏈珍德。