From e80a585caa90715069cd66e19ce40f0859086149 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 16 Aug 2013 10:22:07 +0000 Subject: QPID-5073 : Add dependency on alternate exchange for queues where such an alternate is set git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1514639 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/virtualhost/QueueRecoverer.java | 88 ++++++++++----- .../DurableConfigurationRecovererTest.java | 118 ++++++++++++++++++--- 2 files changed, 167 insertions(+), 39 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index 4e06cf3202..7929cd3e39 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.model.Queue; @@ -62,21 +64,48 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer { + private final Map _attributes; + private final UUID _alternateExchangeId; + private final UUID _id; private AMQQueue _queue; + private List _dependencies = new ArrayList(); + private Exchange _alternateExchange; public UnresolvedQueue(final UUID id, final String type, - final Map attributeMap) + final Map attributes) { - String queueName = (String) attributeMap.get(Queue.NAME); - String owner = (String) attributeMap.get(Queue.OWNER); - boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); - UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE)); + _attributes = attributes; + _alternateExchangeId = _attributes.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String) _attributes + .get(Queue.ALTERNATE_EXCHANGE)); + _id = id; + if (_alternateExchangeId != null) + { + _alternateExchange = _exchangeRegistry.getExchange(_alternateExchangeId); + if(_alternateExchange == null) + { + _dependencies.add(new AlternateExchangeDependency()); + } + } + } + + @Override + public UnresolvedDependency[] getUnresolvedDependencies() + { + return _dependencies.toArray(new UnresolvedDependency[_dependencies.size()]); + } + + @Override + public AMQQueue resolve() + { + String queueName = (String) _attributes.get(Queue.NAME); + String owner = (String) _attributes.get(Queue.OWNER); + boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE); @SuppressWarnings("unchecked") - Map queueArgumentsMap = (Map) attributeMap.get(Queue.ARGUMENTS); + Map queueArgumentsMap = (Map) _attributes.get(Queue.ARGUMENTS); try { - _queue = _virtualHost.getQueueRegistry().getQueue(id); + _queue = _virtualHost.getQueueRegistry().getQueue(_id); if(_queue == null) { _queue = _virtualHost.getQueueRegistry().getQueue(queueName); @@ -84,38 +113,43 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer registeredExchange = ArgumentCaptor.forClass(Exchange.class); + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + Exchange exchange = registeredExchange.getValue(); + when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange); + when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange); + return null; + } + }).when(_exchangeRegistry).registerExchange(registeredExchange.capture()); + + + + _queueRegistry = mock(QueueRegistry.class); + when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry); + + when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + + final ArgumentCaptor registeredQueue = ArgumentCaptor.forClass(AMQQueue.class); + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + AMQQueue queue = registeredQueue.getValue(); + when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue); + when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue); + return null; + } + }).when(_queueRegistry).registerQueue(registeredQueue.capture()); + + /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than + queue creation being on vhost - yuck! */ + SecurityManager securityManager = mock(SecurityManager.class); + when(_vhost.getSecurityManager()).thenReturn(securityManager); + when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(), + any(AMQShortString.class),anyString())).thenReturn(true); + VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class); + when(_vhost.getConfiguration()).thenReturn(configuration); + QueueConfiguration queueConfiguration = mock(QueueConfiguration.class); + when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration); + LogActor logActor = mock(LogActor.class); + CurrentActor.set(logActor); + RootMessageLogger rootLogger = mock(RootMessageLogger.class); + when(logActor.getRootMessageLogger()).thenReturn(rootLogger); + /* end of queue creation mock hackery */ _exchangeFactory = mock(ExchangeFactory.class); @@ -208,15 +265,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase eq(HeadersExchange.TYPE.getType()), anyBoolean(), anyBoolean())).thenReturn(customExchange); - doAnswer(new Answer() - { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable - { - when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange); - return null; - } - }).when(_exchangeRegistry).registerExchange(customExchange); final ConfiguredObjectRecord[] expected = { new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", @@ -318,6 +366,35 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } + public void testRecoveryOfQueueAlternateExchange() throws Exception + { + + final UUID queueId = new UUID(1, 0); + final UUID exchangeId = new UUID(2, 0); + + + + final Exchange customExchange = mock(Exchange.class); + + when(_exchangeFactory.createExchange(eq(exchangeId), + eq(CUSTOM_EXCHANGE_NAME), + eq(HeadersExchange.TYPE.getType()), + anyBoolean(), + anyBoolean())).thenReturn(customExchange); + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + _durableConfigurationRecoverer.configuredObject(queueId, Queue.class.getSimpleName(), + createQueue("testQueue", exchangeId)); + _durableConfigurationRecoverer.configuredObject(exchangeId, + org.apache.qpid.server.model.Exchange.class.getSimpleName(), + createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + + assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange); + } + private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException { doAnswer(new Answer() @@ -373,4 +450,21 @@ public class DurableConfigurationRecovererTest extends QpidTestCase return exchange; } + + + private Map createQueue(String name, UUID alternateExchangeId) + { + Map queue = new LinkedHashMap(); + + queue.put(Queue.NAME, name); + if(alternateExchangeId != null) + { + queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString()); + } + queue.put(Queue.EXCLUSIVE, false); + + return queue; + + } + } -- cgit v1.2.1