Golang-配置化 pipeline

Unix 中的 pipeline眶拉,用于程序間的統(tǒng)一接口憔维,示例:

tail -f biz.log | grep "ERROR" | grep "error_no=1234" | wc -l

借鑒這個設計思路微姊,我們想設計一個 pipeline 模塊航攒,需要有以下功能:

  1. 抽象數(shù)據(jù)處理器(Handler)
  2. 一組 Handler 可以串行
  3. 一組 Handler 可以并行
  4. Handler 可配置化
  5. Handler 可被引用


    pipeline.png

下面我們就以 Golang 為例呕屎,嘗試將 pipeline 的基本模型搭建出來:

接口 Handler

pipeline 中數(shù)據(jù)處理器的對外接口

// Args args for handler
type Args struct {
    InValue interface{}
    Params  map[string]interface{}
}

// Resp response for handler
type Resp struct {
    OutValue interface{}
    Params   map[string]interface{}
}

// Handler defines some one who can handle something
type Handler interface {
    Handle(ctx context.Context, args Args) (resp *Resp, err error)
}

Handler Builder

一個 Handler 需要一個 Builder 去實例化

// HandlerBuilder build a Handler with JSON conf
type HandlerBuilder interface {
    BuildHandlerByJSON(id string, confJSON string) (Handler, error)
}

Handler Option

用 JSON 配置文件實例化一個 Handler:

  1. 可以引用一個已存在的 Handler
  2. 也可以用一個 HandlerBuilder 構建一個 Handler
  3. 需要額外配置 Required, Timeout, DefaultValue
type Option struct {
    // ID ref of a existing handler
    ID string `json:"id"`

    // create a handler from pipe
    PipeName string          `json:"pipe_name"`
    PipeConf json.RawMessage `json:"pipe_conf"`

    // handler conf
    TimeOutMillisecond int64       `json:"time_out_millisecond"`
    Required           bool        `json:"required"`
    DefaultValue       interface{} `json:"default_value"`

    // Handler underlying handler
    Handler Handler `json:"handler"`
}

并行 Handler

  1. 一組 Handler让簿,為每個 Handler 起一個 goroutine 并發(fā)執(zhí)行售躁。
  2. 每個 Handler 都配置了 Required/Timeout/DefaultValue笑窜,當一個必要的(Requried=true) Handler 超時或報錯了,整體處理報錯憔狞,反之則以 DefaultValue 作為響應蹂安。
  3. 這個 并發(fā)執(zhí)行 本身也是一種處理器椭迎,即實現(xiàn)了 Handler 接口,可用被當做一個 Handler 用于其他處理流中田盈。
type Handlers []pipeline.Option

func (handlers Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
    // prepare params
    var (
        wg       sync.WaitGroup
        fatalErr error
        hValChan = make(chan struct {
            idx int
            val interface{}
            err error
        })
        respData = make([]interface{}, len(handlers))
    )
    // set wait number
    wg.Add(len(handlers))

    // start goroutines to handle
    for i, h := range handlers {
        go func(index int, handler pipeline.Option) {
            defer wg.Done()         
            // do handle ...
            // push response
            hValChan <- struct {
                idx int
                val interface{}
                err error
            }{idx: index, val: respResult.val, err: respResult.err}
            return
        }(i, h)
    }

    // wait for response
    wg.Wait()
    close(hValChan)

    // handle responses
    for resp := range hValChan {
        if resp.err != nil {
            item := handlers[resp.idx]
            if item.Required {
                fatalErr = resp.err
                break
            }

            log.Printf("handle err: handler_id=%v, err=%v", item.ID, err)
            respData[resp.idx] = item.DefaultValue
            continue
        }

        respData[resp.idx] = resp.val
    }

    // build response
    return &pipeline.Resp{
        OutValue: respData,
        Params:   args.Params,
    }, fatalErr
}

串行 Handler

  1. 一組 Handler畜号,一個接一個地執(zhí)行。
  2. 每個 Handler 都配置了 Required/Timeout/DefaultValue允瞧,即當一個必要的(Requried=true) Handler 超時或報錯了简软,整體處理報錯,反之則以 DefaultValue 作為響應述暂。
  3. 這個 串行執(zhí)行 本身也是一個處理器痹升,即實現(xiàn)了 Handler 接口,可用作其他數(shù)據(jù)流的一個環(huán)節(jié)畦韭。
