summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-12-22 14:21:22 +0000
committerKeith Wall <kwall@apache.org>2014-12-22 14:21:22 +0000
commit5e0175cee706d931b12968004e61e5ed0aac071a (patch)
tree0391f0c722b7d65e97a4380b4e6549bf103236bb /qpid/java
parent6f483ddc75201d50120ee1dd8a4071aa628574d3 (diff)
downloadqpid-python-5e0175cee706d931b12968004e61e5ed0aac071a.tar.gz
QPID-5099: Add system test to ensure that client does indeed release the messages on consumer close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1647319 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java38
1 files changed, 36 insertions, 2 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
index df32bd7858..de30a98a7d 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
@@ -35,7 +35,7 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class MessageConsumerCloseTest extends QpidBrokerTestCase
{
- Exception _exception;
+ private volatile Exception _exception;
public void testConsumerCloseAndSessionRollback() throws Exception
{
@@ -65,7 +65,7 @@ public class MessageConsumerCloseTest extends QpidBrokerTestCase
boolean messageReceived = receiveLatch.await(1l, TimeUnit.SECONDS);
consumer.close();
- assertNull("Exception occured on rollback:" + _exception, _exception);
+ assertNull("Exception occurred on rollback:" + _exception, _exception);
assertTrue("Message is not received", messageReceived);
consumer = session.createConsumer(destination);
@@ -74,4 +74,38 @@ public class MessageConsumerCloseTest extends QpidBrokerTestCase
Message message2 = consumer.receive(1000l);
assertNotNull("message2 is not received", message2);
}
+
+ public void testPrefetchedMessagesReleasedOnConsumerClose() throws Exception
+ {
+ Connection connection = getConnection();
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ Destination destination = getTestQueue();
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ sendMessage(session, destination, 3);
+
+ connection.start();
+
+ Message msg1 = consumer.receive(1000);
+ assertNotNull("Message one was null", msg1);
+ assertEquals("Message one has unexpected content", 0, msg1.getIntProperty(INDEX));
+ session.commit();
+
+ // Messages two and three will have been prefetched by the consumer.
+ // Closing the consumer must make the available for delivery elsewhere
+
+ consumer.close();
+
+ MessageConsumer consumer2 = session.createConsumer(destination);
+
+ Message msg2 = consumer2.receive(1000);
+ Message msg3 = consumer2.receive(1000);
+ assertNotNull("Message two was null", msg2);
+ assertEquals("Message two has unexpected content", 1, msg2.getIntProperty(INDEX));
+
+ assertNotNull("Message three was null", msg3);
+ assertEquals("Message three has unexpected content", 2, msg3.getIntProperty(INDEX));
+ session.commit();
+ }
}