golang kafka 2

package main

import (
    "fmt"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/bsm/sarama-cluster" //support automatic consumer-group rebalancing and offset tracking
    "github.com/sdbaiguanghe/glog"
)

var (
    topics = "test0"
)

// consumer 消費者
func consumer() {
    groupID := "group-1"
    config := cluster.NewConfig()
    config.Group.Return.Notifications = true
    config.Consumer.Offsets.CommitInterval = 1 * time.Second
    config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始從最新的offset開始

    c, err := cluster.NewConsumer(strings.Split("localhost:9092", ","), groupID, strings.Split(topics, ","), config)
    if err != nil {
        glog.Errorf("Failed open consumer: %v", err)
        return
    }
    defer c.Close()
    go func(c *cluster.Consumer) {
        errors := c.Errors()
        noti := c.Notifications()
        for {
            select {
            case err := <-errors:
                glog.Errorln(err)
            case <-noti:
            }
        }
    }(c)

    for msg := range c.Messages() {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
        c.MarkOffset(msg, "") //MarkOffset 并不是實時寫入kafka被去,有可能在程序crash時丟掉未提交的offset
    }
}

// syncProducer 同步生產(chǎn)者
// 并發(fā)量小時尊浓,可以用這種方式
func syncProducer() {
    config := sarama.NewConfig()
    //  config.Producer.RequiredAcks = sarama.WaitForAll
    //  config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
    defer p.Close()
    if err != nil {
        glog.Errorln(err)
        return
    }

    v := "sync: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
    fmt.Fprintln(os.Stdout, v)
    msg := &sarama.ProducerMessage{
        Topic: topics,
        Value: sarama.ByteEncoder(v),
    }
    if _, _, err := p.SendMessage(msg); err != nil {
        glog.Errorln(err)
        return
    }
}

// asyncProducer 異步生產(chǎn)者
// 并發(fā)量大時于宙,必須采用這種方式
func asyncProducer() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true //必須有這個選項
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewAsyncProducer(strings.Split("localhost:9092", ","), config)
    defer p.Close()
    if err != nil {
        return
    }

    //必須有這個匿名函數(shù)內(nèi)容
    go func(p sarama.AsyncProducer) {
        errors := p.Errors()
        success := p.Successes()
        for {
            select {
            case err := <-errors:
                if err != nil {
                    glog.Errorln(err)
                }
            case <-success:
            }
        }
    }(p)
    for {
        v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
        fmt.Fprintln(os.Stdout, v)
        msg := &sarama.ProducerMessage{
            Topic: topics,
            Value: sarama.ByteEncoder(v),
        }
        p.Input() <- msg
        time.Sleep(time.Second * 1)
    }
}

func main() {
    go asyncProducer()
    go consumer()
    time.Sleep(time.Second * 10000)
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末理卑,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子伦吠,更是在濱河造成了極大的恐慌,老刑警劉巖娃豹,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異购裙,居然都是意外死亡懂版,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門躏率,熙熙樓的掌柜王于貴愁眉苦臉地迎上來躯畴,“玉大人,你說我怎么就攤上這事薇芝∨畛” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵夯到,是天一觀的道長嚷缭。 經(jīng)常有香客問我,道長耍贾,這世上最難降的妖魔是什么阅爽? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮荐开,結(jié)果婚禮上付翁,老公的妹妹穿的比我還像新娘。我一直安慰自己晃听,他們只是感情好百侧,可當(dāng)我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著能扒,像睡著了一般佣渴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上初斑,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天辛润,我揣著相機與錄音,去河邊找鬼越平。 笑死频蛔,一個胖子當(dāng)著我的面吹牛灵迫,可吹牛的內(nèi)容都是我干的秦叛。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼瀑粥,長吁一口氣:“原來是場噩夢啊……” “哼挣跋!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起狞换,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤避咆,失蹤者是張志新(化名)和其女友劉穎舟肉,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體查库,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡路媚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了樊销。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片整慎。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖围苫,靈堂內(nèi)的尸體忽然破棺而出裤园,到底是詐尸還是另有隱情,我是刑警寧澤剂府,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布拧揽,位于F島的核電站,受9級特大地震影響腺占,放射性物質(zhì)發(fā)生泄漏淤袜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一衰伯、第九天 我趴在偏房一處隱蔽的房頂上張望饮怯。 院中可真熱鬧,春花似錦嚎研、人聲如沸蓖墅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽论矾。三九已至,卻和暖如春杆勇,著一層夾襖步出監(jiān)牢的瞬間贪壳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工蚜退, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留闰靴,地道東北人。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓钻注,卻偏偏與公主長得像蚂且,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子幅恋,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容