From 24fb3dba2d14bb149c2829d7154b7cce782903df Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Mon, 28 Nov 2011 09:26:58 +0000 Subject: QPID-3642, QPID-3640: Add Dead Letter Queue functionality for 0-10 path. Also, it fixes issue with setting of redelivered flag for pre-fetched messages as DLQ functionality relies on this flag being set correctly. Applied patch from Andrew MacBean and Oleksandr Rudyy git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207031 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_10.java | 47 +++++++++++++++++----- .../qpid/client/BasicMessageConsumer_0_10.java | 2 +- 2 files changed, 39 insertions(+), 10 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 7bde470c8e..dde020a750 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,11 +27,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; import javax.jms.JMSException; @@ -402,6 +405,10 @@ public class AMQSession_0_10 extends AMQSession deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + for (Iterator prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + flushProcessed(all, false); + getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + // We need to sync so that we get notify of an error. sync(); } - private RangeSet gatherUnackedRangeSet() + private RangeSet gatherRangeSet(ConcurrentLinkedQueue messageTags) { RangeSet ranges = new RangeSet(); while (true) { - Long tag = _unacknowledgedMessageTags.poll(); + Long tag = messageTags.poll(); if (tag == null) { break; @@ -504,7 +525,14 @@ public class AMQSession_0_10 extends AMQSession 0 ) + if(ranges.size() > 0 ) { - messageAcknowledge(range, true); + messageAcknowledge(ranges, true); getQpidSession().sync(); } } @@ -1343,6 +1371,7 @@ public class AMQSession_0_10 extends AMQSession