Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector

Golang框架實戰(zhàn)-KisFlow流式計算框架專欄


Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector

本章將設(shè)計KisFlow的Connector模塊,期功能及作用主要為掛載在某個Function下,執(zhí)行第三方存儲引擎的邏輯档桃。

5.1 Connector定義

KisFlow中提供Connector寂殉,來給開發(fā)者定義第三方存儲引擎的自定義讀寫插件模式。如果數(shù)據(jù)流的數(shù)據(jù)需要臨時從某引擎去讀或者需要存儲到某個存儲引擎中,可以通過Connector來編寫相對應的讀寫邏輯,并且通過配置,自定義掛載在Flow中的某個Function中播赁。Connector也是靈活配置的。這樣具有相同邏輯的存儲算法可以在多個Function中進行復用吼渡。

5.1.1 Connector的抽象層定義

kis-flow/kis/創(chuàng)建connector.go文件容为,這里用來定義Connector的抽象接口,如下:

kis-flow/kis/connector.go

package kis

import (
    "context"
    "kis-flow/config"
)

type Connector interface {
    // Init 初始化Connector所關(guān)聯(lián)的存儲引擎鏈接等
    Init() error
    // Call 調(diào)用Connector 外掛存儲邏輯的讀寫操作
    Call(ctx context.Context, flow Flow, args interface{}) error
    // GetId 獲取Connector的ID
    GetId() string
    // GetName 獲取Connector的名稱
    GetName() string
    // GetConfig 獲取Connector的配置信息
    GetConfig() *config.KisConnConfig
}

Connector 目前階段提供的主要接口有兩個:

  • Init() 主要為當前Connector所關(guān)聯(lián)的第三方存儲引擎的初始化邏輯寺酪,如創(chuàng)建鏈接登操作坎背,Init在Connector實例的生命周期只會被執(zhí)行一次。
  • Call() 主要為Connector的調(diào)度入口寄雀,相關(guān)存儲的讀寫自定義邏輯是通過Call()方法來觸發(fā)調(diào)度得滤,具體的回調(diào)函數(shù)原型在Router模塊定義。

5.1.2 Connector相關(guān)路由成員類型定義

通過上述的接口盒犹,我們得知懂更,一個Connector實例要配置兩個自定義方法,一個通過Init() 接口來調(diào)用急膀,一個通過Call() 接口來調(diào)用沮协。下面就需要對這兩種回調(diào)原型做定義。

(1) Connector Init

kis-flow/kis/router.go

/*
    Connector Init
*/
// ConnInit Connector 第三方掛載存儲初始化
type ConnInit func(conn Connector) error

// connInitRouter
//key:
type connInitRouter map[string]ConnInit

  • ConnInit為初始化回調(diào)函數(shù)原型卓嫂,參數(shù)為當前Connector實例指針皂股。
  • connInitRouter為管理ConnInit的路由,key為ConnName命黔。

(2)Connector Call

kis-flow/kis/router.go

/*
    Connector Call
*/
// CaaS Connector的存儲讀取業(yè)務(wù)實現(xiàn)
type CaaS func(context.Context, Connector, Function, Flow, interface{}) error

// connFuncRouter 通過FunctionName索引到CaaS回調(diào)存儲業(yè)務(wù)的映射關(guān)系
// key: Function Name
// value: Connector的存儲讀取業(yè)務(wù)實現(xiàn)
type connFuncRouter map[string]CaaS

  • CaaS為Connector執(zhí)行存儲讀寫邏輯的自定義回調(diào)函數(shù)原型,參數(shù)為Connector就斤、Function悍募、Flow指針,開發(fā)者可以通過這三個實例得到業(yè)務(wù)想要的一些參數(shù)洋机。最后一個參數(shù)為自定義參數(shù)坠宴,在Fcuntion調(diào)度Connector的時候,可以由開發(fā)者自定義輸入绷旗。
  • connFuncRouter為管理CaaS的路由喜鼓,注意:key為當前Connector所掛載的Function Name副砍。所以在調(diào)度Connector Call必須是由Function進行調(diào)度。

