diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-09-08 15:37:20 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-09-08 15:37:20 +0000 |
| commit | 7f5b8221bb212a12d588785c1881dabd3538ccdf (patch) | |
| tree | 0b5aaf6512994b1fa5107fd834d1f2b621cd7057 /qpid/java/systests/src | |
| parent | 76b0ca0a375198db3bd5f73687c0b24f713b1143 (diff) | |
| download | qpid-python-7f5b8221bb212a12d588785c1881dabd3538ccdf.tar.gz | |
QPID-6088 : [Java Client] AMQP 0-8/8/9-1 prefetch should auto expand when receive is called in a situation where the prefetch buffer is full
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1623422 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
| -rw-r--r-- | qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java | 74 |
1 files changed, 66 insertions, 8 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java index 69441d2be6..0b08a6d93e 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java @@ -20,11 +20,9 @@ */ package org.apache.qpid.client.prefetch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.Destination; @@ -35,9 +33,12 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class PrefetchBehaviourTest extends QpidBrokerTestCase @@ -231,5 +232,62 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase assertNotNull("The second consumer should get 9 messages, but received only " + i,m); } } + + public void testPrefetchWindowExpandsOnReceiveTransaction() throws Exception + { + + _normalConnection.start(); + + //create a second connection with prefetch set to 1 + + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection prefetch1Connection = getConnection(); + Session consumerSession = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getTestQueueName())); + + + Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = producerSession.createQueue(getTestQueueName()); + MessageProducer producer = producerSession.createProducer(queue); + + for (int i = 0; i < 5; i++) + { + producer.send(producerSession.createTextMessage("test")); + } + producerSession.commit(); + + + prefetch1Connection.start(); + + + + Message message = consumer.receive(1000l); + assertNotNull(message); + message = consumer.receive(1000l); + assertNotNull(message); + message = consumer.receive(1000l); + assertNotNull(message); + + + Connection secondConsumerConnection = getConnection(); + Session secondConsumerSession = secondConsumerConnection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer secondConsumer = secondConsumerSession.createConsumer(consumerSession.createQueue(getTestQueueName())); + secondConsumerConnection.start(); + + message = secondConsumer.receive(1000l); + assertNotNull(message); + + message = secondConsumer.receive(1000l); + assertNotNull(message); + + consumerSession.commit(); + secondConsumerSession.commit(); + + message = consumer.receive(1000l); + assertNull(message); + + } + + } |
