Golang框架實戰(zhàn)-KisFlow流式計算框架(12)-基于反射自適應(yīng)注冊FaaS形參類型

Golang框架實戰(zhàn)-KisFlow流式計算框架專欄

Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
Golang框架實戰(zhàn)-KisFlow流式計算框架(7)-配置導(dǎo)入與導(dǎo)出
Golang框架實戰(zhàn)-KisFlow流式計算框架(8)-KisFlow Action
Golang框架實戰(zhàn)-KisFlow流式計算框架(9)-Cache/Params 數(shù)據(jù)緩存與數(shù)據(jù)參數(shù)
Golang框架實戰(zhàn)-KisFlow流式計算框架(10)-Flow多副本


接下來我們來增強KisFlow中Function對業(yè)務(wù)數(shù)據(jù)處理的聚焦堵腹,將之前Function的寫法:

func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName3Handler ----")
    fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return nil
}

是從flow.Input()中 獲取到原始數(shù)據(jù)斥铺,改成可以直接獲取到業(yè)務(wù)想要的具體數(shù)據(jù)結(jié)構(gòu)類型伦吠,而無需斷言等類型判斷和轉(zhuǎn)換玛歌。改成的Function擴展參數(shù)用法大致如下:

proto

type StuScores struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

type StuAvgScore struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

FaaS

type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    proto.StuScores
}

type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    proto.StuAvgScore
}

// AvgStuScore(FaaS) 計算學生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {
        avgScore := proto.StuAvgScore{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }
        // 提交結(jié)果數(shù)據(jù)
        _ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
    }

    return nil
}

這樣帆疟,我們可以通過第三個形式參數(shù)rows直接拿到我們期待的目標輸出結(jié)構(gòu)體數(shù)據(jù)轻要,不需要斷言和轉(zhuǎn)換,更加關(guān)注業(yè)務(wù)方的開發(fā)效率旱爆。

當然,如果你希望獲取到原始的數(shù)據(jù)窘茁,依然可以從flow.Input()中獲取到怀伦。

本章將實現(xiàn)KisFlow上述功能。

11.1 FaaS業(yè)務(wù)回調(diào)函數(shù)自描述

本節(jié)將完成FaaS的自描述概念改造山林,我們知道之前的FaaS回調(diào)如下:

type FaaS func(context.Context, Flow) error

那么我們需要一個結(jié)構(gòu)體房待,來描述這個函數(shù)屬性,包括他的函數(shù)名稱驼抹、函數(shù)地址桑孩、形參數(shù)量、相殘類型框冀、返回值類型等等流椒。

11.1.1 FaaSDesc 回調(diào)自描述類型

kis-flow/kis/下,新創(chuàng)建一個文件faas.go明也,定義如下結(jié)構(gòu)體:

kis-flow/kis/faas.go

// FaaS Function as a Service

// 將
// type FaaS func(context.Context, Flow) error
// 改為
// type FaaS func(context.Context, Flow, ...interface{}) error
// 可以通過可變參數(shù)的任意輸入類型進行數(shù)據(jù)傳遞
type FaaS interface{}

// FaaSDesc FaaS 回調(diào)計算業(yè)務(wù)函數(shù) 描述
type FaaSDesc struct {
    FnName    string         // Function名稱
    f         interface{}    // FaaS 函數(shù)
    fName     string         // 函數(shù)名稱
    ArgsType  []reflect.Type // 函數(shù)參數(shù)類型(集合)
    ArgNum    int            // 函數(shù)參數(shù)個數(shù)
    FuncType  reflect.Type   // 函數(shù)類型
    FuncValue reflect.Value  // 函數(shù)值(函數(shù)地址)
}

將之前的FaaS改進成interface{}宣虾,而FaaSDesc具備了一些屬性。

  • FnName: 表示當前Function的名稱温数,例如我們之前例子的"funcDemo1" 等绣硝,這個是用來KisFlow給Function標識的FunctionName。
  • f:表示定義的FaaS函數(shù)撑刺。
  • fName: 定義f函數(shù)的函數(shù)名稱鹉胖。
  • ArgsType:定義的f函數(shù)的形參類型列表,這是一個slice。
  • ArgNum:定義的f函數(shù)的輸入形參個數(shù)甫菠。
  • FuncType:定義的f函數(shù)的數(shù)據(jù)類型挠铲。
  • FuncValue:定義的f函數(shù)的函數(shù)值(可以被調(diào)度的函數(shù)地址)。

11.1.2 新建一個FaaSDesc對象

