summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-09-08 15:37:20 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-09-08 15:37:20 +0000
commit7f5b8221bb212a12d588785c1881dabd3538ccdf (patch)
tree0b5aaf6512994b1fa5107fd834d1f2b621cd7057 /qpid/java/systests/src
parent76b0ca0a375198db3bd5f73687c0b24f713b1143 (diff)
downloadqpid-python-7f5b8221bb212a12d588785c1881dabd3538ccdf.tar.gz
QPID-6088 : [Java Client] AMQP 0-8/8/9-1 prefetch should auto expand when receive is called in a situation where the prefetch buffer is full
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1623422 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java74
1 files changed, 66 insertions, 8 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
index 69441d2be6..0b08a6d93e 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.client.prefetch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -35,9 +33,12 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class PrefetchBehaviourTest extends QpidBrokerTestCase
@@ -231,5 +232,62 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase
assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
}
}
+
+ public void testPrefetchWindowExpandsOnReceiveTransaction() throws Exception
+ {
+
+ _normalConnection.start();
+
+ //create a second connection with prefetch set to 1
+
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
+ Connection prefetch1Connection = getConnection();
+ Session consumerSession = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
+
+
+ Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = producerSession.createQueue(getTestQueueName());
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int i = 0; i < 5; i++)
+ {
+ producer.send(producerSession.createTextMessage("test"));
+ }
+ producerSession.commit();
+
+
+ prefetch1Connection.start();
+
+
+
+ Message message = consumer.receive(1000l);
+ assertNotNull(message);
+ message = consumer.receive(1000l);
+ assertNotNull(message);
+ message = consumer.receive(1000l);
+ assertNotNull(message);
+
+
+ Connection secondConsumerConnection = getConnection();
+ Session secondConsumerSession = secondConsumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer secondConsumer = secondConsumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
+ secondConsumerConnection.start();
+
+ message = secondConsumer.receive(1000l);
+ assertNotNull(message);
+
+ message = secondConsumer.receive(1000l);
+ assertNotNull(message);
+
+ consumerSession.commit();
+ secondConsumerSession.commit();
+
+ message = consumer.receive(1000l);
+ assertNull(message);
+
+ }
+
+
}