diff options
author | Keith Wall <kwall@apache.org> | 2011-11-28 09:26:58 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2011-11-28 09:26:58 +0000 |
commit | 24fb3dba2d14bb149c2829d7154b7cce782903df (patch) | |
tree | 92259fd5904017cba1e8f047c33752f2718df2ca /java/client/src | |
parent | 4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd (diff) | |
download | qpid-python-24fb3dba2d14bb149c2829d7154b7cce782903df.tar.gz |
QPID-3642, QPID-3640: Add Dead Letter Queue functionality for 0-10 path. Also, it fixes issue with setting of redelivered flag for pre-fetched messages as DLQ functionality relies on this flag being set correctly.
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207031 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 47 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 2 |
2 files changed, 39 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 7bde470c8e..dde020a750 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,11 +27,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; import javax.jms.JMSException; @@ -402,6 +405,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public void sendClose(long timeout) throws AMQException, FailoverException { + if (getTransacted()) + { + releaseForRollback(); + } if (flushTask != null) { flushTask.cancel(); @@ -457,19 +464,33 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendRecover() throws AMQException, FailoverException { // release all unacked messages - RangeSet ranges = gatherUnackedRangeSet(); - flushProcessed(ranges, false); - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + RangeSet all = new RangeSet(); + RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + flushProcessed(all, false); + getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + // We need to sync so that we get notify of an error. sync(); } - private RangeSet gatherUnackedRangeSet() + private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags) { RangeSet ranges = new RangeSet(); while (true) { - Long tag = _unacknowledgedMessageTags.poll(); + Long tag = messageTags.poll(); if (tag == null) { break; @@ -504,7 +525,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic RangeSet ranges = new RangeSet(); ranges.add((int) deliveryTag); flushProcessed(ranges, false); - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + if (requeue) + { + getQpidSession().messageRelease(ranges); + } + else + { + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + } //I don't think we need to sync } @@ -1321,11 +1349,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic protected void acknowledgeImpl() { - RangeSet range = gatherUnackedRangeSet(); + RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags); - if(range.size() > 0 ) + if(ranges.size() > 0 ) { - messageAcknowledge(range, true); + messageAcknowledge(ranges, true); getQpidSession().sync(); } } @@ -1343,6 +1371,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _txRangeSet.clear(); _txSize = 0; _unacknowledgedMessageTags.clear(); + _prefetchedMessageTags.clear(); super.resubscribe(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 57434c9a1d..bb277887aa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -471,7 +471,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } _0_10session.flushProcessed(ranges, false); - _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + _0_10session.getQpidSession().messageRelease(ranges); clearReceiveQueue(); } } |