diff options
7 files changed, 175 insertions, 173 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index f887c8ce36..0f1349527c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -67,7 +68,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { private static final String MUTLI_SYNC = "MUTLI_SYNC"; private static final String DEFAULT_REPLICATION_POLICY = - MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); + MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); @@ -107,7 +108,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess if(_replicationPolicy.startsWith(MUTLI_SYNC)) { - _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name())); + _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name())); _localMultiSyncCommits = true; } else @@ -388,11 +389,11 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess activateStoreAsync(); break; case REPLICA: - passivateStore(); + passivateStoreAsync(); break; case DETACHED: LOGGER.error("BDB replicated node in detached state, therefore passivating."); - passivateStore(); + passivateStoreAsync(); break; case UNKNOWN: LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); @@ -403,20 +404,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess } } - /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */ - private void passivateStore() - { - try - { - passivate(); - } - catch(Exception e) - { - LOGGER.error("Unable to passivate", e); - throw new RuntimeException("Unable to passivate", e); - } - } - /** * Calls {@link MessageStore#activate()}. * @@ -429,34 +416,93 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess */ private void activateStoreAsync() { + String threadName = "BDBHANodeActivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + activate(); + } + catch (Exception e) + { + LOGGER.error("Failed to activate on hearing MASTER change event",e); + throw e; + } + return null; + } + }, threadName); + } + + /** + * Calls {@link #passivate()}. + * + * <p/> + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * passivation due to the effect of state change listeners. + */ + private void passivateStoreAsync() + { + String threadName = "BDBHANodePassivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + + @Override + public Void call() throws Exception + { + try + { + passivate(); + } + catch (Exception e) + { + LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e); + throw e; + } + return null; + } + }, threadName); + } + + private void executeStateChangeAsync(final Callable<Void> callable, final String threadName) + { final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger(); _executor.execute(new Runnable() { - private static final String _THREAD_NAME = "BDBHANodeActivationThread"; @Override public void run() { - Thread.currentThread().setName(_THREAD_NAME); - CurrentActor.set(new AbstractActor(_rootLogger) + final String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(threadName); + try { - @Override - public String getLogMessage() + CurrentActor.set(new AbstractActor(_rootLogger) + { + @Override + public String getLogMessage() + { + return threadName; + } + }); + + try { - return _THREAD_NAME; + callable.call(); + } + catch (Exception e) + { + LOGGER.error("Exception during state change", e); } - }); - - try - { - activate(); } - catch (Exception e) + finally { - LOGGER.error("Failed to activate on hearing MASTER change event",e); + Thread.currentThread().setName(originalThreadName); } - } }); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java index 900f528b76..6c5c51a355 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -76,7 +76,7 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase // Don't start default broker provided by QBTC. } - public void testLossOfActiveNodeCausesClientToFailover() throws Exception + public void testLossOfMasterNodeCausesClientToFailover() throws Exception { final Connection connection = getConnection(_brokerFailoverUrl); @@ -93,10 +93,11 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws Exception + public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception { LOGGER.info("Connecting to " + _brokerFailoverUrl); final Connection connection = getConnection(_brokerFailoverUrl); + LOGGER.info("Got connection to cluster"); ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java index f694de61f2..12e281cd65 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.io.IOException; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; @@ -53,8 +51,6 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase private final int NUMBER_OF_NODES = 3; private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - // TODO Factory refactoring?? // MessageStore construction?? - @Override protected void setUp() throws Exception { @@ -86,7 +82,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase { try { - getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); connectionSuccesses++; } catch(JMSException e) @@ -105,7 +101,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase final Connection initialConnection = getConnectionToNodeInCluster(); assertNotNull(initialConnection); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); assertNotNull(subsequentConnection); @@ -121,7 +117,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase final Connection initialConnection = getConnectionToNodeInCluster(); assertNotNull(initialConnection); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); assertNotNull(subsequentConnection); @@ -159,7 +155,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); @@ -168,7 +164,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); } - public void testRecoveryOfOutOfDateNode() throws Exception + public void xtestRecoveryOfOutOfDateNode() throws Exception { /* * TODO: Implement @@ -220,7 +216,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase { try { - connection = getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); break; } catch(JMSException je) @@ -231,26 +227,11 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase return connection; } - private void killConnectionBrokerAndWaitForNewMasterElection(final Connection initialConnection) throws IOException, + private void killConnectionBroker(final Connection initialConnection) throws IOException, InterruptedException { - try - { - // NewMasterEvent is received twice: first for the existing master, - // second for a new master - CountDownLatch newMasterLatch = new CountDownLatch(2); - _clusterCreator.startMonitorNode(); - _clusterCreator.statListeningForNewMasterEvent(newMasterLatch); - - final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - killBroker(initialPortNumber); - - assertTrue("New master was not elected", newMasterLatch.await(30, TimeUnit.SECONDS)); - } - finally - { - _clusterCreator.shutdownMonitor(); - } + final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + killBroker(initialPortNumber); // kill awaits the death of the child } private void assertProducingConsuming(final Connection connection) throws JMSException, Exception diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 43cfa5f4d5..a47597942b 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -19,7 +19,6 @@ */ package org.apache.qpid.server.store.berkeleydb; -import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; @@ -32,7 +31,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,26 +46,22 @@ import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.url.URLSyntaxException; -import com.sleepycat.je.rep.ReplicationNode; -import com.sleepycat.je.rep.monitor.GroupChangeEvent; -import com.sleepycat.je.rep.monitor.JoinGroupEvent; -import com.sleepycat.je.rep.monitor.LeaveGroupEvent; -import com.sleepycat.je.rep.monitor.Monitor; -import com.sleepycat.je.rep.monitor.MonitorChangeListener; -import com.sleepycat.je.rep.monitor.MonitorConfig; -import com.sleepycat.je.rep.monitor.NewMasterEvent; - public class HATestClusterCreator { protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - private static final String SINGLE_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; - private static final int CYCLECOUNT = 2; - private static final int RETRIES = 2; - private static final int CONNECTDELAY = 1000; + private static final int FAILOVER_CYCLECOUNT = 2; + private static final int FAILOVER_RETRIES = 2; + private static final int FAILOVER_CONNECTDELAY = 1000; + + private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; + + private static final int RETRIES = 30; + private static final int CONNECTDELAY = 75; private final QpidBrokerTestCase _testcase; private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); @@ -80,7 +74,6 @@ public class HATestClusterCreator private final int _numberOfNodes; private int _bdbHelperPort; private int _primaryBrokerPort; - private Monitor _monitor; public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) { @@ -216,7 +209,6 @@ public class HATestClusterCreator public void stopCluster() throws Exception { - shutdownMonitor(); for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) { try @@ -265,19 +257,38 @@ public class HATestClusterCreator { int brokerPortNumber = itr.next(); - brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, CONNECTDELAY, RETRIES)); + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); if (itr.hasNext()) { brokerList.append(";"); } } - return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, CYCLECOUNT)); + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, false); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, true); } - public AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber) throws URLSyntaxException + private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException { - String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + final String url; + if (retryAllowed) + { + url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + } + else + { + url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); + } + return new AMQConnectionURL(url); } @@ -343,7 +354,6 @@ public class HATestClusterCreator _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName); _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); - // TODO replication policy } public String getIpAddressOfBrokerHost() @@ -403,77 +413,4 @@ public class HATestClusterCreator virtualHostConfig.setProperty(configKey, newBdbHostPort); collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); } - - public void startMonitorNode() - { - shutdownMonitor(); - - MonitorConfig config = new MonitorConfig(); - config.setGroupName(_groupName); - int monitorPort = _testcase.findFreePort(); - config.setNodeName(getNodeNameForNodeAt(monitorPort)); - config.setNodeHostPort("" + monitorPort); - config.setHelperHosts(getHelperHostPort()); - - _monitor = new Monitor(config); - - ReplicationNode currentMaster = _monitor.register(); - LOGGER.info("Current master " + currentMaster.getName()); - } - - public void startListening(MonitorChangeListener listener) throws IOException - { - _monitor.startListener(listener); - } - - public void statListeningForNewMasterEvent(final CountDownLatch latch) throws IOException - { - startListening(new MonitorChangeListenerSupport(){ - @Override - public void notify(NewMasterEvent newMasterEvent) - { - LOGGER.debug("New master is elected " + newMasterEvent.getMasterName()); - latch.countDown(); - } - }); - } - - public void shutdownMonitor() - { - if (_monitor != null) - { - try - { - _monitor.shutdown(); - } - catch (Exception e) - { - LOGGER.warn("Monitor shutdown error:", e); - } - } - } - - public static class MonitorChangeListenerSupport implements MonitorChangeListener - { - - @Override - public void notify(NewMasterEvent newMasterEvent) - { - } - - @Override - public void notify(GroupChangeEvent groupChangeEvent) - { - } - - @Override - public void notify(JoinGroupEvent joinGroupEvent) - { - } - - @Override - public void notify(LeaveGroupEvent leaveGroupEvent) - { - } - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties index 3e640c7929..5695026cbc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties @@ -23,4 +23,6 @@ CREATED = VHT-1001 : Created : {0} CLOSED = VHT-1002 : Closed STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total -STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total`
\ No newline at end of file +STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total + +ERRORED = VHT-1005 : Unexpected fatal error
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java index fb50b3e289..55e2539dcf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java @@ -25,5 +25,7 @@ public enum State INITIALISING, ACTIVE, PASSIVE, - STOPPED + STOPPED, + /** Terminal state that signifies the virtual host has experienced an unexpected condition. */ + ERRORED } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 5a14092930..5a56fe1765 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -122,7 +122,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private final MessageStore _messageStore; - private State _state = State.INITIALISING; + private volatile State _state = State.INITIALISING; private boolean _statisticsEnabled = false; @@ -824,17 +824,25 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr @Override public void event(Event event) { - initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + State finalState = State.ERRORED; try { - _brokerMBean.register(); + initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + try + { + _brokerMBean.register(); + } + catch (JMException e) + { + throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + } + finalState = State.ACTIVE; } - catch (JMException e) + finally { - throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + _state = finalState; + reportIfError(_state); } - - _state = State.ACTIVE; } } @@ -842,16 +850,33 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr { public void event(Event event) { - _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - _brokerMBean.unregister(); - removeHouseKeepingTasks(); + State finalState = State.ERRORED; + + try + { + /* the approach here is not ideal as there is a race condition where a + * queue etc could be created while the virtual host is on the way to + * the passivated state. However the store state change from MASTER to UNKNOWN + * is documented as exceptionally rare.. + */ + + _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + _brokerMBean.unregister(); + removeHouseKeepingTasks(); - _queueRegistry.stopAllAndUnregisterMBeans(); - _exchangeRegistry.clearAndUnregisterMbeans(); - _dtxRegistry.close(); + _queueRegistry.stopAllAndUnregisterMBeans(); + _exchangeRegistry.clearAndUnregisterMbeans(); + _dtxRegistry.close(); - _state = State.PASSIVE; + finalState = State.PASSIVE; + } + finally + { + _state = finalState; + reportIfError(_state); + } } + } private final class BeforeCloseListener implements EventListener @@ -864,6 +889,14 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr } } + private void reportIfError(State state) + { + if (state == State.ERRORED) + { + CurrentActor.get().message(VirtualHostMessages.ERRORED()); + } + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() |
