summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java110
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java5
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java37
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java127
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java61
7 files changed, 175 insertions, 173 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index f887c8ce36..0f1349527c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -67,7 +68,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
private static final String MUTLI_SYNC = "MUTLI_SYNC";
private static final String DEFAULT_REPLICATION_POLICY =
- MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
+ MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
@@ -107,7 +108,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
if(_replicationPolicy.startsWith(MUTLI_SYNC))
{
- _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name()));
+ _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name()));
_localMultiSyncCommits = true;
}
else
@@ -388,11 +389,11 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
activateStoreAsync();
break;
case REPLICA:
- passivateStore();
+ passivateStoreAsync();
break;
case DETACHED:
LOGGER.error("BDB replicated node in detached state, therefore passivating.");
- passivateStore();
+ passivateStoreAsync();
break;
case UNKNOWN:
LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
@@ -403,20 +404,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
}
}
- /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */
- private void passivateStore()
- {
- try
- {
- passivate();
- }
- catch(Exception e)
- {
- LOGGER.error("Unable to passivate", e);
- throw new RuntimeException("Unable to passivate", e);
- }
- }
-
/**
* Calls {@link MessageStore#activate()}.
*
@@ -429,34 +416,93 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
*/
private void activateStoreAsync()
{
+ String threadName = "BDBHANodeActivationThread-" + _name;
+ executeStateChangeAsync(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ activate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to activate on hearing MASTER change event",e);
+ throw e;
+ }
+ return null;
+ }
+ }, threadName);
+ }
+
+ /**
+ * Calls {@link #passivate()}.
+ *
+ * <p/>
+ * This is done a background thread, in line with
+ * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
+ * passivation due to the effect of state change listeners.
+ */
+ private void passivateStoreAsync()
+ {
+ String threadName = "BDBHANodePassivationThread-" + _name;
+ executeStateChangeAsync(new Callable<Void>()
+ {
+
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ passivate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e);
+ throw e;
+ }
+ return null;
+ }
+ }, threadName);
+ }
+
+ private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
+ {
final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
_executor.execute(new Runnable()
{
- private static final String _THREAD_NAME = "BDBHANodeActivationThread";
@Override
public void run()
{
- Thread.currentThread().setName(_THREAD_NAME);
- CurrentActor.set(new AbstractActor(_rootLogger)
+ final String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(threadName);
+ try
{
- @Override
- public String getLogMessage()
+ CurrentActor.set(new AbstractActor(_rootLogger)
+ {
+ @Override
+ public String getLogMessage()
+ {
+ return threadName;
+ }
+ });
+
+ try
{
- return _THREAD_NAME;
+ callable.call();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Exception during state change", e);
}
- });
-
- try
- {
- activate();
}
- catch (Exception e)
+ finally
{
- LOGGER.error("Failed to activate on hearing MASTER change event",e);
+ Thread.currentThread().setName(originalThreadName);
}
-
}
});
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
index 900f528b76..6c5c51a355 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -76,7 +76,7 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase
// Don't start default broker provided by QBTC.
}
- public void testLossOfActiveNodeCausesClientToFailover() throws Exception
+ public void testLossOfMasterNodeCausesClientToFailover() throws Exception
{
final Connection connection = getConnection(_brokerFailoverUrl);
@@ -93,10 +93,11 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
- public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws Exception
+ public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception
{
LOGGER.info("Connecting to " + _brokerFailoverUrl);
final Connection connection = getConnection(_brokerFailoverUrl);
+ LOGGER.info("Got connection to cluster");
((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
index f694de61f2..12e281cd65 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
@@ -22,8 +22,6 @@ package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -53,8 +51,6 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
private final int NUMBER_OF_NODES = 3;
private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
- // TODO Factory refactoring?? // MessageStore construction??
-
@Override
protected void setUp() throws Exception
{
@@ -86,7 +82,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
{
try
{
- getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber));
+ getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber));
connectionSuccesses++;
}
catch(JMSException e)
@@ -105,7 +101,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
final Connection initialConnection = getConnectionToNodeInCluster();
assertNotNull(initialConnection);
- killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+ killConnectionBroker(initialConnection);
final Connection subsequentConnection = getConnectionToNodeInCluster();
assertNotNull(subsequentConnection);
@@ -121,7 +117,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
final Connection initialConnection = getConnectionToNodeInCluster();
assertNotNull(initialConnection);
- killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+ killConnectionBroker(initialConnection);
final Connection subsequentConnection = getConnectionToNodeInCluster();
assertNotNull(subsequentConnection);
@@ -159,7 +155,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
- killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+ killConnectionBroker(initialConnection);
final Connection subsequentConnection = getConnectionToNodeInCluster();
@@ -168,7 +164,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
}
- public void testRecoveryOfOutOfDateNode() throws Exception
+ public void xtestRecoveryOfOutOfDateNode() throws Exception
{
/*
* TODO: Implement
@@ -220,7 +216,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
{
try
{
- connection = getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber));
+ connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber));
break;
}
catch(JMSException je)
@@ -231,26 +227,11 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase
return connection;
}
- private void killConnectionBrokerAndWaitForNewMasterElection(final Connection initialConnection) throws IOException,
+ private void killConnectionBroker(final Connection initialConnection) throws IOException,
InterruptedException
{
- try
- {
- // NewMasterEvent is received twice: first for the existing master,
- // second for a new master
- CountDownLatch newMasterLatch = new CountDownLatch(2);
- _clusterCreator.startMonitorNode();
- _clusterCreator.statListeningForNewMasterEvent(newMasterLatch);
-
- final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
- killBroker(initialPortNumber);
-
- assertTrue("New master was not elected", newMasterLatch.await(30, TimeUnit.SECONDS));
- }
- finally
- {
- _clusterCreator.shutdownMonitor();
- }
+ final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+ killBroker(initialPortNumber); // kill awaits the death of the child
}
private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 43cfa5f4d5..a47597942b 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -19,7 +19,6 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
@@ -32,7 +31,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -48,26 +46,22 @@ import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.url.URLSyntaxException;
-import com.sleepycat.je.rep.ReplicationNode;
-import com.sleepycat.je.rep.monitor.GroupChangeEvent;
-import com.sleepycat.je.rep.monitor.JoinGroupEvent;
-import com.sleepycat.je.rep.monitor.LeaveGroupEvent;
-import com.sleepycat.je.rep.monitor.Monitor;
-import com.sleepycat.je.rep.monitor.MonitorChangeListener;
-import com.sleepycat.je.rep.monitor.MonitorConfig;
-import com.sleepycat.je.rep.monitor.NewMasterEvent;
-
public class HATestClusterCreator
{
protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class);
private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
- private static final String SINGLE_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
- private static final int CYCLECOUNT = 2;
- private static final int RETRIES = 2;
- private static final int CONNECTDELAY = 1000;
+ private static final int FAILOVER_CYCLECOUNT = 2;
+ private static final int FAILOVER_RETRIES = 2;
+ private static final int FAILOVER_CONNECTDELAY = 1000;
+
+ private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
+ private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'";
+
+ private static final int RETRIES = 30;
+ private static final int CONNECTDELAY = 75;
private final QpidBrokerTestCase _testcase;
private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
@@ -80,7 +74,6 @@ public class HATestClusterCreator
private final int _numberOfNodes;
private int _bdbHelperPort;
private int _primaryBrokerPort;
- private Monitor _monitor;
public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
{
@@ -216,7 +209,6 @@ public class HATestClusterCreator
public void stopCluster() throws Exception
{
- shutdownMonitor();
for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
{
try
@@ -265,19 +257,38 @@ public class HATestClusterCreator
{
int brokerPortNumber = itr.next();
- brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, CONNECTDELAY, RETRIES));
+ brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES));
if (itr.hasNext())
{
brokerList.append(";");
}
}
- return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, CYCLECOUNT));
+ return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT));
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException
+ {
+ return getConnectionUrlForSingleNode(brokerPortNumber, false);
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException
+ {
+ return getConnectionUrlForSingleNode(brokerPortNumber, true);
}
- public AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber) throws URLSyntaxException
+ private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException
{
- String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
+ final String url;
+ if (retryAllowed)
+ {
+ url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
+ }
+ else
+ {
+ url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber);
+ }
+
return new AMQConnectionURL(url);
}
@@ -343,7 +354,6 @@ public class HATestClusterCreator
_testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName);
_testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
_testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
- // TODO replication policy
}
public String getIpAddressOfBrokerHost()
@@ -403,77 +413,4 @@ public class HATestClusterCreator
virtualHostConfig.setProperty(configKey, newBdbHostPort);
collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
}
-
- public void startMonitorNode()
- {
- shutdownMonitor();
-
- MonitorConfig config = new MonitorConfig();
- config.setGroupName(_groupName);
- int monitorPort = _testcase.findFreePort();
- config.setNodeName(getNodeNameForNodeAt(monitorPort));
- config.setNodeHostPort("" + monitorPort);
- config.setHelperHosts(getHelperHostPort());
-
- _monitor = new Monitor(config);
-
- ReplicationNode currentMaster = _monitor.register();
- LOGGER.info("Current master " + currentMaster.getName());
- }
-
- public void startListening(MonitorChangeListener listener) throws IOException
- {
- _monitor.startListener(listener);
- }
-
- public void statListeningForNewMasterEvent(final CountDownLatch latch) throws IOException
- {
- startListening(new MonitorChangeListenerSupport(){
- @Override
- public void notify(NewMasterEvent newMasterEvent)
- {
- LOGGER.debug("New master is elected " + newMasterEvent.getMasterName());
- latch.countDown();
- }
- });
- }
-
- public void shutdownMonitor()
- {
- if (_monitor != null)
- {
- try
- {
- _monitor.shutdown();
- }
- catch (Exception e)
- {
- LOGGER.warn("Monitor shutdown error:", e);
- }
- }
- }
-
- public static class MonitorChangeListenerSupport implements MonitorChangeListener
- {
-
- @Override
- public void notify(NewMasterEvent newMasterEvent)
- {
- }
-
- @Override
- public void notify(GroupChangeEvent groupChangeEvent)
- {
- }
-
- @Override
- public void notify(JoinGroupEvent joinGroupEvent)
- {
- }
-
- @Override
- public void notify(LeaveGroupEvent leaveGroupEvent)
- {
- }
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
index 3e640c7929..5695026cbc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
@@ -23,4 +23,6 @@ CREATED = VHT-1001 : Created : {0}
CLOSED = VHT-1002 : Closed
STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total
-STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total` \ No newline at end of file
+STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total
+
+ERRORED = VHT-1005 : Unexpected fatal error \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
index fb50b3e289..55e2539dcf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
@@ -25,5 +25,7 @@ public enum State
INITIALISING,
ACTIVE,
PASSIVE,
- STOPPED
+ STOPPED,
+ /** Terminal state that signifies the virtual host has experienced an unexpected condition. */
+ ERRORED
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 5a14092930..5a56fe1765 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -122,7 +122,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
private final MessageStore _messageStore;
- private State _state = State.INITIALISING;
+ private volatile State _state = State.INITIALISING;
private boolean _statisticsEnabled = false;
@@ -824,17 +824,25 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
@Override
public void event(Event event)
{
- initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ State finalState = State.ERRORED;
try
{
- _brokerMBean.register();
+ initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ try
+ {
+ _brokerMBean.register();
+ }
+ catch (JMException e)
+ {
+ throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+ }
+ finalState = State.ACTIVE;
}
- catch (JMException e)
+ finally
{
- throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+ _state = finalState;
+ reportIfError(_state);
}
-
- _state = State.ACTIVE;
}
}
@@ -842,16 +850,33 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
{
public void event(Event event)
{
- _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
- _brokerMBean.unregister();
- removeHouseKeepingTasks();
+ State finalState = State.ERRORED;
+
+ try
+ {
+ /* the approach here is not ideal as there is a race condition where a
+ * queue etc could be created while the virtual host is on the way to
+ * the passivated state. However the store state change from MASTER to UNKNOWN
+ * is documented as exceptionally rare..
+ */
+
+ _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+ _brokerMBean.unregister();
+ removeHouseKeepingTasks();
- _queueRegistry.stopAllAndUnregisterMBeans();
- _exchangeRegistry.clearAndUnregisterMbeans();
- _dtxRegistry.close();
+ _queueRegistry.stopAllAndUnregisterMBeans();
+ _exchangeRegistry.clearAndUnregisterMbeans();
+ _dtxRegistry.close();
- _state = State.PASSIVE;
+ finalState = State.PASSIVE;
+ }
+ finally
+ {
+ _state = finalState;
+ reportIfError(_state);
+ }
}
+
}
private final class BeforeCloseListener implements EventListener
@@ -864,6 +889,14 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
}
}
+ private void reportIfError(State state)
+ {
+ if (state == State.ERRORED)
+ {
+ CurrentActor.get().message(VirtualHostMessages.ERRORED());
+ }
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()