diff options
Diffstat (limited to 'java/systests/src')
2 files changed, 288 insertions, 2 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java new file mode 100644 index 0000000000..980fc7285d --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java @@ -0,0 +1,14 @@ +package org.apache.qpid.client.failover; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest +{ + @Override + protected Destination createDestination(Session session) throws JMSException + { + return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}"); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index 66f8fe0546..a5b9c618bc 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -20,6 +20,7 @@ package org.apache.qpid.client.failover; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Enumeration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,12 +34,15 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.test.utils.FailoverBaseCase; /** @@ -96,6 +100,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio */ private JMSException _exceptionListenerException; + /** + * Latch to check that failover mutex is hold by a failover thread + */ + private CountDownLatch _failoverStarted; + @Override protected void setUp() throws Exception { @@ -105,6 +114,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio _connection.setExceptionListener(this); ((AMQConnection) _connection).setConnectionListener(this); _failoverComplete = new CountDownLatch(1); + _failoverStarted = new CountDownLatch(1); } /** @@ -625,8 +635,134 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE); } + public void testPublishAutoAcknowledgedWhileFailover() throws Exception + { + publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE); + } + + public void testPublishClientAcknowledgedWhileFailover() throws Exception + { + Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE); + receivedMessage.acknowledge(); + } + + public void testPublishTransactedAcknowledgedWhileFailover() throws Exception + { + publishWhileFailingOver(Session.SESSION_TRANSACTED); + _consumerSession.commit(); + } + + public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE); + } + + public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE); + + } + + public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception + { + publishWithFailoverMutex(Session.SESSION_TRANSACTED); + } + + public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE); + } + + public void testTransactedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED); + } + + public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception + { + sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); + } + + public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE); + } + + public void testTransactedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED); + } + + public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception + { + browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); + } + + private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException + { + setDelayedFailoverPolicy(5); + init(autoAcknowledge, true); + + String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0); + Message message = _producerSession.createTextMessage(text); + + failBroker(getFailingPort()); + + if(!_failoverStarted.await(5, TimeUnit.SECONDS)) + { + fail("Did not receieve notification failover had started"); + } + + _producer.send(message); + + if (_producerSession.getTransacted()) + { + _producerSession.commit(); + } + + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + return receivedMessage; + } + + private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException + { + setDelayedFailoverPolicy(5); + init(autoAcknowledge, true); + + String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0); + Message message = _producerSession.createTextMessage(text); + + AMQConnection connection = (AMQConnection)_connection; + + // holding failover mutex should prevent the failover from + // proceeding before we try to send the message + synchronized(connection.getFailoverMutex()) + { + failBroker(getFailingPort()); + + // wait to make sure that connection is lost + while(!connection.isFailingOver()) + { + Thread.sleep(25l); + } + + try + { + _producer.send(message); + fail("Sending should fail because connection was lost and failover has not yet completed"); + } + catch(JMSException e) + { + // JMSException is expected + } + } + // wait for failover completion, thus ensuring it actually + //got started, before allowing the test to tear down + awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); + } /** - * Tests {@link Session#close()} for session with given acknowledge mode + * Tests {@link Session#close()} for session with given acknowledge mode * to ensure that close works after failover. * * @param acknowledgeMode session acknowledge mode @@ -671,7 +807,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false; _consumerSession = _connection.createSession(isTransacted, acknowledgeMode); - _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); + _destination = createDestination(_consumerSession); _consumer = _consumerSession.createConsumer(_destination); if (startConnection) @@ -684,6 +820,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio } + protected Destination createDestination(Session session) throws JMSException + { + return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); + } + /** * Resends messages if reconnected to a non-clustered broker * @@ -879,6 +1020,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio @Override public boolean preFailover(boolean redirect) { + _failoverStarted.countDown(); return true; } @@ -900,6 +1042,39 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio _exceptionListenerException = e; } + /** + * Causes 1 second delay before reconnect in order to test whether JMS + * methods block while failover is in progress + */ + private static class DelayingFailoverPolicy extends FailoverPolicy + { + + private CountDownLatch _suspendLatch; + private long _delay; + + public DelayingFailoverPolicy(AMQConnection connection, long delay) + { + super(connection.getConnectionURL(), connection); + _suspendLatch = new CountDownLatch(1); + _delay = delay; + } + + public void attainedConnection() + { + try + { + _suspendLatch.await(_delay, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + // continue + } + super.attainedConnection(); + } + + } + + private class FailoverTestMessageListener implements MessageListener { // message counter @@ -946,4 +1121,101 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio return _counter.get(); } } + + /** + * Tests {@link Session#close()} for session with given acknowledge mode + * to ensure that it blocks until failover implementation restores connection. + * + * @param acknowledgeMode session acknowledge mode + * @throws JMSException + */ + private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + { + initDelayedFailover(acknowledgeMode); + + // intentionally receive message but not commit or acknowledge it in + // case of transacted or CLIENT_ACK session + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + failBroker(getFailingPort()); + + // test whether session#close blocks while failover is in progress + _consumerSession.close(); + + assertFailoverException(); + } + + /** + * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing. + * + * @param acknowledgeMode session acknowledge mode + * @return queue browser + * @throws JMSException + */ + private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException + { + init(acknowledgeMode, false); + _consumer.close(); + QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination); + _connection.start(); + + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + _producerSession.commit(); + } + return browser; + } + + /** + * Tests {@link QueueBrowser#close()} for session with given acknowledge mode + * to ensure that it blocks until failover implementation restores connection. + * + * @param acknowledgeMode session acknowledge mode + * @throws JMSException + */ + private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + { + setDelayedFailoverPolicy(); + + QueueBrowser browser = prepareQueueBrowser(acknowledgeMode); + + @SuppressWarnings("unchecked") + Enumeration<Message> messages = browser.getEnumeration(); + Message receivedMessage = (Message) messages.nextElement(); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + failBroker(getFailingPort()); + + browser.close(); + + assertFailoverException(); + } + + private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException + { + DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy(); + init(acknowledgeMode, true); + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + _producerSession.commit(); + } + return failoverPolicy; + } + + private DelayingFailoverPolicy setDelayedFailoverPolicy() + { + return setDelayedFailoverPolicy(2); + } + + private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay) + { + AMQConnection amqConnection = (AMQConnection) _connection; + DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay); + ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy); + return failoverPolicy; + } + } |
