summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-04-01 10:35:33 +0000
committerKeith Wall <kwall@apache.org>2014-04-01 10:35:33 +0000
commitcf057f863f902d73477411b8d1f5b0a7541748b0 (patch)
tree9ce0d2953a6d358f6f91a0de25e3dc8f719e8ee4 /qpid/java/bdbstore/src
parent99231e1918fb20c750e7261f531d31ecec12b8f1 (diff)
parentc24467b81ca19de444d8a4bd4d5bb01cd6738df2 (diff)
downloadqpid-python-cf057f863f902d73477411b8d1f5b0a7541748b0.tar.gz
QPID-5653, QPID-5624: Remove support for virtual host xml and make message and configuration stores stateless.
This commit will temporarily break the UI git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1583597 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java7
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java200
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java107
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java418
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java50
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java9
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java10
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java33
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java33
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java43
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java7
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java9
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java42
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java22
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java35
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java130
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java9
22 files changed, 500 insertions, 686 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
index f13e4dd08b..a5eac25968 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
@@ -104,4 +104,11 @@ public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord
result = 31 * result + (_type != null ? _type.hashCode() : 0);
return result;
}
+
+ @Override
+ public String toString()
+ {
+ return "BDBConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", name=" + (_attributes == null ? null : _attributes.get("name")) + ", parents=" + _parents + "]";
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 3fdc12ba31..aae0a56a40 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.store.berkeleydb;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,19 +18,20 @@ package org.apache.qpid.server.store.berkeleydb;
* under the License.
*
*/
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.HashMap;
+import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@@ -48,48 +48,31 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class);
private BDBMessageStore _messageStore;
-
- private boolean _inVhostInitiatedClose;
+ private MessageStoreLogSubject _messageStoreLogSubject;
BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
org.apache.qpid.server.security.SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig,
VirtualHost virtualHost)
{
- super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
+ super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, virtualHost);
}
- protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost)
+ protected void initialiseStorage(VirtualHost virtualHost)
{
- _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
-
- final MessageStoreLogSubject storeLogSubject =
- new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
- OperationalLoggingListener.listen(_messageStore, storeLogSubject, getEventLogger());
-
- _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
- _messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE);
- _messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE);
-
-
+ setState(State.PASSIVE);
- _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT);
- _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
+ _messageStoreLogSubject = new MessageStoreLogSubject(getName(), BDBMessageStore.class.getSimpleName());
+ _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
+ getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED());
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
- DurableConfigurationRecoverer configRecoverer =
- new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger());
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>(virtualHost.getMessageStoreSettings());
+ messageStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true);
- _messageStore.configureConfigStore(
- virtualHost, configRecoverer
- );
+ _messageStore.openConfigurationStore(virtualHost, messageStoreSettings);
+ _messageStore.openMessageStore(virtualHost, messageStoreSettings);
- _messageStore.configureMessageStore(
- virtualHost, recoveryHandler,
- recoveryHandler
- );
+ getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
// Make the virtualhost model object a replication group listener
ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade();
@@ -97,29 +80,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
}
-
- protected void closeStorage()
- {
- //Close MessageStore
- if (_messageStore != null)
- {
- //Remove MessageStore Interface should not throw Exception
- try
- {
- _inVhostInitiatedClose = true;
- getMessageStore().close();
- }
- catch (Exception e)
- {
- getLogger().error("Failed to close message store", e);
- }
- finally
- {
- _inVhostInitiatedClose = false;
- }
- }
- }
-
@Override
public DurableConfigurationStore getDurableConfigurationStore()
{
@@ -132,77 +92,64 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
return _messageStore;
}
- private final class AfterInitialisationListener implements EventListener
+ private void activate()
{
- public void event(Event event)
+ try
{
- setState(State.PASSIVE);
- }
+ _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- }
+ DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this);
- private final class BeforePassivationListener implements EventListener
- {
- public void event(Event event)
- {
- State finalState = State.ERRORED;
+ DurableConfigurationRecoverer configRecoverer =
+ new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+ upgraderProvider, getEventLogger());
+ _messageStore.recoverConfigurationStore(configRecoverer);
- try
- {
- /* the approach here is not ideal as there is a race condition where a
- * queue etc could be created while the virtual host is on the way to
- * the passivated state. However the store state change from MASTER to UNKNOWN
- * is documented as exceptionally rare..
- */
-
- getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
- removeHouseKeepingTasks();
+ initialiseModel();
- getQueueRegistry().stopAllAndUnregisterMBeans();
- getExchangeRegistry().clearAndUnregisterMbeans();
- getDtxRegistry().close();
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
+ _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
- finalState = State.PASSIVE;
- }
- finally
- {
- setState(finalState);
- reportIfError(getState());
- }
+ attainActivation();
}
-
- }
-
-
- private final class BeforeActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
+ catch (Exception e)
{
- initialiseModel(getConfiguration());
+ LOGGER.error("Failed to activate on hearing MASTER change event", e);
}
}
- private final class AfterActivationListener implements EventListener
+ private void passivate()
{
- @Override
- public void event(Event event)
+ State finalState = State.ERRORED;
+
+ try
{
- attainActivation();
+ /* the approach here is not ideal as there is a race condition where a
+ * queue etc could be created while the virtual host is on the way to
+ * the passivated state. However the store state change from MASTER to UNKNOWN
+ * is documented as exceptionally rare.
+ */
+
+ getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+ removeHouseKeepingTasks();
+
+ getQueueRegistry().stopAllAndUnregisterMBeans();
+ getExchangeRegistry().clearAndUnregisterMbeans();
+ getDtxRegistry().close();
+
+ finalState = State.PASSIVE;
+ }
+ finally
+ {
+ setState(finalState);
+ reportIfError(getState());
}
}
- private final class BeforeCloseListener implements EventListener
+ @Override
+ protected MessageStoreLogSubject getMessageStoreLogSubject()
{
- @Override
- public void event(Event event)
- {
- if(!_inVhostInitiatedClose)
- {
- shutdownHouseKeeping();
- }
-
- }
+ return _messageStoreLogSubject;
}
private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
@@ -215,8 +162,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
if (LOGGER.isInfoEnabled())
{
- LOGGER.info("Received BDB event indicating transition to state " + state
- + " when current message store state is " + _messageStore._stateManager.getState());
+ LOGGER.info("Received BDB event indicating transition to state " + state);
}
switch (state)
@@ -239,36 +185,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
throw new IllegalStateException("Unexpected state change: " + state);
}
}
-
- private void activate()
- {
- try
- {
- _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- _messageStore.activate();
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to activate on hearing MASTER change event", e);
- }
- }
-
- private void passivate()
- {
- try
- {
- //TODO: move this this into the store method passivate()
- if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
- {
- _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
- }
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
- }
- }
-
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
index 7a308920b3..6fb84b8a4d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
@@ -19,16 +19,12 @@ package org.apache.qpid.server.store.berkeleydb;/*
*
*/
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
+
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -47,24 +43,29 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
org.apache.qpid.server.security.SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig,
org.apache.qpid.server.model.VirtualHost virtualHost)
{
return new BDBHAVirtualHost(virtualHostRegistry,
brokerStatisticsGatherer,
parentSecurityManager,
- hostConfig,
virtualHost);
}
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes);
- validateAttribute("haGroupName", String.class, attributes);
- validateAttribute("haNodeName", String.class, attributes);
- validateAttribute("haNodeAddress", String.class, attributes);
- validateAttribute("haHelperAddress", String.class, attributes);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS);
+ if (messageStoreSettings == null)
+ {
+ throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required.");
+ }
+
+ validateAttribute(MessageStore.STORE_PATH, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_NAME, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, String.class, messageStoreSettings);
}
private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes)
@@ -77,82 +78,4 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
}
}
- @Override
- public Map<String, Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter)
- {
- LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>();
- convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH));
-
- return convertedMap;
- }
-
- public Map<String, Object> convertVirtualHostConfiguration(Configuration configuration)
- {
-
- LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>();
-
- Configuration storeConfiguration = configuration.subset("store");
-
- convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
- convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY));
- convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY));
- convertedMap.put("haGroupName", configuration.getString("store.highAvailability.groupName"));
- convertedMap.put("haNodeName", configuration.getString("store.highAvailability.nodeName"));
- convertedMap.put("haNodeAddress", configuration.getString("store.highAvailability.nodeHostPort"));
- convertedMap.put("haHelperAddress", configuration.getString("store.highAvailability.helperHostPort"));
-
- final Object haDurability = configuration.getString("store.highAvailability.durability");
- if(haDurability !=null)
- {
- convertedMap.put("haDurability", haDurability);
- }
-
- final Object designatedPrimary = configuration.getString("store.highAvailability.designatedPrimary");
- if(designatedPrimary!=null)
- {
- convertedMap.put("haDesignatedPrimary", designatedPrimary);
- }
-
- final Object coalescingSync = configuration.getString("store.highAvailability.coalescingSync");
- if(coalescingSync!=null)
- {
- convertedMap.put("haCoalescingSync", coalescingSync);
- }
-
-
- Map<String, String> attributes = getEnvironmentMap(storeConfiguration, "envConfig");
-
- if(!attributes.isEmpty())
- {
- convertedMap.put("bdbEnvironmentConfig",attributes);
- }
-
- attributes = getEnvironmentMap(storeConfiguration, "repConfig");
-
- if(!attributes.isEmpty())
- {
- convertedMap.put("haReplicationConfig",attributes);
- }
-
- return convertedMap;
-
- }
-
- private Map<String, String> getEnvironmentMap(Configuration storeConfiguration, String configName)
- {
- final List<Object> argumentNames = storeConfiguration.getList(configName +".name");
- final List<Object> argumentValues = storeConfiguration.getList(configName +".value");
- final int initialSize = argumentNames.size();
-
- final Map<String,String> attributes = new HashMap<String,String>(initialSize);
-
- for (int i = 0; i < argumentNames.size(); i++)
- {
- final String argName = argumentNames.get(i).toString();
- final String argValue = argumentValues.get(i).toString();
-
- attributes.put(argName, argValue);
- }
- return attributes;
- }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 16255eb5ed..8aac9a6247 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -46,11 +46,8 @@ import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.State;
-import org.apache.qpid.server.store.StateManager;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
@@ -59,6 +56,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
@@ -72,6 +70,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.util.FileUtils;
import com.sleepycat.bind.tuple.ByteBinding;
@@ -102,7 +101,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
public static final int VERSION = 8;
- public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
private static final int LOCK_RETRY_ATTEMPTS = 5;
private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY";
@@ -110,25 +108,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
+
+ //TODO: Add upgrader to remove BRIDGES and LINKS
private static String BRIDGEDB_NAME = "BRIDGES";
private static String LINKDB_NAME = "LINKS";
private static String XID_DB_NAME = "XIDS";
private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION";
- private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME, MESSAGE_META_DATA_DB_NAME,
- MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME };
-
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIG_VERSION_DB_NAME , CONFIGURED_OBJECT_HIERARCHY_DB_NAME};
+ private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME };
private EnvironmentFacade _environmentFacade;
private final AtomicLong _messageId = new AtomicLong(0);
- protected final StateManager _stateManager;
-
- private MessageStoreRecoveryHandler _messageRecoveryHandler;
-
- private TransactionLogRecoveryHandler _tlogRecoveryHandler;
-
- private ConfigurationRecoveryHandler _configRecoveryHandler;
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
private long _totalStoreSize;
private boolean _limitBusted;
@@ -137,12 +130,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private final EventManager _eventManager = new EventManager();
private final String _type;
- private VirtualHost _virtualHost;
private final EnvironmentFacadeFactory _environmentFacadeFactory;
private volatile Committer _committer;
+
public BDBMessageStore()
{
this(new StandardEnvironmentFacadeFactory());
@@ -152,7 +145,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
_type = environmentFacadeFactory.getType();
_environmentFacadeFactory = environmentFacadeFactory;
- _stateManager = new StateManager(_eventManager);
}
@Override
@@ -162,110 +154,91 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
- _stateManager.attainState(State.INITIALISING);
-
- _configRecoveryHandler = recoveryHandler;
- _virtualHost = virtualHost;
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ if (_environmentFacade == null)
+ {
+ String[] databaseNames = null;
+ if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
+ {
+ databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+ System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
+ System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+ }
+ else
+ {
+ databaseNames = CONFIGURATION_STORE_DATABASE_NAMES;
+ }
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames));
+ }
+ else
+ {
+ throw new IllegalStateException("The database have been already opened as message store");
+ }
+ }
}
@Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler) throws StoreException
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
- if(_stateManager.isInState(State.INITIAL))
- {
- // Is acting as a message store, but not a durable config store
- _stateManager.attainState(State.INITIALISING);
- }
+ checkConfigurationStoreOpen();
- _messageRecoveryHandler = messageRecoveryHandler;
- _tlogRecoveryHandler = tlogRecoveryHandler;
- _virtualHost = virtualHost;
- completeInitialisation();
+ recoverConfig(recoveryHandler);
}
- private void completeInitialisation() throws StoreException
+ @Override
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) throws StoreException
{
- configure(_virtualHost, _messageRecoveryHandler != null);
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
- _stateManager.attainState(State.INITIALISED);
- }
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
- private void startActivation() throws StoreException
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- try
- {
- new Upgrader(_environmentFacade.getEnvironment(), _virtualHost).upgradeIfNecessary();
- _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES);
- _totalStoreSize = getSizeOnDisk();
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot configure store", e);
- }
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
+ if (_environmentFacade == null)
+ {
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask());
+ }
+ _committer = _environmentFacade.createCommitter(parent.getName());
+ _committer.start();
+ }
}
@Override
- public synchronized void activate() throws StoreException
+ public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
{
- // check if acting as a durable config store, but not a message store
- if(_stateManager.isInState(State.INITIALISING))
- {
- completeInitialisation();
- }
-
- _stateManager.attainState(State.ACTIVATING);
- startActivation();
+ checkMessageStoreOpen();
- if(_configRecoveryHandler != null)
+ if(messageRecoveryHandler != null)
{
- recoverConfig(_configRecoveryHandler);
+ recoverMessages(messageRecoveryHandler);
}
- if(_messageRecoveryHandler != null)
+ if(transactionLogRecoveryHandler != null)
{
- recoverMessages(_messageRecoveryHandler);
+ recoverQueueEntries(transactionLogRecoveryHandler);
}
- if(_tlogRecoveryHandler != null)
- {
- recoverQueueEntries(_tlogRecoveryHandler);
- }
-
- _stateManager.attainState(State.ACTIVE);
}
@Override
public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
{
- return new BDBTransaction();
- }
-
- private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException
- {
- Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
- Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
-
- _persistentSizeHighThreshold = overfullAttr == null ? -1l :
- overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
- _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
- underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
-
+ checkMessageStoreOpen();
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
-
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore);
-
- _committer = _environmentFacade.createCommitter(virtualHost.getName());
- _committer.start();
+ return new BDBTransaction();
}
@Override
@@ -283,33 +256,47 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return _environmentFacade;
}
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
- */
@Override
- public void close() throws StoreException
+ public void closeMessageStore() throws StoreException
{
- if (_closed.compareAndSet(false, true))
+ if (_messageStoreOpen.compareAndSet(true, false))
{
- _stateManager.attainState(State.CLOSING);
try
{
- try
+ if (_committer != null)
{
_committer.stop();
}
- finally
+ }
+ finally
+ {
+ if (!_configurationStoreOpen.get())
{
closeEnvironment();
}
}
- catch(DatabaseException e)
+ }
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(true, false))
+ {
+ try
{
- throw new StoreException("Exception occured on message store close", e);
+ if (_committer != null)
+ {
+ _committer.stop();
+ }
+ }
+ finally
+ {
+ if (!_messageStoreOpen.get())
+ {
+ closeEnvironment();
+ }
}
- _stateManager.attainState(State.CLOSED);
}
}
@@ -751,27 +738,25 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void create(ConfiguredObjectRecord configuredObject) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ checkConfigurationStoreOpen();
+ com.sleepycat.je.Transaction txn = null;
+ try
{
- com.sleepycat.je.Transaction txn = null;
- try
- {
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
- storeConfiguredObjectEntry(txn, configuredObject);
- txn.commit();
- txn = null;
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject
- + " in database: " + e.getMessage(), e);
- }
- finally
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ storeConfiguredObjectEntry(txn, configuredObject);
+ txn.commit();
+ txn = null;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject
+ + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if (txn != null)
{
- if (txn != null)
- {
- abortTransactionIgnoringException("Error creating configured object", txn);
- }
+ abortTransactionIgnoringException("Error creating configured object", txn);
}
}
}
@@ -779,11 +764,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
{
+ checkConfigurationStoreOpen();
+
com.sleepycat.je.Transaction txn = null;
try
{
txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
-
+
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
for(ConfiguredObjectRecord record : objects)
{
@@ -792,7 +779,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
removed.add(record.getId());
}
}
-
+
txn.commit();
txn = null;
return removed.toArray(new UUID[removed.size()]);
@@ -814,6 +801,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
+ checkConfigurationStoreOpen();
+
com.sleepycat.je.Transaction txn = null;
try
{
@@ -885,7 +874,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*
* @throws StoreException If the operation fails for any reason.
*/
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws StoreException
{
@@ -924,7 +913,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*
* @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws StoreException
{
@@ -1341,38 +1330,35 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ if (LOGGER.isDebugEnabled())
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing configured object: " + configuredObject);
- }
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
- uuidBinding.objectToEntry(configuredObject.getId(), key);
+ LOGGER.debug("Storing configured object record: " + configuredObject);
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(configuredObject.getId(), key);
- DatabaseEntry value = new DatabaseEntry();
- ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
+ DatabaseEntry value = new DatabaseEntry();
+ ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
- queueBinding.objectToEntry(configuredObject, value);
- try
- {
- OperationStatus status = getConfiguredObjectsDb().put(txn, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error writing configured object " + configuredObject + " to database: "
- + status);
- }
- writeHierarchyRecords(txn, configuredObject);
- }
- catch (DatabaseException e)
+ queueBinding.objectToEntry(configuredObject, value);
+ try
+ {
+ OperationStatus status = getConfiguredObjectsDb().put(txn, key, value);
+ if (status != OperationStatus.SUCCESS)
{
- throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
- + " to database: " + e.getMessage(), e);
+ throw new StoreException("Error writing configured object " + configuredObject + " to database: "
+ + status);
}
+ writeHierarchyRecords(txn, configuredObject);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
+ + " to database: " + e.getMessage(), e);
}
}
-
+
private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject)
{
OperationStatus status;
@@ -1398,7 +1384,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
UUID id = record.getId();
Map<String, ConfiguredObjectRecord> parents = record.getParents();
-
+
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Removing configured object: " + id);
@@ -1449,11 +1435,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
+ @Override
public StorableMessageMetaData getMetaData()
{
StorableMessageMetaData metaData = _metaDataRef.get();
if(metaData == null)
{
+ checkMessageStoreOpen();
+
metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
@@ -1461,11 +1450,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return metaData;
}
+ @Override
public long getMessageNumber()
{
return _messageId;
}
+ @Override
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
src = src.slice();
@@ -1488,6 +1479,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
+ @Override
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
@@ -1499,10 +1491,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
else
{
+ checkMessageStoreOpen();
+
return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
+ @Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
@@ -1539,10 +1534,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+ @Override
public synchronized StoreFuture flushToStore()
{
if(!stored())
{
+ checkMessageStoreOpen();
+
com.sleepycat.je.Transaction txn;
try
{
@@ -1562,8 +1560,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return StoreFuture.IMMEDIATE_FUTURE;
}
+ @Override
public void remove()
{
+ checkMessageStoreOpen();
+
int delta = getMetaData().getContentSize();
BDBMessageStore.this.removeMessage(_messageId, false);
storedSizeChangeOccured(-delta);
@@ -1592,8 +1593,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+ @Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
{
+ checkMessageStoreOpen();
+
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
@@ -1604,36 +1608,54 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
}
+ @Override
public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
}
+ @Override
public void commitTran() throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.commitTranImpl(_txn, true);
BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
}
+ @Override
public StoreFuture commitTranAsync() throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
return BDBMessageStore.this.commitTranImpl(_txn, false);
}
+ @Override
public void abortTran() throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.abortTran(_txn);
}
+ @Override
public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
}
+ @Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
Record[] dequeues) throws StoreException
{
+ checkMessageStoreOpen();
+
BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
}
}
@@ -1697,6 +1719,22 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+ private void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ private void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
private void reduceSizeOnDisk()
{
_environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
@@ -1796,4 +1834,72 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return _environmentFacade.getOpenDatabase(XID_DB_NAME);
}
+ class UpgradeTask implements EnvironmentFacadeTask
+ {
+
+ private ConfiguredObject<?> _parent;
+
+ public UpgradeTask(ConfiguredObject<?> parent)
+ {
+ _parent = parent;
+ }
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary();
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot upgrade store", e);
+ }
+ }
+ }
+
+ class OpenDatabasesTask implements EnvironmentFacadeTask
+ {
+ private String[] _names;
+
+ public OpenDatabasesTask(String[] names)
+ {
+ _names = names;
+ }
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ facade.openDatabases(dbConfig, _names);
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot open databases", e);
+ }
+ }
+
+ }
+
+ class DiskSpaceTask implements EnvironmentFacadeTask
+ {
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize();
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot evaluate disk store size", e);
+ }
+ }
+
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index 4abe81c56c..ef749f2472 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -20,12 +20,8 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
import org.apache.qpid.server.plugin.MessageStoreFactory;
@@ -54,53 +50,29 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
}
@Override
- public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration)
- {
- final List<Object> argumentNames = storeConfiguration.getList("envConfig.name");
- final List<Object> argumentValues = storeConfiguration.getList("envConfig.value");
- final int initialSize = argumentNames.size();
-
- final Map<String,String> attributes = new HashMap<String,String>(initialSize);
-
- for (int i = 0; i < argumentNames.size(); i++)
- {
- final String argName = argumentNames.get(i).toString();
- final String argValue = argumentValues.get(i).toString();
-
- attributes.put(argName, argValue);
- }
-
- if(initialSize != 0)
- {
- return Collections.singletonMap(BDBMessageStore.ENVIRONMENT_CONFIGURATION, (Object)attributes);
- }
- else
- {
- return Collections.emptyMap();
- }
-
-
- }
-
- @Override
public void validateAttributes(Map<String, Object> attributes)
{
- if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ if(messageStoreSettings != null && getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE)))
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
+ Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH);
if(!(storePath instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH
+"' is required and must be of type String.");
}
}
- if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE)))
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> configurationStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.CONFIGURATION_STORE_SETTINGS);
+ if(configurationStoreSettings != null && getType().equals(configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE)))
{
- Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH);
+ Object storePath = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
if(!(storePath instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH
+ throw new IllegalArgumentException("Setting '"+ DurableConfigurationStore.STORE_PATH
+"' is required and must be of type String.");
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index b784e436b9..2e02a6cfed 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.model.VirtualHost;
+import java.util.Map;
public interface EnvironmentFacadeFactory
{
+ public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
- EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore);
+ EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings, EnvironmentFacadeTask... initialisationTasks);
String getType();
+ public static interface EnvironmentFacadeTask
+ {
+ void execute(EnvironmentFacade facade);
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 8117ca1a9a..6065be5fa9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -42,7 +43,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
private Environment _environment;
- public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
+ public StandardEnvironmentFacade(String storePath, Map<String, String> attributes, EnvironmentFacadeTask[] initialisationTasks)
{
_storePath = storePath;
@@ -74,6 +75,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
_environment = new Environment(environmentPath, envConfig);
+ if (initialisationTasks != null)
+ {
+ for (EnvironmentFacadeTask task : initialisationTasks)
+ {
+ task.execute(this);
+ }
+ }
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index 384ceba98a..9506b1c20a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -20,51 +20,28 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import java.io.File;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks)
{
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION);
+ Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION);
if (environmentConfigurationAttributes instanceof Map)
{
envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
-
- String name = virtualHost.getName();
- final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name;
-
- String storeLocation;
- if(isMessageStore)
- {
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
- }
- else // we are acting only as the durable config store
- {
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
- }
-
- return new StandardEnvironmentFacade(storeLocation, envConfigMap);
+ String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
+ return new StandardEnvironmentFacade(storeLocation, envConfigMap, initialisationTasks);
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 3e15e9bdcc..b8192ea741 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -40,12 +40,14 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -110,7 +112,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
*/
put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
/**
- * Parameter changed from default (off) to allow the Environment to start in the
+ * Parameter changed from default (off) to allow the Environment to start in the
* UNKNOWN state when the majority is not available.
*/
put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
@@ -148,7 +150,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
+ private AtomicBoolean _initialised;
+ private EnvironmentFacadeTask[] _initialisationTasks;
+
+ public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
_environmentDirectory = new File(configuration.getStorePath());
if (!_environmentDirectory.exists())
@@ -160,6 +165,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ _initialised = new AtomicBoolean();
+ _initialisationTasks = initialisationTasks;
_configuration = configuration;
_durability = Durability.parse(_configuration.getDurability());
@@ -393,9 +400,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
_joinTime = System.currentTimeMillis();
}
+
if (state == ReplicatedEnvironment.State.MASTER)
{
- reopenDatabases();
+ onMasterStateChange();
}
}
@@ -413,6 +421,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_lastKnownEnvironmentState = state;
}
+ private void onMasterStateChange()
+ {
+ reopenDatabases();
+
+ if (_initialised.compareAndSet(false, true))
+ {
+ if (_initialisationTasks != null)
+ {
+ for (EnvironmentFacadeTask task : _initialisationTasks)
+ {
+ task.execute(ReplicatedEnvironmentFacade.this);
+ }
+ }
+ }
+ }
+
private void reopenDatabases()
{
if (_state.get() == State.OPEN)
@@ -992,7 +1016,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
nodeState = ReplicatedEnvironment.State.UNKNOWN;
}
-
+
currentGroupState.put(node.getName(), nodeState);
return null;
}
@@ -1079,5 +1103,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index cd53afe891..8216cfc484 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Map;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
@@ -32,47 +32,55 @@ import com.sleepycat.je.Durability.SyncPolicy;
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
-
+ public static final String DURABILITY = "haDurability";
+ public static final String GROUP_NAME = "haGroupName";
+ public static final String HELPER_ADDRESS = "haHelperAddress";
+ public static final String NODE_ADDRESS = "haNodeAddress";
+ public static final String NODE_NAME = "haNodeName";
+ public static final String REPLICATION_CONFIG = "haReplicationConfig";
+ public static final String COALESCING_SYNC = "haCoalescingSync";
+ public static final String DESIGNATED_PRIMARY = "haDesignatedPrimary";
+
private static final int DEFAULT_NODE_PRIORITY = 1;
private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
ReplicaAckPolicy.SIMPLE_MAJORITY);
private static final boolean DEFAULT_COALESCING_SYNC = true;
-
-
@Override
- public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks)
{
ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
{
@Override
public boolean isDesignatedPrimary()
{
- return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false);
+ return convertBoolean(messageStoreSettings.get(DESIGNATED_PRIMARY), false);
}
@Override
public boolean isCoalescingSync()
{
- return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC);
+ return convertBoolean(messageStoreSettings.get(COALESCING_SYNC), DEFAULT_COALESCING_SYNC);
}
@Override
public String getStorePath()
{
- return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ return (String) messageStoreSettings.get(MessageStore.STORE_PATH);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, String> getParameters()
{
- return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig");
+ return (Map<String, String>) messageStoreSettings.get(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, String> getReplicationParameters()
{
- return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig");
+ return (Map<String, String>) messageStoreSettings.get(REPLICATION_CONFIG);
}
@Override
@@ -87,39 +95,38 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
return DEFAULT_NODE_PRIORITY;
}
-
-
@Override
public String getName()
{
- return (String)virtualHost.getAttribute("haNodeName");
+ return (String)messageStoreSettings.get(NODE_NAME);
}
@Override
public String getHostPort()
{
- return (String)virtualHost.getAttribute("haNodeAddress");
+ return (String)messageStoreSettings.get(NODE_ADDRESS);
}
@Override
public String getHelperHostPort()
{
- return (String)virtualHost.getAttribute("haHelperAddress");
+ return (String)messageStoreSettings.get(HELPER_ADDRESS);
}
@Override
public String getGroupName()
{
- return (String)virtualHost.getAttribute("haGroupName");
+ return (String)messageStoreSettings.get(GROUP_NAME);
}
@Override
public String getDurability()
{
- return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability");
+ String durability = (String)messageStoreSettings.get(DURABILITY);
+ return durability == null ? DEFAULT_DURABILITY.toString() : durability;
}
};
- return new ReplicatedEnvironmentFacade(configuration);
+ return new ReplicatedEnvironmentFacade(configuration, initialisationTasks);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
index b06b6d533b..0ff90a6d77 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
@@ -21,9 +21,10 @@
package org.apache.qpid.server.store.berkeleydb.upgrade;
import com.sleepycat.je.Environment;
-import org.apache.qpid.server.model.VirtualHost;
+
+import org.apache.qpid.server.model.ConfiguredObject;
public interface StoreUpgrade
{
- void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost);
+ void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
index 3eac47c81b..3588b96e88 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
@@ -39,7 +39,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -75,7 +75,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade
private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class);
- public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, VirtualHost virtualHost)
+ public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, ConfiguredObject<?> parent)
{
Transaction transaction = null;
reportStarting(environment, 4);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
index dea8421a33..366b6a1c97 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
@@ -40,11 +40,11 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
@@ -119,11 +119,11 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
* Queue, Exchange, Bindings entries are stored now as configurable objects
* in "CONFIGURED_OBJECTS" table.
*/
- public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, VirtualHost virtualHost)
+ public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, ConfiguredObject<?> parent)
{
reportStarting(environment, 5);
upgradeMessages(environment, handler);
- upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHost.getName());
+ upgradeConfiguredObjectsAndDependencies(environment, handler, parent.getName());
renameDatabases(environment, null);
reportFinished(environment, 6);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java
index 79314ae098..9dcd291b9d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java
@@ -27,7 +27,8 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.OperationStatus;
-import org.apache.qpid.server.model.VirtualHost;
+
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.StoreException;
public class UpgradeFrom6To7 extends AbstractStoreUpgrade
@@ -36,7 +37,7 @@ public class UpgradeFrom6To7 extends AbstractStoreUpgrade
private static final int DEFAULT_CONFIG_VERSION = 0;
@Override
- public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost)
+ public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent)
{
reportStarting(environment, 6);
DatabaseConfig dbConfig = new DatabaseConfig();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
index 3756c11d0c..413acc90c4 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
@@ -26,7 +26,8 @@ import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.*;
-import org.apache.qpid.server.model.VirtualHost;
+
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord;
@@ -46,7 +47,7 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
{
@Override
- public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost)
+ public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent)
{
reportStarting(environment, 7);
@@ -78,7 +79,7 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
if(!type.endsWith("Binding"))
{
- UUIDTupleBinding.getInstance().objectToEntry(virtualHost.getId(),value);
+ UUIDTupleBinding.getInstance().objectToEntry(parent.getId(),value);
TupleOutput tupleOutput = new TupleOutput();
tupleOutput.writeLong(id.getMostSignificantBits());
tupleOutput.writeLong(id.getLeastSignificantBits());
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index 0c77bb565c..e80d60609f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -26,6 +26,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
@@ -46,12 +47,12 @@ public class Upgrader
static final String VERSION_DB_NAME = "DB_VERSION";
private Environment _environment;
- private VirtualHost _virtualHost;
+ private ConfiguredObject<?> _parent;
- public Upgrader(Environment environment, VirtualHost virtualHost)
+ public Upgrader(Environment environment, ConfiguredObject<?> parent)
{
_environment = environment;
- _virtualHost = virtualHost;
+ _parent = parent;
}
public void upgradeIfNecessary()
@@ -159,7 +160,7 @@ public class Upgrader
+ "UpgradeFrom"+fromVersion+"To"+toVersion);
Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor();
StoreUpgrade upgrade = ctr.newInstance();
- upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHost);
+ upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _parent);
}
catch (ClassNotFoundException e)
{
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
index bfe41773eb..e1678e6f65 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
@@ -25,49 +25,9 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase
{
- private BDBMessageStore _bdbMessageStore;
-
- @Override
- protected BDBMessageStore createMessageStore() throws Exception
- {
- createStoreIfNecessary();
- return _bdbMessageStore;
- }
-
- @Override
- protected void closeMessageStore() throws Exception
- {
- closeStoreIfNecessary();
- }
-
@Override
protected DurableConfigurationStore createConfigStore() throws Exception
{
- createStoreIfNecessary();
-
- return _bdbMessageStore;
- }
-
- @Override
- protected void closeConfigStore() throws Exception
- {
- closeStoreIfNecessary();
- }
-
- private void createStoreIfNecessary()
- {
- if(_bdbMessageStore == null)
- {
- _bdbMessageStore = new BDBMessageStore();
- }
- }
-
- private void closeStoreIfNecessary() throws Exception
- {
- if (_bdbMessageStore != null)
- {
- _bdbMessageStore.close();
- _bdbMessageStore = null;
- }
+ return new BDBMessageStore();
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
index 4684358190..f2de01445d 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -21,16 +21,13 @@
package org.apache.qpid.server.store.berkeleydb;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+
import org.apache.log4j.Logger;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.when;
-
public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
{
private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class);
@@ -59,16 +56,19 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB
return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE;
}
+
@Override
- protected void applyStoreSpecificConfiguration(VirtualHost virtualHost)
+ protected Map<String, Object>createStoreSettings(String storeLocation)
{
- _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+ _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation);
+ messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE);
+ messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE);
Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE);
- when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap);
- when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE);
- when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE);
-
+ messageStoreSettings.put(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION, envMap);
+ return messageStoreSettings;
}
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
deleted file mode 100644
index 385681446a..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class MessageStoreCreatorTest extends QpidTestCase
-{
- public void testMessageStoreCreator()
- {
- MessageStoreCreator messageStoreCreator = new MessageStoreCreator();
- String type = new BDBMessageStoreFactory().getType();
- MessageStore store = messageStoreCreator.createMessageStore(type);
- assertNotNull("Store of type " + type + " is not created", store);
- }}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
index b19e18b204..a82bb066e2 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
@@ -122,7 +122,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase
EnvironmentFacade createEnvironmentFacade()
{
- return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+ return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap(), null);
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
index a05a30b459..2caf85966c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -27,7 +27,6 @@ import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import org.apache.qpid.server.configuration.ConfigurationEntry;
@@ -39,26 +38,24 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
-import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
public class VirtualHostTest extends QpidTestCase
{
- private Broker _broker;
+ private Broker<?> _broker;
private StatisticsGatherer _statisticsGatherer;
private RecovererProvider _recovererProvider;
- private File _configFile;
private File _bdbStorePath;
- private VirtualHost _host;
+ private VirtualHost<?> _host;
private ConfigurationEntryStore _store;
@Override
@@ -72,7 +69,6 @@ public class VirtualHostTest extends QpidTestCase
when(taslExecutor.isTaskExecutorThread()).thenReturn(true);
when(_broker.getTaskExecutor()).thenReturn(taslExecutor);
-
_statisticsGatherer = mock(StatisticsGatherer.class);
_bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis());
@@ -91,10 +87,6 @@ public class VirtualHostTest extends QpidTestCase
}
finally
{
- if (_configFile != null)
- {
- _configFile.delete();
- }
if (_bdbStorePath != null)
{
FileUtils.delete(_bdbStorePath, true);
@@ -103,106 +95,62 @@ public class VirtualHostTest extends QpidTestCase
}
}
-
- public void testCreateBdbVirtualHostFromConfigurationFile()
- {
- String hostName = getName();
- long logFileMax = 2000000;
- _host = createHostFromConfiguration(hostName, logFileMax);
- _host.setDesiredState(State.INITIALISING, State.ACTIVE);
- assertEquals("Unexpected host name", hostName, _host.getName());
- assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType());
- assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE));
- assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
-
- BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
- EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig();
- assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
-
- }
-
- public void testCreateBdbHaVirtualHostFromConfigurationFile()
+ public void testCreateBdbHaVirtualHostFromConfigurationEntry()
{
- String hostName = getName();
-
String repStreamTimeout = "2 h";
String nodeName = "node";
String groupName = "group";
String nodeHostPort = "localhost:" + findFreePort();
String helperHostPort = nodeHostPort;
String durability = "NO_SYNC,SYNC,NONE";
- _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout);
+ String virtualHostName = getName();
+
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, groupName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, nodeHostPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, helperHostPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.DURABILITY, durability);
+
+ messageStoreSettings.put(MessageStore.STORE_PATH, _bdbStorePath.getAbsolutePath());
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG,
+ Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout));
+
+ Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put(VirtualHost.NAME, virtualHostName);
+ virtualHostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
+ virtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
+
+ _host = createHost(virtualHostAttributes);
_host.setDesiredState(State.INITIALISING, State.ACTIVE);
- assertEquals("Unexpected host name", hostName, _host.getName());
+
+ assertEquals("Unexpected virtual host name", virtualHostName, _host.getName());
assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
- assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
- assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ assertEquals(messageStoreSettings, _host.getMessageStoreSettings());
BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
- ReplicationConfig repConfig = environment.getRepConfig();
- assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME));
- assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME));
- assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT));
- assertEquals("Unexpected JE replication nodeHostPort", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS));
- assertEquals("Unexpected JE replication nodeHostPort", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY));
- assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
- }
+ ReplicationConfig replicationConfig = environment.getRepConfig();
- private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children)
- {
- ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes,
- children, _store);
+ assertEquals(nodeName, environment.getNodeName());
+ assertEquals(groupName, environment.getGroup().getName());
+ assertEquals(nodeHostPort, replicationConfig.getNodeHostPort());
+ assertEquals(helperHostPort, replicationConfig.getHelperHosts());
+ assertEquals(durability, environment.getConfig().getDurability().toString());
+ assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
- return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
}
- private VirtualHost createHost(Map<String, Object> attributes)
- {
- return createHost(attributes, Collections.<UUID> emptySet());
- }
- private VirtualHost createHostFromConfiguration(String hostName, long logFileMax)
+ private VirtualHost<?> createHost(Map<String, Object> attributes)
{
- String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
- + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
- + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
- + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>"
- + "</store>"
- + "</" + hostName + "></virtualhost></virtualhosts>";
- Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
- return createHost(attributes);
- }
-
+ ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes,
+ Collections.<UUID>emptySet(), _store);
- private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout)
- {
- String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
- + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>"
- + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
- + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
- + "<highAvailability>"
- + "<groupName>" + groupName + "</groupName>"
- + "<nodeName>" + nodeName + "</nodeName>"
- + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>"
- + "<helperHostPort>" + helperHostPort + "</helperHostPort>"
- + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>"
- + "</highAvailability>"
- + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>"
- + "</store>"
- + "</" + hostName + "></virtualhost></virtualhosts>";
- Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
- return createHost(attributes);
+ return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
}
- private Map<String, Object> writeConfigAndGenerateAttributes(String content)
- {
- _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content);
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(VirtualHost.NAME, getName());
- attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath());
- return attributes;
- }
}
\ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index cd7dd69c46..b342493c59 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -31,8 +31,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -65,16 +63,11 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private File _storePath;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
- private VirtualHost _virtualHost = mock(VirtualHost.class);
public void setUp() throws Exception
{
super.setUp();
- TaskExecutor taskExecutor = mock(TaskExecutor.class);
- when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
- when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
-
_storePath = TestFileUtils.createTestDirectory("bdb", true);
setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
@@ -302,7 +295,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
State desiredState, StateChangeListener stateChangeListener)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null);
ref.setStateChangeListener(stateChangeListener);
_nodes.put(nodeName, ref);
return ref;