下面淑蔚,提供一個新建FaaSDesc的構(gòu)造函數(shù)市殷,形參的類型就是KisFlow的FunctionName和定義的FaaS函數(shù),如下:

kis-flow/kis/faas.go

// NewFaaSDesc 根據(jù)用戶注冊的FnName 和FaaS 回調(diào)函數(shù)刹衫,創(chuàng)建 FaaSDesc 描述實例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
    // 傳入的回調(diào)函數(shù)FaaS,函數(shù)值(函數(shù)地址)
    funcValue := reflect.ValueOf(f)

    // 傳入的回調(diào)函數(shù)FaaS 類型
    funcType := funcValue.Type()

    // 判斷傳遞的FaaS指針是否是函數(shù)類型
    if !isFuncType(funcType) {
        return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
    }

    // 判斷傳遞的FaaS函數(shù)是否有返回值類型是只包括(error)
    if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
        return nil, errors.New("function must have exactly one return value of type error")
    }

    // FaaS函數(shù)的參數(shù)類型集合
    argsType := make([]reflect.Type, funcType.NumIn())

    // 獲取FaaS的函數(shù)名稱
    fullName := runtime.FuncForPC(funcValue.Pointer()).Name()

    // 確保  FaaS func(context.Context, Flow, ...interface{}) error 形參列表醋寝,存在context.Context 和 kis.Flow

    // 是否包含kis.Flow類型的形參
    containsKisFlow := false
    // 是否包含context.Context類型的形參
    containsCtx := false

    // 遍歷FaaS的形參類型
    for i := 0; i < funcType.NumIn(); i++ {

        // 取出第i個形式參數(shù)類型
        paramType := funcType.In(i)

        if isFlowType(paramType) {
            // 判斷是否包含kis.Flow類型的形參
            containsKisFlow = true

        } else if isContextType(paramType) {
            // 判斷是否包含context.Context類型的形參
            containsCtx = true

        } else if isSliceType(paramType) {
            // 判斷是否包含Slice類型的形參

            // 獲取當前參數(shù)Slice的元素類型
            itemType := paramType.Elem()

            // 如果當前參數(shù)是一個指針類型,則獲取指針指向的結(jié)構(gòu)體類型
            if itemType.Kind() == reflect.Ptr {
                itemType = itemType.Elem() // 獲取指針指向的結(jié)構(gòu)體類型
            }
        } else {
            // Other types are not supported...
        }

        // 將當前形參類型追加到argsType集合中
        argsType[i] = paramType
    }

    if !containsKisFlow {
        // 不包含kis.Flow類型的形參带迟,返回錯誤
        return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
    }

    if !containsCtx {
        // 不包含context.Context類型的形參音羞,返回錯誤
        return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
    }

    // 返回FaaSDesc描述實例
    return &FaaSDesc{
        FnName:    fnName,
        f:         f,
        fName:     fullName,
        ArgsType:  argsType,
        ArgNum:    len(argsType),
        FuncType:  funcType,
        FuncValue: funcValue,
    }, nil
}

這里面通過用reflect反射能力,依次從f函數(shù)中獲取相關(guān)的屬性值仓犬,存放在FaaSDesc中嗅绰。
這里面為了確保開發(fā)者傳遞的FaaS原因滿足如下格式:

type FaaS func(context.Context, Flow, ...interface{}) error

所以對形參context.Context和形參Flow做了嚴格的形參類型校驗,其中的校驗方法如下:

kis-flow/kis/faas.go

// isFuncType 判斷傳遞進來的 paramType 是否是函數(shù)類型
func isFuncType(paramType reflect.Type) bool {
    return paramType.Kind() == reflect.Func
}

// isFlowType 判斷傳遞進來的 paramType 是否是 kis.Flow 類型
func isFlowType(paramType reflect.Type) bool {
    var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()

    return paramType.Implements(flowInterfaceType)
}

// isContextType 判斷傳遞進來的 paramType 是否是 context.Context 類型
func isContextType(paramType reflect.Type) bool {
    typeName := paramType.Name()

    return strings.Contains(typeName, "Context")
}

// isSliceType 判斷傳遞進來的 paramType 是否是切片類型
func isSliceType(paramType reflect.Type) bool {
    return paramType.Kind() == reflect.Slice
}

