From 049d58ecf81450148665a299cdb6cfaa44e5cfd5 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 12 Aug 2014 15:14:05 +0000 Subject: QPID-5989: [Java Tests] Relocate system test classes to beneath src/test/java git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1617503 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/BDBBackupTest.java | 194 -------- .../berkeleydb/BDBHAVirtualHostNodeRestTest.java | 452 ----------------- .../store/berkeleydb/BDBHAVirtualHostRestTest.java | 157 ------ .../server/store/berkeleydb/BDBUpgradeTest.java | 548 --------------------- .../store/berkeleydb/HAClusterBlackboxTest.java | 292 ----------- .../store/berkeleydb/HAClusterManagementTest.java | 321 ------------ .../store/berkeleydb/HAClusterTwoNodeTest.java | 195 -------- .../store/berkeleydb/HAClusterWhiteboxTest.java | 250 ---------- .../store/berkeleydb/HATestClusterCreator.java | 529 -------------------- .../server/store/berkeleydb/BDBBackupTest.java | 194 ++++++++ .../berkeleydb/BDBHAVirtualHostNodeRestTest.java | 455 +++++++++++++++++ .../store/berkeleydb/BDBHAVirtualHostRestTest.java | 157 ++++++ .../server/store/berkeleydb/BDBUpgradeTest.java | 548 +++++++++++++++++++++ .../store/berkeleydb/HAClusterBlackboxTest.java | 292 +++++++++++ .../store/berkeleydb/HAClusterManagementTest.java | 321 ++++++++++++ .../store/berkeleydb/HAClusterTwoNodeTest.java | 195 ++++++++ .../store/berkeleydb/HAClusterWhiteboxTest.java | 250 ++++++++++ .../store/berkeleydb/HATestClusterCreator.java | 529 ++++++++++++++++++++ 18 files changed, 2941 insertions(+), 2938 deletions(-) delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java create mode 100644 qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java (limited to 'qpid/java/bdbstore') diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java deleted file mode 100644 index fab889a49f..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode; -import org.apache.qpid.test.utils.Piper; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.FileUtils; -import org.apache.qpid.util.Strings; -import org.apache.qpid.util.SystemUtils; - -/** - * Tests the BDB backup script can successfully perform a backup and that - * backup can be restored and used by the Broker. - */ -public class BDBBackupTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(BDBBackupTest.class); - - private static final String BACKUP_SCRIPT = "/bin/backup.sh"; - private static final String BACKUP_COMPLETE_MESSAGE = "Hot Backup Completed"; - - private static final String TEST_VHOST = "test"; - private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir"); - - private File _backupToDir; - private File _backupFromDir; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName()); - _backupToDir.mkdirs(); - - Map virtualHostNodeAttributes = getBrokerConfiguration().getObjectAttributes(VirtualHostNode.class, TEST_VHOST); - _backupFromDir = new File(Strings.expand((String) virtualHostNodeAttributes.get(BDBVirtualHostNode.STORE_PATH))); - boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); - assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); - } - - @Override - protected void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - FileUtils.delete(_backupToDir, true); - } - } - - public void testBackupAndRestoreMaintainsMessages() throws Exception - { - sendNumberedMessages(0, 10); - invokeBdbBackup(_backupFromDir, _backupToDir); - sendNumberedMessages(10, 20); - confirmBrokerHasMessages(0, 20); - stopBroker(); - - deleteStore(_backupFromDir); - replaceStoreWithBackup(_backupToDir, _backupFromDir); - - startBroker(); - confirmBrokerHasMessages(0, 10); - } - - private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception - { - Connection con = getConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(getTestQueueName()); - // Create queue by consumer side-effect - session.createConsumer(destination).close(); - - final int numOfMessages = endIndex - startIndex; - final int batchSize = 0; - sendMessage(session, destination, numOfMessages, startIndex, batchSize); - con.close(); - } - - private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception - { - Connection con = getConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - con.start(); - Destination destination = session.createQueue(getTestQueueName()); - MessageConsumer consumer = session.createConsumer(destination); - for (int i = startIndex; i < endIndex; i++) - { - Message msg = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message " + i + " not received", msg); - assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); - } - - Message msg = consumer.receive(100); - if(msg != null) - { - fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); - } - con.close(); - } - - private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception - { - if (SystemUtils.isWindows()) - { - BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()}); - } - else - { - runBdbBackupScript(backupFromDir, backupToDir); - } - } - - private void runBdbBackupScript(final File backupFromDir, final File backupToDir) throws IOException, - InterruptedException - { - Process backupProcess = null; - try - { - String qpidHome = System.getProperty(QPID_HOME); - ProcessBuilder pb = new ProcessBuilder(qpidHome + BACKUP_SCRIPT, "-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()); - pb.redirectErrorStream(true); - Map env = pb.environment(); - env.put(QPID_HOME, qpidHome); - - LOGGER.debug("Backup command is " + pb.command()); - backupProcess = pb.start(); - Piper piper = new Piper(backupProcess.getInputStream(), _testcaseOutputStream, null, BACKUP_COMPLETE_MESSAGE); - piper.start(); - piper.await(2, TimeUnit.SECONDS); - backupProcess.waitFor(); - piper.join(); - - LOGGER.debug("Backup command completed " + backupProcess.exitValue()); - assertEquals("Unexpected exit value from backup script", 0, backupProcess.exitValue()); - } - finally - { - if (backupProcess != null) - { - backupProcess.getErrorStream().close(); - backupProcess.getInputStream().close(); - backupProcess.getOutputStream().close(); - } - } - } - - private void replaceStoreWithBackup(File source, File dst) throws Exception - { - LOGGER.debug("Copying store " + source + " to " + dst); - FileUtils.copyRecursive(source, dst); - } - - private void deleteStore(File storeDir) - { - LOGGER.debug("Deleting store " + storeDir); - FileUtils.delete(storeDir, true); - } - -} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java deleted file mode 100644 index db1431ee05..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java +++ /dev/null @@ -1,452 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.servlet.http.HttpServletResponse; - -import com.sleepycat.je.Durability; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; -import org.apache.qpid.server.model.RemoteReplicationNode; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; -import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; -import org.apache.qpid.systest.rest.Asserts; -import org.apache.qpid.systest.rest.QpidRestTestCase; -import org.apache.qpid.test.utils.TestBrokerConfiguration; - -public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase -{ - private static final String NODE1 = "node1"; - private static final String NODE2 = "node2"; - private static final String NODE3 = "node3"; - - private int _node1HaPort; - private int _node2HaPort; - private int _node3HaPort; - - private String _hostName; - private String _baseNodeRestUrl; - - @Override - public void setUp() throws Exception - { - setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); - - super.setUp(); - _hostName = getTestName(); - _baseNodeRestUrl = "virtualhostnode/"; - - _node1HaPort = findFreePort(); - _node2HaPort = getNextAvailable(_node1HaPort + 1); - _node3HaPort = getNextAvailable(_node2HaPort + 1); - - - } - - @Override - protected void customizeConfiguration() throws IOException - { - super.customizeConfiguration(); - TestBrokerConfiguration config = getBrokerConfiguration(); - config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); - config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); - } - - public void testCreate3NodeGroup() throws Exception - { - createHANode(NODE1, _node1HaPort, _node1HaPort); - assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); - createHANode(NODE2, _node2HaPort, _node1HaPort); - assertNode(NODE2, _node2HaPort, _node1HaPort, NODE1); - createHANode(NODE3, _node3HaPort, _node1HaPort); - assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); - assertRemoteNodes(NODE1, NODE2, NODE3); - } - - public void testMutateStateOfOneNode() throws Exception - { - createHANode(NODE1, _node1HaPort, _node1HaPort); - createHANode(NODE2, _node2HaPort, _node1HaPort); - createHANode(NODE3, _node3HaPort, _node1HaPort); - - String node1Url = _baseNodeRestUrl + NODE1; - String node2Url = _baseNodeRestUrl + NODE2; - String node3Url = _baseNodeRestUrl + NODE3; - - assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); - assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); - assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); - - mutateDesiredState(node1Url, "STOPPED"); - - assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); - assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); - assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); - - List> remoteNodes = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); - assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size()); - - Map remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1); - - assertEquals("Node 1 observed from node 2 is in the wrong state", - "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE)); - assertEquals("Node 1 observed from node 2 has the wrong role", - "UNKNOWN", remoteNode1.get(BDBHARemoteReplicationNode.ROLE)); - - } - - public void testNewMasterElectedWhenVirtualHostIsStopped() throws Exception - { - createHANode(NODE1, _node1HaPort, _node1HaPort); - createHANode(NODE2, _node2HaPort, _node1HaPort); - createHANode(NODE3, _node3HaPort, _node1HaPort); - - String node1Url = _baseNodeRestUrl + NODE1; - String node2Url = _baseNodeRestUrl + NODE2; - String node3Url = _baseNodeRestUrl + NODE3; - - assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); - assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); - assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); - - // Put virtualhost in STOPPED state - String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName; - assertActualAndDesiredStates(virtualHostRestUrl, "ACTIVE", "ACTIVE"); - mutateDesiredState(virtualHostRestUrl, "STOPPED"); - assertActualAndDesiredStates(virtualHostRestUrl, "STOPPED", "STOPPED"); - - // Now stop node 1 to cause an election between nodes 2 & 3 - mutateDesiredState(node1Url, "STOPPED"); - assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); - - Map newMasterData = awaitNewMaster(node2Url, node3Url); - - //Check the virtual host of the new master is in the stopped state - String newMasterVirtualHostRestUrl = "virtualhost/" + newMasterData.get(BDBHAVirtualHostNode.NAME) + "/" + _hostName; - assertActualAndDesiredStates(newMasterVirtualHostRestUrl, "STOPPED", "STOPPED"); - } - - public void testDeleteReplicaNode() throws Exception - { - createHANode(NODE1, _node1HaPort, _node1HaPort); - createHANode(NODE2, _node2HaPort, _node1HaPort); - createHANode(NODE3, _node3HaPort, _node1HaPort); - - assertRemoteNodes(NODE1, NODE2, NODE3); - - List> data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE1); - assertEquals("Unexpected number of remote nodes on " + NODE1, 2, data.size()); - - int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "DELETE"); - assertEquals("Unexpected response code on deletion of virtual host node " + NODE2, 200, responseCode); - - int counter = 0; - while (data.size() != 1 && counter<50) - { - data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE1); - if (data.size() != 1) - { - Thread.sleep(100l); - } - counter++; - } - assertEquals("Unexpected number of remote nodes on " + NODE1, 1, data.size()); - } - - public void testDeleteMasterNode() throws Exception - { - createHANode(NODE1, _node1HaPort, _node1HaPort); - createHANode(NODE2, _node2HaPort, _node1HaPort); - createHANode(NODE3, _node3HaPort, _node1HaPort); - - assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); - assertRemoteNodes(NODE1, NODE2, NODE3); - - // change priority to make Node2 a master - int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY, 100)); - assertEquals("Unexpected response code on priority update of virtual host node " + NODE2, 200, responseCode); - - List> data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); - assertEquals("Unexpected number of remote nodes on " + NODE2, 2, data.size()); - - // delete master - responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE"); - assertEquals("Unexpected response code on deletion of virtual host node " + NODE1, 200, responseCode); - - // wait for new master - waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER"); - - // delete remote node - responseCode = getRestTestHelper().submitRequest("replicationnode/" + NODE2 + "/" + NODE1, "DELETE"); - assertEquals("Unexpected response code on deletion of remote node " + NODE1, 200, responseCode); - - int counter = 0; - while (data.size() != 1 && counter<50) - { - data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); - if (data.size() != 1) - { - Thread.sleep(100l); - } - counter++; - } - assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size()); - } - - public void testIntruderBDBHAVHNNotAllowedNoConnect() throws Exception - { - createHANode(NODE1, _node1HaPort, _node1HaPort); - assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); - - // add permitted node - Map node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); - getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201); - assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); - assertRemoteNodes(NODE1, NODE3); - - int intruderPort = getNextAvailable(_node3HaPort + 1); - - // try to add not permitted node - Map nodeData = createNodeAttributeMap(NODE2, intruderPort, _node1HaPort); - getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", nodeData, 409); - - assertRemoteNodes(NODE1, NODE3); - } - - public void testIntruderProtection() throws Exception - { - createHANode(NODE1, _node1HaPort, _node1HaPort); - assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); - - Map nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + NODE1); - String node1StorePath = (String)nodeData.get(BDBHAVirtualHostNode.STORE_PATH); - long transactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); - - // add permitted node - Map node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); - getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201); - assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); - assertRemoteNodes(NODE1, NODE3); - - // Ensure PINGDB is created - // in order to exclude hanging of environment - // when environment.close is called whilst PINGDB is created. - // On node joining, a record is updated in PINGDB - // if lastTransactionId is incremented then node ping task was executed - int counter = 0; - long newTransactionId = transactionId; - while(newTransactionId == transactionId && counter<50) - { - nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + NODE1); - newTransactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); - if (newTransactionId != transactionId) - { - break; - } - counter++; - Thread.sleep(100l); - } - - //connect intruder node - String nodeName = NODE2; - String nodeHostPort = "localhost:" + getNextAvailable(_node3HaPort + 1); - File environmentPathFile = new File(node1StorePath, nodeName); - environmentPathFile.mkdirs(); - ReplicationConfig replicationConfig = new ReplicationConfig((String)nodeData.get(BDBHAVirtualHostNode.GROUP_NAME), nodeName, nodeHostPort); - replicationConfig.setHelperHosts((String)nodeData.get(BDBHAVirtualHostNode.ADDRESS)); - EnvironmentConfig envConfig = new EnvironmentConfig(); - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setDurability(Durability.parse((String)nodeData.get(BDBHAVirtualHostNode.DURABILITY))); - - ReplicatedEnvironment intruder = null; - try - { - intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); - } - finally - { - intruder.close(); - } - waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name()); - } - - private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception - { - Map nodeData = createNodeAttributeMap(nodeName, nodePort, helperPort); - - int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData); - assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode); - String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name(); - waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState); - } - - private Map createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception - { - Map nodeData = new HashMap(); - nodeData.put(BDBHAVirtualHostNode.NAME, nodeName); - nodeData.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); - nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); - nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort); - nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort); - nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1); - Map context = new HashMap<>(); - nodeData.put(BDBHAVirtualHostNode.CONTEXT, context); - String bluePrint = HATestClusterCreator.getBlueprint("localhost", _node1HaPort, _node2HaPort, _node3HaPort); - context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint); - return nodeData; - } - - private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception - { - boolean isMaster = nodeName.equals(masterNode); - String expectedRole = isMaster? "MASTER" : "REPLICA"; - waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole); - - Map nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0"); - assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME)); - assertEquals("Unexpected type", "BDB_HA", nodeData.get(BDBHAVirtualHostNode.TYPE)); - assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS)); - assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS)); - assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME)); - assertEquals("Unexpected role", expectedRole, nodeData.get(BDBHAVirtualHostNode.ROLE)); - - Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); - assertNotNull("Unexpected lastKnownReplicationId", lastKnownTransactionId); - assertTrue("Unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0); - - Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME); - assertNotNull("Unexpected joinTime", joinTime); - assertTrue("Unexpected joinTime " + joinTime, joinTime > 0); - - if (isMaster) - { - waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name()); - } - - } - - private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception - { - List clusterNodes = new ArrayList(Arrays.asList(replicaNodes)); - clusterNodes.add(masterNode); - - for (String clusterNodeName : clusterNodes) - { - List remotes = new ArrayList(clusterNodes); - remotes.remove(clusterNodeName); - for (String remote : remotes) - { - String remoteUrl = "replicationnode/" + clusterNodeName + "/" + remote; - Map nodeData = waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA"); - assertRemoteNodeData(remote, nodeData); - } - } - } - - private void assertRemoteNodeData(String name, Map nodeData) - { - assertEquals("Remote node " + name + " has unexpected name", name, nodeData.get(BDBHAVirtualHostNode.NAME)); - - Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); - assertNotNull("Node " + name + " has unexpected lastKnownReplicationId", lastKnownTransactionId); - assertTrue("Node " + name + " has unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0); - - Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME); - assertNotNull("Node " + name + " has unexpected joinTime", joinTime); - assertTrue("Node " + name + " has unexpected joinTime " + joinTime, joinTime > 0); - } - - private void assertActualAndDesiredStates(final String restUrl, - final String expectedDesiredState, - final String expectedActualState) throws IOException - { - Map objectData = getRestTestHelper().getJsonAsSingletonList(restUrl); - Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, objectData); - } - - private void mutateDesiredState(final String restUrl, final String newState) throws IOException - { - Map newAttributes = new HashMap(); - newAttributes.put(VirtualHostNode.DESIRED_STATE, newState); - - getRestTestHelper().submitRequest(restUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); - } - - private Map findRemoteNodeByName(final List> remoteNodes, final String nodeName) - { - Map foundNode = null; - for (Map remoteNode : remoteNodes) - { - if (nodeName.equals(remoteNode.get(RemoteReplicationNode.NAME))) - { - foundNode = remoteNode; - break; - } - } - assertNotNull("Could not find node with name " + nodeName + " amongst remote nodes."); - return foundNode; - } - - private Map awaitNewMaster(final String... nodeUrls) - throws IOException, InterruptedException - { - Map newMasterData = null; - int counter = 0; - while (newMasterData == null && counter < 50) - { - for(String nodeUrl: nodeUrls) - { - Map nodeData = getRestTestHelper().getJsonAsSingletonList(nodeUrl); - if ("MASTER".equals(nodeData.get(BDBHAVirtualHostNode.ROLE))) - { - newMasterData = nodeData; - break; - } - } - if (newMasterData == null) - { - Thread.sleep(100l); - counter++; - } - } - assertNotNull("Could not find new master", newMasterData); - return newMasterData; - } - - -} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java deleted file mode 100644 index 334544e334..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; -import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; - -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import javax.servlet.http.HttpServletResponse; - -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; -import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; -import org.apache.qpid.systest.rest.Asserts; -import org.apache.qpid.systest.rest.QpidRestTestCase; -import org.apache.qpid.test.utils.TestBrokerConfiguration; -import org.apache.qpid.util.FileUtils; - -public class BDBHAVirtualHostRestTest extends QpidRestTestCase -{ - private String _hostName; - private File _storeBaseDir; - private int _nodeHaPort; - private Object _nodeName; - private String _virtualhostUrl; - private String _bluePrint; - - @Override - public void setUp() throws Exception - { - setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); - _hostName = "ha"; - _nodeName = "node1"; - _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis()); - _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1); - _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName; - _bluePrint = HATestClusterCreator.getBlueprint("localhost", _nodeHaPort); - - super.setUp(); - } - - @Override - public void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - if (_storeBaseDir != null) - { - FileUtils.delete(_storeBaseDir, true); - } - } - } - - @Override - protected void customizeConfiguration() throws IOException - { - super.customizeConfiguration(); - TestBrokerConfiguration config = getBrokerConfiguration(); - config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); - config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); - - Map nodeAttributes = new HashMap(); - nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName); - nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); - nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName); - nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); - nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + _nodeHaPort); - nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort); - nodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, _nodeName); - Map context = new HashMap(); - context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, _bluePrint); - - nodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context); - config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes); - } - - public void testSetLocalTransactionSynchronizationPolicy() throws Exception - { - Map hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); - assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); - - Map newPolicy = Collections.singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC"); - getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK); - - hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); - assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); - } - - public void testSetRemoteTransactionSynchronizationPolicy() throws Exception - { - Map hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); - assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); - - Map newPolicy = Collections.singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC"); - getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK); - - hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); - assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); - } - - public void testMutateState() throws Exception - { - waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE"); - assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE"); - - Map newAttributes = Collections.singletonMap(VirtualHost.DESIRED_STATE, "STOPPED"); - getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); - - waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED"); - assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED"); - - newAttributes = Collections.singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE"); - getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); - - waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE"); - assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE"); - } - - private void assertActualAndDesireStates(final String restUrl, - final String expectedDesiredState, - final String expectedActualState) throws IOException - { - Map virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl); - Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost); - } - -} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java deleted file mode 100644 index 491856d953..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ /dev/null @@ -1,548 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.io.InputStream; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularDataSupport; - -import org.apache.qpid.management.common.mbeans.ManagedExchange; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestBrokerConfiguration; -import org.apache.qpid.util.FileUtils; -import org.apache.qpid.util.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tests upgrading a BDB store on broker startup. - * The store will then be used to verify that the upgrade is completed - * properly and that once upgraded it functions as expected. - * - * Store prepared using old client/broker with BDBStoreUpgradeTestPreparer. - */ -public class BDBUpgradeTest extends QpidBrokerTestCase -{ - protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class); - - private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK"); - - private static final String STRING_1024 = generateString(1024); - private static final String STRING_1024_256 = generateString(1024*256); - - private static final String TOPIC_NAME="myUpgradeTopic"; - private static final String SUB_NAME="myDurSubName"; - private static final String SELECTOR_SUB_NAME="mySelectorDurSubName"; - private static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic"; - private static final String QUEUE_NAME="myUpgradeQueue"; - private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; - private static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; - private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; - - private String _storeLocation; - - @Override - public void setUp() throws Exception - { - assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); - Map virtualHostNodeAttributes = getBrokerConfiguration().getObjectAttributes(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST); - _storeLocation = Strings.expand((String)virtualHostNodeAttributes.get(BDBVirtualHostNode.STORE_PATH)); - - //Clear the two target directories if they exist. - File directory = new File(_storeLocation); - if (directory.exists() && directory.isDirectory()) - { - FileUtils.delete(directory, true); - } - directory.mkdirs(); - - // copy store files - InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb"); - FileUtils.copy(src, new File(_storeLocation, "00000000.jdb")); - - getBrokerConfiguration().addJmxManagementConfiguration(); - super.setUp(); - } - - /** - * Test that the selector applied to the DurableSubscription was successfully - * transfered to the new store, and functions as expected with continued use - * by monitoring message count while sending new messages to the topic and then - * consuming them. - */ - public void testSelectorDurability() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME); - assertEquals("DurableSubscription backing queue should have 1 message on it initially", - new Integer(1), dursubQueue.getMessageCount()); - - // Create a connection and start it - TopicConnection connection = (TopicConnection) getConnection(); - connection.start(); - - // Send messages which don't match and do match the selector, checking message count - TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); - Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME); - TopicPublisher publisher = pubSession.createPublisher(topic); - - publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); - pubSession.commit(); - assertEquals("DurableSubscription backing queue should still have 1 message on it", - Integer.valueOf(1), dursubQueue.getMessageCount()); - - publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); - pubSession.commit(); - assertEquals("DurableSubscription backing queue should now have 2 messages on it", - Integer.valueOf(2), dursubQueue.getMessageCount()); - - TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); - Message m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - pubSession.commit(); - - pubSession.close(); - } - finally - { - jmxUtils.close(); - } - } - - /** - * Test that the DurableSubscription without selector was successfully - * transfered to the new store, and functions as expected with continued use. - */ - public void testDurableSubscriptionWithoutSelector() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); - assertEquals("DurableSubscription backing queue should have 1 message on it initially", - new Integer(1), dursubQueue.getMessageCount()); - - // Create a connection and start it - TopicConnection connection = (TopicConnection) getConnection(); - connection.start(); - - // Send new message matching the topic, checking message count - TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED); - Topic topic = session.createTopic(TOPIC_NAME); - TopicPublisher publisher = session.createPublisher(topic); - - publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent"); - session.commit(); - assertEquals("DurableSubscription backing queue should now have 2 messages on it", - Integer.valueOf(2), dursubQueue.getMessageCount()); - - TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME); - Message m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - - session.commit(); - session.close(); - } - finally - { - jmxUtils.close(); - } - } - - /** - * Test that the backing queue for the durable subscription created was successfully - * detected and set as being exclusive during the upgrade process, and that the - * regular queue was not. - */ - public void testQueueExclusivity() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME); - assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive()); - - ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); - assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive()); - } - finally - { - jmxUtils.close(); - } - } - - /** - * Test that the upgraded queue continues to function properly when used - * for persistent messaging and restarting the broker. - * - * Sends the new messages to the queue BEFORE consuming those which were - * sent before the upgrade. In doing so, this also serves to test that - * the queue bindings were successfully transitioned during the upgrade. - */ - public void testBindingAndMessageDurabability() throws Exception - { - // Create a connection and start it - TopicConnection connection = (TopicConnection) getConnection(); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(QUEUE_NAME); - MessageProducer messageProducer = session.createProducer(queue); - - // Send a new message - sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1); - - session.close(); - - // Restart the broker - restartBroker(); - - // Drain the queue of all messages - connection = (TopicConnection) getConnection(); - connection.start(); - consumeQueueMessages(connection, true); - } - - /** - * Test that all of the committed persistent messages previously sent to - * the broker are properly received following update of the MetaData and - * Content entries during the store upgrade process. - */ - public void testConsumptionOfUpgradedMessages() throws Exception - { - // Create a connection and start it - Connection connection = getConnection(); - connection.start(); - - consumeDurableSubscriptionMessages(connection, true); - consumeDurableSubscriptionMessages(connection, false); - consumeQueueMessages(connection, false); - } - - /** - * Tests store migration containing messages for non-existing queue. - * - * @throws Exception - */ - public void testMigrationOfMessagesForNonDurableQueues() throws Exception - { - // Create a connection and start it - Connection connection = getConnection(); - connection.start(); - - // consume a message for non-existing store - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME); - MessageConsumer messageConsumer = session.createConsumer(queue); - - for (int i = 1; i <= 3; i++) - { - Message message = messageConsumer.receive(1000); - assertNotNull("Message was not migrated!", message); - assertTrue("Unexpected message received!", message instanceof TextMessage); - assertEquals("ID property did not match", i, message.getIntProperty("ID")); - } - } - - /** - * Tests store upgrade has maintained the priority queue configuration, - * such that sending messages with priorities out-of-order and then consuming - * them gets the messages back in priority order. - */ - public void testPriorityQueue() throws Exception - { - // Create a connection and start it - Connection connection = getConnection(); - connection.start(); - - // send some messages to the priority queue - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(PRIORITY_QUEUE_NAME); - MessageProducer producer = session.createProducer(queue); - - producer.setPriority(4); - producer.send(createMessage(1, false, session, producer)); - producer.setPriority(1); - producer.send(createMessage(2, false, session, producer)); - producer.setPriority(9); - producer.send(createMessage(3, false, session, producer)); - session.close(); - - //consume the messages, expected order: msg 3, msg 1, msg 2. - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - - Message msg = consumer.receive(1500); - assertNotNull("expected message was not received", msg); - assertEquals(3, msg.getIntProperty("msg")); - msg = consumer.receive(1500); - assertNotNull("expected message was not received", msg); - assertEquals(1, msg.getIntProperty("msg")); - msg = consumer.receive(1500); - assertNotNull("expected message was not received", msg); - assertEquals(2, msg.getIntProperty("msg")); - } - - /** - * - * TODO (QPID-5650) Resolve so this test can be reenabled. - * - * Test that the queue configured to have a DLQ was recovered and has the alternate exchange - * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the - * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ. - * - * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments - * that turned it on for this specific queue. - */ - public void xtestRecoveryOfQueueWithDLQ() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ - ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE"); - assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType()); - TabularDataSupport bindings = (TabularDataSupport) exchange.bindings(); - assertEquals(1, bindings.size()); - for(Object o : bindings.values()) - { - CompositeData binding = (CompositeData) o; - - String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY); - String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES); - - //Because its a fanout exchange, we just return a single '*' key with all bound queues - assertEquals("unexpected binding key", "*", bindingKey); - assertEquals("unexpected number of queues bound", 1, queueNames.length); - assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]); - } - - //verify the queue exists, has the expected alternate exchange and max delivery count - ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME); - assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange()); - assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount()); - - ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ"); - assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange()); - assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount()); - - String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ"); - assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString)); - } - finally - { - jmxUtils.close(); - } - } - - private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = null; - TopicSubscriber durSub = null; - - if(selector) - { - topic = session.createTopic(SELECTOR_TOPIC_NAME); - durSub = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); - } - else - { - topic = session.createTopic(TOPIC_NAME); - durSub = session.createDurableSubscriber(topic, SUB_NAME); - } - - - // Retrieve the matching message - Message m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - if(selector) - { - assertEquals("Selector property did not match", "true", m.getStringProperty("testprop")); - } - assertEquals("ID property did not match", 1, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", generateString(1024) , ((TextMessage)m).getText()); - - // Verify that no more messages are received - m = durSub.receive(1000); - assertNull("No more messages should have been recieved", m); - - durSub.close(); - session.close(); - } - - private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(QUEUE_NAME); - - MessageConsumer consumer = session.createConsumer(queue); - Message m; - - // Retrieve the initial pre-upgrade messages - for (int i=1; i <= 5 ; i++) - { - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", i, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); - } - for (int i=1; i <= 5 ; i++) - { - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", i, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText()); - } - - if(extraMessage) - { - //verify that the extra message is received - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", 1, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); - } - - // Verify that no more messages are received - m = consumer.receive(1000); - assertNull("No more messages should have been recieved", m); - - consumer.close(); - session.close(); - } - - private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException - { - Message send = producerSession.createTextMessage("Message: " + msgId); - send.setIntProperty("msg", msgId); - - return send; - } - - /** - * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. - * - * @param length number of characters in the string - * @return string sequence of the given length - */ - private static String generateString(int length) - { - char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; - char[] chars = new char[length]; - for (int i = 0; i < (length); i++) - { - chars[i] = base_chars[i % 10]; - } - return new String(chars); - } - - private static void sendMessages(Session session, MessageProducer messageProducer, - Destination dest, int deliveryMode, int length, int numMesages) throws JMSException - { - for (int i = 1; i <= numMesages; i++) - { - Message message = session.createTextMessage(generateString(length)); - message.setIntProperty("ID", i); - messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - } - } - - private static void publishMessages(Session session, TopicPublisher publisher, - Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException - { - for (int i = 1; i <= numMesages; i++) - { - Message message = session.createTextMessage(generateString(length)); - message.setIntProperty("ID", i); - message.setStringProperty("testprop", selectorProperty); - publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - } - } -} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java deleted file mode 100644 index 9867ce2eca..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Session; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestUtils; - -/** - * The HA black box tests test the BDB cluster as a opaque unit. Client connects to - * the cluster via a failover url - * - * @see HAClusterWhiteboxTest - */ -public class HAClusterBlackboxTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class); - - private static final String VIRTUAL_HOST = "test"; - private static final int NUMBER_OF_NODES = 3; - - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - - private FailoverAwaitingListener _failoverListener; - private ConnectionURL _brokerFailoverUrl; - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - - setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - - _clusterCreator.configureClusterNodes(); - - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - - _clusterCreator.startCluster(); - _failoverListener = new FailoverAwaitingListener(); - - super.setUp(); - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - public void testLossOfMasterNodeCausesClientToFailover() throws Exception - { - final Connection connection = getConnection(_brokerFailoverUrl); - - ((AMQConnection)connection).setConnectionListener(_failoverListener); - - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port " + activeBrokerPort); - - _clusterCreator.stopNode(activeBrokerPort); - LOGGER.info("Node is stopped"); - _failoverListener.awaitFailoverCompletion(20000); - LOGGER.info("Listener has finished"); - // any op to ensure connection remains - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception - { - LOGGER.info("Connecting to " + _brokerFailoverUrl); - final Connection connection = getConnection(_brokerFailoverUrl); - LOGGER.info("Got connection to cluster"); - - ((AMQConnection)connection).setConnectionListener(_failoverListener); - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port " + activeBrokerPort); - final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); - - LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); - - _clusterCreator.stopNode(inactiveBrokerPort); - - _failoverListener.assertNoFailoverCompletionWithin(2000); - - // any op to ensure connection remains - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - public void testTransferMasterFromLocalNode() throws Exception - { - final Connection connection = getConnection(_brokerFailoverUrl); - - ((AMQConnection)connection).setConnectionListener(_failoverListener); - - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port " + activeBrokerPort); - - final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); - LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); - - Map attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); - assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); - - _failoverListener.awaitFailoverCompletion(20000); - LOGGER.info("Listener has finished"); - - attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); - assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); - - assertProducingConsuming(connection); - - _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); - } - - public void testTransferMasterFromRemoteNode() throws Exception - { - final Connection connection = getConnection(_brokerFailoverUrl); - - ((AMQConnection)connection).setConnectionListener(_failoverListener); - - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port " + activeBrokerPort); - - final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); - LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); - - _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); - Map attributes = _clusterCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); - assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - - _clusterCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); - - _failoverListener.awaitFailoverCompletion(20000); - LOGGER.info("Listener has finished"); - - attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); - assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); - - assertProducingConsuming(connection); - - _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); - } - - public void testQuorumOverride() throws Exception - { - final Connection connection = getConnection(_brokerFailoverUrl); - - ((AMQConnection)connection).setConnectionListener(_failoverListener); - - Set ports = _clusterCreator.getBrokerPortNumbersForNodes(); - - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - ports.remove(activeBrokerPort); - - // Stop all other nodes - for (Integer p : ports) - { - _clusterCreator.stopNode(p); - } - - Map attributes = _clusterCreator.getNodeAttributes(activeBrokerPort); - assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); - _clusterCreator.setNodeAttributes(activeBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); - - attributes = _clusterCreator.getNodeAttributes(activeBrokerPort); - assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); - - assertProducingConsuming(connection); - } - - public void testPriority() throws Exception - { - final Connection connection = getConnection(_brokerFailoverUrl); - - ((AMQConnection)connection).setConnectionListener(_failoverListener); - - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port " + activeBrokerPort); - - int priority = 1; - Integer highestPriorityBrokerPort = null; - Set ports = _clusterCreator.getBrokerPortNumbersForNodes(); - for (Integer port : ports) - { - if (activeBrokerPort != port.intValue()) - { - priority = priority + 1; - highestPriorityBrokerPort = port; - _clusterCreator.setNodeAttributes(port, port, Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY, priority)); - Map attributes = _clusterCreator.getNodeAttributes(port, port); - assertEquals("Broker has unexpected priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY)); - } - } - - LOGGER.info("Broker on port " + highestPriorityBrokerPort + " has the highest priority of " + priority); - - LOGGER.info("Shutting down the MASTER"); - _clusterCreator.stopNode(activeBrokerPort); - - _failoverListener.awaitFailoverCompletion(20000); - LOGGER.info("Listener has finished"); - - Map attributes = _clusterCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort); - assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); - - assertProducingConsuming(connection); - } - - private final class FailoverAwaitingListener implements ConnectionListener - { - private final CountDownLatch _failoverCompletionLatch = new CountDownLatch(1); - - @Override - public boolean preResubscribe() - { - return true; - } - - @Override - public boolean preFailover(boolean redirect) - { - return true; - } - - public void awaitFailoverCompletion(long delay) throws InterruptedException - { - if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS)) - { - LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n"); - } - assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount()); - } - - public void assertNoFailoverCompletionWithin(long delay) throws InterruptedException - { - _failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS); - assertEquals("Failover occurred unexpectedly", 1L, _failoverCompletionLatch.getCount()); - } - - @Override - public void failoverComplete() - { - _failoverCompletionLatch.countDown(); - } - - @Override - public void bytesSent(long count) - { - } - - @Override - public void bytesReceived(long count) - { - } - } - -} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java deleted file mode 100644 index 0ab10cc318..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.jms.Connection; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; - -import org.apache.log4j.Logger; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; -import org.apache.qpid.systest.rest.RestTestHelper; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.junit.Assert; - -/** - * System test verifying the ability to control a cluster via the Management API. - * - * @see HAClusterBlackboxTest - */ -public class HAClusterManagementTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); - - private static final Set NON_MASTER_STATES = new HashSet(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; - private static final String VIRTUAL_HOST = "test"; - - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); - private static final int NUMBER_OF_NODES = 4; - - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); - - private ConnectionURL _brokerFailoverUrl; - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - _clusterCreator.configureClusterNodes(); - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - _clusterCreator.startCluster(); - - super.setUp(); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _jmxUtils.close(); - } - finally - { - super.tearDown(); - } - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - public void testReadonlyMBeanAttributes() throws Exception - { - final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); - final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); - assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); - assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); - assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); - assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); - // As we have chosen an arbitrary broker from the cluster, we cannot predict its state - assertNotNull("Store state must not be null", storeBean.getNodeState()); - } - - public void testStateOfActiveBrokerIsMaster() throws Exception - { - final Connection activeConnection = getConnection(_brokerFailoverUrl); - final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); - assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); - } - - public void testStateOfNonActiveBrokerIsNotMaster() throws Exception - { - final Connection activeConnection = getConnection(_brokerFailoverUrl); - final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); - final String nodeState = storeBean.getNodeState(); - assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); - } - - public void testGroupMembers() throws Exception - { - final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); - awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); - - final TabularData groupMembers = storeBean.getAllNodesInGroup(); - assertNotNull(groupMembers); - - for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) - { - final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); - final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); - - CompositeData row = groupMembers.get(new Object[] {nodeName}); - assertNotNull("Table does not contain row for node name " + nodeName, row); - assertEquals(nodeHostPort, row.get(ManagedBDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); - } - } - - public void testRemoveRemoteNodeFromGroup() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); - final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); - awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); - - final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); - _clusterCreator.stopNode(brokerPortNumberToBeRemoved); - - storeBean.removeNodeFromGroup(removedNodeName); - - long limitTime = System.currentTimeMillis() + 5000; - while((NUMBER_OF_NODES == storeBean.getAllNodesInGroup().size()) && System.currentTimeMillis() < limitTime) - { - Thread.sleep(100l); - } - - int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); - assertEquals("Unexpected number of data rows after test", NUMBER_OF_NODES - 1, numberOfDataRowsAfterRemoval); - } - - public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception - { - final Connection activeConnection = getConnection(_brokerFailoverUrl); - final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); - - ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); - - try - { - inactiveBroker.createNewQueue(getTestQueueName(), null, true); - fail("Exception not thrown"); - } - catch (Exception e) - { - String message = e.getMessage(); - assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); - } - - try - { - inactiveBroker.createNewExchange(getName(), "direct", true); - fail("Exception not thrown"); - } - catch (Exception e) - { - String message = e.getMessage(); - assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); - } - } - - public void testSetDesignatedPrimary() throws Exception - { - int brokerPort = _clusterCreator.getBrokerPortNumbersForNodes().iterator().next(); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort); - assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary()); - storeBean.setDesignatedPrimary(true); - long limit = System.currentTimeMillis() + 5000; - while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) - { - Thread.sleep(100l); - } - assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary()); - } - - public void testVirtualHostMbeanOnMasterTransfer() throws Exception - { - Connection connection = getConnection(_brokerFailoverUrl); - int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port " + activeBrokerPort); - connection.close(); - - Set ports = _clusterCreator.getBrokerPortNumbersForNodes(); - ports.remove(activeBrokerPort); - - int inactiveBrokerPort = ports.iterator().next(); - LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); - - ManagedBroker inactiveVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort); - - try - { - inactiveVirtualHostMBean.createNewQueue(getTestQueueName(), null, true); - fail("Exception not thrown"); - } - catch (Exception e) - { - String message = e.getMessage(); - assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); - } - - Map attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); - assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); - - _clusterCreator.awaitNodeToAttainRole(inactiveBrokerPort, "MASTER"); - - awaitVirtualHostAtNode(inactiveBrokerPort); - - ManagedBroker activeVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort); - activeVirtualHostMBean.createNewQueue(getTestQueueName() + inactiveBrokerPort, null, true); - } - - public void awaitVirtualHostAtNode(int brokerPort) throws Exception - { - final long startTime = System.currentTimeMillis(); - Map data = Collections.emptyMap(); - String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort); - RestTestHelper restHelper = _clusterCreator.createRestTestHelper(brokerPort); - while(!State.ACTIVE.name().equals(data.get(VirtualHost.STATE)) && (System.currentTimeMillis() - startTime) < 30000) - { - LOGGER.debug("Awaiting virtual host '" + nodeName + "' to transit into active state"); - List> results= restHelper.getJsonAsList("virtualhost/" + nodeName + "/" + VIRTUAL_HOST); - if (results.size()== 1) - { - data = results.get(0); - } - - if (!State.ACTIVE.name().equals(data.get(VirtualHost.STATE))) - { - Thread.sleep(1000); - } - } - Assert.assertEquals("Virtual host is not active", State.ACTIVE.name(), data.get(VirtualHost.STATE)); - LOGGER.debug("Virtual host '" + nodeName + "' is in active state"); - } - - private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception - { - _jmxUtils.open(brokerPortNumber); - - return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); - } - - private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception - { - _jmxUtils.open(brokerPortNumber); - - return _jmxUtils.getManagedBroker(VIRTUAL_HOST); - } - - private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception - { - long totalTimeWaited = 0l; - long waitInterval = 100l; - long maxWaitTime = 10000; - - int currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); - while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime) - { - LOGGER.debug("Still awaiting nodes to join group; expecting " - + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes - + " after " + totalTimeWaited + " ms."); - - totalTimeWaited += waitInterval; - Thread.sleep(waitInterval); - - currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); - } - - assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms", - expectedNumberOfNodes ,currentNumberOfNodes); - } -} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java deleted file mode 100644 index 8df419c3a7..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.management.ObjectName; - -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -public class HAClusterTwoNodeTest extends QpidBrokerTestCase -{ - private static final String VIRTUAL_HOST = "test"; - - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); - private static final int NUMBER_OF_NODES = 2; - - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); - - private ConnectionURL _brokerFailoverUrl; - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - - super.setUp(); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _jmxUtils.close(); - } - finally - { - super.tearDown(); - } - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - private void startCluster(boolean designedPrimary) throws Exception - { - setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - _clusterCreator.configureClusterNodes(); - _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - _clusterCreator.startCluster(); - } - - public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception - { - startCluster(true); - final Connection initialConnection = getConnection(_brokerFailoverUrl); - int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - assertProducingConsuming(initialConnection); - initialConnection.close(); - _clusterCreator.stopCluster(); - _clusterCreator.startNode(masterPort); - final Connection secondConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(secondConnection); - secondConnection.close(); - } - - public void testClusterRestartWithoutDesignatedPrimary() throws Exception - { - startCluster(false); - final Connection initialConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(initialConnection); - initialConnection.close(); - _clusterCreator.stopCluster(); - _clusterCreator.startClusterParallel(); - final Connection secondConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(secondConnection); - secondConnection.close(); - } - - public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception - { - startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - final Connection connection = getConnection(_brokerFailoverUrl); - assertNotNull("Expected to get a valid connection to primary", connection); - assertProducingConsuming(connection); - } - - public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception - { - startCluster(false); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - - try - { - Connection connection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(connection); - fail("Exception not thrown"); - } - catch(JMSException e) - { - // JMSException should be thrown either on getConnection, or produce/consume - // depending on whether the relative timing of the node discovering that the - // secondary has gone. - } - } - - public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception - { - startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); - - try - { - getConnection(_brokerFailoverUrl); - fail("Connection not expected"); - } - catch (JMSException e) - { - // PASS - } - } - - public void testInitialDesignatedPrimaryStateOfNodes() throws Exception - { - startCluster(true); - final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); - assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); - - final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); - } - - public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception - { - startCluster(true); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); - - assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); - storeBean.setDesignatedPrimary(true); - - long limit = System.currentTimeMillis() + 5000; - while( !storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) - { - Thread.sleep(100); - } - assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); - - final Connection connection = getConnection(_brokerFailoverUrl); - assertNotNull("Expected to get a valid connection to new primary", connection); - assertProducingConsuming(connection); - } - - private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( - final int activeBrokerPortNumber) throws Exception - { - _jmxUtils.open(activeBrokerPortNumber); - - ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); - return storeBean; - } - -} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java deleted file mode 100644 index ef5cc7c464..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.util.Set; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.URLSyntaxException; - -/** - * The HA white box tests test the BDB cluster where the test retains the knowledge of the - * individual test nodes. It uses this knowledge to examine the nodes to ensure that they - * remain in the correct state throughout the test. - * - * @see HAClusterBlackboxTest - */ -public class HAClusterWhiteboxTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class); - - private static final String VIRTUAL_HOST = "test"; - - private final int NUMBER_OF_NODES = 3; - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - - setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - - _clusterCreator.configureClusterNodes(); - _clusterCreator.startCluster(); - - super.setUp(); - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - public void testClusterPermitsConnectionToOnlyOneNode() throws Exception - { - int connectionSuccesses = 0; - int connectionFails = 0; - - for (int brokerPortNumber : getBrokerPortNumbers()) - { - try - { - getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); - connectionSuccesses++; - } - catch(JMSException e) - { - assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); - connectionFails++; - } - } - - assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails); - assertEquals("Unexpected number of successful connections", 1, connectionSuccesses); - } - - public void testClusterThatLosesNodeStillAllowsConnection() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - assertNotNull(initialConnection); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - - // verify that JMS persistence operations are working - assertProducingConsuming(subsequentConnection); - - closeConnection(initialConnection); - } - - public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - assertNotNull(initialConnection); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection); - - killBroker(subsequentPortNumber); - - final Connection finalConnection = getConnectionToNodeInCluster(); - assertNull(finalConnection); - - closeConnection(initialConnection); - } - - public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception - { - final Connection connection = getConnectionToNodeInCluster(); - assertNotNull(connection); - - final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection); - connection.close(); - - _clusterCreator.stopNode(brokerPortNumber); - _clusterCreator.startNode(brokerPortNumber); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - } - - public void testClusterLosingNodeRetainsData() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - - final String queueNamePrefix = getTestQueueName(); - final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'"; - final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'"; - - populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - - assertNotNull("no valid connection obtained", subsequentConnection); - - checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - } - - public void xtestRecoveryOfOutOfDateNode() throws Exception - { - /* - * TODO: Implement - * - * Cant yet find a way to control cleaning in a deterministic way to allow provoking - * a node to become out of date. We do now know that even a new joiner to the group - * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has - * done *any* cleaning and then adding a new node should be sufficient to cause this. - */ - } - - private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception - { - populateBrokerWithData(connection, 1, queueUrls); - } - - private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception - { - final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - for (final String queueUrl : queueUrls) - { - final Queue queue = session.createQueue(queueUrl); - session.createConsumer(queue).close(); - sendMessage(session, queue, noOfMessages); - } - } - - private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException - { - connection.start(); - final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - for (final String queueUrl : queueUrls) - { - final Queue queue = session.createQueue(queueUrl); - final MessageConsumer consumer = session.createConsumer(queue); - final Message message = consumer.receive(1000); - session.commit(); - assertNotNull("Queue " + queue + " should have message", message); - assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX)); - } - } - - private Connection getConnectionToNodeInCluster() throws URLSyntaxException - { - Connection connection = null; - Set runningBrokerPorts = getBrokerPortNumbers(); - - for (int brokerPortNumber : runningBrokerPorts) - { - try - { - connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); - break; - } - catch(JMSException je) - { - assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); - } - } - return connection; - } - - private void closeConnectionAndKillBroker(final Connection initialConnection) throws Exception - { - final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - initialConnection.close(); - - killBroker(initialPortNumber); // kill awaits the death of the child - } - - private void closeConnection(final Connection initialConnection) - { - try - { - initialConnection.close(); - } - catch(Exception e) - { - // ignore. - // java.net.SocketException is seen sometimes on active connection - } - } -} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java deleted file mode 100644 index ebc32b482a..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ /dev/null @@ -1,529 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.io.IOException; -import java.io.StringWriter; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; - -import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.server.management.plugin.HttpManagement; -import org.apache.qpid.server.model.Plugin; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; -import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl; -import org.apache.qpid.systest.rest.RestTestHelper; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestBrokerConfiguration; -import org.apache.qpid.url.URLSyntaxException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.SerializationConfig; -import org.junit.Assert; - -import com.sleepycat.je.rep.ReplicationConfig; - -public class HATestClusterCreator -{ - protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); - - private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; - private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - - private static final int FAILOVER_CYCLECOUNT = 10; - private static final int FAILOVER_RETRIES = 1; - private static final int FAILOVER_CONNECTDELAY = 1000; - - private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; - private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; - - private static final int RETRIES = 60; - private static final int CONNECTDELAY = 75; - - private final QpidBrokerTestCase _testcase; - private final Map _brokerPortToBdbPortMap = new TreeMap(); - private final String _virtualHostName; - - private final String _ipAddressOfBroker; - private final String _groupName ; - private final int _numberOfNodes; - private int _bdbHelperPort; - private int _primaryBrokerPort; - - public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) - { - _testcase = testcase; - _virtualHostName = virtualHostName; - _groupName = virtualHostName; - _ipAddressOfBroker = getIpAddressOfBrokerHost(); - _numberOfNodes = numberOfNodes; - _bdbHelperPort = 0; - } - - public void configureClusterNodes() throws Exception - { - int brokerPort = _testcase.findFreePort(); - - int[] bdbPorts = new int[_numberOfNodes]; - for (int i = 0; i < _numberOfNodes; i++) - { - int bdbPort = _testcase.getNextAvailable(brokerPort + 1); - bdbPorts[i] = bdbPort; - _brokerPortToBdbPortMap.put(brokerPort, bdbPort); - brokerPort = _testcase.getNextAvailable(bdbPort + 1); - } - - String bluePrintJson = getBlueprint(_ipAddressOfBroker, bdbPorts); - - String helperName = null; - for (Map.Entry entry: _brokerPortToBdbPortMap.entrySet()) - { - brokerPort = entry.getKey(); - int bdbPort = entry.getValue(); - LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); - if (_bdbHelperPort == 0) - { - _bdbHelperPort = bdbPort; - } - - String nodeName = getNodeNameForNodeAt(bdbPort); - if (helperName == null) - { - helperName = nodeName; - } - - Map virtualHostNodeAttributes = new HashMap(); - virtualHostNodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); - virtualHostNodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _groupName); - virtualHostNodeAttributes.put(BDBHAVirtualHostNode.NAME, nodeName); - virtualHostNodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, getNodeHostPortForNodeAt(bdbPort)); - virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, getHelperHostPort()); - virtualHostNodeAttributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE); - virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, helperName); - - Map context = new HashMap<>(); - context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); - context.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0"); - context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrintJson); - virtualHostNodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context); - - TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); - brokerConfiguration.addJmxManagementConfiguration(); - brokerConfiguration.addHttpManagementConfiguration(); - brokerConfiguration.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true); - brokerConfiguration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, _testcase.getHttpManagementPort(brokerPort)); - brokerConfiguration.setObjectAttributes(VirtualHostNode.class, _virtualHostName, virtualHostNodeAttributes); - - } - _primaryBrokerPort = getPrimaryBrokerPort(); - } - - public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception - { - if (_numberOfNodes != 2) - { - throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); - } - TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort); - String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(_primaryBrokerPort)); - config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.DESIGNATED_PRIMARY, designatedPrimary); - config.setSaved(false); - } - - private int getPrimaryBrokerPort() - { - return _brokerPortToBdbPortMap.keySet().iterator().next(); - } - - public void startNode(final int brokerPortNumber) throws Exception - { - _testcase.startBroker(brokerPortNumber); - } - - public void startCluster() throws Exception - { - for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) - { - startNode(brokerPortNumber); - } - } - - public void startClusterParallel() throws Exception - { - final ExecutorService executor = Executors.newFixedThreadPool(_brokerPortToBdbPortMap.size()); - try - { - List> brokers = new CopyOnWriteArrayList>(); - for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) - { - final TestBrokerConfiguration brokerConfig = _testcase.getBrokerConfiguration(brokerPortNumber); - Future future = executor.submit(new Callable() - { - public Object call() - { - try - { - _testcase.startBroker(brokerPortNumber, brokerConfig); - return "OK"; - } - catch (Exception e) - { - return e; - } - } - }); - brokers.add(future); - } - for (Future future : brokers) - { - Object result = future.get(30, TimeUnit.SECONDS); - LOGGER.debug("Node startup result:" + result); - if (result instanceof Exception) - { - throw (Exception) result; - } - else if (!"OK".equals(result)) - { - throw new Exception("One of the cluster nodes is not started"); - } - } - } - catch (Exception e) - { - stopCluster(); - throw e; - } - finally - { - executor.shutdown(); - } - - } - - public void stopNode(final int brokerPortNumber) - { - _testcase.killBroker(brokerPortNumber); - } - - public void stopCluster() throws Exception - { - for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) - { - try - { - stopNode(brokerPortNumber); - } - catch(Exception e) - { - LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); - } - } - } - - public int getBrokerPortNumberFromConnection(Connection connection) - { - final AMQConnection amqConnection = (AMQConnection)connection; - return amqConnection.getActiveBrokerDetails().getPort(); - } - - public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) - { - final Set allBrokerPorts = _testcase.getBrokerPortNumbers(); - LOGGER.debug("Broker ports:" + allBrokerPorts); - final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); - allBrokerPorts.remove(activeBrokerPort); - LOGGER.debug("Broker ports:" + allBrokerPorts); - final int inactiveBrokerPort = allBrokerPorts.iterator().next(); - return inactiveBrokerPort; - } - - public int getBdbPortForBrokerPort(final int brokerPortNumber) - { - return _brokerPortToBdbPortMap.get(brokerPortNumber); - } - - public Set getBdbPortNumbers() - { - return new HashSet(_brokerPortToBdbPortMap.values()); - } - - public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception - { - final StringBuilder brokerList = new StringBuilder(); - - for(Iterator itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) - { - int brokerPortNumber = itr.next(); - - brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); - if (itr.hasNext()) - { - brokerList.append(";"); - } - } - - return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); - } - - public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException - { - return getConnectionUrlForSingleNode(brokerPortNumber, false); - } - - public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException - { - return getConnectionUrlForSingleNode(brokerPortNumber, true); - } - - private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException - { - final String url; - if (retryAllowed) - { - url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); - } - else - { - url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); - } - - return new AMQConnectionURL(url); - } - - public String getGroupName() - { - return _groupName; - } - - public String getNodeNameForNodeAt(final int bdbPort) - { - return "node" + _testcase.getName() + bdbPort; - } - - public String getNodeHostPortForNodeAt(final int bdbPort) - { - return _ipAddressOfBroker + ":" + bdbPort; - } - - public String getHelperHostPort() - { - if (_bdbHelperPort == 0) - { - throw new IllegalStateException("Helper port not yet assigned."); - } - - return _ipAddressOfBroker + ":" + _bdbHelperPort; - } - - public void setHelperHostPort(int bdbHelperPort) - { - _bdbHelperPort = bdbHelperPort; - } - - public int getBrokerPortNumberOfPrimary() - { - if (_numberOfNodes != 2) - { - throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); - } - - return _primaryBrokerPort; - } - - public int getBrokerPortNumberOfSecondaryNode() - { - final Set portNumbers = getBrokerPortNumbersForNodes(); - portNumbers.remove(getBrokerPortNumberOfPrimary()); - return portNumbers.iterator().next(); - } - - public Set getBrokerPortNumbersForNodes() - { - return new HashSet(_brokerPortToBdbPortMap.keySet()); - } - - - public String getIpAddressOfBrokerHost() - { - String brokerHost = _testcase.getBroker().getHost(); - try - { - return InetAddress.getByName(brokerHost).getHostAddress(); - } - catch (UnknownHostException e) - { - throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); - } - } - - public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) - { - TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved); - String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPortNumberToBeMoved)); - - Map objectAttributes = config.getObjectAttributes(VirtualHostNode.class, nodeName); - - String oldBdbHostPort = (String)objectAttributes.get(BDBHAVirtualHostNode.ADDRESS); - String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); - String oldHost = oldHostAndPort[0]; - String newBdbHostPort = oldHost + ":" + newBdbPort; - config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.ADDRESS, newBdbHostPort); - config.setSaved(false); - } - - public String getNodeNameForBrokerPort(final int brokerPort) - { - return getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPort)); - } - - public void setNodeAttributes(int brokerPort, Map attributeMap) - throws Exception - { - setNodeAttributes(brokerPort, brokerPort, attributeMap); - } - - public void setNodeAttributes(int localNodePort, int remoteNodePort, Map attributeMap) - throws Exception - { - RestTestHelper restHelper = createRestTestHelper(localNodePort); - String url = getNodeRestUrl(localNodePort, remoteNodePort); - int status = restHelper.submitRequest(url, "PUT", attributeMap); - if (status != 200) - { - throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute's : " + status); - } - } - - private String getNodeRestUrl(int localNodePort, int remoteNodePort) - { - String remoteNodeName = getNodeNameForBrokerPort(remoteNodePort); - String localNodeName = getNodeNameForBrokerPort(localNodePort); - String url = null; - if (localNodePort == remoteNodePort) - { - url = "/api/latest/virtualhostnode/" + localNodeName; - } - else - { - url = "/api/latest/replicationnode/" + localNodeName + "/" + remoteNodeName; - } - return url; - } - - public Map getNodeAttributes(int brokerPort) throws IOException - { - return getNodeAttributes(brokerPort, brokerPort); - } - - public Map getNodeAttributes(int localNodePort, int remoteNodePort) throws IOException - { - RestTestHelper restHelper = createRestTestHelper(localNodePort); - List> results= restHelper.getJsonAsList(getNodeRestUrl(localNodePort, remoteNodePort)); - int size = results.size(); - if (size == 0) - { - return Collections.emptyMap(); - } - else if (size == 1) - { - return results.get(0); - } - else - { - throw new RuntimeException("Unexpected number of nodes " + size); - } - } - - public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception - { - awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole); - } - - public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception - { - final long startTime = System.currentTimeMillis(); - Map data = Collections.emptyMap(); - - while(!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000) - { - LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredRole + " role"); - data = getNodeAttributes(localNodePort, remoteNodePort); - if (!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE))) - { - Thread.sleep(1000); - } - } - LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' role is " + data.get(BDBHARemoteReplicationNode.ROLE)); - Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(BDBHARemoteReplicationNode.ROLE)); - } - - public RestTestHelper createRestTestHelper(int brokerPort) - { - int httpPort = _testcase.getHttpManagementPort(brokerPort); - RestTestHelper helper = new RestTestHelper(httpPort); - helper.setUsernameAndPassword("webadmin", "webadmin"); - return helper; - } - - public static String getBlueprint(String hostName, int... ports) throws Exception - { - List permittedNodes = new ArrayList(); - for (int port:ports) - { - permittedNodes.add(hostName + ":" + port); - } - Map bluePrint = new HashMap<>(); - bluePrint.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE); - bluePrint.put(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes); - - StringWriter writer = new StringWriter(); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); - mapper.writeValue(writer, bluePrint); - return writer.toString(); - } -} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java new file mode 100644 index 0000000000..fab889a49f --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode; +import org.apache.qpid.test.utils.Piper; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.FileUtils; +import org.apache.qpid.util.Strings; +import org.apache.qpid.util.SystemUtils; + +/** + * Tests the BDB backup script can successfully perform a backup and that + * backup can be restored and used by the Broker. + */ +public class BDBBackupTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(BDBBackupTest.class); + + private static final String BACKUP_SCRIPT = "/bin/backup.sh"; + private static final String BACKUP_COMPLETE_MESSAGE = "Hot Backup Completed"; + + private static final String TEST_VHOST = "test"; + private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir"); + + private File _backupToDir; + private File _backupFromDir; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName()); + _backupToDir.mkdirs(); + + Map virtualHostNodeAttributes = getBrokerConfiguration().getObjectAttributes(VirtualHostNode.class, TEST_VHOST); + _backupFromDir = new File(Strings.expand((String) virtualHostNodeAttributes.get(BDBVirtualHostNode.STORE_PATH))); + boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); + assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); + } + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + FileUtils.delete(_backupToDir, true); + } + } + + public void testBackupAndRestoreMaintainsMessages() throws Exception + { + sendNumberedMessages(0, 10); + invokeBdbBackup(_backupFromDir, _backupToDir); + sendNumberedMessages(10, 20); + confirmBrokerHasMessages(0, 20); + stopBroker(); + + deleteStore(_backupFromDir); + replaceStoreWithBackup(_backupToDir, _backupFromDir); + + startBroker(); + confirmBrokerHasMessages(0, 10); + } + + private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(getTestQueueName()); + // Create queue by consumer side-effect + session.createConsumer(destination).close(); + + final int numOfMessages = endIndex - startIndex; + final int batchSize = 0; + sendMessage(session, destination, numOfMessages, startIndex, batchSize); + con.close(); + } + + private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + con.start(); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = startIndex; i < endIndex; i++) + { + Message msg = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message " + i + " not received", msg); + assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); + } + + Message msg = consumer.receive(100); + if(msg != null) + { + fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); + } + con.close(); + } + + private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception + { + if (SystemUtils.isWindows()) + { + BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()}); + } + else + { + runBdbBackupScript(backupFromDir, backupToDir); + } + } + + private void runBdbBackupScript(final File backupFromDir, final File backupToDir) throws IOException, + InterruptedException + { + Process backupProcess = null; + try + { + String qpidHome = System.getProperty(QPID_HOME); + ProcessBuilder pb = new ProcessBuilder(qpidHome + BACKUP_SCRIPT, "-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()); + pb.redirectErrorStream(true); + Map env = pb.environment(); + env.put(QPID_HOME, qpidHome); + + LOGGER.debug("Backup command is " + pb.command()); + backupProcess = pb.start(); + Piper piper = new Piper(backupProcess.getInputStream(), _testcaseOutputStream, null, BACKUP_COMPLETE_MESSAGE); + piper.start(); + piper.await(2, TimeUnit.SECONDS); + backupProcess.waitFor(); + piper.join(); + + LOGGER.debug("Backup command completed " + backupProcess.exitValue()); + assertEquals("Unexpected exit value from backup script", 0, backupProcess.exitValue()); + } + finally + { + if (backupProcess != null) + { + backupProcess.getErrorStream().close(); + backupProcess.getInputStream().close(); + backupProcess.getOutputStream().close(); + } + } + } + + private void replaceStoreWithBackup(File source, File dst) throws Exception + { + LOGGER.debug("Copying store " + source + " to " + dst); + FileUtils.copyRecursive(source, dst); + } + + private void deleteStore(File storeDir) + { + LOGGER.debug("Deleting store " + storeDir); + FileUtils.delete(storeDir, true); + } + +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java new file mode 100644 index 0000000000..a3c864d427 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java @@ -0,0 +1,455 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.servlet.http.HttpServletResponse; + +import com.sleepycat.je.Durability; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; +import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.Asserts; +import org.apache.qpid.systest.rest.QpidRestTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase +{ + private static final String NODE1 = "node1"; + private static final String NODE2 = "node2"; + private static final String NODE3 = "node3"; + + private int _node1HaPort; + private int _node2HaPort; + private int _node3HaPort; + + private String _hostName; + private String _baseNodeRestUrl; + + @Override + public void setUp() throws Exception + { + setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); + + super.setUp(); + _hostName = getTestName(); + _baseNodeRestUrl = "virtualhostnode/"; + + _node1HaPort = findFreePort(); + _node2HaPort = getNextAvailable(_node1HaPort + 1); + _node3HaPort = getNextAvailable(_node2HaPort + 1); + + + } + + @Override + protected void customizeConfiguration() throws IOException + { + super.customizeConfiguration(); + TestBrokerConfiguration config = getBrokerConfiguration(); + config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); + config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); + } + + public void testCreate3NodeGroup() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + createHANode(NODE2, _node2HaPort, _node1HaPort); + assertNode(NODE2, _node2HaPort, _node1HaPort, NODE1); + createHANode(NODE3, _node3HaPort, _node1HaPort); + assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE2, NODE3); + } + + public void testMutateStateOfOneNode() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + String node1Url = _baseNodeRestUrl + NODE1; + String node2Url = _baseNodeRestUrl + NODE2; + String node3Url = _baseNodeRestUrl + NODE3; + + assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + mutateDesiredState(node1Url, "STOPPED"); + + assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + List> remoteNodes = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); + assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size()); + + Map remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1); + + assertEquals("Node 1 observed from node 2 is in the wrong state", + "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE)); + assertEquals("Node 1 observed from node 2 has the wrong role", + "UNKNOWN", remoteNode1.get(BDBHARemoteReplicationNode.ROLE)); + + } + + public void testNewMasterElectedWhenVirtualHostIsStopped() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + String node1Url = _baseNodeRestUrl + NODE1; + String node2Url = _baseNodeRestUrl + NODE2; + String node3Url = _baseNodeRestUrl + NODE3; + + assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + // Put virtualhost in STOPPED state + String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName; + assertActualAndDesiredStates(virtualHostRestUrl, "ACTIVE", "ACTIVE"); + mutateDesiredState(virtualHostRestUrl, "STOPPED"); + assertActualAndDesiredStates(virtualHostRestUrl, "STOPPED", "STOPPED"); + + // Now stop node 1 to cause an election between nodes 2 & 3 + mutateDesiredState(node1Url, "STOPPED"); + assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); + + Map newMasterData = awaitNewMaster(node2Url, node3Url); + + //Check the virtual host of the new master is in the stopped state + String newMasterVirtualHostRestUrl = "virtualhost/" + newMasterData.get(BDBHAVirtualHostNode.NAME) + "/" + _hostName; + assertActualAndDesiredStates(newMasterVirtualHostRestUrl, "STOPPED", "STOPPED"); + } + + public void testDeleteReplicaNode() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + assertRemoteNodes(NODE1, NODE2, NODE3); + + List> data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE1); + assertEquals("Unexpected number of remote nodes on " + NODE1, 2, data.size()); + + int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "DELETE"); + assertEquals("Unexpected response code on deletion of virtual host node " + NODE2, 200, responseCode); + + int counter = 0; + while (data.size() != 1 && counter<50) + { + data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE1); + if (data.size() != 1) + { + Thread.sleep(100l); + } + counter++; + } + assertEquals("Unexpected number of remote nodes on " + NODE1, 1, data.size()); + } + + public void testDeleteMasterNode() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE2, NODE3); + + // change priority to make Node2 a master + int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY, 100)); + assertEquals("Unexpected response code on priority update of virtual host node " + NODE2, 200, responseCode); + + List> data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); + assertEquals("Unexpected number of remote nodes on " + NODE2, 2, data.size()); + + // delete master + responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE"); + assertEquals("Unexpected response code on deletion of virtual host node " + NODE1, 200, responseCode); + + // wait for new master + waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER"); + + // delete remote node + responseCode = getRestTestHelper().submitRequest("replicationnode/" + NODE2 + "/" + NODE1, "DELETE"); + assertEquals("Unexpected response code on deletion of remote node " + NODE1, 200, responseCode); + + int counter = 0; + while (data.size() != 1 && counter<50) + { + data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); + if (data.size() != 1) + { + Thread.sleep(100l); + } + counter++; + } + assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size()); + } + + public void testIntruderBDBHAVHNNotAllowedNoConnect() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + + // add permitted node + Map node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); + getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201); + assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE3); + + int intruderPort = getNextAvailable(_node3HaPort + 1); + + // try to add not permitted node + Map nodeData = createNodeAttributeMap(NODE2, intruderPort, _node1HaPort); + getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", nodeData, 409); + + assertRemoteNodes(NODE1, NODE3); + } + + public void testIntruderProtection() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + + Map nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + NODE1); + String node1StorePath = (String)nodeData.get(BDBHAVirtualHostNode.STORE_PATH); + long transactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); + + // add permitted node + Map node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); + getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201); + assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE3); + + // Ensure PINGDB is created + // in order to exclude hanging of environment + // when environment.close is called whilst PINGDB is created. + // On node joining, a record is updated in PINGDB + // if lastTransactionId is incremented then node ping task was executed + int counter = 0; + long newTransactionId = transactionId; + while(newTransactionId == transactionId && counter<50) + { + nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + NODE1); + newTransactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); + if (newTransactionId != transactionId) + { + break; + } + counter++; + Thread.sleep(100l); + } + + //connect intruder node + String nodeName = NODE2; + String nodeHostPort = "localhost:" + getNextAvailable(_node3HaPort + 1); + File environmentPathFile = new File(node1StorePath, nodeName); + environmentPathFile.mkdirs(); + ReplicationConfig replicationConfig = new ReplicationConfig((String)nodeData.get(BDBHAVirtualHostNode.GROUP_NAME), nodeName, nodeHostPort); + replicationConfig.setHelperHosts((String)nodeData.get(BDBHAVirtualHostNode.ADDRESS)); + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setDurability(Durability.parse((String)nodeData.get(BDBHAVirtualHostNode.DURABILITY))); + + ReplicatedEnvironment intruder = null; + try + { + intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); + } + finally + { + if (intruder != null) + { + intruder.close(); + } + } + waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name()); + } + + private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception + { + Map nodeData = createNodeAttributeMap(nodeName, nodePort, helperPort); + + int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData); + assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode); + String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name(); + waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState); + } + + private Map createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception + { + Map nodeData = new HashMap(); + nodeData.put(BDBHAVirtualHostNode.NAME, nodeName); + nodeData.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); + nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort); + nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort); + nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1); + Map context = new HashMap<>(); + nodeData.put(BDBHAVirtualHostNode.CONTEXT, context); + String bluePrint = HATestClusterCreator.getBlueprint("localhost", _node1HaPort, _node2HaPort, _node3HaPort); + context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint); + return nodeData; + } + + private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception + { + boolean isMaster = nodeName.equals(masterNode); + String expectedRole = isMaster? "MASTER" : "REPLICA"; + waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole); + + Map nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0"); + assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME)); + assertEquals("Unexpected type", "BDB_HA", nodeData.get(BDBHAVirtualHostNode.TYPE)); + assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS)); + assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS)); + assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME)); + assertEquals("Unexpected role", expectedRole, nodeData.get(BDBHAVirtualHostNode.ROLE)); + + Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); + assertNotNull("Unexpected lastKnownReplicationId", lastKnownTransactionId); + assertTrue("Unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0); + + Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME); + assertNotNull("Unexpected joinTime", joinTime); + assertTrue("Unexpected joinTime " + joinTime, joinTime > 0); + + if (isMaster) + { + waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name()); + } + + } + + private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception + { + List clusterNodes = new ArrayList(Arrays.asList(replicaNodes)); + clusterNodes.add(masterNode); + + for (String clusterNodeName : clusterNodes) + { + List remotes = new ArrayList(clusterNodes); + remotes.remove(clusterNodeName); + for (String remote : remotes) + { + String remoteUrl = "replicationnode/" + clusterNodeName + "/" + remote; + Map nodeData = waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA"); + assertRemoteNodeData(remote, nodeData); + } + } + } + + private void assertRemoteNodeData(String name, Map nodeData) + { + assertEquals("Remote node " + name + " has unexpected name", name, nodeData.get(BDBHAVirtualHostNode.NAME)); + + Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); + assertNotNull("Node " + name + " has unexpected lastKnownReplicationId", lastKnownTransactionId); + assertTrue("Node " + name + " has unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0); + + Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME); + assertNotNull("Node " + name + " has unexpected joinTime", joinTime); + assertTrue("Node " + name + " has unexpected joinTime " + joinTime, joinTime > 0); + } + + private void assertActualAndDesiredStates(final String restUrl, + final String expectedDesiredState, + final String expectedActualState) throws IOException + { + Map objectData = getRestTestHelper().getJsonAsSingletonList(restUrl); + Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, objectData); + } + + private void mutateDesiredState(final String restUrl, final String newState) throws IOException + { + Map newAttributes = new HashMap(); + newAttributes.put(VirtualHostNode.DESIRED_STATE, newState); + + getRestTestHelper().submitRequest(restUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + } + + private Map findRemoteNodeByName(final List> remoteNodes, final String nodeName) + { + Map foundNode = null; + for (Map remoteNode : remoteNodes) + { + if (nodeName.equals(remoteNode.get(RemoteReplicationNode.NAME))) + { + foundNode = remoteNode; + break; + } + } + assertNotNull("Could not find node with name " + nodeName + " amongst remote nodes."); + return foundNode; + } + + private Map awaitNewMaster(final String... nodeUrls) + throws IOException, InterruptedException + { + Map newMasterData = null; + int counter = 0; + while (newMasterData == null && counter < 50) + { + for(String nodeUrl: nodeUrls) + { + Map nodeData = getRestTestHelper().getJsonAsSingletonList(nodeUrl); + if ("MASTER".equals(nodeData.get(BDBHAVirtualHostNode.ROLE))) + { + newMasterData = nodeData; + break; + } + } + if (newMasterData == null) + { + Thread.sleep(100l); + counter++; + } + } + assertNotNull("Could not find new master", newMasterData); + return newMasterData; + } + + +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java new file mode 100644 index 0000000000..334544e334 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.servlet.http.HttpServletResponse; + +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.Asserts; +import org.apache.qpid.systest.rest.QpidRestTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; + +public class BDBHAVirtualHostRestTest extends QpidRestTestCase +{ + private String _hostName; + private File _storeBaseDir; + private int _nodeHaPort; + private Object _nodeName; + private String _virtualhostUrl; + private String _bluePrint; + + @Override + public void setUp() throws Exception + { + setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); + _hostName = "ha"; + _nodeName = "node1"; + _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis()); + _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1); + _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName; + _bluePrint = HATestClusterCreator.getBlueprint("localhost", _nodeHaPort); + + super.setUp(); + } + + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + if (_storeBaseDir != null) + { + FileUtils.delete(_storeBaseDir, true); + } + } + } + + @Override + protected void customizeConfiguration() throws IOException + { + super.customizeConfiguration(); + TestBrokerConfiguration config = getBrokerConfiguration(); + config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); + config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); + + Map nodeAttributes = new HashMap(); + nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); + nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + _nodeHaPort); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, _nodeName); + Map context = new HashMap(); + context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, _bluePrint); + + nodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context); + config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes); + } + + public void testSetLocalTransactionSynchronizationPolicy() throws Exception + { + Map hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); + assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); + + Map newPolicy = Collections.singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); + assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); + } + + public void testSetRemoteTransactionSynchronizationPolicy() throws Exception + { + Map hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); + assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); + + Map newPolicy = Collections.singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); + assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); + } + + public void testMutateState() throws Exception + { + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE"); + assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE"); + + Map newAttributes = Collections.singletonMap(VirtualHost.DESIRED_STATE, "STOPPED"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED"); + assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED"); + + newAttributes = Collections.singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE"); + assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE"); + } + + private void assertActualAndDesireStates(final String restUrl, + final String expectedDesiredState, + final String expectedActualState) throws IOException + { + Map virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl); + Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost); + } + +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java new file mode 100644 index 0000000000..491856d953 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -0,0 +1,548 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.InputStream; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; +import org.apache.qpid.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests upgrading a BDB store on broker startup. + * The store will then be used to verify that the upgrade is completed + * properly and that once upgraded it functions as expected. + * + * Store prepared using old client/broker with BDBStoreUpgradeTestPreparer. + */ +public class BDBUpgradeTest extends QpidBrokerTestCase +{ + protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class); + + private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK"); + + private static final String STRING_1024 = generateString(1024); + private static final String STRING_1024_256 = generateString(1024*256); + + private static final String TOPIC_NAME="myUpgradeTopic"; + private static final String SUB_NAME="myDurSubName"; + private static final String SELECTOR_SUB_NAME="mySelectorDurSubName"; + private static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic"; + private static final String QUEUE_NAME="myUpgradeQueue"; + private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; + private static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; + private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; + + private String _storeLocation; + + @Override + public void setUp() throws Exception + { + assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); + Map virtualHostNodeAttributes = getBrokerConfiguration().getObjectAttributes(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST); + _storeLocation = Strings.expand((String)virtualHostNodeAttributes.get(BDBVirtualHostNode.STORE_PATH)); + + //Clear the two target directories if they exist. + File directory = new File(_storeLocation); + if (directory.exists() && directory.isDirectory()) + { + FileUtils.delete(directory, true); + } + directory.mkdirs(); + + // copy store files + InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb"); + FileUtils.copy(src, new File(_storeLocation, "00000000.jdb")); + + getBrokerConfiguration().addJmxManagementConfiguration(); + super.setUp(); + } + + /** + * Test that the selector applied to the DurableSubscription was successfully + * transfered to the new store, and functions as expected with continued use + * by monitoring message count while sending new messages to the topic and then + * consuming them. + */ + public void testSelectorDurability() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME); + assertEquals("DurableSubscription backing queue should have 1 message on it initially", + new Integer(1), dursubQueue.getMessageCount()); + + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + // Send messages which don't match and do match the selector, checking message count + TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); + Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME); + TopicPublisher publisher = pubSession.createPublisher(topic); + + publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); + pubSession.commit(); + assertEquals("DurableSubscription backing queue should still have 1 message on it", + Integer.valueOf(1), dursubQueue.getMessageCount()); + + publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); + pubSession.commit(); + assertEquals("DurableSubscription backing queue should now have 2 messages on it", + Integer.valueOf(2), dursubQueue.getMessageCount()); + + TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + pubSession.commit(); + + pubSession.close(); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the DurableSubscription without selector was successfully + * transfered to the new store, and functions as expected with continued use. + */ + public void testDurableSubscriptionWithoutSelector() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); + assertEquals("DurableSubscription backing queue should have 1 message on it initially", + new Integer(1), dursubQueue.getMessageCount()); + + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + // Send new message matching the topic, checking message count + TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + TopicPublisher publisher = session.createPublisher(topic); + + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent"); + session.commit(); + assertEquals("DurableSubscription backing queue should now have 2 messages on it", + Integer.valueOf(2), dursubQueue.getMessageCount()); + + TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME); + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + + session.commit(); + session.close(); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the backing queue for the durable subscription created was successfully + * detected and set as being exclusive during the upgrade process, and that the + * regular queue was not. + */ + public void testQueueExclusivity() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME); + assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive()); + + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); + assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive()); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the upgraded queue continues to function properly when used + * for persistent messaging and restarting the broker. + * + * Sends the new messages to the queue BEFORE consuming those which were + * sent before the upgrade. In doing so, this also serves to test that + * the queue bindings were successfully transitioned during the upgrade. + */ + public void testBindingAndMessageDurabability() throws Exception + { + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageProducer messageProducer = session.createProducer(queue); + + // Send a new message + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1); + + session.close(); + + // Restart the broker + restartBroker(); + + // Drain the queue of all messages + connection = (TopicConnection) getConnection(); + connection.start(); + consumeQueueMessages(connection, true); + } + + /** + * Test that all of the committed persistent messages previously sent to + * the broker are properly received following update of the MetaData and + * Content entries during the store upgrade process. + */ + public void testConsumptionOfUpgradedMessages() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + consumeDurableSubscriptionMessages(connection, true); + consumeDurableSubscriptionMessages(connection, false); + consumeQueueMessages(connection, false); + } + + /** + * Tests store migration containing messages for non-existing queue. + * + * @throws Exception + */ + public void testMigrationOfMessagesForNonDurableQueues() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // consume a message for non-existing store + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME); + MessageConsumer messageConsumer = session.createConsumer(queue); + + for (int i = 1; i <= 3; i++) + { + Message message = messageConsumer.receive(1000); + assertNotNull("Message was not migrated!", message); + assertTrue("Unexpected message received!", message instanceof TextMessage); + assertEquals("ID property did not match", i, message.getIntProperty("ID")); + } + } + + /** + * Tests store upgrade has maintained the priority queue configuration, + * such that sending messages with priorities out-of-order and then consuming + * them gets the messages back in priority order. + */ + public void testPriorityQueue() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // send some messages to the priority queue + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(PRIORITY_QUEUE_NAME); + MessageProducer producer = session.createProducer(queue); + + producer.setPriority(4); + producer.send(createMessage(1, false, session, producer)); + producer.setPriority(1); + producer.send(createMessage(2, false, session, producer)); + producer.setPriority(9); + producer.send(createMessage(3, false, session, producer)); + session.close(); + + //consume the messages, expected order: msg 3, msg 1, msg 2. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + Message msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(2, msg.getIntProperty("msg")); + } + + /** + * + * TODO (QPID-5650) Resolve so this test can be reenabled. + * + * Test that the queue configured to have a DLQ was recovered and has the alternate exchange + * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the + * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ. + * + * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments + * that turned it on for this specific queue. + */ + public void xtestRecoveryOfQueueWithDLQ() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ + ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE"); + assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType()); + TabularDataSupport bindings = (TabularDataSupport) exchange.bindings(); + assertEquals(1, bindings.size()); + for(Object o : bindings.values()) + { + CompositeData binding = (CompositeData) o; + + String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY); + String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES); + + //Because its a fanout exchange, we just return a single '*' key with all bound queues + assertEquals("unexpected binding key", "*", bindingKey); + assertEquals("unexpected number of queues bound", 1, queueNames.length); + assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]); + } + + //verify the queue exists, has the expected alternate exchange and max delivery count + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME); + assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount()); + + ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ"); + assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount()); + + String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ"); + assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString)); + } + finally + { + jmxUtils.close(); + } + } + + private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = null; + TopicSubscriber durSub = null; + + if(selector) + { + topic = session.createTopic(SELECTOR_TOPIC_NAME); + durSub = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); + } + else + { + topic = session.createTopic(TOPIC_NAME); + durSub = session.createDurableSubscriber(topic, SUB_NAME); + } + + + // Retrieve the matching message + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + if(selector) + { + assertEquals("Selector property did not match", "true", m.getStringProperty("testprop")); + } + assertEquals("ID property did not match", 1, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", generateString(1024) , ((TextMessage)m).getText()); + + // Verify that no more messages are received + m = durSub.receive(1000); + assertNull("No more messages should have been recieved", m); + + durSub.close(); + session.close(); + } + + private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + + MessageConsumer consumer = session.createConsumer(queue); + Message m; + + // Retrieve the initial pre-upgrade messages + for (int i=1; i <= 5 ; i++) + { + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", i, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); + } + for (int i=1; i <= 5 ; i++) + { + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", i, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText()); + } + + if(extraMessage) + { + //verify that the extra message is received + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", 1, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); + } + + // Verify that no more messages are received + m = consumer.receive(1000); + assertNull("No more messages should have been recieved", m); + + consumer.close(); + session.close(); + } + + private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msgId); + send.setIntProperty("msg", msgId); + + return send; + } + + /** + * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. + * + * @param length number of characters in the string + * @return string sequence of the given length + */ + private static String generateString(int length) + { + char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; + char[] chars = new char[length]; + for (int i = 0; i < (length); i++) + { + chars[i] = base_chars[i % 10]; + } + return new String(chars); + } + + private static void sendMessages(Session session, MessageProducer messageProducer, + Destination dest, int deliveryMode, int length, int numMesages) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } + + private static void publishMessages(Session session, TopicPublisher publisher, + Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + message.setStringProperty("testprop", selectorProperty); + publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java new file mode 100644 index 0000000000..9867ce2eca --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestUtils; + +/** + * The HA black box tests test the BDB cluster as a opaque unit. Client connects to + * the cluster via a failover url + * + * @see HAClusterWhiteboxTest + */ +public class HAClusterBlackboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + private static final int NUMBER_OF_NODES = 3; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + private FailoverAwaitingListener _failoverListener; + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + + _clusterCreator.startCluster(); + _failoverListener = new FailoverAwaitingListener(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testLossOfMasterNodeCausesClientToFailover() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + _clusterCreator.stopNode(activeBrokerPort); + LOGGER.info("Node is stopped"); + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Listener has finished"); + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception + { + LOGGER.info("Connecting to " + _brokerFailoverUrl); + final Connection connection = getConnection(_brokerFailoverUrl); + LOGGER.info("Got connection to cluster"); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + + LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); + + _clusterCreator.stopNode(inactiveBrokerPort); + + _failoverListener.assertNoFailoverCompletionWithin(2000); + + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testTransferMasterFromLocalNode() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + Map attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Listener has finished"); + + attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + } + + public void testTransferMasterFromRemoteNode() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); + Map attributes = _clusterCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + + _clusterCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Listener has finished"); + + attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + } + + public void testQuorumOverride() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + Set ports = _clusterCreator.getBrokerPortNumbersForNodes(); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + ports.remove(activeBrokerPort); + + // Stop all other nodes + for (Integer p : ports) + { + _clusterCreator.stopNode(p); + } + + Map attributes = _clusterCreator.getNodeAttributes(activeBrokerPort); + assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); + _clusterCreator.setNodeAttributes(activeBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); + + attributes = _clusterCreator.getNodeAttributes(activeBrokerPort); + assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); + + assertProducingConsuming(connection); + } + + public void testPriority() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + int priority = 1; + Integer highestPriorityBrokerPort = null; + Set ports = _clusterCreator.getBrokerPortNumbersForNodes(); + for (Integer port : ports) + { + if (activeBrokerPort != port.intValue()) + { + priority = priority + 1; + highestPriorityBrokerPort = port; + _clusterCreator.setNodeAttributes(port, port, Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY, priority)); + Map attributes = _clusterCreator.getNodeAttributes(port, port); + assertEquals("Broker has unexpected priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY)); + } + } + + LOGGER.info("Broker on port " + highestPriorityBrokerPort + " has the highest priority of " + priority); + + LOGGER.info("Shutting down the MASTER"); + _clusterCreator.stopNode(activeBrokerPort); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Listener has finished"); + + Map attributes = _clusterCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort); + assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + } + + private final class FailoverAwaitingListener implements ConnectionListener + { + private final CountDownLatch _failoverCompletionLatch = new CountDownLatch(1); + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + public void awaitFailoverCompletion(long delay) throws InterruptedException + { + if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS)) + { + LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n"); + } + assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount()); + } + + public void assertNoFailoverCompletionWithin(long delay) throws InterruptedException + { + _failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover occurred unexpectedly", 1L, _failoverCompletionLatch.getCount()); + } + + @Override + public void failoverComplete() + { + _failoverCompletionLatch.countDown(); + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + } + +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java new file mode 100644 index 0000000000..0ab10cc318 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.jms.Connection; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.log4j.Logger; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.RestTestHelper; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.junit.Assert; + +/** + * System test verifying the ability to control a cluster via the Management API. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterManagementTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); + + private static final Set NON_MASTER_STATES = new HashSet(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); + private static final int NUMBER_OF_NODES = 4; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + _clusterCreator.configureClusterNodes(); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testReadonlyMBeanAttributes() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); + assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); + assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); + assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); + // As we have chosen an arbitrary broker from the cluster, we cannot predict its state + assertNotNull("Store state must not be null", storeBean.getNodeState()); + } + + public void testStateOfActiveBrokerIsMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); + assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); + } + + public void testStateOfNonActiveBrokerIsNotMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + final String nodeState = storeBean.getNodeState(); + assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); + } + + public void testGroupMembers() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); + + final TabularData groupMembers = storeBean.getAllNodesInGroup(); + assertNotNull(groupMembers); + + for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) + { + final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); + final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); + + CompositeData row = groupMembers.get(new Object[] {nodeName}); + assertNotNull("Table does not contain row for node name " + nodeName, row); + assertEquals(nodeHostPort, row.get(ManagedBDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + } + + public void testRemoveRemoteNodeFromGroup() throws Exception + { + final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); + awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); + + final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); + _clusterCreator.stopNode(brokerPortNumberToBeRemoved); + + storeBean.removeNodeFromGroup(removedNodeName); + + long limitTime = System.currentTimeMillis() + 5000; + while((NUMBER_OF_NODES == storeBean.getAllNodesInGroup().size()) && System.currentTimeMillis() < limitTime) + { + Thread.sleep(100l); + } + + int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows after test", NUMBER_OF_NODES - 1, numberOfDataRowsAfterRemoval); + } + + public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + + ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + + try + { + inactiveBroker.createNewQueue(getTestQueueName(), null, true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); + } + + try + { + inactiveBroker.createNewExchange(getName(), "direct", true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); + } + } + + public void testSetDesignatedPrimary() throws Exception + { + int brokerPort = _clusterCreator.getBrokerPortNumbersForNodes().iterator().next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort); + assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + long limit = System.currentTimeMillis() + 5000; + while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) + { + Thread.sleep(100l); + } + assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary()); + } + + public void testVirtualHostMbeanOnMasterTransfer() throws Exception + { + Connection connection = getConnection(_brokerFailoverUrl); + int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + connection.close(); + + Set ports = _clusterCreator.getBrokerPortNumbersForNodes(); + ports.remove(activeBrokerPort); + + int inactiveBrokerPort = ports.iterator().next(); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + ManagedBroker inactiveVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort); + + try + { + inactiveVirtualHostMBean.createNewQueue(getTestQueueName(), null, true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); + } + + Map attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _clusterCreator.awaitNodeToAttainRole(inactiveBrokerPort, "MASTER"); + + awaitVirtualHostAtNode(inactiveBrokerPort); + + ManagedBroker activeVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort); + activeVirtualHostMBean.createNewQueue(getTestQueueName() + inactiveBrokerPort, null, true); + } + + public void awaitVirtualHostAtNode(int brokerPort) throws Exception + { + final long startTime = System.currentTimeMillis(); + Map data = Collections.emptyMap(); + String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort); + RestTestHelper restHelper = _clusterCreator.createRestTestHelper(brokerPort); + while(!State.ACTIVE.name().equals(data.get(VirtualHost.STATE)) && (System.currentTimeMillis() - startTime) < 30000) + { + LOGGER.debug("Awaiting virtual host '" + nodeName + "' to transit into active state"); + List> results= restHelper.getJsonAsList("virtualhost/" + nodeName + "/" + VIRTUAL_HOST); + if (results.size()== 1) + { + data = results.get(0); + } + + if (!State.ACTIVE.name().equals(data.get(VirtualHost.STATE))) + { + Thread.sleep(1000); + } + } + Assert.assertEquals("Virtual host is not active", State.ACTIVE.name(), data.get(VirtualHost.STATE)); + LOGGER.debug("Virtual host '" + nodeName + "' is in active state"); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception + { + _jmxUtils.open(brokerPortNumber); + + return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + } + + private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception + { + _jmxUtils.open(brokerPortNumber); + + return _jmxUtils.getManagedBroker(VIRTUAL_HOST); + } + + private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception + { + long totalTimeWaited = 0l; + long waitInterval = 100l; + long maxWaitTime = 10000; + + int currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); + while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime) + { + LOGGER.debug("Still awaiting nodes to join group; expecting " + + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes + + " after " + totalTimeWaited + " ms."); + + totalTimeWaited += waitInterval; + Thread.sleep(waitInterval); + + currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); + } + + assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms", + expectedNumberOfNodes ,currentNumberOfNodes); + } +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java new file mode 100644 index 0000000000..8df419c3a7 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.management.ObjectName; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class HAClusterTwoNodeTest extends QpidBrokerTestCase +{ + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); + private static final int NUMBER_OF_NODES = 2; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + private void startCluster(boolean designedPrimary) throws Exception + { + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + _clusterCreator.configureClusterNodes(); + _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + } + + public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception + { + startCluster(true); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startNode(masterPort); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testClusterRestartWithoutDesignatedPrimary() throws Exception + { + startCluster(false); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startClusterParallel(); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + assertProducingConsuming(connection); + } + + public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception + { + startCluster(false); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + + try + { + Connection connection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(connection); + fail("Exception not thrown"); + } + catch(JMSException e) + { + // JMSException should be thrown either on getConnection, or produce/consume + // depending on whether the relative timing of the node discovering that the + // secondary has gone. + } + } + + public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + + try + { + getConnection(_brokerFailoverUrl); + fail("Connection not expected"); + } + catch (JMSException e) + { + // PASS + } + } + + public void testInitialDesignatedPrimaryStateOfNodes() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); + assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); + + final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); + } + + public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + + assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + + long limit = System.currentTimeMillis() + 5000; + while( !storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) + { + Thread.sleep(100); + } + assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); + + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to new primary", connection); + assertProducingConsuming(connection); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + return storeBean; + } + +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java new file mode 100644 index 0000000000..ef5cc7c464 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.Set; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +/** + * The HA white box tests test the BDB cluster where the test retains the knowledge of the + * individual test nodes. It uses this knowledge to examine the nodes to ensure that they + * remain in the correct state throughout the test. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterWhiteboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + + private final int NUMBER_OF_NODES = 3; + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testClusterPermitsConnectionToOnlyOneNode() throws Exception + { + int connectionSuccesses = 0; + int connectionFails = 0; + + for (int brokerPortNumber : getBrokerPortNumbers()) + { + try + { + getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); + connectionSuccesses++; + } + catch(JMSException e) + { + assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + connectionFails++; + } + } + + assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails); + assertEquals("Unexpected number of successful connections", 1, connectionSuccesses); + } + + public void testClusterThatLosesNodeStillAllowsConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + + // verify that JMS persistence operations are working + assertProducingConsuming(subsequentConnection); + + closeConnection(initialConnection); + } + + public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection); + + killBroker(subsequentPortNumber); + + final Connection finalConnection = getConnectionToNodeInCluster(); + assertNull(finalConnection); + + closeConnection(initialConnection); + } + + public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception + { + final Connection connection = getConnectionToNodeInCluster(); + assertNotNull(connection); + + final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection); + connection.close(); + + _clusterCreator.stopNode(brokerPortNumber); + _clusterCreator.startNode(brokerPortNumber); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + } + + public void testClusterLosingNodeRetainsData() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + + final String queueNamePrefix = getTestQueueName(); + final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'"; + final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'"; + + populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + + assertNotNull("no valid connection obtained", subsequentConnection); + + checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + } + + public void xtestRecoveryOfOutOfDateNode() throws Exception + { + /* + * TODO: Implement + * + * Cant yet find a way to control cleaning in a deterministic way to allow provoking + * a node to become out of date. We do now know that even a new joiner to the group + * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has + * done *any* cleaning and then adding a new node should be sufficient to cause this. + */ + } + + private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception + { + populateBrokerWithData(connection, 1, queueUrls); + } + + private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception + { + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + session.createConsumer(queue).close(); + sendMessage(session, queue, noOfMessages); + } + } + + private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException + { + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + final MessageConsumer consumer = session.createConsumer(queue); + final Message message = consumer.receive(1000); + session.commit(); + assertNotNull("Queue " + queue + " should have message", message); + assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX)); + } + } + + private Connection getConnectionToNodeInCluster() throws URLSyntaxException + { + Connection connection = null; + Set runningBrokerPorts = getBrokerPortNumbers(); + + for (int brokerPortNumber : runningBrokerPorts) + { + try + { + connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); + break; + } + catch(JMSException je) + { + assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + } + } + return connection; + } + + private void closeConnectionAndKillBroker(final Connection initialConnection) throws Exception + { + final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + initialConnection.close(); + + killBroker(initialPortNumber); // kill awaits the death of the child + } + + private void closeConnection(final Connection initialConnection) + { + try + { + initialConnection.close(); + } + catch(Exception e) + { + // ignore. + // java.net.SocketException is seen sometimes on active connection + } + } +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java new file mode 100644 index 0000000000..ebc32b482a --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.server.management.plugin.HttpManagement; +import org.apache.qpid.server.model.Plugin; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; +import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl; +import org.apache.qpid.systest.rest.RestTestHelper; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.url.URLSyntaxException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.junit.Assert; + +import com.sleepycat.je.rep.ReplicationConfig; + +public class HATestClusterCreator +{ + protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); + + private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + + private static final int FAILOVER_CYCLECOUNT = 10; + private static final int FAILOVER_RETRIES = 1; + private static final int FAILOVER_CONNECTDELAY = 1000; + + private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; + + private static final int RETRIES = 60; + private static final int CONNECTDELAY = 75; + + private final QpidBrokerTestCase _testcase; + private final Map _brokerPortToBdbPortMap = new TreeMap(); + private final String _virtualHostName; + + private final String _ipAddressOfBroker; + private final String _groupName ; + private final int _numberOfNodes; + private int _bdbHelperPort; + private int _primaryBrokerPort; + + public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) + { + _testcase = testcase; + _virtualHostName = virtualHostName; + _groupName = virtualHostName; + _ipAddressOfBroker = getIpAddressOfBrokerHost(); + _numberOfNodes = numberOfNodes; + _bdbHelperPort = 0; + } + + public void configureClusterNodes() throws Exception + { + int brokerPort = _testcase.findFreePort(); + + int[] bdbPorts = new int[_numberOfNodes]; + for (int i = 0; i < _numberOfNodes; i++) + { + int bdbPort = _testcase.getNextAvailable(brokerPort + 1); + bdbPorts[i] = bdbPort; + _brokerPortToBdbPortMap.put(brokerPort, bdbPort); + brokerPort = _testcase.getNextAvailable(bdbPort + 1); + } + + String bluePrintJson = getBlueprint(_ipAddressOfBroker, bdbPorts); + + String helperName = null; + for (Map.Entry entry: _brokerPortToBdbPortMap.entrySet()) + { + brokerPort = entry.getKey(); + int bdbPort = entry.getValue(); + LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); + if (_bdbHelperPort == 0) + { + _bdbHelperPort = bdbPort; + } + + String nodeName = getNodeNameForNodeAt(bdbPort); + if (helperName == null) + { + helperName = nodeName; + } + + Map virtualHostNodeAttributes = new HashMap(); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _groupName); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.NAME, nodeName); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, getNodeHostPortForNodeAt(bdbPort)); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, getHelperHostPort()); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, helperName); + + Map context = new HashMap<>(); + context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + context.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0"); + context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrintJson); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context); + + TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); + brokerConfiguration.addJmxManagementConfiguration(); + brokerConfiguration.addHttpManagementConfiguration(); + brokerConfiguration.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true); + brokerConfiguration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, _testcase.getHttpManagementPort(brokerPort)); + brokerConfiguration.setObjectAttributes(VirtualHostNode.class, _virtualHostName, virtualHostNodeAttributes); + + } + _primaryBrokerPort = getPrimaryBrokerPort(); + } + + public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort); + String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(_primaryBrokerPort)); + config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.DESIGNATED_PRIMARY, designatedPrimary); + config.setSaved(false); + } + + private int getPrimaryBrokerPort() + { + return _brokerPortToBdbPortMap.keySet().iterator().next(); + } + + public void startNode(final int brokerPortNumber) throws Exception + { + _testcase.startBroker(brokerPortNumber); + } + + public void startCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + startNode(brokerPortNumber); + } + } + + public void startClusterParallel() throws Exception + { + final ExecutorService executor = Executors.newFixedThreadPool(_brokerPortToBdbPortMap.size()); + try + { + List> brokers = new CopyOnWriteArrayList>(); + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + final TestBrokerConfiguration brokerConfig = _testcase.getBrokerConfiguration(brokerPortNumber); + Future future = executor.submit(new Callable() + { + public Object call() + { + try + { + _testcase.startBroker(brokerPortNumber, brokerConfig); + return "OK"; + } + catch (Exception e) + { + return e; + } + } + }); + brokers.add(future); + } + for (Future future : brokers) + { + Object result = future.get(30, TimeUnit.SECONDS); + LOGGER.debug("Node startup result:" + result); + if (result instanceof Exception) + { + throw (Exception) result; + } + else if (!"OK".equals(result)) + { + throw new Exception("One of the cluster nodes is not started"); + } + } + } + catch (Exception e) + { + stopCluster(); + throw e; + } + finally + { + executor.shutdown(); + } + + } + + public void stopNode(final int brokerPortNumber) + { + _testcase.killBroker(brokerPortNumber); + } + + public void stopCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + try + { + stopNode(brokerPortNumber); + } + catch(Exception e) + { + LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); + } + } + } + + public int getBrokerPortNumberFromConnection(Connection connection) + { + final AMQConnection amqConnection = (AMQConnection)connection; + return amqConnection.getActiveBrokerDetails().getPort(); + } + + public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) + { + final Set allBrokerPorts = _testcase.getBrokerPortNumbers(); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); + allBrokerPorts.remove(activeBrokerPort); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int inactiveBrokerPort = allBrokerPorts.iterator().next(); + return inactiveBrokerPort; + } + + public int getBdbPortForBrokerPort(final int brokerPortNumber) + { + return _brokerPortToBdbPortMap.get(brokerPortNumber); + } + + public Set getBdbPortNumbers() + { + return new HashSet(_brokerPortToBdbPortMap.values()); + } + + public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + { + final StringBuilder brokerList = new StringBuilder(); + + for(Iterator itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) + { + int brokerPortNumber = itr.next(); + + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); + if (itr.hasNext()) + { + brokerList.append(";"); + } + } + + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, false); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, true); + } + + private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException + { + final String url; + if (retryAllowed) + { + url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + } + else + { + url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); + } + + return new AMQConnectionURL(url); + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + _testcase.getName() + bdbPort; + } + + public String getNodeHostPortForNodeAt(final int bdbPort) + { + return _ipAddressOfBroker + ":" + bdbPort; + } + + public String getHelperHostPort() + { + if (_bdbHelperPort == 0) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + + return _ipAddressOfBroker + ":" + _bdbHelperPort; + } + + public void setHelperHostPort(int bdbHelperPort) + { + _bdbHelperPort = bdbHelperPort; + } + + public int getBrokerPortNumberOfPrimary() + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + return _primaryBrokerPort; + } + + public int getBrokerPortNumberOfSecondaryNode() + { + final Set portNumbers = getBrokerPortNumbersForNodes(); + portNumbers.remove(getBrokerPortNumberOfPrimary()); + return portNumbers.iterator().next(); + } + + public Set getBrokerPortNumbersForNodes() + { + return new HashSet(_brokerPortToBdbPortMap.keySet()); + } + + + public String getIpAddressOfBrokerHost() + { + String brokerHost = _testcase.getBroker().getHost(); + try + { + return InetAddress.getByName(brokerHost).getHostAddress(); + } + catch (UnknownHostException e) + { + throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); + } + } + + public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) + { + TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved); + String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPortNumberToBeMoved)); + + Map objectAttributes = config.getObjectAttributes(VirtualHostNode.class, nodeName); + + String oldBdbHostPort = (String)objectAttributes.get(BDBHAVirtualHostNode.ADDRESS); + String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); + String oldHost = oldHostAndPort[0]; + String newBdbHostPort = oldHost + ":" + newBdbPort; + config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.ADDRESS, newBdbHostPort); + config.setSaved(false); + } + + public String getNodeNameForBrokerPort(final int brokerPort) + { + return getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPort)); + } + + public void setNodeAttributes(int brokerPort, Map attributeMap) + throws Exception + { + setNodeAttributes(brokerPort, brokerPort, attributeMap); + } + + public void setNodeAttributes(int localNodePort, int remoteNodePort, Map attributeMap) + throws Exception + { + RestTestHelper restHelper = createRestTestHelper(localNodePort); + String url = getNodeRestUrl(localNodePort, remoteNodePort); + int status = restHelper.submitRequest(url, "PUT", attributeMap); + if (status != 200) + { + throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute's : " + status); + } + } + + private String getNodeRestUrl(int localNodePort, int remoteNodePort) + { + String remoteNodeName = getNodeNameForBrokerPort(remoteNodePort); + String localNodeName = getNodeNameForBrokerPort(localNodePort); + String url = null; + if (localNodePort == remoteNodePort) + { + url = "/api/latest/virtualhostnode/" + localNodeName; + } + else + { + url = "/api/latest/replicationnode/" + localNodeName + "/" + remoteNodeName; + } + return url; + } + + public Map getNodeAttributes(int brokerPort) throws IOException + { + return getNodeAttributes(brokerPort, brokerPort); + } + + public Map getNodeAttributes(int localNodePort, int remoteNodePort) throws IOException + { + RestTestHelper restHelper = createRestTestHelper(localNodePort); + List> results= restHelper.getJsonAsList(getNodeRestUrl(localNodePort, remoteNodePort)); + int size = results.size(); + if (size == 0) + { + return Collections.emptyMap(); + } + else if (size == 1) + { + return results.get(0); + } + else + { + throw new RuntimeException("Unexpected number of nodes " + size); + } + } + + public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception + { + awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole); + } + + public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception + { + final long startTime = System.currentTimeMillis(); + Map data = Collections.emptyMap(); + + while(!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000) + { + LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredRole + " role"); + data = getNodeAttributes(localNodePort, remoteNodePort); + if (!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE))) + { + Thread.sleep(1000); + } + } + LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' role is " + data.get(BDBHARemoteReplicationNode.ROLE)); + Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(BDBHARemoteReplicationNode.ROLE)); + } + + public RestTestHelper createRestTestHelper(int brokerPort) + { + int httpPort = _testcase.getHttpManagementPort(brokerPort); + RestTestHelper helper = new RestTestHelper(httpPort); + helper.setUsernameAndPassword("webadmin", "webadmin"); + return helper; + } + + public static String getBlueprint(String hostName, int... ports) throws Exception + { + List permittedNodes = new ArrayList(); + for (int port:ports) + { + permittedNodes.add(hostName + ":" + port); + } + Map bluePrint = new HashMap<>(); + bluePrint.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE); + bluePrint.put(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes); + + StringWriter writer = new StringWriter(); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); + mapper.writeValue(writer, bluePrint); + return writer.toString(); + } +} -- cgit v1.2.1