Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(4)-數(shù)據(jù)流

連載中...
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(1)-概述
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(2)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(3)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(4)-數(shù)據(jù)流
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(5)-Function調(diào)度


3.1 數(shù)據(jù)類(lèi)型定義

KisFlow中可以傳遞任意類(lèi)型數(shù)據(jù)作為Flow的數(shù)據(jù)源。而且KisFlow支持批量數(shù)據(jù)的流逝計(jì)算處理役听。

首先需要對(duì)KisFlow中內(nèi)部支持的數(shù)據(jù)類(lèi)型做一個(gè)基本的定義,我們將這部分的定義代碼寫(xiě)在kis-flow/common/中的data_type.go 文件中。

kis-flow/common/data_type.go

package common

// KisRow 一行數(shù)據(jù)
type KisRow interface{}

// KisRowArr 一次業(yè)務(wù)的批量數(shù)據(jù)
type KisRowArr []KisRow

/*
    KisDataMap 當(dāng)前Flow承載的全部數(shù)據(jù)临庇,
    key :  數(shù)據(jù)所在的Function ID
    value: 對(duì)應(yīng)的KisRow
*/
type KisDataMap map[string]KisRowArr
  • KisRow :表示一行數(shù)據(jù)益愈,可以是任意的數(shù)據(jù)類(lèi)型赏寇,比如字符串,json字符串其掂,一些序列化的二進(jìn)制數(shù)據(jù), protobuf潦蝇,yaml字符串等款熬,均可。
  • KisRowArr:表示多行數(shù)據(jù)护蝶,也就是一次提交的批量數(shù)據(jù)华烟,他是KisRow的數(shù)組集合。
  • KisDataMap :表示當(dāng)前Flow承載的全部數(shù)據(jù)持灰。是一個(gè)map[string]KisRowArr類(lèi)型盔夜,其中key為數(shù)據(jù)所在的Function ID,value為數(shù)據(jù)堤魁。

3.2 KisFlow數(shù)據(jù)流處理

在KisFlow模塊中喂链,新增一些存放數(shù)據(jù)的成員,如下:

kis-flow/flow/kis_flow.go

// KisFlow 用于貫穿整條流式計(jì)算的上下文環(huán)境
type KisFlow struct {
    // 基礎(chǔ)信息
    Id   string                // Flow的分布式實(shí)例ID(用于KisFlow內(nèi)部區(qū)分不同實(shí)例)
    Name string                // Flow的可讀名稱(chēng)
    Conf *config.KisFlowConfig // Flow配置策略

    // Function列表
    Funcs          map[string]kis.Function // 當(dāng)前flow擁有的全部管理的全部Function對(duì)象, key: FunctionID
    FlowHead       kis.Function            // 當(dāng)前Flow所擁有的Function列表表頭
    FlowTail       kis.Function            // 當(dāng)前Flow所擁有的Function列表表尾
    flock          sync.RWMutex            // 管理鏈表插入讀寫(xiě)的鎖
    ThisFunction   kis.Function            // Flow當(dāng)前正在執(zhí)行的KisFunction對(duì)象
    ThisFunctionId string                  // 當(dāng)前執(zhí)行到的Function ID (策略配置ID)
    PrevFunctionId string                  // 當(dāng)前執(zhí)行到的Function 上一層FunctionID(策略配置ID)

    // Function列表參數(shù)
    funcParams map[string]config.FParam // flow在當(dāng)前Function的自定義固定配置參數(shù),Key:function的實(shí)例KisID, value:FParam
    fplock     sync.RWMutex             // 管理funcParams的讀寫(xiě)鎖

    // ++++++++ 數(shù)據(jù) ++++++++++
    buffer common.KisRowArr  // 用來(lái)臨時(shí)存放輸入字節(jié)數(shù)據(jù)的內(nèi)部Buf, 一條數(shù)據(jù)為interface{}, 多條數(shù)據(jù)為[]interface{} 也就是KisBatch
    data   common.KisDataMap // 流式計(jì)算各個(gè)層級(jí)的數(shù)據(jù)源
    inPut  common.KisRowArr  // 當(dāng)前Function的計(jì)算輸入數(shù)據(jù)
}
  • buffer: 用來(lái)臨時(shí)存放輸入字節(jié)數(shù)據(jù)的內(nèi)部Buf, 一條數(shù)據(jù)為interface{}, 多條數(shù)據(jù)為[]interface{} 也就是KisBatch
  • data: 流式計(jì)算各個(gè)層級(jí)的數(shù)據(jù)源
  • inPut: 當(dāng)前Function的計(jì)算輸入數(shù)據(jù)

