diff options
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java')
-rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java | 101 |
1 files changed, 92 insertions, 9 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java index 6874abe7d4..ef2cfb6cd4 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java @@ -20,21 +20,29 @@ package org.apache.qpid.disttest.jms; import java.util.List; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; import javax.jms.Session; + import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.disttest.DistributedTestException; import org.apache.qpid.disttest.controller.config.QueueConfig; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class QpidQueueCreator implements QueueCreator { private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class); private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable(); + private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime"; + private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500); @Override - public void createQueues(Session session, List<QueueConfig> configs) + public void createQueues(Connection connection, Session session, List<QueueConfig> configs) { AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; for (QueueConfig queueConfig : configs) @@ -44,12 +52,88 @@ public class QpidQueueCreator implements QueueCreator } @Override - public void deleteQueues(Session session, List<QueueConfig> configs) + public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs) { AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; for (QueueConfig queueConfig : configs) { - deleteQueue(amqSession, queueConfig); + AMQDestination destination = createAMQDestination(amqSession, queueConfig); + + // drainQueue method is added because deletion of queue with a lot + // of messages takes time and might cause the timeout exception + drainQueue(connection, destination); + + deleteQueue(amqSession, destination.getAMQQueueName()); + } + } + + private AMQDestination createAMQDestination(AMQSession<?, ?> amqSession, QueueConfig queueConfig) + { + try + { + return (AMQDestination) amqSession.createQueue(queueConfig.getName()); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to create amq destionation object:" + queueConfig, e); + } + } + + private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination destination) + { + try + { + long queueDepth = amqSession.getQueueDepth(destination); + return queueDepth; + } + catch (Exception e) + { + throw new DistributedTestException("Failed to query queue depth:" + destination, e); + } + } + + private void drainQueue(Connection connection, AMQDestination destination) + { + Session noAckSession = null; + try + { + LOGGER.debug("About to drain the queue {}", destination.getQueueName()); + noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + MessageConsumer messageConsumer = noAckSession.createConsumer(destination); + + long currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination); + int counter = 0; + while (currentQueueDepth > 0) + { + LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), currentQueueDepth); + + while(messageConsumer.receive(_drainPollTimeout) != null) + { + counter++; + } + + currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination); + } + LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName()); + messageConsumer.close(); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to drain queue:" + destination, e); + } + finally + { + if (noAckSession != null) + { + try + { + noAckSession.close(); + } + catch (JMSException e) + { + throw new DistributedTestException("Failed to close n/a session:" + noAckSession, e); + } + } } } @@ -66,7 +150,7 @@ public class QpidQueueCreator implements QueueCreator EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(), destination, autoDelete); - LOGGER.debug("Created queue " + queueConfig); + LOGGER.debug("Created queue {}", queueConfig); } catch (Exception e) { @@ -74,20 +158,19 @@ public class QpidQueueCreator implements QueueCreator } } - private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig) + private void deleteQueue(AMQSession<?, ?> session, AMQShortString queueName) { try { // The Qpid AMQSession API currently makes the #deleteQueue method protected and the // raw protocol method public. This should be changed then we should switch the below to // use #deleteQueue. - AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName()); - session.sendQueueDelete(destination.getAMQQueueName()); - LOGGER.debug("Deleted queue " + queueConfig.getName()); + session.sendQueueDelete(queueName); + LOGGER.debug("Deleted queue {}", queueName); } catch (Exception e) { - throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e); + throw new DistributedTestException("Failed to delete queue:" + queueName, e); } } } |