From a9bc6b0db0cce45ae9f8d8ea5419978da0222aad Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 11 Jan 2012 02:12:38 +0000 Subject: QPID-3604 The code now drains individual consumer queues as well as the dispatch queue (via syncDipatchQueue method) and releases both unacked and prefetched messages, while only the former being marked redelivered. Also all of these transfers are being marked as completed to ensure credits don't dry up. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1229857 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 2 +- .../org/apache/qpid/client/AMQSession_0_10.java | 42 +++++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2ae7a17af2..784b75af10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -371,7 +371,7 @@ public abstract class AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + _prefetchedMessageTags.addAll(tags); + } + } + + _usingDispatcherForCleanup = true; + syncDispatchQueue(); + _usingDispatcherForCleanup = false; + + RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + + prefetched.size()); + + for (Iterator deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + + for (Iterator prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + + flushProcessed(all, false); + getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + sync(); } else { -- cgit v1.2.1