后續(xù)章節(jié)會(huì)使用到這幾個(gè)成員屬性妥泉,這里先做為了解椭微。

因?yàn)閐ata是一個(gè)map類(lèi)型,所以需要在NewKisFlow()中盲链,對(duì)其進(jìn)行初始化操作:

kis-flow/flow/kis_flow.go

// NewKisFlow 創(chuàng)建一個(gè)KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
    flow := new(KisFlow)
    // 實(shí)例Id
    flow.Id = id.KisID(common.KisIdTypeFlow)

    // 基礎(chǔ)信息
    flow.Name = conf.FlowName
    flow.Conf = conf

    // Function列表
    flow.Funcs = make(map[string]kis.Function)
    flow.funcParams = make(map[string]config.FParam)

    // ++++++++ 數(shù)據(jù)data +++++++
    flow.data = make(common.KisDataMap)

    return flow
}

3.2.2 業(yè)務(wù)提交數(shù)據(jù)接口

KisFlow的開(kāi)發(fā)者在編寫(xiě)業(yè)務(wù)時(shí)蝇率,可以通過(guò)flow實(shí)例來(lái)進(jìn)行提交業(yè)務(wù)源數(shù)據(jù)迟杂,所以我們需要給Flow抽象層新增一個(gè)提交數(shù)據(jù)的接口:

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run 調(diào)度Flow,依次調(diào)度Flow中的Function并且執(zhí)行
    Run(ctx context.Context) error
    // Link 將Flow中的Function按照配置文件中的配置進(jìn)行連接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow  ++++++ 提交Flow數(shù)據(jù)到即將執(zhí)行的Function層 ++++
    CommitRow(row interface{}) error
}

新增接口 CommitRow(any interface{}) error本慕。

kis-flow/flow/kis_flow_data.go中實(shí)現(xiàn)KisFlow的該接口排拷。

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) CommitRow(row interface{}) error {

    flow.buffer = append(flow.buffer, row)

    return nil
}

CommitRow() 為提交Flow數(shù)據(jù), 一行數(shù)據(jù),如果是批量數(shù)據(jù)可以提交多次锅尘。 所有提交的數(shù)據(jù)都會(huì)暫存在flow.buffer 成員中监氢,作為緩沖區(qū)。

3.2.3 KisFlow內(nèi)部數(shù)據(jù)提交

現(xiàn)在開(kāi)發(fā)者可以通過(guò)CommitRow()將數(shù)據(jù)提交到buffer中藤违,但是在KisFlow內(nèi)部需要一個(gè)內(nèi)部接口來(lái)將buffer提交到KisFlow的data中浪腐,作為之后當(dāng)前Flow全部Function的上下文數(shù)據(jù)供使用。所以我們這里需要再提供兩個(gè)接口顿乒。分別是首次提交數(shù)據(jù)commitSrcData()和中間層提交數(shù)據(jù)commitCurData()兩個(gè)函數(shù)议街。

A. 首層數(shù)據(jù)提交

kis-flow/flow/kis_flow_data.go

