kafka系列五:失敗后重試機制


MQ系列:
kafka系列一: kafka簡介
kafka系列二: kafka部署
kafka系列三: Spring kafka
kafka系列四:動態(tài)添加監(jiān)聽器
kafka系列五:失敗后重試機制


前言

在 Spring Kafka 中架专,失敗重試與死信隊列的處理是關(guān)鍵功能,可以確保消息處理的可靠性和健壯性。當消費者處理消息失敗時埂伦,可以配置重試機制糯崎,在重試多次后仍然失敗時砚婆,將消息發(fā)送到死信隊列進行處理狠鸳。

重試機制的用法

springboot 中使用kafka消息失敗重試機制非常便捷傍妒,關(guān)注 @RetryableTopic@DltHandler 兩個注解即可蛉签。
以下模擬處理失敗的例子

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.stereotype.Service;
import org.springframework.retry.annotation.Backoff;

@Service
public class KafkaConsumerService {

    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 2000, multiplier = 2, maxDelay = 60000),
            dltStrategy = RetryableTopic.DltStrategy.FAIL_ON_ERROR,
            autoCreateTopics = "true"
    )
    @KafkaListener(topics = "my-topic", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            System.out.println("Received message: " + record.value());
            // 模擬異常
            if (shouldFail()) {
                throw new RuntimeException("Simulated failure");
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            throw e;
        }
    }

    private boolean shouldFail() {
        // 模擬處理失敗的條件
        return true;
    }

    @DltHandler
    public void dltListen(ConsumerRecord<String, String> record) {
        System.out.println("Received message in DLT: " + record.value());
        // 可以在這里添加對死信消息的處理邏輯
    }
}

@RetryableTopic 注解:

這個注解會將失敗后重試的監(jiān)聽注冊為被標注的方法胡陪,如例子中,my-topic的監(jiān)聽器和my-topic處理失敗后重試的監(jiān)聽都是方法 listen()

  • attempts:指定重試次數(shù)碍舍。
  • backoff:配置重試的間隔和倍數(shù)柠座。
  • dltStrategy:配置死信隊列策略(DltStrategy.FAIL_ON_ERROR 表示處理失敗時將消息發(fā)送到死信隊列)。
  • autoCreateTopics:配置是否自動創(chuàng)建重試和死信隊列的主題片橡。

@DltHandler 注解:

用于標記處理死信隊列消息的方法妈经。
死信隊列的topic name根據(jù)原Topic(my-topic)在超過指定失敗次數(shù)后自動生成新的Topic(my-topic.dlt),并且被 @DltHandler 標注的方法監(jiān)聽捧书。

自定義死信隊列監(jiān)聽器

通過注解@RetryableTopic 和 @DltHandler吹泡,可以非常便捷地整合失敗重試機制到你的app中,但是经瓷,不夠靈活爆哑。
正如上篇文章提到的,動態(tài)增加新的Topic監(jiān)聽器時舆吮,如何引入對應(yīng)的失敗重試機制呢揭朝。
在上次分析的基礎(chǔ)上,解析@RetryableTopic注解時色冀,通過ConcurrentMessageListenerContainer的setCommonErrorHandler的方法設(shè)置異常重試配置的


    /**
     * Set the {@link CommonErrorHandler} which can handle errors for both record
     * and batch listeners.
     * @param commonErrorHandler the handler.
     * @since 2.8
     */
    public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler) {
        this.commonErrorHandler = commonErrorHandler;
    }

因此潭袱,我們需要定義一個返回實現(xiàn)CommonErrorHandler(DefaultErrorHandler ) 的方法

    /***
     *
     * @return
     */
    public DefaultErrorHandler errorHandler() {

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);

        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
        backOff.setInitialInterval(2000);
        backOff.setMultiplier(2);
        backOff.setMaxInterval(10000);

        return new DefaultErrorHandler(recoverer, backOff);
    }

然后在動態(tài)添加新Topic監(jiān)聽器的方法中處理setCommonErrorHandler(errorHandler())


/**
     * 添加 Topeic
     *
     * @param topic
     * @param groupId
     * @param isDeadLetter
     */
    public void addKafkaListener(String topic, String groupId, boolean isDeadLetter) {
        // kafka 消費者
        DefaultKafkaConsumerFactory<String, String> consumerFactory = consumerFactory(groupId);

        // 相關(guān)屬性
        ContainerProperties props = new ContainerProperties(topic);

        // 設(shè)置監(jiān)聽器(區(qū)分死信隊列與非死信隊列)
        if (!isDeadLetter) {
            props.setMessageListener(new CustomerMsgErrorHandler());
        } else {
            props.setMessageListener(new DeadLetterHandler());
        }

        props.setGroupId(groupId);
        props.setAckMode(ContainerProperties.AckMode.MANUAL);

        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, props);

        container.setCommonErrorHandler(errorHandler());

       // 非死信隊列,則添加一個死信隊列監(jiān)聽器
        if (!isDeadLetter) {
            addKafkaListener(topic + ".DLT", groupId, true);
        }
        container.start();
    }

