summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid')
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java2
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java7
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java73
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java20
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java1
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java71
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java38
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java158
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java21
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js4
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js2
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html26
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java78
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java45
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java14
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java117
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java15
22 files changed, 529 insertions, 185 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
index 7146af364e..8c05602967 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -130,7 +130,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public boolean getCoalescingSync() throws IOException, JMException
{
- return _virtualHostNode.isCoalescingSync();
+ return true;
}
@Override
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index 439af259ab..e5829c519a 100644
--- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -119,13 +119,6 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
}
- public void testCoalescingSync() throws Exception
- {
- when(_virtualHostNode.isCoalescingSync()).thenReturn(true);
-
- assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
- }
-
public void testNodeState() throws Exception
{
when(_virtualHostNode.getRole()).thenReturn(TEST_NODE_STATE);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
deleted file mode 100644
index adc73c9bd9..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.util.Map;
-
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-
-@ManagedObject( category = false, type = "BDB_HA" )
-public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost>
-{
- public static final String TYPE = "BDB_HA";
-
- private final BDBMessageStore _messageStore;
- private MessageStoreLogSubject _messageStoreLogSubject;
-
- @ManagedObjectFactoryConstructor
- protected BDBHAVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
- {
- super(attributes, virtualHostNode);
-
- _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore();
- _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
- }
-
- @Override
- protected void initialiseStorage()
- {
- }
-
- @Override
- public DurableConfigurationStore getDurableConfigurationStore()
- {
- return _messageStore;
- }
-
- @Override
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- @Override
- protected MessageStoreLogSubject getMessageStoreLogSubject()
- {
- return _messageStoreLogSubject;
- }
-
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 5d9c2012a9..2675793b36 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -398,7 +398,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
tx = null;
try
{
- tx = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ tx = _environmentFacade.beginTransaction();
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
@@ -542,7 +542,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction();
storeConfiguredObjectEntry(txn, configuredObject);
txn.commit();
txn = null;
@@ -569,7 +569,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction();
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
for(ConfiguredObjectRecord record : objects)
@@ -606,7 +606,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction();
for(ConfiguredObjectRecord record : records)
{
update(createIfNecessary, record, txn);
@@ -1290,8 +1290,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(
- null, null);
+ txn = _environmentFacade.beginTransaction();
}
catch (DatabaseException e)
{
@@ -1329,14 +1328,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private BDBTransaction() throws StoreException
{
- try
- {
- _txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e);
- }
+ _txn = _environmentFacade.beginTransaction();
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index c9341dce02..6550a9122e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -24,7 +24,6 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.sun.org.apache.xalan.internal.xsltc.runtime.BasisLibrary;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreFuture;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index 144ab83238..e51a610a26 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -29,6 +29,7 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
public interface EnvironmentFacade
{
@@ -48,6 +49,8 @@ public interface EnvironmentFacade
Database getOpenDatabase(String name);
+ Transaction beginTransaction();
+
void commit(com.sleepycat.je.Transaction tx);
DatabaseException handleDatabaseException(String contextMessage, DatabaseException e);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index b11fb6e873..643cd9ae70 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -32,6 +32,7 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
public class StandardEnvironmentFacade implements EnvironmentFacade
{
@@ -84,6 +85,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
}
+
+ @Override
+ public Transaction beginTransaction()
+ {
+ return _environment.beginTransaction(null, null);
+ }
+
@Override
public void commit(com.sleepycat.je.Transaction tx)
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
index 38fdf34196..6ec5067d01 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
@@ -48,7 +48,7 @@ public class DatabasePinger
Transaction txn = null;
try
{
- txn = facade.getEnvironment().beginTransaction(null, null);
+ txn = facade.beginTransaction();
db.put(txn, key, value);
txn.commit();
txn = null;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 3eabaf1c73..17594aa730 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -54,9 +54,12 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NetworkRestore;
@@ -96,6 +99,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
private static final int RESTART_TRY_LIMIT = 3;
+ static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
+ static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
+
@SuppressWarnings("serial")
private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
{{
@@ -135,8 +141,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public static final String TYPE = "BDB-HA";
private final ReplicatedEnvironmentConfiguration _configuration;
- private final Durability _durability;
- private final Boolean _coalescingSync;
private final String _prettyGroupNodeName;
private final File _environmentDirectory;
@@ -153,6 +157,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+ private volatile SyncPolicy _localTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
+ private volatile SyncPolicy _remoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
@@ -170,8 +176,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_initialisationTasks = initialisationTasks;
_configuration = configuration;
- _durability = Durability.parse(_configuration.getDurability());
- _coalescingSync = _configuration.isCoalescingSync();
_prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
// we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
@@ -185,6 +189,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
@Override
+ public Transaction beginTransaction()
+ {
+ try
+ {
+ Durability durability = getDurability();
+ TransactionConfig transactionConfig = new TransactionConfig();
+ transactionConfig.setDurability(durability);
+ return _environment.beginTransaction(null, transactionConfig);
+ }
+ catch(DatabaseException e)
+ {
+ throw handleDatabaseException("Failure to start transaction", e);
+ }
+ }
+
+ @Override
public void commit(final Transaction tx)
{
try
@@ -491,14 +511,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return (String)_configuration.getHelperHostPort();
}
- public String getDurability()
+ public Durability getDurability()
{
- return _durability.toString();
+ SyncPolicy localSync = getLocalTransactionSyncronizationPolicy();
+ SyncPolicy replicaSync = getRemoteTransactionSyncronizationPolicy();
+ return new Durability(localSync, replicaSync, ReplicaAckPolicy.SIMPLE_MAJORITY);
}
public boolean isCoalescingSync()
{
- return _coalescingSync;
+ return true;
}
public String getNodeState()
@@ -883,6 +905,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
boolean designatedPrimary = _configuration.isDesignatedPrimary();
int priority = _configuration.getPriority();
int quorumOverride = _configuration.getQuorumOverride();
+ Durability durability = getDurability();
if (LOGGER.isInfoEnabled())
{
@@ -892,8 +915,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("Node name " + _configuration.getName());
LOGGER.info("Node host port " + hostPort);
LOGGER.info("Helper host port " + helperHostPort);
- LOGGER.info("Durability " + _durability);
- LOGGER.info("Coalescing sync " + _coalescingSync);
+ LOGGER.info("Durability " + durability);
LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
LOGGER.info("Node priority " + priority);
LOGGER.info("Quorum override " + quorumOverride);
@@ -929,7 +951,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
- envConfig.setDurability(_durability);
+ envConfig.setDurability(durability);
for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
{
@@ -1015,14 +1037,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public Committer createCommitter(String name)
{
- if (_coalescingSync)
- {
- return new CoalescingCommiter(name, this);
- }
- else
- {
- return Committer.IMMEDIATE_FUTURE_COMMITTER;
- }
+ return new CoalescingCommiter(name, this);
}
NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
@@ -1060,6 +1075,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ public SyncPolicy getLocalTransactionSyncronizationPolicy()
+ {
+ return _localTransactionSyncronizationPolicy;
+ }
+
+ public SyncPolicy getRemoteTransactionSyncronizationPolicy()
+ {
+ return _remoteTransactionSyncronizationPolicy;
+ }
+
+ public void setLocalTransactionSyncronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy)
+ {
+ _localTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy;
+ }
+
+ public void setRemoteTransactionSyncronizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy)
+ {
+ _remoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy;
+ }
+
private void populateExistingRemoteReplicationNodes()
{
ReplicationGroup group = _environment.getGroup();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
new file mode 100644
index 0000000000..3c560de429
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.berkeleydb;
+
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>
+{
+ String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSyncronizationPolicy";
+ String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSyncronizationPolicy";
+
+ @ManagedAttribute( defaultValue = "NO_SYNC")
+ String getLocalTransactionSyncronizationPolicy();
+
+ @ManagedAttribute( defaultValue = "NO_SYNC")
+ String getRemoteTransactionSyncronizationPolicy();
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
new file mode 100644
index 0000000000..682f03bf7e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.berkeleydb;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+
+import com.sleepycat.je.Durability.SyncPolicy;
+
+@ManagedObject( category = false, type = "BDB_HA" )
+public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostImpl> implements BDBHAVirtualHost<BDBHAVirtualHostImpl>
+{
+ public static final String TYPE = "BDB_HA";
+
+ private final BDBMessageStore _messageStore;
+ private MessageStoreLogSubject _messageStoreLogSubject;
+
+ @ManagedAttributeField(afterSet="setLocalTransactionSyncronizationPolicyOnEnvironment")
+ private String _localTransactionSyncronizationPolicy;
+
+ @ManagedAttributeField(afterSet="setRemoteTransactionSyncronizationPolicyOnEnvironment")
+ private String _remoteTransactionSyncronizationPolicy;
+
+ @ManagedObjectFactoryConstructor
+ public BDBHAVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
+ {
+ super(attributes, virtualHostNode);
+
+ _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore();
+ _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
+ }
+
+ @Override
+ protected void initialiseStorage()
+ {
+ }
+
+ @Override
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return _messageStore;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ @Override
+ protected MessageStoreLogSubject getMessageStoreLogSubject()
+ {
+ return _messageStoreLogSubject;
+ }
+
+ @Override
+ public String getLocalTransactionSyncronizationPolicy()
+ {
+ return _localTransactionSyncronizationPolicy;
+ }
+
+ @Override
+ public String getRemoteTransactionSyncronizationPolicy()
+ {
+ return _remoteTransactionSyncronizationPolicy;
+ }
+
+ @Override
+ public void onOpen()
+ {
+ super.onOpen();
+ setRemoteTransactionSyncronizationPolicyOnEnvironment();
+ setLocalTransactionSyncronizationPolicyOnEnvironment();
+ }
+
+ @Override
+ protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+ {
+ super.validateChange(proxyForValidation, changedAttributes);
+
+ if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY))
+ {
+ String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSyncronizationPolicy();
+ validateTransactionSynchronizationPolicy(policy);
+ }
+
+ if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY))
+ {
+ String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSyncronizationPolicy();
+ validateTransactionSynchronizationPolicy(policy);
+ }
+ }
+
+ private void validateTransactionSynchronizationPolicy(String policy)
+ {
+ try
+ {
+ SyncPolicy.valueOf(policy);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalArgumentException("Invalid transaction syncronization policy '" + policy + "'. " + e.getMessage());
+ }
+ }
+
+ protected void setLocalTransactionSyncronizationPolicyOnEnvironment()
+ {
+ ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+ if (facade != null)
+ {
+ facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSyncronizationPolicy()));
+ }
+ }
+
+ protected void setRemoteTransactionSyncronizationPolicyOnEnvironment()
+ {
+ ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+ if (facade != null)
+ {
+ facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSyncronizationPolicy()));
+ }
+ }
+
+ private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
+ {
+ return (ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade();
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
index 0d0292ce58..9fdaccb949 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
@@ -49,12 +49,9 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
@ManagedAttribute(mandatory=true)
String getHelperAddress();
- @ManagedAttribute(defaultValue = "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY")
+ @DerivedAttribute
String getDurability();
- @ManagedAttribute(defaultValue = "true")
- boolean isCoalescingSync();
-
@ManagedAttribute(defaultValue = "false")
boolean isDesignatedPrimary();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index e978ab6c0b..11a58b6167 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -97,12 +97,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@ManagedAttributeField
private String _address;
- @ManagedAttributeField
- private String _durability;
-
- @ManagedAttributeField
- private boolean _coalescingSync;
-
@ManagedAttributeField(afterSet="postSetDesignatedPrimary")
private boolean _designatedPrimary;
@@ -183,13 +177,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@Override
public String getDurability()
{
- return _durability;
- }
-
- @Override
- public boolean isCoalescingSync()
- {
- return _coalescingSync;
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
+ if (environmentFacade != null)
+ {
+ return environmentFacade.getDurability().toString();
+ }
+ return null;
}
@Override
@@ -591,7 +584,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
catch (TimeoutException e)
{
- LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become effective once the JE task thread is free.");
+ LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _quorumOverride + " will become effective once the JE task thread is free.");
}
catch (InterruptedException e)
{
diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js
index 269bb402c5..02996046e3 100644
--- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js
+++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js
@@ -43,8 +43,8 @@ define(["dojo/_base/xhr",
"dojo/domReady!"],
function (xhr, array, event, lang, win, dom, domConstruct, registry, parser, json, query, Memory, ObjectStore, template)
{
- var fields = [ "storePath", "name", "groupName", "address", "durability",
- "coalescingSync", "designatedPrimary", "priority", "quorumOverride"];
+ var fields = [ "storePath", "name", "groupName", "address",
+ "designatedPrimary", "priority", "quorumOverride"];
var bdbHaNodeEditor =
{
diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
index 9dbebc9d7d..da63e031e6 100644
--- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
+++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
@@ -35,7 +35,7 @@ define(["dojo/_base/xhr",
function (xhr, lang, connect, parser, json, entities, query, json, registry, EnhancedGrid, UpdatableStore, UserPreferences, edit)
{
var priorityNames = {'_0': 'Never', '_1': 'Default', '_2': 'Normal', '_3': 'High'};
- var nodeFields = ["storePath", "groupName", "role", "address", "coalescingSync", "designatedPrimary", "durability", "priority", "quorumOverride"];
+ var nodeFields = ["storePath", "groupName", "role", "address", "designatedPrimary", "durability", "priority", "quorumOverride"];
function findNode(nodeClass, containerNode)
{
diff --git a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html
index 79ab5155c3..355a9ef62a 100644
--- a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html
+++ b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html
@@ -79,32 +79,6 @@
</td>
</tr>
<tr>
- <td class="tableContainer-labelCell" style="width: 200px;"><strong>Durability: </strong></td>
- <td class="tableContainer-valueCell">
- <input type="text" id="editBDBHANode.durability"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'durability',
- disabled: true,
- required: false,
- placeHolder: 'NO_SYNC,NO_SYNC,SIMPLE_MAJORITY',
- title: 'Enter node durability settings',
- pattern: '^(SYN|NO_SYNC|WRITE_NO_SYNC),(SYN|NO_SYNC|WRITE_NO_SYNC),(SIMPLE_MAJORITY|ALL|NONE)*$'" />
- </td>
- </tr>
- <tr>
- <td class="tableContainer-labelCell" style="width: 200px;"><strong>Coalescing sync: </strong></td>
- <td class="tableContainer-valueCell">
- <input type="checkbox" id="editBDBHANode.coalescingSync" checked="checked"
- data-dojo-type="dijit/form/CheckBox"
- data-dojo-props="
- name: 'coalescingSync',
- required: false,
- disabled: true,
- title: 'Enable coalescing sync mode for a better performance: when many transactions are synced to disk at the same time'" />
- </td>
- </tr>
- <tr>
<td class="tableContainer-labelCell" style="width: 200px;"><strong>Allow this node to operate solo: </strong></td>
<td class="tableContainer-valueCell">
<input type="checkbox" id="editBDBHANode.designatedPrimary" checked="checked"
diff --git a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html
index 579ec12608..792305aa8e 100644
--- a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html
+++ b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html
@@ -36,10 +36,6 @@
<div class="address" style="float:left;">N/A</div>
</div>
<div style="clear:both">
- <div class="formLabel-labelCell" style="float:left; width: 200px;">Coalescing Sync:</div>
- <div class="coalescingSync" style="float:left;">N/A</div>
- </div>
- <div style="clear:both">
<div class="formLabel-labelCell" style="float:left; width: 200px;">Durability:</div>
<div class="durability" style="float:left;">N/A</div>
</div>
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 4399077a46..563665917c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -51,6 +51,8 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
@@ -127,7 +129,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
String groupName = "group";
String nodeHostPort = "localhost:" + findFreePort();
String helperHostPort = nodeHostPort;
- String durability = "NO_SYNC,SYNC,NONE";
UUID id = UUID.randomUUID();
Map<String, Object> attributes = new HashMap<String, Object>();
@@ -137,7 +138,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
attributes.put(BDBHAVirtualHostNode.ADDRESS, nodeHostPort);
attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperHostPort);
- attributes.put(BDBHAVirtualHostNode.DURABILITY, durability);
attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath);
attributes.put(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION,
Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout));
@@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertEquals(nodeHostPort, replicationConfig.getNodeHostPort());
assertEquals(helperHostPort, replicationConfig.getHelperHosts());
- assertEquals(durability, environment.getConfig().getDurability().toString());
+ assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
@@ -426,6 +426,78 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertNull("Remote node " + replica.getName() + " is not found", findRemoteNode( master, replica.getName()));
}
+
+ public void testSetSynchronizationPolicyAttributesOnVirtualHost() throws Exception
+ {
+ int node1PortNumber = findFreePort();
+ String helperAddress = "localhost:" + node1PortNumber;
+ String groupName = "group";
+
+ Map<String, Object> nodeAttributes = new HashMap<String, Object>();
+ nodeAttributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ nodeAttributes.put(BDBHAVirtualHostNode.NAME, "node1");
+ nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+ nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+ BDBHAVirtualHostNode<?> node = createHaVHN(nodeAttributes);
+
+ final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1);
+ node.addChangeListener(new NoopConfigurationChangeListener()
+ {
+ @Override
+ public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
+ {
+ if (child instanceof VirtualHost)
+ {
+ child.addChangeListener(this);
+ virtualHostAddedLatch.countDown();
+ }
+ }
+ });
+
+ node.start();
+ assertNodeRole(node, "MASTER", "REPLICA");
+ assertEquals("Unexpected node state", State.ACTIVE, node.getState());
+
+ assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
+ BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost();
+ assertNotNull("Virtual host is not created", virtualHost);
+
+ assertEquals("Unexpected local transaction synchronization policy", "NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
+ assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
+
+ Map<String, Object> virtualHostAttributes = new HashMap<String,Object>();
+ virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC");
+ virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
+ virtualHost.setAttributes(virtualHostAttributes);
+
+ assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
+ assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
+
+ try
+ {
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID"));
+ fail("Invalid syncronization policy is set");
+ }
+ catch(IllegalArgumentException e)
+ {
+ //pass
+ }
+
+ try
+ {
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID"));
+ fail("Invalid syncronization policy is set");
+ }
+ catch(IllegalArgumentException e)
+ {
+ //pass
+ }
+
+ }
+
private BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name)
{
for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes())
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 7f6083a0c9..3660566c8c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -41,7 +41,9 @@ import org.apache.qpid.util.FileUtils;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
@@ -183,7 +185,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testGetDurability() throws Exception
{
- assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability());
+ assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability().toString());
}
public void testIsCoalescingSync() throws Exception
@@ -196,7 +198,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
}
-
public void testPriority() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
@@ -685,6 +686,46 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
}
+ public void testSetLocalTransactionSyncronizationPolicy() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected local transaction synchronization policy before change",
+ ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getLocalTransactionSyncronizationPolicy());
+ facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ assertEquals("Unexpected local transaction synchronization policy after change",
+ SyncPolicy.WRITE_NO_SYNC, facade.getLocalTransactionSyncronizationPolicy());
+ }
+
+ public void testSetRemoteTransactionSyncronizationPolicy() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected remote transaction synchronization policy before change",
+ ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getRemoteTransactionSyncronizationPolicy());
+ facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ assertEquals("Unexpected remote transaction synchronization policy after change",
+ SyncPolicy.WRITE_NO_SYNC, facade.getRemoteTransactionSyncronizationPolicy());
+ }
+
+ public void testBeginTransaction() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ Transaction txn = null;
+ try
+ {
+ txn = facade.beginTransaction();
+ assertNotNull("Transaction is not created", txn);
+ txn.commit();
+ txn = null;
+ }
+ finally
+ {
+ if (txn != null)
+ {
+ txn.abort();
+ }
+ }
+ }
+
private ReplicatedEnvironmentFacade createMaster() throws Exception
{
return createMaster(new NoopReplicationGroupListener());
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
index 7f8f3ad22c..ac6ca45877 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
@@ -302,20 +302,6 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
}
}
- private Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
- {
- List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url);
- long limit = System.currentTimeMillis() + 5000;
- while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
- {
- Thread.sleep(100l);
- nodeAttributes = getRestTestHelper().getJsonAsList(url);
- }
- Map<String, Object> nodeData = nodeAttributes.get(0);
- assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName));
- return nodeData;
- }
-
private void assertRemoteNodeData(String name, Map<String, Object> nodeData)
{
assertEquals("Remote node " + name + " has unexpected name", name, nodeData.get(BDBHAVirtualHostNode.NAME));
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
new file mode 100644
index 0000000000..c120a3d63c
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY;
+import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.QpidRestTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+
+public class BDBHAVirtualHostRestTest extends QpidRestTestCase
+{
+ private String _hostName;
+ private File _storeBaseDir;
+ private int _nodeHaPort;
+ private Object _nodeName;
+ private String _virtualhostUrl;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000");
+ _hostName = "ha";
+ _nodeName = "node1";
+ _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis());
+ _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1);
+ _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName;
+
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ if (_storeBaseDir != null)
+ {
+ FileUtils.delete(_storeBaseDir, true);
+ }
+ }
+ }
+
+ @Override
+ protected void customizeConfiguration() throws IOException
+ {
+ super.customizeConfiguration();
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST);
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST);
+
+ Map<String, Object> nodeAttributes = new HashMap<String, Object>();
+ nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName);
+ nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName);
+ nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
+ nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + _nodeHaPort);
+ nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort);
+ config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes);
+ }
+
+ public void testSetLocalTransactionSynchronizationPolicy() throws Exception
+ {
+ assertSetTransactionSynchronizationPolicy(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY);
+ }
+
+ public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
+ {
+ assertSetTransactionSynchronizationPolicy(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY);
+ }
+
+ private void assertSetTransactionSynchronizationPolicy(String policyAttribute) throws Exception, IOException
+ {
+ Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(policyAttribute));
+
+ Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(policyAttribute, "SYNC");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
+
+ hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
+ assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(policyAttribute));
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
index 9322f79984..3eba4fad21 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
@@ -22,6 +22,7 @@ package org.apache.qpid.systest.rest;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.qpid.server.management.plugin.HttpManagement;
@@ -102,4 +103,18 @@ public class QpidRestTestCase extends QpidBrokerTestCase
{
return _restTestHelper;
}
+
+ public Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
+ {
+ List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url);
+ long limit = System.currentTimeMillis() + 5000;
+ while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
+ {
+ Thread.sleep(100l);
+ nodeAttributes = getRestTestHelper().getJsonAsList(url);
+ }
+ Map<String, Object> nodeData = nodeAttributes.get(0);
+ assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName));
+ return nodeData;
+ }
}