From 35e75488ec3f1b0048f2948701bcfdc8106d760e Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Mon, 10 Sep 2012 12:39:44 +0000 Subject: QPID-4289: Fix 0-8/0-9/0-9-1 failover issues Applied patch from Philip Harvey and Oleksandr Rudyy git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1382799 13f79535-47bb-0310-9956-ffa450edef68 --- .../client/failover/FailoverBehaviourTest.java | 24 +- .../failover/MultipleBrokersFailoverTest.java | 255 +++++++++++++++++++++ .../qpid/test/utils/InternalBrokerHolder.java | 23 +- .../java/org/apache/qpid/test/utils/TestUtils.java | 54 +++++ 4 files changed, 329 insertions(+), 27 deletions(-) create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java (limited to 'qpid/java/systests/src') diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index 4b766864b4..b0d1750643 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -1313,7 +1313,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio * @param acknowledgeMode session acknowledge mode * @throws JMSException */ - private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception { initDelayedFailover(acknowledgeMode); @@ -1324,9 +1324,14 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio failBroker(getFailingPort()); + // wait until failover is started + _failoverStarted.await(5, TimeUnit.SECONDS); + // test whether session#close blocks while failover is in progress _consumerSession.close(); + assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS)); + assertFailoverException(); } @@ -1360,10 +1365,8 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio * @param acknowledgeMode session acknowledge mode * @throws JMSException */ - private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception { - setDelayedFailoverPolicy(); - QueueBrowser browser = prepareQueueBrowser(acknowledgeMode); @SuppressWarnings("unchecked") @@ -1373,8 +1376,13 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio failBroker(getFailingPort()); + // wait until failover is started + _failoverStarted.await(5, TimeUnit.SECONDS); + browser.close(); + assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS)); + assertFailoverException(); } @@ -1402,5 +1410,11 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy); return failoverPolicy; } - + + @Override + public void failBroker(int port) + { + killBroker(port); + } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java new file mode 100644 index 0000000000..be21fbb115 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java @@ -0,0 +1,255 @@ +package org.apache.qpid.client.failover; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQBrokerDetails; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestUtils; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.util.FileUtils; + +public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements ConnectionListener +{ + private static final Logger _logger = Logger.getLogger(MultipleBrokersFailoverTest.class); + + private static final String FAILOVER_VIRTUAL_HOST = "failover"; + private static final String NON_FAILOVER_VIRTUAL_HOST = "nonfailover"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + private static final int FAILOVER_RETRIES = 1; + private static final int FAILOVER_CONNECTDELAY = 1000; + private int[] _brokerPorts; + private AMQConnectionURL _connectionURL; + private Connection _connection; + private CountDownLatch _failoverComplete; + private CountDownLatch _failoverStarted; + private Session _consumerSession; + private Destination _destination; + private MessageConsumer _consumer; + private Session _producerSession; + private MessageProducer _producer; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + int port = findFreePort(); + _brokerPorts = new int[4]; + _connectionURL = new AMQConnectionURL("amqp://guest:guest@test/" + FAILOVER_VIRTUAL_HOST + + "?&failover='roundrobin?cyclecount='1''"); + + // we need to create 4 brokers: + // 1st broker will be running in test JVM and will not have failover host (only tcp connection will established, amqp connection will be closed) + // 2d broker will be spawn in separate JVM and should have a failover host (amqp connection should be established) + // 3d broker will be spawn in separate JVM and should not have a failover host (only tcp connection will established, amqp connection will be closed) + // 4d broker will be spawn in separate JVM and should have a failover host (amqp connection should be established) + + // the test should connect to the second broker first and fail over to the forth broker + // after unsuccessful try to establish the connection to the 3d broker + for (int i = 0; i < _brokerPorts.length; i++) + { + if (i > 0) + { + port = getNextAvailable(port + 1); + } + _brokerPorts[i] = port; + + XMLConfiguration testConfiguration = new XMLConfiguration(); + testConfiguration.addProperty("management.enabled", "false"); + + XMLConfiguration testVirtualhosts = new XMLConfiguration(); + String host = null; + if (i == 1 || i == _brokerPorts.length - 1) + { + host = FAILOVER_VIRTUAL_HOST; + } + else + { + host = NON_FAILOVER_VIRTUAL_HOST; + } + testVirtualhosts.addProperty("virtualhost.name", host); + testVirtualhosts.addProperty("virtualhost." + host + ".store.class", getTestProfileMessageStoreClassName()); + testVirtualhosts.addProperty( + "virtualhost." + host + ".store." + MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, "${QPID_WORK}/" + + host); + + startBroker(port, testConfiguration, testVirtualhosts); + revertSystemProperties(); + + _connectionURL.addBrokerDetails(new AMQBrokerDetails(String.format(BROKER_PORTION_FORMAT, port, + FAILOVER_CONNECTDELAY, FAILOVER_RETRIES))); + } + _connection = getConnection(_connectionURL); + ((AMQConnection) _connection).setConnectionListener(this); + _failoverComplete = new CountDownLatch(1); + _failoverStarted = new CountDownLatch(1); + } + + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + for (int i = 0; i < _brokerPorts.length; i++) + { + if (_brokerPorts[i] > 0) + { + stopBrokerSafely(_brokerPorts[i]); + FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); + } + } + + } + } + + public void startBroker() throws Exception + { + // noop, stop starting broker in super.tearDown() + } + + public void testFailoverOnBrokerKill() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + assertConnectionPort(_brokerPorts[1]); + + assertSendReceive(0); + + killBroker(_brokerPorts[1]); + + awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * 2); + assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount()); + + assertSendReceive(2); + assertConnectionPort(_brokerPorts[_brokerPorts.length - 1]); + } + + public void testFailoverOnBrokerStop() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + assertConnectionPort(_brokerPorts[1]); + + assertSendReceive(0); + + stopBroker(_brokerPorts[1]); + + awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * 2); + assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount()); + + assertSendReceive(1); + assertConnectionPort(_brokerPorts[_brokerPorts.length - 1]); + } + + private void assertConnectionPort(int brokerPort) + { + int connectionPort = ((AMQConnection)_connection).getActiveBrokerDetails().getPort(); + assertEquals("Unexpected broker port", brokerPort, connectionPort); + } + + private void assertSendReceive(int index) throws JMSException + { + Message message = createNextMessage(_producerSession, index); + _producer.send(message); + if (_producerSession.getTransacted()) + { + _producerSession.commit(); + } + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, index); + if (_consumerSession.getTransacted()) + { + _consumerSession.commit(); + } + } + + private void awaitForFailoverCompletion(long delay) + { + _logger.info("Awaiting Failover completion.."); + try + { + if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS)) + { + _logger.warn("Test thread stack:\n\n" + TestUtils.dumpThreads()); + fail("Failover did not complete"); + } + } + catch (InterruptedException e) + { + fail("Test was interrupted:" + e.getMessage()); + } + } + + private void assertReceivedMessage(Message receivedMessage, int messageIndex) + { + assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage); + assertTrue( + "Failure to receive message [" + messageIndex + "], expected TextMessage but received " + receivedMessage, + receivedMessage instanceof TextMessage); + } + + private void init(int acknowledgeMode, boolean startConnection) throws JMSException + { + boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false; + + _consumerSession = _connection.createSession(isTransacted, acknowledgeMode); + _destination = _consumerSession.createQueue(getTestQueueName()); + _consumer = _consumerSession.createConsumer(_destination); + + if (startConnection) + { + _connection.start(); + } + + _producerSession = _connection.createSession(isTransacted, acknowledgeMode); + _producer = _producerSession.createProducer(_destination); + + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + + @Override + public boolean preFailover(boolean redirect) + { + _failoverStarted.countDown(); + return true; + } + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public void failoverComplete() + { + _failoverComplete.countDown(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index a71a4ef517..c5e8a08ec6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -82,28 +82,7 @@ public class InternalBrokerHolder implements BrokerHolder @Override public String dumpThreads() { - ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); - ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); - StringBuilder dump = new StringBuilder(); - dump.append(String.format("%n")); - for (ThreadInfo threadInfo : threadInfos) - { - dump.append(threadInfo); - } - - long[] deadLocks = threadMXBean.findDeadlockedThreads(); - if (deadLocks != null && deadLocks.length > 0) - { - ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks); - dump.append(String.format("%n")); - dump.append("Deadlock is detected!"); - dump.append(String.format("%n")); - for (ThreadInfo threadInfo : deadlockedThreads) - { - dump.append(threadInfo); - } - } - return dump.toString(); + return TestUtils.dumpThreads(); } @Override diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java new file mode 100644 index 0000000000..c651d3ec7f --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.utils; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + +public class TestUtils +{ + public static String dumpThreads() + { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); + StringBuilder dump = new StringBuilder(); + dump.append(String.format("%n")); + for (ThreadInfo threadInfo : threadInfos) + { + dump.append(threadInfo); + } + + long[] deadLocks = threadMXBean.findDeadlockedThreads(); + if (deadLocks != null && deadLocks.length > 0) + { + ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks); + dump.append(String.format("%n")); + dump.append("Deadlock is detected!"); + dump.append(String.format("%n")); + for (ThreadInfo threadInfo : deadlockedThreads) + { + dump.append(threadInfo); + } + } + return dump.toString(); + } +} -- cgit v1.2.1