這里由于Connector只有Function位Save或者Load模式才能夠掛載庄岖,為了今后方便統(tǒng)計或者索引豁翎,還需要針對connFuncRouter進行分組,分成Save組和Load組隅忿。下面針對這兩個組進行成員映射關(guān)系類型的定義心剥,如下:

kis-flow/kis/router.go

// connSL 通過KisMode 將connFuncRouter分為兩個子樹
// key: Function KisMode S/L
// value: NsConnRouter
type connSL map[common.KisMode]connFuncRouter

// connTree
// key: Connector Name
// value: connSL 二級樹
type connTree map[string]connSL
  • connSL是根據(jù)KisMode分的組,成員為之前的Connector的Call路由器背桐。
  • connTree是通過Connector Name進行索引的映射管理优烧,通過Connector Name+Function Mode + Function Name可以確定調(diào)度的Connector Call函數(shù)。

5.2 Connector路由管理

上一節(jié)對Connector需要的路由管理類型做了定義链峭,接下來需要將這些路由的添加與調(diào)度進行管理畦娄。
Connector的路由管理和Function一樣,統(tǒng)一被KisPool模塊進行管理弊仪。

5.2.1 KisPool新增Connector相關(guān)路由管理成員

(1) 添加成員

kis-flow/kis/pool.go

//  kisPool 用于管理全部的Function和Flow配置的池子
type kisPool struct {
    fnRouter funcRouter   // 全部的Function管理路由
    fnLock   sync.RWMutex // fnRouter 鎖

    flowRouter flowRouter   // 全部的flow對象
    flowLock   sync.RWMutex // flowRouter 鎖

    // +++++++++++++++++ 
    cInitRouter connInitRouter // 全部的Connector初始化路由
    ciLock      sync.RWMutex   // cInitRouter 鎖

    cTree      connTree             //全部Connector管理路由
    connectors map[string]Connector // 全部的Connector對象
    cLock      sync.RWMutex         // cTree 鎖
    // +++++++++++++++++ 
}

(2) 相關(guān)Map變量初始化

kis-flow/kis/pool.go

// Pool 單例構(gòu)造
func Pool() *kisPool {
    _poolOnce.Do(func() {
        //創(chuàng)建kisPool對象
        _pool = new(kisPool)

        // fnRouter初始化
        _pool.fnRouter = make(funcRouter)

        // flowRouter初始化
        _pool.flowRouter = make(flowRouter)

        // +++++++++++++++++++++++++
        // connTree初始化
        _pool.cTree = make(connTree)
        _pool.cInitRouter = make(connInitRouter)
        _pool.connectors = make(map[string]Connector)
        // +++++++++++++++++++++++++
    })

    return _pool
}

(3) 注冊Connector Init方法

kis-flow/kis/pool.go

// CaaSInit 注冊Connector初始化業(yè)務(wù)
func (pool *kisPool) CaaSInit(cname string, c ConnInit) {
    pool.ciLock.Lock() // 寫鎖
    defer pool.ciLock.Unlock()

    if _, ok := pool.cInitRouter[cname]; !ok {
        pool.cInitRouter[cname] = c
    } else {
        errString := fmt.Sprintf("KisPool Reg CaaSInit Repeat CName=%s\n", cname)
        panic(errString)
    }

    log.Logger().InfoF("Add KisPool CaaSInit CName=%s", cname)
}

(4) 執(zhí)行Connector Init方法

kis-flow/kis/pool.go

// CallConnInit 調(diào)度 ConnInit
func (pool *kisPool) CallConnInit(conn Connector) error {
    pool.ciLock.RLock() // 讀鎖
    defer pool.ciLock.RUnlock()

    init, ok := pool.cInitRouter[conn.GetName()]

    if !ok {
        panic(errors.New(fmt.Sprintf("init connector cname = %s not reg..", conn.GetName())))
    }

    return init(conn)
}