NewFaaSDesc()containsKisFlowcontainsCtx兩個bool類型的變量來判斷是否包括Context和Flow類型搀继。
下面這段代碼是為了兼容傳遞的形參類型是結(jié)構(gòu)體指針時候的兼容:

            // ... ... 

            // 獲得當前形參類型
            itemType := paramType.Elem()

            // 如果當前參數(shù)是一個指針類型窘面,則獲取指針指向的結(jié)構(gòu)體類型
            if itemType.Kind() == reflect.Ptr {
                itemType = itemType.Elem() // 獲取指針指向的結(jié)構(gòu)體類型
            }

            // ... ... 

比如開發(fā)者傳遞的FaaS函數(shù)原型如下:

func MyFaaSDemo(ctx context.Context, flow Flow, []*A) error

和:

func MyFaaSDemo(ctx context.Context, flow Flow, []A) error

11.1.3 注冊FaaS函數(shù)

那么接下來,我們將kisPool模塊叽躯,的注冊FaaS函數(shù)的方法修改成注冊一個FaaSDesc描述财边,修改后的注冊方法如下:

kis-flow/kis/pool.go

// FaaS 注冊 Function 計算業(yè)務(wù)邏輯, 通過Function Name 索引及注冊
func (pool *kisPool) FaaS(fnName string, f FaaS) {

    // 當注冊FaaS計算邏輯回調(diào)時,創(chuàng)建一個FaaSDesc描述對象
    faaSDesc, err := NewFaaSDesc(fnName, f)
    if err != nil {
        panic(err)
    }

    pool.fnLock.Lock() // 寫鎖
    defer pool.fnLock.Unlock()

    if _, ok := pool.fnRouter[fnName]; !ok {
        // 將FaaSDesc描述對象注冊到fnRouter中
        pool.fnRouter[fnName] = faaSDesc
    } else {
        errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
        panic(errString)
    }

    log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}

那么現(xiàn)在的fnRouter中保存的key依然是FunctionName点骑,但是value則為當前FaaS函數(shù)的描述對象FaaSDesc.

11.1.4 kisPool調(diào)度FaaSDesc

最后再調(diào)度Function的時候酣难,通過FaaSDesc取出調(diào)度函數(shù)地址和函數(shù)形參列表進行函數(shù)的調(diào)度。
修改的后的CallFunction()如下:

kis-flow/kis/pool.go

// CallFunction 調(diào)度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

    if funcDesc, ok := pool.fnRouter[fnName]; ok {

        // 被調(diào)度Function的形參列表
        params := make([]reflect.Value, 0, funcDesc.ArgNum)

        for _, argType := range funcDesc.ArgsType {

            // 如果是Flow類型形參黑滴,則將 flow的值傳入
            if isFlowType(argType) {
                params = append(params, reflect.ValueOf(flow))
                continue
            }

            // 如果是Context類型形參憨募,則將 ctx的值傳入
            if isContextType(argType) {
                params = append(params, reflect.ValueOf(ctx))
                continue
            }

            // 如果是Slice類型形參,則將 flow.Input()的值傳入
            if isSliceType(argType) {
                params = append(params, value)
                continue
            }

            // 傳遞的參數(shù)袁辈,既不是Flow類型菜谣,也不是Context類型,也不是Slice類型吵瞻,則默認給到零值
            params = append(params, reflect.Zero(argType))
        }

        // 調(diào)用當前Function 的計算邏輯
        retValues := funcDesc.FuncValue.Call(params)

        // 取出第一個返回值葛菇,如果是nil,則返回nil
        ret := retValues[0].Interface()
        if ret == nil {
            return nil
        }

        // 如果返回值是error類型橡羞,則返回error
        return retValues[0].Interface().(error)

    }

    log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

    return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}

函數(shù)的整體調(diào)度邏輯大致如下:
首選通過fnName進行從fnRouter路由到對應(yīng)的FaaSDesc眯停。遍歷FaaSDesc的形參列表:
將Context和Flow對象依次取出來,將額外傳遞的自定義切片形參取出來卿泽,如果傳遞的參數(shù)莺债,既不是Flow類型滋觉,也不是Context類型,也不是Slice類型齐邦,則默認給到零值椎侠,如下:

            params = append(params, reflect.Zero(argType))

最后執(zhí)行函數(shù)的調(diào)度:

        retValues := funcDesc.FuncValue.Call(params)

得到第一個返回值error的數(shù)值,為nil則返回nil措拇,否則返回error類型我纪。

這樣我們的FaaS自描述的調(diào)度模式就建立成功了,那么有了這套功能KisFlow可以做什么事情呢丐吓,下一節(jié)我們可以再調(diào)度FaaSDesc的時候?qū)鬟f的自定義形參的數(shù)據(jù)類型進行序列化浅悉,得到開發(fā)者期待的數(shù)據(jù)類型。