// commitSrcData 提交當(dāng)前Flow的數(shù)據(jù)源數(shù)據(jù), 表示首次提交當(dāng)前Flow的原始數(shù)據(jù)源
// 將flow的臨時(shí)數(shù)據(jù)buffer,提交到flow的data中,(data為各個(gè)Function層級(jí)的源數(shù)據(jù)備份)
// 會(huì)清空之前所有的flow數(shù)據(jù)
func (flow *KisFlow) commitSrcData(ctx context.Context) error {

    // 制作批量數(shù)據(jù)batch
    dataCnt := len(flow.buffer)
    batch := make(common.KisRowArr, 0, dataCnt)

    for _, row := range flow.buffer {
        batch = append(batch, row)
    }

    // 清空之前所有數(shù)據(jù)
    flow.clearData(flow.data)

    // 首次提交淆游,記錄flow原始數(shù)據(jù)
    // 因?yàn)槭状翁峤话茫訮revFunctionId為FirstVirtual 因?yàn)闆](méi)有上一層Function
    flow.data[common.FunctionIdFirstVirtual] = batch

    // 清空緩沖Buf
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

//ClearData 清空f(shuō)low所有數(shù)據(jù)
func (flow *KisFlow) clearData(data common.KisDataMap) {
    for k := range data {
        delete(data, k)
    }
}

實(shí)際上commitSrcData()在整個(gè)的Flow運(yùn)行周期只會(huì)執(zhí)行一次,這個(gè)作為當(dāng)前Flow的始祖源數(shù)據(jù)犹菱。

commitSrcData() 的最終目的是 將buffer的數(shù)據(jù)提交到data[FunctionIdFirstVirtual] 中拾稳。 這里要注意的是FunctionIdFirstVirtual是一個(gè)虛擬fid,作為所有Function的上游Function ID腊脱。 并且首次提交之后访得,flow.buffer的數(shù)據(jù)將被清空。

B. 中間層數(shù)據(jù)提交

kis-flow/flow/kis_flow_data.go

//commitCurData 提交Flow當(dāng)前執(zhí)行Function的結(jié)果數(shù)據(jù)
func (flow *KisFlow) commitCurData(ctx context.Context) error {

    //判斷本層計(jì)算是否有結(jié)果數(shù)據(jù),如果沒(méi)有則退出本次Flow Run循環(huán)
    if len(flow.buffer) == 0 {
        return nil
    }

    // 制作批量數(shù)據(jù)batch
    batch := make(common.KisRowArr, 0, len(flow.buffer))

    //如果strBuf為空陕凹,則沒(méi)有添加任何數(shù)據(jù)
    for _, row := range flow.buffer {
        batch = append(batch, row)
    }

    //將本層計(jì)算的緩沖數(shù)據(jù)提交到本層結(jié)果數(shù)據(jù)中
    flow.data[flow.ThisFunctionId] = batch

    //清空緩沖Buf
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

commitCurData()會(huì)在每次Function執(zhí)行計(jì)算后悍抑,將當(dāng)前Function的計(jì)算結(jié)果數(shù)據(jù)進(jìn)行提交。 commitCurData() 會(huì)在Flow的流式計(jì)算過(guò)程中被執(zhí)行多次杜耙。

commitCurData()的最終目的是將將buffer的數(shù)據(jù)提交到data[flow.ThisFunctionId] 中 搜骡。ThisFunctionId也就是當(dāng)前正在執(zhí)行Function,同時(shí)也是下一層將要執(zhí)行的Function的上一層佑女。

提交之后记靡,flow.buffer的數(shù)據(jù)將被清空。

3.2.4 獲取正在執(zhí)行Function的源數(shù)據(jù)

至于每層Function的源數(shù)據(jù)如何得到团驱,我們可以通過(guò)getCurData()方法得到摸吠。 通過(guò)PrevFunctionId進(jìn)行索引,因?yàn)楂@取當(dāng)前Function的源數(shù)據(jù)嚎花,就是上一層Function的結(jié)果數(shù)據(jù)寸痢,所以我們通過(guò)PrevFunctionId來(lái)得到上一層Function的Id,從data[PrevFunctionId] 中可以得到數(shù)據(jù)源紊选。

kis-flow/flow/kis_flow_data.go

// getCurData 獲取flow當(dāng)前Function層級(jí)的輸入數(shù)據(jù)
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
    if flow.PrevFunctionId == "" {
        return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set"))
    }

    if _, ok := flow.data[flow.PrevFunctionId]; !ok {
        return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId))
    }

    return flow.data[flow.PrevFunctionId], nil
}

