summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-01-24 15:43:23 +0000
committerAidan Skinner <aidan@apache.org>2008-01-24 15:43:23 +0000
commit02bf06d891b9baab1d03080a9332138bdf2ea0b9 (patch)
tree33016507be7862396047a03e65c8817e69158dc5 /java/systests/src
parent0a0e1e1417c83789954e9cc19787297009b8803e (diff)
downloadqpid-python-02bf06d891b9baab1d03080a9332138bdf2ea0b9.tar.gz
Merged revisions 598285,598619,598721,598834-598835,599375,599531,599533,599572,599805,602134,604151,604928,605536,605542,606015-606016 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1 ........ r598285 | ritchiem | 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) | 3 lines QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564 ........ r598619 | ritchiem | 2007-11-27 12:51:14 +0000 (Tue, 27 Nov 2007) | 1 line Renamed POMs to M2.1.1 Removed erroneous equals() in SpecificMethodFrameListenerTest ........ r598721 | ritchiem | 2007-11-27 18:09:33 +0000 (Tue, 27 Nov 2007) | 1 line QPID-621 : Patch Supplied by Aidan Skinner. Msg Ack after msg consumer is closed. ........ r598834 | ritchiem | 2007-11-28 00:45:32 +0000 (Wed, 28 Nov 2007) | 14 lines QPID-679 : Patch provided by Aidan Skinner and additional from odd problems during test runs. AMQChannel - Catch and log AMQException occuring when requeue()-ing. Previously exceptions wouldn't be caught at all. The requeue() is called during closure so there is nothing we can do protocol wise on error other than log the issue and continue with any other shutdown that is needed. AMQMinaProtocolSession & AMQPFastProtocolHandler . Additions to catch and log AMQExceptions. Changes to AMQMinaProtocolSession were done to ignore all input on a closing session other than the close-ok. Previously only Protocol frames were ignored this resulted in Content*Body-s still being processed. Additional checks were made for the MessageStoreClosedException to log and continue. As said else were we need to seperate protocol exceptoions(AMQException) from internal code exception handling. Further All AMQExceptions occuring in the frameReceived method are now caught and logged. Allowing them to propogate higher will only result in thread death. AMQPFastProtocolHandler Caught AMQExceptions occuring whilst closing the session. Again allowing these to continue will result in thread death. There is not a lot that can be done other than log the problem as the session is already closed by this point. Prevented the stacktrace associated with a session exception being printed in the exceptionCaught method when the problem was an IO Exception. This doesn't add anything useful and only adds to the log file sizes. ApplicationRegistry - Added removeAll option which ensures that all ARs are correctly purged so that we can attempt to clean up between Unit Tests. MemoryMessageStore - This was causing us real problems during the failover testing. Similar checks should probably be made to any other Message Store Impl. The issue was that when shutting down the broker the MS.close() method is called this sets all the storage to null. However, there may still be message processing going on as the close() does not attempt to stop connection processing. Hence we now check to see if the Store is close throwing a MSClosedException if required. This prevents NPEs that have been seen during Unit failover testing. In fact the close() is called as a request to shutdown the ApplicationRegistry, but this only occurs from tests and broker shutdown, no attempt to unbind or prevent further connections during this period is yet done. CLIENT CHANGES AMQConnection - Added method to check if failover is in progress. AMQClient - Upgraded acknowledge() exception to JMSException for errors due to failover. Also , added call to update consumers as a result of failover. BasicMessageConsumer - Changes to acquireReceiving to take in to consideration blocking for failover to occur. wrt receiveNoWait.. which previously blocked for failover to complete... not exactly noWait. acknowledge will now TransportConnection - Update to ensure all inVM brokers are correctly killed. FailoverTest - QPID-679 - Finder of all the above problems. ........ r598835 | ritchiem | 2007-11-28 01:01:05 +0000 (Wed, 28 Nov 2007) | 1 line CommitRollbackTest - this one just was never right.. now we have something better. ........ r599375 | ritchiem | 2007-11-29 10:58:08 +0000 (Thu, 29 Nov 2007) | 1 line Update to broker to address fanout python failure. ........ r599531 | ritchiem | 2007-11-29 17:56:12 +0000 (Thu, 29 Nov 2007) | 1 line QPID-92 QPID-564 Forgot to upgrade mina to 1.0.1 ........ r599533 | ritchiem | 2007-11-29 18:25:21 +0000 (Thu, 29 Nov 2007) | 1 line QPID-564 QPID-92 Tidied up a few points and fixed infinite loop in Read IO Thread ........ r599572 | ritchiem | 2007-11-29 20:56:22 +0000 (Thu, 29 Nov 2007) | 2 lines Mina Fix: Vm Pipe Starts Connection session before acceptor session. This results in protocol frames arriving before the protocol decoder has been configured on the InVM Broker. Verification of this could be done by adding a client side filter that delays the first message by a few seconds. ........ r599805 | ritchiem | 2007-11-30 12:47:08 +0000 (Fri, 30 Nov 2007) | 1 line Added new simple Request/Repsonse code as my last commit here seems to have missed the actual code. ........ r602134 | rupertlssmith | 2007-12-07 16:00:14 +0000 (Fri, 07 Dec 2007) | 1 line Added JDNI config for two broker, failover setup for failover tests. Also passed into FT tests config. ........ r604151 | ritchiem | 2007-12-14 10:40:37 +0000 (Fri, 14 Dec 2007) | 2 lines QPID-707 : Added new test to check message count on broker as messages are consumed to ensure that an ack is sent at 5000 mgs. Added acks on message consumer closure. Augmented VMTestCase to have helper methods for accessing broker statistics. ........ r604928 | rupertlssmith | 2007-12-17 17:00:10 +0000 (Mon, 17 Dec 2007) | 1 line DUPS_OK mode set to be same as AUTO_ACK, fixed broken dups ok test. ........ r605536 | rupertlssmith | 2007-12-19 13:40:05 +0000 (Wed, 19 Dec 2007) | 1 line Messages were being sent mandatory by default, set to false. ........ r605542 | rupertlssmith | 2007-12-19 13:53:44 +0000 (Wed, 19 Dec 2007) | 1 line Changed test configs to use colons instead of commas. ........ r606015 | rgodfrey | 2007-12-20 20:08:01 +0000 (Thu, 20 Dec 2007) | 2 lines QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers Ack each individual message on commit, not use multiple acks ........ r606016 | rgodfrey | 2007-12-20 20:12:25 +0000 (Thu, 20 Dec 2007) | 2 lines QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers Ack each individual message on commit, not use multiple acks ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@614906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java21
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java140
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java222
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java151
4 files changed, 521 insertions, 13 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
index 9629f87d46..624d9c9f3d 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
@@ -20,27 +20,16 @@
*/
package org.apache.qpid.test;
-import junit.extensions.TestSetup;
-
-import junit.framework.Test;
import junit.framework.TestCase;
-
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
import javax.naming.Context;
import javax.naming.spi.InitialContextFactory;
-
import java.util.HashMap;
import java.util.Hashtable;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
public class VMTestCase extends TestCase
@@ -89,7 +78,7 @@ public class VMTestCase extends TestCase
}
env.put("connectionfactory.connection", "amqp://guest:guest@" + _clientID + _virtualhost + "?brokerlist='"
- + _brokerlist + "'");
+ + _brokerlist + "'");
for (Map.Entry<String, String> c : _connections.entrySet())
{
@@ -121,6 +110,12 @@ public class VMTestCase extends TestCase
super.tearDown();
}
+ public int getMessageCount(String queueName)
+ {
+ return ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(_virtualhost.substring(1))
+ .getQueueRegistry().getQueue(new AMQShortString(queueName)).getMessageCount();
+ }
+
public void testDummyinVMTestCase()
{
// keep maven happy
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
new file mode 100644
index 0000000000..037c8285bc
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
@@ -0,0 +1,140 @@
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+import java.util.concurrent.CountDownLatch;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class DupsOkTest extends VMTestCase
+{
+
+ private Queue _queue;
+ private static final int MSG_COUNT = 9999;
+ private CountDownLatch _awaitCompletion = new CountDownLatch(1);
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queue = (Queue) _context.lookup("queue");
+
+ //CreateQueue
+ ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
+ //Create Producer put some messages on the queue
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ for (int count = 1; count <= MSG_COUNT; count++)
+ {
+ Message msg = producerSession.createTextMessage("Message " + count);
+ msg.setIntProperty("count", count);
+ producer.send(msg);
+ }
+
+ producerConnection.close();
+ }
+
+ public void testDupsOK() throws NamingException, JMSException, InterruptedException
+ {
+ //Create Client
+ Connection clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ clientConnection.start();
+
+ Session clientSession = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ MessageConsumer consumer = clientSession.createConsumer(_queue);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ if (message == null)
+ {
+ fail("Should not get null messages");
+ }
+
+ if (message instanceof TextMessage)
+ {
+ try
+ {
+ /*if (message.getIntProperty("count") == 5000)
+ {
+ assertEquals("The queue should have 4999 msgs left", 4999, getMessageCount(_queue.getQueueName()));
+ }*/
+
+ if (message.getIntProperty("count") == 9999)
+ {
+ assertEquals("The queue should have 0 msgs left", 0, getMessageCount(_queue.getQueueName()));
+
+ //This is the last message so release test.
+ _awaitCompletion.countDown();
+ }
+
+ }
+ catch (JMSException e)
+ {
+ fail("Unable to get int property 'count'");
+ }
+ }
+ else
+ {
+ fail("");
+ }
+ }
+ });
+
+ try
+ {
+ _awaitCompletion.await();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Unable to wait for test completion");
+ throw e;
+ }
+
+// consumer.close();
+
+ clientConnection.close();
+
+ assertEquals("The queue should have 0 msgs left", 0, getMessageCount(_queue.getQueueName()));
+ }
+
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
new file mode 100644
index 0000000000..fffe073362
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -0,0 +1,222 @@
+package org.apache.qpid.test.client.failover;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+
+public class FailoverTest extends TestCase implements ConnectionListener
+{
+ private static final Logger _logger = Logger.getLogger(FailoverTest.class);
+
+ private static final int NUM_BROKERS = 2;
+ private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'";
+ private static final String QUEUE = "queue";
+ private static final int NUM_MESSAGES = 10;
+ private Connection con;
+ private AMQConnectionFactory conFactory;
+ private Session prodSess;
+ private AMQQueue q;
+ private MessageProducer prod;
+ private Session conSess;
+ private MessageConsumer consumer;
+
+ private static int usedBrokers = 0;
+ private CountDownLatch failoverComplete;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ // Create two VM brokers
+
+ for (int i = 0; i < NUM_BROKERS; i++)
+ {
+ usedBrokers++;
+
+ TransportConnection.createVMBroker(usedBrokers);
+ }
+ //undo last addition
+
+ conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers));
+ _logger.info("Connecting on:" + conFactory.getConnectionURL());
+ con = conFactory.createConnection();
+ ((AMQConnection) con).setConnectionListener(this);
+ con.start();
+ failoverComplete = new CountDownLatch(1);
+ }
+
+ private void init(boolean transacted, int mode) throws JMSException
+ {
+ prodSess = con.createSession(transacted, mode);
+ q = new AMQQueue("amq.direct", QUEUE);
+ prod = prodSess.createProducer(q);
+ conSess = con.createSession(transacted, mode);
+ consumer = conSess.createConsumer(q);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ con.close();
+ }
+ catch (Exception e)
+ {
+
+ }
+
+ try
+ {
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.removeAll();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to clean up");
+ }
+ super.tearDown();
+ }
+
+ private void consumeMessages(int toConsume) throws JMSException
+ {
+ Message msg;
+ for (int i = 0; i < toConsume; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull("Message " + i + " was null!", msg);
+ assertEquals("message " + i, ((TextMessage) msg).getText());
+ }
+ }
+
+ private void sendMessages(int totalMessages) throws JMSException
+ {
+ for (int i = 0; i < totalMessages; i++)
+ {
+ prod.send(prodSess.createTextMessage("message " + i));
+ }
+
+// try
+// {
+// Thread.sleep(100 * totalMessages);
+// }
+// catch (InterruptedException e)
+// {
+// //evil ignoring of IE
+// }
+ }
+
+ public void testP2PFailover() throws Exception
+ {
+ testP2PFailover(NUM_MESSAGES, true);
+ }
+
+ public void testP2PFailoverWithMessagesLeft() throws Exception
+ {
+ testP2PFailover(NUM_MESSAGES, false);
+ }
+
+ private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException
+ {
+ Message msg = null;
+ init(false, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(totalMessages);
+
+ // Consume some messages
+ int toConsume = totalMessages;
+ if (!consumeAll)
+ {
+ toConsume = totalMessages / 2;
+ }
+
+ consumeMessages(toConsume);
+
+ _logger.info("Failing over");
+
+ causeFailure();
+
+ msg = consumer.receive(500);
+ //todo: reinstate
+ assertNull("Should not have received message from new broker!", msg);
+ // Check that messages still sent / received
+ sendMessages(totalMessages);
+ consumeMessages(totalMessages);
+ }
+
+ private void causeFailure()
+ {
+ _logger.info("Failover");
+
+ TransportConnection.killVMBroker(usedBrokers - 1);
+ ApplicationRegistry.remove(usedBrokers - 1);
+
+ _logger.info("Awaiting Failover completion");
+ try
+ {
+ failoverComplete.await();
+ }
+ catch (InterruptedException e)
+ {
+ //evil ignore IE.
+ }
+ }
+
+ public void testClientAckFailover() throws Exception
+ {
+ init(false, Session.CLIENT_ACKNOWLEDGE);
+ sendMessages(1);
+ Message msg = consumer.receive();
+ assertNotNull("Expected msgs not received", msg);
+
+
+ causeFailure();
+
+ Exception failure = null;
+ try
+ {
+ msg.acknowledge();
+ }
+ catch (Exception e)
+ {
+ failure = e;
+ }
+ assertNotNull("Exception should be thrown", failure);
+ }
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ failoverComplete.countDown();
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
new file mode 100644
index 0000000000..f83e6e51cb
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
@@ -0,0 +1,151 @@
+package org.apache.qpid.test.unit.ack;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.test.VMTestCase;
+
+public class AcknowledgeTest extends VMTestCase
+{
+ private static final int NUM_MESSAGES = 50;
+ private Connection _con;
+ private Queue _queue;
+ private MessageProducer _producer;
+ private Session _producerSession;
+ private Session _consumerSession;
+ private MessageConsumer _consumerA;
+ private MessageConsumer _consumerB;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _queue = (Queue) _context.lookup("queue");
+
+ //CreateQueue
+ ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
+ //Create Producer put some messages on the queue
+ _con = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+ _con.start();
+ }
+
+ private void init(boolean transacted, int mode) throws JMSException {
+ _producerSession = _con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerSession = _con.createSession(transacted, mode);
+ _producer = _producerSession.createProducer(_queue);
+ _consumerA = _consumerSession.createConsumer(_queue);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ try
+ {
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.removeAll();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to clean up");
+ }
+
+ }
+
+ private void consumeMessages(int toConsume, MessageConsumer consumer) throws JMSException
+ {
+ Message msg;
+ for (int i = 0; i < toConsume; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull("Message " + i + " was null!", msg);
+ assertEquals("message " + i, ((TextMessage) msg).getText());
+ }
+ }
+
+ private void sendMessages(int totalMessages) throws JMSException
+ {
+ for (int i = 0; i < totalMessages; i++)
+ {
+ _producer.send(_producerSession.createTextMessage("message " + i));
+ }
+ }
+
+ private void testMessageAck(boolean transacted, int mode) throws Exception
+ {
+ init(transacted, mode);
+ sendMessages(NUM_MESSAGES/2);
+ Thread.sleep(1500);
+ _consumerB = _consumerSession.createConsumer(_queue);
+ sendMessages(NUM_MESSAGES/2);
+ int count = 0;
+ Message msg = _consumerB.receive(100);
+ while (msg != null)
+ {
+ if (mode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ msg.acknowledge();
+ }
+ count++;
+ msg = _consumerB.receive(1500);
+ }
+ if (transacted)
+ {
+ _consumerSession.commit();
+ }
+ _consumerA.close();
+ _consumerB.close();
+ _consumerSession.close();
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, getMessageCount(_queue.getQueueName()));
+ }
+
+ public void test2ConsumersAutoAck() throws Exception
+ {
+ testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersClientAck() throws Exception
+ {
+ testMessageAck(true, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersTx() throws Exception
+ {
+ testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+ }
+
+}