1 year ago

#364555

test-img

jkalyanc

Messages not rolling back on K8s pod restarts when using Spring JMS Listener with Client Ack

We have Spring JMS application ( deployed on K8s) which processes about 100 - 400 messages/sec. The application consumes messages from IBM MQ and processes them. Off late we have started noticing messages getting dropped whenever K8s pod restarts or deployments are done even though we have message ack in place. I am looking for a solution here to resolve this issue.

Software Version
Spring Boot 2.1.7.RELEASE
IBM MQ Client 9.1.0.5
JMS 2.0.1
Java 11
@Configuration
@EnableJms
public class MqConfiguration {
    @Bean
    public MQConnectionFactory mqConnectionFactory(Servers configProperties) {
        MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
        try {
            mqConnectionFactory.setHostName(configProperties.getHost());
            mqConnectionFactory.setQueueManager(configProperties.getQueueManager());
            mqConnectionFactory.setPort(Integer.valueOf(configProperties.getPort()));
            mqConnectionFactory.setChannel(configProperties.getChannel());
            mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqConnectionFactory.setCCSID(1208);
            mqConnectionFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
        } catch (Exception e) {
            logger.logError(mqConnectionFactory, ,
                    "Failed to create MQ ConnectionFactory", String.valueOf(HttpStatus.SC_BAD_REQUEST), e);
        }
        return mqConnectionFactory;
    }

    @Bean(name = "messageListenerContainerFactory")
    public DefaultJmsListenerContainerFactory provideJmsListenerContainerFactory(
            MQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setErrorHandler(new ErrorHandler() {
            @Override
            public void handleError(Throwable t) {
                ServiceMetrics metrics = new ServiceMetrics();
                metrics.setCorrelationId(UUID.getUUID());
                logger.logError(factory, "Exception occured at JMS Factory Container Listener", String.valueOf(HttpStatus.SC_BAD_REQUEST), t);
            }
        });
        
        return factory;
    }

    @Bean(name = "jmsQueueTemplate")
    public JmsTemplate provideJmsQueueTemplate(MQConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }
} 
@Configuration
public class AsyncConfiguration {

    @Autowired
    private Servers configProperties;


    @Bean(name = "asyncTaskExecutor")
    public ExecutorService getAsyncTaskExecutor() {
        String THREAD_POOL = "th-pool-";
        return getExecutor(THREAD_POOL, 70,true);
    }

    private ExecutorService getExecutor(String threadName, int maxPoolSize, boolean cached) {
        final ThreadFactory threadFactory = new CustomizableThreadFactory(threadName);
        if (cached) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, maxPoolSize,
                    60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
            threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> {
                if (!executor.isShutdown()) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        throw new RejectedExecutionException(e);
                    }
                }
            });
            return threadPoolExecutor;
        } else {

            return new ThreadPoolExecutor(maxPoolSize, maxPoolSize,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    threadFactory);
        }
    }
@Component
public class InputQueueListener {

    @Autowired
    private ExecutorService asyncTaskExecutor;

    @JmsListener(destination = "${mqserver.queue}", containerFactory = "messageListenerContainerFactory", concurrency = "1-16")
    public void processXMLMessage(Message message)  {
        CompletableFuture.runAsync(() -> processMessage(message), asyncTaskExecutor);
    }

    private void processMessage(Message message) {
        String inputXmlMessage = null;
        boolean isSuccess = false;
        
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                inputXmlMessage = textMessage.getText();
            } else if (message instanceof BytesMessage) {
                BytesMessage byteMessage = (BytesMessage) message;
                inputXmlMessage = CommonHelperUtil.getMessageFromBytes(byteMessage);
            } else {
                logger.logError(null, "Invalid message type received while converting Message to XML", String.valueOf(HttpStatus.SC_BAD_REQUEST));
                errorQueuePublisher.publishErrorMessage(message);
                try {
                    message.acknowledge();
                } catch (JMSException jmsException) {
                    logger.logError(null, null, "Failed to Acknowledge XML message.",
                            String.valueOf(HttpStatus.SC_BAD_REQUEST), jmsException);
                }
            }
            -
            -

            if (isSuccessProcessed) {
                message.acknowledge();
                
            } else {
                
                message.acknowledge();
                // Publishing back to the same queue
                publishForRetry.publishMessageForRetry(message);
            }

        } catch (Exception e) {
            if (StringUtils.isBlank(serviceMetrics.getCorrelationId())) {
                serviceMetrics.setCorrelationId(UUID.getUUID());
            }

            logger.logError(null, null, "Exception while Converting Processing Message. Retrying to publish.",
                        String.valueOf(HttpStatus.SC_BAD_REQUEST), e);
                // Publishing back to the same queue
                publishForRetry.publishMessageForRetry(message);
                try {
                    message.acknowledge();
                } catch (JMSException jmsException) {
                    logger.logError(null, null,
                            "Failed to Acknowledge the Message when publishing" + "to Error Queue",
                            String.valueOf(HttpStatus.SC_BAD_REQUEST), jmsException);
                }
            }
        }
    }
}

spring-boot

ibm-mq

spring-jms

0 Answers

Your Answer

Accepted video resources