diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-01-11 02:12:38 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-01-11 02:12:38 +0000 |
commit | 921a1c732a4a73a4f10064f0293e342f2bc93973 (patch) | |
tree | 98f374ba0aa30ad92a2c1945c4871425102e3f40 /java/client | |
parent | 5502bf0c06855533b07cb7a88198d164ffd534ec (diff) | |
download | qpid-python-921a1c732a4a73a4f10064f0293e342f2bc93973.tar.gz |
QPID-3604 The code now drains individual consumer queues as well as the
dispatch queue (via syncDipatchQueue method) and releases both unacked
and prefetched messages, while only the former being marked redelivered.
Also all of these transfers are being marked as completed to ensure
credits don't dry up.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229857 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 2 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 42 |
2 files changed, 38 insertions, 6 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2ae7a17af2..784b75af10 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). */ - private volatile boolean _usingDispatcherForCleanup; + protected volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8dc36673dc..49b77dcc7b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -795,11 +795,43 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - for (BasicMessageConsumer consumer : _consumers.values()) - { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), - Option.UNRELIABLE); - } + synchronized (getMessageDeliveryLock()) + { + for (BasicMessageConsumer consumer : _consumers.values()) + { + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); + sync(); + List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + _prefetchedMessageTags.addAll(tags); + } + } + + _usingDispatcherForCleanup = true; + syncDispatchQueue(); + _usingDispatcherForCleanup = false; + + RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + + prefetched.size()); + + for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + + for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + + flushProcessed(all, false); + getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + sync(); } else { |