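Golang Framework in Practice - KisFlow Stream Computing Framework Series
Golang Framework in Practice - KisFlow Stream Computing Framework (1) - Overview
Golang Framework in Practice - KisFlow Stream Computing Framework (2) - Project Setup / Basic Modules (Part 1)
Golang Framework in Practice - KisFlow Stream Computing Framework (3) - Project Setup / Basic Modules (Part 2)
Golang Framework in Practice - KisFlow Stream Computing Framework (4) - Data Flow
Golang Framework in Practice - KisFlow Stream Computing Framework (5) - Function Scheduling
Golang Framework in Practice - KisFlow Stream Computing Framework (6) - Connector
Golang Framework in Practice - KisFlow Stream Computing Framework (7) - Configuration Import and Export
Golang Framework in Practice - KisFlow Stream Computing Framework (8) - KisFlow Action
Golang Framework in Practice - KisFlow Stream Computing Framework (9) - Cache/Params: Data Caching and Data Parameters
Golang Framework in Practice - KisFlow Stream Computing Framework (10) - Flow Multi-Copy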
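9.1 Multi-Copy Capability
If a KisFlow flow has to be used concurrently by several goroutines while it is executing, you may need to create multiple Flow instances from the same configuration, one for each concurrent computation stream. Flow therefore needs the ability to create a copy of itself. This chapter implements that capability.
9.1.1 New Flow Interface Method
First, add a new method, Fork(), to the Flow abstraction layer. Its prototype is as follows: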
kis-flow/kis/flow.go
type Flow interface {
	// Run schedules the Flow, invoking and executing each Function in the Flow in turn
	Run(ctx context.Context) error
	// Link connects the Functions in the Flow according to the configuration file
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow submits Flow data to the Function layer that is about to be executed
	CommitRow(row interface{}) error
	// Input returns the input source data of the Function the Flow is currently executing
	Input() common.KisRowArr
	// GetName returns the name of the Flow
	GetName() string
	// GetThisFunction returns the Function that is currently executing
	GetThisFunction() Function
	// GetThisFuncConf returns the configuration of the Function that is currently executing
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector returns the Connector of the Function that is currently executing
	GetConnector() (Connector, error)
	// GetConnConf returns the Connector configuration of the Function that is currently executing
	GetConnConf() (*config.KisConnConfig, error)
	// GetConfig returns the configuration of the current Flow
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName returns the configuration of the Function named funcName in the current Flow
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// Next sets the Actions carried along when the current Function hands over to the next Function layer
	Next(acts ...ActionFunc) error
	// GetCacheData returns cached data of the current Flow
	GetCacheData(key string) interface{}
	// SetCacheData stores cached data in the current Flow
	SetCacheData(key string, value interface{}, Exp time.Duration)
	// GetMetaData returns temporary data of the current Flow
	GetMetaData(key string) interface{}
	// SetMetaData stores temporary data in the current Flow
	SetMetaData(key string, value interface{})
	// GetFuncParam returns one value, looked up by key, from the configured default params of the Function that is currently executing
	GetFuncParam(key string) string
	// GetFuncParamAll returns all key-value pairs of the configured default params of the Function that is currently executing
	GetFuncParamAll() config.FParam
	// +++++++++++++++++++++++++
	// Fork returns a copy of the Flow (deep copy)
	Fork(ctx context.Context) Flow
}
Fork() takes an existing KisFlow instance and clones it into a new KisFlow instance that has the same configuration but isolated resources.
The implementation is as follows:
kis-flow/flow/kis_flow.go
// Fork returns a copy of the Flow (deep copy)
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
	config := flow.Conf

	// Build a new Flow from the existing configuration
	newFlow := NewKisFlow(config)

	for _, fp := range flow.Conf.Flows {
		if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
			// The current Function has no Params configured
			newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil)
		} else {
			// The current Function has Params configured
			newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
		}
	}

	log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams)
	log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs())

	return newFlow
}
Fork() first creates a new KisFlow instance from the flow's configuration, copies the Params and other configuration associated with the flow along with it, and finally calls Link() to connect the newly created Functions to the new Flow.
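The rebuild-then-relink idea can be illustrated outside KisFlow with a small, self-contained sketch. Everything in it (miniConf, miniFlow, newMiniFlow, link, fork) is a hypothetical stand-in and not part of the kis-flow API; it only aims to show why rebuilding from a shared configuration gives a copy whose parameters match the original while its instance IDs and parameter maps are its own.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// miniConf is a hypothetical stand-in for a flow configuration:
// an ordered list of function names plus optional per-function params.
type miniConf struct {
	Name   string
	Funcs  []string
	Params map[string]map[string]string
}

// miniFlow is a hypothetical stand-in for a flow instance.
type miniFlow struct {
	conf       *miniConf
	funcIDs    map[string]string            // function name -> instance ID
	funcParams map[string]map[string]string // instance ID -> params
}

var idSeq int64

// newID hands out a process-wide unique instance ID.
func newID() string {
	return fmt.Sprintf("func-%d", atomic.AddInt64(&idSeq, 1))
}

func newMiniFlow(conf *miniConf) *miniFlow {
	return &miniFlow{
		conf:       conf,
		funcIDs:    make(map[string]string),
		funcParams: make(map[string]map[string]string),
	}
}

// link creates a fresh function instance for name and stores a private
// copy of params (nothing is stored when params is nil).
func (f *miniFlow) link(name string, params map[string]string) {
	id := newID()
	f.funcIDs[name] = id
	if params == nil {
		return
	}
	cp := make(map[string]string, len(params))
	for k, v := range params {
		cp[k] = v
	}
	f.funcParams[id] = cp
}

// fork rebuilds a new flow from the shared configuration and re-links every
// function, so the copy gets new instance IDs and its own param maps.
func (f *miniFlow) fork() *miniFlow {
	nf := newMiniFlow(f.conf)
	for _, name := range f.conf.Funcs {
		nf.link(name, f.conf.Params[name])
	}
	return nf
}

func main() {
	conf := &miniConf{
		Name:  "flowName1",
		Funcs: []string{"funcName1", "funcName2"},
		Params: map[string]map[string]string{
			"funcName1": {"myKey1": "flowValue1-1"},
		},
	}
	orig := newMiniFlow(conf)
	for _, name := range conf.Funcs {
		orig.link(name, conf.Params[name])
	}
	clone := orig.fork()

	// Changing the clone's params leaves the original untouched.
	clone.funcParams[clone.funcIDs["funcName1"]]["myKey1"] = "changed"
	fmt.Println(orig.funcParams[orig.funcIDs["funcName1"]]["myKey1"])
	fmt.Println(orig.funcIDs["funcName1"] != clone.funcIDs["funcName1"])
}
```

In this sketch the isolation comes from link() copying the params map on every call. Whether KisFlow's own Link() copies or shares the map is not stated here, so treat that part as an assumption of the sketch.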
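For debugging, the Fork() implementation also calls a new Flow method, GetFuncParamsAllFuncs(), which returns the FuncParams of every Function in the Flow. It is defined and implemented as follows: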
kis-flow/kis/flow.go
type Flow interface {
	// ... ...
	// ... ...
	// GetFuncParamsAllFuncs returns the FuncParams of every Function in the Flow (all key-value pairs)
	GetFuncParamsAllFuncs() map[string]config.FParam
	// ... ...
}
kis-flow/flow/kis_flow_data.go
// GetFuncParamsAllFuncs returns the FuncParams of every Function in the Flow (all key-value pairs)
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
	flow.fplock.RLock()
	defer flow.fplock.RUnlock()

	return flow.funcParams
}
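Note that this method returns the internal map itself, so the caller still holds a reference to it after the read lock has been released. That is fine for debug printing. If callers might keep or modify the result while another goroutine writes, returning a copy is one possible alternative. The sketch below shows that idea with a hypothetical paramStore type that is not part of kis-flow; FParam is assumed here to be a map of string keys to string values, which matches the log output but is not confirmed by this chapter.

```go
package main

import (
	"fmt"
	"sync"
)

// FParam mirrors the shape assumed for config.FParam (string key-value pairs).
type FParam map[string]string

// paramStore is a hypothetical holder of per-function params guarded by a RWMutex.
type paramStore struct {
	mu     sync.RWMutex
	params map[string]FParam
}

// Snapshot returns a deep copy taken under the read lock, so callers never
// touch the guarded maps after the lock is released.
func (s *paramStore) Snapshot() map[string]FParam {
	s.mu.RLock()
	defer s.mu.RUnlock()

	out := make(map[string]FParam, len(s.params))
	for id, p := range s.params {
		cp := make(FParam, len(p))
		for k, v := range p {
			cp[k] = v
		}
		out[id] = cp
	}
	return out
}

func main() {
	s := &paramStore{params: map[string]FParam{"func-1": {"myKey1": "v1"}}}

	snap := s.Snapshot()
	snap["func-1"]["myKey1"] = "changed" // only the snapshot changes

	fmt.Println(s.params["func-1"]["myKey1"])
}
```

The trade-off is an allocation per call, which is usually acceptable for a debugging helper but worth measuring if such a method ends up on a hot path.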
9.2 Unit Test
Next, let's test the Fork capability. The unit test code is as follows:
kis-flow/test/kis_fork_test.go
func TestForkFlow(t *testing.T) {
	ctx := context.Background()

	// 0. Register the Function callbacks
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. Register the ConnectorInit and Connector callbacks
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. Load the configuration files and build the Flows
	if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}

	// 2. Get the Flow and fork it
	flow1 := kis.Pool().GetFlow("flowName1")
	flow1Clone1 := flow1.Fork(ctx)

	// 3. Commit the raw data to the forked flow
	_ = flow1Clone1.CommitRow("This is Data1 from Test")
	_ = flow1Clone1.CommitRow("This is Data2 from Test")
	_ = flow1Clone1.CommitRow("This is Data3 from Test")

	// 4. Run the forked flow (flow1Clone1)
	if err := flow1Clone1.Run(ctx); err != nil {
		panic(err)
	}
}
We first obtain the flow instance named flowName1, then call Fork() to get flow1Clone1, and then run flow1Clone1 through its scheduling process.
cd into kis-flow/test/ and run:
go test -test.v -test.paniconexit0 -test.run TestForkFlow
The result is as follows:
=== RUN TestForkFlow
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
context.Background
=====>Flow Fork, oldFlow.funcParams = map[func-6b00f430fe494302a384c2ae09eb019c:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-bf9df5fc16684200b78f32985d073012:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] func-c0f1ae9850174f81b994a2e98fb34109:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]]
context.Background
=====>Flow Fork, newFlow.funcParams = map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]]
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d180 ThisFunctionId:func-9406285e2fa94bd582dab4a875771a97 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d200 ThisFunctionId:func-de7c4e4175b74a898cb43863e53b3215 PrevFunctionId:func-9406285e2fa94bd582dab4a875771a97 funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-de7c4e4175b74a898cb43863e53b3215:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]
KisFunctionC, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d300 ThisFunctionId:func-614511f5142e4023b80373517f3ea0a7 PrevFunctionId:func-de7c4e4175b74a898cb43863e53b3215 funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-de7c4e4175b74a898cb43863e53b3215:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 2
--- PASS: TestForkFlow (0.03s)
PASS
ok kis-flow/test 0.996s
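The output shows that flow1Clone1 and flowName1 have the same configuration: in the two "Flow Fork" log lines the funcParams values are identical, while the Function IDs used as keys differ because the fork creates new Function instances.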
9.3 [V0.8] Source Code
https://github.com/aceld/kis-flow/releases/tag/v0.8
Author: 劉丹冰 (Aceld), GitHub: https://github.com/aceld
KisFlow open-source project: https://github.com/aceld/kis-flow