Shopify/sarama的producer有兩種運(yùn)行模式:
- 同步模式
producer把消息發(fā)給kafka之后會(huì)等待結(jié)果返回丙曙。 - 異步模式
producer把消息發(fā)給kafka之后不會(huì)等待結(jié)果返回相嵌。
- 同步模式
config := sarama.NewConfig()
config.Producer.Return.Successes = true
client, err := sarama.NewClient([]{"localhost:9092"}, config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
log.Fatalf("unable to create kafka producer: %q", err)
}
defer producer.Close()
text := fmt.Sprintf("message %08d", i)
partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)})
if err != nil {
log.Fatalf("unable to produce message: %q", err)
}
...
注意同步模式下,下面配置必須置上:
config.Producer.Return.Successes = true
否則運(yùn)行報(bào)錯(cuò):
2018/12/25 08:08:30 unable to create kafka producer: "kafka:
invalid configuration (Producer.Return.Successes must be true to be used in a SyncProducer)"
- 異步模式
異步模式融撞,顧名思義就是produce一個(gè)message之后不等待發(fā)送完成返回;這樣調(diào)用者可以繼續(xù)做其他的工作佩厚。
config := sarama.NewConfig()
// config.Producer.Return.Successes = true
client, err := sarama.NewClient([]{"localhost:9092"}, config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
producer, err := sarama.NewAsyncProducerFromClient
if err != nil {
log.Fatalf("unable to create kafka producer: %q", err)
}
defer producer.Close()
text := fmt.Sprintf("message %08d", i)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
// wait response
select {
//case msg := <-producer.Successes():
// log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default",)
}
...
關(guān)于異步producer有一個(gè)地方取藥注意的辆床。
- 異步模式produce一個(gè)消息后,缺省并不會(huì)報(bào)告成功狀態(tài)灵临。
config.Producer.Return.Successes = false
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
}
則這段代碼會(huì)掛住拣挪,因?yàn)樵O(shè)置沒有要求返回成功config.Producer.Return.Successes = false
,那么在select等待的時(shí)候producer.Successes()不會(huì)返回俱诸,producer.Errors()也不會(huì)返回(假設(shè)沒有錯(cuò)誤發(fā)生),就掛在這兒赊舶。當(dāng)然可以加一個(gè)default分支繞過去睁搭,就不會(huì)掛住了:
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default")
}
- 如果打開了Return.Successes配置赶诊,則上述代碼段等同于同步方式
config.Producer.Return.Successes = true
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
}
從log可以看到,每發(fā)送一條消息园骆,收到一條Return.Successes舔痪,類似于:
2018/12/25 08:51:51 Produced message: [message 00002537]
2018/12/25 08:51:51 Produced message successes: [message 00002537]
2018/12/25 08:51:51 Produced message: [message 00002538]
2018/12/25 08:51:51 Produced message successes: [message 00002538]
2018/12/25 08:51:51 Produced message: [message 00002539]
2018/12/25 08:51:51 Produced message successes: [message 00002539]
2018/12/25 08:51:51 Produced message: [message 00002540]
2018/12/25 08:51:51 Produced message successes: [message 00002540]
2018/12/25 08:51:51 Produced message: [message 00002541]
2018/12/25 08:51:51 Produced message successes: [message 00002541]
2018/12/25 08:51:51 Produced message: [message 00002542]
2018/12/25 08:51:51 Produced message successes: [message 00002542]
2018/12/25 08:51:51 Produced message: [message 00002543]
2018/12/25 08:51:51 Produced message successes: [message 00002543]
...
就像是同步produce一樣的行為了。
- 如果打開了Return.Successes配置锌唾,而又沒有producer.Successes()提取锄码,那么Successes()這個(gè)chan消息會(huì)被寫滿。
config.Producer.Return.Successes = true
...
log.Printf("Reade to Produced message: [%s]\n",text)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
//case msg := <-producer.Successes():
// log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default",)
}
寫滿的結(jié)果就是不能再寫入了晌涕,導(dǎo)致后面的Return.Successes消息丟失, 而且producer也會(huì)掛住滋捶,因?yàn)楣蚕淼腷uffer被占滿了,大量的Return.Successes沒有被消耗掉余黎。
運(yùn)行一段時(shí)間后:
2018/12/25 08:58:24 Reade to Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000608]
在produce第00000608個(gè)message的時(shí)候被掛住了重窟,因?yàn)橄⒕彌_滿了;這個(gè)緩沖的大小是可配的(可能是這個(gè)MaxRequestSize?)惧财,但是不管大小是多少巡扇,如果沒有去提取Success消息最終都會(huì)被占滿的。
結(jié)論就是說配置config.Producer.Return.Successes = true
和操作<-producer.Successes()
必須配套使用垮衷;配置成true厅翔,那么就要去讀取Successes,如果配置成false搀突,則不能去讀取Successes刀闷。