summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-03-14 16:39:47 +0000
committerKeith Wall <kwall@apache.org>2014-03-14 16:39:47 +0000
commitec486999608568e37a55dc9c81d9be133d95ebc3 (patch)
tree87d6446e97cfdca321b1faff6f24a3010df4cdff /qpid/java/bdbstore
parentdb26915f9b2edfa410c094162bec78b9d2010b24 (diff)
downloadqpid-python-ec486999608568e37a55dc9c81d9be133d95ebc3.tar.gz
QPID-5624: Introduce messageStoreSettings VH attribute and move all message store related attributes into messageStoreSettings map
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1577606 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java19
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java50
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java10
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java41
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java27
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java38
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java4
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java14
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java2
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java39
13 files changed, 162 insertions, 97 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
index 16199d30a3..24d7513c5f 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.jmx.MBeanProvider;
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHostFactory;
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
@@ -48,8 +49,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@Override
public boolean isChildManageableByMBean(ConfiguredObject child)
{
- return (child instanceof VirtualHost
- && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+ return (child instanceof VirtualHost && BDBHAVirtualHostFactory.TYPE.equals(child.getType()));
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
index b2ec96f9f8..6fb84b8a4d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
@@ -23,6 +23,8 @@ import java.util.Map;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -52,11 +54,18 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes);
- validateAttribute("haGroupName", String.class, attributes);
- validateAttribute("haNodeName", String.class, attributes);
- validateAttribute("haNodeAddress", String.class, attributes);
- validateAttribute("haHelperAddress", String.class, attributes);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS);
+ if (messageStoreSettings == null)
+ {
+ throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required.");
+ }
+
+ validateAttribute(MessageStore.STORE_PATH, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_NAME, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, String.class, messageStoreSettings);
}
private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 35dae4b800..c8550b2114 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.*;
-import com.sleepycat.je.Transaction;
-
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -36,16 +30,32 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.State;
+import org.apache.qpid.server.store.StateManager;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
@@ -59,6 +69,21 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.apache.qpid.util.FileUtils;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
*
@@ -72,8 +97,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
public static final int VERSION = 7;
- public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
-
private static final int LOCK_RETRY_ATTEMPTS = 5;
private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
@@ -119,7 +142,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
{
- _type = environmentFacadeFactory.getType();;
+ _type = environmentFacadeFactory.getType();
_environmentFacadeFactory = environmentFacadeFactory;
_stateManager = new StateManager(_eventManager);
}
@@ -218,8 +241,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException
{
- Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
- Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null ? -1l :
overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index 8f2086a25c..e2b30f6740 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -52,12 +52,14 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE)))
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
+ Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH);
if(!(storePath instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH
+"' is required and must be of type String.");
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index b784e436b9..d242790efb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -24,8 +24,9 @@ import org.apache.qpid.server.model.VirtualHost;
public interface EnvironmentFacadeFactory
{
+ public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
- EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore);
+ EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore);
String getType();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index 384ceba98a..7fdae6b3ee 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -26,30 +26,32 @@ import java.util.Map;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore)
{
+ String name = virtualHost.getName();
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION);
+ Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION);
if (environmentConfigurationAttributes instanceof Map)
{
envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
- String name = virtualHost.getName();
final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name;
String storeLocation;
if(isMessageStore)
{
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
if(storeLocation == null)
{
storeLocation = defaultPath;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index cd53afe891..4df62b1d0f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Map;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
@@ -32,47 +33,56 @@ import com.sleepycat.je.Durability.SyncPolicy;
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
-
+ public static final String DURABILITY = "haDurability";
+ public static final String GROUP_NAME = "haGroupName";
+ public static final String HELPER_ADDRESS = "haHelperAddress";
+ public static final String NODE_ADDRESS = "haNodeAddress";
+ public static final String NODE_NAME = "haNodeName";
+ public static final String REPLICATION_CONFIG = "haReplicationConfig";
+ public static final String COALESCING_SYNC = "haCoalescingSync";
+ public static final String DESIGNATED_PRIMARY = "haDesignatedPrimary";
+
private static final int DEFAULT_NODE_PRIORITY = 1;
private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
ReplicaAckPolicy.SIMPLE_MAJORITY);
private static final boolean DEFAULT_COALESCING_SYNC = true;
-
-
@Override
- public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore)
{
+ final Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
{
@Override
public boolean isDesignatedPrimary()
{
- return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false);
+ return convertBoolean(messageStoreSettings.get(DESIGNATED_PRIMARY), false);
}
@Override
public boolean isCoalescingSync()
{
- return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC);
+ return convertBoolean(messageStoreSettings.get(COALESCING_SYNC), DEFAULT_COALESCING_SYNC);
}
@Override
public String getStorePath()
{
- return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ return (String) messageStoreSettings.get(MessageStore.STORE_PATH);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, String> getParameters()
{
- return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig");
+ return (Map<String, String>) messageStoreSettings.get(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, String> getReplicationParameters()
{
- return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig");
+ return (Map<String, String>) messageStoreSettings.get(REPLICATION_CONFIG);
}
@Override
@@ -87,36 +97,35 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
return DEFAULT_NODE_PRIORITY;
}
-
-
@Override
public String getName()
{
- return (String)virtualHost.getAttribute("haNodeName");
+ return (String)messageStoreSettings.get(NODE_NAME);
}
@Override
public String getHostPort()
{
- return (String)virtualHost.getAttribute("haNodeAddress");
+ return (String)messageStoreSettings.get(NODE_ADDRESS);
}
@Override
public String getHelperHostPort()
{
- return (String)virtualHost.getAttribute("haHelperAddress");
+ return (String)messageStoreSettings.get(HELPER_ADDRESS);
}
@Override
public String getGroupName()
{
- return (String)virtualHost.getAttribute("haGroupName");
+ return (String)messageStoreSettings.get(GROUP_NAME);
}
@Override
public String getDurability()
{
- return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability");
+ String durability = (String)messageStoreSettings.get(DURABILITY);
+ return durability == null ? DEFAULT_DURABILITY.toString() : durability;
}
};
return new ReplicatedEnvironmentFacade(configuration);
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
index 4684358190..65830fd1c2 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -20,17 +20,18 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.when;
-
public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
{
private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class);
@@ -59,16 +60,22 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB
return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE;
}
+
@Override
- protected void applyStoreSpecificConfiguration(VirtualHost virtualHost)
+ protected VirtualHost<?> createVirtualHost(String storeLocation)
{
- _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+ _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+ VirtualHost<?> vhost = mock(VirtualHost.class);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation);
+ messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE);
+ messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE);
Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE);
- when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap);
- when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE);
- when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE);
-
+ messageStoreSettings.put(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION, envMap);
+ when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
+ when(vhost.getName()).thenReturn("test");
+ return vhost;
}
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
index da34e191f7..2caf85966c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -38,7 +38,9 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -101,33 +103,31 @@ public class VirtualHostTest extends QpidTestCase
String nodeHostPort = "localhost:" + findFreePort();
String helperHostPort = nodeHostPort;
String durability = "NO_SYNC,SYNC,NONE";
- String hostName = getName();
+ String virtualHostName = getName();
- Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
- virtualHostAttributes.put("haNodeName", nodeName);
- virtualHostAttributes.put("haGroupName", groupName);
- virtualHostAttributes.put("haNodeAddress", nodeHostPort);
- virtualHostAttributes.put("haHelperAddress", helperHostPort);
- virtualHostAttributes.put("haDurability", durability);
- virtualHostAttributes.put(VirtualHost.STORE_PATH, _bdbStorePath.getAbsolutePath());
- virtualHostAttributes.put("haReplicationConfig",
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, groupName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, nodeHostPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, helperHostPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.DURABILITY, durability);
+
+ messageStoreSettings.put(MessageStore.STORE_PATH, _bdbStorePath.getAbsolutePath());
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG,
Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout));
- virtualHostAttributes.put(VirtualHost.NAME, hostName);
+
+ Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put(VirtualHost.NAME, virtualHostName);
virtualHostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
+ virtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
_host = createHost(virtualHostAttributes);
_host.setDesiredState(State.INITIALISING, State.ACTIVE);
- assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected virtual host name", virtualHostName, _host.getName());
assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
- assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
-
- assertEquals(nodeName, _host.getAttribute("haNodeName"));
- assertEquals(groupName, _host.getAttribute("haGroupName"));
- assertEquals(nodeHostPort, _host.getAttribute("haNodeAddress"));
- assertEquals(helperHostPort, _host.getAttribute("haHelperAddress"));
- assertEquals(durability, _host.getAttribute("haDurability"));
- assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ assertEquals(messageStoreSettings, _host.getMessageStoreSettings());
BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
index b6a178ac8a..67c89718f6 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
@@ -32,6 +32,7 @@ import javax.jms.Session;
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.test.utils.Piper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.FileUtils;
@@ -61,7 +62,8 @@ public class BDBBackupTest extends QpidBrokerTestCase
_backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName());
_backupToDir.mkdirs();
Map<String, Object> virtualHostAttributes = getBrokerConfiguration().getObjectAttributes(TEST_VHOST);
- _backupFromDir = new File((String)virtualHostAttributes.get(VirtualHost.STORE_PATH));
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) virtualHostAttributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ _backupFromDir = new File((String)messageStoreSettings.get(MessageStore.STORE_PATH));
boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory();
assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir);
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 3d6a2bac67..cb56a60119 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -45,6 +45,7 @@ import javax.management.openmbean.TabularDataSupport;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -75,7 +76,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
private static final String QUEUE_NAME="myUpgradeQueue";
private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
private static final String PRIORITY_QUEUE_NAME="myPriorityQueue";
- private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
+ private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
private String _storeLocation;
@@ -84,7 +85,9 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
{
assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
Map<String, Object> virtualHostAttributes = getBrokerConfiguration().getObjectAttributes(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST);
- _storeLocation = (String)virtualHostAttributes.get(VirtualHost.STORE_PATH);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) virtualHostAttributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ _storeLocation = (String)messageStoreSettings.get(MessageStore.STORE_PATH);
//Clear the two target directories if they exist.
File directory = new File(_storeLocation);
@@ -102,11 +105,6 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
super.setUp();
}
- private String getWorkDirBaseDir()
- {
- return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
- }
-
/**
* Test that the selector applied to the DurableSubscription was successfully
* transfered to the new store, and functions as expected with continued use
@@ -505,7 +503,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
return send;
}
-
+
/**
* Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
*
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index bef35e163f..e8d18971ad 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -184,7 +184,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
final int newBdbPort = getNextAvailable(oldBdbPort + 1);
- storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), "localhost", newBdbPort);
+ storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
_clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 1a65b095b4..4efe1967ce 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -43,8 +43,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.url.URLSyntaxException;
import com.sleepycat.je.rep.ReplicationConfig;
@@ -101,19 +103,22 @@ public class HATestClusterCreator
_bdbHelperPort = bdbPort;
}
- TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
- brokerConfiguration.addJmxManagementConfiguration();
String nodeName = getNodeNameForNodeAt(bdbPort);
- brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
- brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort);
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haGroupName", _groupName);
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haNodeName", nodeName);
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haNodeAddress", getNodeHostPortForNodeAt(bdbPort));
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haHelperAddress", getHelperHostPort());
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, _groupName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, getNodeHostPortForNodeAt(bdbPort));
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, getHelperHostPort());
Map<String, String> repSettings = new HashMap<String, String>();
repSettings.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
repSettings.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0");
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haReplicationConfig", repSettings );
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, repSettings );
+
+ TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
+ brokerConfiguration.addJmxManagementConfiguration();
+ brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
+ brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
brokerPort = _testcase.getNextAvailable(bdbPort + 1);
}
@@ -127,7 +132,10 @@ public class HATestClusterCreator
throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
}
TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort);
- config.setObjectAttribute("test", "haDesignatedPrimary", designatedPrimary);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ storeSetting.put(ReplicatedEnvironmentFacadeFactory.DESIGNATED_PRIMARY, designatedPrimary);
+ config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting);
config.setSaved(false);
}
@@ -360,12 +368,15 @@ public class HATestClusterCreator
public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
{
TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved);
- config.setObjectAttribute(_virtualHostName, "haNodeAddress", "localhost:" + newBdbPort);
- String oldBdbHostPort = (String)config.getObjectAttributes(_virtualHostName).get("haNodeAddress");
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ String oldBdbHostPort = (String) storeSetting.get(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS);
String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
String oldHost = oldHostAndPort[0];
String newBdbHostPort = oldHost + ":" + newBdbPort;
- config.setObjectAttribute(_virtualHostName, "haNodeAddress", newBdbHostPort);
+ storeSetting.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, newBdbHostPort);
+ config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting);
config.setSaved(false);
}