Golang框架實戰(zhàn)-KisFlow流式計算框架專欄
Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
本章將設(shè)計KisFlow的Connector模塊,期功能及作用主要為掛載在某個Function下,執(zhí)行第三方存儲引擎的邏輯档桃。
5.1 Connector定義
KisFlow中提供Connector寂殉,來給開發(fā)者定義第三方存儲引擎的自定義讀寫插件模式。如果數(shù)據(jù)流的數(shù)據(jù)需要臨時從某引擎去讀或者需要存儲到某個存儲引擎中,可以通過Connector來編寫相對應的讀寫邏輯,并且通過配置,自定義掛載在Flow中的某個Function中播赁。Connector也是靈活配置的。這樣具有相同邏輯的存儲算法可以在多個Function中進行復用吼渡。
5.1.1 Connector的抽象層定義
在kis-flow/kis/
創(chuàng)建connector.go
文件容为,這里用來定義Connector的抽象接口,如下:
kis-flow/kis/connector.go
package kis
import (
"context"
"kis-flow/config"
)
type Connector interface {
// Init 初始化Connector所關(guān)聯(lián)的存儲引擎鏈接等
Init() error
// Call 調(diào)用Connector 外掛存儲邏輯的讀寫操作
Call(ctx context.Context, flow Flow, args interface{}) error
// GetId 獲取Connector的ID
GetId() string
// GetName 獲取Connector的名稱
GetName() string
// GetConfig 獲取Connector的配置信息
GetConfig() *config.KisConnConfig
}
Connector
目前階段提供的主要接口有兩個:
-
Init()
主要為當前Connector所關(guān)聯(lián)的第三方存儲引擎的初始化邏輯寺酪,如創(chuàng)建鏈接登操作坎背,Init在Connector實例的生命周期只會被執(zhí)行一次。 -
Call()
主要為Connector的調(diào)度入口寄雀,相關(guān)存儲的讀寫自定義邏輯是通過Call()方法來觸發(fā)調(diào)度得滤,具體的回調(diào)函數(shù)原型在Router
模塊定義。
5.1.2 Connector相關(guān)路由成員類型定義
通過上述的接口盒犹,我們得知懂更,一個Connector實例要配置兩個自定義方法,一個通過Init() 接口來調(diào)用急膀,一個通過Call() 接口來調(diào)用沮协。下面就需要對這兩種回調(diào)原型做定義。
(1) Connector Init
kis-flow/kis/router.go
/*
Connector Init
*/
// ConnInit Connector 第三方掛載存儲初始化
type ConnInit func(conn Connector) error
// connInitRouter
//key:
type connInitRouter map[string]ConnInit
-
ConnInit
為初始化回調(diào)函數(shù)原型卓嫂,參數(shù)為當前Connector實例指針皂股。 -
connInitRouter
為管理ConnInit
的路由,key為ConnName命黔。
(2)Connector Call
kis-flow/kis/router.go
/*
Connector Call
*/
// CaaS Connector的存儲讀取業(yè)務(wù)實現(xiàn)
type CaaS func(context.Context, Connector, Function, Flow, interface{}) error
// connFuncRouter 通過FunctionName索引到CaaS回調(diào)存儲業(yè)務(wù)的映射關(guān)系
// key: Function Name
// value: Connector的存儲讀取業(yè)務(wù)實現(xiàn)
type connFuncRouter map[string]CaaS
-
CaaS
為Connector執(zhí)行存儲讀寫邏輯的自定義回調(diào)函數(shù)原型,參數(shù)為Connector就斤、Function悍募、Flow指針,開發(fā)者可以通過這三個實例得到業(yè)務(wù)想要的一些參數(shù)洋机。最后一個參數(shù)為自定義參數(shù)坠宴,在Fcuntion調(diào)度Connector的時候,可以由開發(fā)者自定義輸入绷旗。 -
connFuncRouter
為管理CaaS
的路由喜鼓,注意:key為當前Connector所掛載的Function Name副砍。所以在調(diào)度Connector Call必須是由Function進行調(diào)度。
這里由于Connector只有Function位Save
或者Load
模式才能夠掛載庄岖,為了今后方便統(tǒng)計或者索引豁翎,還需要針對connFuncRouter
進行分組,分成Save組和Load組隅忿。下面針對這兩個組進行成員映射關(guān)系類型的定義心剥,如下:
kis-flow/kis/router.go
// connSL 通過KisMode 將connFuncRouter分為兩個子樹
// key: Function KisMode S/L
// value: NsConnRouter
type connSL map[common.KisMode]connFuncRouter
// connTree
// key: Connector Name
// value: connSL 二級樹
type connTree map[string]connSL
-
connSL
是根據(jù)KisMode分的組,成員為之前的Connector的Call路由器背桐。 -
connTree
是通過Connector Name進行索引的映射管理优烧,通過Connector Name+Function Mode + Function Name可以確定調(diào)度的Connector Call函數(shù)。
5.2 Connector路由管理
上一節(jié)對Connector需要的路由管理類型做了定義链峭,接下來需要將這些路由的添加與調(diào)度進行管理畦娄。
Connector的路由管理和Function一樣,統(tǒng)一被KisPool模塊進行管理弊仪。
5.2.1 KisPool新增Connector相關(guān)路由管理成員
(1) 添加成員
kis-flow/kis/pool.go
// kisPool 用于管理全部的Function和Flow配置的池子
type kisPool struct {
fnRouter funcRouter // 全部的Function管理路由
fnLock sync.RWMutex // fnRouter 鎖
flowRouter flowRouter // 全部的flow對象
flowLock sync.RWMutex // flowRouter 鎖
// +++++++++++++++++
cInitRouter connInitRouter // 全部的Connector初始化路由
ciLock sync.RWMutex // cInitRouter 鎖
cTree connTree //全部Connector管理路由
connectors map[string]Connector // 全部的Connector對象
cLock sync.RWMutex // cTree 鎖
// +++++++++++++++++
}
(2) 相關(guān)Map變量初始化
kis-flow/kis/pool.go
// Pool 單例構(gòu)造
func Pool() *kisPool {
_poolOnce.Do(func() {
//創(chuàng)建kisPool對象
_pool = new(kisPool)
// fnRouter初始化
_pool.fnRouter = make(funcRouter)
// flowRouter初始化
_pool.flowRouter = make(flowRouter)
// +++++++++++++++++++++++++
// connTree初始化
_pool.cTree = make(connTree)
_pool.cInitRouter = make(connInitRouter)
_pool.connectors = make(map[string]Connector)
// +++++++++++++++++++++++++
})
return _pool
}
(3) 注冊Connector Init方法
kis-flow/kis/pool.go
// CaaSInit 注冊Connector初始化業(yè)務(wù)
func (pool *kisPool) CaaSInit(cname string, c ConnInit) {
pool.ciLock.Lock() // 寫鎖
defer pool.ciLock.Unlock()
if _, ok := pool.cInitRouter[cname]; !ok {
pool.cInitRouter[cname] = c
} else {
errString := fmt.Sprintf("KisPool Reg CaaSInit Repeat CName=%s\n", cname)
panic(errString)
}
log.Logger().InfoF("Add KisPool CaaSInit CName=%s", cname)
}
(4) 執(zhí)行Connector Init方法
kis-flow/kis/pool.go
// CallConnInit 調(diào)度 ConnInit
func (pool *kisPool) CallConnInit(conn Connector) error {
pool.ciLock.RLock() // 讀鎖
defer pool.ciLock.RUnlock()
init, ok := pool.cInitRouter[conn.GetName()]
if !ok {
panic(errors.New(fmt.Sprintf("init connector cname = %s not reg..", conn.GetName())))
}
return init(conn)
}
邏輯很簡單熙卡,先加鎖保護,然后進行成員key/value鍵值對添加撼短。這里由于是路由動作再膳,添加失敗則直接panic()退出進程。
(5) 注冊Connector Call方法
kis-flow/kis/pool.go
// CaaS 注冊Connector Call業(yè)務(wù)
func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c CaaS) {
pool.cLock.Lock() // 寫鎖
defer pool.cLock.Unlock()
if _, ok := pool.cTree[cname]; !ok {
//cid 首次注冊曲横,不存在喂柒,創(chuàng)建二級樹NsConnSL
pool.cTree[cname] = make(connSL)
//初始化各類型FunctionMode
pool.cTree[cname][common.S] = make(connFuncRouter)
pool.cTree[cname][common.L] = make(connFuncRouter)
}
if _, ok := pool.cTree[cname][mode][fname]; !ok {
pool.cTree[cname][mode][fname] = c
} else {
errString := fmt.Sprintf("CaaS Repeat CName=%s, FName=%s, Mode =%s\n", cname, fname, mode)
panic(errString)
}
log.Logger().InfoF("Add KisPool CaaS CName=%s, FName=%s, Mode =%s", cname, fname, mode)
}
KisPool的CaaS
方法就是注冊自己的Connector連接器的邏輯處理回調(diào)函數(shù),注冊的時候禾嫉,會通過傳遞進來的參數(shù)將函數(shù)注冊到相對應的路由分組中去灾杰。
(6) 執(zhí)行Connector Call方法
kis-flow/kis/pool.go
// CallConnector 調(diào)度 Connector
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) error {
fn := flow.GetThisFunction()
fnConf := fn.GetConfig()
mode := common.KisMode(fnConf.FMode)
if callback, ok := pool.cTree[conn.GetName()][mode][fnConf.FName]; ok {
return callback(ctx, conn, fn, flow, args)
}
log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)
return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}
CallConnector
則是做通過ConnectorName、Function Mode熙参、Function Name做索引找到對應的CaaS函數(shù)艳吠,并且執(zhí)行。
5.3 KisConnector
接下來我們來按照Connector的抽象層孽椰,來定義且實現(xiàn)KisConnector模塊昭娩。在kis-flow/conn/
目錄下創(chuàng)建kis_connector.go
文件。
5.3.1 KisConnector 定義
kis-flow/conn/kis_connector.go
package conn
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/id"
"kis-flow/kis"
"sync"
)
type KisConnector struct {
// Connector ID
CId string
// Connector Name
CName string
// Connector Config
Conf *config.KisConnConfig
// Connector Init
onceInit sync.Once
}
KisConnector
除了標識實例的KisID 還有 名稱外黍匾,還包含了當前KisConnector的配置信息KisConnConfig
栏渺,這里面具有一個sync.Once成員,這個是用來控制Connector Init在生命周期只被執(zhí)行一次的限定锐涯,稍后會提到磕诊。
5.3.1 KisConnector 構(gòu)造方法
kis-flow/conn/kis_connector.go
// NewKisConnector 根據(jù)配置策略創(chuàng)建一個KisConnector
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
conn := new(KisConnector)
conn.CId = id.KisID(common.KisIdTypeConnnector)
conn.CName = config.CName
conn.Conf = config
return conn
}
創(chuàng)建一個KisConnector實例是需要先有KisConnConfig的配置信息。
5.3.2 實現(xiàn)Connector接口
kis-flow/conn/kis_connector.go
// Init 初始化Connector所關(guān)聯(lián)的存儲引擎鏈接等
func (conn *KisConnector) Init() error {
var err error
//一個Connector只能執(zhí)行初始化業(yè)務(wù)一次
conn.onceInit.Do(func() {
err = kis.Pool().CallConnInit(conn)
})
return err
}
// Call 調(diào)用Connector 外掛存儲邏輯的讀寫操作
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error {
if err := kis.Pool().CallConnector(ctx, flow, conn, args); err != nil {
return err
}
return nil
}
func (conn *KisConnector) GetName() string {
return conn.CName
}
func (conn *KisConnector) GetConfig() *config.KisConnConfig {
return conn.Conf
}
func (conn *KisConnector) GetId() string {
return conn.CId
}
-
Init()
方法通過Once來限定調(diào)用次數(shù),最終會通過KisPool來進行路由映射且調(diào)度霎终。 -
Call()
同樣通過KisPool進行調(diào)度滞磺。
那么KisConnector的Init()和Call()在什么時候會被調(diào)用呢? 接下來我們需要實現(xiàn)KisConnConfig莱褒,將Connector的層級關(guān)系與Function和Flow進行關(guān)聯(lián)击困。
5.4 KisConnConfig配置信息
在NewKisConnector()
形參為KisConnConfig,所以開發(fā)者在創(chuàng)建一個Connector實例保礼,首先要先創(chuàng)建一個Connector的配置信息KisConnConfig沛励。在V0.1版本中,我們已經(jīng)實現(xiàn)了KisConnConfig
模塊的定義以及創(chuàng)建炮障,代碼如下:
kis-flow/config/kis_conn_config.go
package config
import (
"errors"
"fmt"
"kis-flow/common"
)
// KisConnConfig KisConnector 策略配置
type KisConnConfig struct {
//配置類型
KisType string `yaml:"kistype"`
//唯一描述標識
CName string `yaml:"cname"`
//基礎(chǔ)存儲媒介地址
AddrString string `yaml:"addrs"`
//存儲媒介引擎類型"Mysql" "Redis" "Kafka"等
Type common.KisConnType `yaml:"type"`
//一次存儲的標識:如Redis為Key名稱目派、Mysql為Table名稱,Kafka為Topic名稱等
Key string `yaml:"key"`
//配置信息中的自定義參數(shù)
Params map[string]string `yaml:"params"`
//存儲讀取所綁定的NsFuncionID
Load []string `yaml:"load"`
Save []string `yaml:"save"`
}
// NewConnConfig 創(chuàng)建一個KisConnector策略配置對象, 用于描述一個KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
strategy := new(KisConnConfig)
strategy.CName = cName
strategy.AddrString = addr
strategy.Type = t
strategy.Key = key
strategy.Params = param
return strategy
}
// WithFunc Connector與Function進行關(guān)系綁定
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
switch common.KisMode(fConfig.FMode) {
case common.S:
cConfig.Save = append(cConfig.Save, fConfig.FName)
case common.L:
cConfig.Load = append(cConfig.Load, fConfig.FName)
default:
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
}
return nil
}
5.4.1 KisFuncConfig 添加KisConnConfig成員
首先我們給KisFuncConfig 添加KisConnConfig成員。
kis-flow/config/kis_func_config.go
// KisFuncConfig 一個KisFunction策略配置
type KisFuncConfig struct {
KisType string `yaml:"kistype"`
FName string `yaml:"fname"`
FMode string `yaml:"fmode"`
Source KisSource `yaml:"source"`
Option KisFuncOption `yaml:"option"`
// ++++++++++
connConf *KisConnConfig
}
然后提供對該成員的添加和讀取方法
kis-flow/config/kis_func_config.go
func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
if cConf == nil {
return errors.New("KisConnConfig is nil")
}
// Function需要和Connector進行關(guān)聯(lián)
fConf.connConf = cConf
// Connector需要和Function進行關(guān)聯(lián)
_ = cConf.WithFunc(fConf)
return nil
}
func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
if fConf.connConf == nil {
return nil, errors.New("KisFuncConfig.connConf not set")
}
return fConf.connConf, nil
}
這樣我們可以通過Funciton的配置信息就能夠查到相關(guān)聯(lián)的Connector配置信息胁赢。
5.5 Function/Flow與Connector關(guān)聯(lián)
5.5.1 Function與Connector關(guān)聯(lián)
kis-flow/kis/function.go
package kis
import (
"context"
"kis-flow/config"
)
// Function 流式計算基礎(chǔ)計算模塊企蹭,KisFunction是一條流式計算的基本計算邏輯單元,
// 任意個KisFunction可以組合成一個KisFlow
type Function interface {
// Call 執(zhí)行流式計算邏輯
Call(ctx context.Context, flow Flow) error
// SetConfig 給當前Function實例配置策略
SetConfig(s *config.KisFuncConfig) error
// GetConfig 獲取當前Function實例配置策略
GetConfig() *config.KisFuncConfig
// SetFlow 給當前Function實例設(shè)置所依賴的Flow實例
SetFlow(f Flow) error
// GetFlow 獲取當前Functioin實力所依賴的Flow
GetFlow() Flow
// ++++++++++++++++++++
// AddConnector 給當前Function實例添加一個Connector
AddConnector(conn Connector) error
// GetConnector 獲取當前Function實例所關(guān)聯(lián)的Connector
GetConnector() Connector
// ++++++++++++++++++++
// CreateId 給當前Funciton實力生成一個隨機的實例KisID
CreateId()
// GetId 獲取當前Function的FID
GetId() string
// GetPrevId 獲取當前Function上一個Function節(jié)點FID
GetPrevId() string
// GetNextId 獲取當前Function下一個Function節(jié)點FID
GetNextId() string
// Next 返回下一層計算流Function智末,如果當前層為最后一層谅摄,則返回nil
Next() Function
// Prev 返回上一層計算流Function,如果當前層為最后一層系馆,則返回nil
Prev() Function
// SetN 設(shè)置下一層Function實例
SetN(f Function)
// SetP 設(shè)置上一層Function實例
SetP(f Function)
}
接下來我們在BaseFunction
中實現(xiàn):
kis-flow/function/kis_base_function.go
type BaseFunction struct {
// Id , KisFunction的實例ID送漠,用于KisFlow內(nèi)部區(qū)分不同的實例對象
Id string
Config *config.KisFuncConfig
// flow
flow kis.Flow //上下文環(huán)境KisFlow
// ++++++++++++++
// connector
connector kis.Connector
// ++++++++++++++
// link
N kis.Function //下一個流計算Function
P kis.Function //上一個流計算Function
}
// ........
// ........
// AddConnector 給當前Function實例添加一個Connector
func (base *BaseFunction) AddConnector(conn kis.Connector) error {
if conn == nil {
return errors.New("conn is nil")
}
base.connector = conn
return nil
}
// GetConnector 獲取當前Function實例所關(guān)聯(lián)的Connector
func (base *BaseFunction) GetConnector() kis.Connector {
return base.connector
}
這樣一個Function實例就可以獲取到Connector實例的信息。
5.5.2 Flow與Connector關(guān)聯(lián)
同樣由蘑,F(xiàn)low中也需要獲取到Connector的信息闽寡,這里面也需要將Flow和Connector的關(guān)系進行簡單的關(guān)聯(lián)。
kis-flow/kis/flow.go
package kis
import (
"context"
"kis-flow/common"
"kis-flow/config"
)
type Flow interface {
// Run 調(diào)度Flow尼酿,依次調(diào)度Flow中的Function并且執(zhí)行
Run(ctx context.Context) error
// Link 將Flow中的Function按照配置文件中的配置進行連接
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow數(shù)據(jù)到即將執(zhí)行的Function層
CommitRow(row interface{}) error
// Input 得到flow當前執(zhí)行Function的輸入源數(shù)據(jù)
Input() common.KisRowArr
// GetName 得到Flow的名稱
GetName() string
// GetThisFunction 得到當前正在執(zhí)行的Function
GetThisFunction() Function
// GetThisFuncConf 得到當前正在執(zhí)行的Function的配置
GetThisFuncConf() *config.KisFuncConfig
// GetConnector 得到當前正在執(zhí)行的Function的Connector
// +++++++++++++++++++++++++++++++++
GetConnector() (Connector, error)
// GetConnConf 得到當前正在執(zhí)行的Function的Connector的配置
GetConnConf() (*config.KisConnConfig, error)
// +++++++++++++++++++++++++++++++++
}
新增 GetConnector()
和 GetConnConf()
接口爷狈,分別獲取Connector實例和Connector配置。
之后裳擎,在KisFlow中實現(xiàn)這兩個方法涎永。
kis-flow/flow/kis_flow.go
// GetConnector 得到當前正在執(zhí)行的Function的Connector
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
if conn := flow.ThisFunction.GetConnector(); conn != nil {
return conn, nil
} else {
return nil, errors.New("GetConnector(): Connector is nil")
}
}
// GetConnConf 得到當前正在執(zhí)行的Function的Connector的配置
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
if conn := flow.ThisFunction.GetConnector(); conn != nil {
return conn.GetConfig(), nil
} else {
return nil, errors.New("GetConnConf(): Connector is nil")
}
}
實際上依然是通過當前Flow正在執(zhí)行的Function來獲取到Connector信息。
5.5.3 Function鏈接Connector
按照之前的配置文件定義鹿响,function的yaml配置文件如下:
nstype: func
fname: 測試Nsfunction_L1
fmode: Load
source:
name: 被校驗的測試數(shù)據(jù)源1-學生班級維度
must:
- stuid
- classid
option:
cname: 測試NsConnector_2
這里面有一個Option羡微,其中有一個成員cname,如果當前Function配置了Connector惶我,則需要在當前的Option中配置cname拷淘,并填寫Connector的名稱cname。
那么在當Flow去鏈接一個Funciton的時候指孤,在Function的實例被創(chuàng)建之后,可以通過Function的配置得到是否攜帶Connector,如果攜帶則也要新建Connector實例恃轩,代碼如下:
kis-flow/flow/kis_flow.go
// Link 將Function鏈接到Flow中
// fConf: 當前Function策略
// fParams: 當前Flow攜帶的Function動態(tài)參數(shù)
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
// 創(chuàng)建Function實例
f := function.NewKisFunction(flow, fConf)
// ++++++++++++++++++++++++++++++
if fConf.Option.CName != "" {
// 當前Function有Connector關(guān)聯(lián)结洼,需要初始化Connector實例
// 獲取Connector配置
connConfig, err := fConf.GetConnConfig()
if err != nil {
panic(err)
}
// 創(chuàng)建Connector對象
connector := conn.NewKisConnector(connConfig)
// 初始化Connector, 執(zhí)行Connector Init 方法
if err = connector.Init(); err != nil {
panic(err)
}
// 關(guān)聯(lián)Function實例和Connector實例關(guān)系
_ = f.AddConnector(connector)
}
// ++++++++++++++++++++++++++++++
// Flow 添加 Function
if err := flow.appendFunc(f, fParams); err != nil {
return err
}
return nil
}
這樣一個Connector實例就被創(chuàng)建了。
5.6 KisConnector單元測試
接下來來對KisConnector做單元測試叉跛。
5.6.1 單元測試
創(chuàng)建kis-flow/test/kis_connector_test.go
文件:
package test
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/flow"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestNewKisConnector(t *testing.T) {
ctx := context.Background()
// 0. 注冊Function 回調(diào)業(yè)務(wù)
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注冊ConnectorInit 和 Connector 回調(diào)業(yè)務(wù)
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 創(chuàng)建3個KisFunction配置實例, 其中myFuncConfig2 有Connector配置
source1 := config.KisSource{
Name: "公眾號抖音商城戶訂單數(shù)據(jù)",
Must: []string{"order_id", "user_id"},
}
source2 := config.KisSource{
Name: "用戶訂單錯誤率",
Must: []string{"order_id", "user_id"},
}
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
if myFuncConfig1 == nil {
panic("myFuncConfig1 is nil")
}
option := config.KisFuncOption{
CName: "ConnName1",
}
myFuncConfig2 := config.NewFuncConfig("funcName2", common.S, &source2, &option)
if myFuncConfig2 == nil {
panic("myFuncConfig2 is nil")
}
myFuncConfig3 := config.NewFuncConfig("funcName3", common.E, &source2, nil)
if myFuncConfig3 == nil {
panic("myFuncConfig3 is nil")
}
// 2. 創(chuàng)建一個KisConnector配置實例
myConnConfig1 := config.NewConnConfig("ConnName1", "0.0.0.0:9998", common.REDIS, "redis-key", nil)
if myConnConfig1 == nil {
panic("myConnConfig1 is nil")
}
// 3. 將KisConnector配置實例綁定到KisFunction配置實例上
_ = myFuncConfig2.AddConnConfig(myConnConfig1)
// 4. 創(chuàng)建一個 KisFlow 配置實例
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 5. 創(chuàng)建一個KisFlow對象
flow1 := flow.NewKisFlow(myFlowConfig1)
// 6. 拼接Functioin 到 Flow 上
if err := flow1.Link(myFuncConfig1, nil); err != nil {
panic(err)
}
if err := flow1.Link(myFuncConfig2, nil); err != nil {
panic(err)
}
if err := flow1.Link(myFuncConfig3, nil); err != nil {
panic(err)
}
// 7. 提交原始數(shù)據(jù)
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 8. 執(zhí)行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
注意這里面funcName2 為關(guān)聯(lián)Connector的Function松忍。 所以在創(chuàng)建Function2的Config的時候提供了Option信息,且提供了關(guān)聯(lián)的Connector名稱筷厘。
5.6.2 Function Callback 與 Conn CallBack
為了方便管理CallBack業(yè)務(wù)鸣峭,我們在kis-flow/test/
目錄下,分別創(chuàng)建kis-flow/test/faas/
和kis-flow/test/caas/
目錄酥艳。
分別建立如下文件摊溶,每一個文件寫一種自定義業(yè)務(wù)。
├── caas
│ ├── caas_demo1.go
│ └── caas_init1.go
├── faas
│ ├── faas_demo1.go
│ ├── faas_demo2.go
│ └── faas_demo3.go
(1) FuncName1 的回調(diào)業(yè)務(wù)
kis-flow/test/faas/faas_demo1.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName1Handler ----")
for index, row := range flow.Input() {
// 打印數(shù)據(jù)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
// 計算結(jié)果數(shù)據(jù)
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交結(jié)果數(shù)據(jù)
_ = flow.CommitRow(resultStr)
}
return nil
}
這個作為我們第一個Function充石,打印數(shù)據(jù)莫换,并且再生產(chǎn)一些數(shù)據(jù)。
(2) FuncName2 的回調(diào)業(yè)務(wù)
kis-flow/test/faas/faas_demo2.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
"kis-flow/log"
)
// type FaaS func(context.Context, Flow) error
func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName2Handler ----")
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
conn, err := flow.GetConnector()
if err != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
return err
}
if conn.Call(ctx, flow, row) != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
return err
}
// 計算結(jié)果數(shù)據(jù)
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交結(jié)果數(shù)據(jù)
_ = flow.CommitRow(resultStr)
}
return nil
}
FuncName2 是一個關(guān)聯(lián)Connector的業(yè)務(wù)骤铃。通過flow.GetConnector()
可以得到Connector實例拉岁,然后可以通過執(zhí)行conn.Call()
來執(zhí)行業(yè)務(wù)邏輯。
(3) FuncName3 的回調(diào)業(yè)務(wù)
kis-flow/test/faas/faas_demo3.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName3Handler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return nil
}
(4) ConnName1 Init方法
kis-flow/test/caas/caas_init1.go
package caas
import (
"fmt"
"kis-flow/kis"
)
// type ConnInit func(conn Connector) error
func InitConnDemo1(connector kis.Connector) error {
fmt.Println("===> Call Connector InitDemo1")
//config info
connConf := connector.GetConfig()
fmt.Println(connConf)
// init connector , 如 初始化數(shù)據(jù)庫鏈接等
return nil
}
(5) ConnName1 的回調(diào)業(yè)務(wù)
kis-flow/test/caas/caas_demo1.go
package caas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type CaaS func(context.Context, Connector, Function, Flow, interface{}) error
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)
fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)
return nil
}
5.6.3 運行
cd到kis-flow/test/
下惰爬,執(zhí)行命令:
go test -test.v -test.paniconexit0 -test.run TestNewKisConnector
結(jié)果如下:
=== RUN TestNewKisConnector
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{ ConnName1 0.0.0.0:9998 redis redis-key map[] [] [funcName2]}
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionC, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c240 ThisFunctionId:func-f594da0e28da417db6b15ce9c9530f84 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c2a0 ThisFunctionId:func-f0b4bebf87614828a9375d888c54d13b PrevFunctionId:func-f594da0e28da417db6b15ce9c9530f84 funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}
---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionE, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c300 ThisFunctionId:func-66e2b0afa4e14d179aa94c357c412cf8 PrevFunctionId:func-f0b4bebf87614828a9375d888c54d13b funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}
---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 2
--- PASS: TestNewKisConnector (0.00s)
PASS
經(jīng)過仔細查看日志喊暖,得知Connector的Init被執(zhí)行,請Connector也在執(zhí)行FunctionName2期間也被同步執(zhí)行且有日志輸出撕瞧,結(jié)果和我們的預期一致陵叽。
5.7 【V0.4】源代碼
https://github.com/aceld/kis-flow/releases/tag/v0.4
作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開源項目地址:https://github.com/aceld/kis-flow
Golang框架實戰(zhàn)-KisFlow流式計算框架專欄
Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector