KisFlow-Golang流式實時計算案例(一)快速開始QuickStart

Golang框架實戰(zhàn)-KisFlow流式計算框架專欄

Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
Golang框架實戰(zhàn)-KisFlow流式計算框架(7)-配置導(dǎo)入與導(dǎo)出
Golang框架實戰(zhàn)-KisFlow流式計算框架(8)-KisFlow Action
Golang框架實戰(zhàn)-KisFlow流式計算框架(9)-Cache/Params 數(shù)據(jù)緩存與數(shù)據(jù)參數(shù)
Golang框架實戰(zhàn)-KisFlow流式計算框架(10)-Flow多副本
Golang框架實戰(zhàn)-KisFlow流式計算框架(11)-Prometheus Metrics統(tǒng)計
Golang框架實戰(zhàn)-KisFlow流式計算框架(12)-基于反射自適應(yīng)注冊FaaS形參類型

案例:
KisFlow-Golang流式計算案例(一)快速開始QuickStart
KisFlow-Golang流式計算案例(二)-Flow并流操作
KisFlow-Golang流式計算案例(二)-KisFlow在多協(xié)程中的應(yīng)用


DownLoad kis-flow source

$go get github.com/aceld/kis-flow

《KisFlow開發(fā)者文檔》

1. KisFlow快速開始(使用配置文件)

案例源代碼: kis-flow-usage/2-quick_start_with_config at main · aceld/kis-flow-usage

首先我們創(chuàng)建一個項目喂击,項目的文件路徑如下:

項目目錄

├── Makefile
├── conf
│   ├── flow-CalStuAvgScore.yml
│   ├── func-AvgStuScore.yml
│   └── func-PrintStuAvgScore.yml
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go

Flow

定義當(dāng)前的Flow尼桶,當(dāng)前的Flow名稱為:"CalStuAvgScore",這是一個計算學(xué)生平均分值的數(shù)據(jù)流。


定義兩個Function,F(xiàn)unction1為:Calculate,是計算學(xué)生平均分的邏輯,F(xiàn)unction2為Expand 為打印最終結(jié)果婆誓。

Config

有關(guān)Flow和Function的配置文件如下。

(1) Flow Config

conf/flow-CalStuAvgScore.yml

kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
 - fname: AvgStuScore
 - fname: PrintStuAvgScore

(2) Function1 Config

conf/func-AvgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
 name: 學(xué)生學(xué)分
 must:
 - stu_id

(3) Function2(Slink) Config

conf/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
 name: 學(xué)生學(xué)分
 must:
 - stu_id

Main

接下來是主邏輯也颤,主要分成三步驟:

  • 加載配置文件洋幻,獲取Flow實例;
  • 提交數(shù)據(jù);
  • 運(yùn)行Flow。

main.go

package main

import (
    "context"
    "fmt"

    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)

    }

    // Get the flow
    flow1 := kis.Pool().GetFlow("CalStuAvgScore")
    if flow1 == nil {
        panic("flow1 is nil")

    }

    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

    // Run the flow
    if err := flow1.Run(ctx); err != nil {
        fmt.Println("err: ", err)

    }

    return
}

Function1

第一個計算流程的實現(xiàn)邏輯如下, AvgStuScoreIn 為輸入數(shù)據(jù)類型翅娶,當(dāng)前有三個學(xué)分文留,AvgStuScoreOut為輸出數(shù)據(jù)類型,為平均分值竭沫。

faas_stu_score_avg.go

package main
import (
    "context"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"

)
type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`

}
type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`

// AvgStuScore(FaaS) 計算學(xué)生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {

    for _, row := range rows {
        out := AvgStuScoreOut{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }
        // 提交結(jié)果數(shù)據(jù)
        _ = flow.CommitRow(out)


    }
    return nil
}

Function2

打印的計算邏輯為直接打印數(shù)據(jù)即可燥翅,如下。

faas_stu_score_avg_print.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"

)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)

    }
    return nil
}

OutPut

最后運(yùn)行程序蜕提,得到結(jié)果如下:

Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]</pre>

2. KisFlow快速開始(使用原生接口森书,動態(tài)配置)

案例源代碼: kis-flow-usage/1-quick_start at main · aceld/kis-flow-usage

