summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-05-09 13:24:24 +0000
committerAlex Rudyy <orudyy@apache.org>2014-05-09 13:24:24 +0000
commit566f96394dd04b81c3e2987a221dd935a7ab2276 (patch)
treef0bee06cad9ce8c3b40209b3100e68224cba405b /qpid/java
parentb5bea31b2564f50898d921a2b506f482fa4c6422 (diff)
downloadqpid-python-566f96394dd04b81c3e2987a221dd935a7ab2276.tar.gz
QPID-5409 : Change BDB HA MBean to delegate operations to BDB HA node instance
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1593538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java117
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java12
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java3
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java125
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java19
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java60
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java24
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java12
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java86
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java63
11 files changed, 241 insertions, 282 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
index 3a21bc70d7..7146af364e 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -20,7 +20,9 @@
package org.apache.qpid.server.store.berkeleydb.jmx;
import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import javax.management.JMException;
@@ -39,7 +41,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
/**
* Management mbean for BDB HA.
@@ -57,7 +62,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
try
{
GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
- final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT};
+ final String[] itemNames = new String[] {GRP_MEM_COL_NODE_NAME, GRP_MEM_COL_NODE_HOST_PORT};
final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
itemNames,
@@ -65,7 +70,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
GROUP_MEMBER_ATTRIBUTE_TYPES );
GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
GROUP_MEMBER_ROW,
- new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME});
+ new String[] {GRP_MEM_COL_NODE_NAME});
}
catch (final OpenDataException ode)
{
@@ -73,15 +78,15 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
}
- private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+ private final BDBHAVirtualHostNode<?> _virtualHostNode;
private final String _objectName;
- protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObjectRegistry registry) throws JMException
+ protected BDBHAMessageStoreManagerMBean(BDBHAVirtualHostNode<?> virtualHostNode, ManagedObjectRegistry registry) throws JMException
{
super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, registry);
- LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName);
- _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
- _objectName = ObjectName.quote(virtualHostName);
+ LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostNode.getName());
+ _virtualHostNode = virtualHostNode;
+ _objectName = ObjectName.quote( virtualHostNode.getGroupName());
register();
}
@@ -94,46 +99,38 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public String getGroupName()
{
- return _replicatedEnvironmentFacade.getGroupName();
+ return _virtualHostNode.getGroupName();
}
@Override
public String getNodeName()
{
- return _replicatedEnvironmentFacade.getNodeName();
+ return _virtualHostNode.getName();
}
@Override
public String getNodeHostPort()
{
- return _replicatedEnvironmentFacade.getHostPort();
+ return _virtualHostNode.getAddress();
}
@Override
public String getHelperHostPort()
{
- return _replicatedEnvironmentFacade.getHelperHostPort();
+ return _virtualHostNode.getHelperAddress();
}
@Override
public String getDurability() throws IOException, JMException
{
- try
- {
- return _replicatedEnvironmentFacade.getDurability();
- }
- catch (RuntimeException e)
- {
- LOGGER.debug("Failed query replication policy", e);
- throw new JMException(e.getMessage());
- }
+ return _virtualHostNode.getDurability();
}
@Override
public boolean getCoalescingSync() throws IOException, JMException
{
- return _replicatedEnvironmentFacade.isCoalescingSync();
+ return _virtualHostNode.isCoalescingSync();
}
@Override
@@ -141,11 +138,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- return _replicatedEnvironmentFacade.getNodeState();
+ return _virtualHostNode.getRole();
}
catch (RuntimeException e)
{
- LOGGER.debug("Failed query node state", e);
+ LOGGER.debug("Failed query node role", e);
throw new JMException(e.getMessage());
}
}
@@ -153,26 +150,30 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public boolean getDesignatedPrimary() throws IOException, JMException
{
- try
- {
- return _replicatedEnvironmentFacade.isDesignatedPrimary();
- }
- catch (RuntimeException e)
- {
- LOGGER.debug("Failed query designated primary", e);
- throw new JMException(e.getMessage());
- }
+ return _virtualHostNode.isDesignatedPrimary();
}
@Override
public TabularData getAllNodesInGroup() throws IOException, JMException
{
final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
- final List<Map<String, String>> members = _replicatedEnvironmentFacade.getGroupMembers();
- for (Map<String, String> map : members)
+ Map<String, String> localNodeMap = new HashMap<String, String>();
+ localNodeMap.put(GRP_MEM_COL_NODE_NAME, _virtualHostNode.getName());
+ localNodeMap.put(GRP_MEM_COL_NODE_HOST_PORT, _virtualHostNode.getAddress());
+ CompositeData localNodeData = new CompositeDataSupport(GROUP_MEMBER_ROW, localNodeMap);
+ data.put(localNodeData);
+
+ @SuppressWarnings("rawtypes")
+ final Collection<? extends RemoteReplicationNode> members = _virtualHostNode.getRemoteReplicationNodes();
+ for (RemoteReplicationNode<?> remoteNode : members)
{
- CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map);
+ BDBHARemoteReplicationNode<?> haReplicationNode = (BDBHARemoteReplicationNode<?>)remoteNode;
+ Map<String, String> nodeMap = new HashMap<String, String>();
+ nodeMap.put(GRP_MEM_COL_NODE_NAME, haReplicationNode.getName());
+ nodeMap.put(GRP_MEM_COL_NODE_HOST_PORT, haReplicationNode.getAddress());
+
+ CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, nodeMap);
data.put(memberData);
}
return data;
@@ -181,14 +182,32 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public void removeNodeFromGroup(String nodeName) throws JMException
{
- try
+ if (getNodeName().equals(nodeName))
{
- _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
+ _virtualHostNode.delete();
}
- catch (RuntimeException e)
+ else
{
- LOGGER.error("Failed to remove node " + nodeName + " from group", e);
- throw new JMException(e.getMessage());
+ @SuppressWarnings("rawtypes")
+ Collection<? extends RemoteReplicationNode> remoteNodes = _virtualHostNode.getRemoteReplicationNodes();
+ for (RemoteReplicationNode<?> remoteNode : remoteNodes)
+ {
+ if (remoteNode.getName().equals(nodeName))
+ {
+ try
+ {
+ remoteNode.delete();
+ return;
+ }
+ catch(IllegalStateTransitionException e)
+ {
+ LOGGER.error("Cannot remove node '" + nodeName + "' from the group", e);
+ throw new JMException("Cannot remove node '" + nodeName + "' from the group:" + e.getMessage());
+ }
+ }
+ }
+
+ throw new JMException("Failed to find replication node with name '" + nodeName + "'.");
}
}
@@ -197,11 +216,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- _replicatedEnvironmentFacade.setDesignatedPrimary(primary);
+ _virtualHostNode.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, primary));
}
catch (RuntimeException e)
{
- LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e);
+ LOGGER.error("Failed to set node " + _virtualHostNode.getName() + " as designated primary", e);
throw new JMException(e.getMessage());
}
}
@@ -209,15 +228,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException
{
- try
- {
- _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort);
- }
- catch(RuntimeException e)
- {
- LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
- throw new JMException(e.getMessage());
- }
+ throw new UnsupportedOperationException("Unsupported operation. Delete the node then add a new node in its place.");
}
@Override
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
index 558cc7e8a9..fb61df68b7 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
@@ -21,17 +21,12 @@
package org.apache.qpid.server.store.berkeleydb.jmx;
import javax.management.JMException;
-import javax.management.StandardMBean;
import org.apache.log4j.Logger;
import org.apache.qpid.server.jmx.MBeanProvider;
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
@@ -58,17 +53,12 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@Override
public ManagedObject createMBean(ConfiguredObject child, ManagedObjectRegistry registry) throws JMException
{
- BDBHAVirtualHostNode<?> virtualHostNode = (BDBHAVirtualHostNode<?>) child;
-
- BDBMessageStore messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore();
-
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Creating mBean for child " + child);
}
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
- return new BDBHAMessageStoreManagerMBean(virtualHostNode.getGroupName(), replicatedEnvironmentFacade, registry);
+ return new BDBHAMessageStoreManagerMBean((BDBHAVirtualHostNode<?>) child, registry);
}
@Override
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java
index b85e44526b..fc1cd0801a 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java
@@ -41,6 +41,9 @@ public interface ManagedBDBHAMessageStore
public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
public static final String ATTR_COALESCING_SYNC = "CoalescingSync";
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
@MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
String getGroupName() throws IOException, JMException;
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index 6aeadde0f8..439af259ab 100644
--- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -19,15 +19,16 @@
*/
package org.apache.qpid.server.store.berkeleydb.jmx;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Iterator;
import javax.management.JMException;
import javax.management.ObjectName;
@@ -37,22 +38,24 @@ import javax.management.openmbean.TabularData;
import junit.framework.TestCase;
-import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
- private static final String TEST_GROUP_NAME = "testGroupName";
+ private static final String TEST_VHOST_NAME = "test";
+ private static final String TEST_GROUP_NAME = TEST_VHOST_NAME;
private static final String TEST_NODE_NAME = "testNodeName";
private static final String TEST_NODE_HOST_PORT = "host:1234";
private static final String TEST_HELPER_HOST_PORT = "host:5678";
private static final String TEST_DURABILITY = "sync,sync,all";
private static final String TEST_NODE_STATE = "MASTER";
- private static final String TEST_STORE_NAME = "testStoreName";
private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
- private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+ private BDBHAVirtualHostNode<?> _virtualHostNode;
private BDBHAMessageStoreManagerMBean _mBean;
@Override
@@ -60,9 +63,13 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
super.setUp();
- _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class);
+ _virtualHostNode = mock(BDBHAVirtualHostNode.class);
+ when(_virtualHostNode.getName()).thenReturn(TEST_NODE_NAME);
+ when(_virtualHostNode.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(_virtualHostNode.getAddress()).thenReturn(TEST_NODE_HOST_PORT);
+
ManagedObjectRegistry registry = mock(ManagedObjectRegistry.class);
- _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacade, registry);
+ _mBean = new BDBHAMessageStoreManagerMBean(_virtualHostNode, registry);
}
@Override
@@ -73,134 +80,119 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
public void testObjectName() throws Exception
{
- String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
+ String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_VHOST_NAME);
assertEquals(expectedObjectName, _mBean.getObjectName().toString());
}
public void testGroupName() throws Exception
{
- when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(_virtualHostNode.getGroupName()).thenReturn(TEST_GROUP_NAME);
assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
}
public void testNodeName() throws Exception
{
- when(_replicatedEnvironmentFacade.getNodeName()).thenReturn(TEST_NODE_NAME);
+ when(_virtualHostNode.getName()).thenReturn(TEST_NODE_NAME);
assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
}
public void testNodeHostPort() throws Exception
{
- when(_replicatedEnvironmentFacade.getHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+ when(_virtualHostNode.getAddress()).thenReturn(TEST_NODE_HOST_PORT);
assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
}
public void testHelperHostPort() throws Exception
{
- when(_replicatedEnvironmentFacade.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+ when(_virtualHostNode.getHelperAddress()).thenReturn(TEST_HELPER_HOST_PORT);
assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
}
public void testDurability() throws Exception
{
- when(_replicatedEnvironmentFacade.getDurability()).thenReturn(TEST_DURABILITY);
+ when(_virtualHostNode.getDurability()).thenReturn(TEST_DURABILITY);
assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
}
public void testCoalescingSync() throws Exception
{
- when(_replicatedEnvironmentFacade.isCoalescingSync()).thenReturn(true);
+ when(_virtualHostNode.isCoalescingSync()).thenReturn(true);
assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
}
public void testNodeState() throws Exception
{
- when(_replicatedEnvironmentFacade.getNodeState()).thenReturn(TEST_NODE_STATE);
+ when(_virtualHostNode.getRole()).thenReturn(TEST_NODE_STATE);
assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
}
public void testDesignatedPrimaryFlag() throws Exception
{
- when(_replicatedEnvironmentFacade.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+ when(_virtualHostNode.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
}
public void testGroupMembersForGroupWithOneNode() throws Exception
{
- List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
- when(_replicatedEnvironmentFacade.getGroupMembers()).thenReturn(members);
+ BDBHARemoteReplicationNode<?> node = mockRemoteNode();
final TabularData resultsTable = _mBean.getAllNodesInGroup();
- assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT);
+ assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME,
+ BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT);
final int numberOfDataRows = resultsTable.size();
- assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
- final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
- assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME));
- assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
- }
+ assertEquals("Unexpected number of data rows", 2, numberOfDataRows);
+ Iterator<?> iterator = resultsTable.values().iterator();
- public void testRemoveNodeFromReplicationGroup() throws Exception
- {
- _mBean.removeNodeFromGroup(TEST_NODE_NAME);
+ final CompositeData firstRow = (CompositeData) iterator.next();
+ assertEquals(TEST_NODE_NAME, firstRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME));
+ assertEquals(TEST_NODE_HOST_PORT, firstRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT));
- verify(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME);
+ final CompositeData secondRow = (CompositeData) iterator.next();
+ assertEquals(node.getName(), secondRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME));
+ assertEquals(node.getAddress(), secondRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT));
}
- public void testRemoveNodeFromReplicationGroupWithError() throws Exception
+ public void testRemoveNodeFromReplicationGroup() throws Exception
{
- doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME);
+ BDBHARemoteReplicationNode<?> node = mockRemoteNode();
- try
- {
- _mBean.removeNodeFromGroup(TEST_NODE_NAME);
- fail("Exception not thrown");
- }
- catch (JMException je)
- {
- // PASS
- }
- }
-
- public void testSetAsDesignatedPrimary() throws Exception
- {
- _mBean.setDesignatedPrimary(true);
+ _mBean.removeNodeFromGroup(node.getName());
- verify(_replicatedEnvironmentFacade).setDesignatedPrimary(true);
+ verify(node).delete();
}
- public void testSetAsDesignatedPrimaryWithError() throws Exception
+ public void testRemoveNodeFromReplicationGroupOnIllegalStateTransitionException() throws Exception
{
- doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).setDesignatedPrimary(true);
+ BDBHARemoteReplicationNode<?> node = mockRemoteNode();
+ doThrow(new IllegalStateTransitionException("test")).when(node).delete();
- try
+ try
{
- _mBean.setDesignatedPrimary(true);
+ _mBean.removeNodeFromGroup("remotenode");
fail("Exception not thrown");
}
catch (JMException je)
{
- // PASS
+ // PASS#
}
}
- public void testUpdateAddress() throws Exception
+ public void testSetAsDesignatedPrimary() throws Exception
{
- String newHostName = "newHostName";
- int newPort = 1967;
-
- _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ _mBean.setDesignatedPrimary(true);
- verify(_replicatedEnvironmentFacade).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ verify(_virtualHostNode).setAttributes(
+ eq(Collections.<String, Object> singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true)));
}
private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
@@ -212,11 +204,16 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
}
}
- private Map<String, String> createTestNodeResult()
+ private BDBHARemoteReplicationNode<?> mockRemoteNode()
{
- Map<String, String> items = new HashMap<String, String>();
- items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
- items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
- return items;
+ BDBHARemoteReplicationNode<?> remoteNode = mock(BDBHARemoteReplicationNode.class);
+ when(remoteNode.getName()).thenReturn("remotenode");
+ when(remoteNode.getAddress()).thenReturn("remotehost:port");
+
+ @SuppressWarnings("rawtypes")
+ Collection<? extends RemoteReplicationNode> remoteNodes = Collections.singletonList(remoteNode);
+ doReturn(remoteNodes).when(_virtualHostNode).getRemoteReplicationNodes();
+
+ return remoteNode;
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index ab3d5e0cfa..cde00a8804 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -135,10 +135,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public static final String TYPE = "BDB-HA";
- // TODO: JMX will change to observe the model, at that point these names will disappear
- public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
- public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
-
private final ReplicatedEnvironmentConfiguration _configuration;
private final Durability _durability;
private final Boolean _coalescingSync;
@@ -718,21 +714,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- public List<Map<String, String>> getGroupMembers()
- {
- List<Map<String, String>> members = new ArrayList<Map<String, String>>();
-
- for (ReplicationNode node : _environment.getGroup().getNodes())
- {
- Map<String, String> nodeMap = new HashMap<String, String>();
- nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, node.getName());
- nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
- members.add(nodeMap);
- }
-
- return members;
- }
-
private ReplicationGroupAdmin createReplicationGroupAdmin()
{
final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
index d143d5a748..d1e6b39bcc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
@@ -21,14 +21,15 @@
package org.apache.qpid.server.virtualhostnode.berkeleydb;
+import java.io.File;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
-import org.apache.log4j.Logger;
+import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -53,7 +54,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
private final AtomicReference<State> _state;
- public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNodeImpl virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
+ public BDBHARemoteReplicationNodeImpl(BDBHAVirtualHostNode<?> virtualHostNode, Map<String, Object> attributes, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
{
super(parentsMap(virtualHostNode), attributes);
_address = (String)attributes.get(ADDRESS);
@@ -97,10 +98,30 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
return _lastTransactionId;
}
- @StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED}, desiredState = State.DELETED)
+ @StateTransition(currentState = {State.ACTIVE, State.STOPPED}, desiredState = State.DELETED)
private void doDelete()
{
- this.deleted();
+ String nodeName = getName();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting node '" + nodeName + "' from group '" + getGroupName() + "'");
+ }
+
+ try
+ {
+ _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
+ _state.set(State.DELETED);
+ deleted();
+ }
+ catch(MasterStateException e)
+ {
+ throw new IllegalStateTransitionException("Node '" + nodeName + "' cannot be deleted when role is a master");
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
+ }
}
protected void afterSetRole()
@@ -127,37 +148,6 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
}
@Override
- protected boolean setState(State desiredState)
- {
- if (desiredState == State.DELETED)
- {
- String nodeName = getName();
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleting node '" + nodeName + "' from group '" + getGroupName() + "'");
- }
-
- try
- {
- _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
- _state.set(State.DELETED);
- delete();
- return true;
- }
- catch(MasterStateException e)
- {
- throw new IllegalStateTransitionException("Node '" + nodeName + "' cannot be deleted when role is a master");
- }
- catch (Exception e)
- {
- throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
- }
- }
- return false;
- }
-
- @Override
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index ce7c79208f..d2ed22c14c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -37,8 +37,8 @@ import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
-import org.apache.log4j.Logger;
+import org.apache.log4j.Logger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 4bff8918fd..6fd7b0bc1d 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -403,16 +403,32 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS));
- Collection<? extends RemoteReplicationNode> remoteNodes = node1.getRemoteReplicationNodes();
- BDBHARemoteReplicationNodeImpl replicaRemoteNode = (BDBHARemoteReplicationNodeImpl)remoteNodes.iterator().next();
+ BDBHARemoteReplicationNodeImpl replicaRemoteNode = null;
long awaitReplicaRoleCount = 0;
- while(!"REPLICA".equals(replicaRemoteNode.getRole()))
+ while(replicaRemoteNode == null)
{
+ Collection<? extends RemoteReplicationNode> remoteNodes = node1.getRemoteReplicationNodes();
+ if (remoteNodes != null)
+ {
+ for (RemoteReplicationNode node : remoteNodes)
+ {
+ BDBHARemoteReplicationNodeImpl bdbNode = (BDBHARemoteReplicationNodeImpl)node;
+ if ("REPLICA".equals(bdbNode.getRole()))
+ {
+ replicaRemoteNode = bdbNode;
+ break;
+ }
+ }
+ if (replicaRemoteNode != null)
+ {
+ break;
+ }
+ }
Thread.sleep(100);
if (awaitReplicaRoleCount > 50)
{
- fail("Remote replication node is not in a REPLICA role");
+ fail("Remote replication node is not in a REPLICA role: " + remoteNodes);
}
awaitReplicaRoleCount++;
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
index b15a4ab37d..db06aa579e 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
@@ -123,26 +123,24 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception
{
boolean isMaster = nodeName.equals(masterNode);
- waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, isMaster? "MASTER" : "REPLICA");
+ String expectedRole = isMaster? "MASTER" : "REPLICA";
+ waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
- Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName);
+ Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0");
assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME));
assertEquals("Unexpected type", "BDB_HA", nodeData.get(BDBHAVirtualHostNode.TYPE));
assertEquals("Unexpected path", new File(_storeBaseDir, nodeName).getPath(), nodeData.get(BDBHAVirtualHostNode.STORE_PATH));
assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS));
assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS));
assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME));;
+ assertEquals("Unexpected role", expectedRole, nodeData.get(BDBHAVirtualHostNode.ROLE));
if (isMaster)
{
- assertEquals("Unexpected role", "MASTER", nodeData.get(BDBHAVirtualHostNode.ROLE));
Map<String, Object> hostData = getRestTestHelper().getJsonAsSingletonList("virtualhost/" + masterNode + "/" + _hostName + "?depth=0");
assertEquals("Unexpected host name", _hostName, hostData.get(VirtualHost.NAME));
}
- else
- {
- assertEquals("Unexpected role", "REPLICA", nodeData.get(BDBHAVirtualHostNode.ROLE));
- }
+
}
private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index e8d18971ad..200f2c1087 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -38,7 +38,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -144,11 +143,11 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
CompositeData row = groupMembers.get(new Object[] {nodeName});
assertNotNull("Table does not contain row for node name " + nodeName, row);
- assertEquals(nodeHostPort, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
+ assertEquals(nodeHostPort, row.get(ManagedBDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
}
}
- public void testRemoveNodeFromGroup() throws Exception
+ public void testRemoveRemoteNodeFromGroup() throws Exception
{
final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
@@ -158,70 +157,17 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
_clusterCreator.stopNode(brokerPortNumberToBeRemoved);
- storeBean.removeNodeFromGroup(removedNodeName);
-
- final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
- assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval);
- }
-
- /**
- * Updates the address of a node.
- *
- * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit
- * assert.
- *
- * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case
- */
- public void testUpdateAddress() throws Exception
- {
- final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
- final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next();
- final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
- final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate);
-
- _clusterCreator.stopNode(brokerPortNumberToBeMoved);
-
- final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
- final int newBdbPort = getNextAvailable(oldBdbPort + 1);
-
- storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
-
- _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
-
- _clusterCreator.startNode(brokerPortNumberToBeMoved);
- }
-
- /**
- * @see #testUpdateAddress()
- */
- public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception
- {
- final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
- final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
- _clusterCreator.stopNode(brokerPortNumberToBeMoved);
-
- final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
- final int newBdbPort = getNextAvailable(oldBdbPort + 1);
-
- // now deliberately don't call updateAddress
-
- _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+ storeBean.removeNodeFromGroup(removedNodeName);
- try
- {
- _clusterCreator.startNode(brokerPortNumberToBeMoved);
- fail("Exception not thrown");
- }
- catch(RuntimeException rte)
+ long limitTime = System.currentTimeMillis() + 5000;
+ while((NUMBER_OF_NODES == storeBean.getAllNodesInGroup().size()) && System.currentTimeMillis() < limitTime)
{
- //check cause was BDBs EnvironmentFailureException
- assertTrue("Message '"+rte.getMessage()+"' does not contain '"
- + EnvironmentFailureException.class.getName()
- + "'.",
- rte.getMessage().contains(EnvironmentFailureException.class.getName()));
- // PASS
+ Thread.sleep(100l);
}
+
+ int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows after test", NUMBER_OF_NODES - 1, numberOfDataRowsAfterRemoval);
}
public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception
@@ -254,6 +200,20 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
}
}
+ public void testSetDesignatedPrimary() throws Exception
+ {
+ int brokerPort = _clusterCreator.getBrokerPortNumbersForNodes().iterator().next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort);
+ assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary());
+ storeBean.setDesignatedPrimary(true);
+ long limit = System.currentTimeMillis() + 5000;
+ while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit)
+ {
+ Thread.sleep(100l);
+ }
+ assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary());
+ }
+
private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
{
_jmxUtils.open(brokerPortNumber);
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
index d3b5b786e9..9d1cd8df59 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
@@ -23,15 +23,16 @@ package org.apache.qpid.server.jmx;
import java.io.IOException;
import java.lang.reflect.Type;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.management.JMException;
import org.apache.log4j.Logger;
-
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.jmx.mbeans.LoggingManagementMBean;
import org.apache.qpid.server.jmx.mbeans.ServerInformationMBean;
@@ -81,7 +82,7 @@ public class JMXManagementPluginImpl
private JMXManagedObjectRegistry _objectRegistry;
private final Object _childrenLock = new Object();
- private final Map<ConfiguredObject, AMQManagedObject> _children = new HashMap<ConfiguredObject, AMQManagedObject>();
+ private final Map<ConfiguredObject, List<ManagedObject>> _children = new HashMap<ConfiguredObject, List<ManagedObject>>();
@ManagedAttributeField
private boolean _usePlatformMBeanServer;
@@ -157,7 +158,7 @@ public class JMXManagementPluginImpl
if (host != null)
{
VirtualHostMBean mbean = new VirtualHostMBean(host, _objectRegistry);
- _children.put(host, mbean);
+ addMBean(host, mbean);
}
createAdditionalMBeansFromProviders(virtualHostNode, _objectRegistry);
}
@@ -170,7 +171,7 @@ public class JMXManagementPluginImpl
UserManagementMBean mbean = new UserManagementMBean(
(PasswordCredentialManagingAuthenticationProvider) authenticationProvider,
_objectRegistry);
- _children.put(authenticationProvider, mbean);
+ addMBean(authenticationProvider, mbean);
}
createAdditionalMBeansFromProviders(authenticationProvider, _objectRegistry);
}
@@ -186,6 +187,17 @@ public class JMXManagementPluginImpl
_allowPortActivation = false;
}
+ private void addMBean(ConfiguredObject configuredObject, ManagedObject mbean)
+ {
+ List<ManagedObject> mbeanList = _children.get(configuredObject);
+ if (mbeanList == null)
+ {
+ mbeanList = new ArrayList<ManagedObject>();
+ _children.put(configuredObject, mbeanList);
+ }
+ mbeanList.add(mbean);
+ }
+
@Override
public boolean isActivationAllowed(final Port<?> port)
{
@@ -223,7 +235,21 @@ public class JMXManagementPluginImpl
{
for(ConfiguredObject object : _children.keySet())
{
- AMQManagedObject mbean = _children.get(object);
+ unregisterChildMBeans(object);
+ }
+ _children.clear();
+ }
+ getBroker().removeChangeListener(this);
+ closeObjectRegistry();
+ }
+
+ private void unregisterChildMBeans(ConfiguredObject object)
+ {
+ List<ManagedObject> mbeans = _children.get(object);
+ if (mbeans != null)
+ {
+ for (ManagedObject mbean : mbeans)
+ {
if (mbean instanceof ConfigurationChangeListener)
{
object.removeChangeListener((ConfigurationChangeListener)mbean);
@@ -237,10 +263,7 @@ public class JMXManagementPluginImpl
LOGGER.error("Exception while unregistering mbean for " + object.getClass().getSimpleName() + " " + object.getName(), e);
}
}
- _children.clear();
}
- getBroker().removeChangeListener(this);
- closeObjectRegistry();
}
@Override
@@ -278,7 +301,7 @@ public class JMXManagementPluginImpl
if (mbean != null)
{
- _children.put(child, mbean);
+ addMBean(child, mbean);
}
createAdditionalMBeansFromProviders(child, _objectRegistry);
}
@@ -296,20 +319,8 @@ public class JMXManagementPluginImpl
synchronized (_childrenLock)
{
child.removeChangeListener(this);
-
- AMQManagedObject mbean = _children.remove(child);
- if(mbean != null)
- {
- try
- {
- mbean.unregister();
- }
- catch(Exception e)
- {
- LOGGER.error("Exception while unregistering mbean for " + child.getClass().getSimpleName() + " " + child.getName(), e);
- //TODO - report error on removing child MBean
- }
- }
+ unregisterChildMBeans(child);
+ _children.remove(child);
}
}
@@ -337,8 +348,10 @@ public class JMXManagementPluginImpl
LOGGER.debug("Provider will create mbean");
}
mBean = provider.createMBean(child, registry);
- // TODO track the mbeans that have been created on behalf of a child in a map, then
- // if the child is ever removed, destroy these beans too.
+ if (mBean != null)
+ {
+ addMBean(child, mBean);
+ }
}
if(LOGGER.isDebugEnabled())