MQ系列:
kafka系列一: kafka簡介
kafka系列二: kafka部署
kafka系列三: Spring kafka
kafka系列四:動態(tài)添加監(jiān)聽器
kafka系列五:失敗后重試機制
前言
在 Spring Kafka 中架专,失敗重試與死信隊列的處理是關(guān)鍵功能,可以確保消息處理的可靠性和健壯性。當消費者處理消息失敗時埂伦,可以配置重試機制糯崎,在重試多次后仍然失敗時砚婆,將消息發(fā)送到死信隊列進行處理狠鸳。
重試機制的用法
springboot 中使用kafka消息失敗重試機制非常便捷傍妒,關(guān)注 @RetryableTopic 和 @DltHandler 兩個注解即可蛉签。
以下模擬處理失敗的例子
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.stereotype.Service;
import org.springframework.retry.annotation.Backoff;
@Service
public class KafkaConsumerService {
@RetryableTopic(
attempts = "3",
backoff = @Backoff(delay = 2000, multiplier = 2, maxDelay = 60000),
dltStrategy = RetryableTopic.DltStrategy.FAIL_ON_ERROR,
autoCreateTopics = "true"
)
@KafkaListener(topics = "my-topic", groupId = "${spring.kafka.consumer.group-id}")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
System.out.println("Received message: " + record.value());
// 模擬異常
if (shouldFail()) {
throw new RuntimeException("Simulated failure");
}
acknowledgment.acknowledge();
} catch (Exception e) {
throw e;
}
}
private boolean shouldFail() {
// 模擬處理失敗的條件
return true;
}
@DltHandler
public void dltListen(ConsumerRecord<String, String> record) {
System.out.println("Received message in DLT: " + record.value());
// 可以在這里添加對死信消息的處理邏輯
}
}
@RetryableTopic 注解:
這個注解會將失敗后重試的監(jiān)聽注冊為被標注的方法胡陪,如例子中,my-topic的監(jiān)聽器和my-topic處理失敗后重試的監(jiān)聽都是方法 listen()
- attempts:指定重試次數(shù)碍舍。
- backoff:配置重試的間隔和倍數(shù)柠座。
- dltStrategy:配置死信隊列策略(DltStrategy.FAIL_ON_ERROR 表示處理失敗時將消息發(fā)送到死信隊列)。
- autoCreateTopics:配置是否自動創(chuàng)建重試和死信隊列的主題片橡。
@DltHandler 注解:
用于標記處理死信隊列消息的方法妈经。
死信隊列的topic name根據(jù)原Topic(my-topic)在超過指定失敗次數(shù)后自動生成新的Topic(my-topic.dlt),并且被 @DltHandler 標注的方法監(jiān)聽捧书。
自定義死信隊列監(jiān)聽器
通過注解@RetryableTopic 和 @DltHandler吹泡,可以非常便捷地整合失敗重試機制到你的app中,但是经瓷,不夠靈活爆哑。
正如上篇文章提到的,動態(tài)增加新的Topic監(jiān)聽器時舆吮,如何引入對應(yīng)的失敗重試機制呢揭朝。
在上次分析的基礎(chǔ)上,解析@RetryableTopic注解時色冀,通過ConcurrentMessageListenerContainer的setCommonErrorHandler的方法設(shè)置異常重試配置的
/**
* Set the {@link CommonErrorHandler} which can handle errors for both record
* and batch listeners.
* @param commonErrorHandler the handler.
* @since 2.8
*/
public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler) {
this.commonErrorHandler = commonErrorHandler;
}
因此潭袱,我們需要定義一個返回實現(xiàn)CommonErrorHandler(DefaultErrorHandler ) 的方法
/***
*
* @return
*/
public DefaultErrorHandler errorHandler() {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
backOff.setInitialInterval(2000);
backOff.setMultiplier(2);
backOff.setMaxInterval(10000);
return new DefaultErrorHandler(recoverer, backOff);
}
然后在動態(tài)添加新Topic監(jiān)聽器的方法中處理setCommonErrorHandler(errorHandler())
/**
* 添加 Topeic
*
* @param topic
* @param groupId
* @param isDeadLetter
*/
public void addKafkaListener(String topic, String groupId, boolean isDeadLetter) {
// kafka 消費者
DefaultKafkaConsumerFactory<String, String> consumerFactory = consumerFactory(groupId);
// 相關(guān)屬性
ContainerProperties props = new ContainerProperties(topic);
// 設(shè)置監(jiān)聽器(區(qū)分死信隊列與非死信隊列)
if (!isDeadLetter) {
props.setMessageListener(new CustomerMsgErrorHandler());
} else {
props.setMessageListener(new DeadLetterHandler());
}
props.setGroupId(groupId);
props.setAckMode(ContainerProperties.AckMode.MANUAL);
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, props);
container.setCommonErrorHandler(errorHandler());
// 非死信隊列,則添加一個死信隊列監(jiān)聽器
if (!isDeadLetter) {
addKafkaListener(topic + ".DLT", groupId, true);
}
container.start();
}
如果直接用 @DltHandler標注方法的方式锋恬,添加死信隊列監(jiān)聽器屯换,監(jiān)聽器無效,故伶氢,直接增加了一個監(jiān)聽"topic.dlt"的監(jiān)聽器趟径,來處理死信隊列瘪吏。
另外定義一個專門模擬異常的Hander和死信隊列的Handkler
@Slf4j
public class CustomerMsgErrorHandler implements AcknowledgingMessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
// doSomething
System.out.println("------------ onMessage topic " + data.topic() + ": " + data.value());
// 模擬異常
if (shouldFail()) {
throw new RuntimeException("Simulated failure");
}
// 因為前面設(shè)置了手動提交ack的方式,這里需要在消息處理完成后提交ack
acknowledgment.acknowledge();
}
private boolean shouldFail() {
// 模擬處理失敗的條件
return true;
}
}
@Slf4j
public class DeadLetterHandler implements AcknowledgingMessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
// doSomething
System.out.println("------------ DLT onMessage topic " + data.topic() + ": " + data.value());
// 因為前面設(shè)置了手動提交ack的方式蜗巧,這里需要在消息處理完成后提交ack
acknowledgment.acknowledge();
}
}
OK掌眠,這樣就暫時實現(xiàn)了動態(tài)增加新Topic監(jiān)聽器的功能了,并且也用到了重試機制幕屹。
總結(jié)
上述配置展示了如何在 Spring Kafka 中實現(xiàn)失敗重試與死信隊列處理蓝丙。通過配置 DefaultErrorHandler,可以設(shè)置重試機制和死信隊列處理策略望拖。在消費者監(jiān)聽器中處理消息時渺尘,如果出現(xiàn)異常,消息會根據(jù)配置的重試策略進行重試说敏,多次重試失敗后會被發(fā)送到死信隊列鸥跟。死信隊列的監(jiān)聽器可以處理這些失敗的消息,從而實現(xiàn)對異常消息的特殊處理盔沫。
在實現(xiàn)動態(tài)增加新Topic監(jiān)聽器的功能時医咨,雖然,已經(jīng)按照配置去執(zhí)行失敗重試了架诞,但是拟淮,并沒有如意料中的那樣,回調(diào)@DltHandler標注的死信隊列監(jiān)聽器谴忧。估計是沒有把相關(guān)對象托管到springboot容器中的原因很泊,下次再仔細瞧瞧,解決這個問題沾谓。