diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-11-17 18:42:25 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-11-17 18:42:25 +0000 |
| commit | e19279e320db1b32e2e3a8a4602fe1844a3ad082 (patch) | |
| tree | 3bbab64cde0042d9f753f1e4c0aa13d710270684 | |
| parent | 7d07cd053fe2dcf8923774fed40db54bec18cc7c (diff) | |
| download | qpid-python-e19279e320db1b32e2e3a8a4602fe1844a3ad082.tar.gz | |
QPID-2703: 0-10 Transaction rollback/recover does not restore consumer credit
Defect in Java client. 0-10 requires that commands are completed, but the Java client was failing to complete those commands
corresponding to messages that were being rolled-back/recovered. Work by Robbie Gemmell and myself.
merged from trunk r1203139
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1203317 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 17 insertions, 30 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 826ca46cca..bc97d822cb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -48,7 +48,6 @@ import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -140,9 +139,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private int unackedCount = 0; /** - * USed to store the range of in tx messages + * Used to store the range of in tx messages */ - private RangeSet _txRangeSet = new RangeSet(); + private final RangeSet _txRangeSet = new RangeSet(); private int _txSize = 0; //--- constructors @@ -456,6 +455,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { // release all unacked messages RangeSet ranges = gatherUnackedRangeSet(); + flushProcessed(ranges, false); getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. sync(); @@ -478,12 +478,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return ranges; } - public void releaseForRollback() { - getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); - _txRangeSet.clear(); - _txSize = 0; + if (_txSize > 0) + { + flushProcessed(_txRangeSet, false); + getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); + _txRangeSet.clear(); + _txSize = 0; + } } /** @@ -497,6 +500,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // The value of requeue is always true RangeSet ranges = new RangeSet(); ranges.add((int) deliveryTag); + flushProcessed(ranges, false); getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); //I don't think we need to sync } @@ -1404,6 +1408,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // messages sent by the brokers following the first rollback // after failover _highestDeliveryTag.set(-1); + // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to + //messages that came from the old broker. + _txRangeSet.clear(); + _txSize = 0; + _unacknowledgedMessageTags.clear(); super.resubscribe(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 7c8ccf4cf9..20892b8e95 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -498,6 +498,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } + _0_10session.flushProcessed(ranges, false); _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); clearReceiveQueue(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 849827216c..84b67f0919 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -288,29 +288,6 @@ public class AMQSession_0_10Test extends TestCase assertNotNull("ExecutionSync was not sent", event); } - public void testRejectMessage() - { - AMQSession_0_10 session = createAMQSession_0_10(); - session.rejectMessage(1l, true); - ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false); - assertNotNull("MessageRelease event was not sent", event); - } - - public void testReleaseForRollback() - { - AMQSession_0_10 session = createAMQSession_0_10(); - try - { - session.releaseForRollback(); - } - catch (Exception e) - { - fail("Unexpected exception is cought:" + e.getMessage()); - } - ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false); - assertNotNull("MessageRelease event was not sent", event); - } - public void testSendQueueDelete() { AMQSession_0_10 session = createAMQSession_0_10(); |
