diff options
| author | Keith Wall <kwall@apache.org> | 2014-04-01 10:35:33 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-04-01 10:35:33 +0000 |
| commit | cf057f863f902d73477411b8d1f5b0a7541748b0 (patch) | |
| tree | 9ce0d2953a6d358f6f91a0de25e3dc8f719e8ee4 /qpid/java/bdbstore/src | |
| parent | 99231e1918fb20c750e7261f531d31ecec12b8f1 (diff) | |
| parent | c24467b81ca19de444d8a4bd4d5bb01cd6738df2 (diff) | |
| download | qpid-python-cf057f863f902d73477411b8d1f5b0a7541748b0.tar.gz | |
QPID-5653, QPID-5624: Remove support for virtual host xml and make message and configuration stores stateless.
This commit will temporarily break the UI
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1583597 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
22 files changed, 500 insertions, 686 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java index f13e4dd08b..a5eac25968 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java @@ -104,4 +104,11 @@ public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord result = 31 * result + (_type != null ? _type.hashCode() : 0); return result; } + + @Override + public String toString() + { + return "BDBConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", name=" + (_attributes == null ? null : _attributes.get("name")) + ", parents=" + _parents + "]"; + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 3fdc12ba31..aae0a56a40 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -1,4 +1,3 @@ -package org.apache.qpid.server.store.berkeleydb; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,19 +18,20 @@ package org.apache.qpid.server.store.berkeleydb; * under the License. * */ +package org.apache.qpid.server.store.berkeleydb; + +import java.util.HashMap; +import java.util.Map; import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @@ -48,48 +48,31 @@ public class BDBHAVirtualHost extends AbstractVirtualHost private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class); private BDBMessageStore _messageStore; - - private boolean _inVhostInitiatedClose; + private MessageStoreLogSubject _messageStoreLogSubject; BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, org.apache.qpid.server.security.SecurityManager parentSecurityManager, - VirtualHostConfiguration hostConfig, VirtualHost virtualHost) { - super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost); + super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, virtualHost); } - protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) + protected void initialiseStorage(VirtualHost virtualHost) { - _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); - - final MessageStoreLogSubject storeLogSubject = - new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); - OperationalLoggingListener.listen(_messageStore, storeLogSubject, getEventLogger()); - - _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); - _messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); - _messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); - - + setState(State.PASSIVE); - _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); - _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); + _messageStoreLogSubject = new MessageStoreLogSubject(getName(), BDBMessageStore.class.getSimpleName()); + _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); + getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED()); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); - DurableConfigurationRecoverer configRecoverer = - new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger()); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(virtualHost.getMessageStoreSettings()); + messageStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true); - _messageStore.configureConfigStore( - virtualHost, configRecoverer - ); + _messageStore.openConfigurationStore(virtualHost, messageStoreSettings); + _messageStore.openMessageStore(virtualHost, messageStoreSettings); - _messageStore.configureMessageStore( - virtualHost, recoveryHandler, - recoveryHandler - ); + getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation())); // Make the virtualhost model object a replication group listener ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade(); @@ -97,29 +80,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost } - - protected void closeStorage() - { - //Close MessageStore - if (_messageStore != null) - { - //Remove MessageStore Interface should not throw Exception - try - { - _inVhostInitiatedClose = true; - getMessageStore().close(); - } - catch (Exception e) - { - getLogger().error("Failed to close message store", e); - } - finally - { - _inVhostInitiatedClose = false; - } - } - } - @Override public DurableConfigurationStore getDurableConfigurationStore() { @@ -132,77 +92,64 @@ public class BDBHAVirtualHost extends AbstractVirtualHost return _messageStore; } - private final class AfterInitialisationListener implements EventListener + private void activate() { - public void event(Event event) + try { - setState(State.PASSIVE); - } + _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - } + DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this); - private final class BeforePassivationListener implements EventListener - { - public void event(Event event) - { - State finalState = State.ERRORED; + DurableConfigurationRecoverer configRecoverer = + new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), + upgraderProvider, getEventLogger()); + _messageStore.recoverConfigurationStore(configRecoverer); - try - { - /* the approach here is not ideal as there is a race condition where a - * queue etc could be created while the virtual host is on the way to - * the passivated state. However the store state change from MASTER to UNKNOWN - * is documented as exceptionally rare.. - */ - - getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - removeHouseKeepingTasks(); + initialiseModel(); - getQueueRegistry().stopAllAndUnregisterMBeans(); - getExchangeRegistry().clearAndUnregisterMbeans(); - getDtxRegistry().close(); + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject()); + _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); - finalState = State.PASSIVE; - } - finally - { - setState(finalState); - reportIfError(getState()); - } + attainActivation(); } - - } - - - private final class BeforeActivationListener implements EventListener - { - @Override - public void event(Event event) + catch (Exception e) { - initialiseModel(getConfiguration()); + LOGGER.error("Failed to activate on hearing MASTER change event", e); } } - private final class AfterActivationListener implements EventListener + private void passivate() { - @Override - public void event(Event event) + State finalState = State.ERRORED; + + try { - attainActivation(); + /* the approach here is not ideal as there is a race condition where a + * queue etc could be created while the virtual host is on the way to + * the passivated state. However the store state change from MASTER to UNKNOWN + * is documented as exceptionally rare. + */ + + getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + removeHouseKeepingTasks(); + + getQueueRegistry().stopAllAndUnregisterMBeans(); + getExchangeRegistry().clearAndUnregisterMbeans(); + getDtxRegistry().close(); + + finalState = State.PASSIVE; + } + finally + { + setState(finalState); + reportIfError(getState()); } } - private final class BeforeCloseListener implements EventListener + @Override + protected MessageStoreLogSubject getMessageStoreLogSubject() { - @Override - public void event(Event event) - { - if(!_inVhostInitiatedClose) - { - shutdownHouseKeeping(); - } - - } + return _messageStoreLogSubject; } private class BDBHAMessageStoreStateChangeListener implements StateChangeListener @@ -215,8 +162,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost if (LOGGER.isInfoEnabled()) { - LOGGER.info("Received BDB event indicating transition to state " + state - + " when current message store state is " + _messageStore._stateManager.getState()); + LOGGER.info("Received BDB event indicating transition to state " + state); } switch (state) @@ -239,36 +185,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost throw new IllegalStateException("Unexpected state change: " + state); } } - - private void activate() - { - try - { - _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - _messageStore.activate(); - } - catch (Exception e) - { - LOGGER.error("Failed to activate on hearing MASTER change event", e); - } - } - - private void passivate() - { - try - { - //TODO: move this this into the store method passivate() - if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED)) - { - _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); - } - } - catch (Exception e) - { - LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e); - } - } - } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java index 7a308920b3..6fb84b8a4d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -19,16 +19,12 @@ package org.apache.qpid.server.store.berkeleydb;/* * */ -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.model.adapter.VirtualHostAdapter; + import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -47,24 +43,29 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, org.apache.qpid.server.security.SecurityManager parentSecurityManager, - VirtualHostConfiguration hostConfig, org.apache.qpid.server.model.VirtualHost virtualHost) { return new BDBHAVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, - hostConfig, virtualHost); } @Override public void validateAttributes(Map<String, Object> attributes) { - validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes); - validateAttribute("haGroupName", String.class, attributes); - validateAttribute("haNodeName", String.class, attributes); - validateAttribute("haNodeAddress", String.class, attributes); - validateAttribute("haHelperAddress", String.class, attributes); + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS); + if (messageStoreSettings == null) + { + throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required."); + } + + validateAttribute(MessageStore.STORE_PATH, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_NAME, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, String.class, messageStoreSettings); } private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes) @@ -77,82 +78,4 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory } } - @Override - public Map<String, Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter) - { - LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>(); - convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH)); - - return convertedMap; - } - - public Map<String, Object> convertVirtualHostConfiguration(Configuration configuration) - { - - LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>(); - - Configuration storeConfiguration = configuration.subset("store"); - - convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); - convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY)); - convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY)); - convertedMap.put("haGroupName", configuration.getString("store.highAvailability.groupName")); - convertedMap.put("haNodeName", configuration.getString("store.highAvailability.nodeName")); - convertedMap.put("haNodeAddress", configuration.getString("store.highAvailability.nodeHostPort")); - convertedMap.put("haHelperAddress", configuration.getString("store.highAvailability.helperHostPort")); - - final Object haDurability = configuration.getString("store.highAvailability.durability"); - if(haDurability !=null) - { - convertedMap.put("haDurability", haDurability); - } - - final Object designatedPrimary = configuration.getString("store.highAvailability.designatedPrimary"); - if(designatedPrimary!=null) - { - convertedMap.put("haDesignatedPrimary", designatedPrimary); - } - - final Object coalescingSync = configuration.getString("store.highAvailability.coalescingSync"); - if(coalescingSync!=null) - { - convertedMap.put("haCoalescingSync", coalescingSync); - } - - - Map<String, String> attributes = getEnvironmentMap(storeConfiguration, "envConfig"); - - if(!attributes.isEmpty()) - { - convertedMap.put("bdbEnvironmentConfig",attributes); - } - - attributes = getEnvironmentMap(storeConfiguration, "repConfig"); - - if(!attributes.isEmpty()) - { - convertedMap.put("haReplicationConfig",attributes); - } - - return convertedMap; - - } - - private Map<String, String> getEnvironmentMap(Configuration storeConfiguration, String configName) - { - final List<Object> argumentNames = storeConfiguration.getList(configName +".name"); - final List<Object> argumentValues = storeConfiguration.getList(configName +".value"); - final int initialSize = argumentNames.size(); - - final Map<String,String> attributes = new HashMap<String,String>(initialSize); - - for (int i = 0; i < argumentNames.size(); i++) - { - final String argName = argumentNames.get(i).toString(); - final String argValue = argumentValues.get(i).toString(); - - attributes.put(argName, argValue); - } - return attributes; - } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 16255eb5ed..8aac9a6247 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -46,11 +46,8 @@ import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.State; -import org.apache.qpid.server.store.StateManager; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; @@ -59,6 +56,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; @@ -72,6 +70,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.util.FileUtils; import com.sleepycat.bind.tuple.ByteBinding; @@ -102,7 +101,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); public static final int VERSION = 8; - public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; private static final int LOCK_RETRY_ATTEMPTS = 5; private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY"; @@ -110,25 +108,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; + + //TODO: Add upgrader to remove BRIDGES and LINKS private static String BRIDGEDB_NAME = "BRIDGES"; private static String LINKDB_NAME = "LINKS"; private static String XID_DB_NAME = "XIDS"; private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION"; - private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME, MESSAGE_META_DATA_DB_NAME, - MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME }; - - private final AtomicBoolean _closed = new AtomicBoolean(false); + private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIG_VERSION_DB_NAME , CONFIGURED_OBJECT_HIERARCHY_DB_NAME}; + private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME }; private EnvironmentFacade _environmentFacade; private final AtomicLong _messageId = new AtomicLong(0); - protected final StateManager _stateManager; - - private MessageStoreRecoveryHandler _messageRecoveryHandler; - - private TransactionLogRecoveryHandler _tlogRecoveryHandler; - - private ConfigurationRecoveryHandler _configRecoveryHandler; + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); + private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); private long _totalStoreSize; private boolean _limitBusted; @@ -137,12 +130,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private final EventManager _eventManager = new EventManager(); private final String _type; - private VirtualHost _virtualHost; private final EnvironmentFacadeFactory _environmentFacadeFactory; private volatile Committer _committer; + public BDBMessageStore() { this(new StandardEnvironmentFacadeFactory()); @@ -152,7 +145,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { _type = environmentFacadeFactory.getType(); _environmentFacadeFactory = environmentFacadeFactory; - _stateManager = new StateManager(_eventManager); } @Override @@ -162,110 +154,91 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) { - _stateManager.attainState(State.INITIALISING); - - _configRecoveryHandler = recoveryHandler; - _virtualHost = virtualHost; + if (_configurationStoreOpen.compareAndSet(false, true)) + { + if (_environmentFacade == null) + { + String[] databaseNames = null; + if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false)) + { + databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; + System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length); + System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length); + } + else + { + databaseNames = CONFIGURATION_STORE_DATABASE_NAMES; + } + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames)); + } + else + { + throw new IllegalStateException("The database have been already opened as message store"); + } + } } @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) throws StoreException + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { - if(_stateManager.isInState(State.INITIAL)) - { - // Is acting as a message store, but not a durable config store - _stateManager.attainState(State.INITIALISING); - } + checkConfigurationStoreOpen(); - _messageRecoveryHandler = messageRecoveryHandler; - _tlogRecoveryHandler = tlogRecoveryHandler; - _virtualHost = virtualHost; - completeInitialisation(); + recoverConfig(recoveryHandler); } - private void completeInitialisation() throws StoreException + @Override + public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) throws StoreException { - configure(_virtualHost, _messageRecoveryHandler != null); + if (_messageStoreOpen.compareAndSet(false, true)) + { - _stateManager.attainState(State.INITIALISED); - } + Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); - private void startActivation() throws StoreException - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - try - { - new Upgrader(_environmentFacade.getEnvironment(), _virtualHost).upgradeIfNecessary(); - _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES); - _totalStoreSize = getSizeOnDisk(); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot configure store", e); - } + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + + if (_environmentFacade == null) + { + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask()); + } + _committer = _environmentFacade.createCommitter(parent.getName()); + _committer.start(); + } } @Override - public synchronized void activate() throws StoreException + public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException { - // check if acting as a durable config store, but not a message store - if(_stateManager.isInState(State.INITIALISING)) - { - completeInitialisation(); - } - - _stateManager.attainState(State.ACTIVATING); - startActivation(); + checkMessageStoreOpen(); - if(_configRecoveryHandler != null) + if(messageRecoveryHandler != null) { - recoverConfig(_configRecoveryHandler); + recoverMessages(messageRecoveryHandler); } - if(_messageRecoveryHandler != null) + if(transactionLogRecoveryHandler != null) { - recoverMessages(_messageRecoveryHandler); + recoverQueueEntries(transactionLogRecoveryHandler); } - if(_tlogRecoveryHandler != null) - { - recoverQueueEntries(_tlogRecoveryHandler); - } - - _stateManager.attainState(State.ACTIVE); } @Override public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException { - return new BDBTransaction(); - } - - private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException - { - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); - - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - + checkMessageStoreOpen(); - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore); - - _committer = _environmentFacade.createCommitter(virtualHost.getName()); - _committer.start(); + return new BDBTransaction(); } @Override @@ -283,33 +256,47 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return _environmentFacade; } - /** - * Called to close and cleanup any resources used by the message store. - * - * @throws Exception If the close fails. - */ @Override - public void close() throws StoreException + public void closeMessageStore() throws StoreException { - if (_closed.compareAndSet(false, true)) + if (_messageStoreOpen.compareAndSet(true, false)) { - _stateManager.attainState(State.CLOSING); try { - try + if (_committer != null) { _committer.stop(); } - finally + } + finally + { + if (!_configurationStoreOpen.get()) { closeEnvironment(); } } - catch(DatabaseException e) + } + } + + @Override + public void closeConfigurationStore() throws StoreException + { + if (_configurationStoreOpen.compareAndSet(true, false)) + { + try { - throw new StoreException("Exception occured on message store close", e); + if (_committer != null) + { + _committer.stop(); + } + } + finally + { + if (!_messageStoreOpen.get()) + { + closeEnvironment(); + } } - _stateManager.attainState(State.CLOSED); } } @@ -751,27 +738,25 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public void create(ConfiguredObjectRecord configuredObject) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + checkConfigurationStoreOpen(); + com.sleepycat.je.Transaction txn = null; + try { - com.sleepycat.je.Transaction txn = null; - try - { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - storeConfiguredObjectEntry(txn, configuredObject); - txn.commit(); - txn = null; - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject - + " in database: " + e.getMessage(), e); - } - finally + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + storeConfiguredObjectEntry(txn, configuredObject); + txn.commit(); + txn = null; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject + + " in database: " + e.getMessage(), e); + } + finally + { + if (txn != null) { - if (txn != null) - { - abortTransactionIgnoringException("Error creating configured object", txn); - } + abortTransactionIgnoringException("Error creating configured object", txn); } } } @@ -779,11 +764,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException { + checkConfigurationStoreOpen(); + com.sleepycat.je.Transaction txn = null; try { txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - + Collection<UUID> removed = new ArrayList<UUID>(objects.length); for(ConfiguredObjectRecord record : objects) { @@ -792,7 +779,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore removed.add(record.getId()); } } - + txn.commit(); txn = null; return removed.toArray(new UUID[removed.size()]); @@ -814,6 +801,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { + checkConfigurationStoreOpen(); + com.sleepycat.je.Transaction txn = null; try { @@ -885,7 +874,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws StoreException If the operation fails for any reason. */ - public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + private void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws StoreException { @@ -924,7 +913,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. */ - public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, + private void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws StoreException { @@ -1341,38 +1330,35 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + if (LOGGER.isDebugEnabled()) { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Storing configured object: " + configuredObject); - } - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); - uuidBinding.objectToEntry(configuredObject.getId(), key); + LOGGER.debug("Storing configured object record: " + configuredObject); + } + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(configuredObject.getId(), key); - DatabaseEntry value = new DatabaseEntry(); - ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); + DatabaseEntry value = new DatabaseEntry(); + ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); - queueBinding.objectToEntry(configuredObject, value); - try - { - OperationStatus status = getConfiguredObjectsDb().put(txn, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error writing configured object " + configuredObject + " to database: " - + status); - } - writeHierarchyRecords(txn, configuredObject); - } - catch (DatabaseException e) + queueBinding.objectToEntry(configuredObject, value); + try + { + OperationStatus status = getConfiguredObjectsDb().put(txn, key, value); + if (status != OperationStatus.SUCCESS) { - throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject - + " to database: " + e.getMessage(), e); + throw new StoreException("Error writing configured object " + configuredObject + " to database: " + + status); } + writeHierarchyRecords(txn, configuredObject); + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject + + " to database: " + e.getMessage(), e); } } - + private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject) { OperationStatus status; @@ -1398,7 +1384,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { UUID id = record.getId(); Map<String, ConfiguredObjectRecord> parents = record.getParents(); - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Removing configured object: " + id); @@ -1449,11 +1435,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } + @Override public StorableMessageMetaData getMetaData() { StorableMessageMetaData metaData = _metaDataRef.get(); if(metaData == null) { + checkMessageStoreOpen(); + metaData = BDBMessageStore.this.getMessageMetaData(_messageId); _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } @@ -1461,11 +1450,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return metaData; } + @Override public long getMessageNumber() { return _messageId; } + @Override public void addContent(int offsetInMessage, java.nio.ByteBuffer src) { src = src.slice(); @@ -1488,6 +1479,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } + @Override public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) { byte[] data = _dataRef == null ? null : _dataRef.get(); @@ -1499,10 +1491,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } else { + checkMessageStoreOpen(); + return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); } } + @Override public ByteBuffer getContent(int offsetInMessage, int size) { byte[] data = _dataRef == null ? null : _dataRef.get(); @@ -1539,10 +1534,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + @Override public synchronized StoreFuture flushToStore() { if(!stored()) { + checkMessageStoreOpen(); + com.sleepycat.je.Transaction txn; try { @@ -1562,8 +1560,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return StoreFuture.IMMEDIATE_FUTURE; } + @Override public void remove() { + checkMessageStoreOpen(); + int delta = getMetaData().getContentSize(); BDBMessageStore.this.removeMessage(_messageId, false); storedSizeChangeOccured(-delta); @@ -1592,8 +1593,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + @Override public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException { + checkMessageStoreOpen(); + if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); @@ -1604,36 +1608,54 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); } + @Override public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); } + @Override public void commitTran() throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.commitTranImpl(_txn, true); BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease); } + @Override public StoreFuture commitTranAsync() throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease); return BDBMessageStore.this.commitTranImpl(_txn, false); } + @Override public void abortTran() throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.abortTran(_txn); } + @Override public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.removeXid(_txn, format, globalId, branchId); } + @Override public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) throws StoreException { + checkMessageStoreOpen(); + BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); } } @@ -1697,6 +1719,22 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + private void checkConfigurationStoreOpen() + { + if (!_configurationStoreOpen.get()) + { + throw new IllegalStateException("Configuration store is not open"); + } + } + + private void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + private void reduceSizeOnDisk() { _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); @@ -1796,4 +1834,72 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return _environmentFacade.getOpenDatabase(XID_DB_NAME); } + class UpgradeTask implements EnvironmentFacadeTask + { + + private ConfiguredObject<?> _parent; + + public UpgradeTask(ConfiguredObject<?> parent) + { + _parent = parent; + } + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary(); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot upgrade store", e); + } + } + } + + class OpenDatabasesTask implements EnvironmentFacadeTask + { + private String[] _names; + + public OpenDatabasesTask(String[] names) + { + _names = names; + } + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + facade.openDatabases(dbConfig, _names); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot open databases", e); + } + } + + } + + class DiskSpaceTask implements EnvironmentFacadeTask + { + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize(); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot evaluate disk store size", e); + } + } + + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index 4abe81c56c..ef749f2472 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -20,12 +20,8 @@ */ package org.apache.qpid.server.store.berkeleydb; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; import org.apache.qpid.server.plugin.MessageStoreFactory; @@ -54,53 +50,29 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi } @Override - public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration) - { - final List<Object> argumentNames = storeConfiguration.getList("envConfig.name"); - final List<Object> argumentValues = storeConfiguration.getList("envConfig.value"); - final int initialSize = argumentNames.size(); - - final Map<String,String> attributes = new HashMap<String,String>(initialSize); - - for (int i = 0; i < argumentNames.size(); i++) - { - final String argName = argumentNames.get(i).toString(); - final String argValue = argumentValues.get(i).toString(); - - attributes.put(argName, argValue); - } - - if(initialSize != 0) - { - return Collections.singletonMap(BDBMessageStore.ENVIRONMENT_CONFIGURATION, (Object)attributes); - } - else - { - return Collections.emptyMap(); - } - - - } - - @Override public void validateAttributes(Map<String, Object> attributes) { - if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + if(messageStoreSettings != null && getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE))) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); + Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH); if(!(storePath instanceof String)) { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH +"' is required and must be of type String."); } } - if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE))) + + @SuppressWarnings("unchecked") + Map<String, Object> configurationStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.CONFIGURATION_STORE_SETTINGS); + if(configurationStoreSettings != null && getType().equals(configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE))) { - Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH); + Object storePath = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH); if(!(storePath instanceof String)) { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH + throw new IllegalArgumentException("Setting '"+ DurableConfigurationStore.STORE_PATH +"' is required and must be of type String."); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index b784e436b9..2e02a6cfed 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.model.VirtualHost; +import java.util.Map; public interface EnvironmentFacadeFactory { + public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; - EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore); + EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings, EnvironmentFacadeTask... initialisationTasks); String getType(); + public static interface EnvironmentFacadeTask + { + void execute(EnvironmentFacade facade); + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 8117ca1a9a..6065be5fa9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -42,7 +43,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade private Environment _environment; - public StandardEnvironmentFacade(String storePath, Map<String, String> attributes) + public StandardEnvironmentFacade(String storePath, Map<String, String> attributes, EnvironmentFacadeTask[] initialisationTasks) { _storePath = storePath; @@ -74,6 +75,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); _environment = new Environment(environmentPath, envConfig); + if (initialisationTasks != null) + { + for (EnvironmentFacadeTask task : initialisationTasks) + { + task.execute(this); + } + } } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index 384ceba98a..9506b1c20a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -20,51 +20,28 @@ */ package org.apache.qpid.server.store.berkeleydb; -import java.io.File; import java.util.HashMap; import java.util.Map; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory { @SuppressWarnings("unchecked") @Override - public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) + public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks) { Map<String, String> envConfigMap = new HashMap<String, String>(); envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); - Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION); + Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION); if (environmentConfigurationAttributes instanceof Map) { envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes); } - - String name = virtualHost.getName(); - final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name; - - String storeLocation; - if(isMessageStore) - { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - else // we are acting only as the durable config store - { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - } - - return new StandardEnvironmentFacade(storeLocation, envConfigMap); + String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH); + return new StandardEnvironmentFacade(storeLocation, envConfigMap, initialisationTasks); } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 3e15e9bdcc..b8192ea741 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -40,12 +40,14 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.Committer; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; import org.apache.qpid.server.util.DaemonThreadFactory; @@ -110,7 +112,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan */ put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); /** - * Parameter changed from default (off) to allow the Environment to start in the + * Parameter changed from default (off) to allow the Environment to start in the * UNKNOWN state when the majority is not available. */ put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s"); @@ -148,7 +150,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; - public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) + private AtomicBoolean _initialised; + private EnvironmentFacadeTask[] _initialisationTasks; + + public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks) { _environmentDirectory = new File(configuration.getStorePath()); if (!_environmentDirectory.exists()) @@ -160,6 +165,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + _initialised = new AtomicBoolean(); + _initialisationTasks = initialisationTasks; _configuration = configuration; _durability = Durability.parse(_configuration.getDurability()); @@ -393,9 +400,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); _joinTime = System.currentTimeMillis(); } + if (state == ReplicatedEnvironment.State.MASTER) { - reopenDatabases(); + onMasterStateChange(); } } @@ -413,6 +421,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _lastKnownEnvironmentState = state; } + private void onMasterStateChange() + { + reopenDatabases(); + + if (_initialised.compareAndSet(false, true)) + { + if (_initialisationTasks != null) + { + for (EnvironmentFacadeTask task : _initialisationTasks) + { + task.execute(ReplicatedEnvironmentFacade.this); + } + } + } + } + private void reopenDatabases() { if (_state.get() == State.OPEN) @@ -992,7 +1016,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { nodeState = ReplicatedEnvironment.State.UNKNOWN; } - + currentGroupState.put(node.getName(), nodeState); return null; } @@ -1079,5 +1103,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index cd53afe891..8216cfc484 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.util.Map; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; @@ -32,47 +32,55 @@ import com.sleepycat.je.Durability.SyncPolicy; public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory { - + public static final String DURABILITY = "haDurability"; + public static final String GROUP_NAME = "haGroupName"; + public static final String HELPER_ADDRESS = "haHelperAddress"; + public static final String NODE_ADDRESS = "haNodeAddress"; + public static final String NODE_NAME = "haNodeName"; + public static final String REPLICATION_CONFIG = "haReplicationConfig"; + public static final String COALESCING_SYNC = "haCoalescingSync"; + public static final String DESIGNATED_PRIMARY = "haDesignatedPrimary"; + private static final int DEFAULT_NODE_PRIORITY = 1; private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); private static final boolean DEFAULT_COALESCING_SYNC = true; - - @Override - public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore) + public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks) { ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration() { @Override public boolean isDesignatedPrimary() { - return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false); + return convertBoolean(messageStoreSettings.get(DESIGNATED_PRIMARY), false); } @Override public boolean isCoalescingSync() { - return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC); + return convertBoolean(messageStoreSettings.get(COALESCING_SYNC), DEFAULT_COALESCING_SYNC); } @Override public String getStorePath() { - return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + return (String) messageStoreSettings.get(MessageStore.STORE_PATH); } + @SuppressWarnings("unchecked") @Override public Map<String, String> getParameters() { - return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig"); + return (Map<String, String>) messageStoreSettings.get(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION); } + @SuppressWarnings("unchecked") @Override public Map<String, String> getReplicationParameters() { - return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig"); + return (Map<String, String>) messageStoreSettings.get(REPLICATION_CONFIG); } @Override @@ -87,39 +95,38 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact return DEFAULT_NODE_PRIORITY; } - - @Override public String getName() { - return (String)virtualHost.getAttribute("haNodeName"); + return (String)messageStoreSettings.get(NODE_NAME); } @Override public String getHostPort() { - return (String)virtualHost.getAttribute("haNodeAddress"); + return (String)messageStoreSettings.get(NODE_ADDRESS); } @Override public String getHelperHostPort() { - return (String)virtualHost.getAttribute("haHelperAddress"); + return (String)messageStoreSettings.get(HELPER_ADDRESS); } @Override public String getGroupName() { - return (String)virtualHost.getAttribute("haGroupName"); + return (String)messageStoreSettings.get(GROUP_NAME); } @Override public String getDurability() { - return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability"); + String durability = (String)messageStoreSettings.get(DURABILITY); + return durability == null ? DEFAULT_DURABILITY.toString() : durability; } }; - return new ReplicatedEnvironmentFacade(configuration); + return new ReplicatedEnvironmentFacade(configuration, initialisationTasks); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java index b06b6d533b..0ff90a6d77 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java @@ -21,9 +21,10 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import com.sleepycat.je.Environment; -import org.apache.qpid.server.model.VirtualHost; + +import org.apache.qpid.server.model.ConfiguredObject; public interface StoreUpgrade { - void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost); + void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java index 3eac47c81b..3588b96e88 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java @@ -39,7 +39,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -75,7 +75,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class); - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, VirtualHost virtualHost) + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { Transaction transaction = null; reportStarting(environment, 4); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index dea8421a33..366b6a1c97 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -40,11 +40,11 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; @@ -119,11 +119,11 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade * Queue, Exchange, Bindings entries are stored now as configurable objects * in "CONFIGURED_OBJECTS" table. */ - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, VirtualHost virtualHost) + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { reportStarting(environment, 5); upgradeMessages(environment, handler); - upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHost.getName()); + upgradeConfiguredObjectsAndDependencies(environment, handler, parent.getName()); renameDatabases(environment, null); reportFinished(environment, 6); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java index 79314ae098..9dcd291b9d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java @@ -27,7 +27,8 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Environment; import com.sleepycat.je.OperationStatus; -import org.apache.qpid.server.model.VirtualHost; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.StoreException; public class UpgradeFrom6To7 extends AbstractStoreUpgrade @@ -36,7 +37,7 @@ public class UpgradeFrom6To7 extends AbstractStoreUpgrade private static final int DEFAULT_CONFIG_VERSION = 0; @Override - public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost) + public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { reportStarting(environment, 6); DatabaseConfig dbConfig = new DatabaseConfig(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java index 3756c11d0c..413acc90c4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java @@ -26,7 +26,8 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import com.sleepycat.je.*; -import org.apache.qpid.server.model.VirtualHost; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord; @@ -46,7 +47,7 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade { @Override - public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, VirtualHost virtualHost) + public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { reportStarting(environment, 7); @@ -78,7 +79,7 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade if(!type.endsWith("Binding")) { - UUIDTupleBinding.getInstance().objectToEntry(virtualHost.getId(),value); + UUIDTupleBinding.getInstance().objectToEntry(parent.getId(),value); TupleOutput tupleOutput = new TupleOutput(); tupleOutput.writeLong(id.getMostSignificantBits()); tupleOutput.writeLong(id.getLeastSignificantBits()); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index 0c77bb565c..e80d60609f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -26,6 +26,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; @@ -46,12 +47,12 @@ public class Upgrader static final String VERSION_DB_NAME = "DB_VERSION"; private Environment _environment; - private VirtualHost _virtualHost; + private ConfiguredObject<?> _parent; - public Upgrader(Environment environment, VirtualHost virtualHost) + public Upgrader(Environment environment, ConfiguredObject<?> parent) { _environment = environment; - _virtualHost = virtualHost; + _parent = parent; } public void upgradeIfNecessary() @@ -159,7 +160,7 @@ public class Upgrader + "UpgradeFrom"+fromVersion+"To"+toVersion); Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor(); StoreUpgrade upgrade = ctr.newInstance(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHost); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _parent); } catch (ClassNotFoundException e) { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java index bfe41773eb..e1678e6f65 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -25,49 +25,9 @@ import org.apache.qpid.server.store.DurableConfigurationStore; public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase { - private BDBMessageStore _bdbMessageStore; - - @Override - protected BDBMessageStore createMessageStore() throws Exception - { - createStoreIfNecessary(); - return _bdbMessageStore; - } - - @Override - protected void closeMessageStore() throws Exception - { - closeStoreIfNecessary(); - } - @Override protected DurableConfigurationStore createConfigStore() throws Exception { - createStoreIfNecessary(); - - return _bdbMessageStore; - } - - @Override - protected void closeConfigStore() throws Exception - { - closeStoreIfNecessary(); - } - - private void createStoreIfNecessary() - { - if(_bdbMessageStore == null) - { - _bdbMessageStore = new BDBMessageStore(); - } - } - - private void closeStoreIfNecessary() throws Exception - { - if (_bdbMessageStore != null) - { - _bdbMessageStore.close(); - _bdbMessageStore = null; - } + return new BDBMessageStore(); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index 4684358190..f2de01445d 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -21,16 +21,13 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Collections; +import java.util.HashMap; import java.util.Map; + import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class); @@ -59,16 +56,19 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; } + @Override - protected void applyStoreSpecificConfiguration(VirtualHost virtualHost) + protected Map<String, Object>createStoreSettings(String storeLocation) { - _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation); + messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE); + messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE); Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); - when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap); - when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); - when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); - + messageStoreSettings.put(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION, envMap); + return messageStoreSettings; } @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java deleted file mode 100644 index 385681446a..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.test.utils.QpidTestCase; - -public class MessageStoreCreatorTest extends QpidTestCase -{ - public void testMessageStoreCreator() - { - MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); - String type = new BDBMessageStoreFactory().getType(); - MessageStore store = messageStoreCreator.createMessageStore(type); - assertNotNull("Store of type " + type + " is not created", store); - }} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java index b19e18b204..a82bb066e2 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java @@ -122,7 +122,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase EnvironmentFacade createEnvironmentFacade() { - return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap()); + return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap(), null); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java index a05a30b459..2caf85966c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -27,7 +27,6 @@ import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.qpid.server.configuration.ConfigurationEntry; @@ -39,26 +38,24 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; -import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationConfig; public class VirtualHostTest extends QpidTestCase { - private Broker _broker; + private Broker<?> _broker; private StatisticsGatherer _statisticsGatherer; private RecovererProvider _recovererProvider; - private File _configFile; private File _bdbStorePath; - private VirtualHost _host; + private VirtualHost<?> _host; private ConfigurationEntryStore _store; @Override @@ -72,7 +69,6 @@ public class VirtualHostTest extends QpidTestCase when(taslExecutor.isTaskExecutorThread()).thenReturn(true); when(_broker.getTaskExecutor()).thenReturn(taslExecutor); - _statisticsGatherer = mock(StatisticsGatherer.class); _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis()); @@ -91,10 +87,6 @@ public class VirtualHostTest extends QpidTestCase } finally { - if (_configFile != null) - { - _configFile.delete(); - } if (_bdbStorePath != null) { FileUtils.delete(_bdbStorePath, true); @@ -103,106 +95,62 @@ public class VirtualHostTest extends QpidTestCase } } - - public void testCreateBdbVirtualHostFromConfigurationFile() - { - String hostName = getName(); - long logFileMax = 2000000; - _host = createHostFromConfiguration(hostName, logFileMax); - _host.setDesiredState(State.INITIALISING, State.ACTIVE); - assertEquals("Unexpected host name", hostName, _host.getName()); - assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType()); - assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE)); - assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); - - BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); - EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig(); - assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); - - } - - public void testCreateBdbHaVirtualHostFromConfigurationFile() + public void testCreateBdbHaVirtualHostFromConfigurationEntry() { - String hostName = getName(); - String repStreamTimeout = "2 h"; String nodeName = "node"; String groupName = "group"; String nodeHostPort = "localhost:" + findFreePort(); String helperHostPort = nodeHostPort; String durability = "NO_SYNC,SYNC,NONE"; - _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout); + String virtualHostName = getName(); + + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, groupName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, nodeHostPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, helperHostPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.DURABILITY, durability); + + messageStoreSettings.put(MessageStore.STORE_PATH, _bdbStorePath.getAbsolutePath()); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, + Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout)); + + Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put(VirtualHost.NAME, virtualHostName); + virtualHostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); + virtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); + + _host = createHost(virtualHostAttributes); _host.setDesiredState(State.INITIALISING, State.ACTIVE); - assertEquals("Unexpected host name", hostName, _host.getName()); + + assertEquals("Unexpected virtual host name", virtualHostName, _host.getName()); assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType()); - assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE)); - assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + assertEquals(messageStoreSettings, _host.getMessageStoreSettings()); BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment(); - ReplicationConfig repConfig = environment.getRepConfig(); - assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME)); - assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME)); - assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT)); - assertEquals("Unexpected JE replication nodeHostPort", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS)); - assertEquals("Unexpected JE replication nodeHostPort", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY)); - assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); - } + ReplicationConfig replicationConfig = environment.getRepConfig(); - private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children) - { - ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes, - children, _store); + assertEquals(nodeName, environment.getNodeName()); + assertEquals(groupName, environment.getGroup().getName()); + assertEquals(nodeHostPort, replicationConfig.getNodeHostPort()); + assertEquals(helperHostPort, replicationConfig.getHelperHosts()); + assertEquals(durability, environment.getConfig().getDurability().toString()); + assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); - return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker); } - private VirtualHost createHost(Map<String, Object> attributes) - { - return createHost(attributes, Collections.<UUID> emptySet()); - } - private VirtualHost createHostFromConfiguration(String hostName, long logFileMax) + private VirtualHost<?> createHost(Map<String, Object> attributes) { - String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">" - + "<store><class>" + BDBMessageStore.class.getName() + "</class>" - + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>" - + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>" - + "</store>" - + "</" + hostName + "></virtualhost></virtualhosts>"; - Map<String, Object> attributes = writeConfigAndGenerateAttributes(content); - return createHost(attributes); - } - + ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes, + Collections.<UUID>emptySet(), _store); - private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout) - { - String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">" - + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>" - + "<store><class>" + BDBMessageStore.class.getName() + "</class>" - + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>" - + "<highAvailability>" - + "<groupName>" + groupName + "</groupName>" - + "<nodeName>" + nodeName + "</nodeName>" - + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>" - + "<helperHostPort>" + helperHostPort + "</helperHostPort>" - + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>" - + "</highAvailability>" - + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>" - + "</store>" - + "</" + hostName + "></virtualhost></virtualhosts>"; - Map<String, Object> attributes = writeConfigAndGenerateAttributes(content); - return createHost(attributes); + return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker); } - private Map<String, Object> writeConfigAndGenerateAttributes(String content) - { - _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content); - Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(VirtualHost.NAME, getName()); - attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath()); - return attributes; - } }
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index cd7dd69c46..b342493c59 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -31,8 +31,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -65,16 +63,11 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private File _storePath; private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); - private VirtualHost _virtualHost = mock(VirtualHost.class); public void setUp() throws Exception { super.setUp(); - TaskExecutor taskExecutor = mock(TaskExecutor.class); - when(taskExecutor.isTaskExecutorThread()).thenReturn(true); - when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor); - _storePath = TestFileUtils.createTestDirectory("bdb", true); setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100"); @@ -302,7 +295,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase State desiredState, StateChangeListener stateChangeListener) { ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); - ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null); ref.setStateChangeListener(stateChangeListener); _nodes.put(nodeName, ref); return ref; |
