Shopify/sarama的同步/異步producer

Shopify/sarama的producer有兩種運(yùn)行模式:

  1. 同步模式
    producer把消息發(fā)給kafka之后會(huì)等待結(jié)果返回丙曙。
  2. 異步模式
    producer把消息發(fā)給kafka之后不會(huì)等待結(jié)果返回相嵌。
  1. 同步模式
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    client, err := sarama.NewClient([]{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    producer, err := sarama.NewSyncProducerFromClient(client)
    if err != nil {
        log.Fatalf("unable to create kafka producer: %q", err)
    }
    defer producer.Close()

    text := fmt.Sprintf("message %08d", i)
    partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)})
    if err != nil {
        log.Fatalf("unable to produce message: %q", err)
    }
    ...

注意同步模式下,下面配置必須置上:
config.Producer.Return.Successes = true
否則運(yùn)行報(bào)錯(cuò):

2018/12/25 08:08:30 unable to create kafka producer: "kafka: 
invalid configuration (Producer.Return.Successes must be true to be used in a SyncProducer)"
  1. 異步模式

異步模式融撞,顧名思義就是produce一個(gè)message之后不等待發(fā)送完成返回;這樣調(diào)用者可以繼續(xù)做其他的工作佩厚。

    config := sarama.NewConfig()
    // config.Producer.Return.Successes = true
    client, err := sarama.NewClient([]{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    producer, err := sarama.NewAsyncProducerFromClient
    if err != nil {
        log.Fatalf("unable to create kafka producer: %q", err)
    }
    defer producer.Close()

    text := fmt.Sprintf("message %08d", i)
    producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
    // wait response
    select {
            //case msg := <-producer.Successes():
            //    log.Printf("Produced message successes: [%s]\n",msg.Value)
            case err := <-producer.Errors():
                log.Println("Produced message failure: ", err)
            default:
                log.Println("Produced message default",)
    }
    ...

關(guān)于異步producer有一個(gè)地方取藥注意的辆床。

  1. 異步模式produce一個(gè)消息后,缺省并不會(huì)報(bào)告成功狀態(tài)灵临。
config.Producer.Return.Successes = false
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
}

則這段代碼會(huì)掛住拣挪,因?yàn)樵O(shè)置沒有要求返回成功config.Producer.Return.Successes = false,那么在select等待的時(shí)候producer.Successes()不會(huì)返回俱诸,producer.Errors()也不會(huì)返回(假設(shè)沒有錯(cuò)誤發(fā)生),就掛在這兒赊舶。當(dāng)然可以加一個(gè)default分支繞過去睁搭,就不會(huì)掛住了:

select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default")
}
  1. 如果打開了Return.Successes配置赶诊,則上述代碼段等同于同步方式
config.Producer.Return.Successes = true
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
}

從log可以看到,每發(fā)送一條消息园骆,收到一條Return.Successes舔痪,類似于:

2018/12/25 08:51:51 Produced message: [message 00002537]
2018/12/25 08:51:51 Produced message successes: [message 00002537]
2018/12/25 08:51:51 Produced message: [message 00002538]
2018/12/25 08:51:51 Produced message successes: [message 00002538]
2018/12/25 08:51:51 Produced message: [message 00002539]
2018/12/25 08:51:51 Produced message successes: [message 00002539]
2018/12/25 08:51:51 Produced message: [message 00002540]
2018/12/25 08:51:51 Produced message successes: [message 00002540]
2018/12/25 08:51:51 Produced message: [message 00002541]
2018/12/25 08:51:51 Produced message successes: [message 00002541]
2018/12/25 08:51:51 Produced message: [message 00002542]
2018/12/25 08:51:51 Produced message successes: [message 00002542]
2018/12/25 08:51:51 Produced message: [message 00002543]
2018/12/25 08:51:51 Produced message successes: [message 00002543]
...

就像是同步produce一樣的行為了。

  1. 如果打開了Return.Successes配置锌唾,而又沒有producer.Successes()提取锄码,那么Successes()這個(gè)chan消息會(huì)被寫滿。
config.Producer.Return.Successes = true
...
log.Printf("Reade to Produced message: [%s]\n",text)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    //case msg := <-producer.Successes():
    //    log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default",)
}

寫滿的結(jié)果就是不能再寫入了晌涕,導(dǎo)致后面的Return.Successes消息丟失, 而且producer也會(huì)掛住滋捶,因?yàn)楣蚕淼腷uffer被占滿了,大量的Return.Successes沒有被消耗掉余黎。

運(yùn)行一段時(shí)間后:

2018/12/25 08:58:24 Reade to Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000608]

在produce第00000608個(gè)message的時(shí)候被掛住了重窟,因?yàn)橄⒕彌_滿了;這個(gè)緩沖的大小是可配的(可能是這個(gè)MaxRequestSize?)惧财,但是不管大小是多少巡扇,如果沒有去提取Success消息最終都會(huì)被占滿的。

結(jié)論就是說配置config.Producer.Return.Successes = true和操作<-producer.Successes()必須配套使用垮衷;配置成true厅翔,那么就要去讀取Successes,如果配置成false搀突,則不能去讀取Successes刀闷。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市描姚,隨后出現(xiàn)的幾起案子涩赢,更是在濱河造成了極大的恐慌,老刑警劉巖轩勘,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件筒扒,死亡現(xiàn)場離奇詭異,居然都是意外死亡绊寻,警方通過查閱死者的電腦和手機(jī)花墩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來澄步,“玉大人冰蘑,你說我怎么就攤上這事〈甯祝” “怎么了祠肥?”我有些...
    開封第一講書人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長梯皿。 經(jīng)常有香客問我仇箱,道長县恕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任剂桥,我火速辦了婚禮忠烛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘权逗。我一直安慰自己美尸,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開白布斟薇。 她就那樣靜靜地躺著师坎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪奔垦。 梳的紋絲不亂的頭發(fā)上屹耐,一...
    開封第一講書人閱讀 51,488評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音椿猎,去河邊找鬼惶岭。 笑死,一個(gè)胖子當(dāng)著我的面吹牛犯眠,可吹牛的內(nèi)容都是我干的按灶。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼筐咧,長吁一口氣:“原來是場噩夢啊……” “哼鸯旁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起量蕊,我...
    開封第一講書人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤铺罢,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后残炮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體韭赘,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年势就,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了泉瞻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡苞冯,死狀恐怖袖牙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情舅锄,我是刑警寧澤鞭达,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響碉怔,放射性物質(zhì)發(fā)生泄漏烘贴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一撮胧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧老翘,春花似錦芹啥、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽昔搂。三九已至荞彼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間捡多,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留钓账,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓絮宁,卻偏偏與公主長得像梆暮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子绍昂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容