From 02dc201883cf730b11754aa7861e1d540624ef42 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 25 Jul 2012 22:37:21 +0000 Subject: QPID-4164: Prevent the erroneous re-storing of recovered messages during move/copyMessage management functions. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1365832 13f79535-47bb-0310-9956-ffa450edef68 --- .../management/jmx/QueueManagementTest.java | 96 ++++++++++++++++++++-- 1 file changed, 89 insertions(+), 7 deletions(-) (limited to 'java/broker-plugins') diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 8ae4fec975..79d04b239e 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -41,10 +41,12 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import javax.management.Notification; import javax.management.NotificationListener; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; +import javax.naming.NamingException; import java.util.ArrayList; import java.util.Arrays; @@ -83,7 +85,6 @@ public class QueueManagementTest extends QpidBrokerTestCase private ManagedQueue _managedSourceQueue; private ManagedQueue _managedDestinationQueue; - public void setUp() throws Exception { _jmxUtils = new JMXTestUtils(this); @@ -93,10 +94,8 @@ public class QueueManagementTest extends QpidBrokerTestCase _sourceQueueName = getTestQueueName() + "_src"; _destinationQueueName = getTestQueueName() + "_dest"; - _connection = getConnection(); - _connection.start(); + createConnectionAndSession(); - _session = _connection.createSession(true, Session.SESSION_TRANSACTED); _sourceQueue = _session.createQueue(_sourceQueueName); _destinationQueue = _session.createQueue(_destinationQueueName); createQueueOnBroker(_sourceQueue); @@ -104,8 +103,7 @@ public class QueueManagementTest extends QpidBrokerTestCase _jmxUtils.open(); - _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); - _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); + createManagementInterfacesForQueues(); } public void tearDown() throws Exception @@ -498,6 +496,70 @@ public class QueueManagementTest extends QpidBrokerTestCase assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue()); } + /** + * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. + */ + public void testMoveMessageBetweenQueuesWithBrokerRestart() throws Exception + { + final int numberOfMessagesToSend = 1; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + restartBroker(); + + createManagementInterfacesForQueues(); + createConnectionAndSession(); + + List amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move messages to destination + long messageId = amqMessagesIds.get(0); + _managedSourceQueue.moveMessages(messageId, messageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after move", 1, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after move", 0, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0); + } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessageBetweenQueuesWithBrokerRestart() throws Exception + { + final int numberOfMessagesToSend = 1; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + restartBroker(); + + createManagementInterfacesForQueues(); + createConnectionAndSession(); + + List amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move messages to destination + long messageId = amqMessagesIds.get(0); + _managedSourceQueue.copyMessages(messageId, messageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after copy", 1, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after copy", 1, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0); + } + + @Override + public Message createNextMessage(Session session, int messageNumber) throws JMSException + { + Message message = session.createTextMessage(getContentForMessageNumber(messageNumber)); + message.setIntProperty(INDEX, messageNumber); + return message; + } + private void startAsyncConsumerOn(Destination queue, Connection asyncConnection, final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception { @@ -521,9 +583,10 @@ public class QueueManagementTest extends QpidBrokerTestCase for (int i : expectedIndices) { - Message message = consumer.receive(1000); + TextMessage message = (TextMessage)consumer.receive(1000); assertNotNull("Expected message with index " + i, message); assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX)); + assertEquals("Expected message content", getContentForMessageNumber(i), message.getText()); } assertNull("Unexpected message encountered", consumer.receive(1000)); @@ -574,6 +637,25 @@ public class QueueManagementTest extends QpidBrokerTestCase ((AMQSession)session).sync(); } + private void createConnectionAndSession() throws JMSException, + NamingException + { + _connection = getConnection(); + _connection.start(); + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + } + + private void createManagementInterfacesForQueues() + { + _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); + _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); + } + + private String getContentForMessageNumber(int msgCount) + { + return "Message count " + msgCount; + } + private final class RecordingNotificationListener implements NotificationListener { private final CountDownLatch _notificationReceivedLatch; -- cgit v1.2.1