diff options
| author | Keith Wall <kwall@apache.org> | 2014-04-29 08:29:28 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-04-29 08:29:28 +0000 |
| commit | 5953ca009492eb4d40a63960ea1d8f1854351548 (patch) | |
| tree | 4076c42aeeed98e4bdd668fb941b8360596a8aeb /qpid/java | |
| parent | 1eecf05ef31eebe90631208ba1bf005167b9f234 (diff) | |
| download | qpid-python-5953ca009492eb4d40a63960ea1d8f1854351548.tar.gz | |
QPID-5715: [Java Broker]: Wire up the BDB HA VirtualHostNode to the ReplicatedEnvironmentFacade.
* Attributes priority, quorumOverride, designatedPrimary are exposed as read/write attributes.
* Attribute role is readable (to observe the current role of the node), and writable, to request a change in mastership.
* Attributes joinTime and lastKnownReplicationTransactionId are exposed as derived attributes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1590917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
5 files changed, 595 insertions, 38 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 0b00800b04..0f839ea02d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -81,14 +81,17 @@ import com.sleepycat.je.utilint.VLSN; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { + public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval"; public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval"; private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); + private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60; private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000; private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000; + private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); @@ -145,14 +148,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>(); private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); + private final AtomicBoolean _initialised; + private final EnvironmentFacadeTask[] _initialisationTasks; private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; - private AtomicBoolean _initialised; - private EnvironmentFacadeTask[] _initialisationTasks; - public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks) { _environmentDirectory = new File(configuration.getStorePath()); @@ -214,8 +216,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan shutdownAndAwaitExecutorService(_environmentJobExecutor); shutdownAndAwaitExecutorService(_groupChangeExecutor); - closeDatabases(); - closeEnvironment(); + try + { + closeDatabases(); + } + finally + { + closeEnvironment(); + } } finally { @@ -634,10 +642,52 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + public Future<Void> transferMasterToSelfAsynchronously() + { + final String nodeName = getNodeName(); + return transferMasterAsynchronously(nodeName); + } + + public Future<Void> transferMasterAsynchronously(final String nodeName) + { + return _groupChangeExecutor.submit(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + ReplicationGroupAdmin admin = createReplicationGroupAdmin(); + String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("The mastership has been transfered to " + newMaster); + } + } + catch (DatabaseException e) + { + LOGGER.warn("Exception on transfering the mastership to " + _prettyGroupNodeName + + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e); + throw e; + } + return null; + } + }); + } + + public void removeNodeFromGroup(final String nodeName) + { + createReplicationGroupAdmin().removeMember(nodeName); + } + + public void updateAddress(final String nodeName, final String newHostName, final int newPort) + { + createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); + } public long getJoinTime() { - return _joinTime ; + return _joinTime; } public long getLastKnownReplicationTransactionId() @@ -669,16 +719,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return members; } - public void removeNodeFromGroup(final String nodeName) - { - createReplicationGroupAdmin().removeMember(nodeName); - } - - public void updateAddress(final String nodeName, final String newHostName, final int newPort) - { - createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); - } - private ReplicationGroupAdmin createReplicationGroupAdmin() { final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); @@ -690,7 +730,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers); } - public ReplicatedEnvironment getEnvironment() { return _environment; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java index f0325b24f8..0e92ac83de 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java @@ -34,7 +34,10 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends public static final String DESIGNATED_PRIMARY = "designatedPrimary"; public static final String PRIORITY = "priority"; public static final String QUORUM_OVERRIDE = "quorumOverride"; + public static final String ROLE = "role"; public static final String REPLICATED_ENVIRONMENT_CONFIGURATION = "replicatedEnvironmentConfiguration"; + public static final String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId"; + public static final String JOIN_TIME = "joinTime"; @ManagedAttribute(automate = true, mandatory=true) String getGroupName(); @@ -61,5 +64,14 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends int getQuorumOverride(); @ManagedAttribute(automate = true) + String getRole(); + + @ManagedAttribute(automate = true) Map<String, String> getReplicatedEnvironmentConfiguration(); + + @ManagedAttribute(derived = true) + Long getLastKnownReplicationTransactionId(); + + @ManagedAttribute(derived = true) + Long getJoinTime(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 8b4948da08..8b2dce4168 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -23,13 +23,16 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; import java.security.PrivilegedAction; import java.util.HashMap; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; import org.apache.log4j.Logger; - +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -42,20 +45,34 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer; import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostState; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + @ManagedObject( category = false, type = "BDB_HA" ) public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl> { + /** + * Length of time we synchronously await the a JE mutation to complete. It is not considered an error if we exceed this timeout, although a + * a warning will be logged. + */ + private static final int MUTATE_JE_TIMEOUT_MS = 100; + private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHostNodeImpl.class); + private final AtomicReference<ReplicatedEnvironmentFacade> _environmentFacade = new AtomicReference<>(); + @ManagedAttributeField private Map<String, String> _environmentConfiguration; @@ -77,18 +94,22 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @ManagedAttributeField private boolean _coalescingSync; - @ManagedAttributeField + @ManagedAttributeField(afterSet="postSetDesignatedPrimary") private boolean _designatedPrimary; - @ManagedAttributeField + @ManagedAttributeField(afterSet="postSetPriority") private int _priority; - @ManagedAttributeField + @ManagedAttributeField(afterSet="postSetQuorumOverride") private int _quorumOverride; + @ManagedAttributeField(beforeSet="preSetRole", afterSet="postSetRole") + private String _role; + @ManagedAttributeField private Map<String, String> _replicatedEnvironmentConfiguration; + @ManagedObjectFactoryConstructor public BDBHAVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> broker) { @@ -162,6 +183,39 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override + public String getRole() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + return environmentFacade.getNodeState(); + } + return "UNKNOWN"; + } + + @Override + public Long getLastKnownReplicationTransactionId() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + return environmentFacade.getLastKnownReplicationTransactionId(); + } + return -1L; + } + + @Override + public Long getJoinTime() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + return environmentFacade.getJoinTime(); + } + return -1L; + } + + @Override public Map<String, String> getReplicatedEnvironmentConfiguration() { return _replicatedEnvironmentConfiguration; @@ -171,7 +225,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu public String toString() { return "BDBHAVirtualHostNodeImpl [id=" + getId() + ", name=" + getName() + ", storePath=" + _storePath + ", groupName=" + _groupName + ", address=" + _address - + ", state=" + getState() + "]"; + + ", state=" + getState() + ", priority=" + _priority + ", designatedPrimary=" + _designatedPrimary + ", designatedPrimary=" + _quorumOverride + "]"; } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -223,9 +277,26 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED()); getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath())); - ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade(); environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); + _environmentFacade.set(environmentFacade); + } + + @Override + protected void stop() + { + try + { + super.stop(); + } + finally + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (_environmentFacade.compareAndSet(environmentFacade, null)) + { + environmentFacade.close(); + } + } } private void onMaster() @@ -348,9 +419,188 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - private class ReplicaVirtualHost extends BDBHAVirtualHost + // used as post action by field _priority + @SuppressWarnings("unused") + private void postSetPriority() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + try + { + environmentFacade.setPriority(_priority).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Node priority changed. " + this); + } + } + catch (TimeoutException e) + { + LOGGER.warn("Change node priority did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _priority + " will become effective once the JE task thread is free."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + throw new ServerScopedRuntimeException("Failed to set priority node to value " + _priority + " on " + this, e); + } + } + } + + // used as post action by field _designatedPrimary + @SuppressWarnings("unused") + private void postSetDesignatedPrimary() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + try + { + environmentFacade.setDesignatedPrimary(_designatedPrimary).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Designated primary changed. " + this); + } + } + catch (TimeoutException e) + { + LOGGER.warn("Change designated primary did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _designatedPrimary + " will become effective once the JE task thread is free."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + throw new ServerScopedRuntimeException("Failed to set designated primary to value " + _designatedPrimary + " on " + this, e); + } + } + } + + // used as post action by field _quorumOverride + @SuppressWarnings("unused") + private void postSetQuorumOverride() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + try + { + environmentFacade.setElectableGroupSizeOverride(_quorumOverride).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Quorum override changed. " + this); + } + } + catch (TimeoutException e) + { + LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become effective once the JE task thread is free."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + throw new ServerScopedRuntimeException("Failed to set quorum override to value " + _quorumOverride + " on " + this, e); + } + } + } + + // used as pre action by field _role + @SuppressWarnings("unused") + private void preSetRole() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + String currentRole = environmentFacade.getNodeState(); + if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole)) + { + throw new IllegalConfigurationException("Cannot transfer mastership when node is not in a replica role." + + "Current role is " + currentRole); + } + } + else + { + // Ignored + } + } + + // used as post action by field _role + @SuppressWarnings("unused") + private void postSetRole() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + try + { + environmentFacade.transferMasterToSelfAsynchronously().get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Requested master transfer to self. " + this); + } + } + catch (TimeoutException e) + { + LOGGER.warn("Transfer master did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. Node may still be elected master at a later time."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + throw new ServerScopedRuntimeException("Failed to transfer master to " + this, e); + } + } + else + { + // Ignored + } + } + + // TODO - need a better way of suppressing the persistence of the role field. + @Override + public ConfiguredObjectRecord asObjectRecord() { + final ConfiguredObjectRecord underlying = super.asObjectRecord(); + return new ConfiguredObjectRecord() + { + @Override + public String getType() + { + return underlying.getType(); + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + return underlying.getParents(); + } + + @Override + public UUID getId() + { + return underlying.getId(); + } + + @Override + public Map<String, Object> getAttributes() + { + Map<String, Object> copy = new HashMap<String, Object>(underlying.getAttributes()); + copy.remove(BDBHAVirtualHostNode.ROLE); + return copy; + } + }; + } + + private class ReplicaVirtualHost extends BDBHAVirtualHost + { ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) { super(attributes, virtualHostNode); @@ -372,4 +622,5 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu return super.setState(currentState, desiredState); } } + } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index d322717043..d296e9c2b7 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -25,14 +25,13 @@ import static org.mockito.Mockito.when; import java.io.File; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; - import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; import org.apache.qpid.server.model.Broker; @@ -43,20 +42,23 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; + public class BDBHAVirtualHostNodeTest extends QpidTestCase { private Broker<?> _broker; private File _bdbStorePath; - private VirtualHostNode<?> _virtualHostNode; private TaskExecutor _taskExecutor; + private final ConfiguredObjectFactory _objectFactory = BrokerModel.getInstance().getObjectFactory(); + private final Set<BDBHAVirtualHostNode<?>> _nodes = new HashSet<>(); @Override protected void setUp() throws Exception @@ -78,9 +80,24 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase { try { - if (_virtualHostNode != null) + Exception firstException = null; + for (VirtualHostNode<?> node : _nodes) { - _virtualHostNode.setDesiredState(_virtualHostNode.getState(), State.STOPPED); + try + { + node.setDesiredState(node.getState(), State.DELETED); + } + catch(Exception e) + { + if (firstException != null) + { + firstException = e; + } + } + if (firstException != null) + { + throw firstException; + } } } finally @@ -119,11 +136,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase attributes.put(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION, Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout)); - ConfiguredObjectFactory objectFactory = BrokerModel.getInstance().getObjectFactory(); - ConfiguredObjectTypeFactory factory = objectFactory.getConfiguredObjectTypeFactory("VirtualHostNode", - "BDB_HA"); - - BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) factory.create(null, attributes, _broker); + VirtualHostNode<?> node = createHaVHN(attributes); final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1); final CountDownLatch virtualHostStateChangeLatch = new CountDownLatch(1); @@ -168,6 +181,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertEquals(groupName, environment.getGroup().getName()); assertEquals(nodeHostPort, replicationConfig.getNodeHostPort()); assertEquals(helperHostPort, replicationConfig.getHelperHosts()); + assertEquals(durability, environment.getConfig().getDurability().toString()); assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); @@ -191,6 +205,129 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertFalse("Store still exists", _bdbStorePath.exists()); } + public void testMutableAttributes() throws Exception + { + UUID id = UUID.randomUUID(); + String address = "localhost:" + findFreePort(); + + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + attributes.put(BDBHAVirtualHostNode.ID, id); + attributes.put(BDBHAVirtualHostNode.NAME, "node"); + attributes.put(BDBHAVirtualHostNode.GROUP_NAME, "group"); + attributes.put(BDBHAVirtualHostNode.ADDRESS, address); + attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, address); + attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath); + + BDBHAVirtualHostNode<?> node = createHaVHN(attributes); + + assertEquals("Failed to activate node", State.ACTIVE, node.setDesiredState(node.getState(), State.ACTIVE)); + + BDBMessageStore bdbMessageStore = (BDBMessageStore) node.getConfigurationStore(); + ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbMessageStore.getEnvironmentFacade().getEnvironment(); + + assertEquals("Unexpected node priority value before mutation", 1, environment.getRepMutableConfig().getNodePriority()); + assertFalse("Unexpected designated primary value before mutation", environment.getRepMutableConfig().getDesignatedPrimary()); + assertEquals("Unexpected electable group override value before mutation", 0, environment.getRepMutableConfig().getElectableGroupSizeOverride()); + + node.setAttribute(BDBHAVirtualHostNode.PRIORITY, 1, 2); + node.setAttribute(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, false, true); + node.setAttribute(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 0, 1); + + assertEquals("Unexpected node priority value after mutation", 2, environment.getRepMutableConfig().getNodePriority()); + assertTrue("Unexpected designated primary value after mutation", environment.getRepMutableConfig().getDesignatedPrimary()); + assertEquals("Unexpected electable group override value after mutation", 1, environment.getRepMutableConfig().getElectableGroupSizeOverride()); + + assertNotNull("Join time should be set", node.getJoinTime()); + assertNotNull("Last known replication transaction idshould be set", node.getLastKnownReplicationTransactionId()); + + } + + public void testTransferMasterToSelf() throws Exception + { + int node1PortNumber = findFreePort(); + String helperAddress = "localhost:" + node1PortNumber; + String groupName = "group"; + + Map<String, Object> node1Attributes = new HashMap<String, Object>(); + node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1"); + node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); + + BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes); + assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(node1.getState(), State.ACTIVE)); + + int node2PortNumber = getNextAvailable(node1PortNumber+1); + + Map<String, Object> node2Attributes = new HashMap<String, Object>(); + node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2"); + node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber); + node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2"); + + BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes); + assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(node2.getState(), State.ACTIVE)); + + int node3PortNumber = getNextAvailable(node2PortNumber+1); + Map<String, Object> node3Attributes = new HashMap<String, Object>(); + node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3"); + node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber); + node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3"); + BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes); + assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(node3.getState(), State.ACTIVE)); + + BDBHAVirtualHostNode<?> replica = null; + int findReplicaCount = 0; + while(replica == null) + { + for (BDBHAVirtualHostNode<?> node : _nodes) + { + if ("REPLICA".equals(node.getRole())) + { + replica = node; + break; + } + } + + Thread.sleep(100); + if (findReplicaCount > 20) + { + fail("Could not find a node is replica role"); + } + findReplicaCount++; + } + + replica.setAttribute(BDBHAVirtualHostNode.ROLE, "REPLICA", "MASTER"); + + int awaitMastershipCount = 0; + while(!"MASTER".equals(replica.getRole())) + { + Thread.sleep(100); + if (awaitMastershipCount > 20) + { + fail("Replica did not assume master role"); + } + awaitMastershipCount++; + } + } + + private BDBHAVirtualHostNode<?> createHaVHN(Map<String, Object> attributes) + { + BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) _objectFactory.create(VirtualHostNode.class, attributes, _broker); + _nodes.add(node); + return node; + } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index b342493c59..1becfe3ede 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -40,6 +40,7 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; +import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicatedEnvironment.State; import com.sleepycat.je.rep.ReplicationConfig; import com.sleepycat.je.rep.StateChangeEvent; @@ -274,6 +275,123 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState()); } + public void testTransferMasterToSelf() throws Exception + { + final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1); + final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1); + StateChangeListener stateChangeListener = new StateChangeListener(){ + + @Override + public void stateChange(StateChangeEvent event) throws RuntimeException + { + ReplicatedEnvironment.State state = event.getState(); + if (state == ReplicatedEnvironment.State.REPLICA) + { + firstNodeReplicaStateLatch.countDown(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + firstNodeMasterStateLatch.countDown(); + } + } + }; + ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener); + assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); + + int replica2Port = getNextAvailable(replica1Port + 1); + String node2NodeHostPort = "localhost:" + replica2Port; + final CountDownLatch replicaStateLatch = new CountDownLatch(1); + final CountDownLatch masterStateLatch = new CountDownLatch(1); + StateChangeListener testStateChangeListener = new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent event) throws RuntimeException + { + ReplicatedEnvironment.State state = event.getState(); + if (state == ReplicatedEnvironment.State.REPLICA) + { + replicaStateLatch.countDown(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + masterStateLatch.countDown(); + } + } + }; + ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); + assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); + + thirdNode.transferMasterToSelfAsynchronously(); + assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS)); + assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS)); + assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); + } + + public void testTransferMasterAnotherNode() throws Exception + { + final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1); + final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1); + StateChangeListener stateChangeListener = new StateChangeListener(){ + + @Override + public void stateChange(StateChangeEvent event) throws RuntimeException + { + ReplicatedEnvironment.State state = event.getState(); + if (state == ReplicatedEnvironment.State.REPLICA) + { + firstNodeReplicaStateLatch.countDown(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + firstNodeMasterStateLatch.countDown(); + } + } + }; + ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener); + assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); + + int replica2Port = getNextAvailable(replica1Port + 1); + String node2NodeHostPort = "localhost:" + replica2Port; + final CountDownLatch replicaStateLatch = new CountDownLatch(1); + final CountDownLatch masterStateLatch = new CountDownLatch(1); + StateChangeListener testStateChangeListener = new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent event) throws RuntimeException + { + ReplicatedEnvironment.State state = event.getState(); + if (state == ReplicatedEnvironment.State.REPLICA) + { + replicaStateLatch.countDown(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + masterStateLatch.countDown(); + } + } + }; + String thirdNodeName = TEST_NODE_NAME + "_2"; + ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener); + assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); + assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); + + firstNode.transferMasterAsynchronously(thirdNodeName); + assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS)); + assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS)); + assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); + } + private ReplicatedEnvironmentFacade createMaster() throws Exception { TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); |
