https://www.lixueduan.com/post/kafka/06-sarama-producer/
func Producer(topic string, limit int) {
config := sarama.NewConfig()
// 異步生產(chǎn)者不建議把 Errors 和 Successes 都開啟子眶,一般開啟 Errors 就行
// 同步生產(chǎn)者就必須都開啟,因?yàn)闀椒祷匕l(fā)送成功或者失敗
config.Producer.Return.Errors = true // 設(shè)定是否需要返回錯(cuò)誤信息
config.Producer.Return.Successes = true // 設(shè)定是否需要返回成功信息
producer, err := sarama.NewAsyncProducer([]string{conf.HOST}, config)
if err != nil {
log.Fatal("NewSyncProducer err:", err)
}
var (
wg sync.WaitGroup
enqueued, timeout, successes, errors int
)
// [!important] 異步生產(chǎn)者發(fā)送后必須把返回值從 Errors 或者 Successes 中讀出來 不然會阻塞 sarama 內(nèi)部處理邏輯 導(dǎo)致只能發(fā)出去一條消息
wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
// log.Printf("[Producer] Success: key:%v msg:%+v \n", s.Key, s.Value)
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for e := range producer.Errors() {
log.Printf("[Producer] Errors:err:%v msg:%+v \n", e.Msg, e.Err)
errors++
}
}()
// 異步發(fā)送
for i := 0; i < limit; i++ {
str := strconv.Itoa(int(time.Now().UnixNano()))
msg := &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(str)}
// 異步發(fā)送只是寫入內(nèi)存了就返回了,并沒有真正發(fā)送出去
// sarama 庫中用的是一個(gè) channel 來接收胆萧,后臺 goroutine 異步從該 channel 中取出消息并真正發(fā)送
// select + ctx 做超時(shí)控制,防止阻塞 producer.Input() <- msg 也可能會阻塞
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
select {
case producer.Input() <- msg:
enqueued++
case <-ctx.Done():
timeout++
}
cancel()
if i%10000 == 0 && i != 0 {
log.Printf("已發(fā)送消息數(shù):%d 超時(shí)數(shù):%d\n", i, timeout)
}
}
// We are done
producer.AsyncClose()
wg.Wait()
log.Printf("發(fā)送完畢 總發(fā)送條數(shù):%d enqueued:%d timeout:%d successes: %d errors: %d\n", limit, enqueued, timeout, successes, errors)
}