3.2.5 數(shù)據(jù)流鏈?zhǔn)秸{(diào)度處理

下面我們就要在flow.Run()方法中啼止,來(lái)加入數(shù)據(jù)流的處理動(dòng)作道逗。

kis-flow/flow/kis_flow.go

// Run 啟動(dòng)KisFlow的流式計(jì)算, 從起始Function開(kāi)始執(zhí)行流
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead

    if flow.Conf.Status == int(common.FlowDisable) {
        //flow被配置關(guān)閉
        return nil
    }

    // ========= 數(shù)據(jù)流 新增 ===========
    // 因?yàn)榇藭r(shí)還沒(méi)有執(zhí)行任何Function, 所以PrevFunctionId為FirstVirtual 因?yàn)闆](méi)有上一層Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // 提交數(shù)據(jù)流原始數(shù)據(jù)
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }
    // ========= 數(shù)據(jù)流 新增 ===========


    //流式鏈?zhǔn)秸{(diào)用
    for fn != nil {

        // ========= 數(shù)據(jù)流 新增 ===========
        // flow記錄當(dāng)前執(zhí)行到的Function 標(biāo)記
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // 得到當(dāng)前Function要處理與的源數(shù)據(jù)
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }
        // ========= 數(shù)據(jù)流 新增 ===========


        if err := fn.Call(ctx, flow); err != nil {
            //Error
            return err
        } else {
            //Success

            // ========= 數(shù)據(jù)流 新增 ===========
            if err := flow.commitCurData(ctx); err != nil {
                return err
            }

            // 更新上一層FuncitonId游標(biāo)
            flow.PrevFunctionId = flow.ThisFunctionId
            // ========= 數(shù)據(jù)流 新增 ===========

            fn = fn.Next()
        }
    }

    return nil
}
  • 在run() 剛執(zhí)行的時(shí)候,對(duì)PrevFunctionId 進(jìn)行初始化献烦,設(shè)置為 FunctionIdFirstVirtual憔辫。
  • 在run() 剛執(zhí)行的時(shí)候,執(zhí)行commitSrcData()將業(yè)務(wù)賦值的的buffer數(shù)據(jù)提交到data[FunctionIdFirstVirtual]中仿荆。
  • 進(jìn)入循環(huán),執(zhí)行每個(gè)Function的時(shí)候坏平,getCurData()獲取到當(dāng)前Function的源數(shù)據(jù)拢操,并且放在flow.inPut 成員中。
  • 進(jìn)入循環(huán)舶替,更正ThisFunctionId 游標(biāo)為當(dāng)前Function ID令境。
  • 進(jìn)入循環(huán),每個(gè)Funciton執(zhí)行完畢后顾瞪,將Function產(chǎn)生的結(jié)果數(shù)據(jù)通過(guò)commitCurData()進(jìn)行提交舔庶,并且改變PrevFunctionId為當(dāng)前FunctionID, 進(jìn)入下一層陈醒。

很顯然惕橙,我們還需要讓Flow給開(kāi)發(fā)者提供一個(gè)獲取Input數(shù)據(jù)的接口。

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run 調(diào)度Flow钉跷,依次調(diào)度Flow中的Function并且執(zhí)行
    Run(ctx context.Context) error
    // Link 將Flow中的Function按照配置文件中的配置進(jìn)行連接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交Flow數(shù)據(jù)到即將執(zhí)行的Function層
    CommitRow(row interface{}) error

    // ++++++++++++++++++++++
    // Input 得到flow當(dāng)前執(zhí)行Function的輸入源數(shù)據(jù)
    Input() common.KisRowArr
}

