diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-05-17 17:26:04 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-05-17 17:26:04 +0000 |
| commit | f5d67044a9797c397764a7ac1aa1a1ed4aa893a3 (patch) | |
| tree | cf8a9cf6a5f741e31417ca4d32a6b708bb3b9fdd /qpid/java/bdbstore/src/main | |
| parent | f523b9e510fc90ce3f7f7d7c2960f3bfee3d42df (diff) | |
| download | qpid-python-f5d67044a9797c397764a7ac1aa1a1ed4aa893a3.tar.gz | |
QPID-4006: add support for using BDB HA to form an active-passive cluster for persistent messaging
- Includes support for setting BDB configuration parameters via the store configuration, both for the existing store and the new HA variant.
- Removes the MessageStoreFactory and reverts store configuration to historical values.
Applied patch from Keith Wall, Andrew MacBean <andymacbean@gmail.com>, Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>, and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1339728 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
8 files changed, 1043 insertions, 354 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index a9ae4eb16d..789d5714c8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -33,11 +33,12 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.TransactionConfig; import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; @@ -128,8 +130,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected final StateManager _stateManager; - protected TransactionConfig _transactionConfig = new TransactionConfig(); - private MessageStoreRecoveryHandler _messageRecoveryHandler; private TransactionLogRecoveryHandler _tlogRecoveryHandler; @@ -146,6 +146,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); + private Map<String, String> _envConfigMap; + public AbstractBDBMessageStore() { _stateManager = new StateManager(_eventManager); @@ -161,7 +163,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore ConfigurationRecoveryHandler recoveryHandler, Configuration storeConfiguration) throws Exception { - _stateManager.attainState(State.CONFIGURING); + _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = recoveryHandler; @@ -179,12 +181,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageRecoveryHandler = messageRecoveryHandler; _tlogRecoveryHandler = tlogRecoveryHandler; - _stateManager.attainState(State.CONFIGURED); + _stateManager.attainState(State.INITIALISED); } - public void activate() throws Exception + public synchronized void activate() throws Exception { - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.ACTIVATING); recoverConfig(_configRecoveryHandler); recoverMessages(_messageRecoveryHandler); @@ -231,11 +233,30 @@ public abstract class AbstractBDBMessageStore implements MessageStore _storeLocation = storeLocation; + _envConfigMap = getConfigMap(storeConfig, "envConfig"); + LOGGER.info("Configuring BDB message store"); setupStore(environmentPath, name); } + protected Map<String,String> getConfigMap(Configuration config, String prefix) throws ConfigurationException + { + final List<Object> argumentNames = config.getList(prefix + ".name"); + final List<Object> argumentValues = config.getList(prefix + ".value"); + final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size()); + + for (int i = 0; i < argumentNames.size(); i++) + { + final String argName = argumentNames.get(i).toString(); + final String argValue = argumentValues.get(i).toString(); + + attributes.put(argName, argValue); + } + + return Collections.unmodifiableMap(attributes); + } + @Override public String getStoreLocation() { @@ -251,9 +272,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ void startWithNoRecover() throws AMQStoreException { - _stateManager.attainState(State.CONFIGURING); - _stateManager.attainState(State.CONFIGURED); - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.INITIALISING); + _stateManager.attainState(State.INITIALISED); + _stateManager.attainState(State.ACTIVATING); _stateManager.attainState(State.ACTIVE); } @@ -268,51 +289,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _totalStoreSize = getSizeOnDisk(); } - protected Environment createEnvironment(File environmentPath) throws DatabaseException - { - LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = new EnvironmentConfig(); - // This is what allows the creation of the store if it does not already exist. - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setConfigParam("je.lock.nLockTables", "7"); - - // Added to help diagnosis of Deadlock issue - // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 - if (Boolean.getBoolean("qpid.bdb.lock.debug")) - { - envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); - envConfig.setConfigParam("je.txn.dumpLocks", "true"); - } - - // Set transaction mode - _transactionConfig.setReadCommitted(true); - - //This prevents background threads running which will potentially update the store. - envConfig.setReadOnly(false); - try - { - return new Environment(environmentPath, envConfig); - } - catch (DatabaseException de) - { - if (de.getMessage().contains("Environment.setAllowCreate is false")) - { - //Allow the creation this time - envConfig.setAllowCreate(true); - if (_environment != null ) - { - _environment.cleanLog(); - _environment.close(); - } - return new Environment(environmentPath, envConfig); - } - else - { - throw de; - } - } - } + protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException; public Environment getEnvironment() { @@ -352,14 +329,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void close() throws Exception { - if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.QUIESCED)) - { - _stateManager.stateTransition(State.ACTIVE, State.CLOSING); - - closeInternal(); - - _stateManager.stateTransition(State.CLOSING, State.CLOSED); - } + _stateManager.attainState(State.CLOSING); + closeInternal(); + _stateManager.attainState(State.CLOSED); } protected void closeInternal() throws Exception @@ -1504,6 +1476,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore return true; } + @SuppressWarnings("unchecked") public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) { if(metaData.isPersistent()) @@ -1914,4 +1887,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore { return _persistentSizeHighThreshold; } + + private void setEnvironmentConfigProperties(EnvironmentConfig envConfig) + { + for (Map.Entry<String, String> configItem : _envConfigMap.entrySet()) + { + LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + } + + protected EnvironmentConfig createEnvironmentConfig() + { + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setConfigParam(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); + + setEnvironmentConfigProperties(envConfig); + + return envConfig; + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java new file mode 100644 index 0000000000..f887c8ce36 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.AbstractActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.store.HAMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.State; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; + +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.ReplicaAckPolicy; +import com.sleepycat.je.Durability.SyncPolicy; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.OperationFailureException; +import com.sleepycat.je.Transaction; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationMutableConfig; +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; + +public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore +{ + private static final String MUTLI_SYNC = "MUTLI_SYNC"; + private static final String DEFAULT_REPLICATION_POLICY = + MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); + + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); + + private String _groupName; + private String _nodeName; + private String _nodeHostPort; + private String _helperHostPort; + private String _replicationPolicy; + private Durability _replicationDurability; + + private String _name; + + private BDBHAMessageStoreManagerMBean _managedObject; + + public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; + + public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + + private CommitThreadWrapper _commitThreadWrapper; + private boolean _localMultiSyncCommits; + private boolean _autoDesignatedPrimary; + private Map<String, String> _repConfigMap; + + @Override + public void configure(String name, Configuration storeConfig) throws Exception + { + //Mandatory configuration + _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig); + _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig); + _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig); + _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig); + _name = name; + + //Optional configuration + _replicationPolicy = storeConfig.getString("highAvailability.replicationPolicy", DEFAULT_REPLICATION_POLICY).trim(); + _autoDesignatedPrimary = storeConfig.getBoolean("highAvailability.autoDesignatedPrimary", Boolean.TRUE); + + if(_replicationPolicy.startsWith(MUTLI_SYNC)) + { + _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name())); + _localMultiSyncCommits = true; + } + else + { + _replicationDurability = Durability.parse(_replicationPolicy); + _localMultiSyncCommits = false; + } + + _repConfigMap = getConfigMap(storeConfig, "repConfig"); + + _managedObject = new BDBHAMessageStoreManagerMBean(this); + _managedObject.register(); + + super.configure(name, storeConfig); + } + + @Override + protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException + { + super.setupStore(storePath, name); + + if(_localMultiSyncCommits) + { + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper.startCommitThread(); + } + } + + private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException + { + if (!config.containsKey(key)) + { + throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " + + key.replace('.', '/')); + } + return config.getString(key); + } + + @Override + protected Environment createEnvironment(File environmentPath) throws DatabaseException + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Environment path " + environmentPath.getAbsolutePath()); + LOGGER.info("Group name " + _groupName); + LOGGER.info("Node name " + _nodeName); + LOGGER.info("Node host port " + _nodeHostPort); + LOGGER.info("Helper host port " + _helperHostPort); + LOGGER.info("Replication policy " + _replicationPolicy); + } + + final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); + + replicationConfig.setHelperHosts(_helperHostPort); + setReplicationConfigProperties(replicationConfig); + + final EnvironmentConfig envConfig = createEnvironmentConfig(); + envConfig.setDurability(_replicationDurability); + + ReplicatedEnvironment replicatedEnvironment = null; + try + { + replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); + } + catch (final InsufficientLogException ile) + { + LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); + NetworkRestore restore = new NetworkRestore(); + NetworkRestoreConfig config = new NetworkRestoreConfig(); + config.setRetainLogFiles(false); + restore.execute(ile, config); + replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); + } + + return replicatedEnvironment; + } + + @Override + public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config) throws Exception + { + super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config); + + final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); + + replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); + } + + @Override + public synchronized void passivate() + { + if (_stateManager.isNotInState(State.INITIALISED)) + { + LOGGER.debug("Store becoming passive"); + _stateManager.attainState(State.INITIALISED); + } + } + + @Override + protected void closeInternal() throws Exception + { + try + { + if(_localMultiSyncCommits) + { + _commitThreadWrapper.stopCommitThread(); + } + super.closeInternal(); + } + finally + { + if (_managedObject != null) + { + _managedObject.unregister(); + } + } + } + + @Override + protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException + { + // Using commit() instead of commitNoSync() for the HA store to allow + // the HA durability configuration to influence resulting behaviour. + tx.commit(); + + + if(_localMultiSyncCommits) + { + return _commitThreadWrapper.commit(tx, syncCommit); + } + else + { + return StoreFuture.IMMEDIATE_FUTURE; + } + } + + public String getName() + { + return _name; + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeName() + { + return _nodeName; + } + + public String getNodeHostPort() + { + return _nodeHostPort; + } + + public String getHelperHostPort() + { + return _helperHostPort; + } + + public String getReplicationPolicy() + { + return _replicationPolicy; + } + + public String getNodeState() + { + ReplicatedEnvironment.State state = getReplicatedEnvironment().getState(); + return state.toString(); + } + + public Boolean isDesignatedPrimary() + { + return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary(); + } + + public List<Map<String, String>> getGroupMembers() + { + List<Map<String, String>> members = new ArrayList<Map<String,String>>(); + + for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes()) + { + Map<String, String> nodeMap = new HashMap<String, String>(); + nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName()); + nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); + members.add(nodeMap); + } + + return members; + } + + public void removeNodeFromGroup(String nodeName) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().removeMember(nodeName); + } + catch (OperationFailureException ofe) + { + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e); + } + } + + public void setDesignatedPrimary(boolean isPrimary) throws AMQStoreException + { + try + { + final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); + synchronized(replicatedEnvironment) + { + final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); + replicatedEnvironment.setRepMutableConfig(newConfig); + } + + LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group"); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e); + } + } + + ReplicatedEnvironment getReplicatedEnvironment() + { + return (ReplicatedEnvironment)getEnvironment(); + } + + public void updateAddress(String nodeName, String newHostName, int newPort) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); + } + catch (OperationFailureException ofe) + { + throw new AMQStoreException("Failed to update address for '" + nodeName + + "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to update address for '" + nodeName + + "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e); + } + } + + private ReplicationGroupAdmin createReplicationGroupAdmin() + { + final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); + helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets()); + + final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig(); + helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); + + return new ReplicationGroupAdmin(_groupName, helpers); + } + + private class BDBHAMessageStoreStateChangeListener implements StateChangeListener + { + private final Executor _executor = Executors.newSingleThreadExecutor(); + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState(); + + LOGGER.info("Received BDB event indicating transition to state " + state); + + switch (state) + { + case MASTER: + activateStoreAsync(); + break; + case REPLICA: + passivateStore(); + break; + case DETACHED: + LOGGER.error("BDB replicated node in detached state, therefore passivating."); + passivateStore(); + break; + case UNKNOWN: + LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); + break; + default: + LOGGER.error("Unexpected state change: " + state); + throw new IllegalStateException("Unexpected state change: " + state); + } + } + + /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */ + private void passivateStore() + { + try + { + passivate(); + } + catch(Exception e) + { + LOGGER.error("Unable to passivate", e); + throw new RuntimeException("Unable to passivate", e); + } + } + + /** + * Calls {@link MessageStore#activate()}. + * + * <p/> + * + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * activate may execute transactions, which can't complete until + * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned. + */ + private void activateStoreAsync() + { + final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger(); + + _executor.execute(new Runnable() + { + private static final String _THREAD_NAME = "BDBHANodeActivationThread"; + + @Override + public void run() + { + Thread.currentThread().setName(_THREAD_NAME); + CurrentActor.set(new AbstractActor(_rootLogger) + { + @Override + public String getLogMessage() + { + return _THREAD_NAME; + } + }); + + try + { + activate(); + } + catch (Exception e) + { + LOGGER.error("Failed to activate on hearing MASTER change event",e); + } + + } + }); + } + } + + @Override + public synchronized void activate() throws Exception + { + // Before proceeding, perform a log flush with an fsync + getEnvironment().flushLog(true); + + super.activate(); + + //For replica groups with 2 electable nodes, set the new master to be the + //designated primary, such that it can continue working if the replica goes + //down and leaves it without a 'majority of 2'. + if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary) + { + setDesignatedPrimary(true); + } + } + + private void setReplicationConfigProperties(ReplicationConfig replicationConfig) + { + for (Map.Entry<String, String> configItem : _repConfigMap.entrySet()) + { + LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java new file mode 100644 index 0000000000..cd4e990607 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.NotCompliantMBeanException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.management.AMQManagedObject; + +public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore +{ + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class); + + private static final TabularType GROUP_MEMBERS_TABLE; + private static final CompositeType GROUP_MEMBER_ROW; + private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES; + + static + { + try + { + GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING}; + final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT}; + final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "}; + GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member", + itemNames, + itemDescriptions, + GROUP_MEMBER_ATTRIBUTE_TYPES ); + GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers", + GROUP_MEMBER_ROW, + new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME}); + } + catch (final OpenDataException ode) + { + throw new ExceptionInInitializerError(ode); + } + } + + private final BDBHAMessageStore _store; + + protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store) throws NotCompliantMBeanException + { + super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE); + _store = store; + } + + @Override + public String getObjectInstanceName() + { + return _store.getName(); + } + + @Override + public String getGroupName() throws IOException + { + return _store.getGroupName(); + } + + @Override + public String getNodeName() throws IOException + { + return _store.getNodeName(); + } + + @Override + public String getNodeHostPort() throws IOException + { + return _store.getNodeHostPort(); + } + + @Override + public String getHelperHostPort() throws IOException + { + return _store.getHelperHostPort(); + } + + @Override + public String getReplicationPolicy() throws IOException + { + return _store.getReplicationPolicy(); + } + + @Override + public String getNodeState() throws IOException + { + return _store.getNodeState(); + } + + @Override + public boolean getDesignatedPrimary() throws IOException + { + return _store.isDesignatedPrimary(); + } + + @Override + public TabularData getAllNodesInGroup() throws IOException, JMException + { + final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE); + final List<Map<String, String>> members = _store.getGroupMembers(); + + for (Map<String, String> map : members) + { + CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map); + data.put(memberData); + } + return data; + } + + @Override + public void removeNodeFromGroup(String nodeName) throws JMException + { + try + { + _store.removeNodeFromGroup(nodeName); + } + catch (AMQStoreException e) + { + LOGGER.error("Failed to remove node " + nodeName + " from group", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public void setDesignatedPrimary(boolean primary) throws JMException + { + try + { + _store.setDesignatedPrimary(primary); + } + catch (AMQStoreException e) + { + LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e); + throw new JMException(e.getMessage()); + } + } + + @Override + public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException + { + try + { + _store.updateAddress(nodeName, newHostName, newPort); + } + catch(AMQStoreException e) + { + LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e); + throw new JMException(e.getMessage()); + } + } + + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index b414441b92..7c29e281d9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -21,23 +21,12 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; -import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; @@ -53,52 +42,22 @@ import com.sleepycat.je.EnvironmentConfig; public class BDBMessageStore extends AbstractBDBMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - - private final CommitThread _commitThread = new CommitThread("Commit-Thread"); - - private final Map<Event, List<EventListener>> _eventListeners = new HashMap<Event, List<EventListener>>(); + private CommitThreadWrapper _commitThreadWrapper; @Override protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { super.setupStore(storePath, name); - startCommitThread(); + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper.startCommitThread(); } protected Environment createEnvironment(File environmentPath) throws DatabaseException { LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = new EnvironmentConfig(); - // This is what allows the creation of the store if it does not already exist. - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - - Properties props = System.getProperties(); - - for(String propName : props.stringPropertyNames()) - { - if(propName.startsWith("qpid.bdb.envconfig.je.")) - { - envConfig.setConfigParam(propName.substring(19), props.getProperty(propName)); - } - } - - envConfig.setConfigParam("je.lock.nLockTables", "7"); - - // Added to help diagnosis of Deadlock issue - // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 - if (Boolean.getBoolean("qpid.bdb.lock.debug")) - { - envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); - envConfig.setConfigParam("je.txn.dumpLocks", "true"); - } - - // Set transaction mode - _transactionConfig.setReadCommitted(true); + EnvironmentConfig envConfig = createEnvironmentConfig(); - //This prevents background threads running which will potentially update the store. - envConfig.setReadOnly(false); try { return new Environment(environmentPath, envConfig); @@ -118,12 +77,10 @@ public class BDBMessageStore extends AbstractBDBMessageStore } } - - @Override protected void closeInternal() throws Exception { - stopCommitThread(); + _commitThreadWrapper.stopCommitThread(); super.closeInternal(); } @@ -133,204 +90,6 @@ public class BDBMessageStore extends AbstractBDBMessageStore { tx.commitNoSync(); - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); - commitFuture.commit(); - - return commitFuture; - } - - private void startCommitThread() - { - _commitThread.start(); - } - - private void stopCommitThread() throws InterruptedException - { - _commitThread.close(); - _commitThread.join(); - } - - private static final class BDBCommitFuture implements StoreFuture - { - private final CommitThread _commitThread; - private final com.sleepycat.je.Transaction _tx; - private DatabaseException _databaseException; - private boolean _complete; - private boolean _syncCommit; - - public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit) - { - _commitThread = commitThread; - _tx = tx; - _syncCommit = syncCommit; - } - - public synchronized void complete() - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); - } - _complete = true; - - notifyAll(); - } - - public synchronized void abort(DatabaseException databaseException) - { - _complete = true; - _databaseException = databaseException; - - notifyAll(); - } - - public void commit() throws DatabaseException - { - _commitThread.addJob(this, _syncCommit); - - if(!_syncCommit) - { - LOGGER.debug("CommitAsync was requested, returning immediately."); - return; - } - - waitForCompletion(); - - if (_databaseException != null) - { - throw _databaseException; - } - - } - - public synchronized boolean isComplete() - { - return _complete; - } - - public synchronized void waitForCompletion() - { - while (!isComplete()) - { - _commitThread.explicitNotify(); - try - { - wait(250); - } - catch (InterruptedException e) - { - //TODO Should we ignore, or throw a 'StoreException'? - throw new RuntimeException(e); - } - } - } - } - - /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations - * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before - * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. - * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> - */ - private class CommitThread extends Thread - { - private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); - private final CheckpointConfig _config = new CheckpointConfig(); - private final Object _lock = new Object(); - - public CommitThread(String name) - { - super(name); - _config.setForce(true); - - } - - public void explicitNotify() - { - synchronized (_lock) - { - _lock.notify(); - } - } - - public void run() - { - while (!_stopped.get()) - { - synchronized (_lock) - { - while (!_stopped.get() && !hasJobs()) - { - try - { - // RHM-7 Periodically wake up and check, just in case we - // missed a notification. Don't want to lock the broker hard. - _lock.wait(1000); - } - catch (InterruptedException e) - { - } - } - } - processJobs(); - } - } - - private void processJobs() - { - int size = _jobQueue.size(); - - try - { - getEnvironment().flushLog(true); - - for(int i = 0; i < size; i++) - { - BDBCommitFuture commit = _jobQueue.poll(); - commit.complete(); - } - - } - catch (DatabaseException e) - { - for(int i = 0; i < size; i++) - { - BDBCommitFuture commit = _jobQueue.poll(); - commit.abort(e); - } - } - - } - - private boolean hasJobs() - { - return !_jobQueue.isEmpty(); - } - - public void addJob(BDBCommitFuture commit, final boolean sync) - { - - _jobQueue.add(commit); - if(sync) - { - synchronized (_lock) - { - _lock.notifyAll(); - } - } - } - - public void close() - { - synchronized (_lock) - { - _stopped.set(true); - _lock.notifyAll(); - } - } + return _commitThreadWrapper.commit(tx, syncCommit); } - } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java deleted file mode 100644 index 7e5ef3f94c..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreFactory; - -public class BDBMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public MessageStore createMessageStore() - { - return new BDBMessageStore(); - } - - @Override - public String getStoreClassName() - { - return BDBMessageStore.class.getSimpleName(); - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java new file mode 100644 index 0000000000..5b0abbec93 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -0,0 +1,227 @@ +package org.apache.qpid.server.store.berkeleydb; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.StoreFuture; + +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.Transaction; + +public class CommitThreadWrapper +{ + private final CommitThread _commitThread; + + public CommitThreadWrapper(String name, Environment env) + { + _commitThread = new CommitThread(name, env); + } + + public void startCommitThread() + { + _commitThread.start(); + } + + public void stopCommitThread() throws InterruptedException + { + _commitThread.close(); + _commitThread.join(); + } + + public StoreFuture commit(Transaction tx, boolean syncCommit) + { + BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + commitFuture.commit(); + return commitFuture; + } + + private static final class BDBCommitFuture implements StoreFuture + { + private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + + private final CommitThread _commitThread; + private final Transaction _tx; + private DatabaseException _databaseException; + private boolean _complete; + private boolean _syncCommit; + + public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + { + _commitThread = commitThread; + _tx = tx; + _syncCommit = syncCommit; + } + + public synchronized void complete() + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); + } + _complete = true; + + notifyAll(); + } + + public synchronized void abort(DatabaseException databaseException) + { + _complete = true; + _databaseException = databaseException; + + notifyAll(); + } + + public void commit() throws DatabaseException + { + _commitThread.addJob(this, _syncCommit); + + if(!_syncCommit) + { + LOGGER.debug("CommitAsync was requested, returning immediately."); + return; + } + + waitForCompletion(); + + if (_databaseException != null) + { + throw _databaseException; + } + + } + + public synchronized boolean isComplete() + { + return _complete; + } + + public synchronized void waitForCompletion() + { + while (!isComplete()) + { + _commitThread.explicitNotify(); + try + { + wait(250); + } + catch (InterruptedException e) + { + //TODO Should we ignore, or throw a 'StoreException'? + throw new RuntimeException(e); + } + } + } + } + + /** + * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before + * continuing, but it is the responsibility of this thread to tell the commit operations when they have been + * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> + */ + private static class CommitThread extends Thread + { + private final AtomicBoolean _stopped = new AtomicBoolean(false); + private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final CheckpointConfig _config = new CheckpointConfig(); + private final Object _lock = new Object(); + private Environment _environment; + + public CommitThread(String name, Environment env) + { + super(name); + _config.setForce(true); + _environment = env; + } + + public void explicitNotify() + { + synchronized (_lock) + { + _lock.notify(); + } + } + + public void run() + { + while (!_stopped.get()) + { + synchronized (_lock) + { + while (!_stopped.get() && !hasJobs()) + { + try + { + // RHM-7 Periodically wake up and check, just in case we + // missed a notification. Don't want to lock the broker hard. + _lock.wait(1000); + } + catch (InterruptedException e) + { + } + } + } + processJobs(); + } + } + + private void processJobs() + { + int size = _jobQueue.size(); + + try + { + _environment.flushLog(true); + + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.complete(); + } + + } + catch (DatabaseException e) + { + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.abort(e); + } + } + + } + + private boolean hasJobs() + { + return !_jobQueue.isEmpty(); + } + + public void addJob(BDBCommitFuture commit, final boolean sync) + { + + _jobQueue.add(commit); + if(sync) + { + synchronized (_lock) + { + _lock.notifyAll(); + } + } + } + + public void close() + { + synchronized (_lock) + { + _stopped.set(true); + _lock.notifyAll(); + } + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java new file mode 100644 index 0000000000..bfc7bbf128 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.IOException; + +import javax.management.JMException; +import javax.management.openmbean.TabularData; + +import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; + +public interface ManagedBDBHAMessageStore +{ + public static final String TYPE = "BDBHAMessageStore"; + + public static final String ATTR_GROUP_NAME = "GroupName"; + public static final String ATTR_NODE_NAME = "NodeName"; + public static final String ATTR_NODE_HOST_PORT = "NodeHostPort"; + public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort"; + public static final String ATTR_REPLICATION_POLICY = "ReplicationPolicy"; + public static final String ATTR_NODE_STATE = "NodeState"; + public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; + + @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") + String getGroupName() throws IOException; + + @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group") + String getNodeName() throws IOException; + + @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group") + String getNodeHostPort() throws IOException; + + @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node") + String getNodeState() throws IOException; + + @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") + String getHelperHostPort() throws IOException; + + @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy") + String getReplicationPolicy() throws IOException; + + @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") + boolean getDesignatedPrimary() throws IOException; + + @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") + TabularData getAllNodesInGroup() throws IOException, JMException; + + @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") + void removeNodeFromGroup(String nodeName) throws JMException; + + @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.") + void setDesignatedPrimary(boolean primary) throws JMException; + + @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.") + void updateAddress(String nodeName, String newHostName, int newPort) throws JMException; +} + diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index e71e39cbb8..f1ab012efc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -24,7 +24,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; @@ -63,7 +63,7 @@ public class Upgrader if(versionDb.count() == 0L) { - int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion(); + int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion(); DatabaseEntry key = new DatabaseEntry(); IntegerBinding.intToEntry(sourceVersion, key); DatabaseEntry value = new DatabaseEntry(); @@ -87,7 +87,7 @@ public class Upgrader int getSourceVersion(Database versionDb) { - int version = BDBMessageStore.VERSION + 1; + int version = AbstractBDBMessageStore.VERSION + 1; OperationStatus result; do @@ -106,7 +106,7 @@ public class Upgrader void performUpgradeFromVersion(int sourceVersion, Database versionDb) throws AMQStoreException { - while(sourceVersion != BDBMessageStore.VERSION) + while(sourceVersion != AbstractBDBMessageStore.VERSION) { upgrade(sourceVersion, ++sourceVersion); DatabaseEntry key = new DatabaseEntry(); |
