summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-01-11 15:24:44 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-01-11 15:24:44 +0000
commitb696d17832f9bf7fbd45147d864b52c2a11daa58 (patch)
tree650d27c5faedc235651882fcfa79a50e5d304464 /qpid/java/client
parentc747f2d757c044c059342f1c7c143b653b0ca4db (diff)
downloadqpid-python-b696d17832f9bf7fbd45147d864b52c2a11daa58.tar.gz
QPID-3604 Reverting the changes as it releases messages everytime the
channel is suspended. This results in several test failures. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1230088 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java43
2 files changed, 8 insertions, 38 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 784b75af10..48c4e3e3e6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
* to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
*/
- protected volatile boolean _usingDispatcherForCleanup;
+ private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -3570,3 +3570,4 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 49b77dcc7b..8395c8f4b7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (suspend)
{
- synchronized (getMessageDeliveryLock())
- {
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
- Option.UNRELIABLE);
- sync();
- List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
- _prefetchedMessageTags.addAll(tags);
- }
- }
-
- _usingDispatcherForCleanup = true;
- syncDispatchQueue();
- _usingDispatcherForCleanup = false;
-
- RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
- RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
- RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
- + prefetched.size());
-
- for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
- {
- Range range = deliveredIter.next();
- all.add(range);
- }
-
- for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
- {
- Range range = prefetchedIter.next();
- all.add(range);
- }
-
- flushProcessed(all, false);
- getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
- getQpidSession().messageRelease(prefetched);
- sync();
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
+ }
}
else
{
@@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().sync();
}
}
+