summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java22
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java137
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java15
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java70
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java34
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java19
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java56
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java1
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java16
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java53
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java20
13 files changed, 299 insertions, 148 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 2675793b36..39bfdbc6bf 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -317,7 +317,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
if (_committer != null)
{
- _committer.stop();
+ _committer.close();
}
}
finally
@@ -335,19 +335,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
if (_configurationStoreOpen.compareAndSet(true, false))
{
- try
- {
- if (_committer != null)
- {
- _committer.stop();
- }
- }
- finally
+ if (!_messageStoreOpen.get())
{
- if (!_messageStoreOpen.get())
- {
- closeEnvironment();
- }
+ closeEnvironment();
}
}
}
@@ -542,7 +532,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.beginTransaction();
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
storeConfiguredObjectEntry(txn, configuredObject);
txn.commit();
txn = null;
@@ -569,7 +559,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.beginTransaction();
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
for(ConfiguredObjectRecord record : objects)
@@ -606,7 +596,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
com.sleepycat.je.Transaction txn = null;
try
{
- txn = _environmentFacade.beginTransaction();
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
for(ConfiguredObjectRecord record : records)
{
update(createIfNecessary, record, txn);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 6550a9122e..cf76d13f2a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -22,7 +22,11 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreFuture;
@@ -33,55 +37,94 @@ import com.sleepycat.je.Transaction;
public class CoalescingCommiter implements Committer
{
- private final CommitThread _commitThread;
+ private final CommitTask _commitTask;
+ private final AtomicReference<Boolean> _started;
+ private final ExecutorService _taskExecutor;
- public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
+ public CoalescingCommiter(final String name, EnvironmentFacade environmentFacade)
{
- _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
+ _started = new AtomicReference<Boolean>(false);
+ _commitTask = new CommitTask(environmentFacade);
+ _taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
+ {
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "Commit-Thread-" + name);
+ }
+ });
}
@Override
public void start()
{
- _commitThread.start();
+ if (_started.compareAndSet(false, true))
+ {
+ _taskExecutor.submit(_commitTask);
+ }
}
@Override
public void stop()
{
- _commitThread.close();
+ if (_started.compareAndSet(true, false))
+ {
+ _commitTask.stop();
+ }
+ }
+
+ @Override
+ public void close()
+ {
try
{
- _commitThread.join();
+ _started.set(false);
+ _commitTask.close();
}
- catch (InterruptedException ie)
+ finally
{
- Thread.currentThread().interrupt();
- throw new RuntimeException("Commit thread has not shutdown", ie);
+ _taskExecutor.shutdown();
}
}
@Override
public StoreFuture commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
- commitFuture.commit();
- return commitFuture;
+ if (_started.get())
+ {
+ BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, tx, syncCommit);
+ try
+ {
+ commitFuture.commit();
+ return commitFuture;
+ }
+ catch(IllegalStateException e)
+ {
+ // IllegalStateException is thrown when commit thread is stopped whilst commit is called
+ }
+ }
+
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ public boolean isStarted()
+ {
+ return _started.get();
}
private static final class BDBCommitFuture implements StoreFuture
{
private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
- private final CommitThread _commitThread;
+ private final CommitTask _commitTask;
private final Transaction _tx;
private final boolean _syncCommit;
private RuntimeException _databaseException;
private boolean _complete;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFuture(CommitTask commitTask, Transaction tx, boolean syncCommit)
{
- _commitThread = commitThread;
+ _commitTask = commitTask;
_tx = tx;
_syncCommit = syncCommit;
}
@@ -107,7 +150,7 @@ public class CoalescingCommiter implements Committer
public void commit() throws DatabaseException
{
- _commitThread.addJob(this, _syncCommit);
+ _commitTask.addJob(this, _syncCommit);
if(!_syncCommit)
{
@@ -142,7 +185,12 @@ public class CoalescingCommiter implements Committer
while (!isComplete())
{
- _commitThread.explicitNotify();
+ if (_commitTask.isStopped())
+ {
+ throw new IllegalStateException("Commit thread is stopped");
+ }
+
+ _commitTask.explicitNotify();
try
{
wait(250);
@@ -162,28 +210,32 @@ public class CoalescingCommiter implements Committer
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a {@link Runnable} which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
* completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
- private static class CommitThread extends Thread
+ private static class CommitTask implements Runnable
{
- private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
+ private static final Logger LOGGER = Logger.getLogger(CommitTask.class);
- private final AtomicBoolean _stopped = new AtomicBoolean(false);
+ private final AtomicBoolean _stopped = new AtomicBoolean(true);
private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
- public CommitThread(String name, EnvironmentFacade environmentFacade)
+ public CommitTask(EnvironmentFacade environmentFacade)
{
- super(name);
_environmentFacade = environmentFacade;
}
+ public boolean isStopped()
+ {
+ return _stopped.get();
+ }
+
public void explicitNotify()
{
synchronized (_lock)
@@ -192,26 +244,30 @@ public class CoalescingCommiter implements Committer
}
}
+ @Override
public void run()
{
- while (!_stopped.get())
+ if (_stopped.compareAndSet(true, false))
{
- synchronized (_lock)
+ while (!_stopped.get())
{
- while (!_stopped.get() && !hasJobs())
+ synchronized (_lock)
{
- try
- {
- // Periodically wake up and check, just in case we
- // missed a notification. Don't want to lock the broker hard.
- _lock.wait(1000);
- }
- catch (InterruptedException e)
+ while (!_stopped.get() && !hasJobs())
{
+ try
+ {
+ // Periodically wake up and check, just in case we
+ // missed a notification. Don't want to lock the broker hard.
+ _lock.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ }
}
}
+ processJobs();
}
- processJobs();
}
}
@@ -303,9 +359,20 @@ public class CoalescingCommiter implements Committer
}
}
+ public void stop()
+ {
+ synchronized (_lock)
+ {
+ if (_stopped.compareAndSet(false, true) || hasJobs())
+ {
+ processJobs();
+ }
+ }
+ }
+
public void close()
{
- RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
+ RuntimeException e = new RuntimeException("Commit thread has been stopped");
synchronized (_lock)
{
_stopped.set(true);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index 36ee2ad306..d1362ed89c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -32,6 +32,10 @@ public interface Committer
void stop();
+ void close();
+
+ boolean isStarted();
+
Committer IMMEDIATE_FUTURE_COMMITTER = new Committer()
{
@@ -50,6 +54,17 @@ public interface Committer
public void stop()
{
}
+
+ @Override
+ public void close()
+ {
+ }
+
+ @Override
+ public boolean isStarted()
+ {
+ return true;
+ }
};
} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
index 76a48c189e..1fc37db902 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
@@ -29,8 +29,6 @@ public interface ReplicatedEnvironmentConfiguration
String getGroupName();
String getHostPort();
String getHelperHostPort();
- String getDurability();
- boolean isCoalescingSync();
boolean isDesignatedPrimary();
int getPriority();
int getQuorumOverride();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 17594aa730..b57d5bae21 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -99,8 +99,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
private static final int RESTART_TRY_LIMIT = 3;
- static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
+ static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC;
static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC;
+ static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY;
@SuppressWarnings("serial")
private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
@@ -153,12 +154,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private final AtomicBoolean _initialised;
private final EnvironmentFacadeTask[] _initialisationTasks;
+ private final Durability _defaultDurability;
+ private final CoalescingCommiter _coalescingCommiter;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- private volatile SyncPolicy _localTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
- private volatile SyncPolicy _remoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
+ private volatile SyncPolicy _messageStoreLocalTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
+ private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
@@ -175,13 +178,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_initialised = new AtomicBoolean();
_initialisationTasks = initialisationTasks;
_configuration = configuration;
-
+ _defaultDurability = new Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
_prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
// we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
_environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
_groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+ _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this);
+
// create environment in a separate thread to avoid renaming of the current thread by JE
_environment = createEnvironment(true);
populateExistingRemoteReplicationNodes();
@@ -193,9 +198,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
try
{
- Durability durability = getDurability();
TransactionConfig transactionConfig = new TransactionConfig();
- transactionConfig.setDurability(durability);
+ transactionConfig.setDurability(getMessageStoreTransactionDurability());
return _environment.beginTransaction(null, transactionConfig);
}
catch(DatabaseException e)
@@ -511,16 +515,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return (String)_configuration.getHelperHostPort();
}
+ Durability getMessageStoreTransactionDurability()
+ {
+ SyncPolicy localSync = getMessageStoreLocalTransactionSyncronizationPolicy();
+ if ( localSync == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY && _coalescingCommiter.isStarted())
+ {
+ localSync = SyncPolicy.NO_SYNC;
+ }
+ SyncPolicy replicaSync = getMessageStoreRemoteTransactionSyncronizationPolicy();
+ return new Durability(localSync, replicaSync, getReplicaAcknowledgmentPolicy());
+ }
+
public Durability getDurability()
{
- SyncPolicy localSync = getLocalTransactionSyncronizationPolicy();
- SyncPolicy replicaSync = getRemoteTransactionSyncronizationPolicy();
- return new Durability(localSync, replicaSync, ReplicaAckPolicy.SIMPLE_MAJORITY);
+ return new Durability(getMessageStoreLocalTransactionSyncronizationPolicy(),
+ getMessageStoreRemoteTransactionSyncronizationPolicy(), getReplicaAcknowledgmentPolicy());
}
public boolean isCoalescingSync()
{
- return true;
+ return _coalescingCommiter.isStarted();
}
public String getNodeState()
@@ -905,7 +919,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
boolean designatedPrimary = _configuration.isDesignatedPrimary();
int priority = _configuration.getPriority();
int quorumOverride = _configuration.getQuorumOverride();
- Durability durability = getDurability();
if (LOGGER.isInfoEnabled())
{
@@ -915,7 +928,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("Node name " + _configuration.getName());
LOGGER.info("Node host port " + hostPort);
LOGGER.info("Helper host port " + helperHostPort);
- LOGGER.info("Durability " + durability);
+ LOGGER.info("Durability " + _defaultDurability);
LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
LOGGER.info("Node priority " + priority);
LOGGER.info("Quorum override " + quorumOverride);
@@ -951,7 +964,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
- envConfig.setDurability(durability);
+ envConfig.setDurability(_defaultDurability);
for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
{
@@ -1037,7 +1050,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public Committer createCommitter(String name)
{
- return new CoalescingCommiter(name, this);
+ return _coalescingCommiter;
}
NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
@@ -1075,24 +1088,37 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- public SyncPolicy getLocalTransactionSyncronizationPolicy()
+ public SyncPolicy getMessageStoreLocalTransactionSyncronizationPolicy()
{
- return _localTransactionSyncronizationPolicy;
+ return _messageStoreLocalTransactionSyncronizationPolicy;
}
- public SyncPolicy getRemoteTransactionSyncronizationPolicy()
+ public SyncPolicy getMessageStoreRemoteTransactionSyncronizationPolicy()
{
- return _remoteTransactionSyncronizationPolicy;
+ return _messageStoreRemoteTransactionSyncronizationPolicy;
}
- public void setLocalTransactionSyncronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy)
+ public ReplicaAckPolicy getReplicaAcknowledgmentPolicy()
{
- _localTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy;
+ return REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY;
+ }
+
+ public void setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy)
+ {
+ _messageStoreLocalTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy;
+ if (localTransactionSyncronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
+ {
+ _coalescingCommiter.start();
+ }
+ else
+ {
+ _coalescingCommiter.stop();
+ }
}
- public void setRemoteTransactionSyncronizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy)
+ public void setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy)
{
- _remoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy;
+ _messageStoreRemoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy;
}
private void populateExistingRemoteReplicationNodes()
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index 62035bb65f..e154e8bf45 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
@@ -36,76 +36,64 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
@Override
public boolean isDesignatedPrimary()
{
- return (Boolean)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.DESIGNATED_PRIMARY);
- }
-
- @Override
- public boolean isCoalescingSync()
- {
- return (Boolean)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.COALESCING_SYNC);
+ return (Boolean)messageStoreSettings.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY);
}
@Override
public String getStorePath()
{
- return (String) messageStoreSettings.get(BDBHAVirtualHostNodeImpl.STORE_PATH);
+ return (String) messageStoreSettings.get(BDBHAVirtualHostNode.STORE_PATH);
}
@SuppressWarnings("unchecked")
@Override
public Map<String, String> getParameters()
{
- return (Map<String, String>) messageStoreSettings.get(BDBHAVirtualHostNodeImpl.ENVIRONMENT_CONFIGURATION);
+ return (Map<String, String>) messageStoreSettings.get(BDBHAVirtualHostNode.ENVIRONMENT_CONFIGURATION);
}
@SuppressWarnings("unchecked")
@Override
public Map<String, String> getReplicationParameters()
{
- return (Map<String, String>) messageStoreSettings.get(BDBHAVirtualHostNodeImpl.REPLICATED_ENVIRONMENT_CONFIGURATION);
+ return (Map<String, String>) messageStoreSettings.get(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION);
}
@Override
public int getQuorumOverride()
{
- return (Integer)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.QUORUM_OVERRIDE);
+ return (Integer)messageStoreSettings.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE);
}
@Override
public int getPriority()
{
- return (Integer)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.PRIORITY);
+ return (Integer)messageStoreSettings.get(BDBHAVirtualHostNode.PRIORITY);
}
@Override
public String getName()
{
- return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.NAME);
+ return (String)messageStoreSettings.get(BDBHAVirtualHostNode.NAME);
}
@Override
public String getHostPort()
{
- return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.ADDRESS);
+ return (String)messageStoreSettings.get(BDBHAVirtualHostNode.ADDRESS);
}
@Override
public String getHelperHostPort()
{
- return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.HELPER_ADDRESS);
+ return (String)messageStoreSettings.get(BDBHAVirtualHostNode.HELPER_ADDRESS);
}
@Override
public String getGroupName()
{
- return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.GROUP_NAME);
+ return (String)messageStoreSettings.get(BDBHAVirtualHostNode.GROUP_NAME);
}
-
- @Override
- public String getDurability()
- {
- return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.DURABILITY);
- }
};
return new ReplicatedEnvironmentFacade(configuration, initialisationTasks);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
index 3c560de429..84ac9ca277 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
@@ -21,18 +21,27 @@
package org.apache.qpid.server.virtualhost.berkeleydb;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>
{
- String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSyncronizationPolicy";
- String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSyncronizationPolicy";
+ String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy";
+ String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSynchronizationPolicy";
+ String COALESCING_SYNC = "coalescingSync";
+ String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy";
- @ManagedAttribute( defaultValue = "NO_SYNC")
- String getLocalTransactionSyncronizationPolicy();
+ @ManagedAttribute( defaultValue = "SYNC")
+ String getLocalTransactionSynchronizationPolicy();
@ManagedAttribute( defaultValue = "NO_SYNC")
- String getRemoteTransactionSyncronizationPolicy();
+ String getRemoteTransactionSynchronizationPolicy();
+
+ @DerivedAttribute
+ String getReplicaAcknowledgmentPolicy();
+
+ @DerivedAttribute
+ boolean isCoalescingSync();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
index 682f03bf7e..974ff51a3b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
@@ -45,11 +45,11 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
private final BDBMessageStore _messageStore;
private MessageStoreLogSubject _messageStoreLogSubject;
- @ManagedAttributeField(afterSet="setLocalTransactionSyncronizationPolicyOnEnvironment")
- private String _localTransactionSyncronizationPolicy;
+ @ManagedAttributeField(afterSet="setLocalTransactionSynchronizationPolicyOnEnvironment")
+ private String _localTransactionSynchronizationPolicy;
- @ManagedAttributeField(afterSet="setRemoteTransactionSyncronizationPolicyOnEnvironment")
- private String _remoteTransactionSyncronizationPolicy;
+ @ManagedAttributeField(afterSet="setRemoteTransactionSynchronizationPolicyOnEnvironment")
+ private String _remoteTransactionSynchronizationPolicy;
@ManagedObjectFactoryConstructor
public BDBHAVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
@@ -84,23 +84,46 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
}
@Override
- public String getLocalTransactionSyncronizationPolicy()
+ public String getLocalTransactionSynchronizationPolicy()
{
- return _localTransactionSyncronizationPolicy;
+ return _localTransactionSynchronizationPolicy;
}
@Override
- public String getRemoteTransactionSyncronizationPolicy()
+ public String getRemoteTransactionSynchronizationPolicy()
{
- return _remoteTransactionSyncronizationPolicy;
+ return _remoteTransactionSynchronizationPolicy;
+ }
+
+
+ @Override
+ public String getReplicaAcknowledgmentPolicy()
+ {
+ ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+ if (facade != null)
+ {
+ return facade.getReplicaAcknowledgmentPolicy().name();
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isCoalescingSync()
+ {
+ ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+ if (facade != null)
+ {
+ return facade.isCoalescingSync();
+ }
+ return false;
}
@Override
public void onOpen()
{
super.onOpen();
- setRemoteTransactionSyncronizationPolicyOnEnvironment();
- setLocalTransactionSyncronizationPolicyOnEnvironment();
+ setRemoteTransactionSynchronizationPolicyOnEnvironment();
+ setLocalTransactionSynchronizationPolicyOnEnvironment();
}
@Override
@@ -110,13 +133,13 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY))
{
- String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSyncronizationPolicy();
+ String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSynchronizationPolicy();
validateTransactionSynchronizationPolicy(policy);
}
if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY))
{
- String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSyncronizationPolicy();
+ String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSynchronizationPolicy();
validateTransactionSynchronizationPolicy(policy);
}
}
@@ -133,21 +156,21 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
}
}
- protected void setLocalTransactionSyncronizationPolicyOnEnvironment()
+ protected void setLocalTransactionSynchronizationPolicyOnEnvironment()
{
ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
if (facade != null)
{
- facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSyncronizationPolicy()));
+ facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy()));
}
}
- protected void setRemoteTransactionSyncronizationPolicyOnEnvironment()
+ protected void setRemoteTransactionSynchronizationPolicyOnEnvironment()
{
ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
if (facade != null)
{
- facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSyncronizationPolicy()));
+ facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy()));
}
}
@@ -155,4 +178,5 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm
{
return (ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade();
}
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
index 9fdaccb949..b230b7520e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
@@ -31,7 +31,6 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
public static final String ADDRESS = "address";
public static final String HELPER_ADDRESS = "helperAddress";
public static final String DURABILITY = "durability";
- public static final String COALESCING_SYNC = "coalescingSync";
public static final String DESIGNATED_PRIMARY = "designatedPrimary";
public static final String PRIORITY = "priority";
public static final String QUORUM_OVERRIDE = "quorumOverride";
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 11a58b6167..91ac63ff46 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -177,7 +177,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@Override
public String getDurability()
{
- ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
if (environmentFacade != null)
{
return environmentFacade.getDurability().toString();
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 563665917c..962e19f81c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertEquals(nodeHostPort, replicationConfig.getNodeHostPort());
assertEquals(helperHostPort, replicationConfig.getHelperHosts());
- assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
+ assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
@@ -465,17 +465,21 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost();
assertNotNull("Virtual host is not created", virtualHost);
- assertEquals("Unexpected local transaction synchronization policy", "NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
- assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
+ awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, true);
+
+ assertEquals("Unexpected local transaction synchronization policy", "SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
+ assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
+ assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync());
Map<String, Object> virtualHostAttributes = new HashMap<String,Object>();
virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC");
virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
virtualHost.setAttributes(virtualHostAttributes);
- assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
- assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
-
+ awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false);
+ assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
+ assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
+ assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync());
try
{
virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID"));
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 3660566c8c..e4f1b62954 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -62,9 +63,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private static final String TEST_NODE_NAME = "testNodeName";
private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
- private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
+ private static final Durability TEST_DURABILITY = Durability.parse("SYNC,NO_SYNC,SIMPLE_MAJORITY");
private static final boolean TEST_DESIGNATED_PRIMARY = false;
- private static final boolean TEST_COALESCING_SYNC = true;
private static final int TEST_PRIORITY = 1;
private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
@@ -185,12 +185,32 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testGetDurability() throws Exception
{
- assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability().toString());
+ ReplicatedEnvironmentFacade master = createMaster();
+ assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
+ assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability());
+ Committer committer = master.createCommitter(TEST_GROUP_NAME);
+ committer.start();
+
+ waitForCommitter(committer, true);
+
+ assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString());
+
+ committer.stop();
+ waitForCommitter(committer, false);
+ assertEquals("Unexpected message store durability after committer stop", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
}
public void testIsCoalescingSync() throws Exception
{
- assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync());
+ ReplicatedEnvironmentFacade master = createMaster();
+ assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync());
+ Committer committer = master.createCommitter(TEST_GROUP_NAME);
+ committer.start();
+ waitForCommitter(committer, true);
+ assertEquals("Unexpected coalescing sync", true, master.isCoalescingSync());
+ committer.stop();
+ waitForCommitter(committer, false);
+ assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync());
}
public void testGetNodeState() throws Exception
@@ -690,20 +710,20 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
ReplicatedEnvironmentFacade facade = createMaster();
assertEquals("Unexpected local transaction synchronization policy before change",
- ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getLocalTransactionSyncronizationPolicy());
- facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreLocalTransactionSyncronizationPolicy());
+ facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
assertEquals("Unexpected local transaction synchronization policy after change",
- SyncPolicy.WRITE_NO_SYNC, facade.getLocalTransactionSyncronizationPolicy());
+ SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreLocalTransactionSyncronizationPolicy());
}
public void testSetRemoteTransactionSyncronizationPolicy() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
assertEquals("Unexpected remote transaction synchronization policy before change",
- ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getRemoteTransactionSyncronizationPolicy());
- facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+ ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
+ facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.WRITE_NO_SYNC);
assertEquals("Unexpected remote transaction synchronization policy after change",
- SyncPolicy.WRITE_NO_SYNC, facade.getRemoteTransactionSyncronizationPolicy());
+ SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
}
public void testBeginTransaction() throws Exception
@@ -780,6 +800,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
return dbConfig;
}
+ private void waitForCommitter(Committer committer, boolean expected) throws InterruptedException
+ {
+ int counter = 0;
+ while(committer.isStarted() != expected && counter < 100)
+ {
+ Thread.sleep(20);
+ counter++;
+ }
+ assertEquals("Committer is not in expected state", expected, committer.isStarted());
+ }
+
private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
{
ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
@@ -790,8 +821,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
when(node.getPriority()).thenReturn(TEST_PRIORITY);
when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
- when(node.getDurability()).thenReturn(TEST_DURABILITY);
- when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC);
Map<String, String> repConfig = new HashMap<String, String>();
repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
index c120a3d63c..ca762cd86d 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
@@ -95,23 +95,25 @@ public class BDBHAVirtualHostRestTest extends QpidRestTestCase
public void testSetLocalTransactionSynchronizationPolicy() throws Exception
{
- assertSetTransactionSynchronizationPolicy(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY);
- }
+ Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY));
- public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
- {
- assertSetTransactionSynchronizationPolicy(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY);
+ Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "NO_SYNC");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
+
+ hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
+ assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY));
}
- private void assertSetTransactionSynchronizationPolicy(String policyAttribute) throws Exception, IOException
+ public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
{
Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
- assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(policyAttribute));
+ assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY));
- Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(policyAttribute, "SYNC");
+ Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200);
hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
- assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(policyAttribute));
+ assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY));
}
}