summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-04-29 08:29:28 +0000
committerKeith Wall <kwall@apache.org>2014-04-29 08:29:28 +0000
commit5953ca009492eb4d40a63960ea1d8f1854351548 (patch)
tree4076c42aeeed98e4bdd668fb941b8360596a8aeb /qpid/java
parent1eecf05ef31eebe90631208ba1bf005167b9f234 (diff)
downloadqpid-python-5953ca009492eb4d40a63960ea1d8f1854351548.tar.gz
QPID-5715: [Java Broker]: Wire up the BDB HA VirtualHostNode to the ReplicatedEnvironmentFacade.
* Attributes priority, quorumOverride, designatedPrimary are exposed as read/write attributes. * Attribute role is readable (to observe the current role of the node), and writable, to request a change in mastership. * Attributes joinTime and lastKnownReplicationTransactionId are exposed as derived attributes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1590917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java73
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java12
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java269
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java161
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java118
5 files changed, 595 insertions, 38 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 0b00800b04..0f839ea02d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -81,14 +81,17 @@ import com.sleepycat.je.utilint.VLSN;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
+ public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval";
public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval";
private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
+ private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60;
private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000;
+ private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
@@ -145,14 +148,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
+ private final AtomicBoolean _initialised;
+ private final EnvironmentFacadeTask[] _initialisationTasks;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- private AtomicBoolean _initialised;
- private EnvironmentFacadeTask[] _initialisationTasks;
-
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
_environmentDirectory = new File(configuration.getStorePath());
@@ -214,8 +216,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
shutdownAndAwaitExecutorService(_environmentJobExecutor);
shutdownAndAwaitExecutorService(_groupChangeExecutor);
- closeDatabases();
- closeEnvironment();
+ try
+ {
+ closeDatabases();
+ }
+ finally
+ {
+ closeEnvironment();
+ }
}
finally
{
@@ -634,10 +642,52 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ public Future<Void> transferMasterToSelfAsynchronously()
+ {
+ final String nodeName = getNodeName();
+ return transferMasterAsynchronously(nodeName);
+ }
+
+ public Future<Void> transferMasterAsynchronously(final String nodeName)
+ {
+ return _groupChangeExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ ReplicationGroupAdmin admin = createReplicationGroupAdmin();
+ String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("The mastership has been transfered to " + newMaster);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.warn("Exception on transfering the mastership to " + _prettyGroupNodeName
+ + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e);
+ throw e;
+ }
+ return null;
+ }
+ });
+ }
+
+ public void removeNodeFromGroup(final String nodeName)
+ {
+ createReplicationGroupAdmin().removeMember(nodeName);
+ }
+
+ public void updateAddress(final String nodeName, final String newHostName, final int newPort)
+ {
+ createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+ }
public long getJoinTime()
{
- return _joinTime ;
+ return _joinTime;
}
public long getLastKnownReplicationTransactionId()
@@ -669,16 +719,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return members;
}
- public void removeNodeFromGroup(final String nodeName)
- {
- createReplicationGroupAdmin().removeMember(nodeName);
- }
-
- public void updateAddress(final String nodeName, final String newHostName, final int newPort)
- {
- createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
- }
-
private ReplicationGroupAdmin createReplicationGroupAdmin()
{
final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
@@ -690,7 +730,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
}
-
public ReplicatedEnvironment getEnvironment()
{
return _environment;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
index f0325b24f8..0e92ac83de 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
@@ -34,7 +34,10 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
public static final String DESIGNATED_PRIMARY = "designatedPrimary";
public static final String PRIORITY = "priority";
public static final String QUORUM_OVERRIDE = "quorumOverride";
+ public static final String ROLE = "role";
public static final String REPLICATED_ENVIRONMENT_CONFIGURATION = "replicatedEnvironmentConfiguration";
+ public static final String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId";
+ public static final String JOIN_TIME = "joinTime";
@ManagedAttribute(automate = true, mandatory=true)
String getGroupName();
@@ -61,5 +64,14 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
int getQuorumOverride();
@ManagedAttribute(automate = true)
+ String getRole();
+
+ @ManagedAttribute(automate = true)
Map<String, String> getReplicatedEnvironmentConfiguration();
+
+ @ManagedAttribute(derived = true)
+ Long getLastKnownReplicationTransactionId();
+
+ @ManagedAttribute(derived = true)
+ Long getJoinTime();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 8b4948da08..8b2dce4168 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -23,13 +23,16 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
-import com.sleepycat.je.rep.StateChangeEvent;
-import com.sleepycat.je.rep.StateChangeListener;
import org.apache.log4j.Logger;
-
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -42,20 +45,34 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost;
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostState;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
@ManagedObject( category = false, type = "BDB_HA" )
public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl>
{
+ /**
+ * Length of time we synchronously await the a JE mutation to complete. It is not considered an error if we exceed this timeout, although a
+ * a warning will be logged.
+ */
+ private static final int MUTATE_JE_TIMEOUT_MS = 100;
+
private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHostNodeImpl.class);
+ private final AtomicReference<ReplicatedEnvironmentFacade> _environmentFacade = new AtomicReference<>();
+
@ManagedAttributeField
private Map<String, String> _environmentConfiguration;
@@ -77,18 +94,22 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@ManagedAttributeField
private boolean _coalescingSync;
- @ManagedAttributeField
+ @ManagedAttributeField(afterSet="postSetDesignatedPrimary")
private boolean _designatedPrimary;
- @ManagedAttributeField
+ @ManagedAttributeField(afterSet="postSetPriority")
private int _priority;
- @ManagedAttributeField
+ @ManagedAttributeField(afterSet="postSetQuorumOverride")
private int _quorumOverride;
+ @ManagedAttributeField(beforeSet="preSetRole", afterSet="postSetRole")
+ private String _role;
+
@ManagedAttributeField
private Map<String, String> _replicatedEnvironmentConfiguration;
+
@ManagedObjectFactoryConstructor
public BDBHAVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> broker)
{
@@ -162,6 +183,39 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
+ public String getRole()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ return environmentFacade.getNodeState();
+ }
+ return "UNKNOWN";
+ }
+
+ @Override
+ public Long getLastKnownReplicationTransactionId()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ return environmentFacade.getLastKnownReplicationTransactionId();
+ }
+ return -1L;
+ }
+
+ @Override
+ public Long getJoinTime()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ return environmentFacade.getJoinTime();
+ }
+ return -1L;
+ }
+
+ @Override
public Map<String, String> getReplicatedEnvironmentConfiguration()
{
return _replicatedEnvironmentConfiguration;
@@ -171,7 +225,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
public String toString()
{
return "BDBHAVirtualHostNodeImpl [id=" + getId() + ", name=" + getName() + ", storePath=" + _storePath + ", groupName=" + _groupName + ", address=" + _address
- + ", state=" + getState() + "]";
+ + ", state=" + getState() + ", priority=" + _priority + ", designatedPrimary=" + _designatedPrimary + ", designatedPrimary=" + _quorumOverride + "]";
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -223,9 +277,26 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED());
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath()));
-
ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade();
environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+ _environmentFacade.set(environmentFacade);
+ }
+
+ @Override
+ protected void stop()
+ {
+ try
+ {
+ super.stop();
+ }
+ finally
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (_environmentFacade.compareAndSet(environmentFacade, null))
+ {
+ environmentFacade.close();
+ }
+ }
}
private void onMaster()
@@ -348,9 +419,188 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- private class ReplicaVirtualHost extends BDBHAVirtualHost
+ // used as post action by field _priority
+ @SuppressWarnings("unused")
+ private void postSetPriority()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ try
+ {
+ environmentFacade.setPriority(_priority).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Node priority changed. " + this);
+ }
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Change node priority did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _priority + " will become effective once the JE task thread is free.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException("Failed to set priority node to value " + _priority + " on " + this, e);
+ }
+ }
+ }
+
+ // used as post action by field _designatedPrimary
+ @SuppressWarnings("unused")
+ private void postSetDesignatedPrimary()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ try
+ {
+ environmentFacade.setDesignatedPrimary(_designatedPrimary).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Designated primary changed. " + this);
+ }
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Change designated primary did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _designatedPrimary + " will become effective once the JE task thread is free.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException("Failed to set designated primary to value " + _designatedPrimary + " on " + this, e);
+ }
+ }
+ }
+
+ // used as post action by field _quorumOverride
+ @SuppressWarnings("unused")
+ private void postSetQuorumOverride()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ try
+ {
+ environmentFacade.setElectableGroupSizeOverride(_quorumOverride).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Quorum override changed. " + this);
+ }
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become effective once the JE task thread is free.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException("Failed to set quorum override to value " + _quorumOverride + " on " + this, e);
+ }
+ }
+ }
+
+ // used as pre action by field _role
+ @SuppressWarnings("unused")
+ private void preSetRole()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ String currentRole = environmentFacade.getNodeState();
+ if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+ {
+ throw new IllegalConfigurationException("Cannot transfer mastership when node is not in a replica role."
+ + "Current role is " + currentRole);
+ }
+ }
+ else
+ {
+ // Ignored
+ }
+ }
+
+ // used as post action by field _role
+ @SuppressWarnings("unused")
+ private void postSetRole()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ try
+ {
+ environmentFacade.transferMasterToSelfAsynchronously().get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Requested master transfer to self. " + this);
+ }
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Transfer master did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. Node may still be elected master at a later time.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException("Failed to transfer master to " + this, e);
+ }
+ }
+ else
+ {
+ // Ignored
+ }
+ }
+
+ // TODO - need a better way of suppressing the persistence of the role field.
+ @Override
+ public ConfiguredObjectRecord asObjectRecord()
{
+ final ConfiguredObjectRecord underlying = super.asObjectRecord();
+ return new ConfiguredObjectRecord()
+ {
+ @Override
+ public String getType()
+ {
+ return underlying.getType();
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ return underlying.getParents();
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return underlying.getId();
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ Map<String, Object> copy = new HashMap<String, Object>(underlying.getAttributes());
+ copy.remove(BDBHAVirtualHostNode.ROLE);
+ return copy;
+ }
+ };
+ }
+
+ private class ReplicaVirtualHost extends BDBHAVirtualHost
+ {
ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
super(attributes, virtualHostNode);
@@ -372,4 +622,5 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
return super.setState(currentState, desiredState);
}
}
+
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index d322717043..d296e9c2b7 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -25,14 +25,13 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.model.Broker;
@@ -43,20 +42,23 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
public class BDBHAVirtualHostNodeTest extends QpidTestCase
{
private Broker<?> _broker;
private File _bdbStorePath;
- private VirtualHostNode<?> _virtualHostNode;
private TaskExecutor _taskExecutor;
+ private final ConfiguredObjectFactory _objectFactory = BrokerModel.getInstance().getObjectFactory();
+ private final Set<BDBHAVirtualHostNode<?>> _nodes = new HashSet<>();
@Override
protected void setUp() throws Exception
@@ -78,9 +80,24 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
{
try
{
- if (_virtualHostNode != null)
+ Exception firstException = null;
+ for (VirtualHostNode<?> node : _nodes)
{
- _virtualHostNode.setDesiredState(_virtualHostNode.getState(), State.STOPPED);
+ try
+ {
+ node.setDesiredState(node.getState(), State.DELETED);
+ }
+ catch(Exception e)
+ {
+ if (firstException != null)
+ {
+ firstException = e;
+ }
+ }
+ if (firstException != null)
+ {
+ throw firstException;
+ }
}
}
finally
@@ -119,11 +136,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
attributes.put(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION,
Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout));
- ConfiguredObjectFactory objectFactory = BrokerModel.getInstance().getObjectFactory();
- ConfiguredObjectTypeFactory factory = objectFactory.getConfiguredObjectTypeFactory("VirtualHostNode",
- "BDB_HA");
-
- BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) factory.create(null, attributes, _broker);
+ VirtualHostNode<?> node = createHaVHN(attributes);
final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1);
final CountDownLatch virtualHostStateChangeLatch = new CountDownLatch(1);
@@ -168,6 +181,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertEquals(groupName, environment.getGroup().getName());
assertEquals(nodeHostPort, replicationConfig.getNodeHostPort());
assertEquals(helperHostPort, replicationConfig.getHelperHosts());
+
assertEquals(durability, environment.getConfig().getDurability().toString());
assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
@@ -191,6 +205,129 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertFalse("Store still exists", _bdbStorePath.exists());
}
+ public void testMutableAttributes() throws Exception
+ {
+ UUID id = UUID.randomUUID();
+ String address = "localhost:" + findFreePort();
+
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ attributes.put(BDBHAVirtualHostNode.ID, id);
+ attributes.put(BDBHAVirtualHostNode.NAME, "node");
+ attributes.put(BDBHAVirtualHostNode.GROUP_NAME, "group");
+ attributes.put(BDBHAVirtualHostNode.ADDRESS, address);
+ attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, address);
+ attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath);
+
+ BDBHAVirtualHostNode<?> node = createHaVHN(attributes);
+
+ assertEquals("Failed to activate node", State.ACTIVE, node.setDesiredState(node.getState(), State.ACTIVE));
+
+ BDBMessageStore bdbMessageStore = (BDBMessageStore) node.getConfigurationStore();
+ ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbMessageStore.getEnvironmentFacade().getEnvironment();
+
+ assertEquals("Unexpected node priority value before mutation", 1, environment.getRepMutableConfig().getNodePriority());
+ assertFalse("Unexpected designated primary value before mutation", environment.getRepMutableConfig().getDesignatedPrimary());
+ assertEquals("Unexpected electable group override value before mutation", 0, environment.getRepMutableConfig().getElectableGroupSizeOverride());
+
+ node.setAttribute(BDBHAVirtualHostNode.PRIORITY, 1, 2);
+ node.setAttribute(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, false, true);
+ node.setAttribute(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 0, 1);
+
+ assertEquals("Unexpected node priority value after mutation", 2, environment.getRepMutableConfig().getNodePriority());
+ assertTrue("Unexpected designated primary value after mutation", environment.getRepMutableConfig().getDesignatedPrimary());
+ assertEquals("Unexpected electable group override value after mutation", 1, environment.getRepMutableConfig().getElectableGroupSizeOverride());
+
+ assertNotNull("Join time should be set", node.getJoinTime());
+ assertNotNull("Last known replication transaction idshould be set", node.getLastKnownReplicationTransactionId());
+
+ }
+
+ public void testTransferMasterToSelf() throws Exception
+ {
+ int node1PortNumber = findFreePort();
+ String helperAddress = "localhost:" + node1PortNumber;
+ String groupName = "group";
+
+ Map<String, Object> node1Attributes = new HashMap<String, Object>();
+ node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1");
+ node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+
+ BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes);
+ assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(node1.getState(), State.ACTIVE));
+
+ int node2PortNumber = getNextAvailable(node1PortNumber+1);
+
+ Map<String, Object> node2Attributes = new HashMap<String, Object>();
+ node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2");
+ node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node2PortNumber);
+ node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
+
+ BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes);
+ assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(node2.getState(), State.ACTIVE));
+
+ int node3PortNumber = getNextAvailable(node2PortNumber+1);
+ Map<String, Object> node3Attributes = new HashMap<String, Object>();
+ node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3");
+ node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + node3PortNumber);
+ node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3");
+ BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes);
+ assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(node3.getState(), State.ACTIVE));
+
+ BDBHAVirtualHostNode<?> replica = null;
+ int findReplicaCount = 0;
+ while(replica == null)
+ {
+ for (BDBHAVirtualHostNode<?> node : _nodes)
+ {
+ if ("REPLICA".equals(node.getRole()))
+ {
+ replica = node;
+ break;
+ }
+ }
+
+ Thread.sleep(100);
+ if (findReplicaCount > 20)
+ {
+ fail("Could not find a node is replica role");
+ }
+ findReplicaCount++;
+ }
+
+ replica.setAttribute(BDBHAVirtualHostNode.ROLE, "REPLICA", "MASTER");
+
+ int awaitMastershipCount = 0;
+ while(!"MASTER".equals(replica.getRole()))
+ {
+ Thread.sleep(100);
+ if (awaitMastershipCount > 20)
+ {
+ fail("Replica did not assume master role");
+ }
+ awaitMastershipCount++;
+ }
+ }
+
+ private BDBHAVirtualHostNode<?> createHaVHN(Map<String, Object> attributes)
+ {
+ BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) _objectFactory.create(VirtualHostNode.class, attributes, _broker);
+ _nodes.add(node);
+ return node;
+ }
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index b342493c59..1becfe3ede 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -40,6 +40,7 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.StateChangeEvent;
@@ -274,6 +275,123 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
}
+ public void testTransferMasterToSelf() throws Exception
+ {
+ final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
+ final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
+ StateChangeListener stateChangeListener = new StateChangeListener(){
+
+ @Override
+ public void stateChange(StateChangeEvent event) throws RuntimeException
+ {
+ ReplicatedEnvironment.State state = event.getState();
+ if (state == ReplicatedEnvironment.State.REPLICA)
+ {
+ firstNodeReplicaStateLatch.countDown();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ firstNodeMasterStateLatch.countDown();
+ }
+ }
+ };
+ ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener);
+ assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+ ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
+
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String node2NodeHostPort = "localhost:" + replica2Port;
+ final CountDownLatch replicaStateLatch = new CountDownLatch(1);
+ final CountDownLatch masterStateLatch = new CountDownLatch(1);
+ StateChangeListener testStateChangeListener = new StateChangeListener()
+ {
+ @Override
+ public void stateChange(StateChangeEvent event) throws RuntimeException
+ {
+ ReplicatedEnvironment.State state = event.getState();
+ if (state == ReplicatedEnvironment.State.REPLICA)
+ {
+ replicaStateLatch.countDown();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ masterStateLatch.countDown();
+ }
+ }
+ };
+ ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
+ assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
+
+ thirdNode.transferMasterToSelfAsynchronously();
+ assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
+ assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
+ }
+
+ public void testTransferMasterAnotherNode() throws Exception
+ {
+ final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
+ final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
+ StateChangeListener stateChangeListener = new StateChangeListener(){
+
+ @Override
+ public void stateChange(StateChangeEvent event) throws RuntimeException
+ {
+ ReplicatedEnvironment.State state = event.getState();
+ if (state == ReplicatedEnvironment.State.REPLICA)
+ {
+ firstNodeReplicaStateLatch.countDown();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ firstNodeMasterStateLatch.countDown();
+ }
+ }
+ };
+ ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener);
+ assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+ ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
+
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String node2NodeHostPort = "localhost:" + replica2Port;
+ final CountDownLatch replicaStateLatch = new CountDownLatch(1);
+ final CountDownLatch masterStateLatch = new CountDownLatch(1);
+ StateChangeListener testStateChangeListener = new StateChangeListener()
+ {
+ @Override
+ public void stateChange(StateChangeEvent event) throws RuntimeException
+ {
+ ReplicatedEnvironment.State state = event.getState();
+ if (state == ReplicatedEnvironment.State.REPLICA)
+ {
+ replicaStateLatch.countDown();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ masterStateLatch.countDown();
+ }
+ }
+ };
+ String thirdNodeName = TEST_NODE_NAME + "_2";
+ ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
+ assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
+
+ firstNode.transferMasterAsynchronously(thirdNodeName);
+ assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
+ assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
+ }
+
private ReplicatedEnvironmentFacade createMaster() throws Exception
{
TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);