Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架專欄
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(1)-概述
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(2)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(3)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(4)-數(shù)據(jù)流
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(5)-Function調(diào)度
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(6)-Connector
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(7)-配置導(dǎo)入與導(dǎo)出
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(8)-KisFlow Action
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(9)-Cache/Params 數(shù)據(jù)緩存與數(shù)據(jù)參數(shù)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(10)-Flow多副本
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(11)-Prometheus Metrics統(tǒng)計(jì)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(12)-基于反射自適應(yīng)注冊(cè)FaaS形參類型
案例:
KisFlow-Golang流式計(jì)算案例(一)快速開始QuickStart
KisFlow-Golang流式計(jì)算案例(二)-Flow并流操作
KisFlow-Golang流式計(jì)算案例(二)-KisFlow在多協(xié)程中的應(yīng)用
DownLoad kis-flow source
$go get github.com/aceld/kis-flow
案例源代碼
https://github.com/aceld/kis-flow-usage/tree/main/6-flow_in_goroutines
如果需要同一個(gè)Flow在多個(gè)Goroutine中同時(shí)并發(fā)執(zhí)行,那么可以通過flow.Fork()
函數(shù)远舅,克隆一份內(nèi)存隔離但是具備相同配置的Flow實(shí)例,然后分別在不同的協(xié)程中去計(jì)算執(zhí)行各自的數(shù)據(jù)流。
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/file"
"github.com/aceld/kis-flow/kis"
"sync"
)
func main() {
ctx := context.Background()
// Get a WaitGroup
var wg sync.WaitGroup
// Load Configuration from file
if err := file.ConfigImportYaml("conf/"); err != nil {
panic(err)
}
// Get the flow
flow1 := kis.Pool().GetFlow("CalStuAvgScore")
if flow1 == nil {
panic("flow1 is nil")
}
// Fork the flow
flowClone1 := flow1.Fork(ctx)
// Add to WaitGroup
wg.Add(2)
// Run Flow1
go func() {
defer wg.Done()
// Submit a string
_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
// Submit a string
_ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)
// Run the flow
if err := flow1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
}()
// Run FlowClone1
go func() {
defer wg.Done()
// Submit a string
_ = flowClone1.CommitRow(`{"stu_id":201, "score_1":100, "score_2":90, "score_3":80}`)
// Submit a string
_ = flowClone1.CommitRow(`{"stu_id":2001, "score_1":100, "score_2":70, "score_3":60}`)
if err := flowClone1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
}()
// Wait for Goroutines to finish
wg.Wait()
fmt.Println("All flows completed.")
return
}
func init() {
// Register functions
kis.Pool().FaaS("VerifyStu", VerifyStu)
kis.Pool().FaaS("AvgStuScore", AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開源項(xiàng)目地址:https://github.com/aceld/kis-flow
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架專欄
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(1)-概述
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(2)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(3)-項(xiàng)目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(4)-數(shù)據(jù)流
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(5)-Function調(diào)度
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(6)-Connector
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(7)-配置導(dǎo)入與導(dǎo)出
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(8)-KisFlow Action
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(9)-Cache/Params 數(shù)據(jù)緩存與數(shù)據(jù)參數(shù)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(10)-Flow多副本
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(11)-Prometheus Metrics統(tǒng)計(jì)
Golang框架實(shí)戰(zhàn)-KisFlow流式計(jì)算框架(12)-基于反射自適應(yīng)注冊(cè)FaaS形參類型
案例:
KisFlow-Golang流式計(jì)算案例(一)快速開始QuickStart
KisFlow-Golang流式計(jì)算案例(二)-Flow并流操作
KisFlow-Golang流式計(jì)算案例(三)-KisFlow在多協(xié)程中的應(yīng)用