spring kafka 1.x consumer and producer 异常处理
spring kafka version 1.3.3
consumer 异常处理:
consumer 使用 @KafkaListener 注解
@KafkaListener(topics={"kafka"}, errorHandler="myKafkaListenerErrorHandler")
指定 errorHandler
属性,值需要是spring bean
实现 KafkaListenerErrorHandler
接口
1 |
|
producer 异常处理:
实现 ProducerListener
接口的 onError
接口
ProducerListenerAdapter 是 ProducerListener 的空实现
1 | public class MyProducerListener<K, V> extends ProducerListenerAdapter<K, V> { |
在需要的的覅方,替换 kafkaTemplate
默认的 ProducerListener
1 | kafkaTemplate.setProducerListener(new MyProducerListener<>()); |
我使用的是 spring boot,利用spring boot 的 CommandLineRunner 接口,在启动完成后,将kafkaTemplate 默认的 ProducerListener 替换成自己的实现
1 |
|