11.2 FaaS形參的自定義數(shù)據(jù)類型序列化

11.2.1 Serialize序列化接口

首先券犁,我們定義一個數(shù)據(jù)序列化接口术健,在kis-flow/kis/下創(chuàng)建serialize.go 文件,如下:

kis-flow/kis/serialize.go

// Serialize 數(shù)據(jù)序列化接口
type Serialize interface {
    // UnMarshal 用于將 KisRowArr 反序列化為指定類型的值粘衬。
    UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
    // Marshal 用于將指定類型的值序列化為 KisRowArr荞估。
    Marshal(interface{}) (common.KisRowArr, error)
}

其中KisRowArr是我們KisFlow中傳遞每個Function的數(shù)據(jù)切片,之前我們定義在了kis-flow/common/data_type.go中:

package common

// KisRow 一行數(shù)據(jù)
type KisRow interface{}

// KisRowArr 一次業(yè)務(wù)的批量數(shù)據(jù)
type KisRowArr []KisRow

/*
    KisDataMap 當前Flow承載的全部數(shù)據(jù)
    key :  數(shù)據(jù)所在的Function ID
    value: 對應(yīng)的KisRow
*/
type KisDataMap map[string]KisRowArr

Serialize提供了兩個接口:

  • UnMarshal:用于將 KisRowArr 反序列化為指定類型的值稚新。
  • Marshal:用于將指定類型的值序列化為 KisRowArr勘伺。

KisFlow會提供一個默認的Serialize給每個FaaS函數(shù),開發(fā)者也可以自定義自己的Serialize來對FaaS傳遞的形參進行自定義的數(shù)據(jù)序列化動作褂删。

11.2.2 KisFlow默認的Serialize序列化

KisFlow提供一個默認的Serialize序列化實例娇昙,主要以Json格式為主,在kis-flow/下創(chuàng)建serialize/文件夾笤妙,在kis-flow/serialize/下創(chuàng)建serialize_default.go文件,實現(xiàn)的序列化和反序列化代碼如下:

kis-flow/serialize/serialize_default.go

package serialize

import (
    "encoding/json"
    "fmt"
    "kis-flow/common"
    "reflect"
)

type DefaultSerialize struct{}

// UnMarshal 用于將 KisRowArr 反序列化為指定類型的值噪裕。
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
    // 確保傳入的類型是一個切片
    if r.Kind() != reflect.Slice {
        return reflect.Value{}, fmt.Errorf("r must be a slice")
    }

    slice := reflect.MakeSlice(r, 0, len(arr))

    // 遍歷每個元素并嘗試反序列化
    for _, row := range arr {
        var elem reflect.Value
        var err error

        // 嘗試斷言為結(jié)構(gòu)體或指針
        elem, err = unMarshalStruct(row, r.Elem())
        if err == nil {
            slice = reflect.Append(slice, elem)
            continue
        }

        // 嘗試直接反序列化字符串
        elem, err = unMarshalJsonString(row, r.Elem())
        if err == nil {
            slice = reflect.Append(slice, elem)
            continue
        }

        // 嘗試先序列化為 JSON 再反序列化
        elem, err = unMarshalJsonStruct(row, r.Elem())
        if err == nil {
            slice = reflect.Append(slice, elem)
        } else {
            return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
        }
    }

    return slice, nil
}

// Marshal 用于將指定類型的值序列化為 KisRowArr(json 序列化)蹲盘。
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
    var arr common.KisRowArr

    switch reflect.TypeOf(i).Kind() {
    case reflect.Slice, reflect.Array:
        slice := reflect.ValueOf(i)
        for i := 0; i < slice.Len(); i++ {
            // 序列化每個元素為 JSON 字符串,并將其添加到切片中膳音。
            jsonBytes, err := json.Marshal(slice.Index(i).Interface())
            if err != nil {
                return nil, fmt.Errorf("failed to marshal element to JSON: %v  ", err)
            }
            arr = append(arr, string(jsonBytes))
        }
    default:
        // 如果不是切片或數(shù)組類型召衔,則直接序列化整個結(jié)構(gòu)體為 JSON 字符串。
        jsonBytes, err := json.Marshal(i)
        if err != nil {
            return nil, fmt.Errorf("failed to marshal element to JSON: %v  ", err)
        }
        arr = append(arr, string(jsonBytes))
    }

    return arr, nil
}

其中一些函數(shù)定義如下:

kis-flow/serialize/serialize_default.go

