diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-06-28 16:46:12 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-06-28 16:46:12 +0000 |
| commit | 07c285f662e8f60d4e8aca247b65b77ca5df4587 (patch) | |
| tree | 7fe15262589c0fe5206e02a5e9336c6288f004e0 /qpid/java/bdbstore/jmx | |
| parent | bb45ec03f95ffdfa6c0163067dcb75af8b64ceb5 (diff) | |
| download | qpid-python-07c285f662e8f60d4e8aca247b65b77ca5df4587.tar.gz | |
QPID-3998, QPID-3999, QPID-4093: add new management plugins for jmx/rest/webui functionality, partial merge from the java-config-and-management branch at r1355039
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1355072 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/jmx')
9 files changed, 1118 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/jmx/MANIFEST.MF b/qpid/java/bdbstore/jmx/MANIFEST.MF new file mode 100644 index 0000000000..7046c4326d --- /dev/null +++ b/qpid/java/bdbstore/jmx/MANIFEST.MF @@ -0,0 +1,20 @@ +Manifest-Version: 1.0 +Bundle-ManifestVersion: 2 +Bundle-Name: Qpid Bdbstore-Plugins JMX +Bundle-SymbolicName: bdbstore-plugins-jmx +Bundle-Description: Bdbstore Management plugin for Qpid. +Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt +Bundle-DocURL: http://www.apache.org/ +Bundle-Version: 1.0.0 +Bundle-RequiredExecutionEnvironment: JavaSE-1.6 +Bundle-ClassPath: . +Fragment-Host: broker-plugins-jmx +Import-Package: org.apache.qpid, + org.apache.qpid.management.common.mbeans.annotations, + org.apache.qpid.server.model, + org.apache.qpid.server.virtualhost, + org.apache.qpid.server.store.berkeleydb, + org.apache.log4j;version=1.2.16, + javax.management, + javax.management.openmbean +Export-Package: org.apache.qpid.server.store.berkeleydb.jmx diff --git a/qpid/java/bdbstore/jmx/build.xml b/qpid/java/bdbstore/jmx/build.xml new file mode 100644 index 0000000000..2015b0cbb5 --- /dev/null +++ b/qpid/java/bdbstore/jmx/build.xml @@ -0,0 +1,29 @@ +<!-- + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + --> +<project name="bdbstore-jmx" default="build"> + <property name="module.depends" value="common broker broker-plugins broker-plugins/jmx management/common bdbstore" /> + <property name="module.test.depends" value="test broker/test common/test management/common client systests bdbstore/test" /> + + <property name="module.manifest" value="MANIFEST.MF" /> + <property name="module.plugin" value="true" /> + + <import file="../../module.xml" /> + + <target name="bundle" depends="bundle-tasks" /> +</project> diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java new file mode 100644 index 0000000000..455573f7bc --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.jmx; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; + +/** + * Management mbean for BDB HA. + * <p> + * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is + * currently arranged through OSGI using the <b>fragment</b> feature so that this bundle shares the + * same classloader as broker-plugins-jmx. See the <b>Fragment-Host:</b> header within the MANIFEST.MF + * of this bundle. + * </p> + */ +public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore +{ + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class); + + private static final TabularType GROUP_MEMBERS_TABLE; + private static final CompositeType GROUP_MEMBER_ROW; + private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES; + + static + { + try + { + GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING}; + final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT}; + final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "}; + GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member", + itemNames, + itemDescriptions, + GROUP_MEMBER_ATTRIBUTE_TYPES ); + GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers", + GROUP_MEMBER_ROW, + new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME}); + } + catch (final OpenDataException ode) + { + throw new ExceptionInInitializerError(ode); + } + } + + private final BDBHAMessageStore _store; + + protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException + { + super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry()); + LOGGER.debug("Creating BDBHAMessageStoreManagerMBean"); + _store = store; + register(); + } + + @Override + public String getObjectInstanceName() + { + return _store.getName(); + } + + @Override + public String getGroupName() + { + return _store.getGroupName(); + } + + @Override + public String getNodeName() + { + return _store.getNodeName(); + } + + @Override + public String getNodeHostPort() + { + return _store.getNodeHostPort(); + } + + @Override + public String getHelperHostPort() + { + return _store.getHelperHostPort(); + } + + @Override + public String getDurability() throws IOException, JMException + { + try + { + return _store.getDurability(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query replication policy", e); + throw new JMException(e.getMessage()); + } + } + + + @Override + public boolean getCoalescingSync() throws IOException, JMException + { + return _store.isCoalescingSync(); + } + + @Override + public String getNodeState() throws IOException, JMException + { + try + { + return _store.getNodeState(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query node state", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public boolean getDesignatedPrimary() throws IOException, JMException + { + try + { + return _store.isDesignatedPrimary(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query designated primary", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public TabularData getAllNodesInGroup() throws IOException, JMException + { + final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE); + final List<Map<String, String>> members = _store.getGroupMembers(); + + for (Map<String, String> map : members) + { + CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map); + data.put(memberData); + } + return data; + } + + @Override + public void removeNodeFromGroup(String nodeName) throws JMException + { + try + { + _store.removeNodeFromGroup(nodeName); + } + catch (AMQStoreException e) + { + LOGGER.error("Failed to remove node " + nodeName + " from group", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public void setDesignatedPrimary(boolean primary) throws JMException + { + try + { + _store.setDesignatedPrimary(primary); + } + catch (AMQStoreException e) + { + LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException + { + try + { + _store.updateAddress(nodeName, newHostName, newPort); + } + catch(AMQStoreException e) + { + LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e); + throw new JMException(e.getMessage()); + } + } + + @Override + public ManagedObject getParentObject() + { + return null; + } + +} diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java new file mode 100644 index 0000000000..837da1eef3 --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.jmx; + +import javax.management.JMException; +import javax.management.StandardMBean; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.jmx.MBeanProvider; +import org.apache.qpid.server.jmx.ManagedObject; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +/** + * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual + * host and of type {@link BDBHAMessageStore#BDB_HA_STORE_TYPE}. + * + */ +public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider +{ + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBeanProvider.class); + + public BDBHAMessageStoreManagerMBeanProvider() + { + super(); + } + + @Override + public boolean isChildManageableByMBean(ConfiguredObject child) + { + return (child instanceof VirtualHost + && BDBHAMessageStore.BDB_HA_STORE_TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE))); + } + + @Override + public StandardMBean createMBean(ConfiguredObject child, StandardMBean parent) throws JMException + { + VirtualHost virtualHostChild = (VirtualHost) child; + + VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry(); + org.apache.qpid.server.virtualhost.VirtualHost vhost = virtualHostRegistry.getVirtualHost(virtualHostChild.getName()); + + BDBHAMessageStore messageStore = (BDBHAMessageStore) vhost.getMessageStore(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Creating mBean for child " + child); + } + + return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent); + } +} diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java new file mode 100644 index 0000000000..b85e44526b --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.jmx; + +import java.io.IOException; + +import javax.management.JMException; +import javax.management.openmbean.TabularData; + +import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; + +public interface ManagedBDBHAMessageStore +{ + public static final String TYPE = "BDBHAMessageStore"; + + public static final String ATTR_GROUP_NAME = "GroupName"; + public static final String ATTR_NODE_NAME = "NodeName"; + public static final String ATTR_NODE_HOST_PORT = "NodeHostPort"; + public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort"; + public static final String ATTR_DURABILITY = "Durability"; + public static final String ATTR_NODE_STATE = "NodeState"; + public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; + public static final String ATTR_COALESCING_SYNC = "CoalescingSync"; + + @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") + String getGroupName() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group") + String getNodeName() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group") + String getNodeHostPort() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node") + String getNodeState() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") + String getHelperHostPort() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_DURABILITY, description="Durability") + String getDurability() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") + boolean getDesignatedPrimary() throws IOException, JMException; + + @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.") + boolean getCoalescingSync() throws IOException, JMException; + + @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") + TabularData getAllNodesInGroup() throws IOException, JMException; + + @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") + void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName) throws JMException; + + @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.") + void setDesignatedPrimary(@MBeanOperationParameter(name="primary", description="designated primary")boolean primary) throws JMException; + + @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.") + void updateAddress(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName, + @MBeanOperationParameter(name="newHostName", description="new hostname")String newHostName, + @MBeanOperationParameter(name="newPort", description="new port number")int newPort) throws JMException; +} + diff --git a/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider b/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider new file mode 100644 index 0000000000..b5bc947612 --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider @@ -0,0 +1 @@ +org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBeanProvider diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java new file mode 100644 index 0000000000..45038bf050 --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.Connection; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.log4j.Logger; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.EnvironmentFailureException; + +/** + * System test verifying the ability to control a cluster via the Management API. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterManagementTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); + + private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final int NUMBER_OF_NODES = 4; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + _jmxUtils.setUp(); + + _clusterCreator.configureClusterNodes(); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testReadonlyMBeanAttributes() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); + assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); + assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); + assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); + // As we have chosen an arbitrary broker from the cluster, we cannot predict its state + assertNotNull("Store state must not be null", storeBean.getNodeState()); + } + + public void testStateOfActiveBrokerIsMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); + assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); + } + + public void testStateOfNonActiveBrokerIsNotMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + final String nodeState = storeBean.getNodeState(); + assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); + } + + public void testGroupMembers() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + final TabularData groupMembers = storeBean.getAllNodesInGroup(); + assertNotNull(groupMembers); + + final int numberOfDataRows = groupMembers.size(); + assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows); + + for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) + { + final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); + final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); + + CompositeData row = groupMembers.get(new Object[] {nodeName}); + assertNotNull("Table does not contain row for node name " + nodeName, row); + assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + } + + public void testRemoveNodeFromGroup() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); + final int numberOfDataRows = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows); + + final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); + _clusterCreator.stopNode(brokerPortNumberToBeRemoved); + storeBean.removeNodeFromGroup(removedNodeName); + + final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval); + } + + /** + * Updates the address of a node. + * + * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit + * assert. + * + * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case + */ + public void testUpdateAddress() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + _clusterCreator.startNode(brokerPortNumberToBeMoved); + } + + /** + * @see #testUpdateAddress() + */ + public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + // now deliberately don't call updateAddress + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + try + { + _clusterCreator.startNode(brokerPortNumberToBeMoved); + fail("Exception not thrown"); + } + catch(RuntimeException rte) + { + //check cause was BDBs EnvironmentFailureException + assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName())); + // PASS + } + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + } +} diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java new file mode 100644 index 0000000000..22877ec36c --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.rep.ReplicationConfig; + +public class HAClusterTwoNodeTest extends QpidBrokerTestCase +{ + private static final long RECEIVE_TIMEOUT = 5000l; + + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final int NUMBER_OF_NODES = 2; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + _jmxUtils.setUp(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + private void startCluster(boolean designedPrimary) throws Exception + { + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix(); + + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s"); + + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + } + + public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception + { + startCluster(true); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startNode(masterPort); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testClusterRestartWithoutDesignatedPrimary() throws Exception + { + startCluster(false); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startClusterParallel(); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + assertProducingConsuming(connection); + } + + public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception + { + startCluster(false); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + try + { + assertProducingConsuming(connection); + fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available"); + } + catch(JMSException e) + { + // JMSException should be thrown on transaction start/commit + } + } + + public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + + try + { + getConnection(_brokerFailoverUrl); + fail("Connection not expected"); + } + catch (JMSException e) + { + // PASS + } + } + + public void testInitialDesignatedPrimaryStateOfNodes() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); + assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); + + final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); + } + + public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + + assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); + + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to new primary", connection); + assertProducingConsuming(connection); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + return storeBean; + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 1); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + session.commit(); + } + +} diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java new file mode 100644 index 0000000000..49b3ddd3dc --- /dev/null +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.jmx; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.jmx.AMQManagedObject; +import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; + +public class BDBHAMessageStoreManagerMBeanTest extends TestCase +{ + private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_NODE_NAME = "testNodeName"; + private static final String TEST_NODE_HOST_PORT = "host:1234"; + private static final String TEST_HELPER_HOST_PORT = "host:5678"; + private static final String TEST_DURABILITY = "sync,sync,all"; + private static final String TEST_NODE_STATE = "MASTER"; + private static final String TEST_STORE_NAME = "testStoreName"; + private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false; + + private BDBHAMessageStore _store; + private BDBHAMessageStoreManagerMBean _mBean; + private AMQManagedObject _mBeanParent; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + _store = mock(BDBHAMessageStore.class); + _mBeanParent = mock(AMQManagedObject.class); + when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class)); + _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + CurrentActor.remove(); + } + + public void testObjectName() throws Exception + { + when(_store.getName()).thenReturn(TEST_STORE_NAME); + + String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME; + assertEquals(expectedObjectName, _mBean.getObjectName().toString()); + } + + public void testGroupName() throws Exception + { + when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME); + + assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME)); + } + + public void testNodeName() throws Exception + { + when(_store.getNodeName()).thenReturn(TEST_NODE_NAME); + + assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME)); + } + + public void testNodeHostPort() throws Exception + { + when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT); + + assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT)); + } + + public void testHelperHostPort() throws Exception + { + when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT); + + assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT)); + } + + public void testDurability() throws Exception + { + when(_store.getDurability()).thenReturn(TEST_DURABILITY); + + assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY)); + } + + public void testCoalescingSync() throws Exception + { + when(_store.isCoalescingSync()).thenReturn(true); + + assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC)); + } + + public void testNodeState() throws Exception + { + when(_store.getNodeState()).thenReturn(TEST_NODE_STATE); + + assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE)); + } + + public void testDesignatedPrimaryFlag() throws Exception + { + when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); + + assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY)); + } + + public void testGroupMembersForGroupWithOneNode() throws Exception + { + List<Map<String, String>> members = Collections.singletonList(createTestNodeResult()); + when(_store.getGroupMembers()).thenReturn(members); + + final TabularData resultsTable = _mBean.getAllNodesInGroup(); + + assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT); + + final int numberOfDataRows = resultsTable.size(); + assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows); + final CompositeData row = (CompositeData) resultsTable.values().iterator().next(); + assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME)); + assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + + public void testRemoveNodeFromReplicationGroup() throws Exception + { + _mBean.removeNodeFromGroup(TEST_NODE_NAME); + + verify(_store).removeNodeFromGroup(TEST_NODE_NAME); + } + + public void testRemoveNodeFromReplicationGroupWithError() throws Exception + { + doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME); + + try + { + _mBean.removeNodeFromGroup(TEST_NODE_NAME); + fail("Exception not thrown"); + } + catch (JMException je) + { + // PASS + } + } + + public void testSetAsDesignatedPrimary() throws Exception + { + _mBean.setDesignatedPrimary(true); + + verify(_store).setDesignatedPrimary(true); + } + + public void testSetAsDesignatedPrimaryWithError() throws Exception + { + doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true); + + try + { + _mBean.setDesignatedPrimary(true); + fail("Exception not thrown"); + } + catch (JMException je) + { + // PASS + } + } + + public void testUpdateAddress() throws Exception + { + String newHostName = "newHostName"; + int newPort = 1967; + + _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort); + + verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort); + } + + private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames) + { + CompositeType headingsRow = resultsTable.getTabularType().getRowType(); + for (final String headingName : headingNames) + { + assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName)); + } + } + + private Map<String, String> createTestNodeResult() + { + Map<String, String> items = new HashMap<String, String>(); + items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); + items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); + return items; + } +} |
