summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-05-15 16:14:41 +0000
committerAlex Rudyy <orudyy@apache.org>2014-05-15 16:14:41 +0000
commit081bc26aad845b3b8c4190afb7014206b74e4335 (patch)
tree0b624cacf11597497898529813fce5c95c66f87e /qpid/java
parent2c802479f99ff5eeeb0958b0869c700f55c0e889 (diff)
downloadqpid-python-081bc26aad845b3b8c4190afb7014206b74e4335.tar.gz
QPID-5715: Fix Virtual Host MBean creation in JMX management plugin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1594963 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java3
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java128
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java72
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java13
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java16
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java107
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java8
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java357
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java5
-rwxr-xr-xqpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java19
10 files changed, 551 insertions, 177 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 4aff32fe13..9602c2c29c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -439,6 +439,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
if (virtualHost!= null)
{
virtualHost.close();
+ childRemoved(virtualHost);
}
}
@@ -473,6 +474,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
LOGGER.error("Unexpected state change: " + state);
throw new IllegalStateException("Unexpected state change: " + state);
}
+
+ attributeSet(ROLE, _role, state.name());
}
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
index 0de61d68cd..faeb068f4d 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -20,6 +20,10 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -30,6 +34,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestUtils;
@@ -115,6 +120,129 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
+ public void testTransferMasterFromLocalNode() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection);
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+ Map<String, Object> attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+ _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+
+ attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ assertProducingConsuming(connection);
+
+ _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ }
+
+ public void testTransferMasterFromRemoteNode() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection);
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+ _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
+ Map<String, Object> attributes = _clusterCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ _clusterCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+
+ attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ assertProducingConsuming(connection);
+
+ _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ }
+
+ public void testQuorumOverride() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes();
+ Iterator<Integer> iterator = ports.iterator();
+ Integer quorumOverridePort = iterator.next();
+ iterator.remove();
+
+ for (Integer p : ports)
+ {
+ _clusterCreator.stopNode(p);
+ }
+
+ Map<String, Object> attributes = _clusterCreator.getNodeAttributes(quorumOverridePort);
+ assertEquals("Broker has unexpeced quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
+ _clusterCreator.setNodeAttributes(quorumOverridePort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
+
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+
+ attributes = _clusterCreator.getNodeAttributes(quorumOverridePort);
+ assertEquals("Broker has unexpeced quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
+
+ assertProducingConsuming(connection);
+ }
+
+ public void testPriority() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ int priority = 1;
+ Integer highestPriorityBrokerPort = null;
+ Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes();
+ for (Integer port : ports)
+ {
+ if (activeBrokerPort != port.intValue())
+ {
+ priority = priority + 1;
+ highestPriorityBrokerPort = port;
+ _clusterCreator.setNodeAttributes(port, port, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, priority));
+ Map<String, Object> attributes = _clusterCreator.getNodeAttributes(port, port);
+ assertEquals("Broker has unexpeced priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY));
+ }
+ }
+
+ LOGGER.info("Broker on port " + highestPriorityBrokerPort + " has the highest priority of " + priority);
+
+ LOGGER.info("Shutting down the MASTER");
+ _clusterCreator.stopNode(activeBrokerPort);
+
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+
+ Map<String, Object> attributes = _clusterCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort);
+ assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ assertProducingConsuming(connection);
+ }
+
private final class FailoverAwaitingListener implements ConnectionListener
{
private final CountDownLatch _failoverLatch = new CountDownLatch(1);
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index 200f2c1087..c184ef8f8d 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -25,8 +25,11 @@ import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.jms.Connection;
@@ -37,11 +40,14 @@ import javax.management.openmbean.TabularData;
import org.apache.log4j.Logger;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import com.sleepycat.je.EnvironmentFailureException;
+import org.junit.Assert;
/**
* System test verifying the ability to control a cluster via the Management API.
@@ -214,6 +220,68 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary());
}
+ public void testVirtualHostMbeanOnMasterTransfer() throws Exception
+ {
+ Connection connection = getConnection(_brokerFailoverUrl);
+ int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+ connection.close();
+
+ Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes();
+ ports.remove(activeBrokerPort);
+
+ int inactiveBrokerPort = ports.iterator().next();
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+ ManagedBroker inactiveVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort);
+
+ try
+ {
+ inactiveVirtualHostMBean.createNewQueue(getTestQueueName(), null, true);
+ fail("Exception not thrown");
+ }
+ catch (Exception e)
+ {
+ String message = e.getMessage();
+ assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message);
+ }
+
+ Map<String, Object> attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+ _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+
+ _clusterCreator.awaitNodeToAttainRole(inactiveBrokerPort, "MASTER");
+
+ awaitVirtualHostAtNode(inactiveBrokerPort);
+
+ ManagedBroker activeVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort);
+ activeVirtualHostMBean.createNewQueue(getTestQueueName() + inactiveBrokerPort, null, true);
+ }
+
+ public void awaitVirtualHostAtNode(int brokerPort) throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ Map<String, Object> data = Collections.emptyMap();
+ String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort);
+ RestTestHelper restHelper = _clusterCreator.createRestTestHelper(brokerPort);
+ while(!State.ACTIVE.name().equals(data.get(VirtualHost.STATE)) && (System.currentTimeMillis() - startTime) < 30000)
+ {
+ LOGGER.debug("Awaiting virtual host '" + nodeName + "' to transit into active state");
+ List<Map<String, Object>> results= restHelper.getJsonAsList("virtualhost/" + nodeName + "/" + VIRTUAL_HOST);
+ if (results.size()== 1)
+ {
+ data = results.get(0);
+ }
+
+ if (!State.ACTIVE.name().equals(data.get(VirtualHost.STATE)))
+ {
+ Thread.sleep(1000);
+ }
+ }
+ Assert.assertEquals("Virtual host is not active", State.ACTIVE.name(), data.get(VirtualHost.STATE));
+ LOGGER.debug("Virtual host '" + nodeName + "' is in active state");
+ }
+
private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
{
_jmxUtils.open(brokerPortNumber);
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
index b414905ddc..24296246b8 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -196,17 +196,4 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
return storeBean;
}
- private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
- {
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createQueue(getTestQueueName());
- MessageConsumer consumer = session.createConsumer(destination);
- sendMessage(session, destination, 1);
- connection.start();
- Message m1 = consumer.receive(RECEIVE_TIMEOUT);
- assertNotNull("Message 1 is not received", m1);
- assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
- session.commit();
- }
-
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
index 408643b98a..ef5cc7c464 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
@@ -235,22 +235,6 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
killBroker(initialPortNumber); // kill awaits the death of the child
}
- private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
- {
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createQueue(getTestQueueName());
- MessageConsumer consumer = session.createConsumer(destination);
- sendMessage(session, destination, 2);
- connection.start();
- Message m1 = consumer.receive(RECEIVE_TIMEOUT);
- assertNotNull("Message 1 is not received", m1);
- assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
- Message m2 = consumer.receive(RECEIVE_TIMEOUT);
- assertNotNull("Message 2 is not received", m2);
- assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX));
- session.commit();
- }
-
private void closeConnection(final Connection initialConnection)
{
try
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 96a75db2cc..48e15561d0 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -20,8 +20,10 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -42,11 +44,17 @@ import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.server.management.plugin.HttpManagement;
+import org.apache.qpid.server.model.Plugin;
+import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.url.URLSyntaxException;
+import org.junit.Assert;
import com.sleepycat.je.rep.ReplicationConfig;
@@ -119,6 +127,9 @@ public class HATestClusterCreator
TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
brokerConfiguration.addJmxManagementConfiguration();
+ brokerConfiguration.addHttpManagementConfiguration();
+ brokerConfiguration.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true);
+ brokerConfiguration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, _testcase.getHttpManagementPort(brokerPort));
brokerConfiguration.setObjectAttributes(VirtualHostNode.class, _virtualHostName, virtualHostNodeAttributes);
brokerPort = _testcase.getNextAvailable(bdbPort + 1);
@@ -273,8 +284,7 @@ public class HATestClusterCreator
return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT));
}
- public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws
- URLSyntaxException
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException
{
return getConnectionUrlForSingleNode(brokerPortNumber, false);
}
@@ -380,4 +390,97 @@ public class HATestClusterCreator
config.setSaved(false);
}
+ public String getNodeNameForBrokerPort(final int brokerPort)
+ {
+ return getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPort));
+ }
+
+ public void setNodeAttributes(int brokerPort, Map<String, Object> attributeMap)
+ throws Exception
+ {
+ setNodeAttributes(brokerPort, brokerPort, attributeMap);
+ }
+
+ public void setNodeAttributes(int localNodePort, int remoteNodePort, Map<String, Object> attributeMap)
+ throws Exception
+ {
+ RestTestHelper restHelper = createRestTestHelper(localNodePort);
+ String url = getNodeRestUrl(localNodePort, remoteNodePort);
+ int status = restHelper.submitRequest(url, "PUT", attributeMap);
+ if (status != 200)
+ {
+ throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute's : " + status);
+ }
+ }
+
+ private String getNodeRestUrl(int localNodePort, int remoteNodePort)
+ {
+ String remoteNodeName = getNodeNameForBrokerPort(remoteNodePort);
+ String localNodeName = getNodeNameForBrokerPort(localNodePort);
+ String url = null;
+ if (localNodePort == remoteNodePort)
+ {
+ url = "/api/latest/virtualhostnode/" + localNodeName;
+ }
+ else
+ {
+ url = "/api/latest/replicationnode/" + localNodeName + "/" + remoteNodeName;
+ }
+ return url;
+ }
+
+ public Map<String, Object> getNodeAttributes(int brokerPort) throws IOException
+ {
+ return getNodeAttributes(brokerPort, brokerPort);
+ }
+
+ public Map<String, Object> getNodeAttributes(int localNodePort, int remoteNodePort) throws IOException
+ {
+ RestTestHelper restHelper = createRestTestHelper(localNodePort);
+ List<Map<String, Object>> results= restHelper.getJsonAsList(getNodeRestUrl(localNodePort, remoteNodePort));
+ int size = results.size();
+ if (size == 0)
+ {
+ return Collections.emptyMap();
+ }
+ else if (size == 1)
+ {
+ return results.get(0);
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected number of nodes " + size);
+ }
+ }
+
+ public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception
+ {
+ awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole);
+ }
+
+ public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ Map<String, Object> data = Collections.emptyMap();
+
+ while(!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000)
+ {
+ LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredRole + " role");
+ data = getNodeAttributes(localNodePort, remoteNodePort);
+ if (!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)))
+ {
+ Thread.sleep(1000);
+ }
+ }
+ LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' role is " + data.get(BDBHARemoteReplicationNode.ROLE));
+ Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(BDBHARemoteReplicationNode.ROLE));
+ }
+
+ public RestTestHelper createRestTestHelper(int brokerPort)
+ {
+ int httpPort = _testcase.getHttpManagementPort(brokerPort);
+ RestTestHelper helper = new RestTestHelper(httpPort);
+ helper.setUsernameAndPassword("webadmin", "webadmin");
+ return helper;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
index d20c709e90..a381c72f99 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
@@ -29,11 +29,11 @@ public interface ConfigurationChangeListener
* @param oldState the state prior to the change
* @param newState the state after the change
*/
- void stateChanged(ConfiguredObject object, State oldState, State newState);
+ void stateChanged(ConfiguredObject<?> object, State oldState, State newState);
- void childAdded(ConfiguredObject object, ConfiguredObject child);
+ void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child);
- void childRemoved(ConfiguredObject object, ConfiguredObject child);
+ void childRemoved(ConfiguredObject<?> object, ConfiguredObject<?> child);
- void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue);
+ void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue, Object newAttributeValue);
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
index 4b4b78decc..74153b8672 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
@@ -22,11 +22,9 @@
package org.apache.qpid.server.jmx;
import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -60,9 +58,7 @@ import org.apache.qpid.server.model.port.RmiPort;
import org.apache.qpid.server.plugin.QpidServiceLoader;
public class JMXManagementPluginImpl
- extends AbstractPluginAdapter<JMXManagementPluginImpl> implements ConfigurationChangeListener,
- JMXManagementPlugin<JMXManagementPluginImpl>,
- PortManager
+ extends AbstractPluginAdapter<JMXManagementPluginImpl> implements JMXManagementPlugin<JMXManagementPluginImpl>, PortManager
{
private static final Logger LOGGER = Logger.getLogger(JMXManagementPluginImpl.class);
@@ -71,28 +67,38 @@ public class JMXManagementPluginImpl
// default values
public static final String DEFAULT_NAME = "JMXManagement";
- @SuppressWarnings("serial")
- private static final Map<String, Type> ATTRIBUTE_TYPES = new HashMap<String, Type>(){{
- put(USE_PLATFORM_MBEAN_SERVER, Boolean.class);
- put(NAME, String.class);
- put(TYPE, String.class);
- put(TYPE, String.class);
- }};
-
private JMXManagedObjectRegistry _objectRegistry;
private final Object _childrenLock = new Object();
- private final Map<ConfiguredObject, List<ManagedObject>> _children = new HashMap<ConfiguredObject, List<ManagedObject>>();
+ private final Map<ConfiguredObject<?>, Map<MBeanProvider, ManagedObject>> _children = new HashMap<ConfiguredObject<?>, Map<MBeanProvider,ManagedObject>>();
@ManagedAttributeField
private boolean _usePlatformMBeanServer;
private boolean _allowPortActivation;
+ private final Set<MBeanProvider> _mBeanProviders;
+ private final ChangeListener _changeListener;
+ private final PluginMBeansProvider _pluginMBeanProvider;
+
@ManagedObjectFactoryConstructor
- public JMXManagementPluginImpl(Map<String, Object> attributes, Broker broker)
+ public JMXManagementPluginImpl(Map<String, Object> attributes, Broker<?> broker)
{
super(attributes, broker);
+ _changeListener = new ChangeListener();
+ _pluginMBeanProvider = new PluginMBeansProvider();
+ _mBeanProviders = new HashSet<MBeanProvider>();
+ QpidServiceLoader<MBeanProvider> qpidServiceLoader = new QpidServiceLoader<MBeanProvider>();
+ for (MBeanProvider provider : qpidServiceLoader.instancesOf(MBeanProvider.class))
+ {
+ _mBeanProviders.add(provider);
+ }
+ }
+
+ @Override
+ public boolean getUsePlatformMBeanServer()
+ {
+ return _usePlatformMBeanServer;
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
@@ -103,7 +109,7 @@ public class JMXManagementPluginImpl
JmxPort<?> connectorPort = null;
RmiPort registryPort = null;
Collection<Port<?>> ports = broker.getPorts();
- for (Port port : ports)
+ for (Port<?> port : ports)
{
if (port.getDesiredState() != State.ACTIVE)
{
@@ -142,38 +148,19 @@ public class JMXManagementPluginImpl
_objectRegistry = new JMXManagedObjectRegistry(broker, connectorPort, registryPort, this);
- broker.addChangeListener(this);
+ broker.addChangeListener(_changeListener);
synchronized (_childrenLock)
{
for(VirtualHostNode<?> virtualHostNode : broker.getVirtualHostNodes())
{
- virtualHostNode.addChangeListener(this);
-
- // Virtualhostnodes may or may not have a virtualhost at this point. In the HA
- // case, JE may spontaneously make the node a master causing it to create a virtualhost.
- // Creation of the vhost uses the task executor (same thread that executes this code
- // so there is no potential for a race here).
- VirtualHost host = virtualHostNode.getVirtualHost();
- if (host != null)
- {
- VirtualHostMBean mbean = new VirtualHostMBean(host, _objectRegistry);
- addMBean(host, mbean);
- }
- createAdditionalMBeansFromProviders(virtualHostNode, _objectRegistry);
+ createObjectMBeans(virtualHostNode);
}
Collection<AuthenticationProvider<?>> authenticationProviders = broker.getAuthenticationProviders();
for (AuthenticationProvider<?> authenticationProvider : authenticationProviders)
{
- if(authenticationProvider instanceof PasswordCredentialManagingAuthenticationProvider)
- {
- UserManagementMBean mbean = new UserManagementMBean(
- (PasswordCredentialManagingAuthenticationProvider) authenticationProvider,
- _objectRegistry);
- addMBean(authenticationProvider, mbean);
- }
- createAdditionalMBeansFromProviders(authenticationProvider, _objectRegistry);
+ createObjectMBeans(authenticationProvider);
}
}
new Shutdown(_objectRegistry);
@@ -187,36 +174,18 @@ public class JMXManagementPluginImpl
_allowPortActivation = false;
}
- private void addMBean(ConfiguredObject configuredObject, ManagedObject mbean)
- {
- List<ManagedObject> mbeanList = _children.get(configuredObject);
- if (mbeanList == null)
- {
- mbeanList = new ArrayList<ManagedObject>();
- _children.put(configuredObject, mbeanList);
- }
- mbeanList.add(mbean);
- }
-
@Override
public boolean isActivationAllowed(final Port<?> port)
{
return _allowPortActivation;
}
- @Override
- protected void onOpen()
- {
- super.onOpen();
-
- }
-
- private boolean isConnectorPort(Port port)
+ private boolean isConnectorPort(Port<?> port)
{
return port.getAvailableProtocols().contains(Protocol.JMX_RMI);
}
- private boolean isRegistryPort(Port port)
+ private boolean isRegistryPort(Port<?> port)
{
return port.getAvailableProtocols().contains(Protocol.RMI);
}
@@ -226,27 +195,42 @@ public class JMXManagementPluginImpl
{
synchronized (_childrenLock)
{
- for(ConfiguredObject object : _children.keySet())
+ for(ConfiguredObject<?> object : _children.keySet())
{
- unregisterChildMBeans(object);
+ unregisterObjectMBeans(object);
}
_children.clear();
}
- getBroker().removeChangeListener(this);
+ getBroker().removeChangeListener(_changeListener);
closeObjectRegistry();
}
- private void unregisterChildMBeans(ConfiguredObject object)
+ private void unregisterObjectMBeans(ConfiguredObject<?> object)
{
- List<ManagedObject> mbeans = _children.get(object);
+ Map<?, ManagedObject> mbeans = _children.get(object);
if (mbeans != null)
{
- for (ManagedObject mbean : mbeans)
+ for (ManagedObject mbean : mbeans.values())
{
if (mbean instanceof ConfigurationChangeListener)
{
object.removeChangeListener((ConfigurationChangeListener)mbean);
}
+
+ if (LOGGER.isDebugEnabled())
+ {
+ String mbeanName = null;
+ try
+ {
+ mbeanName = mbean.getObjectName().toString();
+ }
+ catch(Exception e)
+ {
+ // ignore
+ }
+ LOGGER.debug("Unregistering MBean " + mbeanName + " for configured object " + object);
+ }
+
try
{
mbean.unregister();
@@ -259,74 +243,9 @@ public class JMXManagementPluginImpl
}
}
- @Override
- public void stateChanged(ConfiguredObject object, State oldState, State newState)
+ private void createAdditionalMBeansFromProvidersIfNecessary(ConfiguredObject<?> child, ManagedObjectRegistry registry) throws JMException
{
- // no-op
- }
-
- @Override
- public void childAdded(ConfiguredObject object, ConfiguredObject child)
- {
-
- synchronized (_childrenLock)
- {
- try
- {
- AMQManagedObject mbean;
- if (child instanceof VirtualHostNode)
- {
- child.addChangeListener(this);
- }
- if(child instanceof VirtualHost)
- {
- VirtualHost vhostChild = (VirtualHost)child;
- mbean = new VirtualHostMBean(vhostChild, _objectRegistry);
- }
- else if(child instanceof PasswordCredentialManagingAuthenticationProvider)
- {
- mbean = new UserManagementMBean((PasswordCredentialManagingAuthenticationProvider) child, _objectRegistry);
- }
- else
- {
- mbean = null;
- }
-
- if (mbean != null)
- {
- addMBean(child, mbean);
- }
- createAdditionalMBeansFromProviders(child, _objectRegistry);
- }
- catch(Exception e)
- {
- LOGGER.error("Exception while creating mbean for " + child.getClass().getSimpleName() + " " + child.getName(), e);
- // TODO - Implement error reporting on mbean creation
- }
- }
- }
-
- @Override
- public void childRemoved(ConfiguredObject object, ConfiguredObject child)
- {
- synchronized (_childrenLock)
- {
- child.removeChangeListener(this);
- unregisterChildMBeans(child);
- _children.remove(child);
- }
- }
-
- @Override
- public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
- {
- // no-op
- }
-
- private void createAdditionalMBeansFromProviders(ConfiguredObject child, ManagedObjectRegistry registry) throws JMException
- {
- QpidServiceLoader<MBeanProvider> qpidServiceLoader = new QpidServiceLoader<MBeanProvider>();
- for (MBeanProvider provider : qpidServiceLoader.instancesOf(MBeanProvider.class))
+ for (MBeanProvider provider : _mBeanProviders)
{
if(LOGGER.isDebugEnabled())
{
@@ -334,16 +253,17 @@ public class JMXManagementPluginImpl
}
ManagedObject mBean = null;
- if (provider.isChildManageableByMBean(child))
+ if (provider.isChildManageableByMBean(child) && !providerMBeanExists(child, provider))
{
if(LOGGER.isDebugEnabled())
{
- LOGGER.debug("Provider will create mbean");
+ LOGGER.debug("Provider of type " + provider.getType() + " will create mbean for " + child);
}
+
mBean = provider.createMBean(child, registry);
if (mBean != null)
{
- addMBean(child, mBean);
+ registerMBean(child, provider, mBean);
}
}
@@ -383,9 +303,170 @@ public class JMXManagementPluginImpl
}
}
- @Override
- public boolean getUsePlatformMBeanServer()
+ private ManagedObject createObjectMBeansIfNecessary(ConfiguredObject<?> object) throws JMException
{
- return _usePlatformMBeanServer;
+ ManagedObject mbean = null;
+ if (supportedConfiguredObject(object))
+ {
+ synchronized (_childrenLock)
+ {
+ if (!providerMBeanExists(object, _pluginMBeanProvider))
+ {
+ if (object instanceof VirtualHostNode)
+ {
+ object.addChangeListener(_changeListener);
+ VirtualHostNode<?> virtualHostNode = (VirtualHostNode<?>) object;
+
+ // Virtual host nodes may or may not have a virtual host at this point.
+ // In the HA case, JE may spontaneously make the node a master causing it to create a virtual host.
+ // Creation of the virtual host uses the task executor (same thread that executes this code
+ // so there is no potential for a race here).
+ VirtualHost<?, ?, ?> host = virtualHostNode.getVirtualHost();
+ if (host != null)
+ {
+ createVirtualHostMBeanIfNecessary(host, _objectRegistry);
+ }
+ }
+ else if (object instanceof VirtualHost)
+ {
+ mbean = createVirtualHostMBeanIfNecessary((VirtualHost<?, ?, ?>) object, _objectRegistry);
+ }
+ else if (object instanceof PasswordCredentialManagingAuthenticationProvider)
+ {
+ object.addChangeListener(_changeListener);
+ mbean = new UserManagementMBean((PasswordCredentialManagingAuthenticationProvider<?>) object, _objectRegistry);
+ registerMBean(object, _pluginMBeanProvider, mbean);
+ }
+ createAdditionalMBeansFromProvidersIfNecessary(object, _objectRegistry);
+ }
+ }
+ }
+ return mbean;
}
+
+ private VirtualHostMBean createVirtualHostMBeanIfNecessary(VirtualHost<?, ?, ?> host, ManagedObjectRegistry _objectRegistry) throws JMException
+ {
+ if (!providerMBeanExists(host, _pluginMBeanProvider))
+ {
+ VirtualHostMBean mbean = new VirtualHostMBean(host, _objectRegistry);
+ registerMBean(host, _pluginMBeanProvider, mbean);
+ host.addChangeListener(_changeListener);
+ return mbean;
+ }
+ return null;
+ }
+
+ private void registerMBean(ConfiguredObject<?> configuredObject, MBeanProvider mBeanProvider, ManagedObject mbean)
+ {
+ Map<MBeanProvider, ManagedObject> mbeans = _children.get(configuredObject);
+ if (mbeans == null)
+ {
+ mbeans = new HashMap<MBeanProvider, ManagedObject>();
+ _children.put(configuredObject, mbeans);
+ }
+ mbeans.put(mBeanProvider, mbean);
+ }
+
+ private boolean providerMBeanExists(ConfiguredObject<?> configuredObject, MBeanProvider mBeanProvider)
+ {
+ Map<MBeanProvider, ManagedObject> mbeans = _children.get(configuredObject);
+ if (mbeans == null)
+ {
+ return false;
+ }
+ return mbeans.containsKey(mBeanProvider);
+ }
+
+ private void destroyObjectMBeans(ConfiguredObject<?> child, boolean removeListener)
+ {
+ if (supportedConfiguredObject(child))
+ {
+ synchronized (_childrenLock)
+ {
+ if (removeListener)
+ {
+ child.removeChangeListener(_changeListener);
+ }
+ unregisterObjectMBeans(child);
+ _children.remove(child);
+ }
+ }
+ }
+
+ private void createObjectMBeans(ConfiguredObject<?> object)
+ {
+ try
+ {
+ createObjectMBeansIfNecessary(object);
+ }
+ catch (JMException e)
+ {
+ LOGGER.error("Cannot create MBean for " + object, e);
+ }
+ }
+
+ private boolean supportedConfiguredObject(ConfiguredObject<?> object)
+ {
+ return object instanceof VirtualHostNode || object instanceof VirtualHost || object instanceof PasswordCredentialManagingAuthenticationProvider;
+ }
+
+ private class PluginMBeansProvider implements MBeanProvider
+ {
+ @Override
+ public boolean isChildManageableByMBean(ConfiguredObject<?> object)
+ {
+ return supportedConfiguredObject(object);
+ }
+
+ @Override
+ public ManagedObject createMBean(ConfiguredObject<?> object, ManagedObjectRegistry registry) throws JMException
+ {
+ return createObjectMBeansIfNecessary(object);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "INTERNAL";
+ }
+ }
+
+ private class ChangeListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)
+ {
+ if (newState == State.DELETED || newState == State.STOPPED)
+ {
+ destroyObjectMBeans(object, newState == State.DELETED);
+ }
+ else if (newState == State.ACTIVE)
+ {
+ createObjectMBeans(object);
+ }
+ }
+
+ @Override
+ public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
+ {
+ createObjectMBeans(child);
+ }
+
+ @Override
+ public void childRemoved(ConfiguredObject<?> object, ConfiguredObject<?> child)
+ {
+ destroyObjectMBeans(child, true);
+ }
+
+ @Override
+ public void attributeSet(ConfiguredObject<?> object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ // VH can be created after attribute change,
+ // for instance, on role change in BDB HA VHN a VH could is recovered/created.
+ // A call to createObjectMBeans is safe as it checks the existence of MBean before its creation.
+
+ createObjectMBeans(object);
+ }
+ }
+
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java
index 8817c16f83..16ecde5ad4 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java
@@ -34,11 +34,12 @@ import org.apache.qpid.server.plugin.QpidServiceLoader;
*/
public interface MBeanProvider extends Pluggable
{
+
/**
* Tests whether a <code>child</code> can be managed by the mbean
* provided by this provider.
*/
- boolean isChildManageableByMBean(ConfiguredObject child);
+ boolean isChildManageableByMBean(ConfiguredObject<?> child);
/**
* Creates a mbean for this child. This method should only be called if
@@ -47,6 +48,6 @@ public interface MBeanProvider extends Pluggable
*
* @return newly created mbean
*/
- ManagedObject createMBean(ConfiguredObject child, ManagedObjectRegistry registry) throws JMException;
+ ManagedObject createMBean(ConfiguredObject<?> child, ManagedObjectRegistry registry) throws JMException;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 073fae3a7a..1aee32db1b 100755
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -1411,4 +1411,23 @@ public class QpidBrokerTestCase extends QpidTestCase
return FAILING_PORT;
}
+ public int getHttpManagementPort(int mainPort)
+ {
+ return mainPort + (DEFAULT_HTTP_MANAGEMENT_PORT - DEFAULT_PORT);
+ }
+
+ public void assertProducingConsuming(final Connection connection) throws Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 1);
+ session.commit();
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ session.commit();
+ session.close();
+ }
}