diff options
Diffstat (limited to 'qpid')
22 files changed, 529 insertions, 185 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java index 7146af364e..8c05602967 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -130,7 +130,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M @Override public boolean getCoalescingSync() throws IOException, JMException { - return _virtualHostNode.isCoalescingSync(); + return true; } @Override diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index 439af259ab..e5829c519a 100644 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -119,13 +119,6 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY)); } - public void testCoalescingSync() throws Exception - { - when(_virtualHostNode.isCoalescingSync()).thenReturn(true); - - assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC)); - } - public void testNodeState() throws Exception { when(_virtualHostNode.getRole()).thenReturn(TEST_NODE_STATE); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java deleted file mode 100644 index adc73c9bd9..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.util.Map; - -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.model.ManagedObject; -import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.virtualhost.AbstractVirtualHost; - -@ManagedObject( category = false, type = "BDB_HA" ) -public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost> -{ - public static final String TYPE = "BDB_HA"; - - private final BDBMessageStore _messageStore; - private MessageStoreLogSubject _messageStoreLogSubject; - - @ManagedObjectFactoryConstructor - protected BDBHAVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) - { - super(attributes, virtualHostNode); - - _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore(); - _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); - } - - @Override - protected void initialiseStorage() - { - } - - @Override - public DurableConfigurationStore getDurableConfigurationStore() - { - return _messageStore; - } - - @Override - public MessageStore getMessageStore() - { - return _messageStore; - } - - @Override - protected MessageStoreLogSubject getMessageStoreLogSubject() - { - return _messageStoreLogSubject; - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 5d9c2012a9..2675793b36 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -398,7 +398,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore tx = null; try { - tx = _environmentFacade.getEnvironment().beginTransaction(null, null); + tx = _environmentFacade.beginTransaction(); //remove the message meta data from the store DatabaseEntry key = new DatabaseEntry(); @@ -542,7 +542,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + txn = _environmentFacade.beginTransaction(); storeConfiguredObjectEntry(txn, configuredObject); txn.commit(); txn = null; @@ -569,7 +569,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + txn = _environmentFacade.beginTransaction(); Collection<UUID> removed = new ArrayList<UUID>(objects.length); for(ConfiguredObjectRecord record : objects) @@ -606,7 +606,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn = null; try { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + txn = _environmentFacade.beginTransaction(); for(ConfiguredObjectRecord record : records) { update(createIfNecessary, record, txn); @@ -1290,8 +1290,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore com.sleepycat.je.Transaction txn; try { - txn = _environmentFacade.getEnvironment().beginTransaction( - null, null); + txn = _environmentFacade.beginTransaction(); } catch (DatabaseException e) { @@ -1329,14 +1328,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private BDBTransaction() throws StoreException { - try - { - _txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e); - } + _txn = _environmentFacade.beginTransaction(); } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index c9341dce02..6550a9122e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -24,7 +24,6 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import com.sun.org.apache.xalan.internal.xsltc.runtime.BasisLibrary; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreFuture; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index 144ab83238..e51a610a26 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -29,6 +29,7 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.Transaction; public interface EnvironmentFacade { @@ -48,6 +49,8 @@ public interface EnvironmentFacade Database getOpenDatabase(String name); + Transaction beginTransaction(); + void commit(com.sleepycat.je.Transaction tx); DatabaseException handleDatabaseException(String contextMessage, DatabaseException e); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index b11fb6e873..643cd9ae70 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -32,6 +32,7 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.Transaction; public class StandardEnvironmentFacade implements EnvironmentFacade { @@ -84,6 +85,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } } + + @Override + public Transaction beginTransaction() + { + return _environment.beginTransaction(null, null); + } + @Override public void commit(com.sleepycat.je.Transaction tx) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java index 38fdf34196..6ec5067d01 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java @@ -48,7 +48,7 @@ public class DatabasePinger Transaction txn = null; try { - txn = facade.getEnvironment().beginTransaction(null, null); + txn = facade.beginTransaction(); db.put(txn, key, value); txn.commit(); txn = null; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 3eabaf1c73..17594aa730 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -54,9 +54,12 @@ import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.ReplicaAckPolicy; +import com.sleepycat.je.Durability.SyncPolicy; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.EnvironmentFailureException; import com.sleepycat.je.Transaction; +import com.sleepycat.je.TransactionConfig; import com.sleepycat.je.rep.InsufficientLogException; import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.NetworkRestore; @@ -96,6 +99,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); private static final int RESTART_TRY_LIMIT = 3; + static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC; + static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = SyncPolicy.NO_SYNC; + @SuppressWarnings("serial") private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() {{ @@ -135,8 +141,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static final String TYPE = "BDB-HA"; private final ReplicatedEnvironmentConfiguration _configuration; - private final Durability _durability; - private final Boolean _coalescingSync; private final String _prettyGroupNodeName; private final File _environmentDirectory; @@ -153,6 +157,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; + private volatile SyncPolicy _localTransactionSyncronizationPolicy = LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; + private volatile SyncPolicy _remoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks) { @@ -170,8 +176,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _initialisationTasks = initialisationTasks; _configuration = configuration; - _durability = Durability.parse(_configuration.getDurability()); - _coalescingSync = _configuration.isCoalescingSync(); _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName(); // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread @@ -185,6 +189,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override + public Transaction beginTransaction() + { + try + { + Durability durability = getDurability(); + TransactionConfig transactionConfig = new TransactionConfig(); + transactionConfig.setDurability(durability); + return _environment.beginTransaction(null, transactionConfig); + } + catch(DatabaseException e) + { + throw handleDatabaseException("Failure to start transaction", e); + } + } + + @Override public void commit(final Transaction tx) { try @@ -491,14 +511,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return (String)_configuration.getHelperHostPort(); } - public String getDurability() + public Durability getDurability() { - return _durability.toString(); + SyncPolicy localSync = getLocalTransactionSyncronizationPolicy(); + SyncPolicy replicaSync = getRemoteTransactionSyncronizationPolicy(); + return new Durability(localSync, replicaSync, ReplicaAckPolicy.SIMPLE_MAJORITY); } public boolean isCoalescingSync() { - return _coalescingSync; + return true; } public String getNodeState() @@ -883,6 +905,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan boolean designatedPrimary = _configuration.isDesignatedPrimary(); int priority = _configuration.getPriority(); int quorumOverride = _configuration.getQuorumOverride(); + Durability durability = getDurability(); if (LOGGER.isInfoEnabled()) { @@ -892,8 +915,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("Node name " + _configuration.getName()); LOGGER.info("Node host port " + hostPort); LOGGER.info("Helper host port " + helperHostPort); - LOGGER.info("Durability " + _durability); - LOGGER.info("Coalescing sync " + _coalescingSync); + LOGGER.info("Durability " + durability); LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); LOGGER.info("Node priority " + priority); LOGGER.info("Quorum override " + quorumOverride); @@ -929,7 +951,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan envConfig.setAllowCreate(true); envConfig.setTransactional(true); envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); - envConfig.setDurability(_durability); + envConfig.setDurability(durability); for (Map.Entry<String, String> configItem : environmentSettings.entrySet()) { @@ -1015,14 +1037,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public Committer createCommitter(String name) { - if (_coalescingSync) - { - return new CoalescingCommiter(name, this); - } - else - { - return Committer.IMMEDIATE_FUTURE_COMMITTER; - } + return new CoalescingCommiter(name, this); } NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException @@ -1060,6 +1075,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + public SyncPolicy getLocalTransactionSyncronizationPolicy() + { + return _localTransactionSyncronizationPolicy; + } + + public SyncPolicy getRemoteTransactionSyncronizationPolicy() + { + return _remoteTransactionSyncronizationPolicy; + } + + public void setLocalTransactionSyncronizationPolicy(SyncPolicy localTransactionSyncronizationPolicy) + { + _localTransactionSyncronizationPolicy = localTransactionSyncronizationPolicy; + } + + public void setRemoteTransactionSyncronizationPolicy(SyncPolicy remoteTransactionSyncronizationPolicy) + { + _remoteTransactionSyncronizationPolicy = remoteTransactionSyncronizationPolicy; + } + private void populateExistingRemoteReplicationNodes() { ReplicationGroup group = _environment.getGroup(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java new file mode 100644 index 0000000000..3c560de429 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.berkeleydb; + +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>> +{ + String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = "remoteTransactionSyncronizationPolicy"; + String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = "localTransactionSyncronizationPolicy"; + + @ManagedAttribute( defaultValue = "NO_SYNC") + String getLocalTransactionSyncronizationPolicy(); + + @ManagedAttribute( defaultValue = "NO_SYNC") + String getRemoteTransactionSyncronizationPolicy(); +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java new file mode 100644 index 0000000000..682f03bf7e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.berkeleydb; + +import java.util.Map; +import java.util.Set; + +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; + +import com.sleepycat.je.Durability.SyncPolicy; + +@ManagedObject( category = false, type = "BDB_HA" ) +public class BDBHAVirtualHostImpl extends AbstractVirtualHost<BDBHAVirtualHostImpl> implements BDBHAVirtualHost<BDBHAVirtualHostImpl> +{ + public static final String TYPE = "BDB_HA"; + + private final BDBMessageStore _messageStore; + private MessageStoreLogSubject _messageStoreLogSubject; + + @ManagedAttributeField(afterSet="setLocalTransactionSyncronizationPolicyOnEnvironment") + private String _localTransactionSyncronizationPolicy; + + @ManagedAttributeField(afterSet="setRemoteTransactionSyncronizationPolicyOnEnvironment") + private String _remoteTransactionSyncronizationPolicy; + + @ManagedObjectFactoryConstructor + public BDBHAVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) + { + super(attributes, virtualHostNode); + + _messageStore = (BDBMessageStore) virtualHostNode.getConfigurationStore(); + _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); + } + + @Override + protected void initialiseStorage() + { + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return _messageStore; + } + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + @Override + protected MessageStoreLogSubject getMessageStoreLogSubject() + { + return _messageStoreLogSubject; + } + + @Override + public String getLocalTransactionSyncronizationPolicy() + { + return _localTransactionSyncronizationPolicy; + } + + @Override + public String getRemoteTransactionSyncronizationPolicy() + { + return _remoteTransactionSyncronizationPolicy; + } + + @Override + public void onOpen() + { + super.onOpen(); + setRemoteTransactionSyncronizationPolicyOnEnvironment(); + setLocalTransactionSyncronizationPolicyOnEnvironment(); + } + + @Override + protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) + { + super.validateChange(proxyForValidation, changedAttributes); + + if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY)) + { + String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSyncronizationPolicy(); + validateTransactionSynchronizationPolicy(policy); + } + + if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY)) + { + String policy = ((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSyncronizationPolicy(); + validateTransactionSynchronizationPolicy(policy); + } + } + + private void validateTransactionSynchronizationPolicy(String policy) + { + try + { + SyncPolicy.valueOf(policy); + } + catch(Exception e) + { + throw new IllegalArgumentException("Invalid transaction syncronization policy '" + policy + "'. " + e.getMessage()); + } + } + + protected void setLocalTransactionSyncronizationPolicyOnEnvironment() + { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSyncronizationPolicy())); + } + } + + protected void setRemoteTransactionSyncronizationPolicyOnEnvironment() + { + ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade(); + if (facade != null) + { + facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSyncronizationPolicy())); + } + } + + private ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade() + { + return (ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade(); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java index 0d0292ce58..9fdaccb949 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java @@ -49,12 +49,9 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends @ManagedAttribute(mandatory=true) String getHelperAddress(); - @ManagedAttribute(defaultValue = "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY") + @DerivedAttribute String getDurability(); - @ManagedAttribute(defaultValue = "true") - boolean isCoalescingSync(); - @ManagedAttribute(defaultValue = "false") boolean isDesignatedPrimary(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index e978ab6c0b..11a58b6167 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -97,12 +97,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @ManagedAttributeField private String _address; - @ManagedAttributeField - private String _durability; - - @ManagedAttributeField - private boolean _coalescingSync; - @ManagedAttributeField(afterSet="postSetDesignatedPrimary") private boolean _designatedPrimary; @@ -183,13 +177,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public String getDurability() { - return _durability; - } - - @Override - public boolean isCoalescingSync() - { - return _coalescingSync; + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); + if (environmentFacade != null) + { + return environmentFacade.getDurability().toString(); + } + return null; } @Override @@ -591,7 +584,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } catch (TimeoutException e) { - LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become effective once the JE task thread is free."); + LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _quorumOverride + " will become effective once the JE task thread is free."); } catch (InterruptedException e) { diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js index 269bb402c5..02996046e3 100644 --- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js +++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/edit.js @@ -43,8 +43,8 @@ define(["dojo/_base/xhr", "dojo/domReady!"], function (xhr, array, event, lang, win, dom, domConstruct, registry, parser, json, query, Memory, ObjectStore, template) { - var fields = [ "storePath", "name", "groupName", "address", "durability", - "coalescingSync", "designatedPrimary", "priority", "quorumOverride"]; + var fields = [ "storePath", "name", "groupName", "address", + "designatedPrimary", "priority", "quorumOverride"]; var bdbHaNodeEditor = { diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js index 9dbebc9d7d..da63e031e6 100644 --- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js +++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js @@ -35,7 +35,7 @@ define(["dojo/_base/xhr", function (xhr, lang, connect, parser, json, entities, query, json, registry, EnhancedGrid, UpdatableStore, UserPreferences, edit) { var priorityNames = {'_0': 'Never', '_1': 'Default', '_2': 'Normal', '_3': 'High'}; - var nodeFields = ["storePath", "groupName", "role", "address", "coalescingSync", "designatedPrimary", "durability", "priority", "quorumOverride"]; + var nodeFields = ["storePath", "groupName", "role", "address", "designatedPrimary", "durability", "priority", "quorumOverride"]; function findNode(nodeClass, containerNode) { diff --git a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html index 79ab5155c3..355a9ef62a 100644 --- a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html +++ b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/edit.html @@ -79,32 +79,6 @@ </td> </tr> <tr> - <td class="tableContainer-labelCell" style="width: 200px;"><strong>Durability: </strong></td> - <td class="tableContainer-valueCell"> - <input type="text" id="editBDBHANode.durability" - data-dojo-type="dijit/form/ValidationTextBox" - data-dojo-props=" - name: 'durability', - disabled: true, - required: false, - placeHolder: 'NO_SYNC,NO_SYNC,SIMPLE_MAJORITY', - title: 'Enter node durability settings', - pattern: '^(SYN|NO_SYNC|WRITE_NO_SYNC),(SYN|NO_SYNC|WRITE_NO_SYNC),(SIMPLE_MAJORITY|ALL|NONE)*$'" /> - </td> - </tr> - <tr> - <td class="tableContainer-labelCell" style="width: 200px;"><strong>Coalescing sync: </strong></td> - <td class="tableContainer-valueCell"> - <input type="checkbox" id="editBDBHANode.coalescingSync" checked="checked" - data-dojo-type="dijit/form/CheckBox" - data-dojo-props=" - name: 'coalescingSync', - required: false, - disabled: true, - title: 'Enable coalescing sync mode for a better performance: when many transactions are synced to disk at the same time'" /> - </td> - </tr> - <tr> <td class="tableContainer-labelCell" style="width: 200px;"><strong>Allow this node to operate solo: </strong></td> <td class="tableContainer-valueCell"> <input type="checkbox" id="editBDBHANode.designatedPrimary" checked="checked" diff --git a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html index 579ec12608..792305aa8e 100644 --- a/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html +++ b/qpid/java/bdbstore/src/main/java/resources/virtualhostnode/bdb_ha/show.html @@ -36,10 +36,6 @@ <div class="address" style="float:left;">N/A</div> </div> <div style="clear:both"> - <div class="formLabel-labelCell" style="float:left; width: 200px;">Coalescing Sync:</div> - <div class="coalescingSync" style="float:left;">N/A</div> - </div> - <div style="clear:both"> <div class="formLabel-labelCell" style="float:left; width: 200px;">Durability:</div> <div class="durability" style="float:left;">N/A</div> </div> diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 4399077a46..563665917c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -51,6 +51,8 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; @@ -127,7 +129,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase String groupName = "group"; String nodeHostPort = "localhost:" + findFreePort(); String helperHostPort = nodeHostPort; - String durability = "NO_SYNC,SYNC,NONE"; UUID id = UUID.randomUUID(); Map<String, Object> attributes = new HashMap<String, Object>(); @@ -137,7 +138,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); attributes.put(BDBHAVirtualHostNode.ADDRESS, nodeHostPort); attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperHostPort); - attributes.put(BDBHAVirtualHostNode.DURABILITY, durability); attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath); attributes.put(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION, Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout)); @@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertEquals(nodeHostPort, replicationConfig.getNodeHostPort()); assertEquals(helperHostPort, replicationConfig.getHelperHosts()); - assertEquals(durability, environment.getConfig().getDurability().toString()); + assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString()); assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); @@ -426,6 +426,78 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertNull("Remote node " + replica.getName() + " is not found", findRemoteNode( master, replica.getName())); } + + public void testSetSynchronizationPolicyAttributesOnVirtualHost() throws Exception + { + int node1PortNumber = findFreePort(); + String helperAddress = "localhost:" + node1PortNumber; + String groupName = "group"; + + Map<String, Object> nodeAttributes = new HashMap<String, Object>(); + nodeAttributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID()); + nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + nodeAttributes.put(BDBHAVirtualHostNode.NAME, "node1"); + nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); + nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); + BDBHAVirtualHostNode<?> node = createHaVHN(nodeAttributes); + + final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1); + node.addChangeListener(new NoopConfigurationChangeListener() + { + @Override + public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) + { + if (child instanceof VirtualHost) + { + child.addChangeListener(this); + virtualHostAddedLatch.countDown(); + } + } + }); + + node.start(); + assertNodeRole(node, "MASTER", "REPLICA"); + assertEquals("Unexpected node state", State.ACTIVE, node.getState()); + + assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); + BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost(); + assertNotNull("Virtual host is not created", virtualHost); + + assertEquals("Unexpected local transaction synchronization policy", "NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy()); + assertEquals("Unexpected remote transaction synchronization policy", "NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy()); + + Map<String, Object> virtualHostAttributes = new HashMap<String,Object>(); + virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "WRITE_NO_SYNC"); + virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC"); + virtualHost.setAttributes(virtualHostAttributes); + + assertEquals("Unexpected local transaction synchronization policy", "WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy()); + assertEquals("Unexpected remote transaction synchronization policy", "SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy()); + + try + { + virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID")); + fail("Invalid syncronization policy is set"); + } + catch(IllegalArgumentException e) + { + //pass + } + + try + { + virtualHost.setAttributes(Collections.<String, Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "INVALID")); + fail("Invalid syncronization policy is set"); + } + catch(IllegalArgumentException e) + { + //pass + } + + } + private BDBHARemoteReplicationNode<?> findRemoteNode(BDBHAVirtualHostNode<?> node, String name) { for (RemoteReplicationNode<?> remoteNode : node.getRemoteReplicationNodes()) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 7f6083a0c9..3660566c8c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -41,7 +41,9 @@ import org.apache.qpid.util.FileUtils; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.SyncPolicy; import com.sleepycat.je.Environment; +import com.sleepycat.je.Transaction; import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment; @@ -183,7 +185,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testGetDurability() throws Exception { - assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability()); + assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability().toString()); } public void testIsCoalescingSync() throws Exception @@ -196,7 +198,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState()); } - public void testPriority() throws Exception { ReplicatedEnvironmentFacade facade = createMaster(); @@ -685,6 +686,46 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); } + public void testSetLocalTransactionSyncronizationPolicy() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected local transaction synchronization policy before change", + ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getLocalTransactionSyncronizationPolicy()); + facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + assertEquals("Unexpected local transaction synchronization policy after change", + SyncPolicy.WRITE_NO_SYNC, facade.getLocalTransactionSyncronizationPolicy()); + } + + public void testSetRemoteTransactionSyncronizationPolicy() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected remote transaction synchronization policy before change", + ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, facade.getRemoteTransactionSyncronizationPolicy()); + facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC); + assertEquals("Unexpected remote transaction synchronization policy after change", + SyncPolicy.WRITE_NO_SYNC, facade.getRemoteTransactionSyncronizationPolicy()); + } + + public void testBeginTransaction() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + Transaction txn = null; + try + { + txn = facade.beginTransaction(); + assertNotNull("Transaction is not created", txn); + txn.commit(); + txn = null; + } + finally + { + if (txn != null) + { + txn.abort(); + } + } + } + private ReplicatedEnvironmentFacade createMaster() throws Exception { return createMaster(new NoopReplicationGroupListener()); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java index 7f8f3ad22c..ac6ca45877 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java @@ -302,20 +302,6 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase } } - private Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception - { - List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url); - long limit = System.currentTimeMillis() + 5000; - while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName)))) - { - Thread.sleep(100l); - nodeAttributes = getRestTestHelper().getJsonAsList(url); - } - Map<String, Object> nodeData = nodeAttributes.get(0); - assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName)); - return nodeData; - } - private void assertRemoteNodeData(String name, Map<String, Object> nodeData) { assertEquals("Remote node " + name + " has unexpected name", name, nodeData.get(BDBHAVirtualHostNode.NAME)); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java new file mode 100644 index 0000000000..c120a3d63c --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY; +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.QpidRestTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; + +public class BDBHAVirtualHostRestTest extends QpidRestTestCase +{ + private String _hostName; + private File _storeBaseDir; + private int _nodeHaPort; + private Object _nodeName; + private String _virtualhostUrl; + + @Override + public void setUp() throws Exception + { + setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); + _hostName = "ha"; + _nodeName = "node1"; + _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis()); + _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1); + _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName; + + super.setUp(); + } + + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + if (_storeBaseDir != null) + { + FileUtils.delete(_storeBaseDir, true); + } + } + } + + @Override + protected void customizeConfiguration() throws IOException + { + super.customizeConfiguration(); + TestBrokerConfiguration config = getBrokerConfiguration(); + config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); + config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); + + Map<String, Object> nodeAttributes = new HashMap<String, Object>(); + nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); + nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + _nodeHaPort); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort); + config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes); + } + + public void testSetLocalTransactionSynchronizationPolicy() throws Exception + { + assertSetTransactionSynchronizationPolicy(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY); + } + + public void testSetRemoteTransactionSynchronizationPolicy() throws Exception + { + assertSetTransactionSynchronizationPolicy(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY); + } + + private void assertSetTransactionSynchronizationPolicy(String policyAttribute) throws Exception, IOException + { + Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); + assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(policyAttribute)); + + Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(policyAttribute, "SYNC"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 200); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); + assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(policyAttribute)); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java index 9322f79984..3eba4fad21 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java @@ -22,6 +22,7 @@ package org.apache.qpid.systest.rest; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.qpid.server.management.plugin.HttpManagement; @@ -102,4 +103,18 @@ public class QpidRestTestCase extends QpidBrokerTestCase { return _restTestHelper; } + + public Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception + { + List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(url); + long limit = System.currentTimeMillis() + 5000; + while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName)))) + { + Thread.sleep(100l); + nodeAttributes = getRestTestHelper().getJsonAsList(url); + } + Map<String, Object> nodeData = nodeAttributes.get(0); + assertEquals("Unexpected attribute " + attributeName, newValue, nodeData.get(attributeName)); + return nodeData; + } } |
