KisFlow-Golang流式實時計算案例(四)-KisFlow在消息隊列MQ中的應用

Golang框架實戰(zhàn)-KisFlow流式計算框架專欄

Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構建/基礎模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構建/基礎模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
Golang框架實戰(zhàn)-KisFlow流式計算框架(7)-配置導入與導出
Golang框架實戰(zhàn)-KisFlow流式計算框架(8)-KisFlow Action
Golang框架實戰(zhàn)-KisFlow流式計算框架(9)-Cache/Params 數(shù)據(jù)緩存與數(shù)據(jù)參數(shù)
Golang框架實戰(zhàn)-KisFlow流式計算框架(10)-Flow多副本
Golang框架實戰(zhàn)-KisFlow流式計算框架(11)-Prometheus Metrics統(tǒng)計
Golang框架實戰(zhàn)-KisFlow流式計算框架(12)-基于反射自適應注冊FaaS形參類型

案例:
KisFlow-Golang流式計算案例(一)快速開始QuickStart
KisFlow-Golang流式計算案例(二)-Flow并流操作
KisFlow-Golang流式計算案例(二)-KisFlow在多協(xié)程中的應用


DownLoad kis-flow source

$go get github.com/aceld/kis-flow

KisFlow with Kafka

案例源代碼
https://github.com/aceld/kis-flow-usage/tree/main/12-with_kafka

這里以github.com/segmentio/kafka-go 作為第三方Kafka Client SDK(開發(fā)者也可以選擇其他kafka的go開發(fā)工具)烙懦。

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/segmentio/kafka-go"
    "sync"
    "time"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new Kafka reader
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{"localhost:9092"},
        Topic:       "SourceStuScore",
        GroupID:     "group1",
        MinBytes:    10e3,                   // 10KB
        MaxBytes:    10e6,                   // 10MB
        MaxWait:     500 * time.Millisecond, // 最長等待時間
        StartOffset: kafka.FirstOffset,
    })

    defer reader.Close()

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ { // use 5 consumers to consume in parallel
        wg.Add(1)
        go func() {
            // fork a new flow for each consumer
            flowCopy := flowOrg.Fork(ctx)

            defer wg.Done()
            for {
                // Read a message from Kafka
                message, err := reader.ReadMessage(ctx)
                if err != nil {
                    fmt.Printf("error reading message: %v\n", err)
                    break
                }

                // Commit the message to the flow
                _ = flowCopy.CommitRow(string(message.Value))

                // Run the flow
                if err := flowCopy.Run(ctx); err != nil {
                    fmt.Println("err: ", err)
                    return
                }
            }
        }()
    }

    wg.Wait()

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

KisFlow with Nsq

案例源代碼
https://github.com/aceld/kis-flow-usage/tree/main/13-with_nsq

本KisFlow消費者以github.com/nsqio/go-nsq作為第三方SDK奸腺。

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/nsqio/go-nsq"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new NSQ consumer
    config := nsq.NewConfig()
    config.MaxInFlight = 5

    consumer, err := nsq.NewConsumer("SourceStuScore", "channel1", config)
    if err != nil {
        panic(err)
    }

    consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        // fork a new flow for each message
        flowCopy := flowOrg.Fork(ctx)

        // Commit the message to the flow
        _ = flowCopy.CommitRow(string(message.Body))

        // Run the flow
        if err := flowCopy.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return err
        }

        return nil
    }))

    err = consumer.ConnectToNSQLookupd("localhost:4161")
    if err != nil {
        panic(err)
    }

    defer consumer.Stop()

    select {}
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

KisFlow with RocketMQ

案例源代碼
https://github.com/aceld/kis-flow-usage/tree/main/14-with_rocketmq

github.com/apache/rocketmq-client-go 作為RocketMQ消費者SDK。

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    myFloq := kis.Pool().GetFlow("CalStuAvgScore")
    if myFloq == nil {
        panic("myFloq is nil")
    }

    // Create a new RocketMQ consumer
    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName("group1"),
        consumer.WithNameServer([]string{"localhost:9876"}),
    )
    if err != nil {
        panic(err)
    }

    err = c.Subscribe("SourceStuScore", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

        for _, msg := range msgs {
            // Commit the message to the flow
            _ = myFloq.CommitRow(string(msg.Body))

        }

        // Run the flow
        if err := myFloq.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return consumer.ConsumeRetryLater, err
        }

        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        panic(err)
    }

    err = c.Start()
    if err != nil {
        panic(err)
    }

    defer c.Shutdown()

    select {}
}

