diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-30 13:58:14 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-30 13:58:14 +0000 |
| commit | b148a40d311c0fa3f834ce574fbafc9a92d55106 (patch) | |
| tree | 8d9897d3620ffc36e26ef94be0c1f4525d803ee1 /qpid/java | |
| parent | f16adca324a19d5aa64c9176dee84707b7d5a142 (diff) | |
| download | qpid-python-b148a40d311c0fa3f834ce574fbafc9a92d55106.tar.gz | |
QPID-3642: Fix for redelivery regression found by python tests
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1208435 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
4 files changed, 57 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 273bab0ebe..99f2d6cbc2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -643,27 +643,26 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr }); } - void reject(QueueEntry entry) + void reject(final QueueEntry entry) { entry.setRedelivered(); entry.routeToAlternate(); } - void release(QueueEntry entry, boolean setRedelivered) + void release(final QueueEntry entry, final boolean setRedelivered) { - boolean maxDeliveryLimitExceeded = false; if (setRedelivered) { entry.setRedelivered(); - maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry); } - else + + if (getSession().isClosing() || !setRedelivered) { entry.decrementDeliveryCount(); } - if (maxDeliveryLimitExceeded) + if (isMaxDeliveryLimitReached(entry)) { sendToDLQOrDiscard(entry); } @@ -708,7 +707,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } } - private boolean isMaxDeliveryLimitExceeded(QueueEntry entry) + private boolean isMaxDeliveryLimitReached(QueueEntry entry) { final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index ac95750e66..337d1f02cc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -350,7 +350,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi _transaction.rollback(); for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { - listener.onRelease(false); + listener.onRelease(true); } _messageDispositionListenerMap.clear(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index dde020a750..705e53d138 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -405,10 +405,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public void sendClose(long timeout) throws AMQException, FailoverException { - if (getTransacted()) - { - releaseForRollback(); - } if (flushTask != null) { flushTask.cancel(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java new file mode 100644 index 0000000000..a8fa183cbe --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java @@ -0,0 +1,50 @@ +package org.apache.qpid.client.redelivered; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class RedeliveredMessageTest extends QpidBrokerTestCase +{ + private Connection _connection; + + public void setUp() throws Exception + { + super.setUp(); + _connection = getConnection(); + } + + public void testRedeliveredFlagOnSessionClose() throws Exception + { + Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + + final int numberOfMessages = 3; + sendMessage(session, destination, numberOfMessages); + + _connection.start(); + + for(int i = 0; i < numberOfMessages; i++) + { + final Message m = consumer.receive(1000l); + assertNotNull("Message is not recieved at " + i, m); + assertFalse("Redelivered should be not set", m.getJMSRedelivered()); + } + + session.close(); + session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(getTestQueueName()); + consumer = session.createConsumer(destination); + + for(int i = 0; i < numberOfMessages; i++) + { + final Message m = consumer.receive(1000l); + assertNotNull("Message is not recieved at " + i, m); + assertTrue("Redelivered should be set", m.getJMSRedelivered()); + } + } +} |