邏輯很簡單熙卡,先加鎖保護,然后進行成員key/value鍵值對添加撼短。這里由于是路由動作再膳,添加失敗則直接panic()退出進程。

(5) 注冊Connector Call方法

kis-flow/kis/pool.go

// CaaS 注冊Connector Call業(yè)務(wù)
func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c CaaS) {
    pool.cLock.Lock() // 寫鎖
    defer pool.cLock.Unlock()

    if _, ok := pool.cTree[cname]; !ok {
        //cid 首次注冊曲横,不存在喂柒,創(chuàng)建二級樹NsConnSL
        pool.cTree[cname] = make(connSL)

        //初始化各類型FunctionMode
        pool.cTree[cname][common.S] = make(connFuncRouter)
        pool.cTree[cname][common.L] = make(connFuncRouter)
    }

    if _, ok := pool.cTree[cname][mode][fname]; !ok {
        pool.cTree[cname][mode][fname] = c
    } else {
        errString := fmt.Sprintf("CaaS Repeat CName=%s, FName=%s, Mode =%s\n", cname, fname, mode)
        panic(errString)
    }

    log.Logger().InfoF("Add KisPool CaaS CName=%s, FName=%s, Mode =%s", cname, fname, mode)
}

KisPool的CaaS方法就是注冊自己的Connector連接器的邏輯處理回調(diào)函數(shù),注冊的時候禾嫉,會通過傳遞進來的參數(shù)將函數(shù)注冊到相對應的路由分組中去灾杰。

(6) 執(zhí)行Connector Call方法

kis-flow/kis/pool.go

// CallConnector 調(diào)度 Connector
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) error {
    fn := flow.GetThisFunction()
    fnConf := fn.GetConfig()
    mode := common.KisMode(fnConf.FMode)

    if callback, ok := pool.cTree[conn.GetName()][mode][fnConf.FName]; ok {
        return callback(ctx, conn, fn, flow, args)
    }

    log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)

    return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}

CallConnector則是做通過ConnectorName、Function Mode熙参、Function Name做索引找到對應的CaaS函數(shù)艳吠,并且執(zhí)行。

5.3 KisConnector

接下來我們來按照Connector的抽象層孽椰,來定義且實現(xiàn)KisConnector模塊昭娩。在kis-flow/conn/目錄下創(chuàng)建kis_connector.go文件。

5.3.1 KisConnector 定義

kis-flow/conn/kis_connector.go

package conn

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/id"
    "kis-flow/kis"
    "sync"
)

type KisConnector struct {
    // Connector ID
    CId string
    // Connector Name
    CName string
    // Connector Config
    Conf *config.KisConnConfig

    // Connector Init
    onceInit sync.Once
}

KisConnector 除了標識實例的KisID 還有 名稱外黍匾,還包含了當前KisConnector的配置信息KisConnConfig栏渺,這里面具有一個sync.Once成員,這個是用來控制Connector Init在生命周期只被執(zhí)行一次的限定锐涯,稍后會提到磕诊。

5.3.1 KisConnector 構(gòu)造方法

kis-flow/conn/kis_connector.go

// NewKisConnector 根據(jù)配置策略創(chuàng)建一個KisConnector
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
    conn := new(KisConnector)
    conn.CId = id.KisID(common.KisIdTypeConnnector)
    conn.CName = config.CName
    conn.Conf = config

    return conn
}

創(chuàng)建一個KisConnector實例是需要先有KisConnConfig的配置信息。

5.3.2 實現(xiàn)Connector接口

kis-flow/conn/kis_connector.go

// Init 初始化Connector所關(guān)聯(lián)的存儲引擎鏈接等
func (conn *KisConnector) Init() error {
    var err error

    //一個Connector只能執(zhí)行初始化業(yè)務(wù)一次
    conn.onceInit.Do(func() {
        err = kis.Pool().CallConnInit(conn)
    })

    return err
}

