1 year ago
#377147
Ankit Gautam
Spring cloud stream kafka consumer DefaultErrorHandler not working
I need help in error handling scenario in spring cloud stream kafka binder. My Application has java 8 consumer of which binding is specified in application.yaml. The consumer is written as :
@Bean
public Consumer<Message<Transaction>> doProcess() {
return message -> {
Transaction transaction = message.getPayload();
if(true) {
throw new RuntimeException("exception!! !!:)");
}
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT,
Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
application.yaml:
spring.application.name: appname
spring.cloud.stream:
function.definition: doProcess
kafka:
default.consumer:
startOffset: latest
useNativeDecoding: true
bindings:
input.consumer.autoCommitOffset: false
bindings:
doProcess-in-0:
destination: kafka.input.topic.name
group: appGroup
content-type: application/*+avro
consumer:
autoCommitOffset: false.
Bean defined are:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> listener() {
System.out.println(String.format("DEBUG: Bean has bean created."));
return new KafkaListenerContainerCustomizer();
}
public class KafkaListenerContainerCustomizer implements ListenerContainerCustomizer<AbstractMessageListenerContainer> {
private Object notifier;
public KafkaListenerContainerCustomizer(Object notifier){
this.notifier = notifier;
}
@Override
public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
KafkaGlobalErrorHandler eh = new KafkaGlobalErrorHandler(new ExponentialBackOff());
container.setCommonErrorHandler(eh);
}
}
public class KafkaGlobalErrorHandler extends DefaultErrorHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaGlobalErrorHandler.class);
public KafkaGlobalErrorHandler(BackOff backOff) {
super(backOff);
}
@Override
public void handleRecord(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {
LOGGER.error("Error occured while processing: " + ListenerUtils.recordToString(record), exception);
String topic = record.topic();
long offset = record.offset();
int partition = record.partition();
if (exception.getClass().equals(DeserializationException.class)) {
DeserializationException deserializationException = (DeserializationException) exception;
LOGGER.error("Malformed Message Deserialization Exception on topic {}, offset {}, data, {}, msg {}",
topic,
offset,
deserializationException.getData(),
deserializationException.getLocalizedMessage());
} else {
LOGGER.error("An Exception has occurred. topic {}, offset {}, data, {}, msg {}", topic, offset, partition,
exception.getLocalizedMessage());
}
consumer.commitSync();
}
@Override
public void handleBatch(Exception exception, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
MessageListenerContainer container, Runnable invokeListener) {
for (ConsumerRecord<?, ?> record : records) {
String topic = record.topic();
long offset = record.offset();
int partition = record.partition();
if (exception.getClass().equals(DeserializationException.class)) {
DeserializationException deserializationException = (DeserializationException) exception;
LOGGER.error("Malformed Message Deserialization Exception on topic {}, offset {}, data, {}, msg {}",
topic,
offset,
deserializationException.getData(),
deserializationException.getLocalizedMessage());
} else {
LOGGER.error("An Exception has occurred. topic {}, offset {}, data, {}, msg {}", topic, offset, partition,
exception.getLocalizedMessage());
}
consumer.commitSync();
}
}
@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
MessageListenerContainer container, boolean batchListener) {
LOGGER.error("Error occurred while not processing records", thrownException);
}
}
Now, I am struggling with error handling :
We need to write the custom exception handler, where we can catch the exception(error in both application code and framework) and send notification to a user group via email in AWS env. But, we are not able to find any error handler which can catch both types of exception. We tried with SeekToCuurentErrorHandler but it did not work. Then I tried with DefaultErrorHandler as suggested in this post : Spring cloud stream kafka consumer error handling and retries issues, but it's working only for some exception (i.e able to catch in handleOtherException method), and if consumer code throws any RuntimeException(as given in consumer code attached here), it is not caught by DefaultErrorHandler.
spring
spring-boot
apache-kafka
spring-cloud-stream
0 Answers
Your Answer