項目目錄

├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go

Flow

Main

main.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/common"
    "github.com/aceld/kis-flow/config"
    "github.com/aceld/kis-flow/flow"
    "github.com/aceld/kis-flow/kis"
)

func main() {
    ctx := context.Background()

    // Create a new flow configuration
    myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)

    // Create new function configuration
    avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
    printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)

    // Create a new flow
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // Link functions to the flow
    _ = flow1.Link(avgStuScoreConfig, nil)
    _ = flow1.Link(printStuScoreConfig, nil)

    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

    // Run the flow
    if err := flow1.Run(ctx); err != nil {
        fmt.Println("err: ", err)
    }

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

Function1

faas_stu_score_avg.go

package main

import (
    "context"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

// AvgStuScore(FaaS) 計算學(xué)生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {

        out := AvgStuScoreOut{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }

        // 提交結(jié)果數(shù)據(jù)
        _ = flow.CommitRow(out)
    }

    return nil
}

Function2

faas_stu_score_avg_print.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }

    return nil
}

OutPut

Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source.
funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source.
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]

作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開源項目地址:https://github.com/aceld/kis-flow

Golang框架實戰(zhàn)-KisFlow流式計算框架專欄

Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構(gòu)建/基礎(chǔ)模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構(gòu)建/基礎(chǔ)模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調(diào)度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
Golang框架實戰(zhàn)-KisFlow流式計算框架(7)-配置導(dǎo)入與導(dǎo)出
Golang框架實戰(zhàn)-KisFlow流式計算框架(8)-KisFlow Action
Golang框架實戰(zhàn)-KisFlow流式計算框架(10)-Flow多副本
Golang框架實戰(zhàn)-KisFlow流式計算框架(11)-Prometheus Metrics統(tǒng)計
Golang框架實戰(zhàn)-KisFlow流式計算框架(12)-基于反射自適應(yīng)注冊FaaS形參類型
Golang框架實戰(zhàn)-KisFlow流式計算框架(12)-基于反射自適應(yīng)注冊FaaS形參類型

案例:
KisFlow-Golang流式計算案例(一)快速開始QuickStart
KisFlow-Golang流式計算案例(二)-Flow并流操作
KisFlow-Golang流式計算案例(二)-KisFlow在多協(xié)程中的應(yīng)用

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子拄氯,更是在濱河造成了極大的恐慌躲查,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件译柏,死亡現(xiàn)場離奇詭異镣煮,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)鄙麦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門渡蜻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來混蔼,“玉大人,你說我怎么就攤上這事〉虐龋” “怎么了贸诚?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵徘键,是天一觀的道長绍哎。 經(jīng)常有香客問我,道長寒波,這世上最難降的妖魔是什么乘盼? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮俄烁,結(jié)果婚禮上绸栅,老公的妹妹穿的比我還像新娘。我一直安慰自己页屠,他們只是感情好粹胯,可當(dāng)我...
    茶點故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著辰企,像睡著了一般风纠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蟆豫,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天议忽,我揣著相機(jī)與錄音,去河邊找鬼十减。 笑死,一個胖子當(dāng)著我的面吹牛愤估,可吹牛的內(nèi)容都是我干的帮辟。 我是一名探鬼主播,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼玩焰,長吁一口氣:“原來是場噩夢啊……” “哼由驹!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤蔓榄,失蹤者是張志新(化名)和其女友劉穎并炮,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體甥郑,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡逃魄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了澜搅。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片伍俘。...
    茶點故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖勉躺,靈堂內(nèi)的尸體忽然破棺而出癌瘾,到底是詐尸還是另有隱情,我是刑警寧澤饵溅,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布妨退,位于F島的核電站,受9級特大地震影響蜕企,放射性物質(zhì)發(fā)生泄漏碧注。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一糖赔、第九天 我趴在偏房一處隱蔽的房頂上張望萍丐。 院中可真熱鬧,春花似錦放典、人聲如沸逝变。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽壳影。三九已至,卻和暖如春弥臼,著一層夾襖步出監(jiān)牢的瞬間宴咧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工径缅, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留掺栅,地道東北人。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓纳猪,卻偏偏與公主長得像氧卧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子氏堤,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內(nèi)容