// Call 調(diào)用Connector 外掛存儲邏輯的讀寫操作
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error {
    if err := kis.Pool().CallConnector(ctx, flow, conn, args); err != nil {
        return err
    }

    return nil
}

func (conn *KisConnector) GetName() string {
    return conn.CName
}

func (conn *KisConnector) GetConfig() *config.KisConnConfig {
    return conn.Conf
}

func (conn *KisConnector) GetId() string {
    return conn.CId
}

  • Init()方法通過Once來限定調(diào)用次數(shù),最終會通過KisPool來進行路由映射且調(diào)度霎终。
  • Call()同樣通過KisPool進行調(diào)度滞磺。

那么KisConnector的Init()和Call()在什么時候會被調(diào)用呢? 接下來我們需要實現(xiàn)KisConnConfig莱褒,將Connector的層級關(guān)系與Function和Flow進行關(guān)聯(lián)击困。

5.4 KisConnConfig配置信息

NewKisConnector()形參為KisConnConfig,所以開發(fā)者在創(chuàng)建一個Connector實例保礼,首先要先創(chuàng)建一個Connector的配置信息KisConnConfig沛励。在V0.1版本中,我們已經(jīng)實現(xiàn)了KisConnConfig模塊的定義以及創(chuàng)建炮障,代碼如下:

kis-flow/config/kis_conn_config.go

package config

import (
    "errors"
    "fmt"
    "kis-flow/common"
)

// KisConnConfig KisConnector 策略配置
type KisConnConfig struct {
    //配置類型
    KisType string `yaml:"kistype"`
    //唯一描述標識
    CName string `yaml:"cname"`
    //基礎(chǔ)存儲媒介地址
    AddrString string `yaml:"addrs"`
    //存儲媒介引擎類型"Mysql" "Redis" "Kafka"等
    Type common.KisConnType `yaml:"type"`
    //一次存儲的標識:如Redis為Key名稱目派、Mysql為Table名稱,Kafka為Topic名稱等
    Key string `yaml:"key"`
    //配置信息中的自定義參數(shù)
    Params map[string]string `yaml:"params"`
    //存儲讀取所綁定的NsFuncionID
    Load []string `yaml:"load"`
    Save []string `yaml:"save"`
}

// NewConnConfig 創(chuàng)建一個KisConnector策略配置對象, 用于描述一個KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
    strategy := new(KisConnConfig)
    strategy.CName = cName
    strategy.AddrString = addr

    strategy.Type = t
    strategy.Key = key
    strategy.Params = param

    return strategy
}

// WithFunc Connector與Function進行關(guān)系綁定
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {

    switch common.KisMode(fConfig.FMode) {
    case common.S:
        cConfig.Save = append(cConfig.Save, fConfig.FName)
    case common.L:
        cConfig.Load = append(cConfig.Load, fConfig.FName)
    default:
        return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
    }

    return nil
}

5.4.1 KisFuncConfig 添加KisConnConfig成員

首先我們給KisFuncConfig 添加KisConnConfig成員。

kis-flow/config/kis_func_config.go

// KisFuncConfig 一個KisFunction策略配置
type KisFuncConfig struct {
    KisType  string        `yaml:"kistype"`
    FName    string        `yaml:"fname"`
    FMode    string        `yaml:"fmode"`
    Source   KisSource     `yaml:"source"`
    Option   KisFuncOption `yaml:"option"`
    // ++++++++++
    connConf *KisConnConfig
}

然后提供對該成員的添加和讀取方法

kis-flow/config/kis_func_config.go

func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
    if cConf == nil {
        return errors.New("KisConnConfig is nil")
    }

    // Function需要和Connector進行關(guān)聯(lián)
    fConf.connConf = cConf

    // Connector需要和Function進行關(guān)聯(lián)
    _ = cConf.WithFunc(fConf)

    return nil
}

func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
    if fConf.connConf == nil {
        return nil, errors.New("KisFuncConfig.connConf not set")
    }

    return fConf.connConf, nil
}