實(shí)現(xiàn)如下:

kis-flow/flow/kis_flow_data.go

// Input 得到flow當(dāng)前執(zhí)行Function的輸入源數(shù)據(jù)
func (flow *KisFlow) Input() common.KisRowArr {
    return flow.inPut
}

3.3 KisFunction的數(shù)據(jù)流處理

由于我們的Function調(diào)度模塊還目前還沒(méi)有實(shí)現(xiàn)弥鹦,所以有關(guān)Function在執(zhí)行Call()方法的時(shí)候,只能暫時(shí)將業(yè)務(wù)計(jì)算的邏輯寫(xiě)死在KisFlow框架中爷辙。 在下一章節(jié)彬坏,我們會(huì)將這部分的計(jì)算邏輯開(kāi)放給開(kāi)發(fā)者進(jìn)行注冊(cè)自己的業(yè)務(wù)。

現(xiàn)在Flow已經(jīng)將數(shù)據(jù)傳遞給了每層的Function膝晾,那么在Function中我們下面來(lái)簡(jiǎn)單模擬一下業(yè)務(wù)的基礎(chǔ)計(jì)算邏輯栓始。

我們暫時(shí)修改KisFunctionCKisFunctionE 兩個(gè)模塊的Call()代碼.
假設(shè)KisFunctionC 是 KisFunctionE的上層。

kis-flow/function/kis_function_c.go

type KisFunctionC struct {
    BaseFunction
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)

    //TODO 調(diào)用具體的Function執(zhí)行方法
    //處理業(yè)務(wù)數(shù)據(jù)
    for i, row := range flow.Input() {
        fmt.Printf("In KisFunctionC, row = %+v\n", row)

        // 提交本層計(jì)算結(jié)果數(shù)據(jù)
        _ = flow.CommitRow("Data From KisFunctionC, index " + " " + fmt.Sprintf("%d", i))
    }

    return nil
}

kis-flow/function/kis_function_e.go

type KisFunctionE struct {
    BaseFunction
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)

    // TODO 調(diào)用具體的Function執(zhí)行方法
    //處理業(yè)務(wù)數(shù)據(jù)
    for _, row := range flow.Input() {
        fmt.Printf("In KisFunctionE, row = %+v\n", row)
    }

    return nil
}

3.4 數(shù)據(jù)流單元測(cè)試

下面我們模擬一個(gè)簡(jiǎn)單的計(jì)算業(yè)務(wù)血当,測(cè)試下每層的Function是否可以得到數(shù)據(jù)幻赚,并且將計(jì)算結(jié)果傳遞給下一層。

kis-flow/test/kis_flow_test.go

func TestNewKisFlowData(t *testing.T) {
    ctx := context.Background()

    // 1. 創(chuàng)建2個(gè)KisFunction配置實(shí)例
    source1 := config.KisSource{
        Name: "公眾號(hào)抖音商城戶訂單數(shù)據(jù)",
        Must: []string{"order_id", "user_id"},
    }

    source2 := config.KisSource{
        Name: "用戶訂單錯(cuò)誤率",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
    if myFuncConfig1 == nil {
        panic("myFuncConfig1 is nil")
    }

    myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
    if myFuncConfig2 == nil {
        panic("myFuncConfig2 is nil")
    }

    // 2. 創(chuàng)建一個(gè) KisFlow 配置實(shí)例
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 3. 創(chuàng)建一個(gè)KisFlow對(duì)象
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 4. 拼接Functioin 到 Flow 上
    if err := flow1.Link(myFuncConfig1, nil); err != nil {
        panic(err)
    }
    if err := flow1.Link(myFuncConfig2, nil); err != nil {
        panic(err)
    }

    // 5. 提交原始數(shù)據(jù)
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 6. 執(zhí)行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

這里我們通過(guò)flow.CommitRow()提交了3行數(shù)據(jù)歹颓,每行數(shù)據(jù)是一個(gè)字符串坯屿,當(dāng)然數(shù)據(jù)格式可以任意,數(shù)據(jù)類(lèi)型也可以任意巍扛,只需要在各層的Function業(yè)務(wù)自身確定拉齊好即可领跛。

cd到kis-flow/test/下執(zhí)行命令:

go test -test.v -test.paniconexit0 -test.run TestNewKisFlowData

結(jié)果如下:

=== RUN   TestNewKisFlowData
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000138190 ThisFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

In KisFunctionC, row = This is Data1 from Test
In KisFunctionC, row = This is Data2 from Test
In KisFunctionC, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]]

