summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java14
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java276
2 files changed, 288 insertions, 2 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
new file mode 100644
index 0000000000..980fc7285d
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.client.failover;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest
+{
+ @Override
+ protected Destination createDestination(Session session) throws JMSException
+ {
+ return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}");
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index 66f8fe0546..a5b9c618bc 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.client.failover;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -33,12 +34,15 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
/**
@@ -96,6 +100,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
*/
private JMSException _exceptionListenerException;
+ /**
+ * Latch to check that failover mutex is hold by a failover thread
+ */
+ private CountDownLatch _failoverStarted;
+
@Override
protected void setUp() throws Exception
{
@@ -105,6 +114,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
_connection.setExceptionListener(this);
((AMQConnection) _connection).setConnectionListener(this);
_failoverComplete = new CountDownLatch(1);
+ _failoverStarted = new CountDownLatch(1);
}
/**
@@ -625,8 +635,134 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
+ public void testPublishAutoAcknowledgedWhileFailover() throws Exception
+ {
+ publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPublishClientAcknowledgedWhileFailover() throws Exception
+ {
+ Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
+ receivedMessage.acknowledge();
+ }
+
+ public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
+ {
+ publishWhileFailingOver(Session.SESSION_TRANSACTED);
+ _consumerSession.commit();
+ }
+
+ public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
+
+ }
+
+ public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.SESSION_TRANSACTED);
+ }
+
+ public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testTransactedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+ }
+
+ public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+ }
+
+ public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
+ {
+ setDelayedFailoverPolicy(5);
+ init(autoAcknowledge, true);
+
+ String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+ Message message = _producerSession.createTextMessage(text);
+
+ failBroker(getFailingPort());
+
+ if(!_failoverStarted.await(5, TimeUnit.SECONDS))
+ {
+ fail("Did not receieve notification failover had started");
+ }
+
+ _producer.send(message);
+
+ if (_producerSession.getTransacted())
+ {
+ _producerSession.commit();
+ }
+
+ Message receivedMessage = _consumer.receive(1000l);
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+ return receivedMessage;
+ }
+
+ private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
+ {
+ setDelayedFailoverPolicy(5);
+ init(autoAcknowledge, true);
+
+ String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+ Message message = _producerSession.createTextMessage(text);
+
+ AMQConnection connection = (AMQConnection)_connection;
+
+ // holding failover mutex should prevent the failover from
+ // proceeding before we try to send the message
+ synchronized(connection.getFailoverMutex())
+ {
+ failBroker(getFailingPort());
+
+ // wait to make sure that connection is lost
+ while(!connection.isFailingOver())
+ {
+ Thread.sleep(25l);
+ }
+
+ try
+ {
+ _producer.send(message);
+ fail("Sending should fail because connection was lost and failover has not yet completed");
+ }
+ catch(JMSException e)
+ {
+ // JMSException is expected
+ }
+ }
+ // wait for failover completion, thus ensuring it actually
+ //got started, before allowing the test to tear down
+ awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+ }
/**
- * Tests {@link Session#close()} for session with given acknowledge mode
+ * Tests {@link Session#close()} for session with given acknowledge mode
* to ensure that close works after failover.
*
* @param acknowledgeMode session acknowledge mode
@@ -671,7 +807,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
_consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+ _destination = createDestination(_consumerSession);
_consumer = _consumerSession.createConsumer(_destination);
if (startConnection)
@@ -684,6 +820,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
}
+ protected Destination createDestination(Session session) throws JMSException
+ {
+ return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+ }
+
/**
* Resends messages if reconnected to a non-clustered broker
*
@@ -879,6 +1020,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
@Override
public boolean preFailover(boolean redirect)
{
+ _failoverStarted.countDown();
return true;
}
@@ -900,6 +1042,39 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
_exceptionListenerException = e;
}
+ /**
+ * Causes 1 second delay before reconnect in order to test whether JMS
+ * methods block while failover is in progress
+ */
+ private static class DelayingFailoverPolicy extends FailoverPolicy
+ {
+
+ private CountDownLatch _suspendLatch;
+ private long _delay;
+
+ public DelayingFailoverPolicy(AMQConnection connection, long delay)
+ {
+ super(connection.getConnectionURL(), connection);
+ _suspendLatch = new CountDownLatch(1);
+ _delay = delay;
+ }
+
+ public void attainedConnection()
+ {
+ try
+ {
+ _suspendLatch.await(_delay, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // continue
+ }
+ super.attainedConnection();
+ }
+
+ }
+
+
private class FailoverTestMessageListener implements MessageListener
{
// message counter
@@ -946,4 +1121,101 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
return _counter.get();
}
}
+
+ /**
+ * Tests {@link Session#close()} for session with given acknowledge mode
+ * to ensure that it blocks until failover implementation restores connection.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @throws JMSException
+ */
+ private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+ {
+ initDelayedFailover(acknowledgeMode);
+
+ // intentionally receive message but not commit or acknowledge it in
+ // case of transacted or CLIENT_ACK session
+ Message receivedMessage = _consumer.receive(1000l);
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+ failBroker(getFailingPort());
+
+ // test whether session#close blocks while failover is in progress
+ _consumerSession.close();
+
+ assertFailoverException();
+ }
+
+ /**
+ * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @return queue browser
+ * @throws JMSException
+ */
+ private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException
+ {
+ init(acknowledgeMode, false);
+ _consumer.close();
+ QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
+ _connection.start();
+
+ produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _producerSession.commit();
+ }
+ return browser;
+ }
+
+ /**
+ * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
+ * to ensure that it blocks until failover implementation restores connection.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @throws JMSException
+ */
+ private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+ {
+ setDelayedFailoverPolicy();
+
+ QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
+
+ @SuppressWarnings("unchecked")
+ Enumeration<Message> messages = browser.getEnumeration();
+ Message receivedMessage = (Message) messages.nextElement();
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+ failBroker(getFailingPort());
+
+ browser.close();
+
+ assertFailoverException();
+ }
+
+ private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
+ {
+ DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
+ init(acknowledgeMode, true);
+ produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _producerSession.commit();
+ }
+ return failoverPolicy;
+ }
+
+ private DelayingFailoverPolicy setDelayedFailoverPolicy()
+ {
+ return setDelayedFailoverPolicy(2);
+ }
+
+ private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
+ {
+ AMQConnection amqConnection = (AMQConnection) _connection;
+ DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
+ ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
+ return failoverPolicy;
+ }
+
}