這樣我們可以通過Funciton的配置信息就能夠查到相關(guān)聯(lián)的Connector配置信息胁赢。

5.5 Function/Flow與Connector關(guān)聯(lián)

5.5.1 Function與Connector關(guān)聯(lián)

kis-flow/kis/function.go

package kis

import (
    "context"
    "kis-flow/config"
)

// Function 流式計算基礎(chǔ)計算模塊企蹭,KisFunction是一條流式計算的基本計算邏輯單元,
//             任意個KisFunction可以組合成一個KisFlow
type Function interface {
    // Call 執(zhí)行流式計算邏輯
    Call(ctx context.Context, flow Flow) error

    // SetConfig 給當前Function實例配置策略
    SetConfig(s *config.KisFuncConfig) error
    // GetConfig 獲取當前Function實例配置策略
    GetConfig() *config.KisFuncConfig

    // SetFlow 給當前Function實例設(shè)置所依賴的Flow實例
    SetFlow(f Flow) error
    // GetFlow 獲取當前Functioin實力所依賴的Flow
    GetFlow() Flow

    // ++++++++++++++++++++
    // AddConnector 給當前Function實例添加一個Connector
    AddConnector(conn Connector) error
    // GetConnector 獲取當前Function實例所關(guān)聯(lián)的Connector
    GetConnector() Connector
    // ++++++++++++++++++++

    // CreateId 給當前Funciton實力生成一個隨機的實例KisID
    CreateId()
    // GetId 獲取當前Function的FID
    GetId() string
    // GetPrevId 獲取當前Function上一個Function節(jié)點FID
    GetPrevId() string
    // GetNextId 獲取當前Function下一個Function節(jié)點FID
    GetNextId() string

    // Next 返回下一層計算流Function智末,如果當前層為最后一層谅摄,則返回nil
    Next() Function
    // Prev 返回上一層計算流Function,如果當前層為最后一層系馆,則返回nil
    Prev() Function
    // SetN 設(shè)置下一層Function實例
    SetN(f Function)
    // SetP 設(shè)置上一層Function實例
    SetP(f Function)
}

接下來我們在BaseFunction中實現(xiàn):

kis-flow/function/kis_base_function.go

type BaseFunction struct {
    // Id , KisFunction的實例ID送漠,用于KisFlow內(nèi)部區(qū)分不同的實例對象
    Id     string
    Config *config.KisFuncConfig

    // flow
    flow kis.Flow //上下文環(huán)境KisFlow

    // ++++++++++++++
    // connector
    connector kis.Connector
    // ++++++++++++++

    // link
    N kis.Function //下一個流計算Function
    P kis.Function //上一個流計算Function
}

// ........
// ........

// AddConnector 給當前Function實例添加一個Connector
func (base *BaseFunction) AddConnector(conn kis.Connector) error {
    if conn == nil {
        return errors.New("conn is nil")
    }

    base.connector = conn

    return nil
}

// GetConnector 獲取當前Function實例所關(guān)聯(lián)的Connector
func (base *BaseFunction) GetConnector() kis.Connector {
    return base.connector
}

這樣一個Function實例就可以獲取到Connector實例的信息。

5.5.2 Flow與Connector關(guān)聯(lián)

同樣由蘑,F(xiàn)low中也需要獲取到Connector的信息闽寡,這里面也需要將Flow和Connector的關(guān)系進行簡單的關(guān)聯(lián)。

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run 調(diào)度Flow尼酿,依次調(diào)度Flow中的Function并且執(zhí)行
    Run(ctx context.Context) error
    // Link 將Flow中的Function按照配置文件中的配置進行連接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交Flow數(shù)據(jù)到即將執(zhí)行的Function層
    CommitRow(row interface{}) error
    // Input 得到flow當前執(zhí)行Function的輸入源數(shù)據(jù)
    Input() common.KisRowArr
    // GetName 得到Flow的名稱
    GetName() string
    // GetThisFunction 得到當前正在執(zhí)行的Function
    GetThisFunction() Function
    // GetThisFuncConf 得到當前正在執(zhí)行的Function的配置
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector 得到當前正在執(zhí)行的Function的Connector
    // +++++++++++++++++++++++++++++++++
    GetConnector() (Connector, error)
    // GetConnConf 得到當前正在執(zhí)行的Function的Connector的配置
    GetConnConf() (*config.KisConnConfig, error)
    // +++++++++++++++++++++++++++++++++
}