// 嘗試斷言為結(jié)構(gòu)體或指針
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
    // 檢查 row 是否為結(jié)構(gòu)體或結(jié)構(gòu)體指針類型
    rowType := reflect.TypeOf(row)
    if rowType == nil {
        return reflect.Value{}, fmt.Errorf("row is nil pointer")
    }
    if rowType.Kind() != reflect.Struct && rowType.Kind() != reflect.Ptr {
        return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
    }

    // 如果 row 是指針類型祭陷,則獲取它指向的類型
    if rowType.Kind() == reflect.Ptr {
        // 空指針
        if reflect.ValueOf(row).IsNil() {
            return reflect.Value{}, fmt.Errorf("row is nil pointer")
        }

        // 解引用
        row = reflect.ValueOf(row).Elem().Interface()

        // 拿到解引用后的類型
        rowType = reflect.TypeOf(row)
    }

    // 檢查是否可以將 row 斷言為 elemType(目標類型)
    if !rowType.AssignableTo(elemType) {
        return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
    }

    // 將 row 轉(zhuǎn)換為 reflect.Value 并返回
    return reflect.ValueOf(row), nil
}

// 嘗試直接反序列化字符串(將Json字符串 反序列化為 結(jié)構(gòu)體)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
    // 判斷源數(shù)據(jù)是否可以斷言成string
    str, ok := row.(string)
    if !ok {
        return reflect.Value{}, fmt.Errorf("not a string")
    }

    // 創(chuàng)建一個新的結(jié)構(gòu)體實例苍凛,用于存儲反序列化后的值
    elem := reflect.New(elemType).Elem()

    // 嘗試將json字符串反序列化為結(jié)構(gòu)體。
    if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
        return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
    }

    return elem, nil
}

// 嘗試先序列化為 JSON 再反序列化(將結(jié)構(gòu)體轉(zhuǎn)換成Json字符串兵志,再將Json字符串 反序列化為 結(jié)構(gòu)體)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
    // 將 row 序列化為 JSON 字符串
    jsonBytes, err := json.Marshal(row)
    if err != nil {
        return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v  ", err)
    }

    // 創(chuàng)建一個新的結(jié)構(gòu)體實例醇蝴,用于存儲反序列化后的值
    elem := reflect.New(elemType).Interface()

    // 將 JSON 字符串反序列化為結(jié)構(gòu)體
    if err := json.Unmarshal(jsonBytes, elem); err != nil {
        return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v  ", err)
    }

    return reflect.ValueOf(elem).Elem(), nil
}

  • UnMarshal(): 的實現(xiàn) 首先判斷形參是否是一個Slice,如果是的話想罕,那么切片中的每個元素的數(shù)據(jù)進行序列化悠栓,優(yōu)先嘗試unMarshalStruct()結(jié)構(gòu)體反序列化,其次嘗試json字符串的反序列化unMarshalJsonString(),最后再嘗試具備相同屬性的結(jié)構(gòu)體但是名稱不同的反序列化unMarshalJsonStruct()惭适。
  • Marshal(): 則是將任意類型序列化為json二進制字符串存儲在KisRowArr中笙瑟。

注意:KisFlow目前的默認序列化只實現(xiàn)了json格式的序列化,開發(fā)者可以參考DefaultSerialize{} 來實現(xiàn)自己其他格式的數(shù)據(jù)序列化和反序列化動作癞志。

11.2.3 默認的默認的Serialize實例

在serialize的接口定義中往枷,定義一個全局默認的序列化實例,defaultSerialize凄杯。

kis-flow/kis/serialize.go

// defaultSerialize KisFlow提供的默認序列化實現(xiàn)(開發(fā)者可以自定義)
var defaultSerialize = &serialize.DefaultSerialize{}

同時提供一個判斷一個數(shù)據(jù)類型是否實現(xiàn)了抽象接口Serialize的校驗方法错洁,如下:

kis-flow/kis/serialize.go

// isSerialize 判斷傳遞進來的 paramType 是否實現(xiàn)了 Serialize 接口
func isSerialize(paramType reflect.Type) bool {
    return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}

11.2.4 FaaSDesc實現(xiàn)Serialize序列化接口

現(xiàn)在將FaaSDesc去繼承且實現(xiàn)Serialize接口,在調(diào)度FaaSDesc的時候?qū)鬟f的輸入?yún)?shù)進行序列化得到相對應(yīng)的具體類型形參盾舌,定義如下:

kis-flow/kis/faas.go

