連載中...
Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
2. V0.1-項目構(gòu)建及基礎(chǔ)模塊定義
首先我們創(chuàng)建我們的項目,項目的主文件目錄就叫KisFlow蜀细,且在Github上創(chuàng)建對應(yīng)的倉庫: https://github.com/aceld/kis-flow 然后將項目代碼clone到本地鬼店。
2.0 項目構(gòu)建
(這里如果你是按照本教程開發(fā)元暴,需要在自己的倉庫重新創(chuàng)建一個新項目世囊,并且clone到本地開發(fā))
2.0.1 創(chuàng)建項目目錄
接下來拙吉,我們先將項目中的必要的文件目錄創(chuàng)建好盘榨,項目的目錄結(jié)構(gòu)如下:
kis-flow /
.
├── LICENSE
├── README.md
├── common/
├── example/
├── function/
├── conn/
├── config/
├── flow/
└── kis/
這里我們創(chuàng)建三個文件夾硬霍, common/
為 存放我們一些公用的基礎(chǔ)常量和一些枚舉參數(shù)掏呼,還有一些工具類的方法坏快。 flow/
為存放KisFlow的核心代碼。 function/
為存放KisFunction的核心代碼憎夷。 conn/
為存放KisConnector的核心代碼莽鸿。 config/
存放flow、functioin拾给、connector等策略配置信息模塊祥得。 example/
為我們針對KisFlow的一些測試案例和test單元測試案例等兔沃,能夠及時驗證我們的項目效果。 kis/
來存放所有模塊的抽象層啃沪。
2.0.1 創(chuàng)建go.mod
cd 到 kis-flow的項目根目錄粘拾,執(zhí)行如下指令:
我們會得到go.mod文件,這個是作為當前項目的包管理文件创千,如下:
module kis-flow
go 1.18
首先因為在之后會有很多調(diào)試日志要打印缰雇,我們先把日志模塊集成了,日志模塊KisFlow提供一個默認的標準輸出Logger對象追驴,再對我開放一個SetLogger() 方法來進行重新設(shè)置開發(fā)者自己的Logger模塊械哟。
2.1 KisLogger
2.1.1 Logger抽象接口
將Logger的定義在kis-flow/log/
目錄下,創(chuàng)建kis_log.go
文件:
kis-flow/log/kis_log.go
package log
import "context"
type KisLogger interface {
// InfoFX 有上下文的Info級別日志接口, format字符串格式
InfoFX(ctx context.Context, str string, v ...interface{})
// ErrorFX 有上下文的Error級別日志接口, format字符串格式
ErrorFX(ctx context.Context, str string, v ...interface{})
// DebugFX 有上下文的Debug級別日志接口, format字符串格式
DebugFX(ctx context.Context, str string, v ...interface{})
// InfoF 無上下文的Info級別日志接口, format字符串格式
InfoF(str string, v ...interface{})
// ErrorF 無上下文的Error級別日志接口, format字符串格式
ErrorF(str string, v ...interface{})
// DebugF 無上下文的Debug級別日志接口, format字符串格式
DebugF(str string, v ...interface{})
}
// kisLog 默認的KisLog 對象
var kisLog KisLogger
// SetLogger 設(shè)置KisLog對象, 可以是用戶自定義的Logger對象
func SetLogger(newlog KisLogger) {
kisLog = newlog
}
// Logger 獲取到kisLog對象
func Logger() KisLogger {
return kisLog
}
KisLogger提供了三個級別的日志殿雪,分別是Info暇咆、Error、Debug丙曙。且也分別提供了具備context參數(shù)與不具備context參數(shù)的兩套日志接口爸业。
提供一個全局對象kisLog
,默認的KisLog 對象亏镰。以及方法SetLogger()
和Logger()
供開發(fā)可以設(shè)置自己的Logger對象以及獲取到Logger對象扯旷。
2.1.2 默認的日志對象KisDefaultLogger
如果開發(fā)沒有自定義的日志對象定義,那么KisFlow會提供一個默認的日志對象kisDefaultLogger
索抓,這個類實現(xiàn)了KisLogger
的全部接口钧忽,且都是默認打印到標準輸出的形式來打印日志,定義在kis-flow/log/
目錄下逼肯,創(chuàng)建kis_default_log.go
文件耸黑。
kis-flow/log/kis_default_log.go
package log
import (
"context"
"fmt"
)
// kisDefaultLog 默認提供的日志對象
type kisDefaultLog struct{}
func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {
fmt.Printf(str, v...)
}
func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {
fmt.Printf(str, v...)
}
func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {
fmt.Printf(str, v...)
}
func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {
fmt.Println(ctx)
fmt.Printf(str, v...)
}
func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
fmt.Println(ctx)
fmt.Printf(str, v...)
}
func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {
fmt.Println(ctx)
fmt.Printf(str, v...)
}
func init() {
// 如果沒有設(shè)置Logger, 則啟動時使用默認的kisDefaultLog對象
if Logger() == nil {
SetLogger(&kisDefaultLog{})
}
}
這里在init()
初始化方法中,會判斷目前是否已經(jīng)有設(shè)置全局的Logger對象篮幢,如果沒有大刊,KisFlow會默認選擇kisDefaultLog 作為全局Logger日志對象。
2.1.3 單元測試KisLogger
現(xiàn)在洲拇,我們先不針對KisLogger
做過多的方法開發(fā)奈揍,我們優(yōu)先將現(xiàn)有的程序跑起來,做一個單元測試來測試創(chuàng)建一個KisLogger
赋续。
kis-flow/test/kis_log_test.go
package test
import (
"context"
"kis-flow/log"
"testing"
)
func TestKisLogger(t *testing.T) {
ctx := context.Background()
log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")
log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")
log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")
log.Logger().InfoF("TestKisLogger InfoF")
log.Logger().ErrorF("TestKisLogger ErrorF")
log.Logger().DebugF("TestKisLogger DebugF")
}
我們cd
到kis-flow/test/
目錄下執(zhí)行單元測試指令:
go test -test.v -test.paniconexit0 -test.run TestKisLogger
得到結(jié)果如下:
=== RUN TestKisLogger
context.Background
TestKisLogger InfoFX
context.Background
TestKisLogger ErrorFX
context.Background
TestKisLogger DebugFX
TestKisLogger InfoF
TestKisLogger ErrorF
TestKisLogger DebugF
--- PASS: TestKisLogger (0.00s)
PASS
ok kis-flow/test 0.509s
2.2 KisConfig
在KisFlow中,我們定義了三種核心模塊另患,分別是KisFunction
, KisFlow
, KisConnector
纽乱,所以KisConfig也分別需要針對這三個模塊進行定義,我們將全部有關(guān)KisConfig的代碼都放在kis-flow/config/
目錄下昆箕。
? kis-flow git:(master) ? tree
.
├── LICENSE
├── README.md
├── common/
│ └──
├── example/
│ └──
├── config/
│ ├──
├── test/
└── go.mod
2.2.1 KisFuncConfig 定義
KisFuncConfig在設(shè)計文檔中的yaml文件形式如下:
kistype: func
fname: 測試KisFunction_S1
fmode: Save
source:
name: 被校驗的測試數(shù)據(jù)源1-用戶訂單維度
must:
- userid
- orderid
option:
cname: 測試KisConnector_1
retry_times: 3
retry_duration: 500
default_params:
default1: default1_param
default2: default2_param
參數(shù)說明:
接下來我們根據(jù)上述的配置協(xié)議鸦列,來定義KisFunction的策略配置結(jié)構(gòu)體租冠,并且提供一些響應(yīng)的初始化方法。 我們在項目文檔中創(chuàng)建kis_func_config.go
文件薯嗤,在這里我們將需要的Config定義實現(xiàn)顽爹。
A. 結(jié)構(gòu)體定義
kis-flow/config/kis_func_config.go
package config
import (
"kis-flow/common"
"kis-flow/log"
)
// FParam 在當前Flow中Function定制固定配置參數(shù)類型
type FParam map[string]string
// KisSource 表示當前Function的業(yè)務(wù)源
type KisSource struct {
Name string `yaml:"name"` //本層Function的數(shù)據(jù)源描述
Must []string `yaml:"must"` //source必傳字段
}
// KisFuncOption 可選配置
type KisFuncOption struct {
CName string `yaml:"cname"` //連接器Connector名稱
RetryTimes int `yaml:"retry_times"` //選填,Function調(diào)度重試(不包括正常調(diào)度)最大次數(shù)
RetryDuriton int `yaml:"return_duration"` //選填,Function調(diào)度每次重試最大時間間隔(單位:ms)
Params FParam `yaml:"default_params"` //選填,在當前Flow中Function定制固定配置參數(shù)
}
// KisFuncConfig 一個KisFunction策略配置
type KisFuncConfig struct {
KisType string `yaml:"kistype"`
FName string `yaml:"fname"`
FMode string `yaml:"fmode"`
Source KisSource `yaml:"source"`
Option KisFuncOption `yaml:"option"`
}
這里KisFuncConfig
是相關(guān)結(jié)構(gòu)體,其中 FParam
骆姐、KisSource
镜粤、KisFuncOption
均為一些相關(guān)的參數(shù)類型。
B. 相關(guān)方法定義
下面我們先簡單的提供創(chuàng)建KisFuncConfig
的構(gòu)造方法玻褪。
kis-flow/config/kis_func_config.go
// NewFuncConfig 創(chuàng)建一個Function策略配置對象, 用于描述一個KisFunction信息
func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {
config := new(KisFuncConfig)
config.FName = funcName
if source == nil {
log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)
return nil
}
config.Source = *source
config.FMode = string(mode)
//FunctionS 和 L 需要必傳KisConnector參數(shù),原因是S和L需要通過Connector進行建立流式關(guān)系
if mode == common.S || mode == common.L {
if option == nil {
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
return nil
} else if option.CName == "" {
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
return nil
}
}
if option != nil {
config.Option = *option
}
return config
}
上述代碼中提到了common.S
和 common.L
兩個枚舉類型肉渴,這是我們針對KisFunction提供的五種類型的枚舉值,我們可以將他們定義在 kis-flow/common/const.go
文件中带射。
kis-flow/common/const.go
package common
type KisMode string
const (
// V 為校驗特征的KisFunction,
// 主要進行數(shù)據(jù)的過濾同规,驗證,字段梳理窟社,冪等等前置數(shù)據(jù)處理
V KisMode = "Verify"
// S 為存儲特征的KisFunction,
// S會通過NsConnector進行將數(shù)據(jù)進行存儲券勺,數(shù)據(jù)的臨時聲明周期為NsWindow
S KisMode = "Save"
// L 為加載特征的KisFunction,
// L會通過KisConnector進行數(shù)據(jù)加載灿里,通過該Function可以從邏輯上與對應(yīng)的S Function進行并流
L KisMode = "Load"
// C 為計算特征的KisFunction,
// C會通過KisFlow中的數(shù)據(jù)計算关炼,生成新的字段,將數(shù)據(jù)流傳遞給下游S進行存儲钠四,或者自己也已直接通過KisConnector進行存儲
C KisMode = "Calculate"
// E 為擴展特征的KisFunction盗扒,
// 作為流式計算的自定義特征Function,如缀去,Notify 調(diào)度器觸發(fā)任務(wù)的消息發(fā)送侣灶,刪除一些數(shù)據(jù),重置狀態(tài)等缕碎。
E KisMode = "Expand"
)
如果fmode
為Save
或者Load
說明這個function有查詢庫或者存儲數(shù)據(jù)的行為褥影,那么這個Function就需要關(guān)聯(lián)一個KisConnector,那么CName就需要傳遞進來咏雌。
C. 創(chuàng)建KisFuncConfig單元測試
現(xiàn)在凡怎,我們先不針對KisFuncConfig
做過多的方法開發(fā),我們優(yōu)先將現(xiàn)有的程序跑起來赊抖,做一個單元測試來測試創(chuàng)建一個KisFuncConfig
统倒。
kis-flow/test/kis_config_test.go
func TestNewFuncConfig(t *testing.T) {
source := config.KisSource{
Name: "公眾號抖音商城戶訂單數(shù)據(jù)",
Must: []string{"order_id", "user_id"},
}
option := config.KisFuncOption{
CName: "connectorName1",
RetryTimes: 3,
RetryDuriton: 300,
Params: config.FParam{
"param1": "value1",
"param2": "value2",
},
}
myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
log.Logger().InfoF("funcName1: %+v\n", myFunc1)
}
我們cd
到kis-flow/test/
目錄下執(zhí)行單元測試指令:
go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig
得到結(jié)果如下:
=== RUN TestNewFuncConfig
funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:公眾號抖音商城戶訂單數(shù)據(jù) Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}
--- PASS: TestNewFuncConfig (0.00s)
PASS
ok kis-flow/test 0.545s
好了,現(xiàn)在最簡單的KisFuncConfig的策略創(chuàng)建基本完成了氛雪。
2.2.2 KisFlowConfig 定義
KisFlowConfig在設(shè)計文檔中的yaml文件形式如下:
kistype: flow
status: 1
flow_name: MyFlow1
flows:
- fname: 測試PrintInput
params:
args1: value1
args2: value2
- fname: 測試KisFunction_S1
- fname: 測試PrintInput
params:
args1: value11
args2: value22
default2: newDefault
- fname: 測試PrintInput
- fname: 測試KisFunction_S1
params:
my_user_param1: ffffffxxxxxx
- fname: 測試PrintInput
參數(shù)說明:
A. 結(jié)構(gòu)體定義
接下來我們根據(jù)上述的配置協(xié)議房匆,來定義KisFlow的策略配置結(jié)構(gòu)體,并且提供一些響應(yīng)的初始化方法。 我們在項目文檔中創(chuàng)建kis_flow_config.go
文件浴鸿,在這里我們將需要的Config定義實現(xiàn)井氢。
kis-flow/config/kis_flow_config.go
package config
import "kis-flow/common"
// KisFlowFunctionParam 一個Flow配置中Function的Id及攜帶固定配置參數(shù)
type KisFlowFunctionParam struct {
FuncName string `yaml:"fname"` //必須
Params FParam `yaml:"params"` //選填,在當前Flow中Function定制固定配置參數(shù)
}
// KisFlowConfig 用戶貫穿整條流式計算上下文環(huán)境的對象
type KisFlowConfig struct {
KisType string `yaml:"kistype"`
Status int `yaml:"status"`
FlowName string `yaml:"flow_name"`
Flows []KisFlowFunctionParam `yaml:"flows"`
}
這里提供了一個新的參數(shù)類型 KisFlowFunctionParam
,這個表示配置KisFlow的時候岳链,在調(diào)度的時候花竞,flow默認傳遞當前被調(diào)度Function的自定義默認參數(shù),如果不需要可以不添加此參數(shù)掸哑。
B. 相關(guān)方法定義
提供一個新建KisFlowConfig
的構(gòu)造方法约急。
kis-flow/config/kis_flow_config.go
// NewFlowConfig 創(chuàng)建一個Flow策略配置對象, 用于描述一個KisFlow信息
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
config := new(KisFlowConfig)
config.FlowName = flowName
config.Flows = make([]KisFlowFunctionParam, 0)
config.Status = int(enable)
return config
}
// AppendFunctionConfig 添加一個Function Config 到當前Flow中
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
fConfig.Flows = append(fConfig.Flows, params)
}
有關(guān)flow攜帶的Function配置,這里我們采用通過AppendFunctionConfig
動態(tài)的去添加举户,目的是為了烤宙,今后可能有關(guān)kisflow的配置會從數(shù)據(jù)庫/動態(tài)遠程配置等中提取,那么就需要動態(tài)的將配置組合進來俭嘁。
C. KisFlowConfig單元測試
同樣躺枕,我們簡單些一個單元測試來測試KisFlowConfig的創(chuàng)建。
kis-flow/test/kis_config_test.go
func TestNewFlowConfig(t *testing.T) {
flowFuncParams1 := config.KisFlowFunctionParam{
FuncName: "funcName1",
Params: config.FParam{
"flowSetFunParam1": "value1",
"flowSetFunParam2": "value2",
},
}
flowFuncParams2 := config.KisFlowFunctionParam{
FuncName: "funcName2",
Params: config.FParam{
"default": "value1",
},
}
myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)
myFlow1.AppendFunctionConfig(flowFuncParams1)
myFlow1.AppendFunctionConfig(flowFuncParams2)
log.Logger().InfoF("myFlow1: %+v\n", myFlow1)
}
我們cd
到kis-flow/test/
目錄下執(zhí)行單元測試指令:
$ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig
得到結(jié)果如下:
=== RUN TestNewFlowConfig
myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}
--- PASS: TestNewFlowConfig (0.00s)
PASS
ok kis-flow/test 0.251s
2.2.3 KisConnConfig
KisConnConfig在設(shè)計文檔中的yaml文件形式如下:
kistype: conn
cname: 測試KisConnector_1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: userid_orderid_option
params:
args1: value1
args2: value2
load: null
save:
- 測試KisFunction_S1
A. 結(jié)構(gòu)體定義
接下來我們根據(jù)上述的配置協(xié)議供填,來定義KisConnector的策略配置結(jié)構(gòu)體拐云,并且提供一些響應(yīng)的初始化方法。 我們在項目文檔中創(chuàng)建kis_conn_config.go
文件近她,在這里我們將需要的Config定義實現(xiàn)叉瘩。
kis-flow/config/kis_conn_config.go
package config
import (
"errors"
"fmt"
"kis-flow/common"
)
// KisConnConfig KisConnector 策略配置
type KisConnConfig struct {
//配置類型
KisType string `yaml:"kistype"`
//唯一描述標識
CName string `yaml:"cname"`
//基礎(chǔ)存儲媒介地址
AddrString string `yaml:"addrs"`
//存儲媒介引擎類型"Mysql" "Redis" "Kafka"等
Type common.KisConnType `yaml:"type"`
//一次存儲的標識:如Redis為Key名稱、Mysql為Table名稱,Kafka為Topic名稱等
Key string `yaml:"key"`
//配置信息中的自定義參數(shù)
Params map[string]string `yaml:"params"`
//存儲讀取所綁定的NsFuncionID
Load []string `yaml:"load"`
Save []string `yaml:"save"`
}
B. 相關(guān)方法定義
kis-flow/config/kis_conn_config.go
// NewConnConfig 創(chuàng)建一個KisConnector策略配置對象, 用于描述一個KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
strategy := new(KisConnConfig)
strategy.CName = cName
strategy.AddrString = addr
strategy.Type = t
strategy.Key = key
strategy.Params = param
return strategy
}
// WithFunc Connector與Function進行關(guān)系綁定
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
switch common.KisMode(fConfig.FMode) {
case common.S:
cConfig.Save = append(cConfig.Save, fConfig.FName)
case common.L:
cConfig.Load = append(cConfig.Load, fConfig.FName)
default:
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
}
return nil
}
這里也是通過提供WithFunc
方法來動態(tài)的添加Conn和Function的關(guān)聯(lián)關(guān)系 ###
C. KisConnConfig 單元測試 同樣粘捎,我們簡單些一個單元測試來測試KisConnConfig的創(chuàng)建薇缅。
kis-flow/test/kis_config_test.go
func TestNewConnConfig(t *testing.T) {
source := config.KisSource{
Name: "公眾號抖音商城戶訂單數(shù)據(jù)",
Must: []string{"order_id", "user_id"},
}
option := config.KisFuncOption{
CName: "connectorName1",
RetryTimes: 3,
RetryDuriton: 300,
Params: config.FParam{
"param1": "value1",
"param2": "value2",
},
}
myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
connParams := config.FParam{
"param1": "value1",
"param2": "value2",
}
myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
if err := myConnector1.WithFunc(myFunc1); err != nil {
log.Logger().ErrorF("WithFunc err: %s\n", err.Error())
}
log.Logger().InfoF("myConnector1: %+v\n", myConnector1)
}
我們cd
到kis-fow/test/
目錄下執(zhí)行單元測試指令:
$ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig
得到結(jié)果如下:
=== RUN TestNewConnConfig
myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}
--- PASS: TestNewConnConfig (0.00s)
PASS
ok kis-flow/test 0.481s
作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開源項目地址:https://github.com/aceld/kis-flow
連載中...
Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)