go項目Kafka示例

1辛燥,如何保證消息100%投遞镜沽?

--生產(chǎn)者

--消費者

2,如何保證消息的順序性茂洒?

kafka同一個分區(qū)可以保證順序性,因此只需要將需要保證順序性的消息放到同一個分區(qū)即可(要么只用一個分區(qū)似将,要么用hash分區(qū))获黔。

3,如何保證消息不重復在验?

消息是有可能重復的玷氏,但是我們要保證消費了重復的消息也不出問題,就是要保證消息的冪等性

4腋舌,如何單播盏触、多播?

通過消費者組來實現(xiàn)。一個主題可以設置0或者多個消費者組赞辩,在同一個消費者組內(nèi)雌芽,不同消費者消費的內(nèi)容是互斥的。注意辨嗽,同一個消費者組內(nèi)的消費者個數(shù)不能大于分區(qū)數(shù)量世落,否則多余的消費者不會分配到分區(qū)將處于阻塞狀態(tài)

消費者

KafkaConsumer.go

package cws
import (
    "github.com/IBM/sarama"
    "log"
    "os"
    "os/signal"
)

func KafkaConsumer() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
    defer client.Close()
    if err != nil {
        panic(err)
    }
    consumer, err := sarama.NewConsumerFromClient(client)

    defer consumer.Close()
    if err != nil {
        panic(err)
    }
    // get partitionId list
    partitions, err := consumer.Partitions("my_topic")
    if err != nil {
        panic(err)
    }

    for _, partitionId := range partitions {
        // create partitionConsumer for every partitionId
        partitionConsumer, err := consumer.ConsumePartition("my_topic", partitionId, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }

        go func(pc *sarama.PartitionConsumer) {
            defer (*pc).Close()
            // block
            for message := range (*pc).Messages() {
                value := string(message.Value)
                log.Printf("Partitionid: %d; offset:%d, value: %s\n", message.Partition, message.Offset, value)
            }

        }(&partitionConsumer)
    }
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    select {
    case <-signals:

    }
}

消費組

KafkaConsumerGroup.go

package cws

import (
    "context"
    "fmt"
    "github.com/IBM/sarama"
    "os"
    "os/signal"
    "sync"
)

type consumerGroupHandler struct {
    name string
}

func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
        // 手動確認消息
        sess.MarkMessage(msg, "")
//手動的話需要提交下。
        sess.Commit()
    }
    return nil
}

func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
    wg.Done()
    for err := range (*group).Errors() {
        fmt.Println("ERROR", err)
    }
}

func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
    fmt.Println(name + "start")
    wg.Done()
    ctx := context.Background()
    for {
        topics := []string{"my_topic"}
        handler := consumerGroupHandler{name: name}
        err := (*group).Consume(ctx, topics, handler)
        if err != nil {
            panic(err)
        }
    }
}

func KafkaConsumerGroup() {
    var wg sync.WaitGroup
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = false
    config.Version = sarama.V0_10_2_0
    client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
    defer client.Close()
    if err != nil {
        panic(err)
    }
    group1, err := sarama.NewConsumerGroupFromClient("c1", client)
    if err != nil {
        panic(err)
    }
    group2, err := sarama.NewConsumerGroupFromClient("c2", client)
    if err != nil {
        panic(err)
    }
    group3, err := sarama.NewConsumerGroupFromClient("c3", client)
    if err != nil {
        panic(err)
    }
    defer group1.Close()
    defer group2.Close()
    defer group3.Close()
    wg.Add(3)
    go consume(&group1, &wg, "c1")
    go consume(&group2, &wg, "c2")
    go consume(&group3, &wg, "c3")
    wg.Wait()
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    select {
    case <-signals:
    }
}

生產(chǎn)者

KafkaProducer.go

package cws

import (
    "github.com/IBM/sarama"
    "log"
    "os"
    "os/signal"
    "sync"
)

func main() {
    config := sarama.NewConfig()

    config.Producer.Return.Successes = true
    config.Producer.Partitioner = sarama.NewRandomPartitioner

    client, err := sarama.NewClient([]string{"192.168.0.104:9192", "localhost:9292", "localhost:9392"}, config)
    defer client.Close()
    if err != nil {
        panic(err)
    }
    producer, err := sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        panic(err)
    }

    // Trap SIGINT to trigger a graceful shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var (
        wg                          sync.WaitGroup
        enqueued, successes, errors int
    )

    wg.Add(1)
    // start a groutines to count successes num
    go func() {
        defer wg.Done()
        for range producer.Successes() {
            successes++
        }
    }()

    wg.Add(1)
    // start a groutines to count error num
    go func() {
        defer wg.Done()
        for err := range producer.Errors() {
            log.Println(err)
            errors++
        }
    }()

ProducerLoop:
    for {
        message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
        select {
        case producer.Input() <- message:
            enqueued++

        case <-signals:
            producer.AsyncClose() // Trigger a shutdown of the producer.
            break ProducerLoop
        }
    }

    wg.Wait()

    log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末糟需,一起剝皮案震驚了整個濱河市屉佳,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌洲押,老刑警劉巖武花,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異杈帐,居然都是意外死亡体箕,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門挑童,熙熙樓的掌柜王于貴愁眉苦臉地迎上來累铅,“玉大人,你說我怎么就攤上這事炮沐≌海” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵大年,是天一觀的道長换薄。 經(jīng)常有香客問我,道長翔试,這世上最難降的妖魔是什么轻要? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮垦缅,結果婚禮上冲泥,老公的妹妹穿的比我還像新娘。我一直安慰自己壁涎,他們只是感情好凡恍,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著怔球,像睡著了一般嚼酝。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上竟坛,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天闽巩,我揣著相機與錄音钧舌,去河邊找鬼。 笑死涎跨,一個胖子當著我的面吹牛洼冻,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播隅很,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼撞牢,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了外构?” 一聲冷哼從身側響起普泡,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎审编,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體歧匈,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡垒酬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了件炉。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片勘究。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖斟冕,靈堂內(nèi)的尸體忽然破棺而出口糕,到底是詐尸還是另有隱情,我是刑警寧澤磕蛇,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布景描,位于F島的核電站,受9級特大地震影響秀撇,放射性物質(zhì)發(fā)生泄漏超棺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一呵燕、第九天 我趴在偏房一處隱蔽的房頂上張望棠绘。 院中可真熱鬧,春花似錦再扭、人聲如沸氧苍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽让虐。三九已至,卻和暖如春敦跌,著一層夾襖步出監(jiān)牢的瞬間澄干,已是汗流浹背逛揩。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留麸俘,地道東北人辩稽。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像从媚,于是被迫代替她去往敵國和親逞泄。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容