Unix 中的 pipeline眶拉,用于程序間的統(tǒng)一接口憔维,示例:
tail -f biz.log | grep "ERROR" | grep "error_no=1234" | wc -l
借鑒這個設計思路微姊,我們想設計一個 pipeline 模塊航攒,需要有以下功能:
- 抽象數(shù)據(jù)處理器(Handler)
- 一組 Handler 可以串行
- 一組 Handler 可以并行
- Handler 可配置化
-
Handler 可被引用
pipeline.png
下面我們就以 Golang 為例呕屎,嘗試將 pipeline 的基本模型搭建出來:
接口 Handler
pipeline 中數(shù)據(jù)處理器的對外接口
// Args args for handler
type Args struct {
InValue interface{}
Params map[string]interface{}
}
// Resp response for handler
type Resp struct {
OutValue interface{}
Params map[string]interface{}
}
// Handler defines some one who can handle something
type Handler interface {
Handle(ctx context.Context, args Args) (resp *Resp, err error)
}
Handler Builder
一個 Handler
需要一個 Builder
去實例化
// HandlerBuilder build a Handler with JSON conf
type HandlerBuilder interface {
BuildHandlerByJSON(id string, confJSON string) (Handler, error)
}
Handler Option
用 JSON 配置文件實例化一個 Handler:
- 可以引用一個已存在的 Handler
- 也可以用一個
HandlerBuilder
構建一個Handler
- 需要額外配置
Required, Timeout, DefaultValue
type Option struct {
// ID ref of a existing handler
ID string `json:"id"`
// create a handler from pipe
PipeName string `json:"pipe_name"`
PipeConf json.RawMessage `json:"pipe_conf"`
// handler conf
TimeOutMillisecond int64 `json:"time_out_millisecond"`
Required bool `json:"required"`
DefaultValue interface{} `json:"default_value"`
// Handler underlying handler
Handler Handler `json:"handler"`
}
并行 Handler
- 一組 Handler让簿,為每個 Handler 起一個
goroutine
并發(fā)執(zhí)行售躁。 - 每個 Handler 都配置了
Required/Timeout/DefaultValue
笑窜,當一個必要的(Requried=true
) Handler 超時或報錯了,整體處理報錯憔狞,反之則以DefaultValue
作為響應蹂安。 - 這個 并發(fā)執(zhí)行 本身也是一種處理器椭迎,即實現(xiàn)了 Handler 接口,可用被當做一個 Handler 用于其他處理流中田盈。
type Handlers []pipeline.Option
func (handlers Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
// prepare params
var (
wg sync.WaitGroup
fatalErr error
hValChan = make(chan struct {
idx int
val interface{}
err error
})
respData = make([]interface{}, len(handlers))
)
// set wait number
wg.Add(len(handlers))
// start goroutines to handle
for i, h := range handlers {
go func(index int, handler pipeline.Option) {
defer wg.Done()
// do handle ...
// push response
hValChan <- struct {
idx int
val interface{}
err error
}{idx: index, val: respResult.val, err: respResult.err}
return
}(i, h)
}
// wait for response
wg.Wait()
close(hValChan)
// handle responses
for resp := range hValChan {
if resp.err != nil {
item := handlers[resp.idx]
if item.Required {
fatalErr = resp.err
break
}
log.Printf("handle err: handler_id=%v, err=%v", item.ID, err)
respData[resp.idx] = item.DefaultValue
continue
}
respData[resp.idx] = resp.val
}
// build response
return &pipeline.Resp{
OutValue: respData,
Params: args.Params,
}, fatalErr
}
串行 Handler
- 一組 Handler畜号,一個接一個地執(zhí)行。
- 每個 Handler 都配置了
Required/Timeout/DefaultValue
允瞧,即當一個必要的(Requried=true
) Handler 超時或報錯了简软,整體處理報錯,反之則以DefaultValue
作為響應述暂。 - 這個 串行執(zhí)行 本身也是一個處理器痹升,即實現(xiàn)了 Handler 接口,可用作其他數(shù)據(jù)流的一個環(huán)節(jié)畦韭。
type Handlers struct {
ID string `json:"id"`
Handlers []pipeline.Option `json:"handlers"`
}
func (handlers *Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
for step, h := range handlers.Handlers {
inArgs := args
if h.TimeOutMillisecond <= 0 {
resp, err = h.Handler.Handle(ctx, args)
} else {
resp, err = handlers.stepWithTimeout(ctx, h, args)
}
if err != nil {
if h.Required {
return
}
log.Printf("line-handler failed: id=%v, step=%v, err=%v", handlers.ID, step, err)
args = pipeline.Args{
InValue: h.DefaultValue,
Params: inArgs.Params,
}
continue
}
args = pipeline.Args{
InValue: resp.OutValue,
Params: resp.Params,
}
}
return
}
// stepWithTimeout handles the args with timeout
func (handlers *Handlers) stepWithTimeout(ctx context.Context, h pipeline.Option, args pipeline.Args) (*pipeline.Resp, error) {
hValChan := make(chan struct {
resp *pipeline.Resp
err error
})
go func() {
resp, err := h.Handler.Handle(ctx, args)
hValChan <- struct {
resp *pipeline.Resp
err error
}{resp: resp, err: err}
}()
select {
case <-time.After(time.Millisecond * time.Duration(h.TimeOutMillisecond)):
return &pipeline.Resp{Params: args.Params}, errors.New("timeout: handler_id=" + h.ID)
case r := <-hValChan:
return r.resp, r.err
}
}