summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-11-17 18:42:25 +0000
committerRobert Gemmell <robbie@apache.org>2011-11-17 18:42:25 +0000
commite19279e320db1b32e2e3a8a4602fe1844a3ad082 (patch)
tree3bbab64cde0042d9f753f1e4c0aa13d710270684
parent7d07cd053fe2dcf8923774fed40db54bec18cc7c (diff)
downloadqpid-python-e19279e320db1b32e2e3a8a4602fe1844a3ad082.tar.gz
QPID-2703: 0-10 Transaction rollback/recover does not restore consumer credit
Defect in Java client. 0-10 requires that commands are completed, but the Java client was failing to complete those commands corresponding to messages that were being rolled-back/recovered. Work by Robbie Gemmell and myself. merged from trunk r1203139 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1203317 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java23
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java1
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java23
3 files changed, 17 insertions, 30 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 826ca46cca..bc97d822cb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -48,7 +48,6 @@ import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -140,9 +139,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private int unackedCount = 0;
/**
- * USed to store the range of in tx messages
+ * Used to store the range of in tx messages
*/
- private RangeSet _txRangeSet = new RangeSet();
+ private final RangeSet _txRangeSet = new RangeSet();
private int _txSize = 0;
//--- constructors
@@ -456,6 +455,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
// release all unacked messages
RangeSet ranges = gatherUnackedRangeSet();
+ flushProcessed(ranges, false);
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
// We need to sync so that we get notify of an error.
sync();
@@ -478,12 +478,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return ranges;
}
-
public void releaseForRollback()
{
- getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
- _txRangeSet.clear();
- _txSize = 0;
+ if (_txSize > 0)
+ {
+ flushProcessed(_txRangeSet, false);
+ getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
+ _txRangeSet.clear();
+ _txSize = 0;
+ }
}
/**
@@ -497,6 +500,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// The value of requeue is always true
RangeSet ranges = new RangeSet();
ranges.add((int) deliveryTag);
+ flushProcessed(ranges, false);
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
//I don't think we need to sync
}
@@ -1404,6 +1408,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// messages sent by the brokers following the first rollback
// after failover
_highestDeliveryTag.set(-1);
+ // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to
+ //messages that came from the old broker.
+ _txRangeSet.clear();
+ _txSize = 0;
+ _unacknowledgedMessageTags.clear();
super.resubscribe();
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 7c8ccf4cf9..20892b8e95 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -498,6 +498,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
+ _0_10session.flushProcessed(ranges, false);
_0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
clearReceiveQueue();
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 849827216c..84b67f0919 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -288,29 +288,6 @@ public class AMQSession_0_10Test extends TestCase
assertNotNull("ExecutionSync was not sent", event);
}
- public void testRejectMessage()
- {
- AMQSession_0_10 session = createAMQSession_0_10();
- session.rejectMessage(1l, true);
- ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
- assertNotNull("MessageRelease event was not sent", event);
- }
-
- public void testReleaseForRollback()
- {
- AMQSession_0_10 session = createAMQSession_0_10();
- try
- {
- session.releaseForRollback();
- }
- catch (Exception e)
- {
- fail("Unexpected exception is cought:" + e.getMessage());
- }
- ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
- assertNotNull("MessageRelease event was not sent", event);
- }
-
public void testSendQueueDelete()
{
AMQSession_0_10 session = createAMQSession_0_10();