summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-05-17 17:26:04 +0000
committerRobert Gemmell <robbie@apache.org>2012-05-17 17:26:04 +0000
commitf5d67044a9797c397764a7ac1aa1a1ed4aa893a3 (patch)
treecf8a9cf6a5f741e31417ca4d32a6b708bb3b9fdd /qpid/java/bdbstore/src/main
parentf523b9e510fc90ce3f7f7d7c2960f3bfee3d42df (diff)
downloadqpid-python-f5d67044a9797c397764a7ac1aa1a1ed4aa893a3.tar.gz
QPID-4006: add support for using BDB HA to form an active-passive cluster for persistent messaging
- Includes support for setting BDB configuration parameters via the store configuration, both for the existing store and the new HA variant. - Removes the MessageStoreFactory and reverts store configuration to historical values. Applied patch from Keith Wall, Andrew MacBean <andymacbean@gmail.com>, Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1339728 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java120
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java490
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java184
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java253
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java40
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java227
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java75
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java8
8 files changed, 1043 insertions, 354 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index a9ae4eb16d..789d5714c8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -33,11 +33,12 @@ import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
@@ -128,8 +130,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
protected final StateManager _stateManager;
- protected TransactionConfig _transactionConfig = new TransactionConfig();
-
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
@@ -146,6 +146,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
+ private Map<String, String> _envConfigMap;
+
public AbstractBDBMessageStore()
{
_stateManager = new StateManager(_eventManager);
@@ -161,7 +163,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
ConfigurationRecoveryHandler recoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = recoveryHandler;
@@ -179,12 +181,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageRecoveryHandler = messageRecoveryHandler;
_tlogRecoveryHandler = tlogRecoveryHandler;
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
- public void activate() throws Exception
+ public synchronized void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
recoverConfig(_configRecoveryHandler);
recoverMessages(_messageRecoveryHandler);
@@ -231,11 +233,30 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_storeLocation = storeLocation;
+ _envConfigMap = getConfigMap(storeConfig, "envConfig");
+
LOGGER.info("Configuring BDB message store");
setupStore(environmentPath, name);
}
+ protected Map<String,String> getConfigMap(Configuration config, String prefix) throws ConfigurationException
+ {
+ final List<Object> argumentNames = config.getList(prefix + ".name");
+ final List<Object> argumentValues = config.getList(prefix + ".value");
+ final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
+
+ for (int i = 0; i < argumentNames.size(); i++)
+ {
+ final String argName = argumentNames.get(i).toString();
+ final String argValue = argumentValues.get(i).toString();
+
+ attributes.put(argName, argValue);
+ }
+
+ return Collections.unmodifiableMap(attributes);
+ }
+
@Override
public String getStoreLocation()
{
@@ -251,9 +272,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
void startWithNoRecover() throws AMQStoreException
{
- _stateManager.attainState(State.CONFIGURING);
- _stateManager.attainState(State.CONFIGURED);
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.INITIALISING);
+ _stateManager.attainState(State.INITIALISED);
+ _stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
}
@@ -268,51 +289,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_totalStoreSize = getSizeOnDisk();
}
- protected Environment createEnvironment(File environmentPath) throws DatabaseException
- {
- LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allows the creation of the store if it does not already exist.
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam("je.lock.nLockTables", "7");
-
- // Added to help diagnosis of Deadlock issue
- // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
- if (Boolean.getBoolean("qpid.bdb.lock.debug"))
- {
- envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
- envConfig.setConfigParam("je.txn.dumpLocks", "true");
- }
-
- // Set transaction mode
- _transactionConfig.setReadCommitted(true);
-
- //This prevents background threads running which will potentially update the store.
- envConfig.setReadOnly(false);
- try
- {
- return new Environment(environmentPath, envConfig);
- }
- catch (DatabaseException de)
- {
- if (de.getMessage().contains("Environment.setAllowCreate is false"))
- {
- //Allow the creation this time
- envConfig.setAllowCreate(true);
- if (_environment != null )
- {
- _environment.cleanLog();
- _environment.close();
- }
- return new Environment(environmentPath, envConfig);
- }
- else
- {
- throw de;
- }
- }
- }
+ protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException;
public Environment getEnvironment()
{
@@ -352,14 +329,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void close() throws Exception
{
- if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.QUIESCED))
- {
- _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
-
- closeInternal();
-
- _stateManager.stateTransition(State.CLOSING, State.CLOSED);
- }
+ _stateManager.attainState(State.CLOSING);
+ closeInternal();
+ _stateManager.attainState(State.CLOSED);
}
protected void closeInternal() throws Exception
@@ -1504,6 +1476,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
return true;
}
+ @SuppressWarnings("unchecked")
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
if(metaData.isPersistent())
@@ -1914,4 +1887,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
return _persistentSizeHighThreshold;
}
+
+ private void setEnvironmentConfigProperties(EnvironmentConfig envConfig)
+ {
+ for (Map.Entry<String, String> configItem : _envConfigMap.entrySet())
+ {
+ LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+ }
+
+ protected EnvironmentConfig createEnvironmentConfig()
+ {
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setConfigParam(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
+
+ setEnvironmentConfigProperties(envConfig);
+
+ return envConfig;
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
new file mode 100644
index 0000000000..f887c8ce36
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.HAMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.State;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.OperationFailureException;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+
+public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore
+{
+ private static final String MUTLI_SYNC = "MUTLI_SYNC";
+ private static final String DEFAULT_REPLICATION_POLICY =
+ MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
+
+ private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
+
+ private String _groupName;
+ private String _nodeName;
+ private String _nodeHostPort;
+ private String _helperHostPort;
+ private String _replicationPolicy;
+ private Durability _replicationDurability;
+
+ private String _name;
+
+ private BDBHAMessageStoreManagerMBean _managedObject;
+
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
+ private CommitThreadWrapper _commitThreadWrapper;
+ private boolean _localMultiSyncCommits;
+ private boolean _autoDesignatedPrimary;
+ private Map<String, String> _repConfigMap;
+
+ @Override
+ public void configure(String name, Configuration storeConfig) throws Exception
+ {
+ //Mandatory configuration
+ _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig);
+ _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig);
+ _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig);
+ _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig);
+ _name = name;
+
+ //Optional configuration
+ _replicationPolicy = storeConfig.getString("highAvailability.replicationPolicy", DEFAULT_REPLICATION_POLICY).trim();
+ _autoDesignatedPrimary = storeConfig.getBoolean("highAvailability.autoDesignatedPrimary", Boolean.TRUE);
+
+ if(_replicationPolicy.startsWith(MUTLI_SYNC))
+ {
+ _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name()));
+ _localMultiSyncCommits = true;
+ }
+ else
+ {
+ _replicationDurability = Durability.parse(_replicationPolicy);
+ _localMultiSyncCommits = false;
+ }
+
+ _repConfigMap = getConfigMap(storeConfig, "repConfig");
+
+ _managedObject = new BDBHAMessageStoreManagerMBean(this);
+ _managedObject.register();
+
+ super.configure(name, storeConfig);
+ }
+
+ @Override
+ protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
+ {
+ super.setupStore(storePath, name);
+
+ if(_localMultiSyncCommits)
+ {
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
+ _commitThreadWrapper.startCommitThread();
+ }
+ }
+
+ private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException
+ {
+ if (!config.containsKey(key))
+ {
+ throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: "
+ + key.replace('.', '/'));
+ }
+ return config.getString(key);
+ }
+
+ @Override
+ protected Environment createEnvironment(File environmentPath) throws DatabaseException
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Environment path " + environmentPath.getAbsolutePath());
+ LOGGER.info("Group name " + _groupName);
+ LOGGER.info("Node name " + _nodeName);
+ LOGGER.info("Node host port " + _nodeHostPort);
+ LOGGER.info("Helper host port " + _helperHostPort);
+ LOGGER.info("Replication policy " + _replicationPolicy);
+ }
+
+ final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
+
+ replicationConfig.setHelperHosts(_helperHostPort);
+ setReplicationConfigProperties(replicationConfig);
+
+ final EnvironmentConfig envConfig = createEnvironmentConfig();
+ envConfig.setDurability(_replicationDurability);
+
+ ReplicatedEnvironment replicatedEnvironment = null;
+ try
+ {
+ replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
+ }
+ catch (final InsufficientLogException ile)
+ {
+ LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+ NetworkRestore restore = new NetworkRestore();
+ NetworkRestoreConfig config = new NetworkRestoreConfig();
+ config.setRetainLogFiles(false);
+ restore.execute(ile, config);
+ replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
+ }
+
+ return replicatedEnvironment;
+ }
+
+ @Override
+ public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config) throws Exception
+ {
+ super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config);
+
+ final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
+
+ replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+ }
+
+ @Override
+ public synchronized void passivate()
+ {
+ if (_stateManager.isNotInState(State.INITIALISED))
+ {
+ LOGGER.debug("Store becoming passive");
+ _stateManager.attainState(State.INITIALISED);
+ }
+ }
+
+ @Override
+ protected void closeInternal() throws Exception
+ {
+ try
+ {
+ if(_localMultiSyncCommits)
+ {
+ _commitThreadWrapper.stopCommitThread();
+ }
+ super.closeInternal();
+ }
+ finally
+ {
+ if (_managedObject != null)
+ {
+ _managedObject.unregister();
+ }
+ }
+ }
+
+ @Override
+ protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
+ {
+ // Using commit() instead of commitNoSync() for the HA store to allow
+ // the HA durability configuration to influence resulting behaviour.
+ tx.commit();
+
+
+ if(_localMultiSyncCommits)
+ {
+ return _commitThreadWrapper.commit(tx, syncCommit);
+ }
+ else
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public String getNodeName()
+ {
+ return _nodeName;
+ }
+
+ public String getNodeHostPort()
+ {
+ return _nodeHostPort;
+ }
+
+ public String getHelperHostPort()
+ {
+ return _helperHostPort;
+ }
+
+ public String getReplicationPolicy()
+ {
+ return _replicationPolicy;
+ }
+
+ public String getNodeState()
+ {
+ ReplicatedEnvironment.State state = getReplicatedEnvironment().getState();
+ return state.toString();
+ }
+
+ public Boolean isDesignatedPrimary()
+ {
+ return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary();
+ }
+
+ public List<Map<String, String>> getGroupMembers()
+ {
+ List<Map<String, String>> members = new ArrayList<Map<String,String>>();
+
+ for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes())
+ {
+ Map<String, String> nodeMap = new HashMap<String, String>();
+ nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName());
+ nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
+ members.add(nodeMap);
+ }
+
+ return members;
+ }
+
+ public void removeNodeFromGroup(String nodeName) throws AMQStoreException
+ {
+ try
+ {
+ createReplicationGroupAdmin().removeMember(nodeName);
+ }
+ catch (OperationFailureException ofe)
+ {
+ throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e);
+ }
+ }
+
+ public void setDesignatedPrimary(boolean isPrimary) throws AMQStoreException
+ {
+ try
+ {
+ final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
+ synchronized(replicatedEnvironment)
+ {
+ final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
+ replicatedEnvironment.setRepMutableConfig(newConfig);
+ }
+
+ LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e);
+ }
+ }
+
+ ReplicatedEnvironment getReplicatedEnvironment()
+ {
+ return (ReplicatedEnvironment)getEnvironment();
+ }
+
+ public void updateAddress(String nodeName, String newHostName, int newPort) throws AMQStoreException
+ {
+ try
+ {
+ createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+ }
+ catch (OperationFailureException ofe)
+ {
+ throw new AMQStoreException("Failed to update address for '" + nodeName +
+ "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to update address for '" + nodeName +
+ "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e);
+ }
+ }
+
+ private ReplicationGroupAdmin createReplicationGroupAdmin()
+ {
+ final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+ helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets());
+
+ final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig();
+ helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
+
+ return new ReplicationGroupAdmin(_groupName, helpers);
+ }
+
+ private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
+ {
+ private final Executor _executor = Executors.newSingleThreadExecutor();
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+ LOGGER.info("Received BDB event indicating transition to state " + state);
+
+ switch (state)
+ {
+ case MASTER:
+ activateStoreAsync();
+ break;
+ case REPLICA:
+ passivateStore();
+ break;
+ case DETACHED:
+ LOGGER.error("BDB replicated node in detached state, therefore passivating.");
+ passivateStore();
+ break;
+ case UNKNOWN:
+ LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
+ break;
+ default:
+ LOGGER.error("Unexpected state change: " + state);
+ throw new IllegalStateException("Unexpected state change: " + state);
+ }
+ }
+
+ /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */
+ private void passivateStore()
+ {
+ try
+ {
+ passivate();
+ }
+ catch(Exception e)
+ {
+ LOGGER.error("Unable to passivate", e);
+ throw new RuntimeException("Unable to passivate", e);
+ }
+ }
+
+ /**
+ * Calls {@link MessageStore#activate()}.
+ *
+ * <p/>
+ *
+ * This is done a background thread, in line with
+ * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
+ * activate may execute transactions, which can't complete until
+ * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
+ */
+ private void activateStoreAsync()
+ {
+ final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
+
+ _executor.execute(new Runnable()
+ {
+ private static final String _THREAD_NAME = "BDBHANodeActivationThread";
+
+ @Override
+ public void run()
+ {
+ Thread.currentThread().setName(_THREAD_NAME);
+ CurrentActor.set(new AbstractActor(_rootLogger)
+ {
+ @Override
+ public String getLogMessage()
+ {
+ return _THREAD_NAME;
+ }
+ });
+
+ try
+ {
+ activate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to activate on hearing MASTER change event",e);
+ }
+
+ }
+ });
+ }
+ }
+
+ @Override
+ public synchronized void activate() throws Exception
+ {
+ // Before proceeding, perform a log flush with an fsync
+ getEnvironment().flushLog(true);
+
+ super.activate();
+
+ //For replica groups with 2 electable nodes, set the new master to be the
+ //designated primary, such that it can continue working if the replica goes
+ //down and leaves it without a 'majority of 2'.
+ if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary)
+ {
+ setDesignatedPrimary(true);
+ }
+ }
+
+ private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
+ {
+ for (Map.Entry<String, String> configItem : _repConfigMap.entrySet())
+ {
+ LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
new file mode 100644
index 0000000000..cd4e990607
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.management.AMQManagedObject;
+
+public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
+{
+ private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class);
+
+ private static final TabularType GROUP_MEMBERS_TABLE;
+ private static final CompositeType GROUP_MEMBER_ROW;
+ private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES;
+
+ static
+ {
+ try
+ {
+ GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
+ final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
+ final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
+ GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
+ itemNames,
+ itemDescriptions,
+ GROUP_MEMBER_ATTRIBUTE_TYPES );
+ GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
+ GROUP_MEMBER_ROW,
+ new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
+ }
+ catch (final OpenDataException ode)
+ {
+ throw new ExceptionInInitializerError(ode);
+ }
+ }
+
+ private final BDBHAMessageStore _store;
+
+ protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store) throws NotCompliantMBeanException
+ {
+ super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE);
+ _store = store;
+ }
+
+ @Override
+ public String getObjectInstanceName()
+ {
+ return _store.getName();
+ }
+
+ @Override
+ public String getGroupName() throws IOException
+ {
+ return _store.getGroupName();
+ }
+
+ @Override
+ public String getNodeName() throws IOException
+ {
+ return _store.getNodeName();
+ }
+
+ @Override
+ public String getNodeHostPort() throws IOException
+ {
+ return _store.getNodeHostPort();
+ }
+
+ @Override
+ public String getHelperHostPort() throws IOException
+ {
+ return _store.getHelperHostPort();
+ }
+
+ @Override
+ public String getReplicationPolicy() throws IOException
+ {
+ return _store.getReplicationPolicy();
+ }
+
+ @Override
+ public String getNodeState() throws IOException
+ {
+ return _store.getNodeState();
+ }
+
+ @Override
+ public boolean getDesignatedPrimary() throws IOException
+ {
+ return _store.isDesignatedPrimary();
+ }
+
+ @Override
+ public TabularData getAllNodesInGroup() throws IOException, JMException
+ {
+ final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
+ final List<Map<String, String>> members = _store.getGroupMembers();
+
+ for (Map<String, String> map : members)
+ {
+ CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map);
+ data.put(memberData);
+ }
+ return data;
+ }
+
+ @Override
+ public void removeNodeFromGroup(String nodeName) throws JMException
+ {
+ try
+ {
+ _store.removeNodeFromGroup(nodeName);
+ }
+ catch (AMQStoreException e)
+ {
+ LOGGER.error("Failed to remove node " + nodeName + " from group", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void setDesignatedPrimary(boolean primary) throws JMException
+ {
+ try
+ {
+ _store.setDesignatedPrimary(primary);
+ }
+ catch (AMQStoreException e)
+ {
+ LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException
+ {
+ try
+ {
+ _store.updateAddress(nodeName, newHostName, newPort);
+ }
+ catch(AMQStoreException e)
+ {
+ LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index b414441b92..7c29e281d9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -21,23 +21,12 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
-import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
@@ -53,52 +42,22 @@ import com.sleepycat.je.EnvironmentConfig;
public class BDBMessageStore extends AbstractBDBMessageStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
-
- private final CommitThread _commitThread = new CommitThread("Commit-Thread");
-
- private final Map<Event, List<EventListener>> _eventListeners = new HashMap<Event, List<EventListener>>();
+ private CommitThreadWrapper _commitThreadWrapper;
@Override
protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
{
super.setupStore(storePath, name);
- startCommitThread();
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
+ _commitThreadWrapper.startCommitThread();
}
protected Environment createEnvironment(File environmentPath) throws DatabaseException
{
LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allows the creation of the store if it does not already exist.
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
-
- Properties props = System.getProperties();
-
- for(String propName : props.stringPropertyNames())
- {
- if(propName.startsWith("qpid.bdb.envconfig.je."))
- {
- envConfig.setConfigParam(propName.substring(19), props.getProperty(propName));
- }
- }
-
- envConfig.setConfigParam("je.lock.nLockTables", "7");
-
- // Added to help diagnosis of Deadlock issue
- // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
- if (Boolean.getBoolean("qpid.bdb.lock.debug"))
- {
- envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
- envConfig.setConfigParam("je.txn.dumpLocks", "true");
- }
-
- // Set transaction mode
- _transactionConfig.setReadCommitted(true);
+ EnvironmentConfig envConfig = createEnvironmentConfig();
- //This prevents background threads running which will potentially update the store.
- envConfig.setReadOnly(false);
try
{
return new Environment(environmentPath, envConfig);
@@ -118,12 +77,10 @@ public class BDBMessageStore extends AbstractBDBMessageStore
}
}
-
-
@Override
protected void closeInternal() throws Exception
{
- stopCommitThread();
+ _commitThreadWrapper.stopCommitThread();
super.closeInternal();
}
@@ -133,204 +90,6 @@ public class BDBMessageStore extends AbstractBDBMessageStore
{
tx.commitNoSync();
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
- commitFuture.commit();
-
- return commitFuture;
- }
-
- private void startCommitThread()
- {
- _commitThread.start();
- }
-
- private void stopCommitThread() throws InterruptedException
- {
- _commitThread.close();
- _commitThread.join();
- }
-
- private static final class BDBCommitFuture implements StoreFuture
- {
- private final CommitThread _commitThread;
- private final com.sleepycat.je.Transaction _tx;
- private DatabaseException _databaseException;
- private boolean _complete;
- private boolean _syncCommit;
-
- public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit)
- {
- _commitThread = commitThread;
- _tx = tx;
- _syncCommit = syncCommit;
- }
-
- public synchronized void complete()
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
- }
- _complete = true;
-
- notifyAll();
- }
-
- public synchronized void abort(DatabaseException databaseException)
- {
- _complete = true;
- _databaseException = databaseException;
-
- notifyAll();
- }
-
- public void commit() throws DatabaseException
- {
- _commitThread.addJob(this, _syncCommit);
-
- if(!_syncCommit)
- {
- LOGGER.debug("CommitAsync was requested, returning immediately.");
- return;
- }
-
- waitForCompletion();
-
- if (_databaseException != null)
- {
- throw _databaseException;
- }
-
- }
-
- public synchronized boolean isComplete()
- {
- return _complete;
- }
-
- public synchronized void waitForCompletion()
- {
- while (!isComplete())
- {
- _commitThread.explicitNotify();
- try
- {
- wait(250);
- }
- catch (InterruptedException e)
- {
- //TODO Should we ignore, or throw a 'StoreException'?
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- /**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
- * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
- * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
- */
- private class CommitThread extends Thread
- {
- private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
- private final CheckpointConfig _config = new CheckpointConfig();
- private final Object _lock = new Object();
-
- public CommitThread(String name)
- {
- super(name);
- _config.setForce(true);
-
- }
-
- public void explicitNotify()
- {
- synchronized (_lock)
- {
- _lock.notify();
- }
- }
-
- public void run()
- {
- while (!_stopped.get())
- {
- synchronized (_lock)
- {
- while (!_stopped.get() && !hasJobs())
- {
- try
- {
- // RHM-7 Periodically wake up and check, just in case we
- // missed a notification. Don't want to lock the broker hard.
- _lock.wait(1000);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- processJobs();
- }
- }
-
- private void processJobs()
- {
- int size = _jobQueue.size();
-
- try
- {
- getEnvironment().flushLog(true);
-
- for(int i = 0; i < size; i++)
- {
- BDBCommitFuture commit = _jobQueue.poll();
- commit.complete();
- }
-
- }
- catch (DatabaseException e)
- {
- for(int i = 0; i < size; i++)
- {
- BDBCommitFuture commit = _jobQueue.poll();
- commit.abort(e);
- }
- }
-
- }
-
- private boolean hasJobs()
- {
- return !_jobQueue.isEmpty();
- }
-
- public void addJob(BDBCommitFuture commit, final boolean sync)
- {
-
- _jobQueue.add(commit);
- if(sync)
- {
- synchronized (_lock)
- {
- _lock.notifyAll();
- }
- }
- }
-
- public void close()
- {
- synchronized (_lock)
- {
- _stopped.set(true);
- _lock.notifyAll();
- }
- }
+ return _commitThreadWrapper.commit(tx, syncCommit);
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
deleted file mode 100644
index 7e5ef3f94c..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
-
-public class BDBMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new BDBMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return BDBMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
new file mode 100644
index 0000000000..5b0abbec93
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -0,0 +1,227 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public class CommitThreadWrapper
+{
+ private final CommitThread _commitThread;
+
+ public CommitThreadWrapper(String name, Environment env)
+ {
+ _commitThread = new CommitThread(name, env);
+ }
+
+ public void startCommitThread()
+ {
+ _commitThread.start();
+ }
+
+ public void stopCommitThread() throws InterruptedException
+ {
+ _commitThread.close();
+ _commitThread.join();
+ }
+
+ public StoreFuture commit(Transaction tx, boolean syncCommit)
+ {
+ BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ commitFuture.commit();
+ return commitFuture;
+ }
+
+ private static final class BDBCommitFuture implements StoreFuture
+ {
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+
+ private final CommitThread _commitThread;
+ private final Transaction _tx;
+ private DatabaseException _databaseException;
+ private boolean _complete;
+ private boolean _syncCommit;
+
+ public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ {
+ _commitThread = commitThread;
+ _tx = tx;
+ _syncCommit = syncCommit;
+ }
+
+ public synchronized void complete()
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
+ }
+ _complete = true;
+
+ notifyAll();
+ }
+
+ public synchronized void abort(DatabaseException databaseException)
+ {
+ _complete = true;
+ _databaseException = databaseException;
+
+ notifyAll();
+ }
+
+ public void commit() throws DatabaseException
+ {
+ _commitThread.addJob(this, _syncCommit);
+
+ if(!_syncCommit)
+ {
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
+ return;
+ }
+
+ waitForCompletion();
+
+ if (_databaseException != null)
+ {
+ throw _databaseException;
+ }
+
+ }
+
+ public synchronized boolean isComplete()
+ {
+ return _complete;
+ }
+
+ public synchronized void waitForCompletion()
+ {
+ while (!isComplete())
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(250);
+ }
+ catch (InterruptedException e)
+ {
+ //TODO Should we ignore, or throw a 'StoreException'?
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
+ * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
+ * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
+ */
+ private static class CommitThread extends Thread
+ {
+ private final AtomicBoolean _stopped = new AtomicBoolean(false);
+ private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final CheckpointConfig _config = new CheckpointConfig();
+ private final Object _lock = new Object();
+ private Environment _environment;
+
+ public CommitThread(String name, Environment env)
+ {
+ super(name);
+ _config.setForce(true);
+ _environment = env;
+ }
+
+ public void explicitNotify()
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+
+ public void run()
+ {
+ while (!_stopped.get())
+ {
+ synchronized (_lock)
+ {
+ while (!_stopped.get() && !hasJobs())
+ {
+ try
+ {
+ // RHM-7 Periodically wake up and check, just in case we
+ // missed a notification. Don't want to lock the broker hard.
+ _lock.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ processJobs();
+ }
+ }
+
+ private void processJobs()
+ {
+ int size = _jobQueue.size();
+
+ try
+ {
+ _environment.flushLog(true);
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.complete();
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.abort(e);
+ }
+ }
+
+ }
+
+ private boolean hasJobs()
+ {
+ return !_jobQueue.isEmpty();
+ }
+
+ public void addJob(BDBCommitFuture commit, final boolean sync)
+ {
+
+ _jobQueue.add(commit);
+ if(sync)
+ {
+ synchronized (_lock)
+ {
+ _lock.notifyAll();
+ }
+ }
+ }
+
+ public void close()
+ {
+ synchronized (_lock)
+ {
+ _stopped.set(true);
+ _lock.notifyAll();
+ }
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
new file mode 100644
index 0000000000..bfc7bbf128
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.IOException;
+
+import javax.management.JMException;
+import javax.management.openmbean.TabularData;
+
+import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
+
+public interface ManagedBDBHAMessageStore
+{
+ public static final String TYPE = "BDBHAMessageStore";
+
+ public static final String ATTR_GROUP_NAME = "GroupName";
+ public static final String ATTR_NODE_NAME = "NodeName";
+ public static final String ATTR_NODE_HOST_PORT = "NodeHostPort";
+ public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort";
+ public static final String ATTR_REPLICATION_POLICY = "ReplicationPolicy";
+ public static final String ATTR_NODE_STATE = "NodeState";
+ public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
+
+ @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
+ String getGroupName() throws IOException;
+
+ @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group")
+ String getNodeName() throws IOException;
+
+ @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group")
+ String getNodeHostPort() throws IOException;
+
+ @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node")
+ String getNodeState() throws IOException;
+
+ @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members")
+ String getHelperHostPort() throws IOException;
+
+ @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy")
+ String getReplicationPolicy() throws IOException;
+
+ @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.")
+ boolean getDesignatedPrimary() throws IOException;
+
+ @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
+ TabularData getAllNodesInGroup() throws IOException, JMException;
+
+ @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group")
+ void removeNodeFromGroup(String nodeName) throws JMException;
+
+ @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.")
+ void setDesignatedPrimary(boolean primary) throws JMException;
+
+ @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.")
+ void updateAddress(String nodeName, String newHostName, int newPort) throws JMException;
+}
+
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index e71e39cbb8..f1ab012efc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -24,7 +24,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -63,7 +63,7 @@ public class Upgrader
if(versionDb.count() == 0L)
{
- int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
+ int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion();
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(sourceVersion, key);
DatabaseEntry value = new DatabaseEntry();
@@ -87,7 +87,7 @@ public class Upgrader
int getSourceVersion(Database versionDb)
{
- int version = BDBMessageStore.VERSION + 1;
+ int version = AbstractBDBMessageStore.VERSION + 1;
OperationStatus result;
do
@@ -106,7 +106,7 @@ public class Upgrader
void performUpgradeFromVersion(int sourceVersion, Database versionDb)
throws AMQStoreException
{
- while(sourceVersion != BDBMessageStore.VERSION)
+ while(sourceVersion != AbstractBDBMessageStore.VERSION)
{
upgrade(sourceVersion, ++sourceVersion);
DatabaseEntry key = new DatabaseEntry();