diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-05-27 15:07:17 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-05-27 15:07:17 +0000 |
| commit | 25bf403a6e42a42699b7db3f118f41544a7131f1 (patch) | |
| tree | 56c7c7c326b1dc413e222edcad613f07a28dcb9f /qpid/java/bdbstore | |
| parent | d14fd4dc8626c61b9e364fa501c3627c3a26c99d (diff) | |
| download | qpid-python-25bf403a6e42a42699b7db3f118f41544a7131f1.tar.gz | |
QPID-5715: Add new attributes for transaction sync policies into BDB HA virtual host,
remove 'coalescingSync' attribute from BDB HA virtual host node and always use coalescingSync with replicated environment,
change attribute 'durability' of BDB HA virtual host node into derived attribute,
delegate bdb je transaction creation to EnvironmentFacade,
with ReplicatedEnvironmentFacade crate transactions with durability having virtual host transaction sync policies
and ack mode 'SIMPLE_MAJORITY'.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1597801 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
21 files changed, 514 insertions, 185 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java index 7146af364e..8c05602967 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -130,7 +130,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M @Override public boolean getCoalescingSync() throws IOException, JMException { - return _virtualHostNode.isCoalescingSync(); + return true; } @Override diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index 439af259ab..e5829c519a 100644 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -119,13 +119,6 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY)); } - public void testCoalescingSync() throws Exception - { - when(_virtualHostNode.isCoalescingSync()).thenReturn(true); - - assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC)); - } - public void testNodeState() throws Exception { when(_virtualHostNode.getRole()).thenReturn(TEST_NODE_STATE); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java deleted file mode 100644 index adc73c9bd9..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.util.Map; - -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.model.ManagedObject; -import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.virtualhost.AbstractVirtualHost; - -@ManagedObject( category = false, type = "BDB_HA" ) -public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost> -{ - public static final String TYPE = "BDB_HA"; - - private final BDBMessageStore _messageStore; - private MessageStoreLogSubject _messageStoreLogSubject; - - @ManagedObjectFactoryConstructor - protected BDBHAVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) - { - super(attributes, virtualHostNode); - - _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore(); - _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); - } - - @Override - protected void initialiseStorage() - { - } - - @Override - public DurableConfigurationStore getDurableConfigurationStore() - { - return _messageStore; - } - - @Override - public MessageStore getMessageStore() - { - return _messageStore; - } - - @Override - protected MessageStoreLogSubject getMessageStoreLogSubject() - { - return _messageStoreLogSubject; - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 5d9c2012a9..2675793b36 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -398,7 +398,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore tx = null; try { - tx = _environmentFacade.getEnvironment().beginTransaction(null, null); + tx = _environmentFacade.beginTransaction(); //remove the message meta data from the store DatabaseEntry key = new DatabaseEntry(); @@ -542,7 +542,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + txn = _environmentFacade.beginTransaction(); storeConfiguredObjectEntry(txn, configuredObject); txn.commit(); txn = null; @@ -569,7 +569,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + txn = _environmentFacade.beginTransaction(); Collection<UUID> removed = new ArrayList<UUID>(objects.length); for(ConfiguredObjectRecord record : objects) @@ -606,7 +606,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + txn = _environmentFacade.beginTransaction(); for(ConfiguredObjectRecord record : records) { update(createIfNecessary, record, txn); @@ -1290,8 +1290,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn; try { - txn = _environmentFacade.getEnvironment().beginTransaction( - null, null); + txn = _environmentFacade.beginTransaction(); } catch (DatabaseException e) { @@ -1329,14 +1328,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private BDBTransaction() throws StoreException { - try - { - _txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e); - } + _txn = _environmentFacade.beginTransaction(); } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index c9341dce02..6550a9122e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -24,7 +24,6 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import com.sun.org.apache.xalan.internal.xsltc.runtime.BasisLibrary; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreFuture; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index 144ab83238..e51a610a26 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -29,6 +29,7 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.Transaction; public interface EnvironmentFacade { @@ -48,6 +49,8 @@ public interface EnvironmentFacade Database getOpenDatabase(String name); + Transaction beginTransaction(); + void commit(com.sleepycat.je.Transaction tx); DatabaseException handleDatabaseException(String contextMessage, DatabaseException e); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index b11fb6e873..643cd9ae70 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -32,6 +32,7 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.Transaction; public class StandardEnvironmentFacade implements EnvironmentFacade { @@ -84,6 +85,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } } + + @Override + public Transaction beginTransaction() + { + return _environment.beginTransaction(null, null); + } + @Override public void commit(com.sleepycat.je.Transaction tx) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java index 38fdf34196..6ec5067d01 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java @@ -48,7 +48,7 @@ public class DatabasePinger Transaction txn = null; try { - txn = facade.getEnvironment().beginTransaction(null, null); + txn = facade.beginTransaction(); db.put(txn, key, value); txn.commit(); txn = null; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 3eabaf1c73..17594aa730 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -54,9 +54,12 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.ReplicaAckPolicy; +import com.sleepycat.je.Durability.SyncPolicy; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.EnvironmentFailureException; import com.sleepycat.je.Transaction; +import com.sleepycat.je.TransactionConfig; import com.sleepycat.je.rep.InsufficientLogException; import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.NetworkRestore; @@ -96,6 +99,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); private static final int RESTART_TRY_LIMIT = 3; + static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC; + static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC; + @SuppressWarnings("serial") private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() {{ @@ -135,8 +141,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static final String TYPE = "BDB-HA"; private final ReplicatedEnvironmentConfiguration _configuration; - private final Durability _durability; - private final Boolean _coalescingSync; private final String _prettyGroupNodeName; private final File _environmentDirectory; @@ -153,6 +157,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; + private volatile SyncPolicy _localTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; + private volatile SyncPolicy _remoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks) { @@ -170,8 +176,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _initialisationTasks = initialisationTasks; _configuration = configuration; - _durability = Durability.parse(_configuration.getDurability()); - _coalescingSync = _configuration.isCoalescingSync(); _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName(); // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread @@ -185,6 +189,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override + public Transaction beginTransaction() + { + try + { + Durability durability = getDurability(); + TransactionConfig transactionConfig = new TransactionConfig(); + transactionConfig.setDurability(durability); + return _environment.beginTransaction(null, transactionConfig); + } + catch(DatabaseException e) + { + throw handleDatabaseException("Failure to start transaction", e); + } + } + + @Override public void commit(final Transaction tx) { try @@ -491,14 +511,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return (String)_configuration.getHelperHostPort(); } - public String getDurability() + public Durability getDurability() { - return _durability.toString(); + SyncPolicy localSync = getLocalTransactionSyncronizationPolicy(); + SyncPolicy replicaSync = getRemoteTransactionSyncronizationPolicy(); + return new Durability(localSync, replicaSync, ReplicaAckPolicy.SIMPLE_MAJORITY); } public boolean isCoalescingSync() { - return _coalescingSync; + return true; } public String getNodeState() @@ -883,6 +905,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan boolean designatedPrimary = _configuration.isDesignatedPrimary(); int priority = _configuration.getPriority(); int quorumOverride = _configuration.getQuorumOverride(); + Durability durability = getDurability(); if (LOGGER.isInfoEnabled()) { @@ -892,8 +915,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("Node name " + _configuration.getName()); LOGGER.info("Node host port " + hostPort); LOGGER.info("Helper host port " + helperHostPort); - LOGGER.info("Durability " + _durability); - LOGGER.info("Coalescing sync " + _coalescingSync); + LOGGER.info("Durability " + durability); LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); LOGGER.info("Node priority " + priority); LOGGER.info("Quorum override " + quorumOverride); @@ -929,7 +951,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan envConfig.setAllowCreate(true); envConfig.setTransactional(true); envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); - envConfig.setDurability(_durability); + envConfig.setDurability(durability); for (Map.Entry<String, String> configItem : environmentSettings.entrySet()) { @@ -1015,14 +1037,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public Committer createCommitter(String name) { - if (_coalescingSync) - { - return new CoalescingCommiter(name, this); - } - else - { - return Committer.IMMEDIATE_FUTURE_COMMITTER; - } + return new CoalescingCommiter(name, this); } NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException @@ -1060,6 +1075,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + public SyncPolicy getLocalTransactionSyncronizationPolicy() + { + return _localTransactionSyncronizationPolicy; + } + + public SyncPolicy getRemoteTransactionSyncronizationPolicy() + { + return _remoteTransactionSyncronizationPolicy; + } + + public void setLocalTransactionSyncronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy) + { + _localTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy; + } + + public void setRemoteTransactionSyncronizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy) + { + _remoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy; + } + private void populateExistingRemoteReplicationNodes() { ReplicationGroup group = _environment.getGroup(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java new file mode 100644 index 0000000000..3c560de429 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.berkeleydb; + +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>> +{ + String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSyncronizationPolicy"; + String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSyncronizationPolicy"; + + @ManagedAttribute( defaultValue = "NO_SYNC") + String getLocalTransactionSyncronizationPolicy(); + + @ManagedAttribute( defaultValue = "NO_SYNC") + String getRemoteTransactionSyncronizationPolicy(); +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java new file mode 100644 index 0000000000..682f03bf7e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.berkeleydb; + +import java.util.Map; +import java.util.Set; + +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; + +import com.sleepycat.je.Durability.SyncPolicy; + +@ManagedObject( category = false, type = "BDB_HA" ) +public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostImpl> implements BDBHAVirtualHost<BDBHAVirtualHostImpl> +{ + public static final String TYPE = "BDB_HA"; + + private final BDBMessageStore _messageStore; + private MessageStoreLogSubject _messageStoreLogSubject; + + @ManagedAttributeField(afterSet="setLocalTransactionSyncronizationPolicyOnEnvironment") + private String _localTransactionSyncronizationPolicy; + + @ManagedAttributeField(afterSet="setRemoteTransactionSyncronizationPolicyOnEnvironment") + private String _remoteTransactionSyncronizationPolicy; + + @ManagedObjectFactoryConstructor + public BDBHAVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) + { + super(attributes, virtualHostNode); + + _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore(); + _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); + } + + @Override + protected void initialiseStorage() + { + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return _messageStore; + } + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + @Override + protected MessageStoreLogSubject getMessageStoreLogSubject() + { + return _messageStoreLogSubject; + } + + @Override + public String getLocalTransactionSyncronizationPolicy() + { + return _localTransactionSyncronizationPolicy; + } + + @Override + public String getRemoteTransactionSyncronizationPolicy() + { + return _remoteTransactionSyncronizationPolicy; + } + + @Override + public void onOpen() + { + super.onOpen(); + setRemoteTransactionSyncronizationPolicyOnEnvironment(); + setLocalTransactionSyncronizationPolicyOnEnvironment(); + } + + @Override + protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) + { + super.validateChange(proxyForValidation, changedAttributes); + + if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY)) + { + String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSyncronizationPolicy(); + validateTransactionSynchronizationPolicy(policy); + } + + if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY)) + { + String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSyncronizationPolicy(); + validateTransactionSynchronizationPolicy(policy); + } + } + + private void validateTransactionSynchronizationPolicy(String policy) + { + try + { + SyncPolicy.valueOf(policy); + } + catch(Exception e) + { + throw new IllegalArgumentException("Invalid transaction syncronization policy '" + policy + "'. " + e.getMessage()); + } + } + + protected void setLocalTransactionSyncronizationPolicyOnEnvironment() + { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSyncronizationPolicy())); + } + } + + protected void setRemoteTransactionSyncronizationPolicyOnEnvironment() + { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSyncronizationPolicy())); + } + } + + private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade() + { + return (ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade(); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java index 0d0292ce58..9fdaccb949 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java @@ -49,12 +49,9 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends @ManagedAttribute(mandatory=true) String getHelperAddress(); - @ManagedAttribute(defaultValue = "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY") + @DerivedAttribute String getDurability(); - @ManagedAttribute(defaultValue = "true") - boolean isCoalescingSync(); - @ManagedAttribute(defaultValue = "false") boolean isDesignatedPrimary(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index e978ab6c0b..11a58b6167 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -97,12 +97,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @ManagedAttributeField private String _address; - @ManagedAttributeField - private String _durability; - - @ManagedAttributeField - private boolean _coalescingSync; - @ManagedAttributeField(afterSet="postSetDesignatedPrimary") private boolean _designatedPrimary; @@ -183,13 +177,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public String getDurability() { - return _durability; - } - - @Override - public boolean isCoalescingSync() - { - return _coalescingSync; + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); + if (environmentFacade != null) + { + return environmentFacade.getDurability().toString(); + } + return null; } @Override @@ -591,7 +584,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } catch (TimeoutException e) { - LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become effective once the JE task thread is free."); + LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _quorumOverride + " will become effective once the JE task thread is free."); } catch (InterruptedException e) { diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js index 269bb402c5..02996046e3 100644 --- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js +++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js @@ -43,8 +43,8 @@ define(["dojo/_base/xhr", "dojo/domReady!"], function (xhr, array, event, lang, win, dom, domConstruct, registry, parser, json, query, Memory, ObjectStore, template) { - var fields = [ "storePath", "name", "groupName", "address", "durability", - "coalescingSync", "designatedPrimary", "priority", "quorumOverride"]; + var fields = [ "storePath", "name", "groupName", "address", + "designatedPrimary", "priority", "quorumOverride"]; var bdbHaNodeEditor = { diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js index 9dbebc9d7d..da63e031e6 100644 --- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js +++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js @@ -35,7 +35,7 @@ define(["dojo/_base/xhr", function (xhr, lang, connect, parser, json, entities, query, json, registry, EnhancedGrid, UpdatableStore, UserPreferences, edit) { var priorityNames = {'_0': 'Never', '_1': 'Default', '_2': 'Normal', '_3': 'High'}; - var nodeFields = ["storePath", "groupName", "role", "address", "coalescingSync", "designatedPrimary", "durability", "priority", "quorumOverride"]; + var nodeFields = ["storePath", "groupName", "role", "address", "designatedPrimary", "durability", "priority", "quorumOverride"]; function findNode(nodeClass, containerNode) { diff --git a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html index 79ab5155c3..355a9ef62a 100644 --- a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html +++ b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html @@ -79,32 +79,6 @@ </td> </tr> <tr> - <td class="tableContainer-labelCell" style="width: 200px;"><strong>Durability: </strong></td> - <td class="tableContainer-valueCell"> - <input type="text" id="editBDBHANode.durability" - data-dojo-type="dijit/form/ValidationTextBox" - data-dojo-props=" - name: 'durability', - disabled: true, - required: false, - placeHolder: 'NO_SYNC,NO_SYNC,SIMPLE_MAJORITY', - title: 'Enter node durability settings', - pattern: '^(SYN|NO_SYNC|WRITE_NO_SYNC),(SYN|NO_SYNC|WRITE_NO_SYNC),(SIMPLE_MAJORITY|ALL|NONE)*$'" /> - </td> - </tr> - <tr> - <td class="tableContainer-labelCell" style="width: 200px;"><strong>Coalescing sync: </strong></td> - <td class="tableContainer-valueCell"> - <input type="checkbox" id="editBDBHANode.coalescingSync" checked="checked" - data-dojo-type="dijit/form/CheckBox" - data-dojo-props=" - name: 'coalescingSync', - required: false, - disabled: true, - title: 'Enable coalescing sync mode for a better performance: when many transactions are synced to disk at the same time'" /> - </td> - </tr> - <tr> <td class="tableContainer-labelCell" style="width: 200px;"><strong>Allow this node to operate solo: </strong></td> <td class="tableContainer-valueCell"> <input type="checkbox" id="editBDBHANode.designatedPrimary" checked="checked" diff --git a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html index 579ec12608..792305aa8e 100644 --- a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html +++ b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html @@ -36,10 +36,6 @@ <div class="address" style="float:left;">N/A</div> </div> <div style="clear:both"> - <div class="formLabel-labelCell" style="float:left; width: 200px;">Coalescing Sync:</div> - <div class="coalescingSync" style="float:left;">N/A</div> - </div> - <div style="clear:both"> <div class="formLabel-labelCell" style="float:left; width: 200px;">Durability:</div> <div class="durability" style="float:left;">N/A</div> </div> diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 4399077a46..563665917c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -51,6 +51,8 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; @@ -127,7 +129,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase String groupName = "group"; String nodeHostPort = "localhost:" + findFreePort(); String helperHostPort = nodeHostPort; - String durability = "NO_SYNC,SYNC,NONE"; UUID id = UUID.randomUUID(); Map<String, Object> attributes = new HashMap<String, Object>(); @@ -137,7 +138,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); attributes.put(BDBHAVirtualHostNode.ADDRESS, nodeHostPort); attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperHostPort); - attributes.put(BDBHAVirtualHostNode.DURABILITY, durability); attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath); attributes.put(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION, Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout)); @@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertEquals(nodeHostPort, replicationConfig.getNodeHostPort()); assertEquals(helperHostPort, replicationConfig.getHelperHosts()); - assertEquals(durability, environment.getConfig().getDurability().toString()); + assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString()); assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); @@ -426,6 +426,78 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertNull("Remote node " + replica.getName() + " is not found", findRemoteNode( master, replica.getName())); } + + public void testSetSynchronizationPolicyAttributesOnVirtualHost() throws Exception + { + int node1PortNumber = findFreePort(); + String helperAddress = "localhost:" + node1PortNumber; + String groupName = "group"; + + Map<String, Object> nodeAttributes = new HashMap<String, Object>(); + nodeAttributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + nodeAttributes.put(BDBHAVirtualHostNode.NAME, "node1"); + nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); + BDBHAVirtualHostNode<?> node = createHaVHN(nodeAttributes); + + final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1); + node.addChangeListener(new NoopConfigurationChangeListener() + { + @Override + public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) + { + if (child instanceof VirtualHost) + { + child.addChangeListener(this); + virtualHostAddedLatch.countDown(); + } + } + }); + + node.start(); + assertNodeRole(node, "MASTER", "REPLICA"); + assertEquals("Unexpected node state", State.ACTIVE, node.getState()); + + assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); + BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost(); + assertNotNull("Virtual host is not created", virtualHost); + + assertEquals("Unexpected local transaction synchronization policy", "NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy()); + assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy()); + + Map<String, Object> virtualHostAttributes = new HashMap<String,Object>(); + virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC"); + virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC"); + virtualHost.setAttributes(virtualHostAttributes); + + assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy()); + assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy()); + + try + { + virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID")); + fail("Invalid syncronization policy is set"); + } + catch(IllegalArgumentException e) + { + //pass + } + + try + { + virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID")); + fail("Invalid syncronization policy is set"); + } + catch(IllegalArgumentException e) + { + //pass + } + + } + private BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name) { for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes()) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 7f6083a0c9..3660566c8c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -41,7 +41,9 @@ import org.apache.qpid.util.FileUtils; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.SyncPolicy; import com.sleepycat.je.Environment; +import com.sleepycat.je.Transaction; import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment; @@ -183,7 +185,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testGetDurability() throws Exception { - assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability()); + assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability().toString()); } public void testIsCoalescingSync() throws Exception @@ -196,7 +198,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState()); } - public void testPriority() throws Exception { ReplicatedEnvironmentFacade facade = createMaster(); @@ -685,6 +686,46 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); } + public void testSetLocalTransactionSyncronizationPolicy() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected local transaction synchronization policy before change", + ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getLocalTransactionSyncronizationPolicy()); + facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + assertEquals("Unexpected local transaction synchronization policy after change", + SyncPolicy.WRITE_NO_SYNC, facade.getLocalTransactionSyncronizationPolicy()); + } + + public void testSetRemoteTransactionSyncronizationPolicy() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected remote transaction synchronization policy before change", + ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getRemoteTransactionSyncronizationPolicy()); + facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + assertEquals("Unexpected remote transaction synchronization policy after change", + SyncPolicy.WRITE_NO_SYNC, facade.getRemoteTransactionSyncronizationPolicy()); + } + + public void testBeginTransaction() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + Transaction txn = null; + try + { + txn = facade.beginTransaction(); + assertNotNull("Transaction is not created", txn); + txn.commit(); + txn = null; + } + finally + { + if (txn != null) + { + txn.abort(); + } + } + } + private ReplicatedEnvironmentFacade createMaster() throws Exception { return createMaster(new NoopReplicationGroupListener()); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java index 7f8f3ad22c..ac6ca45877 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java @@ -302,20 +302,6 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase } } - private Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception - { - List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url); - long limit = System.currentTimeMillis() + 5000; - while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName)))) - { - Thread.sleep(100l); - nodeAttributes = getRestTestHelper().getJsonAsList(url); - } - Map<String, Object> nodeData = nodeAttributes.get(0); - assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName)); - return nodeData; - } - private void assertRemoteNodeData(String name, Map<String, Object> nodeData) { assertEquals("Remote node " + name + " has unexpected name", name, nodeData.get(BDBHAVirtualHostNode.NAME)); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java new file mode 100644 index 0000000000..c120a3d63c --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY; +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.QpidRestTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; + +public class BDBHAVirtualHostRestTest extends QpidRestTestCase +{ + private String _hostName; + private File _storeBaseDir; + private int _nodeHaPort; + private Object _nodeName; + private String _virtualhostUrl; + + @Override + public void setUp() throws Exception + { + setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); + _hostName = "ha"; + _nodeName = "node1"; + _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis()); + _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1); + _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName; + + super.setUp(); + } + + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + if (_storeBaseDir != null) + { + FileUtils.delete(_storeBaseDir, true); + } + } + } + + @Override + protected void customizeConfiguration() throws IOException + { + super.customizeConfiguration(); + TestBrokerConfiguration config = getBrokerConfiguration(); + config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); + config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); + + Map<String, Object> nodeAttributes = new HashMap<String, Object>(); + nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); + nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + _nodeHaPort); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort); + config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes); + } + + public void testSetLocalTransactionSynchronizationPolicy() throws Exception + { + assertSetTransactionSynchronizationPolicy(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY); + } + + public void testSetRemoteTransactionSynchronizationPolicy() throws Exception + { + assertSetTransactionSynchronizationPolicy(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY); + } + + private void assertSetTransactionSynchronizationPolicy(String policyAttribute) throws Exception, IOException + { + Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); + assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(policyAttribute)); + + Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(policyAttribute, "SYNC"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); + assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(policyAttribute)); + } +} |