新增 GetConnector()GetConnConf()接口爷狈,分別獲取Connector實例和Connector配置。
之后裳擎,在KisFlow中實現(xiàn)這兩個方法涎永。

kis-flow/flow/kis_flow.go

// GetConnector 得到當前正在執(zhí)行的Function的Connector
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
    if conn := flow.ThisFunction.GetConnector(); conn != nil {
        return conn, nil
    } else {
        return nil, errors.New("GetConnector(): Connector is nil")
    }
}

// GetConnConf 得到當前正在執(zhí)行的Function的Connector的配置
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
    if conn := flow.ThisFunction.GetConnector(); conn != nil {
        return conn.GetConfig(), nil
    } else {
        return nil, errors.New("GetConnConf(): Connector is nil")
    }
}

實際上依然是通過當前Flow正在執(zhí)行的Function來獲取到Connector信息。

5.5.3 Function鏈接Connector

按照之前的配置文件定義鹿响,function的yaml配置文件如下:

nstype: func
fname: 測試Nsfunction_L1
fmode: Load
source:
  name: 被校驗的測試數(shù)據(jù)源1-學生班級維度
  must:
    - stuid
    - classid
option:
  cname: 測試NsConnector_2

這里面有一個Option羡微,其中有一個成員cname,如果當前Function配置了Connector惶我,則需要在當前的Option中配置cname拷淘,并填寫Connector的名稱cname。

那么在當Flow去鏈接一個Funciton的時候指孤,在Function的實例被創(chuàng)建之后,可以通過Function的配置得到是否攜帶Connector,如果攜帶則也要新建Connector實例恃轩,代碼如下:

kis-flow/flow/kis_flow.go

// Link 將Function鏈接到Flow中
// fConf: 當前Function策略
// fParams: 當前Flow攜帶的Function動態(tài)參數(shù)
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
    // 創(chuàng)建Function實例
    f := function.NewKisFunction(flow, fConf)

    // ++++++++++++++++++++++++++++++
    if fConf.Option.CName != "" {
        // 當前Function有Connector關(guān)聯(lián)结洼,需要初始化Connector實例

        // 獲取Connector配置
        connConfig, err := fConf.GetConnConfig()
        if err != nil {
            panic(err)
        }

        // 創(chuàng)建Connector對象
        connector := conn.NewKisConnector(connConfig)

        // 初始化Connector, 執(zhí)行Connector Init 方法
        if err = connector.Init(); err != nil {
            panic(err)
        }

        // 關(guān)聯(lián)Function實例和Connector實例關(guān)系
        _ = f.AddConnector(connector)
    }
    // ++++++++++++++++++++++++++++++

    
    // Flow 添加 Function
    if err := flow.appendFunc(f, fParams); err != nil {
        return err
    }

    return nil
}

這樣一個Connector實例就被創(chuàng)建了。

5.6 KisConnector單元測試

接下來來對KisConnector做單元測試叉跛。

5.6.1 單元測試