如果直接用 @DltHandler標注方法的方式锋恬,添加死信隊列監(jiān)聽器屯换,監(jiān)聽器無效,故伶氢,直接增加了一個監(jiān)聽"topic.dlt"的監(jiān)聽器趟径,來處理死信隊列瘪吏。
另外定義一個專門模擬異常的Hander和死信隊列的Handkler

@Slf4j
public class CustomerMsgErrorHandler implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        // doSomething
        System.out.println("------------ onMessage topic " + data.topic() + ": " + data.value());

        // 模擬異常
        if (shouldFail()) {
            throw new RuntimeException("Simulated failure");
        }

        // 因為前面設(shè)置了手動提交ack的方式,這里需要在消息處理完成后提交ack
        acknowledgment.acknowledge();
    }

    private boolean shouldFail() {
        // 模擬處理失敗的條件
        return true;
    }
}
@Slf4j
public class DeadLetterHandler implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        // doSomething
        System.out.println("------------ DLT onMessage topic " + data.topic() + ": " + data.value());

        // 因為前面設(shè)置了手動提交ack的方式蜗巧,這里需要在消息處理完成后提交ack
        acknowledgment.acknowledge();
    }
}

OK掌眠,這樣就暫時實現(xiàn)了動態(tài)增加新Topic監(jiān)聽器的功能了,并且也用到了重試機制幕屹。

總結(jié)

上述配置展示了如何在 Spring Kafka 中實現(xiàn)失敗重試與死信隊列處理蓝丙。通過配置 DefaultErrorHandler,可以設(shè)置重試機制和死信隊列處理策略望拖。在消費者監(jiān)聽器中處理消息時渺尘,如果出現(xiàn)異常,消息會根據(jù)配置的重試策略進行重試说敏,多次重試失敗后會被發(fā)送到死信隊列鸥跟。死信隊列的監(jiān)聽器可以處理這些失敗的消息,從而實現(xiàn)對異常消息的特殊處理盔沫。

在實現(xiàn)動態(tài)增加新Topic監(jiān)聽器的功能時医咨,雖然,已經(jīng)按照配置去執(zhí)行失敗重試了架诞,但是拟淮,并沒有如意料中的那樣,回調(diào)@DltHandler標注的死信隊列監(jiān)聽器谴忧。估計是沒有把相關(guān)對象托管到springboot容器中的原因很泊,下次再仔細瞧瞧,解決這個問題沾谓。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末委造,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子搏屑,更是在濱河造成了極大的恐慌争涌,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件辣恋,死亡現(xiàn)場離奇詭異亮垫,居然都是意外死亡,警方通過查閱死者的電腦和手機伟骨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門饮潦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人携狭,你說我怎么就攤上這事继蜡。” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵稀并,是天一觀的道長仅颇。 經(jīng)常有香客問我,道長碘举,這世上最難降的妖魔是什么忘瓦? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮引颈,結(jié)果婚禮上耕皮,老公的妹妹穿的比我還像新娘。我一直安慰自己蝙场,他們只是感情好凌停,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著售滤,像睡著了一般罚拟。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上完箩,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天舟舒,我揣著相機與錄音,去河邊找鬼嗜憔。 笑死,一個胖子當著我的面吹牛氏仗,可吹牛的內(nèi)容都是我干的吉捶。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼皆尔,長吁一口氣:“原來是場噩夢啊……” “哼呐舔!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起慷蠕,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤珊拼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后流炕,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體澎现,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年每辟,在試婚紗的時候發(fā)現(xiàn)自己被綠了剑辫。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡渠欺,死狀恐怖妹蔽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤胳岂,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布编整,位于F島的核電站,受9級特大地震影響乳丰,放射性物質(zhì)發(fā)生泄漏掌测。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一成艘、第九天 我趴在偏房一處隱蔽的房頂上張望赏半。 院中可真熱鬧,春花似錦淆两、人聲如沸断箫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽仲义。三九已至,卻和暖如春剑勾,著一層夾襖步出監(jiān)牢的瞬間埃撵,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工虽另, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留暂刘,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓捂刺,卻偏偏與公主長得像谣拣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子族展,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345