Influxdb的Http請求處理流程

Http請求的處理流程

HTTPDService服務(wù)的添加

  1. 在 Server的啟動過程中會添加并啟動各種service谣殊, 其中就包括這個HTTPDService:appendHTTPDService(c httpd.Config) 定義在 cmd/influxdb/run/server.go
    srv := httpd.NewService(c)
    srv.Handler.MetaClient = s.MetaClient
    srv.Handler.QueryAuthorizer = meta.NewQueryAuthorizer(s.MetaClient)
    srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
    srv.Handler.QueryExecutor = s.QueryExecutor
    srv.Handler.Monitor = s.Monitor
    srv.Handler.PointsWriter = s.PointsWriter
    srv.Handler.Version = s.buildInfo.Version
    srv.Handler.BuildType = "OSS"
    ss := storage.NewStore(s.TSDBStore, s.MetaClient)
    srv.Handler.Store = ss
    srv.Handler.Controller = control.NewController(ss, s.Logger)

    s.Services = append(s.Services, srv)
  1. 從上面的代碼可以看出屿聋,主要是初始化這個Handler, 這個Handler類負(fù)責(zé)處理具體的Http Request,生成相應(yīng)的Response;

HTTPDService分析

  1. Httpd Service的具體實現(xiàn)在 services/httpd目錄下
  2. 這個http服務(wù)使用golang提供的net/http包實現(xiàn)
  3. 流程解析:
    3.1 創(chuàng)建Service:
    func NewService(c Config) *Service {
    s := &Service{
        addr:           c.BindAddress, //http服務(wù)監(jiān)控的地址狂巢,端口
        https:          c.HTTPSEnabled,
        cert:           c.HTTPSCertificate,
        key:            c.HTTPSPrivateKey,
        limit:          c.MaxConnectionLimit,
        tlsConfig:      c.TLS,
        err:            make(chan error),
        unixSocket:     c.UnixSocketEnabled,
        unixSocketPerm: uint32(c.UnixSocketPermissions),
        bindSocket:     c.BindSocket,
        Handler:        NewHandler(c),  // 創(chuàng)建Handler
        Logger:         zap.NewNop(),
    }
    if s.tlsConfig == nil {
        s.tlsConfig = new(tls.Config)
    }

3.2 啟動Service:

func (s *Service) Open() error {
    s.Handler.Open() // Handler必要的初始化底燎,主要是日志文件的設(shè)置

    // Open listener.
    if s.https {
         ...
        //tls listener支持
        s.ln = listener
    } else {
        ...
        listener, err := net.Listen("tcp", s.addr)
        s.ln = listener
    }

    // Open unix socket listener.
    if s.unixSocket {
        ...
        s.unixSocketListener = listener
        go s.serveUnixSocket()
    }

    // Enforce a connection limit if one has been given.
    // 使用這個LimitListener冶忱,同時僅能接收s.limit個連接卜朗,超過的connect則自動被close掉
    if s.limit > 0 {
        s.ln = LimitListener(s.ln, s.limit)
    }

    ...

    // Begin listening for requests in a separate goroutine.
    go s.serveTCP()
    return nil
}

3.3 關(guān)鍵函數(shù)之NewHandler():

h := &Handler{
        mux:            pat.New(),
        Config:         &c,
        Logger:         zap.NewNop(),
        CLFLogger:      log.New(os.Stderr, "[httpd] ", 0),
        stats:          &Statistics{},
        requestTracker: NewRequestTracker(),
    }

    // Limit the number of concurrent & enqueued write requests.
    h.writeThrottler = NewThrottler(c.MaxConcurrentWriteLimit, c.MaxEnqueuedWriteLimit)
    h.writeThrottler.EnqueueTimeout = c.EnqueuedWriteTimeout

    h.AddRoutes([]Route{
    ...
    //添加各個不同url的路由信息
    }
    
    h.AddRoutes(fluxRoute)

3.4 關(guān)鍵函數(shù)之s.serverTCP()熬的,使用之前初始化的listener和handler啟動真正的http服務(wù)

    err := http.Serve(listener, s.Handler)
    if err != nil && !strings.Contains(err.Error(), "closed") {
        s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
    }

連接數(shù)限制

  1. 使用 LimitListener實現(xiàn),在原始的Listener外包了一層還實現(xiàn)這個限制功能
  2. LimitListener定義: 從下面的代碼可以看出創(chuàng)建了一個帶緩沖區(qū)的chan, 其緩沖區(qū)大小為要限制的連接數(shù)的大小
type limitListener struct {
    net.Listener
    sem chan struct{}
}
func LimitListener(l net.Listener, n int) net.Listener {
    return &limitListener{Listener: l, sem: make(chan struct{}, n)}
}
  1. 接收連接:
func (l *limitListener) Accept() (net.Conn, error) {
    for {
        c, err := l.Listener.Accept()
        if err != nil {
            return nil, err
        }
        
        // 如果接收的連接數(shù)達(dá)到sem chan緩沖區(qū)的大小拌喉,下面這個select將進(jìn)入default分支速那,立即close掉當(dāng)前連接
        // 否則返回封裝后的limitListenerConn, 它在close時調(diào)用l.release, 讀取sem chan中數(shù)據(jù),釋放緩沖區(qū)空間
        select {
        case l.sem <- struct{}{}:
            return &limitListenerConn{Conn: c, release: l.release}, nil
        default:
            c.Close()
        }
    }
}

Query請求的處理流程

  1. 主要實現(xiàn)在 func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User)

  2. 調(diào)整 ResponseWriter: 根據(jù)請求中的Accept頭尿背,來使用不同的ResponseWriter, 作用是設(shè)置Http Reponse中對應(yīng)的Content-Type和格式化Body部分,目前支持三種類型:text/csv端仰,application/jsonapplication/x-msgpack田藐, 具體實現(xiàn)可在 services/httpd/response_writer.go

  3. 解析http request: 包括 uri和body部分, 最后生成 influxql.QueryExecutionOptions
    3.1 生成 influxql.Query: 通常在request uri中的q=是query語句荔烧,比如:select * from m1, 會經(jīng)過influxql.NewParserp.ParseQuery()的處理
    3.2 生成ExecutionOptions:

opts := query.ExecutionOptions{
Database:        db,
RetentionPolicy: r.FormValue("rp"),
ChunkSize:       chunkSize,
ReadOnly:        r.Method == "GET",
NodeID:          nodeID,
}
  1. 設(shè)置closing chan, 當(dāng)當(dāng)前的http連接斷開時,close掉這個closing chan, 即通過當(dāng)前正在處理的query請求坞淮,作相應(yīng)的處理
var closing chan struct{}
    if !async {
        closing = make(chan struct{})
        if notifier, ok := w.(http.CloseNotifier); ok {
            // CloseNotify() is not guaranteed to send a notification when the query
            // is closed. Use this channel to signal that the query is finished to
            // prevent lingering goroutines that may be stuck.
            done := make(chan struct{})
            defer close(done)

            notify := notifier.CloseNotify()
            go func() {
                // Wait for either the request to finish
                // or for the client to disconnect
                select {
                case <-done:
                case <-notify:
                    close(closing)
                }
            }()
            opts.AbortCh = done
        } else {
            defer close(closing)
        }
    }
  1. 執(zhí)行具體的query操作: results := h.QueryExecutor.ExecuteQuery(q, opts, closing), 返回results是個chan, 所有的query結(jié)果都從這個chan循環(huán)讀取出來;
  2. 非chunked方式的Response的合成:所有結(jié)果合部緩存在內(nèi)存中茴晋,從上面5中的chan循環(huán)讀取出來result, 先作h.Config.MaxRowLimit返回行數(shù)的限制檢查,再作merge,為了相同Series的數(shù)據(jù)連續(xù)存放和節(jié)省內(nèi)存占用.
        l := len(resp.Results)
        if l == 0 {
            resp.Results = append(resp.Results, r)
        } else if resp.Results[l-1].StatementID == r.StatementID { //相同StatemnetID的result是連續(xù)返回的回窘,中間沒有間隔
            if r.Err != nil {
                resp.Results[l-1] = r
                continue
            }

            cr := resp.Results[l-1]
            rowsMerged := 0
            if len(cr.Series) > 0 {
                lastSeries := cr.Series[len(cr.Series)-1]

                for _, row := range r.Series {
                    if !lastSeries.SameSeries(row) { //相同Series的row是連續(xù)返回的诺擅,中間沒有間隔
                        // Next row is for a different series than last.
                        break
                    }
                    // Values are for the same series, so append them.
                    lastSeries.Values = append(lastSeries.Values, row.Values...)
                    rowsMerged++
                }
            }

            // Append remaining rows as new rows.
            r.Series = r.Series[rowsMerged:]
            cr.Series = append(cr.Series, r.Series...)
            cr.Messages = append(cr.Messages, r.Messages...)
            cr.Partial = r.Partial
        } else {
            resp.Results = append(resp.Results, r)
        }
  1. chunked方式的Response: 從上面5中的chan循環(huán)讀取出來result, 每條result立即返回到client:
// Write out result immediately if chunked.
        if chunked {
            n, _ := rw.WriteResponse(Response{
                Results: []*query.Result{r},
            })
            atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
            w.(http.Flusher).Flush()
            continue
        }
  1. async請求處理: 簡單講就是不返回任何的查詢結(jié)果,也就是不支持,返回的http code是StatusNoContent
if async {
        go h.async(q, results)
        h.writeHeader(w, http.StatusNoContent)
        return
    }

Write請求的處理流程

  1. 寫入的line protocol例子:insert test_mea_1,tag1=v1,tag2=v2 cpu=1,memory=10啡直,對應(yīng)到http request:
    1.1 uri部分: /write?consistency=all&db=my_test_db_2&precision=ns&rp=
    1.2 body部分: test_mea_1,tag1=v1,tag2=v2 cpu=1,memory=10\n
  2. 實現(xiàn)在 func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User)中;
    2.1 解析uri和body部分:
    database := r.URL.Query().Get("db")
    ...
    if h.Config.MaxBodySize > 0 { //限制body讀取的大小
        body = truncateReader(body, int64(h.Config.MaxBodySize))
    }
    if r.Header.Get("Content-Encoding") == "gzip" {
       //body解壓縮
    }
    ...
    _, err := buf.ReadFrom(body) //讀取body部分
    ...
    //解析 point
    points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
    
    //決定多復(fù)本情況下的寫入一致性策略
    level := r.URL.Query().Get("consistency")
    ...
    // 寫入point
    h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err)

    // 失敗的話返回client返回信息
    h.httpError(..)
    
    // 成功時返回
    h.writeHeader(w, http.StatusNoContent)

