summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-05-27 15:07:17 +0000
committerAlex Rudyy <orudyy@apache.org>2014-05-27 15:07:17 +0000
commit25bf403a6e42a42699b7db3f118f41544a7131f1 (patch)
tree56c7c7c326b1dc413e222edcad613f07a28dcb9f /qpid/java/bdbstore
parentd14fd4dc8626c61b9e364fa501c3627c3a26c99d (diff)
downloadqpid-python-25bf403a6e42a42699b7db3f118f41544a7131f1.tar.gz
QPID-5715: Add new attributes for transaction sync policies into BDB HA virtual host,
remove 'coalescingSync' attribute from BDB HA virtual host node and always use coalescingSync with replicated environment, change attribute 'durability' of BDB HA virtual host node into derived attribute, delegate bdb je transaction creation to EnvironmentFacade, with ReplicatedEnvironmentFacade crate transactions with durability having virtual host transaction sync policies and ack mode 'SIMPLE_MAJORITY'. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1597801 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java2
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java7
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java73
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java20
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java1
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java71
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java38
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java158
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java21
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js4
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js2
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html26
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java78
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java45
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java14
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java117
21 files changed, 514 insertions, 185 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
index 7146af364e..8c05602967 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -130,7 +130,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public boolean getCoalescingSync() throws IOException, JMException
{
- return _virtualHostNode.isCoalescingSync();
+ return true;
}
@Override
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index 439af259ab..e5829c519a 100644
--- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -119,13 +119,6 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
}
- public void testCoalescingSync() throws Exception
- {
- when(_virtualHostNode.isCoalescingSync()).thenReturn(true);
-
- assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
- }
-
public void testNodeState() throws Exception
{
when(_virtualHostNode.getRole()).thenReturn(TEST_NODE_STATE);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
deleted file mode 100644
index adc73c9bd9..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.util.Map;
-
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.model.ManagedObject;
-import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-
-@ManagedObject( category = false, type = "BDB_HA" )
-public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost>
-{
- public static final String TYPE = "BDB_HA";
-
- private final BDBMessageStore _messageStore;
- private MessageStoreLogSubject _messageStoreLogSubject;
-
- @ManagedObjectFactoryConstructor
- protected BDBHAVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
- {
- super(attributes, virtualHostNode);
-
- _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore();
- _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
- }
-
- @Override
- protected void initialiseStorage()
- {
- }
-
- @Override
- public DurableConfigurationStore getDurableConfigurationStore()
- {
- return _messageStore;
- }
-
- @Override
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- @Override
- protected MessageStoreLogSubject getMessageStoreLogSubject()
- {
- return _messageStoreLogSubject;
- }
-
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 5d9c2012a9..2675793b36 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -398,7 +398,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
tx = null;
try
{
- tx = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ tx = _environmentFacade.beginTransaction();
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
@@ -542,7 +542,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction();
storeConfiguredObjectEntry(txn, configuredObject);
txn.commit();
txn = null;
@@ -569,7 +569,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction();
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
for(ConfiguredObjectRecord record : objects)
@@ -606,7 +606,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ txn = _environmentFacade.beginTransaction();
for(ConfiguredObjectRecord record : records)
{
update(createIfNecessary, record, txn);
@@ -1290,8 +1290,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn;
try
{
- txn = _environmentFacade.getEnvironment().beginTransaction(
- null, null);
+ txn = _environmentFacade.beginTransaction();
}
catch (DatabaseException e)
{
@@ -1329,14 +1328,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private BDBTransaction() throws StoreException
{
- try
- {
- _txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e);
- }
+ _txn = _environmentFacade.beginTransaction();
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index c9341dce02..6550a9122e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -24,7 +24,6 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.sun.org.apache.xalan.internal.xsltc.runtime.BasisLibrary;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreFuture;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index 144ab83238..e51a610a26 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -29,6 +29,7 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
public interface EnvironmentFacade
{
@@ -48,6 +49,8 @@ public interface EnvironmentFacade
Database getOpenDatabase(String name);
+ Transaction beginTransaction();
+
void commit(com.sleepycat.je.Transaction tx);
DatabaseException handleDatabaseException(String contextMessage, DatabaseException e);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index b11fb6e873..643cd9ae70 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -32,6 +32,7 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
public class StandardEnvironmentFacade implements EnvironmentFacade
{
@@ -84,6 +85,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
}
+
+ @Override
+ public Transaction beginTransaction()
+ {
+ return _environment.beginTransaction(null, null);
+ }
+
@Override
public void commit(com.sleepycat.je.Transaction tx)
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
index 38fdf34196..6ec5067d01 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
@@ -48,7 +48,7 @@ public class DatabasePinger
Transaction txn = null;
try
{
- txn = facade.getEnvironment().beginTransaction(null, null);
+ txn = facade.beginTransaction();
db.put(txn, key, value);
txn.commit();
txn = null;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 3eabaf1c73..17594aa730 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -54,9 +54,12 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NetworkRestore;
@@ -96,6 +99,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
private static final int RESTART_TRY_LIMIT = 3;
+ static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
+ static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
+
@SuppressWarnings("serial")
private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
{{
@@ -135,8 +141,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public static final String TYPE = "BDB-HA";
private final ReplicatedEnvironmentConfiguration _configuration;
- private final Durability _durability;
- private final Boolean _coalescingSync;
private final String _prettyGroupNodeName;
private final File _environmentDirectory;
@@ -153,6 +157,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+ private volatile SyncPolicy _localTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
+ private volatile SyncPolicy _remoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
@@ -170,8 +176,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_initialisationTasks = initialisationTasks;
_configuration = configuration;
- _durability = Durability.parse(_configuration.getDurability());
- _coalescingSync = _configuration.isCoalescingSync();
_prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
// we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
@@ -185,6 +189,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
@Override
+ public Transaction beginTransaction()
+ {
+ try
+ {
+ Durability durability = getDurability();
+ TransactionConfig transactionConfig = new TransactionConfig();
+ transactionConfig.setDurability(durability);
+ return _environment.beginTransaction(null, transactionConfig);
+ }
+ catch(DatabaseException e)
+ {
+ throw handleDatabaseException("Failure to start transaction", e);
+ }
+ }
+
+ @Override
public void commit(final Transaction tx)
{
try
@@ -491,14 +511,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return (String)_configuration.getHelperHostPort();
}
- public String getDurability()
+ public Durability getDurability()
{
- return _durability.toString();
+ SyncPolicy localSync = getLocalTransactionSyncronizationPolicy();
+ SyncPolicy replicaSync = getRemoteTransactionSyncronizationPolicy();
+ return new Durability(localSync, replicaSync, ReplicaAckPolicy.SIMPLE_MAJORITY);
}
public boolean isCoalescingSync()
{
- return _coalescingSync;
+ return true;
}
public String getNodeState()
@@ -883,6 +905,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
boolean designatedPrimary = _configuration.isDesignatedPrimary();
int priority = _configuration.getPriority();
int quorumOverride = _configuration.getQuorumOverride();
+ Durability durability = getDurability();
if (LOGGER.isInfoEnabled())
{
@@ -892,8 +915,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("Node name " + _configuration.getName());
LOGGER.info("Node host port " + hostPort);
LOGGER.info("Helper host port " + helperHostPort);
- LOGGER.info("Durability " + _durability);
- LOGGER.info("Coalescing sync " + _coalescingSync);
+ LOGGER.info("Durability " + durability);
LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
LOGGER.info("Node priority " + priority);
LOGGER.info("Quorum override " + quorumOverride);
@@ -929,7 +951,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
- envConfig.setDurability(_durability);
+ envConfig.setDurability(durability);
for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
{
@@ -1015,14 +1037,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public Committer createCommitter(String name)
{
- if (_coalescingSync)
- {
- return new CoalescingCommiter(name, this);
- }
- else
- {
- return Committer.IMMEDIATE_FUTURE_COMMITTER;
- }
+ return new CoalescingCommiter(name, this);
}
NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
@@ -1060,6 +1075,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ public SyncPolicy getLocalTransactionSyncronizationPolicy()
+ {
+ return _localTransactionSyncronizationPolicy;
+ }
+
+ public SyncPolicy getRemoteTransactionSyncronizationPolicy()
+ {
+ return _remoteTransactionSyncronizationPolicy;
+ }
+
+ public void setLocalTransactionSyncronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy)
+ {
+ _localTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy;
+ }
+
+ public void setRemoteTransactionSyncronizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy)
+ {
+ _remoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy;
+ }
+
private void populateExistingRemoteReplicationNodes()
{
ReplicationGroup group = _environment.getGroup();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
new file mode 100644
index 0000000000..3c560de429
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.berkeleydb;
+
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>
+{
+ String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSyncronizationPolicy";
+ String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSyncronizationPolicy";
+
+ @ManagedAttribute( defaultValue = "NO_SYNC")
+ String getLocalTransactionSyncronizationPolicy();
+
+ @ManagedAttribute( defaultValue = "NO_SYNC")
+ String getRemoteTransactionSyncronizationPolicy();
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
new file mode 100644
index 0000000000..682f03bf7e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.berkeleydb;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+
+import com.sleepycat.je.Durability.SyncPolicy;
+
+@ManagedObject( category = false, type = "BDB_HA" )
+public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostImpl> implements BDBHAVirtualHost<BDBHAVirtualHostImpl>
+{
+ public static final String TYPE = "BDB_HA";
+
+ private final BDBMessageStore _messageStore;
+ private MessageStoreLogSubject _messageStoreLogSubject;
+
+ @ManagedAttributeField(afterSet="setLocalTransactionSyncronizationPolicyOnEnvironment")
+ private String _localTransactionSyncronizationPolicy;
+
+ @ManagedAttributeField(afterSet="setRemoteTransactionSyncronizationPolicyOnEnvironment")
+ private String _remoteTransactionSyncronizationPolicy;
+
+ @ManagedObjectFactoryConstructor
+ public BDBHAVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
+ {
+ super(attributes, virtualHostNode);
+
+ _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore();
+ _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
+ }
+
+ @Override
+ protected void initialiseStorage()
+ {
+ }
+
+ @Override
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return _messageStore;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ @Override
+ protected MessageStoreLogSubject getMessageStoreLogSubject()
+ {
+ return _messageStoreLogSubject;
+ }
+
+ @Override
+ public String getLocalTransactionSyncronizationPolicy()
+ {
+ return _localTransactionSyncronizationPolicy;
+ }
+
+ @Override
+ public String getRemoteTransactionSyncronizationPolicy()
+ {
+ return _remoteTransactionSyncronizationPolicy;
+ }
+
+ @Override
+ public void onOpen()
+ {
+ super.onOpen();
+ setRemoteTransactionSyncronizationPolicyOnEnvironment();
+ setLocalTransactionSyncronizationPolicyOnEnvironment();
+ }
+
+ @Override
+ protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+ {
+ super.validateChange(proxyForValidation, changedAttributes);
+
+ if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY))
+ {
+ String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSyncronizationPolicy();
+ validateTransactionSynchronizationPolicy(policy);
+ }
+
+ if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY))
+ {
+ String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSyncronizationPolicy();
+ validateTransactionSynchronizationPolicy(policy);
+ }
+ }
+
+ private void validateTransactionSynchronizationPolicy(String policy)
+ {
+ try
+ {
+ SyncPolicy.valueOf(policy);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalArgumentException("Invalid transaction syncronization policy '" + policy + "'. " + e.getMessage());
+ }
+ }
+
+ protected void setLocalTransactionSyncronizationPolicyOnEnvironment()
+ {
+ ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+ if (facade != null)
+ {
+ facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSyncronizationPolicy()));
+ }
+ }
+
+ protected void setRemoteTransactionSyncronizationPolicyOnEnvironment()
+ {
+ ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+ if (facade != null)
+ {
+ facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSyncronizationPolicy()));
+ }
+ }
+
+ private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
+ {
+ return (ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade();
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
index 0d0292ce58..9fdaccb949 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
@@ -49,12 +49,9 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
@ManagedAttribute(mandatory=true)
String getHelperAddress();
- @ManagedAttribute(defaultValue = "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY")
+ @DerivedAttribute
String getDurability();
- @ManagedAttribute(defaultValue = "true")
- boolean isCoalescingSync();
-
@ManagedAttribute(defaultValue = "false")
boolean isDesignatedPrimary();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index e978ab6c0b..11a58b6167 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -97,12 +97,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@ManagedAttributeField
private String _address;
- @ManagedAttributeField
- private String _durability;
-
- @ManagedAttributeField
- private boolean _coalescingSync;
-
@ManagedAttributeField(afterSet="postSetDesignatedPrimary")
private boolean _designatedPrimary;
@@ -183,13 +177,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@Override
public String getDurability()
{
- return _durability;
- }
-
- @Override
- public boolean isCoalescingSync()
- {
- return _coalescingSync;
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
+ if (environmentFacade != null)
+ {
+ return environmentFacade.getDurability().toString();
+ }
+ return null;
}
@Override
@@ -591,7 +584,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
catch (TimeoutException e)
{
- LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become effective once the JE task thread is free.");
+ LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _quorumOverride + " will become effective once the JE task thread is free.");
}
catch (InterruptedException e)
{
diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js
index 269bb402c5..02996046e3 100644
--- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js
+++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js
@@ -43,8 +43,8 @@ define(["dojo/_base/xhr",
"dojo/domReady!"],
function (xhr, array, event, lang, win, dom, domConstruct, registry, parser, json, query, Memory, ObjectStore, template)
{
- var fields = [ "storePath", "name", "groupName", "address", "durability",
- "coalescingSync", "designatedPrimary", "priority", "quorumOverride"];
+ var fields = [ "storePath", "name", "groupName", "address",
+ "designatedPrimary", "priority", "quorumOverride"];
var bdbHaNodeEditor =
{
diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
index 9dbebc9d7d..da63e031e6 100644
--- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
+++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
@@ -35,7 +35,7 @@ define(["dojo/_base/xhr",
function (xhr, lang, connect, parser, json, entities, query, json, registry, EnhancedGrid, UpdatableStore, UserPreferences, edit)
{
var priorityNames = {'_0': 'Never', '_1': 'Default', '_2': 'Normal', '_3': 'High'};
- var nodeFields = ["storePath", "groupName", "role", "address", "coalescingSync", "designatedPrimary", "durability", "priority", "quorumOverride"];
+ var nodeFields = ["storePath", "groupName", "role", "address", "designatedPrimary", "durability", "priority", "quorumOverride"];
function findNode(nodeClass, containerNode)
{
diff --git a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html
index 79ab5155c3..355a9ef62a 100644
--- a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html
+++ b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html
@@ -79,32 +79,6 @@
</td>
</tr>
<tr>
- <td class="tableContainer-labelCell" style="width: 200px;"><strong>Durability: </strong></td>
- <td class="tableContainer-valueCell">
- <input type="text" id="editBDBHANode.durability"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'durability',
- disabled: true,
- required: false,
- placeHolder: 'NO_SYNC,NO_SYNC,SIMPLE_MAJORITY',
- title: 'Enter node durability settings',
- pattern: '^(SYN|NO_SYNC|WRITE_NO_SYNC),(SYN|NO_SYNC|WRITE_NO_SYNC),(SIMPLE_MAJORITY|ALL|NONE)*$'" />
- </td>
- </tr>
- <tr>
- <td class="tableContainer-labelCell" style="width: 200px;"><strong>Coalescing sync: </strong></td>
- <td class="tableContainer-valueCell">
- <input type="checkbox" id="editBDBHANode.coalescingSync" checked="checked"
- data-dojo-type="dijit/form/CheckBox"
- data-dojo-props="
- name: 'coalescingSync',
- required: false,
- disabled: true,
- title: 'Enable coalescing sync mode for a better performance: when many transactions are synced to disk at the same time'" />
- </td>
- </tr>
- <tr>
<td class="tableContainer-labelCell" style="width: 200px;"><strong>Allow this node to operate solo: </strong></td>
<td class="tableContainer-valueCell">
<input type="checkbox" id="editBDBHANode.designatedPrimary" checked="checked"
diff --git a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html
index 579ec12608..792305aa8e 100644
--- a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html
+++ b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html
@@ -36,10 +36,6 @@
<div class="address" style="float:left;">N/A</div>
</div>
<div style="clear:both">
- <div class="formLabel-labelCell" style="float:left; width: 200px;">Coalescing Sync:</div>
- <div class="coalescingSync" style="float:left;">N/A</div>
- </div>
- <div style="clear:both">
<div class="formLabel-labelCell" style="float:left; width: 200px;">Durability:</div>
<div class="durability" style="float:left;">N/A</div>
</div>
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 4399077a46..563665917c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -51,6 +51,8 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
@@ -127,7 +129,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
String groupName = "group";
String nodeHostPort = "localhost:" + findFreePort();
String helperHostPort = nodeHostPort;
- String durability = "NO_SYNC,SYNC,NONE";
UUID id = UUID.randomUUID();
Map<String, Object> attributes = new HashMap<String, Object>();
@@ -137,7 +138,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
attributes.put(BDBHAVirtualHostNode.ADDRESS, nodeHostPort);
attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperHostPort);
- attributes.put(BDBHAVirtualHostNode.DURABILITY, durability);
attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath);
attributes.put(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION,
Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout));
@@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertEquals(nodeHostPort, replicationConfig.getNodeHostPort());
assertEquals(helperHostPort, replicationConfig.getHelperHosts());
- assertEquals(durability, environment.getConfig().getDurability().toString());
+ assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
@@ -426,6 +426,78 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertNull("Remote node " + replica.getName() + " is not found", findRemoteNode( master, replica.getName()));
}
+
+ public void testSetSynchronizationPolicyAttributesOnVirtualHost() throws Exception
+ {
+ int node1PortNumber = findFreePort();
+ String helperAddress = "localhost:" + node1PortNumber;
+ String groupName = "group";
+
+ Map<String, Object> nodeAttributes = new HashMap<String, Object>();
+ nodeAttributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+ nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ nodeAttributes.put(BDBHAVirtualHostNode.NAME, "node1");
+ nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+ nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+ nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
+ nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
+ BDBHAVirtualHostNode<?> node = createHaVHN(nodeAttributes);
+
+ final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1);
+ node.addChangeListener(new NoopConfigurationChangeListener()
+ {
+ @Override
+ public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
+ {
+ if (child instanceof VirtualHost)
+ {
+ child.addChangeListener(this);
+ virtualHostAddedLatch.countDown();
+ }
+ }
+ });
+
+ node.start();
+ assertNodeRole(node, "MASTER", "REPLICA");
+ assertEquals("Unexpected node state", State.ACTIVE, node.getState());
+
+ assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
+ BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost();
+ assertNotNull("Virtual host is not created", virtualHost);
+
+ assertEquals("Unexpected local transaction synchronization policy", "NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
+ assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
+
+ Map<String, Object> virtualHostAttributes = new HashMap<String,Object>();
+ virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC");
+ virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
+ virtualHost.setAttributes(virtualHostAttributes);
+
+ assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
+ assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
+
+ try
+ {
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID"));
+ fail("Invalid syncronization policy is set");
+ }
+ catch(IllegalArgumentException e)
+ {
+ //pass
+ }
+
+ try
+ {
+ virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID"));
+ fail("Invalid syncronization policy is set");
+ }
+ catch(IllegalArgumentException e)
+ {
+ //pass
+ }
+
+ }
+
private BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name)
{
for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes())
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 7f6083a0c9..3660566c8c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -41,7 +41,9 @@ import org.apache.qpid.util.FileUtils;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
@@ -183,7 +185,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testGetDurability() throws Exception
{
- assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability());
+ assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability().toString());
}
public void testIsCoalescingSync() throws Exception
@@ -196,7 +198,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
}
-
public void testPriority() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
@@ -685,6 +686,46 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
}
+ public void testSetLocalTransactionSyncronizationPolicy() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected local transaction synchronization policy before change",
+ ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getLocalTransactionSyncronizationPolicy());
+ facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ assertEquals("Unexpected local transaction synchronization policy after change",
+ SyncPolicy.WRITE_NO_SYNC, facade.getLocalTransactionSyncronizationPolicy());
+ }
+
+ public void testSetRemoteTransactionSyncronizationPolicy() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected remote transaction synchronization policy before change",
+ ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getRemoteTransactionSyncronizationPolicy());
+ facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ assertEquals("Unexpected remote transaction synchronization policy after change",
+ SyncPolicy.WRITE_NO_SYNC, facade.getRemoteTransactionSyncronizationPolicy());
+ }
+
+ public void testBeginTransaction() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ Transaction txn = null;
+ try
+ {
+ txn = facade.beginTransaction();
+ assertNotNull("Transaction is not created", txn);
+ txn.commit();
+ txn = null;
+ }
+ finally
+ {
+ if (txn != null)
+ {
+ txn.abort();
+ }
+ }
+ }
+
private ReplicatedEnvironmentFacade createMaster() throws Exception
{
return createMaster(new NoopReplicationGroupListener());
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
index 7f8f3ad22c..ac6ca45877 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
@@ -302,20 +302,6 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
}
}
- private Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
- {
- List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url);
- long limit = System.currentTimeMillis() + 5000;
- while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
- {
- Thread.sleep(100l);
- nodeAttributes = getRestTestHelper().getJsonAsList(url);
- }
- Map<String, Object> nodeData = nodeAttributes.get(0);
- assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName));
- return nodeData;
- }
-
private void assertRemoteNodeData(String name, Map<String, Object> nodeData)
{
assertEquals("Remote node " + name + " has unexpected name", name, nodeData.get(BDBHAVirtualHostNode.NAME));
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
new file mode 100644
index 0000000000..c120a3d63c
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY;
+import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.QpidRestTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+
+public class BDBHAVirtualHostRestTest extends QpidRestTestCase
+{
+ private String _hostName;
+ private File _storeBaseDir;
+ private int _nodeHaPort;
+ private Object _nodeName;
+ private String _virtualhostUrl;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000");
+ _hostName = "ha";
+ _nodeName = "node1";
+ _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis());
+ _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1);
+ _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName;
+
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ if (_storeBaseDir != null)
+ {
+ FileUtils.delete(_storeBaseDir, true);
+ }
+ }
+ }
+
+ @Override
+ protected void customizeConfiguration() throws IOException
+ {
+ super.customizeConfiguration();
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST);
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST);
+
+ Map<String, Object> nodeAttributes = new HashMap<String, Object>();
+ nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName);
+ nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName);
+ nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
+ nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + _nodeHaPort);
+ nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort);
+ config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes);
+ }
+
+ public void testSetLocalTransactionSynchronizationPolicy() throws Exception
+ {
+ assertSetTransactionSynchronizationPolicy(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY);
+ }
+
+ public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
+ {
+ assertSetTransactionSynchronizationPolicy(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY);
+ }
+
+ private void assertSetTransactionSynchronizationPolicy(String policyAttribute) throws Exception, IOException
+ {
+ Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(policyAttribute));
+
+ Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(policyAttribute, "SYNC");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
+
+ hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
+ assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(policyAttribute));
+ }
+}