summaryrefslogtreecommitdiff
path: root/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java101
1 files changed, 92 insertions, 9 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
index 6874abe7d4..ef2cfb6cd4 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
@@ -20,21 +20,29 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
import javax.jms.Session;
+
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class QpidQueueCreator implements QueueCreator
{
private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
+ private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime";
+ private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
@Override
- public void createQueues(Session session, List<QueueConfig> configs)
+ public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
{
AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
for (QueueConfig queueConfig : configs)
@@ -44,12 +52,88 @@ public class QpidQueueCreator implements QueueCreator
}
@Override
- public void deleteQueues(Session session, List<QueueConfig> configs)
+ public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
{
AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
for (QueueConfig queueConfig : configs)
{
- deleteQueue(amqSession, queueConfig);
+ AMQDestination destination = createAMQDestination(amqSession, queueConfig);
+
+ // drainQueue method is added because deletion of queue with a lot
+ // of messages takes time and might cause the timeout exception
+ drainQueue(connection, destination);
+
+ deleteQueue(amqSession, destination.getAMQQueueName());
+ }
+ }
+
+ private AMQDestination createAMQDestination(AMQSession<?, ?> amqSession, QueueConfig queueConfig)
+ {
+ try
+ {
+ return (AMQDestination) amqSession.createQueue(queueConfig.getName());
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to create amq destionation object:" + queueConfig, e);
+ }
+ }
+
+ private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination destination)
+ {
+ try
+ {
+ long queueDepth = amqSession.getQueueDepth(destination);
+ return queueDepth;
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to query queue depth:" + destination, e);
+ }
+ }
+
+ private void drainQueue(Connection connection, AMQDestination destination)
+ {
+ Session noAckSession = null;
+ try
+ {
+ LOGGER.debug("About to drain the queue {}", destination.getQueueName());
+ noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
+
+ long currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
+ int counter = 0;
+ while (currentQueueDepth > 0)
+ {
+ LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), currentQueueDepth);
+
+ while(messageConsumer.receive(_drainPollTimeout) != null)
+ {
+ counter++;
+ }
+
+ currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
+ }
+ LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName());
+ messageConsumer.close();
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to drain queue:" + destination, e);
+ }
+ finally
+ {
+ if (noAckSession != null)
+ {
+ try
+ {
+ noAckSession.close();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to close n/a session:" + noAckSession, e);
+ }
+ }
}
}
@@ -66,7 +150,7 @@ public class QpidQueueCreator implements QueueCreator
EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(),
destination, autoDelete);
- LOGGER.debug("Created queue " + queueConfig);
+ LOGGER.debug("Created queue {}", queueConfig);
}
catch (Exception e)
{
@@ -74,20 +158,19 @@ public class QpidQueueCreator implements QueueCreator
}
}
- private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig)
+ private void deleteQueue(AMQSession<?, ?> session, AMQShortString queueName)
{
try
{
// The Qpid AMQSession API currently makes the #deleteQueue method protected and the
// raw protocol method public. This should be changed then we should switch the below to
// use #deleteQueue.
- AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName());
- session.sendQueueDelete(destination.getAMQQueueName());
- LOGGER.debug("Deleted queue " + queueConfig.getName());
+ session.sendQueueDelete(queueName);
+ LOGGER.debug("Deleted queue {}", queueName);
}
catch (Exception e)
{
- throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e);
+ throw new DistributedTestException("Failed to delete queue:" + queueName, e);
}
}
}