// FaaSDesc FaaS 回調(diào)計算業(yè)務(wù)函數(shù) 描述
type FaaSDesc struct {
    // +++++++
    Serialize                // 當前Function的數(shù)據(jù)輸入輸出序列化實現(xiàn)
    // +++++++
    FnName    string         // Function名稱
    f         interface{}    // FaaS 函數(shù)
    fName     string         // 函數(shù)名稱
    ArgsType  []reflect.Type // 函數(shù)參數(shù)類型(集合)
    ArgNum    int            // 函數(shù)參數(shù)個數(shù)
    FuncType  reflect.Type   // 函數(shù)類型
    FuncValue reflect.Value  // 函數(shù)值(函數(shù)地址)
}

然后墓臭,在構(gòu)造方法NewFaaSDesc()加上對自定義形參的判斷,判斷傳遞的自定義形參是否實現(xiàn)了Serialize的兩個序列化接口妖谴,如果實現(xiàn)了窿锉,則使用自定義的序列化接口,如果沒有實現(xiàn)膝舅,則使用默認的DefaultSerialize{}實例嗡载。

kis-flow/kis/faas.go

// NewFaaSDesc 根據(jù)用戶注冊的FnName 和FaaS 回調(diào)函數(shù),創(chuàng)建 FaaSDesc 描述實例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {

    // ++++++++++
    // 輸入輸出序列化實例
    var serializeImpl Serialize
    // ++++++++++

    // ... ...
    // ... ...
    
    // 遍歷FaaS的形參類型
    for i := 0; i < funcType.NumIn(); i++ {

        // 取出第i個形式參數(shù)類型
        paramType := funcType.In(i)

        if isFlowType(paramType) {
            // 判斷是否包含kis.Flow類型的形參
            containsKisFlow = true

        } else if isContextType(paramType) {
            // 判斷是否包含context.Context類型的形參
            containsCtx = true

        } else if isSliceType(paramType) {

            // 獲取當前參數(shù)Slice的元素類型
            itemType := paramType.Elem()

            // 如果當前參數(shù)是一個指針類型仍稀,則獲取指針指向的結(jié)構(gòu)體類型
            if itemType.Kind() == reflect.Ptr {
                itemType = itemType.Elem() // 獲取指針指向的結(jié)構(gòu)體類型
            }


            // +++++++++++++++++++++++++++++

            // Check if f implements Serialize interface
            // (檢測傳遞的FaaS函數(shù)是否實現(xiàn)了Serialize接口)
            if isSerialize(itemType) {
                // 如果當前形參實現(xiàn)了Serialize接口洼滚,則使用當前形參的序列化實現(xiàn)
                serializeImpl = reflect.New(itemType).Interface().(Serialize)

            } else {
                // 如果當前形參沒有實現(xiàn)Serialize接口,則使用默認的序列化實現(xiàn)
                serializeImpl = defaultSerialize // Use global default implementation
            }
            // +++++++++++++++++++++++++++++++
            
        } else {
            // Other types are not supported
        }

        // 將當前形參類型追加到argsType集合中
        argsType[i] = paramType
    }

    // ... ...
    // ... ...

    // 返回FaaSDesc描述實例
    return &FaaSDesc{
        Serialize: serializeImpl,
        FnName:    fnName,
        f:         f,
        fName:     fullName,
        ArgsType:  argsType,
        ArgNum:    len(argsType),
        FuncType:  funcType,
        FuncValue: funcValue,
    }, nil
}

11.2.5 完成調(diào)度FaaS數(shù)據(jù)序列化

最后在調(diào)度FaaSDesc的時候技潘,解析形參的時候遥巴,如果是自定義的Slice參數(shù),則對齊進行反序列化操作享幽,將flow.Input()的原數(shù)據(jù)反序列化成為開發(fā)者需要的結(jié)構(gòu)體數(shù)據(jù)铲掐,進行調(diào)度FaaS,實現(xiàn)如下:

kis-flow/kis/pool.go

