Http請求的處理流程
HTTPDService服務(wù)的添加
- 在 Server的啟動過程中會添加并啟動各種service谣殊, 其中就包括這個HTTPDService:
appendHTTPDService(c httpd.Config)
定義在cmd/influxdb/run/server.go
中
srv := httpd.NewService(c)
srv.Handler.MetaClient = s.MetaClient
srv.Handler.QueryAuthorizer = meta.NewQueryAuthorizer(s.MetaClient)
srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
srv.Handler.QueryExecutor = s.QueryExecutor
srv.Handler.Monitor = s.Monitor
srv.Handler.PointsWriter = s.PointsWriter
srv.Handler.Version = s.buildInfo.Version
srv.Handler.BuildType = "OSS"
ss := storage.NewStore(s.TSDBStore, s.MetaClient)
srv.Handler.Store = ss
srv.Handler.Controller = control.NewController(ss, s.Logger)
s.Services = append(s.Services, srv)
- 從上面的代碼可以看出屿聋,主要是初始化這個
Handler
, 這個Handler類負(fù)責(zé)處理具體的Http Request,生成相應(yīng)的Response;
HTTPDService分析
- Httpd Service的具體實現(xiàn)在
services/httpd
目錄下 - 這個http服務(wù)使用golang提供的
net/http
包實現(xiàn) - 流程解析:
3.1 創(chuàng)建Service:
func NewService(c Config) *Service {
s := &Service{
addr: c.BindAddress, //http服務(wù)監(jiān)控的地址狂巢,端口
https: c.HTTPSEnabled,
cert: c.HTTPSCertificate,
key: c.HTTPSPrivateKey,
limit: c.MaxConnectionLimit,
tlsConfig: c.TLS,
err: make(chan error),
unixSocket: c.UnixSocketEnabled,
unixSocketPerm: uint32(c.UnixSocketPermissions),
bindSocket: c.BindSocket,
Handler: NewHandler(c), // 創(chuàng)建Handler
Logger: zap.NewNop(),
}
if s.tlsConfig == nil {
s.tlsConfig = new(tls.Config)
}
3.2 啟動Service:
func (s *Service) Open() error {
s.Handler.Open() // Handler必要的初始化底燎,主要是日志文件的設(shè)置
// Open listener.
if s.https {
...
//tls listener支持
s.ln = listener
} else {
...
listener, err := net.Listen("tcp", s.addr)
s.ln = listener
}
// Open unix socket listener.
if s.unixSocket {
...
s.unixSocketListener = listener
go s.serveUnixSocket()
}
// Enforce a connection limit if one has been given.
// 使用這個LimitListener冶忱,同時僅能接收s.limit個連接卜朗,超過的connect則自動被close掉
if s.limit > 0 {
s.ln = LimitListener(s.ln, s.limit)
}
...
// Begin listening for requests in a separate goroutine.
go s.serveTCP()
return nil
}
3.3 關(guān)鍵函數(shù)之NewHandler()
:
h := &Handler{
mux: pat.New(),
Config: &c,
Logger: zap.NewNop(),
CLFLogger: log.New(os.Stderr, "[httpd] ", 0),
stats: &Statistics{},
requestTracker: NewRequestTracker(),
}
// Limit the number of concurrent & enqueued write requests.
h.writeThrottler = NewThrottler(c.MaxConcurrentWriteLimit, c.MaxEnqueuedWriteLimit)
h.writeThrottler.EnqueueTimeout = c.EnqueuedWriteTimeout
h.AddRoutes([]Route{
...
//添加各個不同url的路由信息
}
h.AddRoutes(fluxRoute)
3.4 關(guān)鍵函數(shù)之s.serverTCP()
熬的,使用之前初始化的listener和handler啟動真正的http服務(wù)
err := http.Serve(listener, s.Handler)
if err != nil && !strings.Contains(err.Error(), "closed") {
s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
}
連接數(shù)限制
- 使用
LimitListener
實現(xiàn),在原始的Listener
外包了一層還實現(xiàn)這個限制功能 -
LimitListener
定義: 從下面的代碼可以看出創(chuàng)建了一個帶緩沖區(qū)的chan, 其緩沖區(qū)大小為要限制的連接數(shù)的大小
type limitListener struct {
net.Listener
sem chan struct{}
}
func LimitListener(l net.Listener, n int) net.Listener {
return &limitListener{Listener: l, sem: make(chan struct{}, n)}
}
- 接收連接:
func (l *limitListener) Accept() (net.Conn, error) {
for {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
// 如果接收的連接數(shù)達(dá)到sem chan緩沖區(qū)的大小拌喉,下面這個select將進(jìn)入default分支速那,立即close掉當(dāng)前連接
// 否則返回封裝后的limitListenerConn, 它在close時調(diào)用l.release, 讀取sem chan中數(shù)據(jù),釋放緩沖區(qū)空間
select {
case l.sem <- struct{}{}:
return &limitListenerConn{Conn: c, release: l.release}, nil
default:
c.Close()
}
}
}
Query請求的處理流程
主要實現(xiàn)在
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User)
調(diào)整 ResponseWriter: 根據(jù)請求中的
Accept
頭尿背,來使用不同的ResponseWriter, 作用是設(shè)置Http Reponse中對應(yīng)的Content-Type
和格式化Body
部分,目前支持三種類型:text/csv
端仰,application/json
,application/x-msgpack
田藐, 具體實現(xiàn)可在services/httpd/response_writer.go
中解析http request: 包括 uri和body部分, 最后生成
influxql.Query
和ExecutionOptions
3.1 生成 influxql.Query: 通常在request uri中的q=
是query語句荔烧,比如:select * from m1, 會經(jīng)過influxql.NewParser
和p.ParseQuery()
的處理
3.2 生成ExecutionOptions:
opts := query.ExecutionOptions{
Database: db,
RetentionPolicy: r.FormValue("rp"),
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
}
- 設(shè)置closing chan, 當(dāng)當(dāng)前的http連接斷開時,close掉這個closing chan, 即通過當(dāng)前正在處理的query請求坞淮,作相應(yīng)的處理
var closing chan struct{}
if !async {
closing = make(chan struct{})
if notifier, ok := w.(http.CloseNotifier); ok {
// CloseNotify() is not guaranteed to send a notification when the query
// is closed. Use this channel to signal that the query is finished to
// prevent lingering goroutines that may be stuck.
done := make(chan struct{})
defer close(done)
notify := notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-done:
case <-notify:
close(closing)
}
}()
opts.AbortCh = done
} else {
defer close(closing)
}
}
-
執(zhí)行具體的query操作:
results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
, 返回results
是個chan, 所有的query結(jié)果都從這個chan循環(huán)讀取出來; -
非chunked方式的Response的合成:所有結(jié)果合部緩存在內(nèi)存中茴晋,從上面5中的chan循環(huán)讀取出來result, 先作
h.Config.MaxRowLimit
返回行數(shù)的限制檢查,再作merge,為了相同Series的數(shù)據(jù)連續(xù)存放和節(jié)省內(nèi)存占用.
l := len(resp.Results)
if l == 0 {
resp.Results = append(resp.Results, r)
} else if resp.Results[l-1].StatementID == r.StatementID { //相同StatemnetID的result是連續(xù)返回的回窘,中間沒有間隔
if r.Err != nil {
resp.Results[l-1] = r
continue
}
cr := resp.Results[l-1]
rowsMerged := 0
if len(cr.Series) > 0 {
lastSeries := cr.Series[len(cr.Series)-1]
for _, row := range r.Series {
if !lastSeries.SameSeries(row) { //相同Series的row是連續(xù)返回的诺擅,中間沒有間隔
// Next row is for a different series than last.
break
}
// Values are for the same series, so append them.
lastSeries.Values = append(lastSeries.Values, row.Values...)
rowsMerged++
}
}
// Append remaining rows as new rows.
r.Series = r.Series[rowsMerged:]
cr.Series = append(cr.Series, r.Series...)
cr.Messages = append(cr.Messages, r.Messages...)
cr.Partial = r.Partial
} else {
resp.Results = append(resp.Results, r)
}
- chunked方式的Response: 從上面5中的chan循環(huán)讀取出來result, 每條result立即返回到client:
// Write out result immediately if chunked.
if chunked {
n, _ := rw.WriteResponse(Response{
Results: []*query.Result{r},
})
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
w.(http.Flusher).Flush()
continue
}
-
async請求處理: 簡單講就是不返回任何的查詢結(jié)果,也就是不支持,返回的http code是
StatusNoContent
if async {
go h.async(q, results)
h.writeHeader(w, http.StatusNoContent)
return
}
Write請求的處理流程
- 寫入的line protocol例子:
insert test_mea_1,tag1=v1,tag2=v2 cpu=1,memory=10
啡直,對應(yīng)到http request:
1.1 uri部分:/write?consistency=all&db=my_test_db_2&precision=ns&rp=
1.2 body部分:test_mea_1,tag1=v1,tag2=v2 cpu=1,memory=10\n
- 實現(xiàn)在
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User)
中;
2.1 解析uri和body部分:
database := r.URL.Query().Get("db")
...
if h.Config.MaxBodySize > 0 { //限制body讀取的大小
body = truncateReader(body, int64(h.Config.MaxBodySize))
}
if r.Header.Get("Content-Encoding") == "gzip" {
//body解壓縮
}
...
_, err := buf.ReadFrom(body) //讀取body部分
...
//解析 point
points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
//決定多復(fù)本情況下的寫入一致性策略
level := r.URL.Query().Get("consistency")
...
// 寫入point
h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err)
// 失敗的話返回client返回信息
h.httpError(..)
// 成功時返回
h.writeHeader(w, http.StatusNoContent)
其他Http request請求的處理不一一詳述
補(bǔ)充一下Influxdb中的Handler.AddRoute
的實現(xiàn)
- 其作用就是添加http uri的路由信息烁涌,將相應(yīng)的uri與具體的handler函數(shù)對應(yīng)起來;
-
Route
的定義
type Route struct {
Name string
Method string
Pattern string
Gzipped bool
LoggingEnabled bool
HandlerFunc interface{}
}
//query請求對應(yīng)的Route
Route{
"query", // Query serving route.
"POST", "/query", true, true, h.serveQuery,
}
//寫請求對應(yīng)的Route
Route{
"write", // Data-ingest route.
"POST", "/write", true, writeLogEnabled, h.serveWrite,
}
- Influxdb使用了golang提供的
net/http
包來實現(xiàn)它的http服務(wù),具體的http請求都會對應(yīng)到相應(yīng)的http.Handler
, 而http.Handler
又使用了http.HandlerFunc
來產(chǎn)生酒觅,參見:HandlerFunc, 這個AddRout
就利用了HandlerFunc
將handler層層包裝撮执,添加各種功能; - 我們來剖析一下
AddRoute
的處理流程
4.1 處理框架
// 針于每個route分別處理
for _, r := range routes {
//利用route的定義和當(dāng)前influxdb的config來包裝生成handler
var handler http.Handler
... //對handler進(jìn)行層層包裝
//將route和handler添加到mux, 這里這個使用了第三方的模式復(fù)用器: https://github.com/bmizerany/pat
h.mux.Add(r.Method, r.Pattern, handler)
}
4.2 添加驗證處理`handler = authenticate(hf, h, h.Config.AuthEnabled)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// influxdb的config里沒有啟動驗證,走下面的邏輯
if !requireAuthentication {
inner(w, r, nil)
return
}
// 驗證通過會生成這個 meta.User舷丹,傳過最終的請求處理函數(shù)抒钱,作授權(quán)驗證
var user meta.User
// TODO corylanou: never allow this in the future without users
if requireAuthentication && h.MetaClient.AdminUserExists() {
creds, err := parseCredentials(r)
if err != nil {
atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
h.httpError(w, err.Error(), http.StatusUnauthorized)
return
}
// http 驗證支持兩種,User和jwt Bearer驗證颜凯,這都有對應(yīng)的rfc,具體內(nèi)容不展開了
// 其中user驗證又包括 basic auth和uri中自帶username和password兩種方式
// 如果驗證不通過谋币,就直接返回給客戶端 h.httpError(w, "xxxx", http.StatusUnauthorized)
switch creds.Method {
case UserAuthentication:
...
case BearerAuthentication:
...
default:
h.httpError(w, "unsupported authentication", http.StatusUnauthorized)
}
}
// 調(diào)用最終的請求處理函數(shù)
inner(w, r, user)
})
4.3 handler = cors(handler)
: 給response添加cors headers
4.4 handler = requestID(handler)
: 給response添加request id
4.5 handler = h.recovery(handler, r.Name)
: 在處理請求過程中捕獲panic