創(chuàng)建kis-flow/test/kis_connector_test.go文件:

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/flow"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestNewKisConnector(t *testing.T) {

    ctx := context.Background()

    // 0. 注冊Function 回調(diào)業(yè)務(wù)
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注冊ConnectorInit 和 Connector 回調(diào)業(yè)務(wù)
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 創(chuàng)建3個KisFunction配置實例, 其中myFuncConfig2 有Connector配置
    source1 := config.KisSource{
        Name: "公眾號抖音商城戶訂單數(shù)據(jù)",
        Must: []string{"order_id", "user_id"},
    }

    source2 := config.KisSource{
        Name: "用戶訂單錯誤率",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
    if myFuncConfig1 == nil {
        panic("myFuncConfig1 is nil")
    }

    option := config.KisFuncOption{
        CName: "ConnName1",
    }

    myFuncConfig2 := config.NewFuncConfig("funcName2", common.S, &source2, &option)
    if myFuncConfig2 == nil {
        panic("myFuncConfig2 is nil")
    }

    myFuncConfig3 := config.NewFuncConfig("funcName3", common.E, &source2, nil)
    if myFuncConfig3 == nil {
        panic("myFuncConfig3 is nil")
    }

    // 2. 創(chuàng)建一個KisConnector配置實例
    myConnConfig1 := config.NewConnConfig("ConnName1", "0.0.0.0:9998", common.REDIS, "redis-key", nil)
    if myConnConfig1 == nil {
        panic("myConnConfig1 is nil")
    }

    // 3. 將KisConnector配置實例綁定到KisFunction配置實例上
    _ = myFuncConfig2.AddConnConfig(myConnConfig1)

    // 4. 創(chuàng)建一個 KisFlow 配置實例
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 5. 創(chuàng)建一個KisFlow對象
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 6. 拼接Functioin 到 Flow 上
    if err := flow1.Link(myFuncConfig1, nil); err != nil {
        panic(err)
    }
    if err := flow1.Link(myFuncConfig2, nil); err != nil {
        panic(err)
    }
    if err := flow1.Link(myFuncConfig3, nil); err != nil {
        panic(err)
    }

    // 7. 提交原始數(shù)據(jù)
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 8. 執(zhí)行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

注意這里面funcName2 為關(guān)聯(lián)Connector的Function松忍。 所以在創(chuàng)建Function2的Config的時候提供了Option信息,且提供了關(guān)聯(lián)的Connector名稱筷厘。

5.6.2 Function Callback 與 Conn CallBack

為了方便管理CallBack業(yè)務(wù)鸣峭,我們在kis-flow/test/目錄下,分別創(chuàng)建kis-flow/test/faas/kis-flow/test/caas/目錄酥艳。
分別建立如下文件摊溶,每一個文件寫一種自定義業(yè)務(wù)。

├── caas
│   ├── caas_demo1.go
│   └── caas_init1.go
├── faas
│   ├── faas_demo1.go
│   ├── faas_demo2.go
│   └── faas_demo3.go

(1) FuncName1 的回調(diào)業(yè)務(wù)

kis-flow/test/faas/faas_demo1.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName1Handler ----")

    for index, row := range flow.Input() {
        // 打印數(shù)據(jù)
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // 計算結(jié)果數(shù)據(jù)
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // 提交結(jié)果數(shù)據(jù)
        _ = flow.CommitRow(resultStr)
    }

    return nil
}

這個作為我們第一個Function充石,打印數(shù)據(jù)莫换,并且再生產(chǎn)一些數(shù)據(jù)。

(2) FuncName2 的回調(diào)業(yè)務(wù)

kis-flow/test/faas/faas_demo2.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
    "kis-flow/log"
)

// type FaaS func(context.Context, Flow) error

func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName2Handler ----")

    for index, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        conn, err := flow.GetConnector()
        if err != nil {
            log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
            return err
        }

        if conn.Call(ctx, flow, row) != nil {
            log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
            return err
        }

        // 計算結(jié)果數(shù)據(jù)
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // 提交結(jié)果數(shù)據(jù)
        _ = flow.CommitRow(resultStr)
    }

    return nil
}

FuncName2 是一個關(guān)聯(lián)Connector的業(yè)務(wù)骤铃。通過flow.GetConnector()可以得到Connector實例拉岁,然后可以通過執(zhí)行conn.Call()來執(zhí)行業(yè)務(wù)邏輯。

(3) FuncName3 的回調(diào)業(yè)務(wù)

