2022-04-15 kafka異步發(fā)送 返回結(jié)果處理

https://www.lixueduan.com/post/kafka/06-sarama-producer/

func Producer(topic string, limit int) {
    config := sarama.NewConfig()
    // 異步生產(chǎn)者不建議把 Errors 和 Successes 都開啟子眶,一般開啟 Errors 就行
    // 同步生產(chǎn)者就必須都開啟,因?yàn)闀椒祷匕l(fā)送成功或者失敗
    config.Producer.Return.Errors = true    // 設(shè)定是否需要返回錯(cuò)誤信息
    config.Producer.Return.Successes = true // 設(shè)定是否需要返回成功信息
    producer, err := sarama.NewAsyncProducer([]string{conf.HOST}, config)
    if err != nil {
        log.Fatal("NewSyncProducer err:", err)
    }
    var (
        wg                                   sync.WaitGroup
        enqueued, timeout, successes, errors int
    )
    // [!important] 異步生產(chǎn)者發(fā)送后必須把返回值從 Errors 或者 Successes 中讀出來 不然會阻塞 sarama 內(nèi)部處理邏輯 導(dǎo)致只能發(fā)出去一條消息
    wg.Add(1)
    go func() {
        defer wg.Done()
        for range producer.Successes() {
            // log.Printf("[Producer] Success: key:%v msg:%+v \n", s.Key, s.Value)
            successes++
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        for e := range producer.Errors() {
            log.Printf("[Producer] Errors:err:%v msg:%+v \n", e.Msg, e.Err)
            errors++
        }
    }()

    // 異步發(fā)送
    for i := 0; i < limit; i++ {
        str := strconv.Itoa(int(time.Now().UnixNano()))
        msg := &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(str)}
        // 異步發(fā)送只是寫入內(nèi)存了就返回了,并沒有真正發(fā)送出去
        // sarama 庫中用的是一個(gè) channel 來接收胆萧,后臺 goroutine 異步從該 channel 中取出消息并真正發(fā)送
        // select + ctx 做超時(shí)控制,防止阻塞 producer.Input() <- msg 也可能會阻塞
        ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
        select {
        case producer.Input() <- msg:
            enqueued++
        case <-ctx.Done():
            timeout++
        }
        cancel()
        if i%10000 == 0 && i != 0 {
            log.Printf("已發(fā)送消息數(shù):%d 超時(shí)數(shù):%d\n", i, timeout)
        }
    }

    // We are done
    producer.AsyncClose()
    wg.Wait()
    log.Printf("發(fā)送完畢 總發(fā)送條數(shù):%d enqueued:%d timeout:%d successes: %d errors: %d\n", limit, enqueued, timeout, successes, errors)
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末温兼,一起剝皮案震驚了整個(gè)濱河市揪利,隨后出現(xiàn)的幾起案子植锉,更是在濱河造成了極大的恐慌泥栖,老刑警劉巖礼烈,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件弧满,死亡現(xiàn)場離奇詭異,居然都是意外死亡此熬,警方通過查閱死者的電腦和手機(jī)庭呜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門洽蛀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人疟赊,你說我怎么就攤上這事郊供。” “怎么了近哟?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵驮审,是天一觀的道長。 經(jīng)常有香客問我吉执,道長疯淫,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任戳玫,我火速辦了婚禮熙掺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘咕宿。我一直安慰自己币绩,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布府阀。 她就那樣靜靜地躺著缆镣,像睡著了一般。 火紅的嫁衣襯著肌膚如雪试浙。 梳的紋絲不亂的頭發(fā)上董瞻,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天,我揣著相機(jī)與錄音田巴,去河邊找鬼钠糊。 笑死,一個(gè)胖子當(dāng)著我的面吹牛壹哺,可吹牛的內(nèi)容都是我干的抄伍。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼斗躏,長吁一口氣:“原來是場噩夢啊……” “哼逝慧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起啄糙,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤笛臣,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后隧饼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沈堡,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年燕雁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了诞丽。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鲸拥。...
    茶點(diǎn)故事閱讀 40,102評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖僧免,靈堂內(nèi)的尸體忽然破棺而出刑赶,到底是詐尸還是另有隱情,我是刑警寧澤懂衩,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布撞叨,位于F島的核電站,受9級特大地震影響浊洞,放射性物質(zhì)發(fā)生泄漏牵敷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一法希、第九天 我趴在偏房一處隱蔽的房頂上張望枷餐。 院中可真熱鬧,春花似錦苫亦、人聲如沸毛肋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽村生。三九已至,卻和暖如春饼丘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辽话。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工肄鸽, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人油啤。 一個(gè)月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓典徘,卻偏偏與公主長得像,于是被迫代替她去往敵國和親益咬。 傳聞我的和親對象是個(gè)殘疾皇子逮诲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容