KisFunctionE, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001381e0 ThisFunctionId:func-2182fa1a049f4c1c9eeb641f5292f09f PrevFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]] inPut:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]}

In KisFunctionE, row = Data From KisFunctionC, index  0
In KisFunctionE, row = Data From KisFunctionC, index  1
In KisFunctionE, row = Data From KisFunctionC, index  2
--- PASS: TestNewKisFlowData (0.00s)
PASS
ok      kis-flow/test   0.636s

經(jīng)過(guò)日志的詳細(xì)校驗(yàn),結(jié)果是符合我們預(yù)期的撤奸。

好了吠昭,目前數(shù)據(jù)流的最簡(jiǎn)單版本已經(jīng)實(shí)現(xiàn)了喊括,下一章我們將Function的業(yè)務(wù)邏輯開(kāi)放給開(kāi)發(fā)者,而不是寫(xiě)在KisFlow框架中.

3.5 【V0.2】源代碼

https://github.com/aceld/kis-flow/releases/tag/v0.2


作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開(kāi)源項(xiàng)目地址:https://github.com/aceld/kis-flow


連載中...
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(1)-概述
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(2)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(3)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(4)-數(shù)據(jù)流

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末矢棚,一起剝皮案震驚了整個(gè)濱河市郑什,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蒲肋,老刑警劉巖蘑拯,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異兜粘,居然都是意外死亡申窘,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)孔轴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)剃法,“玉大人,你說(shuō)我怎么就攤上這事路鹰〈蓿” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵晋柱,是天一觀的道長(zhǎng)优构。 經(jīng)常有香客問(wèn)我,道長(zhǎng)雁竞,這世上最難降的妖魔是什么俩块? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮浓领,結(jié)果婚禮上玉凯,老公的妹妹穿的比我還像新娘。我一直安慰自己联贩,他們只是感情好漫仆,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著泪幌,像睡著了一般盲厌。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上祸泪,一...
    開(kāi)封第一講書(shū)人閱讀 51,624評(píng)論 1 305
  • 那天吗浩,我揣著相機(jī)與錄音,去河邊找鬼没隘。 笑死懂扼,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播阀湿,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼赶熟,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了陷嘴?” 一聲冷哼從身側(cè)響起映砖,我...
    開(kāi)封第一講書(shū)人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎灾挨,沒(méi)想到半個(gè)月后邑退,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡劳澄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年瓜饥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片浴骂。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖宪潮,靈堂內(nèi)的尸體忽然破棺而出溯警,到底是詐尸還是另有隱情,我是刑警寧澤狡相,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布梯轻,位于F島的核電站,受9級(jí)特大地震影響尽棕,放射性物質(zhì)發(fā)生泄漏喳挑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一滔悉、第九天 我趴在偏房一處隱蔽的房頂上張望伊诵。 院中可真熱鬧,春花似錦回官、人聲如沸曹宴。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)笛坦。三九已至,卻和暖如春苔巨,著一層夾襖步出監(jiān)牢的瞬間版扩,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工侄泽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留礁芦,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓悼尾,卻偏偏與公主長(zhǎng)得像宴偿,于是被迫代替她去往敵國(guó)和親湘捎。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355