Golang框架實戰(zhàn)-KisFlow流式計算框架專欄
Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
Golang框架實戰(zhàn)-KisFlow流式計算框架(7)-配置導(dǎo)入與導(dǎo)出
Golang框架實戰(zhàn)-KisFlow流式計算框架(8)-KisFlow Action
Golang框架實戰(zhàn)-KisFlow流式計算框架(9)-Cache/Params 數(shù)據(jù)緩存與數(shù)據(jù)參數(shù)
Golang框架實戰(zhàn)-KisFlow流式計算框架(10)-Flow多副本
接下來我們來增強KisFlow中Function對業(yè)務(wù)數(shù)據(jù)處理的聚焦堵腹,將之前Function的寫法:
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName3Handler ----")
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return nil
}
是從flow.Input()
中 獲取到原始數(shù)據(jù)斥铺,改成可以直接獲取到業(yè)務(wù)想要的具體數(shù)據(jù)結(jié)構(gòu)類型伦吠,而無需斷言等類型判斷和轉(zhuǎn)換玛歌。改成的Function擴展參數(shù)用法大致如下:
proto
type StuScores struct {
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
type StuAvgScore struct {
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
FaaS
type AvgStuScoreIn struct {
serialize.DefaultSerialize
proto.StuScores
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
// AvgStuScore(FaaS) 計算學生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// 提交結(jié)果數(shù)據(jù)
_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
}
return nil
}
這樣帆疟,我們可以通過第三個形式參數(shù)rows
直接拿到我們期待的目標輸出結(jié)構(gòu)體數(shù)據(jù)轻要,不需要斷言和轉(zhuǎn)換,更加關(guān)注業(yè)務(wù)方的開發(fā)效率旱爆。
當然,如果你希望獲取到原始的數(shù)據(jù)窘茁,依然可以從flow.Input()
中獲取到怀伦。
本章將實現(xiàn)KisFlow上述功能。
11.1 FaaS業(yè)務(wù)回調(diào)函數(shù)自描述
本節(jié)將完成FaaS的自描述概念改造山林,我們知道之前的FaaS回調(diào)如下:
type FaaS func(context.Context, Flow) error
那么我們需要一個結(jié)構(gòu)體房待,來描述這個函數(shù)屬性,包括他的函數(shù)名稱驼抹、函數(shù)地址桑孩、形參數(shù)量、相殘類型框冀、返回值類型等等流椒。
11.1.1 FaaSDesc 回調(diào)自描述類型
在kis-flow/kis/
下,新創(chuàng)建一個文件faas.go
明也,定義如下結(jié)構(gòu)體:
kis-flow/kis/faas.go
// FaaS Function as a Service
// 將
// type FaaS func(context.Context, Flow) error
// 改為
// type FaaS func(context.Context, Flow, ...interface{}) error
// 可以通過可變參數(shù)的任意輸入類型進行數(shù)據(jù)傳遞
type FaaS interface{}
// FaaSDesc FaaS 回調(diào)計算業(yè)務(wù)函數(shù) 描述
type FaaSDesc struct {
FnName string // Function名稱
f interface{} // FaaS 函數(shù)
fName string // 函數(shù)名稱
ArgsType []reflect.Type // 函數(shù)參數(shù)類型(集合)
ArgNum int // 函數(shù)參數(shù)個數(shù)
FuncType reflect.Type // 函數(shù)類型
FuncValue reflect.Value // 函數(shù)值(函數(shù)地址)
}
將之前的FaaS改進成interface{}
宣虾,而FaaSDesc
具備了一些屬性。
- FnName: 表示當前Function的名稱温数,例如我們之前例子的"funcDemo1" 等绣硝,這個是用來KisFlow給Function標識的FunctionName。
- f:表示定義的FaaS函數(shù)撑刺。
- fName: 定義f函數(shù)的函數(shù)名稱鹉胖。
- ArgsType:定義的f函數(shù)的形參類型列表,這是一個slice。
- ArgNum:定義的f函數(shù)的輸入形參個數(shù)甫菠。
- FuncType:定義的f函數(shù)的數(shù)據(jù)類型挠铲。
- FuncValue:定義的f函數(shù)的函數(shù)值(可以被調(diào)度的函數(shù)地址)。
11.1.2 新建一個FaaSDesc對象
下面淑蔚,提供一個新建FaaSDesc
的構(gòu)造函數(shù)市殷,形參的類型就是KisFlow的FunctionName和定義的FaaS函數(shù),如下:
kis-flow/kis/faas.go
// NewFaaSDesc 根據(jù)用戶注冊的FnName 和FaaS 回調(diào)函數(shù)刹衫,創(chuàng)建 FaaSDesc 描述實例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
// 傳入的回調(diào)函數(shù)FaaS,函數(shù)值(函數(shù)地址)
funcValue := reflect.ValueOf(f)
// 傳入的回調(diào)函數(shù)FaaS 類型
funcType := funcValue.Type()
// 判斷傳遞的FaaS指針是否是函數(shù)類型
if !isFuncType(funcType) {
return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
}
// 判斷傳遞的FaaS函數(shù)是否有返回值類型是只包括(error)
if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
return nil, errors.New("function must have exactly one return value of type error")
}
// FaaS函數(shù)的參數(shù)類型集合
argsType := make([]reflect.Type, funcType.NumIn())
// 獲取FaaS的函數(shù)名稱
fullName := runtime.FuncForPC(funcValue.Pointer()).Name()
// 確保 FaaS func(context.Context, Flow, ...interface{}) error 形參列表醋寝,存在context.Context 和 kis.Flow
// 是否包含kis.Flow類型的形參
containsKisFlow := false
// 是否包含context.Context類型的形參
containsCtx := false
// 遍歷FaaS的形參類型
for i := 0; i < funcType.NumIn(); i++ {
// 取出第i個形式參數(shù)類型
paramType := funcType.In(i)
if isFlowType(paramType) {
// 判斷是否包含kis.Flow類型的形參
containsKisFlow = true
} else if isContextType(paramType) {
// 判斷是否包含context.Context類型的形參
containsCtx = true
} else if isSliceType(paramType) {
// 判斷是否包含Slice類型的形參
// 獲取當前參數(shù)Slice的元素類型
itemType := paramType.Elem()
// 如果當前參數(shù)是一個指針類型,則獲取指針指向的結(jié)構(gòu)體類型
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // 獲取指針指向的結(jié)構(gòu)體類型
}
} else {
// Other types are not supported...
}
// 將當前形參類型追加到argsType集合中
argsType[i] = paramType
}
if !containsKisFlow {
// 不包含kis.Flow類型的形參带迟,返回錯誤
return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
if !containsCtx {
// 不包含context.Context類型的形參音羞,返回錯誤
return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
// 返回FaaSDesc描述實例
return &FaaSDesc{
FnName: fnName,
f: f,
fName: fullName,
ArgsType: argsType,
ArgNum: len(argsType),
FuncType: funcType,
FuncValue: funcValue,
}, nil
}
這里面通過用reflect反射能力,依次從f函數(shù)中獲取相關(guān)的屬性值仓犬,存放在FaaSDesc
中嗅绰。
這里面為了確保開發(fā)者傳遞的FaaS原因滿足如下格式:
type FaaS func(context.Context, Flow, ...interface{}) error
所以對形參context.Context
和形參Flow
做了嚴格的形參類型校驗,其中的校驗方法如下:
kis-flow/kis/faas.go
// isFuncType 判斷傳遞進來的 paramType 是否是函數(shù)類型
func isFuncType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Func
}
// isFlowType 判斷傳遞進來的 paramType 是否是 kis.Flow 類型
func isFlowType(paramType reflect.Type) bool {
var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()
return paramType.Implements(flowInterfaceType)
}
// isContextType 判斷傳遞進來的 paramType 是否是 context.Context 類型
func isContextType(paramType reflect.Type) bool {
typeName := paramType.Name()
return strings.Contains(typeName, "Context")
}
// isSliceType 判斷傳遞進來的 paramType 是否是切片類型
func isSliceType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Slice
}
在NewFaaSDesc()
用containsKisFlow
和containsCtx
兩個bool類型的變量來判斷是否包括Context和Flow類型搀继。
下面這段代碼是為了兼容傳遞的形參類型是結(jié)構(gòu)體指針時候的兼容:
// ... ...
// 獲得當前形參類型
itemType := paramType.Elem()
// 如果當前參數(shù)是一個指針類型窘面,則獲取指針指向的結(jié)構(gòu)體類型
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // 獲取指針指向的結(jié)構(gòu)體類型
}
// ... ...
比如開發(fā)者傳遞的FaaS函數(shù)原型如下:
func MyFaaSDemo(ctx context.Context, flow Flow, []*A) error
和:
func MyFaaSDemo(ctx context.Context, flow Flow, []A) error
11.1.3 注冊FaaS函數(shù)
那么接下來,我們將kisPool
模塊叽躯,的注冊FaaS函數(shù)的方法修改成注冊一個FaaSDesc描述财边,修改后的注冊方法如下:
kis-flow/kis/pool.go
// FaaS 注冊 Function 計算業(yè)務(wù)邏輯, 通過Function Name 索引及注冊
func (pool *kisPool) FaaS(fnName string, f FaaS) {
// 當注冊FaaS計算邏輯回調(diào)時,創(chuàng)建一個FaaSDesc描述對象
faaSDesc, err := NewFaaSDesc(fnName, f)
if err != nil {
panic(err)
}
pool.fnLock.Lock() // 寫鎖
defer pool.fnLock.Unlock()
if _, ok := pool.fnRouter[fnName]; !ok {
// 將FaaSDesc描述對象注冊到fnRouter中
pool.fnRouter[fnName] = faaSDesc
} else {
errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
panic(errString)
}
log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}
那么現(xiàn)在的fnRouter
中保存的key依然是FunctionName点骑,但是value則為當前FaaS函數(shù)的描述對象FaaSDesc.
11.1.4 kisPool調(diào)度FaaSDesc
最后再調(diào)度Function的時候酣难,通過FaaSDesc取出調(diào)度函數(shù)地址和函數(shù)形參列表進行函數(shù)的調(diào)度。
修改的后的CallFunction()
如下:
kis-flow/kis/pool.go
// CallFunction 調(diào)度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
if funcDesc, ok := pool.fnRouter[fnName]; ok {
// 被調(diào)度Function的形參列表
params := make([]reflect.Value, 0, funcDesc.ArgNum)
for _, argType := range funcDesc.ArgsType {
// 如果是Flow類型形參黑滴,則將 flow的值傳入
if isFlowType(argType) {
params = append(params, reflect.ValueOf(flow))
continue
}
// 如果是Context類型形參憨募,則將 ctx的值傳入
if isContextType(argType) {
params = append(params, reflect.ValueOf(ctx))
continue
}
// 如果是Slice類型形參,則將 flow.Input()的值傳入
if isSliceType(argType) {
params = append(params, value)
continue
}
// 傳遞的參數(shù)袁辈,既不是Flow類型菜谣,也不是Context類型,也不是Slice類型吵瞻,則默認給到零值
params = append(params, reflect.Zero(argType))
}
// 調(diào)用當前Function 的計算邏輯
retValues := funcDesc.FuncValue.Call(params)
// 取出第一個返回值葛菇,如果是nil,則返回nil
ret := retValues[0].Interface()
if ret == nil {
return nil
}
// 如果返回值是error類型橡羞,則返回error
return retValues[0].Interface().(error)
}
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}
函數(shù)的整體調(diào)度邏輯大致如下:
首選通過fnName進行從fnRouter
路由到對應(yīng)的FaaSDesc眯停。遍歷FaaSDesc的形參列表:
將Context和Flow對象依次取出來,將額外傳遞的自定義切片形參取出來卿泽,如果傳遞的參數(shù)莺债,既不是Flow類型滋觉,也不是Context類型,也不是Slice類型齐邦,則默認給到零值椎侠,如下:
params = append(params, reflect.Zero(argType))
最后執(zhí)行函數(shù)的調(diào)度:
retValues := funcDesc.FuncValue.Call(params)
得到第一個返回值error的數(shù)值,為nil則返回nil措拇,否則返回error類型我纪。
這樣我們的FaaS自描述的調(diào)度模式就建立成功了,那么有了這套功能KisFlow可以做什么事情呢丐吓,下一節(jié)我們可以再調(diào)度FaaSDesc的時候?qū)鬟f的自定義形參的數(shù)據(jù)類型進行序列化浅悉,得到開發(fā)者期待的數(shù)據(jù)類型。
11.2 FaaS形參的自定義數(shù)據(jù)類型序列化
11.2.1 Serialize序列化接口
首先券犁,我們定義一個數(shù)據(jù)序列化接口术健,在kis-flow/kis/
下創(chuàng)建serialize.go 文件,如下:
kis-flow/kis/serialize.go
// Serialize 數(shù)據(jù)序列化接口
type Serialize interface {
// UnMarshal 用于將 KisRowArr 反序列化為指定類型的值粘衬。
UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
// Marshal 用于將指定類型的值序列化為 KisRowArr荞估。
Marshal(interface{}) (common.KisRowArr, error)
}
其中KisRowArr是我們KisFlow中傳遞每個Function的數(shù)據(jù)切片,之前我們定義在了kis-flow/common/data_type.go
中:
package common
// KisRow 一行數(shù)據(jù)
type KisRow interface{}
// KisRowArr 一次業(yè)務(wù)的批量數(shù)據(jù)
type KisRowArr []KisRow
/*
KisDataMap 當前Flow承載的全部數(shù)據(jù)
key : 數(shù)據(jù)所在的Function ID
value: 對應(yīng)的KisRow
*/
type KisDataMap map[string]KisRowArr
Serialize
提供了兩個接口:
- UnMarshal:用于將 KisRowArr 反序列化為指定類型的值稚新。
- Marshal:用于將指定類型的值序列化為 KisRowArr勘伺。
KisFlow會提供一個默認的Serialize
給每個FaaS函數(shù),開發(fā)者也可以自定義自己的Serialize
來對FaaS傳遞的形參進行自定義的數(shù)據(jù)序列化動作褂删。
11.2.2 KisFlow默認的Serialize序列化
KisFlow提供一個默認的Serialize序列化實例娇昙,主要以Json格式為主,在kis-flow/
下創(chuàng)建serialize/
文件夾笤妙,在kis-flow/serialize/
下創(chuàng)建serialize_default.go
文件,實現(xiàn)的序列化和反序列化代碼如下:
kis-flow/serialize/serialize_default.go
package serialize
import (
"encoding/json"
"fmt"
"kis-flow/common"
"reflect"
)
type DefaultSerialize struct{}
// UnMarshal 用于將 KisRowArr 反序列化為指定類型的值噪裕。
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
// 確保傳入的類型是一個切片
if r.Kind() != reflect.Slice {
return reflect.Value{}, fmt.Errorf("r must be a slice")
}
slice := reflect.MakeSlice(r, 0, len(arr))
// 遍歷每個元素并嘗試反序列化
for _, row := range arr {
var elem reflect.Value
var err error
// 嘗試斷言為結(jié)構(gòu)體或指針
elem, err = unMarshalStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// 嘗試直接反序列化字符串
elem, err = unMarshalJsonString(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// 嘗試先序列化為 JSON 再反序列化
elem, err = unMarshalJsonStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
} else {
return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
}
}
return slice, nil
}
// Marshal 用于將指定類型的值序列化為 KisRowArr(json 序列化)蹲盘。
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
var arr common.KisRowArr
switch reflect.TypeOf(i).Kind() {
case reflect.Slice, reflect.Array:
slice := reflect.ValueOf(i)
for i := 0; i < slice.Len(); i++ {
// 序列化每個元素為 JSON 字符串,并將其添加到切片中膳音。
jsonBytes, err := json.Marshal(slice.Index(i).Interface())
if err != nil {
return nil, fmt.Errorf("failed to marshal element to JSON: %v ", err)
}
arr = append(arr, string(jsonBytes))
}
default:
// 如果不是切片或數(shù)組類型召衔,則直接序列化整個結(jié)構(gòu)體為 JSON 字符串。
jsonBytes, err := json.Marshal(i)
if err != nil {
return nil, fmt.Errorf("failed to marshal element to JSON: %v ", err)
}
arr = append(arr, string(jsonBytes))
}
return arr, nil
}
其中一些函數(shù)定義如下:
kis-flow/serialize/serialize_default.go
// 嘗試斷言為結(jié)構(gòu)體或指針
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 檢查 row 是否為結(jié)構(gòu)體或結(jié)構(gòu)體指針類型
rowType := reflect.TypeOf(row)
if rowType == nil {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
}
if rowType.Kind() != reflect.Struct && rowType.Kind() != reflect.Ptr {
return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
}
// 如果 row 是指針類型祭陷,則獲取它指向的類型
if rowType.Kind() == reflect.Ptr {
// 空指針
if reflect.ValueOf(row).IsNil() {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
}
// 解引用
row = reflect.ValueOf(row).Elem().Interface()
// 拿到解引用后的類型
rowType = reflect.TypeOf(row)
}
// 檢查是否可以將 row 斷言為 elemType(目標類型)
if !rowType.AssignableTo(elemType) {
return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
}
// 將 row 轉(zhuǎn)換為 reflect.Value 并返回
return reflect.ValueOf(row), nil
}
// 嘗試直接反序列化字符串(將Json字符串 反序列化為 結(jié)構(gòu)體)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 判斷源數(shù)據(jù)是否可以斷言成string
str, ok := row.(string)
if !ok {
return reflect.Value{}, fmt.Errorf("not a string")
}
// 創(chuàng)建一個新的結(jié)構(gòu)體實例苍凛,用于存儲反序列化后的值
elem := reflect.New(elemType).Elem()
// 嘗試將json字符串反序列化為結(jié)構(gòu)體。
if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
}
return elem, nil
}
// 嘗試先序列化為 JSON 再反序列化(將結(jié)構(gòu)體轉(zhuǎn)換成Json字符串兵志,再將Json字符串 反序列化為 結(jié)構(gòu)體)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 將 row 序列化為 JSON 字符串
jsonBytes, err := json.Marshal(row)
if err != nil {
return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v ", err)
}
// 創(chuàng)建一個新的結(jié)構(gòu)體實例醇蝴,用于存儲反序列化后的值
elem := reflect.New(elemType).Interface()
// 將 JSON 字符串反序列化為結(jié)構(gòu)體
if err := json.Unmarshal(jsonBytes, elem); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v ", err)
}
return reflect.ValueOf(elem).Elem(), nil
}
- UnMarshal(): 的實現(xiàn) 首先判斷形參是否是一個Slice,如果是的話想罕,那么切片中的每個元素的數(shù)據(jù)進行序列化悠栓,優(yōu)先嘗試
unMarshalStruct()
結(jié)構(gòu)體反序列化,其次嘗試json字符串的反序列化unMarshalJsonString()
,最后再嘗試具備相同屬性的結(jié)構(gòu)體但是名稱不同的反序列化unMarshalJsonStruct()
惭适。 - Marshal(): 則是將任意類型序列化為json二進制字符串存儲在KisRowArr中笙瑟。
注意:KisFlow目前的默認序列化只實現(xiàn)了json格式的序列化,開發(fā)者可以參考DefaultSerialize{} 來實現(xiàn)自己其他格式的數(shù)據(jù)序列化和反序列化動作癞志。
11.2.3 默認的默認的Serialize實例
在serialize的接口定義中往枷,定義一個全局默認的序列化實例,defaultSerialize凄杯。
kis-flow/kis/serialize.go
// defaultSerialize KisFlow提供的默認序列化實現(xiàn)(開發(fā)者可以自定義)
var defaultSerialize = &serialize.DefaultSerialize{}
同時提供一個判斷一個數(shù)據(jù)類型是否實現(xiàn)了抽象接口Serialize
的校驗方法错洁,如下:
kis-flow/kis/serialize.go
// isSerialize 判斷傳遞進來的 paramType 是否實現(xiàn)了 Serialize 接口
func isSerialize(paramType reflect.Type) bool {
return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}
11.2.4 FaaSDesc實現(xiàn)Serialize序列化接口
現(xiàn)在將FaaSDesc去繼承且實現(xiàn)Serialize接口,在調(diào)度FaaSDesc的時候?qū)鬟f的輸入?yún)?shù)進行序列化得到相對應(yīng)的具體類型形參盾舌,定義如下:
kis-flow/kis/faas.go
// FaaSDesc FaaS 回調(diào)計算業(yè)務(wù)函數(shù) 描述
type FaaSDesc struct {
// +++++++
Serialize // 當前Function的數(shù)據(jù)輸入輸出序列化實現(xiàn)
// +++++++
FnName string // Function名稱
f interface{} // FaaS 函數(shù)
fName string // 函數(shù)名稱
ArgsType []reflect.Type // 函數(shù)參數(shù)類型(集合)
ArgNum int // 函數(shù)參數(shù)個數(shù)
FuncType reflect.Type // 函數(shù)類型
FuncValue reflect.Value // 函數(shù)值(函數(shù)地址)
}
然后墓臭,在構(gòu)造方法NewFaaSDesc()
加上對自定義形參的判斷,判斷傳遞的自定義形參是否實現(xiàn)了Serialize
的兩個序列化接口妖谴,如果實現(xiàn)了窿锉,則使用自定義的序列化接口,如果沒有實現(xiàn)膝舅,則使用默認的DefaultSerialize{}
實例嗡载。
kis-flow/kis/faas.go
// NewFaaSDesc 根據(jù)用戶注冊的FnName 和FaaS 回調(diào)函數(shù),創(chuàng)建 FaaSDesc 描述實例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
// ++++++++++
// 輸入輸出序列化實例
var serializeImpl Serialize
// ++++++++++
// ... ...
// ... ...
// 遍歷FaaS的形參類型
for i := 0; i < funcType.NumIn(); i++ {
// 取出第i個形式參數(shù)類型
paramType := funcType.In(i)
if isFlowType(paramType) {
// 判斷是否包含kis.Flow類型的形參
containsKisFlow = true
} else if isContextType(paramType) {
// 判斷是否包含context.Context類型的形參
containsCtx = true
} else if isSliceType(paramType) {
// 獲取當前參數(shù)Slice的元素類型
itemType := paramType.Elem()
// 如果當前參數(shù)是一個指針類型仍稀,則獲取指針指向的結(jié)構(gòu)體類型
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // 獲取指針指向的結(jié)構(gòu)體類型
}
// +++++++++++++++++++++++++++++
// Check if f implements Serialize interface
// (檢測傳遞的FaaS函數(shù)是否實現(xiàn)了Serialize接口)
if isSerialize(itemType) {
// 如果當前形參實現(xiàn)了Serialize接口洼滚,則使用當前形參的序列化實現(xiàn)
serializeImpl = reflect.New(itemType).Interface().(Serialize)
} else {
// 如果當前形參沒有實現(xiàn)Serialize接口,則使用默認的序列化實現(xiàn)
serializeImpl = defaultSerialize // Use global default implementation
}
// +++++++++++++++++++++++++++++++
} else {
// Other types are not supported
}
// 將當前形參類型追加到argsType集合中
argsType[i] = paramType
}
// ... ...
// ... ...
// 返回FaaSDesc描述實例
return &FaaSDesc{
Serialize: serializeImpl,
FnName: fnName,
f: f,
fName: fullName,
ArgsType: argsType,
ArgNum: len(argsType),
FuncType: funcType,
FuncValue: funcValue,
}, nil
}
11.2.5 完成調(diào)度FaaS數(shù)據(jù)序列化
最后在調(diào)度FaaSDesc的時候技潘,解析形參的時候遥巴,如果是自定義的Slice參數(shù),則對齊進行反序列化操作享幽,將flow.Input()
的原數(shù)據(jù)反序列化成為開發(fā)者需要的結(jié)構(gòu)體數(shù)據(jù)铲掐,進行調(diào)度FaaS,實現(xiàn)如下:
kis-flow/kis/pool.go
// CallFunction 調(diào)度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
if funcDesc, ok := pool.fnRouter[fnName]; ok {
// 被調(diào)度Function的形參列表
params := make([]reflect.Value, 0, funcDesc.ArgNum)
for _, argType := range funcDesc.ArgsType {
// 如果是Flow類型形參值桩,則將 flow的值傳入
if isFlowType(argType) {
params = append(params, reflect.ValueOf(flow))
continue
}
// 如果是Context類型形參摆霉,則將 ctx的值傳入
if isContextType(argType) {
params = append(params, reflect.ValueOf(ctx))
continue
}
// 如果是Slice類型形參,則將 flow.Input()的值傳入
if isSliceType(argType) {
// +++++++++++++++++++
// 將flow.Input()中的原始數(shù)據(jù)奔坟,反序列化為argType類型的數(shù)據(jù)
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
if err != nil {
log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
} else {
params = append(params, value)
continue
}
// +++++++++++++++++++
}
// 傳遞的參數(shù)携栋,既不是Flow類型,也不是Context類型咳秉,也不是Slice類型婉支,則默認給到零值
params = append(params, reflect.Zero(argType))
}
// 調(diào)用當前Function 的計算邏輯
retValues := funcDesc.FuncValue.Call(params)
// 取出第一個返回值,如果是nil澜建,則返回nil
ret := retValues[0].Interface()
if ret == nil {
return nil
}
// 如果返回值是error類型磅摹,則返回error
return retValues[0].Interface().(error)
}
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}
這樣我們就將數(shù)據(jù)序列化的動作和FaaSDesc模塊結(jié)合起來了滋迈,接下來,我們寫一個單元測試來測試這部分的能力户誓。
11.3 自定義形參序列化單元測試
11.3.1 Flow與Function的配置文件定義
單元測試饼灿,我們新建兩個Function配置如下:
kis-flow/test/load_conf/func/func-avgStuScore.yml
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
name: 學生平均分
must:
- stu_id
kis-flow/test/load_conf/func/func-PrintStuAvgScore.yml
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
name: 學生平均分
must:
- stu_id
然后我們來定義一個Flow將上述的兩個Function鏈接起來
kis-flow/test/load_conf/flow/flow-StuAvg.yml
kistype: flow
status: 1
flow_name: StuAvg
flows:
- fname: AvgStuScore
- fname: PrintStuAvgScore
11.3.2 自定義基礎(chǔ)數(shù)據(jù)proto定義
在kis-flow/test/
下創(chuàng)建proto/
文件夾,創(chuàng)建一個自定義的基礎(chǔ)數(shù)據(jù)proto帝美,為了今后數(shù)據(jù)協(xié)議的復(fù)用碍彭,如下:
kis-flow/test/proto/stu_score.go
package proto
// 學生學習分數(shù)
type StuScores struct {
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
// 學生的平均分
type StuAvgScore struct {
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
11.3.3 定義兩個FaaS計算回調(diào)函數(shù)
定義兩個FaaS計算函數(shù),一個為計算一個Student的平均分悼潭,一個打印Student的平均分庇忌,如下:
kis-flow/test/faas/faas_stu_score_avg.go
package faas
import (
"context"
"kis-flow/kis"
"kis-flow/serialize"
"kis-flow/test/proto"
)
type AvgStuScoreIn struct {
serialize.DefaultSerialize
proto.StuScores
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
// AvgStuScore(FaaS) 計算學生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// 提交結(jié)果數(shù)據(jù)
_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
}
return nil
}
AvgStuScore()
方法為我們改進之后的FaaS函數(shù),其中第三個形參rows []*AvgStuScoreIn
為我們自定義序列化的形參舰褪,之前我們通過flow.Input()
來拿到原始的數(shù)據(jù)皆疹,然后進行遍歷,其實現(xiàn)在依然可以這么處理占拍,但是每次可能需要開發(fā)者在FaaS中自行斷言判斷略就,對開發(fā)的效率有些成本,那么現(xiàn)在開發(fā)者完全可以通過AvgStuScoreIn
來描述一個形參的數(shù)據(jù)晃酒,然后在AvgStuScore
的業(yè)務(wù)中表牢,通過遍歷rows
得到已經(jīng)序列化好的結(jié)構(gòu)體,增加的代碼的可讀性也降低的寫業(yè)務(wù)的開發(fā)成本贝次,提高了效率崔兴。
打印平均分的FaaS實現(xiàn)如下:
kis-flow/test/faas/faas_stu_score_avg_print.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
"kis-flow/serialize"
"kis-flow/test/proto"
)
type PrintStuAvgScoreIn struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
type PrintStuAvgScoreOut struct {
serialize.DefaultSerialize
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
for _, row := range rows {
fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
}
return nil
}
與上述函數(shù)一樣,我們依然采用自定義的輸入形參來進行邏輯開發(fā)蛔翅。
11.3.4 單元測試用例
接下來我們來編寫上面Flow的測試用例單元測試敲茄,代碼如下:
kis-flow/test/kis_auto_inject_param_test.go
package test
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/file"
"kis-flow/flow"
"kis-flow/kis"
"kis-flow/test/faas"
"kis-flow/test/proto"
"testing"
)
func TestAutoInjectParamWithConfig(t *testing.T) {
ctx := context.Background()
kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)
// 1. 加載配置文件并構(gòu)建Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. 獲取Flow
flow1 := kis.Pool().GetFlow("StuAvg")
if flow1 == nil {
panic("flow1 is nil")
}
// 3. 提交原始數(shù)據(jù)
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
},
})
_ = flow1.CommitRow(faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
},
})
// 提交原始數(shù)據(jù)(json字符串)
_ = flow1.CommitRow(`{"stu_id":101}`)
// 4. 執(zhí)行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
在提交原始數(shù)據(jù)的時候,我們這里面采用的是使用默認的序列化方式山析,支持json的反序列化支持折汞,在CommitRow()
的時候,一共提交的3條數(shù)據(jù)盖腿,前兩條是提交的結(jié)構(gòu)體數(shù)據(jù),最后一次是提交的json字符串损同,目前都可以支持翩腐。
cd 到kis-flow/test/
下,執(zhí)行:
$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
得到結(jié)果如下:
$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
...
...
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
...
...
Add FlowRouter FlowName=StuAvg
context.Background
====> After CommitSrcData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]]
KisFunctionC, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023af80 ThisFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]] inPut:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
context.Background
====> After commitCurData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]]
KisFunctionE, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023b000 ThisFunctionId:func-7f308d00f4fa49488760ff1dfb85dc46 PrevFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]] inPut:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
stuid: [100], avg score: [2]
stuid: [100], avg score: [2]
stuid: [101], avg score: [0]
--- PASS: TestAutoInjectParamWithConfig (0.01s)
PASS
ok kis-flow/test 0.030s
11.4 【V1.0】 源代碼
https://github.com/aceld/kis-flow/releases/tag/v1.0
作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開源項目地址:https://github.com/aceld/kis-flow
Golang框架實戰(zhàn)-KisFlow流式計算框架專欄
Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
Golang框架實戰(zhàn)-KisFlow流式計算框架(7)-配置導(dǎo)入與導(dǎo)出
Golang框架實戰(zhàn)-KisFlow流式計算框架(8)-KisFlow Action
Golang框架實戰(zhàn)-KisFlow流式計算框架(10)-Flow多副本