作者:劉丹冰Aceld github: https://github.com/aceld
KisFlow開源項目地址:https://github.com/aceld/kis-flow

Golang框架實戰(zhàn)-KisFlow流式計算框架專欄

Golang框架實戰(zhàn)-KisFlow流式計算框架(1)-概述
Golang框架實戰(zhàn)-KisFlow流式計算框架(2)-項目構建/基礎模塊-(上)
Golang框架實戰(zhàn)-KisFlow流式計算框架(3)-項目構建/基礎模塊-(下)
Golang框架實戰(zhàn)-KisFlow流式計算框架(4)-數(shù)據(jù)流
Golang框架實戰(zhàn)-KisFlow流式計算框架(5)-Function調度
Golang框架實戰(zhàn)-KisFlow流式計算框架(6)-Connector
Golang框架實戰(zhàn)-KisFlow流式計算框架(7)-配置導入與導出
Golang框架實戰(zhàn)-KisFlow流式計算框架(8)-KisFlow Action
Golang框架實戰(zhàn)-KisFlow流式計算框架(9)-Cache/Params 數(shù)據(jù)緩存與數(shù)據(jù)參數(shù)
Golang框架實戰(zhàn)-KisFlow流式計算框架(10)-Flow多副本
Golang框架實戰(zhàn)-KisFlow流式計算框架(11)-Prometheus Metrics統(tǒng)計
Golang框架實戰(zhàn)-KisFlow流式計算框架(12)-基于反射自適應注冊FaaS形參類型

案例:
KisFlow-Golang流式計算案例(一)快速開始QuickStart
KisFlow-Golang流式計算案例(二)-Flow并流操作
KisFlow-Golang流式計算案例(三)-KisFlow在多協(xié)程中的應用

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末翩肌,一起剝皮案震驚了整個濱河市耳幢,隨后出現(xiàn)的幾起案子岸晦,更是在濱河造成了極大的恐慌,老刑警劉巖睛藻,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件启上,死亡現(xiàn)場離奇詭異,居然都是意外死亡店印,警方通過查閱死者的電腦和手機冈在,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來按摘,“玉大人包券,你說我怎么就攤上這事§畔停” “怎么了溅固?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長兰珍。 經常有香客問我侍郭,道長,這世上最難降的妖魔是什么掠河? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任励幼,我火速辦了婚禮,結果婚禮上口柳,老公的妹妹穿的比我還像新娘苹粟。我一直安慰自己,他們只是感情好跃闹,可當我...
    茶點故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布嵌削。 她就那樣靜靜地躺著毛好,像睡著了一般。 火紅的嫁衣襯著肌膚如雪苛秕。 梳的紋絲不亂的頭發(fā)上肌访,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天,我揣著相機與錄音艇劫,去河邊找鬼吼驶。 笑死,一個胖子當著我的面吹牛店煞,可吹牛的內容都是我干的蟹演。 我是一名探鬼主播,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼顷蟀,長吁一口氣:“原來是場噩夢啊……” “哼酒请!你這毒婦竟也來了?” 一聲冷哼從身側響起鸣个,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤羞反,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后囤萤,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體昼窗,經...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年涛舍,在試婚紗的時候發(fā)現(xiàn)自己被綠了澄惊。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡做盅,死狀恐怖,靈堂內的尸體忽然破棺而出窘哈,到底是詐尸還是另有隱情吹榴,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布滚婉,位于F島的核電站图筹,受9級特大地震影響,放射性物質發(fā)生泄漏让腹。R本人自食惡果不足惜远剩,卻給世界環(huán)境...
    茶點故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望骇窍。 院中可真熱鬧瓜晤,春花似錦、人聲如沸腹纳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至足画,卻和暖如春雄驹,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背淹辞。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工医舆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人象缀。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓蔬将,卻偏偏與公主長得像,于是被迫代替她去往敵國和親攻冷。 傳聞我的和親對象是個殘疾皇子娃胆,可洞房花燭夜當晚...
    茶點故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內容