1 year ago

#377147

Ankit Gautam

Spring cloud stream kafka consumer DefaultErrorHandler not working

I need help with an error-handling scenario in the Spring Cloud Stream Kafka binder. My application has a Java 8 functional consumer whose binding is specified in application.yaml. The consumer is written as:

@Bean
public Consumer<Message<Transaction>> doProcess() {
    return message -> {
        Transaction transaction = message.getPayload();

        // Simulated failure used to exercise the error handler.
        if (true) {
            throw new RuntimeException("exception!! !!:)");
        }
        Acknowledgment acknowledgment =
                message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    };
}

application.yaml:

spring.application.name: appname
spring.cloud.stream:
  function.definition: doProcess
  kafka:
    default.consumer:
      startOffset: latest
      useNativeDecoding: true
    bindings:
      input.consumer.autoCommitOffset: false

  bindings:
    doProcess-in-0:
      destination: kafka.input.topic.name
      group: appGroup
      content-type: application/*+avro
      consumer:
        autoCommitOffset: false

The beans defined are:

@Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer> listener() {
        System.out.println("DEBUG: Bean has been created.");
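        // The customizer's only constructor takes a notifier, which this snippet never uses, so null is passed.
        return new KafkaListenerContainerCustomizer(null);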
    }

public class KafkaListenerContainerCustomizer implements ListenerContainerCustomizer<AbstractMessageListenerContainer> {
    private Object notifier;
    public KafkaListenerContainerCustomizer(Object notifier){
        this.notifier = notifier;
    }
    @Override
    public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
        KafkaGlobalErrorHandler eh = new KafkaGlobalErrorHandler(new ExponentialBackOff());
        container.setCommonErrorHandler(eh);
    }
}



public class KafkaGlobalErrorHandler extends DefaultErrorHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaGlobalErrorHandler.class);

    public KafkaGlobalErrorHandler(BackOff backOff) {
        super(backOff);
    }


    @Override
    public void handleRecord(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
                             MessageListenerContainer container) {

        LOGGER.error("Error occurred while processing: " + ListenerUtils.recordToString(record), exception);
        String topic = record.topic();
        long offset = record.offset();
        int partition = record.partition();
        if (exception.getClass().equals(DeserializationException.class)) {
            DeserializationException deserializationException = (DeserializationException) exception;
            LOGGER.error("Malformed Message Deserialization Exception on topic {}, offset {}, data {}, msg {}",
                    topic,
                    offset,
                    deserializationException.getData(),
                    deserializationException.getLocalizedMessage());
        } else {
            LOGGER.error("An Exception has occurred. topic {}, offset {}, partition {}, msg {}", topic, offset, partition,
                    exception.getLocalizedMessage());
        }

        consumer.commitSync();
    }

    @Override
    public void handleBatch(Exception exception, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
                            MessageListenerContainer container, Runnable invokeListener) {

        for (ConsumerRecord<?, ?> record : records) {

            String topic = record.topic();
            long offset = record.offset();
            int partition = record.partition();
            if (exception.getClass().equals(DeserializationException.class)) {
                DeserializationException deserializationException = (DeserializationException) exception;
                LOGGER.error("Malformed Message Deserialization Exception on topic {}, offset {}, data {}, msg {}",
                        topic,
                        offset,
                        deserializationException.getData(),
                        deserializationException.getLocalizedMessage());
            } else {
                LOGGER.error("An Exception has occurred. topic {}, offset {}, partition {}, msg {}", topic, offset, partition,
                        exception.getLocalizedMessage());
            }
            consumer.commitSync();
        }
    }

    @Override
    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
                                     MessageListenerContainer container, boolean batchListener) {

        LOGGER.error("Error occurred while not processing records", thrownException);
    }

}

Now I am struggling with error handling:

We need to write a custom exception handler that catches exceptions (errors in both application code and the framework) and sends a notification to a user group via email in an AWS environment. However, we have not been able to find an error handler that catches both types of exception. We tried SeekToCurrentErrorHandler, but it did not work. I then tried DefaultErrorHandler, as suggested in this post: Spring cloud stream kafka consumer error handling and retries issues. It works only for some exceptions (those that reach the handleOtherException method); if the consumer code throws a RuntimeException (as in the consumer code above), DefaultErrorHandler does not catch it.
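One detail that may matter once the handler is invoked: an exception thrown inside the consumer function usually reaches the error handler wrapped in one or more framework exceptions, so an exact check such as exception.getClass().equals(DeserializationException.class) may never match. Below is a minimal plain-Java sketch of walking the cause chain instead. The class names WrapperException and DecodeException and the helper findCause are hypothetical stand-ins for illustration, not Spring types, and the sketch does not by itself explain why handleRecord is not called.

```java
import java.util.Optional;

public class CauseChainDemo {

    // Hypothetical stand-in for a framework exception that wraps the listener failure.
    static class WrapperException extends RuntimeException {
        WrapperException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for a deserialization failure.
    static class DecodeException extends RuntimeException {
        DecodeException(String message) {
            super(message);
        }
    }

    /** Walks the cause chain and returns the first throwable of the requested type, if any. */
    static <T extends Throwable> Optional<T> findCause(Throwable thrown, Class<T> type) {
        Throwable current = thrown;
        int depth = 0;
        // The depth limit guards against a cyclic cause chain.
        while (current != null && depth < 50) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
            depth++;
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Exception wrapped = new WrapperException("listener failed", new DecodeException("bad payload"));

        // The exact-class check looks only at the outermost exception.
        System.out.println(wrapped.getClass().equals(DecodeException.class)); // false

        // Walking the causes finds the nested exception.
        System.out.println(findCause(wrapped, DecodeException.class).isPresent()); // true
    }
}
```

In the handler, the same idea would replace the getClass().equals(...) branches, with the real exception type passed as the second argument.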

spring

spring-boot

apache-kafka

spring-cloud-stream

0 Answers
