diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-05-15 16:14:41 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-05-15 16:14:41 +0000 |
| commit | 081bc26aad845b3b8c4190afb7014206b74e4335 (patch) | |
| tree | 0b624cacf11597497898529813fce5c95c66f87e /qpid/java | |
| parent | 2c802479f99ff5eeeb0958b0869c700f55c0e889 (diff) | |
| download | qpid-python-081bc26aad845b3b8c4190afb7014206b74e4335.tar.gz | |
QPID-5715: Fix Virtual Host MBean creation in JMX management plugin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1594963 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
10 files changed, 551 insertions, 177 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 4aff32fe13..9602c2c29c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -439,6 +439,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu if (virtualHost!= null) { virtualHost.close(); + childRemoved(virtualHost); } } @@ -473,6 +474,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu LOGGER.error("Unexpected state change: " + state); throw new IllegalStateException("Unexpected state change: " + state); } + + attributeSet(ROLE, _role, state.name()); } } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java index 0de61d68cd..faeb068f4d 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -20,6 +20,10 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -30,6 +34,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.TestUtils; @@ -115,6 +120,129 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } + public void testTransferMasterFromLocalNode() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + Map<String, Object> attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + + attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + } + + public void testTransferMasterFromRemoteNode() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); + Map<String, Object> attributes = _clusterCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); + assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + + _clusterCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + + attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + } + + public void testQuorumOverride() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes(); + Iterator<Integer> iterator = ports.iterator(); + Integer quorumOverridePort = iterator.next(); + iterator.remove(); + + for (Integer p : ports) + { + _clusterCreator.stopNode(p); + } + + Map<String, Object> attributes = _clusterCreator.getNodeAttributes(quorumOverridePort); + assertEquals("Broker has unexpeced quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); + _clusterCreator.setNodeAttributes(quorumOverridePort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); + + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + + attributes = _clusterCreator.getNodeAttributes(quorumOverridePort); + assertEquals("Broker has unexpeced quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); + + assertProducingConsuming(connection); + } + + public void testPriority() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + int priority = 1; + Integer highestPriorityBrokerPort = null; + Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes(); + for (Integer port : ports) + { + if (activeBrokerPort != port.intValue()) + { + priority = priority + 1; + highestPriorityBrokerPort = port; + _clusterCreator.setNodeAttributes(port, port, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, priority)); + Map<String, Object> attributes = _clusterCreator.getNodeAttributes(port, port); + assertEquals("Broker has unexpeced priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY)); + } + } + + LOGGER.info("Broker on port " + highestPriorityBrokerPort + " has the highest priority of " + priority); + + LOGGER.info("Shutting down the MASTER"); + _clusterCreator.stopNode(activeBrokerPort); + + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + + Map<String, Object> attributes = _clusterCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort); + assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + } + private final class FailoverAwaitingListener implements ConnectionListener { private final CountDownLatch _failoverLatch = new CountDownLatch(1); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index 200f2c1087..c184ef8f8d 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -25,8 +25,11 @@ import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Set; import javax.jms.Connection; @@ -37,11 +40,14 @@ import javax.management.openmbean.TabularData; import org.apache.log4j.Logger; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.RestTestHelper; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import com.sleepycat.je.EnvironmentFailureException; +import org.junit.Assert; /** * System test verifying the ability to control a cluster via the Management API. @@ -214,6 +220,68 @@ public class HAClusterManagementTest extends QpidBrokerTestCase assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary()); } + public void testVirtualHostMbeanOnMasterTransfer() throws Exception + { + Connection connection = getConnection(_brokerFailoverUrl); + int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + connection.close(); + + Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes(); + ports.remove(activeBrokerPort); + + int inactiveBrokerPort = ports.iterator().next(); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + ManagedBroker inactiveVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort); + + try + { + inactiveVirtualHostMBean.createNewQueue(getTestQueueName(), null, true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message); + } + + Map<String, Object> attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _clusterCreator.awaitNodeToAttainRole(inactiveBrokerPort, "MASTER"); + + awaitVirtualHostAtNode(inactiveBrokerPort); + + ManagedBroker activeVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort); + activeVirtualHostMBean.createNewQueue(getTestQueueName() + inactiveBrokerPort, null, true); + } + + public void awaitVirtualHostAtNode(int brokerPort) throws Exception + { + final long startTime = System.currentTimeMillis(); + Map<String, Object> data = Collections.emptyMap(); + String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort); + RestTestHelper restHelper = _clusterCreator.createRestTestHelper(brokerPort); + while(!State.ACTIVE.name().equals(data.get(VirtualHost.STATE)) && (System.currentTimeMillis() - startTime) < 30000) + { + LOGGER.debug("Awaiting virtual host '" + nodeName + "' to transit into active state"); + List<Map<String, Object>> results= restHelper.getJsonAsList("virtualhost/" + nodeName + "/" + VIRTUAL_HOST); + if (results.size()== 1) + { + data = results.get(0); + } + + if (!State.ACTIVE.name().equals(data.get(VirtualHost.STATE))) + { + Thread.sleep(1000); + } + } + Assert.assertEquals("Virtual host is not active", State.ACTIVE.name(), data.get(VirtualHost.STATE)); + LOGGER.debug("Virtual host '" + nodeName + "' is in active state"); + } + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception { _jmxUtils.open(brokerPortNumber); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java index b414905ddc..24296246b8 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -196,17 +196,4 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase return storeBean; } - private void assertProducingConsuming(final Connection connection) throws JMSException, Exception - { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination destination = session.createQueue(getTestQueueName()); - MessageConsumer consumer = session.createConsumer(destination); - sendMessage(session, destination, 1); - connection.start(); - Message m1 = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message 1 is not received", m1); - assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); - session.commit(); - } - } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java index 408643b98a..ef5cc7c464 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -235,22 +235,6 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase killBroker(initialPortNumber); // kill awaits the death of the child } - private void assertProducingConsuming(final Connection connection) throws JMSException, Exception - { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination destination = session.createQueue(getTestQueueName()); - MessageConsumer consumer = session.createConsumer(destination); - sendMessage(session, destination, 2); - connection.start(); - Message m1 = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message 1 is not received", m1); - assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); - Message m2 = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message 2 is not received", m2); - assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX)); - session.commit(); - } - private void closeConnection(final Connection initialConnection) { try diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 96a75db2cc..48e15561d0 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -20,8 +20,10 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -42,11 +44,17 @@ import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.server.management.plugin.HttpManagement; +import org.apache.qpid.server.model.Plugin; +import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.RestTestHelper; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.url.URLSyntaxException; +import org.junit.Assert; import com.sleepycat.je.rep.ReplicationConfig; @@ -119,6 +127,9 @@ public class HATestClusterCreator TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); brokerConfiguration.addJmxManagementConfiguration(); + brokerConfiguration.addHttpManagementConfiguration(); + brokerConfiguration.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true); + brokerConfiguration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, _testcase.getHttpManagementPort(brokerPort)); brokerConfiguration.setObjectAttributes(VirtualHostNode.class, _virtualHostName, virtualHostNodeAttributes); brokerPort = _testcase.getNextAvailable(bdbPort + 1); @@ -273,8 +284,7 @@ public class HATestClusterCreator return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); } - public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws - URLSyntaxException + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException { return getConnectionUrlForSingleNode(brokerPortNumber, false); } @@ -380,4 +390,97 @@ public class HATestClusterCreator config.setSaved(false); } + public String getNodeNameForBrokerPort(final int brokerPort) + { + return getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPort)); + } + + public void setNodeAttributes(int brokerPort, Map<String, Object> attributeMap) + throws Exception + { + setNodeAttributes(brokerPort, brokerPort, attributeMap); + } + + public void setNodeAttributes(int localNodePort, int remoteNodePort, Map<String, Object> attributeMap) + throws Exception + { + RestTestHelper restHelper = createRestTestHelper(localNodePort); + String url = getNodeRestUrl(localNodePort, remoteNodePort); + int status = restHelper.submitRequest(url, "PUT", attributeMap); + if (status != 200) + { + throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute's : " + status); + } + } + + private String getNodeRestUrl(int localNodePort, int remoteNodePort) + { + String remoteNodeName = getNodeNameForBrokerPort(remoteNodePort); + String localNodeName = getNodeNameForBrokerPort(localNodePort); + String url = null; + if (localNodePort == remoteNodePort) + { + url = "/api/latest/virtualhostnode/" + localNodeName; + } + else + { + url = "/api/latest/replicationnode/" + localNodeName + "/" + remoteNodeName; + } + return url; + } + + public Map<String, Object> getNodeAttributes(int brokerPort) throws IOException + { + return getNodeAttributes(brokerPort, brokerPort); + } + + public Map<String, Object> getNodeAttributes(int localNodePort, int remoteNodePort) throws IOException + { + RestTestHelper restHelper = createRestTestHelper(localNodePort); + List<Map<String, Object>> results= restHelper.getJsonAsList(getNodeRestUrl(localNodePort, remoteNodePort)); + int size = results.size(); + if (size == 0) + { + return Collections.emptyMap(); + } + else if (size == 1) + { + return results.get(0); + } + else + { + throw new RuntimeException("Unexpected number of nodes " + size); + } + } + + public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception + { + awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole); + } + + public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception + { + final long startTime = System.currentTimeMillis(); + Map<String, Object> data = Collections.emptyMap(); + + while(!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000) + { + LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredRole + " role"); + data = getNodeAttributes(localNodePort, remoteNodePort); + if (!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE))) + { + Thread.sleep(1000); + } + } + LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' role is " + data.get(BDBHARemoteReplicationNode.ROLE)); + Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(BDBHARemoteReplicationNode.ROLE)); + } + + public RestTestHelper createRestTestHelper(int brokerPort) + { + int httpPort = _testcase.getHttpManagementPort(brokerPort); + RestTestHelper helper = new RestTestHelper(httpPort); + helper.setUsernameAndPassword("webadmin", "webadmin"); + return helper; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java index d20c709e90..a381c72f99 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java @@ -29,11 +29,11 @@ public interface ConfigurationChangeListener * @param oldState the state prior to the change * @param newState the state after the change */ - void stateChanged(ConfiguredObject object, State oldState, State newState); + void stateChanged(ConfiguredObject<?> object, State oldState, State newState); - void childAdded(ConfiguredObject object, ConfiguredObject child); + void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child); - void childRemoved(ConfiguredObject object, ConfiguredObject child); + void childRemoved(ConfiguredObject<?> object, ConfiguredObject<?> child); - void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue); + void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue, Object newAttributeValue); } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java index 4b4b78decc..74153b8672 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java @@ -22,11 +22,9 @@ package org.apache.qpid.server.jmx; import java.io.IOException; -import java.lang.reflect.Type; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -60,9 +58,7 @@ import org.apache.qpid.server.model.port.RmiPort; import org.apache.qpid.server.plugin.QpidServiceLoader; public class JMXManagementPluginImpl - extends AbstractPluginAdapter<JMXManagementPluginImpl> implements ConfigurationChangeListener, - JMXManagementPlugin<JMXManagementPluginImpl>, - PortManager + extends AbstractPluginAdapter<JMXManagementPluginImpl> implements JMXManagementPlugin<JMXManagementPluginImpl>, PortManager { private static final Logger LOGGER = Logger.getLogger(JMXManagementPluginImpl.class); @@ -71,28 +67,38 @@ public class JMXManagementPluginImpl // default values public static final String DEFAULT_NAME = "JMXManagement"; - @SuppressWarnings("serial") - private static final Map<String, Type> ATTRIBUTE_TYPES = new HashMap<String, Type>(){{ - put(USE_PLATFORM_MBEAN_SERVER, Boolean.class); - put(NAME, String.class); - put(TYPE, String.class); - put(TYPE, String.class); - }}; - private JMXManagedObjectRegistry _objectRegistry; private final Object _childrenLock = new Object(); - private final Map<ConfiguredObject, List<ManagedObject>> _children = new HashMap<ConfiguredObject, List<ManagedObject>>(); + private final Map<ConfiguredObject<?>, Map<MBeanProvider, ManagedObject>> _children = new HashMap<ConfiguredObject<?>, Map<MBeanProvider,ManagedObject>>(); @ManagedAttributeField private boolean _usePlatformMBeanServer; private boolean _allowPortActivation; + private final Set<MBeanProvider> _mBeanProviders; + private final ChangeListener _changeListener; + private final PluginMBeansProvider _pluginMBeanProvider; + @ManagedObjectFactoryConstructor - public JMXManagementPluginImpl(Map<String, Object> attributes, Broker broker) + public JMXManagementPluginImpl(Map<String, Object> attributes, Broker<?> broker) { super(attributes, broker); + _changeListener = new ChangeListener(); + _pluginMBeanProvider = new PluginMBeansProvider(); + _mBeanProviders = new HashSet<MBeanProvider>(); + QpidServiceLoader<MBeanProvider> qpidServiceLoader = new QpidServiceLoader<MBeanProvider>(); + for (MBeanProvider provider : qpidServiceLoader.instancesOf(MBeanProvider.class)) + { + _mBeanProviders.add(provider); + } + } + + @Override + public boolean getUsePlatformMBeanServer() + { + return _usePlatformMBeanServer; } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) @@ -103,7 +109,7 @@ public class JMXManagementPluginImpl JmxPort<?> connectorPort = null; RmiPort registryPort = null; Collection<Port<?>> ports = broker.getPorts(); - for (Port port : ports) + for (Port<?> port : ports) { if (port.getDesiredState() != State.ACTIVE) { @@ -142,38 +148,19 @@ public class JMXManagementPluginImpl _objectRegistry = new JMXManagedObjectRegistry(broker, connectorPort, registryPort, this); - broker.addChangeListener(this); + broker.addChangeListener(_changeListener); synchronized (_childrenLock) { for(VirtualHostNode<?> virtualHostNode : broker.getVirtualHostNodes()) { - virtualHostNode.addChangeListener(this); - - // Virtualhostnodes may or may not have a virtualhost at this point. In the HA - // case, JE may spontaneously make the node a master causing it to create a virtualhost. - // Creation of the vhost uses the task executor (same thread that executes this code - // so there is no potential for a race here). - VirtualHost host = virtualHostNode.getVirtualHost(); - if (host != null) - { - VirtualHostMBean mbean = new VirtualHostMBean(host, _objectRegistry); - addMBean(host, mbean); - } - createAdditionalMBeansFromProviders(virtualHostNode, _objectRegistry); + createObjectMBeans(virtualHostNode); } Collection<AuthenticationProvider<?>> authenticationProviders = broker.getAuthenticationProviders(); for (AuthenticationProvider<?> authenticationProvider : authenticationProviders) { - if(authenticationProvider instanceof PasswordCredentialManagingAuthenticationProvider) - { - UserManagementMBean mbean = new UserManagementMBean( - (PasswordCredentialManagingAuthenticationProvider) authenticationProvider, - _objectRegistry); - addMBean(authenticationProvider, mbean); - } - createAdditionalMBeansFromProviders(authenticationProvider, _objectRegistry); + createObjectMBeans(authenticationProvider); } } new Shutdown(_objectRegistry); @@ -187,36 +174,18 @@ public class JMXManagementPluginImpl _allowPortActivation = false; } - private void addMBean(ConfiguredObject configuredObject, ManagedObject mbean) - { - List<ManagedObject> mbeanList = _children.get(configuredObject); - if (mbeanList == null) - { - mbeanList = new ArrayList<ManagedObject>(); - _children.put(configuredObject, mbeanList); - } - mbeanList.add(mbean); - } - @Override public boolean isActivationAllowed(final Port<?> port) { return _allowPortActivation; } - @Override - protected void onOpen() - { - super.onOpen(); - - } - - private boolean isConnectorPort(Port port) + private boolean isConnectorPort(Port<?> port) { return port.getAvailableProtocols().contains(Protocol.JMX_RMI); } - private boolean isRegistryPort(Port port) + private boolean isRegistryPort(Port<?> port) { return port.getAvailableProtocols().contains(Protocol.RMI); } @@ -226,27 +195,42 @@ public class JMXManagementPluginImpl { synchronized (_childrenLock) { - for(ConfiguredObject object : _children.keySet()) + for(ConfiguredObject<?> object : _children.keySet()) { - unregisterChildMBeans(object); + unregisterObjectMBeans(object); } _children.clear(); } - getBroker().removeChangeListener(this); + getBroker().removeChangeListener(_changeListener); closeObjectRegistry(); } - private void unregisterChildMBeans(ConfiguredObject object) + private void unregisterObjectMBeans(ConfiguredObject<?> object) { - List<ManagedObject> mbeans = _children.get(object); + Map<?, ManagedObject> mbeans = _children.get(object); if (mbeans != null) { - for (ManagedObject mbean : mbeans) + for (ManagedObject mbean : mbeans.values()) { if (mbean instanceof ConfigurationChangeListener) { object.removeChangeListener((ConfigurationChangeListener)mbean); } + + if (LOGGER.isDebugEnabled()) + { + String mbeanName = null; + try + { + mbeanName = mbean.getObjectName().toString(); + } + catch(Exception e) + { + // ignore + } + LOGGER.debug("Unregistering MBean " + mbeanName + " for configured object " + object); + } + try { mbean.unregister(); @@ -259,74 +243,9 @@ public class JMXManagementPluginImpl } } - @Override - public void stateChanged(ConfiguredObject object, State oldState, State newState) + private void createAdditionalMBeansFromProvidersIfNecessary(ConfiguredObject<?> child, ManagedObjectRegistry registry) throws JMException { - // no-op - } - - @Override - public void childAdded(ConfiguredObject object, ConfiguredObject child) - { - - synchronized (_childrenLock) - { - try - { - AMQManagedObject mbean; - if (child instanceof VirtualHostNode) - { - child.addChangeListener(this); - } - if(child instanceof VirtualHost) - { - VirtualHost vhostChild = (VirtualHost)child; - mbean = new VirtualHostMBean(vhostChild, _objectRegistry); - } - else if(child instanceof PasswordCredentialManagingAuthenticationProvider) - { - mbean = new UserManagementMBean((PasswordCredentialManagingAuthenticationProvider) child, _objectRegistry); - } - else - { - mbean = null; - } - - if (mbean != null) - { - addMBean(child, mbean); - } - createAdditionalMBeansFromProviders(child, _objectRegistry); - } - catch(Exception e) - { - LOGGER.error("Exception while creating mbean for " + child.getClass().getSimpleName() + " " + child.getName(), e); - // TODO - Implement error reporting on mbean creation - } - } - } - - @Override - public void childRemoved(ConfiguredObject object, ConfiguredObject child) - { - synchronized (_childrenLock) - { - child.removeChangeListener(this); - unregisterChildMBeans(child); - _children.remove(child); - } - } - - @Override - public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) - { - // no-op - } - - private void createAdditionalMBeansFromProviders(ConfiguredObject child, ManagedObjectRegistry registry) throws JMException - { - QpidServiceLoader<MBeanProvider> qpidServiceLoader = new QpidServiceLoader<MBeanProvider>(); - for (MBeanProvider provider : qpidServiceLoader.instancesOf(MBeanProvider.class)) + for (MBeanProvider provider : _mBeanProviders) { if(LOGGER.isDebugEnabled()) { @@ -334,16 +253,17 @@ public class JMXManagementPluginImpl } ManagedObject mBean = null; - if (provider.isChildManageableByMBean(child)) + if (provider.isChildManageableByMBean(child) && !providerMBeanExists(child, provider)) { if(LOGGER.isDebugEnabled()) { - LOGGER.debug("Provider will create mbean"); + LOGGER.debug("Provider of type " + provider.getType() + " will create mbean for " + child); } + mBean = provider.createMBean(child, registry); if (mBean != null) { - addMBean(child, mBean); + registerMBean(child, provider, mBean); } } @@ -383,9 +303,170 @@ public class JMXManagementPluginImpl } } - @Override - public boolean getUsePlatformMBeanServer() + private ManagedObject createObjectMBeansIfNecessary(ConfiguredObject<?> object) throws JMException { - return _usePlatformMBeanServer; + ManagedObject mbean = null; + if (supportedConfiguredObject(object)) + { + synchronized (_childrenLock) + { + if (!providerMBeanExists(object, _pluginMBeanProvider)) + { + if (object instanceof VirtualHostNode) + { + object.addChangeListener(_changeListener); + VirtualHostNode<?> virtualHostNode = (VirtualHostNode<?>) object; + + // Virtual host nodes may or may not have a virtual host at this point. + // In the HA case, JE may spontaneously make the node a master causing it to create a virtual host. + // Creation of the virtual host uses the task executor (same thread that executes this code + // so there is no potential for a race here). + VirtualHost<?, ?, ?> host = virtualHostNode.getVirtualHost(); + if (host != null) + { + createVirtualHostMBeanIfNecessary(host, _objectRegistry); + } + } + else if (object instanceof VirtualHost) + { + mbean = createVirtualHostMBeanIfNecessary((VirtualHost<?, ?, ?>) object, _objectRegistry); + } + else if (object instanceof PasswordCredentialManagingAuthenticationProvider) + { + object.addChangeListener(_changeListener); + mbean = new UserManagementMBean((PasswordCredentialManagingAuthenticationProvider<?>) object, _objectRegistry); + registerMBean(object, _pluginMBeanProvider, mbean); + } + createAdditionalMBeansFromProvidersIfNecessary(object, _objectRegistry); + } + } + } + return mbean; } + + private VirtualHostMBean createVirtualHostMBeanIfNecessary(VirtualHost<?, ?, ?> host, ManagedObjectRegistry _objectRegistry) throws JMException + { + if (!providerMBeanExists(host, _pluginMBeanProvider)) + { + VirtualHostMBean mbean = new VirtualHostMBean(host, _objectRegistry); + registerMBean(host, _pluginMBeanProvider, mbean); + host.addChangeListener(_changeListener); + return mbean; + } + return null; + } + + private void registerMBean(ConfiguredObject<?> configuredObject, MBeanProvider mBeanProvider, ManagedObject mbean) + { + Map<MBeanProvider, ManagedObject> mbeans = _children.get(configuredObject); + if (mbeans == null) + { + mbeans = new HashMap<MBeanProvider, ManagedObject>(); + _children.put(configuredObject, mbeans); + } + mbeans.put(mBeanProvider, mbean); + } + + private boolean providerMBeanExists(ConfiguredObject<?> configuredObject, MBeanProvider mBeanProvider) + { + Map<MBeanProvider, ManagedObject> mbeans = _children.get(configuredObject); + if (mbeans == null) + { + return false; + } + return mbeans.containsKey(mBeanProvider); + } + + private void destroyObjectMBeans(ConfiguredObject<?> child, boolean removeListener) + { + if (supportedConfiguredObject(child)) + { + synchronized (_childrenLock) + { + if (removeListener) + { + child.removeChangeListener(_changeListener); + } + unregisterObjectMBeans(child); + _children.remove(child); + } + } + } + + private void createObjectMBeans(ConfiguredObject<?> object) + { + try + { + createObjectMBeansIfNecessary(object); + } + catch (JMException e) + { + LOGGER.error("Cannot create MBean for " + object, e); + } + } + + private boolean supportedConfiguredObject(ConfiguredObject<?> object) + { + return object instanceof VirtualHostNode || object instanceof VirtualHost || object instanceof PasswordCredentialManagingAuthenticationProvider; + } + + private class PluginMBeansProvider implements MBeanProvider + { + @Override + public boolean isChildManageableByMBean(ConfiguredObject<?> object) + { + return supportedConfiguredObject(object); + } + + @Override + public ManagedObject createMBean(ConfiguredObject<?> object, ManagedObjectRegistry registry) throws JMException + { + return createObjectMBeansIfNecessary(object); + } + + @Override + public String getType() + { + return "INTERNAL"; + } + } + + private class ChangeListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(ConfiguredObject<?> object, State oldState, State newState) + { + if (newState == State.DELETED || newState == State.STOPPED) + { + destroyObjectMBeans(object, newState == State.DELETED); + } + else if (newState == State.ACTIVE) + { + createObjectMBeans(object); + } + } + + @Override + public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) + { + createObjectMBeans(child); + } + + @Override + public void childRemoved(ConfiguredObject<?> object, ConfiguredObject<?> child) + { + destroyObjectMBeans(child, true); + } + + @Override + public void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue, Object newAttributeValue) + { + // VH can be created after attribute change, + // for instance, on role change in BDB HA VHN a VH could is recovered/created. + // A call to createObjectMBeans is safe as it checks the existence of MBean before its creation. + + createObjectMBeans(object); + } + } + } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java index 8817c16f83..16ecde5ad4 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java @@ -34,11 +34,12 @@ import org.apache.qpid.server.plugin.QpidServiceLoader; */ public interface MBeanProvider extends Pluggable { + /** * Tests whether a <code>child</code> can be managed by the mbean * provided by this provider. */ - boolean isChildManageableByMBean(ConfiguredObject child); + boolean isChildManageableByMBean(ConfiguredObject<?> child); /** * Creates a mbean for this child. This method should only be called if @@ -47,6 +48,6 @@ public interface MBeanProvider extends Pluggable * * @return newly created mbean */ - ManagedObject createMBean(ConfiguredObject child, ManagedObjectRegistry registry) throws JMException; + ManagedObject createMBean(ConfiguredObject<?> child, ManagedObjectRegistry registry) throws JMException; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 073fae3a7a..1aee32db1b 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -1411,4 +1411,23 @@ public class QpidBrokerTestCase extends QpidTestCase return FAILING_PORT; } + public int getHttpManagementPort(int mainPort) + { + return mainPort + (DEFAULT_HTTP_MANAGEMENT_PORT - DEFAULT_PORT); + } + + public void assertProducingConsuming(final Connection connection) throws Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 1); + session.commit(); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + session.commit(); + session.close(); + } } |
