diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-05-11 19:51:32 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-05-11 19:51:32 +0000 |
| commit | 2ce5c11a417445ce65cbdf2b74c6451b13dd61b6 (patch) | |
| tree | c2d0e284201f431bbad1f2a7a7bb2d8516965c3a | |
| parent | 1273e0279bf7a2876714d60909893a2a5f9bc3bb (diff) | |
| download | qpid-python-2ce5c11a417445ce65cbdf2b74c6451b13dd61b6.tar.gz | |
QPID-2596 : Updated QEI to restoreCredit when aquired messages are released. Updated CommitRollbackTest.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@943249 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 5 | ||||
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java | 85 |
2 files changed, 88 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index b6d57a7a03..04ffefe93d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -178,6 +178,11 @@ public class QueueEntryImpl implements QueueEntry public void release() { + Subscription subscription = getDeliveredSubscription(); + if (subscription != null) + { + subscription.restoreCredit(this); + } _stateUpdater.set(this,AVAILABLE_STATE); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index b603455644..e96b2c48c3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.test.unit.transacted; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.client.AMQConnection; import org.slf4j.Logger; @@ -63,12 +64,12 @@ public class CommitRollbackTest extends QpidTestCase { conn = (AMQConnection) getConnection("guest", "guest"); - _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + _session = conn.createSession(true, Session.SESSION_TRANSACTED); _jmsQueue = _session.createQueue(queue); _consumer = _session.createConsumer(_jmsQueue); - _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + _pubSession = conn.createSession(true, Session.SESSION_TRANSACTED); _publisher = _pubSession.createProducer(_pubSession.createQueue(queue)); @@ -500,6 +501,86 @@ public class CommitRollbackTest extends QpidTestCase } /** + * QPID-2596 + * Test that rollback works. + * + * Goal is to ensure that message credit is correctly restored. + * Previously rollback would result in message release() on the Java broker + * which would leak credit. + * + * Here we set a small pre-fetch and so small credit window and then consume + * a large number of messages. + * + * By filling the pre-fetch before rolling back we ensure that all the + * credit is used and so if we do not get any back the test will fail + * consistently. + * + * Using a large pre-fetch we can guarantee that we have filled the pre-fetch + * before performing rollback. + * + * Test outline. + * + * - Connect two transacted sessions with a small pre-fetch. + * - Send a large amount of messages on one session + * - Use second session to receive pre-fetch worth of messages + * - Rollback receiver session + * - Continue to consume all the messages on the receiver session, + * committing every batch size. + * - Fail if we can't get the message. + * - End by checking all msgs are consumed + * + * @throws Exception uf some thing unexpected occured + */ + public void testReceiveThenRollbackConsumerThenReceive() throws Exception + { + // Close connection so we can reset the pre-fetch + conn.close(); + + int MAX_PREFETCH=5; + int BACK_LOG_FACTOR=200; + + setSystemProperty("max_prefetch", String.valueOf(MAX_PREFETCH)); + + // Reconnect + newConnection(); + + assertEquals("Prefetch not reset", + MAX_PREFETCH,((AMQSession)_session).getDefaultPrefetch()); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("Sending (" + MAX_PREFETCH * BACK_LOG_FACTOR + ")messages"); + sendMessage(_pubSession, _publisher.getDestination(), MAX_PREFETCH * BACK_LOG_FACTOR); + _pubSession.commit(); + + for (int i=0 ;i< MAX_PREFETCH; i++) + { + assertNotNull("Received:" + i, _consumer.receive(1000)); + _logger.info("Received:"+i); + } + + + _logger.info("Rolling back"); + _session.rollback(); + + _logger.info("Receiving messages"); + + for (int b = 0; b < BACK_LOG_FACTOR; b++) + { + for (int a = 0; a < MAX_PREFETCH; a++) + { + assertNotNull("Received (" + b + ")after rollback:" + a, _consumer.receive(1000)); + } + _session.commit(); + } + + Message result = _consumer.receive(500); + assertNull("test message was put and rolled back, but is still present", result); + } + + + /** * Qpid-1163 * Check that when commt is called inside onMessage then * the last message is nor redelivered. |
