summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-30 13:58:14 +0000
committerKeith Wall <kwall@apache.org>2011-11-30 13:58:14 +0000
commitb148a40d311c0fa3f834ce574fbafc9a92d55106 (patch)
tree8d9897d3620ffc36e26ef94be0c1f4525d803ee1 /qpid/java
parentf16adca324a19d5aa64c9176dee84707b7d5a142 (diff)
downloadqpid-python-b148a40d311c0fa3f834ce574fbafc9a92d55106.tar.gz
QPID-3642: Fix for redelivery regression found by python tests
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1208435 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java50
4 files changed, 57 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 273bab0ebe..99f2d6cbc2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -643,27 +643,26 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
});
}
- void reject(QueueEntry entry)
+ void reject(final QueueEntry entry)
{
entry.setRedelivered();
entry.routeToAlternate();
}
- void release(QueueEntry entry, boolean setRedelivered)
+ void release(final QueueEntry entry, final boolean setRedelivered)
{
- boolean maxDeliveryLimitExceeded = false;
if (setRedelivered)
{
entry.setRedelivered();
- maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry);
}
- else
+
+ if (getSession().isClosing() || !setRedelivered)
{
entry.decrementDeliveryCount();
}
- if (maxDeliveryLimitExceeded)
+ if (isMaxDeliveryLimitReached(entry))
{
sendToDLQOrDiscard(entry);
}
@@ -708,7 +707,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
}
- private boolean isMaxDeliveryLimitExceeded(QueueEntry entry)
+ private boolean isMaxDeliveryLimitReached(QueueEntry entry)
{
final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index ac95750e66..337d1f02cc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -350,7 +350,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
_transaction.rollback();
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
- listener.onRelease(false);
+ listener.onRelease(true);
}
_messageDispositionListenerMap.clear();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index dde020a750..705e53d138 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -405,10 +405,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
- if (getTransacted())
- {
- releaseForRollback();
- }
if (flushTask != null)
{
flushTask.cancel();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java
new file mode 100644
index 0000000000..a8fa183cbe
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.client.redelivered;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class RedeliveredMessageTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = getConnection();
+ }
+
+ public void testRedeliveredFlagOnSessionClose() throws Exception
+ {
+ Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ final int numberOfMessages = 3;
+ sendMessage(session, destination, numberOfMessages);
+
+ _connection.start();
+
+ for(int i = 0; i < numberOfMessages; i++)
+ {
+ final Message m = consumer.receive(1000l);
+ assertNotNull("Message is not recieved at " + i, m);
+ assertFalse("Redelivered should be not set", m.getJMSRedelivered());
+ }
+
+ session.close();
+ session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = session.createQueue(getTestQueueName());
+ consumer = session.createConsumer(destination);
+
+ for(int i = 0; i < numberOfMessages; i++)
+ {
+ final Message m = consumer.receive(1000l);
+ assertNotNull("Message is not recieved at " + i, m);
+ assertTrue("Redelivered should be set", m.getJMSRedelivered());
+ }
+ }
+}