diff options
| author | Keith Wall <kwall@apache.org> | 2014-03-14 16:39:47 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-03-14 16:39:47 +0000 |
| commit | ec486999608568e37a55dc9c81d9be133d95ebc3 (patch) | |
| tree | 87d6446e97cfdca321b1faff6f24a3010df4cdff /qpid/java/bdbstore | |
| parent | db26915f9b2edfa410c094162bec78b9d2010b24 (diff) | |
| download | qpid-python-ec486999608568e37a55dc9c81d9be133d95ebc3.tar.gz | |
QPID-5624: Introduce messageStoreSettings VH attribute and move all message store related attributes into messageStoreSettings map
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1577606 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
13 files changed, 162 insertions, 97 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java index 16199d30a3..24d7513c5f 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.jmx.MBeanProvider; import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHostFactory; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; @@ -48,8 +49,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider @Override public boolean isChildManageableByMBean(ConfiguredObject child) { - return (child instanceof VirtualHost - && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE))); + return (child instanceof VirtualHost && BDBHAVirtualHostFactory.TYPE.equals(child.getType())); } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java index b2ec96f9f8..6fb84b8a4d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -23,6 +23,8 @@ import java.util.Map; import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -52,11 +54,18 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory @Override public void validateAttributes(Map<String, Object> attributes) { - validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes); - validateAttribute("haGroupName", String.class, attributes); - validateAttribute("haNodeName", String.class, attributes); - validateAttribute("haNodeAddress", String.class, attributes); - validateAttribute("haHelperAddress", String.class, attributes); + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS); + if (messageStoreSettings == null) + { + throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required."); + } + + validateAttribute(MessageStore.STORE_PATH, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_NAME, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, String.class, messageStoreSettings); } private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 35dae4b800..c8550b2114 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.server.store.berkeleydb; -import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.IntegerBinding; -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.*; -import com.sleepycat.je.Transaction; - import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -36,16 +30,32 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.*; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.EventManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.server.store.State; +import org.apache.qpid.server.store.StateManager; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMemoryMessage; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; import org.apache.qpid.server.store.berkeleydb.entry.Xid; @@ -59,6 +69,21 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; import org.apache.qpid.util.FileUtils; +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.LockConflictException; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Transaction; + /** * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. * @@ -72,8 +97,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); public static final int VERSION = 7; - public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; - private static final int LOCK_RETRY_ATTEMPTS = 5; private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; @@ -119,7 +142,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) { - _type = environmentFacadeFactory.getType();; + _type = environmentFacadeFactory.getType(); _environmentFacadeFactory = environmentFacadeFactory; _stateManager = new StateManager(_eventManager); } @@ -218,8 +241,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException { - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); + Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); _persistentSizeHighThreshold = overfullAttr == null ? -1l : overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index 8f2086a25c..e2b30f6740 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -52,12 +52,14 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi @Override public void validateAttributes(Map<String, Object> attributes) { - if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE))) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); + Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH); if(!(storePath instanceof String)) { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH +"' is required and must be of type String."); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index b784e436b9..d242790efb 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -24,8 +24,9 @@ import org.apache.qpid.server.model.VirtualHost; public interface EnvironmentFacadeFactory { + public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; - EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore); + EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore); String getType(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index 384ceba98a..7fdae6b3ee 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -26,30 +26,32 @@ import java.util.Map; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory { @SuppressWarnings("unchecked") @Override - public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) + public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore) { + String name = virtualHost.getName(); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); Map<String, String> envConfigMap = new HashMap<String, String>(); envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); - Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION); + Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION); if (environmentConfigurationAttributes instanceof Map) { envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes); } - String name = virtualHost.getName(); final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name; String storeLocation; if(isMessageStore) { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH); if(storeLocation == null) { storeLocation = defaultPath; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index cd53afe891..4df62b1d0f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.util.Map; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; @@ -32,47 +33,56 @@ import com.sleepycat.je.Durability.SyncPolicy; public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory { - + public static final String DURABILITY = "haDurability"; + public static final String GROUP_NAME = "haGroupName"; + public static final String HELPER_ADDRESS = "haHelperAddress"; + public static final String NODE_ADDRESS = "haNodeAddress"; + public static final String NODE_NAME = "haNodeName"; + public static final String REPLICATION_CONFIG = "haReplicationConfig"; + public static final String COALESCING_SYNC = "haCoalescingSync"; + public static final String DESIGNATED_PRIMARY = "haDesignatedPrimary"; + private static final int DEFAULT_NODE_PRIORITY = 1; private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); private static final boolean DEFAULT_COALESCING_SYNC = true; - - @Override - public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore) + public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore) { + final Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration() { @Override public boolean isDesignatedPrimary() { - return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false); + return convertBoolean(messageStoreSettings.get(DESIGNATED_PRIMARY), false); } @Override public boolean isCoalescingSync() { - return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC); + return convertBoolean(messageStoreSettings.get(COALESCING_SYNC), DEFAULT_COALESCING_SYNC); } @Override public String getStorePath() { - return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + return (String) messageStoreSettings.get(MessageStore.STORE_PATH); } + @SuppressWarnings("unchecked") @Override public Map<String, String> getParameters() { - return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig"); + return (Map<String, String>) messageStoreSettings.get(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION); } + @SuppressWarnings("unchecked") @Override public Map<String, String> getReplicationParameters() { - return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig"); + return (Map<String, String>) messageStoreSettings.get(REPLICATION_CONFIG); } @Override @@ -87,36 +97,35 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact return DEFAULT_NODE_PRIORITY; } - - @Override public String getName() { - return (String)virtualHost.getAttribute("haNodeName"); + return (String)messageStoreSettings.get(NODE_NAME); } @Override public String getHostPort() { - return (String)virtualHost.getAttribute("haNodeAddress"); + return (String)messageStoreSettings.get(NODE_ADDRESS); } @Override public String getHelperHostPort() { - return (String)virtualHost.getAttribute("haHelperAddress"); + return (String)messageStoreSettings.get(HELPER_ADDRESS); } @Override public String getGroupName() { - return (String)virtualHost.getAttribute("haGroupName"); + return (String)messageStoreSettings.get(GROUP_NAME); } @Override public String getDurability() { - return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability"); + String durability = (String)messageStoreSettings.get(DURABILITY); + return durability == null ? DEFAULT_DURABILITY.toString() : durability; } }; return new ReplicatedEnvironmentFacade(configuration); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index 4684358190..65830fd1c2 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -20,17 +20,18 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Collections; +import java.util.HashMap; import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class); @@ -59,16 +60,22 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; } + @Override - protected void applyStoreSpecificConfiguration(VirtualHost virtualHost) + protected VirtualHost<?> createVirtualHost(String storeLocation) { - _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + VirtualHost<?> vhost = mock(VirtualHost.class); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation); + messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE); + messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE); Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); - when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap); - when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); - when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); - + messageStoreSettings.put(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION, envMap); + when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings); + when(vhost.getName()).thenReturn("test"); + return vhost; } @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java index da34e191f7..2caf85966c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -38,7 +38,9 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -101,33 +103,31 @@ public class VirtualHostTest extends QpidTestCase String nodeHostPort = "localhost:" + findFreePort(); String helperHostPort = nodeHostPort; String durability = "NO_SYNC,SYNC,NONE"; - String hostName = getName(); + String virtualHostName = getName(); - Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); - virtualHostAttributes.put("haNodeName", nodeName); - virtualHostAttributes.put("haGroupName", groupName); - virtualHostAttributes.put("haNodeAddress", nodeHostPort); - virtualHostAttributes.put("haHelperAddress", helperHostPort); - virtualHostAttributes.put("haDurability", durability); - virtualHostAttributes.put(VirtualHost.STORE_PATH, _bdbStorePath.getAbsolutePath()); - virtualHostAttributes.put("haReplicationConfig", + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, groupName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, nodeHostPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, helperHostPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.DURABILITY, durability); + + messageStoreSettings.put(MessageStore.STORE_PATH, _bdbStorePath.getAbsolutePath()); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout)); - virtualHostAttributes.put(VirtualHost.NAME, hostName); + + Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put(VirtualHost.NAME, virtualHostName); virtualHostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); + virtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); _host = createHost(virtualHostAttributes); _host.setDesiredState(State.INITIALISING, State.ACTIVE); - assertEquals("Unexpected host name", hostName, _host.getName()); + assertEquals("Unexpected virtual host name", virtualHostName, _host.getName()); assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType()); - assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE)); - - assertEquals(nodeName, _host.getAttribute("haNodeName")); - assertEquals(groupName, _host.getAttribute("haGroupName")); - assertEquals(nodeHostPort, _host.getAttribute("haNodeAddress")); - assertEquals(helperHostPort, _host.getAttribute("haHelperAddress")); - assertEquals(durability, _host.getAttribute("haDurability")); - assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + assertEquals(messageStoreSettings, _host.getMessageStoreSettings()); BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment(); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java index b6a178ac8a..67c89718f6 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java @@ -32,6 +32,7 @@ import javax.jms.Session; import org.apache.log4j.Logger; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.test.utils.Piper; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.util.FileUtils; @@ -61,7 +62,8 @@ public class BDBBackupTest extends QpidBrokerTestCase _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName()); _backupToDir.mkdirs(); Map<String, Object> virtualHostAttributes = getBrokerConfiguration().getObjectAttributes(TEST_VHOST); - _backupFromDir = new File((String)virtualHostAttributes.get(VirtualHost.STORE_PATH)); + Map<String, Object> messageStoreSettings = (Map<String, Object>) virtualHostAttributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + _backupFromDir = new File((String)messageStoreSettings.get(MessageStore.STORE_PATH)); boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java index 3d6a2bac67..cb56a60119 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -45,6 +45,7 @@ import javax.management.openmbean.TabularDataSupport; import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; @@ -75,7 +76,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase private static final String QUEUE_NAME="myUpgradeQueue"; private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; private static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; - private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; + private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; private String _storeLocation; @@ -84,7 +85,9 @@ public class BDBUpgradeTest extends QpidBrokerTestCase { assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); Map<String, Object> virtualHostAttributes = getBrokerConfiguration().getObjectAttributes(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST); - _storeLocation = (String)virtualHostAttributes.get(VirtualHost.STORE_PATH); + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>) virtualHostAttributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + _storeLocation = (String)messageStoreSettings.get(MessageStore.STORE_PATH); //Clear the two target directories if they exist. File directory = new File(_storeLocation); @@ -102,11 +105,6 @@ public class BDBUpgradeTest extends QpidBrokerTestCase super.setUp(); } - private String getWorkDirBaseDir() - { - return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort()); - } - /** * Test that the selector applied to the DurableSubscription was successfully * transfered to the new store, and functions as expected with continued use @@ -505,7 +503,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase return send; } - + /** * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. * diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index bef35e163f..e8d18971ad 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -184,7 +184,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); final int newBdbPort = getNextAvailable(oldBdbPort + 1); - storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), "localhost", newBdbPort); + storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 1a65b095b4..4efe1967ce 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -43,8 +43,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.url.URLSyntaxException; import com.sleepycat.je.rep.ReplicationConfig; @@ -101,19 +103,22 @@ public class HATestClusterCreator _bdbHelperPort = bdbPort; } - TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); - brokerConfiguration.addJmxManagementConfiguration(); String nodeName = getNodeNameForNodeAt(bdbPort); - brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); - brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haGroupName", _groupName); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haNodeName", nodeName); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haNodeAddress", getNodeHostPortForNodeAt(bdbPort)); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haHelperAddress", getHelperHostPort()); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, _groupName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, getNodeHostPortForNodeAt(bdbPort)); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, getHelperHostPort()); Map<String, String> repSettings = new HashMap<String, String>(); repSettings.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); repSettings.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0"); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haReplicationConfig", repSettings ); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, repSettings ); + + TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); + brokerConfiguration.addJmxManagementConfiguration(); + brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); + brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); brokerPort = _testcase.getNextAvailable(bdbPort + 1); } @@ -127,7 +132,10 @@ public class HATestClusterCreator throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); } TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort); - config.setObjectAttribute("test", "haDesignatedPrimary", designatedPrimary); + @SuppressWarnings("unchecked") + Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS); + storeSetting.put(ReplicatedEnvironmentFacadeFactory.DESIGNATED_PRIMARY, designatedPrimary); + config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting); config.setSaved(false); } @@ -360,12 +368,15 @@ public class HATestClusterCreator public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) { TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved); - config.setObjectAttribute(_virtualHostName, "haNodeAddress", "localhost:" + newBdbPort); - String oldBdbHostPort = (String)config.getObjectAttributes(_virtualHostName).get("haNodeAddress"); + + @SuppressWarnings("unchecked") + Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS); + String oldBdbHostPort = (String) storeSetting.get(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS); String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); String oldHost = oldHostAndPort[0]; String newBdbHostPort = oldHost + ":" + newBdbPort; - config.setObjectAttribute(_virtualHostName, "haNodeAddress", newBdbHostPort); + storeSetting.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, newBdbHostPort); + config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting); config.setSaved(false); } |
