From 2fd65c97562f0947cbd649b1f2874dbe9f39653b Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Thu, 17 Feb 2011 14:34:10 +0000 Subject: QPID-3047: Fix QueueDepthWithSelectorTest on 0-10 Refactor test and fix 0-10 client session to flush acks git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1071620 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_10.java | 1 + .../server/queue/QueueDepthWithSelectorTest.java | 141 +++++---------------- 2 files changed, 31 insertions(+), 111 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1eaccf53fc..6fa22b7971 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -942,6 +942,7 @@ public class AMQSession_0_10 extends AMQSession env = new Hashtable(); - - env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'"); - env.put("queue.queue", QUEUE); - - _context = factory.getInitialContext(env); _messages = new Message[MSG_COUNT]; - _queue = (Queue) _context.lookup("queue"); - init(); - } - - @Override - public void tearDown() throws Exception - { - if (_producerConnection != null) - { - _producerConnection.close(); - } - - if (_clientConnection != null) - { - _clientConnection.close(); - } + _queue = getTestQueue(); + + //Create Producer + _producerConnection = getConnection(); + _producerConnection.start(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _producer = _producerSession.createProducer(_queue); - TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); - super.tearDown(); + // Create consumer + _clientConnection = getConnection(); + _clientConnection.start(); + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumer = _clientSession.createConsumer(_queue, "key = 23"); } public void test() throws Exception @@ -139,7 +94,8 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase //Verify we get all the messages. _logger.info("Verifying messages"); - verifyAllMessagesRecevied(0); + verifyAllMessagesRecevied(50); + verifyBrokerState(0); //Close the connection.. .giving the broker time to clean up its state. _clientConnection.close(); @@ -149,39 +105,18 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase verifyBrokerState(0); } - protected void init() throws NamingException, JMSException, AMQException - { - //Create Producer - _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); - _producerConnection.start(); - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _producer = _producerSession.createProducer(_queue); - - // Create consumer - _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); - _clientConnection.start(); - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _consumer = _clientSession.createConsumer(_queue, "key = 23"); - } - protected void verifyBrokerState(int expectedDepth) { try { - _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); - - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - catch (Exception e) - { - fail(e.getMessage()); - } + Connection connection = getConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try - { Thread.sleep(2000); - long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue); + long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue); assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); + + connection.close(); } catch (InterruptedException e) { @@ -191,34 +126,22 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase { fail(e.getMessage()); } - finally + catch (Exception e) { - try - { - _clientConnection.close(); - } - catch (JMSException e) - { - fail(e.getMessage()); - } + fail(e.getMessage()); } - } protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception { - boolean[] msgIdRecevied = new boolean[MSG_COUNT]; - for (int i = 0; i < MSG_COUNT; i++) + for (int i = 0; i < expectedDepth; i++) { _messages[i] = _consumer.receive(1000); assertNotNull("should have received a message but didn't", _messages[i]); } - - long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue); - assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); - + //Check received messages int msgId = 0; for (Message msg : _messages) @@ -231,7 +154,7 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase } //Check all received - for (msgId = 0; msgId < MSG_COUNT; msgId++) + for (msgId = 0; msgId < expectedDepth; msgId++) { assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]); } @@ -241,9 +164,6 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase * Get the next message putting the given count into the intProperties as ID. * * @param msgNo the message count to store as ID. - * - * @return - * * @throws JMSException */ protected Message nextMessage(int msgNo) throws JMSException @@ -253,5 +173,4 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase send.setIntProperty("key", 23); return send; } - } -- cgit v1.2.1