summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-01-11 02:12:38 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-01-11 02:12:38 +0000
commita9bc6b0db0cce45ae9f8d8ea5419978da0222aad (patch)
tree61a78c3dd688b43f39786417f9d7023bd76e80b1 /qpid/java
parent38fd3808e4001b1e7f46b5eeaa1c51394c83d46b (diff)
downloadqpid-python-a9bc6b0db0cce45ae9f8d8ea5419978da0222aad.tar.gz
QPID-3604 The code now drains individual consumer queues as well as the
dispatch queue (via syncDipatchQueue method) and releases both unacked and prefetched messages, while only the former being marked redelivered. Also all of these transfers are being marked as completed to ensure credits don't dry up. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1229857 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java42
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java39
3 files changed, 77 insertions, 6 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2ae7a17af2..784b75af10 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
* to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
*/
- private volatile boolean _usingDispatcherForCleanup;
+ protected volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 8dc36673dc..49b77dcc7b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -795,11 +795,43 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (suspend)
{
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
- Option.UNRELIABLE);
- }
+ synchronized (getMessageDeliveryLock())
+ {
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
+ sync();
+ List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+ _prefetchedMessageTags.addAll(tags);
+ }
+ }
+
+ _usingDispatcherForCleanup = true;
+ syncDispatchQueue();
+ _usingDispatcherForCleanup = false;
+
+ RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+ RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+ + prefetched.size());
+
+ for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+ {
+ Range range = deliveredIter.next();
+ all.add(range);
+ }
+
+ for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+ {
+ Range range = prefetchedIter.next();
+ all.add(range);
+ }
+
+ flushProcessed(all, false);
+ getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
+ getQpidSession().messageRelease(prefetched);
+ sync();
}
else
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
index d2950b095b..5c5ad66777 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
@@ -5,12 +5,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -133,4 +135,41 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase
assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get());
}
+ /**
+ * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
+ * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them.
+ * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue.
+ * Try to receive all 10 messages.
+ */
+ public void testConnectionStop() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
+ Connection con = getConnection();
+ con.start();
+ Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
+
+ MessageProducer prod = ssn.createProducer(queue);
+ for (int i=0; i<10;i++)
+ {
+ prod.send(ssn.createTextMessage("Msg" + i));
+ }
+
+ MessageConsumer consumer = ssn.createConsumer(queue);
+ // This is to ensure we get the first client to prefetch.
+ Message msg = consumer.receive(1000);
+ assertNotNull("The first consumer should get one message",msg);
+ con.stop();
+
+ Connection con2 = getConnection();
+ con2.start();
+ Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = ssn2.createConsumer(queue);
+ for (int i=0; i<9;i++)
+ {
+ TextMessage m = (TextMessage)consumer2.receive(1000);
+ assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
+ }
+ }
+
}