type Handlers struct {
    ID               string            `json:"id"`
    Handlers         []pipeline.Option `json:"handlers"`
}

func (handlers *Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
    for step, h := range handlers.Handlers {
        inArgs := args

        if h.TimeOutMillisecond <= 0 {
            resp, err = h.Handler.Handle(ctx, args)
        } else {
            resp, err = handlers.stepWithTimeout(ctx, h, args)
        }

        if err != nil {
            if h.Required {
                return
            }

            log.Printf("line-handler failed: id=%v, step=%v, err=%v", handlers.ID, step, err)
            args = pipeline.Args{
                InValue: h.DefaultValue,
                Params:  inArgs.Params,
            }
            continue
        }

        args = pipeline.Args{
            InValue: resp.OutValue,
            Params:  resp.Params,
        }
    }

    return
}

// stepWithTimeout handles the args with timeout
func (handlers *Handlers) stepWithTimeout(ctx context.Context, h pipeline.Option, args pipeline.Args) (*pipeline.Resp, error) {
    hValChan := make(chan struct {
        resp *pipeline.Resp
        err  error
    })

    go func() {
        resp, err := h.Handler.Handle(ctx, args)
        hValChan <- struct {
            resp *pipeline.Resp
            err  error
        }{resp: resp, err: err}
    }()

    select {
    case <-time.After(time.Millisecond * time.Duration(h.TimeOutMillisecond)):
        return &pipeline.Resp{Params: args.Params}, errors.New("timeout: handler_id=" + h.ID)
    case r := <-hValChan:
        return r.resp, r.err
    }
}

源碼

github.com/Focinfi/pipeline

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末疼蛾,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子艺配,更是在濱河造成了極大的恐慌察郁,老刑警劉巖衍慎,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異绳锅,居然都是意外死亡西饵,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進店門鳞芙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來眷柔,“玉大人,你說我怎么就攤上這事原朝⊙敝觯” “怎么了?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵喳坠,是天一觀的道長鞠评。 經常有香客問我,道長壕鹉,這世上最難降的妖魔是什么剃幌? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮晾浴,結果婚禮上负乡,老公的妹妹穿的比我還像新娘。我一直安慰自己脊凰,他們只是感情好抖棘,可當我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著狸涌,像睡著了一般切省。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上帕胆,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天朝捆,我揣著相機與錄音,去河邊找鬼懒豹。 笑死右蹦,一個胖子當著我的面吹牛,可吹牛的內容都是我干的歼捐。 我是一名探鬼主播何陆,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼豹储!你這毒婦竟也來了贷盲?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎巩剖,沒想到半個月后铝穷,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡佳魔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年曙聂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鞠鲜。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡宁脊,死狀恐怖,靈堂內的尸體忽然破棺而出贤姆,到底是詐尸還是另有隱情榆苞,我是刑警寧澤,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布霞捡,位于F島的核電站坐漏,受9級特大地震影響,放射性物質發(fā)生泄漏碧信。R本人自食惡果不足惜赊琳,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望砰碴。 院中可真熱鬧躏筏,春花似錦、人聲如沸衣式。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽碴卧。三九已至,卻和暖如春乃正,著一層夾襖步出監(jiān)牢的瞬間住册,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工瓮具, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留荧飞,地道東北人。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓名党,卻偏偏與公主長得像叹阔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子传睹,可洞房花燭夜當晚...
    茶點故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 綜述 netty通...
    jiangmo閱讀 5,860評論 0 13
  • 人不滿足,才能成長 書籍:書都不會讀睛藻,你還想成功 字數(shù):748字 前兩年启上,知道了一種植物,它的名字叫做多肉店印,這個...
    紅木姑娘閱讀 383評論 1 2
  • 如果樹上掛滿幸福 隨手摘取 會有多少心酸凋落 給予希望經幡 一世陽光 苦痛 清澈湖水藍天 云朵倒掛樹梢 表白柔甜 ...
    2016冰山來客閱讀 226評論 0 4
  • Dear baby: 原本想臨近預產期再對你在肚紙里的9個多月做個總結冈在,但今天的學校放假,讓媽咪的心也好像...
    海笑空靈閱讀 215評論 1 1
  • 大環(huán)境影響按摘,門庭冷清包券,忙而無利煩郁。 煩郁不如轉去院峡。罷罷罷兴使,自個?清, 索性偷懶閑書中照激,攀談周孔尋莊公发魄。書罷養(yǎng)花植...
    nomoreoo閱讀 238評論 0 1