diff options
| author | Keith Wall <kwall@apache.org> | 2012-02-26 22:56:00 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-02-26 22:56:00 +0000 |
| commit | c10cf720e7a4f499558b7b7843f5c28c675e990d (patch) | |
| tree | 0c86a8fadc43a69206146412d13e9484be15a705 /qpid/java | |
| parent | 814a72bbb5a1bf317c1319c4ae654e45e3033903 (diff) | |
| download | qpid-python-c10cf720e7a4f499558b7b7843f5c28c675e990d.tar.gz | |
QPID-3867: AMQQueueMBean#clearQueue|moveMessages|copyMessages should be certain to rollback transactions in the event of exception
Added finally{} block so txn will be rolled back in the event of any non-normal completion. Refactored the AMQQueue abstraction so that the caller
no longer has to provide a server transaction for move and copy operations. Bolstered the system tests around copy and move JMX management operations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1293951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
6 files changed, 320 insertions, 124 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a3da598bdf..e643338c3d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; @@ -141,10 +140,9 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition); - void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, - ServerTransaction transaction); + void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName); - void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction); + void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName); void removeMessagesFromQueue(long fromMessageId, long toMessageId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 7c59097965..b0d4cb3486 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -36,8 +36,6 @@ import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.txn.LocalTransaction; -import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.transport.MessageProperties; import javax.management.JMException; @@ -613,9 +611,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); } - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); - _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); - txn.commit(); + _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName); } /** @@ -648,11 +644,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); } - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); - - _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); - - txn.commit(); + _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName); } /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 7d0eb0c838..30c2846732 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1198,19 +1198,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, - String queueName, - ServerTransaction txn) throws IllegalArgumentException + String destinationQueueName) throws IllegalArgumentException { - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue cant be the same as the source queue"); - } + final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -1230,65 +1221,68 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes }); - - // Move the messages in on the message store. - for (final QueueEntry entry : entries) + final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); + boolean shouldRollback = true; + try { - final ServerMessage message = entry.getMessage(); - txn.enqueue(toQueue, message, - new ServerTransaction.Action() - { - - public void postCommit() + // Move the messages in on the message store. + for (final QueueEntry entry : entries) + { + final ServerMessage message = entry.getMessage(); + txn.enqueue(toQueue, message, + new ServerTransaction.Action() { - try + + public void postCommit() { - toQueue.enqueue(message); + try + { + toQueue.enqueue(message); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } } - catch (AMQException e) + + public void onRollback() { - throw new RuntimeException(e); + entry.release(); } - } - - public void onRollback() + }); + txn.dequeue(this, message, + new ServerTransaction.Action() { - entry.release(); - } - }); - txn.dequeue(this, message, - new ServerTransaction.Action() - { - public void postCommit() - { - entry.discard(); - } - - public void onRollback() - { + public void postCommit() + { + entry.discard(); + } - } - }); + public void onRollback() + { + } + }); + } + txn.commit(); + shouldRollback = false; + } + finally + { + if (shouldRollback) + { + txn.rollback(); + } } } public void copyMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, - String queueName, - final ServerTransaction txn) throws IllegalArgumentException + String destinationQueueName) throws IllegalArgumentException { - final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - if (toQueue == null) - { - throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); - } - else if (toQueue == this) - { - throw new IllegalArgumentException("The destination queue cant be the same as the source queue"); - } + final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -1306,36 +1300,63 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } }); - - // Move the messages in on the message store. - for (QueueEntry entry : entries) + final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore()); + boolean shouldRollback = true; + try { - final ServerMessage message = entry.getMessage(); - - txn.enqueue(toQueue, message, new ServerTransaction.Action() + // Copy the messages in on the message store. + for (QueueEntry entry : entries) { - public void postCommit() + final ServerMessage message = entry.getMessage(); + + txn.enqueue(toQueue, message, new ServerTransaction.Action() { - try + public void postCommit() { - toQueue.enqueue(message); + try + { + toQueue.enqueue(message); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } } - catch (AMQException e) + + public void onRollback() { - throw new RuntimeException(e); } - } + }); - public void onRollback() - { - - } - }); + } + txn.commit(); + shouldRollback = false; + } + finally + { + if (shouldRollback) + { + txn.rollback(); + } } } + private AMQQueue getValidatedDestinationQueue(String queueName) + { + final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + if (toQueue == null) + { + throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost."); + } + else if (toQueue == this) + { + throw new IllegalArgumentException("The destination queue can't be the same as the source queue"); + } + return toQueue; + } + public void removeMessagesFromQueue(long fromMessageId, long toMessageId) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 34dc5b4428..afaa417415 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; @@ -357,12 +356,12 @@ public class MockAMQQueue implements AMQQueue return null; } - public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext) + public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName) { } - public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext) + public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName) { } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index c345384e28..9f022dcdde 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -839,7 +838,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not copied as part of invocation of - * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)} + * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String)} */ public void testCopyMessagesWithDequeuedEntry() { @@ -856,14 +855,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // create another queue SimpleAMQQueue queue = createQueue(anotherQueueName); - // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); - // copy messages into another queue - _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); - - // commit transaction - txn.commit(); + _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName); // get messages on another queue List<QueueEntry> entries = queue.getMessagesOnTheQueue(); @@ -889,7 +882,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** * Tests that dequeued message is not moved as part of invocation of - * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)} + * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String)} */ public void testMovedMessagesWithDequeuedEntry() { @@ -906,14 +899,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // create another queue SimpleAMQQueue queue = createQueue(anotherQueueName); - // create transaction - ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore()); - // move messages into another queue - _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); - - // commit transaction - txn.commit(); + _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName); // get messages on another queue List<QueueEntry> entries = queue.getMessagesOnTheQueue(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java index 0604955290..22e175b6b3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java @@ -20,6 +20,8 @@ package org.apache.qpid.management.jmx; import org.apache.commons.lang.time.FastDateFormat; +import org.apache.log4j.Logger; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.queue.AMQQueueMBean; import org.apache.qpid.test.utils.JMXTestUtils; @@ -27,15 +29,25 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.Session; -import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; + +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Tests the JMX API for the Managed Queue. @@ -43,17 +55,41 @@ import java.util.Map; */ public class ManagedQueueMBeanTest extends QpidBrokerTestCase { - /** - * JMX helper. - */ + protected static final Logger LOGGER = Logger.getLogger(ManagedQueueMBeanTest.class); + private JMXTestUtils _jmxUtils; + private Connection _connection; + private Session _session; + + private String _sourceQueueName; + private String _destinationQueueName; + private Destination _sourceQueue; + private Destination _destinationQueue; + private ManagedQueue _managedSourceQueue; + private ManagedQueue _managedDestinationQueue; public void setUp() throws Exception { _jmxUtils = new JMXTestUtils(this); _jmxUtils.setUp(); + super.setUp(); + _sourceQueueName = getTestQueueName() + "_src"; + _destinationQueueName = getTestQueueName() + "_dest"; + + _connection = getConnection(); + _connection.start(); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _sourceQueue = _session.createQueue(_sourceQueueName); + _destinationQueue = _session.createQueue(_destinationQueueName); + createQueueOnBroker(_sourceQueue); + createQueueOnBroker(_destinationQueue); + _jmxUtils.open(); + + _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); + _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); } public void tearDown() throws Exception @@ -70,28 +106,17 @@ public class ManagedQueueMBeanTest extends QpidBrokerTestCase */ public void testViewSingleMessage() throws Exception { - final String queueName = getTestQueueName(); - - // Create queue and send numMessages messages to it. - final Connection con = getConnection(); - final Session session = con.createSession(true, Session.SESSION_TRANSACTED); - final Destination dest = session.createQueue(queueName); - session.createConsumer(dest).close(); // Create a consumer only to cause queue creation - - final List<Message> sentMessages = sendMessage(session, dest, 1); + final List<Message> sentMessages = sendMessage(_session, _sourceQueue, 1); final Message sentMessage = sentMessages.get(0); - // Obtain the management interface. - final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); - assertNotNull("ManagedQueue expected to be available", managedQueue); - assertEquals("Unexpected queue depth", 1, managedQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth", 1, _managedSourceQueue.getMessageCount().intValue()); // Check the contents of the message - final TabularData tab = managedQueue.viewMessages(1l, 1l); + final TabularData tab = _managedSourceQueue.viewMessages(1l, 1l); assertEquals("Unexpected number of rows in table", 1, tab.size()); - final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator(); + final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); - final CompositeDataSupport row1 = rowItr.next(); + final CompositeData row1 = rowItr.next(); assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID)); assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS)); assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED)); @@ -109,6 +134,175 @@ public class ManagedQueueMBeanTest extends QpidBrokerTestCase } /** + * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. + */ + public void testMoveMessagesBetweenQueues() throws Exception + { + final int numberOfMessagesToSend = 10; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first move", 3, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first move", 7, _managedSourceQueue.getMessageCount().intValue()); + + // Now move a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second move", 5, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second move", 5, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); + } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessagesBetweenQueues() throws Exception + { + final int numberOfMessagesToSend = 10; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Copy first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first copy", 3, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + // Now copy a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", 5, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); + } + + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection asyncConnection = getConnection(); + asyncConnection.start(); + + final int numberOfMessagesToSend = 50; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); + + CountDownLatch consumerReadToHalfwayLatch = new CountDownLatch(numberOfMessagesToSend / 2); + AtomicInteger totalConsumed = new AtomicInteger(0); + startAsyncConsumerOn(_sourceQueue, asyncConnection, consumerReadToHalfwayLatch, totalConsumed); + + boolean halfwayPointReached = consumerReadToHalfwayLatch.await(5000, TimeUnit.MILLISECONDS); + assertTrue("Did not read half of messages within time allowed", halfwayPointReached); + + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + asyncConnection.stop(); + + // The exact number of messages moved will be non deterministic, but the number of messages moved + // plus the number consumed should be equal to the number we originally sent. + + int numberOfMessagesReadByConsumer = totalConsumed.intValue(); + int numberOfMessagesOnDestinationQueue = _managedDestinationQueue.getMessageCount().intValue(); + LOGGER.debug("Async consumer read : " + numberOfMessagesReadByConsumer + + " Number of messages moved to destination : " + numberOfMessagesOnDestinationQueue); + assertEquals(numberOfMessagesToSend, numberOfMessagesReadByConsumer + numberOfMessagesOnDestinationQueue); + + int numberOfMessagesRemainingOnSourceQueue = _managedSourceQueue.getMessageCount().intValue(); + assertEquals(0, numberOfMessagesRemainingOnSourceQueue); + } + + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnDestinationQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection asyncConnection = getConnection(); + asyncConnection.start(); + + final int numberOfMessagesToSend = 50; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); + + AtomicInteger totalConsumed = new AtomicInteger(0); + CountDownLatch allMessagesConsumedLatch = new CountDownLatch(numberOfMessagesToSend); + startAsyncConsumerOn(_destinationQueue, asyncConnection, allMessagesConsumedLatch, totalConsumed); + + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + allMessagesConsumedLatch.await(5000, TimeUnit.MILLISECONDS); + assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue()); + } + + private void startAsyncConsumerOn(Destination queue, Connection asyncConnection, + final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception + { + Session session = asyncConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(new MessageListener() + { + + @Override + public void onMessage(Message arg0) + { + totalConsumed.incrementAndGet(); + requiredNumberOfMessagesRead.countDown(); + } + }); + } + + private void assertMessageIndicesOn(Destination queue, int... expectedIndexes) throws Exception + { + MessageConsumer consumer = _session.createConsumer(queue); + + for (int i : expectedIndexes) + { + Message message = consumer.receive(1000); + assertNotNull("Expected message with index " + i, message); + assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX)); + } + + assertNull("Unexpected message encountered", consumer.receive(1000)); + } + + private List<Long> getAMQMessageIdsOn(ManagedQueue managedQueue, long startIndex, long endIndex) throws Exception + { + final SortedSet<Long> messageIds = new TreeSet<Long>(); + + final TabularData tab = managedQueue.viewMessages(startIndex, endIndex); + final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); + while(rowItr.hasNext()) + { + final CompositeData row = rowItr.next(); + long amqMessageId = (Long)row.get(ManagedQueue.MSG_AMQ_ID); + messageIds.add(amqMessageId); + } + + return new ArrayList<Long>(messageIds); + } + + /** * * Utility method to convert array of Strings in the form x = y into a * map with key/value x => y. @@ -126,4 +320,9 @@ public class ManagedQueueMBeanTest extends QpidBrokerTestCase } return headerMap; } + + private void createQueueOnBroker(Destination destination) throws JMSException + { + _session.createConsumer(destination).close(); // Create a consumer only to cause queue creation + } } |