// CallFunction 調(diào)度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

    if funcDesc, ok := pool.fnRouter[fnName]; ok {

        // 被調(diào)度Function的形參列表
        params := make([]reflect.Value, 0, funcDesc.ArgNum)

        for _, argType := range funcDesc.ArgsType {

            // 如果是Flow類型形參值桩,則將 flow的值傳入
            if isFlowType(argType) {
                params = append(params, reflect.ValueOf(flow))
                continue
            }

            // 如果是Context類型形參摆霉,則將 ctx的值傳入
            if isContextType(argType) {
                params = append(params, reflect.ValueOf(ctx))
                continue
            }

            // 如果是Slice類型形參,則將 flow.Input()的值傳入
            if isSliceType(argType) {

                // +++++++++++++++++++
                // 將flow.Input()中的原始數(shù)據(jù)奔坟,反序列化為argType類型的數(shù)據(jù)
                value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
                if err != nil {
                    log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
                } else {
                    params = append(params, value)
                    continue
                }
                // +++++++++++++++++++
            }

            // 傳遞的參數(shù)携栋,既不是Flow類型,也不是Context類型咳秉,也不是Slice類型婉支,則默認給到零值
            params = append(params, reflect.Zero(argType))
        }

        // 調(diào)用當前Function 的計算邏輯
        retValues := funcDesc.FuncValue.Call(params)

        // 取出第一個返回值,如果是nil澜建,則返回nil
        ret := retValues[0].Interface()
        if ret == nil {
            return nil
        }

        // 如果返回值是error類型磅摹,則返回error
        return retValues[0].Interface().(error)

    }

    log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

    return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}


這樣我們就將數(shù)據(jù)序列化的動作和FaaSDesc模塊結(jié)合起來了滋迈,接下來,我們寫一個單元測試來測試這部分的能力户誓。

11.3 自定義形參序列化單元測試

11.3.1 Flow與Function的配置文件定義

單元測試饼灿,我們新建兩個Function配置如下:

kis-flow/test/load_conf/func/func-avgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
    name: 學生平均分
    must:
        - stu_id

kis-flow/test/load_conf/func/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
    name: 學生平均分
    must:
        - stu_id

然后我們來定義一個Flow將上述的兩個Function鏈接起來

kis-flow/test/load_conf/flow/flow-StuAvg.yml

kistype: flow
status: 1
flow_name: StuAvg
flows:
    - fname: AvgStuScore
    - fname: PrintStuAvgScore

11.3.2 自定義基礎(chǔ)數(shù)據(jù)proto定義

kis-flow/test/下創(chuàng)建proto/文件夾,創(chuàng)建一個自定義的基礎(chǔ)數(shù)據(jù)proto帝美,為了今后數(shù)據(jù)協(xié)議的復(fù)用碍彭,如下:

kis-flow/test/proto/stu_score.go

package proto

// 學生學習分數(shù)
type StuScores struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

// 學生的平均分
type StuAvgScore struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

11.3.3 定義兩個FaaS計算回調(diào)函數(shù)

定義兩個FaaS計算函數(shù),一個為計算一個Student的平均分悼潭,一個打印Student的平均分庇忌,如下:

kis-flow/test/faas/faas_stu_score_avg.go

package faas

import (
    "context"
    "kis-flow/kis"
    "kis-flow/serialize"
    "kis-flow/test/proto"
)

type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    proto.StuScores
}

type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    proto.StuAvgScore
}

// AvgStuScore(FaaS) 計算學生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {
        avgScore := proto.StuAvgScore{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }
        // 提交結(jié)果數(shù)據(jù)
        _ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
    }

    return nil
}

AvgStuScore()方法為我們改進之后的FaaS函數(shù),其中第三個形參rows []*AvgStuScoreIn為我們自定義序列化的形參舰褪,之前我們通過flow.Input()來拿到原始的數(shù)據(jù)皆疹,然后進行遍歷,其實現(xiàn)在依然可以這么處理占拍,但是每次可能需要開發(fā)者在FaaS中自行斷言判斷略就,對開發(fā)的效率有些成本,那么現(xiàn)在開發(fā)者完全可以通過AvgStuScoreIn來描述一個形參的數(shù)據(jù)晃酒,然后在AvgStuScore的業(yè)務(wù)中表牢,通過遍歷rows得到已經(jīng)序列化好的結(jié)構(gòu)體,增加的代碼的可讀性也降低的寫業(yè)務(wù)的開發(fā)成本贝次,提高了效率崔兴。
打印平均分的FaaS實現(xiàn)如下:

kis-flow/test/faas/faas_stu_score_avg_print.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
    "kis-flow/serialize"
    "kis-flow/test/proto"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    proto.StuAvgScore
}

type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }

    return nil
}

與上述函數(shù)一樣,我們依然采用自定義的輸入形參來進行邏輯開發(fā)蛔翅。

11.3.4 單元測試用例

接下來我們來編寫上面Flow的測試用例單元測試敲茄,代碼如下:

kis-flow/test/kis_auto_inject_param_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/file"
    "kis-flow/flow"
    "kis-flow/kis"
    "kis-flow/test/faas"
    "kis-flow/test/proto"
    "testing"
)