其他Http request請求的處理不一一詳述

補(bǔ)充一下Influxdb中的Handler.AddRoute的實現(xiàn)

  1. 其作用就是添加http uri的路由信息烁涌,將相應(yīng)的uri與具體的handler函數(shù)對應(yīng)起來;
  2. Route的定義
 type Route struct {
    Name           string
    Method         string
    Pattern        string
    Gzipped        bool
    LoggingEnabled bool
    HandlerFunc    interface{}
}

  //query請求對應(yīng)的Route
   Route{
            "query", // Query serving route.
            "POST", "/query", true, true, h.serveQuery,
        }
        
    //寫請求對應(yīng)的Route
    Route{
        "write", // Data-ingest route.
        "POST", "/write", true, writeLogEnabled, h.serveWrite,
    }
  1. Influxdb使用了golang提供的net/http包來實現(xiàn)它的http服務(wù),具體的http請求都會對應(yīng)到相應(yīng)的http.Handler, 而http.Handler又使用了http.HandlerFunc來產(chǎn)生酒觅,參見:HandlerFunc, 這個AddRout就利用了HandlerFunc將handler層層包裝撮执,添加各種功能;
  2. 我們來剖析一下AddRoute的處理流程
    4.1 處理框架
// 針于每個route分別處理
for _, r := range routes {
        //利用route的定義和當(dāng)前influxdb的config來包裝生成handler
        var handler http.Handler
        ... //對handler進(jìn)行層層包裝
        //將route和handler添加到mux, 這里這個使用了第三方的模式復(fù)用器: https://github.com/bmizerany/pat
        h.mux.Add(r.Method, r.Pattern, handler)
}

