1 year ago

#291173

test-img

tantalum

Oracle AQ/JMS - Why is the queue being purged on application shutdown?

I have an application that queues and deques messages from Oracle AQ using the JMS interface. When the application is running items get queued and dequeued and I can see queued items in the queue table. However, one the application shuts down the queue table is cleared and the application cannot access the previously queued items. Any idea what might cause that behavior?

The Oracle AQ is created using this code:

BEGIN
dbms_aqadm.create_queue_table(
  queue_table => 'schema.my_queuetable',
  sort_list =>'priority,enq_time',
  comment => 'Queue table to hold my data',
  multiple_consumers => FALSE, -- THis is necessary so that a message is only processed by a single consumer
  queue_payload_type => 'SYS.AQ$_JMS_OBJECT_MESSAGE',
    compatible         => '10.0.0',
    storage_clause     => 'TABLESPACE LGQUEUE_IRM01');
END;
/

BEGIN
dbms_aqadm.create_queue (
   queue_name              => 'schema.my_queue',
   queue_table             => 'schema.my_queuetable');
END;
/

BEGIN
dbms_aqadm.start_queue(queue_name=>'schema.my_queue');
END;
/

I also have a Java class for connecting to the queue, queueing items and processing dequeued items like this:

public class MyOperationsQueueImpl implements MyOperationsQueue {
  private static final Log LOGGER = LogFactory.getLog(MyOperationsQueueImpl.class);


  private final QueueConnection queueConnection;
  private final QueueSession producerQueueSession;
  private final QueueSession consumerQueueSession;
  private final String queueName;
  private final QueueSender queueSender;
  private final QueueReceiver queueReceiver;
  private MyOperationsQueue.MyOperationEventReceiver eventReceiver;

  public MyOperationsQueueImpl(DBUtils dbUtils, String queueName) throws MyException {
    this.eventReceiver = null;
    this.queueName = queueName;
    try {
      DataSource ds = dbUtils.getDataSource();
      QueueConnectionFactory connectionFactory = AQjmsFactory.getQueueConnectionFactory(ds);
      this.queueConnection = connectionFactory.createQueueConnection();

      // We create separate producer and consumer sessions because that is what is recommended by the docs
      // See: https://docs.oracle.com/javaee/6/api/javax/jms/Session.html
      this.producerQueueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      this.consumerQueueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      this.queueSender = this.producerQueueSession.createSender(this.producerQueueSession.createQueue(this.queueName));
      this.queueReceiver = this.consumerQueueSession.createReceiver(this.consumerQueueSession.createQueue(this.queueName));
      this.queueConnection.start();
    } catch (JMSException| NamingException exception) {
      throw new MyOperationException("Failed to create MyOperationsQueue", exception);
    }
  }

  @Override
  protected void finalize() throws Throwable {
    this.queueReceiver.close();
    this.queueSender.close();
    this.consumerQueueSession.close();
    this.producerQueueSession.close();
    this.queueConnection.close();
    super.finalize();
  }

  @Override
  public void submitMyOperation(MyOperationParameters myParameters) throws MyOperationException {
    try {
      ObjectMessage message = this.producerQueueSession.createObjectMessage(myParameters);
      this.queueSender.send(message);
      synchronized (this) {
        if(this.eventReceiver != null) {
          this.eventReceiver.onOperationSubmitted(message.getJMSMessageID(), myParameters);
        }
      }
    } catch (JMSException exc) {
      throw new MyOperationException("Failed to submit my operation", exc);
    }
  }

  @Override
  public void setMyOperationEventReceiver(MyOperationEventReceiver operationReceiver) throws MyOperationException {
    LOGGER.debug("Setting my operation event receiver");
    synchronized (this) {
      if(this.eventReceiver != null) {
        throw new IllegalStateException("Cannot set an operation event receiver if it is already set");
      }
      this.eventReceiver = operationReceiver;
      try {
        this.queueReceiver.setMessageListener(message -> {
          LOGGER.debug("New message received from queue receiver");
          try {
            ObjectMessage objectMessage = (ObjectMessage) message;
            eventReceiver.onOperationReady(message.getJMSMessageID(), (MyOperationParameters) objectMessage.getObject());
          } catch (Exception exception) {
            try {
              eventReceiver.onOperationRetrievalFailed(message.getJMSMessageID(), exception);
            } catch (JMSException innerException) {
              LOGGER.error("Failed to get message ID for JMS Message: "+message, innerException);
            }
          }
        });
      } catch (JMSException exc) {
        throw new MyOperationException("Failed to set My message listener", exc);
      }
    }
  }

}

java

oracle

jms

oracle-aq

0 Answers

Your Answer

Accepted video resources