summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-28 09:26:58 +0000
committerKeith Wall <kwall@apache.org>2011-11-28 09:26:58 +0000
commit24fb3dba2d14bb149c2829d7154b7cce782903df (patch)
tree92259fd5904017cba1e8f047c33752f2718df2ca /java/client/src
parent4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd (diff)
downloadqpid-python-24fb3dba2d14bb149c2829d7154b7cce782903df.tar.gz
QPID-3642, QPID-3640: Add Dead Letter Queue functionality for 0-10 path. Also, it fixes issue with setting of redelivered flag for pre-fetched messages as DLQ functionality relies on this flag being set correctly.
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207031 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java2
2 files changed, 39 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 7bde470c8e..dde020a750 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,11 +27,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -402,6 +405,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ if (getTransacted())
+ {
+ releaseForRollback();
+ }
if (flushTask != null)
{
flushTask.cancel();
@@ -457,19 +464,33 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
- RangeSet ranges = gatherUnackedRangeSet();
- flushProcessed(ranges, false);
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ RangeSet all = new RangeSet();
+ RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+ RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+ {
+ Range range = deliveredIter.next();
+ all.add(range);
+ }
+ for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+ {
+ Range range = prefetchedIter.next();
+ all.add(range);
+ }
+ flushProcessed(all, false);
+ getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED);
+ getQpidSession().messageRelease(prefetched);
+
// We need to sync so that we get notify of an error.
sync();
}
- private RangeSet gatherUnackedRangeSet()
+ private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags)
{
RangeSet ranges = new RangeSet();
while (true)
{
- Long tag = _unacknowledgedMessageTags.poll();
+ Long tag = messageTags.poll();
if (tag == null)
{
break;
@@ -504,7 +525,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
RangeSet ranges = new RangeSet();
ranges.add((int) deliveryTag);
flushProcessed(ranges, false);
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ if (requeue)
+ {
+ getQpidSession().messageRelease(ranges);
+ }
+ else
+ {
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ }
//I don't think we need to sync
}
@@ -1321,11 +1349,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected void acknowledgeImpl()
{
- RangeSet range = gatherUnackedRangeSet();
+ RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags);
- if(range.size() > 0 )
+ if(ranges.size() > 0 )
{
- messageAcknowledge(range, true);
+ messageAcknowledge(ranges, true);
getQpidSession().sync();
}
}
@@ -1343,6 +1371,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_txRangeSet.clear();
_txSize = 0;
_unacknowledgedMessageTags.clear();
+ _prefetchedMessageTags.clear();
super.resubscribe();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 57434c9a1d..bb277887aa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -471,7 +471,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
_0_10session.flushProcessed(ranges, false);
- _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ _0_10session.getQpidSession().messageRelease(ranges);
clearReceiveQueue();
}
}