異常處理
代碼異常十之八九拔莱,十段代碼九個bug卷中,哈哈哈哈政恍。平常程序異常我們使用try catch捕獲異常,在catch方法中根據(jù)異常類型進行相關(guān)處理牛哺,既然我們可以使用try catch處理異常陋气,那為什么還要使用ConsumerAwareErrorHandler異常處理器去處理異常呢?
首先荆隘,KafkaListener要做的事只是監(jiān)聽Topic中的數(shù)據(jù)并消費恩伺,如果在KafkaListener中還需要對異常進行處理則會顯得代碼塊非常臃腫不利于維護,我們可以把異常處理的這些代碼抽象出來椰拒,構(gòu)造成一個異常處理器,KafkaListener中所拋出的異常都會經(jīng)過ConsumerAwareErrorHandler異常處理器進行處理凰荚,這樣就非常方便我們進行后期維護燃观,比如后期更改異常處理業(yè)務(wù)的時候,只需要修改ConsumerAwareErrorHandler處理器就行了便瑟,而不需要KafkaListener的一堆代碼中去修改代碼缆毁。這也是一種思想的體現(xiàn)。
單消息消費異常處理器
這里主要就是注冊一個ConsumerAwareListenerErrorHandler 類型的異常處理器到涂,bean的注冊默認使用的是方法名脊框,所以我們將這個異常處理的BeanName放到@KafkaListener注解的errorHandler屬性里面。當KafkaListener拋出異常的時候践啄,則會自動調(diào)用異常處理器浇雹。
@Component
public class ErrorListener {
private static final Logger log= LoggerFactory.getLogger(ErrorListener.class);
@KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler")
public void errorListener(String data) {
log.info("topic.quick.error receive : " + data);
throw new RuntimeException("fail");
}
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
return null;
}
};
}
}
編寫測試方法,發(fā)送一條消息到topic.quick.error中屿讽,運行測試方法后我們可以看到異常處理器已經(jīng)能正常使用了昭灵。
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void testErrorHandler() {
kafkaTemplate.send("topic.quick.error", "test error handle");
}
2018-09-14 11:42:05.099 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : topic.quick.error receive : test error handle
2018-09-14 11:42:05.101 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : consumerAwareErrorHandler receive : test error handle
批量消費異常處理器
批量消費代碼也是差不多的,只不過傳遞過來的數(shù)據(jù)都是List集合方式伐谈,這里就不做其他代碼的展示了烂完。
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
MessageHeaders headers = message.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
return null;
}
};
}