diff options
13 files changed, 299 insertions, 148 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 2675793b36..39bfdbc6bf 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -317,7 +317,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (_committer != null) { - _committer.stop(); + _committer.close(); } } finally @@ -335,19 +335,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (_configurationStoreOpen.compareAndSet(true, false)) { - try - { - if (_committer != null) - { - _committer.stop(); - } - } - finally + if (!_messageStoreOpen.get()) { - if (!_messageStoreOpen.get()) - { - closeEnvironment(); - } + closeEnvironment(); } } } @@ -542,7 +532,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.beginTransaction(); + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); storeConfiguredObjectEntry(txn, configuredObject); txn.commit(); txn = null; @@ -569,7 +559,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.beginTransaction(); + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); Collection<UUID> removed = new ArrayList<UUID>(objects.length); for(ConfiguredObjectRecord record : objects) @@ -606,7 +596,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.beginTransaction(); + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); for(ConfiguredObjectRecord record : records) { update(createIfNecessary, record, txn); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index 6550a9122e..cf76d13f2a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -22,7 +22,11 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreFuture; @@ -33,55 +37,94 @@ import com.sleepycat.je.Transaction; public class CoalescingCommiter implements Committer { - private final CommitThread _commitThread; + private final CommitTask _commitTask; + private final AtomicReference<Boolean> _started; + private final ExecutorService _taskExecutor; - public CoalescingCommiter(String name, EnvironmentFacade environmentFacade) + public CoalescingCommiter(final String name, EnvironmentFacade environmentFacade) { - _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade); + _started = new AtomicReference<Boolean>(false); + _commitTask = new CommitTask(environmentFacade); + _taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() + { + @Override + public Thread newThread(Runnable r) + { + return new Thread(r, "Commit-Thread-" + name); + } + }); } @Override public void start() { - _commitThread.start(); + if (_started.compareAndSet(false, true)) + { + _taskExecutor.submit(_commitTask); + } } @Override public void stop() { - _commitThread.close(); + if (_started.compareAndSet(true, false)) + { + _commitTask.stop(); + } + } + + @Override + public void close() + { try { - _commitThread.join(); + _started.set(false); + _commitTask.close(); } - catch (InterruptedException ie) + finally { - Thread.currentThread().interrupt(); - throw new RuntimeException("Commit thread has not shutdown", ie); + _taskExecutor.shutdown(); } } @Override public StoreFuture commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); - commitFuture.commit(); - return commitFuture; + if (_started.get()) + { + BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, tx, syncCommit); + try + { + commitFuture.commit(); + return commitFuture; + } + catch(IllegalStateException e) + { + // IllegalStateException is thrown when commit thread is stopped whilst commit is called + } + } + + return StoreFuture.IMMEDIATE_FUTURE; + } + + public boolean isStarted() + { + return _started.get(); } private static final class BDBCommitFuture implements StoreFuture { private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); - private final CommitThread _commitThread; + private final CommitTask _commitTask; private final Transaction _tx; private final boolean _syncCommit; private RuntimeException _databaseException; private boolean _complete; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFuture(CommitTask commitTask, Transaction tx, boolean syncCommit) { - _commitThread = commitThread; + _commitTask = commitTask; _tx = tx; _syncCommit = syncCommit; } @@ -107,7 +150,7 @@ public class CoalescingCommiter implements Committer public void commit() throws DatabaseException { - _commitThread.addJob(this, _syncCommit); + _commitTask.addJob(this, _syncCommit); if(!_syncCommit) { @@ -142,7 +185,12 @@ public class CoalescingCommiter implements Committer while (!isComplete()) { - _commitThread.explicitNotify(); + if (_commitTask.isStopped()) + { + throw new IllegalStateException("Commit thread is stopped"); + } + + _commitTask.explicitNotify(); try { wait(250); @@ -162,28 +210,32 @@ public class CoalescingCommiter implements Committer } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a {@link Runnable} which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ - private static class CommitThread extends Thread + private static class CommitTask implements Runnable { - private static final Logger LOGGER = Logger.getLogger(CommitThread.class); + private static final Logger LOGGER = Logger.getLogger(CommitTask.class); - private final AtomicBoolean _stopped = new AtomicBoolean(false); + private final AtomicBoolean _stopped = new AtomicBoolean(true); private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); private final Object _lock = new Object(); private final EnvironmentFacade _environmentFacade; - public CommitThread(String name, EnvironmentFacade environmentFacade) + public CommitTask(EnvironmentFacade environmentFacade) { - super(name); _environmentFacade = environmentFacade; } + public boolean isStopped() + { + return _stopped.get(); + } + public void explicitNotify() { synchronized (_lock) @@ -192,26 +244,30 @@ public class CoalescingCommiter implements Committer } } + @Override public void run() { - while (!_stopped.get()) + if (_stopped.compareAndSet(true, false)) { - synchronized (_lock) + while (!_stopped.get()) { - while (!_stopped.get() && !hasJobs()) + synchronized (_lock) { - try - { - // Periodically wake up and check, just in case we - // missed a notification. Don't want to lock the broker hard. - _lock.wait(1000); - } - catch (InterruptedException e) + while (!_stopped.get() && !hasJobs()) { + try + { + // Periodically wake up and check, just in case we + // missed a notification. Don't want to lock the broker hard. + _lock.wait(1000); + } + catch (InterruptedException e) + { + } } } + processJobs(); } - processJobs(); } } @@ -303,9 +359,20 @@ public class CoalescingCommiter implements Committer } } + public void stop() + { + synchronized (_lock) + { + if (_stopped.compareAndSet(false, true) || hasJobs()) + { + processJobs(); + } + } + } + public void close() { - RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted"); + RuntimeException e = new RuntimeException("Commit thread has been stopped"); synchronized (_lock) { _stopped.set(true); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java index 36ee2ad306..d1362ed89c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java @@ -32,6 +32,10 @@ public interface Committer void stop(); + void close(); + + boolean isStarted(); + Committer IMMEDIATE_FUTURE_COMMITTER = new Committer() { @@ -50,6 +54,17 @@ public interface Committer public void stop() { } + + @Override + public void close() + { + } + + @Override + public boolean isStarted() + { + return true; + } }; }
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java index 76a48c189e..1fc37db902 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java @@ -29,8 +29,6 @@ public interface ReplicatedEnvironmentConfiguration String getGroupName(); String getHostPort(); String getHelperHostPort(); - String getDurability(); - boolean isCoalescingSync(); boolean isDesignatedPrimary(); int getPriority(); int getQuorumOverride(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 17594aa730..b57d5bae21 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -99,8 +99,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); private static final int RESTART_TRY_LIMIT = 3; - static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC; + static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.SYNC; static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC; + static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = ReplicaAckPolicy.SIMPLE_MAJORITY; @SuppressWarnings("serial") private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() @@ -153,12 +154,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); private final AtomicBoolean _initialised; private final EnvironmentFacadeTask[] _initialisationTasks; + private final Durability _defaultDurability; + private final CoalescingCommiter _coalescingCommiter; private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; - private volatile SyncPolicy _localTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; - private volatile SyncPolicy _remoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; + private volatile SyncPolicy _messageStoreLocalTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; + private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks) { @@ -175,13 +178,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _initialised = new AtomicBoolean(); _initialisationTasks = initialisationTasks; _configuration = configuration; - + _defaultDurability = new Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY); _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName(); // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); + _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this); + // create environment in a separate thread to avoid renaming of the current thread by JE _environment = createEnvironment(true); populateExistingRemoteReplicationNodes(); @@ -193,9 +198,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { try { - Durability durability = getDurability(); TransactionConfig transactionConfig = new TransactionConfig(); - transactionConfig.setDurability(durability); + transactionConfig.setDurability(getMessageStoreTransactionDurability()); return _environment.beginTransaction(null, transactionConfig); } catch(DatabaseException e) @@ -511,16 +515,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return (String)_configuration.getHelperHostPort(); } + Durability getMessageStoreTransactionDurability() + { + SyncPolicy localSync = getMessageStoreLocalTransactionSyncronizationPolicy(); + if ( localSync == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY && _coalescingCommiter.isStarted()) + { + localSync = SyncPolicy.NO_SYNC; + } + SyncPolicy replicaSync = getMessageStoreRemoteTransactionSyncronizationPolicy(); + return new Durability(localSync, replicaSync, getReplicaAcknowledgmentPolicy()); + } + public Durability getDurability() { - SyncPolicy localSync = getLocalTransactionSyncronizationPolicy(); - SyncPolicy replicaSync = getRemoteTransactionSyncronizationPolicy(); - return new Durability(localSync, replicaSync, ReplicaAckPolicy.SIMPLE_MAJORITY); + return new Durability(getMessageStoreLocalTransactionSyncronizationPolicy(), + getMessageStoreRemoteTransactionSyncronizationPolicy(), getReplicaAcknowledgmentPolicy()); } public boolean isCoalescingSync() { - return true; + return _coalescingCommiter.isStarted(); } public String getNodeState() @@ -905,7 +919,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan boolean designatedPrimary = _configuration.isDesignatedPrimary(); int priority = _configuration.getPriority(); int quorumOverride = _configuration.getQuorumOverride(); - Durability durability = getDurability(); if (LOGGER.isInfoEnabled()) { @@ -915,7 +928,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("Node name " + _configuration.getName()); LOGGER.info("Node host port " + hostPort); LOGGER.info("Helper host port " + helperHostPort); - LOGGER.info("Durability " + durability); + LOGGER.info("Durability " + _defaultDurability); LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); LOGGER.info("Node priority " + priority); LOGGER.info("Quorum override " + quorumOverride); @@ -951,7 +964,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan envConfig.setAllowCreate(true); envConfig.setTransactional(true); envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); - envConfig.setDurability(durability); + envConfig.setDurability(_defaultDurability); for (Map.Entry<String, String> configItem : environmentSettings.entrySet()) { @@ -1037,7 +1050,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public Committer createCommitter(String name) { - return new CoalescingCommiter(name, this); + return _coalescingCommiter; } NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException @@ -1075,24 +1088,37 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - public SyncPolicy getLocalTransactionSyncronizationPolicy() + public SyncPolicy getMessageStoreLocalTransactionSyncronizationPolicy() { - return _localTransactionSyncronizationPolicy; + return _messageStoreLocalTransactionSyncronizationPolicy; } - public SyncPolicy getRemoteTransactionSyncronizationPolicy() + public SyncPolicy getMessageStoreRemoteTransactionSyncronizationPolicy() { - return _remoteTransactionSyncronizationPolicy; + return _messageStoreRemoteTransactionSyncronizationPolicy; } - public void setLocalTransactionSyncronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy) + public ReplicaAckPolicy getReplicaAcknowledgmentPolicy() { - _localTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy; + return REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY; + } + + public void setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy) + { + _messageStoreLocalTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy; + if (localTransactionSyncronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY) + { + _coalescingCommiter.start(); + } + else + { + _coalescingCommiter.stop(); + } } - public void setRemoteTransactionSyncronizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy) + public void setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy) { - _remoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy; + _messageStoreRemoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy; } private void populateExistingRemoteReplicationNodes() diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 62035bb65f..e154e8bf45 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory { @@ -36,76 +36,64 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact @Override public boolean isDesignatedPrimary() { - return (Boolean)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.DESIGNATED_PRIMARY); - } - - @Override - public boolean isCoalescingSync() - { - return (Boolean)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.COALESCING_SYNC); + return (Boolean)messageStoreSettings.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY); } @Override public String getStorePath() { - return (String) messageStoreSettings.get(BDBHAVirtualHostNodeImpl.STORE_PATH); + return (String) messageStoreSettings.get(BDBHAVirtualHostNode.STORE_PATH); } @SuppressWarnings("unchecked") @Override public Map<String, String> getParameters() { - return (Map<String, String>) messageStoreSettings.get(BDBHAVirtualHostNodeImpl.ENVIRONMENT_CONFIGURATION); + return (Map<String, String>) messageStoreSettings.get(BDBHAVirtualHostNode.ENVIRONMENT_CONFIGURATION); } @SuppressWarnings("unchecked") @Override public Map<String, String> getReplicationParameters() { - return (Map<String, String>) messageStoreSettings.get(BDBHAVirtualHostNodeImpl.REPLICATED_ENVIRONMENT_CONFIGURATION); + return (Map<String, String>) messageStoreSettings.get(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION); } @Override public int getQuorumOverride() { - return (Integer)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.QUORUM_OVERRIDE); + return (Integer)messageStoreSettings.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE); } @Override public int getPriority() { - return (Integer)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.PRIORITY); + return (Integer)messageStoreSettings.get(BDBHAVirtualHostNode.PRIORITY); } @Override public String getName() { - return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.NAME); + return (String)messageStoreSettings.get(BDBHAVirtualHostNode.NAME); } @Override public String getHostPort() { - return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.ADDRESS); + return (String)messageStoreSettings.get(BDBHAVirtualHostNode.ADDRESS); } @Override public String getHelperHostPort() { - return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.HELPER_ADDRESS); + return (String)messageStoreSettings.get(BDBHAVirtualHostNode.HELPER_ADDRESS); } @Override public String getGroupName() { - return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.GROUP_NAME); + return (String)messageStoreSettings.get(BDBHAVirtualHostNode.GROUP_NAME); } - - @Override - public String getDurability() - { - return (String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.DURABILITY); - } }; return new ReplicatedEnvironmentFacade(configuration, initialisationTasks); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java index 3c560de429..84ac9ca277 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java @@ -21,18 +21,27 @@ package org.apache.qpid.server.virtualhost.berkeleydb; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.DerivedAttribute; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>> { - String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSyncronizationPolicy"; - String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSyncronizationPolicy"; + String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSynchronizationPolicy"; + String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSynchronizationPolicy"; + String COALESCING_SYNC = "coalescingSync"; + String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy"; - @ManagedAttribute( defaultValue = "NO_SYNC") - String getLocalTransactionSyncronizationPolicy(); + @ManagedAttribute( defaultValue = "SYNC") + String getLocalTransactionSynchronizationPolicy(); @ManagedAttribute( defaultValue = "NO_SYNC") - String getRemoteTransactionSyncronizationPolicy(); + String getRemoteTransactionSynchronizationPolicy(); + + @DerivedAttribute + String getReplicaAcknowledgmentPolicy(); + + @DerivedAttribute + boolean isCoalescingSync(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java index 682f03bf7e..974ff51a3b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -45,11 +45,11 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm private final BDBMessageStore _messageStore; private MessageStoreLogSubject _messageStoreLogSubject; - @ManagedAttributeField(afterSet="setLocalTransactionSyncronizationPolicyOnEnvironment") - private String _localTransactionSyncronizationPolicy; + @ManagedAttributeField(afterSet="setLocalTransactionSynchronizationPolicyOnEnvironment") + private String _localTransactionSynchronizationPolicy; - @ManagedAttributeField(afterSet="setRemoteTransactionSyncronizationPolicyOnEnvironment") - private String _remoteTransactionSyncronizationPolicy; + @ManagedAttributeField(afterSet="setRemoteTransactionSynchronizationPolicyOnEnvironment") + private String _remoteTransactionSynchronizationPolicy; @ManagedObjectFactoryConstructor public BDBHAVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) @@ -84,23 +84,46 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm } @Override - public String getLocalTransactionSyncronizationPolicy() + public String getLocalTransactionSynchronizationPolicy() { - return _localTransactionSyncronizationPolicy; + return _localTransactionSynchronizationPolicy; } @Override - public String getRemoteTransactionSyncronizationPolicy() + public String getRemoteTransactionSynchronizationPolicy() { - return _remoteTransactionSyncronizationPolicy; + return _remoteTransactionSynchronizationPolicy; + } + + + @Override + public String getReplicaAcknowledgmentPolicy() + { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + return facade.getReplicaAcknowledgmentPolicy().name(); + } + return null; + } + + @Override + public boolean isCoalescingSync() + { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + return facade.isCoalescingSync(); + } + return false; } @Override public void onOpen() { super.onOpen(); - setRemoteTransactionSyncronizationPolicyOnEnvironment(); - setLocalTransactionSyncronizationPolicyOnEnvironment(); + setRemoteTransactionSynchronizationPolicyOnEnvironment(); + setLocalTransactionSynchronizationPolicyOnEnvironment(); } @Override @@ -110,13 +133,13 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY)) { - String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSyncronizationPolicy(); + String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSynchronizationPolicy(); validateTransactionSynchronizationPolicy(policy); } if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY)) { - String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSyncronizationPolicy(); + String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSynchronizationPolicy(); validateTransactionSynchronizationPolicy(policy); } } @@ -133,21 +156,21 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm } } - protected void setLocalTransactionSyncronizationPolicyOnEnvironment() + protected void setLocalTransactionSynchronizationPolicyOnEnvironment() { ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); if (facade != null) { - facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSyncronizationPolicy())); + facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy())); } } - protected void setRemoteTransactionSyncronizationPolicyOnEnvironment() + protected void setRemoteTransactionSynchronizationPolicyOnEnvironment() { ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); if (facade != null) { - facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSyncronizationPolicy())); + facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy())); } } @@ -155,4 +178,5 @@ public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostIm { return (ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade(); } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java index 9fdaccb949..b230b7520e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java @@ -31,7 +31,6 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends public static final String ADDRESS = "address"; public static final String HELPER_ADDRESS = "helperAddress"; public static final String DURABILITY = "durability"; - public static final String COALESCING_SYNC = "coalescingSync"; public static final String DESIGNATED_PRIMARY = "designatedPrimary"; public static final String PRIORITY = "priority"; public static final String QUORUM_OVERRIDE = "quorumOverride"; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 11a58b6167..91ac63ff46 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -177,7 +177,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public String getDurability() { - ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); if (environmentFacade != null) { return environmentFacade.getDurability().toString(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 563665917c..962e19f81c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertEquals(nodeHostPort, replicationConfig.getNodeHostPort()); assertEquals(helperHostPort, replicationConfig.getHelperHosts()); - assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString()); + assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString()); assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); @@ -465,17 +465,21 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost(); assertNotNull("Virtual host is not created", virtualHost); - assertEquals("Unexpected local transaction synchronization policy", "NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy()); - assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy()); + awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, true); + + assertEquals("Unexpected local transaction synchronization policy", "SYNC", virtualHost.getLocalTransactionSynchronizationPolicy()); + assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy()); + assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync()); Map<String, Object> virtualHostAttributes = new HashMap<String,Object>(); virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC"); virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC"); virtualHost.setAttributes(virtualHostAttributes); - assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy()); - assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy()); - + awaitForAttributeChange(virtualHost, BDBHAVirtualHostImpl.COALESCING_SYNC, false); + assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy()); + assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy()); + assertFalse("CoalescingSync is not OFF", virtualHost.isCoalescingSync()); try { virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID")); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 3660566c8c..e4f1b62954 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.qpid.server.store.berkeleydb.Committer; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -62,9 +63,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private static final String TEST_NODE_NAME = "testNodeName"; private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT; private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT; - private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); + private static final Durability TEST_DURABILITY = Durability.parse("SYNC,NO_SYNC,SIMPLE_MAJORITY"); private static final boolean TEST_DESIGNATED_PRIMARY = false; - private static final boolean TEST_COALESCING_SYNC = true; private static final int TEST_PRIORITY = 1; private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0; @@ -185,12 +185,32 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testGetDurability() throws Exception { - assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability().toString()); + ReplicatedEnvironmentFacade master = createMaster(); + assertEquals("Unexpected message store durability", TEST_DURABILITY, master.getMessageStoreTransactionDurability()); + assertEquals("Unexpected durability", TEST_DURABILITY, master.getDurability()); + Committer committer = master.createCommitter(TEST_GROUP_NAME); + committer.start(); + + waitForCommitter(committer, true); + + assertEquals("Unexpected message store durability after committer start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", master.getMessageStoreTransactionDurability().toString()); + + committer.stop(); + waitForCommitter(committer, false); + assertEquals("Unexpected message store durability after committer stop", TEST_DURABILITY, master.getMessageStoreTransactionDurability()); } public void testIsCoalescingSync() throws Exception { - assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync()); + ReplicatedEnvironmentFacade master = createMaster(); + assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync()); + Committer committer = master.createCommitter(TEST_GROUP_NAME); + committer.start(); + waitForCommitter(committer, true); + assertEquals("Unexpected coalescing sync", true, master.isCoalescingSync()); + committer.stop(); + waitForCommitter(committer, false); + assertEquals("Unexpected coalescing sync", false, master.isCoalescingSync()); } public void testGetNodeState() throws Exception @@ -690,20 +710,20 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { ReplicatedEnvironmentFacade facade = createMaster(); assertEquals("Unexpected local transaction synchronization policy before change", - ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getLocalTransactionSyncronizationPolicy()); - facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreLocalTransactionSyncronizationPolicy()); + facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC); assertEquals("Unexpected local transaction synchronization policy after change", - SyncPolicy.WRITE_NO_SYNC, facade.getLocalTransactionSyncronizationPolicy()); + SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreLocalTransactionSyncronizationPolicy()); } public void testSetRemoteTransactionSyncronizationPolicy() throws Exception { ReplicatedEnvironmentFacade facade = createMaster(); assertEquals("Unexpected remote transaction synchronization policy before change", - ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getRemoteTransactionSyncronizationPolicy()); - facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getMessageStoreRemoteTransactionSyncronizationPolicy()); + facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.WRITE_NO_SYNC); assertEquals("Unexpected remote transaction synchronization policy after change", - SyncPolicy.WRITE_NO_SYNC, facade.getRemoteTransactionSyncronizationPolicy()); + SyncPolicy.WRITE_NO_SYNC, facade.getMessageStoreRemoteTransactionSyncronizationPolicy()); } public void testBeginTransaction() throws Exception @@ -780,6 +800,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase return dbConfig; } + private void waitForCommitter(Committer committer, boolean expected) throws InterruptedException + { + int counter = 0; + while(committer.isStarted() != expected && counter < 100) + { + Thread.sleep(20); + counter++; + } + assertEquals("Committer is not in expected state", expected, committer.isStarted()); + } + private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) { ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class); @@ -790,8 +821,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase when(node.getPriority()).thenReturn(TEST_PRIORITY); when(node.getGroupName()).thenReturn(TEST_GROUP_NAME); when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT); - when(node.getDurability()).thenReturn(TEST_DURABILITY); - when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC); Map<String, String> repConfig = new HashMap<String, String>(); repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java index c120a3d63c..ca762cd86d 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java @@ -95,23 +95,25 @@ public class BDBHAVirtualHostRestTest extends QpidRestTestCase public void testSetLocalTransactionSynchronizationPolicy() throws Exception { - assertSetTransactionSynchronizationPolicy(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY); - } + Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); + assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY)); - public void testSetRemoteTransactionSynchronizationPolicy() throws Exception - { - assertSetTransactionSynchronizationPolicy(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY); + Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "NO_SYNC"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); + assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY)); } - private void assertSetTransactionSynchronizationPolicy(String policyAttribute) throws Exception, IOException + public void testSetRemoteTransactionSynchronizationPolicy() throws Exception { Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); - assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(policyAttribute)); + assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY)); - Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(policyAttribute, "SYNC"); + Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC"); getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200); hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); - assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(policyAttribute)); + assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY)); } } |
