summaryrefslogtreecommitdiff
path: root/java/broker-plugins
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-07-25 22:37:21 +0000
committerKeith Wall <kwall@apache.org>2012-07-25 22:37:21 +0000
commit02dc201883cf730b11754aa7861e1d540624ef42 (patch)
tree54dec54a1789955f3ed5881339860af30897a316 /java/broker-plugins
parentd2b59f0faca606a677d3fa7e61667e54ae8c57b5 (diff)
downloadqpid-python-02dc201883cf730b11754aa7861e1d540624ef42.tar.gz
QPID-4164: Prevent the erroneous re-storing of recovered messages during move/copyMessage management functions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1365832 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-plugins')
-rw-r--r--java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java96
1 files changed, 89 insertions, 7 deletions
diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index 8ae4fec975..79d04b239e 100644
--- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -41,10 +41,12 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
+import javax.naming.NamingException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -83,7 +85,6 @@ public class QueueManagementTest extends QpidBrokerTestCase
private ManagedQueue _managedSourceQueue;
private ManagedQueue _managedDestinationQueue;
-
public void setUp() throws Exception
{
_jmxUtils = new JMXTestUtils(this);
@@ -93,10 +94,8 @@ public class QueueManagementTest extends QpidBrokerTestCase
_sourceQueueName = getTestQueueName() + "_src";
_destinationQueueName = getTestQueueName() + "_dest";
- _connection = getConnection();
- _connection.start();
+ createConnectionAndSession();
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
_sourceQueue = _session.createQueue(_sourceQueueName);
_destinationQueue = _session.createQueue(_destinationQueueName);
createQueueOnBroker(_sourceQueue);
@@ -104,8 +103,7 @@ public class QueueManagementTest extends QpidBrokerTestCase
_jmxUtils.open();
- _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName);
- _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName);
+ createManagementInterfacesForQueues();
}
public void tearDown() throws Exception
@@ -498,6 +496,70 @@ public class QueueManagementTest extends QpidBrokerTestCase
assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue());
}
+ /**
+ * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface.
+ */
+ public void testMoveMessageBetweenQueuesWithBrokerRestart() throws Exception
+ {
+ final int numberOfMessagesToSend = 1;
+
+ sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+ syncSession(_session);
+ assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
+
+ restartBroker();
+
+ createManagementInterfacesForQueues();
+ createConnectionAndSession();
+
+ List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+ // Move messages to destination
+ long messageId = amqMessagesIds.get(0);
+ _managedSourceQueue.moveMessages(messageId, messageId, _destinationQueueName);
+
+ assertEquals("Unexpected queue depth on destination queue after move", 1, _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after move", 0, _managedSourceQueue.getMessageCount().intValue());
+
+ assertMessageIndicesOn(_destinationQueue, 0);
+ }
+
+ /**
+ * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface.
+ */
+ public void testCopyMessageBetweenQueuesWithBrokerRestart() throws Exception
+ {
+ final int numberOfMessagesToSend = 1;
+
+ sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+ syncSession(_session);
+ assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
+
+ restartBroker();
+
+ createManagementInterfacesForQueues();
+ createConnectionAndSession();
+
+ List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+ // Move messages to destination
+ long messageId = amqMessagesIds.get(0);
+ _managedSourceQueue.copyMessages(messageId, messageId, _destinationQueueName);
+
+ assertEquals("Unexpected queue depth on destination queue after copy", 1, _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after copy", 1, _managedSourceQueue.getMessageCount().intValue());
+
+ assertMessageIndicesOn(_destinationQueue, 0);
+ }
+
+ @Override
+ public Message createNextMessage(Session session, int messageNumber) throws JMSException
+ {
+ Message message = session.createTextMessage(getContentForMessageNumber(messageNumber));
+ message.setIntProperty(INDEX, messageNumber);
+ return message;
+ }
+
private void startAsyncConsumerOn(Destination queue, Connection asyncConnection,
final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception
{
@@ -521,9 +583,10 @@ public class QueueManagementTest extends QpidBrokerTestCase
for (int i : expectedIndices)
{
- Message message = consumer.receive(1000);
+ TextMessage message = (TextMessage)consumer.receive(1000);
assertNotNull("Expected message with index " + i, message);
assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX));
+ assertEquals("Expected message content", getContentForMessageNumber(i), message.getText());
}
assertNull("Unexpected message encountered", consumer.receive(1000));
@@ -574,6 +637,25 @@ public class QueueManagementTest extends QpidBrokerTestCase
((AMQSession<?,?>)session).sync();
}
+ private void createConnectionAndSession() throws JMSException,
+ NamingException
+ {
+ _connection = getConnection();
+ _connection.start();
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ private void createManagementInterfacesForQueues()
+ {
+ _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName);
+ _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName);
+ }
+
+ private String getContentForMessageNumber(int msgCount)
+ {
+ return "Message count " + msgCount;
+ }
+
private final class RecordingNotificationListener implements NotificationListener
{
private final CountDownLatch _notificationReceivedLatch;