package main
import (
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster" //support automatic consumer-group rebalancing and offset tracking
"github.com/sdbaiguanghe/glog"
)
var (
topics = "test0"
)
// consumer 消費者
func consumer() {
groupID := "group-1"
config := cluster.NewConfig()
config.Group.Return.Notifications = true
config.Consumer.Offsets.CommitInterval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始從最新的offset開始
c, err := cluster.NewConsumer(strings.Split("localhost:9092", ","), groupID, strings.Split(topics, ","), config)
if err != nil {
glog.Errorf("Failed open consumer: %v", err)
return
}
defer c.Close()
go func(c *cluster.Consumer) {
errors := c.Errors()
noti := c.Notifications()
for {
select {
case err := <-errors:
glog.Errorln(err)
case <-noti:
}
}
}(c)
for msg := range c.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
c.MarkOffset(msg, "") //MarkOffset 并不是實時寫入kafka被去,有可能在程序crash時丟掉未提交的offset
}
}
// syncProducer 同步生產(chǎn)者
// 并發(fā)量小時尊浓,可以用這種方式
func syncProducer() {
config := sarama.NewConfig()
// config.Producer.RequiredAcks = sarama.WaitForAll
// config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
defer p.Close()
if err != nil {
glog.Errorln(err)
return
}
v := "sync: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
fmt.Fprintln(os.Stdout, v)
msg := &sarama.ProducerMessage{
Topic: topics,
Value: sarama.ByteEncoder(v),
}
if _, _, err := p.SendMessage(msg); err != nil {
glog.Errorln(err)
return
}
}
// asyncProducer 異步生產(chǎn)者
// 并發(fā)量大時于宙,必須采用這種方式
func asyncProducer() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true //必須有這個選項
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewAsyncProducer(strings.Split("localhost:9092", ","), config)
defer p.Close()
if err != nil {
return
}
//必須有這個匿名函數(shù)內(nèi)容
go func(p sarama.AsyncProducer) {
errors := p.Errors()
success := p.Successes()
for {
select {
case err := <-errors:
if err != nil {
glog.Errorln(err)
}
case <-success:
}
}
}(p)
for {
v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
fmt.Fprintln(os.Stdout, v)
msg := &sarama.ProducerMessage{
Topic: topics,
Value: sarama.ByteEncoder(v),
}
p.Input() <- msg
time.Sleep(time.Second * 1)
}
}
func main() {
go asyncProducer()
go consumer()
time.Sleep(time.Second * 10000)
}
golang kafka 2
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進店門躏率,熙熙樓的掌柜王于貴愁眉苦臉地迎上來躯畴,“玉大人,你說我怎么就攤上這事薇芝∨畛” “怎么了?”我有些...
- 文/不壞的土叔 我叫張陵夯到,是天一觀的道長嚷缭。 經(jīng)常有香客問我,道長耍贾,這世上最難降的妖魔是什么阅爽? 我笑而不...
- 正文 為了忘掉前任,我火速辦了婚禮荐开,結(jié)果婚禮上付翁,老公的妹妹穿的比我還像新娘。我一直安慰自己晃听,他們只是感情好百侧,可當(dāng)我...
- 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著能扒,像睡著了一般佣渴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上初斑,一...
- 文/蒼蘭香墨 我猛地睜開眼瀑粥,長吁一口氣:“原來是場噩夢啊……” “哼挣跋!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起狞换,我...
- 正文 年R本政府宣布拧揽,位于F島的核電站,受9級特大地震影響腺占,放射性物質(zhì)發(fā)生泄漏淤袜。R本人自食惡果不足惜,卻給世界環(huán)境...
- 文/蒙蒙 一衰伯、第九天 我趴在偏房一處隱蔽的房頂上張望饮怯。 院中可真熱鬧,春花似錦嚎研、人聲如沸蓖墅。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽论矾。三九已至,卻和暖如春杆勇,著一層夾襖步出監(jiān)牢的瞬間贪壳,已是汗流浹背。 一陣腳步聲響...
推薦閱讀更多精彩內(nèi)容
- =========================================================...
- 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
- 談到docker源碼杏死,其實網(wǎng)上有很多的源碼的分析的文章,也看過一些大牛寫的docker源碼解讀的文章,收獲很大淑翼。我...
- 1腐巢、下載解壓 2、環(huán)境變量 hive-env.sh 添加 3玄括、hive-site.xml配置主要說來就是最好創(chuàng)建一...