summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java69
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java45
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java44
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java13
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java48
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java23
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java3
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java10
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java12
14 files changed, 237 insertions, 75 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 789d5714c8..8e81e9a7b0 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -30,6 +30,8 @@ import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
@@ -83,6 +85,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public static final int VERSION = 6;
+ private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
+ }});
+
private Environment _environment;
private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
@@ -168,9 +175,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_configRecoveryHandler = recoveryHandler;
configure(name, storeConfiguration);
-
-
-
}
public void configureMessageStore(String name,
@@ -200,7 +204,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
return new BDBTransaction();
}
-
/**
* Called after instantiation in order to configure the message store.
*
@@ -233,18 +236,21 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_storeLocation = storeLocation;
- _envConfigMap = getConfigMap(storeConfig, "envConfig");
+ _envConfigMap = getConfigMap(ENVCONFIG_DEFAULTS, storeConfig, "envConfig");
LOGGER.info("Configuring BDB message store");
setupStore(environmentPath, name);
}
- protected Map<String,String> getConfigMap(Configuration config, String prefix) throws ConfigurationException
+ protected Map<String,String> getConfigMap(Map<String, String> defaultConfig, Configuration config, String prefix) throws ConfigurationException
{
final List<Object> argumentNames = config.getList(prefix + ".name");
final List<Object> argumentValues = config.getList(prefix + ".value");
- final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
+ final int initialSize = argumentNames.size() + defaultConfig.size();
+
+ final Map<String,String> attributes = new HashMap<String,String>(initialSize);
+ attributes.putAll(defaultConfig);
for (int i = 0; i < argumentNames.size(); i++)
{
@@ -390,8 +396,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
// Clean the log before closing. This makes sure it doesn't contain
// redundant data. Closing without doing this means the cleaner may not
// get a chance to finish.
- _environment.cleanLog();
- _environment.close();
+ try
+ {
+ _environment.cleanLog();
+ }
+ finally
+ {
+ _environment.close();
+ }
}
}
@@ -1757,7 +1769,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
catch (DatabaseException e)
{
- throw new RuntimeException(e);
+ LOGGER.error("Exception during transaction begin, closing store environment.", e);
+ closeEnvironmentSafely();
+
+ throw new RuntimeException("Exception during transaction begin, store environment closed.", e);
}
}
@@ -1902,10 +1917,38 @@ public abstract class AbstractBDBMessageStore implements MessageStore
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
- envConfig.setConfigParam(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
-
+
setEnvironmentConfigProperties(envConfig);
-
+
+ envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+
return envConfig;
}
+
+ protected void closeEnvironmentSafely()
+ {
+ try
+ {
+ _environment.close();
+ }
+ catch (DatabaseException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ catch (IllegalStateException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ }
+
+
+ private class LoggingAsyncExceptionListener implements ExceptionListener
+ {
+ @Override
+ public void exceptionThrown(ExceptionEvent event)
+ {
+ LOGGER.error("Asynchronous exception thrown by BDB thread '"
+ + event.getThreadName() + "'", event.getException());
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index 38cee1dd3a..81ef6b285e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -75,6 +76,31 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+ private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ /**
+ * Parameter decreased as the 24h default may lead very large log files for most users.
+ */
+ put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
+ /**
+ * Parameter increased as the 5 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
+ /**
+ * Parameter increased as the 10 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
+ /**
+ * Parameter increased as the 10 h default may cause user confusion.
+ */
+ put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
+ /**
+ * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False
+ * is scheduled to become default after JE 5.0.48.
+ */
+ put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
+ }});
+
private String _groupName;
private String _nodeName;
private String _nodeHostPort;
@@ -89,7 +115,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private CommitThreadWrapper _commitThreadWrapper;
private boolean _localMultiSyncCommits;
private boolean _autoDesignatedPrimary;
- private Map<String, String> _repConfigMap;
+ private Map<String, String> _repConfig;
@Override
public void configure(String name, Configuration storeConfig) throws Exception
@@ -116,7 +142,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
_localMultiSyncCommits = false;
}
- _repConfigMap = getConfigMap(storeConfig, "repConfig");
+ _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig");
_managedObject = new BDBHAMessageStoreManagerMBean(this);
_managedObject.register();
@@ -337,7 +363,18 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
// Using commit() instead of commitNoSync() for the HA store to allow
// the HA durability configuration to influence resulting behaviour.
- tx.commit();
+ try
+ {
+ tx.commit();
+ }
+ catch (DatabaseException de)
+ {
+ LOGGER.error("Got DatabaseException on commit, closing environment", de);
+
+ closeEnvironmentSafely();
+
+ throw de;
+ }
if(_localMultiSyncCommits)
{
@@ -401,7 +438,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
{
- for (Map.Entry<String, String> configItem : _repConfigMap.entrySet())
+ for (Map.Entry<String, String> configItem : _repConfig.entrySet())
{
if (LOGGER.isDebugEnabled())
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
index cd4e990607..731b7144f9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
@@ -83,45 +83,69 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
@Override
- public String getGroupName() throws IOException
+ public String getGroupName()
{
return _store.getGroupName();
}
@Override
- public String getNodeName() throws IOException
+ public String getNodeName()
{
return _store.getNodeName();
}
@Override
- public String getNodeHostPort() throws IOException
+ public String getNodeHostPort()
{
return _store.getNodeHostPort();
}
@Override
- public String getHelperHostPort() throws IOException
+ public String getHelperHostPort()
{
return _store.getHelperHostPort();
}
@Override
- public String getReplicationPolicy() throws IOException
+ public String getReplicationPolicy() throws IOException, JMException
{
- return _store.getReplicationPolicy();
+ try
+ {
+ return _store.getReplicationPolicy();
+ }
+ catch (RuntimeException e)
+ {
+ LOGGER.debug("Failed query replication policy", e);
+ throw new JMException(e.getMessage());
+ }
}
@Override
- public String getNodeState() throws IOException
+ public String getNodeState() throws IOException, JMException
{
- return _store.getNodeState();
+ try
+ {
+ return _store.getNodeState();
+ }
+ catch (RuntimeException e)
+ {
+ LOGGER.debug("Failed query node state", e);
+ throw new JMException(e.getMessage());
+ }
}
@Override
- public boolean getDesignatedPrimary() throws IOException
+ public boolean getDesignatedPrimary() throws IOException, JMException
{
- return _store.isDesignatedPrimary();
+ try
+ {
+ return _store.isDesignatedPrimary();
+ }
+ catch (RuntimeException e)
+ {
+ LOGGER.debug("Failed query designated primary", e);
+ throw new JMException(e.getMessage());
+ }
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 7c29e281d9..d5bf5374bc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -88,7 +88,18 @@ public class BDBMessageStore extends AbstractBDBMessageStore
@Override
protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
{
- tx.commitNoSync();
+ try
+ {
+ tx.commitNoSync();
+ }
+ catch(DatabaseException de)
+ {
+ LOGGER.error("Got DatabaseException on commit, closing environment", de);
+
+ closeEnvironmentSafely();
+
+ throw de;
+ }
return _commitThreadWrapper.commit(tx, syncCommit);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
index 5b0abbec93..fe1556b5a6 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
@@ -110,7 +130,6 @@ public class CommitThreadWrapper
}
catch (InterruptedException e)
{
- //TODO Should we ignore, or throw a 'StoreException'?
throw new RuntimeException(e);
}
}
@@ -127,6 +146,8 @@ public class CommitThreadWrapper
*/
private static class CommitThread extends Thread
{
+ private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
+
private final AtomicBoolean _stopped = new AtomicBoolean(false);
private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
private final CheckpointConfig _config = new CheckpointConfig();
@@ -188,13 +209,30 @@ public class CommitThreadWrapper
}
catch (DatabaseException e)
{
- for(int i = 0; i < size; i++)
+ try
{
- BDBCommitFuture commit = _jobQueue.poll();
- commit.abort(e);
+ LOGGER.error("Exception during environment log flush", e);
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.abort(e);
+ }
}
- }
+ finally
+ {
+ LOGGER.error("Closing store environment", e);
+ try
+ {
+ _environment.close();
+ }
+ catch (DatabaseException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ }
+ }
}
private boolean hasJobs()
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
index bfc7bbf128..5e45335dad 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
@@ -25,6 +25,7 @@ import javax.management.JMException;
import javax.management.openmbean.TabularData;
import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
public interface ManagedBDBHAMessageStore
@@ -40,36 +41,38 @@ public interface ManagedBDBHAMessageStore
public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
@MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
- String getGroupName() throws IOException;
+ String getGroupName() throws IOException, JMException;
@MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group")
- String getNodeName() throws IOException;
+ String getNodeName() throws IOException, JMException;
@MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group")
- String getNodeHostPort() throws IOException;
+ String getNodeHostPort() throws IOException, JMException;
@MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node")
- String getNodeState() throws IOException;
+ String getNodeState() throws IOException, JMException;
@MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members")
- String getHelperHostPort() throws IOException;
+ String getHelperHostPort() throws IOException, JMException;
@MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy")
- String getReplicationPolicy() throws IOException;
+ String getReplicationPolicy() throws IOException, JMException;
@MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.")
- boolean getDesignatedPrimary() throws IOException;
+ boolean getDesignatedPrimary() throws IOException, JMException;
@MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
TabularData getAllNodesInGroup() throws IOException, JMException;
@MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group")
- void removeNodeFromGroup(String nodeName) throws JMException;
+ void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName) throws JMException;
@MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.")
- void setDesignatedPrimary(boolean primary) throws JMException;
+ void setDesignatedPrimary(@MBeanOperationParameter(name="primary", description="designated primary")boolean primary) throws JMException;
@MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.")
- void updateAddress(String nodeName, String newHostName, int newPort) throws JMException;
+ void updateAddress(@MBeanOperationParameter(name="nodeName", description="name of node")String nodeName,
+ @MBeanOperationParameter(name="newHostName", description="new hostname")String newHostName,
+ @MBeanOperationParameter(name="newPort", description="new port number")int newPort) throws JMException;
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
index 6c5c51a355..c6a9ba8f8b 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -32,6 +32,8 @@ import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import com.sleepycat.je.rep.ReplicationConfig;
+
/**
* The HA black box tests test the BDB cluster as a opaque unit. Client connects to
* the cluster via a failover url
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index 1afa45fd5a..b01f169715 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -64,9 +64,6 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
protected void setUp() throws Exception
{
_brokerType = BrokerType.SPAWNED;
-
- assertTrue(isJavaBroker());
- assertTrue(isBrokerStorePersistent());
_jmxUtils.setUp();
_clusterCreator.configureClusterNodes();
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
index 2fdba5a7cf..4507ccc282 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -83,13 +83,13 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
{
setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
- String vhostPrefix = "virtualhosts.virtualhost." + VIRTUAL_HOST;
+ String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix();
- setConfigurationProperty(vhostPrefix + ".store.repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
- setConfigurationProperty(vhostPrefix + ".store.repConfig(0).value", "2 s");
+ setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
+ setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s");
- setConfigurationProperty(vhostPrefix + ".store.repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
- setConfigurationProperty(vhostPrefix + ".store.repConfig(1).value", "0");
+ setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+ setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0");
_clusterCreator.configureClusterNodes();
_clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary);
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
index 12e281cd65..88f79e8823 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
@@ -35,6 +35,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.url.URLSyntaxException;
+import com.sleepycat.je.rep.ReplicationConfig;
+
/**
* The HA white box tests test the BDB cluster where the test retains the knowledge of the
* individual test nodes. It uses this knowledge to examine the nodes to ensure that they
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index a47597942b..97f69a3f83 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -53,8 +53,8 @@ public class HATestClusterCreator
private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
- private static final int FAILOVER_CYCLECOUNT = 2;
- private static final int FAILOVER_RETRIES = 2;
+ private static final int FAILOVER_CYCLECOUNT = 5;
+ private static final int FAILOVER_RETRIES = 1;
private static final int FAILOVER_CONNECTDELAY = 1000;
private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
@@ -67,7 +67,7 @@ public class HATestClusterCreator
private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>();
private final String _virtualHostName;
- private final String _configKeyPrefix;
+ private final String _storeConfigKeyPrefix;
private final String _ipAddressOfBroker;
private final String _groupName ;
@@ -82,7 +82,7 @@ public class HATestClusterCreator
_groupName = "group" + _testcase.getName();
_ipAddressOfBroker = getIpAddressOfBrokerHost();
_numberOfNodes = numberOfNodes;
- _configKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
+ _storeConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
_bdbHelperPort = 0;
}
@@ -127,7 +127,7 @@ public class HATestClusterCreator
*/
private String getConfigKey(String configKeySuffix)
{
- final String configKey = StringUtils.substringAfter(_configKeyPrefix + configKeySuffix, "virtualhosts.");
+ final String configKey = StringUtils.substringAfter(_storeConfigKeyPrefix + configKeySuffix, "virtualhosts.");
return configKey;
}
@@ -348,12 +348,12 @@ public class HATestClusterCreator
{
final String nodeName = getNodeNameForNodeAt(bdbPort);
- _testcase.setConfigurationProperty(_configKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
- _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.groupName", _groupName);
- _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName);
- _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
- _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.groupName", _groupName);
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeName", nodeName);
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
+ _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
}
public String getIpAddressOfBrokerHost()
@@ -413,4 +413,11 @@ public class HATestClusterCreator
virtualHostConfig.setProperty(configKey, newBdbHostPort);
collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
}
+
+ public String getStoreConfigKeyPrefix()
+ {
+ return _storeConfigKeyPrefix;
+ }
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 09dc5a2473..a539743081 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -22,11 +22,9 @@ package org.apache.qpid.server.connection;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.transport.TransportException;
import java.util.ArrayList;
import java.util.Collection;
@@ -65,19 +63,15 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
}
}
- public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+ private void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
{
try
{
connection.close(cause, message);
}
- catch (TransportException e)
+ catch (Exception e)
{
- _logger.warn("Error closing connection:" + e.getMessage());
- }
- catch (AMQException e)
- {
- _logger.warn("Error closing connection:" + e.getMessage());
+ _logger.warn("Exception closing connection", e);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index 76d97e3ad1..82adcf4dde 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -37,8 +37,6 @@ public interface IConnectionRegistry
public void close(String replyText) throws AMQException;
- public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-
public List<AMQConnectionModel> getConnections();
public void registerConnection(AMQConnectionModel connnection);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index c0a8f633fd..e12c6fa271 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -875,12 +875,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
markChannelAwaitingCloseOk(channelId);
closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
}
finally
{
- closeProtocolSession();
+ try
+ {
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+ }
+ finally
+ {
+ closeProtocolSession();
+ }
}