在前面的《通過項目學習Go語言之...》系列文章中甘苍,我們對Go語言開發(fā)環(huán)境的配置以及開發(fā)項目最基礎的功能之go mod系宫、log做了一些入門的講解靶壮。同時从诲,也對項目gatekeeper中使用的核心組件gin做了一個稍微詳細的說明种蝶。
本篇契耿,將通過分析gatekeeper項目結構,理清通過代理到轉發(fā)到后端realserver以及將代理結果最終響應請求的全流程螃征。
下面先看一張getekeeper的http網(wǎng)關代理的請求響應全流程圖:
接下來搪桂,我們對整個服務的啟動和重要的流程節(jié)點進行源碼的分析。
啟動
一般服務啟動過程中主要是進行初始化盯滚、回調鉤子和監(jiān)聽注冊等基礎性工作踢械。gatekeeper也不例外,再啟動過程中魄藕,進行了配置加載内列、數(shù)據(jù)庫訪問初始化、http和tcp檢測注冊背率、系統(tǒng)信號監(jiān)聽等一系列工作话瞧。
//main.go
func main() {
conf = flag.String("config", "./conf/dev/", "input config file like ./conf/dev/")
flag.Parse()
//初始化mysql redis配置
lib.InitModule(*conf,[]string{"base","mysql","redis",})
defer lib.Destroy()
public.InitMysql()
public.InitConf()
//初始化配置、配置管理設置實時監(jiān)控配置變化并刷新
service.SysConfMgr = service.NewSysConfigManage()
service.SysConfMgr.InitConfig()
service.SysConfMgr.MonitorConfig()
//注冊請求前驗證request方法寝姿,進行請求處理前的驗證交排,只允許授權方訪問
//目前支持的是固定的配置模式,可以根據(jù)需求進行自定義
service.RegisterBeforeRequestAuthFunc(service.AuthAppToken)
//注冊請求后更改response方法
service.RegisterModifyResponseFunc(service.FilterCityData([]string{"/gatekeeper/tester_filter/goods_list"}))
//啟動http饵筑、tcp監(jiān)聽
router.HTTPServerRun()
router.TCPServerRun()
//注冊系統(tǒng)信號監(jiān)聽
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
//注冊服務優(yōu)雅關閉
router.TCPServerStop()
router.HTTPServerStop()
signal.Stop(quit)
}
Http服務監(jiān)聽是處理來自外部的全部請求埃篓,下面看一下HTTPServerRun,getekeeper的http服務是基于gin來實現(xiàn)的根资。
//httpserver.go
//HTTPServerRun 服務啟動
func HTTPServerRun() {
//設置運行模式 debug
gin.SetMode(lib.ConfBase.DebugMode)
//初始化路由
r := InitRouter()
//設置http server運行參數(shù)
HTTPSrvHandler = &http.Server{
Addr: lib.GetStringConf("base.http.addr"),
Handler: r,
ReadTimeout: time.Duration(lib.GetIntConf("base.http.read_timeout")) * time.Second,
WriteTimeout: time.Duration(lib.GetIntConf("base.http.write_timeout")) * time.Second,
MaxHeaderBytes: 1 << uint(lib.GetIntConf("base.http.max_header_bytes")),
}
//設置recover 以實現(xiàn)遇到panic時服務不中斷
//啟動http服務
go func() {
defer func() {
if err := recover(); err != nil {
public.SysLogger.Error("HttpServerRun_recover:%v", err)
}
}()
log.Printf(" [INFO] HttpServer %s listening\n",lib.GetStringConf("base.http.addr"))
if err := HTTPSrvHandler.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf(" [ERROR] HttpServer %s err:%v\n", lib.GetStringConf("base.http.addr"), err)
}
}()
}
路由注冊
在上面啟動http服務的代碼中架专,調用了InitRouter()對相關的路由進行的設置,分別注冊了四組路由:admin(管理后臺)嫂冻、gateway(服務探活ping)胶征、cluster(集群管理,reload處理配置變更刷新)桨仿、gatekeeper(網(wǎng)關代理服務)睛低;另外還啟動了一個靜態(tài)服務器/assets,用于管理后臺資源的服務器。下面我們看一下詳細的代碼:
//httprouter.go
//InitRouter 聲明http路由
func InitRouter() *gin.Engine {
//創(chuàng)建路由r(engine)钱雷,注冊中間件
router := gin.New()
//加載gin內置中間件骂铁,recovery所有panic是服務長久運行不中斷
router.Use(middleware.Recovery())
//admin
admin := router.Group("/admin")
admin.Use(middleware.RequestTraceLog())
{
controller.AdminRegister(admin)
}
//assets
router.Static("/assets", "./tmpl/green/assets")
//gateway
//提供探活
gateway := controller.Gateway{}
router.GET("/ping", gateway.Ping)
//cluster
//提供服務配置刷新
csr:=router.Group("/")
csr.Use(middleware.ClusterAuth())
csr.GET("/reload", gateway.Reload)
//注冊gatekeeper路由組,代理所有網(wǎng)關請求
gw:=router.Group(lib.GetStringConf("base.http.route_prefix"))
//注冊一系列中間件
gw.Use(
middleware.RequestTraceLog(),
middleware.MatchRule(),
middleware.AccessControl(),
middleware.HTTPLimit(),
//todo 拓展中間件
//核心中間件部分
middleware.LoadBalance())
{
gw.GET("/*action", gateway.Index)
gw.POST("/*action", gateway.Index)
gw.DELETE("/*action", gateway.Index)
gw.OPTIONS("/*action", gateway.Index)
}
return router
}
中間件注冊
在上面的代碼中罩抗,可以看到默認注冊了RequestTraceLog(),MatchRule(),AccessControl(),HTTPLimit(),LoadBalance()五個getekeeper項目自定義的中間件拉庵,分配用于請求treace日志記錄、規(guī)則匹配套蒂、訪問控制钞支、限流和負載均衡。
- RequestTraceLog
通過trace可以快速定位請求鏈操刀,在進行微服務構建時烁挟,使用teaceid機制能夠很方便的繪制請求的整個鏈路,以方便問題的排查和各段服務的耗時記錄骨坑,優(yōu)化服務撼嗓。
//trace_log.go
//RequestTraceLog trace中間件
func RequestTraceLog() gin.HandlerFunc {
return func(c *gin.Context) {
RequestInLog(c)
defer RequestOutLog(c)
c.Next()
}
}
//RequestInLog 請求進入日志
func RequestInLog(c *gin.Context) {
//初始化trance 默認分配一個traceid
traceContext := lib.NewTrace()
//如果能在header中獲取到traceid,則使用已有的id
if traceID := c.Request.Header.Get("didi-header-rid"); traceID != "" {
traceContext.TraceId = traceID
}
if spanID := c.Request.Header.Get("didi-header-spanid"); spanID != "" {
traceContext.SpanId = spanID
}
...
//將trace相關的追蹤信息保存進上下文request
c.Set("startExecTime", time.Now())
c.Set("trace", traceContext)
c.Request.Header.Set("didi-header-rid", traceContext.TraceId)
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), public.ContextKey("trace"), traceContext))
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), public.ContextKey("request_url"), c.Request.URL.Path))
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
}
- MatchRule
用于規(guī)則的匹配欢唾,基于分解URI實現(xiàn)和管理配置后臺相對應的轉發(fā)邏輯的匹配且警,
//match_rule.go
//MatchRule 匹配模塊中間件
func MatchRule() gin.HandlerFunc {
return func(c *gin.Context) {
//初始化geteway service
gws := service.NewGateWayService(c.Writer, c.Request)
//進行規(guī)則匹配 選出匹配的module
if err := gws.MatchRule(); err != nil {
public.ResponseError(c, http.StatusBadRequest, err)
return
}
c.Set(MiddlewareServiceKey,gws)
}
}
- AccessControl
權限控制中間件,用于對外部過來的請求就行訪問控制過濾礁遣,目前默認支持的包括IP黑白名單斑芜、主機白名單以及請求注冊函數(shù)過濾
//gate_service.go
//AccessControl 權限驗證
func (o *GateWayService) AccessControl() error {
if o.currentModule.AccessControl == nil {
return nil
}
ctx := public.NewContext(o.w, o.req)
var errmsg string
switch {
case !AuthModuleOpened(o, ctx):
public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
"msg": "access_control_not_open",
})
return nil
case AuthInBlackIPList(o, ctx):
public.ContextNotice(o.req.Context(), DLTagAccessControlFailure, map[string]interface{}{
"msg": "AuthInBlackIPList",
})
return errors.New("msg:AuthInBlackIPList")
case AuthInWhiteIPList(o, ctx):
public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
"msg": "AuthWhiteIPList_success",
})
return nil
case AuthInWhiteHostList(o, ctx):
public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
"msg": "AuthWhiteHostList_success",
})
return nil
case AuthRegisterFunc(o, &errmsg):
public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
"msg": "AuthRegisterFunc_success",
})
return nil
}
if errmsg==""{
errmsg="auth_failure"
}
public.ContextWarning(o.req.Context(), DLTagAccessControlFailure, map[string]interface{}{
"msg": errmsg,
})
return errors.New(errmsg)
}
- HTTPLimit
限流控制中間件,以保護后端的realserver服務器以免過載導致服務異常不可用亡脸。
//HTTPLimit http限流中間件
func HTTPLimit() gin.HandlerFunc {
return func(c *gin.Context) {
//獲取上游服務
gws, ok := c.MustGet(MiddlewareServiceKey).(*service.GateWayService)
if !ok {
public.ResponseError(c, http.StatusBadRequest, errors.New("gateway_service not valid"))
return
}
//入口流量統(tǒng)計
currentModule := gws.CurrentModule()
counter := public.FlowCounterHandler.GetRequestCounter(currentModule.Base.Name)
counter.Increase(c.Request.Context(), c.Request.RemoteAddr)
//客戶端ip限流
remoteIP := public.Substr(c.Request.RemoteAddr, 0, int64(strings.Index(c.Request.RemoteAddr, ":")))
if currentModule.AccessControl.ClientFlowLimit > 0 {
limiter := public.FlowLimiterHandler.GetModuleIPVisitor(currentModule.Base.Name+"_"+remoteIP, currentModule.AccessControl.ClientFlowLimit)
if limiter.Allow() == false {
errmsg := fmt.Sprintf("moduleName:%s remoteIP:%s, QPS limit : %d, %d", currentModule.Base.Name, remoteIP, int64(limiter.Limit()), limiter.Burst())
public.ContextWarning(c.Request.Context(), service.DLTagAccessControlFailure, map[string]interface{}{
"msg": errmsg,
"ip": remoteIP,
"moduleName": currentModule.Base.Name,
})
public.ResponseError(c, http.StatusBadRequest, errors.New(errmsg))
}
}
//todo
c.Next()
}
}
- LoadBalance
該中間件是最核心的部門押搪,它負責整個負責均衡的處理,負責根據(jù)匹配規(guī)則創(chuàng)建正確的proxy浅碾,進行正常服務的處理。目前提供的負載均衡策略是rr续语,同時也實現(xiàn)了后端realserver的探活自動剔除策略垂谢。
//load_balance.go
//LoadBalance 負載均衡中間件
func LoadBalance() gin.HandlerFunc {
return func(c *gin.Context) {
gws,ok:=c.MustGet(MiddlewareServiceKey).(*service.GateWayService)
if !ok{
public.ResponseError(c, http.StatusBadRequest, errors.New("gateway_service not valid"))
return
}
//進入核心負載算法,選出正常的服務
proxy, err := gws.LoadBalance()
if err != nil {
public.ResponseError(c, http.StatusProxyAuthRequired, err)
return
}
requestBody,ok:=c.MustGet(MiddlewareRequestBodyKey).([]byte)
if !ok{
public.ResponseError(c, http.StatusBadRequest, errors.New("request_body not valid"))
return
}
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(requestBody))
proxy.ServeHTTP(c.Writer, c.Request)
c.Abort()
}
}
//LoadBalance 請求負載
func (o *GateWayService) LoadBalance() (*httputil.ReverseProxy, error) {
ipList, err := SysConfMgr.GetModuleIPList(o.currentModule.Base.Name)
if err != nil {
public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
"msg": err,
"modulename": o.currentModule.Base.Name,
"availableIpList": SysConfMgr.GetAvaliableIPList(o.currentModule.Base.Name),
})
return nil, errors.New("get_iplist_error")
}
if len(ipList) == 0 {
public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
"msg": "empty_iplist_error",
"modulename": o.currentModule.Base.Name,
"availableIpList": SysConfMgr.GetAvaliableIPList(o.currentModule.Base.Name),
})
return nil, errors.New("empty_iplist_error")
}
//正常proxy的遴選
proxy, err := o.GetModuleHTTPProxy()
if err != nil {
public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
"msg": err,
"module": o.currentModule.Base.Name,
})
return nil, err
}
return proxy, nil
}
//GetModuleHTTPProxy 獲取模塊的代理
func (o *GateWayService) GetModuleHTTPProxy() (*httputil.ReverseProxy, error) {
proxy,err:=SysConfMgr.GetModuleHTTPProxy(o.currentModule.Base.Name)
if err != nil {
public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
"err": err,
"module": o.currentModule.Base.Name,
})
return &httputil.ReverseProxy{}, err
}
return proxy,nil
}
//GetModuleHTTPProxy 獲取http代理方法
func (s *SysConfigManage) GetModuleHTTPProxy(moduleName string) (*httputil.ReverseProxy, error) {
//基于rr的module選擇
rr, err := s.GetModuleRR(moduleName)
if err != nil {
return nil, err
}
s.moduleProxyFuncMapLocker.RLock()
defer s.moduleProxyFuncMapLocker.RUnlock()
proxyFunc, ok := s.moduleProxyFuncMap[moduleName]
if ok {
return proxyFunc(rr), nil
}
return nil, errors.New("module proxy empty")
}
以上疮茄,我們對getekeeper的啟動以及核心的組件加載和請求流程進行了分析滥朱,從源碼上看,getekeeper的代碼思路還是很清晰明了的力试。
本節(jié)完徙邻。