summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/jmx
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-06-28 16:46:12 +0000
committerRobert Gemmell <robbie@apache.org>2012-06-28 16:46:12 +0000
commit07c285f662e8f60d4e8aca247b65b77ca5df4587 (patch)
tree7fe15262589c0fe5206e02a5e9336c6288f004e0 /qpid/java/bdbstore/jmx
parentbb45ec03f95ffdfa6c0163067dcb75af8b64ceb5 (diff)
downloadqpid-python-07c285f662e8f60d4e8aca247b65b77ca5df4587.tar.gz
QPID-3998, QPID-3999, QPID-4093: add new management plugins for jmx/rest/webui functionality, partial merge from the java-config-and-management branch at r1355039
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1355072 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/jmx')
-rw-r--r--qpid/java/bdbstore/jmx/MANIFEST.MF20
-rw-r--r--qpid/java/bdbstore/jmx/build.xml29
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java232
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java73
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java82
-rw-r--r--qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider1
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java231
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java217
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java233
9 files changed, 1118 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/jmx/MANIFEST.MF b/qpid/java/bdbstore/jmx/MANIFEST.MF
new file mode 100644
index 0000000000..7046c4326d
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/MANIFEST.MF
@@ -0,0 +1,20 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Qpid Bdbstore-Plugins JMX
+Bundle-SymbolicName: bdbstore-plugins-jmx
+Bundle-Description: Bdbstore Management plugin for Qpid.
+Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
+Bundle-DocURL: http://www.apache.org/
+Bundle-Version: 1.0.0
+Bundle-RequiredExecutionEnvironment: JavaSE-1.6
+Bundle-ClassPath: .
+Fragment-Host: broker-plugins-jmx
+Import-Package: org.apache.qpid,
+ org.apache.qpid.management.common.mbeans.annotations,
+ org.apache.qpid.server.model,
+ org.apache.qpid.server.virtualhost,
+ org.apache.qpid.server.store.berkeleydb,
+ org.apache.log4j;version=1.2.16,
+ javax.management,
+ javax.management.openmbean
+Export-Package: org.apache.qpid.server.store.berkeleydb.jmx
diff --git a/qpid/java/bdbstore/jmx/build.xml b/qpid/java/bdbstore/jmx/build.xml
new file mode 100644
index 0000000000..2015b0cbb5
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/build.xml
@@ -0,0 +1,29 @@
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -->
+<project name="bdbstore-jmx" default="build">
+ <property name="module.depends" value="common broker broker-plugins broker-plugins/jmx management/common bdbstore" />
+ <property name="module.test.depends" value="test broker/test common/test management/common client systests bdbstore/test" />
+
+ <property name="module.manifest" value="MANIFEST.MF" />
+ <property name="module.plugin" value="true" />
+
+ <import file="../../module.xml" />
+
+ <target name="bundle" depends="bundle-tasks" />
+</project>
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
new file mode 100644
index 0000000000..455573f7bc
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.jmx;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.jmx.AMQManagedObject;
+import org.apache.qpid.server.jmx.ManagedObject;
+import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+
+/**
+ * Management mbean for BDB HA.
+ * <p>
+ * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is
+ * currently arranged through OSGI using the <b>fragment</b> feature so that this bundle shares the
+ * same classloader as broker-plugins-jmx. See the <b>Fragment-Host:</b> header within the MANIFEST.MF
+ * of this bundle.
+ * </p>
+ */
+public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
+{
+ private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class);
+
+ private static final TabularType GROUP_MEMBERS_TABLE;
+ private static final CompositeType GROUP_MEMBER_ROW;
+ private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES;
+
+ static
+ {
+ try
+ {
+ GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
+ final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
+ final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
+ GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
+ itemNames,
+ itemDescriptions,
+ GROUP_MEMBER_ATTRIBUTE_TYPES );
+ GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
+ GROUP_MEMBER_ROW,
+ new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
+ }
+ catch (final OpenDataException ode)
+ {
+ throw new ExceptionInInitializerError(ode);
+ }
+ }
+
+ private final BDBHAMessageStore _store;
+
+ protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException
+ {
+ super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
+ LOGGER.debug("Creating BDBHAMessageStoreManagerMBean");
+ _store = store;
+ register();
+ }
+
+ @Override
+ public String getObjectInstanceName()
+ {
+ return _store.getName();
+ }
+
+ @Override
+ public String getGroupName()
+ {
+ return _store.getGroupName();
+ }
+
+ @Override
+ public String getNodeName()
+ {
+ return _store.getNodeName();
+ }
+
+ @Override
+ public String getNodeHostPort()
+ {
+ return _store.getNodeHostPort();
+ }
+
+ @Override
+ public String getHelperHostPort()
+ {
+ return _store.getHelperHostPort();
+ }
+
+ @Override
+ public String getDurability() throws IOException, JMException
+ {
+ try
+ {
+ return _store.getDurability();
+ }
+ catch (RuntimeException e)
+ {
+ LOGGER.debug("Failed query replication policy", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+
+ @Override
+ public boolean getCoalescingSync() throws IOException, JMException
+ {
+ return _store.isCoalescingSync();
+ }
+
+ @Override
+ public String getNodeState() throws IOException, JMException
+ {
+ try
+ {
+ return _store.getNodeState();
+ }
+ catch (RuntimeException e)
+ {
+ LOGGER.debug("Failed query node state", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean getDesignatedPrimary() throws IOException, JMException
+ {
+ try
+ {
+ return _store.isDesignatedPrimary();
+ }
+ catch (RuntimeException e)
+ {
+ LOGGER.debug("Failed query designated primary", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public TabularData getAllNodesInGroup() throws IOException, JMException
+ {
+ final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
+ final List<Map<String, String>> members = _store.getGroupMembers();
+
+ for (Map<String, String> map : members)
+ {
+ CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map);
+ data.put(memberData);
+ }
+ return data;
+ }
+
+ @Override
+ public void removeNodeFromGroup(String nodeName) throws JMException
+ {
+ try
+ {
+ _store.removeNodeFromGroup(nodeName);
+ }
+ catch (AMQStoreException e)
+ {
+ LOGGER.error("Failed to remove node " + nodeName + " from group", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void setDesignatedPrimary(boolean primary) throws JMException
+ {
+ try
+ {
+ _store.setDesignatedPrimary(primary);
+ }
+ catch (AMQStoreException e)
+ {
+ LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException
+ {
+ try
+ {
+ _store.updateAddress(nodeName, newHostName, newPort);
+ }
+ catch(AMQStoreException e)
+ {
+ LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public ManagedObject getParentObject()
+ {
+ return null;
+ }
+
+}
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
new file mode 100644
index 0000000000..837da1eef3
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.jmx;
+
+import javax.management.JMException;
+import javax.management.StandardMBean;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.jmx.MBeanProvider;
+import org.apache.qpid.server.jmx.ManagedObject;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+/**
+ * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual
+ * host and of type {@link BDBHAMessageStore#BDB_HA_STORE_TYPE}.
+ *
+ */
+public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
+{
+ private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBeanProvider.class);
+
+ public BDBHAMessageStoreManagerMBeanProvider()
+ {
+ super();
+ }
+
+ @Override
+ public boolean isChildManageableByMBean(ConfiguredObject child)
+ {
+ return (child instanceof VirtualHost
+ && BDBHAMessageStore.BDB_HA_STORE_TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+ }
+
+ @Override
+ public StandardMBean createMBean(ConfiguredObject child, StandardMBean parent) throws JMException
+ {
+ VirtualHost virtualHostChild = (VirtualHost) child;
+
+ VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
+ org.apache.qpid.server.virtualhost.VirtualHost vhost = virtualHostRegistry.getVirtualHost(virtualHostChild.getName());
+
+ BDBHAMessageStore messageStore = (BDBHAMessageStore) vhost.getMessageStore();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Creating mBean for child " + child);
+ }
+
+ return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent);
+ }
+}
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java
new file mode 100644
index 0000000000..b85e44526b
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.jmx;
+
+import java.io.IOException;
+
+import javax.management.JMException;
+import javax.management.openmbean.TabularData;
+
+import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
+
+public interface ManagedBDBHAMessageStore
+{
+ public static final String TYPE = "BDBHAMessageStore";
+
+ public static final String ATTR_GROUP_NAME = "GroupName";
+ public static final String ATTR_NODE_NAME = "NodeName";
+ public static final String ATTR_NODE_HOST_PORT = "NodeHostPort";
+ public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort";
+ public static final String ATTR_DURABILITY = "Durability";
+ public static final String ATTR_NODE_STATE = "NodeState";
+ public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
+ public static final String ATTR_COALESCING_SYNC = "CoalescingSync";
+
+ @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
+ String getGroupName() throws IOException, JMException;
+
+ @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group")
+ String getNodeName() throws IOException, JMException;
+
+ @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group")
+ String getNodeHostPort() throws IOException, JMException;
+
+ @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node")
+ String getNodeState() throws IOException, JMException;
+
+ @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members")
+ String getHelperHostPort() throws IOException, JMException;
+
+ @MBeanAttribute(name=ATTR_DURABILITY, description="Durability")
+ String getDurability() throws IOException, JMException;
+
+ @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.")
+ boolean getDesignatedPrimary() throws IOException, JMException;
+
+ @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.")
+ boolean getCoalescingSync() throws IOException, JMException;
+
+ @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
+ TabularData getAllNodesInGroup() throws IOException, JMException;
+
+ @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group")
+ void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName) throws JMException;
+
+ @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.")
+ void setDesignatedPrimary(@MBeanOperationParameter(name="primary", description="designated primary")boolean primary) throws JMException;
+
+ @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.")
+ void updateAddress(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName,
+ @MBeanOperationParameter(name="newHostName", description="new hostname")String newHostName,
+ @MBeanOperationParameter(name="newPort", description="new port number")int newPort) throws JMException;
+}
+
diff --git a/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider b/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider
new file mode 100644
index 0000000000..b5bc947612
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/src/main/resources/services/org.apache.qpid.server.jmx.MBeanProvider
@@ -0,0 +1 @@
+org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBeanProvider
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
new file mode 100644
index 0000000000..45038bf050
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.EnvironmentFailureException;
+
+/**
+ * System test verifying the ability to control a cluster via the Management API.
+ *
+ * @see HAClusterBlackboxTest
+ */
+public class HAClusterManagementTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class);
+
+ private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));;
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST;
+ private static final int NUMBER_OF_NODES = 4;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+ _jmxUtils.setUp();
+
+ _clusterCreator.configureClusterNodes();
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testReadonlyMBeanAttributes() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+ final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName());
+ assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName());
+ assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort());
+ assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort());
+ // As we have chosen an arbitrary broker from the cluster, we cannot predict its state
+ assertNotNull("Store state must not be null", storeBean.getNodeState());
+ }
+
+ public void testStateOfActiveBrokerIsMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber);
+ assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState());
+ }
+
+ public void testStateOfNonActiveBrokerIsNotMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
+ final String nodeState = storeBean.getNodeState();
+ assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState));
+ }
+
+ public void testGroupMembers() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ final TabularData groupMembers = storeBean.getAllNodesInGroup();
+ assertNotNull(groupMembers);
+
+ final int numberOfDataRows = groupMembers.size();
+ assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows);
+
+ for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers())
+ {
+ final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber);
+ final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber);
+
+ CompositeData row = groupMembers.get(new Object[] {nodeName});
+ assertNotNull("Table does not contain row for node name " + nodeName, row);
+ assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ }
+ }
+
+ public void testRemoveNodeFromGroup() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
+ final int numberOfDataRows = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows);
+
+ final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
+ _clusterCreator.stopNode(brokerPortNumberToBeRemoved);
+ storeBean.removeNodeFromGroup(removedNodeName);
+
+ final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval);
+ }
+
+ /**
+ * Updates the address of a node.
+ *
+ * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit
+ * assert.
+ *
+ * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case
+ */
+ public void testUpdateAddress() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate);
+
+ _clusterCreator.stopNode(brokerPortNumberToBeMoved);
+
+ final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
+ final int newBdbPort = getNextAvailable(oldBdbPort + 1);
+
+ storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
+
+ _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+
+ _clusterCreator.startNode(brokerPortNumberToBeMoved);
+ }
+
+ /**
+ * @see #testUpdateAddress()
+ */
+ public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
+
+ _clusterCreator.stopNode(brokerPortNumberToBeMoved);
+
+ final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
+ final int newBdbPort = getNextAvailable(oldBdbPort + 1);
+
+ // now deliberately don't call updateAddress
+
+ _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+
+ try
+ {
+ _clusterCreator.startNode(brokerPortNumberToBeMoved);
+ fail("Exception not thrown");
+ }
+ catch(RuntimeException rte)
+ {
+ //check cause was BDBs EnvironmentFailureException
+ assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName()));
+ // PASS
+ }
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
+ final int activeBrokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(activeBrokerPortNumber);
+
+ return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ }
+}
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
new file mode 100644
index 0000000000..22877ec36c
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class HAClusterTwoNodeTest extends QpidBrokerTestCase
+{
+ private static final long RECEIVE_TIMEOUT = 5000l;
+
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST;
+ private static final int NUMBER_OF_NODES = 2;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+ _jmxUtils.setUp();
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ private void startCluster(boolean designedPrimary) throws Exception
+ {
+ setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
+
+ String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix();
+
+ setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
+ setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s");
+
+ setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+ setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0");
+
+ _clusterCreator.configureClusterNodes();
+ _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+ }
+
+ public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
+ {
+ startCluster(true);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _clusterCreator.stopCluster();
+ _clusterCreator.startNode(masterPort);
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
+ }
+
+ public void testClusterRestartWithoutDesignatedPrimary() throws Exception
+ {
+ startCluster(false);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _clusterCreator.stopCluster();
+ _clusterCreator.startClusterParallel();
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
+ }
+
+ public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception
+ {
+ startCluster(false);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ try
+ {
+ assertProducingConsuming(connection);
+ fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available");
+ }
+ catch(JMSException e)
+ {
+ // JMSException should be thrown on transaction start/commit
+ }
+ }
+
+ public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
+
+ try
+ {
+ getConnection(_brokerFailoverUrl);
+ fail("Connection not expected");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testInitialDesignatedPrimaryStateOfNodes() throws Exception
+ {
+ startCluster(true);
+ final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary());
+ assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary());
+
+ final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary());
+ }
+
+ public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+
+ assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary());
+ storeBean.setDesignatedPrimary(true);
+ assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary());
+
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to new primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
+ final int activeBrokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(activeBrokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ return storeBean;
+ }
+
+ private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 1);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ session.commit();
+ }
+
+}
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
new file mode 100644
index 0000000000..49b3ddd3dc
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.jmx;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.jmx.AMQManagedObject;
+import org.apache.qpid.server.jmx.ManagedObjectRegistry;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean;
+import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+
+public class BDBHAMessageStoreManagerMBeanTest extends TestCase
+{
+ private static final String TEST_GROUP_NAME = "testGroupName";
+ private static final String TEST_NODE_NAME = "testNodeName";
+ private static final String TEST_NODE_HOST_PORT = "host:1234";
+ private static final String TEST_HELPER_HOST_PORT = "host:5678";
+ private static final String TEST_DURABILITY = "sync,sync,all";
+ private static final String TEST_NODE_STATE = "MASTER";
+ private static final String TEST_STORE_NAME = "testStoreName";
+ private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
+
+ private BDBHAMessageStore _store;
+ private BDBHAMessageStoreManagerMBean _mBean;
+ private AMQManagedObject _mBeanParent;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+ _store = mock(BDBHAMessageStore.class);
+ _mBeanParent = mock(AMQManagedObject.class);
+ when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
+ _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ CurrentActor.remove();
+ }
+
+ public void testObjectName() throws Exception
+ {
+ when(_store.getName()).thenReturn(TEST_STORE_NAME);
+
+ String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME;
+ assertEquals(expectedObjectName, _mBean.getObjectName().toString());
+ }
+
+ public void testGroupName() throws Exception
+ {
+ when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME);
+
+ assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
+ }
+
+ public void testNodeName() throws Exception
+ {
+ when(_store.getNodeName()).thenReturn(TEST_NODE_NAME);
+
+ assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
+ }
+
+ public void testNodeHostPort() throws Exception
+ {
+ when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+
+ assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
+ }
+
+ public void testHelperHostPort() throws Exception
+ {
+ when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+
+ assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
+ }
+
+ public void testDurability() throws Exception
+ {
+ when(_store.getDurability()).thenReturn(TEST_DURABILITY);
+
+ assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
+ }
+
+ public void testCoalescingSync() throws Exception
+ {
+ when(_store.isCoalescingSync()).thenReturn(true);
+
+ assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
+ }
+
+ public void testNodeState() throws Exception
+ {
+ when(_store.getNodeState()).thenReturn(TEST_NODE_STATE);
+
+ assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
+ }
+
+ public void testDesignatedPrimaryFlag() throws Exception
+ {
+ when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+
+ assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
+ }
+
+ public void testGroupMembersForGroupWithOneNode() throws Exception
+ {
+ List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
+ when(_store.getGroupMembers()).thenReturn(members);
+
+ final TabularData resultsTable = _mBean.getAllNodesInGroup();
+
+ assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT);
+
+ final int numberOfDataRows = resultsTable.size();
+ assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
+ final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
+ assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME));
+ assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ }
+
+ public void testRemoveNodeFromReplicationGroup() throws Exception
+ {
+ _mBean.removeNodeFromGroup(TEST_NODE_NAME);
+
+ verify(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ }
+
+ public void testRemoveNodeFromReplicationGroupWithError() throws Exception
+ {
+ doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME);
+
+ try
+ {
+ _mBean.removeNodeFromGroup(TEST_NODE_NAME);
+ fail("Exception not thrown");
+ }
+ catch (JMException je)
+ {
+ // PASS
+ }
+ }
+
+ public void testSetAsDesignatedPrimary() throws Exception
+ {
+ _mBean.setDesignatedPrimary(true);
+
+ verify(_store).setDesignatedPrimary(true);
+ }
+
+ public void testSetAsDesignatedPrimaryWithError() throws Exception
+ {
+ doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true);
+
+ try
+ {
+ _mBean.setDesignatedPrimary(true);
+ fail("Exception not thrown");
+ }
+ catch (JMException je)
+ {
+ // PASS
+ }
+ }
+
+ public void testUpdateAddress() throws Exception
+ {
+ String newHostName = "newHostName";
+ int newPort = 1967;
+
+ _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
+
+ verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ }
+
+ private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
+ {
+ CompositeType headingsRow = resultsTable.getTabularType().getRowType();
+ for (final String headingName : headingNames)
+ {
+ assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName));
+ }
+ }
+
+ private Map<String, String> createTestNodeResult()
+ {
+ Map<String, String> items = new HashMap<String, String>();
+ items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+ items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+ return items;
+ }
+}