kis-flow/test/faas/faas_demo3.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName3Handler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return nil
}

(4) ConnName1 Init方法

kis-flow/test/caas/caas_init1.go

package caas

import (
    "fmt"
    "kis-flow/kis"
)

// type ConnInit func(conn Connector) error

func InitConnDemo1(connector kis.Connector) error {
    fmt.Println("===> Call Connector InitDemo1")
    //config info
    connConf := connector.GetConfig()

    fmt.Println(connConf)

    // init connector , 如 初始化數(shù)據(jù)庫鏈接等

    return nil
}

(5) ConnName1 的回調(diào)業(yè)務(wù)

kis-flow/test/caas/caas_demo1.go

package caas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type CaaS func(context.Context, Connector, Function, Flow, interface{}) error

func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
    fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
        flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

    fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)

    return nil
}

5.6.3 運行

cd到kis-flow/test/下惰爬,執(zhí)行命令:

go test -test.v -test.paniconexit0 -test.run TestNewKisConnector

結(jié)果如下:

=== RUN   TestNewKisConnector
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{ ConnName1 0.0.0.0:9998 redis redis-key map[] [] [funcName2]}
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c240 ThisFunctionId:func-f594da0e28da417db6b15ce9c9530f84 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c2a0 ThisFunctionId:func-f0b4bebf87614828a9375d888c54d13b PrevFunctionId:func-f594da0e28da417db6b15ce9c9530f84 funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionE, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c300 ThisFunctionId:func-66e2b0afa4e14d179aa94c357c412cf8 PrevFunctionId:func-f0b4bebf87614828a9375d888c54d13b funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 2
--- PASS: TestNewKisConnector (0.00s)
PASS

經(jīng)過仔細查看日志喊暖,得知Connector的Init被執(zhí)行,請Connector也在執(zhí)行FunctionName2期間也被同步執(zhí)行且有日志輸出撕瞧,結(jié)果和我們的預期一致陵叽。

5.7 【V0.4】源代碼

https://github.com/aceld/kis-flow/releases/tag/v0.4


作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開源項目地址:https://github.com/aceld/kis-flow


Golang框架實戰(zhàn)-KisFlow流式計算框架專欄

Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市风范,隨后出現(xiàn)的幾起案子咨跌,更是在濱河造成了極大的恐慌,老刑警劉巖硼婿,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锌半,死亡現(xiàn)場離奇詭異,居然都是意外死亡寇漫,警方通過查閱死者的電腦和手機刊殉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來州胳,“玉大人记焊,你說我怎么就攤上這事∷ㄗ玻” “怎么了遍膜?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵碗硬,是天一觀的道長。 經(jīng)常有香客問我瓢颅,道長恩尾,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任挽懦,我火速辦了婚禮翰意,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘信柿。我一直安慰自己冀偶,他們只是感情好,可當我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布渔嚷。 她就那樣靜靜地躺著进鸠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪圃伶。 梳的紋絲不亂的頭發(fā)上堤如,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機與錄音窒朋,去河邊找鬼搀罢。 笑死,一個胖子當著我的面吹牛侥猩,可吹牛的內(nèi)容都是我干的榔至。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼欺劳,長吁一口氣:“原來是場噩夢啊……” “哼唧取!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起划提,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤枫弟,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后鹏往,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體淡诗,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年伊履,在試婚紗的時候發(fā)現(xiàn)自己被綠了韩容。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡唐瀑,死狀恐怖群凶,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情哄辣,我是刑警寧澤请梢,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布赠尾,位于F島的核電站,受9級特大地震影響毅弧,放射性物質(zhì)發(fā)生泄漏萍虽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一形真、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧超全,春花似錦咆霜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至疏遏,卻和暖如春脉课,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背财异。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工倘零, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人戳寸。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓呈驶,卻偏偏與公主長得像,于是被迫代替她去往敵國和親疫鹊。 傳聞我的和親對象是個殘疾皇子袖瞻,可洞房花燭夜當晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容