連載中...
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(1)-概述
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(2)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(3)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(4)-數(shù)據(jù)流
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(5)-Function調(diào)度
3.1 數(shù)據(jù)類(lèi)型定義
KisFlow中可以傳遞任意類(lèi)型數(shù)據(jù)作為Flow的數(shù)據(jù)源。而且KisFlow支持批量數(shù)據(jù)的流逝計(jì)算處理役听。
首先需要對(duì)KisFlow中內(nèi)部支持的數(shù)據(jù)類(lèi)型做一個(gè)基本的定義,我們將這部分的定義代碼寫(xiě)在kis-flow/common/
中的data_type.go
文件中。
kis-flow/common/data_type.go
package common
// KisRow 一行數(shù)據(jù)
type KisRow interface{}
// KisRowArr 一次業(yè)務(wù)的批量數(shù)據(jù)
type KisRowArr []KisRow
/*
KisDataMap 當(dāng)前Flow承載的全部數(shù)據(jù)临庇,
key : 數(shù)據(jù)所在的Function ID
value: 對(duì)應(yīng)的KisRow
*/
type KisDataMap map[string]KisRowArr
-
KisRow
:表示一行數(shù)據(jù)益愈,可以是任意的數(shù)據(jù)類(lèi)型赏寇,比如字符串,json字符串其掂,一些序列化的二進(jìn)制數(shù)據(jù), protobuf潦蝇,yaml字符串等款熬,均可。 -
KisRowArr
:表示多行數(shù)據(jù)护蝶,也就是一次提交的批量數(shù)據(jù)华烟,他是KisRow的數(shù)組集合。 -
KisDataMap
:表示當(dāng)前Flow承載的全部數(shù)據(jù)持灰。是一個(gè)map[string]KisRowArr
類(lèi)型盔夜,其中key為數(shù)據(jù)所在的Function ID,value為數(shù)據(jù)堤魁。
3.2 KisFlow數(shù)據(jù)流處理
在KisFlow模塊中喂链,新增一些存放數(shù)據(jù)的成員,如下:
kis-flow/flow/kis_flow.go
// KisFlow 用于貫穿整條流式計(jì)算的上下文環(huán)境
type KisFlow struct {
// 基礎(chǔ)信息
Id string // Flow的分布式實(shí)例ID(用于KisFlow內(nèi)部區(qū)分不同實(shí)例)
Name string // Flow的可讀名稱(chēng)
Conf *config.KisFlowConfig // Flow配置策略
// Function列表
Funcs map[string]kis.Function // 當(dāng)前flow擁有的全部管理的全部Function對(duì)象, key: FunctionID
FlowHead kis.Function // 當(dāng)前Flow所擁有的Function列表表頭
FlowTail kis.Function // 當(dāng)前Flow所擁有的Function列表表尾
flock sync.RWMutex // 管理鏈表插入讀寫(xiě)的鎖
ThisFunction kis.Function // Flow當(dāng)前正在執(zhí)行的KisFunction對(duì)象
ThisFunctionId string // 當(dāng)前執(zhí)行到的Function ID (策略配置ID)
PrevFunctionId string // 當(dāng)前執(zhí)行到的Function 上一層FunctionID(策略配置ID)
// Function列表參數(shù)
funcParams map[string]config.FParam // flow在當(dāng)前Function的自定義固定配置參數(shù),Key:function的實(shí)例KisID, value:FParam
fplock sync.RWMutex // 管理funcParams的讀寫(xiě)鎖
// ++++++++ 數(shù)據(jù) ++++++++++
buffer common.KisRowArr // 用來(lái)臨時(shí)存放輸入字節(jié)數(shù)據(jù)的內(nèi)部Buf, 一條數(shù)據(jù)為interface{}, 多條數(shù)據(jù)為[]interface{} 也就是KisBatch
data common.KisDataMap // 流式計(jì)算各個(gè)層級(jí)的數(shù)據(jù)源
inPut common.KisRowArr // 當(dāng)前Function的計(jì)算輸入數(shù)據(jù)
}
-
buffer
: 用來(lái)臨時(shí)存放輸入字節(jié)數(shù)據(jù)的內(nèi)部Buf, 一條數(shù)據(jù)為interface{}, 多條數(shù)據(jù)為[]interface{} 也就是KisBatch -
data
: 流式計(jì)算各個(gè)層級(jí)的數(shù)據(jù)源 -
inPut
: 當(dāng)前Function的計(jì)算輸入數(shù)據(jù)
后續(xù)章節(jié)會(huì)使用到這幾個(gè)成員屬性妥泉,這里先做為了解椭微。
因?yàn)閐ata是一個(gè)map
類(lèi)型,所以需要在NewKisFlow()
中盲链,對(duì)其進(jìn)行初始化操作:
kis-flow/flow/kis_flow.go
// NewKisFlow 創(chuàng)建一個(gè)KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// 實(shí)例Id
flow.Id = id.KisID(common.KisIdTypeFlow)
// 基礎(chǔ)信息
flow.Name = conf.FlowName
flow.Conf = conf
// Function列表
flow.Funcs = make(map[string]kis.Function)
flow.funcParams = make(map[string]config.FParam)
// ++++++++ 數(shù)據(jù)data +++++++
flow.data = make(common.KisDataMap)
return flow
}
3.2.2 業(yè)務(wù)提交數(shù)據(jù)接口
KisFlow的開(kāi)發(fā)者在編寫(xiě)業(yè)務(wù)時(shí)蝇率,可以通過(guò)flow實(shí)例來(lái)進(jìn)行提交業(yè)務(wù)源數(shù)據(jù)迟杂,所以我們需要給Flow
抽象層新增一個(gè)提交數(shù)據(jù)的接口:
kis-flow/kis/flow.go
package kis
import (
"context"
"kis-flow/common"
"kis-flow/config"
)
type Flow interface {
// Run 調(diào)度Flow,依次調(diào)度Flow中的Function并且執(zhí)行
Run(ctx context.Context) error
// Link 將Flow中的Function按照配置文件中的配置進(jìn)行連接
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow ++++++ 提交Flow數(shù)據(jù)到即將執(zhí)行的Function層 ++++
CommitRow(row interface{}) error
}
新增接口 CommitRow(any interface{}) error
本慕。
在kis-flow/flow/kis_flow_data.go
中實(shí)現(xiàn)KisFlow的該接口排拷。
kis-flow/flow/kis_flow_data.go
func (flow *KisFlow) CommitRow(row interface{}) error {
flow.buffer = append(flow.buffer, row)
return nil
}
CommitRow()
為提交Flow數(shù)據(jù), 一行數(shù)據(jù),如果是批量數(shù)據(jù)可以提交多次锅尘。 所有提交的數(shù)據(jù)都會(huì)暫存在flow.buffer
成員中监氢,作為緩沖區(qū)。
3.2.3 KisFlow內(nèi)部數(shù)據(jù)提交
現(xiàn)在開(kāi)發(fā)者可以通過(guò)CommitRow()
將數(shù)據(jù)提交到buffer中藤违,但是在KisFlow內(nèi)部需要一個(gè)內(nèi)部接口來(lái)將buffer提交到KisFlow的data中浪腐,作為之后當(dāng)前Flow全部Function的上下文數(shù)據(jù)供使用。所以我們這里需要再提供兩個(gè)接口顿乒。分別是首次提交數(shù)據(jù)commitSrcData()
和中間層提交數(shù)據(jù)commitCurData()
兩個(gè)函數(shù)议街。
A. 首層數(shù)據(jù)提交
kis-flow/flow/kis_flow_data.go
// commitSrcData 提交當(dāng)前Flow的數(shù)據(jù)源數(shù)據(jù), 表示首次提交當(dāng)前Flow的原始數(shù)據(jù)源
// 將flow的臨時(shí)數(shù)據(jù)buffer,提交到flow的data中,(data為各個(gè)Function層級(jí)的源數(shù)據(jù)備份)
// 會(huì)清空之前所有的flow數(shù)據(jù)
func (flow *KisFlow) commitSrcData(ctx context.Context) error {
// 制作批量數(shù)據(jù)batch
dataCnt := len(flow.buffer)
batch := make(common.KisRowArr, 0, dataCnt)
for _, row := range flow.buffer {
batch = append(batch, row)
}
// 清空之前所有數(shù)據(jù)
flow.clearData(flow.data)
// 首次提交淆游,記錄flow原始數(shù)據(jù)
// 因?yàn)槭状翁峤话茫訮revFunctionId為FirstVirtual 因?yàn)闆](méi)有上一層Function
flow.data[common.FunctionIdFirstVirtual] = batch
// 清空緩沖Buf
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
//ClearData 清空f(shuō)low所有數(shù)據(jù)
func (flow *KisFlow) clearData(data common.KisDataMap) {
for k := range data {
delete(data, k)
}
}
實(shí)際上commitSrcData()
在整個(gè)的Flow
運(yùn)行周期只會(huì)執(zhí)行一次,這個(gè)作為當(dāng)前Flow
的始祖源數(shù)據(jù)犹菱。
commitSrcData()
的最終目的是 將buffer的數(shù)據(jù)提交到data[FunctionIdFirstVirtual]
中拾稳。 這里要注意的是FunctionIdFirstVirtual
是一個(gè)虛擬fid,作為所有Function
的上游Function ID腊脱。 并且首次提交之后访得,flow.buffer
的數(shù)據(jù)將被清空。
B. 中間層數(shù)據(jù)提交
kis-flow/flow/kis_flow_data.go
//commitCurData 提交Flow當(dāng)前執(zhí)行Function的結(jié)果數(shù)據(jù)
func (flow *KisFlow) commitCurData(ctx context.Context) error {
//判斷本層計(jì)算是否有結(jié)果數(shù)據(jù),如果沒(méi)有則退出本次Flow Run循環(huán)
if len(flow.buffer) == 0 {
return nil
}
// 制作批量數(shù)據(jù)batch
batch := make(common.KisRowArr, 0, len(flow.buffer))
//如果strBuf為空陕凹,則沒(méi)有添加任何數(shù)據(jù)
for _, row := range flow.buffer {
batch = append(batch, row)
}
//將本層計(jì)算的緩沖數(shù)據(jù)提交到本層結(jié)果數(shù)據(jù)中
flow.data[flow.ThisFunctionId] = batch
//清空緩沖Buf
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
commitCurData()
會(huì)在每次Function執(zhí)行計(jì)算后悍抑,將當(dāng)前Function的計(jì)算結(jié)果數(shù)據(jù)進(jìn)行提交。 commitCurData() 會(huì)在Flow的流式計(jì)算過(guò)程中被執(zhí)行多次杜耙。
commitCurData()
的最終目的是將將buffer的數(shù)據(jù)提交到data[flow.ThisFunctionId]
中 搜骡。ThisFunctionId也就是當(dāng)前正在執(zhí)行Function,同時(shí)也是下一層將要執(zhí)行的Function的上一層佑女。
提交之后记靡,flow.buffer
的數(shù)據(jù)將被清空。
3.2.4 獲取正在執(zhí)行Function的源數(shù)據(jù)
至于每層Function的源數(shù)據(jù)如何得到团驱,我們可以通過(guò)getCurData()
方法得到摸吠。 通過(guò)PrevFunctionId
進(jìn)行索引,因?yàn)楂@取當(dāng)前Function的源數(shù)據(jù)嚎花,就是上一層Function的結(jié)果數(shù)據(jù)寸痢,所以我們通過(guò)PrevFunctionId
來(lái)得到上一層Function的Id,從data[PrevFunctionId
] 中可以得到數(shù)據(jù)源紊选。
kis-flow/flow/kis_flow_data.go
// getCurData 獲取flow當(dāng)前Function層級(jí)的輸入數(shù)據(jù)
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
if flow.PrevFunctionId == "" {
return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set"))
}
if _, ok := flow.data[flow.PrevFunctionId]; !ok {
return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId))
}
return flow.data[flow.PrevFunctionId], nil
}
3.2.5 數(shù)據(jù)流鏈?zhǔn)秸{(diào)度處理
下面我們就要在flow.Run()
方法中啼止,來(lái)加入數(shù)據(jù)流的處理動(dòng)作道逗。
kis-flow/flow/kis_flow.go
// Run 啟動(dòng)KisFlow的流式計(jì)算, 從起始Function開(kāi)始執(zhí)行流
func (flow *KisFlow) Run(ctx context.Context) error {
var fn kis.Function
fn = flow.FlowHead
if flow.Conf.Status == int(common.FlowDisable) {
//flow被配置關(guān)閉
return nil
}
// ========= 數(shù)據(jù)流 新增 ===========
// 因?yàn)榇藭r(shí)還沒(méi)有執(zhí)行任何Function, 所以PrevFunctionId為FirstVirtual 因?yàn)闆](méi)有上一層Function
flow.PrevFunctionId = common.FunctionIdFirstVirtual
// 提交數(shù)據(jù)流原始數(shù)據(jù)
if err := flow.commitSrcData(ctx); err != nil {
return err
}
// ========= 數(shù)據(jù)流 新增 ===========
//流式鏈?zhǔn)秸{(diào)用
for fn != nil {
// ========= 數(shù)據(jù)流 新增 ===========
// flow記錄當(dāng)前執(zhí)行到的Function 標(biāo)記
fid := fn.GetId()
flow.ThisFunction = fn
flow.ThisFunctionId = fid
// 得到當(dāng)前Function要處理與的源數(shù)據(jù)
if inputData, err := flow.getCurData(); err != nil {
log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
return err
} else {
flow.inPut = inputData
}
// ========= 數(shù)據(jù)流 新增 ===========
if err := fn.Call(ctx, flow); err != nil {
//Error
return err
} else {
//Success
// ========= 數(shù)據(jù)流 新增 ===========
if err := flow.commitCurData(ctx); err != nil {
return err
}
// 更新上一層FuncitonId游標(biāo)
flow.PrevFunctionId = flow.ThisFunctionId
// ========= 數(shù)據(jù)流 新增 ===========
fn = fn.Next()
}
}
return nil
}
- 在run() 剛執(zhí)行的時(shí)候,對(duì)PrevFunctionId 進(jìn)行初始化献烦,設(shè)置為
FunctionIdFirstVirtual
憔辫。 - 在run() 剛執(zhí)行的時(shí)候,執(zhí)行
commitSrcData()
將業(yè)務(wù)賦值的的buffer數(shù)據(jù)提交到data[FunctionIdFirstVirtual
]中仿荆。 - 進(jìn)入循環(huán),執(zhí)行每個(gè)Function的時(shí)候坏平,
getCurData()
獲取到當(dāng)前Function的源數(shù)據(jù)拢操,并且放在flow.inPut
成員中。 - 進(jìn)入循環(huán)舶替,更正
ThisFunctionId
游標(biāo)為當(dāng)前Function ID令境。 - 進(jìn)入循環(huán),每個(gè)Funciton執(zhí)行完畢后顾瞪,將Function產(chǎn)生的結(jié)果數(shù)據(jù)通過(guò)
commitCurData()
進(jìn)行提交舔庶,并且改變PrevFunctionId
為當(dāng)前FunctionID, 進(jìn)入下一層陈醒。
很顯然惕橙,我們還需要讓Flow
給開(kāi)發(fā)者提供一個(gè)獲取Input數(shù)據(jù)的接口。
kis-flow/kis/flow.go
package kis
import (
"context"
"kis-flow/common"
"kis-flow/config"
)
type Flow interface {
// Run 調(diào)度Flow钉跷,依次調(diào)度Flow中的Function并且執(zhí)行
Run(ctx context.Context) error
// Link 將Flow中的Function按照配置文件中的配置進(jìn)行連接
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow數(shù)據(jù)到即將執(zhí)行的Function層
CommitRow(row interface{}) error
// ++++++++++++++++++++++
// Input 得到flow當(dāng)前執(zhí)行Function的輸入源數(shù)據(jù)
Input() common.KisRowArr
}
實(shí)現(xiàn)如下:
kis-flow/flow/kis_flow_data.go
// Input 得到flow當(dāng)前執(zhí)行Function的輸入源數(shù)據(jù)
func (flow *KisFlow) Input() common.KisRowArr {
return flow.inPut
}
3.3 KisFunction的數(shù)據(jù)流處理
由于我們的Function調(diào)度模塊還目前還沒(méi)有實(shí)現(xiàn)弥鹦,所以有關(guān)Function在執(zhí)行Call()
方法的時(shí)候,只能暫時(shí)將業(yè)務(wù)計(jì)算的邏輯寫(xiě)死在KisFlow框架中爷辙。 在下一章節(jié)彬坏,我們會(huì)將這部分的計(jì)算邏輯開(kāi)放給開(kāi)發(fā)者進(jìn)行注冊(cè)自己的業(yè)務(wù)。
現(xiàn)在Flow已經(jīng)將數(shù)據(jù)傳遞給了每層的Function膝晾,那么在Function中我們下面來(lái)簡(jiǎn)單模擬一下業(yè)務(wù)的基礎(chǔ)計(jì)算邏輯栓始。
我們暫時(shí)修改KisFunctionC
和 KisFunctionE
兩個(gè)模塊的Call()
代碼.
假設(shè)KisFunctionC 是 KisFunctionE的上層。
kis-flow/function/kis_function_c.go
type KisFunctionC struct {
BaseFunction
}
func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)
//TODO 調(diào)用具體的Function執(zhí)行方法
//處理業(yè)務(wù)數(shù)據(jù)
for i, row := range flow.Input() {
fmt.Printf("In KisFunctionC, row = %+v\n", row)
// 提交本層計(jì)算結(jié)果數(shù)據(jù)
_ = flow.CommitRow("Data From KisFunctionC, index " + " " + fmt.Sprintf("%d", i))
}
return nil
}
kis-flow/function/kis_function_e.go
type KisFunctionE struct {
BaseFunction
}
func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)
// TODO 調(diào)用具體的Function執(zhí)行方法
//處理業(yè)務(wù)數(shù)據(jù)
for _, row := range flow.Input() {
fmt.Printf("In KisFunctionE, row = %+v\n", row)
}
return nil
}
3.4 數(shù)據(jù)流單元測(cè)試
下面我們模擬一個(gè)簡(jiǎn)單的計(jì)算業(yè)務(wù)血当,測(cè)試下每層的Function是否可以得到數(shù)據(jù)幻赚,并且將計(jì)算結(jié)果傳遞給下一層。
kis-flow/test/kis_flow_test.go
func TestNewKisFlowData(t *testing.T) {
ctx := context.Background()
// 1. 創(chuàng)建2個(gè)KisFunction配置實(shí)例
source1 := config.KisSource{
Name: "公眾號(hào)抖音商城戶訂單數(shù)據(jù)",
Must: []string{"order_id", "user_id"},
}
source2 := config.KisSource{
Name: "用戶訂單錯(cuò)誤率",
Must: []string{"order_id", "user_id"},
}
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
if myFuncConfig1 == nil {
panic("myFuncConfig1 is nil")
}
myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
if myFuncConfig2 == nil {
panic("myFuncConfig2 is nil")
}
// 2. 創(chuàng)建一個(gè) KisFlow 配置實(shí)例
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 3. 創(chuàng)建一個(gè)KisFlow對(duì)象
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 拼接Functioin 到 Flow 上
if err := flow1.Link(myFuncConfig1, nil); err != nil {
panic(err)
}
if err := flow1.Link(myFuncConfig2, nil); err != nil {
panic(err)
}
// 5. 提交原始數(shù)據(jù)
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 6. 執(zhí)行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
這里我們通過(guò)flow.CommitRow()
提交了3行數(shù)據(jù)歹颓,每行數(shù)據(jù)是一個(gè)字符串坯屿,當(dāng)然數(shù)據(jù)格式可以任意,數(shù)據(jù)類(lèi)型也可以任意巍扛,只需要在各層的Function業(yè)務(wù)自身確定拉齊好即可领跛。
cd到kis-flow/test/
下執(zhí)行命令:
go test -test.v -test.paniconexit0 -test.run TestNewKisFlowData
結(jié)果如下:
=== RUN TestNewKisFlowData
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionC, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000138190 ThisFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}
In KisFunctionC, row = This is Data1 from Test
In KisFunctionC, row = This is Data2 from Test
In KisFunctionC, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]]
KisFunctionE, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001381e0 ThisFunctionId:func-2182fa1a049f4c1c9eeb641f5292f09f PrevFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]] inPut:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]}
In KisFunctionE, row = Data From KisFunctionC, index 0
In KisFunctionE, row = Data From KisFunctionC, index 1
In KisFunctionE, row = Data From KisFunctionC, index 2
--- PASS: TestNewKisFlowData (0.00s)
PASS
ok kis-flow/test 0.636s
經(jīng)過(guò)日志的詳細(xì)校驗(yàn),結(jié)果是符合我們預(yù)期的撤奸。
好了吠昭,目前數(shù)據(jù)流的最簡(jiǎn)單版本已經(jīng)實(shí)現(xiàn)了喊括,下一章我們將Function的業(yè)務(wù)邏輯開(kāi)放給開(kāi)發(fā)者,而不是寫(xiě)在KisFlow框架中.
3.5 【V0.2】源代碼
https://github.com/aceld/kis-flow/releases/tag/v0.2
作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開(kāi)源項(xiàng)目地址:https://github.com/aceld/kis-flow
連載中...
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(1)-概述
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(2)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(3)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(4)-數(shù)據(jù)流