summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-05-11 19:51:32 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-05-11 19:51:32 +0000
commit2ce5c11a417445ce65cbdf2b74c6451b13dd61b6 (patch)
treec2d0e284201f431bbad1f2a7a7bb2d8516965c3a
parent1273e0279bf7a2876714d60909893a2a5f9bc3bb (diff)
downloadqpid-python-2ce5c11a417445ce65cbdf2b74c6451b13dd61b6.tar.gz
QPID-2596 : Updated QEI to restoreCredit when aquired messages are released. Updated CommitRollbackTest.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@943249 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java85
2 files changed, 88 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index b6d57a7a03..04ffefe93d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -178,6 +178,11 @@ public class QueueEntryImpl implements QueueEntry
public void release()
{
+ Subscription subscription = getDeliveredSubscription();
+ if (subscription != null)
+ {
+ subscription.restoreCredit(this);
+ }
_stateUpdater.set(this,AVAILABLE_STATE);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index b603455644..e96b2c48c3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.test.unit.transacted;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.client.AMQConnection;
import org.slf4j.Logger;
@@ -63,12 +64,12 @@ public class CommitRollbackTest extends QpidTestCase
{
conn = (AMQConnection) getConnection("guest", "guest");
- _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ _session = conn.createSession(true, Session.SESSION_TRANSACTED);
_jmsQueue = _session.createQueue(queue);
_consumer = _session.createConsumer(_jmsQueue);
- _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ _pubSession = conn.createSession(true, Session.SESSION_TRANSACTED);
_publisher = _pubSession.createProducer(_pubSession.createQueue(queue));
@@ -500,6 +501,86 @@ public class CommitRollbackTest extends QpidTestCase
}
/**
+ * QPID-2596
+ * Test that rollback works.
+ *
+ * Goal is to ensure that message credit is correctly restored.
+ * Previously rollback would result in message release() on the Java broker
+ * which would leak credit.
+ *
+ * Here we set a small pre-fetch and so small credit window and then consume
+ * a large number of messages.
+ *
+ * By filling the pre-fetch before rolling back we ensure that all the
+ * credit is used and so if we do not get any back the test will fail
+ * consistently.
+ *
+ * Using a large pre-fetch we can guarantee that we have filled the pre-fetch
+ * before performing rollback.
+ *
+ * Test outline.
+ *
+ * - Connect two transacted sessions with a small pre-fetch.
+ * - Send a large amount of messages on one session
+ * - Use second session to receive pre-fetch worth of messages
+ * - Rollback receiver session
+ * - Continue to consume all the messages on the receiver session,
+ * committing every batch size.
+ * - Fail if we can't get the message.
+ * - End by checking all msgs are consumed
+ *
+ * @throws Exception uf some thing unexpected occured
+ */
+ public void testReceiveThenRollbackConsumerThenReceive() throws Exception
+ {
+ // Close connection so we can reset the pre-fetch
+ conn.close();
+
+ int MAX_PREFETCH=5;
+ int BACK_LOG_FACTOR=200;
+
+ setSystemProperty("max_prefetch", String.valueOf(MAX_PREFETCH));
+
+ // Reconnect
+ newConnection();
+
+ assertEquals("Prefetch not reset",
+ MAX_PREFETCH,((AMQSession)_session).getDefaultPrefetch());
+
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("Sending (" + MAX_PREFETCH * BACK_LOG_FACTOR + ")messages");
+ sendMessage(_pubSession, _publisher.getDestination(), MAX_PREFETCH * BACK_LOG_FACTOR);
+ _pubSession.commit();
+
+ for (int i=0 ;i< MAX_PREFETCH; i++)
+ {
+ assertNotNull("Received:" + i, _consumer.receive(1000));
+ _logger.info("Received:"+i);
+ }
+
+
+ _logger.info("Rolling back");
+ _session.rollback();
+
+ _logger.info("Receiving messages");
+
+ for (int b = 0; b < BACK_LOG_FACTOR; b++)
+ {
+ for (int a = 0; a < MAX_PREFETCH; a++)
+ {
+ assertNotNull("Received (" + b + ")after rollback:" + a, _consumer.receive(1000));
+ }
+ _session.commit();
+ }
+
+ Message result = _consumer.receive(500);
+ assertNull("test message was put and rolled back, but is still present", result);
+ }
+
+
+ /**
* Qpid-1163
* Check that when commt is called inside onMessage then
* the last message is nor redelivered.