diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-20 18:39:44 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-20 18:39:44 +0000 |
| commit | 6266c91a9ef891c84159c99cfc7708fc2cc7d49f (patch) | |
| tree | 38cc9d7ec71534364ac2bd983d535a4106a03e91 /qpid/java/bdbstore | |
| parent | fd04bb3ba94abd9979ae820e7717fdb77d230097 (diff) | |
| download | qpid-python-6266c91a9ef891c84159c99cfc7708fc2cc7d49f.tar.gz | |
QPID-5087 : [Java Broker] Allow use of separate message stores and configuration stores
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1515914 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
7 files changed, 115 insertions, 36 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index a4383d94c4..e772498ee9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -139,6 +139,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private String _storeLocation; private Map<String, String> _envConfigMap; + private VirtualHost _virtualHost; public AbstractBDBMessageStore() { @@ -151,34 +152,58 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo _eventManager.addEventListener(eventListener, events); } - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - VirtualHost virtualHost) throws Exception + public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) throws Exception { _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = recoveryHandler; - - configure(name, virtualHost); + _virtualHost = virtualHost; } - public void configureMessageStore(String name, - MessageStoreRecoveryHandler messageRecoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { + if(_stateManager.isInState(State.INITIAL)) + { + // Is acting as a message store, but not a durable config store + _stateManager.attainState(State.INITIALISING); + } + _messageRecoveryHandler = messageRecoveryHandler; _tlogRecoveryHandler = tlogRecoveryHandler; + _virtualHost = virtualHost; + + completeInitialisation(); + } + + private void completeInitialisation() throws Exception + { + configure(_virtualHost); _stateManager.attainState(State.INITIALISED); } public synchronized void activate() throws Exception { + // check if acting as a durable config store, but not a message store + if(_stateManager.isInState(State.INITIALISING)) + { + completeInitialisation(); + } _stateManager.attainState(State.ACTIVATING); - recoverConfig(_configRecoveryHandler); - recoverMessages(_messageRecoveryHandler); - recoverQueueEntries(_tlogRecoveryHandler); + if(_configRecoveryHandler != null) + { + recoverConfig(_configRecoveryHandler); + } + if(_messageRecoveryHandler != null) + { + recoverMessages(_messageRecoveryHandler); + } + if(_tlogRecoveryHandler != null) + { + recoverQueueEntries(_tlogRecoveryHandler); + } _stateManager.attainState(State.ACTIVE); } @@ -192,23 +217,38 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo * Called after instantiation in order to configure the message store. * * - * @param name The name of the virtual host using this store - * @param virtualHost + * + * @param virtualHost The virtual host using this store * @return whether a new store environment was created or not (to indicate whether recovery is necessary) * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - public void configure(String name, VirtualHost virtualHost) throws Exception + public void configure(VirtualHost virtualHost) throws Exception { + configure(virtualHost, _messageRecoveryHandler != null); + } - + public void configure(VirtualHost virtualHost, boolean isMessageStore) throws Exception + { + String name = virtualHost.getName(); final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name; - - String storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(storeLocation == null) + String storeLocation; + if(isMessageStore) { - storeLocation = defaultPath; + storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + } + else // we are acting only as the durable config store + { + storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } } Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index 561e4fa660..fb1dc1f1d3 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -122,14 +122,14 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private Map<String, String> _repConfig; @Override - public void configure(String name, VirtualHost virtualHost) throws Exception + public void configure(VirtualHost virtualHost) throws Exception { //Mandatory configuration _groupName = getValidatedStringAttribute(virtualHost, "haGroupName"); _nodeName = getValidatedStringAttribute(virtualHost, "haNodeName"); _nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress"); _helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress"); - _name = name; + _name = virtualHost.getName(); //Optional configuration String durabilitySetting = getStringAttribute(virtualHost,"haDurability",null); @@ -157,7 +157,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess + "! Please set highAvailability.coalescingSync to false in store configuration."); } - super.configure(name, virtualHost); + super.configure(virtualHost); } @@ -260,10 +260,10 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess } @Override - public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, + public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { - super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler); + super.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler); final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index b92a97c8cb..bb3c7b108d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -77,12 +77,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), new DefaultUpgraderProvider(this, getExchangeRegistry())); - _messageStore.configureConfigStore(getName(), - configRecoverer, - virtualHost); + _messageStore.configureConfigStore( + virtualHost, configRecoverer + ); - _messageStore.configureMessageStore(getName(), - recoveryHandler, + _messageStore.configureMessageStore( + virtualHost, recoveryHandler, recoveryHandler ); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index 4eac54dd6f..d7c8b23d39 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -26,10 +26,12 @@ import java.util.List; import java.util.Map; import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; -public class BDBMessageStoreFactory implements MessageStoreFactory +public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfigurationStoreFactory { @Override @@ -39,6 +41,12 @@ public class BDBMessageStoreFactory implements MessageStoreFactory } @Override + public DurableConfigurationStore createDurableConfigurationStore() + { + return new BDBMessageStore(); + } + + @Override public MessageStore createMessageStore() { return new BDBMessageStore(); @@ -76,12 +84,25 @@ public class BDBMessageStoreFactory implements MessageStoreFactory @Override public void validateAttributes(Map<String, Object> attributes) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) + if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + { + Object storePath = attributes.get(VirtualHost.STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + +"' is required and must be of type String."); + + } + } + if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE))) { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH - +"' is required and must be of type String."); + Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH + +"' is required and must be of type String."); + } } } } diff --git a/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory b/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory new file mode 100644 index 0000000000..a822405565 --- /dev/null +++ b/qpid/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.DurableConfigurationStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 21342b5715..76b990038d 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -230,7 +230,7 @@ public class BDBMessageStoreTest extends MessageStoreTest messageStore.close(); AbstractBDBMessageStore newStore = new BDBMessageStore(); - newStore.configure("", getVirtualHostModel()); + newStore.configure(getVirtualHostModel(),true); newStore.startWithNoRecover(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java index 5ad49462ac..3f32df4b0c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java @@ -20,7 +20,6 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -34,7 +33,7 @@ public class HAMessageStoreSmokeTest extends QpidTestCase { try { - _store.configure("test", mock(VirtualHost.class)); + _store.configure(mock(VirtualHost.class)); fail("Expected an exception to be thrown"); } catch (ConfigurationException ce) |
