summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-03-31 15:37:34 +0000
committerAlex Rudyy <orudyy@apache.org>2014-03-31 15:37:34 +0000
commit6de63bce6e61011a921337d9c3880e199f21c94c (patch)
tree539f0c2471e185edb5eb2ed5a14d5c69b0ef42db /qpid/java/bdbstore/src
parent4e8e65d2a2569aee45f20a09f4f34c6abcf81f59 (diff)
downloadqpid-python-6de63bce6e61011a921337d9c3880e199f21c94c.tar.gz
QPID-5653: Open databases and upgrade on opening of configuration/message stores
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1583351 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java7
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java18
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java127
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java10
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java13
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java33
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java9
10 files changed, 159 insertions, 70 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
index f13e4dd08b..a5eac25968 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
@@ -104,4 +104,11 @@ public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord
result = 31 * result + (_type != null ? _type.hashCode() : 0);
return result;
}
+
+ @Override
+ public String toString()
+ {
+ return "BDBConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", name=" + (_attributes == null ? null : _attributes.get("name")) + ", parents=" + _parents + "]";
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 4fe41d512a..aae0a56a40 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.store.berkeleydb;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,9 @@ package org.apache.qpid.server.store.berkeleydb;
* under the License.
*
*/
+package org.apache.qpid.server.store.berkeleydb;
+import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -65,10 +66,11 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
_messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED());
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>(virtualHost.getMessageStoreSettings());
+ messageStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true);
- _messageStore.openConfigurationStore(virtualHost.getName(), messageStoreSettings);
- _messageStore.openMessageStore(virtualHost.getName(), messageStoreSettings);
+ _messageStore.openConfigurationStore(virtualHost, messageStoreSettings);
+ _messageStore.openMessageStore(virtualHost, messageStoreSettings);
getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
@@ -96,15 +98,17 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
{
_messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+ DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this);
+
DurableConfigurationRecoverer configRecoverer =
new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger());
- _messageStore.recoverConfigurationStore(getModel(), configRecoverer);
+ upgraderProvider, getEventLogger());
+ _messageStore.recoverConfigurationStore(configRecoverer);
initialiseModel();
VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
- _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler);
+ _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
attainActivation();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 9f8e5410a7..8aac9a6247 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
@@ -69,6 +70,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.util.FileUtils;
import com.sleepycat.bind.tuple.ByteBinding;
@@ -152,39 +154,43 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
if (_configurationStoreOpen.compareAndSet(false, true))
{
if (_environmentFacade == null)
{
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, storeSettings);
+ String[] databaseNames = null;
+ if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
+ {
+ databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+ System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
+ System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+ }
+ else
+ {
+ databaseNames = CONFIGURATION_STORE_DATABASE_NAMES;
+ }
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames));
+ }
+ else
+ {
+ throw new IllegalStateException("The database have been already opened as message store");
}
}
}
@Override
- public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
checkConfigurationStoreOpen();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- try
- {
- new Upgrader(_environmentFacade.getEnvironment(), parent).upgradeIfNecessary();
- _environmentFacade.openDatabases(dbConfig, CONFIGURATION_STORE_DATABASE_NAMES);
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot configure store", e);
- }
+
recoverConfig(recoveryHandler);
}
@Override
- public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) throws StoreException
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) throws StoreException
{
if (_messageStoreOpen.compareAndSet(false, true))
{
@@ -204,34 +210,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (_environmentFacade == null)
{
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, messageStoreSettings);
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask());
}
- _committer = _environmentFacade.createCommitter(virtualHostName);
+ _committer = _environmentFacade.createCommitter(parent.getName());
_committer.start();
-
}
}
@Override
- public synchronized void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
+ public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
{
checkMessageStoreOpen();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- try
- {
- new Upgrader(_environmentFacade.getEnvironment(), parent).upgradeIfNecessary();
- _environmentFacade.openDatabases(dbConfig, MESSAGE_STORE_DATABASE_NAMES);
- _totalStoreSize = getSizeOnDisk();
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot upgrade message store or open datatbases", e);
- }
-
if(messageRecoveryHandler != null)
{
recoverMessages(messageRecoveryHandler);
@@ -1843,4 +1834,72 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return _environmentFacade.getOpenDatabase(XID_DB_NAME);
}
+ class UpgradeTask implements EnvironmentFacadeTask
+ {
+
+ private ConfiguredObject<?> _parent;
+
+ public UpgradeTask(ConfiguredObject<?> parent)
+ {
+ _parent = parent;
+ }
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary();
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot upgrade store", e);
+ }
+ }
+ }
+
+ class OpenDatabasesTask implements EnvironmentFacadeTask
+ {
+ private String[] _names;
+
+ public OpenDatabasesTask(String[] names)
+ {
+ _names = names;
+ }
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ facade.openDatabases(dbConfig, _names);
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot open databases", e);
+ }
+ }
+
+ }
+
+ class DiskSpaceTask implements EnvironmentFacadeTask
+ {
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize();
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot evaluate disk store size", e);
+ }
+ }
+
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index fd064d9b0e..2e02a6cfed 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -26,8 +26,12 @@ public interface EnvironmentFacadeFactory
{
public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
- EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map<String, Object> storeSettings);
+ EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings, EnvironmentFacadeTask... initialisationTasks);
String getType();
+ public static interface EnvironmentFacadeTask
+ {
+ void execute(EnvironmentFacade facade);
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 8117ca1a9a..6065be5fa9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -42,7 +43,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
private Environment _environment;
- public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
+ public StandardEnvironmentFacade(String storePath, Map<String, String> attributes, EnvironmentFacadeTask[] initialisationTasks)
{
_storePath = storePath;
@@ -74,6 +75,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
_environment = new Environment(environmentPath, envConfig);
+ if (initialisationTasks != null)
+ {
+ for (EnvironmentFacadeTask task : initialisationTasks)
+ {
+ task.execute(this);
+ }
+ }
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index cc38b799a6..9506b1c20a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import java.io.File;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.store.MessageStore;
public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
@@ -32,25 +30,18 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map<String, Object> messageStoreSettings)
+ public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks)
{
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + virtualHostName;
-
Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION);
if (environmentConfigurationAttributes instanceof Map)
{
envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
-
- return new StandardEnvironmentFacade(storeLocation, envConfigMap);
+ return new StandardEnvironmentFacade(storeLocation, envConfigMap, initialisationTasks);
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 3e15e9bdcc..b8192ea741 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -40,12 +40,14 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -110,7 +112,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
*/
put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
/**
- * Parameter changed from default (off) to allow the Environment to start in the
+ * Parameter changed from default (off) to allow the Environment to start in the
* UNKNOWN state when the majority is not available.
*/
put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
@@ -148,7 +150,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
+ private AtomicBoolean _initialised;
+ private EnvironmentFacadeTask[] _initialisationTasks;
+
+ public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
_environmentDirectory = new File(configuration.getStorePath());
if (!_environmentDirectory.exists())
@@ -160,6 +165,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ _initialised = new AtomicBoolean();
+ _initialisationTasks = initialisationTasks;
_configuration = configuration;
_durability = Durability.parse(_configuration.getDurability());
@@ -393,9 +400,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
_joinTime = System.currentTimeMillis();
}
+
if (state == ReplicatedEnvironment.State.MASTER)
{
- reopenDatabases();
+ onMasterStateChange();
}
}
@@ -413,6 +421,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_lastKnownEnvironmentState = state;
}
+ private void onMasterStateChange()
+ {
+ reopenDatabases();
+
+ if (_initialised.compareAndSet(false, true))
+ {
+ if (_initialisationTasks != null)
+ {
+ for (EnvironmentFacadeTask task : _initialisationTasks)
+ {
+ task.execute(ReplicatedEnvironmentFacade.this);
+ }
+ }
+ }
+ }
+
private void reopenDatabases()
{
if (_state.get() == State.OPEN)
@@ -992,7 +1016,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
nodeState = ReplicatedEnvironment.State.UNKNOWN;
}
-
+
currentGroupState.put(node.getName(), nodeState);
return null;
}
@@ -1079,5 +1103,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index c6b3e48cf8..8216cfc484 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -47,7 +47,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
private static final boolean DEFAULT_COALESCING_SYNC = true;
@Override
- public EnvironmentFacade createEnvironmentFacade(String virtualHostName, final Map<String, Object> messageStoreSettings)
+ public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks)
{
ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
{
@@ -126,7 +126,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
return durability == null ? DEFAULT_DURABILITY.toString() : durability;
}
};
- return new ReplicatedEnvironmentFacade(configuration);
+ return new ReplicatedEnvironmentFacade(configuration, initialisationTasks);
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
index b19e18b204..a82bb066e2 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
@@ -122,7 +122,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase
EnvironmentFacade createEnvironmentFacade()
{
- return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+ return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap(), null);
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index cd7dd69c46..b342493c59 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -31,8 +31,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -65,16 +63,11 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private File _storePath;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
- private VirtualHost _virtualHost = mock(VirtualHost.class);
public void setUp() throws Exception
{
super.setUp();
- TaskExecutor taskExecutor = mock(TaskExecutor.class);
- when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
- when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
-
_storePath = TestFileUtils.createTestDirectory("bdb", true);
setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
@@ -302,7 +295,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
State desiredState, StateChangeListener stateChangeListener)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null);
ref.setStateChangeListener(stateChangeListener);
_nodes.put(nodeName, ref);
return ref;