summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-11-06 11:12:37 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-11-06 11:12:37 +0000
commitf0919411217014a00c7d8359802ec17bbaf99d66 (patch)
treecf6b08c36d87041f3532c63431e4aefd82597387 /java/systests/src
parent95f5d827b48e5ac49a5ded1bc9c6482de3ebbe42 (diff)
downloadqpid-python-f0919411217014a00c7d8359802ec17bbaf99d66.tar.gz
QPID-662 Transactional state not correctly reported after fail over. We now record if we have sent any messages
from here we can check if we have failed over and so have lost messages from the transaction making it invalid. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@592374 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java253
1 files changed, 237 insertions, 16 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
index 2957dda869..75b6fbaedd 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
@@ -22,23 +22,26 @@
package org.apache.qpid.server.txn;
import junit.framework.TestCase;
-import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSessionDirtyException;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
-import javax.jms.ConnectionFactory;
-import javax.jms.Connection;
-import javax.jms.Message;
+import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.MessageListener;
-import javax.naming.spi.InitialContextFactory;
+import javax.jms.TransactionRolledBackException;
import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
@@ -49,7 +52,8 @@ public class TxnTest extends TestCase implements MessageListener
private static final Logger _logger = Logger.getLogger(TxnTest.class);
- protected final String BROKER = "vm://:1";//"localhost";
+ //Set retries quite high to ensure that it continues to retry whilst the InVM broker is restarted.
+ protected final String BROKER = "vm://:1?retries='1000'";
protected final String VHOST = "/test";
protected final String QUEUE = "TxnTestQueue";
@@ -75,7 +79,11 @@ public class TxnTest extends TestCase implements MessageListener
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
+
+ // Ensure that the queue is unique for each test run.
+ // There appears to be other old sesssion/consumers when looping the tests this means that sometimes a message
+ // will disappear. When it has actually gone to the old client.
+ env.put("queue.queue", QUEUE + "-" + System.currentTimeMillis());
_context = factory.getInitialContext(env);
@@ -109,7 +117,7 @@ public class TxnTest extends TestCase implements MessageListener
{
_producerConnection.close();
}
-
+
super.tearDown();
if (BROKER.startsWith("vm://"))
@@ -124,10 +132,8 @@ public class TxnTest extends TestCase implements MessageListener
_consumer.setMessageListener(this);
_clientConnection.start();
- //Set TTL
_producer.send(_producerSession.createTextMessage("TxtTestML"));
-
try
{
//Wait for message to arrive
@@ -150,7 +156,6 @@ public class TxnTest extends TestCase implements MessageListener
public void onMessage(Message message)
{
-
try
{
assertEquals("Incorrect Message Received.", "TxtTestML", ((TextMessage) message).getText());
@@ -170,19 +175,235 @@ public class TxnTest extends TestCase implements MessageListener
{
_clientConnection.start();
- //Set TTL
_producer.send(_producerSession.createTextMessage("TxtTestReceive"));
//Receive Message
Message received = _consumer.receive(1000);
+ _clientSession.commit();
+
assertEquals("Incorrect Message Received.", "TxtTestReceive", ((TextMessage) received).getText());
- //Receive Message
+ //Receive Message
received = _consumer.receive(1000);
assertNull("More messages received", received);
_consumer.close();
}
+
+ /**
+ * Test that after the connection has failed over that a sent message is still correctly receieved.
+ * Using Auto-Ack consumer.
+ *
+ * @throws JMSException
+ */
+ public void testReceiveAfterFailover() throws JMSException
+ {
+// System.err.println("testReceiveAfterFailover");
+ _clientConnection.close();
+
+ MessageConsumer consumer = _producerSession.createConsumer(_queue);
+
+ failServer();
+
+// System.err.println("Server restarted");
+
+ String MESSAGE_TXT = "TxtTestReceiveAfterFailoverTX";
+
+// System.err.println("Prod Session:" + _producerSession + ":" + ((AMQSession) _producerSession).isClosed());
+
+ Message sent = _producerSession.createTextMessage(MESSAGE_TXT);
+// System.err.println("Created message");
+
+ _producer.send(sent);
+// System.err.println("Sent message");
+
+ //Verify correct message received
+ Message received = consumer.receive(10000);
+// System.err.println("Message Receieved:" + received);
+
+ assertNotNull("Message should be received.", received);
+ assertEquals("Incorrect Message Received.", MESSAGE_TXT, ((TextMessage) received).getText());
+
+ //Check no more messages are received
+ received = consumer.receive(1000);
+ System.err.println("Second receive completed.");
+
+ assertNull("More messages received", received);
+
+ _producer.close();
+// System.err.println("Close producer");
+
+ consumer.close();
+// System.err.println("Close consumer");
+
+ _producerConnection.close();
+ }
+
+ /**
+ * Test that after the connection has failed over the dirty transaction is notified when calling commit
+ *
+ * @throws JMSException
+ */
+ public void testSendBeforeFailoverThenCommitTx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenCommitTx");
+ _clientConnection.start();
+
+ //Create a transacted producer.
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "testSendBeforeFailoverThenCommitTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ //Attempt to commit session
+ try
+ {
+ _clientSession.commit();
+ fail("TransactionRolledBackException not thrown");
+ }
+ catch (JMSException jmse)
+ {
+ if (!(jmse instanceof TransactionRolledBackException))
+ {
+ fail(jmse.toString());
+ }
+ }
+
+ //Close consumer & producer
+ _consumer.close();
+ txProducer.close();
+ }
+
+ /**
+ * Test that after the connection has failed over the dirty transaction is fast failed by throwing an
+ * Exception on the next send.
+ *
+ * @throws JMSException
+ */
+ public void testSendBeforeFailoverThenSendTx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenSendTx");
+
+ _clientConnection.start();
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ //Attempt to send another message on the session, here we should fast fail.
+ try
+ {
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ fail("JMSException not thrown");
+ }
+ catch (JMSException jmse)
+ {
+ if (!(jmse.getLinkedException() instanceof AMQSessionDirtyException))
+ {
+ fail(jmse.toString());
+ }
+ }
+
+
+ _consumer.close();
+ }
+
+ public void testSendBeforeFailoverThenSend2Tx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenSendTx");
+
+ _clientConnection.start();
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!", received);
+
+ _clientSession.rollback();
+
+ //Attempt to send another message on the session, here we should fast fail.
+ try
+ {
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ }
+ catch (JMSException jmse)
+ {
+ if (jmse.getLinkedException() instanceof AMQSessionDirtyException)
+ {
+ fail(jmse.toString());
+ }
+ }
+
+
+ _consumer.close();
+ }
+
+
+ private void failServer()
+ {
+ if (BROKER.startsWith("vm://"))
+ {
+ //Work around for MessageStore not being initialise and the send not fully completing before the failover occurs.
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.remove(1);
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ _logger.error("Unable to restart broker due to :" + e);
+ }
+
+ //Work around for receive not being failover aware.. because it is the first receive it trys to
+ // unsuspend the channel but in this case the ChannelFlow command goes on the old session and the response on the
+ // new one ... though I thought the statemanager recorded the listeners so should be ok.???
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ }
+
+ }
+
}