4.2 添加驗證處理`handler = authenticate(hf, h, h.Config.AuthEnabled)

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // influxdb的config里沒有啟動驗證,走下面的邏輯
        if !requireAuthentication {
            inner(w, r, nil)
            return
        }
        
        // 驗證通過會生成這個 meta.User舷丹,傳過最終的請求處理函數(shù)抒钱,作授權(quán)驗證
        var user meta.User

        // TODO corylanou: never allow this in the future without users
        if requireAuthentication && h.MetaClient.AdminUserExists() {
            creds, err := parseCredentials(r)
            if err != nil {
                atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
                h.httpError(w, err.Error(), http.StatusUnauthorized)
                return
            }

            // http 驗證支持兩種,User和jwt Bearer驗證颜凯,這都有對應(yīng)的rfc,具體內(nèi)容不展開了
            // 其中user驗證又包括 basic auth和uri中自帶username和password兩種方式
            // 如果驗證不通過谋币,就直接返回給客戶端 h.httpError(w, "xxxx", http.StatusUnauthorized)
            switch creds.Method {
            case UserAuthentication:
                ...
            case BearerAuthentication:
                ...
            default:
                h.httpError(w, "unsupported authentication", http.StatusUnauthorized)
            }

        }
        
        // 調(diào)用最終的請求處理函數(shù)
        inner(w, r, user)
    })

4.3 handler = cors(handler) : 給response添加cors headers
4.4 handler = requestID(handler) : 給response添加request id
4.5 handler = h.recovery(handler, r.Name) : 在處理請求過程中捕獲panic

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市症概,隨后出現(xiàn)的幾起案子蕾额,更是在濱河造成了極大的恐慌,老刑警劉巖彼城,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诅蝶,死亡現(xiàn)場離奇詭異退个,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)调炬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進(jìn)店門语盈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人筐眷,你說我怎么就攤上這事黎烈∠澳” “怎么了匀谣?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長资溃。 經(jīng)常有香客問我武翎,道長,這世上最難降的妖魔是什么溶锭? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任宝恶,我火速辦了婚禮,結(jié)果婚禮上趴捅,老公的妹妹穿的比我還像新娘垫毙。我一直安慰自己,他們只是感情好拱绑,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布综芥。 她就那樣靜靜地躺著,像睡著了一般猎拨。 火紅的嫁衣襯著肌膚如雪膀藐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天红省,我揣著相機(jī)與錄音额各,去河邊找鬼。 笑死吧恃,一個胖子當(dāng)著我的面吹牛虾啦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播痕寓,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼傲醉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了厂抽?” 一聲冷哼從身側(cè)響起需频,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎筷凤,沒想到半個月后昭殉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體苞七,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年挪丢,在試婚紗的時候發(fā)現(xiàn)自己被綠了蹂风。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡乾蓬,死狀恐怖惠啄,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情任内,我是刑警寧澤撵渡,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站死嗦,受9級特大地震影響趋距,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜越除,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一节腐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧摘盆,春花似錦翼雀、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至肋殴,卻和暖如春囤锉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背护锤。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工官地, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人烙懦。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓驱入,卻偏偏與公主長得像,于是被迫代替她去往敵國和親氯析。 傳聞我的和親對象是個殘疾皇子亏较,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容