diff options
| author | Keith Wall <kwall@apache.org> | 2012-09-19 12:53:30 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-09-19 12:53:30 +0000 |
| commit | db7133718a34e1b7a7a4b1328d223141fc7c6142 (patch) | |
| tree | 591d7d7f64465982f3f403b04e76caccaf7f57d9 /qpid/java/perftests/src/main | |
| parent | 6b151e0081390e449a0cf002352c27ad08c0abe1 (diff) | |
| download | qpid-python-db7133718a34e1b7a7a4b1328d223141fc7c6142.tar.gz | |
QPID-4321: Perf tests should not try to call Message#acknowledge on a producing session
* ProducerParticipant makes erroneous call to Message#acknowledge
* Externalise the poll timeout used by QpidQueueCreator to drain the queue after test
* Topic-AckModes.js - replace tests for client-ack, dups-okay-ack with session transacted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1387565 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src/main')
4 files changed, 49 insertions, 46 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java index f9d50e8e64..8c69e5694b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java @@ -174,7 +174,7 @@ public class ConsumerParticipant implements Participant { LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); } - _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); + _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message); } } @@ -199,7 +199,7 @@ public class ConsumerParticipant implements Participant } // commit/acknowledge remaining messages if necessary - _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); + _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message); } return false; } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java index 63cbe98b5c..567deea6f4 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java @@ -119,7 +119,7 @@ public class ProducerParticipant implements Participant { LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); } - _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName()); + _jmsDelegate.commitIfNecessary(_command.getSessionName()); doSleepForInterval(); } @@ -138,7 +138,7 @@ public class ProducerParticipant implements Participant { LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); } - _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName()); + _jmsDelegate.commitIfNecessary(_command.getSessionName()); } Date start = new Date(startTime); diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java index 3f8afc9a9a..a177770a30 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java @@ -373,30 +373,6 @@ public class ClientJmsDelegate } } - public void commitOrAcknowledgeMessage(final Message message, final String sessionName) - { - try - { - final Session session = _testSessions.get(sessionName); - if (session.getTransacted()) - { - synchronized(session) - { - session.commit(); - } - } - else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - message.acknowledge(); - } - } - catch (final JMSException jmse) - { - throw new DistributedTestException("Unable to commit or acknowledge message on session: " + - sessionName, jmse); - } - } - public int getAcknowledgeMode(final String sessionName) { try @@ -493,31 +469,36 @@ public class ClientJmsDelegate } } - public void rollbackOrRecover(String sessionName) + public void commitOrAcknowledgeMessageIfNecessary(final String sessionName, final Message message) { try { final Session session = _testSessions.get(sessionName); - synchronized(session) + if (session.getTransacted()) { - if (session.getTransacted()) - { - session.rollback(); - } - else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + synchronized(session) { - session.recover(); + session.commit(); } } + else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + message.acknowledge(); + } } catch (final JMSException jmse) { - throw new DistributedTestException("Unable to rollback or recover on session: " + + throw new DistributedTestException("Unable to commit or acknowledge message on session: " + sessionName, jmse); } } - public void releaseMessage(String sessionName) + public void commitIfNecessary(final String sessionName) + { + commitOrAcknowledgeMessageIfNecessary(sessionName, null); + } + + public void rollbackOrRecoverIfNecessary(String sessionName) { try { @@ -528,7 +509,7 @@ public class ClientJmsDelegate { session.rollback(); } - else + else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { session.recover(); } @@ -536,7 +517,8 @@ public class ClientJmsDelegate } catch (final JMSException jmse) { - LOGGER.warn("Unable to rollback or recover on session: " + sessionName, jmse); + throw new DistributedTestException("Unable to rollback or recover on session: " + + sessionName, jmse); } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java index 4ce8efeae2..0b906d228f 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java @@ -24,6 +24,8 @@ import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.disttest.DistributedTestException; @@ -36,6 +38,8 @@ public class QpidQueueCreator implements QueueCreator { private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class); private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable(); + private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime"; + private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000); @Override public void createQueues(Connection connection, Session session, List<QueueConfig> configs) @@ -57,7 +61,10 @@ public class QpidQueueCreator implements QueueCreator // drainQueue method is added because deletion of queue with a lot // of messages takes time and might cause the timeout exception - drainQueue(connection, destination); + if (queueHasMessages(amqSession, destination)) + { + drainQueue(connection, destination); + } deleteQueue(amqSession, destination.getAMQQueueName()); } } @@ -74,20 +81,34 @@ public class QpidQueueCreator implements QueueCreator } } + private boolean queueHasMessages(AMQSession<?, ?> amqSession, AMQDestination destination) + { + try + { + long queueDepth = amqSession.getQueueDepth(destination); + LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), queueDepth); + return queueDepth > 0; + } + catch (Exception e) + { + throw new DistributedTestException("Failed to query queue depth:" + destination, e); + } + } + private void drainQueue(Connection connection, AMQDestination destination) { Session noAckSession = null; try { - LOGGER.debug("About to drain the queue " + destination); + LOGGER.debug("About to drain the queue {}", destination.getQueueName()); noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); MessageConsumer messageConsumer = noAckSession.createConsumer(destination); int counter = 0; - while(messageConsumer.receive(1000l) != null) + while(messageConsumer.receive(_drainPollTimeout) != null) { counter++; } - LOGGER.debug("Drained " + counter + " messages from queue " + destination); + LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName()); messageConsumer.close(); } catch (Exception e) @@ -123,7 +144,7 @@ public class QpidQueueCreator implements QueueCreator EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(), destination, autoDelete); - LOGGER.debug("Created queue " + queueConfig); + LOGGER.debug("Created queue {}", queueConfig); } catch (Exception e) { @@ -139,7 +160,7 @@ public class QpidQueueCreator implements QueueCreator // raw protocol method public. This should be changed then we should switch the below to // use #deleteQueue. session.sendQueueDelete(queueName); - LOGGER.debug("Deleted queue " + queueName); + LOGGER.debug("Deleted queue {}", queueName); } catch (Exception e) { |
