diff options
| author | Keith Wall <kwall@apache.org> | 2014-08-13 06:45:31 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-08-13 06:45:31 +0000 |
| commit | 43215591cd093a6c1e6c46bc59e35244016a105d (patch) | |
| tree | a000fc682e72653563c3cc780ea6ab8da7dac256 /qpid/java | |
| parent | efc7384e66bc77ef7f0a4621451319b4f2f00c78 (diff) | |
| download | qpid-python-43215591cd093a6c1e6c46bc59e35244016a105d.tar.gz | |
QPID-5991: [Java System Tests] Rename tests to have better structure; Remove some duplication between tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1617668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
8 files changed, 175 insertions, 342 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index d18dec3825..d045ae01fa 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -95,7 +95,7 @@ import org.apache.qpid.server.util.DaemonThreadFactory; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval"; - public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; + public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.replication.db_ping_socket_timeout"; public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval"; private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java deleted file mode 100644 index ef5cc7c464..0000000000 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.util.Set; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.URLSyntaxException; - -/** - * The HA white box tests test the BDB cluster where the test retains the knowledge of the - * individual test nodes. It uses this knowledge to examine the nodes to ensure that they - * remain in the correct state throughout the test. - * - * @see HAClusterBlackboxTest - */ -public class HAClusterWhiteboxTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class); - - private static final String VIRTUAL_HOST = "test"; - - private final int NUMBER_OF_NODES = 3; - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - - setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - - _clusterCreator.configureClusterNodes(); - _clusterCreator.startCluster(); - - super.setUp(); - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - public void testClusterPermitsConnectionToOnlyOneNode() throws Exception - { - int connectionSuccesses = 0; - int connectionFails = 0; - - for (int brokerPortNumber : getBrokerPortNumbers()) - { - try - { - getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); - connectionSuccesses++; - } - catch(JMSException e) - { - assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); - connectionFails++; - } - } - - assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails); - assertEquals("Unexpected number of successful connections", 1, connectionSuccesses); - } - - public void testClusterThatLosesNodeStillAllowsConnection() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - assertNotNull(initialConnection); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - - // verify that JMS persistence operations are working - assertProducingConsuming(subsequentConnection); - - closeConnection(initialConnection); - } - - public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - assertNotNull(initialConnection); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection); - - killBroker(subsequentPortNumber); - - final Connection finalConnection = getConnectionToNodeInCluster(); - assertNull(finalConnection); - - closeConnection(initialConnection); - } - - public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception - { - final Connection connection = getConnectionToNodeInCluster(); - assertNotNull(connection); - - final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection); - connection.close(); - - _clusterCreator.stopNode(brokerPortNumber); - _clusterCreator.startNode(brokerPortNumber); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - } - - public void testClusterLosingNodeRetainsData() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - - final String queueNamePrefix = getTestQueueName(); - final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'"; - final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'"; - - populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - - assertNotNull("no valid connection obtained", subsequentConnection); - - checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - } - - public void xtestRecoveryOfOutOfDateNode() throws Exception - { - /* - * TODO: Implement - * - * Cant yet find a way to control cleaning in a deterministic way to allow provoking - * a node to become out of date. We do now know that even a new joiner to the group - * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has - * done *any* cleaning and then adding a new node should be sufficient to cause this. - */ - } - - private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception - { - populateBrokerWithData(connection, 1, queueUrls); - } - - private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception - { - final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - for (final String queueUrl : queueUrls) - { - final Queue queue = session.createQueue(queueUrl); - session.createConsumer(queue).close(); - sendMessage(session, queue, noOfMessages); - } - } - - private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException - { - connection.start(); - final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - for (final String queueUrl : queueUrls) - { - final Queue queue = session.createQueue(queueUrl); - final MessageConsumer consumer = session.createConsumer(queue); - final Message message = consumer.receive(1000); - session.commit(); - assertNotNull("Queue " + queue + " should have message", message); - assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX)); - } - } - - private Connection getConnectionToNodeInCluster() throws URLSyntaxException - { - Connection connection = null; - Set<Integer> runningBrokerPorts = getBrokerPortNumbers(); - - for (int brokerPortNumber : runningBrokerPorts) - { - try - { - connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); - break; - } - catch(JMSException je) - { - assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); - } - } - return connection; - } - - private void closeConnectionAndKillBroker(final Connection initialConnection) throws Exception - { - final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - initialConnection.close(); - - killBroker(initialPortNumber); // kill awaits the death of the child - } - - private void closeConnection(final Connection initialConnection) - { - try - { - initialConnection.close(); - } - catch(Exception e) - { - // ignore. - // java.net.SocketException is seen sometimes on active connection - } - } -} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java index 0791637ba3..301375d0fb 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import java.io.File; import java.io.IOException; @@ -39,7 +39,6 @@ import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; @@ -330,7 +329,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1); Map<String,String> context = new HashMap<>(); nodeData.put(BDBHAVirtualHostNode.CONTEXT, context); - String bluePrint = HATestClusterCreator.getBlueprint("localhost", _node1HaPort, _node2HaPort, _node3HaPort); + String bluePrint = GroupCreator.getBlueprint("localhost", _node1HaPort, _node2HaPort, _node3HaPort); context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint); return nodeData; } diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java index 334544e334..07ce033a55 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; @@ -34,7 +34,6 @@ import javax.servlet.http.HttpServletResponse; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.systest.rest.Asserts; @@ -60,7 +59,7 @@ public class BDBHAVirtualHostRestTest extends QpidRestTestCase _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis()); _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1); _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName; - _bluePrint = HATestClusterCreator.getBlueprint("localhost", _nodeHaPort); + _bluePrint = GroupCreator.getBlueprint("localhost", _nodeHaPort); super.setUp(); } diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java index ebc32b482a..e78ef34759 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java @@ -17,7 +17,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import java.io.File; import java.io.IOException; @@ -46,6 +46,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.server.management.plugin.HttpManagement; import org.apache.qpid.server.model.Plugin; import org.apache.qpid.server.model.Port; @@ -67,9 +68,9 @@ import org.junit.Assert; import com.sleepycat.je.rep.ReplicationConfig; -public class HATestClusterCreator +public class GroupCreator { - protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); + protected static final Logger LOGGER = Logger.getLogger(GroupCreator.class); private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; @@ -94,7 +95,7 @@ public class HATestClusterCreator private int _bdbHelperPort; private int _primaryBrokerPort; - public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) + public GroupCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) { _testcase = testcase; _virtualHostName = virtualHostName; @@ -291,7 +292,12 @@ public class HATestClusterCreator return new HashSet<Integer>(_brokerPortToBdbPortMap.values()); } - public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + public ConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + { + return getConnectionUrlForAllClusterNodes(FAILOVER_CONNECTDELAY, FAILOVER_RETRIES, FAILOVER_CYCLECOUNT); + } + + public ConnectionURL getConnectionUrlForAllClusterNodes(int connectDelay, int retries, final int cyclecount) throws Exception { final StringBuilder brokerList = new StringBuilder(); @@ -299,14 +305,14 @@ public class HATestClusterCreator { int brokerPortNumber = itr.next(); - brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, connectDelay, retries)); if (itr.hasNext()) { brokerList.append(";"); } } - return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, cyclecount)); } public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException @@ -434,7 +440,7 @@ public class HATestClusterCreator int status = restHelper.submitRequest(url, "PUT", attributeMap); if (status != 200) { - throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute's : " + status); + throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute(s) : " + status); } } diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java index 0ab10cc318..c6f005c0e7 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java @@ -17,7 +17,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; @@ -52,11 +52,11 @@ import org.junit.Assert; /** * System test verifying the ability to control a cluster via the Management API. * - * @see HAClusterBlackboxTest + * @see MultiNodeTest */ -public class HAClusterManagementTest extends QpidBrokerTestCase +public class JMXManagementTest extends QpidBrokerTestCase { - protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); + protected static final Logger LOGGER = Logger.getLogger(JMXManagementTest.class); private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; private static final String VIRTUAL_HOST = "test"; @@ -64,7 +64,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); private static final int NUMBER_OF_NODES = 4; - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final GroupCreator _clusterCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); private ConnectionURL _brokerFailoverUrl; diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java index 9867ce2eca..d6ba419de1 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java @@ -17,17 +17,20 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import java.io.File; import java.util.Collections; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.log4j.Logger; @@ -41,20 +44,23 @@ import org.apache.qpid.test.utils.TestUtils; /** * The HA black box tests test the BDB cluster as a opaque unit. Client connects to * the cluster via a failover url - * - * @see HAClusterWhiteboxTest */ -public class HAClusterBlackboxTest extends QpidBrokerTestCase +public class MultiNodeTest extends QpidBrokerTestCase { - protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class); + protected static final Logger LOGGER = Logger.getLogger(MultiNodeTest.class); private static final String VIRTUAL_HOST = "test"; private static final int NUMBER_OF_NODES = 3; - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final GroupCreator _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); private FailoverAwaitingListener _failoverListener; - private ConnectionURL _brokerFailoverUrl; + + /** Used when expectation is client will (re)-connect */ + private ConnectionURL _positiveFailoverUrl; + + /** Used when expectation is client will not (re)-connect */ + private ConnectionURL _negativeFailoverUrl; @Override protected void setUp() throws Exception @@ -66,11 +72,12 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - _clusterCreator.configureClusterNodes(); + _groupCreator.configureClusterNodes(); - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _positiveFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(); + _negativeFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(200, 0, 2); - _clusterCreator.startCluster(); + _groupCreator.startCluster(); _failoverListener = new FailoverAwaitingListener(); super.setUp(); @@ -84,14 +91,14 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase public void testLossOfMasterNodeCausesClientToFailover() throws Exception { - final Connection connection = getConnection(_brokerFailoverUrl); + final Connection connection = getConnection(_positiveFailoverUrl); ((AMQConnection)connection).setConnectionListener(_failoverListener); - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); LOGGER.info("Active connection port " + activeBrokerPort); - _clusterCreator.stopNode(activeBrokerPort); + _groupCreator.stopNode(activeBrokerPort); LOGGER.info("Node is stopped"); _failoverListener.awaitFailoverCompletion(20000); LOGGER.info("Listener has finished"); @@ -101,103 +108,175 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception { - LOGGER.info("Connecting to " + _brokerFailoverUrl); - final Connection connection = getConnection(_brokerFailoverUrl); - LOGGER.info("Got connection to cluster"); + final Connection connection = getConnection(_positiveFailoverUrl); ((AMQConnection)connection).setConnectionListener(_failoverListener); - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); LOGGER.info("Active connection port " + activeBrokerPort); - final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); - _clusterCreator.stopNode(inactiveBrokerPort); + _groupCreator.stopNode(inactiveBrokerPort); _failoverListener.assertNoFailoverCompletionWithin(2000); - // any op to ensure connection remains - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertProducingConsuming(connection); + } + + public void testLossOfQuorumCausesClientDisconnection() throws Exception + { + final Connection connection = getConnection(_negativeFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + ports.remove(activeBrokerPort); + + // Stop all other nodes + for (Integer p : ports) + { + _groupCreator.stopNode(p); + } + + try + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + session.createConsumer(destination).close(); + fail("Exception not thrown - creating durable queue should fail without quorum"); + } + catch(JMSException jms) + { + // PASS + } + + // New connections should now fail as vhost will be unavailable + try + { + getConnection(_negativeFailoverUrl); + fail("Exception not thrown"); + } + catch (JMSException je) + { + // PASS + } + } + + public void testPersistentMessagesAvailableAfterFailover() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + + Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination queue = producingSession.createQueue(getTestQueueName()); + producingSession.createConsumer(queue).close(); + sendMessage(producingSession, queue, 10); + + _groupCreator.stopNode(activeBrokerPort); + LOGGER.info("Old master (broker port " + activeBrokerPort + ") is stopped"); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Failover has finished"); + + final int activeBrokerPortAfterFailover = _groupCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("New master (broker port " + activeBrokerPort + ") after failover"); + + Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumingSession.createConsumer(queue); + + connection.start(); + for(int i = 0; i < 10; i++) + { + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message " + i + " is not received", m); + assertEquals("Unexpected message received", i, m.getIntProperty(INDEX)); + } + consumingSession.commit(); } public void testTransferMasterFromLocalNode() throws Exception { - final Connection connection = getConnection(_brokerFailoverUrl); + final Connection connection = getConnection(_positiveFailoverUrl); ((AMQConnection)connection).setConnectionListener(_failoverListener); - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); LOGGER.info("Active connection port " + activeBrokerPort); - final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); - Map<String, Object> attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + Map<String, Object> attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + _groupCreator.setNodeAttributes(inactiveBrokerPort, + Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); _failoverListener.awaitFailoverCompletion(20000); LOGGER.info("Listener has finished"); - attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); assertProducingConsuming(connection); - _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); } public void testTransferMasterFromRemoteNode() throws Exception { - final Connection connection = getConnection(_brokerFailoverUrl); + final Connection connection = getConnection(_positiveFailoverUrl); ((AMQConnection)connection).setConnectionListener(_failoverListener); - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); LOGGER.info("Active connection port " + activeBrokerPort); - final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); - _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); - Map<String, Object> attributes = _clusterCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); + _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); + Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - _clusterCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); _failoverListener.awaitFailoverCompletion(20000); LOGGER.info("Listener has finished"); - attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); assertProducingConsuming(connection); - _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); } public void testQuorumOverride() throws Exception { - final Connection connection = getConnection(_brokerFailoverUrl); - - ((AMQConnection)connection).setConnectionListener(_failoverListener); + final Connection connection = getConnection(_positiveFailoverUrl); - Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes(); + Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); ports.remove(activeBrokerPort); // Stop all other nodes for (Integer p : ports) { - _clusterCreator.stopNode(p); + _groupCreator.stopNode(p); } - Map<String, Object> attributes = _clusterCreator.getNodeAttributes(activeBrokerPort); + Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort); assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); - _clusterCreator.setNodeAttributes(activeBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); + _groupCreator.setNodeAttributes(activeBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); - attributes = _clusterCreator.getNodeAttributes(activeBrokerPort); + attributes = _groupCreator.getNodeAttributes(activeBrokerPort); assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); assertProducingConsuming(connection); @@ -205,24 +284,24 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase public void testPriority() throws Exception { - final Connection connection = getConnection(_brokerFailoverUrl); + final Connection connection = getConnection(_positiveFailoverUrl); ((AMQConnection)connection).setConnectionListener(_failoverListener); - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); LOGGER.info("Active connection port " + activeBrokerPort); int priority = 1; Integer highestPriorityBrokerPort = null; - Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes(); + Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); for (Integer port : ports) { if (activeBrokerPort != port.intValue()) { priority = priority + 1; highestPriorityBrokerPort = port; - _clusterCreator.setNodeAttributes(port, port, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, priority)); - Map<String, Object> attributes = _clusterCreator.getNodeAttributes(port, port); + _groupCreator.setNodeAttributes(port, port, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, priority)); + Map<String, Object> attributes = _groupCreator.getNodeAttributes(port, port); assertEquals("Broker has unexpected priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY)); } } @@ -230,12 +309,12 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase LOGGER.info("Broker on port " + highestPriorityBrokerPort + " has the highest priority of " + priority); LOGGER.info("Shutting down the MASTER"); - _clusterCreator.stopNode(activeBrokerPort); + _groupCreator.stopNode(activeBrokerPort); _failoverListener.awaitFailoverCompletion(20000); LOGGER.info("Listener has finished"); - Map<String, Object> attributes = _clusterCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort); + Map<String, Object> attributes = _groupCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort); assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); assertProducingConsuming(connection); diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java index 8df419c3a7..0f8a1609de 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java @@ -17,7 +17,7 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.replication; import java.io.File; @@ -30,14 +30,14 @@ import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; -public class HAClusterTwoNodeTest extends QpidBrokerTestCase +public class TwoNodeTest extends QpidBrokerTestCase { private static final String VIRTUAL_HOST = "test"; private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); private static final int NUMBER_OF_NODES = 2; - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final GroupCreator _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); private ConnectionURL _brokerFailoverUrl; @@ -75,21 +75,21 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase private void startCluster(boolean designedPrimary) throws Exception { setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - _clusterCreator.configureClusterNodes(); - _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - _clusterCreator.startCluster(); + _groupCreator.configureClusterNodes(); + _groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); + _brokerFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(); + _groupCreator.startCluster(); } public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception { startCluster(true); final Connection initialConnection = getConnection(_brokerFailoverUrl); - int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection); assertProducingConsuming(initialConnection); initialConnection.close(); - _clusterCreator.stopCluster(); - _clusterCreator.startNode(masterPort); + _groupCreator.stopCluster(); + _groupCreator.startNode(masterPort); final Connection secondConnection = getConnection(_brokerFailoverUrl); assertProducingConsuming(secondConnection); secondConnection.close(); @@ -101,8 +101,8 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase final Connection initialConnection = getConnection(_brokerFailoverUrl); assertProducingConsuming(initialConnection); initialConnection.close(); - _clusterCreator.stopCluster(); - _clusterCreator.startClusterParallel(); + _groupCreator.stopCluster(); + _groupCreator.startClusterParallel(); final Connection secondConnection = getConnection(_brokerFailoverUrl); assertProducingConsuming(secondConnection); secondConnection.close(); @@ -111,7 +111,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception { startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode()); final Connection connection = getConnection(_brokerFailoverUrl); assertNotNull("Expected to get a valid connection to primary", connection); assertProducingConsuming(connection); @@ -120,7 +120,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception { startCluster(false); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode()); try { @@ -139,7 +139,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception { startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary()); try { @@ -155,18 +155,18 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase public void testInitialDesignatedPrimaryStateOfNodes() throws Exception { startCluster(true); - final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); + final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfPrimary()); assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); - final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode()); assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); } public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception { startCluster(true); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary()); assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); storeBean.setDesignatedPrimary(true); |
