diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-07 08:27:31 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-07 08:27:31 +0000 |
| commit | 68a3db2e1f58d1bacffbe62885519fe0a123060f (patch) | |
| tree | 1c30e42b38f4a3b0f3ec433190359ae3a51adac8 /java/systests/src | |
| parent | 1871d98aa26c21d0a75419be1818e706ace08b65 (diff) | |
| download | qpid-python-68a3db2e1f58d1bacffbe62885519fe0a123060f.tar.gz | |
QPID-2848: refactored message consumer: pre-aquire, capacity decisions are moved into consumer, renamed field noConsume into browseOnly, cleaned up selector filter code.
Applied patch from Andrew MacBean <andymacbean@gmail.com>, Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1198642 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
2 files changed, 113 insertions, 10 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index d6caf05d33..4eb328f091 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -261,8 +261,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException { QueueBrowser queueBrowser = selector == null ? - _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue); -// _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector); + _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector); Enumeration[] msgs = new Enumeration[browserEnumerationCount]; int[] msgCount = new int[browserEnumerationCount]; @@ -347,7 +346,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException { - String selector = MESSAGE_ID_PROPERTY + " % " + clients; + String selector = MESSAGE_ID_PROPERTY + " % " + clients + " = 0" ; checkOverlappingMultipleGetEnum(totalMessages / clients, clients, selector); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 8c3c247e2b..b70b2f90e4 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.test.client.destination; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,11 +18,13 @@ package org.apache.qpid.test.client.destination; * under the License. * */ - +package org.apache.qpid.test.client.destination; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.Hashtable; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -34,6 +35,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; @@ -475,13 +477,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { prod.send(jmsSession.createTextMessage("msg" + i) ); } - - for (int i=0; i< 9; i++) + Message msg = null; + for (int i=0; i< 10; i++) { - cons.receive(); + msg = cons.receive(RECEIVE_TIMEOUT); + assertNotNull("Should have received " + i + " message", msg); + assertEquals("Unexpected message received", "msg" + i, ((TextMessage)msg).getText()); } - Message msg = cons.receive(RECEIVE_TIMEOUT); - assertNotNull("Should have received the 10th message",msg); assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT)); msg.acknowledge(); for (int i=11; i<16; i++) @@ -1182,4 +1184,106 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000)); cons.close(); } + + public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception + { + assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE); + } + + public void testQueueBrowserWithSelectorClientAcknowldgement() throws Exception + { + assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE); + } + + public void testQueueBrowserWithSelectorTransactedSession() throws Exception + { + assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED); + } + + public void testConsumerWithSelectorAutoAcknowledgement() throws Exception + { + assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE); + } + + public void testConsumerWithSelectorClientAcknowldgement() throws Exception + { + assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE); + } + + public void testConsumerWithSelectorTransactedSession() throws Exception + { + assertConsumerWithSelector(Session.SESSION_TRANSACTED); + } + + private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception + { + String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}"; + + boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; + Session session = _connection.createSession(transacted, acknowledgement); + + Queue queue = session.createQueue(queueAddress); + + final int numberOfMessages = 10; + List<Message> sentMessages = sendMessage(session, queue, numberOfMessages); + assertNotNull("Messages were not sent", sentMessages); + assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); + + QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0"); + _connection.start(); + + Enumeration<Message> enumaration = browser.getEnumeration(); + + int counter = 0; + int expectedIndex = 0; + while (enumaration.hasMoreElements()) + { + Message m = enumaration.nextElement(); + assertNotNull("Expected not null message at step " + counter, m); + int messageIndex = m.getIntProperty(INDEX); + assertEquals("Unexpected index", expectedIndex, messageIndex); + expectedIndex += 2; + counter++; + } + assertEquals("Unexpected number of messsages received", 5, counter); + } + + private void assertConsumerWithSelector(int acknowledgement) throws Exception + { + String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}"; + + boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; + Session session = _connection.createSession(transacted, acknowledgement); + + Queue queue = session.createQueue(queueAddress); + + final int numberOfMessages = 10; + List<Message> sentMessages = sendMessage(session, queue, numberOfMessages); + assertNotNull("Messages were not sent", sentMessages); + assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); + + MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0"); + + int expectedIndex = 0; + for (int i = 0; i < 5; i++) + { + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Expected not null message at step " + i, m); + int messageIndex = m.getIntProperty(INDEX); + assertEquals("Unexpected index", expectedIndex, messageIndex); + expectedIndex += 2; + + if (transacted) + { + session.commit(); + } + else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE) + { + m.acknowledge(); + } + } + + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNull("Unexpected message received", m); + } } |
