diff options
Diffstat (limited to 'qpid/java')
14 files changed, 237 insertions, 75 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 789d5714c8..8e81e9a7b0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -30,6 +30,8 @@ import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.ExceptionEvent; +import com.sleepycat.je.ExceptionListener; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; @@ -83,6 +85,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore public static final int VERSION = 6; + private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() + {{ + put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); + }}); + private Environment _environment; private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; @@ -168,9 +175,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore _configRecoveryHandler = recoveryHandler; configure(name, storeConfiguration); - - - } public void configureMessageStore(String name, @@ -200,7 +204,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore return new BDBTransaction(); } - /** * Called after instantiation in order to configure the message store. * @@ -233,18 +236,21 @@ public abstract class AbstractBDBMessageStore implements MessageStore _storeLocation = storeLocation; - _envConfigMap = getConfigMap(storeConfig, "envConfig"); + _envConfigMap = getConfigMap(ENVCONFIG_DEFAULTS, storeConfig, "envConfig"); LOGGER.info("Configuring BDB message store"); setupStore(environmentPath, name); } - protected Map<String,String> getConfigMap(Configuration config, String prefix) throws ConfigurationException + protected Map<String,String> getConfigMap(Map<String, String> defaultConfig, Configuration config, String prefix) throws ConfigurationException { final List<Object> argumentNames = config.getList(prefix + ".name"); final List<Object> argumentValues = config.getList(prefix + ".value"); - final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size()); + final int initialSize = argumentNames.size() + defaultConfig.size(); + + final Map<String,String> attributes = new HashMap<String,String>(initialSize); + attributes.putAll(defaultConfig); for (int i = 0; i < argumentNames.size(); i++) { @@ -390,8 +396,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore // Clean the log before closing. This makes sure it doesn't contain // redundant data. Closing without doing this means the cleaner may not // get a chance to finish. - _environment.cleanLog(); - _environment.close(); + try + { + _environment.cleanLog(); + } + finally + { + _environment.close(); + } } } @@ -1757,7 +1769,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore } catch (DatabaseException e) { - throw new RuntimeException(e); + LOGGER.error("Exception during transaction begin, closing store environment.", e); + closeEnvironmentSafely(); + + throw new RuntimeException("Exception during transaction begin, store environment closed.", e); } } @@ -1902,10 +1917,38 @@ public abstract class AbstractBDBMessageStore implements MessageStore EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(true); - envConfig.setConfigParam(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7"); - + setEnvironmentConfigProperties(envConfig); - + + envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); + return envConfig; } + + protected void closeEnvironmentSafely() + { + try + { + _environment.close(); + } + catch (DatabaseException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + catch (IllegalStateException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + } + + + private class LoggingAsyncExceptionListener implements ExceptionListener + { + @Override + public void exceptionThrown(ExceptionEvent event) + { + LOGGER.error("Asynchronous exception thrown by BDB thread '" + + event.getThreadName() + "'", event.getException()); + } + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index 38cee1dd3a..81ef6b285e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -75,6 +76,31 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() + {{ + /** + * Parameter decreased as the 24h default may lead very large log files for most users. + */ + put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h"); + /** + * Parameter increased as the 5 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s"); + /** + * Parameter increased as the 10 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s"); + /** + * Parameter increased as the 10 h default may cause user confusion. + */ + put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); + /** + * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False + * is scheduled to become default after JE 5.0.48. + */ + put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString()); + }}); + private String _groupName; private String _nodeName; private String _nodeHostPort; @@ -89,7 +115,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private CommitThreadWrapper _commitThreadWrapper; private boolean _localMultiSyncCommits; private boolean _autoDesignatedPrimary; - private Map<String, String> _repConfigMap; + private Map<String, String> _repConfig; @Override public void configure(String name, Configuration storeConfig) throws Exception @@ -116,7 +142,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess _localMultiSyncCommits = false; } - _repConfigMap = getConfigMap(storeConfig, "repConfig"); + _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig"); _managedObject = new BDBHAMessageStoreManagerMBean(this); _managedObject.register(); @@ -337,7 +363,18 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { // Using commit() instead of commitNoSync() for the HA store to allow // the HA durability configuration to influence resulting behaviour. - tx.commit(); + try + { + tx.commit(); + } + catch (DatabaseException de) + { + LOGGER.error("Got DatabaseException on commit, closing environment", de); + + closeEnvironmentSafely(); + + throw de; + } if(_localMultiSyncCommits) { @@ -401,7 +438,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private void setReplicationConfigProperties(ReplicationConfig replicationConfig) { - for (Map.Entry<String, String> configItem : _repConfigMap.entrySet()) + for (Map.Entry<String, String> configItem : _repConfig.entrySet()) { if (LOGGER.isDebugEnabled()) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java index cd4e990607..731b7144f9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java @@ -83,45 +83,69 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } @Override - public String getGroupName() throws IOException + public String getGroupName() { return _store.getGroupName(); } @Override - public String getNodeName() throws IOException + public String getNodeName() { return _store.getNodeName(); } @Override - public String getNodeHostPort() throws IOException + public String getNodeHostPort() { return _store.getNodeHostPort(); } @Override - public String getHelperHostPort() throws IOException + public String getHelperHostPort() { return _store.getHelperHostPort(); } @Override - public String getReplicationPolicy() throws IOException + public String getReplicationPolicy() throws IOException, JMException { - return _store.getReplicationPolicy(); + try + { + return _store.getReplicationPolicy(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query replication policy", e); + throw new JMException(e.getMessage()); + } } @Override - public String getNodeState() throws IOException + public String getNodeState() throws IOException, JMException { - return _store.getNodeState(); + try + { + return _store.getNodeState(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query node state", e); + throw new JMException(e.getMessage()); + } } @Override - public boolean getDesignatedPrimary() throws IOException + public boolean getDesignatedPrimary() throws IOException, JMException { - return _store.isDesignatedPrimary(); + try + { + return _store.isDesignatedPrimary(); + } + catch (RuntimeException e) + { + LOGGER.debug("Failed query designated primary", e); + throw new JMException(e.getMessage()); + } } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 7c29e281d9..d5bf5374bc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -88,7 +88,18 @@ public class BDBMessageStore extends AbstractBDBMessageStore @Override protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException { - tx.commitNoSync(); + try + { + tx.commitNoSync(); + } + catch(DatabaseException de) + { + LOGGER.error("Got DatabaseException on commit, closing environment", de); + + closeEnvironmentSafely(); + + throw de; + } return _commitThreadWrapper.commit(tx, syncCommit); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java index 5b0abbec93..fe1556b5a6 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; @@ -110,7 +130,6 @@ public class CommitThreadWrapper } catch (InterruptedException e) { - //TODO Should we ignore, or throw a 'StoreException'? throw new RuntimeException(e); } } @@ -127,6 +146,8 @@ public class CommitThreadWrapper */ private static class CommitThread extends Thread { + private static final Logger LOGGER = Logger.getLogger(CommitThread.class); + private final AtomicBoolean _stopped = new AtomicBoolean(false); private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); private final CheckpointConfig _config = new CheckpointConfig(); @@ -188,13 +209,30 @@ public class CommitThreadWrapper } catch (DatabaseException e) { - for(int i = 0; i < size; i++) + try { - BDBCommitFuture commit = _jobQueue.poll(); - commit.abort(e); + LOGGER.error("Exception during environment log flush", e); + + for(int i = 0; i < size; i++) + { + BDBCommitFuture commit = _jobQueue.poll(); + commit.abort(e); + } } - } + finally + { + LOGGER.error("Closing store environment", e); + try + { + _environment.close(); + } + catch (DatabaseException ex) + { + LOGGER.error("Exception closing store environment", ex); + } + } + } } private boolean hasJobs() diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java index bfc7bbf128..5e45335dad 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java @@ -25,6 +25,7 @@ import javax.management.JMException; import javax.management.openmbean.TabularData; import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter; import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; public interface ManagedBDBHAMessageStore @@ -40,36 +41,38 @@ public interface ManagedBDBHAMessageStore public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") - String getGroupName() throws IOException; + String getGroupName() throws IOException, JMException; @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group") - String getNodeName() throws IOException; + String getNodeName() throws IOException, JMException; @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group") - String getNodeHostPort() throws IOException; + String getNodeHostPort() throws IOException, JMException; @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node") - String getNodeState() throws IOException; + String getNodeState() throws IOException, JMException; @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") - String getHelperHostPort() throws IOException; + String getHelperHostPort() throws IOException, JMException; @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy") - String getReplicationPolicy() throws IOException; + String getReplicationPolicy() throws IOException, JMException; @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") - boolean getDesignatedPrimary() throws IOException; + boolean getDesignatedPrimary() throws IOException, JMException; @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") TabularData getAllNodesInGroup() throws IOException, JMException; @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") - void removeNodeFromGroup(String nodeName) throws JMException; + void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName) throws JMException; @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.") - void setDesignatedPrimary(boolean primary) throws JMException; + void setDesignatedPrimary(@MBeanOperationParameter(name="primary", description="designated primary")boolean primary) throws JMException; @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.") - void updateAddress(String nodeName, String newHostName, int newPort) throws JMException; + void updateAddress(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName, + @MBeanOperationParameter(name="newHostName", description="new hostname")String newHostName, + @MBeanOperationParameter(name="newPort", description="new port number")int newPort) throws JMException; } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java index 6c5c51a355..c6a9ba8f8b 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -32,6 +32,8 @@ import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import com.sleepycat.je.rep.ReplicationConfig; + /** * The HA black box tests test the BDB cluster as a opaque unit. Client connects to * the cluster via a failover url diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index 1afa45fd5a..b01f169715 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -64,9 +64,6 @@ public class HAClusterManagementTest extends QpidBrokerTestCase protected void setUp() throws Exception { _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); _jmxUtils.setUp(); _clusterCreator.configureClusterNodes(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java index 2fdba5a7cf..4507ccc282 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -83,13 +83,13 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase { setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - String vhostPrefix = "virtualhosts.virtualhost." + VIRTUAL_HOST; + String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix(); - setConfigurationProperty(vhostPrefix + ".store.repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); - setConfigurationProperty(vhostPrefix + ".store.repConfig(0).value", "2 s"); + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s"); - setConfigurationProperty(vhostPrefix + ".store.repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); - setConfigurationProperty(vhostPrefix + ".store.repConfig(1).value", "0"); + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); _clusterCreator.configureClusterNodes(); _clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java index 12e281cd65..88f79e8823 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -35,6 +35,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.url.URLSyntaxException; +import com.sleepycat.je.rep.ReplicationConfig; + /** * The HA white box tests test the BDB cluster where the test retains the knowledge of the * individual test nodes. It uses this knowledge to examine the nodes to ensure that they diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index a47597942b..97f69a3f83 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -53,8 +53,8 @@ public class HATestClusterCreator private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - private static final int FAILOVER_CYCLECOUNT = 2; - private static final int FAILOVER_RETRIES = 2; + private static final int FAILOVER_CYCLECOUNT = 5; + private static final int FAILOVER_RETRIES = 1; private static final int FAILOVER_CONNECTDELAY = 1000; private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; @@ -67,7 +67,7 @@ public class HATestClusterCreator private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>(); private final String _virtualHostName; - private final String _configKeyPrefix; + private final String _storeConfigKeyPrefix; private final String _ipAddressOfBroker; private final String _groupName ; @@ -82,7 +82,7 @@ public class HATestClusterCreator _groupName = "group" + _testcase.getName(); _ipAddressOfBroker = getIpAddressOfBrokerHost(); _numberOfNodes = numberOfNodes; - _configKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; + _storeConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; _bdbHelperPort = 0; } @@ -127,7 +127,7 @@ public class HATestClusterCreator */ private String getConfigKey(String configKeySuffix) { - final String configKey = StringUtils.substringAfter(_configKeyPrefix + configKeySuffix, "virtualhosts."); + final String configKey = StringUtils.substringAfter(_storeConfigKeyPrefix + configKeySuffix, "virtualhosts."); return configKey; } @@ -348,12 +348,12 @@ public class HATestClusterCreator { final String nodeName = getNodeNameForNodeAt(bdbPort); - _testcase.setConfigurationProperty(_configKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); - _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.groupName", _groupName); - _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName); - _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); - _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.groupName", _groupName); + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeName", nodeName); + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); + _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); } public String getIpAddressOfBrokerHost() @@ -413,4 +413,11 @@ public class HATestClusterCreator virtualHostConfig.setProperty(configKey, newBdbHostPort); collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); } + + public String getStoreConfigKeyPrefix() + { + return _storeConfigKeyPrefix; + } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 09dc5a2473..a539743081 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -22,11 +22,9 @@ package org.apache.qpid.server.connection; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.transport.TransportException; import java.util.ArrayList; import java.util.Collection; @@ -65,19 +63,15 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable } } - public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message) + private void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message) { try { connection.close(cause, message); } - catch (TransportException e) + catch (Exception e) { - _logger.warn("Error closing connection:" + e.getMessage()); - } - catch (AMQException e) - { - _logger.warn("Error closing connection:" + e.getMessage()); + _logger.warn("Exception closing connection", e); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 76d97e3ad1..82adcf4dde 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -37,8 +37,6 @@ public interface IConnectionRegistry public void close(String replyText) throws AMQException; - public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message); - public List<AMQConnectionModel> getConnections(); public void registerConnection(AMQConnectionModel connnection); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index c0a8f633fd..e12c6fa271 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -875,12 +875,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr markChannelAwaitingCloseOk(channelId); closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); } finally { - closeProtocolSession(); + try + { + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + } + finally + { + closeProtocolSession(); + } } |