func TestAutoInjectParamWithConfig(t *testing.T) {
    ctx := context.Background()

    kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)

    // 1. 加載配置文件并構(gòu)建Flow
    if err := file.ConfigImportYaml("load_conf/"); err != nil {
        panic(err)
    }

    // 2. 獲取Flow
    flow1 := kis.Pool().GetFlow("StuAvg")
    if flow1 == nil {
        panic("flow1 is nil")
    }

    // 3. 提交原始數(shù)據(jù)
    _ = flow1.CommitRow(&faas.AvgStuScoreIn{
        StuScores: proto.StuScores{
            StuId:  100,
            Score1: 1,
            Score2: 2,
            Score3: 3,
        },
    })
    _ = flow1.CommitRow(faas.AvgStuScoreIn{
        StuScores: proto.StuScores{
            StuId:  100,
            Score1: 1,
            Score2: 2,
            Score3: 3,
        },
    })

    // 提交原始數(shù)據(jù)(json字符串)
    _ = flow1.CommitRow(`{"stu_id":101}`)

    // 4. 執(zhí)行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}


在提交原始數(shù)據(jù)的時候,我們這里面采用的是使用默認的序列化方式山析,支持json的反序列化支持折汞,在CommitRow()的時候,一共提交的3條數(shù)據(jù)盖腿,前兩條是提交的結(jié)構(gòu)體數(shù)據(jù),最后一次是提交的json字符串损同,目前都可以支持翩腐。

cd 到kis-flow/test/下,執(zhí)行:

$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig

得到結(jié)果如下:

$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
...
...
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
...
...
Add FlowRouter FlowName=StuAvg
context.Background
====> After CommitSrcData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
 map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]]

KisFunctionC, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023af80 ThisFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]] inPut:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

context.Background
 ====> After commitCurData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
 map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]]

KisFunctionE, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023b000 ThisFunctionId:func-7f308d00f4fa49488760ff1dfb85dc46 PrevFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]] inPut:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

stuid: [100], avg score: [2]
stuid: [100], avg score: [2]
stuid: [101], avg score: [0]
--- PASS: TestAutoInjectParamWithConfig (0.01s)
PASS
ok      kis-flow/test   0.030s

11.4 【V1.0】 源代碼

https://github.com/aceld/kis-flow/releases/tag/v1.0


作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開源項目地址:https://github.com/aceld/kis-flow

Golang框架實戰(zhàn)-KisFlow流式計算框架專欄

Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
Golang框架實戰(zhàn)-KisFlow流式計算框架(7)-配置導(dǎo)入與導(dǎo)出
Golang框架實戰(zhàn)-KisFlow流式計算框架(8)-KisFlow Action
Golang框架實戰(zhàn)-KisFlow流式計算框架(10)-Flow多副本

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末膏燃,一起剝皮案震驚了整個濱河市茂卦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌组哩,老刑警劉巖等龙,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件处渣,死亡現(xiàn)場離奇詭異,居然都是意外死亡蛛砰,警方通過查閱死者的電腦和手機罐栈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來泥畅,“玉大人荠诬,你說我怎么就攤上這事∥蝗剩” “怎么了柑贞?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長聂抢。 經(jīng)常有香客問我钧嘶,道長,這世上最難降的妖魔是什么琳疏? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任有决,我火速辦了婚禮,結(jié)果婚禮上轿亮,老公的妹妹穿的比我還像新娘疮薇。我一直安慰自己,他們只是感情好我注,可當我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布按咒。 她就那樣靜靜地躺著,像睡著了一般但骨。 火紅的嫁衣襯著肌膚如雪励七。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天奔缠,我揣著相機與錄音掠抬,去河邊找鬼。 笑死校哎,一個胖子當著我的面吹牛两波,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播闷哆,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼腰奋,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了抱怔?” 一聲冷哼從身側(cè)響起劣坊,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎屈留,沒想到半個月后局冰,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冤今,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡万俗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年讽挟,在試婚紗的時候發(fā)現(xiàn)自己被綠了递沪。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡赠摇,死狀恐怖固逗,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情藕帜,我是刑警寧澤烫罩,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站洽故,受9級特大地震影響贝攒,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜时甚,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一隘弊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧荒适,春花似錦梨熙、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至陕壹,卻和暖如春质欲,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背糠馆。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工嘶伟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人又碌。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓九昧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親毕匀。 傳聞我的和親對象是個殘疾皇子铸鹰,可洞房花燭夜當晚...
    茶點故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內(nèi)容